
kafka-python consumer used in a streamparse spout does not work and throws a timeout

Open lusonzeng opened this issue 5 years ago • 6 comments

pyspout.py:

```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'])

    def next_tuple(self):
        for message in self.consumer:
            self.emit([message.value])
```



wordcount.py(bolt):

```python
import os
from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted [{:,}] words [pid={}]".format(self.total, self.pid))
        self.emit([word, self.counter[word]])
```


wordcount.py(topology):

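```python
from streamparse import Grouping, Topology

from bolts.wordcount import WordCountBolt  # assumed module path
from spouts.pyspout import PySpout


class WordCount(Topology):
    new_spout = PySpout.spec()
    new_bolt = WordCountBolt.spec(inputs={new_spout: Grouping.fields('word')}, par=2)
```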

After executing `sparse run`, no data comes through, and after a while a timeout is thrown:
```bash
5669 [refresh-active-timer] INFO  o.a.s.d.worker - All connections are ready for worker 29a4867e-ce62-4a92-bdae-32a8d28d4b60:1024 with id 85db4c94-fa26-41bc-9ffe-7fc2a15ff0f0
5686 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.d.executor - Preparing bolt __acker:(1)
5688 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Preparing bolt __system:(-1)
5693 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Prepared bolt __system:(-1)
5696 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.d.executor - Prepared bolt __acker:(1)
5700 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Opening spout new_spout:(4)
5703 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5760 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.d.executor - Preparing bolt new_bolt:(2)
5761 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5772 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.d.executor - Preparing bolt new_bolt:(3)
5773 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5947 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - Launched subprocess with pid 87117
5951 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.t.ShellBolt - Launched subprocess with pid 87124
5952 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.t.ShellBolt - Start checking heartbeat...
5953 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Opened spout new_spout:(4)
5953 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.d.executor - Prepared bolt new_bolt:(2)
5955 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Activating spout new_spout:(4)
5955 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - Start checking heartbeat...
5964 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.t.ShellBolt - Launched subprocess with pid 87132
5965 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.t.ShellBolt - Start checking heartbeat...
5965 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt new_bolt:(3)


52956 [pool-51-thread-1] ERROR o.a.s.s.ShellSpout - Halting process: ShellSpout died. Command: [streamparse_run, -s json spouts.pyspout.PySpout], ProcessInfo pid:87117, name:new_spout exitCode:-1, errorString: 
java.lang.RuntimeException: subprocess heartbeat timeout
	at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]
52960 [pool-51-thread-1] ERROR o.a.s.d.executor - 
java.lang.RuntimeException: subprocess heartbeat timeout
	at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]




Fatal error: local() encountered an error (return code 11) while executing 'storm jar /data/newqspace/lusonzeng/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local --no-splash --sleep 9223372036854775807 /tmp/tmpx6Q9ki.yaml'
```

lusonzeng, Aug 24 '18 10:08

You can use pykafka instead of kafka-python.

spouts/words.py:

```python
from streamparse import Spout
from pykafka import KafkaClient


class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        client = KafkaClient(hosts="c1:9092,c1_1:9092,c1_2:9092")
        topic = client.topics['test'.encode('utf-8')]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="c1:2181,c1_1:2181,c1_2:2181"
        )

    def next_tuple(self):
        message = self.balanced_consumer.consume()
        if message is None:
            # Nothing was consumed; let Storm call next_tuple again.
            return
        # Decode the payload so it can be logged and emitted as text.
        info = message.value.decode('utf-8')
        self.logger.info('==================={}'.format(info))
        self.emit([info])
```

amoyiki, Aug 25 '18 13:08

I have found that the exception is thrown only when the Kafka producer does not push data at regular intervals.

It works without an exception when the Kafka producer pushes data every second.

lusonzeng, Sep 05 '18 07:09

> I have found that the exception is thrown only when the Kafka producer does not push data at regular intervals.
>
> It works without an exception when the Kafka producer pushes data every second.

How did you solve it?

jhhnjhhn, Dec 04 '18 04:12

> I have found that the exception is thrown only when the Kafka producer does not push data at regular intervals.
>
> It works without an exception when the Kafka producer pushes data every second.

How did you solve it?

tianser, Apr 23 '19 03:04

@jhhnjhhn How did you solve it?

tianser, Apr 23 '19 04:04

This code should fix the issue:

```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        # consumer_timeout_ms stops iteration after 10 ms without a message.
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'],
                                      consumer_timeout_ms=10)

    def next_tuple(self):
        try:
            message = next(self.consumer)
        except StopIteration:
            # No message arrived within the timeout; return so Storm can call again.
            return
        self.emit([message.value])
```

Abdelhadi92, Jan 15 '20 08:01
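
For anyone landing here later: the timeout only showing up when the producer is idle is consistent with `next_tuple` blocking inside `for message in self.consumer` and never returning control to Storm, so the heartbeat goes unanswered. Besides `consumer_timeout_ms`, another option is kafka-python's `KafkaConsumer.poll(timeout_ms=..., max_records=...)`, which returns a dict mapping partitions to lists of records (empty when nothing arrived). Below is a minimal sketch of that idea, not tested against a real cluster. `BufferedPoller`, `next_record`, and the default values are hypothetical names and numbers picked for illustration; they are not part of streamparse or kafka-python.

```python
from collections import deque


class BufferedPoller:
    """Hypothetical helper: hands out one record per call without blocking for long.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer, i.e.
    poll(timeout_ms=..., max_records=...) returns {partition: [records]}.
    """

    def __init__(self, consumer, timeout_ms=100, max_records=500):
        self.consumer = consumer
        self.timeout_ms = timeout_ms    # upper bound on how long one poll may wait
        self.max_records = max_records  # cap on records fetched per poll
        self.buffer = deque()

    def next_record(self):
        """Return the next buffered record, or None if nothing is available."""
        if not self.buffer:
            batches = self.consumer.poll(timeout_ms=self.timeout_ms,
                                         max_records=self.max_records)
            for records in batches.values():
                self.buffer.extend(records)
        return self.buffer.popleft() if self.buffer else None


# Hypothetical wiring inside the spout from this thread:
#
#   def initialize(self, stormconf, context):
#       self.poller = BufferedPoller(KafkaConsumer('test', group_id='my-group',
#                                                  bootstrap_servers=['localhost:9092']))
#
#   def next_tuple(self):
#       record = self.poller.next_record()
#       if record is not None:
#           self.emit([record.value])
```

Each `next_tuple` call then emits at most one tuple and returns within roughly `timeout_ms` even when the topic is quiet, which should leave the spout free to respond to Storm between calls.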