我参考作者的实现方式自己实现了GaussianTalker接入进来,为什么asyncio.Queue会一直阻塞住呢?各位懂异步编程的可以帮帮我吗
我参考作者的实现思路,实现了Player和Tracker,下面是我的简单demo,但我就是不明白为什么在Tracker的recv那里,异步Queue被阻塞了,希望各位帮我看下,不胜感谢: '''python import time, av import aiohttp import logging import uvicorn import asyncio import threading import multiprocessing from aiortc import RTCPeerConnection, RTCSessionDescription import fractions, asyncio from typing import ( Set, Tuple, Union, Optional, ) from loguru import logger from av.frame import Frame from av.packet import Packet from aiortc import MediaStreamTrack AUDIO_PTIME = 0.020 # 20ms audio packetization VIDEO_CLOCK_RATE = 90000 VIDEO_PTIME = 1 / 25 # 30fps VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE) SAMPLE_RATE = 48000 AUDIO_TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)
def player_worker_thread( quit_event, loop, linker, audio_track, video_track ): linker.render_forever(quit_event, loop, audio_track, video_track)
class PlayerStreamTrack(MediaStreamTrack):
def __init__(self, player, kind):
super().__init__()
self.kind = kind
self._player = player
self._queue = asyncio.Queue()
_start: float
_timestamp: int
async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
if self.readyState != 'live':
raise Exception
if self.kind == 'video':
if hasattr(self, '_timestamp'):
self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE)
if wait > 0:
await asyncio.sleep(wait)
else:
self._start = time.time()
self._timestamp = 0
# print('video start:', self._start)
return self._timestamp, VIDEO_TIME_BASE
else:
if hasattr(self, '_timestamp'):
self._timestamp += int(AUDIO_PTIME * SAMPLE_RATE)
wait = self._start + (self._timestamp / SAMPLE_RATE) - time.time()
if wait > 0:
await asyncio.sleep(wait)
else:
self._start = time.time()
self._timestamp = 0
# print('audio start:', self._start)
return self._timestamp, VIDEO_TIME_BASE
async def recv(self) -> Union[Frame, Packet]:
self._player._start(self)
frame = await self._queue.get()
print(11111, self.kind, frame)
if frame is None:
self.stop()
raise Exception
pts, time_base = await self.next_timestamp()
frame.pts = pts
frame.time_base = time_base
print(self.kind, self._queue.qsize())
return frame
def stop(self):
super().stop()
if self._player is not None:
self._player._stop(self)
self._player = None
class HumanPlayer:
def __init__(self, linker):
self.__thread: Optional[threading.Thread] = None
self.__thread_quit: Optional[threading.Event] = None
self.__started: Set[PlayerStreamTrack] = set()
self.__audio: Optional[PlayerStreamTrack] = None
self.__video: Optional[PlayerStreamTrack] = None
self.__audio = PlayerStreamTrack(self, 'audio')
self.__video = PlayerStreamTrack(self, 'video')
self.__linker = linker
@property
def audio(self) -> MediaStreamTrack:
return self.__audio
@property
def video(self) -> MediaStreamTrack:
return self.__video
def _start(self, track: PlayerStreamTrack) -> None:
self.__started.add(track)
if self.__thread is None:
self.__log_debug('Starting worker thread')
self.__thread_quit = threading.Event()
self.__thread = threading.Thread(
name='media-player',
target=player_worker_thread,
args=(
self.__thread_quit,
asyncio.new_event_loop(),
self.__linker,
self.__audio,
self.__video
)
)
self.__thread.start()
time.sleep(2)
def _stop(self, track: PlayerStreamTrack) -> None:
self.__started.discard(track)
if not self.__started and self.__thread is not None:
self.__log_debug("Stopping worker thread")
self.__thread_quit.set()
self.__thread.join()
self.__thread = None
if not self.__started and self.__linker is not None:
self.__linker = None
def __log_debug(self, msg: str, *args) -> None:
logger.debug(f'HumanPlayer {msg}', *args)
class GaussianTalkerLinker:
def __init__(self):
con = av.open('1.mov')
self.audio_generator = [f for f in con.decode(audio=0)]
self.audio_idx = 0
con = av.open('1.mov')
self.video_generator = [f for f in con.decode(video=0)]
self.video_idx = 0
def get_audio(self):
if self.audio_idx >= len(self.audio_generator):
self.audio_idx = 0
frame = self.audio_generator[self.audio_idx]
self.audio_idx += 1
return frame
def get_video(self):
if self.video_idx >= len(self.video_generator):
self.video_idx = 0
frame = self.video_generator[self.video_idx]
self.video_idx += 1
return frame
def render_forever(
self,
quit_event,
loop=None,
audio_track: PlayerStreamTrack = None,
video_track: PlayerStreamTrack = None):
while not quit_event.is_set():
self.run_step(loop, audio_track, video_track)
if video_track._queue.qsize() >= 5:
time.sleep(0.04*video_track._queue.qsize()*0.8)
print('linker thread stop')
def run_step(self, loop=None, audio_track=None, video_track=None):
for i in range(2):
new_frame = self.get_audio()
asyncio.run_coroutine_threadsafe(audio_track._queue.put(new_frame), loop)
# 模型推理
new_frame = self.get_video()
asyncio.run_coroutine_threadsafe(video_track._queue.put(new_frame), loop)
async def post(url,data): try: async with aiohttp.ClientSession() as session: async with session.post(url,data=data) as response: return await response.text() except aiohttp.ClientError as e: print(f'Error: {e}')
pcs = set()
async def run(push_url): pc = RTCPeerConnection() pcs.add(pc) @pc.on("connectionstatechange") async def on_connectionstatechange(): print("Connection state is %s" % pc.connectionState) if pc.connectionState == "failed": await pc.close() pcs.discard(pc) player = HumanPlayer(linker) audio_sender = pc.addTrack(player.audio) video_sender = pc.addTrack(player.video) await pc.setLocalDescription(await pc.createOffer()) answer = await post(push_url, pc.localDescription.sdp) await pc.setRemoteDescription(RTCSessionDescription(sdp=answer, type='answer'))
if name == 'main': linker = GaussianTalkerLinker() multiprocessing.set_start_method('spawn') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run('http://192.168.31.19:1985/rtc/v1/whip/?app=live&stream=livestream')) loop.run_forever() '''
异步编程实在搞不定呀,它们不都是一个loop的吗,为什么在recv那里的queue.get会阻塞住呀?
实例化的哪个地方改改
GaussianTalker速度和效果怎么样
是哪个实例化的哪个地方改改是哪个地方实例化的问题呢
同问解决了嘛