class BaseScorer(BaseEstimator, metaclass=abc.ABCMeta):
"""Base class for all forecasting metrics.
Defines the interface for scoring forecast quality. All scorers must implement
the `score` method and can optionally override `fit` for metrics
that require training data statistics.
Parameters
----------
groups : list of str, dict of str to float, or None, default=None
Panel group filter (list) or filter with weights (dict). If None,
all panel groups are included with equal weight.
components : list of str, dict of str to float, or None, default=None
Component filter (list) or filter with weights (dict). If None,
all components are included with equal weight.
Notes
-----
The ``aggregation_method`` parameter (on subclasses) controls which
dimensions are collapsed when computing scores. Orthogonal modes:
``"stepwise"``, ``"vintagewise"``, ``"componentwise"``,
``"groupwise"``, ``"coveragewise"`` (interval only), or ``"all"``.
See Also
--------
- [`BasePointScorer`][yohou.metrics.base.BasePointScorer] : Base class for point-prediction metrics.
- [`BaseIntervalScorer`][yohou.metrics.base.BaseIntervalScorer] : Base class for interval-prediction metrics.
- [`BaseConformityScorer`][yohou.metrics.conformity_base.BaseConformityScorer] : Base class for conformity scorers.
"""
# Score orientation: True means a smaller score is a better score. Exposed
# through the ``lower_is_better`` property and copied into the scorer tags.
_lower_is_better: bool = True
# Accepted types per constructor parameter: a list (filter only), a dict
# (filter plus weights) or None.
# NOTE(review): presumably consumed by the ``_fit_context`` parameter
# validation that decorates ``fit`` — confirm.
_parameter_constraints: dict = {
"groups": [list, dict, None],
"components": [list, dict, None],
}
def __init__(
self,
groups: list[str] | dict[str, float] | None = None,
components: list[str] | dict[str, float] | None = None,
):
self.groups = groups
self.components = components
@staticmethod
def _filter_keys(param: list | dict | None) -> list | None:
"""Extract filter list from a polymorphic param."""
if isinstance(param, dict):
return list(param.keys())
return param
@staticmethod
def _weight_dict(param: list | dict | None) -> dict | None:
"""Extract weight dict from a polymorphic param."""
if isinstance(param, dict):
return param
return None
@property
def lower_is_better(self) -> bool:
"""Whether lower scores indicate better performance."""
return self._lower_is_better
def __sklearn_tags__(self) -> Tags:
    """Build the estimator tags for this scorer.

    Returns
    -------
    Tags
        Tags with ``estimator_type="scorer"`` and ``requires_fit=False``,
        whose scorer-specific section records that no calibration is
        required and which score direction is better.
    """
    # ``prediction_type`` is not set here; subclasses fill it in from
    # their own ``__sklearn_tags__``.
    tags = Tags(estimator_type="scorer", requires_fit=False)
    scorer_tags = tags.scorer_tags
    assert scorer_tags is not None
    # Fitting is optional for most scorers, so calibration is off by default.
    scorer_tags.requires_calibration = False
    scorer_tags.lower_is_better = self._lower_is_better
    return tags
@_fit_context(prefer_skip_nested_validation=True)
def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BaseScorer:
    """Fit the scorer on training data.

    Checks ``groups`` and ``components`` against ``y_train``, validates the
    input frame, and records the metadata later scoring relies on.
    Subclasses should override to add type-specific parameter validation.

    Parameters
    ----------
    y_train : pl.DataFrame
        Training target time series with a ``"time"`` column and one or
        more numeric value columns.
    forecaster : BaseForecaster or None, default=None
        Fitted forecaster. When given, ``interval_`` is copied from
        ``forecaster.interval_`` and ``forecaster_horizon_`` from
        ``forecaster.fit_forecasting_horizon_``. When omitted,
        ``interval_`` is inferred from ``y_train`` if it has a ``"time"``
        column and at least two rows (otherwise it is ``None``), and
        ``forecaster_horizon_`` is not assigned.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    self
        The fitted scorer instance.

    Raises
    ------
    ValueError
        If ``groups`` or ``components`` contain names not present in
        ``y_train``.

    Notes
    -----
    ``groups_`` is always derived from the panel structure of ``y_train``
    (``None`` when there are no panel groups), with or without a forecaster.
    """
    # Parameter checks come first, then the structural check of the single
    # input frame (no alignment against predictions is needed here).
    self._validate_parameters(y_train=y_train)
    validate_scorer_data(self, y_true=y_train, y_pred=None, reset=True)

    _, panel_groups = inspect_panel(y_train)
    self.groups_ = list(panel_groups.keys()) if panel_groups else None

    if forecaster is None:
        # No forecaster: fall back to the spacing of the training timestamps.
        inferable = "time" in y_train.columns and len(y_train) >= 2
        self.interval_ = check_interval_consistency(y_train) if inferable else None
    else:
        check_is_fitted(forecaster)
        self.interval_ = forecaster.interval_
        self.forecaster_horizon_ = forecaster.fit_forecasting_horizon_

    self._is_fitted = True
    return self
