WeRoBot icon indicating copy to clipboard operation
WeRoBot copied to clipboard

超时重发的消息会被识别为未知消息类型(即使是text)

Open W1ndys opened this issue 1 month ago • 0 comments

  • 对 Bug 的描述

    • 当前行为:超时重发的消息会被识别为未知消息类型(即使是text)
    • 正确的行为:识别正确
  • 环境

    • 平台:Windows
    • WeRoBot 版本号:werobot==1.13.1
    • Python 版本:Python 3.12.3
  • 复现代码或 repo 链接

# 脱敏后的代码文件
import os
import time
import threading
from datetime import datetime
from werobot import WeRoBot
from core.log_config import setup_logger  # 导入配置函数
from utils.logger import logger  # 导入干净的 logger
from core.config import (
    # TOKEN, # 已脱敏
    # HOST, # 已脱敏
    # PORT, # 已脱敏
    # SERVER, # 已脱敏
    # APP_ID, # 已脱敏
    # APP_SECRET, # 已脱敏
    LOGS_DIR,
    MAINTENANCE_MODE,
)
from utils.feishu import send_feishu_msg
from handlers.menu import MenuHandler

# 模拟从配置文件中读取的脱敏后变量
TOKEN = "YOUR_WECHAT_TOKEN"
APP_ID = "YOUR_APP_ID"
APP_SECRET = "YOUR_APP_SECRET"
HOST = "0.0.0.0"
PORT = 80
SERVER = "waitress"

# 在所有其他代码之前,首先配置日志,日志等级为debug,轮转10MB
# setup_logger("bot", level="DEBUG")
setup_logger("bot", logs_dir=str(LOGS_DIR), level="DEBUG")

from models.database import html_tasks, users
from models.database.qa_database import QADatabase
from models.database.exam_subscription import ExamSubscriptionDatabase
from models.database.score_subscription import ScoreSubscriptionDatabase
from models.database.average_scores import AverageScoresDatabase
from models.database.course_recommendation import CourseRecommendationDatabase
from models.database.message_cache import (
    MessageCacheDatabase,
    STATUS_PENDING,
    STATUS_COMPLETED,
    STATUS_FAILED,
)

# 初始化数据库
html_tasks.init_db()
users.UsersDatabase().init_db()
QADatabase().init_db()
ExamSubscriptionDatabase().init_db()
ScoreSubscriptionDatabase().init_db()

# 初始化消息缓存数据库
message_cache_db = MessageCacheDatabase()
message_cache_db.init_db()

# 消息缓存配置
CACHE_POLL_INTERVAL = 0.5  # 轮询间隔(秒)
CACHE_MAX_WAIT_TIME = 4  # 最大等待时间(秒),微信5秒超时,预留1秒给返回
CACHE_CLEANUP_INTERVAL = 60  # 缓存清理间隔(秒)

# 缓存清理时间记录(用于限制清理频率)
_last_cleanup_time = 0
_cleanup_lock = threading.Lock()

# 初始化平均分数据库,平均分数据来源依赖手动上传,检测表是否存在,如果不存在,发送飞书上报警告
AverageScoresDatabase().init_db()

# 初始化选课推荐数据库,选课推荐数据来源依赖手动上传,检测表是否存在,如果不存在,发送飞书上报警告
CourseRecommendationDatabase().init_db()

# 初始化空教室基准列表
from handlers.jwxt.free_classroom import initialize_baseline_classrooms

initialize_baseline_classrooms()

# 初始化公告
from utils.announcement import ANNOUNCEMENT_SWITCH, ANNOUNCEMENT


# 创建 WeRoBot 核心实例
robot = WeRoBot(token=TOKEN, app_id=APP_ID, app_secret=APP_SECRET)


def _process_message_in_background(message, message_id):
    """
    在后台线程中处理消息并更新缓存

    Args:
        message: WeRoBot消息对象
        message_id: 消息唯一标识符
    """
    try:
        # 执行实际的消息处理
        result = MenuHandler(robot).handle(message)
        if result:
            if ANNOUNCEMENT_SWITCH:
                result = result + "\n" + "━" * 10 + "\n" + "公告:" + ANNOUNCEMENT
        # 更新缓存为已完成状态
        message_cache_db.update_cache_response(message_id, result, STATUS_COMPLETED)
        logger.info(f"消息处理完成: message_id={message_id}")
    except Exception as e:
        error_msg = f'系统暂时出现了一点问题,请稍后再试或发送"联系作者"联系作者获取帮助,提交时请附上错误信息:{str(e)}\n\n发送"曲奇教务"查看可用命令,或者联系作者QQ XXXXXXXXXX 获取帮助。'
        message_cache_db.update_cache_response(message_id, error_msg, STATUS_FAILED)
        logger.error(f"后台处理消息失败: message_id={message_id}, error={e}")
        send_feishu_msg(
            "WeRoBot 服务错误", f"时间:{datetime.now()} \n处理消息时发生未知错误: {e}"
        )


