LiveTalking
LiveTalking copied to clipboard
”每个连接指定不同avatar和音色“怎么实现实时切换数字人
怎么实时切换数字人形象和音色
在app添加了 `async def switch_avatar(request): try: params = await request.json() sessionid = params.get('sessionid', 0) avatar_id = params.get('avatar_id', '')
if sessionid not in nerfreals:
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": "会话不存在"
})
)
# 获取当前的数字人实例
current_nerfreal = nerfreals[sessionid]
try:
# 创建新的opt副本,确保新实例使用正确的参数
new_opt = copy.deepcopy(opt)
new_opt.avatar_id = avatar_id
# 根据模型类型加载新的avatar资源
if opt.model == 'musetalk':
from musereal import load_avatar
new_avatar = load_avatar(avatar_id)
# 关键修改:创建新的MuseReal实例
from musereal import MuseReal
# 需要获取model,这里假设model是全局变量
new_nerfreal = MuseReal(new_opt, model, new_avatar)
elif opt.model == 'wav2lip':
from lipreal import load_avatar, LipReal
new_avatar = load_avatar(avatar_id)
new_nerfreal = LipReal(new_opt, model, new_avatar)
elif opt.model == 'ultralight':
from lightreal import load_avatar, LightReal
new_avatar = load_avatar(avatar_id)
new_nerfreal = LightReal(new_opt, model, new_avatar)
else:
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": f"不支持的模型类型: {opt.model}"
})
)
# 停止当前实例的渲染
quit_event = Event()
quit_event.set()
# 给旧实例一些时间清理资源
await asyncio.sleep(0.5)
# 替换nerfreals中的实例
nerfreals[sessionid] = new_nerfreal
# 更新全局avatar变量,以便新的会话也能使用
global avatar
avatar = new_avatar
logger.info(f"Session {sessionid} 成功切换数字人形象至: {avatar_id}")
return web.Response(
content_type="application/json",
text=json.dumps({
"code": 0,
"msg": "数字人形象切换成功"
})
)
except Exception as e:
logger.error(f'切换数字人形象失败: {str(e)}')
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": f"加载头像失败: {str(e)}"
})
)
except Exception as e:
logger.exception('处理切换数字人请求时发生异常:')
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": str(e)
})
)`
在HumanPlayer改了 `class HumanPlayer:
def __init__(self, nerfreal, format=None, options=None, timeout=None, loop=False, decode=True):
self.__thread: Optional[threading.Thread] = None
self.__thread_quit: Optional[threading.Event] = None
# examine streams
self.__started: Set[PlayerStreamTrack] = set()
self.__audio: Optional[PlayerStreamTrack] = None
self.__video: Optional[PlayerStreamTrack] = None
self.__audio = PlayerStreamTrack(self, kind="audio")
self.__video = PlayerStreamTrack(self, kind="video")
self.__container = nerfreal
def switch_container(self, new_container):
"""切换到新的数字人实例"""
# 停止当前容器的渲染
if self.__container is not None:
# 清理旧实例的资源
quit_event = Event()
quit_event.set()
# 等待旧线程结束
if self.__thread is not None:
self.__thread_quit.set()
self.__thread.join(timeout=1.0) # 等待最多1秒
self.__thread = None
# 设置新容器
self.__container = new_container
# 清空队列,防止旧数据影响新实例
for track in self.__started:
# 清空队列的方法,具体实现可能需要根据PlayerStreamTrack的实现调整
while not track._queue.empty():
try:
track._queue.get_nowait()
except queue.Empty:
break
# 重启工作线程
if self.__started:
self.__thread_quit = threading.Event()
self.__thread = threading.Thread(
name="media-player",
target=player_worker_thread,
args=(
self.__thread_quit,
asyncio.get_event_loop(),
self.__container,
self.__audio,
self.__video
),
)
self.__thread.start()
def notify(self, eventpoint):
self.__container.notify(eventpoint)
@property
def audio(self) -> MediaStreamTrack:
"""
A :class:`aiortc.MediaStreamTrack` instance if the file contains audio.
"""
return self.__audio
@property
def video(self) -> MediaStreamTrack:
"""
A :class:`aiortc.MediaStreamTrack` instance if the file contains video.
"""
return self.__video
# 其他方法保持不变`
在app再改 `async def switch_avatar(request): try: params = await request.json() sessionid = params.get('sessionid', 0) avatar_id = params.get('avatar_id', '')
if sessionid not in nerfreals:
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": "会话不存在"
})
)
# 获取当前的数字人实例
current_nerfreal = nerfreals[sessionid]
try:
# 创建新的opt副本
new_opt = copy.deepcopy(opt)
new_opt.avatar_id = avatar_id
new_opt.sessionid = sessionid
# 根据模型类型加载新的avatar和创建新实例
if opt.model == 'musetalk':
from musereal import load_avatar, MuseReal
new_avatar = load_avatar(avatar_id)
new_nerfreal = MuseReal(new_opt, model, new_avatar)
elif opt.model == 'wav2lip':
from lipreal import load_avatar, LipReal
new_avatar = load_avatar(avatar_id)
new_nerfreal = LipReal(new_opt, model, new_avatar)
elif opt.model == 'ultralight':
from lightreal import load_avatar, LightReal
new_avatar = load_avatar(avatar_id)
new_nerfreal = LightReal(new_opt, model, new_avatar)
else:
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": f"不支持的模型类型: {opt.model}"
})
)
# 替换nerfreals中的实例
nerfreals[sessionid] = new_nerfreal
# 更新全局avatar变量
global avatar
avatar = new_avatar
logger.info(f"Session {sessionid} 成功切换数字人形象至: {avatar_id}")
return web.Response(
content_type="application/json",
text=json.dumps({
"code": 0,
"msg": "数字人形象切换成功"
})
)
except Exception as e:
logger.error(f'切换数字人形象失败: {str(e)}')
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": f"加载头像失败: {str(e)}"
})
)
except Exception as e:
logger.exception('处理切换数字人请求时发生异常:')
return web.Response(
content_type="application/json",
text=json.dumps({
"code": -1,
"msg": str(e)
})
)`
数字人切换之后就是服务器不默认部署的数字人形象也就改了,我原来启动后打开端1用的是avatar1,在端2改到avatar2,那端1也就需要重新连接了数字人就换成了avatar2
你好,在HumanPlayer中添加的switch_container又在哪儿调用着呢,我没找到,感谢