BaseTransformer

yohou.base.transformer.BaseTransformer

Bases: BaseEstimator

Base class for time series transformers.

Yohou transformers operate on polars DataFrames with a mandatory "time" column and support stateful windowing via observe, rewind, and observe_transform methods.

Attributes

Name | Type | Description
feature_names_in_ | list[str] | Names of the non-time columns seen during fit.
n_features_in_ | int | Number of non-time columns seen during fit.
X_schema_ | dict[str, DataType] | Column name to dtype mapping seen during fit.
interval_ | str | Detected time interval of the training data (e.g., "1d", "1h").

Notes

Transformers can be stateful (observation_horizon > 0) or stateless (observation_horizon == 0). Stateful transformers maintain an internal memory buffer of the most recent observation_horizon rows. observe() appends new rows to this buffer and trims it back to observation_horizon rows; rewind() replaces it with the last observation_horizon rows of the data it is given.

All transformers preserve the "time" column through transform() and inverse_transform().
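
The buffer semantics described in these notes can be sketched without the library. This is a minimal illustration only, assuming rows are plain `(time, value)` tuples rather than polars DataFrames; `MemoryBuffer` is a hypothetical stand-in and not part of yohou.

```python
class MemoryBuffer:
    """Hypothetical stand-in for a transformer's internal memory."""

    def __init__(self, observation_horizon: int) -> None:
        self.observation_horizon = observation_horizon
        self.rows: list[tuple[int, float]] = []

    def rewind(self, rows: list[tuple[int, float]]) -> "MemoryBuffer":
        # Replace the memory with the last `observation_horizon` rows.
        h = self.observation_horizon
        if h > len(rows):
            raise ValueError("Not enough input data to set the memory.")
        # Guard h == 0 explicitly: rows[-0:] would return every row.
        self.rows = rows[-h:] if h > 0 else []
        return self

    def observe(self, rows: list[tuple[int, float]]) -> "MemoryBuffer":
        # Append the new rows to the current memory, then trim via rewind().
        return self.rewind(self.rows + rows)


history = [(t, float(t) * 10) for t in range(5)]  # (time, value) rows

stateful = MemoryBuffer(observation_horizon=2).rewind(history)
stateful_after_fit = list(stateful.rows)
stateful.observe([(5, 50.0)])
stateful_after_observe = list(stateful.rows)

stateless = MemoryBuffer(observation_horizon=0).rewind(history)
stateless_rows = list(stateless.rows)
```

With a horizon of 2 the buffer always holds the two most recent rows, while a horizon of 0 keeps the buffer empty.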

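See Also

BaseForecaster (yohou.base.forecaster.BaseForecaster): Base class for forecasters.
LagTransformer (yohou.preprocessing.window.LagTransformer): Creates lagged features from time series.
SeasonalDifferencing (yohou.stationarity.transformers.SeasonalDifferencing): Stateful seasonal differencing transformer.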

Source Code

class BaseTransformer(BaseEstimator, metaclass=abc.ABCMeta):
    """Base class for time series transformers.

    Yohou transformers operate on polars DataFrames with a mandatory
    ``"time"`` column and support stateful windowing via ``observe``,
    ``rewind``, and ``observe_transform`` methods.

    Attributes
    ----------
    feature_names_in_ : list[str]
        Names of the non-time columns seen during ``fit``.
    n_features_in_ : int
        Number of non-time columns seen during ``fit``.
    X_schema_ : dict[str, pl.DataType]
        Column name to dtype mapping seen during ``fit``.
    interval_ : str
        Detected time interval of the training data (e.g., ``"1d"``,
        ``"1h"``).

    Notes
    -----
    Transformers can be **stateful** (``observation_horizon > 0``) or
    **stateless** (``observation_horizon == 0``).  Stateful transformers
    maintain an internal memory buffer of the most recent
    ``observation_horizon`` rows, which is updated by ``observe()`` and
    truncated by ``rewind()``.

    All transformers preserve the ``"time"`` column through
    ``transform()`` and ``inverse_transform()``.

    See Also
    --------
    - [`BaseForecaster`][yohou.base.forecaster.BaseForecaster] : Base class for forecasters.
    - [`LagTransformer`][yohou.preprocessing.window.LagTransformer] : Creates lagged features from time series.
    - [`SeasonalDifferencing`][yohou.stationarity.transformers.SeasonalDifferencing] : Stateful seasonal differencing transformer.

    """

    _parameter_constraints: dict = {}

    # Fitted attributes (set during fit())
    _observation_horizon: int
    feature_names_in_: list[str]
    n_features_in_: int
    X_schema_: dict[str, pl.DataType]
    interval_: str

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Merge parameter constraints from all classes in the MRO."""
        super().__init_subclass__(**kwargs)
        # Auto-merge _parameter_constraints from all classes in the MRO.
        merged: dict = {}
        for klass in reversed(cls.__mro__):
            own = klass.__dict__.get("_parameter_constraints")
            if own and isinstance(own, dict):
                merged.update(own)
        cls._parameter_constraints = merged

    def __sklearn_tags__(self) -> Tags:
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with yohou-specific attributes.

        """
        # Create Tags with transformer-specific defaults
        tags = Tags(estimator_type="transformer", requires_fit=True)

        assert tags.transformer_tags is not None

        # Default to non-invertible; subclasses set _tags = {"invertible": True}
        tags.transformer_tags.invertible = False

        # Merge class-level _tags dict (flat keys) into tag dataclasses.
        # Walk MRO in reverse so most-derived class wins.
        merged_tags: dict[str, Any] = {}
        for klass in reversed(type(self).__mro__):
            class_tags = klass.__dict__.get("_tags")
            if class_tags and isinstance(class_tags, dict):
                merged_tags.update(class_tags)

        if merged_tags:
            for key, value in merged_tags.items():
                if tags.transformer_tags is not None and hasattr(tags.transformer_tags, key):
                    setattr(tags.transformer_tags, key, value)
                elif tags.input_tags is not None and hasattr(tags.input_tags, key):
                    setattr(tags.input_tags, key, value)
                elif tags.target_tags is not None and hasattr(tags.target_tags, key):
                    setattr(tags.target_tags, key, value)
                elif hasattr(tags, key):
                    setattr(tags, key, value)

        return tags

    @property
    def observation_horizon(self) -> int:
        """Get the number of time steps needed for stateful operations.

        The observation horizon defines how many recent observations the transformer
        needs to maintain in its memory.

        Returns
        -------
        int
            Number of time steps to retain.

        Raises
        ------
        NotFittedError
            If the transformer has not been fitted yet.

        """
        check_is_fitted(self, "_observation_horizon")
        return self._observation_horizon

    def _update_X_observed(self, X: pl.DataFrame) -> None:
        """Update stored observed data for stateful transformations.

        Parameters
        ----------
        X : pl.DataFrame
            Feature time series.

        """
        if self.observation_horizon > 0:
            if self.observation_horizon > len(X):
                raise ValueError("Not enough input data to set the transformer memory.")

            self._X_observed = X[-self.observation_horizon :]
            self.observed_time_ = X["time"][-1]
        else:
            self._X_observed = X[:0]
            # For stateless transformers, only update observed_time_ if X is non-empty
            if len(X) > 0:
                self.observed_time_ = X["time"][-1]

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> "BaseTransformer":
        """Fit the transformer to input data.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        y : pl.DataFrame or None, default=None
            Ignored.  Present for API compatibility.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        self
            The fitted transformer instance.

        Raises
        ------
        ValueError
            If ``X`` does not have a ``"time"`` column, or if time intervals
            are inconsistent.

        """
        # Validate inputs and set fitted attributes (feature_names_in_, n_features_in_, X_schema_, interval_)
        X = validate_transformer_data(self, X=X, reset=True)

        if not hasattr(self, "_observation_horizon"):
            self._observation_horizon = 0

        # Router transformers would call process_routing() in their fit function

        self._fit(X, y)

        # Sync _observation_horizon with the property after _fit() completes.
        # This handles @property overrides that compute from constructor params.
        self._observation_horizon = self.observation_horizon

        self._update_X_observed(X)

        return self

    def _fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None) -> None:
        """Subclass hook called by ``fit()`` after input validation.

        Override this to implement custom fitting logic.  The default
        implementation does nothing.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series (with ``"time"`` column).
        y : pl.DataFrame or None
            Ignored.  Present for API compatibility.

        """

    def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> pl.DataFrame:
        """Fit the transformer and return transformed data.

        Equivalent to calling ``fit(X).transform(X)``.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        y : pl.DataFrame or None, default=None
            Ignored.  Present for API compatibility.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        Raises
        ------
        ValueError
            If ``X`` is missing the ``"time"`` column or contains invalid data.

        """
        self.fit(X, y, **params)
        return self.transform(X, **params)

    def rewind(self, X: pl.DataFrame) -> "BaseTransformer":
        """Rewind internal memory to the last ``observation_horizon`` rows.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.

        Returns
        -------
        self
            The transformer with internal memory rewound to the last
            ``observation_horizon`` rows of the provided data.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the transformer has not been fitted yet.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
        # Validate against fitted state (no continuity check - rewind sets new window)
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)

        self._update_X_observed(X)

        return self

    def observe(self, X: pl.DataFrame) -> "BaseTransformer":
        """Observe new data and update internal memory.

        Extends the internal memory buffer with new observations, then
        calls ``rewind()`` to maintain the fixed ``observation_horizon``
        window.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns containing new observations.

        Returns
        -------
        self
            The transformer with updated internal memory from new
            observations.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the transformer has not been fitted.
        ValueError
            If ``X`` contains overlapping data with existing observations.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
        # Validate against fitted state (includes continuity check)
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

        self.rewind(pl.concat([self._X_observed, X]))

        return self

    def transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
        """Transform the input time series.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)
        return self._transform(X)

    def _transform(self, X: pl.DataFrame) -> pl.DataFrame:
        """Subclass hook called by ``transform()``.

        Override this to implement the core transformation logic. The
        input ``X`` has already been validated.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series.

        Returns
        -------
        pl.DataFrame
            Transformed time series.

        """
        raise NotImplementedError(f"{type(self).__name__} must implement _transform() or override transform().")

    def inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame | None = None) -> pl.DataFrame:
        """Inverse-transform the data back to the original space.

        Parameters
        ----------
        X_t : pl.DataFrame
            Transformed time series to invert.
        X_p : pl.DataFrame or None, default=None
            Past observations needed by stateful transformers.

        Returns
        -------
        pl.DataFrame
            Data in the original (pre-transform) space.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
        return self._inverse_transform(X_t, X_p=X_p)

    def _inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame | None = None) -> pl.DataFrame:
        """Subclass hook called by ``inverse_transform()``.

        Override this to implement the inverse transformation logic.

        Parameters
        ----------
        X_t : pl.DataFrame
            Transformed time series to invert.
        X_p : pl.DataFrame or None
            Past observations needed by stateful transformers.

        Returns
        -------
        pl.DataFrame
            Data in the original space.

        """
        raise NotImplementedError(f"{type(self).__name__} does not support inverse_transform.")

    def observe_transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
        """Transform using pre-existing memory, then observe state.

        Performs a stateful transformation by concatenating stored
        observations with the new input, applying the transformation,
        and then updating the internal state.

        Equivalent to calling ``observe(X)`` then ``transform(X)``, but
        uses pre-existing memory for the transform.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the transformer has not been fitted yet.
        ValueError
            If ``X`` has invalid structure or non-contiguous time index.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
        # Validate against fitted state (includes continuity check)
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

        # Route all params to transform only (observe is memory management)
        if self.observation_horizon > 0:
            X_full = pl.concat([self._X_observed, X])
            X_t = self.transform(X_full, **params)
            X_t = X_t[-len(X) :]
        else:
            X_t = self.transform(X, **params)

        self.observe(X)

        return X_t

    def rewind_transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
        """Transform the input and rewind state (stateless transform).

        Applies the transformation to the full input and then rewinds
        internal state.  Because ``transform()`` already drops the first
        ``observation_horizon`` rows for stateful transformers, the result
        has ``len(X) - observation_horizon`` rows.

        Equivalent to calling ``rewind(X)`` then ``transform(X)``.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with the first ``observation_horizon``
            rows discarded (by ``transform()``).

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the transformer has not been fitted yet.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
        # Validate against fitted state (no continuity check - rewind sets new window)
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)

        # Apply transformation without using pre-existing observations.
        # transform() already drops the first observation_horizon rows for
        # stateful transformers, so no additional slicing is needed.
        X_t = self.transform(X, **params)

        # Rewind internal state with the input
        self.rewind(X)

        return X_t

    @abc.abstractmethod
    def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : list of str or None, default=None
            Column names of the input features.  If ``None``, uses the
            feature names seen during ``fit``.

        Returns
        -------
        list of str
            Output feature names after transformation.

        """

Methods

observation_horizon property

Get the number of time steps needed for stateful operations.

The observation horizon defines how many recent observations the transformer needs to maintain in its memory.

Returns
Type | Description
int | Number of time steps to retain.

Raises
Type | Description
NotFittedError | If the transformer has not been fitted yet.

__init_subclass__(**kwargs)

Merge parameter constraints from all classes in the MRO.

Source Code
def __init_subclass__(cls, **kwargs: Any) -> None:
    """Merge parameter constraints from all classes in the MRO."""
    super().__init_subclass__(**kwargs)
    # Auto-merge _parameter_constraints from all classes in the MRO.
    merged: dict = {}
    for klass in reversed(cls.__mro__):
        own = klass.__dict__.get("_parameter_constraints")
        if own and isinstance(own, dict):
            merged.update(own)
    cls._parameter_constraints = merged
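
To see what this merge does in practice, the sketch below replays the same loop on a small class hierarchy. The class names and constraint values are hypothetical placeholders, not yohou or scikit-learn definitions.

```python
class Base:
    # Placeholder constraint values; real constraints would differ.
    _parameter_constraints: dict = {"copy": ["boolean"]}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        merged: dict = {}
        # Reversed MRO visits base classes first, so entries declared on
        # more-derived classes overwrite inherited ones.
        for klass in reversed(cls.__mro__):
            own = klass.__dict__.get("_parameter_constraints")
            if own and isinstance(own, dict):
                merged.update(own)
        cls._parameter_constraints = merged


class Windowed(Base):
    _parameter_constraints = {"window": ["int"]}


class Lagged(Windowed):
    # Redeclares "copy" to show that the most-derived entry wins.
    _parameter_constraints = {"lag": ["int"], "copy": ["boolean", "none"]}
```

Each subclass only declares the constraints it adds or changes; the inherited ones are folded in when the class is created, and the parent's own dictionary is left untouched.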

__sklearn_tags__()

Get estimator tags.

Returns
Type | Description
Tags | Estimator tags with yohou-specific attributes.

Source Code
def __sklearn_tags__(self) -> Tags:
    """Get estimator tags.

    Returns
    -------
    Tags
        Estimator tags with yohou-specific attributes.

    """
    # Create Tags with transformer-specific defaults
    tags = Tags(estimator_type="transformer", requires_fit=True)

    assert tags.transformer_tags is not None

    # Default to non-invertible; subclasses set _tags = {"invertible": True}
    tags.transformer_tags.invertible = False

    # Merge class-level _tags dict (flat keys) into tag dataclasses.
    # Walk MRO in reverse so most-derived class wins.
    merged_tags: dict[str, Any] = {}
    for klass in reversed(type(self).__mro__):
        class_tags = klass.__dict__.get("_tags")
        if class_tags and isinstance(class_tags, dict):
            merged_tags.update(class_tags)

    if merged_tags:
        for key, value in merged_tags.items():
            if tags.transformer_tags is not None and hasattr(tags.transformer_tags, key):
                setattr(tags.transformer_tags, key, value)
            elif tags.input_tags is not None and hasattr(tags.input_tags, key):
                setattr(tags.input_tags, key, value)
            elif tags.target_tags is not None and hasattr(tags.target_tags, key):
                setattr(tags.target_tags, key, value)
            elif hasattr(tags, key):
                setattr(tags, key, value)

    return tags
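
The flat-key dispatch above can be illustrated with plain dataclasses. This is a simplified sketch: the `Tags`, `TransformerTags`, and `InputTags` classes below are hypothetical stand-ins with made-up fields, the target-tags group is omitted, and `resolve_tags` is an illustrative helper rather than a library function.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TransformerTags:
    invertible: bool = False


@dataclass
class InputTags:
    allow_nan: bool = False


@dataclass
class Tags:
    estimator_type: str = "transformer"
    transformer_tags: TransformerTags = field(default_factory=TransformerTags)
    input_tags: InputTags = field(default_factory=InputTags)


def resolve_tags(cls: type) -> Tags:
    tags = Tags()
    merged: dict[str, Any] = {}
    # Base classes first, so the most-derived `_tags` entry wins.
    for klass in reversed(cls.__mro__):
        class_tags = klass.__dict__.get("_tags")
        if class_tags and isinstance(class_tags, dict):
            merged.update(class_tags)
    for key, value in merged.items():
        # Assign to the first tag group that has an attribute named `key`;
        # keys that match no group are silently ignored.
        for group in (tags.transformer_tags, tags.input_tags, tags):
            if hasattr(group, key):
                setattr(group, key, value)
                break
    return tags


class Scaler:
    _tags = {"invertible": True}


class NanTolerantScaler(Scaler):
    _tags = {"allow_nan": True, "unknown_key": 1}


resolved = resolve_tags(NanTolerantScaler)
```

One consequence of this design is that a misspelled tag key is dropped without an error, so subclass authors may want to double-check key names.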

fit(X, y=None, **params)

Fit the transformer to input data.

Parameters
Name | Type | Description | Default
X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns. | required
y | DataFrame or None | Ignored. Present for API compatibility. | None
**params | dict | Metadata to route to nested estimators. | {}

Returns
Type | Description
self | The fitted transformer instance.

Raises
Type | Description
ValueError | If X does not have a "time" column, or if time intervals are inconsistent.

Source Code
@_fit_context(prefer_skip_nested_validation=True)
def fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> "BaseTransformer":
    """Fit the transformer to input data.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    y : pl.DataFrame or None, default=None
        Ignored.  Present for API compatibility.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    self
        The fitted transformer instance.

    Raises
    ------
    ValueError
        If ``X`` does not have a ``"time"`` column, or if time intervals
        are inconsistent.

    """
    # Validate inputs and set fitted attributes (feature_names_in_, n_features_in_, X_schema_, interval_)
    X = validate_transformer_data(self, X=X, reset=True)

    if not hasattr(self, "_observation_horizon"):
        self._observation_horizon = 0

    # Router transformers would call process_routing() in their fit function

    self._fit(X, y)

    # Sync _observation_horizon with the property after _fit() completes.
    # This handles @property overrides that compute from constructor params.
    self._observation_horizon = self.observation_horizon

    self._update_X_observed(X)

    return self
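
The order of steps in `fit()` (default horizon, subclass hook, horizon sync, memory update) can be mimicked with a small stand-alone sketch. It assumes plain lists of floats instead of polars DataFrames and skips validation; `SketchTransformer`, `Lag`, and `Demean` are hypothetical classes written for this illustration.

```python
class SketchTransformer:
    """Hypothetical base class mirroring the step order of fit()."""

    @property
    def observation_horizon(self) -> int:
        return self._observation_horizon

    def fit(self, rows: list[float]) -> "SketchTransformer":
        if not hasattr(self, "_observation_horizon"):
            self._observation_horizon = 0  # default: stateless
        self._fit(rows)  # subclass hook
        # Sync the private attribute with the (possibly overridden) property.
        self._observation_horizon = self.observation_horizon
        h = self._observation_horizon
        if h > len(rows):
            raise ValueError("Not enough input data to set the memory.")
        self.memory = rows[-h:] if h > 0 else []
        return self

    def _fit(self, rows: list[float]) -> None:
        """Default hook: nothing to learn."""


class Lag(SketchTransformer):
    def __init__(self, lag: int) -> None:
        self.lag = lag

    @property
    def observation_horizon(self) -> int:
        # Derived from a constructor parameter; no _fit() override needed.
        return self.lag


class Demean(SketchTransformer):
    def _fit(self, rows: list[float]) -> None:
        # Learns a statistic but keeps no memory (horizon stays 0).
        self.mean_ = sum(rows) / len(rows)


lag = Lag(lag=2).fit([1.0, 2.0, 3.0, 4.0])
demean = Demean().fit([1.0, 2.0, 3.0, 4.0])
```

A subclass can therefore either override the property or set `_observation_horizon` inside `_fit()`; in both cases the memory is filled only after the hook has run.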

fit_transform(X, y=None, **params)

Fit the transformer and return transformed data.

Equivalent to calling fit(X).transform(X).

Parameters
Name | Type | Description | Default
X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns. | required
y | DataFrame or None | Ignored. Present for API compatibility. | None
**params | dict | Metadata to route to nested estimators. | {}

Returns
Type | Description
DataFrame | Transformed time series with a "time" column and transformed value columns.

Raises
Type | Description
ValueError | If X is missing the "time" column or contains invalid data.

Source Code
def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> pl.DataFrame:
    """Fit the transformer and return transformed data.

    Equivalent to calling ``fit(X).transform(X)``.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    y : pl.DataFrame or None, default=None
        Ignored.  Present for API compatibility.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Transformed time series with a ``"time"`` column and transformed
        value columns.

    Raises
    ------
    ValueError
        If ``X`` is missing the ``"time"`` column or contains invalid data.

    """
    self.fit(X, y, **params)
    return self.transform(X, **params)

rewind(X)

Rewind internal memory to the last observation_horizon rows.

Parameters
Name | Type | Description | Default
X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns. | required

Returns
Type | Description
self | The transformer with internal memory rewound to the last observation_horizon rows of the provided data.

Raises
Type | Description
NotFittedError | If the transformer has not been fitted yet.

Source Code
def rewind(self, X: pl.DataFrame) -> "BaseTransformer":
    """Rewind internal memory to the last ``observation_horizon`` rows.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.

    Returns
    -------
    self
        The transformer with internal memory rewound to the last
        ``observation_horizon`` rows of the provided data.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If the transformer has not been fitted yet.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
    # Validate against fitted state (no continuity check - rewind sets new window)
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)

    self._update_X_observed(X)

    return self

observe(X)

Observe new data and update internal memory.

Extends the internal memory buffer with new observations, then calls rewind() to maintain the fixed observation_horizon window.

Parameters
Name | Type | Description | Default
X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns containing new observations. | required

Returns
Type | Description
self | The transformer with updated internal memory from new observations.

Raises
Type | Description
NotFittedError | If the transformer has not been fitted.
ValueError | If X contains overlapping data with existing observations.

Source Code
def observe(self, X: pl.DataFrame) -> "BaseTransformer":
    """Observe new data and update internal memory.

    Extends the internal memory buffer with new observations, then
    calls ``rewind()`` to maintain the fixed ``observation_horizon``
    window.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns containing new observations.

    Returns
    -------
    self
        The transformer with updated internal memory from new
        observations.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If the transformer has not been fitted.
    ValueError
        If ``X`` contains overlapping data with existing observations.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
    # Validate against fitted state (includes continuity check)
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

    self.rewind(pl.concat([self._X_observed, X]))

    return self

transform(X, **params)

Transform the input time series.

Parameters
Name | Type | Description | Default
X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns. | required
**params | dict | Metadata to route to nested estimators. | {}

Returns
Type | Description
DataFrame | Transformed time series with a "time" column and transformed value columns.

Source Code
def transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
    """Transform the input time series.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Transformed time series with a ``"time"`` column and transformed
        value columns.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)
    return self._transform(X)

inverse_transform(X_t, X_p=None)

Inverse-transform the data back to the original space.

Parameters
Name | Type | Description | Default
X_t | DataFrame | Transformed time series to invert. | required
X_p | DataFrame or None | Past observations needed by stateful transformers. | None

Returns
Type | Description
DataFrame | Data in the original (pre-transform) space.

Source Code
def inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame | None = None) -> pl.DataFrame:
    """Inverse-transform the data back to the original space.

    Parameters
    ----------
    X_t : pl.DataFrame
        Transformed time series to invert.
    X_p : pl.DataFrame or None, default=None
        Past observations needed by stateful transformers.

    Returns
    -------
    pl.DataFrame
        Data in the original (pre-transform) space.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
    return self._inverse_transform(X_t, X_p=X_p)
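
Why a stateful transformer needs past observations (`X_p`) to invert its output is easiest to see with first-order differencing. The sketch below is an illustration on plain lists, not the library's implementation; `difference` and `inverse_difference` are hypothetical helper names.

```python
from itertools import accumulate


def difference(values: list[float]) -> list[float]:
    # First-order differencing: the output is one element shorter than the input.
    return [b - a for a, b in zip(values, values[1:])]


def inverse_difference(diffs: list[float], past: list[float]) -> list[float]:
    # A cumulative sum seeded with the last past value restores the levels;
    # the seed itself is dropped from the result.
    return list(accumulate(diffs, initial=past[-1]))[1:]


past = [10.0, 12.0, 15.0]   # plays the role of X_p
future = [19.0, 18.0]       # values to be recovered

# Difference the future values using the last past value as context.
diffs = difference(past[-1:] + future)
restored = inverse_difference(diffs, past)
```

Without the last past value the differences only determine the series up to an unknown constant offset, which is the information `X_p` supplies.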

observe_transform(X, **params)

Transform using pre-existing memory, then observe state.

Performs a stateful transformation by concatenating stored observations with the new input, applying the transformation, and then updating the internal state.

Similar to calling transform(X) and then observe(X), except that the transform also sees the rows already held in memory, and only the last len(X) rows of the result are returned.

Parameters
Name | Type | Description | Default
X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns. | required
**params | dict | Metadata to route to nested estimators. | {}

Returns
Type | Description
DataFrame | Transformed time series with a "time" column and transformed value columns.

Raises
Type | Description
NotFittedError | If the transformer has not been fitted yet.
ValueError | If X has invalid structure or non-contiguous time index.

Source Code
def observe_transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
    """Transform using pre-existing memory, then observe state.

    Performs a stateful transformation by concatenating stored
    observations with the new input, applying the transformation,
    and then updating the internal state.

    Equivalent to calling ``observe(X)`` then ``transform(X)``, but
    uses pre-existing memory for the transform.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Transformed time series with a ``"time"`` column and transformed
        value columns.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If the transformer has not been fitted yet.
    ValueError
        If ``X`` has invalid structure or non-contiguous time index.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
    # Validate against fitted state (includes continuity check)
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

    # Route all params to transform only (observe is memory management)
    if self.observation_horizon > 0:
        X_full = pl.concat([self._X_observed, X])
        X_t = self.transform(X_full, **params)
        X_t = X_t[-len(X) :]
    else:
        X_t = self.transform(X, **params)

    self.observe(X)

    return X_t

rewind_transform(X, **params)

Transform the input and rewind state (stateless transform).

Applies the transformation to the full input and then rewinds internal state. Because transform() already drops the first observation_horizon rows for stateful transformers, the result has len(X) - observation_horizon rows.

Equivalent to calling rewind(X) then transform(X).

Parameters

| Name | Type | Description | Default |
|---|---|---|---|
| X | DataFrame | Input time series with a "time" column (datetime) and one or more numeric columns. | required |
| **params | dict | Metadata to route to nested estimators. | {} |

Returns

| Type | Description |
|---|---|
| DataFrame | Transformed time series with the first observation_horizon rows discarded (by transform()). |

Raises

| Type | Description |
|---|---|
| NotFittedError | If the transformer has not been fitted yet. |
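
To make the row count concrete, here is a small sketch of the same pattern with a toy transformer. It is an illustration under assumptions, not Yohou code: rows are `(time, value)` tuples instead of polars DataFrames, the `LagDiff` class is hypothetical, and its `rewind` is assumed to replace the memory with the last `observation_horizon` rows of the input.

```python
class LagDiff:
    """Toy first-difference transformer; rows are (time, value) tuples."""

    observation_horizon = 1

    def __init__(self):
        self._X_observed = []

    def transform(self, X):
        # The first `observation_horizon` rows lack history and are dropped.
        h = self.observation_horizon
        return [(t, v - X[i][1]) for i, (t, v) in enumerate(X[h:])]

    def rewind(self, X):
        # Assumed behaviour: the memory is reset to the tail of X.
        self._X_observed = list(X[-self.observation_horizon:])

    def rewind_transform(self, X):
        X_t = self.transform(X)  # existing memory is not consulted
        self.rewind(X)
        return X_t


diff = LagDiff()
diff._X_observed = [(0, 100.0)]  # stale memory from an earlier window

window = [(10, 5.0), (11, 7.0), (12, 6.0)]
out = diff.rewind_transform(window)
print(out)               # [(11, 2.0), (12, -1.0)]
print(diff._X_observed)  # [(12, 6.0)]
```

The output has `len(window) - observation_horizon` rows, and the stale memory has no influence on the result.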

Source Code
def rewind_transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
    """Transform the input and rewind state (stateless transform).

    Applies the transformation to the full input and then rewinds
    internal state.  Because ``transform()`` already drops the first
    ``observation_horizon`` rows for stateful transformers, the result
    has ``len(X) - observation_horizon`` rows.

    Equivalent to calling ``rewind(X)`` then ``transform(X)``.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Transformed time series with the first ``observation_horizon``
        rows discarded (by ``transform()``).

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If the transformer has not been fitted yet.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_"])
    # Validate against fitted state (no continuity check - rewind sets new window)
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)

    # Apply transformation without using pre-existing observations.
    # transform() already drops the first observation_horizon rows for
    # stateful transformers, so no additional slicing is needed.
    X_t = self.transform(X, **params)

    # Rewind internal state with the input
    self.rewind(X)

    return X_t

get_feature_names_out(input_features=None) abstractmethod

Get output feature names for transformation.

Parameters

| Name | Type | Description | Default |
|---|---|---|---|
| input_features | list of str or None | Column names of the input features. If None, uses the feature names seen during fit. | None |

Returns

| Type | Description |
|---|---|
| list of str | Output feature names after transformation. |
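
Because the method is abstract, each concrete transformer has to supply its own naming rule. The sketch below shows one way a subclass might do this. Everything in it is hypothetical: `ToyBase` stands in for the real base class (and uses `abc.ABC` so the abstract method is enforced at instantiation, which this page does not confirm for Yohou), and the `_lag_` naming scheme is invented for illustration.

```python
import abc


class ToyBase(abc.ABC):
    """Stand-in for a transformer base class with an abstract naming hook."""

    feature_names_in_ = ["y", "temp"]  # normally recorded during fit

    @abc.abstractmethod
    def get_feature_names_out(self, input_features=None):
        """Return the output column names."""


class LagFeatures(ToyBase):
    """Toy transformer that emits one output column per input and lag."""

    def __init__(self, lags=(1, 2)):
        self.lags = lags

    def get_feature_names_out(self, input_features=None):
        # Fall back to the names seen during fit when none are given.
        names = self.feature_names_in_ if input_features is None else input_features
        return [f"{name}_lag_{lag}" for name in names for lag in self.lags]


default_names = LagFeatures().get_feature_names_out()
custom_names = LagFeatures(lags=(1,)).get_feature_names_out(["load"])
print(default_names)  # ['y_lag_1', 'y_lag_2', 'temp_lag_1', 'temp_lag_2']
print(custom_names)   # ['load_lag_1']
```

In this toy setup, instantiating `ToyBase` directly raises `TypeError`, since the abstract method has no implementation.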

Source Code
@abc.abstractmethod
def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
    """Get output feature names for transformation.

    Parameters
    ----------
    input_features : list of str or None, default=None
        Column names of the input features.  If ``None``, uses the
        feature names seen during ``fit``.

    Returns
    -------
    list of str
        Output feature names after transformation.

    """

Tutorials

The following example notebooks use this component:

  • How to Create a Custom Transformer (Getting-Started)
    Implement a ScaleTransformer from scratch, validate it with the check generator, and use it in a forecast pipeline.
