VotingPointForecaster

yohou.ensemble.voting_point.VotingPointForecaster

Bases: _BaseEnsembleForecaster, BasePointForecaster, _BaseComposition

Combines point predictions from multiple forecasters via averaging.

Aggregates point predictions using mean or median from all base forecasters. All base forecasters must support predict().

If a base forecaster fails during fit, it is skipped and a warning is emitted. The ensemble raises a RuntimeError only when all base forecasters fail.
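The skip-on-failure behaviour described above can be pictured with a small, library-independent sketch. The helper `fit_all` and the toy classes `Ok` and `Broken` are hypothetical stand-ins, not yohou code; the real logic lives in the private `_fit_forecasters_parallel` helper, whose implementation is not shown on this page.

```python
import warnings


class Ok:
    """Toy forecaster whose fit always succeeds (hypothetical)."""

    def fit(self, y):
        self.last_ = y[-1]
        return self


class Broken:
    """Toy forecaster whose fit always fails (hypothetical)."""

    def fit(self, y):
        raise ValueError("cannot fit")


def fit_all(forecasters, y):
    """Fit every (name, forecaster) pair, skipping failures with a warning."""
    fitted = []
    for name, forecaster in forecasters:
        try:
            fitted.append((name, forecaster.fit(y)))
        except Exception as exc:  # assumption: any exception counts as a failure
            warnings.warn(f"Forecaster {name!r} failed during fit and was skipped: {exc}")
    if not fitted:
        # Only a total failure is escalated to an error.
        raise RuntimeError("All base forecasters failed during fit.")
    return fitted


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    fitted = fit_all([("good", Ok()), ("bad", Broken())], [1.0, 2.0, 3.0])

fitted_names = [name for name, _ in fitted]
```

In this sketch `fitted_names` contains only the forecaster that survived, which mirrors how `forecasters_` excludes failed entries.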

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| forecasters | list of (name, forecaster) tuples | Named base forecasters to combine. Each entry is a (name, forecaster) tuple where name is a unique string identifier and forecaster is a BaseForecaster instance. | required |
| method | {"mean", "median"} | Aggregation method for point predictions. "mean" computes the (optionally weighted) arithmetic mean; "median" computes the unweighted median (weights are ignored). | "mean" |
| weights | list of float or None | Per-forecaster weights used when method="mean". Raw values are passed to numpy.average which normalizes internally. Silently ignored when method="median". Length must match the number of forecasters. | None |
| n_jobs | int or None | Number of parallel jobs for fitting base forecasters. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. | None |
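To make the weight normalization concrete, here is a minimal pure-Python sketch of a weighted mean that divides by the sum of the weights, which is the same normalization `numpy.average` applies. The function `weighted_mean` and the sample numbers are illustrative assumptions, not part of yohou.

```python
from statistics import median


def weighted_mean(values, weights=None):
    """Weighted arithmetic mean; weights are normalized by their sum."""
    if weights is None:
        weights = [1.0] * len(values)
    total = sum(weights)
    return sum(v * w for v, w in zip(values, weights)) / total


# Predictions from three hypothetical base forecasters for one time step.
step_predictions = [10.0, 12.0, 20.0]

plain = weighted_mean(step_predictions)                      # unweighted mean
raw = weighted_mean(step_predictions, [2, 1, 1])             # raw weights
scaled = weighted_mean(step_predictions, [0.5, 0.25, 0.25])  # pre-normalized weights
robust = median(step_predictions)                            # weights play no role
```

Because of the normalization, `raw` and `scaled` are equal: only the ratios between weights matter, not their absolute scale.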

Attributes

| Name | Type | Description |
| --- | --- | --- |
| forecasters_ | list of (str, BaseForecaster) | Successfully fitted base forecasters as (name, forecaster) pairs. Forecasters that failed during fit are excluded. |

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> from yohou.ensemble import VotingPointForecaster
>>> from yohou.point import SeasonalNaive
>>>
>>> time = pl.datetime_range(
...     start=datetime(2020, 1, 1), end=datetime(2020, 4, 9), interval="1d", eager=True
... )
>>> y = pl.DataFrame({"time": time, "value": range(len(time))})
>>>
>>> forecaster = VotingPointForecaster(
...     forecasters=[
...         ("naive_1", SeasonalNaive(seasonality=1)),
...         ("naive_7", SeasonalNaive(seasonality=7)),
...     ],
...     method="mean",
... )
>>> forecaster.fit(y, forecasting_horizon=3)
VotingPointForecaster(...)
>>> y_pred = forecaster.predict(forecasting_horizon=3)
>>> len(y_pred)
3
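For intuition about the numbers the example above would average, the following sketch reproduces the arithmetic without yohou. It assumes that a seasonal naive forecaster repeats the last `seasonality` observations cyclically; `seasonal_naive` is a hypothetical stand-in, and the actual `SeasonalNaive` may differ in details.

```python
def seasonal_naive(history, seasonality, horizon):
    """Hypothetical stand-in: repeat the last `seasonality` observations cyclically."""
    last_cycle = history[-seasonality:]
    return [last_cycle[h % seasonality] for h in range(horizon)]


# 2020-01-01 through 2020-04-09 is 100 daily rows, with values 0..99.
history = list(range(100))

naive_1 = seasonal_naive(history, seasonality=1, horizon=3)
naive_7 = seasonal_naive(history, seasonality=7, horizon=3)

# method="mean" without weights: element-wise average of the two forecasts.
ensemble = [(a + b) / 2 for a, b in zip(naive_1, naive_7)]
```

Under that assumption the two children forecast `[99, 99, 99]` and `[93, 94, 95]`, so the averaged forecast has three rows, matching `len(y_pred)` in the doctest.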

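See Also

  • VotingIntervalForecaster: Ensemble for interval forecasters.
  • VotingClassProbaForecaster: Ensemble for class-probability forecasters.
  • ColumnForecaster: Apply different forecasters to different column subsets.
  • LocalPanelForecaster: Fit independent clones per panel group.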

Notes

  • All base forecasters must predict the same target columns. A ValueError is raised after fitting if schemas differ.
  • Weights are only used with method="mean"; they are silently ignored with method="median".
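The schema requirement in the first note can be illustrated with a short check over column names. This is a sketch only: `check_same_targets` is a hypothetical helper, and whether the library's private `_validate_schemas_match` compares column order or dtypes as well is an assumption not confirmed by this page.

```python
def check_same_targets(columns_by_name):
    """Raise ValueError unless every forecaster reports the same target columns."""
    reserved = {"vintage_time", "time"}
    reference_name, reference_cols = None, None
    for name, columns in columns_by_name.items():
        targets = [c for c in columns if c not in reserved]
        if reference_cols is None:
            reference_name, reference_cols = name, targets
        elif targets != reference_cols:  # assumption: order-sensitive comparison
            raise ValueError(
                f"Forecaster {name!r} predicts {targets}, "
                f"but {reference_name!r} predicts {reference_cols}"
            )
    return reference_cols


matching = check_same_targets(
    {
        "naive_1": ["vintage_time", "time", "value"],
        "naive_7": ["vintage_time", "time", "value"],
    }
)
```

Passing a forecaster with a different target column (for example `"other"` instead of `"value"`) makes the helper raise `ValueError`.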

Source Code

class VotingPointForecaster(_BaseEnsembleForecaster, BasePointForecaster, _BaseComposition):
    """Combines point predictions from multiple forecasters via averaging.

    Aggregates point predictions using mean or median from all base
    forecasters. All base forecasters must support ``predict()``.

    If a base forecaster fails during ``fit``, it is skipped and a
    warning is emitted. The ensemble raises a ``RuntimeError`` only
    when all base forecasters fail.

    Parameters
    ----------
    forecasters : list of (name, forecaster) tuples
        Named base forecasters to combine. Each entry is a
        ``(name, forecaster)`` tuple where *name* is a unique string
        identifier and *forecaster* is a `BaseForecaster` instance.
    method : {"mean", "median"}, default="mean"
        Aggregation method for point predictions. ``"mean"`` computes
        the (optionally weighted) arithmetic mean; ``"median"`` computes
        the unweighted median (``weights`` are ignored).
    weights : list of float or None, default=None
        Per-forecaster weights used when ``method="mean"``. Raw values
        are passed to ``numpy.average`` which normalizes internally.
        Silently ignored when ``method="median"``. Length must match the
        number of forecasters.
    n_jobs : int or None, default=None
        Number of parallel jobs for fitting base forecasters.
        ``None`` means 1 unless in a ``joblib.parallel_backend`` context.
        ``-1`` means using all processors.

    Attributes
    ----------
    forecasters_ : list of (str, BaseForecaster)
        Successfully fitted base forecasters as ``(name, forecaster)``
        pairs. Forecasters that failed during ``fit`` are excluded.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> from yohou.ensemble import VotingPointForecaster
    >>> from yohou.point import SeasonalNaive
    >>>
    >>> time = pl.datetime_range(
    ...     start=datetime(2020, 1, 1), end=datetime(2020, 4, 9), interval="1d", eager=True
    ... )
    >>> y = pl.DataFrame({"time": time, "value": range(len(time))})
    >>>
    >>> forecaster = VotingPointForecaster(
    ...     forecasters=[
    ...         ("naive_1", SeasonalNaive(seasonality=1)),
    ...         ("naive_7", SeasonalNaive(seasonality=7)),
    ...     ],
    ...     method="mean",
    ... )
    >>> forecaster.fit(y, forecasting_horizon=3)  # doctest: +ELLIPSIS
    VotingPointForecaster(...)
    >>> y_pred = forecaster.predict(forecasting_horizon=3)
    >>> len(y_pred)
    3

    See Also
    --------
    - [`VotingIntervalForecaster`][yohou.ensemble.voting_interval.VotingIntervalForecaster] : Ensemble for interval forecasters.
    - [`VotingClassProbaForecaster`][yohou.ensemble.voting_class_proba.VotingClassProbaForecaster] : Ensemble for class-probability forecasters.
    - [`ColumnForecaster`][yohou.compose.column_forecaster.ColumnForecaster] : Apply different forecasters to different column subsets.
    - [`LocalPanelForecaster`][yohou.compose.local_panel_forecaster.LocalPanelForecaster] : Fit independent clones per panel group.

    Notes
    -----
    - All base forecasters must predict the same target columns. A
      ``ValueError`` is raised after fitting if schemas differ.
    - Weights are only used with ``method="mean"``; they are silently
      ignored with ``method="median"``.

    """

    _parameter_constraints: dict = {
        "forecasters": [list],
        "method": [StrOptions({"mean", "median"})],
        "weights": [list, None],
        "n_jobs": [Integral, None],
    }

    def __init__(
        self,
        forecasters: list[tuple[str, BaseForecaster]],
        *,
        method: Literal["mean", "median"] = "mean",
        weights: list[float] | None = None,
        n_jobs: int | None = None,
    ):
        super().__init__()
        self.forecasters = forecasters
        self.method = method
        self.weights = weights
        self.n_jobs = n_jobs

    def __sklearn_tags__(self) -> Tags:
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with yohou-specific attributes.

        """
        tags = super().__sklearn_tags__()
        assert tags.forecaster_tags is not None

        tags.forecaster_tags.forecaster_type = POINT
        tags.forecaster_tags.tracks_observations = False
        tags.forecaster_tags.supports_panel_data = True

        forecasters_to_check = (
            [f for _, f in self.forecasters_] if hasattr(self, "forecasters_") else [f for _, f in self.forecasters]
        )

        if forecasters_to_check:
            tags.forecaster_tags.stateful = any(
                getattr(f.__sklearn_tags__().forecaster_tags, "stateful", False) for f in forecasters_to_check
            )

        return tags

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(
        self,
        y: pl.DataFrame,
        X_actual: pl.DataFrame | None = None,
        forecasting_horizon: StrictInt = 1,
        X_future: pl.DataFrame | None = None,
        X_forecast: pl.DataFrame | None = None,
        **params,
    ) -> VotingPointForecaster:
        """Fit all base forecasters on the same data.

        Parameters
        ----------
        y : pl.DataFrame
            Target time series with ``"time"`` column.
        X_actual : pl.DataFrame or None, default=None
            Actual feature observations with a ``"time"`` column aligned
            with ``y``. Forwarded to each child forecaster.
        forecasting_horizon : int, default=1
            Number of steps ahead to forecast.
        X_future : pl.DataFrame or None, default=None
            Known future features with ``"time"`` column.
        X_forecast : pl.DataFrame or None, default=None
            External forecasts with ``"vintage_time"`` and ``"time"`` columns.
        **params : dict
            Metadata routing parameters forwarded to base forecasters.

        Returns
        -------
        self
            Fitted ensemble.

        Raises
        ------
        ValueError
            If ``weights`` length does not match the number of forecasters,
            or if fitted forecasters have mismatched target column schemas.
        RuntimeError
            If all base forecasters fail during fitting.

        """
        _raise_for_params(params, self, "fit")
        routed_params = process_routing(self, "fit", **params)

        if forecasting_horizon < 1:
            raise ValueError(f"forecasting_horizon must be >= 1, got {forecasting_horizon}")

        self._validate_forecasters_list()

        if self.weights is not None and len(self.weights) != len(self.forecasters):
            raise ValueError(
                f"Number of weights ({len(self.weights)}) must match number of forecasters ({len(self.forecasters)})"
            )

        self.forecasters_ = self._fit_forecasters_parallel(
            y=y,
            X_actual=X_actual,
            forecasting_horizon=forecasting_horizon,
            routed_params=routed_params,
            n_jobs=self.n_jobs,
            X_future=X_future,
            X_forecast=X_forecast,
        )

        self._validate_schemas_match()
        self._derive_fitted_attributes(self.forecasters_[0][1], forecasting_horizon, y, X_actual)
        self._compute_effective_weights()

        return self

    def predict(  # ty: ignore[invalid-method-override]
        self,
        forecasting_horizon: StrictInt | None = None,
        groups: list[str] | None = None,
        predict_transformed: bool = False,
        X_future: pl.DataFrame | None = None,
        X_forecast: pl.DataFrame | None = None,
        **params,
    ) -> pl.DataFrame:
        """Generate aggregated point predictions.

        Parameters
        ----------
        forecasting_horizon : int or None, default=None
            Number of steps ahead. If ``None``, uses value from ``fit``.
        groups : list of str or None, default=None
            Panel group prefixes to predict.
        predict_transformed : bool, default=False
            If ``True``, return predictions in transformed space.
        X_future : pl.DataFrame or None, default=None
            Known future features override.
        X_forecast : pl.DataFrame or None, default=None
            External forecasts override.
        **params : dict
            Metadata routing parameters.

        Returns
        -------
        pl.DataFrame
            Aggregated predictions with ``"vintage_time"``, ``"time"``,
            and target columns.

        """
        check_is_fitted(self, ["forecasters_"])
        _raise_for_params(params, self, "predict")
        routed_params = process_routing(self, "predict", **params)

        predictions = []
        for name, forecaster in self.forecasters_:
            forecaster_params = getattr(routed_params.get(name, Bunch(predict={})), "predict", {})
            y_pred = forecaster.predict(  # ty: ignore[unresolved-attribute]
                forecasting_horizon=forecasting_horizon,
                groups=groups,
                predict_transformed=predict_transformed,
                X_future=X_future,
                X_forecast=X_forecast,
                **forecaster_params,
            )
            predictions.append(y_pred)

        target_cols = [c for c in predictions[0].columns if c not in ("vintage_time", "time")]
        return self._aggregate_values(predictions, target_cols, self.method, self.weights_)

    def observe_predict(
        self,
        y: pl.DataFrame,
        X_actual: pl.DataFrame | None = None,
        forecasting_horizon: StrictInt | None = None,
        groups: list[str] | None = None,
        stride: StrictInt | None = None,
        predict_transformed: bool = False,
        X_future: pl.DataFrame | None = None,
        X_forecast: pl.DataFrame | None = None,
        **params,
    ) -> pl.DataFrame:
        """Alternate recursive observe and predict on each child, then aggregate.

        Delegates the rolling observe-predict loop to each base forecaster
        and aggregates the resulting predictions.

        Parameters
        ----------
        y : pl.DataFrame
            New target observations.
        X_actual : pl.DataFrame or None, default=None
            Actual feature observations with a ``"time"`` column aligned
            with ``y``. Sliced and observed incrementally at each step
            of the rolling loop.
        forecasting_horizon : int or None, default=None
            Number of steps ahead.
        groups : list of str or None, default=None
            Panel group prefixes.
        stride : int or None, default=None
            Step size for the rolling observe-predict loop.
        predict_transformed : bool, default=False
            If ``True``, return predictions in transformed space.
        X_future : pl.DataFrame or None, default=None
            Known future features.
        X_forecast : pl.DataFrame or None, default=None
            External forecasts.
        **params : dict
            Metadata routing parameters.

        Returns
        -------
        pl.DataFrame
            Aggregated predictions after observing new data.

        """
        check_is_fitted(self, ["forecasters_"])
        _raise_for_params(params, self, "predict")
        routed_params = process_routing(self, "predict", **params)

        predictions = []
        for name, forecaster in self.forecasters_:
            forecaster_params = getattr(routed_params.get(name, Bunch(predict={})), "predict", {})
            y_pred = forecaster.observe_predict(  # ty: ignore[unresolved-attribute]
                y=y,
                X_actual=X_actual,
                forecasting_horizon=forecasting_horizon,
                groups=groups,
                stride=stride,
                predict_transformed=predict_transformed,
                X_future=X_future,
                X_forecast=X_forecast,
                **forecaster_params,
            )
            predictions.append(y_pred)

        target_cols = [c for c in predictions[0].columns if c not in ("vintage_time", "time")]
        return self._aggregate_values(predictions, target_cols, self.method, self.weights_)

    def get_metadata_routing(self) -> MetadataRouter:
        """Get metadata routing configuration.

        Returns
        -------
        MetadataRouter
            Router with mappings for all base forecasters.

        """
        router = MetadataRouter(owner=self.__class__.__name__)

        for name, forecaster in self.forecasters:
            router.add(
                **{name: forecaster},
                method_mapping=MethodMapping().add(caller="fit", callee="fit").add(caller="predict", callee="predict"),
            )

        return router

Methods

__sklearn_tags__()

Get estimator tags.

Returns
| Type | Description |
| --- | --- |
| Tags | Estimator tags with yohou-specific attributes. |
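The `stateful` tag is derived from the children: the ensemble counts as stateful as soon as any base forecaster is. The sketch below isolates that rule using `types.SimpleNamespace` objects as stand-ins for forecaster tags; `ensemble_is_stateful` is a hypothetical name used only for illustration.

```python
from types import SimpleNamespace


def ensemble_is_stateful(child_tags):
    """True if at least one child reports stateful=True; missing tags count as False."""
    return any(getattr(tags, "stateful", False) for tags in child_tags)


stateless = SimpleNamespace(stateful=False)
stateful = SimpleNamespace(stateful=True)
untagged = SimpleNamespace()  # no `stateful` attribute at all

all_stateless = ensemble_is_stateful([stateless, untagged, None])
one_stateful = ensemble_is_stateful([stateless, stateful])
```

Using `getattr` with a default means a child whose tags are missing, or are `None`, is simply treated as stateless rather than raising.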

Source Code
def __sklearn_tags__(self) -> Tags:
    """Get estimator tags.

    Returns
    -------
    Tags
        Estimator tags with yohou-specific attributes.

    """
    tags = super().__sklearn_tags__()
    assert tags.forecaster_tags is not None

    tags.forecaster_tags.forecaster_type = POINT
    tags.forecaster_tags.tracks_observations = False
    tags.forecaster_tags.supports_panel_data = True

    forecasters_to_check = (
        [f for _, f in self.forecasters_] if hasattr(self, "forecasters_") else [f for _, f in self.forecasters]
    )

    if forecasters_to_check:
        tags.forecaster_tags.stateful = any(
            getattr(f.__sklearn_tags__().forecaster_tags, "stateful", False) for f in forecasters_to_check
        )

    return tags

fit(y, X_actual=None, forecasting_horizon=1, X_future=None, X_forecast=None, **params)

Fit all base forecasters on the same data.

Parameters
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| y | DataFrame | Target time series with "time" column. | required |
| X_actual | DataFrame or None | Actual feature observations with a "time" column aligned with y. Forwarded to each child forecaster. | None |
| forecasting_horizon | int | Number of steps ahead to forecast. | 1 |
| X_future | DataFrame or None | Known future features with "time" column. | None |
| X_forecast | DataFrame or None | External forecasts with "vintage_time" and "time" columns. | None |
| **params | dict | Metadata routing parameters forwarded to base forecasters. | {} |

Returns
| Type | Description |
| --- | --- |
| self | Fitted ensemble. |

Raises
| Type | Description |
| --- | --- |
| ValueError | If weights length does not match the number of forecasters, or if fitted forecasters have mismatched target column schemas. |
| RuntimeError | If all base forecasters fail during fitting. |
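The two input checks that `fit` performs before any child is fitted can be reproduced in isolation. The sketch below mirrors the conditions visible in the source of `fit`; `validate_fit_inputs` and `error_message` are hypothetical helpers written for illustration, not library functions.

```python
def validate_fit_inputs(forecasting_horizon, forecasters, weights=None):
    """Mirror the horizon and weights-length checks performed by fit."""
    if forecasting_horizon < 1:
        raise ValueError(f"forecasting_horizon must be >= 1, got {forecasting_horizon}")
    if weights is not None and len(weights) != len(forecasters):
        raise ValueError(
            f"Number of weights ({len(weights)}) must match "
            f"number of forecasters ({len(forecasters)})"
        )


def error_message(*args):
    """Return the ValueError message for the given inputs, or None if they are valid."""
    try:
        validate_fit_inputs(*args)
    except ValueError as exc:
        return str(exc)
    return None


names = ["naive_1", "naive_7"]
ok = error_message(3, names, [0.7, 0.3])
bad_horizon = error_message(0, names, None)
bad_weights = error_message(3, names, [1.0])
```

Both failures surface as `ValueError`, so the message is the only way to tell them apart when catching the exception.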

Source Code
@_fit_context(prefer_skip_nested_validation=True)
def fit(
    self,
    y: pl.DataFrame,
    X_actual: pl.DataFrame | None = None,
    forecasting_horizon: StrictInt = 1,
    X_future: pl.DataFrame | None = None,
    X_forecast: pl.DataFrame | None = None,
    **params,
) -> VotingPointForecaster:
    """Fit all base forecasters on the same data.

    Parameters
    ----------
    y : pl.DataFrame
        Target time series with ``"time"`` column.
    X_actual : pl.DataFrame or None, default=None
        Actual feature observations with a ``"time"`` column aligned
        with ``y``. Forwarded to each child forecaster.
    forecasting_horizon : int, default=1
        Number of steps ahead to forecast.
    X_future : pl.DataFrame or None, default=None
        Known future features with ``"time"`` column.
    X_forecast : pl.DataFrame or None, default=None
        External forecasts with ``"vintage_time"`` and ``"time"`` columns.
    **params : dict
        Metadata routing parameters forwarded to base forecasters.

    Returns
    -------
    self
        Fitted ensemble.

    Raises
    ------
    ValueError
        If ``weights`` length does not match the number of forecasters,
        or if fitted forecasters have mismatched target column schemas.
    RuntimeError
        If all base forecasters fail during fitting.

    """
    _raise_for_params(params, self, "fit")
    routed_params = process_routing(self, "fit", **params)

    if forecasting_horizon < 1:
        raise ValueError(f"forecasting_horizon must be >= 1, got {forecasting_horizon}")

    self._validate_forecasters_list()

    if self.weights is not None and len(self.weights) != len(self.forecasters):
        raise ValueError(
            f"Number of weights ({len(self.weights)}) must match number of forecasters ({len(self.forecasters)})"
        )

    self.forecasters_ = self._fit_forecasters_parallel(
        y=y,
        X_actual=X_actual,
        forecasting_horizon=forecasting_horizon,
        routed_params=routed_params,
        n_jobs=self.n_jobs,
        X_future=X_future,
        X_forecast=X_forecast,
    )

    self._validate_schemas_match()
    self._derive_fitted_attributes(self.forecasters_[0][1], forecasting_horizon, y, X_actual)
    self._compute_effective_weights()

    return self

predict(forecasting_horizon=None, groups=None, predict_transformed=False, X_future=None, X_forecast=None, **params)

Generate aggregated point predictions.

Parameters
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| forecasting_horizon | int or None | Number of steps ahead. If None, uses value from fit. | None |
| groups | list of str or None | Panel group prefixes to predict. | None |
| predict_transformed | bool | If True, return predictions in transformed space. | False |
| X_future | DataFrame or None | Known future features override. | None |
| X_forecast | DataFrame or None | External forecasts override. | None |
| **params | dict | Metadata routing parameters. | {} |

Returns
| Type | Description |
| --- | --- |
| DataFrame | Aggregated predictions with "vintage_time", "time", and target columns. |
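The aggregation step can be sketched without polars by treating each prediction as a dict of column lists. The target columns are everything except `"vintage_time"` and `"time"`, as in the source of `predict`; the `aggregate` function itself is a hypothetical simplification of the private `_aggregate_values` helper, whose real implementation is not shown here.

```python
from statistics import mean, median

KEY_COLUMNS = ("vintage_time", "time")


def aggregate(predictions, method="mean"):
    """Combine column-oriented prediction tables row by row."""
    first = predictions[0]
    target_cols = [c for c in first if c not in KEY_COLUMNS]
    combine = mean if method == "mean" else median
    # Key columns are taken from the first prediction unchanged.
    result = {c: first[c] for c in KEY_COLUMNS}
    for col in target_cols:
        rows = zip(*(p[col] for p in predictions))
        result[col] = [combine(values) for values in rows]
    return result


def make_prediction(values):
    """Build a toy two-row prediction table (hypothetical layout)."""
    return {"vintage_time": ["t0", "t0"], "time": ["t1", "t2"], "value": values}


predictions = [
    make_prediction([1.0, 2.0]),
    make_prediction([3.0, 4.0]),
    make_prediction([8.0, 9.0]),
]
by_mean = aggregate(predictions, method="mean")
by_median = aggregate(predictions, method="median")
```

With one outlying child (the third table), the median result stays at the middle forecaster's values while the mean is pulled upward.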

Source Code
def predict(  # ty: ignore[invalid-method-override]
    self,
    forecasting_horizon: StrictInt | None = None,
    groups: list[str] | None = None,
    predict_transformed: bool = False,
    X_future: pl.DataFrame | None = None,
    X_forecast: pl.DataFrame | None = None,
    **params,
) -> pl.DataFrame:
    """Generate aggregated point predictions.

    Parameters
    ----------
    forecasting_horizon : int or None, default=None
        Number of steps ahead. If ``None``, uses value from ``fit``.
    groups : list of str or None, default=None
        Panel group prefixes to predict.
    predict_transformed : bool, default=False
        If ``True``, return predictions in transformed space.
    X_future : pl.DataFrame or None, default=None
        Known future features override.
    X_forecast : pl.DataFrame or None, default=None
        External forecasts override.
    **params : dict
        Metadata routing parameters.

    Returns
    -------
    pl.DataFrame
        Aggregated predictions with ``"vintage_time"``, ``"time"``,
        and target columns.

    """
    check_is_fitted(self, ["forecasters_"])
    _raise_for_params(params, self, "predict")
    routed_params = process_routing(self, "predict", **params)

    predictions = []
    for name, forecaster in self.forecasters_:
        forecaster_params = getattr(routed_params.get(name, Bunch(predict={})), "predict", {})
        y_pred = forecaster.predict(  # ty: ignore[unresolved-attribute]
            forecasting_horizon=forecasting_horizon,
            groups=groups,
            predict_transformed=predict_transformed,
            X_future=X_future,
            X_forecast=X_forecast,
            **forecaster_params,
        )
        predictions.append(y_pred)

    target_cols = [c for c in predictions[0].columns if c not in ("vintage_time", "time")]
    return self._aggregate_values(predictions, target_cols, self.method, self.weights_)

observe_predict(y, X_actual=None, forecasting_horizon=None, groups=None, stride=None, predict_transformed=False, X_future=None, X_forecast=None, **params)

Alternate recursive observe and predict on each child, then aggregate.

Delegates the rolling observe-predict loop to each base forecaster and aggregates the resulting predictions.

Parameters
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| y | DataFrame | New target observations. | required |
| X_actual | DataFrame or None | Actual feature observations with a "time" column aligned with y. Sliced and observed incrementally at each step of the rolling loop. | None |
| forecasting_horizon | int or None | Number of steps ahead. | None |
| groups | list of str or None | Panel group prefixes. | None |
| stride | int or None | Step size for the rolling observe-predict loop. | None |
| predict_transformed | bool | If True, return predictions in transformed space. | False |
| X_future | DataFrame or None | Known future features. | None |
| X_forecast | DataFrame or None | External forecasts. | None |
| **params | dict | Metadata routing parameters. | {} |

Returns
| Type | Description |
| --- | --- |
| DataFrame | Aggregated predictions after observing new data. |
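As a rough mental model of a rolling observe-predict loop, the sketch below feeds new observations to a toy forecaster in chunks of `stride` rows and forecasts after each chunk. Everything here is an assumption made for illustration: the `LastValue` class, the `rolling_observe_predict` helper, the choice to observe before predicting, and the fallback of `stride` to the horizon are not taken from yohou.

```python
class LastValue:
    """Toy forecaster that repeats the most recent observation (hypothetical)."""

    def __init__(self, history):
        self.history = list(history)

    def observe(self, new_values):
        self.history.extend(new_values)

    def predict(self, horizon):
        return [self.history[-1]] * horizon


def rolling_observe_predict(model, new_values, horizon, stride=None):
    """Observe `stride` values at a time and forecast after each chunk."""
    step = horizon if stride is None else stride  # assumed default
    forecasts = []
    for start in range(0, len(new_values), step):
        model.observe(new_values[start:start + step])
        forecasts.append(model.predict(horizon))
    return forecasts


blocks = rolling_observe_predict(
    LastValue([1.0]), [2.0, 3.0, 4.0, 5.0], horizon=2, stride=2
)
```

In the ensemble, a loop of this kind runs inside each base forecaster, and the per-child outputs are then combined with the configured `method`.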

Source Code
def observe_predict(
    self,
    y: pl.DataFrame,
    X_actual: pl.DataFrame | None = None,
    forecasting_horizon: StrictInt | None = None,
    groups: list[str] | None = None,
    stride: StrictInt | None = None,
    predict_transformed: bool = False,
    X_future: pl.DataFrame | None = None,
    X_forecast: pl.DataFrame | None = None,
    **params,
) -> pl.DataFrame:
    """Alternate recursive observe and predict on each child, then aggregate.

    Delegates the rolling observe-predict loop to each base forecaster
    and aggregates the resulting predictions.

    Parameters
    ----------
    y : pl.DataFrame
        New target observations.
    X_actual : pl.DataFrame or None, default=None
        Actual feature observations with a ``"time"`` column aligned
        with ``y``. Sliced and observed incrementally at each step
        of the rolling loop.
    forecasting_horizon : int or None, default=None
        Number of steps ahead.
    groups : list of str or None, default=None
        Panel group prefixes.
    stride : int or None, default=None
        Step size for the rolling observe-predict loop.
    predict_transformed : bool, default=False
        If ``True``, return predictions in transformed space.
    X_future : pl.DataFrame or None, default=None
        Known future features.
    X_forecast : pl.DataFrame or None, default=None
        External forecasts.
    **params : dict
        Metadata routing parameters.

    Returns
    -------
    pl.DataFrame
        Aggregated predictions after observing new data.

    """
    check_is_fitted(self, ["forecasters_"])
    _raise_for_params(params, self, "predict")
    routed_params = process_routing(self, "predict", **params)

    predictions = []
    for name, forecaster in self.forecasters_:
        forecaster_params = getattr(routed_params.get(name, Bunch(predict={})), "predict", {})
        y_pred = forecaster.observe_predict(  # ty: ignore[unresolved-attribute]
            y=y,
            X_actual=X_actual,
            forecasting_horizon=forecasting_horizon,
            groups=groups,
            stride=stride,
            predict_transformed=predict_transformed,
            X_future=X_future,
            X_forecast=X_forecast,
            **forecaster_params,
        )
        predictions.append(y_pred)

    target_cols = [c for c in predictions[0].columns if c not in ("vintage_time", "time")]
    return self._aggregate_values(predictions, target_cols, self.method, self.weights_)

get_metadata_routing()

Get metadata routing configuration.

Returns
| Type | Description |
| --- | --- |
| MetadataRouter | Router with mappings for all base forecasters. |
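At prediction time the routed parameters are looked up per forecaster name, with an empty fallback when nothing was routed to a child. The sketch below isolates that lookup pattern from `predict`; it uses `types.SimpleNamespace` as a stand-in for scikit-learn's `Bunch`, and both `predict_params_for` and the `hypothetical_option` key are made up for illustration.

```python
from types import SimpleNamespace


def predict_params_for(routed_params, name):
    """Return the predict-time kwargs routed to `name`, or {} if none were routed."""
    default = SimpleNamespace(predict={})
    return getattr(routed_params.get(name, default), "predict", {})


# Only one of the two children has anything routed to it.
routed_params = {
    "naive_1": SimpleNamespace(fit={}, predict={"hypothetical_option": True}),
}

routed = predict_params_for(routed_params, "naive_1")
fallback = predict_params_for(routed_params, "naive_7")
```

The double fallback (a default entry for a missing name, then a default for a missing `predict` attribute) lets each child be called with `**forecaster_params` unconditionally.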

Source Code
def get_metadata_routing(self) -> MetadataRouter:
    """Get metadata routing configuration.

    Returns
    -------
    MetadataRouter
        Router with mappings for all base forecasters.

    """
    router = MetadataRouter(owner=self.__class__.__name__)

    for name, forecaster in self.forecasters:
        router.add(
            **{name: forecaster},
            method_mapping=MethodMapping().add(caller="fit", callee="fit").add(caller="predict", callee="predict"),
        )

    return router

Tutorials

The following example notebooks use this component:

  • How to Combine Forecasters with VotingPointForecaster (Forecasting-Models): Build point ensembles with VotingPointForecaster using mean, weighted, and median aggregation strategies.