MSDD inference is too slow

Open SagyHarpazGong opened this issue 1 year ago • 46 comments

I run the MSDD model on an Nvidia A10 (24 GB), but inference is too slow. I looked at the code and there is a lot of traffic between the CPU and GPU in both directions.

Most of the time, GPU utilization is at 0%.

First the audio is split into short segments, once per scale (I have 5 scales). After each scale's segmentation, embedding extraction is applied and the embeddings are saved to a pkl file. Then clustering is applied, and finally MSDD.

Is there anything that can be done to speed up inference? Is there a flag to parallelize the embedding extraction stage?

Please help.

SagyHarpazGong avatar Jul 24 '23 20:07 SagyHarpazGong

What do you mean by parallelizing embedding extraction when you are running inference on a single GPU?

nithinraok avatar Jul 25 '23 17:07 nithinraok

This is a very recent issue we also discovered. It's not MSDD itself; the TitaNet embedding extractor is what takes a lot of time. I will look into it and get back soon.

tango4j avatar Jul 25 '23 17:07 tango4j

Hi @nithinraok, thanks for your response. What I meant is that there is a for loop that runs the segmentation and embedding extraction. But from what I saw, the execution-time bottleneck is the traffic between the GPU and the CPU, and of course the writing and reading of files (e.g. pkl files).

SagyHarpazGong avatar Jul 25 '23 17:07 SagyHarpazGong

Thanks @tango4j, yes, I think so too.

SagyHarpazGong avatar Jul 25 '23 17:07 SagyHarpazGong

You can skip writing pkl files as well. Have you tried disabling the saving of pickle files through the config?

nithinraok avatar Jul 25 '23 18:07 nithinraok

Of course, but MSDD uses them, so if I disable the saving of the pkl files I get a FileNotFoundError.

SagyHarpazGong avatar Jul 25 '23 18:07 SagyHarpazGong

This issue is happening only for the MSDD diarizer, not for the clustering diarizer. I suspect something related to the YAML settings is causing this. Let me get back to this soon.

tango4j avatar Jul 25 '23 18:07 tango4j

I want to use the MSDD diarizer.

SagyHarpazGong avatar Jul 25 '23 18:07 SagyHarpazGong

@SagyHarpazGong Sure, let us work on this. Thanks...!

tango4j avatar Jul 25 '23 18:07 tango4j

@SagyHarpazGong I have found that it gets slow if we use the TitaNet checkpoint stored inside the MSDD .nemo file. A quick fix is adding the following:

 diarizer.speaker_embeddings.model_path="titanet_large" \
 diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \

as overrides to the YAML config. This could lead to a small change in performance (could be better or worse). If you want a quick fix, try this. Otherwise, it will take some more time to fix the speed issue with the speaker model loaded from the checkpoint.
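
For anyone who wants to apply the same overrides programmatically, here is a minimal sketch using OmegaConf (assuming the stock diar_infer_telephonic.yaml; the manifest path is a placeholder):

from omegaconf import OmegaConf
from nemo.collections.asr.models.msdd_models import NeuralDiarizer

cfg = OmegaConf.load('diar_infer_telephonic.yaml')
cfg.diarizer.manifest_filepath = 'input_manifest.json'  # placeholder
# Load TitaNet from NGC instead of the speaker model bundled inside the MSDD checkpoint
cfg.diarizer.speaker_embeddings.model_path = 'titanet_large'
cfg.diarizer.msdd_model.parameters.use_speaker_model_from_ckpt = False

diarizer = NeuralDiarizer(cfg=cfg)
diarizer.diarize()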

tango4j avatar Jul 26 '23 01:07 tango4j

@tango4j I checked as well and it's still slow. I really suspect that the reason for the slow inference is CPU utilization: most of the inference time, GPU utilization is at 0%. All the file-system I/O is another reason for the slow inference.

SagyHarpazGong avatar Jul 27 '23 18:07 SagyHarpazGong

@SagyHarpazGong If you do not see a difference before and after applying the configs I suggested, then your code is likely not picking up the change in how the TitaNet parameters are loaded. Check your CUDA settings and the batch size for diarization inference. Make sure you are maximizing GPU memory usage.

If it changes and speeds up, but the improvement is less than 30%, then please let us know.

tango4j avatar Jul 27 '23 18:07 tango4j

Which CUDA settings should I check? After increasing the batch_size, GPU memory usage is almost at the maximum, and it's still slow.

SagyHarpazGong avatar Jul 27 '23 18:07 SagyHarpazGong

@SagyHarpazGong Did you see any relative speed improvement after applying use_speaker_model_from_ckpt=False?

tango4j avatar Jul 27 '23 18:07 tango4j

@tango4j not at all

SagyHarpazGong avatar Jul 27 '23 18:07 SagyHarpazGong

@SagyHarpazGong I suspect that the changes in the configuration are not being reflected at all in your code base. Unfortunately, at this time, we don't have a solution for the slowdown issue on your machine with your setup.

Apart from this, I will update the NGC MSDD model checkpoint to resolve this slowdown issue.

tango4j avatar Jul 27 '23 18:07 tango4j

@tango4j thanks, I'll try to share nvidia-smi screenshots taken during inference to show you that most of the time GPU utilization is at 0%.

SagyHarpazGong avatar Jul 27 '23 19:07 SagyHarpazGong

Hi all, I fixed the issue by inheriting from the classes ClusteringDiarizer, ClusterEmbedding, and NeuralDiarizer and modifying them so that, instead of saving the embeddings to pkl files and loading them back for MSDD inference, the embeddings are passed to MSDD inference directly in GPU memory, without using the file system.

This is my implementation:

# Imports for the classes below (module paths follow NeMo 1.x and may differ
# slightly across versions):
import json
import os
import shutil
from typing import List, Optional, Union

import torch
from omegaconf import DictConfig
from torch.cuda.amp import autocast
from tqdm import tqdm

from nemo.collections.asr.models import ClusteringDiarizer, EncDecSpeakerLabelModel
from nemo.collections.asr.models.configs.diarizer_config import NeuralDiarizerInferenceConfig
from nemo.collections.asr.models.msdd_models import ClusterEmbedding, NeuralDiarizer
from nemo.collections.asr.parts.utils.speaker_utils import (
    audio_rttm_map,
    get_embs_and_timestamps,
    get_uniqname_from_filepath,
    perform_clustering,
    score_labels,
)
from nemo.utils import logging


class ClustDiar(ClusteringDiarizer):
    def _extract_embeddings(self, manifest_file: str, scale_idx: int, num_scales: int):
        """
        This method extracts speaker embeddings from segments passed through manifest_file
        Optionally you may save the intermediate speaker embeddings for debugging or any use.
        """
        logging.info("Extracting embeddings for Diarization")
        self._setup_spkr_test_data(manifest_file)
        self.embeddings = {}
        self._speaker_model.eval()
        self.time_stamps = {}

        all_embs = torch.empty([0], device=self._speaker_model.device)
        for test_batch in tqdm(
            self._speaker_model.test_dataloader(),
            desc=f'[{scale_idx+1}/{num_scales}] extract embeddings',
            leave=True,
            disable=not self.verbose,
        ):
            test_batch = [x.to(self._speaker_model.device) for x in test_batch]
            audio_signal, audio_signal_len, labels, slices = test_batch
            with autocast():
                _, embs = self._speaker_model.forward(input_signal=audio_signal, input_signal_length=audio_signal_len)
                emb_shape = embs.shape[-1]
                embs = embs.view(-1, emb_shape)
                all_embs = torch.cat((all_embs, embs.detach()), dim=0)
            del test_batch

        with open(manifest_file, 'r', encoding='utf-8') as manifest:
            for i, line in enumerate(manifest.readlines()):
                line = line.strip()
                dic = json.loads(line)
                uniq_name = get_uniqname_from_filepath(dic['audio_filepath'])
                if uniq_name in self.embeddings:
                    self.embeddings[uniq_name] = torch.cat((self.embeddings[uniq_name], all_embs[i].view(1, -1)))
                else:
                    self.embeddings[uniq_name] = all_embs[i].view(1, -1)
                if uniq_name not in self.time_stamps:
                    self.time_stamps[uniq_name] = []
                start = dic['offset']
                end = start + dic['duration']
                self.time_stamps[uniq_name].append([start, end])

    def diarize(self, paths2audio_files: List[str] = None, batch_size: int = 0):
        """
        Diarize files provided through paths2audio_files or manifest file
        input:
        paths2audio_files (List[str]): list of paths to file containing audio file
        batch_size (int): batch_size considered for extraction of speaker embeddings and VAD computation
        """

        self._out_dir = self._diarizer_params.out_dir

        self._speaker_dir = os.path.join(self._diarizer_params.out_dir, 'speaker_outputs')

        if os.path.exists(self._speaker_dir):
            logging.warning("Deleting previous clustering diarizer outputs.")
            shutil.rmtree(self._speaker_dir, ignore_errors=True)
        os.makedirs(self._speaker_dir)

        if not os.path.exists(self._out_dir):
            os.mkdir(self._out_dir)

        self._vad_dir = os.path.join(self._out_dir, 'vad_outputs')
        self._vad_out_file = os.path.join(self._vad_dir, "vad_out.json")

        if batch_size:
            self._cfg.batch_size = batch_size

        if paths2audio_files:
            if type(paths2audio_files) is list:
                self._diarizer_params.manifest_filepath = os.path.join(self._out_dir, 'paths2audio_filepath.json')
                self.path2audio_files_to_manifest(paths2audio_files, self._diarizer_params.manifest_filepath)
            else:
                raise ValueError("paths2audio_files must be of type list of paths to file containing audio file")

        self.AUDIO_RTTM_MAP = audio_rttm_map(self._diarizer_params.manifest_filepath)

        out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(out_rttm_dir, exist_ok=True)

        # Speech Activity Detection
        self._perform_speech_activity_detection()

        # Segmentation
        scales = self.multiscale_args_dict['scale_dict'].items()
        self.emb_scale_seq_dict = {}
        for scale_idx, (window, shift) in scales:

            # Segmentation for the current scale (scale_idx)
            self._run_segmentation(window, shift, scale_tag=f'_scale{scale_idx}')

            # Embedding Extraction for the current scale (scale_idx)
            self._extract_embeddings(self.subsegments_manifest_path, scale_idx, len(scales))

            self.emb_scale_seq_dict[scale_idx] = self.embeddings

            self.multiscale_embeddings_and_timestamps[scale_idx] = [self.embeddings, self.time_stamps]

        embs_and_timestamps = get_embs_and_timestamps(
            self.multiscale_embeddings_and_timestamps, self.multiscale_args_dict
        )

        # Clustering
        all_reference, all_hypothesis = perform_clustering(
            embs_and_timestamps=embs_and_timestamps,
            AUDIO_RTTM_MAP=self.AUDIO_RTTM_MAP,
            out_rttm_dir=out_rttm_dir,
            clustering_params=self._cluster_params,
            device=self._speaker_model.device,
            verbose=self.verbose,
        )
        logging.info("Outputs are saved in {} directory".format(os.path.abspath(self._diarizer_params.out_dir)))

        # Scoring
        return score_labels(
            self.AUDIO_RTTM_MAP,
            all_reference,
            all_hypothesis,
            collar=self._diarizer_params.collar,
            ignore_overlap=self._diarizer_params.ignore_overlap,
            verbose=self.verbose,
        )


class ClusEmb(ClusterEmbedding):
    """
    This class is built for calculating cluster-average embeddings, segmentation and load/save of the estimated cluster labels.
    The methods in this class are used for the inference of MSDD models.

    Args:
        cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file

    Class Variables:
        self.cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file
        self._speaker_model (class `EncDecSpeakerLabelModel`):
            This is a placeholder for class instance of `EncDecSpeakerLabelModel`
        self.scale_window_length_list (list):
            List containing the window lengths (i.e., scale length) of each scale.
        self.scale_n (int):
            Number of scales for multi-scale clustering diarizer
        self.base_scale_index (int):
            The index of the base-scale which is the shortest scale among the given multiple scales
    """
    def __init__(
        self, cfg_diar_infer: DictConfig, cfg_msdd_model: DictConfig, speaker_model: Optional[EncDecSpeakerLabelModel]
    ):
        super().__init__(cfg_diar_infer, cfg_msdd_model, speaker_model)
        self.cfg_diar_infer = cfg_diar_infer
        self._cfg_msdd = cfg_msdd_model
        self._speaker_model = speaker_model
        self.scale_window_length_list = list(
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.window_length_in_sec
        )
        self.scale_n = len(self.scale_window_length_list)
        self.base_scale_index = len(self.scale_window_length_list) - 1
        self.clus_diar_model = ClustDiar(cfg=self.cfg_diar_infer, speaker_model=self._speaker_model)

    def run_clustering_diarizer(self, manifest_filepath: str, emb_dir: str):
        """
        If no pre-existing data is provided, run clustering diarizer from scratch. This will create scale-wise speaker embedding
        sequence, cluster-average embeddings, scale mapping and base scale clustering labels. Note that speaker embedding `state_dict`
        is loaded from the `state_dict` in the provided MSDD checkpoint.

        Args:
            manifest_filepath (str):
                Input manifest file for creating audio-to-RTTM mapping.
            emb_dir (str):
                Output directory where embedding files and timestamp files are saved.

        Returns:
            emb_sess_avg_dict (dict):
                Dictionary containing cluster-average embeddings for each session.
            emb_scale_seq_dict (dict):
                Dictionary containing embedding tensors which are indexed by scale numbers.
            base_clus_label_dict (dict):
                Dictionary containing clustering results. Clustering results are cluster labels for the base scale segments.
        """
        self.cfg_diar_infer.diarizer.manifest_filepath = manifest_filepath
        self.cfg_diar_infer.diarizer.out_dir = emb_dir

        # Run ClusteringDiarizer which includes system VAD or oracle VAD.
        self._out_dir = self.clus_diar_model._diarizer_params.out_dir
        self.out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(self.out_rttm_dir, exist_ok=True)

        self.clus_diar_model._cluster_params = self.cfg_diar_infer.diarizer.clustering.parameters
        self.clus_diar_model.multiscale_args_dict[
            "multiscale_weights"
        ] = self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.multiscale_weights
        self.clus_diar_model._diarizer_params.speaker_embeddings.parameters = (
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters
        )
        cluster_params = self.clus_diar_model._cluster_params
        cluster_params = dict(cluster_params) if isinstance(cluster_params, DictConfig) else cluster_params.dict()
        clustering_params_str = json.dumps(cluster_params, indent=4)

        logging.info(f"Multiscale Weights: {self.clus_diar_model.multiscale_args_dict['multiscale_weights']}")
        logging.info(f"Clustering Parameters: {clustering_params_str}")
        scores = self.clus_diar_model.diarize(batch_size=self.cfg_diar_infer.batch_size)

        # If RTTM (ground-truth diarization annotation) files do not exist, scores is None.
        if scores is not None:
            metric, speaker_mapping_dict, _ = scores
        else:
            metric, speaker_mapping_dict = None, None

        # Get the mapping between segments in different scales.
        self._embs_and_timestamps = get_embs_and_timestamps(
            self.clus_diar_model.multiscale_embeddings_and_timestamps, self.clus_diar_model.multiscale_args_dict
        )
        session_scale_mapping_dict = self.get_scale_map(self._embs_and_timestamps)
        clus_labels = self.load_clustering_labels(emb_dir)
        emb_sess_avg_dict, base_clus_label_dict = self.get_cluster_avg_embs(
            self.clus_diar_model.emb_scale_seq_dict, clus_labels, speaker_mapping_dict, session_scale_mapping_dict
        )
        self.clus_diar_model.emb_scale_seq_dict['session_scale_mapping'] = session_scale_mapping_dict
        return emb_sess_avg_dict, self.clus_diar_model.emb_scale_seq_dict, base_clus_label_dict, metric


class NeuralDiar(NeuralDiarizer):
    def __init__(self, cfg: Union[DictConfig, NeuralDiarizerInferenceConfig]):
        super().__init__(cfg)
        self._cfg = cfg
        self._speaker_model = None
        self.msdd_model = None

        # Parameter settings for MSDD model
        self.use_speaker_model_from_ckpt = cfg.diarizer.msdd_model.parameters.get('use_speaker_model_from_ckpt', True)
        self.use_clus_as_main = cfg.diarizer.msdd_model.parameters.get('use_clus_as_main', False)
        self.max_overlap_spks = cfg.diarizer.msdd_model.parameters.get('max_overlap_spks', 2)
        self.num_spks_per_model = cfg.diarizer.msdd_model.parameters.get('num_spks_per_model', 2)
        self.use_adaptive_thres = cfg.diarizer.msdd_model.parameters.get('use_adaptive_thres', True)
        self.max_pred_length = cfg.diarizer.msdd_model.parameters.get('max_pred_length', 0)
        self.diar_eval_settings = cfg.diarizer.msdd_model.parameters.get(
            'diar_eval_settings', [(0.25, True), (0.25, False), (0.0, False)]
        )

        self._init_msdd_model(cfg)
        self.diar_window_length = cfg.diarizer.msdd_model.parameters.diar_window_length
        self.transfer_diar_params_to_model_params(self.msdd_model, cfg)

        if self.msdd_model is None:
            raise TypeError('The MSDD model is None')
        # Initialize clustering and embedding preparation instance (as a diarization encoder).
        self.clustering_embedding = ClusEmb(
            cfg_diar_infer=cfg, cfg_msdd_model=self.msdd_model.cfg, speaker_model=self._speaker_model
        )

        # Parameters for creating diarization results from MSDD outputs.
        self.clustering_max_spks = self.msdd_model.cfg.max_num_of_spks
        self.overlap_infer_spk_limit = cfg.diarizer.msdd_model.parameters.get(
            'overlap_infer_spk_limit', self.clustering_max_spks
        )
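
A minimal usage sketch for the classes above, assuming the stock diar_infer_telephonic.yaml (the manifest path and output directory are placeholders):

from omegaconf import OmegaConf

cfg = OmegaConf.load('diar_infer_telephonic.yaml')
cfg.diarizer.manifest_filepath = 'input_manifest.json'  # placeholder
cfg.diarizer.out_dir = './diar_outputs'                 # placeholder
# NeuralDiar wires in ClustDiar/ClusEmb, so embeddings stay in GPU memory end to end.
diarizer = NeuralDiar(cfg=cfg)
diarizer.diarize()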

SagyHarpazGong avatar Jul 31 '23 21:07 SagyHarpazGong

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.

github-actions[bot] avatar Aug 31 '23 01:08 github-actions[bot]

This issue was closed because it has been inactive for 7 days since being marked as stale.

github-actions[bot] avatar Sep 07 '23 01:09 github-actions[bot]

Looks like not many people use MSDD. It is mid-2024, NeMo inference is still super slow for MSDD, and no action has been taken on this.

prkumar112451 avatar May 12 '24 14:05 prkumar112451

@SagyHarpazGong did your implementation help reduce the CPU-GPU bottleneck and improve inference speed?

nithinraok avatar May 13 '24 15:05 nithinraok

@prkumar112451 thanks for your comments. Unfortunately, we might have missed this or been busy with other work; thank you for raising this issue again.

nithinraok avatar May 13 '24 15:05 nithinraok

@prkumar112451 have you tried MSDD? Is this the same experience you encountered? What errors have you seen? Please elaborate so we can solve the issue.

nithinraok avatar May 13 '24 15:05 nithinraok

@nithinraok Thanks so much for your response. I am exploring AI models for diarization. I found NeMo, and it has decent accuracy, but the problem is that it is quite slow.

I have tried playing with almost every configuration available, but a 20-minute call recording still takes 1 minute to diarize. Also, on a P100 or T4 server, while processing one call, CPU usage spikes to 100% throughout and GPU usage stays below 50% consistently.

I was hoping to reduce the processing time of a 20-minute call to under 30 seconds (by half), but nothing worked.

The major problem with a 1-minute diarization time is scalability: we would need lots of servers to serve thousands of concurrent requests, otherwise the delay will just keep increasing.

My questions are:

  1. A 20-minute call takes 1 minute to diarize. Is this delay expected?
  2. With such a large delay, what is the best way to scale without using any paid NVIDIA servers?

prkumar112451 avatar May 13 '24 16:05 prkumar112451

I just ran the pipeline to check the GPU utilization issue @SagyHarpazGong is facing; however, I see that it is currently using 70% of the GPU, as shown in the picture below, while using <20 GB of GPU memory, so we couldn't reproduce the issue you are mentioning locally. This is the configuration I am using:

MANIFEST_FILE='callhome_109.json'
python ./neural_diarizer/multiscale_diar_decoder_infer.py \
    --config-path='./../conf/inference' --config-name='diar_infer_telephonic.yaml' \
    diarizer.manifest_filepath=$MANIFEST_FILE \
    diarizer.out_dir='/data/diarization_ch109/' \
    diarizer.speaker_embeddings.model_path=$MODEL_PATH \
    diarizer.clustering.parameters.oracle_num_speakers=True \
    diarizer.oracle_vad=True \
    diarizer.ignore_overlap=False \
    diarizer.vad.model_path=null \
    diarizer.asr.model_path=null \
    diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \
    diarizer.msdd_model.model_path=/data/diar_msdd_telephonic.nemo

[Screenshot: nvidia-smi output showing ~70% GPU utilization]

nithinraok avatar May 13 '24 16:05 nithinraok

@prkumar112451 Thanks for the detailed comments. The way we currently improve the accuracy of NeMo diarization is by using embeddings at multiple scales, which I believe is the cause of the slowness for your 20-minute audio. There are ways to improve this.

First to answer them I would need some clarifications from your end,

  • Would sticking to "clustering only" be a viable option for you?
  • We can drastically improve speed by reducing the number of scales used to extract embeddings at various resolutions, but this comes at the cost of slightly reduced accuracy (see the sketch after this list). How are your accuracy requirements looking?
  • Would it be possible to share a sample 20-minute audio file you are using (if not, that is totally fine) and the exact settings you were using?
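
For illustration, here is a sketch of what cutting down the number of scales could look like against the stock diar_infer_telephonic.yaml (the two-scale values below are assumptions for illustration, not a tuned recommendation):

from omegaconf import OmegaConf

cfg = OmegaConf.load('diar_infer_telephonic.yaml')
params = cfg.diarizer.speaker_embeddings.parameters
# The telephonic config defaults to five scales; fewer scales means fewer
# segmentation and embedding-extraction passes, at some cost in accuracy.
params.window_length_in_sec = [1.5, 0.5]
params.shift_length_in_sec = [0.75, 0.25]
params.multiscale_weights = [1, 1]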

nithinraok avatar May 13 '24 16:05 nithinraok

@nithinraok Thanks for the quick response.

The 20-minute audio is a call-center telephony conversation between a customer and an agent.

I am using a combination of Whisper for transcription and NeMo for diarization, taking this repo as reference:

https://github.com/piegu/language-models/blob/master/speech_to_text_transcription_with_speakers_Whisper_Transcription_%2B_NeMo_Diarization.ipynb?source=post_page-----8da2312f1617--------------------------------

We can see that lots of Whisper optimization techniques are available, like flash attention, batching, etc., and they have sped up Whisper a lot.

But the diarization part is acting as the bottleneck. Just to be very sure, I completely removed the Whisper part and ran NeMo's telephony model diar_msdd_telephonic on its own, but it still takes 1 minute to diarize a 20-minute call recording.

To answer your questions :

  1. Any architecture that solves the use case and reduces delay with okay-ish accuracy, so that we can scale, is good enough.
  2. Could you share the configuration names we need to update to reduce the number of scales?
  3. This is the configuration we are using: https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/diar_infer_telephonic.yaml

prkumar112451 avatar May 13 '24 17:05 prkumar112451

Regarding the performance bottleneck of diarization: if you can tolerate some loss in accuracy, I would suggest trying the clustering diarizer with a single scale, without the MSDD model, as shown in the config below:

MANIFEST_FILE='callhome_109.json'
python examples/speaker_tasks/diarization/clustering_diarizer/offline_diar_infer.py \
    --config-path='examples/speaker_tasks/diarization/conf/inference' --config-name='diar_infer_telephonic.yaml' \
    diarizer.manifest_filepath=$MANIFEST_FILE \
    diarizer.out_dir='/data/sample/' \
    diarizer.speaker_embeddings.model_path=${MODEL_PATH} \
    diarizer.speaker_embeddings.parameters.window_length_in_sec=1.5 \
    diarizer.speaker_embeddings.parameters.shift_length_in_sec=0.75 \
    diarizer.vad.model_path='vad_multilingual_marblenet' \
    diarizer.asr.model_path=null \
    diarizer.msdd_model.model_path=null \
    diarizer.oracle_vad=False \
    diarizer.clustering.parameters.oracle_num_speakers=False \
    batch_size=256 \
    num_workers=1

This setting should be fast. You may note that we can switch from an external VAD to ASR-based VAD as well, so you could do ASR+SD in one go. We explained some of these settings here; please feel free to explore: https://github.com/NVIDIA/NeMo/tree/main/examples/speaker_tasks/diarization#run-speech-recognition-with-speaker-diarization. It is very important to note that a common setting might not be best for all kinds of audio samples, due to varying backgrounds and noise levels, so use it accordingly. The above configuration does clustering-based diarization only, with single-scale embeddings, using VAD output from the MarbleNet VAD.
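
A rough Python equivalent of the command above, as a sketch (same YAML; the single-scale values are set as scalars and the MSDD model path is nulled so only clustering runs):

from omegaconf import OmegaConf
from nemo.collections.asr.models import ClusteringDiarizer

cfg = OmegaConf.load('diar_infer_telephonic.yaml')
cfg.diarizer.manifest_filepath = 'callhome_109.json'
cfg.diarizer.out_dir = '/data/sample/'
# A single window/shift pair instead of the five-scale defaults
cfg.diarizer.speaker_embeddings.parameters.window_length_in_sec = 1.5
cfg.diarizer.speaker_embeddings.parameters.shift_length_in_sec = 0.75
cfg.diarizer.speaker_embeddings.parameters.multiscale_weights = None
cfg.diarizer.vad.model_path = 'vad_multilingual_marblenet'
cfg.diarizer.msdd_model.model_path = None  # clustering only, no MSDD
cfg.batch_size = 256

ClusteringDiarizer(cfg=cfg).diarize()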

Also, it is very exciting to see your use case, and of course we have blazingly fast ASR models that can do inference with punctuation and capitalization. Could you give this model a try: https://huggingface.co/nvidia/parakeet-tdt_ctc-1.1b ?

I am looking to put together a Space with the above model and speaker diarization soon; I will keep it posted here.

nithinraok avatar May 13 '24 18:05 nithinraok

We are working on improving the RTF of ASR models even more; you can expect models to keep getting better in terms of both speed and accuracy.

nithinraok avatar May 13 '24 18:05 nithinraok