本文将介绍深度学习实验中计算模型的FLOPs、Params和Latency这3个指标的标准Python实现。
实现方法1:
@staticmethod
def _flops(h, w, C_in, C_out, kernel_size=3, stride=1, padding=None, dilation=1, groups=1, bias=False):
    """Count the operations of one ``Conv`` layer for an ``h`` x ``w`` input.

    A fresh, non-slimmable ``Conv`` is built from the given hyper-parameters
    and handed to ``profile`` together with a random ``(1, C_in, h, w)``
    tensor and the module-level ``custom_ops`` table.

    Returns:
        The first element of the pair returned by ``profile`` (the operation
        count); the second element (parameter count) is discarded.
    """
    probe_layer = Conv(C_in, C_out, kernel_size, stride, padding, dilation, groups, bias, slimmable=False)
    dummy_input = torch.randn(1, C_in, h, w)
    flops, _params = profile(probe_layer, inputs=(dummy_input,), custom_ops=custom_ops)
    return flops
@staticmethod
def _latency(h, w, C_in, C_out, kernel_size=3, stride=1, padding=None, dilation=1, groups=1, bias=False):
    """Measure the latency of one ``Conv`` layer for an ``h`` x ``w`` input.

    A fresh, non-slimmable ``Conv`` is built from the given hyper-parameters
    and passed to ``compute_latency`` with the input size ``(1, C_in, h, w)``.

    Returns:
        Whatever ``compute_latency`` returns for that layer and input size.
    """
    probe_layer = Conv(C_in, C_out, kernel_size, stride, padding, dilation, groups, bias, slimmable=False)
    input_size = (1, C_in, h, w)
    return compute_latency(probe_layer, input_size)
