
Provide Real-Time VideoAvatar Latency Information

Viktoriagrg opened this issue 1 month ago · 2 comments

Feature Type

I cannot use LiveKit without it

Feature Description

VideoAvatar playback does not provide any mechanism to track or measure latency. When sending audio via the livekit-simli or livekit-liveavatar plugins, audio frames are forwarded asynchronously, but there is no integration with QueueAudioOutput in livekit.agents.voice.avatar to monitor real-time playback position or latency.

This makes it impossible to measure, debug, or improve VideoAvatar latency in real time.

Workarounds / Alternatives

Include an optional timestamp for each audio frame and propagate it through the avatar pipeline so latency can be measured at the receiving end.
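
For illustration, a minimal sketch of that tagging (assuming rtc.AudioFrame carries a userdata dict that the pipeline preserves, which is not guaranteed by the SDK; the helper name tag_ingest_ts is hypothetical):

import time

from livekit import rtc


def tag_ingest_ts(frame: rtc.AudioFrame) -> rtc.AudioFrame:
    # hypothetical helper: stamp the wall-clock time at which the frame
    # first enters the pipeline; the receiving end subtracts this from
    # its own clock to compute end-to-end latency
    frame.userdata["ingest_ts"] = time.time()
    return frame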

Additional Context

No response

Viktoriagrg commented on Dec 08 '25

Hey!

I implemented a metrics collector for it. Devs, maybe you could implement this logic properly for every plugin?

Added a new LatencyAudioOutput wrapper around DataStreamAudioOutput.

  • Captures per-audio-chunk metadata: send_ts, ingest_ts (when available), and duration.
  • Introduces a lock-protected internal queue to pair outgoing audio with incoming video.
  • Hooks into the avatar’s video track to process each frame.

For each frame, it calculates:

  • Generation latency (audio send → video receive)
  • Total latency (ingest → video receive)

Per-frame metrics are logged so we can start evaluating delays and pipeline behavior.

This is an early version — works, but needs refinement.

import asyncio
import logging
import time
from collections import deque

from livekit import rtc
from livekit.agents.voice.avatar import DataStreamAudioOutput

logger = logging.getLogger(__name__)


class LatencyAudioOutput(DataStreamAudioOutput):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._audio_send_times = deque()  # deque of {'send_ts': float, 'ingest_ts': float | None, 'duration': float}
        self._total_audio_sent = 0
        self._lock = asyncio.Lock()

    async def capture_frame(self, frame: rtc.AudioFrame):
        async with self._lock:
            send_ts = time.time()
            # ingest_ts is expected to be stamped on the frame upstream
            # (e.g. when audio first enters the pipeline); None if absent
            ingest_ts = frame.userdata.get("ingest_ts")

            # record this audio chunk together with its timestamps
            self._audio_send_times.append({
                'send_ts': send_ts,
                'ingest_ts': ingest_ts,
                'duration': frame.duration
            })
            self._total_audio_sent += 1

            logger.debug(f"Audio pushed: total={self._total_audio_sent}, pending={len(self._audio_send_times)}")

        await super().capture_frame(frame)

    async def pop_audio_timing(self):
        """Pop the oldest audio timing for latency calculation."""
        async with self._lock:
            if self._audio_send_times:
                timing = self._audio_send_times.popleft()
                logger.debug(f"Popped audio timing: {timing}")
                return timing
            return None


def attach_video_latency_listener(room: rtc.Room, audio_output: LatencyAudioOutput, avatar_session):
    @room.on("track_subscribed")
    def subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
        logger.info(f"Track subscribed: kind={publication.kind}, participant={participant.identity}")
        
        if publication.kind != rtc.TrackKind.KIND_VIDEO:
            return
        
        if participant.identity != avatar_session._avatar_participant_identity:
            logger.info(f"Skipping non-avatar participant: {participant.identity}")
            return
        
        if isinstance(track, rtc.RemoteVideoTrack):
            logger.info("Setting up video frame listener for avatar")
            
            async def listen_video_frames():
                try:
                    video_stream = rtc.VideoStream(track)
                    logger.info("Video stream created, waiting for frames...")
                    frame_count = 0
                    
                    async for frame in video_stream:
                        frame_count += 1
                        
                        # get the oldest audio timing
                        timing = await audio_output.pop_audio_timing()
                        
                        if timing:
                            recv_ts = time.time()
                            
                            # Network + processing latency (from sending audio to receiving video)
                            generation_latency_ms = (recv_ts - timing['send_ts']) * 1000
                            logger.debug(f"[SIMLI AVATAR GENERATION LATENCY] {generation_latency_ms:.1f} ms (frame {frame_count})")
                            
                            # Total end-to-end latency (from audio ingestion to video receipt)
                            if timing['ingest_ts']:
                                total_latency_ms = (recv_ts - timing['ingest_ts']) * 1000
                                logger.debug(f"[SIMLI TOTAL AVATAR LATENCY] {total_latency_ms:.1f} ms (frame {frame_count})")
                        else:
                            # idle animation frame; no pending audio to pair with
                            pass
                            
                    logger.info("Video stream ended")
                except Exception as e:
                    logger.error(f"Error in video frame listener: {e}", exc_info=True)
            
            # _latency_tasks must be initialized on the session beforehand
            # (e.g. as a set()); it keeps a strong reference to the task
            task = asyncio.create_task(listen_video_frames())
            avatar_session._latency_tasks.add(task)
            task.add_done_callback(avatar_session._latency_tasks.discard)
            logger.info("Video frame listener task created")

Viktoriagrg commented on Dec 10 '25

Is there any guideline on how to create a custom avatar plugin and integrate it with LiveKit?

Arslan-Mehmood1 commented on Dec 22 '25

Hi team, I’ve opened an issue and a PR that adds VideoAvatar metrics calculation.

This improves observability by introducing structured metrics for monitoring and debugging. I’d appreciate a review when convenient and any feedback on whether the approach aligns with the project’s direction. I’m also open to any suggestions or alternative implementations you might propose.

Thanks.

Viktoriagrg commented on Jan 08 '26