RollingStatisticsTransformer

yohou.preprocessing.window.RollingStatisticsTransformer

Bases: BaseTransformer

Compute rolling window statistics for time series.

This transformer computes one or more rolling statistics (mean, standard deviation, min, max, median, sum, variance, and the 25th and 75th percentiles) over sliding windows. It is a convenience wrapper around polars rolling functions with a sklearn-compatible interface.

Parameters

window_size : int, default=7
    Size of the rolling window. Must be >= 1.

statistics : str or list of str, default="mean"
    Statistic(s) to compute. Options:

    - "mean": Rolling mean
    - "std": Rolling standard deviation
    - "min": Rolling minimum
    - "max": Rolling maximum
    - "median": Rolling median
    - "sum": Rolling sum
    - "var": Rolling variance
    - "q25": 25th percentile
    - "q75": 75th percentile
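
The way a `statistics` value is turned into a validated list can be sketched in plain Python. This is a minimal illustration that mirrors the checks in the class's `_fit` method shown in the source further down; `normalize_statistics` and `VALID_STATISTICS` are hypothetical stand-alone names, not part of the library.

```python
# Hypothetical stand-alone helper; it mirrors the checks in the class's _fit method.
VALID_STATISTICS = {"mean", "std", "min", "max", "median", "sum", "var", "q25", "q75"}


def normalize_statistics(statistics):
    """Return `statistics` as a list, raising ValueError on unknown names."""
    # A single name is wrapped in a list; any other iterable is copied.
    names = [statistics] if isinstance(statistics, str) else list(statistics)
    invalid = set(names) - VALID_STATISTICS
    if invalid:
        raise ValueError(f"Invalid statistics: {sorted(invalid)}")
    return names


print(normalize_statistics("mean"))          # ['mean']
print(normalize_statistics(["min", "max"]))  # ['min', 'max']
```

In the class itself the normalized list is stored as `statistics_` during fit, so an unknown name such as `"mode"` should only surface as a `ValueError` once `fit` is called.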

Attributes

n_features_in_ : int
    Number of features seen during fit.

feature_names_in_ : list of str
    Names of features seen during fit.

statistics_ : list of str
    Effective list of statistics to compute.

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> from yohou.preprocessing import RollingStatisticsTransformer
>>> times = pl.datetime_range(
...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 10), interval="1d", eager=True
... )
>>> X = pl.DataFrame({"time": times, "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})
>>> # Compute rolling mean with window size 3
>>> transformer = RollingStatisticsTransformer(window_size=3, statistics="mean")
>>> transformer.fit(X)
RollingStatisticsTransformer(window_size=3)
>>> X_t = transformer.transform(X)
>>> len(X_t)
8
>>> "value_mean" in X_t.columns
True
>>> # Multiple statistics
>>> transformer = RollingStatisticsTransformer(window_size=3, statistics=["mean", "std", "min", "max"])
>>> transformer.fit(X)
RollingStatisticsTransformer(...)
>>> X_t = transformer.transform(X)
>>> len([c for c in X_t.columns if c != "time"])
4

See Also

SlidingWindowFunctionTransformer : Apply custom function over windows.
LagTransformer : Create lagged features.

Notes

Rolling statistics are computed via native polars rolling expressions (rolling_mean, rolling_std, etc.), which are significantly faster than Python-level iteration. Quantile statistics (q25, q75) use rolling_quantile. No interpolation argument is passed, so polars' default interpolation ("nearest") applies rather than linear interpolation.

The first window_size - 1 rows have incomplete windows and would be null, so they are dropped from the output. This is why observation_horizon equals window_size - 1: a window of 3 over 10 input rows yields 8 output rows.
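
The row-dropping rule can be checked with a small stdlib-only sketch. This is an illustration of the semantics only: the transformer itself delegates to polars rolling expressions, and `rolling_mean` below is a hypothetical helper written for this example.

```python
def rolling_mean(values, window_size):
    """Return rolling means; positions with an incomplete window hold None."""
    out = []
    for i in range(len(values)):
        if i + 1 < window_size:
            out.append(None)  # fewer than window_size observations so far
        else:
            window = values[i + 1 - window_size : i + 1]
            out.append(sum(window) / window_size)
    return out


values = [float(v) for v in range(1, 11)]  # 1.0 .. 10.0, as in the Examples section
window_size = 3

full = rolling_mean(values, window_size)
observation_horizon = window_size - 1
kept = full[observation_horizon:]  # drop the leading None entries

print(full[:3])   # [None, None, 2.0]
print(len(kept))  # 8
```

The 8 remaining rows match the `len(X_t)` result in the Examples section for the same data and window size.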

Output column names follow the pattern {input_col}_{statistic}, e.g., "value_mean", "value_std".

Source Code

class RollingStatisticsTransformer(BaseTransformer):
    """Compute rolling window statistics for time series.

    This transformer computes one or more rolling statistics (mean, standard
    deviation, min, max, median, sum, variance, and the 25th and 75th
    percentiles) over sliding windows. It is a convenience wrapper around
    polars rolling functions with a sklearn-compatible interface.

    Parameters
    ----------
    window_size : int, default=7
        Size of the rolling window. Must be >= 1.
    statistics : str or list of str, default="mean"
        Statistic(s) to compute. Options:
        - "mean": Rolling mean
        - "std": Rolling standard deviation
        - "min": Rolling minimum
        - "max": Rolling maximum
        - "median": Rolling median
        - "sum": Rolling sum
        - "var": Rolling variance
        - "q25": 25th percentile
        - "q75": 75th percentile

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.
    feature_names_in_ : list of str
        Names of features seen during fit.
    statistics_ : list of str
        Effective list of statistics to compute.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> from yohou.preprocessing import RollingStatisticsTransformer

    >>> times = pl.datetime_range(
    ...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 10), interval="1d", eager=True
    ... )
    >>> X = pl.DataFrame({"time": times, "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})

    >>> # Compute rolling mean with window size 3
    >>> transformer = RollingStatisticsTransformer(window_size=3, statistics="mean")
    >>> transformer.fit(X)
    RollingStatisticsTransformer(window_size=3)
    >>> X_t = transformer.transform(X)
    >>> len(X_t)
    8
    >>> "value_mean" in X_t.columns
    True

    >>> # Multiple statistics
    >>> transformer = RollingStatisticsTransformer(window_size=3, statistics=["mean", "std", "min", "max"])
    >>> transformer.fit(X)  # doctest: +ELLIPSIS
    RollingStatisticsTransformer(...)
    >>> X_t = transformer.transform(X)
    >>> len([c for c in X_t.columns if c != "time"])
    4

    See Also
    --------
    `SlidingWindowFunctionTransformer` : Apply custom function over windows.
    `LagTransformer` : Create lagged features.

    Notes
    -----
    Rolling statistics are computed via native polars rolling expressions
    (``rolling_mean``, ``rolling_std``, etc.), which are significantly faster
    than Python-level iteration. Quantile statistics (``q25``, ``q75``) use
    ``rolling_quantile``. No interpolation argument is passed, so polars'
    default interpolation (``"nearest"``) applies rather than linear
    interpolation.

    The first ``window_size - 1`` rows have incomplete windows and would be
    null, so they are dropped from the output. This is why
    ``observation_horizon`` equals ``window_size - 1``.

    Output column names follow the pattern ``{input_col}_{statistic}``,
    e.g., ``"value_mean"``, ``"value_std"``.

    """

    _valid_statistics = {"mean", "std", "min", "max", "median", "sum", "var", "q25", "q75"}

    _parameter_constraints: dict = {
        "window_size": [Interval(numbers.Integral, 1, None, closed="left")],
        "statistics": [str, list],
    }

    _tags = {"stateful": True}

    def __init__(
        self,
        window_size: int = 7,
        statistics: str | list[str] = "mean",
    ):
        self.window_size = window_size
        self.statistics = statistics

    @property
    def observation_horizon(self) -> int:  # noqa: D102
        """Return the number of past observations needed."""
        return self.window_size - 1

    def _fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None) -> None:
        """Fit the internal model."""
        # Normalize statistics to list
        if isinstance(self.statistics, str):
            self.statistics_ = [self.statistics]
        else:
            self.statistics_ = list(self.statistics)

        # Validate statistics
        invalid = set(self.statistics_) - self._valid_statistics
        if invalid:
            msg = f"Invalid statistics: {invalid}. Valid options: {self._valid_statistics}"
            raise ValueError(msg)

    def _apply_rolling_stat(self, col: pl.Expr, stat: str) -> pl.Expr:
        """Apply a rolling statistic to a column expression.

        Parameters
        ----------
        col : pl.Expr
            Column expression.
        stat : str
            Statistic name.

        Returns
        -------
        pl.Expr
            Rolling statistic expression.

        """
        if stat == "mean":
            return col.rolling_mean(self.window_size)
        elif stat == "std":
            return col.rolling_std(self.window_size)
        elif stat == "min":
            return col.rolling_min(self.window_size)
        elif stat == "max":
            return col.rolling_max(self.window_size)
        elif stat == "median":
            return col.rolling_median(self.window_size)
        elif stat == "sum":
            return col.rolling_sum(self.window_size)
        elif stat == "var":
            return col.rolling_var(self.window_size)
        elif stat == "q25":
            return col.rolling_quantile(0.25, window_size=self.window_size)
        elif stat == "q75":
            return col.rolling_quantile(0.75, window_size=self.window_size)
        else:
            msg = f"Unknown statistic: {stat}"
            raise ValueError(msg)

    def _transform(self, X: pl.DataFrame) -> pl.DataFrame:
        """Transform X by computing rolling statistics.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        # Get data columns
        data_cols = [c for c in X.columns if c != "time"]

        # Build expressions for all statistics
        exprs = [pl.col("time")]
        for col_name in data_cols:
            for stat in self.statistics_:
                col_expr = pl.col(col_name)
                stat_expr = self._apply_rolling_stat(col_expr, stat)
                exprs.append(stat_expr.alias(f"{col_name}_{stat}"))

        X_t = X.select(exprs)

        # Drop first observation_horizon rows (contain nulls from incomplete windows)
        if self._observation_horizon > 0:
            X_t = X_t[self._observation_horizon :]

        return X_t

    def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : list of str or None, default=None
            Column names of the input features.  If ``None``, uses the
            feature names seen during ``fit``.

        Returns
        -------
        list of str
            Output feature names after transformation.

        """
        check_is_fitted(self, ["statistics_"])
        input_features = _check_feature_names_in(self, input_features)
        feature_names = [f"{col}_{stat}" for col in input_features for stat in self.statistics_]
        arr: list[str] = np.asarray(feature_names, dtype=object).tolist()
        return arr

Methods

observation_horizon property

Return the number of past observations needed.

get_feature_names_out(input_features=None)

Get output feature names for transformation.

Parameters

input_features : list of str or None, default=None
    Column names of the input features. If None, uses the feature names seen during fit.

Returns

list of str
    Output feature names after transformation.
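
The ordering of the returned names can be sketched without the library. The helper below is hypothetical and only reproduces the `{input_col}_{statistic}` list comprehension from the method's source; the column names `"temp"` and `"load"` are made up for illustration.

```python
def feature_names_out(input_features, statistics):
    """Hypothetical helper reproducing the `{input_col}_{statistic}` ordering."""
    # Outer loop over columns, inner loop over statistics.
    return [f"{col}_{stat}" for col in input_features for stat in statistics]


names = feature_names_out(["temp", "load"], ["mean", "std"])
print(names)  # ['temp_mean', 'temp_std', 'load_mean', 'load_std']
```

All statistics for one input column are grouped together before the next column starts, which is the same order in which `_transform` builds its output columns.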

Source Code
def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
    """Get output feature names for transformation.

    Parameters
    ----------
    input_features : list of str or None, default=None
        Column names of the input features.  If ``None``, uses the
        feature names seen during ``fit``.

    Returns
    -------
    list of str
        Output feature names after transformation.

    """
    check_is_fitted(self, ["statistics_"])
    input_features = _check_feature_names_in(self, input_features)
    feature_names = [f"{col}_{stat}" for col in input_features for stat in self.statistics_]
    arr: list[str] = np.asarray(feature_names, dtype=object).tolist()
    return arr

Tutorials

The following example notebooks use this component:

  • How to Compose Features with FeatureUnion (Data-Features)

    Combine lag features, rolling statistics, EMA, and scaling in parallel with FeatureUnion and automatic observation horizon resolution.

  • How to Build a Feature Pipeline (Data-Features)

    Nest FeaturePipeline, FeatureUnion, and DecompositionPipeline for multi-level feature engineering with trend-season-residual decomposition.

  • How to Apply Window Transformations (Data-Features)

    Feature engineering with LagTransformer, RollingStatisticsTransformer, SlidingWindowFunctionTransformer, and ExponentialMovingAverage on time series data.

  • How to Build Panel Feature Pipelines (Panel-Data)

    Combine ColumnForecaster, FeaturePipeline, FeatureUnion, and DecompositionPipeline on panel data with per-group scoring on KDD Cup air quality.

  • How to Preprocess Panel Data (Panel-Data)

    Automatic panel-aware transformation (StandardScaler, rolling stats, imputation) plus manual per-group workflows with get_group_df and dict_to_panel.