taipy icon indicating copy to clipboard operation
taipy copied to clipboard

[QUESTION] Implementation of a console which shows work progress from a FastAPI backend via websocket - slow

Open dkersh opened this issue 1 year ago • 3 comments

What would you like to share or ask?

Hello, i asked this question on discord and was recommended to post it here. I have a working solution but the UI updates very slowly so any recommendations on how to improve this implementation would be greatly appreciated. First, the two code files:

Taipy Frontend

import threading

import requests
from taipy import Gui
from taipy.gui import Markdown
from websocket import WebSocket

console = "Websocket console implementation"

home_page_md = Markdown(
    """
# Taipy Websockets via FastAPI
<|ping do_work backend|button|on_action={ping_endpoint}|>
<h1>Console</h1>
<|{console}|text|mode=md|class_name=console|>

""",
    style={
        ".taipy-text.console": {
            "color": "black",
            "background-color": "white",
            "display": "block",
            "word-wrap": "break-word",
            "white-space": "pre-line",
        }
    },
)


def websocket_client(app):
    global console
    ws = WebSocket()
    ws.connect("ws://127.0.0.1:8000/ws")
    while True:
        message = ws.recv()
        console += "\n" + message
        app.broadcast_change("console", console)
        print("Received:", message)


def ping_endpoint(state):
    response = requests.get(url="http://127.0.0.1:8000/work_endpoint")
    print(response.content)


app = Gui(page=home_page_md)

thread = threading.Thread(target=websocket_client, args=[app])
thread.start()
app.run(use_reloader=False, port="auto")

FastAPI backend

import time

from broadcaster import Broadcast
from fastapi import FastAPI, WebSocket

app = FastAPI()
broadcast = Broadcast("memory://")


@app.on_event("startup")
async def startup():
    await broadcast.connect()


@app.on_event("shutdown")
async def shutdown():
    await broadcast.disconnect()


@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    async with broadcast.subscribe(channel="notifications") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


@app.get("/work_endpoint")
async def do_work():
    print("starting work")
    for i in range(10):
        await broadcast.publish(channel="notifications", message=f"performing iteration {i}/10")
        print(i)
        time.sleep(1)
    return {"response": "ping received"}

Explanation and Problem

I'm trying to have a Taipy text element update in real-time as my fastapi backend does work (be it processing, data acquisition etc); I'm using taipy almost exclusively as a package for creating a pretty frontend (and it's great for that).

The solution I've given works, but it's very slow. I can monitor the progress of the work_endpoint in the terminal, and the console text element on the Taipy frontend does reflect the progress (eventually), but it updates very slowly (a few seconds) and definitely doesn't give the effect of "real-time".

I used the example in here to work out how to update the console text element but I'm wondering if that's a bit slow.

Any suggestions on how to improve this implementation would be much appreciated - I've been stuck on this for a couple of nights now.

[edit] I could potentially take advantage of some of the callbacks listed here so I'll have to look at that some more.

Many thanks

Code of Conduct

  • [X] I have checked the existing issues.
  • [X] I am willing to work on this issue (optional)

dkersh avatar Oct 31 '24 15:10 dkersh

Thank you, @dkersh, for this very well-described issue. @FabienLelaquais is the right person to help you with that.

jrobinAV avatar Nov 08 '24 14:11 jrobinAV

@FlorianJacta Can you help as well?

jrobinAV avatar Dec 16 '24 11:12 jrobinAV

Here is an update code with 4.0 and websockets. However, I did not succeed; I don't have the real time update of the console. Taipy waits for all the messages to send to the app:

Taipy frontend:

import asyncio
import threading
import requests
from taipy import Gui
from taipy.gui import Markdown, get_state_id, invoke_callback
import websockets
from threading import Thread


# Shared console variable and lock for thread safety
console = "WebSocket console implementation"
console_lock = threading.Lock()

# Taipy Markdown page
home_page_md = Markdown(
    """
# Taipy Websockets via FastAPI
<|Ping Backend|button|on_action={ping_endpoint}|>
<h1>Console</h1>
<|{console}|text|mode=md|class_name=console|>
""",
    style={
        ".taipy-text.console": {
            "color": "black",
            "background-color": "white",
            "display": "block",
            "word-wrap": "break-word",
            "white-space": "pre-line",
        }
    },
)


def change_console(state, console):
    print("Changing console... with  ", console)
    state.console = console
    print("Changed console:", state.console)


def async_callback_in_thread(gui, state_id, console):
    print(state_id, console)
    invoke_callback(gui, state_id, change_console, [console])


def async_callback(state, console):
    thread = Thread(
        target=async_callback_in_thread,
        args=[state.get_gui(), get_state_id(state), console],
    )
    thread.start()


# WebSocket client function (asynchronous)
async def websocket_client(app: Gui):
    global console
    try:
        async with websockets.connect("ws://127.0.0.1:8080/ws") as websocket:
            print("Connected to WebSocket server.")
            while True:
                message = await websocket.recv()
                with console_lock:
                    console += "\n" + message
                app.broadcast_callback(async_callback, [console])
                print("Received:", message)
    except Exception as e:
        print("WebSocket client error:", e)


# Wrapper for running the async WebSocket client in a thread
def run_websocket_client(app):
    asyncio.run(websocket_client(app))


# Endpoint ping function
def ping_endpoint(state):
    try:
        response = requests.get(url="http://127.0.0.1:8080/work_endpoint")
        print("Backend Response:", response.content.decode())
    except Exception as e:
        print("Error pinging backend:", e)


# Initialize Taipy GUI
app = Gui(page=home_page_md)

# Start WebSocket client in a separate thread
thread = threading.Thread(target=run_websocket_client, args=[app], daemon=True)
thread.start()

# Run Taipy GUI
app.run(use_reloader=False, port="auto")

FastAPI backend

import asyncio
from broadcaster import Broadcast
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles


# FastAPI app and broadcaster
app = FastAPI()
broadcast = Broadcast("memory://")


app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/favicon.ico")
async def favicon():
    return {"message": "No favicon provided!"}


@app.on_event("startup")
async def startup():
    await broadcast.connect()


@app.on_event("shutdown")
async def shutdown():
    await broadcast.disconnect()


@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    print("WebSocket connected")
    await websocket.accept()
    async with broadcast.subscribe(channel="notifications") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


@app.get("/")
async def read_root():
    return {"message": "Welcome to the WebSocket server!"}


@app.get("/work_endpoint")
async def do_work():
    print("Starting work")
    for i in range(10):
        await broadcast.publish(
            channel="notifications", message=f"Performing iteration {i}/10"
        )
        print(f"Iteration {i}")
        await asyncio.sleep(1)  # Use asyncio.sleep for non-blocking behavior
    return {"response": "Ping received"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8080)

FlorianJacta avatar Dec 16 '24 16:12 FlorianJacta