tensorflow.org kafka.ipynb failing
In kafka.ipynb, this cell fails:
write_to_kafka("susy-train", zip(x_train, y_train))
---------------------------------------------------------------------------
NoBrokersAvailable Traceback (most recent call last)
/tmp/ipykernel_16467/922230483.py in <module>
11 print("Wrote {0} messages into topic: {1}".format(count, topic_name))
12
---> 13 write_to_kafka("susy-train", zip(x_train, y_train))
14 write_to_kafka("susy-test", zip(x_test, y_test))
/tmp/ipykernel_16467/922230483.py in write_to_kafka(topic_name, items)
4 def write_to_kafka(topic_name, items):
5 count=0
----> 6 producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
7 for message, key in items:
8 producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
381 client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
382 wakeup_timeout_ms=self.config['max_block_ms'],
--> 383 **self.config)
384
385 # Get auto-discovered version from client if necessary
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kafka/client_async.py in __init__(self, **configs)
242 if self.config['api_version'] is None:
243 check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 244 self.config['api_version'] = self.check_version(timeout=check_timeout)
245
246 def _can_bootstrap(self):
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
898 if try_node is None:
899 self._lock.release()
--> 900 raise Errors.NoBrokersAvailable()
901 self._maybe_connect(try_node)
902 conn = self._conns[try_node]
NoBrokersAvailable: NoBrokersAvailable
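NoBrokersAvailable only says that the client could not reach any broker at the bootstrap address (127.0.0.1:9092 here). A quick way to tell whether anything is listening at all is a plain TCP connect before creating the KafkaProducer. This is a minimal standard-library sketch. broker_reachable is a hypothetical helper, and a successful connect does not prove that the listener is a Kafka broker.

```python
import socket


def broker_reachable(host: str = "127.0.0.1", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical diagnostic helper: it only checks that *something* accepts
    connections on the port, not that it speaks the Kafka protocol.
    """
    try:
        # create_connection raises an OSError subclass (e.g. ConnectionRefusedError,
        # socket.timeout) when nothing is listening or the host is unreachable.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if not broker_reachable():
    print("Nothing is listening on 127.0.0.1:9092; check that the Kafka server actually started.")
```

Running this check first turns an opaque producer error into a direct "is the server up?" answer.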
Thanks for filing these, Mark. Adding my notes from my investigation:
NoBrokersAvailable comes from a failure to launch Kafka, because the archive the notebook tries to download does not exist at that URL:
!curl -sSOL https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
!tar -xzf kafka_2.13-2.7.0.tgz
gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
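The "gzip: stdin: not in gzip format" message means the saved file is not a gzip archive. Without -f (--fail), curl writes whatever body the server returns, so a missing release gets saved under the .tgz name (most likely as an HTML error page) and the failure only surfaces at the tar step. This is a minimal standard-library sketch of a pre-extraction check. looks_like_gzip is a hypothetical helper that only compares the first two bytes against the gzip magic number 0x1f 0x8b, so it catches an error page but does not validate the whole archive.

```python
import gzip
import os
import tempfile

GZIP_MAGIC = b"\x1f\x8b"  # every gzip stream starts with these two bytes (RFC 1952)


def looks_like_gzip(path: str) -> bool:
    """Return True if the file at `path` starts with the gzip magic bytes."""
    with open(path, "rb") as f:
        return f.read(2) == GZIP_MAGIC


# Demo: a real gzip file versus an HTML error page saved under a .tgz name.
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good.tgz")
    bad = os.path.join(tmp, "bad.tgz")
    with gzip.open(good, "wb") as f:
        f.write(b"payload")
    with open(bad, "wb") as f:
        f.write(b"<html><body>404 Not Found</body></html>")
    print(looks_like_gzip(good), looks_like_gzip(bad))  # True False
```

Adding -f to the curl command would surface the same problem earlier, at download time.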
Once that's resolved (#1570), the final step still fails:
for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  model.fit(mini_ds, epochs=3)
I think that online_train_ds is still yielding batches (mini_ds) after the queue is exhausted:
model.fit(mini_ds, epochs=3)
ValueError: Unexpected result of `train_function` (Empty logs). Please use `Model.compile(..., run_eagerly=True)`, or `tf.config.run_functions_eagerly(True)` for more information of where went wrong, or file a issue/bug to `tf.keras`.
next(iter(mini_ds.take(1)))
OutOfRangeError: End of sequence [Op:IteratorGetNext]
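The OutOfRangeError on the very first take(1) confirms that this mini_ds is empty, so model.fit has nothing to train on. One generic guard is to peek at each mini-dataset and skip the empty ones. The sketch below shows that pattern with plain Python iterators; skip_empty is a hypothetical helper, not the fix from the PR. Applying the same idea to a tf.data.Dataset means iterating it once before model.fit does, and the thread does not establish whether that is safe for a Kafka-backed stream.

```python
import itertools
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def skip_empty(batches: Iterable[Iterable[T]]) -> Iterator[Iterator[T]]:
    """Yield only the batches that contain at least one element.

    Each batch is peeked at once; the peeked element is chained back in
    front of the remaining items so no data is lost.
    """
    sentinel = object()
    for batch in batches:
        it = iter(batch)
        first = next(it, sentinel)
        if first is sentinel:
            continue  # empty batch: skip it instead of handing it to training
        yield itertools.chain([first], it)


stream = [[1, 2], [], [3], []]
print([list(b) for b in skip_empty(stream)])  # [[1, 2], [3]]
```

Peeking one element, rather than materializing each batch, keeps the guard cheap for large mini-datasets.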
Thanks @MarkDaoust @markmcd. I created PR https://github.com/tensorflow/io/pull/1572 to fix the empty mini_ds issue. Please take a look.
https://github.com/tensorflow/io/pull/1640: recently tested kafka.ipynb and it is now working. The failure was likely due to a stale source URL that the notebook was downloading the Kafka binary from, as mentioned in https://github.com/tensorflow/io/issues/1639.