taskiq
taskiq copied to clipboard
Need advice on dynamically change brokers.
I need advice on dynamically switching brokers.
This is similar to: https://github.com/taskiq-python/taskiq/issues/439
I'm trying to support multiple brokers in my app. The idea is that I can't create a broker instance in advance. The broker to use will be determined at runtime based on application settings.
# Not the real code, just to illustrate the idea
def get_real_broker(settings) -> AsyncBroker:
if settings.broker_type == 'memory':
return InMemoryBroker()
elif settings.broker_type == 'redis':
result_backend = RedisAsyncResultBackend(redis_url=settings.redis_url)
return RedisStreamBroker(settings.redis_url).with_result_backend(result_backend)
elif settings.broker_type == 'redis_sentinel':
...
Because of this, I can't use the @broker.task decorator since there is no broker instance available at import time.
Also, I want to be able to use .kiq() in my functions without triggering warnings from mypy or the IDE.
So I started thinking about something like this:
class ProxyBroker(AsyncBroker):
def __init__(self, *args, **kwargs):
self.real_broker = None
super().__init__(*args, **kwargs)
This would allow me to define a global broker and still use the @broker.task decorator as intended:
broker = ProxyBroker()
@broker.task
async def add_one(value: int) -> int:
return value + 1
async def some_other_function():
# Some code
await add_one.kiq(value=42)
Here’s what I tried in ProxyBroker:
async def kick(self, message: BrokerMessage) -> None:
await self.real_broker.kick(message)
def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
return self.real_broker.listen()
but encountered errors like:
task not found... Maybe you forgot to import it
From what I understand, @task wraps the function in an AsyncTaskiqDecoratedTask, which has a broker attribute.
I'm not sure how to proceed. Should I implement a set_real_broker() method that re-registers all tasks from the ProxyBroker to the actual broker instance?
The docs also mention:
"You can use
kickerto change the broker, add labels, or even changetask_id."
So maybe I should override the .kiq() method somehow?
I'd appreciate any thoughts or ideas on a better approach to solve this problem.
I've just found AsyncSharedBroker + shared_task in source code (seems like it solves exactly my idea), i'll give it a try.