DeepLearningExamples icon indicating copy to clipboard operation
DeepLearningExamples copied to clipboard

[JASPER/Pytorch] JASPER NVIDIA DALI Preprocessing Librispeech

Open karanveersingh5623 opened this issue 2 years ago • 1 comments

Related to Model/Framework(s) or something else (describe)

Examples:

  • GNMT/PyTorch
  • AMP
  • Tensorflow 2.0
  • Jupyter notebooks
  • DALI version - 1.8.0

Is your feature request related to a problem? Please describe. I need to process more FLAC files during data preprocessing in order to generate more disk I/O. I have a Dell R740 server with 88 cores; the data-processing code has multiprocessing enabled, but although all the cores are utilized, only 40 MBps of I/O is being served. I need to increase this to 2 GBps. If I lower the Docker container memory to 4g (the minimum required), the throughput stays the same and processing takes more time.

Describe the solution you'd like I have 2 x NVIDIA A100 40GB GPUs in the R740 server, and I want to use them for data preprocessing because my CPU cores are not enough to increase the conversion speed and disk I/O. I need to implement DALI code that runs on the GPUs for the data-processing scripts (convert_librispeech.py and preprocessing_utils.py).

Describe alternatives you've considered Please help me with this DALI conversion, as we need to showcase that faster preprocessing requires faster storage media and GPUs. Please let me know if you need more information.

Additional context Below is the Python code with which I am trying to run NVIDIA DALI:


#!/usr/bin/env python
import argparse
import os
import glob
import multiprocessing
import json
import librosa
import functools

import sox

from nvidia.dali import pipeline_def, fn
import pandas as pd

from preprocessing_utils import parallel_preprocess

# Command-line interface of the conversion script.
parser = argparse.ArgumentParser(description='Preprocess LibriSpeech.')

# One (flags, options) pair per command-line argument; they are registered
# in this order, which is also the order shown by --help.
_ARGUMENT_SPECS = [
    (('--input_dir',),
     dict(type=str, required=True,
          help='LibriSpeech collection input dir')),
    (('--dest_dir',),
     dict(type=str, required=True,
          help='Output dir')),
    (('--output_json',),
     dict(type=str, default='./',
          help='name of the output json file.')),
    (('-s', '--speed'),
     dict(type=float, nargs='*',
          help='Speed perturbation ratio')),
    (('--target_sr',),
     dict(type=int, default=None,
          help='Target sample rate. defaults to the input sample rate')),
    (('--overwrite',),
     dict(action='store_true',
          help='Overwrite file if exists')),
    (('--parallel',),
     dict(type=int, default=multiprocessing.cpu_count(),
          help='Number of threads to use when processing audio files')),
]
for _flags, _options in _ARGUMENT_SPECS:
    parser.add_argument(*_flags, **_options)

args = parser.parse_args()

# Normalise both directory arguments by dropping any trailing slashes.
for _attr in ('input_dir', 'dest_dir'):
    setattr(args, _attr, getattr(args, _attr).rstrip('/'))

#@pipeline_def
def build_input_arr(input_dir):
    """Collect one record per utterance listed in the transcript files.

    Recursively finds every ``*.trans.txt`` file under ``input_dir`` and reads
    it line by line. The text before the first space of a line is taken as
    the utterance id and the remainder as its transcript.

    Args:
        input_dir: Root directory to search.

    Returns:
        A list of dicts with the keys ``input_relpath`` (directory of the
        transcript file, relative to ``input_dir``), ``input_fname``
        (utterance id plus ``.flac``) and ``transcript`` (rest of the line,
        not stripped). The list is empty when no transcript file is found.
    """
    pattern = os.path.join(input_dir, '**', '*.trans.txt')
    input_data = []
    # glob order depends on the filesystem; sort so the result is reproducible.
    for txt_file in sorted(glob.glob(pattern, recursive=True)):
        rel_dir = os.path.dirname(os.path.relpath(txt_file, input_dir))
        with open(txt_file) as fp:
            for line in fp:
                # A blank line would otherwise yield a bogus '<newline>.flac'
                # entry that can never be opened.
                if not line.strip():
                    continue
                fname, _, transcript = line.partition(' ')
                input_data.append(dict(input_relpath=rel_dir,
                                       input_fname=fname + '.flac',
                                       transcript=transcript))
    return input_data

