gRPC-haskell
streaming asynchronous exception
Hi! I'm using the async package to manage my threads. The race function is very handy in many places. For example:
res <- race receiveLongServerStream executeSomethingShort
where receiveLongServerStream is a client action that receives events from a gRPC server. It can receive any number of events, so it is effectively an infinite thread,
and executeSomethingShort is a short-lived process which should normally return a value first. race should then cancel receiveLongServerStream and return res, which should be a Right because executeSomethingShort returned first.
But the whole expression blocks, probably because race can't cancel receiveLongServerStream. Here are the docs for the cancel function, in case they help:
https://hackage.haskell.org/package/async-2.2.2/docs/Control-Concurrent-Async.html#v:cancel
I just want to know how to deal with this, because this behaviour is causing deadlocks.
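For comparison, here is a minimal sketch of the behaviour the question expects from race when nothing blocks cancellation. The names longListener and shortAction are hypothetical stand-ins for receiveLongServerStream and executeSomethingShort; the listener only blocks in threadDelay, which is interruptible, so it is not a model of the real gRPC call.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)
import Control.Monad (forever)

-- Hypothetical stand-in for receiveLongServerStream: never returns on its
-- own, but only blocks in threadDelay, which can be interrupted.
longListener :: IO ()
longListener = forever (threadDelay 1000000)

-- Hypothetical stand-in for executeSomethingShort.
shortAction :: IO Int
shortAction = threadDelay 100000 >> pure 42

main :: IO ()
main = do
  -- race cancels the loser once the winner has returned.
  res <- race longListener shortAction
  print res
```

If the same shape hangs with the real listener, the difference is most likely in how the listener blocks, not in race itself.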
@tim2CF: My first instinct when I see an unkillable thread is that something is blocked on an unsafe FFI call
Probably the gRPC C code is causing this, right? receiveLongServerStream in my example is a client-side listener, basically a subscription to server events with an attached IO handler. It has a very big timeout, for example 1 hour, because it doesn't make sense to reopen the connection for events that I want to always listen to and handle asap:
https://github.com/coingaming/lnd-client/blob/fe0f9560ee0f85107f8520cc8088eea3aceac4fc/src/LndClient/RPC.hs#L362-L366
In some cases I can replace race with some combination of link =<< (async this)
But a process that race cannot cancel was such a surprise for me, I spent a couple of hours finding it)
Yeah, I suspect an FFI call in the binding to the grpc package is responsible for this. It might either be in the logic for the server itself or in some finalizer which is triggered when the thread is cancelled and then the finalizer is not cancellable.
The second most common reason for this is finalization logic that inappropriately masks exceptions, leading to Haskell code that blocks and is uninterruptible.
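A small sketch of the second failure mode, assuming nothing about gRPC-haskell internals: the worker below runs under uninterruptibleMask_, and threadDelay stands in for whatever blocking call cannot be interrupted. The function name cancelLatency is made up for this illustration. It measures how long cancel blocks.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (uninterruptibleMask_)
import GHC.Clock (getMonotonicTime)

-- Returns how long `cancel` blocked, in seconds.
cancelLatency :: IO Double
cancelLatency = do
  started <- newEmptyMVar
  worker <- async $ uninterruptibleMask_ $ do
    putMVar started ()   -- signal that the masked section has begun
    threadDelay 500000   -- stands in for a call that cannot be interrupted
  takeMVar started
  t0 <- getMonotonicTime
  cancel worker          -- returns only once the worker has terminated
  t1 <- getMonotonicTime
  pure (t1 - t0)

main :: IO ()
main = do
  dt <- cancelLatency
  putStrLn ("cancel blocked for about " ++ show dt ++ " seconds")
```

cancel waits for the target thread to finish, so it cannot return before the masked section ends. With a one-hour blocking call in place of the half-second delay, the caller would appear deadlocked.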
Thanks @Gabriel439! A slightly unrelated question: is there a good way to handle subscription misbehaviour when the handler receives unexpected values? Here is an example
https://github.com/coingaming/lnd-client/blob/6760cbf23ad351b982fefad5585199eca818943b/src/LndClient/RPC.hs#L207-L212
There my gRPC stream handler throws an exception, just because I have no idea how to terminate the subscription from the handler when it receives corrupted or unexpected data. If there is a nice way without exceptions, I would like to use it
@tim2CF: There is a chance that I might have fixed the issue you are running into in the course of fixing https://github.com/awakesecurity/gRPC-haskell/pull/105
Thanks! Any thoughts about proper termination of a gRPC subscription from inside the handler?
The type signature doesn't give me any hints, because the handler function returns an IO () value
https://github.com/awakesecurity/gRPC-haskell/blob/a26497c82cd5b75a779a762aa9e7f3748a2526d6/src/Network/GRPC/HighLevel/Client.hs#L75
So I have no idea how to terminate the stream from the handler if it gets corrupted data
@tim2CF: I'm not sure, but have you tried manually terminating by not executing any more IO actions within the handler? In other words, in the event of some sort of corruption you stop reading data?
If you need something a bit more high-level you could do something like this:
https://hackage.haskell.org/package/break-1.0.2/docs/Control-Break.html
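To illustrate the "stop reading" suggestion, here is a rough sketch that does not use the real gRPC-haskell types. Recv, fakeRecv, StopReason and handler are all hypothetical names, and the stream is simulated from a list. The handler keeps the IO () shape, stops by not looping again, and records why it stopped in an IORef.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)

-- Hypothetical, simplified receive action: Nothing means the stream ended.
type Recv a = IO (Maybe a)

-- Why the handler stopped reading.
data StopReason = StreamEnded | BadValue Int
  deriving (Eq, Show)

-- Build a fake stream from a list (for illustration only).
fakeRecv :: [a] -> IO (Recv a)
fakeRecv xs = do
  ref <- newIORef xs
  pure $ atomicModifyIORef' ref $ \ys -> case ys of
    []         -> ([], Nothing)
    (y : rest) -> (rest, Just y)

-- The handler returns IO (), so the reason goes out through an IORef.
-- Negative values are treated as corrupted; on one, the loop is not re-entered.
handler :: IORef (Maybe StopReason) -> Recv Int -> IO ()
handler reason recv = loop
  where
    loop = do
      msg <- recv
      case msg of
        Nothing -> writeIORef reason (Just StreamEnded)
        Just n
          | n < 0     -> writeIORef reason (Just (BadValue n))
          | otherwise -> putStrLn ("event " ++ show n) >> loop

main :: IO ()
main = do
  reason <- newIORef Nothing
  recv <- fakeRecv [1, 2, -7, 3]
  handler reason recv
  readIORef reason >>= print
```

Whether returning from the real handler closes the underlying call promptly is a separate question that this sketch does not answer.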
If I don't call the stream handler recursively, will the subscription just terminate?
The bad thing about the IO () type is that there is no clear mechanism to propagate the reason for termination to the outside. For now I'm using Control.Exception for this: I throw an exception from inside the subscription handler
https://github.com/coingaming/lnd-client/blob/b5e29ce9218b65b692afe706705964f739206c98/src/LndClient/RPC.hs#L441-L442
And catch it from outside, at the level of the subscription
https://github.com/coingaming/lnd-client/blob/b5e29ce9218b65b692afe706705964f739206c98/src/LndClient/RPC.hs#L349
It works, so it seems this is not an issue at all. I was just a bit confused because I don't use exceptions in Haskell on an everyday basis. But thanks for the reply!
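The throw-inside, catch-outside approach described above can be sketched roughly like this. UnexpectedEvent, subscribe, handleEvent and runSubscription are hypothetical names for illustration, and subscribe simply feeds a list of events to the handler instead of talking to a server.

```haskell
import Control.Exception (Exception, throwIO, try)

-- Hypothetical exception type carrying the reason for termination.
newtype UnexpectedEvent = UnexpectedEvent String
  deriving Show

instance Exception UnexpectedEvent

-- Hypothetical stand-in for a subscription: feeds each event to the handler.
subscribe :: [String] -> (String -> IO ()) -> IO ()
subscribe events handler = mapM_ handler events

-- Handler with the IO () shape; it aborts the subscription by throwing.
handleEvent :: String -> IO ()
handleEvent "corrupted" = throwIO (UnexpectedEvent "corrupted payload")
handleEvent e = putStrLn ("handled " ++ e)

-- Run the subscription and turn the exception back into a value.
runSubscription :: [String] -> IO (Either UnexpectedEvent ())
runSubscription events = try (subscribe events handleEvent)

main :: IO ()
main = runSubscription ["a", "corrupted", "b"] >>= print
```

Because try is used at the specific UnexpectedEvent type, asynchronous exceptions such as the one thrown by cancel are not swallowed here and still propagate.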
@tim2CF: My understanding is that the subscription will terminate when the callback you supply to ClientReaderRequest terminates
That said, I think you should be able to safely throw an exception inside of the handler. If that causes issues then we can always look into that
Thanks!
@tim2CF: You're welcome! 🙂
Hi again, @Gabriel439! Is it possible that a gRPC subscription client is still not cancellable from the client side with the cancel function? I recently made some improvements to my library, trying to use withAsync + link instead of async + link for a cleaner runtime, but found that this test hangs exactly where the Async is being cancelled. Using a simple cancel like here https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/test/LndClient/TestApp.hs#L196-L209 or the better withAsync version like here https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/test/LndClient/TestApp.hs#L210-L224 makes no difference, and this line is never reached https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/test/LndClient/TestApp.hs#L210-L224
As I remember, you were talking about cancellation from the server side, but here is an example of how I'm trying to cancel a client async subscription from the client side as well. I'm using the latest version of your gRPC package, with the previous fix already included, but for some reason it's still not working.
In my examples I'm using my own spawnLink and withSpawnLink, but they are just generic wrappers around async + link and withAsync + link https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/src/LndClient/Util.hs#L66-L81
I don't think this difference can cause problems. I think it would work the same way with the original functions from the async package.
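For readers without the linked file open, wrappers of that kind could look roughly like the sketch below. This is an assumption about their shape based only on the description "async + link" and "withAsync + link", not a copy of the lnd-client code.

```haskell
import Control.Concurrent.Async (Async, async, link, wait, withAsync)

-- Assumed shape of spawnLink: start a thread and link it to the caller,
-- so an exception in the child is rethrown in the parent.
spawnLink :: IO a -> IO (Async a)
spawnLink action = do
  a <- async action
  link a
  pure a

-- Assumed shape of withSpawnLink: like withAsync, but linked.
-- The child is cancelled when the inner block exits.
withSpawnLink :: IO a -> (Async a -> IO b) -> IO b
withSpawnLink action inner =
  withAsync action $ \a -> link a >> inner a

main :: IO ()
main = do
  r <- withSpawnLink (pure (21 * 2 :: Int)) wait
  print r
```

Both variants end up cancelling the child thread in the same way, which fits the observation that switching between them makes no difference to the hang.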
As additional info, it seems that a process spawned like this is not cancellable. Can that be true?
-- spawn the gRPC subscription in a new async thread
pid <- async $ withGRPCClient config $ \client -> do
  blablabla
-- this does not seem to take effect until the async thread has exited
cancel pid
I compiled grpc-haskell with the debug flag and found that the subscription thread hangs after this log
[ThreadId 35]: runOps: allocated op contexts: [OpRecvMessageContext 0x00007f9bc8000b70]
[ThreadId 35]: runOps: tag: Tag {unTag = 0x8000000000000012}
[ThreadId 35]: startBatch: calling grpc_call_start_batch with pointers: Call 0x00007f9bc40238a0 OpArray 0x00007f9bb0001300
[ThreadId 35]: startBatch: grpc_call_start_batch call returned.
[ThreadId 35]: runOps: called start_batch. callError: Right ()
[ThreadId 35]: pluck: called with tag=Tag {unTag = 0x8000000000000012},mwait=Nothing
[ThreadId 35]: pluck: blocking on grpc_completion_queue_pluck for tag=Tag {unTag = 0x8000000000000012}
and then it hangs on the cancel line of the high-level code I described before. After a significant timeout (a couple of minutes) it resumes with these logs
[ThreadId 35]: pluck finished: Event {eventCompletionType = OpComplete, eventSuccess = True, eventTag = Tag {unTag = 0x8000000000000012}}
C wrapper: timespec_destroy: freeing ptr: 0x7f9bb0000e90
[ThreadId 35]: runOps: pluck returned Right ()
[ThreadId 35]: runOps: got good op; starting.
[ThreadId 35]: resultFromOpContext: OpRecvMessageContext
C wrapper: free_slice: freeing ptr: 0x7f9bb0000ed0
C wrapper: byte_buffer_reader_destroy: freeing ptr: 0x7f9bb0000e90
[ThreadId 35]: resultFromOpContext: bb copied: "\n\ENQHELLO\SUB
and then it finally shuts down/cancels (without delay)
[ThreadId 35]: debugCall: client call: 0x00007f9bc40238a0
[ThreadId 35]: withClientCall(R): destroying.
[ThreadId 35]: Destroying client-side call object.
[ThreadId 35]: withClient: destroying.
[ThreadId 35]: destroyClient: calling grpc_channel_destroy()
[ThreadId 35]: destroyClient: shutting down CQ.
[ThreadId 35]: drainLoop: before next() call
C wrapper: timespec_destroy: freeing ptr: 0x7f9bc40122b0
[ThreadId 35]: drainLoop: next() call got Event {eventCompletionType = QueueShutdown, eventSuccess = False, eventTag = Tag {unTag = 0xe573bccf5e727f00}}
[ThreadId 35]: Got CQ loop shutdown result of: Just ()
[ThreadId 35]: withGRPC: shutting down
[2021-01-29 19:54:07][LndClient][Info][d7fe5ea939ca][PID 29037][ThreadId 26][main:LndClient.TestApp test/LndClient/TestApp.hs:226:7] BOOOOOOOMMMMMMMMMMMMMMMM
I hope this helps you understand what's going on there and why the process is not cancelled immediately (I still think it could be a fault on my side, in how I'm using the gRPC library) @Gabriel439
I took a look at the gRPC-haskell source code. Given my issue with cancelling a long client subscription and the logs produced in debug mode, it might be caused by this mask_ application. I'm a real noob in low-level FFI questions, but this place might be the cause: https://github.com/awakesecurity/gRPC-haskell/blob/641f0bab046f2f03e5350a7c5f2044af1e19a5b1/core/src/Network/GRPC/LowLevel/Op.hs#L218
What do you think?
@tim2CF: Yes, generally use of mask_ is an anti-pattern and a likely source of threads not handling exceptions gracefully. I will take a look at this soon
Cool, thanks Gabriel! Sorry for the spam. This library seems to be the only complete gRPC library for Haskell and I had no idea how to fix the issue.
fwiw, I added the mask_ in https://github.com/awakesecurity/gRPC-haskell/pull/89. Fully agree that it’s not great, but it does work around an actual issue. I’d be very happy to see a better solution here.
@cocreature: Actually, it's not clear to me why the mask_ is present. I've read the associated comment a few times and I still don't understand the sequence of events that leads to the bug that the mask_ is intended to prevent. In particular, I don't understand how gRPC can write to the freed ByteBuffer if there is an exception thrown in the middle of startBatch