数据表基本class设计:
#encoding: utf-8
from apps.app import db
from apps.configs.db_config import DB_CONFIG
from datetime import datetime
import time
import copy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy_serializer import SerializerMixin
from sqlalchemy import text
from apps.app import logger
def current_timestamp():
"""
返回毫秒级时间戳
:return:
"""
return int(round(time.time() * 1000))
class BaseModel(db.Model, SerializerMixin):
"""
设为db基类。
SerializerMixin为数据序列化插件
"""
# Flask-SQLAlchemy创建table时,如何声明基类(这个类不会创建表,可以被继承)
# 方法就是把__abstract__这个属性设置为True,这个类为基类,不会被创建为表!
__abstract__ = True
# 添加配置设置编码
# __table_args__ = {
# 'mysql_charset': DB_CONFIG['mysql']['charset']
# }
@declared_attr
def __tablename__(cls):
# 将表类名自动小写,_分割,去掉末尾的"Model"字样
if "_" in cls.__name__:
name =cls.__name__
else:
name = ''.join([('_' + ch.lower()) if ch.isupper() else ch
for ch in cls.__name__]).strip('_')
if name[-6:] == "_model":
name = name[:-6]
_table_prefix = DB_CONFIG['mysql']['prefix']
return _table_prefix + name if _table_prefix else name
# 每个表都自动带上创建时间、更新时间
# server_default表示数据库字段默认值,会写入到数据库字段定义中。且值只接受字符串,不接收整型、布尔型等,需要用text()函数格式化。
create_time = db.Column(db.BigInteger, default=current_timestamp, server_default=text('0'), comment='创建时间') # 存储毫秒级时间戳
# create_time = db.Column(db.Integer, default=time.time, comment='创建时间') # 存储秒级时间戳
create_by = db.Column(db.String(100), default="", server_default='', comment='创建人')
update_time = db.Column(db.DateTime, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'), onupdate=datetime.now, server_onupdate=text('CURRENT_TIMESTAMP'), comment='更新时间')
update_by = db.Column(db.String(100), default="", server_default='', comment='更新人')
def toDict(self):
"""
返回dict结果
用法:
r = model.query.……
> r.toDict() # 输出dict结果数据
> r.toDict().__str__() # 输出字符串结果数据
:return: c_dict
"""
# 使用深拷贝,避免引用对象self.__dict__缺少"_sa_instance_state"属性
c_dict = copy.deepcopy(self.__dict__)
for (k, v) in c_dict.items():
# 将datetime内容格式化为[Y-m-d H:M:S]
if isinstance(v, datetime):
c_dict[k] = v.strftime("%Y-%m-%d %H:%M:%S")
try:
if "_sa_instance_state" in c_dict:
del c_dict["_sa_instance_state"]
# if "create_time" in c_dict:
# del c_dict["create_time"]
if "update_time" in c_dict:
del c_dict["update_time"]
except Exception as e:
logger.error(e)
return c_dict
用户相关表结构定义:
# encoding:utf-8
# 用户模型
from apps.app import db
from apps.models.base import BaseModel
from apps.configs.db_config import DB_CONFIG
import shortuuid
from werkzeug.security import generate_password_hash, check_password_hash
from apps.models.enums import GenderEnumModel, DomainEnumModel
from flask_login import UserMixin, current_user, AnonymousUserMixin
from apps.app import logger
class UserPermissionModel(BaseModel):
"""
权限集表
角色权限关系:
UserPermissionModel:存储每个页面/操作权限的权限编码;
UserRoleModel:角色表;
role_to_permission:角色的权限通过角色权限关联表进行一对多关联;
UserModel:用户可多选角色;
role_to_user:用户与角色通过用户角色关联表进行一对多关联;
"""
# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_permission'
__table_args__ = {'comment': '菜单权限表'} # 添加索引和表注释
# 序列化排除字段
serialize_rules = ("-roles",)
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(100), default='', comment="权限名称")
code = db.Column(db.String(50), nullable=False, comment='权限编码', unique=True)
parent_id = db.Column(db.Integer, default=0, comment="上级权限ID")
type = db.Column(db.Integer, comment="权限类型:1 目录,2 菜单,3 按钮,9 接口")
url = db.Column(db.String(100), default="", comment='请求链接')
is_frame = db.Column(db.Integer, default=0, comment="是否外链:0 否,1 是")
explain = db.Column(db.String(255), default='', comment="说明")
domain = db.Column(db.Enum(DomainEnumModel), default=DomainEnumModel.ADMIN, unique=False, comment='权限所属域')
sort_num = db.Column(db.Integer, default=10, comment='排序码,数值越小越靠前')
is_default = db.Column(db.Integer, default=0, comment="是否默认拥有的权限:0 非默认;1 默认权限")
status = db.Column(db.Integer, default=0, comment='状态:0 正常/显示,-1 禁用/隐藏')
is_delete = db.Column(db.Integer, default=0, comment='是否删除:0 正常,1 表示已删除')
# # 上级权限id
# parent_id = db.Column(db.Integer, db.ForeignKey('%s.id' % (DB_CONFIG['mysql']['prefix'] + 'user_permission')), nullable=True, comment='上级权限id')
# # 当上级只有一个时,直接关联上级即可。自引用UserPermissionModel.id,需加入remote_side参数
# parent = db.relationship('UserPermissionModel', remote_side=[id], backref="children")
def default_permission(self, domain=None):
"""
查询未删除的默认权限集
:param domain: 如果传入站点标识,则只查询该站点的默认权限集
:return:
"""
# 查询条件
q = [UserPermissionModel.is_delete==0]
if domain:
q.append(UserPermissionModel.domain==domain)
return UserPermissionModel.query.filter(*q).all()
def all_permission(self, domain=None):
"""
查询全部未删除的权限集
:param domain: 如果传入站点标识,则只查询该站点的全部权限集
:return:
"""
# 查询条件
q = [UserPermissionModel.is_delete == 0]
if domain:
q.append(UserPermissionModel.domain == domain)
return UserPermissionModel.query.filter(*q).all()
def has_permission(self, code):
"""
判断权限是否有入库
:param code <UserPermissionModel.code>:
:return:
"""
permission = UserPermissionModel.query.filter(UserPermissionModel.is_delete==0, UserPermissionModel.code==code).first()
if permission:
return True
else:
return False
# 角色-权限 关联表
role_to_permission = db.Table(
DB_CONFIG['mysql']['prefix'] + 'user_role_to_permission',
db.Column('role_id', db.String(100), db.ForeignKey('%s.id' % (DB_CONFIG['mysql']['prefix'] + 'user_role')),
primary_key=True),
db.Column('permission_id', db.Integer, db.ForeignKey('%s.id' % UserPermissionModel.__tablename__), primary_key=True)
)
class UserRoleModel(BaseModel):
# 用户角色表
# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_role'
__table_args__ = {'comment': '用户角色表'} # 添加索引和表注释
serialize_rules = ("-users", "-permissions")
# serialize_only = ("-users",)
id = db.Column(db.String(100), primary_key=True, default=shortuuid.uuid, comment='唯一标识')
code = db.Column(db.String(100), unique=False, comment='角色编码')
name = db.Column(db.String(50), default='', comment='角色名称')
desc = db.Column(db.String(100), default='', comment='角色介绍')
domain = db.Column(db.Enum(DomainEnumModel), default=DomainEnumModel.ADMIN, unique=False, comment='角色所属域')
sort_num = db.Column(db.Integer, default=10, comment='排序码,数值越小越靠前')
is_default = db.Column(db.Integer, default=0, comment='默认角色:0 非默认,1 默认')
status = db.Column(db.Integer, default=0, comment='状态:0 正常,-1 禁用')
is_delete = db.Column(db.Integer, default=0, comment='是否删除:0 正常,1 表示已删除')
# 角色权限
permissions = db.relationship('UserPermissionModel', secondary=role_to_permission, backref='roles')
def toDict(self):
# 重载基类toDict方法
c_dict = BaseModel.toDict(self)
try:
if "domain" in c_dict:
# 枚举类型处理
c_dict["domain_name"] = self.domain.name
c_dict["domain_value"] = self.domain.value
del c_dict["domain"]
except Exception as e:
logger.error(e)
return c_dict
@property
def role_info(self):
info = {
# 'id': self.id,
'name': self.name,
'code': self.code,
'domain': self.domain.name,
'status': self.status
}
return info
# 用户-角色 关联表
role_to_user = db.Table(
DB_CONFIG['mysql']['prefix'] + 'user_role_to_user',
db.Column('role_id', db.String(100), db.ForeignKey('%s.id' % UserRoleModel.__tablename__), primary_key=True),
db.Column('user_id', db.String(100), db.ForeignKey('%s.id' % (DB_CONFIG['mysql']['prefix'] + 'user')),
primary_key=True)
)
class UserModel(BaseModel, UserMixin):
# 用户信息表
__tablename__ = DB_CONFIG['mysql']['prefix'] + 'user'
__table_args__ = {'comment': '用户信息表'} # 添加索引和表注释
# 序列化排除字段
serialize_rules = ("-_password",)
# serialize_only = ("username", "telephone")
id = db.Column(db.String(100), primary_key=True, default=shortuuid.uuid, comment='用户标识')
username = db.Column(db.String(50), nullable=False, comment='用户名')
telephone = db.Column(db.String(11), unique=False, default="", comment='手机号')
_password = db.Column(db.String(100), nullable=False, comment='密码')
email = db.Column(db.String(50), default="", comment='邮箱')
avatar_url = db.Column(db.String(100), default="", comment='头像地址')
signature = db.Column(db.String(100), default="", comment='个性签名')
realname = db.Column(db.String(50), default="", comment='姓名')
province_id = db.Column(db.Integer, default=0, comment='省份id')
city_id = db.Column(db.Integer, default=0, comment='城市id')
region_id = db.Column(db.Integer, default=0, comment='区域id')
gender = db.Column(db.Enum(GenderEnumModel), default=GenderEnumModel.UNKNOW, comment='性别')
login_error_num = db.Column(db.Integer, default=0, comment='登录错误次数,每次登录成功清零')
last_login_time = db.Column(db.DateTime, comment='最后登录时间')
is_developer = db.Column(db.Integer, default=0, comment="是否开发者:0 否,1 是")
status = db.Column(db.Integer, default=0, comment='状态:0 正常,-1 禁用')
active = db.Column(db.Integer, default=1, comment='是否可用:0 不可用,1 表示可用')
is_delete = db.Column(db.Integer, default=0, comment='是否删除:0 正常,1 表示已删除')
# 上级用户
# superior_user_id = db.Column(db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), nullable=True, default="", comment='上级用户id')
# 当上级用户有多个时,使用多对多的关系
# superior_user = db.relationship('UserModel', secondary=superior_users, backref='users')
# 当上级用户只有一个时,直接关联上级用户即可。自引用user.id,需加入remote_side参数
# superior_user = db.relationship('UserModel', remote_side=[id])
# 用户角色
roles = db.relationship('UserRoleModel', secondary=role_to_user, backref='users')
def __init__(self, user_id=None, *args, **kwargs):
if "password" in kwargs:
self.password = kwargs.get('password')
kwargs.pop("password")
super(UserModel, self).__init__(*args, **kwargs)
# def toDict(self):
# # 重载基类toDict方法
# c_dict = BaseModel.toDict(self)
#
# try:
# roles = []
# for role in self.roles:
# roles.append(role.toDict())
# c_dict["roles"] = roles
#
# if "_password" in c_dict:
# del c_dict["_password"]
# if "gender" in c_dict:
# # 枚举类型处理
# c_dict["gender_name"] = self.gender.name
# c_dict["gender_value"] = self.gender.value
# del c_dict["gender"]
# except Exception as e:
# logger.err(e)
#
# return c_dict
@property
def jwt_login_time_dict(self):
"""
将jwt_login_time格式化为dict
:return:
"""
jwt_login_times = {}
for v in self.jwt_login_time:
jwt_login_times[v.id] = round(v.login_time / 1000000, 6)
return jwt_login_times
@property
def password(self):
return self._password
@password.setter
def password(self, raw_password):
# password赋值时,自动加密
self._password = generate_password_hash(raw_password)
def check_password(self, raw_password):
result = check_password_hash(self.password, raw_password)
return result
@property
def is_authenticated(self):
return True
@property
def is_active(self):
if self.status == 0:
return True
else:
return False
# return self.active
@property
def is_anonymous(self):
return False
@property
def permissions(self):
"""
返回该用户的所有权限
:return:
"""
# 开发者直接返回所有权限
if self.is_developer:
return UserPermissionModel.all_permission("all")
# 如果没有关联角色,返回空
if not self.roles:
return []
# 定义权限集合
all_permissions = []
# 循环合并多角色的权限,最后去重
for role in self.roles:
all_permissions.extend(role.permissions)
return list(set(all_permissions))
def can(self, permission):
"""
判断用户是否有传入权限
:param permission <UserPermissionModel.code>: 权限编码
:return:
"""
result = False
has_per = UserPermissionModel.has_permission(permission)
# 如果权限有入库
if has_per:
# 查询当前用户所有权限
all_permissions = self.permissions
for p in all_permissions:
# 传入权限属于该用户
if permission == p.code:
result = True
else:
# 如果权限没有入库,直接返回通过
result = True
return result
@property
def user_info(self):
"""
返回用户个人信息
:return:
"""
# info = {
# "name": self.realname,
# "status": self.status,
# "is_delete": self.is_delete,
# "email": self.email,
# "telephone": self.telephone,
# "avatar_url": self.avatar_url,
# "id": self.id
# }
# roles = []
# for role in self.roles:
# roles.append(role.role_info)
# info["roles"] = roles
# return info
return self.to_dict(only=('id', 'username','realname', 'status', 'is_delete', 'email', 'telephone', 'avatar_url', 'roles'))
def __repr__(self):
return '<User %r>' % (self.username)
# # 上级用户关联表,如有多个用户上级,则需要用该表
# superior_users = db.Table(
# 'bbx_superior_users',
# db.Column('superior_user_id', db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), primary_key=True),
# db.Column('user_id', db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), primary_key=True)
# )
class AnonymousUser(AnonymousUserMixin):
# 匿名用户
def __init__(self, **kwargs):
super(AnonymousUser, self).__init__(**kwargs)
@property
def id(self):
return None
@property
def is_active(self):
return False
@property
def is_authenticated(self):
return False
@property
def is_anonymous(self):
return True
@property
def user_info(self):
return None
@property
def permissions(self):
return None
def can(self, permissions):
return False
# def get_id(self):
# return None
class UserJwtLoginTimeModel(BaseModel):
# 用户JWT登录时间表
# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_jwt_login_time'
__table_args__ = {'comment': '用户JWT登录时间表'} # 添加索引和表注释
# 序列化排除字段
serialize_rules = ("-user",)
id = db.Column(db.String(100), primary_key=True, comment='cid')
user_id = db.Column(db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), comment='用户ID')
login_time = db.Column(db.BigInteger, comment='jwt登录时间')
user = db.relationship('UserModel', backref='jwt_login_time')
class UserLoginLogModel(BaseModel):
# 用户登录日志表
# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_login_log'
__table_args__ = {'comment': '用户登录日志表'} # 添加索引和表注释
# 序列化排除字段
serialize_rules = ("-user",) ,
id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='唯一标识')
user_id = db.Column(db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), comment='用户ID')
pass_error = db.Column(db.Integer, default=0, comment='登录结果:0 成功,1 失败')
login_account = db.Column(db.String(100), default='', comment='登录账号')
login_msg = db.Column(db.String(100), default='', comment='登录信息')
ip = db.Column(db.String(20), default='', comment='登录IP')
client = db.Column(db.String(100), default='', comment='客户端')
user = db.relationship('UserModel', backref='login_log')