Skip to content

ContinuousRankedProbabilityScore

yohou.metrics.interval.ContinuousRankedProbabilityScore

Bases: BaseIntervalScorer

Continuous Ranked Probability Score (CRPS) for prediction intervals.

Approximates CRPS by averaging pinball losses across multiple coverage rates. Each coverage rate α contributes two quantile levels, τ_lower = (1−α)/2 and τ_upper = (1+α)/2. The score averages the mean quantile loss per rate across all rates, approximating the integral of quantile loss over [0, 1].

The approximation improves as coverage rates become denser.

\[\text{CRPS} \approx \frac{1}{K}\sum_{k=1}^{K} \frac{L(\tau_{k,\text{lower}}) + L(\tau_{k,\text{upper}})}{2}\]

where \(L(\tau)\) is the pinball loss at quantile \(\tau\) and \(K\) is the number of coverage rates.

Parameters

Name Type Description Default
aggregation_method list of str or str

Dimensions to collapse when aggregating scores. Orthogonal modes:

  • "stepwise": Collapse the forecasting-step dimension.
  • "vintagewise": Collapse the vintage/observed-time dimension.
  • "componentwise": Collapse components, return per-timestep scores.
  • "groupwise": Collapse panel groups (panel data only).

Coverage rates are always collapsed ("coveragewise" is forced).

  • "all": Collapse all dimensions (returns scalar).
"all"
integration ('mean', 'trapezoidal')

Method for integrating pinball losses across coverage rates:

  • "mean": Simple average across rates.
  • "trapezoidal": Midpoint-rule weights based on quantile spacing, giving more weight to rates that cover wider quantile regions.
"mean"
coverage_rates list of float or None

Coverage rate filter. Must contain at least 2 rates. Dict weights are not supported (use PinballLoss for custom weighting).

None
groups list of str, dict of str to float, or None

Panel group filter (list) or filter with weights (dict).

None
components list of str, dict of str to float, or None

Component filter (list) or filter with weights (dict).

None

Attributes

Name Type Description
lower_is_better bool

True for CRPS (lower is better).

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> from yohou.metrics import ContinuousRankedProbabilityScore
>>> y_true = pl.DataFrame({"time": [datetime(2020, 1, 1), datetime(2020, 1, 2)], "value": [10.0, 20.0]})
>>> y_pred = pl.DataFrame({
...     "vintage_time": [datetime(2019, 12, 31)] * 2,
...     "time": [datetime(2020, 1, 1), datetime(2020, 1, 2)],
...     "value_lower_0.5": [9.0, 19.0],
...     "value_upper_0.5": [11.0, 21.0],
...     "value_lower_0.9": [8.0, 18.0],
...     "value_upper_0.9": [12.0, 22.0],
... })
>>> crps = ContinuousRankedProbabilityScore()
>>> _ = crps.fit(y_true)
>>> crps.score(y_true, y_pred)
0.17...

Notes

  • Lower is better (0 = perfect forecast)
  • Scale-dependent
  • Requires at least 2 coverage rates
  • For per-rate inspection, use PinballLoss instead

See Also

Source Code

