Flushing snapshots and committing offsets on partition revoke doesn't work as expected
When using TimerFlowOf.unloadOrphaned with flushOnRevoke = true together with PartitionFlowConfig(commitOnRevoke = true), one might expect that on partition revocation (for example, when the application is being shut down) the framework persists the current in-memory snapshots and commits the offset corresponding to the persisted snapshots. This doesn't happen because of the order in which resources (instances of TimerFlow in particular) are released inside PartitionFlow.
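For reference, a minimal sketch of the combination in question (a fragment, not a full program; it uses the same values as the reproduction further below and assumes PartitionFlowConfig lives in the library's root package):

import cats.effect.IO
import com.evolutiongaming.kafka.flow.PartitionFlowConfig
import com.evolutiongaming.kafka.flow.timer.TimerFlowOf
import scala.concurrent.duration._

// flush in-memory snapshots when the partition is revoked
val timerFlowOf: TimerFlowOf[IO] = TimerFlowOf
  .unloadOrphaned(fireEvery = 1.second, maxOffsetDifference = 100, maxIdle = 1.second, flushOnRevoke = true)

// commit the offset corresponding to the persisted snapshots when the partition is revoked
val partitionFlowConfig = PartitionFlowConfig(commitOnRevoke = true)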
Instances of TimerFlow are allocated as a Resource and put into a Cache when a key is first encountered, either by consuming from the topic or by eagerly reading data from the persistence layer. When an interruption is being handled, the release method of PartitionFlow is executed first. The method looks like this:
val release: F[Unit] = if (config.commitOnRevoke) {
  offsetToCommit.flatMap { offset =>
    offset.traverse_ { offset =>
      log.info(s"committing on revoke: $offset") *> scheduleCommit.schedule(offset)
    }
  }
} else {
  ().pure[F]
}
As such, this release function calculates the current offset to commit before any TimerFlow instance is released and its release logic executed. TimerFlow instances are released only when the Cache itself is released (which triggers the release of its stored values), and that happens later because the Cache is allocated in the acquire section of PartitionFlow. Only then is the release function of each TimerFlow executed, persisting the current snapshot.
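To illustrate the ordering with a standalone sketch (plain cats-effect, not the actual PartitionFlow code): finalizers of a composed Resource run in reverse order of acquisition, so a release action attached after the cache has been acquired runs before the cache, and therefore before the TimerFlow instances it holds, is released.

import cats.effect.{IO, IOApp, Resource}

object ReleaseOrderDemo extends IOApp.Simple {
  def run: IO[Unit] = {
    val flow = for {
      // acquired first => released last, standing in for the Cache holding TimerFlow instances
      _ <- Resource.make(IO.println("acquire cache"))(_ => IO.println("release cache: TimerFlow finalizers flush snapshots"))
      // attached second => runs first on release, standing in for the commit-on-revoke logic
      _ <- Resource.onFinalize[IO](IO.println("commit on revoke: offset is calculated here, before snapshots are flushed"))
    } yield ()
    flow.use(_ => IO.println("processing records"))
  }
}

Running this prints the "commit on revoke" line before the "release cache" line, mirroring the order visible in the log output further below.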
This can be reproduced with the following code:
import cats.effect.{Async, ExitCode, IO, IOApp}
import com.evolutiongaming.catshelper.LogOf
// FoldOption, KafkaKey, KeyStateOf, PartitionFlowConfig, PartitionFlowOf and TickOption are
// assumed to live in the library's root package
import com.evolutiongaming.kafka.flow.{FoldOption, KafkaKey, KeyStateOf, PartitionFlowConfig, PartitionFlowOf, TickOption}
import com.evolutiongaming.kafka.flow.kafka.ScheduleCommit
import com.evolutiongaming.kafka.flow.persistence.PersistenceOf
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
import com.evolutiongaming.kafka.flow.timer.{TimerFlowOf, TimersOf}
import com.evolutiongaming.kafka.journal.ConsRecord
import com.evolutiongaming.skafka.TimestampType.Create
import com.evolutiongaming.skafka.consumer.WithSize
import com.evolutiongaming.skafka.{Offset, Partition, TimestampAndType, TopicPartition}
import scodec.bits.ByteVector

import java.nio.charset.Charset
import java.time.Instant
import scala.concurrent.duration._
object PartitionFlowTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val fold: FoldOption[IO, Int, ConsRecord] =
      FoldOption.of[IO, Int, ConsRecord]((_: Option[Int], value: ConsRecord) =>
        IO.println(s"Received $value") >> IO.pure(Some(1))
      )

    for {
      logOf    <- LogOf.slf4j[IO]
      timersOf <- TimersOf.memory[IO, KafkaKey]
      partitionFlowOf = PartitionFlowOf[IO](
        keyStateOf = KeyStateOf.lazyRecovery[IO, Int](
          applicationId = "id",
          groupId = "group",
          timersOf = timersOf,
          persistenceOf = PersistenceOf.empty,
          timerFlowOf = TimerFlowOf
            .unloadOrphaned(fireEvery = 1.second, maxOffsetDifference = 100, maxIdle = 1.second, flushOnRevoke = true),
          fold = fold,
          tick = TickOption.id[IO, Int],
          registry = EntityRegistry.empty
        ),
        config = PartitionFlowConfig(commitOnRevoke = true),
        filter = None
      )(Async[IO], logOf)
      _ <- partitionFlowOf
        .apply(
          topicPartition = TopicPartition("topic", Partition.min),
          assignedAt = Offset.min,
          scheduleCommit = new ScheduleCommit[IO] {
            override def schedule(offset: Offset): IO[Unit] = IO.println(s"Scheduled commit for offset $offset")
          }
        )
        .use { partitionFlow =>
          val record1 =
            ConsRecord(
              topicPartition = TopicPartition("topic", Partition.min),
              offset = Offset.unsafe(1L),
              timestampAndType = Some(TimestampAndType(Instant.now(), Create)),
              key = Some(WithSize("key1")),
              value = Some(WithSize(ByteVector("value".getBytes(Charset.defaultCharset()))))
            )
          val record2 = record1.copy(key = Some(WithSize("key2")), offset = Offset.unsafe(2L))
          partitionFlow.apply(List(record1, record2))
        }
    } yield ExitCode.Success
  }
}
The output of that code demonstrates the aforementioned order of execution:
11:49:58.003 [io-compute-4] INFO c.e.kafka.flow.PartitionFlow - topic-0 starting
11:49:58.051 [io-compute-4] INFO c.e.kafka.flow.PartitionFlow - topic-0 partition recovery started
11:49:58.056 [io-compute-4] INFO c.e.kafka.flow.PartitionFlow - topic-0 partition recovery finished, 0 keys recovered
Received ConsumerRecord(topic-0,2,Some(TimestampAndType(2023-04-18T09:49:58.060890Z,Create)),Some(WithSize(key2,0)),Some(WithSize(ByteVector(5 bytes, 0x76616c7565),0)),List())
Received ConsumerRecord(topic-0,1,Some(TimestampAndType(2023-04-18T09:49:58.060890Z,Create)),Some(WithSize(key1,0)),Some(WithSize(ByteVector(5 bytes, 0x76616c7565),0)),List())
11:49:58.192 [io-compute-10] INFO c.e.kafka.flow.PartitionFlow - topic-0 offset: 1 (+1)
Scheduled commit for offset 1
11:49:58.201 [io-compute-3] INFO c.e.kafka.flow.PartitionFlow - topic-0 key2 flush on revoke, holding offset: Some(2)
11:49:58.201 [io-compute-3] INFO c.e.kafka.flow.PartitionFlow - topic-0 key1 flush on revoke, holding offset: Some(1)
11:49:58.202 [io-compute-4] INFO c.e.kafka.flow.PartitionFlow - topic-0 stopping
In the example above, the offset to commit is calculated before any snapshot is persisted. If the application has previously persisted snapshots and has since received new events that produced updated in-memory snapshots, the older offset will be committed while the newer snapshots are persisted, which can be incorrect.
Example:
- the application receives key1 and key2 at offsets 1 and 2
- both snapshots are persisted; offset 1 is marked as "held" for key1 and offset 2 for key2
- a regular commit sequence happens, and the lowest held offset (1) is committed
- new events are received at offset 3 for key1 and offset 4 for key2; the state of both aggregates is updated by Fold
- the partition is revoked; PartitionFlow's release logic calculates the lowest held offset to commit, which is still 1 since the newer snapshots weren't persisted
- the cache is cleared in PartitionFlow, both TimerFlows are deallocated, and their release logic runs flushOnCancel, persisting the current snapshots based on the events at offsets 3 and 4

As a result: snapshots for offsets 3 and 4 are persisted, but offset 1 is committed.
This might not be an issue for an application that deduplicates incoming events against the snapshot, but it might come as a surprise.
Initially, I thought the issue could be mitigated by explicitly clearing the cache in the release of PartitionFlow (when commitOnRevoke = true) before calculating the offset to commit. Clearing the cache would release all TimerFlow instances and run their flushOnCancel logic, so the snapshots would be persisted first and the proper offset calculated afterward in the release of PartitionFlow.
However, while clearing the cache does trigger persisting of the snapshots, afterwards there are no cache entries left from which to calculate the minimal offset to commit when releasing PartitionFlow.
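A rough sketch of that attempted mitigation and why it falls short (not actual library code; clearCache is a hypothetical operation standing in for the explicit cache clearing described above, the other names are those used earlier):

val release: F[Unit] = if (config.commitOnRevoke) {
  for {
    // hypothetical: releases every cached TimerFlow, so flushOnCancel persists the snapshots first
    _      <- clearCache
    // but the held offsets lived in the entries that were just removed, so there is nothing left to commit
    offset <- offsetToCommit
    _      <- offset.traverse_ { offset =>
                log.info(s"committing on revoke: $offset") *> scheduleCommit.schedule(offset)
              }
  } yield ()
} else {
  ().pure[F]
}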