M5 evaluation not working
I am trying to run the GluonTS tutorial on the M5 dataset, but the M5 evaluation code fails with the error shown below. Could someone please help me resolve it?
To Reproduce
https://github.com/awslabs/gluon-ts/blob/dev/examples/m5_gluonts_template.ipynb
%matplotlib inline
import mxnet as mx
from mxnet import gluon
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import os
from tqdm import tqdm
from pathlib import Path
# Horizon, in time steps, of a single forecast window.
single_prediction_length = 28
# Horizon used when producing a submission: two single windows back to back.
submission_prediction_length = single_prediction_length * 2
# Directory holding the M5 CSV files.
m5_input_path = "./m5-forecasting-accuracy"
# True -> forecast the submission horizon; False -> forecast one window only.
submission = True

# The model's forecast horizon follows the chosen mode.
prediction_length = (
    submission_prediction_length if submission else single_prediction_length
)
# Read the four M5 tables from `m5_input_path`, in the same order as before,
# and bind each one to its own module-level name.
calendar, sales_train_validation, sample_submission, sell_prices = (
    pd.read_csv(csv_path)
    for csv_path in (
        f'{m5_input_path}/calendar.csv',
        f'{m5_input_path}/sales_train_validation.csv',
        f'{m5_input_path}/sample_submission.csv',
        f'{m5_input_path}/sell_prices.csv',
    )
)
# Drop the calendar columns that are not used as dynamic features; the two
# event *type* columns are kept and binarised below.
cal_features = calendar.drop(
    ['date', 'wm_yr_wk', 'weekday', 'wday', 'month', 'year', 'event_name_1', 'event_name_2', 'd'],
    axis=1,
)

# Turn each event-type column into a 0/1 flag: 0 where the cell's string form
# is "nan" (i.e. the value is missing), 1 otherwise.
for event_col in ('event_type_1', 'event_type_2'):
    cal_features[event_col] = cal_features[event_col].apply(
        lambda x: 0 if str(x) == "nan" else 1
    )

# Transpose so that each row is one feature and each column one calendar row.
test_cal_features = cal_features.values.T
if submission:
    # Training features stop `submission_prediction_length` columns before the
    # end; the test features keep every column.
    train_cal_features = test_cal_features[:, :-submission_prediction_length]
else:
    # Training features additionally hold out one single-window horizon, and
    # the test features are trimmed by the submission horizon.
    train_cal_features = test_cal_features[
        :, :-submission_prediction_length - single_prediction_length
    ]
    test_cal_features = test_cal_features[:, :-submission_prediction_length]

# Every series shares the same calendar features, so the same array object is
# repeated once per row of the sales table.
test_cal_features_list = [test_cal_features] * len(sales_train_validation)
train_cal_features_list = [train_cal_features] * len(sales_train_validation)
def _category_codes(column):
    """Return the integer category codes of one column of the sales table."""
    return sales_train_validation[column].astype('category').cat.codes.values


# For each identifier column: integer codes per series, plus the distinct
# codes and how often each occurs.
state_ids = _category_codes("state_id")
state_ids_un, state_ids_counts = np.unique(state_ids, return_counts=True)

store_ids = _category_codes("store_id")
store_ids_un, store_ids_counts = np.unique(store_ids, return_counts=True)

cat_ids = _category_codes("cat_id")
cat_ids_un, cat_ids_counts = np.unique(cat_ids, return_counts=True)

dept_ids = _category_codes("dept_id")
dept_ids_un, dept_ids_counts = np.unique(dept_ids, return_counts=True)

item_ids = _category_codes("item_id")
item_ids_un, item_ids_counts = np.unique(item_ids, return_counts=True)

# One row per series, one column per identifier, in the order listed here.
stat_cat_list = [item_ids, dept_ids, cat_ids, store_ids, state_ids]
stat_cat = (
    np.concatenate(stat_cat_list)
    .reshape(len(stat_cat_list), len(item_ids))
    .T
)

