DBUtil
import pymysqlclass DBUtil():    def __init__(self, dbName):        self.host = '127.0.0.1'        self.port = 3306        self.user = 'root'        self.password = '123456'        self.db = dbName    def __getConnect(self):        return pymysql.connect(self.host, self.user, self.password, self.db)    # 根据dict插入一条数据    def insertOne(self, dict_data, table_name):        global insert_sql        conn = self.__getConnect()        keyStr = ''  # 列的字段        valueStr = ''  # 行字段        for key in dict_data:            keyStr += ' ' + key + ','            valueStr = valueStr + "%(" + key + ")s,"        keyStr = keyStr.rstrip(',')        valueStr = valueStr.rstrip(',')        try:            cursor = conn.cursor()            insert_sql = "insert into " + table_name + "(" + keyStr + " ) values( " + valueStr + " )"            result = cursor.execute(insert_sql, dict_data)            conn.commit()            cursor.close()            conn.close()            print("执行sql:", insert_sql)            print("插入单条数据成功,影响行数:", result)        except Exception as e:            conn.rollback()            print("DBUtil.insertOne()插入单条操作失败:", e)            print(insert_sql)            raise e    # 批量插入    def insertList(self, dict_data_list, table_name):        for dic_data in dict_data_list:            self.insertOne(dic_data, table_name)    # 根据sql查询 返回嵌套元组 可自行强转list,dict    def select(self, sql):        conn = self.__getConnect()        cursor = conn.cursor(pymysql.cursors.DictCursor)        try:            # 执行SQL语句            cursor.execute(sql)            conn.commit()            # 获取所有记录列表            results = cursor.fetchall()            return results        except Exception as e:            print("DBUtil.select()查询单条记录失败:", e)            raise e        finally:            cursor.close()            conn.close()    # 查个数    def selectCount(self, tableName, whereSql):        sql = "select count(*) from " + tableName + " where "        whereSql = whereSql.strip()        if "where" == whereSql[0:5]:            whereSql = whereSql.replace("where", "").strip()        sql += whereSql        conn = self.__getConnect()        cursor = conn.cursor()        try:            # 执行SQL语句            cursor.execute(sql)            conn.commit()            # 获取所有记录列表            results = cursor.fetchall()            return results[0][0]        except Exception as e:            print("查询记录数失败:", e)            print("DBUtil.selectCount()查询记录数失败:", e)            raise e        finally:            cursor.close()            conn.close()    # 其他    def execute(self, sql):        conn = self.__getConnect()        cursor = conn.cursor()        try:            cursor.execute(sql)            print("执行sql:", sql)            conn.commit()        except Exception as e:            print("执行sql失败:", e)            print("DBUtil.execute()执行失败:", e)            raise e        finally:            cursor.close()            conn.close()
SqliteUtil
import sqlite3class sqliteUtil():    def __init__(self, dbFile):        self.dbFile = dbFile    def __getConnect(self):        return sqlite3.connect(self.dbFile)    def dict_factory(self, cursor, row):        d = {}        for idx, col in enumerate(cursor.description):            d[col[0]] = row[idx]        return d    # 根据sql查询 返回嵌套元组 可自行强转list,dict    def select(self, sql):        conn = self.__getConnect()        conn.row_factory = self.dict_factory        cursor = conn.cursor()        try:            # 执行SQL语句            cursor.execute(sql)            conn.commit()            # 获取所有记录列表            results = cursor.fetchall()            return results        except:            print("Error: unable to fecth data")        cursor.close()        conn.close()    # 查个数    def selectCount(self, tableName, whereSql):        sql = "select count(*) from " + tableName + " where "        whereSql = whereSql.strip()        if "where" == whereSql[0:5]:            whereSql = whereSql.replace("where", "").strip()        sql += whereSql        conn = self.__getConnect()        cursor = conn.cursor()        try:            # 执行SQL语句            cursor.execute(sql)            conn.commit()            # 获取所有记录列表            results = cursor.fetchall()            return results[0][0]        except:            print("Error: unable to fecth data")        cursor.close()        conn.close()    # 其他    def execute(self, sql):        conn = self.__getConnect()        cursor = conn.cursor()        cursor.execute(sql)        print("执行sql")        conn.commit()        cursor.close()        conn.close()    # 根据dict插入一条数据    def insertOne(self, dict_data, table_name):        global insert_sql        conn = self.__getConnect()        keyStr = ''  # 列的字段        valueStr = ''  # 行字段        for key in dict_data:            keyStr += ' ' + key + ','            # keyStr += '? ,'            valueStr = valueStr + "'" + dict_data[key] + "',"        keyStr = keyStr.rstrip(',')        valueStr = valueStr.rstrip(',')        try:            cursor = conn.cursor()            insert_sql = "insert into " + table_name + "(" + keyStr + " ) values( " + valueStr + " )"            result = cursor.execute(insert_sql, dict_data)            conn.commit()            cursor.close()            conn.close()            print("执行sql:", insert_sql)            print("插入单条数据成功,影响行数:", result.rowcount)        except Exception as e:            conn.rollback()            print("sqliteUtil.insertOne()插入单条操作失败:", e)            print(insert_sql)            raise e    # 批量插入    def insertList(self, dict_data_list, table_name):        for dic_data in dict_data_list:            self.insertOne(dic_data, table_name)    # 创建表    def createTable(self, sql):        sql_demo = '''            CREATE TABLE password_library(                name              text  not null,                url              text  not null,                username              text   not null,                password              text  not null,                note              text  null,                update_time     text null,                primary key (name, url, username, password)            );        '''        con = self.__getConnect()        cur = con.cursor()        cur.execute(sql)
XlsUtil
# -*- coding: utf-8 -*-import xlrdimport xlwtdef read(path):    """    :param path 文件路径    """    workbook = xlrd.open_workbook(path)    data_sheet = workbook.sheets()[0]    row_num = data_sheet.nrows    col_num = data_sheet.ncols    data = []    for i in range(row_num):        row_data = []        for j in range(col_num):            if (i == 0):                continue            row_data.append(data_sheet.cell_value(i, j))        data.append(row_data)    print(data)    return datadef write(data, header, out_path):    """    :param data     表格数据    :param header   表头数据    :param out_path 输出路径    """    wbk = xlwt.Workbook()    sheet = wbk.add_sheet('Sheet1', cell_overwrite_ok=True)    for i in range(len(header)):        sheet.write(0, i, header[i])    for row in range(len(data)):        for col in range(len(data[row])):            sheet.write(row + 1, col, data[row][col])    print(out_path)    wbk.save(out_path)# test read and writeif __name__ == '__main__':    write([        [1.0, '海底世界', '1.mp4', '1.png',         'animal:动物,coral:珊瑚,crab:蟹,dolphin:海豚,jellyfish:水母,near:靠近,ocean:海洋,plant:植物,soft:软的,tail:尾巴',         '神秘的海底世界,色彩斑斓,有好多好多的动物,海龟、海豚、海豹、水母…… 鲸是怎么跳得那么高的呢,一起去看看吧~', '免费'],        [2.0, '沙滩上的一天', '2.mp4', '2.png',         'brown:棕色的,cute:可爱,desk:书桌,great:美好的,late:晚的,seashell:贝壳,sky:天空,put:放,small:小的,star:星星',         '沙滩,阳光,贝壳。我们会在沙滩上看到些什么呢?这一天的旅行是怎么度过呢?让我们一起度过这美好的一天吧。',         '免费'],        [3.0, '沙滩,我来了!', '3.mp4', '3.png',         'beach:海滩,sea:海,shorts:短裤,summertime:夏日,swimsuit:泳衣,wave:波浪,wear:穿,sunscreen:防晒霜,tank top:背心,sunglasses:太阳眼镜',         '炎炎夏日,最好的去处当然是沙滩啦。为了我们的沙滩旅行,我们需要准备些什么呢?在沙滩上,我们又会做些什么游戏呢?', '会员'], [4.0, '在度假中', '4.mp4', '4.png',                                                                            'beach:海滩,having a picnic:吃野餐,hungry:饿的,sand:沙子,sea:海,looking at:看着,sitting on:坐在…上,looking for:寻找着,together:一起,yard:院子',                                                                            '度假时间到了,让我们一去去逛逛,沙滩,花园,大海,让我们一起去看看我们小伙伴的度假时光吧。',                                                                            '会员']],        ['编号', '名称', '视频文件', '封面文件', '单词', '简介', '权限'],        'test_new2.xlsx')
XlsxUtil
# -*- coding: utf-8 -*-from openpyxl import load_workbook, Workbookdef read(file_path, header=None, header_row=0, sheet_name=None):    """    :param file_path    文件全路径    :param header       表头["name","age","count"]    :param header_row   表头行号    :param sheet_name   工作表名称,为空则默认    """    work_book = load_workbook(file_path)    if sheet_name is None:        work_sheet = work_book.active    else:        work_sheet = work_book[sheet_name]    data_list = []    # 遍历所有行    for i, row in enumerate(work_sheet.rows):        # 跳过表头行        if i > header_row:            if header is None:                data_list.append([cell.value for cell in row])            else:                data_list.append(dict(zip(header, [cell.value for cell in row])))    return data_listdef write(header, data_list, save_path):    """    :param file_path    文件全路径    :param header       表头["name","age","count"]    :param header_row   表头行号    :param sheet_name   工作表名称,为空则默认    """    work_book = Workbook()    work_sheet = work_book.active    for j, item in enumerate(header):        work_sheet.cell(1, j + 1, item)    for row in range(len(data_list)):        for col in range(len(data_list[row])):            work_sheet.cell(row + 2, col + 1, data_list[row][col])    work_book.save(save_path)if __name__ == '__main__':    header = ['sort', 'type', 'tu', 'number', "model", "xinghao", "jihao", "chejia"]  # 固定的标题    result_list = read(r'C:\Users\Administrator\Desktop\存货档案11_4 (1)(1).XLS', header, 2, "Sheet1")    print(result_list)    data_list = [        [1.0, '海底世界', '1.mp4', '1.png', 'animal:动物,coral:珊瑚,crab:蟹,dolphin:海豚,jellyfish:水母,near:靠近,ocean:海洋,plant:植物,soft:软的,tail:尾巴', '神秘的海底世界,色彩斑斓,有好多好多的动物,海龟、海豚、海豹、水母…… 鲸是怎么跳得那么高的呢,一起去看看吧~', '免费'],        [2.0, '沙滩上的一天', '2.mp4', '2.png', 'brown:棕色的,cute:可爱,desk:书桌,great:美好的,late:晚的,seashell:贝壳,sky:天空,put:放,small:小的,star:星星', '沙滩,阳光,贝壳。我们会在沙滩上看到些什么呢?这一天的旅行是怎么度过呢?让我们一起度过这美好的一天吧。', '免费'],        [3.0, '沙滩,我来了!', '3.mp4', '3.png', 'beach:海滩,sea:海,shorts:短裤,summertime:夏日,swimsuit:泳衣,wave:波浪,wear:穿,sunscreen:防晒霜,tank top:背心,sunglasses:太阳眼镜', '炎炎夏日,最好的去处当然是沙滩啦。为了我们的沙滩旅行,我们需要准备些什么呢?在沙滩上,我们又会做些什么游戏呢?', '会员'], [4.0, '在度假中', '4.mp4', '4.png', 'beach:海滩,having a picnic:吃野餐,hungry:饿的,sand:沙子,sea:海,looking at:看着,sitting on:坐在…上,looking for:寻找着,together:一起,yard:院子', '度假时间到了,让我们一去去逛逛,沙滩,花园,大海,让我们一起去看看我们小伙伴的度假时光吧。', '会员']    ]    write(header, data_list, r'C:\Users\Administrator\Desktop\2.xlsx')
FileUtil
# -*- coding: utf-8 -*-import hashlibimport osimport os.pathfrom configparser import ConfigParser# 合并一个文件夹下多个文件def merge_files(directory_path, outfilePath):    """ directory_path: 文件夹目录           outfilePath: 输出文件路径,包括文件名 """    with open(outfilePath, 'a+', encoding="utf-8") as of:        # root 所指的是当前正在遍历的这个文件夹的本身的地址        # dirs 是一个 list ,内容是该文件夹中所有的目录的名字(不包括子目录)        # files 同样是 list , 内容是该文件夹中所有的文件(不包括子目录)        # 所以root+file = 目录下每一个文件路径        for root, dirs, files in os.walk(directory_path):            for file in files:                file_path = os.path.join(root, file)                with open(file_path, encoding="utf-8") as f:                    # 换行写入                    of.write(f.read() + "\n")# 删除文件夹下的空目录def remove_empty_dir(path):    for root, dirs, files in os.walk(path):        for item in dirs:            dir_path = os.path.join(root, item)            try:                os.rmdir(dir_path)            except:                pass# 追加内容到文件头部def append_head_file(file_path, content):    """ file_path: 文件路径         content: 追加的内容 """    with open(file_path, 'r+', encoding="utf-8") as f:        old = f.read()        f.seek(0)        f.write(content)        f.write(old)# 获取配置文件对象def read_config(ini_file):    cfg = ConfigParser()    cfg.read(ini_file, encoding="utf-8")    return cfg# 读取文件def read_file(src):    with open(src, "r", encoding="utf-8") as f:        return f.read()# 读取所有行def read_lines(src):    with open(src, "r", encoding="utf-8") as f:        return f.readlines()# m3u文件追加,无文件则创建def append_m3u8(file_path, img_url, group_name, video_name, video_url):    """ file_path 如 1.m3u """    content = '#EXTINF:-1 tvg-logo="{}" group-title="{}", {}\n{}\n\n'.format(img_url, group_name, video_name, video_url)    if not os.path.exists(file_path):        content = "#EXTM3U\n" + content    with open(file_path, mode="a", encoding="utf-8") as f:        f.write(content)# 追加文本文件def append_text(filename, text):    with open(filename, "a", encoding="utf-8") as f:        f.write(text)# 保存文本文件def save_text(filename, text):    with open(filename, "w", encoding="utf-8") as f:        f.write(text)# 保存二进制文件def save_file(filename, content):    with open(filename, "wb") as f:        f.write(content)# 文件(夹)是否存在def exist_path(path):    return os.path.exists(path)# 获取文件hashdef get_content_hash(file_path):    file_size = os.path.getsize(file_path)    sha1 = hashlib.sha1()    size = 5    temp = size  # 输出    with open(file_path, 'rb') as f:        while True:            chunk = f.read(1024 * 1024 * size)            if chunk:                print("文件大小{}M 已读取{}M".format(file_size, temp))                temp += size                sha1.update(chunk)            else:                break    return sha1.hexdigest()
JsonUtil
# -*- coding: utf-8 -*-import json# json字符串转对象def json2obj(text):    return json.loads(text)# 对象转json字符串def obj2json(obj, indent=2):    return json.dumps(obj, ensure_ascii=False, indent=indent)# 将json文件转成对象def read_json_file_2_obj(src):    with open(src, "r", encoding="utf-8") as f:        return json2obj(f.read())# 保存json字符串或者对象到文件中def save_json_file(obj, filename):    if type(obj) != str:        obj = obj2json(obj)    with open(filename, "w", encoding="utf-8") as f:        f.write(obj)
LogUtil
# -*- coding: utf-8 -*-import loggingimport osimport timedef getLogger(log_file_path):    """    :param log_file_path: 日志存放目录    :return: logger对象    """    logger = logging.getLogger(__name__)    logger.setLevel(level=logging.INFO)    if not os.path.exists(log_file_path):        os.makedirs(log_file_path)    log_path = log_file_path + "/output-" + time.strftime("%Y-%m-%d") + ".log"    handler = logging.FileHandler(log_path, encoding="utf-8")    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')    handler.setFormatter(formatter)    logger.addHandler(handler)    stream_handler = logging.StreamHandler()    stream_handler.setFormatter(formatter)    logger.addHandler(stream_handler)    return logger
OtherUtil
# -*- coding: utf-8 -*-import datetimeimport hashlibimport osimport os.pathimport randomimport timeimport tracebackfrom JsonUtil import *from RequestUtil import get_resp# 生成指定范围的日期def dateRange(beginDate, endDate, split_str):    """ beginDate: '2020/01/01'     endDate: '2020/01/08'  split_str: '/' """    dates = []    format_str = "%Y{}%m{}%d".format(split_str, split_str)    dt = datetime.datetime.strptime(beginDate, format_str)    date = beginDate[:]    while date <= endDate:        dates.append(date)        dt = dt + datetime.timedelta(1)        date = dt.strftime(format_str)    return dates# 打印异常信息def print_exc():    traceback.print_exc()# md5加密def md5(text):    m = hashlib.md5()    m.update(text.encode('UTF-8'))    return m.hexdigest()# 执行cmd命令def cmd(command):    os.system(command)# 获取时间戳 毫秒级 长度13def get_timestamp():    return str(int(time.time() * 1000))# 输出调试def print_debug(*obj):    print('\033[1;46m', *obj, '\033[0m')# 遍历打印def foreach(data_list):    for data in data_list:        print(data)# 是否包含中文def has_contain_chinese(check_str):    for ch in check_str:        if '\u4e00' <= ch <= '\u9fff':            return True    return False# 切换v2节点def v2ray_random():    os.system('taskkill /f /t /im v2rayN.exe')    json_file = r"F:/app/v2rayN-Core/guiNConfig.json"    data = read_json_file_2_obj(json_file)    try:        print("原节点:", data["vmess"][data["index"]]["remarks"], end=" ")    except:        pass    try:        data["index"] = random.randint(5, len(data["vmess"]))        print("变更为:", data["vmess"][data["index"]]["remarks"])    except:        pass    save_json_file(data, json_file)    os.startfile('F:/app/v2rayN-Core/v2rayN.exe')    proxies = {        'http': 'socks5://localhost:10808',        'https': 'socks5://localhost:10808'    }    try:        resp = get_resp("https://www.google.com.hk/", proxies=proxies, timeout=4)        if resp.status_code != 200:            print("节点连接错误,重新更换")            v2ray_random()    except:        print("节点连接异常,重新更换")        v2ray_random()# 计时def timing(target):    print("程序开始计时...")    start = time.time()    target()    print("程序运行耗时: {}s".format(round(time.time() - start, 2)))
RequestUtil
# -*- coding: utf-8 -*-import refrom urllib.parse import quote, unquoteimport requestsfrom lxml import etreefrom requests import utilsfrom retrying import retrydefault_headers = {    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36",}# 获取responsedef get_resp(url, headers=None, method="get", **kwargs):    if headers is None:        headers = default_headers    return requests.request(method=method, url=url, headers=headers, **kwargs)# 获取response 重试999次@retry(stop_max_attempt_number=999)def get_resp_retry(url, headers=None, method="get", **kwargs):    return get_resp(url, headers, method, **kwargs)# 保存文件,图片,视频等二进制流def save_file_with_url(url, filename, headers=None):    with open(filename, "wb") as f:        f.write(get_resp(url, headers).content)# 保存文本,网页def save_text_with_url(url, filename, headers=None):    with open(filename, "w", encoding="utf-8") as f:        f.write(get_resp(url, headers).text)# 获取xpath对象def get_xpath(html_str):    return etree.HTML(html_str)# 格式化字符串def formatText(text):    return re.sub(r"[/\\:*?\"<>|]", "_", text)# cookie字符串转对象def cookie_dict(resp):    return utils.dict_from_cookiejar(resp.cookies)# 对象转cookie字符串def dict_to_cookie(cookie_dict):    cookies_str = ""    for cookie in cookie_dict:        cookies_str += cookie + "=" + cookie_dict[cookie] + "; "    return cookies_str[0:-1]# url参数编码def url_encode(value):    return quote(value)# url参数解码def url_decode(value):    return unquote(value)# 将浏览器复制的headers参数 格式化成dictdef header_format(headers_str):    return {x.split(':')[0].strip(): ("".join(x.split(':')[1:])).strip().replace('//', "://") for x in            headers_str.strip().split('\n')}
部署全局工具类
import shutilif __name__ == '__main__':    py_list = ["FileUtil.py", "JsonUtil.py", "OtherUtil.py", "RequestUtil.py", "DBUtil.py", "LogUtil.py",               "XlsUtil.py", "XlsxUtil.py",  "SqliteUtil.py"]    dic = "C:/Users/Administrator/AppData/Local/Programs/Python/Python38/Lib/site-packages"    for py in py_list:        print("部署:", py)        shutil.copy(py, dic)
引入方式
from FileUtil import *from JsonUtil import *from OtherUtil import *from RequestUtil import *