audio_common
audio_common copied to clipboard
ROS2 foxy: Problem when writing to wav file.
Hi,
I have a ROS2 node that collects an audio message, converts it to a numpy array, writes it to a wav file, loads it back into memory, and passes it to a model for processing. The problem is that the numpy array before writing to the wav file and the one loaded back have different shapes, and when I open the written wav file, I can only hear noise. My code works fine when using ROS Noetic.
I capture the audio by the following command:
ros2 run audio_capture audio_capture_node
I don't know the format of the published audio: when I run the following command, nothing shows up.
ros2 topic echo /audio_info
Here is a shortened version of my ROS2 node, which only does the audio capture.
import wave
from queue import Queue
from threading import Thread
from tempfile import NamedTemporaryFile
import numpy as np
import rclpy
from rclpy.node import Node
from audio_common_msgs.msg import AudioData
class AudioCaptureNode(Node):
    """ROS 2 node that buffers incoming audio messages and dumps them to a WAV file.

    Messages received on ``input_audio_topic`` are queued by the subscription
    callback. A background thread drains the queue, converts each drained
    batch to a NumPy array and overwrites ``self.temp_file`` with that batch
    as a mono PCM WAV file.

    Args:
        input_audio_topic: Topic carrying ``AudioData`` messages.
        sample_width: Bytes per audio sample written to the WAV header.
        sample_rate: Sampling rate in Hz written to the WAV header.
    """

    # NumPy dtypes for the usual PCM WAV sample encodings (8-bit unsigned,
    # 16/32-bit signed little-endian). Other widths fall back to raw bytes.
    _SAMPLE_DTYPES = {1: np.uint8, 2: np.dtype("<i2"), 4: np.dtype("<i4")}

    # Seconds the worker blocks waiting for audio before re-checking rclpy.ok().
    _QUEUE_POLL_TIMEOUT = 0.1

    def __init__(
        self,
        input_audio_topic: str = "/audio",
        sample_width: int = 2,
        sample_rate: int = 16000,
    ):
        super().__init__("audio_capture_node")
        self.data_queue = Queue()
        self.sample_width = sample_width
        self.sample_rate = sample_rate
        self.create_subscription(AudioData, input_audio_topic, self.callback, 1)
        self.temp_file = "./temp.wav"
        self.last_sample = b""
        # Start the processing thread last so every attribute it reads exists.
        self.processing_thread = Thread(target=self.process_audio)
        self.processing_thread.start()

    def callback(self, data):
        """Queue the raw payload of an incoming audio message.

        The payload is normalised to ``bytes`` here so the worker can join
        chunks regardless of the container type the message uses.
        NOTE(review): assumes ``data.data`` is ``bytes``, a byte array or a
        sequence of ints in 0..255 -- confirm against the AudioData definition.
        """
        self.data_queue.put(bytes(data.data))

    def _write_to_wav(self, numpy_data: np.ndarray):
        """Overwrite ``self.temp_file`` with ``numpy_data`` as a mono WAV file.

        Args:
            numpy_data: Audio samples; their raw bytes are written as frames.
        """
        with wave.open(self.temp_file, "wb") as f:
            f.setnchannels(1)
            f.setsampwidth(self.sample_width)
            f.setframerate(self.sample_rate)
            # Serialise the samples back to the raw bytes the WAV writer expects.
            f.writeframes(numpy_data.tobytes())

    def process_audio(self):
        """Drain queued audio and write each drained batch to the WAV file.

        Runs in ``self.processing_thread`` until ``rclpy.ok()`` turns false.
        Every batch overwrites the previous contents of ``self.temp_file``.
        """
        # Local import: the file-level import only brings in ``Queue``.
        from queue import Empty

        dtype = np.dtype(self._SAMPLE_DTYPES.get(self.sample_width, np.uint8))

        while rclpy.ok():
            # Block briefly instead of spinning on ``empty()``; the timeout
            # keeps the loop responsive to shutdown.
            try:
                chunks = [self.data_queue.get(timeout=self._QUEUE_POLL_TIMEOUT)]
            except Empty:
                continue

            # Collect whatever else has already arrived.
            while True:
                try:
                    chunks.append(self.data_queue.get_nowait())
                except Empty:
                    break

            # One join instead of repeated ``+=`` (quadratic copying).
            new_data = b"".join(chunks)
            self.last_sample = new_data

            # ``np.frombuffer`` raises if the buffer is not a whole number of
            # samples, which would kill this thread; drop a trailing fragment.
            usable = len(new_data) - (len(new_data) % dtype.itemsize)
            numpy_data = np.frombuffer(new_data[:usable], dtype=dtype)
            self._write_to_wav(numpy_data)
def main(args=None):
    """Initialise rclpy, run the audio capture node, and always clean up.

    Args:
        args: Command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    node = None
    try:
        # Create the node with the hardcoded parameters
        node = AudioCaptureNode(sample_width=2, sample_rate=16000)
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the node; exit without a traceback.
        pass
    finally:
        if node is not None:
            node.destroy_node()
        # The context may already be shut down (e.g. by a signal handler);
        # shutting it down twice raises, so check first.
        if rclpy.ok():
            rclpy.shutdown()
# Run the node only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
I was able to write the audio to a wav file after updating the repo, but I still cannot get the audio info.
audio_info
is a latched topic, so it is published only once.
Can't you subscribe to the topic even once?