
fuzzy_dedup OOM issue

Open chenrui17 opened this issue 1 year ago • 4 comments

Describe the bug

I used 5 A100 GPUs to run a fuzzy_dedup task and encountered OOM issues. Here is the error output:

2024-12-31 05:02:43,370 - distributed.worker - ERROR - Could not serialize object of type DataFrame
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 52, in dask_dumps
    sub_header, frames = dumps(x)
  File "/usr/local/lib/python3.10/dist-packages/cudf/comm/serialize.py", line 19, in dask_serialize_cudf_object
    return x.host_serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 150, in host_serialize
    header, frames = self.device_serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 90, in device_serialize
    header, frames = self.serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/dataframe.py", line 1171, in serialize
    header, frames = super().serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/frame.py", line 100, in serialize
    header["columns"], frames = serialize_columns(self._columns)
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in serialize_columns
    header_columns = [c.serialize() for c in columns]
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in <listcomp>
    header_columns = [c.serialize() for c in columns]
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 1226, in serialize
    if self.children:
  File "column.pyx", line 293, in cudf._lib.column.Column.children.__get__
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /tmp/pip-build-env-fkfud_57/normal/lib/python3.10/site-packages/librmm/include/rmm/mr/device/cuda_memory_resource.hpp:60: cudaErrorMemoryAllocation out of memory

Steps/Code to reproduce bug

import argparse
import time
import os
from datetime import datetime
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
INPUT_DIR = os.environ.get("TMP_LLM_STEP_INPUT_DIR")
OUTPUT_DIR = os.environ.get("TMP_LLM_STEP_OUTPUT_DIR")
CACHE_DIR = os.environ.get("TMP_LLM_STEP_CACHE_DIR")
LOG_DIR = os.environ.get("TMP_LLM_NEMO_LOG_ROOT")
PROFILE_DIR = os.environ.get("TMP_LLM_STEP_PROFILE_DIR")
SYS_LOG_FILE = os.environ.get("TMP_LLM_NEMO_PIPELINE_LOG")
ID_FIELD = os.environ.get("TMP_LLM_NEMO_ID_FIELD")
TEXT_FIELD = os.environ.get("TMP_LLM_NEMO_TEXT_FIELD")
INPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_INPUT_DATA_FORMAT")
OUTPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_OUTPUT_DATA_FORMAT")

def pre_imports():
    import dask, cudf, dask_cudf
    
def attach_args(
        parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    ):
    return ArgumentHelper(parser).add_distributed_args()

def pipeline_log(msg, level = "INFO"):
    log_file_path = SYS_LOG_FILE
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    log_entry = f"[{current_time}]: [{level}]: {msg}\n"
    print(log_entry)
    with open(log_file_path, 'a') as log_file:
        log_file.write(log_entry)

def ensure_directory_exists(filename: str):
    os.makedirs(os.path.dirname(filename), exist_ok=True)

def load_dataset(input_data_dir, file_type = "parquet", backend = "cudf"):
    files = list(get_all_files_paths_under(input_data_dir))
    raw_data = read_data(files, file_type = file_type, backend = backend, add_filename=True)
    dataset = DocumentDataset(raw_data)
    return dataset

def fuzzy_dedup(input_dataset):
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
            cache_dir = CACHE_DIR,
            id_field = ID_FIELD,
            text_field = TEXT_FIELD,
            seed=42,
            char_ngrams=5,
            num_buckets=20,
            hashes_per_bucket=13,
            use_64_bit_hash=False,
            buckets_per_shuffle=5,
            false_positive_check=True,
            num_anchors=2,
            jaccard_threshold=0.8,
        )
    fuzzy_dup = FuzzyDuplicates(logger = LOG_DIR, config = fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset = input_dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    result = input_dataset.df[
        ~input_dataset.df[ID_FIELD].isin(
            docs_to_remove[ID_FIELD].compute()
        )
    ]
    return DocumentDataset(result)

def main(args):
    client = get_client(**ArgumentHelper.parse_client_args(args))
    backend = "pandas"
    if args.device == "gpu":
        backend = "cudf"
        pre_imports()
    pipeline_log(f"dedup source: {INPUT_DIR}")
    input_dataset = load_dataset(INPUT_DIR)
    doc_total = len(input_dataset)
    pipeline_log(f"{doc_total} lines read")
    dataset_processed = fuzzy_dedup(input_dataset)
    doc_count = len(dataset_processed)
    pipeline_log(f"{doc_count} lines remain after fuzzy dedup")
    pipeline_log(f"output path: {OUTPUT_DIR}")
    write_to_disk(dataset_processed.df, OUTPUT_DIR, output_type = OUTPUT_DATA_FORMAT)
    pipeline_log("write completed")

if __name__ == "__main__":
    main(attach_args().parse_args())

Environment overview (please complete the following information)

5 servers, each with 1 A100 GPU (80GB), 70 CPU cores, and 320GB of CPU memory.

Additional context

The input is dclm-baseline 1.0 in parquet format, 8TB in total (after adding nemo_id, with no compression).

chenrui17 avatar Jan 07 '25 13:01 chenrui17

Thanks for raising the issue, @chenrui17. For 8TB of input data on 5 A100 GPUs (~400GB of GPU memory in total), the memory required to hold intermediates during stages like LSH can lead to OOMs.

I have a few recommendations to reduce memory and computational requirements at this scale.

char_ngrams=24, # use larger ngram size to reduce false positives.
buckets_per_shuffle=1, # process 1 bucket per iteration of LSH to reduce memory requirements.

# Skip the false positive check, which is computationally expensive.
# In practice, false positives are usually 1-2% of documents based on our experiments.
false_positive_check=False,

Some of the changes suggested above are becoming the default in Curator (see #386).

Additionally, I would recommend keeping parquet files <= 2GB uncompressed if you have large files. If you have many small files, you can use the blocksize=1GB arg during read_parquet (here: https://github.com/NVIDIA/NeMo-Curator/blob/9c8f185a5c3b5891d3c3ac78200c2684b10c3014/nemo_curator/datasets/doc_dataset.py#L90-L98), or specify files_per_partition in prior versions of Curator, to combine multiple parquet files into a single block for processing.
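As a rough illustration of the file-size advice above, the sketch below flags files over the 2GB guideline and estimates how many small files would add up to roughly one 1GB block. The helper names are hypothetical (they are not part of Curator), and on-disk size is used as a stand-in for uncompressed size, which only holds for uncompressed files like the ones in this report.

```python
from statistics import mean


def oversized_files(paths_and_sizes, limit_bytes=2_000_000_000):
    """Return the paths whose size exceeds the ~2GB guideline."""
    return [path for path, size in paths_and_sizes if size > limit_bytes]


def suggest_files_per_partition(file_sizes_bytes, target_partition_bytes=1_000_000_000):
    """Estimate how many average-sized files fit in one ~1GB partition.

    Hypothetical helper: it only produces a number that could then be
    passed as files_per_partition; it does not read any data itself.
    """
    sizes = list(file_sizes_bytes)
    if not sizes:
        raise ValueError("no files given")
    # At least one file per partition, even when files exceed the target.
    return max(1, int(target_partition_bytes // mean(sizes)))


# Example with made-up sizes: ten 100MB files and one 3GB file.
files = [(f"part-{i}.parquet", 100_000_000) for i in range(10)]
files.append(("big.parquet", 3_000_000_000))
print(oversized_files(files))
print(suggest_files_per_partition(size for _, size in files[:10]))
```

Sizes for real files could be collected with os.path.getsize over the list of input paths; files flagged as oversized would need to be rewritten into smaller files before the estimate is meaningful.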

ayushdg avatar Jan 09 '25 14:01 ayushdg

Internally we've typically used 16-24 GPUs for processing data at this scale, so I'm not sure these suggestions will prevent OOM errors on 5 GPUs, but I'm happy to follow up and see if this improves things.
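To put these numbers side by side, here is a small back-of-the-envelope calculation of dataset size relative to aggregate GPU memory. The 80GB per GPU figure comes from the A100s in the report; applying it to the 16-24 GPU setup is an assumption, since the GPU model used there isn't stated. The function name is made up for illustration, and the ratio is only a rough indicator, not a memory model of the pipeline.

```python
def data_to_gpu_memory_ratio(dataset_tb, num_gpus, gb_per_gpu=80.0):
    """Dataset size divided by total GPU memory (decimal units: 1TB = 1000GB)."""
    total_gpu_gb = num_gpus * gb_per_gpu
    return dataset_tb * 1000.0 / total_gpu_gb


# 8TB on the reported 5 x A100 (80GB) setup vs. 16 and 24 GPUs
# (80GB per GPU is assumed for the larger setups).
for gpus in (5, 16, 24):
    ratio = data_to_gpu_memory_ratio(8, gpus)
    print(f"{gpus} GPUs: dataset is {ratio:.2f}x total GPU memory")
```

Under these assumptions the 5-GPU setup holds the data at about 20x total GPU memory, compared with roughly 4-6x for 16-24 GPUs.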

ayushdg avatar Jan 09 '25 14:01 ayushdg

I'm currently processing around 15 to 20TB of parquet files on 64 H800 GPUs. Unfortunately, it also throws an OOM error. My configuration parameters are as follows:

    fuzzy_dedup_config = FuzzyDuplicatesConfig(
        cache_dir=cache_dir,
        id_field=dup_field,
        text_field=text_field,
        seed=42,
        char_ngrams=24,
        num_buckets=20,
        hashes_per_bucket=13,
        use_64_bit_hash=False,
        buckets_per_shuffle=3,
        false_positive_check=False,
    )

Is this normal? I would have thought 64 H800s is already quite a large cluster relative to the amount of data being processed. Also, if I scale the data up to 40TB or even 50TB, are there better settings you would suggest?

HuaYZhao avatar Apr 29 '25 11:04 HuaYZhao

Sorry, I missed this comment in my notifications, @HuaYZhao. Could you share more details about which stage of the deduplication pipeline runs into the OOM? Is it the LSH stage, or somewhere else such as connected components?

If it's the LSH stage, I would recommend lowering buckets_per_shuffle even further, to 1. The higher memory usage in LSH comes from needing to partition the minhashes on the hash key, and that shuffle can be memory intensive. Currently, at the 64TB scale, we end up using 96-128 GPUs for the LSH and Connected Components stages. Minhash and edgelist computation are parallel and, depending on available GPU memory, should not OOM.
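To make the buckets_per_shuffle point concrete, here is a toy, single-process sketch of MinHash LSH banding. It is not Curator's implementation (which runs as a distributed GPU shuffle); the function names and the blake2b hash are assumptions chosen for illustration. It shows only that the number of (document, bucket key) rows held at once grows with the number of buckets processed per pass, while the candidate groups found stay the same.

```python
import hashlib
from collections import defaultdict


def char_ngrams(text, n):
    """Set of character n-grams (the whole text if it is shorter than n)."""
    return {text[i:i + n] for i in range(max(1, len(text) - n + 1))}


def minhash_signature(text, num_hashes, n=5, seed=42):
    """Toy MinHash: one salted 32-bit hash per slot, minimum over all n-grams."""
    grams = char_ngrams(text, n)
    signature = []
    for slot in range(num_hashes):
        salt = f"{seed}:{slot}:".encode()
        signature.append(min(
            int.from_bytes(hashlib.blake2b(salt + g.encode(), digest_size=4).digest(), "big")
            for g in grams
        ))
    return signature


def lsh_candidate_groups(signatures, num_buckets, hashes_per_bucket, buckets_per_shuffle):
    """Group documents that share a bucket key, a few buckets per pass.

    Returns (groups, peak_rows), where peak_rows is the largest number of
    (document, bucket key) rows materialized in a single pass.
    """
    groups, peak_rows = [], 0
    for start in range(0, num_buckets, buckets_per_shuffle):
        bucket_ids = range(start, min(start + buckets_per_shuffle, num_buckets))
        table = defaultdict(list)
        rows = 0
        for doc_id, sig in signatures.items():
            for b in bucket_ids:
                lo = b * hashes_per_bucket
                key = (b, tuple(sig[lo:lo + hashes_per_bucket]))
                table[key].append(doc_id)
                rows += 1
        peak_rows = max(peak_rows, rows)
        groups.extend(ids for ids in table.values() if len(ids) > 1)
    return groups, peak_rows


docs = {
    "a": "the quick brown fox jumps over the lazy dog",
    "b": "the quick brown fox jumps over the lazy dog",
    "c": "zzzzzzzzzzyyyyyyyyyyxxxxxxxxxxwwwwwwwwww",
}
# Same bucket layout as the configs in this thread: 20 buckets x 13 hashes.
sigs = {doc_id: minhash_signature(text, 20 * 13) for doc_id, text in docs.items()}
for per_shuffle in (1, 5):
    _, peak = lsh_candidate_groups(sigs, 20, 13, per_shuffle)
    print(f"buckets_per_shuffle={per_shuffle}: peak rows in one pass = {peak}")
```

In this toy, going from 5 buckets per pass to 1 cuts the rows held at once by 5x at the cost of more passes, which is the trade-off behind the recommendation above.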

We're working on many improvements to deduplication (and Curator in general) in the 25.06/25.08 timeline that utilize a different shuffle API, which should improve memory requirements significantly.

ayushdg avatar May 14 '25 21:05 ayushdg

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.

github-actions[bot] avatar Jul 25 '25 02:07 github-actions[bot]

This issue was closed because it has been inactive for 7 days since being marked as stale.

github-actions[bot] avatar Aug 01 '25 02:08 github-actions[bot]