Skip to content

FunctionTransformer

yohou.preprocessing.function.FunctionTransformer

Bases: BaseTransformer

Constructs a transformer from an arbitrary callable.

A FunctionTransformer forwards its X arguments to a user-defined function and returns the result. This is useful for both stateless transformations such as log scaling and stateful transformations such as diff that require past values.

The function receives a polars DataFrame (excluding the "time" column) and should return a polars DataFrame or numpy array with the same number of rows.

For stateful functions that produce NaN values at the beginning (e.g., diff), use fit_transform which auto-detects the warmup period by counting leading NaN rows. The transformer stores these initial rows for use in inverse_transform.

Parameters

Name Type Description Default
func callable or None

The callable to use for the transformation. This will be passed the DataFrame excluding the "time" column. If func is None, then func will be the identity function.

None
inverse_func callable or None

The callable to use for the inverse transformation. For stateful transforms (with auto-detected warmup), the past observations are passed as the second positional argument: inverse_func(X_t, X_p, **inv_kw_args). For stateless transforms, only inverse_func(X_t, **inv_kw_args) is called. If None, the identity function is used.

None
check_inverse bool

Whether to check that func followed by inverse_func leads to the original inputs. It can be used for a sanity check, raising a warning when the condition is not fulfilled.

True
kw_args dict or None

Dictionary of additional keyword arguments to pass to func.

None
inv_kw_args dict or None

Dictionary of additional keyword arguments to pass to inverse_func.

None
feature_names_out (callable, 'one-to-one' or None)

Determines the list of feature names that will be returned by the get_feature_names_out method. If 'one-to-one', the output feature names equal input feature names. If a callable, it must take two arguments (self, input_features) and return output feature names.

None

Attributes

Name Type Description
n_features_in_ int

Number of features seen during fit.

feature_names_in_ list of str

