SQLAlchemy 简介

SQLAlchemy 数据库炼金师-核心模式 - 图1

SQLAlchemy 是目前最流行的 Python 语言的SQL工具箱,它为开发人员提供了 SQL 的全部功能同时保留了 Python 的灵活性。

SQLAlchemy 以对象关系映射(ORM)而闻名,使用它可以将 Python 的类映射到数据库,从而允许对象模型和数据库模式从一开始就以一种完全分离的方式分别进行架构。 对象模型可以无需考虑使用什么数据库来存储, 这样给项目开发带来了极大的便利.

开始之前, 先收下几个有用的网址

什么是ORM?

ORM(对象关系映射)是一种编程技术,用于在面向对象的编程语言中将对象与数据库表进行映射, 从而简化开发过程。

在ORM系统中,每个类都映射到数据库中的表。 使用了 ORM 技术的项目不必在编写繁琐的数据库接口代码, 而是由 ORM 技术自动来解决增删改查这些与数据库相关的操作,因而开发者可以更专注于实现项目的核心功能。

SQLAlchemy 数据库炼金师-核心模式 - 图2

SQLAlchemy 的两种使用模式

SQLAlchemy 由两个主要的模块组成, 分别是 核心模式 与 ORM 模式

核心模式: 是功能齐全的 SQL 语言的抽象工具包,对于熟悉 SQL 语言的开发者来说, 使用 SQLAlchemy 的核心模式可以快速的上手, 而又无需考虑数据库之间的 SQL 方言差异

ORM 模式: 每个类都映射到数据库中的表, 开发者无需在编写繁琐的数据库接口代码, 可以更专注于实现项目的核心功能

本课程主要讲解 SQLAlchemy 的核心模式开发, ORM 模式开发参考本课程的姊妹篇

第一关 核心模式快速案例

接下来我们用一个简短的案例用 SQLAlchemy 实现对数据库的增删改查操作

  1. # 连接数据库
  2. from sqlalchemy import create_engine
  3. # 连接到 SQLite 数据库
  4. engine = create_engine('sqlite:///sqlalchemy.db')
  5. # 通常加上 echo = True 用于调试, 这样 SQLAlchemy 会输出自动生成的 SQL 语句
  6. engine = create_engine('sqlite:///sqlalchemy.db', echo = True )
  7. # 这个时候还没有连接数据, 还需要对 engine 对象调用 connect 方法
  8. cn = engine.connect()
  9. # 与 DB-API 规范一样, 可以直接使用 execute 方法执行 SQL 语句
  10. # 首先建立一个数据表
  11. artist_create_table = """
  12. create table artist(
  13. id integer primary key autoincrement not null,
  14. name varchar(255)
  15. )
  16. """
  17. cn.execute(artist_create_table)
  18. # 增
  19. cn.execute("insert into artist(id, name) values(1, '测试')")
  20. # 改
  21. cn.execute("update artist set name='姓名' ")
  22. # 查
  23. print(cn.execute("select * from artist").fetchall())
  24. # 输出: [(1, '姓名')]
  25. # 删
  26. cn.execute("delete from artist where id=1")
  27. # 关闭数据库连接
  28. cn.close()

同学你可能会问了, 这个和直接用 DB-API 模块有什么区别, 除了连接数据库有点不同之外, 还是要依赖 SQL语句 做所有的工作 ! 同学你说对了, 这里的快速案例展示的是 SQLAlchemy 可以兼容 DB-API 模块的方法, 因此如果你已经使用 DB-API 模块进行了一些开发, 可以无缝快速的迁移到 SQLAlchemy . 接下来, 让我们探索一下 SQLAlchemy 核心模块中的高级特性, 继续进行对数据库的增删改查操作

第二关 连接数据库

首先我们学习使用 SQLAlchemy 连接数据库

SQLAlchemy 用 create_engine 方法新建一个 Engine 对象, 通过 Engine 对象可以连接数据库, 执行 SQL 语句, 以及其他 DB-API 兼容的方法 , 如图

