This article walks through standard Python implementations for measuring three quantities of a model in deep-learning experiments: FLOPs, Params, and Latency.
Implementation 1:
# Excerpt: FLOPs/latency helper methods of a Conv module. Conv, make_divisible,
# the lookup tables, and table_file_name are defined elsewhere in the same repo.
@staticmethod
def _flops(h, w, C_in, C_out, kernel_size=3, stride=1, padding=None, dilation=1, groups=1, bias=False):
    layer = Conv(C_in, C_out, kernel_size, stride, padding, dilation, groups, bias, slimmable=False)
    flops, params = profile(layer, inputs=(torch.randn(1, C_in, h, w),), custom_ops=custom_ops)
    return flops

@staticmethod
def _latency(h, w, C_in, C_out, kernel_size=3, stride=1, padding=None, dilation=1, groups=1, bias=False):
    layer = Conv(C_in, C_out, kernel_size, stride, padding, dilation, groups, bias, slimmable=False)
    latency = compute_latency(layer, (1, C_in, h, w))
    return latency

def forward_latency(self, size):
    c_in, h_in, w_in = size
    if self.slimmable:
        assert c_in == make_divisible(self.C_in * self.ratio[0]), "c_in %d, self.C_in * self.ratio[0] %d" % (c_in, self.C_in * self.ratio[0])
        c_out = make_divisible(self.C_out * self.ratio[1])
    else:
        assert c_in == self.C_in, "c_in %d, self.C_in %d" % (c_in, self.C_in)
        c_out = self.C_out
    if self.stride == 1:
        h_out = h_in; w_out = w_in
    else:
        h_out = h_in // 2; w_out = w_in // 2
    # Look up the latency of this layer configuration; measure and cache it on a miss.
    name = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d" % (h_in, w_in, c_in, c_out, self.kernel_size, self.stride)
    if name in latency_lookup_table:
        latency = latency_lookup_table[name]
    else:
        print("not found in latency_lookup_table:", name)
        latency = Conv._latency(h_in, w_in, c_in, c_out, self.kernel_size, self.stride, self.padding, self.dilation, self.groups, self.bias)
        latency_lookup_table[name] = latency
        np.save(table_file_name, latency_lookup_table)
    return latency, (c_out, h_out, w_out)

def forward_flops(self, size, quantize=False):
    c_in, h_in, w_in = size
    if self.slimmable:
        assert c_in == make_divisible(self.C_in * self.ratio[0]), "c_in %d, self.C_in * self.ratio[0] %d" % (c_in, self.C_in * self.ratio[0])
        c_out = make_divisible(self.C_out * self.ratio[1])
    else:
        assert c_in == self.C_in, "c_in %d, self.C_in %d" % (c_in, self.C_in)
        c_out = self.C_out
    if self.stride == 1:
        h_out = h_in; w_out = w_in
    else:
        h_out = h_in // 2; w_out = w_in // 2
    # Same lookup-table pattern as forward_latency, keyed on the layer configuration.
    name = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d" % (h_in, w_in, c_in, c_out, self.kernel_size, self.stride)
    if name in flops_lookup_table:
        flops = flops_lookup_table[name]
    else:
        print("not found in flops_lookup_table:", name)
        flops = Conv._flops(h_in, w_in, c_in, c_out, self.kernel_size, self.stride, self.padding, self.dilation, self.groups, self.bias)
        flops_lookup_table[name] = flops
        np.save(table_file_name, flops_lookup_table)
    # if quantize:
    #     flops /= 4
    return flops, (c_out, h_out, w_out)

def count_custom(m, x, y):
    m.total_ops += 0

custom_ops = {QConv2d: count_convNd, QConvTranspose2d: count_convNd, QuantMeasure: count_custom, nn.InstanceNorm2d: count_custom}
FLOPs and latency come from the outputs of forward_flops and forward_latency respectively; these two functions in turn call _flops and _latency to do the actual work. _flops calls the profile function, which returns a layer's FLOPs and params; _latency calls the compute_latency function, which returns a layer's latency.
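Both forward functions share the same lookup-table caching pattern: the layer configuration is encoded into a string key, the expensive measurement runs only on a cache miss, and the table is persisted with np.save. Below is a minimal, self-contained sketch of that pattern; the names (cached_measure, measure_fn, the table file path) are illustrative, not the repo's actual API:

import numpy as np

latency_lookup_table = {}
table_file_name = "latency_lookup_table.npy"  # hypothetical cache path

def cached_measure(h, w, c_in, c_out, kernel_size, stride, measure_fn):
    # Encode the layer configuration into a unique string key.
    name = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d" % (h, w, c_in, c_out, kernel_size, stride)
    if name not in latency_lookup_table:
        # Cache miss: run the expensive measurement once, then persist the table.
        latency_lookup_table[name] = measure_fn()
        np.save(table_file_name, latency_lookup_table)
    return latency_lookup_table[name]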
- FLOPs and params: computed with the thop package
The profile function comes from the thop package:
from thop import profile
Usage:
Basic usage:
import torch
from torchvision.models import resnet50
from thop import profile

model = resnet50()
# Recent thop versions take the example inputs directly; older ones used input_size=(1, 3, 224, 224)
flops, params = profile(model, inputs=(torch.randn(1, 3, 224, 224),))
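The returned values are raw counts; one common way to report them (a sketch):

print('FLOPs: %.3fG, Params: %.3fM' % (flops / 1e9, params / 1e6))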
Third-party modules and custom counting rules:
class YourModule(nn.Module):
    # your definition
    ...

def count_your_model(model, x, y):
    # your counting rule here
    ...

flops, params = profile(model, inputs=(torch.randn(1, 3, 224, 224),),
                        custom_ops={YourModule: count_your_model})
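For instance, a custom rule for a simple elementwise-scale module might look like the following. Scale and count_scale are illustrative names, not part of thop; the sketch assumes thop's convention that each hooked module accumulates its count in m.total_ops:

import torch
import torch.nn as nn
from thop import profile

class Scale(nn.Module):
    """Multiplies each channel by a learned scalar."""
    def __init__(self, num_features):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(num_features))

    def forward(self, x):
        return x * self.weight.view(1, -1, 1, 1)

def count_scale(m, x, y):
    # One multiply per output element.
    m.total_ops += y.numel()

model = nn.Sequential(nn.Conv2d(3, 8, 3, padding=1), Scale(8))
flops, params = profile(model, inputs=(torch.randn(1, 3, 32, 32),),
                        custom_ops={Scale: count_scale})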
- Latency:
compute_latency is either compute_latency_ms_tensorrt or compute_latency_ms_pytorch, chosen at import time:
try:
    from utils.darts_utils import compute_latency_ms_tensorrt as compute_latency
    print("use TensorRT for latency test")
except:
    from utils.darts_utils import compute_latency_ms_pytorch as compute_latency
    print("use PyTorch for latency test")
Here is how each of the two is implemented:
# Excerpt from utils/darts_utils.py; the imports below are added here to make the snippet self-contained.
import time
import warnings

import numpy as np
import torch
from tqdm import tqdm

try:
    import tensorrt as trt
    from PIL import Image
    import pycuda.driver as cuda
    import pycuda.autoinit

    MAX_BATCH_SIZE = 1
    MAX_WORKSPACE_SIZE = 1 << 30

    TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
    DTYPE = trt.float32

    # Model
    INPUT_NAME = 'input'
    OUTPUT_NAME = 'output'

    def allocate_buffers(engine):
        h_input = cuda.pagelocked_empty(trt.volume(engine.get_binding_shape(0)), dtype=trt.nptype(DTYPE))
        h_output = cuda.pagelocked_empty(trt.volume(engine.get_binding_shape(1)), dtype=trt.nptype(DTYPE))
        d_input = cuda.mem_alloc(h_input.nbytes)
        d_output = cuda.mem_alloc(h_output.nbytes)
        return h_input, d_input, h_output, d_output

    def build_engine(model_file):
        with trt.Builder(TRT_LOGGER) as builder, builder.create_network() as network, trt.OnnxParser(network, TRT_LOGGER) as parser:
            builder.max_workspace_size = MAX_WORKSPACE_SIZE
            builder.max_batch_size = MAX_BATCH_SIZE
            with open(model_file, 'rb') as model:
                parser.parse(model.read())
            return builder.build_cuda_engine(network)

    def load_input(input_size, host_buffer):
        assert len(input_size) == 4
        b, c, h, w = input_size
        dtype = trt.nptype(DTYPE)
        img_array = np.random.randn(c, h, w).astype(dtype).ravel()
        np.copyto(host_buffer, img_array)

    def do_inference(context, h_input, d_input, h_output, d_output, iterations=None):
        # Transfer input data to the GPU.
        cuda.memcpy_htod(d_input, h_input)
        # warm-up
        for _ in range(10):
            context.execute(batch_size=1, bindings=[int(d_input), int(d_output)])
        # pick a proper number of iterations: double until the run takes at least 1 s
        if iterations is None:
            elapsed_time = 0
            iterations = 100
            while elapsed_time < 1:
                t_start = time.time()
                for _ in range(iterations):
                    context.execute(batch_size=1, bindings=[int(d_input), int(d_output)])
                elapsed_time = time.time() - t_start
                iterations *= 2
            FPS = iterations / elapsed_time
            iterations = int(FPS * 3)
        # Run inference.
        t_start = time.time()
        for _ in tqdm(range(iterations)):
            context.execute(batch_size=1, bindings=[int(d_input), int(d_output)])
        elapsed_time = time.time() - t_start
        latency = elapsed_time / iterations * 1000
        return latency

    def compute_latency_ms_tensorrt(model, input_size, iterations=None):
        model = model.cuda()
        model.eval()
        _, c, h, w = input_size
        dummy_input = torch.randn(1, c, h, w, device='cuda')
        # Export to ONNX, then build a TensorRT engine from it.
        torch.onnx.export(model, dummy_input, "model.onnx", verbose=False, input_names=["input"], output_names=["output"])
        with build_engine("model.onnx") as engine:
            h_input, d_input, h_output, d_output = allocate_buffers(engine)
            load_input(input_size, h_input)
            with engine.create_execution_context() as context:
                latency = do_inference(context, h_input, d_input, h_output, d_output, iterations=iterations)
        # FPS = 1000 / latency (in ms)
        return latency
except:
    warnings.warn("TensorRT (or pycuda) is not installed. compute_latency_ms_tensorrt() cannot be used.")

#########################################################################

def compute_latency_ms_pytorch(model, input_size, iterations=None, device=None):
    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True

    model.eval()
    model = model.cuda()

    input = torch.randn(*input_size).cuda()

    with torch.no_grad():
        # warm-up
        for _ in range(10):
            model(input)

        # pick a proper number of iterations, as in the TensorRT path
        if iterations is None:
            elapsed_time = 0
            iterations = 100
            while elapsed_time < 1:
                torch.cuda.synchronize()
                torch.cuda.synchronize()
                t_start = time.time()
                for _ in range(iterations):
                    model(input)
                torch.cuda.synchronize()
                torch.cuda.synchronize()
                elapsed_time = time.time() - t_start
                iterations *= 2
            FPS = iterations / elapsed_time
            iterations = int(FPS * 6)

        print('=========Speed Testing=========')
        torch.cuda.synchronize()
        torch.cuda.synchronize()
        t_start = time.time()
        for _ in tqdm(range(iterations)):
            model(input)
        torch.cuda.synchronize()
        torch.cuda.synchronize()
        elapsed_time = time.time() - t_start
        latency = elapsed_time / iterations * 1000
    torch.cuda.empty_cache()
    # FPS = 1000 / latency (in ms)
    return latency
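A quick usage sketch for the PyTorch variant (assuming a CUDA-capable GPU and that compute_latency_ms_pytorch is in scope):

from torchvision.models import resnet18

model = resnet18()
latency_ms = compute_latency_ms_pytorch(model, (1, 3, 224, 224))
print('ResNet-18 latency: %.3f ms' % latency_ms)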
Implementation 2:
- MACs and params:
As the following profile method shows, MACs are computed by the profile_macs function:

def profile(self, config=None, config_op=None, config_path=None, verbose=True, reference_macs=0, reference_params=0):
    netG = self.netG
    if isinstance(netG, nn.DataParallel):
        netG = netG.module
    if config is not None:
        netG.configs = config
    if config_op is not None:
        netG.config_op = config_op
    if config_path is not None:
        netG.config_path = config_path
    with torch.no_grad():
        macs = profile_macs(netG, (self.real_A[:1],))
    params = 0
    for p in netG.parameters():
        params += p.numel()
    if verbose:
        print(config_path)
        print('MACs: %.3fG\tParams: %.3fM' % (macs / 1e9 - reference_macs, params / 1e6 - reference_params), flush=True)
    return macs, params
The params count is obtained by summing p.numel() over every tensor in netG.parameters().
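Equivalently, as a one-liner:

params = sum(p.numel() for p in netG.parameters())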
So where does profile_macs come from? Answer: it is the profile_macs function provided by the torchprofile package.
from torchprofile import profile_macs
The package lives on GitHub at https://github.com/zhijian-liu/torchprofile.
Usage is straightforward:
import torch
from torchvision.models import resnet18

model = resnet18()
inputs = torch.randn(1, 3, 224, 224)
Then the MACs can be computed with profile_macs:
from torchprofile import profile_macs

macs = profile_macs(model, inputs)
- Latency:
import sys
import time
import warnings

import torch
import tqdm
from torch.backends import cudnn

from configs import decode_config
from data import create_dataloader
from models import create_model
from options.test_options import TestOptions


def check(opt):
    assert opt.serial_batches
    assert opt.no_flip
    assert opt.load_size == opt.crop_size
    assert opt.preprocess == 'resize_and_crop'
    assert opt.batch_size == 1
    if not opt.no_fid:
        assert opt.real_stat_path is not None
    if opt.phase == 'train':
        warnings.warn('You are using training set for inference.')


if __name__ == '__main__':
    cudnn.enabled = True
    opt = TestOptions().parse()
    print(' '.join(sys.argv))
    if opt.config_str is not None:
        assert 'super' in opt.netG or 'sub' in opt.netG
        config = decode_config(opt.config_str)
    else:
        assert 'super' not in opt.model
        config = None

    dataloader = create_dataloader(opt)
    model = create_model(opt)
    model.setup(opt)

    for data in dataloader:
        model.set_input(data)
        break

    # Warm-up times
    for i in tqdm.trange(opt.times):
        model.test(config)
    if len(opt.gpu_ids) > 0:
        torch.cuda.synchronize()
    start_time = time.time()
    for i in tqdm.trange(opt.times):
        model.test(config)
    if len(opt.gpu_ids) > 0:
        torch.cuda.synchronize()
    cost_time = time.time() - start_time
    print('Cost Time: %.2fs\tLatency: %.4fs' % (cost_time, cost_time / opt.times))
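Note the torch.cuda.synchronize() calls around the timed loop: CUDA kernels launch asynchronously, so without synchronizing, time.time() would mostly measure kernel-launch overhead rather than actual execution. A minimal version of this timing pattern (time_gpu is an illustrative helper, not from the code above):

import time
import torch

def time_gpu(fn, iterations=100):
    # Warm up once, then wait for pending kernels so they don't leak into the timing.
    fn()
    torch.cuda.synchronize()
    t0 = time.time()
    for _ in range(iterations):
        fn()
    torch.cuda.synchronize()  # wait for all queued kernels before reading the clock
    return (time.time() - t0) / iterations * 1000  # ms per iteration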
Comparing the two approaches:
ResNet 18:
1. MACs via torchprofile:
import torch
from torchvision.models import resnet18
from torchprofile import profile_macs

model = resnet18()
inputs = torch.randn(1, 3, 224, 224)
macs = profile_macs(model, inputs)
print(macs)
Result: 1814073856
2. FLOPs via thop:
from thop import profile
import torch
from torchvision.models import resnet18

model = resnet18()
input = torch.randn(1, 3, 224, 224)
flops, params = profile(model, inputs=(input,))
print(flops)
Result: 1824545792.0
The two results are essentially the same (a relative difference of about 0.6%).
ResNet 50:
1. MACs via torchprofile:
import torch
from torchvision.models import resnet50
from torchprofile import profile_macs

model = resnet50()
inputs = torch.randn(1, 3, 224, 224)
macs = profile_macs(model, inputs)
print(macs)
Result: 4089186304
2. FLOPs via thop:
from thop import profile
import torch
from torchvision.models import resnet50

model = resnet50()
input = torch.randn(1, 3, 224, 224)
flops, params = profile(model, inputs=(input,))
print(flops)
Result: 4135790592.0
The two results are essentially the same (a relative difference of about 1.1%).
Conclusion:
The two approaches report essentially the same operation count. Note that despite the name, the value returned by thop's profile is also a multiply-accumulate (MAC) count rather than a raw floating-point-operation count, which is why it lines up with torchprofile; the small residual gap comes from the two tools counting slightly different sets of auxiliary ops. Under the common convention FLOPs ≈ 2 × MACs, ResNet-18's ~1.8G MACs correspond to ~3.6 GFLOPs.
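As a sanity check on either tool, the MAC count of a single convolution can be computed by hand as MACs = H_out × W_out × C_out × (C_in / groups) × K_h × K_w. A small sketch (conv_macs is an illustrative helper):

def conv_macs(h_out, w_out, c_in, c_out, k, groups=1):
    # One multiply-accumulate per (output element, input channel in the group, kernel tap).
    return h_out * w_out * c_out * (c_in // groups) * k * k

# ResNet's stem: a 7x7 conv, 3 -> 64 channels, stride 2 on a 224x224 input (output 112x112)
print(conv_macs(112, 112, 3, 64, 7))  # 118013952, i.e. ~0.118 GMACs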
