eventmesh icon indicating copy to clipboard operation
eventmesh copied to clipboard

RabbitMQ durable error when consuming

Open brampurnot opened this issue 1 year ago • 10 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

Environment

Mac

EventMesh version

master

What happened

I'm the event mesh with rabbitMQ as the store. Producing messages is working fine and it nicely appears in my RabbitMq dashboard. However when I try to consume messages via Apache Event Mesh (HTTP API call), then I'm seeing this error: 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

Initially I had the RabbitMQ exchange configured as durable but I changed it to transient. However the problem is still there. Here is the config in RabbitMQ: Screenshot 2023-11-30 at 14 07 38

And the queue: Screenshot 2023-11-30 at 14 07 47

Anyone faced this issue before? I tried with multiple different settings but still no luck.

How to reproduce

Configure RabbitMQ as store. Consume a message via an HTTP API call.

Debug logs

2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] 2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0]

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

brampurnot avatar Nov 30 '23 13:11 brampurnot

Could you please provide the code for your producer, consumer, and configuration files?

pandaapo avatar Nov 30 '23 14:11 pandaapo

Below you can find the config files: config files.zip

I'm currently trying it out directly in Postman by executing a call to product the message (which works fine) and one for consuming it. It fails at the consuming part.

brampurnot avatar Nov 30 '23 14:11 brampurnot

What is the consumer URL you use in Postman?

pandaapo avatar Nov 30 '23 14:11 pandaapo

https:///eventmesh/subscribe/local

This is the body I'm using: { "url": "https://webhook.site/6ab3a4ba-34a7-47b3-9ffd-fdea2576f2fe", "consumerGroup": "TEST-GROUP", "topic": [ { "mode": "CLUSTERING", "topic": "eventmesh.default", "type": "ASYNC" } ] }

brampurnot avatar Nov 30 '23 14:11 brampurnot

I want to reproduce the error you found, but after my subscription request was sent, it kept reporting the following error. Tracing this error, I found that the request body data could not be obtained by EventMesh. This confused me.

[eventMesh-clientManage-1] ERROR org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService - eventMesh protocol[body] error
java.lang.RuntimeException: eventMesh protocol[body] error
	at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.sendErrorResponse(HandlerService.java:362) [main/:?]
	at org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.handler(LocalSubscribeEventProcessor.java:97) [main/:?]
	at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

The subscription request and request body I set in Postman are the same as yours.

pandaapo avatar Dec 01 '23 17:12 pandaapo

Ok so I got some good news. I was able to get it to work. The exchange/queue wasn't created automatically so I created it mysefl. Something must be wrong when doing so because that's where the error came from.

I discovered that the exchange/queue gets created automatically after sending a first message. And with that exchange/queue, there is no issue. I'm looking through it now to see if I can find out what parameter was causing it but they look identical at the moment.

Bram

brampurnot avatar Dec 01 '23 18:12 brampurnot

My exchange/queue is automatically created, but it reports the error I mentioned above. Now you can run it normally. I want to know if your subscription request in Postman is still the same as before. And what does your publish request in Postman look like?

pandaapo avatar Dec 02 '23 00:12 pandaapo

This is the publish request: curl --location 'https://<k8s-cluster>/eventmesh/publish/eventmesh.default' \ --header 'Content-Type: application/json' \ --header 'Accept: application/json' \ --data '{ "id": "1b080838-2976-493d-897f-07803944f4d4", "specversion": "1.0", "source": "https://demo.synpase.com/workflow-start-event", "type": "com.synapse.demo/workflow-start-event", "datacontenttype": "application/json", "customerid": "CUSTOMER-1", "subject": "TEST-TOPIC", "data": { "customer": { "id": "customer1", "name": "John Doe", "SSN": 123456, "yearlyIncome": 50000, "address": "123 MyLane, MyCity, MyCountry", "employer": "MyCompany" } } }'

Here is the subscription: curl --location 'https://<k8s-cluster>/eventmesh/subscribe/local' \ --header 'Content-Type: application/json' \ --data '{ "url": "https://webhook.site/8cc8f0b6-63c6-45b2-a829-4f2df7a09300", "consumerGroup": "TEST-GROUP", "topic": [ { "mode": "CLUSTERING", "topic": "eventmesh.default", "type": "ASYNC" } ] }'

brampurnot avatar Dec 02 '23 07:12 brampurnot

nobody solve this problem?

9997766 avatar Mar 25 '24 06:03 9997766

It has been 90 days since the last activity on this issue. Apache EventMesh values the voices of the community. Please don't hesitate to share your latest insights on this matter at any time, as the community is more than willing to engage in discussions regarding the development and optimization directions of this feature.

If you feel that your issue has been resolved, please feel free to close it. Should you have any additional information to share, you are welcome to reopen this issue.

github-actions[bot] avatar Jun 23 '24 18:06 github-actions[bot]