
tensorflow.org kafka.ipynb failing

Open MarkDaoust opened this issue 3 years ago • 3 comments

In kafka.ipynb:

write_to_kafka("susy-train", zip(x_train, y_train))

---------------------------------------------------------------------------
NoBrokersAvailable                        Traceback (most recent call last)
/tmp/ipykernel_16467/922230483.py in <module>
     11   print("Wrote {0} messages into topic: {1}".format(count, topic_name))
     12 
---> 13 write_to_kafka("susy-train", zip(x_train, y_train))
     14 write_to_kafka("susy-test", zip(x_test, y_test))

/tmp/ipykernel_16467/922230483.py in write_to_kafka(topic_name, items)
      4 def write_to_kafka(topic_name, items):
      5   count=0
----> 6   producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
      7   for message, key in items:
      8     producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)

/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
    381         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
    382                              wakeup_timeout_ms=self.config['max_block_ms'],
--> 383                              **self.config)
    384 
    385         # Get auto-discovered version from client if necessary

/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kafka/client_async.py in __init__(self, **configs)
    242         if self.config['api_version'] is None:
    243             check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 244             self.config['api_version'] = self.check_version(timeout=check_timeout)
    245 
    246     def _can_bootstrap(self):

/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
    898             if try_node is None:
    899                 self._lock.release()
--> 900                 raise Errors.NoBrokersAvailable()
    901             self._maybe_connect(try_node)
    902             conn = self._conns[try_node]

NoBrokersAvailable: NoBrokersAvailable
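The traceback shows that NoBrokersAvailable is raised inside the KafkaProducer constructor (from check_version) when no broker answers at the bootstrap address. One way to tell "the Kafka server never started" apart from other Kafka problems is to probe the port before creating the producer. The following is a minimal standard-library sketch. `broker_reachable` is a hypothetical helper that is not part of kafka-python or the notebook, and 127.0.0.1:9092 is just the address the notebook uses.

```python
import socket


def broker_reachable(host: str = "127.0.0.1", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical pre-flight check: it only shows that *something* is listening
    on the port. It does not speak the Kafka protocol.
    """
    try:
        # create_connection resolves the address and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, unreachable host, and similar errors.
        return False


if not broker_reachable():
    print("Nothing is listening on 127.0.0.1:9092; check that the Kafka server started.")
```

A successful connect is necessary but not sufficient for KafkaProducer to find a broker. Treat it as a first check, not as a health check for Kafka itself.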

MarkDaoust, Nov 22 '21 18:11

Thanks for filing these, Mark. Here are my notes from investigating this one.

NoBrokersAvailable is raised because Kafka never launches: the archive the notebook downloads no longer exists at that URL, so extraction fails.

!curl -sSOL https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
!tar -xzf kafka_2.13-2.7.0.tgz

gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
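"gzip: stdin: not in gzip format" means the saved file is not a gzip archive at all. Because the command runs `curl -sSOL` without `-f`/`--fail`, an HTTP error such as a 404 does not make curl fail. The server's error page is most likely what got written to kafka_2.13-2.7.0.tgz, and tar only fails afterwards. A minimal sketch that fails earlier, with a readable message, is to check the gzip magic bytes before extracting. The helper names `looks_like_gzip` and `extract_tgz` are hypothetical and not part of the notebook.

```python
import tarfile
from pathlib import Path

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream (RFC 1952)


def looks_like_gzip(path) -> bool:
    """Return True if the file starts with the gzip magic number."""
    with open(path, "rb") as f:
        return f.read(2) == GZIP_MAGIC


def extract_tgz(path, dest="."):
    """Extract a .tgz archive into `dest` and return the member names.

    Raises ValueError up front if the file is not gzip, e.g. an HTML error
    page saved by curl after a failed download.
    """
    path = Path(path)
    if not looks_like_gzip(path):
        head = path.read_bytes()[:80]
        raise ValueError(f"{path} is not a gzip archive; first bytes: {head!r}")
    with tarfile.open(path, "r:gz") as tar:
        names = tar.getnames()
        tar.extractall(dest)
    return names
```

In the shell version, adding `-f` to the curl flags achieves a similar effect: curl then exits with an error on an HTTP failure instead of saving the error page.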

Once that's resolved (#1570), the final step still fails:

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  model.fit(mini_ds, epochs=3)

I think online_train_ds is still yielding batches (mini_ds) after the queue is exhausted, so the last mini_ds is empty:

model.fit(mini_ds, epochs=3)

ValueError: Unexpected result of `train_function` (Empty logs). Please use `Model.compile(..., run_eagerly=True)`, or `tf.config.run_functions_eagerly(True)` for more information of where went wrong, or file a issue/bug to `tf.keras`.

next(iter(mini_ds.take(1)))

OutOfRangeError: End of sequence [Op:IteratorGetNext]
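Both errors are consistent with the final mini_ds having no elements. `model.fit` runs zero steps ("Empty logs"), and `next()` hits the end of the sequence immediately. The proper fix belongs in the dataset itself. As a defensive pattern on the consumer side, a training loop can peek at each mini-batch and skip it if it is empty. The following plain-Python sketch uses lists as hypothetical stand-ins for online_train_ds and its mini datasets. Adapting the peek to tf.data streaming datasets is an assumption to verify, because re-iterating a streaming mini dataset may not replay the same records.

```python
from itertools import chain
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def non_empty(streams: Iterable[Iterable[T]]) -> Iterator[Iterator[T]]:
    """Yield each inner stream only if it has at least one element.

    Peeks at the first element. If there is one, it is chained back on front
    so the consumer still sees every element, without needing len().
    """
    for stream in streams:
        it = iter(stream)
        try:
            first = next(it)
        except StopIteration:
            continue  # empty mini-batch: skip it instead of training on it
        yield chain([first], it)


# Hypothetical stand-in for online_train_ds: the last mini-batch is empty.
batches = [[1, 2, 3], [4, 5], []]
for mini in non_empty(batches):
    print(list(mini))
```

Peeking instead of calling `len()` keeps the check usable for streams whose size is unknown in advance, which is the situation with a stream read from Kafka.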

markmcd, Nov 23 '21 02:11

Thanks @MarkDaoust @markmcd. I created PR https://github.com/tensorflow/io/pull/1572 to fix the empty mini_ds issue. Please take a look.

kvignesh1420, Nov 24 '21 01:11

https://github.com/tensorflow/io/pull/1640 recently tested kafka.ipynb, and it is now working. The earlier failure was likely caused by a stale source URL that the notebook downloads the Kafka binary from, as mentioned in https://github.com/tensorflow/io/issues/1639.

stancsz, Feb 11 '22 23:02