python-sounddevice
python-sounddevice copied to clipboard
Advice on asynchronous playback
I'm using your module to provide "audio feedback" for a digital sensor. The idea is that I have a sensor with one-dimensional time-series data being read in at 40 Hz, and a target value that I want the sensor to read. If the sensor is close to that target, a pure sine wave is played; if it's not, the pure tone is superposed with white noise whose amplitude is proportional to the error. The audio playback and the sensor reading are done asynchronously. I used your example of playing a sine wave and the asynchronous examples. What I've got actually works, but I don't fully understand the API and I'm certain I'm doing some very ugly stuff. I would just like a nudge in the right direction, if that's ok! I have a minimal working example below. I should say that the actual callback function I'm using is a fair bit more complicated, but this gives the gist.
import asyncio
import sys
from timeit import default_timer as timer

import numpy as np
import sounddevice as sd
# white noise
def white(N):
    """Return ``N`` samples drawn from a standard normal distribution."""
    noise = np.random.randn(N)
    return noise
# Running count of samples already written by the audio callback; used so the
# sine phase continues smoothly from one callback invocation to the next.
start_idx = 0


async def play_audio(sensor, *, device=1, freq=500.0):
    """Play audio feedback for ``sensor`` until this coroutine is cancelled.

    While the newest reading lies strictly within ``sensor.threshold`` of
    ``sensor.target`` a pure sine tone is written to the output stream.
    Otherwise white noise, scaled by the relative error
    ``abs(target - reading) / target``, is added to the tone.

    Args:
        sensor: Object exposing ``data`` (newest reading last), ``target``
            and ``threshold``.
        device: Output device index handed to sounddevice. Defaults to 1,
            the value that used to be hard-coded.
        freq: Frequency of the tone in Hz. Defaults to 500.
    """
    # This event is never set, so waiting on it only ends when the
    # coroutine is cancelled (e.g. by an enclosing timeout).
    never_set = asyncio.Event()
    samplerate = sd.query_devices(device, 'output')['default_samplerate']

    def callback(outdata, frames, time, status):
        """Fill ``outdata`` with the next ``frames`` samples of feedback."""
        # NOTE(review): per the sounddevice docs the callback runs outside
        # the asyncio loop's thread -- confirm before adding anything that
        # touches the event loop from here.
        global start_idx
        if status:
            print(status, file=sys.stderr)

        y = sensor.data[-1]
        # Column vector of sample times so the result matches the
        # (frames, 1) layout assigned into ``outdata`` below.
        t = ((start_idx + np.arange(frames)) / samplerate).reshape(-1, 1)
        sine_wave = 0.5 * np.sin(2 * np.pi * freq * t)

        lower = sensor.target - sensor.threshold
        upper = sensor.target + sensor.threshold
        if lower < y < upper:
            # latest data is within threshold of target: play pure tone
            outdata[:] = sine_wave
        else:
            # otherwise play the tone plus noise with amplitude
            # proportional to the relative error; only computed here so
            # the pure-tone path never divides by ``sensor.target``
            scale_factor = abs(sensor.target - y) / sensor.target
            outdata[:] = sine_wave + scale_factor * white(frames).reshape(-1, 1)
        start_idx += frames

    stream = sd.OutputStream(device=device, channels=1, callback=callback,
                             samplerate=samplerate)
    with stream:
        await never_set.wait()
class Sensor:
    """Simulated sensor that appends synthetic readings to ``data``."""

    def __init__(self):
        self.data = [0]
        self.input_period = 10
        self.sample_frequency = 40.
        self.threshold = 0.2
        self.target = 5.
        self.start_time = timer()

    async def read(self):
        """Append a new reading every ``1 / sample_frequency`` seconds, forever."""
        # NOTE: nothing visible here ever trims ``data``, so it keeps growing.
        while True:
            elapsed = timer() - self.start_time
            sample = self.input(elapsed)
            self.data.append(sample)
            await asyncio.sleep(1. / self.sample_frequency)

    # mimics the input that the sensor would read with
    # shifted sine wave of period `self.input_period`
    def input(self, time):
        """Return ``target + sin(2*pi*time / input_period)``."""
        phase = 2 * np.pi * time / self.input_period
        offset = np.sin(phase)
        return self.target + offset
sensor = Sensor()


# read sensor data and play audio feedback
async def main():
    """Run the sensor reader and the audio feedback together for 10 seconds."""
    try:
        async with asyncio.timeout(10):
            jobs = (sensor.read(), play_audio(sensor))
            await asyncio.gather(*jobs)
    except TimeoutError:
        # Neither job finishes on its own; the timeout is the normal exit.
        print("Done")


if __name__ == "__main__":
    asyncio.run(main())