数据表基本class设计:
#encoding: utf-8from apps.app import dbfrom apps.configs.db_config import DB_CONFIGfrom datetime import datetimeimport timeimport copyfrom sqlalchemy.ext.declarative import declared_attrfrom sqlalchemy_serializer import SerializerMixinfrom sqlalchemy import textfrom apps.app import loggerdef current_timestamp():"""返回毫秒级时间戳:return:"""return int(round(time.time() * 1000))class BaseModel(db.Model, SerializerMixin):"""设为db基类。SerializerMixin为数据序列化插件"""# Flask-SQLAlchemy创建table时,如何声明基类(这个类不会创建表,可以被继承)# 方法就是把__abstract__这个属性设置为True,这个类为基类,不会被创建为表!__abstract__ = True# 添加配置设置编码# __table_args__ = {# 'mysql_charset': DB_CONFIG['mysql']['charset']# }@declared_attrdef __tablename__(cls):# 将表类名自动小写,_分割,去掉末尾的"Model"字样if "_" in cls.__name__:name =cls.__name__else:name = ''.join([('_' + ch.lower()) if ch.isupper() else chfor ch in cls.__name__]).strip('_')if name[-6:] == "_model":name = name[:-6]_table_prefix = DB_CONFIG['mysql']['prefix']return _table_prefix + name if _table_prefix else name# 每个表都自动带上创建时间、更新时间# server_default表示数据库字段默认值,会写入到数据库字段定义中。且值只接受字符串,不接收整型、布尔型等,需要用text()函数格式化。create_time = db.Column(db.BigInteger, default=current_timestamp, server_default=text('0'), comment='创建时间') # 存储毫秒级时间戳# create_time = db.Column(db.Integer, default=time.time, comment='创建时间') # 存储秒级时间戳create_by = db.Column(db.String(100), default="", server_default='', comment='创建人')update_time = db.Column(db.DateTime, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'), onupdate=datetime.now, server_onupdate=text('CURRENT_TIMESTAMP'), comment='更新时间')update_by = db.Column(db.String(100), default="", server_default='', comment='更新人')def toDict(self):"""返回dict结果用法:r = model.query.……> r.toDict() # 输出dict结果数据> r.toDict().__str__() # 输出字符串结果数据:return: c_dict"""# 使用深拷贝,避免引用对象self.__dict__缺少"_sa_instance_state"属性c_dict = copy.deepcopy(self.__dict__)for (k, v) in c_dict.items():# 将datetime内容格式化为[Y-m-d H:M:S]if isinstance(v, datetime):c_dict[k] = v.strftime("%Y-%m-%d %H:%M:%S")try:if "_sa_instance_state" in c_dict:del c_dict["_sa_instance_state"]# if "create_time" in c_dict:# del c_dict["create_time"]if "update_time" in c_dict:del c_dict["update_time"]except Exception as e:logger.error(e)return c_dict
用户相关表结构定义:
# encoding:utf-8# 用户模型from apps.app import dbfrom apps.models.base import BaseModelfrom apps.configs.db_config import DB_CONFIGimport shortuuidfrom werkzeug.security import generate_password_hash, check_password_hashfrom apps.models.enums import GenderEnumModel, DomainEnumModelfrom flask_login import UserMixin, current_user, AnonymousUserMixinfrom apps.app import loggerclass UserPermissionModel(BaseModel):"""权限集表角色权限关系:UserPermissionModel:存储每个页面/操作权限的权限编码;UserRoleModel:角色表;role_to_permission:角色的权限通过角色权限关联表进行一对多关联;UserModel:用户可多选角色;role_to_user:用户与角色通过用户角色关联表进行一对多关联;"""# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_permission'__table_args__ = {'comment': '菜单权限表'} # 添加索引和表注释# 序列化排除字段serialize_rules = ("-roles",)id = db.Column(db.Integer, primary_key=True, autoincrement=True)name = db.Column(db.String(100), default='', comment="权限名称")code = db.Column(db.String(50), nullable=False, comment='权限编码', unique=True)parent_id = db.Column(db.Integer, default=0, comment="上级权限ID")type = db.Column(db.Integer, comment="权限类型:1 目录,2 菜单,3 按钮,9 接口")url = db.Column(db.String(100), default="", comment='请求链接')is_frame = db.Column(db.Integer, default=0, comment="是否外链:0 否,1 是")explain = db.Column(db.String(255), default='', comment="说明")domain = db.Column(db.Enum(DomainEnumModel), default=DomainEnumModel.ADMIN, unique=False, comment='权限所属域')sort_num = db.Column(db.Integer, default=10, comment='排序码,数值越小越靠前')is_default = db.Column(db.Integer, default=0, comment="是否默认拥有的权限:0 非默认;1 默认权限")status = db.Column(db.Integer, default=0, comment='状态:0 正常/显示,-1 禁用/隐藏')is_delete = db.Column(db.Integer, default=0, comment='是否删除:0 正常,1 表示已删除')# # 上级权限id# parent_id = db.Column(db.Integer, db.ForeignKey('%s.id' % (DB_CONFIG['mysql']['prefix'] + 'user_permission')), nullable=True, comment='上级权限id')# # 当上级只有一个时,直接关联上级即可。自引用UserPermissionModel.id,需加入remote_side参数# parent = db.relationship('UserPermissionModel', remote_side=[id], backref="children")def default_permission(self, domain=None):"""查询未删除的默认权限集:param domain: 如果传入站点标识,则只查询该站点的默认权限集:return:"""# 查询条件q = [UserPermissionModel.is_delete==0]if domain:q.append(UserPermissionModel.domain==domain)return UserPermissionModel.query.filter(*q).all()def all_permission(self, domain=None):"""查询全部未删除的权限集:param domain: 如果传入站点标识,则只查询该站点的全部权限集:return:"""# 查询条件q = [UserPermissionModel.is_delete == 0]if domain:q.append(UserPermissionModel.domain == domain)return UserPermissionModel.query.filter(*q).all()def has_permission(self, code):"""判断权限是否有入库:param code <UserPermissionModel.code>::return:"""permission = UserPermissionModel.query.filter(UserPermissionModel.is_delete==0, UserPermissionModel.code==code).first()if permission:return Trueelse:return False# 角色-权限 关联表role_to_permission = db.Table(DB_CONFIG['mysql']['prefix'] + 'user_role_to_permission',db.Column('role_id', db.String(100), db.ForeignKey('%s.id' % (DB_CONFIG['mysql']['prefix'] + 'user_role')),primary_key=True),db.Column('permission_id', db.Integer, db.ForeignKey('%s.id' % UserPermissionModel.__tablename__), primary_key=True))class UserRoleModel(BaseModel):# 用户角色表# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_role'__table_args__ = {'comment': '用户角色表'} # 添加索引和表注释serialize_rules = ("-users", "-permissions")# serialize_only = ("-users",)id = db.Column(db.String(100), primary_key=True, default=shortuuid.uuid, comment='唯一标识')code = db.Column(db.String(100), unique=False, comment='角色编码')name = db.Column(db.String(50), default='', comment='角色名称')desc = db.Column(db.String(100), default='', comment='角色介绍')domain = db.Column(db.Enum(DomainEnumModel), default=DomainEnumModel.ADMIN, unique=False, comment='角色所属域')sort_num = db.Column(db.Integer, default=10, comment='排序码,数值越小越靠前')is_default = db.Column(db.Integer, default=0, comment='默认角色:0 非默认,1 默认')status = db.Column(db.Integer, default=0, comment='状态:0 正常,-1 禁用')is_delete = db.Column(db.Integer, default=0, comment='是否删除:0 正常,1 表示已删除')# 角色权限permissions = db.relationship('UserPermissionModel', secondary=role_to_permission, backref='roles')def toDict(self):# 重载基类toDict方法c_dict = BaseModel.toDict(self)try:if "domain" in c_dict:# 枚举类型处理c_dict["domain_name"] = self.domain.namec_dict["domain_value"] = self.domain.valuedel c_dict["domain"]except Exception as e:logger.error(e)return c_dict@propertydef role_info(self):info = {# 'id': self.id,'name': self.name,'code': self.code,'domain': self.domain.name,'status': self.status}return info# 用户-角色 关联表role_to_user = db.Table(DB_CONFIG['mysql']['prefix'] + 'user_role_to_user',db.Column('role_id', db.String(100), db.ForeignKey('%s.id' % UserRoleModel.__tablename__), primary_key=True),db.Column('user_id', db.String(100), db.ForeignKey('%s.id' % (DB_CONFIG['mysql']['prefix'] + 'user')),primary_key=True))class UserModel(BaseModel, UserMixin):# 用户信息表__tablename__ = DB_CONFIG['mysql']['prefix'] + 'user'__table_args__ = {'comment': '用户信息表'} # 添加索引和表注释# 序列化排除字段serialize_rules = ("-_password",)# serialize_only = ("username", "telephone")id = db.Column(db.String(100), primary_key=True, default=shortuuid.uuid, comment='用户标识')username = db.Column(db.String(50), nullable=False, comment='用户名')telephone = db.Column(db.String(11), unique=False, default="", comment='手机号')_password = db.Column(db.String(100), nullable=False, comment='密码')email = db.Column(db.String(50), default="", comment='邮箱')avatar_url = db.Column(db.String(100), default="", comment='头像地址')signature = db.Column(db.String(100), default="", comment='个性签名')realname = db.Column(db.String(50), default="", comment='姓名')province_id = db.Column(db.Integer, default=0, comment='省份id')city_id = db.Column(db.Integer, default=0, comment='城市id')region_id = db.Column(db.Integer, default=0, comment='区域id')gender = db.Column(db.Enum(GenderEnumModel), default=GenderEnumModel.UNKNOW, comment='性别')login_error_num = db.Column(db.Integer, default=0, comment='登录错误次数,每次登录成功清零')last_login_time = db.Column(db.DateTime, comment='最后登录时间')is_developer = db.Column(db.Integer, default=0, comment="是否开发者:0 否,1 是")status = db.Column(db.Integer, default=0, comment='状态:0 正常,-1 禁用')active = db.Column(db.Integer, default=1, comment='是否可用:0 不可用,1 表示可用')is_delete = db.Column(db.Integer, default=0, comment='是否删除:0 正常,1 表示已删除')# 上级用户# superior_user_id = db.Column(db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), nullable=True, default="", comment='上级用户id')# 当上级用户有多个时,使用多对多的关系# superior_user = db.relationship('UserModel', secondary=superior_users, backref='users')# 当上级用户只有一个时,直接关联上级用户即可。自引用user.id,需加入remote_side参数# superior_user = db.relationship('UserModel', remote_side=[id])# 用户角色roles = db.relationship('UserRoleModel', secondary=role_to_user, backref='users')def __init__(self, user_id=None, *args, **kwargs):if "password" in kwargs:self.password = kwargs.get('password')kwargs.pop("password")super(UserModel, self).__init__(*args, **kwargs)# def toDict(self):# # 重载基类toDict方法# c_dict = BaseModel.toDict(self)## try:# roles = []# for role in self.roles:# roles.append(role.toDict())# c_dict["roles"] = roles## if "_password" in c_dict:# del c_dict["_password"]# if "gender" in c_dict:# # 枚举类型处理# c_dict["gender_name"] = self.gender.name# c_dict["gender_value"] = self.gender.value# del c_dict["gender"]# except Exception as e:# logger.err(e)## return c_dict@propertydef jwt_login_time_dict(self):"""将jwt_login_time格式化为dict:return:"""jwt_login_times = {}for v in self.jwt_login_time:jwt_login_times[v.id] = round(v.login_time / 1000000, 6)return jwt_login_times@propertydef password(self):return self._password@password.setterdef password(self, raw_password):# password赋值时,自动加密self._password = generate_password_hash(raw_password)def check_password(self, raw_password):result = check_password_hash(self.password, raw_password)return result@propertydef is_authenticated(self):return True@propertydef is_active(self):if self.status == 0:return Trueelse:return False# return self.active@propertydef is_anonymous(self):return False@propertydef permissions(self):"""返回该用户的所有权限:return:"""# 开发者直接返回所有权限if self.is_developer:return UserPermissionModel.all_permission("all")# 如果没有关联角色,返回空if not self.roles:return []# 定义权限集合all_permissions = []# 循环合并多角色的权限,最后去重for role in self.roles:all_permissions.extend(role.permissions)return list(set(all_permissions))def can(self, permission):"""判断用户是否有传入权限:param permission <UserPermissionModel.code>: 权限编码:return:"""result = Falsehas_per = UserPermissionModel.has_permission(permission)# 如果权限有入库if has_per:# 查询当前用户所有权限all_permissions = self.permissionsfor p in all_permissions:# 传入权限属于该用户if permission == p.code:result = Trueelse:# 如果权限没有入库,直接返回通过result = Truereturn result@propertydef user_info(self):"""返回用户个人信息:return:"""# info = {# "name": self.realname,# "status": self.status,# "is_delete": self.is_delete,# "email": self.email,# "telephone": self.telephone,# "avatar_url": self.avatar_url,# "id": self.id# }# roles = []# for role in self.roles:# roles.append(role.role_info)# info["roles"] = roles# return inforeturn self.to_dict(only=('id', 'username','realname', 'status', 'is_delete', 'email', 'telephone', 'avatar_url', 'roles'))def __repr__(self):return '<User %r>' % (self.username)# # 上级用户关联表,如有多个用户上级,则需要用该表# superior_users = db.Table(# 'bbx_superior_users',# db.Column('superior_user_id', db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), primary_key=True),# db.Column('user_id', db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), primary_key=True)# )class AnonymousUser(AnonymousUserMixin):# 匿名用户def __init__(self, **kwargs):super(AnonymousUser, self).__init__(**kwargs)@propertydef id(self):return None@propertydef is_active(self):return False@propertydef is_authenticated(self):return False@propertydef is_anonymous(self):return True@propertydef user_info(self):return None@propertydef permissions(self):return Nonedef can(self, permissions):return False# def get_id(self):# return Noneclass UserJwtLoginTimeModel(BaseModel):# 用户JWT登录时间表# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_jwt_login_time'__table_args__ = {'comment': '用户JWT登录时间表'} # 添加索引和表注释# 序列化排除字段serialize_rules = ("-user",)id = db.Column(db.String(100), primary_key=True, comment='cid')user_id = db.Column(db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), comment='用户ID')login_time = db.Column(db.BigInteger, comment='jwt登录时间')user = db.relationship('UserModel', backref='jwt_login_time')class UserLoginLogModel(BaseModel):# 用户登录日志表# __tablename__ = DB_CONFIG['mysql']['prefix'] + 'user_login_log'__table_args__ = {'comment': '用户登录日志表'} # 添加索引和表注释# 序列化排除字段serialize_rules = ("-user",) ,id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='唯一标识')user_id = db.Column(db.String(100), db.ForeignKey('%s.id' % UserModel.__tablename__), comment='用户ID')pass_error = db.Column(db.Integer, default=0, comment='登录结果:0 成功,1 失败')login_account = db.Column(db.String(100), default='', comment='登录账号')login_msg = db.Column(db.String(100), default='', comment='登录信息')ip = db.Column(db.String(20), default='', comment='登录IP')client = db.Column(db.String(100), default='', comment='客户端')user = db.relationship('UserModel', backref='login_log')
