第08章 安全

安全概述

用户认证

https://docs.djangoproject.com/zh-hans/2.0/topics/auth/

权限

RBAC

全称 role-based access control,即基于角色的访问控制。

简单说来,把人员按角色分配权限,角色可以理解为企业管理中的岗位。岗位是相对稳定的,人员是易变的。所以引用角色可以极大的提升系统的灵活性和易用性。

参考 https://en.wikipedia.org/wiki/Role-based_access_control

django权限

django的权限系统,可以认为是轻量级的RBAC系统,其中组可以对应RBAC的角色。通过将权限分配给组,再将用户分配到组中,从而实现了授权的过程,同时django还支持直接对用户进行授权,这对应对一些特性情况是十分有用的。

django的权限系统实现了最基本的权限需求,但在补救的来务开发过程上,仍然存在许多无法满足的需求,好在django是一套异常强大的系统,系统内部提供了一套灵活的机制,便利开发人员可以方便的对系统的权限进行扩展。

扩展django权限

对象级权限

在开发过程中,遇到最多的问题应该就是对象级权限的问题了。Django内置的权限授权是针对Model进行的授权,每个Model默认都会生成三个权限:


add、


change、


delete。

如一旦授予某用户Change权限,那么该用户就拥有了该Model所有对象的Change权限,但有些时候我们希望仅允许用户对该Model下的指定对象进行操作,比如新闻发布系统可以允许所有的编辑人员进行新闻的发布,但仅允许发布新闻的编辑人员修改自己所发布的新闻,这时候Django的权限系统就无法满足我们的需求了,我们需要引入更细粒度的对象级权限支持(此处新闻的例子在下文的系统组的实现一节中有更好的解决方案)。

所谓对象级权限,即针对的Model对象实例进行授权操作,使用我们可以精确的对单个对象进行权限的控制,以实现更细精度的权限控制。

  1. 虽然Django的权限系统没有直接提供对象权限的实现,但确提供了基础支持(相关的API都提供有对象参数的传入),使得我可以通过扩展认证后端从而实现对象级的权限支持。目前已经有非常好的对象级权限的第三方实现django-guardian,在此就不具体探讨了,想深入了解的朋友可以自己查看相关的文档。

系统组的实现

什么是系统组 system group ?Django默认的权限系统中只有组的概念,没有什么系统组啊?

其实系统组只是我们为了区别与现有组的一种称呼而已,你可以给它起一个更酷的名字。

那么如何理解系统组呢?我们知道Django里的组都需要我们进行硬性的绑定才会生效的,比如你创建一个“编辑”的组,那么你需要给编辑这个组授予相应内容的编辑权限,同时你需要将相应的用户分配至该“编辑”组,那么用户才会真正的拥有了编辑应有的权限,这在大多数情况下是没有问题的,但这种硬性的绑定极大的限制了系统的灵活性,因为很多时候系统是需要根据运行时的环境来决定用户应该属于哪些组的。

结合上文对象级权限中新闻发布的例子,如果使用对象级权限的方式来实现的话,我们就必需在用户发布新闻的同时,向对象级权限系统中添加一条权限分配的记录(对象级权限系统是需要独立的数据表来记录用户或组与对象的权限关系),这好像也没什么问题,无非是重写save方法或都添加一条signals就可以应对了,

  • 但如果我们希望用户的上级领导也能够修改该信息呢,你可能会说给上级领导也添加一条对象级权限啊,但如果该用户的上级领导变更了呢?

  • 如果我们又希望用户同部门的员工允许修改呢?

是不是开始变得麻烦了,如果有一种机制来处理这种动态关系,那事情就变得简单多了,而实现这一机制的就是系统组。系统组就是在系统运行期,根据运行环境来决定用户所隶属的组,从而实现灵活的授权机制。

看完上面的解释,好像有点明白了,但具体该怎么实现呢?

