Skip to main content
added 2 characters in body; edited tags
Source Link
Peilonrayz
  • 44.5k
  • 7
  • 80
  • 158
import requests
from module import usr, pwd, acct, db, schem, api_token
import snowflake.connector
import datetime
import time
from datetime import datetime
import csv
import os
import contextlib

# Pipedrive endpoint and paging configuration (page size 500, starting offset 0).
end_point = 'persons'
limit = 500
start = 0
# Wall-clock start, used to report total job duration at the end of the run.
start_time = time.time()
# Local staging CSV; deleted and rebuilt on every run, then PUT to Snowflake.
csvfile = r'C:/Users/User1/PycharmProjects/Pipedrive/persons.csv'
def snowflake_connect():
    """Open a Snowflake connection and return a cursor on it.

    Credentials and target database/schema come from the ``module`` config
    import at the top of the file.

    NOTE(review): the connection object ``mydb`` is never closed anywhere in
    this script -- only the cursor is. Consider returning or storing the
    connection so it can be closed when the job finishes.
    """
    mydb = snowflake.connector.connect(
        user=usr,
        password=pwd,
        account=acct,
        database=db,
        schema=schem,
    )
    cursor = mydb.cursor()
    return cursor

def snowflake_truncate(cursor):
    """Empty the PERSONS_NEW table, logging before/after timestamps.

    Returns the same cursor so calls can be chained.
    """
    stamp = "%Y-%m-%d %H:%M:%S"
    print("Truncating table PERSONS_NEW: {}".format(datetime.now().strftime(stamp)))
    cursor.execute('TRUNCATE TABLE PERSONS_NEW')
    print("PERSONS_NEW truncated: {}".format(datetime.now().strftime(stamp)))
    return cursor

