Dependency Context Not Propagating to Broker in Task Execution
Problem Description
I'm using a fastapi_app pytest fixture together with InMemoryBroker and InmemoryResultBackend from taskiq. Requests handled by the FastAPI app itself resolve their dependencies correctly, but tasks executed by taskiq fail because a dependency cannot be resolved.
Here’s a snippet of my fastapi_app fixture:
@pytest.fixture
def fastapi_app(
    dbsession: AsyncSession,
    test_rmq_pool: Pool[Channel],
    fake_redis_pool: ConnectionPool,
    test_searcher: CustomSearcher,
    memory_file_system: MemoryFileSystem,
) -> FastAPI:
    from backend.web.application import get_app

    application = get_app()
    application.dependency_overrides[get_db_session] = lambda: dbsession
    return application  # noqa
And here’s the initialization of the broker:
from taskiq.brokers.inmemory_broker import InMemoryBroker, InmemoryResultBackend
result_backend = InmemoryResultBackend()
broker = InMemoryBroker().with_result_backend(result_backend)
In conftest.py, I also have the following fixture:
@pytest.fixture(autouse=True)
def init_taskiq_deps(fastapi_app: FastAPI):
    from backend.tkq import broker

    taskiq_fastapi.populate_dependency_context(broker, fastapi_app)
    yield
    broker.custom_dependency_context = {}
Issue Observed
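Despite populating the dependency context, task execution still fails because db_session_factory is missing from the application state, which suggests the task resolved a real dependency rather than the override registered on the app. Here’s the serialized task result: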
{
    "is_err": true,
    "log": null,
    "return_value": null,
    "execution_time": 0.01,
    "labels": {
        "asana_task_id": "1111"
    },
    "error": {
        "exc_type": "AttributeError",
        "exc_message": [
            "'State' object has no attribute 'db_session_factory'"
        ],
        "exc_module": "builtins",
        "exc_cause": null,
        "exc_context": {
            "exc_type": "KeyError",
            "exc_message": [
                "db_session_factory"
            ],
            "exc_module": "builtins",
            "exc_cause": null,
            "exc_context": null,
            "exc_suppress_context": false
        },
        "exc_suppress_context": false
    }
}
Solution
The issue was resolved by explicitly updating the broker’s dependency overrides using fastapi_app.dependency_overrides. The updated code looks like this:
@pytest.fixture(autouse=True)
def init_taskiq_deps(fastapi_app: FastAPI):
    from backend.tkq import broker

    taskiq_fastapi.populate_dependency_context(broker, fastapi_app)
    # Copy the app's overrides so tasks resolve the same test doubles.
    broker.dependency_overrides.update(fastapi_app.dependency_overrides)
    yield
    broker.custom_dependency_context = {}
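To illustrate why copying the overrides helps, here is a toy model of the situation, not taskiq's or FastAPI's actual resolution code. It assumes the app and the broker each keep their own override registry, and that the real dependency reads a factory from the application state that only exists after startup. All names in it (State, resolve, app_overrides, broker_overrides) are hypothetical stand-ins.

```python
from typing import Any, Callable, Dict


class State:
    """Toy stand-in for app.state; startup never ran, so nothing is set."""


def get_db_session(state: State) -> Any:
    # Stand-in for the real dependency: it needs a factory created at startup.
    return state.db_session_factory()


Overrides = Dict[Callable[..., Any], Callable[[], Any]]


def resolve(dependency: Callable[..., Any], overrides: Overrides, state: State) -> Any:
    """Call the registered override if there is one, else the real dependency."""
    override = overrides.get(dependency)
    if override is not None:
        return override()
    return dependency(state)


# The test registers its override on the app only (assumption of this sketch).
app_overrides: Overrides = {get_db_session: lambda: "test-session"}
broker_overrides: Overrides = {}

# Web path: the override is found, so the missing factory is never touched.
web_result = resolve(get_db_session, app_overrides, State())

# Task path: no override is registered, so the real dependency runs and fails.
try:
    resolve(get_db_session, broker_overrides, State())
    task_error = None
except AttributeError as exc:
    task_error = str(exc)

# Copying the overrides across makes the task path behave like the web path.
broker_overrides.update(app_overrides)
task_result = resolve(get_db_session, broker_overrides, State())
```

In this sketch the failure disappears as soon as both registries hold the same entries, which matches the behaviour observed with the fixture above.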
Suggested Improvement
It would be helpful if populate_dependency_context also propagated the overrides from fastapi_app to broker.dependency_overrides automatically. Failing that, the documentation could state that the overrides have to be copied manually.
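Until something like this exists upstream, the copying can be wrapped in a small helper. The sketch below is a hypothetical context manager (mirrored_overrides is not part of taskiq or taskiq_fastapi) that assumes only that both objects expose a mutable dependency_overrides dict, as in the fixture above. It also restores the broker's previous overrides on exit, so they do not leak between tests.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Iterator


@contextmanager
def mirrored_overrides(app: Any, broker: Any) -> Iterator[Any]:
    """Copy app.dependency_overrides onto the broker, then restore on exit."""
    saved = dict(broker.dependency_overrides)
    broker.dependency_overrides.update(app.dependency_overrides)
    try:
        yield broker
    finally:
        # Put the broker back exactly as it was before the block.
        broker.dependency_overrides.clear()
        broker.dependency_overrides.update(saved)


# Stand-in objects for demonstration; real code would pass the FastAPI app
# and the taskiq broker instead.
demo_app = SimpleNamespace(dependency_overrides={"get_db_session": "fake"})
demo_broker = SimpleNamespace(dependency_overrides={"existing": "kept"})

with mirrored_overrides(demo_app, demo_broker):
    inside = dict(demo_broker.dependency_overrides)

after = dict(demo_broker.dependency_overrides)
```

Inside a pytest fixture, the yield would sit inside the with block, after the call to populate_dependency_context.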