neo4j-streams
streams.sink.start() has no effect unless results are iterated
Expected Behavior (Mandatory)
Calling streams.sink.start() should start the sink, regardless of whether the results are iterated.
Actual Behavior (Mandatory)
The sink is not started unless the results are iterated.
How to Reproduce the Problem
Steps (Mandatory)
- Call streams.sink.start from within another procedure, using GraphDatabaseService#executeTransactionally:
    graphDatabaseService.executeTransactionally("CALL streams.sink.start();");
- Check the logs and observe that the sink has not started.
- By contrast, this call does start the sink, because it consumes the results:
    graphDatabaseService.executeTransactionally("CALL streams.sink.start();", Map.of(), r -> r.stream().collect(Collectors.toList()));
This appears to be because the procedure returns a Stream in which the start call happens only inside a stream stage, and that stage is skipped if the stream is never consumed: https://github.com/neo4j-contrib/neo4j-streams/blob/3.5/consumer/src/main/kotlin/streams/procedures/StreamsSinkProcedures.kt#L50-L62
I don't really know Kotlin, but I decompiled the class to Java and found this:
    @Procedure("streams.sink.start")
    @NotNull
    public final Stream sinkStart() {
        this.checkEnabled();
        return this.checkLeader((Function0)(new Function0() {
            // $FF: synthetic method
            // $FF: bridge method
            public Object invoke() {
                return this.invoke();
            }

            @NotNull
            public final Stream invoke() {
                Stream var1;
                try {
                    // The sink is started here, inside the function handed to checkLeader
                    StreamsSinkProcedures.this.getStreamsEventSink().start();
                    var1 = StreamsSinkProcedures.this.sinkStatus();
                } catch (Exception var3) {
                    Log var10000 = StreamsSinkProcedures.this.log;
                    if (var10000 != null) {
                        var10000.error("Cannot start the Sink because of the following exception", (Throwable)var3);
                    }
                    Stream var4 = Stream.concat(StreamsSinkProcedures.this.sinkStatus(), Stream.of(new KeyValueResult("exception", ExceptionUtils.getStackTrace((Throwable)var3))));
                    Intrinsics.checkNotNullExpressionValue(var4, "Stream.concat(sinkStatus…Utils.getStackTrace(e))))");
                    var1 = var4;
                }
                return var1;
            }
        }));
    }
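The suspected mechanism can be reproduced with plain java.util.stream, independent of Neo4j. This sketch assumes, without confirmation from the plugin source, that checkLeader defers its function into a lazily evaluated stream stage such as flatMap; startSink and deferredStart are hypothetical stand-ins, not plugin code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class LazyStartDemo {
    // Hypothetical flag standing in for "the sink is running".
    public static final AtomicBoolean started = new AtomicBoolean(false);

    // Hypothetical stand-in for StreamsEventSink.start().
    public static void startSink() {
        started.set(true);
    }

    // Hypothetical stand-in for the procedure: the side effect lives inside a
    // flatMap stage, so it runs only when a terminal operation pulls elements.
    public static Stream<String> deferredStart() {
        return Stream.of("leader").flatMap(ignored -> {
            startSink();
            return Stream.of("status: running");
        });
    }

    public static void main(String[] args) {
        deferredStart().close(); // closed but never consumed: the stage never runs
        System.out.println("after close: started = " + started.get());

        deferredStart().forEach(row -> { }); // consuming triggers the stage
        System.out.println("after consume: started = " + started.get());
    }
}
```

Running main prints `started = false` after the close and `started = true` after the forEach. If the assumption about checkLeader holds, this is why collecting the rows, as in the workaround, starts the sink.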
Specifications (Mandatory)
Currently used versions
Versions
- OS: macOS
- Neo4j: 4.1.5
- Neo4j-Streams: 4.0.6
It is worth noting that GraphDatabaseService.executeTransactionally(String) does auto-close the Result, but closing it does not appear to be enough to make the Stream be consumed. That makes sense for potentially large streams, which should not have to be consumed in full just to close the Result:
https://github.com/neo4j/neo4j/blob/4.2/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/GraphDatabaseFacade.java#L164-L168
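The difference between closing and consuming can be illustrated with a plain Java Stream: close() runs the registered onClose handlers but never pulls elements through the pipeline. This is only an analogy, assuming the Result wraps a lazily evaluated stream; it is not Neo4j's Result implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class CloseVsConsume {
    // Builds a pipeline that records each event it executes into `events`.
    static Stream<Integer> pipeline(List<String> events) {
        return Stream.of(1, 2, 3)
                .peek(i -> events.add("processed " + i))
                .onClose(() -> events.add("closed"));
    }

    // Closes the stream without any terminal operation, like auto-closing an unread result.
    public static List<String> closeOnly() {
        List<String> events = new ArrayList<>();
        try (Stream<Integer> s = pipeline(events)) {
            // intentionally no terminal operation
        }
        return events;
    }

    // Consumes every element, then closes the stream.
    public static List<String> consumeThenClose() {
        List<String> events = new ArrayList<>();
        try (Stream<Integer> s = pipeline(events)) {
            s.forEach(i -> { });
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(closeOnly());        // [closed]
        System.out.println(consumeThenClose()); // [processed 1, processed 2, processed 3, closed]
    }
}
```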
@ivangreene we are investigating this. We think this may be a limitation in the semantics/operation of executeTransactionally, but are not yet sure. Will follow up.
Underlying implementation is here: https://github.com/neo4j-contrib/neo4j-streams/blob/8f986d684730eafdde3617768e99ddb02ddc72f4/consumer/src/main/kotlin/streams/procedures/StreamsSinkProcedures.kt#L50
The kernel team has indicated that whether or not the results are consumed should not affect whether the transaction executes.
@moxious It makes sense that executeTransactionally should not consume an entire stream if not all of the results are needed, although that behavior can lead to surprises like this one. I wish I understood Kotlin so I could open a PR, but I don't know how to move the call to 'start' outside of the stream operation 😄
@ivangreene starting the sink does indeed have threading consequences of its own, which is a complicating factor here, but you can't really move that call out of the stream's execution path. Otherwise, if starting the consumption service failed (for whatever reason), there would be no way to report that in the procedure's return value. Moving the call outside would leave the procedure blind to whether it succeeded or failed, and would make the returned status meaningless.
This will likely require us to reproduce the problem and investigate further. For the moment, it is good enough that the workaround is so trivial (just consume the results).
Hmm... so you need some way to call start() immediately, but also return an error in the result Stream if the call to start() fails. Yep, I have no idea how to do this in Kotlin 😄. OK, well, thanks for the attention to the issue, and let me know if you have any problems reproducing it.
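One way to meet both requirements is to call start() eagerly, before any stream is built, capture its outcome, and then build the result rows from that captured outcome. This is a hypothetical plain-Java sketch, not the plugin's code: Sink, KeyValue, and sinkStart are made-up names, it ignores the leader check and the threading concerns raised above, and it uses the JDK's printStackTrace in place of ExceptionUtils.getStackTrace to stay dependency-free.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.stream.Stream;

public class EagerStart {
    // Hypothetical stand-in for the sink's lifecycle.
    public interface Sink {
        void start() throws Exception;
        String status();
    }

    // Hypothetical stand-in for the procedure's KeyValueResult rows.
    public static final class KeyValue {
        public final String name;
        public final String value;

        public KeyValue(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    public static Stream<KeyValue> sinkStart(Sink sink) {
        Exception failure = null;
        try {
            sink.start(); // runs now, whether or not the caller ever reads the rows
        } catch (Exception e) {
            failure = e;
        }
        KeyValue status = new KeyValue("status", sink.status());
        if (failure == null) {
            return Stream.of(status);
        }
        // Report the failure as an extra row instead of losing it.
        StringWriter trace = new StringWriter();
        failure.printStackTrace(new PrintWriter(trace));
        return Stream.of(status, new KeyValue("exception", trace.toString()));
    }

    public static void main(String[] args) {
        Sink failing = new Sink() {
            public void start() throws Exception {
                throw new IllegalStateException("cannot connect");
            }

            public String status() {
                return "STOPPED";
            }
        };
        sinkStart(failing).forEach(row -> System.out.println(row.name));
    }
}
```

Because start() runs before the stream is returned, merely closing the result would still start the sink, while a failure still surfaces as an "exception" row for callers that do read the results.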