fuzzy_dedup OOM issue
Describe the bug
Using 5×A100 GPUs to run a fuzzy_dedup task, I encountered OOM issues. Here is the error info:
2024-12-31 05:02:43,370 - distributed.worker - ERROR - Could not serialize object of type DataFrame
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 366, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 52, in dask_dumps
sub_header, frames = dumps(x)
File "/usr/local/lib/python3.10/dist-packages/cudf/comm/serialize.py", line 19, in dask_serialize_cudf_object
return x.host_serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 150, in host_serialize
header, frames = self.device_serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 90, in device_serialize
header, frames = self.serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/core/dataframe.py", line 1171, in serialize
header, frames = super().serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/cudf/core/frame.py", line 100, in serialize
header["columns"], frames = serialize_columns(self._columns)
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in serialize_columns
header_columns = [c.serialize() for c in columns]
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in <listcomp>
header_columns = [c.serialize() for c in columns]
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 1226, in serialize
if self.children:
File "column.pyx", line 293, in cudf._lib.column.Column.children.__get__
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /tmp/pip-build-env-fkfud_57/normal/lib/python3.10/site-packages/librmm/include/rmm/mr/device/cuda_memory_resource.hpp:60: cudaErrorMemoryAllocation out of memory
Steps/Code to reproduce bug
import argparse
import time
import os
from datetime import datetime
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
INPUT_DIR = os.environ.get("TMP_LLM_STEP_INPUT_DIR")
OUTPUT_DIR = os.environ.get("TMP_LLM_STEP_OUTPUT_DIR")
CACHE_DIR = os.environ.get("TMP_LLM_STEP_CACHE_DIR")
LOG_DIR = os.environ.get("TMP_LLM_NEMO_LOG_ROOT")
PROFILE_DIR = os.environ.get("TMP_LLM_STEP_PROFILE_DIR")
SYS_LOG_FILE = os.environ.get("TMP_LLM_NEMO_PIPELINE_LOG")
ID_FIELD = os.environ.get("TMP_LLM_NEMO_ID_FIELD")
TEXT_FIELD = os.environ.get("TMP_LLM_NEMO_TEXT_FIELD")
INPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_INPUT_DATA_FORMAT")
OUTPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_OUTPUT_DATA_FORMAT")
def pre_imports():
    import dask, cudf, dask_cudf

def attach_args(
    parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
):
    return ArgumentHelper(parser).add_distributed_args()

def pipeline_log(msg, level="INFO"):
    log_file_path = SYS_LOG_FILE
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    log_entry = f"[{current_time}]: [{level}]: {msg}\n"
    print(log_entry)
    with open(log_file_path, 'a') as log_file:
        log_file.write(log_entry)

def ensure_directory_exists(filename: str):
    os.makedirs(os.path.dirname(filename), exist_ok=True)

def load_dataset(input_data_dir, file_type="parquet", backend="cudf"):
    files = list(get_all_files_paths_under(input_data_dir))
    raw_data = read_data(files, file_type=file_type, backend=backend, add_filename=True)
    dataset = DocumentDataset(raw_data)
    return dataset
def fuzzy_dedup(input_dataset):
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
        cache_dir=CACHE_DIR,
        id_field=ID_FIELD,
        text_field=TEXT_FIELD,
        seed=42,
        char_ngrams=5,
        num_buckets=20,
        hashes_per_bucket=13,
        use_64_bit_hash=False,
        buckets_per_shuffle=5,
        false_positive_check=True,
        num_anchors=2,
        jaccard_threshold=0.8,
    )
    fuzzy_dup = FuzzyDuplicates(logger=LOG_DIR, config=fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset=input_dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    result = input_dataset.df[
        ~input_dataset.df[ID_FIELD].isin(
            docs_to_remove[ID_FIELD].compute()
        )
    ]
    return DocumentDataset(result)
def main(args):
    client = get_client(**ArgumentHelper.parse_client_args(args))
    backend = "pandas"
    if args.device == "gpu":
        backend = "cudf"
        pre_imports()
    pipeline_log(f"dedup source: {INPUT_DIR}")
    input_dataset = load_dataset(INPUT_DIR, backend=backend)
    doc_total = len(input_dataset)
    pipeline_log(f"{doc_total} lines read")
    dataset_processed = fuzzy_dedup(input_dataset)
    doc_count = len(dataset_processed)
    pipeline_log(f"{doc_count} lines remain after fuzzy dedup")
    pipeline_log(f"output path: {OUTPUT_DIR}")
    write_to_disk(dataset_processed.df, OUTPUT_DIR, output_type=OUTPUT_DATA_FORMAT)
    pipeline_log("write completed")

if __name__ == "__main__":
    main(attach_args().parse_args())
Environment overview (please complete the following information)
5 servers, each with 1× A100 (80 GB), 70 CPU cores, and 320 GB of CPU memory.
Additional context
Using dclm-baseline 1.0 parquet data, about 8 TB of parquet in total (after adding nemo_id, with no compression).
Thanks for raising the issue @chenrui17. For 8 TB of input data on 5 A100 GPUs (~400 GB of GPU memory), the memory needed to hold intermediates during stages like LSH might lead to OOMs.
I have a few recommendations to reduce memory and computational requirements at this scale.
char_ngrams=24, # use larger ngram size to reduce false positives.
buckets_per_shuffle=1, # process 1 bucket per iteration of LSH to reduce memory requirements.
# skip the false positive check which is computationally expensive.
# In practice this is usually 1-2% of documents based on our experiments.
false_positive_check=False,
Some of the changes suggested above are becoming the default in Curator (see #386).
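For reference, a rough sketch of the reproduction script's config with those suggestions applied (same FuzzyDuplicatesConfig fields as used above; exact defaults and accepted arguments may vary across Curator versions):
# Sketch only: the original config with the suggested memory-saving changes.
# num_anchors and jaccard_threshold are omitted since they only apply when
# the false positive check is enabled.
fuzzy_dedup_config = FuzzyDuplicatesConfig(
    cache_dir=CACHE_DIR,
    id_field=ID_FIELD,
    text_field=TEXT_FIELD,
    seed=42,
    char_ngrams=24,              # larger ngram size to reduce false positives
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=1,       # one bucket per LSH iteration to cut peak memory
    false_positive_check=False,  # skip the expensive false positive check
)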
Additionally, I would recommend keeping parquet files <= 2 GB uncompressed if you have large files. If you are using small files, you can pass the blocksize=1GB arg during read_parquet (here: https://github.com/NVIDIA/NeMo-Curator/blob/9c8f185a5c3b5891d3c3ac78200c2684b10c3014/nemo_curator/datasets/doc_dataset.py#L90-L98), or specify files_per_partition to combine multiple parquet files into a single block for processing in prior versions of Curator.
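A minimal sketch of the two read options mentioned above, assuming files is a list of parquet paths (e.g. from get_all_files_paths_under as in the reproduction script); which keyword is available, and the exact spelling of the blocksize value, depends on the Curator version:
from nemo_curator.datasets import DocumentDataset

# Option A (newer Curator): let the reader pack input into ~1GB blocks.
dataset = DocumentDataset.read_parquet(files, backend="cudf", blocksize="1gb")

# Option B (prior Curator versions): combine small parquet files per partition.
# The value 4 is purely illustrative; tune it to your file sizes.
dataset = DocumentDataset.read_parquet(files, backend="cudf", files_per_partition=4)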
Internally we've typically used 16-24 GPUs for processing data at this scale, so I'm not sure these suggestions will prevent OOM errors on 5 GPUs, but I'm happy to follow up and see if this improves things.
I'm currently dealing with around 15-20 TB of parquet files on 64 H800 GPUs. Unfortunately, it also throws an OOM error. The following are my configuration parameters:
fuzzy_dedup_config = FuzzyDuplicatesConfig(
    cache_dir=cache_dir,
    id_field=dup_field,
    text_field=text_field,
    seed=42,
    char_ngrams=24,
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=3,
    false_positive_check=False,
)
I wonder if this is normal? I think 64 H800s is already quite a large cluster relative to the amount of data being processed. Also, if I scale the data up to 40 TB or even 50 TB, are there any better suggestions for the settings?
Sorry, I missed this comment in my notifications @HuaYZhao. Could you share more details about which stage of the deduplication pipeline the OOM occurs in? Is it the LSH stage or somewhere else, like connected components?
If it's the LSH stage, I would recommend lowering buckets_per_shuffle even further, to 1.
The higher memory usage in LSH comes from needing to partition the minhashes on the hash key, and that shuffle can be memory intensive. Currently, for the 64 TB scale we end up using 96-128 GPUs for the LSH and connected components stages. Minhash and edgelist computation is embarrassingly parallel and should not OOM regardless of available GPU memory.
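For completeness, a sketch of that change applied to the config posted above (only buckets_per_shuffle changes, from 3 to 1; everything else stays as posted):
# Sketch only: trades more LSH iterations for a smaller peak memory footprint
# during the minhash shuffle.
fuzzy_dedup_config = FuzzyDuplicatesConfig(
    cache_dir=cache_dir,
    id_field=dup_field,
    text_field=text_field,
    seed=42,
    char_ngrams=24,
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=1,   # was 3; process one bucket per LSH shuffle
    false_positive_check=False,
)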
We're working on many improvements to deduplication (and Curator in general) in the 25.06/25.08 timeline that utilize a different shuffle API, which should improve memory requirements significantly.