appengine-pipelines

In notify_barriers, when the queue's task_retry_limit is exceeded, the pipeline never finalizes

Open ymohii opened this issue 9 years ago • 5 comments

In this line: https://github.com/GoogleCloudPlatform/appengine-pipelines/blob/master/python/src/pipeline/pipeline.py#L1672

The library simply ignores (passes on) the taskqueue.TombstonedTaskError exception, which is raised often at this point of the library in our application. But when the retries exceed task_retry_limit, the pipelines are never notified of the filled slots, and the pipeline stays in the Run or Finalize status forever.

Is this behavior intended, or is there a way to work around this issue?

ymohii avatar Aug 13 '15 11:08 ymohii

I have seen this issue too, where the pipeline stays in the Finalizing status, when it should have otherwise aborted, but I am not certain that this is the root cause.

What this line is trying to achieve is idempotence, making sure that we haven't already fired the same task from the taskqueue perspective. The goal of this codeblock is to prevent errors when we've actually already created and run the notify tasks.


The issue here, at a glance, is that task_list contains one or more tasks, and a duplicate of any one of those tasks will raise either taskqueue.TombstonedTaskError or taskqueue.TaskAlreadyExistsError.

There's nothing in the documentation that tells me what happens to the remaining tasks when one fails for this reason. Anyone from the Google team know for sure?
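
Until that is confirmed, one way to sidestep the question is to add each task on its own, so a duplicate can only affect itself. The following is a minimal sketch of that idea and not the library's code: FakeQueue, add_each, and the two exception classes are hypothetical in-memory stand-ins that only borrow the names of the real taskqueue errors, and the rule that executed task names become tombstoned is an assumption of the simulation.

```python
# Stand-ins for the App Engine taskqueue names; these are NOT the real SDK.
class TaskAlreadyExistsError(Exception):
    """Stand-in for taskqueue.TaskAlreadyExistsError."""


class TombstonedTaskError(Exception):
    """Stand-in for taskqueue.TombstonedTaskError."""


class FakeQueue(object):
    """Hypothetical in-memory queue that rejects reused task names."""

    def __init__(self):
        self.pending = set()
        self.tombstoned = set()

    def add(self, name):
        if name in self.tombstoned:
            raise TombstonedTaskError(name)
        if name in self.pending:
            raise TaskAlreadyExistsError(name)
        self.pending.add(name)

    def run_all(self):
        # Assumption of this simulation: executed names become tombstoned.
        self.tombstoned |= self.pending
        self.pending.clear()


def add_each(queue, task_names):
    """Add tasks one by one so a duplicate cannot affect the others.

    Returns (added, skipped), which makes the outcome easy to log.
    """
    added, skipped = [], []
    for name in task_names:
        try:
            queue.add(name)
            added.append(name)
        except (TaskAlreadyExistsError, TombstonedTaskError):
            skipped.append(name)
    return added, skipped


queue = FakeQueue()
queue.add('notify-a')
queue.run_all()  # 'notify-a' has already run once
added, skipped = add_each(queue, ['notify-a', 'notify-b', 'notify-c'])
```

Here the duplicate 'notify-a' is skipped while 'notify-b' and 'notify-c' are still added. The trade-off is one add call per task instead of a single batched call.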

soundofjw avatar Aug 13 '15 18:08 soundofjw

@soundofjw Thanks for your reply. I now agree that this block of code does not appear to be responsible for the problem. I hope someone can help with this issue: the root pipeline stays in either the Run or Finalize state forever. In the UI I can see a number of child pipelines in the Run state, but they never actually started. There are no errors at all in the logs, which gives me the feeling that the problem may be that a number of tasks were somehow never added to the queue.

ymohii avatar Aug 17 '15 08:08 ymohii

@ymohii Can you share the root pipeline code, specifically if you've got something going on in the finalized method? - Obfuscate any business logic as necessary.

I've seen this problem occur when my finalized methods throw Exceptions - do you see anything like that in the logs? (I'd guess not, by your comment).

I wouldn't completely throw out your initial assumption - when I'm in situations like this I tend to start adding logging statements everywhere.


As an alternative debugging option, can you get your pipeline successfully running on your local environment?

I tend to choose pipe.start() over pipe.start_test() and then execute the queues via the taskqueue stub. This is a pretty intimidating process the first time, so I recommend looking at how appengine-mapreduce runs test cases. - Here's a stripped down version of a similar implementation: https://gist.github.com/soundofjw/8bb8247e5f7d1d31d917

soundofjw avatar Aug 17 '15 15:08 soundofjw

@soundofjw Thanks for your feedback. My pipeline is simply an implementation of the fan-in example in this article: http://sookocheff.com/post/appengine/pipelines/fan-in-fan-out/ I'm taking this code snippet from the article as an example:

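import logging

# Assumes the appengine-pipelines package is importable as `pipeline`.
import pipeline


class SquarePipeline(pipeline.Pipeline):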

    def run(self, number):
        logging.info('Squaring: %s' % number)
        return number * number


class Sum(pipeline.Pipeline):

    def run(self, *args):
        value = sum(list(args))
        logging.info('Sum: %s', value)
        return value


class FanInPipeline(pipeline.Pipeline):

    def run(self, count):
        results = []
        for i in xrange(0, count):
            result = yield SquarePipeline(i)
            results.append(result)

        # Waits until all SquarePipeline results are complete
        yield Sum(*results)

In my own pipeline there is nothing more than this fan-in implementation.

For this example (if my problem were to happen to it), I added logging through the loop and found that the loop yields SquarePipeline successfully for all the items, but some of the SquarePipeline pipelines do not start at all (the logging I put inside the run method of SquarePipeline is never called for the missing pipelines).

I can work around the problem by increasing the number of retries for the queue in queue.yaml, but I don't think this is a good thing to rely on without understanding the problem.

Regarding the finalized method: there is no finalized implementation for it. Regarding the local environment: the pipelines work fine there, and also on small subsets of data. But in my app the data is much bigger (the loop in FanInPipeline generates more than 100 pipelines in some cases), and that is when the problem arises. This is why I'm not satisfied with the increased-retries solution until I understand the source of the problem.

ymohii avatar Aug 24 '15 11:08 ymohii

Two comments:

  1. taskqueue.TombstonedTaskError and taskqueue.TaskAlreadyExistsError don't protect against all duplication of messages. For example, a message could be received, processed, and then fail to be acknowledged. So the library code still needs to be idempotent (by updating datastore).
  2. It would probably be much better to manage retries externally to the task queue, i.e. set the taskqueue retry limit to infinite and manage the state in datastore. This would avoid the problem of the two ending up in an inconsistent state, which results in the pipeline silently stopping.
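
To make both points concrete, here is a rough sketch of a handler that keeps its own retry state and treats duplicate deliveries as no-ops. It is an illustration under assumptions and not the library's implementation: BarrierState, handle_notify, and MAX_ATTEMPTS are hypothetical names, and a plain dict stands in for datastore (a real version would read and update the record inside a datastore transaction).

```python
MAX_ATTEMPTS = 5  # hypothetical application-level retry budget


class BarrierState(object):
    """Hypothetical record; in a real app this would be a datastore entity."""

    def __init__(self):
        self.status = 'waiting'  # 'waiting', 'fired' or 'aborted'
        self.attempts = 0


def handle_notify(store, barrier_key, fire):
    """Body of a task handler.

    Returns True when the task is finished and should be acknowledged,
    and False to ask the queue to deliver it again.
    """
    state = store.setdefault(barrier_key, BarrierState())
    if state.status != 'waiting':
        return True  # duplicate delivery: already fired or aborted
    state.attempts += 1
    try:
        fire(barrier_key)
    except Exception:
        if state.attempts >= MAX_ATTEMPTS:
            # Record a visible terminal state instead of stalling silently.
            state.status = 'aborted'
            return True
        return False
    state.status = 'fired'
    return True


store = {}
calls = []


def flaky_fire(key):
    """Simulated notification that fails on its first two attempts."""
    calls.append(key)
    if len(calls) < 3:
        raise RuntimeError('transient failure')


# The queue keeps redelivering until the handler reports completion.
while not handle_notify(store, 'barrier-1', flaky_fire):
    pass
handle_notify(store, 'barrier-1', flaky_fire)  # duplicate delivery is ignored
```

Because the attempt count and the terminal status live in one place, running out of retries leaves an explicit 'aborted' record rather than a pipeline that appears to be running forever.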

tkaitchuck avatar Oct 02 '15 19:10 tkaitchuck