Idea to reuse strategies between partition assign/revoke
We currently recreate the processing strategy every time we assign/revoke a set of partitions. To my knowledge there are two reasons for that:
- We want to flush out old messages to preserve ordering guarantees (messages on the same partition should be submitted in-order, and strategy should commit in-order)
- `StrategyFactory.create_with_partitions` is passed the currently assigned partitions
However, at the same time:
- We only provide ordering guarantees per partition. If a new partition is assigned, we can reuse the processing strategy. If a partition is revoked, we can reuse the processing strategy. The only case where we need to `join()` is when the same partition is revoked and assigned again to the same consumer.
- Almost no factory we write actually cares about the assigned partitions.
Additionally, in production we observe rebalances where partitions get shuffled around between consumers in the following pattern:
1. new partition `n` assigned
2. partition `n` revoked
3. new partition `n + 1` assigned
4. partition `n + 1` revoked
5. ...
(see attachment out.tsv for full logs pertaining to INC-402)
The logs show that we are closing the strategy on every partition revocation, i.e. on steps 2 and 4. In theory it can also happen on partition assignment. But in the pattern above, recreating the strategy is completely unnecessary for preserving per-partition message ordering.
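To make the difference concrete, here is a minimal sketch that replays the pattern above and counts how often the strategy would be flushed. It assumes that today's behavior closes the strategy on every revocation (as the logs suggest), and that the alternative only flushes when a previously revoked partition is assigned again. The function name `count_flushes`, the event tuples, and the use of plain integers as partition ids are all hypothetical and only serve the illustration.

```python
from typing import List, Set, Tuple

# ("assign" | "revoke", set of partition ids); integers stand in for real partitions.
Event = Tuple[str, Set[int]]


def count_flushes(events: List[Event]) -> Tuple[int, int]:
    """Return (flushes with close-on-revoke, flushes with join-on-overlap)."""
    close_on_revoke = 0
    join_on_overlap = 0
    unflushed: Set[int] = set()
    for kind, partitions in events:
        if kind == "revoke":
            # Assumed current behavior: the strategy is closed on every revocation.
            close_on_revoke += 1
            # Overlap rule: only remember which partitions may still be in flight.
            unflushed |= partitions
        elif kind == "assign":
            # Overlap rule: flush only if a revoked partition comes back.
            if unflushed & partitions:
                join_on_overlap += 1
                unflushed.clear()
    return close_on_revoke, join_on_overlap


# The pattern from the logs: partitions pass through the consumer one by one.
pattern: List[Event] = [
    ("assign", {0}),
    ("revoke", {0}),
    ("assign", {1}),
    ("revoke", {1}),
]
print(count_flushes(pattern))  # (2, 0)

# The one case that still needs a flush: the same partition is revoked and reassigned.
reassigned: List[Event] = [("assign", {0}), ("revoke", {0}), ("assign", {0})]
print(count_flushes(reassigned))  # (1, 1)
```

In the shuffled pattern no partition ever returns to the same consumer, so the overlap rule never triggers a flush.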
Proposal:
- Deprecate `create_with_partitions` and replace it with a simpler `create` interface that does not take partition count.

  There is one strategy here that uses partition count to determine query concurrency. But could it possibly observe partitions as messages come in, and adjust query concurrency based on that?

  Alternatively there can be a mechanism where strategy factories that do define `create_with_partitions` still enable today's behavior on rebalancing, while most other factories that don't need partition information can use `create` and will receive a nice speed boost during rebalancing.
- In `on_revoke`, do not close the strategy. Instead, add the revoked partitions to a set of "unflushed partitions".
- In `on_assign`, also do not close the strategy, but only create it if it is `None`. If there is overlap between the "unflushed partitions" and the newly assigned partitions, call `join()` on the strategy (but do not recreate it) and clear the set of "unflushed partitions" (only if `join()` happened).
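The `on_revoke`/`on_assign` handling could look roughly like the sketch below. This is not the existing processor code: `RebalanceHandler` and `FakeStrategy` are hypothetical names, partitions are simplified to integers, and the factory is reduced to a zero-argument `create` callable. It also assumes the strategy can keep being used after `join()`, which is exactly the open question in this proposal.

```python
from typing import Callable, Optional, Set


class FakeStrategy:
    """Stand-in for a processing strategy; it only records join() calls."""

    def __init__(self) -> None:
        self.join_calls = 0

    def join(self) -> None:
        # A real strategy would block here until in-flight messages are flushed.
        self.join_calls += 1


class RebalanceHandler:
    """Hypothetical holder for the proposed assign/revoke callbacks."""

    def __init__(self, create: Callable[[], FakeStrategy]) -> None:
        self._create = create
        self.strategy: Optional[FakeStrategy] = None
        self.unflushed: Set[int] = set()

    def on_revoke(self, partitions: Set[int]) -> None:
        # Do not close the strategy; remember what may still be in flight.
        self.unflushed |= partitions

    def on_assign(self, partitions: Set[int]) -> None:
        # Create the strategy lazily, and only once.
        if self.strategy is None:
            self.strategy = self._create()
        # Flush only if a previously revoked partition comes back to us.
        if self.unflushed & partitions:
            self.strategy.join()
            self.unflushed.clear()


handler = RebalanceHandler(FakeStrategy)
handler.on_assign({0})
handler.on_revoke({0})
handler.on_assign({1})  # no overlap with {0}: strategy is reused, no join()
handler.on_revoke({1})
handler.on_assign({0})  # partition 0 comes back: join() once, then clear the set
```

After this sequence the same strategy instance is still in use and `join()` has been called exactly once.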
Unclear:
- Do we need to actually recreate the strategy, or can we just call `join()` and continue using it?