06 drf-认证权限频率 - 图1


为了方便接下来的学习,我们创建一个新的子应用 opt

  1. python manage.py startapp opt

因为接下来的功能中需要使用到登陆功能,所以我们使用django内置admin站点并创建一个管理员.

  1. python3 manage.py makemigrations
  2. python3 manage.py migrate
  3. python3 manage.py createsuperuser

06 drf-认证权限频率 - 图2
创建管理员以后,访问admin站点,先修改站点的语言配置
settings.py

  1. LANGUAGE_CODE = 'zh-hans' # 中文
  2. TIME_ZONE = 'Asia/Shanghai' # 时区是亚洲上海
  3. USE_I18N = True # 国际化
  4. USE_L10N = True # 本地化
  5. USE_TZ = True # 数据库是否使用TIME_ZONE,True表示使用上海的时区,False表示不使用,使用UTC时间,然后转成上海,会差8个小时

06 drf-认证权限频率 - 图3

访问admin 站点效果:
06 drf-认证权限频率 - 图4

一 认证Authentication

1.1 自定义认证方案

1.1.1 编写models


  1. # models.py
  2. class User(models.Model):
  3. username=models.CharField(max_length=32)
  4. password=models.CharField(max_length=32)
  5. user_type=models.IntegerField(choices=((1,'超级用户'),(2,'普通用户'),(3,'二笔用户')))
  6. class UserToken(models.Model):
  7. user=models.OneToOneField(to='User')
  8. token=models.CharField(max_length=64)

1.1.2 新建认证类


  1. from rest_framework.authentication import BaseAuthentication
  2. class TokenAuth():
  3. def authenticate(self, request):
  4. token = request.GET.get('token')
  5. token_obj = models.UserToken.objects.filter(token=token).first()
  6. if token_obj:
  7. return
  8. else:
  9. raise AuthenticationFailed('认证失败')
  10. def authenticate_header(self,request):
  11. pass

1.1.3 编写视图


  1. def get_random(name):
  2. import hashlib
  3. import time
  4. md=hashlib.md5()
  5. md.update(bytes(str(time.time()),encoding='utf-8'))
  6. md.update(bytes(name,encoding='utf-8'))
  7. return md.hexdigest()
  8. class Login(APIView):
  9. def post(self,reuquest):
  10. back_msg={'status':1001,'msg':None}
  11. try:
  12. name=reuquest.data.get('name')
  13. pwd=reuquest.data.get('pwd')
  14. user=models.User.objects.filter(username=name,password=pwd).first()
  15. if user:
  16. token=get_random(name)
  17. models.UserToken.objects.update_or_create(user=user,defaults={'token':token})
  18. back_msg['status']='1000'
  19. back_msg['msg']='登录成功'
  20. back_msg['token']=token
  21. else:
  22. back_msg['msg'] = '用户名或密码错误'
  23. except Exception as e:
  24. back_msg['msg']=str(e)
  25. return Response(back_msg)
  26. class Course(APIView):
  27. authentication_classes = [TokenAuth, ]
  28. def get(self, request):
  29. return HttpResponse('get')
  30. def post(self, request):
  31. return HttpResponse('post')

1.1.4 全局使用


  1. REST_FRAMEWORK={
  2. "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",]
  3. }

1.1.5 局部使用


  1. #局部使用,只需要在视图类里加入:
  2. authentication_classes = [TokenAuth, ]

1.2 内置认证方案(需要配合权限使用)

可以在配置文件中配置全局默认的认证方案

  1. REST_FRAMEWORK = {
  2. 'DEFAULT_AUTHENTICATION_CLASSES': (
  3. 'rest_framework.authentication.SessionAuthentication', # session认证
  4. 'rest_framework.authentication.BasicAuthentication', # 基本认证
  5. )
  6. }

也可以在每个视图中通过设置authentication_classess属性来设置

  1. from rest_framework.authentication import SessionAuthentication, BasicAuthentication
  2. from rest_framework.views import APIView
  3. class ExampleView(APIView):
  4. # 类属性
  5. authentication_classes = [SessionAuthentication, BasicAuthentication]
  6. ...