#@pipeline_def
def preprocess_audio(data, input_dir, dest_dir, target_sr=None, speed=None,
               overwrite=True):
    """Convert one utterance to WAV at every requested speed factor.

    Args:
        data: Dict with the keys ``input_relpath``, ``input_fname`` and
            ``transcript`` (the record shape built by ``build_input_arr``).
        input_dir: Root directory that contains the input audio.
        dest_dir: Root directory for the converted files; the sub-directory
            ``data['input_relpath']`` is created below it if needed.
        target_sr: Output sample rate. When falsy, the input file's sample
            rate (as reported by sox) is used.
        speed: Optional list of speed factors. Factor 1 is always added and
            duplicates are removed. The list itself is not modified.
        overwrite: When true, files are converted even if the output
            already exists.

    Returns:
        A dict with ``transcript`` (lower-cased and stripped), ``files`` (one
        sox info dict per speed, extended with ``fname`` and ``speed``) and
        ``original_duration`` / ``original_num_samples`` taken from the
        speed-1 output.
    """
    # Work on a copy: the caller's list may be shared between many calls
    # (e.g. through functools.partial) and must not grow on every call.
    speeds = list(set(list(speed or []) + [1]))  # always include 1, unique

    rel_dir = data['input_relpath']
    input_fname = os.path.join(input_dir, rel_dir, data['input_fname'])
    input_sr = sox.file_info.sample_rate(input_fname)
    target_sr = target_sr or input_sr

    os.makedirs(os.path.join(dest_dir, rel_dir), exist_ok=True)

    output_dict = {
        'transcript': data['transcript'].lower().strip(),
        'files': [],
    }

    fname = os.path.splitext(data['input_fname'])[0]
    for s in speeds:
        # The unperturbed file keeps the plain name; others get a '-<speed>'
        # suffix.
        suffix = '' if s == 1 else '-{}'.format(s)
        output_fname = fname + '{}.wav'.format(suffix)
        output_fpath = os.path.join(dest_dir, rel_dir, output_fname)

        if overwrite or not os.path.exists(output_fpath):
            cbn = sox.Transformer().speed(factor=s).convert(target_sr)
            cbn.build(input_fname, output_fpath)

        file_info = sox.file_info.info(output_fpath)
        file_info['fname'] = os.path.join(os.path.basename(dest_dir),
                                          rel_dir,
                                          output_fname)
        file_info['speed'] = s
        output_dict['files'].append(file_info)

        if s == 1:
            # Reuse the info already read for this file instead of probing
            # it a second time.
            output_dict['original_duration'] = file_info['duration']
            output_dict['original_num_samples'] = file_info['num_samples']

    return output_dict

@pipeline_def
def dali_parallel_preprocess(dataset, input_dir, dest_dir, target_sr, speed, overwrite):
    """Pipeline definition intended to run ``preprocess_audio`` over ``dataset``.

    NOTE(review): as written this does not process anything. The body only
    binds the keyword arguments to ``preprocess_audio`` and returns a
    two-element Python list ``[func, dataset]``; ``func`` is never called.
    Building the pipeline is reported to fail with "Illegal output type",
    because pipeline outputs must be ``DataNode`` objects or values
    convertible to ``types.Constant``. A working version presumably has to be
    expressed with DALI operators instead of sox calls -- TODO confirm with
    the DALI documentation.
    """
    # Pre-bind everything except the per-utterance record.
    func = functools.partial(preprocess_audio, input_dir=input_dir, dest_dir=dest_dir,
            target_sr=target_sr, speed=speed, overwrite=overwrite)
    # list() over a 2-tuple just yields [func, dataset]; it does not map
    # func over the dataset entries.
    dataset = list((func, dataset))
    return dataset
#    dataset = list((func, dataset))
    #return func


