[BUG] Training TwoTowerModel crashes when loading dataset with `part_size="128MB"`
**Describe the bug**

I am trying to train a TwoTowerModel. I load the train and validation datasets using `nvtabular.Dataset` and later pass them to `model.fit`. When I add `part_size="128MB"` to the `Dataset` instantiation, model training crashes. If I remove it, training works (unless the dataset is too big, in which case I get an OOM error). I am not sure whether this is a bug in NVTabular or in the Merlin Models implementation.
Loading datasets:

```python
train_ds = Dataset(os.path.join(data_folder, "train", "*.parquet"), part_size="128MB")
valid_ds = Dataset(os.path.join(data_folder, "valid", "*.parquet"), part_size="128MB")
```
The crash log looks like:

```
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/app/src/train.py", line 99, in <module>
    train(config, tracker, data_folder, args.models_folder)
  File "/app/src/train.py", line 65, in train
    model.fit(train_ds, validation_data=valid_ds, batch_size=config['train_config']['batch_size'],
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 969, in fit
    out = super().fit(**fit_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 1262, in call
    outputs, context = self._call_child(block, outputs, context)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 1291, in _call_child
    outputs = call_layer(child, inputs, **call_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/tabular.py", line 478, in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 566, in call
    out = call_layer(layer, layer_inputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 139, in call
    outputs = call_layer(self.block, inputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 269, in call
    return call_sequentially(self.layers, inputs, training=training, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 836, in call_sequentially
    outputs = call_layer(layer, outputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/tabular.py", line 478, in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 566, in call
    out = call_layer(layer, layer_inputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/tabular.py", line 478, in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/inputs/embedding.py", line 874, in call
    embedded_outputs[name] = self.lookup_feature(name, val)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/inputs/embedding.py", line 900, in lookup_feature
    out = tf.nn.safe_embedding_lookup_sparse(table_var, val, None, combiner=table.combiner)
ValueError: Exception encountered when calling layer "embedding_features_1" (type EmbeddingFeatures).

Dimensions 27825 and 1 are not compatible

Call arguments received by layer "embedding_features_1" (type EmbeddingFeatures):
  • inputs={'X_ids': '<tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7fa8fe2f1ca0>'}
  • kwargs={'training': 'False', 'testing': 'False', 'features': {'X_ids': ('tf.Tensor(shape=(27825, 1), dtype=int32)', 'tf.Tensor(shape=(512, 1), dtype=int32)')}}
```
The error is slightly different on each run but still similar.
I also got similar errors when storing the NVTabular dataset into multiple parquet files (using `.to_parquet(..., out_files_per_proc=X)`).

The error seems to happen only when using multi-hot columns.
**Steps/Code to reproduce bug**

I don't have a small reproducible example right now.

**Expected behavior**

I would expect the model to train regardless of how many files the dataset is split into, and regardless of whether I set the `part_size` parameter of `nvtabular.Dataset`.
**Environment details (please complete the following information):**

- Environment location: Docker
- Method of NVTabular install: Docker
  - `docker pull nvcr.io/nvidia/merlin/merlin-tensorflow:22.12`
  - I then install some additional dependencies such as clearml, jupyterlab, and PyYAML
  - `docker run --rm -it -d --ipc=host --gpus=all -p 8889:8888 -e CUDA_VISIBLE_DEVICES=0 -v $PWD:/app/ <image_name>`
@mats-claassen can you give us more info?

- What dataset are you using? The datasets we provide in our examples, or is this a custom dataset?
- What's the size of your train set? Is it a single parquet file, or does it consist of multiple parquet files? If your parquet file consists of a single row group, and that row group is large in memory, you might hit OOM issues. In that case you can repartition your train parquet file, or reset the row group size and save it back to disk. Note that the row group memory size should be smaller than the `part_size` arg. Please refer to this doc for more info. So this is most likely not a bug; it is related to your parquet file's row group memory size and the `part_size` value you set. You can increase the `part_size` value and test again.
- What GPU are you using and what's its memory?
> Dimensions 27825 and 1 are not compatible

This looks like an issue with how the data aligns with the model (which doesn't seem like it would be directly related to `part_size`), so I'm transferring this issue over to the Models repo.
@rnyak Here are my answers:

- What dataset are you using?
  I am using MovieLens 25M augmented with some additional columns from TMDb. So it is custom, but close to your example datasets.
- What's the size of your train set? Is it a single parquet file or does it consist of multiple parquet files?
  I have tested different configurations for this. I have seen warnings when I set `part_size` too small, but I am not seeing them currently. The train set is between 70 MB and 300+ MB depending on the sample rate and which columns I include in preprocessing. When it is close to or above 300 MB, I start getting OOM during training when the dataset is stored in a single parquet file. I have also tried saving the dataset into multiple parquet files, but then I have always hit this same crash whenever there is at least one multi-hot column in the dataset.
- What GPU are you using and what's its memory?
  It's a GTX 1070 with 8 GB of memory.
> at least one multi-hot column in the dataset

This seems like a key point. Are you using an NVT Workflow to pre-process the training data? If so, are the multi-hot features sliced/padded with `ListSlice` or passed through the `ValueCount` op to determine the lengths of the list features?
@karlhigley They are not
@mats-claassen if you have list features, i.e. multi-hot, then as @karlhigley mentioned you can process them via `ListSlice` with `pad=True` or via the `ValueCount` op. Here is the example for you to take as a reference script. Please see cell 12.

Hope that helps. If you still have an OOM issue, can you please share your dataset together with your NVT workflow script so we can take a look at it?
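For readers unfamiliar with what `ListSlice(..., pad=True)` does to multi-hot columns, here is a plain-Python sketch of the truncate-and-pad behavior (the function name and pad value are illustrative, not NVTabular's actual implementation):

```python
def list_slice_pad(rows, start, end, pad_value=0):
    """Truncate each list to [start, end) and right-pad it so every
    row ends up with exactly (end - start) values."""
    length = end - start
    out = []
    for row in rows:
        sliced = list(row[start:end])
        out.append(sliced + [pad_value] * (length - len(sliced)))
    return out

# Ragged multi-hot rows become a dense, fixed-width structure:
print(list_slice_pad([[3, 7, 9, 2], [5]], 0, 3))  # [[3, 7, 9], [5, 0, 0]]
```

Giving every row the same length is what lets the dataloader hand the model tensors of a consistent shape, which is why unpadded list features can surface as shape-mismatch errors deep inside the embedding lookup.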
So I added the `ValueCount` op to the categorical list features, but I am still getting errors during training like:

```
W tensorflow/core/framework/op_kernel.cc:1780] OP_REQUIRES failed at strided_slice_op.cc:105 : INVALID_ARGUMENT: Expected begin, end, and strides to be 1D equal size tensors, but got shapes [2], [16643], and [2] instead.
```

or:

```
tensorflow.python.framework.errors_impl.InvalidArgumentError: {{function_node __wrapped__StridedSlice_device_/job:localhost/replica:0/task:0/device:GPU:0}} Index out of range using input dim 1; input has only 1 dims [Op:StridedSlice] name: strided_slice/
```

The error is not always the same when running the same script over and over. You can get the dataset and NVTabular workflow here.
@mats-claassen where do you get the error from? Is it in the NVT workflow, or do you get it from the Two-Tower model training pipeline?

I checked your code, and after changing it a bit it worked for me. You can see the changes I made in the attached zip: nvt.ipynb.zip
Hey, it is during Two Tower model training. Up to that point it works fine.

> Hey, it is during Two Tower model training. Up to that point it works fine.

Ok. Can you share your TT model code please? You did not share it, if I am not mistaken; you shared your nvt pipeline code.
Here is my train script:
```python
from argparse import ArgumentParser
import os

# prevent TF from claiming all GPU memory
from merlin.dataloader.tf_utils import configure_tensorflow
configure_tensorflow(0.4)

from merlin.models.tf import RecallAt, NDCGAt
from nvtabular import Dataset, Workflow
from nvtabular.loader.tensorflow import KerasSequenceLoader
import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
import merlin.models.tf as mm
from tensorflow.keras import regularizers


def parse_arguments():
    parser = ArgumentParser()
    parser.add_argument("--dataset_name", type=str, required=True)
    parser.add_argument("--models_folder", type=str, required=True)
    return parser.parse_args()


def train(data_folder: str, models_folder: str):
    batch_size = 512
    CATEGORICAL_COLUMNS = ["target_id", "collection", "original_language", "director"]
    CATEGORICAL_MH_COLUMNS = ["user_history", "genres"]  # "protagonist", "producers", "production_countries"
    NUMERIC_COLUMNS = ["vote_average", "vote_count", "popularity", "avg_cast_popularity",
                       "avg_crew_popularity", "runtime", "release_date"]

    # train_ds = Dataset(os.path.join(data_folder, "train", "*.parquet"), part_size="128MB")
    train_ds = KerasSequenceLoader(
        os.path.join(data_folder, "train", "*.parquet"),  # you could also use a glob pattern
        batch_size=batch_size,
        label_names=["rating"],
        cat_names=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
        cont_names=NUMERIC_COLUMNS,
        engine="parquet",
        shuffle=True,
        buffer_size=0.06,  # how many batches to load at once
        parts_per_chunk=1,
        # sparse_names=CATEGORICAL_MH_COLUMNS,
        # sparse_max={'genres': 20, 'user_history': 100},
        # sparse_as_dense=True
    )
    valid_ds = Dataset(os.path.join(data_folder, "valid", "*.parquet"))  # , part_size="128MB"

    schema = train_ds.schema.without(['rating', 'user_id_raw', 'item_id_raw', 'target_id_raw'])
    epochs = 20

    # Build Model
    bias_regularizer = regularizers.L2(1e-5)
    kernel_reg = regularizers.L1L2(l1=1e-7, l2=1e-5)
    query_tower = mm.MLPBlock([64, 16], no_activation_last_layer=True, dropout=0.1,
                              kernel_regularizer=kernel_reg,
                              bias_regularizer=bias_regularizer)
    item_tower = mm.MLPBlock([128, 16], no_activation_last_layer=True, dropout=0.1,
                             kernel_regularizer=kernel_reg,
                             bias_regularizer=bias_regularizer)
    model = mm.TwoTowerModel(schema,
                             query_tower=query_tower,
                             item_tower=item_tower,
                             samplers=[mm.InBatchSampler()],
                             embedding_options=mm.EmbeddingOptions(infer_embedding_sizes=True))
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)
    model.compile(optimizer=optimizer,
                  loss="categorical_crossentropy",
                  metrics=[RecallAt(10), RecallAt(100), NDCGAt(10)])
    callbacks = [EarlyStopping(monitor='val_loss', mode='min', patience=5)]
    model.fit(train_ds, validation_data=valid_ds, batch_size=batch_size,
              epochs=epochs, shuffle=True, drop_last=True,
              train_metrics_steps=20, callbacks=callbacks)


if __name__ == "__main__":
    args = parse_arguments()
    data_folder = args.dataset_name
    train(data_folder, args.models_folder)
```