Nested model ensembling
Apologies if this is already covered by another feature request or issue: is there a way to nest global forecasting models so that they can be trained properly with covariates? For example, I have a trained XGBoost model that produces forecasts N steps ahead, and I now want to feed its outputs into TSMixer together with the same features that go into the XGBoost model. As I understand it, the current ensembling models do not support this, since the covariates of the forecasting model (XGBoost) are not passed to the regression model (TSMixer); only the forecasts themselves are. And because we are forecasting N steps ahead, we can't simply precompute the XGBoost outputs and pass them to TSMixer as a feature either, since they vary depending on which timestep we are predicting from.
Is this correct or am I missing something?
I have ideas on ways to handle it but want to confirm that this is indeed the case as of right now.
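To make the last point concrete, here is a minimal sketch in plain Python. The `naive_drift_forecast` function is a hypothetical stand-in for the trained XGBoost model (it is not a Darts API); the only thing it is meant to show is that the prediction for a given timestep depends on the origin the forecast was made from.

```python
def naive_drift_forecast(history, horizon):
    """Toy stand-in for a trained forecaster: extend the last observed step linearly."""
    slope = history[-1] - history[-2]
    return [history[-1] + slope * (step + 1) for step in range(horizon)]


series = [1.0, 2.0, 4.0, 7.0, 11.0, 16.0]
horizon = 3

# forecasts_by_origin[t] is the forecast made after observing series[:t];
# its entries cover the positions t, t + 1, ..., t + horizon - 1.
forecasts_by_origin = {
    t: naive_drift_forecast(series[:t], horizon) for t in range(2, len(series))
}


def forecast_for(target_idx, origin):
    """Value predicted for position `target_idx` when forecasting from `origin`."""
    return forecasts_by_origin[origin][target_idx - origin]


# The same target position gets a different prediction from every origin,
# so a single precomputed feature column cannot represent all of them.
for origin in (3, 4, 5):
    print(origin, forecast_for(5, origin))
```

Position 5 is predicted as 10.0, 13.0 and 15.0 from origins 3, 4 and 5 respectively, which is why the feature has to be looked up per training sample rather than per timestamp.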
Hi @tRosenflanz, sorry for the late reply.
You are correct that it is currently not possible to use covariates for the ensemble model in RegressionEnsembleModel, as .ensemble() only consumes the forecasting models' predictions. Additionally, the model specified should be a scikit-learn compatible regressor, because the implementation treats ensembling as a standard regression problem on the prediction values rather than as a full time series forecasting task. It therefore cannot natively support a global forecaster like TSMixer as the meta-learner. If you have any ideas to improve the logic, feel free to mention them so we can discuss further here :)
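As a rough illustration of what "a standard regression problem on the prediction values" means, here is a toy sketch in plain Python. It is not the RegressionEnsembleModel implementation; `fit_two_weight_ensemble` is a hypothetical helper that fits two combination weights by least squares. Note that the only inputs are the base models' predictions and the target, with no covariates anywhere.

```python
def fit_two_weight_ensemble(preds_a, preds_b, target):
    """Least-squares weights for target ~ w_a * preds_a + w_b * preds_b (no intercept)."""
    s_aa = sum(a * a for a in preds_a)
    s_bb = sum(b * b for b in preds_b)
    s_ab = sum(a * b for a, b in zip(preds_a, preds_b))
    s_ay = sum(a * y for a, y in zip(preds_a, target))
    s_by = sum(b * y for b, y in zip(preds_b, target))
    # Solve the 2x2 normal equations with Cramer's rule.
    det = s_aa * s_bb - s_ab ** 2
    w_a = (s_ay * s_bb - s_ab * s_by) / det
    w_b = (s_aa * s_by - s_ab * s_ay) / det
    return w_a, w_b


# Predictions of two hypothetical base forecasters on the same timesteps.
preds_a = [1.0, 2.0, 3.0, 4.0]
preds_b = [2.0, 2.0, 4.0, 4.0]
# Here the target is exactly the average of the two forecasts.
target = [1.5, 2.0, 3.5, 4.0]

w_a, w_b = fit_two_weight_ensemble(preds_a, preds_b, target)
print(round(w_a, 6), round(w_b, 6))
```

The meta-learner here has no notion of time or of the original features, which is the limitation discussed above.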
I think what you're trying to achieve is also possible by adapting the torch datasets. Check this issue, as it should be relevant. Note that the logic might differ slightly now that the code has been refactored (specifically, the "Covariates"-specific torch datasets were removed), but the general principle of chaining model outputs via datasets still applies.
Thanks! I didn't find this issue when searching, but it is very much along the lines of what I am thinking. The new dataset class should make it much simpler since there is just a single class to work with; minor differences between training and inference can be handled by inheriting from a single override class.
I will investigate further - ~~perhaps it can be adapted as an encoder to be a generic capability as well which would be even cleaner.~~ Encoders appear to be a one time operation pre-datasets so I don't think they work here.
The other option I was thinking of is to make a separate model class that would store both models and glue them together through a single module. It would require much more work and be restricted to TorchModels but will allow for more dynamic computation and perhaps even joint training.
For context, I would like to embed the newly introduced Chronos-2 (already in Darts!!!) into the training of Torch models as a sort of "baseline" anchor and compare that to fine-tuning Chronos itself. This kind of usage will probably become more popular with the rise of time series foundation models, so it is worth investigating now.
Alright. I have a solution that works and actually slightly improves the results for my models albeit at a cost of some feature sensitivity. I will leave this here if someone wants to experiment more:
This isn't the most elegant way of doing it, but adding a component to the future covariates that serves as an "alignment" feature makes it possible to bypass the memory_indexer in the Dataset classes while maintaining the shape of the numpy arrays. Here I used the date in nanoseconds for that. The series index is trivial to retrieve.
df["chronos_index_aligner"] = df["date"].astype("int64")  # include this column when generating the future covariates
`covariates` is a dict with three keys (`target`, `past_cov`, `future_cov`), each holding the list of series of that type.
import logging
import numpy as np
import pandas as pd
import darts.timeseries as ts

def remove_component(cov_list, component_to_remove):
    # returns a copy of each series without the specified component
    components = cov_list[0].components
    index_to_remove = components.get_loc(component_to_remove)
    components_new = [x for x in components if x != component_to_remove]
    res = []
    for series in cov_list:
        vals = np.delete(series.values(), index_to_remove, axis=1)
        series_new = ts.TimeSeries.from_times_and_values(
            times=series.time_index, values=vals, columns=components_new
        )
        res.append(series_new)
    return res
def get_model_hc_forecasts_as_dict(model, covariates, aligner_feature):
    # mute the dtype warnings
    log = logging.getLogger('darts')
    previous_level = log.level
    log.setLevel(logging.ERROR)
    # produce a forecast from every timestep it is possible to predict from
    hc = model.historical_forecasts(
        series=covariates['target'],
        past_covariates=covariates['past_cov'],
        future_covariates=remove_component(covariates['future_cov'], aligner_feature),
        forecast_horizon=model.output_chunk_length,
        stride=1,
        last_points_only=False,
        overlap_end=True,  # must be True to get future forecasts (confusing name)
        retrain=False,
        num_samples=1,
        verbose=True,
    )
    # restore the previous logging level
    log.setLevel(previous_level)
    hc_flattened = []
    # convert the nested lists to a dictionary keyed by (series index, forecast start)
    for i, series_hc in enumerate(hc):
        group_flattened = []
        for forecast in series_hc:
            min_date = forecast.start_time().asm8.astype(np.float64)  # date in nanoseconds
            vals = forecast.values().squeeze().tolist()
            group_flattened.append((i, min_date, vals, 0))
        group_df = pd.DataFrame(group_flattened, columns=['series_idx', 'date_as_int', 'model_prediction', 'dummy_zero'])
        hc_flattened.append(group_df)
    hc_flattened = pd.concat(hc_flattened)
    feature_dict = hc_flattened.set_index(['series_idx', 'date_as_int']).to_dict(orient='index')
    return feature_dict
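For readers who want to see the shape of the returned dictionary without running Darts, here is a plain-Python sketch. `flatten_forecasts` and the toy data are hypothetical; they only mimic what `set_index([...]).to_dict(orient='index')` produces for a two-level index, namely tuple keys mapping to one record per forecast.

```python
def flatten_forecasts(forecasts_per_series):
    """Map (series_idx, forecast_start) -> record, mimicking the dict built above."""
    feature_dict = {}
    for series_idx, forecasts in enumerate(forecasts_per_series):
        for start, values in forecasts:
            # The start time is stored as a float, like the float64 key above.
            feature_dict[(series_idx, float(start))] = {
                "model_prediction": list(values),
                "dummy_zero": 0,
            }
    return feature_dict


# One list of (start_time, predicted_values) pairs per series; values are made up.
toy_forecasts = [
    [(100, [1.0, 1.1]), (101, [1.2, 1.3])],  # series 0: two forecast origins
    [(100, [5.0, 5.1])],                     # series 1: one forecast origin
]

feature_dict = flatten_forecasts(toy_forecasts)
print(feature_dict[(0, 101.0)]["model_prediction"])
```

Each key identifies one forecast origin of one series, so a dataset sample only needs its series index and the first timestamp of its output chunk to find the matching predictions.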
Dataset override classes:
import numpy as np
# import path assumed for the refactored torch datasets; adjust to your Darts version
from darts.utils.data import SequentialTorchInferenceDataset, SequentialTorchTrainingDataset

class DatasetAugmenter:
    # mixin: list it before the Darts dataset class so that super() resolves to the dataset
    def __init__(self, feature_dict, feature_name, alignment_feature, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.feature_dict = feature_dict
        self.feature_name = feature_name
        self.alignment_feature = alignment_feature
        self._alignment_feature_index = self._get_component_index(self.future_covariates, alignment_feature)

    def __getitem__(self, idx):
        res = list(super().__getitem__(idx))
        # align with the feature dictionary using the alignment feature
        series_idx = self._get_series_idx(idx)
        date_idx = res[self.future_cov_index][0, self._alignment_feature_index]
        # retrieve the needed feature values (a list, one value per output step)
        feature_key = (series_idx, date_idx)
        feature_values = self.feature_dict[feature_key][self.feature_name]
        # replace the alignment feature with the feature values (zeros in the historic part)
        # copy first so the arrays held by the dataset are not modified in place
        for i, new_value in zip([self.future_cov_index, self.historic_future_cov_index], [feature_values, 0]):
            res[i] = np.copy(res[i])
            res[i][:, self._alignment_feature_index] = new_value
        return res

    def _get_series_idx(self, idx):
        raise NotImplementedError()

    def _get_component_index(self, series_list, component_name):
        series = series_list[0]  # check first series
        components = series.components
        return components.get_loc(component_name)
class AugmentedSequentialTorchTrainingDataset(DatasetAugmenter, SequentialTorchTrainingDataset):
    historic_future_cov_index = 2
    future_cov_index = 3

    def _get_series_idx(self, idx):
        return idx // self.max_samples_per_ts

class AugmentedSequentialTorchInferenceDataset(DatasetAugmenter, SequentialTorchInferenceDataset):
    historic_future_cov_index = 3
    future_cov_index = 4

    def _get_series_idx(self, idx):
        return idx
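The mixin pattern used by these classes can be seen in isolation in the following plain-Python sketch. `ToyWindowDataset`, `LookupAugmenter` and the data are all hypothetical stand-ins (no Darts or torch involved): the mixin calls `super().__getitem__`, reads the alignment value from the first future row, and swaps the alignment column for the looked-up values on a copy of the sample.

```python
class ToyWindowDataset:
    """Hypothetical stand-in for a torch dataset: yields (past, future) row windows."""

    def __init__(self, rows, input_len, output_len):
        self.rows = rows
        self.input_len = input_len
        self.output_len = output_len

    def __len__(self):
        return len(self.rows) - self.input_len - self.output_len + 1

    def __getitem__(self, idx):
        split = idx + self.input_len
        return self.rows[idx:split], self.rows[split:split + self.output_len]


class LookupAugmenter:
    """Mixin that replaces the alignment column with values from a lookup dict."""

    def __init__(self, lookup, key_col, *args, **kwargs):
        super().__init__(*args, **kwargs)  # forwards the rest to the dataset class
        self.lookup = lookup
        self.key_col = key_col

    def __getitem__(self, idx):
        past, future = super().__getitem__(idx)
        values = self.lookup[future[0][self.key_col]]
        # Copy the rows so the data owned by the dataset stays untouched.
        past = [list(row) for row in past]
        future = [list(row) for row in future]
        for row in past:
            row[self.key_col] = 0.0
        for row, value in zip(future, values):
            row[self.key_col] = value
        return past, future


class AugmentedToyDataset(LookupAugmenter, ToyWindowDataset):
    pass


# Each row is [ordinary_feature, alignment_value].
rows = [[10.0, 100.0], [11.0, 101.0], [12.0, 102.0], [13.0, 103.0], [14.0, 104.0]]
lookup = {102.0: [0.5, 0.6], 103.0: [0.7, 0.8]}

dataset = AugmentedToyDataset(lookup, 1, rows, input_len=2, output_len=2)
past, future = dataset[0]
print(past, future)
```

Because `LookupAugmenter` comes first in the bases, its `__init__` and `__getitem__` run before the dataset's, which is the same ordering the augmented Darts datasets above rely on.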
The augmented dataset classes can now be used as follows. I think any Darts model should work, but here is a test with Chronos:
from darts.models import Chronos2Model

model_chronos = Chronos2Model(
    input_chunk_length=100,
    output_chunk_length=5,
    output_chunk_shift=0,
    pl_trainer_kwargs={'precision': '16-mixed'},
)
kwargs_chronos = dict(
    input_chunk_length=model_chronos.input_chunk_length,
    output_chunk_length=model_chronos.output_chunk_length,
    output_chunk_shift=model_chronos.output_chunk_shift,
    use_static_covariates=model_chronos.supports_static_covariates,
)
# need to fit this model (doesn't actually train the model)
train_dataset = SequentialTorchTrainingDataset(
    covariates["target"],
    covariates["past_cov"],
    remove_component(covariates['future_cov'], 'chronos_index_aligner'),
    **kwargs_chronos,
)
model_chronos.fit_from_dataset(
    train_dataset,
    verbose=False,
)
feature_dict_train = get_model_hc_forecasts_as_dict(model_chronos, covariates, 'chronos_index_aligner')
augmented_train_dataset = AugmentedSequentialTorchTrainingDataset(
    feature_dict=feature_dict_train,
    feature_name='model_prediction',
    alignment_feature='chronos_index_aligner',
    series=covariates["target"],
    past_covariates=covariates["past_cov"],
    future_covariates=covariates['future_cov'],
    **kwargs_chronos,
)
I think the "alignment" feature is pretty easy to use and can be pretty much any value that is unique per timestep within each TimeSeries. The replacement operation is a bit expensive due to the copying. I bet there is a cleaner way to do it, but it was still very fast overall.
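One caveat on the choice of alignment value, shown as a stdlib-only sketch: the value travels through the covariate arrays as a float, so it has to stay unique after that conversion. The assumption here (not verified against Darts internals) is that a series stored as float32 would round the value like `as_float32` below; the helper names are hypothetical.

```python
import struct
from datetime import datetime, timedelta, timezone


def to_ns(moment):
    """Nanoseconds since the Unix epoch for a whole-second, timezone-aware datetime."""
    return int(moment.timestamp()) * 1_000_000_000


def as_float32(value):
    """Round a number to the nearest float32 and return it as a Python float."""
    return struct.unpack("f", struct.pack("f", float(value)))[0]


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = t0 + timedelta(seconds=1)

# float64 keeps timestamps one second apart distinct at this magnitude ...
key64_t0, key64_t1 = float(to_ns(t0)), float(to_ns(t1))
# ... while float32 rounds both to the same value, so the keys would collide.
key32_t0, key32_t1 = as_float32(to_ns(t0)), as_float32(to_ns(t1))

print(key64_t0 != key64_t1, key32_t0 == key32_t1)
```

With nanosecond timestamps from 2024, neighbouring float32 values are roughly 137 seconds apart, so keeping the covariates in float64 (as the `np.float64` key above does) or using a smaller alignment value such as a running integer index avoids collisions for high-frequency data.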