How can I run an asynchronous job in the main thread?

Open cosven opened this issue 7 years ago • 6 comments

I have a Thread that works as a producer: it puts things into a Queue every second. Meanwhile, I want a coroutine running in the main thread to work as a consumer: it gets things from the Queue.

However, I have a problem implementing this.

My code looks like the following:

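import threading
from queue import Queue

queue = Queue()
finished = False    # shared flag, set by the producer

def func():
    '''can only work in main thread'''
    pass

def producer():
    global finished
    while True:
        if something1_happened():        # placeholder condition
            if something2_happened():    # placeholder condition
                finished = True
            queue.put(func)

def consumer():
    while True:
        if finished:
            break
        func = queue.get()    # blocks until the producer puts something
        func()

threading.Thread(target=producer).start()
vim.async_call(consumer)    # this will block nvim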

My problem is that vim.async_call(consumer) blocks. How can I avoid the blocking? Can you give me some advice?

cosven • Oct 07 '16 18:10

Non-blocking work must be done in a separate (python, in this case) thread. See https://github.com/neovim/python-client/issues/212#issuecomment-226618972

async_call() is asynchronous in the sense that it can interact with the nvim process without waiting on the nvim process main-loop. So it won't be blocked by nvim, but this doesn't change the nature of your python program.
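
To illustrate the point, here is a minimal sketch using only the standard library. `MiniLoop` and its `async_call` method are hypothetical stand-ins for the plugin host's event loop, not pynvim's actual implementation. The sketch only models the idea that scheduled callbacks run one at a time on a single thread, so a callback that blocks delays everything queued behind it.

```python
import queue
import time


class MiniLoop:
    """Hypothetical single-threaded loop: runs scheduled callbacks in order."""

    def __init__(self):
        self._callbacks = queue.Queue()

    def async_call(self, fn, *args):
        # Safe to call from any thread: it only enqueues the callback.
        self._callbacks.put((fn, args))

    def stop(self):
        # Sentinel that makes run() return.
        self._callbacks.put(None)

    def run(self):
        # Callbacks execute on whichever thread calls run().
        while True:
            item = self._callbacks.get()
            if item is None:
                break
            fn, args = item
            fn(*args)


loop = MiniLoop()
log = []


def blocking_consumer():
    time.sleep(0.2)    # stands in for a blocking queue.get()
    log.append("consumer done")


def quick_callback():
    log.append("quick callback ran")


loop.async_call(blocking_consumer)
loop.async_call(quick_callback)    # has to wait for blocking_consumer
loop.async_call(loop.stop)

start = time.monotonic()
loop.run()
elapsed = time.monotonic() - start
print(log)
# ['consumer done', 'quick callback ran']
```

Scheduling the consumer did not block anything; running it did, because the loop's only thread was busy inside it.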

justinmk • Oct 10 '16 11:10

"Non-blocking work must be done in a separate (python, in this case) thread."

I don't think @cosven meant non-blocking in the sense of not blocking during intensive CPU work. Rather, it should be possible to "pause" the consumer() function while it waits for the queue to become non-empty. This is possible using greenlets, even if it is not "officially" supported or documented. I do this in nvim-ipy to handle async events from an external source in an internally synchronized way (i.e. two event handlers must not overlap). I will try to construct a minimal example from it.

bfredl • Oct 10 '16 12:10

Thanks for your reply. I'm looking forward to your example, which will be really helpful. @bfredl

I found a similar example written with asyncio, but I still have a problem doing the same with python-client (greenlets).

import asyncio
import random

# async/await replaces @asyncio.coroutine and yield from,
# which were removed in Python 3.11.

async def produce(q):
    while True:
        await q.put(random.random())
        await asyncio.sleep(0.5 + random.random())

async def consume(q):
    while True:
        value = await q.get()
        print("Consumed", value)

async def main():
    q = asyncio.Queue()
    producer_task = asyncio.create_task(produce(q))  # keep a reference
    await consume(q)

asyncio.run(main())

In my opinion, asyncio.create_task is similar to nvim.async_call, but the example uses asyncio.Queue instead of Queue. I don't know whether there is something like a greenlet.Queue?

cosven • Oct 10 '16 13:10

Here is a stripped-down example:

import vim
from threading import Thread
from time import sleep
import greenlet

sleeping = None

def wake_up(message):
    handler = sleeping
    handler.parent = greenlet.getcurrent()
    handler.switch(message)

def suspend():
    global sleeping
    gr = greenlet.getcurrent()
    sleeping = gr
    return gr.parent.switch()


def producer():
    sleep(3)
    vim.async_call(wake_up, "message")

def consumer():
    buf = vim.current.buffer
    buf.append("before")

    message = suspend()

    buf.append("after "+ message)


Thread(target=producer).start()
vim.async_call(consumer)

Note that it is in principle unnecessary for wake_up to run in a greenlet; as it does, the suspended greenlet is reparented in wake_up so that it is cleaned up properly. In principle the nvim client could expose methods to suspend and resume greenlets, which would be more natural and efficient.

I use this in https://github.com/bfredl/nvim-ipy/blob/master/rplugin/python3/nvim_ipy/init.py to do rpc calls to an external process; see the wait_for and on_shell_msg methods. For just handling a stream of events, though, there is a simpler pattern in the same file:

from collections import deque
class ExclusiveHandler(object):
    """Wrapper for buffering incoming messages from an asynchronous source.

    Wraps an async message handler function and ensures a previous message will
    be completely handled before the next message is processed. Is used to avoid
    iopub messages being printed out-of-order or even interleaved.
    """
    def __init__(self, handler):
        self.msgs = deque()
        self.handler = handler
        self.is_active = False

    def __call__(self, msg):
        self.msgs.append(msg)
        if not self.is_active:
            self.is_active = True
            while self.msgs:
                self.handler(self.msgs.popleft())
            self.is_active = False

def _on_iopub_msg(msg):
    pass  # code interacting with vim

on_iopub_msg = ExclusiveHandler(_on_iopub_msg)

# in a thread, when a message is produced
vim.async_call(on_iopub_msg, msg)
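
To see what the wrapper buys you without running nvim, here is a small standalone sketch. It repeats the class so that it runs on its own. The `try`/`finally` is an addition that is not in the code above (an assumption that the flag should be reset even if the handler raises). The re-entry is simulated by calling the wrapper from inside the handler; the names `handle`, `wrapped` and `events` are made up for the illustration.

```python
from collections import deque


class ExclusiveHandler(object):
    """Same idea as above, plus try/finally (an addition in this sketch)."""

    def __init__(self, handler):
        self.msgs = deque()
        self.handler = handler
        self.is_active = False

    def __call__(self, msg):
        self.msgs.append(msg)
        if self.is_active:
            return  # the outer call will drain the queue
        self.is_active = True
        try:
            while self.msgs:
                self.handler(self.msgs.popleft())
        finally:
            # Reset even if the handler raises, so later messages still run.
            self.is_active = False


events = []


def handle(msg):
    events.append("start " + msg)
    if msg == "a":
        # Simulate a second message arriving while "a" is being handled.
        wrapped("b")
    events.append("end " + msg)


wrapped = ExclusiveHandler(handle)
wrapped("a")
print(events)
# ['start a', 'end a', 'start b', 'end b']
```

Without the wrapper, "b" would be handled in the middle of "a"; with it, "b" is buffered and handled only after "a" has finished.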

There is no async queue class for greenlets that could be used unmodified (as nvim does not use the standard gevent event loop used with greenlets), but in principle one could be written specifically for nvim.

bfredl • Oct 10 '16 18:10

Thanks a lot.

After reading the source code of async_call, I realized that I had misunderstood it. async_call(func, *args, **kwargs) ensures that func is called in the main thread, regardless of which thread async_call itself is called from.

So, finally, after changing the code a bit, it works!

import threading
from queue import Queue

queue = Queue()
finished = False    # shared flag, set by the producer

def func():
    '''can only work in main thread'''
    pass

def producer():
    global finished
    while True:
        if something1_happened():        # placeholder condition
            if something2_happened():    # placeholder condition
                finished = True
            queue.put(func)

def consumer():
    while True:
        if finished:
            break
        func = queue.get()    # blocks only this worker thread
        vim.async_call(func)    # `func` will run in main thread !!!

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Can async_call(func, ...) really make func asynchronous? For now, I think not. It just schedules func to be called by the event loop soon. In my opinion, the name async_call is ambiguous; it could be renamed to call_soon or something similar.

Feel free to correct me if I have misunderstood async_call. Thanks again ^_^
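
For comparison, the standard library has the same "schedule from any thread, run on the loop's thread" pattern in asyncio's `loop.call_soon_threadsafe`. The sketch below is only an analogy and says nothing about how async_call is implemented; `ran_on` and `loop_thread` are names made up for the illustration.

```python
import asyncio
import threading

loop_thread = threading.get_ident()    # the thread that will run the loop
ran_on = []


def func(n):
    # Record whether the callback executes on the loop's thread.
    ran_on.append((n, threading.get_ident() == loop_thread))


def producer(loop):
    # Runs in a worker thread; it only *schedules* work for the loop.
    for n in range(3):
        loop.call_soon_threadsafe(func, n)
    loop.call_soon_threadsafe(loop.stop)


loop = asyncio.new_event_loop()
worker = threading.Thread(target=producer, args=(loop,))
worker.start()
loop.run_forever()    # this thread runs the scheduled callbacks
worker.join()
loop.close()
print(ran_on)
# [(0, True), (1, True), (2, True)]
```

The worker thread never runs `func` itself; every call happens on the thread that runs the loop.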

cosven • Oct 12 '16 17:10

"Can async_call(func, ...) really make func asynchronous? It just schedules func to be called by the event loop soon."

That is asynchrony. Maybe you are confusing asynchrony with parallelism?
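
A small asyncio sketch of the distinction, using only the standard library (the names `worker`, `trace` and `threads` are made up for the illustration): two coroutines interleave their steps, which is asynchrony, yet everything runs on a single thread, so there is no parallelism.

```python
import asyncio
import threading

trace = []
threads = set()


async def worker(name):
    for step in range(2):
        trace.append(f"{name}{step}")
        threads.add(threading.get_ident())
        await asyncio.sleep(0)    # yield to the event loop


async def main():
    await asyncio.gather(worker("a"), worker("b"))


asyncio.run(main())
print(trace, len(threads))
# ['a0', 'b0', 'a1', 'b1'] 1
```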

justinmk • Oct 12 '16 18:10