1 前端直传
在客户端通过JavaScript代码完成签名,无需过多配置,即可实现直传,非常方便。
但是
客户端通过JavaScript把AccesssKey ID和AccessKey Secret写在代码里面有泄露的风险,
强烈建议使用服务端签名后直传或者STS临时授权访问OSS。
2 后端上传
(1) 原理

和数据直传到OSS相比,以上方法存在以下缺点:
- 上传慢:用户数据需先上传到应用服务器,之后再上传到OSS,网络传输时间比直传到OSS多一倍。如果用户数据不通过应用服务器中转,而是直传到OSS,速度将大大提升。而且OSS采用BGP带宽,能保证各地各运营商之间的传输速度。
- 扩展性差:如果后续用户数量逐渐增加,则应用服务器会成为瓶颈。
- 费用高:需要准备多台应用服务器。由于OSS上行流量是免费的,如果数据直传到OSS,将节省多台应用服务器的费用。
(2) 代码
pip install oss2
import oss2OSS_BASE_PATH = "https://cncms-server.oss-cn-beijing.aliyuncs.com/"end_point = 'https://oss-cn-beijing.aliyuncs.com'bucket_name = 'cncms-server'class OssFileStorage():def __init__(self, root_store_path: str):self.auth = oss2.Auth(access_key_id, access_key_secret)self.bucket = oss2.Bucket(self.auth, end_point, bucket_name)self.root_store_path = root_store_path # 根存储路径def file_upload(self, pk: str, category: str, file_name: str, file_content: str) -> dict:'''文件上传'''# 拼接文件路径object_path = self.root_store_path + "/" + pk + "/" + category + "/" + file_name# 上传文件self.bucket.put_object(object_path, file_content)# 转义quote_file_name = urllib.parse.quote(file_name)object_path = self.root_store_path + "/" + pk + "/" + category + "/" + quote_file_namefile_url = OSS_BASE_PATH + object_pathresult = {"object_path": object_path,"file_url": file_url}return resultdef delete_file(self, file_url: str):'''删除文件'''object_path = file_url.replace(OSS_BASE_PATH, "")self.bucket.delete_object(object_path)def is_file_exist(self, filename):ret = self.bucket.object_exists(filename)return retdef generate_download_url(self, url):if not url:return None# 从数据库存储的完整路径中获取object_nameobject_name = url.replace(OSS_BASE_PATH, "")# 反转义, 避免路径中包含中文后不能加AccesskeyIDobject_name = urllib.parse.unquote(object_name)# 生成下载文件的签名URL,有效时间为1天。# 生成签名URL时,OSS默认会对Object完整路径中的正斜线(/)进行转义,从而导致生成的签名URL无法直接使用。# 设置slash_safe为True,OSS不会对Object完整路径中的正斜线(/)进行转义,此时生成的签名URL可以直接使用。download_url = self.bucket.sign_url('GET', object_name, 24*60*60, slash_safe=True)return download_url
@bp_hard.route('/hardwares/export', methods=['GET'])@auth.login_required@auth.module_harddef export_hardware():"""导出硬件"""xls_name = ['硬件']fit_hards = filter_hards(request, xls_name)# 生成excel文件file_name = '_'.join(xls_name) + '.xlsx'file_path = './temp/' + file_namexlsx_export.export_xlsx_hards(fit_hards, file_path)# 同步上传file_content = open(file_path, 'rb').read()ret = fs.file_upload('excel', 'hard', file_name, file_content)file_url = ret['file_url']result = {'fileUrl': fs.generate_download_url(file_url)}return jsonify(errno=RET.OK, errmsg='导出硬件数据成功', result=result)
3 服务端签名后, 前端上传
(1) 原理

