spring-cloud-function
MessagingException.failedMessage is raw payload instead of converted value
The FunctionInvocationWrapper methods invokeConsumer() and invokeFunction() should create a Message with the converted payload and throw a MessageHandlingException with that message.
Consider the following:
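import java.util.function.Consumer;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;

@SpringBootApplication
public class So76610419Application {

    public static void main(String[] args) {
        SpringApplication.run(So76610419Application.class, args);
    }

    // Consumer that always fails after the payload has been converted to Foo.
    @Bean
    Consumer<Foo> input() {
        return foo -> {
            System.out.println(foo);
            throw new RuntimeException("test");
        };
    }

    // Prints the payload of the failed message; currently the raw payload, not the Foo.
    @ServiceActivator(inputChannel = "errorChannel")
    void handle(ErrorMessage em) {
        System.out.println(((MessagingException) em.getPayload()).getFailedMessage().getPayload());
    }

    // Sends raw JSON bytes; a byte[] template needs byte-array serializers configured on the producer.
    @Bean
    ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input-in-0", "{\"bar\":\"baz\"}".getBytes());
        };
    }

    public static class Foo {

        String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }
    }
}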
I believe it's a reasonable expectation that the user has access to the converted payload.
The raw, unconverted message is still available in ErrorMessage.originalMessage.
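To illustrate the requested behavior without Spring on the classpath, here is a minimal sketch. SimpleMessage and HandlingException are hypothetical stand-ins for Message and MessageHandlingException, and the invokeConsumer() shown here is an assumption about the shape of the fix, not the actual FunctionInvocationWrapper code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

public class ConvertedPayloadSketch {

    // Hypothetical, simplified stand-in for org.springframework.messaging.Message.
    static final class SimpleMessage<T> {
        private final T payload;

        SimpleMessage(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return this.payload;
        }
    }

    // Hypothetical stand-in for MessageHandlingException: carries the failed message.
    static final class HandlingException extends RuntimeException {
        private final SimpleMessage<?> failedMessage;

        HandlingException(SimpleMessage<?> failedMessage, Throwable cause) {
            super(cause.getMessage(), cause);
            this.failedMessage = failedMessage;
        }

        SimpleMessage<?> getFailedMessage() {
            return this.failedMessage;
        }
    }

    // Converts the raw payload, invokes the consumer, and on failure reports
    // a message holding the converted payload rather than the raw bytes.
    static <T> void invokeConsumer(Consumer<T> consumer, SimpleMessage<byte[]> raw,
            Function<byte[], T> converter) {
        T converted = converter.apply(raw.getPayload());
        try {
            consumer.accept(converted);
        }
        catch (RuntimeException ex) {
            throw new HandlingException(new SimpleMessage<>(converted), ex);
        }
    }

    public static void main(String[] args) {
        SimpleMessage<byte[]> raw =
                new SimpleMessage<>("{\"bar\":\"baz\"}".getBytes(StandardCharsets.UTF_8));
        Function<byte[], String> converter = bytes -> new String(bytes, StandardCharsets.UTF_8);
        Consumer<String> failing = s -> {
            throw new RuntimeException("test");
        };
        try {
            invokeConsumer(failing, raw, converter);
        }
        catch (HandlingException ex) {
            // The handler sees the converted String; the caller still holds the raw message.
            System.out.println(ex.getFailedMessage().getPayload());
        }
    }
}
```

In this sketch the error handler receives the converted value through getFailedMessage(), while the raw bytes stay available separately, which mirrors the split between failedMessage and ErrorMessage.originalMessage described above.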
@garyrussell Your sample is not producing what I believe you are reporting:
java.lang.IllegalStateException: Failed to execute ApplicationRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:755) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at com.example.demofunction32.DemoFunction32Application.main(DemoFunction32Application.java:18) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1018) ~[kafka-clients-3.5.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) ~[kafka-clients-3.5.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1042) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:785) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:756) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:539) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
at com.example.demofunction32.DemoFunction32Application.lambda$1(DemoFunction32Application.java:37) ~[classes/:na]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:765) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
... 5 common frames omitted
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29) ~[kafka-clients-3.5.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.5.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015) ~[kafka-clients-3.5.1.jar:na]
... 12 common frames omitted