信息
解释
- Engine:框架的引擎
- Connection Pooling:数据库连接池
- Dialect:选择连接数据库的DB API种类
- Schema/Types:架构和类型
- SQL Exprression Language:SQL表达式语言
SQLAlchemy使用Engine/ConnectionPooling/Dialect进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
配置
创建引擎
# 数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名
# e.g.
# mysql+pymysql://root:1234@127.0.0.1:3306/db1
# 连接
engine = create_engine(
"URI",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程时,最多等待的时间,超时报错,默认30秒
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置),-1代表永远不回收
)
源引擎
SQLAlchemy本身无法操作数据库,其本质上是依赖pymysql、mysqldb、mssql等第三方插件,根据配置文件的不同调用不同的数据库API,利用Dialect和数据API进行交流,从而实现对数据库的操作。
以下方式返回源引擎的连接对象。
import sqlalchemy
from sqlalchemy import create_engine
engine = create_engine(...)
conn = engine.raw_connection() # 返回一个pymysql连接对象
cursor = conn.cursor()
cursor.execute("select * from table_name")
result = cursor.fetchall()
cursor.close()
conn.close()
ORM映射过程
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy import Column, Integer, String, DateTime, Text
Base = declarative_base()
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False, default='xx')
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一索引
Index('ix_id_name', 'name', 'email'), #给name和email创建普通索引,索引名为ix_id_name
)
默认建表的引擎为MyISAM
,可通过__table_args__
设置为InnoDB
。
__table_args__ = {
'mysql_engine': 'InnoDB', # 指定表的引擎
'mysql_charset': 'utf8' # 指定表的编码格式
}
FK/M2M关系创建
FK关系示例。
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
hobby_name = Column(String(50))
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
name = Column(String(32))
hobby_id = Column(Integer, ForeignKey("hobby.id")) # FK结构
hobby = relationship("Hobby", backref='person') # 反向引用
M2M关系示例。多对多需要自行创建第三张表。
class Server2Group(Base):
__tablename__ = 'server2group'
id = Column(Integer, primary_key=True, autoincrement=True) # 自增
server_id = Column(Integer, ForeignKey('server.id'))
group_id = Column(Integer, ForeignKey('group.id'))
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(64))
servers = relationship('Server', secondary='server2group', backref='groups')
class Server(Base):
__tablename__ = 'server'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64))
说明
表的删除和创建。
engine = create_engine(...)
Base.metadata.create_all(engine)
Base.metadata.drop_all(engine)
使用原生SQL语句。
engine = create_engine(...)
Session = sessionmaker(bind=engine)
session = Session()
# 查询
cursor = session.execute('select * from user')
result = cursor.fetchall()
# 非只读操作
cursor = session.execute('insert into users(name) values(:value)', params={"value": 'name'})
session.commit()
session.close()
CRUD过程
插入
engine = create_engine(...)
Session = sessionmaker(bind=engine)
session = Session()
session.add(Users(name="name"))
session.add_all([Users(name="name"), Person(name="name")]) # 可不同表
session.commit()
删除
session.query(Users).filter(Users.id > 2).delete()
session.commit()
修改
synchronize_session
设置为False
即执行字符串拼接;设置为"evaluate"
即执行四则运算
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"},
synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1},
synchronize_session="evaluate")
session.commit()
查询
注意filter
与filter_by
查询条件的写法不同。
session.query(Users).all()
# 列的别名
session.query(Users.name.label('xx'), Users.age).all()
# 返回唯一行,多个数据则报错
session.query(Users).filter(Users.name=="alex").one()
session.query(Users).filter(Users.name=="alex").all()
session.query(Users).filter_by(name='alex').all()
session.query(Users).filter_by(name='alex').first()
session.query(Users).filter_by(name='alex').order_by(Users.id).desc().all()
session.query(Users).filter_by(name='alex').order_by(Users.id).asc().all()
# text 自定义条件 :为变量占位符 在params中进行传参
session.query(Users).filter(text("id<:value and name=:name"))
.params(value=224, name='fred').all()
# 从另一个表中查询
session.query(Users).from_statement(text("SELECT * FROM Hosts where name=:name"))
.params(name='ed').all()
Relationship操作
基于上文FK/M2M关系创建部分创建的表。
基于曾创建的relationship,下列记录的添加会导致其关联表中也被添加对应的数据。
# FK relationship
class Person:
...
hobby = relationship("Hobby", backref='person')
class Hobby:
...
# 操作
person = Person(name='name', hobby=Hobby(hobby_name='aname'))
session.add(person)
hobby = Hobby(hobby_name='aname')
hobby.person = [Person(name='name1'), Person(name='name2')]
session.add(hobby)