YoloV1
Keras框架实现
下载原始darknet的权重
http://pjreddie.com/media/files/yolov1/tiny-yolov1.weights
将darknet模型转为keras模型
"""Reads Darknet config and weights and creates Keras models with TF backend.Currently only supports layers in Yolov1-tiny config."""import argparseimport configparserimport ioimport osfrom collections import defaultdictimport numpy as npimport tensorflow.keras.backend as Kfrom tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dense, Flatten, Dropout, Reshape, LeakyReLU, ReLU, BatchNormalizationfrom tensorflow.keras import Modelfrom tensorflow.keras.regularizers import l2def unique_config_sections(config_file): """ 将配置文件所有的节点生成独一无二的名字,添加到每一个节点后边 """ section_counters = defaultdict(int) output_stream = io.StringIO() with open(config_file) as fin: for line in fin: if line.startswith('['): section = line.strip().strip('[]') # 添加新的后缀 _section = section + '_' + str(section_counters[section]) # 同一前缀的后缀加1 section_counters[section] += 1 # 用新的节点名字替换原有的节点名字 line = line.replace(section, _section) output_stream.write(line) output_stream.seek(0) return output_streamdef main(args): config_path = os.path.expanduser(args.config_path) weights_path = os.path.expanduser(args.weights_path) output_path = os.path.expanduser(args.output_path) assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(config_path) assert weights_path.endswith('.weights'), '{} is not a .weights file'.format(weights_path) assert output_path.endswith('.hdf5'), 'output path {} is not a .hdf5 file'.format(output_path) # Load weights and config. 
print('加载darknet的权重...') weights_file = open(weights_path, 'rb') # 读取darknet网络权重的头部信息 weights_header = np.ndarray(shape=(4,), dtype='int32', buffer=weights_file.read(16)) print('darknet网络权重的头部信息:', weights_header) print('解析Darknet配置文件') unique_config_file = unique_config_sections(config_path) cfg_parser = configparser.ConfigParser() cfg_parser.read_file(unique_config_file) print('开始生成Keras网络模型...') try: image_height = int(cfg_parser['crop_0']['crop_height']) image_width = int(cfg_parser['crop_0']['crop_width']) except KeyError: image_height = int(cfg_parser['net_0']['height']) image_width = int(cfg_parser['net_0']['width']) # 定义输入层 prev_layer = Input(shape=(image_height, image_width, 3)) all_layers = [prev_layer] # 权重衰减(目的不是为了提高精度或者提高速度,而是防止过拟合) weight_decay = float(cfg_parser['net_0']['decay']) if 'net_0' in cfg_parser.sections() else 5e-4 count = 0 fc_flag = False for section in cfg_parser.sections(): print('正在解析节点: {}'.format(section)) # 解析卷积相关 if section.startswith('convolutional'): filters = int(cfg_parser[section]['filters']) size = int(cfg_parser[section]['size']) stride = int(cfg_parser[section]['stride']) pad = int(cfg_parser[section]['pad']) activation = cfg_parser[section]['activation'] batch_normalize = 'batch_normalize' in cfg_parser[section] # padding='same' is equivalent to Darknet pad=1 padding = 'same' if pad == 1 else 'valid' # Setting weights. # Darknet serializes convolutional weights as: # [bias/beta, [gamma, mean, variance], conv_weights] # 获取上一层的形状 prev_layer_shape = K.int_shape(prev_layer) # TODO: This assumes channel last dim_ordering. 
# keras设计成channel last,而darknet是channel first weights_shape = (size, size, prev_layer_shape[-1], filters) darknet_w_shape = (filters, weights_shape[2], size, size) # 参数个数 weights_size = np.product(weights_shape) print('conv2d', 'bn' if batch_normalize else ' ', activation, weights_shape) conv_bias = np.ndarray(shape=(filters,), dtype='float32', buffer=weights_file.read(filters * 4)) count += filters bn_weight_list = [] if batch_normalize: bn_weights = np.ndarray(shape=(3, filters), dtype='float32', buffer=weights_file.read(filters * 12)) count += 3 * filters # TODO: Keras BatchNormalization mistakenly refers to var # as std. bn_weight_list = [ bn_weights[0], # scale gamma conv_bias, # shift beta bn_weights[1], # running mean bn_weights[2] # running var ] conv_weights = np.ndarray( shape=darknet_w_shape, dtype='float32', buffer=weights_file.read(weights_size * 4)) count += weights_size # DarkNet conv_weights are serialized Caffe-style: # (out_dim, in_dim, height, width) # We would like to set these to Tensorflow order: # (height, width, in_dim, out_dim) # TODO: Add check for Theano dim ordering. conv_weights = np.transpose(conv_weights, [2, 3, 1, 0]) conv_weights = [conv_weights] if batch_normalize \ else [conv_weights, conv_bias] # Handle activation. act_fn = None if activation == 'leaky': pass # Add advanced activation later. 
elif activation == 'relu': pass elif activation != 'linear': raise ValueError( 'Unknown activation function `{}` in section {}'.format( activation, section)) # Create Conv2D layer conv_layer = Conv2D( filters, (size, size), strides=(stride, stride), kernel_regularizer=l2(weight_decay), use_bias=not batch_normalize, weights=conv_weights, activation=act_fn, padding=padding, name=format(section))(prev_layer) if batch_normalize: conv_layer = BatchNormalization( weights=bn_weight_list, name='bn' + format(section))(conv_layer) prev_layer = conv_layer if activation == 'linear': all_layers.append(prev_layer) elif activation == 'leaky': act_layer = LeakyReLU(alpha=0.1)(prev_layer) prev_layer = act_layer all_layers.append(act_layer) elif activation == 'relu': act_layer = ReLU()(prev_layer) prev_layer = act_layer all_layers.append(act_layer) # 解析池化相关 elif section.startswith('maxpool'): size = int(cfg_parser[section]['size']) stride = int(cfg_parser[section]['stride']) all_layers.append( MaxPooling2D( padding='same', pool_size=(size, size), strides=(stride, stride))(prev_layer)) prev_layer = all_layers[-1] # 解析连接相关 elif section.startswith('connected'): output_size = int(cfg_parser[section]['output']) activation = cfg_parser[section]['activation'] prev_layer_shape = K.int_shape(prev_layer) # TODO: This assumes channel last dim_ordering. weights_shape = (np.prod(prev_layer_shape[1:]), output_size) darknet_w_shape = (output_size, weights_shape[0]) weights_size = np.product(weights_shape) print('full-connected', activation, weights_shape) fc_bias = np.ndarray( shape=(output_size,), dtype='float32', buffer=weights_file.read(output_size * 4)) count += output_size fc_weights = np.ndarray( shape=darknet_w_shape, dtype='float32', buffer=weights_file.read(weights_size * 4)) count += weights_size # DarkNet fc_weights are serialized Caffe-style: # (out_dim, in_dim) # We would like to set these to Tensorflow order: # (in_dim, out_dim) # TODO: Add check for Theano dim ordering. 
fc_weights = np.transpose(fc_weights, [1, 0]) fc_weights = [fc_weights, fc_bias] # Handle activation. act_fn = None if activation == 'leaky': pass # Add advanced activation later. elif activation == 'relu': pass elif activation != 'linear': raise ValueError( 'Unknown activation function `{}` in section {}'.format( activation, section)) if not fc_flag: prev_layer = Flatten()(prev_layer) fc_flag = True # Create Full-Connect layer fc_layer = Dense( output_size, kernel_regularizer=l2(weight_decay), weights=fc_weights, activation=act_fn, name=format(section))(prev_layer) prev_layer = fc_layer if activation == 'linear': all_layers.append(prev_layer) elif activation == 'leaky': act_layer = LeakyReLU(alpha=0.1)(prev_layer) prev_layer = act_layer all_layers.append(act_layer) elif activation == 'relu': act_layer = ReLU()(prev_layer) prev_layer = act_layer all_layers.append(act_layer) # 解析dropout相关 elif section.startswith('dropout'): probability = float(cfg_parser[section]['probability']) dropout_layer = Dropout(probability)(prev_layer) prev_layer = dropout_layer all_layers.append(prev_layer) # 解析目标检测相关 elif section.startswith('detection'): classes = int(cfg_parser[section]['classes']) coords = int(cfg_parser[section]['coords']) rescore = int(cfg_parser[section]['rescore']) side = int(cfg_parser[section]['side']) num = int(cfg_parser[section]['num']) reshape_layer = Reshape( (side, side, classes + num * (coords + rescore)) )(prev_layer) prev_layer = reshape_layer all_layers.append(prev_layer) # net、crop、detection、softmax不解析 elif (section.startswith('net') or section.startswith('crop') or section.startswith('detection') or section.startswith('softmax')): pass # Configs not currently handled during models definition. 
# 异常节点抛出 else: raise ValueError('不支持节点类型: {}'.format(section)) # 创建keras的model model = Model(inputs=all_layers[0], outputs=all_layers[-1]) print("keras的model简要内容:") model.summary() # 保存model的权值 model.save_weights('{}'.format(output_path)) print('Saved Keras models to {}'.format(output_path)) # 读取darknet剩余的权值 remaining_weights = len(weights_file.read()) / 4 # 关闭流 weights_file.close() print('Read {} of {} from Darknet weights.'.format(count, count + remaining_weights)) if remaining_weights > 0: print('Warning: {} unused weights'.format(remaining_weights))if __name__ == '__main__': parser = argparse.ArgumentParser(description='Darknet Yolov1-tiny To Keras Converter.') parser.add_argument('config_path', help='Path to Darknet cfg file.') parser.add_argument('weights_path', help='Path to Darknet weights file.') parser.add_argument('output_path', help='Path to output Keras models file.') # main(parser.parse_args()) main(parser.parse_args(['cfg/yolov1-tiny.cfg', 'weights/tiny-yolov1.weights', 'weights/tiny-yolov1.hdf5']))
VOC数据集转为Keras适用的形式
import argparse
import os
import xml.etree.ElementTree as ElTr

