Client disconnected when Kafka 2.6.0 with SSL used.

Open: mpastecki opened this issue 4 years ago • 1 comment

Description

Using the high-level KafkaConsumer or KafkaProducer with Kafka 2.6.0 configured with an SSL listener results in repeated client disconnections. The messages are eventually produced/consumed.

It looks like the socket is closed after the BrokerConnection callback method invokes the _conn_state_change method.

This only happens if SSL is used; I wasn't able to reproduce it using PLAINTEXT.

I checked with the confluent-kafka Python library, and it seems to work fine.

It doesn't happen with previous Kafka versions.

How to reproduce

Code snippet:

#!/usr/bin/env python3

from kafka import KafkaConsumer

# Connect to the broker's SSL listener; hostname verification is disabled.
consumer = KafkaConsumer(
    bootstrap_servers="localhost:29092",
    security_protocol="SSL",
    ssl_keyfile="keyfile.key",
    ssl_certfile="certfile.pem",
    ssl_cafile="cafile.crt",
    ssl_password="ssl_password",
    ssl_check_hostname=False,
    auto_offset_reset="earliest",
    group_id="group_id",
    consumer_timeout_ms=10000,
    enable_auto_commit=True,
)

consumer.subscribe(["3p3r"])

try:
    print(next(consumer))
except StopIteration:
    # No message arrived within consumer_timeout_ms.
    pass
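
To capture client-side output like the log that follows, debug logging has to be enabled before the consumer is created. This is a minimal sketch using only the standard `logging` module; the helper name `enable_kafka_debug_logging` is hypothetical, and the format string is an assumption chosen to resemble the log lines in this report, not necessarily the configuration that produced them.

```python
import logging

# Assumed format: timestamp, [module], level, message.
LOG_FORMAT = "%(asctime)s [%(module)s] %(levelname)s %(message)s"


def enable_kafka_debug_logging(stream=None):
    """Attach a DEBUG handler to the "kafka" logger and return the handler."""
    handler = logging.StreamHandler(stream)  # stream=None means stderr
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    kafka_logger = logging.getLogger("kafka")  # parent of kafka.conn, kafka.client, ...
    kafka_logger.setLevel(logging.DEBUG)
    kafka_logger.addHandler(handler)
    return handler


if __name__ == "__main__":
    enable_kafka_debug_logging()
    logging.getLogger("kafka.conn").debug("logging is configured")
```

Note that calling the helper twice attaches two handlers, so every record would then be emitted twice.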

Error returned:

2020-11-03 13:23:02,124 [client_async] DEBUG Node 2 connected
2020-11-03 13:23:02,124 [client_async] DEBUG Node 2 connected
2020-11-03 13:23:02,131 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,131 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,131 [conn] DEBUG <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: reconnect backoff 0.05202506418997614 after 1 failures
2020-11-03 13:23:02,131 [conn] DEBUG <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: reconnect backoff 0.05202506418997614 after 1 failures
2020-11-03 13:23:02,132 [client_async] WARNING Node 2 connection failed -- refreshing metadata
2020-11-03 13:23:02,132 [client_async] WARNING Node 2 connection failed -- refreshing metadata
(...)
2020-11-03 13:23:02,441 [conn] DEBUG <BrokerConnection node_id=coordinator-2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]> Response 5 (104.11787033081055 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
2020-11-03 13:23:02,441 [conn] DEBUG <BrokerConnection node_id=coordinator-2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]> Response 5 (104.11787033081055 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
2020-11-03 13:23:02,441 [base] DEBUG Received successful heartbeat response for group e2e-test-group-temp
2020-11-03 13:23:02,441 [base] DEBUG Received successful heartbeat response for group e2e-test-group-temp
2020-11-03 13:23:02,442 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,442 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,442 [client_async] WARNING Node 2 connection failed -- refreshing metadata
2020-11-03 13:23:02,442 [client_async] WARNING Node 2 connection failed -- refreshing metadata
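
The error text in the log above suggests the client saw its socket become readable while it was not waiting for any response. The sketch below is not kafka-python code. It only reproduces that general condition with the standard library, using a local socket pair as a stand-in for the broker connection. The function name and the `in_flight_requests` list are hypothetical, and the sketch makes no claim about why the broker sends data in this case.

```python
import selectors
import socket


def readable_without_pending_request():
    """Return True if the selector reports EVENT_READ while nothing is awaited."""
    client_sock, peer_sock = socket.socketpair()
    sel = selectors.DefaultSelector()
    try:
        client_sock.setblocking(False)
        sel.register(client_sock, selectors.EVENT_READ)

        in_flight_requests = []  # hypothetical bookkeeping: no request was sent

        # The peer writes an unsolicited byte, so the client socket becomes readable.
        peer_sock.sendall(b"\x00")

        for key, mask in sel.select(timeout=1.0):
            if mask & selectors.EVENT_READ and not in_flight_requests:
                return True
        return False
    finally:
        sel.close()
        client_sock.close()
        peer_sock.close()


if __name__ == "__main__":
    print(readable_without_pending_request())
```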

Kafka broker errors:

[2020-11-03 12:25:24,686] DEBUG [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-2-fetcher-0, correlationId=1897) and timeout 30000 to node 1: {replica_id=2,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=383646810,session_epoch=1895,topics=[],forgotten_topics_data=[],rack_id=} (org.apache.kafka.clients.NetworkClient)
[2020-11-03 12:25:24,693] DEBUG [SocketServer brokerId=2] Connection with /0:0:0:0:0:0:0:1 disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:239)
	at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:697)
	at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:738)
	at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:763)
	at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
	at org.apache.kafka.common.record.MultiRecordsSend.writeTo(MultiRecordsSend.java:93)
	at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:425)
	at org.apache.kafka.common.network.Selector.write(Selector.java:648)
	at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:641)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:597)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at kafka.network.Processor.poll(SocketServer.scala:913)
	at kafka.network.Processor.run(SocketServer.scala:816)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-03 12:25:24,693] DEBUG [SslTransportLayer channelId=0:0:0:0:0:0:0:1:29092-0:0:0:0:0:0:0:1:38210-28 key=channel=java.nio.channels.SocketChannel[connected local=/0:0:0:0:0:0:0:1:29092 remote=/0:0:0:0:0:0:0:1:38210], selector=sun.nio.ch.EPollSelectorImpl@68dbbefc, interestOps=1, readyOps=0] Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:239)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:182)
	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:934)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:154)
	at org.apache.kafka.common.network.Selector.doClose(Selector.java:955)
	at org.apache.kafka.common.network.Selector.close(Selector.java:939)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:629)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at kafka.network.Processor.poll(SocketServer.scala:913)
	at kafka.network.Processor.run(SocketServer.scala:816)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-03 12:25:25,117] DEBUG [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Received FETCH response from node 3 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-2-fetcher-0, correlationId=1897): org.apache.kafka.common.requests.FetchResponse@40c845b8 (org.apache.kafka.clients.NetworkClient)

Checklist

  • Python package version: 2.0.2
  • Apache Kafka broker version: Docker image confluentinc/cp-kafka:6.0.0
  • Operating system: Fedora 32

Thanks for your help!

mpastecki, Nov 03 '20 12:11

I'm facing a similar issue. The client is not able to connect when using SSL.

jpramos123, Nov 30 '21 13:11