BaseScorer

yohou.metrics.base.BaseScorer

Bases: BaseEstimator

Base class for all forecasting metrics.

Defines the interface for scoring forecast quality. All scorers must implement the score method and can optionally override fit for metrics that require training data statistics.
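The contract described above can be illustrated with a small, self-contained sketch. The classes below are hypothetical stand-ins written with the standard library only; they are not yohou's BaseScorer, and the `score(y_true, y_pred)` signature is an assumption made for illustration. The point is only the shape of the interface: `score` is mandatory, and `fit` is overridden solely by metrics that need a statistic from the training data.

```python
import abc


class ToyScorer(abc.ABC):
    """Hypothetical stand-in for a scorer base class (not yohou's BaseScorer)."""

    def fit(self, y_train):
        # Default: nothing to learn from the training data.
        return self

    @abc.abstractmethod
    def score(self, y_true, y_pred):
        """Return a score for the predictions."""


class ToyMAE(ToyScorer):
    """Mean absolute error: needs no training statistics, so fit is inherited."""

    def score(self, y_true, y_pred):
        return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


class ToyScaledMAE(ToyScorer):
    """Scaled error: needs a training statistic, so it overrides fit."""

    def fit(self, y_train):
        # Mean absolute one-step change of the training series.
        diffs = [abs(b - a) for a, b in zip(y_train, y_train[1:])]
        self.scale_ = sum(diffs) / len(diffs)
        return self

    def score(self, y_true, y_pred):
        mae = sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)
        return mae / self.scale_
```

Instantiating `ToyScorer` directly raises `TypeError` because `score` is abstract, which is the same mechanism an `abc.ABCMeta` base class relies on.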

Parameters

| Name | Type | Description | Default |
|---|---|---|---|
| groups | list of str, dict of str to float, or None | Panel group filter (list) or filter with weights (dict). If None, all panel groups are included with equal weight. | None |
| components | list of str, dict of str to float, or None | Component filter (list) or filter with weights (dict). If None, all components are included with equal weight. | None |
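A minimal sketch of how the list / dict / None forms of `groups` and `components` can be interpreted. The first two functions mirror the `_filter_keys` and `_weight_dict` static methods in the source listing; `resolve_weights` is a hypothetical helper added here for illustration and is not part of the library. Unlike `fit`, which is documented to raise on unknown names, this sketch silently ignores names that are not available.

```python
def filter_keys(param):
    """Names to keep: dict keys, the list itself, or None for 'keep everything'."""
    if isinstance(param, dict):
        return list(param.keys())
    return param


def weight_dict(param):
    """Explicit weights if a dict was given, otherwise None (equal weights)."""
    return param if isinstance(param, dict) else None


def resolve_weights(param, available):
    """Hypothetical helper: map each kept name to its weight (default 1.0)."""
    keep = filter_keys(param)
    names = available if keep is None else [n for n in available if n in keep]
    weights = weight_dict(param) or {}
    return {n: weights.get(n, 1.0) for n in names}
```

With `None` every available name is kept at weight 1.0, a list keeps only the listed names at weight 1.0, and a dict both filters and assigns weights.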

Notes

The aggregation_method parameter (on subclasses) controls which dimensions are collapsed when computing scores. Orthogonal modes: "stepwise", "vintagewise", "componentwise", "groupwise", "coveragewise" (interval only), or "all".
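As a rough illustration of what collapsing one dimension means, the sketch below averages panel groups for a single row of scores, in the spirit of the `_collapse_groups` method in the source listing. It uses plain dictionaries instead of Polars, and the function name and the row layout are assumptions made for this example; only the `group__component` column naming and the default weight of 1.0 are taken from the source.

```python
def collapse_groups(row, group_weights=None):
    """Weighted average across panel groups for one row of scores.

    `row` maps "group__component" column names to score values.
    """
    group_weights = group_weights or {}
    by_component = {}
    for col, value in row.items():
        group, component = col.split("__", 1)
        weight = group_weights.get(group, 1.0)  # unlisted groups default to 1.0
        by_component.setdefault(component, []).append((weight, value))

    collapsed = {}
    for component, pairs in by_component.items():
        total = sum(w for w, _ in pairs)
        if total == 0:
            raise ValueError("Total panel group weight is zero")
        collapsed[component] = sum(w * v for w, v in pairs) / total
    return collapsed
```

After the collapse, each component keeps one column and the group prefix disappears; the other modes reduce their own dimension in a similar way.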

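See Also

- BasePointScorer (yohou.metrics.base.BasePointScorer) : Base class for point-prediction metrics.
- BaseIntervalScorer (yohou.metrics.base.BaseIntervalScorer) : Base class for interval-prediction metrics.
- BaseConformityScorer (yohou.metrics.conformity_base.BaseConformityScorer) : Base class for conformity scorers.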

Source Code

class BaseScorer(BaseEstimator, metaclass=abc.ABCMeta):
    """Base class for all forecasting metrics.

    Defines the interface for scoring forecast quality. All scorers must implement
    the `score` method and can optionally override `fit` for metrics
    that require training data statistics.

    Parameters
    ----------
    groups : list of str, dict of str to float, or None, default=None
        Panel group filter (list) or filter with weights (dict). If None,
        all panel groups are included with equal weight.
    components : list of str, dict of str to float, or None, default=None
        Component filter (list) or filter with weights (dict). If None,
        all components are included with equal weight.

    Notes
    -----
    The ``aggregation_method`` parameter (on subclasses) controls which
    dimensions are collapsed when computing scores. Orthogonal modes:
    ``"stepwise"``, ``"vintagewise"``, ``"componentwise"``,
    ``"groupwise"``, ``"coveragewise"`` (interval only), or ``"all"``.

    See Also
    --------
    - [`BasePointScorer`][yohou.metrics.base.BasePointScorer] : Base class for point-prediction metrics.
    - [`BaseIntervalScorer`][yohou.metrics.base.BaseIntervalScorer] : Base class for interval-prediction metrics.
    - [`BaseConformityScorer`][yohou.metrics.conformity_base.BaseConformityScorer] : Base class for conformity scorers.

    """

    _lower_is_better: bool = True

    _parameter_constraints: dict = {
        "groups": [list, dict, None],
        "components": [list, dict, None],
    }

    def __init__(
        self,
        groups: list[str] | dict[str, float] | None = None,
        components: list[str] | dict[str, float] | None = None,
    ):
        self.groups = groups
        self.components = components

    @staticmethod
    def _filter_keys(param: list | dict | None) -> list | None:
        """Extract filter list from a polymorphic param."""
        if isinstance(param, dict):
            return list(param.keys())
        return param

    @staticmethod
    def _weight_dict(param: list | dict | None) -> dict | None:
        """Extract weight dict from a polymorphic param."""
        if isinstance(param, dict):
            return param
        return None

    @property
    def lower_is_better(self) -> bool:
        """Whether lower scores indicate better performance."""
        return self._lower_is_better

    def __sklearn_tags__(self) -> Tags:
        """Get estimator tags.

        Returns
        -------
        Tags
            Estimator tags with scorer-specific attributes.

        """
        tags = Tags(estimator_type="scorer", requires_fit=False)

        # Subclasses set prediction_type in their __sklearn_tags__() method
        # Most scorers don't require calibration (fit is optional)
        assert tags.scorer_tags is not None
        tags.scorer_tags.requires_calibration = False
        tags.scorer_tags.lower_is_better = self._lower_is_better

        return tags

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BaseScorer:
        """Fit the scorer on training data.

        Validates ``groups`` and ``components`` against
        training data.  Stores training data statistics for scaled metrics
        (e.g., MASE).  Subclasses should override to add type-specific
        parameter validation.

        Parameters
        ----------
        y_train : pl.DataFrame
            Training target time series with a ``"time"`` column and one or
            more numeric value columns.
        forecaster : BaseForecaster or None, default=None
            If provided, ``interval_`` and ``forecaster_horizon_`` are copied
            from the fitted forecaster (its ``interval_`` and
            ``fit_forecasting_horizon_`` attributes) instead of ``interval_``
            being re-inferred from ``y_train``. ``groups_`` is always inferred
            from ``y_train``.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        self
            The fitted scorer instance.

        Raises
        ------
        ValueError
            If ``groups`` or ``components`` contain names not
            present in ``y_train``.

        """
        # Validate base parameters (groups, components)
        self._validate_parameters(y_train=y_train)

        # Validate input structure without aligning (single dataframe)
        validate_scorer_data(self, y_true=y_train, y_pred=None, reset=True)

        # Infer groups_ from y_train panel structure
        _, panel_groups = inspect_panel(y_train)
        self.groups_ = list(panel_groups.keys()) if panel_groups else None

        if forecaster is not None:
            # Extract metadata from fitted forecaster
            check_is_fitted(forecaster)
            self.interval_ = forecaster.interval_
            self.forecaster_horizon_ = forecaster.fit_forecasting_horizon_
        # Infer interval from training data time column
        elif "time" in y_train.columns and len(y_train) >= 2:
            self.interval_ = check_interval_consistency(y_train)
        else:
            self.interval_ = None

        # Mark as fitted
        self._is_fitted = True

        return self

    def _collapse_groups(self, df: pl.DataFrame) -> pl.DataFrame:
        """Collapse panel groups via weighted average.

        For each component (suffix after ``__``), computes a weighted average
        across all panel groups containing that component. Non-panel data
        is returned unchanged.

        """
        # Identify metadata columns (coverage_rate, context dims) to preserve
        meta_names = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
        meta_cols = [c for c in df.columns if c in meta_names]
        value_df = df.select([c for c in df.columns if c not in meta_names])

        _, panel_groups = inspect_panel(value_df)
        if len(panel_groups) == 0:
            return df

        # component -> [(group_name, column_name)]
        components: dict[str, list[tuple[str, str]]] = {}
        for group_name, group_cols in panel_groups.items():
            for col in group_cols:
                component = col.split("__", 1)[1]
                if component not in components:
                    components[component] = []
                components[component].append((group_name, col))

        weights: dict[str, float] = {}
        for group_name in panel_groups:
            gw = self._weight_dict(self.groups)
            weights[group_name] = gw.get(group_name, 1.0) if gw else 1.0

        exprs: list[pl.Expr] = [pl.col(c) for c in meta_cols]
        for component, group_cols in components.items():
            total_weight = sum(weights[gn] for gn, _ in group_cols)
            if total_weight == 0:
                raise ValueError("Total panel group weight is zero")
            weighted_terms = [pl.col(col_name) * (weights[gn] / total_weight) for gn, col_name in group_cols]
            exprs.append(pl.sum_horizontal(weighted_terms).alias(component))

        return df.select(exprs)

    def _collapse_coverage_rates(self, df: pl.DataFrame) -> pl.DataFrame:
        """Collapse coverage_rate dimension via weighted average.

        Rows within each coverage rate block are assumed to be in the same
        order (one row per timestep). Collapsing averages across rates for
        each timestep, preserving row dimension.
        """
        if "coverage_rate" not in df.columns:
            return df

        if len(df) == 0:
            return df.drop("coverage_rate")

        meta_names = {"forecasting_step", "vintage_time", "time"}
        meta_cols = [c for c in df.columns if c in meta_names]
        val_cols = [c for c in df.columns if c not in meta_names and c != "coverage_rate"]

        n_rates = df["coverage_rate"].n_unique()
        n_rows_per_rate = len(df) // n_rates

        # Add row index to identify timesteps across rate blocks
        row_idx = list(range(n_rows_per_rate)) * n_rates
        df = df.with_columns(pl.Series("_row_idx", row_idx))
        group_cols = ["_row_idx"] + meta_cols

        cw = self._weight_dict(getattr(self, "coverage_rates", None))

        if cw is not None:
            df = df.with_columns(
                pl
                .col("coverage_rate")
                .replace_strict(
                    {r: cw.get(r, 1.0) for r in df["coverage_rate"].unique().to_list()},
                    default=1.0,
                )
                .alias("_cw")
            )
            result = df.group_by(group_cols, maintain_order=True).agg([
                (pl.col(c) * pl.col("_cw")).sum() / pl.col("_cw").sum() for c in val_cols
            ])
        else:
            result = df.group_by(group_cols, maintain_order=True).agg([pl.col(c).mean() for c in val_cols])

        return result.sort("_row_idx").drop("_row_idx")

    def _collapse_rows(
        self,
        df: pl.DataFrame,
        context: ScoringContext | None,
        dims: set[str],
    ) -> pl.DataFrame:
        """Collapse row dimensions (stepwise and/or vintagewise) using mean."""
        return self._collapse_rows_with(df, context, dims, agg_fn="mean")

    def _collapse_rows_with(
        self,
        df: pl.DataFrame,
        context: ScoringContext | None,
        dims: set[str],
        agg_fn: str = "mean",
    ) -> pl.DataFrame:
        """Collapse row dimensions using the specified aggregation function.

        Parameters
        ----------
        df : pl.DataFrame
            DataFrame with per-row scores.
        context : ScoringContext or None
            Scoring context with time values and metadata.
        dims : set of str
            Aggregation dimensions.
        agg_fn : str, default="mean"
            Polars aggregation function name ("mean", "sum", "max").

        """
        collapse_steps = "stepwise" in dims
        collapse_vintages = "vintagewise" in dims

        if not collapse_steps and not collapse_vintages:
            return df

        meta_names = {"coverage_rate"}
        meta_cols = [c for c in df.columns if c in meta_names]
        val_cols = [c for c in df.columns if c not in meta_names]

        def _agg_exprs(cols: list[str]) -> list[pl.Expr]:
            """Build aggregation expressions for the given columns."""
            return [getattr(pl.col(c), agg_fn)() for c in cols]

        if collapse_steps and collapse_vintages:
            # Collapse steps within each vintage first: group rows by vintage_time
            # and return one row per vintage (the vintage collapse happens later).
            vintage_time = getattr(context, "vintage_time", None) if context is not None else None
            if vintage_time is not None and vintage_time.n_unique() > 1:
                vt_list = vintage_time.to_list()
                if "coverage_rate" in df.columns:
                    n_rates = df["coverage_rate"].n_unique()
                    vt_list = vt_list * n_rates
                group_cols = ["vintage_time"] + meta_cols
                return (
                    df
                    .with_columns(pl.Series("vintage_time", vt_list))
                    .group_by(group_cols, maintain_order=True)
                    .agg(_agg_exprs(val_cols))
                    .sort("vintage_time")
                )

            if meta_cols:
                return df.group_by(meta_cols, maintain_order=True).agg(_agg_exprs(val_cols))
            if agg_fn == "mean":
                return df.select(pl.all().mean())
            return df.select(_agg_exprs(val_cols))

        # Partial collapse: keep one dimension, collapse the other
        keep_dim = "forecasting_step" if collapse_vintages else "vintage_time"
        dim_values = getattr(context, keep_dim, None) if context is not None else None

        if dim_values is None:
            if meta_cols:
                return df.group_by(meta_cols, maintain_order=True).agg(_agg_exprs(val_cols))
            if agg_fn == "mean":
                return df.select(pl.all().mean())
            return df.select(_agg_exprs(val_cols))

        # Tile context values for interval data (coverage_rate repeats rows)
        dim_list = dim_values.to_list()
        if "coverage_rate" in df.columns:
            n_rates = df["coverage_rate"].n_unique()
            dim_list = dim_list * n_rates

        group_cols = [keep_dim] + meta_cols

        return (
            df
            .with_columns(pl.Series(keep_dim, dim_list))
            .group_by(group_cols, maintain_order=True)
            .agg(_agg_exprs(val_cols))
            .sort(keep_dim)
        )

    def _collapse_vintage_dimension(
        self,
        df: pl.DataFrame,
        context: ScoringContext | None,
        dims: set[str],
    ) -> pl.DataFrame:
        """Collapse vintage_time rows via (weighted) mean.

        Internal method, not an override point. Fires after _transform_scores
        to produce cross-vintage aggregation. No-op when vintage_time column
        is absent or vintagewise not in dims.
        """
        if "vintage_time" not in df.columns or "vintagewise" not in dims:
            return df

        meta_names = {"coverage_rate", "forecasting_step", "time"}
        meta_cols = [c for c in df.columns if c in meta_names]
        val_cols = [c for c in df.columns if c not in meta_names and c != "vintage_time"]

        vintage_weight = context.vintage_weight if context is not None else None

        if vintage_weight is not None:
            # Weighted mean across vintages
            # Attach weight per vintage row
            unique_vintages = df["vintage_time"].unique(maintain_order=True).to_list()
            if len(vintage_weight) != len(unique_vintages):
                raise ValueError(
                    f"vintage_weight length ({len(vintage_weight)}) does not match "
                    f"number of unique vintages ({len(unique_vintages)})"
                )
            weight_map = dict(zip(unique_vintages, vintage_weight.tolist(), strict=True))
            df = df.with_columns(pl.col("vintage_time").replace_strict(weight_map, default=1.0).alias("_vw"))
            if meta_cols:
                result = df.group_by(meta_cols, maintain_order=True).agg([
                    (pl.col(c) * pl.col("_vw")).sum() / pl.col("_vw").sum() for c in val_cols
                ])
            else:
                total = df["_vw"].sum()
                result = df.select([(pl.col(c) * pl.col("_vw")).sum() / total for c in val_cols])
        # Unweighted mean across vintages
        elif meta_cols:
            result = df.group_by(meta_cols, maintain_order=True).agg([pl.col(c).mean() for c in val_cols])
        else:
            result = df.select([pl.col(c).mean() for c in val_cols])

        return result

    def _collapse_components(self, df: pl.DataFrame) -> pl.DataFrame:
        """Collapse component columns into a single score via weighted average.

        Metadata columns (coverage_rate, forecasting_step, etc.) are preserved.
        For panel data (columns named ``{group}__{component}``), each group is
        collapsed separately into ``{group}__score``.
        """
        meta_names = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
        meta_cols = [c for c in df.columns if c in meta_names]
        value_df = df.select([c for c in df.columns if c not in meta_names])

        if len(value_df.columns) == 0:
            return df

        _, panel_groups = inspect_panel(value_df)
        cw = self._weight_dict(self.components)

        keep = [pl.col(c) for c in meta_cols]

        if len(panel_groups) > 0:
            new_cols = []
            for group_name, group_cols in panel_groups.items():
                if cw is not None:
                    unprefixed = [c.split("__", 1)[1] for c in group_cols]
                    weights = [cw.get(n, 1.0) for n in unprefixed]
                    total = sum(weights)
                    weighted = pl.sum_horizontal([
                        pl.col(c) * (w / total) for c, w in zip(group_cols, weights, strict=True)
                    ])
                else:
                    weighted = pl.sum_horizontal([pl.col(c) for c in group_cols]) / len(group_cols)
                new_cols.append(weighted.alias(f"{group_name}__score"))
            return df.select(keep + new_cols)

        val_cols = value_df.columns
        if cw is not None:
            weights = [cw.get(c, 1.0) for c in val_cols]
            total = sum(weights)
            score = pl.sum_horizontal([pl.col(c) * (w / total) for c, w in zip(val_cols, weights, strict=True)])
        else:
            score = pl.sum_horizontal([pl.col(c) for c in val_cols]) / len(val_cols)

        return df.select(keep + [score.alias("score")])

    def _finalize(
        self,
        result: pl.DataFrame,
        context: ScoringContext | None,
        dims: set[str],
    ) -> float | pl.DataFrame:
        """Attach remaining row labels and convert 1x1 results to scalar.

        After all collapse steps, determines whether to return a scalar
        (when all dimensions are collapsed) or a labelled DataFrame.
        """
        meta_names = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
        existing_labels = [c for c in result.columns if c in meta_names]
        value_cols = [c for c in result.columns if c not in meta_names]

        collapse_steps = "stepwise" in dims
        collapse_vintages = "vintagewise" in dims
        rows_collapsed = collapse_steps and collapse_vintages

        # Add time labels if rows are NOT fully collapsed and no row label exists yet
        has_row_label = any(c in existing_labels for c in ("forecasting_step", "vintage_time", "time"))
        if not rows_collapsed and not has_row_label and context is not None and context.time_values is not None:
            time_values = context.time_values
            if "coverage_rate" in result.columns:
                # Tile time values for each coverage rate
                n_rates = result["coverage_rate"].n_unique()
                if len(time_values) * n_rates == len(result):
                    tiled_times = time_values * n_rates
                    result = result.with_columns(pl.Series("time", tiled_times).cast(pl.Datetime))
                    # Reorder: time first, then all others
                    cols = ["time"] + [c for c in result.columns if c != "time"]
                    result = result.select(cols)
            elif len(time_values) == len(result):
                result = pl.concat(
                    [
                        pl.DataFrame({"time": time_values}).cast({"time": pl.Datetime}),
                        result,
                    ],
                    how="horizontal",
                )

        # Attach vintage_time if available and rows are not collapsed
        if (
            not rows_collapsed
            and "vintage_time" not in result.columns
            and context is not None
            and context.vintage_time is not None
        ):
            ot_values = context.vintage_time.to_list()
            if "coverage_rate" in result.columns:
                n_rates = result["coverage_rate"].n_unique()
                # Each vintage_time repeats n_rates times (once per coverage_rate)
                if len(ot_values) * n_rates == len(result):
                    ot_values = ot_values * n_rates
            if len(ot_values) == len(result):
                # Insert after time if present, otherwise at position 0
                insert_pos = result.columns.index("time") + 1 if "time" in result.columns else 0
                ot_series = pl.Series("vintage_time", ot_values).cast(pl.Datetime)
                cols = list(result.columns)
                cols.insert(insert_pos, "vintage_time")
                result = result.with_columns(ot_series).select(cols)

        # Re-check for labels after potential time/vintage_time label addition
        existing_labels = [c for c in result.columns if c in meta_names]
        value_cols = [c for c in result.columns if c not in meta_names]

        # Scalar: only when the step, vintage, and component dimensions are all collapsed
        # (stepwise, vintagewise, and componentwise all in dims) and no labels remain
        all_spatial_collapsed = "stepwise" in dims and "vintagewise" in dims and "componentwise" in dims
        # A single-value coverage_rate is trivial metadata when all other dims are collapsed
        if all_spatial_collapsed and existing_labels == ["coverage_rate"] and len(result) == 1:
            existing_labels = []
            result = result.drop("coverage_rate")
            value_cols = [c for c in result.columns if c not in meta_names]
        if len(result) == 1 and len(value_cols) <= 1 and not existing_labels and all_spatial_collapsed:
            if len(value_cols) == 0:
                return float("nan")
            return float(result[value_cols[0]][0])

        return result

    def _pre_filter_zero_weights(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
        context: ScoringContext,
        time_weight: Callable | pl.DataFrame | dict | None = None,
        step_weight: Callable | pl.DataFrame | dict | None = None,
        vintage_weight: Callable | pl.DataFrame | dict | None = None,
    ) -> tuple[
        pl.DataFrame,
        pl.DataFrame,
        ScoringContext,
        np.ndarray | dict[str, np.ndarray] | None,
        np.ndarray | dict[str, np.ndarray] | None,
        np.ndarray | dict[str, np.ndarray] | None,
    ]:
        """Resolve weights and pre-filter rows with zero weight.

        For group-uniform sources (dict, 1-param callable, DataFrame
        without group columns), zeros are combined into a mask and matching
        rows are removed from ``y_truth``, ``y_pred``, and ``context``.
        For panel-aware sources, weights are resolved per-group into a
        ``dict[str, np.ndarray]`` but NOT used for row pre-filtering.

        Returns the filtered data plus pre-resolved weights to avoid
        resolving twice.

        """
        _, panel_groups = inspect_panel(y_truth)
        has_panel = len(panel_groups) > 0

        zero_mask = np.zeros(len(y_truth), dtype=bool)

        def _resolve_one(
            w: Callable | pl.DataFrame | dict | None,
            key_series: pl.Series,
            join_column: str,
            name: str,
        ) -> np.ndarray | dict[str, np.ndarray] | None:
            """Resolve a single weight argument into aligned arrays."""
            if w is None:  # pragma: no cover
                return None

            # Detect panel-awareness
            is_panel_aware = False
            if has_panel and callable(w) and not isinstance(w, dict):
                n_params = validate_callable_signature(w)
                is_panel_aware = n_params == 2
            elif has_panel and isinstance(w, pl.DataFrame):
                # Check if it has group-specific weight columns
                for g in panel_groups:
                    if f"{g}_weight" in w.columns:
                        is_panel_aware = True
                        break

            if is_panel_aware and has_panel:
                # Resolve per-group, no row pre-filtering
                result_dict: dict[str, np.ndarray] = {}
                for group_name in panel_groups:
                    arr = resolve_weight_to_array(w, key_series, join_column, group_name)
                    result_dict[group_name] = arr
                return result_dict

            # Group-uniform: resolve once, track zeros for filtering
            group_name = next(iter(panel_groups)) if has_panel else None
            arr = resolve_weight_to_array(w, key_series, join_column, group_name)
            return arr

        # Build key series for each weight type
        time_series = pl.Series("time", context.time_values) if context.time_values is not None else None
        step_series = context.forecasting_step
        vintage_series = context.vintage_time

        # Resolve time_weight
        tw_resolved = None
        if time_weight is not None:
            if time_series is None:  # pragma: no cover
                raise ValueError("time_values unavailable in context but time_weight was provided")
            tw_resolved = _resolve_one(time_weight, time_series, "time", "time_weight")
            if isinstance(tw_resolved, np.ndarray):
                zero_mask |= tw_resolved == 0.0

        # Resolve step_weight (silently ignored when forecasting_step unavailable)
        sw_resolved = None
        if step_weight is not None and step_series is not None:
            sw_resolved = _resolve_one(step_weight, step_series, "forecasting_step", "step_weight")
            if isinstance(sw_resolved, np.ndarray):
                zero_mask |= sw_resolved == 0.0

        # Resolve vintage_weight (silently ignored when vintage_time unavailable)
        vw_resolved = None
        if vintage_weight is not None and vintage_series is not None:
            vw_resolved = _resolve_one(vintage_weight, vintage_series, "vintage_time", "vintage_weight")
            if isinstance(vw_resolved, np.ndarray):
                zero_mask |= vw_resolved == 0.0

        # Apply zero-mask filter
        if np.any(zero_mask):
            keep = ~zero_mask
            if not np.any(keep):
                raise ValueError(
                    "All rows have zero weight after pre-filtering. "
                    "Check that weight dicts/callables assign non-zero weights to at least some data."
                )

            from yohou.metrics._context import ScoringContext as _ScoringContext  # noqa: PLC0415

            y_truth = y_truth.filter(keep)
            y_pred = y_pred.filter(keep)
            context = _ScoringContext(
                time_values=(
                    [t for t, m in zip(context.time_values, keep.tolist(), strict=True) if m]
                    if context.time_values is not None
                    else None
                ),
                vintage_time=(context.vintage_time.filter(keep) if context.vintage_time is not None else None),
                forecasting_step=(
                    context.forecasting_step.filter(keep) if context.forecasting_step is not None else None
                ),
            )

            # Slice resolved weight arrays to match
            if isinstance(tw_resolved, np.ndarray):
                tw_resolved = tw_resolved[keep]  # ty: ignore[invalid-argument-type]
            elif isinstance(tw_resolved, dict):
                tw_resolved = {g: a[keep] for g, a in tw_resolved.items()}

            if isinstance(sw_resolved, np.ndarray):
                sw_resolved = sw_resolved[keep]  # ty: ignore[invalid-argument-type]
            elif isinstance(sw_resolved, dict):
                sw_resolved = {g: a[keep] for g, a in sw_resolved.items()}

            if isinstance(vw_resolved, np.ndarray):
                vw_resolved = vw_resolved[keep]  # ty: ignore[invalid-argument-type]
            elif isinstance(vw_resolved, dict):
                vw_resolved = {g: a[keep] for g, a in vw_resolved.items()}

        # Derive per-unique-vintage weights and store in context
        if isinstance(vw_resolved, dict) and context.vintage_time is not None:
            # Panel-aware vintage_weight: all groups share the same vintage_time
            # axis, so use the first group's weights for cross-vintage weighting.
            first_group_weights = next(iter(vw_resolved.values()))
            context = self._set_vintage_weight_on_context(context, first_group_weights)  # ty: ignore[invalid-argument-type]
            vw_resolved = None
        elif isinstance(vw_resolved, np.ndarray) and context.vintage_time is not None:
            context = self._set_vintage_weight_on_context(context, vw_resolved)
            # vintage_weight handled at cross-vintage level; don't pass to _apply_weights
            vw_resolved = None

        return y_truth, y_pred, context, tw_resolved, sw_resolved, vw_resolved

    @staticmethod
    def _set_vintage_weight_on_context(
        context: ScoringContext,
        vw_array: np.ndarray,
    ) -> ScoringContext:
        """Attach per-unique-vintage weights to a scoring context."""
        vt_df = pl.DataFrame({
            "vintage_time": context.vintage_time,
            "_vw": vw_array,
        })
        per_vintage = vt_df.group_by("vintage_time", maintain_order=True).agg(pl.col("_vw").first())
        return ScoringContext(
            time_values=context.time_values,
            vintage_time=context.vintage_time,
            forecasting_step=context.forecasting_step,
            vintage_weight=per_vintage["_vw"].to_numpy(),
        )

    def _resolve_vintage_weight_to_context(
        self,
        context: ScoringContext,
        vintage_weight: Callable | pl.DataFrame | dict | None,
    ) -> ScoringContext:
        """Resolve a vintage_weight argument into context.vintage_weight.

        Lightweight alternative to ``_pre_filter_zero_weights`` for scorers
        that only need vintage_weight resolution (no time/step weights).
        """
        if vintage_weight is None:
            return context
        if context.vintage_time is None:
            return context
        vw_resolved = resolve_weight_to_array(
            vintage_weight,
            context.vintage_time,
            "vintage_time",
        )
        if isinstance(vw_resolved, dict):
            raise TypeError(
                "Panel-aware (dict) vintage_weight is not supported. "
                "Use a flat callable, DataFrame, or dict keyed on vintage_time values."
            )
        return self._set_vintage_weight_on_context(context, vw_resolved)

    def _apply_weights(
        self,
        scores: pl.DataFrame,
        time_weight_resolved: np.ndarray | dict[str, np.ndarray] | None,
        step_weight_resolved: np.ndarray | dict[str, np.ndarray] | None,
        n_rates: int = 1,
    ) -> pl.DataFrame:
        """Apply pre-resolved weights to score DataFrame.

        Weights are applied in two stages, each normalized independently:
        (1) normalize and apply time_weight, (2) normalize and apply step_weight.

        Vintage weights are not applied at row level; they are handled
        during cross-vintage aggregation in ``_collapse_vintage_dimension``.

        """
        _, panel_groups = inspect_panel(scores)

        def _apply_array(df: pl.DataFrame, w: np.ndarray, cols: list[str]) -> pl.DataFrame:
            """Multiply columns by weight array."""
            return df.with_columns([(pl.col(c) * w).alias(c) for c in cols])

        def _apply_one_weight(
            df: pl.DataFrame,
            w_resolved: np.ndarray | dict[str, np.ndarray],
            value_cols: list[str],
        ) -> pl.DataFrame:
            """Apply a single normalized weight (array or dict) to scores."""
            if isinstance(w_resolved, dict):
                for group_name, group_arr in w_resolved.items():
                    normed = normalize_weights(group_arr)  # ty: ignore[invalid-argument-type]
                    tiled = np.tile(normed, n_rates) if n_rates > 1 else normed
                    group_cols = [c for c in panel_groups.get(group_name, []) if c in value_cols]  # ty: ignore[no-matching-overload]
                    if group_cols:
                        df = _apply_array(df, tiled, group_cols)
            else:
                normed = normalize_weights(w_resolved)
                tiled = np.tile(normed, n_rates) if n_rates > 1 else normed
                df = _apply_array(df, tiled, value_cols)
            return df

        value_cols = [c for c in scores.columns if c != "coverage_rate"]

        # Stage 1: time_weight (normalized independently)
        if time_weight_resolved is not None:
            scores = _apply_one_weight(scores, time_weight_resolved, value_cols)

        # Stage 2: step_weight (normalized independently)
        if step_weight_resolved is not None:
            scores = _apply_one_weight(scores, step_weight_resolved, value_cols)

        return scores

    def _validate_parameters(
        self,
        y_train: pl.DataFrame | None = None,
        aggregation_method: list[str] | str | None = None,
        valid_aggregation_methods: set[str] | None = None,
    ) -> None:
        """Validate scorer parameters.

        Parameters
        ----------
        y_train : pl.DataFrame or None
            Training data to validate against. If None, only type validation is performed.
        aggregation_method : list of str or str or None
            Aggregation method to validate. If None, aggregation validation is skipped.
        valid_aggregation_methods : set of str or None
            Set of valid aggregation method strings. Required if aggregation_method is provided.

        Raises
        ------
        ValueError
            If validation fails.

        """
        # Validate aggregation_method if provided
        if aggregation_method is not None:
            if valid_aggregation_methods is None:
                raise ValueError("valid_aggregation_methods must be provided when validating aggregation_method")

            # Handle single string
            if isinstance(aggregation_method, str):
                # "all" is a special value that means aggregate across all dimensions
                if aggregation_method != "all" and aggregation_method not in valid_aggregation_methods:
                    raise ValueError(
                        f"Invalid aggregation_method '{aggregation_method}'. "
                        f"Valid options are: 'all' or {sorted(valid_aggregation_methods)}"
                    )
            # Handle list
            elif isinstance(aggregation_method, list):
                # Check all elements are strings
                if not all(isinstance(method, str) for method in aggregation_method):
                    raise ValueError(f"All elements in aggregation_method must be strings, got: {aggregation_method}")
                if len(aggregation_method) == 0:
                    raise ValueError(
                        f"aggregation_method list cannot be empty. "
                        f"Use 'all' or provide at least one method: {sorted(valid_aggregation_methods)}"
                    )
                for method in aggregation_method:
                    if method not in valid_aggregation_methods:
                        raise ValueError(
                            f"Invalid aggregation_method '{method}' in list. "
                            f"Valid list elements are: {sorted(valid_aggregation_methods)}"
                        )
            else:
                raise ValueError(
                    f"aggregation_method must be a string or list of strings, got {type(aggregation_method)}"
                )

        # Validate groups type (list or dict)
        group_filter = self._filter_keys(self.groups)
        if group_filter is not None:
            if not all(isinstance(name, str) for name in group_filter):
                raise ValueError("All group names must be strings")
            if len(group_filter) == 0:
                raise ValueError("groups cannot be empty")

        # Validate components type (list or dict)
        comp_filter = self._filter_keys(self.components)
        if comp_filter is not None:
            if not all(isinstance(name, str) for name in comp_filter):
                raise ValueError("All component names must be strings")
            if len(comp_filter) == 0:
                raise ValueError("components cannot be empty")

        # If y_train is provided, validate against actual data
        if y_train is not None:
            _, panel_groups = inspect_panel(y_train)
            available_groups = set(panel_groups.keys())

            # Validate groups exist in data
            if group_filter is not None:
                if len(available_groups) == 0:
                    # No panel data, but user specified groups
                    raise ValueError(
                        f"groups specified but data contains no panel groups. "
                        f"Data has only global columns: {sorted(set(y_train.columns) - {'time'})}"
                    )
                requested_groups = set(group_filter)
                missing_groups = requested_groups - available_groups
                if missing_groups:
                    raise ValueError(
                        f"Requested groups {sorted(missing_groups)} not found in data. "
                        f"Available groups: {sorted(available_groups)}"
                    )

            # Validate components exist in data
            if comp_filter is not None:
                if len(panel_groups) > 0:
                    # Panel data: check unprefixed column names
                    available_components = set()
                    for group_cols in panel_groups.values():
                        for col in group_cols:
                            # Extract unprefixed column name
                            available_components.add(col.split("__", 1)[1])
                else:
                    # Global data: check column names directly
                    available_components = set(y_train.columns) - {"time"}

                requested_components = set(comp_filter)
                missing_components = requested_components - available_components
                if missing_components:
                    raise ValueError(
                        f"Requested components {sorted(missing_components)} "
                        f"not found in data. Available components: {sorted(available_components)}"
                    )

    @staticmethod
    def _normalize_agg_methods(
        aggregation_method: list[str] | str,
        include_coveragewise: bool = False,
    ) -> set[str]:
        """Normalize aggregation_method to a set of orthogonal mode names.

        Expands ``"all"`` to the full set of modes.

        Parameters
        ----------
        aggregation_method : list of str or str
            Raw aggregation_method value from the scorer.
        include_coveragewise : bool, default=False
            Whether to include ``"coveragewise"`` in the ``"all"`` expansion
            (interval scorers only).

        Returns
        -------
        set of str
            Normalized set of aggregation mode names.

        """
        if aggregation_method == "all":
            modes = {"stepwise", "vintagewise", "componentwise", "groupwise"}
            if include_coveragewise:
                modes.add("coveragewise")
            return modes

        modes = {aggregation_method} if isinstance(aggregation_method, str) else set(aggregation_method)

        return modes

    def _transform_scores(self, df: pl.DataFrame) -> pl.DataFrame:
        """Apply element-wise transform to aggregated value columns.

        Override in subclasses needing post-aggregation transforms
        (e.g. sqrt for RMSE). Called inside ``_aggregate_per_vintage_scores``
        after component/group collapse, before vintage collapse.
        Receives only value columns (no meta columns).

        Parameters
        ----------
        df : pl.DataFrame
            DataFrame with value columns only.

        Returns
        -------
        pl.DataFrame
            Transformed DataFrame.

        """
        return df

    @staticmethod
    def _reject_weights(**params: object) -> None:
        """Raise if any weight kwargs are passed to a scorer that doesn't support them."""
        weight_keys = [
            k for k in params if k in {"time_weight", "step_weight", "vintage_weight"} and params[k] is not None
        ]
        if weight_keys:
            raise TypeError(
                f"This scorer does not support sample weights, "
                f"but received: {', '.join(sorted(weight_keys))}. "
                f"Remove the weight arguments or use a scorer that supports weighting."
            )

    def _aggregate_per_vintage_scores(
        self,
        result: pl.DataFrame,
        context: ScoringContext | None,
    ) -> float | pl.DataFrame:
        """Shared pipeline tail for all scorer families.

        Input: a per-vintage DataFrame (one row per vintage with
        ``vintage_time`` column for multi-vintage, or single-row without
        it for single-vintage). Pipeline:
        components → groups → strip meta → _transform_scores → reattach meta
        → _collapse_vintage_dimension → finalize → rename.
        """
        dims = self._normalize_agg_methods(self.aggregation_method)  # ty: ignore[unresolved-attribute]

        # Input validation: multi-vintage context but no vintage_time column.
        # Only fires when both stepwise and vintagewise are requested, because
        # in that case _collapse_rows preserves vintage_time for per-vintage-first
        # aggregation and _collapse_vintage_dimension handles the final collapse.
        # When only vintagewise is requested, _collapse_rows already collapses
        # vintages by grouping by forecasting_step, so vintage_time is gone.
        if (
            "stepwise" in dims
            and "vintagewise" in dims
            and context is not None
            and context.vintage_time is not None
            and context.vintage_time.n_unique() > 1
            and "vintage_time" not in result.columns
        ):
            raise ValueError(
                "Context has multiple vintages but input DataFrame is missing "
                "'vintage_time' column. Attach vintage_time before calling "
                "_aggregate_per_vintage_scores."
            )

        if "componentwise" in dims:
            result = self._collapse_components(result)

        if "groupwise" in dims:
            result = self._collapse_groups(result)

        # Strip meta → _transform_scores → reattach meta
        meta_names = {"coverage_rate", "forecasting_step", "vintage_time", "time"}
        meta_cols = [c for c in result.columns if c in meta_names]
        val_cols = [c for c in result.columns if c not in meta_names]

        if val_cols:
            transformed = self._transform_scores(result.select(val_cols))
            result = pl.concat([result.select(meta_cols), transformed], how="horizontal") if meta_cols else transformed

        # Collapse vintage dimension (weighted/unweighted mean across vintages)
        result = self._collapse_vintage_dimension(result, context, dims)

        # Finalize: attach labels, convert to scalar
        finalized = self._finalize(result, context, dims)

        if isinstance(finalized, pl.DataFrame):
            finalized = self._rename_metric_columns(finalized)

        return finalized

    def _map_per_vintage(
        self,
        y_truth: pl.DataFrame,
        y_pred: pl.DataFrame,
        context: ScoringContext | None,
        compute_fn: Callable[[pl.DataFrame, pl.DataFrame], pl.DataFrame | None],
    ) -> pl.DataFrame:
        """Group data by vintage and apply compute_fn per group.

        Utility for Pattern 2 scorers (whole-column computation).
        Returns a per-vintage DataFrame with ``vintage_time`` column
        for multi-vintage, or a single-row DataFrame (no vintage_time)
        for single-vintage.

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth (time column removed).
        y_pred : pl.DataFrame
            Predictions (time column removed).
        context : ScoringContext or None
            Scoring context.
        compute_fn : callable
            ``(y_truth_slice, y_pred_slice) -> pl.DataFrame | None``.
            Returns single-row DataFrame per vintage or None to skip.

        Returns
        -------
        pl.DataFrame
            Per-vintage results concatenated.

        Raises
        ------
        ValueError
            If all vintages are skipped.

        """
        vintage_time = context.vintage_time if context is not None else None

        # Single-vintage fallthrough
        if vintage_time is None or vintage_time.n_unique() <= 1:
            result = compute_fn(y_truth, y_pred)
            if result is None:
                raise ValueError("All vintage groups were skipped. No valid data to compute the metric.")
            return result

        # Multi-vintage: group by vintage
        vt_values = vintage_time.to_list()
        unique_vintages = vintage_time.unique(maintain_order=True).to_list()

        results: list[pl.DataFrame] = []
        for vt in unique_vintages:
            mask = [v == vt for v in vt_values]
            yt_slice = y_truth.filter(mask)
            yp_slice = y_pred.filter(mask)
            row = compute_fn(yt_slice, yp_slice)
            if row is None:
                continue
            row = row.with_columns(pl.lit(vt).alias("vintage_time").cast(pl.Datetime))
            results.append(row)

        if not results:
            raise ValueError("All vintage groups were skipped. No valid data to compute the metric.")

        return pl.concat(results, how="diagonal_relaxed")

    def _aggregate_scores(
        self, raw_scores: pl.DataFrame, context: ScoringContext | None = None
    ) -> float | pl.DataFrame:
        """Apply sequential aggregation pipeline to raw scores.

        Pipeline: coverage → rows (per-vintage) → _aggregate_per_vintage_scores.

        Parameters
        ----------
        raw_scores : pl.DataFrame
            DataFrame with per-timestep (rows) per-component (columns) scores.
            May contain a ``"coverage_rate"`` column for interval scorers.
        context : ScoringContext or None, default=None
            Scoring context with time values and metadata.

        Returns
        -------
        float or pl.DataFrame
            Aggregated scores based on aggregation_method.

        """
        has_coverage_rate = "coverage_rate" in raw_scores.columns
        dims = self._normalize_agg_methods(
            self.aggregation_method,  # ty: ignore[unresolved-attribute]
            include_coveragewise=has_coverage_rate,
        )

        result = raw_scores

        # 1. Collapse coverage rates (interval only)
        if "coveragewise" in dims:
            result = self._collapse_coverage_rates(result)

        # 2. Collapse row dimensions (steps and/or vintages)
        result = self._collapse_rows(result, context, dims)

        # 3. Delegate tail to shared pipeline
        return self._aggregate_per_vintage_scores(result, context)

    @abc.abstractmethod
    def score(
        self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, /, **params
    ) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
        """Compute the metric score.

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth time series to score against.  Must have a
            ``"time"`` column and one or more numeric value columns.
        y_pred : pl.DataFrame
            Predicted time series to evaluate.  Must have ``"vintage_time"``
            and ``"time"`` columns and columns matching ``y_truth``.
        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame or float or dict
            Aggregated score(s).  A ``float`` when
            ``aggregation_method="all"``, a ``pl.DataFrame`` for partial
            aggregations, or a ``dict`` mapping coverage rates to scores
            for interval scorers.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the scorer has not been fitted yet (when calibration is
            required).
        ValueError
            If ``y_truth`` and ``y_pred`` have mismatched columns or
            incompatible shapes.

        """

    def _rename_metric_columns(self, result: pl.DataFrame) -> pl.DataFrame:
        """Rename aggregation output columns to use the metric name.

        Replaces ``"score"`` with ``_metric_name`` and ``"__score"``
        suffixes with ``"__<_metric_name>"``.

        Parameters
        ----------
        result : pl.DataFrame
            DataFrame from ``_aggregate_scores`` that may contain
            ``"score"`` or ``"*__score"`` columns.

        Returns
        -------
        pl.DataFrame
            DataFrame with columns renamed to use the metric name.

        """
        metric_name: str = getattr(self, "_metric_name", "score")
        rename_map: dict[str, str] = {}
        if "score" in result.columns:
            rename_map["score"] = metric_name
        for col in result.columns:
            if col.endswith("__score"):
                # Replace only the trailing "__score" suffix, not earlier occurrences
                rename_map[col] = f"{col.removesuffix('__score')}__{metric_name}"
        if rename_map:
            result = result.rename(rename_map)
        return result

    def __call__(
        self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, **params
    ) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
        """Compute score using callable interface.

        Enables using scorers as functions: scorer(y_truth, y_pred).

        Parameters
        ----------
        y_truth : pl.DataFrame
            Ground truth values.

        y_pred : pl.DataFrame
            Predicted values.

        **params : dict
            Metadata to route to nested estimators.

        Returns
        -------
        pl.DataFrame or float or dict
            Metric score.

        """
        return self.score(y_truth, y_pred, **params)
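
The zero-weight pre-filtering step near the top of this listing can be illustrated in isolation. The following is a minimal sketch in plain Python, not the library's implementation: `prefilter_zero_weights` is a hypothetical helper, rows and weights are ordinary lists instead of polars DataFrames and NumPy arrays, and only flat (non-panel) weights are handled. It mirrors the logic visible in the source: a row is dropped when any supplied weight is exactly zero, and a `ValueError` is raised when no rows survive.

```python
from __future__ import annotations


def prefilter_zero_weights(
    rows: list[dict],
    *weights: list[float] | None,
) -> tuple[list[dict], list[list[float] | None]]:
    """Drop rows for which any supplied weight is exactly zero.

    Hypothetical stand-in for the pre-filtering shown in the class source;
    ``None`` means "weight not provided" and is passed through unchanged.
    """
    n = len(rows)
    zero_mask = [False] * n
    for w in weights:
        if w is None:
            continue
        if len(w) != n:
            raise ValueError("each weight must have one entry per row")
        # OR the per-weight zero masks together, as the source does with |=
        zero_mask = [z or (x == 0.0) for z, x in zip(zero_mask, w)]

    if not any(zero_mask):
        # Nothing to filter: return the inputs untouched
        return rows, list(weights)

    keep = [not z for z in zero_mask]
    if not any(keep):
        raise ValueError("All rows have zero weight after pre-filtering.")

    kept_rows = [r for r, k in zip(rows, keep) if k]
    # Slice every provided weight so it stays aligned with the kept rows
    kept_weights = [
        None if w is None else [x for x, k in zip(w, keep) if k]
        for w in weights
    ]
    return kept_rows, kept_weights


rows = [{"step": 1}, {"step": 2}, {"step": 3}]
time_w = [1.0, 0.0, 2.0]
step_w = [1.0, 1.0, 0.5]
kept_rows, (kept_time_w, kept_step_w) = prefilter_zero_weights(rows, time_w, step_w)
```

Here the second row is removed because its time weight is zero, and both weight lists are sliced to match the two remaining rows.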

Methods

lower_is_better property

Whether lower scores indicate better performance.
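
One typical use of this flag is picking the best of several candidate scores without hard-coding the direction of the metric. A hedged sketch: `pick_best` and the example values are hypothetical and are not part of yohou.

```python
def pick_best(scores: dict[str, float], lower_is_better: bool) -> str:
    """Return the name of the best-scoring candidate.

    Hypothetical helper: ``scores`` maps a candidate name to the float a
    scorer returned, and ``lower_is_better`` would come from
    ``scorer.lower_is_better``.
    """
    if not scores:
        raise ValueError("scores must not be empty")
    chooser = min if lower_is_better else max
    return chooser(scores, key=scores.__getitem__)


# Error-style metric: smaller is better
best_by_error = pick_best({"naive": 1.25, "ridge": 0.75}, lower_is_better=True)
# Score-style metric: larger is better
best_by_score = pick_best({"naive": 0.25, "ridge": 0.5}, lower_is_better=False)
```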

__sklearn_tags__()

Get estimator tags.

Returns

| Type | Description |
| --- | --- |
| Tags | Estimator tags with scorer-specific attributes. |

Source Code
def __sklearn_tags__(self) -> Tags:
    """Get estimator tags.

    Returns
    -------
    Tags
        Estimator tags with scorer-specific attributes.

    """
    tags = Tags(estimator_type="scorer", requires_fit=False)

    # Subclasses set prediction_type in their __sklearn_tags__() method
    # Most scorers don't require calibration (fit is optional)
    assert tags.scorer_tags is not None
    tags.scorer_tags.requires_calibration = False
    tags.scorer_tags.lower_is_better = self._lower_is_better

    return tags

fit(y_train, *, forecaster=None, **params)

Fit the scorer on training data.

Validates groups and components against training data. Stores training data statistics for scaled metrics (e.g., MASE). Subclasses should override to add type-specific parameter validation.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| y_train | DataFrame | Training target time series with a "time" column and one or more numeric value columns. | required |
| forecaster | BaseForecaster or None | If provided, interval_ and forecaster_horizon_ are taken from the fitted forecaster (its interval_ and fit_forecasting_horizon_ attributes) instead of being inferred from y_train. groups_ is always inferred from y_train. | None |
| **params | dict | Metadata to route to nested estimators. | {} |

Returns

| Type | Description |
| --- | --- |
| self | The fitted scorer instance. |

Raises

| Type | Description |
| --- | --- |
| ValueError | If groups or components contain names not present in y_train. |

Source Code
@_fit_context(prefer_skip_nested_validation=True)
def fit(self, y_train: pl.DataFrame, *, forecaster=None, **params) -> BaseScorer:
    """Fit the scorer on training data.

    Validates ``groups`` and ``components`` against
    training data.  Stores training data statistics for scaled metrics
    (e.g., MASE).  Subclasses should override to add type-specific
    parameter validation.

    Parameters
    ----------
    y_train : pl.DataFrame
        Training target time series with a ``"time"`` column and one or
        more numeric value columns.
    forecaster : BaseForecaster or None, default=None
        If provided, ``interval_`` and ``forecaster_horizon_`` are taken
        from the fitted forecaster (its ``interval_`` and
        ``fit_forecasting_horizon_`` attributes) instead of being inferred
        from ``y_train``.  ``groups_`` is always inferred from ``y_train``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    self
        The fitted scorer instance.

    Raises
    ------
    ValueError
        If ``groups`` or ``components`` contain names not
        present in ``y_train``.

    """
    # Validate base parameters (groups, components)
    self._validate_parameters(y_train=y_train)

    # Validate input structure without aligning (single dataframe)
    validate_scorer_data(self, y_true=y_train, y_pred=None, reset=True)

    # Infer groups_ from y_train panel structure
    _, panel_groups = inspect_panel(y_train)
    self.groups_ = list(panel_groups.keys()) if panel_groups else None

    if forecaster is not None:
        # Extract metadata from fitted forecaster
        check_is_fitted(forecaster)
        self.interval_ = forecaster.interval_
        self.forecaster_horizon_ = forecaster.fit_forecasting_horizon_
    # Infer interval from training data time column
    elif "time" in y_train.columns and len(y_train) >= 2:
        self.interval_ = check_interval_consistency(y_train)
    else:
        self.interval_ = None

    # Mark as fitted
    self._is_fitted = True

    return self
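
The component check performed during `fit` relies on the panel naming convention visible in `_validate_parameters`, where a panel column is split on the first `__` to recover the component name. Below is a minimal sketch under stated assumptions: `available_components` and `missing_components` are hypothetical helpers, column names are plain strings, and a column is treated as panel data whenever it contains `__` (the library delegates that decision to `inspect_panel`, whose exact rules are not shown on this page).

```python
def available_components(columns: list[str]) -> set[str]:
    """Collect component names from a list of column names.

    Assumption: panel columns look like ``"<group>__<component>"``; the real
    grouping is done by ``inspect_panel``, which is not reproduced here.
    """
    value_cols = [c for c in columns if c != "time"]
    panel_cols = [c for c in value_cols if "__" in c]
    if panel_cols:
        # Panel data: strip the group prefix, keeping everything after the first "__"
        return {c.split("__", 1)[1] for c in panel_cols}
    # Global data: column names are the component names
    return set(value_cols)


def missing_components(requested: list[str], columns: list[str]) -> list[str]:
    """Return requested component names that are absent from the data, sorted."""
    return sorted(set(requested) - available_components(columns))


panel_columns = [
    "time",
    "store_a__sales",
    "store_a__returns",
    "store_b__sales",
    "store_b__returns",
]
missing = missing_components(["sales", "profit"], panel_columns)
```

A non-empty `missing` list corresponds to the case in which `fit` raises `ValueError` for components not found in the data.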

score(y_truth, y_pred, /, **params) abstractmethod

Compute the metric score.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| y_truth | DataFrame | Ground truth time series to score against. Must have a "time" column and one or more numeric value columns. | required |
| y_pred | DataFrame | Predicted time series to evaluate. Must have "vintage_time" and "time" columns and columns matching y_truth. | required |
| **params | dict | Metadata to route to nested estimators. | {} |

Returns

| Type | Description |
| --- | --- |
| DataFrame or float or dict | Aggregated score(s). A float when aggregation_method="all", a pl.DataFrame for partial aggregations, or a dict mapping coverage rates to scores for interval scorers. |

Raises

| Type | Description |
| --- | --- |
| NotFittedError | If the scorer has not been fitted yet (when calibration is required). |
| ValueError | If y_truth and y_pred have mismatched columns or incompatible shapes. |

Source Code
@abc.abstractmethod
def score(
    self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, /, **params
) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
    """Compute the metric score.

    Parameters
    ----------
    y_truth : pl.DataFrame
        Ground truth time series to score against.  Must have a
        ``"time"`` column and one or more numeric value columns.
    y_pred : pl.DataFrame
        Predicted time series to evaluate.  Must have ``"vintage_time"``
        and ``"time"`` columns and columns matching ``y_truth``.
    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame or float or dict
        Aggregated score(s).  A ``float`` when
        ``aggregation_method="all"``, a ``pl.DataFrame`` for partial
        aggregations, or a ``dict`` mapping coverage rates to scores
        for interval scorers.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If the scorer has not been fitted yet (when calibration is
        required).
    ValueError
        If ``y_truth`` and ``y_pred`` have mismatched columns or
        incompatible shapes.

    """

__call__(y_truth, y_pred, **params)

Compute score using callable interface.

Enables using scorers as functions: scorer(y_truth, y_pred).

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| y_truth | DataFrame | Ground truth values. | required |
| y_pred | DataFrame | Predicted values. | required |
| **params | dict | Metadata to route to nested estimators. | {} |

Returns

| Type | Description |
| --- | --- |
| DataFrame or float or dict | Metric score. |

Source Code
def __call__(
    self, y_truth: pl.DataFrame, y_pred: pl.DataFrame, **params
) -> pl.DataFrame | float | dict[str | float, float | pl.DataFrame]:
    """Compute score using callable interface.

    Enables using scorers as functions: scorer(y_truth, y_pred).

    Parameters
    ----------
    y_truth : pl.DataFrame
        Ground truth values.

    y_pred : pl.DataFrame
        Predicted values.

    **params : dict
        Metadata to route to nested estimators.

    Returns
    -------
    pl.DataFrame or float or dict
        Metric score.

    """
    return self.score(y_truth, y_pred, **params)
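
The delegation pattern is small enough to show in isolation. The sketch below uses a toy abstract base class, not yohou's `BaseScorer`: `ToyScorer` and `MeanAbsoluteError` are hypothetical, and inputs are plain lists of floats rather than polars DataFrames.

```python
import abc


class ToyScorer(abc.ABC):
    """Toy base class: subclasses implement ``score``; calling delegates to it."""

    @abc.abstractmethod
    def score(self, y_truth: list[float], y_pred: list[float], /, **params) -> float:
        """Compute the metric."""

    def __call__(self, y_truth: list[float], y_pred: list[float], **params) -> float:
        # Same delegation as in the source: the instance behaves like a function
        return self.score(y_truth, y_pred, **params)


class MeanAbsoluteError(ToyScorer):
    """Hypothetical concrete scorer used only for illustration."""

    def score(self, y_truth: list[float], y_pred: list[float], /, **params) -> float:
        if len(y_truth) != len(y_pred):
            raise ValueError("y_truth and y_pred must have the same length")
        if not y_truth:
            raise ValueError("inputs must not be empty")
        return sum(abs(t - p) for t, p in zip(y_truth, y_pred)) / len(y_truth)


mae = MeanAbsoluteError()
via_call = mae([1.0, 2.0, 3.0], [1.5, 2.0, 2.0])
via_score = mae.score([1.0, 2.0, 3.0], [1.5, 2.0, 2.0])
```

Both calls return the same value. Because `score` declares its data arguments as positional-only (`/`), they are passed positionally in both forms.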