Skip to content

window_futures

yohou.utils.pivot.window_futures(X_future, observation_times, forecasting_horizon, interval, *, time_col='time')

Window known-future features into step-indexed columns.

For each observation time T and forecast horizon H, extracts values at T + 1*interval through T + H*interval from X_future, producing step-indexed columns <col>_step_1 through <col>_step_H.

The output has one row per observation time and uses "time" as the time column (set to the observation time).

Parameters

Name Type Description Default
X_future DataFrame

Known-future data with a time_col column and one or more value columns. Values are deterministic (e.g., holidays, day-of-week).

required
observation_times Series

Series of observation timestamps to window from.

required
forecasting_horizon int

Number of forward steps (H) to extract per observation time.

required
interval str or timedelta

Time frequency between steps (e.g., "1d", "1h").

required
time_col str

Name of the time column in X_future.

"time"

Returns

Type Description
DataFrame

Wide DataFrame with [time, <col>_step_1, ..., <col>_step_H]. One row per observation time. A step whose target time has no matching row in X_future is null.

Raises

Type Description
ValueError

If time_col is not in X_future.

ValueError

If forecasting_horizon is not positive.

ValueError

If no value columns remain after removing time_col.

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> holidays = pl.DataFrame({
...     "time": [
...         datetime(2020, 1, 1),
...         datetime(2020, 1, 2),
...         datetime(2020, 1, 3),
...         datetime(2020, 1, 4),
...         datetime(2020, 1, 5),
...     ],
...     "is_holiday": [1, 0, 0, 1, 0],
... })
>>> obs_times = pl.Series([datetime(2020, 1, 1), datetime(2020, 1, 2)])
>>> window_futures(holidays, obs_times, forecasting_horizon=3, interval="1d")
shape: (2, 4)
┌─────────────────────┬───────────────────┬───────────────────┬───────────────────┐
│ time                ┆ is_holiday_step_1 ┆ is_holiday_step_2 ┆ is_holiday_step_3 │
│ ---                 ┆ ---               ┆ ---               ┆ ---               │
│ datetime[μs]        ┆ i64               ┆ i64               ┆ i64               │
╞═════════════════════╪═══════════════════╪═══════════════════╪═══════════════════╡
│ 2020-01-01 00:00:00 ┆ 0                 ┆ 0                 ┆ 1                 │
│ 2020-01-02 00:00:00 ┆ 0                 ┆ 1                 ┆ 0                 │
└─────────────────────┴───────────────────┴───────────────────┴───────────────────┘

Source Code

