为犯罪数据管理系统添加身份验证与授权

为了保护数据,蝙蝠侠实现了身份验证和授权。他决定使用 JWT (JSON Web Token)** 进行身份验证,同时创建了用户表和用户注册端点。


1. 用户模型 User

蝙蝠侠新增了用户表,用于存储可以访问该系统的用户信息。

用户模型 models.py

  1. from sqlalchemy import Column, Integer, String
  2. from sqlalchemy.orm import declarative_base
  3. Base = declarative_base()
  4. class User(Base):
  5. __tablename__ = "users"
  6. id = Column(Integer, primary_key=True, index=True)
  7. username = Column(String, unique=True, index=True)
  8. hashed_password = Column(String)
  9. def __repr__(self):
  10. return f"<User(id={self.id}, username={self.username}, hashed_password={self.hashed_password})>"

2. 用户相关的 CRUD 操作

crud.py 中新增用户操作

安装密码哈希库:pip install passlib[bcrypt]

  1. from sqlalchemy.orm import Session
  2. from .models import User
  3. from passlib.context import CryptContext
  4. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  5. # 哈希密码
  6. def get_password_hash(password: str) -> str:
  7. return pwd_context.hash(password)
  8. # 验证密码
  9. def verify_password(plain_password: str, hashed_password: str) -> bool:
  10. return pwd_context.verify(plain_password, hashed_password)
  11. # 获取用户
  12. def get_user_by_username(db: Session, username: str):
  13. return db.query(User).filter(User.username == username).first()
  14. # 创建用户
  15. def create_user(db: Session, user: dict):
  16. hashed_password = get_password_hash(user["password"])
  17. db_user = User(username=user["username"], hashed_password=hashed_password)
  18. db.add(db_user)
  19. db.commit()
  20. db.refresh(db_user)
  21. return db_user

3. JWT 工具

安装依赖:

  • pip install "python-jose[cryptography]"

crud.py 中新增 JWT 实用方法

  1. from datetime import datetime, timedelta
  2. from jose import JWTError, jwt
  3. SECRET_KEY = "your_secret_key"
  4. ALGORITHM = "HS256"
  5. # 生成访问令牌
  6. def create_access_token(data: dict, expires_delta: timedelta = None):
  7. to_encode = data.copy()
  8. expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
  9. to_encode.update({"exp": expire})
  10. return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  11. # 验证并解码令牌
  12. def decode_access_token(token: str):
  13. try:
  14. return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  15. except JWTError:
  16. return None
  17. # 验证用户
  18. def authenticate_user(db: Session, username: str, password: str):
  19. user = get_user_by_username(db, username)
  20. if user and verify_password(password, user.hashed_password):
  21. return create_access_token({"sub": username})
  22. return None

4. 用户注册和登录端点

蝙蝠侠新增了两个端点,分别用于用户注册和登录。

用户端点 __main__.py

  1. from . import crud
  2. from sqlalchemy.orm import Session
  3. @app.post("/users/register")
  4. async def register_user(request):
  5. user_data = request.json()
  6. with SessionLocal() as db:
  7. created_user = crud.create_user(db, user_data)
  8. return {"message": "User registered successfully", "user": created_user.username}
  9. @app.post("/users/login")
  10. async def login_user(request):
  11. credentials = request.json()
  12. with SessionLocal() as db:
  13. token = crud.authenticate_user(db, credentials["username"], credentials["password"])
  14. if token is None:
  15. return {"error": "Invalid credentials", "status_code": 401}
  16. return {"access_token": token, "token_type": "bearer"}

通过实现用户身份验证和授权,蝙蝠侠进一步保障了犯罪数据管理系统的安全性,确保只有授权用户才能访问敏感数据。