Names of features seen during fit.

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> import numpy as np
>>> from yohou.preprocessing import FunctionTransformer
>>> times = pl.datetime_range(
...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 10), interval="1d", eager=True
... )
>>> X = pl.DataFrame({"time": times, "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})
>>> # Log transformation
>>> transformer = FunctionTransformer(func=np.log, inverse_func=np.exp)
>>> transformer.fit(X)
FunctionTransformer(func=<ufunc 'log'>, inverse_func=<ufunc 'exp'>)
>>> X_t = transformer.transform(X)
>>> "time" in X_t.columns
True
>>> # Identity transformation (when func is None)
>>> transformer = FunctionTransformer()
>>> transformer.fit(X)
FunctionTransformer()
>>> X_t = transformer.transform(X)
>>> X_t["value"].to_list() == X["value"].to_list()
True
>>> # Stateful transformation (diff) with auto-detected warmup
>>> def diff_func(df):
...     return df.select(pl.all().diff())
>>> def cumsum_func(df, X_p=None):
...     r = df.select(pl.all().cum_sum())
...     if X_p is not None:
...         for c in r.columns:
...             r = r.with_columns((pl.col(c) + X_p[c][-1]).alias(c))
...     return r
>>> transformer = FunctionTransformer(func=diff_func, inverse_func=cumsum_func)
>>> X_t = transformer.fit_transform(X)  # Auto-detects warmup=1 from NaN
>>> len(X_t) == len(X) - 1  # First row sliced off
True

See Also

Notes

Warmup detection is automatic: during fit, the function is applied and leading all-NaN rows are counted to set _observation_horizon. NaN rows must be contiguous at the start; a ValueError is raised if NaN appears at discontinuous positions.

For stateful transforms (e.g., diff), inverse_func receives the past observations as a second positional argument: inverse_func(X_t, X_p, **inv_kw_args). For stateless transforms, only inverse_func(X_t, **inv_kw_args) is called.

The check_inverse flag samples up to 100 rows and warns if the round-trip inverse_func(func(X)) does not recover the original values within a tolerance of rtol=1e-5, atol=1e-8.

Source Code

Show/Hide source
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
class FunctionTransformer(BaseTransformer):
    """Constructs a transformer from an arbitrary callable.

    A FunctionTransformer forwards its X arguments to a user-defined function
    and returns the result. This is useful for both stateless transformations
    such as log scaling and stateful transformations such as diff that require
    past values.

    The function receives a polars DataFrame (excluding the "time" column) and
    should return a polars DataFrame or numpy array with the same number of rows.

    For stateful functions that produce NaN values at the beginning (e.g., diff),
    use ``fit_transform`` which auto-detects the warmup period by counting
    leading NaN rows. The transformer stores these initial rows for use in
    ``inverse_transform``.

    Parameters
    ----------
    func : callable or None, default=None
        The callable to use for the transformation. This will be passed the
        DataFrame excluding the "time" column. If func is None, then func
        will be the identity function.
    inverse_func : callable or None, default=None
        The callable to use for the inverse transformation.  For stateful
        transforms (with auto-detected warmup), the past observations are
        passed as the second positional argument:
        ``inverse_func(X_t, X_p, **inv_kw_args)``.  For stateless transforms,
        only ``inverse_func(X_t, **inv_kw_args)`` is called.  If ``None``,
        the identity function is used.
    check_inverse : bool, default=True
        Whether to check that ``func`` followed by ``inverse_func`` leads
        to the original inputs. It can be used for a sanity check, raising
        a warning when the condition is not fulfilled.
    kw_args : dict or None, default=None
        Dictionary of additional keyword arguments to pass to func.
    inv_kw_args : dict or None, default=None
        Dictionary of additional keyword arguments to pass to inverse_func.
    feature_names_out : callable, 'one-to-one' or None, default=None
        Determines the list of feature names that will be returned by the
        ``get_feature_names_out`` method. If 'one-to-one', the output feature
        names equal input feature names. If a callable, it must take two
        arguments (self, input_features) and return output feature names.

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.
    feature_names_in_ : list of str
        Names of features seen during fit.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> import numpy as np
    >>> from yohou.preprocessing import FunctionTransformer

    >>> times = pl.datetime_range(
    ...     start=datetime(2020, 1, 1), end=datetime(2020, 1, 10), interval="1d", eager=True
    ... )
    >>> X = pl.DataFrame({"time": times, "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})

    >>> # Log transformation
    >>> transformer = FunctionTransformer(func=np.log, inverse_func=np.exp)
    >>> transformer.fit(X)
    FunctionTransformer(func=<ufunc 'log'>, inverse_func=<ufunc 'exp'>)
    >>> X_t = transformer.transform(X)
    >>> "time" in X_t.columns
    True

    >>> # Identity transformation (when func is None)
    >>> transformer = FunctionTransformer()
    >>> transformer.fit(X)
    FunctionTransformer()
    >>> X_t = transformer.transform(X)
    >>> X_t["value"].to_list() == X["value"].to_list()
    True

    >>> # Stateful transformation (diff) with auto-detected warmup
    >>> def diff_func(df):
    ...     return df.select(pl.all().diff())
    >>> def cumsum_func(df, X_p=None):
    ...     r = df.select(pl.all().cum_sum())
    ...     if X_p is not None:
    ...         for c in r.columns:
    ...             r = r.with_columns((pl.col(c) + X_p[c][-1]).alias(c))
    ...     return r
    >>> transformer = FunctionTransformer(func=diff_func, inverse_func=cumsum_func)
    >>> X_t = transformer.fit_transform(X)  # Auto-detects warmup=1 from NaN
    >>> len(X_t) == len(X) - 1  # First row sliced off
    True

    See Also
    --------
    - [`SlidingWindowFunctionTransformer`][yohou.preprocessing.window.SlidingWindowFunctionTransformer] : Apply function over sliding windows.
    - [`RollingStatisticsTransformer`][yohou.preprocessing.window.RollingStatisticsTransformer] : Compute rolling statistics.

    Notes
    -----
    Warmup detection is automatic: during ``fit``, the function is applied
    and leading all-NaN rows are counted to set ``_observation_horizon``.
    NaN rows must be contiguous at the start; a ``ValueError`` is raised if
    NaN appears at discontinuous positions.

    For stateful transforms (e.g., diff), ``inverse_func`` receives the
    past observations as a second positional argument:
    ``inverse_func(X_t, X_p, **inv_kw_args)``. For stateless transforms,
    only ``inverse_func(X_t, **inv_kw_args)`` is called.

    The ``check_inverse`` flag samples up to 100 rows and warns if the
    round-trip ``inverse_func(func(X))`` does not recover the original
    values within a tolerance of ``rtol=1e-5``, ``atol=1e-8``.

    """

    _parameter_constraints: dict = {
        "func": [callable, None],
        "inverse_func": [callable, None],
        "check_inverse": ["boolean"],
        "kw_args": [dict, None],
        "inv_kw_args": [dict, None],
        "feature_names_out": [callable, StrOptions({"one-to-one"}), None],
    }

    _tags = {"stateful": True}

    def __init__(
        self,
        func: Callable | None = None,
        inverse_func: Callable | None = None,
        *,
        check_inverse: bool = True,
        kw_args: dict | None = None,
        inv_kw_args: dict | None = None,
        feature_names_out: Callable | Literal["one-to-one"] | None = None,
    ):
        self.func = func
        self.inverse_func = inverse_func
        self.check_inverse = check_inverse
        self.kw_args = kw_args
        self.inv_kw_args = inv_kw_args
        self.feature_names_out = feature_names_out

    def _check_inverse_transform(self, X: pl.DataFrame) -> None:
        """Check that func and inverse_func are inverses."""
        # Sample a subset of the data for efficiency
        n_samples = min(100, len(X))
        idx = np.linspace(0, len(X) - 1, n_samples, dtype=int)
        X_sample = X[idx.tolist()]

        X_transformed = self.transform(X_sample)
        # Provide warmup rows as X_p so inverse_func can reconstruct
        X_p = X_sample.head(self._observation_horizon) if self._observation_horizon > 0 else None
        X_round_trip = self.inverse_transform(X_transformed, X_p=X_p)

        # Compare numeric columns only
        numeric_cols = [c for c in X.columns if c != "time"]
        for col in numeric_cols:
            original = X_sample[col].to_numpy()
            recovered = X_round_trip[col].to_numpy()
            if len(original) != len(recovered):
                min_len = min(len(original), len(recovered))
                original = original[-min_len:]
                recovered = recovered[-min_len:]
            if not np.allclose(original, recovered, rtol=1e-5, atol=1e-8, equal_nan=True):
                warnings.warn(
                    "The provided functions are not strictly inverse of each other. "
                    "If you are sure you want to proceed regardless, set 'check_inverse=False'.",
                    UserWarning,
                    stacklevel=2,
                )
                return

    def _detect_warmup(self, X: pl.DataFrame) -> int:
        """Detect warmup by counting leading NaN rows after applying func.

        Parameters
        ----------
        X : pl.DataFrame
            Input DataFrame with "time" column.

        Returns
        -------
        int
            Number of leading NaN rows (observation horizon).

        Raises
        ------
        ValueError
            If ``func`` generates NaN values at discontinuous time stamps.

        """
        # Apply transformation
        X_t = self._apply_func(X, self.func, self.kw_args)

        # Auto-detect observation_horizon from NaN rows at the beginning
        data_cols = X_t.select(~cs.by_name("time"))

        # Find rows where all values are NaN
        all_nan_mask = data_cols.select(pl.all_horizontal(pl.all().is_null() | pl.all().is_nan())).to_series()

        # Count consecutive NaN rows at the beginning
        obs_horizon = 0
        for is_nan in all_nan_mask:
            if is_nan:
                obs_horizon += 1
            else:
                break

        # Verify NaN only at the beginning
        if obs_horizon > 0:
            # Check that remaining rows don't have all-NaN
            remaining_nan = all_nan_mask[obs_horizon:]
            if remaining_nan.any():
                raise ValueError(
                    "`func` generates NaN values at discontinuous time stamps. "
                    "`func` is only allowed to generate NaN at continuous time "
                    "stamps at the beginning of X."
                )

        return obs_horizon

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> FunctionTransformer:
        """Fit transformer by validating X and auto-detecting warmup.

        The warmup is auto-detected by applying the function and counting
        leading NaN rows.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        y : pl.DataFrame or None, default=None
            Ignored.  Present for API compatibility.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        self
            The fitted transformer instance.

        """
        X = validate_transformer_data(self, X=X, reset=True)
        BaseTransformer.fit(self, X, y, **params)

        # Auto-detect warmup from NaN rows
        self._observation_horizon = self._detect_warmup(X)

        if self.check_inverse and self.func is not None and self.inverse_func is not None:
            self._check_inverse_transform(X)

        return self

    def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> pl.DataFrame:
        """Fit and transform, handling stateful functions with warmup.

        For stateful functions that produce NaN at the beginning (e.g., diff),
        this method fits the transformer (auto-detecting warmup if needed),
        transforms the data, and slices off the warmup rows.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        y : pl.DataFrame or None, default=None
            Ignored.  Present for API compatibility.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        # Fit (includes warmup detection)
        self.fit(X, y, **params)

        # Apply transformation
        X_t = self._apply_func(X, self.func, self.kw_args)

        # Store initial rows for inverse transform and slice output
        if self._observation_horizon > 0:
            self._X_observed = X.head(self._observation_horizon)
            X_t = X_t.slice(self._observation_horizon)

        return X_t

    def _apply_func(
        self,
        X: pl.DataFrame,
        func: Callable | None,
        kw_args: dict | None,
        X_p: pl.DataFrame | None = None,
    ) -> pl.DataFrame:
        """Apply function to DataFrame, preserving structure.

        Parameters
        ----------
        X : pl.DataFrame
            Input DataFrame with "time" column.
        func : callable or None
            Function to apply.
        kw_args : dict or None
            Keyword arguments for function.
        X_p : pl.DataFrame or None, default=None
            Past observations (with or without ``"time"`` column).
            When not ``None``, the data portion is passed as the second
            positional argument to *func*.

        Returns
        -------
        pl.DataFrame
            Transformed DataFrame.

        """
        if func is None:
            return X

        # Separate time column
        time_col = X.select("time")
        data_cols = X.select(~cs.by_name("time"))

        # Apply function
        kwargs = kw_args if kw_args is not None else {}
        if X_p is not None:
            data_p = X_p.select(~cs.by_name("time")) if "time" in X_p.columns else X_p
            result = func(data_cols, data_p, **kwargs)
        else:
            result = func(data_cols, **kwargs)

        # Handle different return types
        if isinstance(result, pl.DataFrame):
            result_df = result
        elif isinstance(result, np.ndarray):
            # Reconstruct DataFrame from numpy array
            if result.ndim == 1:
                result = result.reshape(-1, 1)
            result_df = pl.DataFrame(result, schema=data_cols.columns)
        else:
            # Try to convert to DataFrame
            result_df = pl.DataFrame(result, schema=data_cols.columns)

        # Recombine with time
        return pl.concat([time_col, result_df], how="horizontal")

    def transform(self, X: pl.DataFrame, X_p: pl.DataFrame | None = None, **params) -> pl.DataFrame:
        """Transform X using the forward function.

        For stateful functions, provide past observations `X_p` to
        enable proper transformation without NaN at the beginning.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        X_p : pl.DataFrame or None, default=None
            Past observations to prepend before transformation. If provided
            and warmup_ > 0, the function is applied to the concatenation of
            X_p and X, then only the X portion is returned.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_", "_observation_horizon"])
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)

        if X_p is not None and self._observation_horizon > 0:
            # Concatenate past observations with current data
            X_full = pl.concat([X_p.tail(self._observation_horizon), X], how="vertical")
            X_t_full = self._apply_func(X_full, self.func, self.kw_args)
            # Return only the X portion (after warmup)
            return X_t_full.slice(self._observation_horizon)

        X_t = self._apply_func(X, self.func, self.kw_args)

        # Drop warmup rows (contain NaN from insufficient history)
        if self._observation_horizon > 0:
            X_t = X_t[self._observation_horizon :]

        return X_t

    def inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame | None = None, **params) -> pl.DataFrame:
        """Transform X using the inverse function.

        For stateful transforms (``_observation_horizon > 0``), the past
        observations ``X_p`` are passed as the second positional argument
        to ``inverse_func``.  The ``inverse_func`` signature should be
        ``inverse_func(X_t, X_p, **inv_kw_args)`` for stateful transforms
        and ``inverse_func(X_t, **inv_kw_args)`` for stateless transforms.

        Parameters
        ----------
        X_t : pl.DataFrame
            Transformed time series with "time" column.
        X_p : pl.DataFrame or None, default=None
            Past observations in original scale.  When ``None`` and
            ``_observation_horizon > 0``, the stored ``_X_observed`` is
            used instead.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Inverse-transformed time series, reversing the transformation
            applied by ``transform``.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_", "_observation_horizon"])

        # Validate input data (inverse=True skips strict schema check since
        # predictions from an ML model may have different dtypes, e.g. Float64
        # when the original target was Int64)
        X_t, X_p = validate_transformer_data(
            self,
            X=X_t,
            reset=False,
            inverse=True,
            X_p=X_p,
            observation_horizon=self._observation_horizon,
        )

        # For stateful transformers, pass past observations to inverse_func
        if self._observation_horizon > 0:
            # Use provided X_p or stored _X_observed
            X_observed = X_p if X_p is not None else getattr(self, "_X_observed", None)
            if X_observed is not None:
                return self._apply_func(
                    X_t,
                    self.inverse_func,
                    self.inv_kw_args,
                    X_p=X_observed.tail(self._observation_horizon),
                )

        return self._apply_func(X_t, self.inverse_func, self.inv_kw_args)

    def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : list of str or None, default=None
            Column names of the input features.  If ``None``, uses the
            feature names seen during ``fit``.

        Returns
        -------
        list of str
            Output feature names after transformation.

        """
        input_features = _check_feature_names_in(self, input_features)

        if self.feature_names_out is None or self.feature_names_out == "one-to-one":
            return list(input_features)
        else:
            return list(np.asarray(self.feature_names_out(self, input_features), dtype=object))

