janus
Added the possibility of synchronous initialization
What do these changes do?
Before this commit, a queue could only be initialized inside an asynchronous function. It can now also be initialized from synchronous code, but you must pass the event loop to the constructor (the loop may be empty or not yet running).
This is just a new way to initialize janus, but it gives more options for exception handling and for structuring code.
(Minimal code to illustrate the changes)
Before
import asyncio
import janus


async def async_f(async_q: janus.AsyncQueue[int]) -> None:
    ...


def sync_f(sync_q: janus.SyncQueue[int]) -> None:
    ...


async def main() -> None:
    loop = asyncio.get_running_loop()
    # the queue has to be created inside a running loop
    queue: janus.Queue[int] = janus.Queue()
    loop.run_in_executor(None, sync_f, queue.sync_q)
    await async_f(queue.async_q)
    ...


if __name__ == "__main__":
    asyncio.run(main())
Now
import asyncio
import janus


async def async_f(async_q: janus.AsyncQueue[int]) -> None:
    ...


def sync_f(sync_q: janus.SyncQueue[int]) -> None:
    ...


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # `loop=` is the constructor argument introduced by this PR
    queue: janus.Queue[int] = janus.Queue(loop=loop)
    loop.create_task(async_f(queue.async_q))
    loop.run_in_executor(None, sync_f, queue.sync_q)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
Are there changes in behavior for the user?
There are no behavior changes for users.
Checklist
- [x] I think the code is well written
- [x] Unit tests for the changes exist
- [ ] Documentation reflects the changes
- [ ] Add a new news fragment into the CHANGES folder
  - name it <issue_id>.<type> (e.g. 588.bugfix)
  - if you don't have an issue_id, change it to the PR id after creating the PR
  - ensure type is one of the following:
    - .feature: Signifying a new feature.
    - .bugfix: Signifying a bug fix.
    - .doc: Signifying a documentation improvement.
    - .removal: Signifying a deprecation or removal of public API.
    - .misc: A ticket has been closed, but it is not of interest to users.
  - Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.
@s0d3s The asyncio.Condition and asyncio.Event instances created in Queue.__init__ should also be given self._loop. Otherwise, the following error is raised:
RuntimeError: Task <Task pending name='Task-1' coro=<run_ace_server.<locals>.xxxx() running at server.py:21>> got Future <Future pending> attached to a different loop
Calling asyncio.get_event_loop() from non-async code is soft-deprecated now.
Please provide a realistic usage example that doesn't contain the deprecated call.
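Synchronous code can avoid asyncio.get_event_loop() by creating the loop explicitly with asyncio.new_event_loop(), which the later examples in this thread also do. The sketch below is hypothetical and stdlib-only. It uses a plain asyncio.Queue, fed from a worker thread through loop.call_soon_threadsafe, as a stand-in for janus. It assumes Python 3.10+, so that the queue can be constructed before the loop is running.

```python
import asyncio


def producer(loop: asyncio.AbstractEventLoop, q: "asyncio.Queue[int]", n: int) -> None:
    # Runs in a worker thread: schedule each put on the loop thread safely.
    for i in range(n):
        loop.call_soon_threadsafe(q.put_nowait, i)


async def consumer(q: "asyncio.Queue[int]", n: int) -> list[int]:
    return [await q.get() for _ in range(n)]


async def run(loop: asyncio.AbstractEventLoop, q: "asyncio.Queue[int]", n: int) -> list[int]:
    producing = loop.run_in_executor(None, producer, loop, q, n)
    received = await consumer(q, n)
    await producing  # make sure the worker thread has finished
    return received


def main(n: int = 5) -> list[int]:
    # Create the loop explicitly instead of calling asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Built from sync code before the loop runs (binds lazily on 3.10+).
        q: "asyncio.Queue[int]" = asyncio.Queue()
        return loop.run_until_complete(run(loop, q, n))
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == "__main__":
    print(main())  # [0, 1, 2, 3, 4]
```

Callbacks scheduled with call_soon_threadsafe run in FIFO order, so the consumer receives the items in the order the worker produced them.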
I can imagine an alternative design with postponed initialization. The tricky part is that threading synchronization primitives can be created from any thread, but their async counterparts should be created from a running asyncio loop only. There are two usage scenarios: the async API is called first, or the sync API is called first.
The first case is trivial: all structures are initialized as now, just later in time.
The second case is more complex: the sync init could instantiate only the threading.Lock and threading.Condition instances, leaving the async counterparts untouched. In this case, the first async call performed after the sync call should instantiate the asyncio.Lock / asyncio.Condition instances and put them in the proper state according to the combination of self._maxsize and len(self._queue). _notify_async_full() and _notify_async_not_empty() should be called as well if needed.
Double-checked locking should be used when calling both the sync and async inits to prevent races.
The proposal is viable but not trivial. It requires careful thought about handling a semi-initialized queue state in a multithreaded environment. It is an exciting challenge, and I will happily review a pull request from a champion.
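The double-checked locking described above can be sketched for the async side alone. In this sketch the threading primitives are created eagerly, and the asyncio primitive is created on the first call made from a running loop. The class and method names (`LazyAsyncQueueCore`, `_ensure_async_part`, `async_qsize`) are hypothetical and not janus API. The sketch also leaves out the not-full/not-empty notification logic that a real queue would need.

```python
import asyncio
import threading
from collections import deque
from typing import Any, Deque, Optional


class LazyAsyncQueueCore:
    """Hypothetical sketch of postponed async initialization (not janus code)."""

    def __init__(self, maxsize: int = 0) -> None:
        self._maxsize = maxsize
        self._queue: Deque[Any] = deque()
        # Threading primitives may be created from any thread: build them now.
        self._sync_mutex = threading.Lock()
        self._sync_not_empty = threading.Condition(self._sync_mutex)
        # Async primitives are deferred until the first call from a running loop.
        self._init_lock = threading.Lock()
        self._async_ready = False
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._async_not_empty: Optional[asyncio.Condition] = None
        self.async_init_count = 0  # demo only: shows the init body runs once

    def sync_put(self, item: Any) -> None:
        with self._sync_mutex:
            self._queue.append(item)
            self._sync_not_empty.notify()

    def _ensure_async_part(self) -> None:
        if self._async_ready:  # first check: cheap read without the lock
            return
        with self._init_lock:
            if self._async_ready:  # second check: another caller may have won
                return
            # Raises RuntimeError when called outside a running loop.
            self._loop = asyncio.get_running_loop()
            self._async_not_empty = asyncio.Condition()
            self.async_init_count += 1
            # A full version would also bring the async state in line with
            # self._maxsize and len(self._queue) (the _notify_async_* calls
            # from the review); no async waiters can exist at this point.
            self._async_ready = True  # publish only after everything is built

    async def async_qsize(self) -> int:
        self._ensure_async_part()
        with self._sync_mutex:
            return len(self._queue)


if __name__ == "__main__":
    core = LazyAsyncQueueCore()
    core.sync_put(1)  # the sync side works before any loop exists
    print(asyncio.run(core.async_qsize()), core.async_init_count)
```

Items put from the sync side before the async part exists remain visible afterwards, because both sides share the same underlying deque.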
It's not that much of a challenge, but challenge accepted 😉 Expect news soon.
My main goal was to introduce the new functionality without changing the behavior of the existing one. Some of the decisions made to achieve this may seem controversial (see the commit message for a short overview). Nevertheless, all the goals were met.
Now the following is possible:
- Using janus in ONLY synchronous mode
import janus

if __name__ == '__main__':
    items_count = 10
    # init_async_part=False is the constructor flag added by this PR
    queue = janus.Queue(maxsize=items_count, init_async_part=False)
    for i in range(items_count):
        queue.sync_q.put(i)
    while not queue.sync_q.empty():
        print(queue.sync_q.get(), end=" ")
- Post-initialization of the asynchronous parts
  - direct
import threading
import asyncio
import janus

if __name__ == '__main__':
    items_count = 10
    queue = janus.Queue(maxsize=items_count, init_async_part=False)
    all_done = threading.Event()

    def wait_init_the_get(sync_q: janus.SyncQueue):
        # You can now also wait for initialization via the `full_init` event:
        # sync_q._parent.full_init.wait()
        all_done.wait()
        while not sync_q.empty():
            print(sync_q.get(), end=" ")
            sync_q.task_done()

    async def init_async_then_put(queue_: janus.Queue):
        queue_.trigger_async_initialization()
        for i in range(items_count):
            await queue_.async_q.put(i)
        all_done.set()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(init_async_then_put(queue))
    loop.run_until_complete(loop.run_in_executor(None, wait_init_the_get, queue.sync_q))
    loop.close()
  - via proxy
import threading
import asyncio
from typing import Union

import janus

if __name__ == '__main__':
    items_count = 10
    queue = janus.Queue(maxsize=items_count, init_async_part=False)
    all_done = threading.Event()

    def wait_init_the_get(sync_q: janus.SyncQueue):
        all_done.wait()
        while not sync_q.empty():
            print(sync_q.get(), end=" ")
            sync_q.task_done()

    async def init_async_then_put(async_q: Union[janus.AsyncQueue, janus.PreInitDummyAsyncQueue]):
        for i in range(items_count):
            # The asynchronous parts are initialized automatically
            # the first time the "asynchronous queue" attributes are accessed
            # (until full initialization, this is actually a stub object,
            # not the real `async_q`)
            await async_q.put(i)
        all_done.set()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(init_async_then_put(queue.async_q))
    loop.run_until_complete(loop.run_in_executor(None, wait_init_the_get, queue.sync_q))
    loop.close()
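The stub object in the proxy example postpones initialization until one of its attributes is first accessed. A generic version of that idea can be sketched with `__getattr__`. `LazyProxy` and `demo` are hypothetical names and not the PR's PreInitDummyAsyncQueue. This version is also not thread-safe: a real one would guard the build step with a lock, for example using the double-checked locking proposed in the review.

```python
import asyncio
from typing import Any, Callable, Optional


class LazyProxy:
    """Hypothetical: builds the real object on first attribute access."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._target: Optional[Any] = None
        self.builds = 0  # demo counter

    def __getattr__(self, name: str) -> Any:
        # Called only for names not found normally, so the attributes set in
        # __init__ never recurse into this method.
        if self._target is None:
            self._target = self._factory()
            self.builds += 1
        return getattr(self._target, name)


async def demo(proxy: LazyProxy) -> list[int]:
    for i in range(3):
        # The first access builds the asyncio.Queue inside the running loop.
        await proxy.put(i)
    return [proxy.get_nowait() for _ in range(3)]


if __name__ == "__main__":
    proxy = LazyProxy(asyncio.Queue)  # created in sync code; nothing built yet
    print(proxy.builds)  # 0
    print(asyncio.run(demo(proxy)), proxy.builds)  # [0, 1, 2] 1
```

The caller never has to trigger initialization explicitly. The trade-off is that every attribute lookup on the stub goes through an extra indirection until the real object exists.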
P.S. @asvetlov I took note of your comments, but did not end up needing them.
P.P.S. After re-reading your comment, it occurred to me that you may have meant initializing only the asynchronous part (without the synchronous one). I don't see the point in that, though, since the synchronous part cannot “in any way” interfere with the asynchronous one. Perhaps I misunderstood you =/