resource_data.py

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*
    3. """
    4. hbase资源数据上报
    5. """
    6. import requests, json
    7. import pymysql
    8. from subprocess import check_output
    9. from datetime import datetime, timedelta
    10. from fastapi import FastAPI
    11. from sqlalchemy import create_engine, Column, String, Integer, FLOAT, Boolean
    12. from sqlalchemy.orm import sessionmaker
    13. from sqlalchemy.ext.declarative import declarative_base
    14. from sqlalchemy import and_, or_
    15. from logger import FastapiLoguru
    16. # API服务
    17. api = FastAPI()
    18. # 当前环境
    19. CURRENT_IP = check_output('hostname -i', shell=True)
    20. ENV = 'prd' if CURRENT_IP.startswith(b'10.117') or CURRENT_IP.startswith(b'10.150') else ('sit' if CURRENT_IP.startswith(b'10.203') else 'dev')
    21. DB_URL = "mysql+pymysql://bdpops:LTejd42ml6@bdpops-m.db.sfdc.com.cn:3306/bdpops?charset=utf8" if ENV == "prd" else "mysql+pymysql://bdpops:Oo7glydkwk@bdpops-m.dbsit.sfcloud.local:3306/bdpops?charset=utf8"
    22. # 日志处理
    23. logger = FastapiLoguru(ENV).get_logger()
    24. # MySQL连接引擎
    25. engine = create_engine(DB_URL)
    26. DBsession = sessionmaker(bind=engine)
    27. # MySQL连接会话
    28. session = DBsession()
    29. # ORM实体继承对象
    30. Base = declarative_base()
# ORM entity classes
class HRMCluster(Base):
    """Cluster information table (``hrm_cluster_info``)."""
    __tablename__ = "hrm_cluster_info"
    id = Column(Integer, primary_key=True, index=True)
    # Cluster name; report_hbase_data matches RSGroup rows to clusters by this value.
    clusterName = Column(String(50), nullable=False, comment="集群名称")
    # Cluster ID.
    clusterId = Column(String(50), nullable=False, comment="集群ID")
    # Cluster load, stored as a string (String(16)).
    clusterLoad = Column(String(16), default=None, comment="集群Load")
    # Cluster request volume.
    clusterRequests = Column(FLOAT, default=None, comment="集群请求量")
    # Number of RegionServer nodes / number of dead nodes.
    numRegionServers = Column(Integer, default=None, comment="集群节点数")
    numDeadRegionServers = Column(Integer, default=None, comment="集群Dead节点数")
    # ZooKeeper quorum of the cluster.
    zookeeperQuorum = Column(String(100), default=None, comment="集群Zookeeper")
    # Live / dead node details stored as text (up to 5000 chars); the text
    # format is not visible in this file.
    liveRegionServers = Column(String(5000), default=None, comment="集群Live节点信息")
    deadRegionServers = Column(String(5000), default=None, comment="集群Dead节点信息")
    # Storage disk type; report_hbase_data emits it as 'volume_disk'.
    storageType = Column(String(20), comment="集群存储磁盘类型")
class HRMRsgroup(Base):
    """Cluster RSGroup information table (``hrm_rsgroup_info``)."""
    __tablename__ = "hrm_rsgroup_info"
    id = Column(Integer, primary_key=True, index=True)
    # Group name; 'default' and 'discard' get special handling in report_hbase_data.
    groupName = Column(String(50), nullable=False, comment="分组名称")
    # Owning organization and users of the group.
    groupDepartment = Column(String(500), default=None, comment="分组归属组织")
    groupUser = Column(String(200), default=None, comment="分组使用者")
    # System code, looked up via get_bdp_appid to find the BDP app id; system level.
    systemCode = Column(String(50), default=None, comment="系统编码")
    systemLevel = Column(String(20), default=None, comment="系统级别")
    # Name of the cluster this group belongs to.
    clusterName = Column(String(50), nullable=False, comment="集群名称")
    # Number of servers in the group; reported as consumption_amount for
    # independent groups by report_hbase_data.
    numServers = Column(Integer, default=None, comment="分组节点数")
    # Number of tables in the group.
    numTables = Column(Integer, default=None, comment="分组表数量")
    # Region capacity / used / free counts and utilization (stored as text).
    region_capacity = Column(Integer, default=None, comment="分组region容量")
    region_used = Column(Integer, comment="分组region使用量")
    region_free = Column(Integer, comment="分组region可用量")
    # NOTE(review): attribute name is misspelled ('utillization'); renaming it
    # would also rename the mapped column, so it is left as-is.
    region_utillization = Column(String(30), comment="分组region使用率")
