articles
articles copied to clipboard
批量处理crash脚本
在遇到大量crash结果时,分析和归类crash结果有时会耗费大量的时间。对我来说,每天上班前运行一下该脚本,有新类型的crash就分析一下并提交report,节省了我大量的精力和时间。
#encoding:utf-8
# ====================================================
# python: 3.5+
# 处理fuzz crash脚本
# 执行: python3 handle-crash.py -h 查看帮助选项
# example:
# python3.exe handle-crash.py -f D:\FuzzProgram.exe --post_fix dwrite -i D:\fuzz_output -t 4 -w 30
# version: 0.5.20211228
# ====================================================
import os
import glob
import lief
import json
import time
import signal
import shutil
import logging
import datetime
import argparse
import subprocess
from functools import partial
from multiprocessing import Pool
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Parsed command-line options; replaced by the argparse namespace at startup.
options = {}
# When True, the full debugger command line is logged before each run.
debug_mode = False

# Debugger locations: the "origin" paths are under C:\Program Files (x86),
# the "move" paths are the alternative location on drive D:.
debugger_x64_origin = "C:\\Program Files (x86)\\Windows Kits\\10\\Debuggers\\x64\\cdb.exe"
debugger_x64_move = "D:\\Windows Kits\\10\\Debuggers\\x64\\cdb.exe"
debugger_x86_origin = "C:\\Program Files (x86)\\Windows Kits\\10\\Debuggers\\x86\\cdb.exe"
debugger_x86_move = "D:\\Windows Kits\\10\\Debuggers\\x86\\cdb.exe"

# Effective debugger paths; filled in by prepare_debugger().
debugger_x64 = ''
debugger_x86 = ''

# Directory holding the JSON files that record already-seen crash types.
result_dir = 'result_jsons'
def prepare_debugger():
    """Pick the cdb.exe path to use for each architecture.

    For x64 and x86 alike, the "origin" path is chosen when it exists on
    disk; otherwise the "move" path is used without any further check.
    The result is stored in the module globals ``debugger_x64`` and
    ``debugger_x86``.
    """
    global debugger_x86, debugger_x64

    x64_at_origin = os.path.exists(debugger_x64_origin)
    debugger_x64 = debugger_x64_origin if x64_at_origin else debugger_x64_move

    x86_at_origin = os.path.exists(debugger_x86_origin)
    debugger_x86 = debugger_x86_origin if x86_at_origin else debugger_x86_move
def sendmail(content=''):
    """Send a notification e-mail announcing that crash handling finished.

    Delivery is best effort: any SMTP or network failure is logged and
    swallowed so that a mail problem never aborts the run.

    Args:
        content: Plain-text body of the message.
    """
    # Placeholder account settings; fill in before use.
    sender = 'xx'
    passwd = 'xx'
    mail_host = 'xx'
    receiver = 'xx'

    message = MIMEMultipart()
    message['Subject'] = "[Crash Handle Finish]"
    message['From'] = sender
    message['To'] = receiver
    message.attach(MIMEText(content))

    try:
        # The context manager sends QUIT and closes the socket even when
        # login or delivery fails part-way through.
        with smtplib.SMTP_SSL(mail_host, 465) as smtp:
            smtp.login(sender, passwd)
            smtp.sendmail(sender, receiver, message.as_string())
    except (smtplib.SMTPException, OSError) as ex:
        # OSError covers DNS and connection failures, which are not
        # reported as SMTPException by SMTP_SSL().
        logger.error("sendmail failed: {}".format(ex))
class Logger(object):
    """Small wrapper that configures the root logger and forwards messages.

    Constructing an instance sets up logging to the file ``HandleCrash.log``
    (append mode) and attaches an additional console handler to the root
    logger. Every method converts its argument with ``str()`` before logging.
    """

    def __init__(self):
        """Configure level labels, the log file and the console handler."""
        # Labels printed for %(levelname)s. ERROR gets its own label so that
        # it can be told apart from CRITICAL ("FATAL:") in the log.
        logging.addLevelName(20, "INFO:")
        logging.addLevelName(30, "WARNING:")
        logging.addLevelName(40, "ERROR:")
        logging.addLevelName(50, "FATAL:")
        logging.basicConfig(level=logging.DEBUG,
                            format="%(levelname)s %(asctime)s %(message)s",
                            datefmt="%m-%d %H:%M:%S",
                            filename='HandleCrash.log',
                            filemode="a")
        # Mirror everything to the console as well.
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(levelname)s %(asctime)s %(message)s")
        console.setFormatter(formatter)
        logging.getLogger("").addHandler(console)

    def debug(self, msg=""):
        """Log ``msg`` at DEBUG level."""
        logging.debug(str(msg))

    def info(self, msg=""):
        """Log ``msg`` at INFO level."""
        logging.info(str(msg))

    def warning(self, msg=""):
        """Log ``msg`` at WARNING level."""
        logging.warning(str(msg))

    def exception(self, msg=""):
        """Log ``msg`` at ERROR level together with the current traceback."""
        logging.exception(str(msg))

    def error(self, msg=""):
        """Log ``msg`` at ERROR level."""
        logging.error(str(msg))

    def critical(self, msg=""):
        """Log ``msg`` at CRITICAL (labelled FATAL) level."""
        logging.critical(str(msg))
