spring-multirabbit

Retry section under RabbitProperties not being configured

Open mhewedy opened this issue 3 years ago • 4 comments

I've noticed that the Retry section of RabbitProperties is not being applied (in fact, the whole Listener section is ignored). It is handled in AbstractRabbitListenerContainerFactoryConfigurer, but multi rabbit does not take it into account when creating the SimpleRabbitListenerContainerFactory here: https://github.com/freenowtech/spring-multirabbit/blob/77cbe04386dad8bf1538d8036df9088c77583172/spring-multirabbit/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java#L191
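
As a hypothetical illustration of the settings in question: the connection name `connectionNameA` and all values below are assumptions, not taken from this issue. The fragment only shows where a per-connection Listener/Retry section would sit, assuming each entry under `spring.multirabbit.connections` is bound as a RabbitProperties object. As described above, these listener values are currently not applied to the container factory.

```yaml
spring:
  multirabbit:
    enabled: true
    connections:
      connectionNameA:            # hypothetical connection name
        host: localhost           # hypothetical host
        port: 5672
        listener:
          simple:
            retry:
              enabled: true
              max-attempts: 3         # illustrative values only
              initial-interval: 1000ms
              multiplier: 2.0
              max-interval: 10000ms
```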

So the question is, provided that ... how can I configure retry for the different container factories of multi rabbit?

I've worked around it with the following solution, which is full of hacks. Your advice would be appreciated.

  • I've applied a hack to configure the Listener part of RabbitProperties for each SimpleRabbitListenerContainerFactory.
  • Then I've applied a hack for the MessageRecoverer: the connection key has to be bound before the message is sent to the dead-letter exchange.
@Configuration
@ConditionalOnClass(RabbitTemplate.class)
public class RabbitConfig {

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Component
    @RequiredArgsConstructor
    private static class MultiRabbitCustomizer implements InitializingBean, ApplicationContextAware {

        private ApplicationContext applicationContext;

        private final RabbitTemplate rabbitTemplate;
        private final MessageConverter messageConverter;
        private final MultiRabbitProperties multiRabbitProperties;
        private final SimpleRoutingConnectionFactory multiRabbitConnectionFactory;

        @Override
        public void afterPropertiesSet() {
            multiRabbitProperties.getConnections().forEach((key, props) -> {

                var bean = applicationContext.getBean(key, SimpleRabbitListenerContainerFactory.class);
                ConnectionFactory targetConnectionFactory = multiRabbitConnectionFactory.getTargetConnectionFactory(key);

                var multiRabbitConfigurer = new MultiRabbitConfigurer(props.getListener().getSimple(), targetConnectionFactory);
                var messageRecoverer = new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, "general_dl_exchange", key);
                multiRabbitConfigurer.setMessageRecoverer(messageRecoverer);
                multiRabbitConfigurer.setMessageConverter(messageConverter);
                multiRabbitConfigurer.configure(bean, multiRabbitConnectionFactory);
            });
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    @RequiredArgsConstructor
    private static class MultiRabbitConfigurer
            extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {

        private final RabbitProperties.SimpleContainer config;
        private final ConnectionFactory connectionFactory;

        @Override
        public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) {
            PropertyMapper map = PropertyMapper.get();
            configure(factory, connectionFactory, config);
            map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
            map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
            map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
            map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
        }

        @Override
        public void setMessageConverter(MessageConverter messageConverter) {
            super.setMessageConverter(messageConverter);
        }

        @Override
        public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
            super.setMessageRecoverer(messageRecoverer);
        }
    }

    private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer {
        private final String key;

        public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) {
            super(rabbitTemplate, errorExchange);
            this.key = key;
        }

        @Override
        protected void doSend(@Nullable String exchange, String routingKey, Message message) {
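            ConnectionFactory routingConnectionFactory = ((RabbitTemplate) errorTemplate).getConnectionFactory();
            // bind the connection key so the routing connection factory publishes to the matching broker
            SimpleResourceHolder.bind(routingConnectionFactory, key);
            try {
                // append the consumer queue name to the routing key computed by RepublishMessageRecoverer
                // (which uses the "error." prefix by default)
                super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message);
            } finally {
                // always unbind, even if the send fails, so the key does not leak to later work on this thread
                SimpleResourceHolder.unbind(routingConnectionFactory);
            }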
        }
    }

}
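
The bind/unbind step in the recoverer above relies on a lookup key that is attached to the current thread while the message is sent. The sketch below illustrates that idea in plain Java, without Spring. `LookupKeyHolder`, `Router`, `sendVia`, and the broker names are hypothetical stand-ins invented for this illustration; they are not the Spring AMQP or spring-multirabbit API.

```java
import java.util.HashMap;
import java.util.Map;

public class RoutingKeySketch {

    /** Hypothetical stand-in for a thread-bound lookup key holder. */
    static final class LookupKeyHolder {
        private static final ThreadLocal<String> KEY = new ThreadLocal<>();

        static void bind(String key) { KEY.set(key); }

        static void unbind() { KEY.remove(); }

        static String current() { return KEY.get(); }
    }

    /** Hypothetical router: resolves a target from the key bound to the current thread. */
    static final class Router {
        private final Map<String, String> targets = new HashMap<>();
        private final String defaultTarget;

        Router(String defaultTarget) { this.defaultTarget = defaultTarget; }

        void register(String key, String target) { targets.put(key, target); }

        String resolve() {
            String key = LookupKeyHolder.current();
            // fall back to the default target when no key is bound or the key is unknown
            return key == null ? defaultTarget : targets.getOrDefault(key, defaultTarget);
        }
    }

    /** Binds the key, resolves the target, and always unbinds afterwards. */
    static String sendVia(Router router, String key) {
        LookupKeyHolder.bind(key);
        try {
            return router.resolve();
        } finally {
            // unbinding in finally keeps the key from leaking if resolve() throws
            LookupKeyHolder.unbind();
        }
    }

    public static void main(String[] args) {
        Router router = new Router("default-broker");
        router.register("connectionA", "broker-a");

        System.out.println(sendVia(router, "connectionA")); // prints "broker-a"
        System.out.println(router.resolve());               // prints "default-broker"
    }
}
```

Without the key bound, the router falls back to its default target, which is why the recoverer has to bind the key before it republishes the failed message.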

mhewedy, Jan 04 '22 20:01

I'd appreciate a response on this.

mhewedy, Jan 16 '22 16:01

I have the same issue. Are there any updates on this?

gabrielamaciac, Sep 04 '23 06:09

@mhewedy, thanks for sharing your workaround, we're starting to use it.

@rwanderc, you picked this up but never got the time to work on it, I assume. What's the status here?

berndgoetz, Sep 21 '23 08:09

For me, the workaround did not work. However, I did manage to solve it in a similar way to @mhewedy, by changing the configuration on the second SimpleRabbitListenerContainerFactory bean.

gabrielamaciac, Sep 21 '23 10:09