常见的分几种情况:

  1. 多个slave共享一份测试数据 and 不唯一 and 循环利用
  2. 多个slave共享一份测试数据 and 唯一 and 循环利用
  3. 多个slave共享一份测试数据 and 唯一 and 不可循环利用

多个slave共享一份测试数据 and 不唯一 and 循环利用

  1. 创建空队列(queue 先进先出)
  2. 初始化压测数据,插入队列中
  3. 依次从队列首位中取出数据
  4. 把取出的数据再次插入队列尾中 ```python

    !/usr/bin/env python

    -- encoding: utf-8 --

    “””

    @File : parameter_test.py

    @Time : 2021/12/26 20:48:09

    @Author : wangshunzhe

    “”” “””分布式参数化: 共享一份测试数据 and 不唯一 and 循环利用””” from locust import FastHttpUser, task, events from logzero import logger from locust.runners import MasterRunner import queue

创建队列

USER_DATA = queue.Queue()

@events.init.addlistener def (environment, **kwargs):
logger.info(f”初始化压测数据”)

  1. # 用户数据放入队列中
  2. data = 1
  3. USER_DATA.put_nowait(data)

class UserTask1(FastHttpUser): “””压测任务””” host = “” max_wait = 0 min_wait = 0

  1. def on_start(self):
  2. """setup"""
  3. logger.info("***start***")
  4. @task(1)
  5. def api_test(self):
  6. """压测接口"""
  7. # 取出一个队列数据
  8. params_data = USER_DATA.get()
  9. with self.client.get("/", ) as response:
  10. if response.status_code == 200:
  11. response.success()
  12. else:
  13. response.failure(response.json())
  14. # 将取出的数据中重新放回队列中
  15. USER_DATA.put_nowait(params_data)
  1. <a name="uhah6"></a>
  2. #### 多个slave共享一份测试数据 and 唯一 and 不可循环利用
  3. 1. 创建空队列(queue 先进先出)
  4. 2. 初始化压测数据,插入队列中
  5. 3. 依次从队列首位中取出数据
  6. 4. 判断队列为空则停止
  7. <a name="heH8c"></a>
  8. #### 多个slave共享一份测试数据 and 唯一 and 不可循环利用
  9. 方案一:通过locust事件注册init_command_line_parser事件,自定义locust命令行选项,控制每个slave执行不同数据<br />方案二:本地搭建redis,将测试数据存入redis,并依次读取,再插入循环利用
  10. ```python
  11. # 方案二demo
  12. from locust.runners import MasterRunner
  13. from locust import TaskSet, constant, task, events, FastHttpUser
  14. from logzero import logger
  15. import redis
  16. # 本地redis
  17. redis_conn = redis.StrictRedis(host="127.0.0.1", port=6379, db=1)
  18. # 用户信息队列
  19. USERS_QUEUE = queue.Queue()
  20. # 构造用户信息数据
  21. @events.init.add_listener
  22. def on_locust_init(environment, **kawrgs):
  23. if isinstance(environment.runner, MasterRunner):
  24. redis_conn.delete("meeting")
  25. # 用户数据写入redis
  26. result = {"name": "wangshunzhe"}
  27. redis_conn.lpush("meeting", json.dumps(result))
  28. logger.info("写入用户数据完成")
  29. class APIMeetingFeedList(FastHttpUser):
  30. host = ""
  31. max_wait = 0
  32. min_wait = 0
  33. @task(1)
  34. def test_feed_list(self):
  35. try:
  36. # 读取redis
  37. user_data = redis_conn.lpop("meeting")
  38. result = user_data.decode('unicode_escape')
  39. # 将读取的值重新插入队列
  40. redis_conn.rpushx("meeting", user_data)
  41. except BaseException as e:
  42. logger.info(f"redis operate error: {str(e)}")
  43. with self.client.get("/", params=data, headers=headers, catch_response=True,name="/meeting/feedList") as response:
  44. if response.status_code == 200:
  45. response.success()
  46. else:
  47. response.failure(f"{response.json()}, {str(headers)}")
  48. logger.info(f"响应结果:{response.json()}")