
Interruption handling over FastAPIWebsocket not working when Bot speaking

Open samthnkr opened this issue 4 months ago • 2 comments

pipecat version

0.0.73

Python version

3.12

Operating System

macOS

Question

While using Pipecat for a voice bot application with client-server interaction over WebSocket, "user started speaking" messages do not appear in the logs, so real-time barge-ins are not happening. Does Pipecat's FastAPI WebSocket-based communication have any built-in feature that captures real-time user interruptions, cancels the running task, and sends a "User Speaking" message to the client?

What I've tried

So far, we have wrapped the existing library methods to send "USER_SPEAKING" messages to the client, but this adds latency (around 3-5 seconds). The following patches send audio packets asynchronously and send the "USER_SPEAKING" signal over the WebSocket in real time.

# The default behavior of sending audio can block the server from receiving
# user audio, preventing interruptions. We replace the transport's audio
# writing method with a non-blocking "fire-and-forget" version.
# --------------------------------------------------------------------------
async def non_blocking_write_audio_frame(frame: OutputAudioRawFrame):
    data = await transport._params.serializer.serialize(frame)
    if data:
        # Schedule the send operation to run in the background without blocking.
        # asyncio.create_task(websocket.send_bytes(data))
        asyncio.create_task(transport._client.send(data))

transport.output().write_audio_frame = non_blocking_write_audio_frame
# --------------------------------------------------------------------------

# <<< NEW: SEND USER_SPEAKING SIGNAL TO CLIENT >>>
# We override the method that handles user interruptions to also send a
# text message over the websocket when the user starts speaking.
# --------------------------------------------------------------------------
# Get a reference to the original method from the transport's input handler.
original_handle_user_interruption = transport.input()._handle_user_interruption

async def handle_user_interruption_with_signal(frame: Frame):
    # First, run the original logic to ensure interruptions and other
    # pipeline events work as expected.
    await original_handle_user_interruption(frame)

    # Now, add our custom logic.
    if isinstance(frame, UserStartedSpeakingFrame):
        logger.debug("Sending 'USER_SPEAKING' signal to client")
        try:
            # We directly use the underlying websocket object to send a
            # text message, bypassing the transport's binary serializer.
            await websocket.send_text("USER_SPEAKING")
        except Exception as e:
            logger.error(f"Failed to send USER_SPEAKING signal: {e}")

# Apply the patch to the input transport.
transport.input()._handle_user_interruption = handle_user_interruption_with_signal
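
One thing to watch in the first patch above: it starts a new task per frame with asyncio.create_task and keeps no reference to it. The asyncio documentation recommends holding a reference so a task is not garbage collected mid-execution, and independent tasks may not complete in the order they were created. A minimal sketch of an alternative, assuming a single background task that drains a queue, is shown below. `OrderedAudioSender`, `fake_send`, and `demo` are hypothetical names for illustration, not Pipecat or FastAPI APIs, and whether this helps with the latency reported here is untested.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


class OrderedAudioSender:
    """Hypothetical helper: one background task sends queued chunks in order."""

    def __init__(self, send: Callable[[bytes], Awaitable[None]]) -> None:
        self._send = send
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Keep a reference to the task so it cannot be garbage collected.
        self._task = asyncio.create_task(self._run())

    def enqueue(self, chunk: bytes) -> None:
        # Returns immediately; the caller never waits on the network.
        self._queue.put_nowait(chunk)

    def flush(self) -> int:
        # Drop chunks that have not been sent yet, e.g. on a user barge-in.
        dropped = 0
        while not self._queue.empty():
            self._queue.get_nowait()
            self._queue.task_done()
            dropped += 1
        return dropped

    async def drain(self) -> None:
        # Wait until every queued chunk has been sent or flushed.
        await self._queue.join()

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _run(self) -> None:
        while True:
            chunk = await self._queue.get()
            try:
                await self._send(chunk)
            finally:
                self._queue.task_done()


async def demo() -> Tuple[List[bytes], int]:
    sent: List[bytes] = []

    async def fake_send(data: bytes) -> None:
        # Stand-in for a websocket send call (assumption, not a real API).
        await asyncio.sleep(0)
        sent.append(data)

    sender = OrderedAudioSender(fake_send)
    sender.start()
    for i in range(3):
        sender.enqueue(bytes([i]))
    await sender.drain()          # the first three chunks go out in order

    sender.enqueue(b"late-1")
    sender.enqueue(b"late-2")
    dropped = sender.flush()      # simulated interruption: discard pending audio
    await sender.stop()
    return sent, dropped
```

Running `asyncio.run(demo())` sends the first three chunks in order and discards the two queued after the simulated interruption. In a real pipeline, `flush()` would be called from whatever code observes the user-started-speaking event.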

Context

The first symptom was that, with the FastAPI-based transport, the logs showed "Emulating user started speaking" instead of a USER_STARTED_SPEAKING message. The suspected cause was that sending audio packets was monopolizing the same thread that receives audio input packets, so VAD was not running on the server side. After we worked around this with the patches above, VAD detection latency turned out to be high (>3 seconds).
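
The general mechanism suspected above can be illustrated with plain asyncio: a coroutine that never yields control keeps every other task on the same event loop from running. The sketch below is only an illustration of that mechanism. It does not use Pipecat, and the thread does not confirm that this is what happened in the reported setup. `measure`, `sender`, and `receiver` are hypothetical names.

```python
import asyncio
import time


async def measure(blocking: bool) -> int:
    """Count how often a receiver runs while a sender writes five chunks."""
    ticks = 0
    done = asyncio.Event()

    async def receiver() -> None:
        nonlocal ticks
        while not done.is_set():
            ticks += 1              # stand-in for reading one inbound packet
            await asyncio.sleep(0)  # yield to other tasks

    async def sender() -> None:
        for _ in range(5):
            if blocking:
                time.sleep(0.001)       # synchronous call: the loop is stuck
            else:
                await asyncio.sleep(0)  # cooperative: the receiver can run
        done.set()

    # The sender is listed first, so it is scheduled before the receiver.
    await asyncio.gather(sender(), receiver())
    return ticks


if __name__ == "__main__":
    print("receiver ticks, blocking sender:   ", asyncio.run(measure(True)))
    print("receiver ticks, cooperative sender:", asyncio.run(measure(False)))
```

With the blocking sender, the receiver never gets a turn before the sender finishes. With the cooperative sender, the two interleave.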

samthnkr, Aug 18 '25 11:08

Yes, this is already supported using the Pipecat client SDKs. You can check out this example: https://github.com/pipecat-ai/pipecat-examples/tree/main/websocket

Using the WebSocketTransport, the client and server can communicate the RTVI events, which include the user started / stopped speaking events: https://docs.pipecat.ai/client/js/api-reference/callbacks#audio-and-voice-activity

markbackman, Aug 18 '25 12:08

What about server APIs, though? I am having the same issue with Gemini 2.5 Flash native audio with WebSocket telephony.

tesla1900, Nov 29 '25 06:11

i am having the same issue with gemini 2.5 flash native audio with a websocket telephony

I'm assuming you're referring to Gemini Live, right? If so, the interruptions are handled inside the model, which is, in some ways, a black box. That is a different situation from what the original poster asked about.

Closing this out since the original question was answered and the thread has gone stale.

markbackman, Nov 30 '25 15:11