# Number of distinct codes per identifier, in the same column order as `stat_cat`.
stat_cat_cardinalities = [
    len(distinct_codes)
    for distinct_codes in (
        item_ids_un,
        dept_ids_un,
        cat_ids_un,
        store_ids_un,
        state_ids_un,
    )
]
from gluonts.dataset.common import load_datasets, ListDataset
from gluonts.dataset.field_names import FieldName
# Keep only the value columns of the sales table; the dropped columns are the
# identifier columns.
train_df = sales_train_validation.drop(["id","item_id","dept_id","cat_id","store_id","state_id"], axis=1)
train_target_values = train_df.values

if submission:
    # Extend every series with `submission_prediction_length` NaNs at the end;
    # the training targets stay the full observed series.
    future_padding = np.full(submission_prediction_length, np.nan)
    test_target_values = [np.append(ts, future_padding) for ts in train_df.values]
else:
    # Test on the full observed series and train on everything except the
    # last `single_prediction_length` steps.
    test_target_values = train_target_values.copy()
    train_target_values = [ts[:-single_prediction_length] for ts in train_df.values]

# Every series starts on the same date; one start timestamp per series.
m5_dates = [pd.Timestamp("2011-01-29", freq='1D') for _ in range(len(sales_train_validation))]
def _build_list_dataset(targets, cal_features_list):
    """Assemble a daily-frequency ``ListDataset`` with one entry per series.

    Each entry combines a target series from ``targets`` with its start date
    from ``m5_dates``, its dynamic real features from ``cal_features_list``
    and its static categorical features from ``stat_cat``.
    """
    entries = []
    for target, start, fdr, fsc in zip(targets, m5_dates, cal_features_list, stat_cat):
        entries.append(
            {
                FieldName.TARGET: target,
                FieldName.START: start,
                FieldName.FEAT_DYNAMIC_REAL: fdr,
                FieldName.FEAT_STATIC_CAT: fsc,
            }
        )
    return ListDataset(entries, freq="D")


# Training and test sets differ only in their targets and calendar features.
train_ds = _build_list_dataset(train_target_values, train_cal_features_list)
test_ds = _build_list_dataset(test_target_values, test_cal_features_list)
from gluonts.model.deepar import DeepAREstimator
from gluonts.mx.distribution.neg_binomial import NegativeBinomialOutput
from gluonts.mx.trainer import Trainer
# Optimisation settings, kept separate from the model definition for readability.
deepar_trainer = Trainer(
    learning_rate=1e-3,
    epochs=100,
    num_batches_per_epoch=50,
    batch_size=32,
)

# DeepAR with a negative-binomial output, using both the dynamic real features
# and the static categorical features (with their cardinalities).
estimator = DeepAREstimator(
    prediction_length=prediction_length,
    freq="D",
    distr_output=NegativeBinomialOutput(),
    use_feat_dynamic_real=True,
    use_feat_static_cat=True,
    cardinality=stat_cat_cardinalities,
    trainer=deepar_trainer,
)

# Fit on the training dataset; the result is the predictor used below.
predictor = estimator.train(train_ds)
from gluonts.evaluation.backtest import make_evaluation_predictions
# Request 100 sample paths per series from the trained predictor; this returns
# one iterator over forecasts and one over the matching time series.
forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds, predictor=predictor, num_samples=100
)

# Progress-bar length for both loops below.
n_series = len(test_ds)

print("Obtaining time series conditioning values ...")
tss = list(tqdm(ts_it, total=n_series))

print("Obtaining time series predictions ...")
forecasts = list(tqdm(forecast_it, total=n_series))
# Force the evaluation branch regardless of the mode chosen earlier.
submission = False