数据库连接语法: dialect+driver://username:password@host:port/database , 下面通过几个案例讲解如何连接各种类型的数据库

  1. # 连接 SQLite
  2. engine = create_engine('sqlite:///sqlalchemy.db')
  3. # Windows 指定路径
  4. engine = create_engine('sqlite:///C:\\path\\to\\sqlalchemy.db')
  5. # Linux/Unix/Mac 指定路径
  6. engine = create_engine('sqlite:////path/to/sqlalchemy.db')
  7. # MySQL : 缺省驱动 mysql-python
  8. engine = create_engine('mysql://scott:tiger@localhost/foo')
  9. # MySQL : 使用 pymysql 连接
  10. engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo')
  11. # PostgreSQL : 缺省驱动 psycopg2
  12. engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')

重要参数:

  • echo = True : 用于调试, 这样 SQLAlchemy 会输出自动生成的 SQL 语句和其他一些调试信息
  • encoding = ‘utf-8’ : 指定字符串类型的编码, 缺省值就是 utf-8
  • pool_size = 10 : 指定数据库连接池内保持活跃连接的数量

创建了 Engine 对象后, 还没有实际连接数据库, 还需要调用 connect 方法连接数据库

  1. # 连接数据库, 返回连接赋值给 cn
  2. cn = engine.connect()

Engine 对象的其他常用方法

  1. # execute 执行 SQL 语句
  2. engine.execute("select * from artist")
  3. # table_names 获取数据库中的数据表
  4. engine.table_names()
  5. # begin : 事务处理
  6. with engine.begin() as tx:
  7. tx.excute("insert into artist(name) values('事务测试') ")

Engine 与数据库连接池: 由于连接数据库是一个较为耗时的操作, 因此 Engine 内部实现了数据库连接池, 当我们创建 Engine 对象的时候, 使用 pool_size 告诉 Engine 对象数据库连接池需要保持多少个活跃连接. 当我们需要连接数据库时, 从连接池中取用一个现成的连接使用即可, SQLAlchemy 将这一切复杂的逻辑为我们封装在 Engine 对象中

  1. # SQLite 不支持连接池, 这里使用 MySQL 数据库进行演示
  2. engine = create_engine('mysql+pymysql://pandas:pandas@localhost/pandas', pool_size= 5, echo=True)

第三关 建表

为了适应不同数据库建表技术的不同之处 , SQLAlchemy 实现了自己的建表方式: MetaData 元数据类, SQLAlchemy 将不同数据库建表的不同之处封装在 MetaData 中 , 首先需要新建一个 MetaData 对象

  1. # 导入 MetaData 类
  2. from sqlalchemy import MetaData
  3. # 新建 MetaData 类
  4. metadata = MetaData()

建表: 用 Table 对象新建数据库表, 用 Column 对象映射数据库表的每个字段

  1. # 导入需要用的类
  2. from sqlalchemy import Table, Column, Integer, Numeric, String, DateTime
  3. from datetime import datetime
  4. # 创建 Table 对象, 传入 数据表名: teacher, 传入 metadata 对象
  5. table_teacher = Table('teacher', metadata,
  6. # id 列, 指定为整数类型, 主键
  7. Column('id', Integer(), primary_key=True, autoincrement=True),
  8. # email 列 , 字符串类型, 长度 255, 不能为空, 建立 unique 索引
  9. Column('email', String(255), nullable=False, unique=True),
  10. # name 列 , 字符串类型, 长度 50, 建立索引
  11. Column('name', String(50), index=True),
  12. # weibo_url 列, 字符串类型, 长度 255
  13. Column('weibo_url', String(255)),
  14. # phone 列, 字符串类型, 不能为空
  15. Column('phone', String(55), nullable=False),
  16. # gender 列, 整数, 缺省值为1
  17. Column('gender', Integer(), default=1),
  18. # salary 列, 数值类型, 小数点2位
  19. Column('salary', Numeric(12, 2)),
  20. # created_on 日期时间类型, 缺省值为增加数据的时间
  21. Column('created_on', DateTime(), default=datetime.now),
  22. # updated_on 日期时间类型, 缺省值为增加数据的时间, 设置更新数据时自动更新此字段数据1
  23. Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now),
  24. # 设置此参数支持 SQLite 设置主键自动增加
  25. sqlite_autoincrement=True
  26. )
  27. # 连接数据库
  28. engine = create_engine('sqlite:///sqlalchemy.db', echo = True)
  29. # 建表
  30. metadata.create_all(engine)
  1. -- 由于加上了 echo = True 参数, 会输出大量调试信息, 其中会有以下 SQL 语句
  2. CREATE TABLE teacher (
  3. id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  4. email VARCHAR(255) NOT NULL,
  5. name VARCHAR(50),
  6. webo_url VARCHAR(255),
  7. phone VARCHAR(55) NOT NULL,
  8. gender INTEGER,
  9. salary NUMERIC(12, 2),
  10. created_on DATETIME,
  11. updated_on DATETIME,
  12. UNIQUE (email)
  13. )
  14. -- 下面还有一句建立索引的 SQL 语句
  15. CREATE INDEX ix_teacher_name ON teacher (name)
  1. # 连接数据库, 这次用 MySQL 数据库, 其他代码不变
  2. engine = create_engine('mysql+pymysql://pandas:pandas@localhost/pandas', echo = True)
  3. # 建表
  4. metadata.create_all(engine)
  1. -- 由于加上了 echo = True 参数, 会输出大量调试信息, 其中会有以下 SQL 语句
  2. -- SQLAlchemy 自动生成了 MySQL 的建表语句, 我们无需修改任何代码
  3. CREATE TABLE teacher (
  4. id INTEGER NOT NULL AUTO_INCREMENT,
  5. email VARCHAR(255) NOT NULL,
  6. name VARCHAR(50),
  7. webo_url VARCHAR(255),
  8. phone VARCHAR(55) NOT NULL,
  9. gender INTEGER,
  10. salary NUMERIC(12, 2),
  11. created_on DATETIME,
  12. updated_on DATETIME,
  13. PRIMARY KEY (id),
  14. UNIQUE (email)
  15. )
  16. CREATE INDEX ix_teacher_name ON teacher (name)

