changqing
changqing
### Motivation Add queue to exchange when recover from topic. Think that - Receive cmd `AmqpChannel -> receiveBasicPublish`. - Get or create queue `QueueContainer -> asyncGetQueue`. - Recover from topic...
Descriptions of the changes in this PR: #2784
### Motivation Release bytebuf when exception occur. ### Modifications ### Verifying this change ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please...
### Motivation New add queue can only receive message that send after queue add to exchange. ### Modifications ### Verifying this change ### Does this pull request potentially affect one...
### Motivation Add permit if receive nack command. The `availablePermits` will reduce when resend message to consumer. ### Modifications ### Verifying this change ### Does this pull request potentially affect...
### Motivation ByteBuf not released. ### Modifications ### Verifying this change ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the...
### Motivation Check if queue is null.  ### Modifications ### Verifying this change ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen,...
### Motivation `PersistentQueue.routers is empty before add queue`. **PersistentQueue** ``` amqpQueueProperties.stream().forEach((amqpQueueProperty) -> { // recover exchange String exchangeName = amqpQueueProperty.getExchangeName(); Set bindingKeys = amqpQueueProperty.getBindingKeys(); Map arguments = amqpQueueProperty.getArguments(); CompletableFuture amqpExchangeCompletableFuture...
Use delay message to implement deliver message at time.
**Reproduce** - create non-paritioned topic - send message to this topic **error**  **probable reason** PulsarKafkaConsumer -> poll ``` public ConsumerRecords poll(long timeoutMillis) { try { QueueItem item = receivedMessages.poll(timeoutMillis,...