I am new to Python. I have written a script that loads API data from Pipedrive into a Snowflake database. Here are the steps in my code.
- Delete the csv file if it exists.
- Make an API call and put all the paginated data in a list.
- Create a csv file from the list.
- Truncate the table in Snowflake.
- Remove data from the Snowflake stage table.
- Put the data in the Snowflake stage table.
- Copy data from the stage table to a normal table.
I would love to get some feedback on it, as I will create more scripts based on this code.
Here is my code.
import requests
from module import usr, pwd, acct, db, schem, api_token
import snowflake.connector
import datetime
import time
from datetime import datetime
import csv
import os
import contextlib
# Pipedrive API resource name; get_persons() substitutes it into the request URL path.
end_point = 'persons'
# Page size sent as the `limit` query parameter on each request.
limit = 500
# Pagination offset passed to get_persons() for the first request.
start = 0
# Captured when the module is executed so the final message can report elapsed seconds.
start_time = time.time()
# Local CSV file: deleted at the start of a run, appended to row by row,
# then uploaded to the Snowflake stage.
csvfile = r'C:/Users/User1/PycharmProjects/Pipedrive/persons.csv'
def snowflake_connect():
    """Open a Snowflake session and return a cursor on it.

    The credentials and the default database/schema are the values imported
    from ``module``. Only the cursor is returned; the connection object
    itself is not handed back to the caller.

    Returns:
        A cursor created from the new connection.
    """
    connection_settings = {
        'user': usr,
        'password': pwd,
        'account': acct,
        'database': db,
        'schema': schem,
    }
    return snowflake.connector.connect(**connection_settings).cursor()
def snowflake_truncate(cursor):
    """Empty the PERSONS_NEW table, printing a timestamp before and after.

    Args:
        cursor: Object whose ``execute`` method runs a SQL statement.

    Returns:
        The same cursor that was passed in.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    started = datetime.now().strftime(stamp_format)
    print("Truncating table PERSONS_NEW: {}".format(started))

    cursor.execute('TRUNCATE TABLE PERSONS_NEW')

    finished = datetime.now().strftime(stamp_format)
    print("PERSONS_NEW truncated: {}".format(finished))
    return cursor
def snowflake_insert(cursor):
    """Refresh the stage from the local CSV and copy it into the target table.

    Three statements run in order: remove files matching the ``.csv.gz``
    pattern from the ``@persons_test`` stage, PUT the local CSV there with
    ``auto_compress=true``, and COPY the staged ``persons.csv.gz`` into
    ``persons_new``. After each statement every result row is printed
    together with the current timestamp.

    Args:
        cursor: Snowflake cursor; it is iterated after each ``execute`` call
            to read that statement's result rows.
    """
    statements = (
        "remove @persons_test pattern='.*.csv.gz'",
        'put file://{} @persons_test auto_compress=true'.format(csvfile),
        """COPY INTO MARKETING.PIPEDRIVE_MASTER.persons_new FROM @persons_test/persons.csv.gz file_format=(TYPE=csv field_delimiter=',' skip_header=0 FIELD_OPTIONALLY_ENCLOSED_BY = '"') on_error = 'abort_statement'""",
    )
    for statement in statements:
        cursor.execute(statement)
        for result_row in cursor:
            print(result_row, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def get_persons(start):
    """Download every page of the Pipedrive collection and process each one.

    Pages of ``limit`` records are requested starting at offset ``start``;
    each decoded page is passed to ``read_persons`` and the loop stops once
    the API reports that no more items are available.

    Args:
        start: Pagination offset of the first record to request.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    url = 'https://company.pipedrive.com/v1/{}'.format(end_point)
    while True:
        # Passing the query as `params` lets requests handle the URL encoding.
        params = {
            'user_id': 0,
            'start': start,
            'limit': limit,
            'api_token': api_token,
        }
        reply = requests.get(url, params=params)
        # Fail loudly on an HTTP error instead of on a later KeyError.
        reply.raise_for_status()
        response = reply.json()

        # Process the page *before* checking the pagination flag. The previous
        # version fetched the first page twice and, when the whole collection
        # fit in one page, never processed it at all.
        read_persons(response)

        if not response['additional_data']['pagination']['more_items_in_collection']:
            break
        # Advance by the page size actually requested, not a hard-coded 500.
        start += limit
def _person_row(person):
    """Flatten one person record (a dict) into the list of CSV cell values."""
    organisation = person['org_id']
    return [
        person['id'],
        person['activities_count'],
        # An empty add_time string is normalised to None.
        None if person['add_time'] == '' else person['add_time'],
        # This column was listed in the row but never read from the record,
        # which raised NameError on the first person processed.
        person['cc_email'],
        person['closed_deals_count'],
        person['company_id'],
        person['done_activities_count'],
        person['followers_count'],
        person['label'],
        person['last_activity_date'],
        person['last_activity_id'],
        person['last_incoming_mail_time'],
        person['last_name'],
        person['last_outgoing_mail_time'],
        person['lost_deals_count'],
        person['name'],
        person['next_activity_date'],
        person['next_activity_id'],
        person['next_activity_time'],
        person['notes_count'],
        person['open_deals_count'],
        # org_id is either None or a nested dict; only its 'value' is kept.
        None if organisation is None else organisation['value'],
        person['org_name'],
    ]


def read_persons(response):
    """Write one CSV row for every person in a decoded API response page.

    Args:
        response: Decoded JSON dict with a ``'data'`` entry holding the
            person records of one page.

    Raises:
        KeyError: If ``'data'`` or an expected field is missing.
    """
    # `or ()` lets a page whose 'data' is None (or empty) produce no rows
    # instead of raising TypeError when iterated.
    for person in response['data'] or ():
        write_csv(_person_row(person))
def delete_existing_csv():
    """Remove the CSV file if it exists; a missing file is not an error."""
    try:
        os.remove(csvfile)
    except FileNotFoundError:
        pass
def write_csv(fieldnames):
    """Append a single comma-delimited row to the CSV file.

    Args:
        fieldnames: Sequence of cell values for the row (despite the name,
            callers pass data values, not column headers).
    """
    with open(csvfile, mode="a", newline='', encoding="utf-8") as handle:
        csv.writer(handle, delimiter=',').writerow(fieldnames)
if __name__ == "__main__":
    # Step 1: rebuild the local CSV from the Pipedrive API.
    delete_existing_csv()
    print("Creating CSV file: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    get_persons(start)
    print("CSV file succesfully created: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

    # Step 2: reload the Snowflake table from that CSV.
    cursor = snowflake_connect()
    try:
        snowflake_truncate(cursor)
        snowflake_insert(cursor)
    finally:
        # Release the cursor AND the session behind it, even when a statement
        # fails. snowflake_connect() returns only the cursor, so the
        # connection is reached through the cursor's `connection` attribute.
        connection = cursor.connection
        cursor.close()
        connection.close()

    end_time = time.time()
    elapsed_time = round(end_time - start_time, 2)
    print("Job sucessfully completed in: {} seconds".format(elapsed_time))
