aiokafka
aiokafka copied to clipboard
SASL connect fails with `RuntimeError: await wasn't used with future`
I'm trying to connect to my Kafka cluster (confluent cloud) using a modified version of the ssl_consume_produce.py
example from the AIOKafka
repo at https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py.
I've configured my AIOKafkaConsumer
and AIOKafkaProducer
with the correct SASL config, but am getting the error RuntimeError: await wasn't used with future
. I've included my config, error details, and the ssl_consume_produce.py
example below.
config
:
bootstrap.servers=*********.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="*********" password\="******************";
sasl.username=*********
sasl.password=*********
error logs
:
/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
[2020-02-06 17:26:57,060] DEBUG [asyncio]: Using selector: KqueueSelector
[2020-02-06 17:26:57,061] DEBUG [aiokafka.producer.producer]: Starting the Kafka producer
[2020-02-06 17:26:57,061] DEBUG [aiokafka]: Attempting to bootstrap via node at pkc-43n10.us-central1.gcp.confluent.cloud:9092
[2020-02-06 17:26:57,223] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 1: ApiVersionRequest_v0()
[2020-02-06 17:26:57,265] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=2), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=6), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=6), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=4), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=3), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=10000, min_version=0, max_version=0)])
[2020-02-06 17:26:57,266] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
[2020-02-06 17:26:57,303] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
Traceback (most recent call last):
File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 78, in <module>
loop.run_until_complete(task)
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 73, in <module>
loop.run_until_complete(task)
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 34, in produce_and_consume
start_future = await producer.start()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
await self.client.bootstrap()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
version_hint=version_hint)
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
await conn.connect()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
await self._do_sasl_handshake()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
payload, expect_response = res
RuntimeError: await wasn't used with future
[2020-02-06 17:26:57,315] ERROR [asyncio]: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fcee03a84d0>
Process finished with exit code 1
ssl_consume_produce.py
:
import asyncio
import os
import logging
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition
from aiokafka.errors import KafkaError
from aiokafka import AIOKafkaClient
import ccloud_lib
conf = ccloud_lib.read_ccloud_config('kafka_config.conf')
ssl_context = create_ssl_context(cafile='cacert.pem')
log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)
async def produce_and_consume(loop):
# Produce
producer = AIOKafkaProducer(
bootstrap_servers=conf['bootstrap.servers'],
loop = loop,
security_protocol=conf['security.protocol'],
sasl_mechanism=conf['sasl.mechanism'],
ssl_context=ssl_context,
sasl_plain_username=conf['sasl.username'],
sasl_plain_password=conf['sasl.password'],
api_version='0.10'
)
try:
start_future = await producer.start()
response = await start_future # wait until message is produced
except KafkaError as err:
print("some kafka error on produce: {}".format(err))
try:
msg = await producer.send_and_wait(
'my_topic', b"Super Message", partition=0)
finally:
await producer.stop()
consumer = AIOKafkaConsumer(
bootstrap_servers=conf['bootstrap.servers'],
loop=loop,
ssl_context=ssl_context,
security_protocol=conf['security.protocol'],
sasl_mechanism=conf['sasl.mechanism'],
sasl_plain_password=conf['sasl.password'],
sasl_plain_username=conf['sasl.username']
)
try:
start_future = await consumer.start()
response = await start_future # wait until message is produced
except KafkaError as err:
print("some kafka error on produce: {}".format(err))
try:
consumer.seek(TopicPartition('my_topic', 0), msg.offset)
fetch_msg = await consumer.getone()
finally:
await consumer.stop()
print("Success", msg, fetch_msg)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
task = loop.create_task(produce_and_consume(loop))
try:
loop.run_until_complete(task)
finally:
loop.run_until_complete(asyncio.sleep(0, loop=loop))
task.cancel()
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Sorry, it was probably some version of an unstable master at the time. =( I did make a mistake there in 0.5.2 I think. Should be fixed in 0.6.0.
We are still seeing this issue in 0.7.2.
We are still seeing this issue in 0.7.2.
The example here doesn't look like working code, neither AIOKafkaConsumer.start()
nor AIOKafkaProducer.start()
return future. And it would be very helpful if you provide a self-contained example to reproduce the problem, e.g. python script + Dockerfile + docker-compose.yml
We think it could be the issue is in python 3.8 and 3.9 with using wait_for (https://bugs.python.org/issue42130).
We think it could be the issue is in python 3.8 and 3.9 with using wait_for (https://bugs.python.org/issue42130).
Could you then try it with current code in master please? It uses async-timeout
as a replacement for wait_for()
.
It got past the previous error, but it still errors with:
...line 45, in start_consumer
await consumer.start()
File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py\", line 346, in start
await self._client.bootstrap()
File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/client.py\", line 210, in bootstrap
bootstrap_conn = await create_conn(
File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/conn.py\", line 97, in create_conn
await conn.connect()
File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/conn.py\", line 217, in connect
transport, _ = await loop.create_connection(
File \"/usr/lib64/python3.8/asyncio/base_events.py\", line 986, in create_connection
infos = await self._ensure_resolved(
File \"/usr/lib64/python3.8/asyncio/base_events.py\", line 1365, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
File \"/usr/lib64/python3.8/asyncio/base_events.py\", line 825, in getaddrinfo
return await self.run_in_executor(
builtins.RuntimeError: await wasn't used with future
"}