grpc
Idiomatic way to stop server side streaming
Describe the question
Hello, I am trying to understand the best way to notify the server that my gRPC client no longer wants to receive server stream messages. A similar question was raised for the grpc-go client: https://github.com/grpc/grpc-go/issues/990. I don't see any function in the GRPC.Stub module that cancels a server stream, so the only way I see is to reconnect the client each time I want to 'unsubscribe' from a server stream.
Versions:
- OS: macOS
- Erlang: 22
- Elixir: 1.10.2
- mix.lock(grpc, gun, cowboy, cowlib):
%{
"cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
"cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
"grpc": {:hex, :grpc, "0.5.0-beta.1", "7d43f52e138fe261f5b4981f1ada515dfc2e1bfa9dc92c7022e8f41e7e49b571", [:mix], [{:cowboy, "~> 2.7.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:gun, "~> 2.0.0", [hex: :grpc_gun, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.5", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "fbbf8872935c295b7575435fe4128372c23c6ded89c2ef8058af3c6167bb3f65"},
"gun": {:hex, :grpc_gun, "2.0.0", "f99678a2ab975e74372a756c86ec30a8384d3ac8a8b86c7ed6243ef4e61d2729", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "03dbbca1a9c604a0267a40ea1d69986225091acb822de0b2dbea21d5815e410b"},
}
In the HTTP/2 spec there are two stream states in which neither endpoint should send any more DATA frames:
- half-closed; and
- closed
The primary difference is that in the half-closed state either endpoint is still permitted to send WINDOW_UPDATE frames. See https://tools.ietf.org/html/rfc7540#section-5.1 for details.
According to the documentation for GRPC.Stub.end_stream/1 (see https://github.com/elixir-grpc/grpc/blob/master/lib/grpc/stub.ex#L341), this function should cause the client to send a frame with the END_STREAM flag set, which should cause the stream to transition into the half-closed state. As described in the RFC, a well-behaved server would then stop sending DATA frames. Assuming you are talking about bidirectional streaming, this should do what you want. If you are asking about changing the state of a separate stream, I don't think gRPC (or HTTP/2) provides for it.
Thanks for the quick response!
But as far as I can see, this function accepts a GRPC.Client.Stream.t(), and normally one would only have a GRPC.Channel.t().
I've also checked the code for end_stream: it results in a call to GRPC.Adapter.Gun.end_stream/1, which expects the stream payload to have the stream_ref field set.
Would it be possible to construct a proper GRPC.Client.Stream.t() from this code example?
{:ok, chan} = GRPC.Stub.connect(server_url, opts)
# subscribe_invoices is a server stream RPC
{:ok, stream} = Lnrpc.Lightning.Stub.subscribe_invoices(chan, Lnrpc.InvoiceSubscription.new())
for invoice <- stream do
  IO.inspect(invoice)
  ## here I would end the subscription after 1st event
end
Ok, so it's impossible to do this on client if it's a unidirectional server stream.
I've found similar questions on GitHub/stackoverflow for other languages, for example: https://github.com/grpc/grpc/issues/8023#issuecomment-245402978
The suggestion is to call a cancel function to cancel the RPC call. But in the Elixir library this function also expects a GRPC.Client.Stream.t().
As you can see, I've raised a question in the lightningnetwork/lnd repo, and it seems that the Golang and Python implementations allow cancelling unary streams from the client. Would it be possible to implement something similar in the Elixir library?
Try:
opts = [end_stream: true]
GRPC.Stub.send_request(stream, note, opts)
@sleipnir Your solution is for streaming requests. In this case it's a streaming response, and the stub call returns an Elixir stream (#Function<63.104660160/2 in Stream.unfold/2>), not a GRPC.Client.Stream.t().
@ischepin have you found a solution? I am also trying to cancel LND gRPC streams in Elixir once my client got what it needed.
Does anyone here want to attempt a solution for this issue?
@ischepin this is an attempt to help.
I have clients disconnecting.
{:ok, chan} = GRPC.Stub.connect(server_url, opts)
# subscribe_invoices is a server stream RPC
{:ok, stream} = Lnrpc.Lightning.Stub.subscribe_invoices(chan, Lnrpc.InvoiceSubscription.new())
Enum.reduce_while(stream, {}, fn invoice, acc ->
  IO.inspect(invoice)
  # ...
  # Halt whenever you like with {:halt, acc}, or continue with {:cont, acc}.
  {:halt, acc}
end)
To detect on the server side that the client is gone, I rely on the fact that the spawned process receives the following message (if I remember correctly, it originates from cowboy). An alternative solution would be convenient here; anyway, this is currently how I deal with disconnecting clients.
receive do
  :shutdown -> Logger.debug("End of stream")
end
Hope this helps, and you aren't looking for something completely different.
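As a rough illustration of where such a check could sit, here is a minimal sketch of a send loop that looks for :shutdown before each send. It assumes the handler process really does receive a bare :shutdown message, which the comment above only recalls from memory and which depends on the adapter. The module ShutdownAwareLoop and the send_fun callback are hypothetical names, not part of elixir-grpc, and no gRPC connection is involved.

```elixir
defmodule ShutdownAwareLoop do
  # Hypothetical helper, not part of elixir-grpc.
  # Passes each item to `send_fun` until the items run out or a
  # `:shutdown` message is found in this process's mailbox.
  def run([], _send_fun), do: :done

  def run([item | rest], send_fun) do
    receive do
      # Assumption: the adapter delivers a bare :shutdown message
      # to the handler process when the client goes away.
      :shutdown -> :client_gone
    after
      # Nothing waiting in the mailbox: send the next item and loop.
      0 ->
        send_fun.(item)
        run(rest, send_fun)
    end
  end
end
```

In a real handler, send_fun would wrap whatever call sends a reply on the server stream, and the :client_gone branch is where the logging would go.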
I think we need a better way to detect this, because this depends on the adapter used. Perhaps we can add a telemetry event to which users can listen. It might even be the case that #229 ends up resolving this
@AleksandarFilipov I have a similar problem. I want to find out when a client loses its connection to log this information for later usage. At which point does your mentioned code line have to be included?
receive do :shutdown -> Logger.debug("End of stream") end
Maybe we should add a specific telemetry event for this?
@AleksandarFilipov is your code correctly closing the stream and the connection underneath? Or does it simply stop reading from it and leave it hanging?
@polvalente why mix client request/response logic and telemetry? Those are separate concerns.
Just a suggestion. I didn't perceive this as being just a client/server logic problem from the descriptions given.
Anyway, if there's a feature missing on the client-server communication workflow we should be able to add that. I'm happy to review and iterate on PRs on this.
IMO the API should not return an Elixir stream but instead a wrapper struct that can be used to call cancel, similar to what the Python or Golang libraries allow.
Perhaps we could introduce something close to the Client.Stream struct that plays into client streaming. Then we could pass either one of those to GRPC.Stub.cancel
Furthermore, the struct itself could implement the Enumerable protocol so that the change is mostly backward compatible. Thoughts?
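A rough sketch of that idea, using simulated data and no gRPC connection. The module Sketch.ServerStream, its fields, new/2 and cancel/1 are all hypothetical and not part of elixir-grpc. A real version would presumably have cancel/1 ask the adapter to cancel the RPC instead of calling an arbitrary function.

```elixir
defmodule Sketch.ServerStream do
  # Hypothetical wrapper around the lazy response stream.
  # `enum` is the underlying enumerable of replies;
  # `cancel_fun` is a zero-arity function that cancels the call.
  defstruct [:enum, :cancel_fun]

  def new(enum, cancel_fun) when is_function(cancel_fun, 0) do
    %__MODULE__{enum: enum, cancel_fun: cancel_fun}
  end

  # Runs the stored cancel function.
  def cancel(%__MODULE__{cancel_fun: cancel_fun}) do
    cancel_fun.()
    :ok
  end
end

defimpl Enumerable, for: Sketch.ServerStream do
  # Delegate reduction to the wrapped enumerable so that existing
  # code such as `for` comprehensions or Enum.reduce_while/3 keeps working.
  def reduce(%{enum: enum}, acc, fun), do: Enumerable.reduce(enum, acc, fun)

  # Returning {:error, __MODULE__} makes Enum fall back to reduce/3.
  def count(_stream), do: {:error, __MODULE__}
  def member?(_stream, _value), do: {:error, __MODULE__}
  def slice(_stream), do: {:error, __MODULE__}
end
```

With this shape, a caller could wrap any enumerable, for example Sketch.ServerStream.new(Stream.unfold(1, fn n -> {{:ok, n}, n + 1} end), fn -> :ok end), read from it with Enum.take/2, and then call Sketch.ServerStream.cancel/1 on the same value.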
@lukaszsamson looks like it closes the stream properly, haven't had any issues with it.
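The thread does not establish what happens to the underlying HTTP/2 stream when the reduction halts. In plain Elixir, whether halting triggers any cleanup depends on how the lazy stream was built: Stream.resource/3 runs its after function when enumeration halts, while Stream.unfold/2 has no cleanup hook. The sketch below shows that difference with simulated streams only; HaltDemo and its functions are hypothetical names, and nothing here calls elixir-grpc.

```elixir
defmodule HaltDemo do
  # Simulated stream with a cleanup hook: the after function
  # notifies `parent` when enumeration ends or is halted.
  def with_cleanup(parent) do
    Stream.resource(
      fn -> 1 end,
      fn n -> {[n], n + 1} end,
      fn _n -> send(parent, :cleaned_up) end
    )
  end

  # Simulated stream without any cleanup hook.
  def without_cleanup do
    Stream.unfold(1, fn n -> {n, n + 1} end)
  end

  # Returns the first element by halting the reduction,
  # in the same way as the reduce_while example in the thread.
  def first(stream) do
    Enum.reduce_while(stream, nil, fn item, _acc -> {:halt, item} end)
  end

  # Checks, without blocking, whether the cleanup message arrived.
  def cleanup_ran? do
    receive do
      :cleaned_up -> true
    after
      0 -> false
    end
  end
end
```

Calling HaltDemo.first/1 on the Stream.resource/3 variant leaves a :cleaned_up message in the caller's mailbox, while the Stream.unfold/2 variant simply stops being pulled. Since the stub reportedly returns a Stream.unfold/2 function (see the inspect output earlier in the thread), halting by itself may not be enough to cancel the call, though that would need to be confirmed against the library.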