def _collapse_groups(self, df: pl.DataFrame) -> pl.DataFrame:
    """Average each component across panel groups.

    Columns are grouped by their component name (the part after the first
    ``__``). Each component becomes one output column holding the weighted
    sum of its per-group columns, with weights taken from ``self.groups``
    when it is a dict (default 1.0) and normalised to sum to one.
    Label columns (``coverage_rate``, ``forecasting_step``,
    ``vintage_time``, ``time``) are carried through unchanged. A frame
    without panel groups is returned as-is.

    Raises
    ------
    ValueError
        If the weights of the groups contributing to a component sum to zero.
    """
    reserved = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
    label_cols = [name for name in df.columns if name in reserved]
    score_cols = [name for name in df.columns if name not in reserved]

    _, panel_groups = inspect_panel(df.select(score_cols))
    if len(panel_groups) == 0:
        return df

    # component name -> [(group name, full column name), ...]
    members_by_component: dict[str, list[tuple[str, str]]] = {}
    for group_name, group_cols in panel_groups.items():
        for col in group_cols:
            members_by_component.setdefault(col.split("__", 1)[1], []).append((group_name, col))

    user_weights = self._weight_dict(self.groups)
    weights: dict[str, float] = {
        group_name: (user_weights.get(group_name, 1.0) if user_weights else 1.0) for group_name in panel_groups
    }

    exprs: list[pl.Expr] = [pl.col(name) for name in label_cols]
    for component, members in members_by_component.items():
        total_weight = sum(weights[group_name] for group_name, _ in members)
        if total_weight == 0:
            raise ValueError("Total panel group weight is zero")
        terms = [pl.col(col_name) * (weights[group_name] / total_weight) for group_name, col_name in members]
        exprs.append(pl.sum_horizontal(terms).alias(component))
    return df.select(exprs)
