
I have a dask dataframe that contains some data after some transformations. I want to write that data back to a MySQL table. I have implemented a function that takes a dataframe and a db url and writes the dataframe back to the database. Because I need to make some final edits to the data of the dataframe, I use pandas df.to_dict('records') to handle the write.

The function looks like this:

def store_partition_to_db(df, db_url):
    from sqlalchemy import create_engine
    from mymodels import DBTableBaseModel

    records_dict = df.to_dict('records')
    records_to_db = []
    for record in records_dict:
        transformed_record = transform_record_some_how(record)  # transformed_record is a dictionary
        records_to_db.append(transformed_record)

    engine = create_engine(db_url)
    engine.execute(DBTableBaseModel.__table__.insert(), records_to_db)

    return records_to_db

In my dask code:

from functools import partial
partial_store_partition_to_db = partial(store_partition_to_db, db_url=url)
dask_dataframe = dask_dataframe_data.map_partitions(partial_store_partition_to_db)
all_records = dask_dataframe.compute()

print(len([record_dict for record_list in all_records for record_dict in record_list]))  # Gives me 7700

But when I go to the respective table in MySQL I get 7702 rows, and the two extra rows have the same value, 1, in every column. When I try to filter all_records for that value, no dictionary is returned. Has anyone met this situation before? How do you handle db writes from partitions with dask?

PS: I use LocalCluster and dask.distributed


1 Answer


The problem was that I didn't provide meta information in the map_partitions call. Because of that, dask ran the function on a dummy dataframe filled with placeholder (foo) values to infer the output type, and those placeholder rows were in turn written to the db.
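
A minimal sketch of that fix, reusing the names from the question (store_partition_to_db, dask_dataframe_data, url); the exact meta spec below is an assumption, but the idea is that once meta describes the output, dask can skip the inference pass that calls the function on a dummy partition:

    from functools import partial

    partial_store_partition_to_db = partial(store_partition_to_db, db_url=url)

    # With meta given explicitly, dask no longer calls the function on a
    # tiny dummy dataframe (filled with placeholder values such as 1 and
    # 'foo') to infer the output type, so no fake rows reach the database.
    dask_dataframe = dask_dataframe_data.map_partitions(
        partial_store_partition_to_db,
        meta=(None, 'object'),  # each partition yields an object (a list of dicts)
    )
    all_records = dask_dataframe.compute()

An alternative that sidesteps meta inference entirely is to convert the dataframe to delayed objects and run the write on each partition explicitly; a sketch under the same assumptions:

    import dask

    # to_delayed() yields one lazy pandas DataFrame per partition.
    parts = dask_dataframe_data.to_delayed()
    tasks = [dask.delayed(store_partition_to_db)(part, db_url=url) for part in parts]
    all_records = dask.compute(*tasks)  # tuple of per-partition record lists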
