diff --git a/merlion/evaluate/forecast.py b/merlion/evaluate/forecast.py index 20d98c9ba..670c1efaa 100644 --- a/merlion/evaluate/forecast.py +++ b/merlion/evaluate/forecast.py @@ -20,7 +20,6 @@ from merlion.utils.resample import granularity_str_to_seconds -# TODO: support multivariate time series class ForecastScoreAccumulator: """ Accumulator which maintains summary statistics describing a forecasting @@ -35,6 +34,7 @@ def __init__( periodicity: int = 1, ub: TimeSeries = None, lb: TimeSeries = None, + target_seq_index: int = None, ): """ :param ground_truth: ground truth time series @@ -44,18 +44,28 @@ def __init__( whereas m>1 indicates seasonal time series. This value is used for computing MSES, MSIS. :param ub (optional): upper bound of 95% prediction interval. This value is used for computing MSIS :param lb (optional): lower bound of 95% prediction interval. This value is used for computing MSIS + :param target_seq_index (optional): the index of the target sequence, for multivariate. """ ground_truth = ground_truth.to_ts() if isinstance(ground_truth, UnivariateTimeSeries) else ground_truth predict = predict.to_ts() if isinstance(predict, UnivariateTimeSeries) else predict insample = insample.to_ts() if isinstance(insample, UnivariateTimeSeries) else insample t0, tf = predict.t0, predict.tf ground_truth = ground_truth.window(t0, tf, include_tf=True).align() + if target_seq_index is not None: + ground_truth = ground_truth.univariates[ground_truth.names[target_seq_index]].to_ts() + if insample is not None: + insample = insample.univariates[insample.names[target_seq_index]].to_ts() + else: + assert ground_truth.dim == 1 and ( + insample is None or insample.dim == 1 + ), "Expected to receive either univariate ground truth time series or non-None target_seq_index" self.ground_truth = ground_truth self.predict = predict.align(reference=ground_truth.time_stamps) self.insample = insample self.periodicity = periodicity self.ub = ub self.lb = lb + self.target_seq_index = target_seq_index def check_before_eval(self): # Make sure time series is univariate @@ -211,9 +221,16 @@ def accumulate_forecast_score( ub: TimeSeries = None, lb: TimeSeries = None, metric=None, + target_seq_index=None, ) -> Union[ForecastScoreAccumulator, float]: acc = ForecastScoreAccumulator( - ground_truth=ground_truth, predict=predict, insample=insample, periodicity=periodicity, ub=ub, lb=lb + ground_truth=ground_truth, + predict=predict, + insample=insample, + periodicity=periodicity, + ub=ub, + lb=lb, + target_seq_index=target_seq_index, ) return acc if metric is None else metric(acc) diff --git a/merlion/models/ensemble/combine.py b/merlion/models/ensemble/combine.py index fa31549d3..1b3af1458 100644 --- a/merlion/models/ensemble/combine.py +++ b/merlion/models/ensemble/combine.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 salesforce.com, inc. +# Copyright (c) 2022 salesforce.com, inc. # All rights reserved. # SPDX-License-Identifier: BSD-3-Clause # For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause @@ -92,7 +92,7 @@ def models_used(self) -> List[bool]: assert self.n_models is not None, "Combiner must be trained to determine which models are used" return [True] * self.n_models - def train(self, all_model_outs: List[TimeSeries], target: TimeSeries = None) -> TimeSeries: + def train(self, all_model_outs: List[TimeSeries], target: TimeSeries = None, **kwargs) -> TimeSeries: """ Trains the model combination rule. diff --git a/merlion/models/ensemble/forecast.py b/merlion/models/ensemble/forecast.py index b49f9237f..4400287ef 100644 --- a/merlion/models/ensemble/forecast.py +++ b/merlion/models/ensemble/forecast.py @@ -96,6 +96,9 @@ def train_pre_process(self, train_data: TimeSeries) -> TimeSeries: def resample_time_stamps(self, time_stamps: Union[int, List[int]], time_series_prev: TimeSeries = None): return time_stamps + def train_combiner(self, all_model_outs: List[TimeSeries], target: TimeSeries, **kwargs) -> TimeSeries: + return self.combiner.train(all_model_outs, target, target_seq_index=self.target_seq_index) + def _train( self, train_data: pd.DataFrame, train_config: EnsembleTrainConfig = None ) -> Tuple[Optional[TimeSeries], None]: diff --git a/tests/forecast/test_ets.py b/tests/forecast/test_ets.py index bc0f9e4db..e8ae700d5 100644 --- a/tests/forecast/test_ets.py +++ b/tests/forecast/test_ets.py @@ -102,9 +102,12 @@ def __init__(self, *args, **kwargs): self.model = AutoETS(AutoETSConfig(pval=0.1, error="add", trend="add", seasonal="add", damped_trend=True)) def _multi_setup(self): - vals = self.data.to_pd().values - self.data = TimeSeries.from_pd(pd.DataFrame(np.concatenate((vals, vals * 2), axis=1), columns=["A", "B"])) + x = self.data.to_pd() + self.data = TimeSeries.from_pd( + pd.DataFrame(np.concatenate((x.values, x.values * 2), axis=1), columns=["A", "B"], index=x.index) + ) self.train_data = self.data[: len(self.train_data)] + self.test_data = self.data[len(self.train_data) :] self.model.config.target_seq_index = 0 def test_univariate(self): @@ -118,16 +121,21 @@ def _test_forecast(self): # batch forecasting RMSE = 6.5612 _, _ = self.model.train(self.train_data) forecast, lb, ub = self.model.forecast(self.max_forecast_steps, return_iqr=True) - rmse = ForecastMetric.RMSE.value(self.test_data, forecast) + rmse = ForecastMetric.RMSE.value(self.test_data, forecast, target_seq_index=0) logger.info(f"RMSE = {rmse:.4f} for {self.max_forecast_steps} step forecasting") self.assertAlmostEqual(rmse, 6.5, delta=1) - rmspe = ForecastMetric.RMSPE.value(self.test_data, forecast) + rmspe = ForecastMetric.RMSPE.value(self.test_data, forecast, target_seq_index=0) logger.info(f"RMPSE = {rmspe:.4f} for {self.max_forecast_steps} step forecasting") - smape = ForecastMetric.sMAPE.value(self.test_data, forecast) + smape = ForecastMetric.sMAPE.value(self.test_data, forecast, target_seq_index=0) logger.info(f"sMAPE = {smape:.4f} for {self.max_forecast_steps} step forecasting") - insample = self.train_data.univariates[self.train_data.names[0]].to_ts() msis = ForecastMetric.MSIS.value( - ground_truth=self.test_data, predict=forecast, insample=insample, periodicity=4, ub=ub, lb=lb + ground_truth=self.test_data, + predict=forecast, + insample=self.train_data, + periodicity=4, + ub=ub, + lb=lb, + target_seq_index=0, ) logger.info(f"MSIS = {msis:.4f}") self.assertLessEqual(np.abs(msis - 101.6), 10) @@ -155,7 +163,7 @@ def _test_forecast(self): forecast_results = forecast forecast_results += forecast t += self.model.timedelta - rmse_onestep = ForecastMetric.RMSE.value(self.test_data, forecast_results) + rmse_onestep = ForecastMetric.RMSE.value(self.test_data, forecast_results, target_seq_index=0) logger.info(f"Streaming RMSE = {rmse_onestep:.4f} for {self.max_forecast_steps} step forecasting") self.assertAlmostEqual(rmse_onestep, 2.4, delta=1) diff --git a/tests/forecast/test_forecast_ensemble.py b/tests/forecast/test_forecast_ensemble.py index bf46ef3e6..5e0caa75c 100644 --- a/tests/forecast/test_forecast_ensemble.py +++ b/tests/forecast/test_forecast_ensemble.py @@ -10,6 +10,7 @@ import unittest import numpy as np +import pandas as pd from merlion.models.ensemble.forecast import ForecasterEnsemble, ForecasterEnsembleConfig from merlion.models.ensemble.combine import ModelSelector, Mean @@ -19,7 +20,7 @@ from merlion.models.factory import ModelFactory from merlion.transform.base import Identity from merlion.transform.resample import TemporalResample -from merlion.utils.data_io import csv_to_time_series +from merlion.utils.data_io import csv_to_time_series, TimeSeries logger = logging.getLogger(__name__) rootdir = dirname(dirname(dirname(abspath(__file__)))) @@ -37,13 +38,12 @@ def __init__(self, *args, **kwargs): def test_mean(self): print("-" * 80) model0 = Arima(ArimaConfig(order=(6, 1, 2), max_forecast_steps=50, transform=TemporalResample("1h"))) - model1 = Arima(ArimaConfig(order=(24, 1, 0), transform=TemporalResample("10min"), max_forecast_steps=50)) + model1 = Arima(ArimaConfig(order=(24, 1, 0), max_forecast_steps=50, transform=TemporalResample("10min"))) model2 = AutoProphet( config=AutoProphetConfig(transform=Identity(), periodicity_strategy=PeriodicityStrategy.Max) ) self.ensemble = ForecasterEnsemble( - models=[model0, model1, model2], - config=ForecasterEnsembleConfig(combiner=Mean(abs_score=False), target_seq_index=0), + models=[model0, model1, model2], config=ForecasterEnsembleConfig(combiner=Mean(abs_score=False)) ) self.expected_smape = 37 @@ -52,26 +52,36 @@ def test_mean(self): logger.info("test_mean\n" + "-" * 80 + "\n") self.run_test() - def test_selector(self): - print("-" * 80) + def _test_selector(self): model0 = Arima(ArimaConfig(order=(6, 1, 2), max_forecast_steps=50, transform=TemporalResample("1h"))) model1 = Arima(ArimaConfig(order=(24, 1, 0), transform=TemporalResample("10min"), max_forecast_steps=50)) model2 = AutoProphet( config=AutoProphetConfig(target_seq_index=0, transform=Identity(), periodicity_strategy="Max") ) self.ensemble = ForecasterEnsemble( - models=[model0, model1, model2], config=ForecasterEnsembleConfig(combiner=Mean(abs_score=False)) + config=ForecasterEnsembleConfig( + models=[model0, model1, model2], combiner=ModelSelector(metric=ForecastMetric.sMAPE), target_seq_index=0 + ) ) - self.expected_smape = 35 - logger.info("test_selector\n" + "-" * 80 + "\n") - self.ensemble.config.combiner = ModelSelector(metric=ForecastMetric.sMAPE) self.run_test() # We expect the model selector to select Prophet because it gets the lowest validation sMAPE valid_smapes = np.asarray(self.ensemble.combiner.metric_values) self.assertAlmostEqual(np.max(np.abs(valid_smapes - [34.32, 40.66, 30.71])), 0, delta=0.5) self.assertSequenceEqual(self.ensemble.models_used, [False, False, True]) + def test_univariate_selector(self): + print("-" * 80) + logger.info("test_univariate_selector\n" + "-" * 80 + "\n") + self._test_selector() + + def test_multivariate_selector(self): + x = self.vals_train.to_pd() + self.vals_train = TimeSeries.from_pd( + pd.DataFrame(np.concatenate((x.values, x.values * 2), axis=1), columns=["A", "B"], index=x.index) + ) + self._test_selector() + def run_test(self): logger.info("Training model...") self.ensemble.train(self.vals_train)