resource_data.py
#!/usr/bin/env python
# -*- coding: utf-8 -*
"""Collect HBase resource usage from the HRM tables and report it.

The script reads rsgroup / table metadata through SQLAlchemy, resolves each
record's system code to an application id via the ``rcc`` MySQL database,
and POSTs the resulting list of usage records to the collection endpoint.
"""
import json
from datetime import datetime, timedelta
from subprocess import check_output

import pymysql
import requests
from fastapi import FastAPI
from sqlalchemy import create_engine, Column, String, Integer, FLOAT, Boolean
from sqlalchemy import and_, or_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from logger import FastapiLoguru

# API service object.
api = FastAPI()

# Current environment, derived from the IP prefix printed by `hostname -i`.
CURRENT_IP = check_output('hostname -i', shell=True)
if CURRENT_IP.startswith((b'10.117', b'10.150')):
    ENV = 'prd'
elif CURRENT_IP.startswith(b'10.203'):
    ENV = 'sit'
else:
    ENV = 'dev'

# NOTE(review): database credentials are hard-coded in this module; consider
# moving them to configuration or a secret store.
DB_URL = "mysql+pymysql://bdpops:LTejd42ml6@bdpops-m.db.sfdc.com.cn:3306/bdpops?charset=utf8" if ENV == "prd" else "mysql+pymysql://bdpops:Oo7glydkwk@bdpops-m.dbsit.sfcloud.local:3306/bdpops?charset=utf8"

# Logging.
logger = FastapiLoguru(ENV).get_logger()

# MySQL engine and the single module-level session used by the queries below.
engine = create_engine(DB_URL)
DBsession = sessionmaker(bind=engine)
session = DBsession()

# Declarative base for the ORM entities.
Base = declarative_base()


class HRMCluster(Base):
    """ORM mapping for the cluster information table (``hrm_cluster_info``)."""

    __tablename__ = "hrm_cluster_info"

    id = Column(Integer, primary_key=True, index=True)
    clusterName = Column(String(50), nullable=False, comment="集群名称")
    clusterId = Column(String(50), nullable=False, comment="集群ID")
    clusterLoad = Column(String(16), default=None, comment="集群Load")
    clusterRequests = Column(FLOAT, default=None, comment="集群请求量")
    numRegionServers = Column(Integer, default=None, comment="集群节点数")
    numDeadRegionServers = Column(Integer, default=None, comment="集群Dead节点数")
    zookeeperQuorum = Column(String(100), default=None, comment="集群Zookeeper")
    liveRegionServers = Column(String(5000), default=None, comment="集群Live节点信息")
    deadRegionServers = Column(String(5000), default=None, comment="集群Dead节点信息")
    storageType = Column(String(20), comment="集群存储磁盘类型")


class HRMRsgroup(Base):
    """ORM mapping for the cluster rsgroup table (``hrm_rsgroup_info``)."""

    __tablename__ = "hrm_rsgroup_info"

    id = Column(Integer, primary_key=True, index=True)
    groupName = Column(String(50), nullable=False, comment="分组名称")
    groupDepartment = Column(String(500), default=None, comment="分组归属组织")
    groupUser = Column(String(200), default=None, comment="分组使用者")
    systemCode = Column(String(50), default=None, comment="系统编码")
    systemLevel = Column(String(20), default=None, comment="系统级别")
    clusterName = Column(String(50), nullable=False, comment="集群名称")
    numServers = Column(Integer, default=None, comment="分组节点数")
    numTables = Column(Integer, default=None, comment="分组表数量")
    region_capacity = Column(Integer, default=None, comment="分组region容量")
    region_used = Column(Integer, comment="分组region使用量")
    region_free = Column(Integer, comment="分组region可用量")
    region_utillization = Column(String(30), comment="分组region使用率")


class HRMTable(Base):
    """ORM mapping for the HBase table information table (``hrm_table_info``)."""

    __tablename__ = "hrm_table_info"

    id = Column(Integer, primary_key=True, index=True)
    fulltableName = Column(String(300), nullable=False, comment="表全名")
    namespcae = Column(String(50), nullable=False, comment="表命名空间")
    tableName = Column(String(300), nullable=False, comment="表名")
    tableRegions = Column(Integer, default=None, comment="表region数")
    isEnable = Column(Boolean, default=None, comment="是否在线")
    groupName = Column(String(50), nullable=False, comment="分组名称")
    clusterName = Column(String(50), nullable=False, comment="集群名称")
    tableDepartment = Column(String(500), default=None, comment="表归属组中")
    tableUser = Column(String(200), default=None, comment="表使用者")
    systemCode = Column(String(50), comment="系统编码")
    systemLevel = Column(String(20), comment="系统级别")
    overMatch = Column(Boolean, comment="表region数是否过多")
    space = Column(String(50), comment="表所占存储空间")
    dateMark = Column(String(20), comment="日期标识位")


