fastrtc icon indicating copy to clipboard operation
fastrtc copied to clipboard

Unable to process audio-video modality properly

Open niranjanakella opened this issue 9 months ago • 1 comments

The code samples and informaiton regarding the audio-video modality is very limited @freddyaboulton. I am trying to build a AWS bedrock flow with this and I did try to wrap my head around the gemini audio-video chat that you had. But clearly am unable to understand the flow. Below is my code sample, just am not able to understand how to make it work at all. And I strongly feed that you should make this audio-video thing work with open LLMs and vLLM locally if possible with examples.

import os
import time, boto3, json
import base64
from io import BytesIO
import asyncio

import gradio as gr
import numpy as np
from dotenv import load_dotenv
# from elevenlabs import ElevenLabs
from fastapi import FastAPI
from fastrtc import (
    AdditionalOutputs,
    AsyncAudioVideoStreamHandler,
    ReplyOnPause,
    Stream,
    get_stt_model,
    get_twilio_turn_credentials,
    wait_for_item,
)
from gradio.utils import get_space
from groq import Groq
from numpy.typing import NDArray
from PIL import Image

load_dotenv()
# groq_client = Groq()
# tts_client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY"))
stt_model = get_stt_model()

polly = boto3.client('polly')
brk = boto3.client('bedrock-runtime')

def encode_image(data: np.ndarray) -> str:
    # Accepts np.ndarray (H, W, 3), returns base64 JPEG string
    with BytesIO() as output_bytes:
        pil_image = Image.fromarray(data.astype(np.uint8))
        pil_image.save(output_bytes, "JPEG")
        bytes_data = output_bytes.getvalue()
    return base64.b64encode(bytes_data).decode("utf-8")