# (year, image_set) pairs of the VOC splits to convert.
sets = [('2007', 'train'), ('2007', 'val'), ('2007', 'test')]
# VOC class name -> YOLO class id (insertion order matches the ids).
classes_num = {'aeroplane': 0, 'bicycle': 1, 'bird': 2, 'boat': 3, 'bottle': 4,
               'bus': 5, 'car': 6, 'cat': 7, 'chair': 8, 'cow': 9,
               'diningtable': 10, 'dog': 11, 'horse': 12, 'motorbike': 13,
               'person': 14, 'pottedplant': 15, 'sheep': 16, 'sofa': 17,
               'train': 18, 'tvmonitor': 19}


def convert_annotation(annotation_dir, year, image_id, f):
    """Append one VOC image's boxes to f as ' xmin,ymin,xmax,ymax,cls_id'.

    annotation_dir: VOCdevkit root directory.
    year: VOC release year string, e.g. '2007'.
    image_id: file stem of the annotation XML.
    f: writable text stream; boxes are appended to the current line.
    """
    # Parse the image's annotation XML.
    in_file = os.path.join(annotation_dir, 'VOC%s/Annotations/%s.xml' % (year, image_id))
    root = ElTr.parse(in_file).getroot()
    # Walk every annotated object in the image.
    for obj in root.iter('object'):
        difficult = obj.find('difficult').text
        cls = obj.find('name').text
        # Skip classes outside the 20 VOC categories and objects flagged
        # as 'difficult' — neither is used for training.
        if cls not in classes_num or int(difficult) == 1:
            continue
        # Direct dict lookup replaces rebuilding the class list and a
        # linear .index() search on every object (same ids by construction).
        cls_id = classes_num[cls]
        # Object bounding box.
        xml_box = obj.find('bndbox')
        box = (int(xml_box.find('xmin').text),
               int(xml_box.find('ymin').text),
               int(xml_box.find('xmax').text),
               int(xml_box.find('ymax').text))
        f.write(' ' + ','.join(str(v) for v in box) + ',' + str(cls_id))


def main(args):
    """Write one '<year>_<set>.txt' per VOC split: image path + boxes per line."""
    data_dir = os.path.expanduser(args.dir)
    for year, image_set in sets:
        # Image ids belonging to this split.
        with open(os.path.join(data_dir, 'VOC%s/ImageSets/Main/%s.txt' % (year, image_set)), 'r') as f:
            image_ids = f.read().strip().split()
        # One line per image: absolute-ish jpg path followed by its boxes.
        with open(os.path.join(data_dir, '%s_%s.txt' % (year, image_set)), 'w') as f:
            for image_id in image_ids:
                f.write('%s/VOC%s/JPEGImages/%s.jpg' % (data_dir, year, image_id))
                convert_annotation(data_dir, year, image_id, f)
                f.write('\n')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Build Annotations.')
    parser.add_argument('dir', default='..', help='Annotations.')
    # NOTE: argument is hard-coded here, as in the original script.
    main(parser.parse_args(['data/VOCdevkit']))