agents
agents copied to clipboard
Google TTS (Gemini) now supports streaming — any plans to integrate it into LiveKit?
Feature Type
I cannot use LiveKit without it
Feature Description
Hey team,
Google's new TTS (Gemini) has recently come out of beta and now supports streaming speech synthesis. It looks like it could offer faster response times and improved voice quality compared to the existing Google Cloud TTS integration.
Is there any plan to add support for the Gemini TTS API (especially the streaming mode) in the near future?
Thanks!
Workarounds / Alternatives
No response
Additional Context
No response
Hi, as of right now you can access Gemini TTS via google.beta.GeminiTTS(), we can look into lifting it out of beta and maintaining backward compatibility
Thank you for the note. As you can see, the beta Gemini is set to streaming=False hard-coded. That's a big blocker for Gemini TTS.
Thanks.
the result of this is high latency when returning a response
any update on this?
In the meantime, if anyone wants to use a custom node (forked from LiveKit's Google default TTS):
from __future__ import annotations
import asyncio
import os
import weakref
from collections.abc import AsyncGenerator
from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import DeadlineExceeded, GoogleAPICallError
from google.cloud import texttospeech
from livekit.agents import (
APIConnectOptions,
APIStatusError,
APITimeoutError,
tokenize,
tts,
utils,
)
from livekit.agents.cli.cli import SAMPLE_RATE
from livekit.agents.log import logger
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS
# Google Cloud project id, read from the environment once at import time;
# None when GOOGLE_CLOUD_PROJECT is unset.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Model name sent with every voice selection and reported by the TTS `model` property.
MODEL = "gemini-2.5-flash-tts"

# Speaking rate forwarded to the streaming audio configuration.
SPEAKING_RATE = 1.0
class GeminiStreamingTTS(tts.TTS):
    """LiveKit TTS plugin backed by Google Cloud Text-to-Speech with a Gemini model.

    The plugin advertises streaming capability and produces mono audio at
    ``SAMPLE_RATE``. Only the streaming path (:meth:`stream`) is implemented;
    the stream returned by :meth:`synthesize` raises ``NotImplementedError``
    when it is run.
    """

    def __init__(self) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=SAMPLE_RATE,
            num_channels=1,
        )
        # Built lazily by _ensure_client() so constructing the plugin needs no credentials.
        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        # Weak references: the registry alone never keeps a stream alive.
        self._streams = weakref.WeakSet[SynthesizeStream]()

    @property
    def model(self) -> str:
        """Name of the Gemini TTS model used for synthesis."""
        return MODEL

    @property
    def provider(self) -> str:
        """Human-readable name of the service provider."""
        return "Google Cloud Platform"

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        """Return the shared async Text-to-Speech client, creating it on first use.

        When ``GOOGLE_APPLICATION_CREDENTIALS`` is set, the client is built
        from that service-account file. Otherwise the client is constructed
        without explicit credentials, letting the Google client library
        resolve Application Default Credentials instead of failing with a
        bare ``KeyError`` on the missing environment variable.
        """
        if self._client is None:
            client_options = ClientOptions(api_endpoint="texttospeech.googleapis.com")
            credentials_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
            if credentials_file:
                self._client = (
                    texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                        credentials_file,
                        client_options=client_options,
                    )
                )
            else:
                self._client = texttospeech.TextToSpeechAsyncClient(
                    client_options=client_options
                )
        return self._client

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Open a new streaming synthesis session and register it for cleanup."""
        synth_stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(synth_stream)
        return synth_stream

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """Return a one-shot stream for ``text``.

        Non-streaming synthesis is not implemented: running the returned
        ``ChunkedStream`` raises ``NotImplementedError``.
        """
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    async def aclose(self) -> None:
        """Close every stream that is still registered, then empty the registry."""
        # Iterate over a snapshot: the WeakSet may change while streams are being closed.
        for synth_stream in list(self._streams):
            await synth_stream.aclose()
        self._streams.clear()
class ChunkedStream(tts.ChunkedStream):
    """Stub for one-shot synthesis, which this integration does not provide."""

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Always fail, because only the streaming path is implemented.

        Raises:
            NotImplementedError: unconditionally.
        """
        message = (
            "Our GeminiStreamingTTS implementation does not support non-streaming synthesis."
        )
        raise NotImplementedError(message)
