class Args:
    """Split a call's positional and keyword arguments into named fields."""

    @classmethod
    def parse(cls, fields: list, args: tuple, kwargs: dict) -> tuple:
        """
        Resolve each name in ``fields`` from ``args`` (by position) or ``kwargs`` (by name).

        :param fields: Field names, in positional order.
        :param args: Positional arguments of the original call.
        :param kwargs: Keyword arguments of the original call. Keys that are
            consumed as fields are popped from this dict in place.
        :return: Tuple of the resolved field values followed by the remaining
            ``kwargs`` dict as the last element. A field that is neither given
            positionally nor present in ``kwargs`` (or whose keyword value is
            ``None``) contributes no element.
        """
        results = []
        for i, field in enumerate(fields):
            if i < len(args):
                # Field supplied positionally
                results.append(args[i])
                continue
            value = kwargs.get(field)
            # Compare against None rather than truthiness so that legitimate
            # falsy values (0, "", False, empty containers) are not dropped
            if value is not None:
                results.append(value)
                # Args comes from kwargs, pop the key
                kwargs.pop(field)
        results.append(kwargs)
        return tuple(results)
def BodyImpl(json_str: str, expr: dict) -> Result:
    """
    Parse ``json_str``, apply every ``path -> value`` pair of ``expr`` to the
    parsed object through ``_assign``, and return it wrapped in a ``Result``
    (stored on its ``data`` attribute).
    """
    payload = json.loads(json_str)
    for path, new_value in expr.items():
        _assign(payload, path, new_value)

    outcome = Result()
    outcome.data = payload
    return outcome
def_parse_dict_expr(expr: str) -> list: """ Input: '["store"]["book"][0]["title"]' Output: ['store', 'book', 0, 'title'] """ tokens = re.findall(r'\["(.*?)"\]|\[(\d+)\]', expr) result = [int(index) if index.isdigit() else name for name, index in tokens] return result
def_nested_modify(json_obj: [dict, list], keys: list, value: Any, current_level: int = 0): if current_level == len(keys) - 1: json_obj[keys[current_level]] = value else: current_key = keys[current_level] # Nested string json {"id": 1, "param": "{\"page\": 1}"} if isinstance(json_obj[current_key], str): # str to json current_value = json.loads(json_obj[current_key]) if isinstance(current_value, dict) or isinstance(current_value, list): nested_string_json_obj = current_value _nested_modify(nested_string_json_obj, keys[current_level + 1:], value) # json to str json_obj[current_key] = json.dumps(nested_string_json_obj, ensure_ascii=False) else: _nested_modify(json_obj[current_key], keys, value, current_level + 1)
if __name__ == '__main__':
    # Run configuration handed to Run()
    settings = dict(
        path=["test_demo.py"],  # Path to run, relative path to case
        report=False,  # Output test report or not
        report_type="pytest-html",  # "pytest-html" "allure"
    )
    Run(settings)
def tep_plugins():
    """
    Must be placed at the top, execute first to initialize base dir.

    Stores the absolute directory of the calling file in ``Config.BASE_DIR``
    and returns the keyword paths followed by the fixture paths.
    """
    # Frame 1 is whoever called this function; its file locates the base dir
    caller_file = inspect.stack()[1].filename
    caller_dir = os.path.dirname(caller_file)
    Config.BASE_DIR = os.path.abspath(caller_dir)

    return _keyword_path() + _fixture_path()  # +[other plugins]
class Config:
    """Settings shared across the framework as class attributes."""

    # Class variable, initialized first; empty until it is assigned at runtime
    BASE_DIR = ""

    # Constant
    CREATE_ENV = False

    # Temporary directory holding the allure source files (a pile of JSON
    # files); it is deleted when the HTML reports are generated
    ALLURE_SOURCE_PATH = ".allure.source.temp"
class Plugin:
    """pytest hook implementations that wire in allure report generation."""

    @staticmethod
    def pytest_addoption(parser):
        """
        Allure test report, command line parameters.

        Registers the ``--tep-reports`` flag, which stores ``True`` when given.
        """
        parser.addoption(
            "--tep-reports",
            action="store_const",
            const=True,
            help="Create tep allure HTML reports."
        )

    @staticmethod
    def pytest_configure(config):
        """
        Reference: https://github.com/allure-framework/allure-python/blob/master/allure-pytest/src/plugin.py
        In order to generate an allure source file for generating HTML reports.

        When reports are requested, any stale allure source directory is removed
        and an ``AllureListener`` is registered with both pytest and allure.
        """
        if _tep_reports(config):
            # Start from a clean source directory so old results are not mixed in
            if os.path.exists(Config.ALLURE_SOURCE_PATH):
                shutil.rmtree(Config.ALLURE_SOURCE_PATH)
            test_listener = AllureListener(config)
            config.pluginmanager.register(test_listener)
            allure_commons.plugin_manager.register(test_listener)
            config.add_cleanup(cleanup_factory(test_listener))

    @staticmethod
    def pytest_sessionfinish(session):
        """
        Generate an allure report after the test run ends.

        Copies the ``history`` directory of the most recently modified previous
        report into the allure source directory, so the allure trend chart has
        historical data. Nothing is copied when no previous report with a
        ``history`` directory exists.
        """
        reports_path = os.path.join(Config.BASE_DIR, "reports")
        # Generate reports only at the master node
        if _tep_reports(session.config) and _is_master(session.config):
            # Historical data from the latest report, filling in the allure trend chart
            if os.path.isdir(reports_path):
                # os.listdir returns entries in arbitrary order, and entries may
                # be plain files or reports without history, so keep only
                # existing history directories and pick the newest by mtime
                history_dirs = []
                for report_name in os.listdir(reports_path):
                    history_dir = os.path.join(reports_path, report_name, "history")
                    if os.path.isdir(history_dir):
                        history_dirs.append(history_dir)
                if history_dirs:
                    latest_report_history = max(history_dirs, key=os.path.getmtime)
                    shutil.copytree(latest_report_history, os.path.join(Config.ALLURE_SOURCE_PATH, "history"))
