bluetoothlover_doc
bluetoothlover_doc copied to clipboard
cd qemu-vexpress-a9
def check_env(self):
# 检查QEMU环境是否可用
def exec_cmd(cmd):
# 执行命令并返回输出文本
text = ''
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
out, err = p.communicate()
for line in out.splitlines():
text += str(line, encoding="utf8") + '\r\n'
return text
# 获取QEMU版本信息,检查其是否包含"version"
result = exec_cmd('qemu-system-{} --version'.format(self.system))
check_ok = (result.find('version') != -1)
if check_ok:
self.env_version = result
return check_ok
def run(self):
# 运行QEMU
if self.check_env():
logger.info("QEMU environment check OK.") # 环境检查通过
logger.debug(self.env_version)
else:
logger.error("QEMU environment check FAILED.") # 环境检查失败
return
# 构建运行QEMU的命令
cmd = 'qemu-system-{} -nographic -M {} -kernel {}'.format(self.system, self.machine, self.elf_path)
if self.sd_path != "None":
cmd = cmd + ' -sd ' + self.sd_path
# 根据平台设置不同的创建子进程方法
if platform.system() == "Windows":
self.sub_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
self.sub_proc = subprocess.Popen("exec " + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
shell=True)
event = threading.Event() # 创建事件对象,用于线程间通信
def run(self):
# 运行QEMU
if self.check_env():
logger.info("QEMU environment check OK.") # 环境检查通过
logger.debug(self.env_version)
else:
logger.error("QEMU environment check FAILED.") # 环境检查失败
return
# 构建运行QEMU的命令
cmd = 'qemu-system-{} -nographic -M {} -kernel {}'.format(self.system, self.machine, self.elf_path)
if self.sd_path != "None":
cmd = cmd + ' -sd ' + self.sd_path
# 根据平台设置不同的创建子进程方法
if platform.system() == "Windows":
self.sub_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
self.sub_proc = subprocess.Popen("exec " + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
shell=True)
event = threading.Event() # 创建事件对象,用于线程间通信
bsp_board_info:
arch: arm
toolchain: arm-none-eabi-gcc
pre_build: |
scons -c
qemu-system-arm --version
build_cmd: |
scons -j8
post_build: |
scons --version
qemu-system-arm --version
run_cmd: qemu-system-arm -M vexpress-a9 -smp cpus=2 -kernel rtthread.bin -nographic -sd sd.bin
qemu_flag: true
pkg.tools.coremark:
kconfig:
- CONFIG_PKG_USING_COREMARK=y
- CONFIG_COREMARK_ITERATIONS=36000
ci_build_run_flag : true
buildcheckresult: "core_main" #检查编译的log中是否有匹配字
msh_cmd: |
ps
version
core_mark
msh_cmd_timeout: 60
checkresult: 'CoreMark 1.0' #检查执行过程中的log是否有匹配字
pkg.tools.coremark:
pre_build: |
scons -c
qemu-system-arm --version
kconfig:
- CONFIG_PKG_USING_COREMARK=y
- CONFIG_COREMARK_ITERATIONS=36000
ci_build_run_flag : true
buildcheckresult: "core_main" #检查编译的log中是否有匹配字
msh_cmd: |
ps
version
core_mark
msh_cmd_timeout: 60
checkresult: 'CoreMark 1.0' #检查执行过程中的log是否有匹配字
post_build: |
scons --version
qemu-system-arm --version
build_cmd: |
scons -j8
board_info中的prebuild,post_build可以替换
import subprocess
import threading
import time
import logging
import sys
import os
import shutil
import re
import multiprocessing
import yaml
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
class QemuManager:
def __init__(self, qemu_cmd, idle_timeout=5, checkresult=None):
"""
初始化QEMU管理器
:param qemu_cmd: QEMU启动命令
:param idle_timeout: 日志空闲超时时间(秒)
"""
self.qemu_cmd = qemu_cmd
self.idle_timeout = idle_timeout
self.qemu_process = None
self.log_thread = None
self.checkresult = checkresult
self.last_log_time = time.time()
self.logs = []
self.running = False
self.checkresult_found = False # 标记是否找到checkresult
def start_qemu(self):
"""启动QEMU进程"""
logger.info("Starting QEMU...")
self.qemu_process = subprocess.Popen(
self.qemu_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
bufsize=0,
text=True
)
self.running = True
logger.info("QEMU started successfully.")
def log_monitor(self):
"""监控QEMU输出日志"""
logger.info("Starting log monitor...")
while self.running:
line = self.qemu_process.stdout.readline()
if line:
line = line.strip()
self.logs.append(line)
self.last_log_time = time.time() # 更新最后日志时间
logger.info(f"QEMU Output: {line}") # 实时打印日志
# 检查是否包含checkresult
if self.checkresult and self.checkresult in line:
logger.info(f"Checkresult '{self.checkresult}' found in logs. Success...")
self.checkresult_found = True
#self.running = False # 停止监控
#break
else:
time.sleep(0.1)
def send_command(self, command):
"""向QEMU发送命令"""
if not self.running or not self.qemu_process:
logger.error("QEMU is not running.")
return False
logger.info(f"Sending command: {command}")
try:
self.qemu_process.stdin.write(command + "\n")
self.qemu_process.stdin.flush()
return True
except Exception as e:
logger.error(f"Failed to send command: {e}")
return False
def stop_qemu(self):
"""停止QEMU进程"""
if self.qemu_process:
logger.info("Stopping QEMU...")
self.running = False
self.qemu_process.terminate()
self.qemu_process.wait(timeout=5)
logger.info("QEMU stopped.")
def run(self,commands):
"""主运行逻辑"""
try:
# 启动QEMU
self.start_qemu()
# 启动日志监控线程
self.log_thread = threading.Thread(target=self.log_monitor, daemon=True)
self.log_thread.start()
# 等待QEMU启动完成
time.sleep(5)
for cmd in commands:
if not self.send_command(cmd):
break
time.sleep(2) # 命令之间间隔2秒
# 监控日志输出,超时退出
while self.running:
idle_time = time.time() - self.last_log_time
if idle_time > self.idle_timeout :
if not self.checkresult_found:
logger.info(f"No logs for {self.idle_timeout} seconds. ::error:: Exiting...")
else:
logger.info(f"No logs for {self.idle_timeout} seconds. ::check success:: Exiting...")
break
time.sleep(0.1)
except KeyboardInterrupt:
logger.info("Script interrupted by user.")
except Exception as e:
logger.error(f"An error occurred: {e}")
finally:
self.stop_qemu()
def run_command(command, timeout=None):
"""运行命令并返回输出和结果"""
print(command)
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
return result.stdout, result.returncode
def check_output(output, check_string):
"""检查输出中是否包含指定字符串"""
flag = check_string in output
if flag == True:
print('find string ' + check_string)
else:
print('::error:: can not find string ' + check_string + output)
return flag
if __name__ == "__main__":
# QEMU启动命令
pre_build_commands = [
"scons -c"
]
post_build_command = "echo end"
build_command = "scons"
build_check_result = "core_main"
check_result = "CoreMark 1.0"
qemu_timeout_second = 50
qemu_command = (
"qemu-system-arm -M vexpress-a9 -smp cpus=2 -kernel rtthread.bin -nographic -sd sd.bin"
)
commands = ["version", "ps", "core_mark"]
# 创建QEMU管理器并运行
yml_files_content = []
bsp_root = os.getcwd()
print(bsp_root)
directory = os.path.join(bsp_root , '.ci/attachconfig')
if os.path.exists(directory):
for root, dirs, files in os.walk(directory):
for filename in files:
print(filename)
if filename.endswith('attachconfig.yml'):
file_path = os.path.join(root, filename)
print(file_path)
if os.path.exists(file_path):
try:
with open(file_path, 'r' ,encoding='utf-8') as file:
print('===========')
content = yaml.safe_load(file)
if content is None:
continue
yml_files_content.append(content)
except yaml.YAMLError as e:
print(f"::error::Error parsing YAML file: {e}")
continue
except Exception as e:
print(f"::error::Error reading file: {e}")
continue
for projects in yml_files_content:
for name, details in projects.items():
if(name == 'bsp_board_info'):
print(details)
pre_build_commands = details.get("pre_build").splitlines()
build_command = details.get("build_cmd").splitlines()
post_build_command = details.get("post_build").splitlines()
qemu_command = details.get("run_cmd")
if details.get("kconfig") is not None:
if details.get("buildcheckresult") is not None:
build_check_result = details.get("buildcheckresult")
if details.get("msh_cmd") is not None:
commands = details.get("msh_cmd").splitlines()
if details.get("checkresult") is not None:
check_result = details.get("checkresult")
print(commands)
print(check_result)
print(build_check_result)
print(qemu_command)
print(pre_build_commands)
#exit(1)
# 执行 pre_build 命令
for command in pre_build_commands:
output, returncode = run_command(command)
print(output)
if returncode != 0:
print(f"Pre-build command failed: {command}")
print(output)
# return
for command in build_command:
output, returncode = run_command(command)
print(output)
if returncode != 0 or not check_output(output, build_check_result):
print("Build failed or build check result not found")
print(output)
#return
print('==========')
for command in post_build_command:
output, returncode = run_command(command)
print(output)
if returncode != 0:
print(f"Post-build command failed: {post_build_command}")
print(output)
#return
print(qemu_command)
#exit(1)
qemu_manager = QemuManager(qemu_cmd=qemu_command, idle_timeout=qemu_timeout_second,checkresult=check_result)
qemu_manager.run(commands)