nonebot_plugin_jmcomic icon indicating copy to clipboard operation
nonebot_plugin_jmcomic copied to clipboard

简单改了一下,新增了批量下载、多任务并发、只下载不发送、私聊发送、任务状态查询等功能。

Open LoCCai opened this issue 8 months ago • 0 comments

import asyncio
import time
from pathlib import Path
from typing import Dict, List, Tuple , Optional
from contextvars import ContextVar, copy_context
from datetime import datetime
from importlib.metadata import version

from nonebot.log import logger
from nonebot.adapters.onebot.v11 import ActionFailed, Bot, GroupMessageEvent, Message, MessageEvent
from nonebot.params import CommandArg
from nonebot.plugin import PluginMetadata
from nonebot.plugin.on import on_command
from nonebot.permission import SUPERUSER
from nonebot import get_plugin_config

from .config import Config, jm_config
from .utils import *
from src.utils.message_fx import *
# Look up the installed distribution's version; if the lookup raises for any
# reason, fall back to a placeholder so that importing the plugin still works.
try:
    __version__ = version("nonebot_plugin_jmcomic")
except Exception:
    __version__ = "0.0.0"

# Metadata describing this plugin, handed to NoneBot's PluginMetadata.
__plugin_meta__ = PluginMetadata(
    name="nonebot-plugin-jmcomic",
    description="下载禁漫并发送",
    usage="jm + id",
    homepage="https://github.com/zhulinyv/nonebot_plugin_jmcomic",
    type="application",
    supported_adapters={"~onebot.v11"},
    config=Config,
    extra={
        "author": "zhulinyv",
        # Reuses the version resolved above.
        "version": __version__,
    },
)