# Module-level logger instance shared by the functions in this script.
logger = Logger()
def init_fuzzer():
    """Pool worker initializer: make the worker process ignore SIGINT.

    Passed to ``Pool`` as the initializer so that a keyboard interrupt on
    Windows is not handled inside the worker processes.
    """
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
# Classify a crash by the first two frames of the call stack (Windows)
def get_crash_type_win32(stdout):
    """Derive a crash-type key from debugger stack-trace output.

    The output is searched for the first line containing ``RetAddr`` (the
    stack-trace header). The line after it, or the one after that when the
    line directly below the header contains ``WARNING``, is taken as the
    first frame. The key is the third whitespace-separated token of that
    line, joined with `` -> `` to the third token of the following line when
    that line has at least three tokens.

    Args:
        stdout: Debugger output as ``str`` or ``bytes``. Bytes are decoded
            as UTF-8 first, then as ISO-8859-1 if that fails.

    Returns:
        The crash-type string, or ``None`` when the output cannot be
        decoded, contains no stack header, or the first frame line has
        fewer than three tokens.
    """
    if isinstance(stdout, str):
        text = stdout
    else:
        try:
            # First attempt: UTF-8.
            text = stdout.decode()
        except UnicodeDecodeError:
            # Second attempt: ISO-8859-1.
            try:
                text = stdout.decode(encoding='ISO-8859-1')
            except Exception as ex:
                logger.error(ex)
                return None
        except Exception as ex:
            logger.error(ex)
            return None
    stdout_lines = text.split('\n')

    # Locate the stack-trace header; without one there is nothing to classify.
    header_index = None
    for idx, line in enumerate(stdout_lines):
        if "RetAddr" in line:
            header_index = idx
            break
    if header_index is None:
        return None

    # Special case where the very first frame is preceded by a warning:
    # 0:000> k
    # ChildEBP RetAddr
    # WARNING: Frame IP not in any known module. Following frames may be wrong.
    # 00 0019d3f8 65245480 0x8f3600c
    # 01 0019d964 65238ad8 ECompositeViewer!DllCanUnloadNow+0x66890
    frame_index = header_index + 1
    if frame_index < len(stdout_lines) and "WARNING" in stdout_lines[frame_index]:
        frame_index += 1
    if frame_index >= len(stdout_lines):
        return None

    first_frame = stdout_lines[frame_index].split()
    if len(first_frame) < 3:
        # Without a usable first frame there is no key to extend.
        return None
    crash_type = first_frame[2]

    if frame_index + 1 < len(stdout_lines):
        second_frame = stdout_lines[frame_index + 1].split()
        if len(second_frame) >= 3:
            crash_type += ' -> ' + second_frame[2]
    return crash_type
