
NumericalIntegrator

yohou.preprocessing.signal.NumericalIntegrator

Bases: BaseTransformer

Numerical integration transformer for time series signals.

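Integrates each non-time feature column using SciPy's cumulative integration functions. The integration step is the sampling interval of the "time" column, converted to seconds. fit raises a ValueError if the detected interval does not have a fixed length.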

This transformer is stateful: it maintains a running integral offset between transform calls, enabling streaming/chunked processing. Each transform call continues the integration from where the previous chunk left off.
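The sketch below shows that bookkeeping using only NumPy and SciPy. ToyRunningIntegrator and its observe/rewind methods are hypothetical stand-ins, not yohou APIs. The toy handles a single column with the trapezoid rule and mirrors the running-offset (_X_t_observed_) and last-sample (_last_X_value_) state that the source code below keeps per column.

```python
import numpy as np
from scipy.integrate import cumulative_trapezoid


class ToyRunningIntegrator:
    """Hypothetical single-column stand-in for the integrator's streaming state."""

    def __init__(self, dx):
        self.dx = dx  # fixed sampling interval (seconds in yohou)
        self.rewind()

    def rewind(self):
        # Forget the running integral and the last input sample.
        self.last_integral = 0.0  # plays the role of _X_t_observed_
        self.last_value = None    # plays the role of _last_X_value_
        return self

    def observe(self, chunk):
        chunk = np.asarray(chunk, dtype=float)
        offset = self.last_integral
        if self.last_value is not None:
            # One trapezoid step bridging the previous chunk and this one.
            offset += 0.5 * (self.last_value + chunk[0]) * self.dx
        # Integrate the chunk from zero, then shift by the carried-over offset.
        out = cumulative_trapezoid(chunk, dx=self.dx, initial=0.0) + offset
        self.last_integral = float(out[-1])
        self.last_value = float(chunk[-1])
        return out


integ = ToyRunningIntegrator(dx=1.0)
print(integ.observe([1.0, 1.0, 1.0]))      # [0. 1. 2.]
print(integ.observe([1.0, 1.0]))           # [3. 4.]  (continues the integral)
print(integ.rewind().observe([1.0, 1.0]))  # [0. 1.]  (starts again from zero)
```

The first two chunks in sequence yield the same values as integrating all five samples at once. The boundary step exists to preserve exactly that property.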

Parameters

  • method ({"cumulative_trapezoid", "cumulative_simpson"}, default "cumulative_trapezoid"): the scipy.integrate function used for cumulative integration. "cumulative_trapezoid" applies the trapezoidal rule (faster, less accurate). "cumulative_simpson" applies Simpson's rule (slower, more accurate for smooth signals).

Attributes

  • interval_ (str): detected time interval string (e.g. '1d', '1h', '1s').
  • sampling_interval_ (float): sampling interval in seconds, derived from interval_ and used as the integration step.
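sampling_interval_ is the fixed spacing of the time column expressed in seconds. The sketch below derives it using only the standard library. The helper name sampling_interval_seconds is hypothetical; yohou uses its own interval-detection utilities for this.

```python
from datetime import datetime, timedelta


def sampling_interval_seconds(times):
    """Return the constant spacing of `times` in seconds (hypothetical helper)."""
    steps = {b - a for a, b in zip(times, times[1:])}
    if len(steps) != 1:
        # Mirrors the requirement that the interval has a fixed length.
        raise ValueError(f"expected one fixed interval, got {sorted(steps)}")
    return steps.pop().total_seconds()


times = [datetime(2020, 1, 1) + timedelta(milliseconds=i) for i in range(100)]
print(sampling_interval_seconds(times))  # 0.001
```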

Examples

>>> import polars as pl
>>> from datetime import datetime, timedelta
>>> from yohou.preprocessing.signal import NumericalIntegrator
>>> time = [datetime(2020, 1, 1) + timedelta(seconds=i * 0.001) for i in range(100)]
>>> X = pl.DataFrame({"time": time, "signal": [float(i) for i in range(100)]})
>>> transformer = NumericalIntegrator(method="cumulative_trapezoid")
>>> transformer.fit(X)
NumericalIntegrator(...)
>>> X_t = transformer.transform(X)
>>> "time" in X_t.columns
True
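The trapezoidal rule is exact for a linear ramp, so the integrated values in the example above can be predicted by hand. With samples y_i = i and dt = 0.001 s, the cumulative integral at sample i is dt · i² / 2.

The check below calls SciPy directly. It assumes that a freshly fitted transformer produces the same numbers as cumulative_trapezoid(..., initial=0.0), which is what its source code does for the first chunk. Per get_feature_names_out, the output column would be named signal_integrated.

```python
import numpy as np
from scipy.integrate import cumulative_trapezoid

dx = 0.001                            # 1 ms spacing, as in the example
signal = np.arange(100, dtype=float)  # the "signal" column: 0.0, 1.0, ..., 99.0

integrated = cumulative_trapezoid(signal, dx=dx, initial=0.0)
expected = dx * signal**2 / 2  # closed form for integrating a linear ramp
print(np.allclose(integrated, expected))  # True
```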

Notes

Statefulness: between transform calls, the integrator keeps two values per column:

  • the last integrated value, in _X_t_observed_ (the running integral offset);
  • the last input value, in _last_X_value_.

Each new chunk therefore continues from the previous one. For cumulative_trapezoid, the stored last input and the first new input are also joined by one extra trapezoid step, 0.5 * (last + first) * dt, so no interval is lost at the chunk boundary.

Call rewind(X) to clear the integration state. The next transform call then starts integrating from zero.

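  • Both methods are called with initial=0.0, so the output has the same length as the input.
  • The chunk-boundary trapezoid step is only added for cumulative_trapezoid. With cumulative_simpson, each new chunk is shifted by the previous running integral alone.
  • Inverse transform uses numerical differentiation (np.gradient with spacing sampling_interval_). The result is approximate: np.gradient uses central differences in the interior and one-sided differences at the two ends, so the original input is not recovered exactly.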
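The inverse can be checked with the same linear ramp used in the example, using NumPy and SciPy directly rather than the transformer. The integral of a ramp is quadratic, and central differences are exact for quadratics, so the interior of the signal comes back exactly. The two endpoints, where np.gradient falls back to one-sided differences, do not.

```python
import numpy as np
from scipy.integrate import cumulative_trapezoid

dx = 0.001
signal = np.arange(100, dtype=float)
integrated = cumulative_trapezoid(signal, dx=dx, initial=0.0)

# Inverse step as described: differentiate with np.gradient using spacing dx.
recovered = np.gradient(integrated, dx)

print(np.allclose(recovered[1:-1], signal[1:-1]))  # True: interior is exact
print(recovered[0], recovered[-1])  # endpoints differ from 0.0 and 99.0
```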

See Also

  • NumericalDifferentiator : Numerical differentiation transformer.
  • scipy.integrate.cumulative_trapezoid : Trapezoidal integration.
  • scipy.integrate.cumulative_simpson : Simpson's rule integration.

Source Code

class NumericalIntegrator(BaseTransformer):
    """Numerical integration transformer for time series signals.

    Integrates each feature column using scipy's cumulative integration methods.
    The integration uses the time column to determine the sampling interval.

    This transformer is **stateful**: it maintains a running integral offset
    between transform calls, enabling streaming/chunked processing. Each
    transform call continues the integration from where the previous chunk
    left off.

    Parameters
    ----------
    method : {"cumulative_trapezoid", "cumulative_simpson"}, default="cumulative_trapezoid"
        The scipy.integrate method to use for cumulative integration.
        - "cumulative_trapezoid": Trapezoidal rule integration (faster, less accurate)
        - "cumulative_simpson": Simpson's rule integration (slower, more accurate)

    Attributes
    ----------
    interval_ : str
        Detected time interval string (e.g., '1d', '1h', '1s').

    sampling_interval_ : float
        Sampling interval in seconds derived from interval_.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime, timedelta
    >>> time = [datetime(2020, 1, 1) + timedelta(seconds=i * 0.001) for i in range(100)]
    >>> X = pl.DataFrame({"time": time, "signal": [float(i) for i in range(100)]})
    >>> transformer = NumericalIntegrator(method="cumulative_trapezoid")
    >>> transformer.fit(X)
    NumericalIntegrator(...)
    >>> X_t = transformer.transform(X)
    >>> "time" in X_t.columns
    True

    Notes
    -----
    **Statefulness**: The integrator maintains the last transformed value
    between transform calls via ``_X_t_observed_``. This enables accurate
    streaming integration where each chunk continues seamlessly from the
    previous one. The last input values are stored in ``_last_X_value_`` for
    proper trapezoid boundary calculation.

    Use ``rewind()`` to clear the integration state and start fresh.

    - For cumulative_trapezoid, the output has the same length as input
    - Inverse transform uses numerical differentiation (np.gradient)

    See Also
    --------
    - [`NumericalDifferentiator`][yohou.preprocessing.signal.NumericalDifferentiator] : Numerical differentiation transformer.
    - `scipy.integrate.cumulative_trapezoid` : Trapezoidal integration.
    - `scipy.integrate.cumulative_simpson` : Simpson's rule integration.

    """

    _parameter_constraints: dict = {
        "method": [StrOptions({"cumulative_trapezoid", "cumulative_simpson"})],
    }

    _tags = {"stateful": True, "invertible": True}

    def __init__(
        self,
        method: Literal["cumulative_trapezoid", "cumulative_simpson"] = "cumulative_trapezoid",
    ):
        self.method = method

    def _fit(self, X: pl.DataFrame, y: pl.DataFrame | None = None) -> None:
        """Fit the internal model."""
        # Detect interval using utility function
        self.interval_ = check_interval_consistency(X)
        td = interval_to_timedelta(self.interval_)
        if td is None:
            raise ValueError(
                f"NumericalIntegrator requires fixed-length intervals, but got variable interval: {self.interval_}"
            )
        self.sampling_interval_ = td.total_seconds()

        # Initialize state tracking
        # _last_X_value_: Last input value per column (for trapezoid boundary)
        # _X_t_observed_: Last transformed value per column (running integral offset)
        self._last_X_value_: dict[str, float] = {}
        self._X_t_observed_: dict[str, float] = {}

    def rewind(self, X: pl.DataFrame) -> "NumericalIntegrator":
        """Rewind the integration state and observation horizon.

        Clears the running integral offset, so the next transform call
        starts integration from zero.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series to set new observation window.

        Returns
        -------
        self

        """
        # Rewind state tracking
        self._last_X_value_ = {}
        self._X_t_observed_ = {}
        # Call parent rewind
        BaseTransformer.rewind(self, X)
        return self

    def _transform(self, X: pl.DataFrame) -> pl.DataFrame:
        """Integrate each feature column.

        Parameters
        ----------
        X : pl.DataFrame
            Validated input time series.

        Returns
        -------
        pl.DataFrame
            Integrated time series.

        """
        time = X.select(cs.by_name("time"))
        data = X.select(~cs.by_name("time"))

        integrator_func = getattr(scipy.integrate, self.method)
        dt = self.sampling_interval_

        # Integrate each column
        result_cols = {}
        for col_name in data.columns:
            col_values = data[col_name].to_numpy()

            # Handle chunk boundary with trapezoid rule
            boundary_integral = 0.0
            if col_name in self._last_X_value_ and self.method == "cumulative_trapezoid":
                # Add half-step for proper boundary: 0.5 * (last + first) * dt
                last_val = self._last_X_value_[col_name]
                boundary_integral = 0.5 * (last_val + col_values[0]) * dt

            # Get offset from previous chunks
            offset = self._X_t_observed_.get(col_name, 0.0) + boundary_integral

            # Perform integration with initial=0, then add offset
            integrated = integrator_func(col_values, x=None, dx=dt, initial=0.0)
            integrated = integrated + offset

            # Store state for next chunk
            self._X_t_observed_[col_name] = float(integrated[-1])
            self._last_X_value_[col_name] = float(col_values[-1])

            result_cols[col_name] = integrated

        X_t = pl.DataFrame(result_cols)
        feature_names = self.get_feature_names_out()
        X_t = X_t.rename(dict(zip(X_t.columns, feature_names, strict=False)))
        X_t = pl.concat([time, X_t], how="horizontal")

        return X_t

    def observe_transform(self, X: pl.DataFrame, **params) -> pl.DataFrame:
        """Transform new data and update integration state without rewind.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        check_is_fitted(self, ["X_schema_", "feature_names_in_", "n_features_in_", "sampling_interval_"])
        X = validate_transformer_data(self, X=X, reset=False, check_continuity=True)

        X_t = self.transform(X, **params)
        # Update observed time without clearing integration state
        self._update_X_observed(X)
        return X_t

    def fit_transform(self, X: pl.DataFrame, y: pl.DataFrame | None = None, **params) -> pl.DataFrame:
        """Fit and transform in one step.

        Parameters
        ----------
        X : pl.DataFrame
            Input time series with a ``"time"`` column (datetime) and one or
            more numeric columns.
        y : pl.DataFrame or None, default=None
            Ignored.  Present for API compatibility.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame
            Transformed time series with a ``"time"`` column and transformed
            value columns.

        """
        self.fit(X, y)
        return self.transform(X, **params)

    def _inverse_transform(self, X_t: pl.DataFrame, X_p: pl.DataFrame | None = None) -> pl.DataFrame:
        """Differentiate to reverse integration.

        Parameters
        ----------
        X_t : pl.DataFrame
            Integrated time series.
        X_p : pl.DataFrame or None
            Not used by this transformer.

        Returns
        -------
        pl.DataFrame
            Inverse-transformed time series.

        """
        X_t, _ = validate_transformer_data(
            self,
            X=X_t,
            reset=False,
            inverse=True,
            X_p=X_p,
            observation_horizon=0,
        )

        time = X_t.select(cs.by_name("time"))
        data = X_t.select(~cs.by_name("time"))

        dt = self.sampling_interval_

        # Differentiate each column
        result_cols = {}
        for col_name in data.columns:
            col_values = data[col_name].to_numpy()
            differentiated = np.gradient(col_values, dt)
            result_cols[col_name] = differentiated

        X = pl.DataFrame(result_cols)
        X = X.rename(dict(zip(X.columns, self.feature_names_in_, strict=False)))
        X = pl.concat([time, X], how="horizontal")

        return X

    def get_feature_names_out(self, input_features: list[str] | None = None) -> list[str]:
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Column names of the input features.  If ``None``, uses the
            feature names seen during ``fit``.

        Returns
        -------
        list of str
            Output feature names after transformation.

        """
        input_features = _check_feature_names_in(self, input_features)
        return [f"{col}_integrated" for col in input_features]

Methods

rewind(X)

Rewind the integration state and observation horizon.

Clears the running integral offset, so the next transform call starts integration from zero.

Parameters

  • X (DataFrame, required): input time series used to set the new observation window.

Returns

  • self (NumericalIntegrator): the transformer, with its integration state cleared.

observe_transform(X, **params)

Transform new data and update integration state without rewind.

Parameters

  • X (DataFrame, required): input time series with a "time" column (datetime) and one or more numeric columns.
  • **params (dict, default {}): metadata to route to nested estimators.

Returns

  • DataFrame: transformed time series with a "time" column and the integrated value columns.


fit_transform(X, y=None, **params)

Fit and transform in one step.

Parameters

  • X (DataFrame, required): input time series with a "time" column (datetime) and one or more numeric columns.
  • y (DataFrame or None, default None): ignored; present for API compatibility.
  • **params (dict, default {}): metadata to route to nested estimators.

Returns

  • DataFrame: transformed time series with a "time" column and transformed value columns.


get_feature_names_out(input_features=None)

Get output feature names for transformation.

Parameters

  • input_features (array-like of str or None, default None): column names of the input features. If None, the feature names seen during fit are used.

Returns

  • list of str: output feature names, formed by appending "_integrated" to each input feature name.


Tutorials

The following example notebooks use this component:

  • How to Apply Signal Processing Filters (Data-Features): apply NumericalFilter (Butterworth, Chebyshev, Bessel), NumericalDifferentiator, and NumericalIntegrator for signal smoothing and rate-of-change extraction.