class TepResponse(Response):
    """
    Inherit on requests.Response, adding additional methods
    """

    def __init__(self, response):
        """Build a TepResponse carrying every instance attribute of ``response``."""
        super().__init__()
        # Take over the wrapped response's state wholesale
        self.__dict__.update(response.__dict__)

    def jsonpath(self, expr: str):
        """
        Force the first value here for simple values
        If complex values are taken, it is recommended to use jsonpath native directly
        """
        # NOTE(review): if the jsonpath library returns a non-sequence when
        # nothing matches, the indexing below raises -- confirm with its docs
        matches = jsonpath.jsonpath(self.json(), expr)
        return matches[0]
@app.post("/order")
async def order(req: Request):
    """
    Mock order endpoint: return a fixed order id when the request carries the
    expected cookie and skuId, otherwise an empty string.
    """
    body = await req.json()
    # The cookie is checked first; the body is only inspected when it matches
    if req.headers.get("Cookie") != "de2e3ffu29":
        return ""
    if body["skuId"] != "222":
        return ""
    return {"orderId": "333"}
@app.post("/pay")
async def pay(req: Request):
    """
    Mock pay endpoint: report success when the request carries the expected
    cookie and orderId, otherwise return an empty string.
    """
    body = await req.json()
    # The cookie is checked first; the body is only inspected when it matches
    if req.headers.get("Cookie") != "de2e3ffu29":
        return ""
    if body["orderId"] != "333":
        return ""
    return {"success": "true"}
if __name__ == '__main__':
    # Start the mock service on the local loopback interface
    server_options = {"host": "127.0.0.1", "port": 5000}
    uvicorn.run("mock:app", **server_options)
工具包
Pairwise.py,根据多个条件生成两两组合过滤后的结果集,适用于查询条件组合验证。
import copy import itertools from sys import stdout
from loguru import logger
def parewise(option: list) -> list:
    """
    Automatically generate composite use cases.

    Builds the Cartesian product of the factors in ``option`` and drops every
    combination whose value pairs are all still covered, column by column, by
    the other combinations that remain.

    :param option: List of factors, each an iterable of candidate values.
    :return: The combinations (tuples) kept after pairwise filtering. With
        fewer than two factors there are no pairs to cover, so the full
        Cartesian product is returned unchanged.
    """
    # Cartesian product; built by unpacking instead of eval() on a generated
    # string, which broke on values without an evaluable repr and executed
    # arbitrary text
    cp = list(itertools.product(*option))
    # Split in pairs
    s = [list(itertools.combinations(case, 2)) for case in cp]
    logger.info('Cartesian product:%s' % len(cp))

    if len(option) < 2:
        # No pairs exist, so no combination is redundant; the previous logic
        # treated every row as fully covered and returned nothing
        logger.info('After filtering:%s' % len(cp))
        return cp

    del_row = set()
    print_progress_bar(0)
    s2 = copy.deepcopy(s)
    last = len(s) - 1
    for i, pairs in enumerate(s):  # Match each line of use cases
        if i % 100 == 0 or i == last:
            # A single combination means last == 0; avoid dividing by zero
            print_progress_bar(int(100 * i / last) if last else 100)

        # Judge whether the pairwise splitting of each line of use cases
        # appears in other lines. all() stops at the first column that is not
        # found, so the remaining columns are not searched.
        others = [row for row in s2 if row != pairs]
        covered = all(
            any(pair == other[j] for other in others)  # Find the same column
            for j, pair in enumerate(pairs)
        )
        if covered:
            del_row.add(i)
            s2.remove(pairs)

    res = [case for i, case in enumerate(cp) if i not in del_row]
    logger.info('After filtering:%s' % len(res))
    return res
def print_progress_bar(i):
    """
    Redraw a ten-cell progress bar on the current terminal line.

    ``i`` is the percentage to display; one cell is filled per full ten percent.
    """
    filled = int(i / 10)
    done_cells = '■' * filled
    todo_cells = '□' * (10 - filled)
    # The leading carriage return moves back to the line start so the bar is
    # overwritten in place
    stdout.write('\r %2d%% [%s%s]' % (i, done_cells, todo_cells))
    stdout.flush()
if __name__ == '__main__':
    # Demo: three factors with 3, 3 and 2 candidate values
    factors = [['M', 'O', 'P'], ['W', 'L', 'I'], ['C', 'E']]
    cases = parewise(factors)
    print()  # End the progress bar line before listing the cases
    for case in cases:
        print(case)
# Uninstall the pip-managed tep package, answering the confirmation prompt with "y"
proc = subprocess.Popen(["pip", "uninstall", "tep"], stdin=subprocess.PIPE)
proc.communicate(input="y".encode())

# Remove leftover tep* directories from the (hard-coded) virtualenv site-packages
os.chdir(r"/Users/wanggang888/Desktop/PycharmProjects/tep/venv/lib/python3.8/site-packages")
for dir_name in os.listdir():
    # shutil.rmtree only accepts real directories: a regular file or a symlink
    # whose name starts with "tep" would raise and abort the cleanup midway
    if dir_name.startswith("tep") and os.path.isdir(dir_name) and not os.path.islink(dir_name):
        shutil.rmtree(dir_name)