kafka-connect-storage-cloud

Data loss when tasks write to same object during rebalance

Open · RickardCardell opened this issue 5 years ago • 3 comments

We use Kafka Connect in distributed mode and this S3 sink, with the roll setting rotate.schedule.interval.ms=1800000 (30 min).

Setup

  • Kafka Connect: Confluent version 4.1.1 and now 5.1.2 (Kafka 1.1.1 and now Kafka 2.1.1)
  • Kafka brokers: version 0.11 and now 1.1.1
  • 100+ S3 sink connectors, 1500 tasks in total
  • 7-18 EC2 nodes, auto-scaling

Issue

In certain, rather rare circumstances, often when a rebalance occurs while files are being rolled, we get data loss. Since we only have INFO logs from these events, we have a hard time tracing the exact cause.

Scenario 1

Basically this is one of the scenarios:

t1: writes to S3 offsets 0-30 to object foo
t1: commits synchronously to Kafka: next offset 31
t2: writes to S3 offsets 0-20 to the same object foo
t2: commits asynchronously to Kafka: next offset 20
t2: commit failed

So in S3 we have offsets 0-20, but the next iteration will start at 31. We have data loss between 21 and 30.

We haven't really understood why t2 starts at offset 0 when it should start at the offset committed by t1. Maybe the t1 commit hadn't completed when t2 started?
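
If it helps to see why such an overwrite would be silent: the object names further down suggest the sink keys each file by topic, partition and the first offset of the flushed batch, so two tasks flushing batches that start at the same offset would target the same key, and the later S3 PUT would simply replace the earlier object. A minimal sketch of that naming, assuming the layout we see in our bucket (this is not the connector's actual code):

```java
// Sketch of the object naming we observe, e.g. 2019042417/foobar-events+0+0000708477.json.gz:
// prefix/topic+partition+startOffset.ext. Two flushes that start at the same offset
// produce the same key, so the later PUT overwrites the earlier object.
public class S3KeyNamingSketch {

    static String objectKey(String prefix, String topic, int partition, long startOffset) {
        return String.format("%s/%s+%d+%010d.json.gz", prefix, topic, partition, startOffset);
    }

    public static void main(String[] args) {
        // t1 flushes offsets 0-30, t2 (still holding its old assignment) later flushes 0-20:
        String t1Key = objectKey("2019042417", "foobar-events", 0, 0L);
        String t2Key = objectKey("2019042417", "foobar-events", 0, 0L);
        System.out.println(t1Key.equals(t2Key)); // true -> t2's smaller object replaces t1's
    }
}
```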

These are the actual log events; here there are 3 tasks writing to the same object:

2019-04-24 16:00:11,621  INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708250
2019-04-24 16:00:11,621  INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=foobar_events_v1-2} Committing offsets asynchronously using sequence number 69: {foobar-events-0=OffsetAndMetadata{offset=708250, leaderEpoch=null, metadata=''}}
2019-04-24 17:00:02,539  INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708477
2019-04-24 17:00:06,211  INFO org.apache.kafka.connect.runtime.WorkerSinkTask:332 - WorkerSinkTask{id=foobar_events_v1-3} Committing offsets synchronously using sequence number 48: {foobar-events-0=OffsetAndMetadata{offset=708477, leaderEpoch=null, metadata=''}}
2019-04-24 17:00:23,316  INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708447
2019-04-24 17:00:31,836  INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=foobar_events_v1-3} Committing offsets asynchronously using sequence number 66: {foobar-events-2=OffsetAndMetadata{offset=864414, leaderEpoch=null, metadata=''}, foobar-events-1=OffsetAndMetadata{offset=704295, leaderEpoch=null, metadata=''}, foobar-events-0=OffsetAndMetadata{offset=708447, leaderEpoch=null, metadata=''}}
2019-04-24 17:01:04,066  INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for foobar-events-0 is 708389
2019-04-24 17:01:12,604  INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=foobar_events_v1-0} Committing offsets asynchronously using sequence number 71: {foobar-events-3=OffsetAndMetadata{offset=957470, leaderEpoch=null, metadata=''}, foobar-events-2=OffsetAndMetadata{offset=864349, leaderEpoch=null, metadata=''}, foobar-events-1=OffsetAndMetadata{offset=704243, leaderEpoch=null, metadata=''}, foobar-events-0=OffsetAndMetadata{offset=708389, leaderEpoch=null, metadata=''}, foobar-events-5=OffsetAndMetadata{offset=726798, leaderEpoch=null, metadata=''}, foobar-events-4=OffsetAndMetadata{offset=704686, leaderEpoch=null, metadata=''}}
2019-04-24 17:01:12,604 ERROR org.apache.kafka.connect.runtime.WorkerSinkTask:261 - WorkerSinkTask{id=foobar_events_v1-0} Commit of offsets threw an unexpected exception for sequence number 71: {foobar-events-3=OffsetAndMetadata{offset=957470, leaderEpoch=null, metadata=''}, foobar-events-2=OffsetAndMetadata{offset=864349, leaderEpoch=null, metadata=''}, foobar-events-1=OffsetAndMetadata{offset=704243, leaderEpoch=null, metadata=''}, foobar-events-0=OffsetAndMetadata{offset=708389, leaderEpoch=null, metadata=''}, foobar-events-5=OffsetAndMetadata{offset=726798, leaderEpoch=null, metadata=''}, foobar-events-4=OffsetAndMetadata{offset=704686, leaderEpoch=null, metadata=''}}
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:353)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:364)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:433)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

In S3 we have the relevant objects for this partition:

2019042416/foobar-events+0+0000708250.json.gz
Last modified: 2019-04-24 17:01:04
Contains offsets: 708250-708388

2019042417/foobar-events+0+0000708477.json.gz
Last modified: 2019-04-24 17:30:24
Contains offsets: 708477-708586

Missing offsets: 708389-708476
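
For reference, this is roughly how we derive the "missing offsets" ranges: we list the objects for a partition, take the start offset from each key and the last offset from the object contents, and look for holes between consecutive objects. A hypothetical helper sketching that check (the ObjectRange type and the hard-coded values are just illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class OffsetGapCheck {

    /** One entry per S3 object of a single topic partition, sorted by start offset. */
    static class ObjectRange {
        final String key;
        final long firstOffset;
        final long lastOffset;

        ObjectRange(String key, long firstOffset, long lastOffset) {
            this.key = key;
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    /** Prints any offset range not covered by consecutive objects. */
    static void reportGaps(List<ObjectRange> ranges) {
        for (int i = 1; i < ranges.size(); i++) {
            long expectedNext = ranges.get(i - 1).lastOffset + 1;
            long actualNext = ranges.get(i).firstOffset;
            if (actualNext > expectedNext) {
                System.out.printf("Missing offsets: %d-%d (between %s and %s)%n",
                        expectedNext, actualNext - 1, ranges.get(i - 1).key, ranges.get(i).key);
            }
        }
    }

    public static void main(String[] args) {
        reportGaps(Arrays.asList(
                new ObjectRange("2019042416/foobar-events+0+0000708250.json.gz", 708250L, 708388L),
                new ObjectRange("2019042417/foobar-events+0+0000708477.json.gz", 708477L, 708586L)));
        // Prints: Missing offsets: 708389-708476 (...)
    }
}
```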

Scenario 2

Here's another, rather different scenario, although with the same effect:

t1: writes to S3 offsets 0-30
t2: writes to S3 offsets 0-20
t2: commits asynchronously to Kafka: next offset 20
t1: commits synchronously to Kafka: next offset 31
t2: commit timeout

Log:

2019-05-24 21:30:14,191 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for baz-events-34 is 6719227
2019-05-24 21:30:33,673 INFO io.confluent.connect.s3.TopicPartitionWriter:491 - Files committed to S3. Target commit offset for baz-events-34 is 6711912
2019-05-24 21:30:38,380 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:346 - WorkerSinkTask{id=baz_events_v1-6} Committing offsets asynchronously using sequence number 40: {baz-events-34=OffsetAndMetadata{offset=6711912, leaderEpoch=null, metadata=''}}
2019-05-24 21:30:42,741 INFO org.apache.kafka.connect.runtime.WorkerSinkTask:332 - WorkerSinkTask{id=baz_events_v1-0} Committing offsets synchronously using sequence number 12: {baz-events-34=OffsetAndMetadata{offset=6719227, leaderEpoch=null, metadata=''}}
2019-05-24 21:31:11,849 WARN org.apache.kafka.connect.runtime.WorkerSinkTask:219 - WorkerSinkTask{id=baz_events_v1-6} Commit of offsets timed out

In S3 we have the relevant objects for this partition:

2019052421/baz-events+34+0006711429.json.gz
Last modified: 2019-05-24 21:30:31
Contains offsets: 6711429-6711911

2019052421/baz-events+34+0006719227.json.gz
Last modified: 2019-05-24 22:00:01
Contains offsets: 6719227-6721298

Missing offsets: 6711912-6719226
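
To make the two commit paths in this scenario concrete, here is a rough illustration using the plain KafkaConsumer API rather than the WorkerSinkTask code. In reality the two commits come from two different tasks in the same group; showing both calls on one consumer is just a way to contrast the semantics (broker address and group id are placeholders):

```java
// commitAsync is fire-and-forget and only reports failure later via its callback,
// while commitSync blocks until the broker accepts the offset. A stale task's async
// commit can therefore fail or time out after another task has already committed a
// higher offset, leaving the group offset ahead of the data the stale task wrote to S3.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitPathsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "commit-paths-demo");        // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("baz-events", 34);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));

            // What t2 effectively does: hand off the commit and move on; the outcome
            // (here, a timeout) only surfaces in the callback much later.
            consumer.commitAsync(
                    Collections.singletonMap(tp, new OffsetAndMetadata(6711912L)),
                    (offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("async commit failed: " + exception);
                        }
                    });

            // What t1 effectively does: block until the broker has the offset, so after
            // this returns the group offset is 6719227 regardless of what happened to
            // t2's commit or to the object t2 wrote.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(6719227L)));
        }
    }
}
```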

We can restore all events that have gone missing from another system. We have also now enabled S3 versioning, which should let us restore the objects easily, but we are looking for a solution that requires less maintenance. I can submit more log events upon request.

RickardCardell · Jun 11 '19 14:06

Thanks for reporting @RickardCardell! Based on the versions used, do you think you might have been hitting https://issues.apache.org/jira/browse/KAFKA-5731? Is this still an issue with AK 1.1.1?

I understand that debug logs might be too voluminous over a long period of time, but maybe you could enable them selectively for certain classes, such as org.apache.kafka.connect.runtime.WorkerSinkTask, or others in org.apache.kafka.connect.runtime.distributed.

This is an initial thought. I'll keep looking based on the logs you've shared.

kkonstantine · Jun 11 '19 19:06

KAFKA-5731 seems resolved for the version we use, but if it weren't resolved I would have said it's a likely cause :)

We have already enabled DEBUG logging, but there have been no data gaps since it was enabled. We enabled it for org.apache.kafka.connect.runtime.WorkerSinkTask and io.confluent.connect.s3.TopicPartitionWriter, but I will look into whether we can add some more, e.g. some classes in org.apache.kafka.connect.runtime.distributed. We have made some efforts to trigger the error but haven't succeeded. Thanks for the pointers!

Also I updated scenario 2 since I found that the async commit timed out.

RickardCardell · Jun 12 '19 10:06

I am having an issue with losing data when sending to S3 buckets. I have no exceptions in the logs, except for the WARN message "Commit of offsets timed out". How can I figure out what is causing the data loss in S3 connectors?

sxganapa · Aug 21 '20 20:08