
Added the possibility of synchronous initialization

Open s0d3s opened this issue 2 years ago • 5 comments

What do these changes do?

Prior to this commit, a queue could only be initialized inside an asynchronous function (that is, while an event loop is running). Now it can also be initialized from synchronous code, but you need to pass an event loop (which may be new and not yet running) to the constructor.

This is simply a new way to initialize janus, but it gives more flexibility in handling exceptions and structuring code.
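The restriction described above follows from binding the queue to the running event loop at construction time. As a minimal sketch, assuming a constructor that captures its loop with asyncio.get_running_loop() (the class LoopBoundQueue below is hypothetical, not janus code), the same call fails in plain synchronous code and succeeds inside a coroutine:

```python
import asyncio


class LoopBoundQueue:
    """Hypothetical stand-in (not janus) that captures the running loop in __init__."""

    def __init__(self) -> None:
        # Raises RuntimeError when no event loop is running in this thread.
        self._loop = asyncio.get_running_loop()


def try_sync_init() -> str:
    # Plain synchronous code: there is no running loop to capture.
    try:
        LoopBoundQueue()
    except RuntimeError as exc:
        return f"sync init failed: {exc}"
    return "sync init succeeded"


async def try_async_init() -> bool:
    # Inside a coroutine the captured loop is the one currently running.
    queue = LoopBoundQueue()
    return queue._loop is asyncio.get_running_loop()


if __name__ == "__main__":
    print(try_sync_init())
    print(asyncio.run(try_async_init()))
```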

(Minimal code illustrating the changes)

Before

import asyncio
import janus

async def async_f(async_q: janus.AsyncQueue[int]):
    ...
def sync_f(sync_q: janus.SyncQueue[int]):
    ...
async def main() -> None:
    loop = asyncio.get_running_loop()
    queue: janus.Queue[int] = janus.Queue()

    loop.run_in_executor(None, sync_f, queue.sync_q)
    await async_f(queue.async_q)
    ...

if __name__ == "__main__":
    asyncio.run(main())

Now

import asyncio
import janus

async def async_f(async_q: janus.AsyncQueue[int]):
    ...
def sync_f(sync_q: janus.SyncQueue[int]):
    ...

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue: janus.Queue[int] = janus.Queue(loop=loop)
    loop.create_task(async_f(queue.async_q))
    loop.run_in_executor(None, sync_f, queue.sync_q)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

Are there changes in behavior for the user?

There are no behavior changes for users.

Checklist

  • [X] I think the code is well written
  • [x] Unit tests for the changes exist
  • [ ] Documentation reflects the changes
  • [ ] Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

s0d3s, Aug 21 '22 14:08

@s0d3s The asyncio.Condition and asyncio.Event instances created in Queue.__init__ should also be given self._loop. Otherwise, the following error is raised:

RuntimeError: Task <Task pending name='Task-1' coro=<run_ace_server.<locals>.xxxx() running at server.py:21>> got Future <Future pending> attached to a different loop

ghost, Mar 22 '23 07:03
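This error is raised whenever a task running on one loop awaits a future that belongs to a different loop, which is what happens when a primitive is bound to a loop other than the one awaiting it. A minimal reproduction using only the standard library (no janus involved; the function name is illustrative):

```python
import asyncio


def reproduce_different_loop_error() -> str:
    # A future created on a loop that is never run: think of a primitive
    # that captured a different loop than the one the caller is running.
    other_loop = asyncio.new_event_loop()
    foreign_future = other_loop.create_future()

    async def waiter() -> None:
        # The task driving this coroutine belongs to the loop created by
        # asyncio.run(), so awaiting a future from other_loop is rejected.
        await foreign_future

    try:
        asyncio.run(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return "no error"


if __name__ == "__main__":
    print(reproduce_different_loop_error())
```

Note that the explicit loop argument of asyncio.Condition and asyncio.Event was deprecated in Python 3.8 and removed in 3.10, so on current Python versions these primitives have to be created (or first used) on the loop that will await them rather than being handed self._loop.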

Calling asyncio.get_event_loop() from non-async code is soft-deprecated now. Please provide a realistic usage example that doesn't contain the deprecated call.

asvetlov, Mar 22 '23 09:03
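One way to write such an example without asyncio.get_event_loop() is to let the synchronous code create and own its loop with asyncio.new_event_loop(), and to construct loop-bound objects inside a coroutine run on that loop. In this sketch asyncio.Queue stands in for janus.Queue (an assumption made so it runs with the standard library alone), and run_pipeline is a hypothetical name:

```python
import asyncio
from typing import List


def run_pipeline(items: List[int]) -> List[int]:
    # Synchronous entry point: it owns the loop explicitly instead of
    # asking asyncio.get_event_loop() for an implicit one.
    loop = asyncio.new_event_loop()
    try:
        async def make_queue() -> "asyncio.Queue[int]":
            # Constructed while the loop is running, so a constructor that
            # calls asyncio.get_running_loop() would also work here.
            return asyncio.Queue()

        queue = loop.run_until_complete(make_queue())

        async def produce() -> None:
            for item in items:
                await queue.put(item)

        async def consume() -> List[int]:
            return [await queue.get() for _ in items]

        loop.run_until_complete(produce())
        return loop.run_until_complete(consume())
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_pipeline([1, 2, 3]))
```

On Python 3.11+, asyncio.Runner provides the same explicit loop ownership with less boilerplate.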

I can imagine an alternative design with postponed initialization. The tricky part is that threading synchronization primitives can be created from any thread, but their async counterparts should be created only from within a running asyncio loop. There are two usage scenarios: either the async API is called first, or the sync API is called first.

The first case is trivial: all structures are initialized as they are now, just later in time.

The second case is more complex: the sync init could instantiate only the threading.Lock and threading.Condition instances, leaving the async counterparts untouched. In this case, the first async call performed after the sync call should instantiate the asyncio.Lock / asyncio.Condition instances and put them in the proper state according to the combination of self._maxsize and len(self._queue). _notify_async_full() and _notify_async_not_empty() should be called as well if needed.

Double-checked locking should be used when calling both the sync and async inits to prevent races.

The proposal is viable but not trivial. It requires careful thought about handling a semi-initialized queue state in a multithreaded environment. It is an exciting challenge, and I will happily review a pull request from a champion.

asvetlov, Mar 22 '23 09:03
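The postponed-initialization design described above can be sketched with a hypothetical LazyQueue class (its names and structure are assumptions, not janus internals): the threading primitives are created eagerly, the asyncio.Event is created on the first async call from a running loop, double-checked locking guards that initialization, and the new event is put into a state matching the items already queued through the sync API. Only an unbounded queue is shown, so the maxsize ("not full") side and janus's _notify_async_full() / _notify_async_not_empty() helpers are left out.

```python
import asyncio
import threading
from collections import deque
from typing import Deque, Optional


class LazyQueue:
    """Hypothetical sketch of postponed async initialization (not janus's API).

    Only an unbounded sync put / sync get / async get trio is shown; the
    maxsize ("not full") side is omitted to keep the sketch short.
    """

    def __init__(self) -> None:
        self._queue: Deque[int] = deque()
        # threading primitives can be created from any thread, loop or no loop.
        self._mutex = threading.Lock()
        self._sync_not_empty = threading.Condition(self._mutex)
        # asyncio primitives are deferred until the first async call.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._async_not_empty: Optional[asyncio.Event] = None

    def _ensure_async_init(self) -> None:
        # First check without the lock: cheap fast path once initialized.
        if self._loop is not None:
            return
        with self._mutex:
            # Second check under the lock: another caller may have won the race.
            if self._loop is None:
                loop = asyncio.get_running_loop()  # only valid inside a running loop
                event = asyncio.Event()
                if self._queue:
                    # Sync API was used first: reflect the items already queued.
                    event.set()
                self._async_not_empty = event
                self._loop = loop  # published last, so the fast path sees a complete state

    def put_sync(self, item: int) -> None:
        with self._mutex:
            self._queue.append(item)
            self._sync_not_empty.notify()
            loop, event = self._loop, self._async_not_empty
        if loop is not None and event is not None:
            # asyncio.Event is not thread-safe: set it on the loop's own thread.
            loop.call_soon_threadsafe(event.set)

    def get_sync(self) -> int:
        with self._sync_not_empty:
            while not self._queue:
                self._sync_not_empty.wait()
            return self._queue.popleft()

    async def get_async(self) -> int:
        self._ensure_async_init()
        event = self._async_not_empty
        assert event is not None
        while True:
            with self._mutex:
                if self._queue:
                    return self._queue.popleft()
                # Clear while holding the lock; the next put_sync sets it again.
                event.clear()
            await event.wait()


if __name__ == "__main__":
    queue = LazyQueue()
    queue.put_sync(1)  # sync API first: no event loop exists yet

    async def main() -> None:
        print(await queue.get_async())  # triggers the postponed async init
        producer = threading.Thread(target=queue.put_sync, args=(2,))
        producer.start()
        print(await queue.get_async())  # item put from another thread
        producer.join()

    asyncio.run(main())
```

The first check outside the lock keeps the already-initialized path cheap, while the second check under the same mutex used by put_sync ensures that a concurrent sync put either lands before initialization (and is reflected in the new event) or sees the published loop and wakes the async side.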

It's not that much of a challenge, but challenge accepted 😉 Expect news soon.

s0d3s, Mar 26 '23 21:03

My main goal was to introduce the new functionality without changing the behavior of the existing functionality. Some of the decisions made to achieve this may seem controversial (for a short introduction, see the commit message). Nevertheless, all of the goals were met.

Now the following is possible:

  • Using janus in ONLY synchronous mode
import janus

if __name__ == '__main__':
   items_count = 10
   queue = janus.Queue(maxsize=items_count, init_async_part=False)

   for i in range(items_count):
       queue.sync_q.put(i)

   while not queue.sync_q.empty():
       print(queue.sync_q.get(), end=" ")
  • Post-initialization of the asynchronous parts
    • direct
    import threading
    import asyncio
    import janus
    
    if __name__ == '__main__':
        items_count = 10
        queue = janus.Queue(maxsize=items_count, init_async_part=False)
    
        all_done = threading.Event()
    
        def wait_init_then_get(sync_q: janus.SyncQueue):
            # Alternatively, wait for the async initialization via the `full_init` event:
            # sync_q._parent.full_init.wait()
            all_done.wait()
    
            while not sync_q.empty():
                print(sync_q.get(), end=" ")
                sync_q.task_done()
    
        async def init_async_then_put(queue_: janus.Queue):
            queue_.trigger_async_initialization()
    
            for i in range(items_count):
                await queue_.async_q.put(i)
    
            all_done.set()
    
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        loop.create_task(init_async_then_put(queue))
        loop.run_until_complete(loop.run_in_executor(None, wait_init_then_get, queue.sync_q))
        loop.close()
    
    
    • via proxy
    import threading
    import asyncio
    import janus
    
    from typing import Union
    
    if __name__ == '__main__':
        items_count = 10
        queue = janus.Queue(maxsize=items_count, init_async_part=False)
        all_done = threading.Event()
    
        def wait_init_then_get(sync_q: janus.SyncQueue):
            all_done.wait()
    
            while not sync_q.empty():
                print(sync_q.get(), end=" ")
                sync_q.task_done()
    
        async def init_async_then_put(async_q: Union[janus.AsyncQueue, janus.PreInitDummyAsyncQueue]):
            for i in range(items_count):
                # The asynchronous parts are initialized automatically the first
                # time an attribute of the "asynchronous queue" is accessed
                # (until full initialization completes, async_q is actually
                #  a stub object, not the real `async_q`)
                await async_q.put(i)
    
            all_done.set()
    
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        loop.create_task(init_async_then_put(queue.async_q))
        loop.run_until_complete(loop.run_in_executor(None, wait_init_then_get, queue.sync_q))
        loop.close()
    
    

P.S. @asvetlov I took note of your comments, but found no use for them. P.P.S. On re-reading, it seemed to me that you may have meant the possibility of initializing only the asynchronous part (without the synchronous one), but I don't see the point of this, since the synchronous part cannot interfere with the asynchronous one “in any way”. Perhaps I misunderstood you =/

s0d3s, Mar 30 '23 20:03
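The proxy approach described in the last comment (a stub that stands in for async_q until the first attribute access triggers initialization, plus a full_init event that sync code can wait on) can be sketched in isolation. The classes Owner, RealAsyncPart and PreInitProxy are hypothetical and only mimic the roles of janus.Queue, janus.AsyncQueue and the PR's PreInitDummyAsyncQueue; the PR's actual implementation may differ.

```python
import asyncio
import threading
from typing import Any, Optional


class RealAsyncPart:
    """Hypothetical async half: must be constructed while a loop is running."""

    def __init__(self) -> None:
        self.loop = asyncio.get_running_loop()  # fails outside a running loop
        self.queue: "asyncio.Queue[int]" = asyncio.Queue()

    async def put(self, item: int) -> None:
        await self.queue.put(item)


class Owner:
    """Hypothetical queue owner that defers building its async half."""

    def __init__(self) -> None:
        self.full_init = threading.Event()  # set once the async half exists
        self._init_lock = threading.Lock()
        self._async_part: Optional[RealAsyncPart] = None

    def trigger_async_initialization(self) -> RealAsyncPart:
        # Serialize initialization so only one RealAsyncPart is ever created.
        with self._init_lock:
            if self._async_part is None:
                self._async_part = RealAsyncPart()
                self.full_init.set()
            return self._async_part

    @property
    def async_q(self) -> Any:
        # Before initialization hand out a stub; afterwards, the real object.
        part = self._async_part
        return part if part is not None else PreInitProxy(self)


class PreInitProxy:
    """Stub that initializes its owner's async half on first attribute access."""

    def __init__(self, owner: Owner) -> None:
        self._owner = owner

    def __getattr__(self, name: str) -> Any:
        # Called only for attributes not found normally (everything except _owner).
        real = self._owner.trigger_async_initialization()
        return getattr(real, name)


if __name__ == "__main__":
    owner = Owner()
    stub = owner.async_q  # evaluated in sync code, before any loop runs

    async def producer() -> None:
        await stub.put(42)  # first access inside the loop triggers initialization
        print(await owner.async_q.queue.get())

    asyncio.run(producer())
    print(owner.full_init.is_set())
```

Because the stub defers every attribute lookup to __getattr__, the real async half is only built once the coroutine runs on a loop, which matches the "via proxy" usage where queue.async_q is evaluated in synchronous code before the loop starts.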