Methods

fit(X, y=None, **params)

Fit transformer by validating X and auto-detecting warmup.

The warmup is auto-detected by applying the function and counting leading NaN rows.

Parameters
Name Type Description Default
X DataFrame

Input time series with a "time" column (datetime) and one or more numeric columns.

required
y DataFrame or None

Ignored. Present for API compatibility.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
self

The fitted transformer instance.

Source Code
Show/Hide source
@_fit_context(prefer_skip_nested_validation=True)
def fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> FunctionTransformer:
    """Fit transformer by validating X and auto-detecting warmup.

    The warmup is auto-detected by applying the function and counting
    leading NaN rows.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    y : pl.DataFrame or None, default=None
        Ignored.  Present for API compatibility.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    self
        The fitted transformer instance.

    """
    X = validate_transformer_data(self, X=X, reset=True)
    BaseTransformer.fit(self, X, y, **params)

    # Auto-detect warmup from NaN rows
    self._observation_horizon = self._detect_warmup(X)

    if self.check_inverse and self.func is not None and self.inverse_func is not None:
        self._check_inverse_transform(X)

    return self

fit_transform(X, y=None, **params)

Fit and transform, handling stateful functions with warmup.

For stateful functions that produce NaN at the beginning (e.g., diff), this method fits the transformer (auto-detecting warmup if needed), transforms the data, and slices off the warmup rows.