def _volume_area(cluster_name):
    """Map a cluster name to the reported area: "sit", "prd" or "dr"."""
    name = str(cluster_name)
    if "test" in name:
        return "sit"
    if "sc" in name:
        return "prd"
    return "dr"


def _consumer_name(system_code, owner_name, owner_kind, cache):
    """Resolve a system code to the application id reported as consumer name.

    Args:
        system_code: System code stored on the table / rsgroup row.
        owner_name: Name of the row being processed (used in log messages).
        owner_kind: "Table" or "RsGroup" (used in log messages).
        cache: Dict of system code -> lookup result, shared across one
            collection run so each code hits the database only once.

    Returns:
        The application id as ``str``, or ``None`` when the row must be
        skipped (empty system code, or no application mapped to it).
    """
    if not system_code:
        logger.info("{} 该{}系统编码需要更新".format(owner_name, owner_kind))
        return None
    if system_code not in cache:
        cache[system_code] = get_bdp_appid(system_code)
    app_info = cache[system_code]
    if app_info is None:
        logger.info("ROP 系统编码: {} 对应APP为空".format(system_code))
        return None
    return str(app_info['app_id'])


def report_hbase_data():
    """Collect the HBase usage records to report.

    For every rsgroup row:

    * a ``default`` group (on clusters whose name does not start with
      ``ebil`` / ``rdmp``) yields one ``hbase_region`` record per table of
      that group whose ``dateMark`` equals today's date, with the table's
      region count as the amount;
    * any other group except ``discard`` yields one ``hbase_group`` record
      with the group's server count as the amount.

    Rows with an empty system code, or whose system code has no mapped
    application, are logged and skipped.

    Returns:
        list[dict]: The collected report records.

    Raises:
        sqlalchemy.orm.exc.NoResultFound / MultipleResultsFound: When a
            group's cluster does not match exactly one ``HRMCluster`` row.
    """
    report_list = []

    # Fixed report fields. The consumption time is stamped as "now minus
    # one day".
    consumer_classification = "bdpapp"
    consumption_frequency = "day"
    consumption_time = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
    todaydate = datetime.today().date()

    # system code -> app-id row; avoids one MySQL connection per table.
    app_cache = {}

    for gn in session.query(HRMRsgroup).all():
        cluster = session.query(HRMCluster).filter(HRMCluster.clusterName == gn.clusterName).one()
        volume_disk = cluster.storageType

        shared_default_group = (
            gn.groupName == 'default'
            and not str(gn.clusterName).startswith(("ebil", "rdmp"))
        )

        if shared_default_group:
            logger.info("Begin ------ 集群:{} 分组:{} 数据采集".format(gn.clusterName, gn.groupName))
            tables = session.query(HRMTable).filter(
                and_(
                    HRMTable.groupName == 'default',
                    HRMTable.clusterName == gn.clusterName,
                    HRMTable.dateMark == todaydate,
                )
            ).all()
            for t in tables:
                consumer_name = _consumer_name(t.systemCode, t.fulltableName, "Table", app_cache)
                if consumer_name is None:
                    continue
                report_list.append({
                    'consumer_classification': consumer_classification,
                    'consumption_frequency': consumption_frequency,
                    'consumption_time': consumption_time,
                    'volume_unit': 'hbase_region',
                    'volume_area': _volume_area(t.clusterName),
                    'consumer_name': consumer_name,
                    'volume_name': t.clusterName + '_' + t.fulltableName,
                    'volume_info': 'hbase集群:{}_公共分组表:{}'.format(t.clusterName, t.fulltableName),
                    'consumption_amount': t.tableRegions,
                    'volume_disk': volume_disk,
                })
        elif gn.groupName != 'discard':
            logger.info("Begin ------ 集群:{} 分组:{} 数据采集".format(gn.clusterName, gn.groupName))
            consumer_name = _consumer_name(gn.systemCode, gn.groupName, "RsGroup", app_cache)
            if consumer_name is None:
                continue
            report_list.append({
                'consumer_classification': consumer_classification,
                'consumption_frequency': consumption_frequency,
                'consumption_time': consumption_time,
                'volume_unit': 'hbase_group',
                'volume_area': _volume_area(gn.clusterName),
                'consumer_name': consumer_name,
                'volume_name': gn.clusterName + '_' + gn.groupName,
                'volume_info': 'hbase集群:{}_独立分组:{}'.format(gn.clusterName, gn.groupName),
                'consumption_amount': gn.numServers,
                'volume_disk': volume_disk,
            })
        # 'discard' groups are intentionally not reported.

    logger.info("End ------ 采集结束,采集条数: {}".format(len(report_list)))
    return report_list