def _collapse_coverage_rates(self, df: pl.DataFrame) -> pl.DataFrame:
"""Collapse coverage_rate dimension via weighted average.
Rows within each coverage rate block are assumed to be in the same
order (one row per timestep). Collapsing averages across rates for
each timestep, preserving row dimension.
"""
if "coverage_rate" not in df.columns:
return df
if len(df) == 0:
return df.drop("coverage_rate")
meta_names = {"forecasting_step", "vintage_time", "time"}
meta_cols = [c for c in df.columns if c in meta_names]
val_cols = [c for c in df.columns if c not in meta_names and c != "coverage_rate"]
n_rates = df["coverage_rate"].n_unique()
n_rows_per_rate = len(df) // n_rates
# Add row index to identify timesteps across rate blocks
row_idx = list(range(n_rows_per_rate)) * n_rates
df = df.with_columns(pl.Series("_row_idx", row_idx))
group_cols = ["_row_idx"] + meta_cols
cw = self._weight_dict(getattr(self, "coverage_rates", None))
if cw is not None:
df = df.with_columns(
pl
.col("coverage_rate")
.replace_strict(
{r: cw.get(r, 1.0) for r in df["coverage_rate"].unique().to_list()},
default=1.0,
)
.alias("_cw")
)
result = df.group_by(group_cols, maintain_order=True).agg([
(pl.col(c) * pl.col("_cw")).sum() / pl.col("_cw").sum() for c in val_cols
])
else:
result = df.group_by(group_cols, maintain_order=True).agg([pl.col(c).mean() for c in val_cols])
return result.sort("_row_idx").drop("_row_idx")
def _collapse_rows(
self,
df: pl.DataFrame,
context: ScoringContext | None,
dims: set[str],
) -> pl.DataFrame:
"""Collapse row dimensions (stepwise and/or vintagewise) using mean."""
return self._collapse_rows_with(df, context, dims, agg_fn="mean")
def _collapse_rows_with(
self,
df: pl.DataFrame,
context: ScoringContext | None,
dims: set[str],
agg_fn: str = "mean",
) -> pl.DataFrame:
"""Collapse row dimensions using the specified aggregation function.
Parameters
----------
df : pl.DataFrame
DataFrame with per-row scores.
context : ScoringContext or None
Scoring context with time values and metadata.
dims : set of str
Aggregation dimensions.
agg_fn : str, default="mean"
Polars aggregation function name ("mean", "sum", "max").
"""
collapse_steps = "stepwise" in dims
collapse_vintages = "vintagewise" in dims
if not collapse_steps and not collapse_vintages:
return df
meta_names = {"coverage_rate"}
meta_cols = [c for c in df.columns if c in meta_names]
val_cols = [c for c in df.columns if c not in meta_names]
def _agg_exprs(cols: list[str]) -> list[pl.Expr]:
"""Build aggregation expressions for the given columns."""
return [getattr(pl.col(c), agg_fn)() for c in cols]
if collapse_steps and collapse_vintages:
# Per-vintage-first: group by vintage_time within each vintage,
# then return per-vintage rows (vintage collapse happens later).
vintage_time = getattr(context, "vintage_time", None) if context is not None else None
if vintage_time is not None and vintage_time.n_unique() > 1:
vt_list = vintage_time.to_list()
if "coverage_rate" in df.columns:
n_rates = df["coverage_rate"].n_unique()
vt_list = vt_list * n_rates
group_cols = ["vintage_time"] + meta_cols
return (
df
.with_columns(pl.Series("vintage_time", vt_list))
.group_by(group_cols, maintain_order=True)
.agg(_agg_exprs(val_cols))
.sort("vintage_time")
)
if meta_cols:
return df.group_by(meta_cols, maintain_order=True).agg(_agg_exprs(val_cols))
if agg_fn == "mean":
return df.select(pl.all().mean())
return df.select(_agg_exprs(val_cols))
# Partial collapse: keep one dimension, collapse the other
keep_dim = "forecasting_step" if collapse_vintages else "vintage_time"
dim_values = getattr(context, keep_dim, None) if context is not None else None
if dim_values is None:
if meta_cols:
return df.group_by(meta_cols, maintain_order=True).agg(_agg_exprs(val_cols))
if agg_fn == "mean":
return df.select(pl.all().mean())
return df.select(_agg_exprs(val_cols))
# Tile context values for interval data (coverage_rate repeats rows)
dim_list = dim_values.to_list()
if "coverage_rate" in df.columns:
n_rates = df["coverage_rate"].n_unique()
dim_list = dim_list * n_rates
group_cols = [keep_dim] + meta_cols
return (
df
.with_columns(pl.Series(keep_dim, dim_list))
.group_by(group_cols, maintain_order=True)
.agg(_agg_exprs(val_cols))
.sort(keep_dim)
)
def _collapse_vintage_dimension(
self,
df: pl.DataFrame,
context: ScoringContext | None,
dims: set[str],
) -> pl.DataFrame:
"""Collapse vintage_time rows via (weighted) mean.
Internal method, not an override point. Fires after _transform_scores
to produce cross-vintage aggregation. No-op when vintage_time column
is absent or vintagewise not in dims.
"""
if "vintage_time" not in df.columns or "vintagewise" not in dims:
return df
meta_names = {"coverage_rate", "forecasting_step", "time"}
meta_cols = [c for c in df.columns if c in meta_names]
val_cols = [c for c in df.columns if c not in meta_names and c != "vintage_time"]
vintage_weight = context.vintage_weight if context is not None else None
if vintage_weight is not None:
# Weighted mean across vintages
# Attach weight per vintage row
unique_vintages = df["vintage_time"].unique(maintain_order=True).to_list()
if len(vintage_weight) != len(unique_vintages):
raise ValueError(
f"vintage_weight length ({len(vintage_weight)}) does not match "
f"number of unique vintages ({len(unique_vintages)})"
)
weight_map = dict(zip(unique_vintages, vintage_weight.tolist(), strict=True))
df = df.with_columns(pl.col("vintage_time").replace_strict(weight_map, default=1.0).alias("_vw"))
if meta_cols:
result = df.group_by(meta_cols, maintain_order=True).agg([
(pl.col(c) * pl.col("_vw")).sum() / pl.col("_vw").sum() for c in val_cols
])
else:
total = df["_vw"].sum()
result = df.select([(pl.col(c) * pl.col("_vw")).sum() / total for c in val_cols])
# Unweighted mean across vintages
elif meta_cols:
result = df.group_by(meta_cols, maintain_order=True).agg([pl.col(c).mean() for c in val_cols])
else:
result = df.select([pl.col(c).mean() for c in val_cols])
return result
def _collapse_components(self, df: pl.DataFrame) -> pl.DataFrame:
    """Collapse component columns into a single score via weighted average.

    Label columns (``coverage_rate``, ``forecasting_step``,
    ``vintage_time``, ``time``) are preserved. For panel data (columns
    with a ``{group}__`` prefix), each group is collapsed separately into
    ``{group}__score``; otherwise all value columns are collapsed into one
    ``score`` column. Weights come from ``self.components`` when it is a
    dict (default 1.0 per component) and are normalised to sum to one;
    without a dict a plain mean is used. A frame with no value columns is
    returned unchanged.

    Raises
    ------
    ValueError
        If the component weights that apply to a set of columns sum to
        zero.
    """
    meta_names = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
    meta_cols = [c for c in df.columns if c in meta_names]
    value_df = df.select([c for c in df.columns if c not in meta_names])
    if len(value_df.columns) == 0:
        return df

    _, panel_groups = inspect_panel(value_df)
    cw = self._weight_dict(self.components)

    def _average(cols: list[str], component_names: list[str]) -> pl.Expr:
        """Build the (weighted) row-wise mean of ``cols``."""
        if cw is None:
            return pl.sum_horizontal([pl.col(c) for c in cols]) / len(cols)
        weights = [cw.get(name, 1.0) for name in component_names]
        total = sum(weights)
        if total == 0:
            # Fail with an explicit message instead of a bare
            # ZeroDivisionError, as _collapse_groups does for group weights.
            raise ValueError("Total component weight is zero")
        return pl.sum_horizontal([pl.col(c) * (w / total) for c, w in zip(cols, weights, strict=True)])

    keep = [pl.col(c) for c in meta_cols]
    if len(panel_groups) > 0:
        new_cols = []
        for group_name, group_cols in panel_groups.items():
            # Weights are keyed by the unprefixed component name.
            unprefixed = [c.split("__", 1)[1] for c in group_cols]
            new_cols.append(_average(group_cols, unprefixed).alias(f"{group_name}__score"))
        return df.select(keep + new_cols)

    val_cols = value_df.columns
    return df.select(keep + [_average(val_cols, val_cols).alias("score")])
