bluetoothlover_doc icon indicating copy to clipboard operation
bluetoothlover_doc copied to clipboard

cd qemu-vexpress-a9

Open supperthomas opened this issue 11 months ago • 5 comments

supperthomas avatar Feb 01 '25 12:02 supperthomas

    def check_env(self):
        # 检查QEMU环境是否可用
        def exec_cmd(cmd):
            # 执行命令并返回输出文本
            text = ''
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
            out, err = p.communicate()
            for line in out.splitlines():
                text += str(line, encoding="utf8") + '\r\n'
            return text

        # 获取QEMU版本信息,检查其是否包含"version"
        result = exec_cmd('qemu-system-{} --version'.format(self.system))
        check_ok = (result.find('version') != -1)
        if check_ok:
            self.env_version = result
        return check_ok

supperthomas avatar Feb 01 '25 12:02 supperthomas

    def run(self):
        # 运行QEMU
        if self.check_env():
            logger.info("QEMU environment check OK.")  # 环境检查通过
            logger.debug(self.env_version)
        else:
            logger.error("QEMU environment check FAILED.")  # 环境检查失败
            return

        # 构建运行QEMU的命令
        cmd = 'qemu-system-{} -nographic -M {} -kernel {}'.format(self.system, self.machine, self.elf_path)
        
        if self.sd_path != "None":
            cmd = cmd + ' -sd ' + self.sd_path

        # 根据平台设置不同的创建子进程方法
        if platform.system() == "Windows":
            self.sub_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            self.sub_proc = subprocess.Popen("exec " + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             shell=True)
        event = threading.Event()  # 创建事件对象,用于线程间通信

supperthomas avatar Feb 01 '25 12:02 supperthomas

    def run(self):
        # 运行QEMU
        if self.check_env():
            logger.info("QEMU environment check OK.")  # 环境检查通过
            logger.debug(self.env_version)
        else:
            logger.error("QEMU environment check FAILED.")  # 环境检查失败
            return

        # 构建运行QEMU的命令
        cmd = 'qemu-system-{} -nographic -M {} -kernel {}'.format(self.system, self.machine, self.elf_path)
        
        if self.sd_path != "None":
            cmd = cmd + ' -sd ' + self.sd_path

        # 根据平台设置不同的创建子进程方法
        if platform.system() == "Windows":
            self.sub_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            self.sub_proc = subprocess.Popen("exec " + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             shell=True)
        event = threading.Event()  # 创建事件对象,用于线程间通信

supperthomas avatar Feb 01 '25 12:02 supperthomas

bsp_board_info:
  arch: arm
  toolchain: arm-none-eabi-gcc
  pre_build: |
    scons -c
    qemu-system-arm --version
  build_cmd: |
    scons -j8
  post_build: |
    scons --version
    qemu-system-arm --version
  run_cmd: qemu-system-arm -M vexpress-a9 -smp cpus=2 -kernel rtthread.bin -nographic -sd sd.bin
  qemu_flag: true
pkg.tools.coremark:
  kconfig:
    - CONFIG_PKG_USING_COREMARK=y
    - CONFIG_COREMARK_ITERATIONS=36000
  ci_build_run_flag : true
  buildcheckresult: "core_main"              #检查编译的log中是否有匹配字
  msh_cmd: |
    ps
    version
    core_mark
  msh_cmd_timeout: 60
  checkresult: 'CoreMark 1.0' #检查执行过程中的log是否有匹配字
pkg.tools.coremark:
  pre_build: |
    scons -c
    qemu-system-arm --version
  kconfig:
    - CONFIG_PKG_USING_COREMARK=y
    - CONFIG_COREMARK_ITERATIONS=36000
  ci_build_run_flag : true
  buildcheckresult: "core_main"              #检查编译的log中是否有匹配字
  msh_cmd: |
    ps
    version
    core_mark
  msh_cmd_timeout: 60
  checkresult: 'CoreMark 1.0' #检查执行过程中的log是否有匹配字
  post_build: |
    scons --version
    qemu-system-arm --version
  build_cmd: |
    scons -j8

board_info中的prebuild,post_build可以替换

supperthomas avatar Feb 01 '25 13:02 supperthomas

import subprocess
import threading
import time
import logging
import sys
import os
import shutil
import re
import multiprocessing
import yaml


# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

