Skip to main content
Rollback to Revision 4
Source Link
user673679
  • 12.2k
  • 2
  • 34
  • 65
  1. creating functions.

    creating functions.

  2. Reducing lines of code.

    Reducing lines of code.

  3. Performance tuning using multithreading
  4. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
             for res in data["item"]:
    
                 for d in res["request"]:
    
                     if "GET" in res["request"]["method"]:
                         payload = {}
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
    
                         try:
                             response = requests.get(url, headers=headers, data=payload)
    
                             if response.status_code == 200:
                                 rows1 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows1)
                             # writer.writerows(rows1)
                             else:
                                 rows2 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows2)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     elif "POST" in res["request"]["method"]:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows3 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows3)
                             else:
                                 rows4 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows4)
                                 # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     else:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows5 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows5)
                             else:
                                 rows6 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows6)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
     f_object.close()
    
from urllib.request import urlopen
import requests
from bs4 import BeautifulSoup
import json




def generateToken(token_link):
    page = urlopen(token_link)  # Link to generate token
    contents = page.read()
    soup = BeautifulSoup(contents)
    shred = list(soup.stripped_strings)
    token = shred[2]  # gets the token
    headers = {"Authorization": "Bearer {}".format(token)}
    return headers


def readJSON(json_link,headers_input):
    with open(json_link) as f:
        data = json.load(f)
        for res in data["item"]:
             # empty payload for GET
                if "GET" in res["request"]["method"] and (env=="dev" or env=="prd"):
                   payload = {}
                   url = res["request"]["url"]["raw"]
                   method = res["request"]["method"]
                   name=res["name"]
                elif "PUT" in res["request"]["method"] or "POST" in res["request"]["method"] and env=="dev":
                    payload = res["request"]["body"]["raw"]
                    url = res["request"]["url"]["raw"]
                    method = res["request"]["method"]
                    name=res["name"]
                else:
                    print("The environment/schema not supported")

                try:
                    response = requests.request(method, url, headers=headers_input, data=payload)

                    if response.status_code == 200:
                        result = "OK"
                    else:
                        result = response.text

                    rows = [
                                {   
                                    "Name":name,
                                    "API_Endpoint": url,
                                    "Method": method,
                                    "Status_Code": response.status_code,
                                    "Message": result,
                                    "Timestamp": datetime.datetime.utcnow()
                                }
                    ]
                    # print(response.json().get("message"))
                except requests.ConnectionError:
                    print("failed to connect")
 
    return rows

def writeToCSV(input_rows):
    with open("TestResults.csv", "w") as f_object:
        fields = ["Name","API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
        writer = csv.DictWriter(f_object, fieldnames=fields)
        writer.writerows(input_rows)
        
dev_json_link="postman_collection.json"
dev_token_link="<link>"
dev_headers=generateToken(dev_token_link)
dev_rows=readJSON(dev_json_link,dev_headers)
writeToCSV(dev_rows)
  1. creating functions.
  2. Reducing lines of code.
  3. Performance tuning using multithreading
from urllib.request import urlopen
import requests
from bs4 import BeautifulSoup
import json




def generateToken(token_link):
    page = urlopen(token_link)  # Link to generate token
    contents = page.read()
    soup = BeautifulSoup(contents)
    shred = list(soup.stripped_strings)
    token = shred[2]  # gets the token
    headers = {"Authorization": "Bearer {}".format(token)}
    return headers


def readJSON(json_link,headers_input):
    with open(json_link) as f:
        data = json.load(f)
        for res in data["item"]:
             # empty payload for GET
                if "GET" in res["request"]["method"] and (env=="dev" or env=="prd"):
                   payload = {}
                   url = res["request"]["url"]["raw"]
                   method = res["request"]["method"]
                   name=res["name"]
                elif "PUT" in res["request"]["method"] or "POST" in res["request"]["method"] and env=="dev":
                    payload = res["request"]["body"]["raw"]
                    url = res["request"]["url"]["raw"]
                    method = res["request"]["method"]
                    name=res["name"]
                else:
                    print("The environment/schema not supported")

                try:
                    response = requests.request(method, url, headers=headers_input, data=payload)

                    if response.status_code == 200:
                        result = "OK"
                    else:
                        result = response.text

                    rows = [
                                {   
                                    "Name":name,
                                    "API_Endpoint": url,
                                    "Method": method,
                                    "Status_Code": response.status_code,
                                    "Message": result,
                                    "Timestamp": datetime.datetime.utcnow()
                                }
                    ]
                    # print(response.json().get("message"))
                except requests.ConnectionError:
                    print("failed to connect")
 
    return rows

def writeToCSV(input_rows):
    with open("TestResults.csv", "w") as f_object:
        fields = ["Name","API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
        writer = csv.DictWriter(f_object, fieldnames=fields)
        writer.writerows(input_rows)
        
dev_json_link="postman_collection.json"
dev_token_link="<link>"
dev_headers=generateToken(dev_token_link)
dev_rows=readJSON(dev_json_link,dev_headers)
writeToCSV(dev_rows)
  1. creating functions.

  2. Reducing lines of code.

  3. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
             for res in data["item"]:
    
                 for d in res["request"]:
    
                     if "GET" in res["request"]["method"]:
                         payload = {}
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
    
                         try:
                             response = requests.get(url, headers=headers, data=payload)
    
                             if response.status_code == 200:
                                 rows1 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows1)
                             # writer.writerows(rows1)
                             else:
                                 rows2 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows2)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     elif "POST" in res["request"]["method"]:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows3 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows3)
                             else:
                                 rows4 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows4)
                                 # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     else:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows5 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows5)
                             else:
                                 rows6 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows6)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
     f_object.close()
    
