from gevent import monkeymonkey.patch_all()import timefrom locust import TaskSet, task, constant_throughput, wait_timefrom locust.contrib.fasthttp import FastHttpUserfrom logzero import loggerclass NotifyMsgApi(TaskSet): def on_start(self): self.headers = { "Adapter-data": 'xxxx' } @task def notify_msg(self): params = { "moduleType": "SHARE", "moduleType": "LIVE", "moduleType": "ALIGN", } start = time.perf_counter() assert_data = "" with self.client.get( "/msg", headers=self.headers, params=params, catch_response=True, name="msg") as response: if response.status_code == 200: response.success() # logger.info(response.json()) else: response.failure() logger.info(response.text) assert_data = response.json().get('logId') all_time = (time.perf_counter() - start) * 1000 if int(all_time) > 1000: logger.info(f"all_time: {all_time}-- {assert_data}")class UserTest(FastHttpUser): host = "http://xxxx:8906" wait_time = constant_throughput(1.1) tasks = [NotifyMsgApi]if __name__ == "__main__": """ ps -ef | grep locust | grep -v grep | grep msg_perf.py | awk '{print $2}' | xargs kill -9 """ import os from threading import Timer import requests import json file_name = os.path.abspath(__file__) logger.info("##################启动新压测进程########################") os.system(f"nohup locust -f {file_name} --web-host=0.0.0.0 --master -P 8086 &") for i in range(5): os.system(f"nohup locust -f {file_name} --web-host=0.0.0.0 --worker &") def stop(): logger.info("停止运行压测任务") response = requests.get("http://x.x.x.x:8086/stop") logger.info(f"operate stop perf task api: {response.text}") logger.info("##################kill 进程########################") if json.loads(response.text)["success"]: os.system( "nohup ps -ef | grep locust | grep -v grep | grep %s | awk '{print $2}' | xargs kill -9 &" % file_name ) time = "00:30:20" # 设置压测时间 run_time = sum(x * int(t) for x, t in zip([3600, 60, 1], time.split(":"))) logger.info(f"perf task run_time: {run_time}") t = Timer(run_time, stop) t.start()