def _finalize(
    self,
    result: pl.DataFrame,
    context: ScoringContext | None,
    dims: set[str],
) -> float | pl.DataFrame:
    """Attach remaining row labels and convert 1x1 results to scalar.

    Parameters
    ----------
    result : pl.DataFrame
        Scores after all collapse steps.
    context : ScoringContext or None
        Source of ``time_values`` and ``vintage_time`` row labels.
    dims : set of str
        Aggregation dimensions that were requested.

    Returns
    -------
    float or pl.DataFrame
        A float when ``"stepwise"``, ``"vintagewise"`` and
        ``"componentwise"`` are all in ``dims`` and the result is a single
        row with at most one value column and no labels (``nan`` if there
        is no value column at all). Otherwise the labelled DataFrame.
    """
    label_names = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
    rows_collapsed = "stepwise" in dims and "vintagewise" in dims

    # A "time" label is only added while rows are still present and no
    # other row label identifies them yet.
    already_labelled = any(name in result.columns for name in ("forecasting_step", "vintage_time", "time"))
    if not rows_collapsed and not already_labelled and context is not None and context.time_values is not None:
        times = context.time_values
        if "coverage_rate" in result.columns:
            # Interval results repeat the rows once per coverage rate.
            rate_count = result["coverage_rate"].n_unique()
            if len(times) * rate_count == len(result):
                result = result.with_columns(pl.Series("time", times * rate_count).cast(pl.Datetime))
                # Move "time" to the front.
                result = result.select(["time"] + [c for c in result.columns if c != "time"])
        elif len(times) == len(result):
            time_frame = pl.DataFrame({"time": times}).cast({"time": pl.Datetime})
            result = pl.concat([time_frame, result], how="horizontal")

    # Same idea for vintage_time, placed right after "time" when present.
    if (
        not rows_collapsed
        and "vintage_time" not in result.columns
        and context is not None
        and context.vintage_time is not None
    ):
        vintages = context.vintage_time.to_list()
        if "coverage_rate" in result.columns:
            rate_count = result["coverage_rate"].n_unique()
            if len(vintages) * rate_count == len(result):
                vintages = vintages * rate_count
        if len(vintages) == len(result):
            position = result.columns.index("time") + 1 if "time" in result.columns else 0
            vintage_series = pl.Series("vintage_time", vintages).cast(pl.Datetime)
            ordered = list(result.columns)
            ordered.insert(position, "vintage_time")
            result = result.with_columns(vintage_series).select(ordered)

    # Labels are evaluated only now, after any time / vintage_time column
    # has been attached.
    labels = [c for c in result.columns if c in label_names]
    scores = [c for c in result.columns if c not in label_names]

    # Scalar output requires steps, vintages and components to be
    # collapsed. "groupwise" is not checked explicitly: uncollapsed groups
    # leave more than one value column, which already rules out a scalar.
    fully_collapsed = rows_collapsed and "componentwise" in dims

    # A lone single-row coverage_rate carries no information at this point.
    if fully_collapsed and labels == ["coverage_rate"] and len(result) == 1:
        labels = []
        result = result.drop("coverage_rate")
        scores = [c for c in result.columns if c not in label_names]

    is_scalar = len(result) == 1 and len(scores) <= 1 and not labels and fully_collapsed
    if not is_scalar:
        return result
    if len(scores) == 0:
        return float("nan")
    return float(result[scores[0]][0])
