chatgpt-on-wechat icon indicating copy to clipboard operation
chatgpt-on-wechat copied to clipboard

防撤回功能(已实现 文本、图片、视频、文件)

Open lets-crazy opened this issue 6 months ago • 19 comments

⚠️ 搜索是否存在类似issue

  • [X] 我已经搜索过issues和disscussions,没有发现相似issue

总结

防撤回:搞一个dict存起来,然后2分钟过期,检测如果有“撤回一条消息”提醒,就从刚刚的dict捞出来,发给指定人(我发给我的大号了) -指定人:通过获取朋友接口获取 -图片、视频、文件:是下载到本地的 -如果要发送到自己的“文件传输助手”的话,可以把它先添加到通信录

直接改的代码位置:channel/wechat/wechat_channel.py

  • 两个带@itchat.msg_register方法里面加了一行。调整多接收视频
  • 其他的代码往前往后都没改动

单聊撤回效果image

群里撤回效果image

添加/修改部分代码:图片只是示意,下面代码是最新的 image


//前面的毫无改动
from threading import Timer

# 定义一个dict,保存消息体
msg_dict = {}
# 定义一个list,保存要撤回的消息类型
# msg_type_listen = ['Text', 'Picture', 'Video', 'Attachment']
# 定义接收撤回消息的好友
target_friend = None
# 已接收消息的字典
receivedMsgs1 = {}
# 定义文件存储目录
download_directory = './downloads'

# 确保下载目录存在
if not os.path.exists(download_directory):
    os.makedirs(download_directory)

def get_revoke_msg_receiver():
    global target_friend  # 声明target_friend是全局变量
    friends = itchat.get_friends(update=True)
    for friend in friends:
        # 昵称和备注名称都可以作为判断依据,根据实际情况二选一。(如果是“文件传输助手”的话,需要先把他加为好友)
        #if friend['NickName'] == '大雨天':  # 替换为要发送到的好友的昵称  ( 和下面这行二选一)
        if friend['RemarkName'] == '大雨天666':  # 替换为要发送到的好友的备注
            target_friend = friend
            break
    return target_friend

# 捕获撤回消息的提醒,查找旧消息并回复
def revoke_msg(msg, is_group=False):
    match = re.search('撤回了一条消息', msg['Content'])
    if match:
        # 从撤回消息里提取被撤回的消息的msg_id
        old_msg_id = re.search(r"\<msgid\>(.*?)\<\/msgid\>", msg['Content']).group(1)
        # 判断被撤回消息的msg_id在不在已收取的消息里
        if old_msg_id in msg_dict.keys():
            old_msg = msg_dict[old_msg_id]
            if target_friend is None:
                get_revoke_msg_receiver()  # 更新全局变量target_friend
            # 原消息是文本消息
            if old_msg['Type'] == 'Text':
                old_msg_text = old_msg['Text']
                if is_group:
                    itchat.send(msg='群:【'+msg['User']['NickName']+'】的【'+msg['ActualNickName'] + '】刚刚发过这条消息:' + old_msg_text,
                                toUserName=target_friend['UserName'])
                else:
                    itchat.send(msg='【'+msg['User']['NickName'] + '】刚刚发过这条消息:' + old_msg_text,
                                toUserName=target_friend['UserName'])
                    # 原消息是需要下载的文件消息
            elif old_msg['Type'] in ['Picture', 'Video', 'Attachment']:
                # 发送文本消息给自己
                msg_type = {'Picture': '图片', 'Video': '视频', 'Attachment': '文件'}[
                    old_msg['Type']]
                if is_group:
                    itchat.send_msg(
                        msg=f'群:【{msg["User"]["NickName"]}】的【{msg["ActualNickName"]}】刚刚发过这条{msg_type}👇',
                        toUserName=target_friend['UserName'])
                else:
                    itchat.send_msg(msg=f'【{msg["User"]["NickName"]}】刚刚发过这条{msg_type}👇',
                                    toUserName=target_friend['UserName'])
                # 发送文件
                file_info = msg_dict[old_msg_id]['FileName']
                if old_msg['Type'] == 'Picture':
                    itchat.send_image(file_info, toUserName=target_friend['UserName'])
                elif old_msg['Type'] == 'Video':
                    itchat.send_video(file_info, toUserName=target_friend['UserName'])
                else:
                    itchat.send_file(file_info, toUserName=target_friend['UserName'])

