EasyTexMiner provides elastic distributed prediction: models trained with EasyTransfer, EasyTexMiner, or Google's open-source BERT can be loaded for offline distributed prediction on PAI. Compared with standard TensorFlow distributed inference, it has the following features:

  1. Faster offline distributed inference on PAI Studio: roughly a 1.5x-3x speedup over the previous version, depending on cluster resources
  2. About 1.1x faster local inference on PAI DSW, with the GPU nearly always fully utilized
  3. Elastic resource scheduling (e.g. overselling) and FailOver handling: a crashed worker is restarted without affecting the overall job
  4. Effectively mitigates the straggler (fast/slow worker) problem during inference
  5. Blade model optimization support: after optimization, single-machine inference performance improves by 20%-200%

Performance comparison

Test data: the TNews dataset, replicated 2000x:

Mean length    Max length    Count
22.11          145           100,000,000 (100 million)

Test machine: Tesla T4-16GB
Test cluster: Cluster AY88G
Test parameters: Sequence Length=128, Batch Size=256
Test model: BERT-base (TF saved model)
Blade optimization enabled: No

Worker count    Standard TF distributed inference (total / per sample)    EasyTexMiner (total / per sample)    Speedup
50              302 min / 0.18 ms                                         137 min / 0.08 ms                    2.20x

On DSW, GPU utilization during inference can also be fully saturated.
[Figure 1: GPU utilization during inference on DSW]

How to use

TF models trained with EasyTransfer

The model directory must contain the following files:
[Figure 2: required files in the model directory]
The user-supplied label_mapping is ordered according to -DenumerateValues. For example, label_mapping.json for news classification looks like the following; note that the ids 0, 1, ... must be ints, not strings:

{"教育": 0,
 "三农": 1,
 ...,
 "动漫": 27}
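Such a file can also be generated programmatically. The sketch below (the short label list is hypothetical; use your full label set in -DenumerateValues order) guarantees the ids come out as ints rather than strings:

```python
import json

# Labels in the same order as passed to -DenumerateValues (hypothetical order)
labels = ["教育", "三农", "动漫"]

# Map each label name to its integer index; the values must be ints, not strings
label_mapping = {name: idx for idx, name in enumerate(labels)}

with open("label_mapping.json", "w", encoding="utf-8") as f:
    json.dump(label_mapping, f, ensure_ascii=False)
```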

Define the relevant environment variables:

export test_table=odps://${project_name}/tables/your_test_table_name
export saved_model_dir=oss://path/to/your_model/
export oss_bucket_name=your_bucket_name
export role_arn=acs:ram::xxx
export host=your_host

Then invoke the command as follows:

pai -name easytexminer
-project algo_platform_dev
-Dmode=predict
-DinputTable=odps://${proj_name}/tables/your_test_table
-DoutputTable=odps://${proj_name}/tables/your_test_table_out
-DfirstSequence=content
-DoutputSchema=predictions
-DappendCols=example_id
-DmodelName=text_classify_bert
-DcheckpointPath=${saved_model_dir}
-DbatchSize=32
-DworkerCount=10
-Dbuckets="oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}"

PyTorch models trained with EasyTexMiner

  • Export a TensorFlow SavedModel (optional)

The PAI team has made many optimizations to TF, so exporting a PyTorch model as a TF SavedModel speeds up inference. Users can decide whether to export based on their own situation.

pai -name easy_transfer_app
-Dmode=export
-DexportType=app_model
-DcheckpointPath=${saved_model_dir}/model.ckpt
-DexportDirBase=${saved_model_dir}
-Dbuckets="oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}"
  • Elastic distributed prediction
    pai -name easytexminer_2104
    -project algo_platform_dev
    -Dmode=predict
    -DinputTable=odps://${proj_name}/tables/your_test_table
    -DoutputTable=odps://${proj_name}/tables/your_test_table_out
    -DfirstSequence=content
    -DoutputSchema=predictions
    -DappendCols=example_id
    -DmodelName=text_classify_bert
    -DcheckpointPath=${saved_model_dir}
    -DbatchSize=32
    -DworkerCount=10
    -Dbuckets="oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}"

Custom TensorFlow saved model

Suppose we have a TF saved model like this:
the user has already trained a model and stored it in an OSS directory; in this example the model is located at
[Figure 3: model location in OSS]
We can inspect the model's inputs and outputs with TensorFlow's official inspection script (saved_model_cli):
[Figure: saved_model_cli output showing the model signature]
This is a standard BERT classification model: its inputs are three [-1, seq_len] tensors (input_ids, input_mask, segment_ids),
and its outputs are the 16-class predictions, logits, and probabilities.
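To make that input contract concrete, here is a minimal sketch of the three feed tensors built with plain Python lists (the batch size and token ids are invented for illustration, not a real tokenization):

```python
batch_size, seq_len = 2, 128

# Three [-1, seq_len] integer tensors, matching the model signature above.
# Put a placeholder [CLS]-style id (101 in the standard BERT vocab) first; the rest is padding.
input_ids = [[101] + [0] * (seq_len - 1) for _ in range(batch_size)]
# 1 for real tokens, 0 for padding positions
input_mask = [[1 if tok != 0 else 0 for tok in row] for row in input_ids]
# All zeros for single-sentence input; 1s would mark a second segment
segment_ids = [[0] * seq_len for _ in range(batch_size)]

feed = {"input_ids": input_ids, "input_mask": input_mask, "segment_ids": segment_ids}
```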

Create a main.py and add the following code:

import json
import os
import uuid

from easytexminer.core.predictor import Predictor, TFModelPredictor, PredictorManager
from easytexminer.applications.tokenization import Tokenizer
from easytexminer.data.bert_preprocessors import InputExample, bert_cls_convert_example_to_feature
from easytexminer.utils import io, distributed_call_main, config, init_running_envs


class UserDefinedClassifyPredictor(Predictor):
    def __init__(self, model_dir, *args, **kwargs):
        super(UserDefinedClassifyPredictor, self).__init__(*args, **kwargs)
        self.bert_tokenizer = Tokenizer(backend="bert", vocab_file=os.path.join(model_dir, "vocab.txt"))
        self.model_predictor = TFModelPredictor(saved_model_path=model_dir,
                                                input_keys=["input_ids", "input_mask", "segment_ids"],
                                                output_keys=["predictions", "probabilities", "logits"])
        self.label_path = os.path.join(model_dir, "label_mapping.json")
        with io.open(self.label_path) as f:
            self.label_mapping = json.load(f)
        self.label_id_to_name = {idx: name for name, idx in self.label_mapping.items()}
        self.first_sequence = kwargs.pop("first_sequence", "first_sequence")
        self.second_sequence = kwargs.pop("second_sequence", "second_sequence")
        self.sequence_length = kwargs.pop("sequence_length", 128)

    def preprocess(self, in_data):
        """
        Args:
            in_data (`list`): a list of dict containing raw text
        Returns:
            rst (`dict`): a dict of batched features
        """
        if not isinstance(in_data, list):
            in_data = [in_data]
        rst = {
            "input_ids": [],
            "input_mask": [],
            "segment_ids": []
        }
        for record in in_data:
            text_a = record[self.first_sequence]
            text_b = record.get(self.second_sequence, None)
            example = InputExample(text_a=text_a, text_b=text_b, label=None)
            feature = bert_cls_convert_example_to_feature(example,
                                                          self.bert_tokenizer,
                                                          self.sequence_length)
            rst["input_ids"].append(feature.input_ids)
            rst["input_mask"].append(feature.input_mask)
            rst["segment_ids"].append(feature.segment_ids)
        return rst

    def predict(self, in_data):
        return self.model_predictor.predict(in_data)

    def postprocess(self, result):
        """
        Args:
            result (`dict`): A dict of returned tensors (np.array)
        Returns:
            new_results (`list`): a list of user-defined return dict (a row)
        """
        preds = result["predictions"]
        probs = result["probabilities"]
        logits = result["logits"]
        new_results = list()
        for b, prob in enumerate(probs):
            new_results.append({
                "predictions": preds[b],
                "probabilities": ",".join([str(t) for t in probs[b]]),
                "logits": ",".join([str(t) for t in logits[b]])
            })
        return new_results


def main_fn(gpu, cfg, *args, **kwargs):
    # Prepare seed / logging / gpu environment
    init_running_envs(gpu, cfg)
    predictor = UserDefinedClassifyPredictor(
        model_dir=cfg.checkpoint_dir,
        first_sequence=cfg.first_sequence,
        second_sequence=cfg.second_sequence,
        sequence_length=cfg.sequence_length)
    predictor_manager = PredictorManager(
        predictor=predictor,
        input_file=cfg.tables,
        output_file=cfg.outputs,
        input_schema=cfg.input_schema,
        output_schema=cfg.output_schema,
        append_cols=cfg.append_cols,
        batch_size=cfg.batch_size
    )
    predictor_manager.run()
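Before submitting to PAI, the preprocess/predict/postprocess contract can be smoke-tested locally without TF or ODPS. This is a sketch with a dummy stand-in predictor; the class, field names, and the length-based "model" are purely illustrative, not part of the easytexminer API:

```python
class DummyPredictor:
    """Minimal stand-in mirroring the preprocess/predict/postprocess contract."""

    def preprocess(self, in_data):
        # Batch raw rows into a dict of feature lists
        if not isinstance(in_data, list):
            in_data = [in_data]
        return {"sentence": [record["content"] for record in in_data]}

    def predict(self, in_data):
        # Fake "model": class 0 for short texts, class 1 otherwise
        return {"predictions": [0 if len(s) < 10 else 1 for s in in_data["sentence"]]}

    def postprocess(self, result):
        # One output dict per input row
        return [{"predictions": p} for p in result["predictions"]]


predictor = DummyPredictor()
rows = [{"content": "short"}, {"content": "a much longer sentence"}]
out = predictor.postprocess(predictor.predict(predictor.preprocess(rows)))
# out == [{"predictions": 0}, {"predictions": 1}]
```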

Invoke the PAI command:

export config_file=./xxx.ini
export proj=xxx
export role_arn=xxx
export oss_bucket_name="xxx"
export host=cn-xxx.oss.aliyuncs.com
export model_dir=oss://path/to/your_model

if [ ! -f ./dev.tsv ]; then
  wget http://atp-modelzoo-sh.oss-cn-shanghai.aliyuncs.com/easytexminer/tutorials/distillation/classification/dev.tsv
fi

odpscmd -e --config=${config_file} "DROP TABLE IF EXISTS easytexminer_mrpc_dev;"
odpscmd -e --config=${config_file} "CREATE TABLE easytexminer_mrpc_dev(
  quality STRING, id1 STRING, id2 STRING, sent1 STRING, sent2 STRING);"
odpscmd -e --config=${config_file} "tunnel upload dev.tsv easytexminer_mrpc_dev -fd \t;"


odpscmd -e --config=${config_file} "DROP TABLE IF EXISTS easytexminer_mrpc_dev_pred"

tar -zcvf user_defined_prediction.tar.gz main.py
ossutil cp -f user_defined_prediction.tar.gz ${model_dir}/
rm -f user_defined_prediction.tar.gz

command="
pai -name easytexminer
 -project algo_platform_dev
 -Dscript=${model_dir}/user_defined_prediction.tar.gz
 -DentryFile=main.py
 -Dmode=predict
 -DinputTable=odps://${proj}/tables/easytexminer_mrpc_dev
 -DoutputTable=odps://${proj}/tables/easytexminer_mrpc_dev_pred
 -DoutputSchema=predictions,probabilities,logits
 -DappendCols=quality
 -DfirstSequence=sent1
 -DsecondSequence=sent2
 -DinputSchema=quality:str:1,id1:str:1,id2:str:1,sent1:str:1,sent2:str:1
 -DcheckpointPath=${model_dir}/
 -DworkerCount=1
 -Dbuckets='oss://${oss_bucket_name}/?role_arn=${role_arn}&host=${host}'
"

echo "${command}"
odpscmd -e --config=${config_file} "${command}"
echo "finish..."

Custom PyTorch models / other preprocessing operations

Building on the previous section, preprocess/predict/postprocess can be combined freely in Python: just swap the TF prediction logic for PyTorch prediction logic, or even call jieba segmentation directly. Below is a predictor example that does jieba keyword extraction.

import jieba.analyse
from easytexminer.core.predictor import Predictor

class UserDefinedKeywordsPredictor(Predictor):
    def __init__(self, *args, **kwargs):
        super(UserDefinedKeywordsPredictor, self).__init__()

    def preprocess(self, in_data):
        if isinstance(in_data, dict):
            in_data = [in_data]
        rst = {
            "sentence": []
        }

        for record in in_data:
            sent = record["sentence"].replace(',', '').replace('。', '').replace('?', '').replace('!', '') \
                .replace('“', '').replace('”', '').replace(':', '').replace('…', '').replace('(', '').replace(')', '') \
                .replace('—', '').replace('《', '').replace('》', '').replace('、', '').replace('‘', '') \
                .replace('’', '').replace(',', '').replace('【', '').replace('】', '').replace('"', '') \
                .replace('#', '').replace('...', '').replace('?', '').replace('『', '')
            rst["sentence"].append(sent)

        return rst

    def predict(self, in_data):
        keywords_list = list()
        for sent in in_data["sentence"]:
            keywords_list.append(list(jieba.analyse.extract_tags(sent, topK=5)))
        return {
            "keywords": keywords_list
        }

    def postprocess(self, result):
        return [{"keywords": " ".join(kw)} for kw in result["keywords"]]
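The long chain of .replace calls in preprocess above can be collapsed with str.translate, which removes every listed single-character punctuation mark in one pass; only the multi-character '...' still needs a separate replace. A sketch (assumed equivalent to the chained version for the same punctuation set):

```python
# All single-character punctuation removed by the chained .replace calls
PUNCT = ',。?!“”:…()—《》、‘’,【】"#?『'
TRANSLATION_TABLE = str.maketrans('', '', PUNCT)

def strip_punct(sentence):
    # Drop the multi-character token first, then all single characters in one pass
    return sentence.replace('...', '').translate(TRANSLATION_TABLE)
```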