當前位置:首頁 > IT技術 > 移動平臺 > 正文

016、【前程貸—簡化版,實戰(zhàn) 09】 用 faker庫 生成的隨機手機號碼,去注冊,看是否注冊過 。
2021-09-05 09:04:31

?

?

在 上一節(jié)? (015、【前程貸—簡化版,實戰(zhàn) 08】 用pymysql庫,封裝 handler_mysql? 操作數據庫 )? 的基礎上,增加 驗證手機號是否被注冊過?如果注冊過再繼續(xù)生成隨機手機號碼,直到得到的手機號碼是未被注冊的。

?

如何實現成功注冊呢 :

步驟1、用faker庫生成一個隨機手機號碼 ;

步驟2、用這個手機號碼去數據庫比對,是否注冊過;如果注冊過再生成一個隨機手機號碼 ;

步驟3、如果未被注冊過,用生成的隨機手機號碼 (如:13123456789) 替換,excel表格中的標記字符 #phone# ;

步驟4、替換成功后,得到一個臨時字典,用這個臨時字典做為數據發(fā)送requests請求 ;

?

1、項目層級結構如下 :

?

2、excel表格設計如下,注意 #phone#

?

?

3、各代碼如下,標記紅色部分為主要修改點

a、config.ini? 代碼如下:

# 定義日志相關的配置
# name表示自定義日志搜集器的名字;
# level表示日志級別;
# file_name 表示生成的日志名字
# show_stream_handler表示是否在控制臺輸出日志。False表示不在控制臺顯示。
# when,表示按H(小時)、D(天)、M(分鐘)、S(秒)生成日志 ;注意字母是大寫 ;
[log]
name=future_logging_collector
level=INFO
file_name=future_loan_test.log
show_stream_handler=False
when=H

b、mysql_ini.py 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 13:20
# Project: Future_Loan_day15
# Module:  mysql_ini.py


# 此配置文件用來配置鏈接mysql數據庫
# key已經和代碼關聯了,不能更改key名;
MYSQL_INI = {
    "host": "api.lemonban.com",
    "port": 3306,
    "user": "future",
    "password": "123456",
    "database": "futureloan",
    "charset": "utf8"
}

c、test_1_register.py? 代碼如下(有代碼修改)

import os
import json

import jsonpath
import pytest

from tools.handle_phone import HandlePhone
from tools.handler_request import HandlerRequests
from tools.handler_assert_list import HandlerAssertList
from tools.handler_excel import HandlerExcel
from tools.handler_logging import handler_logger

# 一、準備 數據 cases
# A、用os.path找到 xlsx 文件
file_dir = os.path.dirname(os.path.realpath('__file__'))
base_dir = os.path.dirname(file_dir)
print(f'根目錄:{base_dir}')

file_name = os.path.join(base_dir, 'test_datas', 'test_register_cases.xlsx')
print(f'excel表格的路徑:{file_name}')


# B、調用common——>excel_handler.py 中已封裝好的ExcelHandler類來操作excel表格
# # 1、打開xlsx表格;2、根據表單名字獲取表格數據;3、讀取數據;
excel_handler = HandlerExcel(file_name)
excel_handler.select_sheet_by_name('register_case')
all_cases = excel_handler.read_all_rows_data()

# print(all_cases)
# handler_logger.info(f'=====all_cases=======:{all_cases}')


mrq = HandlerRequests()
add_assert = HandlerAssertList()
hp = HandlePhone()


