#!/usr/bin/env python
# encoding=utf-8
import json
import re
import os
import sys
import ftplib
import time
import datetime
import psycopg2
from ftplib import FTP
from kafka import KafkaProducer
from multiprocessing.dummy import Pool as ThreadPool
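
# Usage (assumed invocation; the script name below is hypothetical):
#   python ftp_to_kafka.py <down_group_id>
# <down_group_id> selects which FTP host/path configuration rows to process.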

def conn_pg():
    # Open a connection to the PostgreSQL configuration database.
    conn = psycopg2.connect(database='XX', user='XX', password='XXXX', host='XX.XX.XX.XX', port='XXXX')
    print('Open database successfully')
    return conn

def conn_ftp(ftp_server, ftp_port, user_name, password):
    # Connect and log in to the FTP server; return None on any failure.
    try:
        ftp = FTP()
        ftp.connect(ftp_server, ftp_port)
        ftp.login(user_name, password)
    except Exception as e:
        return None
    else:
        return ftp

def ftp_scan(source_brokers, source_topic, infor):
    # Unpack one configuration row: connection info, paths and filename patterns.
    ftp_server = infor[0]
    ftp_port = int(infor[1])
    ftp_name = infor[2]
    ftp_pwd = infor[3]
    remote_path = infor[5]
    remote_bk_path = infor[6]
    local_path = infor[7]
    file_regular = infor[8]
    chkf_regular = infor[9]
    mv_flag = int(infor[10])
    print(remote_path)
    filepp = []
    ftp_conn = conn_ftp(ftp_server, ftp_port, ftp_name, ftp_pwd)
    if not ftp_conn:
        return filepp
    ftp_conn.cwd(remote_path)
    ftplist = ftp_conn.nlst()
    # Walk the listing and pick out the data files and check files we need.
    for files in ftplist:
        # Get the file's modification time; the MDTM reply looks like
        # '213 YYYYMMDDHHMMSS', so the timestamp starts at offset 4.
        mdtm = ftp_conn.sendcmd('MDTM ' + files)[4:18]
        # Convert the timestamp string to a datetime.
        date_time_files = datetime.datetime.strptime(mdtm, '%Y%m%d%H%M%S')
        # The server reports UTC; add 8 hours to get Beijing time.
        sh_time = date_time_files + datetime.timedelta(hours=8)
        file_size = ftp_conn.size(files)
        if re.match(r"%s" % (file_regular), files) or re.match(r"%s" % (chkf_regular), files):
            filepp.append(files)
            jsonstr = '''"filename":"{files}","filetime":"{sh_time}","filesize":"{file_size}"'''.format(
                files=files, sh_time=sh_time, file_size=file_size)
            print(jsonstr)
            jsonstr = '{' + jsonstr.replace('\n', '') + '}'
            kafka_producer(source_brokers, source_topic, jsonstr)
    return filepp
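
# Sketch (assumption, not called by the original flow): since json is already
# imported, the Kafka payload built above by string concatenation could be
# produced with json.dumps instead, which handles quoting and escaping.
def build_file_message(files, sh_time, file_size):
    # Serialize the file metadata to the same JSON shape published above.
    return json.dumps({
        'filename': files,
        'filetime': str(sh_time),
        'filesize': str(file_size),
    })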

def file_down(file, infor):
    # Read the relevant fields from the configuration row.
    ftp_server = infor[0]
    ftp_port = int(infor[1])
    ftp_name = infor[2]
    ftp_pwd = infor[3]
    remote_path = infor[5]
    remote_bk_path = infor[6]
    local_path = infor[7]
    ftp_conn = conn_ftp(ftp_server, ftp_port, ftp_name, ftp_pwd)
    if ftp_conn:
        print('downloading ' + file)
        filename = local_path + '/' + file
        remotefile = remote_path + '/' + file
        remotebkfile = remote_bk_path + '/' + file
        try:
            ftp_conn.cwd(remote_path)
            # Download the remote file to the local path in binary mode.
            with open(filename, 'wb') as f:
                ftp_conn.retrbinary("RETR %s" % (file), f.write)
            print('download complete ' + file)
            # Move the remote file to the backup directory after download.
            ftp_conn.rename(remotefile, remotebkfile)
            print('rename complete ' + file)
            print('finish ' + file)
        except Exception as e:
            print(e)
        ftp_conn.close()

def kafka_producer(kafkabrokes, kafkatopic, jsonstr):
    # Publish one JSON message to Kafka; the payload must be bytes.
    producer = KafkaProducer(bootstrap_servers=kafkabrokes)
    try:
        producer.send(kafkatopic, jsonstr.encode('utf-8'))
    except Exception as e:
        print(e)
    else:
        producer.close()
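
# Sketch (assumption, not wired into the main loop): kafka-python also accepts
# a value_serializer, so a single long-lived producer could send Python dicts
# directly instead of opening and closing a connection per message.
def make_json_producer(kafkabrokes):
    # Returns a producer that serializes dict values to UTF-8 JSON bytes.
    return KafkaProducer(
        bootstrap_servers=kafkabrokes,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))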

if __name__ == '__main__':
    down_group_id = sys.argv[1]
    source_brokers = ['XX.XX.XX.XX:XXXX', 'XX.XX.XX.XX:XXXX']  # Kafka brokers
    source_topic = "XX"  # batch-processing topic
    # Look up the FTP host and path configuration for this download group.
    sql = '''
        SELECT
            a.ip,
            a.ftp_port,
            a.ftp_name,
            a.ftp_pwd,
            a.home_path,
            b.remote_path,
            b.remote_bk_path,
            b.local_path,
            b.file_regular,
            b.chkf_regular,
            b.mv_flag,
            b.data_source,
            b.table_attr,
            b.prov_id
        FROM
            public.conf_host_info a,
            public.conf_path_info b
        WHERE
            a.host_key = b.host_key
            AND b.is_valid = '0' AND down_group_id = '{id}' AND a.host_type = '3'
    '''.format(id=down_group_id)
    pg_conn = conn_pg()
    cur = pg_conn.cursor()
    cur.execute(sql)
    rows = cur.fetchall()
    print(rows)
    pg_conn.close()
    while True:
        for i in rows:
            # Scan the FTP directory and publish metadata for matching files.
            files = ftp_scan(source_brokers, source_topic, i)
            pool = ThreadPool(10)
            # Submit each matched file to the thread pool for download.
            for file in files:
                pool.apply_async(file_down, (file, i))
            pool.close()
            pool.join()
        time.sleep(10)