How to embed kopf together with an asgi service?

Open guhcampos opened this issue 3 years ago • 1 comments

Question

Context

I have an operator that is managed by a non-declarative API, which acts as a compatibility layer with a legacy, non-Kubernetes execution model. Today the API and the operator are two separate services, so I added a redis-based caching layer between them so that the API does not have to query k8s every time it gets a request: kopf updates the cache whenever it handles a state change.

With the release of kopf's in-memory containers and indexing, this whole setup became unnecessary, so I'm looking into merging the API with the operator, sacrificing a bit of single-responsibility in favor of a much simpler and hopefully more efficient setup in which the API accesses kopf objects from memory. The API uses fastapi, which is also async and based on ASGI.

Actual Question

From kopf's embedding documentation it seems like it's a good idea to have two different event loops running on separate threads, but kopf's execution model did not match any of the examples I could find of other people using fastapi together with async libraries - most of them wanted to do some kind of initialization in a fastapi hook such as webapp.on_event("startup").

So my question is: how should I integrate the two applications? Is it even a good idea?

Kinda Working POC

I actually got both event loops to start and run, and they seem to be running concurrently as expected, but I'm not 100% convinced this is the correct way, so I'm really looking for some validation. If this proves to be a useful use case, I'd be happy to craft a working example and add it to kopf's documentation.

import asyncio
import contextlib
import threading

import kopf
import uvicorn
import uvloop

from myproject.api import webapp


def kopf_thread() -> None:
    kopf_loop = uvloop.new_event_loop()
    asyncio.set_event_loop(kopf_loop)

    with contextlib.closing(kopf_loop):
        kopf.configure(verbose=True)
        kopf_loop.run_until_complete(kopf.operator())


def api_thread() -> None:
    api_loop = uvloop.new_event_loop()
    asyncio.set_event_loop(api_loop)

    with contextlib.closing(api_loop):
        config = uvicorn.Config(app=webapp, loop=api_loop)
        server = uvicorn.Server(config)
        api_loop.run_until_complete(server.serve())


def main() -> None:
    t_kopf = threading.Thread(target=kopf_thread)
    t_api = threading.Thread(target=api_thread)

    t_kopf.start()
    t_api.start()

    t_api.join()
    t_kopf.join()

Notes

  • I'm explicitly using uvloop and uvicorn here, but neither is a hard requirement
  • I understand this means my API won't support forked worker servers such as gunicorn but I'm happy to scale it by adding more pods, leveraging kopf's peering
  • I have no idea if any of this conflicts with kopf's builtin healthcheck endpoint

Checklist

  • [x] I have read the documentation and searched there for the problem
  • [x] I have searched in the GitHub Issues for similar questions

Keywords

api fastapi asgi starlette embedding

guhcampos, May 09 '21 01:05

Hello.

Thanks for sharing your experience with the framework and this feature! That is always highly valuable.

Regarding the integration: I cannot say what is considered a good way in FastAPI (I don't know its idioms), but this example looks good enough from Kopf's point of view. You are welcome to add a section to the docs (e.g. "Recipes" / "FastAPI" in https://kopf.readthedocs.io/en/stable/embedding/) if it is short enough, or to write a public article if it is lengthy (and link to it from the docs).


There is one thing worth considering in more detail (it is not covered by the docs now): if one of the apps (Kopf in this example) exits with an error, the other one continues running, presumably forever. There should be a way to stop them both, e.g. by cancelling the other app's root task in its own thread when a shared stop-flag is set. By the way, Kopf's operator() has stop_flag for graceful termination; maybe this can help here.

So, it would be something like this (I didn't test it, just imagined):

def kopf_thread(stop_me: threading.Event) -> None:
    try:
        kopf_loop = uvloop.new_event_loop()
        asyncio.set_event_loop(kopf_loop)

        with contextlib.closing(kopf_loop):
            kopf.configure(verbose=True)
            kopf_loop.run_until_complete(kopf.operator(stop_flag=stop_me))  # <<< for graceful termination 
    finally:
        stop_me.set()


def api_thread(stop_me: threading.Event) -> None:
    api_loop = uvloop.new_event_loop()
    asyncio.set_event_loop(api_loop)
    try:
        # monitor the flag and stop it somehow. here, disgracefully.
        with contextlib.closing(api_loop):
            config = uvicorn.Config(app=webapp, loop=api_loop)
            server = uvicorn.Server(config)
            server_task = api_loop.create_task(server.serve())
            # run_in_executor() already returns a future, so it is not wrapped in create_task().
            waiter_task = api_loop.run_in_executor(None, stop_me.wait)
            done, pending = api_loop.run_until_complete(asyncio.wait({server_task, waiter_task}, return_when=asyncio.FIRST_COMPLETED))

            # Release the waiter thread, then cancel what is left while the loop is still open.
            stop_me.set()
            for task in pending:
                task.cancel()
            if pending:
                api_loop.run_until_complete(asyncio.wait(pending))
    finally:
        # Also stop Kopf if the API thread fails anywhere above.
        stop_me.set()
       

def main() -> None:
    stop_me = threading.Event()
    t_kopf = threading.Thread(target=kopf_thread, args=(stop_me,))
    t_api = threading.Thread(target=api_thread, args=(stop_me,))

    t_kopf.start()
    t_api.start()

    t_api.join()
    t_kopf.join()
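
The stop-flag idea can be tried out without Kopf or uvicorn at all. Below is a minimal, stdlib-only sketch of the same pattern: two threads, each with its own event loop, and a shared threading.Event that either side raises on exit. The names fake_operator, fake_server, and run_in_own_loop are hypothetical stand-ins, not part of Kopf or uvicorn, and asyncio.run() is used instead of uvloop only to keep the sketch dependency-free.

```python
import asyncio
import threading
from typing import Awaitable, Callable, List, Tuple


async def fake_operator(stop_flag: threading.Event) -> None:
    """Hypothetical stand-in for kopf.operator(): crashes shortly after start."""
    await asyncio.sleep(0.05)
    raise RuntimeError("operator crashed")


async def fake_server(stop_flag: threading.Event) -> None:
    """Hypothetical stand-in for an ASGI server: serves until the flag is set."""
    loop = asyncio.get_running_loop()
    # Wait for the flag in a worker thread so this event loop stays responsive.
    await loop.run_in_executor(None, stop_flag.wait)
    # A real server would finish in-flight requests here before returning.


def run_in_own_loop(
    app: Callable[[threading.Event], Awaitable[None]],
    stop_flag: threading.Event,
    errors: List[BaseException],
) -> None:
    """Run one app in a fresh event loop and always raise the flag on exit."""
    try:
        asyncio.run(app(stop_flag))
    except Exception as exc:
        errors.append(exc)
    finally:
        # Whatever the reason for exiting, tell the other thread to stop too.
        stop_flag.set()


def main() -> Tuple[List[BaseException], List[bool]]:
    stop_flag = threading.Event()
    errors: List[BaseException] = []
    threads = [
        threading.Thread(target=run_in_own_loop, args=(app, stop_flag, errors))
        for app in (fake_operator, fake_server)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join(timeout=5)
    # Report collected errors and whether any thread is still running.
    return errors, [thread.is_alive() for thread in threads]


if __name__ == "__main__":
    print(main())
```

Here the fake server exits on its own once the flag is set, which is the graceful counterpart of cancelling its task; whether a real ASGI server offers an equivalent hook depends on the server.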

Regarding the health-checks in Kopf: they are not enabled unless you pass the liveness_endpoint= option to operator(). In that case, you can select your own port (e.g. liveness_endpoint="http://0.0.0.0:1234"). The same goes for the webhooks: they do not listen on any port unless you configure their server.

nolar, May 09 '21 08:05