
BasePointScorer

yohou.metrics.base.BasePointScorer

Bases: BaseScorer

Base class for point forecast metrics.

Point forecasters produce single-value predictions. Metrics derived from this class evaluate prediction accuracy (e.g., MeanAbsoluteError, RootMeanSquaredError, MAPE).

Note: The _response_method attribute indicates which forecaster method produces the predictions that this scorer expects. For this class it is "predict".

Parameters

Name Type Description Default
aggregation_method list of str or str

Dimensions to aggregate over. Options:
- "stepwise": Aggregate across forecasting steps.
- "vintagewise": Aggregate across vintages (observed times).
- "componentwise": Aggregate across components, returning a per-timestep DataFrame.
- "groupwise": Aggregate across panel groups (panel data only).
- "all": Aggregate across all dimensions (returns a scalar). Same as ["stepwise", "vintagewise", "componentwise", "groupwise"].

"all"
groups list of str, dict of str to float, or None

Panel group filter (list) or filter with weights (dict). If None, all panel groups are included with equal weight.

None
components list of str, dict of str to float, or None

Component filter (list) or filter with weights (dict). If None, all components are included with equal weight.

None
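
The parameter semantics above can be illustrated with a minimal, standard-library sketch. Both helpers below (`normalize_aggregation_method` and `normalize_filter`) are hypothetical names introduced for illustration and are not part of yohou; the real validation happens inside `fit` and may differ in detail. In particular, treating a list that contains "all" as invalid is an assumption.

```python
VALID_DIMENSIONS = ("stepwise", "vintagewise", "componentwise", "groupwise")


def normalize_aggregation_method(aggregation_method):
    """Expand "all" and reject unknown dimension names (hypothetical helper)."""
    if aggregation_method == "all":
        # "all" is documented as shorthand for every dimension.
        return list(VALID_DIMENSIONS)
    if isinstance(aggregation_method, str):
        methods = [aggregation_method]
    else:
        methods = list(aggregation_method)
    unknown = [m for m in methods if m not in VALID_DIMENSIONS]
    if unknown:
        raise ValueError(f"Invalid aggregation_method values: {unknown}")
    return methods


def normalize_filter(selection, available):
    """Turn a list, dict, or None filter into a {name: weight} mapping (hypothetical helper)."""
    if selection is None:
        # None keeps every name with equal weight.
        return {name: 1.0 for name in available}
    if isinstance(selection, dict):
        weights = dict(selection)  # dict form: filter plus explicit weights
    else:
        weights = {name: 1.0 for name in selection}  # list form: filter only
    missing = [name for name in weights if name not in available]
    if missing:
        raise ValueError(f"Names not found in the data: {missing}")
    return weights
```

For example, `normalize_filter({"sales": 2.0}, ["sales", "returns"])` keeps only the hypothetical `sales` component with weight 2.0, mirroring how a dict passed to `components` both filters and weights.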

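See Also

MeanAbsoluteError (yohou.metrics.point.MeanAbsoluteError): Concrete point scorer implementation.
MeanSquaredError (yohou.metrics.point.MeanSquaredError): Concrete point scorer implementation.
BasePointForecaster (yohou.point.base.BasePointForecaster): Produces point forecasts.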

Source Code

class BasePointScorer(BaseScorer, metaclass=abc.ABCMeta):
    """Base class for point forecast metrics.

    Point forecasters produce single-value predictions. Metrics derived from this
    class evaluate prediction accuracy (e.g., MeanAbsoluteError, RootMeanSquaredError, MAPE).

    .. note:: The ``_response_method`` attribute indicates which forecaster
       method produces the predictions that this scorer expects.

    Parameters
    ----------
    aggregation_method : list of str or str, default="all"
        Dimensions to aggregate over. Options:
        - "stepwise": Aggregate across forecasting steps.
        - "vintagewise": Aggregate across vintages (observed times).
        - "componentwise": Aggregate across components, returning a per-timestep DataFrame.
        - "groupwise": Aggregate across panel groups (panel data only).
        - "all": Aggregate across all dimensions (returns scalar). Same as
          ["stepwise", "vintagewise", "componentwise", "groupwise"].
    groups : list of str, dict of str to float, or None, default=None
        Panel group filter (list) or filter with weights (dict). If None,
        all panel groups are included with equal weight.
    components : list of str, dict of str to float, or None, default=None
        Component filter (list) or filter with weights (dict). If None,
        all components are included with equal weight.

    See Also
    --------
    - [`MeanAbsoluteError`][yohou.metrics.point.MeanAbsoluteError] : Concrete point scorer implementation.
    - [`MeanSquaredError`][yohou.metrics.point.MeanSquaredError] : Concrete point scorer implementation.
    - [`BasePointForecaster`][yohou.point.base.BasePointForecaster] : Produces point forecasts.

    """

    _response_method: str = "predict"

    _parameter_constraints: dict = {
        **BaseScorer._parameter_constraints,
        "aggregation_method": [
            list,
            StrOptions({"all", "stepwise", "vintagewise", "componentwise", "groupwise"}),
        ],
    }

    def __init__(
        self,
        aggregation_method: list[str] | str = "all",
        groups: list[str] | dict[str, float] | None = None,
        components: list[str] | dict[str, float] | None = None,
    ):
        super().__init__(
            groups=groups,
            components=components,
        )
        self.aggregation_method = aggregation_method

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BasePointScorer:
        """Fit the scorer on training data.

        Validates ``aggregation_method``, ``groups``, and
        ``component_names``.

        Parameters
        ----------
        y_train : pl.DataFrame
            Training target time series with a ``"time"`` column and one or
            more numeric value columns.
        forecaster : BaseForecaster or None, default=None
            If provided, metadata is extracted directly from the fitted
            forecaster instead of being re-inferred from ``y_train``.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        self
            The fitted scorer instance.

        Raises
        ------
        ValueError
            If ``aggregation_method`` contains invalid values, or if
            ``groups`` / ``component_names`` are not found in
            ``y_train``.

        """
        # Validate point-specific parameters (aggregation_method)
        valid_methods = {"stepwise", "vintagewise", "componentwise", "groupwise"}
        self._validate_parameters(
            y_train=y_train,
            aggregation_method=self.aggregation_method,
            valid_aggregation_methods=valid_methods,
        )

        return super().fit(y_train, forecaster=forecaster, **params)

    @abc.abstractmethod
    def _compute_raw_errors(self, y_truth: pl.DataFrame, y_pred: pl.DataFrame) -> pl.DataFrame:
        """Compute per-timestep per-component raw errors.

        Subclasses implement only this method.  Access fitted attributes
        (e.g. ``self.scales_``, ``self.naive_errors_``) via ``self``.

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth values (time column already removed).
        y_pred : pl.DataFrame
            Predicted values (time column already removed).

        Returns
        -------
        pl.DataFrame
            Raw error values, same shape as inputs.

        """

    def score(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
        /,
        time_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
        step_weight: Callable | pl.DataFrame | dict[int | str, float] | None = None,
        vintage_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
        **params,
    ) -> float | pl.DataFrame:
        """Compute the point metric score.

        Template method: validate -> pre-filter zero-weight rows -> compute
        raw errors -> apply weights -> aggregate -> post-aggregate transform
        -> rename.

        Parameters
        ----------
        y_truth : pl.DataFrame
            True values with ``"time"`` column.
        y_pred : pl.DataFrame
            Predicted values with ``"time"`` column.
        time_weight : callable, pl.DataFrame, dict, or None, default=None
            Time-based evaluation weights. Accepts a callable
            ``f(time_series) -> pl.Series``, a panel-aware callable
            ``f(time_series, group_name) -> pl.Series``, a DataFrame
            with ``"time"`` and ``"weight"`` columns, or a
            ``{datetime_or_str: float}`` dict (``"*"`` key sets default).
        step_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-step weights. Same formats as ``time_weight`` but keyed on
            ``"forecasting_step"``. Use ``{"*": 0.0, 1: 1.0}`` to score
            only step 1.
        vintage_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-vintage weights. Same formats as ``time_weight`` but keyed
            on ``"vintage_time"``.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        float or pl.DataFrame
            Aggregated metric score.

        """
        check_is_fitted(self, ["_is_fitted"])

        y_truth, y_pred, context = validate_scorer_data(
            self,
            y_truth,
            y_pred,
        )

        # 0. Resolve weights and pre-filter zero-weight rows
        y_truth, y_pred, context, tw, sw, _ = self._pre_filter_zero_weights(
            y_truth,
            y_pred,
            context,
            time_weight,
            step_weight,
            vintage_weight,
        )

        # 1. Compute raw per-timestep per-component errors
        scores = self._compute_raw_errors(y_truth, y_pred)

        # 2. Apply weights (time first, then step)
        scores = self._apply_weights(scores, tw, sw)

        # 3. Aggregate (includes transform + rename via _aggregate_per_vintage_scores)
        return self._aggregate_scores(scores, context=context)

    def __sklearn_tags__(self) -> Tags:
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with scorer-specific attributes.

        """
        tags = super().__sklearn_tags__()
        assert tags.scorer_tags is not None
        tags.scorer_tags.prediction_type = "point"
        return tags

Methods

fit(y_train, *, forecaster=None, **params)

Fit the scorer on training data.

Validates aggregation_method, groups, and component_names.

Parameters
Name Type Description Default
y_train DataFrame

Training target time series with a "time" column and one or more numeric value columns.

required
forecaster BaseForecaster or None

If provided, metadata is extracted directly from the fitted forecaster instead of being re-inferred from y_train.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
self

The fitted scorer instance.

Raises
Type Description
ValueError

If aggregation_method contains invalid values, or if groups / component_names are not found in y_train.

Source Code
@_fit_context(prefer_skip_nested_validation=True)
def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BasePointScorer:
    """Fit the scorer on training data.

    Validates ``aggregation_method``, ``groups``, and
    ``component_names``.

    Parameters
    ----------
    y_train : pl.DataFrame
        Training target time series with a ``"time"`` column and one or
        more numeric value columns.
    forecaster : BaseForecaster or None, default=None
        If provided, metadata is extracted directly from the fitted
        forecaster instead of being re-inferred from ``y_train``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    self
        The fitted scorer instance.

    Raises
    ------
    ValueError
        If ``aggregation_method`` contains invalid values, or if
        ``groups`` / ``component_names`` are not found in
        ``y_train``.

    """
    # Validate point-specific parameters (aggregation_method)
    valid_methods = {"stepwise", "vintagewise", "componentwise", "groupwise"}
    self._validate_parameters(
        y_train=y_train,
        aggregation_method=self.aggregation_method,
        valid_aggregation_methods=valid_methods,
    )

    return super().fit(y_train, forecaster=forecaster, **params)

score(y_truth, y_pred, /, time_weight=None, step_weight=None, vintage_weight=None, **params)

Compute the point metric score.

Template method: validate -> pre-filter zero-weight rows -> compute raw errors -> apply weights -> aggregate -> post-aggregate transform -> rename.
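
The template-method flow can be sketched without yohou or polars. The classes below (`SketchPointScorer`, `SketchAbsoluteError`) are simplified stand-ins invented for illustration: they work on plain lists, take a single hypothetical `weights` argument, and always aggregate to one weighted mean. The real `score` operates on DataFrames and supports several aggregation modes.

```python
import abc


class SketchPointScorer(abc.ABC):
    """Simplified stand-in for the template-method flow; not the yohou class."""

    @abc.abstractmethod
    def _compute_raw_errors(self, y_truth, y_pred):
        """Return one raw error per (truth, prediction) pair."""

    def score(self, y_truth, y_pred, weights=None):
        # Validate: inputs must align row by row.
        if len(y_truth) != len(y_pred):
            raise ValueError("y_truth and y_pred must have the same length")
        if weights is None:
            weights = [1.0] * len(y_truth)
        if len(weights) != len(y_truth):
            raise ValueError("weights must have one entry per row")

        # Pre-filter: drop zero-weight rows before any error is computed.
        rows = [(t, p, w) for t, p, w in zip(y_truth, y_pred, weights) if w != 0.0]
        if not rows:
            raise ValueError("all rows have zero weight")
        truth = [t for t, _, _ in rows]
        pred = [p for _, p, _ in rows]
        kept = [w for _, _, w in rows]

        # Compute raw errors: the only step a subclass has to supply.
        errors = self._compute_raw_errors(truth, pred)

        # Apply weights, then aggregate to a single weighted mean.
        return sum(e * w for e, w in zip(errors, kept)) / sum(kept)


class SketchAbsoluteError(SketchPointScorer):
    """Concrete metric: absolute error per row."""

    def _compute_raw_errors(self, y_truth, y_pred):
        return [abs(t - p) for t, p in zip(y_truth, y_pred)]
```

Calling `SketchAbsoluteError().score([1.0, 2.0, 4.0], [1.5, 2.0, 3.0])` averages the three absolute errors, while passing `weights=[1.0, 0.0, 1.0]` removes the middle row before the errors are computed. The point of the design is that a new metric only overrides `_compute_raw_errors` and inherits the rest of the pipeline.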

Parameters
Name Type Description Default
y_truth DataFrame

True values with "time" column.

required
y_pred DataFrame

Predicted values with "time" column.

required
time_weight callable, pl.DataFrame, dict, or None

Time-based evaluation weights. Accepts a callable f(time_series) -> pl.Series, a panel-aware callable f(time_series, group_name) -> pl.Series, a DataFrame with "time" and "weight" columns, or a {datetime_or_str: float} dict ("*" key sets default).

None
step_weight callable, pl.DataFrame, dict, or None

Per-step weights. Same formats as time_weight but keyed on "forecasting_step". Use {"*": 0.0, 1: 1.0} to score only step 1.

None
vintage_weight callable, pl.DataFrame, dict, or None

Per-vintage weights. Same formats as time_weight but keyed on "vintage_time".

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
float or DataFrame

Aggregated metric score.
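
To make the dict weight format concrete, here is a minimal standard-library sketch of how a mapping such as `{"*": 0.0, 1: 1.0}` could be resolved into per-step weights. The helper names `resolve_step_weights` and `weighted_mean` are hypothetical, the error values are made up for the example, and falling back to 1.0 when no `"*"` key is present is an assumption rather than documented behaviour.

```python
def resolve_step_weights(steps, weight_map):
    """Look up a weight per forecasting step, falling back to the "*" default."""
    # Assumption: steps without an entry get weight 1.0 when "*" is absent.
    default = weight_map.get("*", 1.0)
    return [weight_map.get(step, default) for step in steps]


def weighted_mean(errors, weights):
    """Aggregate errors into a single weighted average."""
    total = sum(weights)
    if total == 0:
        raise ValueError("weights sum to zero")
    return sum(e * w for e, w in zip(errors, weights)) / total


steps = [1, 2, 3]
errors = [0.2, 0.5, 0.9]  # illustrative per-step errors

# Only step 1 keeps a non-zero weight, so only its error contributes.
weights = resolve_step_weights(steps, {"*": 0.0, 1: 1.0})
step_one_score = weighted_mean(errors, weights)
```

Here `weights` resolves to `[1.0, 0.0, 0.0]`, so `step_one_score` equals the step-1 error alone.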

Source Code
def score(
    self,
    y_truth: pl.DataFrame,
    y_pred: pl.DataFrame,
    /,
    time_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
    step_weight: Callable | pl.DataFrame | dict[int | str, float] | None = None,
    vintage_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
    **params,
) -> float | pl.DataFrame:
    """Compute the point metric score.

    Template method: validate -> pre-filter zero-weight rows -> compute
    raw errors -> apply weights -> aggregate -> post-aggregate transform
    -> rename.

    Parameters
    ----------
    y_truth : pl.DataFrame
        True values with ``"time"`` column.
    y_pred : pl.DataFrame
        Predicted values with ``"time"`` column.
    time_weight : callable, pl.DataFrame, dict, or None, default=None
        Time-based evaluation weights. Accepts a callable
        ``f(time_series) -> pl.Series``, a panel-aware callable
        ``f(time_series, group_name) -> pl.Series``, a DataFrame
        with ``"time"`` and ``"weight"`` columns, or a
        ``{datetime_or_str: float}`` dict (``"*"`` key sets default).
    step_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-step weights. Same formats as ``time_weight`` but keyed on
        ``"forecasting_step"``. Use ``{"*": 0.0, 1: 1.0}`` to score
        only step 1.
    vintage_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-vintage weights. Same formats as ``time_weight`` but keyed
        on ``"vintage_time"``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    float or pl.DataFrame
        Aggregated metric score.

    """
    check_is_fitted(self, ["_is_fitted"])

    y_truth, y_pred, context = validate_scorer_data(
        self,
        y_truth,
        y_pred,
    )

    # 0. Resolve weights and pre-filter zero-weight rows
    y_truth, y_pred, context, tw, sw, _ = self._pre_filter_zero_weights(
        y_truth,
        y_pred,
        context,
        time_weight,
        step_weight,
        vintage_weight,
    )

    # 1. Compute raw per-timestep per-component errors
    scores = self._compute_raw_errors(y_truth, y_pred)

    # 2. Apply weights (time first, then step)
    scores = self._apply_weights(scores, tw, sw)

    # 3. Aggregate (includes transform + rename via _aggregate_per_vintage_scores)
    return self._aggregate_scores(scores, context=context)

__sklearn_tags__()

Get estimator tags.

Returns
Type Description
Tags

Estimator tags with scorer-specific attributes.

Source Code
def __sklearn_tags__(self) -> Tags:
    """Get estimator tags.

    Returns
    -------
    Tags
        Estimator tags with scorer-specific attributes.

    """
    tags = super().__sklearn_tags__()
    assert tags.scorer_tags is not None
    tags.scorer_tags.prediction_type = "point"
    return tags

Tutorials

The following example notebooks use this component:

  • How to Create a Custom Scorer (Evaluation-Search): Implement a custom point scorer with aggregation, panel support, and systematic testing.