为犯罪数据管理系统添加身份验证与授权
为了保护数据,蝙蝠侠实现了身份验证和授权。他决定使用 JWT (JSON Web Token)** 进行身份验证,同时创建了用户表和用户注册端点。
1. 用户模型 User
蝙蝠侠新增了用户表,用于存储可以访问该系统的用户信息。
用户模型 models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
def __repr__(self):
return f"<User(id={self.id}, username={self.username}, hashed_password={self.hashed_password})>"
2. 用户相关的 CRUD 操作
在 crud.py
中新增用户操作
安装密码哈希库:pip install passlib[bcrypt]
from sqlalchemy.orm import Session
from .models import User
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 哈希密码
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
# 验证密码
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
# 获取用户
def get_user_by_username(db: Session, username: str):
return db.query(User).filter(User.username == username).first()
# 创建用户
def create_user(db: Session, user: dict):
hashed_password = get_password_hash(user["password"])
db_user = User(username=user["username"], hashed_password=hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
3. JWT 工具
安装依赖:
pip install "python-jose[cryptography]"
在 crud.py
中新增 JWT 实用方法
from datetime import datetime, timedelta
from jose import JWTError, jwt
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"
# 生成访问令牌
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# 验证并解码令牌
def decode_access_token(token: str):
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
return None
# 验证用户
def authenticate_user(db: Session, username: str, password: str):
user = get_user_by_username(db, username)
if user and verify_password(password, user.hashed_password):
return create_access_token({"sub": username})
return None
4. 用户注册和登录端点
蝙蝠侠新增了两个端点,分别用于用户注册和登录。
用户端点 __main__.py
from . import crud
from sqlalchemy.orm import Session
@app.post("/users/register")
async def register_user(request):
user_data = request.json()
with SessionLocal() as db:
created_user = crud.create_user(db, user_data)
return {"message": "User registered successfully", "user": created_user.username}
@app.post("/users/login")
async def login_user(request):
credentials = request.json()
with SessionLocal() as db:
token = crud.authenticate_user(db, credentials["username"], credentials["password"])
if token is None:
return {"error": "Invalid credentials", "status_code": 401}
return {"access_token": token, "token_type": "bearer"}
通过实现用户身份验证和授权,蝙蝠侠进一步保障了犯罪数据管理系统的安全性,确保只有授权用户才能访问敏感数据。