07 认证与授权
认证功能
验证用户登录
身份验证是将僌请求与一组识别凭证(例如请求来自的用户或者其签名的令牌)相关联的机制。然后,权限和限制策略可以使用这些凭据来确定请求是否应该被允许。
DRF框架提供了许多开箱即用的身份验证方案,还允许您实施自定义方案。
身份验证始终在视图的开始处运行,在执行权限和限制检查之前以及在允许其他任何代码继续执行之前运行。
注意:身份验证本身不会允许/禁止传入的请求,它只是标识请求的凭据。
如何确认身份验证
认证方案问题被定义为琴𥌨的列表。DRF框架将尝试使用列表中的每个类进行认证,并将设置request.user
为认证成功返回值的第一个参数
如果没有认证类,request.user
将被设置为一个实例django.contrib.auth.models.AnonymousUser
,request.auth
并将被设置为None。
未经身份验证的请求request.user
和request.auth
的值可以使用UNAUTHENTICATED_USER
和UNAUTHENTICATED_TOKEN
设置进行修改
定义认证方案
全局认证方案
使用 DEFAULT_AUTHENTICATION_CLASSES
指定全局默认的认证方案
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.BasicAuthentication', 'rest_framework.authentication.SessionAuthentication',
)
}
自定义认证方案
- 定义认证类
- 注册认证类(视图处理添加类属性)
authentication_classes=[认证类(),]
认证使用范围
- 局部视图使用,指定使用
authentication_classes=[认证类(),]
- 全局视图使用,指定不使用
authentication_classes=[]
在视图类上指定身份验证方案
from rest_framework.authentication import SessionAuthentication, BasicAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
class ExampleView(APIView):
authentication_classes = (SessionAuthentication, BasicAuthentication)
permission_classes = (IsAuthenticated,)
def get(self, request, format=None):
content = {
'user': unicode(request.user), # `django.contrib.auth.User` instance.
'auth': unicode(request.auth), # None
}
return Response(content)
Unauthorized and Forbdden response
请求未通过认证被拒绝时,有两种不同的 status_code是合适的。
- HTTP 401 Unauthorized
- HTTP 403 Permission Denied
401响应必须始终包含一个WWW-Authenticate
header,指示客户端如何进行身份验证。
403响应不包含WWW-Authenticate
header
返回哪种响应取决于认证方案。尽管可能正在使用多种认证方案,但是只能使用一种方案来确定响应的类型。在确定响应类型时使用视图上设置的第一个认证类
请注意,当请求可以成功进行身份验证时,仍然会被拒绝执行请求的权限,在这种情况下无论使用哪种认证方案,始终都会响应HTTP 403 Permission Denied
DRF认证方案
认证方案 | 说明 |
---|---|
BasicAuthentication | 使用HTTP基本认证,用户名与密码认证 |
TokenAuthentication | 基于令牌的HTTP认证方案 |
SessionAuthentication | 基于django的默认session后端进行认证 |
RemoteUserAuthentication | 允许您将身份验证委托给设置REMOTE_USER 环境变量的Web服务器。 |
TokenAuthentication 认证方案配置
# 1.配置settings
INSTALLED_APPS = (
...
'rest_framework.authtoken'
)
# 2.迁移token表
manage.py migrate
# 3.为用户创建token
from rest_framework.authtoken.models import Token
token = Token.objects.create(user=...)
print token.key
# 4.客户端认证,token需要包含在header的Authorization中
Authorization: Token 9944b09199c62bcf9418ad846dd0e4bbdfc6ee4b
# 5.如果认证成功,则TokenAuthentication提供以下凭据。
request.user将是一个Django User实例。
request.auth将是一个rest_framework.authtoken.models.Token实例。
# 6.被拒绝权限的认证会响应 HTTP 401 Unauthorized 适当的 header
WWW-Authenticate: Token
# 7.生产环境使用 TokenAuthentication时,必须使用https
# 8.生成令牌,使用信号
# 如果希望每个用户有一个自动生成的令牌,可以简单的捕捉用户的post_save信号
from django.conf import settings
from django.db.models.signals import post_save
from django.dispatch import receiver
from rest_framework.authtoken.models import Token
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_auth_token(sender, instance=None, created=False, **kwargs):
if created:
Token.objects.create(user=instance)
# 9.如果已经有一些用户了,可以使用以下代码生成现有用户的令牌
from django.contrib.auth.models import User
from rest_framework.authtoken.models import Token
for user in User.objects.all():
Token.objects.get_or_create(user=user)
# 10.提供API接口
# 该obtain_auth_token视图将返回一个JSON响应时有效username和password字段使用形式的数据或JSON发布到视图
from rest_framework.authtoken import views
urlpatterns += [
url(r'^api-token-auth/', views.obtain_auth_token)
]
自定义认证
要实现自定义身份验证方案,需要继承BaseAuthentication
类,并覆盖.authenticate(self, request)
。该方法返回一个二元组,否则返回None
某些情况下你想要.authenticate()
抛出AuthenticationFailed
错误,代替返回的None,
你可以使用的方法:
- 如果没有尝试认证,则返回None,任何其他正在使用的身份验证方案仍将被检查。
- 如果尝试进行身份验证但失败了,请引发
AuthenticationFailed
异常。无论是否进行任何权限检查,都将立即返回错误响应,并且不检查任何其他身份验证方案。
你也可以重写.authenticate_header(self, request)
方法。如果要实现它,应该返回一个字符串,它将作为 HTTP 401 Unauthorized 响应中 header 的WWW-Authenticate的值
如果未覆盖.authenticate_header(self, request)
方法,请求在未通过身份验证时,将返回HTTP 403 Forbidden
认证方案实例
问题1:有些API需要用户登录,有些不需要
解决:
a. 创建两张表
b. 用户登录(返回token并保存到数据库)
模型代码
局部视图使用认证
```python
class UserInfo(models.Model):
user_type_choices = (
(1, '普通用户'),
(2, 'VIP'),
(3, "SVIP'),
)
user_type = models.IntegerField(choices=user_type_choices)
username = models.CharField(max_length=32, unique=True)
password = models.CharField(max_length=64 )
class UserToken(models.Model):
user = models.OneToOneField(to='UserInfo')
token = models.CharField(max_length=64)
视图代码
def md5(user):
import hashlib
import time
ctime = str(time.time())
m = hashlib.md5(bytes(user, encoding='u8'))
m.update(bytes(ctime,encoding='u8'))
return m.hexdigest()
class AuthView(APIView):
"""
用户登录认证视图
"""
def post(self, request, *args, **kwargs):
ret = {'code':1000, 'msg': None}
try:
user = request._request.POST.get('username')
pwd = request._request.POST.get('password')
obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
if not obj:
ret['code'] = 1001
ret['msg'] = '用户名或者密码错误'
# 为登录用户创建token
token = md5(user)
# 保存token到数据库
models.UserToken.objects.update_or_create(user=obj,defaults={'token': token})
ret['token'] = token
except Exception as e:
ret['code'] = 1002
ret['msg'] = '请求异常'
return JsonResponse(ret)
class Authentication:
"""
自定义认证类
"""
def authenticate(self, request):
token = request._requet.GET.get('token')
token_obj = models.UserToken.objects.filter(token=token).first()
if not token_obj:
raise exception.AuthenticationFailed('用户认证失败')
# 在rest framework内部会将整个两个字段赋值给request,供后续操作使用
return (token_obj.user, token_obj)
def authenticate_header(self, request):
pass
class OrderView(APIView):
"""
订单相关业务视图,启用认证类Authtication,
"""
authentication_classes = [Authtication, ]
def get(self, request, *args, **kwargs):
ret = {'code':1000, 'msg':None, 'data':None}
# request.user # 认证类的返回值 token_obj.user
# request.auth # 认证类的返回值 token_obj
try:
ret['data'] = ORDER_DICT # ORDER_DICT 要返回的数据
except Exception as e:
pass
return JsonResponse(ret)
class UserInfoView(APIView):
"""
用户信息视图,启用认证类Authtication,
"""
authentication_classes = [Authtication, ]
def get(self, request, *args, **kwargs):
return HttpResponse('用户信息')
全局视图使用认证
# settings.py
# /api/utils/auth.py中放置认证类 Authtication
REST_FRAMEWORK = {
# 全局认证类
"DEFAULT_AUTHENTICATION_CLASSES": ['api.utils.auth.Authtication',],
"UNAUTHENTICATED_USER":None, # 匿名 request.user =None
"UNAUTHENTICATED_TOKEN":None, # 匿名 request.auth =None
# 全局权限类
"DEFAULT_PERMISSION_CLASSES":['api.utils.permission.SVIPPermission',]
}
# 注意:
# 已经添加了认证类,所有其他类不再需要添加 authentication_classes = [Authtication, ]
# 对于不需要使用认证的视图添加类属性 authentication_classes = [ ],即可取消对该视图的认证
# 自定义认证类,必须继承自: from rest_framework.authentication import BaseAuthentication
权限
局部范围使用
class SVIPPermission:
# 自定义错误消息
message = "必须是SVIP用户才能访问"
# 检查权限
def has_permission(self, request, view):
return True
class OrderView(APIView):
"""
订单相关业务视图
"""
authentication_classes = [Authtication, ]
permission_classes =[MyPermission,]
def get(self, request, *args, **kwargs):
ret = {'code':1000, 'msg':None, 'data':None}
# request.user # 认证类的返回值 token_obj.user
# request.auth # 认证类的返回值 token_obj
# if request.user_type !=3:
# return HttpResponse("无权访问")
try:
ret['data'] = ORDER_DICT # ORDER_DICT 要返回的数据
except Exception as e:
pass
return JsonResponse(ret)
以上配置实现:
- 用户登录之后才能浏览
- 用户登录之后,根据权限检查是什么类型用户
需求:3. 实现某个用户根据访问频率做控制,如6次/分
JWT认证
参考地址:https://www.jianshu.com/p/180a870a308a
前言
在前后端分离的应用中,后端主要作为 Model 层,为前端提供数据访问 API。为了保证数据安全可靠地在用户与服务端之间传输,实现服务端的认证就显得极为必要。
常见的服务端认证方式有以下几种:
- cookie
- token
什么是 JWT
是一套RFC7519标准,它定义了一套简洁、自包含方案,用于安全的在C/S之间传输JSON信息。
有什么优点:
- 体积小
- 传输方式多样:支持http hreader/URL/POST
- 严谨的结构化。
- 支持跨域,多应用于单点登录SSO
为什么选择JWT
除了上面说到的优点外,相对传统的服务端验证,JWT还有以下吸引点:
- 充分依赖无状态API,契合RESTful设计原则(无状态的HTTP)
- 易于实现CDN,将静态资源分布式管理
- 验证解耦,随处生成
- 比cookie更支持原生移动端应用
JWT组成
header
payload 需要处理的实际资料内容
registered Claims
iss 发行者
sub 主题
aud 听众
exp 有效期限,必须大于现在时间,且必须为数字
nbf 生效时间,定义一个时间,在这个时间之前jwt不能进行处理
iat 发行时间,可以被用来判断jwt已经发出了多久
jti jwt唯一识别值
- Public Claims 公开资讯,如使用者姓名等
- Private Claims 发行者与订阅者自行沟通定义的 Claims name与资料
- signature
// Header
{
"alg": "HS256", # 加密算法
"typ": "JWT" # token类型
}
// Payload 声明关于实体(通常是用户)
{
// reserved claims
"iss": "a.com", # 签发者
"exp": "1d", # 过期时间
"iat": # 签发时间
"sub": "1234567890" # 面向的用户
// 公开声明
"http://a.com": true,
// 私人声明
"company": "A",
"awesome": true,
}
// Signature
HS256(Base64(Header) + "." + Base64(Payload), secretKey)
// JWT
JWT = Base64(Header) + "." + Base64(Payload) + "." + $Signature
PyJWT
django的jwt插件: https://pypi.org/project/django-jwt-auth/
示例代码
jwt_payload = jwt.encode({
'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
}, 'secret')
time.sleep(32)
# JWT payload is now expired
# But with some leeway, it will still validate
jwt.decode(jwt_payload, 'secret', leeway=10, algorithms=['HS256'])
# 示例2
import jwt
# 编码
encoded_token = jwt.encode({‘user_id’: “abc”}, ‘SECRET’, algorithm=’HS256')
# ‘eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiYWJjIn0.OW6BZboviYgO6Yy_UTj5jloba7WlPwZnKHPYDUyY3MU’
# 解码
jwt.decode(encoded_token, ‘SECRET’, algorithms=[‘HS256’])
{’user_id’: ’abc’}
到期时间将与当前UTC时间进行比较(as given by timegm(datetime.utcnow().utctimetuple())), 因此请确保在编码中使用UTC时间戳或日期时间。
设置
JWT_ENCODE_HANDLER = 'jwt_auth.utils.jwt_encode_handler'
JWT_DECODE_HANDLER = 'jwt_auth.utils.jwt_decode_handler',
JWT_PAYLOAD_HANDLER = 'jwt_auth.utils.jwt_payload_handler'
JWT_PAYLOAD_GET_USER_ID_HANDLER = 'jwt_auth.utils.jwt_get_user_id_from_payload_handler'
JWT_SECRET_KEY: SECRET_KEY # 安全key
JWT_ALGORITHM = 'HS256'
JWT_VERIFY = True
JWT_VERIFY_EXPIRATION = True
JWT_LEEWAY = 0
JWT_EXPIRATION_DELTA = datetime.timedelta(seconds=300) # 延迟时间
JWT_ALLOW_REFRESH = False
JWT_REFRESH_EXPIRATION_DELTA = datetime.timedelta(days=7)
JWT_AUTH_HEADER_PREFIX = 'Bearer'
Using JWT authentication in Django
- 用户登录时创建一个token
- 下次浏览器发来的任何API请求,必须在auth header中发送token
- 服务器将验证token
创建 jwt token
import jwt,json
from rest_framework import views
from rest_framework.response import Response
from models import User
class Login(views.APIView):
def post(self, request, *args, **kwargs):
if not request.data:
return Response({'Error': "Please provide username/password"}, status="400")
username = request.data['username']
password = request.data['password']
try:
user = User.objects.get(username=username, password=password)
except User.DoesNotExist:
return Response({'Error': "Invalid username/password"}, status="400")
if user:
payload = {
'id': user.id,
'email': user.email,
}
jwt_token = {'token': jwt.encode(payload, "SECRET_KEY")}
return HttpResponse(
json.dumps(jwt_token),
status=200,
content_type="application/json" # 注意 content_type
)
else:
return Response(
json.dumps({'Error': "Invalid credentials"}),
status=400,
content_type="application/json"
)
验证jwt token
from rest_framework import status, exceptions
from django.http import HttpResponse
from rest_framework.authentication import get_authorization_header, BaseAuthentication
from users.models import User
import jwt
import json
class TokenAuthentication(BaseAuthentication):
model = None
def get_model(self):
return User
def authenticate(self, request):
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != b'token':
return None
if len(auth) == 1:
msg = 'Invalid token header. No credentials provided.'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid token header'
raise exceptions.AuthenticationFailed(msg)
try:
token = auth[1]
if token=="null":
msg = 'Null token not allowed'
raise exceptions.AuthenticationFailed(msg)
except UnicodeError:
msg = 'Invalid token header. Token string should not contain invalid characters.'
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(token)
def authenticate_credentials(self, token):
model = self.get_model()
payload = jwt.decode(token, "SECRET_KEY")
email = payload['email']
userid = payload['id']
msg = {'Error': "Token mismatch",'status' :"401"}
try:
user = User.objects.get(
email=email,
id=userid,
is_active=True
)
if not user.token['token'] == token:
raise exceptions.AuthenticationFailed(msg)
except jwt.ExpiredSignature or jwt.DecodeError or jwt.InvalidTokenError:
return HttpResponse({'Error': "Token is invalid"}, status="403")
except User.DoesNotExist:
return HttpResponse({'Error': "Internal server error"}, status="500")
return (user, token)
def authenticate_header(self, request):
return 'Token'
jwt设置
'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=300),
'JWT_REFRESH_EXPIRATION_DELTA': datetime.timedelta(days=7),
JWT_REFRESH_EXPIRATION_DELTA
的确切含义是自从原始token被发布出去后,多长时间范围内它以及它的子孙token可以被用来刷新以获得新的子孙token
。
原始token,指的是通过用户名/密码验证后获得的token(即
obtain_jwt_token
接口返回的token),原始token刷新后获得的token1,以及token1继续刷新获得的token2、token2再获得token3……形成了一串token链,这些token的过期时间都是JWT_EXPIRATION_DELTA
。然而这串token链不可能一直无限制刷新下去,直到原始token发布后的
JWT_REFRESH_EXPIRATION_DELTA
后,刷新操作将被终止,框架会返回HTTP 400。之后必须再次调用obtain_jwt_token
来获得新的原始token。
再回到5mins + 7days的默认设置上来,客户端首先调用obtain_jwt_token
进行登录操作,之后必须每隔小于5分钟就刷新一次token,才能保证不掉线。然而即使一直保持在线,上限也只有7天,7天过后必须重新登录,这才是5mins + 7days的确切含义。
显而易见,这种默认策略如果用在手机客户端上,是很不合适的。客户端如果置后台超过5分钟,再回去就得重新登录;就算一直在前台,连续用7天后还是得重新登录。也难怪乎那么多人会无法理解这一组设置的真正用意了。
我目前在客户端上准备采用30days + 360days的做法,只要一个月内使用一次app,就不用重新登录,最长可以一年不用重新登录。
短信验证
注册短信接口
注册第三方短信接口
发送短信代码
class Yunpian(object):
def_init(self,api_key):
self.api_key=api_key
self.single_send_ur1="https://sms.yunpian.com/v2/sms/single_send"
def send_sms(self,code,mobile):
parmas={
"apikey": self.api_key,
"mobile": mobile,"
"text" :"【京东商城】您的验证码是{code}。如非本人操作,请忽略本短信"
}
response=requests.post(self.single_send_url,data=parmas)
re_dict=json.loads(response.text)
return re_dict
if __name__ == "__main__":
smsder = Yunpian("xxxxxxxxx")
smsder.send_sms()
throttle
节流与权限类似,因为它确定是否应该授权请求。QOS
DRF节流设置
全局设置
REST_FRAMEWORK = {
# 全局节流设置,使用默认节流器
'DEFAULT_THROTTLE_CLASSES': (
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
),
'DEFAULT_THROTTLE_RATES': {
# 速率支持 second, minute, hour, day
# 或者 s, m, h, d
'anon': '100/day', # 100匿名用户/天
'user': '1000/day' # 1000用户/天
}
}
识别客户端
- X-Forwarded-For 优先
- REMOTE_ADDR
X-Forwarded-For 工作原理: 从客户端到服务器中间经过的代理的一个IP列表
语法:
X-Forwarded-For: <client>, <proxy1>, <proxy2>
设置缓存
DRF节流器使用django的缓存后端保存节流器设置。你应该确保你已经设置了适当的缓存设置。
代码
class VisitThrottle(BaseThrottle):
def __init__(self):
self.history = None # 初始化识别客户端列表
def allow_request(self, request, view):
# 逻辑代码,按指定条件限制请求
# return True # 表示可以继续访问
# return False # 访问频率太高,被限制
pass
def wait(self):
# 还需要等待多少秒才能访问
return 60 - atime
class VisiThrottle(SimpleRateThrottle):
scope = "Luffy"
def get_cache_key(self, request, view):
return self.get_ident(request)
DRF内置节流器
节流器 | |
---|---|
AnonRateThrottle | 根据IP,限制未授权的用户 |
UserRateThrottle | 基于用户ID,限制用户 |
ScopedRateThrottle | 只有当正在访问的视图包含.throttle_scope属性时才会应用此节流阀。 |