dissect.target

Add multiprocessing to `target-dump`

Open · qmadev opened this issue 5 months ago · 7 comments

It would be nice to make `target-dump` support multiprocessing to increase overall processing speed. We could create a process pool with the number of workers specified via a command-line argument. We would need to figure out how to handle the state file while using multiple processes; something with race conditions/mutexes. Maybe a context manager can lock the file, but I'm not sure.
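
To make the lock idea concrete, here is a minimal, self-contained sketch; everything in it (init_worker, dump_target, the hard-coded worker count, the state file name) is hypothetical and not the actual target-dump code. A multiprocessing.Lock is shared with the workers through the pool initializer and used as a context manager around every state-file write:

import multiprocessing as mp

def init_worker(lock):
    # Runs once per worker process; stash the shared lock in a global
    # so the worker function can reach it.
    global state_lock
    state_lock = lock

def dump_target(target_path, state_file="dump.state"):
    # Stand-in for the per-target work that target-dump performs.
    result = f"dumped {target_path}"
    with state_lock:  # the context manager serializes state-file writes
        with open(state_file, "a") as f:
            f.write(result + "\n")
    return result

if __name__ == "__main__":
    lock = mp.Lock()
    targets = ["t1.img", "t2.img", "t3.img"]
    # The worker count would come from a command-line argument in practice.
    with mp.Pool(processes=4, initializer=init_worker, initargs=(lock,)) as pool:
        for result in pool.imap_unordered(dump_target, targets):
            print(result)

An alternative that avoids the lock entirely is to keep all state-file writes in the parent process and only fan out the per-target work.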

qmadev · Jul 01 '25 18:07

While this is a very nice feature request, it is not a high priority for us to work on, as we don't use target-dump internally. We would welcome a PR for this, though!

Schamper · Jul 02 '25 13:07

I am working on it in my private time, mostly for the programming challenges that come with parallelism. We don't use it internally either, but I do see some potential for it once multiprocessing is supported.

qmadev · Jul 02 '25 13:07

@qmadev would it be OK if I work on this one too, or are you already in the middle of it? I want to do it for learning purposes as well; let me know if it's too late to start, thanks!

yuvaljacob · Jul 09 '25 23:07

I haven't really started yet, so please go ahead!

qmadev · Jul 10 '25 09:07

Love your enthusiasm, guys! @qmadev @yuvaljacob If you'd like more ideas/challenges, you can look at open tasks labeled "good-first-issue" (or any open task that piques your curiosity). Thanks for contributing!

EinatFox · Jul 10 '25 09:07

@EinatFox @qmadev @Schamper, I went over the entire pipeline of function executions, and it seems the most CPU-heavy part is most likely this one:

record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit)

Here is the full pipeline code:

def execute_pipeline(
    state: DumpState,
    targets: Iterator[Target],
    dry_run: bool,
    arguments: list[str],
    limit: int | None = None,
) -> None:
    """Run the record generation, processing and sinking pipeline."""

    target_func_pairs_stream = produce_target_func_pairs(targets, state)
    record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit)
    record_stream = sink_records(record_stream, state)
    record_stream = log_progress(record_stream)
    record_stream = persist_processing_state(record_stream, state)

    # exhaust the generator, executing all pipeline steps
    deque(record_stream, maxlen=0)

    log.info("Pipeline has finished")

The rest of the record_stream stages can run one after another (synchronously) and update `state` without interruption.
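
To make question 1 below concrete, here is a rough, self-contained sketch of parallelizing only that stage; parallel_execute_functions and _run_one are hypothetical stand-ins, and it assumes the target/function pairs and the records they produce are picklable, which would need verifying for real Target objects:

import functools
import itertools
import multiprocessing as mp

def _run_one(pair, dry_run, arguments):
    # Stand-in for executing one (target, function) pair; in target-dump
    # this would invoke the actual record-producing function.
    target, func = pair
    return f"record({target}, {func})"

def parallel_execute_functions(pairs, dry_run, arguments, workers=None):
    # Hypothetical parallel counterpart of execute_functions: fan the
    # (target, function) pairs out to a pool and yield records back as
    # workers finish. The downstream stages (sink_records, log_progress,
    # persist_processing_state) stay serial in the parent process, so
    # the state file is only ever touched from one process.
    worker = functools.partial(_run_one, dry_run=dry_run, arguments=arguments)
    with mp.Pool(processes=workers) as pool:
        yield from pool.imap_unordered(worker, pairs)

if __name__ == "__main__":
    pairs = [("target-a", "fn1"), ("target-a", "fn2"), ("target-b", "fn1")]
    for record in itertools.islice(parallel_execute_functions(pairs, False, []), None):
        print(record)

Note that imap_unordered yields records in completion order, so if the state file must reflect the original target order, imap (ordered, but potentially slower) would be the safer choice.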

  1. Does running only record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit) in parallel seem like a valid solution (see the sketch above)? If not, I can run all of the stages in parallel and use a multiprocessing lock with its own context manager.
  2. Can I make multiprocessing the default behavior here, or do you want a flag letting the user decide whether multiprocessing is used? IMO having it as the default is better, as we can automatically detect the number of cores (workers).
  3. Do you want an optional CLI option for specifying the number of workers (cores), or a mandatory one? That's a design question; we could also fetch the number of cores automatically with multiprocessing.cpu_count(), or only use it when the user does not provide a value.

Awaiting your updates, thanks!

yuvaljacob · Jul 10 '25 17:07

  1. Looking at the code snippet you provided, I don't think you can get away with only running record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit) in parallel, but do test it.
  2. Sounds good to me.
  3. Yes, an optional CLI arg with the number of CPU cores as the default (a sketch follows below).
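
For answer 3, a minimal sketch of such an optional argument; the --workers flag name is hypothetical:

import argparse
import multiprocessing

parser = argparse.ArgumentParser()
# Hypothetical flag name; optional, falling back to the machine's core count.
parser.add_argument(
    "--workers",
    type=int,
    default=multiprocessing.cpu_count(),
    help="number of worker processes (default: number of CPU cores)",
)
args = parser.parse_args()
print(f"using {args.workers} workers")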

qmadev · Jul 10 '25 23:07