模拟一定数量的用户,同时并发请求。
from gevent._semaphore import BoundedSemaphore
# 创建集合点
all_locusts_spawned = BoundedSemaphore()
all_locusts_spawned.acquire()
# 注册事件
@events.spawning_complete.add_listener
def on_hatch_complete(**kwargs):
all_locusts_spawned.release()
class ApiUser(TaskSet):
"""压测op操作"""
index = 1
@task(15)
def test_op(self):
"""op操作"""
all_locusts_spawned.wait() # 限制在所有用户准备完成前处于等待状态
self.client.send('42["operation",{'
f'"revision":{self.user.version_id.get(self.index)}'
',"op":[[["extra","user_state","275127340","current_stage_actor",{"r":true,"i":'
f'"{uuid.uuid1()}"'
'}]]]}]')
self.index += 1