https://gitee.com/muyuOvO/DMdatabase/blob/master/python%E5%AE%9E%E7%8E%B0%E6%8B%BC%E8%A3%85dmfldr#

    1. # -*- coding: utf-8 -*-
    2. # @Author : muyuOvO
    3. # @Link :
    4. # @Date : 2022/2/15
    5. import psutil
    6. import os
    7. import time
    8. curr_path = os.getcwd() # 程序主路径
    9. # os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.AL32UTF8'
    10. os.environ['path'] = r'D:\DMDATABASE\bin' # 数据库bin路径,以便于去执行dmfldr
    11. mapping_config_file = os.path.join(curr_path, 'mapping/config.dat') # 拼接config文件路径
    12. template_dir = os.path.join(curr_path, 'templates')
    13. ctl_dir = os.path.join(curr_path, 'ctls')
    14. log_dir = os.path.join(curr_path, 'logs')
    15. data_bad_dir = os.path.join(curr_path, 'data_bad')
    16. data_bak_dir = os.path.join(curr_path, 'data_bak')
    17. db_host = 'LOCALHOST'
    18. db_port = '5236'
    19. # db_sid = ''
    20. db_user = 'JZD'
    21. db_pwd = 'JZD123456'
    22. # 初始化操作
    23. def operation_init():
    24. if not os.path.exists(mapping_config_file):
    25. print('装载配置文件不存在,退出!')
    26. if not os.path.exists(template_dir):
    27. print('控制文件模板目录不存在,退出!')
    28. if not os.path.exists(ctl_dir):
    29. os.makedirs(ctl_dir)
    30. if not os.path.exists(log_dir):
    31. os.makedirs(log_dir)
    32. if not os.path.exists(data_bad_dir):
    33. os.makedirs(data_bad_dir)
    34. if not os.path.exists(data_bak_dir):
    35. os.makedirs(data_bak_dir)
    36. # 检查操作系统负荷
    37. def system_load_status():
    38. cpu_util = psutil.cpu_percent(None)
    39. mem_util = psutil.virtual_memory().percent
    40. io_wait = float(str(psutil.cpu_times()).split(',')[4].split('=')[1].replace(')', ''))
    41. # if cpu_util<60 and mem_util<90 and io_wait<10: # cpu利用率, 内存利用率, io等待
    42. if cpu_util < 99 and mem_util < 99:
    43. status = 'good'
    44. else:
    45. status = 'bad'
    46. return status
    47. # 配置文件读取, 返回配置文件中行记录列表
    48. def get_mapping_config(config_file):
    49. mapping_config_list = []
    50. with open(config_file, 'r', encoding='utf-8') as f_config_file:
    51. for config_line in f_config_file.readlines():
    52. if config_line.find('#') == -1:
    53. print(config_line, end='')
    54. mapping_config_list.append(config_line.strip().split(',')) # strip 去掉最后一个换行符
    55. return mapping_config_list
    56. # 判断目录下表的对应装载记录文件,没有就生成,返回装载记录文件名
    57. def file_loaded_list(table_full_name):
    58. file_loaded_list_name = f'file.loaded.{table_full_name}.list'
    59. open(file_loaded_list_name,'a',encoding='utf-8')
    60. return file_loaded_list_name
    61. # 获取已装载的数据文件名字,返回已经装载过的数据文件名字列表
    62. def loaded_data(curr_path, file_loaded_list_name):
    63. loaded_data_list = []
    64. with open(os.path.join(curr_path, file_loaded_list_name), 'r', encoding='utf-8') as f:
    65. for data_name in f.readlines():
    66. loaded_data_list.append(data_name.strip())
    67. return loaded_data_list
    68. # 获取需要装载的数据文件,返回需要装载的数据文件列表
    69. def get_file_name(dir_path, loaded_data_list, table_name ):
    70. need_load_file_list = []
    71. for need_load_file_name in os.listdir(dir_path):
    72. if need_load_file_name not in loaded_data_list and table_name in need_load_file_name:
    73. need_load_file_list.append(need_load_file_name.strip())
    74. return need_load_file_list
    75. # 根据templates下文件生成对应的控制文件,返回控制文件名称
    76. def gen_ctl_file(ctl_template_file, dir_path, need_load_file_list):
    77. if need_load_file_list != [] :
    78. in_files = ''
    79. for in_file_path in need_load_file_list:
    80. in_files = in_files + 'infile \'' + dir_path + in_file_path + '\'\n'
    81. ctl_file_content = ''
    82. with open(os.path.join(template_dir, ctl_template_file), 'r') as f_ctl_template_file:
    83. ctl_file_content = f_ctl_template_file.read()
    84. ctl_file_content = ctl_file_content.format(in_file_param=in_files, table_name_param=table_name,
    85. append_flag_param=append_flag)
    86. ctl_file_path = os.path.join(ctl_dir, table_name + '.ctl')
    87. with open(ctl_file_path, 'w') as f_ctl_file:
    88. f_ctl_file.write(ctl_file_content)
    89. return ctl_file_path
    90. # 拼装dmfldr入库语句,装载入库,返回处理结果
    91. def load_into_db(ctl_file_path, need_load_file_list):
    92. if need_load_file_list != []:
    93. dmfldr_cmd = "dmfldr userid={db_user_param}/{db_pwd_param}@{db_host_param}:{db_port_praram} control='{ctl_file_path_param}' log='{log_file_param}' badfile='{bad_file_param}' direct=true parallel=false"
    94. time_id = time.strftime("%Y%m%d%H", time.localtime())
    95. log_file = os.path.join(log_dir, 'loader.' + table_full_name + '.' + time_id + '.log')
    96. bad_file = os.path.join(log_dir, 'loader.' + table_full_name + '.' + time_id + '.bad')
    97. dmfldr_cmd = dmfldr_cmd.format(db_user_param=db_user, db_pwd_param=db_pwd, db_host_param=db_host, db_port_praram=db_port, ctl_file_path_param=ctl_file_path, log_file_param=log_file, bad_file_param=bad_file)
    98. print(dmfldr_cmd)
    99. result = os.system(dmfldr_cmd)
    100. return result
    101. # 在对应装载记录文件写进装在数据文件的名字
    102. def write_loaded_list(file_loaded_list_name, need_load_file_list):
    103. with open(file_loaded_list_name,'a',encoding='utf-8') as f:
    104. for name in need_load_file_list:
    105. f.write(name+'\n')
    106. # 删除载入的数据文件
    107. def del_data_file(dir_path, loaded_data_list):
    108. for need_del_data_file in os.listdir(dir_path):
    109. if need_del_data_file in loaded_data_list:
    110. os.remove(os.path.join(dir_path, need_del_data_file))
    111. operation_init()
    112. system_load_status()
    113. mapping_config_list = get_mapping_config(mapping_config_file)
    114. print(mapping_config_list)
    115. for mapping_config in mapping_config_list:
    116. seq_num = mapping_config[0] # 序号
    117. table_full_name = mapping_config[1] # schema.表名
    118. table_name = mapping_config[1].split('.')[1] # 表名
    119. schema_name = mapping_config[1].split('.')[0] # schema
    120. dir_path = mapping_config[2] # 文件目录
    121. file_pattern = mapping_config[3] # 文件名PATTERN
    122. ctl_template_file = mapping_config[4] # 控制文件模板
    123. append_flag = mapping_config[5] # 是append还是truncate
    124. file_loaded_list_name = file_loaded_list(table_full_name)
    125. loaded_data_list = loaded_data(curr_path, file_loaded_list_name)
    126. need_load_file_list = get_file_name(dir_path, loaded_data_list, table_name)
    127. ctl_file_path = gen_ctl_file(ctl_template_file, dir_path, need_load_file_list)
    128. load_into_db(ctl_file_path, need_load_file_list)
    129. write_loaded_list(file_loaded_list_name, need_load_file_list)