# Global task manager
class TaskManager:
    """Track download tasks, their status and the context they were requested from.

    Attributes:
        tasks: Maps an album id to its status string
            ('pending', 'downloading', 'completed' or 'failed').
        contexts: Maps an album id to the saved request context
            (bot, event, send flag, creation time).
        active_tasks: Maps an album id to the asyncio task working on it.
        lock: Guards mutation of the three dictionaries above.
        semaphore: Limits how many downloads may run at the same time.
        jm_data_dir: Directory in which ``<id>.pdf`` files are looked up.
        max_contexts: How many saved contexts are kept before finished ones
            are evicted.
    """

    # Statuses after which a task no longer needs its saved context.
    _FINISHED_STATUSES = ('completed', 'failed')

    def __init__(
        self,
        max_concurrency: int = 5,
        data_dir: Optional[Path] = None,
        max_contexts: int = 10,
    ):
        """Create an empty manager.

        Args:
            max_concurrency: Size of the download semaphore.
            data_dir: Directory holding the generated PDF files; defaults to
                the path that was previously hard-coded.
            max_contexts: Number of saved contexts to retain.
        """
        self.tasks: Dict[int, str] = {}
        self.contexts: Dict[int, dict] = {}
        self.active_tasks: Dict[int, asyncio.Task] = {}
        self.lock = asyncio.Lock()
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.jm_data_dir = (
            Path(data_dir)
            if data_dir is not None
            else Path('/root/.cache/nonebot2/nonebot_plugin_jmcomic')
        )
        self.max_contexts = max_contexts

    # The Bot/MessageEvent annotations below are quoted so that they are not
    # evaluated when the class is defined.
    async def add_task_context(self, id: int, bot: "Bot", event: "MessageEvent", send_msg: bool = False):
        """Save the request context for task ``id``.

        Args:
            id: Album id.
            bot: Bot that received the request; stored separately from the event.
            event: Event that triggered the request.
            send_msg: Whether the file should be sent once it is available.
        """
        now = datetime.now()
        async with self.lock:
            self.contexts[id] = {
                'event': event,
                'bot': bot,
                'send_msg': send_msg,
                'user_id': event.user_id,
                'context': copy_context(),
                # 'timestamp' is kept for existing readers; 'created_at' is the
                # key release_task() sorts by.
                'timestamp': now,
                'created_at': now,
            }

    async def add_tasks(self, ids: List[int]):
        """Register every id in ``ids`` as 'pending' unless it is already known."""
        async with self.lock:
            for id in ids:
                self.tasks.setdefault(id, 'pending')

    async def create_task(
        self,
        id: int,
        event: "MessageEvent",
        send_msg: bool = False,
        bot: Optional["Bot"] = None,
    ):
        """Register task ``id`` and start its download in the background.

        Does nothing when ``id`` is already registered.

        Args:
            id: Album id.
            event: Event that triggered the request.
            send_msg: Whether the file should be sent once it is available.
            bot: Bot to store in the context. When omitted, ``event.bot`` is
                used if the event exposes such an attribute, otherwise None.
        """
        async with self.lock:
            if id in self.tasks:
                return

            self.tasks[id] = 'pending'
            self.contexts[id] = {
                # Events are not guaranteed to carry a ``bot`` attribute, so do
                # not fail with AttributeError when it is absent.
                'bot': bot if bot is not None else getattr(event, 'bot', None),
                'event': event,
                'send_msg': send_msg,
                'created_at': datetime.now(),
            }
            task = asyncio.create_task(
                self.wrapped_download(id),
                context=copy_context()
            )
            self.active_tasks[id] = task

    async def update_status(self, id: int, status: str):
        """Set the status of task ``id`` to ``status``."""
        async with self.lock:
            self.tasks[id] = status

    @staticmethod
    def _format_ranges(ids: List[int]) -> str:
        """Render ids as comma-separated runs, e.g. ``[1, 2, 3, 5]`` -> ``"1-3, 5"``."""
        if not ids:
            return "无"
        ordered = sorted(ids)
        ranges = []
        start = end = ordered[0]
        for current in ordered[1:]:
            if current == end + 1:
                end = current
                continue
            ranges.append(f"{start}-{end}" if start != end else str(start))
            start = end = current
        ranges.append(f"{start}-{end}" if start != end else str(start))
        return ", ".join(ranges)

    async def generate_report(self) -> str:
        """Return a multi-line summary of all tasks grouped by status."""
        async with self.lock:
            by_status: Dict[str, List[int]] = {
                'completed': [], 'downloading': [], 'pending': [], 'failed': [],
            }
            for task_id, status in self.tasks.items():
                if status in by_status:
                    by_status[status].append(task_id)

            total = len(self.tasks)
            # Failed tasks count as "done" for the progress figure.
            done = len(by_status['completed']) + len(by_status['failed'])
            progress = f"总进度: {done}/{total} ({done/total*100:.1f}%)" if total else "无任务"

            report = [
                "当前下载任务状态:",
                f"🔄 下载中: {self._format_ranges(by_status['downloading'])}",
                f"✅ 已完成: {self._format_ranges(by_status['completed'])}",
                f"⏳ 待处理: {self._format_ranges(by_status['pending'])}",
                f"❌ 失败: {self._format_ranges(by_status['failed'])}",
                progress
            ]
            return "\n".join(report)

    async def check_file_status(self, id: int) -> str:
        """Return 'completed' if ``<id>.pdf`` exists, else the recorded status ('pending' if unknown)."""
        file_path = self.jm_data_dir / f"{id}.pdf"
        if file_path.exists():
            return 'completed'
        return self.tasks.get(id, 'pending')

    async def wrapped_download(self, id: int):
        """Run the download for ``id``; exceptions are logged instead of propagated."""
        ctx = self.contexts.get(id, {})
        try:
            context = copy_context()
            # NOTE(review): ``current_event`` and ``download_and_send`` are not
            # defined in this class; they are looked up as module globals when
            # this method runs - confirm both are in scope.
            context.run(current_event.set, ctx.get('event'))
            await context.run(download_and_send, id)
        except Exception as e:
            logger.error(f"任务执行异常: {id} - {str(e)}")

    async def release_task(self, id: int):
        """Forget the asyncio task for ``id`` and trim old saved contexts.

        Only contexts of tasks that have finished ('completed' or 'failed')
        and are no longer active are evicted, oldest first, until at most
        ``max_contexts`` remain. Contexts of unfinished tasks are never
        evicted, so the total may temporarily exceed the limit.
        """
        async with self.lock:
            self.active_tasks.pop(id, None)

            overflow = len(self.contexts) - self.max_contexts
            if overflow <= 0:
                return

            def created_at(task_id: int) -> datetime:
                # Contexts may carry 'created_at', 'timestamp', or both.
                ctx = self.contexts[task_id]
                return ctx.get('created_at') or ctx.get('timestamp') or datetime.min

            evictable = sorted(
                (
                    task_id for task_id in self.contexts
                    if task_id not in self.active_tasks
                    and self.tasks.get(task_id) in self._FINISHED_STATUSES
                ),
                key=created_at,
            )
            for task_id in evictable[:overflow]:
                del self.contexts[task_id]