class TestRegister:
    # 三、參數化測試用例
    # pytest.mark.parametrize("item", cases) 中的 "item" 必須 和 def test_my_requests(item): 中的 item 名字一樣
    @pytest.mark.parametrize("item", all_cases)
    def test_my_register(self, item):
        # handler_logger.info('=====注冊接口測試=======')
        handler_logger.info(f'"#phone#"未替換的前的數據:{item["req_data"]}=======')

        # 1、替換占位符
        # item 的值為:一行的數據,如下:
        """
            {
                'case_id': 'test_register_002',
                'case_name': '注冊成功,普通用戶',
                'method': 'POST',
                'url': 'http://api.lemonban.com/futureloan/member/register',
                'req_data': '{"mobile_phone": "#phone#","pwd": "12345678","reg_name":"Sky","type":1}',
                'assert_response_value_list': '[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"OK","type":"eq"}]',
                'actual_results': None
            }
        """

        # 1、用隨機生成的,未被注冊過的手機號碼,把 標記 "#phone#"的字符串,替換;
     # a、查詢數據庫,并得到一個未被注冊的手機號碼 new_phone = hp.get_phone() # handler_logger.info(f'=====new_phone:{new_phone}=======') # handler_logger.info(f'=====對比結果:{item["req_data"].find("#phone#") != -1}=======')       

     # b、判斷字符串中是否含有 #phone#,如果有把它替換成 剛隨機生成的手機號碼 if item["req_data"] and item["req_data"].find("#phone#") != -1: item['req_data'] = item['req_data'].replace("#phone#", new_phone) if item["assert_response_value_list"] and item["assert_response_value_list"].find("#phone#") != -1: item["assert_response_value_list"] = item["assert_response_value_list"].replace("#phone#", new_phone) # 2、把json格式的字符串轉換成 字典 temp_req_data_dict = json.loads(item['req_data']) handler_logger.info(f'用隨機手機號碼替換"#phone#"得到的數據:{temp_req_data_dict}=======')
# 3、發(fā)起請求 resp = mrq.send_reuqests(item['method'], item['url'], temp_req_data_dict) print(resp.json()) # 4、斷言 add_assert.assert_response_value(item["assert_response_value_list"], resp.json())

?

d、handle_phone.py? 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com

from faker import Faker
from tools.handler_mysql import handler_mysql
from tools.handler_logging import handler_logger


class HandlePhone:
    def __init__(self):
        self.fk = Faker(locale='zh-cn')

    # 如果系統對手機號前綴有要求的,要做前綴校驗
    def __faker_phone(self):
        phone = self.fk.phone_number()
        handler_logger.info(f'隨機生成的手機號為:{phone}')
        return phone

    # 獲取數據庫查詢結果
    def __select_phone(self, phone):
        sql = "select * from member where mobile_phone = '{}'".format(phone)
        select_phone_result = handler_mysql.get_db_all_data(sql=sql)
        return select_phone_result

    # 獲取未注冊的手機號
    # 1、生成一個隨機手機號碼
    # 2、用生成的手機號碼去數據庫查詢,如果查詢結果大于1行說明已經存在,被注冊過;
    def get_phone(self):
        while True:
            phone = self.__faker_phone()  # 隨機生成手機號
            select_phone_result = self.__select_phone(phone)  # 拿到數據庫執(zhí)行結果
            if len(select_phone_result) > 0:
                continue
            else:
                return phone


if __name__ == '__main__':
    cl = HandlePhone()
    result = cl.get_phone()
    print(result)

?

e、handler_assert_list.py 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 16:41
# Project: Future_Loan_day15
# Module:  handler_assert_list.py

import jsonpath
import ast


# 封裝 assert_list列 添加斷言
class HandlerAssertList:

    def assert_response_value(self, assert_response_value_list, response_dict):
        """
        根據響應的值斷言
        :param assert_response_value_list: 從excel表格中獲取到的斷言字符串,比如:'[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"賬號已存在","type":"eq"}]'
        :param response_dict: 發(fā)起請求后得到的響應結果
        :return:
        """
        # 把字符串轉換成python列表
        # 比eval安全一點,轉換成列表。eval去掉最外層引號后還會自動計算,literal_eval僅去掉最外層引號;
        check_list = ast.literal_eval(assert_response_value_list)
        # print(check_list)

        # 每一個單元格所有斷言的比對結果存放在check_res
        check_res = []
        for check in check_list:
            # 通過jsonpath表達式,從響應結果中拿到實際結果
            actual = jsonpath.jsonpath(response_dict, check["expr"])
            if isinstance(actual, list):
                actual = actual[0]
            # 與實際結果比對
            if check["type"] == "eq":
                # print(f'
