# -*- coding: utf-8 -*-
#
# file: socketio
# Author: ShunZhe
# Date: 2021/6/26
"""Custom Socket.IO-over-WebSocket client that reports to Locust statistics."""
import json
import logging
import re
import time

import gevent
from websocket import create_connection
from locust import User

# HTTP headers sent with the WebSocket upgrade request.
header = {
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Content-Type': 'application/json;charset=UTF-8',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/87.0.4280.88 Safari/537.36 ',
}

# Splits a frame into its leading numeric packet code and the remaining
# payload. DOTALL keeps payloads that contain raw newlines intact; the
# pattern matches every string, so no "no match" branch is needed.
_FRAME_RE = re.compile(r"(\d*)(.*)", re.DOTALL)

# Packet codes that carry no business payload, mapped to their stats name.
_SIMPLE_FRAME_NAMES = {
    "0": "0 open",
    "3": "3 heartbeat",
    "40": "40 message ok",
}

# Event types whose payload is asserted on (success / failure) instead of
# being recorded as a plain request.
_ACK_TYPES = ('join_ack', 'operation_ack', 'set_doc_ack')

# request_type label used for every statistics entry produced by this client.
_REQUEST_TYPE = "接收数据"


class SocketIO(object):
    """Minimal Socket.IO client built on a raw WebSocket connection.

    ``events`` must be set to the Locust environment's event registry before
    any frame is received (``SocketIOUser.__init__`` does this).
    """

    _locust_environment = None
    # Timestamp of the most recent connect()/send(); every received frame is
    # timed against it.
    start_time = 0

    def __init__(self):
        self.events = None
        self.ws = None

    def _require_failure(self, name, response_time, message, action, context, **kwargs):
        """Record a failed request.

        Fires the unified ``request`` event with an exception set; the
        separate ``request_failure`` hook is deprecated in Locust.
        """
        self.events.request.fire(
            request_type=_REQUEST_TYPE,
            name=name,
            response_time=response_time,
            response_length=len(message),
            exception=f'{action} Content Error!',
            context=context,
        )

    def _require_success(self, name, response_time, message, **kwargs):
        """Record a successful request.

        Fires the unified ``request`` event with ``exception=None``; the
        separate ``request_success`` hook is deprecated in Locust. An optional
        ``context`` keyword is forwarded when given.
        """
        self.events.request.fire(
            request_type=_REQUEST_TYPE,
            name=name,
            response_time=response_time,
            response_length=len(message),
            exception=None,
            context=kwargs.get('context'),
        )

    def _require_fire(self, name, response_time, message, context, **kwargs):
        """Record a frame that has no success/failure assertion."""
        self.events.request.fire(
            request_type=_REQUEST_TYPE,
            name=name,
            response_time=response_time,
            response_length=len(message),
            exception=None,
            context=context,
        )

    @staticmethod
    def _ack_succeeded(ts_type, payload):
        """Evaluate the business assertion for an ack event.

        Returns ``True``/``False`` for the ack types in ``_ACK_TYPES`` and
        ``None`` for any other event type. A payload that is not a dict, or
        that lacks the expected keys, counts as a failure instead of raising.
        """
        if ts_type not in _ACK_TYPES:
            return None
        if not isinstance(payload, dict):
            return False
        code = payload.get('code')
        if ts_type == 'join_ack':
            return code in (1, 20000002)
        if ts_type == 'operation_ack':
            # An operation must be acknowledged with at least one entry.
            return code == 1 and payload.get('length', 0) >= 1
        return code == 1  # set_doc_ack

    def connect(self, host):
        """Open the WebSocket connection and start the receive greenlet."""
        # Stamp the clock before connecting so the first frame ("0 open") is
        # timed against the connect call rather than against 0.
        self.start_time = time.time()
        self.ws = create_connection(host, header=header)
        gevent.spawn(self.receive)

    def receive(self, context=None):
        """Receive frames forever and report each one to Locust.

        Response time is the number of milliseconds since ``start_time``.
        Frames with an unknown packet code are logged and not reported.
        """
        while True:
            message = self.ws.recv()
            total_time = int((time.time() - self.start_time) * 1000)
            logging.info("WSR: %s", message)
            code, json_string = _FRAME_RE.match(message).groups()

            ts_type = None
            ack_ok = None
            if code in _SIMPLE_FRAME_NAMES:
                name = _SIMPLE_FRAME_NAMES[code]
            elif code == "42":
                # Event frame: a JSON array of [event_type, payload].
                ts_type, payload = json.loads(json_string)
                name = f"{code} {ts_type}"
                ack_ok = self._ack_succeeded(ts_type, payload)
            else:
                logging.info("Received unexpected message: %s", message)
                continue

            if ack_ok is None:
                self._require_fire(name, total_time, message, context)
            elif ack_ok:
                self._require_success(name, total_time, message)
            else:
                self._require_failure(name, total_time, message, ts_type, context)

    def send(self, body, context=None):
        """Send one frame and restart the response-time clock.

        ``context`` is accepted for interface compatibility and is not used.
        """
        self.start_time = time.time()
        self.ws.send(body)

    def sleep_with_heartbeat(self, seconds):
        """Sleep for ``seconds`` while sending a heartbeat frame ("2").

        Sleeps in slices of at most 25 seconds and sends a heartbeat after
        each slice. When ``seconds`` is a multiple of 25 the loop runs one
        extra zero-length slice, so one additional heartbeat is sent.
        """
        while seconds >= 0:
            gevent.sleep(min(25, seconds))
            seconds -= 25
            self.send("2")


