
POC for New GroupBy Dispatching Module #20485

Closed
wants to merge 1 commit into from
30 changes: 5 additions & 25 deletions pandas/core/groupby.py
@@ -53,6 +53,7 @@
from pandas.core.sorting import (get_group_index_sorter, get_group_index,
compress_group_index, get_flattened_iterator,
decons_obs_group_ids, get_indexer_dict)
from pandas.core.util._dispatching import CythonDispatcher
from pandas.util._decorators import (cache_readonly, Substitution,
Appender, make_signature)
from pandas.io.formats.printing import pprint_thing
@@ -593,6 +594,8 @@ def __init__(self, obj, keys=None, axis=0, level=None,
# we accept no other args
validate_kwargs('group', kwargs, {})

self._dispatcher = CythonDispatcher(self)  # NB: circular reference

def __len__(self):
return len(self.groups)

Expand Down Expand Up @@ -1202,29 +1205,6 @@ class GroupBy(_GroupBy):
"""
_apply_whitelist = _common_apply_whitelist

def _bool_agg(self, val_test, skipna):
"""Shared func to call any / all Cython GroupBy implementations"""

def objs_to_bool(vals):
try:
vals = vals.astype(np.bool)
except ValueError: # for objects
vals = np.array([bool(x) for x in vals])

return vals.view(np.uint8)

def result_to_bool(result):
return result.astype(np.bool, copy=False)

return self._get_cythonized_result('group_any_all', self.grouper,
aggregate=True,
cython_dtype=np.uint8,
needs_values=True,
needs_mask=True,
pre_processing=objs_to_bool,
post_processing=result_to_bool,
val_test=val_test, skipna=skipna)

@Substitution(name='groupby')
@Appender(_doc_template)
def any(self, skipna=True):
@@ -1236,7 +1216,7 @@ def any(self, skipna=True):
skipna : bool, default True
Flag to ignore nan values during truth testing
"""
return self._bool_agg('any', skipna)
return self._dispatcher.dispatch('any', skipna=skipna)

@Substitution(name='groupby')
@Appender(_doc_template)
@@ -1248,7 +1228,7 @@ def all(self, skipna=True):
skipna : bool, default True
Flag to ignore nan values during truth testing
"""
return self._bool_agg('all', skipna)
return self._dispatcher.dispatch('all', skipna=skipna)

@Substitution(name='groupby')
@Appender(_doc_template)
186 changes: 186 additions & 0 deletions pandas/core/util/_dispatching.py
@@ -0,0 +1,186 @@
import collections
import inspect
Contributor:

Rather than putting this here and trying to make it too generic, let's do:

  • move pandas.core.groupby to pandas.core.groupby.groupby (and add a shim to deprecate pandas.core.groupby, similar to what we do for pandas.core.categorical); do this as a precursor PR that only performs the move (rough sketch of such a shim below)
  • put this module in pandas.core.groupby and call it dispatch.py
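For illustration only, a deprecation shim along those lines might look roughly like the sketch below; the warning text and the re-exported names are assumptions, not the actual pandas implementation:

    # hypothetical shim left at the old import location after the move
    import warnings

    warnings.warn("pandas.core.groupby is deprecated; "
                  "import from pandas.core.groupby.groupby instead",
                  FutureWarning, stacklevel=2)

    # re-export public names from the new location so existing imports keep working
    from pandas.core.groupby.groupby import (  # noqa: E402, F401
        Grouper, GroupBy, SeriesGroupBy, DataFrameGroupBy, groupby)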

from functools import partial

import numpy as np

from pandas._libs import groupby as libgroupby
from pandas.core.dtypes.missing import isnull


class CythonDispatcher(object):

def __init__(self, groupby):
self.groupby = groupby
self.func_nm = None
self.obj = None

@property
def func_metadata(self):
"""
Stores the metadata required to dispatch each function.

The format of the dict is as follows:

attr_name : {
'application': {'aggregate', 'transform'}
'cython_nm': ... # Name of the Cython function to call
'extra_kwargs': {...} # Extra kwargs to pass to Cython
'type_blacklist': [...] # Dtypes for which func should raise
'result_type': ... # dtype of result from Cython
'conversion_in': ... # dtype or callable for conversion pre-Cython
'needs_values': ... # Whether the obj values should be passed to Cython
'needs_mask': ... # Whether a mask of NA values should be passed
'conversion_out': ... # dtype or callable for conv post-Cython
}
"""
return {
'any': {
Member Author:

Thought here is that a dict of dicts can most cleanly describe the metadata that each function may have to pass to Cython

'application': 'aggregate',
'cython_nm': 'group_any_all',
'extra_kwargs': {'val_test': 'any'},
'type_blacklist': [],
'result_type': np.uint8,
'conversion_in': self._any_all_convertor,
'needs_values': True,
'needs_mask': True,
'conversion_out': np.bool
},
'all': {
'application': 'aggregate',
'cython_nm': 'group_any_all',
'extra_kwargs': {'val_test': 'all'},
'type_blacklist': [],
'result_type': np.uint8,
'conversion_in': self._any_all_convertor,
'needs_values': True,
'needs_mask': True,
'conversion_out': np.bool
}
}

@property
def application_type(self):
return self.func_metadata[self.func_nm]['application']

def _any_all_convertor(self, vals):
Member Author:

Hoping we don't have a ton of these, but any custom conversion routines can be defined in this module (maybe not even within this class) and simply referenced in the metadata dict (see the sketch just after this function)

"""
Convert objects to the appropriate type for any/all calculations.
"""
try:
vals = vals.astype(np.bool)
except ValueError: # for objects
vals = np.array([bool(x) for x in vals])

return vals.view(np.uint8)
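As a small illustration of that idea (the name objs_to_uint8 is hypothetical, not part of this PR), the same logic could live at module scope and be referenced from the metadata dict:

    # hypothetical module-level converter, mirroring _any_all_convertor above
    def objs_to_uint8(vals):
        try:
            vals = vals.astype(np.bool)
        except ValueError:  # object dtype
            vals = np.array([bool(x) for x in vals])
        return vals.view(np.uint8)

    # ...and then in func_metadata:
    #     'conversion_in': objs_to_uint8,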

def _validate_types(self):
"""
Validate the dtypes of the object being grouped.

Raises
------
``TypeError`` if the grouped object's dtype is not valid for `func_nm`.
"""
if self.obj.values.dtype in self.func_metadata[
self.func_nm]['type_blacklist']:
raise TypeError("'{}' cannot be applied to a dtype of {}".format(
self.func_nm, self.obj.values.dtype))

def _get_result(self, **kwargs):
Member Author:

This func does all the heavy lifting; mostly matches the implementation of _get_cythonized_result currently in groupby.py

"""
Fetch the result from the Cython layer.

Parameters
----------
kwargs
Extra arguments to bind to the `func_nm` Cython signature.

Notes
-----
The Cython function name is resolved from `func_metadata`, allowing for
templated function names.
"""
# Since this func is called in a loop, the below might be better
# served outside of the loop and passed in?
labels, _, ngroups = self.groupby.grouper.group_info

Contributor:

see above; these should be subclasses

Contributor:

I would make the things to dispatch just methods. Otherwise this has to have a lot of magic. It's much simpler to just call a method.
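Purely as an illustration of that suggestion (the keyword-style _get_result signature here is hypothetical, not this PR's code), each operation would be an ordinary method rather than an entry in a metadata dict:

    # hypothetical: one plain method per dispatched operation
    class CythonDispatcher(object):

        def any(self, skipna=True):
            # the method itself knows its kernel and conversions;
            # all() would look the same with val_test='all'
            return self._get_result(cython_nm='group_any_all',
                                    application='aggregate',
                                    conversion_in=self._any_all_convertor,
                                    conversion_out=np.bool,
                                    val_test='any', skipna=skipna)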

Member Author:

Struggling to conceptualize this - so do you think it would be best to have a BaseDispatcher and then have the various groupings of applications as the subclasses (so here AnyAllDispatcher)?

Contributor:

see my comments above

first order of business is to move groupby to a sub-module so we can easily create more modules
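Purely illustrative of the subclass layout being discussed (class names are hypothetical, not part of this PR):

    # hypothetical: shared machinery in a base class, one subclass per family of ops
    class BaseDispatcher(object):
        """Allocates the result array, builds the NA mask, wraps the output."""

    class AnyAllDispatcher(BaseDispatcher):
        """Knows only about group_any_all and its uint8/bool conversions."""
        cython_nm = 'group_any_all'
        application = 'aggregate'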

if self.application_type == 'aggregate':
res_sz = ngroups
elif self.application_type == 'transform':
res_sz = len(labels)

res_type = self.func_metadata[self.func_nm].get('result_type',
self.obj.values.dtype)

result = np.zeros(res_sz, dtype=res_type)
base_func = getattr(libgroupby,
self.func_metadata[self.func_nm]['cython_nm'])
func = partial(base_func, result, labels)

if self.func_metadata[self.func_nm].get('needs_values'):
conv_in = self.func_metadata[self.func_nm].get('conversion_in')
vals = self.obj.values
# Below conditional needs refactoring but essentially want
# to differentiate callables from dtypes
if callable(conv_in) and not inspect.isclass(conv_in):
vals = conv_in(self.obj.values)
elif conv_in: # is a type to convert to
vals = self.obj.values.astype(conv_in, copy=False)
func = partial(func, vals)

if self.func_metadata[self.func_nm].get('needs_mask'):
mask = isnull(self.obj.values).view(np.uint8)
func = partial(func, mask)

# Merge caller kwargs with any extra kwargs required by the Cython signature
cy_kwargs = kwargs.copy()
cy_kwargs.update(self.func_metadata[self.func_nm].get('extra_kwargs', {}))
func(**cy_kwargs)

conv_out = self.func_metadata[self.func_nm].get('conversion_out')
# Just like before, this needs refactoring
if callable(conv_out) and not inspect.isclass(conv_out):
result = conv_out(result)
elif conv_out:
result = result.astype(conv_out, copy=False)

return result

def _wrap_output(self, output):
"""
Bind and apply the appropriate wrap func from `self.groupby`.
"""
if self.application_type == 'aggregate':
return getattr(self.groupby, '_wrap_aggregated_output')(output)
elif self.application_type == 'transform':
return getattr(self.groupby, '_wrap_transformed_output')(output)

raise ValueError("Unknown application type for {}".format(
self.func_nm))

def dispatch(self, func_nm, **kwargs):
"""
Dispatch the `func_nm` appropriately to the Cython layer.

Will resolve any type and conversion dependencies, as well as apply
any post-Cython conversions required for the given `func_nm`.

Parameters
----------
func_nm : str
Conceptual name of the function to be applied.
kwargs
Extra arguments to bind to the `func_nm` Cython signature.

Returns
-------
ndarray
Result of Cython operation with appropriate conversions applied.
"""
self.func_nm = func_nm

output = collections.OrderedDict()
for name, obj in self.groupby._iterate_slices():
self.obj = obj
self._validate_types()
output[name] = self._get_result(**kwargs)

return self._wrap_output(output)