# Step 1: scan the input tree for transcript files and build one record per
# utterance.
print("[%s] Scaning input dir..." % args.output_json)
dataset = build_input_arr(input_dir=args.input_dir)
#print(type(dataset['input_relpath']))

# Step 2: instantiate the DALI pipeline. batch_size, num_threads and
# device_id are consumed by the pipeline_def decorator; the remaining keyword
# arguments are forwarded to dali_parallel_preprocess.
pipe = dali_parallel_preprocess(batch_size=32,
                        num_threads=3,
                        device_id=0,
                        dataset=dataset,
                        input_dir=args.input_dir,
                        dest_dir=args.dest_dir,
                        target_sr=args.target_sr,
                        speed=args.speed,
                        overwrite=args.overwrite)
# NOTE(review): build() is where the reported "Illegal output type" TypeError
# is raised, so the lines below are not reached with the current pipeline
# definition.
pipe.build()
dataset = pipe.run()
#print("[%s] Converting audio files..." % args.output_json)
#dataset = dali_parallel_preprocess(dataset=dataset,
#                              input_dir=args.input_dir,
#                              dest_dir=args.dest_dir,
#                              target_sr=args.target_sr,
#                              speed=args.speed,
#                              overwrite=args.overwrite)
#                              parallel=args.parallel)
#
# Step 3: write the collected records as JSON.
# NOTE(review): this assumes `dataset` is a list of per-utterance dicts;
# verify that the value returned by pipe.run() actually has that shape.
print("[%s] Generating json..." % args.output_json)
df = pd.DataFrame(dataset, dtype=object)

# Save json with python. df.to_json() produces back slashed in file paths
dataset = df.to_dict(orient='records')
#print(dataset)
with open(args.output_json, 'w') as fp:
    json.dump(dataset, fp, indent=2)

Below is the error I am facing (it looks like functools.partial is not supported in DALI):

root@065b1c76dee3:/workspace/jasper# python /workspace/jasper/utils/convert_librispeech_dali.py --input_dir /workspace/jasper/datasets/LibriSpeech/train-other-500 --dest_dir /workspace/jasper/datasets/LibriSpeech/train-other-500-wav --output_json /workspace/jasper/datasets/LibriSpeech/librispeech-train-other-500-wav.json
[/workspace/jasper/datasets/LibriSpeech/librispeech-train-other-500-wav.json] Scaning input dir...
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/pipeline.py", line 538, in _build_graph
    outputs[i] = types.Constant(outputs[i], device="cpu")
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/types.py", line 492, in Constant
    return ConstantNode(device, value, dtype, shape, layout, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/types.py", line 411, in ConstantNode
    actual_type = _type_from_value_or_list(value)
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/types.py", line 400, in _type_from_value_or_list
    raise TypeError("Unexpected type: " + str(type(x)))
TypeError: Unexpected type: <class 'functools.partial'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/workspace/jasper/utils/convert_librispeech_dali.py", line 137, in <module>
    pipe.build()
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/pipeline.py", line 704, in build
    self.start_py_workers()
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/pipeline.py", line 675, in start_py_workers
    self._build_graph()
  File "/opt/conda/lib/python3.6/site-packages/nvidia/dali/pipeline.py", line 540, in _build_graph
    raise TypeError(f"Illegal output type. The output {i} is a `{type(outputs[i])}`. "
TypeError: Illegal output type. The output 0 is a `<class 'list'>`. Allowed types are ``DataNode`` and types convertible to `types.Constant` (numerical constants, 1D lists/tuple of numbers and ND arrays).

karanveersingh5623 avatar Dec 16 '21 08:12 karanveersingh5623

Hi @karanveersingh5623 ,

Sorry for a late reply.

AFAIK, DALI uses custom pipelines in which you're only permitted to use DALI operations, so no sox. It might suffice to build a data loader that reads in .flac files with fn.decoders.audio and returns raw vectors of samples, and then save them to disk as .wav with scipy. It would be best to ask the DALI team directly. However, I still doubt that, with a fast enough disk, you'll be able to saturate IO.

alancucki avatar Mar 14 '22 17:03 alancucki