DBUtil
import pymysql

class DBUtil():
    def __init__(self, dbName):
        self.host = '127.0.0.1'
        self.port = 3306
        self.user = 'root'
        self.password = '123456'
        self.db = dbName

    def __getConnect(self):
        return pymysql.connect(host=self.host, port=self.port, user=self.user,
                               password=self.password, database=self.db)

    # Insert one row built from a dict
    def insertOne(self, dict_data, table_name):
        conn = self.__getConnect()
        keyStr = ''    # column names
        valueStr = ''  # value placeholders
        for key in dict_data:
            keyStr += ' ' + key + ','
            valueStr += "%(" + key + ")s,"
        keyStr = keyStr.rstrip(',')
        valueStr = valueStr.rstrip(',')
        insert_sql = "insert into " + table_name + "(" + keyStr + " ) values( " + valueStr + " )"
        try:
            cursor = conn.cursor()
            result = cursor.execute(insert_sql, dict_data)
            conn.commit()
            cursor.close()
            conn.close()
            print("Executed SQL:", insert_sql)
            print("Inserted one row, rows affected:", result)
        except Exception as e:
            conn.rollback()
            print("DBUtil.insertOne() failed to insert row:", e)
            print(insert_sql)
            raise e

    # Insert a list of dicts
    def insertList(self, dict_data_list, table_name):
        for dic_data in dict_data_list:
            self.insertOne(dic_data, table_name)

    # Run a SELECT; returns a list of dicts (one dict per row, thanks to DictCursor)
    def select(self, sql):
        conn = self.__getConnect()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            # execute the SQL statement
            cursor.execute(sql)
            # fetch every row
            results = cursor.fetchall()
            return results
        except Exception as e:
            print("DBUtil.select() query failed:", e)
            raise e
        finally:
            cursor.close()
            conn.close()

    # Count the rows matching a WHERE clause
    def selectCount(self, tableName, whereSql):
        sql = "select count(*) from " + tableName + " where "
        whereSql = whereSql.strip()
        if whereSql[0:5] == "where":
            whereSql = whereSql.replace("where", "", 1).strip()
        sql += whereSql
        conn = self.__getConnect()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0][0]
        except Exception as e:
            print("DBUtil.selectCount() failed to count rows:", e)
            raise e
        finally:
            cursor.close()
            conn.close()

    # Anything else (update, delete, DDL, ...)
    def execute(self, sql):
        conn = self.__getConnect()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            print("Executed SQL:", sql)
            conn.commit()
        except Exception as e:
            print("DBUtil.execute() failed:", e)
            raise e
        finally:
            cursor.close()
            conn.close()
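A quick usage sketch for DBUtil, assuming a local MySQL database named test and a user(name, age) table that exist only for this example:

db = DBUtil("test")
db.execute("create table if not exists user(name varchar(50), age int)")
db.insertOne({"name": "tom", "age": 18}, "user")
db.insertList([{"name": "jerry", "age": 20}, {"name": "spike", "age": 30}], "user")
print(db.select("select * from user"))      # list of dicts thanks to DictCursor
print(db.selectCount("user", "age > 10"))   # the leading "where" keyword is optional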
SqliteUtil
import sqlite3

class sqliteUtil():
    def __init__(self, dbFile):
        self.dbFile = dbFile

    def __getConnect(self):
        return sqlite3.connect(self.dbFile)

    # Row factory that maps every row to a dict keyed by column name
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d

    # Run a SELECT; returns a list of dicts (one dict per row)
    def select(self, sql):
        conn = self.__getConnect()
        conn.row_factory = self.dict_factory
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            return results
        except Exception as e:
            print("sqliteUtil.select() unable to fetch data:", e)
            raise e
        finally:
            cursor.close()
            conn.close()

    # Count the rows matching a WHERE clause
    def selectCount(self, tableName, whereSql):
        sql = "select count(*) from " + tableName + " where "
        whereSql = whereSql.strip()
        if whereSql[0:5] == "where":
            whereSql = whereSql.replace("where", "", 1).strip()
        sql += whereSql
        conn = self.__getConnect()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0][0]
        except Exception as e:
            print("sqliteUtil.selectCount() unable to fetch data:", e)
            raise e
        finally:
            cursor.close()
            conn.close()

    # Anything else (update, delete, DDL, ...)
    def execute(self, sql):
        conn = self.__getConnect()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            print("Executed SQL:", sql)
            conn.commit()
        finally:
            cursor.close()
            conn.close()

    # Insert one row built from a dict, using named :key placeholders
    def insertOne(self, dict_data, table_name):
        conn = self.__getConnect()
        keyStr = ''    # column names
        valueStr = ''  # named placeholders
        for key in dict_data:
            keyStr += ' ' + key + ','
            valueStr += ":" + key + ","
        keyStr = keyStr.rstrip(',')
        valueStr = valueStr.rstrip(',')
        insert_sql = "insert into " + table_name + "(" + keyStr + " ) values( " + valueStr + " )"
        try:
            cursor = conn.cursor()
            result = cursor.execute(insert_sql, dict_data)
            conn.commit()
            cursor.close()
            conn.close()
            print("Executed SQL:", insert_sql)
            print("Inserted one row, rows affected:", result.rowcount)
        except Exception as e:
            conn.rollback()
            print("sqliteUtil.insertOne() failed to insert row:", e)
            print(insert_sql)
            raise e

    # Insert a list of dicts
    def insertList(self, dict_data_list, table_name):
        for dic_data in dict_data_list:
            self.insertOne(dic_data, table_name)

    # Create a table
    def createTable(self, sql):
        sql_demo = '''
        CREATE TABLE password_library(
            name text not null,
            url text not null,
            username text not null,
            password text not null,
            note text null,
            update_time text null,
            primary key (name, url, username, password)
        );
        '''
        con = self.__getConnect()
        cur = con.cursor()
        cur.execute(sql)
        con.commit()
        con.close()
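A quick usage sketch for sqliteUtil, using a throwaway demo.db file and an example user table:

util = sqliteUtil("demo.db")
util.execute("create table if not exists user(name text, age int)")
util.insertOne({"name": "tom", "age": 18}, "user")
util.insertList([{"name": "jerry", "age": 20}], "user")
print(util.select("select * from user"))           # list of dicts via dict_factory
print(util.selectCount("user", "where age > 10"))  # -> 2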
XlsUtil
# -*- coding: utf-8 -*-
import xlrd
import xlwt

def read(path):
    """
    :param path: file path
    """
    workbook = xlrd.open_workbook(path)
    data_sheet = workbook.sheets()[0]
    row_num = data_sheet.nrows
    col_num = data_sheet.ncols
    data = []
    for i in range(row_num):
        # skip the header row
        if i == 0:
            continue
        row_data = []
        for j in range(col_num):
            row_data.append(data_sheet.cell_value(i, j))
        data.append(row_data)
    print(data)
    return data

def write(data, header, out_path):
    """
    :param data: sheet data, one list per row
    :param header: header row
    :param out_path: output path
    """
    wbk = xlwt.Workbook()
    sheet = wbk.add_sheet('Sheet1', cell_overwrite_ok=True)
    for i in range(len(header)):
        sheet.write(0, i, header[i])
    for row in range(len(data)):
        for col in range(len(data[row])):
            sheet.write(row + 1, col, data[row][col])
    print(out_path)
    wbk.save(out_path)

# test read and write
if __name__ == '__main__':
    write([
        [1.0, '海底世界', '1.mp4', '1.png',
         'animal:动物,coral:珊瑚,crab:蟹,dolphin:海豚,jellyfish:水母,near:靠近,ocean:海洋,plant:植物,soft:软的,tail:尾巴',
         '神秘的海底世界,色彩斑斓,有好多好多的动物,海龟、海豚、海豹、水母…… 鲸是怎么跳得那么高的呢,一起去看看吧~', '免费'],
        [2.0, '沙滩上的一天', '2.mp4', '2.png',
         'brown:棕色的,cute:可爱,desk:书桌,great:美好的,late:晚的,seashell:贝壳,sky:天空,put:放,small:小的,star:星星',
         '沙滩,阳光,贝壳。我们会在沙滩上看到些什么呢?这一天的旅行是怎么度过呢?让我们一起度过这美好的一天吧。', '免费'],
        [3.0, '沙滩,我来了!', '3.mp4', '3.png',
         'beach:海滩,sea:海,shorts:短裤,summertime:夏日,swimsuit:泳衣,wave:波浪,wear:穿,sunscreen:防晒霜,tank top:背心,sunglasses:太阳眼镜',
         '炎炎夏日,最好的去处当然是沙滩啦。为了我们的沙滩旅行,我们需要准备些什么呢?在沙滩上,我们又会做些什么游戏呢?', '会员'],
        [4.0, '在度假中', '4.mp4', '4.png',
         'beach:海滩,having a picnic:吃野餐,hungry:饿的,sand:沙子,sea:海,looking at:看着,sitting on:坐在…上,looking for:寻找着,together:一起,yard:院子',
         '度假时间到了,让我们一去去逛逛,沙滩,花园,大海,让我们一起去看看我们小伙伴的度假时光吧。', '会员']],
        ['编号', '名称', '视频文件', '封面文件', '单词', '简介', '权限'],
        # xlwt only writes the legacy .xls format, so save with an .xls extension
        'test_new2.xls')
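Reading the sheet back is just as short; a sketch that assumes the test file above was written first (note that xlrd 2.x only opens the legacy .xls format):

rows = read('test_new2.xls')
for row in rows:
    print(row[1], row[-1])  # e.g. the name and access-level columns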
XlsxUtil
# -*- coding: utf-8 -*-
from openpyxl import load_workbook, Workbook

def read(file_path, header=None, header_row=0, sheet_name=None):
    """
    :param file_path: full file path
    :param header: header names, e.g. ["name", "age", "count"]; rows are returned as dicts when given
    :param header_row: index of the header row
    :param sheet_name: worksheet name; the active sheet is used when omitted
    """
    work_book = load_workbook(file_path)
    if sheet_name is None:
        work_sheet = work_book.active
    else:
        work_sheet = work_book[sheet_name]
    data_list = []
    # iterate over all rows
    for i, row in enumerate(work_sheet.rows):
        # skip the header row(s)
        if i > header_row:
            if header is None:
                data_list.append([cell.value for cell in row])
            else:
                data_list.append(dict(zip(header, [cell.value for cell in row])))
    return data_list

def write(header, data_list, save_path):
    """
    :param header: header names, e.g. ["name", "age", "count"]
    :param data_list: rows to write, one list per row
    :param save_path: output file path
    """
    work_book = Workbook()
    work_sheet = work_book.active
    for j, item in enumerate(header):
        work_sheet.cell(1, j + 1, item)
    for row in range(len(data_list)):
        for col in range(len(data_list[row])):
            work_sheet.cell(row + 2, col + 1, data_list[row][col])
    work_book.save(save_path)

if __name__ == '__main__':
    header = ['sort', 'type', 'tu', 'number', "model", "xinghao", "jihao", "chejia"]  # fixed header
    # note: openpyxl cannot open the legacy .xls format, so the source workbook must be xlsx-based
    result_list = read(r'C:\Users\Administrator\Desktop\存货档案11_4 (1)(1).XLS', header, 2, "Sheet1")
    print(result_list)
    data_list = [
        [1.0, '海底世界', '1.mp4', '1.png', 'animal:动物,coral:珊瑚,crab:蟹,dolphin:海豚,jellyfish:水母,near:靠近,ocean:海洋,plant:植物,soft:软的,tail:尾巴', '神秘的海底世界,色彩斑斓,有好多好多的动物,海龟、海豚、海豹、水母…… 鲸是怎么跳得那么高的呢,一起去看看吧~', '免费'],
        [2.0, '沙滩上的一天', '2.mp4', '2.png', 'brown:棕色的,cute:可爱,desk:书桌,great:美好的,late:晚的,seashell:贝壳,sky:天空,put:放,small:小的,star:星星', '沙滩,阳光,贝壳。我们会在沙滩上看到些什么呢?这一天的旅行是怎么度过呢?让我们一起度过这美好的一天吧。', '免费'],
        [3.0, '沙滩,我来了!', '3.mp4', '3.png', 'beach:海滩,sea:海,shorts:短裤,summertime:夏日,swimsuit:泳衣,wave:波浪,wear:穿,sunscreen:防晒霜,tank top:背心,sunglasses:太阳眼镜', '炎炎夏日,最好的去处当然是沙滩啦。为了我们的沙滩旅行,我们需要准备些什么呢?在沙滩上,我们又会做些什么游戏呢?', '会员'],
        [4.0, '在度假中', '4.mp4', '4.png', 'beach:海滩,having a picnic:吃野餐,hungry:饿的,sand:沙子,sea:海,looking at:看着,sitting on:坐在…上,looking for:寻找着,together:一起,yard:院子', '度假时间到了,让我们一去去逛逛,沙滩,花园,大海,让我们一起去看看我们小伙伴的度假时光吧。', '会员']
    ]
    write(header, data_list, r'C:\Users\Administrator\Desktop\2.xlsx')
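A small round-trip sketch with made-up file and column names, showing how the header argument turns each row into a dict:

write(["name", "count"], [["apple", 3], ["pear", 5]], "demo.xlsx")
print(read("demo.xlsx", header=["name", "count"]))
# -> [{'name': 'apple', 'count': 3}, {'name': 'pear', 'count': 5}]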
FileUtil
# -*- coding: utf-8 -*-
import hashlib
import os
import os.path
from configparser import ConfigParser

# Merge every file under a directory into one output file
def merge_files(directory_path, outfilePath):
    """ directory_path: source directory  outfilePath: output file path, including the file name """
    with open(outfilePath, 'a+', encoding="utf-8") as of:
        # root is the directory currently being walked
        # dirs is a list of the sub-directory names inside root
        # files is a list of the file names inside root (not inside sub-directories)
        # so os.path.join(root, file) is the full path of every file under the tree
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                with open(file_path, encoding="utf-8") as f:
                    # write each file followed by a newline
                    of.write(f.read() + "\n")

# Remove empty sub-directories
def remove_empty_dir(path):
    for root, dirs, files in os.walk(path):
        for item in dirs:
            dir_path = os.path.join(root, item)
            try:
                os.rmdir(dir_path)
            except OSError:
                # os.rmdir only removes empty directories; ignore the rest
                pass

# Prepend content to the head of a file
def append_head_file(file_path, content):
    """ file_path: file path  content: content to prepend """
    with open(file_path, 'r+', encoding="utf-8") as f:
        old = f.read()
        f.seek(0)
        f.write(content)
        f.write(old)

# Get a ConfigParser object for an ini file
def read_config(ini_file):
    cfg = ConfigParser()
    cfg.read(ini_file, encoding="utf-8")
    return cfg

# Read a whole text file
def read_file(src):
    with open(src, "r", encoding="utf-8") as f:
        return f.read()

# Read all lines of a text file
def read_lines(src):
    with open(src, "r", encoding="utf-8") as f:
        return f.readlines()

# Append an entry to an m3u file, creating the file if it does not exist
def append_m3u8(file_path, img_url, group_name, video_name, video_url):
    """ file_path e.g. 1.m3u """
    content = '#EXTINF:-1 tvg-logo="{}" group-title="{}", {}\n{}\n\n'.format(img_url, group_name, video_name, video_url)
    if not os.path.exists(file_path):
        content = "#EXTM3U\n" + content
    with open(file_path, mode="a", encoding="utf-8") as f:
        f.write(content)

# Append to a text file
def append_text(filename, text):
    with open(filename, "a", encoding="utf-8") as f:
        f.write(text)

# Save a text file
def save_text(filename, text):
    with open(filename, "w", encoding="utf-8") as f:
        f.write(text)

# Save a binary file
def save_file(filename, content):
    with open(filename, "wb") as f:
        f.write(content)

# Does the file or directory exist?
def exist_path(path):
    return os.path.exists(path)

# SHA-1 hash of a file's content, read in 5 MB chunks
def get_content_hash(file_path):
    file_size = os.path.getsize(file_path) // (1024 * 1024)
    sha1 = hashlib.sha1()
    size = 5
    temp = size  # progress counter
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(1024 * 1024 * size)
            if chunk:
                print("File size {}M, read {}M".format(file_size, temp))
                temp += size
                sha1.update(chunk)
            else:
                break
    return sha1.hexdigest()
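A quick usage sketch for FileUtil with made-up file names in the current directory:

save_text("a.txt", "hello")
append_text("a.txt", " world")
append_head_file("a.txt", "# title\n")
print(read_file("a.txt"))          # -> "# title\nhello world"
print(read_lines("a.txt"))         # -> ['# title\n', 'hello world']
print(get_content_hash("a.txt"))   # SHA-1 of the file content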
JsonUtil
# -*- coding: utf-8 -*-
import json

# JSON string -> Python object
def json2obj(text):
    return json.loads(text)

# Python object -> JSON string
def obj2json(obj, indent=2):
    return json.dumps(obj, ensure_ascii=False, indent=indent)

# Load a JSON file into a Python object
def read_json_file_2_obj(src):
    with open(src, "r", encoding="utf-8") as f:
        return json2obj(f.read())

# Save a JSON string or a Python object to a file
def save_json_file(obj, filename):
    if not isinstance(obj, str):
        obj = obj2json(obj)
    with open(filename, "w", encoding="utf-8") as f:
        f.write(obj)
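A round-trip sketch for JsonUtil; demo.json is just an example file name:

data = {"name": "海底世界", "free": True}
print(obj2json(data))                     # ensure_ascii=False keeps the Chinese characters readable
save_json_file(data, "demo.json")         # non-string objects are serialised before writing
print(read_json_file_2_obj("demo.json"))  # -> {'name': '海底世界', 'free': True}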
LogUtil
# -*- coding: utf-8 -*-
import logging
import os
import time

def getLogger(log_file_path):
    """
    :param log_file_path: directory where log files are stored
    :return: logger object
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.INFO)
    if not os.path.exists(log_file_path):
        os.makedirs(log_file_path)
    # add the handlers only once, otherwise repeated calls duplicate every log line
    if not logger.handlers:
        log_path = log_file_path + "/output-" + time.strftime("%Y-%m-%d") + ".log"
        handler = logging.FileHandler(log_path, encoding="utf-8")
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)
    return logger
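A quick usage sketch; logs is an example directory, and the same messages go to both the daily file and the console:

logger = getLogger("logs")
logger.info("job started")
logger.warning("something looks off")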
OtherUtil
# -*- coding: utf-8 -*-
import datetime
import hashlib
import os
import os.path
import random
import time
import traceback
from JsonUtil import *
from RequestUtil import get_resp

# Generate every date between two dates (inclusive)
def dateRange(beginDate, endDate, split_str):
    """ beginDate: '2020/01/01'  endDate: '2020/01/08'  split_str: '/' """
    dates = []
    format_str = "%Y{}%m{}%d".format(split_str, split_str)
    dt = datetime.datetime.strptime(beginDate, format_str)
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime(format_str)
    return dates

# Print the current exception traceback
def print_exc():
    traceback.print_exc()

# MD5 hash of a string
def md5(text):
    m = hashlib.md5()
    m.update(text.encode('UTF-8'))
    return m.hexdigest()

# Run a shell command
def cmd(command):
    os.system(command)

# Millisecond timestamp, 13 digits
def get_timestamp():
    return str(int(time.time() * 1000))

# Highlighted debug print
def print_debug(*obj):
    print('\033[1;46m', *obj, '\033[0m')

# Print every element of a list
def foreach(data_list):
    for data in data_list:
        print(data)

# Does the string contain Chinese characters?
def has_contain_chinese(check_str):
    for ch in check_str:
        if '\u4e00' <= ch <= '\u9fff':
            return True
    return False

# Switch to a random v2ray node
def v2ray_random():
    os.system('taskkill /f /t /im v2rayN.exe')
    json_file = r"F:/app/v2rayN-Core/guiNConfig.json"
    data = read_json_file_2_obj(json_file)
    try:
        print("Old node:", data["vmess"][data["index"]]["remarks"], end=" ")
    except:
        pass
    try:
        # randint is inclusive on both ends, so keep the index inside the list
        data["index"] = random.randint(5, len(data["vmess"]) - 1)
        print("changed to:", data["vmess"][data["index"]]["remarks"])
    except:
        pass
    save_json_file(data, json_file)
    os.startfile('F:/app/v2rayN-Core/v2rayN.exe')
    proxies = {
        'http': 'socks5://localhost:10808',
        'https': 'socks5://localhost:10808'
    }
    try:
        resp = get_resp("https://www.google.com.hk/", proxies=proxies, timeout=4)
        if resp.status_code != 200:
            print("Node connection error, switching again")
            v2ray_random()
    except:
        print("Node connection failed, switching again")
        v2ray_random()

# Time how long a function takes to run
def timing(target):
    print("Timer started...")
    start = time.time()
    target()
    print("Elapsed time: {}s".format(round(time.time() - start, 2)))
RequestUtil
# -*- coding: utf-8 -*-
import re
from urllib.parse import quote, unquote
import requests
from lxml import etree
from requests import utils
from retrying import retry

default_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36",
}

# Send a request and return the response
def get_resp(url, headers=None, method="get", **kwargs):
    if headers is None:
        headers = default_headers
    return requests.request(method=method, url=url, headers=headers, **kwargs)

# Same as get_resp, retried up to 999 times
@retry(stop_max_attempt_number=999)
def get_resp_retry(url, headers=None, method="get", **kwargs):
    return get_resp(url, headers, method, **kwargs)

# Save a binary response (file, image, video, ...) to disk
def save_file_with_url(url, filename, headers=None):
    with open(filename, "wb") as f:
        f.write(get_resp(url, headers).content)

# Save a text response (web page) to disk
def save_text_with_url(url, filename, headers=None):
    with open(filename, "w", encoding="utf-8") as f:
        f.write(get_resp(url, headers).text)

# Parse HTML into an lxml element for XPath queries
def get_xpath(html_str):
    return etree.HTML(html_str)

# Replace characters that are illegal in file names
def formatText(text):
    return re.sub(r"[/\\:*?\"<>|]", "_", text)

# Response cookies -> dict
def cookie_dict(resp):
    return utils.dict_from_cookiejar(resp.cookies)

# dict -> cookie header string
def dict_to_cookie(cookie_dict):
    cookies_str = ""
    for cookie in cookie_dict:
        cookies_str += cookie + "=" + cookie_dict[cookie] + "; "
    return cookies_str[0:-1]

# URL-encode a query parameter
def url_encode(value):
    return quote(value)

# URL-decode a query parameter
def url_decode(value):
    return unquote(value)

# Turn a headers block copied from the browser into a dict
def header_format(headers_str):
    # split on the first colon only, so values such as URLs keep their "://"
    return {x.split(':', 1)[0].strip(): x.split(':', 1)[1].strip()
            for x in headers_str.strip().split('\n')}
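A quick usage sketch; the URL and header block below are only examples:

resp = get_resp("https://httpbin.org/html")
print(get_xpath(resp.text).xpath("//h1/text()"))
headers = header_format("""Accept: text/html
Referer: https://example.com/page""")
print(headers)  # -> {'Accept': 'text/html', 'Referer': 'https://example.com/page'}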
Deploying the utility modules globally
import shutil

if __name__ == '__main__':
    py_list = ["FileUtil.py", "JsonUtil.py", "OtherUtil.py", "RequestUtil.py", "DBUtil.py", "LogUtil.py",
               "XlsUtil.py", "XlsxUtil.py", "SqliteUtil.py"]
    dic = "C:/Users/Administrator/AppData/Local/Programs/Python/Python38/Lib/site-packages"
    for py in py_list:
        print("Deploying:", py)
        shutil.copy(py, dic)
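A more portable variant, sketched under the assumption of a standard interpreter layout: ask the interpreter for its own site-packages path instead of hard-coding it.

import shutil
import sysconfig

if __name__ == '__main__':
    target = sysconfig.get_paths()["purelib"]  # e.g. .../Lib/site-packages on Windows
    for py in ["FileUtil.py", "JsonUtil.py", "OtherUtil.py", "RequestUtil.py", "DBUtil.py", "LogUtil.py",
               "XlsUtil.py", "XlsxUtil.py", "SqliteUtil.py"]:
        print("Deploying:", py)
        shutil.copy(py, target)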
How to import
from FileUtil import *
from JsonUtil import *
from OtherUtil import *
from RequestUtil import *
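Once deployed, any script can mix the helpers directly; a tiny sketch with an example URL and file name:

resp = get_resp("https://httpbin.org/get")
save_json_file(json2obj(resp.text), "resp.json")
print(read_file("resp.json"))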