
Auto-created topics partitions incorrect / divide by zero

Open rjemanuele opened this issue 6 years ago • 3 comments

PyKafka version: 2.8.0
Kafka version: 2.2.0

When producing to an auto-created topic, a divide-by-zero exception can occur.

Traceback (most recent call last):
  File "/opt/system/project/usr/share/signal_generator.py", line 464, in emit
    producer.produce(j.encode())
  File "/usr/local/lib/python3.7/site-packages/pykafka/producer.py", line 385, in produce
    partition_id = self._partitioner(partitions, partition_key).id
  File "/usr/local/lib/python3.7/site-packages/pykafka/partitioners.py", line 47, in __call__
    self.idx = (self.idx + 1) % len(partitions)
ZeroDivisionError: integer division or modulo by zero

My thought is that the partition list may be empty or incomplete at the moment a message is produced (e.g. Kafka hasn't finished creating the partitions, or PyKafka hasn't refreshed its metadata yet), so len(partitions) is 0 in the partitioner.
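For illustration, the failure in the traceback can be reproduced without a broker. This is only a minimal sketch, not PyKafka's actual code: `RoundRobinPartitioner` and `demo` are hypothetical stand-ins that mirror nothing more than the modulo line shown in the traceback.

```python
class RoundRobinPartitioner:
    """Hypothetical stand-in that mirrors the modulo line from the traceback."""

    def __init__(self):
        self.idx = 0

    def __call__(self, partitions, key=None):
        # With an empty partition list, len(partitions) is 0 and the
        # modulo below raises ZeroDivisionError.
        self.idx = (self.idx + 1) % len(partitions)
        return partitions[self.idx]


def demo():
    """Call the partitioner with no partitions and report what happens."""
    partitioner = RoundRobinPartitioner()
    try:
        partitioner([])
    except ZeroDivisionError:
        return "ZeroDivisionError"
    return "ok"
```

If the hypothesis is right, any producer call made before the partition metadata arrives would hit this same path.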

rjemanuele avatar May 02 '19 17:05 rjemanuele

@emmett9001 Hi Emmett, could you have a look at this one? I'm not sure what the best course of action is here. Thanks.

rjemanuele avatar May 06 '19 19:05 rjemanuele

Assuming your topic is actually being created with the proper number of partitions, a workaround could be to initialize a producer with a preexisting topic name instead of relying on autocreation.

emmettbutler avatar May 07 '19 05:05 emmettbutler

I found that retrying after a call to update_cluster() does work. These topics are low-volume and single-partition.

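    # Needs `import logging` and `import time` at module level.
    retry = 0
    topic = self.kafka.topics[mytopic]
    # Poll until the auto-created topic reports at least one partition.
    while retry < self.retry_topic_partitions_found:
        if len(topic.partitions):
            break
        logging.warning("No partitions found on retry %d for %s",
                        retry, mytopic)
        retry += 1
        time.sleep(.25)
        # Refresh cluster metadata, then look the topic up again.
        self.kafka.update_cluster()
        topic = self.kafka.topics[mytopic]

    if len(topic.partitions) == 0:
        # Still empty: the producer below can hit the same ZeroDivisionError.
        logging.error("No partitions found after retries for %s", mytopic)

    self.producer = topic.get_producer()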

With this in place, I've only ever seen a single retry needed.
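The retry loop above could also be factored into a small helper that is testable without a broker. This is a sketch under assumptions: `wait_for_partitions`, `get_partitions`, `refresh`, and the `sleep` parameter are hypothetical names, not part of PyKafka. Unlike the snippet above, it raises instead of continuing when no partitions ever appear.

```python
import time


def wait_for_partitions(get_partitions, refresh, retries=5, delay=0.25,
                        sleep=time.sleep):
    """Poll get_partitions() until it is non-empty, refreshing between tries.

    get_partitions and refresh are caller-supplied callables (hypothetical
    names); sleep is injectable so the helper can be tested without waiting.
    """
    for attempt in range(retries + 1):
        partitions = get_partitions()
        if len(partitions) > 0:
            return partitions
        if attempt < retries:
            sleep(delay)
            refresh()  # e.g. re-fetch cluster metadata before the next check
    raise RuntimeError("no partitions found after %d retries" % retries)
```

With the names from the snippet above, a call might look like `wait_for_partitions(lambda: self.kafka.topics[mytopic].partitions, self.kafka.update_cluster)` before creating the producer.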

rjemanuele avatar May 07 '19 22:05 rjemanuele