建立了数据表以后, Table 对象还有大用处, 可以进行增删改查, 所以如果使用 SQLAlchemy 的核心模式开发项目, 首先就要建立 Table 对象. 假如你已经使用其他方式建立了数据表, 没问题 SQLAlchemy 的 Table 对象也可以从现有的数据表加载

  1. # 假如以上代码已经执行过了, 那么数据库中应该建立好数据表 teacher , 现在让我们用代码把 teacher 加载成 Table 对象
  2. # 连接数据库
  3. engine = create_engine('sqlite:///sqlalchemy.db', echo = True)
  4. # 新建 MetaData , 这一次 传入 Engine 对象
  5. metadata= MetaData(engine)
  6. # 从数据库中加载 teacher 表, 需要两个参数: metadata, autoload
  7. table_teacher_1 = Table('teacher', metadata, autoload=True)

下面列出了 SQLAlchemy 提供的常用数据类型, 供参考使用

  • BigInteger
  • Boolean
  • Date/Time/DateTime
  • Enum
  • Float
  • Integer/SmallInteger
  • LargeBinary
  • Numeric
  • String/Unicode
  • Text/UnicodeText

SQLAlchemy 还有一些数据类型是只有部分数据库才支持的, 要了解这些数据类型请参考官方文档 https://docs.sqlalchemy.org/en/13/core/type_basics.html

ForeignKey 外键: 在关系型数据库设计中, 外键是一个重要的特性, 即数据表中的其中一个字段的数据来自于另一个数据表的字段, 通常来自于另一个数据表的主键。 上面我们设计了一个稍微复杂一些的 teacher 表, 现在设计一个 course 表, 其中的一个字段 teacher_id 来自于 teacher 表的主键

  1. from sqlalchemy import Table, Column, Integer, String, DateTime, ForeignKey
  2. from datetime import datetime
  3. # 创建 Table 对象, 传入 数据表名: course, 传入 metadata 对象
  4. table_course = Table('course', metadata,
  5. # id 列, 指定为整数类型, 主键
  6. Column('id', Integer(), primary_key=True, autoincrement=True),
  7. # name 列 , 字符串类型, 长度 255, 不能为空, 建立 unique 索引
  8. Column('name', String(255), nullable=False, unique=True),
  9. # teacher_id , 整数类型, 外键
  10. Column('teacher_id', Integer(), ForeignKey('teacher.id')),
  11. # created_on 日期时间类型, 缺省值为增加数据的时间
  12. Column('created_on', DateTime(), default=datetime.now),
  13. # 设置此参数支持 SQLite 设置主键自动增加
  14. sqlite_autoincrement=True
  15. )
  16. # 连接 SQLite 数据库
  17. engine = create_engine('sqlite:///sqlalchemy.db', echo = True)
  18. # 建表, 使用 Table 对象的 create 方法建表
  19. table_course.create(engine)

