trio.TrioInternalError when sync func run with trio.to_thread.run_sync returns
Version: 0.23.1
Just trying out trio with some test code and got this:
python-BaseException
Traceback (most recent call last):
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 2574, in unrolled_run
runner.task_exited(task, final_outcome)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1800, in task_exited
task._parent_nursery._child_finished(task, outcome)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1069, in _child_finished
self._add_exc(outcome.error)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1057, in _add_exc
self.cancel_scope.cancel()
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_ki.py", line 180, in wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 773, in cancel
self._cancel_status.recalculate()
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 431, in recalculate
task._attempt_delivery_of_any_pending_cancel()
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1411, in _attempt_delivery_of_any_pending_cancel
self._attempt_abort(raise_cancel)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1393, in _attempt_abort
success = self._abort_func(raise_cancel)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_channel.py", line 206, in abort_fn
self._tasks.remove(task)
KeyError: <Task '__main__.async_main' at 0x259b1770a40>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 2240, in run
timeout = gen.send(next_send)
^^^^^^^^^^^^^^^^^^^
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 2631, in unrolled_run
raise TrioInternalError("internal error in Trio - please file a bug!") from exc
trio.TrioInternalError: internal error in Trio - please file a bug!
Here's the test code that caused it:
import trio
import time

start = 0

class dummy_driver:
    def get_handler(self, msg):
        if msg == 'step':
            return self.run_step

    def run_step(self, ext_comms, send_resp):
        time.sleep(2)
        # trio.sleep(2)
        print(f"handler(): sending response, time = {int(time.time())-start}")
        send_resp(ext_comms, 'ok')

class dummy_io:
    def get(self):
        time.sleep(2)
        return 'step'

    def put(self, resp):
        time.sleep(1)
        print(f"received resp {resp}")

# reads msg from the comms channel from TEAL and sends it back to main driver thread
def read_msg(ext_comms, msg_in: trio.MemoryReceiveChannel, msg_out: trio.MemorySendChannel):
    while True:
        msg = ext_comms.get()  # blocks
        trio.from_thread.run(msg_out.send, msg)
        try:
            die = msg_in.receive_nowait()
            if die:
                print(f"read_msg: terminating, time = {int(time.time())-start}")
                return
        except trio.WouldBlock as e:
            pass

def send_resp(ext_comms, resp):
    if resp:
        ext_comms.put(resp)

async def async_main():
    driver = dummy_driver()
    ext_comms = dummy_io()
    send, receive = trio.open_memory_channel(0)
    send2, receive2 = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        count = 0
        nursery.start_soon(trio.to_thread.run_sync, read_msg, ext_comms, receive2, send)
        while True:
            msg = await receive.receive()
            fn = driver.get_handler(msg)
            print(f"main: assign item to handler, time = {int(time.time())-start}")
            nursery.start_soon(trio.to_thread.run_sync, fn, ext_comms, send_resp)
            count += 1
            if count == 5:
                print(f"main: terminating, time = {int(time.time())-start}")
                await send2.send(True)

if __name__ == "__main__":
    start = int(time.time())
    trio.run(async_main)
    print(f"main: terminated, time = {int(time.time())-start}")
This happened when count reached 5 and read_msg() returned.
Python 3.11.5
I'm not certain, but I suspect your issue might be with the msg_in.receive_nowait() call in read_msg(). Trio's functions in general aren't thread-safe - that allows them to be fast, but it means you'll need to use trio.from_thread.run_sync there.
It didn't matter, I got the same error. Here are the additional exceptions that I forgot to post earlier:
received resp ok, time = 11
Exception while delivering result of thread
Traceback (most recent call last):
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_thread_cache.py", line 169, in _handle_job
deliver(result)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_threads.py", line 377, in deliver_worker_fn_result
current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 211, in run_sync_soon
self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 139, in run_sync_soon
self.wakeup.wakeup_thread_and_signal_safe()
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_wakeup_socketpair.py", line 35, in wakeup_thread_and_signal_safe
self.write_sock.send(b"\x00")
OSError: [WinError 10038] An operation was attempted on something that is not a socket
handler(): sending response, time = 12
received resp ok, time = 13
Exception while delivering result of thread
Traceback (most recent call last):
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_thread_cache.py", line 169, in _handle_job
deliver(result)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_threads.py", line 377, in deliver_worker_fn_result
current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 211, in run_sync_soon
self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 139, in run_sync_soon
self.wakeup.wakeup_thread_and_signal_safe()
File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_wakeup_socketpair.py", line 35, in wakeup_thread_and_signal_safe
self.write_sock.send(b"\x00")
OSError: [WinError 10038] An operation was attempted on something that is not a socket
Hope it helps.
I can reproduce your failure with the original code you posted, but I don't see it anymore after the fix that @TeamSpen210 suggested. Specifically, I changed die = msg_in.receive_nowait() to die = trio.from_thread.run_sync(msg_in.receive_nowait). I also added a break after await send2.send(True), because otherwise it doesn't terminate (it will loop around and block in await receive.receive() because the read_msg thread has exited so no one is feeding that channel anymore).
I can confirm that the fix suggested by @TeamSpen210 works on Ubuntu 22.04 running Python 3.10.12 and the same version of trio, but not on Windows 11, where the error was originally seen.
One other thing you could do is simplify the code a bit by getting rid of send2/receive2. You can close() the receive side of the channel, which will make the send side immediately raise an exception. Also read_msg() might be better if it was an async function itself, just use to_thread.run_sync() with ext_comms.get(). Then you don't have to worry about any of this.
I personally cannot reproduce after applying @TeamSpen210's fix on Windows 11, running trio as of dfa5576907ef7fb2eabe4ad3c1520655d02a9002, and CPython 3.11.4.
At least I assume it's supposed to hang after this output?:
main: assign item to handler, time = 2
main: assign item to handler, time = 4
handler(): sending response, time = 4
received resp ok
handler(): sending response, time = 6
main: assign item to handler, time = 6
received resp ok
handler(): sending response, time = 8
main: assign item to handler, time = 8
received resp ok
handler(): sending response, time = 10
main: assign item to handler, time = 10
main: terminating, time = 10
read_msg: terminating, time = 10
received resp ok
handler(): sending response, time = 12
received resp ok
If so, then maybe the failure that persists after that fix is machine-specific?
As for the original issue, I haven't looked at the source code at all but maybe there's some cheap way to check for concurrency and error in most cases? (without a lock, obviously...)
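One lock-free check of the kind asked about might compare thread identities. This is purely illustrative: `ThreadGuard` and `check_in_worker` are hypothetical helpers, not something Trio provides, and nothing here reflects how Trio's channels are actually implemented.

```python
import threading


class ThreadGuard:
    """Hypothetical helper: remembers the owning thread and rejects others."""

    def __init__(self):
        self._owner = threading.get_ident()

    def check(self):
        # Comparing two integers is cheap and needs no lock.
        if threading.get_ident() != self._owner:
            raise RuntimeError("called from a thread other than the owner")


def check_in_worker(guard):
    """Call guard.check() from a new thread and collect any error messages."""
    errors = []

    def worker():
        try:
            guard.check()
        except RuntimeError as exc:
            errors.append(str(exc))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return errors
```

A guard created in the main thread passes `check()` there, while the same call made from the worker thread raises.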
Incidentally the thread-unsafe operation you did was checking for a termination message, which is no longer really required in trio>=0.23 if you are using trio.from_thread.run regularly:
import trio
import time

start = 0

class dummy_driver:
    def get_handler(self, msg):
        if msg == 'step':
            return self.run_step

    def run_step(self, ext_comms, send_resp):
        time.sleep(2)
        # trio.sleep(2)
        print(f"handler(): sending response, time = {int(time.time())-start}")
        send_resp(ext_comms, 'ok')

class dummy_io:
    def get(self):
        time.sleep(2)
        return 'step'

    def put(self, resp):
        time.sleep(1)
        print(f"received resp {resp}")

# reads msg from the comms channel from TEAL and sends it back to main driver thread
def read_msg(ext_comms, msg_in: trio.MemoryReceiveChannel, msg_out: trio.MemorySendChannel):
    try:
        while True:
            msg = ext_comms.get()  # blocks
            # This line will notice cancellation and end the thread
            trio.from_thread.run(msg_out.send, msg)
    finally:
        print(f"read_msg: terminating, time = {int(time.time())-start}")

def send_resp(ext_comms, resp):
    if resp:
        ext_comms.put(resp)

async def async_main():
    driver = dummy_driver()
    ext_comms = dummy_io()
    send, receive = trio.open_memory_channel(0)
    send2, receive2 = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        count = 0
        nursery.start_soon(trio.to_thread.run_sync, read_msg, ext_comms, receive2, send)
        while True:
            msg = await receive.receive()
            fn = driver.get_handler(msg)
            print(f"main: assign item to handler, time = {int(time.time())-start}")
            nursery.start_soon(trio.to_thread.run_sync, fn, ext_comms, send_resp)
            count += 1
            if count == 5:
                print(f"main: terminating, time = {int(time.time())-start}")
                # just cancel everything to exit
                nursery.cancel_scope.cancel()

if __name__ == "__main__":
    start = int(time.time())
    trio.run(async_main)
    print(f"main: terminated, time = {int(time.time())-start}")
At least I assume it's supposed to hang after this output?:
In my tests it did that unless I added a break after the operation in async_main that sends the termination request. I assume that's a bug in the reduced example that wasn't noticed because execution didn't reach that point before.
As for the original issue, I haven't looked at the source code at all but maybe there's some cheap way to check for concurrency and error in most cases? (without a lock, obviously...)
I think it already would fail when receive_nowait calls trio.lowlevel.reschedule(), because reschedule won't be able to find the runner via GLOBAL_RUN_CONTEXT, because the context is thread-local. It's not obvious to me what caused the TrioInternalError here and whether we could guard against that.
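To illustrate the thread-local point in general terms, here is a small standalone sketch. `RUN_CONTEXT`, `current_runner` and `lookup_from_worker` are hypothetical names chosen for this example; it only mimics the idea of a thread-local run context and is not Trio's actual code.

```python
import threading

# Hypothetical stand-in for a thread-local run context.
RUN_CONTEXT = threading.local()


def current_runner():
    """Return the runner stored for the calling thread, if there is one."""
    try:
        return RUN_CONTEXT.runner
    except AttributeError:
        raise RuntimeError("no runner in this thread") from None


def lookup_from_worker():
    """Try the same lookup from a freshly started thread."""
    result = {}

    def worker():
        try:
            result["runner"] = current_runner()
        except RuntimeError as exc:
            result["error"] = str(exc)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result


# Only the thread that sets the attribute can see it.
RUN_CONTEXT.runner = "main-thread runner"
```

The main thread finds its runner, while the worker thread gets the error, because attributes on a `threading.local` object are stored separately per thread.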
Thanks @TeamSpen210. That solves the problem for me. Please feel free to close or let me know if you want me to close the issue if you don't need it to resolve the trio.TrioInternalError.
One other thing you could do is simplify the code a bit by getting rid of send2/receive2. You can close() the receive side of the channel, which will make the send side immediately raise an exception. Also read_msg() might be better if it was an async function itself, just use to_thread.run_sync() with ext_comms.get(). Then you don't have to worry about any of this.