首先,大家需要理解的一点是系统组本身并非系统运行时动态创建的,而是在开发阶段根据业务需要创建的,系统组机制只是在运行时由系统决定用户与系统组的关系而已。因而在业务开发阶段,我们就必需确定好业务所需的系统组。当然我们也可以从所有的业务中抽象出一些具备全局通用性的系统组,以下列出我们抽象出来的具备通用性的系统组供大家参考:

  1. Everyone:所有人,无论是用户还是访客,都属于Everyone组。
  2. Anonymous:匿名用户,非认证的用户都属于Anonymous组。
  3. Users:用户,所有认证的用户都属于Users组。
  4. Creator:创建者,针对具体信息的创建者,都属于Creator组。
  5. Owner:所有者,具体信息的拥有者,都属于Owner组。

看了上边所列出的系统组,大家是不是感觉好像又更理解了一些呢,做过Windows文件授权的朋友,可能还有点似曾相识的感觉,对,这和Windows系统里的特殊组是一样一样的。除了以上所列出的系统组,我们还可以针对特定的业务创建针对性的系统组,如上例中,允许用户的上级领导修改该用户所发布的信息,那么我们可以创建一个名为“信息所有者的上级领导”这样的系统组。

上代码:

  1. # 看下面代码,我们声明了我们所需的系统组的常量,声明为常量是便于代码的调用
  2. SYSTEM_GROUP_EVERYONE = "Everyone" # 所有人
  3. SYSTEM_GROUP_ANONYMOUS = "Anonymous" # 匿名用户
  4. SYSTEM_GROUP_USERS = "Users" # 用户
  5. SYSTEM_GROUP_STAFFS = "Staffs" # 职员
  6. SYSTEM_GROUP_CREATOR = "Creator" # 创建者
  7. SYSTEM_GROUP_OWNER = "Owner" # 所有者

根据上文所述,系统组是在开发阶段就应该确定,那么确定后的系统组应该如何在系统中体现呢?其实系统组只是我们定义的一个概念,为了保证Django系统权限调用接口的一至性,它仍然是一个组对象,只是是一个我们赋予了特殊意义的组,我们仍然需要在Django的“组”这个Model中创建相应的系统组对象(当然如果考虑到系统组的特殊性,可以通过一些机制来限制对系统组的修改和删除操作,以保证系统组始终可用,如何限制不是权限系统的核心,在此不做详述)。你可以通过声明一个系统组初始化的方法,并在适当的地方调用他,当然你也可以使用Django提供的初始数据方法来初始系统组。看代码:

  1. from django.contrib.auth.models import Group
  2. def init_systemgroups():
  3. """
  4. 初始化系统组。
  5. :return:
  6. """
  7. Group.objects.get_or_create(name=SYSTEM_GROUP_EVERYONE)
  8. Group.objects.get_or_create(name=SYSTEM_GROUP_ANONYMOUS)
  9. Group.objects.get_or_create(name=SYSTEM_GROUP_USERS)
  10. Group.objects.get_or_create(name=SYSTEM_GROUP_STAFFS)
  11. Group.objects.get_or_create(name=SYSTEM_GROUP_CREATOR)
  12. Group.objects.get_or_create(name=SYSTEM_GROUP_OWNER)

好了,现在我们已经有了系统组数据了,接下来我们就需要实现系统组的核心逻辑了,从Django所提供的权限验证接口User.has_perm(perm, obj=None),我们可以看出存在两种情况:


一种是不提供obj参数的情况,我们可以语义化理解为“用户(User)是否拥有perm权限”;


一种是提供了obj参数的情况,我们可以语义化理解为“用户(User)是否拥有指定对象(obj)的perm权限”。

同样的,我们系统组也可以分为两种情况:

  1. 一种是与obj参数无关的,我们可以语义化理解为“用户(User)是否为系统组(system group)的成员”,这包括上面所列的Everyone、Anonymous、Users、Staffs等;
  2. 一种是与obj参数有关的,我们可以语义化理解为“用户(User)是否为对象(obj)的系统组(system group)的成员”,包括上面所列的Creator、Owner等。依赖于相同的参数,使得我们可以保持与Django一至的验证接口,我们现在要做的是根据这些参数,给系统返回恰当的系统组,先来看看与obj参数无关的情况的代码:
  1. def get_user_systemgroups(user):
  2. """
  3. 获取指定用户所属的系统组集合。
  4. :param user: 指定的用户。
  5. :return: set 表示的用户所属的系统组名称集合。
  6. """
  7. groups = set()
  8. groups.add(SYSTEM_GROUP_EVERYONE)
  9. if user.is_anonymous():
  10. groups.add(SYSTEM_GROUP_ANONYMOUS) # 如果是匿名用户,添加匿名组
  11. else:
  12. groups.add(SYSTEM_GROUP_USERS) # 添加user组
  13. if user.is_staff:
  14. groups.add(SYSTEM_GROUP_STAFFS) # 添加员工组
  15. return groups

