models icon indicating copy to clipboard operation
models copied to clipboard

[BUG] Training TwoTowerModel when loading dataset using `part_size=128MB` crashes

Open mats-claassen opened this issue 2 years ago • 11 comments

Describe the bug I am trying to train a TwoTowerModel. I load the datasets (train and val) using NVTabular.Dataset and later pass them to model.fit. When I add the part_size="128MB" to the Dataset instantiation call, then model training will crash. If I remove that it works (unless the dataset is too big and I get OOM). I am not sure if this is a bug in NVTabular or in the Merlin models implementation.

Loading datasets:

    train_ds = Dataset(os.path.join(data_folder, "train", "*.parquet"), part_size="128MB")
    valid_ds = Dataset(os.path.join(data_folder, "valid", "*.parquet"), part_size="128MB")

The crash log looks like:

Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/app/src/train.py", line 99, in <module>
    train(config, tracker, data_folder, args.models_folder)
  File "/app/src/train.py", line 65, in train
    model.fit(train_ds, validation_data=valid_ds, batch_size=config['train_config']['batch_size'],
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 969, in fit
    out = super().fit(**fit_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 1262, in call
    outputs, context = self._call_child(block, outputs, context)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 1291, in _call_child
    outputs = call_layer(child, inputs, **call_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/tabular.py", line 478, in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 566, in call
    out = call_layer(layer, layer_inputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/models/base.py", line 139, in call
    outputs = call_layer(self.block, inputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 269, in call
    return call_sequentially(self.layers, inputs, training=training, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 836, in call_sequentially
    outputs = call_layer(layer, outputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/tabular.py", line 478, in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py", line 566, in call
    out = call_layer(layer, layer_inputs, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py", line 437, in call_layer
    return layer(inputs, *args, **filtered_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/tabular.py", line 478, in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/config/schema.py", line 58, in __call__
    return super().__call__(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/inputs/embedding.py", line 874, in call
    embedded_outputs[name] = self.lookup_feature(name, val)
  File "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/inputs/embedding.py", line 900, in lookup_feature
    out = tf.nn.safe_embedding_lookup_sparse(table_var, val, None, combiner=table.combiner)
ValueError: Exception encountered when calling layer "embedding_features_1" "                 f"(type EmbeddingFeatures).

Dimensions 27825 and 1 are not compatible

Call arguments received by layer "embedding_features_1" "                 f"(type EmbeddingFeatures):
  • inputs={'X_ids': '<tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7fa8fe2f1ca0>'}
  • kwargs={'training': 'False', 'testing': 'False', 'features': {'X_ids': ('tf.Tensor(shape=(27825, 1), dtype=int32)', 'tf.Tensor(shape=(512, 1), dtype=int32)')}}

The error is slightly different on each run but still similar.

I also got similar errors when storing the NVTabular dataset into multiple parquet files (using .to_parquet(..., out_files_per_proc=X). The error to happen only when using multi-hot columns.

Steps/Code to reproduce bug I don't have a small reproducible example right now.

Expected behavior I would expect the model to train no matter in how many files the dataset is split or whether I set the part_size parameter of NVTabular.Dataset or not

Environment details (please complete the following information):

  • Environment location: Docker
  • Method of NVTabular install: Docker
    • docker pull: nvcr.io/nvidia/merlin/merlin-tensorflow:22.12
    • I then install some additional dependencies such as: clearml, jupyterlab, PyYAML
    • docker run --rm -it -d --ipc=host --gpus=all -p 8889:8888 -e CUDA_VISIBLE_DEVICES=0 -v $PWD:/app/ <image_name>

mats-claassen avatar Mar 17 '23 18:03 mats-claassen

@mats-claassen can you give us more info?

  • what dataset are you using? the datasets we provide in our examples or this is your custom dataset?

  • what's the size of your train set? is it a single parquet file or it consists of multiple parquet files? if your parquet file size consists of one row group and if it is row group memory size large then you might have OOM issues. in that case you can repartition your train parquet file or reset the number of row group size in your parquet file and save it back to disk. Note that row group memory size should be smaller than part_size arg. Please refer to this doc for more info. So this is mostly likely not a bug, this is related to your parquet file row group memory size and part_size value you set. you can increase the part_size value and test again.

  • what GPU are you using and what's its memory?

rnyak avatar Mar 29 '23 00:03 rnyak

Dimensions 27825 and 1 are not compatible

This looks like an issue with how the data aligns with the model (which doesn't seem like it would directly have to do with the part_size), so I'm transferring this issue over to the Models repo

karlhigley avatar Mar 29 '23 03:03 karlhigley

@rnyak Here go my answers:

  • what dataset are you using?

I am using the MovieLens 25M augmented with some additional columns from TMDb. So it is custom but close to your example datasets

  • what's the size of your train set? is it a single parquet file or it consists of multiple parquet files?

I have tested with different configurations for this. I have seen warning when I set part_size too small. I am not seeing them currently. The train set is between 70Mb and 300MB+ depending on sample rate and which columns I include in preprocessing. When it is close to or above 300 I start getting OOM during training when the dataset is split into a single parquet file. I have also tried with saving the dataset into multiple parquet files but then I have always faced this same crash when there is at least 1 multi-hot column in the dataset.

  • what GPU are you using and what's its memory?

It's a GTX1070 with 8Gb memory

mats-claassen avatar Mar 29 '23 11:03 mats-claassen

at least 1 multi-hot column in the dataset

This seems like a key point. Are you using an NVT Workflow to pre-process the training data? If so, are the multi-hot features sliced/padded with ListSlice or passed through the ValueCount op to determine the lengths of the list features?

karlhigley avatar Mar 29 '23 15:03 karlhigley

@karlhigley They are not

mats-claassen avatar Mar 29 '23 17:03 mats-claassen

@mats-claassen if you have list features, i.e. multi-hot, as @karlhigley mentioned you can tag them via ListSlice with pad=True or ValueCount op. here is the example for you to take as a reference script. Please see cell 12.

Hope that helps. If you still have OOM issue can you pls share your dataset together with your NVT workflow script so we can take a look at it?

rnyak avatar Mar 30 '23 16:03 rnyak

So I added ValueCount op to the categorical list features but I am still getting errors during training like: W tensorflow/core/framework/op_kernel.cc:1780] OP_REQUIRES failed at strided_slice_op.cc:105 : INVALID_ARGUMENT: Expected begin, end, and strides to be 1D equal size tensors, but got shapes [2], [16643], and [2] instead..

or: tensorflow.python.framework.errors_impl.InvalidArgumentError: {{function_node __wrapped__StridedSlice_device_/job:localhost/replica:0/task:0/device:GPU:0}} Index out of range using input dim 1; input has only 1 dims [Op:StridedSlice] name: strided_slice/

This error is not always the same (running the same script over and over). You can get dataset and NVTabular workflow here

mats-claassen avatar Apr 03 '23 14:04 mats-claassen

@mats-claassen where do you get the error from? it is in the NVT workflow? or you get the error from Two-Tower model training pipeline?

I checked your code and after changing it a bit, it worked for me. you can see the changes I made in the attached zip. nvt.ipynb.zip

rnyak avatar Apr 04 '23 00:04 rnyak

Hey, it is during Two Tower model training. Up to that point it works fine

mats-claassen avatar Apr 04 '23 11:04 mats-claassen

Hey, it is during Two Tower model training. Up to that point it works fine

ok. can you share your TT model code pls? you did not share it if I am not mistaken, you shared your nvt pipeline code.

rnyak avatar Apr 04 '23 19:04 rnyak

Here is my train script:

from argparse import ArgumentParser
import os

# prevent TF to claim all GPU memory
from merlin.dataloader.tf_utils import configure_tensorflow
configure_tensorflow(0.4)

from merlin.models.tf import RecallAt, NDCGAt
from nvtabular import Dataset, Workflow
from nvtabular.loader.tensorflow import KerasSequenceLoader
import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
import merlin.models.tf as mm
from tensorflow.keras import regularizers


def parse_arguments():
    parser = ArgumentParser()
    parser.add_argument("--dataset_name", type=str, required=True)
    parser.add_argument("--models_folder", type=str, required=True)
    return parser.parse_args()


def train(data_folder: str, models_folder: str):
    batch_size = 512

    CATEGORICAL_COLUMNS = ["target_id", "collection", "original_language", "director"]
    CATEGORICAL_MH_COLUMNS = ["user_history", "genres", ]  # "protagonist", "producers", "production_countries"
    NUMERIC_COLUMNS = ["vote_average", "vote_count", "popularity", "avg_cast_popularity", "avg_crew_popularity",
                       "runtime", "release_date",]

    # train_ds = Dataset(os.path.join(data_folder, "train", "*.parquet"), part_size="128MB")
    train_ds = KerasSequenceLoader(
        os.path.join(data_folder, "train", "*.parquet"),  # you could also use a glob pattern
        batch_size=batch_size,
        label_names=["rating"],
        cat_names=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
        cont_names=NUMERIC_COLUMNS,
        engine="parquet",
        shuffle=True,
        buffer_size=0.06,  # how many batches to load at once
        parts_per_chunk=1,
        # sparse_names=CATEGORICAL_MH_COLUMNS,
        # sparse_max={'genres': 20, 'user_history': 100},
        # sparse_as_dense=True
    )

    valid_ds = Dataset(os.path.join(data_folder, "valid", "*.parquet"))  # , part_size="128MB")
    schema = train_ds.schema.without(['rating', 'user_id_raw', 'item_id_raw', 'target_id_raw'])

    epochs = 20
    # Build Model
    bias_regularizer = regularizers.L2(1e-5)
    kernel_reg = regularizers.L1L2(l1=1e-7, l2=1e-5)
    query_tower = mm.MLPBlock([64, 16], no_activation_last_layer=True, dropout=0.1,
                              kernel_regularizer=kernel_reg,
                              bias_regularizer=bias_regularizer)
    item_tower = mm.MLPBlock([128, 16], no_activation_last_layer=True, dropout=0.1,
                             kernel_regularizer=kernel_reg,
                             bias_regularizer=bias_regularizer)
    model = mm.TwoTowerModel(schema,
                query_tower=query_tower,
                item_tower=item_tower,
                samplers=[mm.InBatchSampler()],
                embedding_options=mm.EmbeddingOptions(infer_embedding_sizes=True),
            )
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)
    model.compile(optimizer=optimizer,
                  loss="categorical_crossentropy",
                  metrics=[RecallAt(10), RecallAt(100), NDCGAt(10)])

    callbacks = [EarlyStopping(monitor='val_loss', mode='min', patience=5)]

    model.fit(train_ds, validation_data=valid_ds, batch_size=batch_size,
              epochs=epochs, shuffle=True, drop_last=True,
              train_metrics_steps=20, callbacks=callbacks)


if __name__ == "__main__":
    args = parse_arguments()
    data_folder = args.dataset_name
    train(data_folder, args.models_folder)

mats-claassen avatar Apr 05 '23 12:04 mats-claassen