github:https://github.com/PyMySQL/PyMySQL
Python3 MySQL 数据库连接 - PyMySQL 驱动:Python3 MySQL 数据库连接 – PyMySQL 驱动 | 菜鸟教程
pymysql 是线程安全的( 搜索 thread,可以看到 threadsafe=1,同时函数 threadsafe() 返回 True ):[https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py](https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init.py)
Mysql 如果数据存在则更新,不存在则插入
:Mysql: 如果数据存在则更新,不存在则插入_飞蛾逐月 - CSDN 博客_mysql 存在更新不存在写入
1、PyMySQL 安装
在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。
PyMySQL 下载地址:GitHub - PyMySQL/PyMySQL: Pure Python MySQL Client
安装 PyMySQL 的 Python 包:pip3 install PyMySQL
2、数据库连接
连接数据库前,请先确认以下事项:
- 已经创建了数据库 TESTDB.
- 在 TESTDB 数据库中您已经创建了表 EMPLOYEE
- EMPLOYEE 表字段为 FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME。
- 连接数据库 TESTDB 使用的用户名为 “testuser” ,密码为 “test123”,你可以可以自己设定或者直接使用 root 用户名及其密码,Mysql 数据库用户授权请使用 Grant 命令。
- 已经安装了 Python MySQLdb 模块。
- 如果您对 sql 语句不熟悉,可以访问 SQL 基础教程
示 例:
链接 Mysql 的 TESTDB 数据库:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )cursor.execute("SELECT VERSION()")print ("Database version : %s " % data)
3、使用
创建数据库表
如果数据库连接存在我们可以使用execute()方法来为数据库创建表,如下所示创建表EMPLOYEE:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")sql = """CREATE TABLE EMPLOYEE ( FIRST_NAME CHAR(20) NOT NULL,
查询 数据
Python 查询 Mysql 使用 fetchone() 方法获取单条数据,使用 fetchall() 方法获取多条数据。
- fetchone(): 该方法获取下一个查询结果集。结果集是一个对象
- fetchall():接收全部的返回结果行.
- rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。
查询 EMPLOYEE 表中 salary(工资)字段大于 1000 的所有数据:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )sql = "SELECT * FROM EMPLOYEE \ WHERE INCOME > %s" % (1000) results = cursor.fetchall()print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \ (fname, lname, age, sex, income ))print ("Error: unable to fetch data")
示例:
def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'): self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset) self.cur = self.conn.cursor(cursor = pymysql.cursors.DictCursor)def __exit__(self, exc_type, exc_val, exc_tb):if __name__ == '__main__':with DB(host='192.168.68.129',user='root',passwd='zhumoran',db='text3') as db: db.execute('select * from course')
插入 数据
执行 SQL INSERT 语句向表 EMPLOYEE 插入记录:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )sql = """INSERT INTO EMPLOYEE(FIRST_NAME, LAST_NAME, AGE, SEX, INCOME) VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
以上例子也可以写成如下形式:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \ LAST_NAME, AGE, SEX, INCOME) \ VALUES ('%s', '%s', %s, '%s', %s)" % \ ('Mac', 'Mohan', 20, 'M', 2000)
以下代码使用变量向SQL语句中传递参数:
..................................con.execute('insert into Login values( %s, %s)' % \..................................
单条插入数据:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \ LAST_NAME, AGE, SEX, INCOME) \ VALUES ('%s', '%s', %s, '%s', %s)" % \ ('Mac', 'Mohan', 20, 'M', 2000)
批量插入数据:
注意:批量插入数据 与 单条插入数据的区别:
- 批量插入:VALUES (%s, %s, %s, %s, %s,) 里面 不用引号
- 单条插入:VALUES (‘%s’, ‘%s’, ‘%s’, ‘%s’, ‘%s’) 里面 需要引号
db = pymysql.connect("localhost","root","123","testdb")sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \ LAST_NAME, AGE, SEX, INCOME) \val = (('li', 'si', 16, 'F', 1000), ('Bruse', 'Jerry', 30, 'F', 3000), ('Lee', 'Tomcat', 40, 'M', 4000), ('zhang', 'san', 18, 'M', 1500)) cursor.executemany(sql,val)
更新 数据
更新操作用于更新数据表的数据,以下实例将 TESTDB 表中 SEX 为 ‘M’ 的 AGE 字段递增 1:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
批量 更新
使用 pymysql 的 course.executemany(sql, update_list) 进行批量更新
- sql:更新一条的 sql 语句模板;
- update_list:一个列表套元组的结构;
示 例:
db = pymysql.connect(user='root', password='mysql', database='test', host='127.0.0.1', port=3306, charset='utf8mb4')name_list = ["re", "gh", "ds", "D"] age_list = ["10", "20", "30", "40"] id_list = ["1", "2", "3", "4"] val_list = [[name_list[i], age_list[i], id_list[i]] for i in range(len(id_list))]with db.cursor() as cursor: sql = "UPDATE test SET name=(%s), age=(%s) WHERE id=(%s)" cursor.executemany(sql, val_list)
pymysql 批量 —- 增、删、改、查
注意:插入数字也是 %s
conn= pymysql.connect(host='rm-xxx.mysql.rds.aliyuncs.com', sql= 'DELETE FROM user_like WHERE user_id=%s and like_post_id=%s' ret= cursor.executemany(sql, ((1,2), (3,4), (5,6)))print("batch Exception:", e)
删除 数据
删除操作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:
db = pymysql.connect("localhost","testuser","test123","TESTDB" )sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
执行 事务
事务机制可以确保数据一致性。
对于支持事务的数据库, 在 Python 数据库编程中,当游标建立之时,就自动开始了一个隐形的数据库事务。commit()方法游标的所有更新操作,rollback()方法回滚当前游标的所有操作。每一个方法都开始了一个新的事务。
事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。
- 原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
- 一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
- 隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
- 持久性(durability)。持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
示例:
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
线程安全 pymysqlpool
from pymysqlpool import ConnectionPool'pool_resize_boundary': 50,'enable_auto_resize': True,logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', pool = ConnectionPool(**config)def test_pool_cursor(cursor_obj=None): cursor_obj = cursor_obj or connection_pool().cursor()with cursor_obj as cursor:print('Truncate table user') cursor.execute('TRUNCATE user')print('Insert one record') result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))print(result, cursor.lastrowid)print('Insert multiple records') users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)] result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)print('View items in table user') cursor.execute('SELECT * FROM user')print('Update the name of one user in the table') cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16') cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')print('Delete the last record') cursor.execute('DELETE FROM user WHERE id = 16')def test_pool_connection():with connection_pool().connection(autocommit=True) as conn: test_pool_cursor(conn.cursor())with connection_pool().connection() as conn: df = pd.read_sql('SELECT * FROM user', conn)with connection_pool().cursor() as cursor: cursor.execute('TRUNCATE user')def add_users(users, conn): c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)with connection_pool().connection() as conn:def add_user(user, conn=None): c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)with connection_pool().connection() as conn:with connection_pool().cursor() as cursor: cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')for x in sorted(cursor, key=lambda d: d['id']): name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize() age = random.randint(10, 40)def worker(id_, batch_size=1, explicit_conn=True):print('[{}] Worker started...'.format(id_))for _ in range(batch_size): add_user(random_user(), conn)with connection_pool().connection() as c:print('[{}] Worker finished...'.format(id_))def bulk_worker(id_, batch_size=1, explicit_conn=True):print('[{}] Bulk worker started...'.format(id_)) add_users([random_user() for _ in range(batch_size)], conn)with connection_pool().connection() as c:print('[{}] Worker finished...'.format(id_))def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False): wk = worker if not bulk_insert else bulk_workerfor i in range(batch_number): wk(i, batch_size, explicit_conn)def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False): wk = worker if not bulk_insert else bulk_workerfor i in range(batch_number): t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn)) [t.join() for t in threads]if __name__ == '__main__': start = time.perf_counter() test_with_multi_threads(20, 10, True, bulk_insert=True) test_with_single_thread(1, 10, True, bulk_insert=True) elapsed = time.perf_counter() - startprint('Elapsed time is: "{}"'.format(elapsed))
https://blog.csdn.net/freeking101/article/details/51595235 ```
