agents icon indicating copy to clipboard operation
agents copied to clipboard

Google TTS (Gemini) now supports streaming — any plans to integrate it into LiveKit?

Open dvirginz opened this issue 4 weeks ago โ€ข 3 comments

Feature Type

I cannot use LiveKit without it

Feature Description

Hey team,

Google's new TTS (Gemini) has recently come out of beta and now supports streaming speech synthesis. It looks like it could offer faster response times and improved voice quality compared to the existing Google Cloud TTS integration.

Is there any plan to add support for the Gemini TTS API (especially the streaming mode) in the near future?

Thanks!

Workarounds / Alternatives

No response

Additional Context

No response

dvirginz avatar Nov 10 '25 15:11 dvirginz

Hi, as of right now you can access Gemini TTS via google.beta.GeminiTTS(). We can look into lifting it out of beta while maintaining backward compatibility.

tinalenguyen avatar Nov 11 '25 06:11 tinalenguyen

Thank you for the note. As you can see, the beta Gemini is set to streaming=False hard-coded. That's a big blocker for Gemini TTS.

Thanks.

dvirginz avatar Nov 11 '25 07:11 dvirginz

the result of this is high latency when returning a response

ringofhealth avatar Nov 14 '25 13:11 ringofhealth

any update on this?

oijoijcoiejoijce avatar Nov 29 '25 03:11 oijoijcoiejoijce

In the meantime, if anyone wants to use a custom node (forked from LiveKit's Google default TTS):

from __future__ import annotations

import asyncio
import os
import weakref
from collections.abc import AsyncGenerator

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import DeadlineExceeded, GoogleAPICallError
from google.cloud import texttospeech
from livekit.agents import (
    APIConnectOptions,
    APIStatusError,
    APITimeoutError,
    tokenize,
    tts,
    utils,
)
from livekit.agents.cli.cli import SAMPLE_RATE
from livekit.agents.log import logger
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS

# Google Cloud project id read from the environment (None when the variable is unset).
# NOTE(review): not referenced anywhere else in this snippet — confirm it is still needed.
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
# Model identifier; returned by GeminiStreamingTTS.model and passed as `model_name`
# when the streaming voice is selected.
MODEL = "gemini-2.5-flash-tts"
# Passed as `speaking_rate` in the streaming audio config.
# Presumably 1.0 means normal speed — confirm against the Google TTS docs.
SPEAKING_RATE = 1.0


class GeminiStreamingTTS(tts.TTS):
    """LiveKit TTS implementation that streams audio from Google Cloud TTS.

    The instance advertises streaming capability, mono output and a sample
    rate of ``SAMPLE_RATE``. The Google client is created lazily on first use
    and then reused for every stream opened from this instance.
    """

    def __init__(self) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=SAMPLE_RATE,
            num_channels=1,
        )

        # Created on demand by _ensure_client().
        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        # Weak references only: a stream that is garbage collected drops out
        # of the set without needing explicit bookkeeping.
        self._streams = weakref.WeakSet[SynthesizeStream]()

    @property
    def model(self) -> str:
        """Name of the model used for synthesis."""
        return MODEL

    @property
    def provider(self) -> str:
        """Human-readable provider name."""
        return "Google Cloud Platform"

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        """Return the shared async Google TTS client, creating it if needed.

        When ``GOOGLE_APPLICATION_CREDENTIALS`` is set, the client is built
        from that service-account file. Otherwise the default constructor is
        used so the Google client library can resolve credentials itself
        (Application Default Credentials) instead of failing with ``KeyError``.
        """
        if self._client is not None:
            return self._client

        client_options = ClientOptions(api_endpoint="texttospeech.googleapis.com")
        credentials_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

        if credentials_file:
            client = texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                credentials_file,
                client_options=client_options,
            )
        else:
            client = texttospeech.TextToSpeechAsyncClient(
                client_options=client_options
            )

        self._client = client
        return client

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Open a new streaming synthesis session and track it for ``aclose``."""
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """Return a ``ChunkedStream`` for ``text``.

        Non-streaming synthesis is not implemented by this class; use
        ``stream()`` for actual synthesis.
        """
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    async def aclose(self) -> None:
        """Close every stream that is still alive and forget them."""
        # Iterate over a snapshot: the weak set may shrink while we await.
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()


class ChunkedStream(tts.ChunkedStream):
    """Placeholder for one-shot synthesis, which this implementation lacks."""

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Always fail: only the streaming path is implemented."""
        message = "Our GeminiStreamingTTS implementation does not support non-streaming synthesis."
        raise NotImplementedError(message)


