Info

这些文档即将被更新。🎉

当前版本假设Pydantic v1和SQLAlchemy版本小于2。

新的文档将包括Pydantic v2以及 SQLModel(也是基于SQLAlchemy),一旦SQLModel更新为为使用Pydantic v2。

FastAPI不需要你使用SQL(关系型)数据库。

但是您可以使用任何您想要的关系型数据库。

在这里,让我们看一个使用着SQLAlchemy的示例。

您可以很容易地将其调整为任何SQLAlchemy支持的数据库,如:

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server,等等其它数据库

在此示例中,我们将使用SQLite,因为它使用单个文件并且 在Python中具有集成支持。因此,您可以复制此示例并按原样来运行它。

稍后,对于您的产品级别的应用程序,您可能会要使用像PostgreSQL这样的数据库服务器。

这儿有一个FastAPIPostgreSQL的官方项目生成器,全部基于Docker,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-postgresql

请注意,大部分代码是 SQLAlchemy 的标准代码,您可以用于任何框架。FastAPI 特定的代码和往常一样少。

ORMs(对象关系映射)

FastAPI可与任何数据库在任何样式的库中一起与 数据库进行通信。

一种常见的模式是使用“ORM”:对象关系映射。

ORM 具有在代码和数据库表(“关系型”)中的对象之间转换(“映射*”)的工具。

使用 ORM,您通常会在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。

例如,一个类Pet可以表示一个 SQL 表pets

该类的每个实例对象都代表数据库中的一行数据。

又例如,一个对象orion_catPet的一个实例)可以有一个属性orion_cat.type, 对标数据库中的type列。并且该属性的值可以是其它,例如"cat"

这些 ORM 还具有在表或实体之间建立关系的工具(比如创建多表关系)。

这样,您还可以拥有一个属性orion_cat.owner,它包含该宠物所有者的数据,这些数据取自另外一个表。

因此,orion_cat.owner.name可能是该宠物主人的姓名(来自表owners中的列name)。

它可能有一个像"Arquilian"(一种业务逻辑)。

当您尝试从您的宠物对象访问它时,ORM 将完成所有工作以从相应的表所有者那里再获取信息。

常见的 ORM 例如:Django-ORM(Django 框架的一部分)、SQLAlchemy ORM(SQLAlchemy 的一部分,独立于框架)和 Peewee(独立于框架)等。

在这里,我们将看到如何使用SQLAlchemy ORM

以类似的方式,您也可以使用任何其他 ORM。

在文档中也有一篇使用 Peewee 的等效的文章。

文件结构

对于这些示例,假设您有一个名为的目录my_super_project,其中包含一个名为的子目录sql_app,其结构如下:

. └── sql_app ├── __init__.py ├── crud.py ├── database.py ├── main.py ├── models.py └── schemas.py

该文件__init__.py只是一个空文件,但它告诉 Python sql_app 是一个包。

现在让我们看看每个文件/模块的作用。

安装 SQLAlchemy

首先你需要安装SQLAlchemy:

fast →pip install sqlalchemy
████████████████████████████████████████ 100%
restart ↻

创建 SQLAlchemy 部件

让我们转到文件sql_app/database.py

导入 SQLAlchemy 部件

  1. from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker
  2. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
  3. # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  4. engine = create_engine(
  5. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  6. )
  7. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  8. Base = declarative_base()

为 SQLAlchemy 定义数据库 URL地址

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  5. engine = create_engine(
  6. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  7. )
  8. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  9. Base = declarative_base()

在这个例子中,我们正在“连接”到一个 SQLite 数据库(用 SQLite 数据库打开一个文件)。

该文件将位于文件中的同一目录中sql_app.db

这就是为什么最后一部分是./sql_app.db.

如果您使用的是PostgreSQL数据库,则只需取消注释该行:

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

…并根据您的数据库数据和相关凭据(也适用于 MySQL、MariaDB 或任何其他)对其进行调整。

如果您想使用不同的数据库,这是就是您必须修改的地方。

创建 SQLAlchemy 引擎

第一步,创建一个 SQLAlchemy的“引擎”。

我们稍后会将这个engine在其他地方使用。

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
  5. # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  6. engine = create_engine(
  7. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  8. Base = declarative_base()

注意

参数:

connect_args={"check_same_thread": False}

…仅用于SQLite,在其他数据库不需要它。

默认情况下,SQLite 只允许一个线程与其通信,假设有多个线程的话,也只将处理一个独立的请求。

这是为了防止意外地为不同的事物(不同的请求)共享相同的连接。

但是在 FastAPI 中,使用普通函数(def)时,多个线程可以为同一个请求与数据库交互,所以我们需要使用connect_args={"check_same_thread": False}来让SQLite允许这样。

此外,我们将确保每个请求都在依赖项中获得自己的数据库连接会话,因此不需要该默认机制。

创建一个SessionLocal

每个SessionLocal类的实例都会是一个数据库会话。当然该类本身还不是数据库会话。

但是一旦我们创建了一个SessionLocal类的实例,这个实例将是实际的数据库会话。

我们将它命名为SessionLocal是为了将它与我们从 SQLAlchemy 导入的Session区别开来。

稍后我们将使用Session(从 SQLAlchemy 导入的那个)。

要创建SessionLocal类,请使用函数sessionmaker

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
  5. # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  6. engine = create_engine(
  7. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  8. )
  9. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  10. Base = declarative_base()

创建一个Base

现在我们将使用declarative_base()返回一个类。

稍后我们将继承这个类,来创建每个数据库模型或类(ORM 模型):

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
  5. # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  6. engine = create_engine(
  7. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  8. )
  9. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  10. Base = declarative_base()

