本文以错题本算法为例
1. 准备
import timefrom requests import requestfrom urllib.parse import urljoinfilename = '1.jpg' # 图片文件路径corners = [[0, 0], [2160, 0], [0, 3840], [2160, 3840]]host = 'http://gateway.algo.leqi.us' # 算法host地址
2. 请求接口
该算法中oss_file必填参数为一图片文件,因此需要先将图片上传至阿里云OSS上.
上传方式为:
- 调获取文件上传信息接口获得文件的上传地址以及OSS文件名 ```python url = urljoin(host, ‘/api/oss/url’) request_body = { “user_name”:”your_name”, “password”:”your password”, “filename”: “1.jpg” } oss_info = request(“POST”, url=url, json=request_body).json() put_url = oss_info[‘put_url’] oss_name = oss_info[‘oss_name’]
上传文件
request.put(put_url, data=open(filename, ‘rb’).read())
2. 调用[发布算法接口](https://www.yuque.com/fenfendeyouzhiqingnian/algorithm/rc39bm)发布算法
```python
url = urljoin(host, 'api/algorithm')
request_body = {
"user_name":"your_name",
"password":"your password",
"target":"topic",
"request":{
"oss_file": oss_name,
"corners":corners
}
}
resp = request("POST", url=url, json=request_body).json()
task_id = resp['task_id']
轮询调用获取任务结果接口获得任务处理结果
url = urljoin(host, 'api/algorithm/task/{}'.format(task_id)) while True: resp = request('GET', url=url).json() if resp['gateway_code']==1001: print('任务处理异常 ', resp['error']) exit() elif resp['gateway_code']==1002: # 任务还在处理中 time.sleep(1) continue else resp['gateway_code']!=1000: print('未知状态码' resp['error']) exit() # 任务处理成功 result_im_oss_name = resp['result']['result_im_oss_name'] box = resp['result']['box']拿到任务结果后,可以调用获取文件url接口来获取文件的下载地址 ```python url = urljoin(host, ‘api/oss/url/‘) params = {“filename”:result_im_oss_name} resp = request(“GET”, url, params=params).json()
文件的下载/预览地址
preview_url = resp[‘preview_url’]
可选,下载文件到本地存储
with open(“result.jpg”, ‘wb’) as f: f.write(request(“GET”, url).content)
<a name="Ko7xk"></a>
#### 3. 完整代码
```python
import time
from requests import request
from urllib.parse import urljoin
filename = '1.jpg' # 图片文件路径
corners = [[0, 0], [2160, 0], [0, 3840], [2160, 3840]]
host = 'http://gateway.algo.leqi.us' # 算法host地址
url = urljoin(host, '/api/oss/url')
request_body = {
"user_name":"your_name",
"password":"your password",
"filename": "1.jpg"
}
oss_info = request("POST", url=url, json=request_body).json()
put_url = oss_info['put_url']
oss_name = oss_info['oss_name']
# 上传文件
request.put(put_url, data=open(filename, 'rb').read())
url = urljoin(host, 'api/algorithm')
request_body = {
"user_name":"your_name",
"password":"your password",
"target":"topic",
"request":{
"oss_file": oss_name,
"corners":corners
}
}
resp = request("POST", url=url, json=request_body).json()
task_id = resp['task_id']
url = urljoin(host, 'api/algorithm/task/{}'.format(task_id))
while True:
resp = request('GET', url=url).json()
if resp['gateway_code']==1001:
print('任务处理异常 ', resp['error'])
exit()
elif resp['gateway_code']==1002:
# 任务还在处理中
time.sleep(1)
continue
else resp['gateway_code']!=1000:
print('未知状态码' resp['error'])
exit()
# 任务处理成功
result_im_oss_name = resp['result']['result_im_oss_name']
box = resp['result']['box']
url = urljoin(host, 'api/oss/url/')
params = {"filename":result_im_oss_name}
resp = request("GET", url, params=params).json()
# 文件的下载/预览地址
preview_url = resp['preview_url']
# 可选,下载文件到本地存储
with open("result.jpg", 'wb') as f:
f.write(request("GET", url).content)
