Possible improvements to the codebase
Here I'm outlining my thoughts on what we could change to improve readability, make introducing new features less complex, and ease the maintenance burden of the library.
Different approach to restoring state from snapshots
The current approach to restoring state from snapshots is dictated by how we restore it from Cassandra:
first, we read all the keys, then we read the snapshots for each individual key.
The logic is implemented in KeyStateOf and used in PartitionFlow. This is very inconvenient, in particular
when using Kafka as snapshot storage, since we're supposed to read the state topic only once. What we could do is
rework KeyStateOf, adding a method to read all snapshots. This could delegate to a new abstraction implemented
separately for each underlying storage: it could read keys and then snapshots from Cassandra, or read everything in
a single pass from Kafka.

An example of what I'm talking about (we could design it in any other way):
```scala
trait KeyStateOf[F[_]] {

  // Read the state from a snapshot for a single key
  def apply(
    topicPartition: TopicPartition,
    key: String,
    createdAt: Timestamp,
    context: KeyContext[F]
  ): Resource[F, KeyState[F, ConsRecord]]

  // Read all snapshots of a partition
  def all(topicPartition: TopicPartition): Stream[F, KeyState[F, ConsRecord]]
}
```
This will most likely be a huge change as it'll require moving a lot of things.
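As a rough illustration of the per-storage abstraction, consider the sketch below. Every name in it (SnapshotReadAll, twoPass, singlePass) is hypothetical, fs2.Stream merely stands in for whatever stream type we end up using, and the key/snapshot types are left generic:

```scala
import cats.Functor
import cats.syntax.functor._
import fs2.Stream

trait SnapshotReadAll[F[_], K, S] {
  /** Emit every stored snapshot of a partition, keyed by the aggregate key. */
  def apply(partition: Int): Stream[F, (K, S)]
}

object SnapshotReadAll {

  /** Cassandra-style: list the keys first, then load a snapshot per key. */
  def twoPass[F[_]: Functor, K, S](
    keys: Int => Stream[F, K],
    snapshot: K => F[Option[S]]
  ): SnapshotReadAll[F, K, S] =
    new SnapshotReadAll[F, K, S] {
      def apply(partition: Int): Stream[F, (K, S)] =
        keys(partition)
          .evalMap(key => snapshot(key).map(_.map(key -> _)))
          .unNone
    }

  /** Kafka-style: a single pass over the state topic yields all snapshots at once. */
  def singlePass[F[_], K, S](
    readTopic: Int => Stream[F, (K, S)]
  ): SnapshotReadAll[F, K, S] =
    new SnapshotReadAll[F, K, S] {
      def apply(partition: Int): Stream[F, (K, S)] = readTopic(partition)
    }
}
```

KeyStateOf#all could then be built on top of whichever instance matches the configured storage.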
State consolidation
Currently, we keep multiple fragments of state in different data classes, accessing them via different APIs in order to read or modify them (essentially, getters and setters). This creates extra layers of indirection, making it harder to predict what impact any change may have. The obvious ones I found are:
- state of the aggregate itself: created/modified in KeyFlow.of, modified in FoldToState and TickToState
- timestamps (Timestamps) and offsets designating when the aggregate was last persisted, processed and 'touched' (the 'current' timestamp): read in at least 4 different places (usages of methods of ReadTimestamps), modified in at least 2 (usages of methods of WriteTimestamps)
- timestamps (Timers) of scheduled actions for an aggregate (essentially, when to run TimerFlow for an aggregate): read and modified in two different places in the code
- an offset to commit for an aggregate (KeyContext): read in PartitionFlow before committing, modified in TimerFlowOf when the state is flushed
The last one has the worst impact on discoverability, making the logic really 'implicit'. For example, PartitionFlow
updates the offset of the last record in a batch in Timestamps before triggering timers, so
that TimerFlowOf#persistPeriodically can read it from Timestamps and, after persisting the snapshot, update the value
in KeyContext via its hold method, which in turn lets PartitionFlow calculate the correct offset to commit across all
aggregates using KeyContext#holding. Changing this logic in any way requires knowing about this
chain of calls and the proper way of passing data around via these mutable shared state references. The more
functionality we add, the more complex it becomes, potentially resulting in breaking something unrelated to the code
one wants to change.
What I would like to see here is a single immutable data model describing the internal representation of an
aggregate's state. Instances of such a model should be stored in a single unit of code, with other units of code
returning a new, updated copy of the data class. Essentially, KeyFlow#apply, TimerFlow#onTimer, etc. would return a
new version of the aggregate's state.
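Below is a minimal sketch of what such a consolidated model and the reworked signatures could look like; every class, field and method name is illustrative and not the library's current API:

```scala
import java.time.Instant

// Illustrative only: the real model would reuse kafka-flow's own timestamp/offset types.
final case class KeyTimestamps(
  persistedAt: Option[Instant], // when the aggregate was last persisted
  processedAt: Option[Instant], // when it last processed a record
  touchedAt: Instant            // the 'current' timestamp
)

final case class KeyStateData[S](
  aggregate: Option[S],         // the aggregate state itself
  timestamps: KeyTimestamps,    // what ReadTimestamps/WriteTimestamps cover today
  timers: List[Instant],        // when scheduled actions (Timers) should fire
  offsetToCommit: Option[Long]  // what KeyContext tracks today
)

// Hypothetical reworked signatures: each step takes the consolidated state and
// returns a new version of it instead of writing to several mutable references.
trait KeyFlow[F[_], S, A] {
  def apply(state: KeyStateData[S], record: A): F[KeyStateData[S]]
}

trait TimerFlow[F[_], S] {
  def onTimer(state: KeyStateData[S]): F[KeyStateData[S]]
}
```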
Tick and TimerFlow execution
Currently, Tick and TimerFlow are executed only together, at a single configured interval. This can be inconvenient
when one wants to run Tick at a different interval than TimerFlow (when implementing state clean-up, for example).
We could untangle the two so that they can run at separate intervals.
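For illustration, the decoupled scheduling could be expressed as two independent intervals in the configuration; the class and field names below are purely hypothetical:

```scala
import scala.concurrent.duration._

// Sketch only: the config class and its field names are hypothetical.
final case class ScheduleConfig(
  tickInterval: FiniteDuration,      // how often Tick runs (e.g. state clean-up)
  timerFlowInterval: FiniteDuration  // how often TimerFlow runs (e.g. periodic persistence)
)

object ScheduleConfig {
  // Run clean-up every minute while persisting snapshots only every 10 minutes.
  val example: ScheduleConfig = ScheduleConfig(1.minute, 10.minutes)
}
```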
Tick/Timer parallelism
Timer and Tick are started in a separate fiber for each individual key, without limiting parallelism in any way.
This can negatively impact the performance of an application when persistPeriodically is used together with a large
number of aggregates, resulting in (potentially) hundreds of thousands of fibers started simultaneously. This puts
pressure on the IO runtime and can even cause it to break. We could limit the degree of parallelism.
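One possible way to bound the parallelism, sketched with cats-effect 3 primitives; the function and its parameters are placeholders rather than existing library code:

```scala
import cats.effect.Concurrent
import cats.effect.implicits._
import cats.effect.std.Semaphore
import cats.syntax.all._

object BoundedTimers {

  // Cap how many per-key executions run at once instead of starting one unbounded
  // fiber per key. `runForKey` stands in for whatever triggers Tick/TimerFlow for a
  // single key.
  def runTimersBounded[F[_]: Concurrent](
    keys: List[String],
    runForKey: String => F[Unit],
    maxParallelism: Long
  ): F[Unit] =
    Semaphore[F](maxParallelism).flatMap { semaphore =>
      keys.parTraverse_ { key =>
        semaphore.permit.surround(runForKey(key))
      }
    }
}
```

If the keys are already processed as a stream, a bounded stream combinator (e.g. fs2's parEvalMapUnordered) would achieve the same effect.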
Usage of Cache in PartitionFlow
Instances of KeyState are stored in a Cache (from scache) inside PartitionFlow because a KeyState is created as a
Resource and Cache supports that natively. In fact, using it as a Resource is required only for two purposes that
I've found:
- automatic unregistering of an aggregate from EntityRegistry (in KeyFlow)
- persisting the snapshot via TimerFlowOf#flushOnRevoke when partitions are revoked and aggregates are deallocated
Using scache might be overkill in our case, as we have no need for TTLs, loading values on the fly, etc.
Instead, we sometimes have to deal with its APIs, which have their own peculiarities.
We could investigate whether our use cases can be implemented in some other way without dropping the functionality.
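For example, a plain Ref-based registry that keeps each allocated KeyState together with its finalizer could cover both purposes above. This is only a sketch with hypothetical names, and it deliberately ignores concurrency concerns such as two callers allocating the same key at once:

```scala
import cats.effect.{Concurrent, Ref, Resource}
import cats.syntax.all._

// Keep each key's allocated state together with its finalizer in a Ref-backed Map
// and run the finalizers explicitly on partition revoke.
final class KeyStates[F[_]: Concurrent, S](
  ref: Ref[F, Map[String, (S, F[Unit])]]
) {

  /** Allocate the state for a key, or return it if it is already present. */
  def getOrCreate(key: String, create: Resource[F, S]): F[S] =
    ref.get.map(_.get(key)).flatMap {
      case Some((state, _)) => state.pure[F]
      case None =>
        create.allocated.flatMap { case (state, release) =>
          ref.update(_.updated(key, (state, release))).as(state)
        }
    }

  /** Run every finalizer: flush on revoke, unregister from EntityRegistry, etc. */
  def releaseAll: F[Unit] =
    ref.getAndSet(Map.empty).flatMap(_.values.toList.traverse_(_._2))
}

object KeyStates {
  def empty[F[_]: Concurrent, S]: F[KeyStates[F, S]] =
    Ref.of[F, Map[String, (S, F[Unit])]](Map.empty).map(new KeyStates(_))
}
```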