Show/Hide source
class ContinuousRankedProbabilityScore(BaseIntervalScorer):
    r"""Continuous Ranked Probability Score (CRPS) for prediction intervals.

    Approximates CRPS by averaging pinball losses across multiple coverage
    rates. Each coverage rate α contributes two quantile levels,
    τ_lower = (1−α)/2 and τ_upper = (1+α)/2. The score averages
    the mean quantile loss per rate across all rates, approximating
    the integral of quantile loss over [0, 1].

    The approximation improves as coverage rates become denser.

    $$\text{CRPS} \approx \frac{1}{K}\sum_{k=1}^{K}
    \frac{L(\tau_{k,\text{lower}}) + L(\tau_{k,\text{upper}})}{2}$$

    where $L(\tau)$ is the pinball loss at quantile $\tau$ and $K$ is the
    number of coverage rates.

    Parameters
    ----------
    aggregation_method : list of str or str, default="all"
        Dimensions to collapse when aggregating scores. Orthogonal modes:

        - "stepwise": Collapse the forecasting-step dimension.
        - "vintagewise": Collapse the vintage/observed-time dimension.
        - "componentwise": Collapse components, return per-timestep scores.
        - "groupwise": Collapse panel groups (panel data only).

        Coverage rates are always collapsed ("coveragewise" is forced).

        - "all": Collapse all dimensions (returns scalar).
    integration : {"mean", "trapezoidal"}, default="mean"
        Method for integrating pinball losses across coverage rates:

        - "mean": Simple average across rates.
        - "trapezoidal": Midpoint-rule weights based on quantile spacing,
          giving more weight to rates that cover wider quantile regions.
    coverage_rates : list of float or None, default=None
        Coverage rate filter. Must contain at least 2 rates.
        Dict weights are not supported (use PinballLoss for custom weighting).
    groups : list of str, dict of str to float, or None, default=None
        Panel group filter (list) or filter with weights (dict).
    components : list of str, dict of str to float, or None, default=None
        Component filter (list) or filter with weights (dict).

    Attributes
    ----------
    lower_is_better : bool
        True for CRPS (lower is better).

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> from yohou.metrics import ContinuousRankedProbabilityScore
    >>> y_true = pl.DataFrame({"time": [datetime(2020, 1, 1), datetime(2020, 1, 2)], "value": [10.0, 20.0]})
    >>> y_pred = pl.DataFrame({
    ...     "vintage_time": [datetime(2019, 12, 31)] * 2,
    ...     "time": [datetime(2020, 1, 1), datetime(2020, 1, 2)],
    ...     "value_lower_0.5": [9.0, 19.0],
    ...     "value_upper_0.5": [11.0, 21.0],
    ...     "value_lower_0.9": [8.0, 18.0],
    ...     "value_upper_0.9": [12.0, 22.0],
    ... })
    >>> crps = ContinuousRankedProbabilityScore()
    >>> _ = crps.fit(y_true)
    >>> crps.score(y_true, y_pred)  # doctest: +ELLIPSIS
    0.17...

    Notes
    -----
    - Lower is better (0 = perfect forecast)
    - Scale-dependent
    - Requires at least 2 coverage rates
    - For per-rate inspection, use PinballLoss instead

    See Also
    --------
    - [`PinballLoss`][yohou.metrics.interval.PinballLoss] : Per-quantile loss with optional per-rate results
    - [`IntervalScore`][yohou.metrics.interval.IntervalScore] : Combined coverage and sharpness metric

    """

    _parameter_constraints: dict = {
        **BaseIntervalScorer._parameter_constraints,
        "integration": [StrOptions({"mean", "trapezoidal"})],
        "coverage_rates": [list, None],
    }

    _metric_name = "crps"

    def __init__(
        self,
        aggregation_method: list[str] | str = "all",
        integration: str = "mean",
        coverage_rates: list[float] | None = None,
        groups: list[str] | dict[str, float] | None = None,
        components: list[str] | dict[str, float] | None = None,
    ) -> None:
        self.integration = integration

        agg_list = aggregation_method
        if aggregation_method == "all":
            agg_list = ["stepwise", "vintagewise", "componentwise", "groupwise", "coveragewise"]

        super().__init__(
            aggregation_method=agg_list,
            coverage_rates=coverage_rates,
            groups=groups,
            components=components,
        )

    def _compute_raw_scores(self, y_truth, y_pred, coverage_rates, target_columns):
        """Compute per-row mean quantile loss values."""
        frames = []
        for rate in coverage_rates:
            tau_lower = (1 - rate) / 2
            tau_upper = (1 + rate) / 2
            rate_data = {}
            for col in target_columns:
                lower_col = f"{col}_lower_{rate}"
                upper_col = f"{col}_upper_{rate}"
                if lower_col in y_pred.columns and upper_col in y_pred.columns:
                    loss_lower = _pinball_loss(tau_lower, y_truth[col], y_pred[lower_col])
                    loss_upper = _pinball_loss(tau_upper, y_truth[col], y_pred[upper_col])
                    loss_lower_series = y_pred.select(
                        loss_lower.alias("loss_lower")  # ty: ignore[unresolved-attribute]
                    )["loss_lower"]
                    loss_upper_series = y_pred.select(
                        loss_upper.alias("loss_upper")  # ty: ignore[unresolved-attribute]
                    )["loss_upper"]
                    rate_data[col] = (loss_lower_series + loss_upper_series) / 2
            frames.append(pl.DataFrame(rate_data).with_columns(pl.lit(rate).alias("coverage_rate")))
        return pl.concat(frames)

    def _collapse_coverage_rates(self, df: pl.DataFrame) -> pl.DataFrame:
        """Collapse coverage rates using the configured integration method."""
        if self.integration == "mean":
            return super()._collapse_coverage_rates(df)

        # Trapezoidal integration
        if "coverage_rate" not in df.columns:
            return df

        if len(df) == 0:
            return df.drop("coverage_rate")

        meta_names = {"forecasting_step", "vintage_time", "time"}
        meta_cols = [c for c in df.columns if c in meta_names]
        val_cols = [c for c in df.columns if c not in meta_names and c != "coverage_rate"]

        n_rates = df["coverage_rate"].n_unique()
        n_rows_per_rate = len(df) // n_rates

        row_idx = list(range(n_rows_per_rate)) * n_rates
        df = df.with_columns(pl.Series("_row_idx", row_idx))
        group_cols = ["_row_idx"] + meta_cols

        rates = df["coverage_rate"].unique().sort().to_list()
        weights = _trapezoidal_weights(rates)

        df = df.with_columns(
            pl
            .col("coverage_rate")
            .replace_strict(
                {r: weights[r] for r in rates},
                default=1.0,
            )
            .alias("_cw")
        )
        result = df.group_by(group_cols, maintain_order=True).agg([
            (pl.col(c) * pl.col("_cw")).sum() / pl.col("_cw").sum() for c in val_cols
        ])

        return result.sort("_row_idx").drop("_row_idx")

    def _aggregate_scores(self, raw_scores, context=None):
        """Aggregate scores, always collapsing coverage rates regardless of aggregation_method."""
        has_coverage_rate = "coverage_rate" in raw_scores.columns
        dims = self._normalize_agg_methods(
            self.aggregation_method,
            include_coveragewise=has_coverage_rate,
        )

        result = raw_scores

        # Always collapse coverage rates for CRPS
        result = self._collapse_coverage_rates(result)

        # Collapse row dimensions (steps and/or vintages)
        result = self._collapse_rows(result, context, dims)

        # Collapse components
        if "componentwise" in dims:
            result = self._collapse_components(result)

        # Collapse panel groups
        if "groupwise" in dims:
            result = self._collapse_groups(result)

        return self._finalize(result, context, dims)

    def score(  # type: ignore
        self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, /, **params
    ) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
        """Compute CRPS.

        Parameters
        ----------
        y_truth : pl.DataFrame
            True values with "time" column.
        y_pred : pl.DataFrame
            Predicted intervals with "{col}_lower_{rate}", "{col}_upper_{rate}" columns.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        float or pl.DataFrame
            CRPS score.

        Raises
        ------
        ValueError
            If fewer than 2 coverage rates are provided.

        """
        rates = self._extract_coverage_rates(y_pred)
        if len(rates) < 2:
            msg = (
                f"ContinuousRankedProbabilityScore requires at least 2 coverage rates, "
                f"but only {len(rates)} provided. "
                f"Use multiple coverage rates to compute CRPS."
            )
            raise ValueError(msg)

        return super().score(y_truth, y_pred, **params)

Methods

score(y_truth, y_pred, /, **params)

Compute CRPS.

Parameters
Name Type Description Default
y_truth DataFrame

True values with "time" column.

required
y_pred DataFrame

Predicted intervals with "{col}lower}", "{colupper" columns.

required
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
float or DataFrame

CRPS score.

Raises
Type Description
ValueError

If fewer than 2 coverage rates are provided.

Source Code
Show/Hide source
def score(  # type: ignore
    self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, /, **params
) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
    """Compute CRPS.

    Parameters
    ----------
    y_truth : pl.DataFrame
        True values with "time" column.
    y_pred : pl.DataFrame
        Predicted intervals with "{col}_lower_{rate}", "{col}_upper_{rate}" columns.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    float or pl.DataFrame
        CRPS score.

    Raises
    ------
    ValueError
        If fewer than 2 coverage rates are provided.

    """
    rates = self._extract_coverage_rates(y_pred)
    if len(rates) < 2:
        msg = (
            f"ContinuousRankedProbabilityScore requires at least 2 coverage rates, "
            f"but only {len(rates)} provided. "
            f"Use multiple coverage rates to compute CRPS."
        )
        raise ValueError(msg)

    return super().score(y_truth, y_pred, **params)