
Lightweight local job submission (~ 10ms)

Open jedbrown opened this issue 4 years ago • 22 comments

The Python-based flux mini run costs over 100 ms, and thus becomes a bottleneck in some scenarios, including unit testing with independent executable invocations. This was discussed off-topic in https://github.com/flux-framework/flux-core/issues/2915 and some rough solutions were proposed. I'd like to revisit and scope out the potential for a lightweight (e.g., bash, C, or Rust) job submission tool that supports blocking semantics like mpiexec in terms of capturing stdout and stderr, and returning error codes. One requirement would be to execute in time comparable to MPICH mpiexec, which costs 5-8 ms.
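To make the target concrete, the desired blocking semantics are roughly those of the bash sketch below (the flux-mpiexec name and the minimal flag handling are illustrative only; flux mini run --dry-run is itself the slow python step, so fast jobspec generation is the real problem to solve):

#!/bin/sh
# flux-mpiexec: illustrative sketch of the desired blocking interface (not a real tool)
# handles only -n; affinity/GPU flags would come later
np=1
while getopts "n:" opt; do
  case "$opt" in
    n) np="$OPTARG" ;;
    *) exit 2 ;;
  esac
done
shift $((OPTIND - 1))
# submit the job, then block: attach streams stdout/stderr and exits with the job's status
id=$(flux mini run --dry-run -n "$np" "$@" | flux job submit) || exit 1
exec flux job attach "$id"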

Who else would use such a tool and what are their needs? Is this best as a stand-alone tool that uses Flux's C bindings or would there be a better way to integrate it?

An example of this sort of testing is in PETSc. If you click into any running pipeline, you can see CI jobs that execute something similar to the following, which uses parallel make to manage thousands of MPI-based tests. The example below runs an interesting subset of the full suite and produces a summary report; you can set MPIEXEC=/other/mpiexec to try different runners in place of the configured/detected MPI.

git clone https://gitlab.com/petsc/petsc
cd petsc
./configure
make
make test search=snes%
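
For example, to point the harness at a different launcher you can override MPIEXEC on the make command line (the path here is illustrative):

make test search=snes% MPIEXEC=/path/to/other/mpiexec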

Originally posted by @jedbrown in https://github.com/flux-framework/flux-core/issues/2915#issuecomment-621427664

jedbrown avatar Mar 18 '21 18:03 jedbrown

I'd like to revisit and scope out the potential for a lightweight (e.g., bash, C, or Rust) job submission tool that supports blocking semantics like mpiexec in terms of capturing stdout and stderr, and returning error codes.

@jedbrown: Are there any other flags of mpiexec that you would like to support besides -n/-np?

From #2915, I like @dongahn's path forward:

  • Use @garlick's dead simple launcher (this may already provide 2x overhead reduction though more benchmarking is needed)

@garlick: do you still have that simple flux job spec jobspec-generation command somewhere? If so, we could start with the really simple flux-mpiexec bash one-liner that you had pasted in that thread: flux job attach $(flux job spec $@ | flux job submit).

  • Use dead simple validator

Although it doesn't really seem to be a bottleneck, we can use the no-op validator that @grondo suggested to avoid any initial delay due to python: https://github.com/flux-framework/flux-core/issues/2915#issuecomment-621334887

  • Don't use munge authentication (which isn't really required anyway for this single user flux use)

If flux is not configured with flux-security, the default behavior is to use none for signing, right? So the only TODO here is just to not configure flux with flux-security?

SteVwonder avatar Mar 19 '21 00:03 SteVwonder

I don't recall actually having a jobspec generator in C, if that's what I'm supposed to have? Sorry :-/

You could see how far you would get with an all-C front end tool by pre-generating the jobspec with flux mini run --dry-run >jobspec.json and then running flux job attach $(flux job submit jobspec.json) repeatedly. That would eliminate the python startup cost and give you a best case (zero cost to generate the jobspec).

The validator startup (also python btw) is a cost for the first job; then it stays running until it has been idle for 5s. So to eliminate that from testing, run one job before running your tests. Or disable it as @SteVwonder suggests.
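
For example, priming is just one throwaway run before the timed loop:

flux mini run /bin/true    # the first job pays the validator startup cost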

Munge will only be used when the submitting user isn't the same user running flux, even if built with flux-security. See https://github.com/flux-framework/flux-core/blob/master/src/common/libjob/job.c#L106

I would expect flux mini run commands run in parallel to scale pretty well, but if this is local only, then the number you can run in parallel is limited by your local core count (or whatever chunk of cores each job requires). If you don't want the parallelism limited by the number of cores, we don't have an "overcommit" like slurm does, but you can load a set of fake resources, as we do in some of our tests, and get more things running in parallel. Doing that begs the question of why use flux at all, though, so I'm guessing that is not what you're after.

My gut feeling is it is probably not realistic to expect end to end job turnaround in flux to approach that of local mpiexec, because mpiexec is a self contained launcher, and flux is made up of components that communicate with messages; and because flux is actually putting jobs in a queue and scheduling your cores. IOW mpiexec has the luxury of being able to immediately start your processes. I agree it would be great if we could do better though.

garlick avatar Mar 19 '21 01:03 garlick

we can use the no-op validator that @grondo suggested to avoid any initial delay due to python: #2915 (comment)

You can now reload the job-ingest module without a validator:

$ flux module reload job-ingest disable-validator

grondo avatar Mar 19 '21 01:03 grondo

Are there any other flags of mpiexec that you would like to support besides -n/-np?

Process affinity (-bind-to, -map-by) and GPU management would be a luxury, though not needed for minimum viable product.

To @garlick's suggestion, we're about an order of magnitude off of where we'd like to be. In current Docker:

$ flux mini run --dry-run /bin/true > jobspec.json
$ time for i in {1..100}; do flux job attach $(flux job submit jobspec.json); done                                                                                     

real    0m8.045s
user    0m1.207s
sys     0m0.895s

The context here involves running many unit tests that only take a few ms to run. An overhead of 80ms times 10k jobs is 800 seconds for each of 60 CI jobs in a pipeline, so about 13 core hours or $1 for every pipeline purely in job launch overhead (not to mention the human cost of slower pipeline results). The first hurdle is to get job launch overhead small enough before testing scalability of parallel submission. I understand that some of these tests could be consolidated into fewer invocations, but that comes at a maintenance cost and more potential for interference between tests. I think it should be possible to do single-node resource management faster, thus my motivation to solve this problem rather than work around the slowness of present solutions.

jedbrown avatar Mar 19 '21 05:03 jedbrown

Unfortunately, I don't think Flux will be very helpful if you need to run jobs serially back-to-back. Where Flux would be useful is if you want to submit all your test jobs without oversubscribing resources. Flux would queue the jobs and run them as quickly as possible.

For example, again in current Docker:

ƒ(s=1,d=0) fluxuser@4e8168745b2b:~$ flux mini run --dry-run /bin/true > jobspec.json
ƒ(s=1,d=0) fluxuser@4e8168745b2b:~$ time for i in {1..100}; do flux job attach $(flux job submit jobspec.json); done

real	0m6.639s
user	0m0.706s
sys	0m0.836s
ƒ(s=1,d=0) fluxuser@4e8168745b2b:~$ time flux mini submit --wait --quiet --cc=1-100 /bin/true

real	0m1.914s
user	0m0.222s
sys	0m0.040s

flux mini submit --cc submits copies of a job as quickly as possible and the --wait option waits until all jobs have run and completed. You can see this helps a bit on a single node (roughly 3.5x speedup). The nice thing, though, is you do not have to worry about what resources are available. You could run your Flux instance as a job with 100 cores and all 100 jobs would run (almost) simultaneously (whereas if you ran serially you'd be wasting 99 cores).

I know this doesn't help with your use case, but wanted to point out why there may be a mismatch between the desire to run serial jobs very quickly and Flux's design as a scalable cluster resource manager.

grondo avatar Mar 19 '21 15:03 grondo

For a sub-second micro-task regime like this, perhaps we can introduce a shim "service" layer using the new FluxExecutor that @jameshcorbett did, which submits those tasks as a job stream. (https://github.com/flux-framework/flux-core/pull/3468) Tagging @jameshcorbett.

dongahn avatar Mar 19 '21 15:03 dongahn

Oh, I don't want to run jobs sequentially. Rather, I use make -j32 test, which attempts to keep 32 jobs in flight at all times. Some of those jobs are single-core while others use more cores (and sometimes GPUs). But I want the error and output semantics of mpiexec instead of a batch script writing those to named files. If all 32 jobs are single-core, they can all run simultaneously. The resource manager's role is to prevent oversubscription.

My concern here is not with the latency of a sequential job launch, but with the time any core is kept busy. Do you happen to know what flux is doing for 80 ms? (Running /bin/true takes less than 1 ms, so it's all overhead.) If the core were actually idle for 70 of those 80 ms (waiting on a network request or something), then I could use make -j 128 to cover the latency. But if it's busy, it'll interfere with active jobs, and the cost is significant when it's run a half million times across a pipeline.

jedbrown avatar Mar 19 '21 16:03 jedbrown

Ah, ok. That makes sense thanks.

Do you happen to know what flux is doing for 80 ms?

Yeah, Flux is doing all the work that a job scheduler and resource manager needs to do to queue, schedule, and execute parallel jobs. The overhead involves things like

  • Parsing the job request, storing it in the Flux key-value store and announcing it to the scheduler and job listing services
  • Tracking available resources and selecting those resources for the next queued job
  • Executing a "job shell" that initiates the job's tasks and shepherds I/O, again into the KVS by default

Flux accomplishes the above via distinct service "modules" which communicate via messages, so there is definitely some overhead involved.

We have had an idea to allow some sort of execution system bypass (#2924), which could help a little in this case (you'd be removing the whole extra "job shell" process), but that is not implemented yet.

If you really want a system that can keep real tasks running on cores with a minimum of overhead, it might be a different kind of tool than a cluster resource manager. Maybe something using the job slots feature of GNU make?

grondo avatar Mar 19 '21 16:03 grondo

FWIW I did make a small proof-of-concept run command in C here: b6ba638927a01b2801b5cc37d9a28061971d63af

It actually does improve performance of back to back runs, though nowhere close to 10ms per job. On my laptop:

1x flux mini run /bin/true to prime validator

real    0m0.345s
user    0m0.090s
sys     0m0.016s

1x flux mini run /bin/true

real    0m0.163s
user    0m0.082s
sys     0m0.021s

1x flux job fastrun /bin/true

real    0m0.064s
user    0m0.006s
sys     0m0.000s

100x flux mini run /bin/true

real    0m14.453s
user    0m6.890s
sys     0m1.174s

100x flux job fastrun /bin/true

real    0m7.792s
user    0m0.355s
sys     0m0.322s

It has the following limitations:

  • jobspec only requests cores (one per task)
  • jobspec contains hardwired attributes, including shell options
  • only works for instance owner (uses waitable job, sign mech=none)
  • job output is only dumped at completion, not during execution
  • standard input is redirected from /dev/null

To make it reach parity with flux mini run would be some work, and the result would, in the end, be less maintainable than the python, so I'm not sure it's a great place to invest effort, but it does show what's possible just by tuning the front end.

garlick avatar Mar 19 '21 21:03 garlick

The job slots protocol doesn't give a way to avoid deadlock when running multi-process jobs (which would need to acquire multiple slots to start) -- prior conversation: https://github.com/flux-framework/flux-core/issues/2915#issuecomment-621338594 https://github.com/flux-framework/flux-core/issues/2915#issuecomment-621344325
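
For anyone following along, naive multi-slot acquisition under that protocol looks roughly like the sketch below (assuming GNU make 4.4+ with a fifo-style --jobserver-auth; the command and task count are placeholders). Two recipes blocked in the token loop while each holds part of what it needs is exactly the deadlock:

# illustrative only: a recipe (marked with '+' so make passes the jobserver through)
# that grabs np-1 extra tokens before launching an np-process job
fifo=$(printf '%s\n' "$MAKEFLAGS" | sed -n 's/.*--jobserver-auth=fifo:\([^ ]*\).*/\1/p')
np=4
tokens=""
i=1
while [ "$i" -lt "$np" ]; do                       # one slot is already held implicitly
    tok=$(dd if="$fifo" bs=1 count=1 2>/dev/null)  # blocks until a token is free
    tokens="$tokens$tok"
    i=$((i + 1))
done
mpiexec -n "$np" ./my_test
printf '%s' "$tokens" > "$fifo"                    # return the extra tokens to make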

The JSON parse into a Python dict is way under 1 ms. I'd be curious how to profile the other essential operations.

I see flux job fastrun has similar cost to the flux job attach $(flux job submit jobspec.json). I'm curious if there is some low-hanging fruit remaining. 10ms is enough time for a huge amount of IPC.

jedbrown avatar Mar 20 '21 04:03 jedbrown

Each job leaves behind an eventlog with timestamps on each of its phases. For example here is one from a /bin/true single task run with validator primed (on my laptop):

$ flux job eventlog ƒ35Etkd5
1616250198.042633 submit userid=5588 urgency=16 flags=4
1616250198.057018 depend
1616250198.057075 priority priority=16
1616250198.059016 alloc annotations={"sched":{"resource_summary":"rank0/core0"}}
1616250198.063934 start
1616250198.100513 finish status=0
1616250198.101904 release ranks="all" final=true
1616250198.102623 free
1616250198.102653 clean

The main synchronization points in the front end tool (the simple one) are:

  1. submit job, get back job ID
  2. request to wait for job ID to complete, get result
  3. request output from KVS, get data
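
Roughly, the CLI equivalents of those three steps (assuming the job was submitted with the waitable flag) are:

id=$(flux job submit --flags=waitable jobspec.json)   # 1. submit, get back job ID
flux job wait $id                                      # 2. block until the job completes, get result
flux job attach $id                                    # 3. replay the output stored in the KVS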

Disabling validation probably would help reduce the time between submit and depend. After that, all the events above occur during step 2.

So we likely have to look elsewhere than the front end tool to get latency down further.

garlick avatar Mar 20 '21 14:03 garlick

prior conversation:

I apologize @jedbrown, I forgot it was you who introduced me to that feature :smile:

I see flux job fastrun has similar cost to the flux job attach $(flux job submit jobspec.json). I'm curious if there is some low-hanging fruit remaining. 10ms is enough time for a huge amount of IPC.

I'm sure there is! We haven't attempted to really optimize in this area yet. However, we are currently working on building up some tools that can help us profile Flux's distributed, asynchronous architecture, so we may be able to find that low hanging fruit soon!

Another test we can do is to use mock job execution. If the jobspec has the attribute system.exec.test then the exec system will only pretend to execute the job-shell and job tasks, so we can see how much time is spent executing this process (plus all of its separate synchronization).

$ flux mini run --dry-run --setattr=system.exec.test.run_duration=0.001s /bin/true >test.json

E.g.

ƒ(s=1,d=0,builddir) fluxuser@bf1c16efbbfe:~/flux-core$ time flux job attach -E $(flux job submit test.json)
0.000s: job.submit {"userid":1002,"urgency":16,"flags":0}
0.014s: job.depend
0.014s: job.priority {"priority":16}
0.016s: job.alloc {"annotations":{"sched":{"resource_summary":"rank0/core0"}}}
0.019s: job.start
0.020s: job.finish {"status":0}
0.023s: job.release {"ranks":"all","final":true}
0.024s: job.free
0.024s: job.clean
flux-job: No job output found

real	0m0.222s
user	0m0.032s
sys	0m0.036s

vs

ƒ(s=1,d=0,builddir) fluxuser@bf1c16efbbfe:~/flux-core$ time flux job attach -E $(flux job submit true.json)
0.000s: job.submit {"userid":1002,"urgency":16,"flags":0}
0.014s: job.depend
0.014s: job.priority {"priority":16}
0.017s: job.alloc {"annotations":{"sched":{"resource_summary":"rank0/core0"}}}
0.021s: job.start
0.056s: job.finish {"status":0}
0.060s: job.release {"ranks":"all","final":true}
0.061s: job.free
0.061s: job.clean

real	0m0.250s
user	0m0.037s
sys	0m0.034s

Note there is also some overhead to flux job attach, which has to read multiple eventlogs in the KVS:

e.g. for a job that has already run:

ƒ(s=1,d=0,builddir) fluxuser@bf1c16efbbfe:~/flux-core$ time flux job attach ƒvt139eF
bf1c16efbbfe

real	0m0.041s
user	0m0.014s
sys	0m0.023s

grondo avatar Mar 20 '21 15:03 grondo

For fun I tried creating a job manager service for ingesting and waiting for a job. This cuts off a bit of the overhead. Rather than accept any jobspec, the service accepts a few inputs and creates the jobspec internally, then generates a jobid, commits the jobspec to the KVS, and starts the job on its way. A response is not generated to the original request until the job is done, and it includes the result. So it is kind of a submit + wait, with job ingest cut out of the picture.

This probably won't scale as well as the regular path, since job-ingest is loaded on all ranks, and job-manager only on rank 0. Also job-ingest takes pains to batch KVS commits when there are bursts of jobs and this handles jobs individually.

However it is somewhat faster and has lower user/system overhead than the previous attempt for serial jobs (it's the --short-cut one):

1x flux mini run /bin/true to prime validator

real    0m0.265s
user    0m0.081s
sys     0m0.004s

100x flux mini run /bin/true

real    0m13.121s
user    0m7.474s
sys     0m1.302s

100x flux job fastrun /bin/true

real    0m6.723s
user    0m0.487s
sys     0m0.381s

100x flux job fastrun --short-cut /bin/true

real    0m4.285s
user    0m0.324s
sys     0m0.317s

Here is an example eventlog showing that submit through alloc go pretty fast.

1616279604.094052 submit userid=5588 urgency=16 flags=0
1616279604.094098 depend
1616279604.094122 priority priority=16
1616279604.095845 alloc annotations={"sched":{"resource_summary":"rank0/core0"}}
1616279604.103782 start
1616279604.118091 finish status=0
1616279604.127827 release ranks="all" final=true
1616279604.128624 free
1616279604.128656 clean

garlick avatar Mar 20 '21 22:03 garlick

Cool, that's entering usable territory for our use case. Open MPI mpiexec has an overhead of about 30 ms, so it's already getting close. How expensive are the serialized KVS commits? Is there a test I could run on my 64-core node to better understand the scaling of your approach?

jedbrown avatar Mar 21 '21 21:03 jedbrown

Let me clean it up a little and then I'll push to a branch for you to try.

The commits aren't necessarily serialized - multiple jobs and their KVS commits can be in flight concurrently. But waiting just a short bit to combine jobspec etc from contemporaneous jobs into one commit saves overhead in the KVS service (walking metadata, allocating memory, etc) and lessens the storage impact of all the job data, since each commit effectively stores a root snapshot in the sqlite backend.

It's not that we couldn't do the same batching here as in the ingest module, but we have to choose our battles.

garlick avatar Mar 21 '21 21:03 garlick

Hi, just checking whether this is complete (if so, how should I use it to observe that fast submission time?) or unplanned.

jedbrown avatar Aug 14 '25 02:08 jedbrown

Apologies @jedbrown - this was never completed and TBH I sort of lost track of it in all the excitement bringing up El Capitan with Flux. However, there is a stale branch still out there which could be dusted off and reexamined if need be.

We do have the python API and flux-bulksubmit(1), which can be used to hide the python startup latency when submitting a lot of jobs. I'm not sure if that fits at all into what you need.
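
For example, something along these lines runs a list of test commands from a single python process, so the interpreter startup is paid once (the script name and inputs are placeholders):

flux bulksubmit --wait --watch -n1 ./run_one_test {} ::: case1 case2 case3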

We've also discussed building an mpiexec compatible front end in the context of MPMD

https://github.com/flux-framework/flux-core/issues/2414#issuecomment-2988439688

One quick idea would be to make the fast submission tool mpiexec compatible. That at least constrains the design to something fairly simple so we wouldn't end up needing to eventually implement everything flux run does in C, which would be mildly nightmarish.

garlick avatar Aug 14 '25 16:08 garlick

p.s. I should have tied the mpiexec(1) thought back to the issue description where you used it as an example.

garlick avatar Aug 14 '25 16:08 garlick

Sorry for closing this undone, I really thought it was completed, reopening.

so we wouldn't end up needing to eventually implement everything flux run does in C, which would be mildly nightmarish.

True British-style comedic understatement. 😁

Reading this, I was thinking of something similar to your service idea @garlick: have a module in python that uses our current code for the frontend, accepts a simple arguments JSON object over local RPC, and just does the usual thing on the head node (or whichever broker), so we don't have to reimplement much of anything.

trws avatar Aug 14 '25 16:08 trws

Would you consider Rust for the submission tool? That would give good library support for CLI handling and JSON construction, and you get a statically-linked executable that's every bit as fast as C.

jedbrown avatar Aug 14 '25 16:08 jedbrown

I would, but it's more that we have a significant amount of logic already built up in python that couldn't be quickly replicated, regardless of language. I've actually wanted to get some rust in the codebase for a while, so if someone wanted to start doing the legwork on that I'd be all for it as long as we keep to a version that we can easily ensure is available on RHEL8 or equivalent.

trws avatar Aug 14 '25 16:08 trws

I wouldn't suggest tying the work needed to bring Rust into the project to this issue or it'll likely stall again.

The python submission server seems like a pretty good thought! I'd need to look at the prototype branch and see what else was required to get the latency down though. As I recall I had to embed this in the job manager, short circuiting both ingest and flux job attach (the C program that the python tools use to watch the eventlog and capture stdio).

garlick avatar Aug 14 '25 16:08 garlick