建表输出:

  1. CREATE TABLE course (
  2. id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  3. name VARCHAR(255) NOT NULL,
  4. teacher_id INTEGER,
  5. created_on DATETIME,
  6. UNIQUE (name),
  7. FOREIGN KEY(teacher_id) REFERENCES teacher (id)
  8. )

第四关 增删改

创建了数据表以后, 接下来对数据表进行增删改操作. 当然我们可以使用 engine.execute 方法, 直接使用 SQL 语句进行增删改, 也可以使用 SQLAlchemy 为我们提供的方法, 不写 SQL 语句进行增删改操作

在前面建表关卡, 我们建立了 teacher 和 course 数据表, 并分别使用 metadata.create_all 方法 和 table_course.create 方法创建了数据表, 下面我们使用 Table 对象内置的 insert 方法增加数据

  1. # 接以上建表代码, 在这里连接一下 SQLite 数据库
  2. engine = create_engine('sqlite:///sqlalchemy.db', echo = True)
  3. cn = engine.connect()
  4. # 先增加 teacher 数据, 因为 course 需要关联 teacher.id
  5. # 增加一行数据
  6. cn.execute( table_teacher.insert().values(email='a@a.com', name='姓名', phone='123456'))
  7. # 由于使用了 echo = True 参数, 会自动输出生成的 SQL 语句
  8. # INSERT INTO teacher (email, name, phone, gender, created_on, updated_on) VALUES (?, ?, ?, ?, ?, ?)
  9. # ('a1@a.com', '姓名', '123456', 1, '2020-05-22 11:31:59.996943', '2020-05-22 11:31:59.996955')
  10. # 增加多行数据, 首先传入 table_teacher.insert(), 会自动生成 SQL 语句, 然后传入一个列表对象, 其元素为字典对象
  11. cn.execute(table_teacher.insert(), [
  12. {'email':'a1@a.com', 'name':'姓名1', 'phone':'123456'},
  13. {'email':'a2@a.com', 'name':'姓名2', 'phone':'123456'},
  14. {'email':'a3@a.com', 'name':'姓名3', 'phone':'123456'},
  15. ] )
  16. # 直接输出 table_teacher.insert()
  17. print(table_teacher.insert())
  18. # 输出
  19. "INSERT INTO teacher (id, email, name, webo_url, phone, gender, salary, created_on, updated_on) VALUES (:id, :email, :name, :webo_url, :phone, :gender, :salary, :created_on, :updated_on)"
  20. # 现在 teacher 表中有了数据后, 可以增加一些 course 数据了
  21. cn.execute( table_course.insert().values(name='数学', teacher_id = 1) )
  22. # 在调试信息中可以看到
  23. # INSERT INTO course (name, teacher_id, created_on) VALUES (?, ?, ?)
  24. # ('数学', 1, '2020-05-22 11:44:54.889823')
  25. # 虽然只提供了两个参数, 然而 SQLAlchemy 还是把 created_on 字段也自动生成了, 并自动加上了当前的时间

获取增加数据后自动生成的主键值

  1. # execute 方法返回的结果是 SQLAlchemy 的对象: ResultProxy , 从属性 inserted_primary_key 可以获得自动生成的主键值
  2. cn.execute( table_course.insert().values(name='语文', teacher_id = 1) ).inserted_primary_key
  3. # 输出一个列表数据: [2]
  4. # 也可以使用 DB-API 规范中的 lastrowid 获取自动生成的主键值
  5. cn.execute( table_course.insert().values(name='英语', teacher_id = 1) ).lastrowid
  6. # 输出: 3

改变数据: 使用 Table 对象的 update 方法可以修改数据, 与增加数据不同, update 方法首先用 where 方法提供一个筛选条件, 然后在提供需要修改的数据. 在查询数据关卡会详细讲解筛选条件, 在这里我们先简单使用一下

  1. # where 筛选条件: table_teacher.c.id 是 Table 对象的 id 字段 , "==" 在这里使用两个= , 因为需要的是一个判断表达式, 而不是参数赋值
  2. # values 与 insert 使用方法一致
  3. cn.execute( table_teacher.update().where( table_teacher.c.id == 1 ).values(email='a@a.com', name='姓名', phone='123456') )

