常见的分几种情况:
- 多个slave共享一份测试数据 and 不唯一 and 循环利用
- 多个slave共享一份测试数据 and 唯一 and 循环利用
- 多个slave共享一份测试数据 and 唯一 and 不可循环利用
多个slave共享一份测试数据 and 不唯一 and 循环利用
- 创建空队列(queue 先进先出)
- 初始化压测数据,插入队列中
- 依次从队列首位中取出数据
- 把取出的数据再次插入队列尾中
```python
!/usr/bin/env python
-- encoding: utf-8 --
“””@File : parameter_test.py
@Time : 2021/12/26 20:48:09
@Author : wangshunzhe
“”” “””分布式参数化: 共享一份测试数据 and 不唯一 and 循环利用””” from locust import FastHttpUser, task, events from logzero import logger from locust.runners import MasterRunner import queue
创建队列
USER_DATA = queue.Queue()
@events.init.addlistener
def (environment, **kwargs):
logger.info(f”初始化压测数据”)
# 用户数据放入队列中data = 1USER_DATA.put_nowait(data)
class UserTask1(FastHttpUser): “””压测任务””” host = “” max_wait = 0 min_wait = 0
def on_start(self):"""setup"""logger.info("***start***")@task(1)def api_test(self):"""压测接口"""# 取出一个队列数据params_data = USER_DATA.get()with self.client.get("/", ) as response:if response.status_code == 200:response.success()else:response.failure(response.json())# 将取出的数据中重新放回队列中USER_DATA.put_nowait(params_data)
<a name="uhah6"></a>#### 多个slave共享一份测试数据 and 唯一 and 不可循环利用1. 创建空队列(queue 先进先出)2. 初始化压测数据,插入队列中3. 依次从队列首位中取出数据4. 判断队列为空则停止<a name="heH8c"></a>#### 多个slave共享一份测试数据 and 唯一 and 不可循环利用方案一:通过locust事件注册init_command_line_parser事件,自定义locust命令行选项,控制每个slave执行不同数据<br />方案二:本地搭建redis,将测试数据存入redis,并依次读取,再插入循环利用```python# 方案二demofrom locust.runners import MasterRunnerfrom locust import TaskSet, constant, task, events, FastHttpUserfrom logzero import loggerimport redis# 本地redisredis_conn = redis.StrictRedis(host="127.0.0.1", port=6379, db=1)# 用户信息队列USERS_QUEUE = queue.Queue()# 构造用户信息数据@events.init.add_listenerdef on_locust_init(environment, **kawrgs):if isinstance(environment.runner, MasterRunner):redis_conn.delete("meeting")# 用户数据写入redisresult = {"name": "wangshunzhe"}redis_conn.lpush("meeting", json.dumps(result))logger.info("写入用户数据完成")class APIMeetingFeedList(FastHttpUser):host = ""max_wait = 0min_wait = 0@task(1)def test_feed_list(self):try:# 读取redisuser_data = redis_conn.lpop("meeting")result = user_data.decode('unicode_escape')# 将读取的值重新插入队列redis_conn.rpushx("meeting", user_data)except BaseException as e:logger.info(f"redis operate error: {str(e)}")with self.client.get("/", params=data, headers=headers, catch_response=True,name="/meeting/feedList") as response:if response.status_code == 200:response.success()else:response.failure(f"{response.json()}, {str(headers)}")logger.info(f"响应结果:{response.json()}")