创建数据库模型

现在让我们看看文件 sql_app/models.py

Base类来创建 SQLAlchemy 模型

我们将使用我们之前创建的Base类来创建 SQLAlchemy 模型。

Tip

SQLAlchemy 使用的“模型”这个术语 来指代与数据库交互的这些类和实例。

而 Pydantic 也使用“模型”这个术语 来指代不同的东西,即数据验证、转换以及文档类和实例。

database(来自上面的database.py文件)导入Base

创建从它继承的类。

这些类就是 SQLAlchemy 模型。

  1. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
  2. from sqlalchemy.orm import relationship
  3. from .database import Base
  4. class User(Base):
  5. __tablename__ = "users"
  6. id = Column(Integer, primary_key=True)
  7. email = Column(String, unique=True, index=True)
  8. hashed_password = Column(String)
  9. is_active = Column(Boolean, default=True)
  10. items = relationship("Item", back_populates="owner")
  11. class Item(Base):
  12. __tablename__ = "items"
  13. id = Column(Integer, primary_key=True)
  14. title = Column(String, index=True)
  15. description = Column(String, index=True)
  16. owner_id = Column(Integer, ForeignKey("users.id"))
  17. owner = relationship("User", back_populates="items")

这个__tablename__属性是用来告诉 SQLAlchemy 要在数据库中为每个模型使用的数据库表的名称。

创建模型属性/列

现在创建所有模型(类)的属性。

这些属性中的每一个都代表其相应数据库表中的一列。

我们使用Column来表示 SQLAlchemy 中的默认值。

我们传递一个 SQLAlchemy “类型”,如IntegerStringBoolean,它定义了数据库中的类型,作为参数。

  1. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sqlalchemy.orm import relationship
  2. from .database import Base
  3. class User(Base):
  4. __tablename__ = "users"
  5. id = Column(Integer, primary_key=True) email = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True)
  6. items = relationship("Item", back_populates="owner")
  7. class Item(Base):
  8. __tablename__ = "items"
  9. id = Column(Integer, primary_key=True) title = Column(String, index=True) description = Column(String, index=True) owner_id = Column(Integer, ForeignKey("users.id"))
  10. owner = relationship("User", back_populates="items")

创建关系

现在创建关系。

为此,我们使用SQLAlchemy ORM提供的relationship

这将或多或少会成为一种“神奇”属性,其中表示该表与其他相关的表中的值。

  1. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
  2. from sqlalchemy.orm import relationship
  3. from .database import Base
  4. class User(Base):
  5. __tablename__ = "users"
  6. id = Column(Integer, primary_key=True)
  7. email = Column(String, unique=True, index=True)
  8. hashed_password = Column(String)
  9. is_active = Column(Boolean, default=True)
  10. items = relationship("Item", back_populates="owner")
  11. class Item(Base):
  12. __tablename__ = "items"
  13. id = Column(Integer, primary_key=True)
  14. title = Column(String, index=True)
  15. description = Column(String, index=True)
  16. owner_id = Column(Integer, ForeignKey("users.id"))
  17. owner = relationship("User", back_populates="items")

当访问 user 中的属性items时,如 中my_user.items,它将有一个ItemSQLAlchemy 模型列表(来自items表),这些模型具有指向users表中此记录的外键。

当您访问my_user.items时,SQLAlchemy 实际上会从items表中的获取一批记录并在此处填充进去。

同样,当访问 Item中的属性owner时,它将包含表中的UserSQLAlchemy 模型users。使用owner_id属性/列及其外键来了解要从users表中获取哪条记录。

创建 Pydantic 模型

现在让我们查看一下文件sql_app/schemas.py

Tip

为了避免 SQLAlchemy模型和 Pydantic模型之间的混淆,我们将有models.py(SQLAlchemy 模型的文件)和schemas.py( Pydantic 模型的文件)。

这些 Pydantic 模型或多或少地定义了一个“schema”(一个有效的数据形状)。

因此,这将帮助我们在使用两者时避免混淆。

创建初始 Pydantic模型/模式

创建一个ItemBaseUserBasePydantic模型(或者我们说“schema”),他们拥有创建或读取数据时具有的共同属性。

然后创建一个继承自他们的ItemCreateUserCreate,并添加创建时所需的其他数据(或属性)。

因此在创建时也应当有一个password属性。

但是为了安全起见,password不会出现在其他同类 Pydantic模型中,例如通过API读取一个用户数据时,它不应当包含在内。

Python 3.10+Python 3.9+Python 3.8+

  1. from pydantic import BaseModel
  2. class ItemBase(BaseModel):
  3. title: str description: str | None = None
  4. class ItemCreate(ItemBase):
  5. pass
  6. class Item(ItemBase):
  7. id: int
  8. owner_id: int
  9. class Config:
  10. orm_mode = True
  11. class UserBase(BaseModel):
  12. email: str
  13. class UserCreate(UserBase):
  14. password: str
  15. class User(UserBase):
  16. id: int
  17. is_active: bool
  18. items: list[Item] = []
  19. class Config:
  20. orm_mode = True
  1. from typing import Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str description: Union[str, None] = None
  5. class ItemCreate(ItemBase):
  6. pass
  7. class Item(ItemBase):
  8. id: int
  9. owner_id: int
  10. class Config:
  11. orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int
  18. is_active: bool
  19. items: list[Item] = []
  20. class Config:
  21. orm_mode = True
  1. from typing import List, Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str description: Union[str, None] = None
  5. class ItemCreate(ItemBase):
  6. pass
  7. class Item(ItemBase):
  8. id: int
  9. owner_id: int
  10. class Config:
  11. orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int
  18. is_active: bool
  19. items: List[Item] = []
  20. class Config:
  21. orm_mode = True