實際比對結果:{actual == check["expected"]}')
                check_res.append(actual == check["expected"])

        # print(f'所有斷言結果:{check_res}')
        # 如果 斷言列表中有False,拋出 斷言異常
        if False in check_res:
            raise AssertionError


if __name__ == '__main__':
    # 測試代碼
    assert_response_value_list = '[{"expr":"$.code","expected":0,"type":"eq"},' 
                                 '{"expr":"$.msg","expected":"OK","type":"eq"}]'
    response_dict = {
        "code": 0,
        "msg": "OK",
        "data": {
            "id": 123660458,
            "reg_name": "Sky",
            "mobile_phone": "13321886699"
        },
        "copyright": "Copyright 檸檬班 ? 2017-2020 湖南省零檬信息技術有限公司 All Rights Reserved"
    }

    add_assert = HandlerAssertList()
    check_res = add_assert.assert_response_value(assert_response_value_list, response_dict)

?

f、handler_conf.py? 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 17:58
# Project: Future_Loan_day15
# Module:  handler_conf.py

from configparser import ConfigParser


class HandlerConf(ConfigParser):

    def __init__(self, file_name):
        super().__init__()
        self.read(file_name, encoding='utf-8')

?

g、handler_excel.py? 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 16:34
# Project: Future_Loan_day15
# Module:  handler_excel.py

import openpyxl

from tools.handler_logging import handler_logger


# 封裝一個xlsx表格操作類
class HandlerExcel:
    # 操作一個excel表格:
    # 第一步:打開工作簿
    # 第二步:選取表單
    # 第三步:讀取數據
    # 第四步:關閉打開的工作簿

    def __init__(self, xlsx_file_path: str):
        """
        傳入一個xlsx文件路徑,用load_workbook()方法加載,如果文件加載不成功,拋出異常。如果成功,打開一個工作簿。
        :param xlsx_file_path: xlsx文件路徑
        """
        try:
            self.wb = openpyxl.load_workbook(xlsx_file_path)
        except FileNotFoundError as ffe:
            # print('打開文件失敗')
            handler_logger.error(ffe)
            raise
        # 不確定打開的是哪個表單
        self.sh = None

    def close_workbook(self):
        """
        關閉當前打開的工作簿
        :return:
        """
        self.wb.close()

    def select_sheet_by_name(self, sheet_name: str):
        """
        根據傳入的工作表的名字,打開工作表。
        :param sheet_name: 作表的名字
        """
        self.sh = self.wb[f'{sheet_name}']

    def read_all_rows_data(self):
        """
        從選定的表單當中,第一行作為key.
        將后面的每一行數據,與第一行拼接成一個字典數據,作為一條測試用例數據。
        將所有測試用例數據,添加到一個列表當中。
       :return: 測試用例數據列表
        """
        # 獲取表單的所有行,即獲取表單的所有數據
        sheet_all_rows = list(self.sh.values)
        # 把第一行作為數據的keys
        keys = sheet_all_rows[0]
        # print(keys)

        # 定義 cases_list 存放測試用例
        cases_list = []

        # 以下代碼功能:excel表單第2行開始的每一行測試數據,與第一行的keys拼接成一個字典。
        for single_row in sheet_all_rows[1:]:
            case_dict = dict(zip(keys, single_row))
            cases_list.append(case_dict)
        return cases_list


if __name__ == '__main__':
    eh = HandlerExcel(r'D:SkyWorkSpaceWorkSpaceAPI_testlmFuture_Loan'
                      r'Future_Loan_day16	est_datas	est_register_cases.xlsx')
    eh.select_sheet_by_name('register_case')
    print(eh.read_all_rows_data())
    eh.close_workbook()

?

h、handler_logging.py 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com


import logging
# 自定義一個日志模塊
import os
# 導入 ConfigParser 類 ,用來操作 config.ini 文件 ;
import time
from configparser import ConfigParser
from logging import Logger
from logging import handlers


