NeMo-Curator
fuzzy_dedup OOM issue
Describe the bug
Running the fuzzy_dedup task on 5 A100 GPUs runs out of GPU memory (OOM). Here is the error info:
2024-12-31 05:02:43,370 - distributed.worker - ERROR - Could not serialize object of type DataFrame
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 366, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 52, in dask_dumps
sub_header, frames = dumps(x)
File "/usr/local/lib/python3.10/dist-packages/cudf/comm/serialize.py", line 19, in dask_serialize_cudf_object
return x.host_serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 150, in host_serialize
header, frames = self.device_serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 90, in device_serialize
header, frames = self.serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/core/dataframe.py", line 1171, in serialize
header, frames = super().serialize()
File "/usr/local/lib/python3.10/dist-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/cudf/core/frame.py", line 100, in serialize
header["columns"], frames = serialize_columns(self._columns)
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in serialize_columns
header_columns = [c.serialize() for c in columns]
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in <listcomp>
header_columns = [c.serialize() for c in columns]
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 1226, in serialize
if self.children:
File "column.pyx", line 293, in cudf._lib.column.Column.children.__get__
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /tmp/pip-build-env-fkfud_57/normal/lib/python3.10/site-packages/librmm/include/rmm/mr/device/cuda_memory_resource.hpp:60: cudaErrorMemoryAllocation out of memory
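The allocation fails while a worker serializes a cudf DataFrame for transfer, so device memory appears to be exhausted, likely during a shuffle. One thing not yet tried is cuDF's spilling; a minimal sketch of enabling it, assuming the installed cuDF version exposes the option (not verified in this environment):

# Hypothetical mitigation, not part of the failing run above: allow cuDF to
# spill device buffers to host memory when the GPU runs out of space.
import cudf
cudf.set_option("spill", True)
# Roughly equivalent environment variable, set on every worker before cudf is imported:
#   CUDF_SPILL=on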
Steps/Code to reproduce bug
import argparse
import time
import os
from datetime import datetime
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
INPUT_DIR = os.environ.get("TMP_LLM_STEP_INPUT_DIR")
OUTPUT_DIR = os.environ.get("TMP_LLM_STEP_OUTPUT_DIR")
CACHE_DIR = os.environ.get("TMP_LLM_STEP_CACHE_DIR")
LOG_DIR = os.environ.get("TMP_LLM_NEMO_LOG_ROOT")
PROFILE_DIR = os.environ.get("TMP_LLM_STEP_PROFILE_DIR")
SYS_LOG_FILE = os.environ.get("TMP_LLM_NEMO_PIPELINE_LOG")
ID_FIELD = os.environ.get("TMP_LLM_NEMO_ID_FIELD")
TEXT_FIELD = os.environ.get("TMP_LLM_NEMO_TEXT_FIELD")
INPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_INPUT_DATA_FORMAT")
OUTPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_OUTPUT_DATA_FORMAT")
def pre_imports():
    import dask, cudf, dask_cudf


def attach_args(
    parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
):
    return ArgumentHelper(parser).add_distributed_args()


def pipeline_log(msg, level = "INFO"):
    log_file_path = SYS_LOG_FILE
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    log_entry = f"[{current_time}]: [{level}]: {msg}\n"
    print(log_entry)
    with open(log_file_path, 'a') as log_file:
        log_file.write(log_entry)


def ensure_directory_exists(filename: str):
    os.makedirs(os.path.dirname(filename), exist_ok=True)


def load_dataset(input_data_dir, file_type = "parquet", backend = "cudf"):
    files = list(get_all_files_paths_under(input_data_dir))
    raw_data = read_data(files, file_type = file_type, backend = backend, add_filename=True)
    dataset = DocumentDataset(raw_data)
    return dataset


def fuzzy_dedup(input_dataset):
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
        cache_dir = CACHE_DIR,
        id_field = ID_FIELD,
        text_field = TEXT_FIELD,
        seed=42,
        char_ngrams=5,
        num_buckets=20,
        hashes_per_bucket=13,
        use_64_bit_hash=False,
        buckets_per_shuffle=5,
        false_positive_check=True,
        num_anchors=2,
        jaccard_threshold=0.8,
    )
    fuzzy_dup = FuzzyDuplicates(logger = LOG_DIR, config = fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset = input_dataset)
    # Within each duplicate group, keep the first document and mark the rest for removal.
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    result = input_dataset.df[
        ~input_dataset.df[ID_FIELD].isin(
            docs_to_remove[ID_FIELD].compute()
        )
    ]
    return DocumentDataset(result)


def main(args):
    client = get_client(**ArgumentHelper.parse_client_args(args))
    backend = "pandas"
    if args.device == "gpu":
        backend = "cudf"
        pre_imports()
    pipeline_log(f"dedup source: {INPUT_DIR}")
    input_dataset = load_dataset(INPUT_DIR)
    doc_total = len(input_dataset)
    pipeline_log(f"{doc_total} lines read")
    dataset_processed = fuzzy_dedup(input_dataset)
    doc_count = len(dataset_processed)
    pipeline_log(f"{doc_count} lines remain after fuzzy dedup")
    pipeline_log(f"output path: {OUTPUT_DIR}")
    write_to_disk(dataset_processed.df, OUTPUT_DIR, output_type = OUTPUT_DATA_FORMAT)
    pipeline_log("write completed")


if __name__ == "__main__":
    main(attach_args().parse_args())
Environment overview (please complete the following information)
5 servers, each with 1x A100 (80 GB) GPU, 70 CPU cores, and 320 GB of CPU memory.
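The repro script attaches to a Dask cluster spanning these nodes via get_client; functionally it boils down to something like the sketch below (the scheduler address is a placeholder, and the actual launch flags may differ):

# Illustrative only: connect to an already-running scheduler that the 5 GPU workers have joined.
from dask.distributed import Client
client = Client("tcp://<scheduler-host>:8786")  # placeholder address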
Additional context
The input is the dclm-baseline 1.0 dataset in Parquet format, about 8 TB in total (uncompressed, after adding a nemo_id column).
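For scale: 8 TB over 5 GPUs is roughly 1.6 TB per GPU, about 20x the 80 GB of device memory, so partition size and spilling dominate the memory behaviour. Back-of-the-envelope math (the 256 MiB partition size is just an assumed target, not what read_data actually produces):

# Rough partition math under an assumed 256 MiB partition target.
total_bytes = 8 * 2**40           # ~8 TB of uncompressed parquet (rounded to TiB)
partition_bytes = 256 * 2**20     # assumed target partition size
n_partitions = total_bytes // partition_bytes
print(n_partitions, n_partitions // 5)   # 32768 partitions overall, ~6553 per GPU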