amazon-kinesis-client-nodejs
amazon-kinesis-client-nodejs copied to clipboard
processRecords with async operations?
I decided to use the new basic consumer sample as a template for my current processor.
On this project, we have some asynchronous tasks to do for each Kinesis message, and we also want to checkpoint after each message. I was wondering if you could take a peek at my processRecords implementation to see if it looks solid:
FYI: to decouple a lot of our processing and retry logic from the consumer, we pass a recordProcessingStrategyCallback to the consumer. @sahilpalvia
'use strict';
var util = require('util');
module.exports = function strategizedKinesisConsumer(logger, recordProcessingStrategyCallback) {
var shardId;
var logger = logger;
var recordProcessingStrategyCallback = recordProcessingStrategyCallback;
return {
initialize: function(initializeInput, completeCallback) {
shardId = initializeInput.shardId;
logger.info('strategizedKinesisConsumer', {customText: `New KCL consumer initializing with config: ${JSON.stringify(initializeInput)}`});
completeCallback();
},
processRecords: function(processRecordsInput, completeCallback) {
if (!processRecordsInput || !processRecordsInput.records) {
completeCallback();
return;
}
var records = processRecordsInput.records;
return new Promise( async (resolve, reject) => {
for (var i = 0 ; i < records.length ; ++i) {
let record = records[i];
let data = new Buffer(record.data, 'base64').toString();
let dataObj = JSON.parse(data);
let sequenceNumber = record.sequenceNumber;
let partitionKey = record.partitionKey;
logger.info(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
//call logic
logger.info('event', {eventId: dataObj.eventId, correlationId: dataObj.correlationId, customText: 'Delegating message to blackbox strategy for processing'});
await recordProcessingStrategyCallback(dataObj);
logger.info('event', {eventId: dataObj.eventId, correlationId: dataObj.correlationId, customText: '...back from blackbox strategy'});
if (!sequenceNumber) {
completeCallback();
return;
}
const checkpoint = util.promisify(processRecordsInput.checkpointer.checkpoint).bind(processRecordsInput.checkpointer);
logger.info('event', {eventId: dataObj.eventId, correlationId: dataObj.correlationId, customText: 'Checkpointing message...'});
await checkpoint(sequenceNumber);
logger.info('event', {eventId: dataObj.eventId, correlationId: dataObj.correlationId, customText: '..message checkpointed'});
}
completeCallback()
resolve()
});
},
// leaseLost handler: receives (leaseLostInput, completeCallback). Its body is
// elided ("...") in this post, so its behavior is not shown here.
// NOTE(review): presumably it must invoke completeCallback when done — confirm against the KCL docs.
leaseLost: function(leaseLostInput, completeCallback) {
...
},
// shardEnded handler: receives (shardEndedInput, completeCallback). Its body is
// elided ("..") in this post, so its behavior is not shown here.
// NOTE(review): presumably it should checkpoint and then invoke completeCallback — confirm against the KCL docs.
shardEnded: function(shardEndedInput, completeCallback) {
..
},
// shutdownRequested handler: receives (shutdownRequestedInput, completeCallback).
// Its body is elided ("..") in this post, so its behavior is not shown here.
// NOTE(review): presumably it must invoke completeCallback when done — confirm against the KCL docs.
shutdownRequested: function(shutdownRequestedInput, completeCallback) {
..
}
};
}
Could you guys add an example, or a consumer, that does async processing for each record? @sahilpalvia