SQLAlchemy 风格和 Pydantic 风格

请注意,SQLAlchemy模型使用 =来定义属性,并将类型作为参数传递给Column,例如:

name = Column(String)

虽然 Pydantic模型使用: 声明类型,但新的类型注释语法/类型提示是:

name: str

请牢记这一点,这样您在使用:还是=时就不会感到困惑。

创建用于读取/返回的Pydantic模型/模式

现在创建当从 API 返回数据时、将在读取数据时使用的Pydantic模型(schemas)。

例如,在创建一个项目之前,我们不知道分配给它的 ID 是什么,但是在读取它时(从 API 返回时)我们已经知道它的 ID。

同样,当读取用户时,我们现在可以声明items,将包含属于该用户的项目。

不仅是这些项目的 ID,还有我们在 Pydantic模型中定义的用于读取项目的所有数据:Item.

Python 3.10+Python 3.9+Python 3.8+

  1. from pydantic import BaseModel
  2. class ItemBase(BaseModel):
  3. title: str
  4. description: str | None = None
  5. class ItemCreate(ItemBase):
  6. pass
  7. class Item(ItemBase):
  8. id: int owner_id: int
  9. class Config:
  10. orm_mode = True
  11. class UserBase(BaseModel):
  12. email: str
  13. class UserCreate(UserBase):
  14. password: str
  15. class User(UserBase):
  16. id: int is_active: bool items: list[Item] = []
  17. class Config:
  18. orm_mode = True
  1. from typing import Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int owner_id: int
  10. class Config:
  11. orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int is_active: bool items: list[Item] = []
  18. class Config:
  19. orm_mode = True
  1. from typing import List, Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int owner_id: int
  10. class Config:
  11. orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int is_active: bool items: List[Item] = []
  18. class Config:
  19. orm_mode = True

Tip

请注意,读取用户(从 API 返回)时将使用不包括passwordUser Pydantic模型

使用 Pydantic 的orm_mode

现在,在用于查询的 Pydantic模型ItemUser,添加一个内部Config类。

此类Config用于为 Pydantic 提供配置。

Config类中,设置属性orm_mode = True

Python 3.10+Python 3.9+Python 3.8+

  1. from pydantic import BaseModel
  2. class ItemBase(BaseModel):
  3. title: str
  4. description: str | None = None
  5. class ItemCreate(ItemBase):
  6. pass
  7. class Item(ItemBase):
  8. id: int
  9. owner_id: int
  10. class Config: orm_mode = True
  11. class UserBase(BaseModel):
  12. email: str
  13. class UserCreate(UserBase):
  14. password: str
  15. class User(UserBase):
  16. id: int
  17. is_active: bool
  18. items: list[Item] = []
  19. class Config: orm_mode = True
  1. from typing import Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int
  10. owner_id: int
  11. class Config: orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int
  18. is_active: bool
  19. items: list[Item] = []
  20. class Config: orm_mode = True
  1. from typing import List, Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int
  10. owner_id: int
  11. class Config: orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int
  18. is_active: bool
  19. items: List[Item] = []
  20. class Config: orm_mode = True

Tip

请注意,它使用=分配一个值,例如:

orm_mode = True

它不使用之前的:来类型声明。

这是设置配置值,而不是声明类型。

Pydanticorm_mode将告诉 Pydantic模型读取数据,即它不是一个dict,而是一个 ORM 模型(或任何其他具有属性的任意对象)。

这样,而不是仅仅试图从dictid 中获取值,如下所示:

id = data["id"]

它还会尝试从属性中获取它,如:

id = data.id

有了这个,Pydantic模型与 ORM 兼容,您只需在路径操作response_model的参数中声明它即可。

您将能够返回一个数据库模型,它将从中读取数据。

ORM 模式的技术细节

SQLAlchemy 和许多其他默认情况下是“延迟加载”。

这意味着,例如,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。

例如,访问属性items

current_user.items

将使 SQLAlchemy 转到items表并获取该用户的项目,在调用.items之前不会去查询数据库。

没有orm_mode,如果您从路径操作返回一个 SQLAlchemy 模型,它不会包含关系数据。

即使您在 Pydantic 模型中声明了这些关系,也没有用处。

但是在 ORM 模式下,由于 Pydantic 本身会尝试从属性访问它需要的数据(而不是假设为 dict),你可以声明你想要返回的特定数据,它甚至可以从 ORM 中获取它。

CRUD工具

现在让我们看看文件sql_app/crud.py

在这个文件中,我们将编写可重用的函数用来与数据库中的数据进行交互。

CRUD分别为:增加(Create)、查询(Read)、更改(Update)、删除(Delete),即增删改查。

…虽然在这个例子中我们只是新增和查询。

读取数据

sqlalchemy.orm中导入Session,这将允许您声明db参数的类型,并在您的函数中进行更好的类型检查和完成。

导入之前的models(SQLAlchemy 模型)和schemas(Pydantic模型/模式)。

创建一些工具函数来完成:

  • 通过 ID 和电子邮件查询单个用户。
  • 查询多个用户。
  • 查询多个项目。
  1. from sqlalchemy.orm import Session
  2. from . import models, schemas
  3. def get_user(db: Session, user_id: int):
  4. return db.query(models.User).filter(models.User.id == user_id).first()
  5. def get_user_by_email(db: Session, email: str):
  6. return db.query(models.User).filter(models.User.email == email).first()
  7. def get_users(db: Session, skip: int = 0, limit: int = 100):
  8. return db.query(models.User).offset(skip).limit(limit).all()
  9. def create_user(db: Session, user: schemas.UserCreate):
  10. fake_hashed_password = user.password + "notreallyhashed"
  11. db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
  12. db.add(db_user)
  13. db.commit()
  14. db.refresh(db_user)
  15. return db_user
  16. def get_items(db: Session, skip: int = 0, limit: int = 100):
  17. return db.query(models.Item).offset(skip).limit(limit).all()
  18. def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
  19. db_item = models.Item(**item.dict(), owner_id=user_id)
  20. db.add(db_item)
  21. db.commit()
  22. db.refresh(db_item)
  23. return db_item