OK,是不是很简单,我们只是对User进行简单的验证,就可以获得User有关的系统组了,而第二种与obj参数有关的情况就比较复杂了,比如Creator组,我们必需要获取对象的创建者,我们才能与User进行比较,从而验证User是否为obj的创建者,然而Creator组是我们抽象出来的全局通用的组,意味着我们的Model需要提供一致的方法来获取对象的创建者,这时我们需要定义一个接口(Python没有提供接口的概念,我们只是通过抽象的方法来模拟)来实现:

  1. class CreatorMixin(object):
  2. """
  3. 实现创建者的 Model 基类。
  4. """
  5. def get_creator(self):
  6. """
  7. 获取对象的创建者,子类重写该方法实现创建者对象的获取。
  8. :return: 当前对象的创建者。
  9. """
  10. return None
  11. def set_creator(self, user):
  12. """
  13. 设置对象的创建者,子类重写该方法实现创建者对象的设置。
  14. :param creator: 要设置为创建者的User对象。
  15. :return:
  16. """
  17. pass
  18. class OwnerMixin(object):
  19. """
  20. 实现所有者的 Model 基类。
  21. """
  22. def get_owner(self):
  23. """
  24. 获取对象的所有者,子类重写该方法实现所有者对象的获取。
  25. :return: 当前对象的所有者。
  26. """
  27. return None
  28. def set_owner(self, user):
  29. """
  30. 设置对象的所有者,子类重写该方法实现所有者对象的设置。
  31. :param owner: 要设置为所有者的User对象。
  32. :return:
  33. """
  34. pass

现在再来看看第二种情况的实现:

  1. def get_user_systemgroups_for_obj(user, obj):
  2. """
  3. 获取指定用户相对于指定的对象所属的系统组集合。
  4. :param user: 指定的用户。
  5. :param obj: 相对于指定的对象。
  6. :return: set 表示的用户所属的系统组名称集合。
  7. """
  8. groups = set()
  9. if isinstance(obj, CreatorMixin) and obj.get_creator() == user:
  10. groups.add(SYSTEM_GROUP_CREATOR)
  11. if isinstance(obj, OwnerMixin) and obj.get_owner() == user:
  12. groups.add(SYSTEM_GROUP_OWNER)
  13. return groups

现在,我们为了保证系统组的扩展性,我们需要定义一套规则,使得你可以在你自己的应用中,扩展实现自己业务所需要的系统组,我们约定在你的应用中应该存在一个模块,模块中应该包含有以上声明的get_user_systemgroups(user)和get_user_systemgroups_for_obj(user, obj)两个方法,同时你需要在项目的settings.py文件中,告诉系统你的系统组实现的模块路径,类似如下:

  1. # 自定义系统组实现
  2. SYSTEM_GROUP_IMPLEMENTERS = ['systemgroups.systemgroups', '你自己实现的系统组的路径'…]