# Single shared manager instance used by every handler in this module.
task_manager = TaskManager()

# Command matchers
jm = on_command("jm", aliases={"JM"}, priority=30, block=True, permission=SUPERUSER)
# "查看进度" is advertised to users in the reply sent by the jm handler, so it
# has to be registered as an alias alongside "任务状态".
task_check = on_command("查看任务", aliases={"任务状态", "查看进度"}, priority=30, block=True, permission=SUPERUSER)

def parse_id_input(input_str: str) -> List[int]:
    """Parse a comma-separated list of ids and id ranges into sorted unique ints.

    Each item is either a single number (``"18"``) or an inclusive range
    (``"1-100"``; the bounds may be given in either order). Both the ASCII
    comma and the full-width comma ``,`` separate items. Items that cannot
    be parsed are skipped.

    Args:
        input_str: Raw user input, e.g. ``"18"`` or ``"1-100,200-300"``.

    Returns:
        The sorted list of distinct ids; empty when nothing could be parsed.
    """
    ids = set()
    # Accept the full-width comma as well, which is easy to type by accident
    # with a Chinese input method.
    for part in input_str.replace(',', ',').split(','):
        part = part.strip()
        try:
            if '-' in part:
                start, end = map(int, part.split('-', 1))
                ids.update(range(min(start, end), max(start, end) + 1))
            elif part.isdigit():
                # isdigit() also accepts characters such as '²' that int()
                # rejects, so the conversion is covered by the try as well.
                ids.add(int(part))
        except ValueError:
            continue
    return sorted(ids)

# Strong references to running monitor tasks. The event loop only keeps weak
# references to tasks, so a task nobody holds on to may be garbage-collected
# before it finishes.
_monitor_tasks: set = set()


@jm.handle()
async def handle_jm(bot: Bot, event: MessageEvent, msg: Message = CommandArg()):
    """Handle the ``jm`` command: queue downloads for the requested ids.

    The files are downloaded only (the context is saved with the default
    ``send_msg=False``). A background monitor is started for the batch.

    Args:
        bot: Bot that received the command.
        event: Triggering message event.
        msg: Command argument holding ids and/or ranges.
    """
    input_str = msg.extract_plain_text().strip()
    if not input_str:
        await jm.finish("请输入漫画ID或范围(例如:18 或 1-100,200-300)")

    ids = parse_id_input(input_str)
    if not ids:
        await jm.finish("输入格式错误,请使用数字ID或范围(例如:18 或 1-100,200-300)")

    await task_manager.add_tasks(ids)
    await jm.send(f"🎯 已添加 {len(ids)} 个下载任务,最大并发数5")

    # Save the context first, then start one download task per id.
    for id in ids:
        await task_manager.add_task_context(id, bot, event)
        task = asyncio.create_task(
            run_with_context(download_and_send, id),
            context=copy_context()
        )
        async with task_manager.lock:
            task_manager.active_tasks[id] = task

    await jm.send("✅ 你 请求 の 禁漫 已 收到 ,七七 正在 努力 抓取 下载 ...\n请 耐心 等待 下载 完...\n或 发送 “/查看任务” 或者 “/查看进度” 给 七七 来 查看 下载 状态")

    # Start the background monitor and keep a reference until it is done.
    monitor = asyncio.create_task(monitor_tasks(bot, event, ids))
    _monitor_tasks.add(monitor)
    monitor.add_done_callback(_monitor_tasks.discard)

async def run_with_context(coro_func, id: int):
    """Call ``coro_func(id)`` through the context saved for task ``id``, if any.

    When the task has a saved (and non-empty) ``'context'`` entry, the call
    that creates the coroutine is made via ``Context.run``; otherwise
    ``coro_func(id)`` is called directly. Either way the result is awaited
    and returned.
    """
    saved = task_manager.contexts.get(id, {})
    snapshot = saved.get('context')
    if not snapshot:
        return await coro_func(id)
    return await snapshot.run(coro_func, id)

async def monitor_tasks(bot: Bot, event: MessageEvent, ids: List[int]):
    """Poll the given tasks in the background and announce when all have ended.

    Every 5 seconds the status of each id is checked; once none of them is
    still pending or downloading, the status report is sent as a reply to
    ``event`` and the monitor stops.

    Args:
        bot: Bot used to send the final report.
        event: Event the report is sent in reply to.
        ids: Ids belonging to this batch.
    """
    while True:
        await asyncio.sleep(5)
        async with task_manager.lock:
            remaining = [id for id in ids if task_manager.tasks.get(id) not in ('completed', 'failed')]

        if remaining:
            continue

        report = await task_manager.generate_report()
        try:
            # The send has to be awaited, otherwise the coroutine never runs
            # and no message goes out. The explicit bot/event pair is used
            # because this code runs outside the command handler.
            await bot.send(event, f"⚠️ 七七 已经 执行 完 了 所有 下载 任务...\n{report}")
        except Exception as e:
            logger.error(f"任务完成通知发送失败: {str(e)}")
        break

