chatgpt-on-wechat
chatgpt-on-wechat copied to clipboard
防撤回功能(已实现 文本、图片、视频、文件)
⚠️ 搜索是否存在类似issue
- [X] 我已经搜索过issues和disscussions,没有发现相似issue
总结
防撤回:搞一个dict存起来,然后2分钟过期,检测如果有“撤回一条消息”提醒,就从刚刚的dict捞出来,发给指定人(我发给我的大号了) -指定人:通过获取朋友接口获取 -图片、视频、文件:是下载到本地的 -如果要发送到自己的“文件传输助手”的话,可以把它先添加到通信录
直接改的代码位置:channel/wechat/wechat_channel.py
- 两个带@itchat.msg_register方法里面加了一行。调整多接收视频
- 其他的代码往前往后都没改动
单聊撤回效果:
群里撤回效果:
添加/修改部分代码:图片只是示意,下面代码是最新的
//前面的毫无改动
from threading import Timer
# 定义一个dict,保存消息体
msg_dict = {}
# 定义一个list,保存要撤回的消息类型
# msg_type_listen = ['Text', 'Picture', 'Video', 'Attachment']
# 定义接收撤回消息的好友
target_friend = None
# 已接收消息的字典
receivedMsgs1 = {}
# 定义文件存储目录
download_directory = './downloads'
# 确保下载目录存在
if not os.path.exists(download_directory):
os.makedirs(download_directory)
def get_revoke_msg_receiver():
global target_friend # 声明target_friend是全局变量
friends = itchat.get_friends(update=True)
for friend in friends:
# 昵称和备注名称都可以作为判断依据,根据实际情况二选一。(如果是“文件传输助手”的话,需要先把他加为好友)
#if friend['NickName'] == '大雨天': # 替换为要发送到的好友的昵称 ( 和下面这行二选一)
if friend['RemarkName'] == '大雨天666': # 替换为要发送到的好友的备注
target_friend = friend
break
return target_friend
# 捕获撤回消息的提醒,查找旧消息并回复
def revoke_msg(msg, is_group=False):
match = re.search('撤回了一条消息', msg['Content'])
if match:
# 从撤回消息里提取被撤回的消息的msg_id
old_msg_id = re.search(r"\<msgid\>(.*?)\<\/msgid\>", msg['Content']).group(1)
# 判断被撤回消息的msg_id在不在已收取的消息里
if old_msg_id in msg_dict.keys():
old_msg = msg_dict[old_msg_id]
if target_friend is None:
get_revoke_msg_receiver() # 更新全局变量target_friend
# 原消息是文本消息
if old_msg['Type'] == 'Text':
old_msg_text = old_msg['Text']
if is_group:
itchat.send(msg='群:【'+msg['User']['NickName']+'】的【'+msg['ActualNickName'] + '】刚刚发过这条消息:' + old_msg_text,
toUserName=target_friend['UserName'])
else:
itchat.send(msg='【'+msg['User']['NickName'] + '】刚刚发过这条消息:' + old_msg_text,
toUserName=target_friend['UserName'])
# 原消息是需要下载的文件消息
elif old_msg['Type'] in ['Picture', 'Video', 'Attachment']:
# 发送文本消息给自己
msg_type = {'Picture': '图片', 'Video': '视频', 'Attachment': '文件'}[
old_msg['Type']]
if is_group:
itchat.send_msg(
msg=f'群:【{msg["User"]["NickName"]}】的【{msg["ActualNickName"]}】刚刚发过这条{msg_type}👇',
toUserName=target_friend['UserName'])
else:
itchat.send_msg(msg=f'【{msg["User"]["NickName"]}】刚刚发过这条{msg_type}👇',
toUserName=target_friend['UserName'])
# 发送文件
file_info = msg_dict[old_msg_id]['FileName']
if old_msg['Type'] == 'Picture':
itchat.send_image(file_info, toUserName=target_friend['UserName'])
elif old_msg['Type'] == 'Video':
itchat.send_video(file_info, toUserName=target_friend['UserName'])
else:
itchat.send_file(file_info, toUserName=target_friend['UserName'])
# 定时清理过期消息
out_date_msg_dict = []
# 过期时间,正常是120秒,测试的时候可以少一点
out_date_time = 120
# 删除间隔时间
delete_cycle_time = 2
def delete_out_date_msg():
# 遍历存储消息的dict里,找出过期的消息
for m in msg_dict:
current_time = time.time()
current_time_int = int(current_time)
if (current_time_int - msg_dict[m]['CreateTime']) > out_date_time:
out_date_msg_dict.append(m)
# 用已存储在list里的过期消息的key,删除dict里的消息
for n in out_date_msg_dict:
# 文本消息只要删掉dict里的消息
if msg_dict[n]['Type'] == 'Text':
msg_dict.pop(n)
# 文件消息要额外删掉文件
elif msg_dict[n]['Type'] in ['Picture', 'Video', 'Attachment']:
file_path = msg_dict[n]['FileName']
if os.path.exists(file_path):
os.remove(file_path)
msg_dict.pop(n)
# 清空存储过期消息key的list,为下一次遍历做准备
out_date_msg_dict.clear()
t = Timer(delete_cycle_time, delete_out_date_msg)
t.start()
delete_out_date_msg()
# 消息检查装饰器
def _msg_check(func):
def wrapper(msg, is_group):
msgId = msg['MsgId']
if msgId in receivedMsgs1:
logger.info("Wechat message {} already received, ignore".format(msgId))
return
receivedMsgs1[msgId] = True
create_time = msg['CreateTime'] # 消息时间戳
if int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
logger.debug("[WX]history message {} skipped".format(msgId))
return
my_msg = msg["ToUserName"] == msg["User"]["UserName"] and \
msg["ToUserName"] != msg["FromUserName"]
if my_msg and not is_group:
logger.debug("[WX]my message {} skipped".format(msgId))
return
return func(msg, is_group)
return wrapper
# 下载文件的函数
def download_files(msg):
# 发送的文件的文件名(图片给出的默认文件名)都存储在msg的FileName键
# 附件下载方法存储在msg的Text键中
# 下载文件并保存到指定目录
file_path = os.path.join(download_directory, msg['FileName'])
msg['Text'](file_path)
return '@%s@%s' % ({'Picture': 'img', 'Video': 'vid', 'Attachment': 'fil'}.get(msg['Type'], 'fil'), file_path)
# 消息撤回处理
@_msg_check
def just_revoke(msg, is_group=False):
try:
if msg["Type"] == 'Text':
msg_dict[msg['MsgId']] = msg
elif msg["Type"] in ['Picture', 'Video', 'Attachment']:
# 存到字典
msg_dict[msg['MsgId']] = msg
# 使用 download_files 函数下载文件
file_info = download_files(msg)
msg_dict[msg['MsgId']]['FileName'] = file_info.split('@')[-1]
elif msg["Type"] == 'Note':
revoke_msg(msg, is_group)
except Exception as e:
logger.exception('防撤回异常:消息类型:{}'.format(msg["Type"]), e)
@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING, VIDEO])
def handler_single_msg(msg):
try:
just_revoke(msg, False)
cmsg = WechatMessage(msg, False)
except NotImplementedError as e:
logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
return None
WechatChannel().handle_single(cmsg)
return None
@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING, VIDEO], isGroupChat=True)
def handler_group_msg(msg):
try:
just_revoke(msg, True)
cmsg = WechatMessage(msg, True)
except NotImplementedError as e:
logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
return None
WechatChannel().handle_group(cmsg)
return None
//后面的毫无改动
你需要调整的
你需要注意的
举例
No response
动机
感觉这个功能比较实用。 “语音”好像不支持,自己的语音也无法转发之类的,比较特殊,后续考虑直接转成文字来防撤回。