
I have a process that reads from 4 databases with 4 tables each and consolidates that data into 1 Postgres database with 4 tables total. (Each of the original 4 databases has the same 4 tables, which need to be consolidated.)

The way I am doing it now works, using pandas. I read one table from all 4 databases, concatenate the data into one DataFrame, and then use to_sql to save it to my Postgres database. I then repeat the same process for the remaining tables.

My issue is speed. One of my tables has about 1-2 million rows per date, so it can take about 5,000-6,000 seconds to finish writing the data to Postgres. It is much quicker to write it to a .csv file and then use COPY FROM in pgAdmin.

Here is my current code. Note that there are some function calls, but they basically just look up the table names. I also have some basic logging, but that is not strictly necessary. I am adding a column for the source database, which is required. I strip a trailing .0 from fields that are really strings but that pandas reads as floats, and I fill blank integers with 0 and make sure those columns are truly of type int (a rough sketch of those helpers follows the code).

import datetime as dt
import time

import pandas as pd
import pytz
from sqlalchemy import create_engine

# query, connection_type, server_name, table_dict, cms_dtypes and the helper
# functions (select_database, create_columns, modify_str_cols, modify_int_cols,
# create_string, create_int, create_table, create_dtypes) are defined elsewhere.

def query_database(table, table_name, query_date):
    df_list = []
    log_list = []
    for db in ['NJ', 'NJ2', 'LA', 'NA']:
        start_time = time.perf_counter()
        query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S')
        engine_name = '{}{}/{}'.format(connection_type, server_name, db)
        print('Accessing {} from {}'.format(select_database(db)[0][table], engine_name))
        engine = create_engine(engine_name)
        df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params=[query_date])
        query_end = time.perf_counter() - start_time
        # Tag each row with its source database plus some per-query metadata for logging.
        df['source_database'] = db
        df['insert_date_utc'] = query_timestamp
        df['row_count'] = df.shape[0]
        df['column_count'] = df.shape[1]
        df['query_time'] = round(query_end, 0)
        df['maximum_id'] = df['Id'].max()
        df['minimum_id'] = df['Id'].min()
        df['source_table'] = table_dict.get(table)
        # Split the metadata off into a separate log frame, then drop it from the data.
        log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy()
        df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1)
        df_list.append(df)
        log_list.append(log)
    log = pd.concat(log_list)
    log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last')
    result = pd.concat(df_list)
    result.drop_duplicates('Id', inplace=True)
    # db is whatever the loop left it as (the last database); all four share the same schema.
    cols = [i.strip() for i in create_columns(select_database(db)[0][table])]
    result = result[cols]
    print('Creating string columns for {}'.format(table_name))
    for col in modify_str_cols(select_database(db)[0][table]):
        create_string(result, col)
    print('Creating integer columns for {}'.format(table_name))
    for col in modify_int_cols(select_database(db)[0][table]):
        create_int(result, col)
    log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes)
    print('Inserting {} data into PostgreSQL'.format(table_name))
    result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table]))
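For reference, create_string and create_int do roughly this (a simplified sketch of the real helpers, which are not shown here):

def create_string(df, col):
    # Cast to string and strip the trailing '.0' that appears when pandas
    # reads an integer-like column as float.
    df[col] = df[col].astype(str).str.replace(r'\.0$', '', regex=True)

def create_int(df, col):
    # Fill blanks with 0 and force the column to a real integer dtype.
    df[col] = df[col].fillna(0).astype(int)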

How can I work COPY TO and COPY FROM into this to speed it up? Should I just write the .csv files and then loop over those, or can I COPY from memory into my Postgres database?

1 Answer


psycopg2 offers a number of specific COPY-related APIs. If you want to use CSV, you have to use copy_expert (which lets you specify a full COPY statement).

Normally when I have done this, I have used copy_expert() and a file-like object which iterates through a file on disk. That seems to work reasonably well.
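For example, something along these lines (a sketch; the connection string, table name, and file name are placeholders):

import psycopg2

conn = psycopg2.connect('dbname=target user=me')  # placeholder connection
with open('data.csv') as f, conn.cursor() as cur:
    # copy_expert reads from whatever file-like object you hand it
    cur.copy_expert("COPY my_table FROM STDIN WITH CSV HEADER", f)
conn.commit()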

That being said, in your case I think copy_to and copy_from are better matches, because it is simply a Postgres-to-Postgres transfer here. Note that these use PostgreSQL's COPY output/input text format, not CSV (if you want to use CSV, you have to use copy_expert).
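For instance (a sketch; the connections and table names are placeholders):

import io
import psycopg2

src_conn = psycopg2.connect('dbname=source')  # placeholder connections
dst_conn = psycopg2.connect('dbname=target')

buf = io.StringIO()
with src_conn.cursor() as cur:
    cur.copy_to(buf, 'source_table')    # dumps the table in PostgreSQL's text format
buf.seek(0)
with dst_conn.cursor() as cur:
    cur.copy_from(buf, 'target_table')  # loads that same text format
dst_conn.commit()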

Before you decide how to do things, note the following:

copy_to copies to a file-like object (such as StringIO), and copy_from/copy_expert load from a file-like object. If you want to use a pandas DataFrame, you are going to have to think about this a little: either create a file-like object yourself, or use CSV along with StringIO and copy_expert to generate an in-memory CSV and load that.
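A minimal sketch of that second route, assuming the target table already exists and its columns match the DataFrame (copy_dataframe is just a name for illustration):

import io
import psycopg2

def copy_dataframe(df, table, conn):
    buf = io.StringIO()
    # No index and no header: COPY should only see the data rows.
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)
    cols = ', '.join('"{}"'.format(c) for c in df.columns)
    with conn.cursor() as cur:
        cur.copy_expert('COPY {} ({}) FROM STDIN WITH CSV'.format(table, cols), buf)
    conn.commit()

You would then call something like copy_dataframe(result, 'my_table', psycopg2.connect(...)) in place of the final result.to_sql(...).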


Comments

To clarify, the databases I am consolidating are from MS SQL Server. Not sure if that makes a difference. If I can avoid writing a file then that would be best. Can the COPY command accept a pandas DataFrame? I am adding a column to the data and specifying the datatypes/filling blanks. Or should I just write a temp file first?
Yes, you need to provide a "file-like object", but that object can serve the data from memory.
So I can keep everything the same up until this line: result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table])) That is my final data in memory, ready to be saved to the database. Can you provide example code for writing it to my table? Is it copy_to? How can I make sure my headers are not included? engine.copy_to('table_name', result)
Take a look at the docs for psycopg2 and copy_to/copy_from: initd.org/psycopg/docs/cursor.html
Yes, I did. I am just not clear. None of the examples really show it being used with a pandas dataframe in-memory.
