
I have a very big CSV file (tens of gigabytes) containing web logs with the following columns: user_id, timestamp, category_clicked. I have to build a scorer to identify which categories users like and dislike. Note that I have more than 10 million users.

I first cut it into chunks and store them in an HDFStore named input.h5, then I use a groupby on user_id, following Jeff's way.

Here is my data: about 200 million rows and 10 million unique user_ids.

user id | timestamp | category_clicked
20140512081646222000004-927168801|20140722|7
20140512081714121000004-383009763|20140727|4
201405011348508050000041009490586|20140728|1
20140512081646222000004-927168801|20140724|1
20140501135024818000004-1623130763|20140728|3

Here is my pd.show_versions():

INSTALLED VERSIONS
------------------
commit: None
python: 2.7.6.final.0
python-bits: 64
OS: Windows
OS-release: 8
machine: AMD64
processor: AMD64 Family 21 Model 2 Stepping 0, AuthenticAMD
byteorder: little
LC_ALL: None
LANG: fr_FR

pandas: 0.13.1
Cython: 0.20.1
numpy: 1.8.1
scipy: 0.13.3
statsmodels: 0.5.0
IPython: 2.0.0
sphinx: 1.2.2
patsy: 0.2.1
scikits.timeseries: None
dateutil: 2.2
pytz: 2013.9
bottleneck: None
tables: 3.1.1
numexpr: 2.3.1
matplotlib: 1.3.1
openpyxl: None
xlrd: 0.9.3
xlwt: 0.7.5
xlsxwriter: None
sqlalchemy: 0.9.4
lxml: None
bs4: None
html5lib: None
bq: None
apiclient: None

Here is what I want as an output:

For each user_id, a list like [0.1,0.45,0.89,1.45,5.12,0.,0.,0.45,0.12,2.36,7.8] representing the user's score for each category, plus a global score. I can't tell you more about the score, but it needs ALL of a user's timestamps and clicked categories to be calculated; you can't compute partial scores and sum them up later.
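
Just to make the shape concrete (the scoring logic is private, so this is only an illustrative placeholder), the treatment behaves like a function of this form:

def treatment(user_rows):
    # user_rows: ALL rows (timestamps and clicked categories) for ONE user_id
    # the real logic is private; this placeholder only shows the input/output shape
    user_cat_score = [0.0] * 11   # one score per category (global score omitted here)
    # ... compute the scores from user_rows['timestamp'] and user_rows['category_clicked'] ...
    return user_cat_score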

Here is my code:

from pandas import read_csv, get_store, Series

clean_input_reader = read_csv(work_path + '/input/input.csv', chunksize=500000)
with get_store(work_path + '/input/input.h5') as store:
    # write the CSV chunk by chunk into a single appendable table
    for chunk in clean_input_reader:
        store.append('clean_input', chunk,
                     data_columns=['user_id','timestamp','category_clicked'],
                     min_itemsize=15)

    # then pull each user's rows one at a time
    groups = store.select_column('clean_input','user_id').unique()
    for user in groups:
        group_user = store.select('clean_input', where=['user_id==%s' % user])
        # <<<<TREATMENT returns a list user_cat_score>>>>
        store.append(user, Series(user_cat_score))

My question is the following: the line group_user = store.select('clean_input', where=['user_id==%s' % user]) looks far too expensive, because I have an enormous number of groups, and I suspect store.select does a lot of redundant scanning and sorting if I call it 10 million times.

To give you an estimate: it takes me 250 seconds to process 1000 keys with this technique, versus only 1 second for a usual groupby on a DataFrame loaded fully in memory with read_csv and no chunking.
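
For reference, the full in-memory baseline I am comparing against is roughly this (with treatment standing for the private scoring routine above):

# full in-memory baseline -- fast, but the real dataset does not fit in RAM
df = read_csv(work_path + '/input/input.csv')
scores = df.groupby('user_id').apply(treatment)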

**********UPDATE***********

After applying Jeff's hashing method, I can process 1000 keys in 1 second (the same timing as the full in-memory method), with drastically reduced RAM usage. The only extra time cost compared to before is, of course, the time spent chunking, saving the 100 hash groups, and retrieving the real groups from the hashed ones in the store. But this operation takes no more than a few minutes.

  • Why are you not just reading in all the data and doing a regular groupby? The technique I showed only makes sense with a relatively low group density and lots of data. Your data in memory is probably only a few gigs (the size of the CSV is not the size in memory). You should just try this first. If you can't do that, then you should simply preprocess and output the groupby results via chunking and combine after. Commented Aug 23, 2014 at 10:12
  • My data doesn't fit in RAM, I already tried. What do you mean by "if you can't do that, then you should simply preprocess and output the groupby results via chunking and combine after"? Commented Aug 23, 2014 at 10:14
  • Can you please give me more detail? I just can't understand what you have in mind. What I know is that I can't groupby on all my data, and if I groupby chunk by chunk, I don't know how to combine lines with the same keys later. Commented Aug 23, 2014 at 13:29
  • Show an example of what you want with a small copy-pastable example of input and output. Show pd.show_versions() as well. Commented Aug 23, 2014 at 13:46
  • I am really sorry, but I can't paste you the treatment because it's not my property. I can only tell you that I have to group by the user_ids, then give each category a particular weight which depends on the clicks on that category and their timestamps, and that I really need to know all the clicks of a user in a given category to calculate the score. That is why I can't calculate partial scores on chunked groupbys and then sum them up. I really need to consider the full information available for a given user_id in a given category to perform the treatment. Commented Aug 23, 2014 at 16:28

1 Answer


Here's a solution for scaling this problem arbitrarily. This is in effect a high-density version of this question here.

Define a function that hashes a particular group value to a smaller number of groups. I would design this so that it divides your dataset into pieces that are manageable in memory.

def sub_group_hash(x):
    # x is a dataframe with the 'user id' field given above
    # return the last 2 characters of the input
    # if these are number like, then you will be sub-grouping into 100 sub-groups
    return x['user id'].str[-2:]

Using the data provided above, this creates a grouped frame on the input data like so:

In [199]: [ (grp, grouped) for grp, grouped in df.groupby(sub_group_hash(df)) ][0][1]
Out[199]: 
                             user id  timestamp  category
0  20140512081646222000004-927168801   20140722         7
3  20140512081646222000004-927168801   20140724         1

with grp as the name of the group and grouped as the resulting frame.

from pandas import read_csv, get_store

# read in the input in a chunked way
clean_input_reader = read_csv('input.csv', chunksize=500000)
with get_store('output.h5') as store:
    for chunk in clean_input_reader:

        # create a grouper for each chunk using the sub_group_hash
        # (pass the hashed column as a Series so it lines up with the chunk's rows)
        g = chunk.groupby(sub_group_hash(chunk))

        # append each of the sub-groups to a separate group in the resulting hdf file
        # this will be a loop around the sub_groups (100 max in this case)
        for grp, grouped in g:

            store.append('group_%s' % grp, grouped,
                         data_columns=['user_id','timestamp','category_clicked'],
                         min_itemsize=15)

Now you have an HDF file with 100 sub-groups (potentially fewer if not all groups were represented), each of which contains all of the data necessary for performing your operation.

with get_store('output.h5') as store:

    # all of the groups are now the keys of the store
    for grp in store.keys():

        # this is a complete group that will fit in memory
        grouped = store.select(grp)

        # perform the operation on grouped and write the new output
        grouped.groupby(......).apply(your_cool_function)

So this will reduce the problem by a factor of 100 in this case. If that is not sufficient, simply widen sub_group_hash (use more trailing characters) to make more groups.

You should strive for a smaller number, as HDF5 works better that way (e.g. don't make 10M sub_groups; that defeats the purpose. 100, 1000, even 10k is OK). But I think 100 should probably work for you, unless you have a very skewed group density (e.g. massive numbers of rows in a single group and very few in the others).
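
For example (just a sketch, not part of the original code), widening the hash to the last three characters gives up to 1000 sub-groups:

def sub_group_hash_3(x):
    # same idea as sub_group_hash, but keep the last 3 characters of 'user id',
    # sub-grouping into at most 1000 groups instead of 100
    return x['user id'].str[-3:]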

Note that this problem then scales easily; you could store the sub_groups in separate files if you want, and/or work on them separately (in parallel) if necessary.
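
Here is a rough sketch of the parallel variant (an illustration only; it assumes the per-user column is named user_id as in your data_columns, and that your_cool_function is defined at module level so it can be pickled):

from multiprocessing import Pool
from pandas import get_store

def process_sub_group(key):
    # each worker opens the store read-only and loads one complete sub-group
    with get_store('output.h5', mode='r') as store:
        grouped = store.select(key)
    # score every user inside this sub-group
    return grouped.groupby('user_id').apply(your_cool_function)

if __name__ == '__main__':
    with get_store('output.h5', mode='r') as store:
        keys = store.keys()
    pool = Pool(processes=4)
    results = pool.map(process_sub_group, keys)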

This should make your solution's running time approximately O(number_of_sub_groups).


6 Comments

Thank you a great deal for your help. Your answer is excellent. Coming from a mathematical background, I really lack these ideas of hashing and so on... Feel free to ask me any math question; if I can't answer, I will ask friends.
@user3478208 chunk.groupby(sub_groupby_chunk) is correct (though the other way you specified will also work); this is a grouper function.
But you have not introduced the variable sub_group_chunk, and if I try chunk.groupby(sub_group_hash) I get an error of the form: long object has no attribute __getitem__, because it tries to apply the function to the index.
That was a typo, fixed. You can/should pass a function directly (or, more usually, an index/level to the grouper) rather than evaluating it yourself (practically it's the same, just more verbose, as you have to pass the object to your function as well). As a follow-up, can you update your question with the perf of this algo (vs your original method), possibly with a little more color on the data set?
oh, I meant give some characteristics of the dataset, e.g. 10M rows, how much memory does that take up, something like that. You must be doing some pretty heavy calc; 1000 keys should take way less than 1 sec, but YMMV.