kafka-connect-mq-source

Kafka producer task shuts down, but MQ listener is still running

Open • harry7ster opened this issue 3 years ago • 1 comment

I have encountered an issue where the KafkaProducer task failed and shut down, but the MQ listener (MQSourceTask) kept running and consuming messages from MQ. Because those messages are never published to Kafka, they are effectively lost.

Here are the logs from when the producer failed:

```
[Worker-061844fa207a18324] [2022-05-31 12:50:15,912] INFO [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,913] INFO [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,913] ERROR [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,913] ERROR [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-061844fa207a18324] java.lang.IllegalStateException: Cannot perform operation after producer has been closed
[Worker-061844fa207a18324] at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:893)
[Worker-061844fa207a18324] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
[Worker-061844fa207a18324] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:368)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-061844fa207a18324] at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] [Producer clientId=connector-producer-CFAckNackMQtoKafka-connector-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1205)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] App info kafka.producer for connector-producer-CFAckNackMQtoKafka-connector-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[Worker-061844fa207a18324] [2022-05-31 12:50:26,780] INFO [CFAckNackMQtoKafka-connector|task-0|offsets] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
```

harry7ster, Jun 01 '22

Hi team, can anyone address this issue? The message is consumed from IBM MQ but never delivered to Kafka, which violates the "at-least-once" guarantee.
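To make the at-least-once expectation concrete, here is a minimal, self-contained Java sketch. It is not the connector's actual code: `TransactedQueue` and `FakeProducer` are hypothetical stand-ins for a transacted MQ session and the Kafka producer, and `send` is treated as synchronous (a real `KafkaProducer.send()` is asynchronous, and delivery is only confirmed through its callback or `flush()`). The sketch illustrates the ordering the reporters expect: messages taken from MQ are committed on the MQ side only after the downstream send succeeds, and rolled back otherwise, so a closed producer leaves them on the queue instead of losing them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AtLeastOnceSketch {

    /** Hypothetical stand-in for a transacted MQ queue: gets stay provisional until commit(). */
    static class TransactedQueue {
        private final Deque<String> available = new ArrayDeque<>();
        private final List<String> inFlight = new ArrayList<>();

        TransactedQueue(List<String> messages) {
            available.addAll(messages);
        }

        /** Provisionally takes up to n messages under the current unit of work. */
        List<String> getBatch(int n) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < n && !available.isEmpty()) {
                String m = available.pollFirst();
                inFlight.add(m);
                batch.add(m);
            }
            return batch;
        }

        /** Makes the removal of in-flight messages permanent. */
        void commit() {
            inFlight.clear();
        }

        /** Returns in-flight messages to the head of the queue in their original order. */
        void rollback() {
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                available.addFirst(inFlight.get(i));
            }
            inFlight.clear();
        }

        int depth() {
            return available.size();
        }
    }

    /** Hypothetical stand-in for the Kafka producer; rejects sends once closed. */
    static class FakeProducer {
        final List<String> delivered = new ArrayList<>();
        boolean closed = false;

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("Cannot perform operation after producer has been closed");
            }
            delivered.add(record);
        }
    }

    /**
     * Moves one batch from the queue to the producer. The MQ side is committed
     * only after every send succeeded; on failure the batch is rolled back so
     * the messages stay on the queue for a restarted task.
     */
    static boolean transferBatch(TransactedQueue queue, FakeProducer producer, int batchSize) {
        List<String> batch = queue.getBatch(batchSize);
        try {
            for (String m : batch) {
                producer.send(m);
            }
            queue.commit();
            return true;
        } catch (RuntimeException e) {
            queue.rollback();
            return false;
        }
    }
}
```

With this ordering, a failure partway through a batch can cause messages to be delivered again (duplicates), but not dropped, which is exactly what at-least-once permits.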

harry7ster, Jun 10 '22

Any updates? It looks like there are several issues related to this "sticky MQ connection" one. I have requested that an IBM support PMR be opened. This issue has a major impact: it "steals" MQ messages from the source queue until we force the whole Kafka Connect cluster to shut down.

pierre-degrandmaison, May 02 '23

Hello @harry7ster and @pierre-degrandmaison,

I sincerely apologize for the prolonged delay in resolving this issue. I want to assure you that we are actively investigating the problem and working towards a solution. Rest assured, we will provide an update as soon as we are able to resolve it.

Thank you for your patience and understanding.

Joel-hanson, May 19 '23

Hi there, I've been experiencing the same issue. It happens randomly and seems to occur when a batch of messages starts arriving from MQ after a period with no messages. The task then fails, but it recovers and starts pulling messages again; however, it seems to have lost the first messages of the batch (this is just a guess, though).

Here are some logs that might be useful:

```
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,897] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,898] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,898] ERROR [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,899] ERROR [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-046123a3ad0e8e5d2] java.lang.IllegalStateException: Cannot perform operation after producer has been closed
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:893)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:368)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-046123a3ad0e8e5d2] at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,899] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] [Producer clientId=connector-producer-MQ-SOURCE-HOLDS-DP-MQW3-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1205)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,900] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,900] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,900] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,901] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] App info kafka.producer for connector-producer-MQ-SOURCE-HOLDS-DP-MQW3-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:38,147] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:38,152] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:48,837] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:48,842] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:52,058] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:52,064] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:01,230] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:01,244] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:04,564] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:04,583] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:13,671] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:13,711] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:16,959] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:16,976] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:26,069] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:26,072] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,347] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,348] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,348] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,359] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,411] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Finished commitOffsets successfully in 63 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,411] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,412] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,415] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-0} Finished commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:38,468] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
```

cjh-cloud, Jun 12 '23

@Joel-hanson, is it fixed now?

AishD3, Sep 18 '23