
Real Time Diarization for Streaming Audio Chunks in Custom ASR Pipeline

Open · sprath9 opened this issue 9 months ago · 4 comments

I have a custom streaming pipeline with a VAD setup that triggers ASR processing only when speech is detected on a small chunk. The pipeline operates in a streaming fashion, processing audio chunks sequentially from the client.

In Diart, it seems we need to provide a file path, microphone input, or websocket for audio input. Is there a way to integrate Diart directly into my pipeline, allowing me to pass audio chunks to the diarization module and receive results in real-time? Maintaining speaker consistency across chunks is crucial, as each new chunk shouldn't be treated as a separate audio session.

I attempted to modify the AudioSource class in source.py, experimenting with custom inputs and code adjustments, but I couldn't achieve the desired results.

Could you kindly guide me on how to implement this? If possible, I would greatly appreciate a code snippet to help clarify the approach. From what I understand, the solution likely involves customizing the AudioSource class.

Thanks

sprath9 · Feb 21 '25

Hi @sprath9! You can always use the standalone SpeakerDiarization pipeline (see here).

The class is stateful and you can send audio chunks directly. StreamingInference actually started as a quick way to run the typical streaming scenarios without having to build all the glue around it, but the core of streaming diarization is implemented in SpeakerDiarization.

Here's a quick example:

import numpy as np
from pyannote.core import SlidingWindow, SlidingWindowFeature
from diart import SpeakerDiarization

pipeline = SpeakerDiarization()  # pass a SpeakerDiarizationConfig to customize

# Obtain and format audio (shape must be (num_samples, 1))
sample_rate = pipeline.config.sample_rate
num_samples = int(sample_rate * pipeline.config.duration)
audio = np.random.randn(num_samples, 1)

# Absolute start time (in seconds) of this chunk within the stream
audio_start_time = 0.0
sliding_window = SlidingWindow(
    start=audio_start_time,
    duration=1.0 / sample_rate,
    step=1.0 / sample_rate,
)
audio = SlidingWindowFeature(audio, sliding_window)

# `audio_chunk` is the part of `audio` corresponding to `annotation`
annotation, audio_chunk = pipeline([audio])[0]

Notice that for this to work you have to send chunks of duration pipeline.config.duration with a step of pipeline.config.step. The default configuration is a duration of 5s with a step of 500ms.
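Here's a minimal sketch of what that means in practice, assuming the default configuration; `stream` is just a stand-in for real audio and `as_feature` is a throwaway helper. The second chunk starts 500ms after the first and re-sends the remaining 4.5s of audio:

import numpy as np
from pyannote.core import SlidingWindow, SlidingWindowFeature
from diart import SpeakerDiarization

pipeline = SpeakerDiarization()
sr = pipeline.config.sample_rate
chunk_samples = int(sr * pipeline.config.duration)  # 5s by default
step_samples = int(sr * pipeline.config.step)       # 500ms by default

# Stand-in for a longer mono recording held in memory
stream = np.random.randn(int(sr * 10), 1)

def as_feature(samples, start_time):
    # Attach the chunk's absolute start time so the pipeline can place it in the stream
    resolution = SlidingWindow(start=start_time, duration=1.0 / sr, step=1.0 / sr)
    return SlidingWindowFeature(samples, resolution)

# Chunk 1 covers [0s, 5s); chunk 2 covers [0.5s, 5.5s) and shares 90% of its audio with chunk 1
chunk1 = as_feature(stream[:chunk_samples], 0.0)
chunk2 = as_feature(stream[step_samples:step_samples + chunk_samples], pipeline.config.step)

annotation1, _ = pipeline([chunk1])[0]
annotation2, _ = pipeline([chunk2])[0]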

juanmc2005 · Feb 25 '25

Hi @juanmc2005

Thank you for your valuable inputs!

I tried the approach you suggested, but it seems the system is unable to maintain speaker consistency.

I receive 30-second audio chunks at a time, which I need to process through diarization. Each chunk starts from 0 seconds, and I also need to maintain the state across chunks. To simulate this, I tested with two 5-second audio chunks from different speakers, passing them through the same pipeline.

Ideally, when processing audio1, the system should assign it to Speaker 1, and when processing audio2, it should be assigned to Speaker 2. However, after running both, I'm still getting Speaker 1 for both audio chunks, which is incorrect.

Would appreciate any insights you might have on resolving this!

import numpy as np
from pyannote.core import SlidingWindow, SlidingWindowFeature
from diart import SpeakerDiarization

pipeline = SpeakerDiarization()  # pass a SpeakerDiarizationConfig to customize

# Obtain and format audio
sample_rate = pipeline.config.sample_rate

audio_path1 = "test.wav"
audio_path2 = "abc.wav"

# read_wav_file is a custom helper that returns a (num_samples, 1) float array
audio1 = read_wav_file(audio_path1)
audio2 = read_wav_file(audio_path2)

sliding_window = SlidingWindow(
    start=0,
    duration=1.0 / sample_rate,
    step=1.0 / sample_rate,
)

audio1 = SlidingWindowFeature(audio1, sliding_window)
audio2 = SlidingWindowFeature(audio2, sliding_window)

# `audio_chunk` is the part of `audio` corresponding to `annotation`
annotation1, audio_chunk1 = pipeline([audio1])[0]
annotation2, audio_chunk2 = pipeline([audio2])[0]

print(annotation1)
print(annotation2)
[screenshots of the printed annotations, showing the same speaker label for both chunks]

sprath9 · Feb 26 '25

Hey @juanmc2005, did you get the time to look at this? I'm stuck on this at the moment!

Thanks for your time

sprath9 · Mar 03 '25

As I mentioned in my initial comment:

Notice that for this to work you have to send chunks of duration pipeline.config.duration with a step of pipeline.config.step. The default configuration is a duration of 5s with a step of 500ms.

When you call the pipeline several times, it assumes each consecutive chunk has the same duration (5s by default) and shares 90% of its audio with the previous chunk (500ms step by default).

If you send two consecutive chunks that are non-overlapping, then diart will not work correctly. This is part of the reason why we provide StreamingInference, so that all of this configuration is automatically handled for you.

You can take a look at rearrange_audio_stream() for an example with rx here, but basically it works like this (see the sketch after the list):

  1. Accumulate audio from the stream until you reach a total of 5 seconds
  2. Send the 5s buffer to the pipeline
  3. Remove the earliest 500ms from the buffer
  4. Go back to (1)
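Here's a rough sketch of that loop, assuming chunks arrive as NumPy arrays of shape (n_samples, 1) at the pipeline's sample rate; `incoming_chunks` is just a placeholder for your own audio source:

import numpy as np
from pyannote.core import SlidingWindow, SlidingWindowFeature
from diart import SpeakerDiarization

pipeline = SpeakerDiarization()
sr = pipeline.config.sample_rate
window_samples = int(sr * pipeline.config.duration)  # 5s by default
step_samples = int(sr * pipeline.config.step)        # 500ms by default

buffer = np.zeros((0, 1), dtype=np.float32)  # rolling buffer of raw samples
stream_time = 0.0  # absolute start time (in seconds) of the current buffer

# `incoming_chunks` stands for your own stream of raw audio arrays
for raw in incoming_chunks:
    # (1) Accumulate audio until the buffer holds a full 5s window
    buffer = np.concatenate([buffer, raw.reshape(-1, 1)], axis=0)
    while buffer.shape[0] >= window_samples:
        window = buffer[:window_samples]
        resolution = SlidingWindow(start=stream_time, duration=1.0 / sr, step=1.0 / sr)
        chunk = SlidingWindowFeature(window, resolution)

        # (2) Send the 5s buffer to the pipeline
        annotation, _ = pipeline([chunk])[0]
        print(annotation)

        # (3) Remove the earliest 500ms, advance the clock, and go back to (1)
        buffer = buffer[step_samples:]
        stream_time += pipeline.config.step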

juanmc2005 · May 30 '25