alpakka-kafka
Pass through context from getOffsetsOnAssign
Short description
The use case is a Kafka event-sourced setup with periodic storing of partition snapshots.
In this case, when a partition is assigned, you have the following steps:
1. load the snapshot for the partition
2. start reading the topic at the offset of that snapshot
3. (now you get your Alpakka stream-of-streams partition stream)
4. rebuild the state from the snapshot (need a way to access the snapshot here)
5. fetch the actual latest committed offset on that partition
6. start reading the stream; while you have not reached the latest committed offset, apply the events without performing any side effects
The problem is that the work done in step 2 cannot be accessed in step 4 without resorting to temporarily storing it in some thread-safe reference, which might get out of sync if repartitions happen.
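The thread-safe-reference workaround mentioned above could look like the following: stash the snapshot in a concurrent map keyed by partition between step 2 and step 4. This is a minimal self-contained sketch; TopicPartition, Snapshot, loadSnapshot, and SnapshotStash are illustrative stand-ins, not the real Kafka/Alpakka classes.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// stand-ins for the Kafka types, for illustration only
final case class TopicPartition(topic: String, partition: Int)
final case class Snapshot(offset: Long, state: String)

object SnapshotStash {
  // the shared mutable reference the issue wants to avoid: a rebalance
  // can leave a stale entry here or overwrite a fresh one
  private val stash = TrieMap.empty[TopicPartition, Snapshot]

  def loadSnapshot(tp: TopicPartition): Future[Snapshot] =
    Future.successful(Snapshot(offset = 42L, state = s"state-of-$tp")) // expensive in reality

  // step 2: look up start offsets, stashing the snapshot on the side
  def getOffsetsOnAssign(tps: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
    Future
      .traverse(tps) { tp =>
        loadSnapshot(tp).map { s =>
          stash.put(tp, s) // side channel between step 2 and step 4
          tp -> s.offset
        }
      }
      .map(_.toMap)

  // step 4: recover the snapshot without loading it again
  def takeSnapshot(tp: TopicPartition): Option[Snapshot] = stash.remove(tp)
}
```

The stash is exactly the shared mutable state the issue complains about: nothing ties its lifecycle to the partition assignment, so revocations and reassignments can race against it.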
Details
The current function:
def committablePartitionedManualOffsetSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
onRevoke: Set[TopicPartition] => Unit = _ => ()
): Source[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), Control]
A simple solution would be to have some context produced in getOffsetsOnAssign be passed in next to the TopicPartition and the Source[CommittableMessage].
An improved function that introduces a C for context (Snapshot in my case):
def committablePartitionedManualOffsetSource[K, V, C](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, (Long, C)]],
onRevoke: Set[TopicPartition] => Unit = _ => ()
): Source[(TopicPartition, C, Source[CommittableMessage[K, V], NotUsed]), Control]
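Assuming the proposed signature, the assignment callback could return the snapshot as the context, so the expensive load happens exactly once. A self-contained sketch; TopicPartition, Snapshot, and loadSnapshot are illustrative stand-ins, not the real Kafka classes.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// stand-ins for the Kafka types, for illustration only
final case class TopicPartition(topic: String, partition: Int)
final case class Snapshot(offset: Long, state: String)

def loadSnapshot(tp: TopicPartition): Future[Snapshot] =
  Future.successful(Snapshot(offset = 7L, state = s"state-of-$tp")) // expensive in reality

// with C = Snapshot, the snapshot rides along next to the start offset
// instead of being dropped after the offset lookup
def getOffsetsOnAssign(tps: Set[TopicPartition]): Future[Map[TopicPartition, (Long, Snapshot)]] =
  Future
    .traverse(tps)(tp => loadSnapshot(tp).map(s => tp -> (s.offset, s)))
    .map(_.toMap)
```

The partition stream would then be emitted as (TopicPartition, Snapshot, Source[...]), making the state available at step 4 with no side channel.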
Any workarounds would also be welcome.
If I understand correctly, you want to be able to maintain state for a particular partition when using partitioned sources. Presently, at a rebalance the inner partition streams complete and the new partition streams for that group member get re-emitted. If the same partition is re-emitted to the same group, you want it to keep a reference to that context C so it can be re-emitted with the new stream?
A partial implementation or some pseudo code might be helpful for my own understanding.
I just want to pass some info from the offset lookup code to the start of the partition stream. Will try to come up with an example.
But indeed this would also be useful in the complete re-emit case, though that would involve even more API changes.
Here is some code that shows the problem (we have to do an expensive operation twice):
case class Snapshot(offset: Long, state: State)

def loadSnapshot(partition: TopicPartition): Future[Snapshot] = {
  // expensive operation here
  ???
}

def getOffsetsOnAssign(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
  Future
    .traverse(partitions) { partition =>
      // we drop s.state here
      loadSnapshot(partition).map(s => (partition, s.offset))
    }
    .map(_.toMap)

def getLastCommittedOffset(partition: TopicPartition): Future[Long] =
  ??? // use metaDataClient.getCommittedOffset()

committablePartitionedManualOffsetSource(settings, subscription, getOffsetsOnAssign).flatMapConcat {
  case (partition, source) =>
    val f = for {
      lastCommittedOffset <- getLastCommittedOffset(partition)
      snapshot <- loadSnapshot(partition) // the expensive operation, done a second time
    } yield {
      // use snapshot.state to set up the partition,
      // start the business logic that consumes events from the source,
      // ignore side effects and commits until lastCommittedOffset is reached,
      // and write out snapshots from time to time
      source
    }
    Source.futureSource(f)
}
I would like to pass snapshot.state through so we don't have to do this expensive operation twice. (Also, this code might get out of sync and load a newer snapshot the second time, since snapshot writing is async.)
Got it. I agree, it would be useful to propagate additional user context in this scenario.
Since your proposed solution involves breaking API changes, maybe we can add a new factory method that emits a SourceWithContext (where C in your use case would be the context). We can use a new getOffsetsOnAssign signature that returns a C to populate it. To access the context you could mapContext, or drop down to a normal Source with SourceWithContext.asSource, and the context would be available in a tuple of ((TopicPartition, Source[CommittableOffset]), Ctx). Or users could just access the context for that partitioned source downstream if they want, though options are limited for merging streams together, etc. Perhaps this is overthinking it though. Maybe you could experiment with it?
I shudder to think what this factory method would be called. sourceWithContextCommittablePartitionedManualOffsetSource just doesn't roll off the tongue!
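For concreteness, such a factory's signature might look like the following. This is a hypothetical sketch, not part of the Alpakka API; the name and the exact shape of the emitted elements are assumptions based on the discussion above.

```scala
// hypothetical name and signature, mirroring the proposal above
def sourceWithContextCommittablePartitionedManualOffsetSource[K, V, C](
    settings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, (Long, C)]],
    onRevoke: Set[TopicPartition] => Unit = _ => ()
): SourceWithContext[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), C, Control]
```

Downstream, mapContext could transform the snapshot, and asSource would surface it as the second element of the tuple.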
Had a look at implementing this today but got a bit stuck on the internal bookkeeping; I think it will require a few iterations.
We went through a few iterations but have not found a satisfying solution, mainly because of the multi-case logic in the driver. We might have another go at this later by dropping everything we don't need, but our next try will be using the Java driver.
The problem is that the work done in step 2 can not be accessed in step 4 without resorting to temporarily storing that in some threadsafe reference which might get out of sync if repartitions happen
Why not use Akka Kafka sharding? It provides state management that is robust to reassignment.
https://akka.io/blog/news/2020/03/18/akka-sharding-kafka-video
https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#external-shard-allocation