if not submission:
    from gluonts.evaluation import Evaluator

    class M5Evaluator(Evaluator):
        """Evaluator that adds a scaled error to the base per-series metrics."""

        def get_metrics_per_ts(self, time_series, forecast):
            """Return the base per-series metrics plus an ``"RMSSE"`` entry.

            The point forecast is the mean over the sample axis of
            ``forecast.samples``. Its mean squared error against the last
            ``horizon`` values of ``time_series`` is divided by the mean
            squared one-step difference of the series (with the last
            ``horizon`` differences excluded), and the square root of that
            ratio is stored under ``"RMSSE"``.
            """
            # Flatten the series once; it is needed for both the scale and
            # the ground truth.
            series_values = time_series.values.reshape(len(time_series))

            pred_values = forecast.samples.mean(axis=0)
            # Take the horizon from the forecast itself so the metric does not
            # depend on a module-level variable staying in sync with it.
            horizon = len(pred_values)

            # Scale term: mean squared successive difference, leaving out the
            # last `horizon` differences.
            successive_diff = np.diff(series_values) ** 2
            denom = np.mean(successive_diff[:-horizon])

            true_values = series_values[-horizon:]
            num = np.mean((pred_values - true_values) ** 2)

            # Root of the scaled squared error; without the square root the
            # value is the squared metric, not the RMSSE.
            rmsse = np.sqrt(num / denom)

            metrics = super().get_metrics_per_ts(time_series, forecast)
            metrics["RMSSE"] = rmsse
            return metrics

        def get_aggregate_metrics(self, metric_per_ts):
            """Return the base aggregate metrics plus ``"MRMSSE"``.

            ``"MRMSSE"`` is the unweighted mean of the per-series ``"RMSSE"``
            column. The per-series table is returned unchanged as the second
            element.
            """
            mean_rmsse = metric_per_ts["RMSSE"].mean()
            agg_metric, _ = super().get_aggregate_metrics(metric_per_ts)
            agg_metric["MRMSSE"] = mean_rmsse
            return agg_metric, metric_per_ts

    evaluator = M5Evaluator(quantiles=[0.5, 0.67, 0.95, 0.99])
    agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(test_ds))
    print(json.dumps(agg_metrics, indent=4))
Error
Running evaluation: 100%|██████████| 30490/30490 [00:00<00:00, 111656.51it/s]
Output exceeds the size limit. Open the full output data in a text editor
---------------------------------------------------------------------------
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/multiprocess/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/multiprocess/pool.py", line 48, in mapstar
return list(map(*args))
File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/gluonts/evaluation/_base.py", line 56, in worker_function
File "/var/folders/gr/ff8nqlbn5yj2xyvj129ggsfh0000gp/T/ipykernel_45492/2865651275.py", line 16, in get_metrics_per_ts
metrics = super().get_metrics_per_ts(time_series, forecast)
File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/gluonts/evaluation/_base.py", line 318, in get_metrics_per_ts
try:
ValueError: operands could not be broadcast together with shapes (28,) (56,)
"""
The above exception was the direct cause of the following exception:
ValueError Traceback (most recent call last)
/Users/poulamisarkar/Documents/TUM/sem2/seminar/glutons /m5_gluonts_template.ipynb Cell 26' in <cell line: 1>()
24 return agg_metric, metric_per_ts
27 evaluator = M5Evaluator(quantiles=[0.5, 0.67, 0.95, 0.99])
---> 28 agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(test_ds))
29 print(json.dumps(agg_metrics, indent=4))
...
769 return self._value
770 else:
--> 771 raise self._value
ValueError: operands could not be broadcast together with shapes (28,) (56,)
Environment
- Operating system: MACOS
- Python version: 3.9.12
- GluonTS version: 0.9.4
@Poulami-Sarkar hi, thanks for raising the issue! I cannot reproduce the issue locally.
For brevity, I'm running the example using epochs = 3 in the estimator, and reducing the test dataset in size after it's created, with
from itertools import islice
test_ds = list(islice(test_ds, 20))
This way the code runs in 20 seconds or so. Could you confirm that you get the error also with these changes?
Hello,
I tried the changes you suggested, but now I am getting a new error:
`Running evaluation: 0%| | 0/20 [00:00<?, ?it/s]
Traceback (most recent call last):
File "
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
100%|██████████████████████`
@Poulami-Sarkar looks like there are some issues with multiprocessing. You can try two things to fix this:
Running the following at the top of your script/notebook
import multiprocessing
multiprocessing.set_start_method('fork')
Or setting
num_workers=None
when constructing the Evaluator.