Skip to content

BaseRankingScorer

yohou.metrics.base.BaseRankingScorer

Bases: BaseClassProbaScorer

Base class for ranking classification metrics.

Extends `BaseClassProbaScorer` for metrics that require the full probability vector and cannot produce meaningful per-row values (e.g., ROC AUC, PR AUC). Overrides `score()` entirely.

Parameters

Name Type Description Default
average str

Class averaging strategy: "macro" (unweighted mean across classes) or "weighted" (support-weighted mean).

"macro"
aggregation_method list of str or str

Which dimensions to aggregate.

"all"
groups list of str, dict of str to float, or None

Panel group filter or filter with weights.

None
components list of str, dict of str to float, or None

Component filter or filter with weights.

None

Source Code

Show/Hide source
class BaseRankingScorer(BaseClassProbaScorer, metaclass=abc.ABCMeta):
    """Abstract base for ranking-style classification scorers.

    Builds on :class:`BaseClassProbaScorer` for metrics (e.g., ROC AUC,
    PR AUC) that need the whole probability vector and have no
    meaningful per-row value. ``score()`` is therefore replaced
    wholesale; subclasses only supply ``_compute_ranking_metric``.

    Parameters
    ----------
    average : str, default="macro"
        How per-class one-vs-rest scores are combined: ``"macro"``
        (plain mean over classes) or ``"weighted"`` (mean weighted by
        each class's support).
    aggregation_method : list of str or str, default="all"
        Which dimensions to aggregate.
    groups : list of str, dict of str to float, or None, default=None
        Panel group filter or filter with weights.
    components : list of str, dict of str to float, or None, default=None
        Component filter or filter with weights.

    """

    # Parent constraints plus validation of the extra ``average`` option.
    _parameter_constraints: dict = {
        **BaseClassProbaScorer._parameter_constraints,
        "average": [StrOptions({"macro", "weighted"})],
    }

    def __init__(
        self,
        average: str = "macro",
        aggregation_method: list[str] | str = "all",
        groups: list[str] | dict[str, float] | None = None,
        components: list[str] | dict[str, float] | None = None,
    ):
        super().__init__(aggregation_method=aggregation_method, groups=groups, components=components)
        self.average = average

    def _compute_raw_errors(self, y_truth: pl.DataFrame, y_pred: pl.DataFrame) -> pl.DataFrame:
        """Unsupported here: ranking scorers bypass per-row errors via ``score()``."""
        raise NotImplementedError("Ranking scorers override score() directly")

    @abc.abstractmethod
    def _compute_ranking_metric(
        self,
        y_true_binary: np.ndarray,
        y_proba: np.ndarray,
        sample_weight: np.ndarray | None = None,
    ) -> float:
        """Evaluate the ranking metric for one class in a one-vs-rest setup.

        Parameters
        ----------
        y_true_binary : np.ndarray
            Indicator array: 1 where the row belongs to the positive
            class, 0 otherwise.
        y_proba : np.ndarray
            Predicted probability of the positive class for each row.
        sample_weight : np.ndarray or None
            Optional per-row weights.

        Returns
        -------
        float
            The metric value for this single class.

        """

    def score(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
        /,
        time_weight: Callable | pl.DataFrame | dict | None = None,
        step_weight: Callable | pl.DataFrame | dict | None = None,
        vintage_weight: Callable | pl.DataFrame | dict | None = None,
        **params,
    ) -> float | pl.DataFrame:
        """Score predicted class probabilities with the ranking metric.

        The generic per-row scoring pipeline is bypassed: one metric
        value is computed per target column (and per vintage when more
        than one vintage is present), then handed to
        ``_aggregate_per_vintage_scores`` for final aggregation.

        Parameters
        ----------
        y_truth : pl.DataFrame
            True class labels with ``"time"`` column.
        y_pred : pl.DataFrame
            Predicted probabilities with ``"time"`` column.
        time_weight : callable, pl.DataFrame, dict, or None, default=None
            Time-based evaluation weights.
        step_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-step weights.
        vintage_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-vintage weights.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        float or pl.DataFrame
            Aggregated metric score.

        """
        check_is_fitted(self, ["_is_fitted"])

        y_truth, y_pred, context = validate_scorer_data(self, y_truth, y_pred)
        # The sixth returned item is deliberately discarded: only the time
        # and step weights are folded into the per-row sample weights below.
        y_truth, y_pred, context, time_w, step_w, _ = self._pre_filter_zero_weights(
            y_truth, y_pred, context, time_weight, step_weight, vintage_weight
        )
        self._validate_probabilities(y_truth, y_pred)

        row_weights = self._resolve_combined_weights(time_w, step_w, len(y_truth))
        targets = self._extract_target_columns(y_truth)

        def score_targets(
            truth_df: pl.DataFrame,
            proba_df: pl.DataFrame,
            weights: np.ndarray | None,
        ) -> dict[str, list[float]]:
            # One single-element column per target; targets without any
            # probability columns are left out.
            scores: dict[str, list[float]] = {}
            for name in targets:
                proba_cols, class_labels = self._extract_class_proba_columns(proba_df, name)
                if proba_cols:
                    scores[name] = [
                        self._compute_ovr_metric(
                            truth_df[name],
                            proba_df.select(proba_cols),
                            class_labels,
                            weights,
                        )
                    ]
            return scores

        vintages = None if context is None else context.vintage_time

        if vintages is None or vintages.n_unique() <= 1:
            # No vintage split needed: score all rows together.
            per_vintage = pl.DataFrame(score_targets(y_truth, y_pred, row_weights))
        else:
            # One result row per distinct vintage, in first-seen order.
            all_vintages = vintages.to_list()
            frames: list[pl.DataFrame] = []
            for vt in vintages.unique(maintain_order=True).to_list():
                keep = [v == vt for v in all_vintages]
                truth_part = y_truth.filter(keep)
                proba_part = y_pred.filter(keep)
                weight_part = None if row_weights is None else row_weights[keep]

                frame = pl.DataFrame(score_targets(truth_part, proba_part, weight_part))
                frames.append(frame.with_columns(pl.lit(vt).alias("vintage_time").cast(pl.Datetime)))

            per_vintage = pl.concat(frames, how="diagonal_relaxed")

        # Final aggregation is delegated.
        return self._aggregate_per_vintage_scores(per_vintage, context)

    def _compute_ovr_metric(
        self,
        truth_series: pl.Series,
        proba_df: pl.DataFrame,
        class_labels: list[str],
        sample_weight: np.ndarray | None,
    ) -> float:
        """Average the one-vs-rest ranking metric over classes.

        Parameters
        ----------
        truth_series : pl.Series
            True class labels (compared to ``class_labels`` as strings).
        proba_df : pl.DataFrame
            Probability columns, positionally aligned with
            ``class_labels``.
        class_labels : list of str
            Class label names.
        sample_weight : np.ndarray or None
            Combined sample weights.

        Returns
        -------
        float
            Class-averaged metric according to ``self.average``, or
            ``0.0`` when no class has both positive and negative rows.

        """
        labels = truth_series.to_numpy().astype(str)
        probabilities = proba_df.to_numpy()

        # (support, metric value) for every class that can be scored.
        scored: list[tuple[int, float]] = []
        for col_idx, label in enumerate(class_labels):
            is_positive = (labels == label).astype(np.float64)
            n_positive = int(is_positive.sum())
            # A class needs at least one positive and one negative row.
            if 0 < n_positive < len(is_positive):
                value = self._compute_ranking_metric(
                    is_positive,
                    probabilities[:, col_idx],
                    sample_weight,
                )
                scored.append((n_positive, value))

        if not scored:
            return 0.0

        if self.average == "weighted":
            total_support = sum(n for n, _ in scored)
            return sum(n * value / total_support for n, value in scored)

        # macro
        return sum(value for _, value in scored) / len(scored)

    @staticmethod
    def _resolve_combined_weights(
        tw: np.ndarray | dict[str, np.ndarray] | None,
        sw: np.ndarray | dict[str, np.ndarray] | None,
        n: int,
    ) -> np.ndarray | None:
        """Merge time and step weights into one per-row weight vector.

        Array weights are multiplied element-wise into a float64 vector
        of length ``n``. Dict (panel-aware) weights are not supported by
        ranking scorers: each one triggers a ``UserWarning`` and is
        skipped. Returns ``None`` when no array weight remains.
        """
        usable = []
        for candidate in (tw, sw):
            if candidate is None:
                continue
            if not isinstance(candidate, dict):
                usable.append(candidate)
                continue
            warnings.warn(
                "Panel-aware (dict) weights are not supported by ranking scorers and will be ignored. "
                "Use a flat callable or DataFrame weight instead.",
                UserWarning,
                stacklevel=3,
            )

        if not usable:
            return None

        combined = np.ones(n, dtype=np.float64)
        for weights in usable:
            combined *= weights
        return combined