认证失败会有两种可能的返回值:

  • 401 Unauthorized 未认证
  • 403 Permission Denied 权限被禁止

    二 权限Permissions

    权限控制可以限制用户对于视图的访问和对于具体数据对象的访问。

  • 在执行视图的dispatch()方法前,会先进行视图访问权限的判断

  • 在通过get_object()获取具体对象时,会进行模型对象访问权限的判断

    2.1 自定义权限

    2.1.1 编写权限类


  1. # 限制只有超级用户能访问
  2. from rest_framework.permissions import BasePermission
  3. class UserPermission(BasePermission):
  4. message = '不是超级用户,查看不了'
  5. def has_permission(self, request, view):
  6. # user_type = request.user.get_user_type_display()
  7. # if user_type == '超级用户':
  8. # 权限在认证之后,所以能取到user
  9. user_type = request.user.user_type
  10. print(user_type)
  11. if user_type == 1:
  12. return True
  13. else:
  14. return False

2.1.2 全局使用


  1. REST_FRAMEWORK={
  2. "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",],
  3. "DEFAULT_PERMISSION_CLASSES":["app01.service.permissions.SVIPPermission",]
  4. }

2.1.3 局部使用


  1. # 局部使用只需要在视图类里加入:
  2. permission_classes = [UserPermission,]

2.1.4 说明


  1. 如需自定义权限,需继承rest_framework.permissions.BasePermission父类,并实现以下两个任何一个方法或全部
  2. - `.has_permission(self, request, view)`
  3. 是否可以访问视图, view表示当前视图对象
  4. - `.has_object_permission(self, request, view, obj)`
  5. 是否可以访问数据对象, view表示当前视图, obj为数据对象

2.2 内置权限

2.2.1 内置权限类


  1. from rest_framework.permissions import AllowAny,IsAuthenticated,IsAdminUser,IsAuthenticatedOrReadOnly
  2. - AllowAny 允许所有用户
  3. - IsAuthenticated 仅通过认证的用户
  4. - IsAdminUser 仅管理员用户
  5. - IsAuthenticatedOrReadOnly 已经登陆认证的用户可以对数据进行增删改操作,没有登陆认证的只能查看数据。

2.2.2 全局使用


可以在配置文件中全局设置默认的权限管理类,如

  1. REST_FRAMEWORK = {
  2. ....
  3. 'DEFAULT_PERMISSION_CLASSES': (
  4. 'rest_framework.permissions.IsAuthenticated',
  5. )
  6. }

如果未指明,则采用如下默认配置

  1. 'DEFAULT_PERMISSION_CLASSES': (
  2. 'rest_framework.permissions.AllowAny',
  3. )

2.2.3 局部使用


也可以在具体的视图中通过permission_classes属性来设置,如

  1. from rest_framework.permissions import IsAuthenticated
  2. from rest_framework.views import APIView
  3. class ExampleView(APIView):
  4. permission_classes = (IsAuthenticated,)
  5. ...

2.2.4 实际操作


  1. # 创建超级用户,登陆到admin,创建普通用户(注意设置职员状态,也就是能登陆)
  2. # 全局配置IsAuthenticated
  3. # setting.py
  4. 'DEFAULT_PERMISSION_CLASSES': (
  5. 'rest_framework.permissions.IsAuthenticated',
  6. )
  7. # urls.py
  8. path('test/', views.TestView.as_view()),
  9. # views.py
  10. class TestView(APIView):
  11. def get(self,request):
  12. return Response({'msg':'个人中心'})
  13. # 登陆到admin后台后,直接访问可以,如果没登陆,不能访问
  14. ##注意:如果全局配置了
  15. rest_framework.permissions.IsAdminUser
  16. # 就只有管理员能访问,普通用户访问不了

三 限流Throttling

可以对接口访问的频次进行限制,以减轻服务器压力。
一般用于付费购买次数,投票等场景使用.

3.1 自定义频率类

3.1.1 编写频率类


  1. # 自定义的逻辑
  2. #(1)取出访问者ip
  3. #(2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问,在字典里,继续往下走
  4. #(3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
  5. #(4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
  6. #(5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
  7. class MyThrottles():
  8. VISIT_RECORD = {}
  9. def __init__(self):
  10. self.history=None
  11. def allow_request(self,request, view):
  12. #(1)取出访问者ip
  13. # print(request.META)
  14. ip=request.META.get('REMOTE_ADDR')
  15. import time
  16. ctime=time.time()
  17. # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问
  18. if ip not in self.VISIT_RECORD:
  19. self.VISIT_RECORD[ip]=[ctime,]
  20. return True
  21. self.history=self.VISIT_RECORD.get(ip)
  22. # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
  23. while self.history and ctime-self.history[-1]>60:
  24. self.history.pop()
  25. # (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
  26. # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
  27. if len(self.history)<3:
  28. self.history.insert(0,ctime)
  29. return True
  30. else:
  31. return False
  32. def wait(self):
  33. import time
  34. ctime=time.time()
  35. return 60-(ctime-self.history[-1])

