Description
Hi Folks,
I'm experimenting with a new shuffle algorithm for Dask dataframe. This is what backs distributed versions of join, set_index, groupby-apply, and anything else that requires moving a large number of rows around a distributed dataframe.
Things are coming along well, but I'm running into a performance challenge with pandas, and I would like to solicit feedback here before diving in more deeply. If this isn't the correct venue, please let me know and I'll move this elsewhere.
We've constructed a script (thanks @gjoseph92 for starting this) that creates a random dataframe and a column on which to split, then rearranges, serializes/deserializes, and concatenates the resulting shards a couple of times. This is representative of the operations we're trying to do, except that in the real workload the shards/groups arriving between a couple of these steps come from different machines, rather than being the same local shards.
import time
import random
import pickle
import numpy as np
import pandas as pd
# Parameters
n_groups = 10_000
n_cols = 1000
n_rows = 30_000
# Make input data
df = pd.DataFrame(np.random.random((n_rows, n_cols)))
df["partitions"] = (df[0] * n_groups).astype(int) # random values 0..10000
start = time.time()
_, groups = zip(*df.groupby("partitions")) # split into many small shards
groups = list(groups)
random.shuffle(groups) # rearrange those shards
groups = [pickle.dumps(group) for group in groups] # Simulate sending across the network
groups = [pickle.loads(group) for group in groups]
df = pd.concat(groups) # reassemble shards
_, groups = zip(*df.groupby("partitions")) # and resplit
stop = time.time()
import dask
print(dask.utils.format_bytes(df.memory_usage().sum() / (stop - start)), "/s")
With 10,000 groups I get around 40 MB/s bandwidth.
With 1,000 groups I get around 230 MB/s bandwidth.
With 200 or fewer groups I get around 500 MB/s bandwidth.
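For anyone who wants to reproduce those numbers in one go, here is a minimal sketch (not part of the original measurement) that wraps the script above in a function and sweeps n_groups. The bench helper and its default arguments are just my naming, chosen to mirror the parameters above.

import time
import random
import pickle

import numpy as np
import pandas as pd
from dask.utils import format_bytes


def bench(n_groups, n_rows=30_000, n_cols=1000):
    # Same steps as the script above: split, shuffle, pickle round-trip,
    # concat, and re-split; returns bytes processed per second.
    df = pd.DataFrame(np.random.random((n_rows, n_cols)))
    df["partitions"] = (df[0] * n_groups).astype(int)

    start = time.time()
    _, groups = zip(*df.groupby("partitions"))
    groups = list(groups)
    random.shuffle(groups)
    groups = [pickle.loads(pickle.dumps(group)) for group in groups]
    df = pd.concat(groups)
    _, groups = zip(*df.groupby("partitions"))
    stop = time.time()

    return df.memory_usage().sum() / (stop - start)


for n_groups in [200, 1_000, 10_000]:
    print(n_groups, "groups:", format_bytes(bench(n_groups)), "/s")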
Obviously, one answer here is "use fewer groups; pandas isn't designed to operate efficiently on groups of only a few rows". That's fair, and we're trying to design for that, but there is always pressure to shrink things down, so I would like to explore whether there is anything we can do on the pandas side to make it more tolerant of many small groups.
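One small step in that direction (a rough sketch, not something we've settled on) is to time each stage of the script separately and see which part degrades worst as the number of groups grows. The stage names and the timings dict below are purely illustrative.

import time
import random
import pickle

import numpy as np
import pandas as pd

n_groups, n_rows, n_cols = 10_000, 30_000, 1000
df = pd.DataFrame(np.random.random((n_rows, n_cols)))
df["partitions"] = (df[0] * n_groups).astype(int)

timings = {}

t = time.time()
_, groups = zip(*df.groupby("partitions"))  # split into many small shards
groups = list(groups)
timings["split"] = time.time() - t

random.shuffle(groups)  # rearrange those shards

t = time.time()
groups = [pickle.loads(pickle.dumps(group)) for group in groups]  # serialize round-trip
timings["pickle"] = time.time() - t

t = time.time()
df2 = pd.concat(groups)  # reassemble shards
timings["concat"] = time.time() - t

t = time.time()
_, groups = zip(*df2.groupby("partitions"))  # and re-split
timings["re-split"] = time.time() - t

for stage, seconds in timings.items():
    print(f"{stage}: {seconds:.2f} s")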