This article walks through standard Python implementations for measuring three quantities of a model in deep-learning experiments: FLOPs, Params, and Latency.

Implementation 1:

    # Methods excerpted from a Conv nn.Module in a slimmable-network codebase;
    # Conv, make_divisible, the lookup tables and table_file_name are defined elsewhere.

    @staticmethod
    def _flops(h, w, C_in, C_out, kernel_size=3, stride=1, padding=None, dilation=1, groups=1, bias=False):
        layer = Conv(C_in, C_out, kernel_size, stride, padding, dilation, groups, bias, slimmable=False)
        flops, params = profile(layer, inputs=(torch.randn(1, C_in, h, w),), custom_ops=custom_ops)
        return flops

    @staticmethod
    def _latency(h, w, C_in, C_out, kernel_size=3, stride=1, padding=None, dilation=1, groups=1, bias=False):
        layer = Conv(C_in, C_out, kernel_size, stride, padding, dilation, groups, bias, slimmable=False)
        latency = compute_latency(layer, (1, C_in, h, w))
        return latency

    def forward_latency(self, size):
        c_in, h_in, w_in = size
        if self.slimmable:
            assert c_in == make_divisible(self.C_in * self.ratio[0]), "c_in %d, self.C_in * self.ratio[0] %d" % (c_in, self.C_in * self.ratio[0])
            c_out = make_divisible(self.C_out * self.ratio[1])
        else:
            assert c_in == self.C_in, "c_in %d, self.C_in %d" % (c_in, self.C_in)
            c_out = self.C_out
        if self.stride == 1:
            h_out = h_in; w_out = w_in
        else:
            h_out = h_in // 2; w_out = w_in // 2
        name = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d" % (h_in, w_in, c_in, c_out, self.kernel_size, self.stride)
        if name in latency_lookup_table:
            latency = latency_lookup_table[name]
        else:
            print("not found in latency_lookup_table:", name)
            latency = Conv._latency(h_in, w_in, c_in, c_out, self.kernel_size, self.stride, self.padding, self.dilation, self.groups, self.bias)
            latency_lookup_table[name] = latency
            np.save(table_file_name, latency_lookup_table)
        return latency, (c_out, h_out, w_out)

    def forward_flops(self, size, quantize=False):
        c_in, h_in, w_in = size
        if self.slimmable:
            assert c_in == make_divisible(self.C_in * self.ratio[0]), "c_in %d, self.C_in * self.ratio[0] %d" % (c_in, self.C_in * self.ratio[0])
            c_out = make_divisible(self.C_out * self.ratio[1])
        else:
            assert c_in == self.C_in, "c_in %d, self.C_in %d" % (c_in, self.C_in)
            c_out = self.C_out
        if self.stride == 1:
            h_out = h_in; w_out = w_in
        else:
            h_out = h_in // 2; w_out = w_in // 2
        name = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d" % (h_in, w_in, c_in, c_out, self.kernel_size, self.stride)
        if name in flops_lookup_table:
            flops = flops_lookup_table[name]
        else:
            print("not found in flops_lookup_table:", name)
            flops = Conv._flops(h_in, w_in, c_in, c_out, self.kernel_size, self.stride, self.padding, self.dilation, self.groups, self.bias)
            flops_lookup_table[name] = flops
            np.save(table_file_name, flops_lookup_table)
        # if quantize:
        #     flops /= 4
        return flops, (c_out, h_out, w_out)

    # zero-cost rule for modules thop should not count
    def count_custom(m, x, y):
        m.total_ops += 0

    custom_ops = {QConv2d: count_convNd, QConvTranspose2d: count_convNd, QuantMeasure: count_custom, nn.InstanceNorm2d: count_custom}

FLOPs and latency come from the outputs of the forward_flops and forward_latency methods, respectively, which in turn call _flops and _latency. _flops calls profile, which returns a layer's FLOPs and params; _latency calls compute_latency, which returns a layer's latency. Both forward methods first consult a cached lookup table and only measure (and then cache) a configuration that is missing from it; a sketch of that table setup follows below.
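The snippet above assumes that the dictionaries latency_lookup_table / flops_lookup_table and a save path table_file_name already exist at module scope. A minimal sketch of that setup (the file name here is assumed for illustration, not taken from the original code):

    import os
    import numpy as np

    table_file_name = "latency_lookup_table.npy"  # assumed path, for illustration only

    # np.save stores the dict as a 0-d object array; .item() recovers the dict
    if os.path.isfile(table_file_name):
        latency_lookup_table = np.load(table_file_name, allow_pickle=True).item()
    else:
        latency_lookup_table = {}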

  • FLOPs and params: computed with the thop package

The profile function comes from the thop package:

    from thop import profile

Usage:

Basic usage:

    import torch
    from torchvision.models import resnet50
    from thop import profile

    model = resnet50()
    # newer thop versions take inputs=; very old versions used input_size=(1, 3, 224, 224)
    flops, params = profile(model, inputs=(torch.randn(1, 3, 224, 224),))

Third-party modules and custom counting rules:

    class YourModule(nn.Module):
        # your definition
        ...

    def count_your_model(model, x, y):
        # your rule here
        ...

    flops, params = profile(model, inputs=(torch.randn(1, 3, 224, 224),),
                            custom_ops={YourModule: count_your_model})
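As a concrete illustration, here is a toy custom rule; the Swish module and its one-multiply-per-element rule are made up for this example and are not part of thop:

    import torch
    import torch.nn as nn
    from thop import profile

    class Swish(nn.Module):
        # toy "third-party" module: x * sigmoid(x)
        def forward(self, x):
            return x * torch.sigmoid(x)

    def count_swish(m, x, y):
        # count one multiply per output element; the sigmoid itself is ignored here
        m.total_ops += y.numel()

    model = nn.Sequential(nn.Conv2d(3, 8, kernel_size=3), Swish())
    flops, params = profile(model, inputs=(torch.randn(1, 3, 32, 32),),
                            custom_ops={Swish: count_swish})
    print(flops, params)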
  • Latency:

The compute_latency function resolves to either compute_latency_ms_tensorrt or compute_latency_ms_pytorch, preferring TensorRT when it is available:

    try:
        from utils.darts_utils import compute_latency_ms_tensorrt as compute_latency
        print("use TensorRT for latency test")
    except:
        from utils.darts_utils import compute_latency_ms_pytorch as compute_latency
        print("use PyTorch for latency test")

Here are the two implementations:

    # module-level imports (added here for completeness)
    import time
    import warnings

    import numpy as np
    import torch
    from tqdm import tqdm

    try:
        import tensorrt as trt
        from PIL import Image
        import pycuda.driver as cuda
        import pycuda.autoinit

        MAX_BATCH_SIZE = 1
        MAX_WORKSPACE_SIZE = 1 << 30

        TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
        DTYPE = trt.float32

        # Model
        INPUT_NAME = 'input'
        OUTPUT_NAME = 'output'

        def allocate_buffers(engine):
            # page-locked host buffers plus matching device buffers
            h_input = cuda.pagelocked_empty(trt.volume(engine.get_binding_shape(0)), dtype=trt.nptype(DTYPE))
            h_output = cuda.pagelocked_empty(trt.volume(engine.get_binding_shape(1)), dtype=trt.nptype(DTYPE))
            d_input = cuda.mem_alloc(h_input.nbytes)
            d_output = cuda.mem_alloc(h_output.nbytes)
            return h_input, d_input, h_output, d_output

        def build_engine(model_file):
            with trt.Builder(TRT_LOGGER) as builder, builder.create_network() as network, trt.OnnxParser(network, TRT_LOGGER) as parser:
                builder.max_workspace_size = MAX_WORKSPACE_SIZE
                builder.max_batch_size = MAX_BATCH_SIZE
                with open(model_file, 'rb') as model:
                    parser.parse(model.read())
                return builder.build_cuda_engine(network)

        def load_input(input_size, host_buffer):
            assert len(input_size) == 4
            b, c, h, w = input_size
            dtype = trt.nptype(DTYPE)
            img_array = np.random.randn(c, h, w).astype(dtype).ravel()
            np.copyto(host_buffer, img_array)

        def do_inference(context, h_input, d_input, h_output, d_output, iterations=None):
            # Transfer input data to the GPU.
            cuda.memcpy_htod(d_input, h_input)
            # warm-up
            for _ in range(10):
                context.execute(batch_size=1, bindings=[int(d_input), int(d_output)])
            # pick a proper iteration count: keep doubling until one pass takes > 1 s
            if iterations is None:
                elapsed_time = 0
                iterations = 100
                while elapsed_time < 1:
                    t_start = time.time()
                    for _ in range(iterations):
                        context.execute(batch_size=1, bindings=[int(d_input), int(d_output)])
                    elapsed_time = time.time() - t_start
                    iterations *= 2
                FPS = iterations / elapsed_time
                iterations = int(FPS * 3)
            # Run inference.
            t_start = time.time()
            for _ in tqdm(range(iterations)):
                context.execute(batch_size=1, bindings=[int(d_input), int(d_output)])
            elapsed_time = time.time() - t_start
            latency = elapsed_time / iterations * 1000
            return latency

        def compute_latency_ms_tensorrt(model, input_size, iterations=None):
            # export to ONNX, build a TensorRT engine, then time engine execution
            model = model.cuda()
            model.eval()
            _, c, h, w = input_size
            dummy_input = torch.randn(1, c, h, w, device='cuda')
            torch.onnx.export(model, dummy_input, "model.onnx", verbose=False, input_names=["input"], output_names=["output"])
            with build_engine("model.onnx") as engine:
                h_input, d_input, h_output, d_output = allocate_buffers(engine)
                load_input(input_size, h_input)
                with engine.create_execution_context() as context:
                    latency = do_inference(context, h_input, d_input, h_output, d_output, iterations=iterations)
            # FPS = 1000 / latency (in ms)
            return latency
    except:
        warnings.warn("TensorRT (or pycuda) is not installed. compute_latency_ms_tensorrt() cannot be used.")

    #########################################################################

    def compute_latency_ms_pytorch(model, input_size, iterations=None, device=None):
        torch.backends.cudnn.enabled = True
        torch.backends.cudnn.benchmark = True

        model.eval()
        model = model.cuda()
        input = torch.randn(*input_size).cuda()

        with torch.no_grad():
            # warm-up
            for _ in range(10):
                model(input)

            # pick a proper iteration count, as in the TensorRT version
            if iterations is None:
                elapsed_time = 0
                iterations = 100
                while elapsed_time < 1:
                    torch.cuda.synchronize()
                    torch.cuda.synchronize()
                    t_start = time.time()
                    for _ in range(iterations):
                        model(input)
                    torch.cuda.synchronize()
                    torch.cuda.synchronize()
                    elapsed_time = time.time() - t_start
                    iterations *= 2
                FPS = iterations / elapsed_time
                iterations = int(FPS * 6)

            print('=========Speed Testing=========')
            torch.cuda.synchronize()
            torch.cuda.synchronize()
            t_start = time.time()
            for _ in tqdm(range(iterations)):
                model(input)
            torch.cuda.synchronize()
            torch.cuda.synchronize()
            elapsed_time = time.time() - t_start
            latency = elapsed_time / iterations * 1000
        torch.cuda.empty_cache()
        # FPS = 1000 / latency (in ms)
        return latency
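A quick usage sketch for the PyTorch variant (this assumes a CUDA-capable GPU, since the function moves both the model and the input to .cuda()):

    from torchvision.models import resnet18

    model = resnet18()
    # latency in milliseconds for a 1x3x224x224 input
    latency_ms = compute_latency_ms_pytorch(model, (1, 3, 224, 224))
    print('latency: %.3f ms' % latency_ms)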

Implementation 2:

  • MACs and params:
    def profile(self, config=None, config_op=None, config_path=None, verbose=True, reference_macs=0, reference_params=0):
        netG = self.netG
        if isinstance(netG, nn.DataParallel):
            netG = netG.module
        if config is not None:
            netG.configs = config
        if config_op is not None:
            netG.config_op = config_op
        if config_path is not None:
            netG.config_path = config_path
        with torch.no_grad():
            macs = profile_macs(netG, (self.real_A[:1],))
        params = 0
        for p in netG.parameters():
            params += p.numel()
        if verbose:
            print(config_path)
            print('MACs: %.3fG\tParams: %.3fM' % (macs / 1e9 - reference_macs, params / 1e6 - reference_params), flush=True)
        return macs, params
    As the code shows, macs is computed by the profile_macs function, and params is obtained by summing p.numel() over netG.parameters().

So how is profile_macs implemented? It comes from the torchprofile package:

    from torchprofile import profile_macs

The package is hosted on GitHub, and its usage is simple:

    import torch
    from torchvision.models import resnet18

    model = resnet18()
    inputs = torch.randn(1, 3, 224, 224)

Then MACs can be computed with profile_macs:

    from torchprofile import profile_macs

    macs = profile_macs(model, inputs)
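Putting the two pieces together reproduces what the profile method of Implementation 2 computes, on a plain torchvision model; a minimal sketch:

    import torch
    from torchprofile import profile_macs
    from torchvision.models import resnet18

    model = resnet18()
    inputs = torch.randn(1, 3, 224, 224)
    with torch.no_grad():
        macs = profile_macs(model, inputs)
    params = sum(p.numel() for p in model.parameters())
    print('MACs: %.3fG\tParams: %.3fM' % (macs / 1e9, params / 1e6))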
  • Latency:
    import sys
    import time
    import warnings

    import torch
    import tqdm
    from torch.backends import cudnn

    from configs import decode_config
    from data import create_dataloader
    from models import create_model
    from options.test_options import TestOptions


    def check(opt):
        assert opt.serial_batches
        assert opt.no_flip
        assert opt.load_size == opt.crop_size
        assert opt.preprocess == 'resize_and_crop'
        assert opt.batch_size == 1
        if not opt.no_fid:
            assert opt.real_stat_path is not None
        if opt.phase == 'train':
            warnings.warn('You are using training set for inference.')


    if __name__ == '__main__':
        cudnn.enabled = True
        opt = TestOptions().parse()
        print(' '.join(sys.argv))
        if opt.config_str is not None:
            assert 'super' in opt.netG or 'sub' in opt.netG
            config = decode_config(opt.config_str)
        else:
            assert 'super' not in opt.model
            config = None
        dataloader = create_dataloader(opt)
        model = create_model(opt)
        model.setup(opt)
        for data in dataloader:
            model.set_input(data)
            break
        # Warm-up times
        for i in tqdm.trange(opt.times):
            model.test(config)
        if len(opt.gpu_ids) > 0:
            torch.cuda.synchronize()
        start_time = time.time()
        for i in tqdm.trange(opt.times):
            model.test(config)
        if len(opt.gpu_ids) > 0:
            torch.cuda.synchronize()
        cost_time = time.time() - start_time
        print('Cost Time: %.2fs\tLatency: %.4fs' % (cost_time, cost_time / opt.times))
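Stripped of the project-specific options machinery, the timing pattern this script uses boils down to the following sketch (model, inputs, and times here are placeholders, not names from the script):

    import time
    import torch

    def measure_latency(model, inputs, times=100):
        model.eval()
        with torch.no_grad():
            for _ in range(times):  # warm-up
                model(inputs)
            if torch.cuda.is_available():
                torch.cuda.synchronize()  # drain queued GPU work before timing
            start = time.time()
            for _ in range(times):
                model(inputs)
            if torch.cuda.is_available():
                torch.cuda.synchronize()  # ensure all timed work has finished
            cost = time.time() - start
        print('Cost Time: %.2fs\tLatency: %.4fs' % (cost, cost / times))
        return cost / times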

A comparison of the two approaches:

ResNet-18:

  • 1. Computing MACs with torchprofile:

    import torch
    from torchvision.models import resnet18
    from torchprofile import profile_macs

    model = resnet18()
    inputs = torch.randn(1, 3, 224, 224)
    macs = profile_macs(model, inputs)
    print(macs)

    Result: 1814073856

  • 2. Computing FLOPs with thop:

    import torch
    from torchvision.models import resnet18
    from thop import profile

    model = resnet18()
    input = torch.randn(1, 3, 224, 224)
    flops, params = profile(model, inputs=(input,))
    print(flops)

    Result: 1824545792.0
    The two values are nearly identical.

ResNet-50:

  • 1. Computing MACs with torchprofile:

    import torch
    from torchvision.models import resnet50
    from torchprofile import profile_macs

    model = resnet50()
    inputs = torch.randn(1, 3, 224, 224)
    macs = profile_macs(model, inputs)
    print(macs)

    Result: 4089186304

  • 2. Computing FLOPs with thop:

    import torch
    from torchvision.models import resnet50
    from thop import profile

    model = resnet50()
    input = torch.randn(1, 3, 224, 224)
    flops, params = profile(model, inputs=(input,))
    print(flops)

    Result: 4135790592.0
    The two values again agree to within about 1%; the small gap is plausibly because thop also counts some non-convolution ops (e.g. BatchNorm) that torchprofile ignores.

Conclusion:

The two approaches report essentially the same operation count for a given model. One caveat: despite the name, the value thop's profile returns as flops is really a multiply-accumulate (MAC) count, which is exactly why it lines up with torchprofile's macs; a quick check follows below, using the numbers measured above.
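If thop's "flops" were true FLOPs (conventionally 2 per multiply-accumulate), the ratio to torchprofile's MACs would be near 2 rather than near 1:

    macs_torchprofile = 1814073856   # ResNet-18, torchprofile
    flops_thop = 1824545792          # ResNet-18, thop
    print(flops_thop / macs_torchprofile)  # ~1.006: same quantity, not a 2x FLOP count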