datatrove icon indicating copy to clipboard operation
datatrove copied to clipboard

Filter very slow

Open hiennm15 opened this issue 6 months ago • 6 comments

I using 4xH100, 100 CPU cores, 1000 RAM to filter 1TB data japanese. Although the GPU is at 50% utilization and the CPU is running at 100%, only 3MB of data is processed per minute. I suspect that the tokenizer might be the bottleneck. I want to ask about what is actually causing the bottleneck. Is there a way to improve the filter speed?

from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.dedup import SentenceDedupFilter, SentenceDedupSignature, SentenceFindDedups
from datatrove.pipeline.dedup.sentence_dedup import SentDedupConfig
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import *
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.typeshelper import Languages
import os

INPUT_READER = JsonlReader(
    data_folder="/home/altai/hiennm/data/pretrain",
    # recursive=True
)
TOTAL_TASKS = 4000
NUM_WORKERS = 400
FILTERING_OUTPUT_PATH = "/home/altai/hiennm/data/remove_"
stage = LocalPipelineExecutor(
    pipeline=[
        INPUT_READER,
        GopherRepetitionFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/GopherRepetitionFilter")),
        C4QualityFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/C4QualityFilter")),
        # LanguageFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/LanguageFilter")),
        GopherQualityFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/GopherQualityFilter")),
        C4BadWordsFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/C4BadWordsFilter")),
        URLFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/URLFilter")),
        # FineWebQualityFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/FineWebQualityFilter")),
        JsonlWriter(output_folder="filter_/output")
    ],
    tasks=TOTAL_TASKS,
    workers=NUM_WORKERS,
    logging_dir="filter_/log",
) 
if __name__ == '__main__':
    # freeze_support()
    stage.run()

image image

hiennm15 avatar Aug 19 '24 08:08 hiennm15