def _wait_for_cache_result(message_id, max_wait_time=CACHE_MAX_WAIT_TIME):
    """
    轮询等待缓存结果

    Args:
        message_id: 消息唯一标识符
        max_wait_time: 最大等待时间(秒)

    Returns:
        处理结果字符串,如果超时则返回None
    """
    start_time = time.time()
    while time.time() - start_time < max_wait_time:
        # 使用原子操作获取并删除已完成的缓存,避免竞态条件
        response = message_cache_db.get_and_delete_completed_cache(message_id)
        if response is not None:
            return response
        time.sleep(CACHE_POLL_INTERVAL)
    return None


# 注册消息处理器,将所有文本消息都交由分发器处理
@robot.text
def handle_text_message(message):
    """
    处理所有文本消息

    使用消息缓存机制将容错时间扩大到15秒:
    1. 首次收到消息时,创建PENDING缓存并启动后台处理
    2. 重试请求到来时,检测处理是否完成
    3. 完成则返回结果并删除记录,未完成则返回空字符串触发微信重试

    注意:返回空字符串会触发微信服务器重试机制
    """
    try:
        message_id = str(message.message_id)
        user_id = message.source

        logger.info(
            f"收到消息: message_id={message_id}, user_id={user_id}, content='{message.content}'"
        )

        # 维护模式直接返回,无需缓存
        if MAINTENANCE_MODE:
            return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"

        # 定期清理过期缓存(限制清理频率,避免性能影响)
        global _last_cleanup_time
        current_time = time.time()
        with _cleanup_lock:
            if current_time - _last_cleanup_time >= CACHE_CLEANUP_INTERVAL:
                _last_cleanup_time = current_time
                try:
                    message_cache_db.cleanup_expired_cache()
                except Exception as cleanup_error:
                    logger.warning(
                        f"清理过期缓存失败(不影响正常处理): {cleanup_error}"
                    )

        # 检查是否有现有缓存
        cache = message_cache_db.get_message_cache(message_id)

        if cache:
            # 缓存存在
            if cache["status"] in (STATUS_COMPLETED, STATUS_FAILED):
                # 处理已完成,使用原子操作获取结果并删除缓存
                response = message_cache_db.get_and_delete_completed_cache(message_id)
                if response is not None:
                    logger.info(f"从缓存返回结果: message_id={message_id}")
                    return response
                # 如果原子操作返回None,可能是被其他请求处理了,重新轮询
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                logger.warning(f"获取缓存结果超时: message_id={message_id}")
                return ""
            else:
                # 处理中,轮询等待结果
                logger.info(f"消息处理中,开始轮询等待: message_id={message_id}")
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                logger.warning(f"等待消息处理超时: message_id={message_id}")
                return ""
        else:
            # 首次收到该消息,创建缓存并启动后台处理
            if message_cache_db.create_pending_cache(message_id, user_id):
                # 成功创建缓存,启动后台处理线程
                thread = threading.Thread(
                    target=_process_message_in_background,
                    args=(message, message_id),
                    daemon=True,
                )
                thread.start()
                logger.info(f"启动后台处理线程: message_id={message_id}")

                # 等待处理结果
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                logger.warning(f"首次处理消息超时: message_id={message_id}")
                return ""
            else:
                # 创建缓存失败(可能是并发创建),尝试等待结果
                logger.info(f"缓存创建失败,尝试等待结果: message_id={message_id}")
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                return ""

    except Exception as e:
        logger.error(f"处理消息时发生未知错误: {e}")
        send_feishu_msg(
            "WeRoBot 服务错误", f"时间:{datetime.now()} \n处理消息时发生未知错误: {e}"
        )
        return f'系统暂时出现了一点问题,请稍后再试或发送"联系作者"联系作者获取帮助,提交时请附上错误信息:{str(e)}'


@robot.image
def handle_image_message(message):
    """处理所有图片消息"""
    logger.info(f"收到图片消息: user_id={message.source}, img='{message.img}'")
    if MAINTENANCE_MODE:
        return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
    return "图片消息已收到,但我还不能处理图片内容。"


@robot.subscribe
def subscribe_handler(message):
    return "欢迎关注曲奇教务,发送“曲奇教务”查看可用曲奇教务命令,或者联系作者QQ XXXXXXXXXX 获取帮助。"


@robot.handler
def default_handler(message):
    """处理所有未被其他处理器处理的消息"""
    logger.info(
        f"收到未知类型消息: user_id={message.source}, type='{message.type}', 原始内容='{message.raw}'"
    )
    if MAINTENANCE_MODE:
        return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
    return (
        "无效的命令,请发送“曲奇教务”查看可用命令,或者联系作者QQ XXXXXXXXXX 获取帮助。"
    )


if __name__ == "__main__":
    send_feishu_msg(
        "WeRoBot 服务已启动", f"启动时间:{datetime.now()}\n运行路径:{os.getcwd()}"
    )

    try:
        if os.name == "nt":
            # 在 Windows 上,通常使用 waitress 作为生产服务器
            robot.run(host=HOST, port=PORT, server="waitress")
        else:
            # 在 Linux/macOS 上,可以使用默认的 werkzeug 或其他如 gunicorn
            robot.run(host=HOST, port=PORT, server=SERVER)
    except KeyboardInterrupt:
        logger.info("手动停止WeRoBot 服务")


  • 复现步骤

  • 其他信息

W1ndys avatar Nov 27 '25 01:11 W1ndys