Error while running consumer
I am trying to run the sample as is. The producer works fine, but I am getting errors while running the consumer. Here is the console dump:
Apr 24, 2018 2:55:02 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
Apr 24, 2018 2:55:02 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading next message from STDIN for shardId-000000000000
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading STDERR for shardId-000000000000
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangProtocol futureMethod
SEVERE: Failed to get status message for initialize action for shard shardId-000000000000
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
    at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.futureMethod(MultiLangProtocol.java:197)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:171)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:138)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.initialize(MultiLangProtocol.java:78)
    at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:94)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:90)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
    at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84)
    at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31)
    at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70)
    ... 4 more
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor stopProcessing
SEVERE: Encountered an error while trying to initialize record processor
java.lang.RuntimeException: Failed to initialize child process
    at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:95)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:90)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Draining STDOUT for shardId-000000000000
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Draining STDOUT for shardId-000000000000
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor childProcessShutdownSequence
INFO: Child process exited with value: 1
Apr 24, 2018 2:55:15 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemon run
INFO: Process terminanted, will initiate shutdown.
@sapinder Did you find a solution?
We are seeing a similar issue when running a consumer application with the Node.js aws-kcl library, version 0.7.0. The simple consumer app fails after some time.
Error log:
Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading next message from STDIN for shardId-000000000001
Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine
SEVERE: Received error line from subprocess [Error: Kinesis Client Library is in the invalid state. Cannot proceed further.] for shard shardId-000000000001
Error: Kinesis Client Library is in the invalid state. Cannot proceed further.
Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine
SEVERE: Received error line from subprocess [ at KCLManager._handleStateInput (/usr/src/app/node_modules/aws-kcl/lib/kcl/kcl_manager.js:399:11)] for shard shardId-000000000001
    at KCLManager._handleStateInput (/usr/src/app/node_modules/aws-kcl/lib/kcl/kcl_manager.js:399:11)
Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.MultiLangProtocol futureMethod
SEVERE: Failed to get status message for processRecords action for shard shardId-000000000001
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000001 so won't be able to return a message.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.futureMethod(MultiLangProtocol.java:197)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:171)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:138)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.processRecords(MultiLangProtocol.java:92)
    at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.processRecords(MultiLangRecordProcessor.java:108)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:215)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:170)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000001 so won't be able to return a message.
    at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84)
    at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31)
    at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70)
    ... 4 more
Oct 22, 2018 7:47:35 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine
SEVERE: Received error line from subprocess [ at KCLManager.
Any help would be appreciated.
Same issue here. Any help?
Any update on this?
We fixed the issue. There was a problem with the braces in our code, and it was failing at checkpointing. After refactoring, it's working fine.
Here is the working copy of the code:
`// Logger and ProcessMyData are this application's own modules (not shown).
var kcl = require('aws-kcl')
var util = require('util')

function recordProcessor () {
  var shardId
  return {
    initialize: function (initializeInput, completeCallback) {
      shardId = initializeInput.shardId
      completeCallback()
    },
    processRecords: function (processRecordsInput, completeCallback) {
      if (!processRecordsInput || !processRecordsInput.records) {
        completeCallback()
        return
      }
      var records = processRecordsInput.records
      var record, data, sequenceNumber, partitionKey
      for (var i = 0; i < records.length; ++i) {
        record = records[i]
        // Buffer.from replaces the deprecated new Buffer(...) constructor.
        data = Buffer.from(record.data, 'base64').toString()
        sequenceNumber = record.sequenceNumber
        partitionKey = record.partitionKey
        Logger.instance().info(util.format('ShardID: %s, Record: %s, SequenceNumber: %s, PartitionKey: %s', shardId, data, sequenceNumber, partitionKey))
        // Note: ProcessMyData is asynchronous and is not awaited here, so the
        // checkpoint below can run before processing has finished.
        ProcessMyData(data).then(result => {
          Logger.instance().info(JSON.stringify(result))
        }).catch(err => {
          Logger.instance().error('Error ' + JSON.stringify(err))
        })
      }
      Logger.instance().info(util.format('records %s', JSON.stringify(records)))
      if (!sequenceNumber) {
        Logger.instance().info(util.format('Checkpoint not done since seq number is undefined. ShardID: %s, SequenceNumber: %s', shardId, sequenceNumber))
        completeCallback()
        return
      }
      // If checkpointing, completeCallback should only be called once checkpoint is complete.
      processRecordsInput.checkpointer.checkpoint(sequenceNumber, function (err, checkpointedSequenceNumber) {
        if (err) {
          Logger.instance().error(util.format('Checkpoint failed. ShardID: %s, Error: %s', shardId, err))
        } else {
          Logger.instance().info(util.format('Checkpoint successful. ShardID: %s, SequenceNumber: %s', shardId, checkpointedSequenceNumber))
        }
        completeCallback()
      })
    },
    leaseLost: function (leaseLostInput, completeCallback) {
      Logger.instance().info(util.format('Lease was lost for ShardId: %s', shardId))
      completeCallback()
    },
    shardEnded: function (shardEndedInput, completeCallback) {
      Logger.instance().info(util.format('ShardId: %s has ended. Will checkpoint now.', shardId))
      shardEndedInput.checkpointer.checkpoint(function (_err) {
        completeCallback()
      })
    },
    shutdownRequested: function (shutdownRequestedInput, completeCallback) {
      shutdownRequestedInput.checkpointer.checkpoint(function (_err) {
        completeCallback()
      })
    }
  }
}

kcl(recordProcessor()).run()`
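Since the fix above was about checkpointing, here is a minimal sketch of a `processRecords` handler that waits for every record's asynchronous work to settle before it checkpoints, and that always calls `completeCallback` exactly once. The names `makeProcessRecords`, `processOne`, and `log` are hypothetical stand-ins (for `ProcessMyData` and `Logger.instance()`); only the `checkpointer.checkpoint(sequenceNumber, callback)` shape is taken from the code above. This is an illustration under those assumptions, not a confirmed fix for the error in this thread.

```javascript
// Hypothetical helper: builds a processRecords handler that waits for all
// per-record work to settle before checkpointing.
// `processOne` (data -> Promise) and `log` ({ error: fn }) are assumptions.
function makeProcessRecords (processOne, log) {
  return function processRecords (processRecordsInput, completeCallback) {
    var records = (processRecordsInput && processRecordsInput.records) || []
    if (records.length === 0) {
      completeCallback()
      return
    }
    var work = records.map(function (record) {
      var data = Buffer.from(record.data, 'base64').toString()
      // Catch per-record failures (sync throws and rejections) so one bad
      // record cannot prevent completeCallback from being called.
      return Promise.resolve()
        .then(function () { return processOne(data) })
        .catch(function (err) { log.error('Processing failed: ' + err) })
    })
    var lastSequenceNumber = records[records.length - 1].sequenceNumber
    Promise.all(work).then(function () {
      processRecordsInput.checkpointer.checkpoint(lastSequenceNumber, function (err) {
        if (err) {
          log.error('Checkpoint failed: ' + err)
        }
        // Called exactly once, after the checkpoint attempt has finished.
        completeCallback()
      })
    })
  }
}
```

It could be wired in as `processRecords: makeProcessRecords(ProcessMyData, Logger.instance())`. Note the design choice: a failed record is logged and skipped, so the batch is still checkpointed; retrying or dead-lettering would need extra handling.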
I essentially copied and pasted your code, but I still have the same error...
In that case, it would be a good idea to enable debug logging and send the logs to AWS Support for analysis.
Same here
Did anyone find a solution?
Seconded. We're still dealing with this issue. Did anyone find a fix? @pfifer
We are also seeing the same issue, and we had to restart the processor to get the consumer up and running again.
What we have seen is that the issue happens when the processing time for one of the messages is longer than usual. We have to keep restarting the consumer.
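To check whether slow records really are the trigger, one option is to bound the time spent on each record and log the ones that exceed it. The sketch below is a hypothetical `withTimeout` helper (not part of aws-kcl) and assumes the per-record work returns a Promise, as `ProcessMyData` does in the code earlier in this thread. It does not cancel the underlying work; it only stops waiting for it.

```javascript
// Hypothetical helper: rejects if `promise` does not settle within `ms`
// milliseconds. `label` is only used to build the error message.
function withTimeout (promise, ms, label) {
  var timer
  var timeout = new Promise(function (resolve, reject) {
    timer = setTimeout(function () {
      reject(new Error(label + ' timed out after ' + ms + ' ms'))
    }, ms)
  })
  // Whichever settles first wins; the timer is cleared in both cases so it
  // does not keep the process alive.
  return Promise.race([promise, timeout]).then(
    function (value) { clearTimeout(timer); return value },
    function (err) { clearTimeout(timer); throw err }
  )
}
```

For example, `withTimeout(ProcessMyData(data), 5000, 'record ' + sequenceNumber)` would reject after five seconds, which makes slow records visible in the application log before the daemon reports the child process as dead. The five-second limit is an arbitrary placeholder.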