cmd/cue: support for controlling parallelism in command tasks
Originally opened by @myitcv in https://github.com/cuelang/cue/issues/709
cue version: cue version 0.3.0-beta.1 darwin/amd64
I am using a custom cue command to start a large number (hundreds) of independent tasks using exec.Run. The number of tasks is too large to run them all in parallel, so I am trying to find a way to limit how many tasks run at a time.
My current attempt involves grouping tasks, and making each group depend on the previous group completing. However, adding the $after dependencies makes the cue command very slow.
Here's a simplified example of what I am trying:
test_tool.cue
package test
import (
	"tool/exec"
)

command: test: {
	let numTasks = 40
	let parallelism = 10

	_tasks: [ for i, _ in numTasks*[0] {"task-\(i)"}]

	// Number of task groups, each should have parallelism or fewer tasks
	_numGroups: __div(len(_tasks)+parallelism-1, parallelism)

	// Group tasks into _numGroups groups
	_taskGroups: [
		for tg in {
			for i, t in _tasks {"\(__mod(i, _numGroups))": "\(t)": t}
		} {tg},
	]

	for i, tg in _taskGroups
	for k, t in tg {
		let shellCommand = """
			echo $(date +%s): group:\( i ) in-group:\( k ) task:\( t ) starting
			/bin/sleep 1
			echo $(date +%s): group:\( i ) in-group:\( k ) task:\( t ) finished
			"""
		task: "\( i )": "\( k )": exec.Run & {
			// Run each group serially
			if i > 0 {
				$after: task["\( i-1 )"]
			}
			cmd: ["sh", "-e", "-c", shellCommand]
		}
	}
}
This should start four groups of tasks serially; the tasks within a group run in parallel. I would expect the run to take about four seconds, but it takes over a minute on my computer and causes high CPU usage. time cue test shows:
real 1m12.176s
user 1m37.900s
sys 0m3.872s
If I comment out the $after: task["\( i-1 )"], time cue test shows:
real 0m1.340s
user 0m0.667s
sys 0m0.494s
Is there a reason adding these dependencies is so expensive? Is there a better way to control how many tasks run at once?
Originally posted by @svend in https://github.com/cuelang/cue/discussions/640
Original reply by @myitcv in https://github.com/cuelang/cue/issues/709#issuecomment-771712539
Quoting myself from https://github.com/cuelang/cue/discussions/640#discussioncomment-331122:
I've just hit a similar use case where I need to effectively hold a mutex whilst running a specific task (that writes to a part of a file), a number of instances of which are created, much like above, using a comprehension based on a file.Glob.

How about a solution that uses the equivalent of a length-n buffered channel? Not sure what that would look like in terms of a declaration (using a tool/sync builtin of sorts?), but a task would effectively be able to depend on such a limiter. Initialising the value of such a limiter (via a builtin) would take a positive integer value. If we were to support the injection of special values like os, we might extend that to include the CUE equivalent of Go's runtime.NumCPU().
Original reply by @myitcv in https://github.com/cuelang/cue/issues/709#issuecomment-772808008
Noting a discussion from this afternoon regarding this:
- this definitely makes sense to support within cue cmd/cue run
- it might make sense to offer some sort of helper API to users of tools/flow (we could create and use an internal API to start with)
- what probably makes more sense is to have something like:
command: x: exec.Run & {
	$semaphore: s1: 3
	cmd: "........"
}
where the $semaphore field on a command is essentially of type [string]: int, allowing a command to depend on multiple semaphores
- if we support multiple semaphores, we obviously need to take care over the order of lock acquisition