amazon-kinesis-connectors
Understanding Number of records emitted to S3
Hello all, I have configured my properties file to write each record to a different file in S3:
# 1MB = 1024*1024 = 1048576
bufferByteSizeLimit = 4096
bufferRecordCountLimit = 1
bufferMillisecondsLimit = 3600000
Each of my records is smaller than about 4 KB. However, the logs show:
Successfully emitted 713 records to Amazon S3
Successfully emitted 745 records to Amazon S3
Successfully emitted 644 records to Amazon S3
and each file in S3 is 1.6 MB or 1.7 MB. Can someone shed some light on what might be happening here?
@sudharshanPLT This is just an initial guess about your issue based on the information provided. More details would be required for a more concrete and detailed explanation.
It seems like all of your records are being buffered and then emitted to S3 at once. For your use case of writing a single record per S3 file, you want to transform each record and emit it to S3 immediately; you do not need to buffer the records. If you do plan to use the buffer, you need to flush it after transforming each record. This is not recommended, though, since S3 recommends larger object sizes.
@sahilpalvia Thanks for your input. When I set bufferByteSizeLimit = 4096, doesn't that mean my buffer size is limited to 4 KB, which is actually one record in my case? Am I missing something here? Please let me know what additional details you need to dig deeper.
@sudharshanPLT bufferByteSizeLimit, bufferRecordCountLimit and bufferMillisecondsLimit are used to check whether the buffer needs to be flushed. You can check the logic behind the shouldFlush method here.
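The flush check can be modeled roughly as follows. This is a minimal, self-contained sketch, not the library's buffer class: FlushCheckModel and its methods are hypothetical names, and the model assumes the three limits are combined with OR, that the buffer must be non-empty, and that adding a record never triggers a flush by itself. The 2400-byte record size in main is an assumed average chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a buffer flush check (illustration only, not the library's class).
 * The limits are thresholds that make shouldFlush return true; they do not cap the buffer.
 */
public class FlushCheckModel {
    private final long byteSizeLimit;
    private final long recordCountLimit;
    private final long millisecondsLimit;

    private final List<byte[]> records = new ArrayList<>();
    private long byteCount = 0;
    private long lastFlushMillis;

    public FlushCheckModel(long byteSizeLimit, long recordCountLimit, long millisecondsLimit, long nowMillis) {
        this.byteSizeLimit = byteSizeLimit;
        this.recordCountLimit = recordCountLimit;
        this.millisecondsLimit = millisecondsLimit;
        this.lastFlushMillis = nowMillis;
    }

    /** Adds a record; nothing here stops the buffer from growing past any limit. */
    public void add(byte[] record) {
        records.add(record);
        byteCount += record.length;
    }

    /** True when the buffer is non-empty and ANY limit has been reached or exceeded. */
    public boolean shouldFlush(long nowMillis) {
        return !records.isEmpty()
                && (records.size() >= recordCountLimit
                    || byteCount >= byteSizeLimit
                    || nowMillis - lastFlushMillis >= millisecondsLimit);
    }

    /** Hands back everything buffered and resets the counters. */
    public List<byte[]> flush(long nowMillis) {
        List<byte[]> out = new ArrayList<>(records);
        records.clear();
        byteCount = 0;
        lastFlushMillis = nowMillis;
        return out;
    }

    public int size() { return records.size(); }

    public long byteCount() { return byteCount; }

    public static void main(String[] args) {
        // Limits from the question: 4096 bytes, 1 record, 1 hour.
        FlushCheckModel buffer = new FlushCheckModel(4096, 1, 3_600_000, 0);
        for (int i = 0; i < 713; i++) {
            buffer.add(new byte[2400]); // assumed ~2.4 KB records
        }
        // All 713 records are still buffered; the limits only say "flush now".
        System.out.println(buffer.size() + " records, " + buffer.byteCount()
                + " bytes, shouldFlush=" + buffer.shouldFlush(0));
    }
}
```

Under these assumptions, main prints "713 records, 1711200 bytes, shouldFlush=true": every limit was exceeded long ago, yet all 713 records are still waiting in a single buffer, which is roughly the size of the S3 files in the question.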
As you can see, they are only used to check whether the buffer meets the conditions to be flushed. Each condition is a minimum ("flush once at least this many bytes, records, or milliseconds have accumulated"), not a maximum on how much the buffer can hold. The buffer does not automatically flush the records to the downstream service once it gets full; that logic needs to be implemented in your processRecords method. Have you implemented KinesisConnectorRecordProcessor and overridden processRecords? If not, you need to implement it and override the processRecords method. There you can transform every single record and emit it without buffering it. If you implement KinesisConnectorRecordProcessor, make sure to also implement KinesisConnectorRecordProcessorFactory, override the createKinesisConnectorRecordProcessor method so that it returns your new record processor, and pass your new factory to the KinesisConnectorExecutor.
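The difference between buffering a batch and emitting each record can be sketched with a small, hypothetical model. It assumes the stock processRecords adds every record of the batch it receives from Kinesis to the buffer and consults the flush check only once, after the loop; under that assumption a bufferRecordCountLimit of 1 still produces one S3 file per batch. The class and method names are made up for illustration, and "emitting" here only records how many records each S3 file would contain; nothing calls Kinesis or S3.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two processing strategies; sizes stand in for S3 files. */
public class EmitStrategyModel {

    /** Assumed default behavior: buffer the whole batch, then check the flush condition once. */
    static List<Integer> bufferThenCheckOnce(int batchSize, int recordCountLimit) {
        List<Integer> fileSizes = new ArrayList<>();
        int buffered = 0;
        for (int i = 0; i < batchSize; i++) {
            buffered++; // transform + add to buffer; no flush check inside the loop
        }
        if (buffered > 0 && buffered >= recordCountLimit) {
            fileSizes.add(buffered); // one file containing everything that was buffered
        }
        return fileSizes;
    }

    /** Custom processRecords idea: transform and emit each record immediately. */
    static List<Integer> emitEachRecord(int batchSize) {
        List<Integer> fileSizes = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            fileSizes.add(1); // one file per record
        }
        return fileSizes;
    }

    public static void main(String[] args) {
        // 713 matches one of the log lines in the question.
        System.out.println(bufferThenCheckOnce(713, 1)); // [713]
        System.out.println(emitEachRecord(713).size());  // 713
    }
}
```

The per-record strategy gives exactly one file per record, at the cost of many small S3 objects, which is the trade-off mentioned earlier in the thread.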