Show/Hide source
def window_futures(
    X_future: pl.DataFrame,
    observation_times: pl.Series,
    forecasting_horizon: int,
    interval: str | timedelta,
    *,
    time_col: str = "time",
) -> pl.DataFrame:
    """Window known-future features into step-indexed columns.

    For each observation time T and forecast horizon H, extracts values at
    ``T + 1*interval`` through ``T + H*interval`` from ``X_future``, producing
    step-indexed columns ``<col>_step_1`` through ``<col>_step_H``.

    The output has one row per observation time and uses ``"time"`` as the
    time column (set to the observation time).

    Parameters
    ----------
    X_future : pl.DataFrame
        Known-future data with a ``time_col`` column and one or more value
        columns. Values are deterministic (e.g., holidays, day-of-week).
    observation_times : pl.Series
        Series of observation timestamps to window from.
    forecasting_horizon : int
        Number of forward steps (H) to extract per observation time.
    interval : str or timedelta
        Time frequency between steps (e.g., ``"1d"``, ``"1h"``).
    time_col : str, default="time"
        Name of the time column in ``X_future``.

    Returns
    -------
    pl.DataFrame
        Wide DataFrame with ``[time, <col>_step_1, ..., <col>_step_H]``.
        One row per observation time. With several value columns the
        output is ordered step by step: every value column for step 1,
        then every value column for step 2, and so on. The same order is
        used when ``observation_times`` is empty.

    Raises
    ------
    ValueError
        If ``time_col`` is not in ``X_future``.
    ValueError
        If ``forecasting_horizon`` is not positive.
    ValueError
        If no value columns remain after removing ``time_col``.

    Notes
    -----
    Target times are matched against ``X_future`` with a left join, so a
    step whose target time has no row in ``X_future`` is null in the output.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> holidays = pl.DataFrame({
    ...     "time": [
    ...         datetime(2020, 1, 1),
    ...         datetime(2020, 1, 2),
    ...         datetime(2020, 1, 3),
    ...         datetime(2020, 1, 4),
    ...         datetime(2020, 1, 5),
    ...     ],
    ...     "is_holiday": [1, 0, 0, 1, 0],
    ... })
    >>> obs_times = pl.Series([datetime(2020, 1, 1), datetime(2020, 1, 2)])
    >>> window_futures(holidays, obs_times, forecasting_horizon=3, interval="1d")
    shape: (2, 4)
    ┌─────────────────────┬───────────────────┬───────────────────┬───────────────────┐
    │ time                ┆ is_holiday_step_1 ┆ is_holiday_step_2 ┆ is_holiday_step_3 │
    │ ---                 ┆ ---               ┆ ---               ┆ ---               │
    │ datetime[μs]        ┆ i64               ┆ i64               ┆ i64               │
    ╞═════════════════════╪═══════════════════╪═══════════════════╪═══════════════════╡
    │ 2020-01-01 00:00:00 ┆ 0                 ┆ 0                 ┆ 1                 │
    │ 2020-01-02 00:00:00 ┆ 0                 ┆ 1                 ┆ 0                 │
    └─────────────────────┴───────────────────┴───────────────────┴───────────────────┘

    """
    if time_col not in X_future.columns:
        msg = f"Column '{time_col}' not found in DataFrame. Available columns: {X_future.columns}"
        raise ValueError(msg)

    if forecasting_horizon < 1:
        msg = f"forecasting_horizon must be positive, got {forecasting_horizon}."
        raise ValueError(msg)

    value_cols = [c for c in X_future.columns if c != time_col]
    if not value_cols:
        msg = f"No value columns found. DataFrame has only '{time_col}'. At least one value column is required."
        raise ValueError(msg)

    steps = range(1, forecasting_horizon + 1)

    # Single source of truth for the output column order (step-major), so the
    # empty-input frame and the regular result always share the same layout.
    output_layout = [(col, step) for step in steps for col in value_cols]

    obs_values = observation_times.to_list()
    if not obs_values:
        # No observation times: return an empty frame with the same columns,
        # order and value dtypes as a populated result would have.
        cols = {"time": pl.Series([], dtype=observation_times.dtype)}
        for col, step in output_layout:
            cols[f"{col}_step_{step}"] = pl.Series([], dtype=X_future[col].dtype)
        return pl.DataFrame(cols)

    # Reserved working names. Every X_future column is renamed to one of these
    # before the join, so user columns called e.g. "time", "vintage_time" or
    # "_step" cannot clash with (and be shadowed by) the working columns.
    vintage_key = "__vintage_time"
    target_key = "__target_time"
    step_key = "__step"
    value_alias = {col: f"__value_{idx}" for idx, col in enumerate(value_cols)}

    # Tidy frame with one row per (observation time, step) pair and the
    # timestamp that pair points at.
    lookup = pl.DataFrame(
        [
            {
                vintage_key: obs_time,
                target_key: add_interval(obs_time, interval, n=step),
                step_key: step,
            }
            for obs_time in obs_values
            for step in steps
        ]
    )

    future = X_future.select(
        [pl.col(time_col).alias(target_key)]
        + [pl.col(col).alias(alias) for col, alias in value_alias.items()]
    )

    # Left join: target times missing from X_future stay in the frame as nulls.
    joined = lookup.join(future, on=target_key, how="left")

    # Pivot to wide format. Within one observation time, rows of other steps
    # are masked to null; max() skips nulls and so returns the value of the
    # requested step (or null when that step had no match).
    pivot_exprs = [
        pl.when(pl.col(step_key) == step)
        .then(pl.col(value_alias[col]))
        .otherwise(None)
        .max()
        .alias(f"{col}_step_{step}")
        for col, step in output_layout
    ]

    result = joined.group_by(vintage_key, maintain_order=True).agg(pivot_exprs)
    return result.rename({vintage_key: "time"})