kafka-flow icon indicating copy to clipboard operation
kafka-flow copied to clipboard

Possible improvements to the codebase

Open Z1kkurat opened this issue 2 years ago • 0 comments
trafficstars

Here I'm outlining my thoughts on what we can change to improve readability, make introducing new features less complex, and ease the burden of maintenance of the library.

Different approach to restoring state from snapshots

Current approach to restoring the state from snapshots is dictated by how we restore it from Cassandra: first, we read all the keys, then we read the snapshots for each individual key.
The logic is implemented in KeyStateOf and used in PartitionFlow. This is very inconvenient, in particular, when using Kafka as snapshot storage as we're supposed to read the state topic only once. What we could do is try to rework KeyStateOf, adding a method to read all snapshots. This could delegate to a new abstraction which could be implemented separately for all underlying storages: it can read keys and snapshots from Cassandra or read everything in a single pass from Kafka.
Example of what I'm talking about (we could design it any other way):

trait KeyStateOf[F[_]] {
  // Read state from a snapshot for a single key
  def apply(
             topicPartition: TopicPartition,
             key: String,
             createdAt: Timestamp,
             context: KeyContext[F]
           ): Resource[F, KeyState[F, ConsRecord]]

  // Read all snapshots
  def all(topicPartition: TopicPartition): Stream[F, KeyStateOf[F, ConsRecord]]
}

This will most likely be a huge change as it'll require moving a lot of things.

State consolidation

Currently, we keep multiple fragments of state in different data classes, accessing them via different APIs in order to read or modify (essentially, getters and setters). This creates more layers of indirection, making it harder to predict what impact any change may potentially have. The obvious ones I found:

  • state of aggregate itself: created/modified in KeyFlow.of, modified in FoldToState and TickToState
  • timestamps (Timestamps) and offsets designating when the aggregate was last persisted, processed and 'touched' ('current' timestamp): read in at least 4 different places (usages of methods of ReadTimestamps), modified in at least 2 (usages of methods of WriteTimestamps)
  • timestamps (Timers) of scheduled actions for an aggregate (essentially, when to run TimerFlow for an aggregate): read and modified in two different places in code
  • an offset to commit for an aggregate (KeyContext): read in PartitionFlow before committing, modified in TimerFlowOf when state is flushed.

The last one has the worst impact on discoverability, making the logic really 'implicit'. For example, PartitionFlow updates the value of the offset of the last record in batch in Timestamps before triggering timers, so that TimerFlowOf#persistPeriodically can read it from Timestamps and update the value in KeyContext via its hold method after persisting the snapshot, causing PartitionFlow to calculate a correct offset to commit among all aggregates via using KeyContext#holding. Changing this logic in any way would require knowing about this chain of calls and the proper way of passing data around via these mutable shared state references. The more functionality we have, the more complex it becomes, potentially resulting in breaking something irrelevant to the code one wants to change.

What I would like to see here is a single immutable data model, describing an internal representation of a state of an aggregate. Instances of such a model should be stored in a single unit of code with other units of code returning a new updated data class. Essentially, KeyFlow#apply, TimerFlow#onTimer etc. returning a new version of an aggregate's state.

Tick and TimerFlow execution

Currently, Tick and TimerFlow are executed only together with a configured interval. This could be inconvenient when one wants to run Tick at a different interval than TimerFlow (when implementing state clean-up, for example). We could untangle those two so that they can run at separate intervals.

Tick/Timer parallelism

Timer and Tick are started in a separate fiber for each individual key without limiting parallelism in any way. This can negatively impact performance of an application when persistPeriodically is used together with a large number of aggregates, resulting in (potentially) hundreds of thousands of fibers started simultaneously. This puts pressure on the IO runtime and can even cause it to break. We could limit the degree of parallelism.

Usage of Cache in PartitionFlow

Instances of KeyState are stored in a Cache (from scache) inside PartitionFlow due to a Resource nature of creating KeyState and Cache supporting it natively. In fact, using it as a Resource is required only for two purposes that I've found:

  • automatic unregistering of an aggregate from EntityRegistry (in KeyFlow)
  • persisting the snapshot via TimerFlowOf#flushOnRevoke when partitions are revoked and aggregates are deallocated

Using scache might be an overkill in our case as we have no need in TTLs, loading values on the fly etc. Instead, sometimes we have to deal with its APIs which have its own peculiarities. We could investigate if our use-cases can be implemented in any other way without dropping the functionality.

Z1kkurat avatar Mar 20 '23 16:03 Z1kkurat