#!/bin/env python
# encoding=utf-8
"""Scan configured FTP directories, announce matching files to Kafka, and
download them in parallel.

Flow (driven by the __main__ loop):
  1. Read per-host/per-path download config from PostgreSQL.
  2. For each config row, list the remote FTP directory, publish a JSON
     notification (name/mtime/size) to Kafka for every file matching the
     configured regexes.
  3. Download each matched file with a thread pool, then rename it into the
     remote backup directory.
"""
import json
import re
import os
import sys
import ftplib
import time
import datetime
import psycopg2
from ftplib import FTP
from kafka import KafkaProducer
from multiprocessing.dummy import Pool as ThreadPool


def conn_pg():
    """Open and return a PostgreSQL connection (hard-coded credentials)."""
    conn = psycopg2.connect(database='XX', user='XX', password='XXXX',
                            host='XX.XX.XX.XX', port='XXXX')
    print('Open database successfully')
    return conn


def conn_ftp(ftp_server, ftp_port, user_name, password):
    """Connect and log in to an FTP server.

    Returns the logged-in FTP object, or None if connect/login fails
    (the error is printed instead of silently discarded).
    """
    try:
        ftp = FTP()
        ftp.connect(ftp_server, ftp_port)
        ftp.login(user_name, password)
    except Exception as e:
        print(e)
        return None
    return ftp


def ftp_scan(source_brokers, source_topic, infor):
    """List a remote FTP directory and publish matching files to Kafka.

    ``infor`` is one config row:
      [0] host ip, [1] ftp port, [2] user, [3] password, [5] remote path,
      [8] data-file regex, [9] check-file regex.

    Returns the list of file names that matched either regex (empty list
    when the FTP connection cannot be established).
    """
    ftp_server = infor[0]
    ftp_port = int(infor[1])
    ftp_name = infor[2]
    ftp_pwd = infor[3]
    remote_path = infor[5]
    file_regular = infor[8]
    chkf_regular = infor[9]
    print(remote_path)

    filepp = []
    ftp_conn = conn_ftp(ftp_server, ftp_port, ftp_name, ftp_pwd)
    if not ftp_conn:
        return filepp
    try:
        ftp_conn.cwd(remote_path)
        ftplist = ftp_conn.nlst()
        # Walk the listing, picking out data files and their check files.
        for files in ftplist:
            # MDTM reply looks like '213 YYYYMMDDHHMMSS'; the timestamp
            # starts at offset 4.
            resp = ftp_conn.sendcmd('MDTM ' + files)
            date_time_files = datetime.datetime.strptime(
                resp[4:18], '%Y%m%d%H%M%S')
            # Server time is assumed to be UTC; +8h yields Beijing time
            # (per the original author's comment) — TODO confirm.
            sh_time = date_time_files + datetime.timedelta(hours=8)
            file_size = ftp_conn.size(files)
            if re.match(r"%s" % (file_regular), files) or \
                    re.match(r"%s" % (chkf_regular), files):
                filepp.append(files)
                # json.dumps produces valid JSON (the original hand-built
                # string was missing the opening quote of "filename").
                jsonstr = json.dumps({'filename': files,
                                      'filetime': str(sh_time),
                                      'filesize': str(file_size)})
                print(jsonstr)
                kafka_producer(source_brokers, source_topic, jsonstr)
    finally:
        # Always release the control connection, even on scan errors.
        ftp_conn.close()
    return filepp


def file_down(file, infor):
    """Download one file from FTP, then rename it into the remote backup dir.

    ``infor`` is the same config row used by ftp_scan; errors are printed
    (best-effort, matching the original behavior) and the FTP connection is
    always closed.
    """
    # Unpack the relevant config-row fields.
    ftp_server = infor[0]
    ftp_port = int(infor[1])
    ftp_name = infor[2]
    ftp_pwd = infor[3]
    remote_path = infor[5]
    remote_bk_path = infor[6]
    local_path = infor[7]

    ftp_conn = conn_ftp(ftp_server, ftp_port, ftp_name, ftp_pwd)
    if not ftp_conn:
        return
    print('downloading ' + file)
    filename = local_path + '/' + file
    remotefile = remote_path + '/' + file
    remotebkfile = remote_bk_path + '/' + file
    try:
        ftp_conn.cwd(remote_path)
        # 'with' guarantees the local file is closed even if RETR fails.
        with open(filename, 'wb') as f:
            ftp_conn.retrbinary("RETR %s" % (file), f.write)
        print('download complete ' + file)
        # Move the remote file aside so it is not picked up again.
        ftp_conn.rename(remotefile, remotebkfile)
        print('rename complete ' + file)
        print('finsh ' + file)
    except Exception as e:
        print(e)
    finally:
        ftp_conn.close()


def kafka_producer(kafkabrokes, kafkatopic, jsonstr):
    """Send one JSON message to Kafka.

    kafka-python requires bytes when no value_serializer is configured, so
    the string is UTF-8 encoded. The producer is always closed (close()
    also flushes the pending send).
    """
    producer = KafkaProducer(bootstrap_servers=kafkabrokes)
    try:
        producer.send(kafkatopic, jsonstr.encode('utf-8'))
    except Exception as e:
        print(e)
    finally:
        producer.close()


if __name__ == '__main__':
    down_group_id = sys.argv[1]
    source_brokers = ['XX.XX.XX.XX:XXXX', 'XX.XX.XX.XX:XXXX']  # Kafka brokers
    source_topic = "XX"  # batch-processing topic
    # Parameterized query: down_group_id comes from the command line and
    # must not be interpolated into the SQL text (injection risk).
    sql = '''
        SELECT a.ip, a.ftp_port, a.ftp_name, a.ftp_pwd, a.home_path,
               b.remote_path, b.remote_bk_path, b.local_path,
               b.file_regular, b.chkf_regular, b.mv_flag,
               b.data_source, b.table_attr, b.prov_id
        FROM public.conf_host_info a, public.conf_path_info b
        WHERE a.host_key = b.host_key
          AND b.is_valid = '0'
          AND down_group_id = %s
          AND a.host_type = '3'
    '''
    pg_conn = conn_pg()
    cur = pg_conn.cursor()
    cur.execute(sql, (down_group_id,))
    rows = cur.fetchall()
    print(rows)
    pg_conn.close()

    # Poll forever: scan each configured path, download matches with a
    # 10-thread pool, then sleep before the next round.
    while True:
        for i in rows:
            files = ftp_scan(source_brokers, source_topic, i)
            pool = ThreadPool(10)
            for file in files:
                pool.apply_async(file_down, (file, i))
            pool.close()
            pool.join()
        time.sleep(10)