aiokafka
aiokafka copied to clipboard
SASL connect fails with RuntimeError: await wasn't used with future
robinhood-aiokafka version 1.1.3 Connecting to Kafka using SASL fails with 'RuntimeError: await wasn't used with future' in aiokafka/conn.py, AIOKafkaConnection._do_sasl_handshake(): res = await authenticator.step(auth_bytes)
Fix is missing await in aiokafka/conn.py: class BaseSaslAuthenticator:
async def step(self, payload):
return **await** self._loop.run_in_executor(None, self._step, payload
Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/mode/worker.py", line 264, in execute_from_commandline self.loop.run_until_complete(self._starting_fut) File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete return future.result() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 719, in start await self._default_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 726, in _default_start await self._actually_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 750, in _actually_start await child.maybe_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 778, in maybe_start await self.start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 719, in start await self._default_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 726, in _default_start await self._actually_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 750, in _actually_start await child.maybe_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 778, in maybe_start await self.start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 719, in start await self._default_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 726, in _default_start await self._actually_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 743, in _actually_start await self.on_start() File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 649, in on_start await producer.start() File "/usr/local/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start await self.client.bootstrap() File "/usr/local/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap version_hint=version_hint) File "/usr/local/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn await conn.connect() File "/usr/local/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect await self._do_sasl_handshake() File "/usr/local/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake payload, expect_response = res RuntimeError: await wasn't used with future
i think it's better to stay close as possible of the behavior of aio-libs/aiokafka, i made another PR will fix the issue #18
I'm also seeing this issue, PR #18 fixes it for me.
@jsurloppe this issue is present in aio-libs/aiokafka, too, but there is no PR over there.
@deed02392 , just tested on aio-libs/aiokafka master, i don't have the issue.
Open an issue on aio-libs repo with more context if you're still having it.
I am also getting this error and have opened an issue on aio-libs: https://github.com/aio-libs/aiokafka/issues/580