#!/bin/env python
# encoding=utf-8
import json
import re
import os
import sys
import ftplib
import time
import datetime
import psycopg2
from ftplib import FTP
from kafka import KafkaProducer
from multiprocessing.dummy import Pool as ThreadPool
def conn_pg():
    """Open a connection to the configuration PostgreSQL database.

    Returns:
        An open psycopg2 connection object.
    """
    pg_connection = psycopg2.connect(
        database='XX',
        user='XX',
        password='XXXX',
        host='XX.XX.XX.XX',
        port='XXXX',
    )
    print('Open database successfully')
    return pg_connection
def conn_ftp(ftp_server, ftp_port, user_name, password):
    """Connect and log in to an FTP server.

    Args:
        ftp_server: host name or IP of the FTP server.
        ftp_port: TCP port (int).
        user_name: login user.
        password: login password.

    Returns:
        A connected, logged-in ftplib.FTP instance, or None on any failure
        (callers treat None as "connection unavailable").
    """
    try:
        ftp = FTP()
        ftp.connect(ftp_server, ftp_port)
        ftp.login(user_name, password)
    except Exception as e:
        # BUG FIX: the original swallowed the error silently; report it so
        # operators can tell a bad host/credentials from an empty scan.
        print('FTP connection to %s:%s failed: %s' % (ftp_server, ftp_port, e))
        return None
    else:
        return ftp
def ftp_scan(source_brokers, source_topic, infor):
    """Scan a remote FTP directory and publish metadata of matching files to Kafka.

    Args:
        source_brokers: Kafka bootstrap servers, passed through to kafka_producer.
        source_topic: Kafka topic the file-metadata JSON is published to.
        infor: one configuration row; indices used here:
            0=ip, 1=ftp_port, 2=ftp_name, 3=ftp_pwd, 5=remote_path,
            8=file_regular (data-file regex), 9=chkf_regular (check-file regex).

    Returns:
        List of file names matching either regex; empty when the FTP
        connection cannot be established.
    """
    ftp_server = infor[0]
    ftp_port = int(infor[1])
    ftp_name = infor[2]
    ftp_pwd = infor[3]
    remote_path = infor[5]
    file_regular = infor[8]
    chkf_regular = infor[9]
    print(remote_path)

    matched = []
    ftp_conn = conn_ftp(ftp_server, ftp_port, ftp_name, ftp_pwd)
    if not ftp_conn:
        return matched
    try:
        ftp_conn.cwd(remote_path)
        # Compile the configured patterns once instead of per file.
        file_pat = re.compile(r"%s" % (file_regular))
        chkf_pat = re.compile(r"%s" % (chkf_regular))
        for files in ftp_conn.nlst():
            # Filter first: skip the MDTM/SIZE round-trips for files we ignore.
            if not (file_pat.match(files) or chkf_pat.match(files)):
                continue
            # 'MDTM <name>' replies '213 YYYYMMDDHHMMSS' (server time, UTC).
            mdtm_reply = ftp_conn.sendcmd('MDTM ' + files)
            date_time_files = datetime.datetime.strptime(mdtm_reply[4:18], '%Y%m%d%H%M%S')
            # UTC + 8 hours -> Beijing time.
            sh_time = date_time_files + datetime.timedelta(hours=8)
            file_size = ftp_conn.size(files)
            matched.append(files)
            # BUG FIX: the hand-built string was missing the opening quote of
            # "filename", producing invalid JSON; build it with json.dumps.
            jsonstr = json.dumps({
                'filename': files,
                'filetime': str(sh_time),
                'filesize': str(file_size),
            })
            print(jsonstr)
            kafka_producer(source_brokers, source_topic, jsonstr)
    finally:
        # BUG FIX: the original leaked the FTP control connection on every
        # scan; this function runs in an infinite polling loop.
        ftp_conn.close()
    return matched
