faust
faust copied to clipboard
agent.ask() not retrieving value
Checklist
- [V ] I have included information about relevant versions
- [V ] I have verified that the issue persists when using the
master
branch of Faust.
Steps to reproduce
Calling agent.ask() outside of faust from an async function.
I firstly run the faust app via faust -A simplefaust worker -l info
Then I have the following async function:
`
async def s_faust():
from simplefaust import balance_agent
print('getting balance')
balance = await balance_agent.ask(
DummyOrder(order_id='AB001', amount=0)
)
print(f'balance is {balance}')
if __name__=='__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(s_faust())
Expected behavior
agent.ask() is supposed to retrieve value from the faust agent but it never retrieve any
Actual behavior
faust app successfully executed the stream process but it never return to the caller
Full traceback
Task exception was never retrieved
future: <Task finished coro=<Service._execute_task() done, defined at /opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py:800> exception=RuntimeError('Non-thread-safe operation invoked on an event loop other than the current one')>
Traceback (most recent call last):
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
await task
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/faust/transport/consumer.py", line 176, in _fetcher
await consumer._drain_messages(self)
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/faust/transport/consumer.py", line 1172, in _drain_messages
callback(message), self.suspend_flow.wait()
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 715, in wait_first
f.result() # propagate exceptions
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/faust/transport/conductor.py", line 274, in on_message
return await get_callback_for_tp(message.tp)(message)
KeyError: TopicPartition(topic='simplefaust_topic', partition=0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 813, in _execute_task
await self.crash(exc)
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 848, in crash
child._crash(reason)
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 855, in _crash
node._crash(reason)
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 855, in _crash
node._crash(reason)
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 852, in _crash
self._crashed.set()
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/utils/locks.py", line 53, in set
fut.set_result(True)
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/asyncio/base_events.py", line 693, in call_soon
self._check_thread()
File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/asyncio/base_events.py", line 731, in _check_thread
"Non-thread-safe operation invoked on an event loop other "
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
Versions
- Python version 3.8.13
- Faust version faust-streaming 0.8.4
- Operating system MacOS 12.3
- Kafka version Confluent Cloud Kafka
- RocksDB version (if applicable) N/A