Skip to content

Commit

Permalink
Make ForecastMetric work for multivar time series. (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
aadyotb authored Jul 15, 2022
1 parent 1d8f251 commit 3f7e800
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 22 deletions.
21 changes: 19 additions & 2 deletions merlion/evaluate/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from merlion.utils.resample import granularity_str_to_seconds


# TODO: support multivariate time series
class ForecastScoreAccumulator:
"""
Accumulator which maintains summary statistics describing a forecasting
Expand All @@ -35,6 +34,7 @@ def __init__(
periodicity: int = 1,
ub: TimeSeries = None,
lb: TimeSeries = None,
target_seq_index: int = None,
):
"""
:param ground_truth: ground truth time series
Expand All @@ -44,18 +44,28 @@ def __init__(
whereas m>1 indicates seasonal time series. This value is used for computing MSES, MSIS.
:param ub (optional): upper bound of 95% prediction interval. This value is used for computing MSIS
:param lb (optional): lower bound of 95% prediction interval. This value is used for computing MSIS
:param target_seq_index (optional): the index of the target sequence, for multivariate.
"""
ground_truth = ground_truth.to_ts() if isinstance(ground_truth, UnivariateTimeSeries) else ground_truth
predict = predict.to_ts() if isinstance(predict, UnivariateTimeSeries) else predict
insample = insample.to_ts() if isinstance(insample, UnivariateTimeSeries) else insample
t0, tf = predict.t0, predict.tf
ground_truth = ground_truth.window(t0, tf, include_tf=True).align()
if target_seq_index is not None:
ground_truth = ground_truth.univariates[ground_truth.names[target_seq_index]].to_ts()
if insample is not None:
insample = insample.univariates[insample.names[target_seq_index]].to_ts()
else:
assert ground_truth.dim == 1 and (
insample is None or insample.dim == 1
), "Expected to receive either univariate ground truth time series or non-None target_seq_index"
self.ground_truth = ground_truth
self.predict = predict.align(reference=ground_truth.time_stamps)
self.insample = insample
self.periodicity = periodicity
self.ub = ub
self.lb = lb
self.target_seq_index = target_seq_index

def check_before_eval(self):
# Make sure time series is univariate
Expand Down Expand Up @@ -211,9 +221,16 @@ def accumulate_forecast_score(
ub: TimeSeries = None,
lb: TimeSeries = None,
metric=None,
target_seq_index=None,
) -> Union[ForecastScoreAccumulator, float]:
acc = ForecastScoreAccumulator(
ground_truth=ground_truth, predict=predict, insample=insample, periodicity=periodicity, ub=ub, lb=lb
ground_truth=ground_truth,
predict=predict,
insample=insample,
periodicity=periodicity,
ub=ub,
lb=lb,
target_seq_index=target_seq_index,
)
return acc if metric is None else metric(acc)

Expand Down
4 changes: 2 additions & 2 deletions merlion/models/ensemble/combine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 salesforce.com, inc.
# Copyright (c) 2022 salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
Expand Down Expand Up @@ -92,7 +92,7 @@ def models_used(self) -> List[bool]:
assert self.n_models is not None, "Combiner must be trained to determine which models are used"
return [True] * self.n_models

def train(self, all_model_outs: List[TimeSeries], target: TimeSeries = None) -> TimeSeries:
def train(self, all_model_outs: List[TimeSeries], target: TimeSeries = None, **kwargs) -> TimeSeries:
"""
Trains the model combination rule.
Expand Down
3 changes: 3 additions & 0 deletions merlion/models/ensemble/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def train_pre_process(self, train_data: TimeSeries) -> TimeSeries:
def resample_time_stamps(self, time_stamps: Union[int, List[int]], time_series_prev: TimeSeries = None):
return time_stamps

def train_combiner(self, all_model_outs: List[TimeSeries], target: TimeSeries, **kwargs) -> TimeSeries:
return self.combiner.train(all_model_outs, target, target_seq_index=self.target_seq_index)

def _train(
self, train_data: pd.DataFrame, train_config: EnsembleTrainConfig = None
) -> Tuple[Optional[TimeSeries], None]:
Expand Down
24 changes: 16 additions & 8 deletions tests/forecast/test_ets.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ def __init__(self, *args, **kwargs):
self.model = AutoETS(AutoETSConfig(pval=0.1, error="add", trend="add", seasonal="add", damped_trend=True))

def _multi_setup(self):
vals = self.data.to_pd().values
self.data = TimeSeries.from_pd(pd.DataFrame(np.concatenate((vals, vals * 2), axis=1), columns=["A", "B"]))
x = self.data.to_pd()
self.data = TimeSeries.from_pd(
pd.DataFrame(np.concatenate((x.values, x.values * 2), axis=1), columns=["A", "B"], index=x.index)
)
self.train_data = self.data[: len(self.train_data)]
self.test_data = self.data[len(self.train_data) :]
self.model.config.target_seq_index = 0

def test_univariate(self):
Expand All @@ -118,16 +121,21 @@ def _test_forecast(self):
# batch forecasting RMSE = 6.5612
_, _ = self.model.train(self.train_data)
forecast, lb, ub = self.model.forecast(self.max_forecast_steps, return_iqr=True)
rmse = ForecastMetric.RMSE.value(self.test_data, forecast)
rmse = ForecastMetric.RMSE.value(self.test_data, forecast, target_seq_index=0)
logger.info(f"RMSE = {rmse:.4f} for {self.max_forecast_steps} step forecasting")
self.assertAlmostEqual(rmse, 6.5, delta=1)
rmspe = ForecastMetric.RMSPE.value(self.test_data, forecast)
rmspe = ForecastMetric.RMSPE.value(self.test_data, forecast, target_seq_index=0)
logger.info(f"RMPSE = {rmspe:.4f} for {self.max_forecast_steps} step forecasting")
smape = ForecastMetric.sMAPE.value(self.test_data, forecast)
smape = ForecastMetric.sMAPE.value(self.test_data, forecast, target_seq_index=0)
logger.info(f"sMAPE = {smape:.4f} for {self.max_forecast_steps} step forecasting")
insample = self.train_data.univariates[self.train_data.names[0]].to_ts()
msis = ForecastMetric.MSIS.value(
ground_truth=self.test_data, predict=forecast, insample=insample, periodicity=4, ub=ub, lb=lb
ground_truth=self.test_data,
predict=forecast,
insample=self.train_data,
periodicity=4,
ub=ub,
lb=lb,
target_seq_index=0,
)
logger.info(f"MSIS = {msis:.4f}")
self.assertLessEqual(np.abs(msis - 101.6), 10)
Expand Down Expand Up @@ -155,7 +163,7 @@ def _test_forecast(self):
forecast_results = forecast
forecast_results += forecast
t += self.model.timedelta
rmse_onestep = ForecastMetric.RMSE.value(self.test_data, forecast_results)
rmse_onestep = ForecastMetric.RMSE.value(self.test_data, forecast_results, target_seq_index=0)
logger.info(f"Streaming RMSE = {rmse_onestep:.4f} for {self.max_forecast_steps} step forecasting")
self.assertAlmostEqual(rmse_onestep, 2.4, delta=1)

Expand Down
30 changes: 20 additions & 10 deletions tests/forecast/test_forecast_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import unittest

import numpy as np
import pandas as pd

from merlion.models.ensemble.forecast import ForecasterEnsemble, ForecasterEnsembleConfig
from merlion.models.ensemble.combine import ModelSelector, Mean
Expand All @@ -19,7 +20,7 @@
from merlion.models.factory import ModelFactory
from merlion.transform.base import Identity
from merlion.transform.resample import TemporalResample
from merlion.utils.data_io import csv_to_time_series
from merlion.utils.data_io import csv_to_time_series, TimeSeries

logger = logging.getLogger(__name__)
rootdir = dirname(dirname(dirname(abspath(__file__))))
Expand All @@ -37,13 +38,12 @@ def __init__(self, *args, **kwargs):
def test_mean(self):
print("-" * 80)
model0 = Arima(ArimaConfig(order=(6, 1, 2), max_forecast_steps=50, transform=TemporalResample("1h")))
model1 = Arima(ArimaConfig(order=(24, 1, 0), transform=TemporalResample("10min"), max_forecast_steps=50))
model1 = Arima(ArimaConfig(order=(24, 1, 0), max_forecast_steps=50, transform=TemporalResample("10min")))
model2 = AutoProphet(
config=AutoProphetConfig(transform=Identity(), periodicity_strategy=PeriodicityStrategy.Max)
)
self.ensemble = ForecasterEnsemble(
models=[model0, model1, model2],
config=ForecasterEnsembleConfig(combiner=Mean(abs_score=False), target_seq_index=0),
models=[model0, model1, model2], config=ForecasterEnsembleConfig(combiner=Mean(abs_score=False))
)

self.expected_smape = 37
Expand All @@ -52,26 +52,36 @@ def test_mean(self):
logger.info("test_mean\n" + "-" * 80 + "\n")
self.run_test()

def test_selector(self):
print("-" * 80)
def _test_selector(self):
model0 = Arima(ArimaConfig(order=(6, 1, 2), max_forecast_steps=50, transform=TemporalResample("1h")))
model1 = Arima(ArimaConfig(order=(24, 1, 0), transform=TemporalResample("10min"), max_forecast_steps=50))
model2 = AutoProphet(
config=AutoProphetConfig(target_seq_index=0, transform=Identity(), periodicity_strategy="Max")
)
self.ensemble = ForecasterEnsemble(
models=[model0, model1, model2], config=ForecasterEnsembleConfig(combiner=Mean(abs_score=False))
config=ForecasterEnsembleConfig(
models=[model0, model1, model2], combiner=ModelSelector(metric=ForecastMetric.sMAPE), target_seq_index=0
)
)

self.expected_smape = 35
logger.info("test_selector\n" + "-" * 80 + "\n")
self.ensemble.config.combiner = ModelSelector(metric=ForecastMetric.sMAPE)
self.run_test()
# We expect the model selector to select Prophet because it gets the lowest validation sMAPE
valid_smapes = np.asarray(self.ensemble.combiner.metric_values)
self.assertAlmostEqual(np.max(np.abs(valid_smapes - [34.32, 40.66, 30.71])), 0, delta=0.5)
self.assertSequenceEqual(self.ensemble.models_used, [False, False, True])

def test_univariate_selector(self):
print("-" * 80)
logger.info("test_univariate_selector\n" + "-" * 80 + "\n")
self._test_selector()

def test_multivariate_selector(self):
x = self.vals_train.to_pd()
self.vals_train = TimeSeries.from_pd(
pd.DataFrame(np.concatenate((x.values, x.values * 2), axis=1), columns=["A", "B"], index=x.index)
)
self._test_selector()

def run_test(self):
logger.info("Training model...")
self.ensemble.train(self.vals_train)
Expand Down

0 comments on commit 3f7e800

Please sign in to comment.