
Records are missing when a second rebalance happens.

Open MrKustra94 opened this issue 2 years ago • 6 comments

Hey, during deployments we have noticed a strange issue with record consumption.

Versions

FS2-Kafka version = 3.0.0-M7 (also observed with M4)
Kafka client version = 3.2.0

Background

Let's assume that we are working with a topic called topic, which has 100 partitions. Our consumer is running as a single pod (pod-1), consuming all 100 partitions. During a Kubernetes rolling deployment another instance is created; let's call it pod-2. A first rebalance is triggered, after which pod-1 and pod-2 consume 50 partitions each:

pod-1 is consuming partitions 0, 1, 2, 3, ..., 49
pod-2 is consuming partitions 50, 51, 52, 53, ..., 99

After a few seconds, pod-1 is shut down and pod-2 is the only working pod. A second rebalance is triggered and pod-2 now consumes all partitions. What we have noticed is that for partitions which were previously assigned, some messages are not consumed just after the second re-assignment. More examples below.

Code

This issue is really hard to reproduce. It happens very rarely.

import scala.concurrent.duration._

import cats.effect.Sync
import cats.syntax.all._
import fs2.Stream
import fs2.kafka._

val settings =
  ConsumerSettings[F, Option[String], Array[Byte]](
    keyDeserializer = Deserializer[F, Option[String]],
    valueDeserializer = Deserializer[F, Array[Byte]]
  ).withBootstrapServers("<configuration>")
   .withGroupId("TestGroup")
   .withAutoOffsetReset(AutoOffsetReset.Earliest)
   .withClientId("TestClientId")
   .withEnableAutoCommit(false)
   .withPollInterval(50.millis)
   // Important note: the following assignment strategies are included:
   // partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
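
For reference, a minimal sketch of how that assignment strategy list could be pinned explicitly on the settings; withProperty and the ConsumerConfig constant are standard APIs, and choosing a single assignor here is only an illustration:

import org.apache.kafka.clients.consumer.ConsumerConfig

// Illustration only: pin a single assignor instead of the mixed
// [RangeAssignor, CooperativeStickyAssignor] list, so the rebalance
// protocol in play is unambiguous.
val pinnedSettings =
  settings.withProperty(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
  )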

def recordProcessor(
  records: Stream[F, CommittableConsumerRecord[F, Option[String], Array[Byte]]]
): Stream[F, CommittableOffset[F]] =
  records.evalMap { record =>
    val recordFormatted =
      s"Published record: ${record.record.topic}-${record.record.partition}:${record.record.offset}"
    Sync[F].delay(println(recordFormatted)) >> Sync[F].pure(record.offset)
  }

KafkaConsumer
  .stream(settings)
  .evalTap(_.subscribeTo("topic"))
  .flatMap(_.partitionedStream.map(recordProcessor))
  .map(_
    .through(
      _.groupWithin(100, 50.millis)
        .evalMap(CommittableOffsetBatch.fromFoldable(_).commit)
    )
  )
  .parJoinUnbounded
  .scope

Example output showing the issue

//pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}
//pod-1 has been shutdown. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}
Published record: topic-59:1000037 // The offset was not moved back to 1000032, but it should have been, since the Range assignor (EAGER protocol) revokes and re-assigns the partition, so consumption should restart from the committed offset.
// Commit happens, setting offset to 1000038.
Published record: topic-59:1000040 //Missed records: 1000038, 1000039, which exist in the topic but were skipped by consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042

MrKustra94 avatar Aug 09 '22 10:08 MrKustra94

One more observation: this issue appears when a partition has been paused just before the rebalance. It does not occur for all partitions, but whenever it does happen, the affected partition was paused just before the rebalance.

MrKustra94 avatar Aug 12 '22 07:08 MrKustra94

I think that it may be related: https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-13463

MrKustra94 avatar Aug 16 '22 20:08 MrKustra94

I think I found the potential issue. I will try to clarify it with a few examples.

I've noticed that the above issues appear when some of the partitions were paused before the rebalance. In Kafka 3.2.0 there is a special INFO message that logs them:

INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] The pause flag in partitions [topic-59, topic-67, ...] will be removed due to revocation.

Those partitions were then re-assigned, but if we take a look at the example again:

//pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}
//pod-1 has been shutdown. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}
Published record: topic-59:1000037 // The offset was not moved back to 1000032, but it should have been, since the Range assignor (EAGER protocol) revokes and re-assigns the partition, so consumption should restart from the committed offset.
// Commit happens, setting offset to 1000038. I think these records were simply delivered late because of the many concurrent streams going on. That's why they appear as if they were delivered after the rebalance.
Published record: topic-59:1000040 //Missed records: 1000038, 1000039, which exist in the topic but were skipped by consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042

Fetching for those partitions after the rebalance did not start from 1000032 but from 1000040. This is caused by the issue described here: https://issues.apache.org/jira/browse/KAFKA-13463

What I think happened is that the missing records were dropped too eagerly during revocation, as part of the callback defined in KafkaConsumerActor. They might have ended up there because poll returned them even though the partitions were paused. I prepared a potential MR with a fix for that.

MrKustra94 avatar Aug 18 '22 08:08 MrKustra94

@MrKustra94 nice research! Thank you.

Also, I wonder why we need these pause calls inside fs2-kafka. Maybe we could just get rid of them? Just use poll and that's it. What do you think @bplommer @vlovgr?

LMnet avatar Aug 18 '22 08:08 LMnet

I think it was introduced to improve general fairness and throughput with slow consumers, so that each partition is consumed in a fair fashion: if there is an outstanding fetch request and a record for it is already available, we simply skip fetching new records for that partition and give the other partitions a chance. I see that it is quite a popular approach in Java/Scala wrappers around the Java Kafka client.
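
Roughly, the pattern is the one sketched below on the raw Java consumer (only an illustration of the idea, not fs2-kafka's actual internals; the buffers structure and the names are made up):

import java.time.Duration
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical sketch of pause-based fairness: `buffers` holds records already
// handed to (possibly slow) downstream processors, keyed by partition.
def pollFairly[K, V](
  consumer: KafkaConsumer[K, V],
  buffers: mutable.Map[TopicPartition, Vector[ConsumerRecord[K, V]]]
): Unit = {
  val assigned = consumer.assignment().asScala.toSet
  // Pause partitions that still have undrained records and resume the rest,
  // so the next poll only fetches for partitions that are ready for more data.
  val (busy, ready) = assigned.partition(tp => buffers.get(tp).exists(_.nonEmpty))
  consumer.pause(busy.asJava)
  consumer.resume(ready.asJava)

  val polled = consumer.poll(Duration.ofMillis(50))
  polled.partitions().asScala.foreach { tp =>
    buffers.update(tp, buffers.getOrElse(tp, Vector.empty) ++ polled.records(tp).asScala.toVector)
  }
}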

MrKustra94 avatar Aug 18 '22 08:08 MrKustra94

I know this issue is quite old now, but I think we're facing the same problem. We added logging that shows the last offset processed when a partition is revoked; later on, when the partition arrives back at the same node, the first offset processed is some amount ahead of that - and no commits were performed in between.
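
Roughly, the check we added looks like the sketch below (an illustration rather than our exact code; the Ref-based tracking and the names are made up):

import cats.effect.{Ref, Sync}
import cats.syntax.all._
import fs2.kafka.CommittableConsumerRecord
import org.apache.kafka.common.TopicPartition

// Illustrative sketch: warn whenever the offset of the next processed record
// jumps past lastSeen + 1 for its partition, i.e. records appear to be skipped.
def logOffsetGaps[F[_]: Sync, K, V](
  lastSeen: Ref[F, Map[TopicPartition, Long]]
)(record: CommittableConsumerRecord[F, K, V]): F[Unit] = {
  val tp     = new TopicPartition(record.record.topic, record.record.partition)
  val offset = record.record.offset
  lastSeen.modify { seen =>
    val gap = seen.get(tp).collect { case prev if offset > prev + 1 => (prev, offset) }
    (seen.updated(tp, offset), gap)
  }.flatMap {
    case Some((prev, next)) =>
      Sync[F].delay(println(s"Offset gap on $tp: last processed $prev, next processed $next"))
    case None =>
      Sync[F].unit
  }
}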

We also see lots of FS2 Kafka logs similar to the OP's about pausing/revoking, etc.

simonpetty avatar Dec 11 '23 12:12 simonpetty