amazon-kinesis-connectors

Understanding Number of records emitted to S3

Open sudharshanPLT opened this issue 7 years ago • 3 comments

Hello all, I have configured my properties file so that each record is written to a separate file in S3:

 # 1MB = 1024*1024 = 1048576
 bufferByteSizeLimit = 4096 
 bufferRecordCountLimit = 1
 bufferMillisecondsLimit = 3600000

Each of my records is under ~4 KB, yet the logs show:

 Successfully emitted 713 records to Amazon S3
 Successfully emitted 745 records to Amazon S3
 Successfully emitted 644 records to Amazon S3

Each file in S3 is 1.6 MB or 1.7 MB in size. Can someone shed some light on what might be happening here?

sudharshanPLT, Aug 22 '17 00:08

@sudharshanPLT This is only an initial guess at your issue based on the information provided. More details would be needed for a more concrete explanation.

It seems that all of your records are being buffered and then emitted to S3 at once. For your use case of writing a single record per S3 object, you want to transform each record and emit it to S3 immediately; you do not need to buffer the records at all. If you do plan to use the buffer, you need to flush it after each record is transformed. This is not recommended, though, since S3 recommends larger object sizes.
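To illustrate the suspected behaviour, here is a small self-contained simulation. It is not the connector library's code: `BatchFlushSimulation` and `emitSizes` are hypothetical names, and the sketch assumes the flush condition is checked only once, after every record from a single batch has been added to the buffer. Under that assumption, a record-count limit of 1 still yields one emit (one S3 file) per batch rather than one per record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not the amazon-kinesis-connectors implementation.
// Assumption: the flush condition is evaluated only after a whole batch
// of records has been buffered.
final class BatchFlushSimulation {

    /** Returns the number of records contained in each simulated S3 object. */
    static List<Integer> emitSizes(List<Integer> batchSizes, int recordCountLimit) {
        List<Integer> emitted = new ArrayList<>();
        int buffered = 0;
        for (int batchSize : batchSizes) {
            // Every record in the batch is transformed and buffered first...
            buffered += batchSize;
            // ...and only then is the "at least N records" condition evaluated.
            if (buffered > 0 && buffered >= recordCountLimit) {
                emitted.add(buffered); // one S3 object holding the whole batch
                buffered = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Batch sizes chosen to mirror the log lines in the question.
        System.out.println(emitSizes(List.of(713, 745, 644), 1)); // [713, 745, 644]
    }
}
```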

sahilpalvia, Aug 24 '17 21:08

@sahilpalvia Thanks for your input. When I set bufferByteSizeLimit = 4096, doesn't that mean my buffer is limited to 4 KB, which in my case is effectively one record? Am I missing something here? Please let me know what additional details you need to dig deeper.

sudharshanPLT, Aug 24 '17 22:08

@sudharshanPLT bufferByteSizeLimit, bufferRecordCountLimit and bufferMillisecondsLimit are only used to decide whether the buffer needs to be flushed. You can check the logic in the shouldFlush method here.
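A simplified model of that check, written from the description in this thread rather than copied from the library, may help. `FlushPolicy` and its fields are hypothetical names, and treating an empty buffer as never flushable is an assumption. The point it shows: each limit is a lower bound that says "flush now", never a cap on how much the buffer may hold.

```java
// Simplified, hypothetical model of a buffer flush check; not the library's source.
// Each limit is a lower bound ("flush once at least this much is buffered"),
// never an upper bound on how much the buffer may hold.
final class FlushPolicy {
    final long byteLimit;
    final long recordLimit;
    final long millisLimit;

    FlushPolicy(long byteLimit, long recordLimit, long millisLimit) {
        this.byteLimit = byteLimit;
        this.recordLimit = recordLimit;
        this.millisLimit = millisLimit;
    }

    boolean shouldFlush(long bufferedBytes, long bufferedRecords, long millisSinceLastFlush) {
        if (bufferedRecords == 0) {
            return false; // assumption: an empty buffer is never flushed
        }
        // Any single limit being reached is enough.
        return bufferedBytes >= byteLimit
                || bufferedRecords >= recordLimit
                || millisSinceLastFlush >= millisLimit;
    }

    public static void main(String[] args) {
        // Limits taken from the properties file in the question.
        FlushPolicy policy = new FlushPolicy(4096, 1, 3_600_000);
        // 713 records (~1.7 MB) are well past the byte and record limits; the
        // check only reports "flush now", it does not cap the buffer at one record.
        System.out.println(policy.shouldFlush(1_700_000, 713, 5_000)); // true
        System.out.println(policy.shouldFlush(0, 0, 5_000));           // false
    }
}
```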

As you can see, they only determine whether the buffer meets the conditions to be flushed, and each of those conditions is a lower bound ("at least this many bytes, records or milliseconds"), not an upper bound ("at most"). The buffer does not automatically flush its records to the downstream service once it is full; that logic needs to be implemented in your processRecords method.

Have you implemented KinesisConnectorRecordProcessor and overridden processRecords? If not, you need to implement it and override the processRecords method. There you can transform every single record and emit it immediately, without buffering it. If you implement KinesisConnectorRecordProcessor, make sure to also implement KinesisConnectorRecordProcessorFactory, override its createKinesisConnectorRecordProcessor method so that it returns your new record processor, and provide your new factory to the KinesisConnectorExecutor.
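A rough sketch of the per-record approach described above follows. It uses hypothetical stand-in interfaces (`RecordTransformer`, `RecordEmitter`) and a hypothetical `PerRecordProcessor` class rather than the connector library's real transformer, emitter and processor types, whose exact signatures are not reproduced here. Each record is transformed and handed to the emitter on its own, so every emit call carries exactly one record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the library's transformer and emitter;
// names and signatures are illustrative only.
interface RecordTransformer<T> {
    T transform(byte[] data);
}

interface RecordEmitter<T> {
    void emit(List<T> records);
}

final class PerRecordProcessor<T> {
    private final RecordTransformer<T> transformer;
    private final RecordEmitter<T> emitter;

    PerRecordProcessor(RecordTransformer<T> transformer, RecordEmitter<T> emitter) {
        this.transformer = transformer;
        this.emitter = emitter;
    }

    // Plays the role of an overridden processRecords: no buffering, one emit per record.
    void processRecords(List<byte[]> batch) {
        for (byte[] data : batch) {
            T transformed = transformer.transform(data);
            emitter.emit(List.of(transformed));
        }
    }

    public static void main(String[] args) {
        // Records every emit call so the per-record behaviour is visible.
        List<List<String>> emitCalls = new ArrayList<>();
        PerRecordProcessor<String> processor = new PerRecordProcessor<>(
                data -> new String(data, StandardCharsets.UTF_8),
                emitCalls::add);
        processor.processRecords(List.of(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8)));
        System.out.println(emitCalls); // [[a], [b], [c]]
    }
}
```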

sahilpalvia, Aug 25 '17 22:08