enrich-kinesis: Avoid duplicates when losing lease to a new worker
Once #649 has been solved, we will correctly recover from exceptions when a worker loses a lease.
But immediately after losing a lease, there is a high chance that we write a large number of events to the output stream, even though we will never be able to checkpoint those events. They will be re-processed by the worker that steals the lease, and we will end up with a large number of duplicate events in the output stream.
We make no guarantees about duplicates in the output stream, so on the one hand this is OK. But on the other hand it is good to reduce duplicates whenever we can.
We should consider adding an extra check immediately before writing to the output stream: on the CommittableRecord we can check the values of canCheckpoint and isLastInShard. The logic is approximately (ignoring the effect type F):
```
if (record.isLastInShard || record.canCheckpoint)
  // Write chunk of events to the output stream
else
  // Do nothing
```
To see why we need to check both values, inspect how the ChunkedRecordProcessor implements shardEnded and handles the last record in a shard.
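To make the proposal concrete, here is a hedged sketch of how the guard might slot into an fs2 pipeline. It assumes the fs2-aws `CommittableRecord` with the `canCheckpoint` and `isLastInShard` members mentioned above; everything else (the `Event` alias, `writeGuarded`, `sink`) is illustrative and not the real Enrich code:

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import fs2.aws.kinesis.CommittableRecord

type Event = Array[Byte] // placeholder for the enriched-event type

// Write a chunk only when the record can still be checkpointed (or ends the
// shard); otherwise the lease is already lost and the events would be
// duplicated by the worker that stole it.
def writeGuarded(
    records: Stream[IO, (CommittableRecord, Chunk[Event])],
    sink: Chunk[Event] => IO[Unit]
): Stream[IO, CommittableRecord] =
  records.evalMap { case (record, events) =>
    val write =
      if (record.isLastInShard || record.canCheckpoint) sink(events)
      else IO.unit // lease lost: skip the write, never checkpoint these
    write.as(record)
  }
```

The stream of `CommittableRecord`s emitted downstream would then be checkpointed exactly as today; the only change is that chunks belonging to a lost lease are silently dropped instead of written.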
I hate this proposed solution; it seems very hacky. For Enrich, we can choose not to make this change. But for the streaming transformer (RDB Loader) we need a good solution, because once deduplication is enabled it is critical that we don't emit duplicates.
We will either need to implement the hacky check described above, or implement our own ShardRecordProcessor instead of using the fs2-aws lib.
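For the second option, a minimal sketch of what our own processor could look like, assuming the KCL 2.x `ShardRecordProcessor` interface from `software.amazon.kinesis`; the `sink` parameter and the `leaseHeld` flag are illustrative, and error handling around `checkpoint()` is omitted:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._
import software.amazon.kinesis.processor.ShardRecordProcessor
import software.amazon.kinesis.lifecycle.events._

final class GuardedProcessor(sink: List[Array[Byte]] => Unit)
    extends ShardRecordProcessor {

  // Flipped by the KCL's leaseLost callback; checked before every write.
  private val leaseHeld = new AtomicBoolean(true)

  def initialize(input: InitializationInput): Unit = ()

  def processRecords(input: ProcessRecordsInput): Unit =
    if (leaseHeld.get()) {
      // Copy each record's payload out of its ByteBuffer and write it.
      val payloads = input.records().asScala.toList.map { r =>
        val buf   = r.data()
        val bytes = new Array[Byte](buf.remaining())
        buf.get(bytes)
        bytes
      }
      sink(payloads)
      input.checkpointer().checkpoint()
    }
    // If the lease is gone, do nothing: the stealing worker will re-process
    // these records, so writing them here would only create duplicates.

  def leaseLost(input: LeaseLostInput): Unit =
    leaseHeld.set(false)

  def shardEnded(input: ShardEndedInput): Unit =
    // The shard is fully consumed; checkpointing lets child shards start.
    input.checkpointer().checkpoint()

  def shutdownRequested(input: ShutdownRequestedInput): Unit = ()
}
```

Because `leaseLost` is a first-class callback here, the write/checkpoint race that forces the `canCheckpoint || isLastInShard` check disappears, at the cost of re-implementing the plumbing fs2-aws currently gives us for free.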