deleted 4525 characters in body
Source Link
Durga
  • 51
  • 4
  1. creating functions.

    creating functions.
  2. Reducing lines of code.

    Reducing lines of code.
  3. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
             for res in data["item"]:
    
                 for d in res["request"]:
    
                     if "GET" in res["request"]["method"]:
                         payload = {}
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
    
                         try:
                             response = requests.get(url, headers=headers, data=payload)
    
                             if response.status_code == 200:
                                 rows1 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows1)
                             # writer.writerows(rows1)
                             else:
                                 rows2 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows2)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     elif "POST" in res["request"]["method"]:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows3 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows3)
                             else:
                                 rows4 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows4)
                                 # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     else:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows5 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows5)
                             else:
                                 rows6 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows6)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
     f_object.close()
    
  4. Performance tuning using multithreading
from urllib.request import urlopen
import requests
from bs4 import BeautifulSoup
import json




def generateToken(token_link):
    page = urlopen(token_link)  # Link to generate token
    contents = page.read()
    soup = BeautifulSoup(contents)
    shred = list(soup.stripped_strings)
    token = shred[2]  # gets the token
    headers = {"Authorization": "Bearer {}".format(token)}
    return headers


def readJSON(json_link,headers_input):
    with open(json_link) as f:
        data = json.load(f)
        for res in data["item"]:
             # empty payload for GET
                if "GET" in res["request"]["method"] and (env=="dev" or env=="prd"):
                   payload = {}
                   url = res["request"]["url"]["raw"]
                   method = res["request"]["method"]
                   name=res["name"]
                elif "PUT" in res["request"]["method"] or "POST" in res["request"]["method"] and env=="dev":
                    payload = res["request"]["body"]["raw"]
                    url = res["request"]["url"]["raw"]
                    method = res["request"]["method"]
                    name=res["name"]
                else:
                    print("The environment/schema not supported")

                try:
                    response = requests.request(method, url, headers=headers_input, data=payload)

                    if response.status_code == 200:
                        result = "OK"
                    else:
                        result = response.text

                    rows = [
                                {   
                                    "Name":name,
                                    "API_Endpoint": url,
                                    "Method": method,
                                    "Status_Code": response.status_code,
                                    "Message": result,
                                    "Timestamp": datetime.datetime.utcnow()
                                }
                    ]
                    # print(response.json().get("message"))
                except requests.ConnectionError:
                    print("failed to connect")
 
    return rows

def writeToCSV(input_rows):
    with open("TestResults.csv", "w") as f_object:
        fields = ["Name","API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
        writer = csv.DictWriter(f_object, fieldnames=fields)
        writer.writerows(input_rows)
        
dev_json_link="postman_collection.json"
dev_token_link="<link>"
dev_headers=generateToken(dev_token_link)
dev_rows=readJSON(dev_json_link,dev_headers)
writeToCSV(dev_rows)
  1. creating functions.

  2. Reducing lines of code.

  3. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
             for res in data["item"]:
    
                 for d in res["request"]:
    
                     if "GET" in res["request"]["method"]:
                         payload = {}
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
    
                         try:
                             response = requests.get(url, headers=headers, data=payload)
    
                             if response.status_code == 200:
                                 rows1 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows1)
                             # writer.writerows(rows1)
                             else:
                                 rows2 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows2)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     elif "POST" in res["request"]["method"]:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows3 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows3)
                             else:
                                 rows4 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows4)
                                 # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     else:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows5 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows5)
                             else:
                                 rows6 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows6)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
     f_object.close()
    
  1. creating functions.
  2. Reducing lines of code.
  3. Performance tuning using multithreading
from urllib.request import urlopen
import requests
from bs4 import BeautifulSoup
import json




def generateToken(token_link):
    page = urlopen(token_link)  # Link to generate token
    contents = page.read()
    soup = BeautifulSoup(contents)
    shred = list(soup.stripped_strings)
    token = shred[2]  # gets the token
    headers = {"Authorization": "Bearer {}".format(token)}
    return headers