删除数据: 使用 Table 对象的 delete 方法可以删除数据, 与 update 方法一样用 where 方法提供筛选条件. 注意: 与 update 方法不同的是如果没有提供筛选条件, 会删除表中所有数据 !

  1. # 删除表中全部数据! 还需要使用 execute 才会执行删除, 这里其实是生成 SQL 语句, 没有真的执行删除, 不用紧张
  2. table_teacher.delete()
  3. # 删除指定的数据
  4. cn.execute( table_teacher.delete().where(table_teacher.c.id == 2))

第五关 查询数据库

介绍了增删改之后, 接下来我们学习使用 SQLAlchemy 查询数据, 对应的大家可能已经猜到了, select 方法可以查询数据, 加上 where 方法进行筛选

  1. # 选择全部数据, 相当于 select * from teacher
  2. cn.execute( table_teacher.select() )
  3. # 使用 for 循环遍历返回结果
  4. for row in cn.execute( table_teacher.select() ):
  5. # 可以使用字段名获取数据
  6. print( row['id'])
  7. # 以下代码省略了 print , 方便大家拷贝粘贴
  8. # 使用 fetchone/fetchmany/fetchall 方法获取数据
  9. # fetchone 获取一行数据
  10. cn.execute(table_teacher.select()).fetchone()
  11. # fetchmany 获取多行数据, 返回的是列表
  12. cn.execute(table_teacher.select()).fetchmany(3)
  13. # fetchall 获取全部数据, 返回的是列表
  14. cn.execute(table_teacher.select()).fetchall()

where 筛选条件: 简单的判断数值/字符串类型

  1. # 以下代码省略了 print 方便拷贝粘贴
  2. # 数值类型: 支持 SQL 语言的算术操作符 例如: == > >= < <= !=
  3. engine.execute(table_teacher.select().where(table_teacher.c.id >= 2)).fetchone()
  4. # 字符串类型: 支持 SQL 语言的比较操作符 例如: == !=
  5. engine.execute(table_teacher.select().where(table_teacher.c.name == '姓名1')).fetchone()
  6. # 字符串类型: 使用 like 匹配字符串
  7. engine.execute(table_teacher.select().where(table_teacher.c.name.like( '%姓名%'))).fetchone()
  8. # 使用 in 匹配多个值
  9. engine.execute(table_teacher.select().where(table_teacher.c.id.in_([1,2,3]) )).fetchone()
  10. # 使用 in 加上 取反操作符 ~ 进行 not in 匹配操作
  11. engine.execute(table_teacher.select().where( ~table_teacher.c.id.in_([1,2,3]) )).fetchone()
  12. # 使用 is 方法进行空值匹配 : 相当于 SQL 语言中的 is null
  13. engine.execute(table_teacher.select().where( table_teacher.c.id.is_(None) )).fetchone()
  14. # 使用 isnot 进行非空值匹配: 相当于 is not null
  15. engine.execute(table_teacher.select().where( table_teacher.c.id.isnot(None) )).fetchone()

where 筛选条件: 多个筛选条件一起使用, 可以使用 and 与 or 来组合多个筛选条件. 因为 and/or 是 Python 的关键字, 因此 SQLAlchemy 中的方法名称是 and / or

  1. # 在 where 方法中直接传入多个判断条件, 默认使用 AND 组合
  2. # 相当于 id >=2 and name like '%姓名%'
  3. engine.execute(table_teacher.select().where(table_teacher.c.id >= 2, table_teacher.c.name.like( '%姓名%'))).fetchone()
  4. # 使用 and / or 组合判断条件
  5. from sqlalchemy import and_, or_
  6. and_( table_teacher.c.id >= 2, table_teacher.c.name.like( '%姓名%') )
  7. # 使用 or 来拼接判断条件
  8. or_( table_teacher.c.id >= 2, table_teacher.c.name.like( '%姓名%') )
  9. # 更加复杂的组合, 组合使用 and / or 来拼接判断条件
  10. # 相当于 teacher.id >= 2 AND (teacher.name LIKE '%姓名%' OR teacher.phone = '123456')
  11. and_( table_teacher.c.id >= 2, or_(table_teacher.c.name.like( '%姓名%'), table_teacher.c.phone =='123456' ) )