def _pre_filter_zero_weights(
    self,
    y_truth: pl.DataFrame,
    y_pred: pl.DataFrame,
    context: ScoringContext,
    time_weight: Callable | pl.DataFrame | dict | None = None,
    step_weight: Callable | pl.DataFrame | dict | None = None,
    vintage_weight: Callable | pl.DataFrame | dict | None = None,
) -> tuple[
    pl.DataFrame,
    pl.DataFrame,
    ScoringContext,
    np.ndarray | dict[str, np.ndarray] | None,
    np.ndarray | dict[str, np.ndarray] | None,
    np.ndarray | dict[str, np.ndarray] | None,
]:
    """Resolve weights and pre-filter rows with zero weight.

    For group-uniform sources (dict, 1-param callable, DataFrame
    without group columns), zeros are combined into a mask and matching
    rows are removed from ``y_truth``, ``y_pred``, and ``context``.
    For panel-aware sources (2-param callable, DataFrame with a
    ``{group}_weight`` column), weights are resolved per-group into a
    ``dict[str, np.ndarray]`` but NOT used for row pre-filtering.

    Parameters
    ----------
    y_truth, y_pred : pl.DataFrame
        Row-aligned truth and prediction frames.
    context : ScoringContext
        Row-aligned ``time_values``, ``forecasting_step`` and
        ``vintage_time``; any of them may be ``None``.
    time_weight, step_weight, vintage_weight : callable, pl.DataFrame, dict or None
        Weight sources. ``step_weight`` / ``vintage_weight`` are silently
        ignored when the context lacks the matching key series.

    Returns
    -------
    tuple
        ``(y_truth, y_pred, context, time_weights, step_weights,
        vintage_weights)``: the filtered data plus the pre-resolved
        weights, so they are not resolved twice. Resolved vintage weights
        are moved onto ``context.vintage_weight`` and returned as ``None``
        because they are applied at the cross-vintage level.

    Raises
    ------
    ValueError
        If ``time_weight`` is given but the context has no time values,
        or if every row ends up with zero weight.
    """
    _, panel_groups = inspect_panel(y_truth)
    has_panel = len(panel_groups) > 0
    zero_mask = np.zeros(len(y_truth), dtype=bool)

    def _is_panel_aware(w: Callable | pl.DataFrame | dict) -> bool:
        """Tell whether ``w`` provides different weights per panel group."""
        if not has_panel:
            return False
        if callable(w) and not isinstance(w, dict):
            return validate_callable_signature(w) == 2
        if isinstance(w, pl.DataFrame):
            return any(f"{g}_weight" in w.columns for g in panel_groups)
        return False

    def _resolve_one(
        w: Callable | pl.DataFrame | dict | None,
        key_series: pl.Series,
        join_column: str,
    ) -> np.ndarray | dict[str, np.ndarray] | None:
        """Resolve a single weight argument into aligned arrays."""
        if w is None:  # pragma: no cover
            return None
        if _is_panel_aware(w):
            # Per-group arrays; these never take part in row pre-filtering.
            return {g: resolve_weight_to_array(w, key_series, join_column, g) for g in panel_groups}
        # Group-uniform: resolve once, using the first group as stand-in.
        group_name = next(iter(panel_groups)) if has_panel else None
        return resolve_weight_to_array(w, key_series, join_column, group_name)

    def _take_rows(
        resolved: np.ndarray | dict[str, np.ndarray] | None,
        mask: np.ndarray,
    ) -> np.ndarray | dict[str, np.ndarray] | None:
        """Keep only the masked rows of a resolved weight."""
        if isinstance(resolved, np.ndarray):
            return resolved[mask]  # ty: ignore[invalid-argument-type]
        if isinstance(resolved, dict):
            return {g: a[mask] for g, a in resolved.items()}
        return resolved

    # Key series for each weight type
    time_series = pl.Series("time", context.time_values) if context.time_values is not None else None
    step_series = context.forecasting_step
    vintage_series = context.vintage_time

    tw_resolved = None
    if time_weight is not None:
        if time_series is None:  # pragma: no cover
            raise ValueError("time_values unavailable in context but time_weight was provided")
        tw_resolved = _resolve_one(time_weight, time_series, "time")
        if isinstance(tw_resolved, np.ndarray):
            zero_mask |= tw_resolved == 0.0

    sw_resolved = None
    if step_weight is not None and step_series is not None:
        sw_resolved = _resolve_one(step_weight, step_series, "forecasting_step")
        if isinstance(sw_resolved, np.ndarray):
            zero_mask |= sw_resolved == 0.0

    vw_resolved = None
    if vintage_weight is not None and vintage_series is not None:
        vw_resolved = _resolve_one(vintage_weight, vintage_series, "vintage_time")
        if isinstance(vw_resolved, np.ndarray):
            zero_mask |= vw_resolved == 0.0

    if np.any(zero_mask):
        keep = ~zero_mask
        if not np.any(keep):
            raise ValueError(
                "All rows have zero weight after pre-filtering. "
                "Check that weight dicts/callables assign non-zero weights to at least some data."
            )
        from yohou.metrics._context import ScoringContext as _ScoringContext  # noqa: PLC0415

        y_truth = y_truth.filter(keep)
        y_pred = y_pred.filter(keep)

        # time_values may legitimately be None (step/vintage weights do not
        # need it), so only filter it when present.
        time_values = context.time_values
        if time_values is not None:
            time_values = [t for t, m in zip(time_values, keep.tolist(), strict=True) if m]
        context = _ScoringContext(
            time_values=time_values,
            vintage_time=(context.vintage_time.filter(keep) if context.vintage_time is not None else None),
            forecasting_step=(
                context.forecasting_step.filter(keep) if context.forecasting_step is not None else None
            ),
        )
        # Keep the resolved weights aligned with the filtered rows.
        tw_resolved = _take_rows(tw_resolved, keep)
        sw_resolved = _take_rows(sw_resolved, keep)
        vw_resolved = _take_rows(vw_resolved, keep)

    # Vintage weights are applied at the cross-vintage level: store one
    # weight per unique vintage on the context and stop passing the
    # row-level array on.
    if context.vintage_time is not None:
        if isinstance(vw_resolved, dict):
            # Panel-aware vintage_weight: all groups share the same
            # vintage_time axis, so the first group's weights are used.
            first_group_weights = next(iter(vw_resolved.values()))
            context = self._set_vintage_weight_on_context(context, first_group_weights)  # ty: ignore[invalid-argument-type]
            vw_resolved = None
        elif isinstance(vw_resolved, np.ndarray):
            context = self._set_vintage_weight_on_context(context, vw_resolved)
            vw_resolved = None

    return y_truth, y_pred, context, tw_resolved, sw_resolved, vw_resolved
@staticmethod
def _set_vintage_weight_on_context(
    context: ScoringContext,
    vw_array: np.ndarray,
) -> ScoringContext:
    """Return a copy of ``context`` carrying one weight per unique vintage.

    ``vw_array`` is row-aligned with ``context.vintage_time``. For every
    distinct vintage the weight of its first row is kept, in order of
    first appearance, and stored as ``vintage_weight`` on a new context
    that otherwise repeats the time, vintage and step values of the input.
    """
    paired = pl.DataFrame({
        "vintage_time": context.vintage_time,
        "_vw": vw_array,
    })
    first_per_vintage = paired.group_by("vintage_time", maintain_order=True).agg(pl.col("_vw").first())
    weights = first_per_vintage["_vw"].to_numpy()
    return ScoringContext(
        time_values=context.time_values,
        vintage_time=context.vintage_time,
        forecasting_step=context.forecasting_step,
        vintage_weight=weights,
    )
