check_memory_bounded

yohou.testing.transformer.check_memory_bounded(transformer, X_train, X_test, y=None, n_updates=5)

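Check that a transformer's stored observations do not grow without bound under sequential updates.

This matters for production time series applications with continuous data streams. After each update, the observed buffer should stay near observation_horizon rows; the check allows up to twice that, int(2.0 * observation_horizon), before failing.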

Parameters

| Name | Type | Description | Default |
|---|---|---|---|
| transformer | BaseTransformer | Unfitted transformer | required |
| X_train | DataFrame | Training data (used for fit) | required |
| X_test | DataFrame | Test data (used for updates; must not overlap with the training data) | required |
| y | DataFrame | Target data | None |
| n_updates | int | Number of update iterations to test | 5 |

Raises

| Type | Description |
|---|---|
| AssertionError | If memory grows beyond expected bounds |
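
The contract being checked can be illustrated without the library. The following is a minimal sketch, not yohou code: `BoundedBuffer`, `UnboundedBuffer`, and `stays_bounded` are hypothetical names, plain lists stand in for DataFrames, and the only things borrowed from the source are the `observation_horizon` and `_X_observed` attribute names, the `observe` method name, and the factor of 2.0.

```python
from collections import deque


class BoundedBuffer:
    """Hypothetical stand-in for a transformer that keeps a rolling window."""

    def __init__(self, observation_horizon: int) -> None:
        self.observation_horizon = observation_horizon
        # deque(maxlen=...) discards the oldest rows once the window is full.
        self._X_observed = deque(maxlen=observation_horizon)

    def observe(self, rows: list) -> None:
        self._X_observed.extend(rows)


class UnboundedBuffer(BoundedBuffer):
    """Hypothetical stand-in that never discards old rows."""

    def __init__(self, observation_horizon: int) -> None:
        self.observation_horizon = observation_horizon
        self._X_observed = []  # a plain list grows with every update


def stays_bounded(buffer, chunks, max_memory_factor: float = 2.0) -> bool:
    """Return True if the buffer never exceeds horizon * max_memory_factor rows."""
    limit = int(buffer.observation_horizon * max_memory_factor)
    for chunk in chunks:
        buffer.observe(chunk)
        if len(buffer._X_observed) > limit:
            return False
    return True


# Five update chunks of four rows each.
chunks = [list(range(i, i + 4)) for i in range(0, 20, 4)]
bounded_ok = stays_bounded(BoundedBuffer(observation_horizon=3), chunks)
unbounded_ok = stays_bounded(UnboundedBuffer(observation_horizon=3), chunks)
```

With a horizon of 3 the limit is 6 rows. The rolling window never holds more than 3, so it passes, while the plain list reaches 8 rows on the second update and fails.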

Source Code

def check_memory_bounded(
    transformer,
    X_train: pl.DataFrame,
    X_test: pl.DataFrame,
    y: pl.DataFrame | None = None,
    n_updates: int = 5,
) -> None:
    """Check memory doesn't grow unbounded with sequential updates.

    Important for production time series applications with continuous
    data streams. Memory should stabilize at observation_horizon size.

    Parameters
    ----------
    transformer : BaseTransformer
        Unfitted transformer
    X_train : pl.DataFrame
        Training data (used for fit)
    X_test : pl.DataFrame
        Test data (used for updates - non-overlapping with training)
    y : pl.DataFrame, optional
        Target data
    n_updates : int
        Number of update iterations to test

    Raises
    ------
    AssertionError
        If memory grows beyond expected bounds

    """
    transformer_clone = clone(transformer)
    transformer_clone.fit(X_train, y)

    horizon = transformer_clone.observation_horizon
    # Allow the observed buffer to hold up to twice the observation horizon
    max_memory_factor = 2.0
    expected_max_rows = int(horizon * max_memory_factor)

    # Create update chunks from test data (non-overlapping with training)
    chunk_size = max(1, len(X_test) // n_updates)

    for i in range(n_updates):
        start_idx = i * chunk_size
        if start_idx >= len(X_test):
            break

        X_chunk = X_test.slice(start_idx, chunk_size)
        transformer_clone.observe(X_chunk)

        actual_rows = len(transformer_clone._X_observed)
        assert actual_rows <= expected_max_rows, (
            f"Memory grew beyond bounds after {i + 1} updates: "
            f"{actual_rows} rows > {expected_max_rows} (horizon={horizon})"
        )