3.1.2 全局使用


  1. REST_FRAMEWORK = {
  2. 'DEFAULT_THROTTLE_CLASSES':['app01.utils.MyThrottles',],
  3. }

3.1.3 局部使用


  1. #在视图类里使用
  2. throttle_classes = [MyThrottles,]

3.2 内置频率类

3.2.1 根据用户ip限制


  1. #写一个类,继承自SimpleRateThrottle,(根据ip限制)
  2. from rest_framework.throttling import SimpleRateThrottle
  3. class VisitThrottle(SimpleRateThrottle):
  4. scope = 'luffy'
  5. def get_cache_key(self, request, view):
  6. return self.get_ident(request)
  7. #在setting里配置:(一分钟访问三次)
  8. REST_FRAMEWORK = {
  9. 'DEFAULT_THROTTLE_RATES':{
  10. 'luffy':'3/m' # key要跟类中的scop对应
  11. }
  12. }
  13. # 可以全局使用,局部使用

了解:错误信息中文显示

  1. class Course(APIView):
  2. authentication_classes = [TokenAuth, ]
  3. permission_classes = [UserPermission, ]
  4. throttle_classes = [MyThrottles,]
  5. def get(self, request):
  6. return HttpResponse('get')
  7. def post(self, request):
  8. return HttpResponse('post')
  9. def throttled(self, request, wait):
  10. from rest_framework.exceptions import Throttled
  11. class MyThrottled(Throttled):
  12. default_detail = '傻逼啊'
  13. extra_detail_singular = '还有 {wait} second.'
  14. extra_detail_plural = '出了 {wait} seconds.'
  15. raise MyThrottled(wait)

3.2.2 限制匿名用户每分钟访问3次


  1. REST_FRAMEWORK = {
  2. 'DEFAULT_THROTTLE_CLASSES': (
  3. 'rest_framework.throttling.AnonRateThrottle',
  4. ),
  5. 'DEFAULT_THROTTLE_RATES': {
  6. 'anon': '3/m',
  7. }
  8. }
  9. # 使用 `second`, `minute`, `hour` 或`day`来指明周期。
  10. # 可以全局使用,局部使用

3.2.3 限制登陆用户每分钟访问10次


  1. REST_FRAMEWORK = {
  2. 'DEFAULT_THROTTLE_CLASSES': (
  3. 'rest_framework.throttling.UserRateThrottle'
  4. ),
  5. 'DEFAULT_THROTTLE_RATES': {
  6. 'user': '10/m'
  7. }
  8. }
  9. # 可以全局使用,局部使用

3.2.4 其他


1) AnonRateThrottle
限制所有匿名未认证用户,使用IP区分用户。
使用DEFAULT_THROTTLE_RATES['anon'] 来设置频次
2)UserRateThrottle
限制认证用户,使用User id 来区分。
使用DEFAULT_THROTTLE_RATES['user'] 来设置频次
3)ScopedRateThrottle
限制用户对于每个视图的访问频次,使用ip或user id。
例如:

  1. class ContactListView(APIView):
  2. throttle_scope = 'contacts'
  3. ...
  4. class ContactDetailView(APIView):
  5. throttle_scope = 'contacts'
  6. ...
  7. class UploadView(APIView):
  8. throttle_scope = 'uploads'
  9. ...
  10. REST_FRAMEWORK = {
  11. 'DEFAULT_THROTTLE_CLASSES': (
  12. 'rest_framework.throttling.ScopedRateThrottle',
  13. ),
  14. 'DEFAULT_THROTTLE_RATES': {
  15. 'contacts': '1000/day',
  16. 'uploads': '20/day'
  17. }
  18. }

实例

全局配置中设置访问频率

  1. 'DEFAULT_THROTTLE_RATES': {
  2. 'anon': '3/minute',
  3. 'user': '10/minute'
  4. }
  1. from rest_framework.authentication import SessionAuthentication
  2. from rest_framework.permissions import IsAuthenticated
  3. from rest_framework.generics import RetrieveAPIView
  4. from rest_framework.throttling import UserRateThrottle
  5. class StudentAPIView(RetrieveAPIView):
  6. queryset = Student.objects.all()
  7. serializer_class = StudentSerializer
  8. authentication_classes = [SessionAuthentication]
  9. permission_classes = [IsAuthenticated]
  10. throttle_classes = (UserRateThrottle,)