def _resolve_vintage_weight_to_context(
self,
context: ScoringContext,
vintage_weight: Callable | pl.DataFrame | dict | None,
) -> ScoringContext:
"""Resolve a vintage_weight argument into context.vintage_weight.
Lightweight alternative to ``_pre_filter_zero_weights`` for scorers
that only need vintage_weight resolution (no time/step weights).
"""
if vintage_weight is None:
return context
if context.vintage_time is None:
return context
vw_resolved = resolve_weight_to_array(
vintage_weight,
context.vintage_time,
"vintage_time",
)
if isinstance(vw_resolved, dict):
raise TypeError(
"Panel-aware (dict) vintage_weight is not supported. "
"Use a flat callable, DataFrame, or dict keyed on vintage_time values."
)
return self._set_vintage_weight_on_context(context, vw_resolved)
def _apply_weights(
    self,
    scores: pl.DataFrame,
    time_weight_resolved: np.ndarray | dict[str, np.ndarray] | None,
    step_weight_resolved: np.ndarray | dict[str, np.ndarray] | None,
    n_rates: int = 1,
) -> pl.DataFrame:
    """Multiply the score columns by pre-resolved, normalised weights.

    The time weights are applied first and the step weights second; each
    is passed through ``normalize_weights`` on its own. An array weight
    scales every column except ``coverage_rate``; a per-group dict scales
    only the columns of the matching panel group. With ``n_rates > 1`` the
    weights are tiled so they line up with rows repeated per coverage rate.

    Vintage weights are not applied at row level; they are handled
    during cross-vintage aggregation in ``_collapse_vintage_dimension``.
    """
    _, panel_groups = inspect_panel(scores)
    value_cols = [c for c in scores.columns if c != "coverage_rate"]

    def _prepared(raw: np.ndarray) -> np.ndarray:
        """Normalise a weight array and repeat it once per coverage rate."""
        normed = normalize_weights(raw)  # ty: ignore[invalid-argument-type]
        return np.tile(normed, n_rates) if n_rates > 1 else normed

    def _scale(df: pl.DataFrame, factor: np.ndarray, cols: list[str]) -> pl.DataFrame:
        """Multiply ``cols`` row-wise by ``factor``."""
        return df.with_columns([(pl.col(c) * factor).alias(c) for c in cols])

    def _apply(df: pl.DataFrame, resolved: np.ndarray | dict[str, np.ndarray]) -> pl.DataFrame:
        """Apply one weight, given either as an array or per panel group."""
        if not isinstance(resolved, dict):
            return _scale(df, _prepared(resolved), value_cols)
        for group_name, raw in resolved.items():
            factor = _prepared(raw)
            targets = [c for c in panel_groups.get(group_name, []) if c in value_cols]  # ty: ignore[no-matching-overload]
            if targets:
                df = _scale(df, factor, targets)
        return df

    for resolved in (time_weight_resolved, step_weight_resolved):
        if resolved is not None:
            scores = _apply(scores, resolved)
    return scores
