# Source: https://gitee.com/muyuOvO/DMdatabase/blob/master/python%E5%AE%9E%E7%8E%B0%E6%8B%BC%E8%A3%85dmfldr#
# -*- coding: utf-8 -*-
# @Author : muyuOvO
# @Link   :
# @Date   : 2022/2/15
"""Generate DM ``dmfldr`` control files from templates and bulk-load data files.

For every entry of ``mapping/config.dat`` the script:

1. finds files in the configured data directory whose names contain the table
   name and that are not yet listed in ``file.loaded.<schema.table>.list``;
2. renders ``templates/<template>`` into ``ctls/<table>.ctl``;
3. runs ``dmfldr`` with that control file;
4. appends the data file names to the loaded-files record, but only when
   ``dmfldr`` exited with status 0, so failed loads are retried next run.
"""
import os
import sys
import time

import psutil

curr_path = os.getcwd()  # program root directory

# Directory containing the dmfldr executable; main() prepends it to PATH.
dm_bin_dir = r'D:\DMDATABASE\bin'
# NOTE: if the client character set needs forcing, set NLS_LANG before running,
# e.g. 'AMERICAN_AMERICA.AL32UTF8'.

mapping_config_file = os.path.join(curr_path, 'mapping', 'config.dat')  # load-mapping config
template_dir = os.path.join(curr_path, 'templates')  # control-file templates
ctl_dir = os.path.join(curr_path, 'ctls')  # generated control files
log_dir = os.path.join(curr_path, 'logs')  # dmfldr log files
data_bad_dir = os.path.join(curr_path, 'data_bad')  # dmfldr bad (rejected-row) files
data_bak_dir = os.path.join(curr_path, 'data_bak')  # created at start-up; not otherwise used here

# Connection settings. Environment variables override the built-in defaults so
# credentials do not have to be edited into the script.
db_host = os.environ.get('DMFLDR_HOST', 'LOCALHOST')
db_port = os.environ.get('DMFLDR_PORT', '5236')
db_user = os.environ.get('DMFLDR_USER', 'JZD')
db_pwd = os.environ.get('DMFLDR_PWD', 'JZD123456')

# Number of comma-separated fields expected per config entry:
# seq, schema.table, data_dir, file_pattern, ctl_template, append_flag
CONFIG_FIELD_COUNT = 6


def operation_init():
    """Verify required inputs exist and create the working directories.

    Exits the process with status 1 when the mapping config file or the
    template directory is missing. Nothing can be loaded without them.
    """
    if not os.path.exists(mapping_config_file):
        print('装载配置文件不存在,退出!')
        sys.exit(1)
    if not os.path.exists(template_dir):
        print('控制文件模板目录不存在,退出!')
        sys.exit(1)
    for work_dir in (ctl_dir, log_dir, data_bad_dir, data_bak_dir):
        os.makedirs(work_dir, exist_ok=True)


def system_load_status(interval=1.0, cpu_limit=99, mem_limit=99):
    """Return 'good' when CPU and memory utilisation are below the limits, else 'bad'.

    :param interval: seconds over which CPU usage is sampled. With ``None``,
        psutil's first call returns a meaningless 0.0, so a real interval is used.
    :param cpu_limit: CPU utilisation percentage treated as overloaded.
    :param mem_limit: memory utilisation percentage treated as overloaded.
    """
    # I/O wait is deliberately not checked. psutil only exposes cpu_times().iowait
    # on Linux. The old positional parse of str(cpu_times()) read a different field
    # on Windows and raised IndexError on macOS.
    cpu_util = psutil.cpu_percent(interval=interval)
    mem_util = psutil.virtual_memory().percent
    return 'good' if cpu_util < cpu_limit and mem_util < mem_limit else 'bad'


