faust icon indicating copy to clipboard operation
faust copied to clipboard

agent.ask() not retrieving value

Open mozTheFuzz opened this issue 2 years ago • 0 comments

Checklist

  • [V ] I have included information about relevant versions
  • [V ] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Calling agent.ask() outside of faust from an async function.

I firstly run the faust app via faust -A simplefaust worker -l info

Then I have the following async function:

`

async def s_faust():
    from simplefaust import balance_agent
    print('getting balance')
    balance = await balance_agent.ask(
        DummyOrder(order_id='AB001', amount=0)
    )
    print(f'balance is {balance}')


if __name__=='__main__':
      loop = asyncio.get_event_loop()
      loop.run_until_complete(s_faust())

Expected behavior

agent.ask() is supposed to retrieve value from the faust agent but it never retrieve any

Actual behavior

faust app successfully executed the stream process but it never return to the caller

Full traceback

Task exception was never retrieved
future: <Task finished coro=<Service._execute_task() done, defined at /opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py:800> exception=RuntimeError('Non-thread-safe operation invoked on an event loop other than the current one')>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await consumer._drain_messages(self)
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/faust/transport/consumer.py", line 1172, in _drain_messages
    callback(message), self.suspend_flow.wait()
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 715, in wait_first
    f.result()  # propagate exceptions
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/faust/transport/conductor.py", line 274, in on_message
    return await get_callback_for_tp(message.tp)(message)
KeyError: TopicPartition(topic='simplefaust_topic', partition=0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 813, in _execute_task
    await self.crash(exc)
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 848, in crash
    child._crash(reason)
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 855, in _crash
    node._crash(reason)
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 855, in _crash
    node._crash(reason)
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/services.py", line 852, in _crash
    self._crashed.set()
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/site-packages/mode/utils/locks.py", line 53, in set
    fut.set_result(True)
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/asyncio/base_events.py", line 693, in call_soon
    self._check_thread()
  File "/opt/anaconda3/envs/smarter_ddd/lib/python3.7/asyncio/base_events.py", line 731, in _check_thread
    "Non-thread-safe operation invoked on an event loop other "
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Versions

  • Python version 3.8.13
  • Faust version faust-streaming 0.8.4
  • Operating system MacOS 12.3
  • Kafka version Confluent Cloud Kafka
  • RocksDB version (if applicable) N/A

mozTheFuzz avatar Apr 27 '22 10:04 mozTheFuzz