class SocketIOUser(User):
    """Abstract Locust user whose ``client`` is a ``SocketIO`` instance."""

    abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = SocketIO()
        self.client._locust_environment = self.environment
        self.client.events = self.environment.events
测试脚本:
# -*- coding: utf-8 -*-
#
# file: websock_op
# Author: ShunZhe
# Date: 2021/6/26
"""Locust scenario that load-tests collaborative "operation" messages."""
import json
import os
import queue
import subprocess
import uuid

from gevent import sleep
from gevent.lock import Semaphore, BoundedSemaphore  # noqa: F401
from locust import task, TaskSet, events, constant
from locust.log import setup_logging
from locust.runners import MasterRunner

from socket_client import SocketIOUser

setup_logging("INFO", None)

# Rendezvous point: held from import time and released once spawning is
# complete, so tasks can wait until every user exists.
all_locusts_spawned = BoundedSemaphore()
all_locusts_spawned.acquire()

# Work IDs handed out to users in on_start, one per user.
work_ids = queue.Queue()


@events.spawning_complete.add_listener
def on_hatch_complete(**kwargs):
    """Open the rendezvous point once all users have been spawned."""
    # spawning_complete can fire more than once in a process; releasing a
    # BoundedSemaphore that is already free raises ValueError.
    if all_locusts_spawned.locked():
        all_locusts_spawned.release()


@events.init_command_line_parser.add_listener
def add_user_parser(parser, **kwargs):
    """Register the custom ``--first_index`` / ``--last_index`` options.

    The values select the slice of ``work_id.txt`` that is loaded. They are
    read from ``environment.parsed_options`` in the init listener, so the
    parser is not invoked here.
    """
    parser.add_argument(
        "-fi",
        "--first_index",
        help="获取截取列表首索引[fc:]",
        default=None,
        type=int,
    )
    parser.add_argument(
        "-li",
        "--last_index",
        help="获取截取列表末索引[:li]",
        default=None,
        type=int,
    )


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    """Export the slice bounds to the environment and load the work IDs."""
    options = getattr(environment, "parsed_options", None)
    for key in ('first_index', 'last_index'):
        value = getattr(options, key, None)
        if value is not None:
            os.environ[key] = str(value)

    print("当前环境变量first_index为: %s" % os.environ.get('first_index'))
    print("当前环境变量last_index为: %s" % os.environ.get('last_index'))
    work_id_data()
    print('初始化测试数据完毕')
    if isinstance(environment.runner, MasterRunner):
        print("master节点执行")
    else:
        print("worker节点执行")


def _index_from_env(key):
    """Return the integer stored in environment variable ``key``, or None."""
    raw = os.environ.get(key)
    if raw is None or not raw.strip():
        return None
    return int(raw)


