
1 second delay in first Producer.produce if producer created before subscribing to the topic

Open r-owen opened this issue 3 years ago • 2 comments

Description

I'm seeing an unexpected delay of approximately 1 second when producing the first message (despite setting queue.buffering.max.ms=0).

How to reproduce

Run the following code:

import base64
import os
import random
import string
import time

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

PRODUCER_WAIT_ACKS = 1
BROKER_ADDR = "broker:29092"

def make_producer():
    return Producer(
        {
            "acks": PRODUCER_WAIT_ACKS,
            "queue.buffering.max.ms": 0,
            "bootstrap.servers": BROKER_ADDR,
        }
    )

def create_topic(topic_name):
    broker_client = AdminClient({"bootstrap.servers": BROKER_ADDR})
    new_topic = NewTopic(topic=topic_name, num_partitions=1, replication_factor=1)
    create_result = broker_client.create_topics([new_topic])
    for _name, future in create_result.items():
        # future.exception() blocks until the create request completes
        assert future.exception() is None
    print(f"created topic {topic_name}")

def write(producer, topic_name, raw_data):
    t0 = time.monotonic()

    def callback(err, _):
        assert err is None
        dt = time.monotonic() - t0
        print(f"write took {dt:0.2f} seconds")

    producer.produce(topic_name, raw_data, on_delivery=callback)
    producer.flush()

def main():
    random_str = base64.urlsafe_b64encode(os.urandom(9)).decode().replace("=", "_")
    topic_name = f"test_{random_str}"
    print(f"topic_name={topic_name}")
    producer = make_producer()
    create_topic(topic_name)

    for _ in range(10):
        random_data = "".join(
            random.choice(string.printable) for _ in range(100)
        ).encode()
        write(producer, topic_name, random_data)


main()

My output is:

topic_name=test_X896dxXeK4Ue
created topic test_X896dxXeK4Ue
write took 0.85 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds
write took 0.00 seconds

If I call subscribe(topic_name) before calling make_producer() and only write a very short byte array, then the delay goes away.

Am I doing something stupid?

Note: our system values minimal latency and reliability over maximum throughput, hence the call to producer.flush() and waiting for at least one ack for each message.

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    confluent_kafka.version(): ('1.8.2', 17302016)
    confluent_kafka.libversion(): ('1.7.0', 17236223)

  • [ ] Apache Kafka broker version: I am running Docker image confluentinc/cp-enterprise-kafka 6.2.4, along with the same version of cp-schema-registry and cp-zookeeper. The example doesn't use schemas, but my real code does.

  • [ ] Client configuration: {...}. The AdminClient uses {"bootstrap.servers": ..., "api.version.request": True} (api.version.request was added based on a Google suggestion for an earlier problem I was having; removing it makes no difference).

    Also, broker configuration, from my docker-compose.yaml:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  • [ ] Operating system: AlmaLinux release 8.6 (running on Docker version 20.10.17, build 100c701 on macOS 12.5.1)

  • [ ] Provide client logs (with 'debug': '..' as necessary): I'm not sure what to do here (see the sketch after this list).

  • [ ] Provide broker log excerpts: I'm not sure what to do here.

  • [ ] Critical issue? I'm just reporting an unexpected delay.
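
For the client-logs checklist item, a minimal sketch of how debug output could be enabled, assuming the same broker address as the repro script; 'debug' is a standard librdkafka configuration property, and 'broker,topic,msg' is just one reasonable choice of contexts:

from confluent_kafka import Producer

# Debug output is emitted through the client's logger (stderr by default).
producer = Producer(
    {
        "bootstrap.servers": "broker:29092",
        "queue.buffering.max.ms": 0,
        "debug": "broker,topic,msg",
    }
)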

r-owen avatar Aug 25 '22 16:08 r-owen

In case it helps, here is my docker-compose file (zipped, since I can't upload plain yaml). I use this command to start my Kafka system: docker-compose up -d zookeeper broker schema-registry (you can skip the schema-registry for this demo).

docker-compose.yaml.zip

r-owen avatar Aug 25 '22 17:08 r-owen

This appears to be https://github.com/confluentinc/confluent-kafka-dotnet/issues/701

r-owen avatar Aug 25 '22 19:08 r-owen

the code you provide doesn't have a subscribe method.

it looks like you are creating the producer before the topic, so initially the producer won't have the metadata for the topic. i'm not surprised there is a delay before it produces, though i don't see why we can't do better than 1s.

mhowlett avatar Oct 24 '22 17:10 mhowlett
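
Building on that explanation, a sketch of one possible warm-up step (not something proposed in the thread): ask the producer for the topic's metadata right after constructing it, so the metadata round trip happens before the first produce(). Producer.list_topics() is a blocking call; the 5-second timeout is an arbitrary choice, and this may only reduce, not eliminate, the first-message delay if the newly created topic's leader is not yet available.

# Reuses make_producer(), write() and topic_name from the repro script above.
producer = make_producer()
producer.list_topics(topic=topic_name, timeout=5)  # blocks up to 5 s for a metadata response
write(producer, topic_name, b"warm-up message")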

@mhowlett You were right that constructing the producer after the topic helps immensely: with that order, the initial delay in my tests is down to 0.01 seconds. Thank you.

(Regarding "the code you provide doesn't have a subscribe method.": the code only produces data, it does not consume it, so I don't see how subscribing is possible.)

r-owen avatar Jan 30 '23 23:01 r-owen
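
For completeness, a sketch of main() with the ordering discussed above (topic created before the producer is constructed), reusing the helpers from the repro script:

def main():
    random_str = base64.urlsafe_b64encode(os.urandom(9)).decode().replace("=", "_")
    topic_name = f"test_{random_str}"
    print(f"topic_name={topic_name}")
    create_topic(topic_name)    # create the topic first...
    producer = make_producer()  # ...then construct the producer, which now finds metadata for an existing topic
    for _ in range(10):
        random_data = "".join(
            random.choice(string.printable) for _ in range(100)
        ).encode()
        write(producer, topic_name, random_data)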