
minhash dedup causes local machine to hang.

Open staticpunch opened this issue 8 months ago • 4 comments

Currently my goal is to deduplicate ~750GB of text (around 750 jsonl files, each about 1GB). My machine has 1TB of RAM and 256 CPU cores. I used the following config to run Minhash deduplication, but my machine hung for more than 24 hours. I couldn't even Ctrl+C the process and had to reboot the server.

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
    MinhashConfig,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
)
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter

# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(use_64bit_hashes=True)  # better precision -> fewer false positives (collisions)
MINHASH_BASE_PATH = "minhash/output"
LOGS_FOLDER = "minhash/log"
LOCAL_LOGS_FOLDER = "my_local_folder_for_slurm_logs/"

TOTAL_TASKS = 256
NUM_WORKERS = 128

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
    data_folder="data",
    recursive=True,  # I have some subfolders
)

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = LocalPipelineExecutor(
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{MINHASH_BASE_PATH}/signatures", 
            config=minhash_config
        ),
    ],
    tasks=TOTAL_TASKS,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/signatures",
) 

# stage 2 finds matches between signatures in each bucket
stage2 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{MINHASH_BASE_PATH}/signatures",
            output_folder=f"{MINHASH_BASE_PATH}/buckets",
            config=minhash_config,
        ),
    ],
    tasks=minhash_config.num_buckets,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/buckets",
    depends=stage1,
)

# stage 3 creates clusters of duplicates using the results from all buckets
stage3 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{MINHASH_BASE_PATH}/buckets",
            output_folder=f"{MINHASH_BASE_PATH}/remove_ids",
            config=minhash_config,
        ),
    ],
    tasks=1,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/clusters",
    depends=stage2,
)

stage4 = LocalPipelineExecutor(
    # job_name="mh4",
    pipeline=[
        INPUT_READER,
        TokensCounter("Llama-3-8B/tokenizer.json"),  # nice way to see how many tokens we had before and after deduplication
        MinhashDedupFilter(
            input_folder=f"{MINHASH_BASE_PATH}/remove_ids",
            exclusion_writer=JsonlWriter(f"{MINHASH_BASE_PATH}/removed"),
        ),
        JsonlWriter(output_folder=f"{MINHASH_BASE_PATH}/deduplicated_output"),
    ],
    tasks=TOTAL_TASKS,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/filter",
    depends=stage3,
)

stage4.run()

I did limit the workers and tasks so they don't exceed the number of CPU cores on my machine, so I'm pretty clueless about what could be causing the server to hang. Could you please suggest a better config to run minhash smoothly?
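
For reference, here is the kind of reduced-parallelism variant of stage 1 I'm thinking of trying next. This is just a sketch: REDUCED_WORKERS and stage1_low_mem are my own placeholder names, and the idea that fewer concurrent signature workers would lower peak RAM enough to avoid the hang is an unverified assumption.

# hypothetical workaround: run stage 1 on its own with fewer workers
REDUCED_WORKERS = 32  # guess; tune based on free RAM while it runs

stage1_low_mem = LocalPipelineExecutor(
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{MINHASH_BASE_PATH}/signatures",
            config=minhash_config,
        ),
    ],
    tasks=TOTAL_TASKS,        # keep 256 tasks so the files stay split the same way
    workers=REDUCED_WORKERS,  # fewer processes alive at once -> lower peak RAM (assumption)
    logging_dir=f"{LOGS_FOLDER}/signatures",
)
stage1_low_mem.run()

If that finishes cleanly, I would run stages 2-4 the same way, each with the same lower worker count.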

staticpunch · Jun 17 '24 07:06