@task_check.handle()
async def handle_task_check(bot: Bot, event: MessageEvent):
    """Reply with the current status report of all download tasks."""
    await task_check.send(await task_manager.generate_report())

get_jm = on_command("获取jm", aliases={"下载jm","下载JM","获取JM"}, priority=30, block=True, permission=SUPERUSER)


@get_jm.handle()
async def handle_get_jm(bot: Bot, event: MessageEvent, msg: Message = CommandArg()):
    """Handle the ``获取jm`` command: send existing files, download missing ones.

    Ids whose ``<id>.pdf`` already exists in the data directory are sent
    right away; the rest are queued for download with ``send_msg=True``.

    Args:
        bot: Bot that received the command.
        event: Triggering message event.
        msg: Command argument holding ids and/or ranges.
    """
    input_str = msg.extract_plain_text().strip()
    if not input_str:
        await get_jm.finish("⚠️ 告诉 七七 你 要 下载 哪些 禁漫\n(例如:18 或 1-100,200-300)")

    ids = parse_id_input(input_str)
    if not ids:
        await get_jm.finish("⚠️ 你 输入 的 格式 不对 哦 \n(例如:18 或 1-100,200-300)")

    # Split the request into files that are already on disk and missing ones.
    existing_files = []
    missing_ids = []
    for id in ids:
        if (task_manager.jm_data_dir / f"{id}.pdf").exists():
            existing_files.append(id)
        else:
            missing_ids.append(id)

    # Send the files that already exist.
    for id in existing_files:
        # Build the path for THIS id; reusing the variable left over from the
        # existence check would send the last checked path for every file.
        file_path = task_manager.jm_data_dir / f"{id}.pdf"
        try:
            await send_file_msg(bot, event, str(file_path), f"{id}.pdf")
        except ActionFailed as e:
            # Best effort: log and carry on with the remaining files.
            logger.error(f"文件发送失败: {id} - {str(e)}")

    # Queue downloads for the missing files.
    if missing_ids:
        await task_manager.add_tasks(missing_ids)
        await get_jm.send(f"🎯 发现{len(missing_ids)}个缺失文件,已加入下载队列")

        for id in missing_ids:
            await task_manager.add_task_context(id, bot, event, True)
            task = asyncio.create_task(
                run_with_context(download_and_send, id),
                context=copy_context()
            )
            async with task_manager.lock:
                task_manager.active_tasks[id] = task

# Download entry point with automatic status tracking
async def download_and_send(id: int, max_retries: int = 3):
    """Download album ``id`` and, if requested, send the resulting file.

    The bot, event and ``send_msg`` flag are read from the context saved in
    the task manager. Any exception is passed to ``handle_download_error``
    and the task is released afterwards in every case.

    Args:
        id: Album id.
        max_retries: Retry budget handed to the download step and to the
            error handler.
    """
    try:
        # Fetch the context saved when the task was queued.
        ctx = task_manager.contexts.get(id)
        if not ctx:
            raise ValueError(f"任务 {id} 上下文丢失")

        bot = ctx['bot']
        event = ctx['event']
        send_msg = ctx.get('send_msg', False)

        file_path = task_manager.jm_data_dir / f"{id}.pdf"

        # Skip the download when the file is already there, but still fall
        # through to delivery so that a requested send is not dropped.
        if not await check_existing_file(id, file_path):
            await execute_download(id, file_path, max_retries)
            # Never report success for a file that was not produced.
            if not file_path.exists():
                raise FileNotFoundError(f"文件未生成: {id}")

        if send_msg:
            await handle_file_delivery(bot, event, id, file_path)
        else:
            await task_manager.update_status(id, 'completed')
    except Exception as e:
        await handle_download_error(id, e, max_retries)
    finally:
        await task_manager.release_task(id)

async def check_existing_file(id: int, file_path: Path) -> bool:
    """Return True and mark task ``id`` as completed when ``file_path`` exists.

    Returns False, leaving the status untouched, when the file is absent.
    """
    if not file_path.exists():
        return False

    await task_manager.update_status(id, 'completed')
    logger.info(f"文件已存在,跳过下载: {id}")
    return True