def forward_latency(self, size):
    """Return this layer's latency for an input of ``size`` plus its output size.

    Args:
        size: ``(channels, height, width)`` triple describing the input.

    Returns:
        ``(latency, (c_out, h_out, w_out))``.  The latency is read from
        ``latency_lookup_table`` when the layer configuration is already
        cached; otherwise it is measured with ``Conv._latency``, stored in
        the table, and the table is written to ``table_file_name``.

    Raises:
        AssertionError: if the incoming channel count does not match the
            (possibly ratio-scaled) ``C_in`` of this layer.
    """
    in_ch, in_h, in_w = size
    if self.slimmable:
        assert in_ch == make_divisible(self.C_in * self.ratio[0]), "c_in %d, self.C_in * self.ratio[0] %d"%(in_ch, self.C_in * self.ratio[0])
        out_ch = make_divisible(self.C_out * self.ratio[1])
    else:
        assert in_ch == self.C_in, "c_in %d, self.C_in %d"%(in_ch, self.C_in)
        out_ch = self.C_out

    # Any stride other than 1 is treated as halving the spatial size.
    out_h, out_w = (in_h, in_w) if self.stride == 1 else (in_h // 2, in_w // 2)
    out_size = (out_ch, out_h, out_w)

    key = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d"%(in_h, in_w, in_ch, out_ch, self.kernel_size, self.stride)
    if key in latency_lookup_table:
        return latency_lookup_table[key], out_size

    # Cache miss: measure once, remember the result and persist the table.
    # NOTE(review): forward_flops saves its table under the same
    # `table_file_name` -- confirm the two tables are not meant to share a file.
    print("not found in latency_lookup_table:", key)
    measured = Conv._latency(in_h, in_w, in_ch, out_ch, self.kernel_size, self.stride, self.padding, self.dilation, self.groups, self.bias)
    latency_lookup_table[key] = measured
    np.save(table_file_name, latency_lookup_table)
    return measured, out_size
def forward_flops(self, size, quantize=False):
    """Return this layer's FLOPs for an input of ``size`` plus its output size.

    Args:
        size: ``(channels, height, width)`` triple describing the input.
        quantize: currently unused; the only code that read it is disabled.

    Returns:
        ``(flops, (c_out, h_out, w_out))``.  The FLOPs are read from
        ``flops_lookup_table`` when the layer configuration is already cached;
        otherwise they are computed with ``Conv._flops``, stored in the table,
        and the table is written to ``table_file_name``.

    Raises:
        AssertionError: if the incoming channel count does not match the
            (possibly ratio-scaled) ``C_in`` of this layer.
    """
    in_ch, in_h, in_w = size
    if self.slimmable:
        assert in_ch == make_divisible(self.C_in * self.ratio[0]), "c_in %d, self.C_in * self.ratio[0] %d"%(in_ch, self.C_in * self.ratio[0])
        out_ch = make_divisible(self.C_out * self.ratio[1])
    else:
        assert in_ch == self.C_in, "c_in %d, self.C_in %d"%(in_ch, self.C_in)
        out_ch = self.C_out

    # Any stride other than 1 is treated as halving the spatial size.
    out_h, out_w = (in_h, in_w) if self.stride == 1 else (in_h // 2, in_w // 2)
    out_size = (out_ch, out_h, out_w)

    key = "Conv_H%d_W%d_Cin%d_Cout%d_kernel%d_stride%d"%(in_h, in_w, in_ch, out_ch, self.kernel_size, self.stride)
    if key in flops_lookup_table:
        return flops_lookup_table[key], out_size

    # Cache miss: count once, remember the result and persist the table.
    # NOTE(review): forward_latency saves its table under the same
    # `table_file_name` -- confirm the two tables are not meant to share a file.
    print("not found in flops_lookup_table:", key)
    counted = Conv._flops(in_h, in_w, in_ch, out_ch, self.kernel_size, self.stride, self.padding, self.dilation, self.groups, self.bias)
    flops_lookup_table[key] = counted
    np.save(table_file_name, flops_lookup_table)
    return counted, out_size
def count_custom(m, x, y):
    """Counting hook that contributes zero operations for module ``m``.

    Used as a value in ``custom_ops`` for module types that should not add to
    the operation total.  ``x`` and ``y`` are accepted but ignored.  The
    in-place ``+=`` is kept on purpose so ``m.total_ops`` is updated through
    the same operator as before rather than rebound to a new object.
    """
    m.total_ops += 0
# Per-module-type counting rules passed to ``profile`` via ``custom_ops``:
# the quantized (transposed) convolutions use ``count_convNd``; the other
# listed module types use ``count_custom``, which adds nothing.
custom_ops = {
    QConv2d: count_convNd,
    QConvTranspose2d: count_convNd,
    QuantMeasure: count_custom,
    nn.InstanceNorm2d: count_custom,
}
FLOPs和Latency的计算分别来自forward_flops和forward_latency这2个函数的输出,这2个函数又分别调用_flops和_latency实现对应的功能。
_flops函数又调用了profile函数,返回一个layer的FLOPs和params。
_latency又调用了compute_latency函数,返回一个layer的latency。
- FLOPs和params:thop包计算
profile函数来自thop这个包,具体是:
from thop import profile
使用方法:
基本使用
import torch
from thop import profile
from torchvision.models import resnet50

model = resnet50()
# ``profile`` takes the actual input tensors as a tuple via ``inputs``
# (not an ``input_size`` shape), matching the other ``profile`` calls in
# this article.
dummy_input = torch.randn(1, 3, 224, 224)
flops, params = profile(model, inputs=(dummy_input,))
第三方模块和自定义计算
import torch


class YourModule(nn.Module):
    # your definition
    ...


def count_your_model(model, x, y):
    # your rule here
    ...


# ``profile`` takes the actual input tensors as a tuple via ``inputs``
# (not an ``input_size`` shape); ``custom_ops`` maps a module type to the
# counting function to use for it.
dummy_input = torch.randn(1, 3, 224, 224)
flops, params = profile(model, inputs=(dummy_input,),
                        custom_ops={YourModule: count_your_model})
- Latency:
compute_latency函数来自compute_latency_ms_tensorrt函数或者compute_latency_ms_pytorch函数:优先导入TensorRT版本,导入失败时退回到PyTorch版本:
# Prefer the TensorRT-based latency measurement and fall back to the PyTorch
# one when the TensorRT variant cannot be imported.  Only ImportError is
# handled so that unrelated failures are not silently swallowed.
try:
    from utils.darts_utils import compute_latency_ms_tensorrt as compute_latency
except ImportError:
    from utils.darts_utils import compute_latency_ms_pytorch as compute_latency
    print("use PyTorch for latency test")
else:
    print("use TensorRT for latency test")
下面看下二者的具体实现:
# TensorRT-based latency measurement.  Everything is defined inside ``try`` so
# that a failing TensorRT / pycuda setup only produces a warning and leaves
# ``compute_latency_ms_tensorrt`` undefined.
try:
    import tensorrt as trt
    from PIL import Image
    import pycuda.driver as cuda
    import pycuda.autoinit  # imported for its import-time side effect

    MAX_BATCH_SIZE = 1
    MAX_WORKSPACE_SIZE = 1 << 30
    TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
    DTYPE = trt.float32

    # Model
    INPUT_NAME = 'input'
    OUTPUT_NAME = 'output'

    def allocate_buffers(engine):
        """Allocate host and device buffers for the engine's bindings 0 and 1.

        Returns:
            ``(h_input, d_input, h_output, d_output)``: page-locked host
            arrays sized from the binding shapes, and device allocations of
            the same byte size.
        """
        h_input = cuda.pagelocked_empty(trt.volume(engine.get_binding_shape(0)), dtype=trt.nptype(DTYPE))
        h_output = cuda.pagelocked_empty(trt.volume(engine.get_binding_shape(1)), dtype=trt.nptype(DTYPE))
        d_input = cuda.mem_alloc(h_input.nbytes)
        d_output = cuda.mem_alloc(h_output.nbytes)
        return h_input, d_input, h_output, d_output

    def build_engine(model_file):
        """Parse the ONNX file ``model_file`` and build a CUDA engine from it."""
        with trt.Builder(TRT_LOGGER) as builder, builder.create_network() as network, trt.OnnxParser(network, TRT_LOGGER) as parser:
            builder.max_workspace_size = MAX_WORKSPACE_SIZE
            builder.max_batch_size = MAX_BATCH_SIZE
            with open(model_file, 'rb') as model:
                parser.parse(model.read())
            return builder.build_cuda_engine(network)

    def load_input(input_size, host_buffer):
        """Fill ``host_buffer`` with random data for one ``(c, h, w)`` sample.

        ``input_size`` must have four entries; the first (batch) entry is
        ignored and a single flattened sample is copied into the buffer.
        """
        assert len(input_size) == 4
        _, c, h, w = input_size
        dtype = trt.nptype(DTYPE)
        img_array = np.random.randn(c, h, w).astype(dtype).ravel()
        np.copyto(host_buffer, img_array)

    def do_inference(context, h_input, d_input, h_output, d_output, iterations=None):
        """Time repeated executions of ``context`` and return the mean latency in ms.

        Args:
            context: execution context to run.
            h_input, d_input: host buffer and device allocation for the input.
            h_output, d_output: host buffer and device allocation for the
                output (the output is never copied back to the host here).
            iterations: number of timed executions.  When ``None`` it is
                calibrated to roughly three seconds of work.
        """
        # Transfer input data to the GPU.
        cuda.memcpy_htod(d_input, h_input)
        bindings = [int(d_input), int(d_output)]
        # warm-up
        for _ in range(10):
            context.execute(batch_size=1, bindings=bindings)
        # test proper iterations
        if iterations is None:
            elapsed_time = 0
            iterations = 100
            while elapsed_time < 1:
                t_start = time.time()
                for _ in range(iterations):
                    context.execute(batch_size=1, bindings=bindings)
                elapsed_time = time.time() - t_start
                # Remember the count that produced ``elapsed_time`` before
                # doubling; using the doubled value would overestimate the
                # throughput by a factor of two.
                timed_iterations = iterations
                iterations *= 2
            FPS = timed_iterations / elapsed_time
            iterations = int(FPS * 3)
        # Run inference.
        t_start = time.time()
        for _ in tqdm(range(iterations)):
            context.execute(batch_size=1, bindings=bindings)
        elapsed_time = time.time() - t_start
        latency = elapsed_time / iterations * 1000
        return latency

    def compute_latency_ms_tensorrt(model, input_size, iterations=None):
        """Measure the latency of ``model`` in milliseconds using TensorRT.

        The model is moved to CUDA, switched to eval mode and exported to
        ``model.onnx`` in the working directory; an engine built from that
        file is then timed with ``do_inference``.

        Args:
            model: module to benchmark.
            input_size: four-entry size; the batch entry is ignored and a
                batch of one is used.
            iterations: forwarded to ``do_inference``.
        """
        model = model.cuda()
        model.eval()
        _, c, h, w = input_size
        dummy_input = torch.randn(1, c, h, w, device='cuda')
        torch.onnx.export(model, dummy_input, "model.onnx", verbose=False, input_names=["input"], output_names=["output"])
        with build_engine("model.onnx") as engine:
            h_input, d_input, h_output, d_output = allocate_buffers(engine)
            load_input(input_size, h_input)
            with engine.create_execution_context() as context:
                latency = do_inference(context, h_input, d_input, h_output, d_output, iterations=iterations)
        # FPS = 1000 / latency (in ms)
        return latency
except Exception:
    # Best-effort: stay importable without a working TensorRT / pycuda setup,
    # but do not swallow KeyboardInterrupt / SystemExit like a bare except.
    warnings.warn("TensorRT (or pycuda) is not installed. compute_latency_ms_tensorrt() cannot be used.")
#########################################################################
def compute_latency_ms_pytorch(model, input_size, iterations=None, device=None):
    """Measure the mean forward latency of ``model`` on CUDA, in milliseconds.

    Args:
        model: module to benchmark; switched to eval mode and moved to CUDA.
        input_size: sizes unpacked into ``torch.randn`` to build the input.
        iterations: number of timed forward passes.  When ``None`` it is
            calibrated to roughly six seconds of work.
        device: currently unused.

    Returns:
        Mean wall-clock time per forward pass in milliseconds.

    Note:
        Enables cuDNN and cuDNN benchmark mode globally as a side effect.
    """
    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True

    model.eval()
    model = model.cuda()

    dummy_input = torch.randn(*input_size).cuda()

    with torch.no_grad():
        # warm-up
        for _ in range(10):
            model(dummy_input)

        if iterations is None:
            elapsed_time = 0
            iterations = 100
            while elapsed_time < 1:
                # Synchronize so queued GPU work does not leak into the timing.
                torch.cuda.synchronize()
                t_start = time.time()
                for _ in range(iterations):
                    model(dummy_input)
                torch.cuda.synchronize()
                elapsed_time = time.time() - t_start
                # Remember the count that produced ``elapsed_time`` before
                # doubling; using the doubled value would overestimate the
                # throughput by a factor of two.
                timed_iterations = iterations
                iterations *= 2
            FPS = timed_iterations / elapsed_time
            iterations = int(FPS * 6)

        print('=========Speed Testing=========')
        torch.cuda.synchronize()
        t_start = time.time()
        for _ in tqdm(range(iterations)):
            model(dummy_input)
        torch.cuda.synchronize()
        elapsed_time = time.time() - t_start
        latency = elapsed_time / iterations * 1000
    torch.cuda.empty_cache()
    # FPS = 1000 / latency (in ms)
    return latency
实现方法2:
- macs和params:
# As the code below shows, the MACs are computed by the profile_macs function.
def profile(self, config=None, config_op=None, config_path=None, verbose=True, reference_macs = 0, reference_params = 0):
    """Return ``(macs, params)`` for the generator ``self.netG``.

    Args:
        config, config_op, config_path: when not ``None``, assigned to the
            generator's ``configs``, ``config_op`` and ``config_path``
            attributes before profiling.
        verbose: when true, print ``config_path`` and a summary line.
        reference_macs: subtracted from the MACs (in G) in the printed line.
        reference_params: subtracted from the params (in M) in the printed line.

    Returns:
        The raw MAC count from ``profile_macs`` on the first sample of
        ``self.real_A`` and the total element count of all parameters.  The
        reference values only affect the printed summary.
    """
    generator = self.netG
    # Unwrap DataParallel so the underlying module is configured and profiled.
    if isinstance(generator, nn.DataParallel):
        generator = generator.module

    overrides = (("configs", config), ("config_op", config_op), ("config_path", config_path))
    for attr_name, value in overrides:
        if value is not None:
            setattr(generator, attr_name, value)

    with torch.no_grad():
        macs = profile_macs(generator, (self.real_A[:1],))
    params = sum(p.numel() for p in generator.parameters())

    if verbose:
        print(config_path)
        print('MACs: %.3fG\tParams: %.3fM' % (macs / 1e9 - reference_macs, params / 1e6 - reference_params), flush=True)
    return macs, params
params是通过对netG.parameters()中每个参数张量的numel()求和得到的。
那么profile_macs函数是如何实现的?
答:是通过torchprofile这个包中的profile_macs函数计算的。
from torchprofile import profile_macs
这个包的github地址为:https://github.com/zhijian-liu/torchprofile
用法也很简单:
import torch
from torchvision.models import resnet18
# Build a ResNet-18 and a random tensor of shape (1, 3, 224, 224) to profile it with.
model = resnet18()
inputs = torch.randn(1, 3, 224, 224)
之后就可以使用profile_macs计算MACs:
from torchprofile import profile_macs
# Profile ``model`` on ``inputs``; the result is the model's MAC count.
macs = profile_macs(model, inputs)
- Latency:
import sys
import time
import warnings
import torch
import tqdm
from torch.backends import cudnn
from configs import decode_config
from data import create_dataloader
from models import create_model
from options.test_options import TestOptions
def check(opt):
    """Assert that the parsed options ``opt`` are suitable for this test script.

    Requires serial batches, no flipping, equal load and crop sizes, the
    ``'resize_and_crop'`` preprocessing mode and a batch size of one.
    ``real_stat_path`` must be set unless ``no_fid`` is truthy.  Emits a
    warning (instead of failing) when ``phase`` is ``'train'``.

    Raises:
        AssertionError: if any of the requirements above is violated.
    """
    assert opt.serial_batches
    assert opt.no_flip
    assert opt.load_size == opt.crop_size
    assert opt.preprocess == 'resize_and_crop'
    assert opt.batch_size == 1
    # ``real_stat_path`` is only inspected when ``no_fid`` is falsy.
    assert opt.no_fid or opt.real_stat_path is not None
    if opt.phase == 'train':
        warnings.warn('You are using training set for inference.')
if __name__ == '__main__':
    # Latency benchmark: run ``opt.times`` warm-up passes on the first batch,
    # then time another ``opt.times`` passes and report the mean per pass.
    cudnn.enabled = True
    opt = TestOptions().parse()
    print(' '.join(sys.argv))

    if opt.config_str is None:
        assert 'super' not in opt.model
        config = None
    else:
        assert 'super' in opt.netG or 'sub' in opt.netG
        config = decode_config(opt.config_str)

    dataloader = create_dataloader(opt)
    model = create_model(opt)
    model.setup(opt)

    # Only the first batch from the dataloader is used as model input.
    for data in dataloader:
        model.set_input(data)
        break

    # Warm-up times
    for i in tqdm.trange(opt.times):
        model.test(config)
        if len(opt.gpu_ids) > 0:
            # Wait for the GPU so each pass is finished before the next starts.
            torch.cuda.synchronize()

    # Timed passes
    start_time = time.time()
    for i in tqdm.trange(opt.times):
        model.test(config)
        if len(opt.gpu_ids) > 0:
            torch.cuda.synchronize()
    cost_time = time.time() - start_time

    mean_latency = cost_time / opt.times
    print('Cost Time: %.2fs\tLatency: %.4fs' % (cost_time, mean_latency))
对这2种方案做个对比:
ResNet 18:
1 torchprofile计算macs:
import torch
from torchprofile import profile_macs
from torchvision.models import resnet18

# Profile a ResNet-18 on one random (1, 3, 224, 224) input and print its MACs.
model = resnet18()
inputs = torch.randn(1, 3, 224, 224)
macs = profile_macs(model, inputs)
print(macs)
结果:1814073856
2 thop计算flops:
import torch
from thop import profile
from torchvision.models import resnet18

# Profile a ResNet-18 on one random (1, 3, 224, 224) input with thop and
# print the operation count (the parameter count is returned but not shown).
model = resnet18()
dummy_input = torch.randn(1, 3, 224, 224)
flops, params = profile(model, inputs=(dummy_input,))
print(flops)
结果:1824545792.0
二者基本一致。
ResNet 50:
1 torchprofile计算macs:
import torch
from torchprofile import profile_macs
from torchvision.models import resnet50

# Profile a ResNet-50 on one random (1, 3, 224, 224) input and print its MACs.
model = resnet50()
inputs = torch.randn(1, 3, 224, 224)
macs = profile_macs(model, inputs)
print(macs)
结果:4089186304
2 thop计算flops:
import torch
from thop import profile
from torchvision.models import resnet50

# Profile a ResNet-50 on one random (1, 3, 224, 224) input with thop and
# print the operation count (the parameter count is returned but not shown).
model = resnet50()
dummy_input = torch.randn(1, 3, 224, 224)
flops, params = profile(model, inputs=(dummy_input,))
print(flops)
结果:4135790592.0
二者基本一致。
结论:
这2种方案计算的模型运算量基本一致。需要注意的是,thop返回的“FLOPs”实际上是按乘加次数(MACs)统计的,所以才能与torchprofile得到的MACs直接比较。