信息

解释
- Engine:框架的引擎
- Connection Pooling:数据库连接池
- Dialect:选择连接数据库的DB API种类
- Schema/Types:架构和类型
- SQL Exprression Language:SQL表达式语言
SQLAlchemy使用Engine/ConnectionPooling/Dialect进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
配置
创建引擎
# 数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名# e.g.# mysql+pymysql://root:1234@127.0.0.1:3306/db1# 连接engine = create_engine("URI",max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程时,最多等待的时间,超时报错,默认30秒pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置),-1代表永远不回收)
源引擎
SQLAlchemy本身无法操作数据库,其本质上是依赖pymysql、mysqldb、mssql等第三方插件,根据配置文件的不同调用不同的数据库API,利用Dialect和数据API进行交流,从而实现对数据库的操作。
以下方式返回源引擎的连接对象。
import sqlalchemyfrom sqlalchemy import create_engineengine = create_engine(...)conn = engine.raw_connection() # 返回一个pymysql连接对象cursor = conn.cursor()cursor.execute("select * from table_name")result = cursor.fetchall()cursor.close()conn.close()
ORM映射过程
import datetimefrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import ForeignKey, DateTime, UniqueConstraint, Indexfrom sqlalchemy import Column, Integer, String, DateTime, TextBase = declarative_base()class User(Base):__tablename__ = 'user'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False, default='xx')email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一索引Index('ix_id_name', 'name', 'email'), #给name和email创建普通索引,索引名为ix_id_name)
默认建表的引擎为MyISAM,可通过__table_args__设置为InnoDB。
__table_args__ = {'mysql_engine': 'InnoDB', # 指定表的引擎'mysql_charset': 'utf8' # 指定表的编码格式}
FK/M2M关系创建
FK关系示例。
class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True)hobby_name = Column(String(50))class Person(Base):__tablename__ = 'person'id = Column(Integer, primary_key=True)name = Column(String(32))hobby_id = Column(Integer, ForeignKey("hobby.id")) # FK结构hobby = relationship("Hobby", backref='person') # 反向引用
M2M关系示例。多对多需要自行创建第三张表。
class Server2Group(Base):__tablename__ = 'server2group'id = Column(Integer, primary_key=True, autoincrement=True) # 自增server_id = Column(Integer, ForeignKey('server.id'))group_id = Column(Integer, ForeignKey('group.id'))class Group(Base):__tablename__ = 'group'id = Column(Integer, primary_key=True)name = Column(String(64))servers = relationship('Server', secondary='server2group', backref='groups')class Server(Base):__tablename__ = 'server'id = Column(Integer, primary_key=True, autoincrement=True)hostname = Column(String(64))
说明
表的删除和创建。
engine = create_engine(...)Base.metadata.create_all(engine)Base.metadata.drop_all(engine)
使用原生SQL语句。
engine = create_engine(...)Session = sessionmaker(bind=engine)session = Session()# 查询cursor = session.execute('select * from user')result = cursor.fetchall()# 非只读操作cursor = session.execute('insert into users(name) values(:value)', params={"value": 'name'})session.commit()session.close()
CRUD过程
插入
engine = create_engine(...)Session = sessionmaker(bind=engine)session = Session()session.add(Users(name="name"))session.add_all([Users(name="name"), Person(name="name")]) # 可不同表session.commit()
删除
session.query(Users).filter(Users.id > 2).delete()session.commit()
修改
synchronize_session设置为False即执行字符串拼接;设置为"evaluate"即执行四则运算
session.query(Users).filter(Users.id > 0).update({"name" : "099"})session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"},synchronize_session=False)session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1},synchronize_session="evaluate")session.commit()
查询
注意filter与filter_by查询条件的写法不同。
session.query(Users).all()# 列的别名session.query(Users.name.label('xx'), Users.age).all()# 返回唯一行,多个数据则报错session.query(Users).filter(Users.name=="alex").one()session.query(Users).filter(Users.name=="alex").all()session.query(Users).filter_by(name='alex').all()session.query(Users).filter_by(name='alex').first()session.query(Users).filter_by(name='alex').order_by(Users.id).desc().all()session.query(Users).filter_by(name='alex').order_by(Users.id).asc().all()# text 自定义条件 :为变量占位符 在params中进行传参session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').all()# 从另一个表中查询session.query(Users).from_statement(text("SELECT * FROM Hosts where name=:name")).params(name='ed').all()
Relationship操作
基于上文FK/M2M关系创建部分创建的表。
基于曾创建的relationship,下列记录的添加会导致其关联表中也被添加对应的数据。
# FK relationshipclass Person:...hobby = relationship("Hobby", backref='person')class Hobby:...# 操作person = Person(name='name', hobby=Hobby(hobby_name='aname'))session.add(person)hobby = Hobby(hobby_name='aname')hobby.person = [Person(name='name1'), Person(name='name2')]session.add(hobby)
