amazon-kinesis-connectors
Handling duplicate records
We are using the latest build to read messages from a multi-shard Kinesis stream using workers running across multiple EC2 instances and write them to S3. In this context, please see the following doc:
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-duplicates.html
It describes cases where we can end up reading duplicate records from the stream. The S3Emitter uses buffer.firstSeqNumber and buffer.lastSeqNumber to determine the S3 filename. In the case of retries, this approach might not guarantee duplicate removal, because the buffer can be filled differently, with a different size or count. The approach suggested by the doc does not work for the S3Emitter, as there is no way to control the exact number of records in the buffer (the emit is triggered by count, time, or size, whichever limit is reached first). Any thoughts on how we can overcome this limitation?
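To illustrate the problem, this is roughly how the key ends up being built from the buffer's first and last sequence numbers (paraphrased; the exact format in the library may differ, and the sequence numbers are shortened for readability):

```java
// Paraphrase of the S3Emitter naming scheme described above.
public class S3KeyExample {

    static String s3FileName(String firstSeq, String lastSeq) {
        return firstSeq + "-" + lastSeq;
    }

    public static void main(String[] args) {
        // A retried batch that buffers a different number of records ends at a
        // different last sequence number, so it lands under a new key and the
        // earlier file remains behind as a (partial) duplicate.
        System.out.println(s3FileName("10001", "10015")); // first attempt
        System.out.println(s3FileName("10001", "10017")); // retry after a failed checkpoint
    }
}
```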
Thanks.
Any thoughts/updates on this?
Would it be possible for you to configure the time and size thresholds to values large enough that the emit is effectively triggered by count alone?
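For example, something along these lines in the connector's properties file (the key names below are from the sample configuration; please check them against your version):

```properties
# Make the record count the effective trigger by setting the other two
# thresholds to values you will realistically never hit.
bufferRecordCountLimit = 1000
bufferByteSizeLimit = 1073741824
bufferMillisecondsLimit = 86400000
```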
Sincerely, Gaurav
Hi Gaurav,
Thanks for the response. Even if we configure the time and size thresholds so that only the count matters, the fact that the count is not exact would still be a problem. That is, configuring a value of "n" for the count does not guarantee that only "n" records are buffered for an emit; the buffer can always end up slightly larger depending on the processRecords call. Hence this strategy does not always help, right?
I'm not convinced you need the fixed record count. Let's say you're consuming from a relatively slow-moving stream and you get, say, 15 records (numbers 10001-10015). You write these out into a file with the start ID in its name (my-data-10001), then you fail to checkpoint because you crash. When you restart (or another worker picks up this shard), it will also get a batch starting at 10001. It may get the same 15 records, or it may get a few more. Let's say it now gets 17 records (10001-10017). It (over-)writes these to the my-data-10001 file. That file has all the data it had previously, plus a bit more. We now checkpoint at 10017, and the next batch starts at 10018, ready to go into the next file.
I can't see the need for the fixed record count.
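Here is a minimal sketch of what I mean, assuming your version of S3Emitter exposes getS3FileName as a protected hook (class and constructor names are from memory, so treat this as an outline rather than drop-in code):

```java
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.s3.S3Emitter;

// Name each S3 object by the first sequence number in the buffer only, so a
// retry of the same (possibly larger) batch overwrites the same key instead
// of creating a near-duplicate file under a new name.
public class FirstSeqS3Emitter extends S3Emitter {

    public FirstSeqS3Emitter(KinesisConnectorConfiguration configuration) {
        super(configuration);
    }

    @Override
    protected String getS3FileName(String firstSeq, String lastSeq) {
        // e.g. "my-data-10001", however many records ended up in the buffer
        return "my-data-" + firstSeq;
    }
}
```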
What I can see a need for, and which isn't exposed by the connector library, is the shard ID. Without writing my own KinesisConnectorRecordProcessorFactory and having it return a subclass of KinesisConnectorRecordProcessor that overrides initialize and captures its own copy of the shardId, I can't see any way of accessing the shard ID. And even then, I'm not sure how you'd get it to the emitter.
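If you did manage to capture the shard ID that way, one place to surface it is an emitter built on the sketch above; setShardId and the key layout below are purely illustrative:

```java
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

// Extends the FirstSeqS3Emitter sketch above. A custom record processor
// (returned by your own KinesisConnectorRecordProcessorFactory) would call
// setShardId from its initialize(shardId) callback before any emits happen.
public class ShardAwareS3Emitter extends FirstSeqS3Emitter {

    private volatile String shardId = "unknown-shard";

    public ShardAwareS3Emitter(KinesisConnectorConfiguration configuration) {
        super(configuration);
    }

    public void setShardId(String shardId) {
        this.shardId = shardId;
    }

    @Override
    protected String getS3FileName(String firstSeq, String lastSeq) {
        // Prefix with the shard ID so files from different shards never collide,
        // while retries of the same batch still overwrite the same key.
        return shardId + "/" + firstSeq;
    }
}
```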
What happens when the order of these operations is reversed - that is, we got 17 records the first time, but only 15 the next time (assuming we set recordCount=15)? In that case, we will have two duplicate records in S3.
Why would you get fewer records on the subsequent call? And even if you did, you'd replace your 17-record file with a 15-record file, and your next call would presumably include the missing two records, so they'd go into the next file. In any case, you have to cope with duplicates in the S3 files anyway, because if you're using the Kinesis Client Library (which the Connector Library does), it can (briefly) have multiple workers subscribed to the same shards. So this whole scheme is just an exercise in reducing the number of duplicates, not eliminating them completely.
> Why would you get fewer records on the subsequent call?
I did not mean the call to processRecords here, but the number of records that are in the buffer when the file is emitted to S3. Since the "recordCount" variable is not exact, there is a chance for the buffer to hold fewer or more records depending on when the emit is triggered.
> you have to cope with duplicates in the S3 files anyway, because if you're using the Kinesis Client Library (which the Connector Library does), it can (briefly) have multiple workers subscribed to the same shards. So this whole scheme is just an exercise in reducing the number of duplicates, not eliminating them completely
Does this mean the only way to ensure unique records is through mechanisms available in the consuming system (say an RDBMS)?
That's my belief, yes.
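For what it's worth, here is a sketch of what that could look like on the database side: treat the record's Kinesis sequence number as the primary key so replays are ignored on insert. The table, column names, and the PostgreSQL-style ON CONFLICT clause are illustrative assumptions:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;

// Idempotent write: a replayed record with the same sequence number hits the
// conflict clause and is silently dropped, so duplicates never accumulate.
public class IdempotentSink {

    public static void write(Connection conn, String sequenceNumber, byte[] payload) throws Exception {
        String sql = "INSERT INTO events (sequence_number, payload) VALUES (?, ?) "
                   + "ON CONFLICT (sequence_number) DO NOTHING";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, sequenceNumber);
            stmt.setBytes(2, payload);
            stmt.executeUpdate();
        }
    }
}
```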