
Dask shuffle performance help #43155

Open
@mrocklin

Description

Hi Folks,

I'm experimenting with a new shuffle algorithm for Dask dataframe. This is what backs distributed versions of join, set_index, groupby-apply, or anything that requires the large movement of rows around a distributed dataframe.

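For context, here is a toy example of the kind of Dask dataframe operation that goes through this shuffle path (illustrative only: the column name, sizes, and partition count are made up, and the shuffle itself only runs at compute time):

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Small random frame with an integer key column to shuffle on
pdf = pd.DataFrame(np.random.random((10_000, 10)))
pdf["key"] = (pdf[0] * 100).astype(int)
ddf = dd.from_pandas(pdf, npartitions=10)

# Each of these needs to move rows between partitions, i.e. a shuffle
shuffled = ddf.set_index("key")          # distributed set_index
# dd.merge(ddf, ddf, on="key")           # distributed join
# ddf.groupby("key").apply(len)          # groupby-apply
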
Things are coming along well, but I'm running into a performance challenge with pandas and I would like to solicit feedback here before diving more deeply. If this isn't the correct venue please let me know and I'll shift this elsewhere.

We've constructed a script (thanks @gjoseph92 for starting this) that creates a random dataframe with a column to split on, then splits, rearranges, serializes/deserializes, and concatenates the shards a couple of times. This is representative of the operations we're trying to do, except that in a real shuffle the shards/groups in between a couple of those steps would be arriving from different machines, rather than being the same shards.

import time
import random
import pickle

import numpy as np
import pandas as pd
    

# Parameters
n_groups = 10_000
n_cols = 1000
n_rows = 30_000
    
# Make input data
df = pd.DataFrame(np.random.random((n_rows, n_cols)))
df["partitions"] = (df[0] * n_groups).astype(int)  # integer group labels in [0, n_groups)

start = time.time()
_, groups = zip(*df.groupby("partitions"))  # split into many small shards

groups = list(groups)
random.shuffle(groups)  # rearrange those shards

groups = [pickle.dumps(group) for group in groups]  # Simulate sending across the network
groups = [pickle.loads(group) for group in groups]

df = pd.concat(groups)  # reassemble shards
_, groups = zip(*df.groupby("partitions"))  # and resplit


stop = time.time()

import dask
print(dask.utils.format_bytes(df.memory_usage().sum() / (stop - start)), "/s")

With 10,000 groups I get around 40 MB/s bandwidth.

With 1,000 groups I get around 230 MB/s bandwidth.

With 200 or fewer groups I get around 500 MB/s bandwidth.

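To see which stage dominates as the group count grows, here is a per-stage timing breakdown of the same pipeline (a sketch that reuses df, pd, and the "partitions" column from the script above):

import time
import random
import pickle

t0 = time.time()
_, groups = zip(*df.groupby("partitions"))     # split into shards
t1 = time.time()

groups = list(groups)
random.shuffle(groups)                          # rearrange
t2 = time.time()

blobs = [pickle.dumps(g) for g in groups]       # serialize
t3 = time.time()

groups = [pickle.loads(b) for b in blobs]       # deserialize
t4 = time.time()

out = pd.concat(groups)                         # reassemble
t5 = time.time()

_, groups = zip(*out.groupby("partitions"))     # resplit
t6 = time.time()

for label, dt in [("split", t1 - t0), ("shuffle", t2 - t1),
                  ("serialize", t3 - t2), ("deserialize", t4 - t3),
                  ("concat", t5 - t4), ("resplit", t6 - t5)]:
    print(f"{label:12s} {dt:.3f} s")
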
Obviously, one answer here is "use fewer groups; pandas isn't designed to operate efficiently on dataframes with only a few rows each". That's fair, and we're trying to design for that, but there is always pressure to shrink things down, so I would like to explore whether there is anything we can do on the pandas side to add some tolerance here.

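One way to make the per-shard overhead concrete, independent of the groupby itself, is to pickle-roundtrip and concat the same data split into ~3-row shards versus ~150-row shards (mirroring the 10,000- and 200-group cases above). This is a rough sketch; the helper name and shard sizes are mine:

import time
import pickle
import numpy as np
import pandas as pd

def pickle_concat(frames):
    # pickle/unpickle every shard and reassemble, as in the shuffle script above
    start = time.time()
    blobs = [pickle.dumps(f) for f in frames]
    frames = [pickle.loads(b) for b in blobs]
    pd.concat(frames)
    return time.time() - start

df = pd.DataFrame(np.random.random((30_000, 1000)))

# Same total data, different shard sizes
few_large = [df.iloc[i:i + 150] for i in range(0, len(df), 150)]   # ~200 shards
many_small = [df.iloc[i:i + 3] for i in range(0, len(df), 3)]      # ~10,000 shards

print("200 shards:   ", pickle_concat(few_large), "s")
print("10,000 shards:", pickle_concat(many_small), "s")
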
Labels: Compat, Enhancement, Performance
