messenger-kafka

Custom header option for consumers

Open ilyashtrikul opened this issue 4 years ago • 12 comments

The Messenger Serializer requires a type header containing the message class FQN. This PR makes it possible to set that through a headers option in the transport config.

framework:
    messenger:
        transports:
            producer:
                # ...
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'
                    # Custom headers for all messages
                    headers:
                      type: Some\Custom\Message\Class

ilyashtrikul avatar Dec 28 '20 19:12 ilyashtrikul

Hi @ilyashtrikul, thanks for the PR!

I also see you fixed the test :)

The headers field is definitely useful. But you can achieve the same result if you supply a custom serializer. And supplying a custom serializer is standard for symfony messenger.

So I'm wondering if you have considered that option?

KonstantinCodes avatar Jan 04 '21 14:01 KonstantinCodes

You're right, a custom serializer is a solution, but it isn't just copy-paste with a few fixes: it means a bundle with its own configuration and so on, all for a single header field.

ilyashtrikul avatar Jan 10 '21 20:01 ilyashtrikul

If you think these changes don't belong in this library or won't make things a bit easier, feel free to close this PR ;)

ilyashtrikul avatar Jan 10 '21 20:01 ilyashtrikul

@ilyashtrikul I'm not sure what you mean by a bundle with its own configuration.

You just specify the service ID of your serializer in the Messenger config:

            consumer:
                dsn: '%env(KAFKA_URL)%'
                serializer: App\Infrastructure\Messenger\MySerializer

KonstantinCodes avatar Jan 12 '21 08:01 KonstantinCodes

MySerializer doesn't know which type of message is coming in (without specific headers), so it either needs a hardcoded topic -> class mapping inside MySerializer, or a serializer with its own topic-class configuration (which is better moved to a bundle for flexible configuring), and that ends up as a lot of serializer services. My approach is to set the topic-class config right next to the rest of the consumer configuration.

ilyashtrikul avatar Jan 14 '21 12:01 ilyashtrikul

@ilyashtrikul Are you using Avro? Avro includes the qualified name, which you can use in your serializer to determine the output class.

KonstantinCodes avatar Aug 29 '21 08:08 KonstantinCodes

Hi @KonstantinCodes!

I find it really helpful to have the ability to pass the Event class as a header option.

I'm using your package in one of my projects (huge thanks for creating it!!). I consume 10+ different topics, and I had to introduce a separate serializer for each of them, so 10+ serializers.

If we could say "for this topic, this is the Event class" by putting it in the headers, I could simplify this down to a single generic serializer.

What I mean is this

    order_delayed:
        dsn: '%env(KAFKA_URL)%'
        serializer: App\KafkaSerializer
        retry_strategy:
            max_retries: 0
        options:
            topic:
                name: order.delayed
            kafka_conf:
                enable.auto.offset.store: 'false'
                group.id: tracking
            headers:
                type: App\OrderDelayedEvent

Then in a KafkaSerializer, I would just have

$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], self::FORMAT);
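A minimal sketch of the lookup such a generic serializer would need before calling deserialize(), assuming the configured headers arrive under $encodedEnvelope['headers'] as proposed in this PR. The function name resolveMessageClass and the stand-in OrderDelayedEvent class are hypothetical, and plain InvalidArgumentException is used only to keep the example dependency-free:

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for App\OrderDelayedEvent so the sketch runs on its own.
final class OrderDelayedEvent
{
}

/**
 * Reads the target message class from the 'type' header of an encoded envelope.
 * Assumption: the transport copies the configured headers into this array.
 *
 * @param array<string, mixed> $encodedEnvelope
 */
function resolveMessageClass(array $encodedEnvelope): string
{
    $type = $encodedEnvelope['headers']['type'] ?? null;

    // Fail early with a clear message instead of passing null to the deserializer.
    if (!is_string($type) || $type === '') {
        throw new InvalidArgumentException('Encoded envelope has no "type" header.');
    }

    if (!class_exists($type)) {
        throw new InvalidArgumentException(sprintf('Message class "%s" does not exist.', $type));
    }

    return $type;
}
```

Inside decode(), the result would replace the hardcoded class argument, so one serializer service could be shared by every transport that sets a type header.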

What do you think?


Update: If you doubt whether headers is the best place to specify the event class, I'd like to suggest an alternative approach like the one below:

      options:
          topic:
              name: order.delayed
              event: App\OrderDelayedEvent

gayansanjeewa avatar Mar 24 '22 14:03 gayansanjeewa

@gayansanjeewa May I ask how the payload is serialized? Is it plain JSON?

KonstantinCodes avatar Mar 24 '22 16:03 KonstantinCodes

Hey @KonstantinCodes

So the decode method of the serializer is like

    /**
     * @param string[] $encodedEnvelope
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body'])) {
            return new Envelope(new EmptyMessageEvent());
        }

        /** @var OrderDelayedEvent $message */
        $message = $this->serializer->deserialize($encodedEnvelope['body'], OrderDelayedEvent::class, 'json');

        return new Envelope($message);
    }

and the payload would be like

{
    "order_id": "3c77d2f1-4b2a-4eac-bef3-a3c4c74fc64a",
    "user_id": 3300317213,
    "user_email": "[email protected]",
    "order_number": "OPL-100001111"
}

gayansanjeewa avatar Mar 25 '22 04:03 gayansanjeewa

Hey @KonstantinCodes ping :smile: I'd like to hear your feedback on my previous comment. Thanks!

gayansanjeewa avatar Mar 31 '22 02:03 gayansanjeewa

@gayansanjeewa Overriding these properties doesn't really seem like an ideal solution. If you do it this way, you'll need to configure different transports for each topic you consume.

I'd rather just pass the KafkaMessage to the Serializer so you can map the topic name to a class.
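As a rough sketch of that idea, the mapping itself could be a small lookup object that the serializer consults once it knows the topic name. The class name TopicClassMap and its method are hypothetical, and how the topic name actually reaches the serializer is left open here, since that part is only proposed in this thread:

```php
<?php

declare(strict_types=1);

// Hypothetical helper: maps Kafka topic names to message class names.
final class TopicClassMap
{
    /** @var array<string, string> topic name => message class */
    private array $map;

    /** @param array<string, string> $map */
    public function __construct(array $map)
    {
        $this->map = $map;
    }

    public function classFor(string $topic): string
    {
        // An unmapped topic is a configuration error, so report it loudly.
        if (!isset($this->map[$topic])) {
            throw new InvalidArgumentException(
                sprintf('No message class configured for topic "%s".', $topic)
            );
        }

        return $this->map[$topic];
    }
}

// Usage: one map shared by a single serializer, instead of one transport per topic.
$map = new TopicClassMap(['order.delayed' => 'App\OrderDelayedEvent']);
$class = $map->classFor('order.delayed');
```

With this shape, a single transport and a single serializer can cover several topics, which is the concern raised about the per-transport headers option.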

KonstantinCodes avatar Mar 31 '22 07:03 KonstantinCodes

Okay, thanks for your feedback @KonstantinCodes!

gayansanjeewa avatar Apr 01 '22 08:04 gayansanjeewa