jobflow
Combining different `OutputReference`s in a single object
I am not sure if this is such a common use case, but recently with @VicTrqt we came across a situation where different jobs each produce a list as output, and these lists should be joined and passed as a single list to another job. Here is an extremely simplified version of the problem:
```python
from jobflow import job, Flow

@job
def s(x: list) -> int:
    return sum(x)

@job
def l(a: int, b: int) -> list:
    return [a, b]

j1 = l(1, 2)
j2 = l(3, 4)
j3 = s(j1.output + j2.output)
f = Flow([j1, j2, j3])
```
This code of course does not work, since the `+` operation is not supported between `OutputReference`s.
A trivial workaround is to change the `s` job so that it can handle nested lists, and then use `j3 = s([j1.output, j2.output])`. However, this somewhat breaks the API, and when `s` is first written it is not obvious that such a use case will show up in the future.
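To make this workaround concrete, here is a pure-Python sketch of what a nested-list-tolerant `s` might do, with the flattening logic made explicit (the `flatten_one_level` helper name is hypothetical, and the `@job` decorator is omitted so the logic stands alone):

```python
def flatten_one_level(x: list) -> list:
    # Accept both flat lists and one level of nesting,
    # e.g. [1, 2, 3] as well as [[1, 2], [3, 4]].
    return [v for item in x
            for v in (item if isinstance(item, list) else [item])]

def s(x: list) -> int:
    # In jobflow this body would sit inside the @job-decorated s.
    return sum(flatten_one_level(x))

s([[1, 2], [3, 4]])  # 10, same result as s([1, 2, 3, 4])
```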
An intermediate job that joins the lists is also an option, but it is a bit of a waste if it needs to run as a separate job on a cluster, and it needlessly complicates the Flow structure.
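For reference, the join logic of such an intermediate job is trivial; a plain-Python sketch (the `join_lists` name and the commented jobflow wiring are hypothetical):

```python
import operator
from functools import reduce

def join_lists(*lists: list) -> list:
    # Concatenate the lists produced by the upstream jobs.
    return reduce(operator.add, lists, [])

# Wrapped as a @job, the wiring would look roughly like:
#   j_join = join_lists(j1.output, j2.output)  # extra job, possibly on a cluster
#   j3 = s(j_join.output)
#   f = Flow([j1, j2, j_join, j3])

join_lists([1, 2], [3, 4])  # [1, 2, 3, 4]
```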
Is there a simpler way to achieve this in jobflow, or are workarounds like those described above the only option at the moment?
If there is no way of handling this, I wonder if it may be worth introducing a standard way of doing it, for example a `CombinedOutputReference` object that holds a list of output references and an operation to perform on them:

```python
import operator

j3 = s(CombinedOutputReference(operator.add, j1.output, j2.output))
```

The operation would be applied at the moment of resolving the references. One could also think of implementing the `__add__` method on `OutputReference` to directly allow `j1.output + j2.output`.
However, I am not sure whether this really makes sense, as the `CombinedOutputReference` object basically becomes a Job that runs when the reference is resolved.
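To make the proposal concrete, here is a minimal standalone sketch of what such a `CombinedOutputReference` could look like (the class and its `resolve` method are hypothetical, not part of the jobflow API; strings stand in for real references):

```python
import operator
from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable

@dataclass
class CombinedOutputReference:
    # Binary operation to fold over the resolved values, e.g. operator.add.
    op: Callable[[Any, Any], Any]
    # The wrapped output references, in the order the operation is applied.
    refs: tuple

    def resolve(self, values: list) -> Any:
        # Called with the concrete outputs once the wrapped references
        # have been resolved, in the same order as refs.
        return reduce(self.op, values)

ref = CombinedOutputReference(operator.add, ("j1.output", "j2.output"))
ref.resolve([[1, 2], [3, 4]])  # [1, 2, 3, 4]
```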