def get_mapping_config(config_file):
    """Read the load-mapping config and return one list of fields per entry.

    Each entry is one comma-separated line::

        seq,schema.table,data_dir,file_pattern,ctl_template,append_flag

    Text after a ``#`` is a comment, and blank lines are skipped. Each field is
    stripped of surrounding whitespace. A UTF-8 BOM, as written by some Windows
    editors, is ignored.
    """
    mapping_config_list = []
    with open(config_file, 'r', encoding='utf-8-sig') as f_config_file:
        for raw_line in f_config_file:
            line = raw_line.split('#', 1)[0].strip()
            if not line:
                continue
            print(line)
            mapping_config_list.append([field.strip() for field in line.split(',')])
    return mapping_config_list


def file_loaded_list(table_full_name):
    """Ensure the loaded-files record for *table_full_name* exists and return its file name.

    The record is ``file.loaded.<schema.table>.list`` inside ``curr_path``.
    """
    file_loaded_list_name = f'file.loaded.{table_full_name}.list'
    # Append mode only creates the file if it is missing. The handle is closed at once.
    with open(os.path.join(curr_path, file_loaded_list_name), 'a', encoding='utf-8'):
        pass
    return file_loaded_list_name


def loaded_data(curr_path, file_loaded_list_name):
    """Return the data file names already recorded as loaded (one per line, blanks skipped)."""
    with open(os.path.join(curr_path, file_loaded_list_name), 'r', encoding='utf-8') as f:
        return [name.strip() for name in f if name.strip()]


def get_file_name(dir_path, loaded_data_list, table_name):
    """Return files in *dir_path* that need loading for *table_name*.

    A file qualifies when it is a regular file, its name contains *table_name*,
    and it is not in *loaded_data_list*. Names are returned sorted so the load
    order is deterministic.
    """
    # NOTE(review): this is a plain substring match, so table 'T1' also matches
    # files for 'T10'. The config's file_pattern column is not used yet.
    already_loaded = set(loaded_data_list)
    need_load_file_list = []
    for file_name in sorted(os.listdir(dir_path)):
        if file_name in already_loaded or table_name not in file_name:
            continue
        if not os.path.isfile(os.path.join(dir_path, file_name)):
            continue  # skip sub-directories that happen to contain the table name
        need_load_file_list.append(file_name.strip())
    return need_load_file_list


def gen_ctl_file(ctl_template_file, dir_path, need_load_file_list,
                 table_name=None, append_flag=None):
    """Render a dmfldr control file from a template and return its path.

    The template in ``template_dir`` is filled through ``str.format`` with
    ``{in_file_param}`` (one ``infile '<path>'`` line per data file),
    ``{table_name_param}`` and ``{append_flag_param}``. The result is written to
    ``ctl_dir/<table_name>.ctl``.

    :param table_name: target table. Defaults to a module-level ``table_name``
        global, for backward compatibility with the old implicit behaviour.
    :param append_flag: value for ``{append_flag_param}``. Defaults to a
        module-level ``append_flag`` global, for the same reason.
    :return: the control file path, or ``None`` when there is nothing to load.
    """
    if not need_load_file_list:
        return None
    if table_name is None:
        table_name = globals()['table_name']
    if append_flag is None:
        append_flag = globals()['append_flag']
    # os.path.join adds the separator a plain string concatenation was missing.
    in_files = ''.join(
        "infile '" + os.path.join(dir_path, in_file_name) + "'\n"
        for in_file_name in need_load_file_list
    )
    # NOTE(review): template/control files use the platform default encoding,
    # as before. Confirm this matches what the templates and dmfldr expect.
    with open(os.path.join(template_dir, ctl_template_file), 'r') as f_ctl_template_file:
        ctl_file_content = f_ctl_template_file.read()
    ctl_file_content = ctl_file_content.format(in_file_param=in_files,
                                               table_name_param=table_name,
                                               append_flag_param=append_flag)
    ctl_file_path = os.path.join(ctl_dir, table_name + '.ctl')
    with open(ctl_file_path, 'w') as f_ctl_file:
        f_ctl_file.write(ctl_file_content)
    return ctl_file_path


