aio-pika
aio-pika copied to clipboard
How to use it correctly with py-cord? (Task was destroyed but it is pending)
Hello. Following the tutorial from the wiki (https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/1-introduction.html#receiving) I don't have any problems, but if I try to run main()
via await in the on_ready()
event, then the code will start giving errors like this:
Traceback (most recent call last):
async with connection:
RuntimeError: coroutine ignored GeneratorExit
Ignoring exception in on_ready
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/discord/client.py", line 378, in _run_event
await coro(*args, **kwargs)
File "/home/host/servers/bots/adventblocks/bot.py", line 51, in on_ready
await main()
RuntimeError: coroutine ignored GeneratorExit
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_ready' coro=<Client._run_event() done, defined at /usr/local/lib/python3.10/dist-packages/discord/client.py:370> wait_for=<Future pending cb=[Task.task_wakeup()]>>
[x] Received message IncomingMessage:{'app_id': None.......
Please add your code example.
@bot.event
async def on_ready():
await main()
async def send_message(message: AbstractIncomingMessage) -> None:
print(" [x] Received message %r" % message)
print("Message body is: %r" % message.body)
print("Before sleep!")
await asyncio.sleep(5) # Represents async I/O operations
print("After sleep!")
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue("request")
# Start listening the queue with name 'hello'
await queue.consume(send_message, no_ack=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
I used a slightly different way before, but it's unstable when the rabbitmq server is unavailable - if more than two times, then I get this error:
Connection attempt to "amqp://guest:******@localhost:5672//" failed: [Errno 111] Connect call failed ('127.0.0.1', 5672). Reconnecting after 5 seconds.
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_ready' coro=<Client._run_event() running at /usr/local/lib/python3.10/dist-packages/discord/client.py:378> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Connection attempt to "amqp://guest:******@localhost:5672//" failed: [Errno 111] Connect call failed ('127.0.0.1', 5672). Reconnecting after 5 seconds.
@bot.event
async def on_ready():
await connect_rabbitmq()
async def send(message):
...
async def connect_rabbitmq():
connection = await aio_pika.connect_robust(host="localhost")
queue_name = "request"
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(queue_name)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
await send(message.body)
The first of all your main()
should be a separate task IMHO.
Just add:
@bot.event
async def on_ready():
task = asyncio.create_task(main())
# if the your discord library supports long events just add awaiting of this task
# await task
But to be honest, I would advise you to look at aiomisc and run it as 2 separate aiomisc.Service
Using task = asyncio.create_task(connect_rabbitmq())
Now I get this after first getting a message from rabbitmq:
Exception ignored in: <coroutine object connect_rabbitmq at 0x7faa2b35eb20>
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/discord/http.py", line 111, in __init__
{
RuntimeError: coroutine ignored GeneratorExit
Task was destroyed but it is pending!
task: <Task pending name='Task-16' coro=<connect_rabbitmq() running at **/bot.py:225> wait_for=<Future pending cb=[Task.task_wakeup()]>>
async def send_message(message: AbstractIncomingMessage):
nickname = json.loads(message.body.decode())["nickname"]
data = json.loads(message.body.decode())["data"]
embed = discord.Embed(...)
await channel.send(...)
Okay, I'll use my old method for now, but via task = asyncio.create_task(connect_rabbitmq())
What about reconnects? After 15-20 reconnects I get this:
Task was destroyed but it is pending!
task: <Task pending name='Task-16' coro=<connect_rabbitmq() running at /home/host/servers/bots/adventblocks/bot.py:236> wait_for=<Future pending cb=[Task.task_wakeup() ]>>
I really don't understand the architecture of your service, but it looks like the event loop has been stopped.
You may use this example instead of aio_pika
and, I guess, the result will be the same.
@bot.event
async def on_ready():
try:
await asyncio.sleep(86400)
except:
print("oooops")
else:
print("The mosquito was wrong")
Miracles. Sometimes (after restarting the bot) this problem does not occur. Magic again
Ever since I got an "D" in telepathy in high school, I don't believe in magic.
this problem only occurs when reconnecting to some of the attempts SOMETIMES. Someone has ideas?
The try:
block prevents this problem from occurring.. Lol what
@bot.event
async def on_ready():
await bot.change_presence(activity=discord.Game(name=".help"))
bot.add_view(Requests())
task = asyncio.create_task(connect_rabbitmq())
print("One")
try:
await asyncio.sleep(1000)
except:
print("oooops")
traceback.format_exc()
else:
print("The mosquito was wrong")
print("Two")
on my local PC everything seems to be fine. But it's not certain yet.
no, the same thing on the test ubuntu 22.04 hyper-v instance
Same thing with this one! On test host!
import asyncio
import json
import traceback
import aio_pika
async def send(message):
nickname = json.loads(message.decode())["nickname"]
data = json.loads(message.decode())["data"]
print(f"Заявка на ник: {nickname}")
async def connect_rabbitmq():
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(host="localhost")
print("Успешно подключено к RabbitMQ!")
queue_name = "request"
# Creating channel
channel = await connection.channel()
# Will take no more than 10 messages in advance
# Declaring queue
queue = await channel.declare_queue(queue_name)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
await send(message.body)
async def connect():
asyncio.create_task(connect_rabbitmq())
print("One")
try:
await asyncio.sleep(5000)
except:
print("oooops")
traceback.format_exc()
else:
print("The mosquito was wrong")
print("Two")
asyncio.run(connect())
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<connect_rabbitmq() running at /home/main/test2.py:30> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Even with a try:
block
But this time it took longer to get the problem
This is 100% happening with the code above, check it yourself. Here is the log: custom.log