confluent-kafka-dotnet
Kafka producer is slow on first message
Description
The first message sent after a producer is created is acknowledged only after a delay of about 1 second.
How to reproduce
- Run the below producer code two times.
- Check how quickly messages are produced and acknowledged on the second run.
Expected: messages are acknowledged within milliseconds, or at most tens of milliseconds. Actual: the first message is always acknowledged only after about 1 second.
Observations:
- The suggestion to run it twice is only because the very first run takes about 2 seconds to send the first message (probably because of the overhead of creating a new topic).
- If the producer writes to more than one topic, there is a 1-second delay for the first message in each topic.
- Setting TopicMetadataRefreshIntervalMs to 100 ms significantly reduces the acknowledgement delay for the first message.
- Before sending the first message to Kafka, the producer sends a METADATA request for all topics. After 1 second it sends another METADATA request, this time for the specific topic the producer is trying to push the message to. This gap between the two METADATA requests seems to be causing the delay.
static void Main()
{
    var producer = new Producer<Null, string>(new ProducerConfig { BootstrapServers = "localhost:9092" });
    var i = 0;
    while (i < 4)
    {
        var start = DateTime.Now;
        Console.WriteLine($"{start:HH:mm:ss.fff} Sending message.");
        var deliveryReport = producer.ProduceAsync(
            new TopicPartition("test-topic", Partition.Any),
            new Message<Null, string> { Value = "hello kafka!" });
        deliveryReport.Wait(2000);
        var now = DateTime.Now;
        Console.WriteLine($"{now:HH:mm:ss.fff} Message sent in {(now - start).TotalMilliseconds:N1}.");
        i++;
    }
    producer.Dispose();
}
Checklist
Please provide the following information:
- [x] Confluent.Kafka nuget version: 1.0.0-beta2
- [x] Apache Kafka version: 2.11-2.0.1
- [x] Client configuration: see example app
- [x] Operating system: Windows 10
- [x] Provide logs (with "debug" : "..." as necessary in configuration)
12:14:17.951 Sending message.
12:14:18.967 Message sent in 1,016.4.
12:14:18.969 Sending message.
12:14:18.972 Message sent in 3.0.
12:14:18.973 Sending message.
12:14:18.976 Message sent in 3.0.
12:14:18.976 Sending message.
12:14:18.979 Message sent in 3.0.
Press any key to continue . . .
- [x] Provide broker log excerpts
[2018-12-13 12:14:17,954] TRACE Processor 0 received request: RequestHeader(apiKey=METADATA, apiVersion=2, clientId=rdkafka, correlationId=2) -- {topics=[]} (kafka.network.RequestChannel$)
[2018-12-13 12:14:17,954] TRACE [KafkaApi-0] Handling request:RequestHeader(apiKey=METADATA, apiVersion=2, clientId=rdkafka, correlationId=2) -- {topics=[]} from connection 127.0.0.1:9092-127.0.0.1:61171-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2018-12-13 12:14:17,955] TRACE [KafkaApi-0] Sending topic metadata and brokers 0 : (EndPoint(XXX,9092,ListenerName(PLAINTEXT),PLAINTEXT)) : null for correlation id 2 to client rdkafka (kafka.server.KafkaApis)
[2018-12-13 12:14:17,955] TRACE Socket server received response to send to 127.0.0.1:9092-127.0.0.1:61171-1, registering for write and sending data: Response(type=Send, request=Request(processor=0, connectionId=127.0.0.1:9092-127.0.0.1:61171-1, session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null), send=org.apache.kafka.common.network.NetworkSend@76c89719, asString=Some({brokers=[{node_id=0,host=XXX,port=9092,rack=null}],cluster_id=wb6esWJgRYOEsPGPnFaJ6Q,controller_id=0,topic_metadata=[]})) (kafka.network.Processor)
[2018-12-13 12:14:17,956] DEBUG Completed request:RequestHeader(apiKey=METADATA, apiVersion=2, clientId=rdkafka, correlationId=2) -- {topics=[]},response:{brokers=[{node_id=0,host=XXX,port=9092,rack=null}],cluster_id=wb6esWJgRYOEsPGPnFaJ6Q,controller_id=0,topic_metadata=[]} from connection 127.0.0.1:9092-127.0.0.1:61171-1;totalTime:1.838,requestQueueTime:0.081,localTime:0.716,remoteTime:0.0,throttleTime:0.153,responseQueueTime:0.164,sendTime:1.015,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger)
[2018-12-13 12:14:18,929] TRACE Processor 0 received request: RequestHeader(apiKey=METADATA, apiVersion=2, clientId=rdkafka, correlationId=3) -- {topics=[test-topic]} (kafka.network.RequestChannel$)
[2018-12-13 12:14:18,929] TRACE [KafkaApi-0] Handling request:RequestHeader(apiKey=METADATA, apiVersion=2, clientId=rdkafka, correlationId=3) -- {topics=[test-topic]} from connection 127.0.0.1:9092-127.0.0.1:61171-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2018-12-13 12:14:18,930] TRACE [KafkaApi-0] Sending topic metadata (type=TopicMetadata, error=NONE, topic=test-topic, isInternal=false, partitionMetadata=[(type=PartitionMetadata, error=NONE, partition=0, leader=XXX:9092 (id: 0 rack: null), replicas=XXX:9092 (id: 0 rack: null), isr=XXX:9092 (id: 0 rack: null), offlineReplicas=)]) and brokers 0 : (EndPoint(XXX,9092,ListenerName(PLAINTEXT),PLAINTEXT)) : null for correlation id 3 to client rdkafka (kafka.server.KafkaApis)
[2018-12-13 12:14:18,930] TRACE Socket server received response to send to 127.0.0.1:9092-127.0.0.1:61171-1, registering for write and sending data: Response(type=Send, request=Request(processor=0, connectionId=127.0.0.1:9092-127.0.0.1:61171-1, session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null), send=org.apache.kafka.common.network.NetworkSend@7691cae2, asString=Some({brokers=[{node_id=0,host=XXX,port=9092,rack=null}],cluster_id=wb6esWJgRYOEsPGPnFaJ6Q,controller_id=0,topic_metadata=[{error_code=0,topic=test-topic,is_internal=false,partition_metadata=[{error_code=0,partition=0,leader=0,replicas=[0],isr=[0]}]}]})) (kafka.network.Processor)
- [ ] Critical issue
Attaching kafka-request.log.
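The gap named in the observations can be read straight off the broker log excerpt: the first METADATA request (correlationId=2, no topics) arrives at 12:14:17,954 and the topic-specific one (correlationId=3) at 12:14:18,929. A minimal Python sketch of that arithmetic follows; the helper name `gap_ms` is made up for illustration and is not part of any Kafka client.

```python
from datetime import datetime

# Timestamps copied from the broker log excerpt in this report.
FIRST_METADATA = "2018-12-13 12:14:17,954"   # correlationId=2, topics=[]
SECOND_METADATA = "2018-12-13 12:14:18,929"  # correlationId=3, topics=[test-topic]


def gap_ms(earlier: str, later: str) -> float:
    """Return the time between two broker log timestamps in milliseconds."""
    fmt = "%Y-%m-%d %H:%M:%S,%f"
    delta = datetime.strptime(later, fmt) - datetime.strptime(earlier, fmt)
    return delta.total_seconds() * 1000.0


print(f"{gap_ms(FIRST_METADATA, SECOND_METADATA):.0f} ms between METADATA requests")
```

The two requests are 975 ms apart, which accounts for almost all of the 1,016.4 ms reported for the first message in the client log.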
I got here after seeing this issue in Python.
It would be great if the two metadata requests could be combined before the producer sends its first message. While decreasing topic.metadata.refresh.interval.ms does solve the latency issue, it drastically increases the producer's CPU load, which is a high price to pay for a problem that only affects the first message.
@edenhill - I haven't looked closely, but on the surface this looks like something that should be optimized. Comments?
I've started to notice this in Python as well during the last 2 weeks; before that there was no such problem.
Setting topic.metadata.refresh.interval.ms to 10 lowered the delay from 2 seconds to 10 ms for me when producing 2 messages after creating the producer.
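For readers who want to try the same setting from Python, here is a minimal sketch. It assumes the confluent_kafka package is installed and a broker is reachable; the helper names `producer_config` and `make_producer` are hypothetical and only serve the illustration. The property name is the librdkafka one quoted in this thread.

```python
def producer_config(bootstrap_servers: str, refresh_interval_ms: int = 10) -> dict:
    """Build a librdkafka-style config with a short metadata refresh interval."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # Property discussed in this thread; 10 ms is the value reported above.
        "topic.metadata.refresh.interval.ms": refresh_interval_ms,
    }


def make_producer(config: dict):
    """Create a producer; the import is local so the config helper works without the package."""
    from confluent_kafka import Producer  # third-party, assumed to be installed
    return Producer(config)


if __name__ == "__main__":
    print(producer_config("localhost:9092"))
```

As other comments in this thread point out, a very short interval trades first-message latency for extra CPU load and more metadata traffic, so it is worth measuring both before adopting it.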
bump
Hey @mhowlett, bumping this up as it also affects our wrapper (https://github.com/bookingcom/perl-Net-Kafka). Any help needed?
Are you seeing this on librdkafka v1.3.0?
If so, please reproduce with debug=msg,topic,broker,protocol,queue and provide the logs.
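In the Python client, the requested debug contexts would be passed as a single comma-separated `debug` property in the configuration dictionary. A small sketch, where `with_debug` is a hypothetical helper and the base configuration is an assumption:

```python
# Debug contexts requested in the comment above.
DEBUG_CONTEXTS = ("msg", "topic", "broker", "protocol", "queue")


def with_debug(config: dict, contexts=DEBUG_CONTEXTS) -> dict:
    """Return a copy of a librdkafka-style config with the 'debug' property set."""
    updated = dict(config)  # leave the caller's dictionary untouched
    updated["debug"] = ",".join(contexts)
    return updated


if __name__ == "__main__":
    print(with_debug({"bootstrap.servers": "localhost:9092"}))
```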
@edenhill yep, 1.3.0, built it today. Sure, here you go: the following sequence sends 2 messages to a cluster of 3 brokers (I marked the spot where the long wait happens). I can also confirm that lowering topic.metadata.refresh.interval.ms reduces the delay.
$ perl producer.pl
%7|1585250579.879|WAKEUPFD|rdkafka#producer-1| [thrd:app]: kafka-806:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1585250579.879|BROKER|rdkafka#producer-1| [thrd:app]: kafka-806:9092/bootstrap: Added new broker with NodeId -1
%7|1585250579.880|CONNECT|rdkafka#producer-1| [thrd:app]: kafka-806:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1585250579.880|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1585250579.880|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.3.0 (0x10300ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xe6)
%7|1585250579.880|BRKMAIN|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Enter main broker thread
%7|1585250579.880|CONNECT|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Received CONNECT op
%7|1585250579.880|STATE|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1585250579.880|CONNECT|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1585250579.880|STATE|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1585250579.882|CONNECT|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Connecting to ipv4#[[REDACTED]]:9092 (plaintext) with socket 9
%7|1585250579.882|CONNECT|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Connected to ipv4#[[REDACTED]]:9092
%7|1585250579.882|CONNECTED|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Connected (#1)
%7|1585250579.882|FEATURE|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1585250579.882|STATE|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1585250579.882|SEND|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1585250579.882|RECV|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Received ApiVersionResponse (v0, 276 bytes, CorrId 1, rtt 0.22ms)
%7|1585250579.882|FEATURE|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1585250579.882|STATE|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1585250579.882|SEND|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1585250579.882|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: test-topic
%7|1585250579.882|RECV|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Received MetadataResponse (v2, 189 bytes, CorrId 2, rtt 0.19ms)
%7|1585250579.883|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW test-topic [-1] 0x7f1f7fb5c400 (at rd_kafka_topic_new0:393)
%7|1585250579.883|WAKEUPFD|rdkafka#producer-1| [thrd:main]: kafka-806.[[REDACTED]]:9092/806: Enabled low-latency ops queue wake-ups
%7|1585250579.883|BROKER|rdkafka#producer-1| [thrd:main]: kafka-806.[[REDACTED]]:9092/806: Added new broker with NodeId 806
%7|1585250579.883|WAKEUPFD|rdkafka#producer-1| [thrd:main]: kafka-804.[[REDACTED]]:9092/804: Enabled low-latency ops queue wake-ups
%7|1585250579.883|BROKER|rdkafka#producer-1| [thrd:main]: kafka-804.[[REDACTED]]:9092/804: Added new broker with NodeId 804
%7|1585250579.883|WAKEUPFD|rdkafka#producer-1| [thrd:main]: kafka-805.[[REDACTED]]:9092/805: Enabled low-latency ops queue wake-ups
%7|1585250579.883|BRKMAIN|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Enter main broker thread
%7|1585250579.883|BROKER|rdkafka#producer-1| [thrd:main]: kafka-805.[[REDACTED]]:9092/805: Added new broker with NodeId 805
%7|1585250579.883|BRKMAIN|rdkafka#producer-1| [thrd:kafka-806.[[REDACTED]]:9092/806]: kafka-806.[[REDACTED]]:9092/806: Enter main broker thread
%7|1585250579.883|CLUSTERID|rdkafka#producer-1| [thrd:main]: kafka-806:9092/bootstrap: ClusterId update "" -> "U3Nele_3RtWXM3KNaWDKeg"
%7|1585250579.883|CONTROLLERID|rdkafka#producer-1| [thrd:main]: kafka-806:9092/bootstrap: ControllerId update -1 -> 804
%7|1585250579.883|BRKMAIN|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Enter main broker thread
--- LONG AWAIT HAPPENS ---
%7|1585250580.880|NOINFO|rdkafka#producer-1| [thrd:main]: Topic test-topic metadata information unknown
%7|1585250580.880|NOINFO|rdkafka#producer-1| [thrd:main]: Topic test-topic partition count is zero: should refresh metadata
%7|1585250580.880|SEND|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Sent MetadataRequest (v2, 45 bytes @ 0, CorrId 3)
%7|1585250580.880|RECV|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Received MetadataResponse (v2, 372 bytes, CorrId 3, rtt 0.61ms)
%7|1585250580.881|STATE|rdkafka#producer-1| [thrd:main]: Topic test-topic changed state unknown -> exists
%7|1585250580.881|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic test-topic partition count changed from 0 to 6
%7|1585250580.881|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test-topic [0] 0x7f1f7c371000 (at rd_kafka_topic_partition_cnt_update:703)
%7|1585250580.881|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test-topic [1] 0x7f1f7c371400 (at rd_kafka_topic_partition_cnt_update:703)
%7|1585250580.881|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test-topic [2] 0x7f1f7c371800 (at rd_kafka_topic_partition_cnt_update:703)
%7|1585250580.881|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test-topic [3] 0x7f1f7c371c00 (at rd_kafka_topic_partition_cnt_update:703)
%7|1585250580.881|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test-topic [4] 0x7f1f7c372000 (at rd_kafka_topic_partition_cnt_update:703)
%7|1585250580.881|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW test-topic [5] 0x7f1f7c372400 (at rd_kafka_topic_partition_cnt_update:703)
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: Topic test-topic partition 0 Leader 805
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [0]: delegate to broker kafka-805.[[REDACTED]]:9092/805 (rktp 0x7f1f7c371000, term 0, ref 2)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [0]: delegating to broker kafka-805.[[REDACTED]]:9092/805 for partition with 0 messages (0 bytes) queued
%7|1585250580.881|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test-topic [0] 0x7f1f7c371000 from (none) to kafka-805.[[REDACTED]]:9092/805 (sending PARTITION_JOIN to kafka-805.[[REDACTED]]:9092/805)
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: Topic test-topic partition 1 Leader 804
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [1]: delegate to broker kafka-804.[[REDACTED]]:9092/804 (rktp 0x7f1f7c371400, term 0, ref 2)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [1]: delegating to broker kafka-804.[[REDACTED]]:9092/804 for partition with 0 messages (0 bytes) queued
%7|1585250580.881|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test-topic [1] 0x7f1f7c371400 from (none) to kafka-804.[[REDACTED]]:9092/804 (sending PARTITION_JOIN to kafka-804.[[REDACTED]]:9092/804)
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: Topic test-topic partition 2 Leader 806
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [2]: delegate to broker kafka-806.[[REDACTED]]:9092/806 (rktp 0x7f1f7c371800, term 0, ref 2)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [2]: delegating to broker kafka-806.[[REDACTED]]:9092/806 for partition with 0 messages (0 bytes) queued
%7|1585250580.881|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test-topic [2] 0x7f1f7c371800 from (none) to kafka-806.[[REDACTED]]:9092/806 (sending PARTITION_JOIN to kafka-806.[[REDACTED]]:9092/806)
%7|1585250580.881|TOPBRK|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Topic test-topic [1]: joining broker (rktp 0x7f1f7c371400, 0 message(s) queued)
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: Topic test-topic partition 3 Leader 805
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [3]: delegate to broker kafka-805.[[REDACTED]]:9092/805 (rktp 0x7f1f7c371c00, term 0, ref 2)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [3]: delegating to broker kafka-805.[[REDACTED]]:9092/805 for partition with 0 messages (0 bytes) queued
%7|1585250580.881|TOPBRK|rdkafka#producer-1| [thrd:kafka-806.[[REDACTED]]:9092/806]: kafka-806.[[REDACTED]]:9092/806: Topic test-topic [2]: joining broker (rktp 0x7f1f7c371800, 0 message(s) queued)
%7|1585250580.881|FETCHADD|rdkafka#producer-1| [thrd:kafka-806.[[REDACTED]]:9092/806]: kafka-806.[[REDACTED]]:9092/806: Added test-topic [2] to active list (1 entries, opv 0, 0 messages queued)
%7|1585250580.881|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test-topic [3] 0x7f1f7c371c00 from (none) to kafka-805.[[REDACTED]]:9092/805 (sending PARTITION_JOIN to kafka-805.[[REDACTED]]:9092/805)
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: Topic test-topic partition 4 Leader 804
%7|1585250580.881|TOPBRK|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Topic test-topic [0]: joining broker (rktp 0x7f1f7c371000, 0 message(s) queued)
%7|1585250580.881|FETCHADD|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Added test-topic [0] to active list (1 entries, opv 0, 0 messages queued)
%7|1585250580.881|TOPBRK|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Topic test-topic [3]: joining broker (rktp 0x7f1f7c371c00, 0 message(s) queued)
%7|1585250580.881|FETCHADD|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Added test-topic [3] to active list (2 entries, opv 0, 0 messages queued)
%7|1585250580.881|FETCHADD|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Added test-topic [1] to active list (1 entries, opv 0, 0 messages queued)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [4]: delegate to broker kafka-804.[[REDACTED]]:9092/804 (rktp 0x7f1f7c372000, term 0, ref 2)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [4]: delegating to broker kafka-804.[[REDACTED]]:9092/804 for partition with 0 messages (0 bytes) queued
%7|1585250580.881|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test-topic [4] 0x7f1f7c372000 from (none) to kafka-804.[[REDACTED]]:9092/804 (sending PARTITION_JOIN to kafka-804.[[REDACTED]]:9092/804)
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: Topic test-topic partition 5 Leader 806
%7|1585250580.881|TOPBRK|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Topic test-topic [4]: joining broker (rktp 0x7f1f7c372000, 0 message(s) queued)
%7|1585250580.881|FETCHADD|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Added test-topic [4] to active list (2 entries, opv 0, 0 messages queued)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [5]: delegate to broker kafka-806.[[REDACTED]]:9092/806 (rktp 0x7f1f7c372400, term 0, ref 2)
%7|1585250580.881|BRKDELGT|rdkafka#producer-1| [thrd:main]: test-topic [5]: delegating to broker kafka-806.[[REDACTED]]:9092/806 for partition with 0 messages (0 bytes) queued
%7|1585250580.881|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic test-topic [5] 0x7f1f7c372400 from (none) to kafka-806.[[REDACTED]]:9092/806 (sending PARTITION_JOIN to kafka-806.[[REDACTED]]:9092/806)
%7|1585250580.881|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 2 unassigned messages in topic test-topic to 6 partitions
%7|1585250580.881|TOPBRK|rdkafka#producer-1| [thrd:kafka-806.[[REDACTED]]:9092/806]: kafka-806.[[REDACTED]]:9092/806: Topic test-topic [5]: joining broker (rktp 0x7f1f7c372400, 0 message(s) queued)
%7|1585250580.881|FETCHADD|rdkafka#producer-1| [thrd:kafka-806.[[REDACTED]]:9092/806]: kafka-806.[[REDACTED]]:9092/806: Added test-topic [5] to active list (2 entries, opv 0, 0 messages queued)
%7|1585250580.881|UAS|rdkafka#producer-1| [thrd:main]: 2/2 messages were partitioned in topic test-topic
%7|1585250580.881|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) in xmit queue (1 added from partition queue)
%7|1585250580.881|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) queued but broker not up
%7|1585250580.881|STATE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Broker changed state INIT -> TRY_CONNECT
%7|1585250580.881|METADATA|rdkafka#producer-1| [thrd:main]: kafka-806:9092/bootstrap: 1/1 requested topic(s) seen in metadata
%7|1585250580.881|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) in xmit queue (1 added from partition queue)
%7|1585250580.881|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) queued but broker not up
%7|1585250580.881|STATE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Broker changed state INIT -> TRY_CONNECT
%7|1585250580.881|CONNECT|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: broker in state TRY_CONNECT connecting
%7|1585250580.881|STATE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Broker changed state TRY_CONNECT -> CONNECT
%7|1585250580.881|CONNECT|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: broker in state TRY_CONNECT connecting
%7|1585250580.881|STATE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Broker changed state TRY_CONNECT -> CONNECT
%7|1585250580.882|CONNECT|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Connecting to ipv4#[[REDACTED]]:9092 (plaintext) with socket 18
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) queued but broker not up
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) queued but broker not up
%7|1585250580.882|CONNECT|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Connecting to ipv4#[[REDACTED]]:9092 (plaintext) with socket 19
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) queued but broker not up
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) queued but broker not up
%7|1585250580.882|CONNECT|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Connected to ipv4#[[REDACTED]]:9092
%7|1585250580.882|CONNECTED|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Connected (#1)
%7|1585250580.882|FEATURE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1585250580.882|STATE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1585250580.882|CONNECT|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Connected to ipv4#[[REDACTED]]:9092
%7|1585250580.882|CONNECTED|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Connected (#1)
%7|1585250580.882|FEATURE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1585250580.882|STATE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1585250580.882|SEND|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) queued but broker not up
%7|1585250580.882|SEND|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.882|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) queued but broker not up
%7|1585250580.882|RECV|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Received ApiVersionResponse (v0, 276 bytes, CorrId 1, rtt 0.22ms)
%7|1585250580.882|FEATURE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1585250580.882|STATE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Broker changed state APIVERSION_QUERY -> UP
%7|1585250580.883|TOPPAR|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.883|PRODUCE|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4]: Produce MessageSet with 1 message(s) (68 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
%7|1585250580.883|SEND|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Sent ProduceRequest (v7, 133 bytes @ 0, CorrId 2)
%7|1585250580.883|RECV|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Received ApiVersionResponse (v0, 276 bytes, CorrId 1, rtt 0.44ms)
%7|1585250580.883|FEATURE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1585250580.883|STATE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Broker changed state APIVERSION_QUERY -> UP
%7|1585250580.883|TOPPAR|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0] 1 message(s) in xmit queue (0 added from partition queue)
%7|1585250580.883|PRODUCE|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0]: Produce MessageSet with 1 message(s) (68 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
%7|1585250580.883|SEND|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Sent ProduceRequest (v7, 133 bytes @ 0, CorrId 2)
%7|1585250580.883|RECV|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: Received ProduceResponse (v7, 62 bytes, CorrId 2, rtt 0.30ms)
%7|1585250580.883|MSGSET|rdkafka#producer-1| [thrd:kafka-804.[[REDACTED]]:9092/804]: kafka-804.[[REDACTED]]:9092/804: test-topic [4]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
%7|1585250580.883|RECV|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Received ProduceResponse (v7, 62 bytes, CorrId 2, rtt 0.34ms)
%7|1585250580.883|MSGSET|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: test-topic [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
[PERL] Produced 2 items
[PERL] Code took: 1.00250697135925 seconds
%7|1585250580.885|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1585250580.885|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1585250580.885|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
... and destroy sequence
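The pause marked in the log can also be located mechanically. The sketch below parses librdkafka debug lines of the form `%level|timestamp|facility|client| message` and reports the longest gap between consecutive lines. The function names are made up for illustration, and the sample lines are four entries copied from the log above.

```python
from decimal import Decimal

SAMPLE = [
    "%7|1585250579.882|RECV|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Received MetadataResponse (v2, 189 bytes, CorrId 2, rtt 0.19ms)",
    "%7|1585250579.883|BRKMAIN|rdkafka#producer-1| [thrd:kafka-805.[[REDACTED]]:9092/805]: kafka-805.[[REDACTED]]:9092/805: Enter main broker thread",
    "%7|1585250580.880|NOINFO|rdkafka#producer-1| [thrd:main]: Topic test-topic metadata information unknown",
    "%7|1585250580.880|SEND|rdkafka#producer-1| [thrd:kafka-806:9092/bootstrap]: kafka-806:9092/bootstrap: Sent MetadataRequest (v2, 45 bytes @ 0, CorrId 3)",
]


def parse(line: str):
    """Split one debug line into (timestamp, facility, message)."""
    _level, timestamp, facility, _client, message = line.split("|", 4)
    return Decimal(timestamp), facility, message.strip()


def largest_gap(lines):
    """Return (gap_seconds, facility_before, facility_after) for the longest pause."""
    events = [parse(line) for line in lines if line.startswith("%")]
    best = None
    for (t0, f0, _), (t1, f1, _) in zip(events, events[1:]):
        gap = t1 - t0
        if best is None or gap > best[0]:
            best = (gap, f0, f1)
    return best


if __name__ == "__main__":
    print(largest_gap(SAMPLE))
```

On these lines the longest pause is 0.997 s, between the last BRKMAIN entry and the NOINFO entry that triggers the topic-specific MetadataRequest (CorrId 3).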
Hello,
We have come across the same problem. Per-message produce latency matters to us, and we found that the first message sent after a producer is initialized takes much longer than the others, e.g. 800 ms to 1 s for the first message versus 2 ms for the rest. That is not acceptable for our scenario.
So far we have only two options:
- Set topic.metadata.refresh.interval.ms to 10. Values below 10 bring no further improvement.
- Manually produce a first message before real use. However, that is just a hack.
I'm not sure whether this is a bug. Are there any other configuration options that would solve this problem without introducing a large CPU load?
Bg Kevin
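The second option (producing a throwaway first message) could look roughly like this in Python. It is a sketch under assumptions: `warm_up` is a hypothetical helper, and `producer` is expected to be a confluent_kafka `Producer` (or any object with compatible `produce` and `flush` methods).

```python
def warm_up(producer, topic: str, timeout_s: float = 5.0, payload: bytes = b"warm-up") -> bool:
    """Send one throwaway message so the topic's metadata is cached before real traffic.

    Returns True if the producer's queue was fully drained within the timeout.
    """
    producer.produce(topic, value=payload)
    # flush() blocks until delivery or timeout and returns the number of
    # messages still waiting in the queue.
    remaining = producer.flush(timeout_s)
    return remaining == 0
```

Note that consumers of the topic will see the warm-up message, so they need to be able to ignore it; that is part of why this approach is only a workaround.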
Hi all,
I've come across the same problem and, after some debugging of librdkafka, found a possible solution. I noticed that the first message produced to each topic has a significant delay, because librdkafka internally sends a MetadataRequest to the broker and waits for the response before actually producing the message.
The solution in my case is to manually call producer.metadata() immediately after creating the producer instance.
Note that this adds some delay when creating the producer (it's up to you to choose the metadata() request timeout), but it minimizes the latency of your messages, including the first one.
Please report back if this solution helps.
@edenhill Maybe a good solution would be for librdkafka to automatically send a MetadataRequest when a producer is created?
It already does that, but it doesn't know which topics will be produced to, so the topic metadata is not fetched and the request does not help.
There's a fix here: https://github.com/edenhill/librdkafka/commit/4503dcff351d5cdcd3cc5e3baaad646d678ec0c6 but it will not be included in the v1.6.0 release.
@edenhill Thanks for replying. great to see there's already a fix - waiting for it!
One thing isn't clear to me yet: why does manually calling metadata(false, ...) prevent the delay on the first message, while the metadata fetched automatically by librdkafka does not help?
Is there any solution for this other than the two that @kevinprotoss described?
I've just run into this with Python using confluent_kafka.
@YarinLowe's solution ("manually call producer.metadata() immediately after creating the producer instance") seems to work for me, though Producer has no metadata method, so I am calling list_topics() instead.
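A minimal sketch of that list_topics() warm-up, assuming a confluent_kafka `Producer` and topic names known in advance. The helper name `prefetch_topic_metadata` is hypothetical. A later comment in this thread reports that the workaround did not help with versions 2.2.0 and 2.3.0, so treat it as something to measure rather than a guaranteed fix.

```python
def prefetch_topic_metadata(producer, topics, timeout_s: float = 5.0) -> dict:
    """Request metadata for each topic before the first produce() call.

    Returns a mapping of topic name to the partition count the broker reported.
    """
    partition_counts = {}
    for topic in topics:
        # list_topics(topic, timeout) returns cluster metadata limited to this topic.
        metadata = producer.list_topics(topic, timeout=timeout_s)
        topic_metadata = metadata.topics.get(topic)
        partition_counts[topic] = len(topic_metadata.partitions) if topic_metadata else 0
    return partition_counts
```

Calling this once, right after constructing the producer, moves the metadata round trip out of the first produce call and into startup.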
Hi, I was wondering if anyone has an update on this (or whether there is a way to make the producer more performant).
I'm using Confluent.Kafka 1.9.3 and also getting ~1 second for the first message. However, for each following message (I tried both a simple protobuf object and a simple string "123" as key/value) I'm getting a median time of 15 ms, which is more than 7 times your 2 ms.
I could live with 2 ms per message, but at 15 ms, sending 1k or 10k messages becomes impossible to scale.
@r-owen Your workaround of calling list_topics() doesn't work with 2.2.0 or 2.3.0. Described in: https://github.com/confluentinc/confluent-kafka-python/issues/1679
Won't setting topic.metadata.refresh.interval.ms=20 cause metadata requests to Kafka every 20 ms?
As per documentation in https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html:
Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s.
As I understand this will poll for metadata every 20ms. I imagine having multiple producers will cause a lot of stress on Kafka cluster.
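To put a rough number on that concern: if each producer really does issue one metadata refresh per interval, as the documentation quote suggests, the request rate scales as in the sketch below. This is back-of-the-envelope arithmetic under that assumption, not a model of librdkafka's actual scheduling, and the function name is made up.

```python
def metadata_requests_per_second(interval_ms: int, producers: int = 1) -> float:
    """Estimate the metadata request rate, assuming one refresh per interval per producer."""
    if interval_ms <= 0:
        # -1 disables the periodic refresh, so there is no rate to compute.
        raise ValueError("interval_ms must be positive")
    return producers * 1000.0 / interval_ms


if __name__ == "__main__":
    print(metadata_requests_per_second(20))        # one producer at 20 ms
    print(metadata_requests_per_second(20, 100))   # one hundred producers at 20 ms
```

Under this assumption a single producer at 20 ms would issue about 50 requests per second, and 100 such producers about 5,000 per second across the cluster.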