def get_bdp_appid(systemcode):
    """Look up the BDP application mapped to a system code.

    Args:
        systemcode: System code to search for in ``rcc_v2_sys_app_mapping``.

    Returns:
        dict | None: The first matching row with keys ``system_code`` and
        ``app_id``, or ``None`` when no row matches.
    """
    # NOTE(review): connection credentials are hard-coded here as well.
    conn = pymysql.connect(
        host='rccprd-m.db.sfcloud.local',
        port=3306,
        user='rcc',
        password='LTejd42ml6',
        database='rcc',
        charset='utf8',
    )
    try:
        # DictCursor returns rows as dictionaries.
        with conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
            # Parameterized query: the driver escapes the value, so a system
            # code containing quotes can neither break nor alter the SQL.
            cursor.execute(
                "select system_code, app_id from rcc_v2_sys_app_mapping where system_code = %s",
                (systemcode,),
            )
            return cursor.fetchone()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()


def data_report(data):
    """POST the collected records as JSON to the collection endpoint.

    Failures (connection errors, timeouts, a non-JSON response body) are
    logged and swallowed; nothing is returned.

    Args:
        data: JSON-serializable payload, normally the list returned by
            ``report_hbase_data``.
    """
    headers = {'content-type': 'application/json'}
    request_url = "http://10.216.28.150:9527/api/v2.0/volumes/collection"

    logger.info("Begin ------ 开始上报")
    logger.info("--------------- 分隔行 ----------------")
    logger.info("上报数据:{}".format(data))
    try:
        # timeout=(connect seconds, read seconds)
        response = requests.post(request_url, data=json.dumps(data), headers=headers, timeout=(3, 2))
        logger.info("返回状态码:{}".format(response.status_code))
        logger.info("返回信息:{}".format(json.loads(response.text)))
    except Exception as e:
        # Reporting is best-effort: record the failure and carry on.
        logger.error(e)
    logger.info("End ------ 上报结束")


if __name__ == "__main__":
    try:
        result = report_hbase_data()
        data_report(result)
    finally:
        # Release the module-level session even when collection fails.
        session.close()
logger.py
#!/usr/bin/env python
# -*- coding: utf-8 -*
"""Loguru logger configured per deployment environment."""
import os
import sys
import time

from loguru import logger


class FastapiLoguru(object):
    """Configure the shared loguru ``logger`` for a given environment.

    Constructing an instance removes every sink currently attached to the
    loguru logger and then attaches the sink for ``env``:

    * ``'dev'``: stdout, level DEBUG, with backtrace and diagnose enabled;
    * ``'sit'``: log file, level INFO, rotated at 50 MB, kept 7 days;
    * ``'prd'``: log file, level INFO, rotated at 100 MB, kept 30 days.

    Any other value leaves the logger without sinks.

    The log file is ``<current working directory>/logs/log_<YYYY-MM-DD>.log``;
    the date in the name is taken when the instance is created.
    """

    # Per-environment rotation / retention settings for the file sink.
    _FILE_SINK_OPTIONS = {
        'sit': {'rotation': '50 MB', 'retention': '7 days'},
        'prd': {'rotation': '100 MB', 'retention': '30 days'},
    }

    def __init__(self, env: str):
        """Create the ``logs`` directory if needed and attach the sink for ``env``.

        Args:
            env: Environment name; ``'dev'``, ``'sit'`` and ``'prd'`` are
                recognized.
        """
        self.base_path = os.getcwd()
        self.log_path = os.path.join(self.base_path, 'logs')
        # exist_ok avoids the check-then-create race when several processes
        # start at the same time.
        os.makedirs(self.log_path, exist_ok=True)
        self.log_file_path = os.path.join(
            self.log_path, f'log_{time.strftime("%Y-%m-%d")}.log'
        )
        self.log_format = '[{time:YYYY-MM-DD at HH:mm:ss}] {level} {message}'

        self._logger = logger
        # Drop previously attached sinks (including loguru's default one).
        self._logger.remove()

        if env == 'dev':
            self._logger.add(
                sys.stdout,
                colorize=True,
                format=self.log_format,
                backtrace=True,
                diagnose=True,
                level='DEBUG',
            )
        elif env in self._FILE_SINK_OPTIONS:
            self._logger.add(
                self.log_file_path,
                colorize=True,
                format=self.log_format,
                enqueue=True,
                level='INFO',
                **self._FILE_SINK_OPTIONS[env],
            )

    def get_logger(self) -> logger:
        """Return the configured loguru logger."""
        return self._logger
