smallrye-reactive-messaging icon indicating copy to clipboard operation
smallrye-reactive-messaging copied to clipboard

Deprecate and replace MessageConverter with reactive IncomingMessageConverter

Open kdubb opened this issue 3 years ago • 2 comments
trafficstars

Fixes #1769

IncomingMessageConverter / MessageConverter

Deprecate MesasgeConverter

MessageConverter's interface is synchronous which does not allow it to handle failures in any meaningful way. First, if message conversion fails the interface requires it to return something which cannot currently be null; meaning there is no current mechanism to "ignore" a failed message. Second, it is required that MessageConverters handle calling ack or nack for a failure. Unfortunately those methods return CompletionStages. Converters must ignore the results of ack/nack or perform some sort of blocking wait for them to complete.

After talking with @cescoffier we decided changing the interface to use a reactive method would be best.

Introducing IncomingMessageConverter

IncomingMessageConverter is the same interface but returns a Uni<Message<?>>. Changing its name serves two purposes. 1) The new name allows/prepares for outgoing message converters (as discussed in issues like #1162). 2) It allows continued use of (now deprecated) MessageConverter until such time as it is removed.

Adapter AnyMessageConverter

To facilitate using both IncomingMessageConverter and MessageConverter a new base interface AnyMessageConverter was added. It is marked deprecated along with MessageConverter and when it is decided to remove MessageConverter, AnyMessageConverter will be removed as well.

ConverterUtils Changes

Wrapping Legacy Converters

Calls to the deprecated MessageConverter.convert are now wrapped with failure handling that logs the failure, rejects (aka nacks) and ignores the message. This solves issues with stalling of message handling (e.g. #1769) since failures cannot be handled by these converters.

Wrapping is not done for IncomingMessageConverter, now that the interface provides the full capabilities to handle failures gracefully it is expected that they will. All failures from streams returned from IncomingMessageConverter are ignored without logging or ack/nack (this is explained in the interface's API docs).

canConvert

Previously due to caching of converters, the canCovert method was only ever called one time, when the converter was cached. This has the following problems:

  1. The interface's API docs say that it will be called every time before convert is called.
  2. The call to canConvert is passed an instance of Message<?> to make its decision but each call to convert will be passed a unique instance of Message<?> which may not be anything like the message passed to canCovert.

For these reasons canConvert is now always called (as the API docs already suggest) even after converter caching. If a converter cannot be used with the current message instance a new converter will be cached. For converters that see the same exact messages all the time there is little to no penalty. Additionally converters that may see a wide range of message types will now work correctly.

No Identity converter

The ConverterUtils.convert method checks for the ability to assign the payload type in the message to the type target payload type required by the injection target. There is no need to lookup and use an identity converter since the message payload is already in a form the injection target can use.

RabbitMQ Conversion

IncomingRabbitMQMessage's ad-hoc conversion in IncomingRabbitMQMessage.getPayload() has been replaced with a standard set of converts that replicate the previous behavior but with better error handling and more deterministic logic.

Additionally this removes the content-type-override for RabbitMQ incoming connectors. It was used as a hack to make conversion happen on unsupported content types. This should be done in an IncomingMessageConverter, with proper error handling, whenever this functionality is needed.

kdubb avatar Jun 16 '22 20:06 kdubb

@cescoffier As discussed in the Zulip chat. There are a 5 or 6 failing Kafka tests that seem to be a result of the now required onItem().transformtoUniAndConcatenate() used during conversion.

kdubb avatar Jun 16 '22 20:06 kdubb

Codecov Report

Merging #1772 (be40a56) into main (35fdf68) will decrease coverage by 61.33%. The diff coverage is 0.00%.

Impacted file tree graph

@@              Coverage Diff              @@
##               main    #1772       +/-   ##
=============================================
- Coverage     73.83%   12.49%   -61.34%     
+ Complexity     3145      441     -2704     
=============================================
  Files           275      276        +1     
  Lines         11228    11238       +10     
  Branches       1437     1432        -5     
=============================================
- Hits           8290     1404     -6886     
- Misses         2249     9645     +7396     
+ Partials        689      189      -500     
Impacted Files Coverage Δ
...allrye/reactive/messaging/AnyMessageConverter.java 0.00% <0.00%> (ø)
.../smallrye/reactive/messaging/MessageConverter.java 0.00% <ø> (ø)
...reactive/messaging/providers/AbstractMediator.java 0.00% <ø> (-79.46%) :arrow_down:
...messaging/providers/extension/ChannelProducer.java 0.00% <ø> (-78.88%) :arrow_down:
...messaging/providers/extension/MediatorManager.java 0.00% <ø> (-82.56%) :arrow_down:
...ve/messaging/providers/helpers/ConverterUtils.java 0.00% <0.00%> (-63.64%) :arrow_down:
...tive/messaging/providers/i18n/ProviderLogging.java 0.00% <ø> (-100.00%) :arrow_down:
...allrye/reactive/messaging/health/HealthReport.java 0.00% <0.00%> (-100.00%) :arrow_down:
...ipse/microprofile/reactive/messaging/Metadata.java 0.00% <0.00%> (-100.00%) :arrow_down:
...tive/messaging/ce/impl/BaseCloudEventMetadata.java 0.00% <0.00%> (-100.00%) :arrow_down:
... and 221 more

codecov-commenter avatar Jun 16 '22 21:06 codecov-commenter

Closing this as improvements were brought in with #1837 and #1856

ozangunalp avatar Nov 07 '22 15:11 ozangunalp