EasyTexMiner provides elastic distributed prediction: models trained with EasyTransfer, EasyTexMiner, or Google's open-source BERT can be loaded for offline distributed prediction on PAI. Compared with standard TensorFlow distributed inference, it has the following features:
- Optimized offline distributed inference speed on PAI Studio: roughly a 1.5x–3x speedup over the previous version, depending on cluster resources
- About a 1.1x speedup for local inference on PAI DSW; the GPU can now be almost fully utilized
- Supports elastic resource scheduling such as oversubscription, plus failover handling: if a worker crashes it is restarted without affecting the overall job
- Effectively mitigates the straggler (slow-machine) problem during inference
- Supports optimizing the model with Blade; after optimization, single-machine inference performance improves by 20%–200%
Performance Comparison
Test data: the TNews dataset duplicated 2000 times:
| Mean length | Max length | Count |
|---|---|---|
| 22.11 | 145 | 100,000,000 (100 million) |
Test machine: Tesla T4-16GB
Test cluster: Cluster AY88G
Test parameters: Sequence Length=128, Batch Size=256
Test model: BERT-base (TF saved model)
Blade optimization enabled: No
| Worker Count | Standard TF distributed inference: total time (min) | Standard TF: time/sample (ms) | EasyTexMiner: total time (min) | EasyTexMiner: time/sample (ms) | Speedup |
|---|---|---|---|---|---|
| 50 | 302 | 0.18 | 137 | 0.08 | 2.20x |
How to Use
TF models trained with EasyTransfer
The model directory must contain the following files:
The user-provided label_mapping must list the labels in the order given by -DenumerateValues. For example, the label_mapping.json for news classification looks like the following; note that the ids 0, 1, ... must be ints:
{"教育": 0,"三农": 1,...,"动漫": 27}
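Before uploading, the mapping can be sanity-checked with a JSON round trip — a minimal sketch, with the labels abridged from the example above:

```python
import json

# Abridged example mapping; ids must be ints, ordered per -DenumerateValues
label_mapping = {"教育": 0, "三农": 1, "动漫": 27}

# Round-trip through JSON; ensure_ascii=False keeps the Chinese labels readable
serialized = json.dumps(label_mapping, ensure_ascii=False)
loaded = json.loads(serialized)

# json preserves the int ids, so downstream id lookups work unchanged
assert all(isinstance(v, int) for v in loaded.values())
```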
Define the relevant environment variables:
export test_table=odps://${project_name}/tables/your_test_table_name
export saved_model_dir=oss://path/to/your_model/
export oss_bucket_name=your_bucket_name
export role_arn=acs:ram::xxx
export host=your_host
Then invoke as follows:
pai -name easytexminer
-project algo_platform_dev
-Dmode=predict
-DinputTable=odps://${proj_name}/tables/your_test_table
-DoutputTable=odps://${proj_name}/tables/your_test_table_out
-DfirstSequence=content
-DoutputSchema=predictions
-DappendCols=example_id
-DmodelName=text_classify_bert
-DcheckpointPath=${saved_model_dir}
-DbatchSize=32
-DworkerCount=10
-Dbuckets="oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}"
PyTorch models trained with EasyTexMiner
- Export a TensorFlow SavedModel (optional)
The PAI team has heavily optimized TF, so exporting the PyTorch model as a TF SavedModel speeds up inference; users can decide whether to export based on their own needs.
pai -name easy_transfer_app
-Dmode=export
-DexportType=app_model
-DcheckpointPath=${saved_model_dir}/model.ckpt
-DexportDirBase=${saved_model_dir}
-Dbuckets="oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}"
- Elastic distributed prediction
pai -name easytexminer_2104
-project algo_platform_dev
-Dmode=predict
-DinputTable=odps://${proj_name}/tables/your_test_table
-DoutputTable=odps://${proj_name}/tables/your_test_table_out
-DfirstSequence=content
-DoutputSchema=predictions
-DappendCols=example_id
-DmodelName=text_classify_bert
-DcheckpointPath=${saved_model_dir}
-DbatchSize=32
-DworkerCount=10
-Dbuckets="oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}"
Custom TensorFlow saved model
Suppose we have a TF saved model like this:
Users often have their own trained model stored in an OSS directory; in this example, the model is located at
We can inspect the model's inputs and outputs with TensorFlow's official tooling:
This is a standard BERT classification model. Its inputs are three [-1, seq_len] tensors: input_ids, input_mask, segment_ids.
Its outputs are the 16-class predictions, logits, and probabilities.
Create a main.py and write the following code:
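Given that signature, the expected feed shapes can be sketched with NumPy — the batch size and values below are arbitrary placeholders, not from the model itself:

```python
import numpy as np

seq_len = 128
batch = 2  # arbitrary; the model accepts any batch size ([-1, seq_len])

# The three inputs of a standard BERT classifier, per the signature above
feed = {
    "input_ids":   np.zeros((batch, seq_len), dtype=np.int32),  # token ids
    "input_mask":  np.ones((batch, seq_len), dtype=np.int32),   # 1 = real token
    "segment_ids": np.zeros((batch, seq_len), dtype=np.int32),  # sentence A/B
}

# Every input tensor must share the same [batch, seq_len] shape
for name, tensor in feed.items():
    assert tensor.shape == (batch, seq_len)
```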
import json
import os
import uuid

from easytexminer.core.predictor import Predictor, TFModelPredictor, PredictorManager
from easytexminer.applications.tokenization import Tokenizer
from easytexminer.data.bert_preprocessors import InputExample, bert_cls_convert_example_to_feature
from easytexminer.utils import io, distributed_call_main, config, init_running_envs


class UserDefinedClassifyPredictor(Predictor):
    def __init__(self, model_dir, *args, **kwargs):
        super(UserDefinedClassifyPredictor, self).__init__(*args, **kwargs)
        self.bert_tokenizer = Tokenizer(backend="bert",
                                        vocab_file=os.path.join(model_dir, "vocab.txt"))
        self.model_predictor = TFModelPredictor(
            saved_model_path=model_dir,
            input_keys=["input_ids", "input_mask", "segment_ids"],
            output_keys=["predictions", "probabilities", "logits"])
        self.label_path = os.path.join(model_dir, "label_mapping.json")
        with io.open(self.label_path) as f:
            self.label_mapping = json.load(f)
        self.label_id_to_name = {idx: name for name, idx in self.label_mapping.items()}
        self.first_sequence = kwargs.pop("first_sequence", "first_sequence")
        self.second_sequence = kwargs.pop("second_sequence", "second_sequence")
        self.sequence_length = kwargs.pop("sequence_length", 128)

    def preprocess(self, in_data):
        """
        Args:
            in_data (`list`): a list of dicts containing raw text
        Returns:
            rst (`dict`): a dict of batched features
        """
        if not isinstance(in_data, list):
            in_data = [in_data]
        rst = {"input_ids": [], "input_mask": [], "segment_ids": []}
        for record in in_data:
            text_a = record[self.first_sequence]
            text_b = record.get(self.second_sequence, None)
            example = InputExample(text_a=text_a, text_b=text_b, label=None)
            feature = bert_cls_convert_example_to_feature(
                example, self.bert_tokenizer, self.sequence_length)
            rst["input_ids"].append(feature.input_ids)
            rst["input_mask"].append(feature.input_mask)
            rst["segment_ids"].append(feature.segment_ids)
        return rst

    def predict(self, in_data):
        return self.model_predictor.predict(in_data)

    def postprocess(self, result):
        """
        Args:
            result (`dict`): a dict of returned tensors (np.array)
        Returns:
            new_results (`list`): a list of user-defined return dicts (one per row)
        """
        preds = result["predictions"]
        probs = result["probabilities"]
        logits = result["logits"]
        new_results = list()
        for b, prob in enumerate(probs):
            new_results.append({
                "predictions": preds[b],
                "probabilities": ",".join([str(t) for t in probs[b]]),
                "logits": ",".join([str(t) for t in logits[b]])
            })
        return new_results


def main_fn(gpu, cfg, *args, **kwargs):
    # Prepare seed / logging / gpu environment
    init_running_envs(gpu, cfg)
    predictor = UserDefinedClassifyPredictor(
        model_dir=cfg.checkpoint_dir,
        first_sequence=cfg.first_sequence,
        second_sequence=cfg.second_sequence,
        sequence_length=cfg.sequence_length)
    predictor_manager = PredictorManager(
        predictor=predictor,
        input_file=cfg.tables,
        output_file=cfg.outputs,
        input_schema=cfg.input_schema,
        output_schema=cfg.output_schema,
        append_cols=cfg.append_cols,
        batch_size=cfg.batch_size)
    predictor_manager.run()
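The postprocess step only flattens per-row arrays into comma-joined strings, so it can be exercised on dummy tensors without any model. A standalone sketch of that logic on fabricated inputs:

```python
import numpy as np

def postprocess(result):
    # Same row-wise flattening as UserDefinedClassifyPredictor.postprocess
    preds = result["predictions"]
    probs = result["probabilities"]
    logits = result["logits"]
    new_results = []
    for b, _ in enumerate(probs):
        new_results.append({
            "predictions": preds[b],
            "probabilities": ",".join(str(t) for t in probs[b]),
            "logits": ",".join(str(t) for t in logits[b]),
        })
    return new_results

# Fabricated batch of 2 rows over a 2-class head
dummy = {
    "predictions": np.array([3, 7]),
    "probabilities": np.array([[0.1, 0.9], [0.4, 0.6]]),
    "logits": np.array([[-1.0, 1.0], [0.2, 0.3]]),
}
rows = postprocess(dummy)
assert len(rows) == 2
assert rows[0]["probabilities"] == "0.1,0.9"
```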
Invoke the PAI command:
export config_file=./xxx.ini
export proj=xxx
export role_arn=xxx
export oss_bucket_name="xxx"
export host=cn-xxx.oss.aliyuncs.com
export model_dir=oss://path/to/your_model
if [ ! -f ./dev.tsv ]; then
wget http://atp-modelzoo-sh.oss-cn-shanghai.aliyuncs.com/easytexminer/tutorials/distillation/classification/dev.tsv
fi
odpscmd -e --config=${config_file} "DROP TABLE IF EXISTS easytexminer_mrpc_dev;"
odpscmd -e --config=${config_file} "CREATE TABLE easytexminer_mrpc_dev(
quality STRING, id1 STRING, id2 STRING, sent1 STRING, sent2 STRING);"
odpscmd -e --config=${config_file} "tunnel upload dev.tsv easytexminer_mrpc_dev -fd \t;"
odpscmd -e --config=${config_file} "DROP TABLE IF EXISTS easytexminer_mrpc_dev_pred"
tar -zcvf user_defined_prediction.tar.gz main.py
ossutil cp -f user_defined_prediction.tar.gz ${model_dir}/
rm -f user_defined_prediction.tar.gz
command="
pai -name easytexminer
-project algo_platform_dev
-Dscript=${model_dir}/user_defined_prediction.tar.gz
-DentryFile=main.py
-Dmode=predict
-DinputTable=odps://${proj}/tables/easytexminer_mrpc_dev
-DoutputTable=odps://${proj}/tables/easytexminer_mrpc_dev_pred
-DoutputSchema=predictions,probabilities,logits
-DappendCols=quality
-DfirstSequence=sent1
-DsecondSequence=sent2
-DinputSchema=quality:str:1,id1:str:1,id2:str:1,sent1:str:1,sent2:str:1
-DcheckpointPath=${model_dir}/
-DworkerCount=1
-Dbuckets='oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}'
"
echo "${command}"
odpscmd -e --config=${config_file} "${command}"
echo "finish..."
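The -DinputSchema value is a comma-separated list of column:type:length triplets. A hypothetical client-side parser (not part of EasyTexMiner) can validate the string before submitting the job:

```python
def parse_input_schema(schema):
    """Parse 'name:type:length,...' into (name, type, length) tuples.

    Mirrors the -DinputSchema convention shown above; purely a local
    sanity check, not an EasyTexMiner API.
    """
    columns = []
    for item in schema.split(","):
        name, dtype, length = item.split(":")  # raises on malformed triplets
        columns.append((name, dtype, int(length)))
    return columns

schema = "quality:str:1,id1:str:1,id2:str:1,sent1:str:1,sent2:str:1"
cols = parse_input_schema(schema)
assert [c[0] for c in cols] == ["quality", "id1", "id2", "sent1", "sent2"]
```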
Custom PyTorch models / other preprocessing
Building on the previous section, you can freely combine any Python logic via preprocess/predict/postprocess: just swap the TF prediction logic for PyTorch prediction logic, or even call jieba segmentation directly. Below is an example predictor for jieba keyword extraction.
import jieba.analyse

from easytexminer.core.predictor import Predictor


class UserDefinedKeywordsPredictor(Predictor):
    def __init__(self, *args, **kwargs):
        super(UserDefinedKeywordsPredictor, self).__init__()

    def preprocess(self, in_data):
        if isinstance(in_data, dict):
            in_data = [in_data]
        rst = {"sentence": []}
        for record in in_data:
            # Strip punctuation before keyword extraction
            sent = record["sentence"].replace(',', '').replace('。', '').replace('?', '').replace('!', '') \
                .replace('“', '').replace('”', '').replace(':', '').replace('…', '').replace('(', '').replace(')', '') \
                .replace('—', '').replace('《', '').replace('》', '').replace('、', '').replace('‘', '') \
                .replace('’', '').replace(',', '').replace('【', '').replace('】', '').replace('"', '') \
                .replace('#', '').replace('...', '').replace('?', '').replace('『', '')
            rst["sentence"].append(sent)
        return rst

    def predict(self, in_data):
        keywords_list = list()
        for sent in in_data["sentence"]:
            keywords_list.append(list(jieba.analyse.extract_tags(sent, topK=5)))
        return {"keywords": keywords_list}

    def postprocess(self, result):
        return [{"keywords": " ".join(kw)} for kw in result["keywords"]]
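The long chain of .replace calls in preprocess can equivalently be written as a single regex substitution; a sketch, with the character set copied from those calls (note that, like the original, a lone ASCII '.' is kept and only '...' runs are removed):

```python
import re

# Same punctuation set as the chained .replace calls above
_PUNCT = re.compile(r'[,。?!“”:…()—《》、‘’,【】"#?『]|\.\.\.')

def clean(sentence):
    # Drop every matched punctuation character (or '...' sequence)
    return _PUNCT.sub("", sentence)

assert clean("你好,世界!这是…一个“测试”。") == "你好世界这是一个测试"
```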
