Audio Mixer 100% CPU Spike On Twilio Client Disconnected

ghd-uds opened this issue 2 months ago • 2 comments

pipecat version

0.0.91

Python version

3.12.7

Operating System

Ubuntu 22.04.03

Issue description

When an audio mixer (sound mixer) is added to a pipeline that uses FastAPI WebSocket Transport and a Twilio Serializer, there is a significant CPU spike that exceeds 100%, regardless of the instance size or number of CPU cores.

This issue occurs consistently with the following pipeline configuration:

Pipeline: Deepgram STT → OpenAI LLM → Cartesia TTS (FastAPI WebSocket Transport with Twilio Serializer)

The CPU spike happens when the call is disconnected by the pipeline, and usage remains high indefinitely instead of returning to normal levels.
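Roughly how the mixer is wired into the transport in this setup (simplified sketch; import paths and parameter names follow pipecat ~0.0.9x as I understand them and may need adjusting, and `office.wav`/`stream_sid` are placeholders):

```python
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)


def build_transport(websocket, stream_sid: str) -> FastAPIWebsocketTransport:
    # Background sound mixed into the bot's outgoing audio.
    mixer = SoundfileMixer(
        sound_files={"office": "office.wav"},
        default_sound="office",
        volume=0.4,
    )
    return FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            serializer=TwilioFrameSerializer(stream_sid=stream_sid),
            audio_out_mixer=mixer,  # the issue only reproduces with the mixer attached
        ),
    )
```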

Reproduction steps

  1. Write and run the pipeline code with the described setup (always seen with Cartesia TTS).
  2. Initiate a Twilio call through the pipeline.
  3. Allow the call to proceed normally.
  4. End the call using the pipeline’s Twilio hangup feature (see the sketch after this list).
  5. Monitor CPU usage during and after the call disconnects.
  6. Observe that the CPU usage spikes at the moment of disconnection and remains at 100% indefinitely.
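
Step 4 above boils down to ending the call through Twilio's REST API (sketch only; `account_sid`, `auth_token`, and `call_sid` are placeholders, and the exact hangup mechanism inside the pipeline may differ):

```python
from twilio.rest import Client


def hang_up(account_sid: str, auth_token: str, call_sid: str) -> None:
    """End an in-progress call from the server side."""
    client = Client(account_sid, auth_token)
    # Setting the call status to "completed" hangs up the call on Twilio's side.
    client.calls(call_sid).update(status="completed")
```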

Expected behavior

The CPU spike shouldn't happen at the end of a call. Even if usage spikes briefly, it should return to normal instead of staying at 100% indefinitely.

Actual behavior

The CPU usage remains stuck at 100% indefinitely after a call ends instead of returning to normal when the audio mixer is used. This has been verified by checking per-thread usage with py-spy.
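
For reference, the same per-thread check can also be done from inside the process with psutil instead of py-spy (sketch; assumes psutil is installed and Python 3.8+ for `native_id`):

```python
import threading

import psutil


def dump_thread_cpu() -> None:
    """Print cumulative CPU time per thread so a spinning thread stands out."""
    names = {t.native_id: t.name for t in threading.enumerate()}
    for info in psutil.Process().threads():
        name = names.get(info.id, "?")
        print(f"{info.id:>8} {name:<32} user={info.user_time:.1f}s sys={info.system_time:.1f}s")
```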

Logs


(screenshots attached)

ghd-uds avatar Oct 31 '25 16:10 ghd-uds

Code fix for `soundfile_mixer.py`:

```python
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Soundfile-based audio mixer for file playback integration.

Provides an audio mixer that combines incoming audio with audio loaded from
files using the soundfile library. Supports multiple audio formats and runtime
configuration changes.
"""

import asyncio
from typing import Any, Dict, Mapping

import numpy as np
from loguru import logger

from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.frames.frames import MixerControlFrame, MixerEnableFrame, MixerUpdateSettingsFrame

try:
    import soundfile as sf
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error(
        "In order to use the soundfile mixer, you need to pip install pipecat-ai[soundfile]."
    )
    raise Exception(f"Missing module: {e}")


class SoundfileMixer(BaseAudioMixer):
    """Audio mixer that combines incoming audio with file-based audio.

    This is an audio mixer that mixes incoming audio with audio from a
    file. It uses the soundfile library to load files so it supports multiple
    formats. The audio files need to only have one channel (mono) and it needs
    to match the sample rate of the output transport.

    Multiple files can be loaded, each with a different name. The
    `MixerUpdateSettingsFrame` has the following settings available: `sound`
    (str) and `volume` (float) to be able to update to a different sound file or
    to change the volume at runtime.
    """

    def __init__(
        self,
        *,
        sound_files: Mapping[str, str],
        default_sound: str,
        volume: float = 0.4,
        mixing: bool = True,
        loop: bool = True,
        **kwargs,
    ):
        """Initialize the soundfile mixer.

        Args:
            sound_files: Mapping of sound names to file paths for loading.
            default_sound: Name of the default sound to play initially.
            volume: Mixing volume level (0.0 to 1.0). Defaults to 0.4.
            mixing: Whether mixing is initially enabled. Defaults to True.
            loop: Whether to loop audio files when they end. Defaults to True.
            **kwargs: Additional arguments passed to parent class.
        """
        super().__init__(**kwargs)
        self._sound_files = sound_files
        self._volume = volume
        self._sample_rate = 0

        self._sound_pos = 0
        self._sounds: Dict[str, Any] = {}
        self._current_sound = default_sound
        self._mixing = mixing
        self._loop = loop

    async def start(self, sample_rate: int):
        """Initialize the mixer and load all sound files.

        Args:
            sample_rate: The sample rate of the output transport in Hz.
        """
        self._sample_rate = sample_rate
        for sound_name, file_name in self._sound_files.items():
            await asyncio.to_thread(self._load_sound_file, sound_name, file_name)

    async def stop(self):
        """Clean up mixer resources."""
        self._mixing = False  # Disable mixing immediately on shutdown
        self._sounds.clear()  # Clear sound data
        self._current_sound = None  # Clear current sound reference
        self._sound_pos = 0  # Reset position

    async def process_frame(self, frame: MixerControlFrame):
        """Process mixer control frames to update settings or enable/disable mixing.

        Args:
            frame: The mixer control frame to process.
        """
        if isinstance(frame, MixerUpdateSettingsFrame):
            await self._update_settings(frame)
        elif isinstance(frame, MixerEnableFrame):
            await self._enable_mixing(frame.enable)

    async def mix(self, audio: bytes) -> bytes:
        """Mix transport audio with the current sound file.

        Args:
            audio: Raw audio bytes from the transport to mix.

        Returns:
            Mixed audio bytes combining transport and file audio.
        """
        # Run in a thread pool to avoid blocking the event loop during numpy operations.
        return await asyncio.to_thread(self._mix_with_sound, audio)

    async def _enable_mixing(self, enable: bool):
        """Enable or disable audio mixing."""
        self._mixing = enable

    async def _update_settings(self, frame: MixerUpdateSettingsFrame):
        """Update mixer settings from a control frame."""
        for setting, value in frame.settings.items():
            match setting:
                case "sound":
                    await self._change_sound(value)
                case "volume":
                    await self._update_volume(value)
                case "loop":
                    await self._update_loop(value)

    async def _change_sound(self, sound: str):
        """Change the currently playing sound file.

        Args:
            sound: Name of the sound file to switch to.
        """
        if sound in self._sound_files:
            self._current_sound = sound
            self._sound_pos = 0
        else:
            logger.error(f"Sound {sound} is not available")

    async def _update_volume(self, volume: float):
        """Update the mixing volume level."""
        self._volume = volume

    async def _update_loop(self, loop: bool):
        """Update the looping behavior."""
        self._loop = loop

    def _load_sound_file(self, sound_name: str, file_name: str):
        """Load an audio file into memory for mixing."""
        try:
            logger.debug(f"Loading mixer sound from {file_name}")
            sound, sample_rate = sf.read(file_name, dtype="int16")

            if sample_rate == self._sample_rate:
                audio = sound.tobytes()
                # Convert from np to bytes again.
                self._sounds[sound_name] = np.frombuffer(audio, dtype=np.int16)
            else:
                logger.warning(
                    f"Sound file {file_name} has incorrect sample rate {sample_rate} (should be {self._sample_rate})"
                )
        except Exception as e:
            logger.error(f"Unable to open file {file_name}: {e}")

    def _mix_with_sound(self, audio: bytes):
        """Mix raw audio frames with chunks of the same length from the sound file."""
        # Critical: check the mixing status FIRST, before any other work.
        if not self._mixing:
            return audio

        # Check if sounds are available.
        if not self._sounds or self._current_sound not in self._sounds:
            return audio

        audio_np = np.frombuffer(audio, dtype=np.int16)
        chunk_size = len(audio_np)

        # Sound currently playing.
        sound = self._sounds[self._current_sound]

        # Go back to the beginning if we don't have enough data.
        if self._sound_pos + chunk_size > len(sound):
            if not self._loop:
                return audio
            self._sound_pos = 0

        start_pos = self._sound_pos
        end_pos = self._sound_pos + chunk_size
        self._sound_pos = end_pos

        sound_np = sound[start_pos:end_pos]

        # Mix and clamp audio in one operation.
        mixed_audio = np.clip(audio_np + sound_np * self._volume, -32768, 32767).astype(np.int16)

        return mixed_audio.tobytes()
```
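
The fix only touches `stop()` and `_mix_with_sound()`, so runtime control of the mixer stays the same. For completeness, a sketch of how it is driven mid-call, assuming the control frames are queued on the `PipelineTask`:

```python
from pipecat.frames.frames import MixerEnableFrame, MixerUpdateSettingsFrame
from pipecat.pipeline.task import PipelineTask


async def duck_background(task: PipelineTask) -> None:
    # Lower the background sound, then turn mixing off entirely.
    await task.queue_frame(MixerUpdateSettingsFrame(settings={"volume": 0.2}))
    await task.queue_frame(MixerEnableFrame(enable=False))
```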

ghd-uds avatar Nov 04 '25 19:11 ghd-uds

Mind submitting this in PR format so we can see the diff?

markbackman avatar Nov 04 '25 19:11 markbackman