fastapi-mail
Is it possible to send email messages in background using celery?
I use Redis as the broker and Celery as the worker. When I try to send messages from a Celery task, I get an error like this:
Arguments: {'hostname': 'celery@639c8e328783', 'id': 'bbe0aa71-7b7b-49d5-a4ab-83cb9b1d4365', 'name': 'send_email', 'exc': "PydanticClassRequired('Message schema should be provided from MessageSchema class')"
@celery.task(name='send_email')
def send_email(message: dict) -> None:
    fm = FastMail(email_config)
    mes = MessageSchema(**message)
    asyncio.run(fm.send_message(mes))
That is what my task looks like. The server doesn't crash. As I understand it, Redis turns the MessageSchema into a dict, and sending the email then fails. Has anyone run into this issue?
@jabajke Celery tasks are plain synchronous functions, and fastapi-mail is fully asynchronous, so its coroutines can't be called directly from a Celery worker.
You can try the asgiref library:
from asgiref.sync import async_to_sync
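As a minimal sketch of that suggestion: `async_to_sync` wraps a coroutine function so that it can be called like a normal function, which is the shape a Celery task body needs. The `deliver` coroutine and `send_email_task` below are hypothetical stand-ins (not part of fastapi-mail or Celery) so the example runs without a mail server. The fallback to `asyncio.run` is only there so the sketch still works where asgiref is not installed.

```python
import asyncio

try:
    from asgiref.sync import async_to_sync  # third-party: pip install asgiref
except ImportError:
    async_to_sync = None  # fall back to the standard library below


async def deliver(payload: dict) -> str:
    """Hypothetical stand-in for `await FastMail(conf).send_message(message)`."""
    await asyncio.sleep(0)  # pretend to do asynchronous I/O
    return f"sent to {payload['to']}"


def send_email_task(payload: dict) -> str:
    """Synchronous entry point, the kind of function a Celery task wraps."""
    if async_to_sync is not None:
        # async_to_sync(coroutine_function) returns a blocking callable
        return async_to_sync(deliver)(payload)
    # Standard-library alternative: start an event loop just for this call
    return asyncio.run(deliver(payload))
```

Calling `send_email_task({"to": "user@example.com"})` blocks until the coroutine finishes and returns its result. In a real task the body of `deliver` would presumably be the fastapi-mail call.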
Although this is an old post, I'm leaving the solution here in case anyone has the same question. This is the approach I found for sending mail with Celery and fastapi-mail:
========== FASTAPI-MAIL =============
from fastapi_mail import ConnectionConfig, FastMail, MessageSchema, MessageType

conf = ConnectionConfig(
    # your config here
    ...
)

async def send_email():
    message = MessageSchema(
        subject="FASTAPI test",
        recipients=["user@example.com"],  # placeholder address
        body="Test",
        subtype=MessageType.html,
    )
    fm = FastMail(conf)
    await fm.send_message(message)
    return {"message": "Email sent"}
========== CELERY =========
import asyncio

from celery import Celery
from celery.states import FAILURE, PENDING

celery = Celery("app")

@celery.task(name="celery-mail", bind=True)
def celery_mail(self):
    self.update_state(state=PENDING)
    try:
        # Run the async send_email() coroutine to completion inside the sync task
        asyncio.run(send_email())
    except Exception:  # customize exception if needed
        self.update_state(state=FAILURE)
        raise
========== FASTAPI =========
from fastapi import FastAPI

app = FastAPI()

@app.post("/send-email")
async def send_celery_email():
    # Queue the task; the Celery worker sends the email in the background
    celery_mail.delay()
This is working in production.
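The task above takes no arguments. If the subject, recipients and body need to vary per call, one option is to pass them as a plain dict and rebuild the message object inside the task, since Celery's default serializer is JSON and task arguments have to survive that round trip. The sketch below illustrates the idea with standard-library stand-ins only: `Message`, `deliver`, `through_broker` and `send_email_task` are hypothetical names, with `Message` standing in for `MessageSchema` and `through_broker` simulating what the broker does to the arguments.

```python
import asyncio
import json
from dataclasses import asdict, dataclass


@dataclass
class Message:
    """Hypothetical stand-in for fastapi_mail.MessageSchema."""
    subject: str
    recipients: list
    body: str


async def deliver(message: Message) -> str:
    """Hypothetical stand-in for `await fm.send_message(message)`."""
    await asyncio.sleep(0)  # pretend to do asynchronous I/O
    return f"{message.subject} -> {', '.join(message.recipients)}"


def through_broker(payload: dict) -> dict:
    """Simulate the JSON round trip that task arguments go through."""
    return json.loads(json.dumps(payload))


def send_email_task(payload: dict) -> str:
    """Body of a would-be Celery task: rebuild the message, then run the coroutine."""
    message = Message(**payload)  # only plain data crossed the broker
    return asyncio.run(deliver(message))
```

On the caller's side, only plain data would be handed over, for example `send_email_task(through_broker(asdict(Message("Hi", ["user@example.com"], "Test"))))`. With Celery itself, that same dict would be the argument given to `.delay(...)`.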
Thank you, that really works