taskiq icon indicating copy to clipboard operation
taskiq copied to clipboard

Need advice on dynamically change brokers.

Open wwarne opened this issue 5 months ago • 1 comments

I need advice on dynamically switching brokers.

This is similar to: https://github.com/taskiq-python/taskiq/issues/439

I'm trying to support multiple brokers in my app. The idea is that I can't create a broker instance in advance. The broker to use will be determined at runtime based on application settings.

# Not the real code, just to illustrate the idea
def get_real_broker(settings) -> AsyncBroker:
    if settings.broker_type == 'memory':
        return InMemoryBroker()
    elif settings.broker_type == 'redis':
        result_backend = RedisAsyncResultBackend(redis_url=settings.redis_url)
        return RedisStreamBroker(settings.redis_url).with_result_backend(result_backend)
    elif settings.broker_type == 'redis_sentinel':
        ...

Because of this, I can't use the @broker.task decorator since there is no broker instance available at import time.

Also, I want to be able to use .kiq() in my functions without triggering warnings from mypy or the IDE.

So I started thinking about something like this:

class ProxyBroker(AsyncBroker):
    def __init__(self, *args, **kwargs):
        self.real_broker = None
        super().__init__(*args, **kwargs)

This would allow me to define a global broker and still use the @broker.task decorator as intended:

broker = ProxyBroker()

@broker.task
async def add_one(value: int) -> int:
    return value + 1

async def some_other_function():
    # Some code
    await add_one.kiq(value=42)

Here’s what I tried in ProxyBroker:

async def kick(self, message: BrokerMessage) -> None:
    await self.real_broker.kick(message)

def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
    return self.real_broker.listen()

but encountered errors like:

task not found... Maybe you forgot to import it

From what I understand, @task wraps the function in an AsyncTaskiqDecoratedTask, which has a broker attribute.

I'm not sure how to proceed. Should I implement a set_real_broker() method that re-registers all tasks from the ProxyBroker to the actual broker instance?

The docs also mention:

"You can use kicker to change the broker, add labels, or even change task_id."

So maybe I should override the .kiq() method somehow?

I'd appreciate any thoughts or ideas on a better approach to solve this problem.

wwarne avatar Jun 07 '25 10:06 wwarne

I've just found AsyncSharedBroker + shared_task in source code (seems like it solves exactly my idea), i'll give it a try.

wwarne avatar Jun 07 '25 12:06 wwarne