def _print_results(cursor):
    """Echo every result row of the last executed statement with a timestamp."""
    for row in cursor:
        print(row, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

def snowflake_insert(cursor):
    """Upload the local CSV to the persons_test stage and COPY it into persons_new.

    Steps: clear previously staged files, PUT the local CSV (gzip-compressed),
    then COPY the staged file into MARKETING.PIPEDRIVE_MASTER.persons_new,
    aborting on any bad row.
    """
    # NOTE(review): the dots here are regex wildcards; '.*\.csv\.gz' would
    # match only literal ".csv.gz" suffixes.
    cursor.execute("remove @persons_test pattern='.*.csv.gz'")
    _print_results(cursor)
    cursor.execute('put file://{} @persons_test auto_compress=true'.format(csvfile))
    _print_results(cursor)
    cursor.execute("""COPY INTO MARKETING.PIPEDRIVE_MASTER.persons_new FROM @persons_test/persons.csv.gz file_format=(TYPE=csv field_delimiter=',' skip_header=0 FIELD_OPTIONALLY_ENCLOSED_BY = '"') on_error = 'abort_statement'""")
    _print_results(cursor)

def get_persons(start):
    """Page through the Pipedrive 'persons' endpoint, writing every page to CSV.

    Fetches pages of ``limit`` records starting at offset ``start`` and hands
    each response to ``read_persons`` until the API reports no more items.

    Bug fix vs. original: the first page was fetched twice, and when the whole
    result set fit in a single page (``more_items_in_collection`` false on the
    first response) no data was processed at all. Also uses ``limit`` for the
    offset step instead of a hard-coded 500.
    """
    while True:
        url = 'https://company.pipedrive.com/v1/{}?user_id=0&start={}&limit={}&api_token={}'.format(
            end_point, start, limit, api_token)
        response = requests.request("GET", url).json()
        read_persons(response)
        if not response['additional_data']['pagination']['more_items_in_collection']:
            break
        start += limit

def read_persons(response):
    """Convert each person record in an API response into a CSV row.

    Bug fix vs. original: ``cc_email`` was referenced in the row list but
    never assigned, raising NameError on the first record. Also avoids
    shadowing the ``id`` builtin and uses ``is None`` for None checks.
    """
    for data in response['data']:
        # Normalize empty add_time strings to None (loads as NULL in Snowflake).
        add_time = None if data['add_time'] == '' else data['add_time']
        # org_id arrives as a nested object; keep only its numeric value.
        org_id = None if data['org_id'] is None else data['org_id']['value']
        row = [
            data['id'],
            data['activities_count'],
            add_time,
            data['cc_email'],
            data['closed_deals_count'],
            data['company_id'],
            data['done_activities_count'],
            data['followers_count'],
            data['label'],
            data['last_activity_date'],
            data['last_activity_id'],
            data['last_incoming_mail_time'],
            data['last_name'],
            data['last_outgoing_mail_time'],
            data['lost_deals_count'],
            data['name'],
            data['next_activity_date'],
            data['next_activity_id'],
            data['next_activity_time'],
            data['notes_count'],
            data['open_deals_count'],
            org_id,
            data['org_name'],
        ]
        write_csv(row)

def delete_existing_csv():
    """Remove the CSV left over from a previous run; a missing file is fine."""
    try:
        os.remove(csvfile)
    except FileNotFoundError:
        pass

def write_csv(fieldnames):
    """Append a single row (``fieldnames``) to the staging CSV file."""
    with open(csvfile, "a", encoding="utf-8", newline='') as handle:
        csv.writer(handle, delimiter=',').writerow(fieldnames)

if __name__ == "__main__":
    # Build a fresh CSV from the Pipedrive API, then reload Snowflake from it.
    delete_existing_csv()
    print("Creating CSV file: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    get_persons(start)
    # Fixed typo in the original message ("succesfully").
    print("CSV file successfully created: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    cursor = snowflake_connect()
    snowflake_truncate(cursor)
    snowflake_insert(cursor)
    cursor.close()
    # Report total wall-clock duration of the job.
    end_time = time.time()
    elapsed_time = round(end_time - start_time, 2)
    print("Job successfully completed in: {} seconds".format(elapsed_time))
```
import requests
from module import usr, pwd, acct, db, schem, api_token
import snowflake.connector
import datetime  # NOTE(review): shadowed by the from-import below; kept for safety.
import time
from datetime import datetime
import csv
import os
import contextlib

# Pipedrive endpoint and paging configuration.
end_point = 'persons'
limit = 500
start = 0
start_time = time.time()
# Local staging CSV; deleted and rebuilt on every run.
csvfile = r'C:/Users/User1/PycharmProjects/Pipedrive/persons.csv'
# Shared timestamp format for all progress messages.
TS_FMT = "%Y-%m-%d %H:%M:%S"

def _now():
    """Return the current time formatted for log messages."""
    return datetime.now().strftime(TS_FMT)

def snowflake_connect():
    """Open a Snowflake connection and return a cursor on it.

    NOTE(review): the connection object is never closed anywhere in this
    script -- only the cursor is.
    """
    mydb = snowflake.connector.connect(
        user=usr,
        password=pwd,
        account=acct,
        database=db,
        schema=schem,
    )
    return mydb.cursor()

def snowflake_truncate(cursor):
    """Empty the PERSONS_NEW table, logging before/after timestamps."""
    print("Truncating table PERSONS_NEW: {}".format(_now()))
    cursor.execute('TRUNCATE TABLE PERSONS_NEW')
    print("PERSONS_NEW truncated: {}".format(_now()))
    return cursor

def _print_results(cursor):
    """Echo every result row of the last executed statement with a timestamp."""
    for row in cursor:
        print(row, _now())

def snowflake_insert(cursor):
    """Upload the local CSV to the persons_test stage and COPY it into persons_new."""
    # NOTE(review): the dots are regex wildcards; '.*\.csv\.gz' would be exact.
    cursor.execute("remove @persons_test pattern='.*.csv.gz'")
    _print_results(cursor)
    cursor.execute('put file://{} @persons_test auto_compress=true'.format(csvfile))
    _print_results(cursor)
    cursor.execute("""COPY INTO MARKETING.PIPEDRIVE_MASTER.persons_new FROM @persons_test/persons.csv.gz file_format=(TYPE=csv field_delimiter=',' skip_header=0 FIELD_OPTIONALLY_ENCLOSED_BY = '"') on_error = 'abort_statement'""")
    _print_results(cursor)

def get_persons(start):
    """Page through the Pipedrive 'persons' endpoint, writing every page to CSV.

    Bug fix: the original fetched the first page twice and, when the whole
    result set fit in one page, never processed it at all.
    """
    while True:
        url = 'https://company.pipedrive.com/v1/{}?user_id=0&start={}&limit={}&api_token={}'.format(
            end_point, start, limit, api_token)
        response = requests.request("GET", url).json()
        read_persons(response)
        if not response['additional_data']['pagination']['more_items_in_collection']:
            break
        start += limit

def read_persons(response):
    """Convert each person record in an API response into a CSV row.

    Bug fix: 'cc_email' was referenced but never assigned, which raised
    NameError on the first record.
    """
    for data in response['data']:
        # Normalize empty add_time strings to None (NULL in Snowflake).
        add_time = None if data['add_time'] == '' else data['add_time']
        # org_id arrives as a nested object; keep only its numeric value.
        org_id = None if data['org_id'] is None else data['org_id']['value']
        row = [
            data['id'], data['activities_count'], add_time, data['cc_email'],
            data['closed_deals_count'], data['company_id'],
            data['done_activities_count'], data['followers_count'],
            data['label'], data['last_activity_date'], data['last_activity_id'],
            data['last_incoming_mail_time'], data['last_name'],
            data['last_outgoing_mail_time'], data['lost_deals_count'],
            data['name'], data['next_activity_date'], data['next_activity_id'],
            data['next_activity_time'], data['notes_count'],
            data['open_deals_count'], org_id, data['org_name'],
        ]
        write_csv(row)

def delete_existing_csv():
    """Remove any CSV left over from a previous run; a missing file is fine."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(csvfile)

def write_csv(fieldnames):
    """Append one row to the staging CSV."""
    with open(csvfile, "a", encoding="utf-8", newline='') as fp:
        csv.writer(fp, delimiter=',').writerow(fieldnames)

if __name__ == "__main__":
    delete_existing_csv()
    print("Creating CSV file: {}".format(_now()))
    get_persons(start)
    print("CSV file successfully created: {}".format(_now()))
    cursor = snowflake_connect()
    snowflake_truncate(cursor)
    snowflake_insert(cursor)
    cursor.close()
    elapsed_time = round(time.time() - start_time, 2)
    print("Job successfully completed in: {} seconds".format(elapsed_time))
```
import requests
from module import usr, pwd, acct, db, schem, api_token
import snowflake.connector
import datetime  # NOTE(review): shadowed by the from-import below; kept for safety.
import time
from datetime import datetime
import csv
import os
import contextlib

# Pipedrive endpoint and paging configuration.
end_point = 'persons'
limit = 500
start = 0
start_time = time.time()
# Local staging CSV; deleted and rebuilt on every run.
csvfile = r'C:/Users/User1/PycharmProjects/Pipedrive/persons.csv'
# Shared timestamp format for all progress messages.
TS_FMT = "%Y-%m-%d %H:%M:%S"

def _now():
    """Return the current time formatted for log messages."""
    return datetime.now().strftime(TS_FMT)

def snowflake_connect():
    """Open a Snowflake connection and return a cursor on it.

    NOTE(review): the connection object is never closed anywhere in this
    script -- only the cursor is.
    """
    mydb = snowflake.connector.connect(
        user=usr,
        password=pwd,
        account=acct,
        database=db,
        schema=schem,
    )
    return mydb.cursor()

def snowflake_truncate(cursor):
    """Empty the PERSONS_NEW table, logging before/after timestamps."""
    print("Truncating table PERSONS_NEW: {}".format(_now()))
    cursor.execute('TRUNCATE TABLE PERSONS_NEW')
    print("PERSONS_NEW truncated: {}".format(_now()))
    return cursor

def _print_results(cursor):
    """Echo every result row of the last executed statement with a timestamp."""
    for row in cursor:
        print(row, _now())

def snowflake_insert(cursor):
    """Upload the local CSV to the persons_test stage and COPY it into persons_new."""
    # NOTE(review): the dots are regex wildcards; '.*\.csv\.gz' would be exact.
    cursor.execute("remove @persons_test pattern='.*.csv.gz'")
    _print_results(cursor)
    cursor.execute('put file://{} @persons_test auto_compress=true'.format(csvfile))
    _print_results(cursor)
    cursor.execute("""COPY INTO MARKETING.PIPEDRIVE_MASTER.persons_new FROM @persons_test/persons.csv.gz file_format=(TYPE=csv field_delimiter=',' skip_header=0 FIELD_OPTIONALLY_ENCLOSED_BY = '"') on_error = 'abort_statement'""")
    _print_results(cursor)

def get_persons(start):
    """Page through the Pipedrive 'persons' endpoint, writing every page to CSV.

    Bug fix: the original fetched the first page twice and, when the whole
    result set fit in one page, never processed it at all.
    """
    while True:
        url = 'https://company.pipedrive.com/v1/{}?user_id=0&start={}&limit={}&api_token={}'.format(
            end_point, start, limit, api_token)
        response = requests.request("GET", url).json()
        read_persons(response)
        if not response['additional_data']['pagination']['more_items_in_collection']:
            break
        start += limit

def read_persons(response):
    """Convert each person record in an API response into a CSV row.

    Bug fix: 'cc_email' was referenced but never assigned, which raised
    NameError on the first record.
    """
    for data in response['data']:
        # Normalize empty add_time strings to None (NULL in Snowflake).
        add_time = None if data['add_time'] == '' else data['add_time']
        # org_id arrives as a nested object; keep only its numeric value.
        org_id = None if data['org_id'] is None else data['org_id']['value']
        row = [
            data['id'], data['activities_count'], add_time, data['cc_email'],
            data['closed_deals_count'], data['company_id'],
            data['done_activities_count'], data['followers_count'],
            data['label'], data['last_activity_date'], data['last_activity_id'],
            data['last_incoming_mail_time'], data['last_name'],
            data['last_outgoing_mail_time'], data['lost_deals_count'],
            data['name'], data['next_activity_date'], data['next_activity_id'],
            data['next_activity_time'], data['notes_count'],
            data['open_deals_count'], org_id, data['org_name'],
        ]
        write_csv(row)

def delete_existing_csv():
    """Remove any CSV left over from a previous run; a missing file is fine."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(csvfile)

def write_csv(fieldnames):
    """Append one row to the staging CSV."""
    with open(csvfile, "a", encoding="utf-8", newline='') as fp:
        csv.writer(fp, delimiter=',').writerow(fieldnames)

if __name__ == "__main__":
    delete_existing_csv()
    print("Creating CSV file: {}".format(_now()))
    get_persons(start)
    print("CSV file successfully created: {}".format(_now()))
    cursor = snowflake_connect()
    snowflake_truncate(cursor)
    snowflake_insert(cursor)
    cursor.close()
    elapsed_time = round(time.time() - start_time, 2)
    print("Job successfully completed in: {} seconds".format(elapsed_time))
Tweeted twitter.com/StackCodeReview/status/1223350387320614913
added 3 characters in body
Source Link
jmf
  • 121
  • 4

I am a new python programmer. I have created the code to put API data from pipedrive to Snowflake database. Here are the steps in my code.

I am a new python programmer. I have created the code to put API data from pipedrive to Snowflake database. Here are the steps in my code.

I am a new python programmer. I have created the code to put API data from pipedrive to Snowflake database. Here are the steps in my code.

Source Link
jmf
  • 121
  • 4

Get data from pipedrive API and insert it into Snowflake

I am a new python programmer. I have created the code to put API data from pipedrive to Snowflake database. Here are the steps in my code.

  1. Delete the csv file if it exists.
  2. Make an API call and put all the paginated data in a list.
  3. Create a csv file from the list.
  4. Truncate the table in Snowflake.
  5. Remove data from Snowflake stage table
  6. Put the data in Snowflake stage table.
  7. Copy data from stage table to a normal table.

I would love to get some feedback on it as I will create more scripts based on this code.

Here is my code.

import requests
from module import usr, pwd, acct, db, schem, api_token
import snowflake.connector
import datetime  # NOTE(review): shadowed by the from-import below; kept for safety.
import time
from datetime import datetime
import csv
import os
import contextlib

# Pipedrive endpoint and paging configuration.
end_point = 'persons'
limit = 500
start = 0
start_time = time.time()
# Local staging CSV; deleted and rebuilt on every run.
csvfile = r'C:/Users/User1/PycharmProjects/Pipedrive/persons.csv'
# Shared timestamp format for all progress messages.
TS_FMT = "%Y-%m-%d %H:%M:%S"

def _now():
    """Return the current time formatted for log messages."""
    return datetime.now().strftime(TS_FMT)

def snowflake_connect():
    """Open a Snowflake connection and return a cursor on it.

    NOTE(review): the connection object is never closed anywhere in this
    script -- only the cursor is.
    """
    mydb = snowflake.connector.connect(
        user=usr,
        password=pwd,
        account=acct,
        database=db,
        schema=schem,
    )
    return mydb.cursor()

def snowflake_truncate(cursor):
    """Empty the PERSONS_NEW table, logging before/after timestamps."""
    print("Truncating table PERSONS_NEW: {}".format(_now()))
    cursor.execute('TRUNCATE TABLE PERSONS_NEW')
    print("PERSONS_NEW truncated: {}".format(_now()))
    return cursor

def _print_results(cursor):
    """Echo every result row of the last executed statement with a timestamp."""
    for row in cursor:
        print(row, _now())

def snowflake_insert(cursor):
    """Upload the local CSV to the persons_test stage and COPY it into persons_new."""
    # NOTE(review): the dots are regex wildcards; '.*\.csv\.gz' would be exact.
    cursor.execute("remove @persons_test pattern='.*.csv.gz'")
    _print_results(cursor)
    cursor.execute('put file://{} @persons_test auto_compress=true'.format(csvfile))
    _print_results(cursor)
    cursor.execute("""COPY INTO MARKETING.PIPEDRIVE_MASTER.persons_new FROM @persons_test/persons.csv.gz file_format=(TYPE=csv field_delimiter=',' skip_header=0 FIELD_OPTIONALLY_ENCLOSED_BY = '"') on_error = 'abort_statement'""")
    _print_results(cursor)

def get_persons(start):
    """Page through the Pipedrive 'persons' endpoint, writing every page to CSV.

    Bug fix: the original fetched the first page twice and, when the whole
    result set fit in one page, never processed it at all.
    """
    while True:
        url = 'https://company.pipedrive.com/v1/{}?user_id=0&start={}&limit={}&api_token={}'.format(
            end_point, start, limit, api_token)
        response = requests.request("GET", url).json()
        read_persons(response)
        if not response['additional_data']['pagination']['more_items_in_collection']:
            break
        start += limit

def read_persons(response):
    """Convert each person record in an API response into a CSV row.

    Bug fix: 'cc_email' was referenced but never assigned, which raised
    NameError on the first record.
    """
    for data in response['data']:
        # Normalize empty add_time strings to None (NULL in Snowflake).
        add_time = None if data['add_time'] == '' else data['add_time']
        # org_id arrives as a nested object; keep only its numeric value.
        org_id = None if data['org_id'] is None else data['org_id']['value']
        row = [
            data['id'], data['activities_count'], add_time, data['cc_email'],
            data['closed_deals_count'], data['company_id'],
            data['done_activities_count'], data['followers_count'],
            data['label'], data['last_activity_date'], data['last_activity_id'],
            data['last_incoming_mail_time'], data['last_name'],
            data['last_outgoing_mail_time'], data['lost_deals_count'],
            data['name'], data['next_activity_date'], data['next_activity_id'],
            data['next_activity_time'], data['notes_count'],
            data['open_deals_count'], org_id, data['org_name'],
        ]
        write_csv(row)

def delete_existing_csv():
    """Remove any CSV left over from a previous run; a missing file is fine."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(csvfile)

def write_csv(fieldnames):
    """Append one row to the staging CSV."""
    with open(csvfile, "a", encoding="utf-8", newline='') as fp:
        csv.writer(fp, delimiter=',').writerow(fieldnames)

if __name__ == "__main__":
    delete_existing_csv()
    print("Creating CSV file: {}".format(_now()))
    get_persons(start)
    print("CSV file successfully created: {}".format(_now()))
    cursor = snowflake_connect()
    snowflake_truncate(cursor)
    snowflake_insert(cursor)
    cursor.close()
    elapsed_time = round(time.time() - start_time, 2)
    print("Job successfully completed in: {} seconds".format(elapsed_time))
```