Skip to content

Commit

Permalink
PERF: Improve performance in rolling.mean(engine="numba") (#43612)
Browse files Browse the repository at this point in the history
  • Loading branch information
mroeschke authored Sep 23, 2021
1 parent f3d4817 commit ffbeda7
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 21 deletions.
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v1.4.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ Performance improvements
- Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`)
- :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`)
- Performance improvement in :meth:`SparseArray.take` with ``allow_fill=False`` (:issue:`43654`)
-
- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:`43612`)

.. ---------------------------------------------------------------------------
Expand Down
Empty file added pandas/core/_numba/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions pandas/core/_numba/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

from typing import Callable

import numpy as np

from pandas._typing import Scalar
from pandas.compat._optional import import_optional_dependency

from pandas.core.util.numba_ import (
NUMBA_FUNC_CACHE,
get_jit_arguments,
)


def generate_shared_aggregator(
func: Callable[..., Scalar],
engine_kwargs: dict[str, bool] | None,
cache_key_str: str,
):
"""
Generate a Numba function that loops over the columns 2D object and applies
a 1D numba kernel over each column.
Parameters
----------
func : function
aggregation function to be applied to each column
engine_kwargs : dict
dictionary of arguments to be passed into numba.jit
cache_key_str: str
string to access the compiled function of the form
<caller_type>_<aggregation_type> e.g. rolling_mean, groupby_mean
Returns
-------
Numba function
"""
nopython, nogil, parallel = get_jit_arguments(engine_kwargs, None)

cache_key = (func, cache_key_str)
if cache_key in NUMBA_FUNC_CACHE:
return NUMBA_FUNC_CACHE[cache_key]

numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def column_looper(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
):
result = np.empty((len(start), values.shape[1]), dtype=np.float64)
for i in numba.prange(values.shape[1]):
result[:, i] = func(values[:, i], start, end, min_periods)
return result

return column_looper
3 changes: 3 additions & 0 deletions pandas/core/_numba/kernels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from pandas.core._numba.kernels.mean_ import sliding_mean

