Dockerfile、CronJob、Deployment

    # Image that runs the xjob_alert.py checks on a schedule from a cron daemon
    # inside the container.
    FROM python:3.7.9

    # System packages first: this layer does not depend on the application files,
    # so it stays cached when only the script or requirements change.
    RUN apt-get update \
        && apt-get -y install --no-install-recommends cron vim \
        && rm -rf /var/lib/apt/lists/*

    # Python dependencies before the script, for the same layer-caching reason.
    COPY requirements.txt /
    RUN pip install --no-cache-dir -r /requirements.txt

    COPY xjob_alert.py /
    RUN chmod 0644 /xjob_alert.py \
        && touch /var/log/xfep.log /var/log/xpair.log /var/log/status2.log /var/log/status109.log

    # Install root's crontab in a single layer. Each job appends stdout and stderr
    # to its own log file.
    RUN printf '%s\n' \
        '*/3 * * * * /usr/local/bin/python /xjob_alert.py xpair >>/var/log/xpair.log 2>&1' \
        '*/10 * * * * /usr/local/bin/python /xjob_alert.py xfep >>/var/log/xfep.log 2>&1' \
        '*/5 * * * * /usr/local/bin/python /xjob_alert.py status2 >>/var/log/status2.log 2>&1' \
        '0 */6 * * * /usr/local/bin/python /xjob_alert.py status109 >>/var/log/status109.log 2>&1' \
        | crontab -

    # cron detaches into the background; tail keeps the container in the foreground
    # and forwards every job log (not just xpair) to the container's stdout.
    CMD cron && tail -F /var/log/xpair.log /var/log/xfep.log /var/log/status2.log /var/log/status109.log
    ---
    # CronJob that runs the "xpair" check of xjob_alert.py every 5 minutes.
    # NOTE: batch/v1beta1 CronJob was removed in Kubernetes 1.25; use batch/v1
    # once the cluster is on 1.21 or newer.
    apiVersion: batch/v1beta1
    kind: CronJob
    metadata:
      name: xpair-alert
    spec:
      schedule: "*/5 * * * *"
      successfulJobsHistoryLimit: 3
      failedJobsHistoryLimit: 3
      jobTemplate:
        spec:
          template:
            spec:
              containers:
                - name: xjob-alert
                  image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.2
                  imagePullPolicy: IfNotPresent
                  command:
                    # Use the interpreter under /usr/local, where the image's pip
                    # installs the dependencies, and the absolute path the script
                    # is copied to, so the job does not depend on the working dir.
                    - /usr/local/bin/python
                    - /xjob_alert.py
                    - xpair
              restartPolicy: OnFailure
    ---
    # CronJob that runs the "xfep" check of xjob_alert.py every 30 minutes.
    # NOTE: batch/v1beta1 CronJob was removed in Kubernetes 1.25; use batch/v1
    # once the cluster is on 1.21 or newer.
    apiVersion: batch/v1beta1
    kind: CronJob
    metadata:
      name: xfep-alert
    spec:
      schedule: "*/30 * * * *"
      successfulJobsHistoryLimit: 3
      failedJobsHistoryLimit: 3
      jobTemplate:
        spec:
          template:
            spec:
              containers:
                - name: xjob-alert
                  image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.2
                  imagePullPolicy: IfNotPresent
                  command:
                    # Use the interpreter under /usr/local, where the image's pip
                    # installs the dependencies, and the absolute path the script
                    # is copied to, so the job does not depend on the working dir.
                    - /usr/local/bin/python
                    - /xjob_alert.py
                    - xfep
              restartPolicy: OnFailure
    # Deployment that keeps a single long-running xjob-alert pod in the
    # "production" namespace. The container has no command override, so it runs
    # the image's default command.
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: xjob-alert
      namespace: production
      labels:
        app: xjob-alert
    spec:
      replicas: 1
      revisionHistoryLimit: 10
      selector:
        matchLabels:
          app: xjob-alert
      template:
        metadata:
          labels:
            app: xjob-alert
        spec:
          containers:
            - name: xjob-alert
              # NOTE(review): the image comes from the "development" repository
              # path although the namespace is production - confirm intended.
              image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.3
              imagePullPolicy: Always
          restartPolicy: Always
          # Registry credentials used to pull the image.
          imagePullSecrets:
            - name: regcred
    1. #!/usr/bin/python
    2. # -*- coding: UTF-8 -*-
    3. import psycopg2
    4. import requests as requests
    5. from psycopg2 import sql
    6. import json, os, time, sys
    class NewDrugAlert:
        """Query the ``job`` table for stuck or evicted jobs and alert Feishu bots.

        Each check (``xpair``, ``xfep``, ``status2``, ``status109``) runs one SQL
        query through :meth:`job`, formats the matching rows as a text message
        and sends it through :meth:`alert`. All checks return ``None``.
        """

        # NOTE(review): credentials and webhook URLs are hard-coded in source;
        # consider moving them to environment variables or a secret store.
        _DB_PARAMS = {
            "pro": {
                "database": "anakin",
                "user": "anakin",
                "password": "81b88ebb893",
                "host": "luke-pro.ct9zn8ktuvkc.us-east-1.rds.amazonaws.com",
                "port": "5432",
            },
            "dev": {
                "database": "anakin",
                "user": "postgres",
                "password": "xtalpi123",
                "host": "luke-postgres.development.svc",
                "port": "5432",
            },
        }

        # Feishu bot webhooks keyed by the ``bussiness`` channel name.
        # Test channel hook (not routed by any check):
        #   https://open.feishu.cn/open-apis/bot/v2/hook/e9a0edf5-46f2-44c4-8b67-b2d12a8579e2
        _WEBHOOKS = {
            "guyan": 'https://open.feishu.cn/open-apis/bot/v2/hook/4663ac9b-bb95-4fbc-9124-c83e347a5f89',
            "xinyao": 'https://open.feishu.cn/open-apis/bot/v2/hook/8d738505-682a-4fc7-8cb0-21c5ea1e2d2a',
        }

        # Seconds to wait for the webhook before giving up, so a hung request
        # cannot block the scheduled run forever.
        _HTTP_TIMEOUT = 10

        # Whole minutes elapsed since ``create_at`` (passed to to_timestamp, i.e.
        # treated as epoch seconds). date_part('minute', <interval>) would only
        # return the 0-59 minutes field and so misreport jobs older than an hour.
        _AGE_MINUTES = "floor(extract(epoch from (now() - to_timestamp(create_at))) / 60)::int"

        def __init__(self, env):
            """Select the database connection settings for ``env``.

            Args:
                env: ``'pro'`` for production; ``'dev'`` or ``''`` for development.

            Raises:
                ValueError: if ``env`` is not one of the values above.
            """
            key = 'dev' if env == '' else env
            if key not in self._DB_PARAMS:
                raise ValueError(
                    "unknown env %r; expected 'pro', 'dev' or ''" % (env,)
                )
            # Copy so that instance-level changes never touch the class defaults.
            self.parms = dict(self._DB_PARAMS[key])

        @staticmethod
        def _timestamp():
            """Return the local time formatted for log lines."""
            return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        def job(self, sql, job_type):
            """Run ``sql`` against the configured database and return all rows.

            Args:
                sql: complete SQL statement to execute (no parameters are bound).
                job_type: label used in the log line.

            Returns:
                The list of row tuples from ``fetchall()``.
            """
            conn = psycopg2.connect(**self.parms)
            try:
                with conn.cursor() as cur:
                    cur.execute(sql)
                    rows = cur.fetchall()
            finally:
                # Always release the connection, even when the query fails.
                conn.close()
            print(self._timestamp(), ":", job_type, "job select results:", rows)
            return rows

        def alert(self, msg, job_type, bussiness):
            """Post ``msg`` as a text message to the Feishu bot of ``bussiness``.

            Args:
                msg: message text to send.
                job_type: label used in the log line.
                bussiness: channel name, ``"guyan"`` or ``"xinyao"``.

            Returns:
                The raw response body (bytes) of the webhook call.

            Raises:
                ValueError: if ``bussiness`` is not a known channel.
            """
            try:
                fs_url = self._WEBHOOKS[bussiness]
            except KeyError:
                raise ValueError(
                    "unknown bussiness %r; expected one of %s"
                    % (bussiness, sorted(self._WEBHOOKS))
                ) from None
            headers = {'Content-Type': 'application/json;charset=utf-8'}
            text = {"msg_type": "text", "content": {"msg": "", "text": msg}}
            res = requests.post(
                fs_url, json.dumps(text), headers=headers, timeout=self._HTTP_TIMEOUT
            ).content
            print(self._timestamp(), ":", job_type, "job alert results:", res)
            return res

        def xpair(self):
            """Alert "xinyao" about xpair jobs in status 1/2/3 for 5+ minutes.

            Rows are grouped by age into 5-10, 10-20 and 20+ minute sections; the
            message lists the oldest section first. Nothing is sent when the
            query returns no rows.
            """
            job_type = "xpair"
            sql = (
                'select handle, "user", status, cost_dimension, '
                "{age} as time_diff "
                "from job where status in (1,2,3) "
                "and image LIKE '%tasksys_fep_map_py%' "
                "and \"group\"::text LIKE '%xpair%' "
                "and {age} >= 5 "
                "order by create_at desc;"
            ).format(age=self._AGE_MINUTES)
            job_res = self.job(sql, job_type)
            if not job_res:
                return

            over_5 = ["xpair任务1、2、3状态超过5分钟:"]
            over_10 = ["xpair任务1、2、3状态超过10分钟:"]
            over_20 = ["xpair任务1、2、3状态超过20分钟:"]
            for row in job_res:
                minutes = row[-1]
                if 5 <= minutes < 10:
                    over_5.append(str(row))
                elif 10 <= minutes < 20:
                    over_10.append(str(row))
                else:
                    over_20.append(str(row))

            alert_msg = "\n".join(over_20 + over_10 + over_5)
            self.alert(alert_msg, job_type, bussiness="xinyao")
            print(alert_msg)

        def xfep(self):
            """Alert "xinyao" about xfep jobs in status 1/2/3 for over 30 minutes.

            Only jobs on the 'gcloud' and 'DefaultGPU' clusters are considered.
            Nothing is sent when the query returns no rows.
            """
            job_type = "xfep"
            sql = (
                'select handle, "user", status, cost_dimension, '
                "{age} as time_diff "
                "from job where cluster_id in ('gcloud','DefaultGPU') "
                "and status in (1,2,3) "
                "and image like '%tasksys_fep_py3%' "
                "and {age} > 30 "
                "order by time_diff desc;"
            ).format(age=self._AGE_MINUTES)
            job_res = self.job(sql, job_type)
            if not job_res:
                return

            lines = ['xfep任务1、2、3状态持续超过30分钟:']
            lines.extend(str(row) for row in job_res)
            alert_msg = "\n".join(lines)
            self.alert(alert_msg, job_type, bussiness="xinyao")
            print(alert_msg)

        def status2(self):
            """Alert "xinyao" about status-2 xfep jobs unchanged for over 30 minutes.

            The age is measured from the last element of the last ``job_times``
            entry, read as an epoch-seconds timestamp.
            """
            job_type = "status 2"
            sql = (
                'select handle, "user", status, cluster_id, cost_dimension, job_times from job '
                "where status in (2) "
                "and image like '%tasksys_fep_py3%' "
                "and cluster_id in ('gcloud','DefaultGPU') "
                "order by status desc, job_times asc"
            )
            job_res = self.job(sql, job_type)
            if not job_res:
                return

            now_time = time.time()
            lines = ['gcloud、DefaultGPU集群2状态持续超过30分钟(请确认队列是否阻塞):']
            for row in job_res:
                job_times = row[-1]
                if not job_times:
                    # No recorded transitions: the age cannot be computed.
                    continue
                # presumably job_times is a list of [status, epoch_seconds]
                # entries, newest last - verify against the schema.
                time_diff = int((now_time - job_times[-1][-1]) / 60)
                if time_diff > 30:
                    fields = (row[0], row[1], row[3], row[4], time_diff)
                    lines.append(",".join(str(v) for v in fields) + "min")

            alert_msg = "\n".join(lines)
            # Send only when some listed row mentions "xtalpi"; the header itself
            # never does, so an empty listing is never sent.
            # NOTE(review): presumably limits alerts to xtalpi users - confirm.
            if "xtalpi" in alert_msg:
                self.alert(alert_msg, job_type, bussiness="xinyao")
            print(alert_msg)

        def status109(self):
            """Alert "guyan" about recent jobs with a 109 entry in ``job_times``.

            Considers jobs created in the last 30 days whose current status is 4
            or 109. A job is listed when any ``job_times`` entry has 109 as its
            first element. Nothing is sent when no job qualifies.
            """
            job_type = "status 109"
            sql = (
                'select handle, "user", status, cluster_id, cost_dimension, '
                "to_char(to_timestamp(create_at),'yyyy-MM-dd HH24:MI:SS') as create_at, job_times from job "
                "where status in (4,109) "
                "and job_times::text like '%109%' "
                "and date_part('day',now()::timestamp-to_char(to_timestamp(create_at),'yyyy-MM-dd HH24:MI:SS')::timestamp) < 30 "
                "order by create_at desc"
            )
            job_res = self.job(sql, job_type)
            if not job_res:
                return

            lines = []
            for row in job_res:
                # The SQL text match on '%109%' can also hit timestamps, so
                # confirm that an entry really carries status 109.
                if any(entry[0] == 109 for entry in (row[-1] or [])):
                    lines.append(",".join(str(row[i]) for i in (0, 1, 2, 5)))

            alert_msg = "\n".join(["近一个月有被驱逐过记录的且目前未终止状态的handle:"] + lines)
            if lines:
                self.alert(alert_msg, job_type, bussiness="guyan")
            print(alert_msg)
    if __name__ == '__main__':
        # Entry point: run the single check named by the first CLI argument
        # against the production database.
        LUKE_ENV = 'pro'
        JOB_NAMES = ("xfep", "xpair", "status2", "status109")

        # Fail loudly on a missing or unknown job name instead of raising
        # IndexError or silently doing nothing.
        if len(sys.argv) < 2 or sys.argv[1] not in JOB_NAMES:
            sys.exit("usage: xjob_alert.py {%s}" % "|".join(JOB_NAMES))

        alert = NewDrugAlert(LUKE_ENV)
        checks = {
            "xfep": alert.xfep,
            "xpair": alert.xpair,
            "status2": alert.status2,
            "status109": alert.status109,
        }
        checks[sys.argv[1]]()