Parameters
Name Type Description Default
X DataFrame

Input time series with a "time" column (datetime) and one or more numeric columns.

required
y DataFrame or None

Ignored. Present for API compatibility.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
DataFrame

Transformed time series with a "time" column and transformed value columns.

Source Code
Show/Hide source
def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> pl.DataFrame:
    """Fit and transform, handling stateful functions with warmup.

    For stateful functions that produce NaN at the beginning (e.g., diff),
    this method fits the transformer (auto-detecting warmup if needed),
    transforms the data, and slices off the warmup rows.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    y : pl.DataFrame or None, default=None
        Ignored.  Present for API compatibility.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Transformed time series with a ``"time"`` column and transformed
        value columns.

    """
    # Fit (includes warmup detection)
    self.fit(X, y, **params)

    # Apply transformation
    X_t = self._apply_func(X, self.func, self.kw_args)

    # Store initial rows for inverse transform and slice output
    if self._observation_horizon > 0:
        self._X_observed = X.head(self._observation_horizon)
        X_t = X_t.slice(self._observation_horizon)

    return X_t

transform(X, X_p=None, **params)

Transform X using the forward function.

For stateful functions, provide past observations X_p to enable proper transformation without NaN at the beginning.

Parameters
Name Type Description Default
X DataFrame

