分享

AutoLine源码之RobotFramework运行器

 开源优测 2021-12-09

什么是AutoLine开源平台

AutoLine开源平台是一个开源自动化测试解决方案,基于RobotFramework进行二次开发,支持RobotFramework几乎所有的库。

源码地址

github地址: https://github.com/small99/AutoLine 

码 云 地 址:https:///lym51/AutoLine 

运行器源码路径及源码结构

在AutoLine中我们自定义实现了RobotFramework的运行器,其路径如下图所示:

源码结构如下图所示:

说明:

  1. 一些已经实现的运行器,用于调试测试用

  2. 运行器分为自动化运行器、调试运行器、手工运行器三种模式

下面我们对源码进行注释

__author__ = "苦叶子"

"""

公众号: 开源优测

Email: lymking@foxmail.com

"""

import os
import platform
import codecs
import time
import json
import subprocess
from datetime import datetime
from threading import Thread, Timer
import xml.etree.ElementTree as ET
from flask import current_app
from flask_login import current_user
from sqlalchemy import and_
from ..models import AutoTask, AutoProject, User
from .. import db
from ..auto.builder import Builder
from .process import Process

# 同步运行器,即阻塞模式,一次只能运行一个RF进程
def robot_run(category, id):
   app = current_app._get_current_object()
   if len(app.config["RESULTS"]) > int(app.config['AUTO_PROCESS_COUNT']):
       return json.dumps({"status": "busying", "msg": "任务池已满!!!"})

   builder = Builder(category, id)
   builder.build()
   app.config["RESULTS"].append(app.config["POOL"].apply_async(builder.test_run, (app, current_user.get_id(),)))

   # app.config["POOL"].join()

   
return json.dumps({"status": "success", "msg": "任务启动成功"})

# 异步运行器,采用多线程方式,可以启动多个RF进程
def robot_async_run(category, id):
   app = current_app._get_current_object()

   builder = Builder(category, id)
   builder.build()

   thr = Thread(target=builder.test_run, args=[app, current_user.get_id()])
   #app.config["RESULTS"].append(thr)

   
thr.start()

   return json.dumps({"status": "success", "msg": "任务启动成功"})

# 用于检查RF运行进程的状态
def check_process_status(app):
   print("timer to check ....%d" % len(app.config["RUNNERS"]))
   with app.app_context():

       try:
           for runner in app.config["RUNNERS"]:
               if runner.is_finish():
                   runner.write_result()
                   app.config["RUNNERS"].remove(runner)
       except Exception as e:
           print(e)

# debug模式运行器
def debug_run(id):
   builder = Builder(id)
   builder.build()
   runner = Runner(builder.id, builder.build_no)

   runner.debug()

   return (builder.id, builder.build_no)

# RF运行器
def run_process(category, id):
   builder = Builder(id)
   builder.build()

   if builder.has_test_case():

       runner = Runner(builder.id, builder.build_no)
       if category == "auto":
           runner.auto_run()
       else:
           runner.run()

       app = current_app._get_current_object()

       app.config["TRIGGER"].update_job(id)

       app.config["RUNNERS"].append({
           "project_id": builder.id,
           
"task_id": builder.build_no,
           
"runner": runner
       })

       return json.dumps({"status": "success", "msg": "任务启动成功"})
   else:
       return json.dumps({"status": "fail", "msg": "项目中没有创建关键字步骤,任务启动失败,请新增关键字步骤!!!"})

# 执行器类
class Runner:
   def __init__(self, project_id, build_no):
       self.project_id = project_id
       self.build_no = build_no
       self._process = None
       
self._timer = None
       
self._out_fd = None

   # 用于调度器自动执行
   def
auto_run(self):
       try:
           user_id = User.query.filter_by(username="AutoExecutor").first().id
           name = AutoProject.query.filter_by(id=self.project_id).first().name
           task = AutoTask(project_id=self.project_id,
                           
build_no=self.build_no,
                           
status="running",
                           
create_author_id=user_id,
                           
create_timestamp=datetime.now())
           db.session.add(task)
           db.session.commit()

           output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
           output_dir = output_dir.replace("\\", "/")

           # -x result/output.xml -l result/log.html -r result/report.html
           
shell = False
           if
"Windows" in platform.platform():
               self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "cp936")
               command = "pybot -d %s -L DEBUG -N %s %s/testcase.robot" % (output_dir, name, output_dir)
               shell = True
           else
:
               self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "utf-8")
               command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG", "-N", "%s" % name, "%s/testcase.robot" % output_dir]
               #print(command)

           
