
Proposal: PrepareMsg API to more easily parallelize serialization

Open dfawley opened this issue 7 years ago • 6 comments

If the time to serialize and compress a message is less than the time to transmit it, a single stream is capable of saturating a network connection, because in grpc-go the (N+1)th message can be serialized and compressed while the Nth message is being transmitted. However, compression is usually slow and produces a small amount of data that can be transmitted quickly, so this condition usually does not hold when compression is enabled. References: #1879, #2355.

We would like to separate the encoding and transmission steps so that users are able to perform multiple encodes simultaneously and take advantage of system parallelism.

Proposed API:

package grpc

type PreparedMsg struct { /* Nothing exported */ }

// Encode prepares msg into p (marshals and compresses) for use with s.SendMsg.
// msg may not be modified until after SendMsg is called with p.  p is not valid
// if a non-nil error is returned.
func (p *PreparedMsg) Encode(s grpc.Stream, msg interface{}) error { ... }

If a PreparedMsg is passed to SendMsg, SendMsg will use the PreparedMsg's internal buffer to send the message on the stream, bypassing the marshal and compress steps.

This API, as opposed to func NewPreparedMsg(msg interface{}) PreparedMsg, would allow users to re-use a PreparedMsg, and may save allocations of internal buffers.
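
For illustration, here is a rough sketch of how a caller might use the proposed API to encode several messages in parallel while keeping transmission ordered on a single stream. The helper function, message slice, and error handling are all hypothetical and not part of the proposal:

import (
    "sync"

    "google.golang.org/grpc"
)

// sendAllParallel encodes msgs concurrently, then sends them in order on stream.
func sendAllParallel(stream grpc.ClientStream, msgs []interface{}) error {
    prepared := make([]*grpc.PreparedMsg, len(msgs))
    errs := make([]error, len(msgs))
    var wg sync.WaitGroup
    for i, m := range msgs {
        wg.Add(1)
        go func(i int, m interface{}) {
            defer wg.Done()
            p := &grpc.PreparedMsg{}
            // Marshal and compress happen here, in parallel, off the send path.
            if err := p.Encode(stream, m); err != nil {
                errs[i] = err
                return
            }
            prepared[i] = p
        }(i, m)
    }
    wg.Wait()
    for i, p := range prepared {
        if errs[i] != nil {
            return errs[i]
        }
        // SendMsg recognizes the *PreparedMsg and skips marshal/compress.
        if err := stream.SendMsg(p); err != nil {
            return err
        }
    }
    return nil
}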

dfawley avatar Nov 01 '18 22:11 dfawley

This would require updating the grpc.Stream interface to get the required codec and compressors for the PreparedMsg. Right now there is no method to access those universally for any stream.

prannayk avatar Dec 30 '18 14:12 prannayk

This would require updating the grpc.Stream interface to get the required codec and compressors for the PreparedMsg. Right now there is no method to access those universally for any stream.

We can do this without changing grpc.Stream. I was originally thinking we would type assert s to grpc.clientStream, but this would make it so Streams implemented by interceptors couldn't support this operation. Instead, we could store this information in the rpcInfo that we already store in the stream's context:

https://github.com/grpc/grpc-go/blob/e441557ee90e1b93ac1f42ab8904845fd8d6e637/rpc_util.go#L670-L684

...or add a new key/value in the context, but this one seems fine to extend.

An interceptor could mask this context completely, but that should be rare, and there are limits to what we can do given that we can't add methods to grpc.Stream.
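
As a rough sketch of what that could look like (imports elided; the helpers and rpcInfo fields here are assumptions modeled on the existing internals in rpc_util.go, not a final design):

// Encode pulls the codec and compressors out of the stream's context via
// rpcInfo, so no new methods are needed on grpc.Stream.
func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
    ri, ok := rpcInfoFromContext(s.Context())
    if !ok {
        return status.Errorf(codes.Internal, "grpc: unable to retrieve rpcInfo from context")
    }
    data, err := encode(ri.codec, msg) // assumed marshal helper
    if err != nil {
        return err
    }
    compData, err := compress(data, ri.cp, ri.comp) // assumed compress helper
    if err != nil {
        return err
    }
    // hdr and payload are illustrative unexported fields of PreparedMsg.
    p.hdr, p.payload = msgHeader(data, compData)
    return nil
}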

dfawley avatar Jan 07 '19 22:01 dfawley

I think a good approach would be the following.

Change rpcInfo to:

type rpcInfo struct {
    failfast bool
    codec    baseCodec
    cp       Compressor
    comp     encoding.Compressor
}

Following that, we will have to change the implementation of SendMsg for every stream individually. It should type-assert m to *PreparedMsg and use the marshalled and compressed data inside it if the assertion succeeds; if it fails, fall back to the present implementation. The logic would be the same for all streams, but it has to be implemented for each stream individually, since we cannot make any changes to grpc.Stream.

Since we cannot add methods to grpc.Stream, we cannot guarantee that new stream implementations adhere to this, but it would cover the existing implementations efficiently.
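
A minimal sketch of that SendMsg change for the client stream (sendPrepared and marshalAndSend are hypothetical stand-ins for the existing internal send path, and hdr/payload are illustrative PreparedMsg fields):

func (cs *clientStream) SendMsg(m interface{}) error {
    if p, ok := m.(*PreparedMsg); ok {
        // Fast path: buffers were already produced by PreparedMsg.Encode,
        // so marshal and compress are skipped entirely.
        return cs.sendPrepared(p.hdr, p.payload)
    }
    // Slow path: marshal, compress, and send as in the present implementation.
    return cs.marshalAndSend(m)
}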

An alternate implementation is as follows:

  1. Create a BaseStream struct type that has a function-valued field called sendMsgFn.
  2. BaseStream implements SendMsg, which first type-asserts the message to *PreparedMsg. On success it passes hdr and payload to the function field as sendMsgFn(hdr, payload); otherwise it creates the PreparedMsg on the fly.

This would ensure that every stream embedding BaseStream implements the feature in the right way, and any interceptor that wants to support it can do so simply by embedding BaseStream. Otherwise, we would have to copy and paste the same logic every time we add a new stream that wants to support this.
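
A sketch of that alternative (BaseStream and its fields are hypothetical; nothing with these names exists in the codebase today):

// BaseStream is a hypothetical shared base that concrete streams would embed.
type BaseStream struct {
    // sendMsgFn transmits an already-framed header and payload.
    sendMsgFn func(hdr, payload []byte) error
    // encodeFn builds a PreparedMsg on the fly for callers that pass a raw message.
    encodeFn func(msg interface{}) (*PreparedMsg, error)
}

func (b *BaseStream) SendMsg(m interface{}) error {
    p, ok := m.(*PreparedMsg)
    if !ok {
        var err error
        if p, err = b.encodeFn(m); err != nil {
            return err
        }
    }
    // hdr and payload are illustrative unexported fields of PreparedMsg.
    return b.sendMsgFn(p.hdr, p.payload)
}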

prannayk avatar Jan 10 '19 10:01 prannayk

This is done for the sending side.

A similar optimization could be done on the receiving side, too.

menghanl avatar Jun 05 '19 23:06 menghanl

@prannayk is it possible to make this available for ServerStreams as well?

eafzali avatar Mar 27 '20 09:03 eafzali

Doing something like https://github.com/grpc/grpc-go/pull/3480 solved my problem, at least. But I'm completely new to this codebase, so can someone guide me on how I should proceed with this? For example, what kind of testing would make sense for this, and what would make sense for the failFast field?

eafzali avatar Mar 27 '20 12:03 eafzali