async def execute_download(id: int, file_path: Path, max_retries: int):
    """Download album ``id``, retrying with exponential backoff.

    Args:
        id: Album id.
        file_path: Path at which the finished file is expected.
        max_retries: Maximum number of attempts. Values below 1 still make a
            single attempt, so the function never returns without having
            tried.

    Raises:
        FileNotFoundError: The download finished but ``file_path`` is missing
            on the last attempt.
        Exception: Whatever the last failed attempt raised.
    """
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            async with task_manager.semaphore:
                # Mark as downloading only once a slot is held, so queued
                # tasks keep showing up as pending in the report.
                await task_manager.update_status(id, 'downloading')

                if not file_path.exists():
                    await async_download_album(str(id))

                if not file_path.exists():
                    raise FileNotFoundError(f"文件未生成: {id}")

                return  # download succeeded

        except Exception as e:
            if attempt >= attempts:
                raise
            logger.warning(f"下载失败,准备重试: {id} ({attempt}/{attempts}) - {str(e)}")
            # Exponential backoff; the semaphore slot is already released here.
            await asyncio.sleep(2 ** attempt)

async def handle_file_delivery(bot: Bot, event: MessageEvent, id: int, file_path: Path):
    """Send the file for task ``id`` and record the outcome.

    The file is passed as a ``file:///`` URI when ``jm_config.jm_client`` is
    truthy, otherwise as a plain path string. On success the task is marked
    'completed'; on ``ActionFailed`` it is marked 'failed' and the exception
    is re-raised.
    """
    try:
        if jm_config.jm_client:
            target = f"file:///{file_path}"
        else:
            target = str(file_path)
        await send_file_msg(bot, event, target, f"{id}.pdf")
    except ActionFailed as send_error:
        logger.error(f"文件发送失败: {id} - {str(send_error)}")
        await task_manager.update_status(id, 'failed')
        raise
    else:
        await task_manager.update_status(id, 'completed')
# Disabled implementation of send_file_msg, kept as a bare string literal so
# that it is never executed. The send_file_msg actually called in this module
# is not defined in the visible part of the file - presumably it comes from
# one of the star imports; verify.
'''
async def send_file_msg(bot: Bot, event: MessageEvent, file_uri: str, filename: str):
    """封装文件发送逻辑"""
    try:
        await bot.upload_private_file(
            user_id=event.user_id,
            file=file_uri,
            name=filename,
        )
    except Exception as e:
        logger.error(f"文件发送异常: {filename} - {str(e)}")
        await task_manager.update_status(id, 'failed')
        raise
'''
# Strong references to scheduled retry tasks. The event loop only keeps weak
# references to tasks, so an unreferenced retry could be garbage-collected
# before it runs to completion.
_retry_tasks: set = set()


async def handle_download_error(id: int, error: Exception, remaining_retries: int):
    """Log a failed download and either schedule a retry or give up.

    Args:
        id: Album id.
        error: Exception that made the task fail.
        remaining_retries: Retries left. When positive, a new
            ``download_and_send`` task is started with one retry fewer;
            otherwise the task is marked 'failed' and the requesting user is
            notified by private message.
    """
    logger.error(f"下载任务失败: {id} - {str(error)}")

    if remaining_retries > 0:
        logger.info(f"准备重试任务: {id} (剩余尝试次数: {remaining_retries})")
        # A task that is about to be retried must not look finished to
        # status readers, even if an earlier step already marked it failed.
        await task_manager.update_status(id, 'pending')
        await asyncio.sleep(5)
        retry = asyncio.create_task(download_and_send(id, remaining_retries-1))
        _retry_tasks.add(retry)
        retry.add_done_callback(_retry_tasks.discard)
        return

    await task_manager.update_status(id, 'failed')
    # Notify the user who requested the download.
    ctx = task_manager.contexts.get(id)
    if ctx and ctx.get('event'):
        try:
            await ctx['bot'].send_private_msg(
                user_id=ctx['event'].user_id,
                message=f"下载任务 {id} 最终失败: {str(error)}"
            )
        except Exception as notify_error:
            # The notification is best effort; the task is already recorded
            # as failed, so a send problem is only logged.
            logger.error(f"失败通知发送失败: {id} - {str(notify_error)}")

LoCCai avatar May 04 '25 08:05 LoCCai