aiokafka icon indicating copy to clipboard operation
aiokafka copied to clipboard

SASL connect fails with `RuntimeError: await wasn't used with future`

Open galen211 opened this issue 4 years ago • 6 comments

I'm trying to connect to my Kafka cluster (confluent cloud) using a modified version of the ssl_consume_produce.py example from the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py.

I've configured my AIOKafkaConsumer and AIOKafkaProducer with the correct SASL config, but am getting the error RuntimeError: await wasn't used with future. I've included my config, error details, and the ssl_consume_produce.py example below.

config:

bootstrap.servers=*********.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="*********" password\="******************";
sasl.username=*********
sasl.password=*********

error logs:

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
[2020-02-06 17:26:57,060] DEBUG [asyncio]: Using selector: KqueueSelector
[2020-02-06 17:26:57,061] DEBUG [aiokafka.producer.producer]: Starting the Kafka producer
[2020-02-06 17:26:57,061] DEBUG [aiokafka]: Attempting to bootstrap via node at pkc-43n10.us-central1.gcp.confluent.cloud:9092
[2020-02-06 17:26:57,223] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 1: ApiVersionRequest_v0()
[2020-02-06 17:26:57,265] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=2), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=6), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=6), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=4), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=3), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=10000, min_version=0, max_version=0)])
[2020-02-06 17:26:57,266] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
[2020-02-06 17:26:57,303] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 78, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 73, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 34, in produce_and_consume
    start_future = await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
[2020-02-06 17:26:57,315] ERROR [asyncio]: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fcee03a84d0>

Process finished with exit code 1

ssl_consume_produce.py:

import asyncio
import os
import logging

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition
from aiokafka.errors import KafkaError

from aiokafka import AIOKafkaClient

import ccloud_lib

conf = ccloud_lib.read_ccloud_config('kafka_config.conf')
ssl_context = create_ssl_context(cafile='cacert.pem')

log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop = loop,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        ssl_context=ssl_context,
        sasl_plain_username=conf['sasl.username'],
        sasl_plain_password=conf['sasl.password'],
        api_version='0.10'
    )
    try:
        start_future = await producer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))

    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    try:
        start_future = await consumer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))


    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

galen211 avatar Feb 06 '20 22:02 galen211

Sorry, it was probably some version of an unstable master at the time. =( I did make a mistake there in 0.5.2 I think. Should be fixed in 0.6.0.

tvoinarovskyi avatar May 15 '20 15:05 tvoinarovskyi

We are still seeing this issue in 0.7.2.

offero avatar Dec 09 '21 21:12 offero

We are still seeing this issue in 0.7.2.

The example here doesn't look like working code, neither AIOKafkaConsumer.start() nor AIOKafkaProducer.start() return future. And it would be very helpful if you provide a self-contained example to reproduce the problem, e.g. python script + Dockerfile + docker-compose.yml

ods avatar Dec 11 '21 13:12 ods

We think it could be the issue is in python 3.8 and 3.9 with using wait_for (https://bugs.python.org/issue42130).

offero avatar Dec 13 '21 22:12 offero

We think it could be the issue is in python 3.8 and 3.9 with using wait_for (https://bugs.python.org/issue42130).

Could you then try it with current code in master please? It uses async-timeout as a replacement for wait_for().

ods avatar Dec 14 '21 03:12 ods

It got past the previous error, but it still errors with:

...line 45, in start_consumer
    await consumer.start()
  File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py\", line 346, in start
    await self._client.bootstrap()
  File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/client.py\", line 210, in bootstrap
    bootstrap_conn = await create_conn(
  File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/conn.py\", line 97, in create_conn
    await conn.connect()
  File \"/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/conn.py\", line 217, in connect
    transport, _ = await loop.create_connection(
  File \"/usr/lib64/python3.8/asyncio/base_events.py\", line 986, in create_connection
    infos = await self._ensure_resolved(
  File \"/usr/lib64/python3.8/asyncio/base_events.py\", line 1365, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
  File \"/usr/lib64/python3.8/asyncio/base_events.py\", line 825, in getaddrinfo
    return await self.run_in_executor(
builtins.RuntimeError: await wasn't used with future
"}


offero avatar Dec 15 '21 01:12 offero