class SynthesizeStream(tts.SynthesizeStream):
    """Streaming synthesis session for :class:`GeminiStreamingTTS`.

    Text pushed into the stream is split into sentences. Each run of text
    between flushes becomes one segment, and each segment is synthesized
    through a single ``streaming_synthesize`` call whose audio is forwarded
    to the output emitter as it arrives.
    """

    def __init__(self, *, tts: GeminiStreamingTTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: GeminiStreamingTTS = tts
        # Hands one sentence stream per segment from the tokenizer task to the synthesis task.
        self._segments_ch = utils.aio.Chan[tokenize.SentenceStream]()
        self.tokenizer = tokenize.blingfire.SentenceTokenizer()

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Tokenize the input and synthesize each segment until the input is exhausted.

        Args:
            output_emitter: Emitter that receives the synthesized PCM audio.
        """
        output_emitter.initialize(
            request_id=utils.shortuuid(),
            sample_rate=SAMPLE_RATE,
            num_channels=1,
            # changing to opus after this bug is fixed!
            # https://github.com/livekit/agents/issues/3863
            mime_type="audio/pcm",
            stream=True,
        )
        streaming_config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                name="Puck", model_name=MODEL, language_code="en-US"
            ),
            streaming_audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.PCM,
                sample_rate_hertz=SAMPLE_RATE,
                speaking_rate=SPEAKING_RATE,
            ),
            custom_pronunciations=None,
        )

        async def _tokenize_input() -> None:
            # Current segment's sentence stream; None between segments.
            sentence_stream: tokenize.SentenceStream | None = None
            async for data in self._input_ch:
                if isinstance(data, str):
                    if sentence_stream is None:
                        # First text after a flush starts a new segment.
                        sentence_stream = self.tokenizer.stream()
                        self._segments_ch.send_nowait(sentence_stream)
                    sentence_stream.push_text(data)
                elif isinstance(data, self._FlushSentinel):
                    if sentence_stream is not None:
                        sentence_stream.end_input()
                    sentence_stream = None
            # If the input ended without a trailing flush, end the open
            # sentence stream so its consumer does not wait on it forever.
            if sentence_stream is not None:
                sentence_stream.end_input()
            self._segments_ch.close()

        async def _run_segments() -> None:
            # Segments are synthesized one at a time, in arrival order.
            async for sentence_stream in self._segments_ch:
                await self._run_stream(sentence_stream, output_emitter, streaming_config)

        tasks = [
            asyncio.create_task(_tokenize_input()),
            asyncio.create_task(_run_segments()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            # Make sure neither task outlives this run, including on error.
            await utils.aio.cancel_and_wait(*tasks)

    async def _run_stream(
        self,
        input_stream: tokenize.SentenceStream,
        output_emitter: tts.AudioEmitter,
        streaming_config: texttospeech.StreamingSynthesizeConfig,
    ) -> None:
        """Synthesize one segment and push its audio to ``output_emitter``.

        Args:
            input_stream: Sentence stream supplying the text of this segment.
            output_emitter: Emitter that receives the audio chunks.
            streaming_config: Configuration sent as the first streaming request.

        Raises:
            APITimeoutError: If the Google API call exceeds its deadline.
            APIStatusError: If the Google API call fails for another reason.
        """

        @utils.log_exceptions(logger=logger)
        async def input_generator() -> AsyncGenerator[
            texttospeech.StreamingSynthesizeRequest, None
        ]:
            try:
                # The first request carries only the configuration.
                yield texttospeech.StreamingSynthesizeRequest(
                    streaming_config=streaming_config
                )
                async for sentence in input_stream:
                    self._mark_started()
                    yield (
                        texttospeech.StreamingSynthesizeRequest(
                            input=texttospeech.StreamingSynthesisInput(
                                text=sentence.token,
                                # Prompt is only supported in the first input chunk.
                            )
                        )
                    )
            except Exception:
                logger.exception(
                    "an error occurred while streaming input to google TTS"
                )

        input_gen = input_generator()
        try:
            stream = await self._tts._ensure_client().streaming_synthesize(
                input_gen, timeout=self._conn_options.timeout
            )
            output_emitter.start_segment(segment_id=utils.shortuuid())
            async for resp in stream:
                output_emitter.push(resp.audio_content)
            output_emitter.end_segment()
        except DeadlineExceeded:
            raise APITimeoutError() from None
        except GoogleAPICallError as e:
            raise APIStatusError(e.message, status_code=e.code or -1) from e
        finally:
            # Always close the request generator, even when the call fails.
            await input_gen.aclose()
# Usage fragment: pass a GeminiStreamingTTS instance as the `tts` argument of the
# parent initializer (presumably inside a LiveKit Agent subclass's __init__ — the
# enclosing class is not shown here; confirm against your agent code).
# The `...` values for stt/llm are placeholders for your own plugins.
super().__init__(
instructions="",
stt=...,
llm=...,
tts=GeminiStreamingTTS()
)
(you might have to make some changes depending on which credentials you use for Google Cloud)
It works, but it has a heavy initial load time compared to Cartesia.