Submitting many tasks from Python fails due to serialization size limit
I have an issue where submitting many Python functions in a single job fails. I found this simple reproducer:
```python
import time

from hyperqueue import Job, LocalCluster


def work(*args, **kwargs):
    time.sleep(1)
    print(args)
    print(kwargs)


# Spawn a HQ server
with LocalCluster() as cluster:
    # Add a single HyperQueue worker to the server
    cluster.start_worker()

    # Create a client and a job
    client = cluster.client()
    job = Job()

    # Add a task graph
    tasks = {}
    for i in range(10000):
        name = f"work_{i}"
        stdout = f"{name}.stdout"
        stderr = f"{name}.stderr"
        tasks[name] = job.function(work, args=(["hello"] * 10000, i), kwargs=None, stdout=stdout, stderr=stderr, name=name)
        print(tasks[name].label)

    # Submit the job
    submitted = client.submit(job)

    # Wait until the job completes
    client.wait_for_jobs([submitted])
```
```
Traceback (most recent call last):
  File "/home/user/hq-test.py", line 29, in <module>
    submitted = client.submit(job)
  File "/usr/lib/python3.13/site-packages/hyperqueue/client.py", line 85, in submit
    job_id = self.connection.submit_job(job_desc)
  File "/usr/lib/python3.13/site-packages/hyperqueue/ffi/client.py", line 31, in submit_job
    return ffi.submit_job(self.ctx, job_description)
           ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
Exception: TakoError(GenericError("Serialization failed: SizeLimit"))
```
It depends on the number of tasks in the job: 10000 fails, but if you lower it to e.g. 1000, it works as expected. The size of the arguments for each task should be the same here, so the problem is not that the tasks are too big, but that they are all serialized together. This is very inconvenient for the user, because it is hard to anticipate the serialization size (which can also be very dynamic). Could you improve the submit logic so that the necessary info is serialized and submitted in parts?
The proper solution for this is to enable creating task arrays from Python; if you want to execute a large number of tasks that do the same thing, that's the right approach to use.
We're now starting to think about a new Python API, which will include that.
Well, in practice the tasks don't do the same thing - this is just a simple reproducer. In practice there are 8 different "types of things" handled by 2 Python functions, with 6 dependencies in this "batch of things" 😄

Also, in practice the arguments of the tasks are not just simple IDs that could be parametrized in a task array; the arguments involve some dataclass objects that are unique for each task and must be queried from a database. This is why I already hit the limit at approximately 10000 tasks, but still - this is not big data at all.

Sure, I could reorganize the tasks into many jobs of 8 tasks each, but as far as I understand, HyperQueue's jobs are intended to be "big". Of course, if you have some ideas about how to organize it better, I would be glad for suggestions. I can share more details about the current code if needed.
Task arrays are not actually parametrized by simple IDs, but by arbitrary byte arrays, which is what makes e.g. this work. So we could serialize pretty much arbitrary Python values into this byte array and then pass it to each task separately as input.
Since all of args, kwargs, and the Python function to be executed are actually serialized using a similar mechanism right now, I think that we might be able to reuse task arrays for almost all Python computations: in the end, each task simply executes the same program (the Python interpreter) anyway, as we don't support Python workers at the moment.
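The packing idea could look roughly like this (a sketch only; `make_entries` and `run_entry` are hypothetical helpers, not HyperQueue API):

```python
import pickle


def make_entries(task_inputs):
    # Hypothetical helper: pack each task's unique (args, kwargs) into an
    # opaque byte string, as a task-array entry could carry it.
    return [pickle.dumps((args, kwargs)) for args, kwargs in task_inputs]


def run_entry(shared_fn, entry):
    # On the worker side, each task would unpickle only its own entry and
    # call the single shared (serialized) function with it.
    args, kwargs = pickle.loads(entry)
    return shared_fn(*args, **kwargs)


entries = make_entries([((i,), {"name": f"work_{i}"}) for i in range(3)])
result = run_entry(lambda i, name: (i, name), entries[1])
print(result)  # (1, 'work_1')
```

Each entry is just bytes, so the per-task payloads never have to be serialized as one giant message.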
You're right that HQ jobs should be "large", in that they should contain many tasks, but most of this benefit for HQ actually comes from the usage of taskarrays. If you create 100k different tasks in a job from Python, it won't be much better for HQ than having 100k jobs with a single task each (it will be better, but much worse than doing a 100k task taskarray job).
Before we implement this "taskarray compression" mechanism, I think that there are mostly only two possibilities here. Either you'll have to make the jobs smaller and create more of them, so that they fit into the serialization limit, or we can bump the serialization limit or make it runtime-configurable. @spirali Any thoughts?
In the new Python API, we also want to expose open jobs, so that you can repeatedly submit tasks into the same job. Missing open jobs are probably the biggest problem in the current API. The other problem is that the current Python API still uses the old architecture and puts everything into the task body. The new API should use HQ_ENTRY for args/kwargs, and the task body (the serialized function) should be shared by multiple tasks.

Nevertheless, we should probably support internal splitting of a large submit into several messages. It should not be that difficult, apart from covering some corner cases. I am adding it to my main TODO list.
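Until such internal splitting exists, a client-side workaround could chunk the tasks manually, e.g. (a sketch; the pickle size is only a rough proxy for HQ's own serialization format, and the byte budget is a made-up number):

```python
import pickle


def split_task_specs(task_specs, max_bytes=1_000_000):
    # Greedily pack (args, kwargs) specs into chunks whose estimated
    # serialized size stays under the budget; each chunk would then be
    # submitted as a separate Job.
    chunks, current, size = [], [], 0
    for spec in task_specs:
        spec_size = len(pickle.dumps(spec))
        if current and size + spec_size > max_bytes:
            chunks.append(current)
            current, size = [], 0
        current.append(spec)
        size += spec_size
    if current:
        chunks.append(current)
    return chunks


specs = [((["hello"] * 100, i), None) for i in range(1000)]
chunks = split_task_specs(specs, max_bytes=50_000)
print(len(chunks), sum(len(c) for c in chunks))
```

Each chunk would then go into its own `Job` and `client.submit` call; the estimate is only approximate, so the budget should be chosen well below the real limit.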
> Before we implement this "taskarray compression" mechanism, I think that there are mostly only two possibilities here. Either you'll have to make the jobs smaller and create more of them, so that they fit into the serialization limit, or we can bump the serialization limit or make it runtime-configurable. @spirali Any thoughts?
Yes, a hot fix would be to expose the serialization limit. But I would do it as a build option, and hope that users who can build their own HyperQueue are also capable of understanding what they are modifying.
Hmm, that's interesting. What is the main difference for HQ between a task array and a regular job? I.e., why is a task array better?

Also, what are the actual limits now? Is there a (known) limit for the task array too? I'd guess that if you take the serialized function, args, and kwargs from Python that are tripping the limit now, and turn them into input for a task array, it might trip some limit too.
In the current version, the main difference is how tasks get into HQ. HQ distinguishes two types of submits: array jobs and task graphs.

- An array job has a single task definition and a list of HQ_ENTRY values (or just ID ranges).
- In a task graph, each task is a "full size" task, with its own resource request, time limit, task body, etc.

In the current version, everything is eventually translated into individual Tako tasks*, but this is done internally, so it does not hit any serialization limit. By the way: since we know that array jobs are homogeneous, we can skip some extra chores when constructing the Tako tasks.

The current version is not able to split any kind of submit into several messages, so a large list of HQ_ENTRY values will also hit the limit. But implementing the splitting of HQ_ENTRY values into several messages would be easier, as they are just plain binary data with a size known before HQ serialization.

*) I am preparing a new kind of "unfolding" scheduler that can better utilize task arrays internally in the Tako layer; this should improve performance and memory consumption for scenarios with 100k+ tasks.
@spirali Thanks for the explanation. If I understand it correctly, a task array would unfortunately not be applicable to my case because of the dependencies between the tasks. This might also complicate the submit splitting, but it is possible to sort the tasks before the submit such that all dependencies come before the tasks that depend on them, so it would not matter where you split the list. But I guess this would have to be done on the Python side...
The topological sort is already done before the submit in the Rust code. The only part that would need some effort is creating an estimate of how big a task will be in its serialized form.

Unfortunately, dependencies are currently a showstopper for an array job.
Out of curiosity, are your dependencies expressible in a concise form by Presburger arithmetic (arithmetic with only addition, without multiplication)? This means that dependencies between a task set A and a task set B can be, for example:

- B[i] needs A[i * 2] and A[i * 2 + 1]
- B[i] needs A[i // 2]
- B[i] needs all A[j] for all j
- B[i] needs all A[j] for all j < i * 2

What cannot be concisely expressed is e.g.:

- B[i] needs A[i * i]
- B[i] needs some A[f(i)], where f is an arbitrary complex function
> Out of curiosity, are your dependencies expressible in a concise form by Presburger arithmetic
Yes, I can decompose it into 8 task sets $S_1, ..., S_8$ like this:
- tasks from $S_1$ have no dependencies
- $S_2[i]$ depends on $S_1[i]$
- tasks from $S_3$ have no dependencies
- $S_4[i]$ depends on $S_3[i]$
- $S_5[i]$ depends on $S_4[i]$
- $S_6[i]$ depends on $S_5[i]$
- $S_7[i]$ depends on $S_4[i]$
- $S_8[i]$ depends on $S_7[i]$
But the sets $S_1$ and $S_2$ can have a different size than $S_3$ to $S_8$.
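A decomposition like the one above can be expanded into explicit `(child, parent)` edges with a few index maps (a sketch; the set names follow the thread, and the sizes are illustrative):

```python
def dependency_edges(sizes):
    # Index-mapped dependencies from the decomposition above:
    # S2[i] -> S1[i], S4[i] -> S3[i], S5[i] -> S4[i],
    # S6[i] -> S5[i], S7[i] -> S4[i], S8[i] -> S7[i]
    dep_map = {"S2": "S1", "S4": "S3", "S5": "S4",
               "S6": "S5", "S7": "S4", "S8": "S7"}
    edges = []
    for child, parent in dep_map.items():
        for i in range(sizes[child]):
            edges.append(((child, i), (parent, i)))
    return edges


# S1/S2 share one size, S3..S8 another, as noted above (sizes made up)
sizes = {"S1": 2, "S2": 2, "S3": 3, "S4": 3,
         "S5": 3, "S6": 3, "S7": 3, "S8": 3}
edges = dependency_edges(sizes)
print(len(edges))  # 2 + 5 * 3 = 17
```

Since every dependency here is the identity map `i -> i`, the edge list stays linear in the number of tasks, which is exactly the kind of pattern Presburger arithmetic can express concisely.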
Thank you. Sometimes I think about a scheduler built over Presburger arithmetic. It would allow a fast scheduler that handles dependencies without creating an explicit Python object for each task. But designing a good Python API around it is slightly tricky, and my prototype shows that it is only useful for pipelines with more than 200k tasks, so I am not sure how practical it is.
It would be easy for me to solve this issue on the client side if there was a Python API for the open jobs, but I guess it is not available yet...
Hmm, I'm trying to refactor our code by splitting the submission manually, and got this error:

```
thread '<unnamed>' panicked at /home/user/hyperqueue/crates/hyperqueue/src/common/arraydef.rs:61:13:
assertion failed: last_id.map(|last_id| last_id < id).unwrap_or(true)
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
This is the Python traceback:
```
  File "/home/user/compute.py", line 863, in compute_all
    hq_client.wait_for_jobs(submitted_jobs)
  File "/home/user/miniforge3/envs/base/lib/python3.12/site-packages/hyperqueue/client.py", line 100, in wait_for_jobs
    failed_jobs = self.connection.wait_for_jobs(job_ids, callback)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/miniforge3/envs/base/lib/python3.12/site-packages/hyperqueue/ffi/client.py", line 35, in wait_for_jobs
    return ffi.wait_for_jobs(self.ctx, job_ids, callback)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pyo3_runtime.PanicException: assertion failed: last_id.map(|last_id| last_id < id).unwrap_or(true)
```
> It would be easy for me to solve this issue on the client side if there was a Python API for the open jobs, but I guess it is not available yet...
Yes, unfortunately open jobs are not supported in the current API. As it is quite a big change we plan it for a new API.
> Hmm I'm trying to refactor our code with splitting the submission manually and got this error:
>
> `pyo3_runtime.PanicException: assertion failed: last_id.map(|last_id| last_id < id).unwrap_or(true)`
I am not sure what is happening, but it triggers an assert that checks that array IDs are sorted.
> I am not sure what is happening, but it triggers an assert that checks that array IDs are sorted.
I think that I made a mistake in the splitting and some dependencies were referencing tasks from a previous job 😇
> I am not sure what is happening, but it triggers an assert that checks that array IDs are sorted.

> I think that I made a mistake in the splitting and some dependencies were referencing tasks from a previous job 😇
@spirali Actually, I still get this problem even after I fixed the dependencies. Notice that the last assert comes from `hq_client.wait_for_jobs(submitted_jobs)`, where `submitted_jobs` is a list of the submitted jobs, each with at most ~8000 tasks after manual splitting. So all the jobs were submitted correctly (and I can confirm this in the `hq job list` output), but `wait_for_jobs()` fails for some reason. Any ideas?
Does it fail when you wait on a single such submitted job or when you wait on all of them?
So far I have been waiting for all of them. I can try waiting for each job separately, but it will take some time as I don't have a simple reproducer for this failure and need to wait until the current jobs are computed...
OK, I tried reproducing this issue and found something interesting. First, I changed the reproducer from the first post:
```python
import time

from hyperqueue import Job, LocalCluster


def work(*args, **kwargs):
    time.sleep(1)
    print(args)
    print(kwargs)


# Spawn a HQ server
with LocalCluster() as cluster:
    # Add a single HyperQueue worker to the server
    cluster.start_worker()

    # Create a client
    client = cluster.client()

    # List for submitted jobs
    submitted_jobs = []

    # Submit several jobs
    for k in range(4):
        job = Job()

        # Add a task graph
        tasks = {}
        for i in range(5000):
            name = f"work_{i}"
            stdout = f"{name}.stdout"
            stderr = f"{name}.stderr"
            tasks[name] = job.function(work, args=(["hello"] * 10000, i), kwargs=None, stdout=stdout, stderr=stderr, name=name)
            print(tasks[name].label)

        # Submit the job
        submitted_jobs.append(client.submit(job))

    # Wait until the jobs complete
    client.wait_for_jobs(submitted_jobs)
```
Now, it depends on how pyhq is installed:

- from source with `maturin build` -> `assertion failed: last_id.map(|last_id| last_id < id).unwrap_or(true)`
- from source with `maturin build --release` -> seems to work
- installing `hyperqueue-0.23.0-cp39-abi3-manylinux_2_28_x86_64.whl` from the GitHub release -> seems to work
(Note that I did not wait until the whole script finished, but interrupted it a few minutes after the progress bar appeared.)
So the `--release` build probably just omits the failing assert, but `wait_for_jobs` seems to work without it.
Sorry, I didn't examine the original code thoroughly enough; I thought this was happening in a different situation (next time I'll ask for `RUST_BACKTRACE=1` straight away :) ). This was a plain bug on our side. https://github.com/It4innovations/hyperqueue/pull/997 will fix this.