taskiq-nats icon indicating copy to clipboard operation
taskiq-nats copied to clipboard

Struggling to make it work

Open XChikuX opened this issue 3 months ago • 3 comments

# broker.py
import asyncio

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
from taskiq_nats import NatsBroker

result_backend = RedisAsyncResultBackend(
    redis_url="redis://default:P@[email protected]:6379",
)

# Or you can use PubSubBroker if you need broadcasting
# Or ListQueueBroker if you don't want acknowledges
broker = RedisStreamBroker(
    url="redis://default:P@[email protected]:6379",
).with_result_backend(result_backend)


broker_nats = NatsBroker(
    servers=["nats://nats:P@[email protected]:4222"], queue="my_queue"
).with_result_backend(result_backend)


@broker.task
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(0.5)
    print("All problems are solved!")


@broker_nats.task
async def best_task_ever_nats() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(0.5)
    print("All problems are solved with NATS!")


async def main():
    await broker.startup()
    await broker_nats.startup()
    task = await best_task_ever.kiq()
    print(await task.wait_result())
    task_nats = await best_task_ever_nats.kiq()
    print(await task_nats.wait_result())


if __name__ == "__main__":
    asyncio.run(main())

I've tried everything I understood before coming here.

  1. I started the broker with taskiq broker:broker
  2. I ran python3 broker.py

OUTPUT:

Worker (taskiq broker:broker)

08:00:23 ((.env) ) root@MSI nats ±|main ✗|→ taskiq worker broker:broker
[2025-09-17 20:00:24,913][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 97868
[2025-09-17 20:00:24,913][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-09-17 20:00:24,915][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 97869 
[2025-09-17 20:00:24,917][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 97871 
[2025-09-17 20:00:25,135][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2025-09-17 20:00:25,136][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2025-09-17 20:00:31,753][taskiq.receiver.receiver][INFO   ][worker-1] Executing task broker:best_task_ever with ID: bf5154a9a08d4434a49e14c562d8fffc
All problems are solved!

Broker (python3 broker.py)

08:00:30 ((.env) ) root@MSI nats ±|main ✗|→ python3 broker.py 
is_err=False log=None return_value=None execution_time=0.53 labels={} error=None

It's just indefinitely stuck after this.

Can you please help @s3rius

What am I getting wrong??

taskiq = "^0.11.17"
taskiq-fastapi = "^0.3.5" 
taskiq-nats = "^0.5.1"
taskiq-redis = "^1.0.9"

XChikuX avatar Sep 17 '25 20:09 XChikuX

Yes, because you only start brokers for broker:broker, which happens to be RedisStreamBroker.

In order to fix it, you can start another process that will use nats broker by starting taskiq worker broker:broker_nats.

Hope it helps.

s3rius avatar Sep 18 '25 10:09 s3rius

Thank you @s3rius.

I added some return values just for testing purposes.

10:28:37 ((.env) ) root@MSI nats ±|main ✗|→ python3 broker.py 
is_err=False log=None return_value='FTW!' execution_time=0.5 labels={} error=None
is_err=False log=None return_value='NATS FTW!' execution_time=0.51 labels={} error=None

I notice that the redis_result_backend doesn't receive anything from the nats_broker. Only the broker task is visible

Image

XChikuX avatar Sep 18 '25 10:09 XChikuX

It should not receive any tasks, because tasks you were sending asigned to different brokers. And they are sent in respected brokers only.

If you want to send it to another broker, you can try using: task.kicker().with_broker(other_broker).

s3rius avatar Sep 18 '25 10:09 s3rius