From f0d0165072f6e76421f60c672b75071b677e85f0 Mon Sep 17 00:00:00 2001 From: "Christopher C. Aycock" Date: Thu, 4 Aug 2016 10:27:26 -0400 Subject: [PATCH] ENH: Faster merge_asof() performs a single pass when joining tables (#13902) Uses Tempita for different specializations of "by" and "on" types. These parameters can only take a single label now. --- asv_bench/benchmarks/join_merge.py | 23 +- doc/source/whatsnew/v0.19.0.txt | 2 +- pandas/src/join.pyx | 60 +---- pandas/src/joins_func_helper.pxi | 373 ++++++++++++++++++++++++++ pandas/src/joins_func_helper.pxi.in | 160 +++++++++++ pandas/tools/merge.py | 176 +++++++----- pandas/tools/tests/test_merge_asof.py | 94 +++++++ setup.py | 2 +- 8 files changed, 755 insertions(+), 135 deletions(-) create mode 100644 pandas/src/joins_func_helper.pxi create mode 100644 pandas/src/joins_func_helper.pxi.in diff --git a/asv_bench/benchmarks/join_merge.py b/asv_bench/benchmarks/join_merge.py index 86d5f84cb9b36..c98179c8950c5 100644 --- a/asv_bench/benchmarks/join_merge.py +++ b/asv_bench/benchmarks/join_merge.py @@ -310,7 +310,7 @@ def time_merge_asof_noby(self): merge_asof(self.df1, self.df2, on='time') -class merge_asof_by(object): +class merge_asof_by_object(object): def setup(self): import string @@ -326,7 +326,26 @@ def setup(self): self.df1 = self.df1.sort_values('time') self.df2 = self.df2.sort_values('time') - def time_merge_asof_by(self): + def time_merge_asof_by_object(self): + merge_asof(self.df1, self.df2, on='time', by='key') + + +class merge_asof_by_int(object): + + def setup(self): + np.random.seed(0) + one_count = 200000 + two_count = 1000000 + self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), + 'key': np.random.randint(0, 25, one_count), + 'value1': np.random.randn(one_count)}) + self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), + 'key': np.random.randint(0, 25, two_count), + 'value2': np.random.randn(two_count)}) + self.df1 = 
self.df1.sort_values('time') + self.df2 = self.df2.sort_values('time') + + def time_merge_asof_by_int(self): merge_asof(self.df1, self.df2, on='time', by='key') diff --git a/doc/source/whatsnew/v0.19.0.txt b/doc/source/whatsnew/v0.19.0.txt index f93e8f4240787..843e6de70ce93 100644 --- a/doc/source/whatsnew/v0.19.0.txt +++ b/doc/source/whatsnew/v0.19.0.txt @@ -48,7 +48,7 @@ The following are now part of this API: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ A long-time requested feature has been added through the :func:`merge_asof` function, to -support asof style joining of time-series. (:issue:`1870`, :issue:`13695`, :issue:`13709`). Full documentation is +support asof style joining of time-series. (:issue:`1870`, :issue:`13695`, :issue:`13709`, :issue:`13902`). Full documentation is :ref:`here <merging.merge_asof>` The :func:`merge_asof` performs an asof merge, which is similar to a left-join diff --git a/pandas/src/join.pyx b/pandas/src/join.pyx index f3c7577ef528a..9281453c643ee 100644 --- a/pandas/src/join.pyx +++ b/pandas/src/join.pyx @@ -34,6 +34,8 @@ cdef double nan = NaN from pandas.algos import groupsort_indexer +include "joins_func_helper.pxi" + def inner_join(ndarray[int64_t] left, ndarray[int64_t] right, Py_ssize_t max_groups): @@ -162,64 +164,6 @@ def left_outer_join(ndarray[int64_t] left, ndarray[int64_t] right, return left_indexer, right_indexer -def left_outer_asof_join(ndarray[int64_t] left, ndarray[int64_t] right, - Py_ssize_t max_groups, # ignored - bint allow_exact_matches=1, - left_values=None, - right_values=None, - tolerance=None): - - cdef: - Py_ssize_t left_pos, right_pos, left_size, right_size - ndarray[int64_t] left_indexer, right_indexer - bint has_tolerance = 0 - ndarray[int64_t] left_values_, right_values_ - int64_t tolerance_ - - # if we are using tolerance, set our objects - if (left_values is not None and right_values is not None and - tolerance is not None): - has_tolerance = 1 - left_values_ = left_values - right_values_ = right_values - 
tolerance_ = tolerance - - left_size = len(left) - right_size = len(right) - - left_indexer = np.empty(left_size, dtype=np.int64) - right_indexer = np.empty(left_size, dtype=np.int64) - - right_pos = 0 - for left_pos in range(left_size): - # restart right_pos if it went negative in a previous iteration - if right_pos < 0: - right_pos = 0 - - # find last position in right whose value is less than left's value - if allow_exact_matches: - while (right_pos < right_size and - right[right_pos] <= left[left_pos]): - right_pos += 1 - else: - while (right_pos < right_size and - right[right_pos] < left[left_pos]): - right_pos += 1 - right_pos -= 1 - - # save positions as the desired index - left_indexer[left_pos] = left_pos - right_indexer[left_pos] = right_pos - - # if needed, verify that tolerance is met - if has_tolerance and right_pos != -1: - diff = left_values[left_pos] - right_values[right_pos] - if diff > tolerance_: - right_indexer[left_pos] = -1 - - return left_indexer, right_indexer - - def full_outer_join(ndarray[int64_t] left, ndarray[int64_t] right, Py_ssize_t max_groups): cdef: diff --git a/pandas/src/joins_func_helper.pxi b/pandas/src/joins_func_helper.pxi new file mode 100644 index 0000000000000..7a59da37c5ced --- /dev/null +++ b/pandas/src/joins_func_helper.pxi @@ -0,0 +1,373 @@ +""" +Template for each `dtype` helper function for hashtable + +WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in +""" + +#---------------------------------------------------------------------- +# asof_join_by +#---------------------------------------------------------------------- + + +from hashtable cimport * + + +def asof_join_int64_t_by_object(ndarray[int64_t] left_values, + ndarray[int64_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint 
has_tolerance = 0 + int64_t tolerance_ + PyObjectHashTable hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = PyObjectHashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_double_by_object(ndarray[double] left_values, + ndarray[double] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + double tolerance_ + PyObjectHashTable hash_table + object by_value + + # if we are using 
tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = PyObjectHashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int64_t_by_int64_t(ndarray[int64_t] left_values, + ndarray[int64_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int64_t tolerance_ + Int64HashTable hash_table + int64_t by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + 
left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = Int64HashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_double_by_int64_t(ndarray[double] left_values, + ndarray[double] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + double tolerance_ + Int64HashTable hash_table + int64_t by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, 
dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = Int64HashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +#---------------------------------------------------------------------- +# asof_join +#---------------------------------------------------------------------- + + +def asof_join_int64_t(ndarray[int64_t] left_values, + ndarray[int64_t] right_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int64_t tolerance_ + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + right_pos = 0 + 
for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = right_pos + + # if needed, verify that tolerance is met + if has_tolerance and right_pos != -1: + diff = left_values[left_pos] - right_values[right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_double(ndarray[double] left_values, + ndarray[double] right_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + double tolerance_ + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + left_indexer[left_pos] = left_pos + 
right_indexer[left_pos] = right_pos + + # if needed, verify that tolerance is met + if has_tolerance and right_pos != -1: + diff = left_values[left_pos] - right_values[right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer diff --git a/pandas/src/joins_func_helper.pxi.in b/pandas/src/joins_func_helper.pxi.in new file mode 100644 index 0000000000000..06c35cfb69e53 --- /dev/null +++ b/pandas/src/joins_func_helper.pxi.in @@ -0,0 +1,160 @@ +""" +Template for each `dtype` helper function for hashtable + +WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in +""" + +#---------------------------------------------------------------------- +# asof_join_by +#---------------------------------------------------------------------- + +{{py: + +# table_type, by_dtype +by_dtypes = [('PyObjectHashTable', 'object'), ('Int64HashTable', 'int64_t')] + +# on_dtype +on_dtypes = ['int64_t', 'double'] + +}} + + +from hashtable cimport * + +{{for table_type, by_dtype in by_dtypes}} +{{for on_dtype in on_dtypes}} + + +def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values, + ndarray[{{on_dtype}}] right_values, + ndarray[{{by_dtype}}] left_by_values, + ndarray[{{by_dtype}}] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + {{on_dtype}} tolerance_ + {{table_type}} hash_table + {{by_dtype}} by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {{table_type}}(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a 
previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + +{{endfor}} +{{endfor}} + + +#---------------------------------------------------------------------- +# asof_join +#---------------------------------------------------------------------- + +{{py: + +# on_dtype +dtypes = ['int64_t', 'double'] + +}} + +{{for on_dtype in dtypes}} + + +def asof_join_{{on_dtype}}(ndarray[{{on_dtype}}] left_values, + ndarray[{{on_dtype}}] right_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + {{on_dtype}} tolerance_ + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in 
a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = right_pos + + # if needed, verify that tolerance is met + if has_tolerance and right_pos != -1: + diff = left_values[left_pos] - right_values[right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + +{{endfor}} + diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index 571df70e05c6d..1572363fc6136 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -17,13 +17,15 @@ is_datetime64_dtype, needs_i8_conversion, is_int64_dtype, + is_integer_dtype, + is_float_dtype, is_integer, is_int_or_datetime_dtype, is_dtype_equal, is_bool, is_list_like, _ensure_int64, - _ensure_platform_int, + _ensure_float64, _ensure_object) from pandas.types.missing import na_value_for_dtype @@ -275,20 +277,17 @@ def merge_asof(left, right, on=None, ---------- left : DataFrame right : DataFrame - on : label or list - Field names to join on. Must be found in both DataFrames. + on : label + Field name to join on. Must be found in both DataFrames. The data MUST be ordered. Furthermore this must be a numeric column, - typically a datetimelike or integer. On or left_on/right_on + such as datetimelike, integer, or float. On or left_on/right_on must be given. - left_on : label or list, or array-like - Field names to join on in left DataFrame. 
Can be a vector or list of - vectors of the length of the DataFrame to use a particular vector as - the join key instead of columns - right_on : label or list, or array-like - Field names to join on in right DataFrame or vector/list of vectors per - left_on docs - by : column name or list of column names - Group both the left and right DataFrames by the group columns; perform + left_on : label + Field name to join on in left DataFrame. + right_on : label + Field name to join on in right DataFrame. + by : column name + Group both the left and right DataFrames by the group column; perform the merge operation on these pieces and recombine. suffixes : 2-length sequence (tuple, list, ...) Suffix to apply to overlapping column names in the left and right @@ -415,38 +414,12 @@ def merge_asof(left, right, on=None, merge_ordered """ - def _merger(x, y): - # perform the ordered merge operation - op = _AsOfMerge(x, y, - on=on, left_on=left_on, right_on=right_on, - by=by, suffixes=suffixes, - how='asof', tolerance=tolerance, - allow_exact_matches=allow_exact_matches) - return op.get_result() - - if by is not None: - result, groupby = _groupby_and_merge(by, on, left, right, - lambda x, y: _merger(x, y), - check_duplicates=False) - - # we want to preserve the original order - # we had grouped, so need to reverse this - # if we DO have duplicates, then - # we cannot guarantee order - - sorter = _ensure_platform_int( - np.concatenate([groupby.indices[g] for g, _ in groupby])) - if len(result) != len(sorter): - return result - - rev = np.empty(len(sorter), dtype=np.int_) - rev.put(sorter, np.arange(len(sorter))) - return result.take(rev).reset_index(drop=True) - - if right.duplicated(on).any(): - right = right.drop_duplicates(on, keep='last') - - return _merger(left, right) + op = _AsOfMerge(left, right, + on=on, left_on=left_on, right_on=right_on, + by=by, suffixes=suffixes, + how='asof', tolerance=tolerance, + allow_exact_matches=allow_exact_matches) + return op.get_result() # 
TODO: transformations?? @@ -942,6 +915,35 @@ def get_result(self): return result +_asof_functions = { + 'int64_t': _join.asof_join_int64_t, + 'double': _join.asof_join_double, +} + +_asof_by_functions = { + ('int64_t', 'int64_t'): _join.asof_join_int64_t_by_int64_t, + ('double', 'int64_t'): _join.asof_join_double_by_int64_t, + ('int64_t', 'object'): _join.asof_join_int64_t_by_object, + ('double', 'object'): _join.asof_join_double_by_object, +} + +_type_casters = { + 'int64_t': _ensure_int64, + 'double': _ensure_float64, + 'object': _ensure_object, +} + + +def _get_cython_type(dtype): + """ Given a dtype, return 'int64_t', 'double', or 'object' """ + if is_integer_dtype(dtype): + return 'int64_t' + elif is_float_dtype(dtype): + return 'double' + else: + return 'object' + + class _AsOfMerge(_OrderedMerge): _merge_type = 'asof_merge' @@ -977,6 +979,9 @@ def _validate_specification(self): if not is_list_like(self.by): self.by = [self.by] + if len(self.by) != 1: + raise MergeError("can only asof by a single key") + self.left_on = self.by + list(self.left_on) self.right_on = self.by + list(self.right_on) @@ -1030,36 +1035,62 @@ def _get_merge_keys(self): def _get_join_indexers(self): """ return the join indexers """ + # values to compare + left_values = self.left_join_keys[-1] + right_values = self.right_join_keys[-1] + tolerance = self.tolerance + # we required sortedness in the join keys msg = " keys must be sorted" - for lk in self.left_join_keys: - if not Index(lk).is_monotonic: - raise ValueError('left' + msg) - for rk in self.right_join_keys: - if not Index(rk).is_monotonic: - raise ValueError('right' + msg) - - kwargs = {} - - # tolerance - t = self.tolerance - if t is not None: - lt = self.left_join_keys[self.left_on.index(self._asof_key)] - rt = self.right_join_keys[self.right_on.index(self._asof_key)] - if needs_i8_conversion(lt): - lt = lt.view('i8') - t = t.value - rt = rt.view('i8') - kwargs['left_values'] = lt - kwargs['right_values'] = rt - 
kwargs['tolerance'] = t + if not Index(left_values).is_monotonic: + raise ValueError('left' + msg) + if not Index(right_values).is_monotonic: + raise ValueError('right' + msg) + + # initial type conversion as needed + if needs_i8_conversion(left_values): + left_values = left_values.view('i8') + right_values = right_values.view('i8') + if tolerance is not None: + tolerance = tolerance.value + + # a "by" parameter requires special handling + if self.by is not None: + left_by_values = self.left_join_keys[0] + right_by_values = self.right_join_keys[0] + + # choose appropriate function by type + on_type = _get_cython_type(left_values.dtype) + by_type = _get_cython_type(left_by_values.dtype) + + on_type_caster = _type_casters[on_type] + by_type_caster = _type_casters[by_type] + func = _asof_by_functions[(on_type, by_type)] + + left_values = on_type_caster(left_values) + right_values = on_type_caster(right_values) + left_by_values = by_type_caster(left_by_values) + right_by_values = by_type_caster(right_by_values) + + return func(left_values, + right_values, + left_by_values, + right_by_values, + self.allow_exact_matches, + tolerance) + else: + # choose appropriate function by type + on_type = _get_cython_type(left_values.dtype) + type_caster = _type_casters[on_type] + func = _asof_functions[on_type] - return _get_join_indexers(self.left_join_keys, - self.right_join_keys, - sort=self.sort, - how=self.how, - allow_exact_matches=self.allow_exact_matches, - **kwargs) + left_values = type_caster(left_values) + right_values = type_caster(right_values) + + return func(left_values, + right_values, + self.allow_exact_matches, + tolerance) def _get_multiindex_indexer(join_keys, index, sort): @@ -1143,7 +1174,6 @@ def _right_outer_join(x, y, max_groups): 'left': _join.left_outer_join, 'right': _right_outer_join, 'outer': _join.full_outer_join, - 'asof': _join.left_outer_asof_join, } diff --git a/pandas/tools/tests/test_merge_asof.py b/pandas/tools/tests/test_merge_asof.py index 
e0c50cf3baaf7..f413618624592 100644 --- a/pandas/tools/tests/test_merge_asof.py +++ b/pandas/tools/tests/test_merge_asof.py @@ -364,6 +364,100 @@ def test_allow_exact_matches_and_tolerance3(self): 'version': [np.nan, np.nan]}) assert_frame_equal(result, expected) + def test_by_int(self): + # we specialize by type, so test that this is correct + df1 = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.020', + '20160525 13:30:00.030', + '20160525 13:30:00.040', + '20160525 13:30:00.050', + '20160525 13:30:00.060']), + 'key': [1, 2, 1, 3, 2], + 'value1': [1.1, 1.2, 1.3, 1.4, 1.5]}, + columns=['time', 'key', 'value1']) + + df2 = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.015', + '20160525 13:30:00.020', + '20160525 13:30:00.025', + '20160525 13:30:00.035', + '20160525 13:30:00.040', + '20160525 13:30:00.055', + '20160525 13:30:00.060', + '20160525 13:30:00.065']), + 'key': [2, 1, 1, 3, 2, 1, 2, 3], + 'value2': [2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8]}, + columns=['time', 'key', 'value2']) + + result = pd.merge_asof(df1, df2, on='time', by='key') + + expected = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.020', + '20160525 13:30:00.030', + '20160525 13:30:00.040', + '20160525 13:30:00.050', + '20160525 13:30:00.060']), + 'key': [1, 2, 1, 3, 2], + 'value1': [1.1, 1.2, 1.3, 1.4, 1.5], + 'value2': [2.2, 2.1, 2.3, 2.4, 2.7]}, + columns=['time', 'key', 'value1', 'value2']) + + assert_frame_equal(result, expected) + + def test_on_float(self): + # mimics how to determine the minimum-price variation + df1 = pd.DataFrame({ + 'price': [5.01, 0.0023, 25.13, 340.05, 30.78, 1040.90, 0.0078], + 'symbol': list("ABCDEFG")}, + columns=['symbol', 'price']) + + df2 = pd.DataFrame({ + 'price': [0.0, 1.0, 100.0], + 'mpv': [0.0001, 0.01, 0.05]}, + columns=['price', 'mpv']) + + df1 = df1.sort_values('price').reset_index(drop=True) + + result = pd.merge_asof(df1, df2, on='price') + + expected = pd.DataFrame({ + 'symbol': list("BGACEDF"), + 'price': 
[0.0023, 0.0078, 5.01, 25.13, 30.78, 340.05, 1040.90], + 'mpv': [0.0001, 0.0001, 0.01, 0.01, 0.01, 0.05, 0.05]}, + columns=['symbol', 'price', 'mpv']) + + assert_frame_equal(result, expected) + + def test_on_float_by_int(self): + # type specialize both "by" and "on" parameters + df1 = pd.DataFrame({ + 'symbol': list("AAABBBCCC"), + 'exch': [1, 2, 3, 1, 2, 3, 1, 2, 3], + 'price': [3.26, 3.2599, 3.2598, 12.58, 12.59, + 12.5, 378.15, 378.2, 378.25]}, + columns=['symbol', 'exch', 'price']) + + df2 = pd.DataFrame({ + 'exch': [1, 1, 1, 2, 2, 2, 3, 3, 3], + 'price': [0.0, 1.0, 100.0, 0.0, 5.0, 100.0, 0.0, 5.0, 1000.0], + 'mpv': [0.0001, 0.01, 0.05, 0.0001, 0.01, 0.1, 0.0001, 0.25, 1.0]}, + columns=['exch', 'price', 'mpv']) + + df1 = df1.sort_values('price').reset_index(drop=True) + df2 = df2.sort_values('price').reset_index(drop=True) + + result = pd.merge_asof(df1, df2, on='price', by='exch') + + expected = pd.DataFrame({ + 'symbol': list("AAABBBCCC"), + 'exch': [3, 2, 1, 3, 1, 2, 1, 2, 3], + 'price': [3.2598, 3.2599, 3.26, 12.5, 12.58, + 12.59, 378.15, 378.2, 378.25], + 'mpv': [0.0001, 0.0001, 0.01, 0.25, 0.01, 0.01, 0.05, 0.1, 0.25]}, + columns=['symbol', 'exch', 'price', 'mpv']) + + assert_frame_equal(result, expected) + if __name__ == '__main__': nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'], diff --git a/setup.py b/setup.py index 5bf188d829d26..1c12ff4aca372 100755 --- a/setup.py +++ b/setup.py @@ -109,7 +109,7 @@ def is_platform_mac(): _pxifiles = ['algos_common_helper.pxi.in', 'algos_groupby_helper.pxi.in', 'join_helper.pxi.in', 'algos_take_helper.pxi.in', 'hashtable_class_helper.pxi.in', 'hashtable_func_helper.pxi.in', - 'sparse_op_helper.pxi.in'] + 'sparse_op_helper.pxi.in', 'joins_func_helper.pxi.in'] class build_ext(_build_ext):