使用方法:
在 VS Code 中会有代码提示,很容易看懂
import mysqldb
db = mysqldb.connect_db(mysqldb.conn_confing)
# 执行sql,会有提示
db.query("update table1 set col='' ")
# 获取多行结果,返回由 dict 组成的列表(每行一个 dict)
rows = db.fetch_rows(table='table1')
源代码 mysqldb.py
from pymysql import (connect, cursors, err, escape_sequence)
# Default connection settings consumed by connect_db() and connect_ssdc().
# 'charset' must be a MySQL character-set name: the Python codec spelling
# 'utf-8' is not one, so the driver cannot resolve it when it is passed on.
conn_confing = {
    'host': 'localhost',
    'port': '3306',  # kept as a string; the consumers convert it with int()
    'user': 'root',
    'password': 'root',
    'db': 'your_db',
    'charset': 'utf8mb4',
}
def connect_db(conn_confing):
    """Build a MYSQL helper from a connection-settings mapping.

    Args:
        conn_confing: Mapping read with ``.get()`` for the keys 'host',
            'user', 'password' and 'db', plus the optional keys 'port' and
            'charset'.

    Returns:
        A MYSQL instance created from those settings.
    """
    # Forward the optional settings only when they are present.  Passing an
    # explicit None would override MYSQL's own defaults and make its
    # int(dbport) conversion raise TypeError.
    optional = {}
    port = conn_confing.get('port')
    if port:
        optional['dbport'] = port
    charset = conn_confing.get('charset')
    if charset:
        optional['dbcharset'] = charset

    return MYSQL(
        dbhost=conn_confing.get('host'),
        dbuser=conn_confing.get('user'),
        dbpwd=conn_confing.get('password'),
        dbname=conn_confing.get('db'),
        **optional
    )
def connect_ssdc(conn_confing):
    """Open a raw PyMySQL connection whose cursors are SSDictCursor.

    Args:
        conn_confing: Mapping read with ``.get()`` for the keys 'host',
            'port', 'user', 'password', 'db' and 'charset'.  A missing or
            empty 'port' falls back to 3306.

    Returns:
        The connection object returned by ``pymysql.connect``.
    """
    return connect(
        host=conn_confing.get('host'),
        # Apply the fallback *before* int(): int(None) raises TypeError, so
        # the previous ``int(...) or 3306`` could never reach its default.
        port=int(conn_confing.get('port') or 3306),
        user=conn_confing.get('user'),
        password=conn_confing.get('password'),
        db=conn_confing.get('db'),
        charset=conn_confing.get('charset'),
        cursorclass=cursors.SSDictCursor,
    )
