iptv-api icon indicating copy to clipboard operation
iptv-api copied to clipboard

是否可以使用yt-dlp对直播源进行测速,而不是通过ffmpeg获取响应时间

Open hllkk opened this issue 1 year ago • 7 comments

原因是我发现使用ffmpeg获取响应速度很快,但是实际播放体验的时候效果不佳,具体表现为每个频道的前几个都卡,而有一部分不靠前的资源反倒很快,如果通过yt-dlp获取到url的真实速度,是不是就可以避免这种情况出现?

hllkk avatar Nov 16 '24 14:11 hllkk

的确很快而且不准

kool99988 avatar Nov 17 '24 16:11 kool99988

我将测试后考虑是否采用

Guovin avatar Nov 18 '24 01:11 Guovin

经测试,可行,比ffmpeg更快与更准确,基本实现秒播,预计下版本实现

Guovin avatar Nov 22 '24 10:11 Guovin

大佬赞赞哒

hllkk avatar Nov 24 '24 04:11 hllkk

贴一个改用yt-dlp来实现的代码供大佬参考,望大佬不要笑话。/utils/speed.py

from aiohttp import ClientSession, TCPConnector
from time import time
import asyncio
import re
from yt_dlp import YoutubeDL
from utils.config import config
from utils.tools import is_ipv6, add_url_info, remove_cache_info, get_resolution_value

async def get_speed(url, timeout=config.sort_timeout, proxy=None):
    """
    Get the speed of the URL
    """
    async with ClientSession(
        connector=TCPConnector(verify_ssl=False), trust_env=True
    ) as session:
        start = time()
        end = None
        try:
            async with session.get(url, timeout=timeout, proxy=proxy) as response:
                if response.status == 404:
                    return float("inf")
                content = await response.read()
                if content:
                    end = time()
                else:
                    return float("inf")
        except Exception as e:
            return float("inf")
        return int(round((end - start) * 1000)) if end else float("inf")


def get_video_info_with_ytdlp(url, timeout=config.sort_timeout):
    """
    Use yt-dlp to get video info
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'format': 'best',  # Get the best available format
        'socket_timeout': timeout,
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            resolution = f"{info['width']}x{info['height']}" if 'width' in info and 'height' in info else None
            return resolution, info.get("fps", None)
    except Exception as e:
        return None, None


async def check_stream_speed(url_info):
    """
    Check the stream speed using yt-dlp
    """
    try:
        url = url_info[0]
        resolution, fps = get_video_info_with_ytdlp(url)
        if resolution is None:
            return float("inf")
        url_info[2] = resolution
        return (url_info, fps or float("inf"))
    except Exception as e:
        print(e)
        return float("inf")


speed_cache = {}


async def get_speed_by_info(
    url_info, use_ytdlp, semaphore, ipv6_proxy=None, callback=None
):
    """
    Get the info with speed
    """
    async with semaphore:
        url, _, resolution, _ = url_info
        url_info = list(url_info)
        cache_key = None
        url_is_ipv6 = is_ipv6(url)
        if "$" in url:
            url, _, cache_info = url.partition("$")
            matcher = re.search(r"cache:(.*)", cache_info)
            if matcher:
                cache_key = matcher.group(1)
            url_show_info = remove_cache_info(cache_info)
        url_info[0] = url
        if cache_key in speed_cache:
            speed = speed_cache[cache_key][0]
            url_info[2] = speed_cache[cache_key][1]
            if speed != float("inf"):
                if url_show_info:
                    url_info[0] = add_url_info(url, url_show_info)
                return (tuple(url_info), speed)
            else:
                return float("inf")
        try:
            if ipv6_proxy and url_is_ipv6:
                url_speed = 0
                speed = (url_info, url_speed)
            elif use_ytdlp:
                speed = await check_stream_speed(url_info)
                url_speed = speed[1] if speed != float("inf") else float("inf")
                if url_speed == float("inf"):
                    url_speed = await get_speed(url)
                resolution = speed[0][2] if speed != float("inf") else None
            else:
                url_speed = await get_speed(url)
                speed = (
                    (url_info, url_speed) if url_speed != float("inf") else float("inf")
                )
            if cache_key and cache_key not in speed_cache:
                speed_cache[cache_key] = (url_speed, resolution)
            if url_show_info:
                speed[0][0] = add_url_info(speed[0][0], url_show_info)
            speed = (tuple(speed[0]), speed[1])
            return speed
        except:
            return float("inf")
        finally:
            if callback:
                callback()


async def sort_urls_by_speed_and_resolution(
    data, use_ytdlp=False, ipv6_proxy=None, callback=None
):
    """
    Sort by speed and resolution
    """
    semaphore = asyncio.Semaphore(20)
    response = await asyncio.gather(
        *(
            get_speed_by_info(
                url_info, use_ytdlp, semaphore, ipv6_proxy=ipv6_proxy, callback=callback
            )
            for url_info in data
        )
    )
    valid_response = [res for res in response if res != float("inf")]

    def combined_key(item):
        (_, _, resolution, _), response_time = item
        resolution_value = get_resolution_value(resolution) if resolution else 0
        return (
            -(config.response_time_weight * response_time)
            + config.resolution_weight * resolution_value
        )

    sorted_res = sorted(valid_response, key=combined_key, reverse=True)
    return sorted_res

liulei120 avatar Nov 26 '24 03:11 liulei120

@liulei120 好的,感谢你的分享

Guovin avatar Nov 26 '24 05:11 Guovin

提醒一下大佬,提供代码的同学抓取的信息依然是fps,而这个fps的信息貌似与链接的真实效果有出入,正常应该是抓取yt-dlp中的速率哪里,我采用的是如下代码,仅供参考,目前觉得还是有点瑕疵: async def test_url_speed_with_shell(url): """ 测试url速度 :param url: 直播源 :return: """ docker = check_platform() # 获取yt-dlp路径 yt_path = 'yt-dlp' if docker else os.path.join(settings.BASE_DIR, '.venv/bin', 'yt-dlp')

url = url.strip()
# 生成唯一的ID来确保每次都生成新的缓存文件
unique_id = str(uuid.uuid4())
output_file = os.path.join(TEMP_DIR, f'output_{unique_id}.ts')
archive_file = os.path.join(TEMP_DIR, f'new-archive_{unique_id}.txt')

command = [
    yt_path,
    '--ignore-config',
    '--no-cache-dir',
    '--verbose',
    '--output', output_file,
    '--download-archive', archive_file,
    '--external-downloader', 'ffmpeg',
    '--external-downloader-args', 'ffmpeg:-t 5',
    url
]
process = None
response = None
# 执行命令并捕获输出
try:
    process = await asyncio.create_subprocess_exec(*command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # 等待进程输出,超时时间为10秒
    output, err = await asyncio.wait_for(process.communicate(), timeout=settings.TIMEOUT + 8)
    if output:
        response = output.decode('utf-8')
        match = re.search(r'at\s*\d+\.\d+', response)
        if match:
            response = match.group(0).split('at ')[-1].strip()
    return response
except asyncio.TimeoutError:
    logger.error(f"测试 {url} 速度超时")
    if process:
        process.kill()
    return None
except Exception as e:
    logger.error(f"测试 {url} 速度失败: {e}")
    if process:
        process.kill()
    return None
finally:
    if os.path.exists(output_file):
        os.remove(output_file)
    if os.path.exists(archive_file):
        os.remove(archive_file)

hllkk avatar Nov 26 '24 12:11 hllkk

已实现,请更新v1.5.4

Guovin avatar Nov 29 '24 09:11 Guovin