class LLMVoiceVideoHandler(AsyncAudioVideoStreamHandler):
    def __init__(self):
        super().__init__(
            expected_layout="mono",
            output_sample_rate=16000,
            input_sample_rate=16000,
        )
        self.latest_video_frame = None
        self.chatbot = []
        self.audio_queue = asyncio.Queue()
        self.video_queue = asyncio.Queue()
        self.tts_queue = asyncio.Queue()
        self.quit = asyncio.Event()
        self.processing = False

    def copy(self):
        return LLMVoiceVideoHandler()

    async def start_up(self):
        # Start background task to process audio and video
        # TTS
        polly_response = polly.synthesize_speech(
            Text="Hello Welcome to the World",
            OutputFormat='pcm',
            VoiceId='Joanna'
        )
        stream = polly_response['AudioStream']

        chunk_size = 10000  # bytes; adjust as needed for your use case

        # for i in range(1000000):  # Large upper bound to avoid infinite loop
        #     chunk = stream.read(chunk_size)
        #     if not chunk:
        #         break

        audio_array = np.frombuffer(stream.read(), dtype=np.int16).reshape(1, -1)
        self.tts_queue.put_nowait(audio_array)

        # --- End handle_interaction logic ---
        self.processing = False
    
        while not self.quit.is_set():
            # Wait for audio (simulate pause detection by chunk size or time)
            audio_chunk = await self.audio_queue.get()
            if not self.processing:
                self.processing = True

                sr = audio_chunk[0]
                audio_np = audio_chunk[1]
                audio = (sr, audio_np)

                # Transcribe
                text = stt_model.stt(audio)
                if text == "":
                    self.processing = False
                    # await asyncio.sleep(1)
                    continue

                print("transcribed", text)
                self.chatbot.append({"role": "user", "content": text})
                await self.tts_queue.put(AdditionalOutputs(self.chatbot))
                # Prepare Bedrock payload
                messages = [{"role": d["role"], "content": d["content"]} for d in self.chatbot]
                payload = {
                    "messages": messages,
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 4096,
                    'temperature': 0.8,
                    'top_k': 250,
                    'top_p': 0.999,
                }
                # Attach image if available
                if self.latest_video_frame is not None:
                    payload["messages"][-1] = {"role": "user", 
                                                "content": [{
                                                    "type": "image",
                                                    "source": {
                                                        "type": "base64",
                                                        "media_type": "image/jpeg",
                                                        "data": encode_image(self.latest_video_frame)
                                                    }
                                                },{
                                                    "type": "text",
                                                    "text": payload["messages"][-1]["content"]
                                                    }
                                                ]

                                                    }

                response = brk.invoke_model(
                    modelId="anthropic.claude-3-haiku-20240307-v1:0",
                    body=json.dumps(payload),
                    contentType="application/json"
                )
                response_body = json.loads(response.get('body').read())
                response_text = response_body['content'][-1]['text']
                self.chatbot.append({"role": "assistant", "content": response_text})
                # TTS
                polly_response = polly.synthesize_speech(
                    Text=response_text,
                    OutputFormat='pcm',
                    VoiceId='Joanna'
                )
                stream = polly_response['AudioStream']
                chunk_size = 10000
                i = 0
                while True:
                    chunk = stream.read(chunk_size)
                    if not chunk:
                        break
                    if i == 0:
                        await self.tts_queue.put(AdditionalOutputs(self.chatbot))
                    audio_array = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1)
                    await self.tts_queue.put((16000, audio_array))
                    i += 1
                # --- End handle_interaction logic ---
                self.processing = False

    # Audio handling
    async def receive(self, frame: tuple[int, NDArray[np.int16 | np.float32]]):
        # Buffer audio for STT
        # _, array = frame
        # array = array.squeeze()
        self.audio_queue.put_nowait(frame)

    async def emit(self):
        # Emit TTS audio chunks if available
        array = await wait_for_item(self.tts_queue, 0.01)
        if array is not None:
            return (self.output_sample_rate, array)
        return array

    # Video handling
    async def video_receive(self, frame: np.ndarray):
        self.video_queue.put_nowait(frame)
        # Store the latest video frame
        self.latest_video_frame = frame

    async def video_emit(self):
        frame = await wait_for_item(self.video_queue, 0.01)
        if frame is not None:
            return frame
        else:
            # Return a blank frame (e.g., 100x100 black image)
            return np.zeros((100, 100, 3), dtype=np.uint8)

    async def shutdown(self) -> None:
        # if self.session:
        self.quit.set()
            # await self.session.close()
        self.quit.clear()
            
chatbot = gr.Chatbot(type="messages")
stream = Stream(
    modality="audio-video",
    mode="send-receive",
    handler=LLMVoiceVideoHandler(),
    additional_outputs_handler=lambda a, b: b,
    rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
    concurrency_limit=5 if get_space() else None,
    time_limit=90 if get_space() else None,
    ui_args={"title": "LLM Voice+Video Chat (Bedrock, Polly, WebRTC ⚡️)"},
)

# Mount the STREAM UI to the FastAPI app
# Because I don't want to build the UI manually
# app = FastAPI()
# app = gr.mount_gradio_app(app, stream.ui, path="/")


if __name__ == "__main__":
    # import os

    # os.environ["GRADIO_SSR_MODE"] = "false"
    # stream.fastphone(host="0.0.0.0", port=7860)
    stream.ui.launch(server_port=7860)

I tried my level best but unable to make it work. The audio transcription always outputs '' empty string because the audio samples are way small. It worked well when only doing the audio modality but with audio-video I am unable to understand the flow. And trust me when I say this is for a bigger agentic project and not just some side project.

@mhart @freddyaboulton Please kindly help me out.

niranjanakella avatar Jun 26 '25 10:06 niranjanakella

@freddyaboulton Any idea how to host a local or bedrock LLM for this purpose like I am trying to do? Should have have a different session created with the audio stream coming in ? or something else. Please help. Thank you.

May be is it possible to like stream the audio frames to a ReplyOnPause things with VAD and then if there is some activity then we would trigger the LLM flow?

niranjanakella avatar Jun 27 '25 11:06 niranjanakella