模拟一定数量的用户,同时并发请求。

    1. from gevent._semaphore import BoundedSemaphore
    2. # 创建集合点
    3. all_locusts_spawned = BoundedSemaphore()
    4. all_locusts_spawned.acquire()
    5. # 注册事件
    6. @events.spawning_complete.add_listener
    7. def on_hatch_complete(**kwargs):
    8. all_locusts_spawned.release()
    9. class ApiUser(TaskSet):
    10. """压测op操作"""
    11. index = 1
    12. @task(15)
    13. def test_op(self):
    14. """op操作"""
    15. all_locusts_spawned.wait() # 限制在所有用户准备完成前处于等待状态
    16. self.client.send('42["operation",{'
    17. f'"revision":{self.user.version_id.get(self.index)}'
    18. ',"op":[[["extra","user_state","275127340","current_stage_actor",{"r":true,"i":'
    19. f'"{uuid.uuid1()}"'
    20. '}]]]}]')
    21. self.index += 1