incubator-heron
Kafka spout
Is there an example of using Kafka as a spout for Heron?
I found some examples that were once in the contrib folder but were later removed. I tried moving to an older version of Heron and building it, but that breaks my current changes.
It would be great if someone could share pointers for using Kafka with the latest version of Heron.
@sleepy-brook Heron currently doesn't have a native Kafka spout, but you can use the Kafka spout from Storm to compose your topology.
https://github.com/apache/storm/tree/master/external/storm-kafka-client
@nlu90 Does it require Trident?
If so, AFAIK Heron doesn't support Trident.
I currently have the Kafka spout code working; however, it fails at random after some time (around 2 to 7 days). It shows as initialized, but nothing is received from the Kafka broker. There is no sign or error indicating that it has failed. The only thing I can notice is that, after some time, the Kafka spout reinitializes when the log says "Restoring instance":
[2018-10-30 04:06:41 +0000] [INFO] st_heron_instance.py: Restoring instance state to checkpoint 1540822768709483564-1540872269
[2018-10-30 04:06:41 +0000] [INFO] st_heron_instance.py: Instance restore state deserialized
[2018-10-30 04:06:42 +0000] [INFO] st_heron_instance.py: Received start stateful processing for 1540822768709483564-1540872269
[2018-10-30 04:06:42 +0000] [INFO] st_heron_instance.py: Starting bolt/spout instance now...
[2018-10-30 04:06:42 +0000] [INFO] base_instance.py: [+] Initializing InputKafkaLogSpout...
I also tried running either 3 or 1 Kafka spouts in parallel, but that does not fix the issue. One thing I can confirm, however, is that the setup with 3 parallel spouts doesn't emit duplicate messages.
My Kafka spout is essentially a standard Kafka client in Python that emits messages into Heron. It appears to stop emitting after some (random) period of time. I now have a new update: Kafka reports that the client is up to date with the offset, but nothing is being emitted in Heron. This might be an issue in Heron.
For reference only
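from confluent_kafka import KafkaError

def next_tuple(self):
    # Poll for at most 0.1 s so next_tuple returns quickly.
    msg = self.CONFLUENT_CONSUMER.poll(0.1)
    if msg is None:
        return
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition reached; not a real error.
            return
        else:
            # KafkaError is not a str, so convert it before concatenating.
            self.log("[*] [ERROR] " + str(msg.error()))
            return
    self.emit([msg.value()])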
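
In case it helps with debugging, here is a minimal sketch (not run against a real Heron topology or Kafka broker) that pulls the poll handling of next_tuple out into a plain function, so it can be exercised locally with a fake consumer. Returning a status string per call would also make it possible to log whether the spout is idle, hitting end of partition, or seeing errors after a restore. The names poll_once, FakeError, FakeMessage, FakeConsumer and the EOF placeholder are hypothetical and only for illustration; a real spout would pass its confluent_kafka consumer, self.emit, self.log and KafkaError._PARTITION_EOF instead.

```python
class FakeError:
    """Hypothetical stand-in for confluent_kafka.KafkaError (local testing only)."""
    def __init__(self, code, text):
        self._code = code
        self._text = text

    def code(self):
        return self._code

    def __str__(self):
        return self._text


class FakeMessage:
    """Hypothetical stand-in for a Kafka message with value() and error()."""
    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def value(self):
        return self._value

    def error(self):
        return self._error


class FakeConsumer:
    """Hypothetical consumer that hands out queued messages, then None."""
    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        return self._messages.pop(0) if self._messages else None


def poll_once(consumer, emit, log, eof_code, timeout=0.1):
    """Poll once and report what happened: 'idle', 'eof', 'error' or 'emitted'."""
    msg = consumer.poll(timeout)
    if msg is None:
        return "idle"
    err = msg.error()
    if err is not None:
        if err.code() == eof_code:
            return "eof"
        # Convert the error object to str before concatenating.
        log("[*] [ERROR] " + str(err))
        return "error"
    emit([msg.value()])
    return "emitted"


EOF = -1  # placeholder value; assumed to be replaced by KafkaError._PARTITION_EOF
emitted, logged = [], []
consumer = FakeConsumer([
    FakeMessage(value=b"a"),
    FakeMessage(error=FakeError(EOF, "eof")),
    FakeMessage(error=FakeError(5, "boom")),
])
results = [poll_once(consumer, emitted.append, logged.append, EOF) for _ in range(4)]
```

Inside the spout, next_tuple could then call poll_once(self.CONFLUENT_CONSUMER, self.emit, self.log, KafkaError._PARTITION_EOF) and log the returned status periodically. If the status lines stop appearing altogether, that would suggest next_tuple is no longer being called, rather than the consumer returning nothing.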