amazon-kinesis-client-python
Records not being processed with sample app - simple print(data) not working
I've gone through several issues and SO questions and haven't been able to get the sample app to work.
Here's how to reproduce this:
# Run dynamodb
docker run -d -p 8000:8000 dwmkerr/dynamodb
# Create java8 container
docker run -it jkosgei/alpine-java8 ash # Source at https://github.com/jonathan-kosgei/alpine-java8
export JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk/jre
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
apk add --no-cache python3 curl git
git clone https://github.com/awslabs/amazon-kinesis-client-python.git
cd amazon-kinesis-client-python
# edit sample.properties with sample below
# Start sample producer
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster
# Do setup
python3 setup.py download_jars &&\
python3 setup.py install
# Run sample app
`amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`
The following are the only things I've changed in my prod.properties:
executableName = python3 /amazon-kinesis-client-python/samples/sample_kclpy_app.py
dynamoDBEndpoint = http://127.0.0.1:8000
applicationName = test
maxRecords = 10000
idleTimeBetweenReadsInMillis = 200
callProcessRecordsEvenForEmptyRecordList = true
In sample_kclpy_app.py, my process_record function looks like this:
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
    """
    Called for each record that is passed to process_records.
    :param str data: The blob of data that was contained in the record.
    :param str partition_key: The key associated with this record.
    :param int sequence_number: The sequence number associated with this record.
    :param int sub_sequence_number: the sub sequence number associated with this record.
    """
    ####################################
    # Insert your processing logic here
    ####################################
    print('Data: ', data)
    with open('/log.txt', 'a') as the_file:
        the_file.write(data)
    return
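A side note on the file write above: in the v2 sample app, the value passed to process_record comes from the record's binary_data, which, assuming the amazon_kclpy version in use base64-decodes the payload, arrives as bytes rather than str. If so, the_file.write(data) on a file opened in text mode ('a') would raise TypeError. A minimal sketch of a helper that accepts either type; append_record is a hypothetical name, not part of amazon_kclpy:

```python
def append_record(path, data):
    """Append one record's payload to a text file, one record per line.

    Accepts bytes (e.g. a decoded Kinesis payload) or str. Bytes are decoded
    as UTF-8, with undecodable bytes replaced by U+FFFD.
    """
    if isinstance(data, bytes):
        data = data.decode("utf-8", errors="replace")
    # Explicit encoding so the result does not depend on the container's locale.
    with open(path, "a", encoding="utf-8") as the_file:
        # The trailing newline keeps consecutive records separable in the file.
        the_file.write(data + "\n")
```

Inside process_record this would be called as append_record('/log.txt', data) in place of the with-block.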
My logs:
/amazon-kinesis-client-python # `amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`
Mar 07, 2018 11:36:26 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 13bdbb57-e701-4be1-b2ca-6b808fa95b73
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property regionName with value us-east-1
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property idleTimeBetweenReadsInMillis with value 200
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property maxRecords with value 10000
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property callProcessRecordsEvenForEmptyRecordList with value true
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property dynamoDBEndpoint with value http://127.0.0.1:8000
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running test to process stream words with executable python3
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: bleh
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: bleh
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/3.6 python3
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
Mar 07, 2018 11:36:37 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator initialize
INFO: Created new lease table for coordinator with initial read capacity of 10 and write capacity of 10.
Mar 07, 2018 11:36:41 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Mar 07, 2018 11:36:43 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Starting LeaseCoordinator
Mar 07, 2018 11:36:46 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
Mar 07, 2018 11:36:48 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 successfully took 1 leases: shardId-000000000000
Mar 07, 2018 11:36:56 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
INFO: Initialization complete. Starting worker loop.
Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=bleh, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No need to block on parents [] of shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing InitializeMessage to child process for shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading STDERR for shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 110 bytes for shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
When starting from TRIM_HORIZON, it may take a number of GetRecords calls for the worker to catch up to the current tip of the shard. For testing, can you try:
- Add initialPositionInStream=LATEST to the properties file.
- Start the consumer process first, and wait for it to start reading from the stream.
- Once you see "Created new shardConsumer" in the log, start the producing application.
This will ensure that the consumer is reading from the tip before you start putting data in. This usually isn't a problem for applications that run constantly, since they stay near the tip of the shard.
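The start-up order in the steps above can be scripted instead of watched by hand. A sketch, assuming the helper and producer commands from the original post are on PATH, and that the KCL's log lines arrive on the consumer's STDERR (java.util.logging's console handler writes to System.err by default). wait_for_marker, pump and main are hypothetical names:

```python
import shlex
import subprocess
import sys
import threading

# Log line the steps above say to wait for before starting the producer.
READY_MARKER = "Created new shardConsumer"


def wait_for_marker(lines, marker=READY_MARKER, echo=None):
    """Read lines until one contains `marker`; return True if it was seen.

    `lines` is any iterable of str (for example a text-mode subprocess pipe).
    If `echo` is a writable stream, every line read is copied to it.
    """
    for line in lines:
        if echo is not None:
            echo.write(line)
        if marker in line:
            return True
    return False  # the stream ended before the marker appeared


def pump(lines, out):
    """Copy the remaining lines to `out` so the child never blocks on a full pipe."""
    for line in lines:
        out.write(line)


def main():
    # Same helper invocation as in the original post; it prints the java command.
    java_cmd = subprocess.run(
        ["amazon_kclpy_helper.py", "--print_command",
         "--java", "/usr/bin/java", "--properties", "prod.properties"],
        stdout=subprocess.PIPE, universal_newlines=True, check=True,
    ).stdout
    # Assumption: the KCL's log output arrives on the consumer's STDERR.
    consumer = subprocess.Popen(shlex.split(java_cmd), stderr=subprocess.PIPE,
                                universal_newlines=True)
    if not wait_for_marker(consumer.stderr, echo=sys.stderr):
        sys.exit("consumer exited before creating a shard consumer")
    # Keep draining STDERR in the background once the marker has been seen.
    threading.Thread(target=pump, args=(consumer.stderr, sys.stderr),
                     daemon=True).start()
    # Start the producer only once the consumer is reading from the stream.
    producer = subprocess.Popen(
        ["sample_kinesis_wordputter.py", "--stream", "words",
         "-w", "cat", "-w", "dog", "-w", "bird", "-w", "lobster", "-p", "2"])
    try:
        consumer.wait()
    finally:
        producer.terminate()
```

Calling main() from a small script replaces the two manual terminal steps. Waiting on the marker rather than a fixed sleep avoids guessing how long lease-table creation and lease taking will take (roughly half a minute in the logs in this thread).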
Hi, I've tried this again with a new app name (so it gets a new DynamoDB table), a brand-new shard, and the starting position set to LATEST, with the producer putting words into the stream every 2 seconds.
The results show the app sleeps a lot:
/amazon-kinesis-client-python # `amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`
Mar 07, 2018 3:27:21 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 88a12a3b-9b5d-40c0-827f-29624ec91837
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property regionName with value us-east-1
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property idleTimeBetweenReadsInMillis with value 200
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property maxRecords with value 10000
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property callProcessRecordsEvenForEmptyRecordList with value true
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property dynamoDBEndpoint with value http://dynamodb:8000
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value LATEST
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running test2 to process stream words with executable python3
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: 88a12a3b-9b5d-40c0-827f-29624ec91837
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: bleh
Mar 07, 2018 3:27:22 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/3.6 python3
Mar 07, 2018 3:27:23 PM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Mar 07, 2018 3:27:23 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Mar 07, 2018 3:27:23 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
Mar 07, 2018 3:27:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator initialize
INFO: Created new lease table for coordinator with initial read capacity of 10 and write capacity of 10.
Mar 07, 2018 3:27:40 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Mar 07, 2018 3:27:46 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Starting LeaseCoordinator
Mar 07, 2018 3:27:51 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker 88a12a3b-9b5d-40c0-827f-29624ec91837 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
Mar 07, 2018 3:27:52 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
INFO: Worker 88a12a3b-9b5d-40c0-827f-29624ec91837 successfully took 1 leases: shardId-000000000000
Mar 07, 2018 3:27:58 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
INFO: Initialization complete. Starting worker loop.
Mar 07, 2018 3:28:01 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=1632c618-0c26-4a52-af4c-64a2e26cb9e4, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
Mar 07, 2018 3:28:01 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No need to block on parents [] of shard shardId-000000000000
Mar 07, 2018 3:28:07 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
INFO: Initializing shard shardId-000000000000 with LATEST
Mar 07, 2018 3:28:07 PM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing InitializeMessage to child process for shard shardId-000000000000
Mar 07, 2018 3:28:08 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading STDERR for shardId-000000000000
Mar 07, 2018 3:28:08 PM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 104 bytes for shard shardId-000000000000
Mar 07, 2018 3:28:08 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Mar 07, 2018 3:28:23 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:28:23 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:29:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:29:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:30:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:30:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:31:39 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:31:39 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:32:42 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:32:42 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:33:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:33:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:34:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:34:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:35:44 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:35:44 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:36:45 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:36:45 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:37:45 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:37:45 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:38:49 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:38:49 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:39:56 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:39:56 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:40:57 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:40:57 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:41:57 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:41:57 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:43:00 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:43:00 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:44:01 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:44:01 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:45:03 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:45:03 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:46:08 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:46:08 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:47:11 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:47:11 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:48:12 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:48:12 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:49:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:49:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:50:19 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:50:19 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:51:20 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:51:20 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:52:22 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:52:22 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:53:23 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:53:23 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:54:27 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:54:27 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:55:28 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:55:28 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:56:29 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:56:29 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:57:29 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:57:29 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:58:30 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:58:30 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 3:59:30 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 3:59:30 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:00:31 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:00:31 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:01:31 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:01:31 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:02:32 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:02:32 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:03:41 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:03:41 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:04:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:04:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:05:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:05:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:06:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:06:43 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:07:49 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:07:49 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:08:52 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:08:52 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:09:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:09:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:10:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:10:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:11:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:11:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:12:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:12:53 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:13:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:13:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 4:15:01 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 4:15:01 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
My producer is working:
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster -p 2
Connecting to stream: words in us-east-1
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words
Sleeping for 2 seconds
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words
Sleeping for 2 seconds
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words
Sleeping for 2 seconds
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words
Sleeping for 2 seconds
Am I doing something wrong?
The KCL retrieves records on a timed loop. The value of idleTimeBetweenReadsInMillis controls how long it sleeps between calls to Kinesis, and if a call returns no records it extends that sleep. The time the record processor spends processing records can also slow the rate at which the KCL polls Kinesis. So sleeping isn't necessarily a problem unless you're falling behind.
You can check approximately how far behind the KCL thinks it is by looking at the value of millis_behind_latest on the process records input. This requires the v2 record processor base class, RecordProcessorBase.
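A minimal sketch of reading that value, assuming a v2 RecordProcessorBase subclass whose process_records receives an input object exposing records and millis_behind_latest, as described above. report_lag is a hypothetical helper; it writes to STDERR so it stays out of the STDOUT channel that the daemon uses for its protocol:

```python
import sys


def report_lag(process_records_input, out=None):
    """Write a one-line lag summary for a v2 process-records input object.

    Assumes the object exposes .records (a list) and .millis_behind_latest
    (an int, or None if the daemon did not supply it).
    """
    if out is None:
        out = sys.stderr  # looked up at call time, never STDOUT
    count = len(process_records_input.records)
    lag = process_records_input.millis_behind_latest
    if lag is None:
        lag_text = "lag unknown"
    else:
        lag_text = "{:.1f}s behind the tip".format(lag / 1000.0)
    out.write("batch of {} record(s), {}\n".format(count, lag_text))
```

Calling report_lag(process_records_input) at the top of process_records would emit one line per batch: a value that stays near zero means the consumer is keeping up, while a large or growing value means it is falling behind. This assumes the base class is importable as from amazon_kclpy.v2 import processor; check that against your installed package.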
My idleTimeBetweenReadsInMillis is 200 ms, and all I'm trying to do is print(data) in the process_record function. I haven't made any other changes.
cat prod.properties
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = python3
/amazon-kinesis-client-python/samples/sample_kclpy_app.py
dynamoDBEndpoint = http://dynamodb:8000
# The name of an Amazon Kinesis stream to process.
streamName = words
# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = test2
# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/3.6
# Valid options are TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = LATEST
# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.
# The KCL defaults to us-east-1
regionName = us-east-1
# Fail over time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000
# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000
# Max records to fetch from Kinesis in a single GetRecords call.
maxRecords = 10000
# Idle time between record reads in milliseconds.
idleTimeBetweenReadsInMillis = 200
# Enables applications to flush/checkpoint (if they have some data "in progress" but don't get new data for a while)
callProcessRecordsEvenForEmptyRecordList = true
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true
# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
Is your record processor getting called correctly with an empty list of records?
Also, where is the "Sleeping for 2 seconds" message coming from?
I don't see it getting called at all. This is my function:
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
    """
    Called for each record that is passed to process_records.
    :param str data: The blob of data that was contained in the record.
    :param str partition_key: The key associated with this record.
    :param int sequence_number: The sequence number associated with this record.
    :param int sub_sequence_number: The sub sequence number associated with this record.
    """
    ####################################
    # Insert your processing logic here
    ####################################
    print('Data: ', data)
    return
The "Sleeping for 2 seconds" message is produced by sample_kinesis_wordputter.py; the command I'm using is:
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster -p 2
All the output from running the consumer is in the comment above. I left it running for about 20-30 minutes and no data was printed at all.
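For reference, a rough sketch of what a word-putting producer like this does, written against boto3's Kinesis `put_record(StreamName=..., Data=..., PartitionKey=...)` call. Using the word itself as the partition key and sleeping `period` seconds between rounds are assumptions for illustration, not taken from the sample's source; `put_words` is a hypothetical name, and the client is passed in so the loop can be exercised without AWS.

```python
import time
from typing import Iterable, Optional


def put_words(client, stream_name: str, words: Iterable[str],
              period: float = 2.0, rounds: Optional[int] = None) -> int:
    """Put every word into the stream, sleep `period` seconds, repeat.

    `client` is expected to behave like boto3.client("kinesis").
    Runs forever when `rounds` is None. Returns the number of records put.
    """
    words = list(words)
    count = 0
    done = 0
    while rounds is None or done < rounds:
        for word in words:
            # Partition key = the word itself (an assumption for this sketch).
            client.put_record(StreamName=stream_name,
                              Data=word.encode("utf-8"),
                              PartitionKey=word)
            count += 1
        done += 1
        if rounds is None or done < rounds:
            # Mirrors the "Sleeping for N seconds" message seen in this thread.
            print("Sleeping for {:g} seconds".format(period))
            time.sleep(period)
    return count
```

Usage would be along the lines of `put_words(boto3.client("kinesis"), "words", ["cat", "dog", "bird", "lobster"])`, which only writes to the stream; it says nothing about whether the consumer side is working.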
Can you follow the instructions in issue #51? They allow configuration of the KCL's logging system. You will need to install Apache Maven to build the script, but this will let you configure logging. The logback.xml file in the issue enables debug logging that will provide more context to understand what is going on.
Okay, I've done so and tried to run it like this:
export CLASSPATH_PREFIX=/amazon-kinesis-client-python/logback
CLASSPATH_PREFIX=./logback amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties
But the output is the same. I have the logback folder, but I don't have the multi-lang-daemon bin to run directly.
Sorry, I thought that comment had everything. This comment has the steps to generate the launcher script. It generates a directory that contains all the jars and the startup scripts to start the KCL.
Thanks, let me try that.
I was able to build the bin and run the command; all the output is at https://gist.github.com/jonathan-kosgei/ac6958cba7173e6fa06934dbbfb1774d.
I let it run for about 2 minutes while the producer was still putting words into the stream. Hope this helps.
It looks like it's getting stuck waiting on the response from the initialize method:
20:42:27.998 [multi-lang-daemon-0004] INFO com.amazonaws.services.kinesis.multilang.LineReaderTask - Starting: Reading next message from STDIN for shardId-000000000000
20:42:28.085 [multi-lang-daemon-0000] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer - Previous INITIALIZE task still pending for shard shardId-000000000000 since 1005 ms ago. Not submitting new task.
20:42:28.285 [multi-lang-daemon-0000] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer - Previous INITIALIZE task still pending for shard shardId-000000000000 since 1205 ms ago. Not submitting new task.
20:42:28.486 [multi-lang-daemon-0000] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer - Previous INITIALIZE task still pending for shard shardId-000000000000 since 1406 ms ago. Not submitting new task.
Is there something happening in the initialize(...) method that could be preventing the method from returning?
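To make the stuck INITIALIZE easier to reason about: the daemon sends the child one JSON message per line and, as the log above shows, does not submit the next task until the previous one reports back. Below is a simplified sketch of that loop, assuming the reply shape {"action": "status", "responseFor": <action>} used by amazon_kclpy; the real library does considerably more (dispatching to the processor, checkpoint messages), and `serve` is a hypothetical name. If the child never writes that status line, for example because it is blocked, crashed, or is not the script you think it is, the daemon keeps logging "Previous INITIALIZE task still pending".

```python
import json
import sys
from typing import Callable, Dict, TextIO


def serve(handle: Callable[[Dict], None],
          stdin: TextIO = sys.stdin, stdout: TextIO = sys.stdout) -> None:
    """Simplified multi-language protocol loop (a sketch, not amazon_kclpy).

    Reads one JSON message per line, lets `handle` act on it, then replies
    with a status line so the daemon knows the action has finished.
    """
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        message = json.loads(line)  # non-JSON input raises JSONDecodeError
        handle(message)
        reply = {"action": "status", "responseFor": message["action"]}
        stdout.write(json.dumps(reply) + "\n")
        stdout.flush()  # without a flush the daemon may never see the reply
```

The important design point is that stdout carries only these replies; anything else written there is mixed into the channel the daemon is reading for status messages.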
Yes, that seemed odd. I'm using the code at https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample_kclpy_app.py and haven't made any changes except adding print(data) to process_record in that file.
Something weird is definitely going on. An interesting aside: when you use Control-C on the parent process, it gets sent to the child process as well, which is why a Python stack trace was printed. I'm not sure why Python shows the stack trace it does, unless it didn't actually start executing the record processor file.
^C20:44:02.061 [multi-lang-daemon-0002] ERROR com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask - Received error line from subprocess [Traceback (most recent call last):] for shard shardId-000000000000
Traceback (most recent call last):
20:44:02.065 [multi-lang-daemon-0002] ERROR com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask - Received error line from subprocess [ File "", line 1, in <module>] for shard shardId-000000000000
File "", line 1, in <module>
20:44:02.065 [multi-lang-daemon-0002] ERROR com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask - Received error line from subprocess [KeyboardInterrupt] for shard shardId-000000000000
KeyboardInterrupt
Here is essentially what the stack trace normally looks like (it's doubled due to having two shards in the stream):
2018-03-07 13:20:05,099 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [Traceback (most recent call last):] for shard shardId-000000000007
2018-03-07 13:20:05,099 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [Traceback (most recent call last):] for shard shardId-000000000008
2018-03-07 13:20:05,099 [Thread-2] INFO c.a.s.k.multilang.MultiLangDaemon - Process terminanted, will initiate shutdown.
2018-03-07 13:20:05,099 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ File "kcl-python-test/kcl-python-worker-v2.py", line 271, in <module>] for shard shardId-000000000007
2018-03-07 13:20:05,099 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ File "kcl-python-test/kcl-python-worker-v2.py", line 271, in <module>] for shard shardId-000000000008
2018-03-07 13:20:05,099 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ kclprocess.run()] for shard shardId-000000000007
2018-03-07 13:20:05,099 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ kclprocess.run()] for shard shardId-000000000008
2018-03-07 13:20:05,099 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 314, in run] for shard shardId-000000000007
2018-03-07 13:20:05,099 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 314, in run] for shard shardId-000000000008
2018-03-07 13:20:05,099 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ line = self.io_handler.read_line()] for shard shardId-000000000007
2018-03-07 13:20:05,099 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ line = self.io_handler.read_line()] for shard shardId-000000000008
2018-03-07 13:20:05,099 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 72, in read_line] for shard shardId-000000000007
2018-03-07 13:20:05,100 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 72, in read_line] for shard shardId-000000000008
2018-03-07 13:20:05,100 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ return self.input_file.readline()] for shard shardId-000000000007
2018-03-07 13:20:05,100 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [ return self.input_file.readline()] for shard shardId-000000000008
2018-03-07 13:20:05,100 [multi-lang-daemon-0002] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [KeyboardInterrupt] for shard shardId-000000000007
2018-03-07 13:20:05,100 [multi-lang-daemon-0003] ERROR c.a.s.k.m.DrainChildSTDERRTask - Received error line from subprocess [KeyboardInterrupt] for shard shardId-000000000008
I'm also using the version of the library from PyPI: amazon_kclpy 1.5.0
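Worth ruling out for the original print(data) symptom: the MultiLangDaemon talks to the record processor over the child's STDIN and STDOUT, and, as the DrainChildSTDERRTask lines above show, whatever the child writes to STDERR is copied into the daemon's log. A bare print() writes to STDOUT, the channel where the daemon expects protocol messages rather than log text, so debug output is better sent to stderr (the sample app's own log() helper also writes to sys.stderr, as far as I can tell). A minimal sketch; `log_record` is a hypothetical helper, not part of amazon_kclpy.

```python
import sys
from typing import TextIO


def log_record(data, partition_key, stream: TextIO = sys.stderr) -> None:
    """Write one debug line for a record (hypothetical helper).

    STDOUT is reserved for the KCL multi-language protocol, so this goes to
    stderr, which the daemon drains and reports in its own log.
    """
    if isinstance(data, bytes):
        # Payloads may arrive as bytes; decode them for readable output.
        data = data.decode("utf-8", errors="replace")
    stream.write("Data: {} (partition key: {})\n".format(data, partition_key))
    stream.flush()  # flush so the line is not held in a buffer


# Inside process_record, instead of print('Data: ', data):
#     log_record(data, partition_key)
```

A one-line alternative with the same effect is `print('Data: ', data, file=sys.stderr, flush=True)`.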
Could it be because I'm using Python 3? On Alpine Linux?
The test output is from Python 3.5, but one possible difference is that I'm using something like #!/usr/local/bin/python3 in the record processor script.
Alpine Linux shouldn't cause a problem, but I've never tested on it. It does use a different libc implementation (musl libc). That could cause an issue if Python had been compiled against a different libc, but in that case it would be unlikely for Python to even start.
You could verify that the record processor works correctly by running it from the standard command line. Typing something and hitting Enter should cause it to die due to a JSON deserialization failure.
e.g.
./kcl-python-worker-v2.py
Test
Produces
Traceback (most recent call last):
File "./kcl-python-worker-v2.py", line 271, in <module>
kclprocess.run()
File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 316, in run
self._handle_a_line(line)
File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 298, in _handle_a_line
action = self.io_handler.load_action(line)
File "/usr/local/lib/python3.5/site-packages/amazon_kclpy/kcl.py", line 84, in load_action
return json.loads(line, object_hook=dispatch.message_decode)
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/json/__init__.py", line 332, in loads
return cls(**kw).decode(s)
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
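The same check can be scripted. A minimal sketch using only the standard library: it runs a record-processor script with the current interpreter, feeds it one line on STDIN, and returns the exit code and both output streams. Sending a non-JSON line such as Test should make a processor like the one above exit non-zero with a JSONDecodeError on stderr. Whether a hand-written initialize message is accepted by the real sample depends on the fields amazon_kclpy expects, which this sketch does not try to reproduce; `smoke_test` is a hypothetical name.

```python
import subprocess
import sys
from typing import Tuple


def smoke_test(script_path: str, line: str,
               timeout: float = 10.0) -> Tuple[int, str, str]:
    """Run `script_path` with this interpreter and send `line` on STDIN.

    Returns (exit code, stdout, stderr). STDIN is closed after the line, so
    a script that loops on readline() sees end-of-input instead of hanging.
    """
    result = subprocess.run(
        [sys.executable, script_path],
        input=line + "\n",
        capture_output=True,  # Python 3.7+
        text=True,
        timeout=timeout,
    )
    return result.returncode, result.stdout, result.stderr
```

For example, `smoke_test("samples/sample_kclpy_app.py", "Test")` should return a non-zero exit code with the traceback in the third element.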
I got something similar
python3 samples/sample_kclpy_app.py
Test
Traceback (most recent call last):
File "samples/sample_kclpy_app.py", line 183, in <module>
kcl_process.run()
File "/usr/lib/python3.6/site-packages/amazon_kclpy/kcl.py", line 316, in run
self._handle_a_line(line)
File "/usr/lib/python3.6/site-packages/amazon_kclpy/kcl.py", line 298, in _handle_a_line
action = self.io_handler.load_action(line)
File "/usr/lib/python3.6/site-packages/amazon_kclpy/kcl.py", line 84, in load_action
return json.loads(line, object_hook=dispatch.message_decode)
File "/usr/lib/python3.6/json/__init__.py", line 367, in loads
return cls(**kw).decode(s)
File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
My file begins with #!/usr/bin/env python.
I'm also using a local dynamodb endpoint.
I've added some steps to the post to help reproduce the issue.
Greetings,
is there any news or a solution to this problem?
In fact, I am also running into an issue where I have set maxRecords = 25 (to fetch in a single GetRecords call). Something weird is happening: it is fetching records continuously, yet no process method is getting called. Please help.
Try setting callProcessRecordsEvenForEmptyRecordList to true. This will cause the KCL to call the record processor even if no records were retrieved. While the KCL calls Kinesis consistently, it's possible that no records are retrieved because none were available at the time of the call.
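Why the flag matters for time-based work, as a sketch: when it is true, the processor is invoked on every read even with an empty list, so logic like "checkpoint at least every N seconds" still gets a chance to run during quiet periods. Everything here is illustrative: records are plain dicts with a hypothetical "sequenceNumber" key, and `checkpoint` is a placeholder callable, not amazon_kclpy's checkpointer API (in the real library both come from the process_records input).

```python
import time
from typing import Callable, List, Optional


class IntervalCheckpointer:
    """Checkpoint at most once per `interval` seconds (illustrative sketch)."""

    def __init__(self, interval: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.clock = clock
        self.last = clock()
        self.pending: Optional[str] = None  # newest processed, unsaved position

    def process_records(self, records: List[dict],
                        checkpoint: Callable[[str], None]) -> None:
        for record in records:
            # ... per-record processing would go here ...
            self.pending = record["sequenceNumber"]
        # This check also runs for an empty batch, which only happens if the
        # KCL is configured to call the processor for empty record lists.
        now = self.clock()
        if self.pending is not None and now - self.last >= self.interval:
            checkpoint(self.pending)
            self.pending = None
            self.last = now
```

Without empty-list calls, records processed just before the stream goes quiet would stay un-checkpointed until the next record arrives.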
We also don't recommend setting maxRecords to a low value, as that can prevent the KCL from making progress if it falls behind.
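Back-of-the-envelope arithmetic for the point above, assuming one GetRecords call per idleTimeBetweenReadsInMillis interval and ignoring call latency: 25 records every 200 ms caps a consumer at 125 records/s per shard, while a single shard accepts up to 1,000 records/s of writes. Other limits (such as the per-shard read bandwidth) also apply and are not modeled; `max_read_rate` is a hypothetical helper.

```python
def max_read_rate(max_records: int, idle_ms: int) -> float:
    """Upper bound on records/s read from one shard.

    Assumes one GetRecords call per idle interval and zero call latency,
    so real throughput will be somewhat lower.
    """
    if idle_ms <= 0:
        raise ValueError("idle_ms must be positive")
    return max_records / (idle_ms / 1000.0)


SHARD_WRITE_LIMIT = 1000  # records/s one shard accepts on the write side

for max_records in (25, 10000):
    rate = max_read_rate(max_records, idle_ms=200)
    # Compares record counts only; byte-based read limits are ignored here.
    verdict = "can fall behind" if rate < SHARD_WRITE_LIMIT else "not record-limited"
    print("maxRecords={}: up to {:.0f} records/s, {}".format(max_records, rate, verdict))
```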
I encountered a similar issue to the one @jonathan-kosgei mentioned: a simple print wouldn't show up in the logs. I noticed that the build folder wasn't reflecting my changes, so I ran the python setup.py install command. After that, the changes were reflected in the build folder, and my changes to sample_kclpy_app.py showed up in the logs. I also noticed that sample_kclpy_app.py gets copied into /Library/Frameworks/Python.framework/Versions/3.7/bin/sample_kclpy_app.py when I run the install command.
This might be the reason changes weren't being reflected.
I could be on a completely different page, but I hope it helps.
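One way to check which copy of the script actually runs: if executableName names the script without a directory, it is presumably located through PATH, which after setup.py install can resolve to the installed copy in Python's bin directory rather than the file you edited. Whether the daemon resolves bare names exactly this way is an assumption; the sketch below just reports what PATH lookup finds, using `shutil.which`. `resolve_script` is a hypothetical helper.

```python
import os
import shutil
from typing import Optional


def resolve_script(name: str, path: Optional[str] = None) -> Optional[str]:
    """Return the absolute path a command name resolves to, or None.

    Names containing a directory separator are not searched on PATH;
    they are returned as absolute paths if the file exists.
    """
    if os.sep in name or (os.altsep and os.altsep in name):
        return os.path.abspath(name) if os.path.isfile(name) else None
    found = shutil.which(name, path=path)  # path=None means use $PATH
    return os.path.abspath(found) if found else None
```

Comparing `resolve_script("sample_kclpy_app.py")` with the path of the file you are editing shows whether the two differ.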
Has anyone resolved this issue? I am having the same issue: I see the shards being processed, but the print(data) never gets fired.
Can I run the consumer as a cron job, for example once every 5 minutes? And can I set the max records for each run of the application?
I had a similar problem where updates to my program were not taking effect in some places. It should be noted that this was in the Ruby library. It turned out that there were a lot of zombie processes that weren't killed when the rake process was killed, so I went into Task Manager and manually killed all of the Ruby processes that had been spawned and left running. Hope this helps someone who runs into the same scenario.
I'm having the same issue. Any fixes? It seems like the sample code does not work properly.
Running python setup.py install resolved my issue.
Not a single one of these suggestions worked for me. For me, the issue is that the multi-lang daemon seems to be looking in the wrong place for the Python code: the code is in /Users/username//AWS/amazon-kinesis-client-python/samples/sample_kclpy_app.py, but it seems to be looking in /Users/username//AWS/amazon-kinesis-client-python/sample_kclpy_app.py (the samples subdirectory is missing from its path). I run the Java multi-lang helper like this:
python samples/amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties samples/sample.properties
(from the directory /Users/username//AWS/amazon-kinesis-client-python/)
2021-07-08 15:12:55,711 [multi-lang-daemon-0001] ERROR s.a.k.multilang.DrainChildSTDERRTask [NONE] - Received error line from subprocess [/Users/username/.pyenv/versions/3.9.0/bin/python: can't open file '/Users/username//AWS/amazon-kinesis-client-python/sample_kclpy_app.py': [Errno 2] No such file or directory] for shard shardId-000000000000
/Users/srajagopalan/.pyenv/versions/3.9.0/bin/python: can't open file '/Users/srajagopalan/CloudServiceProviders/AWS/amazon-kinesis-client-python/sample_kclpy_app.py': [Errno 2] No such file or directory
2021-07-08 15:12:55,712 [multi-lang-daemon-0003] INFO s.a.kinesis.multilang.LineReaderTask [NONE] - Stopping: Reading next message from STDIN for shardId-000000000000
2021-07-08 15:12:55,712 [multi-lang-daemon-0001] INFO s.a.kinesis.multilang.LineReaderTask [NONE] - Stopping: Reading STDERR for shardId-000000000000
2021-07-08 15:12:55,714 [ShardRecordProcessor-0000] ERROR s.a.k.multilang.MultiLangProtocol [NONE] - Failed to get status message for initialize action for shard shardId-000000000000
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at software.amazon.kinesis.multilang.MultiLangProtocol.futureMethod(MultiLangProtocol.java:208)
    at software.amazon.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:182)
    at software.amazon.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:152)
    at software.amazon.kinesis.multilang.MultiLangProtocol.initialize(MultiLangProtocol.java:84)
    at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.initialize(MultiLangShardRecordProcessor.java:91)
    at software.amazon.kinesis.lifecycle.InitializeTask.call(InitializeTask.java:103)
    at software.amazon.kinesis.lifecycle.ShardConsumer.executeTask(ShardConsumer.java:334)
    at software.amazon.kinesis.lifecycle.ShardConsumer.lambda$initializeComplete$2(ShardConsumer.java:287)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
    at software.amazon.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:80)
    at software.amazon.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:29)
    at software.amazon.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:67)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
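The error above shows the child being started as python <repo root>/sample_kclpy_app.py, which suggests executableName gives the script as a relative path that is resolved against the directory the daemon was launched from, not the directory holding the properties file. A rough sketch that reads executableName from a .properties file and reports any .py argument that does not exist relative to a given working directory. The simple `key = value` parsing is an assumption (it ignores Java properties escapes and line continuations), and `read_properties` / `check_executable` are hypothetical helpers. Using an absolute path in executableName, as in the properties at the top of this thread, sidesteps the problem.

```python
import os
from typing import Dict, List


def read_properties(path: str) -> Dict[str, str]:
    """Parse simple `key = value` lines, skipping blanks and #/! comments."""
    props = {}
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith(("#", "!")):
                continue
            key, sep, value = line.partition("=")
            if sep:
                props[key.strip()] = value.strip()
    return props


def check_executable(properties_path: str, cwd: str) -> List[str]:
    """Return the .py arguments of executableName that are missing from `cwd`."""
    command = read_properties(properties_path).get("executableName", "")
    missing = []
    for arg in command.split():
        if arg.endswith(".py"):
            # Relative paths are resolved against the launch directory.
            candidate = arg if os.path.isabs(arg) else os.path.join(cwd, arg)
            if not os.path.isfile(candidate):
                missing.append(candidate)
    return missing
```

Running `check_executable("samples/sample.properties", os.getcwd())` from the directory you launch the daemon in would list any script path the child will fail to open.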
@sonnyrajagopalanuptycs, check out the consumer described in this example: https://github.com/aws-samples/amazon-kinesis-data-processor-aws-fargate
It runs in a container, but that is of course not a requirement.
Thank you, @flo-mair. But I am not sure what changes with the consumer in https://github.com/aws-samples/amazon-kinesis-data-processor-aws-fargate/blob/master/consumer/record_processor.py. I have not tried it, and I'm not sure how to either, since my problem is the Multi-Lang Daemon bombing, as I mentioned above. That said, the process_record() function is different in the code you allude to (though my setup with the KCL is bombing elsewhere).
I just want to do a read. Why is it so hard to find a simpler SDK and a really simple example? AWS' sample code simply does not work, and hasn't worked for a while, from what I can tell.
It is clear that I am missing something (I am dead certain), but shouldn't I expect a sample to work out of the box without so much hair-pulling configuration fiddling?