
Spout prematurely acks, even for failed Bolt tuples

Open jgujgu opened this issue 8 years ago • 9 comments

Hello,

I've had problems getting a Spout's fail() method called in the boilerplate wordcount project. According to the pystorm quickstart docs and numerous other things I've read, calling fail(tuple) in a Bolt should elicit a failure in the originating Spout. However, even with the few modifications I've made, the Spout's ack() fires as soon as the tuple leaves the Spout. Is this the correct behavior, or do I need to change a setting/instance variable?

I'm on streamparse 3.4.0 and storm 1.0.2.

My logs show the Spout ack() coming before the Bolt logging.

7609 [Thread-14-word_spout-executor[2 2]] INFO  o.a.s.s.ShellSpout - ShellLog pid:35761, name:word_spout 2017-02-20 15:30:33,070 - pystorm.component.word_spout - acking when I shouldnt tup_id: 3219

...

7611 [Thread-21] INFO  o.a.s.t.ShellBolt - ShellLog pid:35760, name:count_bolt 2017-02-20 15:30:33,072 - pystorm.component.count_bolt - BOLT: receiving tup_id/count: 3219

word.py

from itertools import cycle

from streamparse import ReliableSpout


class WordSpout(ReliableSpout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])
        self.count = 0

    def next_tuple(self):
        self.count += 1
        word = next(self.words)
        self.emit([str(self.count)], tup_id=self.count)

    def ack(self, tup_id):
        self.logger.info("acking when I shouldnt tup_id: {0}".format(tup_id))

    def fail(self, tup_id):
        self.logger.info("failing when I should tup_id: {0}".format(tup_id))

wordcount.py

import os
from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):
    auto_ack = False
    auto_fail = False

    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        # if self.total % 1000 == 0:
        #     self.logger.info("counted [{:,}] words [pid={}]".format(self.total,
        #                                                             self.pid))
        self.logger.info("BOLT: receiving tup_id/count: {0}".format(word))
        # self.emit([word, self.counter[word]])
        self.fail(tup)

To reiterate, from what I know, I would expect the Bolt's fail(tup) to elicit a fail() in the Spout, and I would also not expect an ack so early. Am I wrong in thinking this?

-jgujgu

jgujgu avatar Feb 21 '17 17:02 jgujgu

As a follow-up to this, my colleague and I finally ran this on a cluster with Docker. The long and short of it is that we were finally able to observe acks and fails through the Storm UI and logs.

Note that in the acking scenario (as opposed to the failing scenario coded above), I literally call self.ack() after self.emit(). Finally, to reiterate: in the wordcount boilerplate example, we were able to observe either all acking or all failing with the slight changes I mentioned above.
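
(A sketch of that acking variant, relative to the wordcount.py above — only process() changes, with the emit line re-enabled and ack() in place of fail():)

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        self.logger.info("BOLT: receiving tup_id/count: {0}".format(word))
        self.emit([word, self.counter[word]])
        self.ack(tup)  # acking scenario: ack instead of fail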

jgujgu avatar Apr 24 '17 16:04 jgujgu

I'm reopening this because if you're not seeing acks and fails work when running in local mode (with sparse run), there's something wrong somewhere (although it may be on the Storm end).

dan-blanchard avatar Apr 24 '17 17:04 dan-blanchard

I have the same issue. I don't understand what I'm supposed to do if there is an exception in my bolt code.

I read this guide: http://streamparse.readthedocs.io/en/stable/topologies.html#dealing-with-errors. According to it, if there is an exception, Storm will auto-restart the crashed component and the spout will receive a fail() call. But in my case the poisoned tuple stays in the topology and Apache Storm restarts the bolt continuously.

I tried to force self.fail(tup) when there is an exception on that line, but it doesn't call fail() in the Spout.

My code analyzes spam mail, so I can't control the input. I'd like to call fail(), remove the mail from the topology, and move the mail file aside on the file system so I can analyze the issue manually.

To work around the issue I catch the exception and just pass, but I'd like to know what the best practice is.
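
For example, one way to catch the exception and still fail the tuple back to the spout would be something like this (a sketch; the bolt name and the parse helper are illustrative, not my real code):

from streamparse import Bolt


class MailBolt(Bolt):
    # Handle ack/fail manually so a poisoned tuple is explicitly failed
    # instead of being silently dropped or crashing the subprocess.
    auto_ack = False
    auto_fail = False

    outputs = ['subject']

    def process(self, tup):
        raw_mail = tup.values[0]
        try:
            subject = self.parse_subject(raw_mail)  # the part that may raise
            self.emit([subject], anchors=[tup])
            self.ack(tup)
        except Exception:
            self.logger.exception("failing unparseable mail, tup %s", tup.id)
            # Storm should then call fail(tup_id) on the originating spout,
            # provided the spout emitted the tuple with a tup_id.
            self.fail(tup)

    def parse_subject(self, raw_mail):
        # placeholder for the real parsing logic
        return raw_mail.splitlines()[0]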

fedelemantuano avatar May 29 '17 08:05 fedelemantuano

@fedelemantuano Just to confirm, you are seeing this issue when running your topology with sparse run, correct? It is not recommended that you run Storm topologies in production in local mode (via sparse run); it's really only intended for testing.

dan-blanchard avatar Jun 02 '17 19:06 dan-blanchard

No, I only use sparse submit and test in a Docker container. If you want to try it, you can download the Docker image; it's all open source. For now I catch the exception and use raise_exception to log it.

fedelemantuano avatar Jun 02 '17 19:06 fedelemantuano

Hello, I think I have the same issue. I'm using the default streamparse configuration and I'm deploying my topology using /usr/bin/sparse submit --force --wait 15, but when a tuple fails with a Python exception the whole topology is restarted. Is this the expected behavior? Reading the docs, I thought only the bolt should be restarted.

ManofWax avatar Mar 21 '18 09:03 ManofWax

@ManofWax Technically, it's not really the entire topology that restarts. The Java worker process that communicates with your Python subprocesses restarts when any of the subprocesses exits. There is usually one of those for each topology on each machine in your cluster.

The problem is that Component.exit_on_exception defaults to True. We have discussed changing the default to False, but have not done that yet because it isn't backward compatible.

To work around that issue, set exit_on_exception = False for each component in your topology that you do not want to exit when there's an exception. You should only do that for components where you are certain an uncaught exception isn't going to leave things in an unrecoverable state. A lot of the time it's better to catch the errors you want to recover from yourself.
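
For the word-count bolt from this thread, that workaround would look roughly like this (a sketch; with the default auto_fail=True the offending tuple should still be failed back to the spout):

from streamparse import Bolt


class WordCountBolt(Bolt):
    # Keep the Python subprocess alive after an uncaught exception, so the
    # Java worker hosting it is not restarted along with it.
    exit_on_exception = False

    outputs = ['word', 'count']

    def process(self, tup):
        word = tup.values[0]
        self.emit([word, 1])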

dan-blanchard avatar Mar 21 '18 13:03 dan-blanchard

Has anyone solved this issue?

doshu avatar Oct 04 '19 14:10 doshu

I solved the issue. The problem is that you have to manually set tup_id in the spout's emit() method; otherwise Storm does not track the tuple, so ack/fail never reach the spout.
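
In other words, the spout has to emit with an explicit tup_id, as the word.py earlier in this thread already does (a minimal sketch):

from itertools import cycle

from streamparse import ReliableSpout


class WordSpout(ReliableSpout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])
        self.count = 0

    def next_tuple(self):
        word = next(self.words)
        self.count += 1
        # Without a tup_id Storm treats the tuple as unreliable and never
        # calls ack()/fail() back on the spout.
        self.emit([word], tup_id=self.count)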

doshu avatar Oct 05 '19 08:10 doshu