class HandlerLogger(Logger):

    def __init__(self):

        # 一、用 ConfigParser類 來操作 config.ini 配置文件 ;
        # 實例化一個 ConfigParser ;
        conf = ConfigParser()

        # 1、獲取config.ini文件
        common_dir = os.path.dirname(os.path.realpath(__file__))
        base_dir = os.path.dirname(common_dir)
        config_ini_file = os.path.join(base_dir, 'conf', 'config.ini')

        # 2、從配置文件獲取值
        conf.read(config_ini_file, encoding='utf-8')
        logger_name = conf.get('log', 'name')
        level = conf.get('log', 'level')
        file_name = conf.get('log', 'file_name')
        show_stream_handler = conf.get('log', 'show_stream_handler')
        when = conf.get('log', 'when')

        # 二、設置自定義日志搜集器名字、設置日志級別;
        super().__init__(logger_name, level)

        # 三、定義日志輸出格式, 使用Formatter類實例化一個日志格式類;
        fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s, %(pathname)s,line=%(lineno)d'
        # fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s,line=%(lineno)d'
        formatter = logging.Formatter(fmt)

        # 四A、日志默認輸出到控制臺,如果設置為False,日志將不輸出到控制臺;
        if show_stream_handler == 'True':
            stream_handler = logging.StreamHandler()
            # 設置渠道當中的日志格式
            stream_handler.setFormatter(formatter)
            # 將渠道與實例日志搜集器綁定
            self.addHandler(stream_handler)

        # 四B、把日志輸出到文件file
        # 首先拼接存放log的file文件
        logs_file = os.path.join(base_dir, 'logs', file_name)
        print(logs_file)
        if logs_file:
            file_handle = handlers.TimedRotatingFileHandler(filename=logs_file,
                                                            when=when,
                                                            encoding='utf-8',
                                                            interval=1,
                                                            backupCount=5)

            # 設置渠道當中的日志格式
            file_handle.setFormatter(formatter)
            # 將渠道與實例日志搜集器綁定
            self.addHandler(file_handle)


# 生成一個 handler_logger 實例,在其他所有模塊中導入該模塊時,共用這一個日志搜集實例。handler_logger 類似于 全局變量
# 日志搜集是典型的單列設計模式 (單實例模式) 。
handler_logger = HandlerLogger()

if __name__ == '__main__':
    handler_logger = HandlerLogger()
    for i in range(10):
        time.sleep(1)
        handler_logger.debug('=====debug=====')
        handler_logger.info('=====info=====')
        handler_logger.warning('=====warning=====')
        handler_logger.error('=====error=====')

?

i、handler_mysql.py 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 13:18
# Project: Future_Loan_day15
# Module:  handler_mysql.py

import pymysql

from conf.mysql_ini import MYSQL_INI


# 封裝操作Mysql數據庫類
class HandleMysql:
    def __init__(self):
        """
        1、初始化建立連接,創(chuàng)建數據庫連接
        charset='utf8', 注意,不是utf-8 哦
        返回數據格式控制: cursorclass=pymysql.cursors.DictCursor  加上這個表示返回字典格式的數據;不加的話,以元組的形式返回;
        """
        self.connection = pymysql.connect(
            host=MYSQL_INI['host'],
            port=MYSQL_INI['port'],
            user=MYSQL_INI['user'],
            password=MYSQL_INI['password'],
            database=MYSQL_INI['database'],
            charset=MYSQL_INI['charset'],
            cursorclass=pymysql.cursors.DictCursor)
        # 2、創(chuàng)建游標
        self.cur = self.connection.cursor()

    # 獲取 查詢得到的行數(數量)
    def get_count(self, sql):
        count = self.cur.execute(sql)
        return count

    # 獲取一條數據,一般都是最前面的那條數據
    def get_db_one_data(self, sql):
        self.cur.execute(sql)
        return self.cur.fetchone()

    # 獲取全部數據
    def get_db_all_data(self, sql):
        self.cur.execute(sql)
        return self.cur.fetchall()

    # 關閉數據庫連接
    def close(self):
        self.cur.close()
        self.connection.close()


