Profanity filtering?
Greetings. Thank you for your wonderful project. We work a lot with the Deaf and Hard of Hearing (HoH) in our volunteer work and use Jitsi a lot, so we are very grateful. Since we work with protected and at-risk populations, including children, are there any options or models that can provide a "profanity filter" of some kind? For example, I said "jigger this to fit", but Vosk transcribed the "N" word instead. It would be good to have a profanity filter option that ********* any ban-listed words. Is this possible as-is with Vosk, with some clever approach you could suggest? Or would this require a whole other feature extension or model overhaul? We appreciate your feedback and suggestions on how we could make this work. Thank you kindly!
That would be a nice feature to have; it should be easily doable in Python code.
It would help if you could contribute a list of words to filter.
This example list here is much smaller than our list, but couldn't it be used as a test list for a proof of concept: https://www.cs.cmu.edu/~biglou/resources/bad-words.txt? Is this something the community here is willing to take on? Or is there a way to just "plug in" some third-party component into Vosk instead of building custom code? For example, could something like this be easily added somehow: https://pypi.org/project/profanity-check/? Thank you very much for your support in trying to make this work.
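For a quick proof of concept, the word-list approach can be prototyped in a few lines of Python. Here is a minimal sketch (the helper names are mine, not part of Vosk) that downloads the CMU list linked above and masks any matching word with asterisks:

```python
import re
import urllib.request

WORDLIST_URL = "https://www.cs.cmu.edu/~biglou/resources/bad-words.txt"

def load_banned_words(url=WORDLIST_URL):
    # One lowercase word per line in the CMU list; blank lines are skipped
    with urllib.request.urlopen(url) as f:
        return {line.strip().lower() for line in f.read().decode().splitlines() if line.strip()}

def censor(text, banned):
    # Replace every ban-listed word with asterisks of the same length
    def mask(match):
        word = match.group(0)
        return "*" * len(word) if word.lower() in banned else word
    return re.sub(r"[A-Za-z']+", mask, text)

if __name__ == "__main__":
    banned = load_banned_words()
    print(censor("example transcript text", banned))
```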
Hello, I was trying to apply profanity filtering (using the profanity-filter Python library) and I have some confusion about how Vosk produces partial and full transcripts. In the image below, I have applied a profanity filter to the partial text; however, Vosk still sends an uncensored full text. (I have also added the tags "[Partial]" and "[Full]" to help distinguish the strings.)
I was wondering how this works. I assumed the full text was constructed from the partial sequence, but does Vosk handle partial and full text separately instead?
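For reference, partial and final results arrive as separate JSON messages, and the final text is decoded from the whole utterance rather than assembled from the partials, so it has to be censored on its own (in the code below, only the "partial" branch is censored). Illustrative shapes, with made-up values:

```python
# Message emitted by rec.PartialResult() while audio is still streaming in:
partial_result = {"partial": "one two three"}

# Message emitted by rec.Result() / rec.FinalResult() once an utterance ends;
# it is re-decoded from the full utterance, not concatenated from partials:
final_result = {"text": "one two three four"}
```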
I have pasted the code below (slightly modified from the vosk-server/websocket/asr_server.py file):
```python
#!/usr/bin/env python3

import json
import os
import sys
import asyncio
import pathlib
import websockets
import concurrent.futures
import logging

from vosk import Model, KaldiRecognizer
from profanity_filter import ProfanityFilter


def process_chunk(rec, message):
    if message == '{"eof" : 1}':
        return rec.FinalResult(), True
    elif rec.AcceptWaveform(message):
        return rec.Result(), False
    else:
        return rec.PartialResult(), False


async def recognize(websocket, path):
    global model
    global args
    global loop
    global pool

    rec = None
    phrase_list = None
    sample_rate = args.sample_rate
    show_words = args.show_words
    max_alternatives = args.max_alternatives

    logging.info('Connection from %s', websocket.remote_address)

    pf = None

    while True:

        message = await websocket.recv()

        # Load configuration if provided
        if isinstance(message, str) and 'config' in message:
            jobj = json.loads(message)['config']
            logging.info("Config %s", jobj)
            if 'phrase_list' in jobj:
                phrase_list = jobj['phrase_list']
            if 'sample_rate' in jobj:
                sample_rate = float(jobj['sample_rate'])
            if 'words' in jobj:
                show_words = bool(jobj['words'])
            if 'max_alternatives' in jobj:
                max_alternatives = int(jobj['max_alternatives'])
            continue

        # Create the recognizer, word list is temporarily disabled since not every model supports it
        if not rec:
            if phrase_list:
                rec = KaldiRecognizer(model, sample_rate, json.dumps(phrase_list, ensure_ascii=False))
            else:
                rec = KaldiRecognizer(model, sample_rate)
            rec.SetWords(show_words)
            rec.SetMaxAlternatives(max_alternatives)

        response, stop = await loop.run_in_executor(pool, process_chunk, rec, message)

        if not pf:
            pf = ProfanityFilter()

        parsed_response = json.loads(response)
        # Check for the keys on the parsed dict, not the raw JSON string,
        # so a transcript containing the word "partial" cannot confuse us
        if "partial" in parsed_response:
            censored_transcript = pf.censor(parsed_response["partial"])
            parsed_response["partial"] = "[Partial] " + censored_transcript
            logging.info('Censored response: %s', parsed_response["partial"])
        elif "text" in parsed_response:
            # Note: the final text is labeled here but never censored
            parsed_response["text"] = "[Full] " + parsed_response["text"]
        response = json.dumps(parsed_response)

        await websocket.send(response)
        if stop:
            break


def start():
    global model
    global args
    global loop
    global pool

    # Enable logging if needed
    #
    # logger = logging.getLogger('websockets')
    # logger.setLevel(logging.INFO)
    # logger.addHandler(logging.StreamHandler())
    logging.basicConfig(level=logging.INFO)

    args = type('', (), {})()

    args.interface = os.environ.get('VOSK_SERVER_INTERFACE', '0.0.0.0')
    args.port = int(os.environ.get('VOSK_SERVER_PORT', 2700))
    args.model_path = os.environ.get('VOSK_MODEL_PATH', 'model')
    args.sample_rate = float(os.environ.get('VOSK_SAMPLE_RATE', 8000))
    args.max_alternatives = int(os.environ.get('VOSK_ALTERNATIVES', 0))
    args.show_words = bool(os.environ.get('VOSK_SHOW_WORDS', True))

    if len(sys.argv) > 1:
        args.model_path = sys.argv[1]

    # Gpu part, uncomment if vosk-api has gpu support
    #
    # from vosk import GpuInit, GpuInstantiate
    # GpuInit()
    # def thread_init():
    #     GpuInstantiate()
    # pool = concurrent.futures.ThreadPoolExecutor(initializer=thread_init)

    model = Model(args.model_path)
    pool = concurrent.futures.ThreadPoolExecutor((os.cpu_count() or 1))
    loop = asyncio.get_event_loop()

    start_server = websockets.serve(
        recognize, args.interface, args.port)

    logging.info("Listening on %s:%d", args.interface, args.port)
    loop.run_until_complete(start_server)
    loop.run_forever()


if __name__ == '__main__':
    start()
```
I have set up Vosk Server as outlined here. Please let me know if you need any more details; I would be happy to test out some suggestions as well.
As for a list of swear words, there is an audio dataset called TAPAD that contains audio files of numerous swear words: https://github.com/theabuseproject/tapad The files are in .mp3 format, but they can be converted to .wav using FFmpeg.
For example, this blog post discusses training an AI model on the TAPAD dataset to auto-censor swear words.
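If it helps, here is a small batch-conversion sketch that drives FFmpeg from Python's subprocess module (the tapad directory path is an assumption; point it at wherever the repo was cloned):

```python
import pathlib
import subprocess

# Convert every TAPAD .mp3 clip to 16 kHz mono .wav, which most Vosk
# models expect; requires ffmpeg to be available on the PATH.
src_dir = pathlib.Path("tapad")  # assumed clone location
for mp3 in sorted(src_dir.rglob("*.mp3")):
    wav = mp3.with_suffix(".wav")
    subprocess.run(
        ["ffmpeg", "-y", "-i", str(mp3), "-ar", "16000", "-ac", "1", str(wav)],
        check=True,
    )
```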
Update on the profanity filter
I decided to use two libraries together, profanity-filter and alt-profanity-check, because the combination performs faster than using profanity-filter alone. I tested the two conditions on some profane text and found that both censored the text identically. Then I compared their performance using cProfile:
| Metric | Profanity Check, Then Filter | Profanity Filter Alone |
| --- | --- | --- |
| Function calls | 763,751 function calls (732,075 primitive calls) | 2,416,817 function calls (2,313,160 primitive calls) |
| Total time | 1.069 seconds | 3.402 seconds |
| Slowest function (total time in seconds, total number of calls) | `thinc/neural/ops.pyx:514(gemm)` (0.235, 3,522) | `thinc/neural/ops.pyx:514(gemm)` (0.768, 11,640) |
The Check-then-Filter condition runs faster. Both conditions have the same slowest function (from the profanity-filter module), so Check-then-Filter wins by reducing the number of calls to that slowest function.
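For anyone who wants to reproduce the comparison, the two conditions can be profiled with something like the sketch below (the sample sentences are placeholders, not the text I actually used):

```python
import cProfile
import pstats

from profanity_filter import ProfanityFilter
from profanity_check import predict

pf = ProfanityFilter()
samples = ["placeholder profane sentence one", "placeholder sentence two"]

def check_then_filter():
    # Cheap ML check first; only run the expensive censor on flagged text
    for text in samples:
        if predict([text])[0]:
            pf.censor(text)

def filter_only():
    # Censor everything unconditionally
    for text in samples:
        pf.censor(text)

for name, fn in [("check_then_filter", check_then_filter), ("filter_only", filter_only)]:
    profiler = cProfile.Profile()
    profiler.runcall(fn)
    print(name)
    pstats.Stats(profiler).sort_stats("tottime").print_stats(5)
```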
Here is my implementation of the profanity filter with the Vosk WebSocket asr_server.py:
```python
#!/usr/bin/env python3

import json
import os
import sys
import asyncio
import pathlib
import websockets
import concurrent.futures
import logging

from vosk import Model, KaldiRecognizer
from profanity_filter import ProfanityFilter
from profanity_check import predict


def process_chunk(rec, message):
    if message == '{"eof" : 1}':
        return rec.FinalResult(), True
    elif rec.AcceptWaveform(message):
        return rec.Result(), False
    else:
        return rec.PartialResult(), False


async def recognize(websocket, path):
    global model
    global args
    global loop
    global pool
    global should_filter_profanity

    rec = None
    phrase_list = None
    sample_rate = args.sample_rate
    show_words = args.show_words
    max_alternatives = args.max_alternatives
    profanity_filter = None

    logging.info('Connection from %s', websocket.remote_address)

    while True:

        message = await websocket.recv()

        # Load configuration if provided
        if isinstance(message, str) and 'config' in message:
            jobj = json.loads(message)['config']
            logging.info("Config %s", jobj)
            if 'phrase_list' in jobj:
                phrase_list = jobj['phrase_list']
            if 'sample_rate' in jobj:
                sample_rate = float(jobj['sample_rate'])
            if 'words' in jobj:
                show_words = bool(jobj['words'])
            if 'max_alternatives' in jobj:
                max_alternatives = int(jobj['max_alternatives'])
            continue

        # Create the recognizer, word list is temporarily disabled since not every model supports it
        if not rec:
            if phrase_list:
                rec = KaldiRecognizer(model, sample_rate, json.dumps(phrase_list, ensure_ascii=False))
            else:
                rec = KaldiRecognizer(model, sample_rate)
            rec.SetWords(show_words)
            rec.SetMaxAlternatives(max_alternatives)

        response, stop = await loop.run_in_executor(pool, process_chunk, rec, message)

        if should_filter_profanity:
            py_json_response = json.loads(response)
            if profanity_filter is None:
                # Lazily create the filter, once per connection
                profanity_filter = ProfanityFilter()
            py_json_response = filter_profanity(py_json_response, profanity_filter)
            response = json.dumps(py_json_response)

        await websocket.send(response)
        if stop:
            break


def filter_profanity(response: dict, pf: ProfanityFilter):
    if "partial" in response:
        text_type = "partial"
    elif "text" in response:
        text_type = "text"
    else:
        # Other result shapes (e.g. "alternatives" when max_alternatives > 0)
        # are passed through unfiltered
        return response
    transcript = response[text_type]
    # Cheap check first; only run the expensive censor on flagged text
    has_profanity = predict([transcript])[0]
    # logging.info("Transcript is profane? %s", (transcript, has_profanity))
    if has_profanity:
        censored_transcript = pf.censor(transcript)
        response[text_type] = censored_transcript
    return response


def start():
    global model
    global args
    global loop
    global pool
    global should_filter_profanity

    should_filter_profanity = True

    # Enable logging if needed
    #
    # logger = logging.getLogger('websockets')
    # logger.setLevel(logging.INFO)
    # logger.addHandler(logging.StreamHandler())
    logging.basicConfig(level=logging.INFO)

    args = type('', (), {})()

    args.interface = os.environ.get('VOSK_SERVER_INTERFACE', '0.0.0.0')
    args.port = int(os.environ.get('VOSK_SERVER_PORT', 2700))
    args.model_path = os.environ.get('VOSK_MODEL_PATH', 'model')
    args.sample_rate = float(os.environ.get('VOSK_SAMPLE_RATE', 8000))
    args.max_alternatives = int(os.environ.get('VOSK_ALTERNATIVES', 0))
    args.show_words = bool(os.environ.get('VOSK_SHOW_WORDS', True))

    if len(sys.argv) > 1:
        args.model_path = sys.argv[1]

    # Gpu part, uncomment if vosk-api has gpu support
    #
    # from vosk import GpuInit, GpuInstantiate
    # GpuInit()
    # def thread_init():
    #     GpuInstantiate()
    # pool = concurrent.futures.ThreadPoolExecutor(initializer=thread_init)

    model = Model(args.model_path)
    pool = concurrent.futures.ThreadPoolExecutor((os.cpu_count() or 1))
    loop = asyncio.get_event_loop()

    start_server = websockets.serve(
        recognize, args.interface, args.port)

    logging.info("Listening on %s:%d", args.interface, args.port)
    loop.run_until_complete(start_server)
    loop.run_forever()


if __name__ == '__main__':
    start()
```
Profanity Filter Implementation Update
vosk-server/websocket/asr_server.py
```python
#!/usr/bin/env python3

import json
import os
import sys
import asyncio
import pathlib
import websockets
import concurrent.futures
import logging

from vosk import Model, SpkModel, KaldiRecognizer
from asr_server_filter import Filter


def process_chunk(rec, message):
    if message == '{"eof" : 1}':
        return rec.FinalResult(), True
    elif rec.AcceptWaveform(message):
        return rec.Result(), False
    else:
        return rec.PartialResult(), False


async def recognize(websocket, path):
    global model
    global spk_model
    global args
    global loop
    global pool

    rec = None
    phrase_list = None
    sample_rate = args.sample_rate
    show_words = args.show_words
    max_alternatives = args.max_alternatives
    apply_filter = args.apply_filter
    p_filter = None if not apply_filter else Filter()

    logging.info('Connection from %s', websocket.remote_address)

    while True:

        message = await websocket.recv()

        # Load configuration if provided
        if isinstance(message, str) and 'config' in message:
            jobj = json.loads(message)['config']
            logging.info("Config %s", jobj)
            if 'phrase_list' in jobj:
                phrase_list = jobj['phrase_list']
            if 'sample_rate' in jobj:
                sample_rate = float(jobj['sample_rate'])
            if 'words' in jobj:
                show_words = bool(jobj['words'])
            if 'max_alternatives' in jobj:
                max_alternatives = int(jobj['max_alternatives'])
            continue

        # Create the recognizer, word list is temporarily disabled since not every model supports it
        if not rec:
            if phrase_list:
                rec = KaldiRecognizer(model, sample_rate, json.dumps(phrase_list, ensure_ascii=False))
            else:
                rec = KaldiRecognizer(model, sample_rate)
            rec.SetWords(show_words)
            rec.SetMaxAlternatives(max_alternatives)
            if spk_model:
                rec.SetSpkModel(spk_model)

        response, stop = await loop.run_in_executor(pool, process_chunk, rec, message)
        if apply_filter:
            response = p_filter.filter(response)
        await websocket.send(response)
        if stop:
            break


def start():
    global model
    global spk_model
    global args
    global loop
    global pool

    # Enable logging if needed
    #
    # logger = logging.getLogger('websockets')
    # logger.setLevel(logging.INFO)
    # logger.addHandler(logging.StreamHandler())
    logging.basicConfig(level=logging.INFO)

    args = type('', (), {})()

    args.interface = os.environ.get('VOSK_SERVER_INTERFACE', '0.0.0.0')
    args.port = int(os.environ.get('VOSK_SERVER_PORT', 2700))
    args.model_path = os.environ.get('VOSK_MODEL_PATH', 'model')
    args.spk_model_path = os.environ.get('VOSK_SPK_MODEL_PATH')
    args.sample_rate = float(os.environ.get('VOSK_SAMPLE_RATE', 8000))
    args.max_alternatives = int(os.environ.get('VOSK_ALTERNATIVES', 0))
    args.show_words = bool(os.environ.get('VOSK_SHOW_WORDS', True))
    # Filtering defaults to on; note bool() of any non-empty string is True,
    # so VOSK_FILTER=false still enables it
    args.apply_filter = bool(os.environ.get('VOSK_FILTER', True))

    if len(sys.argv) > 1:
        args.model_path = sys.argv[1]

    # Gpu part, uncomment if vosk-api has gpu support
    #
    # from vosk import GpuInit, GpuInstantiate
    # GpuInit()
    # def thread_init():
    #     GpuInstantiate()
    # pool = concurrent.futures.ThreadPoolExecutor(initializer=thread_init)

    model = Model(args.model_path)
    spk_model = SpkModel(args.spk_model_path) if args.spk_model_path else None
    pool = concurrent.futures.ThreadPoolExecutor((os.cpu_count() or 1))
    loop = asyncio.get_event_loop()

    start_server = websockets.serve(
        recognize, args.interface, args.port)

    logging.info("Listening on %s:%d", args.interface, args.port)
    loop.run_until_complete(start_server)
    loop.run_forever()


if __name__ == '__main__':
    start()
```
vosk-server/websocket/asr_server_filter.py
```python
#!/usr/bin/env python3

import json
import logging

from profanity_filter import ProfanityFilter
from profanity_check import predict


class Filter:

    def __init__(self):
        self.pf = ProfanityFilter()

    def filter(self, response: str):
        # Parse the recognizer's JSON, censor it, and serialize it back
        py_json_response = self.apply_filter(json.loads(response))
        return json.dumps(py_json_response)

    def apply_filter(self, response: dict):
        if "partial" in response:
            text_type = "partial"
        elif "text" in response:
            text_type = "text"
        else:
            # Other result shapes (e.g. "alternatives") are passed through unfiltered
            return response
        transcript = response[text_type]
        # Cheap check first; only run the expensive censor on flagged text
        has_profanity = predict([transcript])[0]
        # logging.info("Transcript is profane? %s", (transcript, has_profanity))
        if has_profanity:
            censored_transcript = self.pf.censor(transcript)
            response[text_type] = censored_transcript
        return response
```
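A quick way to sanity-check the Filter class without running the full server (the input strings here are placeholders):

```python
from asr_server_filter import Filter

f = Filter()
# Partial and final messages are filtered the same way
print(f.filter('{"partial": "placeholder partial text"}'))
print(f.filter('{"text": "placeholder final text"}'))
```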
Many thanks to Alex for his work on this effort!
We still have to migrate this into our official DEV and PRD cycle and will let folks know how that goes, including any bugs or user issues found as it moves through official DEV and QA.