siddhi-io-kafka
siddhi-io-kafka copied to clipboard
Fix kafka offset from option not working
The following sample is not working in siddhi, This gives a null pointer error because the default partition list is null inside kafka code and after fixing that even the message offset is not considered when passing the content to the sink.
@App:name("HelloKafka")
@App:description('Consume events from a Kafka Topic and log the messages on the console.')
@source(type='kafka',
topic.list='productions',
threading.option='single.thread',
group.id="group1",
bootstrap.servers='localhost:9092',
partition.no.list='0',
topic.offsets.map='productions=2',
@map(type='json'))
define stream SweetProductionStream (name string, amount double);
@sink(type='log')
define stream OutputStream (name string, amount double);
from SweetProductionStream
select str:upper(name) as name, amount
insert into OutputStream;