SmartCompositeMessageConverter swallows conversion errors for simple types
Actual Behaviour:
When a message is received and the consumer accepts a simple type, e.g. a raw Map without generics information, any conversion errors are swallowed.
org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper#convertInputMessageIfNecessary calls SmartCompositeMessageConverter#fromMessage(Message<?> message, Class<?> targetClass) when no conversionHint is required. Errors thrown by any of the converters are swallowed: the Exception is caught, a debug message is logged, and the next converter is tried. Once the list of converters is exhausted, null is returned to the caller and the Message#payload is passed to the consumer as is (e.g. byte[]), which then results in a ClassCastException.
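To make the failure mode concrete, here is a minimal sketch that runs without Spring on the classpath. Every type in it (Converter, STRICT_JSON, invoke, and so on) is a hypothetical stand-in that only mimics the control flow described above; it is not the library code and not the unit test mentioned below.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins only: nothing here is a Spring type.
final class SwallowedConversionDemo {

    /** Stand-in for a message converter; null means "cannot convert". */
    interface Converter {
        Object convert(byte[] payload, Class<?> targetClass);
    }

    /** Pretends to parse JSON: throws unless the text starts with '{'. */
    static final Converter STRICT_JSON = (payload, targetClass) -> {
        String text = new String(payload, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalArgumentException("Malformed JSON: " + text);
        }
        return Map.of("raw", text);
    };

    /** Mimics the two-argument overload: catch, move on, finally return null. */
    static Object convertSwallowing(List<Converter> converters, byte[] payload, Class<?> targetClass) {
        for (Converter converter : converters) {
            try {
                Object result = converter.convert(payload, targetClass);
                if (result != null) {
                    return result;
                }
            }
            catch (Exception e) {
                // The real cause is dropped here, so the caller never sees it.
            }
        }
        return null;
    }

    /** Mimics the three-argument overload: no try/catch, so the failure propagates. */
    static Object convertPropagating(List<Converter> converters, byte[] payload, Class<?> targetClass) {
        for (Converter converter : converters) {
            Object result = converter.convert(payload, targetClass);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    /** Mimics the invocation wrapper: a null result falls back to the raw payload. */
    @SuppressWarnings("unchecked")
    static void invoke(Consumer<Map<?, ?>> consumer, Object converted, byte[] payload) {
        Object input = (converted != null) ? converted : payload;
        ((Consumer<Object>) (Consumer<?>) consumer).accept(input);
    }

    public static void main(String[] args) {
        byte[] malformed = "not json".getBytes(StandardCharsets.UTF_8);
        List<Converter> converters = List.of(STRICT_JSON);

        Object converted = convertSwallowing(converters, malformed, Map.class);
        try {
            invoke(map -> System.out.println(map.size()), converted, malformed);
        }
        catch (ClassCastException e) {
            System.out.println("Consumer failed with " + e.getClass().getSimpleName());
        }

        try {
            convertPropagating(converters, malformed, Map.class);
        }
        catch (IllegalArgumentException e) {
            System.out.println("Propagated: " + e.getMessage());
        }
    }
}
```

In the swallowing variant the consumer only ever sees a ClassCastException, and the original "Malformed JSON" cause is gone by then. In the propagating variant the caller gets the original exception.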
Expected Behaviour:
SmartCompositeMessageConverter#fromMessage(Message<?> message, Class<?> targetClass) should propagate the Exception just like SmartCompositeMessageConverter#fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) so that message conversion errors are handled consistently.
Affects versions: 3.x, 4.x
The issue can be recreated with a simple unit test.
@LukeKynaston Can you paste the unit test here? That would make things easier to debug. Thanks!
Hi @sobychacko, I meant that you can write a unit test in the spring-cloud-stream repo to test this.
The fact that the code does different things for different overloads is the bug!
In the first scenario we swallow the exception and in the second we propagate the exception:
@Override
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass) {
    for (MessageConverter converter : getConverters()) {
        if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
            return message.getPayload();
        }
        try {
            Object result = converter.fromMessage(message, targetClass);
            if (result != null) {
                return result;
            }
        }
        catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failure during type conversion by " + converter + ". Will try the next converter.", e);
            }
        }
    }
    return null;
}
@SuppressWarnings("unchecked")
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
    if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
        return message.getPayload();
    }
    Object result = null;
    if (message.getPayload() instanceof Iterable && conversionHint != null) {
        Iterable<Object> iterablePayload = (Iterable<Object>) message.getPayload();
        Type genericItemType = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
        Class<?> genericItemRawType = FunctionTypeUtils.getRawType(genericItemType);
        List<Object> resultList = new ArrayList<>();
        for (Object item : iterablePayload) {
            boolean isConverted = false;
            if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) {
                resultList.add(null);
                isConverted = true;
            }
            for (Iterator<MessageConverter> iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) {
                MessageConverter converter = (MessageConverter) iterator.next();
                if (!converter.getClass().getName().endsWith("ApplicationJsonMessageMarshallingConverter")) { // TODO Stream stuff, needs to be removed
                    Message<?> m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(); // TODO Message creating may be expensive
                    Object conversionResult = (converter instanceof SmartMessageConverter & genericItemRawType != genericItemType ?
                            ((SmartMessageConverter) converter).fromMessage(m, genericItemRawType, genericItemType) :
                            converter.fromMessage(m, genericItemRawType));
                    if (conversionResult != null) {
                        resultList.add(conversionResult);
                        isConverted = true;
                    }
                }
            }
        }
        result = resultList;
    }
    else {
        for (MessageConverter converter : getConverters()) {
            if (!converter.getClass().getName().endsWith("ApplicationJsonMessageMarshallingConverter")) { // TODO Stream stuff, needs to be removed
                result = (converter instanceof SmartMessageConverter ?
                        ((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
                        converter.fromMessage(message, targetClass));
                if (result != null) {
                    return result;
                }
            }
        }
    }
    return result;
}
I think this issue belongs in the Spring Cloud Function repo. Moving the issue from Spring Cloud Stream to here. cc @LukeKynaston
This is related to https://github.com/spring-cloud/spring-cloud-function/issues/739 and is an unfortunate choice of default behavior.
In my case I'm using function routing based on a message header, and each function accepts a specific record that's deserialized from JSON. If the payload is malformed, I want the message to not even touch the function and to go to the configured error handler instead (I'm using Spring Cloud Stream). What happens is that I receive a Message<byte[]>, which gets me a ClassCastException as soon as I call getPayload().
Well, this is the default behavior, and the intention is to allow the user to receive a raw message by changing the function signature (even temporarily), primarily to investigate. This is because conversion errors are usually development-type errors and can be fixed in many different ways, including by adding your own MessageConverter to the stack - https://docs.spring.io/spring-cloud-function/reference/spring-cloud-function/programming-model.html#user-defined-message-converters
So what would you propose to change?
I think the better thing to do is to let the configured error handler (as in Consumer<ErrorMessage>) process those, which is what happens anyway once the casting exception happens. The main difference is that the user-defined function wouldn't get called with incompatible argument type.
@LukeKynaston Please look at this commit. The issue has been resolved similarly to what you are describing - the user-defined function wouldn't get called with incompatible argument type. You can create a bean that implements MessageConverterHelper and implement
default boolean shouldFailIfCantConvert(Message<?> message) {
    return false;
}
We provide a default one, but as you can see the shouldFailIfCantConvert method returns false to preserve the existing behavior. You can override it to return true, thus making the conversion fail.
Please let me know if that is sufficient or if you were looking for something else.
@olegz, thanks, I think this will resolve our issue :). I'll check with the team before closing this, just to make sure everyone is happy with this solution.
@LukeKynaston I assume this could be closed?
Hi @olegz
I work with Luke. Unfortunately, he's been off due to illness.
Firstly, thank you for looking into this issue. While I think we can make things work with the above commit, I'm not convinced it's the ideal approach. MessageConverter#fromMessage states: "If the converter does not support the specified media type or cannot perform the conversion, it should return null."
Given the above contract, I would expect converter implementations to handle exceptions and return null if they wish to be lenient/leave it to another converter to try the conversion.
In the case above, the MessageConverter in question attempts the conversion because it can and should perform it: it supports application/json. IMO the resulting exception should be surfaced immediately and the error handler called, instead of trying the other converters, as that may lead to unexpected and, in our case, unwanted behaviour.
Perhaps I'm missing something, but I feel it rarely makes sense to pass the raw payload to the consumer, because it will fail either with a ClassCastException or, for advised consumers (e.g. @Transactional), with an AopInvocationException.
If you wanted to support the old behaviour and what we were expecting, I think it would be more useful if the MessageConverterHelper#this.failConversionIfNecessary was called in the exception handler block rather than after all the converters have been exhausted.
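To illustrate why the placement of that check matters, here is a rough, Spring-free sketch. All names in it are hypothetical: FailurePolicy stands in for the helper's shouldFailIfCantConvert flag (without the Message argument), and JSON and TEXT are made-up converters. It is not how the commit is actually implemented, only a model of the two placements being compared.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-ins only: nothing here is a Spring type.
final class FailFastPlacementDemo {

    /** Stand-in for a message converter; null means "not supported, let others try". */
    interface Converter {
        Object convert(byte[] payload, String contentType);
    }

    /** Stand-in for the opt-in flag exposed by the helper. */
    interface FailurePolicy {
        boolean shouldFailIfCantConvert();
    }

    /** Supports application/json only; throws on malformed input. */
    static final Converter JSON = (payload, contentType) -> {
        if (!"application/json".equals(contentType)) {
            return null; // unsupported media type: follow the contract and return null
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalArgumentException("Malformed JSON: " + text);
        }
        return text;
    };

    /** Lenient fallback that turns any payload into text. */
    static final Converter TEXT = (payload, contentType) -> new String(payload, StandardCharsets.UTF_8);

    /** Policy is consulted only once every converter has been tried. */
    static Object convertCheckingAfterExhaustion(List<Converter> converters, byte[] payload,
            String contentType, FailurePolicy policy) {
        for (Converter converter : converters) {
            try {
                Object result = converter.convert(payload, contentType);
                if (result != null) {
                    return result;
                }
            }
            catch (RuntimeException e) {
                // Swallowed: the next converter still gets a chance.
            }
        }
        if (policy.shouldFailIfCantConvert()) {
            throw new IllegalStateException("No converter could convert the payload");
        }
        return null;
    }

    /** Policy is consulted in the catch block, so the first failure surfaces. */
    static Object convertCheckingInCatch(List<Converter> converters, byte[] payload,
            String contentType, FailurePolicy policy) {
        for (Converter converter : converters) {
            try {
                Object result = converter.convert(payload, contentType);
                if (result != null) {
                    return result;
                }
            }
            catch (RuntimeException e) {
                if (policy.shouldFailIfCantConvert()) {
                    throw new IllegalStateException("Conversion failed", e);
                }
                // Lenient mode: keep the old try-the-next-converter behaviour.
            }
        }
        return null;
    }
}
```

With converters [JSON, TEXT], a malformed application/json payload, and the policy returning true, the first method still hands back the TEXT result because a later converter succeeded. The second method throws straight away and keeps the original exception as the cause. With the policy returning false, both methods behave like the old lenient loop.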