DBUtil
import pymysqlclass DBUtil(): def __init__(self, dbName): self.host = '127.0.0.1' self.port = 3306 self.user = 'root' self.password = '123456' self.db = dbName def __getConnect(self): return pymysql.connect(self.host, self.user, self.password, self.db) # 根据dict插入一条数据 def insertOne(self, dict_data, table_name): global insert_sql conn = self.__getConnect() keyStr = '' # 列的字段 valueStr = '' # 行字段 for key in dict_data: keyStr += ' ' + key + ',' valueStr = valueStr + "%(" + key + ")s," keyStr = keyStr.rstrip(',') valueStr = valueStr.rstrip(',') try: cursor = conn.cursor() insert_sql = "insert into " + table_name + "(" + keyStr + " ) values( " + valueStr + " )" result = cursor.execute(insert_sql, dict_data) conn.commit() cursor.close() conn.close() print("执行sql:", insert_sql) print("插入单条数据成功,影响行数:", result) except Exception as e: conn.rollback() print("DBUtil.insertOne()插入单条操作失败:", e) print(insert_sql) raise e # 批量插入 def insertList(self, dict_data_list, table_name): for dic_data in dict_data_list: self.insertOne(dic_data, table_name) # 根据sql查询 返回嵌套元组 可自行强转list,dict def select(self, sql): conn = self.__getConnect() cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 执行SQL语句 cursor.execute(sql) conn.commit() # 获取所有记录列表 results = cursor.fetchall() return results except Exception as e: print("DBUtil.select()查询单条记录失败:", e) raise e finally: cursor.close() conn.close() # 查个数 def selectCount(self, tableName, whereSql): sql = "select count(*) from " + tableName + " where " whereSql = whereSql.strip() if "where" == whereSql[0:5]: whereSql = whereSql.replace("where", "").strip() sql += whereSql conn = self.__getConnect() cursor = conn.cursor() try: # 执行SQL语句 cursor.execute(sql) conn.commit() # 获取所有记录列表 results = cursor.fetchall() return results[0][0] except Exception as e: print("查询记录数失败:", e) print("DBUtil.selectCount()查询记录数失败:", e) raise e finally: cursor.close() conn.close() # 其他 def execute(self, sql): conn = self.__getConnect() cursor = conn.cursor() try: cursor.execute(sql) print("执行sql:", sql) conn.commit() except Exception as e: print("执行sql失败:", e) print("DBUtil.execute()执行失败:", e) raise e finally: cursor.close() conn.close()
SqliteUtil
import sqlite3class sqliteUtil(): def __init__(self, dbFile): self.dbFile = dbFile def __getConnect(self): return sqlite3.connect(self.dbFile) def dict_factory(self, cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d # 根据sql查询 返回嵌套元组 可自行强转list,dict def select(self, sql): conn = self.__getConnect() conn.row_factory = self.dict_factory cursor = conn.cursor() try: # 执行SQL语句 cursor.execute(sql) conn.commit() # 获取所有记录列表 results = cursor.fetchall() return results except: print("Error: unable to fecth data") cursor.close() conn.close() # 查个数 def selectCount(self, tableName, whereSql): sql = "select count(*) from " + tableName + " where " whereSql = whereSql.strip() if "where" == whereSql[0:5]: whereSql = whereSql.replace("where", "").strip() sql += whereSql conn = self.__getConnect() cursor = conn.cursor() try: # 执行SQL语句 cursor.execute(sql) conn.commit() # 获取所有记录列表 results = cursor.fetchall() return results[0][0] except: print("Error: unable to fecth data") cursor.close() conn.close() # 其他 def execute(self, sql): conn = self.__getConnect() cursor = conn.cursor() cursor.execute(sql) print("执行sql") conn.commit() cursor.close() conn.close() # 根据dict插入一条数据 def insertOne(self, dict_data, table_name): global insert_sql conn = self.__getConnect() keyStr = '' # 列的字段 valueStr = '' # 行字段 for key in dict_data: keyStr += ' ' + key + ',' # keyStr += '? ,' valueStr = valueStr + "'" + dict_data[key] + "'," keyStr = keyStr.rstrip(',') valueStr = valueStr.rstrip(',') try: cursor = conn.cursor() insert_sql = "insert into " + table_name + "(" + keyStr + " ) values( " + valueStr + " )" result = cursor.execute(insert_sql, dict_data) conn.commit() cursor.close() conn.close() print("执行sql:", insert_sql) print("插入单条数据成功,影响行数:", result.rowcount) except Exception as e: conn.rollback() print("sqliteUtil.insertOne()插入单条操作失败:", e) print(insert_sql) raise e # 批量插入 def insertList(self, dict_data_list, table_name): for dic_data in dict_data_list: self.insertOne(dic_data, table_name) # 创建表 def createTable(self, sql): sql_demo = ''' CREATE TABLE password_library( name text not null, url text not null, username text not null, password text not null, note text null, update_time text null, primary key (name, url, username, password) ); ''' con = self.__getConnect() cur = con.cursor() cur.execute(sql)
XlsUtil
# -*- coding: utf-8 -*-import xlrdimport xlwtdef read(path): """ :param path 文件路径 """ workbook = xlrd.open_workbook(path) data_sheet = workbook.sheets()[0] row_num = data_sheet.nrows col_num = data_sheet.ncols data = [] for i in range(row_num): row_data = [] for j in range(col_num): if (i == 0): continue row_data.append(data_sheet.cell_value(i, j)) data.append(row_data) print(data) return datadef write(data, header, out_path): """ :param data 表格数据 :param header 表头数据 :param out_path 输出路径 """ wbk = xlwt.Workbook() sheet = wbk.add_sheet('Sheet1', cell_overwrite_ok=True) for i in range(len(header)): sheet.write(0, i, header[i]) for row in range(len(data)): for col in range(len(data[row])): sheet.write(row + 1, col, data[row][col]) print(out_path) wbk.save(out_path)# test read and writeif __name__ == '__main__': write([ [1.0, '海底世界', '1.mp4', '1.png', 'animal:动物,coral:珊瑚,crab:蟹,dolphin:海豚,jellyfish:水母,near:靠近,ocean:海洋,plant:植物,soft:软的,tail:尾巴', '神秘的海底世界,色彩斑斓,有好多好多的动物,海龟、海豚、海豹、水母…… 鲸是怎么跳得那么高的呢,一起去看看吧~', '免费'], [2.0, '沙滩上的一天', '2.mp4', '2.png', 'brown:棕色的,cute:可爱,desk:书桌,great:美好的,late:晚的,seashell:贝壳,sky:天空,put:放,small:小的,star:星星', '沙滩,阳光,贝壳。我们会在沙滩上看到些什么呢?这一天的旅行是怎么度过呢?让我们一起度过这美好的一天吧。', '免费'], [3.0, '沙滩,我来了!', '3.mp4', '3.png', 'beach:海滩,sea:海,shorts:短裤,summertime:夏日,swimsuit:泳衣,wave:波浪,wear:穿,sunscreen:防晒霜,tank top:背心,sunglasses:太阳眼镜', '炎炎夏日,最好的去处当然是沙滩啦。为了我们的沙滩旅行,我们需要准备些什么呢?在沙滩上,我们又会做些什么游戏呢?', '会员'], [4.0, '在度假中', '4.mp4', '4.png', 'beach:海滩,having a picnic:吃野餐,hungry:饿的,sand:沙子,sea:海,looking at:看着,sitting on:坐在…上,looking for:寻找着,together:一起,yard:院子', '度假时间到了,让我们一去去逛逛,沙滩,花园,大海,让我们一起去看看我们小伙伴的度假时光吧。', '会员']], ['编号', '名称', '视频文件', '封面文件', '单词', '简介', '权限'], 'test_new2.xlsx')
XlsxUtil
# -*- coding: utf-8 -*-from openpyxl import load_workbook, Workbookdef read(file_path, header=None, header_row=0, sheet_name=None): """ :param file_path 文件全路径 :param header 表头["name","age","count"] :param header_row 表头行号 :param sheet_name 工作表名称,为空则默认 """ work_book = load_workbook(file_path) if sheet_name is None: work_sheet = work_book.active else: work_sheet = work_book[sheet_name] data_list = [] # 遍历所有行 for i, row in enumerate(work_sheet.rows): # 跳过表头行 if i > header_row: if header is None: data_list.append([cell.value for cell in row]) else: data_list.append(dict(zip(header, [cell.value for cell in row]))) return data_listdef write(header, data_list, save_path): """ :param file_path 文件全路径 :param header 表头["name","age","count"] :param header_row 表头行号 :param sheet_name 工作表名称,为空则默认 """ work_book = Workbook() work_sheet = work_book.active for j, item in enumerate(header): work_sheet.cell(1, j + 1, item) for row in range(len(data_list)): for col in range(len(data_list[row])): work_sheet.cell(row + 2, col + 1, data_list[row][col]) work_book.save(save_path)if __name__ == '__main__': header = ['sort', 'type', 'tu', 'number', "model", "xinghao", "jihao", "chejia"] # 固定的标题 result_list = read(r'C:\Users\Administrator\Desktop\存货档案11_4 (1)(1).XLS', header, 2, "Sheet1") print(result_list) data_list = [ [1.0, '海底世界', '1.mp4', '1.png', 'animal:动物,coral:珊瑚,crab:蟹,dolphin:海豚,jellyfish:水母,near:靠近,ocean:海洋,plant:植物,soft:软的,tail:尾巴', '神秘的海底世界,色彩斑斓,有好多好多的动物,海龟、海豚、海豹、水母…… 鲸是怎么跳得那么高的呢,一起去看看吧~', '免费'], [2.0, '沙滩上的一天', '2.mp4', '2.png', 'brown:棕色的,cute:可爱,desk:书桌,great:美好的,late:晚的,seashell:贝壳,sky:天空,put:放,small:小的,star:星星', '沙滩,阳光,贝壳。我们会在沙滩上看到些什么呢?这一天的旅行是怎么度过呢?让我们一起度过这美好的一天吧。', '免费'], [3.0, '沙滩,我来了!', '3.mp4', '3.png', 'beach:海滩,sea:海,shorts:短裤,summertime:夏日,swimsuit:泳衣,wave:波浪,wear:穿,sunscreen:防晒霜,tank top:背心,sunglasses:太阳眼镜', '炎炎夏日,最好的去处当然是沙滩啦。为了我们的沙滩旅行,我们需要准备些什么呢?在沙滩上,我们又会做些什么游戏呢?', '会员'], [4.0, '在度假中', '4.mp4', '4.png', 'beach:海滩,having a picnic:吃野餐,hungry:饿的,sand:沙子,sea:海,looking at:看着,sitting on:坐在…上,looking for:寻找着,together:一起,yard:院子', '度假时间到了,让我们一去去逛逛,沙滩,花园,大海,让我们一起去看看我们小伙伴的度假时光吧。', '会员'] ] write(header, data_list, r'C:\Users\Administrator\Desktop\2.xlsx')
FileUtil
# -*- coding: utf-8 -*-import hashlibimport osimport os.pathfrom configparser import ConfigParser# 合并一个文件夹下多个文件def merge_files(directory_path, outfilePath): """ directory_path: 文件夹目录 outfilePath: 输出文件路径,包括文件名 """ with open(outfilePath, 'a+', encoding="utf-8") as of: # root 所指的是当前正在遍历的这个文件夹的本身的地址 # dirs 是一个 list ,内容是该文件夹中所有的目录的名字(不包括子目录) # files 同样是 list , 内容是该文件夹中所有的文件(不包括子目录) # 所以root+file = 目录下每一个文件路径 for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) with open(file_path, encoding="utf-8") as f: # 换行写入 of.write(f.read() + "\n")# 删除文件夹下的空目录def remove_empty_dir(path): for root, dirs, files in os.walk(path): for item in dirs: dir_path = os.path.join(root, item) try: os.rmdir(dir_path) except: pass# 追加内容到文件头部def append_head_file(file_path, content): """ file_path: 文件路径 content: 追加的内容 """ with open(file_path, 'r+', encoding="utf-8") as f: old = f.read() f.seek(0) f.write(content) f.write(old)# 获取配置文件对象def read_config(ini_file): cfg = ConfigParser() cfg.read(ini_file, encoding="utf-8") return cfg# 读取文件def read_file(src): with open(src, "r", encoding="utf-8") as f: return f.read()# 读取所有行def read_lines(src): with open(src, "r", encoding="utf-8") as f: return f.readlines()# m3u文件追加,无文件则创建def append_m3u8(file_path, img_url, group_name, video_name, video_url): """ file_path 如 1.m3u """ content = '#EXTINF:-1 tvg-logo="{}" group-title="{}", {}\n{}\n\n'.format(img_url, group_name, video_name, video_url) if not os.path.exists(file_path): content = "#EXTM3U\n" + content with open(file_path, mode="a", encoding="utf-8") as f: f.write(content)# 追加文本文件def append_text(filename, text): with open(filename, "a", encoding="utf-8") as f: f.write(text)# 保存文本文件def save_text(filename, text): with open(filename, "w", encoding="utf-8") as f: f.write(text)# 保存二进制文件def save_file(filename, content): with open(filename, "wb") as f: f.write(content)# 文件(夹)是否存在def exist_path(path): return os.path.exists(path)# 获取文件hashdef get_content_hash(file_path): file_size = os.path.getsize(file_path) sha1 = hashlib.sha1() size = 5 temp = size # 输出 with open(file_path, 'rb') as f: while True: chunk = f.read(1024 * 1024 * size) if chunk: print("文件大小{}M 已读取{}M".format(file_size, temp)) temp += size sha1.update(chunk) else: break return sha1.hexdigest()
JsonUtil
# -*- coding: utf-8 -*-import json# json字符串转对象def json2obj(text): return json.loads(text)# 对象转json字符串def obj2json(obj, indent=2): return json.dumps(obj, ensure_ascii=False, indent=indent)# 将json文件转成对象def read_json_file_2_obj(src): with open(src, "r", encoding="utf-8") as f: return json2obj(f.read())# 保存json字符串或者对象到文件中def save_json_file(obj, filename): if type(obj) != str: obj = obj2json(obj) with open(filename, "w", encoding="utf-8") as f: f.write(obj)
LogUtil
# -*- coding: utf-8 -*-import loggingimport osimport timedef getLogger(log_file_path): """ :param log_file_path: 日志存放目录 :return: logger对象 """ logger = logging.getLogger(__name__) logger.setLevel(level=logging.INFO) if not os.path.exists(log_file_path): os.makedirs(log_file_path) log_path = log_file_path + "/output-" + time.strftime("%Y-%m-%d") + ".log" handler = logging.FileHandler(log_path, encoding="utf-8") formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) return logger
OtherUtil
# -*- coding: utf-8 -*-import datetimeimport hashlibimport osimport os.pathimport randomimport timeimport tracebackfrom JsonUtil import *from RequestUtil import get_resp# 生成指定范围的日期def dateRange(beginDate, endDate, split_str): """ beginDate: '2020/01/01' endDate: '2020/01/08' split_str: '/' """ dates = [] format_str = "%Y{}%m{}%d".format(split_str, split_str) dt = datetime.datetime.strptime(beginDate, format_str) date = beginDate[:] while date <= endDate: dates.append(date) dt = dt + datetime.timedelta(1) date = dt.strftime(format_str) return dates# 打印异常信息def print_exc(): traceback.print_exc()# md5加密def md5(text): m = hashlib.md5() m.update(text.encode('UTF-8')) return m.hexdigest()# 执行cmd命令def cmd(command): os.system(command)# 获取时间戳 毫秒级 长度13def get_timestamp(): return str(int(time.time() * 1000))# 输出调试def print_debug(*obj): print('\033[1;46m', *obj, '\033[0m')# 遍历打印def foreach(data_list): for data in data_list: print(data)# 是否包含中文def has_contain_chinese(check_str): for ch in check_str: if '\u4e00' <= ch <= '\u9fff': return True return False# 切换v2节点def v2ray_random(): os.system('taskkill /f /t /im v2rayN.exe') json_file = r"F:/app/v2rayN-Core/guiNConfig.json" data = read_json_file_2_obj(json_file) try: print("原节点:", data["vmess"][data["index"]]["remarks"], end=" ") except: pass try: data["index"] = random.randint(5, len(data["vmess"])) print("变更为:", data["vmess"][data["index"]]["remarks"]) except: pass save_json_file(data, json_file) os.startfile('F:/app/v2rayN-Core/v2rayN.exe') proxies = { 'http': 'socks5://localhost:10808', 'https': 'socks5://localhost:10808' } try: resp = get_resp("https://www.google.com.hk/", proxies=proxies, timeout=4) if resp.status_code != 200: print("节点连接错误,重新更换") v2ray_random() except: print("节点连接异常,重新更换") v2ray_random()# 计时def timing(target): print("程序开始计时...") start = time.time() target() print("程序运行耗时: {}s".format(round(time.time() - start, 2)))
RequestUtil
# -*- coding: utf-8 -*-import refrom urllib.parse import quote, unquoteimport requestsfrom lxml import etreefrom requests import utilsfrom retrying import retrydefault_headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36",}# 获取responsedef get_resp(url, headers=None, method="get", **kwargs): if headers is None: headers = default_headers return requests.request(method=method, url=url, headers=headers, **kwargs)# 获取response 重试999次@retry(stop_max_attempt_number=999)def get_resp_retry(url, headers=None, method="get", **kwargs): return get_resp(url, headers, method, **kwargs)# 保存文件,图片,视频等二进制流def save_file_with_url(url, filename, headers=None): with open(filename, "wb") as f: f.write(get_resp(url, headers).content)# 保存文本,网页def save_text_with_url(url, filename, headers=None): with open(filename, "w", encoding="utf-8") as f: f.write(get_resp(url, headers).text)# 获取xpath对象def get_xpath(html_str): return etree.HTML(html_str)# 格式化字符串def formatText(text): return re.sub(r"[/\\:*?\"<>|]", "_", text)# cookie字符串转对象def cookie_dict(resp): return utils.dict_from_cookiejar(resp.cookies)# 对象转cookie字符串def dict_to_cookie(cookie_dict): cookies_str = "" for cookie in cookie_dict: cookies_str += cookie + "=" + cookie_dict[cookie] + "; " return cookies_str[0:-1]# url参数编码def url_encode(value): return quote(value)# url参数解码def url_decode(value): return unquote(value)# 将浏览器复制的headers参数 格式化成dictdef header_format(headers_str): return {x.split(':')[0].strip(): ("".join(x.split(':')[1:])).strip().replace('//', "://") for x in headers_str.strip().split('\n')}
部署全局工具类
import shutilif __name__ == '__main__': py_list = ["FileUtil.py", "JsonUtil.py", "OtherUtil.py", "RequestUtil.py", "DBUtil.py", "LogUtil.py", "XlsUtil.py", "XlsxUtil.py", "SqliteUtil.py"] dic = "C:/Users/Administrator/AppData/Local/Programs/Python/Python38/Lib/site-packages" for py in py_list: print("部署:", py) shutil.copy(py, dic)
引入方式
from FileUtil import *from JsonUtil import *from OtherUtil import *from RequestUtil import *