文档:
https://help.aliyun.com/document_detail/91848.html
第一个GET请求的响应
{"accessid": "LTAI5tA7ijeV6BCwdeHWy5hQ","host": "http://wstest1.oss-cn-beijing.aliyuncs.com","policy": "eyJleHBpcmF0aW9uIjogIjIwMjItMDEtMDVUMDU6MjM6NTlaIiwgImNvbmRpdGlvbnMiOiBbWyJzdGFydHMtd2l0aCIsICIka2V5IiwgInVzZXItZGlyLXByZWZpeC8iXV19","signature": "dzqZQaklHQcA6K4Rk+6pZ5/RP7M=","expire": 1641360239,"dir": "user-dir-prefix/","callback": "eyJjYWxsYmFja1VybCI6ICJodHRwOi8vNDcuMTA4LjE3MC4xOTU6NDcxMjMiLCAiY2FsbGJhY2tCb2R5IjogImZpbGVuYW1lPSR7b2JqZWN0fSZzaXplPSR7c2l6ZX0mbWltZVR5cGU9JHttaW1lVHlwZX0maGVpZ2h0PSR7aW1hZ2VJbmZvLmhlaWdodH0md2lkdGg9JHtpbWFnZUluZm8ud2lkdGh9IiwgImNhbGxiYWNrQm9keVR5cGUiOiAiYXBwbGljYXRpb24veC13d3ctZm9ybS11cmxlbmNvZGVkIn0="}
(2) 代码
1) appserver.py
# -*- coding: UTF-8 -*-import socketimport base64import sysimport timeimport datetimeimport jsonimport hmacfrom hashlib import sha1 as shaimport httpserver# 请填写您的AccessKeyId。access_key_id = 'LTAI5tA7ijeV6BCwdeHWy5hQ'# 请填写您的AccessKeySecret。access_key_secret = 'bDikXRrncXK1A9JCMrwV1c89WN2ySr'# host的格式为 bucketname.endpoint ,请替换为您的真实信息。host = 'http://wstest1.oss-cn-beijing.aliyuncs.com'# callback_url为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。callback_url = "http://192.168.1.103:47123"# 用户上传文件时指定的前缀。upload_dir = 'user-dir-prefix/'expire_time = 3000000def get_iso_8601(expire):gmt = datetime.datetime.utcfromtimestamp(expire).isoformat()gmt += 'Z'return gmtdef get_token():now = int(time.time())expire_syncpoint = now + expire_timeexpire = get_iso_8601(expire_syncpoint)policy_dict = {}policy_dict['expiration'] = expirecondition_array = []array_item = []array_item.append('starts-with')array_item.append('$key')array_item.append(upload_dir)condition_array.append(array_item)policy_dict['conditions'] = condition_arraypolicy = json.dumps(policy_dict).strip()policy_encode = base64.b64encode(policy.encode())h = hmac.new(access_key_secret.encode(), policy_encode, sha)sign_result = base64.encodestring(h.digest()).strip()callback_dict = {}callback_dict['callbackUrl'] = callback_urlcallback_dict['callbackBody'] = 'filename=${object}&size=${size}&mimeType=${mimeType}' \'&height=${imageInfo.height}&width=${imageInfo.width}'callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'callback_param = json.dumps(callback_dict).strip()base64_callback_body = base64.b64encode(callback_param.encode())token_dict = {}token_dict['accessid'] = access_key_idtoken_dict['host'] = hosttoken_dict['policy'] = policy_encode.decode()token_dict['signature'] = sign_result.decode()token_dict['expire'] = expire_syncpointtoken_dict['dir'] = upload_dirtoken_dict['callback'] = base64_callback_body.decode()result = json.dumps(token_dict)return resultdef get_local_ip():"""获取本机 IPV4 地址:return: 成功返回本机 IP 地址,否则返回空"""try:csocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)csocket.connect(('8.8.8.8', 80))(addr, port) = csocket.getsockname()csocket.close()return addrexcept socket.error:return ""def do_POST(server):"""启用 POST 调用处理逻辑:param server: Web HTTP Server 服务:return:"""print("********************* do_POST ")# get public keypub_key_url = ''try:pub_key_url_base64 = server.headers['x-oss-pub-key-url']pub_key = httpserver.get_pub_key(pub_key_url_base64)except Exception as e:print(str(e))print('Get pub key failed! pub_key_url : ' + pub_key_url)server.send_response(400)server.end_headers()return# get authorizationauthorization_base64 = server.headers['authorization']# get callback bodycontent_length = server.headers['content-length']callback_body = server.rfile.read(int(content_length))# compose authorization stringauth_str = ''pos = server.path.find('?')if -1 == pos:auth_str = server.path + '\n' + callback_body.decode()else:auth_str = httpserver.get_http_request_unquote(server.path[0:pos]) + server.path[pos:] + '\n' + callback_bodyresult = httpserver.verrify(auth_str, authorization_base64, pub_key)if not result:print('Authorization verify failed!')print('Public key : %s' % (pub_key))print('Auth string : %s' % (auth_str))server.send_response(400)server.end_headers()return# response to OSSresp_body = '{"Status":"OK"}'server.send_response(200)server.send_header('Content-Type', 'application/json')server.send_header('Content-Length', str(len(resp_body)))server.end_headers()server.wfile.write(resp_body.encode())def do_GET(server):"""启用 Get 调用处理逻辑:param server: Web HTTP Server 服务:return:"""print("********************* do_GET ")token = get_token()server.send_response(200)server.send_header('Access-Control-Allow-Methods', 'POST')server.send_header('Access-Control-Allow-Origin', '*')server.send_header('Content-Type', 'text/html; charset=UTF-8')server.end_headers()server.wfile.write(token.encode())if '__main__' == __name__:# 在服务器中, 0.0.0.0指的是本机上的所有IPV4地址, 如果一个主机有两个IP地址,# 192.168.1.1 和 10.1.2.1, 并且该主机上的一个服务监听的地址是0.0.0.0, 那么通过两个IP地址都能够访问该服务。# server_ip = get_local_ip() 若用户希望监听本机外网IPV4地址,则采用本行代码并注释掉下一行代码server_ip = "0.0.0.0"server_port = 47123if len(sys.argv) == 2:server_port = int(sys.argv[1])if len(sys.argv) == 3:server_ip = sys.argv[1]server_port = int(sys.argv[2])print("App server is running on http://%s:%s " % (server_ip, server_port))server = httpserver.MyHTTPServer(server_ip, server_port)server.serve_forever()
2) httpserver.py
# -*- coding: utf-8 -*-"""兼容Python 2.X 与 3.X 版本"""import appserverimport base64import urllib.requestfrom http.server import HTTPServer, BaseHTTPRequestHandlerfrom Crypto.Signature import PKCS1_v1_5from Crypto.Hash import MD5from Crypto.PublicKey import RSAdef verrify(auth_str, authorization_base64, pub_key):"""校验签名是否正确(MD5 + RAS):param auth_str: 文本信息:param authorization_base64: 签名信息:param pub_key: 公钥:return: 若签名验证正确返回 True 否则返回 False"""pub_key_load = RSA.importKey(pub_key)auth_md5 = MD5.new(auth_str.encode())result = Falsetry:result = PKCS1_v1_5.new(pub_key_load).verify(auth_md5, base64.b64decode(authorization_base64.encode()))except Exception as e:print(e)result = Falsereturn resultdef get_http_request_unquote(url):return urllib.request.unquote(url)def get_pub_key(pub_key_url_base64):""" 抽取出 public key 公钥 """pub_key_url = base64.b64decode(pub_key_url_base64.encode())url_reader = urllib.request.urlopen(pub_key_url.decode())pub_key = url_reader.read()return pub_keyclass MyHTTPRequestHandler(BaseHTTPRequestHandler):def do_POST(self):"""处理 POST 请求"""return appserver.do_POST(self)def do_GET(self):"""处理 GET 请求"""appserver.do_GET(self)class MyHTTPServer(HTTPServer):def __init__(self, host, port):print("run app server by python3!")HTTPServer.__init__(self, (host, port), MyHTTPRequestHandler)
