From 11c8427171c738ca715a30e23f2ec4e309c9b2ef Mon Sep 17 00:00:00 2001 From: Ka Wo Chen Date: Tue, 27 Oct 2015 19:57:54 -0400 Subject: [PATCH] CLN/PERF: remove unused functions; use C skip list for rolling median --- asv_bench/benchmarks/gil.py | 70 +++ asv_bench/benchmarks/stat_ops.py | 29 +- doc/source/whatsnew/v0.17.1.txt | 3 +- pandas/algos.pyx | 723 +++++++++++++++---------------- pandas/src/skiplist.h | 89 ++-- pandas/src/skiplist.pxd | 17 +- pandas/stats/moments.py | 12 +- 7 files changed, 522 insertions(+), 421 deletions(-) diff --git a/asv_bench/benchmarks/gil.py b/asv_bench/benchmarks/gil.py index eeca2d54381b2..fdeace108f76e 100644 --- a/asv_bench/benchmarks/gil.py +++ b/asv_bench/benchmarks/gil.py @@ -366,3 +366,73 @@ def time_period_to_datetime(self): def run(period): period.to_timestamp() run(self.period) + + +class nogil_rolling_algos_slow(object): + goal_time = 0.2 + + def setup(self): + self.win = 100 + np.random.seed(1234) + self.arr = np.random.rand(100000) + if (not have_real_test_parallel): + raise NotImplementedError + + def time_nogil_rolling_median(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_median(arr, win) + run(self.arr, self.win) + + +class nogil_rolling_algos_fast(object): + goal_time = 0.2 + + def setup(self): + self.win = 100 + np.random.seed(1234) + self.arr = np.random.rand(1000000) + if (not have_real_test_parallel): + raise NotImplementedError + + def time_nogil_rolling_mean(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_mean(arr, win) + run(self.arr, self.win) + + def time_nogil_rolling_min(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_min(arr, win) + run(self.arr, self.win) + + def time_nogil_rolling_max(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_max(arr, win) + run(self.arr, self.win) + + def time_nogil_rolling_var(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_var(arr, win) + run(self.arr, self.win) + + def time_nogil_rolling_skew(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_skew(arr, win) + run(self.arr, self.win) + + def time_nogil_rolling_kurt(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_kurt(arr, win) + run(self.arr, self.win) + + def time_nogil_rolling_std(self): + @test_parallel(num_threads=2) + def run(arr, win): + rolling_std(arr, win) + run(self.arr, self.win) \ No newline at end of file diff --git a/asv_bench/benchmarks/stat_ops.py b/asv_bench/benchmarks/stat_ops.py index 4125357455d2e..daf5135e64c40 100644 --- a/asv_bench/benchmarks/stat_ops.py +++ b/asv_bench/benchmarks/stat_ops.py @@ -231,6 +231,31 @@ class stats_rolling_mean(object): def setup(self): self.arr = np.random.randn(100000) + self.win = 100 - def time_stats_rolling_mean(self): - rolling_mean(self.arr, 100) \ No newline at end of file + def time_rolling_mean(self): + rolling_mean(self.arr, self.win) + + def time_rolling_median(self): + rolling_median(self.arr, self.win) + + def time_rolling_min(self): + rolling_min(self.arr, self.win) + + def time_rolling_max(self): + rolling_max(self.arr, self.win) + + def time_rolling_sum(self): + rolling_sum(self.arr, self.win) + + def time_rolling_std(self): + rolling_std(self.arr, self.win) + + def time_rolling_var(self): + rolling_var(self.arr, self.win) + + def time_rolling_skew(self): + rolling_skew(self.arr, self.win) + + def time_rolling_kurt(self): + rolling_kurt(self.arr, self.win) \ No newline at end of file
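The benchmark classes above rely on asv and the suite's test_parallel helper to confirm that these rolling ops now run with the GIL released. Outside asv, a rough way to check the same thing is to time one job on one thread against two identical jobs on two threads; if the Cython loops release the GIL, the wall time should stay roughly flat. The sketch below is illustrative only and assumes the 0.17-era module-level pandas.rolling_median API (later replaced by Series.rolling(...).median()):

import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

arr = np.random.rand(1000000)
win = 100

def timed(n_jobs):
    # n_jobs identical rolling-median jobs on n_jobs threads; near-constant
    # wall time as n_jobs grows suggests the computation runs outside the GIL
    start = time.time()
    with ThreadPoolExecutor(max_workers=n_jobs) as ex:
        for fut in [ex.submit(pd.rolling_median, arr, win) for _ in range(n_jobs)]:
            fut.result()
    return time.time() - start

print("1 thread :", timed(1))
print("2 threads:", timed(2))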
diff --git a/doc/source/whatsnew/v0.17.1.txt b/doc/source/whatsnew/v0.17.1.txt index 84db16e338d87..55df24bb8ab58 100755 --- a/doc/source/whatsnew/v0.17.1.txt +++ b/doc/source/whatsnew/v0.17.1.txt @@ -59,7 +59,8 @@ Performance Improvements - Release the GIL on most datetime field operations (e.g. ``DatetimeIndex.year``, ``Series.dt.year``), normalization, and conversion to and from ``Period``, ``DatetimeIndex.to_period`` and ``PeriodIndex.to_timestamp`` (:issue:`11263`) - +- Release the GIL on some rolling algos (``rolling_median``, ``rolling_mean``, ``rolling_max``, ``rolling_min``, ``rolling_var``, ``rolling_kurt``, ``rolling_skew``) (:issue:`11450`) +- Improved performance of ``rolling_median`` (:issue:`11450`) - Improved performance to ``to_excel`` (:issue:`11352`) diff --git a/pandas/algos.pyx b/pandas/algos.pyx index 44b1996272356..8569209f2e946 100644 --- a/pandas/algos.pyx +++ b/pandas/algos.pyx @@ -56,9 +56,9 @@ cdef inline int int_min(int a, int b): return a if a <= b else b cdef extern from "src/headers/math.h": - double sqrt(double x) - double fabs(double) - int signbit(double) + double sqrt(double x) nogil + double fabs(double) nogil + int signbit(double) nogil from pandas import lib @@ -864,7 +864,8 @@ def min_subseq(ndarray[double_t] arr): #------------------------------------------------------------------------------- # Rolling sum - +@cython.boundscheck(False) +@cython.wraparound(False) def roll_sum(ndarray[double_t] input, int win, int minp): cdef double val, prev, sum_x = 0 cdef int nobs = 0, i cdef int N = len(input) cdef ndarray[double_t] output = np.empty(N, dtype=float) minp = _check_minp(win, minp, N) + with nogil: + for i from 0 <= i < minp - 1: + val = input[i] - for i from 0 <= i < minp - 1: - val = input[i] + # Not NaN + if val == val: + nobs += 1 + sum_x += val - # Not NaN - if val == val: - nobs += 1 - sum_x += val + output[i] = NaN - output[i] = NaN + for i from minp - 1 <= i < N: + val = input[i] - for i from minp - 1 <= i < N: - val = input[i] - - if val == val: - nobs += 1 - sum_x += val + if val == val: + nobs += 1 + sum_x += val - if i > win - 1: - prev = input[i - win] - if prev == prev: - sum_x -= prev - nobs -= 1 + if i > win - 1: + prev = input[i - win] + if prev == prev: + sum_x -= prev + nobs -= 1 - if nobs >= minp: - output[i] = sum_x - else: - output[i] = NaN + if nobs >= minp: + output[i] = sum_x + else: + output[i] = NaN return output #------------------------------------------------------------------------------- # Rolling mean - +@cython.boundscheck(False) +@cython.wraparound(False) def roll_mean(ndarray[double_t] input, int win, int minp): cdef: @@ -916,48 +918,48 @@ def roll_mean(ndarray[double_t] input, cdef ndarray[double_t] output = np.empty(N, dtype=float) minp = _check_minp(win, minp, N) + with nogil: + for i from 0 <= i < minp - 1: + val = input[i] - for i from 0 <= i < minp - 1: - val = input[i] - - # Not NaN - if val == val: - nobs += 1 - sum_x += val - if signbit(val): - neg_ct += 1 - - output[i] = NaN - - for i from minp - 1 <= i < N: - val = input[i] + # Not NaN + if val == val: + nobs += 1 + sum_x += val + if signbit(val): + neg_ct += 1 - if val == val: - nobs += 1 - sum_x += val - if signbit(val): - neg_ct += 1 + output[i] = NaN - if i > win - 1: - prev = input[i - win] - if prev == prev: - sum_x -= prev - nobs -= 1 - if signbit(prev): - neg_ct -= 1 + for i from minp - 1 <= i < N: + val = input[i] - if nobs >= minp: - result = sum_x / nobs - if neg_ct == 0 and result < 0: - # all positive - output[i] = 0 - elif neg_ct
== nobs and result > 0: - # all negative - output[i] = 0 + if val == val: + nobs += 1 + sum_x += val + if signbit(val): + neg_ct += 1 + + if i > win - 1: + prev = input[i - win] + if prev == prev: + sum_x -= prev + nobs -= 1 + if signbit(prev): + neg_ct -= 1 + + if nobs >= minp: + result = sum_x / nobs + if neg_ct == 0 and result < 0: + # all positive + output[i] = 0 + elif neg_ct == nobs and result > 0: + # all negative + output[i] = 0 + else: + output[i] = result else: - output[i] = result - else: - output[i] = NaN + output[i] = NaN return output @@ -1242,6 +1244,8 @@ def nancorr_spearman(ndarray[float64_t, ndim=2] mat, Py_ssize_t minp=1): #---------------------------------------------------------------------- # Rolling variance +@cython.boundscheck(False) +@cython.wraparound(False) def roll_var(ndarray[double_t] input, int win, int minp, int ddof=1): """ Numerically stable implementation using Welford's method. @@ -1257,80 +1261,82 @@ def roll_var(ndarray[double_t] input, int win, int minp, int ddof=1): # Check for windows larger than array, addresses #7297 win = min(win, N) - # Over the first window, observations can only be added, never removed - for i from 0 <= i < win: - val = input[i] - - # Not NaN - if val == val: - nobs += 1 - delta = (val - mean_x) - mean_x += delta / nobs - ssqdm_x += delta * (val - mean_x) - - if (nobs >= minp) and (nobs > ddof): - #pathological case - if nobs == 1: - val = 0 - else: - val = ssqdm_x / (nobs - ddof) - if val < 0: - val = 0 - else: - val = NaN - - output[i] = val - - # After the first window, observations can both be added and removed - for i from win <= i < N: - val = input[i] - prev = input[i - win] + with nogil: + # Over the first window, observations can only be added, never removed + for i from 0 <= i < win: + val = input[i] - if val == val: - if prev == prev: - # Adding one observation and removing another one - delta = val - prev - prev -= mean_x - mean_x += delta / nobs - val -= mean_x - ssqdm_x += (val + prev) * delta - else: - # Adding one observation and not removing any + # Not NaN + if val == val: nobs += 1 delta = (val - mean_x) mean_x += delta / nobs ssqdm_x += delta * (val - mean_x) - elif prev == prev: - # Adding no new observation, but removing one - nobs -= 1 - if nobs: - delta = (prev - mean_x) - mean_x -= delta / nobs - ssqdm_x -= delta * (prev - mean_x) - else: - mean_x = 0 - ssqdm_x = 0 - # Variance is unchanged if no observation is added or removed - - if (nobs >= minp) and (nobs > ddof): - #pathological case - if nobs == 1: - val = 0 + + if (nobs >= minp) and (nobs > ddof): + #pathological case + if nobs == 1: + val = 0 + else: + val = ssqdm_x / (nobs - ddof) + if val < 0: + val = 0 else: - val = ssqdm_x / (nobs - ddof) - if val < 0: + val = NaN + + output[i] = val + + # After the first window, observations can both be added and removed + for i from win <= i < N: + val = input[i] + prev = input[i - win] + + if val == val: + if prev == prev: + # Adding one observation and removing another one + delta = val - prev + prev -= mean_x + mean_x += delta / nobs + val -= mean_x + ssqdm_x += (val + prev) * delta + else: + # Adding one observation and not removing any + nobs += 1 + delta = (val - mean_x) + mean_x += delta / nobs + ssqdm_x += delta * (val - mean_x) + elif prev == prev: + # Adding no new observation, but removing one + nobs -= 1 + if nobs: + delta = (prev - mean_x) + mean_x -= delta / nobs + ssqdm_x -= delta * (prev - mean_x) + else: + mean_x = 0 + ssqdm_x = 0 + # Variance is unchanged if no observation is added or 
removed + + if (nobs >= minp) and (nobs > ddof): + #pathological case + if nobs == 1: val = 0 - else: - val = NaN + else: + val = ssqdm_x / (nobs - ddof) + if val < 0: + val = 0 + else: + val = NaN - output[i] = val + output[i] = val return output #------------------------------------------------------------------------------- # Rolling skewness - +@cython.boundscheck(False) +@cython.wraparound(False) def roll_skew(ndarray[double_t] input, int win, int minp): cdef double val, prev cdef double x = 0, xx = 0, xxx = 0 @@ -1343,55 +1349,55 @@ def roll_skew(ndarray[double_t] input, int win, int minp): cdef double A, B, C, R minp = _check_minp(win, minp, N) + with nogil: + for i from 0 <= i < minp - 1: + val = input[i] - for i from 0 <= i < minp - 1: - val = input[i] - - # Not NaN - if val == val: - nobs += 1 - x += val - xx += val * val - xxx += val * val * val - - output[i] = NaN - - for i from minp - 1 <= i < N: - val = input[i] + # Not NaN + if val == val: + nobs += 1 + x += val + xx += val * val + xxx += val * val * val - if val == val: - nobs += 1 - x += val - xx += val * val - xxx += val * val * val + output[i] = NaN - if i > win - 1: - prev = input[i - win] - if prev == prev: - x -= prev - xx -= prev * prev - xxx -= prev * prev * prev + for i from minp - 1 <= i < N: + val = input[i] - nobs -= 1 - if nobs >= minp: - A = x / nobs - B = xx / nobs - A * A - C = xxx / nobs - A * A * A - 3 * A * B - if B <= 0 or nobs < 3: - output[i] = NaN + if val == val: + nobs += 1 + x += val + xx += val * val + xxx += val * val * val + + if i > win - 1: + prev = input[i - win] + if prev == prev: + x -= prev + xx -= prev * prev + xxx -= prev * prev * prev + + nobs -= 1 + if nobs >= minp: + A = x / nobs + B = xx / nobs - A * A + C = xxx / nobs - A * A * A - 3 * A * B + if B <= 0 or nobs < 3: + output[i] = NaN + else: + R = sqrt(B) + output[i] = ((sqrt(nobs * (nobs - 1.)) * C) / + ((nobs-2) * R * R * R)) else: - R = sqrt(B) - output[i] = ((sqrt(nobs * (nobs - 1.)) * C) / - ((nobs-2) * R * R * R)) - else: - output[i] = NaN + output[i] = NaN return output #------------------------------------------------------------------------------- # Rolling kurtosis - - +@cython.boundscheck(False) +@cython.wraparound(False) def roll_kurt(ndarray[double_t] input, int win, int minp): cdef double val, prev @@ -1405,62 +1411,62 @@ def roll_kurt(ndarray[double_t] input, cdef double A, B, C, D, R, K minp = _check_minp(win, minp, N) + with nogil: + for i from 0 <= i < minp - 1: + val = input[i] - for i from 0 <= i < minp - 1: - val = input[i] - - # Not NaN - if val == val: - nobs += 1 - - # seriously don't ask me why this is faster - x += val - xx += val * val - xxx += val * val * val - xxxx += val * val * val * val + # Not NaN + if val == val: + nobs += 1 - output[i] = NaN + # seriously don't ask me why this is faster + x += val + xx += val * val + xxx += val * val * val + xxxx += val * val * val * val - for i from minp - 1 <= i < N: - val = input[i] + output[i] = NaN - if val == val: - nobs += 1 - x += val - xx += val * val - xxx += val * val * val - xxxx += val * val * val * val + for i from minp - 1 <= i < N: + val = input[i] - if i > win - 1: - prev = input[i - win] - if prev == prev: - x -= prev - xx -= prev * prev - xxx -= prev * prev * prev - xxxx -= prev * prev * prev * prev + if val == val: + nobs += 1 + x += val + xx += val * val + xxx += val * val * val + xxxx += val * val * val * val + + if i > win - 1: + prev = input[i - win] + if prev == prev: + x -= prev + xx -= prev * prev + xxx -= prev * prev * prev + xxxx -= 
prev * prev * prev * prev + + nobs -= 1 + + if nobs >= minp: + A = x / nobs + R = A * A + B = xx / nobs - R + R = R * A + C = xxx / nobs - R - 3 * A * B + R = R * A + D = xxxx / nobs - R - 6*B*A*A - 4*C*A + + if B == 0 or nobs < 4: + output[i] = NaN - nobs -= 1 + else: + K = (nobs * nobs - 1.)*D/(B*B) - 3*((nobs-1.)**2) + K = K / ((nobs - 2.)*(nobs-3.)) - if nobs >= minp: - A = x / nobs - R = A * A - B = xx / nobs - R - R = R * A - C = xxx / nobs - R - 3 * A * B - R = R * A - D = xxxx / nobs - R - 6*B*A*A - 4*C*A - - if B == 0 or nobs < 4: - output[i] = NaN + output[i] = K else: - K = (nobs * nobs - 1.)*D/(B*B) - 3*((nobs-1.)**2) - K = K / ((nobs - 2.)*(nobs-3.)) - - output[i] = K - - else: - output[i] = NaN + output[i] = NaN return output @@ -1512,9 +1518,13 @@ cdef _roll_skiplist_op(ndarray arg, int win, int minp, skiplist_f op): from skiplist cimport * + +@cython.boundscheck(False) +@cython.wraparound(False) def roll_median_c(ndarray[float64_t] arg, int win, int minp): - cdef double val, res, prev cdef: + double val, res, prev + bint err=0 int ret=0 skiplist_t *sl Py_ssize_t midpoint, nobs = 0, i @@ -1524,71 +1534,59 @@ def roll_median_c(ndarray[float64_t] arg, int win, int minp): cdef ndarray[double_t] output = np.empty(N, dtype=float) sl = skiplist_init(win) + if sl == NULL: + raise MemoryError("skiplist_init failed") minp = _check_minp(win, minp, N) - for i from 0 <= i < minp - 1: - val = arg[i] - - # Not NaN - if val == val: - nobs += 1 - skiplist_insert(sl, val) - - output[i] = NaN + with nogil: + for i from 0 <= i < minp - 1: + val = arg[i] - for i from minp - 1 <= i < N: - val = arg[i] + # Not NaN + if val == val: + nobs += 1 + err = skiplist_insert(sl, val) != 1 + if err: + break + output[i] = NaN - if i > win - 1: - prev = arg[i - win] + with nogil: + if not err: + for i from minp - 1 <= i < N: - if prev == prev: - skiplist_remove(sl, prev) - nobs -= 1 + val = arg[i] - if val == val: - nobs += 1 - skiplist_insert(sl, val) + if i > win - 1: + prev = arg[i - win] - if nobs >= minp: - midpoint = nobs / 2 - if nobs % 2: - res = skiplist_get(sl, midpoint, &ret) - else: - res = (skiplist_get(sl, midpoint, &ret) + - skiplist_get(sl, (midpoint - 1), &ret)) / 2 - else: - res = NaN + if prev == prev: + skiplist_remove(sl, prev) + nobs -= 1 - output[i] = res + if val == val: + nobs += 1 + err = skiplist_insert(sl, val) != 1 + if err: + break + + if nobs >= minp: + midpoint = nobs / 2 + if nobs % 2: + res = skiplist_get(sl, midpoint, &ret) + else: + res = (skiplist_get(sl, midpoint, &ret) + + skiplist_get(sl, (midpoint - 1), &ret)) / 2 + else: + res = NaN - skiplist_destroy(sl) + output[i] = res + skiplist_destroy(sl) + if err: + raise MemoryError("skiplist_insert failed") return output -def roll_median_cython(ndarray input, int win, int minp): - ''' - O(N log(window)) implementation using skip list - ''' - return _roll_skiplist_op(input, win, minp, _get_median) - -# Unfortunately had to resort to some hackery here, would like for -# Cython to be able to get this right. 
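For reference, the bookkeeping roll_median_c performs with the C skip list can be sketched in plain Python. A sorted list plus bisect stands in for the indexable skip list, so each insert/removal below costs O(window) rather than the O(log window) the skip list gives; the loop structure, NaN checks and odd/even midpoint handling mirror the Cython code above. The function name is illustrative, not a pandas API:

from bisect import bisect_left, insort

import numpy as np

def rolling_median_sketch(arr, win, minp):
    out = np.full(len(arr), np.nan)
    window_vals = []                      # sorted window contents, NaNs excluded
    for i, val in enumerate(arr):
        if i >= win:                      # drop the value leaving the window
            prev = arr[i - win]
            if prev == prev:              # not NaN
                window_vals.pop(bisect_left(window_vals, prev))
        if val == val:                    # not NaN
            insort(window_vals, val)
        nobs = len(window_vals)
        if nobs >= minp:
            mid = nobs // 2
            if nobs % 2:                  # odd count: middle element
                out[i] = window_vals[mid]
            else:                         # even count: average the two middle elements
                out[i] = (window_vals[mid - 1] + window_vals[mid]) / 2.0
    return out

Replacing the sorted list with an indexable skip list (or any order-statistics structure) is what brings the per-step cost down to O(log window), which is the point of the skiplist.h implementation this patch switches to.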
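Further up, roll_skew and roll_kurt keep running window sums of powers of the values and recompute the statistic from those sums at each step. A plain-Python sketch of the skewness case, mirroring the formula and the NaN/min_periods handling in the Cython loop (illustrative name, not a pandas API):

import numpy as np

def rolling_skew_sketch(arr, win, minp):
    out = np.full(len(arr), np.nan)
    x = xx = xxx = 0.0                       # window sums of val, val**2, val**3
    nobs = 0
    for i, val in enumerate(arr):
        if val == val:                       # not NaN
            nobs += 1
            x += val
            xx += val * val
            xxx += val * val * val
        if i >= win:
            prev = arr[i - win]
            if prev == prev:
                nobs -= 1
                x -= prev
                xx -= prev * prev
                xxx -= prev * prev * prev
        if nobs >= minp and nobs >= 3:
            A = x / nobs
            B = xx / nobs - A * A            # biased window variance
            C = xxx / nobs - A ** 3 - 3 * A * B
            if B > 0:
                R = np.sqrt(B)
                out[i] = (np.sqrt(nobs * (nobs - 1.0)) * C) / ((nobs - 2) * R ** 3)
    return out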
- -cdef double_t _get_median(object sl, int nobs, int minp): - cdef Py_ssize_t midpoint - cdef IndexableSkiplist skiplist = sl - if nobs >= minp: - midpoint = nobs / 2 - if nobs % 2: - return skiplist.get(midpoint) - else: - return (skiplist.get(midpoint) + - skiplist.get(midpoint - 1)) / 2 - else: - return NaN - #---------------------------------------------------------------------- # Moving maximum / minimum code taken from Bottleneck under the terms @@ -1603,7 +1601,7 @@ from libc cimport stdlib @cython.boundscheck(False) @cython.wraparound(False) -def roll_max2(ndarray[float64_t] a, int window, int minp): +def roll_max(ndarray[float64_t] a, int window, int minp): "Moving max of 1d array of dtype=float64 along axis=0 ignoring NaNs." cdef np.float64_t ai, aold cdef Py_ssize_t count @@ -1617,7 +1615,7 @@ def roll_max2(ndarray[float64_t] a, int window, int minp): cdef Py_ssize_t n0 = dim[0] cdef np.npy_intp *dims = [n0] cdef np.ndarray[np.float64_t, ndim=1] y = PyArray_EMPTY(1, dims, - NPY_float64, 0) + NPY_float64, 0) if window < 1: raise ValueError('Invalid window size %d' @@ -1628,65 +1626,59 @@ def roll_max2(ndarray[float64_t] a, int window, int minp): % (minp, window)) minp = _check_minp(window, minp, n0) + with nogil: + ring = stdlib.malloc(window * sizeof(pairs)) + end = ring + window + last = ring - ring = stdlib.malloc(window * sizeof(pairs)) - end = ring + window - last = ring - - minpair = ring - ai = a[0] - if ai == ai: - minpair.value = ai - else: - minpair.value = MINfloat64 - minpair.death = window - - count = 0 - for i0 in range(n0): - ai = a[i0] + minpair = ring + ai = a[0] if ai == ai: - count += 1 - else: - ai = MINfloat64 - if i0 >= window: - aold = a[i0 - window] - if aold == aold: - count -= 1 - if minpair.death == i0: - minpair += 1 - if minpair >= end: - minpair = ring - if ai >= minpair.value: minpair.value = ai - minpair.death = i0 + window - last = minpair - else: - while last.value <= ai: - if last == ring: - last = end - last -= 1 - last += 1 - if last == end: - last = ring - last.value = ai - last.death = i0 + window - if count >= minp: - y[i0] = minpair.value else: - y[i0] = NaN + minpair.value = MINfloat64 + minpair.death = window + + count = 0 + for i0 in range(n0): + ai = a[i0] + if ai == ai: + count += 1 + else: + ai = MINfloat64 + if i0 >= window: + aold = a[i0 - window] + if aold == aold: + count -= 1 + if minpair.death == i0: + minpair += 1 + if minpair >= end: + minpair = ring + if ai >= minpair.value: + minpair.value = ai + minpair.death = i0 + window + last = minpair + else: + while last.value <= ai: + if last == ring: + last = end + last -= 1 + last += 1 + if last == end: + last = ring + last.value = ai + last.death = i0 + window + if count >= minp: + y[i0] = minpair.value + else: + y[i0] = NaN - for i0 in range(minp - 1): - y[i0] = NaN + for i0 in range(minp - 1): + y[i0] = NaN - stdlib.free(ring) + stdlib.free(ring) return y -def roll_max(ndarray input, int win, int minp): - ''' - O(N log(window)) implementation using skip list - ''' - return _roll_skiplist_op(input, win, minp, _get_max) - cdef double_t _get_max(object skiplist, int nobs, int minp): if nobs >= minp: @@ -1694,15 +1686,10 @@ cdef double_t _get_max(object skiplist, int nobs, int minp): else: return NaN -def roll_min(ndarray input, int win, int minp): - ''' - O(N log(window)) implementation using skip list - ''' - return _roll_skiplist_op(input, win, minp, _get_min) @cython.boundscheck(False) @cython.wraparound(False) -def roll_min2(np.ndarray[np.float64_t, ndim=1] a, int window, int 
minp): +def roll_min(np.ndarray[np.float64_t, ndim=1] a, int window, int minp): "Moving min of 1d array of dtype=float64 along axis=0 ignoring NaNs." cdef np.float64_t ai, aold cdef Py_ssize_t count @@ -1716,7 +1703,7 @@ def roll_min2(np.ndarray[np.float64_t, ndim=1] a, int window, int minp): cdef Py_ssize_t n0 = dim[0] cdef np.npy_intp *dims = [n0] cdef np.ndarray[np.float64_t, ndim=1] y = PyArray_EMPTY(1, dims, - NPY_float64, 0) + NPY_float64, 0) if window < 1: raise ValueError('Invalid window size %d' @@ -1727,57 +1714,57 @@ def roll_min2(np.ndarray[np.float64_t, ndim=1] a, int window, int minp): % (minp, window)) minp = _check_minp(window, minp, n0) + with nogil: + ring = stdlib.malloc(window * sizeof(pairs)) + end = ring + window + last = ring - ring = stdlib.malloc(window * sizeof(pairs)) - end = ring + window - last = ring - - minpair = ring - ai = a[0] - if ai == ai: - minpair.value = ai - else: - minpair.value = MAXfloat64 - minpair.death = window - - count = 0 - for i0 in range(n0): - ai = a[i0] + minpair = ring + ai = a[0] if ai == ai: - count += 1 - else: - ai = MAXfloat64 - if i0 >= window: - aold = a[i0 - window] - if aold == aold: - count -= 1 - if minpair.death == i0: - minpair += 1 - if minpair >= end: - minpair = ring - if ai <= minpair.value: minpair.value = ai - minpair.death = i0 + window - last = minpair - else: - while last.value >= ai: - if last == ring: - last = end - last -= 1 - last += 1 - if last == end: - last = ring - last.value = ai - last.death = i0 + window - if count >= minp: - y[i0] = minpair.value else: - y[i0] = NaN + minpair.value = MAXfloat64 + minpair.death = window + + count = 0 + for i0 in range(n0): + ai = a[i0] + if ai == ai: + count += 1 + else: + ai = MAXfloat64 + if i0 >= window: + aold = a[i0 - window] + if aold == aold: + count -= 1 + if minpair.death == i0: + minpair += 1 + if minpair >= end: + minpair = ring + if ai <= minpair.value: + minpair.value = ai + minpair.death = i0 + window + last = minpair + else: + while last.value >= ai: + if last == ring: + last = end + last -= 1 + last += 1 + if last == end: + last = ring + last.value = ai + last.death = i0 + window + if count >= minp: + y[i0] = minpair.value + else: + y[i0] = NaN - for i0 in range(minp - 1): - y[i0] = NaN + for i0 in range(minp - 1): + y[i0] = NaN - stdlib.free(ring) + stdlib.free(ring) return y cdef double_t _get_min(object skiplist, int nobs, int minp): diff --git a/pandas/src/skiplist.h b/pandas/src/skiplist.h index 57b32005021b9..c7117f16c9496 100644 --- a/pandas/src/skiplist.h +++ b/pandas/src/skiplist.h @@ -43,19 +43,20 @@ static PANDAS_INLINE double Log2(double val) { typedef struct node_t node_t; struct node_t { + node_t **next; + int *width; double value; int is_nil; int levels; - node_t **next; - int *width; int ref_count; }; typedef struct { node_t *head; - int size, maxlevels; node_t **tmp_chain; int *tmp_steps; + int size; + int maxlevels; } skiplist_t; static PANDAS_INLINE double urand(void) { @@ -68,33 +69,37 @@ static PANDAS_INLINE int int_min(int a, int b) { static PANDAS_INLINE node_t *node_init(double value, int levels) { node_t *result; - result = (node_t*) calloc(1, sizeof(node_t)); - - result->value = value; - result->levels = levels; - result->is_nil = 0; - result->ref_count = 0; - - result->next = (node_t**) malloc(levels * sizeof(node_t*)); - result->width = (int*) malloc(levels * sizeof(int)); - + result = (node_t*) malloc(sizeof(node_t)); + if (result) { + result->value = value; + result->levels = levels; + result->is_nil = 0; + result->ref_count 
= 0; + result->next = (node_t**) malloc(levels * sizeof(node_t*)); + result->width = (int*) malloc(levels * sizeof(int)); + if (!(result->next && result->width) && (levels != 0)) { + free(result->next); + free(result->width); + free(result); + return NULL; + } + } return result; } // do this ourselves - static PANDAS_INLINE void node_incref(node_t *node) { - node->ref_count += 1; + ++(node->ref_count); } static PANDAS_INLINE void node_decref(node_t *node) { - node->ref_count -= 1; + --(node->ref_count); } static void node_destroy(node_t *node) { int i; if (node) { - if (node->ref_count == 1) { + if (node->ref_count <= 1) { for (i = 0; i < node->levels; ++i) { node_destroy(node->next[i]); } @@ -110,21 +115,41 @@ static void node_destroy(node_t *node) { } } +static PANDAS_INLINE void skiplist_destroy(skiplist_t *skp) { + if (skp) { + node_destroy(skp->head); + free(skp->tmp_steps); + free(skp->tmp_chain); + free(skp); + } +} + static PANDAS_INLINE skiplist_t *skiplist_init(int expected_size) { skiplist_t *result; node_t *NIL, *head; int maxlevels, i; - maxlevels = Log2((double) expected_size); - result = (skiplist_t*) calloc(1, sizeof(skiplist_t)); + maxlevels = 1 + Log2((double) expected_size); + result = (skiplist_t*) malloc(sizeof(skiplist_t)); + if (!result) { + return NULL; + } result->tmp_chain = (node_t**) malloc(maxlevels * sizeof(node_t*)); result->tmp_steps = (int*) malloc(maxlevels * sizeof(int)); result->maxlevels = maxlevels; + result->size = 0; head = result->head = node_init(PANDAS_NAN, maxlevels); + NIL = node_init(0.0, 0); + + if (!(result->tmp_chain && result->tmp_steps && result->head && NIL)) { + skiplist_destroy(result); + node_destroy(NIL); + return NULL; + } + node_incref(head); - NIL = node_init(0, 0); NIL->is_nil = 1; for (i = 0; i < maxlevels; ++i) @@ -137,18 +162,7 @@ static PANDAS_INLINE skiplist_t *skiplist_init(int expected_size) { return result; } -static PANDAS_INLINE void skiplist_destroy(skiplist_t *skp) { - if (skp) { - node_destroy(skp->head); - free(skp->tmp_steps); - free(skp->tmp_chain); - free(skp); - } -} - - // 1 if left < right, 0 if left == right, -1 if left > right - static PANDAS_INLINE int _node_cmp(node_t* node, double value){ if (node->is_nil || node->value > value) { return -1; @@ -171,12 +185,12 @@ static PANDAS_INLINE double skiplist_get(skiplist_t *skp, int i, int *ret) { } node = skp->head; - i++; + ++i; for (level = skp->maxlevels - 1; level >= 0; --level) { while (node->width[level] <= i) { - i = i - node->width[level]; + i -= node->width[level]; node = node->next[level]; } } @@ -212,6 +226,9 @@ static PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) { size = int_min(skp->maxlevels, 1 - ((int) Log2(urand()))); newnode = node_init(value, size); + if (!newnode) { + return -1; + } steps = 0; for (level = 0; level < size; ++level) { @@ -231,7 +248,7 @@ static PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) { chain[level]->width[level] += 1; } - skp->size++; + ++(skp->size); return 1; } @@ -273,9 +290,9 @@ static PANDAS_INLINE int skiplist_remove(skiplist_t *skp, double value) { } for (level = size; level < skp->maxlevels; ++level) { - chain[level]->width[level] -= 1; + --(chain[level]->width[level]); } - skp->size--; + --(skp->size); return 1; } diff --git a/pandas/src/skiplist.pxd b/pandas/src/skiplist.pxd index c1221c4741c7b..69e9df5b542aa 100644 --- a/pandas/src/skiplist.pxd +++ b/pandas/src/skiplist.pxd @@ -1,21 +1,22 @@ cdef extern from "skiplist.h": ctypedef struct node_t: + node_t **next + int *width 
double value int is_nil int levels - node_t **next - int *width int ref_count ctypedef struct skiplist_t: node_t *head - int size, maxlevels node_t **tmp_chain int *tmp_steps + int size + int maxlevels - inline skiplist_t* skiplist_init(int) - inline void skiplist_destroy(skiplist_t*) - inline double skiplist_get(skiplist_t*, int, int*) - inline int skiplist_insert(skiplist_t*, double) - inline int skiplist_remove(skiplist_t*, double) + inline skiplist_t* skiplist_init(int) nogil + inline void skiplist_destroy(skiplist_t*) nogil + inline double skiplist_get(skiplist_t*, int, int*) nogil + inline int skiplist_insert(skiplist_t*, double) nogil + inline int skiplist_remove(skiplist_t*, double) nogil diff --git a/pandas/stats/moments.py b/pandas/stats/moments.py index c4791c43278b9..3894dc3b02499 100644 --- a/pandas/stats/moments.py +++ b/pandas/stats/moments.py @@ -646,11 +646,11 @@ def call_cython(arg, window, minp, args=(), kwargs={}, **kwds): return f -rolling_max = _rolling_func(algos.roll_max2, 'Moving maximum.', how='max') -rolling_min = _rolling_func(algos.roll_min2, 'Moving minimum.', how='min') +rolling_max = _rolling_func(algos.roll_max, 'Moving maximum.', how='max') +rolling_min = _rolling_func(algos.roll_min, 'Moving minimum.', how='min') rolling_sum = _rolling_func(algos.roll_sum, 'Moving sum.') rolling_mean = _rolling_func(algos.roll_mean, 'Moving mean.') -rolling_median = _rolling_func(algos.roll_median_cython, 'Moving median.', +rolling_median = _rolling_func(algos.roll_median_c, 'Moving median.', how='median') _ts_std = lambda *a, **kw: _zsqrt(algos.roll_var(*a, **kw)) @@ -888,11 +888,11 @@ def call_cython(arg, window, minp, args=(), kwargs={}, **kwds): return f -expanding_max = _expanding_func(algos.roll_max2, 'Expanding maximum.') -expanding_min = _expanding_func(algos.roll_min2, 'Expanding minimum.') +expanding_max = _expanding_func(algos.roll_max, 'Expanding maximum.') +expanding_min = _expanding_func(algos.roll_min, 'Expanding minimum.') expanding_sum = _expanding_func(algos.roll_sum, 'Expanding sum.') expanding_mean = _expanding_func(algos.roll_mean, 'Expanding mean.') -expanding_median = _expanding_func(algos.roll_median_cython, 'Expanding median.') +expanding_median = _expanding_func(algos.roll_median_c, 'Expanding median.') expanding_std = _expanding_func(_ts_std, 'Expanding standard deviation.', check_minp=_require_min_periods(1),
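The roll_max/roll_min implementations this patch renames (from roll_max2/roll_min2) keep the Bottleneck-derived ring buffer of (value, death) pairs. The same idea in plain Python is a monotonic deque: drop the front entry once its index expires, drop trailing entries that a new value dominates, and read the window maximum off the front. A rough sketch for the max case, with illustrative names rather than pandas API:

from collections import deque

import numpy as np

def rolling_max_sketch(arr, win, minp):
    out = np.full(len(arr), np.nan)
    pairs = deque()                      # (value, death index), values non-increasing
    count = 0                            # non-NaN observations currently in the window
    for i, val in enumerate(arr):
        if val == val:
            count += 1
        else:
            val = -np.inf                # a NaN can never be the window max
        if i >= win and arr[i - win] == arr[i - win]:
            count -= 1                   # a non-NaN value just left the window
        if pairs and pairs[0][1] <= i:
            pairs.popleft()              # the front entry has expired
        while pairs and pairs[-1][0] <= val:
            pairs.pop()                  # dominated entries can never be the max again
        pairs.append((val, i + win))
        if count >= minp:
            out[i] = pairs[0][0]
    return out

The min case is symmetric: flip the comparisons and use +inf in place of -inf for NaNs.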
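roll_var keeps the window variance with Welford-style updates to a running mean and sum of squared deviations, including the combined add-one/remove-one step used once the window is full. A plain-Python sketch of that update, with the NaN and min_periods bookkeeping trimmed for brevity (illustrative name, not a pandas API):

import numpy as np

def rolling_var_sketch(arr, win, ddof=1):
    out = np.full(len(arr), np.nan)
    mean_x = 0.0
    ssqdm_x = 0.0                        # sum of squared deviations from the running mean
    nobs = 0
    for i, val in enumerate(arr):
        if i < win:
            # growing window: standard Welford update for adding one value
            nobs += 1
            delta = val - mean_x
            mean_x += delta / nobs
            ssqdm_x += delta * (val - mean_x)
        else:
            # full window: add arr[i] and drop arr[i - win] in one combined step
            prev = arr[i - win]
            delta = val - prev
            prev_dev = prev - mean_x     # deviation of the leaving value from the old mean
            mean_x += delta / nobs
            val_dev = val - mean_x       # deviation of the new value from the new mean
            ssqdm_x += (val_dev + prev_dev) * delta
        if nobs > ddof:
            out[i] = max(ssqdm_x / (nobs - ddof), 0.0) if nobs > 1 else 0.0
    return out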