Add multiprocessing to `target-dump`
It would be nice to make `target-dump` support multiprocessing to increase overall processing speed. We could create a process pool with the number of workers specified via a command-line argument. We would need to figure out how to handle the state file while using multiple processes, probably with some locking to avoid race conditions. Maybe a context manager could lock the file, but I'm not sure.
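To make the state-file idea a bit more concrete, something like the sketch below could work. This is only an illustration: `locked_state`, `mark_done`, and the JSON state format are made up here and are not how `target-dump` actually stores its state.

```python
import json
import multiprocessing
from contextlib import contextmanager
from functools import partial
from pathlib import Path

# Purely illustrative sketch -- not target-dump's actual state handling.
# The lock is created in the parent process and handed to every worker
# through the Pool initializer, so all processes synchronize on the same lock.
_state_lock = None


def _init_worker(lock) -> None:
    global _state_lock
    _state_lock = lock


@contextmanager
def locked_state(state_path: Path):
    """Serialize read-modify-write access to the shared state file."""
    with _state_lock:
        state = json.loads(state_path.read_text()) if state_path.exists() else {}
        yield state
        state_path.write_text(json.dumps(state))


def mark_done(state_path: Path, entry: str) -> None:
    # Every worker updates the state file under the lock, so writes can't interleave.
    with locked_state(state_path) as state:
        state.setdefault("completed", []).append(entry)


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(processes=4, initializer=_init_worker, initargs=(lock,)) as pool:
        pool.map(partial(mark_done, Path("dump-state.json")), ["target-a", "target-b", "target-c"])
```

The important part is that the lock is created in the parent and passed to the workers (e.g. via the pool initializer); otherwise each process would end up with its own lock and the locking would do nothing.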
While this is a very nice feature request, it is not a high priority for us to work on as we don't use target-dump internally. We would welcome a PR for this, though!
I am working on it in my spare time, just for the programming challenges that come with parallelism. We don't use it internally either, but I do see some potential for it once it supports multiprocessing.
@qmadev would it be OK if I work on this one too, or are you already in the middle of it? I want to do it for learning purposes as well. Let me know if it's too late to start, thanks!
I haven't really started yet, so please go ahead!
Love your enthusiasm, guys! @qmadev @yuvaljacob If you'd like more ideas/challenges, you can look at open tasks labeled "good-first-issue" (or any open task that sparks your curiosity). Thanks for contributing!
@EinatFox @qmadev @Schamper, I went over the entire pipeline of function executions, and it seems the most CPU-heavy part is most likely this one:
```python
record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit)
```
Here is the full pipeline code:
```python
def execute_pipeline(
    state: DumpState,
    targets: Iterator[Target],
    dry_run: bool,
    arguments: list[str],
    limit: int | None = None,
) -> None:
    """Run the record generation, processing and sinking pipeline."""
    target_func_pairs_stream = produce_target_func_pairs(targets, state)
    record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit)
    record_stream = sink_records(record_stream, state)
    record_stream = log_progress(record_stream)
    record_stream = persist_processing_state(record_stream, state)
    # exhaust the generator, executing all pipeline steps
    deque(record_stream, maxlen=0)
    log.info("Pipeline has finished")
```
The rest of the `record_stream` steps can run one after the other (synchronously) and update `state` without interruption.
- Does running only `record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit)` in parallel seem like a valid solution (see the sketch below)? If not, I can run all of the steps in parallel and guard the state with a multiprocessing lock wrapped in its own context manager.
- Can I make multiprocessing the default behavior here, or do you want a flag for the user to decide whether multiprocessing is used? IMO having it as the default is better, since we can automatically detect the number of cores (workers).
- Do you want an optional CLI option for the number of workers (cores), or a mandatory one? That's a design question; we could also fetch the number of cores automatically with `multiprocessing.cpu_count()`, or only fall back to it when the user doesn't provide a value.
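To make the first question concrete, here is a rough sketch of what I have in mind. The names `execute_one` and `parallel_execute_functions` are mine, not existing target-dump code, and whether the targets/records are picklable still needs checking.

```python
import multiprocessing
from functools import partial


def execute_one(pair, dry_run: bool, arguments: list[str]):
    # Placeholder: would produce the record(s) for a single (target, function)
    # pair, i.e. one unit of work from the current execute_functions().
    raise NotImplementedError


def parallel_execute_functions(target_func_pairs_stream, dry_run, arguments, workers=None):
    """Fan the per-pair work out over a process pool, yielding records lazily."""
    worker = partial(execute_one, dry_run=dry_run, arguments=arguments)
    with multiprocessing.Pool(processes=workers or multiprocessing.cpu_count()) as pool:
        # imap keeps this lazy, so itertools.islice(..., limit) and the
        # downstream sink/log/persist steps can stay sequential and unchanged.
        # This assumes the (target, function) pairs and the produced records
        # are picklable, which I still need to verify.
        yield from pool.imap(worker, target_func_pairs_stream)
```

The `islice` line in `execute_pipeline` would then wrap `parallel_execute_functions(...)` instead of `execute_functions(...)`, with everything downstream unchanged.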
Awaiting your updates, thanks!
- Looking at the code snippet you provided, I don't think you can get away with only running `record_stream = itertools.islice(execute_functions(target_func_pairs_stream, dry_run, arguments), limit)` in parallel, but do test it.
- Sounds good to me.
- Yes, an optional CLI arg with the number of CPU cores as the default.
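Something along these lines would be fine (sketch only; `--workers` is just a suggested name and the actual target-dump argument handling may differ):

```python
import argparse
import multiprocessing

parser = argparse.ArgumentParser(description="target-dump")
parser.add_argument(
    "--workers",
    type=int,
    default=multiprocessing.cpu_count(),
    help="number of worker processes (default: number of CPU cores)",
)
args = parser.parse_args()
```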