Skip to content

BaseClassProbaScorer

yohou.metrics.base.BaseClassProbaScorer

Bases: BaseScorer

Base class for class-probability forecast metrics.

Class-probability forecasters produce per-class probability distributions. Metrics derived from this class evaluate the quality of predicted probability distributions against true class labels.

Note: The `_response_method` attribute indicates which forecaster method produces the predictions that this scorer expects.

Parameters

Name Type Description Default
aggregation_method list of str or str

Dimensions to aggregate over. Options:

- "stepwise": Aggregate across forecasting steps.
- "vintagewise": Aggregate across vintages (observed times).
- "componentwise": Aggregate across components (returns a per-timestep DataFrame).
- "groupwise": Aggregate across panel groups (panel data only).
- "all": Aggregate across all dimensions (returns a scalar). Same as ["stepwise", "vintagewise", "componentwise", "groupwise"].

"all"
groups list of str, dict of str to float, or None

Panel group filter (list) or filter with weights (dict). If None, all panel groups are included with equal weight.

None
components list of str, dict of str to float, or None

Component filter (list) or filter with weights (dict). If None, all components are included with equal weight.

None

See Also

Source Code

Show/Hide source
class BaseClassProbaScorer(BaseScorer, metaclass=abc.ABCMeta):
    """Base class for class-probability forecast metrics.

    Class-probability forecasters produce per-class probability distributions.
    Metrics derived from this class evaluate the quality of predicted
    probability distributions against true class labels.

    .. note:: The ``_response_method`` attribute indicates which forecaster
       method produces the predictions that this scorer expects.

    Parameters
    ----------
    aggregation_method : list of str or str, default="all"
        Dimensions to aggregate over. Options:

        - "stepwise": Aggregate across forecasting steps.
        - "vintagewise": Aggregate across vintages (observed times).
        - "componentwise": Aggregate across components (returns a
          per-timestep DataFrame).
        - "groupwise": Aggregate across panel groups (panel data only).
        - "all": Aggregate across all dimensions (returns a scalar). Same as
          ["stepwise", "vintagewise", "componentwise", "groupwise"].
    groups : list of str, dict of str to float, or None, default=None
        Panel group filter (list) or filter with weights (dict). If None,
        all panel groups are included with equal weight.
    components : list of str, dict of str to float, or None, default=None
        Component filter (list) or filter with weights (dict). If None,
        all components are included with equal weight.

    See Also
    --------
    - [`LogLoss`][yohou.metrics.class_proba.LogLoss] : Logarithmic loss scorer.
    - [`BrierScore`][yohou.metrics.class_proba.BrierScore] : Brier score for multi-class probabilities.
    - [`Accuracy`][yohou.metrics.classification.Accuracy] : Accuracy from argmax of predicted probabilities.
    - [`BaseClassProbaForecaster`][yohou.class_proba.base.BaseClassProbaForecaster] : Produces class-probability forecasts.

    """

    # Name of the forecaster method whose output this scorer consumes.
    _response_method: str = "predict_class_proba"

    # Extends the parent constraints with the accepted forms of
    # ``aggregation_method``: a list, or one of the listed string options.
    _parameter_constraints: dict = {
        **BaseScorer._parameter_constraints,
        "aggregation_method": [
            list,
            StrOptions({"all", "stepwise", "vintagewise", "componentwise", "groupwise"}),
        ],
    }

    def __init__(
        self,
        aggregation_method: list[str] | str = "all",
        groups: list[str] | dict[str, float] | None = None,
        components: list[str] | dict[str, float] | None = None,
    ):
        # ``groups`` and ``components`` are handled by the parent class;
        # only ``aggregation_method`` is stored here.
        super().__init__(
            groups=groups,
            components=components,
        )
        self.aggregation_method = aggregation_method

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BaseClassProbaScorer:
        """Fit the scorer on training data.

        Validates ``aggregation_method``, ``groups``, and ``components``
        before delegating to the parent ``fit``.

        Parameters
        ----------
        y_train : pl.DataFrame
            Training target time series with a ``"time"`` column and one or
            more categorical value columns.
        forecaster : BaseForecaster or None, default=None
            If provided, metadata is extracted directly from the fitted
            forecaster instead of being re-inferred from ``y_train``.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        self
            The fitted scorer instance.

        Raises
        ------
        ValueError
            If ``aggregation_method`` contains invalid values, or if
            ``groups`` / ``components`` are not found in ``y_train``.

        """
        # Individual dimensions accepted inside a list; the "all" shorthand
        # is permitted separately by ``_parameter_constraints``.
        valid_methods = {"stepwise", "vintagewise", "componentwise", "groupwise"}
        self._validate_parameters(
            y_train=y_train,
            aggregation_method=self.aggregation_method,
            valid_aggregation_methods=valid_methods,
        )
        return super().fit(y_train, forecaster=forecaster, **params)

    def __sklearn_tags__(self) -> Tags:
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with ``scorer_tags.prediction_type`` set to
            ``"class_proba"``.

        """
        tags = super().__sklearn_tags__()
        # Narrow the optional attribute before mutating it.
        assert tags.scorer_tags is not None
        tags.scorer_tags.prediction_type = "class_proba"
        return tags

    @staticmethod
    def _extract_class_proba_columns(y_pred: pl.DataFrame, target_col: str) -> tuple[list[str], list[str]]:
        """Extract probability columns and class labels for a target.

        Parameters
        ----------
        y_pred : pl.DataFrame
            Probability predictions with columns ``{target}_proba_{class}``.
        target_col : str
            Target column name.

        Returns
        -------
        tuple of (list of str, list of str)
            Probability column names and corresponding class labels, in the
            column order of ``y_pred``. Both lists are empty when no column
            matches the target.

        """
        prefix = f"{target_col}_proba_"
        proba_cols = [c for c in y_pred.columns if c.startswith(prefix)]
        # Strip the exact prefix instead of splitting on the first
        # "_proba_": the target name itself may contain that marker, which
        # would otherwise leak part of the target name into the class label.
        class_labels = [c[len(prefix) :] for c in proba_cols]
        return proba_cols, class_labels

    @staticmethod
    def _extract_target_columns(y_truth: pl.DataFrame) -> list[str]:
        """Extract target column names from truth DataFrame.

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth (time columns already removed).

        Returns
        -------
        list of str
            Target column names.

        """
        return y_truth.columns

    def _validate_probabilities(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
    ) -> None:
        """Validate that probability columns contain valid values.

        Checks that all probability columns are finite and in [0, 1].
        Targets without any matching probability column are skipped.

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth; its columns are taken as the target names.
        y_pred : pl.DataFrame
            Probability predictions with columns ``{target}_proba_{class}``.

        Raises
        ------
        ValueError
            If any probability column contains NaN, infinite, or
            out-of-range values.

        """
        target_cols = self._extract_target_columns(y_truth)
        for target_col in target_cols:
            proba_cols, _ = self._extract_class_proba_columns(y_pred, target_col)
            if not proba_cols:
                continue
            # Materialise once; array columns follow the order of ``proba_cols``.
            arr = y_pred.select(proba_cols).to_numpy()

            finite_mask = np.isfinite(arr)
            if not finite_mask.all():
                bad_cols = [c for c, ok in zip(proba_cols, finite_mask.all(axis=0)) if not ok]
                raise ValueError(
                    f"Probability columns contain NaN or infinite values: {bad_cols}. All probabilities must be finite."
                )

            # Every value is finite at this point, so plain comparisons are safe.
            out_of_range = (arr < 0) | (arr > 1)
            if out_of_range.any():
                bad_cols = [c for c, bad in zip(proba_cols, out_of_range.any(axis=0)) if bad]
                raise ValueError(
                    f"Probability columns contain values outside [0, 1]: {bad_cols}. "
                    "All probabilities must be between 0 and 1."
                )

    @abc.abstractmethod
    def _compute_raw_errors(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
    ) -> pl.DataFrame:
        """Compute per-timestep per-component raw scores.

        Subclasses implement only this method.  Access fitted attributes
        and helper methods (e.g. ``_extract_class_proba_columns``,
        ``_extract_target_columns``) via ``self``.

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth values (time column already removed).
        y_pred : pl.DataFrame
            Predicted probabilities (time column already removed).

        Returns
        -------
        pl.DataFrame
            Raw error values with one column per target component.

        """

    def score(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
        /,
        time_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
        step_weight: Callable | pl.DataFrame | dict[int | str, float] | None = None,
        vintage_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
        **params,
    ) -> float | pl.DataFrame:
        """Compute the class-probability metric score.

        Template method: validate inputs -> pre-filter zero-weight rows
        -> validate probabilities -> compute raw errors -> apply weights
        -> aggregate -> rename.

        Parameters
        ----------
        y_truth : pl.DataFrame
            True class labels with ``"time"`` column.
        y_pred : pl.DataFrame
            Predicted probabilities with ``"time"`` column.
        time_weight : callable, pl.DataFrame, dict, or None, default=None
            Time-based evaluation weights. Accepts a callable
            ``f(time_series) -> pl.Series``, a panel-aware callable
            ``f(time_series, group_name) -> pl.Series``, a DataFrame
            with ``"time"`` and ``"weight"`` columns, or a
            ``{datetime_or_str: float}`` dict (``"*"`` key sets default).
        step_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-step weights. Same formats as ``time_weight`` but keyed on
            ``"forecasting_step"``.
        vintage_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-vintage weights. Same formats as ``time_weight`` but keyed
            on ``"vintage_time"``.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        float or pl.DataFrame
            Aggregated metric score.

        Raises
        ------
        ValueError
            If any probability column contains NaN, infinite, or
            out-of-range values.

        """
        check_is_fitted(self, ["_is_fitted"])

        y_truth, y_pred, context = validate_scorer_data(self, y_truth, y_pred)

        # 0. Resolve weights and pre-filter zero-weight rows.
        # NOTE(review): the last returned value (presumably the resolved
        # vintage weight) is discarded here - confirm it is consumed
        # elsewhere (e.g. through ``context`` during aggregation).
        y_truth, y_pred, context, tw, sw, _ = self._pre_filter_zero_weights(
            y_truth,
            y_pred,
            context,
            time_weight,
            step_weight,
            vintage_weight,
        )

        # 0b. Validate probability columns are finite and in [0, 1]
        self._validate_probabilities(y_truth, y_pred)

        # 1. Compute raw per-timestep per-component errors
        scores = self._compute_raw_errors(y_truth, y_pred)

        # 2. Apply weights (time first, then step)
        scores = self._apply_weights(scores, tw, sw)

        # 3. Aggregate (includes transform + rename via _aggregate_per_vintage_scores)
        return self._aggregate_scores(scores, context=context)

Methods

fit(y_train, *, forecaster=None, **params)

Fit the scorer on training data.

Validates aggregation_method, groups, and components.

Parameters
Name Type Description Default
y_train DataFrame

Training target time series with a "time" column and one or more categorical value columns.

required
forecaster BaseForecaster or None

If provided, metadata is extracted directly from the fitted forecaster instead of being re-inferred from y_train.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
self

The fitted scorer instance.

Raises
Type Description
ValueError

If aggregation_method contains invalid values, or if groups / components are not found in y_train.

Source Code
Show/Hide source
@_fit_context(prefer_skip_nested_validation=True)
def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BaseClassProbaScorer:
    """Check the scorer configuration, then fit it on training data.

    ``aggregation_method``, ``groups``, and ``components`` are validated
    before the parent ``fit`` is invoked.

    Parameters
    ----------
    y_train : pl.DataFrame
        Training target time series with a ``"time"`` column and one or
        more categorical value columns.
    forecaster : BaseForecaster or None, default=None
        When given, metadata is taken from this fitted forecaster rather
        than being re-inferred from ``y_train``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    self
        The fitted scorer instance.

    Raises
    ------
    ValueError
        If ``aggregation_method`` contains invalid values, or if
        ``groups`` / ``components`` are not found in ``y_train``.

    """
    # Individual aggregation dimensions accepted by this scorer family.
    allowed_dimensions = {"stepwise", "vintagewise", "componentwise", "groupwise"}
    self._validate_parameters(
        y_train=y_train,
        aggregation_method=self.aggregation_method,
        valid_aggregation_methods=allowed_dimensions,
    )

    # The parent class performs the actual fitting and returns the instance.
    fitted = super().fit(y_train, forecaster=forecaster, **params)
    return fitted

__sklearn_tags__()

Get estimator tags.

Returns
Type Description
Tags

Estimator tags with scorer-specific attributes.

Source Code
Show/Hide source
def __sklearn_tags__(self) -> Tags:
    """Return estimator tags marked for class-probability predictions.

    Returns
    -------
    Tags
        The parent's tags with ``scorer_tags.prediction_type`` set to
        ``"class_proba"``.

    """
    estimator_tags = super().__sklearn_tags__()
    scorer_tags = estimator_tags.scorer_tags
    # Narrow the optional attribute before mutating it.
    assert scorer_tags is not None
    scorer_tags.prediction_type = "class_proba"
    return estimator_tags

score(y_truth, y_pred, /, time_weight=None, step_weight=None, vintage_weight=None, **params)

Compute the class-probability metric score.

Template method: validate inputs -> pre-filter zero-weight rows -> validate probabilities -> compute raw errors -> apply weights -> aggregate -> rename.

Parameters
Name Type Description Default
y_truth DataFrame

True class labels with "time" column.

required
y_pred DataFrame

Predicted probabilities with "time" column.

required
time_weight callable, pl.DataFrame, dict, or None

Time-based evaluation weights. Accepts a callable f(time_series) -> pl.Series, a panel-aware callable f(time_series, group_name) -> pl.Series, a DataFrame with "time" and "weight" columns, or a {datetime_or_str: float} dict ("*" key sets default).

None
step_weight callable, pl.DataFrame, dict, or None

Per-step weights. Same formats as time_weight but keyed on "forecasting_step".

None
vintage_weight callable, pl.DataFrame, dict, or None

Per-vintage weights. Same formats as time_weight but keyed on "vintage_time".

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
float or DataFrame

Aggregated metric score.

Source Code
Show/Hide source
def score(
    self,
    y_truth: pl.DataFrame,
    y_pred: pl.DataFrame,
    /,
    time_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
    step_weight: Callable | pl.DataFrame | dict[int | str, float] | None = None,
    vintage_weight: Callable | pl.DataFrame | dict[datetime | str, float] | None = None,
    **params,
) -> float | pl.DataFrame:
    """Evaluate predicted class probabilities against the true labels.

    Pipeline: validate inputs -> pre-filter zero-weight rows -> validate
    probabilities -> compute raw errors -> apply weights -> aggregate
    -> rename.

    Parameters
    ----------
    y_truth : pl.DataFrame
        True class labels with ``"time"`` column.
    y_pred : pl.DataFrame
        Predicted probabilities with ``"time"`` column.
    time_weight : callable, pl.DataFrame, dict, or None, default=None
        Time-based evaluation weights. Accepts a callable
        ``f(time_series) -> pl.Series``, a panel-aware callable
        ``f(time_series, group_name) -> pl.Series``, a DataFrame
        with ``"time"`` and ``"weight"`` columns, or a
        ``{datetime_or_str: float}`` dict (``"*"`` key sets default).
    step_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-step weights. Same formats as ``time_weight`` but keyed on
        ``"forecasting_step"``.
    vintage_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-vintage weights. Same formats as ``time_weight`` but keyed
        on ``"vintage_time"``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    float or pl.DataFrame
        Aggregated metric score.

    """
    check_is_fitted(self, ["_is_fitted"])

    truth, pred, context = validate_scorer_data(self, y_truth, y_pred)

    # Step 0: resolve the weights and drop rows whose weight is zero.
    # The last element of the result is not used by this method.
    filtered = self._pre_filter_zero_weights(
        truth,
        pred,
        context,
        time_weight,
        step_weight,
        vintage_weight,
    )
    truth, pred, context, resolved_time_weight, resolved_step_weight, _ = filtered

    # Step 0b: probabilities must be finite and lie within [0, 1].
    self._validate_probabilities(truth, pred)

    # Step 1: raw per-timestep, per-component errors.
    raw_scores = self._compute_raw_errors(truth, pred)

    # Step 2: weighting (time first, then step).
    weighted_scores = self._apply_weights(raw_scores, resolved_time_weight, resolved_step_weight)

    # Step 3: aggregation (includes transform + rename via
    # _aggregate_per_vintage_scores).
    return self._aggregate_scores(weighted_scores, context=context)