streamparse
Spout prematurely acks, even for failed Bolt tuples
Hello,
I've had problems getting a Spout's fail() method called in the boilerplate wordcount project. According to the pystorm quickstart docs and numerous things I've read, calling fail(tuple) in a Bolt should elicit a failure in the originating Spout. However, even with the few modifications I've made, I always get a Spout ack() as soon as the tuple leaves the Spout. Is this the correct behavior, or do I need to change a setting/instance variable?
I'm on streamparse 3.4.0 and Storm 1.0.2.
My logs show the Spout ack() coming before the Bolt logging:
7609 [Thread-14-word_spout-executor[2 2]] INFO o.a.s.s.ShellSpout - ShellLog pid:35761, name:word_spout 2017-02-20 15:30:33,070 - pystorm.component.word_spout - acking when I shouldnt tup_id: 3219
...
7611 [Thread-21] INFO o.a.s.t.ShellBolt - ShellLog pid:35760, name:count_bolt 2017-02-20 15:30:33,072 - pystorm.component.count_bolt - BOLT: receiving tup_id/count: 3219
word.py
from itertools import cycle

from streamparse import ReliableSpout


class WordSpout(ReliableSpout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])
        self.count = 0

    def next_tuple(self):
        self.count += 1
        word = next(self.words)
        # Emit the counter (as a string) with a matching tup_id so acks/fails
        # can be correlated in the logs
        self.emit([str(self.count)], tup_id=self.count)

    def ack(self, tup_id):
        self.logger.info("acking when I shouldnt tup_id: {0}".format(tup_id))

    def fail(self, tup_id):
        self.logger.info("failing when I should tup_id: {0}".format(tup_id))
wordcount.py
import os
from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):
    auto_ack = False
    auto_fail = False
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        # if self.total % 1000 == 0:
        #     self.logger.info("counted [{:,}] words [pid={}]".format(self.total,
        #                                                             self.pid))
        self.logger.info("BOLT: receiving tup_id/count: {0}".format(word))
        # self.emit([word, self.counter[word]])
        self.fail(tup)
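For completeness, the topology definition wiring these two components together would look roughly like the sketch below. The module paths (spouts.words, bolts.wordcount) follow the standard wordcount boilerplate layout and are assumptions; adjust them to match the actual project.

from streamparse import Grouping, Topology

from bolts.wordcount import WordCountBolt  # assumed module path
from spouts.words import WordSpout         # assumed module path


class WordCount(Topology):
    word_spout = WordSpout.spec()
    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
                                    par=2)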
To reiterate: as I understand it, the Bolt's fail() should elicit a fail in the Spout, and I would also not expect an ack so early. Am I wrong in thinking this?
-jgujgu
As a follow-up to this, my colleague and I finally ran this on a cluster with Docker. The long and short of it is that we were finally able to observe acks and fails through the Storm UI and logs.
Note that in the acking scenario (as opposed to the failing scenario coded above), I literally call self.ack() after self.emit(). Finally, to reiterate, in the wordcount boilerplate example we were able to observe either all acking or all failing with the slight changes I mentioned above.
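For reference, the acking variant of the bolt's process() is reconstructed below from the description above; it is a sketch, not the exact code we ran:

def process(self, tup):
    word = tup.values[0]
    self._increment(word, 10 if word == "dog" else 1)
    self.emit([word, self.counter[word]])
    # Explicitly ack after emitting (auto_ack is disabled), which should
    # trigger WordSpout.ack() with the original tup_id
    self.ack(tup)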
I'm reopening this because if you're not seeing acks and fails work when running in local mode (with sparse run), there's something wrong somewhere (although it may be on the Storm end).
I have the same issue. I don't understand what I have to do if there is an exception in my bolt code.
I read this guide: http://streamparse.readthedocs.io/en/stable/topologies.html#dealing-with-errors. It says that if there is an exception, Storm will auto-restart the crashed component and the spout will receive a fail() call. But in my case the poisoned tuple remains in the topology and Apache Storm restarts the bolt continuously.
I tried to force self.fail(tup) when there is an exception on that line, but it doesn't call fail() in the Spout.
My code analyzes spam mail, so I can't control the input. I'd like to call fail(), remove the mail from the topology, and move the mail aside on the file system so I can analyze the issue manually.
To work around the issue I catch the exception and just put a pass, but I want to know what the best practice is.
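For what it's worth, a minimal sketch of the catch-and-fail approach I'm after is below; MailBolt, analyze_mail, and QUARANTINE_DIR are hypothetical names, not part of the real project.

import shutil

from streamparse import Bolt

from spam_analyzer import analyze_mail  # hypothetical processing function

QUARANTINE_DIR = "/data/quarantine"  # hypothetical location for bad mails


class MailBolt(Bolt):  # hypothetical bolt
    auto_ack = False
    auto_fail = False

    def process(self, tup):
        mail_path = tup.values[0]  # assumes the tuple carries a file path
        try:
            analyze_mail(mail_path)
        except Exception:
            self.logger.exception("could not analyze %s, failing tuple", mail_path)
            # Set the poisoned mail aside for manual analysis and tell the
            # spout the tuple failed (requires the spout to emit with a tup_id)
            shutil.move(mail_path, QUARANTINE_DIR)
            self.fail(tup)
            return
        self.ack(tup)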
@fedelemantuano Just to confirm, you are seeing this issue when running your topology with sparse run, correct? It is not recommended that you run Storm topologies in production in local mode (via sparse run); it's really only intended for testing.
No, I use only sparse submit and test in a Docker container. If you want to try it, you can download the Docker image; it's all open source. Now I catch the exception and use raise_exception to log it.
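To illustrate, the catch-and-report pattern I use now looks roughly like this sketch (handle_message is a hypothetical stand-in for the real work):

def process(self, tup):
    try:
        handle_message(tup.values[0])  # hypothetical work function
    except Exception as exc:
        # raise_exception reports the traceback back to Storm's logs
        # without letting the exception kill the Python subprocess
        self.raise_exception(exc, tup)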
Hello, I think I have the same issue.
I'm using the default streamparse configuration and I'm deploying my topology using /usr/bin/sparse submit --force --wait 15, but when a tuple fails with a Python exception the whole topology is restarted.
Is this the expected behavior? Reading the docs, I thought only the bolt would be restarted.
@ManofWax Technically, it's not really the entire topology that restarts. The Java worker process that communicates with your Python subprocesses restarts when any of the subprocesses exits. There is usually one of those for each topology on each machine in your cluster.
The problem is that Component.exit_on_exception defaults to True. We have discussed changing the default to False, but have not done that yet because it isn't backward compatible.
To work around that issue, set exit_on_exception = False for each component in your topology that you do not want to exit when there's an exception. You should only do that for components where you are certain an exception you aren't catching isn't going to leave things in an unrecoverable state. A lot of the time it's better to catch the errors you want to recover from yourself.
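For example, a component that opts out of that default would look something like the sketch below (TolerantBolt is a hypothetical name):

from streamparse import Bolt


class TolerantBolt(Bolt):  # hypothetical component
    # Keep the Python subprocess (and therefore the Java worker) alive when an
    # uncaught exception escapes process(); with auto_fail left at its default,
    # the offending tuple is still failed back to the spout.
    exit_on_exception = False

    def process(self, tup):
        ...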
Has anyone solved this issue?
I solved the issue. The problem is that you have to manually set the tup_id in the Spout's emit() method; otherwise Storm does not track the tuple for ack/fail.
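In other words, a minimal sketch of the difference in the spout:

def next_tuple(self):
    self.count += 1
    word = next(self.words)
    # Untracked: Storm never calls this spout's ack()/fail() for the tuple
    # self.emit([word])
    # Tracked: a unique tup_id lets Storm route ack/fail back to this spout
    self.emit([word], tup_id=str(self.count))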