class SynthesizeStream(tts.SynthesizeStream):
    """Streaming synthesis session backed by Google's ``streaming_synthesize``.

    Text pushed into the stream is split into sentences. Every run of text
    delimited by a flush becomes one segment, and segments are synthesized one
    after another, each over its own streaming call.
    """

    def __init__(self, *, tts: GeminiStreamingTTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: GeminiStreamingTTS = tts
        # Hands each segment's sentence stream from the tokenizer task to the
        # synthesis task.
        self._segments_ch = utils.aio.Chan[tokenize.SentenceStream]()
        self.tokenizer = tokenize.blingfire.SentenceTokenizer()

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Tokenize incoming text and synthesize it segment by segment.

        Args:
            output_emitter: Receives the PCM audio produced for each segment.
        """
        output_emitter.initialize(
            request_id=utils.shortuuid(),
            sample_rate=SAMPLE_RATE,
            num_channels=1,
            # changing to opus after this bug is fixed!
            # https://github.com/livekit/agents/issues/3863
            mime_type="audio/pcm",
            stream=True,
        )

        streaming_config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                name="Puck", model_name=MODEL, language_code="en-US"
            ),
            streaming_audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.PCM,
                sample_rate_hertz=SAMPLE_RATE,
                speaking_rate=SPEAKING_RATE,
            ),
            custom_pronunciations=None,
        )

        async def _tokenize_input() -> None:
            # One sentence stream per segment; a flush ends the current one.
            sentence_stream: tokenize.SentenceStream | None = None
            async for data in self._input_ch:
                if isinstance(data, str):
                    if sentence_stream is None:
                        sentence_stream = self.tokenizer.stream()
                        self._segments_ch.send_nowait(sentence_stream)
                    sentence_stream.push_text(data)
                elif isinstance(data, self._FlushSentinel):
                    if sentence_stream is not None:
                        sentence_stream.end_input()
                    sentence_stream = None

            # The input channel closed without a trailing flush: end the open
            # sentence stream so the synthesis task does not wait forever for
            # tokens that will never arrive.
            if sentence_stream is not None:
                sentence_stream.end_input()

            self._segments_ch.close()

        async def _run_segments() -> None:
            async for segment_stream in self._segments_ch:
                await self._run_stream(segment_stream, output_emitter, streaming_config)

        tasks = [
            asyncio.create_task(_tokenize_input()),
            asyncio.create_task(_run_segments()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            # Make sure neither task outlives this call, whatever the outcome.
            await utils.aio.cancel_and_wait(*tasks)

    async def _run_stream(
        self,
        input_stream: tokenize.SentenceStream,
        output_emitter: tts.AudioEmitter,
        streaming_config: texttospeech.StreamingSynthesizeConfig,
    ) -> None:
        """Synthesize one segment and push its audio to ``output_emitter``.

        Args:
            input_stream: Sentence stream supplying the segment's text.
            output_emitter: Receives the audio chunks of the segment.
            streaming_config: Configuration sent as the first request.

        Raises:
            APITimeoutError: If the Google call exceeds its deadline.
            APIStatusError: If the Google API reports any other call error.
        """

        @utils.log_exceptions(logger=logger)
        async def input_generator() -> AsyncGenerator[
            texttospeech.StreamingSynthesizeRequest, None
        ]:
            try:
                # The configuration goes first, followed by the text chunks.
                yield texttospeech.StreamingSynthesizeRequest(
                    streaming_config=streaming_config
                )

                async for sentence in input_stream:
                    self._mark_started()
                    yield (
                        texttospeech.StreamingSynthesizeRequest(
                            input=texttospeech.StreamingSynthesisInput(
                                text=sentence.token,
                                # Prompt is only supported in the first input chunk.
                            )
                        )
                    )

            except Exception:
                logger.exception(
                    "an error occurred while streaming input to google TTS"
                )

        input_gen = input_generator()

        try:
            stream = await self._tts._ensure_client().streaming_synthesize(
                input_gen, timeout=self._conn_options.timeout
            )
            output_emitter.start_segment(segment_id=utils.shortuuid())

            async for resp in stream:
                output_emitter.push(resp.audio_content)

            output_emitter.end_segment()

        except DeadlineExceeded:
            raise APITimeoutError() from None
        except GoogleAPICallError as e:
            raise APIStatusError(e.message, status_code=e.code or -1) from e
        finally:
            # Always close the request generator, even on error or cancellation.
            await input_gen.aclose()
# Usage sketch: this call is meant to sit inside the __init__ of your own agent
# class (presumably a LiveKit Agent subclass — confirm); the `...` values are
# placeholders for your own STT and LLM instances.
super().__init__(
    instructions="", 
    stt=..., 
    llm=..., 
    tts=GeminiStreamingTTS()
)

(you might have to make some changes depending on what credentials you use for Google Cloud)

aryanvdesh avatar Nov 30 '25 01:11 aryanvdesh

It works but has initial heavy load time, compared to cartesia

mbrian-dev avatar Dec 01 '25 22:12 mbrian-dev