resource_data.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HBase resource data reporting
"""
import json
import requests
import pymysql
from subprocess import check_output
from datetime import datetime, timedelta
from fastapi import FastAPI
from sqlalchemy import create_engine, Column, String, Integer, FLOAT, Boolean
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import and_, or_
from logger import FastapiLoguru
# API service
api = FastAPI()
# Current environment, derived from the host IP
CURRENT_IP = check_output('hostname -i', shell=True)
ENV = 'prd' if CURRENT_IP.startswith(b'10.117') or CURRENT_IP.startswith(b'10.150') else ('sit' if CURRENT_IP.startswith(b'10.203') else 'dev')
DB_URL = "mysql+pymysql://bdpops:LTejd42ml6@bdpops-m.db.sfdc.com.cn:3306/bdpops?charset=utf8" if ENV == "prd" else "mysql+pymysql://bdpops:Oo7glydkwk@bdpops-m.dbsit.sfcloud.local:3306/bdpops?charset=utf8"
# Logging
logger = FastapiLoguru(ENV).get_logger()
# MySQL engine
engine = create_engine(DB_URL)
DBsession = sessionmaker(bind=engine)
# MySQL session
session = DBsession()
# Declarative base for the ORM entities
Base = declarative_base()
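# Note: the hrm_* tables are assumed to already exist in the bdpops database. If they ever
# need to be created from the models below, SQLAlchemy can emit the DDL with:
#   Base.metadata.create_all(engine)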
# ORM entity classes
class HRMCluster(Base):
    """
    Cluster information table
    """
    __tablename__ = "hrm_cluster_info"
    id = Column(Integer, primary_key=True, index=True)
    clusterName = Column(String(50), nullable=False, comment="cluster name")
    clusterId = Column(String(50), nullable=False, comment="cluster ID")
    clusterLoad = Column(String(16), default=None, comment="cluster load")
    clusterRequests = Column(FLOAT, default=None, comment="cluster request volume")
    numRegionServers = Column(Integer, default=None, comment="number of cluster nodes")
    numDeadRegionServers = Column(Integer, default=None, comment="number of dead cluster nodes")
    zookeeperQuorum = Column(String(100), default=None, comment="cluster ZooKeeper quorum")
    liveRegionServers = Column(String(5000), default=None, comment="live node details")
    deadRegionServers = Column(String(5000), default=None, comment="dead node details")
    storageType = Column(String(20), comment="cluster storage disk type")
class HRMRsgroup(Base):
    """
    Cluster RSGroup information table
    """
    __tablename__ = "hrm_rsgroup_info"
    id = Column(Integer, primary_key=True, index=True)
    groupName = Column(String(50), nullable=False, comment="group name")
    groupDepartment = Column(String(500), default=None, comment="owning department of the group")
    groupUser = Column(String(200), default=None, comment="group user")
    systemCode = Column(String(50), default=None, comment="system code")
    systemLevel = Column(String(20), default=None, comment="system level")
    clusterName = Column(String(50), nullable=False, comment="cluster name")
    numServers = Column(Integer, default=None, comment="number of group nodes")
    numTables = Column(Integer, default=None, comment="number of group tables")
    region_capacity = Column(Integer, default=None, comment="group region capacity")
    region_used = Column(Integer, comment="group regions used")
    region_free = Column(Integer, comment="group regions free")
    region_utillization = Column(String(30), comment="group region utilization")
class HRMTable(Base):
    """
    Cluster HBase table information table
    """
    __tablename__ = "hrm_table_info"
    id = Column(Integer, primary_key=True, index=True)
    fulltableName = Column(String(300), nullable=False, comment="full table name")
    namespcae = Column(String(50), nullable=False, comment="table namespace")
    tableName = Column(String(300), nullable=False, comment="table name")
    tableRegions = Column(Integer, default=None, comment="number of table regions")
    isEnable = Column(Boolean, default=None, comment="whether the table is enabled")
    groupName = Column(String(50), nullable=False, comment="group name")
    clusterName = Column(String(50), nullable=False, comment="cluster name")
    tableDepartment = Column(String(500), default=None, comment="owning department of the table")
    tableUser = Column(String(200), default=None, comment="table user")
    systemCode = Column(String(50), comment="system code")
    systemLevel = Column(String(20), comment="system level")
    overMatch = Column(Boolean, comment="whether the table has too many regions")
    space = Column(String(50), comment="storage space used by the table")
    dateMark = Column(String(20), comment="date marker")
# Collect HBase resource data and build the report entries
def report_hbase_data():
    # Collected report entries
    report_list = []
    # Fixed report fields
    #classification = "realtime"
    #component = "hbase"
    consumer_classification = "bdpapp"
    consumption_frequency = "day"
    #consumption_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Daily frequency: the report timestamp is set to the same time on the previous day
    consumption_time = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
    # Today's date
    todaydate = datetime.today().date()
    rsgroup = session.query(HRMRsgroup).all()
    for gn in rsgroup:
        cluster = session.query(HRMCluster).filter(HRMCluster.clusterName == gn.clusterName).one()
        volume_disk = cluster.storageType
        if gn.groupName == 'default' and not (str(gn.clusterName).startswith("ebil") or str(gn.clusterName).startswith("rdmp")):
            logger.info("Begin ------ collecting data for cluster: {} group: {}".format(gn.clusterName, gn.groupName))
            table = session.query(HRMTable).filter(and_(HRMTable.groupName == 'default', HRMTable.clusterName == gn.clusterName, HRMTable.dateMark == todaydate)).all()
            for t in table:
                report = {}
                #report['classification'] = classification
                #report['component'] = component
                report['consumer_classification'] = consumer_classification
                report['consumption_frequency'] = consumption_frequency
                report['consumption_time'] = consumption_time
                report['volume_unit'] = 'hbase_region'
                #report['volume_info'] = 'hbase cluster name_table name'
                #report['consumption_info'] = 'number of table regions'
                # Clusters whose name contains "test" are SIT, "sc" are PRD, everything else is DR
                report['volume_area'] = "sit" if str(t.clusterName).rfind("test") != -1 else ("prd" if str(t.clusterName).rfind("sc") != -1 else "dr")
                if t.systemCode is None or len(t.systemCode) == 0:
                    logger.info("{} the system code of this table needs to be updated".format(t.fulltableName))
                    continue
                    #report['consumer_name'] = t.systemCode
                else:
                    app_info = get_bdp_appid(t.systemCode)
                    if app_info is not None:
                        report['consumer_name'] = str(app_info['app_id'])
                    else:
                        logger.info("ROP system code: {} has no matching APP".format(t.systemCode))
                        continue
                report['volume_name'] = t.clusterName + '_' + t.fulltableName
                report['volume_info'] = 'hbase cluster:{}_default-group table:{}'.format(t.clusterName, t.fulltableName)
                report['consumption_amount'] = t.tableRegions
                #report['consumption_info'] = '[component:{}],[consumer:{}],[volume id:hbase cluster {} | default-group table {}],[note:current number of table regions],[amount:{}]'.format(component, t.systemCode, t.clusterName, t.fulltableName, t.tableRegions)
                report['volume_disk'] = volume_disk
                report_list.append(report)
        elif gn.groupName != 'discard':
            logger.info("Begin ------ collecting data for cluster: {} group: {}".format(gn.clusterName, gn.groupName))
            report = {}
            #report['classification'] = classification
            #report['component'] = component
            report['consumer_classification'] = consumer_classification
            report['consumption_frequency'] = consumption_frequency
            report['consumption_time'] = consumption_time
            report['volume_unit'] = 'hbase_group'
            #report['volume_info'] = 'hbase cluster name_group name'
            #report['consumption_info'] = 'number of RegionServer nodes in the group'
            # Clusters whose name contains "test" are SIT, "sc" are PRD, everything else is DR
            report['volume_area'] = "sit" if str(gn.clusterName).rfind("test") != -1 else ("prd" if str(gn.clusterName).rfind("sc") != -1 else "dr")
            if gn.systemCode is None or len(gn.systemCode) == 0:
                logger.info("{} the system code of this RSGroup needs to be updated".format(gn.groupName))
                continue
                #report['consumer_name'] = gn.systemCode
            else:
                app_info = get_bdp_appid(gn.systemCode)
                if app_info is not None:
                    report['consumer_name'] = str(app_info['app_id'])
                else:
                    logger.info("ROP system code: {} has no matching APP".format(gn.systemCode))
                    continue
            report['volume_name'] = gn.clusterName + '_' + gn.groupName
            report['volume_info'] = 'hbase cluster:{}_dedicated group:{}'.format(gn.clusterName, gn.groupName)
            report['consumption_amount'] = gn.numServers
            #report['consumption_info'] = '[component:{}],[consumer:{}],[volume id:hbase cluster {} | dedicated group {}],[note:current number of servers in the group],[amount:{}]'.format(component, gn.systemCode, gn.clusterName, gn.groupName, gn.numServers)
            report['volume_disk'] = volume_disk
            report_list.append(report)
        else:
            pass
logger.info("End ------ 采集结束,采集条数: {}".format(len(report_list)))
return report_list
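# A sketch of a single entry in report_list (values below are illustrative only; real
# entries carry the cluster/table/group and APP values resolved at runtime):
#   {
#       'consumer_classification': 'bdpapp',
#       'consumption_frequency': 'day',
#       'consumption_time': '2023-01-01 02:00:00',
#       'volume_unit': 'hbase_region',          # or 'hbase_group' for dedicated groups
#       'volume_area': 'prd',
#       'consumer_name': '10001',
#       'volume_name': '<clusterName>_<fulltableName>',
#       'volume_info': 'hbase cluster:<clusterName>_default-group table:<fulltableName>',
#       'consumption_amount': 128,              # table regions, or group server count
#       'volume_disk': '<storageType>',
#   }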
# Look up the BDP platform application ID for a given system code
def get_bdp_appid(systemcode):
    conn = pymysql.connect(
        host='rccprd-m.db.sfcloud.local',
        port=3306,
        user='rcc',
        password='LTejd42ml6',
        database='rcc',
        charset='utf8'
    )
    # Cursor that returns rows as dictionaries
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    # Parameterized query (avoids quoting issues and SQL injection)
    sql = "select system_code, app_id from rcc_v2_sys_app_mapping where system_code = %s"
    cursor.execute(sql, (systemcode,))
    # Fetch a single row; None when no mapping exists
    ret = cursor.fetchone()
    cursor.close()
    conn.close()
    # Other cursor operations, for reference:
    #cursor.fetchmany(3)                  # fetch a fixed number of rows
    #cursor.scroll(1, mode="absolute")    # move the cursor to an absolute position
    #cursor.scroll(1, mode="relative")    # move the cursor relative to its current position
    return ret
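# Example (a sketch; the system code and app id are hypothetical): get_bdp_appid('S12345')
# returns {'system_code': 'S12345', 'app_id': 10001} when a mapping row exists in
# rcc_v2_sys_app_mapping, and None otherwise, in which case the caller skips the record
# and logs that the APP is missing.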
# Post the collected data to the collection API
def data_report(data):
    headers = {'content-type': 'application/json'}
    #request_url = "http://10.206.55.18:9527/api/v1.0/billings/collection"
    request_url = "http://10.216.28.150:9527/api/v2.0/volumes/collection"
    logger.info("Begin ------ starting upload")
    #for record in data:
    logger.info("--------------- separator ----------------")
    logger.info("Upload payload: {}".format(data))
    try:
        # timeout=(3, 2): 3-second connect timeout, 2-second read timeout
        response = requests.post(request_url, data=json.dumps(data), headers=headers, timeout=(3, 2))
        logger.info("Response status code: {}".format(response.status_code))
        logger.info("Response body: {}".format(json.loads(response.text)))
    except Exception as e:
        logger.error(e)
    logger.info("End ------ upload finished")
if __name__ == "__main__":
result = report_hbase_data()
data_report(result)
session.close()
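# Scheduling sketch (an assumption, not part of this script): since the report covers the
# previous day's consumption, the script is expected to run once per day, e.g. via cron:
#   0 2 * * * /usr/bin/python /path/to/resource_data.py >> /path/to/resource_data.cron.log 2>&1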
logger.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import time
from loguru import logger
"""
basedir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(basedir)
# 定位到log日志文件
log_path = os.path.join(basedir, 'logs')
if not os.path.exists(log_path):
os.mkdir(log_path)
log_path_error = os.path.join(log_path, f'error_{time.strftime("%Y-%m-%d")}.log')
# 日志简单配置
# 具体其他配置 可自行参考 https://github.com/Delgan/loguru
logger.add(log_path_error, rotation="12:00", retention="5 days", enqueue=True)
"""
class FastapiLoguru(object):
def __init__(self, env: str):
#self.base_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
self.base_path = os.getcwd()
self.log_path = os.path.join(self.base_path, 'logs')
if not os.path.exists(self.log_path):
os.mkdir(self.log_path)
self.log_file_path = os.path.join(
self.log_path,
f'log_{time.strftime("%Y-%m-%d")}.log'
)
#self.log_format = '<level>[{time:YYYY-MM-DD at HH:mm:ss}] {level} {message}</level>'
self.log_format = '[{time:YYYY-MM-DD at HH:mm:ss}] {level} {message}'
self._logger = logger
self._logger.remove()
if env == 'dev':
self._logger.add(
sys.stdout,
colorize=True,
format=self.log_format,
backtrace=True,
diagnose=True,
level='DEBUG'
)
if env == 'sit':
self._logger.add(
self.log_file_path,
colorize=True,
format=self.log_format,
rotation='50 MB',
retention='7 days',
enqueue=True,
level='INFO'
)
if env == 'prd':
self._logger.add(
self.log_file_path,
colorize=True,
format=self.log_format,
rotation='100 MB',
retention='30 days',
enqueue=True,
level='INFO'
)
    def get_logger(self):
        return self._logger
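# Example usage (mirrors how resource_data.py consumes this module):
#   from logger import FastapiLoguru
#   log = FastapiLoguru('dev').get_logger()   # 'dev' -> stdout; 'sit'/'prd' -> ./logs/log_<date>.log
#   log.info("collection started")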