simpleflow
Giving multiple blocking dependencies to tasks
Today, we have the canvas to create dependencies through groups and chains. The problem is that there is no way to give one element of the canvas several blocking dependencies.
Example:
class MyWorkflow(Workflow):
    def run(self):
        chain1 = Chain(Group(a, b, c), Group(d, e, f))
        chain2 = Chain(Group(h, i, j), Group(k, l, m))
        my_group = Group(chain1, chain2)
        self.submit(my_group)
I'd like to run two chains in parallel, but the last part of my second chain, Group(k, l, m), needs Group(a, b, c) to be completed first.
Note that when the workflow is built dynamically (feature iteration at Botify, for example), we don't necessarily have access to chain1 from chain2.
First proposal
We could give an ID to a task, chain or group, and then have a function that takes this ID and returns the right future status:
class MyWorkflow(Workflow):
    def run(self):
        chain1 = Chain(Group(a, b, c, id="abc"), Group(d, e, f))
        chain2 = Chain(Group(h, i, j), self.wait_by_id("abc"), Group(k, l, m))
        my_group = Group(chain1, chain2)
        self.submit(my_group)
More ideas are welcome! @jbbarth @ybastide @AsoSunag
An alternative:
class MyWorkflow(Workflow):
    def run(self):
        chain1 = Chain(Group(a, b, c), self.signal("abc"), Group(d, e, f))
        chain2 = Chain(Group(h, i, j), self.wait_signal("abc"), Group(k, l, m))
        my_group = Group(chain1, chain2)
        self.submit(my_group)
@ybastide Is it the official signal concept from SWF?
That's what I'd use, but only after a POC to check that it applies.
By using the signal concept from SWF, can we send a signal in one child workflow and receive it in another child workflow?
Yes, as long as the sender knows the receiver's workflow ID...
... But this is not something we pass around. If the ID is deterministic, the sender can compute it; other ideas welcome :-).
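One way the sender could compute the receiver's ID is to derive it from values both sides already have. The sketch below is only an illustration: `deterministic_workflow_id` is a hypothetical helper, not part of simpleflow, and it assumes the workflow name and a JSON-serializable input are enough to identify the receiver.

```python
import hashlib
import json


def deterministic_workflow_id(workflow_name, workflow_input):
    """Derive a stable workflow ID from a workflow name and its input.

    Hypothetical helper for illustration; simpleflow does not provide it.
    """
    # sort_keys makes the serialization independent of dict ordering,
    # so sender and receiver get the same string for the same input.
    payload = json.dumps(workflow_input, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:16]
    return "{}-{}".format(workflow_name, digest)


# Both sides compute the same ID without passing it around.
sender_view = deterministic_workflow_id("child_b", {"crawl": 42, "feature": "links"})
receiver_view = deterministic_workflow_id("child_b", {"feature": "links", "crawl": 42})
assert sender_view == receiver_view
```

The hash is only there to keep the ID short; any stable function of shared values would do.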
I like the signal approach proposal by @ybastide !
Knowing the workflow ID is just a matter of deducing it from the input or from the workflow execution parameters (tag list? big up to #172 and #94!). It definitely should be deterministic in most cases imho. Or it may be something we discover during the workflow execution, wrapped into the future (we can imagine a future.id that would behave like future.result: return the activity/workflow ID if already known, raise ExecutionBlocked otherwise).
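To make the future.id idea concrete, here is a minimal, self-contained sketch. `SketchFuture` and this local `ExecutionBlocked` are stand-ins written for the example, not simpleflow's own classes; the only point is that `id` and `result` share the same "return if known, raise otherwise" behaviour.

```python
class ExecutionBlocked(Exception):
    """Stand-in for the exception raised when a value is not available yet."""


class SketchFuture:
    """Toy future for illustration; not simpleflow's Future class."""

    _UNSET = object()  # sentinel, so that None stays a legal id or result

    def __init__(self):
        self._id = self._UNSET
        self._result = self._UNSET

    def set_id(self, value):
        self._id = value

    def set_result(self, value):
        self._result = value

    @property
    def id(self):
        # Same contract as result: return if known, block otherwise.
        if self._id is self._UNSET:
            raise ExecutionBlocked("activity/workflow ID not known yet")
        return self._id

    @property
    def result(self):
        if self._result is self._UNSET:
            raise ExecutionBlocked("result not available yet")
        return self._result
```

With this shape, a sender could read `future.id` as soon as the child workflow is scheduled, well before `future.result` is available.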
As for your first proposal @ampelmann, do you think it would be OK with signals instead? Managing such an ID is definitely possible, but I'd prefer something generic if possible (that works with simple tasks, workflows, or group/chains).
I'm vaguely imagining:
def signal(self, name, input=None, workflow_id=None, run_id=None, domain=None):
    SignalWorkflowExecution(signalName=name, input=input, domain=domain or self.get_domain_somewhat(), ...)

def wait_signal(self, name):
    f = self.executor.get_signal_future(name)  # completed on WorkflowExecutionSignaled
    return f.result
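The blocking behaviour this implies can be tried out without SWF. The toy below is an in-memory model only: `ToyExecutor`, `drive`, and the two chain functions are hypothetical names, tasks are plain strings, and a set stands in for the WorkflowExecutionSignaled events. It assumes a replay-style executor where a blocked chain is simply re-run from the start later.

```python
class ExecutionBlocked(Exception):
    """Raised when a chain cannot progress yet."""


class ToyExecutor:
    """In-memory stand-in for a replaying executor (hypothetical)."""

    def __init__(self):
        self.received_signals = set()  # plays the role of signaled events
        self.completed = []            # tasks in completion order

    def run_group(self, *tasks):
        # Idempotent, so replaying a chain does not re-run finished tasks.
        for task in tasks:
            if task not in self.completed:
                self.completed.append(task)

    def signal(self, name):
        self.received_signals.add(name)

    def wait_signal(self, name):
        if name not in self.received_signals:
            raise ExecutionBlocked(name)


def chain1(executor):
    executor.run_group("a", "b", "c")
    executor.signal("abc")
    executor.run_group("d", "e", "f")


def chain2(executor):
    executor.run_group("h", "i", "j")
    executor.wait_signal("abc")
    executor.run_group("k", "l", "m")


def drive(executor, chains):
    """Replay blocked chains until all finish; fail if none can progress."""
    pending = list(chains)
    while pending:
        still_blocked = []
        for chain in pending:
            try:
                chain(executor)
            except ExecutionBlocked:
                still_blocked.append(chain)
        if len(still_blocked) == len(pending):
            raise RuntimeError("deadlock: no chain made progress")
        pending = still_blocked
    return executor.completed


order = drive(ToyExecutor(), [chain2, chain1])
assert order.index("k") > order.index("c")
```

Even with chain2 started first, Group(k, l, m) only runs once the "abc" signal has been sent after Group(a, b, c).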
I think it's a very good idea to use signals, and it will be compatible with the child workflow needs, which was not possible with my first proposal.
Let's POC that!