Enable multiprocessing groups within project config

Open richardpaulhudson opened this issue 2 years ago • 28 comments

Description

Enable the specification of a group of commands within a spaCy project workflow that are to be executed in parallel.

Features

  1. Each spaCy projects command is made up of n operating-system-level commands that are executed in series. The spaCy projects commands within a parallel group are executed in parallel, but the operating-system-level commands within each spaCy projects command are still executed in series.
  2. spaCy project workflows support the definition of dependencies and outputs (files created by commands) in order to ensure that spaCy projects commands are not re-executed unnecessarily on consecutive workflow runs. All this functionality works for each command in a parallel group with respect to the rest of the project file in the same way as it does for a serial command. However, the management of dependencies between the members of a parallel group is out of scope: the user is responsible for ensuring that no problems occur.
  3. It is possible to specify a project-file-wide maximum number of parallel processes. If a parallel group contains more commands than this maximum n, only the first n commands within the group are started. Whenever a command completes, the next command in the group is started, and so on until all the commands in the group have executed.
  4. When the project file is being run from a terminal on a POSIX platform (e.g. Linux, macOS), a table is displayed with real-time status information about the executing commands. The status table works in a similar fashion to the docker pull command-line interface. No status table is displayed on Windows platforms at present because doing so uses ANSI control characters and Wasabi does not yet support them on Windows. However, note that in the medium term we aim to extend Wasabi support for ANSI to Windows, at which point the status table will be shown on Windows as well.
  5. If any command within a group returns a non-zero return code, the execution of the other commands within the group is terminated.
  6. Console output relating to a command, as well as the output of the OS-level commands it comprises, is initially redirected to a logfile in a temporary directory that the user can monitor in real time if they wish. When all the commands in a parallel group have completed, the contents of these logfiles are reproduced in the console. This ensures that output from different commands is not mixed up in the console. It also means that the console output after a parallel group has executed is very similar to the console output that would have resulted from the group being executed in series.
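
The scheduling rule in feature 3 can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: it starts plain subprocesses directly instead of worker processes, and `run_group`, `poll_interval` and the returned tuple are names invented for this example.

```python
import subprocess
import sys
import time
from collections import deque


def run_group(commands, max_parallel, poll_interval=0.05):
    """Run named commands with at most `max_parallel` running at once.

    `commands` maps a (hypothetical) command name to an argv list.
    Returns (start_order, return_codes, peak), where `peak` is the largest
    number of commands that were running at the same time.
    """
    pending = deque(commands.items())
    running = {}
    start_order, return_codes, peak = [], {}, 0
    while pending or running:
        # Fill free slots in the order in which the group lists its commands.
        while pending and len(running) < max_parallel:
            name, argv = pending.popleft()
            running[name] = subprocess.Popen(argv)
            start_order.append(name)
        peak = max(peak, len(running))
        # Collect every command that has completed, which frees its slot.
        for name, proc in list(running.items()):
            rc = proc.poll()
            if rc is not None:
                return_codes[name] = rc
                del running[name]
        if running:
            time.sleep(poll_interval)
    return start_order, return_codes, peak
```

Calling `run_group(cmds, max_parallel=2)` with five commands starts the first two immediately and each remaining one as soon as a slot becomes free.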

Architecture

  1. The main process that is executing the project file uses the multiprocessing module to spawn two or more worker processes, each of which is responsible for a single spaCy project command, and each of which executes — in series, as subprocesses, and using the subprocess module — the operating-system-level commands that it contains.
  2. Each worker process sends status information about the command and operating-system-level commands that it is executing back to the main process via a queue. This status information includes the process ids of the subprocesses that the worker process starts for each operating-system level command.
  3. If a subprocess has failed or been terminated, the worker process that started it does not execute any remaining operating-system-level commands within the spaCy project command for which it is responsible.
  4. If the main process receives notice from a worker process that a subprocess has failed, it terminates directly any other subprocesses that are currently running. Each worker process controlling one of these subprocesses detects that its subprocess has been terminated and notifies the main process of this fact. Doing things this way round has the advantage that subprocesses terminated by the main process are detected, treated and reported identically to subprocesses terminated from outside the library, i.e. directly from the operating system console.
  5. The main process maintains a simple state machine that reflects its knowledge of the activities of the worker processes. It uses this knowledge to decide when to start new worker processes or terminate subprocesses, and as the basis for the status table. Note that the state machine maintains a distinction between failed and terminated processes that is only meaningful on POSIX platforms; on Windows, both states are reported as failed/terminated.
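
To make point 5 more concrete, here is a small sketch of what such a state machine could look like. The state names, the transition table and the helper functions are assumptions made for illustration; the actual states in the PR may differ. The one hard fact used is that, on POSIX, Python's `subprocess` module reports a child killed by signal N with the return code -N.

```python
from enum import Enum


class State(Enum):
    # Hypothetical state names, chosen for this sketch only.
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    TERMINATED = "terminated"
    FAILED_OR_TERMINATED = "failed/terminated"


# Allowed transitions; anything else indicates a bookkeeping error.
TRANSITIONS = {
    State.PENDING: {State.RUNNING},
    State.RUNNING: {
        State.SUCCEEDED,
        State.FAILED,
        State.TERMINATED,
        State.FAILED_OR_TERMINATED,
    },
}


def final_state(returncode, posix):
    """Map a subprocess return code to a final state."""
    if returncode == 0:
        return State.SUCCEEDED
    if not posix:
        # The return code alone does not distinguish the two cases here.
        return State.FAILED_OR_TERMINATED
    # On POSIX, a negative return code -N means "killed by signal N".
    return State.TERMINATED if returncode < 0 else State.FAILED


def advance(current, new):
    """Return `new` if the transition is allowed, otherwise raise."""
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new
```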

The background to the high-level decision to use subprocess in conjunction with multiprocessing and a status queue is documented here.
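
As a rough sketch of the worker side of this design (points 1 to 3 above), the function below runs the OS-level commands of one project command in series and reports to a queue. It is an illustration under assumptions, not code from the PR: the message fields (`type`, `cmd`, `index`, `pid`, `rc`) are invented, and the function accepts any object with a `put` method so that it can be tried with a plain `queue.Queue` in a single process.

```python
import queue
import subprocess
import sys


def run_project_command(name, os_commands, status_queue):
    """Run the OS-level commands of one project command in series.

    A status message containing the subprocess pid is put on the queue for
    each OS-level command that is started. After the first non-zero return
    code, the remaining OS-level commands are skipped.
    """
    rc = 0
    for index, argv in enumerate(os_commands):
        proc = subprocess.Popen(argv)
        status_queue.put(
            {"type": "started", "cmd": name, "index": index, "pid": proc.pid}
        )
        rc = proc.wait()
        if rc != 0:
            break  # do not execute the remaining OS-level commands
    status_queue.put({"type": "completed", "cmd": name, "rc": rc})
    return rc
```

In the architecture described above this function would run inside a worker process and be handed a `multiprocessing` queue; the pids in the messages are what allow the main process to terminate subprocesses directly.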

Design decisions in need of review

These design decisions reflect various tradeoffs that others may well judge differently; all of them could be altered with minimal effort.

| Nr | Decision | Rationale |
|----|----------|-----------|
| 1 | The console output for each command, having initially been redirected to its own logfile, is reproduced on the console once execution of the parallel group is complete, and there is no option to switch this off. | Reproducing the log output has the advantage that the console output is virtually unchanged whether commands are executed in series or in parallel; an option to switch off reproducing log output would complicate the project file syntax; in most cases commands do not log significant amounts of output, so reproducing it is unproblematic. |
| 2 | The logfiles for the individual commands are written to a temporary directory; the name of each logfile is derived from the name of the command whose output it contains (this is possible because parallel groups are not allowed to contain the same command twice); this behaviour is fixed. | The output is reproduced on the console, which is where most users will view it; adding options around the location of the log files would complicate the project file syntax. |
| 3 | The console output from each worker process is sent to the main process via the queue, although it would also be possible for the main process to read it directly from the file system. | Although this procedure means the output has to be serialized an extra time, the files are expected to remain small, so the improved encapsulation the procedure provides is more important; I confirmed with a short load test that the queue can handle much larger amounts of data than it would ever realistically need to transfer. |
| 4 | STDERR output is treated identically to STDOUT output and there is no option to change this. | In most cases, the most important requirement is that commands being executed in parallel do not output to the console in real time, as this messes up the status table display. STDERR output is only likely to be important when debugging complex issues, in which case the user is in any case likely to get the problematic commands working in series before moving them into a parallel group. |
| 5 | Execution of a parallel group is halted if any command returns a non-zero return code, although there are situations in which a non-zero return code might be expected. | The same is true of spaCy projects in general when commands are executed in series. |
| 6 | Processes are terminated using the SIGTERM signal and there is no option to use alternatives like SIGKILL. | Terminating the other commands in a group when one has failed is a convenience feature. The user's next step is likely to be to debug the failed command on its own, i.e. serially outside a parallel group. It seems most unlikely that a user's response to a process within a parallel group failing and it not having been possible to terminate some other process in the group with SIGTERM would be to specify a different signal. |
| 7 | The keepalive message interval, maximum width of a parallel command group name within a divider and temporary logfile directory name are all hardcoded in the parallel.py module without any options to change them. | Options to change these parameters would complicate the syntax of the project file; it does not seem likely that anyone would have a reason to want values different from the defaults. |
| 8 | The main process spawns worker processes regardless of the platform, although forking is more efficient on Unix/Linux. | Because the main process does not have a significant memory footprint, the additional cost of spawning is not relevant; on the other hand, it makes sense to keep the behaviour as consistent as possible across platforms. |
| 9 | The execution of serial commands is left as it was, meaning that there are differences in how a command is executed depending on whether it is part of a parallel group or not. | It would be possible to execute each serial command as a 1-member parallel group, but this would unnecessarily complicate what is still the standard way of executing commands, and would also increase the risk of this PR. |
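
Decisions 1, 2 and 4 can be illustrated with a short sketch: each command writes STDOUT and STDERR to one logfile named after the command, and the logfiles are reproduced afterwards. The function names, the `.log` suffix and the `=== name ===` divider are assumptions for this example and are not taken from parallel.py.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_logged(name, argv, log_dir):
    """Run one command with STDOUT and STDERR redirected to <log_dir>/<name>.log."""
    log_path = Path(log_dir) / f"{name}.log"
    with open(log_path, "w", encoding="utf8") as log_file:
        # stderr=STDOUT sends both streams to the same file. Their relative
        # order in the file depends on how the child buffers each stream.
        rc = subprocess.run(argv, stdout=log_file, stderr=subprocess.STDOUT).returncode
    return rc, log_path


def replay(log_paths):
    """Return the text to print once the whole group has completed."""
    chunks = []
    for name, path in log_paths.items():
        chunks.append(f"=== {name} ===\n{Path(path).read_text(encoding='utf8')}")
    return "".join(chunks)
```

Because the logfile name is derived from the command name, this only works if a group never contains the same command twice, which matches the restriction mentioned in decision 2.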

Demonstration

This demonstration has been tested on Linux, macOS and Windows 10. To try out the functionality, create these two files in a directory:

script.py:

import sys
from time import sleep
_, sleep_secs, rc = sys.argv
print("Output before sleep to stdout")
sleep(int(sleep_secs))
print("Output after sleep to stderr", file=sys.stderr)
sys.exit(int(rc))

project.yml:

workflows:
  all:
    - sleepC
    - parallel: [sleepC, sleepA, sleepB, sleepD, sleepE]
    - parallel: [sleepE, sleepA, fail, sleepC, sleepD]
    - sleepB

max_parallel_processes: 2
commands:

  - name: sleepA
    script:
      - "python script.py 2 0"
      - "python script.py 2 0"
      - "python script.py 3 0"
      - "python script.py 4 0"

  - name: sleepB
    script:
     - "python script.py 1 0"
     - "python script.py 2 0"

  - name: sleepC
    script:
    - "python script.py 4 0"

  - name: sleepD
    script:
    - "python script.py 2 0"
    outputs:
    - someOutput

  - name: sleepE
    script:
    - "python script.py 1 0"
    - "python script.py 1 0"
    - "python script.py 1 0"
    - "python script.py 1 0"
    - "python script.py 1 0"

  - name: fail
    script:
    - "python script.py 1 1"

Then type from within the directory:

  1. spacy project run all (successful and then unsuccessful execution of a parallel group; running commands are terminated when another command fails in the second group; command with unchanged output is not run the second time; status table is displayed on POSIX platforms)
  2. spacy project run all --force
  3. spacy project run all --dry
  4. spacy project run
  5. spacy project run all --help
  6. spacy project document
  7. spacy project dvc (can only be attempted if dvc is already installed; exits with an error message as dvc does not support parallel groups)
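
For orientation, the expected timing of the first parallel group can be worked out with a small simulation. It is only a back-of-the-envelope sketch: it assumes that all five commands actually run, uses the sleep durations from project.yml above, and ignores interpreter start-up time and scheduling overhead. The function name `simulate` is invented for this example.

```python
import heapq


def simulate(durations, max_parallel):
    """Return {name: (start, end)} for commands started in group order."""
    pending = list(durations)
    running = []  # heap of (end_time, name)
    schedule = {}
    now = 0
    while pending or running:
        # Start commands while there is a free slot.
        while pending and len(running) < max_parallel:
            name, secs = pending.pop(0)
            schedule[name] = (now, now + secs)
            heapq.heappush(running, (now + secs, name))
        # Jump to the moment the next command completes.
        now, _ = heapq.heappop(running)
    return schedule


# Total sleep time per command, taken from the script entries in project.yml.
group = [
    ("sleepC", 4),
    ("sleepA", 2 + 2 + 3 + 4),
    ("sleepB", 1 + 2),
    ("sleepD", 2),
    ("sleepE", 1 + 1 + 1 + 1 + 1),
]
schedule = simulate(group, max_parallel=2)
makespan = max(end for _, end in schedule.values())
serial_total = sum(secs for _, secs in group)
```

Under these assumptions sleepC and sleepA start at 0 s, sleepB at 4 s, sleepD at 7 s and sleepE at 9 s, so the group takes about 14 s instead of the 25 s a serial run would need.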

Types of change

Enhancement or new feature

Checklist

  • [x] I confirm that I have the right to submit this contribution under the project's MIT license.
  • [x] I ran the tests, and all new and existing tests passed.
  • [x] My changes don't require a change to the documentation, or if they do, I've added all required information.

richardpaulhudson avatar May 09 '22 15:05 richardpaulhudson

@explosion-bot please test_gpu

richardpaulhudson avatar May 10 '22 08:05 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/68

explosion-bot avatar May 10 '22 08:05 explosion-bot

> I have been trying to experiment with this functionality but couldn't define a good example (in yaml) that works

Retrying my previous experiment with the current PR code does work - some of the error handling you improved must have fixed it. Also thanks for the cool example in the PR description! That's definitely helpful. One comment I have there is that when you execute spacy project run it shows

Available workflows in project.yml
Usage: python -m spacy project run [WORKFLOW]

all setup -> commandA -> commandB

which implies an order between commandA and commandB -> could you look into fixing that as well?
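
One way the workflow summary could avoid implying an order is sketched below. The bracket-and-bar notation is purely a suggestion for illustration, not necessarily what the PR ended up using, and `format_workflow` is a hypothetical helper.

```python
def format_workflow(steps):
    """Render workflow steps, marking parallel groups as unordered."""
    parts = []
    for step in steps:
        if isinstance(step, dict):
            # A parallel group: members are joined with "|" rather than "->".
            parts.append("[" + " | ".join(step["parallel"]) + "]")
        else:
            parts.append(step)
    return " -> ".join(parts)
```

With this helper, `["setup", {"parallel": ["commandA", "commandB"]}]` is rendered as `setup -> [commandA | commandB]`.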

svlandeg avatar Jun 09 '22 10:06 svlandeg

One more thing I was realising is that this change does introduce a potential for breaking compatibility: when users create a .yml with this parallelization for a new version of spaCy, that same .yml will result in errors for older versions of spaCy, and the error is actually quite cryptic:

✘ Invalid project.yml. Double-check that the YAML is correct.
[workflows -> all -> 1] str type expected

I don't think we can do much about that though...

svlandeg avatar Jun 14 '22 13:06 svlandeg

> One more thing I was realising is that this change does introduce a potential for breaking compatibility: when users create a .yml with this parallelization for a new version of spaCy, that same .yml will result in errors for older versions of spaCy, and the error is actually quite cryptic:
>
> ✘ Invalid project.yml. Double-check that the YAML is correct.
> [workflows -> all -> 1] str type expected
>
> I don't think we can do much about that though...

What we could do is to introduce an additional field at the top declaring a schema version number: older versions of spaCy would then fail because they wouldn't recognise the new field and this would be less cryptic for the user. However, it would also mean that all config files would no longer be backwards-compatible rather than just config files that involve parallelism, which would be an unacceptably large price to pay for a slightly clearer error message. I think the best course of action is just to live with this.

UPDATE: during internal discussions we considered various fields (like the max_parallel_processes field) as a possible route to a more user-friendly error message. In fact it turns out that an unrecognized top-level field in the config file does not trigger an error, so that there really is no other option than to live with the current situation.
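
The error arises because older versions expect every workflow step to be a string, so the parallel group at index 1 of the `all` workflow is rejected. A validator that accepts both kinds of step might look like the sketch below. It is written in plain Python for illustration and does not reproduce spaCy's actual schema code; the function name and error messages are invented.

```python
def validate_workflow(steps, command_names):
    """Return a list of error strings for one workflow (empty if valid)."""
    errors = []
    for i, step in enumerate(steps):
        if isinstance(step, str):
            names = [step]
        elif (
            isinstance(step, dict)
            and set(step) == {"parallel"}
            and isinstance(step["parallel"], list)
        ):
            names = step["parallel"]
            # Logfile names are derived from command names, so a group
            # must not contain the same command twice.
            if len(set(names)) != len(names):
                errors.append(f"step {i}: parallel group contains a command twice")
        else:
            errors.append(f"step {i}: expected a command name or a parallel group")
            continue
        for name in names:
            if name not in command_names:
                errors.append(f"step {i}: unknown command {name!r}")
    return errors
```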

richardpaulhudson avatar Jun 15 '22 15:06 richardpaulhudson

I've investigated the various ways of parallelising commands and documented the results with code snippets. If you follow the argumentation there, the current architecture with multiprocessing queues represents the most appropriate way of solving the problem, but changes are necessary to better manage error handling and logging and to ensure cross-platform consistency.

I'd be very grateful for feedback, especially on the specific issues set out in the last section.

richardpaulhudson avatar Jul 07 '22 14:07 richardpaulhudson

> I'd be very grateful for feedback, especially on the specific issues set out in the last section.

That's a very thorough write-up! :clap:

> multiprocessing.Pool provides a convenient method for collecting the output from multiple parallel processes and returning when they have all completed.

I think Pool actually does return/proceed whenever a child process has finished, not just when all of them do. This doesn't change any of the other points you make.
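
The difference is easy to see with `imap_unordered`, which yields each result as soon as it is ready, whereas `map` blocks until all results are available. The sketch below uses `multiprocessing.pool.ThreadPool`, which offers the same interface as `Pool`, purely so that it runs as a plain script without a `__main__` guard; the function names are made up for this example.

```python
import time
from multiprocessing.pool import ThreadPool


def work(seconds):
    """Stand-in task: sleep, then return the delay that was requested."""
    time.sleep(seconds)
    return seconds


def completion_order(delays):
    """Return the results in the order in which the tasks complete."""
    with ThreadPool(len(delays)) as pool:
        # imap_unordered hands back results as they become available.
        return list(pool.imap_unordered(work, delays))
```

Given one slow and one fast task, `completion_order` returns the fast task's result first, even though the slow task was submitted first.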

I agree that Queue seems like a reasonable choice.

> 4.4 Managing console output

Fully agree that child processes shouldn't log to the console directly, that's just a mess (stderr is perhaps fine, as it shouldn't happen often anyway). Output to be displayed in the console could also be stored in a queue though - while it's not "real-time", at least we wouldn't have to wait for the process to finish in order to see the log and on average not as much log would be lost for killed processes.

> 4.5 Executing serial commands

Agreed. Might also make debugging easier.

rmitsch avatar Jul 07 '22 15:07 rmitsch

Thanks for your very timely feedback @rmitsch!

> Output to be displayed in the console could also be stored in a queue though - while it's not "real-time", at least we wouldn't have to wait for the process to finish in order to see the log and on average not as much log would be lost for killed processes.

I completely agree that we should try to avoid losing stdout output for killed processes. Up until now I thought I had to do something else in the worker process where the subprocess is started, but I investigated further and it turns out that example_queueing_with_output_management.py already does everything it can to catch stdout output. The problem is rather in the subprocess code itself: if you start a Python script without PYTHONUNBUFFERED=1, it buffers its stdout output and anything that's in the buffer is lost when the process is killed. And because we can't fully control what subprocess code does, there's nothing we can really do about this problem that would apply to any command. This applies whichever way we manage the output: it's a necessary consequence of killing processes. For Python commands, however, we can add -u, e.g. python3 -u ... instead of python3 ..., to prevent the buffering.
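
The effect can be reproduced with a small experiment. The sketch below is an illustration only: the child script and the function name are invented. The child prints a line to stdout, signals on stderr that it has done so, and is then terminated while it sleeps.

```python
import os
import subprocess
import sys

# Child script: print to stdout, then announce on stderr (flushed) that the
# print has happened, then sleep long enough to be terminated.
CHILD = (
    "import sys, time\n"
    "print('progress line')\n"
    "print('ready', file=sys.stderr, flush=True)\n"
    "time.sleep(30)\n"
)


def output_before_kill(unbuffered):
    """Return the stdout text captured from a child that is terminated."""
    # Remove PYTHONUNBUFFERED so that only the -u flag controls buffering.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONUNBUFFERED"}
    argv = [sys.executable] + (["-u"] if unbuffered else []) + ["-c", CHILD]
    proc = subprocess.Popen(
        argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, text=True
    )
    proc.stderr.readline()  # wait until the child has executed its print()
    proc.terminate()
    out, _ = proc.communicate()
    return out
```

Without `-u` the line is still sitting in the child's buffer when the process is terminated, so nothing is captured; with `-u` the line arrives.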

For subprocesses that do write to stdout regularly, do you (and other people) think it's better to save up all the output to write in one go, or to write it in chunks to stay closer to real-time output? On balance I think saving all the output to write in one go is probably the better option: while chunks could be written in a controlled way from the main process that would ensure we didn't get output from multiple processes on the same line, each chunk from a given process would still be unlikely to form a logical section of output. This would be likely to make the log as a whole hard to read, and the console output would also differ in its structure and its readability between consecutive executions of the same workflow.

I'm changing the document to reflect this discussion, btw. (I'm saying this in case at some point in the future the discussion doesn't seem to follow from the document.)

richardpaulhudson avatar Jul 07 '22 17:07 richardpaulhudson

> For subprocesses that do write to stdout regularly, do you (and other people) think it's better to save up all the output to write in one go, or to write it in chunks to stay closer to real-time output? On balance I think saving all the output to write in one go is probably the better option: ...

You are making a good point here. Personally I think having some more or less "live" output is useful though, if only to see things are actually running (running something and not seeing any output always makes me nervous).

Either way, writing the log to stdout is going to be messy. An idea - dunno if this makes a lot of sense: how about

  • displaying only some kind of status (active/failed/finished) for each process while they are running,
  • printing all the output to stdout once all processes are done as suggested by you,
  • in addition also writing the logs for each process to - temporary - files and logging those paths to stdout so that users can access clean per-process logs?

So we might display something like this as long as at least one process is still running:

| Process ID | Status | Path to log file |
|------------|--------|------------------|
|     1      | active |  /tmp/log_1.txt  |
|     2      |  done  |  /tmp/log_2.txt  |
|     3      | failed |  /tmp/log_3.txt  |

[Edit] Something like the command instead of the process ID might be more useful.
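
A table of this kind, keyed by command name as suggested in the edit, could be rendered along the following lines. This is only a sketch of the idea; the function name, column titles and example paths are placeholders.

```python
def render_status_table(rows):
    """Render (command, status, log path) rows as a pipe-delimited table."""
    headers = ("Command", "Status", "Path to log file")
    table = [headers] + [tuple(row) for row in rows]
    # Each column is as wide as its longest cell.
    widths = [max(len(row[i]) for row in table) for i in range(len(headers))]

    def line(cells):
        padded = (cell.ljust(width) for cell, width in zip(cells, widths))
        return "| " + " | ".join(padded) + " |"

    separator = "|" + "|".join("-" * (width + 2) for width in widths) + "|"
    return "\n".join([line(headers), separator] + [line(row) for row in table[1:]])
```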

rmitsch avatar Jul 08 '22 08:07 rmitsch

This is generally an excellent idea, although I'm not sure about the Status column as this is log output rather than an interactive GUI. I'm not sure it's really necessary though, because:

  • status active: log output from the worker process & subprocess will not yet have been logged by the main process
  • status done: log output from the worker process & subprocess will have been logged by the main process
  • status failed: log output from the worker process & subprocess will have been logged and marked as failed by the main process

richardpaulhudson avatar Jul 08 '22 09:07 richardpaulhudson

> This is generally an excellent idea, although I'm not sure about the Status column as this is log output rather than an interactive GUI. I'm not sure it's really necessary though, because: ...

Yeah, it would have to be updated in-place tqdm-style - or re-printed every x seconds. You are correct, it's not strictly necessary, as it can be inferred from the logging output. However, I think it might be easy to lose track of which processes have failed/are done at any point if there's a lot of logging output (which we can't/don't want to control).

rmitsch avatar Jul 08 '22 10:07 rmitsch

> Yeah, it would have to be updated in-place tqdm-style - or re-printed every x seconds

Printing it tqdm style would be great from a usability point of view — can anyone else think of any reasons that would speak against doing that? (tqdm isn't used within spaCy projects at present, although it is already a spaCy dependency).

Btw

> printing all the output to stdout once all processes are done as suggested by you

was not exactly my suggestion, rather printing the output to stdout for each process when that process completes (although if a process fails, other processes in the group will be killed which will lead to their output being printed as well shortly afterwards). This means that if the status table is printed out tqdm-style, it will need to be rerendered after the stdout block for each completed process.
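
For reference, updating a table in place on an ANSI-capable terminal comes down to moving the cursor back up and overwriting the old lines. The sketch below builds the string to write; it is an illustration with an invented function name and assumes that the table never shrinks between redraws.

```python
CURSOR_UP = "\x1b[{n}A"  # CSI n A: move the cursor up n lines
ERASE_LINE = "\x1b[2K"   # CSI 2 K: erase the entire current line


def redraw(new_lines, previous_height):
    """Return the text that replaces a table of `previous_height` lines."""
    out = []
    if previous_height:
        # Go back to the first line of the table that was printed earlier.
        out.append(CURSOR_UP.format(n=previous_height))
    for line in new_lines:
        # Clear whatever was on the line before writing the new content.
        out.append(ERASE_LINE + line + "\n")
    return "".join(out)
```

If a block of stdout output has been printed since the last redraw, the table would be printed afresh below it, i.e. `redraw(lines, 0)`, rather than overwritten.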

richardpaulhudson avatar Jul 08 '22 11:07 richardpaulhudson

> Btw
>
> > printing all the output to stdout once all processes are done as suggested by you
>
> was not exactly my suggestion...

Ah, ok. Actually, does it even make sense to (messily) log to stdout if we log to file and have a status indicator on stdout?

rmitsch avatar Jul 08 '22 11:07 rmitsch

> Actually, does it even make sense to (messily) log to stdout if we log to file and have a status indicator on stdout?

Hmm, I see what you mean. I think we need to distinguish between the output from the worker process when a job finishes and any output the subprocess might have logged to stdout:

  • as the worker process output won't end up in the subprocess log files, we definitely want to retain it, also for consistency with serially executed commands. I agree that if we have a status indicator it doesn't make much difference whether the worker process output for a given subprocess is displayed when the subprocess terminates or when the whole group does, though.
  • I can see that if a subprocess logs lots of output to stdout it probably doesn't make sense to duplicate that output to the main console. But if it only logs a couple of lines (which many subprocesses probably do) it would be nice to see those lines alongside the worker process output for that subprocess. So one possibility would be to duplicate the output on the console if it's less than e.g. 20 lines, and to display a message that n lines of output were logged if it's more than 20 lines. Or is this getting overengineered?
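
The line-limit idea in the second point would amount to something like the sketch below. The function name and message wording are invented, and treating exactly 20 lines as "short" is an arbitrary choice for this example.

```python
def console_summary(output, max_lines=20):
    """Return the output itself if it is short, otherwise a one-line notice."""
    lines = output.splitlines()
    if len(lines) <= max_lines:
        return output
    return f"[{len(lines)} lines of output were logged]\n"
```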

richardpaulhudson avatar Jul 08 '22 17:07 richardpaulhudson

> Hmm, I see what you mean. I think we need to distinguish between...

The distinction makes sense, and I fully agree on the first point. Regarding subprocess logs in the main console: perhaps we could just start with a binary option (display subprocess logs - yes/no) and then see if users want a line limit? If we don't even want a binary choice*, I'd go with no log replication to the console.

* Configuring this would require changes to the syntax, not sure that's worth it.

rmitsch avatar Jul 11 '22 08:07 rmitsch

> Configuring this would require changes to the syntax, not sure that's worth it.

The PR already introduces changes to the projects file syntax. I don't think more are a problem, especially if there's default behaviour so that existing project files remain valid.

richardpaulhudson avatar Jul 11 '22 09:07 richardpaulhudson

@explosion-bot please test_gpu

richardpaulhudson avatar Jul 20 '22 17:07 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/103

explosion-bot avatar Jul 20 '22 17:07 explosion-bot

@explosion-bot please test_gpu

richardpaulhudson avatar Jul 21 '22 11:07 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/104

explosion-bot avatar Jul 21 '22 11:07 explosion-bot

@explosion-bot please test_gpu

richardpaulhudson avatar Jul 21 '22 16:07 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/105

explosion-bot avatar Jul 21 '22 16:07 explosion-bot

@explosion-bot please test_gpu

richardpaulhudson avatar Jul 25 '22 16:07 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/106

explosion-bot avatar Jul 25 '22 16:07 explosion-bot

@explosion-bot please test_gpu

richardpaulhudson avatar Jul 25 '22 16:07 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/107

explosion-bot avatar Jul 25 '22 16:07 explosion-bot

Not approving yet because of the Windows/ANSI/wasabi issue, but otherwise I'm done with my review. LGTM!

rmitsch avatar Jul 27 '22 13:07 rmitsch

When a command within a parallel group has failed, the other commands in the group are terminated. Here a race condition can occur where the worker process to be terminated has just completed of its own accord and no longer exists. One of the last batches of changes based on review comments was to narrow the scope of the exception caught when handling this race condition.

A subsequent CI run failed, suggesting that the new, narrower exception does not apply on Windows and that a different exception would be necessary there. I reran the CI to test the assumption that this race condition was the probable culprit, and indeed it passed the second time. (The race condition occurs fairly rarely: I've never seen it anywhere outside the context of a CI runner, presumably because CI runners do not have consistent access to the resources they use.) I think it is very likely to have been the race condition that caused the exception, although it could also have been some strange one-off with the CI runner.

I'm on vacation until mid-August and when I get back I'll do more tests on a Windows machine to find out exactly what's going on. Presuming I'm right about the race condition, I'll then widen the scope of the try - except block to handle the Windows exception and also double-check what happens on macOS.
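
For readers unfamiliar with this kind of race, a minimal POSIX-oriented sketch of the pattern is shown below. It is not the code from the PR: the function name is invented, and which exception Windows raises in this situation is deliberately left open here, as it is in the discussion above.

```python
import os
import signal
import subprocess
import sys


def terminate_if_running(pid):
    """Send SIGTERM to `pid`; return False if the process no longer exists."""
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process completed of its own accord between the decision to
        # terminate it and the call above. Narrow on purpose (POSIX case).
        return False
    return True
```

Catching only `ProcessLookupError` keeps unrelated errors visible, at the cost of needing a separate look at platforms that report a missing process differently.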

Apart from this issue and the Wasabi/Windows issue, this PR is ready for further reviews.

richardpaulhudson avatar Jul 29 '22 07:07 richardpaulhudson

@explosion-bot please test_gpu

richardpaulhudson avatar Oct 04 '22 16:10 richardpaulhudson

🪁 Successfully triggered build on Buildkite

URL: https://buildkite.com/explosion-ai/spacy-gpu-test-suite/builds/113

explosion-bot avatar Oct 04 '22 16:10 explosion-bot