def load_into_db(ctl_file_path, need_load_file_list, table_full_name=None):
    """Run dmfldr with *ctl_file_path* and return its exit status.

    The log goes to ``log_dir`` and the bad-row file to ``data_bad_dir``. Both are
    named ``loader.<schema.table>.<YYYYmmddHH>.{log,bad}``.

    :param table_full_name: ``schema.table`` used in the log/bad file names.
        Defaults to a module-level ``table_full_name`` global, for backward
        compatibility with the old implicit behaviour.
    :return: the ``os.system`` status (0 means success), or ``None`` when there
        is nothing to load.
    """
    if not need_load_file_list:
        return None
    if table_full_name is None:
        table_full_name = globals()['table_full_name']
    dmfldr_cmd = ("dmfldr userid={db_user_param}/{db_pwd_param}@{db_host_param}:{db_port_param} "
                  "control='{ctl_file_path_param}' log='{log_file_param}' "
                  "badfile='{bad_file_param}' direct=true parallel=false")
    time_id = time.strftime('%Y%m%d%H', time.localtime())
    log_file = os.path.join(log_dir, 'loader.' + table_full_name + '.' + time_id + '.log')
    bad_file = os.path.join(data_bad_dir, 'loader.' + table_full_name + '.' + time_id + '.bad')
    cmd_fields = dict(db_user_param=db_user, db_host_param=db_host, db_port_param=db_port,
                      ctl_file_path_param=ctl_file_path, log_file_param=log_file,
                      bad_file_param=bad_file)
    # Echo the command with the password masked so credentials stay out of console logs.
    print(dmfldr_cmd.format(db_pwd_param='******', **cmd_fields))
    return os.system(dmfldr_cmd.format(db_pwd_param=db_pwd, **cmd_fields))


def write_loaded_list(file_loaded_list_name, need_load_file_list):
    """Append each name in *need_load_file_list* to the loaded-files record."""
    with open(os.path.join(curr_path, file_loaded_list_name), 'a', encoding='utf-8') as f:
        f.writelines(name + '\n' for name in need_load_file_list)


def del_data_file(dir_path, loaded_data_list):
    """Delete every file in *dir_path* whose name appears in *loaded_data_list*."""
    loaded = set(loaded_data_list)
    for need_del_data_file in os.listdir(dir_path):
        if need_del_data_file in loaded:
            os.remove(os.path.join(dir_path, need_del_data_file))


def main():
    """Load every table listed in the mapping config."""
    # Prepend (do not replace) PATH so dmfldr is found and other tools still resolve.
    os.environ['PATH'] = dm_bin_dir + os.pathsep + os.environ.get('PATH', '')
    operation_init()
    if system_load_status() != 'good':
        print('系统负载过高,退出!')
        sys.exit(1)

    mapping_config_list = get_mapping_config(mapping_config_file)
    print(mapping_config_list)
    for mapping_config in mapping_config_list:
        if len(mapping_config) < CONFIG_FIELD_COUNT or '.' not in mapping_config[1]:
            print(f'配置行格式错误,跳过: {mapping_config}')
            continue
        (_seq_num,             # sequence number
         table_full_name,      # schema.table
         dir_path,             # data file directory
         _file_pattern,        # file name pattern (currently unused)
         ctl_template_file,    # control file template
         append_flag,          # append or truncate
         ) = mapping_config[:CONFIG_FIELD_COUNT]
        name_parts = table_full_name.split('.')
        table_name = name_parts[1]

        if not os.path.isdir(dir_path):
            print(f'数据文件目录不存在,跳过: {dir_path}')
            continue

        file_loaded_list_name = file_loaded_list(table_full_name)
        loaded_data_list = loaded_data(curr_path, file_loaded_list_name)
        need_load_file_list = get_file_name(dir_path, loaded_data_list, table_name)
        if not need_load_file_list:
            continue
        ctl_file_path = gen_ctl_file(ctl_template_file, dir_path, need_load_file_list,
                                     table_name=table_name, append_flag=append_flag)
        result = load_into_db(ctl_file_path, need_load_file_list,
                              table_full_name=table_full_name)
        # Record files as loaded only on success, so a failed load is retried next run.
        if result == 0:
            write_loaded_list(file_loaded_list_name, need_load_file_list)
        else:
            print(f'{table_full_name} 装载失败, dmfldr 返回码: {result}')


if __name__ == '__main__':
    main()
