neo4j-streams
Strange behavior of the streams.publish procedure inside apoc.periodic.repeat
Assume a background job created like this: `CALL apoc.periodic.repeat("process_name", "MATCH (n:Label) WITH n.prop1 AS val1, n.prop2 AS val2 CALL streams.publish('kafka_topic', {val1: val1, val2: val2}) RETURN null", 20)`
Once created, this job does not send anything to Kafka. If `streams.publish` is replaced with a dummy statement such as `counter = counter + 1`, the job runs as expected. Calling `streams.publish` manually, outside `apoc.periodic.repeat`, also works: the messages are published.
I was able to work around the issue by calling `streams.publish` from a dummy `apoc.do.when` procedure, like this: `CALL apoc.periodic.repeat("process_name", "MATCH (n:Label) WITH n.prop1 AS val1, n.prop2 AS val2 CALL apoc.do.when(true, 'CALL streams.publish(\"kafka_topic\", {val1: val1, val2: val2}) RETURN null', 'RETURN null', {val1: val1, val2: val2}) YIELD value RETURN null", 20)`
Maybe I am missing something, but this behavior is not documented anywhere, and I found this solution only because I already had a similar `apoc.periodic.repeat` job that called `streams.publish` inside `apoc.do.when` and worked perfectly.
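The one-line workaround reads more easily when split over several lines. The sketch below is the same query reformatted with comments; it reuses the placeholder names from the report (`process_name`, `Label`, `prop1`, `prop2`, `kafka_topic`) and assumes APOC and Neo4j Streams are installed with their procedures enabled. It illustrates the reporter's approach and is not an officially documented pattern.

```cypher
// Reformatted version of the workaround reported above.
// The inner statement (a multi-line string) calls streams.publish through a
// dummy apoc.do.when instead of calling it directly.
// val1 and val2 are handed to apoc.do.when through its params map.
// The last argument of apoc.periodic.repeat is the rate in seconds.
CALL apoc.periodic.repeat(
  "process_name",
  "MATCH (n:Label)
   WITH n.prop1 AS val1, n.prop2 AS val2
   CALL apoc.do.when(
     true,
     'CALL streams.publish(\"kafka_topic\", {val1: val1, val2: val2}) RETURN null',
     'RETURN null',
     {val1: val1, val2: val2}
   ) YIELD value
   RETURN null",
  20
)
```

To stop the scheduled job again, `CALL apoc.periodic.cancel("process_name")` can be used.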
Versions
- OS: ubuntu 20.04
- Neo4j: Desktop Enterprise 4.2.5
- Neo4j-APOC: 4.2.0.5
- Neo4j-Streams: 4.0.8
Hi @doctor3030,
sorry for the late reply. Have you already solved your issue?
If not, could you please share your neo4j.conf file, and in particular the Streams related part?
Regards,
Mauro
Well, I'm still using the dummy `apoc.do.when` as described above.
Here is my neo4j.conf: neo4j.conf.txt
streams.conf:
kafka.bootstrap.servers=10.0.0.120:19092,10.0.0.224:19092,10.0.0.114:19092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.message.max.bytes=1048576000
kafka.enable.auto.commit=false
kafka.streams.async.commit=false
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.session.timeout.ms=25000

streams.source.enabled=true
streams.sink.enabled=true
streams.sink.enabled.to.rehoboam=true
streams.source.enabled.from.rehoboam=true
streams.procedures.enabled.from.rehoboam=true
streams.source.schema.polling.interval=300000
Thanks, Dmitry
Hi @doctor3030,
the issue is caused by how APOC works: `apoc.periodic.repeat` opens a new transaction, which is why it doesn't work together with Streams. If you need to use it, the workaround you already found is currently the only solution.
Regards,
Mauro