How to Work with BIG Datasets on 16G RAM (+Dask)

This article is a repost. Original link:

How to Work with BIG Datasets on 16G RAM (+Dask) | Kaggle

This article has been abridged.

TIP 1 - Deleting unused variables and gc.collect()

If you are done with a dataframe (or any other variable), get in the habit of deleting it.

import gc

# temp stands for any dataframe (or other variable) you have finished with
# delete it when no longer needed
del temp
# collect residual garbage
gc.collect()

TIP 2 - Presetting the datatypes

If you import data from CSV, pandas will do its best to guess the datatypes. So if you know in advance that your numbers are integers and don't exceed certain values, set the datatypes to the minimum requirements before importing.

import pandas as pd

dtypes = {
        'ip'            : 'uint32',
        'app'           : 'uint16',
        'device'        : 'uint16',
        'os'            : 'uint16',
        'channel'       : 'uint16',
        'is_attributed' : 'uint8',
        }

train = pd.read_csv('../input/train_sample.csv', dtype=dtypes)

# check datatypes:
train.info()
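To see what the preset dtypes buy you, the sketch below reads the same small CSV twice, once letting pandas guess (it picks int64 for every integer column) and once with the dtypes above, and compares the memory used by the columns. The data is made up for the demonstration; only the column names follow the TalkingData file.

```python
import io

import pandas as pd

# Hypothetical stand-in for train.csv: 10,000 rows of made-up integers
csv_text = "ip,app,device,os,channel,is_attributed\n" + "\n".join(
    f"{i},{i % 500},{i % 3},{i % 40},{i % 200},{i % 2}" for i in range(10000)
)

dtypes = {
    'ip': 'uint32',
    'app': 'uint16',
    'device': 'uint16',
    'os': 'uint16',
    'channel': 'uint16',
    'is_attributed': 'uint8',
}

# pandas guesses int64 (8 bytes) for every integer column
default_df = pd.read_csv(io.StringIO(csv_text))
# preset dtypes: 4 + 2 + 2 + 2 + 2 + 1 = 13 bytes per row
small_df = pd.read_csv(io.StringIO(csv_text), dtype=dtypes)

default_bytes = default_df.memory_usage(index=False).sum()
small_bytes = small_df.memory_usage(index=False).sum()
print(f"guessed dtypes: {default_bytes} bytes, preset dtypes: {small_bytes} bytes")
```

The values themselves are unchanged; only the storage width differs, which is why the dtypes must be chosen so that no value exceeds the type's range.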

TIP 3 - Importing selected rows of a file (including generating your own subsamples)

a) Select the number of rows to import (the first nrows rows)

You can use the parameter nrows to specify the number of rows to import (the header line is not counted):

train = pd.read_csv('../input/train.csv', nrows=10000, dtype=dtypes)

b) Simple row skip (with or without headings)

You can also specify a number of rows to skip (skiprows) if, for example, you want 1 million rows after the first 5 million.

However, an integer skiprows also skips the first line, which holds the headers. Instead, you can pass in a range of rows to skip that does not include the first row (index [0]).

# plain skipping loses the heading info. It's OK for files that don't have headings,
# or dataframes you'll be linking together, or where you make your own custom headings...
train = pd.read_csv('../input/train.csv', skiprows=5000000, nrows=1000000, header=None, dtype=dtypes)


# but if you want to import the headings from the original file:
# skip lines 1 to 4,999,999, but keep line 0 as the heading
train = pd.read_csv('../input/train.csv', skiprows=range(1, 5000000), nrows=1000000, dtype=dtypes)
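The difference between nrows, an integer skiprows and a range skiprows is easiest to see on a tiny file. In this sketch (made-up data), line 0 is the header and data line k holds ip = k, so the ip values show exactly which lines were read.

```python
import io

import pandas as pd

# Hypothetical file: header on line 0, data lines 1..10 with ip equal to the line number
csv_text = "ip,is_attributed\n" + "\n".join(f"{i},{i % 2}" for i in range(1, 11))

# a) nrows counts data rows only; the header is read in addition
first3 = pd.read_csv(io.StringIO(csv_text), nrows=3)

# b) an integer skiprows also swallows the header line (lines 0-4 are skipped)...
no_header = pd.read_csv(io.StringIO(csv_text), skiprows=5, nrows=2, header=None)

# ...while a range starting at 1 keeps line 0 as the header (lines 1-4 are skipped)
with_header = pd.read_csv(io.StringIO(csv_text), skiprows=range(1, 5), nrows=2)

print(first3, no_header, with_header, sep="\n\n")
```

Both skipping variants start at line 5; only the second one keeps the column names.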

c) Picking which rows to skip (Make a list of what you DON'T want)

This is how you can do your own random sampling.

Since 'skiprows' can take a list of rows you want to skip, you can make a list of randomly chosen rows to skip, and everything not on that list gets imported. I.e. you can sample your data any way you like!

Recall how many rows the train set in TalkingData has:

Number of lines in "train.csv" is: 184903891

Let's say you want to pull a random sample of 1 million lines out of the total dataset. That means you want a list of lines - 1 - 1000000 random line numbers to skip, ranging from 1 to lines - 1 (line 0 is the heading, which we keep).

Note: generating such a long list also takes a lot of memory and some time. Be patient, and make sure to use del and gc.collect() when done!

import numpy as np

# total number of lines in train.csv, heading included (see the count above)
lines = 184903891

# generate the list of lines to skip: every line except the heading (line 0)
# and the 1 million lines we want to keep
skiplines = np.random.choice(np.arange(1, lines), size=lines - 1 - 1000000, replace=False)

# sort the list
skiplines = np.sort(skiplines)

# check our list
print('lines to skip:', len(skiplines))
print('remaining lines in sample:', lines - len(skiplines), '(remember that it includes the heading!)')

################### SANITY CHECK ###################
# find lines that weren't skipped by checking the difference between consecutive skipped lines
# (a gap larger than 1 means the lines in between will be imported)
# how many out of the first 100000 will be imported into the dataframe?
diff = skiplines[1:100000] - skiplines[2:100001]
remain = np.count_nonzero(diff != -1)

print('Ratio of lines from first 100000 lines:', '{0:.5f}'.format(remain / 100000))
print('Ratio imported from all lines:', '{0:.5f}'.format((lines - len(skiplines)) / lines))

lines to skip: 183903890

remaining lines in sample: 1000001 (remember that it includes the heading!)

Ratio of lines from first 100000 lines: 0.00560

Ratio imported from all lines: 0.00541

Now let's import the randomly selected 1 million rows:

train = pd.read_csv('../input/train.csv', skiprows=skiplines, dtype=dtypes)

del skiplines

gc.collect()

In my previous notebook (TalkingData EDA plus time patterns | Kaggle) we found that the data is ordered by click time. Therefore, if our random sampling went according to plan, the resulting set should roughly span the full time period and mimic the click pattern.

Now you can analyze your own subsample and run models on it.
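A variation that avoids building a 184-million-entry skip list: pick the line numbers to keep instead, and pass skiprows a callable, which pandas calls with each line index and skips the line when it returns True. The helper name sample_csv is hypothetical, and this is a sketch rather than the original author's method; a Python function is called once per line, so expect it to be slower than a precomputed list, in exchange for far less memory.

```python
import io

import numpy as np
import pandas as pd


def sample_csv(source, total_lines, n_sample, seed=0, **read_csv_kwargs):
    """Read a uniform random sample of n_sample data rows from a CSV (hypothetical helper).

    total_lines counts the heading line too, like the 184903891 figure above.
    """
    rng = np.random.default_rng(seed)
    # choose distinct data-line numbers in 1..total_lines-1
    keep = set((rng.choice(total_lines - 1, size=n_sample, replace=False) + 1).tolist())
    keep.add(0)  # always keep the heading line
    # skiprows callable: True means "skip this line"
    return pd.read_csv(source, skiprows=lambda i: i not in keep, **read_csv_kwargs)


# Usage on a made-up file with 100 data lines where ip equals the line number
csv_text = "ip,is_attributed\n" + "\n".join(f"{i},{i % 2}" for i in range(1, 101))
sample = sample_csv(io.StringIO(csv_text), total_lines=101, n_sample=10, seed=42)
print(sample)
```

On the real file this would be something like sample_csv('../input/train.csv', lines, 1000000, dtype=dtypes).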

TIP 4 - Importing in batches and processing each individually

Below I import the file in chunks of one million rows, keep only the rows that have 'is_attributed'==1 (i.e. the app was downloaded), and then merge these results into a common dataframe for further inspection.

import pandas as pd

# we are going to work with chunks of size 1 million rows
chunksize = 10 ** 6

# in each chunk, keep only the rows that have 'is_attributed'==1;
# collect the filtered pieces and concatenate them once at the end
# (concatenating inside the loop would copy the growing dataframe on every iteration)
pieces = []
for chunk in pd.read_csv('../input/train.csv', chunksize=chunksize, dtype=dtypes):
    pieces.append(chunk[chunk['is_attributed'] == 1])

df_converted = pd.concat(pieces, ignore_index=True)

Using an analogous method, you can explore specific IPs, apps, devices, and combinations of them.
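For combinations, one option is to put the wanted key combinations in a small dataframe and inner-merge each chunk against it, so only matching rows survive. This is a sketch: the helper filter_combinations and the sample values are hypothetical.

```python
import io

import pandas as pd


def filter_combinations(source, wanted, chunksize=10 ** 6, **read_csv_kwargs):
    """Stream a CSV and keep rows whose key columns match a row of `wanted` (hypothetical helper).

    The columns of `wanted` (e.g. app and device) are used as merge keys;
    an inner merge drops every row of a chunk that has no matching combination.
    """
    keys = list(wanted.columns)
    pieces = [
        chunk.merge(wanted, on=keys, how='inner')
        for chunk in pd.read_csv(source, chunksize=chunksize, **read_csv_kwargs)
    ]
    return pd.concat(pieces, ignore_index=True)


# Usage with made-up clicks: keep (app=3, device=1) and (app=5, device=2)
csv_text = "ip,app,device\n1,3,1\n2,3,2\n3,5,1\n4,3,1\n5,5,2\n6,7,1\n"
wanted = pd.DataFrame({'app': [3, 5], 'device': [1, 2]})
matches = filter_combinations(io.StringIO(csv_text), wanted, chunksize=2)
print(matches)
```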

TIP 5 - Importing just selected columns

If you want to analyze just some specific feature, you can import just the selected columns.

For example, let's say we want to analyze clicks by IP.

import pandas as pd

# wanted columns
columns = ['ip', 'click_time', 'is_attributed']
dtypes = {
        'ip'            : 'uint32',
        'is_attributed' : 'uint8',
        }

ips_df = pd.read_csv('../input/train.csv', usecols=columns, dtype=dtypes)
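Since click_time is not in the dtypes dictionary, pandas reads it as strings (object dtype). A sketch of one option is to let read_csv parse it as datetime64 via parse_dates, which is typically much more compact than one Python string per row. The rows below are made up; only the column names follow the TalkingData file.

```python
import io

import pandas as pd

# Hypothetical two-row file with the full set of TalkingData columns
csv_text = (
    "ip,app,device,os,channel,click_time,attributed_time,is_attributed\n"
    "1,3,1,13,100,2017-11-06 14:32:21,,0\n"
    "2,3,1,19,100,2017-11-06 14:33:34,,0\n"
)

columns = ['ip', 'click_time', 'is_attributed']
dtypes = {'ip': 'uint32', 'is_attributed': 'uint8'}

# only the three wanted columns are parsed; click_time becomes datetime64
ips_df = pd.read_csv(io.StringIO(csv_text), usecols=columns, dtype=dtypes,
                     parse_dates=['click_time'])
ips_df.info()
```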

TIP 6 - Creative data processing

The kernel cannot handle a groupby on the whole dataframe, but it can do it in sections. For example:

#processing part of the table is not a problem

ips_df[0:100][['ip', 'is_attributed']].groupby('ip', as_index=False).count()[:10]

So you can calculate counts in batches, merge them, and sum them up into total counts.

(It takes a bit of time, but it works.)

import numpy as np
import pandas as pd

size = 100000
all_rows = len(ips_df)
# round up, so the final partial batch is not dropped
num_parts = -(-all_rows // size)

# generate the first batch
ip_counts = ips_df[0:size][['ip', 'is_attributed']].groupby('ip', as_index=False).count()

# add remaining batches (slicing past the end simply returns the remaining rows)
for p in range(1, num_parts):
    start = p * size
    end = start + size
    group = ips_df[start:end][['ip', 'is_attributed']].groupby('ip', as_index=False).count()
    # outer merge keeps ips that appear in only one of the two tables
    ip_counts = ip_counts.merge(group, on='ip', how='outer')
    ip_counts.columns = ['ip', 'count1', 'count2']
    # ips missing from one side get NaN; nansum treats NaN as 0
    ip_counts['counts'] = np.nansum((ip_counts['count1'], ip_counts['count2']), axis=0)
    ip_counts.drop(columns=['count1', 'count2'], inplace=True)

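The original continues with several more examples; the central idea is to compute in batches and then find a way to combine the results afterwards.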
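As one more sketch of the batch-then-combine idea (a swapped-in variant, not the original author's code): compute value_counts per batch, stack the small partial results, and add them up with a groupby on the index. It counts rows per key, which matches the counting above as long as is_attributed has no missing values. The helper batched_counts and the demo data are hypothetical.

```python
import numpy as np
import pandas as pd


def batched_counts(df, key, size=100000):
    """Count rows per value of `key`, looking at `size` rows at a time (hypothetical helper).

    Each batch produces a small Series of partial counts; summing the partial
    counts per key gives the totals. Assumes df is not empty.
    """
    parts = [df[key].iloc[start:start + size].value_counts()
             for start in range(0, len(df), size)]
    return pd.concat(parts).groupby(level=0).sum().sort_index()


# Usage on made-up data, checked against a one-shot value_counts
rng = np.random.default_rng(0)
demo = pd.DataFrame({'ip': rng.integers(0, 50, size=12345),
                     'is_attributed': rng.integers(0, 2, size=12345)})
counts = batched_counts(demo, 'ip', size=1000)
expected = demo['ip'].value_counts().sort_index()
print(counts.head())
```

The partial counts hold at most one entry per distinct key per batch, so they are usually far smaller than the raw rows.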

TIP 7 - Using Dask

import dask
import dask.dataframe as dd

DASK

What is it?

  • A Python library for parallel computing that can work on a single machine or on a large cluster.

What does it do?

  • it parallelizes NumPy and pandas
  • makes it possible to work on larger-than-memory datasets
  • on a single machine, it uses its own task scheduler to get the most out of your machine (Kaggle kernels are multicore, so there is definitely room for improvement)
  • BASICALLY IT WILL MAKE SOME COMPUTATIONS FIT RAM, AND WILL DO IT FASTER

Its limitations?

  • it's still relatively early in development
  • it doesn't have all of pandas' options/functions/features, only the most common ones
  • many operations that require setting the index are still computationally expensive

Trivially parallelizable operations (fast):

  • Elementwise operations: df.x + df.y, df * df
  • Row-wise selections: df[df.x > 0]
  • Loc: df.loc[4.0:10.5]
  • Common aggregations: df.x.max(), df.max()
  • Is in: df[df.x.isin([1, 2, 3])]
  • Datetime/string accessors: df.timestamp.month

Cleverly parallelizable operations (fast):

  • groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()
  • groupby-apply on index: df.groupby(['idx', 'x']).apply(myfunc), where idx is the index level name
  • value_counts: df.x.value_counts()
  • Drop duplicates: df.x.drop_duplicates()
  • Join on index: dd.merge(df1, df2, left_index=True, right_index=True) or dd.merge(df1, df2, on=['idx', 'x']) where idx is the index name for both df1 and df2
  • Join with Pandas DataFrames: dd.merge(df1, df2, on='id')
  • Elementwise operations with different partitions / divisions: df1.x + df2.y
  • Datetime resampling: df.resample(...)
  • Rolling averages: df.rolling(...)
  • Pearson Correlations: df[['col1', 'col2']].corr()

Notes/observations:

  • To actually get the results of many of the above functions, you have to add .compute() at the end, e.g. for value_counts: df.x.value_counts().compute(). This hikes up RAM use a lot. I believe it's because .compute() materializes the result in memory as a regular pandas object, with all the accompanying overhead. (Please correct me if I'm wrong.)

  • I've been playing with dask for the past little while here on Kaggle Kernels, and while it can load the full data and do some nice filtering, many actual operations push RAM use to extremes and can even crash the system. For example, after loading the 'train' dataframe, just getting len(train) pushed RAM up to 9GB. So be careful: use gc.collect() often, along with the other techniques for making data smaller. So far I find dask most useful for filtering (selecting rows with specified features).