In TVM, the file related to the ONNX frontend is tvm/python/tvm/relay/frontend/onnx.py. ONNX defines many operators, and TVM needs to convert each of them into its own operators. At present, some ONNX operators are not yet supported by TVM (no conversion has been implemented for them), for example Loop and NonMaxSuppression, so we have to add those conversions ourselves.
Important classes
OnnxOpConverter: the base class for ONNX operator conversion; every ONNX operator converter class is a subclass of it.
Unary: the base class for unary operator converters; a subclass of OnnxOpConverter.
Elemwise: the base class for element-wise operator converters; a subclass of OnnxOpConverter.
class OnnxOpConverter(object):
    """A helper class for holding onnx op converters."""

    @classmethod
    def get_converter(cls, opset):
        """Get converter matches given opset.

        Parameters
        ----------
        opset: int
            opset from model.

        Returns
        -------
        converter, which should be `_impl_vx`. Number x is the biggest
        number smaller than or equal to opset belongs to all support versions.
        """
        versions = [
            int(d.replace('_impl_v', '')) for d in dir(cls) if '_impl_v' in d
        ]
        versions = sorted(versions + [opset])
        version = versions[
            max([i for i, v in enumerate(versions) if v == opset]) - 1]
        if hasattr(cls, '_impl_v{}'.format(version)):
            return getattr(cls, '_impl_v{}'.format(version))
        raise NotImplementedError(
            'opset version {} of {} not implemented'.format(
                version, cls.__name__))
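The version-selection rule in get_converter can be sketched in plain Python, without TVM: for a requested opset, it picks the `_impl_vx` whose version number x is the largest implemented version that is less than or equal to the opset. The `Demo` class below is a made-up example, not part of TVM:

```python
class OnnxOpConverterSketch:
    """Minimal re-implementation of the dispatch logic in OnnxOpConverter."""

    @classmethod
    def get_converter(cls, opset):
        # Collect all implemented versions, e.g. _impl_v1 -> 1, _impl_v9 -> 9.
        versions = [int(d.replace("_impl_v", "")) for d in dir(cls) if "_impl_v" in d]
        # Insert the requested opset and pick the entry just before its
        # last occurrence: the largest implemented version <= opset.
        versions = sorted(versions + [opset])
        version = versions[max(i for i, v in enumerate(versions) if v == opset) - 1]
        if hasattr(cls, "_impl_v{}".format(version)):
            return getattr(cls, "_impl_v{}".format(version))
        raise NotImplementedError(
            "opset version {} of {} not implemented".format(version, cls.__name__))


class Demo(OnnxOpConverterSketch):
    """A hypothetical converter with implementations for opsets 1 and 9."""

    @classmethod
    def _impl_v1(cls, *args):
        return "v1"

    @classmethod
    def _impl_v9(cls, *args):
        return "v9"


print(Demo.get_converter(7)())   # -> v1 (largest implemented version <= 7 is 1)
print(Demo.get_converter(9)())   # -> v9
print(Demo.get_converter(11)())  # -> v9 (9 is the largest implemented version <= 11)
```

So a model exported at opset 11 still dispatches to `_impl_v9`, which is why a single `_impl_vx` can cover a range of opsets.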
class Unary(OnnxOpConverter):
    """A helper class for unary op converters."""

    name = ''

    @classmethod
    def _impl_v1(cls, inputs, attr, params):
        assert len(inputs) == 1, "Unary math op {} takes 1 input, {} given".format(
            cls.name, len(inputs))
        op_name = cls.name
        return get_relay_op(op_name)(*inputs)
class Elemwise(OnnxOpConverter):
    """A helper class for elemwise op converters."""

    name = ''

    @classmethod
    def _impl_v1(cls, inputs, attr, params):
        assert len(inputs) == 2, "Math op {} take 2 inputs, {} given".format(
            cls.name, len(inputs))
        op_name = cls.name
        conv_ops = ["conv2d", "conv2d_transpose"]
        if attr.get('broadcast', 0) and any(x in str(inputs[0]) for x in conv_ops):
            # TODO(zhreshold): remove hard coded infershape
            axis = int(attr.get('axis', 0))
            inputs[1] = _op.expand_dims(inputs[1], axis=axis, num_newaxis=2)
        return get_relay_op(op_name)(*inputs)
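What the expand_dims call achieves can be illustrated in plain Python (no TVM needed). With the legacy ONNX broadcast attribute and axis=1, a per-channel bias of shape (C,) must become (C, 1, 1) so that it broadcasts against an NCHW conv output of shape (N, C, H, W). The helper below is a hypothetical sketch that only mimics the shape rule of relay's expand_dims:

```python
def expand_dims_shape(shape, axis, num_newaxis):
    """Insert num_newaxis size-1 axes at position `axis` (the shape rule
    of expand_dims; this helper is illustrative, not part of TVM)."""
    return shape[:axis] + (1,) * num_newaxis + shape[axis:]


bias_shape = (16,)  # one value per channel of a conv output
print(expand_dims_shape(bias_shape, axis=1, num_newaxis=2))  # -> (16, 1, 1)
# (16, 1, 1) now broadcasts elementwise against an NCHW tensor (N, 16, H, W),
# applying each bias value along its channel.
```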
So how do we add a new operator (more precisely, its conversion)? Let's take Upsample as an example.
Adding an operator
1 Create a class for the operator
Usually the class is given the same name as the operator to be added (a different name also works, as long as it does not clash with an existing class). For Upsample, create a class named Upsample that inherits from OnnxOpConverter.
In addition, the class must define a class method whose name is "_impl_v" followed by a number x, where x is the biggest number smaller than or equal to the opset among all supported versions. The method returns a callable Relay expression, i.e. it implements the ONNX operator's functionality with Relay operators.
About opset (https://github.com/onnx/onnx/blob/master/docs/VersionConverter.md):
ONNX provides a library for converting ONNX models between different opset versions. The primary motivation is to improve backward compatibility of ONNX models without burdening the ONNX backend specification. This allows backend developers to offer support for a particular opset version, and users to write or export models to a particular opset version but run them in environments with a different opset version. Implementation-wise, the library leverages an in-memory representation that is much more convenient to work with than the raw protobuf structs, as well as the converters developed in protobuf format for the ONNX Optimizer.
You may be interested in invoking the provided adapters for specific operators, in implementing new adapters, or both. Default adapters only work within the default domain, but they can be generalized to work across domains or with new conversion approaches, depending on the breaking change involved.
class Upsample(OnnxOpConverter):
    """Operator converter for Upsample (nearest mode)."""

    @classmethod
    def _impl_v9(cls, inputs, attr, params):
        scales = attr.get('scales')
        if not scales:
            # Here we are going to higher OPSET version.
            assert len(inputs) == 2, "Upsample op take 2 inputs, {} given".format(len(inputs))
            scales = params[inputs[1].name_hint].asnumpy()
            inputs = inputs[:1]
        assert len(scales) == 4 and scales[0] == 1.0 and scales[1] == 1.0
        mode = attr.get('mode')
        if mode == b'nearest':
            method = "nearest_neighbor"
        elif mode == b'linear':
            method = "bilinear"
        else:
            raise tvm.error.OpAttributeInvalid(
                'Value {} in attribute "mode" of operator Upsample is not valid.'.format(mode))
        attr = {'scale_h': scales[-2], 'scale_w': scales[-1], 'method': method,
                'layout': 'NCHW', 'align_corners': True}
        return AttrCvt('upsampling')(inputs, attr)
2 Register the conversion
_get_convert_map is the operator conversion map: it defines the mapping from operator names to converter functions, and we need to add the Upsample entry to it.
The key of the map is the ONNX operator name; the value is the TVM implementation of that operator (a callable).
If an ONNX operator corresponds directly to a single TVM operator (only the name differs), simply use Renamer.
If an ONNX operator is implemented by several TVM operators, create a class as in step 1 and register it via the class method get_converter.
# _convert_map defines maps of name to converter functor(callable)
# for 1 to 1 mapping, use Renamer if nothing but name is different
# use AttrCvt if attributes need to be converted
# for 1 to N mapping(composed), use custom callable functions
# for N to 1 mapping, currently not supported(?)
def _get_convert_map(opset):
    return {
        # defs/experimental
        'Identity': Renamer('copy'),
        # 'Affine'
        'ThresholdedRelu': ThresholdedRelu.get_converter(opset),
        'ScaledTanh': ScaledTanh.get_converter(opset),
        'ParametricSoftplus': ParametricSoftPlus.get_converter(opset),
        'ConstantOfShape': ConstantOfShape.get_converter(opset),
        # 'GivenTensorFill'
        'FC': AttrCvt('dense', ignores=['axis', 'axis_w']),
        'Scale': Scale.get_converter(opset),
        # 'GRUUnit'
        # 'ATen'
        # 'ImageScaler'
        # 'MeanVarianceNormalization'
        # 'Crop'
        # 'Embedding'
        'Upsample': Upsample.get_converter(opset),
        ...
3 Add tests
Add the test code to tests/python/frontend/onnx/test_forward.py:
def _test_upsample_nearest():
    scale = 2
    in_shape = (1, 1, 3, 3)
    out_shape = (1, 1, 3*scale, 3*scale)
    y = helper.make_node("Upsample", ['in'], ['out'],
                         mode='nearest', scales=[1.0, 1.0, 2.0, 2.0])
    in_array = np.random.uniform(size=in_shape).astype(np.float32)
    out_array = topi.testing.upsampling_python(
        in_array, (scale, scale), "NCHW")
    graph = helper.make_graph([y],
                              'upsample_nearest_test',
                              inputs=[helper.make_tensor_value_info(
                                  "in", TensorProto.FLOAT, list(in_shape))],
                              outputs=[helper.make_tensor_value_info(
                                  "out", TensorProto.FLOAT, list(out_shape))])
    model = helper.make_model(graph, producer_name='upsample_nearest_test')
    for target, ctx in ctx_list():
        tvm_out = get_tvm_output(
            model, in_array, target, ctx, out_shape, 'float32')
        tvm.testing.assert_allclose(out_array, tvm_out)


def _test_upsample_bilinear():
    scale = 2
    in_shape = (1, 1, 3, 3)
    out_shape = (1, 1, 3*scale, 3*scale)
    y = helper.make_node("Upsample", ['in'], ['out'],
                         mode='linear', scales=[1.0, 1.0, 2.0, 2.0])
    in_array = np.random.uniform(size=in_shape).astype(np.float32)
    out_array = topi.testing.bilinear_resize_python(
        in_array, (3*scale, 3*scale), "NCHW")
    graph = helper.make_graph([y],
                              'upsample_bilinear_test',
                              inputs=[helper.make_tensor_value_info(
                                  "in", TensorProto.FLOAT, list(in_shape))],
                              outputs=[helper.make_tensor_value_info(
                                  "out", TensorProto.FLOAT, list(out_shape))])
    model = helper.make_model(graph, producer_name='upsample_bilinear_test')
    for target, ctx in ctx_list():
        tvm_out = get_tvm_output(
            model, in_array, target, ctx, out_shape, 'float32')
        tvm.testing.assert_allclose(out_array, tvm_out, rtol=1e-5, atol=1e-5)


def _test_upsample_bilinear_opset9():
    scale = 2
    in_shape = (1, 1, 3, 3)
    out_shape = (1, 1, 3*scale, 3*scale)
    y = helper.make_node("Upsample", ['in', 'scales'], ['out'], mode='linear')
    scales = [1.0, 1.0, 2.0, 2.0]
    in_array = np.random.uniform(size=in_shape).astype(np.float32)
    out_array = topi.testing.bilinear_resize_python(
        in_array, (3*scale, 3*scale), "NCHW")
    ref_array = np.array(scales)
    ref_node = helper.make_node('Constant',
                                inputs=[],
                                outputs=['scales'],
                                value=onnx.helper.make_tensor(
                                    name='const_tensor',
                                    data_type=TensorProto.FLOAT,
                                    dims=ref_array.shape,
                                    vals=ref_array.flatten().astype(float)))
    graph = helper.make_graph([ref_node, y],
                              'upsample_bilinear_opset9_test',
                              inputs=[helper.make_tensor_value_info(
                                  "in", TensorProto.FLOAT, list(in_shape))],
                              outputs=[helper.make_tensor_value_info(
                                  "out", TensorProto.FLOAT, list(out_shape))])
    model = helper.make_model(
        graph, producer_name='upsample_bilinear_opset9_test')
    for target, ctx in ctx_list():
        tvm_out = get_tvm_output(
            model, in_array, target, ctx, out_shape, 'float32')
        tvm.testing.assert_allclose(out_array, tvm_out, rtol=1e-5, atol=1e-5)


def test_upsample():
    _test_upsample_nearest()
    _test_upsample_bilinear()
    _test_upsample_bilinear_opset9()


if __name__ == '__main__':
    test_upsample()