Commit 10b4056d by 15629057652

第一次尝试提交

parents
Showing with 4437 additions and 0 deletions
File added
##ignore this file##
/venv/
/target/
/.idea/
/.settings/
/.vscode/
/bin/
/report/
!index.pyc
*.pyc
.classpath
.project
.settings
.idea
##filter databfile、sln file##
*.mdb
*.ldb
*.sln
##class file##
*.com
*.class
*.dll
*.exe
*.q
*.o
*.so
# compression file
*.7z
*.dmg
*.gz
*.iso
*.jar
*.rar
*.tar
*.zip
*.via
*.tmp
*.err
*.log
*.iml
# OS generated files #
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
Icon?
ehthumbs.db
Thumbs.db
.factorypath
/.mvn/
/mvnw.cmd
/mvnw
\ No newline at end of file
This diff is collapsed. Click to expand it.
project_name: xxx项目名称
env: 测试环境
# 测试人员名称,作用于自动生成代码的作者,以及发送企业微信、钉钉通知的测试负责人
tester_name: lyd
# 域名1
host: https://www.wanandroid.com
# 域名2,支持多个域名配置
app_host:
# 实时更新用例内容,False时,已生成的代码不会在做变更
# 设置为True的时候,修改yaml文件的用例,代码中的内容会实时更新
real_time_update_test_cases: False
# 报告通知类型:0: 不发送通知 1:钉钉 2:企业微信通知 3、邮箱通知 4、飞书通知
# 支持同时发送多个通知,如多个,则用逗号分割, 如 1, 2
notification_type: 0
# 收集失败的用例开关,整理成excel报告的形式,自动发送,目前只支持返送企业微信通知
excel_report: False
# 钉钉相关配置
ding_talk:
webhook: https://oapi.dingtalk.com/robot/send?access_token=5052516dc545509a2674d67bf0092f4a803b151ce165748013d3f30388cf3e81
secret: SECc9c961257fbab55744f3163f6de14676699792ebe54e6ce96696517d1a847625
# webhook:
# secret:
# 数据库相关配置
mysql_db:
# 数据库开关
switch: False
host:
user: root
password: '123456'
port: 3306
# 镜像源
mirror_source: http://mirrors.aliyun.com/pypi/simple/
# 企业通知的相关配置
wechat:
webhook:
### 邮箱必填,需要全部都配置好,程序运行失败时,会发送邮件通知!!!!
### 邮箱必填,需要全部都配置好,程序运行失败时,会发送邮件通知!!!!
### 邮箱必填,需要全部都配置好,程序运行失败时,会发送邮件通知!!!!
### 重要的事情说三遍
email:
send_user:
email_host:
# 自己到QQ邮箱中配置stamp_key
stamp_key:
# 收件人改成自己的邮箱
send_list:
# 飞书通知
lark:
webhook:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from typing import Text
def root_path():
""" 获取 根路径 """
path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
return path
def ensure_path_sep(path: Text) -> Text:
"""兼容 windows 和 linux 不同环境的操作系统路径 """
if "/" in path:
path = os.sep.join(path.split("/"))
if "\\" in path:
path = os.sep.join(path.split("\\"))
return root_path() + path
# 公共参数
case_common:
allureEpic: 开发平台接口
allureFeature: 收藏模块
allureStory: 收藏网址接口
collect_addtool_01:
host: ${{host()}}
url: /lg/collect/addtool/json
method: POST
detail: 新增收藏网址接口
headers:
# 这里cookie的值,写的是存入缓存的名称
cookie: $cache{login_cookie}
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
name: 自动化
link: https://gitee.com/yu_xiao_qi/pytest-api
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
message: "errorCode 断言为 0"
current_request_set_cache:
- type: response
jsonpath: $.data.id
# 自定义的缓存名称
name: yushaoqi_sql
sql:
teardown:
- case_id: collect_delete_tool_01
send_request:
- dependent_type: response
jsonpath: $.data.id
replace_key: $.data.id
teardown_sql:
- UPDATE `api_test`.`ysq_test` SET `name` = '$json($.data.id)$' WHERE `name` = '2' LIMIT 1
collect_addtool_02:
host: ${{host()}}
url: /lg/collect/addtool/json
method: POST
detail: 未登录状态下新增收藏网址
headers:
Content-Type: multipart/form-data;
# 这里cookie的值,写的是存入缓存的名称
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
name: 自动生成收藏网址${{random_int()}}
link: https://gitee.com/yu_xiao_qi/pytest-api
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
status_code: 200
# 断言接口状态码
errorCode:
# 断言接口状态码
jsonpath: $.errorCode
type: ==
value: -1001
AssertType:
errorMsg:
jsonpath: $.errorMsg
type: ==
value: '请先登录!'
AssertType:
sql:
# 公共参数
case_common:
allureEpic: 开发平台接口
allureFeature: 收藏模块
allureStory: 删除收藏网站接口
collect_delete_tool_01:
host: ${{host()}}
url: /lg/collect/deletetool/json
method: POST
detail: 正常删除收藏网站
headers:
Content-Type: multipart/form-data;
# 这里cookie的值,写的是存入缓存的名称
cookie: $cache{login_cookie}
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
id: $cache{collect_delete_tool_01_id}
id2: 2
dependence_case: True
# 依赖的数据
dependence_case_data:
- case_id: collect_addtool_01
dependent_data:
- dependent_type: response
jsonpath: $.data.id
set_cache: collect_delete_tool_01_id
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
sql:
collect_delete_tool_02:
host: ${{host()}}
url: /lg/collect/deletetool/json
method: POST
detail: 正常删除不存在的ID数据(接口未完成此功能,跳过该条用例)
headers:
Content-Type: multipart/form-data;
# 这里cookie的值,写的是存入缓存的名称
cookie: $cache{login_cookie}
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run: False
data:
id: 111
# 是否有依赖业务,为空或者false则表示没有
dependence_case: True
# 依赖的数据
dependence_case_data:
- case_id: collect_addtool_01
dependent_data:
- dependent_type: response
jsonpath: $.data.id
replace_key: $.data.id
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
sql:
# 公共参数
case_common:
allureEpic: 开发平台接口
allureFeature: 收藏模块
allureStory: 收藏网址列表接口
collect_tool_list_01:
host: ${{host()}}
url: /lg/collect/usertools/json
method: GET
detail: 查看收藏网址列表接口
headers:
Content-Type: multipart/form-data;
# 这里cookie的值,写的是存入缓存的名称
cookie: $cache{login_cookie}
# 请求的数据,是 params 还是 json、或者file、data
requestType: None
# 是否执行,空或者 true 都会执行
is_run:
data:
# 是否有依赖业务,为空或者false则表示没有
dependence_case: True
# 依赖的数据
dependence_case_data:
- case_id: self
dependent_data:
- dependent_type: sqlData
jsonpath: $.business_type
set_cache: yushaoqi
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
status_code: 200
sql:
setup_sql:
- SELECT * FROM `api_test`.`t_open_field_cfg_copy1` LIMIT 0,1;
sleep: 2
# 公共参数
case_common:
allureEpic: 开发平台接口
allureFeature: 收藏模块
allureStory: 编辑收藏网址接口
collect_update_tool_01:
host: ${{host()}}
url: /lg/collect/addtool/json
method: POST
detail: 编辑收藏网址
headers:
Content-Type: multipart/form-data;
# 这里cookie的值,写的是存入缓存的名称
cookie: $cache{login_cookie}
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run: False
data:
name: 自动化编辑网址名称
link: https://gitee.com/yu_xiao_qi/pytest-api
id:
# 是否有依赖业务,为空或者false则表示没有
dependence_case: True
# 依赖的数据
dependence_case_data:
- case_id: collect_addtool_01
dependent_data:
- dependent_type: response
jsonpath: $.data.id
replace_key: $.data.id
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
sql:
teardown:
# 先搜索
- case_id: collect_tool_list_01
param_prepare:
- dependent_type: self_response
jsonpath: $.data[-1:].id
set_cache: $set_cache{artile_id}
# 删除
- case_id: collect_delete_tool_01
send_request:
# 删除从缓存中拿数据
- dependent_type: cache
cache_data: int:artile_id
replace_key: $.data.id
# 公共参数
case_common:
allureEpic: 开发平台接口
allureFeature: 登录模块
allureStory: 登录
login_01:
host: ${{host()}}
url: /user/login
method: POST
detail: 正常登录
headers:
# Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: '18800000001'
password: '123456'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
# 断言接口返回的username
username:
jsonpath: $.data.username
type: ==
value: '18800000001'
AssertType:
sql:
login_02:
host: ${{host()}}
url: /user/login
method: POST
detail: 输入错误的密码
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: '18800000001'
password: '12345'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
login_03:
host: ${{host()}}
url: /user/login
method: POST
detail: 登录密码为空
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: '18800000001'
password:
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
login_04:
host: ${{host()}}
url: /user/login
method: POST
detail: 输入非1开头的手机号码
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: '28800000001'
password: '12345'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
login_05:
host: ${{host()}}
url: /user/login
method: POST
detail: 输入手机号码小于11位
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: '1880000000'
password: '12345'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
login_06:
host: ${{host()}}
url: /user/login
method: POST
detail: 输入手机号码大于于11位
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: '18800000000'
password: '12345'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
login_07:
host: ${{host()}}
url: /user/login
method: POST
detail: 手机号码为空
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username:
password: '12345'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
login_08:
host: ${{host()}}
url: /user/login
method: POST
detail: 手机号码首位包含空格
headers:
Content-Type: multipart/form-data;
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
username: ' 18867507063 '
password: '12345'
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: -1
AssertType:
# 断言接口返回的username
errorMsg:
jsonpath: $.errorMsg
type: ==
value: "账号密码不匹配!"
AssertType:
sql:
# 公共参数
case_common:
allureEpic: 开发平台接口
allureFeature: 个人信息模块
allureStory: 个人信息接口
get_user_info_01:
host: ${{host()}}
url: /user/lg/userinfo/json
method: GET
detail: 正常获取个人身份信息
headers:
Content-Type: multipart/form-data;
# 这里cookie的值,写的是存入缓存的名称
cookie: $cache{login_cookie}
# 请求的数据,是 params 还是 json、或者file、data
requestType: data
# 是否执行,空或者 true 都会执行
is_run:
data:
# 是否有依赖业务,为空或者false则表示没有
dependence_case: False
# 依赖的数据
dependence_case_data:
assert:
# 断言接口状态码
errorCode:
jsonpath: $.errorCode
type: ==
value: 0
AssertType:
# 断言接口返回的username
username:
jsonpath: $.data.userInfo.username
type: ==
value: '18800000001'
AssertType:
sql:
[pytest]
addopts = -p no:warnings
testpaths = test_case/
python_files = test_*.py
python_classes = Test*
python_function = test_*
markers =
smoke: 冒烟测试
aiofiles==0.8.0
allure-pytest==2.9.45
allure-python-commons==2.9.45
asgiref==3.5.1
atomicwrites==1.4.0
attrs==21.2.0
blinker==1.4
Brotli==1.0.9
certifi==2021.10.8
cffi==1.15.0
chardet==4.0.0
charset-normalizer==2.0.7
click==8.1.3
colorama==0.4.4
colorlog==6.6.0
cryptography==36.0.0
DingtalkChatbot==1.5.3
et-xmlfile==1.1.0
execnet==1.9.0
Faker==9.8.3
Flask==2.0.3
h11==0.13.0
h2==4.1.0
hpack==4.0.0
httptools==0.4.0
hyperframe==6.0.1
idna==3.3
iniconfig==1.1.0
itchat==1.3.10
itsdangerous==2.1.2
Jinja2==3.1.2
jsonpath==0.82
kaitaistruct==0.9
ldap3==2.9.1
MarkupSafe==2.1.1
mitmproxy~=8.1.0
msgpack==1.0.3
multidict==6.0.2
openpyxl==3.0.9
packaging==21.3
passlib==1.7.4
pluggy==1.0.0
protobuf==3.19.4
publicsuffix2==2.20191221
py==1.11.0
pyasn1==0.4.8
pycparser==2.21
pydivert==2.1.0
PyMySQL==1.0.2
pyOpenSSL==21.0.0
pyparsing==3.0.6
pyperclip==1.8.2
pypng==0.0.21
PyQRCode==1.2.1
pytest~=7.1.2
pytest-forked==1.3.0
pytest-xdist==2.4.0
python-dateutil==2.8.2
pywin32==304
PyYAML~=5.4.1
requests==2.26.0
requests-toolbelt==0.9.1
ruamel.yaml==0.17.21
ruamel.yaml.clib==0.2.6
sanic==22.3.1
sanic-routing==22.3.0
six==1.16.0
sortedcontainers==2.4.0
text-unidecode==1.3
toml==0.10.2
tornado==6.1
urllib3==1.26.7
urwid==2.1.2
websockets==10.3
Werkzeug==2.1.2
wsproto==1.1.0
xlrd==2.0.1
xlutils==2.0.0
xlwings==0.27.7
xlwt==1.3.0
zstandard==0.17.0
pyDes~=2.0.1
crypto~=1.4.1
redis~=4.3.4
pydantic~=1.8.2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import traceback
import pytest
from utils.other_tools.models import NotificationType
from utils.other_tools.allure_data.allure_report_data import AllureFileClean
from utils.logging_tool.log_control import INFO
from utils.notify.wechat_send import WeChatSend
from utils.notify.ding_talk import DingTalkSendMsg
from utils.notify.send_mail import SendEmail
from utils.notify.lark import FeiShuTalkChatBot
from utils.other_tools.allure_data.error_case_excel import ErrorCaseExcel
from utils.other_tools.allure_data.error_case_excel import ErrorCaseExcel
from utils import config
def run():
# 从配置文件中获取项目名称
try:
INFO.logger.info(
"""
_ _ _ _____ _
__ _ _ __ (_) / \\ _ _| |_ __|_ _|__ ___| |_
/ _` | '_ \\| | / _ \\| | | | __/ _ \\| |/ _ \\/ __| __|
| (_| | |_) | |/ ___ \\ |_| | || (_) | | __/\\__ \\ |_
\\__,_| .__/|_/_/ \\_\\__,_|\\__\\___/|_|\\___||___/\\__|
|_|
开始执行{}项目...
""".format(config.project_name)
)
# 判断现有的测试用例,如果未生成测试代码,则自动生成
# TestCaseAutomaticGeneration().get_case_automatic()
pytest.main(['-s', '-W', 'ignore:Module already imported:pytest.PytestWarning',
'--alluredir', './report/tmp', "--clean-alluredir"])
"""
--reruns: 失败重跑次数
--count: 重复执行次数
-v: 显示错误位置以及错误的详细信息
-s: 等价于 pytest --capture=no 可以捕获print函数的输出
-q: 简化输出信息
-m: 运行指定标签的测试用例
-x: 一旦错误,则停止运行
--maxfail: 设置最大失败次数,当超出这个阈值时,则不会在执行测试用例
"--reruns=3", "--reruns-delay=2"
"""
os.system(r"allure generate ./report/tmp -o ./report/html --clean")
allure_data = AllureFileClean().get_case_count()
notification_mapping = {
NotificationType.DING_TALK.value: DingTalkSendMsg(allure_data).send_ding_notification,
NotificationType.WECHAT.value: WeChatSend(allure_data).send_wechat_notification,
NotificationType.EMAIL.value: SendEmail(allure_data).send_main,
NotificationType.FEI_SHU.value: FeiShuTalkChatBot(allure_data).post
}
if config.notification_type != NotificationType.DEFAULT.value:
notify_type = config.notification_type.split(",")
for i in notify_type:
notification_mapping.get(i.lstrip(""))()
if config.excel_report:
ErrorCaseExcel().write_case()
# 程序运行之后,自动启动报告,如果不想启动报告,可注释这段代码
os.system(f"allure serve ./report/tmp -h 127.0.0.1 -p 9999")
except Exception:
# 如有异常,相关异常发送邮件
e = traceback.format_exc()
send_email = SendEmail(AllureFileClean.get_case_count())
send_email.error_mail(e)
raise
if __name__ == '__main__':
run()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023-03-20 13:55:04
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = ['collect_addtool_01', 'collect_addtool_02']
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("开发平台接口")
@allure.feature("收藏模块")
class TestCollectAddtool:
@allure.story("收藏网址接口")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_collect_addtool(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['test_test_collect_addtool.py', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023-03-20 13:55:04
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = ['collect_delete_tool_01', 'collect_delete_tool_02']
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("开发平台接口")
@allure.feature("收藏模块")
class TestCollectDeleteTool:
@allure.story("删除收藏网站接口")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_collect_delete_tool(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['test_test_collect_delete_tool.py', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023-03-20 13:55:04
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = ['collect_tool_list_01']
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("开发平台接口")
@allure.feature("收藏模块")
class TestCollectToolList:
@allure.story("收藏网址列表接口")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_collect_tool_list(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['test_test_collect_tool_list.py', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023-03-20 13:55:04
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = ['collect_update_tool_01']
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("开发平台接口")
@allure.feature("收藏模块")
class TestCollectUpdateTool:
@allure.story("编辑收藏网址接口")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_collect_update_tool(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['test_test_collect_update_tool.py', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023-03-20 13:55:04
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = ['login_01', 'login_02', 'login_03', 'login_04', 'login_05', 'login_06', 'login_07', 'login_08']
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("开发平台接口")
@allure.feature("登录模块")
class TestLogin:
@allure.story("登录")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_login(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['test_test_login.py', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023-03-20 13:55:04
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = ['get_user_info_01']
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("开发平台接口")
@allure.feature("个人信息模块")
class TestGetUserInfo:
@allure.story("个人信息接口")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_get_user_info(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['test_test_get_user_info.py', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from common.setting import ensure_path_sep
from utils.read_files_tools.get_yaml_data_analysis import CaseData
from utils.read_files_tools.get_all_files_path import get_all_files
from utils.cache_process.cache_control import CacheHandler, _cache_config
def write_case_process():
"""
获取所有用例,写入用例池中
:return:
"""
# 循环拿到所有存放用例的文件路径
for i in get_all_files(file_path=ensure_path_sep("\\data"), yaml_data_switch=True):
# 循环读取文件中的数据
case_process = CaseData(i).case_process(case_id_switch=True)
if case_process is not None:
# 转换数据类型
for case in case_process:
for k, v in case.items():
# 判断 case_id 是否已存在
case_id_exit = k in _cache_config.keys()
# 如果case_id 不存在,则将用例写入缓存池中
if case_id_exit is False:
CacheHandler.update_cache(cache_name=k, value=v)
# case_data[k] = v
# 当 case_id 为 True 存在时,则跑出异常
elif case_id_exit is True:
raise ValueError(f"case_id: {k} 存在重复项, 请修改case_id\n"
f"文件路径: {i}")
write_case_process()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import time
import allure
import requests
import ast
from common.setting import ensure_path_sep
from utils.requests_tool.request_control import cache_regular
from utils.logging_tool.log_control import INFO, ERROR, WARNING
from utils.other_tools.models import TestCase
from utils.read_files_tools.clean_files import del_file
from utils.other_tools.allure_data.allure_tools import allure_step, allure_step_no
from utils.cache_process.cache_control import CacheHandler
@pytest.fixture(scope="session", autouse=False)
def clear_report():
"""如clean命名无法删除报告,这里手动删除"""
del_file(ensure_path_sep("\\report"))
@pytest.fixture(scope="session", autouse=True)
def work_login_init():
"""
获取登录的cookie
:return:
"""
url = "https://www.wanandroid.com/user/login"
data = {
"username": 18800000001,
"password": 123456
}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
# 请求登录接口
res = requests.post(url=url, data=data, verify=True, headers=headers)
response_cookie = res.cookies
cookies = ''
for k, v in response_cookie.items():
_cookie = k + "=" + v + ";"
# 拿到登录的cookie内容,cookie拿到的是字典类型,转换成对应的格式
cookies += _cookie
# 将登录接口中的cookie写入缓存中,其中login_cookie是缓存名称
CacheHandler.update_cache(cache_name='login_cookie', value=cookies)
def pytest_collection_modifyitems(items):
"""
测试用例收集完成时,将收集到的 item 的 name 和 node_id 的中文显示在控制台上
:return:
"""
for item in items:
item.name = item.name.encode("utf-8").decode("unicode_escape")
item._nodeid = item.nodeid.encode("utf-8").decode("unicode_escape")
# 期望用例顺序
# print("收集到的测试用例:%s" % items)
appoint_items = ["test_get_user_info", "test_collect_addtool", "test_Cart_List", "test_ADD", "test_Guest_ADD",
"test_Clear_Cart_Item"]
# 指定运行顺序
run_items = []
for i in appoint_items:
for item in items:
module_item = item.name.split("[")[0]
if i == module_item:
run_items.append(item)
for i in run_items:
run_index = run_items.index(i)
items_index = items.index(i)
if run_index != items_index:
n_data = items[run_index]
run_index = items.index(n_data)
items[items_index], items[run_index] = items[run_index], items[items_index]
def pytest_configure(config):
config.addinivalue_line("markers", 'smoke')
config.addinivalue_line("markers", '回归测试')
@pytest.fixture(scope="function", autouse=True)
def case_skip(in_data):
"""处理跳过用例"""
in_data = TestCase(**in_data)
if ast.literal_eval(cache_regular(str(in_data.is_run))) is False:
allure.dynamic.title(in_data.detail)
allure_step_no(f"请求URL: {in_data.is_run}")
allure_step_no(f"请求方式: {in_data.method}")
allure_step("请求头: ", in_data.headers)
allure_step("请求数据: ", in_data.data)
allure_step("依赖数据: ", in_data.dependence_case_data)
allure_step("预期数据: ", in_data.assert_data)
pytest.skip()
def pytest_terminal_summary(terminalreporter):
"""
收集测试结果
"""
_PASSED = len([i for i in terminalreporter.stats.get('passed', []) if i.when != 'teardown'])
_ERROR = len([i for i in terminalreporter.stats.get('error', []) if i.when != 'teardown'])
_FAILED = len([i for i in terminalreporter.stats.get('failed', []) if i.when != 'teardown'])
_SKIPPED = len([i for i in terminalreporter.stats.get('skipped', []) if i.when != 'teardown'])
_TOTAL = terminalreporter._numcollected
_TIMES = time.time() - terminalreporter._sessionstarttime
INFO.logger.error(f"用例总数: {_TOTAL}")
INFO.logger.error(f"异常用例数: {_ERROR}")
ERROR.logger.error(f"失败用例数: {_FAILED}")
WARNING.logger.warning(f"跳过用例数: {_SKIPPED}")
INFO.logger.info("用例执行时长: %.2f" % _TIMES + " s")
try:
_RATE = _PASSED / _TOTAL * 100
INFO.logger.info("用例成功率: %.2f" % _RATE + " %")
except ZeroDivisionError:
INFO.logger.info("用例成功率: 0.00 %")
from utils.read_files_tools.yaml_control import GetYamlData
from common.setting import ensure_path_sep
from utils.other_tools.models import Config
_data = GetYamlData(ensure_path_sep("\\common\\config.yaml")).get_yaml_data()
config = Config(**_data)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
断言类型封装,支持json响应断言、数据库断言
"""
import ast
import json
from typing import Text, Dict, Any, Union
from jsonpath import jsonpath
from utils.other_tools.models import AssertMethod
from utils.logging_tool.log_control import ERROR, WARNING
from utils.read_files_tools.regular_control import cache_regular
from utils.other_tools.models import load_module_functions
from utils.assertion import assert_type
from utils.other_tools.exceptions import JsonpathExtractionFailed, SqlNotFound, AssertTypeError
from utils import config
class AssertUtil:
def __init__(self, assert_data, sql_data, request_data, response_data, status_code):
self.response_data = response_data
self.request_data = request_data
self.sql_data = sql_data
self.assert_data = assert_data
self.sql_switch = config.mysql_db.switch
self.status_code = status_code
@staticmethod
def literal_eval(attr):
return ast.literal_eval(cache_regular(str(attr)))
@property
def get_assert_data(self):
assert self.assert_data is not None, (
"'%s' should either include a `assert_data` attribute, "
% self.__class__.__name__
)
return ast.literal_eval(cache_regular(str(self.assert_data)))
@property
def get_type(self):
assert 'type' in self.get_assert_data.keys(), (
" 断言数据: '%s' 中缺少 `type` 属性 " % self.get_assert_data
)
# 获取断言类型对应的枚举值
name = AssertMethod(self.get_assert_data.get("type")).name
return name
@property
def get_value(self):
assert 'value' in self.get_assert_data.keys(), (
" 断言数据: '%s' 中缺少 `value` 属性 " % self.get_assert_data
)
return self.get_assert_data.get("value")
@property
def get_jsonpath(self):
assert 'jsonpath' in self.get_assert_data.keys(), (
" 断言数据: '%s' 中缺少 `jsonpath` 属性 " % self.get_assert_data
)
return self.get_assert_data.get("jsonpath")
@property
def get_assert_type(self):
assert 'AssertType' in self.get_assert_data.keys(), (
" 断言数据: '%s' 中缺少 `AssertType` 属性 " % self.get_assert_data
)
return self.get_assert_data.get("AssertType")
@property
def get_message(self):
"""
获取断言描述,如果未填写,则返回 `None`
:return:
"""
return self.get_assert_data.get("message", None)
@property
def get_sql_data(self):
# 判断数据库开关为开启,并需要数据库断言的情况下,未编写sql,则抛异常
if self.sql_switch_handle:
assert self.sql_data != {'sql': None}, (
"请在用例中添加您要查询的SQL语句。"
)
# 处理 mysql查询出来的数据类型如果是bytes类型,转换成str类型
if isinstance(self.sql_data, bytes):
return self.sql_data.decode('utf=8')
sql_data = jsonpath(self.sql_data, self.get_value)
assert sql_data is not False, (
f"数据库断言数据提取失败,提取对象: {self.sql_data} , 当前语法: {self.get_value}"
)
if len(sql_data) > 1:
return sql_data
return sql_data[0]
@staticmethod
def functions_mapping():
return load_module_functions(assert_type)
@property
def get_response_data(self):
return json.loads(self.response_data)
@property
def sql_switch_handle(self):
"""
判断数据库开关,如果未开启,则打印断言部分的数据
:return:
"""
if self.sql_switch is False:
WARNING.logger.warning(
"检测到数据库状态为关闭状态,程序已为您跳过此断言,断言值:%s" % self.get_assert_data
)
return self.sql_switch
def _assert(self, check_value: Any, expect_value: Any, message: Text = ""):
self.functions_mapping()[self.get_type](check_value, expect_value, str(message))
@property
def _assert_resp_data(self):
resp_data = jsonpath(self.get_response_data, self.get_jsonpath)
assert resp_data is not False, (
f"jsonpath数据提取失败,提取对象: {self.get_response_data} , 当前语法: {self.get_jsonpath}"
)
if len(resp_data) > 1:
return resp_data
return resp_data[0]
@property
def _assert_request_data(self):
req_data = jsonpath(self.request_data, self.get_jsonpath)
assert req_data is not False, (
f"jsonpath数据提取失败,提取对象: {self.request_data} , 当前语法: {self.get_jsonpath}"
)
if len(req_data) > 1:
return req_data
return req_data[0]
def assert_type_handle(self):
# 判断请求参数数据库断言
if self.get_assert_type == "R_SQL":
self._assert(self._assert_request_data, self.get_sql_data, self.get_message)
# 判断请求参数为响应数据库断言
elif self.get_assert_type == "SQL" or self.get_assert_type == "D_SQL":
self._assert(self._assert_resp_data, self.get_sql_data, self.get_message)
# 判断非数据库断言类型
elif self.get_assert_type is None:
self._assert(self._assert_resp_data, self.get_value, self.get_message)
else:
raise AssertTypeError("断言失败,目前只支持数据库断言和响应断言")
class Assert(AssertUtil):
def assert_data_list(self):
assert_list = []
for k, v in self.assert_data.items():
if k == "status_code":
assert self.status_code == v, "响应状态码断言失败"
else:
assert_list.append(v)
return assert_list
def assert_type_handle(self):
for i in self.assert_data_list():
self.assert_data = i
super().assert_type_handle()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Assert 断言类型
"""
from typing import Any, Union, Text
def equals(
check_value: Any, expect_value: Any, message: Text = ""
):
"""判断是否相等"""
assert check_value == expect_value, message
def less_than(
check_value: Union[int, float], expect_value: Union[int, float], message: Text = ""
):
"""判断实际结果小于预期结果"""
assert check_value < expect_value, message
def less_than_or_equals(
check_value: Union[int, float], expect_value: Union[int, float], message: Text = ""):
"""判断实际结果小于等于预期结果"""
assert check_value <= expect_value, message
def greater_than(
check_value: Union[int, float], expect_value: Union[int, float], message: Text = ""
):
"""判断实际结果大于预期结果"""
assert check_value > expect_value, message
def greater_than_or_equals(
check_value: Union[int, float], expect_value: Union[int, float], message: Text = ""
):
"""判断实际结果大于等于预期结果"""
assert check_value >= expect_value, message
def not_equals(
check_value: Any, expect_value: Any, message: Text = ""
):
"""判断实际结果不等于预期结果"""
assert check_value != expect_value, message
def string_equals(
check_value: Text, expect_value: Any, message: Text = ""
):
"""判断字符串是否相等"""
assert check_value == expect_value, message
def length_equals(
check_value: Text, expect_value: int, message: Text = ""
):
"""判断长度是否相等"""
assert isinstance(
expect_value, int
), "expect_value 需要为 int 类型"
assert len(check_value) == expect_value, message
def length_greater_than(
check_value: Text, expect_value: Union[int, float], message: Text = ""
):
"""判断长度大于"""
assert isinstance(
expect_value, (float, int)
), "expect_value 需要为 float/int 类型"
assert len(str(check_value)) > expect_value, message
def length_greater_than_or_equals(
check_value: Text, expect_value: Union[int, float], message: Text = ""
):
"""判断长度大于等于"""
assert isinstance(
expect_value, (int, float)
), "expect_value 需要为 float/int 类型"
assert len(check_value) >= expect_value, message
def length_less_than(
check_value: Text, expect_value: Union[int, float], message: Text = ""
):
"""判断长度小于"""
assert isinstance(
expect_value, (int, float)
), "expect_value 需要为 float/int 类型"
assert len(check_value) < expect_value, message
def length_less_than_or_equals(
check_value: Text, expect_value: Union[int, float], message: Text = ""
):
"""判断长度小于等于"""
assert isinstance(
expect_value, (int, float)
), "expect_value 需要为 float/int 类型"
assert len(check_value) <= expect_value, message
def contains(check_value: Any, expect_value: Any, message: Text = ""):
"""判断期望结果内容包含在实际结果中"""
assert isinstance(
check_value, (list, tuple, dict, str, bytes)
), "expect_value 需要为 list/tuple/dict/str/bytes 类型"
assert expect_value in check_value, message
def contained_by(check_value: Any, expect_value: Any, message: Text = ""):
"""判断实际结果包含在期望结果中"""
assert isinstance(
expect_value, (list, tuple, dict, str, bytes)
), "expect_value 需要为 list/tuple/dict/str/bytes 类型"
assert check_value in expect_value, message
def startswith(
check_value: Any, expect_value: Any, message: Text = ""
):
"""检查响应内容的开头是否和预期结果内容的开头相等"""
assert str(check_value).startswith(str(expect_value)), message
def endswith(
check_value: Any, expect_value: Any, message: Text = ""
):
"""检查响应内容的结尾是否和预期结果内容相等"""
assert str(check_value).endswith(str(expect_value)), message
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
缓存文件处理
"""
import os
from typing import Any, Text, Union
from common.setting import ensure_path_sep
from utils.other_tools.exceptions import ValueNotFoundError
class Cache:
""" 设置、读取缓存 """
def __init__(self, filename: Union[Text, None]) -> None:
# 如果filename不为空,则操作指定文件内容
if filename:
self.path = ensure_path_sep("\\cache" + filename)
# 如果filename为None,则操作所有文件内容
else:
self.path = ensure_path_sep("\\cache")
def set_cache(self, key: Text, value: Any) -> None:
"""
设置缓存, 只支持设置单字典类型缓存数据, 缓存文件如以存在,则替换之前的缓存内容
:return:
"""
with open(self.path, 'w', encoding='utf-8') as file:
file.write(str({key: value}))
def set_caches(self, value: Any) -> None:
"""
设置多组缓存数据
:param value: 缓存内容
:return:
"""
with open(self.path, 'w', encoding='utf-8') as file:
file.write(str(value))
def get_cache(self) -> Any:
"""
获取缓存数据
:return:
"""
try:
with open(self.path, 'r', encoding='utf-8') as file:
return file.read()
except FileNotFoundError:
pass
def clean_cache(self) -> None:
"""删除所有缓存文件"""
if not os.path.exists(self.path):
raise FileNotFoundError(f"您要删除的缓存文件不存在 {self.path}")
os.remove(self.path)
@classmethod
def clean_all_cache(cls) -> None:
"""
清除所有缓存文件
:return:
"""
cache_path = ensure_path_sep("\\cache")
# 列出目录下所有文件,生成一个list
list_dir = os.listdir(cache_path)
for i in list_dir:
# 循环删除文件夹下得所有内容
os.remove(cache_path + i)
_cache_config = {}
class CacheHandler:
@staticmethod
def get_cache(cache_data):
try:
return _cache_config[cache_data]
except KeyError:
raise ValueNotFoundError(f"{cache_data}的缓存数据未找到,请检查是否将该数据存入缓存中")
@staticmethod
def update_cache(*, cache_name, value):
_cache_config[cache_name] = value
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
redis 缓存操作封装
"""
from typing import Text, Any
import redis
class RedisHandler:
""" redis 缓存读取封装 """
def __init__(self):
self.host = '127.0.0.0'
self.port = 6379
self.database = 0
self.password = 123456
self.charset = 'UTF-8'
self.redis = redis.Redis(
self.host,
port=self.port,
password=self.password,
decode_responses=True,
db=self.database
)
def set_string(
self, name: Text,
value, exp_time=None,
exp_milliseconds=None,
name_not_exist=False,
name_exit=False) -> None:
"""
缓存中写入 str(单个)
:param name: 缓存名称
:param value: 缓存值
:param exp_time: 过期时间(秒)
:param exp_milliseconds: 过期时间(毫秒)
:param name_not_exist: 如果设置为True,则只有name不存在时,当前set操作才执行(新增)
:param name_exit: 如果设置为True,则只有name存在时,当前set操作才执行(修改)
:return:
"""
self.redis.set(
name,
value,
ex=exp_time,
px=exp_milliseconds,
nx=name_not_exist,
xx=name_exit
)
def key_exit(self, key: Text):
"""
判断redis中的key是否存在
:param key:
:return:
"""
return self.redis.exists(key)
def incr(self, key: Text):
"""
使用 incr 方法,处理并发问题
当 key 不存在时,则会先初始为 0, 每次调用,则会 +1
:return:
"""
self.redis.incr(key)
def get_key(self, name: Any) -> Text:
"""
读取缓存
:param name:
:return:
"""
return self.redis.get(name)
def set_many(self, *args, **kwargs):
"""
批量设置
支持如下方式批量设置缓存
eg: set_many({'k1': 'v1', 'k2': 'v2'})
set_many(k1="v1", k2="v2")
:return:
"""
self.redis.mset(*args, **kwargs)
def get_many(self, *args):
"""获取多个值"""
results = self.redis.mget(*args)
return results
def del_all_cache(self):
"""清理所有现在的数据"""
for key in self.redis.keys():
self.del_cache(key)
def del_cache(self, name):
"""
删除缓存
:param name:
:return:
"""
self.redis.delete(name)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
日志封装,可设置不同等级的日志颜色
"""
import logging
from logging import handlers
from typing import Text
import colorlog
import time
from common.setting import ensure_path_sep
class LogHandler:
""" 日志打印封装"""
# 日志级别关系映射
level_relations = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
def __init__(
self,
filename: Text,
level: Text = "info",
when: Text = "D",
fmt: Text = "%(levelname)-8s%(asctime)s%(name)s:%(filename)s:%(lineno)d %(message)s"
):
self.logger = logging.getLogger(filename)
formatter = self.log_color()
# 设置日志格式
format_str = logging.Formatter(fmt)
# 设置日志级别
self.logger.setLevel(self.level_relations.get(level))
# 往屏幕上输出
screen_output = logging.StreamHandler()
# 设置屏幕上显示的格式
screen_output.setFormatter(formatter)
# 往文件里写入#指定间隔时间自动生成文件的处理器
time_rotating = handlers.TimedRotatingFileHandler(
filename=filename,
when=when,
backupCount=3,
encoding='utf-8'
)
# 设置文件里写入的格式
time_rotating.setFormatter(format_str)
# 把对象加到logger里
self.logger.addHandler(screen_output)
self.logger.addHandler(time_rotating)
self.log_path = ensure_path_sep('\\logs\\log.log')
@classmethod
def log_color(cls):
""" 设置日志颜色 """
log_colors_config = {
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red',
}
formatter = colorlog.ColoredFormatter(
'%(log_color)s[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s',
log_colors=log_colors_config
)
return formatter
now_time_day = time.strftime("%Y-%m-%d", time.localtime())
INFO = LogHandler(ensure_path_sep(f"\\logs\\info-{now_time_day}.log"), level='info')
ERROR = LogHandler(ensure_path_sep(f"\\logs\\error-{now_time_day}.log"), level='error')
WARNING = LogHandler(ensure_path_sep(f'\\logs\\warning-{now_time_day}.log'))
if __name__ == '__main__':
ERROR.logger.error("测试")
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
日志装饰器,控制程序日志输入,默认为 True
如设置 False,则程序不会打印日志
"""
import ast
from functools import wraps
from utils.read_files_tools.regular_control import cache_regular
from utils.logging_tool.log_control import INFO, ERROR
def log_decorator(switch: bool):
"""
封装日志装饰器, 打印请求信息
:param switch: 定义日志开关
:return:
"""
def decorator(func):
@wraps(func)
def swapper(*args, **kwargs):
# 判断日志为开启状态,才打印日志
res = func(*args, **kwargs)
# 判断日志开关为开启状态
if switch:
_log_msg = f"\n======================================================\n" \
f"用例标题: {res.detail}\n" \
f"请求路径: {res.url}\n" \
f"请求方式: {res.method}\n" \
f"请求头: {res.headers}\n" \
f"请求内容: {res.request_body}\n" \
f"接口响应内容: {res.response_data}\n" \
f"接口响应时长: {res.res_time} ms\n" \
f"Http状态码: {res.status_code}\n" \
"====================================================="
_is_run = ast.literal_eval(cache_regular(str(res.is_run)))
# 判断正常打印的日志,控制台输出绿色
if _is_run in (True, None) and res.status_code == 200:
INFO.logger.info(_log_msg)
else:
# 失败的用例,控制台打印红色
ERROR.logger.error(_log_msg)
return res
return swapper
return decorator
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
统计请求运行时长装饰器,如请求响应时间超时
程序中会输入红色日志,提示时间 http 请求超时,默认时长为 3000ms
"""
from utils.logging_tool.log_control import ERROR
def execution_duration(number: int):
"""
封装统计函数执行时间装饰器
:param number: 函数预计运行时长
:return:
"""
def decorator(func):
def swapper(*args, **kwargs):
res = func(*args, **kwargs)
run_time = res.res_time
# 计算时间戳毫米级别,如果时间大于number,则打印 函数名称 和运行时间
if run_time > number:
ERROR.logger.error(
"\n==============================================\n"
"测试用例执行时间较长,请关注.\n"
"函数运行时间: %s ms\n"
"测试用例相关数据: %s\n"
"================================================="
, run_time, res)
return res
return swapper
return decorator
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
mysql 封装,支持 增、删、改、查
"""
import ast
import datetime
import decimal
from warnings import filterwarnings
import pymysql
from typing import List, Union, Text, Dict
from utils import config
from utils.logging_tool.log_control import ERROR
from utils.read_files_tools.regular_control import sql_regular
from utils.read_files_tools.regular_control import cache_regular
from utils.other_tools.exceptions import DataAcquisitionFailed, ValueTypeError
# 忽略 Mysql 告警信息
filterwarnings("ignore", category=pymysql.Warning)
class MysqlDB:
""" mysql 封装 """
if config.mysql_db.switch:
def __init__(self):
try:
# 建立数据库连接
self.conn = pymysql.connect(
host=config.mysql_db.host,
user=config.mysql_db.user,
password=config.mysql_db.password,
port=config.mysql_db.port
)
# 使用 cursor 方法获取操作游标,得到一个可以执行sql语句,并且操作结果为字典返回的游标
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
except AttributeError as error:
ERROR.logger.error("数据库连接失败,失败原因 %s", error)
def __del__(self):
try:
# 关闭游标
self.cur.close()
# 关闭连接
self.conn.close()
except AttributeError as error:
ERROR.logger.error("数据库连接失败,失败原因 %s", error)
def query(self, sql, state="all"):
"""
查询
:param sql:
:param state: all 是默认查询全部
:return:
"""
try:
self.cur.execute(sql)
if state == "all":
# 查询全部
data = self.cur.fetchall()
else:
# 查询单条
data = self.cur.fetchone()
return data
except AttributeError as error_data:
ERROR.logger.error("数据库连接失败,失败原因 %s", error_data)
raise
def execute(self, sql: Text):
"""
更新 、 删除、 新增
:param sql:
:return:
"""
try:
# 使用 execute 操作 sql
rows = self.cur.execute(sql)
# 提交事务
self.conn.commit()
return rows
except AttributeError as error:
ERROR.logger.error("数据库连接失败,失败原因 %s", error)
# 如果事务异常,则回滚数据
self.conn.rollback()
raise
@classmethod
def sql_data_handler(cls, query_data, data):
"""
处理部分类型sql查询出来的数据格式
@param query_data: 查询出来的sql数据
@param data: 数据池
@return:
"""
# 将sql 返回的所有内容全部放入对象中
for key, value in query_data.items():
if isinstance(value, decimal.Decimal):
data[key] = float(value)
elif isinstance(value, datetime.datetime):
data[key] = str(value)
else:
data[key] = value
return data
class SetUpMySQL(MysqlDB):
""" 处理前置sql """
def setup_sql_data(self, sql: Union[List, None]) -> Dict:
"""
处理前置请求sql
:param sql:
:return:
"""
sql = ast.literal_eval(cache_regular(str(sql)))
try:
data = {}
if sql is not None:
for i in sql:
# 判断断言类型为查询类型的时候,
if i[0:6].upper() == 'SELECT':
sql_date = self.query(sql=i)[0]
for key, value in sql_date.items():
data[key] = value
else:
self.execute(sql=i)
return data
except IndexError as exc:
raise DataAcquisitionFailed("sql 数据查询失败,请检查setup_sql语句是否正确") from exc
class AssertExecution(MysqlDB):
""" 处理断言sql数据 """
def assert_execution(self, sql: list, resp) -> dict:
"""
执行 sql, 负责处理 yaml 文件中的断言需要执行多条 sql 的场景,最终会将所有数据以对象形式返回
:param resp: 接口响应数据
:param sql: sql
:return:
"""
try:
if isinstance(sql, list):
data = {}
_sql_type = ['UPDATE', 'update', 'DELETE', 'delete', 'INSERT', 'insert']
if any(i in sql for i in _sql_type) is False:
for i in sql:
# 判断sql中是否有正则,如果有则通过jsonpath提取相关的数据
sql = sql_regular(i, resp)
if sql is not None:
# for 循环逐条处理断言 sql
query_data = self.query(sql)[0]
data = self.sql_data_handler(query_data, data)
else:
raise DataAcquisitionFailed(f"该条sql未查询出任何数据, {sql}")
else:
raise DataAcquisitionFailed("断言的 sql 必须是查询的 sql")
else:
raise ValueTypeError("sql数据类型不正确,接受的是list")
return data
except Exception as error_data:
ERROR.logger.error("数据库连接失败,失败原因 %s", error_data)
raise error_data
if __name__ == '__main__':
a = MysqlDB()
b = a.query(sql="select * from `test_obp_configure`.lottery_prize where activity_id = 3")
print(b)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
钉钉通知封装
"""
import base64
import hashlib
import hmac
import time
import urllib.parse
from typing import Any, Text
from dingtalkchatbot.chatbot import DingtalkChatbot, FeedLink
from utils.other_tools.get_local_ip import get_host_ip
from utils.other_tools.allure_data.allure_report_data import AllureFileClean, TestMetrics
from utils import config
class DingTalkSendMsg:
""" 发送钉钉通知 """
def __init__(self, metrics: TestMetrics):
self.metrics = metrics
self.timeStamp = str(round(time.time() * 1000))
def xiao_ding(self):
sign = self.get_sign()
# 从yaml文件中获取钉钉配置信息
webhook = config.ding_talk.webhook + "&timestamp=" + self.timeStamp + "&sign=" + sign
return DingtalkChatbot(webhook)
def get_sign(self) -> Text:
"""
根据时间戳 + "sign" 生成密钥
:return:
"""
string_to_sign = f'{self.timeStamp}\n{config.ding_talk.secret}'.encode('utf-8')
hmac_code = hmac.new(
config.ding_talk.secret.encode('utf-8'),
string_to_sign,
digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def send_text(
self,
msg: Text,
mobiles=None
) -> None:
"""
发送文本信息
:param msg: 文本内容
:param mobiles: 艾特用户电话
:return:
"""
if not mobiles:
self.xiao_ding().send_text(msg=msg, is_at_all=True)
else:
if isinstance(mobiles, list):
self.xiao_ding().send_text(msg=msg, at_mobiles=mobiles)
else:
raise TypeError("mobiles类型错误 不是list类型.")
def send_link(
self,
title: Text,
text: Text,
message_url: Text,
pic_url: Text
) -> None:
"""
发送link通知
:return:
"""
self.xiao_ding().send_link(
title=title,
text=text,
message_url=message_url,
pic_url=pic_url
)
def send_markdown(
self,
title: Text,
msg: Text,
mobiles=None,
is_at_all=False
) -> None:
"""
:param is_at_all:
:param mobiles:
:param title:
:param msg:
markdown 格式
"""
if mobiles is None:
self.xiao_ding().send_markdown(title=title, text=msg, is_at_all=is_at_all)
else:
if isinstance(mobiles, list):
self.xiao_ding().send_markdown(title=title, text=msg, at_mobiles=mobiles)
else:
raise TypeError("mobiles类型错误 不是list类型.")
@staticmethod
def feed_link(
title: Text,
message_url: Text,
pic_url: Text
) -> Any:
""" FeedLink 二次封装 """
return FeedLink(
title=title,
message_url=message_url,
pic_url=pic_url
)
def send_feed_link(self, *arg) -> None:
"""发送 feed_lik """
self.xiao_ding().send_feed_card(list(arg))
def send_ding_notification(self):
""" 发送钉钉报告通知 """
# 判断如果有失败的用例,@所有人
is_at_all = False
if self.metrics.failed + self.metrics.broken > 0:
is_at_all = True
text = f"#### {config.project_name}自动化通知 " \
f"\n\n>Python脚本任务: {config.project_name}" \
f"\n\n>环境: TEST\n\n>" \
f"执行人: {config.tester_name}" \
f"\n\n>执行结果: {self.metrics.pass_rate}% " \
f"\n\n>总用例数: {self.metrics.total} " \
f"\n\n>成功用例数: {self.metrics.passed}" \
f" \n\n>失败用例数: {self.metrics.failed} " \
f" \n\n>异常用例数: {self.metrics.broken} " \
f"\n\n>跳过用例数: {self.metrics.skipped}" \
f" ![screenshot](" \
f"https://img.alicdn.com/tfs/TB1NwmBEL9TBuNjy1zbXXXpepXa-2400-1218.png" \
f")\n" \
f" > ###### 测试报告 [详情](http://{get_host_ip()}:9999/index.html) \n"
DingTalkSendMsg(AllureFileClean().get_case_count()).send_markdown(
title="【接口自动化通知】",
msg=text,
is_at_all=is_at_all
)
if __name__ == '__main__':
DingTalkSendMsg(AllureFileClean().get_case_count()).send_ding_notification()
"""
发送飞书通知
"""
import json
import logging
import time
import datetime
import requests
import urllib3
from utils.other_tools.allure_data.allure_report_data import TestMetrics
from utils import config
urllib3.disable_warnings()
try:
JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
JSONDecodeError = ValueError
def is_not_null_and_blank_str(content):
"""
非空字符串
:param content: 字符串
:return: 非空 - True,空 - False
"""
return bool(content and content.strip())
class FeiShuTalkChatBot:
"""飞书机器人通知"""
def __init__(self, metrics: TestMetrics):
self.metrics = metrics
def send_text(self, msg: str):
"""
消息类型为text类型
:param msg: 消息内容
:return: 返回消息发送结果
"""
data = {"msg_type": "text", "at": {}}
if is_not_null_and_blank_str(msg): # 传入msg非空
data["content"] = {"text": msg}
else:
logging.error("text类型,消息内容不能为空!")
raise ValueError("text类型,消息内容不能为空!")
logging.debug('text类型:%s', data)
return self.post()
def post(self):
"""
发送消息(内容UTF-8编码)
:return: 返回消息发送结果
"""
rich_text = {
"email": "1603453211@qq.com",
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": "【自动化测试通知】",
"content": [
[
{
"tag": "a",
"text": "测试报告",
"href": "https://192.168.xx.72:8080"
},
{
"tag": "at",
"user_id": "ou_18eac85d35a26f989317ad4f02e8bbbb"
# "text":"陈锐男"
}
],
[
{
"tag": "text",
"text": "测试 人员 : "
},
{
"tag": "text",
"text": f"{config.tester_name}"
}
],
[
{
"tag": "text",
"text": "运行 环境 : "
},
{
"tag": "text",
"text": f"{config.env}"
}
],
[{
"tag": "text",
"text": "成 功 率 : "
},
{
"tag": "text",
"text": f"{self.metrics.pass_rate} %"
}], # 成功率
[{
"tag": "text",
"text": "成功用例数 : "
},
{
"tag": "text",
"text": f"{self.metrics.passed}"
}], # 成功用例数
[{
"tag": "text",
"text": "失败用例数 : "
},
{
"tag": "text",
"text": f"{self.metrics.failed}"
}], # 失败用例数
[{
"tag": "text",
"text": "异常用例数 : "
},
{
"tag": "text",
"text": f"{self.metrics.failed}"
}], # 损坏用例数
[
{
"tag": "text",
"text": "时 间 : "
},
{
"tag": "text",
"text": f"{datetime.datetime.now().strftime('%Y-%m-%d')}"
}
],
[
{
"tag": "img",
"image_key": "d640eeea-4d2f-4cb3-88d8-c964fab53987",
"width": 300,
"height": 300
}
]
]
}
}
}
}
headers = {'Content-Type': 'application/json; charset=utf-8'}
post_data = json.dumps(rich_text)
response = requests.post(
config.lark.webhook,
headers=headers,
data=post_data,
verify=False
)
result = response.json()
if result.get('StatusCode') != 0:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
result_msg = result['errmsg'] if result.get('errmsg', False) else '未知异常'
error_data = {
"msgtype": "text",
"text": {
"content": f"[注意-自动通知]飞书机器人消息发送失败,时间:{time_now},"
f"原因:{result_msg},请及时跟进,谢谢!"
},
"at": {
"isAtAll": False
}
}
logging.error("消息发送失败,自动通知:%s", error_data)
requests.post(config.lark.webhook, headers=headers, data=json.dumps(error_data))
return result
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
描述: 发送邮件
"""
import smtplib
from email.mime.text import MIMEText
from utils.other_tools.allure_data.allure_report_data import TestMetrics, AllureFileClean
from utils import config
class SendEmail:
""" 发送邮箱 """
def __init__(self, metrics: TestMetrics):
self.metrics = metrics
self.allure_data = AllureFileClean()
self.CaseDetail = self.allure_data.get_failed_cases_detail()
@classmethod
def send_mail(cls, user_list: list, sub, content: str) -> None:
"""
@param user_list: 发件人邮箱
@param sub:
@param content: 发送内容
@return:
"""
user = "余少琪" + "<" + config.email.send_user + ">"
message = MIMEText(content, _subtype='plain', _charset='utf-8')
message['Subject'] = sub
message['From'] = user
message['To'] = ";".join(user_list)
server = smtplib.SMTP()
server.connect(config.email.email_host)
server.login(config.email.send_user, config.email.stamp_key)
server.sendmail(user, user_list, message.as_string())
server.close()
def error_mail(self, error_message: str) -> None:
"""
执行异常邮件通知
@param error_message: 报错信息
@return:
"""
email = config.email.send_list
user_list = email.split(',') # 多个邮箱发送,config文件中直接添加 '806029174@qq.com'
sub = config.project_name + "接口自动化执行异常通知"
content = f"自动化测试执行完毕,程序中发现异常,请悉知。报错信息如下:\n{error_message}"
self.send_mail(user_list, sub, content)
def send_main(self) -> None:
"""
发送邮件
:return:
"""
email = config.email.send_list
user_list = email.split(',') # 多个邮箱发送,yaml文件中直接添加 '806029174@qq.com'
sub = config.project_name + "接口自动化报告"
content = f"""
各位同事, 大家好:
自动化用例执行完成,执行结果如下:
用例运行总数: {self.metrics.total} 个
通过用例个数: {self.metrics.passed} 个
失败用例个数: {self.metrics.failed} 个
异常用例个数: {self.metrics.broken} 个
跳过用例个数: {self.metrics.skipped} 个
成 功 率: {self.metrics.pass_rate} %
{self.allure_data.get_failed_cases_detail()}
**********************************
jenkins地址:https://121.xx.xx.47:8989/login
详细情况可登录jenkins平台查看,非相关负责人员可忽略此消息。谢谢。
"""
self.send_mail(user_list, sub, content)
if __name__ == '__main__':
SendEmail(AllureFileClean().get_case_count()).send_main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
描述: 发送企业微信通知
"""
import requests
from utils.logging_tool.log_control import ERROR
from utils.other_tools.allure_data.allure_report_data import TestMetrics, AllureFileClean
from utils.times_tool.time_control import now_time
from utils.other_tools.get_local_ip import get_host_ip
from utils.other_tools.exceptions import SendMessageError, ValueTypeError
from utils import config
class WeChatSend:
"""
企业微信消息通知
"""
def __init__(self, metrics: TestMetrics):
self.metrics = metrics
self.headers = {"Content-Type": "application/json"}
def send_text(self, content, mentioned_mobile_list=None):
"""
发送文本类型通知
:param content: 文本内容,最长不超过2048个字节,必须是utf8编码
:param mentioned_mobile_list: 手机号列表,提醒手机号对应的群成员(@某个成员),@all表示提醒所有人
:return:
"""
_data = {"msgtype": "text", "text": {"content": content, "mentioned_list": None,
"mentioned_mobile_list": mentioned_mobile_list}}
if mentioned_mobile_list is None or isinstance(mentioned_mobile_list, list):
# 判断手机号码列表中得数据类型,如果为int类型,发送得消息会乱码
if len(mentioned_mobile_list) >= 1:
for i in mentioned_mobile_list:
if isinstance(i, str):
res = requests.post(url=config.wechat.webhook, json=_data, headers=self.headers)
if res.json()['errcode'] != 0:
ERROR.logger.error(res.json())
raise SendMessageError("企业微信「文本类型」消息发送失败")
else:
raise ValueTypeError("手机号码必须是字符串类型.")
else:
raise ValueTypeError("手机号码列表必须是list类型.")
def send_markdown(self, content):
"""
发送 MarkDown 类型消息
:param content: 消息内容,markdown形式
:return:
"""
_data = {"msgtype": "markdown", "markdown": {"content": content}}
res = requests.post(url=config.wechat.webhook, json=_data, headers=self.headers)
if res.json()['errcode'] != 0:
ERROR.logger.error(res.json())
raise SendMessageError("企业微信「MarkDown类型」消息发送失败")
def _upload_file(self, file):
"""
先将文件上传到临时媒体库
"""
key = config.wechat.webhook.split("key=")[1]
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file"
data = {"file": open(file, "rb")}
res = requests.post(url, files=data).json()
return res['media_id']
def send_file_msg(self, file):
"""
发送文件类型的消息
@return:
"""
_data = {"msgtype": "file", "file": {"media_id": self._upload_file(file)}}
res = requests.post(url=config.wechat.webhook, json=_data, headers=self.headers)
if res.json()['errcode'] != 0:
ERROR.logger.error(res.json())
raise SendMessageError("企业微信「file类型」消息发送失败")
def send_wechat_notification(self):
""" 发送企业微信通知 """
text = f"""【{config.project_name}自动化通知】
>测试环境:<font color=\"info\">TEST</font>
>测试负责人:@{config.tester_name}
>
> **执行结果**
><font color=\"info\">成 功 率 : {self.metrics.pass_rate}%</font>
>用例 总数:<font color=\"info\">{self.metrics.total}</font>
>成功用例数:<font color=\"info\">{self.metrics.passed}</font>
>失败用例数:`{self.metrics.failed}个`
>异常用例数:`{self.metrics.broken}个`
>跳过用例数:<font color=\"warning\">{self.metrics.skipped}个</font>
>用例执行时长:<font color=\"warning\">{self.metrics.time} s</font>
>时间:<font color=\"comment\">{now_time()}</font>
>
>非相关负责人员可忽略此消息。
>测试报告,点击查看>>[测试报告入口](http://{get_host_ip()}:9999/index.html)"""
WeChatSend(AllureFileClean().get_case_count()).send_markdown(text)
if __name__ == '__main__':
WeChatSend(AllureFileClean().get_case_count()).send_wechat_notification()
# coding=utf-8
"""
"""
from utils.mysql_tool.mysql_control import MysqlDB
import copy
class AddressDetection(MysqlDB):
def get_shop_address_entity_str(self):
"""
获取所有已经上线并且未删除的店铺地址(去除自定店铺,自营店铺没有地址)
:return:
"""
shop_info = self.query("SELECT id, name, attribute, shop_type, sub_shop_type "
"FROM `test_obp_supplier`.`supplier_shop` "
"where status = 2 and delete_flag = 0 and sub_shop_type > 300 "
"and sub_shop_type = 300")
return shop_info
def get_logistics_address_library(self):
"""
获取平台地址库中的省份code
:return:
"""
code = self.query("select name, code from `test_obp_order`.`logistics_address_library` "
"where parent_code > 0")
area_code = {}
for i in code:
area_code[i['code']] = i['name']
return area_code
def get_error_shop(self):
"""
获取错误的店铺数据
:return:
"""
# 获取区域code
get_logistics_address_library = self.get_logistics_address_library()
num = 0
for i in self.get_shop_address_entity_str():
# 获取店铺地址
shop_address_entity_str = eval(i['attribute'])['shopAddressEntityStr']
if shop_address_entity_str['countiesName'] == get_logistics_address_library[str(shop_address_entity_str['countiesCode'])]:
pass
else:
area_name = self.query(f"SELECT name, code FROM `test_obp_order`.`logistics_address_library`"
f" where parent_code = {shop_address_entity_str['cityCode']} and name = '{shop_address_entity_str['countiesName']}'")
num += 1
new_shop_address_entity_str = copy.deepcopy(shop_address_entity_str)
new_shop_address_entity_str['countiesCode'] = area_name[0]['code']
# print(str(f'update obp_supplier.supplier_shop set attribute = json_set(attribute,"$.shopAddressEntityStr.countiesCode",{area_name[0]["code"]}) where id = {i["id"]};'))
print(f"店铺名称: {i['name']}, 店铺id: {i['id']}, "
f"店铺地址:{shop_address_entity_str['cityName']}{shop_address_entity_str['provinceName']}{shop_address_entity_str['countiesName']}"
f"\n当前实际数据:{shop_address_entity_str}"
f"\n{shop_address_entity_str['countiesName']}的实际code码为 {area_name}"
f"\n更改后的数据: {new_shop_address_entity_str}")
print("*" * 100)
print(num)
AddressDetection().get_error_shop()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
描述: 收集 allure 报告
"""
import json
from typing import List, Text
from common.setting import ensure_path_sep
from utils.read_files_tools.get_all_files_path import get_all_files
from utils.other_tools.models import TestMetrics
class AllureFileClean:
"""allure 报告数据清洗,提取业务需要得数据"""
@classmethod
def get_testcases(cls) -> List:
""" 获取所有 allure 报告中执行用例的情况"""
# 将所有数据都收集到files中
files = []
for i in get_all_files(ensure_path_sep("\\report\\html\\data\\test-cases")):
with open(i, 'r', encoding='utf-8') as file:
date = json.load(file)
files.append(date)
return files
def get_failed_case(self) -> List:
""" 获取到所有失败的用例标题和用例代码路径"""
error_case = []
for i in self.get_testcases():
if i['status'] == 'failed' or i['status'] == 'broken':
error_case.append((i['name'], i['fullName']))
return error_case
def get_failed_cases_detail(self) -> Text:
""" 返回所有失败的测试用例相关内容 """
date = self.get_failed_case()
values = ""
# 判断有失败用例,则返回内容
if len(date) >= 1:
values = "失败用例:\n"
values += " **********************************\n"
for i in date:
values += " " + i[0] + ":" + i[1] + "\n"
return values
@classmethod
def get_case_count(cls) -> "TestMetrics":
""" 统计用例数量 """
try:
file_name = ensure_path_sep("\\report\\html\\widgets\\summary.json")
with open(file_name, 'r', encoding='utf-8') as file:
data = json.load(file)
_case_count = data['statistic']
_time = data['time']
keep_keys = {"passed", "failed", "broken", "skipped", "total"}
run_case_data = {k: v for k, v in data['statistic'].items() if k in keep_keys}
# 判断运行用例总数大于0
if _case_count["total"] > 0:
# 计算用例成功率
run_case_data["pass_rate"] = round(
(_case_count["passed"] + _case_count["skipped"]) / _case_count["total"] * 100, 2
)
else:
# 如果未运行用例,则成功率为 0.0
run_case_data["pass_rate"] = 0.0
# 收集用例运行时长
run_case_data['time'] = _time if run_case_data['total'] == 0 else round(_time['duration'] / 1000, 2)
return TestMetrics(**run_case_data)
except FileNotFoundError as exc:
raise FileNotFoundError(
"程序中检查到您未生成allure报告,"
"通常可能导致的原因是allure环境未配置正确,"
"详情可查看如下博客内容:"
"https://blog.csdn.net/weixin_43865008/article/details/124332793"
) from exc
if __name__ == '__main__':
AllureFileClean().get_case_count()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import json
import allure
from utils.other_tools.models import AllureAttachmentType
def allure_step(step: str, var: str) -> None:
"""
:param step: 步骤及附件名称
:param var: 附件内容
"""
with allure.step(step):
allure.attach(
json.dumps(
str(var),
ensure_ascii=False,
indent=4),
step,
allure.attachment_type.JSON)
def allure_attach(source: str, name: str, extension: str):
"""
allure报告上传附件、图片、excel等
:param source: 文件路径,相当于传一个文件
:param name: 附件名称
:param extension: 附件的拓展名称
:return:
"""
# 获取上传附件的尾缀,判断对应的 attachment_type 枚举值
_name = name.split('.')[-1].upper()
_attachment_type = getattr(AllureAttachmentType, _name, None)
allure.attach.file(
source=source,
name=name,
attachment_type=_attachment_type if _attachment_type is None else _attachment_type.value,
extension=extension
)
def allure_step_no(step: str):
"""
无附件的操作步骤
:param step: 步骤名称
:return:
"""
with allure.step(step):
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
描述:
"""
class MyBaseFailure(Exception):
pass
class JsonpathExtractionFailed(MyBaseFailure):
pass
class NotFoundError(MyBaseFailure):
pass
class FileNotFound(FileNotFoundError, NotFoundError):
pass
class SqlNotFound(NotFoundError):
pass
class AssertTypeError(MyBaseFailure):
pass
class DataAcquisitionFailed(MyBaseFailure):
pass
class ValueTypeError(MyBaseFailure):
pass
class SendMessageError(MyBaseFailure):
pass
class ValueNotFoundError(MyBaseFailure):
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import socket
def get_host_ip():
"""
查询本机ip地址
:return:
"""
_s = None
try:
_s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
_s.connect(('8.8.8.8', 80))
l_host = _s.getsockname()[0]
finally:
_s.close()
return l_host
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
# @Time : 2022/5/10 14:02
# @Author : 余少琪
# @Email : 1603453211@qq.com
# @File : install_requirements
# @describe: 判断程序是否每次会更新依赖库,如有更新,则自动安装
"""
import os
import chardet
from common.setting import ensure_path_sep
from utils.logging_tool.log_control import INFO
from utils import config
os.system("pip3 install chardet")
class InstallRequirements:
""" 自动识别安装最新的依赖库 """
def __init__(self):
self.version_library_comparisons_path = ensure_path_sep("\\utils\\other_tools\\install_tool\\") \
+ "version_library_comparisons.txt"
self.requirements_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) \
+ os.sep + "requirements.txt"
self.mirror_url = config.mirror_source
# 初始化时,获取最新的版本库
# os.system("pip freeze > {0}".format(self.requirements_path))
def read_version_library_comparisons_txt(self):
"""
获取版本比对默认的文件
@return:
"""
with open(self.version_library_comparisons_path, 'r', encoding="utf-8") as file:
return file.read().strip(' ')
@classmethod
def check_charset(cls, file_path):
"""获取文件的字符集"""
with open(file_path, "rb") as file:
data = file.read(4)
charset = chardet.detect(data)['encoding']
return charset
def read_requirements(self):
"""获取安装文件"""
file_data = ""
with open(
self.requirements_path,
'r',
encoding=self.check_charset(self.requirements_path)
) as file:
for line in file:
if "" in line:
line = line.replace("", "")
file_data += line
with open(
self.requirements_path,
"w",
encoding=self.check_charset(self.requirements_path)
) as file:
file.write(file_data)
return file_data
def text_comparison(self):
"""
版本库比对
@return:
"""
read_version_library_comparisons_txt = self.read_version_library_comparisons_txt()
read_requirements = self.read_requirements()
if read_version_library_comparisons_txt == read_requirements:
INFO.logger.info("程序中未检查到更新版本库,已为您跳过自动安装库")
# 程序中如出现不同的文件,则安装
else:
INFO.logger.info("程序中检测到您更新了依赖库,已为您自动安装")
os.system(f"pip3 install -r {self.requirements_path}")
with open(self.version_library_comparisons_path, "w",
encoding=self.check_charset(self.requirements_path)) as file:
file.write(read_requirements)
if __name__ == '__main__':
InstallRequirements().text_comparison()
aiofiles==0.8.0
allure-pytest==2.9.45
allure-python-commons==2.9.45
asgiref==3.5.1
atomicwrites==1.4.0
attrs==21.2.0
blinker==1.4
Brotli==1.0.9
certifi==2021.10.8
cffi==1.15.0
chardet==4.0.0
charset-normalizer==2.0.7
click==8.1.3
colorama==0.4.4
colorlog==6.6.0
cryptography==36.0.0
DingtalkChatbot==1.5.3
et-xmlfile==1.1.0
execnet==1.9.0
Faker==9.8.3
Flask==2.0.3
h11==0.13.0
h2==4.1.0
hpack==4.0.0
httptools==0.4.0
hyperframe==6.0.1
idna==3.3
iniconfig==1.1.0
itchat==1.3.10
itsdangerous==2.1.2
Jinja2==3.1.2
jsonpath==0.82
kaitaistruct==0.9
ldap3==2.9.1
MarkupSafe==2.1.1
mitmproxy==8.0.0
msgpack==1.0.3
multidict==6.0.2
openpyxl==3.0.9
packaging==21.3
passlib==1.7.4
pluggy==1.0.0
protobuf==3.19.4
publicsuffix2==2.20191221
py==1.11.0
pyasn1==0.4.8
pycparser==2.21
pydivert==2.1.0
PyMySQL==1.0.2
pyOpenSSL==21.0.0
pyparsing==3.0.6
pyperclip==1.8.2
pypng==0.0.21
PyQRCode==1.2.1
pytest==6.2.5
pytest-forked==1.3.0
pytest-xdist==2.4.0
python-dateutil==2.8.2
pywin32==304
PyYAML==6.0
requests==2.26.0
requests-toolbelt==0.9.1
ruamel.yaml==0.17.21
ruamel.yaml.clib==0.2.6
sanic==22.3.1
sanic-routing==22.3.0
six==1.16.0
sortedcontainers==2.4.0
text-unidecode==1.3
toml==0.10.2
tornado==6.1
urllib3==1.26.7
urwid==2.1.2
websockets==10.3
Werkzeug==2.1.2
wsproto==1.1.0
xlrd==2.0.1
xlutils==2.0.0
xlwings==0.27.7
xlwt==1.3.0
zstandard==0.17.0
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
"""
def jsonpath_replace(change_data, key_name, data_switch=None):
"""处理jsonpath数据"""
_new_data = key_name + ''
for i in change_data:
if i == '$':
pass
elif data_switch is None and i == "data":
_new_data += '.data'
elif i[0] == '[' and i[-1] == ']':
_new_data += "[" + i[1:-1] + "]"
else:
_new_data += '[' + '"' + i + '"' + "]"
return _new_data
if __name__ == '__main__':
jsonpath_replace(change_data=['$', 'data', 'id'], key_name='self.__yaml_case')
import types
from enum import Enum, unique
from typing import Text, Dict, Callable, Union, Optional, List, Any
from dataclasses import dataclass
from pydantic import BaseModel, Field
class NotificationType(Enum):
""" 自动化通知方式 """
DEFAULT = '0'
DING_TALK = '1'
WECHAT = '2'
EMAIL = '3'
FEI_SHU = '4'
@dataclass
class TestMetrics:
""" 用例执行数据 """
passed: int
failed: int
broken: int
skipped: int
total: int
pass_rate: float
time: Text
class RequestType(Enum):
"""
request请求发送,请求参数的数据类型
"""
JSON = "JSON"
PARAMS = "PARAMS"
DATA = "DATA"
FILE = 'FILE'
EXPORT = "EXPORT"
NONE = "NONE"
class TestCaseEnum(Enum):
URL = ("url", True)
HOST = ("host", True)
METHOD = ("method", True)
DETAIL = ("detail", True)
IS_RUN = ("is_run", True)
HEADERS = ("headers", True)
REQUEST_TYPE = ("requestType", True)
DATA = ("data", True)
DE_CASE = ("dependence_case", True)
DE_CASE_DATA = ("dependence_case_data", False)
CURRENT_RE_SET_CACHE = ("current_request_set_cache", False)
SQL = ("sql", False)
ASSERT_DATA = ("assert", True)
SETUP_SQL = ("setup_sql", False)
TEARDOWN = ("teardown", False)
TEARDOWN_SQL = ("teardown_sql", False)
SLEEP = ("sleep", False)
class Method(Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
PATCH = "PATCH"
DELETE = "DELETE"
HEAD = "HEAD"
OPTION = "OPTION"
def load_module_functions(module) -> Dict[Text, Callable]:
""" 获取 module中方法的名称和所在的内存地址 """
module_functions = {}
for name, item in vars(module).items():
if isinstance(item, types.FunctionType):
module_functions[name] = item
return module_functions
@unique
class DependentType(Enum):
"""
数据依赖相关枚举
"""
RESPONSE = 'response'
REQUEST = 'request'
SQL_DATA = 'sqlData'
CACHE = "cache"
class Assert(BaseModel):
jsonpath: Text
type: Text
value: Any
AssertType: Union[None, Text] = None
class DependentData(BaseModel):
dependent_type: Text
jsonpath: Text
set_cache: Optional[Text]
replace_key: Optional[Text]
class DependentCaseData(BaseModel):
case_id: Text
# dependent_data: List[DependentData]
dependent_data: Union[None, List[DependentData]] = None
class ParamPrepare(BaseModel):
dependent_type: Text
jsonpath: Text
set_cache: Text
class SendRequest(BaseModel):
dependent_type: Text
jsonpath: Optional[Text]
cache_data: Optional[Text]
set_cache: Optional[Text]
replace_key: Optional[Text]
class TearDown(BaseModel):
case_id: Text
param_prepare: Optional[List["ParamPrepare"]]
send_request: Optional[List["SendRequest"]]
class CurrentRequestSetCache(BaseModel):
type: Text
jsonpath: Text
name: Text
class TestCase(BaseModel):
url: Text
method: Text
detail: Text
# assert_data: Union[Dict, Text] = Field(..., alias="assert")
assert_data: Union[Dict, Text]
headers: Union[None, Dict, Text] = {}
requestType: Text
is_run: Union[None, bool, Text] = None
data: Any = None
dependence_case: Union[None, bool] = False
dependence_case_data: Optional[Union[None, List["DependentCaseData"], Text]] = None
sql: List = None
setup_sql: List = None
status_code: Optional[int] = None
teardown_sql: Optional[List] = None
teardown: Union[List["TearDown"], None] = None
current_request_set_cache: Optional[List["CurrentRequestSetCache"]]
sleep: Optional[Union[int, float]]
class ResponseData(BaseModel):
url: Text
is_run: Union[None, bool, Text]
detail: Text
response_data: Text
request_body: Any
method: Text
sql_data: Dict
yaml_data: "TestCase"
headers: Dict
cookie: Dict
assert_data: Dict
res_time: Union[int, float]
status_code: int
teardown: List["TearDown"] = None
teardown_sql: Union[None, List]
body: Any
class DingTalk(BaseModel):
webhook: Union[Text, None]
secret: Union[Text, None]
class MySqlDB(BaseModel):
switch: bool = False
host: Union[Text, None] = None
user: Union[Text, None] = None
password: Union[Text, None] = None
port: Union[int, None] = 3306
class Webhook(BaseModel):
webhook: Union[Text, None]
class Email(BaseModel):
send_user: Union[Text, None]
email_host: Union[Text, None]
stamp_key: Union[Text, None]
# 收件人
send_list: Union[Text, None]
class Config(BaseModel):
project_name: Text
env: Text
tester_name: Text
notification_type: Text = '0'
excel_report: bool
ding_talk: "DingTalk"
mysql_db: "MySqlDB"
mirror_source: Text
wechat: "Webhook"
email: "Email"
lark: "Webhook"
real_time_update_test_cases: bool = False
host: Text
app_host: Union[Text, None]
@unique
class AllureAttachmentType(Enum):
"""
allure 报告的文件类型枚举
"""
TEXT = "txt"
CSV = "csv"
TSV = "tsv"
URI_LIST = "uri"
HTML = "html"
XML = "xml"
JSON = "json"
YAML = "yaml"
PCAP = "pcap"
PNG = "png"
JPG = "jpg"
SVG = "svg"
GIF = "gif"
BMP = "bmp"
TIFF = "tiff"
MP4 = "mp4"
OGG = "ogg"
WEBM = "webm"
PDF = "pdf"
@unique
class AssertMethod(Enum):
"""断言类型"""
equals = "=="
less_than = "lt"
less_than_or_equals = "le"
greater_than = "gt"
greater_than_or_equals = "ge"
not_equals = "not_eq"
string_equals = "str_eq"
length_equals = "len_eq"
length_greater_than = "len_gt"
length_greater_than_or_equals = 'len_ge'
length_less_than = "len_lt"
length_less_than_or_equals = 'len_le'
contains = "contains"
contained_by = 'contained_by'
startswith = 'startswith'
endswith = 'endswith'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import time
import threading
class PyTimer:
"""定时器类"""
def __init__(self, func, *args, **kwargs):
"""构造函数"""
self.func = func
self.args = args
self.kwargs = kwargs
self.running = False
def _run_func(self):
"""运行定时事件函数"""
_thread = threading.Thread(target=self.func, args=self.args, kwargs=self.kwargs)
_thread.setDaemon(True)
_thread.start()
def _start(self, interval, once):
"""启动定时器的线程函数"""
interval = max(interval, 0.01)
if interval < 0.050:
_dt = interval / 10
else:
_dt = 0.005
if once:
deadline = time.time() + interval
while time.time() < deadline:
time.sleep(_dt)
# 定时时间到,调用定时事件函数
self._run_func()
else:
self.running = True
deadline = time.time() + interval
while self.running:
while time.time() < deadline:
time.sleep(_dt)
# 更新下一次定时时间
deadline += interval
# 定时时间到,调用定时事件函数
if self.running:
self._run_func()
def start(self, interval, once=False):
"""启动定时器
interval - 定时间隔,浮点型,以秒为单位,最高精度10毫秒
once - 是否仅启动一次,默认是连续的
"""
thread_ = threading.Thread(target=self._start, args=(interval, once))
thread_.setDaemon(True)
thread_.start()
def stop(self):
"""停止定时器"""
self.running = False
def do_something(name, gender='male'):
"""执行"""
print(time.time(), '定时时间到,执行特定任务')
print('name:%s, gender:%s', name, gender)
time.sleep(5)
print(time.time(), '完成特定任务')
timer = PyTimer(do_something, 'Alice', gender='female')
timer.start(0.5, once=False)
input('按回车键结束\n') # 此处阻塞住进程
timer.stop()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
from typing import Text, Dict
from common.setting import ensure_path_sep
from utils.read_files_tools.testcase_template import write_testcase_file
from utils.read_files_tools.yaml_control import GetYamlData
from utils.read_files_tools.get_all_files_path import get_all_files
from utils.other_tools.exceptions import ValueNotFoundError
class TestCaseAutomaticGeneration:
def __init__(self):
self.yaml_case_data = None
self.file_path = None
@property
def case_date_path(self) -> Text:
"""返回 yaml 用例文件路径"""
return ensure_path_sep("\\data")
@property
def case_path(self) -> Text:
""" 存放用例代码路径"""
return ensure_path_sep("\\test_case")
@property
def allure_epic(self):
_allure_epic = self.yaml_case_data.get("case_common").get("allureEpic")
assert _allure_epic is not None, (
"用例中 allureEpic 为必填项,请检查用例内容, 用例路径:'%s'" % self.file_path
)
return _allure_epic
@property
def allure_feature(self):
_allure_feature = self.yaml_case_data.get("case_common").get("allureFeature")
assert _allure_feature is not None, (
"用例中 allureFeature 为必填项,请检查用例内容, 用例路径:'%s'" % self.file_path
)
return _allure_feature
@property
def allure_story(self):
_allure_story = self.yaml_case_data.get("case_common").get("allureStory")
assert _allure_story is not None, (
"用例中 allureStory 为必填项,请检查用例内容, 用例路径:'%s'" % self.file_path
)
return _allure_story
@property
def file_name(self) -> Text:
"""
通过 yaml文件的命名,将名称转换成 py文件的名称
:return: 示例: DateDemo.py
"""
i = len(self.case_date_path)
yaml_path = self.file_path[i:]
file_name = None
# 路径转换
if '.yaml' in yaml_path:
file_name = yaml_path.replace('.yaml', '.py')
elif '.yml' in yaml_path:
file_name = yaml_path.replace('.yml', '.py')
return file_name
@property
def get_test_class_title(self):
"""
自动生成类名称
:return: sup_apply_list --> SupApplyList
"""
# 提取文件名称
_file_name = os.path.split(self.file_name)[1][:-3]
_name = _file_name.split("_")
_name_len = len(_name)
# 将文件名称格式,转换成类名称: sup_apply_list --> SupApplyList
for i in range(_name_len):
_name[i] = _name[i].capitalize()
_class_name = "".join(_name)
return _class_name
@property
def func_title(self) -> Text:
"""
函数名称
:return:
"""
return os.path.split(self.file_name)[1][:-3]
@property
def spilt_path(self):
path = self.file_name.split(os.sep)
path[-1] = path[-1].replace(path[-1], "test_" + path[-1])
return path
@property
def get_case_path(self):
"""
根据 yaml 中的用例,生成对应 testCase 层代码的路径
:return: D:\\Project\\test_case\\test_case_demo.py
"""
new_name = os.sep.join(self.spilt_path)
return ensure_path_sep("\\test_case" + new_name)
@property
def case_ids(self):
return [k for k in self.yaml_case_data.keys() if k != "case_common"]
@property
def get_file_name(self):
# 这里通过“\\” 符号进行分割,提取出来文件名称
# 判断生成的 testcase 文件名称,需要以test_ 开头
case_name = self.spilt_path[-1].replace(
self.spilt_path[-1], "test_" + self.spilt_path[-1]
)
return case_name
def mk_dir(self) -> None:
""" 判断生成自动化代码的文件夹路径是否存在,如果不存在,则自动创建 """
# _LibDirPath = os.path.split(self.libPagePath(filePath))[0]
_case_dir_path = os.path.split(self.get_case_path)[0]
if not os.path.exists(_case_dir_path):
os.makedirs(_case_dir_path)
def get_case_automatic(self) -> None:
""" 自动生成 测试代码"""
file_path = get_all_files(file_path=ensure_path_sep("\\data"), yaml_data_switch=True)
for file in file_path:
# 判断代理拦截的yaml文件,不生成test_case代码
if 'proxy_data.yaml' not in file:
# 判断用例需要用的文件夹路径是否存在,不存在则创建
self.yaml_case_data = GetYamlData(file).get_yaml_data()
self.file_path = file
self.mk_dir()
write_testcase_file(
allure_epic=self.allure_epic,
allure_feature=self.allure_feature,
class_title=self.get_test_class_title,
func_title=self.func_title,
case_path=self.get_case_path,
case_ids=self.case_ids,
file_name=self.get_file_name,
allure_story=self.allure_story
)
if __name__ == '__main__':
TestCaseAutomaticGeneration().get_case_automatic()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
def del_file(path):
"""删除目录下的文件"""
list_path = os.listdir(path)
for i in list_path:
c_path = os.path.join(path, i)
if os.path.isdir(c_path):
del_file(c_path)
else:
os.remove(c_path)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import json
import xlrd
from xlutils.copy import copy
from common.setting import ensure_path_sep
def get_excel_data(sheet_name: str, case_name: any) -> list:
"""
读取 Excel 中的数据
:param sheet_name: excel 中的 sheet 页的名称
:param case_name: 测试用例名称
:return:
"""
res_list = []
excel_dire = ensure_path_sep("\\data\\TestLogin.xls")
work_book = xlrd.open_workbook(excel_dire, formatting_info=True)
# 打开对应的子表
work_sheet = work_book.sheet_by_name(sheet_name)
# 读取一行
idx = 0
for one in work_sheet.col_values(0):
# 运行需要运行的测试用例
if case_name in one:
req_body_data = work_sheet.cell(idx, 9).value
resp_data = work_sheet.cell(idx, 11).value
res_list.append((req_body_data, json.loads(resp_data)))
idx += 1
return res_list
def set_excel_data(sheet_index: int) -> tuple:
"""
excel 写入
:return:
"""
excel_dire = '../data/TestLogin.xls'
work_book = xlrd.open_workbook(excel_dire, formatting_info=True)
work_book_new = copy(work_book)
work_sheet_new = work_book_new.get_sheet(sheet_index)
return work_book_new, work_sheet_new
if __name__ == '__main__':
get_excel_data("异常用例", '111')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
def get_all_files(file_path, yaml_data_switch=False) -> list:
"""
获取文件路径
:param file_path: 目录路径
:param yaml_data_switch: 是否过滤文件为 yaml格式, True则过滤
:return:
"""
filename = []
# 获取所有文件下的子文件名称
for root, dirs, files in os.walk(file_path):
for _file_path in files:
path = os.path.join(root, _file_path)
if yaml_data_switch:
if 'yaml' in path or '.yml' in path:
filename.append(path)
else:
filename.append(path)
return filename
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
from enum import Enum
from typing import Union, Text, Dict, List
from utils.read_files_tools.yaml_control import GetYamlData
from utils.other_tools.models import TestCase
from utils.other_tools.exceptions import ValueNotFoundError
from utils.cache_process.cache_control import CacheHandler
from utils import config
from utils.other_tools.models import RequestType, Method, TestCaseEnum
import os
class CaseDataCheck:
"""
yaml 数据解析, 判断数据填写是否符合规范
"""
def __init__(self, file_path):
self.file_path = file_path
if os.path.exists(self.file_path) is False:
raise FileNotFoundError("用例地址未找到")
self.case_data = None
self.case_id = None
def _assert(self, attr: Text):
assert attr in self.case_data.keys(), (
f"用例ID为 {self.case_id} 的用例中缺少 {attr} 参数,请确认用例内容是否编写规范."
f"当前用例文件路径:{self.file_path}"
)
def check_params_exit(self):
for enum in list(TestCaseEnum._value2member_map_.keys()):
if enum[1]:
self._assert(enum[0])
def check_params_right(self, enum_name, attr):
_member_names_ = enum_name._member_names_
assert attr.upper() in _member_names_, (
f"用例ID为 {self.case_id} 的用例中 {attr} 填写不正确,"
f"当前框架中只支持 {_member_names_} 类型."
f"如需新增 method 类型,请联系管理员."
f"当前用例文件路径:{self.file_path}"
)
return attr.upper()
@property
def get_method(self) -> Text:
return self.check_params_right(
Method,
self.case_data.get(TestCaseEnum.METHOD.value[0])
)
@property
def get_host(self) -> Text:
host = (
self.case_data.get(TestCaseEnum.HOST.value[0]) +
self.case_data.get(TestCaseEnum.URL.value[0])
)
return host
@property
def get_request_type(self):
return self.check_params_right(
RequestType,
self.case_data.get(TestCaseEnum.REQUEST_TYPE.value[0])
)
@property
def get_dependence_case_data(self):
_dep_data = self.case_data.get(TestCaseEnum.DE_CASE.value[0])
if _dep_data:
assert self.case_data.get(TestCaseEnum.DE_CASE_DATA.value[0]) is not None, (
f"程序中检测到您的 case_id 为 {self.case_id} 的用例存在依赖,但是 {_dep_data} 缺少依赖数据."
f"如已填写,请检查缩进是否正确, 用例路径: {self.file_path}"
)
return self.case_data.get(TestCaseEnum.DE_CASE_DATA.value[0])
@property
def get_assert(self):
_assert_data = self.case_data.get(TestCaseEnum.ASSERT_DATA.value[0])
assert _assert_data is not None, (
f"用例ID 为 {self.case_id} 未添加断言,用例路径: {self.file_path}"
)
return _assert_data
@property
def get_sql(self):
_sql = self.case_data.get(TestCaseEnum.SQL.value[0])
# 判断数据库开关为开启状态,并且sql不为空
if config.mysql_db.switch and _sql is None:
return None
return _sql
class CaseData(CaseDataCheck):
def case_process(self, case_id_switch: Union[None, bool] = None):
data = GetYamlData(self.file_path).get_yaml_data()
case_list = []
for key, values in data.items():
# 公共配置中的数据,与用例数据不同,需要单独处理
if key != 'case_common':
self.case_data = values
self.case_id = key
super().check_params_exit()
case_date = {
'method': self.get_method,
'is_run': self.case_data.get(TestCaseEnum.IS_RUN.value[0]),
'url': self.get_host,
'detail': self.case_data.get(TestCaseEnum.DETAIL.value[0]),
'headers': self.case_data.get(TestCaseEnum.HEADERS.value[0]),
'requestType': super().get_request_type,
'data': self.case_data.get(TestCaseEnum.DATA.value[0]),
'dependence_case': self.case_data.get(TestCaseEnum.DE_CASE.value[0]),
'dependence_case_data': self.get_dependence_case_data,
"current_request_set_cache": self.case_data.get(TestCaseEnum.CURRENT_RE_SET_CACHE.value[0]),
"sql": self.get_sql,
"assert_data": self.get_assert,
"setup_sql": self.case_data.get(TestCaseEnum.SETUP_SQL.value[0]),
"teardown": self.case_data.get(TestCaseEnum.TEARDOWN.value[0]),
"teardown_sql": self.case_data.get(TestCaseEnum.TEARDOWN_SQL.value[0]),
"sleep": self.case_data.get(TestCaseEnum.SLEEP.value[0]),
}
if case_id_switch is True:
case_list.append({key: TestCase(**case_date).dict()})
else:
case_list.append(TestCase(**case_date).dict())
return case_list
class GetTestCase:
@staticmethod
def case_data(case_id_lists: List):
case_lists = []
for i in case_id_lists:
_data = CacheHandler.get_cache(i)
case_lists.append(_data)
return case_lists
"""
Desc : 自定义函数调用
"""
import re
import datetime
import random
from datetime import date, timedelta, datetime
from jsonpath import jsonpath
from faker import Faker
from utils.logging_tool.log_control import ERROR
class Context:
""" 正则替换 """
def __init__(self):
self.faker = Faker(locale='zh_CN')
@classmethod
def random_int(cls) -> int:
"""
:return: 随机数
"""
_data = random.randint(0, 5000)
return _data
def get_phone(self) -> int:
"""
:return: 随机生成手机号码
"""
phone = self.faker.phone_number()
return phone
def get_id_number(self) -> int:
"""
:return: 随机生成身份证号码
"""
id_number = self.faker.ssn()
return id_number
def get_female_name(self) -> str:
"""
:return: 女生姓名
"""
female_name = self.faker.name_female()
return female_name
def get_male_name(self) -> str:
"""
:return: 男生姓名
"""
male_name = self.faker.name_male()
return male_name
def get_email(self) -> str:
"""
:return: 生成邮箱
"""
email = self.faker.email()
return email
@classmethod
def self_operated_id(cls):
"""自营店铺 ID """
operated_id = 212
return operated_id
@classmethod
def get_time(cls) -> str:
"""
计算当前时间
:return:
"""
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return now_time
@classmethod
def today_date(cls):
"""获取今日0点整时间"""
_today = date.today().strftime("%Y-%m-%d") + " 00:00:00"
return str(_today)
@classmethod
def time_after_week(cls):
"""获取一周后12点整的时间"""
_time_after_week = (date.today() + timedelta(days=+6)).strftime("%Y-%m-%d") + " 00:00:00"
return _time_after_week
@classmethod
def host(cls) -> str:
from utils import config
""" 获取接口域名 """
return config.host
@classmethod
def app_host(cls) -> str:
from utils import config
"""获取app的host"""
return config.app_host
def sql_json(js_path, res):
""" 提取 sql中的 json 数据 """
_json_data = jsonpath(res, js_path)[0]
if _json_data is False:
raise ValueError(f"sql中的jsonpath获取失败 {res}, {js_path}")
return jsonpath(res, js_path)[0]
def sql_regular(value, res=None):
"""
这里处理sql中的依赖数据,通过获取接口响应的jsonpath的值进行替换
:param res: jsonpath使用的返回结果
:param value:
:return:
"""
sql_json_list = re.findall(r"\$json\((.*?)\)\$", value)
for i in sql_json_list:
pattern = re.compile(r'\$json\(' + i.replace('$', "\$").replace('[', '\[') + r'\)\$')
key = str(sql_json(i, res))
value = re.sub(pattern, key, value, count=1)
return value
def cache_regular(value):
from utils.cache_process.cache_control import CacheHandler
"""
通过正则的方式,读取缓存中的内容
例:$cache{login_init}
:param value:
:return:
"""
# 正则获取 $cache{login_init}中的值 --> login_init
regular_dates = re.findall(r"\$cache\{(.*?)\}", value)
# 拿到的是一个list,循环数据
for regular_data in regular_dates:
value_types = ['int:', 'bool:', 'list:', 'dict:', 'tuple:', 'float:']
if any(i in regular_data for i in value_types) is True:
value_types = regular_data.split(":")[0]
regular_data = regular_data.split(":")[1]
# pattern = re.compile(r'\'\$cache{' + value_types.split(":")[0] + r'(.*?)}\'')
pattern = re.compile(r'\'\$cache\{' + value_types.split(":")[0] + ":" + regular_data + r'\}\'')
else:
pattern = re.compile(
r'\$cache\{' + regular_data.replace('$', "\$").replace('[', '\[') + r'\}'
)
try:
# cache_data = Cache(regular_data).get_cache()
cache_data = CacheHandler.get_cache(regular_data)
# 使用sub方法,替换已经拿到的内容
value = re.sub(pattern, str(cache_data), value)
except Exception:
pass
return value
def regular(target):
"""
新版本
使用正则替换请求数据
:return:
"""
try:
regular_pattern = r'\${{(.*?)}}'
while re.findall(regular_pattern, target):
key = re.search(regular_pattern, target).group(1)
value_types = ['int:', 'bool:', 'list:', 'dict:', 'tuple:', 'float:']
if any(i in key for i in value_types) is True:
func_name = key.split(":")[1].split("(")[0]
value_name = key.split(":")[1].split("(")[1][:-1]
if value_name == "":
value_data = getattr(Context(), func_name)()
else:
value_data = getattr(Context(), func_name)(*value_name.split(","))
regular_int_pattern = r'\'\${{(.*?)}}\''
target = re.sub(regular_int_pattern, str(value_data), target, 1)
else:
func_name = key.split("(")[0]
value_name = key.split("(")[1][:-1]
if value_name == "":
value_data = getattr(Context(), func_name)()
else:
value_data = getattr(Context(), func_name)(*value_name.split(","))
target = re.sub(regular_pattern, str(value_data), target, 1)
return target
except AttributeError:
ERROR.logger.error("未找到对应的替换的数据, 请检查数据是否正确 %s", target)
raise
except IndexError:
ERROR.logger.error("yaml中的 ${{}} 函数方法不正确,正确语法实例:${{get_time()}}")
raise
if __name__ == '__main__':
a = "${{host()}} aaa"
b = regular(a)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import json
from jsonpath import jsonpath
from common.setting import ensure_path_sep
from typing import Dict
from ruamel import yaml
import os
class SwaggerForYaml:
def __init__(self):
self._data = self.get_swagger_json()
@classmethod
def get_swagger_json(cls):
"""
获取 swagger 中的 json 数据
:return:
"""
try:
with open('./file/test_OpenAPI.json', "r", encoding='utf-8') as f:
row_data = json.load(f)
return row_data
except FileNotFoundError:
raise FileNotFoundError("文件路径不存在,请重新输入")
def get_allure_epic(self):
""" 获取 yaml 用例中的 allure_epic """
_allure_epic = self._data['info']['title']
return _allure_epic
@classmethod
def get_allure_feature(cls, value):
""" 获取 yaml 用例中的 allure_feature """
_allure_feature = value['tags']
return str(_allure_feature)
@classmethod
def get_allure_story(cls, value):
""" 获取 yaml 用例中的 allure_story """
_allure_story = value['summary']
return _allure_story
@classmethod
def get_case_id(cls, value):
""" 获取 case_id """
_case_id = value.replace("/", "_")
return "01" + _case_id
@classmethod
def get_detail(cls, value):
_get_detail = value['summary']
return "测试" + _get_detail
@classmethod
def get_request_type(cls, value, headers):
""" 处理 request_type """
if jsonpath(obj=value, expr="$.parameters") is not False:
_parameters = value['parameters']
if _parameters[0]['in'] == 'query':
return "params"
else:
if 'application/x-www-form-urlencoded' or 'multipart/form-data' in headers:
return "data"
elif 'application/json' in headers:
return "json"
elif 'application/octet-stream' in headers:
return "file"
else:
return "data"
@classmethod
def get_case_data(cls, value):
""" 处理 data 数据 """
_dict = {}
if jsonpath(obj=value, expr="$.parameters") is not False:
_parameters = value['parameters']
for i in _parameters:
if i['in'] == 'header':
...
else:
_dict[i['name']] = None
else:
return None
return _dict
@classmethod
def yaml_cases(cls, data: Dict, file_path: str) -> None:
"""
写入 yaml 数据
:param file_path:
:param data: 测试用例数据
:return:
"""
_file_path = ensure_path_sep("\\data\\" + file_path[1:].replace("/", os.sep) + '.yaml')
_file = _file_path.split(os.sep)[:-1]
_dir_path = ''
for i in _file:
_dir_path += i + os.sep
try:
os.makedirs(_dir_path)
except FileExistsError:
...
with open(_file_path, "a", encoding="utf-8") as file:
yaml.dump(data, file, Dumper=yaml.RoundTripDumper, allow_unicode=True)
file.write('\n')
@classmethod
def get_headers(cls, value):
""" 获取请求头 """
_headers = {}
if jsonpath(obj=value, expr="$.consumes") is not False:
_headers = {"Content-Type": value['consumes'][0]}
if jsonpath(obj=value, expr="$.parameters") is not False:
for i in value['parameters']:
if i['in'] == 'header':
_headers[i['name']] = None
else:
_headers = None
return _headers
def write_yaml_handler(self):
_api_data = self._data['paths']
for key, value in _api_data.items():
for k, v in value.items():
yaml_data = {
"case_common": {"allureEpic": self.get_allure_epic(), "allureFeature": self.get_allure_feature(v),
"allureStory": self.get_allure_story(v)},
self.get_case_id(key): {
"host": "${{host()}}", "url": key, "method": k, "detail": self.get_detail(v),
"headers": self.get_headers(v), "requestType": self.get_request_type(v, self.get_headers(v)),
"is_run": None, "data": self.get_case_data(v), "dependence_case": False,
"assert": {"status_code": 200}, "sql": None}}
self.yaml_cases(yaml_data, file_path=key)
if __name__ == '__main__':
SwaggerForYaml().write_yaml_handler()
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
# @describe: 用例模板
"""
import datetime
import os
from utils.read_files_tools.yaml_control import GetYamlData
from common.setting import ensure_path_sep
from utils.other_tools.exceptions import ValueNotFoundError
def write_case(case_path, page):
""" 写入用例数据 """
with open(case_path, 'w', encoding="utf-8") as file:
file.write(page)
def write_testcase_file(*, allure_epic, allure_feature, class_title,
func_title, case_path, case_ids, file_name, allure_story):
"""
:param allure_story:
:param file_name: 文件名称
:param allure_epic: 项目名称
:param allure_feature: 模块名称
:param class_title: 类名称
:param func_title: 函数名称
:param case_path: case 路径
:param case_ids: 用例ID
:return:
"""
conf_data = GetYamlData(ensure_path_sep("\\common\\config.yaml")).get_yaml_data()
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
real_time_update_test_cases = conf_data['real_time_update_test_cases']
page = f'''#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : {now}
import allure
import pytest
from utils.read_files_tools.get_yaml_data_analysis import GetTestCase
from utils.assertion.assert_control import Assert
from utils.requests_tool.request_control import RequestControl
from utils.read_files_tools.regular_control import regular
from utils.requests_tool.teardown_control import TearDownHandler
case_id = {case_ids}
TestData = GetTestCase.case_data(case_id)
re_data = regular(str(TestData))
@allure.epic("{allure_epic}")
@allure.feature("{allure_feature}")
class Test{class_title}:
@allure.story("{allure_story}")
@pytest.mark.parametrize('in_data', eval(re_data), ids=[i['detail'] for i in TestData])
def test_{func_title}(self, in_data, case_skip):
"""
:param :
:return:
"""
res = RequestControl(in_data).http_request()
TearDownHandler(res).teardown_handle()
Assert(assert_data=in_data['assert_data'],
sql_data=res.sql_data,
request_data=res.body,
response_data=res.response_data,
status_code=res.status_code).assert_type_handle()
if __name__ == '__main__':
pytest.main(['{file_name}', '-s', '-W', 'ignore:Module already imported:pytest.PytestWarning'])
'''
if real_time_update_test_cases:
write_case(case_path=case_path, page=page)
elif real_time_update_test_cases is False:
if not os.path.exists(case_path):
write_case(case_path=case_path, page=page)
else:
raise ValueNotFoundError("real_time_update_test_cases 配置不正确,只能配置 True 或者 False")
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
import ast
import yaml.scanner
from utils.read_files_tools.regular_control import regular
class GetYamlData:
""" 获取 yaml 文件中的数据 """
def __init__(self, file_dir):
self.file_dir = str(file_dir)
def get_yaml_data(self) -> dict:
"""
获取 yaml 中的数据
:param: fileDir:
:return:
"""
# 判断文件是否存在
if os.path.exists(self.file_dir):
data = open(self.file_dir, 'r', encoding='utf-8')
res = yaml.load(data, Loader=yaml.FullLoader)
else:
raise FileNotFoundError("文件路径不存在")
return res
def write_yaml_data(self, key: str, value) -> int:
"""
更改 yaml 文件中的值, 并且保留注释内容
:param key: 字典的key
:param value: 写入的值
:return:
"""
with open(self.file_dir, 'r', encoding='utf-8') as file:
# 创建了一个空列表,里面没有元素
lines = []
for line in file.readlines():
if line != '\n':
lines.append(line)
file.close()
with open(self.file_dir, 'w', encoding='utf-8') as file:
flag = 0
for line in lines:
left_str = line.split(":")[0]
if key == left_str.lstrip() and '#' not in line:
newline = f"{left_str}: {value}"
line = newline
file.write(f'{line}\n')
flag = 1
else:
file.write(f'{line}')
file.close()
return flag
class GetCaseData(GetYamlData):
""" 获取测试用例中的数据 """
def get_different_formats_yaml_data(self) -> list:
"""
获取兼容不同格式的yaml数据
:return:
"""
res_list = []
for i in self.get_yaml_data():
res_list.append(i)
return res_list
def get_yaml_case_data(self):
"""
获取测试用例数据, 转换成指定数据格式
:return:
"""
_yaml_data = self.get_yaml_data()
# 正则处理yaml文件中的数据
re_data = regular(str(_yaml_data))
return ast.literal_eval(re_data)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment