How to embed kopf together with an asgi service?
Question
Context
I have an operator that is managed by a non-declarative API acting as a compatibility layer with a legacy, non-Kubernetes execution model. Today the API and the operator are two separate services, so I added a Redis-based caching layer between them so that the API does not have to query k8s every time it gets a request: kopf updates the cache whenever it handles a state change.
With the release of kopf's in-memory containers and indexing, this whole setup became unnecessary, so I'm looking into merging the API with the operator, sacrificing a bit of single-responsibility in favor of a much simpler and hopefully more efficient setup where the API accesses kopf objects directly from memory. The API uses FastAPI, which is also async and ASGI-based.
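To make the idea concrete, here is a rough sketch of the kind of sharing I have in mind (not my actual code: the `myresources` kind, the `CACHE` dict, and the route are placeholders): a kopf event handler keeps an in-memory view of the objects, and a FastAPI route serves it without touching the Kubernetes API.

```python
import kopf
from fastapi import FastAPI

webapp = FastAPI()

# Placeholder in-memory view shared between the operator handlers and the API.
CACHE: dict = {}

@kopf.on.event('myresources')  # placeholder resource kind
def remember(name, body, type, **_):
    # Keep the latest known state of every object; forget it on deletion.
    if type == 'DELETED':
        CACHE.pop(name, None)
    else:
        CACHE[name] = dict(body.get('spec', {}))

@webapp.get('/myresources/{name}')
async def read_resource(name: str) -> dict:
    # Served straight from memory, no round-trip to the Kubernetes API.
    return CACHE.get(name, {})
```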
Actual Question
From kopf's embedding documentation it seems like it's a good idea to have two different event loops running on separate threads, but kopf's execution model did not match any of the examples I could find of other people using FastAPI together with async libraries: most of them do some kind of initialization in a FastAPI hook such as `webapp.on_event("startup")`.
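For reference, the pattern those examples follow looks roughly like this (just a sketch of what I found, not something I have validated against kopf's execution model; `webapp` stands in for my FastAPI app):

```python
import asyncio

import kopf
from fastapi import FastAPI

webapp = FastAPI()

@webapp.on_event("startup")
async def start_operator() -> None:
    # Launch the operator as a background task on the same loop as the API.
    webapp.state.kopf_stop = asyncio.Event()
    webapp.state.kopf_task = asyncio.create_task(
        kopf.operator(stop_flag=webapp.state.kopf_stop)
    )

@webapp.on_event("shutdown")
async def stop_operator() -> None:
    # Ask the operator to finish gracefully and wait for it.
    webapp.state.kopf_stop.set()
    await webapp.state.kopf_task
```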
So my question is: how should I integrate the two applications? Is it even a good idea?
Kinda Working POC
I actually got both event loops to start and run, and they seem to be running concurrently as expected, but I'm not 100% convinced this is the correct way, so I'm really looking for some validation. If this proves to be a useful use case, I'd be happy to craft a working example and add it to kopf's documentation.
```python
import asyncio
import contextlib
import threading

import kopf
import uvicorn
import uvloop

from myproject.api import webapp


def kopf_thread() -> None:
    # Run the operator on its own event loop in this thread.
    kopf_loop = uvloop.new_event_loop()
    asyncio.set_event_loop(kopf_loop)
    with contextlib.closing(kopf_loop):
        kopf.configure(verbose=True)
        kopf_loop.run_until_complete(kopf.operator())


def api_thread() -> None:
    # Run uvicorn/FastAPI on a separate event loop in this thread.
    api_loop = uvloop.new_event_loop()
    asyncio.set_event_loop(api_loop)
    with contextlib.closing(api_loop):
        config = uvicorn.Config(app=webapp, loop=api_loop)
        server = uvicorn.Server(config)
        api_loop.run_until_complete(server.serve())


def main() -> None:
    t_kopf = threading.Thread(target=kopf_thread)
    t_api = threading.Thread(target=api_thread)
    t_kopf.start()
    t_api.start()
    t_api.join()
    t_kopf.join()


if __name__ == "__main__":
    main()
```
Notes
- I'm explicitly using `uvloop` and `uvicorn` here, but neither is a hard requirement.
- I understand this means my API won't support forked worker servers such as `gunicorn`, but I'm happy to scale it by adding more pods, leveraging kopf's peering (a sketch follows below).
- I have no idea if any of this conflicts with kopf's built-in healthcheck endpoint.
Checklist
- [x] I have read the documentation and searched there for the problem
- [x] I have searched in the GitHub Issues for similar questions
Keywords
api fastapi asgi starlette embedding
Hello.
Thanks for sharing your experience with the framework and this feature! That is always highly valuable.
Regarding the integration: I cannot say what is considered a good way in FastAPI (I don't know its idioms), but this example looks good enough from Kopf's point of view. You are welcome to add a section to the docs (e.g. "Recipes" / "FastAPI" in https://kopf.readthedocs.io/en/stable/embedding/) if it is short enough, or write a public article if it is lengthy (and put a link to it in the docs).
There is one thing worth considering in more detail (and it is not yet covered by the docs): if one of the apps (Kopf in this example) exits with an error, the other one continues running, presumably forever. There should be a way to stop them both, e.g. by cancelling the other root task in the other thread when a shared stop-flag is set. By the way, Kopf's `operator()` has a `stop_flag` argument for graceful termination; maybe this can help here.
So, it would be something like this (I didn't test it, just imagined):
```python
def kopf_thread(stop_me: threading.Event) -> None:
    try:
        kopf_loop = uvloop.new_event_loop()
        asyncio.set_event_loop(kopf_loop)
        with contextlib.closing(kopf_loop):
            kopf.configure(verbose=True)
            kopf_loop.run_until_complete(kopf.operator(stop_flag=stop_me))  # <<< for graceful termination
    finally:
        # Whatever happens, signal the other thread to stop too.
        stop_me.set()


def api_thread(stop_me: threading.Event) -> None:
    try:
        api_loop = uvloop.new_event_loop()
        asyncio.set_event_loop(api_loop)
        with contextlib.closing(api_loop):
            config = uvicorn.Config(app=webapp, loop=api_loop)
            server = uvicorn.Server(config)

            # Monitor the flag and stop the server somehow. Here, disgracefully (by cancellation).
            server_task = api_loop.create_task(server.serve())
            waiter_task = api_loop.run_in_executor(None, stop_me.wait)
            done, pending = api_loop.run_until_complete(
                asyncio.wait({server_task, waiter_task}, return_when=asyncio.FIRST_COMPLETED)
            )

            # Whichever side finished first, signal the other thread and unblock the waiter.
            stop_me.set()

            # Cancel whatever is still running and let it finish.
            for task in pending:
                task.cancel()
            if pending:
                api_loop.run_until_complete(asyncio.wait(pending))
    finally:
        stop_me.set()


def main() -> None:
    stop_me = threading.Event()
    t_kopf = threading.Thread(target=kopf_thread, args=(stop_me,))
    t_api = threading.Thread(target=api_thread, args=(stop_me,))
    t_kopf.start()
    t_api.start()
    t_api.join()
    t_kopf.join()
```
Regarding the health-checks in Kopf: they are not enabled unless you pass the `liveness_endpoint=` option to `operator()`. And in that case, you can select your own port (e.g. `liveness_endpoint="http://0.0.0.0:1234"`). The same goes for the webhooks: they do not listen on any port unless you configure their server.
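For example, in the `kopf_thread()` above (again untested; the port is arbitrary):

```python
kopf_loop.run_until_complete(kopf.operator(
    stop_flag=stop_me,
    liveness_endpoint="http://0.0.0.0:1234",  # Kopf serves the liveness probe on this port
))
```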