
FeaturePipeline

yohou.compose.feature_pipeline.FeaturePipeline

Bases: BaseTransformer, _BaseComposition

A sequence of time series transformers.

FeaturePipeline allows you to sequentially apply a list of time series transformers to preprocess the data.

Steps of the pipeline must be 'transforms', that is, they must implement fit, transform and observe methods.
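The required step interface can be expressed as a structural type. The sketch below is illustrative only. `TimeSeriesStep`, `Identity`, `FitOnly` and `find_invalid_steps` are hypothetical names, and yohou itself validates steps by checking `isinstance(step, BaseTransformer)` rather than by duck typing. The sketch shows which steps a pipeline skips (`'passthrough'` or `None`) and which it would reject.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class TimeSeriesStep(Protocol):
    """Hypothetical structural type: the three methods a step must provide."""

    def fit(self, X: Any, y: Any = None) -> Any: ...

    def transform(self, X: Any) -> Any: ...

    def observe(self, X: Any) -> Any: ...


class Identity:
    """Implements all three methods, so it satisfies the protocol."""

    def fit(self, X: Any, y: Any = None) -> "Identity":
        return self

    def transform(self, X: Any) -> Any:
        return X

    def observe(self, X: Any) -> "Identity":
        return self


class FitOnly:
    """Lacks transform and observe, so it does not satisfy the protocol."""

    def fit(self, X: Any, y: Any = None) -> "FitOnly":
        return self


def find_invalid_steps(steps: list[tuple[str, Any]]) -> list[str]:
    """Return the names of steps that are neither skipped nor protocol-compliant."""
    invalid = []
    for name, step in steps:
        # 'passthrough' and None mark steps that the pipeline skips
        if step is None or step == "passthrough":
            continue
        # runtime_checkable protocols only check that the methods exist
        if not isinstance(step, TimeSeriesStep):
            invalid.append(name)
    return invalid


print(find_invalid_steps([("a", Identity()), ("b", "passthrough"), ("c", FitOnly()), ("d", None)]))
```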

The purpose of the pipeline is to assemble several steps that can be cross-validated together while setting different parameters. For this, it enables setting parameters of the various steps using their names and the parameter name separated by a '__' (for example, deseason__seasonality). A step's estimator may be replaced entirely by setting the parameter with its name to another estimator, or a transformer removed by setting it to 'passthrough' or None.
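The double-underscore convention can be illustrated with a small, self-contained sketch. `ToyPipeline` and `Scale` are hypothetical stand-ins, not yohou's implementation (which delegates to scikit-learn's `_BaseComposition`). They only show how a key such as `scale__factor` is split into a step name and a parameter name, and how a bare step name replaces the whole step.

```python
from typing import Any


class Scale:
    """Hypothetical step with a single parameter."""

    def __init__(self, factor: float = 1.0) -> None:
        self.factor = factor

    def set_params(self, **params: Any) -> "Scale":
        for key, value in params.items():
            setattr(self, key, value)
        return self


class ToyPipeline:
    """Hypothetical container illustrating '<step>__<param>' routing."""

    def __init__(self, steps: list[tuple[str, Any]]) -> None:
        self.steps = steps

    def set_params(self, **params: Any) -> "ToyPipeline":
        names = [name for name, _ in self.steps]
        for key, value in params.items():
            if "__" in key:
                # "scale__factor" -> step "scale", parameter "factor"
                step_name, param = key.split("__", 1)
                dict(self.steps)[step_name].set_params(**{param: value})
            elif key in names:
                # A bare step name replaces the whole step
                # (another estimator, "passthrough", or None)
                self.steps[names.index(key)] = (key, value)
            else:
                raise ValueError(f"Invalid parameter {key!r}")
        return self


pipe = ToyPipeline([("scale", Scale()), ("shift", Scale(2.0))])
pipe.set_params(scale__factor=10.0)   # tune a nested parameter
pipe.set_params(shift="passthrough")  # drop a step entirely
print(pipe.steps[0][1].factor, pipe.steps[1])
```

The real scikit-learn implementation also validates parameter names against each step's `get_params()`; the sketch keeps only the routing logic.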

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `steps` | list of tuples | List of (name of step, estimator) tuples that are to be chained in sequential order. To be compatible with the scikit-learn API, all steps must define `fit`. All non-last steps must also define `transform`. See Combining Estimators for more details. | required |
| `memory` | str or object with the `joblib.Memory` interface | Used to cache the fitted transformers of the pipeline. The last step will never be cached, even if it is a transformer. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute `named_steps` or `steps` to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time-consuming. | `None` |
| `verbose` | bool | If True, the time elapsed while fitting each step is printed as each step completes. | `False` |
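The sequential chaining described for `steps` can be sketched in plain Python. This is a simplified illustration, not yohou's code: the real pipeline delegates to scikit-learn's `Pipeline._fit`, which additionally handles `memory` caching, `verbose` timing and metadata routing, and operates on polars DataFrames with a `time` column. `Center`, `Double`, `fit_chain` and `transform_chain` are hypothetical names, and lists of floats stand in for DataFrames.

```python
from typing import Any


class Center:
    """Hypothetical stateful step: learns the mean in fit, subtracts it in transform."""

    def fit(self, X: list[float], y: Any = None) -> "Center":
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X: list[float]) -> list[float]:
        return [x - self.mean_ for x in X]


class Double:
    """Hypothetical stateless step: fit learns nothing."""

    def fit(self, X: list[float], y: Any = None) -> "Double":
        return self

    def transform(self, X: list[float]) -> list[float]:
        return [2 * x for x in X]


def is_skipped(step: Any) -> bool:
    return step is None or step == "passthrough"


def fit_chain(steps: list[tuple[str, Any]], X: list[float], y: Any = None) -> None:
    """Fit each step on the output of the previous steps, in order."""
    Xt = X
    for _, step in steps[:-1]:
        if is_skipped(step):
            continue
        # Non-final steps must transform so the next step sees their output
        Xt = step.fit(Xt, y).transform(Xt)
    _, last = steps[-1]
    if not is_skipped(last):
        # The final step only needs fit
        last.fit(Xt, y)


def transform_chain(steps: list[tuple[str, Any]], X: list[float]) -> list[float]:
    """Apply every non-skipped step's transform in sequence."""
    Xt = X
    for _, step in steps:
        if not is_skipped(step):
            Xt = step.transform(Xt)
    return Xt


steps = [("center", Center()), ("skip", "passthrough"), ("double", Double())]
fit_chain(steps, [1.0, 2.0, 3.0])
print(transform_chain(steps, [1.0, 2.0, 3.0]))
```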

Attributes

| Name | Type | Description |
| --- | --- | --- |
| `named_steps` | `Bunch` | Read-only, dictionary-like object for accessing steps by their user-given names. Keys are step names and values are the corresponding step estimators. |
| `n_features_in_` | int | Number of features seen during `fit`. Only defined if the first estimator in `steps` exposes such an attribute when fit. |
| `feature_names_in_` | ndarray of shape (`n_features_in_`,) | Names of features seen during `fit`. Only defined if the first estimator in `steps` exposes such an attribute when fit. |

See Also

- sklearn.pipeline.Pipeline : Underlying scikit-learn pipeline class.
- BaseTransformer : Base class for time series transformers.
- FeatureUnion : Parallel transformer combination.
- ColumnTransformer : Apply transformers to specific columns.

Notes

All input data must include a time column with datetime values. The time column is preserved through all transformations.

The observation_horizon property accumulates across all steps, returning the sum of all transformer observation horizons. This indicates the total amount of historical data required by the pipeline.
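Why the horizons add up: if a first step needs 4 past rows to produce an output row and a second step needs 3 past rows of that output, the second step's earliest context row itself depends on 4 earlier raw rows, so 4 + 3 = 7 raw rows of history are required. The sketch below mirrors that summation, skipping `'passthrough'`/`None` steps and steps without an `observation_horizon` attribute. `Step`, `NoHorizon` and `cumulative_observation_horizon` are hypothetical, and the horizon values are illustrative, not the values yohou's transformers report.

```python
from typing import Any


class Step:
    """Hypothetical step that reports how many past rows it needs."""

    def __init__(self, observation_horizon: int) -> None:
        self.observation_horizon = observation_horizon


class NoHorizon:
    """Hypothetical step without an observation_horizon attribute."""


def cumulative_observation_horizon(steps: list[tuple[str, Any]]) -> int:
    total = 0
    for _, step in steps:
        # Skipped steps contribute nothing
        if step is None or step == "passthrough":
            continue
        # Steps that do not declare a horizon contribute nothing either
        if hasattr(step, "observation_horizon"):
            total += step.observation_horizon
    return total


steps = [("deseason", Step(4)), ("skip", "passthrough"), ("noop", NoHorizon()), ("lags", Step(3))]
print(cumulative_observation_horizon(steps))
```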

The pipeline supports the time-series-specific observe() method, which passes new observations through each step so that every step can update its memory, allowing the pipeline to incorporate new data without full retraining.
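The incremental mechanism can be illustrated with a toy first-difference step. In this sketch (hypothetical `Diff` class and `observe_chain` helper; lists stand in for DataFrames), each step keeps its last `observation_horizon` inputs as memory, prepends that memory to new data, transforms with its pre-observe state, and then updates its memory before passing the result on. Chaining two such steps and feeding new rows incrementally reproduces the tail of a batch transform over the full series, without refitting.

```python
class Diff:
    """Hypothetical stateful step computing x[t] - x[t-1]."""

    observation_horizon = 1  # needs one past value

    def fit(self, X: list[float]) -> "Diff":
        # Remember the last input rows as context for future observations
        self.memory_ = X[-self.observation_horizon:]
        return self

    def transform(self, X: list[float]) -> list[float]:
        # The first row has no predecessor, so it is dropped (warm-up)
        return [b - a for a, b in zip(X[:-1], X[1:])]

    def observe_transform(self, X: list[float]) -> list[float]:
        # Prepend memory, transform with the pre-observe state, then update memory
        context = self.memory_ + X
        out = self.transform(context)
        self.memory_ = context[-self.observation_horizon:]
        return out


def observe_chain(steps: list[Diff], X: list[float]) -> list[float]:
    """Propagate new observations through each step in order."""
    Xt = X
    for step in steps:
        Xt = step.observe_transform(Xt)
    return Xt


series = [1.0, 4.0, 9.0, 16.0, 25.0, 36.0]
train, new = series[:4], series[4:]

# Fit: each step is fitted on the previous step's transformed output
steps = [Diff(), Diff()]
Xt = train
for step in steps:
    Xt = step.fit(Xt).transform(Xt)

incremental = observe_chain(steps, new)
batch = Diff().transform(Diff().transform(series))
print(incremental, batch[-len(incremental):])
```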

The final step can be a forecaster, enabling end-to-end forecasting pipelines that transform features and generate predictions.

Examples

>>> import polars as pl
>>> from datetime import datetime, timedelta
>>> from yohou.compose import FeaturePipeline
>>> from yohou.stationarity import SeasonalDifferencing
>>> from yohou.preprocessing import LagTransformer
>>>
>>> # Create sample weekly time series data (52 weeks)
>>> time = pl.datetime_range(
...     start=datetime(2023, 1, 1),
...     end=datetime(2023, 1, 1) + timedelta(weeks=51),
...     interval="1w",
...     eager=True,
... )
>>> data = pl.DataFrame({"time": time, "sales": range(1, 53)})
>>>
>>> # Example 1: Create a sequential preprocessing pipeline
>>> pipe = FeaturePipeline([
...     ("deseason", SeasonalDifferencing(seasonality=4)),
...     ("lags", LagTransformer(lag=[1, 2, 3])),
... ])
>>>
>>> # Example 2: Access individual steps by name
>>> pipe.named_steps["deseason"]
SeasonalDifferencing(...)
>>>
>>> # Example 3: Access individual steps by position
>>> pipe[0]
SeasonalDifferencing(...)

Source Code

class FeaturePipeline(BaseTransformer, _BaseComposition):
    """
    A sequence of time series transformers.

    `FeaturePipeline` allows you to sequentially apply a list of time series
    transformers to preprocess the data.

    Steps of the pipeline must be 'transforms', that is, they must implement
    `fit`, `transform` and `observe` methods.

    The purpose of the pipeline is to assemble several steps that can be
    cross-validated together while setting different parameters. For this, it
    enables setting parameters of the various steps using their names and the
    parameter name separated by a `'__'` (for example, `deseason__seasonality`).
    A step's estimator may be replaced entirely by setting the parameter with
    its name to another estimator, or a transformer removed by setting it to
    `'passthrough'` or `None`.

    Parameters
    ----------
    steps : list of tuples
        List of (name of step, estimator) tuples that are to be chained in
        sequential order. To be compatible with the scikit-learn API, all steps
        must define `fit`. All non-last steps must also define `transform`. See
        [Combining Estimators](https://scikit-learn.org/stable/modules/compose.html) for more details.

    memory : str or object with the joblib.Memory interface, default=None
        Used to cache the fitted transformers of the pipeline. The last step
        will never be cached, even if it is a transformer. By default, no
        caching is performed. If a string is given, it is the path to the
        caching directory. Enabling caching triggers a clone of the transformers
        before fitting. Therefore, the transformer instance given to the
        pipeline cannot be inspected directly. Use the attribute ``named_steps``
        or ``steps`` to inspect estimators within the pipeline. Caching the
        transformers is advantageous when fitting is time-consuming.

    verbose : bool, default=False
        If True, the time elapsed while fitting each step will be printed as it
        is completed.

    Attributes
    ----------
    named_steps : `Bunch`
        Read-only, dictionary-like object for accessing steps by their
        user-given names. Keys are step names and values are the
        corresponding step estimators.

    n_features_in_ : int
        Number of features seen during ``fit``. Only defined if the
        underlying first estimator in `steps` exposes such an attribute
        when fit.

    feature_names_in_ : ndarray of shape (`n_features_in_`,)
        Names of features seen during ``fit``. Only defined if the
        underlying first estimator in `steps` exposes such an attribute
        when fit.

    See Also
    --------
    - `sklearn.pipeline.Pipeline` : Underlying scikit-learn pipeline class.
    - [`BaseTransformer`][yohou.base.transformer.BaseTransformer] : Base class for time series transformers.
    - [`FeatureUnion`][yohou.compose.feature_union.FeatureUnion] : Parallel transformer combination.
    - [`ColumnTransformer`][yohou.compose.column_transformer.ColumnTransformer] : Apply transformers to specific columns.

    Notes
    -----
    All input data must include a `time` column with datetime values. The `time`
    column is preserved through all transformations.

    The `observation_horizon` property accumulates across all steps, returning
    the sum of all transformer observation horizons. This indicates the total
    amount of historical data required by the pipeline.

    The pipeline supports the time-series-specific `observe()` method, which
    passes new observations through each step so that every step can update
    its memory, allowing the pipeline to incorporate new data without full
    retraining.

    The final step can be a forecaster, enabling end-to-end forecasting pipelines
    that transform features and generate predictions.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime, timedelta
    >>> from yohou.compose import FeaturePipeline
    >>> from yohou.stationarity import SeasonalDifferencing
    >>> from yohou.preprocessing import LagTransformer
    >>>
    >>> # Create sample weekly time series data (52 weeks)
    >>> time = pl.datetime_range(
    ...     start=datetime(2023, 1, 1),
    ...     end=datetime(2023, 1, 1) + timedelta(weeks=51),
    ...     interval="1w",
    ...     eager=True,
    ... )
    >>> data = pl.DataFrame({"time": time, "sales": range(1, 53)})
    >>>
    >>> # Example 1: Create a sequential preprocessing pipeline
    >>> pipe = FeaturePipeline([
    ...     ("deseason", SeasonalDifferencing(seasonality=4)),
    ...     ("lags", LagTransformer(lag=[1, 2, 3])),
    ... ])
    >>>
    >>> # Example 2: Access individual steps by name
    >>> pipe.named_steps["deseason"]  # doctest: +ELLIPSIS
    SeasonalDifferencing(...)
    >>>
    >>> # Example 3: Access individual steps by position
    >>> pipe[0]  # doctest: +ELLIPSIS
    SeasonalDifferencing(...)

    """

    # BaseEstimator interface
    _required_parameters = ["steps"]

    _parameter_constraints: dict[str, Any] = {
        "steps": [list, Hidden(tuple)],
        "memory": [None, str, HasMethods(["cache"])],
        "verbose": ["boolean"],
    }

    def __init__(
        self,
        steps: list[tuple[str, Any]],
        *,
        memory: None | Memory | str = None,
        verbose: bool = False,
    ) -> None:
        self.steps = steps
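        # Not a constructor parameter: read by the borrowed scikit-learn
        # Pipeline internals during metadata routing.
        self.transform_input = None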
        self.memory = memory
        self.verbose = verbose

    def get_params(self, deep: bool = True) -> dict[str, Any]:
        """Get parameters for this estimator.

        Parameters
        ----------
        deep : bool, default=True
            If True, will return the parameters for this estimator and
            contained subobjects that are estimators.

        Returns
        -------
        params : dict[str, Any]
            Parameter names mapped to their values.

        """
        return _BaseComposition._get_params(self, attr="steps", deep=deep)

    def set_params(self, **params: Any) -> "FeaturePipeline":
        """Set the parameters of this estimator.

        Parameters
        ----------
        **params : dict
            Estimator parameters.

        Returns
        -------
        self : FeaturePipeline
            FeaturePipeline instance.

        """
        _BaseComposition._set_params(self, attr="steps", **params)
        return self

    def _iter(
        self,
        with_final: bool = True,
        filter_passthrough: bool = True,
    ) -> Iterator[tuple[int, str, Any]]:
        """Generate (idx, name, trans) tuples from self.steps.

        Parameters
        ----------
        with_final : bool, default=True
            Include the final estimator.

        filter_passthrough : bool, default=True
            Filter out 'passthrough' steps.

        Yields
        ------
        idx : int
            Step index.
        name : str
            Step name.
        trans : Any
            Step transformer.

        """
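        # FeaturePipeline does not subclass sklearn_Pipeline; it borrows the
        # unbound method, which works as long as FeaturePipeline provides the
        # attributes that method reads (such as ``steps``). ``cast`` only
        # satisfies the type checker.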
        return sklearn_Pipeline._iter(
            cast(sklearn_Pipeline, self),
            with_final=with_final,
            filter_passthrough=filter_passthrough,
        )

    def __len__(self) -> int:
        """Return the length of the FeaturePipeline.

        Returns
        -------
        length : int
            Number of steps in the pipeline.

        """
        return len(self.steps)

    def __getitem__(self, ind: int | str | slice) -> Any:
        """Return a sub-pipeline or a single estimator in the pipeline.

        Parameters
        ----------
        ind : int, str, or slice
            Index, name, or slice of the step to retrieve.

        Returns
        -------
        estimator : Any
            The estimator or sub-pipeline.

        """
        if isinstance(ind, slice):
            if ind.step not in (None, 1):
                raise ValueError("FeaturePipeline slicing only supports a step of 1")
            return self.__class__(steps=self.steps[ind], memory=self.memory, verbose=self.verbose)
        elif isinstance(ind, int):
            _, est = self.steps[ind]
            return est
        else:
            # String case - get by name
            return self.named_steps[ind]

    def _fit(self, X: pl.DataFrame, y: pl.DataFrame | None, routed_params: Any, **kwargs: Any) -> pl.DataFrame:  # ty: ignore[invalid-method-override]
        """Fit the pipeline.

        Parameters
        ----------
        X : pl.DataFrame
            Training data.

        y : pl.DataFrame | None
            Training targets.

        routed_params : Any
            Routed parameters.

        **kwargs : Any
            Additional keyword arguments forwarded to the parent ``_fit``
            (e.g. ``callback_ctx`` in scikit-learn >= 1.9).

        Returns
        -------
        X_t : pl.DataFrame
            Transformed data.

        """
        if "callback_ctx" not in kwargs:
            # sklearn >= 1.9 requires callback_ctx in Pipeline._fit
            _init = getattr(sklearn_Pipeline, "_init_callback_context", None)
            if _init is not None:
                kwargs["callback_ctx"] = _init(self, max_subtasks=len(self.steps))
        return sklearn_Pipeline._fit(self, X, y, routed_params, **kwargs)  # ty: ignore[invalid-argument-type]

    @property
    def named_steps(self) -> Bunch:
        """Access the steps by name.

        Returns
        -------
        named_steps : Bunch
            Dictionary-like object with step names as keys.

        """
        return sklearn_Pipeline.named_steps.fget(self)  # ty: ignore[invalid-argument-type]

    @property
    def _final_estimator(self) -> Any:
        """Get the final estimator.

        Returns
        -------
        estimator : Any
            The final estimator in the pipeline.

        """
        return sklearn_Pipeline._final_estimator.fget(self)  # ty: ignore[invalid-argument-type]

    def _log_message(self, step_idx: int) -> str:
        """Get log message for a step.

        Parameters
        ----------
        step_idx : int
            Index of the step.

        Returns
        -------
        message : str
            Log message.

        """
        return sklearn_Pipeline._log_message(self, step_idx)  # ty: ignore[invalid-argument-type]

    def _check_method_params(self, method: str, props: dict[str, Any]) -> Any:
        """Check and route method parameters.

        Parameters
        ----------
        method : str
            Method name.

        props : dict[str, Any]
            Properties to check.

        Returns
        -------
        routed_params : Any
            Routed parameters.

        """
        # Validate params before routing (sklearn pattern)
        _raise_for_params(props, self, method)
        return process_routing(self, method, **props)

    def get_feature_names_out(self, input_features: list[str] | None = None) -> Any:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : list[str] | None, default=None
            Input feature names.

        Returns
        -------
        feature_names_out : Any
            Output feature names.

        """
        return super().get_feature_names_out(input_features)

    @property
    def n_features_in_(self) -> int:
        """Number of features seen during fit.

        Returns
        -------
        n_features_in_ : int
            Number of input features.

        """
        return sklearn_Pipeline.n_features_in_.fget(self)  # ty: ignore[invalid-argument-type]

    @property
    def feature_names_in_(self) -> Any:
        """Names of features seen during fit.

        Returns
        -------
        feature_names_in_ : Any
            Names of input features.

        """
        return sklearn_Pipeline.feature_names_in_.fget(self)  # ty: ignore[invalid-argument-type]

    def __sklearn_is_fitted__(self) -> bool:
        """Check if the pipeline is fitted.

        Returns
        -------
        is_fitted : bool
            True if the pipeline is fitted.

        """
        return sklearn_Pipeline.__sklearn_is_fitted__(self)  # ty: ignore[invalid-argument-type]

    def _sk_visual_block_(self) -> Any:
        """Get visual block representation.

        Returns
        -------
        visual_block : Any
            Visual block representation.

        """
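        # FeaturePipeline does not subclass sklearn_Pipeline; it borrows the
        # unbound method, which works as long as FeaturePipeline provides the
        # attributes that method reads (such as ``steps``). ``cast`` only
        # satisfies the type checker.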
        return sklearn_Pipeline._sk_visual_block_(cast(sklearn_Pipeline, self))

    def _get_metadata_for_step(self, **params: Any) -> Any:
        """Get metadata for a specific step.

        Parameters
        ----------
        **params : dict
            Arguments passed from sklearn's _fit method.

        Returns
        -------
        metadata : Any
            Metadata for the step.

        """
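        # FeaturePipeline does not subclass sklearn_Pipeline; it borrows the
        # unbound method, which works as long as FeaturePipeline provides the
        # attributes that method reads (such as ``steps``). ``cast`` only
        # satisfies the type checker.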
        return sklearn_Pipeline._get_metadata_for_step(cast(sklearn_Pipeline, self), **params)

    def __sklearn_tags__(self) -> Tags:
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with yohou-specific attributes.

        """
        tags = super().__sklearn_tags__()

        # Aggregate tags from steps (static capability check)
        if hasattr(self, "steps") and self.steps is not None:
            transformers = [t for _, t in self.steps if t != "passthrough" and t is not None]
            if transformers:
                assert tags.transformer_tags is not None
                assert tags.input_tags is not None
                # Stateful if any step is stateful
                tags.transformer_tags.stateful = any(
                    t.__sklearn_tags__().transformer_tags.stateful for t in transformers
                )

                # Invertible if all steps are invertible
                tags.transformer_tags.invertible = all(
                    t.__sklearn_tags__().transformer_tags.invertible for t in transformers
                )

                # min_value is taken from the first transformer, which sees the raw input
                tags.input_tags.min_value = transformers[0].__sklearn_tags__().input_tags.min_value

        return tags

    @property
    def observation_horizon(self) -> int:
        """Get cumulative observation horizon across all steps.

        Returns
        -------
        int
            Total observation horizon needed.

        Raises
        ------
        NotFittedError
            If the pipeline has not been fitted yet.

        """
        check_is_fitted(self)

        observation_horizon = 0
        for _, t in self.steps:
            if t != "passthrough" and t is not None and hasattr(t, "observation_horizon"):
                observation_horizon += t.observation_horizon

        return observation_horizon

    def _update_X_observed(self, X: pl.DataFrame) -> None:
        """Update pipeline-level observation tracking.

        Unlike individual transformers, the pipeline does not store
        ``observation_horizon`` rows of raw input.  Each step manages
        its own memory buffer.  We only keep the last row for temporal
        continuity checks in `observe` and record
        ``observed_time_``.

        Parameters
        ----------
        X : pl.DataFrame
            Raw pipeline input.

        """
        # Store one row for continuity checks, not the full observation_horizon
        self._X_observed = X[-1:] if len(X) > 0 else X[:0]
        if len(X) > 0:
            self.observed_time_ = X["time"][-1]

    def rewind(self, X: pl.DataFrame) -> "FeaturePipeline":
        """Rewind the pipeline state to the provided data.

        Propagates ``rewind_transform()`` through each step in order.
        Each step transforms its input (dropping warmup rows) and rewinds
        its own memory, then passes the transformed data to the next step.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series.

        Returns
        -------
        self : FeaturePipeline
            The pipeline with each step's memory rewound to ``X``.

        """
        check_is_fitted(self)

        # Propagate rewind_transform through each step:
        # step.rewind_transform(Xt) = step.transform(Xt) + step.rewind(Xt)
        Xt = X
        for _, _, transform in self._iter():
            Xt = transform.rewind_transform(Xt)

        # Update pipeline-level tracking
        self._update_X_observed(X)

        return self

    def observe(self, X: pl.DataFrame) -> "FeaturePipeline":
        """Observe new data and update step memory.

        Propagates ``observe_transform()`` through each step in order.
        Each step uses its own memory to provide context, transforms the
        data, updates its observation buffer, then passes the result to
        the next step.

        Parameters
        ----------
        X : pl.DataFrame
            New observations to incorporate.

        Returns
        -------
        self : FeaturePipeline
            The pipeline with updated step memory.

        Raises
        ------
        ValueError
            If ``X`` is not temporally continuous with previous
            observations.

        """
        check_is_fitted(self)
        # Schema + continuity validation at pipeline level
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

        # Propagate observe_transform through each step:
        # step.observe_transform(Xt) = concat(_X_observed, Xt) -> transform -> observe
        Xt = X
        for _, _, transform in self._iter():
            Xt = transform.observe_transform(Xt)

        # Update pipeline-level tracking
        self._update_X_observed(X)

        return self

    def _validate_steps(self) -> None:
        """Validate that all steps are BaseTransformer instances.

        Raises
        ------
        TypeError
            If any step is not a BaseTransformer, 'passthrough', or None.

        """
        names, transformers = zip(*self.steps, strict=False)

        # validate names
        self._validate_names(names)

        for t in transformers:
            if t is None or t == "passthrough":
                continue
            if not isinstance(t, BaseTransformer):
                raise TypeError(
                    "All steps should be instances of `BaseTransformer` "
                    "or be the string 'passthrough' or None; "
                    f"'{t}' (type {type(t)}) is not"
                )

    @_fit_context(
        # estimators in FeaturePipeline.steps are not validated yet
        prefer_skip_nested_validation=False
    )
    def fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params: Any) -> "FeaturePipeline":
        """Fit the model.

        Fit all the transformers one after the other and sequentially transform the
        data. Finally, fit the transformed data using the final estimator.

        Parameters
        ----------
        X : pl.DataFrame
            Training data, including a ``time`` column. Must fulfill input
            requirements of the first step of the pipeline.

        y : pl.DataFrame or None, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **params : dict of str -> object
            - If `enable_metadata_routing=False` (default):

                Parameters passed to the ``fit`` method of each step, where
                each parameter name is prefixed such that parameter ``p`` for step
                ``s`` has key ``s__p``.

            - If `enable_metadata_routing=True`:

                Parameters requested and accepted by steps. Each step must have
                requested certain metadata for these parameters to be forwarded to
                them.

        Returns
        -------
        self : FeaturePipeline
            FeaturePipeline with fitted steps.
        """
        routed_params = self._check_method_params(method="fit", props=params)
        X_t = self._fit(X, y, routed_params)
        with _print_elapsed_time("FeaturePipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                last_step_params = routed_params[self.steps[-1][0]]
                self._final_estimator.fit(X_t, y, **last_step_params["fit"])

        # Set pipeline-level attributes for rewind/observe validation.
        # Individual steps are already fitted with their own attributes.
        self.X_schema_ = dict(X.select(~cs.by_name("time")).schema)
        self._observation_horizon = self.observation_horizon
        self._update_X_observed(X)

        return self

    @_fit_context(
        # estimators in FeaturePipeline.steps are not validated yet
        prefer_skip_nested_validation=False
    )
    def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params: Any) -> pl.DataFrame:
        """Fit the model and transform with the final estimator.

        Fit all the transformers one after the other and sequentially transform
        the data. Only valid if the final estimator either implements
        `fit_transform` or `fit` and `transform`.

        Parameters
        ----------
        X : pl.DataFrame
            Training data, including a ``time`` column. Must fulfill input
            requirements of the first step of the pipeline.

        y : pl.DataFrame or None, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **params : dict of str -> object
            - If `enable_metadata_routing=False` (default):

                Parameters passed to the ``fit`` method of each step, where
                each parameter name is prefixed such that parameter ``p`` for step
                ``s`` has key ``s__p``.

            - If `enable_metadata_routing=True`:

                Parameters requested and accepted by steps. Each step must have
                requested certain metadata for these parameters to be forwarded to
                them.

        Returns
        -------
        X_t : pl.DataFrame
            Transformed samples.
        """
        routed_params = self._check_method_params(method="fit_transform", props=params)
        X_t = self._fit(X, y, routed_params)

        last_step = self._final_estimator
        with _print_elapsed_time("FeaturePipeline", self._log_message(len(self.steps) - 1)):
            if last_step == "passthrough":
                return X_t

            last_step_params = routed_params[self.steps[-1][0]]
            result = last_step.fit_transform(X_t, y, **last_step_params["fit_transform"])
            return result

    def transform(self, X: pl.DataFrame, **params: Any) -> pl.DataFrame:
        """Transform the data, and apply `transform` with the final estimator.

        Call `transform` on each transformer in the pipeline in sequence,
        including the final estimator. Only valid if the final estimator
        implements `transform`.

        This also works when the final estimator is `None` or `'passthrough'`,
        in which case all prior transformations are applied.

        Parameters
        ----------
        X : pl.DataFrame
            Data to transform. Must fulfill input requirements of first step
            of the pipeline.

        **params : dict of str -> object
            Parameters requested and accepted by steps. Each step must have
            requested certain metadata for these parameters to be forwarded to
            them.

        Returns
        -------
        X_t : pl.DataFrame
            Transformed data.
        """
        _raise_for_params(params, self, "transform")

        # not branching here since params is only available if
        # enable_metadata_routing=True
        routed_params = process_routing(self, "transform", **params)
        X_t = X
        for _, name, transform in self._iter():
            X_t = transform.transform(X_t, **routed_params[name].transform)
        return X_t

    def observe_transform(self, X: pl.DataFrame, **params: Any) -> pl.DataFrame:
        """Observe and transform the data through the pipeline.

        This method passes the new data through each transformer in sequence,
        atomically transforming and observing it. Each step transforms using its
        pre-observe memory and only then adds the new rows to that memory. This
        is more efficient than calling observe() followed by transform(), and it
        avoids transforming with memory that already contains the new rows.

        Parameters
        ----------
        X : pl.DataFrame
            New data to observe and transform. Must fulfill input requirements
            of first step of the pipeline.

        **params : dict of str -> object
            Parameters routed to the `transform` methods of the steps. Each step must
            have requested certain metadata via `set_transform_request()` for these
            parameters to be forwarded to them.

        Returns
        -------
        X_t : pl.DataFrame
            Transformed data corresponding to the new input rows.

        """
        _raise_for_params(params, self, "observe_transform")

        routed_params = process_routing(self, "observe_transform", **params)

        # Transform sequentially through all steps using their observe_transform
        # Each transformer handles its own memory management internally
        X_t = X
        for _, name, transform in self._iter():
            X_t = _observe_transform_one(transform, X_t, None, None, routed_params[name])

        return X_t

    def rewind_transform(self, X: pl.DataFrame, **params: Any) -> pl.DataFrame:
        """Rewind and transform the data through the pipeline.

        This method applies rewind_transform semantics to each transformer in sequence:
        it transforms from scratch without using pre-existing memory, discards warmup
        rows, and rewinds each step's internal state to the data it receives.

        Parameters
        ----------
        X : pl.DataFrame
            Data to transform and use for rewinding state. Must fulfill input requirements
            of first step of the pipeline.

        **params : dict of str -> object
            Parameters routed to the `rewind_transform` methods of the steps. Each step must
            have requested certain metadata via `set_rewind_transform_request()` for these
            parameters to be forwarded to them.

        Returns
        -------
        X_t : pl.DataFrame
            Transformed data with warmup rows discarded.

        """
        _raise_for_params(params, self, "rewind_transform")

        routed_params = process_routing(self, "rewind_transform", **params)

        # Transform sequentially through all steps using their rewind_transform
        # Each transformer handles its own rewind and warmup discard internally
        X_t = X
        for _, name, transform in self._iter():
            X_t = _rewind_transform_one(transform, X_t, None, None, routed_params[name])

        return X_t

    def _can_inverse_transform(self) -> bool:
        """Check if all steps support `inverse_transform`.

        Returns
        -------
        bool
            True if all steps are invertible.

        """
        return all(
            t.__sklearn_tags__().transformer_tags.invertible
            for _, _, t in self._iter()
            if t.__sklearn_tags__().transformer_tags is not None
        )

    @available_if(_can_inverse_transform)
    def inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame, **params: Any) -> pl.DataFrame:
        """Apply `inverse_transform` for each step in reverse order.

        All estimators in the pipeline must support `inverse_transform`.

        Parameters
        ----------
        X_t : pl.DataFrame
            Transformed data to inverse-transform. Must fulfill input
            requirements of the last step's ``inverse_transform`` method.

        X_p : pl.DataFrame
            Untransformed data corresponding to at least ``observation_horizon``
            immediately previous time stamps. Used by stateful steps to
            reconstruct original-space values during inverse transformation.
            When ``observation_horizon == 0``, this is unused but still required.

        **params : dict of str -> object
            Parameters requested and accepted by steps. Each step must have
            requested certain metadata for these parameters to be forwarded to
            them.

        Returns
        -------
        pl.DataFrame
            Inverse transformed data in the original feature space.
        """
        _raise_for_params(params, self, "inverse_transform")

        # we don't have to branch here, since params is only non-empty if
        # enable_metadata_routing=True.
        routed_params = process_routing(self, "inverse_transform", **params)
        reverse_iter = reversed(list(self._iter()))

        if self.observation_horizon:
            # Build X_p_iter_list by transforming X_p through each step
            # The key insight: for the first transformer's inverse, we need
            # X_p[sum_of_other_observation_horizons : observation_horizon]
            # not X_p[:first_observation_horizon]
            steps_list = list(self._iter())

            X_p_iter = X_p
            X_p_iter_list = []
            for _idx, (_, _, transform) in enumerate(steps_list[:-1]):
                # NOTE: No transform metadata is routed here. This is an internal
                # forward-pass to compute what X_p looks like in each step's
                # transformed space, not a user-facing transform call. If a
                # transformer ever requires routed metadata for this internal
                # transform, a dedicated routing mapping would be needed.
                X_p_iter = deepcopy(transform).rewind_transform(X_p_iter)
                X_p_iter_list.append(X_p_iter)

            # For the first transformer's inverse, we need the slice of X_p
            # that comes after all the memory used by other transformers
            first_transform = steps_list[0][2]
            offset = sum(t.observation_horizon for _, _, t in steps_list[1:])
            X_p_iter_list.append(X_p[offset : offset + first_transform.observation_horizon])

            # NOTE: Do NOT reverse! X_p_iter_list is built as:
            # [X_p for last step's inverse, ..., X_p for first step's inverse]
            # which matches reverse_iter: [last step, ..., first step]

            X = X_t
            for (_, name, transform), X_p_iter in zip(reverse_iter, X_p_iter_list, strict=False):
                X = transform.inverse_transform(X_t=X, X_p=X_p_iter, **routed_params[name].inverse_transform)

        else:
            X = X_t
            for _, name, transform in reverse_iter:
                X = transform.inverse_transform(X_t=X, X_p=X_p, **routed_params[name].inverse_transform)

        return X

    def get_metadata_routing(self) -> MetadataRouter:
        """Get metadata routing of this object.

        See the [Metadata Routing User Guide](https://scikit-learn.org/stable/metadata_routing.html) for how the routing
        mechanism works.

        Returns
        -------
        routing : MetadataRouter
            A `MetadataRouter` encapsulating
            routing information.
        """
        router = MetadataRouter(owner=self.__class__.__name__)

        # first we add all steps except the last one
        for _, name, trans in self._iter(with_final=False, filter_passthrough=True):
            method_mapping = MethodMapping()
            # fit, fit_predict, and fit_transform call fit_transform if it
            # exists, or else fit and transform
            if hasattr(trans, "fit_transform"):
                (
                    method_mapping
                    .add(caller="fit", callee="fit_transform")
                    .add(caller="fit_transform", callee="fit_transform")
                    .add(caller="fit_predict", callee="fit_transform")
                )
            else:
                (
                    method_mapping
                    .add(caller="fit", callee="fit")
                    .add(caller="fit", callee="transform")
                    .add(caller="fit_transform", callee="fit")
                    .add(caller="fit_transform", callee="transform")
                    .add(caller="fit_predict", callee="fit")
                    .add(caller="fit_predict", callee="transform")
                )

            (
                method_mapping
                .add(caller="predict", callee="transform")
                .add(caller="predict_proba", callee="transform")
                .add(caller="decision_function", callee="transform")
                .add(caller="predict_log_proba", callee="transform")
                .add(caller="transform", callee="transform")
                .add(caller="inverse_transform", callee="inverse_transform")
                .add(caller="score", callee="transform")
            )

            router.add(method_mapping=method_mapping, **{name: trans})

        final_name, final_est = self.steps[-1]
        if final_est is None or final_est == "passthrough":
            return router

        # then we add the last step
        method_mapping = MethodMapping()
        if hasattr(final_est, "fit_transform"):
            method_mapping.add(caller="fit_transform", callee="fit_transform")
        else:
            method_mapping.add(caller="fit", callee="fit").add(caller="fit", callee="transform")
        (
            method_mapping
            .add(caller="fit", callee="fit")
            .add(caller="transform", callee="transform")
            .add(caller="inverse_transform", callee="inverse_transform")
            .add(caller="score", callee="score")
        )

        router.add(method_mapping=method_mapping, **{final_name: final_est})
        return router
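The non-final-step rules above can be summarized as plain (caller, callee) tuples. This is a simplified sketch, not scikit-learn's `MethodMapping`: the helper `step_method_pairs` and the two tuple constants are hypothetical, and the duplicated `predict` entry is listed once.

```python
FIT_CALLERS = ("fit", "fit_transform", "fit_predict")
TRANSFORM_CALLERS = (
    "predict", "predict_proba", "decision_function",
    "predict_log_proba", "transform", "score",
)


def step_method_pairs(has_fit_transform: bool) -> list[tuple[str, str]]:
    """Return (caller, callee) pairs for a non-final step (simplified sketch)."""
    if has_fit_transform:
        # Every fitting caller is served by the step's own fit_transform
        pairs = [(caller, "fit_transform") for caller in FIT_CALLERS]
    else:
        # Otherwise each fitting caller needs both fit and transform
        pairs = [(caller, callee) for caller in FIT_CALLERS for callee in ("fit", "transform")]
    # Non-fitting callers only need the already-fitted step's transform
    pairs += [(caller, "transform") for caller in TRANSFORM_CALLERS]
    pairs.append(("inverse_transform", "inverse_transform"))
    return pairs


print(step_method_pairs(True)[:3])
print(step_method_pairs(False)[:2])
```

Branching on `fit_transform` lets a step fit and transform in one pass during training, which is why fitting callers never route to plain `fit` when it exists.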

Methods

named_steps property

Access the steps by name.

Returns
Name Type Description
named_steps Bunch

Dictionary-like object with step names as keys.

n_features_in_ property

Number of features seen during fit.

Returns
Name Type Description
n_features_in_ int

Number of input features.

feature_names_in_ property

Names of features seen during fit.

Returns
Name Type Description
feature_names_in_ Any

Names of input features.

observation_horizon property

Get cumulative observation horizon across all steps.

Returns
Type Description
int

Total observation horizon needed.

Raises
Type Description
NotFittedError

If the pipeline has not been fitted yet.
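The horizon accumulates because each stateful step consumes its own warmup rows before passing data on. The sketch below chains two first-differencing steps on plain lists; `difference` and `total_horizon` are hypothetical helpers, and treating the pipeline horizon as the sum of per-step horizons is an assumption (it matches how the `inverse_transform` source above offsets `X_p`).

```python
def difference(values: list[float]) -> list[float]:
    """First difference; the first row has no predecessor and is dropped."""
    return [b - a for a, b in zip(values, values[1:])]


def total_horizon(step_horizons: list[int]) -> int:
    # Assumption: each step consumes its own warmup rows, so horizons add up
    return sum(step_horizons)


horizons = [1, 1]      # two differencing steps, one row of context each
history = [1.0, 4.0]   # exactly total_horizon(horizons) previous rows
new_rows = [9.0]

# With enough history, the new time stamp gets a second difference
print(difference(difference(history + new_rows)))      # [2.0]
# With one row of history too few, nothing is produced for the new row
print(difference(difference(history[1:] + new_rows)))  # []
```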

get_params(deep=True)

Get parameters for this estimator.

Parameters
Name Type Description Default
deep bool

If True, will return the parameters for this estimator and contained subobjects that are estimators.

True
Returns
Name Type Description
params dict[str, Any]

Parameter names mapped to their values.

Source Code
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Get parameters for this estimator.

    Parameters
    ----------
    deep : bool, default=True
        If True, will return the parameters for this estimator and
        contained subobjects that are estimators.

    Returns
    -------
    params : dict[str, Any]
        Parameter names mapped to their values.

    """
    return _BaseComposition._get_params(self, attr="steps", deep=deep)

set_params(**params)

Set the parameters of this estimator.

Parameters
Name Type Description Default
**params dict

Estimator parameters.

{}
Returns
Name Type Description
self FeaturePipeline

FeaturePipeline instance.

Source Code
def set_params(self, **params: Any) -> "FeaturePipeline":
    """Set the parameters of this estimator.

    Parameters
    ----------
    **params : dict
        Estimator parameters.

    Returns
    -------
    self : FeaturePipeline
        FeaturePipeline instance.

    """
    _BaseComposition._set_params(self, attr="steps", **params)
    return self
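The `step__param` convention that `set_params` relies on can be sketched without scikit-learn. `Scaler` and `set_nested_params` are hypothetical names; this sketch handles one level of nesting and applies keys in the order given, whereas scikit-learn's `_BaseComposition._set_params` also supports deeper nesting and applies whole-step replacements before nested parameters.

```python
from typing import Any


class Scaler:
    """Hypothetical toy step with one parameter."""

    def __init__(self, factor: float = 1.0) -> None:
        self.factor = factor


def set_nested_params(steps: list[tuple[str, Any]], **params: Any) -> list[tuple[str, Any]]:
    """Apply 'name' (replace step) and 'name__param' (set attribute) keys."""
    named = dict(steps)
    for key, value in params.items():
        name, sep, param = key.partition("__")
        if name not in named:
            raise ValueError(f"unknown step {name!r}")
        if sep:
            setattr(named[name], param, value)  # e.g. scale__factor=10.0
        else:
            named[name] = value  # e.g. lag="passthrough" swaps out the step
    # Keep the original step order
    return [(name, named[name]) for name, _ in steps]


steps = [("scale", Scaler()), ("lag", Scaler(2.0))]
steps = set_nested_params(steps, scale__factor=10.0, lag="passthrough")
print(steps[0][1].factor, steps[1][1])  # 10.0 passthrough
```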

__len__()

Return the length of the FeaturePipeline.

Returns
Name Type Description
length int

Number of steps in the pipeline.

Source Code
def __len__(self) -> int:
    """Return the length of the FeaturePipeline.

    Returns
    -------
    length : int
        Number of steps in the pipeline.

    """
    return len(self.steps)

__getitem__(ind)

Return a sub-pipeline or a single estimator in the pipeline.

Parameters
Name Type Description Default
ind int, str, or slice

Index, name, or slice of the step to retrieve.

required
Returns
Name Type Description
estimator Any

The estimator or sub-pipeline.

Source Code
def __getitem__(self, ind: int | str | slice) -> Any:
    """Return a sub-pipeline or a single estimator in the pipeline.

    Parameters
    ----------
    ind : int, str, or slice
        Index, name, or slice of the step to retrieve.

    Returns
    -------
    estimator : Any
        The estimator or sub-pipeline.

    """
    if isinstance(ind, slice):
        if ind.step not in (None, 1):
            raise ValueError("FeaturePipeline slicing only supports a step of 1")
        return self.__class__(steps=self.steps[ind], memory=self.memory, verbose=self.verbose)
    elif isinstance(ind, int):
        _, est = self.steps[ind]
        return est
    else:
        # String case - get by name
        return self.named_steps[ind]

get_feature_names_out(input_features=None)

Get output feature names for transformation.

Parameters
Name Type Description Default
input_features list[str] | None

Input feature names.

None
Returns
Name Type Description
feature_names_out Any

Output feature names.

Source Code
def get_feature_names_out(self, input_features: list[str] | None = None) -> Any:
    """Get output feature names for transformation.

    Parameters
    ----------
    input_features : list[str] | None, default=None
        Input feature names.

    Returns
    -------
    feature_names_out : Any
        Output feature names.

    """
    return super().get_feature_names_out(input_features)

__sklearn_is_fitted__()

Check if the pipeline is fitted.

Returns
Name Type Description
is_fitted bool

True if the pipeline is fitted.

Source Code
def __sklearn_is_fitted__(self) -> bool:
    """Check if the pipeline is fitted.

    Returns
    -------
    is_fitted : bool
        True if the pipeline is fitted.

    """
    return sklearn_Pipeline.__sklearn_is_fitted__(self)  # ty: ignore[invalid-argument-type]

__sklearn_tags__()

Get estimator tags.

Returns
Type Description
Tags

Estimator tags with yohou-specific attributes.

Source Code
def __sklearn_tags__(self) -> Tags:
    """Get estimator tags.

    Returns
    -------
    Tags
        Estimator tags with yohou-specific attributes.

    """
    tags = super().__sklearn_tags__()

    # Aggregate tags from steps (static capability check)
    if hasattr(self, "steps") and self.steps is not None:
        transformers = [t for _, t in self.steps if t != "passthrough" and t is not None]
        if transformers:
            assert tags.transformer_tags is not None
            assert tags.input_tags is not None
            # Stateful if any step is stateful
            tags.transformer_tags.stateful = any(
                t.__sklearn_tags__().transformer_tags.stateful for t in transformers
            )

            # Invertible if all steps are invertible
            tags.transformer_tags.invertible = all(
                t.__sklearn_tags__().transformer_tags.invertible for t in transformers
            )

            # min_value is taken from the first transformer, the only one that sees raw input
            tags.input_tags.min_value = transformers[0].__sklearn_tags__().input_tags.min_value

    return tags

rewind(X)

Rewind the pipeline state to the provided data.

Propagates rewind_transform() through each step in order. Each step transforms its input (dropping warmup rows) and rewinds its own memory, then passes the transformed data to the next step.

Parameters
Name Type Description Default
X DataFrame

Input time series.

required
Returns
Name Type Description
self FeaturePipeline

The pipeline instance with each step's state rewound.

Source Code
def rewind(self, X: pl.DataFrame) -> "FeaturePipeline":
    """Rewind the pipeline state to the provided data.

    Propagates ``rewind_transform()`` through each step in order.
    Each step transforms its input (dropping warmup rows) and rewinds
    its own memory, then passes the transformed data to the next step.

    Parameters
    ----------
    X : pl.DataFrame
        Input time series.

    Returns
    -------
    self : FeaturePipeline
        The pipeline instance.

    """
    check_is_fitted(self)

    # Propagate rewind_transform through each step:
    # step.rewind_transform(Xt) = step.transform(Xt) + step.rewind(Xt)
    Xt = X
    for _, _, transform in self._iter():
        Xt = transform.rewind_transform(Xt)

    # Update pipeline-level tracking
    self._update_X_observed(X)

    return self
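The propagation in `rewind` can be illustrated with a toy stateful step. `Diff` is a hypothetical first-differencing step with a one-row memory operating on plain lists; it is not a yohou class. Each step's transformed output only feeds the next step; `rewind` itself keeps the updated state and discards the output.

```python
class Diff:
    """Hypothetical stateful step: first difference, observation horizon 1."""

    observation_horizon = 1

    def __init__(self) -> None:
        self.memory: list[float] = []

    def rewind_transform(self, X: list[float]) -> list[float]:
        # Transform from scratch: the first row has no predecessor and is dropped
        out = [b - a for a, b in zip(X, X[1:])]
        # Replace any previous memory with the tail of the new input
        self.memory = X[-self.observation_horizon:]
        return out


def rewind(steps: list[Diff], X: list[float]) -> list[Diff]:
    Xt = X
    for step in steps:
        Xt = step.rewind_transform(Xt)  # next step rewinds on this step's output
    return steps


steps = rewind([Diff(), Diff()], [1.0, 4.0, 9.0, 16.0])
print([s.memory for s in steps])  # [[16.0], [7.0]]
```

The second step's memory holds a value in the first step's output space (the last difference), not a raw observation.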

observe(X)

Observe new data and update step memory.

Propagates observe_transform() through each step in order. Each step uses its own memory to provide context, transforms the data, updates its observation buffer, then passes the result to the next step.

Parameters
Name Type Description Default
X DataFrame

New observations to incorporate.

required
Returns
Name Type Description
self FeaturePipeline

The pipeline instance with updated step memory.

Raises
Type Description
ValueError

If X is not temporally continuous with previous observations.

Source Code
def observe(self, X: pl.DataFrame) -> "FeaturePipeline":
    """Observe new data and update step memory.

    Propagates ``observe_transform()`` through each step in order.
    Each step uses its own memory to provide context, transforms the
    data, updates its observation buffer, then passes the result to
    the next step.

    Parameters
    ----------
    X : pl.DataFrame
        New observations to incorporate.

    Returns
    -------
    self : FeaturePipeline
        The pipeline instance.

    Raises
    ------
    ValueError
        If ``X`` is not temporally continuous with previous
        observations.

    """
    check_is_fitted(self)
    # Schema + continuity validation at pipeline level
    X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

    # Propagate observe_transform through each step:
    # step.observe_transform(Xt) = concat(_X_observed, Xt) -> transform -> observe
    Xt = X
    for _, _, transform in self._iter():
        Xt = transform.observe_transform(Xt)

    # Update pipeline-level tracking
    self._update_X_observed(X)

    return self
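A similar toy model shows what `observe` does with new rows: check continuity, then let each step transform the rows with its existing memory and update that memory. The integer time stamps, the `Diff` step, and the `observe` function below are hypothetical simplifications of the polars-based validation and yohou steps.

```python
class Diff:
    """Hypothetical stateful step: first difference with a one-row memory."""

    def __init__(self, memory: list[float]) -> None:
        self.memory = list(memory)

    def observe_transform(self, X: list[float]) -> list[float]:
        context = self.memory + X  # memory gives the first new row a predecessor
        out = [b - a for a, b in zip(context, context[1:])]
        self.memory = context[-1:]  # keep only the most recent row
        return out


def observe(steps: list[Diff], last_time: int, times: list[int], values: list[float]) -> int:
    # Continuity check: new rows must directly follow the last observed time stamp
    expected = list(range(last_time + 1, last_time + 1 + len(times)))
    if times != expected:
        raise ValueError("X is not temporally continuous with previous observations")
    Xt = values
    for step in steps:
        Xt = step.observe_transform(Xt)
    return times[-1]  # new last observed time stamp


# State after fitting on times 0..2 with values [1.0, 4.0, 9.0]
steps = [Diff([9.0]), Diff([5.0])]
last = observe(steps, last_time=2, times=[3], values=[16.0])
print(last, [s.memory for s in steps])  # 3 [[16.0], [7.0]]
```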

fit(X, y=None, **params)

Fit the model.

Fit all the transformers one after the other and sequentially transform the data. Finally, fit the transformed data using the final estimator.

Parameters
Name Type Description Default
X DataFrame

Training data. Must fulfill input requirements of first step of the pipeline.

required
y DataFrame | None

Training targets. Must fulfill label requirements for all steps of the pipeline.

None
**params dict of str -> object
  • If enable_metadata_routing=False (default):

    Parameters passed to the fit method of each step, where each parameter name is prefixed such that parameter p for step s has key s__p.

  • If enable_metadata_routing=True:

    Parameters requested and accepted by steps. Each step must have requested certain metadata for these parameters to be forwarded to them.

{}
Returns
Name Type Description
self FeaturePipeline

FeaturePipeline with fitted steps.

Source Code
@_fit_context(
    # estimators in FeaturePipeline.steps are not validated yet
    prefer_skip_nested_validation=False
)
def fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params: Any) -> "FeaturePipeline":
    """Fit the model.

    Fit all the transformers one after the other and sequentially transform the
    data. Finally, fit the transformed data using the final estimator.

    Parameters
    ----------
    X : pl.DataFrame
        Training data. Must fulfill input requirements of first step of the
        pipeline.

    y : pl.DataFrame | None, default=None
        Training targets. Must fulfill label requirements for all steps of
        the pipeline.

    **params : dict of str -> object
        - If `enable_metadata_routing=False` (default):

            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        - If `enable_metadata_routing=True`:

            Parameters requested and accepted by steps. Each step must have
            requested certain metadata for these parameters to be forwarded to
            them.

    Returns
    -------
    self : FeaturePipeline
        FeaturePipeline with fitted steps.
    """
    routed_params = self._check_method_params(method="fit", props=params)
    X_t = self._fit(X, y, routed_params)
    with _print_elapsed_time("FeaturePipeline", self._log_message(len(self.steps) - 1)):
        if self._final_estimator != "passthrough":
            last_step_params = routed_params[self.steps[-1][0]]
            self._final_estimator.fit(X_t, y, **last_step_params["fit"])

    # Set pipeline-level attributes for rewind/observe validation.
    # Individual steps are already fitted with their own attributes.
    self.X_schema_ = dict(X.select(~cs.by_name("time")).schema)
    self._observation_horizon = self.observation_horizon
    self._update_X_observed(X)

    return self
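When metadata routing is disabled, the `s__p` keys are grouped per step before each step is fitted. The sketch below is a simplified, hypothetical version of that grouping (the real pipeline builds per-method `Bunch` objects via `_check_method_params`); `split_fit_params` and the `sample_weight` key are illustrative names only.

```python
from typing import Any


def split_fit_params(step_names: list[str], **params: Any) -> dict[str, dict[str, Any]]:
    """Group 's__p' keyword arguments into one dict of fit kwargs per step."""
    routed: dict[str, dict[str, Any]] = {name: {} for name in step_names}
    for key, value in params.items():
        step, sep, param = key.partition("__")
        if not sep or step not in routed:
            raise ValueError(f"expected a 'step__param' key for a known step, got {key!r}")
        routed[step][param] = value
    return routed


routed = split_fit_params(["lag", "scale"], scale__sample_weight=[1.0, 2.0])
print(routed)  # {'lag': {}, 'scale': {'sample_weight': [1.0, 2.0]}}
```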

fit_transform(X, y=None, **params)

Fit the model and transform with the final estimator.

Fit all the transformers one after the other and sequentially transform the data. Only valid if the final estimator either implements fit_transform or fit and transform.

Parameters
Name Type Description Default
X DataFrame

Training data. Must fulfill input requirements of first step of the pipeline.

required
y DataFrame | None

Training targets. Must fulfill label requirements for all steps of the pipeline.

None
**params dict of str -> object
  • If enable_metadata_routing=False (default):

    Parameters passed to the fit method of each step, where each parameter name is prefixed such that parameter p for step s has key s__p.

  • If enable_metadata_routing=True:

    Parameters requested and accepted by steps. Each step must have requested certain metadata for these parameters to be forwarded to them.

{}
Returns
Name Type Description
X_t DataFrame

Transformed samples.

Source Code
@_fit_context(
    # estimators in FeaturePipeline.steps are not validated yet
    prefer_skip_nested_validation=False
)
def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params: Any) -> pl.DataFrame:
    """Fit the model and transform with the final estimator.

    Fit all the transformers one after the other and sequentially transform
    the data. Only valid if the final estimator either implements
    `fit_transform` or `fit` and `transform`.

    Parameters
    ----------
    X : pl.DataFrame
        Training data. Must fulfill input requirements of first step of the
        pipeline.

    y : pl.DataFrame | None, default=None
        Training targets. Must fulfill label requirements for all steps of
        the pipeline.

    **params : dict of str -> object
        - If `enable_metadata_routing=False` (default):

            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        - If `enable_metadata_routing=True`:

            Parameters requested and accepted by steps. Each step must have
            requested certain metadata for these parameters to be forwarded to
            them.

    Returns
    -------
    X_t : pl.DataFrame
        Transformed samples.
    """
    routed_params = self._check_method_params(method="fit_transform", props=params)
    X_t = self._fit(X, y, routed_params)

    last_step = self._final_estimator
    with _print_elapsed_time("FeaturePipeline", self._log_message(len(self.steps) - 1)):
        if last_step == "passthrough":
            return X_t

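        last_step_params = routed_params[self.steps[-1][0]]
        if hasattr(last_step, "fit_transform"):
            result = last_step.fit_transform(X_t, y, **last_step_params["fit_transform"])
        else:
            # Final step without fit_transform: fit it, then transform the same data
            result = last_step.fit(X_t, y, **last_step_params["fit"]).transform(
                X_t, **last_step_params["transform"]
            )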
        return result
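The final-step rule in the docstring ("fit_transform or fit and transform") can be sketched as follows: a `'passthrough'` final step returns the output of the previous steps unchanged, a step with `fit_transform` is fitted and applied in one call, and a step with only `fit` and `transform` is fitted and then applied to the same data, as scikit-learn's `Pipeline` does. `MinShift` and `fit_transform_last` are hypothetical names.

```python
from typing import Any


class MinShift:
    """Hypothetical final step with fit and transform but no fit_transform."""

    def fit(self, X: list[float], y: Any = None) -> "MinShift":
        self.min_ = min(X)
        return self

    def transform(self, X: list[float]) -> list[float]:
        return [x - self.min_ for x in X]


def fit_transform_last(step: Any, X: list[float], y: Any = None) -> list[float]:
    if isinstance(step, str) and step == "passthrough":
        return X  # nothing to fit: output of the previous steps is returned as is
    if hasattr(step, "fit_transform"):
        return step.fit_transform(X, y)
    # Fallback: fit, then transform the same data
    return step.fit(X, y).transform(X)


print(fit_transform_last(MinShift(), [3.0, 5.0, 4.0]))  # [0.0, 2.0, 1.0]
```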

transform(X, **params)

Transform the data, and apply transform with the final estimator.

Call transform on each transformer in the pipeline in sequence, including the final estimator. Only valid if the final estimator implements transform.

This also works when the final estimator is None or 'passthrough', in which case all prior transformations are applied.

Parameters
Name Type Description Default
X DataFrame

Data to transform. Must fulfill input requirements of first step of the pipeline.

required
**params dict of str -> object

Parameters requested and accepted by steps. Each step must have requested certain metadata for these parameters to be forwarded to them.

{}
Returns
Name Type Description
X_t DataFrame

Transformed data.

Source Code
def transform(self, X: pl.DataFrame, **params: Any) -> pl.DataFrame:
    """Transform the data, and apply `transform` with the final estimator.

    Call `transform` on each transformer in the pipeline in sequence,
    including the final estimator. Only valid if the final estimator
    implements `transform`.

    This also works when the final estimator is `None` or `'passthrough'`,
    in which case all prior transformations are applied.

    Parameters
    ----------
    X : pl.DataFrame
        Data to transform. Must fulfill input requirements of first step
        of the pipeline.

    **params : dict of str -> object
        Parameters requested and accepted by steps. Each step must have
        requested certain metadata for these parameters to be forwarded to
        them.

    Returns
    -------
    X_t : pl.DataFrame
        Transformed data.
    """
    _raise_for_params(params, self, "transform")

    # not branching here since params is only available if
    # enable_metadata_routing=True
    routed_params = process_routing(self, "transform", **params)
    X_t = X
    for _, name, transform in self._iter():
        X_t = transform.transform(X_t, **routed_params[name].transform)
    return X_t
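Assuming `_iter()` skips passthrough steps (as scikit-learn's `Pipeline._iter` does by default), `transform` amounts to a left-to-right fold over the remaining steps. A minimal sketch with a hypothetical stateless `AddConstant` step and a hypothetical `pipeline_transform` helper:

```python
from typing import Any


class AddConstant:
    """Hypothetical stateless step."""

    def __init__(self, c: float) -> None:
        self.c = c

    def transform(self, X: list[float]) -> list[float]:
        return [x + self.c for x in X]


def pipeline_transform(steps: list[tuple[str, Any]], X: list[float]) -> list[float]:
    Xt = X
    for _name, step in steps:
        if step is None or (isinstance(step, str) and step == "passthrough"):
            continue  # skipped, like passthrough steps in _iter()
        Xt = step.transform(Xt)  # output of one step is the input of the next
    return Xt


steps = [("add1", AddConstant(1.0)), ("noop", "passthrough"), ("add10", AddConstant(10.0))]
print(pipeline_transform(steps, [0.0, 1.0]))  # [11.0, 12.0]
```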

observe_transform(X, **params)

Observe and transform the data through the pipeline.

This method passes the new data through each transformer in sequence, atomically transforming and observing it. Each step transforms using its pre-observe memory and only then adds the new rows to that memory. This is more efficient than calling observe() followed by transform(), and it avoids transforming with memory that already contains the new rows.

Parameters
Name Type Description Default
X DataFrame

New data to observe and transform. Must fulfill input requirements of first step of the pipeline.

required
**params dict of str -> object

Parameters routed to the transform methods of the steps. Each step must have requested certain metadata via set_transform_request() for these parameters to be forwarded to them.

{}
Returns
Name Type Description
X_t DataFrame

Transformed data corresponding to the new input rows.

Source Code
def observe_transform(self, X: pl.DataFrame, **params: Any) -> pl.DataFrame:
    """Observe and transform the data through the pipeline.

    This method passes the new data through each transformer in sequence,
    atomically transforming and observing it. Each step transforms using its
    pre-observe memory and only then adds the new rows to that memory. This
    is more efficient than calling observe() followed by transform(), and it
    avoids transforming with memory that already contains the new rows.

    Parameters
    ----------
    X : pl.DataFrame
        New data to observe and transform. Must fulfill input requirements
        of first step of the pipeline.

    **params : dict of str -> object
        Parameters routed to the `transform` methods of the steps. Each step must
        have requested certain metadata via `set_transform_request()` for these
        parameters to be forwarded to them.

    Returns
    -------
    X_t : pl.DataFrame
        Transformed data corresponding to the new input rows.

    """
    _raise_for_params(params, self, "observe_transform")

    routed_params = process_routing(self, "observe_transform", **params)

    # Transform sequentially through all steps using their observe_transform
    # Each transformer handles its own memory management internally
    X_t = X
    for _, name, transform in self._iter():
        X_t = _observe_transform_one(transform, X_t, None, None, routed_params[name])

    return X_t
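The difference between the atomic call and `observe()` followed by `transform()` shows up with any step that prepends its memory. In this sketch, `Diff` is a hypothetical first-differencing step whose memory holds the last observed value; it is not the yohou implementation.

```python
class Diff:
    """Hypothetical stateful step: first difference with a one-row memory."""

    def __init__(self, last: float) -> None:
        self.memory = [last]

    def transform(self, X: list[float]) -> list[float]:
        context = self.memory + X  # prepend memory so the first row has a predecessor
        return [b - a for a, b in zip(context, context[1:])]

    def observe(self, X: list[float]) -> "Diff":
        self.memory = (self.memory + X)[-1:]
        return self

    def observe_transform(self, X: list[float]) -> list[float]:
        out = self.transform(X)  # uses the pre-observe memory
        self.observe(X)
        return out


atomic = Diff(9.0).observe_transform([16.0])
separate = Diff(9.0).observe([16.0]).transform([16.0])
print(atomic, separate)  # [7.0] [0.0]
```

In the separate calls, the memory already contains 16.0 when `transform` runs, so the new row is differenced against itself.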

rewind_transform(X, **params)

Rewind and transform the data through the pipeline.

This method applies rewind_transform semantics to each transformer in sequence: it transforms from scratch without using pre-existing memory, discards warmup rows, and rewinds each step's internal state to the data it receives.

Parameters
Name Type Description Default
X DataFrame

Data to transform and use for rewinding state. Must fulfill input requirements of first step of the pipeline.

required
**params dict of str -> object

Parameters routed to the rewind_transform methods of the steps. Each step must have requested certain metadata via set_rewind_transform_request() for these parameters to be forwarded to them.

{}
Returns
Name Type Description
X_t DataFrame

Transformed data with warmup rows discarded.

Source Code
def rewind_transform(self, X: pl.DataFrame, **params: Any) -> pl.DataFrame:
    """Rewind and transform the data through the pipeline.

    This method applies rewind_transform semantics to each transformer in sequence:
    it transforms from scratch without using pre-existing memory, discards warmup
    rows, and rewinds each step's internal state to the data it receives.

    Parameters
    ----------
    X : pl.DataFrame
        Data to transform and use for rewinding state. Must fulfill input requirements
        of first step of the pipeline.

    **params : dict of str -> object
        Parameters routed to the `rewind_transform` methods of the steps. Each step must
        have requested certain metadata via `set_rewind_transform_request()` for these
        parameters to be forwarded to them.

    Returns
    -------
    X_t : pl.DataFrame
        Transformed data with warmup rows discarded.

    """
    _raise_for_params(params, self, "rewind_transform")

    routed_params = process_routing(self, "rewind_transform", **params)

    # Transform sequentially through all steps using their rewind_transform
    # Each transformer handles its own rewind and warmup discard internally
    X_t = X
    for _, name, transform in self._iter():
        X_t = _rewind_transform_one(transform, X_t, None, None, routed_params[name])

    return X_t
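Discarding warmup rows is easiest to see in the output length. In this sketch each step drops exactly `observation_horizon` rows, so the pipeline returns `len(X)` minus the summed horizons; that one-to-one relationship is an assumption of the toy, not a guarantee for every yohou transformer. `Diff` is hypothetical, and its stale memory is ignored and replaced.

```python
class Diff:
    """Hypothetical stateful step: first difference, observation horizon 1."""

    observation_horizon = 1

    def __init__(self, memory: list[float]) -> None:
        self.memory = memory

    def rewind_transform(self, X: list[float]) -> list[float]:
        # Start from scratch: the old memory is not used as context
        out = [b - a for a, b in zip(X, X[1:])]
        self.memory = X[-self.observation_horizon:]  # reset state to the input's tail
        return out


steps = [Diff(memory=[100.0]), Diff(memory=[100.0])]  # stale memory from earlier data
X_t = [1.0, 4.0, 9.0, 16.0]
for step in steps:
    X_t = step.rewind_transform(X_t)
print(X_t, [s.memory for s in steps])  # [2.0, 2.0] [[16.0], [7.0]]
```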

inverse_transform(X_t, X_p, **params)

Apply inverse_transform for each step in reverse order.

All estimators in the pipeline must support inverse_transform.

Parameters
Name Type Description Default
X_t DataFrame

Transformed data to inverse-transform. Must fulfill input requirements of the last step's inverse_transform method.

required
X_p DataFrame

Untransformed data covering at least the observation_horizon time stamps immediately preceding X_t. Stateful steps use it to reconstruct original-space values during inverse transformation. When observation_horizon == 0, it is unused but still required.

required
**params dict of str -> object

Parameters routed to the inverse_transform methods of the steps. Each step must have requested the corresponding metadata for these parameters to be forwarded to it.

{}
Returns
Type Description
DataFrame

Inverse-transformed data in the original feature space.
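
To see why a stateful step needs X_p, consider a hypothetical lag-difference step y[i] = x[i] - x[i - lag]. Its inverse cannot be computed from the differences alone; it must be seeded with the last lag untransformed values that precede X_t. The sketch below assumes plain lists in place of pl.DataFrame, and inverse_lag_diff is an illustrative helper, not part of yohou.

```python
def inverse_lag_diff(x_t: list[float], x_p: list[float], lag: int) -> list[float]:
    """Undo y[i] = x[i] - x[i - lag] (hypothetical helper, not yohou API)."""
    if lag < 1 or len(x_p) < lag:
        raise ValueError("lag must be >= 1 and x_p must hold at least `lag` rows")
    # Seed with the last `lag` untransformed rows immediately preceding x_t.
    levels = list(x_p[len(x_p) - lag:])
    for d in x_t:
        # Each level is the value `lag` rows back plus the stored difference.
        levels.append(levels[-lag] + d)
    # Drop the seed so only values at x_t's time stamps remain.
    return levels[lag:]


print(inverse_lag_diff([6, 7], [10, 15], lag=1))  # [21, 28]
```

Only the tail of x_p is used, which is why the parameter asks for "at least" observation_horizon rows.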

Source Code
@available_if(_can_inverse_transform)
def inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame, **params: Any) -> pl.DataFrame:
    """Apply `inverse_transform` for each step in a reverse order.

    All estimators in the pipeline must support `inverse_transform`.

    Parameters
    ----------
    X_t : pl.DataFrame
        Transformed data to inverse-transform. Must fulfill input
        requirements of the last step's ``inverse_transform`` method.

    X_p : pl.DataFrame
        Untransformed data corresponding to at least ``observation_horizon``
        immediately previous time stamps. Used by stateful steps to
        reconstruct original-space values during inverse transformation.
        When ``observation_horizon == 0``, this is unused but still required.

    **params : dict of str -> object
        Parameters requested and accepted by steps. Each step must have
        requested certain metadata for these parameters to be forwarded to
        them.

    Returns
    -------
    pl.DataFrame
        Inverse transformed data in the original feature space.
    """
    _raise_for_params(params, self, "inverse_transform")

    # we don't have to branch here, since params is only non-empty if
    # enable_metadata_routing=True.
    routed_params = process_routing(self, "inverse_transform", **params)
    reverse_iter = reversed(list(self._iter()))

    if self.observation_horizon:
        # Build X_p_iter_list by transforming X_p through each step
        # The key insight: for the first transformer's inverse, we need
        # X_p[sum_of_other_observation_horizons : observation_horizon]
        # not X_p[:first_observation_horizon]
        steps_list = list(self._iter())

        X_p_iter = X_p
        X_p_iter_list = []
        for _idx, (_, _, transform) in enumerate(steps_list[:-1]):
            # NOTE: No transform metadata is routed here. This is an internal
            # forward-pass to compute what X_p looks like in each step's
            # transformed space, not a user-facing transform call. If a
            # transformer ever requires routed metadata for this internal
            # transform, a dedicated routing mapping would be needed.
            X_p_iter = deepcopy(transform).rewind_transform(X_p_iter)
            X_p_iter_list.append(X_p_iter)

        # For the first transformer's inverse, we need the slice of X_p
        # that comes after all the memory used by other transformers
        first_transform = steps_list[0][2]
        offset = sum(t.observation_horizon for _, _, t in steps_list[1:])
        X_p_iter_list.append(X_p[offset : offset + first_transform.observation_horizon])

        # NOTE: Do NOT reverse! X_p_iter_list is built as:
        # [X_p for last step's inverse, ..., X_p for first step's inverse]
        # which matches reverse_iter: [last step, ..., first step]

        X = X_t
        for (_, name, transform), X_p_iter in zip(reverse_iter, X_p_iter_list, strict=False):
            X = transform.inverse_transform(X_t=X, X_p=X_p_iter, **routed_params[name].inverse_transform)

    else:
        X = X_t
        for _, name, transform in reverse_iter:
            X = transform.inverse_transform(X_t=X, X_p=X_p, **routed_params[name].inverse_transform)

    return X
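
The observation_horizon branch above can be traced on a hypothetical two-step pipeline of lag-difference steps. This sketch assumes that the pipeline's horizon is the sum of the step horizons and that x_p holds exactly that many untransformed rows; lists stand in for pl.DataFrame, and lag_diff, inverse_lag_diff, and two_step_inverse are illustrative names, not yohou functions. The last step gets x_p pushed forward through the first step, while the first step gets the slice x_p[offset : offset + lag0] with offset equal to the other step's horizon, mirroring the source.

```python
def lag_diff(x: list[float], lag: int) -> list[float]:
    """Hypothetical forward step; drops the first `lag` warmup rows."""
    return [x[i] - x[i - lag] for i in range(lag, len(x))]


def inverse_lag_diff(x_t: list[float], x_p: list[float], lag: int) -> list[float]:
    """Undo lag_diff, seeding with the last `lag` rows of x_p."""
    levels = list(x_p[len(x_p) - lag:])
    for d in x_t:
        levels.append(levels[-lag] + d)
    return levels[lag:]


def two_step_inverse(lag0: int, lag1: int, x_t: list[float], x_p: list[float]) -> list[float]:
    # Assumption: the pipeline horizon is lag0 + lag1 and x_p has exactly that length.
    if len(x_p) != lag0 + lag1:
        raise ValueError("x_p must hold lag0 + lag1 rows")
    # Step 1 inverts in step 0's output space, so push x_p through step 0;
    # discarding step 0's warmup leaves exactly lag1 rows.
    x_p_step1 = lag_diff(x_p, lag0)
    # Step 0 needs the raw rows after the memory used by the other step.
    offset = lag1
    x_p_step0 = x_p[offset: offset + lag0]
    # Invert in reverse order: last step first.
    x = inverse_lag_diff(x_t, x_p_step1, lag1)
    return inverse_lag_diff(x, x_p_step0, lag0)


raw = [1, 3, 6, 10, 15, 21, 28, 36]
x_p, x_t = raw[:3], lag_diff(lag_diff(raw, 1), 2)
print(two_step_inverse(1, 2, x_t, x_p))  # [10, 15, 21, 28, 36]
```

The round trip recovers the raw values that follow x_p, which is the property the offset slice is meant to preserve.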

get_metadata_routing()

Get metadata routing of this object.

See the Metadata Routing User Guide for how the routing mechanism works.

Returns
Name Type Description
routing MetadataRouter

A MetadataRouter encapsulating routing information.

Source Code
def get_metadata_routing(self) -> MetadataRouter:
    """Get metadata routing of this object.

    Please check [Metadata Routing User Guide](https://scikit-learn.org/stable/metadata_routing.html) on how the routing
    mechanism works.

    Returns
    -------
    routing : MetadataRouter
        A `MetadataRouter` encapsulating
        routing information.
    """
    router = MetadataRouter(owner=self.__class__.__name__)

    # first we add all steps except the last one
    for _, name, trans in self._iter(with_final=False, filter_passthrough=True):
        method_mapping = MethodMapping()
        # fit, fit_predict, and fit_transform call fit_transform if it
        # exists, or else fit and transform
        if hasattr(trans, "fit_transform"):
            (
                method_mapping
                .add(caller="fit", callee="fit_transform")
                .add(caller="fit_transform", callee="fit_transform")
                .add(caller="fit_predict", callee="fit_transform")
            )
        else:
            (
                method_mapping
                .add(caller="fit", callee="fit")
                .add(caller="fit", callee="transform")
                .add(caller="fit_transform", callee="fit")
                .add(caller="fit_transform", callee="transform")
                .add(caller="fit_predict", callee="fit")
                .add(caller="fit_predict", callee="transform")
            )

        (
            method_mapping
            .add(caller="predict", callee="transform")
            .add(caller="predict", callee="transform")
            .add(caller="predict_proba", callee="transform")
            .add(caller="decision_function", callee="transform")
            .add(caller="predict_log_proba", callee="transform")
            .add(caller="transform", callee="transform")
            .add(caller="inverse_transform", callee="inverse_transform")
            .add(caller="score", callee="transform")
        )

        router.add(method_mapping=method_mapping, **{name: trans})

    final_name, final_est = self.steps[-1]
    if final_est is None or final_est == "passthrough":
        return router

    # then we add the last step
    method_mapping = MethodMapping()
    if hasattr(final_est, "fit_transform"):
        method_mapping.add(caller="fit_transform", callee="fit_transform")
    else:
        method_mapping.add(caller="fit", callee="fit").add(caller="fit", callee="transform")
    (
        method_mapping
        .add(caller="fit", callee="fit")
        .add(caller="transform", callee="transform")
        .add(caller="inverse_transform", callee="inverse_transform")
        .add(caller="score", callee="score")
    )

    router.add(method_mapping=method_mapping, **{final_name: final_est})
    return router
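
The per-step mapping built above for non-final steps can be modelled in plain Python as a list of (caller, callee) pairs. This is a hypothetical sketch, not scikit-learn's MethodMapping class: non_final_mapping, callees_for, Fused, and Split are illustrative names. The source adds predict -> transform twice; collapsing the pairs into a set shows that the duplicate does not change which methods are reached.

```python
NON_FINAL_COMMON = [
    ("predict", "transform"),
    ("predict_proba", "transform"),
    ("decision_function", "transform"),
    ("predict_log_proba", "transform"),
    ("transform", "transform"),
    ("inverse_transform", "inverse_transform"),
    ("score", "transform"),
]
FIT_CALLERS = ("fit", "fit_transform", "fit_predict")


def non_final_mapping(step: object) -> list[tuple[str, str]]:
    """(caller, callee) pairs for a non-final step, modelled on the source above."""
    if hasattr(step, "fit_transform"):
        # A fused fit_transform receives the metadata for every fitting caller.
        fit_pairs = [(c, "fit_transform") for c in FIT_CALLERS]
    else:
        # Otherwise each fitting caller forwards to both fit and transform.
        fit_pairs = [(c, m) for c in FIT_CALLERS for m in ("fit", "transform")]
    return fit_pairs + NON_FINAL_COMMON


def callees_for(step: object, caller: str) -> list[str]:
    # Methods of `step` that receive metadata when the pipeline's `caller` runs.
    return sorted({callee for c, callee in non_final_mapping(step) if c == caller})


class Fused:  # hypothetical step exposing fit_transform
    def fit_transform(self, X):
        return X


class Split:  # hypothetical step exposing only fit and transform
    def fit(self, X):
        return self

    def transform(self, X):
        return X


print(callees_for(Fused(), "fit"))  # ['fit_transform']
print(callees_for(Split(), "fit"))  # ['fit', 'transform']
```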

Tutorials

The following example notebooks use this component:

  • How to Clean Time Series Data


    Data-Features

    End-to-end data cleaning pipeline combining SimpleTimeImputer and SeasonalImputer for missing values with OutlierThresholdHandler for anomaly clipping.


  • How to Build a Feature Pipeline


    Data-Features

    Nest FeaturePipeline, FeatureUnion, and DecompositionPipeline for multi-level feature engineering with trend-season-residual decomposition.


  • Class-Probability Forecasting


    Getting-Started

    Forecast air quality categories using ClassProbaReductionForecaster, producing a probability distribution over four WHO air quality classes.


  • Forecasting Workflow


    Getting-Started

    Evaluate forecasters with cross-validation, search hyperparameters with GridSearchCV, and inspect residuals to diagnose model weaknesses.


  • Interval Forecasting


    Getting-Started

    Wrap a point forecaster with SplitConformalForecaster to produce 95% prediction intervals with statistical coverage guarantees.


  • How to Build Panel Feature Pipelines


    Panel-Data

    Combine ColumnForecaster, FeaturePipeline, FeatureUnion, and DecompositionPipeline on panel data with per-group scoring on KDD Cup air quality.


  • Quickstart


    Quickstart

    Comprehensive end-to-end tour of yohou beyond the Getting Started tutorials, covering data loading, baseline forecasting, preprocessing pipelines, decomposition, cross-validation search, and interval prediction.
