quick start
https://docs.nvidia.com/deeplearning/tensorrt/quick-start-guide/index.html#precision
See the official docs at the link above; for examples, refer to the TensorRT samples on GitHub.
My work covers the following steps (an example invocation follows the reference links below):
1. Convert the TensorFlow model from ckpt format to a frozen graph.
2. Convert the frozen graph to UFF format.
3.1 Load the UFF model with TensorRT and run inference.
3.2 On top of 3.1, switch the precision to INT8.
https://github.com/NVIDIA/TensorRT/issues/301
PyCUDA tutorial
TensorRT basics (engine/context/buffer, etc.):
https://zhuanlan.zhihu.com/p/336256668
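Below is a rough invocation sketch for the trt_sample.py script shown next. The paths and the model name are placeholders, not taken from the original, and it assumes the model_name flag is enabled for standalone runs (see the note in the code):
# 1-2. build: ckpt -> frozen graph -> UFF -> serialized TensorRT engine (INT8 here)
python trt_sample.py --model_path=/path/to/ckpt_dir --output_path=/path/to/ckpt_dir --model_name=resnet_v2_50 --trt_mode=int8 --build_or_infer=build
# 3. infer: load the serialized engine and classify an image (or a directory of images)
python trt_sample.py --model_path=/path/to/ckpt_dir --image_path=/path/to/images --model_name=resnet_v2_50 --trt_mode=int8 --build_or_infer=infer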
trt_sample.py
# default_model_name: "resnet_v2_50"
import os
import sys
import pycuda.driver as cuda
# This import causes pycuda to automatically manage CUDA context creation and cleanup.
import pycuda.autoinit
import tensorflow as tf
import tensorrt as trt
from tensorflow.python.tools import freeze_graph
from trt_common import tf_image_to_trt_images, EntropyCalibrator
from trt_common import (convert_prob_to_dict, deserialize_cuda_engine,
do_inference, load_labels, serialize_cuda_engine,
to_calculate_value_softmax)
# below is private_module.
parentpath = '/yyy/xxx/'
path_list = [
parentpath, parentpath + 'lib', parentpath + 'engine',
parentpath + 'inference', parentpath + 'applications',
os.path.join(parentpath, "applications/kkk")
]
sys.path += path_list
import letrain
from nets import nets_factory
from preprocessing import image_preprocessing, preprocessing_factory
# above is private_module.
# 'model_name' is defined in deploy_app_service.py when this module is imported by the service;
# uncomment the line below for standalone runs (FLAGS.model_name is read in __main__):
# tf.app.flags.DEFINE_string('model_name', '', '')
tf.app.flags.DEFINE_string('model_path', '', '')
tf.app.flags.DEFINE_string('output_path', '.', '')
tf.app.flags.DEFINE_string('trt_mode', 'float32', '')
tf.app.flags.DEFINE_string('image_path', '', '')
tf.app.flags.DEFINE_string('build_or_infer', "infer", '')
FLAGS = tf.app.flags.FLAGS
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
INT8_ENGINE = "trt_int8.engine"
FLOAT32_ENGINE = "trt_float32.engine"
# CALIBRATION_IMAGE = "/home/daiyi1/for_autoML/trt/trt_im/engine_dir/train_62.jpg" # single
CALIBRATION_IMAGE = "/home/daiyi1/for_autoML/trt/trt_im/images_list_dir/"
class Classification(object):
def _all_pre_load(self, model_path, model_name):
self.labels_dict = load_labels(model_path)
num_classes = len(self.labels_dict)
self.model_name = model_name
self.network_fn = nets_factory.get_network_fn(model_name,
num_classes,
is_training=False)
self.image_size = self.network_fn.default_image_size
def _pre_load_ckpt(self, model_path):
self.input_node_name = "input_node_1"
self.output_node_name = "output_node_1"
# path to the model checkpoint files
# model_name = "resnet_v2_50"
if os.path.isdir(model_path):
self.ckpt_path = tf.train.latest_checkpoint(model_path)
self.model_path = model_path
def preprocess_file(self, filename, height, width):
"""Preprocess data from single image file
Args:
filename: image file name (full path)
height: expected image height
width: expected image width
Return:
the image preprocessed to the expected height/width for the corresponding nets model.
"""
class Record(object):
pass
# decode img
_data = Record()
_data.name = filename
file_contents = tf.read_file(_data.name)
record_bytes = tf.image.decode_image(file_contents,
channels=3,
name='image_decode')
_data.uint8image = record_bytes
_name = self.model_name
_fn = preprocessing_factory.get_preprocessing(_name, is_training=False)
image = get_image(_data, height, width, _fn)
return tf.reshape(image, [1, height, width, 3])
def preprocess_filelist(self, filenamelist, height, width, num_batch):
"""Preprocess data from image file list
Args:
filenamelist: list of image file names (full paths)
height: expected image height
width: expected image width
num_batch: number of files in filenamelist
Return:
a batch of images preprocessed to the expected height/width for the corresponding nets model.
"""
class Record(object):
pass
_data_list = []
for i in range(num_batch):
filename = filenamelist[i]
result = Record()
result.name = filename
file_contents = tf.read_file(result.name)
record_bytes = tf.image.decode_image(file_contents,
channels=3,
name='image_decode')
result.uint8image = record_bytes
_data_list.append(result)
_name = self.model_name
_reshaped_img_list = []
_fn = preprocessing_factory.get_preprocessing(_name, is_training=False)
for _data in _data_list:
image = get_image(_data, height, width, _fn)
image = tf.reshape(image, [height, width, 3])
_reshaped_img_list.append(image)
return tf.stack(_reshaped_img_list)
def ckpt_to_freeze(self):
file_path = os.path.join(self.model_path, 'pb_model')
if not os.path.exists(file_path):
os.makedirs(file_path)
tf.reset_default_graph()
# image_place is the input placeholder
image_place = tf.compat.v1.placeholder(
tf.float32, [1, self.image_size, self.image_size, 3],
name=self.input_node_name)
# network_fn is the model function built above (from nets_factory)
logits, _ = self.network_fn(image_place)
tf.identity(logits, name=self.output_node_name)
freeze_pb_path = os.path.join(self.model_path, 'frozen_model.pb')
with tf.compat.v1.Session() as sess:
# Save the graph; this writes model.pb into the pb_model directory.
# model.pb is then used as the input_graph for the freeze_graph call below.
tf.io.write_graph(sess.graph_def, file_path, 'model.pb')
# Freeze the graph together with the checkpoint weights.
freeze_graph.freeze_graph(input_graph=os.path.join(
file_path, 'model.pb'),
input_saver='',
input_binary=False,
input_checkpoint=self.ckpt_path,
output_node_names=self.output_node_name,
restore_op_name='save/restore_all',
filename_tensor_name='save/Const:0',
output_graph=freeze_pb_path,
clear_devices=True,
initializer_nodes='')
return freeze_pb_path
def freeze_to_uff(self, freeze_pb_path):
os.system("convert-to-uff %s" % freeze_pb_path)
full_file_path = freeze_pb_path.split(".")
file_path = full_file_path[0:-1]
file_path = file_path + ["uff"]
return ".".join(file_path)
def build_engine_by_uff(self, model_file, calib=None):
# For more information on TRT basics,
# refer to the introductory samples.
with trt.Builder(TRT_LOGGER) as builder, builder.create_network(
) as network, builder.create_builder_config() as config, trt.UffParser(
) as parser:
config.max_workspace_size = GiB(1)
if calib:
config.set_flag(trt.BuilderFlag.INT8)
config.int8_calibrator = calib
# Parse the Uff Network
parser.register_input(self.input_node_name,
(3, self.image_size, self.image_size))
parser.register_output(self.output_node_name)
parser.parse(model_file, network)
# Build and return an engine.
engine = builder.build_engine(network, config)
return engine
def ckpt_to_plan_file(self, mode, output_path):
uff_file_path = self.freeze_to_uff(self.ckpt_to_freeze())
if mode == "float32":
engine = self.build_engine_by_uff(uff_file_path)
serialize_cuda_engine(os.path.join(output_path, FLOAT32_ENGINE),
engine)
elif mode == "int8":
self._process_images(CALIBRATION_IMAGE)
calib_file_sample = [i['pre_value'] for i in self.image_dict_list]
calibration_cache = "calibration.cache"
calib = EntropyCalibrator(calib_file_sample, calibration_cache)
engine = self.build_engine_by_uff(uff_file_path, calib)
serialize_cuda_engine(os.path.join(output_path, INT8_ENGINE),
engine)
def _process_images(self, image_file):
self.image_dict_list = []
if os.path.isfile(image_file):
image = self.preprocess_file(image_file, self.image_size,
self.image_size)
tf_image_list = [{'file_path': image_file, 'pre_value': image}]
trt_image_list = tf_image_to_trt_images(tf_image_list,
is_single=True)
elif os.path.isdir(image_file):
tf_image_list = [{
'file_path': os.path.join(image_file, file)
} for file in os.listdir(image_file)]
images_list_value = self.preprocess_filelist(
[file['file_path'] for file in tf_image_list], self.image_size,
self.image_size, len(tf_image_list))
for idx in range(len(tf_image_list)):
tf_image_list[idx]['pre_value'] = images_list_value[idx]
trt_image_list = tf_image_to_trt_images(tf_image_list,
is_single=False)
self.image_dict_list = trt_image_list
def inference(self, plan_file):
self.engine = deserialize_cuda_engine(plan_file)
image_dict_list = do_inference(self.image_dict_list, self.engine)
all_prob = to_calculate_value_softmax(image_dict_list)
res = convert_prob_to_dict(image_dict_list, all_prob, self.labels_dict)
print("############ res is : ", res)
return res
def from_ckpt_to_plan_all(obj, model_path, trt_mode):
output_path = FLAGS.output_path.strip()
obj._pre_load_ckpt(model_path)
obj.ckpt_to_plan_file(trt_mode, output_path)
def run_plan_all(obj, model_path, trt_mode):
image_path = FLAGS.image_path.strip()
if trt_mode.lower() == "float32":
plan_file = os.path.join(model_path, FLOAT32_ENGINE)
elif trt_mode.lower() == "int8":
plan_file = os.path.join(model_path, INT8_ENGINE)
obj._process_images(image_file=image_path)
obj.inference(plan_file=plan_file)
def get_image(_data, height, width, _fn):
if FLAGS.data_format == 'raw_image' and FLAGS.task == 'custom':
resize_side = int(height * 1.15)
image = image_preprocessing.preprocess_image(_data.uint8image,
height,
width,
resize_side=resize_side)
elif FLAGS.data_format == 'tfrecord' and FLAGS.task == 'custom':
raise Exception("ERROR custom data not support tfrecrd data")
else:
image = _fn(_data.uint8image, height, width)
return image
def GiB(val):
return val * 1 << 30
if __name__ == "__main__":
model_name = FLAGS.model_name.strip()
model_path = FLAGS.model_path.strip()
trt_mode = FLAGS.trt_mode.strip()
if trt_mode.lower() not in ["float32", "int8"]:
raise "ERROR, please confirm your 'trt_mode'. \
It only could be 'int8' or 'float32'"
obj = Classification(model_path, model_name)
obj._all_pre_load(model_path, model_name)
is_infer = FLAGS.build_or_infer.strip()
if is_infer.lower().startswith("build"):
from_ckpt_to_plan_all(obj, model_path, trt_mode)
elif is_infer.lower().startswith("infer"):
run_plan_all(obj, model_path, trt_mode)
else:
raise ValueError("please input which action you want. \
The options is to build engine or inference based on engine. \
so you can input 'build' or 'infer'.")
trt_common.py
import os
import sys
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
import six
import tensorflow as tf
parentpath = '/opt/letrain/'
path_list = [
parentpath, parentpath + 'lib', parentpath + 'engine',
parentpath + 'inference', parentpath + 'applications',
os.path.join(parentpath, "applications/frcnn")
]
sys.path += path_list
from nets import nets_factory
from preprocessing import image_preprocessing
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.WARN)
FLAGS = tf.app.flags.FLAGS
import tensorrt as trt
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
def convert_prob_to_dict(img_list, all_val, labels_dict):
new_prob_val = []
for val in all_val:
if type(val) == list:
new_prob_val.append(val)
else:
new_prob_val.append(val.tolist())
prob_val = np.array(new_prob_val)
classification_data = []
if isinstance(prob_val, np.ndarray):
probs = []
labels = []
index = np.argmax(prob_val, axis=1)
for i, j in enumerate(index):
probs.append(prob_val[i][j])
labels.append(labels_dict[str(j)])
result = {
'label': labels_dict[str(j)],
'probability': round(float(prob_val[i][j]), 4)
}
classification_data.append(result)
_true, _false = [], []
for id, _ in enumerate(classification_data):
if img_list[id]['file_path'].split("/")[1].split(
"_")[0] == classification_data[id]['label']:
_true.append(classification_data[id]['probability'])
else:
_false.append(classification_data[id]['probability'])
result = {}
result['true_count'] = len(_true)
result['true_ave_value'] = sum(_true) / len(_true) if len(_true) else 0
result['false_count'] = len(_false)
result['false_ave_value'] = sum(_false) / len(_false) if len(_false) else 0
return ("## inference_result: ", result)
def do_inference(image_dict_list, engine):
inputs, outputs, bindings, stream = local_allocate_buffers(engine)
import copy
for image_file in image_dict_list:
context2 = engine.create_execution_context()
np.copyto(inputs[0].host, image_file['pre_value'])
[
cuda.memcpy_htod_async(inp.device, inp.host, stream)
for inp in inputs
]
# Run inference.
context2.execute_async(batch_size=1,
bindings=bindings,
stream_handle=stream.handle)
# Transfer predictions back from the GPU.
[
cuda.memcpy_dtoh_async(out.host, out.device, stream)
for out in outputs
]
# Synchronize the stream
stream.synchronize()
# Return only the host outputs.
[logits_result] = [out.host for out in outputs]
image_file['post_value'] = copy.deepcopy(logits_result)
del context2
return image_dict_list
def load_labels(model_path):
LABELS_FILE = os.path.join(model_path, 'labels.txt')
labels_dict = {}
with open(LABELS_FILE) as f:
for line in f:
key, value = line.rstrip('\n').split(':')
labels_dict[key] = value
return labels_dict
def tf_image_to_trt_images(img_list, is_single=True):
all_tr_images = []
for image_path in img_list:
before_value = image_path['pre_value']
if is_single:
after_value = tf.transpose(before_value, [0, 3, 1, 2])
else:
after_value = tf.transpose(before_value, [2, 0, 1])
print("##### after image_path['pre_value']: ", after_value)
all_tr_images.append(after_value)
all_tr_images = tf_init_run_value(all_tr_images)
for key, val in enumerate(img_list):
val['pre_value'] = np.expand_dims([all_tr_images[key]], axis=0).ravel()
return img_list
def tf_init_run_value(tf_placeholders):
gpu_options = tf.compat.v1.GPUOptions(per_process_gpu_memory_fraction=0.3)
sess = tf.compat.v1.Session(
config=tf.compat.v1.ConfigProto(gpu_options=gpu_options,
allow_soft_placement=True,
log_device_placement=False))
tf_values = sess.run(tf_placeholders)
return tf_values
def to_calculate_value_softmax(img_list):
with tf.Graph().as_default():
with tf.device('/device:GPU:0'):
all_prob = []
for image_file in img_list:
# Softmax normalizes an N*1 vector into probabilities in (0, 1) that sum to 1.
prob = tf.nn.softmax(image_file['post_value'])
all_prob.append(prob)
return tf_init_run_value(all_prob)
def local_allocate_buffers(engine):
inputs = []
outputs = []
bindings = []
stream = cuda.Stream()
for binding in engine:
size = trt.volume(engine.get_binding_shape(binding)) * 1
dtype = trt.nptype(engine.get_binding_dtype(binding))
# Allocate host and device buffers
host_mem = cuda.pagelocked_empty(size, dtype)
# allocate device memory
device_mem = cuda.mem_alloc(host_mem.nbytes)
# Append the device buffer to device bindings.
bindings.append(int(device_mem))
# Append to the appropriate list.
if engine.binding_is_input(binding):
inputs.append(HostDeviceMem(host_mem, device_mem))
else:
outputs.append(HostDeviceMem(host_mem, device_mem))
return inputs, outputs, bindings, stream
# Simple helper data class that's a little nicer to use than a 2-tuple.
class HostDeviceMem(object):
def __init__(self, host_mem, device_mem):
self.host = host_mem
self.device = device_mem
def __str__(self):
return "Host:\n" + str(self.host) + "\nDevice:\n" + str(self.device)
def __repr__(self):
return self.__str__()
class EntropyCalibrator(trt.IInt8EntropyCalibrator2):
def __init__(self, training_data, cache_file, batch_size=1):
# Whenever you specify a custom constructor for a TensorRT class,
# you MUST call the constructor of the parent explicitly.
trt.IInt8EntropyCalibrator2.__init__(self)
self.cache_file = cache_file
# Every time get_batch is called, the next batch of size batch_size will be copied to the device and returned.
self.data = training_data
self.batch_size = batch_size
self.current_index = 0
# Allocate enough memory for a whole batch.
self.device_input = cuda.mem_alloc(self.data[0].nbytes * self.batch_size)
def get_batch_size(self):
return self.batch_size
# TensorRT passes along the names of the engine bindings to the get_batch function.
# You don't necessarily have to use them, but they can be useful to understand the order of
# the inputs. The bindings list is expected to have the same ordering as 'names'.
def get_batch(self, names):
# if self.current_index + self.batch_size > self.data.shape[0]:
if self.current_index + self.batch_size > len(self.data):
return None
# The most important thing is to 'return None': that is the signal to terminate,
# because this function is called repeatedly and calibration would never stop
# if it always returned a non-None value.
current_batch = int(self.current_index / self.batch_size)
if current_batch % 10 == 0:
print("Calibrating batch {:}, containing {:} images".format(
current_batch, self.batch_size))
# self.data is a list of flattened images, so stack the slice into one contiguous array.
batch = np.ascontiguousarray(self.data[self.current_index:self.current_index + self.batch_size]).ravel()
cuda.memcpy_htod(self.device_input, batch)
self.current_index += self.batch_size
return [self.device_input]
def read_calibration_cache(self):
# If there is a cache, use it instead of calibrating again. Otherwise, implicitly return None.
if os.path.exists(self.cache_file):
with open(self.cache_file, "rb") as f:
return f.read()
def write_calibration_cache(self, cache):
with open(self.cache_file, "wb") as f:
f.write(cache)
def serialize_cuda_engine(plan_file, engine):
with open(plan_file, "wb") as f:
f.write(engine.serialize())
return plan_file
def deserialize_cuda_engine(plan_file):
trt_runtime = trt.Runtime(TRT_LOGGER)
with open(plan_file, "rb") as f:
engine = trt_runtime.deserialize_cuda_engine(f.read())
return engine
Launch the app service
deploy_app_service.py
import tensorflow as tf
from factory import create_app
# When gunicorn imports this module by name (see the command below), register gunicorn's
# CLI options as placeholder TF flags so tf.app.flags parsing does not reject them.
if __name__ == 'deploy_app_service':
gun_args = ['workers', 'bind', 'pythonpath', 'access-logfile', 'error-logfile', 'env', 'timeout']
for param in gun_args:
tf.app.flags.DEFINE_string(param, '', '')
inf_app = create_app()
if __name__ == '__main__':
tf.app.flags.DEFINE_enum('scenario', 'classification', ['classification', 'objectdetection', 'segmentation'],"Scenario for this service")
tf.app.flags.DEFINE_string('url', '/api/service', 'URL used to access the deployed service')
tf.app.flags.DEFINE_string('deploy_path', '', "deploy service path")
tf.app.flags.DEFINE_string('lico_model_path', '', "model path, for inference job")
tf.app.flags.DEFINE_string('lico_image_file', '', "image file to deal, for inference job")
tf.app.flags.DEFINE_string('lico_output_dir', '', "output dir, for inference job")
tf.app.flags.DEFINE_string('model_name', '', "model_name of the deployed service")
tf.app.flags.DEFINE_string('trt_mode', 'float32', "trt_mode of the deployed service")
tf.app.flags.DEFINE_integer('port', 61234, 'HTTP server port for the deployed service')
tf.app.flags.DEFINE_string('service_uuid', '', 'Service UUID that will be appended to the URI')
FLAGS = tf.app.flags.FLAGS
from os.path import dirname
from subprocess import check_call
cmd = [
'gunicorn',
'--workers',
'1',
'--bind',
'0.0.0.0:{0}'.format(FLAGS.port),
'--pythonpath',
dirname(__file__),
'--access-logfile',
'-',
'--error-logfile',
'-',
'--timeout',
'600',
'--env',
'TEST_SCENARIO={0}'.format(FLAGS.scenario),
'--env',
'TEST_MODEL_PATH={0}'.format(FLAGS.deploy_path),
'--env',
'TEST_MODEL_NAME={0}'.format(FLAGS.model_name),
'--env',
'TEST_TRT_MODE={0}'.format(FLAGS.trt_mode),
'--env',
'SERVICE_UUID={0}'.format(FLAGS.service_uuid),
'--env',
'SERVICE_URL={0}'.format(FLAGS.url),
'deploy_app_service:inf_app'  # must match this file's module name (deploy_app_service.py)
]
check_call(cmd)
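A minimal launch sketch for the service entry point above; the paths, model name, and UUID are placeholders, and the remaining flags keep their defaults:
python deploy_app_service.py --scenario=classification --deploy_path=/path/to/deploy_dir --model_name=resnet_v2_50 --trt_mode=int8 --port=61234 --service_uuid=<uuid>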
factory.py
from resource import DeployServiceRunner
def create_app():
import os
import falcon
api = falcon.API()
api.req_options.strip_url_path_trailing_slash = True
scenario = os.environ.get('TEST_SCENARIO', '')
model_path = os.environ.get('TEST_MODEL_PATH', '')
model_name = os.environ.get('TEST_MODEL_NAME', '')
trt_mode = os.environ.get('TEST_TRT_MODE', '')
service_uuid = os.environ.get('SERVICE_UUID', '')
service_url = os.environ.get('SERVICE_URL', '')
if service_uuid:
service_url = '/api/service/{0}'.format(service_uuid)
api.add_route(
service_url,
DeployServiceRunner(scenario, model_path, model_name, trt_mode)
)
return api
resource.py
import base64
import falcon
import os
import sys
import traceback
from falcon.media.validators import jsonschema
from tempfile import NamedTemporaryFile
def get_infe_serv(scenario):
print('Init Load Model')
if scenario == "classification":
from trt_sample import Classification
inference_service_mgt = Classification()
return inference_service_mgt
class DeployServiceRunner(object):
def __init__(self, scenario, deploy_path, model_name, trt_mode):
self.deploy_path = deploy_path
self.model_path = os.path.join(deploy_path, 'model')
self.trt_mode = trt_mode
self.InferenceServiceMgt = get_infe_serv(scenario)
# Assumes the deployed Classification class exposes load_model(model_path, model_name)
# and inference(image_file) wrappers around the sample code shown above.
self.InferenceServiceMgt.load_model(self.model_path, model_name)
print('Load Model Success')
sys.stdout.flush()
@jsonschema.validate(
{
"type": "object",
"properties": {
"image": {"type": "string"},
"image_type": {
"type": "string",
"enum": ["BASE64"]
},
},
"required": ["image", "image_type"]
}
)
def on_post(self, req, resp):
try:
content = base64.b64decode(req.media.get('image'))
with NamedTemporaryFile(mode='wb') as buf:
buf.write(content)
buf.flush()
inf_result = self.InferenceServiceMgt.inference(buf.name)
except ValueError:
traceback.print_exc()
raise falcon.HTTPBadRequest(
description='The format of image is unsupported'
)
except TypeError:
traceback.print_exc()
raise falcon.HTTPBadRequest(
description='It is not an image'
)
except Exception:
traceback.print_exc()
raise falcon.HTTPInternalServerError(
description='Internal error of inference server'
)
resp.media = inf_result
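A client-side sketch for exercising the deployed endpoint; the host, port, service UUID, and image path below are placeholders, and it assumes the requests package is installed:
import base64
import requests

# The route is /api/service/<service_uuid> (see factory.py); host/port/uuid are placeholders.
url = 'http://127.0.0.1:61234/api/service/your-service-uuid'
with open('test.jpg', 'rb') as f:
    payload = {
        'image': base64.b64encode(f.read()).decode('ascii'),  # BASE64-encoded image per the schema
        'image_type': 'BASE64',
    }
resp = requests.post(url, json=payload)
print(resp.status_code, resp.json())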