Strange behavior of the streams.publish procedure inside apoc.periodic.repeat procedure

Open doctor3030 opened this issue 4 years ago • 3 comments

Please note that GitHub issues are only meant for bug reports/feature requests. If you have questions on how to use Streams, please ask on StackOverflow instead of creating an issue here.

Assume a background process: CALL apoc.periodic.repeat("process_name", "MATCH (n:Label) WITH n.prp1 AS val1, n.prop2 AS val2 CALL streams.publish(\"kafka_topic\", {val1: val1, val2: val2}) RETURN null", 20)

Once created, this process does not send anything to Kafka. When streams.publish is replaced with a dummy statement like counter = counter + 1, the job works. When streams.publish is called manually, outside apoc.periodic.repeat, it also works (messages are published).

I was able to solve this issue by calling streams.publish from a dummy apoc.do.when procedure like this: CALL apoc.periodic.repeat("process_name", "MATCH (n:Label) WITH n.prp1 AS val1, n.prop2 AS val2 CALL apoc.do.when(true, 'CALL streams.publish(\"kafka_topic\", {val1: val1, val2: val2}) RETURN null', 'RETURN null', {val1: val1, val2: val2}) YIELD value RETURN null", 20)
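The nested quoting in these statements is easy to get wrong by hand. As a rough sketch (not from the original report), the Python helper below assembles the workaround statement layer by layer so that each level of quotes is escaped mechanically. The names `cypher_string` and `build_repeat_statement` are hypothetical, the escaping covers only backslashes and the enclosing quote character, and the label is inserted unescaped, so treat this as an illustration rather than a general-purpose query builder.

```python
def cypher_string(text: str, quote: str = '"') -> str:
    """Wrap text as a Cypher string literal.

    Escapes backslashes and the chosen quote character only
    (an assumption that is sufficient for the statements in this issue).
    """
    escaped = text.replace("\\", "\\\\").replace(quote, "\\" + quote)
    return quote + escaped + quote


def build_repeat_statement(job_name: str, label: str, topic: str, rate_seconds: int) -> str:
    """Build the apoc.periodic.repeat call that publishes via apoc.do.when."""
    props = "{val1: val1, val2: val2}"

    # Innermost layer: the statement that apoc.do.when executes.
    inner = "CALL streams.publish(" + cypher_string(topic) + ", " + props + ") RETURN null"

    # Middle layer: the statement that apoc.periodic.repeat executes.
    # The inner statement is passed as a single-quoted string literal.
    middle = (
        "MATCH (n:" + label + ") WITH n.prp1 AS val1, n.prop2 AS val2 "
        "CALL apoc.do.when(true, " + cypher_string(inner, "'") + ", 'RETURN null', "
        + props + ") YIELD value RETURN null"
    )

    # Outer layer: the middle statement is passed as a double-quoted literal,
    # so the double quotes around the topic name get backslash-escaped here.
    return (
        "CALL apoc.periodic.repeat(" + cypher_string(job_name) + ", "
        + cypher_string(middle) + ", " + str(rate_seconds) + ")"
    )


statement = build_repeat_statement("process_name", "Label", "kafka_topic", 20)
print(statement)
```

The printed statement should match the workaround above character for character, and can be pasted into Neo4j Browser or sent through a driver.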

Maybe I am missing something, but this behavior is not documented anywhere. I only found a solution because I already had a similar periodic.repeat process that called streams.publish inside apoc.do.when and worked perfectly.

Versions

  • OS: Ubuntu 20.04
  • Neo4j: Desktop Enterprise 4.2.5
  • Neo4j-APOC: 4.2.0.5
  • Neo4j-Streams: 4.0.8

doctor3030, Aug 21 '21 04:08

Hi @doctor3030,

sorry for the late reply. Have you already solved your issue? If not, could you please share your neo4j.conf file, and in particular the Streams related part?

Regards,

Mauro

mroiter-larus, Nov 17 '21 15:11

Well, I'm still using the dummy apoc.do.when workaround I described above.

Here is my neo4j.conf: neo4j.conf.txt

Streams.conf:
kafka.bootstrap.servers=10.0.0.120:19092,10.0.0.224:19092,10.0.0.114:19092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.message.max.bytes=1048576000
kafka.enable.auto.commit=false
kafka.streams.async.commit=false
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.session.timeout.ms=25000

streams.source.enabled=true
streams.sink.enabled=true
streams.sink.enabled.to.rehoboam=true
streams.source.enabled.from.rehoboam=true
streams.procedures.enabled.from.rehoboam=true
streams.source.schema.polling.interval=300000

Thanks, Dmitry

doctor3030, Nov 18 '21 02:11

Hi @doctor3030,

the issue is caused by how APOC works: apoc.periodic.repeat opens a new transaction, and this is why it doesn't work together with Streams. If you need to use it, the workaround you already found is currently the only solution.

Regards,

Mauro

mroiter-larus, Dec 01 '21 15:12