Speech v2 streaming request
I am trying to use the GCP Speech V2 streaming API, as described in https://docs.rs/google-cloud-speech-v2/latest/google_cloud_speech_v2/index.html
We currently use a crate generated from the protobuf definitions, and it works fine with Speech API v1. With the v2 API, however, there is an issue and our app disconnects after 10 seconds. The crate, for reference: https://github.com/skoky/google-cognitive-apis We would like to use the official crate.
Thank you for the bug report. It will help us make the case for implementing this feature sooner, ahead of a number of other things.
I have also been looking into ways to do this for the past few days, and I came up with the following temporary solution:
- copy the gRPC service definition for the cloud speech service to my project
- generate a client using tonic-prost-build
- use the generated client
This seemed to be the most promising interim solution for me. However, I am getting an error from the server which I have not been able to solve. The top-level error is a very generic "h2 protocol error: http2 error", and the inner error is a FRAME_SIZE_ERROR, which I haven't found much information about at all:
WARN Some(tonic::transport::Error(Transport, hyper::Error(Http2, Error { kind: GoAway(b"", FRAME_SIZE_ERROR, Library) })))
ERROR failed to connect to dictation API: status: 'Unknown error', self: "h2 protocol error: http2 error"
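For context on the inner error: in HTTP/2 every frame starts with a 9-byte header whose first three bytes carry the payload length, and a length above the negotiated maximum (16,384 bytes by default) is reported as FRAME_SIZE_ERROR. The sketch below decodes such a header. `FrameHeader`, `parse_frame_header` and `exceeds_max_frame_size` are illustrative names, not part of h2 or tonic. The sample bytes are an assumption and were not captured from this failure: they are the start of a TLS alert record, padded with zeros, which is roughly what a client would read if it spoke plaintext HTTP/2 to a TLS port. Whether that is what happens here is not confirmed by anything in this thread.

```rust
/// Default SETTINGS_MAX_FRAME_SIZE from the HTTP/2 specification (RFC 9113).
const DEFAULT_MAX_FRAME_SIZE: u32 = 16_384;

/// Assumed sample, not taken from the logs above: a TLS alert record
/// (type 0x15, version 0x0303, length 2, two alert bytes), zero-padded
/// to fill a 9-byte HTTP/2 frame header.
const TLS_ALERT_PREFIX: [u8; 9] = [0x15, 0x03, 0x03, 0x00, 0x02, 0x02, 0x0a, 0x00, 0x00];

/// Decoded 9-byte HTTP/2 frame header (illustrative type).
#[derive(Debug, PartialEq)]
struct FrameHeader {
    length: u32,    // 24-bit payload length
    kind: u8,       // frame type
    flags: u8,      // frame flags
    stream_id: u32, // 31 bits; the reserved top bit is masked off
}

/// Decode the fixed-size header that precedes every HTTP/2 frame.
fn parse_frame_header(b: &[u8; 9]) -> FrameHeader {
    FrameHeader {
        length: u32::from_be_bytes([0, b[0], b[1], b[2]]),
        kind: b[3],
        flags: b[4],
        stream_id: u32::from_be_bytes([b[5], b[6], b[7], b[8]]) & 0x7FFF_FFFF,
    }
}

/// True when the advertised payload length is larger than the allowed maximum,
/// which is the condition reported as FRAME_SIZE_ERROR.
fn exceeds_max_frame_size(header: &FrameHeader, max: u32) -> bool {
    header.length > max
}
```

Passing `TLS_ALERT_PREFIX` through `parse_frame_header` yields a length far above `DEFAULT_MAX_FRAME_SIZE`, so bytes that are not HTTP/2 at all can surface as this error. If that is the cause, the window-size settings tried in the listing would not be expected to help, since they do not change how the first bytes are interpreted.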
The full code I have implemented is below. If anyone is familiar with this error and can offer any insight, I would appreciate it. At the very least, there would then be a workable solution for anyone else who is also waiting for these streaming APIs in the Rust client.
Full Code Listing
GOOGLE_CLOUD_API_KEY = "..."
GOOGLE_CLOUD_PROJECT = "..."
GOOGLE_CLOUD_LOCATION = "eu"
GOOGLE_CLOUD_RECOGNIZER = "my-model"
GOOGLE_CLOUD_SPEECH_ENDPOINT = "https://speech.googleapis.com"
tonic::include_proto!("google.cloud.speech.v2");
use futures::executor::block_on;
use futures::stream::StreamExt;
use futures::Stream;
use log::{info, warn};
use tonic::{
    service::{interceptor::InterceptedService, Interceptor},
    transport::Channel,
    Streaming,
};
use crate::{
    config::Config,
    service::{Error, Result},
};
use speech_client::SpeechClient;
use streaming_recognize_request::StreamingRequest;
#[derive(Clone)]
pub struct DictationService {
    recognizer: String,
    // TODO: update to use google_cloud_speech_v2 when streaming APIs are available
    client: SpeechClient<InterceptedService<Channel, ApiKeyInterceptor>>,
}
impl DictationService {
    pub fn new(config: &Config) -> Self {
        let channel = block_on(
            Channel::builder(
                (&config.google_cloud_speech_endpoint)
                    .try_into()
                    .expect("invalid google cloud speech API endpoint"),
            )
            // tried various values here ranging from 2kb to 1mb but nothing seemed to work
            // .initial_connection_window_size(10 * 1024)
            // .initial_stream_window_size(10 * 1024)
            .connect(),
        )
        .expect("unable to connect to google cloud speech API");
        Self {
            recognizer: format!(
                "projects/{}/locations/{}/recognizers/{}",
                config.google_cloud_project,
                config.google_cloud_location,
                config.google_cloud_recognizer
            ),
            client: SpeechClient::with_interceptor(
                channel,
                ApiKeyInterceptor {
                    api_key: config.google_cloud_api_key.clone(),
                },
            ),
        }
    }
    pub async fn transcribe<'a, A, E>(
        &self,
        audio: A,
    ) -> Result<Streaming<StreamingRecognizeResponse>>
    where
        A: Stream<Item = std::result::Result<Vec<u8>, E>> + Send + Unpin + 'static,
        E: std::error::Error,
    {
        let mut client = self.client.clone();
        let recognizer = self.recognizer.clone();
        // initial config request followed by audio chunks
        let requests = futures::stream::once(async move {
            StreamingRecognizeRequest {
                recognizer: recognizer.clone(),
                streaming_request: None,
            }
        })
        .chain(audio.map(move |c| {
            let c = c.unwrap_or_default();
            info!("sending audio chunk of size [{}] bytes", c.len());
            StreamingRecognizeRequest {
                recognizer: "".to_string(),
                streaming_request: Some(StreamingRequest::Audio(c)),
            }
        }));
        Ok(client
            .streaming_recognize(requests)
            .await
            .map_err(|err| {
                warn!("{:?}", std::error::Error::source(&err));
                Error::InternalError(format!("failed to connect to dictation API: {err}"))
            })?
            .into_inner())
    }
}
#[derive(Clone)]
struct ApiKeyInterceptor {
    api_key: String,
}
impl Interceptor for ApiKeyInterceptor {
    fn call(
        &mut self,
        mut req: tonic::Request<()>,
    ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
        req.metadata_mut().insert(
            "x-goog-api-key",
            self.api_key.parse().map_err(|_| {
                tonic::Status::internal("dictation API credentials could not be parsed")
            })?,
        );
        Ok(req)
    }
}
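To check the request ordering separately from the transport problem, the message sequence from `transcribe` can be reproduced without tonic. This is a minimal sketch using only the standard library: the two types are stand-ins that mirror the shape of the generated `StreamingRecognizeRequest` and `StreamingRequest` in the listing (only the audio variant is modelled), and `recognizer_name`, `build_requests` and the `max_chunk` parameter are hypothetical helpers introduced here for illustration. It builds the requests eagerly into a `Vec` rather than as an async stream, and it makes no claim about the chunk size the API actually accepts.

```rust
/// Stand-in for the generated oneof; only the audio variant is modelled.
#[derive(Debug, Clone, PartialEq)]
enum StreamingRequest {
    Audio(Vec<u8>),
}

/// Stand-in for the generated request message used in the listing.
#[derive(Debug, Clone, PartialEq)]
struct StreamingRecognizeRequest {
    recognizer: String,
    streaming_request: Option<StreamingRequest>,
}

/// Build the recognizer resource name the same way `DictationService::new` does.
fn recognizer_name(project: &str, location: &str, recognizer: &str) -> String {
    format!("projects/{project}/locations/{location}/recognizers/{recognizer}")
}

/// First request names the recognizer and carries no payload; every later
/// request carries one audio chunk of at most `max_chunk` bytes and leaves
/// the recognizer empty, matching the order used in `transcribe`.
fn build_requests(
    recognizer: &str,
    audio: &[u8],
    max_chunk: usize,
) -> Vec<StreamingRecognizeRequest> {
    assert!(max_chunk > 0, "max_chunk must be non-zero");
    let first = StreamingRecognizeRequest {
        recognizer: recognizer.to_string(),
        streaming_request: None,
    };
    std::iter::once(first)
        .chain(audio.chunks(max_chunk).map(|chunk| StreamingRecognizeRequest {
            recognizer: String::new(),
            streaming_request: Some(StreamingRequest::Audio(chunk.to_vec())),
        }))
        .collect()
}
```

Calling `build_requests(&recognizer_name("my-project", "eu", "my-model"), &audio, 4096)` returns one recognizer-only message followed by the audio split into chunks, which makes it easy to log or assert on the exact sequence before it is handed to the generated client.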