amazon-kinesis-connectors

Handling duplicate records

ananthrk opened this issue · 8 comments

We are using the latest build to read messages from a multi-shard Kinesis stream, with workers running across multiple EC2 instances, and write them to S3. In this context, please see the following doc:

http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-duplicates.html

It talks about cases where we can end up reading duplicate records from the stream. The S3Emitter uses buffer.firstSeqNumber and buffer.lastSeqNumber to determine the S3 filename. In the case of retries, this approach might not guarantee duplicate removal, as the buffer can be filled differently with a different size or count. The approach suggested by the doc does not work for the S3Emitter because there is no way to control the exact number of records in the buffer (the emit is triggered by count, time, OR size). Any thoughts on how we can overcome this limitation?
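
For reference, here is a minimal sketch of the sequence-range naming scheme described above (simplified; the real S3Emitter code may differ between versions):

```java
// Simplified sketch (not the library's exact source) of how an S3Emitter-style
// key is derived from the buffered sequence-number range. If a retried batch
// buffers a different number of records, firstSeq/lastSeq differ and the data
// lands under a new key instead of overwriting the earlier object.
public class SequenceRangeKey {
    static String s3FileName(String firstSeq, String lastSeq) {
        return firstSeq + "-" + lastSeq;
    }

    public static void main(String[] args) {
        System.out.println(s3FileName("10001", "10015")); // first attempt
        System.out.println(s3FileName("10001", "10017")); // retry buffers two more records -> new key
    }
}
```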

Thanks.

ananthrk avatar Dec 31 '14 07:12 ananthrk

Any thoughts/updates on this?

ananthrk avatar Jan 08 '15 05:01 ananthrk

Would it be possible for you to configure the time and size thresholds to values large enough that the emit is triggered by count alone?
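
For example, something along these lines (property names are taken from the sample connector configuration and may differ between versions; please double-check the exact keys in KinesisConnectorConfiguration):

```properties
# Make the size and time thresholds effectively unreachable so that only the
# record count triggers an emit.
bufferRecordCountLimit = 1000
bufferByteSizeLimit = 1073741824
bufferMillisecondsLimit = 9223372036854775807
```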

Sincerely, Gaurav

gauravgh avatar Jan 08 '15 16:01 gauravgh

Hi Gaurav,

Thanks for the response. Even if we configure the time and size thresholds so that only the count matters, the fact that the count is not always exact would still be a problem. That is, configuring a value of "n" for count does not guarantee that exactly "n" records are buffered for emit; the count can go slightly higher depending on the processRecords call. Hence this strategy does not always help, right?
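
To make the concern concrete, here is a toy simulation (not library code) of a buffer whose count threshold is only checked after each incoming batch has been added. With a limit of 15, the emitted batches end up holding 15, 17, and 16 records depending on how the batches line up:

```java
import java.util.List;

// Toy illustration of the behaviour described above: the whole GetRecords batch
// is buffered before the count threshold is checked, so emits can exceed the limit.
public class CountTriggerDemo {
    public static void main(String[] args) {
        final int limit = 15;
        int buffered = 0;
        // Hypothetical GetRecords batch sizes.
        for (int batch : List.of(10, 5, 9, 8, 7, 9)) {
            buffered += batch;                      // whole batch goes into the buffer
            if (buffered >= limit) {                // threshold checked after the batch
                System.out.println("emit file with " + buffered + " records");
                buffered = 0;
            }
        }
    }
}
```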

ananthrk avatar Jan 08 '15 19:01 ananthrk

I'm not convinced you need the fixed record count. Let's say you're consuming from a relatively slow-moving stream and you get, say, 15 records (numbers 10001-10015). You write these out into a file with the start ID in its name (my-data-10001), then you fail to checkpoint because you crash. When you restart (or another worker picks up this shard), it will also get a batch starting at 10001. It may get the same 15 records or it may get a few more. Let's say it now gets 17 records (10001-10017). It (over-)writes these to the my-data-10001 file. That file has all the data it had previously, plus a bit more. We now checkpoint at 10017, and the next batch starts at 10018, ready to go into the next file.
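
In other words, key each file on the first sequence number only, so a retried batch overwrites the same object rather than creating a near-duplicate next to it. A rough sketch of that idea (treating the override point, constructor, and package paths as assumptions about the library; check the actual S3Emitter before relying on this):

```java
// Package paths and signatures from memory; verify against your version of the library.
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.s3.S3Emitter;

// Emitter sketch that names files by the first sequence number only, so a
// re-emitted batch starting at the same record overwrites the same S3 key.
public class FirstSeqS3Emitter extends S3Emitter {
    public FirstSeqS3Emitter(KinesisConnectorConfiguration configuration) {
        super(configuration);
    }

    @Override
    protected String getS3FileName(String firstSeq, String lastSeq) {
        return "my-data-" + firstSeq;  // ignore lastSeq so retries reuse the key
    }
}
```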

I can't see the need for the fixed record count.

What I can see a need for, and which isn't exposed by the connector library, is the shard ID. Without writing my own KinesisConnectorRecordProcessorFactory and having it return a subclass of KinesisConnectorRecordProcessor which overrides initialize and captures its own copy of shardId, I can't see any way of accessing the shard ID. And even then, I'm not sure how you'd get it to the emitter.
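
A rough sketch of that workaround, for what it's worth (constructor arguments, generic parameters, and package paths are from memory and may differ between versions; treat them as assumptions):

```java
// Package paths and the super(...) signature are assumptions; verify against your version.
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor;
import com.amazonaws.services.kinesis.connectors.interfaces.IBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.kinesis.connectors.interfaces.IFilter;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformerBase;

// Record-processor sketch that captures the shard ID at initialization time.
public class ShardAwareRecordProcessor<T, U> extends KinesisConnectorRecordProcessor<T, U> {
    private String shardId;

    public ShardAwareRecordProcessor(IBuffer<T> buffer, IFilter<T> filter,
                                     IEmitter<U> emitter, ITransformerBase<T, U> transformer,
                                     KinesisConnectorConfiguration configuration) {
        super(buffer, filter, emitter, transformer, configuration);
    }

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId;     // keep our own copy, e.g. for use in file names
        super.initialize(shardId);  // let the base class do its normal setup
    }
}
```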

codeaholics avatar Jan 16 '15 16:01 codeaholics

What happens when the order of these operations is reversed - that is, we get 17 records the first time, but only 15 the next time (assuming we set recordCount=15)? In that case, we will have two duplicate records in S3.

ananthrk avatar Jan 19 '15 05:01 ananthrk

Why would you get fewer records on the subsequent call? And even if you did, you'd replace your 17-record file with a 15-record file, and then your next call would presumably include the missing two records, so they'd go into the next file. In any case, you have to cope with duplicates in the S3 files anyway, because if you're using the Kinesis Client Library (which the Connector Library does), it can (briefly) have multiple workers subscribed to the same shards. So this whole scheme is just an exercise in reducing the number of duplicates, not eliminating them completely.

codeaholics avatar Jan 19 '15 12:01 codeaholics

Why would you get fewer records on the subsequent call?

I did not mean the call to processRecords here, but the number of records that are in the buffer when the file is emitted to S3. Since the "recordCount" variable is not exact, there is a chance for the buffer to hold fewer or more records depending on when the emit was triggered.

you have to cope with duplicates in the S3 files anyway because if you're using the Kinesis Client Library (which the Connector Library does), it can (briefly) have multiple workers subscribed to the same shards. So this whole scheme is just an exercise trying to reduce the number of duplicates, not completely eliminating them

Does this mean the only way to ensure unique records is through mechanisms available in the consuming system (say an RDBMS)?

ananthrk avatar Jan 20 '15 12:01 ananthrk

That's my belief, yes.

codeaholics avatar Jan 20 '15 12:01 codeaholics
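
To illustrate the downstream-deduplication approach agreed above, here is a minimal sketch assuming a PostgreSQL table with a unique constraint on the Kinesis sequence number (the table and column names are hypothetical):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Minimal sketch of deduplication in the consuming system: the unique key on
// sequence_number makes the insert idempotent, so replayed records are no-ops.
public class DedupWriter {
    public static void writeRecord(Connection conn, String sequenceNumber, byte[] payload)
            throws SQLException {
        String sql = "INSERT INTO events (sequence_number, payload) VALUES (?, ?) "
                   + "ON CONFLICT (sequence_number) DO NOTHING";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, sequenceNumber);
            ps.setBytes(2, payload);
            ps.executeUpdate();  // duplicate sequence numbers are silently dropped
        }
    }
}
```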