Tip

通过创建仅专用于与数据库交互(获取用户或项目)的函数,独立于路径操作函数,您可以更轻松地在多个部分中重用它们,并为它们添加单元测试。

创建数据

现在创建工具函数来创建数据。

它的步骤是:

  • 使用您的数据创建一个 SQLAlchemy 模型实例。
  • 使用add来将该实例对象添加到数据库会话。
  • 使用commit来将更改提交到数据库(以便保存它们)。
  • 使用refresh来刷新您的实例对象(以便它包含来自数据库的任何新数据,例如生成的 ID)。
  1. from sqlalchemy.orm import Session
  2. from . import models, schemas
  3. def get_user(db: Session, user_id: int):
  4. return db.query(models.User).filter(models.User.id == user_id).first()
  5. def get_user_by_email(db: Session, email: str):
  6. return db.query(models.User).filter(models.User.email == email).first()
  7. def get_users(db: Session, skip: int = 0, limit: int = 100):
  8. return db.query(models.User).offset(skip).limit(limit).all()
  9. def create_user(db: Session, user: schemas.UserCreate):
  10. fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user
  11. def get_items(db: Session, skip: int = 0, limit: int = 100):
  12. return db.query(models.Item).offset(skip).limit(limit).all()
  13. def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
  14. db_item = models.Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item

Tip

SQLAlchemy 模型User包含一个hashed_password,它应该是一个包含散列的安全密码。

但由于 API 客户端提供的是原始密码,因此您需要将其提取并在应用程序中生成散列密码。

然后将hashed_password参数与要保存的值一起传递。

Warning

此示例不安全,密码未经过哈希处理。

在现实生活中的应用程序中,您需要对密码进行哈希处理,并且永远不要以明文形式保存它们。

有关更多详细信息,请返回教程中的安全部分。

在这里,我们只关注数据库的工具和机制。

Tip

这里不是将每个关键字参数传递给Item并从Pydantic模型中读取每个参数,而是先生成一个字典,其中包含Pydantic模型的数据:

item.dict()

然后我们将dict的键值对 作为关键字参数传递给 SQLAlchemy Item

Item(**item.dict())

然后我们传递 Pydantic模型未提供的额外关键字参数owner_id

Item(**item.dict(), owner_id=user_id)

FastAPI应用程序

现在在sql_app/main.py文件中 让我们集成和使用我们之前创建的所有其他部分。

创建数据库表

以非常简单的方式创建数据库表:

Python 3.9+Python 3.8+

  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from . import crud, models, schemas
  4. from .database import SessionLocal, engine
  5. models.Base.metadata.create_all(bind=engine)
  6. app = FastAPI()
  7. # Dependency
  8. def get_db():
  9. db = SessionLocal()
  10. try:
  11. yield db
  12. finally:
  13. db.close()
  14. @app.post("/users/", response_model=schemas.User)
  15. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  16. db_user = crud.get_user_by_email(db, email=user.email)
  17. if db_user:
  18. raise HTTPException(status_code=400, detail="Email already registered")
  19. return crud.create_user(db=db, user=user)
  20. @app.get("/users/", response_model=list[schemas.User])
  21. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  22. users = crud.get_users(db, skip=skip, limit=limit)
  23. return users
  24. @app.get("/users/{user_id}", response_model=schemas.User)
  25. def read_user(user_id: int, db: Session = Depends(get_db)):
  26. db_user = crud.get_user(db, user_id=user_id)
  27. if db_user is None:
  28. raise HTTPException(status_code=404, detail="User not found")
  29. return db_user
  30. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  31. def create_item_for_user(
  32. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  33. ):
  34. return crud.create_user_item(db=db, item=item, user_id=user_id)
  35. @app.get("/items/", response_model=list[schemas.Item])
  36. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  37. items = crud.get_items(db, skip=skip, limit=limit)
  38. return items
  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. # Dependency
  9. def get_db():
  10. db = SessionLocal()
  11. try:
  12. yield db
  13. finally:
  14. db.close()
  15. @app.post("/users/", response_model=schemas.User)
  16. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  17. db_user = crud.get_user_by_email(db, email=user.email)
  18. if db_user:
  19. raise HTTPException(status_code=400, detail="Email already registered")
  20. return crud.create_user(db=db, user=user)
  21. @app.get("/users/", response_model=List[schemas.User])
  22. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  23. users = crud.get_users(db, skip=skip, limit=limit)
  24. return users
  25. @app.get("/users/{user_id}", response_model=schemas.User)
  26. def read_user(user_id: int, db: Session = Depends(get_db)):
  27. db_user = crud.get_user(db, user_id=user_id)
  28. if db_user is None:
  29. raise HTTPException(status_code=404, detail="User not found")
  30. return db_user
  31. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  32. def create_item_for_user(
  33. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  34. ):
  35. return crud.create_user_item(db=db, item=item, user_id=user_id)
  36. @app.get("/items/", response_model=List[schemas.Item])
  37. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  38. items = crud.get_items(db, skip=skip, limit=limit)
  39. return items

Alembic 注意

通常你可能会使用 Alembic,来进行格式化数据库(创建表等)。

而且您还可以将 Alembic 用于“迁移”(这是它的主要工作)。

