Feature Request: Add support for custom transport (in-process, wasm)
Suppose I want to do one of the following:
- Develop a frontend and a backend as separate logical services, but deploy them in one binary. For traditional systems, running both in a single binary makes deployment, installation, and operation much simpler, but I also want the option to easily split them apart for more complex use cases.
- Use an embedded database (like leveldb), but also be able to peek and poke at it for debugging. I can't just open it twice (it uses locking to prevent that), so instead I want to expose it as a gRPC service for debugging. To reuse all my application-level code that interprets the raw bytes in the db, I also want that code to use only the exposed gRPC service. The service can then open a leveldb, wrap it in a service, and use that to talk to the database, while the debugger can connect to the service and use that client.
So, I want to be able to implement a FooServer and then connect to it from the same process. Of course, I could just listen on localhost and connect to that, but then I'd pay the penalty of serializing and deserializing everything and running the bytes through the kernel (which is significant when it's in the path of talking to your database, for example).
Instead, it would be cool if grpc allowed me to get a "local" connection, like func LocalPair() (*grpc.Client, *grpc.Server), which doesn't use a network at all and just directly passes the proto.Messages around.
I'd be willing to try to implement that myself, but first I wanted to ask if this is a use case you'd be willing to support.
This can be done by providing a custom dialer (WithDialer) to the client and a custom listener to the server.
That way, the net.Conn connecting client and server can be a wrapper around something in memory.
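For illustration, a rough sketch of that approach using net.Pipe (untested; assumes Go 1.10+ so net.Pipe supports deadlines, and that you register your services where indicated):

package main

import (
	"errors"
	"net"
	"time"

	"google.golang.org/grpc"
)

// pipeListener feeds the server side of in-memory pipes to grpc.Server.Serve.
type pipeListener struct{ conns chan net.Conn }

func (l *pipeListener) Accept() (net.Conn, error) {
	c, ok := <-l.conns
	if !ok {
		return nil, errors.New("listener closed")
	}
	return c, nil
}
func (l *pipeListener) Close() error   { close(l.conns); return nil }
func (l *pipeListener) Addr() net.Addr { return pipeAddr{} }

type pipeAddr struct{}

func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string  { return "in-memory" }

func main() {
	srv := grpc.NewServer()
	// ...register services on srv here...

	lis := &pipeListener{conns: make(chan net.Conn)}
	go srv.Serve(lis)

	// Each dial creates an in-memory pipe and hands one end to the server.
	conn, err := grpc.Dial("pipe",
		grpc.WithInsecure(),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			c, s := net.Pipe()
			lis.conns <- s
			return c, nil
		}))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// ...use conn with generated client stubs...
}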
Thanks for replying. However, even a loopback connection still requires serializing and deserializing all the requests and metadata. That's kind of a waste when we already have the correctly typed values at either end.
So this feature request is specifically about bypassing all of that. I haven't yet found a way to achieve it. For example, even if I were to wrap it myself with code generation, I can't actually inspect the passed CallOptions (and thus any sent metadata, which makes it a non-starter) due to the way they're set up. So, from what I can tell, support for this would need to come from the grpc package itself.
To bypass all of this overhead, we need an in-process transport implementation (in addition to the http2Client and http2Server transports) in the transport package, plus the corresponding custom codec, listener, etc. We did think about this previously, but it is not a trivial change and unfortunately we do not have enough hands to cover it right now.
As an alternative, I think if you add a wrapper on top of the grpc generated code to switch between the in-process and normal cases, you could probably achieve this without any changes to the grpc library (I have not thought through all the details and could be wrong, though).
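For illustration, a hypothetical sketch of that wrapper idea (pb.FooClient, pb.FooServer, and the Ping method stand in for generated code):

// localClient satisfies the generated pb.FooClient interface by calling the
// server implementation directly, skipping serialization and the network.
type localClient struct {
	srv pb.FooServer
}

func (c *localClient) Ping(ctx context.Context, req *pb.PingRequest, _ ...grpc.CallOption) (*pb.PingResponse, error) {
	// CallOptions are silently dropped here -- exactly the limitation
	// discussed in the next comment.
	return c.srv.Ping(ctx, req)
}

// newFooClient picks the implementation at wire-up time.
func newFooClient(inProcess bool, srv pb.FooServer, cc *grpc.ClientConn) pb.FooClient {
	if inProcess {
		return &localClient{srv: srv}
	}
	return pb.NewFooClient(cc)
}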
This would be really useful for @yugui and the grpc-gateway use case.
Added an enhancement label to this, but the work is not prioritized at the moment. To flesh this out further, it may be best to submit a proposal that discusses a few implementation options as a language-specific gRFC.
It seems that this is already available for grpc development in Java:
- http://www.grpc.io/grpc-java/javadoc/io/grpc/inprocess/package-summary.html
- https://github.com/grpc/grpc-java/tree/master/core/src/main/java/io/grpc/inprocess
- https://github.com/grpc/grpc-java/tree/master/core/src/test/java/io/grpc/inprocess
In the .NET world, this is something to be expected. OWIN does it really well and allows you to build HTTP-enabled components and host them anywhere. If you host it in-process, all the requests happen completely in-memory.
@iamqizhao wrote:
As an alternative, I think if you can add a wrapper on the top of the grpc generated code to switch on in-process and normal case you should probably achieve this without any changes in grpc library
Unfortunately, this is not necessarily the case. The main issue for an in-process implementation is that the CallOption machinery is totally opaque. So, even if there were a way to communicate headers and trailers in-process, there is no way code can actually interact with these options -- at least not without forking grpc-go and adding said code to that package so it can interact with the unexported types.
I brought this up on the mailing list some time ago: https://groups.google.com/d/msg/grpc-io/NOfh5ESgnyc/RgDJe5g0EgAJ
I'm now revisiting this issue because I've hit it again with something else I'm trying to do: a client interceptor that does client-side retries with support for "hedging". With hedging, the interceptor may issue a retry before the prior attempt completes, triggered by a timeout rather than a failure. But only the last/successful attempt's header/trailer metadata should be used for the header and trailer call options. Since these types are totally opaque, an interceptor cannot do what it needs to do. And if it just passes the options along, unchanged, to multiple concurrent attempts, the result is both non-deterministic and unsafe/racy with respect to how the client-provided metadata addresses get set.
FWIW, retries and hedging are coming to gRPC-Go natively in a month or two. Relevant gRFC.
Regarding the initial issue, we have since created the bufconn package to at least bypass the network stack (messages are still [de]serialized and everything goes through the http2 transport, but this should help with overhead somewhat). Otherwise, the team still doesn't have the bandwidth to take on an in-process transport any time soon.
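For reference, a minimal bufconn sketch (assuming an existing *grpc.Server named srv with services already registered):

import (
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/test/bufconn"
)

lis := bufconn.Listen(1 << 20) // 1 MiB internal buffer
go srv.Serve(lis)

// The address is ignored; every dial goes through the in-memory listener.
conn, err := grpc.Dial("bufnet",
	grpc.WithInsecure(),
	grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
		return lis.Dial()
	}))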
Otherwise, the team still doesn't have the bandwidth to take on an in-process transport any time soon.
It would be nice if the API were at least amenable to a 3rd party library supplying this. Unfortunately, it is currently not due to #1495.
Any updates on this?
/cc @stevvooe @crosbymichael I don't think bufconn is enough for us. Right now our only option is to wrap all of the grpc services.
There are still a couple of issues that prevent a 3rd-party package from providing an in-process channel: #1495 and #1802.
However, that hasn't stopped me! Take a look at these:
- https://godoc.org/github.com/fullstorydev/grpchan
- https://godoc.org/github.com/fullstorydev/grpchan/inprocgrpc
The above issues do represent shortcomings though. They basically mean that call options are ignored by custom channel implementations (which means you can't get response headers and trailers from a unary RPC). And there is a similar issue on the server side: you cannot set response headers or trailers from a unary RPC implementation.
@Random-Liu, I don't really understand why you can't use bufconn though. It is similar enough in concept and construction that if you can't use it, you may not be able to use inprocgrpc either. The main advantage of inprocgrpc is that it doesn't incur serialization/de-serialization overhead or deal with HTTP/2 framing to wire the client up to the server. It uses a channel with a limited buffer size to achieve back-pressure, instead of using flow control windows in the HTTP/2 protocol.
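The back-pressure mechanism is essentially a bounded channel; here is a toy, runnable illustration (not inprocgrpc's actual code):

package main

import "fmt"

func main() {
	// A bounded channel caps the number of in-flight messages: once the
	// buffer is full, the sender blocks until the receiver catches up,
	// the same effect HTTP/2 flow-control windows have on a real connection.
	msgs := make(chan string, 4)
	done := make(chan struct{})

	go func() { // the receiving side drains at its own pace
		for m := range msgs {
			fmt.Println("received:", m)
		}
		close(done)
	}()

	for i := 0; i < 10; i++ {
		msgs <- fmt.Sprintf("msg-%d", i) // blocks whenever 4 are buffered
	}
	close(msgs)
	<-done
}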
@Random-Liu,
Any updates on this?
We were literally talking about this yesterday, so we haven't forgotten about it. However, it's still not at the top of our priority list. It will probably be another 6 months before we can get to it. Our other priorities right now are channelz, performance, and retry. It would be a fairly meaty project if an outside contributor wanted to take it on, but we would be able to advise.
The main advantage of inprocgrpc is that it doesn't incur serialization/de-serialization overhead
Skipping serialization/deserialization seems dangerous in Go, because proto messages are mutable. This means that if the server modifies the request message, then the client would see the result of that modification. (The same is true for streaming in either direction.) This shouldn't typically be happening, but it's a notable difference between a real server and inprocgrpc.
This means that if the server modifies the request message, then the client would see the result of that modification
No, this does not happen. The library avoids serialization/de-serialization, but it does copy.
I don't really understand why you can't use bufconn though.
@jhump Because we may also want to get rid of the serialization/de-serialization. :)
With https://godoc.org/github.com/fullstorydev/grpchan/inprocgrpc, can I make the grpc server serve both remote requests and inproc requests at the same time?
The library avoids serialization/de-serialization, but it does copy.
Aha, I see now. So you do a proto.Clone() for proto messages, and a best-effort shallow copy otherwise (ref). FWIW, it may be better (if possible) to serialize and deserialize using the configured codec in the fallback case, in order to avoid this potential problem. But I also agree with the comment that says the fallback path should basically never be exercised, as most people use proto with grpc.
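In other words, something roughly like this (a paraphrase, not inprocgrpc's actual code; the suggested fallback round-trips through the codec instead of shallow-copying):

import (
	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"
)

// copyForDelivery hands a message from one side of the in-process channel
// to the other without sharing mutable state.
func copyForDelivery(codec grpc.Codec, in, out interface{}) error {
	if pm, ok := in.(proto.Message); ok {
		// Deep-copy proto messages: cheaper than a marshal/unmarshal
		// round trip, but still isolates client from server.
		proto.Merge(out.(proto.Message), pm)
		return nil
	}
	// Suggested fallback: round-trip through the configured codec so
	// client and server never alias each other's data.
	b, err := codec.Marshal(in)
	if err != nil {
		return err
	}
	return codec.Unmarshal(b, out)
}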
can I make the grpc server serve both remote requests and inproc requests at the same time?
@Random-Liu, yes. Though it does not make the *grpc.Server serve both requests -- rather it makes the actual service implementation serve both -- one via *grpc.Server and its HTTP/2 transport; the other via in-process channel.
import (
	"google.golang.org/grpc"

	"github.com/fullstorydev/grpchan"
	"github.com/fullstorydev/grpchan/inprocgrpc"
)

handlers := grpchan.HandlerMap{}
// If no interceptors are needed, handlers can be used directly as the
// registry, since it implements the right interface.
reg := grpchan.WithInterceptor(handlers, unaryInterceptor, streamInterceptor)
// Now register services using reg, not a *grpc.Server.
// NOTE: must use RegisterHandler<ServiceName>, which means using the
// --grpchan_out option with protoc.
generatedpkg.RegisterHandlerMyService(reg, &svcImpl{})
// Now you can use the registered services for both in-process and real servers.
real := grpc.NewServer() // no need for interceptor options! (handled above)
inproc := new(inprocgrpc.Channel)
handlers.ForEach(real.RegisterService)
handlers.ForEach(inproc.RegisterService)
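On the client side, the grpchan-generated stub then accepts either kind of channel; if I recall the generated naming correctly, it looks something like this (method and message names are placeholders):

// Works with the in-process channel...
cli := generatedpkg.NewMyServiceChannelClient(inproc)
// ...or, for a remote server, with a *grpc.ClientConn in its place.
resp, err := cli.DoSomething(ctx, &generatedpkg.DoSomethingRequest{})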
it may be better (if possible) to serialize and deserialize using the configured codec in the fallback case
@dfawley, that's an interesting idea. But currently, an inprocgrpc.Channel does not have a configured codec. (That's a grpc.ServerOption, for normal HTTP/2 servers.) That is something I will happily add if the library gets users asking for it (i.e. using grpchan but not using protobuf).
@jhump Nice. I'll look into it a bit more.
Our use case is that containerd has an official gRPC API, and containerd's plugin for Kubernetes support, called cri-containerd, talks to that API.
Previously, cri-containerd was a separate process, but recently we decided to merge it into the containerd process to get rid of the gRPC overhead while still integrating through the official gRPC API. That is why we need in-process gRPC.
FWIW, in the next quarter, I plan to redesign the interface/abstractions between the grpc & transport packages to make it possible for custom transports to be implemented.
@dfawley I would like to thank you for keeping this going forward. Much appreciated and keep up the good work!
Would this unblock gRPC over HTTP 1.1? Really, what we want is grpc-web for golang clients. Please let me know if there is a separate space where I should bring this up.
@Daniel-B-Smith yes, I believe it would unblock any efforts to implement that (with the caveat that the core grpc-go team is not committed to implementing it). I'll have an update on the progress of this issue shortly...
I am far enough along with a prototype that I'm ready to share some details on this effort. Please feel free to review and provide feedback.
First, some of my starting goals:
- Should encapsulate gRPC semantics (hide HTTP/2 semantics and grpc wire protocol semantics).
- Transports should be message-based, not byte-based, to facilitate an in-process transport that uses proto.Clone() for efficiency.
- Note that retry support requires byte-based support from all transports, because we need to cache multiple sent messages, which our streaming API does not make possible without serialization.
- Unexpected asynchronous events should be delivered by callbacks, not channels.
- Channels may be more idiomatic for Go, but require a goroutine to monitor them, which leads to worse performance (in the case of streams) and more complex code (see #2219 which refactors our transport monitoring).
- Sending and receiving data should remain synchronous to mirror grpc's external API.
- Dialers (client) and Listeners (server) should return gRPC transports, not net.Conns. How to connect (e.g. net.Dial) is an implementation detail encapsulated in the transport. Likewise, handshaking happens internally to Dialers and Listeners.
On to the API. Disclaimer: so far I have implemented only the client side of this interface as a proof of concept. However, that client is fully implemented and passes all tests, so this API is sufficient for all our current needs.
Package structure (under google.golang.org/grpc/):
- transport: definition of types shared between client and server
- transport/client: definition of the client-side transport and stream
- transport/server: definition of the server-side transport and stream
package transport
// OutgoingMessage is a message to be sent by gRPC.
type OutgoingMessage interface {
// Marshal marshals to a byte buffer and returns information about the
// encoding, or an error. Repeated calls to Marshal must always return the
// same values.
Marshal() ([]byte, *MessageInfo, error)
}
// IncomingMessage is a message to be received by gRPC.
type IncomingMessage interface {
// Unmarshal unmarshals from the scatter-gather byte buffer given.
Unmarshal([][]byte, *MessageInfo) error
}
// MessageInfo contains details about how a message is encoded.
type MessageInfo struct {
// Encoding is the message's content-type encoding.
Encoding string
// Compressor is the compressor's name or the empty string if compression
// is disabled.
Compressor string
}
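As a concrete (hypothetical, not part of the proposal) example, a proto message could be adapted to OutgoingMessage roughly like so, in package transport terms:

import "github.com/golang/protobuf/proto"

// protoOutgoing adapts a proto.Message to OutgoingMessage. The marshaled
// bytes are cached so that repeated Marshal calls return the same values,
// as the contract above requires.
type protoOutgoing struct {
	msg  proto.Message
	done bool
	buf  []byte
	info *MessageInfo
	err  error
}

func (p *protoOutgoing) Marshal() ([]byte, *MessageInfo, error) {
	if !p.done {
		p.buf, p.err = proto.Marshal(p.msg)
		p.info = &MessageInfo{Encoding: "proto"}
		p.done = true
	}
	return p.buf, p.info, p.err
}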
package client
// TransportBuilder constructs Transports connected to addresses.
type TransportBuilder interface {
// Build begins connecting to the address. It must return a Transport that
// is ready to accept new streams or an error.
Build(context.Context, resolver.Address, TransportMonitor, TransportBuildOptions) (Transport, error)
// TransportBuildOptions, elided, is a struct with an []interface{} for opaque,
// custom-transport options and possibly declared fields with known-common options.
}
// A Transport is a client-side gRPC transport. It begins in a
// "half-connected" state where the client may opportunistically start new
// streams by calling NewStream. Some clients will wait until the
// TransportMonitor's Connected method is called.
type Transport interface {
// NewStream begins a new Stream on the Transport. Blocks until sufficient
// stream quota is available, if applicable. If the Transport is closed,
// returns an error.
NewStream(context.Context, Header) (Stream, error)
// GracefulClose closes the Transport. Outstanding and pending Streams
// created by NewStream continue uninterrupted and this function blocks
// until the Streams are finished and the transport has closed.
// Close may be called concurrently.
GracefulClose()
// Close closes the Transport. Outstanding and pending Streams created by
// NewStream are canceled.
Close()
// Info returns information about the transport's current state.
Info() TransportInfo
}
// TransportInfo contains information about the transport's current state. All
// information is optional.
type TransportInfo struct {
// RemoteAddr is the address of the server (typically an IP/port).
RemoteAddr net.Addr
IsSecure bool // if set, WithInsecure is not required and Per-RPC Credentials are allowed.
AuthInfo credentials.AuthInfo
}
// A TransportMonitor is a monitor for client-side transports.
type TransportMonitor interface {
// Connected reports that the Transport is fully connected - i.e. the
// remote server has confirmed it is a gRPC server.
//
// May only be called once.
Connected()
// OnError reports that the Transport has closed due to the error provided.
// Existing streams may or may not continue, but new streams may not be
// created. When all existing streams have completed, the Transport will
// fully close.
//
// Once called, no further calls in the TransportMonitor are valid.
OnError(error)
}
// Stream is a client-side streaming RPC.
type Stream interface {
// SendMsg queues the message m to be sent by the Stream and returns true
// unless the Stream terminates before m can be queued. May not wait for m
// to be sent. May not be called simultaneously from multiple goroutines.
SendMsg(m transport.OutgoingMessage, opts StreamSendMsgOptions) bool
// RecvHeader blocks until the Stream receives the server's header and then
// returns it. Returns nil if the Stream terminated without a valid
// header. Repeated calls will return the same header.
RecvHeader() *ServerHeader
// RecvMsg receives the next message on the Stream into m and returns true
// unless the Stream terminates before a full message is received. May not
// be called simultaneously from multiple goroutines.
RecvMsg(m transport.IncomingMessage) bool
// RecvTrailer blocks until the Stream receives the server's trailer and
// then returns it. Returns a synthesized trailer containing an
// appropriate status if the RPC terminates before receiving a trailer from
// the server. Repeated calls will return the same trailer.
//
// If all messages have not been retrieved from the stream before calling
// RecvTrailer, subsequent calls to RecvMsg should immediately fail. This
// is to prevent the status of the RPC from changing as a result of parsing
// the messages.
RecvTrailer() Trailer
// Cancel unconditionally cancels the RPC. Any pending SendMsg and RecvMsg
// calls should become unblocked. Queued messages may not be sent. If the
// stream does not already have a status, the one provided (which must be
// non-nil) is used.
Cancel(*status.Status)
// Info returns information about the stream's current state.
Info() StreamInfo
}
// ServerHeader contains header data sent by the server.
type ServerHeader struct {
Metadata metadata.MD
}
// StreamInfo contains information about the stream's current state.
//
// All information is optional. Fields not supported by a Stream returning
// this struct should be nil. If a transport does not support all features,
// some grpc features (e.g. transparent retry or grpclb load reporting) may not
// work properly.
type StreamInfo struct {
// BytesReceived is true iff the client received any data for this stream
// from the server (e.g. partial header bytes), false if the stream ended
// without receiving any data, or nil if data may still be received.
BytesReceived *bool
// Unprocessed is true if the server has confirmed the application did not
// process this stream*, false if the server sent a response indicating the
// application may have processed the stream, or nil if it is uncertain.
//
// *: In HTTP/2, this is true if a RST_STREAM with REFUSED_STREAM is
// received or if a GOAWAY including this stream's ID is received.
Unprocessed *bool
}
package server
type TransportListener interface {
// Accept blocks until a new Stream is created. grpc calls this until
// error != nil.
Accept() (Stream, error)
// GracefulClose causes the TransportListener to stop accepting new
// incoming streams, and returns when all outstanding streams have
// completed.
GracefulClose()
// Close immediately closes all outstanding connections and streams.
Close()
}
type Header struct {
MethodName string
}
type Trailer struct {
Status status.Status
Metadata metadata.MD
}
type Stream interface {
// Implementation details TBD; expected to be similar in structure to client.Stream.
}
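To make the shape concrete, here is hypothetical glue code driving a unary RPC over this API (StreamSendMsgOptions is elided above, and I'm assuming Header is visible to the client package):

import (
	"context"

	"google.golang.org/grpc/transport"
	"google.golang.org/grpc/transport/client"
)

// unaryCall issues a single request/response RPC on an established
// client transport.
func unaryCall(ctx context.Context, t client.Transport, method string,
	req transport.OutgoingMessage, resp transport.IncomingMessage) error {
	s, err := t.NewStream(ctx, client.Header{MethodName: method})
	if err != nil {
		return err
	}
	// SendMsg/RecvMsg return false if the stream terminated first; the
	// trailer then carries a synthesized status explaining why.
	if s.SendMsg(req, client.StreamSendMsgOptions{}) {
		s.RecvMsg(resp)
	}
	tr := s.RecvTrailer()
	return tr.Status.Err() // nil for codes.OK
}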
/cc @johanbrandhorst @jhump
An important detail I left out is how to inject these custom transports.
Server-side, the story is simple: instead of giving grpc a net.Conn-based Listener, you would pass it a server.TransportListener that spits out server.Streams instead of net.Conns. That TransportListener would be responsible for managing the underlying connections.
Client-side, it's a bit more complicated: either the resolvers or just the schemes (TBD) will indirectly determine which transport to use. I.e. the "dns" scheme/resolver would output "tcp" addresses that by default use our default transport layer, whereas the "inproc" scheme/resolver (not yet implemented) would output "inproc" addresses that would use our inproc transport. You could register (per-ClientConn or globally, also TBD) a map from scheme/address type to client.TransportBuilder that determines which transport to use for those addresses.
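For example, wiring up an in-process transport might look something like this (hypothetical: RegisterTransportBuilder and the "inproc" scheme illustrate the TBD registration mechanism, not a committed design):

// Register a builder for the "inproc" address type (global registration
// shown; per-ClientConn registration is the other option mentioned above).
client.RegisterTransportBuilder("inproc", &inprocTransportBuilder{srv: myServiceImpl})

// Dialing then resolves to "inproc" addresses, which the ClientConn hands
// to the registered builder instead of the default HTTP/2 transport.
conn, err := grpc.Dial("inproc:///my-service", grpc.WithInsecure())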
What timeframe are you thinking of for implementing this? It looks comprehensive, and while it does add another two layers for the transports and resolvers, I really would like to see this done.
@annismckenzie I'm targeting something in the next month or so. I am taking a couple weeks off soon, but hopefully I can wrap things up pretty quickly when I return since I already have a working prototype.
I'm not sure what you mean about adding "another 2 layers". This is basically just drawing a more consistent line between our existing, hard-coded HTTP/2 transport and the grpc package, and making it pluggable.
So sorry, I didn’t mean to be condescending or plain wrong in that assertion – thanks for all the hard work!
@annismckenzie no worries, I am just trying to understand your concerns with the design, if you have any. I'm hopeful a few folks here will at least briefly look it over to let me know if there are any problems with it or improvements that can be made. Thanks!
I would like to be able to create a gRPC client and have it talk to my existing gRPC server within the same process, without sending network traffic through loopback/localhost (i.e. without dialing).
I want to do this because I am hosting a gRPC server and a gRPC gateway in the same process (https://github.com/grpc-ecosystem/grpc-gateway). The gateway requires a gRPC client, which means it currently sends traffic over localhost loopback to the gRPC server.
For example, I would like something roughly like this:
// gRPC server
grpcServer := grpc.NewServer()
pb.RegisterPixelServer(grpcServer, myService)
// ...serve and listen in a goroutine...

// gRPC gateway
mux := runtime.NewServeMux()
grpcClient := pb.NewClientAdaptor(grpcServer, myService) // hypothetical adaptor
err := gw.RegisterPixelHandlerClient(ctx, mux, grpcClient)
// ...serve the gateway mux in a goroutine...
I realize that it is possible to manually make myService satisfy both the MyServiceServer and MyServiceClient interfaces generated from the proto file.
However, doing so would mean that grpc.CallOptions get ignored, and I would also lose grpc's interceptors, stats handling, and the other things the grpc client and server do that would be bypassed by calling the implementation directly.