djapy
Implement StreamingHttpResponse for Server Sent Events (EventSource) or ReadableStream
As discussed in an off-topic discussion in issue #32, djapify and async_djapify can be implemented to work with streams, with minor tweaks to ResponseParser, AsyncResponseParser, SyncDjapifyDecorator and AsyncDjapifyDecorator. It can be as easy as having the view function simply yield results, making it a generator, and return None when it is complete.
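To illustrate the idea, here is a minimal sketch of how a decorator could tell a yielding view apart from a regular one using only the standard library. The view functions and the classify_view helper are hypothetical names made up for this example; they are not part of djapy.

```python
import inspect


def regular_view(request):
    # An ordinary view: returns a (status, payload) pair once.
    return 200, {"message": "done"}


def streaming_view(request):
    # A sync generator view: yields one (status, payload) pair per chunk.
    for i in range(3):
        yield 200, {"count": i}


async def async_streaming_view(request):
    # An async generator view: same idea, but consumed with "async for".
    for i in range(3):
        yield 200, {"count": i}


def classify_view(view_func) -> str:
    """Return which response path a decorator could take for view_func."""
    if inspect.isasyncgenfunction(view_func):
        return "async-stream"
    if inspect.isgeneratorfunction(view_func):
        return "sync-stream"
    return "standard"


for view in (regular_view, streaming_view, async_streaming_view):
    print(view.__name__, "->", classify_view(view))
```

Calling a generator function only creates the generator object; its body does not run until something iterates over it, which is why the parsing has to happen inside a wrapper generator rather than in the decorator itself.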
Do note that, while StreamingHttpResponse supports synchronous views, a long-running synchronous stream ties up a worker process for its entire duration, and many WSGI deployments will time out and cut off the stream after about 30 seconds.
Here's the updated parse_data function from ResponseParser, along with three additional functions: _parse_data, a helper that eliminates repeated code; is_sse, which determines whether the parsed response calls for the Server Sent Events output format; and format_sse_response, which formats the output if so.
class ResponseParser(BaseParser):
    # Untouched functions omitted for clarity.

    def _parse_data(self, data: Any) -> Dict[str, Any]:
        """Helper function to parse and validate response data."""
        if isinstance(data, BaseModel):  # Direct return if Pydantic model
            return data.model_dump(mode="json", by_alias=True)
        model = self._create_model()
        validated = model.model_validate({JSON_OUTPUT_PARSE_NAME: data}, context={**self._context, "input_data": self.input_data})
        return validated.model_dump(mode="json", by_alias=True)[JSON_OUTPUT_PARSE_NAME]

    def parse_data(self) -> Dict[str, Any] | Generator:
        """Parse and validate response data."""
        if not inspect.isgenerator(self.data):
            # Standard HTTP response
            return self._parse_data(self.data)
        else:
            # Streaming HTTP response: wrap the generator and pass it along
            def wrapper():
                for partial_data in self.data:
                    # Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
                    partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]
                    yield self._parse_data(partial_data_content)
            return wrapper()

    def is_sse(self, response: Dict[str, Any]) -> bool:
        """Examine parsed schema to determine if Server Sent Events format is desired in place of Readable Stream format."""
        # Check for disallowed keys
        allowed_keys = {"data", "id", "event", "retry"}
        if any(key not in allowed_keys for key in response):
            return False
        # Check if "data" is a dictionary
        if "data" not in response or not isinstance(response["data"], dict):
            return False
        # Check if "id" is a string, an integer, or None
        if "id" in response and not (isinstance(response["id"], (str, int)) or response["id"] is None):
            return False
        # Check if "event" is a string or None
        if "event" in response and not (isinstance(response["event"], str) or response["event"] is None):
            return False
        # Check if "retry" is an integer or None
        if "retry" in response and not (isinstance(response["retry"], int) or response["retry"] is None):
            return False
        return True

    def format_sse_response(self, response: Dict[str, Any]) -> str:
        """If Server Sent Events format is desired, process the data into the appropriate format."""
        # Single quotes inside the f-strings keep this valid on Python versions before 3.12.
        response_string = ""
        if "id" in response and response["id"] is not None:
            response_string += f"id: {response['id']}\n"
        if "event" in response and response["event"] is not None:
            response_string += f"event: {response['event']}\n"
        if "retry" in response and response["retry"] is not None:
            response_string += f"retry: {response['retry']}\n"
        return response_string + f"data: {json.dumps(response['data'], cls=DjangoJSONEncoder)}\n\n"
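For reference, the wire format that format_sse_response targets can be sketched with the standard library alone. This is a simplified stand-in rather than the djapy code: format_sse and its parameter names are invented for the example, and it uses plain json.dumps instead of DjangoJSONEncoder.

```python
import json


def format_sse(data, event=None, event_id=None, retry=None) -> str:
    """Build one Server Sent Events frame: optional fields, then data, then a blank line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # json.dumps without indent never emits a raw newline, so one "data:" line is enough.
    lines.append(f"data: {json.dumps(data)}")
    # The empty line after the last field is what tells the client the event is complete.
    return "\n".join(lines) + "\n\n"


frame = format_sse({"test_data": "test string 0."}, event="update", event_id=0)
print(repr(frame))
```

Each frame ends with a blank line; without it the browser keeps buffering and never dispatches the event.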
AsyncResponseParser would then need to have parse_data updated to account for asynchronous views:
class AsyncResponseParser(ResponseParser):
    async def parse_data(self) -> Dict[str, Any] | AsyncGenerator:
        """Async version of response data parsing."""
        if not inspect.isasyncgen(self.data):
            # Standard HTTP response
            return await sync_to_async(self._parse_data)(self.data)
        else:
            # Streaming HTTP response: wrap the generator and pass it along
            _iterator = self.data  # Save a local copy in case it is re-assigned
            async def awrapper():
                async for partial_data in _iterator:
                    # Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
                    partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]
                    yield await sync_to_async(self._parse_data)(partial_data_content)
            return awrapper()
With the updated parsers, the decorators can now work with the parsed data:
class SyncDjapifyDecorator(BaseDjapifyDecorator):
    def __call__(self, view_func: WrappedViewT = None):
        if view_func is None:
            return lambda v: self.__call__(v)

        @wraps(view_func)
        def wrapped_view(request: HttpRequest, *args, **kwargs):
            self._prepare(view_func)
            if msg := self.check_access(request, view_func, *args, **kwargs):
                return msg
            try:
                if not inspect.isgeneratorfunction(view_func):
                    # Standard HTTP response
                    response = HttpResponse(content_type="application/json")
                else:
                    # Streaming HTTP response
                    response = StreamingHttpResponse(content_type="application/stream+json")
                # Use sync request parser
                req_p = RequestParser(request, view_func, kwargs)
                data = req_p.parse_data()
                if view_func.djapy_resp_param:
                    data[view_func.djapy_resp_param.name] = response
                content = view_func(request, *args, **data)
                # Use sync response parser
                res_p = ResponseParser(
                    request=request,
                    status=200 if not isinstance(content, tuple) else content[0],
                    data=content if not isinstance(content, tuple) else content[1],
                    schemas=view_func.schema,
                    input_data=data,
                )
                result = res_p.parse_data()
                if not inspect.isgenerator(result):
                    # Standard HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    response.content = json.dumps(result, cls=DjangoJSONEncoder)
                else:
                    # Streaming HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    # Wrap the generator and pass it along
                    def wrapper():
                        for partial_result in result:
                            if res_p.is_sse(partial_result):
                                # Server Sent Events response
                                response.content_type = "text/event-stream"
                                yield res_p.format_sse_response(partial_result)
                            else:
                                # Readable Stream response
                                yield json.dumps(partial_result, cls=DjangoJSONEncoder)
                    response.streaming_content = wrapper()
                return response
            except Exception as exc:
                return self.handle_error(request, exc)

        self._set_common_attributes(wrapped_view, view_func)
        return wrapped_view
class AsyncDjapifyDecorator(BaseDjapifyDecorator):
    def __call__(self, view_func: WrappedViewT = None):
        if view_func is None:
            return lambda v: self.__call__(v)
        # isasyncgenfunction must be called with view_func; a bare function reference is always truthy
        if not (asyncio.iscoroutinefunction(view_func) or inspect.isasyncgenfunction(view_func)):
            raise ValueError(f"View function {view_func.__name__} must be async")

        @wraps(view_func)
        async def wrapped_view(request: HttpRequest, *args, **kwargs):
            self._prepare(view_func)
            if msg := await sync_to_async(self.check_access)(request, view_func, *args, **kwargs):
                return msg
            try:
                if not inspect.isasyncgenfunction(view_func):
                    # Standard HTTP response
                    response = HttpResponse(content_type="application/json")
                else:
                    # Streaming HTTP response
                    response = StreamingHttpResponse(content_type="application/stream+json")
                # Use async request parser
                parser = AsyncRequestParser(request, view_func, kwargs)
                data = await parser.parse_data()
                if view_func.djapy_resp_param:
                    data[view_func.djapy_resp_param.name] = response
                if not inspect.isasyncgenfunction(view_func):
                    # Standard HTTP response: obtain the function's response asynchronously
                    content = await view_func(request, *args, **data)
                else:
                    # Streaming HTTP response: obtain the generator synchronously
                    content = view_func(request, *args, **data)
                # Use async response parser
                parser = AsyncResponseParser(
                    request=request,
                    status=200 if not isinstance(content, tuple) else content[0],
                    data=content if not isinstance(content, tuple) else content[1],
                    schemas=view_func.schema,
                    input_data=data,
                )
                result = await parser.parse_data()
                if not inspect.isasyncgen(result):
                    # Standard HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    response.content = json.dumps(result, cls=DjangoJSONEncoder)
                else:
                    # Streaming HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    # Wrap the generator and pass it along
                    async def awrapper():
                        async for partial_result in result:
                            if await sync_to_async(parser.is_sse)(partial_result):
                                # Server Sent Events response
                                response.content_type = "text/event-stream"
                                yield parser.format_sse_response(partial_result)
                            else:
                                # Readable Stream response
                                yield json.dumps(partial_result, cls=DjangoJSONEncoder)
                    response.streaming_content = awrapper()
                return response
            except Exception as exc:
                return await sync_to_async(self.handle_error)(request, exc)

        self._set_common_attributes(wrapped_view, view_func)
        return wrapped_view
Now, the user can implement their views as follows:
class IteratorSchema(Schema):
    test_data: str

@async_djapify
async def receive_updates_async(request) -> {200: IteratorSchema}:
    for i in range(10):
        yield 200, IteratorSchema(test_data=f"test string {i}.")
        await asyncio.sleep(1)
    return

@djapify
def receive_updates_sync(request) -> {200: IteratorSchema}:
    for i in range(10):
        yield 200, IteratorSchema(test_data=f"test string {i}.")
        time.sleep(1)
    return

class ServerSentEvents(Schema):
    # If the user wants to implement EventSource with the Server Sent Events formatting, note that only the data field
    # is required, and no additional fields can be included.
    id: str | int | None
    event: str | None
    data: IteratorSchema
    retry: int | None

@async_djapify
async def receive_updates_sse_async(request) -> {200: ServerSentEvents}:
    for i in range(10):
        yield 200, ServerSentEvents(
            data=IteratorSchema(test_data=f"test string {i}."),
            id=i,
            event=f"Event #{i}",
            retry=None,
        )
        await asyncio.sleep(1)
    return

@djapify
def receive_updates_sse_sync(request) -> {200: ServerSentEvents}:
    for i in range(10):
        yield 200, ServerSentEvents(
            data=IteratorSchema(test_data=f"test string {i}."),
            id=i,
            event=f"Event #{i}",
            retry=None,
        )
        time.sleep(1)
    return
I've tested this code with Swagger, and everything works, except that it only displays the yielded data once the stream has completed, and it doesn't know how to process the data, so it shows it in raw format. This is a limitation of Swagger; an issue about it has been open since 2018 without being addressed.
I haven't tested this with yielding ORM Model or Queryset objects yet, but I suspect it would work since the underlying data parsing logic hasn't changed.
I'm also not familiar with the implications of data[view_func.djapy_resp_param.name] = response, or whether it matters that the content_type is changed afterward for Server Sent Events.
I just tested this with EventSource, and it aborts due to the content_type being "application/stream+json". Upon review, I realize that the line response.content_type = "text/event-stream" does nothing, since the generator cannot change the content type of the StreamingHttpResponse.
In this case, the parser's is_sse function is unnecessary and can be replaced with if response.content_type == "text/event-stream": and the determination of whether to use SSE or not needs to be done when initializing StreamingHttpResponse. This would also alleviate my last point in the original post.
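The ordering problem can be reproduced without Django. The sketch below is a deliberately simplified model: ToyStreamingResponse and send are made-up stand-ins for a response object and a server, and the only assumption carried over from the real thing is that headers go out before the body generator is iterated.

```python
class ToyStreamingResponse:
    """Stand-in for a streaming response object; this is not Django's class."""

    def __init__(self, body, content_type):
        self.body = body
        self.content_type = content_type


def send(response):
    """Mimic a server: write the header first, then iterate the body."""
    sent = [f"Content-Type: {response.content_type}"]
    for chunk in response.body:
        sent.append(chunk)
    return sent


response = ToyStreamingResponse(body=None, content_type="application/stream+json")


def body():
    # This line only runs once iteration starts, after the header was written.
    response.content_type = "text/event-stream"
    yield "data: {}\n\n"


response.body = body()
sent = send(response)
print(sent[0])                # header as it went out on the wire
print(response.content_type)  # attribute as changed by the generator, too late
```

The attribute does change, but the header line that was already emitted still carries the original value, which matches what EventSource observed.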
I suppose that the only way to make this determination would be to inspect the type hint of the decorated function to determine if it meets the schema necessary to implement Server Sent Events with content_type as "text/event-stream".
How can this be achieved?
EDIT 20 minutes later: It seems I can answer my own question. This can be achieved using typing's get_type_hints method.
Since format_sse_response and is_sse are not tied to the parser models, I can pull them out of the class. I will return with updated code.
Okay, here is the latest code.
djapy/core/sse.py
import json
from typing import Any, AsyncGenerator, Callable, Dict, Generator, get_args, get_origin, get_type_hints

from django.core.serializers.json import DjangoJSONEncoder
from pydantic import BaseModel


def _is_sse(schema: type[BaseModel]) -> bool:
    """Examine schema to determine if Server Sent Events format is desired in place of Readable Stream format."""
    # model_fields is the Pydantic v2 name; __fields__ is deprecated there
    schema_fields = schema.model_fields
    # Check to see if there are any invalid fields
    allowed_keys = {"data", "id", "event", "retry"}
    if any(key not in allowed_keys for key in schema_fields):
        return False
    # Ensure there is a 'data' field
    if "data" not in schema_fields:
        return False
    # Check to see if the schema defines the 'data' field to be either a dict, a Pydantic BaseModel schema or a
    # subclass of a Pydantic BaseModel schema, or a union of either
    data_type = schema_fields["data"].annotation
    origin_type = get_origin(data_type)
    args = get_args(data_type) if origin_type else (data_type,)
    if not any(issubclass(arg, BaseModel) for arg in args if isinstance(arg, type)) and dict not in args:
        return False
    # Check to see if the schema defines the 'id' field to be any combination of int, str, or None
    if "id" in schema_fields and schema_fields["id"].annotation not in (int, str, int | str, int | None, str | None, int | str | None):
        return False
    # Check to see if the schema defines the 'event' field to be any combination of str, or None
    if "event" in schema_fields and schema_fields["event"].annotation not in (str, str | None):
        return False
    # Check to see if the schema defines the 'retry' field to be any combination of int, or None
    if "retry" in schema_fields and schema_fields["retry"].annotation not in (int, int | None):
        return False
    return True


def get_content_type(generator_func: Callable[..., Generator | AsyncGenerator]) -> str:
    """Determine the content type based on the schema type hints."""
    type_hints = get_type_hints(generator_func)
    if not type_hints:
        return "application/stream+json"
    return_hint = type_hints.get('return', None)
    # If the return hint is a dictionary, check each schema in the dictionary
    if isinstance(return_hint, dict):
        for schema in return_hint.values():
            if isinstance(schema, type) and issubclass(schema, BaseModel) and _is_sse(schema):
                return "text/event-stream"
    # If the return hint is a single schema, check it directly
    elif isinstance(return_hint, type) and issubclass(return_hint, BaseModel) and _is_sse(return_hint):
        return "text/event-stream"
    return "application/stream+json"


def format_sse_response(response: Dict[str, Any]) -> str:
    """If Server Sent Events format is desired, process the data into the appropriate format."""
    # Single quotes inside the f-strings keep this valid on Python versions before 3.12.
    response_string = ""
    if "id" in response and response["id"] is not None:
        response_string += f"id: {response['id']}\n"
    if "event" in response and response["event"] is not None:
        response_string += f"event: {response['event']}\n"
    if "retry" in response and response["retry"] is not None:
        response_string += f"retry: {response['retry']}\n"
    return response_string + f"data: {json.dumps(response['data'], cls=DjangoJSONEncoder)}\n\n"
djapy/core/parser.py
import inspect
from typing import Any, AsyncGenerator, Dict, Generator

from asgiref.sync import sync_to_async
from pydantic import BaseModel
# Other imports and classes omitted for clarity


class ResponseParser(BaseParser):
    # __init__ and _create_model omitted for clarity

    def _parse_data(self, data: Any) -> Dict[str, Any]:
        """Helper function to parse and validate response data."""
        if isinstance(data, BaseModel):  # Direct return if Pydantic model
            return data.model_dump(mode="json", by_alias=True)
        model = self._create_model()
        validated = model.model_validate({JSON_OUTPUT_PARSE_NAME: data}, context={**self._context, "input_data": self.input_data})
        return validated.model_dump(mode="json", by_alias=True)[JSON_OUTPUT_PARSE_NAME]

    def parse_data(self) -> Dict[str, Any] | Generator:
        """Parse and validate response data."""
        if not inspect.isgenerator(self.data):
            # Standard HTTP response
            return self._parse_data(self.data)
        else:
            # Streaming HTTP response: wrap the generator and pass it along
            def wrapper():
                for partial_data in self.data:
                    # Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
                    partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]
                    yield self._parse_data(partial_data_content)
            return wrapper()


class AsyncResponseParser(ResponseParser):
    async def parse_data(self) -> Dict[str, Any] | AsyncGenerator:
        """Async version of response data parsing."""
        if not inspect.isasyncgen(self.data):
            # Standard HTTP response
            return await sync_to_async(self._parse_data)(self.data)
        else:
            # Streaming HTTP response: wrap the generator and pass it along
            _iterator = self.data  # Save a local copy in case it is re-assigned
            async def awrapper():
                async for partial_data in _iterator:
                    # Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
                    partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]
                    yield await sync_to_async(self._parse_data)(partial_data_content)
            return awrapper()
djapy/core/dec/sync_dec.py
import inspect
import json
from functools import wraps

from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse

from .base_dec import BaseDjapifyDecorator
from ..parser import RequestParser, ResponseParser
from ..view_func import WrappedViewT
from ..sse import format_sse_response, get_content_type


class SyncDjapifyDecorator(BaseDjapifyDecorator):
    def __call__(self, view_func: WrappedViewT = None):
        if view_func is None:
            return lambda v: self.__call__(v)

        @wraps(view_func)
        def wrapped_view(request: HttpRequest, *args, **kwargs):
            self._prepare(view_func)
            if msg := self.check_access(request, view_func, *args, **kwargs):
                return msg
            try:
                if not inspect.isgeneratorfunction(view_func):
                    # Standard HTTP response
                    response = HttpResponse(content_type="application/json")
                else:
                    # Streaming HTTP response
                    response = StreamingHttpResponse(content_type=get_content_type(view_func))
                # Use sync request parser
                req_p = RequestParser(request, view_func, kwargs)
                data = req_p.parse_data()
                if view_func.djapy_resp_param:
                    data[view_func.djapy_resp_param.name] = response
                content = view_func(request, *args, **data)
                # Use sync response parser
                res_p = ResponseParser(
                    request=request,
                    status=200 if not isinstance(content, tuple) else content[0],
                    data=content if not isinstance(content, tuple) else content[1],
                    schemas=view_func.schema,
                    input_data=data,
                )
                result = res_p.parse_data()
                if not inspect.isgenerator(result):
                    # Standard HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    response.content = json.dumps(result, cls=DjangoJSONEncoder)
                else:
                    # Streaming HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    # Wrap the generator and pass it along
                    def wrapper():
                        for partial_result in result:
                            if response.headers["Content-Type"] == "text/event-stream":
                                # Server Sent Events response
                                yield format_sse_response(partial_result)
                            else:
                                # Readable Stream response
                                yield json.dumps(partial_result, cls=DjangoJSONEncoder)
                    response.streaming_content = wrapper()
                return response
            except Exception as exc:
                return self.handle_error(request, exc)

        self._set_common_attributes(wrapped_view, view_func)
        return wrapped_view
djapy/core/dec/async_dec.py
import inspect
import json
import asyncio
from functools import wraps

from asgiref.sync import sync_to_async
from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse

from .base_dec import BaseDjapifyDecorator
from ..parser import AsyncRequestParser, AsyncResponseParser
from ..view_func import WrappedViewT
from ..sse import format_sse_response, get_content_type


class AsyncDjapifyDecorator(BaseDjapifyDecorator):
    def __call__(self, view_func: WrappedViewT = None):
        if view_func is None:
            return lambda v: self.__call__(v)
        # isasyncgenfunction must be called with view_func; a bare function reference is always truthy
        if not (asyncio.iscoroutinefunction(view_func) or inspect.isasyncgenfunction(view_func)):
            raise ValueError(f"View function {view_func.__name__} must be async")

        @wraps(view_func)
        async def wrapped_view(request: HttpRequest, *args, **kwargs):
            self._prepare(view_func)
            if msg := await sync_to_async(self.check_access)(request, view_func, *args, **kwargs):
                return msg
            try:
                if not inspect.isasyncgenfunction(view_func):
                    # Standard HTTP response
                    response = HttpResponse(content_type="application/json")
                else:
                    # Streaming HTTP response
                    response = StreamingHttpResponse(content_type=await sync_to_async(get_content_type)(view_func))
                # Use async request parser
                parser = AsyncRequestParser(request, view_func, kwargs)
                data = await parser.parse_data()
                if view_func.djapy_resp_param:
                    data[view_func.djapy_resp_param.name] = response
                if not inspect.isasyncgenfunction(view_func):
                    # Standard HTTP response: obtain the function's response asynchronously
                    content = await view_func(request, *args, **data)
                else:
                    # Streaming HTTP response: obtain the generator synchronously
                    content = view_func(request, *args, **data)
                # Use async response parser
                parser = AsyncResponseParser(
                    request=request,
                    status=200 if not isinstance(content, tuple) else content[0],
                    data=content if not isinstance(content, tuple) else content[1],
                    schemas=view_func.schema,
                    input_data=data,
                )
                result = await parser.parse_data()
                if not inspect.isasyncgen(result):
                    # Standard HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    response.content = json.dumps(result, cls=DjangoJSONEncoder)
                else:
                    # Streaming HTTP response
                    if isinstance(content, tuple):
                        response.status_code = content[0]
                    # Wrap the generator and pass it along
                    async def awrapper():
                        async for partial_result in result:
                            if response.headers["Content-Type"] == "text/event-stream":
                                # Server Sent Events response
                                yield format_sse_response(partial_result)
                            else:
                                # Readable Stream response
                                yield json.dumps(partial_result, cls=DjangoJSONEncoder)
                    response.streaming_content = awrapper()
                return response
            except Exception as exc:
                return await sync_to_async(self.handle_error)(request, exc)

        self._set_common_attributes(wrapped_view, view_func)
        return wrapped_view
Usage:
- Asynchronous and synchronous Readable Stream format with content type of "application/stream+json":
import asyncio
import time

from djapy import async_djapify, djapify, Schema
from pydantic import BaseModel


class IteratorSchema(Schema):
    test_data: str

@async_djapify
async def receive_updates_async(request) -> {200: IteratorSchema}:
    for i in range(10):
        yield 200, IteratorSchema(test_data=f"test string {i}.")
        await asyncio.sleep(1)

@djapify
def receive_updates_sync(request) -> {200: IteratorSchema}:
    for i in range(10):
        yield 200, IteratorSchema(test_data=f"test string {i}.")
        time.sleep(1)
- Asynchronous and synchronous Server Sent Events format with content type of "text/event-stream":
import asyncio
import time

from djapy import async_djapify, djapify, Schema
from pydantic import BaseModel


class IteratorSchema(Schema):
    test_data: str

class ServerSentEvents(Schema):
    # If the user wants to implement EventSource with the Server Sent Events formatting, note that only the data field
    # is required, and no additional fields can be included.
    id: str | int | None
    event: str | None
    data: dict | BaseModel | IteratorSchema
    retry: int | None

@async_djapify
async def receive_updates_sse_async(request) -> {200: ServerSentEvents}:
    for i in range(10):
        yield 200, ServerSentEvents(
            data=IteratorSchema(test_data=f"test string {i}."),
            id=i,
            event=f"Event #{i}",
            retry=5000,
        )
        await asyncio.sleep(1)
    yield ServerSentEvents(data={"message": "streaming ended"}, id=None, event=None, retry=None)

@djapify
def receive_updates_sse_sync(request) -> {200: ServerSentEvents}:
    for i in range(10):
        yield 200, ServerSentEvents(
            data=IteratorSchema(test_data=f"test string {i}."),
            id=i,
            event=f"Event #{i}",
            retry=5000,
        )
        time.sleep(1)
    yield ServerSentEvents(data={"message": "streaming ended"}, id=None, event=None, retry=None)
Note that the ServerSentEvents schema can include or omit any of the listed fields other than data, and the type hints can be any combination of those listed, where IteratorSchema is a subclass of BaseModel (as is djapy's Schema).
I've tested this in swagger, and it all comes out just fine, except the stream needs to complete before showing any data, and it doesn't know how to process Readable Streams, so it just displays the raw data.
To test the Server Sent Events, one can run this code in the browser's developer console:
let source = new EventSource("http://localhost:8000/notifications/updates/");
source.onopen = function(event) {
  console.log("Connection to server opened.");
};
source.onmessage = function(event) {
  console.log("Received message:", event.data);
  if (event.data.includes("streaming ended")) {
    source.close(); // Close the connection when the stream ends
    console.log("Stream ended, connection closed.");
  }
};
source.onerror = function(event) {
  console.error("EventSource error:", event);
  // readyState lives on the EventSource object, not on the error event
  if (source.readyState === EventSource.CLOSED) {
    console.log("Connection was closed.");
  }
};
It would be prudent to implement some error handling, but this works for my use case at the moment.
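As one possible direction for that error handling: in the decorators above, the try/except only covers creating the generator, so an exception raised while the stream is being iterated would not reach handle_error. A minimal sketch of catching such failures inside the stream itself, using only the standard library, might look like this. The names sse_with_error_event and flaky_source, and the "stream-error" event name, are invented for the example.

```python
import json


def sse_with_error_event(events):
    """Yield SSE frames; if the source fails mid-stream, emit one final error frame."""
    try:
        for payload in events:
            yield f"data: {json.dumps(payload)}\n\n"
    except Exception as exc:
        # Headers are already sent at this point, so the failure is reported in-band.
        detail = {"message": "stream aborted", "error": type(exc).__name__}
        yield f"event: stream-error\ndata: {json.dumps(detail)}\n\n"


def flaky_source():
    # Stand-in for a view generator that fails after its first item.
    yield {"test_data": "test string 0."}
    raise RuntimeError("backend went away")


frames = list(sse_with_error_event(flaky_source()))
for frame in frames:
    print(repr(frame))
```

On the browser side, a named event such as this one is not delivered to onmessage; the client would need source.addEventListener("stream-error", ...) to receive it.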
Hi @philrdubois! Thank you for this detailed analysis of implementing streaming responses in djapy. Your approach shows careful consideration of the technical challenges.
I particularly like your suggestions about:
- Supporting both SSE and JSON streaming formats
- Having generator functions yield status codes with data
- Keeping ResponseParser flexible for sync/async cases
Would you be interested in creating a PR? I'd suggest focusing on:
- Core streaming decorator implementation
- Tests for both SSE and JSON streaming
- Basic documentation with usage examples
- Type hints and docstrings
I think extracting common validation logic and adding specific streaming-related error types would help keep the code clean and maintainable.
Let me know if you'd like to collaborate on this - it would be a great addition to djapy!
Hi @Bishwas-py, as much as I enjoyed working on this and am glad to contribute, not being a developer myself, I have zero knowledge or experience regarding git, pull requests, or even tests for that matter. I can expand on the usage examples and provide type hints and docstrings, but that's the extent of what I can do. So I'm a bit lost as to where to begin.
As for your last suggestion, it sounds good to me -- I'm not too fond of having the validation inside the generator function either but couldn't think of a way to improve it at the time. Do you have any suggestions? I also haven't completely considered all error types.