# Dockerfile, CronJob and Deployment manifests, and the xjob_alert.py script
# Image that runs xjob_alert.py from cron on four schedules and streams the
# per-job log files to the container's stdout.
FROM python:3.7.9

# OS packages come first so that editing the script or requirements.txt does
# not invalidate this layer. update + install + list cleanup stay in ONE layer
# so a stale package index is never cached and the lists do not bloat the image.
# NOTE(review): vim is only useful for interactive debugging; consider dropping it.
RUN apt-get update \
    && apt-get install -y --no-install-recommends cron vim \
    && rm -rf /var/lib/apt/lists/*

# Dependency manifest is copied on its own so the pip layer is reused until
# requirements.txt itself changes.
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt

# Application script plus the log files that cron appends to and tail follows.
COPY xjob_alert.py /
RUN chmod 0644 /xjob_alert.py \
    && touch /var/log/xfep.log /var/log/xpair.log /var/log/status2.log /var/log/status109.log

# Install root's crontab in a single step. The previous
# `crontab -l | { cat; echo ...; } | crontab -` chain relied on `crontab -l`
# failing harmlessly on the first call and produced four separate layers.
RUN printf '%s\n' \
      "*/3 * * * * /usr/local/bin/python /xjob_alert.py xpair >>/var/log/xpair.log 2>&1" \
      "*/10 * * * * /usr/local/bin/python /xjob_alert.py xfep >>/var/log/xfep.log 2>&1" \
      "*/5 * * * * /usr/local/bin/python /xjob_alert.py status2 >>/var/log/status2.log 2>&1" \
      "0 */6 * * * /usr/local/bin/python /xjob_alert.py status109 >>/var/log/status109.log 2>&1" \
    | crontab -

# Exec-form CMD; `cron` daemonizes itself, then `exec` replaces the shell with
# tail so the long-running process receives stop signals directly. All four job
# logs are followed (previously only xpair.log was visible in container output).
CMD ["/bin/sh", "-c", "cron && exec tail -F /var/log/xpair.log /var/log/xfep.log /var/log/status2.log /var/log/status109.log"]
---
# CronJob that runs the "xpair" check of xjob_alert.py every five minutes.
# batch/v1 is the stable CronJob API (available since Kubernetes 1.21);
# batch/v1beta1 is no longer served from Kubernetes 1.25 onwards.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: xpair-alert
spec:
  schedule: "*/5 * * * *"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: xjob-alert
              image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.2
              imagePullPolicy: IfNotPresent
              # Absolute interpreter and script paths, matching the cron entries
              # of the accompanying Dockerfile (/usr/local/bin/python, /xjob_alert.py).
              # NOTE(review): confirm image tag 0.0.2 was built from that Dockerfile.
              command:
                - /usr/local/bin/python
                - /xjob_alert.py
                - xpair
          restartPolicy: OnFailure
---
# CronJob that runs the "xfep" check of xjob_alert.py every thirty minutes.
# batch/v1 is the stable CronJob API (available since Kubernetes 1.21);
# batch/v1beta1 is no longer served from Kubernetes 1.25 onwards.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: xfep-alert
spec:
  schedule: "*/30 * * * *"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: xjob-alert
              image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.2
              imagePullPolicy: IfNotPresent
              # Absolute interpreter and script paths, matching the cron entries
              # of the accompanying Dockerfile (/usr/local/bin/python, /xjob_alert.py).
              # NOTE(review): confirm image tag 0.0.2 was built from that Dockerfile.
              command:
                - /usr/local/bin/python
                - /xjob_alert.py
                - xfep
          restartPolicy: OnFailure
---
# Deployment that keeps one long-running xjob-alert pod alive; the container
# uses the image's default command (no command/args override here).
# NOTE(review): the namespace is "production" while the image lives under the
# "development/" repository path — confirm this is intended.
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: xjob-alert
  name: xjob-alert
  namespace: production