self._process = subprocess.Popen(command, shell=shell, stdout=self._out_fd, stderr=subprocess.STDOUT)
           #self._process = Process(command)

           #self._process.start()

       
except Exception as e:
           print(str(e))
           pass

       return
{"status": "success",
               
"msg": "任务启动成功",
               
"project_id": self.project_id,
               
"build_no": self.build_no}

   # 用于手工在页面触发执行
   def run(self):
       #
       
try:
           name = AutoProject.query.filter_by(id=self.project_id).first().name
           task = AutoTask(project_id=self.project_id,
                           
build_no=self.build_no,
                           
status="running",
                           
create_author_id=current_user.get_id(),
                           
create_timestamp=datetime.now())
           db.session.add(task)
           db.session.commit()

           output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
           output_dir = output_dir.replace("\\", "/")

           shell = False
           if
"Windows" in platform.platform():
               self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "cp936")
               command = "pybot -d %s -L DEBUG -N %s %s/testcase.robot" % (output_dir, name, output_dir)
               shell = True
           else
:
               self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "utf-8")
               command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG", "-N", "%s" % name,
                         
"%s/testcase.robot" % output_dir]
               # print(command)

           
self._process = subprocess.Popen(command, shell=shell, stdout=self._out_fd, stderr=subprocess.STDOUT)

       except Exception as e:
           print(str(e))
           pass

       return
{"status": "success",
               
"msg": "任务启动成功",
               
"project_id": self.project_id,
               
"build_no": self.build_no}

   # 调试模式,用于检查是否符合RF语法
   def debug(self):
       try:
           output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
           output_dir = output_dir.replace("\\", "/")
           # -x result/output.xml -l result/log.html -r result/report.html
           
command = ["pybot", "-d", "%s" % output_dir, "--dryrun", "-N", "调试输出", "%s/testcase.robot" % output_dir]
           self._out_fd = open(output_dir + "/debug.log", "a+")
           self._process = subprocess.Popen(command, shell=False, stdout=self._out_fd, stderr=subprocess.STDOUT)
           while True:
               if self._process.poll() == 0:  # 判断子进程是否结束
                   
break
               else
:
                   time.sleep(0.2)

       except Exception as e:
           print(str(e))
           pass

       return
{"status": "success",
               
"msg": "任务启动成功",
               
"project_id": self.project_id,
               
"build_no": self.build_no}
   # 停止任务
   def stop(self):
       status = "success"
       
msg = "任务终止"
       
try:
           self._process.stop()
           msg += "成功"
       
except Exception as e:
           status = "fail"
           
msg = msg + "异常" + str(e)

       return {"status": status,
               
"msg": msg,
               
"project_id": self.project_id,
               
"build_no": self.build_no}

   def get_output(self, wait_until_finished=False):
       return self._process.get_output(wait_until_finished)

   def is_finish(self):
       return self._process.is_finished()

   def write_result(self):
       output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
       output_dir = output_dir.replace("\\", "/")
       print("write ... result ...")
       print(os.path.exists(output_dir + "/log.html"))
       if os.path.exists(output_dir + "/log.html"):
           time.sleep(0.2)
           task = AutoTask.query.filter(and_(AutoTask.project_id == self.project_id,
                                             
AutoTask.build_no == self.build_no)).first()
           tree = ET.parse(output_dir + "/output.xml")
           root = tree.getroot()
           passed = root.find("./statistics/suite/stat").attrib["pass"]
           fail = root.find("./statistics/suite/stat").attrib["fail"]
           if int(fail) != 0:
               task.status = 'fail'
           
else:
               task.status = 'pass'
           
db.session.merge(task)
           db.session.commit()

           self._timer.canel()

说明:

  1. 在运行器中,关键的是一个Builder类,该类实现了从数据库读取数据,并序列号为RF语法的文件

  2. Runner执行器根据类型(web、app、http)调用Builder加载不同的RobotFramework支持库和通用的库,实现对RobotFramework的完整的支持

  3. 大家主要看Runner类,这里不对代码一一解释,因为代码本身没什么难度,关键在于细节的看上几遍就懂了的

AutoLine开源平台简明教程

AutoLine开源平台安装部署教程

AutoLine开源平台常见问题解答

AutoLine开源平台源码组织结构

AutoLine源码分析之开始篇

AutoLine源码分析之入口源码

AutoLine源码分析之配置管理

AutoLine源码分析之数据库模型

AutoLine源码分析之Flask初始化模块

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多