一、框架菜单
1.1 common模块
1.2 其他
二、Excel接口测试案例编写
三、读取Excel测试封装(核心封装)
excel_utils.py 读取Excel中的数据
import os import xlrd #内置模块、第三方模块pip install 自定义模块 class ExcelUtils(): def __init__(self,file_path,sheet_name): self.file_path = file_path self.sheet_name = sheet_name self.sheet = self.get_sheet() # 整个表格对象 def get_sheet(self): wb = xlrd.open_workbook(self.file_path) sheet = wb.sheet_by_name(self.sheet_name) return sheet def get_row_count(self): row_count = self.sheet.nrows return row_count def get_col_count(self): col_count = self.sheet.ncols return col_count def __get_cell_value(self,row_index, col_index): cell_value = self.sheet.cell_value(row_index,col_index) return cell_value def get_merged_info(self): merged_info = self.sheet.merged_cells return merged_info def get_merged_cell_value(self,row_index, col_index): """既能获取普通单元格的数据又能获取合并单元格数据""" cell_value = None for (rlow, rhigh, clow, chigh) in self.get_merged_info(): if (row_index >= rlow and row_index < rhigh): if (col_index >= clow and col_index < chigh): cell_value = self.__get_cell_value(rlow, clow) break; # 防止循环去进行判断出现值覆盖的情况 else: cell_value = self.__get_cell_value(row_index, col_index) else: cell_value = self.__get_cell_value(row_index, col_index) return cell_value def get_sheet_data_by_dict(self): all_data_list = [] first_row = self.sheet.row(0) #获取首行数据 for row in range(1, self.get_row_count()): row_dict = {} for col in range(0, self.get_col_count()): row_dict[first_row[col].value] = self.get_merged_cell_value(row, col) all_data_list.append(row_dict) return all_data_list if __name__=='__main__': current_path = os.path.dirname(__file__) excel_path = os.path.join( current_path,'..','samples/data/test_case.xlsx' ) excelUtils = ExcelUtils(excel_path,"Sheet1") for row in excelUtils.get_sheet_data_by_dict(): print( row )
import os from common.excel_utils import ExcelUtils from common.config_utils import config current_path = os.path.dirname(__file__) test_data_path = os.path.join( current_path,'..', config.CASE_DATA_PATH ) class TestdataUtils(): def __init__(self,test_data_path = test_data_path): self.test_data_path = test_data_path self.test_data = ExcelUtils(test_data_path,"Sheet1").get_sheet_data_by_dict() self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info() def __get_testcase_data_dict(self): testcase_dict = {} for row_data in self.test_data: testcase_dict.setdefault( row_data['测试用例编号'],[] ).append( row_data ) return testcase_dict def def_testcase_data_list(self): testcase_list = [] for k,v in self.__get_testcase_data_dict().items(): one_case_dict = {} one_case_dict["case_id"] = k one_case_dict["case_info"] = v testcase_list.append( one_case_dict ) return testcase_list if __name__=="__main__": testdataUtils = TestdataUtils() for i in testdataUtils.def_testcase_data_list(): print( i )
testdata_utils.py 读取Excel中的数据后处理成需要的数据
四、request封装(核心封装)
requests_utils.py 包含post请求,get请求,异常,调用断言
import ast import re import requests import jsonpath from requests.exceptions import RequestException from requests.exceptions import ProxyError from requests.exceptions import ConnectionError from common.config_utils import config from common.check_utils import CheckUtils class RequestsUtils(): def __init__(self): self.hosts = config.hosts self.headers = {"ContentType":"application/json;charset=utf-8"} self.session = requests.session() self.temp_variables = {} def __get(self,get_info): try: url = self.hosts + get_info["请求地址"] response = self.session.get( url = url, params = ast.literal_eval(get_info["请求参数(get)"]) ) response.encoding = response.apparent_encoding if get_info["取值方式"] == "json取值": value = jsonpath.jsonpath( response.json(),get_info["取值代码"] )[0] self.temp_variables[ get_info["传值变量"] ] = value elif get_info["取值方式"] == "正则取值": value = re.findall(get_info["取值代码"],response.text)[0] self.temp_variables[get_info["传值变量"]] = value result = CheckUtils(response).run_check(get_info['期望结果类型'], get_info['期望结果']) except ProxyError as e: result = {'code': 4, 'result': '[%s]请求:代理错误异常' % (get_info["接口名称"])} except ConnectionError as e: result = {'code': 4, 'result': '[%s]请求:连接超时异常' % (get_info["接口名称"])} except RequestException as e: result = {'code': 4, 'result': '[%s]请求:Request异常,原因:%s' % (get_info["接口名称"], e.__str__())} except Exception as e: result = {'code':4,'result':'[%s]请求:系统异常,原因:%s'%(get_info["接口名称"],e.__str__())} return result def __post(self,post_info): try: url = self.hosts + post_info["请求地址"] response = self.session.post( url = url, headers = self.headers, params = ast.literal_eval(post_info["请求参数(get)"]), # data = post_infos["提交数据(post)"], json=ast.literal_eval(post_info["提交数据(post)"]) ) response.encoding = response.apparent_encoding if post_info["取值方式"] == "json取值": value = jsonpath.jsonpath( response.json(),post_info["取值代码"] )[0] self.temp_variables[ post_info["传值变量"] ] = value elif post_info["取值方式"] == "正则取值": value = re.findall(post_info["取值代码"],response.text)[0] self.temp_variables[post_info["传值变量"]] = value #调用CheckUtils() result = CheckUtils(response).run_check(post_info['期望结果类型'],post_info['期望结果']) except ProxyError as e: result = {'code': 4, 'result': '[%s]请求:代理错误异常' % (post_info["接口名称"])} except ConnectionError as e: result = {'code': 4, 'result': '[%s]请求:连接超时异常' % (post_info["接口名称"])} except RequestException as e: result = {'code': 4, 'result': '[%s]请求:Request异常,原因:%s' % (post_info["接口名称"], e.__str__())} except Exception as e: result = {'code':4,'result':'[%s]请求:系统异常,原因:%s'%(post_info["接口名称"],e.__str__())} return result def request(self,step_info): try: request_type = step_info["请求方式"] param_variable_list = re.findall('\\${\w+}', step_info["请求参数(get)"]) if param_variable_list: for param_variable in param_variable_list: step_info["请求参数(get)"] = step_info["请求参数(get)"] .replace(param_variable,'"%s"' % self.temp_variables.get(param_variable[2:-1])) if request_type == "get": result = self.__get( step_info ) elif request_type == "post": data_variable_list = re.findall('\\${\w+}', step_info["提交数据(post)"]) if data_variable_list: for param_variable in data_variable_list: step_info["提交数据(post)"] = step_info["提交数据(post)"] .replace(param_variable, '"%s"' % self.temp_variables.get(param_variable[2:-1])) result = self.__post( step_info ) else: result = {'code':1,'result':'请求方式不支持'} except Exception as e: result = {'code':4,'result':'用例编号[%s]的[%s]步骤出现系统异常,原因:%s'%(step_info['测试用例编号'],step_info["测试用例步骤"],e.__str__())} return result def request_by_step(self,step_infos): self.temp_variables = {} for step_info in step_infos: temp_result = self.request( step_info ) # print( temp_result ) if temp_result['code']!=0: break return temp_result if __name__=="__main__": case_info = [ {'请求方式': 'get', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}', '提交数据(post)': '', '取值方式': 'json取值', '传值变量': 'token', '取值代码': '$.access_token', '期望结果类型': '正则匹配', '期望结果': '{"access_token":"(.+","expires_in":(.+"access_token":${token}}', '提交数据(post)': '{"tag" : {"name" : "衡东"}}','取值方式': '无', '传值变量': '', '取值代码': '', '期望结果类型': '正则匹配', '期望结果': '{"tag":{"id":(.+"name":"衡东"}}'} ] RequestsUtils().request_by_step(case_info)
五、断言封装(核心封装)
check_utils.py 断言封装,与实际结果核对
import re import ast class CheckUtils(): def __init__(self,check_response=None): self.ck_response=check_response self.ck_rules = { '无': self.no_check, 'json键是否存在': self.check_key, 'json键值对': self.check_keyvalue, '正则匹配': self.check_regexp } self.pass_result = { 'code': 0, 'response_reason': self.ck_response.reason, 'response_code': self.ck_response.status_code, 'response_headers': self.ck_response.headers, 'response_body': self.ck_response.text, 'check_result': True, 'message': '' # 扩招作为日志输出等 } self.fail_result = { 'code': 2, 'response_reason': self.ck_response.reason, 'response_code': self.ck_response.status_code, 'response_headers': self.ck_response.headers, 'response_body': self.ck_response.text, 'check_result': False, 'message': '' # 扩招作为日志输出等 } def no_check(self): return self.pass_result def check_key(self,check_data=None): check_data_list = check_data.split(',') #把需要判断的值做切割,取出键值 res_list = [] #存放每次比较的结果 wrong_key = [] #存放比较失败key for check_data in check_data_list: #把切割的键值和取出响应结果中的所有的键一个一个对比 if check_data in self.ck_response.json().keys(): res_list.append(self.pass_result ) else: res_list.append( self.fail_result ) wrong_key.append(check_data) #把失败的键放进来,便于后续日志输出 # print(res_list) # print(wrong_key) if self.fail_result in res_list: return self.fail_result else: return self.pass_result def check_keyvalue(self,check_data=None): res_list = [] # 存放每次比较的结果 wrong_items = [] # 存放比较失败 items for check_item in ast.literal_eval(check_data).items(): #literal_eval()安全性的把字符串转成字典,items()取出键值对 if check_item in self.ck_response.json().items(): res_list.append( self.pass_result ) else: res_list.append( self.fail_result ) wrong_items.append(check_item) # print( res_list ) # print( wrong_items ) if self.fail_result in res_list: return self.fail_result else: return self.pass_result def check_regexp(self,check_data=None): pattern = re.compile(check_data) if re.findall(pattern=pattern,string=self.ck_response.text): #匹配到了,不为空,为true return self.pass_result else: return self.fail_result def run_check(self,check_type=None,check_data=None): code = self.ck_response.status_code if code == 200: if check_type in self.ck_rules.keys(): result=self.ck_rules[check_type](check_data) return result else: self.fail_result['message'] = '不支持%s判断方法'%check_type return self.fail_result else: self.fail_result['message'] = '请求的响应状态码非%s'%str(code) return self.fail_result if __name__=="__main__": # 检查键是否存在,{"access_token":"hello","expires_":7200} 设为响应结果,"access_token,expires_in" 为检查对象值 CheckUtils({"access_token":"hello","expires_":7200}).check_key("access_token,expires_in") #检查键值对是否存在 CheckUtils({"access_token":"hello","expires_i":7200}).check_keyvalue('{"expires_in": 7200}') #正则对比 #TURE print(CheckUtils('{"access_token":"hello","expires_in":7200}').check_regexp('"expires_in":(.+"access_token":"hello","expires":7200}').check_regexp('"expires_in":(.+"htmlcode">import warnings import unittest import paramunittest from common.testdata_utils import TestdataUtils from common.requests_utils import RequestsUtils #如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql() exccel数据源:def_testcase_data_list() case_infos = TestdataUtils().def_testcase_data_list_by_mysql() @paramunittest.parametrized( *case_infos ) class APITest(paramunittest.ParametrizedTestCase): def setUp(self) -> None: warnings.simplefilter('ignore', ResourceWarning) #不会弹出警告提示 def setParameters(self, case_id, case_info): self.case_id = case_id self.case_info = case_info def test_api_common_function(self): '''测试描述''' self._testMethodName = self.case_info[0].get("测试用例编号") self._testMethodDoc = self.case_info[0].get("测试用例名称") actual_result = RequestsUtils().request_by_step(self.case_info) self.assertTrue( actual_result.get('check_result'),actual_result.get('message') ) if __name__ == '__main__': unittest.main()七、common下的log_utils.py 封装
import os import logging import time from common.config_utils import config current_path = os.path.dirname(__file__) log_output_path = os.path.join( current_path,'..', config.LOG_PATH ) class LogUtils(): def __init__(self,log_path=log_output_path): self.log_name = os.path.join( log_output_path ,'ApiTest_%s.log'%time.strftime('%Y_%m_%d') ) self.logger = logging.getLogger("ApiTestLog") self.logger.setLevel( config.LOG_LEVEL ) console_handler = logging.StreamHandler() # 控制台输出 file_handler = logging.FileHandler(self.log_name,'a',encoding='utf-8') # 文件输出 formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) self.logger.addHandler( console_handler ) self.logger.addHandler( file_handler ) console_handler.close() # 防止打印日志重复 file_handler.close() # 防止打印日志重复 def get_logger(self): return self.logger logger = LogUtils().get_logger() # 防止打印日志重复 if __name__ == '__main__': logger.info('hello')八、common下的config_utils.py的封装
配置文件的编写:
对配置文件的读取封装:
import os import configparser current_path = os.path.dirname(__file__) cfgpath = os.path.join(current_path, "../conf/local_config.ini") print(cfgpath) class ConfigUtils: def __init__(self,config_path=cfgpath): self.__conf=configparser.ConfigParser() self.__conf.read(config_path, encoding="utf-8") def read_ini(self,sec,option): value=self.__conf.get(sec,option) return value @property def hosts(self): value=self.read_ini('default','hosts') return value @property def LOG_PATH(self): value = self.read_ini('path', 'LOG_PATH') return value @property def CASE_DATA_PATH(self): value = self.read_ini('path', 'CASE_DATA_PATH') return value @property def REPORT_PATH(self): value = self.read_ini('path', 'REPORT_PATH') return value @property def LOG_LEVEL(self): value = int(self.read_ini('log', 'LOG_LEVEL')) return value @property def smtp_server(self): smtp_server_value = self.read_ini('email', 'smtp_server') return smtp_server_value @property def smtp_sender(self): smtp_sender_value = self.read_ini('email', 'smtp_sender') return smtp_sender_value @property def smtp_password(self): smtp_password_value = self.read_ini('email', 'smtp_password') return smtp_password_value @property def smtp_receiver(self): smtp_receiver_value = self.read_ini('email', 'smtp_receiver') return smtp_receiver_value @property def smtp_cc(self): smtp_cc_value = self.read_ini('email', 'smtp_cc') return smtp_cc_value @property def smtp_subject(self): smtp_subject_value = self.read_ini('email', 'smtp_subject') return smtp_subject_value config=ConfigUtils() if __name__=='__main__': current_path = os.path.dirname(__file__) cfgpath = os.path.join(current_path, "../conf/local_config.ini") config_u=ConfigUtils() print(config_u.hosts) print(config_u.LOG_LEVEL)九、test_runner下的run_case.py 封装
class RunCase(): def __init__(self): self.test_case_path = test_case_path self.report_path = test_report_path self.title = 'P1P2接口自动化测试报告' self.description = '自动化接口测试框架' self.tester = '测试开发组' def load_test_suite(self): discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path, pattern='api_test.py', top_level_dir=self.test_case_path) all_suite = unittest.TestSuite() all_suite.addTest( discover ) return all_suite def run(self): report_dir = HTMLTestReportCN.ReportDirectory(self.report_path) report_dir.create_dir(self.title) report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path') fp = open( report_file_path ,'wb' ) runner = HTMLTestReportCN.HTMLTestRunner(stream=fp, title=self.title, description=self.description, tester=self.tester) runner.run( self.load_test_suite() ) fp.close() return report_file_path if __name__=='__main__': report_path = RunCase().run() EmailUtils(open(report_path, 'rb').read(), report_path).send_mail()十、common下的email_utils.py 封装
import os import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from common.config_utils import config class EmailUtils(): def __init__(self,smtp_body,smtp_attch_path=None): self.smtp_server = config.smtp_server self.smtp_sender = config.smtp_sender self.smtp_password = config.smtp_password self.smtp_receiver = config.smtp_receiver self.smtp_cc = config.smtp_cc self.smtp_subject = config.smtp_subject self.smtp_body = smtp_body self.smtp_attch = smtp_attch_path def mail_message_body(self): message = MIMEMultipart() message['from'] = self.smtp_sender message['to'] = self.smtp_receiver message['Cc'] = self.smtp_cc message['subject'] = self.smtp_subject message.attach( MIMEText(self.smtp_body,'html','utf-8') ) if self.smtp_attch: attach_file = MIMEText(open(self.smtp_attch, 'rb').read(), 'base64', 'utf-8') attach_file['Content-Type'] = 'application/octet-stream' attach_file.add_header('Content-Disposition', 'attachment', filename=('gbk', '', os.path.basename(self.smtp_attch))) message.attach(attach_file) return message def send_mail(self): smtp = smtplib.SMTP() smtp.connect(self.smtp_server) smtp.login(user=self.smtp_sender, password=self.smtp_password) smtp.sendmail(self.smtp_sender,self.smtp_receiver.split(",")+ self.smtp_cc.split(","), self.mail_message_body().as_string()) if __name__=='__main__': html_path = os.path.dirname(__file__) + '/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html' EmailUtils('<h3 align="center">自动化测试报告</h3>',html_path).send_mail()以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新动态
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]