aop
Fix - NPE when recovering routers from queue properties.
Motivation
PersistentQueue.routers is still empty when addQueue is called during recovery.
PersistentQueue
amqpQueueProperties.stream().forEach((amqpQueueProperty) -> {
    // recover exchange
    String exchangeName = amqpQueueProperty.getExchangeName();
    Set<String> bindingKeys = amqpQueueProperty.getBindingKeys();
    Map<String, Object> arguments = amqpQueueProperty.getArguments();
    CompletableFuture<AmqpExchange> amqpExchangeCompletableFuture =
            exchangeContainer.asyncGetExchange(namespaceName, exchangeName, false, null);
    amqpExchangeCompletableFuture.whenComplete((amqpExchange, throwable) -> {
        AmqpMessageRouter messageRouter = AbstractAmqpMessageRouter.
                generateRouter(AmqpExchange.Type.value(amqpQueueProperty.getType().toString()));
        messageRouter.setQueue(this);
        messageRouter.setExchange(amqpExchange);
        messageRouter.setArguments(arguments);
        messageRouter.setBindingKeys(bindingKeys);
        amqpExchange.addQueue(this).thenAccept(__ -> routers.put(exchangeName, messageRouter));
    });
});
PersistentExchange
@Override
public CompletableFuture<Void> addQueue(AmqpQueue queue) {
    queues.add(queue);
    if (exchangeType == Type.Direct) {
        for (String bindingKey : queue.getRouter(exchangeName).getBindingKey()) {
            bindingKeyQueueMap.compute(bindingKey, (k, v) -> {
                if (v == null) {
                    Set<AmqpQueue> set = new HashSet<>();
                    set.add(queue);
                    return set;
                } else {
                    v.add(queue);
                    return v;
                }
            });
        }
    }
    updateExchangeProperties();
    return createCursorIfNotExists(queue.getName()).thenApply(__ -> null);
}
NPE
queue.getRouter(exchangeName) is null, because routers.put(exchangeName, messageRouter) only runs after addQueue completes.
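The ordering problem can be reproduced in isolation. The sketch below uses hypothetical, stripped-down stand-ins (`Queue`, `Exchange`, `Router`, and `recover` are illustrative names, not the real aop classes) and shows that registering the router before calling `addQueue` avoids the null dereference. It is only one possible ordering and not necessarily the change made in this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for PersistentQueue / PersistentExchange.
class RecoveryOrderSketch {

    static class Router {
        final Set<String> bindingKeys;
        Router(Set<String> bindingKeys) { this.bindingKeys = bindingKeys; }
    }

    static class Queue {
        final Map<String, Router> routers = new HashMap<>();
        Router getRouter(String exchangeName) { return routers.get(exchangeName); }
    }

    static class Exchange {
        final String name;
        final Map<String, Set<Queue>> bindingKeyQueueMap = new HashMap<>();
        Exchange(String name) { this.name = name; }

        // Mirrors the Direct branch of addQueue: it dereferences the queue's router.
        void addQueue(Queue queue) {
            for (String key : queue.getRouter(name).bindingKeys) {
                bindingKeyQueueMap.computeIfAbsent(key, k -> new HashSet<>()).add(queue);
            }
        }
    }

    // Returns true if recovery completed, false if it hit the NPE.
    static boolean recover(boolean registerRouterFirst) {
        Queue queue = new Queue();
        Exchange exchange = new Exchange("ex1");
        Router router = new Router(Collections.singleton("key1"));
        try {
            if (registerRouterFirst) {
                // Assumed ordering: the router is visible before addQueue looks it up.
                queue.routers.put(exchange.name, router);
                exchange.addQueue(queue);
            } else {
                // Ordering from the recovery code: getRouter returns null inside addQueue.
                exchange.addQueue(queue);
                queue.routers.put(exchange.name, router);
            }
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("router registered after addQueue:  " + recover(false));
        System.out.println("router registered before addQueue: " + recover(true));
    }
}
```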
Modifications
Verifying this change
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
Documentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
- [x] no-need-doc
Can you describe how to reproduce it?
Test steps
- Use a RabbitMQ client to create an exchange and a queue, then bind them.
- Send and receive some messages.
- Restart the AMQP broker.
- Use a RabbitMQ client to connect to the broker and receive messages, without declaring the queue or exchange.
- Use a RabbitMQ client to connect to the broker and send messages, without declaring the queue or exchange.
Expected result
Client can receive messages.
Current result
Client cannot receive messages.
If you add a try/catch to PersistentQueue -> recoverRoutersFromQueueProperties, you will see an NPE at amqpExchange.addQueue(this).thenAccept(__ -> routers.put(exchangeName, messageRouter));
The reason is that PersistentExchange -> addQueue needs PersistentQueue.routers, but PersistentQueue.routers is empty in the recovery logic.
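The NPE stays invisible without a try/catch because an exception thrown inside a `whenComplete` callback is captured by the future that `whenComplete` returns, and the recovery code discards that future. A minimal sketch of this behaviour, with a hypothetical `recoverStep` method standing in for the recovery callback:

```java
import java.util.concurrent.CompletableFuture;

class SwallowedExceptionSketch {

    // Hypothetical stand-in for a recovery callback that hits the NPE.
    static CompletableFuture<String> recoverStep() {
        return CompletableFuture.completedFuture("exchange")
                .whenComplete((value, throwable) -> {
                    String router = null;   // stands in for queue.getRouter(exchangeName)
                    router.length();        // throws NullPointerException
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = recoverStep();
        // Nothing is logged unless the returned future is inspected explicitly.
        result.exceptionally(t -> {
            System.out.println("recovery failed: " + t);
            return null;
        });
    }
}
```

Attaching `exceptionally` (or logging inside the callback) to the returned future is one way to surface such failures during recovery.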
@mingyifei