springwolf-core icon indicating copy to clipboard operation
springwolf-core copied to clipboard

@AsyncListener is not overriding @KafkaListener and both payloads appears

Open fcmdeveloper1 opened this issue 1 year ago • 9 comments

Dear @timonback, i tried for java and i have duplicate messages, although i used the @AsyncListener which should override the payload in the method itself.

but the topic name and the group name should still be taken from the @KafkaListener annotation, unless you will provide attribute for them in the @AsyncListener annotation

example:

@AsyncListener( operation = @AsyncOperation(payloadType = LearningEvent.class, description = "description", channelName = "lms" )) @KafkaListener(topics = "lms", groupId = "test") public void receiveExamplePayload2( @payload List<ConsumerRecord<String, LearningEvent>> records, Acknowledgment acknowledgment) { log.info("Received new message in example-queue: {}", records.toString()); }

the generated asyncAPI has :

"channels": { "lms": { "messages": { "com.asyncapi.kafka.dtos.LearningEvent": { "$ref": "#/components/messages/com.asyncapi.kafka.dtos.LearningEvent" }, "org.apache.kafka.clients.consumer.ConsumerRecord": { "$ref": "#/components/messages/org.apache.kafka.clients.consumer.ConsumerRecord" } } } }

attached is a sample project can be imported by maven and check the output, so it will be easy to reproduce

asyncApi_github.zip

image

My Best Regards

fcmdeveloper1 avatar Apr 28 '24 05:04 fcmdeveloper1

Welcome to Springwolf. Thanks a lot for reporting your first issue. Please check out our contributors guide and feel free to join us on discord.

github-actions[bot] avatar Apr 28 '24 05:04 github-actions[bot]

Hi @fcmdeveloper1, Thank you for the report. There is indeed a small bug in the extraction of payloads from generics.

When the bug is released, you still will need to add this to your springwolf configuration so that ConsumerRecords are extracted correctly:

springwolf.payload.extractable-classes.org.apache.kafka.clients.consumer.ConsumerRecord=1

More details: https://www.springwolf.dev/docs/configuration/documenting-messages/#unwrapping-the-payload

timonback avatar May 03 '24 13:05 timonback

Dear @timonback,

   this means that by adding the `@AsyncListener` annotation, the LearningEvent payload only will be shown and the other payload of the method itself will be ignored which is the ConsumerRecord, or you mean i have to add this property  `"springwolf.payload.extractable-classes.org.apache.kafka.clients.consumer.ConsumerRecord=0"` to actually ignore the ConsumerRecord payload?

My Best Regards

fcmdeveloper1 avatar May 05 '24 10:05 fcmdeveloper1

Hi @fcmdeveloper1 ,

Springwolf does read the payload from the method signature. It will see the consumerrecord and believe that it is the actual payload. However consumerrecord is a wrapper around the actual learningevent payload. To tell springwolf to unwrap/extract the payload, you will have to add the configuration property in my post above

This is independent of AsyncListener or KafkaListener. Also note value. For more details have a look at the website documentation. As this is a bit complex, we happily take PRs to improve the documentation on it.

timonback avatar May 05 '24 12:05 timonback

Dear @timonback, i though from the documentation, it is mentioned that the @AsyncListener has a higher precedence over the method payload itself, that's why i raised this defect and this is what i expected (to have the message from the @AsyncListener annotation instead of the ConsumerRecord)

image

however, when i tried with this mentioned property, "springwolf.payload.extractable-classes.org.apache.kafka.clients.consumer.ConsumerRecord=1", the generated asyncapi yaml has 2 messages, one for the LMSEvent and the other one is just a string

"lms": { "messages": { "com.asyncapi.kafka.dtos.LearningEvent": { "$ref": "#/components/messages/com.asyncapi.kafka.dtos.LearningEvent" }, "java.lang.String": { "$ref": "#/components/messages/java.lang.String" } } }

should i try with some new patch or version?

My Best Regards

fcmdeveloper1 avatar May 06 '24 08:05 fcmdeveloper1

Yes, try with the SNAPSHOT version as it contains the bugfix. More details are in the README.

What actually happens is that the AsyncListener generates the documentation with your event. Additionally, the method is detected as second time due to the kafkalistener annotation. This one will see the consumerrecord. In the current release there is a bug. In the snapshot version (and with the configuration setting), this should detect the same event and thereby resolve the reported issue.

timonback avatar May 06 '24 10:05 timonback

Dear @timonback, i tested it and it seems it works fine if the payload of the method has the same generics object

public void receiveExamplePayload2( @Payload(required = true) List<ConsumerRecord<String, LearningEvent>> records, Acknowledgment acknowledgment)

and the @AsyncListener has the same objects @AsyncListener( operation = @AsyncOperation(payloadType = LearningEvent.class, description = "description", channelName = "lms" ))

But

if the method signature contains for example another object such as ParentDto, both objects will appear, which still not the expected.

@AsyncListener( operation = @AsyncOperation(payloadType = LearningEvent.class, description = "description", channelName = "lms" )) @KafkaListener(topics = "lms", groupId = "test") public void receiveExamplePayload2( @Payload(required = true) List<ConsumerRecord<String, ParentEventDto>> records, Acknowledgment acknowledgment) { log.info("Received new message in example-queue: {}", records.toString());

My Best Regards

fcmdeveloper1 avatar May 09 '24 13:05 fcmdeveloper1

Hi @fcmdeveloper1 ,

we assume that the payload in the method signature (ParentEventDto) matches the actual payload (LearningEvent).

What happens in springwolf, is that we use scanners to detect listeners (and publishers), extract the payload and build the asyncapi.json file.

One scanner will find the ParentEventDto for the topic and the other scanner will find the LEarningEvent. Those are merged and therefore both payloads detected.

Please check out the faq: https://www.springwolf.dev/docs/faq#consumers-are-detected-multiple-times-with-different-payloads

One option is to disable the 'other' scanner.

(As this seems unclear, you are more than welcome to simply edit the documentation and suggest a better description)

timonback avatar May 10 '24 13:05 timonback

Dear @timonback,

   i got your point, but is there a possibility to add a flag in the @asyncLister to mention if we need to have the @KafkaListener or not for the included method or some flag to ignore this @KafkaListener if needed by adding any annotation to the method signature itself?

My Best Regards

fcmdeveloper1 avatar May 12 '24 06:05 fcmdeveloper1

Hi, I don't see a way at this point, besides the options mentioned above.

timonback avatar May 31 '24 16:05 timonback

The change is available in the latest release. 🎉

Thank you for the report/contribution and making Springwolf better!

github-actions[bot] avatar May 31 '24 16:05 github-actions[bot]