Persistence of state from lifespan events to requests

Open adriangb opened this issue 2 years ago • 7 comments

ASGI lifespan events are designed to handle setup and subsequent teardown of resources like a database connection, but the spec does not provide any way to persist this state to requests.

Starlette implements an Application.state namespace (docs) which modifies the application object in place. Quart suggests storing data in the app's namespace directly. This is not ideal because it gives an otherwise stateless thing state, and there is also no correlation between this state and the event loop / ASGI state, which can lead to inconsistent state. Here's an artificial but not absurd example of how this could lead to a confusing user experience:

import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator

from starlette.applications import Starlette
from starlette.testclient import TestClient
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route


@dataclass
class Connection:
    loop: asyncio.AbstractEventLoop

    async def run(self) -> None:
        # Inside a coroutine, get_running_loop() returns the loop executing it.
        if asyncio.get_running_loop() is not self.loop:
            raise Exception("Some obtuse error")


@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
    # Remember which loop the "connection" was created on.
    app.state.db = Connection(asyncio.get_running_loop())
    yield


async def endpoint(request: Request) -> Response:
    await request.app.state.db.run()
    return Response()


app = Starlette(
    routes=[Route("/", endpoint)],
    lifespan=lifespan,
)


def some_test_using_lifespans() -> None:
    with TestClient(app):
        pass


def some_test_where_the_user_forgets_to_use_the_lifespan() -> None:
    TestClient(app).get("/")


if __name__ == "__main__":
    some_test_using_lifespans()
    some_test_where_the_user_forgets_to_use_the_lifespan()

Here an honest mistake ends up leaking state between tests, and maybe giving the user an obtuse error about event loops and such.

I think it would be beneficial if the spec provided a namespace scoped to each ASGI lifespan / request. This namespace would basically be an empty dict that gets persisted from the lifespan event into each request. I think it would make sense to model this like contextvar propagation into tasks: every request gets a copy of the namespace from the lifespan (unless there was no lifespan, in which case it's a brand new dict or maybe None). Then state can be naturally stored in this namespace. Since the ASGI server manages this namespace (and it already manages its own state because of lifespans), the application can be stateless and neither the framework nor the users have to worry about clearing state at the end of a lifespan or anything like that.

This can easily be backwards compatible: it's a new key in the scope and we bump the minor version of the ASGI version so that frameworks can check if this is supported or not.
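
A framework-side check could look roughly like the sketch below. It assumes, as proposed above, that a supporting server adds a "state" key to the scope and that older servers simply omit it. Since no concrete version number is named here, the sketch tests for the key rather than for a spec version. `get_state` is a hypothetical helper name, not part of any existing API.

```python
from typing import Any, Dict, Optional


def get_state(scope: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the per-lifespan namespace, or None if the server has none.

    Assumption: a supporting server puts a dict under the "state" key of
    every scope, as proposed in this issue.
    """
    state = scope.get("state")
    return state if isinstance(state, dict) else None


# A server that supports the proposal passes a namespace with the scope.
new_scope = {"type": "http", "state": {"db": "connection"}}
# An older server simply omits the key.
old_scope = {"type": "http"}

supported = get_state(new_scope)
unsupported = get_state(old_scope)
```

A framework could fall back to its existing behavior (for example, storing state on the application object) whenever the helper returns None.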

adriangb avatar Apr 26 '22 04:04 adriangb

Here's a somewhat minimal runnable example:

import asyncio
import json
from typing import Any
from asgiref.typing import (
    Scope,
    ASGIReceiveCallable,
    ASGISendCallable,
    ASGISendEvent,
    ASGIReceiveEvent,
    HTTPResponseStartEvent,
    HTTPResponseBodyEvent,
    LifespanScope,
    LifespanStartupCompleteEvent,
    LifespanStartupEvent,
    HTTPRequestEvent,
)


DB = list[Any]


async def lifespan(
    scope: LifespanScope,
    receive: ASGIReceiveCallable,
    send: ASGISendCallable,
) -> None:
    event = await receive()
    assert event["type"] == "lifespan.startup"
    scope["state"]["db"] = DB()  # type: ignore
    await send(LifespanStartupCompleteEvent(type="lifespan.startup.complete"))


async def app(
    scope: Scope,
    receive: ASGIReceiveCallable,
    send: ASGISendCallable,
) -> None:
    if scope["type"] == "lifespan":
        await lifespan(scope, receive, send)
        return
    assert scope["type"] == "http"
    event = await receive()
    assert event["type"] == "http.request"
    body = event["body"]
    db: DB = scope["state"]["db"]  # type: ignore
    db.append(body.decode())
    await send(
        HTTPResponseStartEvent(
            type="http.response.start",
            status=200,
            headers=[],
        )
    )
    await send(
        HTTPResponseBodyEvent(
            type="http.response.body",
            body=json.dumps(db).encode(),
            more_body=False,
        )
    )

async def ignored_send(event: ASGISendEvent) -> None:
    if event["type"] == "http.response.body":
        print(json.loads(event["body"]))


async def lifespan_rcv() -> ASGIReceiveEvent:
    return LifespanStartupEvent(type="lifespan.startup")


async def http_rcv() -> ASGIReceiveEvent:
    return HTTPRequestEvent(type="http.request", body=b'{"foo": "bar"}', more_body=False)


async def run_server() -> None:
    # create state
    state = {}
    # send lifespan
    await app(
        {"type": "lifespan", "state": state},
        receive=lifespan_rcv,
        send=ignored_send,
    )
    # send request
    await app(
        {"type": "http", "state": state.copy()},
        receive=http_rcv,
        send=ignored_send,
    )
    # send request
    await app(
        {"type": "http", "state": state.copy()},
        receive=http_rcv,
        send=ignored_send,
    )


if __name__ == "__main__":
    asyncio.run(run_server())

adriangb avatar Apr 27 '22 22:04 adriangb

So to get this clear, you're requesting that there's something that you can write into from the startup event that then gets directly copied into every subsequent scope within the same... process? thread? coroutine? Lifespan is already a little unclear when it comes to thread vs. process.

andrewgodwin avatar Apr 28 '22 05:04 andrewgodwin

you're requesting that there's something that you can write into from the startup event that then gets directly copied into every subsequent scope

Yep

Lifespan is already a little unclear when it comes to thread vs. process.

I wasn't necessarily thinking of threads, coroutines or processes. At a high level, a lifespan "wraps" a group of requests that are processed while that lifespan is active. How those requests are executed shouldn't really matter. In practice I believe every ASGI server executes requests and lifespans in the same event loop, so using coroutines, not processes or threads. Things like database connections are not thread or process safe so lifespans in general would be kinda useless if they used a different process.

adriangb avatar Apr 28 '22 05:04 adriangb

One use case for this is the discussions in #200.

Let's say you want to have a TaskGroup (anyio or stdlib) to put background tasks into. Storing it in the application state is a bit dangerous for the reasons mentioned above but also because it assumes that every request/response is going to be running in the same event loop. Like you say, there is no specification that an ASGI server runs in a single thread with a single event loop, creating tasks for every request. What if the server creates multiple threads each with their own event loop and dispatches between them?

If we were allowed to do scope["state"]["tg"] = TaskGroup() within the lifespan event then every request could access scope["state"]["tg"] and the server can manage event loops as it pleases (it only has to guarantee that lifespans and requests run in the same event loop, which IMO is a reasonable guarantee and should be part of the spec if it's not).
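
To make that concrete, here is a rough sketch using the stdlib `asyncio.TaskGroup` (Python 3.11+). It assumes the proposed "state" key and relies on the lifespan and the requests running in the same event loop. The toy `run_server` driver, the queues, and `background_job` are made up for illustration and are not how any real server is implemented.

```python
import asyncio
from typing import Any, Dict, List

results: List[str] = []


async def background_job(name: str) -> None:
    await asyncio.sleep(0)
    results.append(name)


async def app(scope: Dict[str, Any], receive, send) -> None:
    if scope["type"] == "lifespan":
        assert (await receive())["type"] == "lifespan.startup"
        # The task group lives exactly as long as the lifespan.
        async with asyncio.TaskGroup() as tg:
            scope["state"]["tg"] = tg
            await send({"type": "lifespan.startup.complete"})
            assert (await receive())["type"] == "lifespan.shutdown"
        # Leaving the block waits for all background tasks to finish.
        await send({"type": "lifespan.shutdown.complete"})
        return
    # Request handling: schedule work on the lifespan's task group.
    scope["state"]["tg"].create_task(background_job(scope["path"]))


async def run_server() -> None:
    state: Dict[str, Any] = {}
    to_app: asyncio.Queue = asyncio.Queue()
    from_app: asyncio.Queue = asyncio.Queue()
    lifespan_task = asyncio.create_task(
        app({"type": "lifespan", "state": state}, to_app.get, from_app.put)
    )
    await to_app.put({"type": "lifespan.startup"})
    assert (await from_app.get())["type"] == "lifespan.startup.complete"

    # Each request gets a shallow copy of the lifespan's namespace.
    for path in ("/a", "/b"):
        await app({"type": "http", "path": path, "state": state.copy()}, None, None)

    await to_app.put({"type": "lifespan.shutdown"})
    assert (await from_app.get())["type"] == "lifespan.shutdown.complete"
    await lifespan_task


asyncio.run(run_server())
```

The application object itself holds no state here: the task group is created, used and torn down entirely within one lifespan's namespace.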

adriangb avatar Jun 26 '22 18:06 adriangb

Here's another distinct scenario where the global mutable state on the application causes issues:

import asyncio
import uvicorn
from starlette.applications import Starlette

app = Starlette()
app.state.foo = None

async def startup():
    assert app.state.foo is None, "ha!"
    app.state.foo = 1

app.on_event("startup")(startup)

async def run(port):
    cfg = uvicorn.Config(app, port=port)
    await uvicorn.Server(cfg).serve()

async def main():
    await asyncio.gather(
        asyncio.create_task(run(8000)),
        asyncio.create_task(run(8001)),
    )

asyncio.run(main())

Currently both of these servers would use the same (global) state and thus step on each other's toes. The same thing would happen if you mount an app instance / route under two different endpoints (e.g. using starlette.routing.Mount).
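
For comparison, here is a sketch of the same scenario with the proposed per-lifespan namespace, without uvicorn or Starlette. The `run` function is only a stand-in for one server instance, and the "port" key in the scope is made up for this demo (it is not part of the lifespan scope).

```python
import asyncio
from typing import Any, Dict, List


async def app(scope: Dict[str, Any], receive, send) -> None:
    if scope["type"] == "lifespan":
        await receive()  # lifespan.startup
        # Each lifespan starts from an empty namespace, so this holds
        # for every server instance.
        assert "foo" not in scope["state"], "ha!"
        scope["state"]["foo"] = scope["port"]  # "port" is a demo-only key
        await send({"type": "lifespan.startup.complete"})


async def run(port: int) -> Dict[str, Any]:
    """Stand-in for one server instance: it owns its own state dict."""
    state: Dict[str, Any] = {}

    async def receive() -> Dict[str, Any]:
        return {"type": "lifespan.startup"}

    async def send(event: Dict[str, Any]) -> None:
        assert event["type"] == "lifespan.startup.complete"

    await app({"type": "lifespan", "state": state, "port": port}, receive, send)
    return state


async def main() -> List[Dict[str, Any]]:
    return await asyncio.gather(run(8000), run(8001))


states = asyncio.run(main())
```

Both startups succeed because the state belongs to each server's lifespan rather than to the shared `app` object.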

adriangb avatar Jul 15 '22 06:07 adriangb

One alternative to this proposal is context variables. aiohttp makes strong promises about them: https://docs.aiohttp.org/en/stable/web_advanced.html#contextvars-support. If the ASGI spec was willing to make such strong promises to frameworks (which, yes, may limit frameworks in some ways), that would sorta resolve this issue.
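
As a rough illustration of the semantics this would give, the sketch below uses only the stdlib. It assumes a server that runs startup in the same context from which it later spawns request tasks, which is exactly the kind of promise the spec would have to make. `serve` and `handle_request` are made-up names.

```python
import asyncio
import contextvars
from typing import List

db_var: contextvars.ContextVar[List[str]] = contextvars.ContextVar("db")


async def handle_request(body: str) -> List[str]:
    db = db_var.get()    # inherited from the context that created this task
    db.append(body)      # mutating the shared object is visible everywhere
    db_var.set([body])   # rebinding only affects this task's copy of the context
    return db_var.get()


async def serve() -> List[str]:
    db_var.set([])  # "startup": same context that later spawns the requests
    # asyncio tasks copy the current context when they are created.
    await asyncio.create_task(handle_request("a"))
    await asyncio.create_task(handle_request("b"))
    return db_var.get()


final_db = asyncio.run(serve())
```

This mirrors the shallow-copy behavior of the proposed namespace: objects stored at startup are shared, while rebinding inside a request does not leak back out.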

adriangb avatar Jul 22 '22 00:07 adriangb

Here's a concrete example of an implementation: https://github.com/encode/uvicorn/pull/1582

adriangb avatar Jul 26 '22 04:07 adriangb

@andrewgodwin I think the main pushback here was the relationship between lifespans and event loops/threads/processes. Now that we've resolved that in #338, do you think we can continue this discussion?

adriangb avatar Nov 09 '22 07:11 adriangb

I think the next best step might be drafting what the spec change looks like in a PR?

andrewgodwin avatar Nov 10 '22 04:11 andrewgodwin

Sounds good to me!

adriangb avatar Nov 10 '22 05:11 adriangb

@andrewgodwin would you mind taking a look at #354? Thanks!

adriangb avatar Dec 15 '22 19:12 adriangb

I've looked at it already once, I just haven't been in the right headspace to sit down and think about it as a spec - might take a few weeks or so!

andrewgodwin avatar Dec 16 '22 23:12 andrewgodwin