From ffbeda7284a6ebc7ba4ecdd7e7e16f9812a36a06 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 23 Sep 2021 07:49:35 -0700 Subject: [PATCH] PERF: Improve performance in rolling.mean(engine="numba") (#43612) --- doc/source/whatsnew/v1.4.0.rst | 2 +- pandas/core/_numba/__init__.py | 0 pandas/core/_numba/executor.py | 59 ++++++++++++ pandas/core/_numba/kernels/__init__.py | 3 + pandas/core/_numba/kernels/mean_.py | 119 +++++++++++++++++++++++++ pandas/core/window/rolling.py | 54 +++++++++-- pandas/tests/window/test_numba.py | 34 ++++--- 7 files changed, 250 insertions(+), 21 deletions(-) create mode 100644 pandas/core/_numba/__init__.py create mode 100644 pandas/core/_numba/executor.py create mode 100644 pandas/core/_numba/kernels/__init__.py create mode 100644 pandas/core/_numba/kernels/mean_.py diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 6a342c059203b..f6e90a3341424 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -357,7 +357,7 @@ Performance improvements - Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`) - :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`) - Performance improvement in :meth:`SparseArray.take` with ``allow_fill=False`` (:issue:`43654`) -- +- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:`43612`) .. 
--------------------------------------------------------------------------- diff --git a/pandas/core/_numba/__init__.py b/pandas/core/_numba/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py new file mode 100644 index 0000000000000..c666bb1a0ad4b --- /dev/null +++ b/pandas/core/_numba/executor.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import Callable + +import numpy as np + +from pandas._typing import Scalar +from pandas.compat._optional import import_optional_dependency + +from pandas.core.util.numba_ import ( + NUMBA_FUNC_CACHE, + get_jit_arguments, +) + + +def generate_shared_aggregator( + func: Callable[..., Scalar], + engine_kwargs: dict[str, bool] | None, + cache_key_str: str, +): + """ + Generate a Numba function that loops over the columns of a 2D object and applies + a 1D numba kernel over each column. + + Parameters + ---------- + func : function + aggregation function to be applied to each column + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + cache_key_str: str + string to access the compiled function of the form + <caller_type>_<aggregation_type> e.g. 
rolling_mean, groupby_mean + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs, None) + + cache_key = (func, cache_key_str) + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba = import_optional_dependency("numba") + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def column_looper( + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, + ): + result = np.empty((len(start), values.shape[1]), dtype=np.float64) + for i in numba.prange(values.shape[1]): + result[:, i] = func(values[:, i], start, end, min_periods) + return result + + return column_looper diff --git a/pandas/core/_numba/kernels/__init__.py b/pandas/core/_numba/kernels/__init__.py new file mode 100644 index 0000000000000..eb43de1e0d979 --- /dev/null +++ b/pandas/core/_numba/kernels/__init__.py @@ -0,0 +1,3 @@ +from pandas.core._numba.kernels.mean_ import sliding_mean + +__all__ = ["sliding_mean"] diff --git a/pandas/core/_numba/kernels/mean_.py b/pandas/core/_numba/kernels/mean_.py new file mode 100644 index 0000000000000..32ea505513ed0 --- /dev/null +++ b/pandas/core/_numba/kernels/mean_.py @@ -0,0 +1,119 @@ +""" +Numba 1D aggregation kernels that can be shared by +* Dataframe / Series +* groupby +* rolling / expanding + +Mirrors pandas/_libs/window/aggregation.pyx +""" +from __future__ import annotations + +import numba +import numpy as np + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def is_monotonic_increasing(bounds: np.ndarray) -> bool: + """Check if int64 values are monotonically increasing.""" + n = len(bounds) + if n < 2: + return True + prev = bounds[0] + for i in range(1, n): + cur = bounds[i] + if cur < prev: + return False + prev = cur + return True + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def add_mean( + val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float +) -> tuple[int, float, int, float]: + if not 
np.isnan(val): + nobs += 1 + y = val - compensation + t = sum_x + y + compensation = t - sum_x - y + sum_x = t + if val < 0: + neg_ct += 1 + return nobs, sum_x, neg_ct, compensation + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def remove_mean( + val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float +) -> tuple[int, float, int, float]: + if not np.isnan(val): + nobs -= 1 + y = -val - compensation + t = sum_x + y + compensation = t - sum_x - y + sum_x = t + if val < 0: + neg_ct -= 1 + return nobs, sum_x, neg_ct, compensation + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def sliding_mean( + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, +) -> np.ndarray: + N = len(start) + nobs = 0 + sum_x = 0.0 + neg_ct = 0 + compensation_add = 0.0 + compensation_remove = 0.0 + + is_monotonic_increasing_bounds = is_monotonic_increasing( + start + ) and is_monotonic_increasing(end) + + output = np.empty(N, dtype=np.float64) + + for i in range(N): + s = start[i] + e = end[i] + if i == 0 or not is_monotonic_increasing_bounds: + for j in range(s, e): + val = values[j] + nobs, sum_x, neg_ct, compensation_add = add_mean( + val, nobs, sum_x, neg_ct, compensation_add + ) + else: + for j in range(start[i - 1], s): + val = values[j] + nobs, sum_x, neg_ct, compensation_remove = remove_mean( + val, nobs, sum_x, neg_ct, compensation_remove + ) + + for j in range(end[i - 1], e): + val = values[j] + nobs, sum_x, neg_ct, compensation_add = add_mean( + val, nobs, sum_x, neg_ct, compensation_add + ) + + if nobs >= min_periods and nobs > 0: + result = sum_x / nobs + if neg_ct == 0 and result < 0: + result = 0 + elif neg_ct == nobs and result > 0: + result = 0 + else: + result = np.nan + + output[i] = result + + if not is_monotonic_increasing_bounds: + nobs = 0 + sum_x = 0.0 + neg_ct = 0 + compensation_remove = 0.0 + + return output diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 
503a884578e8b..2060f2d701276 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -49,6 +49,7 @@ ) from pandas.core.dtypes.missing import notna +from pandas.core._numba import executor from pandas.core.algorithms import factorize from pandas.core.apply import ResamplerWindowApply from pandas.core.arrays import ExtensionArray @@ -576,6 +577,44 @@ def calc(x): else: return self._apply_tablewise(homogeneous_func, name) + def _numba_apply( + self, + func: Callable[..., Any], + numba_cache_key_str: str, + engine_kwargs: dict[str, bool] | None = None, + ): + window_indexer = self._get_window_indexer() + min_periods = ( + self.min_periods + if self.min_periods is not None + else window_indexer.window_size + ) + obj = self._create_data(self._selected_obj) + if self.axis == 1: + obj = obj.T + values = self._prep_values(obj.to_numpy()) + if values.ndim == 1: + values = values.reshape(-1, 1) + start, end = window_indexer.get_window_bounds( + num_values=len(values), + min_periods=min_periods, + center=self.center, + closed=self.closed, + ) + aggregator = executor.generate_shared_aggregator( + func, engine_kwargs, numba_cache_key_str + ) + result = aggregator(values, start, end, min_periods) + NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator + result = result.T if self.axis == 1 else result + if obj.ndim == 1: + result = result.squeeze() + out = obj._constructor(result, index=obj.index, name=obj.name) + return out + else: + out = obj._constructor(result, index=obj.index, columns=obj.columns) + return self._resolve_output(out, obj) + def aggregate(self, func, *args, **kwargs): result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg() if result is None: @@ -1331,15 +1370,16 @@ def mean( if maybe_use_numba(engine): if self.method == "table": func = generate_manual_numpy_nan_agg_with_axis(np.nanmean) + return self.apply( + func, + raw=True, + engine=engine, + engine_kwargs=engine_kwargs, + ) else: - func = np.nanmean + from 
pandas.core._numba.kernels import sliding_mean - return self.apply( - func, - raw=True, - engine=engine, - engine_kwargs=engine_kwargs, - ) + return self._numba_apply(sliding_mean, "rolling_mean", engine_kwargs) window_func = window_aggregations.roll_mean return self._apply(window_func, name="mean", **kwargs) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 1086857f38b62..af2ca7270c982 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -43,44 +43,52 @@ def f(x, *args): ) tm.assert_series_equal(result, expected) + @pytest.mark.parametrize( + "data", [DataFrame(np.eye(5)), Series(range(5), name="foo")] + ) def test_numba_vs_cython_rolling_methods( - self, nogil, parallel, nopython, arithmetic_numba_supported_operators + self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators ): method = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - df = DataFrame(np.eye(5)) - roll = df.rolling(2) + roll = data.rolling(2) result = getattr(roll, method)(engine="numba", engine_kwargs=engine_kwargs) expected = getattr(roll, method)(engine="cython") # Check the cache - assert (getattr(np, f"nan{method}"), "Rolling_apply_single") in NUMBA_FUNC_CACHE + if method != "mean": + assert ( + getattr(np, f"nan{method}"), + "Rolling_apply_single", + ) in NUMBA_FUNC_CACHE - tm.assert_frame_equal(result, expected) + tm.assert_equal(result, expected) + @pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))]) def test_numba_vs_cython_expanding_methods( - self, nogil, parallel, nopython, arithmetic_numba_supported_operators + self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators ): method = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - df = DataFrame(np.eye(5)) - expand = df.expanding() + data = DataFrame(np.eye(5)) + expand = 
data.expanding() result = getattr(expand, method)(engine="numba", engine_kwargs=engine_kwargs) expected = getattr(expand, method)(engine="cython") # Check the cache - assert ( - getattr(np, f"nan{method}"), - "Expanding_apply_single", - ) in NUMBA_FUNC_CACHE + if method != "mean": + assert ( + getattr(np, f"nan{method}"), + "Expanding_apply_single", + ) in NUMBA_FUNC_CACHE - tm.assert_frame_equal(result, expected) + tm.assert_equal(result, expected) @pytest.mark.parametrize("jit", [True, False]) def test_cache_apply(self, jit, nogil, parallel, nopython):