diff --git a/doc/source/computation.rst b/doc/source/computation.rst index a37cbc96b2d8c..cd90ba6e9ca1a 100644 --- a/doc/source/computation.rst +++ b/doc/source/computation.rst @@ -459,6 +459,48 @@ default of the index) in a DataFrame. dft dft.rolling('2s', on='foo').sum() +.. _stats.rolling_window.endpoints: + +Rolling Window Endpoints +~~~~~~~~~~~~~~~~~~~~~~~~ + +.. versionadded:: 0.20.0 + +The inclusion of the interval endpoints in rolling window calculations can be specified with the ``closed`` +parameter: + +.. csv-table:: + :header: "``closed``", "Description", "Default for" + :widths: 20, 30, 30 + + ``right``, close right endpoint, time-based windows + ``left``, close left endpoint, + ``both``, close both endpoints, fixed windows + ``neither``, open endpoints, + +For example, having the right endpoint open is useful in many problems that require that there is no contamination +from present information back to past information. This allows the rolling window to compute statistics +"up to that point in time", but not including that point in time. + +.. ipython:: python + + df = pd.DataFrame({'x': [1]*5}, + index = [pd.Timestamp('20130101 09:00:01'), + pd.Timestamp('20130101 09:00:02'), + pd.Timestamp('20130101 09:00:03'), + pd.Timestamp('20130101 09:00:04'), + pd.Timestamp('20130101 09:00:06')]) + + df["right"] = df.rolling('2s', closed='right').x.sum() # default + df["both"] = df.rolling('2s', closed='both').x.sum() + df["left"] = df.rolling('2s', closed='left').x.sum() + df["neither"] = df.rolling('2s', closed='neither').x.sum() + + df + +Currently, this feature is only implemented for time-based windows. +For fixed windows, the closed parameter cannot be set and the rolling window will always have both endpoints closed. + .. _stats.moments.ts-versus-resampling: Time-aware Rolling vs. Resampling diff --git a/doc/source/whatsnew/v0.20.0.txt b/doc/source/whatsnew/v0.20.0.txt index fd1cd3d0022c9..db650dc6569eb 100644 --- a/doc/source/whatsnew/v0.20.0.txt +++ b/doc/source/whatsnew/v0.20.0.txt @@ -319,6 +319,7 @@ To convert a ``SparseDataFrame`` back to sparse SciPy matrix in COO format, you Other Enhancements ^^^^^^^^^^^^^^^^^^ +- ``DataFrame.rolling()`` now accepts the parameter ``closed='right'|'left'|'both'|'neither'`` to choose the rolling window endpoint closedness. See the :ref:`documentation ` (:issue:`13965`) - Integration with the ``feather-format``, including a new top-level ``pd.read_feather()`` and ``DataFrame.to_feather()`` method, see :ref:`here `. - ``Series.str.replace()`` now accepts a callable, as replacement, which is passed to ``re.sub`` (:issue:`15055`) - ``Series.str.replace()`` now accepts a compiled regular expression as a pattern (:issue:`15446`) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index ad56ea44a0dc6..86978a9739ca4 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -5962,12 +5962,12 @@ def _add_series_or_dataframe_operations(cls): @Appender(rwindow.rolling.__doc__) def rolling(self, window, min_periods=None, freq=None, center=False, - win_type=None, on=None, axis=0): + win_type=None, on=None, axis=0, closed=None): axis = self._get_axis_number(axis) return rwindow.rolling(self, window=window, min_periods=min_periods, freq=freq, center=center, win_type=win_type, - on=on, axis=axis) + on=on, axis=axis, closed=closed) cls.rolling = rolling diff --git a/pandas/core/window.py b/pandas/core/window.py index 89d2f5b24d77e..5b84b075ce81a 100644 --- a/pandas/core/window.py +++ b/pandas/core/window.py @@ -56,11 +56,12 @@ class _Window(PandasObject, SelectionMixin): _attributes = ['window', 'min_periods', 'freq', 'center', 'win_type', - 'axis', 'on'] + 'axis', 'on', 'closed'] exclusions = set() def __init__(self, obj, window=None, min_periods=None, freq=None, - center=False, win_type=None, axis=0, on=None, **kwargs): + center=False, win_type=None, axis=0, on=None, closed=None, + **kwargs): if freq is not None: warnings.warn("The freq kw is deprecated and will be removed in a " @@ -71,6 +72,7 @@ def __init__(self, obj, window=None, min_periods=None, freq=None, self.blocks = [] self.obj = obj self.on = on + self.closed = closed self.window = window self.min_periods = min_periods self.freq = freq @@ -101,6 +103,10 @@ def validate(self): if self.min_periods is not None and not \ is_integer(self.min_periods): raise ValueError("min_periods must be an integer") + if self.closed is not None and self.closed not in \ + ['right', 'both', 'left', 'neither']: + raise ValueError("closed must be 'right', 'left', 'both' or " + "'neither'") def _convert_freq(self, how=None): """ resample according to the how, return a new object """ @@ -374,8 +380,14 @@ class Window(_Window): on : string, optional For a DataFrame, column on which to calculate the rolling window, rather than the index + closed : string, default None + Make the interval closed on the 'right', 'left', 'both' or + 'neither' endpoints. + For offset-based windows, it defaults to 'right'. + For fixed windows, defaults to 'both'. Remaining cases not implemented + for fixed windows. - .. versionadded:: 0.19.0 + .. versionadded:: 0.20.0 axis : int or string, default 0 @@ -717,12 +729,12 @@ def _apply(self, func, name=None, window=None, center=None, raise ValueError("we do not support this function " "in _window.{0}".format(func)) - def func(arg, window, min_periods=None): + def func(arg, window, min_periods=None, closed=None): minp = check_minp(min_periods, window) # ensure we are only rolling on floats arg = _ensure_float64(arg) return cfunc(arg, - window, minp, indexi, **kwargs) + window, minp, indexi, closed, **kwargs) # calculation function if center: @@ -731,11 +743,13 @@ def func(arg, window, min_periods=None): def calc(x): return func(np.concatenate((x, additional_nans)), - window, min_periods=self.min_periods) + window, min_periods=self.min_periods, + closed=self.closed) else: def calc(x): - return func(x, window, min_periods=self.min_periods) + return func(x, window, min_periods=self.min_periods, + closed=self.closed) with np.errstate(all='ignore'): if values.ndim > 1: @@ -768,7 +782,8 @@ def count(self): for b in blocks: result = b.notnull().astype(int) result = self._constructor(result, window=window, min_periods=0, - center=self.center).sum() + center=self.center, + closed=self.closed).sum() results.append(result) return self._wrap_results(results, blocks, obj) @@ -789,11 +804,10 @@ def apply(self, func, args=(), kwargs={}): offset = _offset(window, self.center) index, indexi = self._get_index() - def f(arg, window, min_periods): + def f(arg, window, min_periods, closed): minp = _use_window(min_periods, window) - return _window.roll_generic(arg, window, minp, indexi, - offset, func, args, - kwargs) + return _window.roll_generic(arg, window, minp, indexi, closed, + offset, func, args, kwargs) return self._apply(f, func, args=args, kwargs=kwargs, center=False) @@ -864,7 +878,7 @@ def std(self, ddof=1, *args, **kwargs): def f(arg, *args, **kwargs): minp = _require_min_periods(1)(self.min_periods, window) return _zsqrt(_window.roll_var(arg, window, minp, indexi, - ddof)) + self.closed, ddof)) return self._apply(f, 'std', check_minp=_require_min_periods(1), ddof=ddof, **kwargs) @@ -911,7 +925,7 @@ def quantile(self, quantile, **kwargs): def f(arg, *args, **kwargs): minp = _use_window(self.min_periods, window) return _window.roll_quantile(arg, window, minp, indexi, - quantile) + self.closed, quantile) return self._apply(f, 'quantile', quantile=quantile, **kwargs) @@ -1044,6 +1058,10 @@ def validate(self): elif self.window < 0: raise ValueError("window must be non-negative") + if not self.is_datetimelike and self.closed is not None: + raise ValueError("closed only implemented for datetimelike " + "and offset based windows") + def _validate_monotonic(self): """ validate on is monotonic """ if not self._on.is_monotonic: diff --git a/pandas/core/window.pyx b/pandas/core/window.pyx index a06e616002ee2..3bb8abe26c781 100644 --- a/pandas/core/window.pyx +++ b/pandas/core/window.pyx @@ -158,9 +158,14 @@ cdef class MockFixedWindowIndexer(WindowIndexer): index of the input floor: optional unit for flooring + left_closed: bint + left endpoint closedness + right_closed: bint + right endpoint closedness """ def __init__(self, ndarray input, int64_t win, int64_t minp, + bint left_closed, bint right_closed, object index=None, object floor=None): assert index is None @@ -191,9 +196,14 @@ cdef class FixedWindowIndexer(WindowIndexer): index of the input floor: optional unit for flooring the unit + left_closed: bint + left endpoint closedness + right_closed: bint + right endpoint closedness """ def __init__(self, ndarray input, int64_t win, int64_t minp, + bint left_closed, bint right_closed, object index=None, object floor=None): cdef ndarray start_s, start_e, end_s, end_e @@ -229,10 +239,16 @@ cdef class VariableWindowIndexer(WindowIndexer): min number of obs in a window to consider non-NaN index: ndarray index of the input + left_closed: bint + left endpoint closedness + True if the left endpoint is closed, False if open + right_closed: bint + right endpoint closedness + True if the right endpoint is closed, False if open """ def __init__(self, ndarray input, int64_t win, int64_t minp, - ndarray index): + bint left_closed, bint right_closed, ndarray index): self.is_variable = 1 self.N = len(index) @@ -244,12 +260,13 @@ cdef class VariableWindowIndexer(WindowIndexer): self.end = np.empty(self.N, dtype='int64') self.end.fill(-1) - self.build(index, win) + self.build(index, win, left_closed, right_closed) # max window size self.win = (self.end - self.start).max() - def build(self, ndarray[int64_t] index, int64_t win): + def build(self, ndarray[int64_t] index, int64_t win, bint left_closed, + bint right_closed): cdef: ndarray[int64_t] start, end @@ -261,7 +278,13 @@ cdef class VariableWindowIndexer(WindowIndexer): N = self.N start[0] = 0 - end[0] = 1 + + # right endpoint is closed + if right_closed: + end[0] = 1 + # right endpoint is open + else: + end[0] = 0 with nogil: @@ -271,6 +294,10 @@ cdef class VariableWindowIndexer(WindowIndexer): end_bound = index[i] start_bound = index[i] - win + # left endpoint is closed + if left_closed: + start_bound -= 1 + # advance the start bound until we are # within the constraint start[i] = i @@ -286,9 +313,13 @@ cdef class VariableWindowIndexer(WindowIndexer): else: end[i] = end[i - 1] + # right endpoint is open + if not right_closed: + end[i] -= 1 + -def get_window_indexer(input, win, minp, index, floor=None, - use_mock=True): +def get_window_indexer(input, win, minp, index, closed, + floor=None, use_mock=True): """ return the correct window indexer for the computation @@ -299,6 +330,10 @@ def get_window_indexer(input, win, minp, index, floor=None, minp: integer, minimum periods index: 1d ndarray, optional index to the input array + closed: string, default None + {'right', 'left', 'both', 'neither'} + window endpoint closedness. Defaults to 'right' in + VariableWindowIndexer and to 'both' in FixedWindowIndexer floor: optional unit for flooring the unit use_mock: boolean, default True @@ -307,18 +342,38 @@ def get_window_indexer(input, win, minp, index, floor=None, compat Indexer that allows us to use a standard code path with all of the indexers. + Returns ------- tuple of 1d int64 ndarrays of the offsets & data about the window """ + cdef: + bint left_closed = False + bint right_closed = False + + assert closed is None or closed in ['right', 'left', 'both', 'neither'] + + # if windows is variable, default is 'right', otherwise default is 'both' + if closed is None: + closed = 'right' if index is not None else 'both' + + if closed in ['right', 'both']: + right_closed = True + + if closed in ['left', 'both']: + left_closed = True + if index is not None: - indexer = VariableWindowIndexer(input, win, minp, index) + indexer = VariableWindowIndexer(input, win, minp, left_closed, + right_closed, index) elif use_mock: - indexer = MockFixedWindowIndexer(input, win, minp, index, floor) + indexer = MockFixedWindowIndexer(input, win, minp, left_closed, + right_closed, index, floor) else: - indexer = FixedWindowIndexer(input, win, minp, index, floor) + indexer = FixedWindowIndexer(input, win, minp, left_closed, + right_closed, index, floor) return indexer.get_data() # ---------------------------------------------------------------------- @@ -327,7 +382,7 @@ def get_window_indexer(input, win, minp, index, floor=None, def roll_count(ndarray[double_t] input, int64_t win, int64_t minp, - object index): + object index, object closed): cdef: double val, count_x = 0.0 int64_t s, e, nobs, N @@ -336,7 +391,7 @@ def roll_count(ndarray[double_t] input, int64_t win, int64_t minp, ndarray[double_t] output start, end, N, win, minp, _ = get_window_indexer(input, win, - minp, index) + minp, index, closed) output = np.empty(N, dtype=float) with nogil: @@ -408,7 +463,7 @@ cdef inline void remove_sum(double val, int64_t *nobs, double *sum_x) nogil: def roll_sum(ndarray[double_t] input, int64_t win, int64_t minp, - object index): + object index, object closed): cdef: double val, prev_x, sum_x = 0 int64_t s, e @@ -418,7 +473,8 @@ def roll_sum(ndarray[double_t] input, int64_t win, int64_t minp, ndarray[double_t] output start, end, N, win, minp, is_variable = get_window_indexer(input, win, - minp, index) + minp, index, + closed) output = np.empty(N, dtype=float) # for performance we are going to iterate @@ -523,7 +579,7 @@ cdef inline void remove_mean(double val, Py_ssize_t *nobs, double *sum_x, def roll_mean(ndarray[double_t] input, int64_t win, int64_t minp, - object index): + object index, object closed): cdef: double val, prev_x, result, sum_x = 0 int64_t s, e @@ -533,7 +589,8 @@ def roll_mean(ndarray[double_t] input, int64_t win, int64_t minp, ndarray[double_t] output start, end, N, win, minp, is_variable = get_window_indexer(input, win, - minp, index) + minp, index, + closed) output = np.empty(N, dtype=float) # for performance we are going to iterate @@ -647,7 +704,7 @@ cdef inline void remove_var(double val, double *nobs, double *mean_x, def roll_var(ndarray[double_t] input, int64_t win, int64_t minp, - object index, int ddof=1): + object index, object closed, int ddof=1): """ Numerically stable implementation using Welford's method. """ @@ -660,7 +717,8 @@ def roll_var(ndarray[double_t] input, int64_t win, int64_t minp, ndarray[double_t] output start, end, N, win, minp, is_variable = get_window_indexer(input, win, - minp, index) + minp, index, + closed) output = np.empty(N, dtype=float) # Check for windows larger than array, addresses #7297 @@ -789,7 +847,7 @@ cdef inline void remove_skew(double val, int64_t *nobs, double *x, double *xx, def roll_skew(ndarray[double_t] input, int64_t win, int64_t minp, - object index): + object index, object closed): cdef: double val, prev double x = 0, xx = 0, xxx = 0 @@ -800,7 +858,8 @@ def roll_skew(ndarray[double_t] input, int64_t win, int64_t minp, ndarray[double_t] output start, end, N, win, minp, is_variable = get_window_indexer(input, win, - minp, index) + minp, index, + closed) output = np.empty(N, dtype=float) if is_variable: @@ -916,7 +975,7 @@ cdef inline void remove_kurt(double val, int64_t *nobs, double *x, double *xx, def roll_kurt(ndarray[double_t] input, int64_t win, int64_t minp, - object index): + object index, object closed): cdef: double val, prev double x = 0, xx = 0, xxx = 0, xxxx = 0 @@ -927,7 +986,8 @@ def roll_kurt(ndarray[double_t] input, int64_t win, int64_t minp, ndarray[double_t] output start, end, N, win, minp, is_variable = get_window_indexer(input, win, - minp, index) + minp, index, + closed) output = np.empty(N, dtype=float) if is_variable: @@ -985,11 +1045,11 @@ def roll_kurt(ndarray[double_t] input, int64_t win, int64_t minp, def roll_median_c(ndarray[float64_t] input, int64_t win, int64_t minp, - object index): + object index, object closed): cdef: double val, res, prev - bint err=0, is_variable - int ret=0 + bint err = 0, is_variable + int ret = 0 skiplist_t *sl Py_ssize_t i, j int64_t nobs = 0, N, s, e @@ -1001,7 +1061,7 @@ def roll_median_c(ndarray[float64_t] input, int64_t win, int64_t minp, # actual skiplist ops outweigh any window computation costs start, end, N, win, minp, is_variable = get_window_indexer( input, win, - minp, index, + minp, index, closed, use_mock=False) output = np.empty(N, dtype=float) @@ -1111,7 +1171,7 @@ cdef inline numeric calc_mm(int64_t minp, Py_ssize_t nobs, def roll_max(ndarray[numeric] input, int64_t win, int64_t minp, - object index): + object index, object closed): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -1123,12 +1183,15 @@ def roll_max(ndarray[numeric] input, int64_t win, int64_t minp, is below this, output a NaN index: ndarray, optional index for window computation + closed: 'right', 'left', 'both', 'neither' + make the interval closed on the right, left, + both or neither endpoints """ - return _roll_min_max(input, win, minp, index, is_max=1) + return _roll_min_max(input, win, minp, index, closed=closed, is_max=1) def roll_min(ndarray[numeric] input, int64_t win, int64_t minp, - object index): + object index, object closed): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -1141,11 +1204,11 @@ def roll_min(ndarray[numeric] input, int64_t win, int64_t minp, index: ndarray, optional index for window computation """ - return _roll_min_max(input, win, minp, index, is_max=0) + return _roll_min_max(input, win, minp, index, is_max=0, closed=closed) cdef _roll_min_max(ndarray[numeric] input, int64_t win, int64_t minp, - object index, bint is_max): + object index, object closed, bint is_max): """ Moving min/max of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -1170,7 +1233,7 @@ cdef _roll_min_max(ndarray[numeric] input, int64_t win, int64_t minp, starti, endi, N, win, minp, is_variable = get_window_indexer( input, win, - minp, index) + minp, index, closed) output = np.empty(N, dtype=input.dtype) @@ -1272,7 +1335,8 @@ cdef _roll_min_max(ndarray[numeric] input, int64_t win, int64_t minp, def roll_quantile(ndarray[float64_t, cast=True] input, int64_t win, - int64_t minp, object index, double quantile): + int64_t minp, object index, object closed, + double quantile): """ O(N log(window)) implementation using skip list """ @@ -1292,7 +1356,7 @@ def roll_quantile(ndarray[float64_t, cast=True] input, int64_t win, # actual skiplist ops outweigh any window computation costs start, end, N, win, minp, is_variable = get_window_indexer( input, win, - minp, index, + minp, index, closed, use_mock=False) output = np.empty(N, dtype=float) skiplist = IndexableSkiplist(win) @@ -1335,7 +1399,7 @@ def roll_quantile(ndarray[float64_t, cast=True] input, int64_t win, def roll_generic(ndarray[float64_t, cast=True] input, - int64_t win, int64_t minp, object index, + int64_t win, int64_t minp, object index, object closed, int offset, object func, object args, object kwargs): cdef: @@ -1355,12 +1419,13 @@ def roll_generic(ndarray[float64_t, cast=True] input, start, end, N, win, minp, is_variable = get_window_indexer(input, win, minp, index, + closed, floor=0) output = np.empty(N, dtype=float) counts = roll_sum(np.concatenate([np.isfinite(input).astype(float), np.array([0.] * offset)]), - win, minp, index)[offset:] + win, minp, index, closed)[offset:] if is_variable: diff --git a/pandas/tests/test_window.py b/pandas/tests/test_window.py index 5fc31e9321f31..3929aba858797 100644 --- a/pandas/tests/test_window.py +++ b/pandas/tests/test_window.py @@ -431,6 +431,12 @@ def test_numpy_compat(self): tm.assertRaisesRegexp(UnsupportedFunctionCall, msg, getattr(r, func), dtype=np.float64) + def test_closed(self): + df = DataFrame({'A': [0, 1, 2, 3, 4]}) + # closed only allowed for datetimelike + with self.assertRaises(ValueError): + df.rolling(window=3, closed='neither') + class TestExpanding(Base): @@ -3385,6 +3391,45 @@ def test_min_periods(self): result = df.rolling('2s', min_periods=1).sum() tm.assert_frame_equal(result, expected) + def test_closed(self): + + # xref GH13965 + + df = DataFrame({'A': [1] * 5}, + index=[pd.Timestamp('20130101 09:00:01'), + pd.Timestamp('20130101 09:00:02'), + pd.Timestamp('20130101 09:00:03'), + pd.Timestamp('20130101 09:00:04'), + pd.Timestamp('20130101 09:00:06')]) + + # closed must be 'right', 'left', 'both', 'neither' + with self.assertRaises(ValueError): + self.regular.rolling(window='2s', closed="blabla") + + expected = df.copy() + expected["A"] = [1.0, 2, 2, 2, 1] + result = df.rolling('2s', closed='right').sum() + tm.assert_frame_equal(result, expected) + + # default should be 'right' + result = df.rolling('2s').sum() + tm.assert_frame_equal(result, expected) + + expected = df.copy() + expected["A"] = [1.0, 2, 3, 3, 2] + result = df.rolling('2s', closed='both').sum() + tm.assert_frame_equal(result, expected) + + expected = df.copy() + expected["A"] = [np.nan, 1.0, 2, 2, 1] + result = df.rolling('2s', closed='left').sum() + tm.assert_frame_equal(result, expected) + + expected = df.copy() + expected["A"] = [np.nan, 1.0, 1, 1, np.nan] + result = df.rolling('2s', closed='neither').sum() + tm.assert_frame_equal(result, expected) + def test_ragged_sum(self): df = self.ragged