Info
这些文档即将被更新。🎉
当前版本假设Pydantic v1和SQLAlchemy版本小于2。
新的文档将包括Pydantic v2以及 SQLModel(也是基于SQLAlchemy),一旦SQLModel更新为为使用Pydantic v2。
FastAPI不需要你使用SQL(关系型)数据库。
但是您可以使用任何您想要的关系型数据库。
在这里,让我们看一个使用着SQLAlchemy的示例。
您可以很容易地将其调整为任何SQLAlchemy支持的数据库,如:
- PostgreSQL
- MySQL
- SQLite
- Oracle
- Microsoft SQL Server,等等其它数据库
在此示例中,我们将使用SQLite,因为它使用单个文件并且 在Python中具有集成支持。因此,您可以复制此示例并按原样来运行它。
稍后,对于您的产品级别的应用程序,您可能会要使用像PostgreSQL这样的数据库服务器。
这儿有一个FastAPI和PostgreSQL的官方项目生成器,全部基于Docker,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-postgresql
请注意,大部分代码是
SQLAlchemy
的标准代码,您可以用于任何框架。FastAPI 特定的代码和往常一样少。
ORMs(对象关系映射)
FastAPI可与任何数据库在任何样式的库中一起与 数据库进行通信。
一种常见的模式是使用“ORM”:对象关系映射。
ORM 具有在代码和数据库表(“关系型”)中的对象之间转换(“映射*”)的工具。
使用 ORM,您通常会在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。
例如,一个类Pet
可以表示一个 SQL 表pets
。
该类的每个实例对象都代表数据库中的一行数据。
又例如,一个对象orion_cat
(Pet
的一个实例)可以有一个属性orion_cat.type
, 对标数据库中的type
列。并且该属性的值可以是其它,例如"cat"
。
这些 ORM 还具有在表或实体之间建立关系的工具(比如创建多表关系)。
这样,您还可以拥有一个属性orion_cat.owner
,它包含该宠物所有者的数据,这些数据取自另外一个表。
因此,orion_cat.owner.name
可能是该宠物主人的姓名(来自表owners
中的列name
)。
它可能有一个像"Arquilian"
(一种业务逻辑)。
当您尝试从您的宠物对象访问它时,ORM 将完成所有工作以从相应的表所有者那里再获取信息。
常见的 ORM 例如:Django-ORM(Django 框架的一部分)、SQLAlchemy ORM(SQLAlchemy 的一部分,独立于框架)和 Peewee(独立于框架)等。
在这里,我们将看到如何使用SQLAlchemy ORM。
以类似的方式,您也可以使用任何其他 ORM。
在文档中也有一篇使用 Peewee 的等效的文章。
文件结构
对于这些示例,假设您有一个名为的目录my_super_project
,其中包含一个名为的子目录sql_app
,其结构如下:
.
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
├── models.py
└── schemas.py
该文件__init__.py
只是一个空文件,但它告诉 Python sql_app
是一个包。
现在让我们看看每个文件/模块的作用。
安装 SQLAlchemy
首先你需要安装SQLAlchemy
:
fast →pip install sqlalchemy
████████████████████████████████████████ 100%
restart ↻
创建 SQLAlchemy 部件
让我们转到文件sql_app/database.py
。
导入 SQLAlchemy 部件
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
为 SQLAlchemy 定义数据库 URL地址
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
在这个例子中,我们正在“连接”到一个 SQLite 数据库(用 SQLite 数据库打开一个文件)。
该文件将位于文件中的同一目录中sql_app.db
。
这就是为什么最后一部分是./sql_app.db
.
如果您使用的是PostgreSQL数据库,则只需取消注释该行:
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
…并根据您的数据库数据和相关凭据(也适用于 MySQL、MariaDB 或任何其他)对其进行调整。
如果您想使用不同的数据库,这是就是您必须修改的地方。
创建 SQLAlchemy 引擎
第一步,创建一个 SQLAlchemy的“引擎”。
我们稍后会将这个engine
在其他地方使用。
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
注意
参数:
connect_args={"check_same_thread": False}
…仅用于SQLite
,在其他数据库不需要它。
默认情况下,SQLite 只允许一个线程与其通信,假设有多个线程的话,也只将处理一个独立的请求。
这是为了防止意外地为不同的事物(不同的请求)共享相同的连接。
但是在 FastAPI 中,使用普通函数(def)时,多个线程可以为同一个请求与数据库交互,所以我们需要使用connect_args={"check_same_thread": False}
来让SQLite允许这样。
此外,我们将确保每个请求都在依赖项中获得自己的数据库连接会话,因此不需要该默认机制。
创建一个SessionLocal
类
每个SessionLocal
类的实例都会是一个数据库会话。当然该类本身还不是数据库会话。
但是一旦我们创建了一个SessionLocal
类的实例,这个实例将是实际的数据库会话。
我们将它命名为SessionLocal
是为了将它与我们从 SQLAlchemy 导入的Session
区别开来。
稍后我们将使用Session
(从 SQLAlchemy 导入的那个)。
要创建SessionLocal
类,请使用函数sessionmaker
:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
创建一个Base
类
现在我们将使用declarative_base()
返回一个类。
稍后我们将继承这个类,来创建每个数据库模型或类(ORM 模型):
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
创建数据库模型
现在让我们看看文件 sql_app/models.py
。
用Base
类来创建 SQLAlchemy 模型
我们将使用我们之前创建的Base
类来创建 SQLAlchemy 模型。
Tip
SQLAlchemy 使用的“模型”这个术语 来指代与数据库交互的这些类和实例。
而 Pydantic 也使用“模型”这个术语 来指代不同的东西,即数据验证、转换以及文档类和实例。
从database
(来自上面的database.py
文件)导入Base
。
创建从它继承的类。
这些类就是 SQLAlchemy 模型。
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
这个__tablename__
属性是用来告诉 SQLAlchemy 要在数据库中为每个模型使用的数据库表的名称。
创建模型属性/列
现在创建所有模型(类)的属性。
这些属性中的每一个都代表其相应数据库表中的一列。
我们使用Column
来表示 SQLAlchemy 中的默认值。
我们传递一个 SQLAlchemy “类型”,如Integer
、String
和Boolean
,它定义了数据库中的类型,作为参数。
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True) email = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True) title = Column(String, index=True) description = Column(String, index=True) owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
创建关系
现在创建关系。
为此,我们使用SQLAlchemy ORM提供的relationship
。
这将或多或少会成为一种“神奇”属性,其中表示该表与其他相关的表中的值。
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
当访问 user 中的属性items
时,如 中my_user.items
,它将有一个Item
SQLAlchemy 模型列表(来自items
表),这些模型具有指向users
表中此记录的外键。
当您访问my_user.items
时,SQLAlchemy 实际上会从items
表中的获取一批记录并在此处填充进去。
同样,当访问 Item中的属性owner
时,它将包含表中的User
SQLAlchemy 模型users
。使用owner_id
属性/列及其外键来了解要从users
表中获取哪条记录。
创建 Pydantic 模型
现在让我们查看一下文件sql_app/schemas.py
。
Tip
为了避免 SQLAlchemy模型和 Pydantic模型之间的混淆,我们将有models.py
(SQLAlchemy 模型的文件)和schemas.py
( Pydantic 模型的文件)。
这些 Pydantic 模型或多或少地定义了一个“schema”(一个有效的数据形状)。
因此,这将帮助我们在使用两者时避免混淆。
创建初始 Pydantic模型/模式
创建一个ItemBase
和UserBase
Pydantic模型(或者我们说“schema”),他们拥有创建或读取数据时具有的共同属性。
然后创建一个继承自他们的ItemCreate
和UserCreate
,并添加创建时所需的其他数据(或属性)。
因此在创建时也应当有一个password
属性。
但是为了安全起见,password
不会出现在其他同类 Pydantic模型中,例如通过API读取一个用户数据时,它不应当包含在内。
Python 3.10+Python 3.9+Python 3.8+
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
SQLAlchemy 风格和 Pydantic 风格
请注意,SQLAlchemy模型使用 =
来定义属性,并将类型作为参数传递给Column
,例如:
name = Column(String)
虽然 Pydantic模型使用:
声明类型,但新的类型注释语法/类型提示是:
name: str
请牢记这一点,这样您在使用:
还是=
时就不会感到困惑。
创建用于读取/返回的Pydantic模型/模式
现在创建当从 API 返回数据时、将在读取数据时使用的Pydantic模型(schemas)。
例如,在创建一个项目之前,我们不知道分配给它的 ID 是什么,但是在读取它时(从 API 返回时)我们已经知道它的 ID。
同样,当读取用户时,我们现在可以声明items
,将包含属于该用户的项目。
不仅是这些项目的 ID,还有我们在 Pydantic模型中定义的用于读取项目的所有数据:Item
.
Python 3.10+Python 3.9+Python 3.8+
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int is_active: bool items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int is_active: bool items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int is_active: bool items: List[Item] = []
class Config:
orm_mode = True
Tip
请注意,读取用户(从 API 返回)时将使用不包括password
的User
Pydantic模型。
使用 Pydantic 的orm_mode
现在,在用于查询的 Pydantic模型Item
中User
,添加一个内部Config
类。
此类Config
用于为 Pydantic 提供配置。
在Config
类中,设置属性orm_mode = True
。
Python 3.10+Python 3.9+Python 3.8+
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config: orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config: orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config: orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config: orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config: orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config: orm_mode = True
Tip
请注意,它使用=
分配一个值,例如:
orm_mode = True
它不使用之前的:
来类型声明。
这是设置配置值,而不是声明类型。
Pydanticorm_mode
将告诉 Pydantic模型读取数据,即它不是一个dict
,而是一个 ORM 模型(或任何其他具有属性的任意对象)。
这样,而不是仅仅试图从dict
上 id
中获取值,如下所示:
id = data["id"]
它还会尝试从属性中获取它,如:
id = data.id
有了这个,Pydantic模型与 ORM 兼容,您只需在路径操作response_model
的参数中声明它即可。
您将能够返回一个数据库模型,它将从中读取数据。
ORM 模式的技术细节
SQLAlchemy 和许多其他默认情况下是“延迟加载”。
这意味着,例如,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。
例如,访问属性items
:
current_user.items
将使 SQLAlchemy 转到items
表并获取该用户的项目,在调用.items
之前不会去查询数据库。
没有orm_mode
,如果您从路径操作返回一个 SQLAlchemy 模型,它不会包含关系数据。
即使您在 Pydantic 模型中声明了这些关系,也没有用处。
但是在 ORM 模式下,由于 Pydantic 本身会尝试从属性访问它需要的数据(而不是假设为 dict
),你可以声明你想要返回的特定数据,它甚至可以从 ORM 中获取它。
CRUD工具
现在让我们看看文件sql_app/crud.py
。
在这个文件中,我们将编写可重用的函数用来与数据库中的数据进行交互。
CRUD分别为:增加(Create)、查询(Read)、更改(Update)、删除(Delete),即增删改查。
…虽然在这个例子中我们只是新增和查询。
读取数据
从 sqlalchemy.orm
中导入Session
,这将允许您声明db
参数的类型,并在您的函数中进行更好的类型检查和完成。
导入之前的models
(SQLAlchemy 模型)和schemas
(Pydantic模型/模式)。
创建一些工具函数来完成:
- 通过 ID 和电子邮件查询单个用户。
- 查询多个用户。
- 查询多个项目。
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
Tip
通过创建仅专用于与数据库交互(获取用户或项目)的函数,独立于路径操作函数,您可以更轻松地在多个部分中重用它们,并为它们添加单元测试。
创建数据
现在创建工具函数来创建数据。
它的步骤是:
- 使用您的数据创建一个 SQLAlchemy 模型实例。
- 使用
add
来将该实例对象添加到数据库会话。 - 使用
commit
来将更改提交到数据库(以便保存它们)。 - 使用
refresh
来刷新您的实例对象(以便它包含来自数据库的任何新数据,例如生成的 ID)。
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item
Tip
SQLAlchemy 模型User
包含一个hashed_password
,它应该是一个包含散列的安全密码。
但由于 API 客户端提供的是原始密码,因此您需要将其提取并在应用程序中生成散列密码。
然后将hashed_password参数与要保存的值一起传递。
Warning
此示例不安全,密码未经过哈希处理。
在现实生活中的应用程序中,您需要对密码进行哈希处理,并且永远不要以明文形式保存它们。
有关更多详细信息,请返回教程中的安全部分。
在这里,我们只关注数据库的工具和机制。
Tip
这里不是将每个关键字参数传递给Item并从Pydantic模型中读取每个参数,而是先生成一个字典,其中包含Pydantic模型的数据:
item.dict()
然后我们将dict的键值对 作为关键字参数传递给 SQLAlchemy Item
:
Item(**item.dict())
然后我们传递 Pydantic模型未提供的额外关键字参数owner_id
:
Item(**item.dict(), owner_id=user_id)
主FastAPI应用程序
现在在sql_app/main.py
文件中 让我们集成和使用我们之前创建的所有其他部分。
创建数据库表
以非常简单的方式创建数据库表:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
Alembic 注意
通常你可能会使用 Alembic,来进行格式化数据库(创建表等)。
而且您还可以将 Alembic 用于“迁移”(这是它的主要工作)。
“迁移”是每当您更改 SQLAlchemy 模型的结构、添加新属性等以在数据库中复制这些更改、添加新列、新表等时所需的一组步骤。
您可以在Project Generation - Template的模板中找到一个 FastAPI 项目中的 Alembic 示例。具体在alembic
代码目录中。
创建依赖项
现在使用我们在sql_app/database.py
文件中创建的SessionLocal
来创建依赖项。
我们需要每个请求有一个独立的数据库会话/连接(SessionLocal
),在整个请求中使用相同的会话,然后在请求完成后关闭它。
然后将为下一个请求创建一个新会话。
为此,我们将创建一个包含yield
的依赖项,正如前面关于Dependencies withyield
的部分中所解释的那样。
我们的依赖项将创建一个新的 SQLAlchemy SessionLocal
,它将在单个请求中使用,然后在请求完成后关闭它。
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal() try: yield db finally: db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal() try: yield db finally: db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
Info
我们将SessionLocal()
请求的创建和处理放在一个try
块中。
然后我们在finally块中关闭它。
通过这种方式,我们确保数据库会话在请求后始终关闭。即使在处理请求时出现异常。
但是您不能从退出代码中引发另一个异常(在yield之后)。可以查阅 Dependencies with yield and HTTPException
然后,当在路径操作函数中使用依赖项时,我们使用Session
,直接从 SQLAlchemy 导入的类型声明它。
这将为我们在路径操作函数中提供更好的编辑器支持,因为编辑器将知道db
参数的类型Session
:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
技术细节
参数db
实际上是 type SessionLocal
,但是这个类(用 创建sessionmaker()
)是 SQLAlchemy 的“代理” Session
,所以,编辑器并不真正知道提供了哪些方法。
但是通过将类型声明为Session,编辑器现在可以知道可用的方法(.add()、.query()、.commit()等)并且可以提供更好的支持(比如完成)。类型声明不影响实际对象。
创建您的FastAPI 路径操作
现在,到了最后,编写标准的FastAPI 路径操作代码。
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit) return users
@app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item) def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item]) def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit) return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit) return users
@app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item) def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item]) def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit) return items
我们在依赖项中的每个请求之前利用yield
创建数据库会话,然后关闭它。
所以我们就可以在路径操作函数中创建需要的依赖,就能直接获取会话。
这样,我们就可以直接从路径操作函数内部调用crud.get_user
并使用该会话,来进行对数据库操作。
Tip
请注意,您返回的值是 SQLAlchemy 模型或 SQLAlchemy 模型列表。
但是由于所有路径操作的response_model都使用 Pydantic模型/使用orm_mode模式,因此您的 Pydantic 模型中声明的数据将从它们中提取并返回给客户端,并进行所有正常的过滤和验证。
Tip
另请注意,response_models
应当是标准 Python 类型,例如List[schemas.Item]
.
但是由于它的内容/参数List是一个 使用orm_mode模式的Pydantic模型,所以数据将被正常检索并返回给客户端,所以没有问题。
关于 def
对比 async def
在这里,我们在路径操作函数和依赖项中都使用着 SQLAlchemy 模型,它将与外部数据库进行通信。
这会需要一些“等待时间”。
但是由于 SQLAlchemy 不具有await
直接使用的兼容性,因此类似于:
user = await db.query(User).first()
…相反,我们可以使用:
user = db.query(User).first()
然后我们应该声明路径操作函数和不带 的依赖关系async def
,只需使用普通的def
,如下:
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
...
Info
如果您需要异步连接到关系数据库,请参阅Async SQL (Relational) Databases
Very Technical Details
如果您很好奇并且拥有深厚的技术知识,您可以在Async文档中查看有关如何处理 async def
于def
差别的技术细节。
迁移
因为我们直接使用 SQLAlchemy,并且我们不需要任何类型的插件来使用FastAPI,所以我们可以直接将数据库迁移至Alembic进行集成。
由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码位于单独的独立文件中,您甚至可以使用 Alembic 执行迁移,而无需安装 FastAPI、Pydantic 或其他任何东西。
同样,您将能够在与FastAPI无关的代码的其他部分中使用相同的 SQLAlchemy 模型和实用程序。
审查所有文件
最后回顾整个案例,您应该有一个名为的目录my_super_project
,其中包含一个名为sql_app
。
sql_app
中应该有以下文件:
sql_app/__init__.py
:这是一个空文件。sql_app/database.py
:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
sql_app/models.py
:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
sql_app/schemas.py
:
Python 3.10+Python 3.9+Python 3.8+
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
sql_app/crud.py
:
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
sql_app/main.py
:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
执行项目
您可以复制这些代码并按原样使用它。
Info
事实上,这里的代码只是大多数测试代码的一部分。
你可以用 Uvicorn 运行它:
fast →uvicorn sql_app.main:app —reload
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
打开浏览器进入 http://127.0.0.1:8000/docs。
您将能够与您的FastAPI应用程序交互,从真实数据库中读取数据:
直接与数据库交互
如果您想独立于 FastAPI 直接浏览 SQLite 数据库(文件)以调试其内容、添加表、列、记录、修改数据等,您可以使用SQLite 的 DB Browser
它看起来像这样:
您还可以使用SQLite Viewer或ExtendsClass等在线 SQLite 浏览器。
中间件替代数据库会话
如果你不能使用带有yield
的依赖项——例如,如果你没有使用Python 3.7并且不能安装上面提到的Python 3.6的“backports” ——你可以使用类似的方法在“中间件”中设置会话。
“中间件”基本上是一个对每个请求都执行的函数,其中一些代码在端点函数之前执行,另一些代码在端点函数之后执行。
创建中间件
我们要添加的中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemySessionLocal
,将其添加到请求中,然后在请求完成后关闭它。
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
@app.middleware("http") async def db_session_middleware(request: Request, call_next):
response = Response("Internal server error", status_code=500) try: request.state.db = SessionLocal() response = await call_next(request) finally: request.state.db.close() return response
# Dependency
def get_db(request: Request):
return request.state.db
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
@app.middleware("http") async def db_session_middleware(request: Request, call_next):
response = Response("Internal server error", status_code=500) try: request.state.db = SessionLocal() response = await call_next(request) finally: request.state.db.close() return response
# Dependency
def get_db(request: Request):
return request.state.db
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
Info
我们将SessionLocal()
请求的创建和处理放在一个try
块中。
然后我们在finally块中关闭它。
通过这种方式,我们确保数据库会话在请求后始终关闭,即使在处理请求时出现异常也会关闭。
关于request.state
request.state
是每个Request
对象的属性。它用于存储附加到请求本身的任意对象,例如本例中的数据库会话。您可以在Starlette 的关于Request
state的文档中了解更多信息。
对于这种情况下,它帮助我们确保在整个请求中使用单个数据库会话,然后关闭(在中间件中)。
使用yield
依赖项与使用中间件的区别
在此处添加中间件与yield
的依赖项的作用效果类似,但也有一些区别:
- 中间件需要更多的代码并且更复杂一些。
- 中间件必须是一个
async
函数。- 如果其中有代码必须“等待”网络,它可能会在那里“阻止”您的应用程序并稍微降低性能。
- 尽管这里的
SQLAlchemy
工作方式可能不是很成问题。 - 但是,如果您向等待大量I/O的中间件添加更多代码,则可能会出现问题。
- 每个请求都会运行一个中间件。
- 将为每个请求创建一个连接。
- 即使处理该请求的路径操作不需要数据库。
Tip
最好使用带有yield的依赖项,如果这足够满足用例需求
Info
带有yield
的依赖项是最近刚加入FastAPI中的。
所以本教程的先前版本只有带有中间件的示例,并且可能有多个应用程序使用中间件进行数据库会话管理。