kafka-python
Client disconnected when Kafka 2.6.0 is used with SSL
Description
Using the high-level KafkaConsumer or KafkaProducer against Kafka 2.6.0 configured with an SSL listener results in repeated client disconnections. The messages are still produced/consumed eventually.
It looks like the socket is closed after the BrokerConnection callback method invokes the _conn_state_change method.
This only happens if SSL is used; I wasn't able to reproduce it using PLAINTEXT.
I checked with the confluent-kafka Python library and it seems to work fine.
It does not happen with previous Kafka versions.
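The SSL versus PLAINTEXT comparison described above could be scripted roughly as follows. This is a minimal sketch, not the exact code used for the report: the helper names `consumer_kwargs` and `try_consume` are hypothetical, and the PLAINTEXT listener address must be supplied by whoever runs it, since the report does not state one.

```python
# Settings shared by both listeners (values taken from the report).
COMMON = dict(
    group_id="group_id",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
    enable_auto_commit=True,
)

# Settings that only apply to the SSL listener.
SSL_ONLY = dict(
    ssl_keyfile="keyfile.key",
    ssl_certfile="certfile.pem",
    ssl_cafile="cafile.crt",
    ssl_password="ssl_password",
    ssl_check_hostname=False,
)


def consumer_kwargs(security_protocol, bootstrap_servers):
    """Build KafkaConsumer keyword arguments for one listener (hypothetical helper)."""
    kwargs = dict(COMMON, security_protocol=security_protocol,
                  bootstrap_servers=bootstrap_servers)
    if security_protocol == "SSL":
        kwargs.update(SSL_ONLY)
    return kwargs


def try_consume(kwargs, topic="3p3r"):
    """Read one message, or return None if consumer_timeout_ms expires."""
    from kafka import KafkaConsumer  # imported lazily; requires kafka-python
    consumer = KafkaConsumer(topic, **kwargs)
    try:
        return next(consumer, None)
    finally:
        consumer.close()
```

Calling `try_consume(consumer_kwargs("SSL", "localhost:29092"))` and then the same with `"PLAINTEXT"` and the address of a PLAINTEXT listener keeps every other setting identical, so any difference in the logs comes from the listener type alone.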
How to reproduce
Code snippet:
#!/usr/bin/env python3
from kafka import KafkaConsumer

# Consumer connecting to the SSL listener of the broker
consumer = KafkaConsumer(
    bootstrap_servers="localhost:29092",
    security_protocol="SSL",
    ssl_keyfile="keyfile.key",
    ssl_certfile="certfile.pem",
    ssl_cafile="cafile.crt",
    ssl_password="ssl_password",
    ssl_check_hostname=False,
    auto_offset_reset='earliest',
    group_id="group_id",
    consumer_timeout_ms=10000,
    enable_auto_commit=True,
)
consumer.subscribe(['3p3r'])
try:
    print(next(consumer))
except StopIteration:
    # consumer_timeout_ms elapsed without a message being received
    pass
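To capture client logs like the ones shown next, debug logging has to be enabled for the library's loggers. The following is a minimal sketch using only the standard `logging` module; the format string is an assumption chosen to resemble the log lines in this report, and `enable_kafka_debug_logging` is a hypothetical helper name.

```python
import logging
import sys

# Assumed format: "<time> [<module>] <LEVEL> <message>"
LOG_FORMAT = "%(asctime)s [%(module)s] %(levelname)s %(message)s"


def enable_kafka_debug_logging(stream=sys.stderr):
    """Attach one DEBUG handler to the 'kafka' logger and return the logger."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger = logging.getLogger("kafka")  # parent of kafka.conn, kafka.client, ...
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    # Do not also forward records to root handlers, which would print them twice.
    logger.propagate = False
    return logger
```

Calling `enable_kafka_debug_logging()` once, before the consumer is created, should be enough for the connection state changes to show up on stderr.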
Error returned:
2020-11-03 13:23:02,124 [client_async] DEBUG Node 2 connected
2020-11-03 13:23:02,131 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,131 [conn] DEBUG <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: reconnect backoff 0.05202506418997614 after 1 failures
2020-11-03 13:23:02,132 [client_async] WARNING Node 2 connection failed -- refreshing metadata
(...)
2020-11-03 13:23:02,441 [conn] DEBUG <BrokerConnection node_id=coordinator-2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]> Response 5 (104.11787033081055 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
2020-11-03 13:23:02,441 [base] DEBUG Received successful heartbeat response for group e2e-test-group-temp
2020-11-03 13:23:02,442 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,442 [client_async] WARNING Node 2 connection failed -- refreshing metadata
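To see how often each node is disconnected and for what reason, the client log can be summarised with a short script. This is only a sketch: the regular expression is written against the "Closing connection" lines shown above and may need adjusting for other log formats, and `count_disconnects` is a hypothetical helper name.

```python
import re
from collections import Counter

# Matches the "Closing connection" lines logged for a BrokerConnection.
CLOSE_RE = re.compile(
    r"<BrokerConnection node_id=(?P<node>\S+) .*?>: Closing connection\. "
    r"(?P<reason>.*)$"
)


def count_disconnects(lines):
    """Return a Counter keyed by (node_id, reason) for every closed connection."""
    counts = Counter()
    for line in lines:
        match = CLOSE_RE.search(line)
        if match:
            key = (match.group("node"), match.group("reason").strip())
            counts[key] += 1
    return counts
```

For example, `count_disconnects(open("client.log"))` (with `client.log` being wherever the client output was saved) groups the disconnects by node and error message, which makes it easier to compare an SSL run with a PLAINTEXT run.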
Kafka broker errors:
[2020-11-03 12:25:24,686] DEBUG [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-2-fetcher-0, correlationId=1897) and timeout 30000 to node 1: {replica_id=2,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=383646810,session_epoch=1895,topics=[],forgotten_topics_data=[],rack_id=} (org.apache.kafka.clients.NetworkClient)
[2020-11-03 12:25:24,693] DEBUG [SocketServer brokerId=2] Connection with /0:0:0:0:0:0:0:1 disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: Broken pipe
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:239)
at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:697)
at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:738)
at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:763)
at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
at org.apache.kafka.common.record.MultiRecordsSend.writeTo(MultiRecordsSend.java:93)
at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:425)
at org.apache.kafka.common.network.Selector.write(Selector.java:648)
at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:641)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:597)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:913)
at kafka.network.Processor.run(SocketServer.scala:816)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-03 12:25:24,693] DEBUG [SslTransportLayer channelId=0:0:0:0:0:0:0:1:29092-0:0:0:0:0:0:0:1:38210-28 key=channel=java.nio.channels.SocketChannel[connected local=/0:0:0:0:0:0:0:1:29092 remote=/0:0:0:0:0:0:0:1:38210], selector=sun.nio.ch.EPollSelectorImpl@68dbbefc, interestOps=1, readyOps=0] Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:239)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:182)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:934)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:154)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:955)
at org.apache.kafka.common.network.Selector.close(Selector.java:939)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:629)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:913)
at kafka.network.Processor.run(SocketServer.scala:816)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-03 12:25:25,117] DEBUG [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Received FETCH response from node 3 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-2-fetcher-0, correlationId=1897): org.apache.kafka.common.requests.FetchResponse@40c845b8 (org.apache.kafka.clients.NetworkClient)
Checklist
- Python package version: 2.0.2
- Apache Kafka broker version: Docker image confluentinc/cp-kafka:6.0.0
- Operating system: Fedora 32
Thanks for your help!
I'm facing a similar issue. The client is not able to connect when using SSL.