spring-cloud-alibaba icon indicating copy to clipboard operation
spring-cloud-alibaba copied to clipboard

spring-cloud-bus整合rocketmq 无法消费自定义的事件

Open skylarkn opened this issue 3 years ago • 6 comments

问题:

服务A作为发起方可以创建本地事件,并发送消息到rocketmq,消费端也可以接收消息,但是无法将接收到的消息关联本地事件监听

简单打个断点:

org.springframework.cloud.bus.BusConsumer.accept(RemoteApplicationEvent event) 添加断点,event参数类型是UnknownRemoteApplicationEvent

对比rabbitMq: 一模一样的代码 将实现类从 spring-cloud-starter-bus-rocketmq 切换到 spring-cloud-starter-bus-amqp 问题就解决了

我创建的测试项目

skylarkn avatar Sep 03 '22 09:09 skylarkn

我也遇到这个问题,初步判断是序列化反序列化没走同一个Converter。 反序列化 从Message<?> 反序列化 payload 是走 BusJacksonAutoConfiguration.BusJacksonMessageConverter。但是BusJacksonMessageConverter转换到指定子类是依赖RemoteApplicationEvent里的注解@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")。但反序列化是Message里payload发现没有type值。故Jackson无法将其转换成自定义事件。 关键代码处 BusJacksonAutoConfiguration.BusJacksonMessageConverter#convertFromInternal(179)

pumbf avatar Sep 08 '22 03:09 pumbf

springboot版本:2.6.11 springcloud版本:2021.0.3 springcloudalibaba版本:2021.0.1.0

pumbf avatar Sep 08 '22 03:09 pumbf

hi @pumbf @skylarkn ,if you have some idea to solve it, welcome to create pull request.If not,we will solve it later.

sollhui avatar Sep 08 '22 04:09 sollhui

hi @pumbf @skylarkn ,if you have some idea to solve it, welcome to create pull request.If not,we will solve it later.

hi , i found the reason. Because s-c-b-rocketmq will create MappingFastJsonMessageConverter. it will replace ApplicationJsonMessageMarshallingConverter to convert the message. so "type" property will not exist

pumbf avatar Sep 08 '22 06:09 pumbf

image

core method: SimpleFunctionRegistry.FunctionInvocationWrapper::convertOutputMessageIfNecessary

pumbf avatar Sep 08 '22 06:09 pumbf

temporarily solve this problem by overriding the bean and removing the FastJsonConverter. @skylarkn

 /**
     * fix issue https://github.com/alibaba/spring-cloud-alibaba/issues/2742
     * @return  CompositeMessageConverter
     */
    @Bean(RocketMQMessageConverter.DEFAULT_NAME)
    public CompositeMessageConverter rocketMqMessageConvert() {
        Set<MessageConverter> messageConverters = new HashSet();
        ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
        byteArrayMessageConverter.setContentTypeResolver((ContentTypeResolver)null);
        messageConverters.add(byteArrayMessageConverter);
        messageConverters.add(new StringMessageConverter());
        return new CompositeMessageConverter(messageConverters);
    }

pumbf avatar Sep 08 '22 12:09 pumbf

Plz keep open util I check 2.2.x.

Sorieee avatar Oct 08 '22 13:10 Sorieee

还是不行

dream0708 avatar Jun 07 '23 15:06 dream0708