python-dependency-injector
Wiring Dependencies in sanic for background tasks not working
I have a Sanic app and want to run some background jobs. I have configured the container, but I am getting unexpected behavior.
# container.py
"""Containers module."""
from dependency_injector import containers, providers
from domain.job.ports.repository import IJobRepository
from domain.job.ports.service import IJobService
from domain.job.services import JobService
from infrastructure.database.job.repository import JobRepository
class Container(containers.DeclarativeContainer):
    config = providers.Configuration()
    database = "xxxxx"  # Database()
    job_repository = providers.Factory(IJobRepository, JobRepository, db=database)

    job_service = providers.Factory(
        IJobService, JobService,
        repository=job_repository,
    )
Here are my background tasks and the function that adds them to Sanic with add_task:
from sanic import Sanic
from dependency_injector.wiring import Provide, inject
from adapters.application.container import Container
from domain.job.ports.service import IJobService
@inject
async def job_runner(job_service: IJobService = Provide[Container.job_service]):
    print(job_service)
    sorted = await job_service.fetch_sorted()

async def init_jobs():
    await job_runner()

def run_bg_tasks(app: Sanic):
    app.add_task(init_jobs)
I registered my container as below:
# boot.py
from .container import Container
from adapters.application.tasks import run_bg_tasks
...
def register_felicity(app: Sanic):
    register_configs(app)
    register_events(app)
    register_middlewares(app)
    register_dependencies(app)
    register_blueprints(app)
    register_graphql(app)
    register_health(app)
    app.ctx.container = Container()
    run_bg_tasks(app)
Finally, I create the Sanic application:
# main.py
"""Note:
- The felicity app must be created first before importing register_felicity.
- This is important to register felicity in sanic registry :) its a sanic thing anyways
"""
from sanic import Sanic
felicity = Sanic("felicity-hexagonal")
from adapters.application import register_felicity
register_felicity(felicity)
I then run the app with:
uvicorn main:felicity --reload
The print(job_service) outputs <dependency_injector.wiring.Provide object at 0x7f7079e17590>
The call sorted = await job_service.fetch_sorted() then fails with this error:
sorted = await job_service.fetch_sorted()
^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Provide' object has no attribute 'fetch_sorted'
My use case is to run some jobs at intervals. These jobs cannot be run simply through Sanic's add_task, as they are created deep inside the application. I have a Job service that needs to keep track of all created jobs and then run them. These jobs also depend on services, some of which have many dependencies of their own, which is why I need dependency injection. I did not find a way to use Sanic's default dependency injection for this.
If there is a way to use this library with APScheduler as well, I would like to know. I am also open to other, better alternatives.
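To make the use case concrete, here is a rough sketch of the kind of job tracking I have in mind, using only asyncio. The names Job and JobRegistry, and the rounds parameter, are hypothetical and exist only for illustration; my real Job service would receive its collaborators through the container instead.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class Job:
    """Hypothetical job description: a name, an interval, and a coroutine function."""

    name: str
    interval: float  # seconds to sleep between runs
    func: Callable[[], Awaitable[None]]


class JobRegistry:
    """Hypothetical registry that tracks jobs and runs each one on its interval."""

    def __init__(self) -> None:
        self._jobs: List[Job] = []

    def register(self, job: Job) -> None:
        self._jobs.append(job)

    async def _run_job(self, job: Job, rounds: int) -> None:
        # Run the job a fixed number of times so the sketch terminates.
        for _ in range(rounds):
            await job.func()
            await asyncio.sleep(job.interval)

    async def run(self, rounds: int) -> None:
        # Run all registered jobs concurrently.
        await asyncio.gather(*(self._run_job(job, rounds) for job in self._jobs))


if __name__ == "__main__":
    async def say_hello() -> None:
        print("hello from a job")

    registry = JobRegistry()
    registry.register(Job(name="hello", interval=0.1, func=say_hello))
    asyncio.run(registry.run(rounds=3))
```

A long-running version would loop until cancelled instead of counting rounds; the fixed count is only there to keep the example short.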
Thank you