pip install leqi-algorithm-mic-sdk
1. 准备
import timefrom algorithm_mic_sdk import errorfrom algorithm_mic_sdk.algorithms.topic import Topicfrom algorithm_mic_sdk.auth import AuthInfofrom algorithm_mic_sdk.base import Basefrom algorithm_mic_sdk.tools import FileInfofilename = '1.jpg' # 图片文件路径corners = [[0, 0], [2160, 0], [0, 3840], [2160, 3840]]host = 'http://gateway.algo.leqi.us' # 算法host地址user_name = 'your_name'password = 'your_password'# 初始化授权信息类auth_info = AuthInfo(host, user_name, password)
2. 请求接口
该算法中
oss_file必填参数为一图片文件,因此需要先将图片上传至阿里云OSS上.
构建
FileInfo对象file_info = FileInfo.for_file_bytes(open(filename, 'rb').read())
实例化
Topic类,调用其asynchronous_request方法来异步请求算法.对于耗时短的算法可以直接调用同步请求方法
synchronous_request# 创建算法对象,各个算法的各个参数具体含义可见文档topic = Topic(auth_info=auth_info, file=file_info, corners=corners)resp = topic.asynchronous_request()task_id = resp.task_id
轮询调用
get_results方法获取任务结果.get_results方法对于所有algorithm_mic_sdk.base.Base类及其子类都可以使用,与具体的算法无关.
while True:# 根据任务ID获取结果,此处可以也可以用:# resp = Base(auth_info).get_results(task_id)resp = topic.get_results(task_id)if resp.gateway_code==1001:print('任务处理异常 ', resp.gateway_error)exit()elif resp.gateway_code==1002:# 任务还在处理中time.sleep(1)continueelif resp.gateway_code!=1000:print('未知状态码', resp.gateway_error)exit()# 任务处理成功result_im_oss_name = resp.result['result_im_oss_name']box = resp.result['box']
- 拿到任务结果后,调用
get_file方法获取文件二进制数据get_file方法对于所有algorithm_mic_sdk.base.Base类及其子类都可以使用,与具体的算法无关.
# 可选,下载文件到本地存储with open("result.jpg", 'wb') as f:f.write(topic.get_file(result_im_oss_name).content)
3. 完整代码
import timefrom algorithm_mic_sdk.algorithms.topic import Topicfrom algorithm_mic_sdk.auth import AuthInfofrom algorithm_mic_sdk.tools import FileInfofilename = '1.jpg' # 图片文件路径corners = [[0, 0], [2160, 0], [0, 3840], [2160, 3840]]host = 'http://gateway.algo.leqi.us' # 算法host地址user_name = 'your_name'password = 'your_password'# 初始化授权信息类auth_info = AuthInfo(host, user_name, password)file_info = FileInfo.for_file_bytes(open(filename, 'rb').read())# 创建算法对象,各个算法的各个参数具体含义可见文档topic = Topic(auth_info=auth_info, file=file_info, corners=corners)resp = topic.asynchronous_request()task_id = resp.task_idwhile True:# 根据任务ID获取结果,此处可以也可以用:# resp = Base(auth_info).get_results(task_id)resp = topic.get_results(task_id)if resp.gateway_code == 1001:print('任务处理异常 ', resp.gateway_error)exit()elif resp.gateway_code == 1002:print('任务还在处理中')time.sleep(1)continueelif resp.gateway_code != 1000:print('未知状态码', resp.gateway_error)exit()print('任务处理成功')result_im_oss_name = resp.result['result_im_oss_name']box = resp.result['box']# 可选,下载文件到本地存储with open("result.jpg", 'wb') as f:f.write(topic.get_file(result_im_oss_name))break
