google-cloud-rust

Speech v2 streaming request

Open · skoky opened this issue 4 months ago · 2 comments

As described in https://docs.rs/google-cloud-speech-v2/latest/google_cloud_speech_v2/index.html, I am requesting support for the GCP Speech V2 streaming API.

We currently use a crate generated from the protobuf definitions, and it works fine with the Speech V1 API. With the V2 API, however, something goes wrong and our app disconnects after 10 seconds. The crate, for reference: https://github.com/skoky/google-cognitive-apis. We would like to use the official crate instead.
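StreamingRecognize is a bidirectional gRPC call in which the client sends a sequence of StreamingRecognizeRequest messages. Assuming the ordering rule described in the Speech v2 API reference (the first message names the recognizer and, unless the recognizer already stores a complete default configuration, carries a streaming_config; every later message carries only audio), the request side can be sketched with std-only stand-in types. `StreamingRecognizeRequest`, `StreamingRequest` and `build_requests` below are simplified, hypothetical mirrors of the prost-generated types, not the API of the official crate, and the config payload is reduced to a placeholder string.

```rust
/// Simplified, hypothetical stand-in for the generated `StreamingRequest` oneof.
/// The real `StreamingConfig` variant holds a `StreamingRecognitionConfig`,
/// not a placeholder string.
#[derive(Debug, Clone, PartialEq)]
enum StreamingRequest {
    StreamingConfig(String),
    Audio(Vec<u8>),
}

/// Simplified, hypothetical stand-in for the generated request message.
#[derive(Debug, Clone, PartialEq)]
struct StreamingRecognizeRequest {
    recognizer: String,
    streaming_request: Option<StreamingRequest>,
}

/// Builds the message sequence for one streaming call: a first message that
/// names the recognizer (optionally with a config, never with audio),
/// followed by audio-only messages with an empty recognizer field.
fn build_requests<I>(
    recognizer: &str,
    config: Option<String>,
    audio_chunks: I,
) -> impl Iterator<Item = StreamingRecognizeRequest>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let first = StreamingRecognizeRequest {
        recognizer: recognizer.to_string(),
        streaming_request: config.map(StreamingRequest::StreamingConfig),
    };
    let audio = audio_chunks
        .into_iter()
        .map(|chunk| StreamingRecognizeRequest {
            recognizer: String::new(),
            streaming_request: Some(StreamingRequest::Audio(chunk)),
        });
    std::iter::once(first).chain(audio)
}
```

The listing later in this thread builds the same shape asynchronously with `futures::stream::once(...).chain(audio.map(...))`; a synchronous iterator is used here only so the sketch has no dependencies.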

skoky, Sep 13 '25

Thank you for the bug report. It will help us make the case for prioritizing this feature over a number of other things.

coryan, Sep 15 '25

I have also been looking into ways to do this for the past few days, and I came up with the following temporary workaround:

  • copy the gRPC service definition for the Cloud Speech service into my project
  • generate a client using tonic-prost-build
  • use the generated client

This seemed like the most promising interim solution, but I am getting an error from the server that I have not been able to solve. The top-level error is a very generic "h2 protocol error: http2 error", and the inner error is a FRAME_SIZE_ERROR, about which I have found very little information.

```
WARN   Some(tonic::transport::Error(Transport, hyper::Error(Http2, Error { kind: GoAway(b"", FRAME_SIZE_ERROR, Library) })))
ERROR  failed to connect to dictation API: status: 'Unknown error', self: "h2 protocol error: http2 error"
```
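A hypothesis about the FRAME_SIZE_ERROR, not a confirmed diagnosis: the `Library` initiator in the GoAway means the local h2 library raised the error itself while parsing bytes it received. One common way to get there is speaking plaintext HTTP/2 to a TLS-only port. The `Channel::builder(...)` call in the listing does not set a TLS config (tonic's `Endpoint::tls_config` with a `ClientTlsConfig`), so unless TLS is enabled some other way, what comes back from port 443 may be TLS or HTTP/1.x bytes rather than an HTTP/2 frame. The std-only sketch below reads such bytes the way an HTTP/2 parser would: the first three bytes of a frame header are a 24-bit payload length (RFC 9113, section 4.1), and anything above the default SETTINGS_MAX_FRAME_SIZE of 16,384 bytes is a FRAME_SIZE_ERROR (section 4.2). The sample byte sequences used with it are assumptions about what the server might send, and the helper names are hypothetical.

```rust
/// Default SETTINGS_MAX_FRAME_SIZE (2^14 bytes), RFC 9113 section 6.5.2.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16_384;

/// Reads the 24-bit payload length an HTTP/2 parser takes from the first
/// three bytes of a frame header. Returns None if fewer than 3 bytes exist.
fn frame_length(bytes: &[u8]) -> Option<u32> {
    match bytes {
        [a, b, c, ..] => Some((u32::from(*a) << 16) | (u32::from(*b) << 8) | u32::from(*c)),
        _ => None,
    }
}

/// True if parsing these bytes as a frame header yields a length above the
/// default maximum, i.e. the parser would reject it with FRAME_SIZE_ERROR.
fn exceeds_default_frame_size(bytes: &[u8]) -> bool {
    frame_length(bytes).map_or(false, |len| len > DEFAULT_MAX_FRAME_SIZE)
}
```

For example, a TLS alert record starting `0x15 0x03 0x03` decodes to a 1,377,027-byte "frame", and the ASCII text `HTTP/1.0 400` decodes to 4,740,180 bytes; both are far above 16,384. If missing TLS is the cause, configuring it on the endpoint (for instance with `ClientTlsConfig::new().with_native_roots()`, available behind tonic's `tls-native-roots` feature in recent versions) may resolve the error; this has not been verified against the Speech endpoint here.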

The full code I have implemented is below. If anyone is familiar with this error and can offer any insight, I would appreciate it; at the very least, it would give a workable solution to anyone else waiting for these streaming APIs in the Rust client.

Full Code Listing
```
GOOGLE_CLOUD_API_KEY = "..."
GOOGLE_CLOUD_PROJECT = "..."
GOOGLE_CLOUD_LOCATION = "eu"
GOOGLE_CLOUD_RECOGNIZER = "my-model"
GOOGLE_CLOUD_SPEECH_ENDPOINT = "https://speech.googleapis.com"
```
```rust
tonic::include_proto!("google.cloud.speech.v2");

use futures::executor::block_on;
use futures::stream::StreamExt;
use futures::Stream;
use log::{info, warn};
use tonic::{
    service::{interceptor::InterceptedService, Interceptor},
    transport::Channel,
    Streaming,
};

use crate::{
    config::Config,
    service::{Error, Result},
};

use speech_client::SpeechClient;
use streaming_recognize_request::StreamingRequest;

#[derive(Clone)]
pub struct DictationService {
    // full resource name: projects/{project}/locations/{location}/recognizers/{recognizer}
    recognizer: String,
    // TODO: update to use google_cloud_speech_v2 when streaming APIs are available
    client: SpeechClient<InterceptedService<Channel, ApiKeyInterceptor>>,
}

impl DictationService {
    pub fn new(config: &Config) -> Self {
        // blocks until connected; panics if the endpoint is invalid or unreachable
        let channel = block_on(
            Channel::builder(
                (&config.google_cloud_speech_endpoint)
                    .try_into()
                    .expect("invalid google cloud speech API endpoint"),
            )
            // tried various values here ranging from 2 KB to 1 MB, but nothing seemed to work
            // .initial_connection_window_size(10 * 1024)
            // .initial_stream_window_size(10 * 1024)
            .connect(),
        )
        .expect("unable to connect to google cloud speech API");

        Self {
            recognizer: format!(
                "projects/{}/locations/{}/recognizers/{}",
                config.google_cloud_project,
                config.google_cloud_location,
                config.google_cloud_recognizer
            ),
            // every request made through this client passes through ApiKeyInterceptor
            client: SpeechClient::with_interceptor(
                channel,
                ApiKeyInterceptor {
                    api_key: config.google_cloud_api_key.clone(),
                },
            ),
        }
    }

    pub async fn transcribe<A, E>(
        &self,
        audio: A,
    ) -> Result<Streaming<StreamingRecognizeResponse>>
    where
        A: Stream<Item = std::result::Result<Vec<u8>, E>> + Send + Unpin + 'static,
        E: std::error::Error,
    {
        // cloning a tonic client is cheap: clones share the underlying Channel
        let mut client = self.client.clone();
        let recognizer = self.recognizer.clone();

        // initial request carries only the recognizer name (no streaming_config),
        // followed by audio-only requests
        let requests = futures::stream::once(async move {
            StreamingRecognizeRequest {
                recognizer,
                streaming_request: None,
            }
        })
        .chain(audio.map(move |c| {
            // a failed read is replaced by an empty chunk rather than ending the stream
            let c = c.unwrap_or_default();
            info!("sending audio chunk of size [{}] bytes", c.len());
            StreamingRecognizeRequest {
                recognizer: "".to_string(),
                streaming_request: Some(StreamingRequest::Audio(c)),
            }
        }));

        Ok(client
            .streaming_recognize(requests)
            .await
            .map_err(|err| {
                warn!("{:?}", std::error::Error::source(&err));
                Error::InternalError(format!("failed to connect to dictation API: {err}"))
            })?
            .into_inner())
    }
}

#[derive(Clone)]
struct ApiKeyInterceptor {
    api_key: String,
}

impl Interceptor for ApiKeyInterceptor {
    // attaches the API key as gRPC metadata on every outgoing request
    fn call(
        &mut self,
        mut req: tonic::Request<()>,
    ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
        req.metadata_mut().insert(
            "x-goog-api-key",
            self.api_key.parse().map_err(|_| {
                tonic::Status::internal("dictation API credentials could not be parsed")
            })?,
        );
        Ok(req)
    }
}
```
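A related point worth checking, offered as an assumption rather than something confirmed in this thread: the config pairs `GOOGLE_CLOUD_LOCATION = "eu"` with the global host `https://speech.googleapis.com`. As we understand the Speech v2 documentation, recognizers outside the `global` location are served from regional endpoints of the form `{location}-speech.googleapis.com`, so a recognizer under `locations/eu` would presumably need `https://eu-speech.googleapis.com`. This probably does not explain the HTTP/2-level error on its own, but it could surface once the connection works. Both helpers below are hypothetical; `recognizer_name` reproduces the `format!` call from the listing.

```rust
/// Hypothetical helper: picks the Speech v2 endpoint for a recognizer location.
/// Assumes the `{location}-speech.googleapis.com` regional-endpoint convention;
/// the `global` location stays on the default host.
fn speech_endpoint(location: &str) -> String {
    if location == "global" {
        "https://speech.googleapis.com".to_string()
    } else {
        format!("https://{location}-speech.googleapis.com")
    }
}

/// Builds the recognizer resource name in the same format as the listing.
fn recognizer_name(project: &str, location: &str, recognizer: &str) -> String {
    format!("projects/{project}/locations/{location}/recognizers/{recognizer}")
}
```

Deriving both values from the same location setting keeps the endpoint and the resource name from drifting apart when the region changes.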

turbosheep44, Oct 26 '25