# -*- coding: utf-8 -*-#
# file: socketio
# Author: ShunZhe
# Date: 2021/6/26
import json
import logging
import re
import time
import gevent
from websocket import create_connection
from locust import User
"""自定义websocket客户端"""
header = {
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Content-Type': 'application/json;charset=UTF-8',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.88 Safari/537.36 '
}
class SocketIO(object):
_locust_environment = None
start_time = 0
def __init__(self):
self.events = None
self.ws = None
def _require_failure(self, name, response_time, message, action, context, **kwargs):
"""统计失败请求数"""
self.events.request_failure.fire(
request_type="接收数据",
name=name,
response_time=response_time,
response_length=len(message),
exception=f'{action} Content Error!',
context=context,
)
def _require_success(self, name, response_time, message, **kwargs):
"""统计成功请求数"""
self.events.request_success.fire(
request_type="接收数据",
name=name,
response_time=response_time,
response_length=len(message),
)
def _require_fire(self, name, response_time, message, context, **kwargs):
"""通用统计请求数"""
self.events.request.fire(
request_type="接收数据",
name=name,
response_time=response_time,
response_length=len(message),
exception=None,
context=context,
)
def connect(self, host):
"""建立连接"""
self.ws = create_connection(host, header=header)
gevent.spawn(self.receive)
def receive(self, context=None):
"""接收事件下发数据"""
message_regex = re.compile(r"(\d*)(.*)")
while True:
message = self.ws.recv()
total_time = int((time.time() - self.start_time) * 1000) # 统计响应时间
logging.info(f"WSR: {message}")
m = message_regex.match(message)
if m is None:
raise Exception(f"got no matches in {message}")
code = m.group(1) # 正则获取code
json_string = m.group(2) # 正则获取下发数据
if code == "0":
name = "0 open"
elif code == "3":
name = "3 heartbeat"
elif code == "40":
name = "40 message ok"
elif code == "42":
"""
根据具体的业务场景统计请求的成功或者失败情况
此处校验op操作如果length返回小于1则为失败
"""
obj = json.loads(json_string)
ts_type, payload = obj
name = f"{code} {ts_type}"
if 'join_ack' == ts_type:
if payload['code'] == 1 or payload['code'] == 20000002:
self._require_success(name, total_time, message)
else:
self._require_failure(name, total_time, message, ts_type, context)
if 'operation_ack' == ts_type:
"""定义op操作成功、失败断言"""
if payload['code'] == 1 and payload['length'] >= 1:
self._require_success(name, total_time, message)
else:
self._require_failure(name, total_time, message, ts_type, context)
elif 'set_doc_ack' == ts_type:
"""定义set_doc操作成功、失败断言"""
if payload['code'] == 1:
self._require_success(name, total_time, message)
else:
self._require_failure(name, total_time, message, ts_type, context)
else:
logging.info(f"Received unexpected message: {message}")
continue
if name in ('42 operation_ack', '42 set_doc_ack', '42 join_ack'):
pass
else:
self._require_fire(name, total_time, message, context)
def send(self, body, context=None):
"""发送数据"""
# if context is None:
# context = {}
# if body == "2":
# action = "2 heartbeat"
# else:
# m = re.compile(r"(\d*)(.*)").match(body)
# assert m is not None
# code = m.group(1)
# action = m.group(2).split(',')[0].split('\"')[1]
# action = f"{code} {action}"
# self._require_fire(name=action, response_time=None, message=body, context=context)
# logging.info(f"WSS: {body}")
self.start_time = time.time() # 开始发送数据的时间
self.ws.send(body)
def sleep_with_heartbeat(self, seconds):
"""模拟心跳"""
while seconds >= 0:
gevent.sleep(min(25, seconds))
seconds -= 25
self.send("2")
# logging.info('模拟心跳~')
class SocketIOUser(User):
abstract = True
def __init__(self, *args, **kwargs):
super(SocketIOUser, self).__init__(*args, **kwargs)
self.client = SocketIO()
self.client._locust_environment = self.environment
self.client.events = self.environment.events
测试脚本:
# -*- coding: utf-8 -*-#
# file: websock_op
# Author: ShunZhe
# Date: 2021/6/26
import json
import os
import queue
import uuid
from gevent import sleep
from gevent._semaphore import Semaphore, BoundedSemaphore
from locust import task, TaskSet, events, constant
from locust.log import setup_logging
from locust.runners import MasterRunner
from socket_client import SocketIOUser
setup_logging("INFO", None)
# all_locusts_spawned = Semaphore()
all_locusts_spawned = BoundedSemaphore()
all_locusts_spawned.acquire()
@events.spawning_complete.add_listener
def on_hatch_complete(**kwargs):
all_locusts_spawned.release() # 创建集合点
# events.spawning_complete.add_listener(on_hatch_complete) # 1.0之后的写法
work_ids = queue.Queue()
@events.init_command_line_parser.add_listener
def add_user_parser(parser, **kwargs):
"""自定义locust命令参数"""
parser.add_argument(
"-fi",
"--first_index",
help="获取截取列表首索引"
"[fc:]",
default=0,
type=str
)
parser.add_argument(
"-li",
"--last_index",
help="获取截取列表末索引"
"[:li]",
default=0,
type=str
)
args = parser.parse_args()
if args.first_index:
os.environ['first_index'] = args.first_index
if args.last_index:
os.environ['last_index'] = args.last_index
@events.init.add_listener
def on_locust_init(environment, **kwargs):
print("当前环境变量first_index为: %s" % os.environ.get('first_index'))
print("当前环境变量last_index为: %s" % os.environ.get('last_index'))
work_id_data()
print('初始化测试数据完毕')
if isinstance(environment.runner, MasterRunner):
print("master节点执行")
else:
print("worker节点执行")
def work_id_data():
"""队列读取作品ID"""
# work_ids = queue.Queue()
with open('work_id.txt', 'r', encoding='utf-8') as f:
for i in f.readlines()[int(os.environ.get('first_index')):int(os.environ.get('last_index'))]:
work_ids.put_nowait(int(i.strip('\n')))
return work_ids
def version_id_data():
"""生成version_ids字典"""
version_id = []
for i in range(1000000):
version_id.append(i)
version_ids = dict(zip(version_id, version_id))
return version_ids
def read_doc():
"""获取doc字符串"""
with open('set_doc.json', 'r', encoding="utf-8") as f:
data = json.load(f)
data = json.dumps(data)
return ''.join(('42', data))
def create_op_data():
"""op操作-创建用户同步"""
with open('user_sync.json', 'r', encoding="utf-8") as f:
data = json.load(f)
data = json.dumps(data)
return ''.join(('42', data))
class ApiUser(TaskSet):
"""压测op操作"""
index = 1
@task(15)
def test_op(self):
"""op操作"""
all_locusts_spawned.wait() # 限制在所有用户准备完成前处于等待状态
self.client.send('42["operation",{'
f'"revision":{self.user.version_id.get(self.index)}'
',"op":[[["extra","user_state","275127340","current_stage_actor",{"r":true,"i":'
f'"{uuid.uuid1()}"'
'}]]]}]')
self.index += 1
@task(1)
def test_sleep_with_heartbeat(self):
"""心跳"""
self.client.sleep_with_heartbeat(24)
class JoinUser(TaskSet):
"""验证最大连接数"""
index = 1
@task(1)
def test_op(self):
"""op操作"""
sleep(10)
all_locusts_spawned.wait() # 限制在所有用户准备完成前处于等待状态
self.client.send('42["operation",{'
f'"revision":{self.user.version_id.get(self.index)}'
',"op":[[["extra","user_state","275127340","current_stage_actor",{"r":true,"i":'
f'"{uuid.uuid1()}"'
'}]]]}]')
self.index += 1
@task(1)
def test_sleep_with_heartbeat(self):
"""心跳"""
self.client.sleep_with_heartbeat(10)
class WebSocketUser(SocketIOUser):
host = "wss://press-socketcoll"
# wait_time = between(0.2, 0.5) # 设置任务执行完毕等待0.2~0.5
wait_time = constant(1) # 设置任务执行等待1s后开始下一个任务
# wait_time = constant_pacing(1) # 设置任务启动总等待时间1s
version_id = version_id_data() # op操作版本号
doc_data = read_doc() # doc json
user_op = create_op_data() # 创建用户同步
tasks = {ApiUser}
# tasks = {JoinUser} # 测试场景:最大连接数
def on_start(self):
work_id = work_ids.get() # 队列获取作品ID
print('当前作品为: %s' % work_id)
# work_id = 3515541 # 队列获取作品ID
doc_data = self.doc_data # 作品doc
user_op = self.user_op # 用户同步
token = 'CDM5LTk2MjYwOWVhMTA0YSJ9.2k2QUywuWjjHu5P2I'
url = f'wss://press-socketcoll/?work_id={work_id}&stag=1&token={token}&EIO=3&transport=websocket'
self.client.connect(url)
self.client.send(f'42["join",{work_id}]')
sleep(1)
self.client.send(doc_data)
sleep(1)
# 创建用户同步
self.client.send(user_op)
sleep(1)
if __name__ == "__main__":
file_name = os.path.abspath(__file__)
os.system(f'locust -f {file_name} --master --first_index 0 --last_index 1000')
# # setup Environment and Runner
# env = Environment(user_classes=[WebSocketUser])
# env.create_local_runner()
# env.events.request_success.fire()
# # start a WebUI instance
# env.create_web_ui("127.0.0.1", 8089)
# # start a greenlet that periodically outputs the current stats
# gevent.spawn(stats_printer(env.stats))
# #
# # # start a greenlet that save current stats to history
# gevent.spawn(stats_history, env.runner)
# # start the test
# env.runner.start(1000, spawn_rate=30)
# # in 60 seconds stop the runner
# gevent.spawn_later(30, lambda: env.runner.quit())
# # wait for the greenlets
# env.runner.greenlet.join()
# # stop the web server for good measures
# env.web_ui.stop()
运行结果:
参考locust插件项目:https://github.com/SvenskaSpel/locust-plugins