
Downsampler

yohou.preprocessing.resampling.Downsampler

Bases: BaseTransformer

Downsample time series to a lower frequency using aggregation.

Reduces the frequency of time series data by grouping consecutive time points into bins and applying an aggregation function. Uses polars' group_by_dynamic for efficient windowed aggregation.
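
A plain-Python sketch of the idea, assuming left-closed bins aligned to the Unix epoch: each timestamp is floored to the start of its bin, and the values collected in each bin are reduced by the chosen aggregation. The `downsample` helper and the `AGGREGATIONS` table are hypothetical names used only for illustration; the class itself delegates this work to polars.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import mean, median

# Hypothetical lookup table mirroring the seven aggregation names.
AGGREGATIONS = {
    "mean": mean,
    "sum": sum,
    "min": min,
    "max": max,
    "first": lambda values: values[0],
    "last": lambda values: values[-1],
    "median": median,
}

EPOCH = datetime(1970, 1, 1)


def downsample(rows, every, aggregation="mean"):
    """Aggregate (time, value) pairs into left-closed bins of width `every`."""
    bins = defaultdict(list)
    for t, value in sorted(rows):
        # Floor the timestamp to the start of the bin that contains it.
        start = EPOCH + ((t - EPOCH) // every) * every
        bins[start].append(value)
    reduce = AGGREGATIONS[aggregation]
    return [(start, reduce(values)) for start, values in sorted(bins.items())]


# 48 hourly observations covering two full days.
rows = [(datetime(2020, 1, 1) + timedelta(hours=i), i) for i in range(48)]
daily_mean = downsample(rows, timedelta(days=1), "mean")
daily_last = downsample(rows, timedelta(days=1), "last")
```

Here `daily_mean` holds one averaged value per day, and `daily_last` holds the final hourly reading of each day.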

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| interval | str | Target time interval (e.g., "1h", "1d", "5m", "30s"), in polars duration string syntax. Must not be smaller than the input data's interval. | '1h' |
| aggregation | {"mean", "sum", "min", "max", "first", "last", "median"} | Aggregation function applied within each time bin. "mean": average of the values; "sum": sum of the values; "min": minimum value; "max": maximum value; "first": first value; "last": last value; "median": median value. | "mean" |
| closed | {"left", "right"} | Which side of each time bin is closed (inclusive). With "left", a bin covers [start, end); with "right", it covers (start, end]. | "left" |
| label | {"left", "right"} | Which edge of each bin is used as its timestamp in the output: the bin start ("left") or the bin end ("right"). | "left" |
| include_boundaries | bool | Whether to include the bin boundaries in the output. The flag is passed to polars' group_by_dynamic, which adds _lower_boundary and _upper_boundary columns when it is True. | False |
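
The `closed` and `label` options are easiest to see at a bin boundary. The sketch below is plain Python with hypothetical helper names (`bin_bounds`, `bin_label`) and assumes bins aligned to the Unix epoch; it follows the usual half-open interval convention rather than reproducing polars' internals.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def bin_bounds(t, every, closed="left"):
    """Return (start, end) of the bin of width `every` that contains `t`."""
    offset = t - EPOCH
    index = offset // every
    # With closed="right", a timestamp sitting exactly on a boundary
    # belongs to the bin that ends there, not the one that starts there.
    if closed == "right" and offset % every == timedelta(0):
        index -= 1
    start = EPOCH + index * every
    return start, start + every


def bin_label(t, every, closed="left", label="left"):
    """Return the timestamp used to label the bin that contains `t`."""
    start, end = bin_bounds(t, every, closed)
    return start if label == "left" else end


day = timedelta(days=1)
midnight = datetime(2020, 1, 1)

# closed="left": midnight opens the bin [2020-01-01, 2020-01-02).
left_closed = bin_bounds(midnight, day, closed="left")
# closed="right": midnight closes the bin (2019-12-31, 2020-01-01].
right_closed = bin_bounds(midnight, day, closed="right")
```

Under these assumptions, an observation stamped exactly at midnight moves to the previous day's bin when `closed="right"`, which is why the same series can yield one extra output row with that setting.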

Attributes

| Name | Type | Description |
| --- | --- | --- |
| n_features_in_ | int | Number of features seen during fit. |
| feature_names_in_ | list of str | Names of features seen during fit. |
| input_interval_ | timedelta or None | Detected time interval of input data. |
| target_interval_ | timedelta or None | Target time interval. |

Examples

>>> import polars as pl
>>> from datetime import datetime, timedelta
>>> from yohou.preprocessing import Downsampler
>>> # Create hourly data
>>> times = [datetime(2020, 1, 1) + timedelta(hours=i) for i in range(24)]
>>> X = pl.DataFrame({"time": times, "value": list(range(24))})
>>> # Downsample to daily (24h) using mean aggregation
>>> downsampler = Downsampler(interval="1d", aggregation="mean")
>>> downsampler.fit(X)
Downsampler(interval='1d')
>>> X_daily = downsampler.transform(X)
>>> len(X_daily) == 1  # Single day
True

See Also

  • Upsampler : Upsample time series to a higher frequency.

Source Code

class Downsampler(BaseTransformer):
    """Downsample time series to a lower frequency using aggregation.

    Reduces the frequency of time series data by grouping consecutive time
    points into bins and applying an aggregation function. Uses polars'
    `group_by_dynamic` for efficient windowed aggregation.

    Parameters
    ----------
    interval : str, default="1h"
        Target time interval (e.g., "1h", "1d", "5m", "30s").
        Uses polars duration string syntax. Must not be smaller than the
        input data's interval.
    aggregation : {"mean", "sum", "min", "max", "first", "last", "median"}, default="mean"
        Aggregation function to apply within each time bin:
        - "mean": Average values in each bin
        - "sum": Sum values in each bin
        - "min": Minimum value in each bin
        - "max": Maximum value in each bin
        - "first": First value in each bin
        - "last": Last value in each bin
        - "median": Median value in each bin
    closed : {"left", "right"}, default="left"
        Which side of the interval is closed.
    label : {"left", "right"}, default="left"
        Which side of the interval to use as the label for each bin.
    include_boundaries : bool, default=False
        Whether to include the interval boundaries in output.

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.
    feature_names_in_ : list of str
        Names of features seen during fit.
    input_interval_ : timedelta or None
        Detected time interval of input data.
    target_interval_ : timedelta or None
        Target time interval.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime, timedelta
    >>> from yohou.preprocessing import Downsampler

    >>> # Create hourly data
    >>> times = [datetime(2020, 1, 1) + timedelta(hours=i) for i in range(24)]
    >>> X = pl.DataFrame({"time": times, "value": list(range(24))})

    >>> # Downsample to daily (24h) using mean aggregation
    >>> downsampler = Downsampler(interval="1d", aggregation="mean")
    >>> downsampler.fit(X)
    Downsampler(interval='1d')
    >>> X_daily = downsampler.transform(X)
    >>> len(X_daily) == 1  # Single day
    True

    See Also
    --------
    - [`Upsampler`][yohou.preprocessing.resampling.Upsampler] : Upsample time series to higher frequency.

    """

    _valid_aggregations = {"mean", "sum", "min", "max", "first", "last", "median"}

    _parameter_constraints: dict = {
        "interval": [str],
        "aggregation": [StrOptions(_valid_aggregations)],
        "closed": [StrOptions({"left", "right"})],
        "label": [StrOptions({"left", "right"})],
        "include_boundaries": ["boolean"],
    }

    _tags = {"stateful": False}

    def __init__(
        self,
        interval: str = "1h",
        aggregation: Literal["mean", "sum", "min", "max", "first", "last", "median"] = "mean",
        closed: Literal["left", "right"] = "left",
        label: Literal["left", "right"] = "left",
        include_boundaries: bool = False,
    ):
        self.interval = interval
        self.aggregation = aggregation
        self.closed = closed
        self.label = label
        self.include_boundaries = include_boundaries

    def _fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None) -> None:
        """Detect the input interval and validate the target interval."""
        # Detect input interval
        self.input_interval_str_ = check_interval_consistency(X)
        self.input_interval_ = interval_to_timedelta(self.input_interval_str_)
        self.target_interval_ = interval_to_timedelta(self.interval)

        # Normalize interval to polars-native format (e.g. "30min" → "30m")
        _mult, _unit = parse_interval(self.interval)
        self.polars_interval_ = f"{_mult}{_unit}"

        # Validate: target must be >= input for downsampling
        if (
            self.input_interval_ is not None
            and self.target_interval_ is not None
            and self.target_interval_ < self.input_interval_
        ):
            msg = (
                f"Target interval ({self.interval}) is smaller than input interval "
                f"({self.input_interval_str_}). Use Upsampler for increasing frequency."
            )
            raise ValueError(msg)

    def _transform(self, X: pl.DataFrame) -> pl.DataFrame:
        """Downsample time series to target frequency.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series.

        Returns
        -------
        pl.DataFrame
            Downsampled time series.

        """
        # Get data columns
        data_cols = [c for c in X.columns if c != "time"]

        # Build aggregation expressions
        agg_exprs = []
        for col in data_cols:
            if self.aggregation == "mean":
                agg_exprs.append(pl.col(col).mean())
            elif self.aggregation == "sum":
                agg_exprs.append(pl.col(col).sum())
            elif self.aggregation == "min":
                agg_exprs.append(pl.col(col).min())
            elif self.aggregation == "max":
                agg_exprs.append(pl.col(col).max())
            elif self.aggregation == "first":
                agg_exprs.append(pl.col(col).first())
            elif self.aggregation == "last":
                agg_exprs.append(pl.col(col).last())
            elif self.aggregation == "median":
                agg_exprs.append(pl.col(col).median())

        result = (
            X
            .sort("time")
            .group_by_dynamic(
                "time",
                every=self.polars_interval_,
                closed=self.closed,
                label=self.label,
                include_boundaries=self.include_boundaries,
            )
            .agg(agg_exprs)
        )

        return result

    def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : list of str or None, default=None
            Column names of the input features.  If ``None``, uses the
            feature names seen during ``fit``.

        Returns
        -------
        list of str
            Output feature names after transformation.

        """
        check_is_fitted(self, ["feature_names_in_"])
        input_features = _check_feature_names_in(self, input_features)
        return list(input_features)
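
The interval check in `_fit` can be pictured as a comparison of two durations. The sketch below is a simplified stand-in: `to_timedelta` and `validate_downsampling` are hypothetical helpers, not yohou's `interval_to_timedelta` or `parse_interval`, and only the fixed-length units s, m, h, d and w are handled.

```python
import re
from datetime import timedelta

# Seconds per unit for the fixed-length units this sketch supports.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def to_timedelta(interval):
    """Convert a duration string such as "30m" or "1d" into a timedelta."""
    match = re.fullmatch(r"(\d+)(s|m|h|d|w)", interval)
    if match is None:
        raise ValueError(f"Unsupported interval: {interval!r}")
    multiple, unit = int(match.group(1)), match.group(2)
    return timedelta(seconds=multiple * _UNIT_SECONDS[unit])


def validate_downsampling(input_interval, target_interval):
    """Raise ValueError if the target interval is finer than the input interval."""
    if to_timedelta(target_interval) < to_timedelta(input_interval):
        raise ValueError(
            f"Target interval ({target_interval}) is smaller than input "
            f"interval ({input_interval})."
        )


validate_downsampling("1h", "1d")  # coarser target: accepted
validate_downsampling("1h", "1h")  # equal intervals: accepted
```

As in the source above, the comparison is a strict "less than", so a target equal to the input interval passes and only a finer target is rejected.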

Methods

get_feature_names_out(input_features=None)

Get output feature names for transformation.

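Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_features | list of str or None | Column names of the input features. If None, uses the feature names seen during fit. | None |

Returns

| Type | Description |
| --- | --- |
| list of str | Output feature names after transformation. These are the input feature names, returned unchanged. |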


Tutorials

The following example notebooks use this component:

  • How to Handle Long Series


    Data-Features

    Limit history with observation_horizon, weight recent errors with exponential decay, and downsample high-frequency data.


  • How to Resample Time Series


    Data-Features

    Demonstrate Downsampler and Upsampler for changing time series frequency, including multivariate support, boundary settings, and round-trip information loss.