order by : 对结果进行排序使用 order_by 方法

  1. # 导入 asc /desc 方法用于排序, 分别对应 SQL 语言的 正序/倒序
  2. from sqlalchemy import asc, desc
  3. # 使用 select 选择数据后, 使用 order_by 方法进行排序, 缺省是正序
  4. table_teacher.select().order_by( table_teacher.c.id)
  5. # 使用 select 选择数据后, 使用 order_by 方法进行排序, 倒序用 desc 方法
  6. table_teacher.select().order_by( desc(table_teacher.c.id) )
  7. # 对于多列排序, 传入多个 asc/desc 调用即可
  8. table_teacher.select().order_by( table_teacher.c.id, desc(table_teacher.c.name))
  9. # 以上代码会生成 SQL 语句: SELECT teacher.id, teacher.email, teacher.name, teacher.webo_url, teacher.phone, teacher.gender, teacher.salary, teacher.created_on, teacher.updated_on
  10. # FROM teacher ORDER BY teacher.id, teacher.name DESC

多表查询: join 方法支持关联数据表查询, 上面代码中我们建立了两个 Table: table_teacher, table_course , 其中 table_course.c.teacher_id 与 table_teacher.c.id 有关联, 因此我们可以使用 join 方法进行关联查询

  1. # 使用 join 方法进行关联表查询
  2. table_course.select().join(table_teacher)
  3. # 以上代码会生成 SQL 语句 : (SELECT course.id AS id, course.name AS name, course.teacher_id AS teacher_id, course.created_on AS created_on
  4. # FROM course) JOIN teacher ON teacher.id = teacher_id
  5. # table_teacher 也可以关联 table_course
  6. table_teacher.select().join(table_course)
  7. # 以上代码会生成 SQL 语句: (SELECT teacher.id AS id, teacher.email AS email, teacher.name AS name, teacher.webo_url AS webo_url, teacher.phone AS phone, teacher.gender AS gender, teacher.salary AS salary, teacher.created_on AS created_on, teacher.updated_on AS updated_on
  8. # FROM teacher) JOIN course ON id = course.teacher_id

多表查询: select 方法支持多表查询, 上面代码中我们建立了两个 Table: table_teacher, table_course . 下面我们使用 select 方法查询两个表的数据

  1. # 导入 select 方法
  2. from sqlalchemy import select
  3. # 生成的 SQL 语句会包括两个表格中的全部字段, 但是我们还需要 join 两个表格 , 这里使用 where 方法加上判断条件
  4. select([table_teacher, table_course]).where(table_teacher.c.id == table_course.c.teacher_id)

第六关 事务处理

SQLAlchemy 对于事务处理其实和 DB-API 一样, 因为事务处理的套路是一样的, 就是利用 with 语法

  1. # 连接数据库
  2. engine = create_engine('sqlite:///sqlalchemy.db', echo = True)
  3. # 使用 with 启动一个事务
  4. with engine.begin() as cn:
  5. cn.execute("select * from teacher")
  6. # 在这里使用 Table 对象增删改查
  7. cn.execute( table_teacher.select() )

课程练习

设计一个项目 project 的数据库,要求包含以下字段:

  • id 整数类型主键自增1
  • name 字符串类型

本项目使用了一个 流行歌手的数据集 artist.csv , 其中数据非常简单只有两列: id, name , 刚好与我们设计的 project 数据库一样. 接下来使用 Dataset 数据库实现以下操作:

Step1:将 artist.csv 导入项目数据库,并删除掉前 3 个数据;

Step2:搜索数据库,打印出数据库 id = 10 的数据;

Step3:新增一行数据,设置 name = ‘测试项目’ ,不设置 id 字段的数据,并获取新增数据的 id;

Step4:将数据导出为 project_data.csv 文件中。

好了,利用我们本课程已经学习的知识,动手完成这个小练习吧!

