aioredis-py

pubsub: subscription doesn't get registered when using asyncio.create_task() instead of await

Open alecov opened this issue 3 years ago • 15 comments

Hi,

I'm not sure if this is really an issue or user error, so apologies upfront for any undue trouble.

In this code:

#!/usr/bin/env python3

import asyncio
import aioredis

async def main():
    redis = aioredis.from_url("redis://localhost")
    async with redis.pubsub() as pubsub:
        asyncio.create_task(pubsub.subscribe("")) # vs. await pubsub.subscribe("")
        await pubsub.subscribe("xxx")
        while True:
            message = await pubsub.get_message(timeout=100)
            print(message)

asyncio.run(main())

If, instead of awaiting the pubsub.subscribe("") call, we wrap it in asyncio.create_task(), the subscription sometimes does not get registered in the pubsub object (as shown by the missing subscription confirmation message). The subscription does happen in Redis (PUBLISH reports one subscriber), but the client does not receive any messages from the channel.

Since this happens randomly I presume there's some missing synchronization point either in the above code, or inside aioredis itself.

The actual use case I'm trying to implement is akin to:

# register a subscription _before_ starting the evloop:
asyncio.get_event_loop().create_task(pubsub.subscribe(some_channel))
...
# start the event loop
asyncio.get_event_loop().run_until_complete(something)

As I said, I'm not sure if this use case is supported, so please point me in the right direction.

aioredis version 2.0.0b1.

alecov avatar Jul 14 '21 17:07 alecov

Hey @alecov -

The reason it sometimes results in a subscription and sometimes not is that you aren't guaranteeing an order of operations in the event loop.

By creating a task, you schedule the command to run soon, but you do not guarantee that your client has received the response from the server. It's possible the response gets received and parsed by the client as the event loop flips through tasks, but it's not guaranteed.

If you need to ensure the client has received the response from the server, then you need to await the result before executing code which depends upon the result, as shown by the call to await pubsub.subscribe("xxx").

As for your second example, this won't work the way you want it to because the event loop isn't running. Even though you create the task, it isn't executed until you run the loop, which puts you back in the state described above: the task is scheduled to execute immediately (but nothing awaits its result), so when you call run_until_complete(something), the task sends the command to the server, but the response from the server isn't necessarily received before something is run.
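To illustrate the point about the loop not running yet, here is a minimal sketch using only the standard library. The subscribe and something coroutines are hypothetical stand-ins (no Redis involved). The task created before the loop starts does nothing until the loop runs, and running the loop until that task completes is one way to make sure it has finished before the dependent code starts.

```python
import asyncio

events = []

async def subscribe(channel):
    # Hypothetical stand-in for pubsub.subscribe(); no network I/O here.
    events.append(f"subscribed {channel}")

async def something():
    # Hypothetical stand-in for the main workload.
    events.append("something ran")

loop = asyncio.new_event_loop()
task = loop.create_task(subscribe("news"))  # scheduled, but the loop is not running
events.append("task created")               # the task body has not executed yet

loop.run_until_complete(task)         # run the loop until the subscription finishes
loop.run_until_complete(something())  # only then run the code that depends on it
loop.close()

print(events)
```

The "task created" entry lands in the list before "subscribed news", which shows that creating the task did not execute any of its body.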

Tasks are best for two scenarios:

  1. Executing multiple IO-bound operations concurrently which don't depend upon one another.
  2. Executing operations which are "fire and forget", e.g., the executor doesn't necessarily care about the result.
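As a rough illustration of the first scenario, the sketch below runs two independent operations concurrently with asyncio.gather() and still waits for both results before continuing. The fetch coroutine is a made-up placeholder for any IO-bound call.

```python
import asyncio

async def fetch(name, delay):
    # Placeholder for an IO-bound operation; the sleep stands in for the wait.
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # gather() wraps each coroutine in a task, runs them concurrently,
    # and returns the results in the order the awaitables were passed in.
    return await asyncio.gather(fetch("a", 0.02), fetch("b", 0.01))

results = asyncio.run(main())
print(results)
```

Even though "b" finishes first, the results come back in argument order, and main() does not continue until both are complete.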

seandstewart avatar Jul 14 '21 22:07 seandstewart

Hi @seandstewart, thanks for your detailed explanation.

I'm aware of the behavior regarding order of operations in asyncio and use cases for tasks, and I think my use case still applies: it's a "fire and forget" type of situation.

Consider:

asyncio.create_task(pubsub.subscribe("x"))
asyncio.create_task(pubsub.subscribe("y"))
asyncio.create_task(pubsub.subscribe("z"))
asyncio.create_task(pubsub.subscribe("w"))
await pubsub.subscribe("")
while True:
    message = await pubsub.get_message(timeout=100)
    print(message)

Note: the single await pubsub.subscribe() is there only to guarantee that get_message() doesn't bail out here: https://github.com/aio-libs/aioredis-py/blob/ef76b5ecbe8483a93a8f1f63fb15627328d72725/aioredis/client.py#L3986-L3993

I would not expect the order of execution of the subscribe requests to matter (and I don't care about their results either), but get_message() sometimes does not return any published messages for some of the x..w channels, even after all of the tasks complete successfully (as evidenced by, e.g., using add_done_callback() on the tasks).

This boils down to this question: is it by design that all subscription requests must complete before get_message() begins?

alecov avatar Jul 14 '21 23:07 alecov

I'm taking a look at this now. There's something I don't quite understand about how create_task() works. It's not very well documented. I'll continue my investigation!

abrookins avatar Jul 23 '21 01:07 abrookins

This is interesting. I'd like to get this nailed down so we can advise people on how to use create_task() effectively with aioredis-py.

I created a silly example that doesn't use redis, and it seems to have the same problem. Check it out.

#!/usr/bin/env python3

import asyncio
import aioredis

async def main():
    redis = aioredis.from_url("redis://localhost")
    async with redis.pubsub() as pubsub:
        asyncio.create_task(pubsub.subscribe("xxx")) # vs. await pubsub.subscribe("")
        await pubsub.subscribe("yyy")
        while True:
            message = await pubsub.get_message(timeout=100)
            print(message)

            
class Greeter:
    def __init__(self) -> None:
        self.greetings = []

    async def greet(self, name):
        print(f"Greeting {name}")
        self.greetings.append(f"hello {name}")

    async def get_greetings(self):
        return self.greetings


async def main2():
    greeter = Greeter()
    asyncio.create_task(greeter.greet("bob"))
    asyncio.create_task(greeter.greet("suzie"))
    asyncio.create_task(greeter.greet("billy"))

    greetings = await greeter.get_greetings()
    print(greetings)
    num_greetings = len(greetings)

    while True:
        # Why does it work with sleep?
        await asyncio.sleep(1)
        greetings = await greeter.get_greetings()
        if len(greetings) > num_greetings:
            print(greeter.greetings)
            num_greetings = len(greetings)


asyncio.run(main2())

If you take out the await asyncio.sleep(1) call, this code prints an empty list and waits forever:

env ❯ python test_pubsub.py
[]

Add that sleep and...

env ❯ python test_pubsub.py
[]
Greeting bob
Greeting suzie
Greeting billy
['hello bob', 'hello suzie', 'hello billy']

The docs on exactly how create_task works are a little sparse. If you don't await these tasks, the scheduler seems to just try them at an appropriate time. But what is an appropriate time, exactly? Or why does calling await asyncio.sleep(1) let the scheduler run our task, but not await greeter.get_greetings()? 🤔

abrookins avatar Jul 23 '21 06:07 abrookins

@abrookins @alecov I found this full thread to be very helpful learning why create_task does this: https://stackoverflow.com/a/62529343

TL;DR in your event loop, imo adding await asyncio.sleep(0.01) or await asyncio.create_task(coro) will make it work.

In general, on the 5th line greetings = await greeter.get_greetings(), it receives the empty list because this is immediately executed and will always come before whatever is in the tasks. If you take out the entire while loop, you'll see:

[]
Greeting bob
Greeting suzie
Greeting billy

OK, but if we put the while loop back in and leave the asyncio.sleep commented out (I added a break in the if block), you'll be stuck in the loop. I think that without the asyncio.sleep, and since asyncio runs in a single thread, you're simply blocking execution of the other tasks in the event loop. The well-known thing about async frameworks is that when something is in suspension (not sure if that's the right word), the loop will find something else to do; in this case, there was no worthy suspension to find something else to do.

If instead you did:

    while True:
        # Why does it work with sleep?
        greetings = await asyncio.create_task(greeter.get_greetings())
        if len(greetings) > num_greetings:
            print(greeter.greetings)
            num_greetings = len(greetings)

You'd see it works because of the additional task. If you didn't await that new task (and switched len(greetings) for len(greeter.greetings)), you'd again be stuck with a backlog of pending tasks.

Quoting from that answer:

(In the same analogy, awaiting the task with await is the equivalent of joining a thread.)

Notes

Code in question:

import asyncio

async def counter_loop(x, n):
    for i in range(1, n + 1):
        print(f"Counter {x}: {i}")
        await asyncio.sleep(0.5)
    return f"Finished {x} in {n}"

async def main():
    slow_task = asyncio.create_task(counter_loop("Slow", 4))
    fast_coro = counter_loop("Fast", 2)

    print("Awaiting Fast")
    fast_val = await fast_coro
    print("Finished Fast")

    print("Awaiting Slow")
    slow_val = await slow_task
    print("Finished Slow")

    print(f"{fast_val}, {slow_val}")

asyncio.run(main())

Output:

Awaiting Fast
Counter Fast: 1
Counter Slow: 1
Counter Fast: 2
Counter Slow: 2
Finished Fast
Awaiting Slow
Counter Slow: 3
Counter Slow: 4
Finished Slow
Finished Fast in 2, Finished Slow in 4

Running this code, "Counter Fast: 1" always appears to be printed first. Is there a deterministic reason for this? If there's a task already submitted by create_task and then an await coroutine call, is the awaited coroutine guaranteed to run first, and if so, why? Or is this just implementation-dependent determinism?

If you are referring to the code exactly as in the question, there is. In general asyncio doesn't guarantee the order in which runnable tasks are executed, but here the code awaits a coroutine, not a task. The code in a directly awaited coroutine is immediately executed (without yielding to the event loop) up to the first suspension. Since there is no suspension between await fast_coro and the first print, it will always run before anything that comes from another task.

Andrew-Chen-Wang avatar Jul 23 '21 14:07 Andrew-Chen-Wang

If you take out the await asyncio.sleep(1) call, this code prints an empty list and waits forever

When you call greetings = await greeter.get_greetings(), the result is ready immediately, so it just returns that. In other words, you never transfer control of the thread back to the loop, and the asyncio event loop is prevented from scheduling other tasks (check the CPU% in htop to see the Python executable consuming 100% CPU on the busy loop). Returning the result immediately when it's ready is known as dispatch semantics. The opposite of dispatch semantics is post semantics (NodeJS folks know it as run-to-completion semantics).
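A small sketch of that behavior, using only the standard library (the coroutine names are invented for the demonstration): awaiting a coroutine that never suspends does not give the loop a chance to run the pending task, while a single await asyncio.sleep(0) does.

```python
import asyncio

async def no_suspend():
    # No await inside, so awaiting this never yields to the event loop.
    return "ready"

async def main():
    ran = []

    async def background():
        ran.append("background")

    task = asyncio.create_task(background())

    for _ in range(1000):
        await no_suspend()      # completes immediately, loop never gets control
    before = list(ran)          # the background task has not run yet

    await asyncio.sleep(0)      # a real suspension point: yields to the loop once
    after = list(ran)           # now the background task has run

    await task
    return before, after

before, after = asyncio.run(main())
print(before, after)
```

The task only makes progress once main() actually suspends, no matter how many times the non-suspending coroutine is awaited.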

It's never safe to use dispatch semantics by default. Python goes to great lengths to perform post semantics by default. For instance, when you create a coroutine, no code is executed. If no code is executed then no event is registered to wakeup that coroutine later. You need to execute the coroutine until its first suspension point so an event is registered. For instance, for the following coroutine:

async def foobar():
    await asyncio.sleep(1)
    print("Hello World")

You need to execute foobar until asyncio.sleep() is called. If asyncio.sleep() is not called, then no timer is registered to wake up this coroutine later. That should suffice to illustrate why it's important to execute the coroutine until its first suspension point.

Why, then, doesn't Python always execute the coroutine until its first suspension point as soon as it's created? Post semantics. Running the coroutine immediately would break post semantics. Running the coroutine immediately would be dispatch semantics, and that's dangerous. To understand the issues behind dispatch semantics by default in detail (e.g. unfairness, starvation, stack overflow, ...) you may read Executors and Asynchronous Operations from Christopher Kohlhoff. That's why every async operation in Chris's library (not related to Python) has a comment such as “Regardless of whether the asynchronous operation completes immediately or not, the handler will not be invoked from within this function. Invocation of the handler will be performed in a manner equivalent to using boost::asio::io_context::post().”

And then we have asyncio.create_task(). All it does is schedule the coroutine for execution soon (post semantics) through loop.call_soon(). You may check this yourself in Python's source code: https://github.com/python/cpython/blob/bb3e0c240bc60fe08d332ff5955d54197f79751c/Lib/asyncio/tasks.py#L112.
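One way to observe this, sketched under the assumption of CPython's default event loop and default (non-eager) task factory: a task created between two loop.call_soon() callbacks takes its place in the same FIFO queue, and none of them run until the current coroutine suspends.

```python
import asyncio

async def main():
    order = []
    loop = asyncio.get_running_loop()

    async def job():
        order.append("task")

    loop.call_soon(order.append, "callback 1")
    task = asyncio.create_task(job())   # queued behind "callback 1"
    loop.call_soon(order.append, "callback 2")

    order.append("main continues")      # nothing queued has run yet
    await asyncio.sleep(0)              # yield once so the queue is drained
    await task
    return order

order = asyncio.run(main())
print(order)
```

The "main continues" entry comes first because create_task() only posts the work; it does not run any of it.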

You call asyncio.create_task() when you need to increase the level of concurrency (when you have serial work where the current operation blocks progress of the next operation, you can already use await).

EDIT

Ninja'd. It seems you found the answer yourself while I was typing.

vinipsmaker avatar Jul 23 '21 14:07 vinipsmaker

lol @vinipsmaker yours is definitely more right than my guesses :D Very cool concepts!

Andrew-Chen-Wang avatar Jul 23 '21 14:07 Andrew-Chen-Wang

@Andrew-Chen-Wang @vinipsmaker Yeah, the following three are all conceptually similar (never mind the details):

retval = await coro()

task = asyncio.create_task(coro())
retval = await task

thread = create_thread(func)
retval = thread.join()

So, details aside, create_task() posts a task to the currently running event loop, with the intention being "run this thing concurrently (pseudo-parallel) for me".
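For reference, a runnable version of that comparison might look like the sketch below. The coro and func names are placeholders, and since Python's Thread.join() does not return a value, the thread's result is passed back through a list here.

```python
import asyncio
import threading

async def coro():
    await asyncio.sleep(0)
    return 42

def func(results):
    # Thread.join() returns None, so the result is handed back via a list.
    results.append(42)

async def main():
    direct = await coro()                 # run the coroutine inline

    task = asyncio.create_task(coro())    # post it to the event loop...
    via_task = await task                 # ...and "join" it

    return direct, via_task

direct, via_task = asyncio.run(main())

thread_results = []
thread = threading.Thread(target=func, args=(thread_results,))
thread.start()
thread.join()                             # wait for the thread to finish

print(direct, via_task, thread_results[0])
```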

Regardless, what happens in aioredis in this situation is that the following very obvious usage pattern doesn't work:

async def read_loop():
    while True:
        message = await pubsub.get_message()
        await dispatch(message)

# create a concurrent async task for reading/dispatching.
# won't work due to:
# 1. this bug; and
# 2. the need for at least one subscription (see point 2 below).
asyncio.create_task(read_loop())

# in some other running coros:
async def coro():
    ...
    await pubsub.subscribe("somechannel")
    ...

From the example on my second comment:

asyncio.create_task(pubsub.subscribe("x"))
asyncio.create_task(pubsub.subscribe("y"))
asyncio.create_task(pubsub.subscribe("z"))
asyncio.create_task(pubsub.subscribe("w"))
await pubsub.subscribe("")
while True:
    message = await pubsub.get_message(timeout=100)
    print(message)

All channels get effectively subscribed at the Redis server (which means the tasks complete at the socket level), but for some reason the internal aioredis machinery loses track of this and get_message() does not return messages for some, all or none of the channels subscribed depending on the way the tasks are scheduled, which makes this a clear bug in my opinion.

NB: While on the matter, a few further thoughts on the get_message() interface:

  1. The timeout argument is unneeded (since coros can just be cancelled) and actually harmful (if you forget it, the code might spin). Synchronization points (that is, awaits) are calls which would block if they were synchronous: they do block the calling coroutine but yield control to the scheduler. There's no need for timeouts in code such as this;
  2. await get_message() should just block and not bail out due to missing subscriptions or anything. There should be no need to couple the necessity of having previous subscriptions before calling get_message();
  3. It's actually a good thing to have get_message() instead of some monolithic call like run() that some async libraries offer, because it allows the user to control exactly how the read loop is implemented.

I believe the first two points are due to how redis-py works, but do mind that aioredis is an async library, and as such it is important to focus on the async part and not implement every aspect of the API it tries to emulate.
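To make the first two points concrete, here is a sketch of the kind of read loop I have in mind, with an asyncio.Queue standing in for the pubsub object (so this is not aioredis code). The reader simply blocks on the next message without any timeout, and the caller stops it by cancelling the task.

```python
import asyncio

async def read_loop(queue, seen):
    while True:
        message = await queue.get()   # suspends until a message arrives
        seen.append(message)

async def main():
    queue = asyncio.Queue()           # stand-in for a message source
    seen = []
    reader = asyncio.create_task(read_loop(queue, seen))

    queue.put_nowait("hello")
    queue.put_nowait("world")
    await asyncio.sleep(0.01)         # let the reader drain the queue

    reader.cancel()                   # no timeout needed: just cancel the task
    try:
        await reader
    except asyncio.CancelledError:
        pass
    return seen, reader.cancelled()

seen, was_cancelled = asyncio.run(main())
print(seen, was_cancelled)
```

While the reader is blocked in queue.get(), it costs nothing: the loop is free to run other tasks until a message arrives or the task is cancelled.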

alecov avatar Jul 23 '21 15:07 alecov

Great comments from everyone involved. I am going to continue driving this issue until we arrive at plain language that explains how to use asyncio.create_task() with aioredis-py, specifically with PubSub because we're talking about it now.

I don't think we're there yet. We'll probably be there when I understand it myself. ;)

So let's delve deeper.

The Greeter Example

Returning to my "Greeter" example, my intuition was the same as Andrew's:

in this case, there was no worthy suspension to find something else to do

And:

TL;DR in your event loop, imo adding await asyncio.sleep(0.01) or await asyncio.create_task(coro) will make it work.

This doesn't make sense to me yet. In both examples in this thread, we use await inside of the while loop. Why isn't await enough to tell asyncio it can switch? That's even what the SO post you linked says:

But create_task does exactly that: submit it to the event loop for execution concurrently with other tasks, the point of switching being any await.

You're right that wrapping the coroutine in a task solves the problem. This works:

        greetings = await asyncio.create_task(greeter.get_greetings())

And this:

    while True:
        await asyncio.sleep(.000001)
        greetings = await greeter.get_greetings()

While this does not:

    while True:
        greetings = await greeter.get_greetings()

But let's return to @alecov's example code with the insight that this use case might only work if users add a call to asyncio.sleep() or wrap the coroutine in the while loop in a task.

The Original Aioredis Example

If I understand correctly, the use case in question is creating multiple Tasks to subscribe to channels and expecting asyncio to run them soon, interleaved with calls to get_message().

@alecov does not seem to expect to have to use asyncio.sleep() or wrap the get_message() coroutine in a Task for this to work. With my current incomplete understanding, this seems like a reasonable expectation.

Let's try the solutions that worked for the Greeter code. What if we use asyncio.sleep() within the while loop?

# This seems to break our assumption about asyncio.sleep()?
async def main6():
    redis = aioredis.from_url("redis://localhost")
    async with redis.pubsub() as pubsub:
        asyncio.create_task(pubsub.subscribe("xxx")) # vs. await pubsub.subscribe("")
        await pubsub.subscribe("yyy")
        while True:
            await asyncio.sleep(1)
            message = await pubsub.get_message(timeout=100)
            print(message)

Curiously, asyncio.sleep() doesn't work here:

env ❯ python test_pubsub.py 6
{'type': 'subscribe', 'pattern': None, 'channel': b'yyy', 'data': 1}

But why not?

What if we sleep right after creating the task?

# Sleeping after create_task() works. Why?
async def main5():
    redis = aioredis.from_url("redis://localhost")
    async with redis.pubsub() as pubsub:
        asyncio.create_task(pubsub.subscribe("xxx")) # vs. await pubsub.subscribe("")
        await asyncio.sleep(1)  # <-- Sleeping right after create_task() works.
        await pubsub.subscribe("yyy")
        while True:
            message = await pubsub.get_message(timeout=100)
            print(message)

That works:

env ❯ python test_pubsub.py 5 
{'type': 'subscribe', 'pattern': None, 'channel': b'xxx', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'yyy', 'data': 2}

However, sleeping elsewhere doesn't work.

        asyncio.create_task(pubsub.subscribe("xxx")) # vs. await pubsub.subscribe("")
        await pubsub.subscribe("yyy")
        await asyncio.sleep(1)  # <-- Sleeping here does NOT work.

I don't know why this is the case.

But surely, users can wrap the coroutine within the while loop in a task, like we could with Greeter:

async def main11():
    redis = aioredis.from_url("redis://localhost")
    async with redis.pubsub() as pubsub:
        asyncio.create_task(pubsub.subscribe("xxx")) # vs. await pubsub.subscribe("")
        await pubsub.subscribe("yyy")
        while True:
            # Surely this should work?
            message = await asyncio.create_task(pubsub.get_message(timeout=100))
            print(message)

But it doesn't work:

env ❯ python test_pubsub.py 11
{'type': 'subscribe', 'pattern': None, 'channel': b'yyy', 'data': 1}

Clearly, I am missing something!

I'll post this now in case someone else sees the thing that I'm missing and can point it out. Meanwhile, I will continue to dwell in contemplation on this matter.

abrookins avatar Jul 23 '21 19:07 abrookins

@bmerry I'm curious if you have any insight into this. I've seen you mention Tasks in other issues. It's very possible I'm simply being dense and need some repeated explanations of the same concept, but currently, my understanding is incomplete!

abrookins avatar Jul 23 '21 20:07 abrookins

@abrookins The Greeter example is rather simple to get if you understand how the scheduler works, so let me attempt to clarify things for you.

After running this code:

async def main():
    t1 = asyncio.create_task(greeter.greet("bob"))
    t2 = asyncio.create_task(greeter.greet("suzie"))
    t3 = asyncio.create_task(greeter.greet("billy"))

asyncio.run(main())

The scheduler working state will look roughly like this:

goal_task = main # the task which the event loop needs concluded
currently_running = main
pending = [t1, t2, t3]

Now, when this code runs:

async def get_greetings(self):
    return self.greetings
await greeter.get_greetings() # call it t4

The scheduler will suspend main (put it inside pending at an arbitrary position, but it just happens to be in the head of the queue) and schedule t4 immediately:

currently_running = t4
pending = [main, t1, t2, t3]

But greeter.get_greetings() has no suspension point (no await anything), so it returns immediately. Then main continues execution as previously:

currently_running = main
pending = [t1, t2, t3]

And after main finishes (it runs greeter.get_greetings() again with the same above effect), the event loop stops because we told asyncio to only run main to completion, not any pending tasks in the queue. main is the goal task -- once it finishes, the event loop stops and asyncio.run() returns (synchronously).

Now, what happens when we insert an await asyncio.sleep() in main?

This happens: asyncio.sleep() is a control-yielding call which, conceptually, goes to the end of the pending task queue (while the sleep interval has not yet passed, asyncio.sleep() cannot complete). [NB: it works a little differently than that because it's an I/O-like call, but let's keep things simple.]

currently_running = None
pending = [t1, t2, t3, asyncio.sleep(), main]

main goes after asyncio.sleep() in this queue precisely because it synchronizes with the latter (main awaits asyncio.sleep(), so it cannot continue while the sleep doesn't complete). So the scheduler becomes idle: the coroutine it was executing has halted and the scheduler has nothing else to do.

What does it do next, then? It picks the next pending task and executes it:

currently_running = t1
pending = [t2, t3, asyncio.sleep(), main]

And so on, for t2 and t3, and it will execute all pending tasks at least until it is preempted arbitrarily by the resolving sleep (which is implemented in a low-level select()-like call and will preempt the scheduler not earlier than the sleep interval). Hence all three tasks complete:

currently_running = None
pending = [asyncio.sleep(), main]

Then the scheduler becomes idle again (since it has nothing else left for it to do) until the sleep completes. Afterwards, main completes, and asyncio.run() returns as explained previously.

There might be one or other detail missing from the above explanation, but conceptually this is what happens. What is important to notice is this: when you create and schedule a task, it only executes if it ever gets the chance to execute. It seems that the particular implementation of the scheduler always suspends callers at the head of the pending task queue (if no I/O is involved); regardless, what matters is that the created tasks were never awaited for, so nothing synchronizes with them, and thus they might never execute.

When scheduled tasks execute non-awaitedly, we can say in a sense that they've been executed rather speculatively by an otherwise idle scheduler which decided to do it just because it could afford it. Of course, the meaning of "being scheduled for execution" is exactly this: execute it when and if you're able to, unless I require completion (await or asyncio.run()), in which case you must execute it.
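The difference can be sketched with a variation on the Greeter example (the function names below are made up for the demonstration). A task that nobody awaits is cancelled when asyncio.run() tears the loop down, so it does not reach its end; the awaited one is required to complete.

```python
import asyncio

async def greet(name, log):
    log.append(f"start {name}")
    await asyncio.sleep(0.01)           # suspension point inside the task
    log.append(f"done {name}")

async def forgotten(log):
    asyncio.create_task(greet("bob", log))   # scheduled, never awaited
    # main returns right away; asyncio.run() cancels whatever is still pending

async def awaited(log):
    await asyncio.create_task(greet("bob", log))  # completion is required

forgotten_log, awaited_log = [], []
asyncio.run(forgotten(forgotten_log))
asyncio.run(awaited(awaited_log))

print(forgotten_log)
print(awaited_log)
```

In the first run, "done bob" never shows up, because nothing synchronized with the task before the goal task finished.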

Compare all of this with threads: if async tasks are threads and awaiting is joining, then if a thread is never joined, it is a detached thread. A detached thread might as well never get the chance to execute if the process finishes quickly enough (the "event loop" in this case is the very OS scheduler).

Please let me know if this explanation was of any utility for you.

alecov avatar Jul 23 '21 21:07 alecov

Delving a bit further, if an asyncio lock is used to protect the pubsub object:

import asyncio
import aioredis
from contextvars import ContextVar

redis = aioredis.from_url("redis://localhost")
pubsub = redis.pubsub()
lock = ContextVar("lock")

async def coro(x):
    async with lock.get():
        await pubsub.subscribe(x)

async def main():
    lock.set(asyncio.Lock())
    async with pubsub:
        asyncio.create_task(coro("x"))
        asyncio.create_task(coro("y"))
        asyncio.create_task(coro("z"))
        asyncio.create_task(coro("w"))
        async with lock.get():
            await pubsub.subscribe("xxx")
        while True:
            message = await pubsub.get_message(timeout=100)
            print(message)

asyncio.run(main())

the problem disappears. So PubSub objects are not async-concurrent-safe for subscribe calls (which is perhaps okay), but it still requires a previous subscription before get_message() becomes useful.

alecov avatar Jul 23 '21 21:07 alecov

Very interesting, and thanks for the in-depth exploration of how the scheduler works, @alecov.

PubSub already has an async lock. If we locked subscribe methods, PubSub would work for people using Tasks like this.

Perhaps we could go further and continue using thread-like primitives to solve the other problem too. We could set an asyncio.Event after subscribe completes its work. When someone calls get_message() or listen(), we can asyncio.wait_for() the event to be set. I'll noodle on a PR for this to get more feedback.
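Roughly, the idea could look like the sketch below. SketchPubSub is a hypothetical stand-in written for this comment, not the aioredis PubSub class and not the code in any PR; an asyncio.Queue and a sleep(0) stand in for the connection and the network round trip.

```python
import asyncio

class SketchPubSub:
    """Hypothetical stand-in for illustration; not the aioredis PubSub class."""

    def __init__(self):
        self._lock = asyncio.Lock()          # serializes subscribe calls
        self._subscribed = asyncio.Event()   # set once any subscription exists
        self._messages = asyncio.Queue()     # stands in for the connection

    async def subscribe(self, channel):
        async with self._lock:
            await asyncio.sleep(0)           # stands in for the network round trip
            self._messages.put_nowait({"type": "subscribe", "channel": channel})
            self._subscribed.set()

    async def get_message(self):
        await self._subscribed.wait()        # block instead of bailing out
        return await self._messages.get()

async def main():
    pubsub = SketchPubSub()
    tasks = [asyncio.create_task(pubsub.subscribe(name)) for name in ("x", "y")]
    first = await pubsub.get_message()       # safe to call before any subscribe ran
    second = await pubsub.get_message()
    await asyncio.gather(*tasks)
    return [first["channel"], second["channel"]]

channels = asyncio.run(main())
print(channels)
```

With the lock, concurrent subscribe tasks cannot interleave, and with the event, get_message() waits for the first subscription rather than returning early.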

abrookins avatar Jul 24 '21 00:07 abrookins

@alecov Not sure how I feel about this yet but here's some noodling on allowing PubSub to be used from Tasks more effectively: https://github.com/aio-libs/aioredis-py/pull/1067/files

abrookins avatar Jul 24 '21 06:07 abrookins

@abrookins Just a quick followup, I'll be really busy during the next two weeks, so I'll only be able to take a deeper look at your PR after that. I do appreciate you taking your time to discuss and solve this issue!

alecov avatar Jul 26 '21 18:07 alecov