pulsar-grpc
gRPC protocol handler for Pulsar
The gRPC protocol handler for Pulsar provides a gRPC interface as an alternative to the Pulsar binary protocol.
The goal of this handler is to provide an API that is easier to integrate than the binary TCP protocol when no Pulsar driver exists for a given language, or when a wanted feature is not yet implemented in a Pulsar driver but a gRPC implementation exists.
Enable the gRPC protocol handler on your existing Apache Pulsar clusters
Build the gRPC protocol handler
- clone this project from GitHub to your local machine.
git clone https://github.com/cbornet/pulsar-grpc.git
cd pulsar-grpc
- build the project.
mvn clean package -DskipTests
- the nar file can be found at this location.
./protocol-handler/target/pulsar-protocol-handler-grpc-${version}.nar
Install the gRPC protocol handler
As mentioned previously, the gRPC protocol handler is a plugin that can be installed on the Pulsar brokers.
Version 2.7+ of Apache Pulsar brokers is required for this plugin to work. The version of the plugin should match the version of the broker. Releases are numbered after the broker version they are built against. For example, pulsar-protocol-handler-grpc-2.8.2.1.nar should be used with Pulsar 2.8 brokers.
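The naming rule above can be checked before a NAR file is copied to a broker. The following is a minimal sketch, assuming that "matching" means the same major.minor line as in the 2.8 example; the helper names `broker_line_of` and `is_compatible` are hypothetical and not part of the project.

```python
import re


def broker_line_of(nar_name: str) -> str:
    """Return the 'major.minor' broker line encoded in a handler NAR file name."""
    match = re.search(r"pulsar-protocol-handler-grpc-(\d+)\.(\d+)\.", nar_name)
    if match is None:
        raise ValueError(f"unrecognized NAR file name: {nar_name}")
    return f"{match.group(1)}.{match.group(2)}"


def is_compatible(nar_name: str, broker_version: str) -> bool:
    """Assumption: a NAR fits a broker when both share the same major.minor."""
    major, minor = broker_version.split(".")[:2]
    return broker_line_of(nar_name) == f"{major}.{minor}"
```

For instance, `is_compatible("pulsar-protocol-handler-grpc-2.8.2.1.nar", "2.8.3")` holds under this assumption, while the same file checked against a 2.9 broker does not.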
You need to configure the Pulsar broker to run the gRPC protocol handler as a plugin, that is, add configurations in Pulsar's configuration file, such as broker.conf or standalone.conf.
- Set the configuration of the gRPC protocol handler.
  Add the following properties and set their values in the Pulsar configuration file, such as conf/broker.conf or conf/standalone.conf.

  | Property | Set it to the following value | Default value |
  |---|---|---|
  | messagingProtocols | grpc | null |
  | protocolHandlerDirectory | Location of the NAR file | ./protocols |

  Example
  messagingProtocols=grpc
  protocolHandlerDirectory=./protocols

- Set the gRPC service ports.
  Set the grpcServicePort property to start a plaintext gRPC server.
  Set the grpcServiceTlsPort property to start a TLS secured gRPC server. The gRPC protocol handler uses the same configuration properties as the Pulsar broker (see Transport Encryption using TLS for details).

  Example
  grpcServicePort=9080
  grpcServiceTlsPort=9443
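Before restarting the brokers, it can help to confirm that the configuration file contains the properties listed above. The sketch below is only an illustration: `parse_properties` and `grpc_handler_problems` are hypothetical helpers, and it assumes that messagingProtocols may hold a comma-separated list when several protocol handlers are enabled.

```python
def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def grpc_handler_problems(props: dict) -> list:
    """Return human-readable problems; an empty list means the settings look complete."""
    problems = []
    # Assumption: several handlers can be listed, separated by commas.
    protocols = [p.strip() for p in props.get("messagingProtocols", "").split(",")]
    if "grpc" not in protocols:
        problems.append("messagingProtocols does not include grpc")
    if not props.get("grpcServicePort") and not props.get("grpcServiceTlsPort"):
        problems.append("neither grpcServicePort nor grpcServiceTlsPort is set")
    return problems
```

A typical use would be `grpc_handler_problems(parse_properties(open("conf/broker.conf").read()))`, which returns an empty list when the handler is enabled and at least one port is configured.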
Restart Pulsar brokers to load the gRPC protocol handler
After you have installed the gRPC protocol handler on the Pulsar brokers, restart them to load it.
Use the gRPC API
The API is specified in the file PulsarApi.proto and can be used to generate Pulsar gRPC clients in the chosen language. The gRPC API has a lot in common with the Pulsar binary protocol: the protobufs exchanged are almost the same. The following documentation takes the binary protocol definitions of the protos as reference and only highlights the differences.
One difference with the binary protocol is that the message metadata and payload are embedded inside the CommandSend/CommandMessage proto (the performance optimization of the binary protocol cannot be done with standard gRPC).
In CommandSend/CommandMessage, there are 3 possible modes to encode the message and payload:
- bytes: this format is the closest to the binary protocol. The client has to encode/decode the metadata and payload with the binary framing. It is the only mode possible if the message is encrypted.
- MetadataAndPayload: in this format the metadata and payload are sent/received in distinct protobuf fields instead of the Pulsar framing. In a CommandSend, if the compress field is set, the message is compressed by the broker using the compression settings of the metadata. It is also possible to compress on the client.
- Messages: this format allows sending/receiving multiple messages in a batch using the protobuf format instead of the binary framing. The compression settings contained in the metadata field are used by the broker to compress/uncompress the messages.
Another difference with the binary protocol is that gRPC has its own keep-alive and reconnection management, so you don't have to implement it on the client.
For producers/consumers, the gRPC flow control is used so you don't have to handle rate limit sending errors or consumer flow messages.
Topic Lookup
rpc lookup_topic(CommandLookupTopic) returns (CommandLookupTopicResponse) {}
Topic lookup works similarly to the binary protocol except that it returns the grpcServiceHost, grpcServicePort and grpcServicePortTls owning the given topic in the response.
It's also possible to look up the broker by REST or binary protocol and then call the /admin/v2/broker-stats/load-report endpoint of the topic's broker to get the info in the protocols.grpc field, in the form grpcServiceHost=xxx;grpcServicePort=xxx;grpcServicePortTls=xxx.
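The protocols.grpc value is a semicolon-separated list of key=value pairs, so it can be split with a few lines of code. This is a minimal sketch: the `GrpcEndpoint` type and `parse_grpc_protocol_data` function are hypothetical names, and treating a missing or empty port as `None` is an assumption about brokers that run only one of the two servers.

```python
from typing import NamedTuple, Optional


class GrpcEndpoint(NamedTuple):
    host: str
    port: Optional[int]      # plaintext port, if advertised
    tls_port: Optional[int]  # TLS port, if advertised


def parse_grpc_protocol_data(data: str) -> GrpcEndpoint:
    """Parse 'grpcServiceHost=...;grpcServicePort=...;grpcServicePortTls=...'."""
    fields = dict(item.split("=", 1) for item in data.split(";") if "=" in item)

    def to_port(name: str) -> Optional[int]:
        value = fields.get(name)
        return int(value) if value else None

    return GrpcEndpoint(
        host=fields["grpcServiceHost"],
        port=to_port("grpcServicePort"),
        tls_port=to_port("grpcServicePortTls"),
    )
```

The resulting host and port can then be used to open the gRPC channel to the broker that owns the topic.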
Producing messages
Producing a single message
rpc produceSingle(CommandProduceSingle) returns (CommandSendReceipt) {}
This is a simplified interface to send messages one at a time. Note that authentication/authorization occurs at each call, so prefer the streaming interface if you have a lot of messages to send.
CommandProduceSingle assembles a CommandProducer used to create a producer and a CommandSend containing the message to send.
The producer is automatically closed at the end of the rpc call so there's no CloseProducer command needed.
Producing a stream of messages
rpc produce(stream CommandSend) returns (stream SendResult) {}
This call creates a producer to send messages continuously and receive acknowledgments asynchronously.
The CommandProducer used to create the producer must be passed as gRPC call metadata with the key pulsar-producer-params-bin and encoded in protobuf.
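In gRPC, metadata keys ending in -bin carry raw bytes, which is how the serialized CommandProducer travels with the call. The sketch below shows only the construction of that metadata entry; `binary_metadata` is a hypothetical helper, and the payload is an opaque placeholder rather than a valid CommandProducer, since the generated protobuf classes are not assumed to be available here.

```python
PRODUCER_PARAMS_KEY = "pulsar-producer-params-bin"


def binary_metadata(key: str, serialized: bytes) -> tuple:
    """Build one gRPC metadata entry carrying a serialized protobuf message."""
    # gRPC reserves the "-bin" suffix for metadata keys whose values are raw bytes.
    if not key.endswith("-bin"):
        raise ValueError(f"binary metadata key must end with '-bin': {key}")
    if not isinstance(serialized, (bytes, bytearray)):
        raise TypeError("binary metadata value must be bytes")
    return (key, bytes(serialized))


# Placeholder bytes: with classes generated from PulsarApi.proto this would be
# the result of calling SerializeToString() on a CommandProducer instance.
serialized_producer = b"<serialized CommandProducer>"
call_metadata = [binary_metadata(PRODUCER_PARAMS_KEY, serialized_producer)]
```

With the grpcio package, a list such as `call_metadata` is the shape accepted by the `metadata` argument of a stub call. The same helper would apply to the consumer side with the pulsar-consumer-params-bin key.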
SendResult can be one of CommandProducerSuccess, CommandSendReceipt, CommandSendError.
If a producer, broker, or topic rate limit is reached, the gRPC flow control is triggered, so you don't have to handle rate-limiting send errors.
The producer is automatically closed at the end of the rpc call so there's no CloseProducer command needed.
Consuming messages
rpc consume(stream ConsumeInput) returns (stream ConsumeOutput) {}
This call creates a consumer to receive messages continuously and send acknowledgments.
The CommandSubscribe used to create the consumer must be passed as gRPC call metadata with the key pulsar-consumer-params-bin and encoded in protobuf.
Compared to the binary protocol, CommandSubscribe has an additional preferedPayloadType field to indicate how the CommandMessage should be sent preferably. It can take the values:
- MESSAGES (default): batch messages exposed in protobuf
- BINARY: raw metadata and payload encoded with the binary framing
- METADATA_AND_PAYLOAD: metadata and payload in separate protobuf fields
- METADATA_AND_PAYLOAD_UNCOMPRESSED: metadata and payload in separate protobuf fields, with the payload uncompressed on the broker
If the message is encrypted, the BINARY mode will be used as the broker cannot decrypt it.
ConsumeInput can be one of CommandAck, CommandFlow, CommandUnsubscribe, CommandRedeliverUnacknowledgedMessages, CommandConsumerStats, CommandGetLastMessageId, CommandSeek.
ConsumeOutput can be one of CommandSubscribeSuccess, CommandMessage, CommandAckResponse, CommandActiveConsumerChange, CommandReachedEndOfTopic, CommandConsumerStatsResponse, CommandGetLastMessageIdResponse, CommandSuccess, CommandError.
The gRPC flow control is used to automatically backpressure the arrival of new messages. So there's no need to send CommandFlow messages to ask for new messages. CommandFlow shall be called once to buffer some messages on the broker for throughput tuning.
The consumer is automatically closed at the end of the rpc call so there's no CloseConsumer command needed.
Authenticating
All Pulsar modes of authentication are supported (TLS, Basic, Athenz, JWT, Kerberos).
Authentication is done by attaching a CommandAuth as a binary metadata header with key pulsar-auth-bin. It contains the same auth_method and auth_data fields as the Pulsar binary CommandConnect message.
For Kerberos/SASL the flow is similar to the one for the REST interface:
- The client sends a CommandAuth message in a call; the broker immediately closes the call with a CommandAuthChallenge as a binary metadata header with key pulsar-authchallenge-bin.
- The client must verify and solve the challenge, then send the result in a call with a CommandAuthResponse as a binary metadata header with key pulsar-authresponse-bin.
- The broker verifies the challenge response and, if successful, closes the call with an AuthRoleToken as a binary metadata header with key pulsar-authroletoken-bin.
- The AuthRoleToken shall then be used to authenticate subsequent calls by passing it as a binary metadata header with key pulsar-authroletoken-bin.
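The exchange above can be sketched as a short sequence of calls. This is an illustration under stated assumptions, not the project's client code: `send_call` is a hypothetical callable that performs one gRPC call with the given metadata and returns the broker's response headers as a dict, `solve_challenge` is a hypothetical callable wrapping the Kerberos/SASL logic, and all values are already-serialized protobuf bytes. A single challenge round is assumed, as in the steps listed.

```python
AUTH_KEY = "pulsar-auth-bin"
CHALLENGE_KEY = "pulsar-authchallenge-bin"
RESPONSE_KEY = "pulsar-authresponse-bin"
ROLE_TOKEN_KEY = "pulsar-authroletoken-bin"


def obtain_role_token(send_call, command_auth: bytes, solve_challenge) -> bytes:
    """Run the challenge/response exchange and return the serialized AuthRoleToken."""
    # Step 1: send CommandAuth; the broker closes the call with a challenge.
    headers = send_call([(AUTH_KEY, command_auth)])
    challenge = headers[CHALLENGE_KEY]
    # Step 2: solve the challenge and send the CommandAuthResponse.
    response = solve_challenge(challenge)
    headers = send_call([(RESPONSE_KEY, response)])
    # Step 3: on success the broker closes the call with an AuthRoleToken.
    return headers[ROLE_TOKEN_KEY]


def authenticated_metadata(role_token: bytes) -> list:
    """Step 4: metadata to attach to every subsequent call."""
    return [(ROLE_TOKEN_KEY, role_token)]
```

Keeping the transport behind `send_call` leaves open how the response headers are actually read from the closed call, which depends on the gRPC client library in use.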
Transactions
The gRPC protocol handler has full support for transactions. PIP 31 describes the flow and the message definitions for transactions. In gRPC this results in the following RPCs:
rpc create_transaction(CommandNewTxn) returns (CommandNewTxnResponse) {}
rpc add_partitions_to_transaction(CommandAddPartitionToTxn) returns (CommandAddPartitionToTxnResponse) {}
rpc end_transaction(CommandEndTxn) returns (CommandEndTxnResponse) {}
rpc end_transaction_on_partition(CommandEndTxnOnPartition) returns (CommandEndTxnOnPartitionResponse) {}
rpc end_transaction_on_subscription(CommandEndTxnOnSubscription) returns (CommandEndTxnOnSubscriptionResponse) {}