
Support dynamic message key and headers in `grpc-kafka`

Open jfallows opened this issue 1 year ago • 4 comments

Support dynamic message key and headers in `grpc-kafka` when:

  • producing to Kafka, based on message record
  • fetching from Kafka, based on request metadata

For example:

bindings:
  grpc_kafka0:
    type: grpc-kafka
    kind: proxy
    routes:
      - when:
          - method: example.ChatService/send
        exit: kafka_cache_client0
        with:
          capability: produce
          topic: chat-messages
          key: ${message.chatroom}
      - when:
          - method: example.ChatService/receive
        exit: kafka_cache_client0
        with:
          capability: fetch
          topic: chat-messages
          filters:
            - key: ${metadata.chatroom}

Note: gRPC metadata is already sent between the `grpc` binding and the `grpc-kafka` binding at the beginning of the stream. However, only the `grpc` binding currently understands the structure of the payload, so we need to determine how to make `grpc-kafka` payload type aware. That way an expression like `${message.chatroom}` can be mapped to the field number that needs to be extracted from the payload and inserted into the Kafka message key, for example. This may most naturally fit with upcoming catalog support for schema-registry, so that gRPC protobuf service definitions can be defined once via a catalog and then referenced from each of `grpc` and `grpc-kafka` as needed to understand type information.
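As a rough illustration of why the field number matters, the Python sketch below pulls a string field out of raw protobuf bytes using only the wire format. It assumes the bare message bytes (no gRPC length-prefix framing) and a hypothetical `FIELD_NUMBERS` table standing in for whatever a catalog would supply; the field numbers for `chatroom` and `text` are invented for the example, and none of this reflects Zilla's actual code.

```python
from typing import Optional, Tuple

# Hypothetical name -> field number mapping that a catalog would provide.
FIELD_NUMBERS = {"chatroom": 1, "text": 2}


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def extract_string_field(payload: bytes, field_number: int) -> Optional[str]:
    """Return the last length-delimited field with this number, decoded as UTF-8."""
    pos = 0
    found = None
    while pos < len(payload):
        tag, pos = read_varint(payload, pos)
        number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:    # varint: skip the value
            _, pos = read_varint(payload, pos)
        elif wire_type == 1:  # fixed 64-bit
            pos += 8
        elif wire_type == 2:  # length-delimited (strings, bytes, nested messages)
            length, pos = read_varint(payload, pos)
            if number == field_number:
                found = payload[pos:pos + length].decode("utf-8")
            pos += length
        elif wire_type == 5:  # fixed 32-bit
            pos += 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
    return found


# Field 1 = "general", field 2 = "hi", encoded by hand.
payload = b"\x0a\x07general\x12\x02hi"
key = extract_string_field(payload, FIELD_NUMBERS["chatroom"])
```

Without the name-to-number mapping, `${message.chatroom}` cannot be resolved from the bytes alone, which is the gap the catalog reference is meant to close.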

Note: we should also support injecting a trusted identity via an expression such as `${guarded['jwt0'].identity}`, which can be used to populate a message header, such as `sender`, in the produced Kafka message.
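One possible way to think about resolving the three expression forms mentioned in this issue is sketched below in Python. The regular expression, the `resolve` helper, and the shape of the `context` dict are all assumptions made for illustration; the actual expression grammar and evaluation in Zilla may differ.

```python
import re
from typing import Any, Dict

# Matches ${message.field}, ${metadata.name} and ${guarded['name'].identity}.
EXPRESSION = re.compile(
    r"\$\{(?:(message|metadata)\.(\w+)|guarded\['([^']+)'\]\.identity)\}"
)


def resolve(template: str, context: Dict[str, Dict[str, Any]]) -> str:
    """Replace each supported expression in template with its value from context."""

    def substitute(match: "re.Match[str]") -> str:
        scope, name, guard = match.groups()
        if guard is not None:
            # Identity established by the named guard, e.g. from a validated JWT.
            return str(context["guarded"][guard])
        return str(context[scope][name])

    return EXPRESSION.sub(substitute, template)


# Hypothetical per-stream context.
context = {
    "message": {"chatroom": "general"},
    "metadata": {"chatroom": "general"},
    "guarded": {"jwt0": "alice"},
}

# Key and headers for the produced Kafka message.
key = resolve("${message.chatroom}", context)
headers = {"sender": resolve("${guarded['jwt0'].identity}", context)}
```

Because the `sender` value comes from the guard rather than from the client-supplied message or metadata, a client cannot spoof it by setting a field or header itself.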

Note: Message body extraction and SMT would feed into this feature, enabling more routing and filtering options.

jfallows, Aug 24 '23 18:08