一:框架构思

(此代码只作为简单演示使用,因为好多问题没有考虑到,时间有限,没有做参数化,没有重跑机制,代码规范等等,请各位仅供参考。)

base:是基于seleniium的二次封装的点击、输入、刷新等操作
common:是基于业务的底层公共方法
config:配置文件
log:收集log的方法,以及生成的截图
excute_logs:生成的日志都会打印在一个文件
page_object:webui登录的方法和一些二次封装的方法
testcase:是testcase
reports:是生成reports的方法和生成的报告

image.png

二:框架代码展示

base_page.py文件

  1. # coding=utf-8
  2. import time
  3. from time import sleep
  4. from log.getLogStream import logStream
  5. log = logStream()
  6. # 创建基类
  7. class BasePage:
  8. # driver = webdriver.Chrome()
  9. # 构造函数
  10. def __init__(self, driver):
  11. log.info('初始化driver{}'.format(driver))
  12. self.driver = driver
  13. # 访问URL
  14. def open(self, url):
  15. """
  16. function: 打开浏览器,访问url
  17. description:
  18. arg:
  19. return:
  20. """
  21. log.info('访问网址')
  22. self.driver.get(url)
  23. self.driver.maximize_window()
  24. sleep(3)
  25. # 元素定位
  26. def locator(self, loc):
  27. """
  28. function: 定位元素
  29. description:
  30. arg:
  31. return:
  32. """
  33. log.info('正在定位{}元素'.format(loc))
  34. return self.driver.find_element(*loc)
  35. # 输入
  36. def input_(self, loc, txt):
  37. """
  38. function: 输入
  39. description:
  40. arg:
  41. return:
  42. """
  43. try:
  44. log.info('正在定位{}元素, 输入{}内容'.format(loc, txt))
  45. self.locator(loc).send_keys(txt)
  46. sleep(2)
  47. except Exception as e:
  48. self.screenShot()
  49. log.error('错误日志' % e)
  50. # 点击
  51. def click(self, loc):
  52. """
  53. function: 点击
  54. description:
  55. arg:
  56. return:
  57. """
  58. try:
  59. log.info('正在点击{}元素'.format(loc))
  60. self.locator(loc).click()
  61. except Exception as e:
  62. self.screenShot()
  63. log.error('错误日志' % e)
  64. # 等待
  65. def wait(self, time_):
  66. """
  67. function: 等待
  68. description:
  69. arg:
  70. return:
  71. """
  72. log.info('等待时间{}秒'.format(time_))
  73. sleep(time_)
  74. # 关闭
  75. def quit(self):
  76. """
  77. function: 退出
  78. description:
  79. arg:
  80. return:
  81. """
  82. log.info('退出')
  83. self.driver.quit()
  84. # 最大化
  85. def maxSize(self):
  86. """
  87. function: 最大化
  88. description:
  89. arg:
  90. return:
  91. """
  92. log.info('最大化')
  93. self.driver.maximize_window()
  94. # 截图并保存
  95. def screenShot(self):
  96. """
  97. function: 绑定服务器
  98. description: bind服务器
  99. arg:
  100. return:
  101. """
  102. current_time = time.strftime('%Y-%m-%d %H-%M-%S')
  103. print(current_time)
  104. pic_path = '../log/screenshot' + '/' + current_time + '.png'
  105. self.driver.save_screenshot(pic_path)
  106. # 关闭浏览器
  107. def close(self):
  108. """
  109. function: 关闭当前浏览器
  110. description:
  111. arg:
  112. return:
  113. """
  114. self.driver.close()
  115. # 刷新浏览器
  116. def refresh(self):
  117. """
  118. function: 刷新浏览器
  119. description:
  120. arg:
  121. return:
  122. """
  123. self.driver.refresh()
  124. def waitUntilPageContains(self, message):
  125. """
  126. function: 等待界面出现某个字段
  127. description:
  128. arg:
  129. :return:
  130. example: self.waitUntilPageContains('vsite_automation')
  131. """
  132. log.info('正在获取页面%s字段' % message)
  133. sleep(3)
  134. msg = self.driver.find_element_by_xpath('//*[contains(text(), "%s")]' % message).text
  135. print(msg)
  136. return msg

common目录下的业务文件

  1. import paramiko
  2. from time import sleep
  3. import re
  4. class CLI:
  5. def ssh_ag(self):
  6. """
  7. :param self:
  8. :return:
  9. """
  10. # 创建ssh对象
  11. self.ssh = paramiko.SSHClient()
  12. # 允许连接不在know_hosts文件中的主机
  13. self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
  14. # 连接AG
  15. self.ssh.connect(hostname='192.168.120.220', port=22, username='gaojs', password='123')
  16. sleep(5)
  17. channel = self.ssh.invoke_shell()
  18. self.channel = channel
  19. channel.settimeout(5)
  20. sleep(5)
  21. self.cli_cmd('enable')
  22. self.cli_cmd('')
  23. self.cli_cmd('config ter')
  24. def print_step(self):
  25. """
  26. :return:
  27. """
  28. result = self.channel.recv(2048)
  29. print(result.decode())

