winton-kafka-streams

Wordcount example: TypeError: an integer is required (got type tuple)

Open dqii opened this issue 4 years ago • 2 comments

I'm getting an error when I run the wordcount example. I cloned the git repo and did not make any changes. The issue seems to be that the timestamp is passed as a tuple, but the producer's produce() call expects an integer.

I ran zookeeper-server-start.sh, kafka-server-start.sh, example.py, and source_client.py.

Source client output:

    producing a b c to wks-wordcount-example-topic
    source_client.py:9: DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats
      p.produce(topic, data.encode('utf-8'))
    producing a b to wks-wordcount-example-topic
    producing a to wks-wordcount-example-topic

example.py output:

    WARNING:winton_kafka_streams.processor._stream_thread(Thread-1):Unexpected state transition from RUNNING to ASSIGNING_PARTITIONS.
    WARNING:winton_kafka_streams.processor._stream_thread(Thread-1):Unexpected state transition from RUNNING to NOT_RUNNING.
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_thread.py", line 140, in run
        self.process_and_punctuate()
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_thread.py", line 181, in process_and_punctuate
        if task.process():
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_task.py", line 122, in process
        self.topology.sources[topic].process(key, value)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/topology.py", line 25, in process
        self.processor.process(key, value)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/processor.py", line 35, in process
        self.context.forward(key, value)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/processor_context.py", line 49, in forward
        child.process(key, value)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/topology.py", line 25, in process
        self.processor.process(key, value)
      File "example.py", line 37, in process
        self.word_count_store[word] = count + 1
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/state/logging/change_logging_state_store.py", line 46, in __setitem__
        self.change_logger.log_change(key_bytes, value_bytes)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/state/logging/store_change_logger.py", line 10, in log_change
        self.record_collector.send(self.topic, key, value, self.context.timestamp, partition=self.partition)
      File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_record_collector.py", line 38, in send
        self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp)
    TypeError: an integer is required (got type tuple)

dqii, Jan 14 '20 20:01

I experienced the same issue. It seems to be a problem with the timestamp handling (not sure why). Try changing the offending line (the self.producer.produce(...) call in _record_collector.py shown at the end of the traceback) to:

                self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp[1])

seanrmurphy, Sep 11 '20 15:09

You can try this fix: https://github.com/wintoncode/winton-kafka-streams/pull/58/files

I had issues with timestamp[1] because the timestamp is sometimes a float rather than a tuple.

MaximWolpher, Sep 13 '20 16:09
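
For anyone patching this locally, the two observations in this thread (the timestamp is sometimes a tuple and sometimes a float) can be handled in one place before the value reaches produce(). The following is only a minimal sketch and not necessarily what the linked pull request does. The helper name normalize_timestamp is hypothetical, and the assumption that a tuple has the shape (timestamp_type, timestamp_value) is inferred from the timestamp[1] workaround above, which also matches what confluent-kafka-python's Message.timestamp() returns.

```python
def normalize_timestamp(timestamp):
    """Return an integer timestamp from an int, a float, or a tuple.

    Hypothetical helper, not part of winton-kafka-streams.
    Assumption: a tuple has the shape (timestamp_type, timestamp_value),
    so the value of interest is at index 1.
    """
    if isinstance(timestamp, tuple):
        timestamp = timestamp[1]
    # Floats are truncated to an integer; ints pass through unchanged.
    return int(timestamp)


# All three input shapes reported in this thread give the same integer.
print(normalize_timestamp((1, 1579032060000)))
print(normalize_timestamp(1579032060000.0))
print(normalize_timestamp(1579032060000))
```

With a helper like this, the call in _record_collector.py could pass normalize_timestamp(timestamp) instead of timestamp or timestamp[1], so it would not matter which form arrives.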