Concurrent RPCs
A common issue that everyone will hit when using DRPC is how to handle concurrent RPCs. We should have some answer for this.
Some thoughts on the topic:
Connection pooling can be one such answer, that can even be used without modifying DRPC at all. But while connection pooling can be an effective technique, it is not always possible. I have a somewhat particular use case where an agent establishes a TCP connection to a server and offers RPC services that the server can then invoke. In other words, the TCP client is the RPC server and the TCP server is the RPC client. That means it's impossible for the RPC client to create new connections. (That is btw the reason I can't use gRPC at all and was very happy when I stumbled upon DRPC, which was extremely easy to hook up this way :))
A more integrated but also more complex solution is stream multiplexing, i.e. sending concurrent streams over a single transport. The wire format supports this already, because each frame contains a stream ID. Stream multiplexing is nice but brings its own set of problems (e.g. head-of-line blocking). For any kind of stream (server, client or bidirectional), some sort of back pressure mechanism is probably required to prevent the remote from being flooded with messages if it can't keep up, which would negatively impact or even kill other streams using the same connection. This is not an issue with the current non-multiplexing behavior when a transport with its own back pressure mechanism is used (e.g. TCP).
> Connection pooling can be one such answer, that can even be used without modifying DRPC at all. But while connection pooling can be an effective technique, it is not always possible. I have a somewhat particular use case where an agent establishes a TCP connection to a server and offers RPC services that the server can then invoke. In other words, the TCP client is the RPC server and the TCP server is the RPC client. That means it's impossible for the RPC client to create new connections.
Connection pooling is currently the answer that Storj uses (https://pkg.go.dev/storj.io/common/rpc/rpcpool), and it works well. I think the "reverse connection" use case may be niche enough that an in-library answer need not apply to it. I totally didn't anticipate that use case, so it's great that it worked out :smile:.
> Stream multiplexing is nice but brings its own set of problems (e.g. head-of-line blocking).
Yeah, HoL blocking and buffering were the main problems I wanted to avoid by adding a multiplexing solution at the deepest layers. As you noticed, the wire format was initially meant to allow concurrent streams. At some point, I decided to remove concurrent stream support, and the readers will now error if received stream IDs are not monotonically increasing. This greatly simplified the code to read and reassemble packets from the transport.
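As a rough illustration of the stream ID check mentioned above, the sketch below rejects any frame whose stream ID is lower than one already seen. It is not DRPC's actual reader: the `frame` and `reader` types and the `accept` method are hypothetical, and it assumes that several frames of the same stream may share an ID, so "increasing" is treated as "never decreasing".

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamIDRegressed is a made-up error for this sketch.
var errStreamIDRegressed = errors.New("stream id went backwards")

// frame is a simplified, hypothetical stand-in for a wire frame.
type frame struct {
	StreamID uint64
	Data     []byte
}

// reader remembers the highest stream ID it has seen so far.
type reader struct {
	current uint64
}

// accept returns an error if the frame belongs to an older stream than
// the one currently being read; otherwise it records the frame's ID.
func (r *reader) accept(f frame) error {
	if f.StreamID < r.current {
		return errStreamIDRegressed
	}
	r.current = f.StreamID
	return nil
}

func main() {
	r := &reader{}
	for _, id := range []uint64{1, 1, 2, 1} {
		fmt.Println(id, r.accept(frame{StreamID: id}))
	}
}
```

With only one stream in flight at a time, the reader never has to keep per-stream buffers, which is the simplification the comment above refers to.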
That said, multiplexing can be added on top of DRPC without changes as well by using something like https://pkg.go.dev/github.com/hashicorp/yamux. Or, if your transport is something like QUIC, you can open new streams from a single session.
In summary, I agree that connection pooling and multiplexing are the two main ways of solving this.
Some scattered thoughts:
- If we define an interface like `interface { NewStream() (drpc.Transport, error) }` we can make a `drpcmulticonn` (all names are strawmen) package that takes that and creates a new stream for every RPC. Then you can write a wrapper around yamux or your QUIC conn or whatever.
- There can be a `drpcpoolconn` package that exports a `func New(dial func() (drpc.Transport, error)) drpc.Conn` and calls and caches the results from `dial` (this is nearly the strategy Storj takes right now, except this would allow multiple pools that dial to the same destination).
- There can be a `drpcpool` package that exports a `func (p *Pool) Get(key interface{}, dial func() (drpc.Transport, error)) (drpc.Conn, error)` (this is the strategy Storj takes).
- I think `drpcpoolconn` is basically equivalent to `drpcmulticonn` in the sense that you can transform them into each other:
  - `drpcmulticonn.New(streams)` => `drpcpoolconn.New(streams.NewStream)`
  - `drpcpoolconn.New(dial)` => `drpcmulticonn.New(streamsFunc(dial))` (`streamsFunc` implements the interface)
  - The only difference would be that `drpcmulticonn` is effectively a `drpcpoolconn` with cache size of 0.
- I think `drpcpool` is more general than `drpcpoolconn`:
  - `drpcpoolconn.New(dial)` => `drpcpool.New().Get(nil, dial)`
  - The reverse transformation would require some effort.
  - Again, `drpcpoolconn` can be thought of as using a `drpcpool` with cache size 0.
- It seems to me the most common case that people will hit, especially when migrating from gRPC, will be adding concurrency to a system that uses basic TCP sockets, and I think that implies some sort of connection pooling to avoid a dial for every RPC.
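The relationship between the strawman packages in the list above can be sketched with a small pool that caches transports returned by a dial function. This is only an illustration under stated assumptions: `Transport` is a local stand-in for `drpc.Transport` (assumed here to be a plain read/write/close byte stream), `Pool`, `NewPool`, `Get`, and `Put` are hypothetical, and the sketch hands out raw transports rather than a `drpc.Conn`.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// Transport is a local stand-in for drpc.Transport (an assumption of this sketch).
type Transport = io.ReadWriteCloser

// StreamOpener mirrors the strawman interface from the discussion.
type StreamOpener interface {
	NewStream() (Transport, error)
}

// streamsFunc adapts a dial function so that it satisfies StreamOpener.
type streamsFunc func() (Transport, error)

func (f streamsFunc) NewStream() (Transport, error) { return f() }

// Pool caches up to capacity idle transports and dials when none is idle.
type Pool struct {
	mu       sync.Mutex
	idle     []Transport
	capacity int
	dial     func() (Transport, error)
}

// NewPool builds a pool; capacity 0 means "dial for every Get".
func NewPool(capacity int, dial func() (Transport, error)) *Pool {
	return &Pool{capacity: capacity, dial: dial}
}

// Get returns an idle transport if one is cached, otherwise dials a new one.
func (p *Pool) Get() (Transport, error) {
	p.mu.Lock()
	if n := len(p.idle); n > 0 {
		t := p.idle[n-1]
		p.idle = p.idle[:n-1]
		p.mu.Unlock()
		return t, nil
	}
	p.mu.Unlock()
	return p.dial()
}

// Put caches the transport for reuse, or closes it when the cache is full.
func (p *Pool) Put(t Transport) error {
	p.mu.Lock()
	if len(p.idle) < p.capacity {
		p.idle = append(p.idle, t)
		p.mu.Unlock()
		return nil
	}
	p.mu.Unlock()
	return t.Close()
}

// countingDial returns a dial function that counts calls and hands out one
// end of an in-memory pipe in place of a real network connection.
func countingDial(n *int) func() (Transport, error) {
	return func() (Transport, error) {
		*n++
		c, _ := net.Pipe()
		return c, nil
	}
}

func main() {
	var dials int
	var opener StreamOpener = streamsFunc(countingDial(&dials))

	// A pool built on top of a stream opener, as in the first transformation.
	p := NewPool(1, opener.NewStream)
	t, _ := p.Get()
	_ = p.Put(t)
	_, _ = p.Get()
	fmt.Println("dials with capacity 1:", dials)
}
```

With a capacity of 0, every `Get` dials and every `Put` closes, which matches the observation that opening a new stream per RPC behaves like a pool with cache size 0.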
Hi @zeebo, first of all DRPC is really cool :) thanks for sharing. I recently wrote a library of utilities for using it in production services. The package includes basic things like:
- Simplified TLS setup (for client and server)
- Server middleware, including basic components for logging, token-based auth, rate limit, panic recovery, etc
- Client middleware, including basic components for logging, custom metadata, panic recovery, etc
- Bi-directional streaming support over upgraded HTTP(S) connections using WebSockets
- Concurrent RPCs via connection pool
I was wondering if it's ok to add the link here (or somewhere else) for people looking for that kind of thing 😄 not trying to do any form of shameless plug so feel free to remove this message if it's not appropriate to do so 👍🏼
- https://pkg.go.dev/go.bryk.io/pkg/net/drpc
- https://github.com/bryk-io/pkg
Awesome! I've added your description and a link to the package to the README. Let me know if it doesn't look right.