
fuzzy_dedup OOM issue

Open · chenrui17 opened this issue 10 months ago • 4 comments

Describe the bug

Using 5 A100 GPUs to run a fuzzy_dedup task, I encountered an OOM error. Here is the error info:

2024-12-31 05:02:43,370 - distributed.worker - ERROR - Could not serialize object of type DataFrame
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 52, in dask_dumps
    sub_header, frames = dumps(x)
  File "/usr/local/lib/python3.10/dist-packages/cudf/comm/serialize.py", line 19, in dask_serialize_cudf_object
    return x.host_serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 150, in host_serialize
    header, frames = self.device_serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 90, in device_serialize
    header, frames = self.serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/dataframe.py", line 1171, in serialize
    header, frames = super().serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/frame.py", line 100, in serialize
    header["columns"], frames = serialize_columns(self._columns)
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in serialize_columns
    header_columns = [c.serialize() for c in columns]
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in <listcomp>
    header_columns = [c.serialize() for c in columns]
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 1226, in serialize
    if self.children:
  File "column.pyx", line 293, in cudf._lib.column.Column.children.__get__
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /tmp/pip-build-env-fkfud_57/normal/lib/python3.10/site-packages/librmm/include/rmm/mr/device/cuda_memory_resource.hpp:60: cudaErrorMemoryAllocation out of memory
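
For reference, here is a minimal single-node sketch of the worker-side settings I believe are relevant to this failure (an RMM pool plus cuDF spilling). On my real setup the 5 workers run on separate servers and the script only connects to them, so everything below (the dask_cuda / cudf options and the pool sizes) is an assumption on my side, not something the script already does:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

def enable_cudf_spilling():
    # Let cuDF spill device buffers to host memory instead of raising bad_alloc.
    import cudf
    cudf.set_option("spill", True)

# One worker per A100; pre-allocate most of the 80 GB as an RMM pool and cap the
# tracked device memory so dask-cuda starts spilling partitions to host earlier.
cluster = LocalCUDACluster(rmm_pool_size="70GB", device_memory_limit="60GB")
client = Client(cluster)
client.run(enable_cudf_spilling)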

Steps/Code to reproduce bug

import argparse
import time
import os
from datetime import datetime
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
INPUT_DIR = os.environ.get("TMP_LLM_STEP_INPUT_DIR")
OUTPUT_DIR = os.environ.get("TMP_LLM_STEP_OUTPUT_DIR")
CACHE_DIR = os.environ.get("TMP_LLM_STEP_CACHE_DIR")
LOG_DIR = os.environ.get("TMP_LLM_NEMO_LOG_ROOT")
PROFILE_DIR = os.environ.get("TMP_LLM_STEP_PROFILE_DIR")
SYS_LOG_FILE = os.environ.get("TMP_LLM_NEMO_PIPELINE_LOG")
ID_FIELD = os.environ.get("TMP_LLM_NEMO_ID_FIELD")
TEXT_FIELD = os.environ.get("TMP_LLM_NEMO_TEXT_FIELD")
INPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_INPUT_DATA_FORMAT")
OUTPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_OUTPUT_DATA_FORMAT")

def pre_imports():
    import dask, cudf, dask_cudf
    
def attach_args(
        parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    ):
    return ArgumentHelper(parser).add_distributed_args()

def pipeline_log(msg, level = "INFO"):
    log_file_path = SYS_LOG_FILE
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    log_entry = f"[{current_time}]: [{level}]: {msg}\n"
    print(log_entry)
    with open(log_file_path, 'a') as log_file:
        log_file.write(log_entry)

def ensure_directory_exists(filename: str):
    os.makedirs(os.path.dirname(filename), exist_ok=True)

def load_dataset(input_data_dir, file_type = "parquet", backend = "cudf"):
    files = list(get_all_files_paths_under(input_data_dir))
    raw_data = read_data(files, file_type = file_type, backend = backend, add_filename=True)
    dataset = DocumentDataset(raw_data)
    return dataset

def fuzzy_dedup(input_dataset):
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
            cache_dir = CACHE_DIR,
            id_field = ID_FIELD,
            text_field = TEXT_FIELD,
            seed=42,
            char_ngrams=5,
            num_buckets=20,
            hashes_per_bucket=13,
            use_64_bit_hash=False,
            buckets_per_shuffle=5,
            false_positive_check=True,
            num_anchors=2,
            jaccard_threshold=0.8,
        )
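    # Note: num_buckets * hashes_per_bucket = 20 * 13 = 260 minhash permutations per
    # document; buckets_per_shuffle=5 controls how many LSH buckets are shuffled per
    # iteration, and false_positive_check=True adds the anchor/jaccard verification pass.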
    fuzzy_dup = FuzzyDuplicates(logger = LOG_DIR, config = fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset = input_dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    result = input_dataset.df[
        ~input_dataset.df[ID_FIELD].isin(
            docs_to_remove[ID_FIELD].compute()
        )
    ]
    return DocumentDataset(result)

def main(args):
    client = get_client(**ArgumentHelper.parse_client_args(args))
    backend = "pandas"
    if args.device == "gpu":
        backend = "cudf"
        pre_imports()
    pipeline_log(f"dedup source: {INPUT_DIR}")
    input_dataset = load_dataset(INPUT_DIR, file_type = INPUT_DATA_FORMAT, backend = backend)
    doc_total = len(input_dataset)
    pipeline_log(f"{doc_total} lines read")
    dataset_processed = fuzzy_dedup(input_dataset)
    doc_count = len(dataset_processed)
    pipeline_log(f"{doc_count} linese remains after fuzzy dedup")
    pipeline_log(f"output path: {OUTPUT_DIR}")
    write_to_disk(dataset_processed.df, OUTPUT_DIR, output_type = OUTPUT_DATA_FORMAT)
    pipeline_log("write completed")

if __name__ == "__main__":
    main(attach_args().parse_args())
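
For completeness, the script connects to a pre-started cluster (a Dask scheduler on one node, one dask-cuda worker per server). As a hypothetical illustration of what parse_client_args ends up passing to get_client on my runs (the keyword names are my assumption and may differ by NeMo Curator version):

# Hypothetical equivalent of get_client(**ArgumentHelper.parse_client_args(args))
# for a GPU run; names below are assumptions, not verified against the API.
client = get_client(
    cluster_type="gpu",
    scheduler_address="tcp://<scheduler-host>:8786",
)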

Environment overview (please complete the following information)

5 servers.
Each has 1x A100 (80 GB).
Each has 70 CPU cores.
Each has 320 GB of CPU memory.

Additional context

The input is dclm-baseline 1.0 Parquet data, about 8 TB in total (after adding a nemo_id field, uncompressed), i.e. roughly 1.6 TB of input per 80 GB GPU.

chenrui17 · Jan 07 '25 13:01