__all__ = ["sliding_mean"]
119 changes: 119 additions & 0 deletions pandas/core/_numba/kernels/mean_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Numba 1D aggregation kernels that can be shared by
* Dataframe / Series
* groupby
* rolling / expanding
Mirrors pandas/_libs/window/aggregation.pyx
"""
from __future__ import annotations

import numba
import numpy as np


@numba.jit(nopython=True, nogil=True, parallel=False)
def is_monotonic_increasing(bounds: np.ndarray) -> bool:
"""Check if int64 values are monotonically increasing."""
n = len(bounds)
if n < 2:
return True
prev = bounds[0]
for i in range(1, n):
cur = bounds[i]
if cur < prev:
return False
prev = cur
return True


@numba.jit(nopython=True, nogil=True, parallel=False)
def add_mean(
val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float
) -> tuple[int, float, int, float]:
if not np.isnan(val):
nobs += 1
y = val - compensation
t = sum_x + y
compensation = t - sum_x - y
sum_x = t
if val < 0:
neg_ct += 1
return nobs, sum_x, neg_ct, compensation


@numba.jit(nopython=True, nogil=True, parallel=False)
def remove_mean(
val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float
) -> tuple[int, float, int, float]:
if not np.isnan(val):
nobs -= 1
y = -val - compensation
t = sum_x + y
compensation = t - sum_x - y
sum_x = t
if val < 0:
neg_ct -= 1
return nobs, sum_x, neg_ct, compensation


@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_mean(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
) -> np.ndarray:
N = len(start)
nobs = 0
sum_x = 0.0
neg_ct = 0
compensation_add = 0.0
compensation_remove = 0.0

is_monotonic_increasing_bounds = is_monotonic_increasing(
start
) and is_monotonic_increasing(end)

output = np.empty(N, dtype=np.float64)

for i in range(N):
s = start[i]
e = end[i]
if i == 0 or not is_monotonic_increasing_bounds:
for j in range(s, e):
val = values[j]
nobs, sum_x, neg_ct, compensation_add = add_mean(
val, nobs, sum_x, neg_ct, compensation_add
)
else:
for j in range(start[i - 1], s):
val = values[j]
nobs, sum_x, neg_ct, compensation_remove = remove_mean(
val, nobs, sum_x, neg_ct, compensation_remove
)

for j in range(end[i - 1], e):
val = values[j]
nobs, sum_x, neg_ct, compensation_add = add_mean(
val, nobs, sum_x, neg_ct, compensation_add
)

if nobs >= min_periods and nobs > 0:
result = sum_x / nobs
if neg_ct == 0 and result < 0:
result = 0
elif neg_ct == nobs and result > 0:
result = 0
else:
result = np.nan

output[i] = result

if not is_monotonic_increasing_bounds:
nobs = 0
sum_x = 0.0
neg_ct = 0
compensation_remove = 0.0

return output
54 changes: 47 additions & 7 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from pandas.core.dtypes.missing import notna

from pandas.core._numba import executor
from pandas.core.algorithms import factorize
from pandas.core.apply import ResamplerWindowApply
from pandas.core.arrays import ExtensionArray
Expand Down Expand Up @@ -576,6 +577,44 @@ def calc(x):
else:
return self._apply_tablewise(homogeneous_func, name)

def _numba_apply(
self,
func: Callable[..., Any],
numba_cache_key_str: str,
engine_kwargs: dict[str, bool] | None = None,
):
window_indexer = self._get_window_indexer()
min_periods = (
self.min_periods
if self.min_periods is not None
else window_indexer.window_size
)
obj = self._create_data(self._selected_obj)
if self.axis == 1:
obj = obj.T
values = self._prep_values(obj.to_numpy())
if values.ndim == 1:
values = values.reshape(-1, 1)
start, end = window_indexer.get_window_bounds(
num_values=len(values),
min_periods=min_periods,
center=self.center,
closed=self.closed,
)
aggregator = executor.generate_shared_aggregator(
func, engine_kwargs, numba_cache_key_str
)
result = aggregator(values, start, end, min_periods)
NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator
result = result.T if self.axis == 1 else result
if obj.ndim == 1:
result = result.squeeze()
out = obj._constructor(result, index=obj.index, name=obj.name)
return out
else:
out = obj._constructor(result, index=obj.index, columns=obj.columns)
return self._resolve_output(out, obj)

def aggregate(self, func, *args, **kwargs):
result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
if result is None:
Expand Down Expand Up @@ -1331,15 +1370,16 @@ def mean(
if maybe_use_numba(engine):
if self.method == "table":
func = generate_manual_numpy_nan_agg_with_axis(np.nanmean)
return self.apply(
func,
raw=True,
engine=engine,
engine_kwargs=engine_kwargs,
)
else:
func = np.nanmean
from pandas.core._numba.kernels import sliding_mean

return self.apply(
func,
raw=True,
engine=engine,
engine_kwargs=engine_kwargs,
)
return self._numba_apply(sliding_mean, "rolling_mean", engine_kwargs)
window_func = window_aggregations.roll_mean
return self._apply(window_func, name="mean", **kwargs)

Expand Down
34 changes: 21 additions & 13 deletions pandas/tests/window/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,44 +43,52 @@ def f(x, *args):
)
tm.assert_series_equal(result, expected)

@pytest.mark.parametrize(
"data", [DataFrame(np.eye(5)), Series(range(5), name="foo")]
)
def test_numba_vs_cython_rolling_methods(
self, nogil, parallel, nopython, arithmetic_numba_supported_operators
self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
):

method = arithmetic_numba_supported_operators

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

df = DataFrame(np.eye(5))
roll = df.rolling(2)
roll = data.rolling(2)
result = getattr(roll, method)(engine="numba", engine_kwargs=engine_kwargs)
expected = getattr(roll, method)(engine="cython")

# Check the cache
assert (getattr(np, f"nan{method}"), "Rolling_apply_single") in NUMBA_FUNC_CACHE
if method != "mean":
assert (
getattr(np, f"nan{method}"),
"Rolling_apply_single",
) in NUMBA_FUNC_CACHE

tm.assert_frame_equal(result, expected)
tm.assert_equal(result, expected)

@pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))])
def test_numba_vs_cython_expanding_methods(
self, nogil, parallel, nopython, arithmetic_numba_supported_operators
self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
):

method = arithmetic_numba_supported_operators

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

df = DataFrame(np.eye(5))
expand = df.expanding()
data = DataFrame(np.eye(5))
expand = data.expanding()
result = getattr(expand, method)(engine="numba", engine_kwargs=engine_kwargs)
expected = getattr(expand, method)(engine="cython")

# Check the cache
assert (
getattr(np, f"nan{method}"),
"Expanding_apply_single",
) in NUMBA_FUNC_CACHE
if method != "mean":
assert (
getattr(np, f"nan{method}"),
"Expanding_apply_single",
) in NUMBA_FUNC_CACHE

tm.assert_frame_equal(result, expected)
tm.assert_equal(result, expected)

@pytest.mark.parametrize("jit", [True, False])
def test_cache_apply(self, jit, nogil, parallel, nopython):
Expand Down

0 comments on commit ffbeda7

Please sign in to comment.