zilla
zilla copied to clipboard
Support dynamic message key and headers in `grpc-kafka`
Support dynamic message key and headers in grpc-kafka
when
- producing to Kafka, based on
message
record - fetching from Kafka, based on request
metadata
For example:
bindings:
grpc_kafka0:
type: grpc-kafka
kind: proxy
routes:
- when:
- method: example.ChatService/send
exit: kafka_cache_client0
with:
capability: produce
topic: chat-messages
key: ${message.chatroom}
- when:
- method: example.ChatService/receive
exit: kafka_cache_client0
with:
capability: fetch
topic: chat-messages
filters:
- key: ${metadata.chatroom}
Note: grpc
metadata is already sent between grpc
binding and grpc-kafka
binding at the beginning of the stream, however only grpc
binding currently understands the structure of the payload, so we need to determine how to make grpc-kafka
payload type aware, such that expressions like ${message.chatroom}
can understand which field number needs to be extracted from the payload and inserted into the Kafka message key for example. This may most naturally fit with upcoming catalog
support for schema-registry
, so that grpc
protobuf
service definitions can be defined once via a catalog
and the referenced from each of grpc
and grpc-kafka
as needed to understand type information.
Note: we should also support injection of trusted identity via ${guarded['jwt0'].identity}
expression for example, that can be used to populate a message header, such as sender
, in the produced Kafka message.
Note: Message body extraction and SMT would feed into this feature enabling more routing and filtering options.