smallrye-reactive-messaging
Deprecate and replace MessageConverter with reactive IncomingMessageConverter
Fixes #1769
IncomingMessageConverter / MessageConverter
Deprecate MessageConverter
MessageConverter's interface is synchronous, which prevents it from handling failures in any meaningful way. First, if message conversion fails, the interface still requires the converter to return something, and that value cannot currently be null, so there is no mechanism to "ignore" a failed message. Second, MessageConverters are required to call ack or nack themselves on failure. Unfortunately, those methods return CompletionStages, so converters must either ignore the results of ack/nack or block until they complete.
After talking with @cescoffier we decided changing the interface to use a reactive method would be best.
Introducing IncomingMessageConverter
IncomingMessageConverter is the same interface, except that it returns a Uni<Message<?>>. Changing its name serves two purposes. 1) The new name prepares for outgoing message converters (as discussed in issues like #1162). 2) It allows continued use of the (now deprecated) MessageConverter until it is removed.
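To make the difference concrete, here is a minimal sketch of the two contract shapes. It is not the PR's code: `Msg`, `SyncConverter`, `AsyncConverter` and `StringToInt` are hypothetical names, and `CompletionStage` stands in for Mutiny's `Uni` only so the example runs on the JDK alone.

```java
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for Message<T>; only the payload matters here.
final class Msg<T> {
    private final T payload;
    Msg(T payload) { this.payload = payload; }
    T payload() { return payload; }
}

// Shape of the deprecated synchronous contract: it must return a message.
interface SyncConverter {
    boolean canConvert(Msg<?> in, Type target);
    Msg<?> convert(Msg<?> in, Type target);
}

// Shape of the reactive contract. The PR returns Uni<Message<?>>;
// CompletionStage is an assumption made to avoid external dependencies.
interface AsyncConverter {
    boolean canConvert(Msg<?> in, Type target);
    CompletionStage<Msg<?>> convert(Msg<?> in, Type target);
}

// Example converter: a failure travels through the returned stage
// instead of forcing the converter to invent a return value.
final class StringToInt implements AsyncConverter {
    public boolean canConvert(Msg<?> in, Type target) {
        return in.payload() instanceof String && target == Integer.class;
    }

    public CompletionStage<Msg<?>> convert(Msg<?> in, Type target) {
        try {
            Msg<?> out = new Msg<>(Integer.parseInt((String) in.payload()));
            return CompletableFuture.<Msg<?>>completedFuture(out);
        } catch (NumberFormatException e) {
            return CompletableFuture.<Msg<?>>failedFuture(e);
        }
    }
}
```

With the asynchronous shape, the caller decides what a failed stage means (skip, log, nack), which is the capability the synchronous interface lacks.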
Adapter AnyMessageConverter
To facilitate using both IncomingMessageConverter and MessageConverter, a new base interface, AnyMessageConverter, was added. It is marked deprecated along with MessageConverter, and when MessageConverter is removed, AnyMessageConverter will be removed as well.
ConverterUtils Changes
Wrapping Legacy Converters
Calls to the deprecated MessageConverter.convert are now wrapped with failure handling that logs the failure, rejects (aka nacks) and ignores the message. This solves issues with stalling of message handling (e.g. #1769) since failures cannot be handled by these converters.
Wrapping is not done for IncomingMessageConverter. Now that the interface provides the full capabilities to handle failures gracefully, implementations are expected to do so. All failures from streams returned from IncomingMessageConverter are ignored without logging or ack/nack (this is explained in the interface's API docs).
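The wrapping applied to legacy converters can be sketched roughly as follows. This is an illustration, not the code in ConverterUtils: `LegacyMsg` and `LegacyWrapper` are hypothetical, `CompletionStage` replaces `Uni`, and completing with `null` is an assumed way of signalling "skip this message".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical stand-in for Message<T>: a payload plus an asynchronous nack.
final class LegacyMsg {
    final Object payload;
    boolean nacked = false;

    LegacyMsg(Object payload) { this.payload = payload; }

    CompletionStage<Void> nack(Throwable reason) {
        nacked = true;
        return CompletableFuture.completedFuture(null);
    }
}

final class LegacyWrapper {
    private static final Logger LOG = Logger.getLogger("converter");

    // Runs a synchronous converter. On failure it logs, nacks, and completes
    // with null (assumed "skip" signal) instead of letting the exception escape.
    static CompletionStage<LegacyMsg> wrap(Function<LegacyMsg, LegacyMsg> legacyConvert, LegacyMsg in) {
        try {
            return CompletableFuture.completedFuture(legacyConvert.apply(in));
        } catch (RuntimeException failure) {
            LOG.warning("Conversion failed, rejecting message: " + failure);
            // Wait for the nack to finish without blocking; a nack error is swallowed too.
            return in.nack(failure).<LegacyMsg>handle((ignored, nackError) -> null);
        }
    }
}
```

Because the nack result is chained rather than awaited, nothing blocks and the stream can move on to the next message.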
canConvert
Previously, due to caching of converters, the canConvert method was only ever called once, when the converter was cached. This has the following problems:
- The interface's API docs say that it will be called every time before convert is called.
- The call to canConvert is passed an instance of Message<?> to make its decision, but each call to convert will be passed a unique instance of Message<?>, which may not be anything like the message passed to canConvert.
For these reasons, canConvert is now always called (as the API docs already suggest), even after converter caching. If the cached converter cannot be used with the current message instance, a new converter will be looked up and cached. For converters that see the same kind of message all the time, there is little to no penalty. Additionally, converters that may see a wide range of message types will now work correctly.
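A rough sketch of the "cache, but always re-check" behavior described above. `Candidate` and `ConverterCache` are hypothetical names; the real lookup also takes the target type into account, which is omitted here for brevity.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical converter handle: a name plus a canConvert check on the payload.
final class Candidate {
    final String name;
    final Predicate<Object> canConvert;

    Candidate(String name, Predicate<Object> canConvert) {
        this.name = name;
        this.canConvert = canConvert;
    }
}

final class ConverterCache {
    private final List<Candidate> all;
    private Candidate cached; // last converter that accepted a message
    int lookups = 0;          // counts full scans, for illustration only

    ConverterCache(List<Candidate> all) { this.all = all; }

    Optional<Candidate> select(Object payload) {
        // Always re-check the cached converter against the current message.
        if (cached != null && cached.canConvert.test(payload)) {
            return Optional.of(cached);
        }
        // Cache miss: scan all converters and remember the first match.
        lookups++;
        Optional<Candidate> found = all.stream()
                .filter(c -> c.canConvert.test(payload))
                .findFirst();
        // Assumption: the old entry is kept when nothing matches.
        found.ifPresent(c -> cached = c);
        return found;
    }
}
```

A stream of uniform messages pays only one extra canConvert call per message, while a mixed stream triggers a new scan whenever the cached converter no longer fits.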
No Identity converter
The ConverterUtils.convert method checks whether the payload type in the message can be assigned to the target payload type required by the injection target. If it can, there is no need to look up and use an identity converter, since the message payload is already in a form the injection target can use.
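The assignability check amounts to something like the sketch below. It is simplified to raw `Class` objects; the real method works with the target's declared type, and `IdentityCheck` is a hypothetical name.

```java
final class IdentityCheck {
    // True when the payload can be handed to the injection target as-is,
    // so no converter (identity or otherwise) needs to be looked up.
    static boolean needsNoConversion(Object payload, Class<?> target) {
        return payload != null && target.isAssignableFrom(payload.getClass());
    }
}
```

For instance, a String payload satisfies a CharSequence target directly, while a String payload aimed at an Integer target still needs a converter.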
RabbitMQ Conversion
IncomingRabbitMQMessage's ad-hoc conversion in IncomingRabbitMQMessage.getPayload() has been replaced with a standard set of converters that replicate the previous behavior but with better error handling and more deterministic logic.
Additionally, this removes the content-type-override for RabbitMQ incoming connectors. It was used as a hack to force conversion on unsupported content types. Whenever this functionality is needed, it should be done in an IncomingMessageConverter, with proper error handling.
@cescoffier As discussed in the Zulip chat, there are 5 or 6 failing Kafka tests that seem to be a result of the now-required onItem().transformToUniAndConcatenate() used during conversion.
Codecov Report
Merging #1772 (be40a56) into main (35fdf68) will decrease coverage by 61.33%. The diff coverage is 0.00%.
@@ Coverage Diff @@
## main #1772 +/- ##
=============================================
- Coverage 73.83% 12.49% -61.34%
+ Complexity 3145 441 -2704
=============================================
Files 275 276 +1
Lines 11228 11238 +10
Branches 1437 1432 -5
=============================================
- Hits 8290 1404 -6886
- Misses 2249 9645 +7396
+ Partials 689 189 -500
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...allrye/reactive/messaging/AnyMessageConverter.java | 0.00% <0.00%> (ø) | |
| .../smallrye/reactive/messaging/MessageConverter.java | 0.00% <ø> (ø) | |
| ...reactive/messaging/providers/AbstractMediator.java | 0.00% <ø> (-79.46%) | :arrow_down: |
| ...messaging/providers/extension/ChannelProducer.java | 0.00% <ø> (-78.88%) | :arrow_down: |
| ...messaging/providers/extension/MediatorManager.java | 0.00% <ø> (-82.56%) | :arrow_down: |
| ...ve/messaging/providers/helpers/ConverterUtils.java | 0.00% <0.00%> (-63.64%) | :arrow_down: |
| ...tive/messaging/providers/i18n/ProviderLogging.java | 0.00% <ø> (-100.00%) | :arrow_down: |
| ...allrye/reactive/messaging/health/HealthReport.java | 0.00% <0.00%> (-100.00%) | :arrow_down: |
| ...ipse/microprofile/reactive/messaging/Metadata.java | 0.00% <0.00%> (-100.00%) | :arrow_down: |
| ...tive/messaging/ce/impl/BaseCloudEventMetadata.java | 0.00% <0.00%> (-100.00%) | :arrow_down: |
| ... and 221 more | | |
Closing this as improvements were brought in with #1837 and #1856