def readJSON(json_link,headers_input):
    with open(json_link) as f:
        data = json.load(f)
        for res in data["item"]:
             # empty payload for GET
                if "GET" in res["request"]["method"] and (env=="dev" or env=="prd"):
                   payload = {}
                   url = res["request"]["url"]["raw"]
                   method = res["request"]["method"]
                   name=res["name"]
                elif "PUT" in res["request"]["method"] or "POST" in res["request"]["method"] and env=="dev":
                    payload = res["request"]["body"]["raw"]
                    url = res["request"]["url"]["raw"]
                    method = res["request"]["method"]
                    name=res["name"]
                else:
                    print("The environment/schema not supported")

                try:
                    response = requests.request(method, url, headers=headers_input, data=payload)

                    if response.status_code == 200:
                        result = "OK"
                    else:
                        result = response.text

                    rows = [
                                {   
                                    "Name":name,
                                    "API_Endpoint": url,
                                    "Method": method,
                                    "Status_Code": response.status_code,
                                    "Message": result,
                                    "Timestamp": datetime.datetime.utcnow()
                                }
                    ]
                    # print(response.json().get("message"))
                except requests.ConnectionError:
                    print("failed to connect")
 
    return rows

def writeToCSV(input_rows):
    with open("TestResults.csv", "w") as f_object:
        fields = ["Name","API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
        writer = csv.DictWriter(f_object, fieldnames=fields)
        writer.writerows(input_rows)
        
dev_json_link="postman_collection.json"
dev_token_link="<link>"
dev_headers=generateToken(dev_token_link)
dev_rows=readJSON(dev_json_link,dev_headers)
writeToCSV(dev_rows)
Rollback to Revision 2
Source Link
pacmaninbw
  • 26.1k
  • 13
  • 47
  • 114

Python API framework to be testedtesting

  1. creating functions.

  2. Reducing lines of code.

  3. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
             for res in data["item"]:
    
                 for d in res["request"]:
    
                     if "GET" in res["request"]["method"]:
                         payload = {}
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
    
                         try:
                             response = requests.get(url, headers=headers, data=payload)
    
                             if response.status_code == 200:
                                 rows1 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows1)
                             # writer.writerows(rows1)
                             else:
                                 rows2 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows2)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     elif "POST" in res["request"]["method"]:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows3 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows3)
                             else:
                                 rows4 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows4)
                                 # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     else:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows5 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows5)
                             else:
                                 rows6 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows6)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
     f_object.close()
    

Python API framework to be tested

  1. creating functions.

  2. Reducing lines of code.

  3. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
    

Python API testing

  1. creating functions.

  2. Reducing lines of code.

  3. Performance tuning using multithreading

     from urllib.request import urlopen
     import requests
     from bs4 import BeautifulSoup
     import json
     from jsonpath_ng import jsonpath, parse
    
    
     def GenerateToken():
         link = "<link>"
         page = urlopen(link)  # Link to generate token
         contents = page.read()
         soup = BeautifulSoup(contents)
         shred = list(soup.stripped_strings)
         token = shred[2]  # gets the token
         return token
    
    
     with open("TestResults.csv", "w") as f_object:
         fields = ["API_Endpoint", "Method", "Status_Code", "Message", "Timestamp"]
         writer = csv.DictWriter(f_object, fieldnames=fields)
         writer.writeheader()
         headers = {"Authorization": "Bearer {}".format(GenerateToken())}
         with open("postman_collection.json") as f:
             data = json.load(f)
             for res in data["item"]:
    
                 for d in res["request"]:
    
                     if "GET" in res["request"]["method"]:
                         payload = {}
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
    
                         try:
                             response = requests.get(url, headers=headers, data=payload)
    
                             if response.status_code == 200:
                                 rows1 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows1)
                             # writer.writerows(rows1)
                             else:
                                 rows2 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows2)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     elif "POST" in res["request"]["method"]:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows3 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows3)
                             else:
                                 rows4 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows4)
                                 # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
    
                     else:
                         url = res["request"]["url"]["raw"]
                         method = res["request"]["method"]
                         payload = res["request"]["body"]["raw"]
                         try:
                             # response = requests.request("GET",url,headers=headers, data=payload)
                             response = requests.request(
                                 "POST", url, headers=headers, data=payload
                             )
                             # print(response.json())
                             if response.status_code == 200:
                                 rows5 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": "OK",
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows5)
                             else:
                                 rows6 = [
                                     {
                                         "API_Endpoint": url,
                                         "Method": method,
                                         "Status_Code": response.status_code,
                                         "Message": response.text,
                                         "Timestamp": datetime.datetime.utcnow(),
                                     }
                                 ]
                                 writer.writerows(rows6)
                         # print(response.json().get("message"))
                         except requests.ConnectionError:
                             print("failed to connect")
     f_object.close()
    
deleted 6299 characters in body
Source Link
Durga
  • 51
  • 4
Loading
edited title
Link
Durga
  • 51
  • 4
Loading
Source Link
Durga
  • 51
  • 4
Loading