
Audio packet transmission behavior changed after upgrading from websockets 13.0 to 15.0.1

Open Pliosauroidea opened this issue 3 weeks ago • 1 comments

Description

Summary

After upgrading the websockets library from version 13.0 to 15.0.1, I'm experiencing different behavior when sending and receiving audio packets over WebSocket connections. The audio transmission that worked correctly in 13.0 now behaves unexpectedly in 15.0.1.

Environment

  - websockets version (working): 13.0
  - websockets version (issue): 15.0.1
  - Python version: 3.12.12
  - Operating System: Linux/Ubuntu 22.04
  - Use case: Real-time audio streaming over WebSocket

Expected Behavior

In websockets 13.0, audio packets were transmitted and received reliably with consistent timing and no data loss.

Actual Behavior

After upgrading to websockets 15.0.1, the audio packet transmission exhibits different behavior:

  1. Some packets are missing.
  2. Unexpected packets show up with random content.

Clues that might help:

  1. I'm sending the audio chunks quite fast.
  2. I have no additional buffer on the server side (I'm assuming there is an internal buffer).
  3. I'm using asyncio in other places on the client side. Could this cause the problem?
  4. Each chunk is about 6 KB, and I'm sending roughly 10 of them per second.

Related code:

Client side

  # Start the audio node
            self.audio_ws = await websockets.connect(
                f"ws://{audio_server['host']}:{audio_server['port']}",
                subprotocols=["binary"],
            )
            self.audio_input_manager = MCPAudioInputManager(
                AudioConfig(**config.input_audio_config), audio_ws=self.audio_ws
            )
            self.audio_output_manager = MCPAudioOutputManager(
                AudioConfig(**config.output_audio_config), audio_ws=self.audio_ws
            )
            await self.audio_output_manager.initialize()

            # Start the playback thread
            self.is_recording = True
            self.is_playing = True

            audio_player_thread = threading.Thread(target=self.sync_audio_player_thread)
            audio_player_thread.daemon = True
            audio_player_thread.start()

...

    async def _audio_player_thread(self):
        """Audio playback thread."""
        while self.is_playing:
            try:
                # Get audio data from the queue
                if self.is_analyzing_intent:  # Intent analysis must finish before deciding which audio to play
                    await asyncio.sleep(0.1)
                    continue
                audio_data = self.audio_queue.get(timeout=0.05)
                if audio_data is not None:
                    # Stop the latency timer the first time audio is played
                    if self.is_timing:
                        latency = self.latency_timer.stop_timing()
                        self.is_timing = False

                        # Check that the latency is plausible (greater than 0 and less than 10 seconds)
                        if 0 < latency < 10.0:
                            report = self.latency_timer.get_latency_report()
                            log_info(f"Latency report: {report}")
                        else:
                            log_warning(f"Implausible latency: {latency:.3f}s, discarded")

                    await self.audio_output_manager.write_audio(audio_data)
            except queue.Empty:
                # Wait briefly when the queue is empty
                await asyncio.sleep(0.1)
            except Exception as e:
                log_error(f"Audio playback error: {e}")
                await asyncio.sleep(0.1)

    def sync_audio_player_thread(self):
        """Run the audio playback coroutine synchronously."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._audio_player_thread())

...
(class OutputManager:)
 async def write_audio(self, audio_data: bytes):
        audio_data = self._amplify_audio(audio_data)
        resampled_audio_int16 = self._resample_audio(audio_data)
        try:

            header = self.session_id.to_bytes(4, byteorder="big")
            audio_message = header + resampled_audio_int16.tobytes()
            # audio_message = header  # TODO: test

            await self.audio_ws.send(audio_message)

            log_debug(
                f"Sent audio data, session_id: {self.session_id}, data length: {len(resampled_audio_int16)}"
            )
            self.session_id += 1

        except Exception as e:
            log_error(f"RosPlayer: error while publishing audio data: {e}")
            import traceback

            traceback.print_exc()

Server side

  async def serve_audio_ws(self):
        """
        Serve the audio WebSocket endpoint.
        """
        audio_ws_host = self.config["audio_ws"]["host"]
        audio_ws_port = self.config["audio_ws"]["port"]

        rospy.loginfo(
            f"Starting Audio WebSocket server on ws://{audio_ws_host}:{audio_ws_port}"
        )

        await websockets.serve(
            self.handle_audio_messages,
            audio_ws_host,
            audio_ws_port,
            max_size=2**22,
            max_queue=500,
            write_limit=2**25,
        )
        await asyncio.Future()

    async def handle_audio_messages(self, websocket):
        """
        Handle an audio WebSocket connection.
        """
        self.audio_ws = websocket
        rospy.loginfo(
            f"✓ Audio WebSocket connection established from {websocket.remote_address}"
        )

        try:
            while True:
                # Receive data from the audio WebSocket
                async for data in websocket:
                    # data = await self.audio_ws.recv()
                    header, message = data[:4], data[4:]
                    header = int.from_bytes(header, byteorder="big")
                    print(f"header: {header}")
                # message = np.frombuffer(message, dtype=np.int16).tolist()
                # print(f"header: {header}, message len: {len(message)}")
                # if not self.audio_out:
                #     rospy.logwarn("Audio data topic not connected")
                #     continue
                # # Convert the byte data into a ROS message
                # ros_msg = Int16MultiArray()
                # ros_msg.data = message

                # self.audio_out.publish(ros_msg)

        except Exception as e:
            rospy.logerr(f"Error while processing audio data: {traceback.format_exc()}")
        finally:
            # Cleanup when the connection closes
            self.audio_ws = None
            rospy.loginfo("✗ Audio WebSocket connection closed")

Please advise and confirm whether this is a bug or whether I did something wrong.

Pliosauroidea avatar Dec 05 '25 08:12 Pliosauroidea

Some further testing:

[Image]

Pliosauroidea avatar Dec 05 '25 09:12 Pliosauroidea

Running a second asyncio event loop in a separate thread sounds very sketchy. Indeed, asyncio is not thread-safe. I suspect unsafe calls between the two threads.

If you want two event loops to collaborate safely, objects must be owned by one of the two event loops, and any call from one event loop to the other must use call_soon_threadsafe. I will let you figure out if it's current_event_loop.call_soon_threadsafe or other_event_loop.call_soon_threadsafe :-)
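
As a minimal sketch of that rule, assuming the audio producer can run in a plain thread without its own event loop: the thread hands each chunk to the loop that owns the connection through `call_soon_threadsafe`, and only a coroutine running in that loop ever touches the connection. The names `producer`, `sender`, and the `sent` list (a stand-in for `await ws.send(chunk)`) are hypothetical and not part of the original code.

```python
import asyncio
import threading


async def sender(queue, sent):
    """Runs in the owning loop; the only place that would call send()."""
    while True:
        chunk = await queue.get()
        if chunk is None:  # sentinel: the producer has finished
            break
        sent.append(chunk)  # stand-in for: await ws.send(chunk)


def producer(owner_loop, queue, chunks):
    """Runs in a plain thread and never touches loop-owned objects directly."""
    for chunk in chunks:
        # Ask the owning loop to enqueue the chunk from its own thread.
        owner_loop.call_soon_threadsafe(queue.put_nowait, chunk)
    owner_loop.call_soon_threadsafe(queue.put_nowait, None)


async def main(chunks):
    queue = asyncio.Queue()  # created in, and owned by, the running loop
    sent = []
    thread = threading.Thread(
        target=producer,
        args=(asyncio.get_running_loop(), queue, chunks),
        daemon=True,
    )
    thread.start()
    await sender(queue, sent)
    return sent


# Five fake audio messages: a 4-byte big-endian header plus a dummy payload.
chunks = [i.to_bytes(4, byteorder="big") + b"\x00" * 8 for i in range(5)]
sent = asyncio.run(main(chunks))
```

Callbacks scheduled with `call_soon_threadsafe` run in the order they were scheduled, so the chunks reach the sender in the order the thread produced them.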


In your code, you start a new thread:

            audio_player_thread = threading.Thread(target=self.sync_audio_player_thread)
            audio_player_thread.daemon = True
            audio_player_thread.start()

which starts a new event loop:

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._audio_player_thread())

which eventually calls send() on an object created in the first event loop:

            await self.audio_ws.send(audio_message)

Don't do that, as mentioned in the FAQ.
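
One possible shape for a safer version, sketched under assumptions rather than taken from the original project: the worker thread stays a plain thread, and every `send()` is submitted to the loop that created the connection with `asyncio.run_coroutine_threadsafe`. `FakeConnection` is a hypothetical stand-in for the real connection object so that the sketch runs without a server.

```python
import asyncio
import threading


class FakeConnection:
    """Hypothetical stand-in for a connection; records what was sent and where."""

    def __init__(self):
        self.sent = []
        self.calling_loops = []

    async def send(self, message: bytes) -> None:
        # Record which event loop actually executed send().
        self.calling_loops.append(asyncio.get_running_loop())
        self.sent.append(message)


def player_thread(conn, owner_loop, chunks):
    """Plain worker thread: it does not create an event loop of its own."""
    for chunk in chunks:
        # Schedule send() on the loop that owns the connection.
        future = asyncio.run_coroutine_threadsafe(conn.send(chunk), owner_loop)
        future.result(timeout=5)  # block this thread until the send completes


async def main():
    conn = FakeConnection()  # created in the owning loop
    owner_loop = asyncio.get_running_loop()
    chunks = [i.to_bytes(4, byteorder="big") + b"\x00" * 8 for i in range(5)]
    worker = threading.Thread(
        target=player_thread, args=(conn, owner_loop, chunks), daemon=True
    )
    worker.start()
    # Wait for the worker without blocking the event loop.
    await asyncio.to_thread(worker.join)
    return conn, owner_loop, chunks


conn, owner_loop, chunks = asyncio.run(main())
```

Waiting on `future.result()` in the worker also gives natural backpressure: the thread cannot queue the next chunk until the previous send has finished.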


As a quick mitigation, try switching back to the legacy implementation (as explained in the release notes for version 14.0), since your unsafe code appeared to work there.

aaugustin avatar Dec 06 '25 17:12 aaugustin

I greatly appreciate your help!

Pliosauroidea avatar Dec 07 '25 01:12 Pliosauroidea