Input time series with a "time" column (datetime) and one or more numeric columns.

required
X_p DataFrame or None

Past observations to prepend before transformation. If provided and warmup_ > 0, the function is applied to the concatenation of X_p and X, then only the X portion is returned.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
DataFrame

Transformed time series with a "time" column and transformed value columns.

Source Code
Show/Hide source
def transform(self, X: pl.DataFrame, X_p: pl.DataFrame | None = None, **params) -> pl.DataFrame:
    """Transform X using the forward function.

    For stateful functions, provide past observations `X_p` to
    enable proper transformation without NaN at the beginning.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series with a ``"time"`` column (datetime) and one or
        more numeric columns.
    X_p : pl.DataFrame or None, default=None
        Past observations to prepend before transformation. If provided
        and warmup_ > 0, the function is applied to the concatenation of
        X_p and X, then only the X portion is returned.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Transformed time series with a ``"time"`` column and transformed
        value columns.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_", "_observation_horizon"])
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=False)

    if X_p is not None and self._observation_horizon > 0:
        # Concatenate past observations with current data
        X_full = pl.concat([X_p.tail(self._observation_horizon), X], how="vertical")
        X_t_full = self._apply_func(X_full, self.func, self.kw_args)
        # Return only the X portion (after warmup)
        return X_t_full.slice(self._observation_horizon)

    X_t = self._apply_func(X, self.func, self.kw_args)

    # Drop warmup rows (contain NaN from insufficient history)
    if self._observation_horizon > 0:
        X_t = X_t[self._observation_horizon :]

    return X_t

inverse_transform(X_t, X_p=None, **params)

Transform X using the inverse function.

For stateful transforms (_observation_horizon > 0), the past observations X_p are passed as the second positional argument to inverse_func. The inverse_func signature should be inverse_func(X_t, X_p, **inv_kw_args) for stateful transforms and inverse_func(X_t, **inv_kw_args) for stateless transforms.

Parameters
Name Type Description Default
X_t DataFrame

Transformed time series with "time" column.

required
X_p DataFrame or None

Past observations in original scale. When None and _observation_horizon > 0, the stored _X_observed is used instead.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
DataFrame

Inverse-transformed time series, reversing the transformation applied by transform.

Source Code
Show/Hide source
def inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame | None = None, **params) -> pl.DataFrame:
    """Transform X using the inverse function.

    For stateful transforms (``_observation_horizon > 0``), the past
    observations ``X_p`` are passed as the second positional argument
    to ``inverse_func``.  The ``inverse_func`` signature should be
    ``inverse_func(X_t, X_p, **inv_kw_args)`` for stateful transforms
    and ``inverse_func(X_t, **inv_kw_args)`` for stateless transforms.

    Parameters
    ----------
    X_t : pl.DataFrame
        Transformed time series with "time" column.
    X_p : pl.DataFrame or None, default=None
        Past observations in original scale.  When ``None`` and
        ``_observation_horizon > 0``, the stored ``_X_observed`` is
        used instead.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame
        Inverse-transformed time series, reversing the transformation
        applied by ``transform``.

    """
    check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_", "_observation_horizon"])

    # Validate input data (inverse=True skips strict schema check since
    # predictions from an ML model may have different dtypes, e.g. Float64
    # when the original target was Int64)
    X_t, X_p = validate_transformer_data(
        self,
        X=X_t,
        reset=False,
        inverse=True,
        X_p=X_p,
        observation_horizon=self._observation_horizon,
    )

    # For stateful transformers, pass past observations to inverse_func
    if self._observation_horizon > 0:
        # Use provided X_p or stored _X_observed
        X_observed = X_p if X_p is not None else getattr(self, "_X_observed", None)
        if X_observed is not None:
            return self._apply_func(
                X_t,
                self.inverse_func,
                self.inv_kw_args,
                X_p=X_observed.tail(self._observation_horizon),
            )

    return self._apply_func(X_t, self.inverse_func, self.inv_kw_args)

get_feature_names_out(input_features=None)

Get output feature names for transformation.

Parameters
Name Type Description Default
input_features list of str or None

Column names of the input features. If None, uses the feature names seen during fit.

None
Returns
Type Description
list of str

Output feature names after transformation.

Source Code
Show/Hide source
def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
    """Get output feature names for transformation.

    Parameters
    ----------
    input_features : list of str or None, default=None
        Column names of the input features.  If ``None``, uses the
        feature names seen during ``fit``.

    Returns
    -------
    list of str
        Output feature names after transformation.

    """
    input_features = _check_feature_names_in(self, input_features)

    if self.feature_names_out is None or self.feature_names_out == "one-to-one":
        return list(input_features)
    else:
        return list(np.asarray(self.feature_names_out(self, input_features), dtype=object))

Tutorials

The following example notebooks use this component:

  • How to Wrap Functions as Transformers


    Data-Features

    Wrap arbitrary polars or numpy operations as sklearn transformers with FunctionTransformer, supporting stateful warmup, inverse transforms, and pipelines.

    View · Open in marimo