参考答案:

  1. # 导入 SQLAlchemy
  2. from sqlalchemy import create_engine, MetaData, Table, Integer, String
  3. # 连接数据库
  4. engine = create_engine('sqlite:///project.db', echo = True)
  5. # 使用 MetaData/Table 建表
  6. metadata = MetaData()
  7. table_project = Table('project', metadata,
  8. # id 列, 指定为整数类型, 主键
  9. Column('id', Integer(), primary_key=True, autoincrement=True),
  10. # name 列 , 字符串类型, 长度 50, 建立索引
  11. Column('name', String(255) ),
  12. # 设置此参数支持 SQLite 设置主键自动增加
  13. sqlite_autoincrement=True
  14. )
  15. table_project.create(engine)
  16. # 建表 SQL 语句 :
  17. # CREATE TABLE project (
  18. # id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  19. # name VARCHAR(255)
  20. # )
  21. # 从 CSV 文件读取数据到列表对象
  22. # 定义一个列表对象
  23. projects = []
  24. # 导入 csv 模块
  25. import csv
  26. # 用 with 语法, 打开文件
  27. with open('artist.csv', newline='') as csvfile:
  28. # 用 DictReader 类, 方便开发
  29. reader = csv.DictReader(csvfile)
  30. # 按行遍历csv 文件
  31. for row in reader:
  32. try:
  33. projects.append({"id": row['id'], "name": row['name'] } )
  34. except Exception as e:
  35. print(e)
  36. # 批量执行 SQL
  37. engine.execute(table_project.insert(), projects)
  38. # 自动生成 SQL 语句: INSERT INTO project (id, name) VALUES (?, ?)
  39. print(" 成功导入 CSV 数据" )
  40. # 删除掉前3个数据
  41. engine.execute(table_project.delete().where(table_project.c.id < 3 ) )
  42. # 打印 id = 10 的数据
  43. print( engine.execute(table_project.select().where(table_project.c.id==10)).fetchone() )
  44. # 新增一行数据, 设置 name = '测试项目' , 不设置 id 字段的数据, 并获取新增数据的 id
  45. print( engine.execute(table_project.insert(), name='测试项目' ).lastrowid )
  46. # 将数据导出为 project_data.csv
  47. with open("project_data.csv", 'w') as csvfile:
  48. writer = csv.writer(csvfile)
  49. header= False
  50. # 遍历数据
  51. for r in engine.execute("select * from project"):
  52. if header == False:
  53. # 写入列名称
  54. writer.writerow(r.keys() )
  55. header = True
  56. writer.writerow(r.values())
  57. print(" 成功写入 CSV 文件")

拓展知识 SQLAlchemy 与大数据

SQLAlchemy 也支持大数据技术, 通过使用支持 DB-API 规范的模块, SQLAlchemy 支持使用 Apache Hive, Presto, Apache Drill, Apache Solr, ElasticSearch, Google BigQuery, Teradata Vantage 等基于大数据技术的数据库与数据仓库

通过 Apache Drill 的模块, SQLAlchemy 可以操作 Hadoop/HBase/Hive/MongoDB 上的数据, 就像使用数据库一样使用大数据

例如, 使用 SQLAlchemy 可以直接连接 Hive 数据仓库中的数据, 使用同样的方法可以查询/筛选数据

  1. # 导入 SQLAlchemy
  2. from sqlalchemy import create_engine, Table, MetaData
  3. # 需要先安装 pyhive : pip install pyhive[hive] pyhive[presto]
  4. # 支持 Apache Presto
  5. engine = create_engine('presto://localhost:8080/hive/default')
  6. # 支持 Apache Hive
  7. engine = create_engine('hive://localhost:10000/default')
  8. # 使用 Hive 中建立好的数据表 : logs , 自动加载并绑定到 Table 对象
  9. logs = Table('logs', MetaData(bind=engine), autoload=True)
  10. # 使用 select 选择数据
  11. engine.execute(logs.select().where(logs.c.id==1)).fetch_one()

总结

SQLAlchemy 是目前最为流行的 Python 数据库模块, 在本课程中主要讲解了如何使用 SQLAlchemy 的核心模式操作数据库, 虽然相对于 ORM 模式有些繁琐, 但是提供了相当大的灵活度. 从 DB-API 模块移植到 SQLAlchemy 十分方便

SQLAlchemy 的核心模式建立在对于 SQL 语言的理解与掌控之上, 因此要学好 SQLAlchemy 的核心模式开发, 需要先掌握 SQL 语言

SQLAlchemy 还支持另一种开发模式: ORM 模式, 在使用 ORM 模式进行开发之前, 多了解核心模式和 SQL 语言可以事半功倍

最后, SQLAlchemy 支持常见的大数据技术如 Hive/Hadoop/HBase 等, 是由 Python 开发工程师进阶到大数据开发工程师的一条捷径