class HRMTable(Base):
    """HBase table information table (``hrm_table_info``)."""
    __tablename__ = "hrm_table_info"
    id = Column(Integer, primary_key=True, index=True)
    # Full table name, used in report volume_name/volume_info.
    fulltableName = Column(String(300), nullable=False, comment="表全名")
    # Table namespace.
    # NOTE(review): attribute name is misspelled ('namespcae'); renaming it
    # would also rename the mapped column, so it is left as-is.
    namespcae = Column(String(50), nullable=False, comment="表命名空间")
    # Table name.
    tableName = Column(String(300), nullable=False, comment="表名")
    # Region count; reported as consumption_amount for shared 'default'-group tables.
    tableRegions = Column(Integer, default=None, comment="表region数")
    # Whether the table is enabled (online).
    isEnable = Column(Boolean, default=None, comment="是否在线")
    # RSGroup and cluster the table belongs to.
    groupName = Column(String(50), nullable=False, comment="分组名称")
    clusterName = Column(String(50), nullable=False, comment="集群名称")
    # Owning organization of the table.
    # NOTE(review): the DDL comment text "表归属组中" looks like a typo for "表归属组织".
    tableDepartment = Column(String(500), default=None, comment="表归属组中")
    # Users of the table.
    tableUser = Column(String(200), default=None, comment="表使用者")
    # System code (looked up via get_bdp_appid) and system level.
    systemCode = Column(String(50), comment="系统编码")
    systemLevel = Column(String(20), comment="系统级别")
    # Whether the table has too many regions.
    overMatch = Column(Boolean, comment="表region数是否过多")
    # Storage space used by the table, stored as text.
    space = Column(String(50), comment="表所占存储空间")
    # Date marker. report_hbase_data compares it with today's datetime.date
    # although the column is String(20) — presumably stored as 'YYYY-MM-DD'; confirm.
    dateMark = Column(String(20), comment="日期标识位")
    86. # Hbase资源数据采集上报函数
    87. def report_hbase_data():
    88. # 采集结果列表
    89. report_list = []
    90. # 固定上报项
    91. #classification = "realtime"
    92. #component = "hbase"
    93. consumer_classification = "bdpapp"
    94. consumption_frequency = "day"
    95. #consumption_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    96. consumption_time = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
    97. # 今日日期
    98. todaydate = datetime.today().date()
    99. rsgroup = session.query(HRMRsgroup).all()
    100. for gn in rsgroup:
    101. cluster = session.query(HRMCluster).filter(HRMCluster.clusterName == gn.clusterName).one()
    102. volume_disk = cluster.storageType
    103. if gn.groupName == 'default' and not (str(gn.clusterName).startswith("ebil") or str(gn.clusterName).startswith("rdmp")):
    104. logger.info("Begin ------ 集群:{} 分组:{} 数据采集".format(gn.clusterName, gn.groupName))
    105. table = session.query(HRMTable).filter(and_(HRMTable.groupName == 'default', HRMTable.clusterName == gn.clusterName, HRMTable.dateMark == todaydate)).all()
    106. for t in table:
    107. report = {}
    108. #report['classification'] = classification
    109. #report['component'] = component
    110. report['consumer_classification'] = consumer_classification
    111. report['consumption_frequency'] = consumption_frequency
    112. report['consumption_time'] = consumption_time
    113. report['volume_unit'] = 'hbase_region'
    114. #report['volume_info'] = 'hbase集群名称_表名'
    115. #report['consumption_info'] = '表region数量'
    116. report['volume_area'] = "sit" if str(t.clusterName).rfind("test") != -1 else ( "prd" if str(t.clusterName).rfind("sc") != -1 else "dr")
    117. if t.systemCode is None or len(t.systemCode) == 0:
    118. logger.info("{} 该Table系统编码需要更新".format(t.fulltableName))
    119. continue
    120. #report['consumer_name'] = t.systemCode
    121. else:
    122. app_info = get_bdp_appid(t.systemCode)
    123. if app_info is not None:
    124. report['consumer_name'] = str(app_info['app_id'])
    125. else:
    126. logger.info("ROP 系统编码: {} 对应APP为空".format(t.systemCode))
    127. continue
    128. report['volume_name'] = t.clusterName + '_' + t.fulltableName
    129. report['volume_info'] = 'hbase集群:{}_公共分组表:{}'.format(t.clusterName, t.fulltableName)
    130. report['consumption_amount'] = t.tableRegions
    131. #report['consumption_info'] = '[消费组件:{}],[消费方:{}],[消费标识:hbase集群 {} | 公共分组表 {}],[消费说明:表当前分区数量],[消费量:{}]'.format(component, t.systemCode, t.clusterName, t.fulltableName, t.tableRegions)
    132. report['volume_disk'] = volume_disk
    133. report_list.append(report)
    134. elif gn.groupName != 'discard':
    135. logger.info("Begin ------ 集群:{} 分组:{} 数据采集".format(gn.clusterName, gn.groupName))
    136. report = {}
    137. #report['classification'] = classification
    138. #report['component'] = component
    139. report['consumer_classification'] = consumer_classification
    140. report['consumption_frequency'] = consumption_frequency
    141. report['consumption_time'] = consumption_time
    142. report['volume_unit'] = 'hbase_group'
    143. #report['volume_info'] = 'hbase集群名称_分组名'
    144. #report['consumption_info'] = '分组RegionServer节点数'
    145. report['volume_area'] = "sit" if str(gn.clusterName).rfind("test") != -1 else ( "prd" if str(gn.clusterName).rfind("sc") != -1 else "dr")
    146. if gn.systemCode is None or len(gn.systemCode) == 0:
    147. logger.info("{} 该RsGroup系统编码需要更新".format(gn.groupName))
    148. continue
    149. #report['consumer_name'] = gn.systemCode
    150. else:
    151. app_info = get_bdp_appid(gn.systemCode)
    152. if app_info is not None:
    153. report['consumer_name'] = str(app_info['app_id'])
    154. else:
    155. logger.info("ROP 系统编码: {} 对应APP为空".format(gn.systemCode))
    156. continue
    157. report['volume_name'] = gn.clusterName + '_' + gn.groupName
    158. report['volume_info'] = 'hbase集群:{}_独立分组:{}'.format(gn.clusterName, gn.groupName)
    159. report['consumption_amount'] = gn.numServers
    160. #report['consumption_info'] = '[消费组件:{}],[消费方:{}],[消费标识:hbase集群 {} | 独立分组 {}],[消费说明:分组当前服务器数量],[消费量:{}]'.format(component, gn.systemCode, gn.clusterName, gn.groupName, gn.numServers)
    161. report['volume_disk'] = volume_disk
    162. report_list.append(report)
    163. else:
    164. pass
    165. logger.info("End ------ 采集结束,采集条数: {}".format(len(report_list)))
    166. return report_list
    167. # 获取BDP平台对应应用ID
    168. def get_bdp_appid(systemcode):
    169. conn = pymysql.connect(
    170. host='rccprd-m.db.sfcloud.local',
    171. port=3306,
    172. user='rcc',
    173. password='LTejd42ml6',
    174. database='rcc',
    175. charset='utf8'
    176. )
    177. # 获取一个光标
    178. cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 返回字典数据类型
    179. # 定义将要执行的sql语句
    180. sql = "select system_code, app_id from rcc_v2_sys_app_mapping where system_code = '{}'".format(systemcode)
    181. # 拼接并执行sql语句
    182. cursor.execute(sql)
    183. # 取到查询结果
    184. ret = cursor.fetchone() # 取一条
    185. cursor.close()
    186. conn.close()
    187. # 可以获取指定数量的数据
    188. #cursor.fetchmany(3)
    189. # 光标按绝对位置移动1
    190. #cursor.scroll(1, mode="absolute")
    191. # 光标按照相对位置(当前位置)移动1
    192. #cursor.scroll(1, mode="relative")
    193. return ret
    194. # 数据上报
    195. def data_report(data):
    196. headers = {'content-type': 'application/json'}
    197. #request_url = "http://10.206.55.18:9527/api/v1.0/billings/collection"
    198. request_url = "http://10.216.28.150:9527/api/v2.0/volumes/collection"
    199. logger.info("Begin ------ 开始上报")
    200. #for record in data:
    201. logger.info("--------------- 分隔行 ----------------")
    202. logger.info("上报数据:{}".format(data))
    203. try:
    204. response = requests.post(request_url, data=json.dumps(data), headers=headers, timeout=(3, 2))
    205. logger.info("返回状态码:{}".format(response.status_code))
    206. logger.info("返回信息:{}".format(json.loads(response.text)))
    207. except Exception as e:
    208. logger.error(e)
    209. logger.info("End ------ 上报结束")
    210. if __name__ == "__main__":
    211. result = report_hbase_data()
    212. data_report(result)
    213. session.close()

    logger.py

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*
    3. import os
    4. import sys
    5. import time
    6. from loguru import logger
    7. """
    8. basedir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    9. # print(basedir)
    10. # 定位到log日志文件
    11. log_path = os.path.join(basedir, 'logs')
    12. if not os.path.exists(log_path):
    13. os.mkdir(log_path)
    14. log_path_error = os.path.join(log_path, f'error_{time.strftime("%Y-%m-%d")}.log')
    15. # 日志简单配置
    16. # 具体其他配置 可自行参考 https://github.com/Delgan/loguru
    17. logger.add(log_path_error, rotation="12:00", retention="5 days", enqueue=True)
    18. """
    19. class FastapiLoguru(object):
    20. def __init__(self, env: str):
    21. #self.base_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    22. self.base_path = os.getcwd()
    23. self.log_path = os.path.join(self.base_path, 'logs')
    24. if not os.path.exists(self.log_path):
    25. os.mkdir(self.log_path)
    26. self.log_file_path = os.path.join(
    27. self.log_path,
    28. f'log_{time.strftime("%Y-%m-%d")}.log'
    29. )
    30. #self.log_format = '<level>[{time:YYYY-MM-DD at HH:mm:ss}] {level} {message}</level>'
    31. self.log_format = '[{time:YYYY-MM-DD at HH:mm:ss}] {level} {message}'
    32. self._logger = logger
    33. self._logger.remove()
    34. if env == 'dev':
    35. self._logger.add(
    36. sys.stdout,
    37. colorize=True,
    38. format=self.log_format,
    39. backtrace=True,
    40. diagnose=True,
    41. level='DEBUG'
    42. )
    43. if env == 'sit':
    44. self._logger.add(
    45. self.log_file_path,
    46. colorize=True,
    47. format=self.log_format,
    48. rotation='50 MB',
    49. retention='7 days',
    50. enqueue=True,
    51. level='INFO'
    52. )
    53. if env == 'prd':
    54. self._logger.add(
    55. self.log_file_path,
    56. colorize=True,
    57. format=self.log_format,
    58. rotation='100 MB',
    59. retention='30 days',
    60. enqueue=True,
    61. level='INFO'
    62. )
    63. def get_logger(self) -> logger:
    64. return self._logger