[BUG] Pulsar Consumer can't read message from Kafka Client.
Describe the bug We have a Pulsar cluster with KOP enabled.
When a Pulsar Producer sends messages to a topic used by a Kafka consumer, Kafka's consumer is able to receive and read the message.
When a Kafka Producer sends messages to a topic used by a Pulsar consumer, Pulsar's consumer is not able to properly read the message.
To Reproduce Steps to reproduce the behavior:
- Start a kafka producer to send message to $topic
- Start a pulsar consumer to receive message from $topic
- Pulsar consumer will show error:
Some required fields are missing at org.apache.pulsar.common.api.proto.SingleMessageMetadata.checkRequiredFields(SingleMessageMetadata.java:478) ~[org.apache.pulsar-pulsar-common-2.9.1.jar:2.9.1] at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:473) ~[org.apache.pulsar-pulsar-common-2.9.1.jar:2.9.1] at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1666) ~[org.apache.pulsar-pulsar-common-2.9.1.jar:2.9.1] at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1027) ~[org.apache.pulsar-pulsar-client-original-2.9.1.jar:2.9.1] ... 32 more 2022-06-07T19:47:49,411+0000 [pulsar-client-io-1-1] ERROR org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/angeltestpartition-partition-0][cli-sub] Discarding corrupted message at 1466:2
Expected behavior Be able to produce messages from Kafka clients and consumed on Pulsar consumers.
What's your entryFormat config? If you configured entryFormat=kafka, you need to configure a KafkaPayloadProcessor to handle Kafka format messages at the Pulsar consumer side. See https://github.com/streamnative/kop/blob/master/docs/configuration.md#choose-the-proper-entryformat.
The property is set to: entryFormat=mixed_kafka.
Is there any additional configuration that we need to do on Pulsar consumers to successfully read messages published by Kafka producer?
No, you only need the KafkaPayloadProcessor.