
SlidingWindowFunctionTransformer

yohou.preprocessing.window.SlidingWindowFunctionTransformer

Bases: BaseTransformer

Transform time series by applying a function over sliding windows.

This transformer applies a user-defined function to sliding windows of the input time series. It is useful for computing rolling aggregates, custom statistics, or any windowed transformation.

The function receives a polars DataFrame containing window_size consecutive rows (one window, including the time column) and should return a scalar, a dict mapping column names to scalars, or a 1D array for that window.
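The windowing described above can be sketched in plain Python. This is an illustration only, not the yohou implementation: `sliding_windows` is a hypothetical helper, and lists of dicts stand in for polars DataFrames. It shows that n rows yield n - window_size + 1 windows, and that each window is labelled with the timestamp of its last row, as the transformer's source does.

```python
from datetime import date, timedelta


def sliding_windows(rows, window_size):
    """Yield (timestamp_of_last_row, window) for every full window."""
    if window_size < 1:
        raise ValueError("window_size must be >= 1")
    for start in range(len(rows) - window_size + 1):
        window = rows[start : start + window_size]
        # The output row is stamped with the last timestamp in the window.
        yield window[-1]["time"], window


# Ten daily observations (hypothetical stand-in for a DataFrame).
rows = [
    {"time": date(2020, 1, 1) + timedelta(days=i), "value": float(i + 1)}
    for i in range(10)
]

windows = list(sliding_windows(rows, window_size=3))
n_windows = len(windows)  # 10 - 3 + 1 full windows
first_time, first_window = windows[0]
first_values = [row["value"] for row in first_window]
```

The first window covers the first three rows, so the first output timestamp is the third day of the series rather than the first.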

Parameters

func : callable, required
    Function to apply to each sliding window. It receives a polars DataFrame
    with window_size rows (the time column plus the feature columns) and
    should return one of:
    - A scalar (assigned to every non-time column)
    - A dict mapping column names to scalars
    - A numpy array with one value per non-time column, in column order
window_size : int, default=1
    Size of the sliding window. Must be >= 1.
kw_args : dict or None, default=None
    Dictionary of additional keyword arguments to pass to func.
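The three accepted return types are all reduced to one value per data column. A minimal sketch of that normalization follows, using only the standard library. The transformer's source tests for numpy scalars and arrays instead, so `normalize_result` is a hypothetical simplification in which a list stands in for the numpy array.

```python
from numbers import Number


def normalize_result(result, data_cols):
    """Map one window's return value to a {column: value} row."""
    if isinstance(result, dict):
        # Already keyed by column name; use as-is.
        return dict(result)
    if isinstance(result, Number):
        # A scalar is assigned to every data column.
        return dict.fromkeys(data_cols, result)
    # Positional result: the j-th value belongs to the j-th data column.
    return {col: result[j] for j, col in enumerate(data_cols)}


data_cols = ["a", "b"]
from_scalar = normalize_result(1.5, data_cols)
from_dict = normalize_result({"a": 1.0, "b": 2.0}, data_cols)
from_sequence = normalize_result([3.0, 4.0], data_cols)
```

Returning a dict is the least ambiguous option when the frame has several columns, since it does not depend on column order.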

Attributes

n_features_in_ : int
    Number of features seen during fit.
feature_names_in_ : list of str
    Names of features seen during fit.

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> import numpy as np
>>> from yohou.preprocessing import SlidingWindowFunctionTransformer
>>> times = pl.datetime_range(
...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 10), interval="1d", eager=True
... )
>>> X = pl.DataFrame({"time": times, "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})
>>> # Compute rolling mean with window size 3
>>> def rolling_mean(window):
...     return window.select(pl.all().exclude("time").mean()).to_numpy().flatten()
>>> transformer = SlidingWindowFunctionTransformer(func=rolling_mean, window_size=3)
>>> transformer.fit(X)
SlidingWindowFunctionTransformer(...)
>>> X_t = transformer.transform(X)
>>> len(X_t)  # Original length minus (window_size - 1)
8
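The example above does not exercise kw_args. The sketch below is a plain-Python stand-in (not the yohou API) for how extra keyword arguments reach the function: every window call receives the same mapping, as in `func(window, **kw_args)`. The names `apply_over_windows` and `scaled_mean` are hypothetical.

```python
def apply_over_windows(values, func, window_size, kw_args=None):
    """Call func(window, **kw_args) on each full window of values."""
    kwargs = kw_args if kw_args is not None else {}
    return [
        func(values[i : i + window_size], **kwargs)
        for i in range(len(values) - window_size + 1)
    ]


def scaled_mean(window, scale=1.0):
    """Mean of the window multiplied by a caller-supplied factor."""
    return scale * sum(window) / len(window)


values = [1.0, 2.0, 3.0, 4.0, 5.0]
plain = apply_over_windows(values, scaled_mean, window_size=3)
scaled = apply_over_windows(
    values, scaled_mean, window_size=3, kw_args={"scale": 10.0}
)
```

With the transformer itself, the equivalent would presumably be passing `kw_args={"scale": 10.0}` to the constructor; note that kw_args is keyword-only in the signature.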

See Also

LagTransformer : Create lagged features.
RollingStatisticsTransformer : Pre-built rolling statistics.
FunctionTransformer : Apply function element-wise.

Source Code

class SlidingWindowFunctionTransformer(BaseTransformer):
    """Transform time series by applying a function over sliding windows.

    This transformer applies a user-defined function to sliding windows of the
    input time series. It is useful for computing rolling aggregates, custom
    statistics, or any windowed transformation.

    The function receives a polars DataFrame containing `window_size` rows
    (one window, including the ``"time"`` column) and should return a scalar,
    a dict mapping column names to scalars, or a 1D array for that window.

    Parameters
    ----------
    func : callable
        Function to apply to each sliding window. It receives a polars DataFrame
        with ``window_size`` rows (the ``"time"`` column plus the feature
        columns) and should return one of:
        - A scalar (assigned to every non-time column)
        - A dict mapping column names to scalars
        - A numpy array with one value per non-time column, in column order
    window_size : int, default=1
        Size of the sliding window. Must be >= 1.
    kw_args : dict or None, default=None
        Dictionary of additional keyword arguments to pass to func.

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.
    feature_names_in_ : list of str
        Names of features seen during fit.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> import numpy as np
    >>> from yohou.preprocessing import SlidingWindowFunctionTransformer

    >>> times = pl.datetime_range(
    ...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 10), interval="1d", eager=True
    ... )
    >>> X = pl.DataFrame({"time": times, "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})

    >>> # Compute rolling mean with window size 3
    >>> def rolling_mean(window):
    ...     return window.select(pl.all().exclude("time").mean()).to_numpy().flatten()
    >>> transformer = SlidingWindowFunctionTransformer(func=rolling_mean, window_size=3)
    >>> transformer.fit(X)  # doctest: +ELLIPSIS
    SlidingWindowFunctionTransformer(...)
    >>> X_t = transformer.transform(X)
    >>> len(X_t)  # Original length minus (window_size - 1)
    8

    See Also
    --------
    `LagTransformer` : Create lagged features.
    `RollingStatisticsTransformer` : Pre-built rolling statistics.
    `FunctionTransformer` : Apply function element-wise.

    """

    _parameter_constraints: dict = {
        "func": [callable],
        "window_size": [Interval(numbers.Integral, 1, None, closed="left")],
        "kw_args": [dict, None],
    }

    _tags = {"stateful": True}

    def __init__(
        self,
        func: Callable,
        window_size: int = 1,
        *,
        kw_args: dict | None = None,
    ):
        self.func = func
        self.window_size = window_size
        self.kw_args = kw_args

    @property
    def observation_horizon(self) -> int:  # noqa: D102
        """Return the number of past observations needed."""
        return self.window_size - 1

    def _transform(self, X: pl.DataFrame) -> pl.DataFrame:
        """Transform X by applying func to sliding windows.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        # Get data columns (excluding time)
        data_cols = [c for c in X.columns if c != "time"]

        # Apply function over sliding windows
        kwargs = self.kw_args if self.kw_args is not None else {}
        results = []
        time_values = []

        n_rows = len(X)
        for i in range(n_rows - self.window_size + 1):
            window = X[i : i + self.window_size]
            result = self.func(window, **kwargs)

            # Handle different return types
            if isinstance(result, dict):
                results.append(result)
            elif np.isscalar(result):
                results.append(dict.fromkeys(data_cols, result))
            elif isinstance(result, np.ndarray):
                results.append({col: result[j] for j, col in enumerate(data_cols)})
            else:
                results.append({col: float(result) for col in data_cols})

            # Time index is always the last point in the window
            time_idx = i + self.window_size - 1
            time_values.append(X["time"][time_idx])

        # Build output DataFrame
        result_df = pl.DataFrame(results)
        time_df = pl.DataFrame({"time": time_values})

        return pl.concat([time_df, result_df], how="horizontal")

    def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : list of str or None, default=None
            Column names of the input features.  If ``None``, uses the
            feature names seen during ``fit``.

        Returns
        -------
        list of str
            Output feature names after transformation.

        """
        input_features = _check_feature_names_in(self, input_features)
        arr: list[str] = np.asarray(input_features, dtype=object).tolist()
        return arr

Methods

observation_horizon property

Return the number of past observations needed, which is window_size - 1.
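Producing an output for a given row needs that row plus the window_size - 1 rows before it, which is why the horizon is window_size - 1. The sketch below, in plain Python, shows that carrying that many trailing rows is enough to transform newly arrived data without losing outputs. How yohou itself buffers history is not shown on this page and is an assumption here; `rolling_sum` is a hypothetical helper.

```python
def rolling_sum(values, window_size):
    """Sum over each full window; output is window_size - 1 shorter."""
    return [
        sum(values[i : i + window_size])
        for i in range(len(values) - window_size + 1)
    ]


window_size = 3
horizon = window_size - 1  # past observations needed per output row

history = [1, 2, 3, 4, 5]
new_rows = [6, 7]

# Prepend the last `horizon` historical rows so every new row gets a full window.
context = history[len(history) - horizon :] + new_rows
new_outputs = rolling_sum(context, window_size)

# Transforming everything at once gives the same values at the tail.
full_outputs = rolling_sum(history + new_rows, window_size)
```

The two new rows yield exactly two outputs, matching the tail of the full-series result.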

get_feature_names_out(input_features=None)

Get output feature names for transformation.

Parameters

input_features : list of str or None, default=None
    Column names of the input features. If None, uses the feature names seen during fit.

Returns

list of str
    Output feature names after transformation.


Tutorials

The following example notebooks use this component:

  • How to Apply Window Transformations


    Data-Features

    Feature engineering with LagTransformer, RollingStatisticsTransformer, SlidingWindowFunctionTransformer, and ExponentialMovingAverage on time series data.
