NeMo
MSDD inference is too slow
I run the MSDD model on an Nvidia A10 (24 GB), but the inference is too slow. I looked at the code and there is a lot of traffic between the CPU and GPU and vice versa.
Most of the time, GPU utilization is at 0%.
First the data is split into short segments according to the number of scales (I have 5 scales). After the splitting for each scale, embedding extraction is applied and the embeddings are saved to a pkl file. Then clustering is applied, and finally MSDD is applied.
Is there anything that can be done to speed up the inference? Is there any flag to parallelize the embedding extraction stage?
Please help.
What do you mean by parallelizing the embedding extraction when you are running inference on a single GPU?
This is a very recent issue we also discovered. It's not MSDD; the TitaNet embedding extractor is taking a lot of time. I will look into it and get back soon.
Hi @nithinraok, thanks for your response. What I meant is that there is a for loop that runs the segmentation and the embedding extraction. But as I saw it, the bottleneck of the execution time is the traffic between the GPU and the CPU, and of course the writing and reading of files (e.g. pkl files).
Thanks @tango4j, yes, I also think so.
You can skip writing pkl files as well; have you tried disabling the saving of pickle files through the config?
Of course, but MSDD uses them, so if I disable saving the pkl files I get a FileNotFoundError.
This issue is happening only for the MSDD diarizer, not for the clustering diarizer. I suppose something related to the YAML settings is causing this. Let me get back to this soon.
I want to use MSDD diarization.
@SagyHarpazGong Sure, let us work on this. Thanks...!
@SagyHarpazGong I have found that it gets slow if we use the TitaNet checkpoint embedded in the MSDD .nemo file. A quick fix is adding the following:
diarizer.speaker_embeddings.model_path="titanet_large" \
diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \
to the YAML config (or as command-line overrides). This could lead to a small change in results (could be better or worse). If you want a quick fix, try this. Otherwise, it will take some more time to fix the speed issue with the speaker model loaded from the checkpoint.
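(If you drive inference from Python instead of the example script, the same two overrides can be applied on the loaded config with OmegaConf; this is just a sketch, and the config path below is a placeholder:)
from omegaconf import OmegaConf

cfg = OmegaConf.load("diar_infer_telephonic.yaml")  # placeholder path to the inference config
cfg.diarizer.speaker_embeddings.model_path = "titanet_large"
cfg.diarizer.msdd_model.parameters.use_speaker_model_from_ckpt = False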
@tango4j I checked as well and it is still slow. I really suspect that the reason for the slow inference is CPU utilization: for most of the inference time, GPU utilization is at 0%, and all the file-system I/O is another reason for the slow inference.
@SagyHarpazGong If you do not see a difference between before and after applying the configs I suggested, then your code is likely not reflecting the change in how the TitaNet parameters are loaded. Check your CUDA settings and the batch size for diarization inference. Make sure you are maximizing GPU memory.
If it changes and speeds up, but the improvement is less than 30%, then please let us know.
Which CUDA settings do I need to check? By increasing the batch_size, the GPU memory usage is almost at the maximum, and it is still slow.
@SagyHarpazGong
Did you experience any relative improvement in speed after you applied use_speaker_model_from_ckpt=False?
@tango4j not at all
@SagyHarpazGong I suspect that the changes in the configuration are not being reflected at all in your code base. Unfortunately, at this time, we don't have a solution for the slowdown issue on your machine with your setup.
Apart from this, I will update the NGC MSDD model checkpoint to resolve this slowdown issue.
@tango4j Thanks, I'll try to share images of nvidia-smi during inference to show you that most of the time GPU utilization is at 0%.
Hi all, I fixed the issue by inheriting from the classes ClusteringDiarizer, ClusterEmbedding, and NeuralDiarizer and modifying them so that, instead of saving the embeddings to pkl files and loading them back for MSDD inference, the embeddings are passed to MSDD inference directly in GPU memory, without going through the file system.
This is my implementation:
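(For reference, the snippet assumes roughly the following imports; exact module paths may differ slightly between NeMo versions.)
import json
import os
import shutil
from typing import List, Optional, Union

import torch
from omegaconf import DictConfig
from torch.cuda.amp import autocast
from tqdm import tqdm

from nemo.collections.asr.models import ClusteringDiarizer, EncDecSpeakerLabelModel
from nemo.collections.asr.models.msdd_models import ClusterEmbedding, NeuralDiarizer, NeuralDiarizerInferenceConfig
from nemo.collections.asr.parts.utils.speaker_utils import (
    audio_rttm_map,
    get_embs_and_timestamps,
    get_uniqname_from_filepath,
    perform_clustering,
    score_labels,
)
from nemo.utils import logging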
class ClustDiar(ClusteringDiarizer):
    def _extract_embeddings(self, manifest_file: str, scale_idx: int, num_scales: int):
        """
        This method extracts speaker embeddings from segments passed through manifest_file.
        Optionally you may save the intermediate speaker embeddings for debugging or any use.
        """
        logging.info("Extracting embeddings for Diarization")
        self._setup_spkr_test_data(manifest_file)
        self.embeddings = {}
        self._speaker_model.eval()
        self.time_stamps = {}
        all_embs = torch.empty([0], device=self._speaker_model.device)
        for test_batch in tqdm(
            self._speaker_model.test_dataloader(),
            desc=f'[{scale_idx+1}/{num_scales}] extract embeddings',
            leave=True,
            disable=not self.verbose,
        ):
            test_batch = [x.to(self._speaker_model.device) for x in test_batch]
            audio_signal, audio_signal_len, labels, slices = test_batch
            with autocast():
                _, embs = self._speaker_model.forward(input_signal=audio_signal, input_signal_length=audio_signal_len)
                emb_shape = embs.shape[-1]
                embs = embs.view(-1, emb_shape)
                all_embs = torch.cat((all_embs, embs.detach()), dim=0)
            del test_batch

        with open(manifest_file, 'r', encoding='utf-8') as manifest:
            for i, line in enumerate(manifest.readlines()):
                line = line.strip()
                dic = json.loads(line)
                uniq_name = get_uniqname_from_filepath(dic['audio_filepath'])
                if uniq_name in self.embeddings:
                    self.embeddings[uniq_name] = torch.cat((self.embeddings[uniq_name], all_embs[i].view(1, -1)))
                else:
                    self.embeddings[uniq_name] = all_embs[i].view(1, -1)
                if uniq_name not in self.time_stamps:
                    self.time_stamps[uniq_name] = []
                start = dic['offset']
                end = start + dic['duration']
                self.time_stamps[uniq_name].append([start, end])

    def diarize(self, paths2audio_files: List[str] = None, batch_size: int = 0):
        """
        Diarize files provided through paths2audio_files or the manifest file.

        Args:
            paths2audio_files (List[str]): list of paths to audio files
            batch_size (int): batch size considered for extraction of speaker embeddings and VAD computation
        """
        self._out_dir = self._diarizer_params.out_dir
        self._speaker_dir = os.path.join(self._diarizer_params.out_dir, 'speaker_outputs')
        if os.path.exists(self._speaker_dir):
            logging.warning("Deleting previous clustering diarizer outputs.")
            shutil.rmtree(self._speaker_dir, ignore_errors=True)
        os.makedirs(self._speaker_dir)
        if not os.path.exists(self._out_dir):
            os.mkdir(self._out_dir)

        self._vad_dir = os.path.join(self._out_dir, 'vad_outputs')
        self._vad_out_file = os.path.join(self._vad_dir, "vad_out.json")

        if batch_size:
            self._cfg.batch_size = batch_size

        if paths2audio_files:
            if type(paths2audio_files) is list:
                self._diarizer_params.manifest_filepath = os.path.join(self._out_dir, 'paths2audio_filepath.json')
                self.path2audio_files_to_manifest(paths2audio_files, self._diarizer_params.manifest_filepath)
            else:
                raise ValueError("paths2audio_files must be of type list of paths to file containing audio file")

        self.AUDIO_RTTM_MAP = audio_rttm_map(self._diarizer_params.manifest_filepath)
        out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(out_rttm_dir, exist_ok=True)

        # Speech Activity Detection
        self._perform_speech_activity_detection()

        # Segmentation
        scales = self.multiscale_args_dict['scale_dict'].items()
        self.emb_scale_seq_dict = {}
        for scale_idx, (window, shift) in scales:
            # Segmentation for the current scale (scale_idx)
            self._run_segmentation(window, shift, scale_tag=f'_scale{scale_idx}')
            # Embedding Extraction for the current scale (scale_idx)
            self._extract_embeddings(self.subsegments_manifest_path, scale_idx, len(scales))
            self.emb_scale_seq_dict[scale_idx] = self.embeddings
            self.multiscale_embeddings_and_timestamps[scale_idx] = [self.embeddings, self.time_stamps]

        embs_and_timestamps = get_embs_and_timestamps(
            self.multiscale_embeddings_and_timestamps, self.multiscale_args_dict
        )

        # Clustering
        all_reference, all_hypothesis = perform_clustering(
            embs_and_timestamps=embs_and_timestamps,
            AUDIO_RTTM_MAP=self.AUDIO_RTTM_MAP,
            out_rttm_dir=out_rttm_dir,
            clustering_params=self._cluster_params,
            device=self._speaker_model.device,
            verbose=self.verbose,
        )
        logging.info("Outputs are saved in {} directory".format(os.path.abspath(self._diarizer_params.out_dir)))

        # Scoring
        return score_labels(
            self.AUDIO_RTTM_MAP,
            all_reference,
            all_hypothesis,
            collar=self._diarizer_params.collar,
            ignore_overlap=self._diarizer_params.ignore_overlap,
            verbose=self.verbose,
        )
class ClusEmb(ClusterEmbedding):
    """
    This class is built for calculating cluster-average embeddings, segmentation, and load/save of the estimated cluster labels.
    The methods in this class are used for the inference of MSDD models.

    Args:
        cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file

    Class Variables:
        self.cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file
        self._speaker_model (class `EncDecSpeakerLabelModel`):
            This is a placeholder for a class instance of `EncDecSpeakerLabelModel`
        self.scale_window_length_list (list):
            List containing the window lengths (i.e., scale length) of each scale.
        self.scale_n (int):
            Number of scales for the multi-scale clustering diarizer
        self.base_scale_index (int):
            The index of the base scale, which is the shortest scale among the given multiple scales
    """

    def __init__(
        self, cfg_diar_infer: DictConfig, cfg_msdd_model: DictConfig, speaker_model: Optional[EncDecSpeakerLabelModel]
    ):
        super().__init__(cfg_diar_infer, cfg_msdd_model, speaker_model)
        self.cfg_diar_infer = cfg_diar_infer
        self._cfg_msdd = cfg_msdd_model
        self._speaker_model = speaker_model
        self.scale_window_length_list = list(
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.window_length_in_sec
        )
        self.scale_n = len(self.scale_window_length_list)
        self.base_scale_index = len(self.scale_window_length_list) - 1
        self.clus_diar_model = ClustDiar(cfg=self.cfg_diar_infer, speaker_model=self._speaker_model)

    def run_clustering_diarizer(self, manifest_filepath: str, emb_dir: str):
        """
        If no pre-existing data is provided, run the clustering diarizer from scratch. This will create scale-wise speaker embedding
        sequences, cluster-average embeddings, a scale mapping, and base-scale clustering labels. Note that the speaker embedding `state_dict`
        is loaded from the `state_dict` in the provided MSDD checkpoint.

        Args:
            manifest_filepath (str):
                Input manifest file for creating audio-to-RTTM mapping.
            emb_dir (str):
                Output directory where embedding files and timestamp files are saved.

        Returns:
            emb_sess_avg_dict (dict):
                Dictionary containing cluster-average embeddings for each session.
            emb_scale_seq_dict (dict):
                Dictionary containing embedding tensors which are indexed by scale numbers.
            base_clus_label_dict (dict):
                Dictionary containing clustering results. Clustering results are cluster labels for the base-scale segments.
        """
        self.cfg_diar_infer.diarizer.manifest_filepath = manifest_filepath
        self.cfg_diar_infer.diarizer.out_dir = emb_dir

        # Run ClusteringDiarizer which includes system VAD or oracle VAD.
        self._out_dir = self.clus_diar_model._diarizer_params.out_dir
        self.out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(self.out_rttm_dir, exist_ok=True)

        self.clus_diar_model._cluster_params = self.cfg_diar_infer.diarizer.clustering.parameters
        self.clus_diar_model.multiscale_args_dict[
            "multiscale_weights"
        ] = self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.multiscale_weights
        self.clus_diar_model._diarizer_params.speaker_embeddings.parameters = (
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters
        )
        cluster_params = self.clus_diar_model._cluster_params
        cluster_params = dict(cluster_params) if isinstance(cluster_params, DictConfig) else cluster_params.dict()
        clustering_params_str = json.dumps(cluster_params, indent=4)

        logging.info(f"Multiscale Weights: {self.clus_diar_model.multiscale_args_dict['multiscale_weights']}")
        logging.info(f"Clustering Parameters: {clustering_params_str}")
        scores = self.clus_diar_model.diarize(batch_size=self.cfg_diar_infer.batch_size)

        # If RTTM (ground-truth diarization annotation) files do not exist, scores is None.
        if scores is not None:
            metric, speaker_mapping_dict, _ = scores
        else:
            metric, speaker_mapping_dict = None, None

        # Get the mapping between segments in different scales.
        self._embs_and_timestamps = get_embs_and_timestamps(
            self.clus_diar_model.multiscale_embeddings_and_timestamps, self.clus_diar_model.multiscale_args_dict
        )
        session_scale_mapping_dict = self.get_scale_map(self._embs_and_timestamps)
        clus_labels = self.load_clustering_labels(emb_dir)
        emb_sess_avg_dict, base_clus_label_dict = self.get_cluster_avg_embs(
            self.clus_diar_model.emb_scale_seq_dict, clus_labels, speaker_mapping_dict, session_scale_mapping_dict
        )
        self.clus_diar_model.emb_scale_seq_dict['session_scale_mapping'] = session_scale_mapping_dict
        return emb_sess_avg_dict, self.clus_diar_model.emb_scale_seq_dict, base_clus_label_dict, metric
class NeuralDiar(NeuralDiarizer):
    def __init__(self, cfg: Union[DictConfig, NeuralDiarizerInferenceConfig]):
        super().__init__(cfg)
        self._cfg = cfg
        self._speaker_model = None
        self.msdd_model = None

        # Parameter settings for MSDD model
        self.use_speaker_model_from_ckpt = cfg.diarizer.msdd_model.parameters.get('use_speaker_model_from_ckpt', True)
        self.use_clus_as_main = cfg.diarizer.msdd_model.parameters.get('use_clus_as_main', False)
        self.max_overlap_spks = cfg.diarizer.msdd_model.parameters.get('max_overlap_spks', 2)
        self.num_spks_per_model = cfg.diarizer.msdd_model.parameters.get('num_spks_per_model', 2)
        self.use_adaptive_thres = cfg.diarizer.msdd_model.parameters.get('use_adaptive_thres', True)
        self.max_pred_length = cfg.diarizer.msdd_model.parameters.get('max_pred_length', 0)
        self.diar_eval_settings = cfg.diarizer.msdd_model.parameters.get(
            'diar_eval_settings', [(0.25, True), (0.25, False), (0.0, False)]
        )

        self._init_msdd_model(cfg)
        self.diar_window_length = cfg.diarizer.msdd_model.parameters.diar_window_length
        self.transfer_diar_params_to_model_params(self.msdd_model, cfg)
        if self.msdd_model is None:
            raise TypeError('The MSDD model is None')

        # Initialize clustering and embedding preparation instance (as a diarization encoder).
        self.clustering_embedding = ClusEmb(
            cfg_diar_infer=cfg, cfg_msdd_model=self.msdd_model.cfg, speaker_model=self._speaker_model
        )

        # Parameters for creating diarization results from MSDD outputs.
        self.clustering_max_spks = self.msdd_model.cfg.max_num_of_spks
        self.overlap_infer_spk_limit = cfg.diarizer.msdd_model.parameters.get(
            'overlap_infer_spk_limit', self.clustering_max_spks
        )
This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.
This issue was closed because it has been inactive for 7 days since being marked as stale.
Looks like not many people use MSDD. It is mid-2024, NeMo inference is still super slow for MSDD, and no action has been taken on this.
@SagyHarpazGong did your implementation help reduce the CPU-GPU bottleneck and improve the speed?
@prkumar112451 Thanks for your comments; unfortunately we might have missed this or been busy with other work. Thank you for bringing this issue up again.
@prkumar112451 Have you tried MSDD? Is this the same experience you encountered? What error have you seen? Please elaborate so we can solve the issue.
@nithinraok Thanks so much for your response. Actually, I am exploring AI models for diarization. I found NeMo, and it has decent accuracy, but the problem is that it is quite slow.
I have tried playing with almost every configuration available, but a 20-minute call recording still takes 1 minute to diarize. Also, on a P100 or T4 server, while processing one call the CPU spikes to 100% throughout the processing and GPU usage stays below 50% consistently.
I was hoping to reduce the processing time of a 20-minute call to under 30 seconds (by half), but nothing worked.
The major problem with a 1-minute diarization time is: how can we make this scalable? We would need lots of servers to serve thousands of concurrent requests; otherwise the delay will just keep increasing.
My questions are:
- A 20-minute call takes 1 minute to diarize. Is this delay expected?
- With such a huge delay, what is the best way to scale without using any paid NVIDIA server?
I just ran it to see the issue @SagyHarpazGong is facing regarding GPU utilization; however, I noticed that it is currently using 70% of the GPU, as shown in the picture below, while using <20 GB of GPU memory, so we couldn't reproduce the issue you are mentioning locally. This is the configuration I am using:
MANIFEST_FILE='callhome_109.json'
python ./neural_diarizer/multiscale_diar_decoder_infer.py \
--config-path='./../conf/inference' --config-name='diar_infer_telephonic.yaml' \
diarizer.manifest_filepath=$MANIFEST_FILE \
diarizer.out_dir='/data/diarization_ch109/' \
diarizer.speaker_embeddings.model_path=$MODEL_PATH \
diarizer.clustering.parameters.oracle_num_speakers=True \
diarizer.oracle_vad=True \
diarizer.ignore_overlap=False \
diarizer.vad.model_path=null \
diarizer.asr.model_path=null \
diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \
diarizer.msdd_model.model_path=/data/diar_msdd_telephonic.nemo
@prkumar112451 Thanks for the detailed comments. Currently, the way we improved the accuracy of NeMo diarization is by using embeddings at multiple scales, which I believe is the issue for your 20-minute audio. There are ways to improve this.
First, to answer your questions, I would need some clarifications from your end:
- Would sticking to "clustering only" be a viable option for you?
- We can drastically improve speed by reducing the number of scales we use for getting embeddings at various resolutions, but this comes at the cost of slightly reduced accuracy. How do your accuracy requirements look?
- Would it be possible to share a sample 20-minute audio you are using (if not, that is totally fine) and the exact settings you were using?
@nithinraok Thanks for the quick response.
The 20-minute audio is a call-center telephony conversation between a customer and an agent.
I am using a combination of Whisper for transcription and then NeMo for diarization, taking this repo as reference:
https://github.com/piegu/language-models/blob/master/speech_to_text_transcription_with_speakers_Whisper_Transcription_%2B_NeMo_Diarization.ipynb?source=post_page-----8da2312f1617--------------------------------
We can see that lots of Whisper optimization techniques are available, like flash attention, batching, etc., and they have sped up Whisper a lot.
But the diarization part is acting as the bottleneck. Just to be very sure, I completely removed the Whisper part and ran NeMo's plain telephony model diar_msdd_telephonic, but its speed is still 1 minute of diarization time for a 20-minute call recording.
To answer your questions:
- Any architecture that solves the use case and reduces the delay with okay-ish accuracy, so that we can scale, is good enough.
- Could you share the configuration name we need to update to reduce the number of scales?
- This is the configuration file we are using: https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/diar_infer_telephonic.yaml
Regarding the performance bottleneck of diarization: if you can tolerate some loss in accuracy, I would suggest trying the clustering diarizer with a single scale and without the MSDD model, as shown in the config below:
MANIFEST_FILE='callhome_109.json'
python examples/speaker_tasks/diarization/clustering_diarizer/offline_diar_infer.py \
--config-path='examples/speaker_tasks/diarization/conf/inference' --config-name='diar_infer_telephonic.yaml' \
diarizer.manifest_filepath=$MANIFEST_FILE \
diarizer.out_dir='/data/sample/' \
diarizer.speaker_embeddings.model_path=${MODEL_PATH} \
diarizer.speaker_embeddings.parameters.window_length_in_sec=1.5 \
diarizer.speaker_embeddings.parameters.shift_length_in_sec=0.75 \
diarizer.vad.model_path='vad_multilingual_marblenet' \
diarizer.asr.model_path=null \
diarizer.msdd_model.model_path=null \
diarizer.oracle_vad=False \
diarizer.clustering.parameters.oracle_num_speakers=False \
batch_size=256 \
num_workers=1
This setting would be fast. You may note that we could switch from an external VAD to ASR-based VAD as well, so you could do ASR + speaker diarization in one go. We explained some of these settings here, please feel free to explore: https://github.com/NVIDIA/NeMo/tree/main/examples/speaker_tasks/diarization#run-speech-recognition-with-speaker-diarization. It is very important to note that a common setting might not be best for all kinds of audio samples due to varying backgrounds and noise levels, so use it accordingly. The above configuration does only clustering-based diarization with single-scale embeddings, using VAD output from the MarbleNet VAD.
Also, it is very exciting to see your use case, and of course we have blazingly fast ASR models that can do inference with punctuation and capitalization. Could you give this model a try: https://huggingface.co/nvidia/parakeet-tdt_ctc-1.1b ?
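For example (a quick sketch following the model card; the audio path is a placeholder):
import nemo.collections.asr as nemo_asr

asr_model = nemo_asr.models.ASRModel.from_pretrained(model_name="nvidia/parakeet-tdt_ctc-1.1b")
transcripts = asr_model.transcribe(["audio.wav"])  # placeholder audio path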
I am looking to put together a Space with the above model and speaker diarization soon; I will keep you posted here.
We are working on improving the RTF of ASR models even more; you can expect the models to only get better in terms of both speed and accuracy.