feat: add `callback` api to `WebSocketProxy`
- bump
typing_extensionsfrom4.5.0to4.12.0 - explicitly add
anyioas a dependency - fix docs
hosttypo - fix broken
httpxauthentication docs link - refactor
tool_4_test_fixtureto factory to support testing different ws apps
Summary
Close: #40
example
proxy server
from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI
from fastapi_proxy_lib.core.websocket import (
CallbackPipeContextType,
ReverseWebSocketProxy,
)
from httpx import AsyncClient
from starlette.websockets import WebSocket
proxy = ReverseWebSocketProxy(AsyncClient(), base_url="ws://echo.websocket.events/")
async def client_to_server_callback(pipe_context: CallbackPipeContextType[str]) -> None:
with pipe_context as (sender, receiver):
async for message in receiver:
print(f"Received from client: {message}")
await sender.send(f"CTS:{message}")
print("client_to_server_callback end")
async def server_to_client_callback(
pipe_context: CallbackPipeContextType[str],
) -> None:
with pipe_context as (sender, receiver):
async for message in receiver:
print(f"Received from server: {message}")
await sender.send("STC:{message}")
print("server_to_client_callback end")
@asynccontextmanager
async def close_proxy_event(_: FastAPI) -> AsyncIterator[None]:
"""Close proxy."""
yield
await proxy.aclose()
app = FastAPI(lifespan=close_proxy_event)
@app.websocket("/{path:path}")
async def _(websocket: WebSocket, path: str = ""):
return await proxy.proxy(
websocket=websocket,
path=path,
client_to_server_callback=client_to_server_callback,
server_to_client_callback=server_to_client_callback,
)
# Then run shell: `uvicorn <your_py>:app --host 127.0.0.1 --port 8000`
# visit the app: `ws://127.0.0.1:8000/`
# you can establish websocket connection with `ws://echo.websocket.events`
client
from httpx_ws import aconnect_ws
async with aconnect_ws('ws://127.0.0.1:8000/') as ws:
message = await ws.receive_text()
print(f"Received: {message}")
await ws.send_text('Hello, World!')
message = await ws.receive_text()
print(f"Received: {message}")
Checklist
- [ ] I've read
CONTRIBUTING.md. - [x] I understand that this PR may be closed in case there was no previous discussion. (This doesn't apply to typos!)
- [x] I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
- [x] I've updated the documentation accordingly.
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 97.13%. Comparing base (
977d9c1) to head (bda8b06). Report is 1 commits behind head on main.
Additional details and impacted files
@@ Coverage Diff @@
## main #41 +/- ##
==========================================
+ Coverage 96.74% 97.13% +0.39%
==========================================
Files 9 9
Lines 461 524 +63
Branches 67 74 +7
==========================================
+ Hits 446 509 +63
Misses 9 9
Partials 6 6
:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.
CC @IvesAwadi
It's almost done. Would you like to give it a try?
pip install git+https://github.com/WSH032/fastapi-proxy-lib.git@feat/ws-callback
This will pull the latest fastapi-proxy-lib from this PR.
Before merging, there are a few things that need to be addressed:
- Add tests
- Write documentation
And, I'm a bit unsatisfied with the API; is client_to_server_callback too long?
CC @IvesAwadi
It's almost done. Would you like to give it a try?
pip install git+https://github.com/WSH032/fastapi-proxy-lib.git@feat/ws-callbackThis will pull the latest
fastapi-proxy-libfrom this PR.Before merging, there are a few things that need to be addressed:
- Add tests
- Write documentation
And, I'm a bit unsatisfied with the API; is
client_to_server_callbacktoo long?
It works perfectly, no issues on my end using Python 3.11.9. The client_to_server_callback I'd argue isn't long at all, the name make sense in this context.
I just found a bug. The current implementation only supports a strict one-receive-one-send mode within a single loop. If this pattern is violated, such as multiple receives and one send, one receive and multiple sends, or sending before receiving within a single loop, it will result in a deadlock.
async def callback(ctx: CallbackPipeContextType[str]) -> None:
with ctx as (sender, receiver):
# multiple receives and one send, dead lock!
await receiver.receive()
await receiver.receive()
await sender.send("foo")
async def callback(ctx: CallbackPipeContextType[str]) -> None:
with ctx as (sender, receiver):
# one receive and multiple sends, dead lock!
async for message in receiver:
await sender.send("foo")
await sender.send("bar")
async def callback(ctx: CallbackPipeContextType[str]) -> None:
with ctx as (sender, receiver):
# sending before receiving, dead lock!
await sender.send("foo")
async for message in receiver:
await sender.send(message)
Unfortunately, we can't resolve this issue until the underlying logic is rewritten using anyio and memory-object-streams.
There is already a PR for anyio: #34. However, this PR still has many issues, and I currently don't have time to merge it.
Edit:
I just created an issue to track this bug. #42
@IvesAwadi. This PR is completed. Do you think there are any areas that need improvement? If not, I will merge this PR tomorrow.
As a non-native English speaker, Iām unsure about the quality of the API documentation (including inline docstrings). I would greatly appreciate any suggestions for improvement.
If you want to preview the documentation in advance, you can follow the instructions in the CONTRIBUTING.