1 前端直传
在客户端通过JavaScript代码完成签名,无需过多配置,即可实现直传,非常方便。
但是
客户端通过JavaScript把AccesssKey ID和AccessKey Secret写在代码里面有泄露的风险,
强烈建议使用服务端签名后直传或者STS临时授权访问OSS。
2 后端上传
(1) 原理
和数据直传到OSS相比,以上方法存在以下缺点:
- 上传慢:用户数据需先上传到应用服务器,之后再上传到OSS,网络传输时间比直传到OSS多一倍。如果用户数据不通过应用服务器中转,而是直传到OSS,速度将大大提升。而且OSS采用BGP带宽,能保证各地各运营商之间的传输速度。
- 扩展性差:如果后续用户数量逐渐增加,则应用服务器会成为瓶颈。
- 费用高:需要准备多台应用服务器。由于OSS上行流量是免费的,如果数据直传到OSS,将节省多台应用服务器的费用。
(2) 代码
pip install oss2
import oss2
OSS_BASE_PATH = "https://cncms-server.oss-cn-beijing.aliyuncs.com/"
end_point = 'https://oss-cn-beijing.aliyuncs.com'
bucket_name = 'cncms-server'
class OssFileStorage():
def __init__(self, root_store_path: str):
self.auth = oss2.Auth(access_key_id, access_key_secret)
self.bucket = oss2.Bucket(self.auth, end_point, bucket_name)
self.root_store_path = root_store_path # 根存储路径
def file_upload(self, pk: str, category: str, file_name: str, file_content: str) -> dict:
'''文件上传'''
# 拼接文件路径
object_path = self.root_store_path + "/" + pk + "/" + category + "/" + file_name
# 上传文件
self.bucket.put_object(object_path, file_content)
# 转义
quote_file_name = urllib.parse.quote(file_name)
object_path = self.root_store_path + "/" + pk + "/" + category + "/" + quote_file_name
file_url = OSS_BASE_PATH + object_path
result = {
"object_path": object_path,
"file_url": file_url
}
return result
def delete_file(self, file_url: str):
'''删除文件'''
object_path = file_url.replace(OSS_BASE_PATH, "")
self.bucket.delete_object(object_path)
def is_file_exist(self, filename):
ret = self.bucket.object_exists(filename)
return ret
def generate_download_url(self, url):
if not url:
return None
# 从数据库存储的完整路径中获取object_name
object_name = url.replace(OSS_BASE_PATH, "")
# 反转义, 避免路径中包含中文后不能加AccesskeyID
object_name = urllib.parse.unquote(object_name)
# 生成下载文件的签名URL,有效时间为1天。
# 生成签名URL时,OSS默认会对Object完整路径中的正斜线(/)进行转义,从而导致生成的签名URL无法直接使用。
# 设置slash_safe为True,OSS不会对Object完整路径中的正斜线(/)进行转义,此时生成的签名URL可以直接使用。
download_url = self.bucket.sign_url('GET', object_name, 24*60*60, slash_safe=True)
return download_url
@bp_hard.route('/hardwares/export', methods=['GET'])
@auth.login_required
@auth.module_hard
def export_hardware():
"""导出硬件"""
xls_name = ['硬件']
fit_hards = filter_hards(request, xls_name)
# 生成excel文件
file_name = '_'.join(xls_name) + '.xlsx'
file_path = './temp/' + file_name
xlsx_export.export_xlsx_hards(fit_hards, file_path)
# 同步上传
file_content = open(file_path, 'rb').read()
ret = fs.file_upload('excel', 'hard', file_name, file_content)
file_url = ret['file_url']
result = {'fileUrl': fs.generate_download_url(file_url)}
return jsonify(errno=RET.OK, errmsg='导出硬件数据成功', result=result)
3 服务端签名后, 前端上传
(1) 原理
文档:
https://help.aliyun.com/document_detail/91848.html
第一个GET请求的响应
{
"accessid": "LTAI5tA7ijeV6BCwdeHWy5hQ",
"host": "http://wstest1.oss-cn-beijing.aliyuncs.com",
"policy": "eyJleHBpcmF0aW9uIjogIjIwMjItMDEtMDVUMDU6MjM6NTlaIiwgImNvbmRpdGlvbnMiOiBbWyJzdGFydHMtd2l0aCIsICIka2V5IiwgInVzZXItZGlyLXByZWZpeC8iXV19",
"signature": "dzqZQaklHQcA6K4Rk+6pZ5/RP7M=",
"expire": 1641360239,
"dir": "user-dir-prefix/",
"callback": "eyJjYWxsYmFja1VybCI6ICJodHRwOi8vNDcuMTA4LjE3MC4xOTU6NDcxMjMiLCAiY2FsbGJhY2tCb2R5IjogImZpbGVuYW1lPSR7b2JqZWN0fSZzaXplPSR7c2l6ZX0mbWltZVR5cGU9JHttaW1lVHlwZX0maGVpZ2h0PSR7aW1hZ2VJbmZvLmhlaWdodH0md2lkdGg9JHtpbWFnZUluZm8ud2lkdGh9IiwgImNhbGxiYWNrQm9keVR5cGUiOiAiYXBwbGljYXRpb24veC13d3ctZm9ybS11cmxlbmNvZGVkIn0="
}
(2) 代码
1) appserver.py
# -*- coding: UTF-8 -*-
import socket
import base64
import sys
import time
import datetime
import json
import hmac
from hashlib import sha1 as sha
import httpserver
# 请填写您的AccessKeyId。
access_key_id = 'LTAI5tA7ijeV6BCwdeHWy5hQ'
# 请填写您的AccessKeySecret。
access_key_secret = 'bDikXRrncXK1A9JCMrwV1c89WN2ySr'
# host的格式为 bucketname.endpoint ,请替换为您的真实信息。
host = 'http://wstest1.oss-cn-beijing.aliyuncs.com'
# callback_url为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。
callback_url = "http://192.168.1.103:47123"
# 用户上传文件时指定的前缀。
upload_dir = 'user-dir-prefix/'
expire_time = 3000000
def get_iso_8601(expire):
gmt = datetime.datetime.utcfromtimestamp(expire).isoformat()
gmt += 'Z'
return gmt
def get_token():
now = int(time.time())
expire_syncpoint = now + expire_time
expire = get_iso_8601(expire_syncpoint)
policy_dict = {}
policy_dict['expiration'] = expire
condition_array = []
array_item = []
array_item.append('starts-with')
array_item.append('$key')
array_item.append(upload_dir)
condition_array.append(array_item)
policy_dict['conditions'] = condition_array
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())
h = hmac.new(access_key_secret.encode(), policy_encode, sha)
sign_result = base64.encodestring(h.digest()).strip()
callback_dict = {}
callback_dict['callbackUrl'] = callback_url
callback_dict['callbackBody'] = 'filename=${object}&size=${size}&mimeType=${mimeType}' \
'&height=${imageInfo.height}&width=${imageInfo.width}'
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = base64.b64encode(callback_param.encode())
token_dict = {}
token_dict['accessid'] = access_key_id
token_dict['host'] = host
token_dict['policy'] = policy_encode.decode()
token_dict['signature'] = sign_result.decode()
token_dict['expire'] = expire_syncpoint
token_dict['dir'] = upload_dir
token_dict['callback'] = base64_callback_body.decode()
result = json.dumps(token_dict)
return result
def get_local_ip():
"""
获取本机 IPV4 地址
:return: 成功返回本机 IP 地址,否则返回空
"""
try:
csocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
csocket.connect(('8.8.8.8', 80))
(addr, port) = csocket.getsockname()
csocket.close()
return addr
except socket.error:
return ""
def do_POST(server):
"""
启用 POST 调用处理逻辑
:param server: Web HTTP Server 服务
:return:
"""
print("********************* do_POST ")
# get public key
pub_key_url = ''
try:
pub_key_url_base64 = server.headers['x-oss-pub-key-url']
pub_key = httpserver.get_pub_key(pub_key_url_base64)
except Exception as e:
print(str(e))
print('Get pub key failed! pub_key_url : ' + pub_key_url)
server.send_response(400)
server.end_headers()
return
# get authorization
authorization_base64 = server.headers['authorization']
# get callback body
content_length = server.headers['content-length']
callback_body = server.rfile.read(int(content_length))
# compose authorization string
auth_str = ''
pos = server.path.find('?')
if -1 == pos:
auth_str = server.path + '\n' + callback_body.decode()
else:
auth_str = httpserver.get_http_request_unquote(server.path[0:pos]) + server.path[pos:] + '\n' + callback_body
result = httpserver.verrify(auth_str, authorization_base64, pub_key)
if not result:
print('Authorization verify failed!')
print('Public key : %s' % (pub_key))
print('Auth string : %s' % (auth_str))
server.send_response(400)
server.end_headers()
return
# response to OSS
resp_body = '{"Status":"OK"}'
server.send_response(200)
server.send_header('Content-Type', 'application/json')
server.send_header('Content-Length', str(len(resp_body)))
server.end_headers()
server.wfile.write(resp_body.encode())
def do_GET(server):
"""
启用 Get 调用处理逻辑
:param server: Web HTTP Server 服务
:return:
"""
print("********************* do_GET ")
token = get_token()
server.send_response(200)
server.send_header('Access-Control-Allow-Methods', 'POST')
server.send_header('Access-Control-Allow-Origin', '*')
server.send_header('Content-Type', 'text/html; charset=UTF-8')
server.end_headers()
server.wfile.write(token.encode())
if '__main__' == __name__:
# 在服务器中, 0.0.0.0指的是本机上的所有IPV4地址, 如果一个主机有两个IP地址,
# 192.168.1.1 和 10.1.2.1, 并且该主机上的一个服务监听的地址是0.0.0.0, 那么通过两个IP地址都能够访问该服务。
# server_ip = get_local_ip() 若用户希望监听本机外网IPV4地址,则采用本行代码并注释掉下一行代码
server_ip = "0.0.0.0"
server_port = 47123
if len(sys.argv) == 2:
server_port = int(sys.argv[1])
if len(sys.argv) == 3:
server_ip = sys.argv[1]
server_port = int(sys.argv[2])
print("App server is running on http://%s:%s " % (server_ip, server_port))
server = httpserver.MyHTTPServer(server_ip, server_port)
server.serve_forever()
2) httpserver.py
# -*- coding: utf-8 -*-
"""
兼容Python 2.X 与 3.X 版本
"""
import appserver
import base64
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import MD5
from Crypto.PublicKey import RSA
def verrify(auth_str, authorization_base64, pub_key):
"""
校验签名是否正确(MD5 + RAS)
:param auth_str: 文本信息
:param authorization_base64: 签名信息
:param pub_key: 公钥
:return: 若签名验证正确返回 True 否则返回 False
"""
pub_key_load = RSA.importKey(pub_key)
auth_md5 = MD5.new(auth_str.encode())
result = False
try:
result = PKCS1_v1_5.new(pub_key_load).verify(auth_md5, base64.b64decode(authorization_base64.encode()))
except Exception as e:
print(e)
result = False
return result
def get_http_request_unquote(url):
return urllib.request.unquote(url)
def get_pub_key(pub_key_url_base64):
""" 抽取出 public key 公钥 """
pub_key_url = base64.b64decode(pub_key_url_base64.encode())
url_reader = urllib.request.urlopen(pub_key_url.decode())
pub_key = url_reader.read()
return pub_key
class MyHTTPRequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
"""处理 POST 请求"""
return appserver.do_POST(self)
def do_GET(self):
"""处理 GET 请求"""
appserver.do_GET(self)
class MyHTTPServer(HTTPServer):
def __init__(self, host, port):
print("run app server by python3!")
HTTPServer.__init__(self, (host, port), MyHTTPRequestHandler)