def file_down(file, infor):
    """Download one file from FTP to local disk, then rename the remote copy
    into the backup directory so it is not picked up by the next scan.

    Args:
        file: remote file name (as returned by ftp_scan).
        infor: one configuration row; indices used here:
            0=ip, 1=ftp_port, 2=ftp_name, 3=ftp_pwd,
            5=remote_path, 6=remote_bk_path, 7=local_path.
    """
    # Pull the connection and path settings out of the config row.
    ftp_server = infor[0]
    ftp_port = int(infor[1])
    ftp_name = infor[2]
    ftp_pwd = infor[3]
    remote_path = infor[5]
    remote_bk_path = infor[6]
    local_path = infor[7]
    ftp_conn = conn_ftp(ftp_server, ftp_port, ftp_name, ftp_pwd)
    if not ftp_conn:
        return
    print('downloading ' + file)
    filename = local_path + '/' + file
    remotefile = remote_path + '/' + file
    remotebkfile = remote_bk_path + '/' + file
    try:
        ftp_conn.cwd(remote_path)
        # BUG FIX: open with a context manager so the local handle is always
        # closed (the original leaked it, especially when retrbinary failed).
        with open(filename, 'wb') as f:
            ftp_conn.retrbinary("RETR %s" % (file), f.write)
        print('download complete ' + file)
        # Move the remote file aside so the next scan does not re-download it.
        ftp_conn.rename(remotefile, remotebkfile)
        print('rename complete ' + file)
        print('finsh ' + file)
    except Exception as e:
        # Best-effort worker: report and continue; other downloads proceed.
        print(e)
    finally:
        # BUG FIX: always release the FTP control connection.
        ftp_conn.close()
def kafka_producer(kafkabrokes, kafkatopic, jsonstr):
    """Publish one JSON message to a Kafka topic.

    Args:
        kafkabrokes: list of bootstrap server 'host:port' strings.
        kafkatopic: destination topic name.
        jsonstr: message payload (str or bytes).
    """
    producer = KafkaProducer(bootstrap_servers=kafkabrokes)
    try:
        # BUG FIX: KafkaProducer.send() requires bytes when no value_serializer
        # is configured; the original passed a str, which raised and was
        # silently swallowed below.
        if isinstance(jsonstr, str):
            jsonstr = jsonstr.encode('utf-8')
        producer.send(kafkatopic, jsonstr)
    except Exception as e:
        # BUG FIX: 'print e' was Python 2 syntax — a SyntaxError in Python 3.
        print(e)
    finally:
        # BUG FIX: close (and flush) the producer even when send() raised;
        # the original only closed on success, leaking connections.
        producer.close()
if __name__ == '__main__':
    # Usage: script.py <down_group_id>
    down_group_id = sys.argv[1]
    source_brokers = ['XX.XX.XX.XX:XXXX', 'XX.XX.XX.XX:XXXX']  # Kafka brokers
    source_topic = "XX"  # batch-processing topic
    # SECURITY FIX: down_group_id comes from argv and was interpolated with
    # str.format, allowing SQL injection; use a psycopg2 parameter instead.
    sql = '''
    SELECT
     a.ip,
     a.ftp_port,
     a.ftp_name,
     a.ftp_pwd,
     a.home_path,
     b.remote_path,
     b.remote_bk_path,
     b.local_path,
     b.file_regular,
     b.chkf_regular,
     b.mv_flag,
     b.data_source,
     b.table_attr,
     b.prov_id
    FROM
     public.conf_host_info a,
     public.conf_path_info b
    WHERE
     a.host_key = b.host_key
     AND b.is_valid = '0' and down_group_id = %(id)s and a.host_type = '3'
    '''
    pg_conn = conn_pg()
    cur = pg_conn.cursor()
    cur.execute(sql, {'id': down_group_id})
    rows = cur.fetchall()
    print(rows)
    # BUG FIX: close the cursor as well as the connection.
    cur.close()
    pg_conn.close()
    # Poll each configured FTP source forever; new files are announced to
    # Kafka by ftp_scan and then downloaded concurrently.
    while True:
        for i in rows:
            files = ftp_scan(source_brokers, source_topic, i)
            pool = ThreadPool(10)
            # Fan the downloads out over worker threads (I/O bound).
            for file in files:
                pool.apply_async(file_down, (file, i))
            pool.close()
            pool.join()
        time.sleep(10)