
MessagingException.failedMessage is raw payload instead of converted value

Open · garyrussell opened this issue 1 year ago • 1 comment

When the user function throws, FunctionInvocationWrapper.invokeConsumer() and invokeFunction() should create a Message with the converted payload and throw a MessageHandlingException that carries that message as its failedMessage.

Consider the following:

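import java.util.function.Consumer;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;

@SpringBootApplication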
public class So76610419Application {

	public static void main(String[] args) {
		SpringApplication.run(So76610419Application.class, args);
	}

	@Bean
	Consumer<Foo> input() {
		return foo -> {
			System.out.println(foo);
			throw new RuntimeException("test");
		};
	}

	@ServiceActivator(inputChannel = "errorChannel")
	void handle(ErrorMessage em) {
		// Per this issue, this currently prints the raw payload rather than the converted Foo.
		System.out.println(((MessagingException) em.getPayload()).getFailedMessage().getPayload());
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
		return args -> {
			template.send("input-in-0", "{\"bar\":\"baz\"}".getBytes());
		};
	}

	public static class Foo {

		String bar;

		public String getBar() {
			return this.bar;
		}

		public void setBar(String bar) {
			this.bar = bar;
		}

		@Override
		public String toString() {
			return "Foo [bar=" + this.bar + "]";
		}

	}

}

I believe it's a reasonable expectation that the user has access to the converted payload.

The raw, unconverted message is still available in ErrorMessage.originalMessage.
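
The requested behavior can be sketched in plain Java, without Spring on the classpath. This is only an illustration of the idea, not the FunctionInvocationWrapper implementation: SimpleMessage, HandlingException, toFoo and failedPayloadOf are hypothetical stand-ins invented for this sketch, and Foo is reduced to an immutable class. The point is that the exception wraps the message built from the converted payload rather than the raw bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

public class FailedMessageSketch {

    /** Hypothetical stand-in for Spring's Message: just a payload. */
    static final class SimpleMessage<T> {
        private final T payload;
        SimpleMessage(T payload) { this.payload = payload; }
        T getPayload() { return payload; }
    }

    /** Hypothetical stand-in for MessageHandlingException: remembers the failed message. */
    static final class HandlingException extends RuntimeException {
        private final SimpleMessage<?> failedMessage;
        HandlingException(SimpleMessage<?> failedMessage, Throwable cause) {
            super(cause);
            this.failedMessage = failedMessage;
        }
        SimpleMessage<?> getFailedMessage() { return failedMessage; }
    }

    /** The converted type from the sample, made immutable for brevity. */
    static final class Foo {
        private final String bar;
        Foo(String bar) { this.bar = bar; }
        String getBar() { return bar; }
        @Override public String toString() { return "Foo [bar=" + bar + "]"; }
    }

    /** Toy converter that only understands {"bar":"..."}; real code would use a JSON library. */
    static Foo toFoo(byte[] bytes) {
        String json = new String(bytes, StandardCharsets.UTF_8);
        String key = "\"bar\":\"";
        int start = json.indexOf(key) + key.length();
        return new Foo(json.substring(start, json.indexOf('"', start)));
    }

    /** Converts the raw payload, invokes the consumer, and on failure reports the converted message. */
    static <T> void invokeConsumer(SimpleMessage<byte[]> raw,
            Function<byte[], T> converter, Consumer<T> consumer) {
        SimpleMessage<T> converted = new SimpleMessage<>(converter.apply(raw.getPayload()));
        try {
            consumer.accept(converted.getPayload());
        } catch (RuntimeException ex) {
            // Wrap the converted message, not the raw one.
            throw new HandlingException(converted, ex);
        }
    }

    /** Runs a consumer that always fails and returns the payload of the failed message. */
    static Object failedPayloadOf(String json) {
        SimpleMessage<byte[]> raw = new SimpleMessage<>(json.getBytes(StandardCharsets.UTF_8));
        Function<byte[], Foo> converter = FailedMessageSketch::toFoo;
        Consumer<Foo> failing = foo -> { throw new RuntimeException("test"); };
        try {
            invokeConsumer(raw, converter, failing);
            return null;
        } catch (HandlingException ex) {
            return ex.getFailedMessage().getPayload();
        }
    }

    public static void main(String[] args) {
        System.out.println(failedPayloadOf("{\"bar\":\"baz\"}"));
    }
}
```

Running main prints Foo [bar=baz], which is what the error handler in the sample above would be expected to see. The sketch does not model the raw message; in Spring that stays available separately through ErrorMessage.originalMessage.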

garyrussell, Jul 13 '23 17:07

@garyrussell Your sample is not producing what I believe you are reporting. It fails with:

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:755) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at com.example.demofunction32.DemoFunction32Application.main(DemoFunction32Application.java:18) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1018) ~[kafka-clients-3.5.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) ~[kafka-clients-3.5.1.jar:na]
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1042) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:785) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
	at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:756) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:539) ~[spring-kafka-3.1.0-M1.jar:3.1.0-M1]
	at com.example.demofunction32.DemoFunction32Application.lambda$1(DemoFunction32Application.java:37) ~[classes/:na]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:765) ~[spring-boot-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	... 5 common frames omitted
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
	at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29) ~[kafka-clients-3.5.1.jar:na]
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.5.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015) ~[kafka-clients-3.5.1.jar:na]
	... 12 common frames omitted
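
The trace points to a serializer mismatch rather than to the reported behavior: the producer's value.serializer is StringSerializer, while the sample sends byte[] through a KafkaTemplate<byte[], byte[]>. A possible fix for the sample, assuming the template is the one created by Spring Boot's Kafka auto-configuration and that application.properties is where the sample is configured (neither is stated in the thread), is to switch the producer to byte-array serializers:

```properties
# Assumption: the KafkaTemplate comes from Spring Boot auto-configuration.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
```

With these set, the ApplicationRunner should get past the send call, so the consumer and the error handler can be exercised.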

olegz, Sep 25 '23 11:09