常见的分几种情况:
- 多个slave共享一份测试数据 and 不唯一 and 循环利用
- 多个slave共享一份测试数据 and 唯一 and 循环利用
- 多个slave共享一份测试数据 and 唯一 and 不可循环利用
多个slave共享一份测试数据 and 不唯一 and 循环利用
- 创建空队列(queue 先进先出)
- 初始化压测数据,插入队列中
- 依次从队列首位中取出数据
- 把取出的数据再次插入队列尾中
```python
!/usr/bin/env python
-- encoding: utf-8 --
“””@File : parameter_test.py
@Time : 2021/12/26 20:48:09
@Author : wangshunzhe
“”” “””分布式参数化: 共享一份测试数据 and 不唯一 and 循环利用””” from locust import FastHttpUser, task, events from logzero import logger from locust.runners import MasterRunner import queue
创建队列
USER_DATA = queue.Queue()
@events.init.addlistener
def (environment, **kwargs):
logger.info(f”初始化压测数据”)
# 用户数据放入队列中
data = 1
USER_DATA.put_nowait(data)
class UserTask1(FastHttpUser): “””压测任务””” host = “” max_wait = 0 min_wait = 0
def on_start(self):
"""setup"""
logger.info("***start***")
@task(1)
def api_test(self):
"""压测接口"""
# 取出一个队列数据
params_data = USER_DATA.get()
with self.client.get("/", ) as response:
if response.status_code == 200:
response.success()
else:
response.failure(response.json())
# 将取出的数据中重新放回队列中
USER_DATA.put_nowait(params_data)
<a name="uhah6"></a>
#### 多个slave共享一份测试数据 and 唯一 and 不可循环利用
1. 创建空队列(queue 先进先出)
2. 初始化压测数据,插入队列中
3. 依次从队列首位中取出数据
4. 判断队列为空则停止
<a name="heH8c"></a>
#### 多个slave共享一份测试数据 and 唯一 and 不可循环利用
方案一:通过locust事件注册init_command_line_parser事件,自定义locust命令行选项,控制每个slave执行不同数据<br />方案二:本地搭建redis,将测试数据存入redis,并依次读取,再插入循环利用
```python
# 方案二demo
from locust.runners import MasterRunner
from locust import TaskSet, constant, task, events, FastHttpUser
from logzero import logger
import redis
# 本地redis
redis_conn = redis.StrictRedis(host="127.0.0.1", port=6379, db=1)
# 用户信息队列
USERS_QUEUE = queue.Queue()
# 构造用户信息数据
@events.init.add_listener
def on_locust_init(environment, **kawrgs):
if isinstance(environment.runner, MasterRunner):
redis_conn.delete("meeting")
# 用户数据写入redis
result = {"name": "wangshunzhe"}
redis_conn.lpush("meeting", json.dumps(result))
logger.info("写入用户数据完成")
class APIMeetingFeedList(FastHttpUser):
host = ""
max_wait = 0
min_wait = 0
@task(1)
def test_feed_list(self):
try:
# 读取redis
user_data = redis_conn.lpop("meeting")
result = user_data.decode('unicode_escape')
# 将读取的值重新插入队列
redis_conn.rpushx("meeting", user_data)
except BaseException as e:
logger.info(f"redis operate error: {str(e)}")
with self.client.get("/", params=data, headers=headers, catch_response=True,name="/meeting/feedList") as response:
if response.status_code == 200:
response.success()
else:
response.failure(f"{response.json()}, {str(headers)}")
logger.info(f"响应结果:{response.json()}")