“迁移”是每当您更改 SQLAlchemy 模型的结构、添加新属性等以在数据库中复制这些更改、添加新列、新表等时所需的一组步骤。

您可以在Project Generation - Template的模板中找到一个 FastAPI 项目中的 Alembic 示例。具体在alembic代码目录中

创建依赖项

现在使用我们在sql_app/database.py文件中创建的SessionLocal来创建依赖项。

我们需要每个请求有一个独立的数据库会话/连接(SessionLocal),在整个请求中使用相同的会话,然后在请求完成后关闭它。

然后将为下一个请求创建一个新会话。

为此,我们将创建一个包含yield的依赖项,正如前面关于Dependencies withyield的部分中所解释的那样。

我们的依赖项将创建一个新的 SQLAlchemy SessionLocal,它将在单个请求中使用,然后在请求完成后关闭它。

Python 3.9+Python 3.8+

  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from . import crud, models, schemas
  4. from .database import SessionLocal, engine
  5. models.Base.metadata.create_all(bind=engine)
  6. app = FastAPI()
  7. # Dependency
  8. def get_db():
  9. db = SessionLocal() try: yield db finally: db.close()
  10. @app.post("/users/", response_model=schemas.User)
  11. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  12. db_user = crud.get_user_by_email(db, email=user.email)
  13. if db_user:
  14. raise HTTPException(status_code=400, detail="Email already registered")
  15. return crud.create_user(db=db, user=user)
  16. @app.get("/users/", response_model=list[schemas.User])
  17. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  18. users = crud.get_users(db, skip=skip, limit=limit)
  19. return users
  20. @app.get("/users/{user_id}", response_model=schemas.User)
  21. def read_user(user_id: int, db: Session = Depends(get_db)):
  22. db_user = crud.get_user(db, user_id=user_id)
  23. if db_user is None:
  24. raise HTTPException(status_code=404, detail="User not found")
  25. return db_user
  26. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  27. def create_item_for_user(
  28. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  29. ):
  30. return crud.create_user_item(db=db, item=item, user_id=user_id)
  31. @app.get("/items/", response_model=list[schemas.Item])
  32. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  33. items = crud.get_items(db, skip=skip, limit=limit)
  34. return items
  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. # Dependency
  9. def get_db():
  10. db = SessionLocal() try: yield db finally: db.close()
  11. @app.post("/users/", response_model=schemas.User)
  12. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  13. db_user = crud.get_user_by_email(db, email=user.email)
  14. if db_user:
  15. raise HTTPException(status_code=400, detail="Email already registered")
  16. return crud.create_user(db=db, user=user)
  17. @app.get("/users/", response_model=List[schemas.User])
  18. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  19. users = crud.get_users(db, skip=skip, limit=limit)
  20. return users
  21. @app.get("/users/{user_id}", response_model=schemas.User)
  22. def read_user(user_id: int, db: Session = Depends(get_db)):
  23. db_user = crud.get_user(db, user_id=user_id)
  24. if db_user is None:
  25. raise HTTPException(status_code=404, detail="User not found")
  26. return db_user
  27. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  28. def create_item_for_user(
  29. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  30. ):
  31. return crud.create_user_item(db=db, item=item, user_id=user_id)
  32. @app.get("/items/", response_model=List[schemas.Item])
  33. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  34. items = crud.get_items(db, skip=skip, limit=limit)
  35. return items

Info

我们将SessionLocal()请求的创建和处理放在一个try块中。

然后我们在finally块中关闭它。

通过这种方式,我们确保数据库会话在请求后始终关闭。即使在处理请求时出现异常。

但是您不能从退出代码中引发另一个异常(在yield之后)。可以查阅 Dependencies with yield and HTTPException

然后,当在路径操作函数中使用依赖项时,我们使用Session,直接从 SQLAlchemy 导入的类型声明它。

这将为我们在路径操作函数中提供更好的编辑器支持,因为编辑器将知道db参数的类型Session

