# Source: https://gitee.com/muyuOvO/DMdatabase/blob/master/python%E5%AE%9E%E7%8E%B0%E6%8B%BC%E8%A3%85dmfldr#
# -*- coding: utf-8 -*-
# @Author : muyuOvO
# @Link :
# @Date : 2022/2/15
import psutil
import os
import time
curr_path = os.getcwd() # 程序主路径
# os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.AL32UTF8'
os.environ['path'] = r'D:\DMDATABASE\bin' # 数据库bin路径,以便于去执行dmfldr
mapping_config_file = os.path.join(curr_path, 'mapping/config.dat') # 拼接config文件路径
template_dir = os.path.join(curr_path, 'templates')
ctl_dir = os.path.join(curr_path, 'ctls')
log_dir = os.path.join(curr_path, 'logs')
data_bad_dir = os.path.join(curr_path, 'data_bad')
data_bak_dir = os.path.join(curr_path, 'data_bak')
db_host = 'LOCALHOST'
db_port = '5236'
# db_sid = ''
db_user = 'JZD'
db_pwd = 'JZD123456'
def operation_init():
    """Validate required inputs and create the working directories.

    Exits the process with status 1 when the mapping config file or the
    control-file template directory is missing. The messages always said
    "exit", but the original only printed them and carried on, which failed
    later with a less helpful error.

    Creates ``ctl_dir``, ``log_dir``, ``data_bad_dir`` and ``data_bak_dir``
    when they do not exist yet.
    """
    if not os.path.exists(mapping_config_file):
        print('装载配置文件不存在,退出!')
        raise SystemExit(1)
    if not os.path.exists(template_dir):
        print('控制文件模板目录不存在,退出!')
        raise SystemExit(1)
    for work_dir in (ctl_dir, log_dir, data_bad_dir, data_bak_dir):
        # exist_ok replaces the racy exists()-then-makedirs() pair.
        os.makedirs(work_dir, exist_ok=True)
def system_load_status(cpu_limit=99, mem_limit=99, interval=None):
    """Classify current host load as ``'good'`` or ``'bad'``.

    :param cpu_limit: CPU utilisation percentage at or above which the host
        counts as overloaded (default 99, as before).
    :param mem_limit: memory utilisation percentage at or above which the host
        counts as overloaded (default 99, as before).
    :param interval: forwarded to ``psutil.cpu_percent``. With ``None`` (the
        previous behaviour) psutil measures since the previous call, so the
        first call in a process reports 0.0. Pass a positive number of seconds
        for a blocking but meaningful sample.
    :return: ``'good'`` when both utilisations are below their limits,
        otherwise ``'bad'``.
    """
    cpu_util = psutil.cpu_percent(interval)
    mem_util = psutil.virtual_memory().percent
    # The former io_wait value was never used. It was parsed positionally out of
    # str(psutil.cpu_times()), and the field layout is platform-dependent:
    # field 4 is not iowait everywhere and does not exist on some platforms,
    # which raised IndexError. If an I/O-wait rule is reintroduced, read it as
    # getattr(psutil.cpu_times(), 'iowait', 0.0).
    return 'good' if cpu_util < cpu_limit and mem_util < mem_limit else 'bad'
def get_mapping_config(config_file):
    """Read the load-mapping config file and return its data rows.

    Every line containing ``#`` anywhere is treated as a comment and skipped,
    as before. Blank or whitespace-only lines are now skipped too. Previously
    they became a one-element row ``['']`` that crashed the caller when it
    indexed column 1. Each kept line is echoed to stdout.

    The file is decoded as ``utf-8-sig``. This reads plain UTF-8 unchanged and
    also drops a leading BOM (as written by Windows Notepad), which would
    otherwise stick to the first field of the first row.

    :param config_file: path to the config file.
    :return: list of rows; each row is the line's comma-separated fields with
        surrounding whitespace stripped.
    """
    mapping_config_list = []
    with open(config_file, 'r', encoding='utf-8-sig') as f_config_file:
        for config_line in f_config_file:
            if '#' in config_line or not config_line.strip():
                continue
            print(config_line, end='')
            fields = config_line.strip().split(',')
            mapping_config_list.append([field.strip() for field in fields])
    return mapping_config_list