# 定时清理过期消息
out_date_msg_dict = []
# 过期时间,正常是120秒,测试的时候可以少一点
out_date_time = 120
# 删除间隔时间
delete_cycle_time = 2

def delete_out_date_msg():
    # 遍历存储消息的dict里,找出过期的消息
    for m in msg_dict:
        current_time = time.time()
        current_time_int = int(current_time)
        if (current_time_int - msg_dict[m]['CreateTime']) > out_date_time:
            out_date_msg_dict.append(m)
    # 用已存储在list里的过期消息的key,删除dict里的消息
    for n in out_date_msg_dict:
        # 文本消息只要删掉dict里的消息
        if msg_dict[n]['Type'] == 'Text':
            msg_dict.pop(n)
        # 文件消息要额外删掉文件
        elif msg_dict[n]['Type'] in ['Picture', 'Video', 'Attachment']:
            file_path = msg_dict[n]['FileName']
            if os.path.exists(file_path):
                os.remove(file_path)
            msg_dict.pop(n)
    # 清空存储过期消息key的list,为下一次遍历做准备
    out_date_msg_dict.clear()
    t = Timer(delete_cycle_time, delete_out_date_msg)
    t.start()

delete_out_date_msg()

# 消息检查装饰器
def _msg_check(func):
    def wrapper(msg, is_group):
        msgId = msg['MsgId']
        if msgId in receivedMsgs1:
            logger.info("Wechat message {} already received, ignore".format(msgId))
            return
        receivedMsgs1[msgId] = True
        create_time = msg['CreateTime']  # 消息时间戳
        if int(create_time) < int(time.time()) - 60:  # 跳过1分钟前的历史消息
            logger.debug("[WX]history message {} skipped".format(msgId))
            return
        my_msg = msg["ToUserName"] == msg["User"]["UserName"] and \
                 msg["ToUserName"] != msg["FromUserName"]
        if my_msg and not is_group:
            logger.debug("[WX]my message {} skipped".format(msgId))
            return
        return func(msg, is_group)
    return wrapper

# 下载文件的函数
def download_files(msg):
    # 发送的文件的文件名(图片给出的默认文件名)都存储在msg的FileName键
    # 附件下载方法存储在msg的Text键中
    # 下载文件并保存到指定目录
    file_path = os.path.join(download_directory, msg['FileName'])
    msg['Text'](file_path)
    return '@%s@%s' % ({'Picture': 'img', 'Video': 'vid', 'Attachment': 'fil'}.get(msg['Type'], 'fil'), file_path)

# 消息撤回处理
@_msg_check
def just_revoke(msg, is_group=False):
    try:
        if msg["Type"] == 'Text':
            msg_dict[msg['MsgId']] = msg
        elif msg["Type"] in ['Picture', 'Video', 'Attachment']:
            # 存到字典
            msg_dict[msg['MsgId']] = msg
            # 使用 download_files 函数下载文件
            file_info = download_files(msg)
            msg_dict[msg['MsgId']]['FileName'] = file_info.split('@')[-1]
        elif msg["Type"] == 'Note':
            revoke_msg(msg, is_group)
    except Exception as e:
        logger.exception('防撤回异常:消息类型:{}'.format(msg["Type"]), e)

@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING, VIDEO])
def handler_single_msg(msg):
    try:
        just_revoke(msg, False)
        cmsg = WechatMessage(msg, False)
    except NotImplementedError as e:
        logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
        return None
    WechatChannel().handle_single(cmsg)
    return None


@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING, VIDEO], isGroupChat=True)
def handler_group_msg(msg):
    try:
        just_revoke(msg, True)
        cmsg = WechatMessage(msg, True)
    except NotImplementedError as e:
        logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
        return None
    WechatChannel().handle_group(cmsg)
    return None


//后面的毫无改动

你需要调整的 image 你需要注意的 image

举例

No response

动机

感觉这个功能比较实用。 “语音”好像不支持,自己的语音也无法转发之类的,比较特殊,后续考虑直接转成文字来防撤回。

lets-crazy avatar Jul 30 '24 09:07 lets-crazy