
I need to write a Spark DataFrame to a Postgres DB. I have used the following:

df.write \
    .option("numPartitions", partitions) \
    .option("batchsize", batchsize) \
    .jdbc(url=url, table="table_name", mode="append", properties=properties)

This works fine; however, I want to compare the performance with the COPY command.

I tried the following:

output = io.StringIO()

csv_new.write \
    .format("csv") \
    .option("header", "true") \
    .save(path=output)

output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_pivot_table', null="")  # using psycopg2
con_bb.commit()

This does not seem to work; it fails with the error 'type' object is not iterable.

The same approach worked well with a Pandas DataFrame:

output = io.StringIO()
df.to_csv(path_or_buf=output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
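# copy_from reads tab-separated columns by default, matching sep='\t' in to_csv above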
cursor.copy_from(output, 'tb_ts_devicedatacollection_aggregate', null="")  
con_bb.commit()

Any leads on how to implement the Pandas equivalent in PySpark? P.S.: It's performance-critical, hence converting the Spark DataFrame to a Pandas DataFrame is not an option. Any help would be greatly appreciated.

3 Answers


What currently works very well for me (100-200 GB of CSV files with around 1,000,000,000 rows) is using psycopg2 together with multiprocessing.

Available cores: 200

First I export the Spark DataFrame into multiple files, the number of which is a multiple of the available cores:

filepath = "/base_path/psql_multiprocessing_data"

# even faster using binary format, but works fine with CSV
df.repartition(400) \
    .write \
    .mode("overwrite") \
    .format("csv") \
    .save(filepath, header='false')

Then I iterate over all files in the folder in parallel:

import glob
import psycopg2   
from multiprocessing import Pool, cpu_count

file_path_list=sorted(glob.glob("/base_path/psql_multiprocessing_data/*.csv"))

def psql_copy_load(fileName):
    con = psycopg2.connect(database="my_db",user="my_user",password="my_password",host="my_host",port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # next(f)  # in case to skip the header row.
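        # copy_from streams the open file to Postgres via COPY ... FROM STDIN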
        cursor.copy_from(f, 'my_schema.my_table', sep=",")
    
    con.commit()
    con.close()
    return (fileName)
    

with Pool(cpu_count()) as p:
    p.map(psql_copy_load, file_path_list)

print("parallelism (cores): ", cpu_count())
print("files processed: ", len(file_path_list))

I did not try exporting the data in binary format any further because it got complicated with the correct headers and data types, and I was happy with the run time of around 25-30 minutes (with 6 columns).


To my knowledge, Spark does not provide a way to use the COPY command internally.

If you want to load Postgres from HDFS, you might be interested in Sqoop. It allows you to export a CSV stored on HDFS, and it is able to issue multiple COPY statements. In my experiments, adding 4 mappers sped up the ingestion by a factor of 2 versus a single mapper. This should be much faster than going through Spark's JDBC writer.

Here are the steps:

  1. df.write.csv("my/hdfs/folder")
  2. sqoop export --connect "jdbc:postgresql://postgres_host/postgres_db" --username --password-file file:///home/$USER/.password --export-dir my_csv_table --table -m 4 --direct --lines-terminated-by '\n' --fields-terminated-by ',' -- --schema


You can try the Postgres extension aws_s3: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.S3Import.html

bucket = "my-bucket"
key = "my/s3/key.csv.gz"

df.write.mode("overwrite").csv(f"s3a://{bucket}/{key}", compression="gzip")

Then, in Postgres:

aws_s3.table_import_from_s3 (
   table_name text, 
   column_list text, 
   options text, 
   s3_info aws_commons._s3_uri_1
) 
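
For reference, here is a minimal sketch of how that import call might be issued from Python with psycopg2, based on the signature above. The connection details, target table, and region are placeholders, and whether the gzip-compressed object is decompressed transparently depends on the aws_s3 extension version. Also note that Spark writes part files under the given prefix, so the key passed to aws_commons.create_s3_uri needs to point at an actual object in the bucket.

import psycopg2

# all connection details, the table name, and the region below are placeholders
con = psycopg2.connect(database="my_db", user="my_user", password="my_password",
                       host="my_host", port="5432")
cur = con.cursor()
cur.execute("""
    SELECT aws_s3.table_import_from_s3(
        'my_schema.my_table',                  -- target table
        '',                                    -- empty column list imports all columns
        '(format csv)',                        -- options passed to COPY
        aws_commons.create_s3_uri(%s, %s, %s)  -- bucket, object key, AWS region
    )
""", ("my-bucket", "my/s3/key.csv.gz", "us-east-1"))
con.commit()
con.close()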