# 使用單例模式,后續(xù)導入數據庫就只導入 handler_mysql
handler_mysql = HandleMysql()


if __name__ == '__main__':

    # 1、建立鏈接
    handler_mysql = HandleMysql()

    # 2、執(zhí)行 sql 語句
    # 先在 sql客戶端 (Navicat Premium 12免安裝) 執(zhí)行以下sql語句,查詢結果
    phone = 18837906872
    sql_str_2 = f"select * from member where mobile_phone='{phone}'"    # 方式二
    # 執(zhí)行sql語句
    results = handler_mysql.get_db_one_data(sql_str_2)
    print(results)

?

j、handler_request.py? 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
import requests
from tools.handler_logging import handler_logger


class HandlerRequests:

    def __init__(self):
        """
        初始化方法,初始化請求頭;
        """
        self.headers = {"X-Lemonban-Media-Type": "lemonban.v2"}
        handler_logger.info(f'請求頭為:{self.headers}')

    # 方法 post/put...  json=xxx, get方法用 params=xxx
    def send_reuqests(self, method, url, req_data, token=None):
        """
        調用requests庫里面的方法去發(fā)起請求,并得到響應結果;
        :param url: 接口url
        :param method: 請求方法,get,psot
        :param req_data: 請求數據
        :param token: 如果有token,添加token
        """
        handler_logger.info(f'請求方法為:{method}')
        handler_logger.info(f'請求url為:{url}')
        handler_logger.info(f'請求數據為:{req_data}')

        # 如果有token,添加token
        self.__del_header(token)

        # 注意 request() 方法是不帶s的,requests庫是帶s的;
        if method.upper() == "GET":
            resp = requests.request(method, url, params=req_data, headers=self.headers)
            return resp
        if method.upper() == "POST":
            # 為了便于學習,簡單的認為就是用 json 格式傳;
            resp = requests.request(method, url, json=req_data, headers=self.headers)
            return resp

    def __del_header(self, token=None):
        """
        如果有token,添加token處理
        :param token: 如果有token,添加token處理
        """
        if token:
            self.headers["Authorization"] = f"Bearer {token}"


if __name__ == '__main__':
    handler_requests = HandlerRequests()

?

k、操作mysql.py? 代碼如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 10:02
# Project: Future_Loan_day15
# Module:  操作mysql.py


import pymysql

# 1、建立連接,創(chuàng)建數據庫連接
# charset='utf8', 注意,不是utf-8 哦
# 返回數據格式控制: cursorclass=pymysql.cursors.DictCursor  加上這個表示返回字典格式的數據;不加的話,以元組的形式返回;
connection = pymysql.connect(host='api.lemonban.com',
                             port=3306,
                             user='future',
                             password='123456',
                             database='futureloan',
                             charset='utf8',
                             cursorclass=pymysql.cursors.DictCursor)

# 2、創(chuàng)建游標
cur = connection.cursor()

# 3、執(zhí)行sql語句 , 返回數據
sql_str = "select * from member where reg_name='娜娜'"   # 方式一

# phone = 18837906872
# sql_str_2 = f"select * from member where mobile_phone='{phone}'"    # 方式二

affected_rows = cur.execute(sql_str)    # 返回影響的行數
print(f'返回影響的行數:{affected_rows}')

# 4、獲取查詢的結果
first_data = cur.fetchone()  # 獲取一條數據,第一條數據;字典形式返回 ;
all_data = cur.fetchall()  # 獲取全部數據;列表形式返回 ;
two_data = cur.fetchmany(size=2)   # 獲取前2行數據;列表形式返回 ;
print(f'獲取第一條數據:{first_data}')
print(f'獲取全部數據:{all_data}')
print(f'獲取前2行數據:{two_data}')

# 5、關閉數據庫連接
cur.close()     # 首先關閉游標
connection.close()      # 關閉數據庫

運行方式如下:

?

執(zhí)行結果如下:

?

本文摘自 :https://www.cnblogs.com/

開通會員,享受整站包年服務立即開通 >