def _validate_parameters(
self,
y_train: pl.DataFrame | None = None,
aggregation_method: list[str] | str | None = None,
valid_aggregation_methods: set[str] | None = None,
) -> None:
"""Validate scorer parameters.
Parameters
----------
y_train : pl.DataFrame or None
Training data to validate against. If None, only type validation is performed.
aggregation_method : list of str or str or None
Aggregation method to validate. If None, aggregation validation is skipped.
valid_aggregation_methods : set of str or None
Set of valid aggregation method strings. Required if aggregation_method is provided.
Raises
------
ValueError
If validation fails.
"""
# Validate aggregation_method if provided
if aggregation_method is not None:
if valid_aggregation_methods is None:
raise ValueError("valid_aggregation_methods must be provided when validating aggregation_method")
# Handle single string
if isinstance(aggregation_method, str):
# "all" is a special value that means aggregate across all dimensions
if aggregation_method != "all" and aggregation_method not in valid_aggregation_methods:
raise ValueError(
f"Invalid aggregation_method '{aggregation_method}'. "
f"Valid options are: 'all' or {sorted(valid_aggregation_methods)}"
)
# Handle list
elif isinstance(aggregation_method, list):
# Check all elements are strings
if not all(isinstance(method, str) for method in aggregation_method):
raise ValueError(f"All elements in aggregation_method must be strings, got: {aggregation_method}")
if len(aggregation_method) == 0:
raise ValueError(
f"aggregation_method list cannot be empty. "
f"Use 'all' or provide at least one method: {sorted(valid_aggregation_methods)}"
)
for method in aggregation_method:
if method not in valid_aggregation_methods:
raise ValueError(
f"Invalid aggregation_method '{method}' in list. "
f"Valid list elements are: {sorted(valid_aggregation_methods)}"
)
else:
raise ValueError(
f"aggregation_method must be a string or list of strings, got {type(aggregation_method)}"
)
# Validate groups type (list or dict)
group_filter = self._filter_keys(self.groups)
if group_filter is not None:
if not all(isinstance(name, str) for name in group_filter):
raise ValueError("All group names must be strings")
if len(group_filter) == 0:
raise ValueError("groups cannot be empty")
# Validate components type (list or dict)
comp_filter = self._filter_keys(self.components)
if comp_filter is not None:
if not all(isinstance(name, str) for name in comp_filter):
raise ValueError("All component names must be strings")
if len(comp_filter) == 0:
raise ValueError("components cannot be empty")
# If y_train is provided, validate against actual data
if y_train is not None:
_, panel_groups = inspect_panel(y_train)
available_groups = set(panel_groups.keys())
# Validate groups exist in data
if group_filter is not None:
if len(available_groups) == 0:
# No panel data, but user specified groups
raise ValueError(
f"groups specified but data contains no panel groups. "
f"Data has only global columns: {sorted(set(y_train.columns) - {'time'})}"
)
requested_groups = set(group_filter)
missing_groups = requested_groups - available_groups
if missing_groups:
raise ValueError(
f"Requested groups {sorted(missing_groups)} not found in data. "
f"Available groups: {sorted(available_groups)}"
)
# Validate components exist in data
if comp_filter is not None:
if len(panel_groups) > 0:
# Panel data: check unprefixed column names
available_components = set()
for group_cols in panel_groups.values():
for col in group_cols:
# Extract unprefixed column name
available_components.add(col.split("__", 1)[1])
else:
# Global data: check column names directly
available_components = set(y_train.columns) - {"time"}
requested_components = set(comp_filter)
missing_components = requested_components - available_components
if missing_components:
raise ValueError(
f"Requested components {sorted(missing_components)} "
f"not found in data. Available components: {sorted(available_components)}"
)
@staticmethod
def _normalize_agg_methods(
aggregation_method: list[str] | str,
include_coveragewise: bool = False,
) -> set[str]:
"""Normalize aggregation_method to a set of orthogonal mode names.
Expands ``"all"`` to the full set of modes.
Parameters
----------
aggregation_method : list of str or str
Raw aggregation_method value from the scorer.
include_coveragewise : bool, default=False
Whether to include ``"coveragewise"`` in the ``"all"`` expansion
(interval scorers only).
Returns
-------
set of str
Normalized set of aggregation mode names.
"""
if aggregation_method == "all":
modes = {"stepwise", "vintagewise", "componentwise", "groupwise"}
if include_coveragewise:
modes.add("coveragewise")
return modes
modes = {aggregation_method} if isinstance(aggregation_method, str) else set(aggregation_method)
return modes
def _transform_scores(self, df: pl.DataFrame) -> pl.DataFrame:
"""Apply element-wise transform to aggregated value columns.
Override in subclasses needing post-aggregation transforms
(e.g. sqrt for RMSE). Called inside ``_aggregate_per_vintage_scores``
after component/group collapse, before vintage collapse.
Receives only value columns (no meta columns).
Parameters
----------
df : pl.DataFrame
DataFrame with value columns only.
Returns
-------
pl.DataFrame
Transformed DataFrame.
"""
return df
@staticmethod
def _reject_weights(**params: object) -> None:
"""Raise if any weight kwargs are passed to a scorer that doesn't support them."""
weight_keys = [
k for k in params if k in {"time_weight", "step_weight", "vintage_weight"} and params[k] is not None
]
if weight_keys:
raise TypeError(
f"This scorer does not support sample weights, "
f"but received: {', '.join(sorted(weight_keys))}. "
f"Remove the weight arguments or use a scorer that supports weighting."
)
def _aggregate_per_vintage_scores(
    self,
    result: pl.DataFrame,
    context: ScoringContext | None,
) -> float | pl.DataFrame:
    """Run the aggregation tail shared by every scorer family.

    Expects a per-vintage frame: one row per vintage carrying a
    ``vintage_time`` column for multi-vintage input, or a single row
    without that column for single-vintage input. Steps, in order:

    components → groups → strip meta → ``_transform_scores`` → reattach
    meta → ``_collapse_vintage_dimension`` → ``_finalize`` → rename.

    Parameters
    ----------
    result : pl.DataFrame
        Per-vintage scores.
    context : ScoringContext or None
        Scoring context; only ``vintage_time`` is read here directly,
        the rest is forwarded to the helpers.

    Returns
    -------
    float or pl.DataFrame
        Whatever ``_finalize`` produces; DataFrame outputs additionally
        have their score columns renamed to the metric name.

    Raises
    ------
    ValueError
        If both ``"stepwise"`` and ``"vintagewise"`` are requested, the
        context holds more than one vintage, and ``result`` has no
        ``vintage_time`` column.
    """
    modes = self._normalize_agg_methods(self.aggregation_method)  # ty: ignore[unresolved-attribute]

    # Input validation. It is limited to the stepwise + vintagewise
    # combination: per the row-collapse contract, that is the case where
    # vintage_time is kept for the later vintage collapse. With
    # vintagewise alone the rows are already collapsed across vintages,
    # so the column is legitimately absent.
    per_vintage_first = "stepwise" in modes and "vintagewise" in modes
    if per_vintage_first and context is not None:
        vintages = context.vintage_time
        if vintages is not None and vintages.n_unique() > 1 and "vintage_time" not in result.columns:
            raise ValueError(
                "Context has multiple vintages but input DataFrame is missing "
                "'vintage_time' column. Attach vintage_time before calling "
                "_aggregate_per_vintage_scores."
            )

    if "componentwise" in modes:
        result = self._collapse_components(result)
    if "groupwise" in modes:
        result = self._collapse_groups(result)

    # Split columns so that the transform hook only ever sees values.
    reserved = ("coverage_rate", "forecasting_step", "vintage_time", "time")
    meta_cols: list[str] = []
    value_cols: list[str] = []
    for name in result.columns:
        (meta_cols if name in reserved else value_cols).append(name)

    if value_cols:
        values = self._transform_scores(result.select(value_cols))
        if meta_cols:
            # Meta columns go back in front of the transformed values.
            result = pl.concat([result.select(meta_cols), values], how="horizontal")
        else:
            result = values

    # Mean across vintages (weighted or not is decided by the helper).
    result = self._collapse_vintage_dimension(result, context, modes)

    # Attach labels / convert to scalar, then apply the metric name.
    scored = self._finalize(result, context, modes)
    if not isinstance(scored, pl.DataFrame):
        return scored
    return self._rename_metric_columns(scored)
