
Flushing snapshots and committing offsets on partition revoke doesn't work as expected

Open · Z1kkurat opened this issue on Apr 18 '23 · 0 comments

When using TimerFlowOf.unloadOrphaned with flushOnRevoke = true together with PartitionFlowConfig(commitOnRevoke = true), one might expect that when a partition is revoked (for example, when the application is being shut down) the framework persists the current in-memory snapshots and commits the offset corresponding to those persisted snapshots. This doesn't happen, because of the order in which resources (instances of TimerFlow in particular) are released inside PartitionFlow.
Instances of TimerFlow are allocated as a Resource and put into a Cache when a key is first encountered, either by consuming it from the topic or by eagerly reading data from the persistence layer. When an interruption is handled, the release method of PartitionFlow is executed first. The method looks like this:

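// Executed first on revoke, before any TimerFlow in the Cache is released: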
val release: F[Unit] = if (config.commitOnRevoke) {
  offsetToCommit.flatMap { offset =>
    offset.traverse_ { offset =>
      log.info(s"committing on revoke: $offset") *> scheduleCommit.schedule(offset)
    }
  }
} else {
  ().pure[F]
}

As such, this release function calculates the offset to commit before any instance of TimerFlow is released and its release logic executed. These instances are released only when the Cache is released (which triggers the release of its stored values), and that happens later because the Cache is allocated in the acquire section of PartitionFlow. Only then does the release function of each TimerFlow run and persist the current snapshot.
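The ordering follows from how cats-effect composes Resource finalizers: in a.flatMap(_ => b), b is acquired after a and is therefore released before a. Below is a minimal standalone sketch of the same shape (the names are illustrative, not actual kafka-flow code):

import cats.effect.{ExitCode, IO, IOApp, Resource}

object ReleaseOrder extends IOApp {
  // Stands in for the Cache that holds the TimerFlow instances
  val cache: Resource[IO, Unit] =
    Resource.make(IO.println("cache acquired"))(_ =>
      IO.println("cache released: TimerFlow finalizers flush snapshots here")
    )

  // Stands in for the commit-on-revoke logic attached after the cache
  val commitOnRevoke: Resource[IO, Unit] =
    Resource.make(IO.unit)(_ =>
      IO.println("PartitionFlow release: offset computed and commit scheduled here")
    )

  // Finalizers run in reverse order of acquisition: the commit fires first,
  // the snapshots are flushed only afterwards
  override def run(args: List[String]): IO[ExitCode] =
    cache.flatMap(_ => commitOnRevoke).use(_ => IO.unit).as(ExitCode.Success)
}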

This can be reproduced with the following code:

import cats.effect.{Async, ExitCode, IO, IOApp}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.{FoldOption, KafkaKey, KeyStateOf, PartitionFlowConfig, PartitionFlowOf, TickOption}
import com.evolutiongaming.kafka.flow.kafka.ScheduleCommit
import com.evolutiongaming.kafka.flow.persistence.PersistenceOf
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
import com.evolutiongaming.kafka.flow.timer.{TimerFlowOf, TimersOf}
import com.evolutiongaming.kafka.journal.ConsRecord
import com.evolutiongaming.skafka.TimestampType.Create
import com.evolutiongaming.skafka.consumer.WithSize
import com.evolutiongaming.skafka.{Offset, Partition, TimestampAndType, TopicPartition}
import scodec.bits.ByteVector

import java.nio.charset.Charset
import java.time.Instant
import scala.concurrent.duration._

object PartitionFlowTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
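    // Minimal fold: print every incoming record and set the state to Some(1)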
    val fold: FoldOption[IO, Int, ConsRecord] =
      FoldOption.of[IO, Int, ConsRecord]((_: Option[Int], value: ConsRecord) =>
        IO.println(s"Received $value") >> IO.pure(Some(1))
      )

    for {
      logOf    <- LogOf.slf4j[IO]
      timersOf <- TimersOf.memory[IO, KafkaKey]
      partitionFlowOf = PartitionFlowOf[IO](
        keyStateOf = KeyStateOf.lazyRecovery[IO, Int](
          applicationId = "id",
          groupId       = "group",
          timersOf      = timersOf,
          persistenceOf = PersistenceOf.empty,
          timerFlowOf = TimerFlowOf
            .unloadOrphaned(fireEvery = 1.second, maxOffsetDifference = 100, maxIdle = 1.second, flushOnRevoke = true),
          fold     = fold,
          tick     = TickOption.id[IO, Int],
          registry = EntityRegistry.empty
        ),
        config = PartitionFlowConfig(commitOnRevoke = true),
        filter = None
      )(Async[IO], logOf)
      _ <- partitionFlowOf
        .apply(
          topicPartition = TopicPartition("topic", Partition.min),
          assignedAt     = Offset.min,
          scheduleCommit = new ScheduleCommit[IO] {
            override def schedule(offset: Offset): IO[Unit] = IO.println(s"Scheduled commit for offset $offset")
          }
        )
        .use { partitionFlow =>
          val record1 =
            ConsRecord(
              topicPartition   = TopicPartition("topic", Partition.min),
              offset           = Offset.unsafe(1L),
              timestampAndType = Some(TimestampAndType(Instant.now(), Create)),
              key              = Some(WithSize("key1")),
              value            = Some(WithSize(ByteVector("value".getBytes(Charset.defaultCharset())))),
            )
          val record2 = record1.copy(key = Some(WithSize("key2")), offset = Offset.unsafe(2L))
          partitionFlow.apply(List(record1, record2))
        }
    } yield ExitCode.Success
  }
}

The output of that code demonstrates the aforementioned order of execution:

11:49:58.003 [io-compute-4] INFO  c.e.kafka.flow.PartitionFlow - topic-0 starting
11:49:58.051 [io-compute-4] INFO  c.e.kafka.flow.PartitionFlow - topic-0 partition recovery started
11:49:58.056 [io-compute-4] INFO  c.e.kafka.flow.PartitionFlow - topic-0 partition recovery finished, 0 keys recovered
Received ConsumerRecord(topic-0,2,Some(TimestampAndType(2023-04-18T09:49:58.060890Z,Create)),Some(WithSize(key2,0)),Some(WithSize(ByteVector(5 bytes, 0x76616c7565),0)),List())
Received ConsumerRecord(topic-0,1,Some(TimestampAndType(2023-04-18T09:49:58.060890Z,Create)),Some(WithSize(key1,0)),Some(WithSize(ByteVector(5 bytes, 0x76616c7565),0)),List())
11:49:58.192 [io-compute-10] INFO  c.e.kafka.flow.PartitionFlow - topic-0 offset: 1 (+1)
Scheduled commit for offset 1
11:49:58.201 [io-compute-3] INFO  c.e.kafka.flow.PartitionFlow - topic-0 key2 flush on revoke, holding offset: Some(2)
11:49:58.201 [io-compute-3] INFO  c.e.kafka.flow.PartitionFlow - topic-0 key1 flush on revoke, holding offset: Some(1)
11:49:58.202 [io-compute-4] INFO  c.e.kafka.flow.PartitionFlow - topic-0 stopping

In the example above, the offset to commit is calculated before any snapshot is persisted. If the application had persisted snapshots earlier and then received new events that resulted in updated snapshots, the older offset will be committed while the newer snapshots are persisted, which can be incorrect.

Example:

  • the application receives key1 and key2 at offsets 1 and 2
  • both snapshots are persisted; offset 1 is marked as "held" for key1 and offset 2 for key2
  • a regular commit sequence happens and the lowest held offset (1) is committed
  • new events are received at offset 3 for key1 and offset 4 for key2, and the state of both aggregates is updated by Fold
  • the partition is revoked; PartitionFlow's release logic calculates the lowest held offset to commit, which is still 1, since the newer snapshots haven't been persisted yet
  • the cache is cleared in PartitionFlow, both TimerFlows are deallocated, and their release logic executes the flush-on-revoke behavior, persisting the current snapshots based on the events at offsets 3 and 4

As a result, snapshots for offsets 3 and 4 are persisted, but offset 1 is committed. This might not be an issue for an application that deduplicates incoming events against the snapshot (see the sketch below), but it might come as a surprise.
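Such deduplication could look roughly like this (hypothetical types and names, not the kafka-flow API): the snapshot stores the offset of the last applied event, and re-delivered records at or below that offset are ignored.

// Hypothetical snapshot that remembers the last applied offset
final case class Snapshot(lastOffset: Long, value: Int)

// Skip any record whose offset was already folded into the snapshot,
// so re-consuming offsets 2..4 after a commit of offset 1 is harmless
def dedupFold(state: Option[Snapshot], offset: Long, delta: Int): Option[Snapshot] =
  state match {
    case Some(s) if offset <= s.lastOffset => state // duplicate: ignore
    case Some(s)                           => Some(Snapshot(offset, s.value + delta))
    case None                              => Some(Snapshot(offset, delta))
  }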

Initially, I thought the issue could be mitigated by clearing the cache explicitly in the release of PartitionFlow (when commitOnRevoke = true) before calculating the offset to commit. Clearing the cache would cause all TimerFlow instances to be released and their flush logic to be executed, so the snapshots would be persisted first and the proper offset calculated afterward in the release of PartitionFlow.
However, while clearing the cache does trigger persisting of the snapshots, after that there are no cache entries left to calculate a minimal offset from when releasing PartitionFlow.
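A toy model of that dilemma (all names hypothetical, not the actual kafka-flow or cache API): the held offsets live only in the cache entries, so flushing by clearing the cache first leaves nothing to compute the commit offset from.

import cats.effect.{IO, IOApp, Ref}
import cats.syntax.all._

object ClearFirstDilemma extends IOApp.Simple {
  def run: IO[Unit] =
    for {
      // key -> held offset, as if two TimerFlows were currently loaded
      cache <- Ref.of[IO, Map[String, Long]](Map("key1" -> 3L, "key2" -> 4L))
      // "clear": release every entry, persisting its snapshot, then drop it
      _ <- cache.getAndSet(Map.empty).flatMap(_.toList.traverse_ {
             case (key, offset) => IO.println(s"flushed $key at offset $offset")
           })
      // only now compute the offset to commit: the cache is already empty
      toCommit <- cache.get.map(_.values.minOption)
      _        <- IO.println(s"offset to commit: $toCommit") // prints: None
    } yield ()
}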