# Compare newly found crash types with the recorded ones and drop the known ones
def compare_crash_types(crash_types, output_root_dir):
    """Remove already-recorded crash types and update the record file.

    The record is ``<result_dir>/<options.post_fix>.json`` with a
    ``crash_types`` list. Every type in that list that also appears in
    ``crash_types`` is deleted from ``crash_types`` (the dict is modified in
    place), and its files are moved into ``<output_root_dir>/repeated``.
    The record is then rewritten with the union of old and new types.

    Args:
        crash_types: Dict mapping crash type to a list of crash file paths.
        output_root_dir: Directory under which ``repeated`` is created.
    """
    result_json_file = os.path.join(result_dir, options.post_fix + '.json')
    os.makedirs(result_dir, exist_ok=True)

    # Load the crash types stored by previous runs, if any.
    old_crash_types = []
    if os.path.exists(result_json_file):
        with open(result_json_file) as fp:
            old_crash_types = json.load(fp)['crash_types']

    # Drop the types that are already known and collect their files.
    repeat_files = []
    for crash_type in old_crash_types:
        if crash_type in crash_types:
            repeat_files.extend(crash_types.pop(crash_type))

    # Union of the previously recorded types and the remaining new ones.
    total_crash_types = {
        'crash_types': list(set(old_crash_types).union(set(crash_types)))
    }

    # Move all repeated files into the "repeated" directory.
    if repeat_files:
        repeat_dir = os.path.join(output_root_dir, "repeated")
        os.makedirs(repeat_dir, exist_ok=True)
        for repeat_file in repeat_files:
            if not os.path.exists(repeat_file) or os.path.isdir(repeat_file):
                continue
            output_file = os.path.join(repeat_dir, os.path.basename(repeat_file))
            shutil.move(repeat_file, output_file)

    # Write the merged record back to the JSON file.
    with open(result_json_file, 'w') as fp:
        json.dump(total_crash_types, fp)
# Write the crash types to the report file and sort the crash files
def handle_crash_types(crash_types):
    """Write the unique-crash report and move files into per-type directories.

    Known types are first removed by ``compare_crash_types``. For each
    remaining type a directory is created under ``options.input_directory``
    and the type's files are moved into it. The type and the new file
    locations are written to ``uniq_crash_<YYYY-MM-DD>_<post_fix>.log`` in
    the current working directory.

    Args:
        crash_types: Dict mapping crash type to a list of crash file paths;
            modified in place by ``compare_crash_types``.
    """
    output_root_dir = options.input_directory
    uniq_crash_type_file = 'uniq_crash_{}_{}.log'.format(
        datetime.datetime.now().strftime("%Y-%m-%d"),
        options.post_fix)
    compare_crash_types(crash_types, output_root_dir)

    # The with-block closes the report even if a move fails part-way.
    with open(uniq_crash_type_file, 'w') as fp:
        for crash_type, crash_files in crash_types.items():
            if not crash_type:
                continue
            fp.write(crash_type + ':\n')

            # Build the directory name for this crash type, replacing the
            # separator and the characters "<", ">" and "::".
            output_dir = crash_type.replace(" -> ", "__")
            output_dir = output_dir.replace("<", "(")
            output_dir = output_dir.replace(">", ")")
            output_dir = output_dir.replace("::", "#")
            output_dir = os.path.join(output_root_dir, output_dir)
            os.makedirs(output_dir, exist_ok=True)

            for crash_file in crash_files:
                if not os.path.exists(crash_file) or os.path.isdir(crash_file):
                    continue
                output_file = os.path.join(output_dir, os.path.basename(crash_file))
                shutil.move(crash_file, output_file)
                fp.write("    [+] {}\n".format(output_file))
# Worker for multi-process crash handling on Windows
def crash_on_windows(input_file, options, debugger):
    """Run the target under the debugger on one crash file and classify it.

    Args:
        input_file: Path of the crash file; surrounding whitespace is
            stripped. Directories are skipped.
        options: Parsed options; reads ``specific_file``, ``fuzz_args``,
            ``fuzz_program`` and ``wait_time``.
        debugger: Path of the debugger executable.

    Returns:
        Always a 2-tuple ``(crash_type, input_file)``:
        ``(None, None)`` for a directory or when the file cannot be copied,
        ``('timeout', input_file)`` when the debugger exceeds ``wait_time``,
        otherwise the result of ``get_crash_type_win32`` (which may be
        ``None``) and the input path.
    """
    input_file = input_file.strip()
    # Directories are not processed.
    if os.path.isdir(input_file):
        return (None, None)

    try:
        if options.specific_file:
            # Copy the crash data to a per-process file derived from
            # options.specific_file (the worker's pid is inserted before
            # the extension).
            file_name, file_ext = os.path.splitext(options.specific_file)
            specific_file = '{}_{}{}'.format(file_name, str(os.getpid()), file_ext)
            with open(input_file, 'rb') as src, open(specific_file, 'wb') as dst:
                dst.write(src.read())
        else:
            specific_file = input_file
    except Exception:
        logger.error("bypassing file " + input_file)
        # Must be a 2-tuple: the caller unpacks exactly two values.
        return (None, None)

    if debugger is None:
        logger.error("No debugger found!")
        raise SystemExit(1)

    if options.fuzz_args:
        command = [debugger,
                   "-c",
                   "g;g;g;kp 10;q",
                   options.fuzz_program,
                   options.fuzz_args,
                   specific_file]
    else:
        command = [debugger,
                   "-y",
                   "srv*c:\\symbols*https://msdl.microsoft.com/download/symbols",
                   "-c",
                   "g;kp 10;q",
                   options.fuzz_program,
                   specific_file]
    if debug_mode:
        logger.info(' '.join(command))

    process_out = None
    try:
        wait_time = int(options.wait_time)
        process_out = subprocess.run(command,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT,
                                     timeout=wait_time)
    except subprocess.TimeoutExpired:
        return ('timeout', input_file)
    except Exception:
        logger.exception("subprocess run with file: " + input_file)

    crash_type = None
    if process_out is not None:
        crash_type = get_crash_type_win32(process_out.stdout)
    return (crash_type, input_file)