def _map_per_vintage(
self,
y_truth: pl.DataFrame,
y_pred: pl.DataFrame,
context: ScoringContext | None,
compute_fn: Callable[[pl.DataFrame, pl.DataFrame], pl.DataFrame | None],
) -> pl.DataFrame:
"""Group data by vintage and apply compute_fn per group.
Utility for Pattern 2 scorers (whole-column computation).
Returns a per-vintage DataFrame with ``vintage_time`` column
for multi-vintage, or a single-row DataFrame (no vintage_time)
for single-vintage.
Parameters
----------
y_truth : pl.DataFrame
Ground truth (time column removed).
y_pred : pl.DataFrame
Predictions (time column removed).
context : ScoringContext or None
Scoring context.
compute_fn : callable
``(y_truth_slice, y_pred_slice) -> pl.DataFrame | None``.
Returns single-row DataFrame per vintage or None to skip.
Returns
-------
pl.DataFrame
Per-vintage results concatenated.
Raises
------
ValueError
If all vintages are skipped.
"""
vintage_time = context.vintage_time if context is not None else None
# Single-vintage fallthrough
if vintage_time is None or vintage_time.n_unique() <= 1:
result = compute_fn(y_truth, y_pred)
if result is None:
raise ValueError("All vintage groups were skipped. No valid data to compute the metric.")
return result
# Multi-vintage: group by vintage
vt_values = vintage_time.to_list()
unique_vintages = vintage_time.unique(maintain_order=True).to_list()
results: list[pl.DataFrame] = []
for vt in unique_vintages:
mask = [v == vt for v in vt_values]
yt_slice = y_truth.filter(mask)
yp_slice = y_pred.filter(mask)
row = compute_fn(yt_slice, yp_slice)
if row is None:
continue
row = row.with_columns(pl.lit(vt).alias("vintage_time").cast(pl.Datetime))
results.append(row)
if not results:
raise ValueError("All vintage groups were skipped. No valid data to compute the metric.")
return pl.concat(results, how="diagonal_relaxed")
def _aggregate_scores(
    self,
    raw_scores: pl.DataFrame,
    context: ScoringContext | None = None,
) -> float | pl.DataFrame:
    """Reduce raw scores according to ``aggregation_method``.

    Stages, in order: coverage rates → rows (steps and/or vintages) →
    ``_aggregate_per_vintage_scores`` for the remaining steps.

    Parameters
    ----------
    raw_scores : pl.DataFrame
        Scores with one row per timestep and one column per component.
        Interval scorers may add a ``"coverage_rate"`` column; its
        presence is what enables ``"coveragewise"`` in the ``"all"``
        expansion.
    context : ScoringContext or None, default=None
        Scoring context with time values and metadata.

    Returns
    -------
    float or pl.DataFrame
        Aggregated scores based on aggregation_method.
    """
    coverage_present = "coverage_rate" in raw_scores.columns
    modes = self._normalize_agg_methods(
        self.aggregation_method,  # ty: ignore[unresolved-attribute]
        include_coveragewise=coverage_present,
    )

    # Stage 1 (interval scorers only): fold the coverage-rate dimension.
    collapsed = self._collapse_coverage_rates(raw_scores) if "coveragewise" in modes else raw_scores

    # Stage 2: fold the row dimensions (steps and/or vintages).
    collapsed = self._collapse_rows(collapsed, context, modes)

    # Stage 3: everything else lives in the shared tail.
    return self._aggregate_per_vintage_scores(collapsed, context)
@abc.abstractmethod
def score(
    self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, /, **params
) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
    """Evaluate ``y_pred`` against ``y_truth`` and return the metric.

    Abstract: every concrete scorer supplies its own implementation.
    ``y_truth`` and ``y_pred`` are positional-only.

    Parameters
    ----------
    y_truth : pl.DataFrame
        Reference series. Needs a ``"time"`` column plus at least one
        numeric value column.
    y_pred : pl.DataFrame
        Forecasts under evaluation. Needs ``"vintage_time"`` and
        ``"time"`` columns, and value columns matching ``y_truth``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame or float or dict
        The aggregated result: a ``float`` for
        ``aggregation_method="all"``, a ``pl.DataFrame`` for partial
        aggregations, or, for interval scorers, a ``dict`` keyed by
        coverage rate.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If calibration is required and the scorer has not been fitted.
    ValueError
        If the columns of ``y_truth`` and ``y_pred`` do not match or
        their shapes are incompatible.
    """
def _rename_metric_columns(self, result: pl.DataFrame) -> pl.DataFrame:
    """Rename aggregation output columns to use the metric name.

    A column named exactly ``"score"`` becomes ``_metric_name``; a column
    ending in ``"__score"`` has that trailing suffix (and only the
    suffix) replaced with ``"__<_metric_name>"``. All other columns are
    left alone. When the scorer defines no ``_metric_name`` attribute,
    ``"score"`` is used, which makes the renaming a no-op.

    Parameters
    ----------
    result : pl.DataFrame
        DataFrame from ``_aggregate_scores`` that may contain
        ``"score"`` or ``"*__score"`` columns.

    Returns
    -------
    pl.DataFrame
        DataFrame with columns renamed to use the metric name.
    """
    metric_name: str = getattr(self, "_metric_name", "score")
    suffix = "__score"
    rename_map: dict[str, str] = {}
    for col in result.columns:
        if col == "score":
            rename_map[col] = metric_name
        elif col.endswith(suffix):
            # Swap the trailing suffix only. str.replace would also rewrite
            # "__score" occurring earlier in the name, corrupting columns
            # such as "risk__scorecard__score".
            rename_map[col] = f"{col[: -len(suffix)]}__{metric_name}"
    if rename_map:
        result = result.rename(rename_map)
    return result
def __call__(
self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, **params
) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
"""Compute score using callable interface.
Enables using scorers as functions: scorer(y_truth, y_pred).
Parameters
----------
y_truth : pl.DataFrame
Ground truth values.
y_pred : pl.DataFrame
Predicted values.
**params : dict
Metadata to route to nested estimators.
Returns
-------
pl.DataFrame or float or dict
Metric score.
"""
return self.score(y_truth, y_pred, **params)