def file_loaded_list(table_full_name):
    """Ensure the per-table load-record file exists and return its name.

    The record file is ``file.loaded.<table_full_name>.list`` in the current
    working directory. It is opened in append mode, which creates it when
    missing and never truncates an existing record.

    :param table_full_name: ``schema.table`` identifier from the config.
    :return: the record file name (relative to the current directory).
    """
    file_loaded_list_name = f'file.loaded.{table_full_name}.list'
    # The ``with`` closes the handle immediately; the original leaked it.
    with open(file_loaded_list_name, 'a', encoding='utf-8'):
        pass
    return file_loaded_list_name
def loaded_data(curr_path, file_loaded_list_name):
    """Return the data-file names already recorded as loaded for a table.

    :param curr_path: directory containing the record file.
    :param file_loaded_list_name: record file name, relative to ``curr_path``.
    :return: one entry per line of the record file, whitespace-stripped
        (blank lines yield empty strings).
    """
    record_path = os.path.join(curr_path, file_loaded_list_name)
    with open(record_path, 'r', encoding='utf-8') as record_file:
        raw_lines = record_file.readlines()
    return [raw.strip() for raw in raw_lines]
def get_file_name(dir_path, loaded_data_list, table_name, file_pattern=None):
    """Return the not-yet-loaded data files for a table, sorted by name.

    A directory entry is selected when all of the following hold:

    * its name contains ``table_name`` (substring match, as before);
    * it is not listed in ``loaded_data_list``;
    * it is a regular file (sub-directories used to slip through and end up
      as ``infile`` lines in the control file);
    * it matches ``file_pattern`` (shell-style, via ``fnmatch``) when one is
      given. The default ``None`` keeps the old behaviour.

    Sorting makes the load order deterministic; ``os.listdir`` order is
    arbitrary.

    :param dir_path: directory to scan.
    :param loaded_data_list: names already recorded as loaded.
    :param table_name: bare table name used as the substring filter.
    :param file_pattern: optional extra glob filter, e.g. ``'T1_*.txt'``.
    :return: list of file names (not paths).
    """
    import fnmatch  # local import: only this function needs it

    already_loaded = set(loaded_data_list)  # O(1) membership per entry
    need_load_file_list = []
    for entry in sorted(os.listdir(dir_path)):
        if entry in already_loaded or table_name not in entry:
            continue
        if not os.path.isfile(os.path.join(dir_path, entry)):
            continue
        if file_pattern and not fnmatch.fnmatch(entry, file_pattern):
            continue
        # Keep the exact listdir name so the generated infile path exists.
        need_load_file_list.append(entry)
    return need_load_file_list
def gen_ctl_file(ctl_template_file, dir_path, need_load_file_list):
    """Render a dmfldr control file from a template and return its path.

    The template ``<template_dir>/<ctl_template_file>`` is filled with
    ``str.format`` using three placeholders:

    * ``{in_file_param}``     - one ``infile '<path>'`` line per data file;
    * ``{table_name_param}``  - the module-level ``table_name``;
    * ``{append_flag_param}`` - the module-level ``append_flag``.

    The last two are read from module globals that the script's main loop
    assigns before calling this function.

    The rendered text is written to ``<ctl_dir>/<table_name>.ctl``.

    :param ctl_template_file: template file name inside ``template_dir``.
    :param dir_path: directory containing the data files.
    :param need_load_file_list: data file names to reference as ``infile``.
    :return: the control-file path, or ``None`` when the list is empty
        (nothing is written in that case).
    """
    if not need_load_file_list:
        return None
    # os.path.join works whether or not dir_path ends with a separator. The old
    # string concatenation turned e.g. D:\data + x.txt into D:\datax.txt.
    in_files = ''.join(
        "infile '" + os.path.join(dir_path, in_file_path) + "'\n"
        for in_file_path in need_load_file_list
    )
    with open(os.path.join(template_dir, ctl_template_file), 'r') as f_ctl_template_file:
        ctl_template = f_ctl_template_file.read()
    ctl_file_content = ctl_template.format(
        in_file_param=in_files,
        table_name_param=table_name,
        append_flag_param=append_flag,
    )
    ctl_file_path = os.path.join(ctl_dir, table_name + '.ctl')
    with open(ctl_file_path, 'w') as f_ctl_file:
        f_ctl_file.write(ctl_file_content)
    return ctl_file_path