同时,我们需要提供一个方法来根据上面规则,依次获取所有应用中的用户所属的系统组集合,代码如下:

  1. def get_user_systemgroups(user):
  2. """
  3. 从所有应用中获取指定用户所属的系统组集合。
  4. :param user: 指定的用户。
  5. :return: set 表示的用户所属的系统组名称集合。
  6. """
  7. imps = SYSTEM_GROUP_IMPLEMENTERS
  8. groups = set()
  9. if not imps:
  10. return groups
  11. for imp in imps:
  12. imp = importlib.import_module(imp)
  13. if hasattr(imp, "get_user_systemgroups"):
  14. groups.update(imp.get_user_systemgroups(user))
  15. return groups
  16. def get_user_systemgroups_for_obj(user, obj):
  17. """
  18. 从所有应用中获取指定用户相对于指定的对象所属的系统组集合。
  19. :param user: 指定的用户。
  20. :param obj: 相对于指定的对象。
  21. :return: set 表示的用户所属的系统组名称集合。
  22. """
  23. imps = SYSTEM_GROUP_IMPLEMENTERS
  24. groups = set()
  25. if not imps:
  26. return groups
  27. for imp in imps:
  28. imp = importlib.import_module(imp)
  29. if hasattr(imp, "get_user_systemgroups_for_obj"):
  30. groups.update(imp.get_user_systemgroups_for_obj(user, obj))
  31. return groups

最后,我们来实现我们的认证后端:

  1. def get_group_permissions(name):
  2. """
  3. 获取指定名称的组所拥有的权限集合。
  4. :param name: 组的名称。
  5. :return: 权限集合。
  6. """
  7. perms = Permission.objects.filter(group__name = name)
  8. perms = perms.values_list('content_type__app_label', 'codename').order_by()
  9. return set(["%s.%s" % (ct, name) for ct, name in perms])
  10. def get_groups_permissions(names):
  11. """
  12. 获取指定名称的组所拥有的权限集合。
  13. :param names: 组的名称集合。
  14. :return: 权限集合。
  15. """
  16. perms = set()
  17. for name in names:
  18. perms.update(get_group_permissions(name))
  19. return perms
  20. class SystemGroupBackend(object):
  21. def authenticate(self, username=None, password=None, **kwargs):
  22. return None
  23. def has_perm(self, user_obj, perm, obj=None):
  24. return perm in self.get_all_permissions(user_obj, obj)
  25. def get_all_permissions(self, user_obj, obj=None):
  26. perms = self.get_group_permissions(user_obj, obj)
  27. return perms
  28. def get_group_permissions(self, user_obj, obj=None):
  29. result_perms = set()
  30. groups = get_user_systemgroups(user_obj)
  31. perms = get_groups_permissions(groups)
  32. result_perms.update(perms)
  33. if obj is None:
  34. return result_perms
  35. groups = get_user_systemgroups_for_obj(user_obj, obj)
  36. perms = get_groups_permissions(groups)
  37. result_perms.update(perms)
  38. return result_perms

至此,我们完整的实现了系统组的机制,以上代码因篇幅原因删减了部分与主逻辑关系不大的代码,如权限的缓存等,以便于阅读和理解,完整的代码请参考我的GitHub项目:https://github.com/Kidwind/django-systemgroups,同时因项目时间较短,仍然存在很多不足和问题,也欢迎大家指正。

权限映射

权限映射又是什么呢?

要回答这个问题,我们还是以上文中的新闻发布的例子来展开,我们的新闻应该是根据性质进行分类的,比如实事新闻、财经新闻、体育新闻等等,这时我们就形成了新闻类别(InfoCategory)和新闻(Info)这两个一对多关系的Model,随着工作的细分,我们需要将不同的分类授权不同的部门来进行管理,这时你想到的是什么?

  1. 对,就是上文中所提到的对象级权限,我们给新闻类别(InfoCategory)创建一个用于控制新闻类别下的新闻的修改权限,名为change_info_by_category,通过针对新闻类别(InfoCategory)进行对象级的change_info_by_category授权,如果此时我们要进行某篇新闻的修改权限验证,我们需要对新闻所在栏目进行change_info_by_category的权限验证,像这样`user.has_perm(‘app_label. change_info_by_category’, obj=info.category)`,有什么问题吗?似乎也没什么问题,但我们细细分析一下,我们原本对新闻(Info)进行修改的权限验证方法`user.has_perm(‘app_label.change_info’, obj=info)`,需要人为的转换为上面的权限验证,相应的Django提供的Admin我们需要重写相应的方法来修改权限验证的逻辑,如果新闻(Info)本身还提供对象级的权限检测,我们的逻辑就需要改为要对两个方法都进行验证,还有更多复杂的情况,情况一变,我们就需要重写我们的权限验证逻辑吗,很麻烦不是吗,如果有一种方法能够实现上述权限验证的自动转换,能够保证我们的权限调用方法不变,那事情就简单多了,而这一方法,就是我们所说的权限映射。

