alpakka-kafka icon indicating copy to clipboard operation
alpakka-kafka copied to clipboard

Default PartitionAssignmentHandler behaviour on revoke+assing in transactional

Open szymonm opened this issue 4 years ago • 5 comments

Short description

SingleSourceLogic clears the buffer of accumulated records of revoked partitions only after new partitions are assigned. I think that this causes blocking onRevoke callback to wait for all the records in the buffer to be drained. While this approach may work in certain cases, it is not universal and I propose to add a configuration option to allow dropping records just after receiving onRevoke.

Details

Looking into the following code from SingleSourceLogic:

override protected def addToPartitionAssignmentHandler(
      handler: PartitionAssignmentHandler
  ): PartitionAssignmentHandler = {
    val flushMessagesOfRevokedPartitions: PartitionAssignmentHandler = new PartitionAssignmentHandler {
      private var lastRevoked = Set.empty[TopicPartition]

      override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        lastRevoked = revokedTps

      override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        filterRevokedPartitionsCB.invoke(lastRevoked -- assignedTps)


    }
    new PartitionAssignmentHelpers.Chain(handler, flushMessagesOfRevokedPartitions)
  }

I can see that we call filterRevokedPartitionsCB only after the new partitions are assigned. I would like to understand what are the pros and cons of this approach vs dropping the messages in the buffer just after receiving onRevoke from the consumer.

While it seems reasonable not to drop records that have already been read from Kafka it leads to the following inefficiency:

  1. onRevoke is called in the thread that calls consumer.poll. After this callback, we start draining the transactional stream. This is done by blocking the KafkaConsumerActor.
  2. We don't have control over stream processing, so the stream is working as before, except it will not receive new messages from the KafkaConsumerActor.
  3. The source is keeping track of record offsets emitted and will unblock the KafkaConsumerActor only after all emitted messages are committed. It is possible that we will wait for all messages accumulated in buffer to be committed.
  4. onRevoke callback is blocking all consumers in the consumer group for the given topic. Eventually, all consumers for this consumer group and topic are waiting for the last stream buffer to be drained.

In our setting, where we have hundreds of KTPs, it means that all consumers are blocked waiting for the last one to finish draining the whole buffer. This slows down processing for the whole system. I think that a strategy when we drop buffer records immediately after receiving onRevoke is not worse.

I would like to prove it using some benchmark and I'm happy to contribute the change, but I need the following:

  1. Confirmation that my understanding of the code is correct.
  2. Some guidance about how to start benchmarking this rebalancing scenario.
  3. Acceptance of adding the configuration option.

szymonm avatar Feb 24 '20 10:02 szymonm

@szymonm Thanks for raising this issue.

SingleSourceLogic clears the buffer of accumulated records of revoked partitions only after new partitions are assigned. I think that this causes blocking onRevoke callback to wait for all the records in the buffer to be drained.

AFAICT (it's been awhile since I've looked at TxSources) the SingleSourceLogic code you highlighted isn't actually run in transactional workloads. If you look at TransactionalSourceLogic it implements its own addToPartitionAssignmentHandler which does nothing on partition assignment, but will block on revoke while it waits for in-flight messages to be drained.

Before I address your other observations I want to make sure we're on the same page here, or if I've misunderstood.

seglo avatar Feb 24 '20 20:02 seglo

You are right, @seglo. TransactionalSource is not using the default handler defined in SingleSourceLogic. This makes symptoms I described even worse.

So should we include the call filterRevokedPartitionsCB.invoke(revokedTps) in TransactionalSource too? If yes, I suggest we add them to the onRevoke callback.

szymonm avatar Feb 25 '20 08:02 szymonm

@szymonm This sounds reasonable to me. To rephrase for my own understanding:

You would like to filter buffered messages that have not been emitted from the Transactional Source, but keep the existing in-flight message draining in place.

The in-flight message draining is crucial to make sure duplicates don't show up after a rebalance with transactions. It's tough to say if the proposed change will make a substantial difference in your use case, but I think it's worth benchmarking.

Before merging any changes I'll want to make certain our transactional integration tests pass. We've had trouble ensuring transactional guarantees in various rebalance use cases. I'll follow up in the PR about what we can do to feel more comfortable that the changes don't introduce regressions.

seglo avatar Feb 25 '20 15:02 seglo

@seglo Your understanding is correct.

I got your point about testing it rigorously. Created a PR with the change only. No tests and benchmarks so far. So far the tests are passing (except Scala 2.13 JDK 8 for reasons unrelated to this change I think).

We also have a bunch of tests on our side that discovered issues with transactions before. I'm pushing the version from my PR to some of our testing environments to check if there are benefits with the real load.

szymonm avatar Feb 25 '20 15:02 szymonm

We also have a bunch of tests on our side that discovered issues with transactions before.

It would be great to get an idea of the issues you've experienced, and how the tests are setup. We could use more automated testing here.

seglo avatar Feb 26 '20 16:02 seglo