Provide Real-Time VideoAvatar Latency Information
Feature Type
I cannot use LiveKit without it
Feature Description
VideoAvatar playback does not provide any mechanism to track or measure latency. When sending audio via the livekit-simli or livekit-liveavatar plugins, audio frames are forwarded asynchronously, but there is no integration with QueueAudioOutput in livekit.agents.voice.avatar to monitor real-time playback positions or latency.
This makes it impossible to measure, debug, or improve VideoAvatar latency in real time.
Workarounds / Alternatives
Include an optional timestamp for each audio frame and propagate it through the avatar pipeline so latency can be measured at the receiving end.
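A minimal sketch of what this could look like on the send side, assuming each rtc.AudioFrame can carry a side-channel userdata dict (the same convention the collector code below relies on; if your SDK version's AudioFrame exposes no such attribute, a parallel mapping keyed by the frame object would serve instead):

    import time
    from livekit import rtc

    def tag_ingest_time(frame: rtc.AudioFrame) -> rtc.AudioFrame:
        # record when this chunk entered the pipeline, so the receiving
        # end can compute end-to-end latency against wall-clock time
        frame.userdata["ingest_ts"] = time.time()
        return frame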
Additional Context
No response
hey!
I implemented a metrics collector for it. Devs, maybe you can implement this logic properly for every plugin? :)
Added a new LatencyAudioOutput wrapper around DataStreamAudioOutput:
- Capturing per-audio-chunk metadata: send_ts, ingest_ts (when available), duration.
- Introduced a safe internal queue to pair outgoing audio with incoming video.
- Hooked into the avatar’s video track to process each frame. For each frame, calculating:
  - Generation latency (audio send → video receive)
  - Total latency (ingest → video receive)
- Logging per-frame metrics so we can start evaluating delays and pipeline behavior.
This is an early version — works, but needs refinement.
import asyncio
import logging
import time
from collections import deque

from livekit import rtc
from livekit.agents.voice.avatar import DataStreamAudioOutput

logger = logging.getLogger(__name__)


class LatencyAudioOutput(DataStreamAudioOutput):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # deque of {'send_ts': float, 'ingest_ts': float | None, 'duration': float}
        self._audio_send_times = deque()
        self._total_audio_sent = 0
        self._lock = asyncio.Lock()

    async def capture_frame(self, frame: rtc.AudioFrame):
        async with self._lock:
            send_ts = time.time()
            # ingest_ts is attached upstream, when the chunk entered the pipeline
            ingest_ts = frame.userdata.get("ingest_ts")
            # record this audio chunk with its timestamps
            self._audio_send_times.append({
                'send_ts': send_ts,
                'ingest_ts': ingest_ts,
                'duration': frame.duration,
            })
            self._total_audio_sent += 1
            logger.debug(
                f"Audio pushed: total={self._total_audio_sent}, "
                f"pending={len(self._audio_send_times)}"
            )
        # forward outside the lock so the send doesn't block other captures
        await super().capture_frame(frame)

    async def pop_audio_timing(self):
        """Pop the oldest audio timing for latency calculation."""
        async with self._lock:
            if self._audio_send_times:
                timing = self._audio_send_times.popleft()
                logger.debug(f"Dequeued audio timing: {timing}")
                return timing
            return None
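To actually route agent audio through this wrapper, something along these lines should work. This is a sketch, not tested: ctx and session are illustrative names, and the constructor arguments must mirror whatever DataStreamAudioOutput takes in your livekit-agents version:

    # hypothetical wiring: substitute the latency-tracking wrapper for the
    # plugin's default DataStreamAudioOutput, then make it the session's
    # audio sink so every outgoing frame gets timestamped
    audio_output = LatencyAudioOutput(
        ctx.room,
        destination_identity=avatar_session._avatar_participant_identity,
    )
    session.output.audio = audio_output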
def attach_video_latency_listener(
    room: rtc.Room,
    audio_output: LatencyAudioOutput,
    avatar_session,
):
    @room.on("track_subscribed")
    def subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        logger.info(f"Track subscribed: kind={publication.kind}, participant={participant.identity}")
        if publication.kind != rtc.TrackKind.KIND_VIDEO:
            return
        if participant.identity != avatar_session._avatar_participant_identity:
            logger.info(f"Skipping non-avatar participant: {participant.identity}")
            return
        if isinstance(track, rtc.RemoteVideoTrack):
            logger.info("Setting up video frame listener for avatar")

            async def listen_video_frames():
                try:
                    video_stream = rtc.VideoStream(track)
                    logger.info("Video stream created, waiting for frames...")
                    frame_count = 0
                    async for frame in video_stream:
                        frame_count += 1
                        # pair this video frame with the oldest unmatched audio chunk
                        timing = await audio_output.pop_audio_timing()
                        if timing:
                            recv_ts = time.time()
                            # network + processing latency (audio send → video receive)
                            generation_latency_ms = (recv_ts - timing['send_ts']) * 1000
                            logger.debug(
                                f"[SIMLI AVATAR GENERATION LATENCY] "
                                f"{generation_latency_ms:.1f} ms (frame {frame_count})"
                            )
                            # total end-to-end latency (audio ingestion → video receive)
                            if timing['ingest_ts']:
                                total_latency_ms = (recv_ts - timing['ingest_ts']) * 1000
                                logger.debug(
                                    f"[SIMLI TOTAL AVATAR LATENCY] "
                                    f"{total_latency_ms:.1f} ms (frame {frame_count})"
                                )
                        # frames with no pending audio are idle animation; skip them
                    logger.info("Video stream ended")
                except Exception as e:
                    logger.error(f"Error in video frame listener: {e}", exc_info=True)

            # keep a strong reference so the task isn't garbage collected mid-flight
            task = asyncio.create_task(listen_video_frames())
            avatar_session._latency_tasks.add(task)
            task.add_done_callback(avatar_session._latency_tasks.discard)
            logger.info("Video frame listener task created")
Is there any guideline on how to create a custom Avatar plugin and integrate it with LiveKit?
Hi team, I’ve opened an issue and a PR that adds VideoAvatar metrics calculation.
This improves observability by introducing structured metrics for monitoring and debugging. I’d appreciate a review when convenient and any feedback on whether the approach aligns with the project’s direction. I’m also open to any suggestions or alternative implementations you might propose.
Thanks.