How to Write a Model
1 Understand the workflow of the core module
2 Walk through the training procedure in your head first: what does a training step look like?
3 Decide on the loss function
4 Decide on the network architecture
5 Decide roughly which modules the code needs
e.g. for CycleGAN:
there are discriminator_model, generator_model, dataset, train
6 Start writing the model
- Import torch and the nn module
- Define a convolutional block
- A Block consists of a convolution layer, a normalization layer, and an activation layer
import torch
import torch.nn as nn

class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 4, stride, 1, bias=True, padding_mode="reflect"),
            nn.InstanceNorm2d(out_channels),
            nn.LeakyReLU(0.2, inplace=True),
        )

    def forward(self, x):
        return self.conv(x)
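A quick sanity check of the block (a hypothetical shape test; a 4x4 convolution with stride 2 and padding 1 halves the spatial size):

# (64 + 2*1 - 4) / 2 + 1 = 32
print(Block(3, 64, stride=2)(torch.randn(1, 3, 64, 64)).shape)  # torch.Size([1, 64, 32, 32])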
- Write the main module
- The input to the main module is usually preprocessed data (for images, the preprocessed image tensor)
- Next, specify the number of channels for each layer (the features list)
- The first layer of the main module is called the initial layer: convolution plus activation, with no normalization
- After that, define the layers: a stack of Blocks
Finally: note that if the model returns a probability, the output must pass through a sigmoid activation to map it into the 0-1 range.
Test: generate a random tensor and run it through the model to check the output shape.
class Discriminator(nn.Module):
    def __init__(self, in_channels=3, features=[64, 128, 256, 512]):
        super().__init__()
        # initial layer: conv + activation, no normalization
        self.initial = nn.Sequential(
            nn.Conv2d(
                in_channels,
                features[0],
                kernel_size=4,
                stride=2,
                padding=1,
                padding_mode="reflect",
            ),
            nn.LeakyReLU(0.2, inplace=True),
        )

        layers = []
        in_channels = features[0]
        for feature in features[1:]:
            # the last block uses stride 1, the others stride 2
            layers.append(Block(in_channels, feature, stride=1 if feature == features[-1] else 2))
            in_channels = feature
        # final conv maps to a single channel of patch scores
        layers.append(nn.Conv2d(in_channels, 1, kernel_size=4, stride=1, padding=1, padding_mode="reflect"))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        x = self.initial(x)
        return torch.sigmoid(self.model(x))

def test():
    x = torch.randn((5, 3, 256, 256))
    model = Discriminator(in_channels=3)
    preds = model(x)
    print(preds.shape)  # torch.Size([5, 1, 30, 30]): a 30x30 grid of patch probabilities

if __name__ == "__main__":
    test()
A new example: building the generator model.
in_channels, out_channels and down control whether the block sits in the encoding or decoding stage; kwargs (keyword arguments) carries e.g. stride, padding, padding_mode.
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, down=True, use_act=True, **kwargs):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, padding_mode="reflect", **kwargs)
            if down
            else nn.ConvTranspose2d(in_channels, out_channels, **kwargs),
            nn.InstanceNorm2d(out_channels),
            nn.ReLU(inplace=True) if use_act else nn.Identity(),
        )

    def forward(self, x):
        return self.conv(x)
Building the residual block
down defaults to True; the input passes through two convolutions and is then added to x (in a residual block, x does not need to be normalized beforehand).
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.block = nn.Sequential(
            ConvBlock(channels, channels, kernel_size=3, padding=1),
            ConvBlock(channels, channels, use_act=False, kernel_size=3, padding=1),
        )

    def forward(self, x):
        return x + self.block(x)
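For the addition x + self.block(x) to work, the block must preserve the tensor shape, which is why both convolutions use kernel_size=3, padding=1 and the default stride of 1. A quick check (hypothetical shapes):

print(ResidualBlock(64)(torch.randn(1, 64, 32, 32)).shape)  # torch.Size([1, 64, 32, 32])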
The actual generator module
1 the initial block, whose input is the (preprocessed) source image
2 down_blocks (the encoder)
3 the bottleneck: a stack of residual conv blocks
4 the decoder (up_blocks), mirroring down_blocks
5 a last conv block matching the initial block
6 a tanh activation mapping the output into the -1 to 1 range
7 test the module
PS: nn.ModuleList usage
self.down_blocks = nn.ModuleList(
    [
        ConvBlock(num_features, num_features*2, kernel_size=3, stride=2, padding=1),
        ConvBlock(num_features*2, num_features*4, kernel_size=3, stride=2, padding=1),
    ]
)

for layer in self.down_blocks:
    x = layer(x)
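Why nn.ModuleList rather than a plain Python list: ModuleList registers its children as submodules, so their parameters show up in model.parameters() and get trained; unlike nn.Sequential, you still control how they are iterated in forward. A minimal sketch (hypothetical Toy module):

import torch.nn as nn

class Toy(nn.Module):
    def __init__(self):
        super().__init__()
        self.bad = [nn.Linear(4, 4)]                  # plain list: NOT registered, won't train
        self.good = nn.ModuleList([nn.Linear(4, 4)])  # registered as a submodule

print(sum(p.numel() for p in Toy().parameters()))  # 20: only the ModuleList layer is counted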
class Generator(nn.Module):
    def __init__(self, img_channels, num_features=64, num_residuals=9):
        super().__init__()
        self.initial = nn.Sequential(
            nn.Conv2d(img_channels, num_features, kernel_size=7, stride=1, padding=3, padding_mode="reflect"),
            nn.InstanceNorm2d(num_features),
            nn.ReLU(inplace=True),
        )
        self.down_blocks = nn.ModuleList(
            [
                ConvBlock(num_features, num_features*2, kernel_size=3, stride=2, padding=1),
                ConvBlock(num_features*2, num_features*4, kernel_size=3, stride=2, padding=1),
            ]
        )
        self.res_blocks = nn.Sequential(
            *[ResidualBlock(num_features*4) for _ in range(num_residuals)]
        )
        self.up_blocks = nn.ModuleList(
            [
                ConvBlock(num_features*4, num_features*2, down=False, kernel_size=3, stride=2, padding=1, output_padding=1),
                ConvBlock(num_features*2, num_features*1, down=False, kernel_size=3, stride=2, padding=1, output_padding=1),
            ]
        )
        self.last = nn.Conv2d(num_features*1, img_channels, kernel_size=7, stride=1, padding=3, padding_mode="reflect")

    def forward(self, x):
        x = self.initial(x)
        for layer in self.down_blocks:
            x = layer(x)
        x = self.res_blocks(x)
        for layer in self.up_blocks:
            x = layer(x)
        return torch.tanh(self.last(x))

def test():
    img_channels = 3
    img_size = 256
    x = torch.randn((2, img_channels, img_size, img_size))
    gen = Generator(img_channels, num_residuals=9)  # pass by keyword: the 2nd positional arg is num_features
    print(gen(x).shape)  # torch.Size([2, 3, 256, 256])

if __name__ == "__main__":
    test()
utils
save_checkpoint
load_checkpoint
import torch
import config  # the project config module defined below

def save_checkpoint(model, optimizer, filename="my_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    checkpoint = {
        "state_dict": model.state_dict(),
        "optimizer": optimizer.state_dict(),
    }
    torch.save(checkpoint, filename)

def load_checkpoint(checkpoint_file, model, optimizer, lr):
    print("=> Loading checkpoint")
    checkpoint = torch.load(checkpoint_file, map_location=config.DEVICE)
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

    # If we don't do this then it will just have the learning rate of the old checkpoint,
    # and it will lead to many hours of debugging
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr
Set the random seeds (note that cudnn.deterministic=True trades some speed for reproducibility)
import os
import random
import numpy as np
import torch

def seed_everything(seed=42):
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
config
You can use argparse (or a plain config.py module) to define all the hyperparameters,
plus the data preprocessing.
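A minimal config.py sketch covering the identifiers the training script below relies on (the values are hypothetical placeholders, not the ones from the original project):

import torch

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TRAIN_DIR = "data/train"
BATCH_SIZE = 1
LEARNING_RATE = 1e-5
LAMBDA_IDENTITY = 0.0
LAMBDA_CYCLE = 10
NUM_WORKERS = 4
NUM_EPOCHS = 10
LOAD_MODEL = False
SAVE_MODEL = True
CHECKPOINT_GEN_H = "genh.pth.tar"
CHECKPOINT_GEN_Z = "genz.pth.tar"
CHECKPOINT_CRITIC_H = "critich.pth.tar"
CHECKPOINT_CRITIC_Z = "criticz.pth.tar"

The preprocessing pipeline (Albumentations), also defined in config.py: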
import albumentations as A
from albumentations.pytorch import ToTensorV2

transforms = A.Compose(
    [
        A.Resize(width=256, height=256),
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], max_pixel_value=255),
        ToTensorV2(),
    ],
    additional_targets={"image0": "image"},
)
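Note: additional_targets={"image0": "image"} tells Albumentations to treat the second input as an image and apply the same randomly sampled transform (e.g. the same flip) to both images in a single call.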
Dataset
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset

class HorseZebraDataset(Dataset):
    def __init__(self, root_zebra, root_horse, transform=None):
        self.root_zebra = root_zebra
        self.root_horse = root_horse
        self.transform = transform

        self.zebra_images = os.listdir(root_zebra)
        self.horse_images = os.listdir(root_horse)
        # the two domains are unpaired and may differ in size, e.g. 1000 vs 1500
        self.length_dataset = max(len(self.zebra_images), len(self.horse_images))
        self.zebra_len = len(self.zebra_images)
        self.horse_len = len(self.horse_images)

    def __len__(self):
        return self.length_dataset

    def __getitem__(self, index):
        # the modulo wraps the index around the shorter domain
        zebra_img = self.zebra_images[index % self.zebra_len]
        horse_img = self.horse_images[index % self.horse_len]

        zebra_path = os.path.join(self.root_zebra, zebra_img)
        horse_path = os.path.join(self.root_horse, horse_img)

        zebra_img = np.array(Image.open(zebra_path).convert("RGB"))
        horse_img = np.array(Image.open(horse_path).convert("RGB"))

        if self.transform:
            augmentations = self.transform(image=zebra_img, image0=horse_img)
            zebra_img = augmentations["image"]
            horse_img = augmentations["image0"]

        return zebra_img, horse_img
Train
- Training tips
    - Cast data to float16: faster and saves GPU memory
    - Use tqdm to visualize training progress
    - Understand how the gradient scaler (torch.cuda.amp.GradScaler) is used
    - Save the model regularly
    - Output the model's results as you go
    - Save the losses so you can plot the loss curves later (see the sketch below)
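A minimal sketch of the last tip, logging losses for later plotting (a hypothetical helper, not part of the original project; assumes matplotlib is installed):

import matplotlib.pyplot as plt

g_losses, d_losses = [], []

# inside the training loop, after each step:
# g_losses.append(G_loss.item())
# d_losses.append(D_loss.item())

def plot_losses(g_losses, d_losses, filename="loss_curves.png"):
    plt.plot(g_losses, label="generator")
    plt.plot(d_losses, label="discriminator")
    plt.xlabel("iteration")
    plt.ylabel("loss")
    plt.legend()
    plt.savefig(filename)

The full train.py: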
import torch
from dataset import HorseZebraDataset
import sys
from utils import save_checkpoint, load_checkpoint
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import config
from tqdm import tqdm
from torchvision.utils import save_image
from discriminator_model import Discriminator
from generator_model import Generator
def train_fn(disc_H, disc_Z, gen_Z, gen_H, loader, opt_disc, opt_gen, l1, mse, d_scaler, g_scaler):
    H_reals = 0
    H_fakes = 0
    loop = tqdm(loader, leave=True)

    for idx, (zebra, horse) in enumerate(loop):
        zebra = zebra.to(config.DEVICE)
        horse = horse.to(config.DEVICE)

        # Train Discriminators H and Z
        with torch.cuda.amp.autocast():
            fake_horse = gen_H(zebra)
            D_H_real = disc_H(horse)
            D_H_fake = disc_H(fake_horse.detach())
            H_reals += D_H_real.mean().item()
            H_fakes += D_H_fake.mean().item()
            # LSGAN-style adversarial losses: MSE against 1 (real) and 0 (fake)
            D_H_real_loss = mse(D_H_real, torch.ones_like(D_H_real))
            D_H_fake_loss = mse(D_H_fake, torch.zeros_like(D_H_fake))
            D_H_loss = D_H_real_loss + D_H_fake_loss

            fake_zebra = gen_Z(horse)
            D_Z_real = disc_Z(zebra)
            D_Z_fake = disc_Z(fake_zebra.detach())
            D_Z_real_loss = mse(D_Z_real, torch.ones_like(D_Z_real))
            D_Z_fake_loss = mse(D_Z_fake, torch.zeros_like(D_Z_fake))
            D_Z_loss = D_Z_real_loss + D_Z_fake_loss

            # put it together
            D_loss = (D_H_loss + D_Z_loss) / 2

        opt_disc.zero_grad()
        d_scaler.scale(D_loss).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()

        # Train Generators H and Z
        with torch.cuda.amp.autocast():
            # adversarial loss for both generators
            D_H_fake = disc_H(fake_horse)
            D_Z_fake = disc_Z(fake_zebra)
            loss_G_H = mse(D_H_fake, torch.ones_like(D_H_fake))
            loss_G_Z = mse(D_Z_fake, torch.ones_like(D_Z_fake))

            # cycle loss
            cycle_zebra = gen_Z(fake_horse)
            cycle_horse = gen_H(fake_zebra)
            cycle_zebra_loss = l1(zebra, cycle_zebra)
            cycle_horse_loss = l1(horse, cycle_horse)

            # identity loss (remove these for efficiency if you set lambda_identity=0)
            identity_zebra = gen_Z(zebra)
            identity_horse = gen_H(horse)
            identity_zebra_loss = l1(zebra, identity_zebra)
            identity_horse_loss = l1(horse, identity_horse)

            # add all together
            G_loss = (
                loss_G_Z
                + loss_G_H
                + cycle_zebra_loss * config.LAMBDA_CYCLE
                + cycle_horse_loss * config.LAMBDA_CYCLE
                + identity_horse_loss * config.LAMBDA_IDENTITY
                + identity_zebra_loss * config.LAMBDA_IDENTITY
            )

        opt_gen.zero_grad()
        g_scaler.scale(G_loss).backward()
        g_scaler.step(opt_gen)
        g_scaler.update()

        if idx % 200 == 0:
            save_image(fake_horse * 0.5 + 0.5, f"saved_images/horse_{idx}.png")
            save_image(fake_zebra * 0.5 + 0.5, f"saved_images/zebra_{idx}.png")

        loop.set_postfix(H_real=H_reals / (idx + 1), H_fake=H_fakes / (idx + 1))
def main():
    disc_H = Discriminator(in_channels=3).to(config.DEVICE)
    disc_Z = Discriminator(in_channels=3).to(config.DEVICE)
    gen_Z = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
    gen_H = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
    opt_disc = optim.Adam(
        list(disc_H.parameters()) + list(disc_Z.parameters()),
        lr=config.LEARNING_RATE,
        betas=(0.5, 0.999),
    )
    opt_gen = optim.Adam(
        list(gen_Z.parameters()) + list(gen_H.parameters()),
        lr=config.LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    L1 = nn.L1Loss()
    mse = nn.MSELoss()

    if config.LOAD_MODEL:
        load_checkpoint(config.CHECKPOINT_GEN_H, gen_H, opt_gen, config.LEARNING_RATE)
        load_checkpoint(config.CHECKPOINT_GEN_Z, gen_Z, opt_gen, config.LEARNING_RATE)
        load_checkpoint(config.CHECKPOINT_CRITIC_H, disc_H, opt_disc, config.LEARNING_RATE)
        load_checkpoint(config.CHECKPOINT_CRITIC_Z, disc_Z, opt_disc, config.LEARNING_RATE)

    dataset = HorseZebraDataset(
        root_horse=config.TRAIN_DIR + "/horses", root_zebra=config.TRAIN_DIR + "/zebras", transform=config.transforms
    )
    val_dataset = HorseZebraDataset(
        root_horse="cyclegan_test/horse1", root_zebra="cyclegan_test/zebra1", transform=config.transforms
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=1,
        shuffle=False,
        pin_memory=True,
    )
    loader = DataLoader(
        dataset,
        batch_size=config.BATCH_SIZE,
        shuffle=True,
        num_workers=config.NUM_WORKERS,
        pin_memory=True,
    )

    g_scaler = torch.cuda.amp.GradScaler()
    d_scaler = torch.cuda.amp.GradScaler()

    for epoch in range(config.NUM_EPOCHS):
        train_fn(disc_H, disc_Z, gen_Z, gen_H, loader, opt_disc, opt_gen, L1, mse, d_scaler, g_scaler)

        if config.SAVE_MODEL:
            save_checkpoint(gen_H, opt_gen, filename=config.CHECKPOINT_GEN_H)
            save_checkpoint(gen_Z, opt_gen, filename=config.CHECKPOINT_GEN_Z)
            save_checkpoint(disc_H, opt_disc, filename=config.CHECKPOINT_CRITIC_H)
            save_checkpoint(disc_Z, opt_disc, filename=config.CHECKPOINT_CRITIC_Z)

if __name__ == "__main__":
    main()
On mixed-precision training
"PyTorch's automatic mixed precision (AMP)", an article by Gemfield on Zhihu: https://zhuanlan.zhihu.com/p/165152789
The advantages of torch.HalfTensor are smaller storage, faster computation, and better use of the Tensor Cores on CUDA devices. Training therefore takes less GPU memory (so you can increase the batch size) and runs faster.
The disadvantages of torch.HalfTensor are a small numeric range (prone to overflow/underflow) and rounding error (tiny gradient values that fall below the minimum resolution of 16-bit precision are lost).
So: use torch.HalfTensor where it has the advantage, and counter its disadvantages with two mechanisms:
1. Gradient scaling: this is exactly the torch.cuda.amp.GradScaler from the previous section. The loss is scaled up to keep gradients from underflowing (the scaled values are only used while propagating gradients in backprop; they are unscaled again before the weights are actually updated).
2. Falling back to torch.FloatTensor, which is where the word "mixed" comes from. How do you know when to use torch.FloatTensor and when to use half precision? The PyTorch framework decides: inside a PyTorch 1.6 AMP context, tensors passing through the following ops are automatically cast to half-precision torch.HalfTensor:
- __matmul__
- addbmm
- addmm
- addmv
- addr
- baddbmm
- bmm
- chain_matmul
- conv1d
- conv2d
- conv3d
- conv_transpose1d
- conv_transpose2d
- conv_transpose3d
- linear
- matmul
- mm
- mv
- prelu
How do you use automatic mixed precision in PyTorch?
The answer: autocast + GradScaler.
1. autocast
As mentioned above, use the autocast class from the torch.cuda.amp module. Usage is very simple:
from torch.cuda.amp import autocast as autocast

# Create the model; parameters are torch.FloatTensor by default
model = Net().cuda()
optimizer = optim.SGD(model.parameters(), ...)

for input, target in data:
    optimizer.zero_grad()

    # Run the forward pass (model + loss) under autocast
    with autocast():
        output = model(input)
        loss = loss_fn(output, target)

    # Backprop happens outside the autocast context
    loss.backward()
    optimizer.step()
You can use autocast with context-manager semantics (as above) or with decorator semantics (see the sketch below). Once you enter the autocast context, the CUDA ops listed earlier convert their tensors' dtype to half precision, which speeds up computation without hurting training accuracy. On entering the autocast context the tensors can be of any type; do not call .half() manually on the model or the input, the framework does it automatically, which is where the "automatic" in automatic mixed precision comes from.
One more point: the autocast context should contain only the forward pass of the network (including the loss computation), not the backward pass, because backward ops run with the same dtype that autocast chose for the matching forward ops.
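A sketch of the decorator form (hypothetical Net; autocast is documented to also work as a decorator, e.g. on a module's forward method):

import torch.nn as nn
from torch.cuda.amp import autocast

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(8, 2)

    @autocast()  # the whole forward pass runs under autocast
    def forward(self, x):
        return self.fc(x)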
Sometimes your code will also raise the following error inside the autocast context:
Traceback (most recent call last):
......
File "/opt/conda/lib/python3.7/site-packages/torch/nn/modules/module.py", line 722, in _call_impl
result = self.forward(*input, **kwargs)
......
RuntimeError: expected scalar type float but found c10::Half
As for RuntimeError: expected scalar type float but found c10::Half, this is probably a bug. You can manually call .float() on the tensor to make the types match.
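A minimal sketch of that workaround (hypothetical model and loss_fn; cast the offending tensors back to float32 before the op that complains):

with autocast():
    output = model(input)                           # may be float16 under autocast
    loss = loss_fn(output.float(), target.float())  # cast back so the dtypes match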
But don't forget the gradient scaler module mentioned above: a GradScaler object needs to be instantiated before training starts. The classic AMP recipe in PyTorch therefore looks like this:
from torch.cuda.amp import autocast, GradScaler

# Create the model; parameters are torch.FloatTensor by default
model = Net().cuda()
optimizer = optim.SGD(model.parameters(), ...)

# Instantiate a GradScaler object before training starts
scaler = GradScaler()

for epoch in epochs:
    for input, target in data:
        optimizer.zero_grad()

        # Run the forward pass (model + loss) under autocast
        with autocast():
            output = model(input)
            loss = loss_fn(output, target)

        # Scales the loss, to magnify the gradients
        scaler.scale(loss).backward()

        # scaler.step() first unscales the gradients;
        # if they contain no infs or NaNs, it calls optimizer.step() to update the weights,
        # otherwise the step is skipped so the weights are not corrupted
        scaler.step(optimizer)

        # Decide whether the scale should grow for the next iteration
        scaler.update()
The scale factor is estimated dynamically on every iteration. To minimize gradient underflow the scale should be as large as possible; but if it gets too large, half-precision tensors easily overflow (turning into inf or NaN). So the dynamic estimation grows the scale as much as it can without producing inf or NaN gradient values. Each scaler.step(optimizer) checks whether any inf or NaN gradients appeared:
1. If inf or NaN appears, scaler.step(optimizer) skips this weight update (optimizer.step()) and shrinks the scale (multiplying it by backoff_factor);
2. If not, the weights are updated normally, and once enough consecutive iterations (specified by growth_interval) pass without inf or NaN, scaler.update() grows the scale (multiplying it by growth_factor).
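These knobs can be set explicitly when constructing the scaler; the values below are the documented defaults of torch.cuda.amp.GradScaler:

scaler = GradScaler(
    init_scale=65536.0,    # starting scale (2**16)
    growth_factor=2.0,     # multiply the scale by this after a clean streak
    backoff_factor=0.5,    # multiply the scale by this after an inf/NaN step
    growth_interval=2000,  # consecutive clean iterations required to grow
)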
The corresponding code in this project: train_fn above already wraps both forward passes (discriminators and generators) in torch.cuda.amp.autocast() and routes the two backward/step calls through d_scaler and g_scaler; the scalers themselves are created once in main():
g_scaler = torch.cuda.amp.GradScaler()
d_scaler = torch.cuda.amp.GradScaler()

for epoch in range(config.NUM_EPOCHS):
    train_fn(disc_H, disc_Z, gen_Z, gen_H, loader, opt_disc, opt_gen, L1, mse, d_scaler, g_scaler)