from gevent import monkey
monkey.patch_all()
import time
from locust import TaskSet, task, constant_throughput, wait_time
from locust.contrib.fasthttp import FastHttpUser
from logzero import logger
class NotifyMsgApi(TaskSet):
    """Task set that calls ``GET /msg`` and logs requests that take too long."""

    # Requests slower than this many milliseconds are logged with their logId.
    SLOW_REQUEST_MS = 1000

    def on_start(self):
        """Build the request headers once, when the simulated user starts."""
        self.headers = {
            "Adapter-data": 'xxxx'
        }

    @staticmethod
    def _extract_log_id(response):
        """Return the ``logId`` field of a JSON response body.

        Returns an empty string when the body is missing, is not valid JSON,
        or is not a JSON object, so that a malformed response cannot abort
        the task. Returns ``None`` when the object has no ``logId`` key.
        """
        try:
            payload = response.json()
        except (ValueError, TypeError):
            # ValueError: body is not valid JSON; TypeError: body is absent.
            return ""
        if not isinstance(payload, dict):
            return ""
        return payload.get("logId")

    @task
    def notify_msg(self):
        """Request ``/msg`` once, record success/failure, and log slow calls."""
        # The original literal repeated the "moduleType" key (SHARE, LIVE,
        # ALIGN). A dict keeps only the last value of a repeated key, so
        # "ALIGN" is what was actually sent; that value is kept explicitly.
        # Swap in "SHARE" or "LIVE" here to exercise the other module types.
        params = {
            "moduleType": "ALIGN",
        }
        start = time.perf_counter()
        assert_data = ""
        with self.client.get(
            "/msg",
            headers=self.headers,
            params=params,
            catch_response=True,
            name="msg",
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                # failure() requires a reason; calling it without one raises
                # TypeError instead of recording the failed request.
                response.failure(
                    f"unexpected status code: {response.status_code}"
                )
                logger.info(response.text)
            assert_data = self._extract_log_id(response)

        all_time = (time.perf_counter() - start) * 1000
        if int(all_time) > self.SLOW_REQUEST_MS:
            logger.info(f"all_time: {all_time}-- {assert_data}")
class UserTest(FastHttpUser):
    """Simulated user that runs the ``NotifyMsgApi`` task set."""

    # Task sets executed by each simulated user.
    tasks = [NotifyMsgApi]

    # Pacing between task runs, built by constant_throughput(1.1).
    wait_time = constant_throughput(1.1)

    # Base URL of the system under test.
    host = "http://xxxx:8906"
if __name__ == "__main__":
    # Launcher: starts one Locust master plus several workers in the
    # background, then stops the run and kills the processes after a fixed
    # duration.
    #
    # Manual cleanup, should the launcher itself be interrupted:
    # ps -ef | grep locust | grep -v grep | grep msg_perf.py | awk '{print $2}' | xargs kill -9
    import os
    import shlex
    from threading import Timer
    import requests
    import json

    WORKER_COUNT = 5
    STOP_REQUEST_TIMEOUT = 10  # seconds to wait for the master's /stop endpoint

    file_name = os.path.abspath(__file__)
    # Quote the path so spaces or shell metacharacters in it cannot break
    # (or inject into) the shell commands below.
    quoted_file_name = shlex.quote(file_name)

    logger.info("##################启动新压测进程########################")
    os.system(
        f"nohup locust -f {quoted_file_name} --web-host=0.0.0.0 --master -P 8086 &"
    )
    for _ in range(WORKER_COUNT):
        os.system(
            f"nohup locust -f {quoted_file_name} --web-host=0.0.0.0 --worker &"
        )

    def stop():
        """Ask the master to stop the run; kill the Locust processes on success."""
        logger.info("停止运行压测任务")
        try:
            response = requests.get(
                "http://x.x.x.x:8086/stop", timeout=STOP_REQUEST_TIMEOUT
            )
        except requests.RequestException:
            # Without this the Timer thread would die with a bare traceback.
            logger.exception("failed to call the stop perf task api")
            return
        logger.info(f"operate stop perf task api: {response.text}")
        logger.info("##################kill 进程########################")
        try:
            payload = json.loads(response.text)
        except ValueError:
            logger.error("stop perf task api returned a non-JSON body")
            return
        if isinstance(payload, dict) and payload.get("success"):
            # %-formatting keeps the awk braces literal; -F matches the path
            # as a fixed string rather than as a regular expression.
            os.system(
                "nohup ps -ef | grep locust | grep -v grep | grep -F %s"
                " | awk '{print $2}' | xargs kill -9 &" % quoted_file_name
            )

    # Load-test duration as HH:MM:SS. Deliberately not named `time`, which
    # would rebind the module-level `time` module used by the task code.
    duration = "00:30:20"
    run_time = sum(
        unit * int(part)
        for unit, part in zip([3600, 60, 1], duration.split(":"))
    )
    logger.info(f"perf task run_time: {run_time}")
    t = Timer(run_time, stop)
    t.start()