class QemuManager:
    def __init__(self, qemu_cmd, idle_timeout=5, checkresult=None):
        """
        初始化QEMU管理器
        :param qemu_cmd: QEMU启动命令
        :param idle_timeout: 日志空闲超时时间(秒)
        """
        self.qemu_cmd = qemu_cmd
        self.idle_timeout = idle_timeout
        self.qemu_process = None
        self.log_thread = None
        self.checkresult = checkresult
        self.last_log_time = time.time()
        self.logs = []
        self.running = False
        self.checkresult_found = False  # 标记是否找到checkresult
    def start_qemu(self):
        """启动QEMU进程"""
        logger.info("Starting QEMU...")
        self.qemu_process = subprocess.Popen(
            self.qemu_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            bufsize=0,
            text=True
        )
        self.running = True
        logger.info("QEMU started successfully.")

    def log_monitor(self):
        """监控QEMU输出日志"""
        logger.info("Starting log monitor...")
        while self.running:
            line = self.qemu_process.stdout.readline()
            if line:
                line = line.strip()
                self.logs.append(line)
                self.last_log_time = time.time()  # 更新最后日志时间
                logger.info(f"QEMU Output: {line}")  # 实时打印日志
            # 检查是否包含checkresult
            if self.checkresult and self.checkresult in line:
                logger.info(f"Checkresult '{self.checkresult}' found in logs. Success...")
                self.checkresult_found = True
                #self.running = False  # 停止监控
                #break
            else:
                time.sleep(0.1)

    def send_command(self, command):
        """向QEMU发送命令"""
        if not self.running or not self.qemu_process:
            logger.error("QEMU is not running.")
            return False

        logger.info(f"Sending command: {command}")
        try:
            self.qemu_process.stdin.write(command + "\n")
            self.qemu_process.stdin.flush()
            return True
        except Exception as e:
            logger.error(f"Failed to send command: {e}")
            return False

    def stop_qemu(self):
        """停止QEMU进程"""
        if self.qemu_process:
            logger.info("Stopping QEMU...")
            self.running = False
            self.qemu_process.terminate()
            self.qemu_process.wait(timeout=5)
            logger.info("QEMU stopped.")



    def run(self,commands):
        """主运行逻辑"""
        try:
            # 启动QEMU
            self.start_qemu()

            # 启动日志监控线程
            self.log_thread = threading.Thread(target=self.log_monitor, daemon=True)
            self.log_thread.start()

            # 等待QEMU启动完成
            time.sleep(5)

            for cmd in commands:
                if not self.send_command(cmd):
                    break
                time.sleep(2)  # 命令之间间隔2秒

            # 监控日志输出,超时退出
            while self.running:
                idle_time = time.time() - self.last_log_time
                if idle_time > self.idle_timeout :
                    if not self.checkresult_found:
                        logger.info(f"No logs for {self.idle_timeout} seconds. ::error:: Exiting...")
                    else:
                        logger.info(f"No logs for {self.idle_timeout} seconds. ::check success:: Exiting...")
                    break
                time.sleep(0.1)

        except KeyboardInterrupt:
            logger.info("Script interrupted by user.")
        except Exception as e:
            logger.error(f"An error occurred: {e}")
        finally:
            self.stop_qemu()
def run_command(command, timeout=None):
    """运行命令并返回输出和结果"""
    print(command)
    result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
    return result.stdout, result.returncode

def check_output(output, check_string):
    """检查输出中是否包含指定字符串"""
    flag = check_string in output
    if flag == True:
        print('find string ' + check_string)
    else:
        print('::error:: can not find string ' + check_string + output)
    return flag

if __name__ == "__main__":
    # QEMU启动命令
    pre_build_commands = [
        "scons -c"
    ]
    post_build_command = "echo end"
    build_command = "scons"
    build_check_result = "core_main"
    check_result = "CoreMark 1.0"
    qemu_timeout_second = 50
    qemu_command = (
        "qemu-system-arm -M vexpress-a9 -smp cpus=2 -kernel rtthread.bin -nographic -sd sd.bin"
    )
    commands = ["version", "ps", "core_mark"]
    # 创建QEMU管理器并运行
    yml_files_content = []
    bsp_root = os.getcwd()
    print(bsp_root)
    directory = os.path.join(bsp_root , '.ci/attachconfig')
    if os.path.exists(directory):
        for root, dirs, files in os.walk(directory):
            for filename in files:
                print(filename)
                if filename.endswith('attachconfig.yml'):
                    file_path = os.path.join(root, filename)
                    print(file_path)
                    if os.path.exists(file_path):
                        try:
                            with open(file_path, 'r' ,encoding='utf-8') as file:
                                print('===========')
                                content = yaml.safe_load(file)
                                if content is None:
                                    continue
                                yml_files_content.append(content)
                        except yaml.YAMLError as e:
                            print(f"::error::Error parsing YAML file: {e}")
                            continue
                        except Exception as e:
                            print(f"::error::Error reading file: {e}")
                            continue

    for projects in yml_files_content:
            for name, details in projects.items():
                if(name == 'bsp_board_info'):
                    print(details)
                    pre_build_commands = details.get("pre_build").splitlines()
                    build_command = details.get("build_cmd").splitlines()
                    post_build_command = details.get("post_build").splitlines()
                    qemu_command = details.get("run_cmd")
                if details.get("kconfig") is not None:
                    if details.get("buildcheckresult")  is not None:
                        build_check_result = details.get("buildcheckresult")
                    if details.get("msh_cmd") is not None:
                        commands = details.get("msh_cmd").splitlines()
                    if details.get("checkresult") is not None:
                        check_result = details.get("checkresult")

    print(commands)
    print(check_result)
    print(build_check_result)
    print(qemu_command)
    print(pre_build_commands)
    #exit(1)

    # 执行 pre_build 命令
    for command in pre_build_commands:
        output, returncode = run_command(command)
        print(output)
        if returncode != 0:
            print(f"Pre-build command failed: {command}")
            print(output)
           # return
    for command in build_command:
        output, returncode = run_command(command)
        print(output)
        if returncode != 0 or not check_output(output, build_check_result):
                print("Build failed or build check result not found")
                print(output)
                #return
    print('==========')
    for command in post_build_command:
        output, returncode = run_command(command)
        print(output)
        if returncode != 0:
            print(f"Post-build command failed: {post_build_command}")
            print(output)
            #return

    print(qemu_command)
    #exit(1)
    qemu_manager = QemuManager(qemu_cmd=qemu_command, idle_timeout=qemu_timeout_second,checkresult=check_result)
    qemu_manager.run(commands)

supperthomas avatar Feb 01 '25 13:02 supperthomas