getLogStream.py收集日志文件

  1. import logging
  2. def logStream():
  3. # 创建一个日志器
  4. logger = logging.getLogger()
  5. # 设置日志级别为info
  6. logger.setLevel(logging.INFO)
  7. # 日志想要输出到哪,就要制定输出目的地,控制台 要创建一个控制台处理器
  8. console = logging.StreamHandler()
  9. logger.addHandler(console)
  10. # 创建一个格式器
  11. # 设置日志格式
  12. fmt = '%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s'
  13. # 生成日志信息 生成时间 文件名 日志的状态 类名 方法名 日志内容
  14. fomator = logging.Formatter(fmt)
  15. # 优化及控制台格式
  16. console.setFormatter(fomator)
  17. # 创建一个处理器 文本处理器 制定日期信息输出到文本处理器
  18. filehandler = logging.FileHandler('../excute_logs/logger.log', encoding='utf-8')
  19. logger.addHandler(filehandler)
  20. # 设置logger.log文本格式
  21. filehandler.setFormatter(fomator)
  22. return logger

base_logging.py文件

  1. import logging
  2. def logger():
  3. # 设置日志格式
  4. fmt = '%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s'
  5. # 生成日志信息 生成时间 文件名 日志的状态 类名 方法名 日志内容
  6. # 设置日志级别
  7. logging.basicConfig(level=logging.INFO, format=fmt, filename='../excute_logs/log.log')
  8. return logging

page_object目录下的login_page.py文件

  1. from time import sleep
  2. from selenium.webdriver.common.by import By
  3. from base.base_page import BasePage
  4. from selenium import webdriver
  5. class LoginPage(BasePage):
  6. def login(self, username, password):
  7. """
  8. description:登录webui界面
  9. :param username:
  10. :param password:
  11. :return:
  12. example:
  13. self.login('array', 'admin')
  14. """
  15. self.driver = webdriver.Chrome()
  16. # URL
  17. # self.url = "https://%s:%s" % ip, port
  18. self.url = "https://192.168.120.209:8888"
  19. # 页面元素
  20. self.user = (By.NAME, "username")
  21. self.passwd = (By.ID, 'password')
  22. self.button = (By.ID, 'loginID')
  23. # 访问URL,最大化
  24. self.open(self.url)
  25. # 点击页面上不是隐私连接提示
  26. try:
  27. self.waitUntilPageContains('不是私密连接')
  28. self.driver.find_element_by_xpath('//button[@id="details-button"]').click()
  29. sleep(1)
  30. self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click()
  31. except:
  32. pass
  33. # 输入账号
  34. sleep(5)
  35. self.input_(self.user, username)
  36. # 输入密码
  37. self.input_(self.passwd, password)
  38. # 点击登录按钮
  39. self.click(self.button)
  40. sleep(5)
  41. def login_L3vpn_test(self, ip, method, username, password, challenge=None, challenge_passwd1=None, challenge_passwd2=None):
  42. """
  43. function:登录L3vpn
  44. :argument:
  45. ip: 虚拟站点IP
  46. method:方法名
  47. username:用户名
  48. password:密码
  49. :return:
  50. examlpe:
  51. self.login_L3vpn_test('192.168.120.x', 'http_challenge', 'array', 'admin')
  52. self.login_L3vpn_test(self.vsiteip, 'http_challenge_test', self.username, self.passwd,
  53. challenge=True, challenge_passwd1='chal1', challenge_passwd2='chal2')
  54. """
  55. self.driver = webdriver.Chrome()
  56. # URL
  57. self.url = "https://%s" % ip
  58. # 页面元素
  59. self.user = (By.NAME, "uname")
  60. self.passwd = (By.NAME, 'pwd')
  61. self.button = (By.NAME, 'submitbutton')
  62. self.select_method = (By.NAME, "method")
  63. self.challenge_signin = (By.NAME, "option")
  64. # 访问URL,最大化
  65. self.open(self.url)
  66. sleep(5)
  67. try:
  68. self.driver.find_element_by_xpath('//button[@id="details-button"]').click()
  69. sleep(1)
  70. self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click()
  71. except:
  72. pass
  73. # 切换方法
  74. self.click(self.select_method)
  75. self.driver.find_element_by_xpath('//option[@value="%s"]' % method).click()
  76. # 输入账号
  77. sleep(5)
  78. self.input_(self.user, username)
  79. # 输入密码
  80. self.input_(self.passwd, password)
  81. # 点击登录按钮
  82. self.click(self.button)
  83. sleep(5)
  84. # 挑战模式
  85. if challenge:
  86. try:
  87. self.input_(self.passwd, challenge_passwd1)
  88. self.click(self.challenge_signin)
  89. self.input_(self.passwd, challenge_passwd2)
  90. self.click(self.challenge_signin)
  91. self.waitUntilPageContains('welcome to the ArrayOS')
  92. except Exception as e:
  93. print(e)
  94. print('挑战失败,请重试!')
  95. else:
  96. print('不符合挑战条件,请检查配置!')

