streamparse
kafka-python consumer used in a streamparse spout does not work and throws a heartbeat timeout
pyspout.py:
```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'])

    def next_tuple(self):
        for message in self.consumer:
            self.emit([message.value])
```
wordcount.py (bolt):
```python
import os
from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted [{:,}] words [pid={}]".format(self.total, self.pid))
        self.emit([word, self.counter[word]])
```
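wordcount.py (topology):
```python
from streamparse import Grouping, Topology
from bolts.wordcount import WordCountBolt  # assumed module path for the bolt above
from spouts.pyspout import PySpout


class WordCount(Topology):
    new_spout = PySpout.spec()
    new_bolt = WordCountBolt.spec(inputs={new_spout: Grouping.fields('word')}, par=2)
```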
After executing `sparse run`, no data comes through, and after a while a timeout is thrown:
```bash
5669 [refresh-active-timer] INFO o.a.s.d.worker - All connections are ready for worker 29a4867e-ce62-4a92-bdae-32a8d28d4b60:1024 with id 85db4c94-fa26-41bc-9ffe-7fc2a15ff0f0
5686 [Thread-22-__acker-executor[1 1]] INFO o.a.s.d.executor - Preparing bolt __acker:(1)
5688 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
5693 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
5696 [Thread-22-__acker-executor[1 1]] INFO o.a.s.d.executor - Prepared bolt __acker:(1)
5700 [Thread-26-new_spout-executor[4 4]] INFO o.a.s.d.executor - Opening spout new_spout:(4)
5703 [Thread-26-new_spout-executor[4 4]] INFO o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5760 [Thread-18-new_bolt-executor[2 2]] INFO o.a.s.d.executor - Preparing bolt new_bolt:(2)
5761 [Thread-18-new_bolt-executor[2 2]] INFO o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5772 [Thread-20-new_bolt-executor[3 3]] INFO o.a.s.d.executor - Preparing bolt new_bolt:(3)
5773 [Thread-20-new_bolt-executor[3 3]] INFO o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5947 [Thread-26-new_spout-executor[4 4]] INFO o.a.s.s.ShellSpout - Launched subprocess with pid 87117
5951 [Thread-18-new_bolt-executor[2 2]] INFO o.a.s.t.ShellBolt - Launched subprocess with pid 87124
5952 [Thread-18-new_bolt-executor[2 2]] INFO o.a.s.t.ShellBolt - Start checking heartbeat...
5953 [Thread-26-new_spout-executor[4 4]] INFO o.a.s.d.executor - Opened spout new_spout:(4)
5953 [Thread-18-new_bolt-executor[2 2]] INFO o.a.s.d.executor - Prepared bolt new_bolt:(2)
5955 [Thread-26-new_spout-executor[4 4]] INFO o.a.s.d.executor - Activating spout new_spout:(4)
5955 [Thread-26-new_spout-executor[4 4]] INFO o.a.s.s.ShellSpout - Start checking heartbeat...
5964 [Thread-20-new_bolt-executor[3 3]] INFO o.a.s.t.ShellBolt - Launched subprocess with pid 87132
5965 [Thread-20-new_bolt-executor[3 3]] INFO o.a.s.t.ShellBolt - Start checking heartbeat...
5965 [Thread-20-new_bolt-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt new_bolt:(3)
52956 [pool-51-thread-1] ERROR o.a.s.s.ShellSpout - Halting process: ShellSpout died. Command: [streamparse_run, -s json spouts.pyspout.PySpout], ProcessInfo pid:87117, name:new_spout exitCode:-1, errorString:
java.lang.RuntimeException: subprocess heartbeat timeout
at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]
52960 [pool-51-thread-1] ERROR o.a.s.d.executor -
java.lang.RuntimeException: subprocess heartbeat timeout
at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]
Fatal error: local() encountered an error (return code 11) while executing 'storm jar /data/newqspace/lusonzeng/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local --no-splash --sleep 9223372036854775807 /tmp/tmpx6Q9ki.yaml'
```
You can use pykafka instead of kafka-python.
spouts/words.py:
```python
from streamparse import Spout
from pykafka import KafkaClient


class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        client = KafkaClient(hosts="c1:9092,c1_1:9092,c1_2:9092")
        topic = client.topics['test'.encode('utf-8')]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="c1:2181,c1_1:2181,c1_2:2181"
        )

    def next_tuple(self):
        message = self.balanced_consumer.consume()
        info = message.value.decode('utf-8')
        self.logger.info('==================={}'.format(info))
        # Emit the decoded text rather than the raw pykafka Message object
        self.emit([info])
```
I have found that the exception is thrown only when the Kafka producer does not push data at regular intervals.
It works without the exception when the Kafka producer pushes data every second.
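That pattern is consistent with `next_tuple` blocking while it waits for a message: as far as I understand Storm's multilang protocol, the spout subprocess has to return from `next_tuple` and reply before Storm counts it as alive, so an idle topic looks like a dead process. Here is a minimal sketch of the difference, using a plain `queue.Queue` as a stand-in for the Kafka topic. The function name `next_tuple_bounded` and the 50 ms wait are illustrative assumptions, not streamparse or Kafka API.

```python
import queue


def next_tuple_bounded(source, emit, wait_s=0.05):
    """Wait at most wait_s for one message, then hand control back to the caller."""
    try:
        emit([source.get(timeout=wait_s)])
        return True
    except queue.Empty:
        # Nothing arrived in time; returning lets the caller keep
        # answering heartbeats instead of hanging on an idle source.
        return False


source = queue.Queue()  # stand-in for the Kafka topic
source.put("dog")
emitted = []

# First call finds a message; the next two hit an idle source and return quickly.
results = [next_tuple_bounded(source, emitted.append) for _ in range(3)]
```

A plain `source.get()` with no timeout would hang on the second call, which is the same situation as iterating over a consumer that has no new messages.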
How did you solve it?
@jhhnjhhn How did you solve it?
This code should fix the issue:
```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        # consumer_timeout_ms stops the wait after 10 ms without a message
        self.consumer = KafkaConsumer('test', group_id='my-group',
                                      bootstrap_servers=['localhost:9092'],
                                      consumer_timeout_ms=10)

    def next_tuple(self):
        try:
            message = self.consumer.next_v1()
            self.emit([message.value])
        except StopIteration:
            # No message within the timeout; return so Storm can proceed
            pass
```
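An alternative that avoids relying on the iterator timeout is `KafkaConsumer.poll(timeout_ms=..., max_records=...)`, which returns a dict mapping each partition to a list of records, or an empty dict when nothing arrived in time. The sketch below shows only the `next_tuple` logic. `FakeConsumer`, `Record`, and `PollingSpoutLogic` are hypothetical stand-ins so it runs without Kafka or Storm installed; in a real spout the consumer would be a `KafkaConsumer` and `emit` would be `self.emit`. The 100 ms timeout and batch size of 10 are arbitrary choices.

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord (only .value is used).
Record = namedtuple("Record", ["value"])


class FakeConsumer:
    """Mimics the return shape of KafkaConsumer.poll(): {partition: [records]}."""

    def __init__(self, batches):
        self._batches = list(batches)

    def poll(self, timeout_ms=0, max_records=None):
        # An empty dict means no data arrived within timeout_ms.
        return self._batches.pop(0) if self._batches else {}


class PollingSpoutLogic:
    """The part of a spout that would live in next_tuple()."""

    def __init__(self, consumer, emit):
        self.consumer = consumer
        self.emit = emit

    def next_tuple(self):
        # Bounded wait: returns after at most 100 ms even on an idle topic.
        batches = self.consumer.poll(timeout_ms=100, max_records=10)
        for records in batches.values():
            for record in records:
                self.emit([record.value])


emitted = []
consumer = FakeConsumer([{"test-0": [Record(b"dog"), Record(b"cat")]}])
spout = PollingSpoutLogic(consumer, emitted.append)
spout.next_tuple()  # data available: emits two tuples
spout.next_tuple()  # idle topic: emits nothing and returns
```

Either way the point is the same: `next_tuple` must return promptly whether or not a message is available.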