def load_into_db(ctl_file_path, need_load_file_list):
    """Run dmfldr for one control file and return its exit status.

    Builds the command
    ``dmfldr userid=<user>/<pwd>@<host>:<port> control='...' log='...'
    badfile='...' direct=true parallel=false`` from the module-level
    connection settings and runs it through ``os.system``.

    Log and bad-row files are named
    ``loader.<table_full_name>.<YYYYMMDDHH>.log`` / ``.bad``, using the
    module-level ``table_full_name`` set by the main loop.

    The command echoed to stdout has the password masked; the original
    printed it in clear text. The command actually executed is unchanged.

    :param ctl_file_path: control file produced for this table.
    :param need_load_file_list: files referenced by the control file; when
        empty, nothing is run.
    :return: the value returned by ``os.system``, or ``None`` when there
        was nothing to load.
    """
    if not need_load_file_list:
        return None
    time_id = time.strftime('%Y%m%d%H', time.localtime())
    log_file = os.path.join(log_dir, f'loader.{table_full_name}.{time_id}.log')
    # NOTE(review): bad rows go to log_dir even though operation_init()
    # creates a separate data_bad_dir; confirm which location is intended.
    bad_file = os.path.join(log_dir, f'loader.{table_full_name}.{time_id}.bad')
    cmd_template = (
        "dmfldr userid={user}/{pwd}@{host}:{port} control='{ctl}' "
        "log='{log}' badfile='{bad}' direct=true parallel=false"
    )
    cmd_args = dict(user=db_user, host=db_host, port=db_port,
                    ctl=ctl_file_path, log=log_file, bad=bad_file)
    print(cmd_template.format(pwd='******', **cmd_args))
    return os.system(cmd_template.format(pwd=db_pwd, **cmd_args))
def write_loaded_list(file_loaded_list_name, need_load_file_list):
    """Append the given data-file names to a table's load-record file.

    Writes one name per line (UTF-8, append mode). The file is opened, and
    therefore created, even when the list is empty.

    :param file_loaded_list_name: path of the record file.
    :param need_load_file_list: names of the files just loaded.
    """
    with open(file_loaded_list_name, mode='a', encoding='utf-8') as record:
        record.writelines([f'{loaded_name}\n' for loaded_name in need_load_file_list])
def del_data_file(dir_path, loaded_data_list):
    """Delete every entry of ``dir_path`` whose name is in ``loaded_data_list``.

    Entries not listed are left alone; listed names that are absent from the
    directory are ignored.

    :param dir_path: directory to clean.
    :param loaded_data_list: names of files that have been loaded.
    """
    doomed = [entry for entry in os.listdir(dir_path) if entry in loaded_data_list]
    for entry in doomed:
        os.remove(os.path.join(dir_path, entry))
# Script entry point. The guard stops an import of this module from starting a
# load run. Statements under it still execute at module scope, so
# table_full_name / table_name / append_flag stay module globals, which
# gen_ctl_file() and load_into_db() read.
if __name__ == '__main__':
    operation_init()
    # The original computed the load status and discarded it; honour it now.
    if system_load_status() != 'good':
        print('系统负载过高,退出!')
        raise SystemExit(1)
    mapping_config_list = get_mapping_config(mapping_config_file)
    print(mapping_config_list)
    for mapping_config in mapping_config_list:
        # Expected columns: sequence no, schema.table, data directory,
        # file-name pattern, control-file template, append/truncate flag.
        # Skip malformed rows instead of dying with IndexError mid-run.
        if len(mapping_config) < 6 or '.' not in mapping_config[1]:
            print('配置行格式错误,跳过:', mapping_config)
            continue
        (seq_num, table_full_name, dir_path,
         file_pattern, ctl_template_file, append_flag) = mapping_config[:6]
        schema_name, table_name = table_full_name.split('.')[:2]
        file_loaded_list_name = file_loaded_list(table_full_name)
        loaded_data_list = loaded_data(curr_path, file_loaded_list_name)
        need_load_file_list = get_file_name(dir_path, loaded_data_list, table_name)
        if not need_load_file_list:
            # Nothing new for this table: no control file, no dmfldr run.
            continue
        ctl_file_path = gen_ctl_file(ctl_template_file, dir_path, need_load_file_list)
        result = load_into_db(ctl_file_path, need_load_file_list)
        if result != 0:
            # NOTE(review): files are still recorded as loaded, as before.
            # Whether a non-zero status means "nothing loaded" or "some rows
            # rejected" depends on dmfldr's exit-code contract. Confirm it
            # before gating write_loaded_list on success, otherwise a re-run
            # could double-load rows in append mode.
            print('dmfldr返回非零状态:', result)
        write_loaded_list(file_loaded_list_name, need_load_file_list)