reports目录下的testReports.py文件

import time
import unittest
from BeautifulReport import BeautifulReport


# 找到用例defaultTestLoader默认加载
# import HTMLTestRunner


def testReports():
    """
    function: 生成测试报告方法
    description: 生成测试报告
    arg:
    return:
    """
    case_dir = '../testcase/aaa_http/'
    discover = unittest.defaultTestLoader.discover(case_dir, 'test*.py')

    # 用时间命名测试报告   测试报告生成时间  +  后缀名   2021-11-20 14-49-30test_report.html
    report_dir = '../reports/'
    now = time.strftime('%Y-%m-%d %H-%M-%S')
    report_name = report_dir + '/' +now+ '_test_report.html'

    with open(report_name, 'wb') as f:
        # 执行用例
        # HTMLTestRunner.HTMLTestRunner(stream=f, verbosity=2, title='unittest测试报告练习', description='练习HTMLTestRunner使用').run(discover)
        BeautifulReport(discover).report(description=u'UAG每日构建测试报告', filename=report_name, report_dir='../reports/')

run.py文件:

from reports.testReports import testReports


if __name__ == "__main__":
    testReports()

三:用例代码

testcase目录下的test_01文件

import unittest
from common.agCli import *
from page_object.login_page import *


class Testcase(unittest.TestCase, LoginPage, CLI):

    def setUp(self) -> None:
        self.ssh_ag()
        self.vsitename = 'vsite_automation'
        self.username = 'array'
        self.passwd = 'admin'
        self.message = 'vsite_automation'

    def test_01(self):
        """
        description:  cli创建虚拟站点,登录webui去查看是否创建成功,是否有vsite_automation虚拟站点信息
        :return:
        date: 2022/02/17
        author: gaojs
        """
        self.cli_cmd('virtual site name vsite_automation')
        self.login(self.username, self.passwd)
        self.switch_vsite(self.vsitename)
        msg = self.waitUntilPageContains(self.message)
        print(msg)
        if msg not in(self.message):
            raise Exception('切换虚拟站点失败,请重试!')

    # 恢复环境
    def tearDown(self) -> None:
        self.cli_cmd('no virtual site name vsite_automation')
        self.cli_cmd('YES')
        self.quit_enable()
        self.close()

压力测试用例test_02:

from locust import HttpUser, between, task, TaskSet
import os
from common.agCli import *
import logging


class TaskTest(TaskSet, CLI):

    # 执行并发前置动作,比如清理当前所有session
    def on_start(self):
        """
        description:登录ag, 清理log
        :return:
        """
        self.ssh_ag()
        self.clear_log()
        logging.info('清理log结束,压测开始!!!')

    # 压测任务,也可以是@task(10)啥的,这个数字是代表权重,数值越大,执行的频率就越高
    @task
    def login(self):
        url = '/prx/000/http/localh/login'
        data = {
            "method": "http1",
            "uname": "gaojs",
            "pwd1": "",
            "pwd2": "",
            "pwd": "admin",
            "submitbutton": "Sign"
        }
        header = {"Content-Type": "application/json;charset=UTF-8"}
        self.client.request(method='POST', url=url, data=data, headers=header, name='登录虚拟站点', verify=False, allow_redirects=False)

    # 执行并发测试后执行的动作,比如保存log等操作,查看报告http://localhost:8089/
    def on_stop(self):
        self.ssh_ag()
        self.cli_cmd('switch vsite')
        self.cli_cmd('session kill all')
        logging.info('清理session结束,压测结束,请查看report, http://localhost:8089!!!')


class Login(HttpUser):
    host = 'https://192.168.120.206'
    # 每次请求停顿时间
    wait_time = between(1, 3)
    tasks = [TaskTest]


if __name__ == "__main__":
    os.system("locust -f locust_test.py --host=https://192.168.120.206")

四:测试报告

生成报告展示:
image.png

image.png