简而言之,权限映射就是将用户对当前对象所执行的权限验证转换为用户对另一个对象的另一个权限进行验证的过程。看下图:

第08章 安全 - 图1

好了,分析到这里,相信大家对权限映射的作用有了大概的认识,下面我们来对代码实现做简单的分析和了解。同系统组一样,我们的Model需要提供一至的方法来根据当前的权限验证参数,获取映射后的权限验证参数,我们定义接口如下:

  1. class PermMappableMixin(object):
  2. """
  3. 实现权限映射的 Model 基类。
  4. """
  5. @classmethod
  6. def mapping_permission(cls, perm, obj=None):
  7. """
  8. 根据当前的权限验证参数,获取映射后的权限验证参数。(此类方法仅为标记方法,子类应实现相应的方法)
  9. :param perm: 当前检测的权限。
  10. :param obj: 当前进行检测权限的对象。
  11. :return: 返回值包含两个参数:第一个参数为映射后的权限;第二个参数为对应映射后的对象,其应为映射后权限所对应的 Model 的实例。
  12. """
  13. return None, None

接下来,我们只需要实现我们的认证后端就可以了:

  1. from django.contrib.contenttypes.models import ContentType
  2. class PermMappingBackend(object):
  3. def authenticate(self, username=None, password=None, **kwargs):
  4. return None
  5. def has_perm(self, user_obj, perm, obj=None):
  6. app_label, codename = perm.split('.')
  7. content_types = ContentType.objects.filter(
  8. app_label = app_label,
  9. permission__codename = codename) # 根据权限获取其对应的ContentType实例。
  10. for content_type in content_types:
  11. model_class = content_type.model_class() # 根据 ContentType 实例获取对应的 Model 类
  12. if issubclass(model_class, PermMappableMixin):
  13. mapped_perm, mapped_obj = model_class.mapping_permission(perm, obj = obj)
  14. if mapped_perm and user_obj.has_perm(mapped_perm, obj=mapped_obj):
  15. return True
  16. return False

是不是很简单,接下来看看我们新闻例子的代码:

  1. from django.utils.translation import ugettext as _
  2. from django.db import models
  3. class InfoCategory(models.Model):
  4. name = models.CharField(max_length=128, verbose_name=_('分类名称'))
  5. class Meta:
  6. permissions = (
  7. ("add_info_by_category", _("允许添加分类信息")),
  8. ("change_info_by_category", _("允许修改分类信息")),
  9. ("delete_info_by_category", _("允许删除分类信息")),
  10. )
  11. class Info(PermMappableMixin, models.Model):
  12. category = models.ForeignKey(InfoCategory, verbose_name=_("所属分类"))
  13. title = models.CharField(max_length=256, verbose_name=_('标题'))
  14. @classmethod
  15. def mapping_permission(cls, perm, obj=None):
  16. mapped_perm = None
  17. mapped_obj = None
  18. if perm == "permmapping.add_info":
  19. mapped_perm = "permmapping.add_info_by_category"
  20. elif perm == "permmapping.change_info":
  21. mapped_perm = "permmapping.change_info_by_category"
  22. elif perm == "permmapping.delete_info":
  23. mapped_perm = "permmapping.delete_info_by_category"
  24. if isinstance(obj, cls):
  25. mapped_obj = obj.category
  26. return mapped_perm, mapped_obj

总结:

Django系统提供了一套灵活的认证系统,使得我们可以通过扩展其实现灵活的权限控制策略,本文结合我在项目过程中的实践经验,为大家展示了通过对象级权限、系统组的实现、权限映射来解决项目中所遇到的几种权限问题,同时实践中也可以通过组合这几种权限机制,实现更多更为复杂的权限策略。其它类的开发语言,也可以借鉴Django及上文所提到的几种权限系统的扩展的思路,实现各自平台的权限系统。

劫持保护

CSRF保护

什么是CSRF

