R2Score

yohou.metrics.point.R2Score

Bases: BasePointScorer

R-squared (Coefficient of Determination) metric for point forecasts.

Computes the proportion of variance in the true values that is explained by the predictions. A score of 1.0 indicates perfect prediction, 0.0 indicates performance equivalent to predicting the mean, and negative values indicate worse performance than predicting the mean.

The R² is defined as:

\[R^2 = 1 - \frac{\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}{\sum_{i=1}^{n}(y_i - \bar{y})^2}\]

where \(y_i\) is the actual value, \(\hat{y}_i\) is the predicted value, \(\bar{y}\) is the mean of actual values, and \(n\) is the number of observations.
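As a worked instance of the formula, the sketch below evaluates it in plain Python for the three-point series used in the Examples section (actual values 10, 20, 30 and predictions 12, 18, 31). It only spells out the arithmetic and is not the library's implementation; the variable names are illustrative.

```python
# Worked arithmetic for the R² definition; variable names are illustrative.
y_true = [10.0, 20.0, 30.0]
y_pred = [12.0, 18.0, 31.0]

y_bar = sum(y_true) / len(y_true)  # mean of the actual values: 20.0

# Residual sum of squares (numerator): 4 + 4 + 1 = 9
ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))

# Total sum of squares (denominator): 100 + 0 + 100 = 200
ss_tot = sum((y - y_bar) ** 2 for y in y_true)

r2 = 1.0 - ss_res / ss_tot
print(round(r2, 3))  # 0.955
```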

Parameters

Name Type Description Default
aggregation_method list of str or str

Dimensions to aggregate over. Options:

  • "stepwise": Aggregate across forecasting steps.
  • "vintagewise": Aggregate across vintages (observed times).
  • "componentwise": Aggregate across components; returns a per-timestep DataFrame.
  • "groupwise": Aggregate across panel groups (panel data only).
  • "all": Aggregate across all dimensions (returns a scalar). Same as ["stepwise", "vintagewise", "componentwise", "groupwise"].

"all"
groups list of str, dict of str to float, or None

Panel group filter (list) or filter with weights (dict).

None
components list of str, dict of str to float, or None

Component filter (list) or filter with weights (dict).

None

Attributes

Name Type Description
lower_is_better bool

Always False for R². Higher values indicate better fit.

Examples

>>> import polars as pl
>>> from datetime import datetime
>>> from yohou.metrics import R2Score
>>> y_true = pl.DataFrame({
...     "time": [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)],
...     "value": [10.0, 20.0, 30.0],
... })
>>> y_pred = pl.DataFrame({
...     "vintage_time": [datetime(2019, 12, 31)] * 3,
...     "time": [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)],
...     "value": [12.0, 18.0, 31.0],
... })
>>> r2 = R2Score()
>>> _ = r2.fit(y_true)
>>> r2.score(y_true, y_pred)
0.955

Notes

  • R² = 1.0 means perfect prediction
  • R² = 0.0 means predictions are as good as predicting the mean
  • R² < 0 means predictions are worse than predicting the mean
  • When SS_tot = 0 (constant true values), returns 0.0 by convention
  • Overrides score() because computing the denominator (SS_tot) requires access to the full y_truth column, not just per-row errors
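The cases listed above can be reproduced with a small stand-alone function. This is a sketch in plain Python rather than the library's code: `r2_with_zero_guard` is a hypothetical helper that applies the same convention of returning 0.0 when SS_tot is zero.

```python
def r2_with_zero_guard(y_true, y_pred):
    """Hypothetical helper: R² with the 0.0 convention for constant truth."""
    y_bar = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - y_bar) ** 2 for y in y_true)
    # A constant true series has SS_tot = 0, so the ratio is undefined.
    return 1.0 - ss_res / ss_tot if ss_tot != 0 else 0.0


truth = [10.0, 20.0, 30.0]

perfect = r2_with_zero_guard(truth, truth)                        # 1.0
mean_level = r2_with_zero_guard(truth, [20.0, 20.0, 20.0])        # 0.0
worse = r2_with_zero_guard(truth, [30.0, 20.0, 10.0])             # -3.0
constant = r2_with_zero_guard([5.0, 5.0, 5.0], [4.0, 5.0, 6.0])   # 0.0
```

Under this convention a constant true series scores 0.0 even when the predictions match it exactly, because the guard is applied before the residuals are considered.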

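See Also

  • MeanSquaredError (yohou.metrics.point.MeanSquaredError): Mean Squared Error, the numerator component of R²
  • MeanAbsoluteError (yohou.metrics.point.MeanAbsoluteError): Mean Absolute Error, alternative regression metric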

Source Code

class R2Score(BasePointScorer):
    r"""R-squared (Coefficient of Determination) metric for point forecasts.

    Computes the proportion of variance in the true values that is explained
    by the predictions. A score of 1.0 indicates perfect prediction, 0.0
    indicates performance equivalent to predicting the mean, and negative
    values indicate worse performance than predicting the mean.

    The R² is defined as:

    $$R^2 = 1 - \frac{\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}{\sum_{i=1}^{n}(y_i - \bar{y})^2}$$

    where $y_i$ is the actual value, $\hat{y}_i$ is the predicted value,
    $\bar{y}$ is the mean of actual values, and $n$ is the number of observations.

    Parameters
    ----------
    aggregation_method : list of str or str, default="all"
        Dimensions to aggregate over. Options:
        - "stepwise": Aggregate across forecasting steps.
        - "vintagewise": Aggregate across vintages (observed times).
        - "componentwise": Aggregate across components, return per-timestep DataFrame
        - "groupwise": Aggregate across panel groups (panel data only)
        - "all": Aggregate across all dimensions (returns scalar). Same as
          ["stepwise", "vintagewise", "componentwise", "groupwise"].
    groups : list of str, dict of str to float, or None, default=None
        Panel group filter (list) or filter with weights (dict).
    components : list of str, dict of str to float, or None, default=None
        Component filter (list) or filter with weights (dict).

    Attributes
    ----------
    lower_is_better : bool
        Always False for R². Higher values indicate better fit.

    Examples
    --------
    >>> import polars as pl
    >>> from datetime import datetime
    >>> from yohou.metrics import R2Score
    >>> y_true = pl.DataFrame({
    ...     "time": [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)],
    ...     "value": [10.0, 20.0, 30.0],
    ... })
    >>> y_pred = pl.DataFrame({
    ...     "vintage_time": [datetime(2019, 12, 31)] * 3,
    ...     "time": [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)],
    ...     "value": [12.0, 18.0, 31.0],
    ... })
    >>> r2 = R2Score()
    >>> _ = r2.fit(y_true)
    >>> r2.score(y_true, y_pred)  # doctest: +ELLIPSIS
    0.955

    Notes
    -----
    - R² = 1.0 means perfect prediction
    - R² = 0.0 means predictions are as good as predicting the mean
    - R² < 0 means predictions are worse than predicting the mean
    - When SS_tot = 0 (constant true values), returns 0.0 by convention
    - Overrides ``score()`` because computing the denominator (SS_tot) requires
      access to the full ``y_truth`` column, not just per-row errors

    See Also
    --------
    - [`MeanSquaredError`][yohou.metrics.point.MeanSquaredError] : Mean Squared Error, the numerator component of R²
    - [`MeanAbsoluteError`][yohou.metrics.point.MeanAbsoluteError] : Mean Absolute Error, alternative regression metric

    """

    _metric_name = "r2"

    lower_is_better = False

    def __init__(
        self,
        aggregation_method: list[str] | str = "all",
        groups: list[str] | dict[str, float] | None = None,
        components: list[str] | dict[str, float] | None = None,
    ) -> None:
        super().__init__(
            aggregation_method=aggregation_method,
            groups=groups,
            components=components,
        )

    def _compute_raw_errors(self, y_truth: pl.DataFrame, y_pred: pl.DataFrame) -> pl.DataFrame:
        """Not used directly. R² overrides score()."""
        return (y_truth - y_pred).select(pl.all().pow(2))

    def score(  # type: ignore
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
        /,
        vintage_weight: Callable | pl.DataFrame | dict | None = None,
        **params,
    ) -> float | pl.DataFrame:
        """Compute R-squared score.

        Parameters
        ----------
        y_truth : pl.DataFrame
            True values with "time" column.
        y_pred : pl.DataFrame
            Predicted values with "time" column.
        vintage_weight : callable, pl.DataFrame, dict, or None, default=None
            Per-vintage weights for cross-vintage aggregation.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        float or pl.DataFrame
            R² score. 1.0 for perfect predictions, 0.0 for mean-level predictions.

        Raises
        ------
        TypeError
            If time_weight or step_weight are passed.

        """
        self._reject_weights(**params)
        check_is_fitted(self, ["_is_fitted"])

        y_truth, y_pred, context = validate_scorer_data(
            self,
            y_truth,
            y_pred,
        )

        # Resolve vintage_weight into context
        context = self._resolve_vintage_weight_to_context(context, vintage_weight)

        def _compute_r2(yt_slice: pl.DataFrame, yp_slice: pl.DataFrame) -> pl.DataFrame:
            """Compute per-column R² score."""
            r2_values = {}
            for col in yt_slice.columns:
                truth = yt_slice[col].to_numpy().astype(np.float64)
                pred = yp_slice[col].to_numpy().astype(np.float64)
                ss_res = np.sum((truth - pred) ** 2)
                ss_tot = np.sum((truth - np.mean(truth)) ** 2)
                r2_values[col] = 1.0 - ss_res / ss_tot if ss_tot != 0 else 0.0
            return pl.DataFrame(r2_values).select(yt_slice.columns)

        result = self._map_per_vintage(y_truth, y_pred, context, _compute_r2)
        return self._aggregate_per_vintage_scores(result, context)

    def __sklearn_tags__(self):
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with lower_is_better=False.

        """
        tags = super().__sklearn_tags__()
        if tags.scorer_tags is not None:
            tags.scorer_tags.lower_is_better = False
        return tags

Methods

score(y_truth, y_pred, /, vintage_weight=None, **params)

Compute R-squared score.

Parameters
Name Type Description Default
y_truth DataFrame

True values with "time" column.

required
y_pred DataFrame

Predicted values with "time" column.

required
vintage_weight callable, pl.DataFrame, dict, or None

Per-vintage weights for cross-vintage aggregation.

None
**params dict

Metadata to route to nested estimators.

{}
Returns
Type Description
float or DataFrame

R² score. 1.0 for perfect predictions, 0.0 for mean-level predictions.

Raises
Type Description
TypeError

If time_weight or step_weight are passed.
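The method's source suggests that R² is computed separately for each vintage and that the per-vintage scores are then combined. The sketch below illustrates that idea in plain Python. It does not use the library; the row layout, the `weights` dictionary, and the weighted-mean combination rule are all assumptions made for illustration, since the actual aggregation logic is not shown on this page.

```python
from collections import defaultdict


def r2(y_true, y_pred):
    """Plain R² with the 0.0 convention for a constant true series."""
    y_bar = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - y_bar) ** 2 for y in y_true)
    return 1.0 - ss_res / ss_tot if ss_tot != 0 else 0.0


# Illustrative rows: (vintage label, actual value, predicted value).
rows = [
    ("2019-12-30", 10.0, 10.0),
    ("2019-12-30", 20.0, 20.0),
    ("2019-12-30", 30.0, 25.0),
    ("2019-12-31", 10.0, 12.0),
    ("2019-12-31", 20.0, 18.0),
    ("2019-12-31", 30.0, 31.0),
]

# Collect actuals and predictions separately for each vintage.
truth_by_vintage = defaultdict(list)
pred_by_vintage = defaultdict(list)
for vintage, actual, predicted in rows:
    truth_by_vintage[vintage].append(actual)
    pred_by_vintage[vintage].append(predicted)

# One R² per vintage, each using that vintage's own mean and SS_tot.
per_vintage = {
    v: r2(truth_by_vintage[v], pred_by_vintage[v]) for v in truth_by_vintage
}

# Assumed combination rule: a weighted mean with hypothetical weights.
weights = {"2019-12-30": 1.0, "2019-12-31": 3.0}
combined = sum(weights[v] * per_vintage[v] for v in per_vintage) / sum(weights.values())
```

With these numbers the per-vintage scores are 0.875 and 0.955, and the weighted mean is 0.935. Scoring per vintage matters because SS_tot depends on the mean of the true values inside each slice, so pooling all rows first would generally give a different result.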

Source Code
def score(  # type: ignore
    self,
    y_truth: pl.DataFrame,
    y_pred: pl.DataFrame,
    /,
    vintage_weight: Callable | pl.DataFrame | dict | None = None,
    **params,
) -> float | pl.DataFrame:
    """Compute R-squared score.

    Parameters
    ----------
    y_truth : pl.DataFrame
        True values with "time" column.
    y_pred : pl.DataFrame
        Predicted values with "time" column.
    vintage_weight : callable, pl.DataFrame, dict, or None, default=None
        Per-vintage weights for cross-vintage aggregation.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    float or pl.DataFrame
        R² score. 1.0 for perfect predictions, 0.0 for mean-level predictions.

    Raises
    ------
    TypeError
        If time_weight or step_weight are passed.

    """
    self._reject_weights(**params)
    check_is_fitted(self, ["_is_fitted"])

    y_truth, y_pred, context = validate_scorer_data(
        self,
        y_truth,
        y_pred,
    )

    # Resolve vintage_weight into context
    context = self._resolve_vintage_weight_to_context(context, vintage_weight)

    def _compute_r2(yt_slice: pl.DataFrame, yp_slice: pl.DataFrame) -> pl.DataFrame:
        """Compute per-column R² score."""
        r2_values = {}
        for col in yt_slice.columns:
            truth = yt_slice[col].to_numpy().astype(np.float64)
            pred = yp_slice[col].to_numpy().astype(np.float64)
            ss_res = np.sum((truth - pred) ** 2)
            ss_tot = np.sum((truth - np.mean(truth)) ** 2)
            r2_values[col] = 1.0 - ss_res / ss_tot if ss_tot != 0 else 0.0
        return pl.DataFrame(r2_values).select(yt_slice.columns)

    result = self._map_per_vintage(y_truth, y_pred, context, _compute_r2)
    return self._aggregate_per_vintage_scores(result, context)

__sklearn_tags__()

Get estimator tags.

Returns
Type Description
Tags

Estimator tags with lower_is_better=False.

Source Code
def __sklearn_tags__(self):
    """Get estimator tags.

    Returns
    -------
    Tags
        Estimator tags with lower_is_better=False.

    """
    tags = super().__sklearn_tags__()
    if tags.scorer_tags is not None:
        tags.scorer_tags.lower_is_better = False
    return tags