VOSK API Accuracy
I've done all I can to get this API to work, but it's wildly inaccurate even when using the newest gigaspeech model. Is there something in my code causing it to be so inaccurate? (When I say inaccurate, I mean 100% incorrect 99% of the time.) Here is a link to the .py file and input audio: vosk_test.zip
```
import subprocess
import wave
import json
import multiprocessing
from tqdm import tqdm
from vosk import Model, KaldiRecognizer
from termcolor import colored

def analyze_audio(audio_file):
    print(colored("Analyzing audio properties...", "yellow"))
    ffprobe_cmd = ["ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=channels,sample_rate",
                   "-of", "json", audio_file]
    result = subprocess.run(ffprobe_cmd, capture_output=True, text=True)
    output = result.stdout
    properties = {}
    try:
        json_data = json.loads(output)
        stream = json_data["streams"][0]
        properties["channels"] = int(stream["channels"])
        properties["sample_rate"] = int(stream["sample_rate"])
        print(colored("Audio analysis complete.", "green"))
    except (json.JSONDecodeError, KeyError):
        raise ValueError("Error analyzing audio file.")
    return properties

def apply_audio_processing(data, audio_properties):
    print(colored("Applying audio processing...", "yellow"))
    try:
        process = subprocess.Popen(
            ["ffmpeg", "-hide_banner", "-y", "-f", "s16le", "-ar", str(audio_properties["sample_rate"]), "-ac",
             str(audio_properties["channels"]), "-i", "-", "-f", "s16le", "-"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        processed_data, err = process.communicate(input=data)
        if process.returncode != 0:
            print(colored("Error occurred during audio processing:", "red"))
            print(err.decode())
            return b''  # Return an empty byte array instead of None
        return processed_data
    except Exception as e:
        print(colored(f"Error occurred during audio processing: {str(e)}", "red"))
        return b''  # Return an empty byte array instead of None

def process_frames(args):
    frames, audio_properties = args
    processed_frames = []
    for data in tqdm(frames, desc=colored("Processing", "cyan")):
        processed_data = apply_audio_processing(data, audio_properties)
        processed_frames.append(processed_data)
    return processed_frames

def transcribe_audio(processed_frames, model, sample_rate):
    print(colored("Transcribing audio...", "green"))
    recognizer = KaldiRecognizer(model, sample_rate)
    results = []
    for chunk in tqdm(processed_frames, desc=colored("Transcribing", "yellow")):
        recognizer.AcceptWaveform(chunk)
        result = recognizer.Result()
        print(result)  # Debug statement to print individual transcription result for each chunk
        results.append(result)
    return results

def main():
    audio_file_path = r"C:\Users\Jonathan\Desktop\Recording.m4a"
    converted_wav_file = "converted_audio.wav"
    processed_audio_file = "processed_audio.wav"
    try:
        print(colored("Converting audio file to WAV format...", "green"))
        subprocess.run(["ffmpeg", "-i", audio_file_path, converted_wav_file])
        print(colored("Audio file converted successfully.", "green"))
        print(colored("Opening the converted WAV file...", "green"))
        with wave.open(converted_wav_file, "rb") as wf:
            print(colored("Analyzing audio properties...", "green"))
            audio_properties = analyze_audio(converted_wav_file)
            # Set sample rate variables
            sample_rate = audio_properties["sample_rate"]
            processed_frames = []
            buffer_size = 4000
            num_processes = multiprocessing.cpu_count()
            frames = [wf.readframes(buffer_size) for _ in range(int(wf.getnframes() / buffer_size))]
            with multiprocessing.Pool(processes=num_processes) as pool:
                results = pool.map(process_frames, [(frames[i:i + num_processes], audio_properties)
                                                    for i in range(0, len(frames), num_processes)])
            processed_frames = [data for sublist in results for data in sublist]
            processed_data = b''.join(processed_frames)
            chunk_size = 4000
            print(colored("Loading Vosk model...", "green"))
            model_path = r"C:\Users\Jonathan\Desktop\GPT\vosk-model-en-us-0.42-gigaspeech"
            model = Model(model_path)
            print(colored("Processed Data Length:", "green"))
            print(len(processed_data))
            print(colored("Transcribing audio...", "green"))
            transcription_results = transcribe_audio(processed_frames, model, sample_rate)
            combined_transcription = {"text": ""}
            for result in transcription_results:
                result_dict = json.loads(result)
                transcription = result_dict.get('text', '')
                combined_transcription["text"] += transcription + " "
            print(colored("Transcription:", "green"))
            print(combined_transcription["text"])
            print(colored("Saving processed audio...", "green"))
            with wave.open(processed_audio_file, "wb") as processed_wf:
                processed_wf.setnchannels(audio_properties["channels"])
                processed_wf.setsampwidth(wf.getsampwidth())
                processed_wf.setframerate(audio_properties["sample_rate"])
                processed_wf.writeframes(processed_data)
            print(colored("Processed audio saved successfully.", "green"))
    except subprocess.CalledProcessError as e:
        print(colored("Error converting the audio file to WAV format:", "red"))
        print(colored(str(e), "red"))
    except wave.Error as e:
        print(colored("Error opening the converted WAV file:", "red"))
        print(colored(str(e), "red"))
    except Exception as e:
        print(colored(f"Error: {str(e)}", "red"))

if __name__ == "__main__":
    main()
```
I also created a much simpler script just to test, and I still got the same issue. Here is that second script:
```
import subprocess
import wave
import json
import logging
from vosk import Model, KaldiRecognizer
from tkinter import Tk, filedialog
from tqdm import tqdm
from colorama import init, Fore, Style

# Disable vosk library logging
logging.getLogger("vosk").setLevel(logging.ERROR)

# Initialize colorama
init()

# Print a formatted message with a border
def print_message(message):
    border = "=" * len(message)
    print(f"{Style.BRIGHT}{Fore.YELLOW}{border}")
    print(message)
    print(border)
    print(Style.RESET_ALL)

print_message("Please select an audio file.")

# Open a file dialog to select an audio file
Tk().withdraw()
audio_file_path = filedialog.askopenfilename()

# Convert audio file to WAV format
converted_wav_file = "converted_audio.wav"

try:
    # Use ffmpeg to convert the audio file to WAV
    subprocess.run(["ffmpeg", "-i", audio_file_path, converted_wav_file])
    print_message("Audio file converted to WAV format.")

    # Initialize the model
    print_message("Loading model...")
    model_path = "C:\\Users\\Jonathan\\Desktop\\GPT\\vosk-model-en-us-0.22"
    model = Model(model_path)
    print_message("Model loaded.")

    # Open the converted WAV file
    with wave.open(converted_wav_file, "rb") as wf:
        # Initialize the recognizer
        rec = KaldiRecognizer(model, wf.getframerate())

        # Buffer size for chunk-wise processing
        buffer_size = 4000

        # Calculate the total number of chunks
        total_chunks = wf.getnframes() // buffer_size

        print_message("Transcribing audio...")

        # List to store transcriptions
        transcriptions = []

        with tqdm(total=total_chunks, desc="Transcribing", unit="chunk") as progress_bar:
            while True:
                # Read audio data chunk
                data = wf.readframes(buffer_size)
                if len(data) == 0:
                    # No more frames to read, break the loop
                    break

                # Process the audio chunk with the recognizer
                rec.AcceptWaveform(data)

                # Get the partial transcription result
                partial_result = rec.Result()
                result_dict = json.loads(partial_result)

                # Extract the text from the result
                text = result_dict.get('text', '')

                # Append the text to the transcription list
                transcriptions.append(text)

                progress_bar.update(1)

        print_message("Audio transcription complete.")

        # Get the final transcription result
        final_result = rec.FinalResult()
        final_dict = json.loads(final_result)

        # Extract the text from the final result
        final_text = final_dict.get('text', '')

        # Append the final text to the transcription list
        transcriptions.append(final_text)

        print_message("Transcriptions:")
        for i, transcription in enumerate(transcriptions):
            print(f"{Style.BRIGHT}{Fore.CYAN}Transcription {i + 1}:{Style.RESET_ALL}")
            print(transcription)
            print()

except subprocess.CalledProcessError:
    print_message("Error converting the audio file to WAV format.")
except wave.Error:
    print_message("Error opening the converted WAV file.")
```
Please format your code properly using code formatting tags.
You need to use triple ticks.
I updated my original post with proper formatting.
The audio file must be mono 16 kHz; you need to add the proper options to ffmpeg (-ac 1 -ar 16000).
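For reference, a minimal sketch of the corrected conversion step under that advice (the file names here are just placeholders):

```
import subprocess

# Vosk expects 16-bit PCM WAV, mono, 16 kHz:
#   -ac 1      downmix to a single channel
#   -ar 16000  resample to 16000 Hz
subprocess.run(
    ["ffmpeg", "-i", "input.m4a", "-ac", "1", "-ar", "16000", "converted_audio.wav"],
    check=True,  # raise CalledProcessError if ffmpeg exits non-zero
)
```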
Thanks for the help! I updated the code to:
```
import subprocess
import wave
import json
import multiprocessing
from tqdm import tqdm
from vosk import Model, KaldiRecognizer
from termcolor import colored

def analyze_audio(audio_file):
    print(colored("Analyzing audio properties...", "yellow"))
    ffprobe_cmd = ["ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=channels,sample_rate",
                   "-of", "json", audio_file]
    result = subprocess.run(ffprobe_cmd, capture_output=True, text=True)
    output = result.stdout
    properties = {}
    try:
        json_data = json.loads(output)
        stream = json_data["streams"][0]
        properties["channels"] = int(stream["channels"])
        properties["sample_rate"] = int(stream["sample_rate"])
        print(colored("Audio analysis complete.", "green"))
    except (json.JSONDecodeError, KeyError):
        raise ValueError("Error analyzing audio file.")
    return properties

def apply_audio_processing(data, audio_properties):
    print(colored("Applying audio processing...", "yellow"))
    try:
        process = subprocess.Popen(
            ["ffmpeg", "-hide_banner", "-y", "-f", "s16le", "-ar", str(audio_properties["sample_rate"]), "-ac",
             str(audio_properties["channels"]), "-i", "-", "-f", "s16le", "-"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        processed_data, err = process.communicate(input=data)
        if process.returncode != 0:
            print(colored("Error occurred during audio processing:", "red"))
            print(err.decode())
            return b''  # Return an empty byte array instead of None
        return processed_data
    except Exception as e:
        print(colored(f"Error occurred during audio processing: {str(e)}", "red"))
        return b''  # Return an empty byte array instead of None

def process_frames(args):
    frames, audio_properties = args
    processed_frames = []
    for data in tqdm(frames, desc=colored("Processing", "cyan")):
        processed_data = apply_audio_processing(data, audio_properties)
        processed_frames.append(processed_data)
    return processed_frames

def transcribe_audio(processed_frames, model, sample_rate):
    print(colored("Transcribing audio...", "green"))
    recognizer = KaldiRecognizer(model, sample_rate)
    results = []
    for chunk in tqdm(processed_frames, desc=colored("Transcribing", "yellow")):
        recognizer.AcceptWaveform(chunk)
        result = recognizer.Result()
        print(result)  # Debug statement to print individual transcription result for each chunk
        results.append(result)
    return results

def main():
    audio_file_path = r"C:\Users\Jonathan\Desktop\Recording.m4a"
    converted_wav_file = "converted_audio.wav"
    processed_audio_file = "processed_audio.wav"
    try:
        print(colored("Converting audio file to WAV format...", "green"))
        subprocess.run(["ffmpeg", "-i", audio_file_path, "-ac", "1", "-ar", "16000", converted_wav_file])
        print(colored("Audio file converted successfully.", "green"))
        print(colored("Opening the converted WAV file...", "green"))
        with wave.open(converted_wav_file, "rb") as wf:
            print(colored("Analyzing audio properties...", "green"))
            audio_properties = analyze_audio(converted_wav_file)
            # Set sample rate variables
            sample_rate = audio_properties["sample_rate"]
            processed_frames = []
            buffer_size = 4000
            num_processes = multiprocessing.cpu_count()
            frames = [wf.readframes(buffer_size) for _ in range(int(wf.getnframes() / buffer_size))]
            with multiprocessing.Pool(processes=num_processes) as pool:
                results = pool.map(process_frames, [(frames[i:i + num_processes], audio_properties)
                                                    for i in range(0, len(frames), num_processes)])
            processed_frames = [data for sublist in results for data in sublist]
            processed_data = b''.join(processed_frames)
            chunk_size = 4000
            print(colored("Loading Vosk model...", "green"))
            model_path = r"C:\Users\Jonathan\Desktop\GPT\vosk-model-en-us-0.22"
            model = Model(model_path)
            print(colored("Processed Data Length:", "green"))
            print(len(processed_data))
            print(colored("Transcribing audio...", "green"))
            transcription_results = transcribe_audio(processed_frames, model, sample_rate)
            combined_transcription = {"text": ""}
            for result in transcription_results:
                result_dict = json.loads(result)
                transcription = result_dict.get('text', '')
                combined_transcription["text"] += transcription + " "
            print(colored("Transcription:", "green"))
            print(combined_transcription["text"])
            print(colored("Saving processed audio...", "green"))
            with wave.open(processed_audio_file, "wb") as processed_wf:
                processed_wf.setnchannels(audio_properties["channels"])
                processed_wf.setsampwidth(wf.getsampwidth())
                processed_wf.setframerate(audio_properties["sample_rate"])
                processed_wf.writeframes(processed_data)
            print(colored("Processed audio saved successfully.", "green"))
    except subprocess.CalledProcessError as e:
        print(colored("Error converting the audio file to WAV format:", "red"))
        print(colored(str(e), "red"))
    except wave.Error as e:
        print(colored("Error opening the converted WAV file:", "red"))
        print(colored(str(e), "red"))
    except Exception as e:
        print(colored(f"Error: {str(e)}", "red"))

if __name__ == "__main__":
    main()
```
These changes made it about 40% more accurate. Is there any way to improve it further?
You can pick any modern model depending on the type of data you have:
https://alphacephei.com/nsh/2022/10/22/whisper.html
For high-quality audio, Whisper should be good.
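For anyone landing here, a minimal sketch of the Whisper route, assuming the open-source openai-whisper package (`pip install openai-whisper`); the "base" model size is an arbitrary choice:

```
import whisper  # assumes: pip install openai-whisper (ffmpeg must be on PATH)

# Load a model; "base" is an arbitrary size choice (tiny/base/small/medium/large).
model = whisper.load_model("base")

# Whisper decodes and resamples the input internally via ffmpeg,
# so the file does not need to be pre-converted to mono 16 kHz.
result = model.transcribe("converted_audio.wav")
print(result["text"])
```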