信息

image.png
解释

  • Engine:框架的引擎
  • Connection Pooling:数据库连接池
  • Dialect:选择连接数据库的DB API种类
  • Schema/Types:架构和类型
  • SQL Exprression Language:SQL表达式语言

SQLAlchemy使用Engine/ConnectionPooling/Dialect进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

配置

创建引擎

  1. # 数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名
  2. # e.g.
  3. # mysql+pymysql://root:1234@127.0.0.1:3306/db1
  4. # 连接
  5. engine = create_engine(
  6. "URI",
  7. max_overflow=0, # 超过连接池大小外最多创建的连接
  8. pool_size=5, # 连接池大小
  9. pool_timeout=30, # 池中没有线程时,最多等待的时间,超时报错,默认30秒
  10. pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置),-1代表永远不回收
  11. )

源引擎

SQLAlchemy本身无法操作数据库,其本质上是依赖pymysql、mysqldb、mssql等第三方插件,根据配置文件的不同调用不同的数据库API,利用Dialect和数据API进行交流,从而实现对数据库的操作。
以下方式返回源引擎的连接对象。

  1. import sqlalchemy
  2. from sqlalchemy import create_engine
  3. engine = create_engine(...)
  4. conn = engine.raw_connection() # 返回一个pymysql连接对象
  5. cursor = conn.cursor()
  6. cursor.execute("select * from table_name")
  7. result = cursor.fetchall()
  8. cursor.close()
  9. conn.close()

ORM映射过程

  1. import datetime
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy import ForeignKey, DateTime, UniqueConstraint, Index
  5. from sqlalchemy import Column, Integer, String, DateTime, Text
  6. Base = declarative_base()
  7. class User(Base):
  8. __tablename__ = 'user'
  9. id = Column(Integer, primary_key=True)
  10. name = Column(String(32), index=True, nullable=False, default='xx')
  11. email = Column(String(32), unique=True)
  12. ctime = Column(DateTime, default=datetime.datetime.now)
  13. extra = Column(Text, nullable=True)
  14. __table_args__ = (
  15. UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一索引
  16. Index('ix_id_name', 'name', 'email'), #给name和email创建普通索引,索引名为ix_id_name
  17. )

默认建表的引擎为MyISAM,可通过__table_args__设置为InnoDB

  1. __table_args__ = {
  2. 'mysql_engine': 'InnoDB', # 指定表的引擎
  3. 'mysql_charset': 'utf8' # 指定表的编码格式
  4. }

FK/M2M关系创建

FK关系示例。

  1. class Hobby(Base):
  2. __tablename__ = 'hobby'
  3. id = Column(Integer, primary_key=True)
  4. hobby_name = Column(String(50))
  5. class Person(Base):
  6. __tablename__ = 'person'
  7. id = Column(Integer, primary_key=True)
  8. name = Column(String(32))
  9. hobby_id = Column(Integer, ForeignKey("hobby.id")) # FK结构
  10. hobby = relationship("Hobby", backref='person') # 反向引用

M2M关系示例。多对多需要自行创建第三张表。

  1. class Server2Group(Base):
  2. __tablename__ = 'server2group'
  3. id = Column(Integer, primary_key=True, autoincrement=True) # 自增
  4. server_id = Column(Integer, ForeignKey('server.id'))
  5. group_id = Column(Integer, ForeignKey('group.id'))
  6. class Group(Base):
  7. __tablename__ = 'group'
  8. id = Column(Integer, primary_key=True)
  9. name = Column(String(64))
  10. servers = relationship('Server', secondary='server2group', backref='groups')
  11. class Server(Base):
  12. __tablename__ = 'server'
  13. id = Column(Integer, primary_key=True, autoincrement=True)
  14. hostname = Column(String(64))

说明

表的删除和创建。

  1. engine = create_engine(...)
  2. Base.metadata.create_all(engine)
  3. Base.metadata.drop_all(engine)

使用原生SQL语句。

  1. engine = create_engine(...)
  2. Session = sessionmaker(bind=engine)
  3. session = Session()
  4. # 查询
  5. cursor = session.execute('select * from user')
  6. result = cursor.fetchall()
  7. # 非只读操作
  8. cursor = session.execute('insert into users(name) values(:value)', params={"value": 'name'})
  9. session.commit()
  10. session.close()

会话不再使用后调用session.close()

CRUD过程

插入

  1. engine = create_engine(...)
  2. Session = sessionmaker(bind=engine)
  3. session = Session()
  4. session.add(Users(name="name"))
  5. session.add_all([Users(name="name"), Person(name="name")]) # 可不同表
  6. session.commit()

删除

  1. session.query(Users).filter(Users.id > 2).delete()
  2. session.commit()

修改

synchronize_session设置为False即执行字符串拼接;设置为"evaluate"即执行四则运算

  1. session.query(Users).filter(Users.id > 0).update({"name" : "099"})
  2. session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"},
  3. synchronize_session=False)
  4. session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1},
  5. synchronize_session="evaluate")
  6. session.commit()

查询

注意filterfilter_by查询条件的写法不同。

  1. session.query(Users).all()
  2. # 列的别名
  3. session.query(Users.name.label('xx'), Users.age).all()
  4. # 返回唯一行,多个数据则报错
  5. session.query(Users).filter(Users.name=="alex").one()
  6. session.query(Users).filter(Users.name=="alex").all()
  7. session.query(Users).filter_by(name='alex').all()
  8. session.query(Users).filter_by(name='alex').first()
  9. session.query(Users).filter_by(name='alex').order_by(Users.id).desc().all()
  10. session.query(Users).filter_by(name='alex').order_by(Users.id).asc().all()
  11. # text 自定义条件 :为变量占位符 在params中进行传参
  12. session.query(Users).filter(text("id<:value and name=:name"))
  13. .params(value=224, name='fred').all()
  14. # 从另一个表中查询
  15. session.query(Users).from_statement(text("SELECT * FROM Hosts where name=:name"))
  16. .params(name='ed').all()

Relationship操作

基于上文FK/M2M关系创建部分创建的表。
基于曾创建的relationship,下列记录的添加会导致其关联表中也被添加对应的数据。

  1. # FK relationship
  2. class Person:
  3. ...
  4. hobby = relationship("Hobby", backref='person')
  5. class Hobby:
  6. ...
  7. # 操作
  8. person = Person(name='name', hobby=Hobby(hobby_name='aname'))
  9. session.add(person)
  10. hobby = Hobby(hobby_name='aname')
  11. hobby.person = [Person(name='name1'), Person(name='name2')]
  12. session.add(hobby)