Retry section under RabbitProperties not being configured
I've noticed that the Retry section of RabbitProperties is not being applied (in fact, the whole Listener section is ignored). It is handled in AbstractRabbitListenerContainerFactoryConfigurer, but multi rabbit does not take it into account when creating the SimpleRabbitListenerContainerFactory here: https://github.com/freenowtech/spring-multirabbit/blob/77cbe04386dad8bf1538d8036df9088c77583172/spring-multirabbit/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java#L191
So the question is, provided that ... how can I configure retry for the different container factories of multi rabbit?
I've worked around it with this hack-filled solution ... your advice is appreciated:
- I've applied a hack to configure the Listener part of RabbitProperties for each SimpleRabbitListenerContainerFactory.
- Then, I've applied a hack for the MessageRecoverer (we need to bind the connection key before sending the message to the dl exchange).
@Configuration
@ConditionalOnClass(RabbitTemplate.class)
public class RabbitConfig {

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    // Re-applies the per-connection Listener properties to each container factory created by multi rabbit.
    @Component
    @RequiredArgsConstructor
    private static class MultiRabbitCustomizer implements InitializingBean, ApplicationContextAware {

        private ApplicationContext applicationContext;
        private final RabbitTemplate rabbitTemplate;
        private final MessageConverter messageConverter;
        private final MultiRabbitProperties multiRabbitProperties;
        private final SimpleRoutingConnectionFactory multiRabbitConnectionFactory;

        @Override
        public void afterPropertiesSet() {
            multiRabbitProperties.getConnections().forEach((key, props) -> {
                // The container factory bean is registered under the connection key.
                var bean = applicationContext.getBean(key, SimpleRabbitListenerContainerFactory.class);
                ConnectionFactory targetConnectionFactory = multiRabbitConnectionFactory.getTargetConnectionFactory(key);
                var multiRabbitConfigurer = new MultiRabbitConfigurer(props.getListener().getSimple(), targetConnectionFactory);
                var messageRecoverer = new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, "general_dl_exchange", key);
                multiRabbitConfigurer.setMessageRecoverer(messageRecoverer);
                multiRabbitConfigurer.setMessageConverter(messageConverter);
                multiRabbitConfigurer.configure(bean, multiRabbitConnectionFactory);
            });
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    @RequiredArgsConstructor
    private static class MultiRabbitConfigurer
            extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {

        private final RabbitProperties.SimpleContainer config;
        private final ConnectionFactory connectionFactory;

        @Override
        public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) {
            PropertyMapper map = PropertyMapper.get();
            // Use the connection-specific factory and properties instead of the ones passed in.
            configure(factory, connectionFactory, config);
            map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
            map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
            map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
            map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
        }

        // Overridden only to make the setters callable from MultiRabbitCustomizer.
        @Override
        public void setMessageConverter(MessageConverter messageConverter) {
            super.setMessageConverter(messageConverter);
        }

        @Override
        public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
            super.setMessageRecoverer(messageRecoverer);
        }
    }

    private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer {

        private final String key;

        public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) {
            super(rabbitTemplate, errorExchange);
            this.key = key;
        }

        @Override
        protected void doSend(@Nullable String exchange, String routingKey, Message message) {
            ConnectionFactory routingFactory = ((RabbitTemplate) errorTemplate).getConnectionFactory();
            // Bind the connection key so the routing connection factory publishes to the right broker.
            SimpleResourceHolder.bind(routingFactory, key);
            try {
                // routing key is: "error.<original queue name>"
                super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message);
            } finally {
                // Always unbind, even if the send fails, so the key does not leak to later work on this thread.
                SimpleResourceHolder.unbind(routingFactory);
            }
        }
    }
}
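To illustrate why the recoverer binds the connection key around the send, here is a minimal, dependency-free sketch of a thread-bound lookup key. It is only a model: `RoutingKeySketch`, `LOOKUP_KEY`, `resolveTarget`, and `withLookupKey` are hypothetical names standing in for `SimpleResourceHolder` and `SimpleRoutingConnectionFactory`, and the broker names are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class RoutingKeySketch {

    // Hypothetical stand-in for SimpleResourceHolder: the lookup key lives on the current thread.
    static final ThreadLocal<String> LOOKUP_KEY = new ThreadLocal<>();

    // Hypothetical stand-in for a routing connection factory: picks a target by the bound key,
    // falling back to the default when no key is bound or the key is unknown.
    static String resolveTarget(Map<String, String> targets, String defaultTarget) {
        String key = LOOKUP_KEY.get();
        return key == null ? defaultTarget : targets.getOrDefault(key, defaultTarget);
    }

    // Binds the key, runs the action, and always unbinds, even when the action throws.
    static void withLookupKey(String key, Runnable action) {
        LOOKUP_KEY.set(key);
        try {
            action.run();
        } finally {
            LOOKUP_KEY.remove();
        }
    }

    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("connectionA", "broker-a");
        targets.put("connectionB", "broker-b");

        // While the key is bound, the routing lookup resolves to the matching broker.
        withLookupKey("connectionB",
                () -> System.out.println(resolveTarget(targets, "default-broker"))); // prints "broker-b"

        // A failing send must not leave the key bound on the thread.
        try {
            withLookupKey("connectionB", () -> {
                throw new IllegalStateException("send failed");
            });
        } catch (IllegalStateException expected) {
            // ignored for the purpose of the sketch
        }
        System.out.println(resolveTarget(targets, "default-broker")); // prints "default-broker"
    }
}
```

Listener threads are typically reused, so a key that stays bound after a failed send could affect later lookups on the same thread. Doing the unbind in a `finally` block avoids that.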
I'd appreciate your response on this.
I have the same issue. Any updates on this?
@mhewedy, thanks for sharing your workaround, we're starting to use it.
@rwanderc you've picked this up but never got time to work on it, I assume. What's the status here?
For me the workaround did not work. However, I did manage to solve it in a way similar to what @mhewedy did, by changing the configuration on the second SimpleRabbitListenerContainerFactory bean.