CSRF, Cross Site Request Forgery, 跨站点伪造请求。具体的细节及其危害见 wikipedia

Django的csrf防护机制

django 第一次响应来自某个客户端的请求时,会在服务器端随机生成一个 token,把这个 token 放在 cookie 里。然后每次 POST 请求都会带上这个 token,这样就能避免被CSRF攻击。

  1. 在返回的HTTP响应的cookie里,django会为你添加一个csrftoken字段,其值为一个自动生成的token
  2. 在所有的POST请求之前,django会验证这个请求的cookie里的csrftoken字段的值和提交的表单里的csrfmiddlewaretoken字段的值是否一样。如果一样,刚表明这是一个合法的请求,否则这个请求是来自于别人的csrf攻击,返回403 Forbidden
  3. 在所有 ajax POST请求里,添加一个X-CSRFTOKEN header,其值为cookie里的csrftoken的值

Django如果使用CSRF保护

  1. 基本原则:GET请求不要有副作用。
  2. 启用django.middleware.csrf.CsrfViewMiddleware这个中间件
  3. 在所有的POST表单元素时,需要加上一个{% csrf_token %}
  4. 在渲染模块时,使用RequestContextRequestContext会处理csrf_token这个tag,从而自动为表单添加一个名为csrfmiddlewaretoken的input

类视图使用CSRF保护

基于类视图的CSRF保护使用装饰器method_decorator实现,且装饰器只能加到 dispatch()方法上。

  • csrf_protect,为当前函数强制设置防跨站请求伪造功能,即便settings中没有设置全局中间件。
  • csrf_exempt,取消当前函数防跨站请求伪造功能,即便settings中设置了全局中间件
  1. from django.views.decorators.csrf import csrf_exempt, csrf_protect
  2. class HomeView(View):
  3. @method_decorator(csrf_exempt)
  4. def dispatch(self, request, *args, **kwargs):
  5. return super(HomeView, self).dispatch(request, *args, **kwargs)
  6. def get(self, request):
  7. return render(request, "home.html")
  8. def post(self, request):
  9. print("Home View POST method...")
  10. return redirect("/index/")

类视图结合装饰器

直接在视图上添加装饰器时,@method_decorator(name=xxx)一定要传递name参数

  1. from django.views import View
  2. from django.utils.decorators import method_decorator
  3. def wrapper(func):
  4. @wraps(func)
  5. def inner(request, *args, **kwargs):
  6. # 登录校验
  7. cookie_k = request.session.get("user01", None)
  8. if cookie_k:
  9. # 表示已经登录的用户
  10. ret = func(request, *args, **kwargs)
  11. return ret
  12. else:
  13. # 滚去登录
  14. return redirect("/login/")
  15. return inner
  16. class LoginView(View):
  17. def get(self, request):
  18. return render(request, "login.html")
  19. def post(self, request):
  20. user = request.POST.get("name")
  21. pwd = request.POST.get("pwd")
  22. if user == "safly" and pwd == "123":
  23. # 登陆成功
  24. # 写session
  25. request.session["user01"] = user
  26. # request.session.set_expiry(5)
  27. return redirect("/index/")
  28. @method_decorator(wrapper, name="get")
  29. class IndexView(View):
  30. def get(self, request):
  31. user = request.session.get("user01", "游客")
  32. return render(request, "index.html", {"user": user})

装饰器添加到GET/POST方法上

  1. # @method_decorator(wrapper, name="get")
  2. class IndexView(View):
  3. @method_decorator(wrapper)
  4. def get(self, request):
  5. user = request.session.get("user02", "游客")
  6. return render(request, "index.html", {"user": user})

装饰器添加到dispatch()

  1. # @method_decorator(wrapper, name="get")
  2. class IndexView(View):
  3. ## 这么写所有的请求方法都要做登录校验
  4. @method_decorator(wrapper)
  5. def dispatch(self, request, *args, **kwargs):
  6. return super(IndexView,self).dispatch(request,*args,**kwargs)
  7. # @method_decorator(wrapper)
  8. def get(self, request):
  9. user = request.session.get("user04", "游客")
  10. return render(request, "index.html", {"user": user})

加密签名