kcat
Consuming messages from Azure Event Hubs
I am following the Azure Event Hubs documentation for consuming messages from a topic.
I have created a kafkacat.conf file similar to the following:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
socket.timeout.ms=30000
api.version.request=false
metadata.request.timeout.ms=30000
But I fail to receive any messages with the following command:
kafkacat -C -b <enter event hubs namespace>.servicebus.windows.net:9093 -t my-topic -o beginning -d broker
A few lines in the error output below stand out to me:
Broker changed state CONNECT -> APIVERSION_QUERY
%7|1601629794.693|BROKERFAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1601629794.694|FEATURE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Updated enabled protocol features -ApiVersion to
...ApiVersionRequest failed: Local: Broker transport failure: probably due to old broker version
Here is the full error output:
%7|1601629794.240|BROKER|rdkafka#consumer-1| [thrd:app]: my-namespace.servicebus.windows.net:9093/bootstrap: Added new broker with NodeId -1
%7|1601629794.240|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1601629794.240|STATE|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1601629794.240|BRKMAIN|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Enter main broker thread
%7|1601629794.240|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: broker in state INIT connecting
%7|1601629794.429|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connecting to ipv4#52.179.6.240:9093 (plaintext) with socket 7
%7|1601629794.429|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state INIT -> CONNECT
%7|1601629794.557|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected to ipv4#52.179.6.240:9093
%7|1601629794.557|CONNECTED|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected (#1)
%7|1601629794.557|FEATURE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1601629794.557|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1601629794.693|BROKERFAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1601629794.694|FEATURE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1601629794.694|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%7|1601629794.694|REQERR|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: ApiVersionRequest failed: Local: Broker transport failure: explicit actions 0x0
%7|1601629794.694|BROKERFAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: failed: err: Local: Not implemented: (errno: Operation now in progress)
%7|1601629794.694|FAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: ApiVersionRequest failed: Local: Broker transport failure: probably due to old broker version
%7|1601629795.694|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: broker in state DOWN connecting
%7|1601629795.884|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connecting to ipv4#52.179.6.240:9093 (plaintext) with socket 7
%7|1601629795.884|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state DOWN -> CONNECT
%7|1601629796.013|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected to ipv4#52.179.6.240:9093
%7|1601629796.013|CONNECTED|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected (#2)
%7|1601629796.013|APIVERSION|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1601629796.013|FEATURE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Updated enabled protocol features to BrokerBalancedConsumer,ThrottleTime,Sasl,BrokerGroupCoordinator,LZ4
%7|1601629796.013|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state CONNECT -> UP
%7|1601629796.143|BROKERFAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1601629796.143|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state UP -> DOWN
%7|1601629796.243|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: broker in state DOWN connecting
%7|1601629796.243|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connecting to ipv4#52.179.6.240:9093 (plaintext) with socket 7
%7|1601629796.244|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state DOWN -> CONNECT
%7|1601629796.380|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected to ipv4#52.179.6.240:9093
%7|1601629796.380|CONNECTED|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected (#3)
%7|1601629796.380|APIVERSION|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1601629796.380|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state CONNECT -> UP
%7|1601629797.369|BROKERFAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1601629797.369|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state UP -> DOWN
%7|1601629797.469|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: broker in state DOWN connecting
%7|1601629797.667|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connecting to ipv4#52.179.6.240:9093 (plaintext) with socket 7
%7|1601629797.667|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state DOWN -> CONNECT
%7|1601629797.796|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected to ipv4#52.179.6.240:9093
%7|1601629797.796|CONNECTED|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected (#4)
%7|1601629797.796|APIVERSION|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1601629797.796|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state CONNECT -> UP
%7|1601629798.369|BROKERFAIL|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1601629798.369|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state UP -> DOWN
%7|1601629798.469|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: broker in state DOWN connecting
%7|1601629798.470|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connecting to ipv4#52.179.6.240:9093 (plaintext) with socket 7
%7|1601629798.470|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state DOWN -> CONNECT
%7|1601629798.597|CONNECT|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected to ipv4#52.179.6.240:9093
%7|1601629798.598|CONNECTED|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Connected (#5)
%7|1601629798.598|APIVERSION|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1601629798.598|STATE|rdkafka#consumer-1| [thrd:my-namespace.servicebus.windows.net:9093/bootstrap]: my-namespace.servicebus.windows.net:9093/bootstrap: Broker changed state CONNECT -> UP
% ERROR: Failed to query metadata for topic my-topic_changes: Local: Timed out
Any idea what I am missing here?
Thanks for any input.
You probably shouldn't set api.version.request to false. That is only needed for Apache Kafka <0.10 brokers (really old), so remove that config setting.
Also make sure to use the latest librdkafka and kafkacat, and if the problem persists, reproduce it with -d broker,security,protocol.
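For anyone landing here later, a minimal sketch of what that suggestion could look like in practice. It writes a sample config matching the one in the question, strips the api.version.request line, and prints the command to rerun with the extra debug contexts. The helper name strip_api_version_request, the temporary paths, and the .fixed.conf file name are made up for illustration. Passing the file with -F is also an assumption on my part: the command in the question does not pass the config file, and the debug log shows the connection being made as "(plaintext)", which suggests the SASL_SSL settings may not have been picked up at all.

```shell
#!/bin/sh
# Hypothetical helper: copy a kafkacat config, dropping api.version.request.
strip_api_version_request() {
    # $1 = input config file, $2 = output config file
    grep -v '^[[:space:]]*api\.version\.request[[:space:]]*=' "$1" > "$2"
}

# Sample config with the same placeholders as in the question.
workdir=$(mktemp -d)
conf="$workdir/kafkacat.conf"
fixed="$workdir/kafkacat.fixed.conf"

cat > "$conf" <<'EOF'
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
socket.timeout.ms=30000
api.version.request=false
metadata.request.timeout.ms=30000
EOF

strip_api_version_request "$conf" "$fixed"

# Show the cleaned config.
cat "$fixed"

# Print (not run) the command to retry with; it needs real credentials.
printf '%s\n' "kafkacat -F $fixed -C -b '<enter event hubs namespace>.servicebus.windows.net:9093' -t my-topic -o beginning -d broker,security,protocol"
```

If the config is being read, the CONNECT lines in the debug output should no longer say "(plaintext)".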
I get the same error, and nothing obvious shows up with -d broker,metadata,protocol,security.
OP, are you also trying to connect from behind a proxy?
Currently our proxy only passes AMQP traffic and rejects other protocols.
I suspect this is the issue, as Event Hubs may require AMQP in order to connect.