taskiq-nats icon indicating copy to clipboard operation
taskiq-nats copied to clipboard

Error on startup PushBasedJetStreamBroker - "consumer name already in use"

Open Ruslan-Droid opened this issue 1 month ago • 0 comments

I'm trying to run the PushBasedJetStreamBroker as shown in the example. But when I run the main python file, I get an error - "consumer name already in use"

My code:

# broker_example.py
import asyncio
from taskiq_nats import PushBasedJetStreamBroker

broker = PushBasedJetStreamBroker(
    servers='localhost',
    queue='broker_example_queue',
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

My dependencies:

python = "^3.13"
taskiq = "^0.11.20"
taskiq-nats = "^0.5.1"

My startup steps:

  1. I started the broker with taskiq worker broker_example:broker
(testtaskiq-py3.13) PS C:\Users\Inquisitor\PycharmProjects\TestTaskiq> taskiq worker broker_example:broker -fsd
[2025-11-17 20:00:08,690][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 18400
[2025-11-17 20:00:08,690][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-11-17 20:00:08,694][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 18844 
[2025-11-17 20:00:08,697][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 8316 
[2025-11-17 20:00:09,215][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2025-11-17 20:00:09,216][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
  1. I ran python broker_example.py
Traceback (most recent call last):
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\broker_example.py", line 25, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "C:\Users\Inquisitor\AppData\Local\Programs\Python\Python313\Lib\asyncio\runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "C:\Users\Inquisitor\AppData\Local\Programs\Python\Python313\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "C:\Users\Inquisitor\AppData\Local\Programs\Python\Python313\Lib\asyncio\base_events.py", line 725, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\broker_example.py", line 17, in main
    await broker.startup()
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\taskiq_nats\broker.py", line 156, in startup
    await self._startup_consumer()
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\taskiq_nats\broker.py", line 208, in _startup_consumer
    self.consumer = await self.js.subscribe(
                    ^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\client.py", line 476, in subscribe
    consumer_info = await self._jsm.add_consumer(stream, config=config)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\manager.py", line 262, in add_consumer
    resp = await self._api_request(subject, req_data, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\manager.py", line 484, in _api_request
    raise APIError.from_error(resp["error"])
          ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\errors.py", line 89, in from_error
    raise BadRequestError(**err)
nats.js.errors.BadRequestError: nats: BadRequestError: code=400 err_code=10013 description='consumer name already in use'

I found one solution:

The queue name must match the the durable name.

If I do this, I don't have an error:

# broker_example.py
import asyncio
from taskiq_nats import PushBasedJetStreamBroker

broker = PushBasedJetStreamBroker(
    servers='localhost',
    queue="taskiq_consumer",
)

@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()
    await my_lovely_task.kiq()
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

The reason lies in the main js.subscribe function of the nats-py library::

# class JetStreamContext(JetStreamManager):
# async def subscribe
....
# If using a queue, that will be the consumer/durable name.
        if queue:
            if durable and durable != queue:
                raise nats.js.errors.Error(
                    f"cannot create queue subscription '{queue}' to consumer '{durable}'"
                )
            else:
                durable = queue    # ← main moment
...

Is it correct? or i'm doing something wrong?

Ruslan-Droid avatar Nov 17 '25 21:11 Ruslan-Droid