class MYSQL:
    """A friendly PyMySQL wrapper that provides basic CRUD helpers.

    Column values are always sent as bound parameters.  Identifiers and raw
    SQL fragments (``table``, ``fields``, ``order``, ``limit`` and string
    ``condition`` arguments) are formatted into the statement verbatim, so
    they must never be built from untrusted input.
    """

    def __init__(self, dbhost, dbuser, dbpwd, dbname, dbcharset='utf-8',
                 dbport=3306):
        """Store the connection settings and open the connection.

        Args:
            dbhost: Server host name.
            dbuser: Login user.
            dbpwd: Login password.
            dbname: Database (schema) to use.
            dbcharset: Stored on the instance only; session() does not pass
                it to the driver.
            dbport: Server port as an int or numeric string.  A falsy value
                (None, '', 0) falls back to 3306.
        """
        self.dbhost = dbhost
        # Tolerate None/'' so a settings mapping without a port still works.
        self.dbport = int(dbport or 3306)
        self.dbuser = dbuser
        self.dbpwd = dbpwd
        self.dbname = dbname
        self.dbcharset = dbcharset
        self.connection = self.session()

    def session(self):
        """Open and return a new connection that uses DictCursor."""
        # NOTE(review): self.dbcharset is deliberately not forwarded, which
        # keeps the existing behaviour.  The constructor default 'utf-8' is
        # not a MySQL charset name, so forwarding it as-is would fail.
        return connect(
            host=self.dbhost,
            port=self.dbport,
            user=self.dbuser,
            password=self.dbpwd,
            db=self.dbname,
            cursorclass=cursors.DictCursor,
        )

    def _build_where(self, condition):
        """Translate a ``condition`` argument into (where_sql, values).

        A falsy condition matches every row (``'1'``), a dict becomes
        ``col = %s`` terms joined by AND with its values bound, and anything
        else is used verbatim as the WHERE clause.
        """
        if not condition:
            return '1', []
        if isinstance(condition, dict):
            where = self.join_field_value(condition, ' AND ')
            return where, list(condition.values())
        return condition, []

    @staticmethod
    def _execute(cursor, sql, prepared):
        """Run ``sql`` on ``cursor``, binding ``prepared`` only if non-empty."""
        # PyMySQL applies %-formatting only when arguments are supplied, so a
        # raw string condition may contain a literal '%' (e.g. LIKE 'a%')
        # as long as no values are bound.
        if prepared:
            return cursor.execute(sql, tuple(prepared))
        return cursor.execute(sql)

    def insert(self, table, data):
        """Insert one row with INSERT IGNORE.

        Args:
            table: Table name (formatted into the SQL verbatim).
            data: Mapping of column name to value; values are bound.

        Returns:
            ``cursor.lastrowid`` on success, or None when the statement
            failed.  Failures are printed and not re-raised (best effort).
        """
        params = self.join_field_value(data)
        sql = "INSERT IGNORE INTO {table} SET {params}".format(
            table=table, params=params)
        # Pre-set the result so a failed statement returns None instead of
        # raising UnboundLocalError at the return below.
        last_id = None
        with self.connection.cursor() as cursor:
            try:
                cursor.execute(sql, tuple(data.values()))
                last_id = cursor.lastrowid
                self.connection.commit()
            except Exception as exc:  # best-effort API: report, don't raise
                last_id = None
                print(exc)
        return last_id

    def bulk_insert(self, table, data):
        """Insert several rows with one parameterized INSERT IGNORE.

        Args:
            table: Table name (formatted into the SQL verbatim).
            data: Non-empty list of mappings that all have the same keys.

        Returns:
            ``cursor.lastrowid`` as reported after the insert.

        Raises:
            AssertionError: ``data`` is not a non-empty list.
            ValueError: A row's keys differ from the first row's keys.
        """
        # Raised explicitly (not via ``assert``) so the check survives -O
        # while callers still see the same exception type and message.
        if not isinstance(data, list) or not data:
            raise AssertionError("data_format_error")

        columns = list(data[0].keys())
        rows = []
        for row in data:
            if row.keys() != data[0].keys():
                raise ValueError("all rows must have the same columns")
            # Read values by column name so key order cannot misalign them.
            rows.append(tuple(row[column] for column in columns))

        fields = ', '.join('`{}`'.format(column) for column in columns)
        placeholders = ', '.join(['%s'] * len(columns))
        sql = u"INSERT IGNORE INTO {table} ({fields}) VALUES ({values})".format(
            fields=fields, table=table, values=placeholders)
        with self.connection.cursor() as cursor:
            cursor.executemany(sql, rows)
            last_id = cursor.lastrowid
            self.connection.commit()
        return last_id

    def delete(self, table, condition=None, limit=None):
        """Delete rows.

        Args:
            table: Table name (formatted into the SQL verbatim).
            condition: None/empty for all rows, a dict of column -> value
                (bound, joined by AND), or a raw WHERE string.
            limit: Optional value formatted after ``LIMIT``.

        Returns:
            The result of ``cursor.execute``.
        """
        where, prepared = self._build_where(condition)
        limits = "LIMIT {limit}".format(limit=limit) if limit else ""
        sql = "DELETE FROM {table} WHERE {where} {limits}".format(
            table=table, where=where, limits=limits)
        with self.connection.cursor() as cursor:
            result = self._execute(cursor, sql, prepared)
            self.connection.commit()
        return result

    def update(self, table, data, condition=None):
        """Update rows with UPDATE IGNORE.

        Args:
            table: Table name (formatted into the SQL verbatim).
            data: Mapping of column name to new value; values are bound.
            condition: None/empty for all rows, a dict of column -> value
                (bound, joined by AND), or a raw WHERE string.

        Returns:
            The result of ``cursor.execute``.
        """
        params = self.join_field_value(data)
        where, where_values = self._build_where(condition)
        # SET values come first, matching placeholder order in the SQL.
        prepared = list(data.values()) + where_values
        sql = "UPDATE IGNORE {table} SET {params} WHERE {where}".format(
            table=table, params=params, where=where)
        with self.connection.cursor() as cursor:
            result = self._execute(cursor, sql, prepared)
            self.connection.commit()
        return result

    def count(self, table, condition=None):
        """Return the number of rows matching ``condition``.

        Args:
            table: Table name (formatted into the SQL verbatim).
            condition: None/empty for all rows, a dict of column -> value
                (bound, joined by AND), or a raw WHERE string.

        Returns:
            The ``cnt`` value of the fetched row.
        """
        where, prepared = self._build_where(condition)
        sql = "SELECT COUNT(*) as cnt FROM {table} WHERE {where}".format(
            table=table, where=where)
        with self.connection.cursor() as cursor:
            self._execute(cursor, sql, prepared)
            # NOTE(review): commit after a SELECT is kept from the original;
            # presumably it ends the read transaction so that later reads
            # see fresh data -- confirm before removing.
            self.connection.commit()
            return cursor.fetchone().get('cnt')

    def fetch_rows(self, table, fields=None, condition=None, order=None,
                   limit=None, fetchone=False):
        """Select rows from ``table``.

        Args:
            table: Table name (formatted into the SQL verbatim).
            fields: None/empty for ``*``, a list/tuple of column names (each
                back-quoted), or a raw column-list string.
            condition: None/empty for all rows, a dict of column -> value
                (bound, joined by AND), or a raw WHERE string.
            order: Optional raw expression formatted after ``ORDER BY``.
            limit: Optional value formatted after ``LIMIT``.
            fetchone: Return only ``cursor.fetchone()`` when true.

        Returns:
            ``cursor.fetchone()`` if ``fetchone`` else ``cursor.fetchall()``.
        """
        if not fields:
            columns = '*'
        elif isinstance(fields, (tuple, list)):
            columns = '`{0}`'.format('`, `'.join(fields))
        else:
            columns = fields

        where, prepared = self._build_where(condition)
        orderby = 'ORDER BY {order}'.format(order=order) if order else ''
        limits = "LIMIT {limit}".format(limit=limit) if limit else ""
        sql = "SELECT {fields} FROM {table} WHERE {where} {orderby} {limits}".format(
            fields=columns, table=table, where=where, orderby=orderby,
            limits=limits)
        with self.connection.cursor() as cursor:
            self._execute(cursor, sql, prepared)
            # NOTE(review): commit after a SELECT is kept from the original;
            # see the matching note in count().
            self.connection.commit()
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def query(self, sql, fetchone=False, execute=False, args=None):
        """Execute a custom SQL statement and commit.

        Args:
            sql: The statement to run.
            fetchone: Return only ``cursor.fetchone()`` when true.
            execute: When true, return None without fetching any rows.
            args: Optional values to bind to ``%s`` placeholders in ``sql``;
                prefer this over formatting values into the string.

        Returns:
            None if ``execute`` is true, otherwise ``cursor.fetchone()`` or
            ``cursor.fetchall()`` depending on ``fetchone``.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql, args)
            self.connection.commit()
            if execute:
                return None
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def join_field_value(self, data, glue=', '):
        """Return ``data``'s keys as ```key` = %s`` terms joined by ``glue``."""
        return glue.join("`{}` = %s".format(key) for key in data)

    def close(self):
        """Close the connection if there is one and it is still open."""
        connection = getattr(self, 'connection', None)
        if connection is None:
            # __init__ failed before the attribute was assigned.
            return None
        # PyMySQL raises when close() is called on an already-closed
        # connection, which previously happened whenever __del__ ran after
        # an explicit close().  Objects without an ``open`` flag are closed
        # unconditionally, as before.
        if not getattr(connection, 'open', True):
            return None
        return connection.close()

    def __del__(self):
        """Close the database connection when the wrapper is collected."""
        self.close()