
Idiomatic way to stop server side streaming

Open ischepin opened this issue 5 years ago • 17 comments

Describe the question: Hello, I am trying to understand the best way to notify the server that my gRPC client no longer wants to receive server stream messages. A similar question was raised for the grpc-go client: https://github.com/grpc/grpc-go/issues/990. I don't see any function in the GRPC.Stub module that cancels a server stream, so the only way I see is to reconnect the client each time I want to 'unsubscribe' from a server stream.

Versions:

  • OS: macOS
  • Erlang: 22
  • Elixir: 1.10.2
  • mix.lock(grpc, gun, cowboy, cowlib):
%{
  "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
  "cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
  "grpc": {:hex, :grpc, "0.5.0-beta.1", "7d43f52e138fe261f5b4981f1ada515dfc2e1bfa9dc92c7022e8f41e7e49b571", [:mix], [{:cowboy, "~> 2.7.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:gun, "~> 2.0.0", [hex: :grpc_gun, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.5", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "fbbf8872935c295b7575435fe4128372c23c6ded89c2ef8058af3c6167bb3f65"},
  "gun": {:hex, :grpc_gun, "2.0.0", "f99678a2ab975e74372a756c86ec30a8384d3ac8a8b86c7ed6243ef4e61d2729", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "03dbbca1a9c604a0267a40ea1d69986225091acb822de0b2dbea21d5815e410b"},
}

ischepin avatar Oct 27 '20 14:10 ischepin

In the HTTP/2 spec there are two stream states in which neither endpoint should send any more DATA frames:

  1. half-closed; and
  2. closed

The primary difference is that in the half-closed state either endpoint is still permitted to send WINDOW_UPDATE frames. See https://tools.ietf.org/html/rfc7540#section-5.1 for details.

According to the documentation for GRPC.Stub.end_stream/1 (see https://github.com/elixir-grpc/grpc/blob/master/lib/grpc/stub.ex#L341), this function should cause the client to send a frame with the END_STREAM flag set, which should cause the stream to transition into the half-closed state. As described in the RFC, a well-behaved server would then stop sending DATA frames. Assuming you are talking about bidirectional streaming, this should do what you want. If you are asking about changing the state of a separate stream, I don't think gRPC (or HTTP/2) provides for it.

ljmarks avatar Oct 27 '20 14:10 ljmarks

Thanks for the quick response! But as far as I can see, this function accepts a GRPC.Client.Stream.t(), and normally one would only have a GRPC.Channel.t(). I've also checked the code for end_stream: it results in a call to GRPC.Adapter.Gun.end_stream/1, which expects the stream payload to have the stream_ref field set. Would it be possible to construct a proper GRPC.Client.Stream.t() from this code example?

{:ok, chan} = GRPC.Stub.connect(server_url, opts)
# subscribe_invoices is a server stream RPC
{:ok, stream} = Lnrpc.Lightning.Stub.subscribe_invoices(chan, Lnrpc.InvoiceSubscription.new())
for invoice <- stream do
  IO.inspect(invoice)
  ## here I would end the subscription after 1st event
end

ischepin avatar Oct 27 '20 15:10 ischepin

OK, so it's impossible to do this on the client if it's a unidirectional server stream. I've found similar questions on GitHub and Stack Overflow for other languages, for example: https://github.com/grpc/grpc/issues/8023#issuecomment-245402978. The suggestion there is to call the cancel function to cancel the RPC call. But in the Elixir library this function also expects a GRPC.Client.Stream.t().

ischepin avatar Oct 28 '20 08:10 ischepin

As you can see, I've raised a question in the lightningnetwork/lnd repo, and it seems that the Go and Python implementations allow cancelling unidirectional streams from the client. Would it be possible to implement something similar in the Elixir library?

ischepin avatar Oct 29 '20 10:10 ischepin

Try:

opts = [end_stream: true]
GRPC.Stub.send_request(stream, note, opts)

sleipnir avatar Feb 03 '21 00:02 sleipnir

@sleipnir Your solution is for streaming requests. In this case it's a streaming response, and the stub call returns an Elixir stream (#Function<63.104660160/2 in Stream.unfold/2>), not a GRPC.Client.Stream.t().

lukaszsamson avatar Apr 03 '21 17:04 lukaszsamson

@ischepin have you found a solution? I am also trying to cancel LND gRPC streams in Elixir once my client got what it needed.

RooSoft avatar May 28 '21 10:05 RooSoft

Does anyone here want to attempt a solution for this issue?

polvalente avatar Jul 18 '22 09:07 polvalente

@ischepin here is an attempt to help.

I have clients disconnecting.

{:ok, chan} = GRPC.Stub.connect(server_url, opts)
# subscribe_invoices is a server stream RPC
{:ok, stream} = Lnrpc.Lightning.Stub.subscribe_invoices(chan, Lnrpc.InvoiceSubscription.new())
Enum.reduce_while(stream, {}, fn invoice, acc ->
  IO.inspect(invoice)
  ...
  ## halt whenever you like with {:halt, acc}, or continue with {:cont, acc}
end)

To detect on the server side that the client is gone, I rely on the fact that the spawned process gets the following message (if I remember correctly, it originates from cowboy). An alternative solution would be convenient here; anyway, this is currently how I deal with disconnecting clients.

receive do
  :shutdown -> Logger.debug("End of stream")
end

Hope this helps, and you aren't looking for something completely different.
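
For anyone who wants to try the halting pattern without an LND node, here is a minimal sketch in plain Elixir. It assumes the response is an ordinary lazy stream (as noted earlier in the thread, the stub returns a Stream.unfold/2 function); fake_invoices and its integer items are made up for illustration. It only shows that the client stops pulling items, not what happens to the underlying HTTP/2 stream.

```elixir
# Stand-in for the server stream: an endless lazy stream of fake invoice ids.
# (Hypothetical; a real stub call would return the gRPC response stream.)
fake_invoices = Stream.unfold(1, fn n -> {n, n + 1} end)

# Collect invoices and halt as soon as three of them have been seen.
seen =
  Enum.reduce_while(fake_invoices, [], fn invoice, acc ->
    acc = [invoice | acc]

    if length(acc) >= 3 do
      # Stop enumerating; no further items are requested from the stream.
      {:halt, acc}
    else
      {:cont, acc}
    end
  end)

IO.inspect(Enum.reverse(seen))
```

The accumulator carries whatever state decides when to stop, which is the main advantage over a plain for comprehension that has no way to break out early.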

AleksandarFilipov avatar Jul 19 '22 21:07 AleksandarFilipov

I think we need a better way to detect this, because this depends on the adapter used. Perhaps we can add a telemetry event to which users can listen. It might even be the case that #229 ends up resolving this

polvalente avatar Jul 20 '22 15:07 polvalente

@AleksandarFilipov I have a similar problem. I want to find out when a client loses its connection so I can log this information for later use. Where does the code you mentioned need to go?

receive do
  :shutdown -> Logger.debug("End of stream")
end

tmthn avatar Jun 06 '23 20:06 tmthn

Maybe we should add a specific telemetry event for this?

polvalente avatar Jun 08 '23 09:06 polvalente

@AleksandarFilipov is your code correctly closing the stream and the connection underneath? Or does it simply stop reading from it and leave it hanging?
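
To illustrate why this matters, here is a rough sketch in plain Elixir, not the library's code. A stream built with Stream.resource/3 has its cleanup function called when the consumer halts early, whereas Stream.unfold/2 itself takes no cleanup callback. The Agent below is a hypothetical stand-in for a connection and exists only to record whether cleanup ran.

```elixir
# Hypothetical stand-in for a connection: an Agent holding :open or :closed.
{:ok, conn} = Agent.start_link(fn -> :open end)

with_cleanup =
  Stream.resource(
    # start_fun: initial state of the stream
    fn -> 1 end,
    # next_fun: emit the current number and continue with the next one
    fn n -> {[n], n + 1} end,
    # after_fun: invoked when enumeration finishes or is halted early
    fn _n -> Agent.update(conn, fn _ -> :closed end) end
  )

# Take the first item only, then halt.
first =
  Enum.reduce_while(with_cleanup, nil, fn item, _acc -> {:halt, item} end)

state_after_halt = Agent.get(conn, & &1)
IO.inspect({first, state_after_halt})
```

Whether the stream returned by the stub gets any comparable cleanup on halt is exactly the open question here; this sketch does not answer it.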

Maybe we should add a specific telemetry event for this?

@polvalente why mix client request/response logic and telemetry? Those are separate concerns.

lukaszsamson avatar Jun 08 '23 09:06 lukaszsamson

@polvalente why mix client request/response logic and telemetry? Those are separate concerns.

Just a suggestion. I didn't perceive this as being just a client/server logic problem from the descriptions given.

Anyway, if there's a feature missing on the client-server communication workflow we should be able to add that. I'm happy to review and iterate on PRs on this.

polvalente avatar Jun 08 '23 09:06 polvalente

IMO the API should not return an Elixir stream but instead some wrapper struct that can be used to call cancel, similar to what the Python or Go libraries allow.

lukaszsamson avatar Jun 08 '23 09:06 lukaszsamson

Perhaps we could introduce something close to the Client.Stream struct that is used for client streaming. Then we could pass either one of those to GRPC.Stub.cancel.

Furthermore, the struct itself could implement the Enumerable protocol so that the change is mostly backward compatible. Thoughts?
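
A rough sketch of what such a wrapper could look like, purely as an illustration of the idea. Sketch.ResponseStream, its fields, and cancel/1 are hypothetical names and not part of elixir-grpc; the cancel function here just sends a message instead of touching a real connection.

```elixir
defmodule Sketch.ResponseStream do
  # Hypothetical wrapper, not part of elixir-grpc: holds the lazy enumerable of
  # replies plus a zero-arity function that would cancel the underlying RPC.
  defstruct [:enum, :cancel_fun]

  def cancel(%__MODULE__{cancel_fun: cancel_fun}), do: cancel_fun.()
end

defimpl Enumerable, for: Sketch.ResponseStream do
  # Delegate iteration to the wrapped enumerable so existing Enum/for code keeps working.
  def reduce(%{enum: enum}, acc, fun), do: Enumerable.reduce(enum, acc, fun)

  # Returning {:error, __MODULE__} makes Enum fall back to reduce/3.
  def count(_stream), do: {:error, __MODULE__}
  def member?(_stream, _value), do: {:error, __MODULE__}
  def slice(_stream), do: {:error, __MODULE__}
end

# Usage with a stand-in stream and a cancel function that only notifies the caller.
caller = self()

stream = %Sketch.ResponseStream{
  enum: Stream.unfold(1, fn n -> {n, n + 1} end),
  cancel_fun: fn -> send(caller, :cancelled) end
}

first_two = Enum.take(stream, 2)
Sketch.ResponseStream.cancel(stream)

cancelled? =
  receive do
    :cancelled -> true
  after
    0 -> false
  end

IO.inspect({first_two, cancelled?})
```

Because the struct is enumerable, callers that only iterate would not need to change, while callers that want to unsubscribe gain a handle to pass to a cancel function.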

polvalente avatar Jun 09 '23 06:06 polvalente

@AleksandarFilipov is your code correctly closing the stream and the connection underneath? Or does it simply stop reading from it and leave it hanging?

@lukaszsamson looks like it closes the stream properly, haven't had any issues with it.

AleksandarFilipov avatar Aug 08 '23 16:08 AleksandarFilipov