aio-pika icon indicating copy to clipboard operation
aio-pika copied to clipboard

How to use it correctly with py-cord? (Task was destroyed but it is pending)

Open zentixua opened this issue 1 year ago • 17 comments

Hello. Following the tutorial from the wiki (https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/1-introduction.html#receiving) I don't have any problems, but if I try to run main() via await in the on_ready() event, then the code will start giving errors like this:

Traceback (most recent call last):
    async with connection:
RuntimeError: coroutine ignored GeneratorExit
Ignoring exception in on_ready
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/discord/client.py", line 378, in _run_event
    await coro(*args, **kwargs)
  File "/home/host/servers/bots/adventblocks/bot.py", line 51, in on_ready
    await main()
RuntimeError: coroutine ignored GeneratorExit
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_ready' coro=<Client._run_event() done, defined at /usr/local/lib/python3.10/dist-packages/discord/client.py:370> wait_for=<Future pending cb=[Task.task_wakeup()]>>
 [x] Received message IncomingMessage:{'app_id': None.......

zentixua avatar Apr 11 '23 10:04 zentixua

Please add your code example.

mosquito avatar Apr 11 '23 10:04 mosquito


@bot.event
async def on_ready():
    await main()

async def send_message(message: AbstractIncomingMessage) -> None:
    print(" [x] Received message %r" % message)
    print("Message body is: %r" % message.body)

    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("request")

        # Start listening the queue with name 'hello'
        await queue.consume(send_message, no_ack=True)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()

zentixua avatar Apr 11 '23 10:04 zentixua

I used a slightly different way before, but it's unstable when the rabbitmq server is unavailable - if more than two times, then I get this error:

Connection attempt to "amqp://guest:******@localhost:5672//" failed: [Errno 111] Connect call failed ('127.0.0.1', 5672). Reconnecting after 5 seconds.
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_ready' coro=<Client._run_event() running at /usr/local/lib/python3.10/dist-packages/discord/client.py:378> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Connection attempt to "amqp://guest:******@localhost:5672//" failed: [Errno 111] Connect call failed ('127.0.0.1', 5672). Reconnecting after 5 seconds.
@bot.event
async def on_ready():
    await connect_rabbitmq()
    

async def send(message):
    ...


async def connect_rabbitmq():
    connection = await aio_pika.connect_robust(host="localhost")
    
    queue_name = "request"

    # Creating channel
    channel = await connection.channel()

    # Declaring queue
    queue = await channel.declare_queue(queue_name)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                await send(message.body)

zentixua avatar Apr 11 '23 11:04 zentixua

The first of all your main() should be a separate task IMHO.

Just add:

@bot.event
async def on_ready():
    task = asyncio.create_task(main())
    # if the your discord library supports long events just add awaiting of this task
    # await task

mosquito avatar Apr 11 '23 11:04 mosquito

But to be honest, I would advise you to look at aiomisc and run it as 2 separate aiomisc.Service

mosquito avatar Apr 11 '23 11:04 mosquito

Using task = asyncio.create_task(connect_rabbitmq())

Now I get this after first getting a message from rabbitmq:

Exception ignored in: <coroutine object connect_rabbitmq at 0x7faa2b35eb20>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/discord/http.py", line 111, in __init__
    {
RuntimeError: coroutine ignored GeneratorExit
Task was destroyed but it is pending!
task: <Task pending name='Task-16' coro=<connect_rabbitmq() running at **/bot.py:225> wait_for=<Future pending cb=[Task.task_wakeup()]>>
async def send_message(message: AbstractIncomingMessage):
    nickname = json.loads(message.body.decode())["nickname"]
    data = json.loads(message.body.decode())["data"]
    
    embed = discord.Embed(...)
    await channel.send(...)

zentixua avatar Apr 11 '23 11:04 zentixua

Okay, I'll use my old method for now, but via task = asyncio.create_task(connect_rabbitmq())

What about reconnects? After 15-20 reconnects I get this:

Task was destroyed but it is pending!
task: <Task pending name='Task-16' coro=<connect_rabbitmq() running at /home/host/servers/bots/adventblocks/bot.py:236> wait_for=<Future pending cb=[Task.task_wakeup() ]>>

zentixua avatar Apr 11 '23 11:04 zentixua

I really don't understand the architecture of your service, but it looks like the event loop has been stopped.

You may use this example instead of aio_pika and, I guess, the result will be the same.

@bot.event
async def on_ready():
    try:
        await asyncio.sleep(86400)
    except:
        print("oooops")
    else:
        print("The mosquito was wrong")

mosquito avatar Apr 11 '23 12:04 mosquito

Miracles. Sometimes (after restarting the bot) this problem does not occur. Magic again

zentixua avatar Apr 11 '23 12:04 zentixua

Ever since I got an "D" in telepathy in high school, I don't believe in magic.

mosquito avatar Apr 11 '23 12:04 mosquito

this problem only occurs when reconnecting to some of the attempts SOMETIMES. Someone has ideas?

zentixua avatar Apr 11 '23 13:04 zentixua

The try: block prevents this problem from occurring.. Lol what

@bot.event
async def on_ready():
    await bot.change_presence(activity=discord.Game(name=".help"))
    bot.add_view(Requests())
    task = asyncio.create_task(connect_rabbitmq())
    
    print("One")
    
    try:
        await asyncio.sleep(1000)
    except:
        print("oooops")
        traceback.format_exc()
    else:
        print("The mosquito was wrong")
        
    print("Two")

zentixua avatar Apr 11 '23 13:04 zentixua

on my local PC everything seems to be fine. But it's not certain yet.

zentixua avatar Apr 11 '23 13:04 zentixua

no, the same thing on the test ubuntu 22.04 hyper-v instance

zentixua avatar Apr 11 '23 13:04 zentixua

Same thing with this one! On test host!

import asyncio
import json
import traceback

import aio_pika


async def send(message):
    nickname = json.loads(message.decode())["nickname"]
    data = json.loads(message.decode())["data"]
    print(f"Заявка на ник: {nickname}")


async def connect_rabbitmq():
    # logging.basicConfig(level=logging.DEBUG)
    connection = await aio_pika.connect_robust(host="localhost")
    print("Успешно подключено к RabbitMQ!")

    queue_name = "request"

    # Creating channel
    channel = await connection.channel()

    # Will take no more than 10 messages in advance

    # Declaring queue
    queue = await channel.declare_queue(queue_name)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                await send(message.body)


async def connect():
    asyncio.create_task(connect_rabbitmq())

    print("One")

    try:
        await asyncio.sleep(5000)
    except:
        print("oooops")
        traceback.format_exc()
    else:
        print("The mosquito was wrong")

    print("Two")


asyncio.run(connect())
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<connect_rabbitmq() running at /home/main/test2.py:30> wait_for=<Future pending cb=[Task.task_wakeup()]>>

Even with a try: block

zentixua avatar Apr 11 '23 14:04 zentixua

But this time it took longer to get the problem

zentixua avatar Apr 11 '23 14:04 zentixua

This is 100% happening with the code above, check it yourself. Here is the log: custom.log

zentixua avatar Apr 12 '23 09:04 zentixua