spec:
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: xjob-alert
  template:
    metadata:
      labels:
        app: xjob-alert
    spec:
      containers:
        - name: xjob-alert
          image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.3
          imagePullPolicy: Always
      restartPolicy: Always
      # Registry credentials used to pull the image.
      imagePullSecrets:
        - name: regcred
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import psycopg2
import requests as requests
from psycopg2 import sql
import json, os, time, sys
class NewDrugAlert:
    """Query the ``job`` table for stuck or evicted jobs and post alerts to Feishu.

    Each public check method (``xpair``, ``xfep``, ``status2``, ``status109``)
    runs one SQL query through :meth:`job`, formats the returned rows into a
    text message and sends it through :meth:`alert`.
    """

    # Feishu bot webhooks keyed by business group.
    # test: 'https://open.feishu.cn/open-apis/bot/v2/hook/e9a0edf5-46f2-44c4-8b67-b2d12a8579e2'
    # NOTE(review): webhook URLs and the DB passwords below are secrets committed
    # in source; they should be moved to a secret store / environment.
    _WEBHOOKS = {
        "guyan": 'https://open.feishu.cn/open-apis/bot/v2/hook/4663ac9b-bb95-4fbc-9124-c83e347a5f89',
        "xinyao": 'https://open.feishu.cn/open-apis/bot/v2/hook/8d738505-682a-4fc7-8cb0-21c5ea1e2d2a',
    }

    # Seconds to wait for the webhook before giving up, so an unreachable
    # endpoint cannot hang a scheduled run forever.
    _HTTP_TIMEOUT = 10

    # Whole minutes elapsed since the job was created. date_part('minute', ...)
    # on an interval yields only the 0-59 minutes field, so a job older than an
    # hour would wrap around and slip under the thresholds; the epoch of the
    # interval gives the total elapsed time instead.
    _ELAPSED_MINUTES = "floor(date_part('epoch',now()-to_timestamp(create_at))/60)"

    def __init__(self, env):
        """Select database connection parameters for ``env``.

        :param env: ``'pro'`` for production, ``'dev'`` or ``''`` for development.
        :raises ValueError: if ``env`` is none of the accepted values (previously
            this left ``self.parms`` undefined and failed later in :meth:`job`).
        """
        if env == 'pro':
            self.parms = {
                "database": "anakin",
                "user": "anakin",
                "password": "81b88ebb893",
                "host": "luke-pro.ct9zn8ktuvkc.us-east-1.rds.amazonaws.com",
                "port": "5432",
            }
        elif env == 'dev' or env == '':
            self.parms = {
                "database": "anakin",
                "user": "postgres",
                "password": "xtalpi123",
                "host": "luke-postgres.development.svc",
                "port": "5432",
            }
        else:
            raise ValueError("unknown env %r (expected 'pro', 'dev' or '')" % (env,))

    def job(self, sql, job_type):
        """Run ``sql`` against the configured database and return all rows.

        :param sql: complete SQL statement (executed without parameters).
        :param job_type: label used only in the log line.
        :returns: the list of rows from ``cursor.fetchall()``.
        """
        conn = psycopg2.connect(**self.parms)
        try:
            cur = conn.cursor()
            cur.execute(sql)
            rows = cur.fetchall()
        finally:
            # Always release the connection, even when the query fails.
            conn.close()
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ":", job_type, "job select results:", rows)
        return rows

    def alert(self, msg, job_type, bussiness):
        """Post ``msg`` as a text message to the webhook of ``bussiness``.

        :param msg: message text to send.
        :param job_type: label used only in the log line.
        :param bussiness: ``"guyan"`` or ``"xinyao"``; selects the webhook.
        :returns: raw response body (bytes) of the webhook call.
        :raises ValueError: if ``bussiness`` has no configured webhook
            (previously an ``UnboundLocalError``).
        """
        try:
            fs_url = self._WEBHOOKS[bussiness]
        except KeyError:
            raise ValueError("unknown bussiness %r (expected one of %s)"
                             % (bussiness, sorted(self._WEBHOOKS)))
        headers = {'Content-Type': 'application/json;charset=utf-8'}
        text = {"msg_type": "text", "content": {"msg": "", "text": msg}}
        res = requests.post(fs_url, json.dumps(text), headers=headers,
                            timeout=self._HTTP_TIMEOUT).content
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ":", job_type, "job alert results:", res)
        return res

    def xpair(self):
        """Alert on xpair jobs that have been in status 1/2/3 for five minutes or more.

        Rows are grouped by elapsed minutes (last column) into 5-10, 10-20 and
        20+ buckets; nothing is sent when the query returns no rows.
        """
        job_type = "xpair"
        sql = (
            'select handle,"user",status,cost_dimension,'
            + self._ELAPSED_MINUTES + " as time_diff "
            "from job where status in (1,2,3) "
            "and image LIKE '%tasksys_fep_map_py%' "
            "and \"group\"::text LIKE '%xpair%' "
            "and " + self._ELAPSED_MINUTES + " >= 5 "
            "order by create_at desc;"
        )
        job_res = self.job(sql, job_type)
        alert_msg = ''
        over_5 = "xpair任务1、2、3状态超过5分钟:"
        over_10 = "xpair任务1、2、3状态超过10分钟:"
        over_20 = "xpair任务1、2、3状态超过20分钟:"
        if job_res:
            for m in job_res:
                if 5 <= m[-1] < 10:
                    over_5 = over_5 + "\n" + str(m)
                elif 10 <= m[-1] < 20:
                    over_10 = over_10 + "\n" + str(m)
                else:
                    over_20 = over_20 + "\n" + str(m)
            # Longest-waiting bucket first.
            alert_msg = alert_msg + over_20 + "\n" + over_10 + "\n" + over_5
            self.alert(alert_msg, job_type, bussiness="xinyao")
            print(alert_msg)

    def xfep(self):
        """Alert on xfep jobs on gcloud/DefaultGPU in status 1/2/3 for more than 30 minutes.

        Nothing is sent when the query returns no rows.
        """
        job_type = "xfep"
        sql = (
            'select handle,"user",status,cost_dimension,'
            + self._ELAPSED_MINUTES + " as time_diff "
            "from job where cluster_id in ('gcloud','DefaultGPU') "
            "and status in (1,2,3) "
            "and image like '%tasksys_fep_py3%' "
            "and " + self._ELAPSED_MINUTES + " > 30 "
            "order by time_diff desc;"
        )
        job_res = self.job(sql, job_type)
        alert_msg = 'xfep任务1、2、3状态持续超过30分钟:'
        if job_res:
            for m in job_res:
                alert_msg = alert_msg + "\n" + str(m)
            self.alert(alert_msg, job_type, bussiness="xinyao")
            print(alert_msg)

    def status2(self):
        """Alert on gcloud/DefaultGPU jobs whose latest status-2 entry is older than 30 minutes.

        The age is computed from ``job_times`` (last column): the last element
        of its last entry is treated as an epoch timestamp in seconds.
        # assumes job_times is a non-empty list of lists — TODO confirm schema
        """
        job_type = "status 2"
        sql = (
            'select handle,"user",status,cluster_id,cost_dimension,job_times from job '
            "where status in (2) "
            "and image like '%tasksys_fep_py3%' "
            "and cluster_id in ('gcloud','DefaultGPU') "
            "order by status desc, job_times asc"
        )
        job_res = self.job(sql, job_type)
        now_time = time.time()
        alert_msg = 'gcloud、DefaultGPU集群2状态持续超过30分钟(请确认队列是否阻塞):'
        if job_res:
            for m in job_res:
                time_diff = int((now_time - m[-1][-1][-1]) / 60)
                if time_diff > 30:
                    alert_msg = alert_msg + "\n" + str(m[0]) + "," + str(m[1]) + "," + str(m[3]) + "," + str(m[4]) + "," + str(time_diff) + "min"
            # The header contains no "xtalpi", so this is only true once a row
            # mentioning it was appended.
            # NOTE(review): presumably user names contain "xtalpi" — confirm.
            if alert_msg.count("xtalpi") > 0:
                self.alert(alert_msg, job_type, bussiness="xinyao")
                print(alert_msg)

    def status109(self):
        """Alert on jobs created in the last 30 days, now in status 4/109, that logged a 109 entry.

        A row is reported once if any entry of its ``job_times`` (last column)
        starts with the value 109.
        # assumes each job_times entry is a sequence whose first item is a status code — TODO confirm
        """
        job_type = "status 109"
        sql = (
            'select handle,"user",status,cluster_id,cost_dimension, '
            "to_char(to_timestamp(create_at),'yyyy-MM-dd HH24:MI:SS') as create_at,job_times from job "
            "where status in (4,109) "
            "and job_times::text like '%109%' "
            "and date_part('day',now()::timestamp-to_char(to_timestamp(create_at),'yyyy-MM-dd HH24:MI:SS')::timestamp) < 30 "
            "order by create_at desc"
        )
        job_res = self.job(sql, job_type)
        alert_msg = "近一个月有被驱逐过记录的且目前未终止状态的handle:"
        if job_res:
            for row in job_res:
                for m in row[-1]:
                    if m[0] == 109:
                        alert_msg = alert_msg + "\n" + str(row[0]) \
                            + "," + str(row[1]) + "," + str(row[2]) + "," + str(row[5])
                        break
            self.alert(alert_msg, job_type, bussiness="guyan")
            print(alert_msg)
# Command-line entry point: `python xjob_alert.py <check>` runs one check
# against the production environment.
if __name__ == '__main__':
    LUKE_ENV = 'pro'
    # Names of the checks that may be requested on the command line; each one
    # is a method of NewDrugAlert.
    checks = ("xfep", "xpair", "status2", "status109")
    # A missing argument used to raise IndexError and an unknown one was
    # silently ignored; both now print usage and exit with a non-zero status.
    if len(sys.argv) < 2 or sys.argv[1] not in checks:
        sys.stderr.write("usage: %s {%s}\n" % (sys.argv[0], "|".join(checks)))
        sys.exit(2)
    alert = NewDrugAlert(LUKE_ENV)
    getattr(alert, sys.argv[1])()