
Running a Group with multiples of the same task callable does not execute all multiples

Open jfwood opened this issue 8 years ago • 7 comments

If I have a task class such as this one:

@activity.with_attributes(task_list='quickstart', version='example')
class Delay(object):
    ....

And then try to build up a Group of these such as:

c1 = canvas.Chain(
    canvas.ActivityTask(Delay),
    canvas.ActivityTask(Foo),
    canvas.ActivityTask(Bar),
)

c2 = canvas.Chain(
    canvas.ActivityTask(Delay),
    canvas.ActivityTask(Foo),
    canvas.ActivityTask(Bar),
)

g = canvas.Group(c1, c2)
self.submit(g)
....

I'm noticing that the first task (Delay here) is run separately for each chain, but only one Foo and one Bar are run. Looking at the guts of the Dispatch module, it looks like the module and activity name are the only things that distinguish tasks like this, so it seems that for parallel tasks, we'd need to use unique tasks/task classes for each of the parallel actions?

jfwood avatar Nov 22 '16 00:11 jfwood

Hi John,

Could you describe your use case more precisely? Your code above seems to work:

# zz.py
from __future__ import print_function

from simpleflow import Workflow, activity, canvas, futures
from simpleflow.task import ActivityTask


@activity.with_attributes(task_list='quickstart', version='example')
class Delay(object):
    def execute(self):
        print('Delay')


@activity.with_attributes(task_list='quickstart', version='example')
class Foo(object):
    def execute(self):
        print('Foo')


@activity.with_attributes(task_list='quickstart', version='example')
class Bar(object):
    def execute(self):
        print('Bar')


class MyWorkflow(Workflow):
    name = 'zz'
    version = 'example'
    task_list = 'example'

    def run(self):
        c1 = canvas.Chain(ActivityTask(Delay), ActivityTask(Foo), ActivityTask(Bar))
        c2 = canvas.Chain(ActivityTask(Delay), ActivityTask(Foo), ActivityTask(Bar))
        g = canvas.Group(c1, c2)
        f = self.submit(g)
        futures.wait(f)

[W1]$ simpleflow decider.start --domain TestDomain --task-list example zz.MyWorkflow -N1
[W2]$ simpleflow worker.start --domain TestDomain --task-list quickstart -N1
2016-11-22T11:03:56 INFO [process=MainProcess, pid=23154]: starting <bound method ActivityPoller.start of ActivityPoller(domain=TestDomain, task_list=quickstart)>
2016-11-22T11:03:56 INFO [process=Process-1, pid=23159]: starting ActivityPoller(task_list=quickstart) on domain TestDomain
[W3]$ simpleflow workflow.start --domain TestDomain --task-list example --input '{}' zz.MyWorkflow

On W2:

Delay
Delay
Foo
Foo
Bar
Bar

Thanks!

ybastide avatar Nov 22 '16 10:11 ybastide

Thanks for investigating that! I ended up restoring my code back to the way I had it when I thought it was failing and it seems to be working now :\ I'm wondering if one of the parallel flows timed out...it isn't always obvious when that happens in the SWF UI.

At any rate, I apologize for the inconvenience!

jfwood avatar Nov 22 '16 17:11 jfwood

@jfwood we also have a lot of trouble with the official SWF web console, so we built an alternative interface just for our monitoring needs => https://github.com/jbbarth/webflow. If you want more information or need help getting up and running, don't hesitate to ping me in that project, I'd be happy to help!

jbbarth avatar Nov 22 '16 17:11 jbbarth

Hello Yves,

I'm reopening this as we are still seeing something odd going on with parallel flows. The flow below is closer to what we are trying to run. I was expecting this output at the end:

...
IN Bar, indata:Path least traveled...Delay...Boo...Foo
...
IN Bar, indata:The easy way...Delay...Foo...Moo
...

Instead I see this:

...
IN Boo, indata:Path least traveled...Delay
...
IN Bar, indata:The easy way...Delay...Foo...Moo
...

Note how the first Chain sequence is truncated, not moving past the second task. The other Chain succeeds. I don't see any errors in the logs or in the SWF console.


from __future__ import print_function

from simpleflow import Workflow, activity, canvas, futures
from simpleflow.task import ActivityTask


@activity.with_attributes(task_list='quickstart', version='example')
class Delay(object):
    def __init__(self, indata):
        self.indata = indata

    def execute(self):
        print('IN Delay, indata:{}'.format(self.indata))
        self.indata = '{}...Delay'.format(self.indata)
        return self.indata


@activity.with_attributes(task_list='quickstart', version='example')
class Foo(object):
    def __init__(self, indata):
        self.indata = indata

    def execute(self):
        print('IN Foo, indata:{}'.format(self.indata))
        self.indata = '{}...Foo'.format(self.indata)
        return self.indata


@activity.with_attributes(task_list='quickstart', version='example')
class Bar(object):
    def __init__(self, indata):
        self.indata = indata

    def execute(self):
        print('IN Bar, indata:{}'.format(self.indata))
        self.indata = '{}...Bar'.format(self.indata)
        return self.indata


@activity.with_attributes(task_list='quickstart', version='example')
class Boo(object):
    def __init__(self, indata):
        self.indata = indata

    def execute(self):
        print('IN Boo, indata:{}'.format(self.indata))
        self.indata = '{}...Boo'.format(self.indata)
        return self.indata


@activity.with_attributes(task_list='quickstart', version='example')
class Moo(object):
    def __init__(self, indata):
        self.indata = indata

    def execute(self):
        print('IN Moo, indata:{}'.format(self.indata))
        self.indata = '{}...Moo'.format(self.indata)
        return self.indata


class MyWorkflow(Workflow):
    name = 'zz'
    version = 'example'
    task_list = 'example'

    def run(self):
        print('START!!!!')
        c1 = canvas.Chain(
            ActivityTask(Delay, 'Path least traveled'),
            ActivityTask(Boo),
            ActivityTask(Foo),
            ActivityTask(Bar),
            send_result=True)
        c2 = canvas.Chain(
            ActivityTask(Delay, 'The easy way'),
            ActivityTask(Foo),
            ActivityTask(Moo),
            ActivityTask(Bar),
            send_result=True)
        g = canvas.Group(c1, c2)
        f = self.submit(g)
        futures.wait(f)

jfwood avatar Nov 23 '16 22:11 jfwood

@jfwood Could you add idempotent=True to the @activity.with_attributes decorator and see if it works? I'm not sure the activityId is incremented correctly when the task is not considered idempotent in a group or chain.

ampelmann avatar Nov 23 '16 23:11 ampelmann

Thanks @ampelmann. This did work for the example above, but for some reason in my actual flow using idempotent=True, I see the first activity in the two Chains run over and over again even though the task appears to be complete. A single Chain with the idempotent arg runs fine. Still digging into it, thanks!

jfwood avatar Nov 28 '16 17:11 jfwood

Hi @jfwood: The problem is that the (non-idempotent) c2.Foo is given the identifier activity-examples.a.Foo-1 when it is first submitted, then activity-examples.a.Foo-2 on the next replay, once c1.Foo is submitted ahead of it. So c1.Foo is never scheduled (it gets activity-examples.a.Foo-1, which already has a "scheduled" event!) while c2.Foo runs twice.

idempotent=True fixes this here because, instead of serial numbers, we then use a hash of the arguments to build the identifier.
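To make the difference concrete, here is a minimal sketch of an ID derived from a hash of the arguments. This is a toy model, not simpleflow's actual code: the helper name hashed_id, the use of MD5 over a JSON dump, and the short activity names are all assumptions made for illustration.

```python
import hashlib
import json


def hashed_id(name, args):
    """Hypothetical helper: build an ID from the activity name and its arguments.

    The ID does not depend on how many activities with the same name were
    submitted before this one, only on what the activity is called with.
    """
    payload = json.dumps(args, sort_keys=True).encode('utf-8')
    return '{}-{}'.format(name, hashlib.md5(payload).hexdigest())


# The two Foo tasks receive different inputs, so they get different IDs,
# whatever the order in which the chains are walked.
c1_foo = hashed_id('Foo', ['Path least traveled...Delay...Boo'])
c2_foo = hashed_id('Foo', ['The easy way...Delay'])
```

In this toy model, two calls with identical arguments would share one ID, which is why such a scheme only makes sense for activities declared idempotent.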

About idempotent activities running over and over again: could you try https://github.com/botify-labs/simpleflow/pull/159? It looks like the same behavior.

(Bonus: what the executor does here)

Submit Delay('Path least traveled') id=activity-examples.a.Delay-1
No event, will schedule
Submit Delay('The easy way') id=activity-examples.a.Delay-2
No event, will schedule
Decision: 2 ScheduleActivityTasks

reset ID counters

Submit Delay('Path least traveled') id=activity-examples.a.Delay-1
event: completed
Submit Boo('Path least traveled...Delay') id=activity-examples.a.Boo-1
No event, will schedule
Submit Delay('The easy way') id=activity-examples.a.Delay-2
event: scheduled
Decision: ScheduleActivityTask for Boo-1

reset ID counters

Submit Delay('Path least traveled') id=activity-examples.a.Delay-1
event: completed
Submit Boo('Path least traveled...Delay') id=activity-examples.a.Boo-1
event: scheduled
Submit Delay('The easy way') id=activity-examples.a.Delay-2
event: completed
Submit Foo('The easy way...Delay') id=activity-examples.a.Foo-1
No event, will schedule
Decision: ScheduleActivityTask for Foo-1

reset ID counters

Submit Delay('Path least traveled') id=activity-examples.a.Delay-1
event: completed
Submit Boo('Path least traveled...Delay') id=activity-examples.a.Boo-1
event: completed
Submit Foo('Path least traveled...Delay...Boo') **id=activity-examples.a.Foo-1**
**event: scheduled**
Submit Delay('The easy way') id=activity-examples.a.Delay-2
event: completed
Submit Foo('The easy way...Delay') **id=activity-examples.a.Foo-2**
**No event, will schedule**
Decision: ScheduleActivityTask for Foo-2
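
The trace above can be reproduced with a small stand-alone simulation. This is only a sketch under assumptions: replay, complete, the history dict and the shortened IDs (Foo-1 instead of activity-examples.a.Foo-1) are hypothetical names for this example, not the executor's real API. The completion order is the one shown in the trace.

```python
from collections import defaultdict


def replay(chains, history):
    """One decider pass: walk each chain and return the (id, input) pairs to schedule."""
    counters = defaultdict(int)  # serial numbers restart at every replay
    to_schedule = []
    for first_input, names in chains:
        data = first_input
        for name in names:
            counters[name] += 1
            task_id = '{}-{}'.format(name, counters[name])
            event = history.get(task_id)
            if event is None:
                # No event for this ID: schedule it and stop walking this chain.
                to_schedule.append((task_id, data))
                break
            if event['state'] != 'completed':
                break  # an event exists but the task has not finished yet
            data = event['result']  # feed the result to the next task (send_result)
    return to_schedule


def complete(history, task_id):
    """Mark a scheduled task as completed, mimicking the activities' return value."""
    event = history[task_id]
    name = task_id.rsplit('-', 1)[0]
    event['state'] = 'completed'
    event['result'] = '{}...{}'.format(event['input'], name)


chains = [
    ('Path least traveled', ['Delay', 'Boo', 'Foo', 'Bar']),
    ('The easy way', ['Delay', 'Foo', 'Moo', 'Bar']),
]
history = {}
scheduled = []
# First replay happens with no completion, then one replay after each completion.
for finished in [None, 'Delay-1', 'Delay-2', 'Boo-1']:
    if finished is not None:
        complete(history, finished)
    for task_id, data in replay(chains, history):
        history[task_id] = {'state': 'scheduled', 'input': data}
        scheduled.append((task_id, data))

foo_inputs = [data for task_id, data in scheduled if task_id.startswith('Foo')]
print(foo_inputs)
```

In this model both Foo-1 and Foo-2 end up scheduled with c2's input, and no Foo is ever scheduled with 'Path least traveled...Delay...Boo', which matches the truncated first chain.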

ybastide avatar Nov 29 '16 16:11 ybastide