?
?
在 上一節(jié)? (015、【前程貸—簡化版,實戰(zhàn) 08】 用pymysql庫,封裝 handler_mysql? 操作數據庫 )? 的基礎上,增加 驗證手機號是否被注冊過?如果注冊過再繼續(xù)生成隨機手機號碼,直到得到的手機號碼是未被注冊的。
?
如何實現成功注冊呢 :
步驟1、用faker庫生成一個隨機手機號碼 ;
步驟2、用這個手機號碼去數據庫比對,是否注冊過;如果注冊過再生成一個隨機手機號碼 ;
步驟3、如果未被注冊過,用生成的隨機手機號碼 (如:13123456789) 替換,excel表格中的標記字符 #phone# ;
步驟4、替換成功后,得到一個臨時字典,用這個臨時字典做為數據發(fā)送requests請求 ;
?
1、項目層級結構如下 :
?
2、excel表格設計如下,注意 #phone# :
?
?
3、各代碼如下,標記紅色部分為主要修改點 :
a、config.ini? 代碼如下:
# 定義日志相關的配置 # name表示自定義日志搜集器的名字; # level表示日志級別; # file_name 表示生成的日志名字 # show_stream_handler表示是否在控制臺輸出日志。False表示不在控制臺顯示。 # when,表示按H(小時)、D(天)、M(分鐘)、S(秒)生成日志 ;注意字母是大寫 ; [log] name=future_logging_collector level=INFO file_name=future_loan_test.log show_stream_handler=False when=H
b、mysql_ini.py 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 13:20 # Project: Future_Loan_day15 # Module: mysql_ini.py # 此配置文件用來配置鏈接mysql數據庫 # key已經和代碼關聯了,不能更改key名; MYSQL_INI = { "host": "api.lemonban.com", "port": 3306, "user": "future", "password": "123456", "database": "futureloan", "charset": "utf8" }
c、test_1_register.py? 代碼如下(有代碼修改):
import os import json import jsonpath import pytest from tools.handle_phone import HandlePhone from tools.handler_request import HandlerRequests from tools.handler_assert_list import HandlerAssertList from tools.handler_excel import HandlerExcel from tools.handler_logging import handler_logger # 一、準備 數據 cases # A、用os.path找到 xlsx 文件 file_dir = os.path.dirname(os.path.realpath('__file__')) base_dir = os.path.dirname(file_dir) print(f'根目錄:{base_dir}') file_name = os.path.join(base_dir, 'test_datas', 'test_register_cases.xlsx') print(f'excel表格的路徑:{file_name}') # B、調用common——>excel_handler.py 中已封裝好的ExcelHandler類來操作excel表格 # # 1、打開xlsx表格;2、根據表單名字獲取表格數據;3、讀取數據; excel_handler = HandlerExcel(file_name) excel_handler.select_sheet_by_name('register_case') all_cases = excel_handler.read_all_rows_data() # print(all_cases) # handler_logger.info(f'=====all_cases=======:{all_cases}') mrq = HandlerRequests() add_assert = HandlerAssertList() hp = HandlePhone() class TestRegister: # 三、參數化測試用例 # pytest.mark.parametrize("item", cases) 中的 "item" 必須 和 def test_my_requests(item): 中的 item 名字一樣 @pytest.mark.parametrize("item", all_cases) def test_my_register(self, item): # handler_logger.info('=====注冊接口測試=======') handler_logger.info(f'"#phone#"未替換的前的數據:{item["req_data"]}=======') # 1、替換占位符 # item 的值為:一行的數據,如下: """ { 'case_id': 'test_register_002', 'case_name': '注冊成功,普通用戶', 'method': 'POST', 'url': 'http://api.lemonban.com/futureloan/member/register', 'req_data': '{"mobile_phone": "#phone#","pwd": "12345678","reg_name":"Sky","type":1}', 'assert_response_value_list': '[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"OK","type":"eq"}]', 'actual_results': None } """ # 1、用隨機生成的,未被注冊過的手機號碼,把 標記 "#phone#"的字符串,替換;
# a、查詢數據庫,并得到一個未被注冊的手機號碼 new_phone = hp.get_phone() # handler_logger.info(f'=====new_phone:{new_phone}=======') # handler_logger.info(f'=====對比結果:{item["req_data"].find("#phone#") != -1}=======')
# b、判斷字符串中是否含有 #phone#,如果有把它替換成 剛隨機生成的手機號碼 if item["req_data"] and item["req_data"].find("#phone#") != -1: item['req_data'] = item['req_data'].replace("#phone#", new_phone) if item["assert_response_value_list"] and item["assert_response_value_list"].find("#phone#") != -1: item["assert_response_value_list"] = item["assert_response_value_list"].replace("#phone#", new_phone) # 2、把json格式的字符串轉換成 字典 temp_req_data_dict = json.loads(item['req_data']) handler_logger.info(f'用隨機手機號碼替換"#phone#"得到的數據:{temp_req_data_dict}=======') # 3、發(fā)起請求 resp = mrq.send_reuqests(item['method'], item['url'], temp_req_data_dict) print(resp.json()) # 4、斷言 add_assert.assert_response_value(item["assert_response_value_list"], resp.json())
?
d、handle_phone.py? 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com from faker import Faker from tools.handler_mysql import handler_mysql from tools.handler_logging import handler_logger class HandlePhone: def __init__(self): self.fk = Faker(locale='zh-cn') # 如果系統對手機號前綴有要求的,要做前綴校驗 def __faker_phone(self): phone = self.fk.phone_number() handler_logger.info(f'隨機生成的手機號為:{phone}') return phone # 獲取數據庫查詢結果 def __select_phone(self, phone): sql = "select * from member where mobile_phone = '{}'".format(phone) select_phone_result = handler_mysql.get_db_all_data(sql=sql) return select_phone_result # 獲取未注冊的手機號 # 1、生成一個隨機手機號碼 # 2、用生成的手機號碼去數據庫查詢,如果查詢結果大于1行說明已經存在,被注冊過; def get_phone(self): while True: phone = self.__faker_phone() # 隨機生成手機號 select_phone_result = self.__select_phone(phone) # 拿到數據庫執(zhí)行結果 if len(select_phone_result) > 0: continue else: return phone if __name__ == '__main__': cl = HandlePhone() result = cl.get_phone() print(result)
?
e、handler_assert_list.py 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 16:41 # Project: Future_Loan_day15 # Module: handler_assert_list.py import jsonpath import ast # 封裝 assert_list列 添加斷言 class HandlerAssertList: def assert_response_value(self, assert_response_value_list, response_dict): """ 根據響應的值斷言 :param assert_response_value_list: 從excel表格中獲取到的斷言字符串,比如:'[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"賬號已存在","type":"eq"}]' :param response_dict: 發(fā)起請求后得到的響應結果 :return: """ # 把字符串轉換成python列表 # 比eval安全一點,轉換成列表。eval去掉最外層引號后還會自動計算,literal_eval僅去掉最外層引號; check_list = ast.literal_eval(assert_response_value_list) # print(check_list) # 每一個單元格所有斷言的比對結果存放在check_res check_res = [] for check in check_list: # 通過jsonpath表達式,從響應結果中拿到實際結果 actual = jsonpath.jsonpath(response_dict, check["expr"]) if isinstance(actual, list): actual = actual[0] # 與實際結果比對 if check["type"] == "eq": # print(f' 實際比對結果:{actual == check["expected"]}') check_res.append(actual == check["expected"]) # print(f'所有斷言結果:{check_res}') # 如果 斷言列表中有False,拋出 斷言異常 if False in check_res: raise AssertionError if __name__ == '__main__': # 測試代碼 assert_response_value_list = '[{"expr":"$.code","expected":0,"type":"eq"},' '{"expr":"$.msg","expected":"OK","type":"eq"}]' response_dict = { "code": 0, "msg": "OK", "data": { "id": 123660458, "reg_name": "Sky", "mobile_phone": "13321886699" }, "copyright": "Copyright 檸檬班 ? 2017-2020 湖南省零檬信息技術有限公司 All Rights Reserved" } add_assert = HandlerAssertList() check_res = add_assert.assert_response_value(assert_response_value_list, response_dict)
?
f、handler_conf.py? 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 17:58 # Project: Future_Loan_day15 # Module: handler_conf.py from configparser import ConfigParser class HandlerConf(ConfigParser): def __init__(self, file_name): super().__init__() self.read(file_name, encoding='utf-8')
?
g、handler_excel.py? 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 16:34 # Project: Future_Loan_day15 # Module: handler_excel.py import openpyxl from tools.handler_logging import handler_logger # 封裝一個xlsx表格操作類 class HandlerExcel: # 操作一個excel表格: # 第一步:打開工作簿 # 第二步:選取表單 # 第三步:讀取數據 # 第四步:關閉打開的工作簿 def __init__(self, xlsx_file_path: str): """ 傳入一個xlsx文件路徑,用load_workbook()方法加載,如果文件加載不成功,拋出異常。如果成功,打開一個工作簿。 :param xlsx_file_path: xlsx文件路徑 """ try: self.wb = openpyxl.load_workbook(xlsx_file_path) except FileNotFoundError as ffe: # print('打開文件失敗') handler_logger.error(ffe) raise # 不確定打開的是哪個表單 self.sh = None def close_workbook(self): """ 關閉當前打開的工作簿 :return: """ self.wb.close() def select_sheet_by_name(self, sheet_name: str): """ 根據傳入的工作表的名字,打開工作表。 :param sheet_name: 作表的名字 """ self.sh = self.wb[f'{sheet_name}'] def read_all_rows_data(self): """ 從選定的表單當中,第一行作為key. 將后面的每一行數據,與第一行拼接成一個字典數據,作為一條測試用例數據。 將所有測試用例數據,添加到一個列表當中。 :return: 測試用例數據列表 """ # 獲取表單的所有行,即獲取表單的所有數據 sheet_all_rows = list(self.sh.values) # 把第一行作為數據的keys keys = sheet_all_rows[0] # print(keys) # 定義 cases_list 存放測試用例 cases_list = [] # 以下代碼功能:excel表單第2行開始的每一行測試數據,與第一行的keys拼接成一個字典。 for single_row in sheet_all_rows[1:]: case_dict = dict(zip(keys, single_row)) cases_list.append(case_dict) return cases_list if __name__ == '__main__': eh = HandlerExcel(r'D:SkyWorkSpaceWorkSpaceAPI_testlmFuture_Loan' r'Future_Loan_day16 est_datas est_register_cases.xlsx') eh.select_sheet_by_name('register_case') print(eh.read_all_rows_data()) eh.close_workbook()
?
h、handler_logging.py 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com import logging # 自定義一個日志模塊 import os # 導入 ConfigParser 類 ,用來操作 config.ini 文件 ; import time from configparser import ConfigParser from logging import Logger from logging import handlers class HandlerLogger(Logger): def __init__(self): # 一、用 ConfigParser類 來操作 config.ini 配置文件 ; # 實例化一個 ConfigParser ; conf = ConfigParser() # 1、獲取config.ini文件 common_dir = os.path.dirname(os.path.realpath(__file__)) base_dir = os.path.dirname(common_dir) config_ini_file = os.path.join(base_dir, 'conf', 'config.ini') # 2、從配置文件獲取值 conf.read(config_ini_file, encoding='utf-8') logger_name = conf.get('log', 'name') level = conf.get('log', 'level') file_name = conf.get('log', 'file_name') show_stream_handler = conf.get('log', 'show_stream_handler') when = conf.get('log', 'when') # 二、設置自定義日志搜集器名字、設置日志級別; super().__init__(logger_name, level) # 三、定義日志輸出格式, 使用Formatter類實例化一個日志格式類; fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s, %(pathname)s,line=%(lineno)d' # fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s,line=%(lineno)d' formatter = logging.Formatter(fmt) # 四A、日志默認輸出到控制臺,如果設置為False,日志將不輸出到控制臺; if show_stream_handler == 'True': stream_handler = logging.StreamHandler() # 設置渠道當中的日志格式 stream_handler.setFormatter(formatter) # 將渠道與實例日志搜集器綁定 self.addHandler(stream_handler) # 四B、把日志輸出到文件file # 首先拼接存放log的file文件 logs_file = os.path.join(base_dir, 'logs', file_name) print(logs_file) if logs_file: file_handle = handlers.TimedRotatingFileHandler(filename=logs_file, when=when, encoding='utf-8', interval=1, backupCount=5) # 設置渠道當中的日志格式 file_handle.setFormatter(formatter) # 將渠道與實例日志搜集器綁定 self.addHandler(file_handle) # 生成一個 handler_logger 實例,在其他所有模塊中導入該模塊時,共用這一個日志搜集實例。handler_logger 類似于 全局變量 # 日志搜集是典型的單列設計模式 (單實例模式) 。 handler_logger = HandlerLogger() if __name__ == '__main__': handler_logger = HandlerLogger() for i in range(10): time.sleep(1) handler_logger.debug('=====debug=====') handler_logger.info('=====info=====') handler_logger.warning('=====warning=====') handler_logger.error('=====error=====')
?
i、handler_mysql.py 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 13:18 # Project: Future_Loan_day15 # Module: handler_mysql.py import pymysql from conf.mysql_ini import MYSQL_INI # 封裝操作Mysql數據庫類 class HandleMysql: def __init__(self): """ 1、初始化建立連接,創(chuàng)建數據庫連接 charset='utf8', 注意,不是utf-8 哦 返回數據格式控制: cursorclass=pymysql.cursors.DictCursor 加上這個表示返回字典格式的數據;不加的話,以元組的形式返回; """ self.connection = pymysql.connect( host=MYSQL_INI['host'], port=MYSQL_INI['port'], user=MYSQL_INI['user'], password=MYSQL_INI['password'], database=MYSQL_INI['database'], charset=MYSQL_INI['charset'], cursorclass=pymysql.cursors.DictCursor) # 2、創(chuàng)建游標 self.cur = self.connection.cursor() # 獲取 查詢得到的行數(數量) def get_count(self, sql): count = self.cur.execute(sql) return count # 獲取一條數據,一般都是最前面的那條數據 def get_db_one_data(self, sql): self.cur.execute(sql) return self.cur.fetchone() # 獲取全部數據 def get_db_all_data(self, sql): self.cur.execute(sql) return self.cur.fetchall() # 關閉數據庫連接 def close(self): self.cur.close() self.connection.close() # 使用單例模式,后續(xù)導入數據庫就只導入 handler_mysql handler_mysql = HandleMysql() if __name__ == '__main__': # 1、建立鏈接 handler_mysql = HandleMysql() # 2、執(zhí)行 sql 語句 # 先在 sql客戶端 (Navicat Premium 12免安裝) 執(zhí)行以下sql語句,查詢結果 phone = 18837906872 sql_str_2 = f"select * from member where mobile_phone='{phone}'" # 方式二 # 執(zhí)行sql語句 results = handler_mysql.get_db_one_data(sql_str_2) print(results)
?
j、handler_request.py? 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com import requests from tools.handler_logging import handler_logger class HandlerRequests: def __init__(self): """ 初始化方法,初始化請求頭; """ self.headers = {"X-Lemonban-Media-Type": "lemonban.v2"} handler_logger.info(f'請求頭為:{self.headers}') # 方法 post/put... json=xxx, get方法用 params=xxx def send_reuqests(self, method, url, req_data, token=None): """ 調用requests庫里面的方法去發(fā)起請求,并得到響應結果; :param url: 接口url :param method: 請求方法,get,psot :param req_data: 請求數據 :param token: 如果有token,添加token """ handler_logger.info(f'請求方法為:{method}') handler_logger.info(f'請求url為:{url}') handler_logger.info(f'請求數據為:{req_data}') # 如果有token,添加token self.__del_header(token) # 注意 request() 方法是不帶s的,requests庫是帶s的; if method.upper() == "GET": resp = requests.request(method, url, params=req_data, headers=self.headers) return resp if method.upper() == "POST": # 為了便于學習,簡單的認為就是用 json 格式傳; resp = requests.request(method, url, json=req_data, headers=self.headers) return resp def __del_header(self, token=None): """ 如果有token,添加token處理 :param token: 如果有token,添加token處理 """ if token: self.headers["Authorization"] = f"Bearer {token}" if __name__ == '__main__': handler_requests = HandlerRequests()
?
k、操作mysql.py? 代碼如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 10:02 # Project: Future_Loan_day15 # Module: 操作mysql.py import pymysql # 1、建立連接,創(chuàng)建數據庫連接 # charset='utf8', 注意,不是utf-8 哦 # 返回數據格式控制: cursorclass=pymysql.cursors.DictCursor 加上這個表示返回字典格式的數據;不加的話,以元組的形式返回; connection = pymysql.connect(host='api.lemonban.com', port=3306, user='future', password='123456', database='futureloan', charset='utf8', cursorclass=pymysql.cursors.DictCursor) # 2、創(chuàng)建游標 cur = connection.cursor() # 3、執(zhí)行sql語句 , 返回數據 sql_str = "select * from member where reg_name='娜娜'" # 方式一 # phone = 18837906872 # sql_str_2 = f"select * from member where mobile_phone='{phone}'" # 方式二 affected_rows = cur.execute(sql_str) # 返回影響的行數 print(f'返回影響的行數:{affected_rows}') # 4、獲取查詢的結果 first_data = cur.fetchone() # 獲取一條數據,第一條數據;字典形式返回 ; all_data = cur.fetchall() # 獲取全部數據;列表形式返回 ; two_data = cur.fetchmany(size=2) # 獲取前2行數據;列表形式返回 ; print(f'獲取第一條數據:{first_data}') print(f'獲取全部數據:{all_data}') print(f'獲取前2行數據:{two_data}') # 5、關閉數據庫連接 cur.close() # 首先關閉游標 connection.close() # 關閉數據庫
運行方式如下:
?
執(zhí)行結果如下:
?
本文摘自 :https://www.cnblogs.com/