TimeIndexTransformer

yohou.preprocessing.time_features.TimeIndexTransformer

Bases: BaseTransformer

Convert the time column to a numeric index with optional polynomial terms.

Produces integer or normalized time step indices starting from the first observed timestamp, useful as trend features for reduction forecasters.

The base index at time step \(t\) is:

\[x(t) = \frac{t - t_0}{\Delta t}\]

where \(t_0\) is the first observed timestamp and \(\Delta t\) is the detected time interval. When normalize=True, the index is scaled by the number of fit steps:

\[\tilde{x}(t) = \frac{x(t)}{N - 1}\]

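where \(N\) is n_steps_. Polynomial features of degree \(d\) are then the powers \(\tilde{x}(t),\, \tilde{x}(t)^2,\, \ldots,\, \tilde{x}(t)^d\) of the index; when normalize=False, the powers are taken of the unnormalized index \(x(t)\) instead.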
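The formulas above can be worked through by hand. The following is a minimal sketch using only the standard library, not the transformer itself; the daily series, the variable names, and the fixed `dt` (standing in for the detected interval) are assumptions made for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical daily series with 11 timestamps starting at t0.
t0 = datetime(2020, 1, 1)
dt = timedelta(days=1)  # assumed here; stands in for the detected interval
times = [t0 + k * dt for k in range(11)]

# Base index: x(t) = (t - t0) / dt
x = [(t - t0) / dt for t in times]

# Normalized index: x~(t) = x(t) / (N - 1), with N the number of fit steps
n_steps = len(times)
x_norm = [v / (n_steps - 1) for v in x]

# Polynomial terms up to degree d = 3: one row per time step, one entry per power
degree = 3
poly = [[v**d for d in range(1, degree + 1)] for v in x_norm]

print(x[:3])       # first three base indices
print(x_norm[-1])  # last normalized index
print(poly[5])     # powers of the midpoint index
```

On the fit data the normalized index runs from 0 at the first timestamp to 1 at the last, so higher powers stay within the same range, which tends to keep polynomial features on comparable scales.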

Parameters

Name Type Description Default
normalize bool

If True, scale the index to [0, 1] based on the number of steps in the fit data. Values can exceed this range for data beyond the fit range.

False
degree int

Polynomial degree. 1 produces a linear index only, 2 adds a quadratic term, and so on.

1

Attributes

Name Type Description
first_time_ datetime

First observed timestamp, used as reference for index computation.

n_steps_ int

Number of time steps in the fit data. The normalization denominator is n_steps_ - 1, floored at 1 so that a single-row fit does not divide by zero.
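To illustrate how the two fitted attributes are reused at transform time, here is a rough standard-library sketch. The helper names `fit_state` and `normalized_index` are hypothetical and not part of the library, and the fixed daily `step` is an assumption; the `max(n_steps - 1, 1)` guard mirrors the one in the source listing on this page.

```python
from datetime import datetime, timedelta


def fit_state(times):
    """Return (first_time, n_steps), analogous to first_time_ and n_steps_."""
    return times[0], len(times)


def normalized_index(times, first_time, n_steps, step):
    """Index relative to first_time, divided by max(n_steps - 1, 1)."""
    denom = max(n_steps - 1, 1)  # guard against a single-row fit
    return [((t - first_time) / step) / denom for t in times]


step = timedelta(days=1)  # assumed fixed daily interval
train = [datetime(2020, 1, 1) + k * step for k in range(11)]
future = [train[-1] + k * step for k in range(1, 4)]  # three days past the fit range

first_time, n_steps = fit_state(train)
in_sample = normalized_index(train, first_time, n_steps, step)
out_of_sample = normalized_index(future, first_time, n_steps, step)

print(in_sample[0], in_sample[-1])
print(out_of_sample)
```

Because the reference timestamp and denominator come from the fit data, timestamps after the fit range map to values above 1 rather than being rescaled.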

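See Also

  • CalendarFeatureTransformer: Calendar features (month, day of week, etc.).
  • FourierFeatureTransformer: Sin/cos harmonics for cyclical encoding.
  • PolynomialTrendForecaster: Forecaster-level polynomial trend estimation.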

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> time = pl.datetime_range(
...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 11), interval="1d", eager=True
... )
>>> X = pl.DataFrame({"time": time, "value": range(len(time))})
>>> transformer = TimeIndexTransformer(degree=2)
>>> transformer.fit(X)
TimeIndexTransformer(degree=2)
>>> X_t = transformer.transform(X)
>>> X_t["time_index"][0]
0
>>> "time_index_2" in X_t.columns
True

Source Code

class TimeIndexTransformer(BaseTransformer):
    r"""Convert the time column to a numeric index with optional polynomial terms.

    Produces integer or normalized time step indices starting from the
    first observed timestamp, useful as trend features for reduction
    forecasters.

    The base index at time step $t$ is:

    $$x(t) = \frac{t - t_0}{\Delta t}$$

    where $t_0$ is the first observed timestamp and $\Delta t$ is the
    detected time interval. When ``normalize=True``, the index is
    scaled by the number of fit steps:

    $$\tilde{x}(t) = \frac{x(t)}{N - 1}$$

    where $N$ is ``n_steps_``. Polynomial features of degree $d$ are
    then $\tilde{x}(t),\, \tilde{x}(t)^2,\, \ldots,\, \tilde{x}(t)^d$;
    when ``normalize=False``, the powers are taken of $x(t)$ instead.

    Parameters
    ----------
    normalize : bool, default=False
        If ``True``, scale the index to ``[0, 1]`` based on the number
        of steps in the fit data. Values can exceed this range for
        data beyond the fit range.
    degree : int, default=1
        Polynomial degree. ``1`` produces a linear index only, ``2``
        adds a quadratic term, and so on.

    Attributes
    ----------
    first_time_ : datetime
        First observed timestamp, used as reference for index
        computation.
    n_steps_ : int
        Number of time steps in the fit data. Used as normalization
        denominator (``n_steps_ - 1``).

    See Also
    --------
    - [`CalendarFeatureTransformer`][yohou.preprocessing.calendar.CalendarFeatureTransformer] : Calendar features (month, day of week, etc.).
    - [`FourierFeatureTransformer`][yohou.preprocessing.time_features.FourierFeatureTransformer] : Sin/cos harmonics for cyclical encoding.
    - [`PolynomialTrendForecaster`][yohou.stationarity.trend.PolynomialTrendForecaster] : Forecaster-level polynomial trend estimation.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> time = pl.datetime_range(
    ...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 11), interval="1d", eager=True
    ... )
    >>> X = pl.DataFrame({"time": time, "value": range(len(time))})
    >>> transformer = TimeIndexTransformer(degree=2)
    >>> transformer.fit(X)
    TimeIndexTransformer(degree=2)
    >>> X_t = transformer.transform(X)
    >>> X_t["time_index"][0]
    0
    >>> "time_index_2" in X_t.columns
    True

    """

    _parameter_constraints: dict = {
        "normalize": ["boolean"],
        "degree": [Interval(numbers.Integral, 1, None, closed="left")],
    }

    _PREFIX = "time_index"

    def __init__(
        self,
        normalize: bool = False,
        degree: StrictInt = 1,
    ):
        self.normalize = normalize
        self.degree = degree

    def _fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None) -> None:
        """Record the reference timestamp and step count, and check output names."""
        self.first_time_ = X["time"][0]
        self.n_steps_ = len(X)

        generated_names = []
        for d in range(1, self.degree + 1):
            col_name = self._PREFIX if d == 1 else f"{self._PREFIX}_{d}"
            generated_names.append(col_name)
        existing = set(X.columns) - {"time"}
        conflicts = set(generated_names) & existing
        if conflicts:
            raise ValueError(f"Generated column names {sorted(conflicts)} conflict with existing columns in X.")

    def _compute_index(self, X: pl.DataFrame) -> np.ndarray:
        """Compute the numeric time index for the given data.

        Parameters
        ----------
        X : pl.DataFrame
            DataFrame with ``"time"`` column.

        Returns
        -------
        np.ndarray
            Numeric time index array.

        """
        interval = self.interval_
        time_diff = X["time"] - self.first_time_

        if interval.endswith("mo") or interval.endswith("y"):
            first_time = self.first_time_
            months = (
                ((X["time"].dt.year() - first_time.year) * 12 + (X["time"].dt.month() - first_time.month))
                .to_numpy()
                .astype(np.float64)
            )
            if interval.endswith("y"):
                t = months / 12.0
            else:
                mo_count = int(interval.replace("mo", ""))
                t = months / mo_count
        else:
            t = time_diff.dt.total_seconds().to_numpy().astype(np.float64)
            if len(X) > 1:
                first_diff = (X["time"][1] - X["time"][0]).total_seconds()
                if first_diff != 0:
                    t = t / first_diff

        return t

    def _transform(self, X: pl.DataFrame) -> pl.DataFrame:
        """Generate time index features from the time column.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series.

        Returns
        -------
        pl.DataFrame
            DataFrame with ``"time"`` column and time index features.

        """
        t = self._compute_index(X)

        feature_cols = []
        for d in range(1, self.degree + 1):
            col_name = self._PREFIX if d == 1 else f"{self._PREFIX}_{d}"
            if d == 1 and not self.normalize:
                values = t.astype(np.int64)
                feature_cols.append(pl.Series(col_name, values, dtype=pl.Int64))
            elif d == 1 and self.normalize:
                denom = max(self.n_steps_ - 1, 1)
                values = t / denom
                feature_cols.append(pl.Series(col_name, values, dtype=pl.Float64))
            else:
                if self.normalize:
                    denom = max(self.n_steps_ - 1, 1)
                    base = t / denom
                else:
                    base = t
                values = base**d
                feature_cols.append(pl.Series(col_name, values, dtype=pl.Float64))

        return X.select(pl.col("time")).with_columns(*feature_cols)

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Input feature names (unused, for API compatibility).

        Returns
        -------
        list of str
            Generated time index feature column names.

        """
        check_is_fitted(self, ["first_time_"])
        generated = []
        for d in range(1, self.degree + 1):
            col_name = self._PREFIX if d == 1 else f"{self._PREFIX}_{d}"
            generated.append(col_name)
        return generated
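The calendar branch of `_compute_index` in the listing above counts whole months instead of dividing elapsed seconds, since months and years do not have a fixed length in seconds. A minimal standard-library sketch of that arithmetic follows; `month_index` is a hypothetical helper written for illustration, and the interval strings (`"1mo"`, `"3mo"`, `"1y"`) are assumed to follow the suffix convention the listing checks for.

```python
from datetime import datetime


def month_index(times, first_time, interval):
    """Index for month- or year-based intervals via whole-month offsets."""
    months = [
        (t.year - first_time.year) * 12 + (t.month - first_time.month)
        for t in times
    ]
    if interval.endswith("y"):
        # As in the listing, yearly intervals divide the month count by 12.
        return [m / 12.0 for m in months]
    mo_count = int(interval.replace("mo", ""))  # e.g. "3mo" -> 3
    return [m / mo_count for m in months]


monthly = [datetime(2020, 1, 1), datetime(2020, 2, 1), datetime(2020, 3, 1), datetime(2021, 1, 1)]
quarterly = [datetime(2020, 1, 1), datetime(2020, 4, 1), datetime(2020, 7, 1)]
yearly = [datetime(2020, 1, 1), datetime(2021, 1, 1), datetime(2022, 1, 1)]

print(month_index(monthly, monthly[0], "1mo"))
print(month_index(quarterly, quarterly[0], "3mo"))
print(month_index(yearly, yearly[0], "1y"))
```

Counting months keeps the index on whole steps even though February, for example, is shorter than January.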

Methods

get_feature_names_out(input_features=None)

Get output feature names for transformation.

Parameters
Name Type Description Default
input_features array-like of str or None

Input feature names (unused, for API compatibility).

None
Returns
Type Description
list of str

Generated time index feature column names.

Source Code
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Get output feature names for transformation.

    Parameters
    ----------
    input_features : array-like of str or None, default=None
        Input feature names (unused, for API compatibility).

    Returns
    -------
    list of str
        Generated time index feature column names.

    """
    check_is_fitted(self, ["first_time_"])
    generated = []
    for d in range(1, self.degree + 1):
        col_name = self._PREFIX if d == 1 else f"{self._PREFIX}_{d}"
        generated.append(col_name)
    return generated
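The naming rule in the loop above, together with the conflict check performed in `_fit`, can be sketched in isolation. The function names below are hypothetical and only reproduce the logic shown in the listings; they are not library API.

```python
def time_index_feature_names(degree, prefix="time_index"):
    """Degree 1 uses the bare prefix; higher degrees append "_<d>"."""
    return [prefix if d == 1 else f"{prefix}_{d}" for d in range(1, degree + 1)]


def conflicting_names(columns, degree, prefix="time_index"):
    """Generated names that collide with existing non-time columns."""
    existing = set(columns) - {"time"}
    return sorted(set(time_index_feature_names(degree, prefix)) & existing)


names = time_index_feature_names(3)
clash = conflicting_names(["time", "value", "time_index_2"], degree=2)
no_clash = conflicting_names(["time", "value"], degree=2)

print(names)
print(clash)
```

In the transformer, a non-empty conflict set leads to a `ValueError` at fit time, so an input column already named like a generated feature needs to be renamed first.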

Tutorials

The following example notebooks use this component:

  • How to Add Calendar, Fourier, and Holiday Features


    Data-Features

    Enrich your feature matrix with time-derived signals using CalendarFeatureTransformer, FourierFeatureTransformer, and HolidayFeatureTransformer.