def main():
    """Classify every file in the input directory and report the result.

    Selects the debugger from the target's machine type, runs
    ``crash_on_windows`` on each input file in a process pool, groups the
    files by crash type, then writes the report, sorts the files and sends
    the notification mail.
    """
    prepare_debugger()
    if options.fuzz_args is None:  # input "<space> -r" if dash in arguments
        options.fuzz_args = ''

    # Collect all input files.
    files = glob.glob(os.path.join(options.input_directory, '*'), recursive=True)
    if not files:
        logger.error("No input files")
        raise SystemExit(1)

    # Determine the bitness of the target program.
    pe = lief.parse(options.fuzz_program)
    if pe is None:
        logger.error("Cannot parse fuzz program: " + str(options.fuzz_program))
        raise SystemExit(1)
    debugger = debugger_x86 if pe.header.machine.name == "I386" else debugger_x64

    # Start processing.
    index = 1
    crash_types = {}
    worker = partial(crash_on_windows, options=options, debugger=debugger)
    logger.info('start review crash...')
    # The with-block terminates the worker processes on exit, including
    # when the loop is left early through an exception.
    with Pool(int(options.threads), init_fuzzer) as pool:
        for crash_type, input_file in pool.imap_unordered(worker, files):
            if crash_type is None or input_file is None:
                continue
            file_basename = os.path.basename(input_file)
            dst_file = os.path.join(options.input_directory, file_basename)
            logger.info('{} ({}) ### {}'.format(index, crash_type, dst_file))
            crash_types.setdefault(crash_type, []).append(input_file)
            index += 1

    # NOTE(review): short pause before the files are moved; the original
    # indentation was lost, so confirm that this belongs after the loop.
    time.sleep(2)

    # Write the results.
    logger.info('write data to file...')
    handle_crash_types(crash_types)
    uniq_types_count = len(crash_types)
    sendmail("type1: {}: {}".format(options.post_fix, uniq_types_count))
if __name__ == "__main__":
    # Command-line entry point: parse the arguments into the module-level
    # ``options`` namespace, then run main().
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--fuzz_program", help="fuzz program", required=True)
    parser.add_argument("-a", "--fuzz_args", help="fuzz program arguments")
    parser.add_argument("-i", "--input_directory", help="input files directory", required=True)
    # type=int rejects non-numeric values with a usage error at parse time
    # instead of a traceback later on.
    parser.add_argument("-t", "--threads", help="threads num", required=True, type=int)
    parser.add_argument("-w", "--wait_time", help="windbg debug wait time", required=True, type=int)
    parser.add_argument("--post_fix", help="output file post fix", required=True)
    parser.add_argument("-s", "--specific_file", help="write crash file to a specific file")
    options = parser.parse_args()
    main()
result 文件就是常规的 debugger 输出数据,可以作为对 uniq 结果的参考,文件特别长。uniq 文件是最需要关注的,可以及时发现新出现的 crash,其中 uniq 文件结果大概是这样的:
data:image/s3,"s3://crabby-images/7d8f1/7d8f12830564306ddaf2595de06345a670d91b50" alt="WeChat3e6175fedf78b4c39b150d54a57e1f6c"
windows输出结果
整个处理流程还在优化中,遇到特殊的crash结果,会及时更新
20200528
添加BugId批量处理
20200623
添加libfuzzer crash处理
20200710
添加选项将crash_file写入指定文件进行debug
20211228
删除 linux 相关支持,目前仅支持 windows,并优化多个崩溃处理逻辑