def work_id_data():
    """Fill the ``work_ids`` queue from ``work_id.txt`` and return it.

    Only lines ``[first_index:last_index]`` are used; a bound that is not set
    in the environment leaves that side of the slice open. Blank lines are
    skipped.
    """
    first = _index_from_env('first_index')
    last = _index_from_env('last_index')
    with open('work_id.txt', 'r', encoding='utf-8') as f:
        lines = f.readlines()
    for line in lines[first:last]:
        line = line.strip()
        if line:
            work_ids.put_nowait(int(line))
    return work_ids


def version_id_data():
    """Return a dict mapping every integer in ``range(1000000)`` to itself."""
    return {i: i for i in range(1000000)}


def _load_event_frame(path):
    """Load a JSON file and return it as a "42"-prefixed frame string."""
    with open(path, 'r', encoding="utf-8") as f:
        data = json.load(f)
    return ''.join(('42', json.dumps(data)))


def read_doc():
    """Return the frame built from ``set_doc.json``."""
    return _load_event_frame('set_doc.json')


def create_op_data():
    """Return the frame built from ``user_sync.json``."""
    return _load_event_frame('user_sync.json')


def _operation_frame(revision):
    """Build an "operation" frame for ``revision`` with a fresh uuid1 id."""
    return (
        f'42["operation",{{"revision":{revision},'
        f'"op":[[["extra","user_state","275127340","current_stage_actor",'
        f'{{"r":true,"i":"{uuid.uuid1()}"}}]]]}}]'
    )


class ApiUser(TaskSet):
    """Load scenario: send operations, with an occasional heartbeat pause."""

    # Revision counter; the first increment creates a per-instance value.
    index = 1

    @task(15)
    def test_op(self):
        """Send one operation frame and advance the revision counter."""
        all_locusts_spawned.wait()  # block until every user has been spawned
        self.client.send(_operation_frame(self.user.version_id.get(self.index)))
        self.index += 1

    @task(1)
    def test_sleep_with_heartbeat(self):
        """Idle for 24 seconds while sending a heartbeat."""
        self.client.sleep_with_heartbeat(24)


class JoinUser(TaskSet):
    """Scenario for probing the maximum number of connections."""

    # Revision counter; the first increment creates a per-instance value.
    index = 1

    @task(1)
    def test_op(self):
        """Sleep 10 seconds, then send one operation frame."""
        sleep(10)
        all_locusts_spawned.wait()  # block until every user has been spawned
        self.client.send(_operation_frame(self.user.version_id.get(self.index)))
        self.index += 1

    @task(1)
    def test_sleep_with_heartbeat(self):
        """Idle for 10 seconds while sending a heartbeat."""
        self.client.sleep_with_heartbeat(10)


class WebSocketUser(SocketIOUser):
    """Locust user that joins one work and then runs the selected TaskSet."""

    host = "wss://press-socketcoll"
    # Wait a constant 1 second between tasks. Alternatives that were tried:
    # between(0.2, 0.5) and constant_pacing(1).
    wait_time = constant(1)
    version_id = version_id_data()  # revision lookup used by the tasks
    doc_data = read_doc()  # set_doc frame
    user_op = create_op_data()  # user-sync frame
    tasks = {ApiUser}  # use {JoinUser} for the max-connection scenario

    def on_start(self):
        """Connect, join a work, then send the document and user-sync frames."""
        # NOTE(review): blocks for as long as the queue is empty, i.e. when
        # more users are spawned than work IDs were loaded.
        work_id = work_ids.get()
        print('当前作品为: %s' % work_id)
        # NOTE(review): hard-coded credential; consider supplying it via
        # configuration instead.
        token = 'CDM5LTk2MjYwOWVhMTA0YSJ9.2k2QUywuWjjHu5P2I'
        base = self.host.rstrip('/')
        url = f'{base}/?work_id={work_id}&stag=1&token={token}&EIO=3&transport=websocket'
        self.client.connect(url)
        self.client.send(f'42["join",{work_id}]')
        sleep(1)
        self.client.send(self.doc_data)
        sleep(1)
        self.client.send(self.user_op)
        sleep(1)


if __name__ == "__main__":
    file_name = os.path.abspath(__file__)
    # An argument list keeps paths that contain spaces or shell
    # metacharacters intact. The exit status is ignored.
    subprocess.run(
        ['locust', '-f', file_name, '--master', '--first_index', '0', '--last_index', '1000'],
        check=False,
    )
运行结果:

参考locust插件项目:https://github.com/SvenskaSpel/locust-plugins