Python 3.9+Python 3.8+

  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from . import crud, models, schemas
  4. from .database import SessionLocal, engine
  5. models.Base.metadata.create_all(bind=engine)
  6. app = FastAPI()
  7. # Dependency
  8. def get_db():
  9. db = SessionLocal()
  10. try:
  11. yield db
  12. finally:
  13. db.close()
  14. @app.post("/users/", response_model=schemas.User)
  15. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  16. db_user = crud.get_user_by_email(db, email=user.email)
  17. if db_user:
  18. raise HTTPException(status_code=400, detail="Email already registered")
  19. return crud.create_user(db=db, user=user)
  20. @app.get("/users/", response_model=list[schemas.User])
  21. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  22. users = crud.get_users(db, skip=skip, limit=limit)
  23. return users
  24. @app.get("/users/{user_id}", response_model=schemas.User)
  25. def read_user(user_id: int, db: Session = Depends(get_db)):
  26. db_user = crud.get_user(db, user_id=user_id)
  27. if db_user is None:
  28. raise HTTPException(status_code=404, detail="User not found")
  29. return db_user
  30. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  31. def create_item_for_user(
  32. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
  33. return crud.create_user_item(db=db, item=item, user_id=user_id)
  34. @app.get("/items/", response_model=list[schemas.Item])
  35. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  36. items = crud.get_items(db, skip=skip, limit=limit)
  37. return items
  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. # Dependency
  9. def get_db():
  10. db = SessionLocal()
  11. try:
  12. yield db
  13. finally:
  14. db.close()
  15. @app.post("/users/", response_model=schemas.User)
  16. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  17. db_user = crud.get_user_by_email(db, email=user.email)
  18. if db_user:
  19. raise HTTPException(status_code=400, detail="Email already registered")
  20. return crud.create_user(db=db, user=user)
  21. @app.get("/users/", response_model=List[schemas.User])
  22. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  23. users = crud.get_users(db, skip=skip, limit=limit)
  24. return users
  25. @app.get("/users/{user_id}", response_model=schemas.User)
  26. def read_user(user_id: int, db: Session = Depends(get_db)):
  27. db_user = crud.get_user(db, user_id=user_id)
  28. if db_user is None:
  29. raise HTTPException(status_code=404, detail="User not found")
  30. return db_user
  31. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  32. def create_item_for_user(
  33. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
  34. return crud.create_user_item(db=db, item=item, user_id=user_id)
  35. @app.get("/items/", response_model=List[schemas.Item])
  36. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  37. items = crud.get_items(db, skip=skip, limit=limit)
  38. return items

技术细节

参数db实际上是 type SessionLocal,但是这个类(用 创建sessionmaker())是 SQLAlchemy 的“代理” Session,所以,编辑器并不真正知道提供了哪些方法。

但是通过将类型声明为Session,编辑器现在可以知道可用的方法(.add()、.query()、.commit()等)并且可以提供更好的支持(比如完成)。类型声明不影响实际对象。

创建您的FastAPI 路径操作

现在,到了最后,编写标准的FastAPI 路径操作代码。

Python 3.9+Python 3.8+

  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from . import crud, models, schemas
  4. from .database import SessionLocal, engine
  5. models.Base.metadata.create_all(bind=engine)
  6. app = FastAPI()
  7. # Dependency
  8. def get_db():
  9. db = SessionLocal()
  10. try:
  11. yield db
  12. finally:
  13. db.close()
  14. @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  15. db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user)
  16. @app.get("/users/", response_model=list[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  17. users = crud.get_users(db, skip=skip, limit=limit) return users
  18. @app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)):
  19. db_user = crud.get_user(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user
  20. @app.post("/users/{user_id}/items/", response_model=schemas.Item) def create_item_for_user(
  21. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
  22. return crud.create_user_item(db=db, item=item, user_id=user_id)
  23. @app.get("/items/", response_model=list[schemas.Item]) def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  24. items = crud.get_items(db, skip=skip, limit=limit) return items
  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. # Dependency
  9. def get_db():
  10. db = SessionLocal()
  11. try:
  12. yield db
  13. finally:
  14. db.close()
  15. @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  16. db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user)
  17. @app.get("/users/", response_model=List[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  18. users = crud.get_users(db, skip=skip, limit=limit) return users
  19. @app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)):
  20. db_user = crud.get_user(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user
  21. @app.post("/users/{user_id}/items/", response_model=schemas.Item) def create_item_for_user(
  22. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
  23. return crud.create_user_item(db=db, item=item, user_id=user_id)
  24. @app.get("/items/", response_model=List[schemas.Item]) def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  25. items = crud.get_items(db, skip=skip, limit=limit) return items

我们在依赖项中的每个请求之前利用yield创建数据库会话,然后关闭它。

所以我们就可以在路径操作函数中创建需要的依赖,就能直接获取会话。

这样,我们就可以直接从路径操作函数内部调用crud.get_user并使用该会话,来进行对数据库操作。

Tip

请注意,您返回的值是 SQLAlchemy 模型或 SQLAlchemy 模型列表。

但是由于所有路径操作的response_model都使用 Pydantic模型/使用orm_mode模式,因此您的 Pydantic 模型中声明的数据将从它们中提取并返回给客户端,并进行所有正常的过滤和验证。

Tip

另请注意,response_models应当是标准 Python 类型,例如List[schemas.Item].

但是由于它的内容/参数List是一个 使用orm_mode模式的Pydantic模型,所以数据将被正常检索并返回给客户端,所以没有问题。

关于 def 对比 async def

在这里,我们在路径操作函数和依赖项中都使用着 SQLAlchemy 模型,它将与外部数据库进行通信。

这会需要一些“等待时间”。

但是由于 SQLAlchemy 不具有await直接使用的兼容性,因此类似于:

user = await db.query(User).first()

…相反,我们可以使用:

user = db.query(User).first()

然后我们应该声明路径操作函数和不带 的依赖关系async def,只需使用普通的def,如下:

@app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = crud.get_user(db, user_id=user_id) ...

Info

如果您需要异步连接到关系数据库,请参阅Async SQL (Relational) Databases

Very Technical Details

如果您很好奇并且拥有深厚的技术知识,您可以在Async文档中查看有关如何处理 async defdef差别的技术细节。

迁移

因为我们直接使用 SQLAlchemy,并且我们不需要任何类型的插件来使用FastAPI,所以我们可以直接将数据库迁移至Alembic进行集成。

由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码位于单独的独立文件中,您甚至可以使用 Alembic 执行迁移,而无需安装 FastAPI、Pydantic 或其他任何东西。

同样,您将能够在与FastAPI无关的代码的其他部分中使用相同的 SQLAlchemy 模型和实用程序。

例如,在具有CeleryRQARQ的后台任务工作者中。

审查所有文件

最后回顾整个案例,您应该有一个名为的目录my_super_project,其中包含一个名为sql_app

sql_app中应该有以下文件:

  • sql_app/__init__.py:这是一个空文件。

  • sql_app/database.py

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
  5. # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  6. engine = create_engine(
  7. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  8. )
  9. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  10. Base = declarative_base()
  • sql_app/models.py:
  1. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
  2. from sqlalchemy.orm import relationship
  3. from .database import Base
  4. class User(Base):
  5. __tablename__ = "users"
  6. id = Column(Integer, primary_key=True)
  7. email = Column(String, unique=True, index=True)
  8. hashed_password = Column(String)
  9. is_active = Column(Boolean, default=True)
  10. items = relationship("Item", back_populates="owner")
  11. class Item(Base):
  12. __tablename__ = "items"
  13. id = Column(Integer, primary_key=True)
  14. title = Column(String, index=True)
  15. description = Column(String, index=True)
  16. owner_id = Column(Integer, ForeignKey("users.id"))
  17. owner = relationship("User", back_populates="items")
  • sql_app/schemas.py:

Python 3.10+Python 3.9+Python 3.8+

  1. from pydantic import BaseModel
  2. class ItemBase(BaseModel):
  3. title: str
  4. description: str | None = None
  5. class ItemCreate(ItemBase):
  6. pass
  7. class Item(ItemBase):
  8. id: int
  9. owner_id: int
  10. class Config:
  11. orm_mode = True
  12. class UserBase(BaseModel):
  13. email: str
  14. class UserCreate(UserBase):
  15. password: str
  16. class User(UserBase):
  17. id: int
  18. is_active: bool
  19. items: list[Item] = []
  20. class Config:
  21. orm_mode = True
  1. from typing import Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int
  10. owner_id: int
  11. class Config:
  12. orm_mode = True
  13. class UserBase(BaseModel):
  14. email: str
  15. class UserCreate(UserBase):
  16. password: str
  17. class User(UserBase):
  18. id: int
  19. is_active: bool
  20. items: list[Item] = []
  21. class Config:
  22. orm_mode = True
  1. from typing import List, Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int
  10. owner_id: int
  11. class Config:
  12. orm_mode = True
  13. class UserBase(BaseModel):
  14. email: str
  15. class UserCreate(UserBase):
  16. password: str
  17. class User(UserBase):
  18. id: int
  19. is_active: bool
  20. items: List[Item] = []
  21. class Config:
  22. orm_mode = True
  • sql_app/crud.py:
  1. from sqlalchemy.orm import Session
  2. from . import models, schemas
  3. def get_user(db: Session, user_id: int):
  4. return db.query(models.User).filter(models.User.id == user_id).first()
  5. def get_user_by_email(db: Session, email: str):
  6. return db.query(models.User).filter(models.User.email == email).first()
  7. def get_users(db: Session, skip: int = 0, limit: int = 100):
  8. return db.query(models.User).offset(skip).limit(limit).all()
  9. def create_user(db: Session, user: schemas.UserCreate):
  10. fake_hashed_password = user.password + "notreallyhashed"
  11. db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
  12. db.add(db_user)
  13. db.commit()
  14. db.refresh(db_user)
  15. return db_user
  16. def get_items(db: Session, skip: int = 0, limit: int = 100):
  17. return db.query(models.Item).offset(skip).limit(limit).all()
  18. def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
  19. db_item = models.Item(**item.dict(), owner_id=user_id)
  20. db.add(db_item)
  21. db.commit()
  22. db.refresh(db_item)
  23. return db_item
  • sql_app/main.py:

Python 3.9+Python 3.8+

  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from . import crud, models, schemas
  4. from .database import SessionLocal, engine
  5. models.Base.metadata.create_all(bind=engine)
  6. app = FastAPI()
  7. # Dependency
  8. def get_db():
  9. db = SessionLocal()
  10. try:
  11. yield db
  12. finally:
  13. db.close()
  14. @app.post("/users/", response_model=schemas.User)
  15. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  16. db_user = crud.get_user_by_email(db, email=user.email)
  17. if db_user:
  18. raise HTTPException(status_code=400, detail="Email already registered")
  19. return crud.create_user(db=db, user=user)
  20. @app.get("/users/", response_model=list[schemas.User])
  21. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  22. users = crud.get_users(db, skip=skip, limit=limit)
  23. return users
  24. @app.get("/users/{user_id}", response_model=schemas.User)
  25. def read_user(user_id: int, db: Session = Depends(get_db)):
  26. db_user = crud.get_user(db, user_id=user_id)
  27. if db_user is None:
  28. raise HTTPException(status_code=404, detail="User not found")
  29. return db_user
  30. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  31. def create_item_for_user(
  32. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  33. ):
  34. return crud.create_user_item(db=db, item=item, user_id=user_id)
  35. @app.get("/items/", response_model=list[schemas.Item])
  36. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  37. items = crud.get_items(db, skip=skip, limit=limit)
  38. return items
  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. # Dependency
  9. def get_db():
  10. db = SessionLocal()
  11. try:
  12. yield db
  13. finally:
  14. db.close()
  15. @app.post("/users/", response_model=schemas.User)
  16. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  17. db_user = crud.get_user_by_email(db, email=user.email)
  18. if db_user:
  19. raise HTTPException(status_code=400, detail="Email already registered")
  20. return crud.create_user(db=db, user=user)
  21. @app.get("/users/", response_model=List[schemas.User])
  22. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  23. users = crud.get_users(db, skip=skip, limit=limit)
  24. return users
  25. @app.get("/users/{user_id}", response_model=schemas.User)
  26. def read_user(user_id: int, db: Session = Depends(get_db)):
  27. db_user = crud.get_user(db, user_id=user_id)
  28. if db_user is None:
  29. raise HTTPException(status_code=404, detail="User not found")
  30. return db_user
  31. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  32. def create_item_for_user(
  33. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  34. ):
  35. return crud.create_user_item(db=db, item=item, user_id=user_id)
  36. @app.get("/items/", response_model=List[schemas.Item])
  37. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  38. items = crud.get_items(db, skip=skip, limit=limit)
  39. return items

执行项目

您可以复制这些代码并按原样使用它。

Info

事实上,这里的代码只是大多数测试代码的一部分。

你可以用 Uvicorn 运行它:

fast →uvicorn sql_app.main:app —reload
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

打开浏览器进入 http://127.0.0.1:8000/docs。

您将能够与您的FastAPI应用程序交互,从真实数据库中读取数据:

SQL (关系型) 数据库 - 图1

直接与数据库交互

如果您想独立于 FastAPI 直接浏览 SQLite 数据库(文件)以调试其内容、添加表、列、记录、修改数据等,您可以使用SQLite 的 DB Browser

它看起来像这样:

SQL (关系型) 数据库 - 图2

您还可以使用SQLite ViewerExtendsClass等在线 SQLite 浏览器。

中间件替代数据库会话

如果你不能使用带有yield的依赖项——例如,如果你没有使用Python 3.7并且不能安装上面提到的Python 3.6的“backports” ——你可以使用类似的方法在“中间件”中设置会话。

“中间件”基本上是一个对每个请求都执行的函数,其中一些代码在端点函数之前执行,另一些代码在端点函数之后执行。

创建中间件

我们要添加的中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemySessionLocal,将其添加到请求中,然后在请求完成后关闭它。

Python 3.9+Python 3.8+

  1. from fastapi import Depends, FastAPI, HTTPException, Request, Response
  2. from sqlalchemy.orm import Session
  3. from . import crud, models, schemas
  4. from .database import SessionLocal, engine
  5. models.Base.metadata.create_all(bind=engine)
  6. app = FastAPI()
  7. @app.middleware("http") async def db_session_middleware(request: Request, call_next):
  8. response = Response("Internal server error", status_code=500) try: request.state.db = SessionLocal() response = await call_next(request) finally: request.state.db.close() return response
  9. # Dependency
  10. def get_db(request: Request):
  11. return request.state.db
  12. @app.post("/users/", response_model=schemas.User)
  13. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  14. db_user = crud.get_user_by_email(db, email=user.email)
  15. if db_user:
  16. raise HTTPException(status_code=400, detail="Email already registered")
  17. return crud.create_user(db=db, user=user)
  18. @app.get("/users/", response_model=list[schemas.User])
  19. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  20. users = crud.get_users(db, skip=skip, limit=limit)
  21. return users
  22. @app.get("/users/{user_id}", response_model=schemas.User)
  23. def read_user(user_id: int, db: Session = Depends(get_db)):
  24. db_user = crud.get_user(db, user_id=user_id)
  25. if db_user is None:
  26. raise HTTPException(status_code=404, detail="User not found")
  27. return db_user
  28. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  29. def create_item_for_user(
  30. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  31. ):
  32. return crud.create_user_item(db=db, item=item, user_id=user_id)
  33. @app.get("/items/", response_model=list[schemas.Item])
  34. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  35. items = crud.get_items(db, skip=skip, limit=limit)
  36. return items
  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException, Request, Response
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. @app.middleware("http") async def db_session_middleware(request: Request, call_next):
  9. response = Response("Internal server error", status_code=500) try: request.state.db = SessionLocal() response = await call_next(request) finally: request.state.db.close() return response
  10. # Dependency
  11. def get_db(request: Request):
  12. return request.state.db
  13. @app.post("/users/", response_model=schemas.User)
  14. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  15. db_user = crud.get_user_by_email(db, email=user.email)
  16. if db_user:
  17. raise HTTPException(status_code=400, detail="Email already registered")
  18. return crud.create_user(db=db, user=user)
  19. @app.get("/users/", response_model=List[schemas.User])
  20. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  21. users = crud.get_users(db, skip=skip, limit=limit)
  22. return users
  23. @app.get("/users/{user_id}", response_model=schemas.User)
  24. def read_user(user_id: int, db: Session = Depends(get_db)):
  25. db_user = crud.get_user(db, user_id=user_id)
  26. if db_user is None:
  27. raise HTTPException(status_code=404, detail="User not found")
  28. return db_user
  29. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  30. def create_item_for_user(
  31. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  32. ):
  33. return crud.create_user_item(db=db, item=item, user_id=user_id)
  34. @app.get("/items/", response_model=List[schemas.Item])
  35. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  36. items = crud.get_items(db, skip=skip, limit=limit)
  37. return items

Info

我们将SessionLocal()请求的创建和处理放在一个try块中。

然后我们在finally块中关闭它。

通过这种方式,我们确保数据库会话在请求后始终关闭,即使在处理请求时出现异常也会关闭。

关于request.state

request.state是每个Request对象的属性。它用于存储附加到请求本身的任意对象,例如本例中的数据库会话。您可以在Starlette 的关于Requeststate的文档中了解更多信息。

对于这种情况下,它帮助我们确保在整个请求中使用单个数据库会话,然后关闭(在中间件中)。

使用yield依赖项与使用中间件的区别

在此处添加中间件yield的依赖项的作用效果类似,但也有一些区别:

  • 中间件需要更多的代码并且更复杂一些。
  • 中间件必须是一个async函数。
    • 如果其中有代码必须“等待”网络,它可能会在那里“阻止”您的应用程序并稍微降低性能。
    • 尽管这里的SQLAlchemy工作方式可能不是很成问题。
    • 但是,如果您向等待大量I/O的中间件添加更多代码,则可能会出现问题。
  • 每个请求都会运行一个中间件。
    • 将为每个请求创建一个连接。
    • 即使处理该请求的路径操作不需要数据库。

Tip

最好使用带有yield的依赖项,如果这足够满足用例需求

Info

带有yield的依赖项是最近刚加入FastAPI中的。

所以本教程的先前版本只有带有中间件的示例,并且可能有多个应用程序使用中间件进行数据库会话管理。