kafka-connect-storage-cloud
Data loss when tasks write to same object during rebalance
We use Kafka Connect in distributed mode with this S3 sink connector, with the roll setting rotate.schedule.interval.ms=1800000 (30 min).
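For context, the relevant connector settings look roughly like this; apart from rotate.schedule.interval.ms, which is quoted above, the values are illustrative placeholders rather than our exact configuration:

    name=foobar_events_v1
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=foobar-events
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    s3.bucket.name=<our-bucket>
    s3.compression.type=gzip
    flush.size=100000
    rotate.schedule.interval.ms=1800000
    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    partition.duration.ms=3600000
    path.format=YYYYMMddHH
    locale=en-US
    timezone=UTC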
Setup
- Kafka connect: Confluent version 4.1.1 and now 5.1.2 (Kafka 1.1.1 and now Kafka 2.1.1)
- Kafka brokers: version 0.11 and now 1.1.1
- 100+ S3-sink connectors, 1500 tasks in total
- 7-18 EC2 nodes, auto-scaling
Issue
In certain, rather rare circumstances, often when a rebalance occurs during rolling, we get data loss. Since we only have INFO logs from these events, we have a hard time tracing the exact cause.
Scenario 1
Basically this is one of the scenarios:
t1: writes to S3 offsets 0-30 to object foo
t1: commits synchronously to Kafka: next offset 31
t2: writes to S3 offsets 0-20 to the same object foo
t2: commits asynchronously to Kafka: next offset 20
t2: commit failed
So in S3 we have offsets 0-20, but the next iteration will start at 31: we have data loss for offsets 21-30.
We haven't really understood why t2 starts at offset 0 when it should start at the offset committed by t1. Maybe the t1 commit hadn't completed when t2 started?
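To make "same object" concrete: the object key encodes the topic, the partition, and the first offset in the file, so two tasks that both start writing from the same offset target the same key, and the later upload replaces the earlier one. A hypothetical sketch of the naming, not the connector's actual code:

    // Hypothetical helper mirroring the key layout we see in the S3 listings below.
    public class ObjectKeySketch {
        static String objectKey(String prefix, String topic, int partition, long startOffset, String ext) {
            // <prefix>/<topic>+<partition>+<zero-padded start offset><extension>
            return String.format("%s/%s+%d+%010d%s", prefix, topic, partition, startOffset, ext);
        }

        public static void main(String[] args) {
            System.out.println(objectKey("2019042417", "foobar-events", 0, 708477, ".json.gz"));
            // prints: 2019042417/foobar-events+0+0000708477.json.gz
        }
    }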
These are the actual log events; here there are three tasks writing to the same object:
2019-04-24 16:00:11,621 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708250
2019-04-24 16:00:11,621 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=foobar_events_v1-2} Committing offsets asynchronously using sequence number 69: {foobar-events-0=OffsetAndMetadata{offset=708250, leaderEpoch=null, metadata=''}}
2019-04-24 17:00:02,539 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708477
2019-04-24 17:00:06,211 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:332 - WorkerSinkTask{id=foobar_events_v1-3} Committing offsets synchronously using sequence number 48: {foobar-events-0=OffsetAndMetadata{offset=708477, leaderEpoch=null, metadata=''}}
2019-04-24 17:00:23,316 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708447
2019-04-24 17:00:31,836 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=foobar_events_v1-3} Committing offsets asynchronously using sequence number 66: {foobar-events-2=OffsetAndMetadata{offset=864414, leaderEpoch=null, metadata=''}, foobar-events-1=OffsetAndMetadata{offset=704295, leaderEpoch=null, metadata=''}, foobar-events-0=OffsetAndMetadata{offset=708447, leaderEpoch=null, metadata=''}}
2019-04-24 17:01:04,066 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708389
2019-04-24 17:01:12,604 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=foobar_events_v1-0} Committing offsets asynchronously using sequence number 71: {foobar-events-3=OffsetAndMetadata{offset=957470, leaderEpoch=null, metadata=''}, foobar-events-2=OffsetAndMetadata{offset=864349, leaderEpoch=null, metadata=''}, foobar-events-1=OffsetAndMetadata{offset=704243, leaderEpoch=null, metadata=''}, foobar-events-0=OffsetAndMetadata{offset=708389, leaderEpoch=null, metadata=''}, foobar-events-5=OffsetAndMetadata{offset=726798, leaderEpoch=null, metadata=''}, foobar-events-4=OffsetAndMetadata{offset=704686, leaderEpoch=null, metadata=''}}
2019-04-24 17:01:12,604 ERROR org.apache.kafka.connect.runtime.WorkerSinkTask:261 - WorkerSinkTask{id=foobar_events_v1-0} Commit of offsets threw an unexpected exception for sequence number 71: {foobar-events-3=OffsetAndMetadata{offset=957470, leaderEpoch=null, metadata=''}, foobar-events-2=OffsetAndMetadata{offset=864349, leaderEpoch=null, metadata=''}, foobar-events-1=OffsetAndMetadata{offset=704243, leaderEpoch=null, metadata=''}, foobar-events-0=OffsetAndMetadata{offset=708389, leaderEpoch=null, metadata=''}, foobar-events-5=OffsetAndMetadata{offset=726798, leaderEpoch=null, metadata=''}, foobar-events-4=OffsetAndMetadata{offset=704686, leaderEpoch=null, metadata=''}}
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:353)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:364)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:433)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:210)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
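The two settings named in that exception are plain consumer properties; on a Connect worker they can be applied to all sink-task consumers via the consumer. prefix in the worker properties. Just as an illustration of where they would go (the values are placeholders, not what we actually run):

    consumer.max.poll.interval.ms=600000
    consumer.max.poll.records=200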
In S3 we have the relevant objects for this partition:
2019042416/foobar-events+0+0000708250.json.gz
Last modified: 2019-04-24 17:01:04
Contains offsets: 708250-708388
2019042417/foobar-events+0+0000708477.json.gz
Last modified: 2019-04-24 17:30:24
Contains offsets: 708477-708586
Missing offsets: 708389-708476
Scenario 2
Here's another, rather different scenario, although with the same effect:
t1: writes to S3 offsets 0-30
t2: writes to S3 offsets 0-20
t2: commits asynchronously to Kafka: next offset 20
t1: commits synchronously to Kafka: next offset 31
t2: commit timeout
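For reference, the async commit that t2 issues here is the plain KafkaConsumer commitAsync path: if the commit fails or times out, the exception is only reported to the callback and the consumer does not retry it, so the group's committed offset stays at whatever the last successful commit (t1's synchronous one) wrote. A rough standalone sketch of that pattern, not Connect's actual code, with placeholder broker address and group id:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class AsyncCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("group.id", "connect-baz_events_v1");     // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                        new TopicPartition("baz-events", 34), new OffsetAndMetadata(6711912L));
                // t2's path in Scenario 2: the result only surfaces in the callback;
                // a failed or timed-out async commit is not retried by the consumer.
                consumer.commitAsync(offsets, (committed, exception) -> {
                    if (exception != null) {
                        System.err.println("Async commit failed: " + exception);
                    }
                });
            }
        }
    }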
Log:
2019-05-24 21:30:14,191 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for baz-events-34 is 6719227
2019-05-24 21:30:33,673 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for baz-events-34 is 6711912
2019-05-24 21:30:38,380 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=baz_events_v1-6} Committing offsets asynchronously using sequence number 40: {baz-events-34=OffsetAndMetadata{offset=6711912, leaderEpoch=null, metadata=''}}
2019-05-24 21:30:42,741 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:332 - WorkerSinkTask{id=baz_events_v1-0} Committing offsets synchronously using sequence number 12: {baz-events-34=OffsetAndMetadata{offset=6719227, leaderEpoch=null, metadata=''}}
2019-05-24 21:31:11,849 WARN org.apache.kafka.connect.runtime.WorkerSinkTask:219 - WorkerSinkTask{id=baz_events_v1-6} Commit of offsets timed out
In S3 we have the relevant objects for this partition:
2019052421/baz-events+34+0006711429.json.gz
Last modified: 2019-05-24 21:30:31
Contains offsets: 6711429-6711911
2019052421/baz-events+34+0006719227.json.gz
Last modified: 2019-05-24 22:00:01
Contains offsets: 6719227-6721298
Missing offsets: 6711912-6719226
We can restore all events that have gone missing from another system. We have also enabled S3 versioning, which should let us restore the objects easily, but we are looking for a solution that requires less maintenance. I can submit more log events upon request.
Thanks for reporting @RickardCardell! Based on the versions used, do you think you might have been hitting https://issues.apache.org/jira/browse/KAFKA-5731? Is this still an issue with AK 1.1.1?
I understand that debug logs might be too voluminous over a long period of time, but maybe you could enable them selectively for specific classes, such as org.apache.kafka.connect.runtime.WorkerSinkTask or others in org.apache.kafka.connect.runtime.distributed.
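For example, in the worker's connect-log4j.properties (the exact file location depends on your deployment), that could look like:

    log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG
    log4j.logger.org.apache.kafka.connect.runtime.distributed=DEBUG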
This is an initial thought. I'll keep looking based on the logs you've shared.
KAFKA-5731 seems resolved for the version we use, but if it weren't resolved I would have said it's a likely cause :)
We have already enabled DEBUG logging, but there have been no data gaps since it was enabled. We enabled it for org.apache.kafka.connect.runtime.WorkerSinkTask and io.confluent.connect.s3.TopicPartitionWriter, but I will look into whether we can add some more, e.g. some classes in org.apache.kafka.connect.runtime.distributed. We have made some efforts to trigger the error, but haven't succeeded.
Thanks for the pointers!
Also I updated scenario 2 since I found that the async commit timed out.
I am having an issue of losing data when sending to S3 buckets. I have no exceptions in the logs, except for the WARN message "Commit of offsets timed out". How can I figure out what is causing the loss of data in the S3 connectors?