1. # -*- coding: utf-8 -*-#
    2. # file: socketio
    3. # Author: ShunZhe
    4. # Date: 2021/6/26
    5. import json
    6. import logging
    7. import re
    8. import time
    9. import gevent
    10. from websocket import create_connection
    11. from locust import User
    12. """自定义websocket客户端"""
    13. header = {
    14. 'Accept-Encoding': 'gzip, deflate, br',
    15. 'Connection': 'keep-alive',
    16. 'Content-Type': 'application/json;charset=UTF-8',
    17. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
    18. 'Chrome/87.0.4280.88 Safari/537.36 '
    19. }
    20. class SocketIO(object):
    21. _locust_environment = None
    22. start_time = 0
    23. def __init__(self):
    24. self.events = None
    25. self.ws = None
    26. def _require_failure(self, name, response_time, message, action, context, **kwargs):
    27. """统计失败请求数"""
    28. self.events.request_failure.fire(
    29. request_type="接收数据",
    30. name=name,
    31. response_time=response_time,
    32. response_length=len(message),
    33. exception=f'{action} Content Error!',
    34. context=context,
    35. )
    36. def _require_success(self, name, response_time, message, **kwargs):
    37. """统计成功请求数"""
    38. self.events.request_success.fire(
    39. request_type="接收数据",
    40. name=name,
    41. response_time=response_time,
    42. response_length=len(message),
    43. )
    44. def _require_fire(self, name, response_time, message, context, **kwargs):
    45. """通用统计请求数"""
    46. self.events.request.fire(
    47. request_type="接收数据",
    48. name=name,
    49. response_time=response_time,
    50. response_length=len(message),
    51. exception=None,
    52. context=context,
    53. )
    54. def connect(self, host):
    55. """建立连接"""
    56. self.ws = create_connection(host, header=header)
    57. gevent.spawn(self.receive)
    58. def receive(self, context=None):
    59. """接收事件下发数据"""
    60. message_regex = re.compile(r"(\d*)(.*)")
    61. while True:
    62. message = self.ws.recv()
    63. total_time = int((time.time() - self.start_time) * 1000) # 统计响应时间
    64. logging.info(f"WSR: {message}")
    65. m = message_regex.match(message)
    66. if m is None:
    67. raise Exception(f"got no matches in {message}")
    68. code = m.group(1) # 正则获取code
    69. json_string = m.group(2) # 正则获取下发数据
    70. if code == "0":
    71. name = "0 open"
    72. elif code == "3":
    73. name = "3 heartbeat"
    74. elif code == "40":
    75. name = "40 message ok"
    76. elif code == "42":
    77. """
    78. 根据具体的业务场景统计请求的成功或者失败情况
    79. 此处校验op操作如果length返回小于1则为失败
    80. """
    81. obj = json.loads(json_string)
    82. ts_type, payload = obj
    83. name = f"{code} {ts_type}"
    84. if 'join_ack' == ts_type:
    85. if payload['code'] == 1 or payload['code'] == 20000002:
    86. self._require_success(name, total_time, message)
    87. else:
    88. self._require_failure(name, total_time, message, ts_type, context)
    89. if 'operation_ack' == ts_type:
    90. """定义op操作成功、失败断言"""
    91. if payload['code'] == 1 and payload['length'] >= 1:
    92. self._require_success(name, total_time, message)
    93. else:
    94. self._require_failure(name, total_time, message, ts_type, context)
    95. elif 'set_doc_ack' == ts_type:
    96. """定义set_doc操作成功、失败断言"""
    97. if payload['code'] == 1:
    98. self._require_success(name, total_time, message)
    99. else:
    100. self._require_failure(name, total_time, message, ts_type, context)
    101. else:
    102. logging.info(f"Received unexpected message: {message}")
    103. continue
    104. if name in ('42 operation_ack', '42 set_doc_ack', '42 join_ack'):
    105. pass
    106. else:
    107. self._require_fire(name, total_time, message, context)
    108. def send(self, body, context=None):
    109. """发送数据"""
    110. # if context is None:
    111. # context = {}
    112. # if body == "2":
    113. # action = "2 heartbeat"
    114. # else:
    115. # m = re.compile(r"(\d*)(.*)").match(body)
    116. # assert m is not None
    117. # code = m.group(1)
    118. # action = m.group(2).split(',')[0].split('\"')[1]
    119. # action = f"{code} {action}"
    120. # self._require_fire(name=action, response_time=None, message=body, context=context)
    121. # logging.info(f"WSS: {body}")
    122. self.start_time = time.time() # 开始发送数据的时间
    123. self.ws.send(body)
    124. def sleep_with_heartbeat(self, seconds):
    125. """模拟心跳"""
    126. while seconds >= 0:
    127. gevent.sleep(min(25, seconds))
    128. seconds -= 25
    129. self.send("2")
    130. # logging.info('模拟心跳~')
    131. class SocketIOUser(User):
    132. abstract = True
    133. def __init__(self, *args, **kwargs):
    134. super(SocketIOUser, self).__init__(*args, **kwargs)
    135. self.client = SocketIO()
    136. self.client._locust_environment = self.environment
    137. self.client.events = self.environment.events

    测试脚本:

    1. # -*- coding: utf-8 -*-#
    2. # file: websock_op
    3. # Author: ShunZhe
    4. # Date: 2021/6/26
    5. import json
    6. import os
    7. import queue
    8. import uuid
    9. from gevent import sleep
    10. from gevent._semaphore import Semaphore, BoundedSemaphore
    11. from locust import task, TaskSet, events, constant
    12. from locust.log import setup_logging
    13. from locust.runners import MasterRunner
    14. from socket_client import SocketIOUser
    15. setup_logging("INFO", None)
    16. # all_locusts_spawned = Semaphore()
    17. all_locusts_spawned = BoundedSemaphore()
    18. all_locusts_spawned.acquire()
    19. @events.spawning_complete.add_listener
    20. def on_hatch_complete(**kwargs):
    21. all_locusts_spawned.release() # 创建集合点
    22. # events.spawning_complete.add_listener(on_hatch_complete) # 1.0之后的写法
    23. work_ids = queue.Queue()
    24. @events.init_command_line_parser.add_listener
    25. def add_user_parser(parser, **kwargs):
    26. """自定义locust命令参数"""
    27. parser.add_argument(
    28. "-fi",
    29. "--first_index",
    30. help="获取截取列表首索引"
    31. "[fc:]",
    32. default=0,
    33. type=str
    34. )
    35. parser.add_argument(
    36. "-li",
    37. "--last_index",
    38. help="获取截取列表末索引"
    39. "[:li]",
    40. default=0,
    41. type=str
    42. )
    43. args = parser.parse_args()
    44. if args.first_index:
    45. os.environ['first_index'] = args.first_index
    46. if args.last_index:
    47. os.environ['last_index'] = args.last_index
    48. @events.init.add_listener
    49. def on_locust_init(environment, **kwargs):
    50. print("当前环境变量first_index为: %s" % os.environ.get('first_index'))
    51. print("当前环境变量last_index为: %s" % os.environ.get('last_index'))
    52. work_id_data()
    53. print('初始化测试数据完毕')
    54. if isinstance(environment.runner, MasterRunner):
    55. print("master节点执行")
    56. else:
    57. print("worker节点执行")
    58. def work_id_data():
    59. """队列读取作品ID"""
    60. # work_ids = queue.Queue()
    61. with open('work_id.txt', 'r', encoding='utf-8') as f:
    62. for i in f.readlines()[int(os.environ.get('first_index')):int(os.environ.get('last_index'))]:
    63. work_ids.put_nowait(int(i.strip('\n')))
    64. return work_ids
    65. def version_id_data():
    66. """生成version_ids字典"""
    67. version_id = []
    68. for i in range(1000000):
    69. version_id.append(i)
    70. version_ids = dict(zip(version_id, version_id))
    71. return version_ids
    72. def read_doc():
    73. """获取doc字符串"""
    74. with open('set_doc.json', 'r', encoding="utf-8") as f:
    75. data = json.load(f)
    76. data = json.dumps(data)
    77. return ''.join(('42', data))
    78. def create_op_data():
    79. """op操作-创建用户同步"""
    80. with open('user_sync.json', 'r', encoding="utf-8") as f:
    81. data = json.load(f)
    82. data = json.dumps(data)
    83. return ''.join(('42', data))
    84. class ApiUser(TaskSet):
    85. """压测op操作"""
    86. index = 1
    87. @task(15)
    88. def test_op(self):
    89. """op操作"""
    90. all_locusts_spawned.wait() # 限制在所有用户准备完成前处于等待状态
    91. self.client.send('42["operation",{'
    92. f'"revision":{self.user.version_id.get(self.index)}'
    93. ',"op":[[["extra","user_state","275127340","current_stage_actor",{"r":true,"i":'
    94. f'"{uuid.uuid1()}"'
    95. '}]]]}]')
    96. self.index += 1
    97. @task(1)
    98. def test_sleep_with_heartbeat(self):
    99. """心跳"""
    100. self.client.sleep_with_heartbeat(24)
    101. class JoinUser(TaskSet):
    102. """验证最大连接数"""
    103. index = 1
    104. @task(1)
    105. def test_op(self):
    106. """op操作"""
    107. sleep(10)
    108. all_locusts_spawned.wait() # 限制在所有用户准备完成前处于等待状态
    109. self.client.send('42["operation",{'
    110. f'"revision":{self.user.version_id.get(self.index)}'
    111. ',"op":[[["extra","user_state","275127340","current_stage_actor",{"r":true,"i":'
    112. f'"{uuid.uuid1()}"'
    113. '}]]]}]')
    114. self.index += 1
    115. @task(1)
    116. def test_sleep_with_heartbeat(self):
    117. """心跳"""
    118. self.client.sleep_with_heartbeat(10)
    119. class WebSocketUser(SocketIOUser):
    120. host = "wss://press-socketcoll"
    121. # wait_time = between(0.2, 0.5) # 设置任务执行完毕等待0.2~0.5
    122. wait_time = constant(1) # 设置任务执行等待1s后开始下一个任务
    123. # wait_time = constant_pacing(1) # 设置任务启动总等待时间1s
    124. version_id = version_id_data() # op操作版本号
    125. doc_data = read_doc() # doc json
    126. user_op = create_op_data() # 创建用户同步
    127. tasks = {ApiUser}
    128. # tasks = {JoinUser} # 测试场景:最大连接数
    129. def on_start(self):
    130. work_id = work_ids.get() # 队列获取作品ID
    131. print('当前作品为: %s' % work_id)
    132. # work_id = 3515541 # 队列获取作品ID
    133. doc_data = self.doc_data # 作品doc
    134. user_op = self.user_op # 用户同步
    135. token = 'CDM5LTk2MjYwOWVhMTA0YSJ9.2k2QUywuWjjHu5P2I'
    136. url = f'wss://press-socketcoll/?work_id={work_id}&stag=1&token={token}&EIO=3&transport=websocket'
    137. self.client.connect(url)
    138. self.client.send(f'42["join",{work_id}]')
    139. sleep(1)
    140. self.client.send(doc_data)
    141. sleep(1)
    142. # 创建用户同步
    143. self.client.send(user_op)
    144. sleep(1)
    145. if __name__ == "__main__":
    146. file_name = os.path.abspath(__file__)
    147. os.system(f'locust -f {file_name} --master --first_index 0 --last_index 1000')
    148. # # setup Environment and Runner
    149. # env = Environment(user_classes=[WebSocketUser])
    150. # env.create_local_runner()
    151. # env.events.request_success.fire()
    152. # # start a WebUI instance
    153. # env.create_web_ui("127.0.0.1", 8089)
    154. # # start a greenlet that periodically outputs the current stats
    155. # gevent.spawn(stats_printer(env.stats))
    156. # #
    157. # # # start a greenlet that save current stats to history
    158. # gevent.spawn(stats_history, env.runner)
    159. # # start the test
    160. # env.runner.start(1000, spawn_rate=30)
    161. # # in 60 seconds stop the runner
    162. # gevent.spawn_later(30, lambda: env.runner.quit())
    163. # # wait for the greenlets
    164. # env.runner.greenlet.join()
    165. # # stop the web server for good measures
    166. # env.web_ui.stop()

    运行结果:

    image.png
    参考locust插件项目:https://github.com/SvenskaSpel/locust-plugins