Kafka producer task shut down, but MQ listener is still running
I have encountered an issue where the KafkaProducer task failed and shut down, but the MQ listener (MQSourceTask) is still running and consuming messages from MQ. As a result, messages are being lost, because they are never published to Kafka.
Here are the logs from when the producer failed:
[Worker-061844fa207a18324] [2022-05-31 12:50:15,912] INFO [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,913] INFO [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,913] ERROR [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,913] ERROR [CFAckNackMQtoKafka-connector|task-1] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-061844fa207a18324] java.lang.IllegalStateException: Cannot perform operation after producer has been closed
[Worker-061844fa207a18324] at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:893)
[Worker-061844fa207a18324] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
[Worker-061844fa207a18324] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:368)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-061844fa207a18324] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-061844fa207a18324] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-061844fa207a18324] at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] [Producer clientId=connector-producer-CFAckNackMQtoKafka-connector-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1205)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-061844fa207a18324] [2022-05-31 12:50:15,914] INFO [CFAckNackMQtoKafka-connector|task-1] App info kafka.producer for connector-producer-CFAckNackMQtoKafka-connector-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[Worker-061844fa207a18324] [2022-05-31 12:50:26,780] INFO [CFAckNackMQtoKafka-connector|task-0|offsets] WorkerSourceTask{id=CFAckNackMQtoKafka-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
Hi team, can anyone address this issue? The message is consumed from IBM MQ but never delivered to Kafka, which violates the at-least-once delivery guarantee.
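To make the at-least-once concern above concrete, here is a minimal, self-contained Java sketch. It does not use the MQ source connector's or Kafka Connect's API: `AtLeastOnceBridge`, `Sender`, `transferSafely`, and `transferUnsafely` are hypothetical names for a simplified model in which an in-memory deque stands in for the MQ queue and a list stands in for the Kafka topic. It contrasts removing a message from the queue only after Kafka has acknowledged it with removing it first and sending afterwards, which matches the symptom described in this issue (the source side keeps consuming after the producer has been closed).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of an MQ-to-Kafka bridge.
// This is NOT the MQ source connector's or Kafka Connect's API.
public class AtLeastOnceBridge {

    /** Stand-in for the Kafka producer: returns true only if the record was acknowledged. */
    public interface Sender {
        boolean send(String message);
    }

    /** Stand-in for the MQ queue: messages still available to be read. */
    final Deque<String> queue = new ArrayDeque<>();

    /** Stand-in for the Kafka topic: messages the sender acknowledged. */
    final List<String> delivered = new ArrayList<>();

    /** At-least-once ordering: a message leaves the queue only after Kafka acknowledges it. */
    void transferSafely(Sender sender) {
        while (!queue.isEmpty()) {
            String message = queue.peekFirst();
            if (!sender.send(message)) {
                // Stop on failure; the message stays on the queue for redelivery.
                return;
            }
            delivered.add(message);
            queue.pollFirst();
        }
    }

    /** Unsafe ordering: the message leaves the queue before delivery is confirmed. */
    void transferUnsafely(Sender sender) {
        while (!queue.isEmpty()) {
            String message = queue.pollFirst();
            if (sender.send(message)) {
                delivered.add(message);
            }
            // On failure the loop keeps consuming, so the message ends up in neither place.
        }
    }

    public static void main(String[] args) {
        for (boolean safe : new boolean[] {true, false}) {
            int[] calls = {0};
            // Acknowledges the first send, then behaves like a closed producer.
            Sender closedAfterFirst = message -> ++calls[0] <= 1;

            AtLeastOnceBridge bridge = new AtLeastOnceBridge();
            bridge.queue.addAll(List.of("m1", "m2", "m3"));
            if (safe) {
                bridge.transferSafely(closedAfterFirst);
            } else {
                bridge.transferUnsafely(closedAfterFirst);
            }
            System.out.println((safe ? "safe" : "unsafe")
                    + ": delivered=" + bridge.delivered
                    + ", still on queue=" + bridge.queue);
        }
    }
}
```

Running `main` prints `safe: delivered=[m1], still on queue=[m2, m3]` followed by `unsafe: delivered=[m1], still on queue=[]`: with the unsafe ordering, `m2` and `m3` are gone from the queue but never reached Kafka.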
Any updates? It looks like there are several issues related to this "sticky MQ connection" one. I have requested that an IBM support PMR be opened; this issue has a major impact because it "steals" MQ messages from the source queue until we force the whole Kafka Connect cluster to shut down.
Hello @harry7ster and @pierre-degrandmaison,
I sincerely apologize for the prolonged delay in resolving this issue. We are actively investigating the problem and working towards a solution, and we will provide an update as soon as we are able to resolve it.
Thank you for your patience and understanding.
Hi there, I've been experiencing the same issue. It happens randomly, and it seems to occur when the connector starts receiving a batch of messages from MQ after not receiving anything for some time. The task then fails but recovers and begins pulling messages again; however, it seems to have lost the first messages of the batch (this is just a guess, though).
Here are some logs that might be useful:
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,897] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,898] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,898] ERROR [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,899] ERROR [MQ-SOURCE-HOLDS-DP-MQW3|task-1] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-046123a3ad0e8e5d2] java.lang.IllegalStateException: Cannot perform operation after producer has been closed
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:893)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:368)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-046123a3ad0e8e5d2] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-046123a3ad0e8e5d2] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-046123a3ad0e8e5d2] at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,899] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] [Producer clientId=connector-producer-MQ-SOURCE-HOLDS-DP-MQW3-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1205)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,900] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,900] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,900] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:37,901] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] App info kafka.producer for connector-producer-MQ-SOURCE-HOLDS-DP-MQW3-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:38,147] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:38,152] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:48,837] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:48,842] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:52,058] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:42:52,064] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:01,230] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:01,244] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:04,564] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:04,583] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:13,671] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:13,711] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:16,959] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:16,976] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:26,069] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:26,072] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,347] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,348] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,348] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,359] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,411] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-1} Finished commitOffsets successfully in 63 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,411] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,412] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:29,415] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-0|offsets] WorkerSourceTask{id=MQ-SOURCE-HOLDS-DP-MQW3-0} Finished commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-046123a3ad0e8e5d2] [2023-06-08 14:43:38,468] INFO [MQ-SOURCE-HOLDS-DP-MQW3|task-1] Polling for records (com.ibm.eventstreams.connect.mqsource.MQSourceTask:120)
@Joel-hanson, is this fixed now?