
Fix - NPE when recovering routers from queue properties.

Open casuallc opened this issue 2 years ago • 2 comments

Motivation

PersistentQueue.routers is still empty when the queue is added to the exchange during recovery.

PersistentQueue

    amqpQueueProperties.stream().forEach((amqpQueueProperty) -> {
        // recover exchange
        String exchangeName = amqpQueueProperty.getExchangeName();
        Set<String> bindingKeys = amqpQueueProperty.getBindingKeys();
        Map<String, Object> arguments = amqpQueueProperty.getArguments();
        CompletableFuture<AmqpExchange> amqpExchangeCompletableFuture =
                exchangeContainer.asyncGetExchange(namespaceName, exchangeName, false, null);
        amqpExchangeCompletableFuture.whenComplete((amqpExchange, throwable) -> {
            AmqpMessageRouter messageRouter = AbstractAmqpMessageRouter.
                    generateRouter(AmqpExchange.Type.value(amqpQueueProperty.getType().toString()));
            messageRouter.setQueue(this);
            messageRouter.setExchange(amqpExchange);
            messageRouter.setArguments(arguments);
            messageRouter.setBindingKeys(bindingKeys);
            amqpExchange.addQueue(this).thenAccept(__ -> routers.put(exchangeName, messageRouter));
        });
    });

PersistentExchange

    @Override
    public CompletableFuture<Void> addQueue(AmqpQueue queue) {
        queues.add(queue);
        if (exchangeType == Type.Direct) {
            for (String bindingKey : queue.getRouter(exchangeName).getBindingKey()) {
                bindingKeyQueueMap.compute(bindingKey, (k, v) -> {
                    if (v == null) {
                        Set<AmqpQueue> set = new HashSet<>();
                        set.add(queue);
                        return set;
                    } else {
                        v.add(queue);
                        return v;
                    }
                });
            }
        }
        updateExchangeProperties();
        return createCursorIfNotExists(queue.getName()).thenApply(__ -> null);
    }

The NPE is thrown because queue.getRouter(exchangeName) returns null: the router is only put into routers after addQueue completes.
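
The ordering problem can be illustrated with a minimal, self-contained sketch. The classes below (Router, Queue, DirectExchange) are simplified, hypothetical stand-ins for the real AMQP classes, and the reordering shown is only one possible way to avoid the null lookup, not necessarily the change made in this pull request.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for PersistentQueue / PersistentExchange.
class RecoveryOrderSketch {

    static class Router {
        final Set<String> bindingKeys;
        Router(Set<String> bindingKeys) { this.bindingKeys = bindingKeys; }
    }

    static class Queue {
        final Map<String, Router> routers = new ConcurrentHashMap<>();
        Router getRouter(String exchangeName) { return routers.get(exchangeName); }
    }

    static class DirectExchange {
        final String name;
        final Map<String, Set<Queue>> bindingKeyQueueMap = new ConcurrentHashMap<>();
        DirectExchange(String name) { this.name = name; }

        // Like the Direct branch of addQueue, this dereferences the queue's router.
        CompletableFuture<Void> addQueue(Queue queue) {
            for (String key : queue.getRouter(name).bindingKeys) {
                bindingKeyQueueMap.computeIfAbsent(key, k -> new HashSet<>()).add(queue);
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    // Order described in the issue: addQueue first, routers.put afterwards.
    static boolean recoverAsReported(Queue queue, DirectExchange exchange, Router router) {
        try {
            exchange.addQueue(queue).thenAccept(v -> queue.routers.put(exchange.name, router));
            return true;
        } catch (NullPointerException e) {
            return false; // getRouter(...) returned null because routers is still empty
        }
    }

    // One possible reordering: register the router, then add the queue.
    static boolean recoverReordered(Queue queue, DirectExchange exchange, Router router) {
        queue.routers.put(exchange.name, router);
        exchange.addQueue(queue).join();
        return exchange.bindingKeyQueueMap.keySet().containsAll(router.bindingKeys);
    }

    public static void main(String[] args) {
        Router router = new Router(Set.of("key"));
        System.out.println("reported order succeeded: "
                + recoverAsReported(new Queue(), new DirectExchange("ex"), router));
        System.out.println("reordered succeeded: "
                + recoverReordered(new Queue(), new DirectExchange("ex"), router));
    }
}
```

In this sketch the first call reports false and the second reports true. A real change would also need to decide what to do with the registered router if addQueue fails afterwards.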

Modifications

Verifying this change

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below and label this PR (if you have committer privilege).

Need to update docs?

  • [x] no-need-doc

casuallc, Feb 22 '23 12:02

Can you describe how to reproduce it?

GhostBoyBoy, Feb 22 '23 12:02

Test steps

  • Use a RabbitMQ client to create an exchange and a queue, then bind them.
  • Send and receive some messages.
  • Restart the AMQP broker.
  • Connect to the broker with a RabbitMQ client and receive messages without declaring the queue or exchange.
  • Connect to the broker with a RabbitMQ client and send messages without declaring the queue or exchange.

Expected result: the client can receive messages.

Actual result: the client cannot receive messages. If you wrap the code in PersistentQueue -> recoverRoutersFromQueueProperties in a try/catch, you will see an NPE at amqpExchange.addQueue(this).thenAccept(__ -> routers.put(exchangeName, messageRouter));.
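
The try/catch is needed because an exception thrown inside a whenComplete callback is captured by the future that whenComplete returns, and nothing is logged unless that future is observed. The following sketch shows this with plain java.util.concurrent; the class and method names are hypothetical, and the empty map lookup merely stands in for queue.getRouter(exchangeName).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a callback failure stays hidden until the derived future is observed.
class SwallowedExceptionSketch {

    // Runs a callback that throws an NPE inside whenComplete and returns the derived future.
    static CompletableFuture<String> recover() {
        Map<String, String> routers = new HashMap<>(); // empty, as during recovery
        CompletableFuture<String> lookup = CompletableFuture.completedFuture("exchange");
        return lookup.whenComplete((exchange, throwable) -> {
            routers.get(exchange).length(); // routers.get(...) is null, so this throws
        });
    }

    // Attaches exceptionally(...) to surface the failure and returns its root cause.
    static Throwable observe() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        recover().exceptionally(t -> {
            // The failure may arrive wrapped in a CompletionException; unwrap it if so.
            seen.set(t instanceof CompletionException && t.getCause() != null ? t.getCause() : t);
            return null;
        });
        return seen.get();
    }

    public static void main(String[] args) {
        recover(); // no stack trace is printed; the NPE lives only in the returned future
        System.out.println("observed: " + observe().getClass().getSimpleName());
    }
}
```

Attaching exceptionally (or logging the throwable in a trailing whenComplete) to the recovery chain would make a failure like this visible in the broker log without a manual try/catch.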

The reason is that PersistentExchange -> addQueue reads PersistentQueue.routers, but PersistentQueue.routers is still empty in the recovery logic. @mingyifei

casuallc, Feb 22 '23 13:02