fs2-grpc
A closed client can still receive events
To be more precise, onClose can be called on the underlying Fs2StreamClientCallListener while the client (and its dispatcher) are already closed. I don't have a full end-to-end example, but it comes down to this:
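import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

val someServiceResource: Resource[IO, SomeServiceFs2Grpc[IO, Unit]] = ???

someServiceResource
  .map(
    // Interrupt the stream after one second; the resource is released right after.
    _.someStream(SomeStreamRequest(), ()).interruptAfter(1.second).compile.drain
  )
  .useEval
  .unsafeRunSync()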
Output:
Jun 07, 2023 1:52:23 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@13affa0e
java.lang.IllegalStateException: dispatcher already shutdown
    at cats.effect.std.Dispatcher$$anon$2.unsafeToFutureCancelable(Dispatcher.scala:422)
    at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:59)
    at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:58)
    at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:317)
    at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:51)
    at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:50)
    at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:317)
    at fs2.grpc.client.Fs2StreamClientCallListener.onClose(Fs2StreamClientCallListener.scala:43)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:468)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:432)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:465)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
So interruptAfter stops the stream early, after which the surrounding resource is closed immediately. But onClose is still called afterwards and submits work to the already closed dispatcher.
What happens if you provide a dispatcher that waits for the completion of active fibers?
Same result. The error seems to happen when submitting a new task to the dispatcher. So the dispatcher is already closed even before the task is started.
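For what it's worth, the submission failure can probably be shown without gRPC at all. Below is a minimal sketch, assuming Cats Effect 3.4 or later (for Dispatcher.parallel); the object and method names are made up for illustration. It deliberately leaks a dispatcher out of its Resource scope and then submits to it, which should fail the same way as the onClose frame in the trace above.

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import scala.util.Try

// Hypothetical demo object, not part of fs2-grpc.
object ClosedDispatcherDemo {

  def submitAfterClose(): Try[Unit] = {
    // Leak the dispatcher out of `use`: by the time `use` returns,
    // the dispatcher's finalizer has run and it is shut down.
    val leaked: Dispatcher[IO] =
      Dispatcher.parallel[IO].use(d => IO.pure(d)).unsafeRunSync()

    // The failure is raised when the task is submitted,
    // before the submitted effect gets a chance to run.
    Try(leaked.unsafeRunSync(IO.unit))
  }

  def main(args: Array[String]): Unit =
    println(submitAfterClose())
}
```

Since the exception is raised at submission time, waiting for active fibers on shutdown would not be expected to help here.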
I wonder how we best avoid that error. Gate submission through something?
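One possible shape for such a gate, purely as a sketch: GatedSubmitter is a hypothetical helper, not an existing fs2-grpc type. A flag is flipped by a Resource finalizer that runs before the dispatcher's own finalizer, and the listener would check it before submitting. The flag alone is racy (a callback can pass the check just before shutdown), so the sketch also catches the IllegalStateException from a lost race.

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, not part of fs2-grpc.
final class GatedSubmitter(dispatcher: Dispatcher[IO], open: AtomicBoolean) {

  /** Runs `fa` if the gate is open; returns false if the event was dropped. */
  def submit(fa: IO[Unit]): Boolean =
    if (!open.get()) false
    else
      try {
        dispatcher.unsafeRunSync(fa)
        true
      } catch {
        // Lost the race with shutdown. Simplification: this also swallows
        // an IllegalStateException raised by `fa` itself.
        case _: IllegalStateException => false
      }
}

object GatedSubmitter {
  // Finalizers run in reverse order of acquisition, so the gate is
  // closed first and the dispatcher is shut down afterwards.
  def resource: Resource[IO, GatedSubmitter] =
    for {
      dispatcher <- Dispatcher.parallel[IO]
      open <- Resource.make(IO(new AtomicBoolean(true)))(flag => IO(flag.set(false)))
    } yield new GatedSubmitter(dispatcher, open)
}
```

This only silences late events; it does not make the close wait for them.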
Perhaps ideally the cancellation triggered by interruptAfter should backpressure until onClose has been called? Assuming onClose will always be called at least once.
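A rough sketch of that idea, with a simulated transport instead of a real gRPC call. awaitCloseOnCancel, cancelCall and closed are hypothetical names: cancelCall stands in for cancelling the underlying call, and closed is assumed to be completed from the listener's onClose. The finalizer registered with onCancel requests cancellation and then waits on the Deferred, so cancelling the fiber does not return until onClose has been observed.

```scala
import cats.effect.{Deferred, IO, IOApp}
import scala.concurrent.duration._

// Hypothetical sketch, not fs2-grpc code.
object AwaitCloseSketch extends IOApp.Simple {

  // On cancellation, request the cancel and then wait for onClose.
  def awaitCloseOnCancel[A](
      body: IO[A],
      cancelCall: IO[Unit],
      closed: Deferred[IO, Unit]
  ): IO[A] =
    body.onCancel(cancelCall *> closed.get)

  // Yields whether onClose had been observed once `cancel` returned.
  val demo: IO[Boolean] =
    for {
      closed <- Deferred[IO, Unit]
      // Simulated transport: onClose arrives 100 ms after the cancel request.
      cancelCall = (IO.sleep(100.millis) *> closed.complete(())).start.void
      fiber <- awaitCloseOnCancel(IO.never[Unit], cancelCall, closed).start
      _ <- IO.sleep(50.millis) // let the fiber start so its finalizer is registered
      _ <- fiber.cancel // waits for the onCancel finalizer to finish
      seen <- closed.tryGet
    } yield seen.isDefined

  def run: IO[Unit] =
    demo.flatMap(seen => IO.println(s"onClose seen before cancel returned: $seen"))
}
```

If onClose were never delivered, the cancellation in this sketch would hang, so a real version would probably want a timeout around the wait.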
I guess the most sensible thing might be to make clients themselves "closable" resources and reject all events after the client has been closed. But then all methods that don't return a Resource yet (def client, def mkClient, def stub) would have to change incompatibly just for this.
@Jasper-M You are welcome to give it a go, I do not have the bandwidth atm. I suppose it is time to spring clean that client interface in a major bump that allows breaking changes.