Methods

score(y_truth, y_pred, /, time_weight=None, step_weight=None, vintage_weight=None, **params)

Compute the ranking metric score.

Overrides the full scoring pipeline because ranking metrics require the complete probability vector per group and cannot produce meaningful per-row error values.

Parameters
Name Type Description Default
y_truth DataFrame

True class labels with "time" column.

required
y_pred DataFrame

Predicted probabilities with "time" column.

required
time_weight callable, pl.DataFrame, dict, or None

Time-based evaluation weights.

None
step_weight callable, pl.DataFrame, dict, or None

Per-step weights.

None
vintage_weight callable, pl.DataFrame, dict, or None

Per-vintage weights.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
float or DataFrame

Aggregated metric score.

Source Code
Show/Hide source
def score(
    self,
    y_truth: pl.DataFrame,
    y_pred: pl.DataFrame,
    /,
    time_weight: Callable | pl.DataFrame | dict | None = None,
    step_weight: Callable | pl.DataFrame | dict | None = None,
    vintage_weight: Callable | pl.DataFrame | dict | None = None,
    **params,
) -> float | pl.DataFrame:
    """Score predicted class probabilities with the ranking metric.

    The generic per-row scoring pipeline is bypassed: one metric
    value is computed per target column (and per vintage when more
    than one vintage is present), then handed to
    ``_aggregate_per_vintage_scores`` for final aggregation.

    Parameters
    ----------
    y_truth : pl.DataFrame
        True class labels with ``"time"`` column.
    y_pred : pl.DataFrame
        Predicted probabilities with ``"time"`` column.
    time_weight : callable, pl.DataFrame, dict, or None, default=None
        Time-based evaluation weights.
    step_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-step weights.
    vintage_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-vintage weights.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    float or pl.DataFrame
        Aggregated metric score.

    """
    check_is_fitted(self, ["_is_fitted"])

    y_truth, y_pred, context = validate_scorer_data(self, y_truth, y_pred)
    # The sixth returned item is deliberately discarded: only the time
    # and step weights are folded into the per-row sample weights below.
    y_truth, y_pred, context, time_w, step_w, _ = self._pre_filter_zero_weights(
        y_truth, y_pred, context, time_weight, step_weight, vintage_weight
    )
    self._validate_probabilities(y_truth, y_pred)

    row_weights = self._resolve_combined_weights(time_w, step_w, len(y_truth))
    targets = self._extract_target_columns(y_truth)

    def score_targets(
        truth_df: pl.DataFrame,
        proba_df: pl.DataFrame,
        weights: np.ndarray | None,
    ) -> dict[str, list[float]]:
        # One single-element column per target; targets without any
        # probability columns are left out.
        scores: dict[str, list[float]] = {}
        for name in targets:
            proba_cols, class_labels = self._extract_class_proba_columns(proba_df, name)
            if proba_cols:
                scores[name] = [
                    self._compute_ovr_metric(
                        truth_df[name],
                        proba_df.select(proba_cols),
                        class_labels,
                        weights,
                    )
                ]
        return scores

    vintages = None if context is None else context.vintage_time

    if vintages is None or vintages.n_unique() <= 1:
        # No vintage split needed: score all rows together.
        per_vintage = pl.DataFrame(score_targets(y_truth, y_pred, row_weights))
    else:
        # One result row per distinct vintage, in first-seen order.
        all_vintages = vintages.to_list()
        frames: list[pl.DataFrame] = []
        for vt in vintages.unique(maintain_order=True).to_list():
            keep = [v == vt for v in all_vintages]
            truth_part = y_truth.filter(keep)
            proba_part = y_pred.filter(keep)
            weight_part = None if row_weights is None else row_weights[keep]

            frame = pl.DataFrame(score_targets(truth_part, proba_part, weight_part))
            frames.append(frame.with_columns(pl.lit(vt).alias("vintage_time").cast(pl.Datetime)))

        per_vintage = pl.concat(frames, how="diagonal_relaxed")

    # Final aggregation is delegated.
    return self._aggregate_per_vintage_scores(per_vintage, context)