amazon-kinesis-client-nodejs icon indicating copy to clipboard operation
amazon-kinesis-client-nodejs copied to clipboard

Error while running consumer

Open sapinder opened this issue 7 years ago • 12 comments

I am trying the to run the sample as it is, producer works fine but i am getting errors while running the consumer: Here is the console dump: Apr 24, 2018 2:55:02 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000 Apr 24, 2018 2:55:02 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info INFO: Sleeping ... Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call INFO: Stopping: Reading next message from STDIN for shardId-000000000000 Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call INFO: Stopping: Reading STDERR for shardId-000000000000 Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangProtocol futureMethod SEVERE: Failed to get status message for initialize action for shard shardId-000000000000 java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message. at java.base/java.util.concurrent.FutureTask.report(Unknown Source) at java.base/java.util.concurrent.FutureTask.get(Unknown Source) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.futureMethod(MultiLangProtocol.java:197) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:171) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:138) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.initialize(MultiLangProtocol.java:78) at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:94) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:90) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message. at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84) at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31) at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70) ... 4 more

Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor stopProcessing SEVERE: Encountered an error while trying to initialize record processor java.lang.RuntimeException: Failed to initialize child process at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:95) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:90) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source)

Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call INFO: Starting: Draining STDOUT for shardId-000000000000 Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call INFO: Stopping: Draining STDOUT for shardId-000000000000 Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor childProcessShutdownSequence INFO: Child process exited with value: 1 Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemon run INFO: Process terminanted, will initiate shutdown.

sapinder avatar Apr 24 '18 09:04 sapinder

@sapinder Did you find any solutions?

MariaTolstov avatar Sep 05 '18 20:09 MariaTolstov

We are seeing similar issue while running consumer application using NodeJS aws-kcl library with version 0.7.0 , the simple consumer app fails after some time.


Error Log : Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call INFO: Stopping: Reading next message from STDIN for shardId-000000000001 Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine SEVERE: Received error line from subprocess [Error: Kinesis Client Library is in the invalid state. Cannot proceed further.] for shard shardId-000000000001 Error: Kinesis Client Library is in the invalid state. Cannot proceed further. Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine SEVERE: Received error line from subprocess [ at KCLManager._handleStateInput (/usr/src/app/node_modules/aws-kcl/lib/kcl/kcl_manager.js:399:11)] for shard shardId-000000000001 at KCLManager._handleStateInput (/usr/src/app/node_modules/aws-kcl/lib/kcl/kcl_manager.js:399:11) Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.MultiLangProtocol futureMethod SEVERE: Failed to get status message for processRecords action for shard shardId-000000000001 java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000001 so won't be able to return a message. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.futureMethod(MultiLangProtocol.java:197) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:171) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:138) at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.processRecords(MultiLangProtocol.java:92) at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.processRecords(MultiLangRecordProcessor.java:108) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:215) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:170) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000001 so won't be able to return a message. at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84) at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31) at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70) ... 4 more

Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine SEVERE: Received error line from subprocess [ at KCLManager. (/usr/src/app/node_modules/aws-kcl/lib/kcl/kcl_manager.js:381:12)] for shard shardId-000000000001 at KCLManager. (/usr/src/app/node_modules/aws-kcl/lib/kcl/kcl_manager.js:381:12) Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor stopProcessing SEVERE: Encountered an error while trying to process records java.lang.RuntimeException: Child process failed to process records at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.processRecords(MultiLangRecordProcessor.java:109) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:215) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:170) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)


Any help would be appreciated.

ps185130 avatar Oct 22 '18 20:10 ps185130

Same issue here..Any help?

niketh90 avatar Oct 05 '20 18:10 niketh90

Any update on this?

brandonmino avatar Feb 09 '21 18:02 brandonmino

We fixed the issue. There was some issue with the braces in code and it was failing at checkpointing. After refactoring its working fine.

Here is the working copy of the code.

 `function recordProcessor () {var shardId return {
initialize: function (initializeInput, completeCallback) {
  shardId = initializeInput.shardId
  completeCallback()
},processRecords: function (processRecordsInput, completeCallback) {
  if (!processRecordsInput || !processRecordsInput.records) {
    completeCallback()
    return
  }

  var records = processRecordsInput.records
  var record, data, sequenceNumber, partitionKey
  for (var i = 0; i < records.length; ++i) {
    record = records[i]
    data = new Buffer(record.data, 'base64').toString()

         sequenceNumber = record.sequenceNumber
    partitionKey = record.partitionKey
    Logger.instance().info(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
    
    ProcessMyData(data).then(result => {
         Logger.instance().info(JSON.stringify(result))
    }).catch(err => {
      Logger.instance().error('Error  ' + JSON.stringify(err))
    })
  }
  Logger.instance().info(util.format('records %s', JSON.stringify(records)))
  if (!sequenceNumber) {
    Logger.instance().info(util.format('Checkpoint not done since seq number is undefined. ShardID: %s, SeqenceNumber: %s', shardId, sequenceNumber));
    completeCallback()
    return
  } else {
    // If checkpointing, completeCallback should only be called once checkpoint is complete.
    processRecordsInput.checkpointer.checkpoint(sequenceNumber, function (_err, sequenceNumber) {
      Logger.instance().info(util.format('Checkpoint successful. ShardID: %s, SeqenceNumber: %s', shardId, sequenceNumber))
      completeCallback()
    })
  }
},
leaseLost: function (leaseLostInput, completeCallback) {
  Logger.instance().info(util.format('Lease was lost for ShardId: %s', shardId))
  completeCallback()
},

shardEnded: function (shardEndedInput, completeCallback) {
  Logger.instance().info(util.format('ShardId: %s has ended. Will checkpoint now.', shardId))
  shardEndedInput.checkpointer.checkpoint(function (_err) {
    completeCallback()
  })
},

shutdownRequested: function (shutdownRequestedInput, completeCallback) {
  shutdownRequestedInput.checkpointer.checkpoint(function (_err) {
    completeCallback()
  })
}  }}kcl(recordProcessor()).run()`

niketh90 avatar Feb 09 '21 20:02 niketh90

I essentially copied and pasted your code, but I still have the same error...

brandonmino avatar Feb 09 '21 21:02 brandonmino

Then it will be a good idea to enable debug logs and send logs to aws support for analysis.

niketh90 avatar Feb 09 '21 21:02 niketh90

Same here

vincentpham13 avatar Dec 04 '21 08:12 vincentpham13

Did anyone find solution ?

softmantk avatar Feb 10 '22 09:02 softmantk

Seconded. We're still dealing with this issue. Did anyone find a fix? @pfifer

SimranjitKambojTR avatar Jun 29 '22 15:06 SimranjitKambojTR

We are also finding the same issue and we had to restart the processor to get the consumer up and running again.

ashrafuzzaman avatar Jul 24 '22 06:07 ashrafuzzaman

what we have seen is the issue happens when processing time for one of the message is longer. We need to keep restarting consumer.

mgagguturi avatar Jul 28 '22 15:07 mgagguturi