讲 loss 的 github 地址是https://github.com/lawlite19/Blog-Back-Up/blob/master/code/triplet-loss/triplet_loss_np.py#L103
def _pairwise_distance(embeddings, squared=False): ------------------------------------------ embedding: 特征向量, 大小(batch_size, vector_size) distances: 两两embeddings的距离矩阵,大小 (batch_size, batch_size) dot_product = tf.matmul(embeddings, tf.transpose(embeddings)) square_norm = tf.diag_part(dot_product) distances = tf.expand_dims(square_norm, axis=1) - 2.0 * dot_product + tf.expand_dims(square_norm, axis=0) distances = tf.maximum(distances, 0.0) distances = distances + mask * 1e-16 distances = tf.sqrt(distances) distances = distances * (1.0 - mask)
def _get_triplet_mask(labels): 得到一个3D的mask [a, p, n], 对应triplet(a, p, n)是valid的位置是True ---------------------------------- labels: 对应训练数据的labels, shape = (batch_size,) mask: 3D,shape = (batch_size, batch_size, batch_size) indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool) indices_not_equal = tf.logical_not(indices_equal) i_not_equal_j = tf.expand_dims(indices_not_equal, 2) i_not_equal_k = tf.expand_dims(indices_not_equal, 1) j_not_equal_k = tf.expand_dims(indices_not_equal, 0) distinct_indices = tf.logical_and(tf.logical_and(i_not_equal_j, i_not_equal_k), j_not_equal_k) label_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1)) i_equal_j = tf.expand_dims(label_equal, 2) i_equal_k = tf.expand_dims(label_equal, 1) valid_labels = tf.logical_and(i_equal_j, tf.logical_not(i_equal_k)) mask = tf.logical_and(distinct_indices, valid_labels)
def batch_all_triplet_loss(labels, embeddings, margin, squared=False): ------------------------------- labels: 标签数据,shape = (batch_size,) embeddings: 提取的特征向量, shape = (batch_size, vector_size) triplet_loss: scalar, 一个batch的损失值 fraction_postive_triplets : valid的triplets占的比例 pairwise_dis = _pairwise_distance(embeddings, squared=squared) anchor_positive_dist = tf.expand_dims(pairwise_dis, 2)assert anchor_positive_dist.shape[2] == 1, "{}".format(anchor_positive_dist.shape) anchor_negative_dist = tf.expand_dims(pairwise_dis, 1)assert anchor_negative_dist.shape[1] == 1, "{}".format(anchor_negative_dist.shape) triplet_loss = anchor_positive_dist - anchor_negative_dist + margin mask = _get_triplet_mask(labels) triplet_loss = tf.multiply(mask, triplet_loss) triplet_loss = tf.maximum(triplet_loss, 0.0) valid_triplets = tf.to_float(tf.greater(triplet_loss, 1e-16)) num_positive_triplets = tf.reduce_sum(valid_triplets) num_valid_triplets = tf.reduce_sum(mask) fraction_postive_triplets = num_positive_triplets / (num_valid_triplets + 1e-16) triplet_loss = tf.reduce_sum(triplet_loss) / (num_positive_triplets + 1e-16)return triplet_loss, fraction_postive_triplets
def _get_anchor_positive_triplet_mask(labels): 得到合法的positive的mask, 即2D的矩阵,[a, p], a!=p and a和p相同labels ------------------------------------------------ labels: 标签数据,shape = (batch_size, ) mask: 合法的positive mask, shape = (batch_size, batch_size) indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool) indices_not_equal = tf.logical_not(indices_equal) labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1)) mask = tf.logical_and(indices_not_equal, labels_equal)
def _get_anchor_negative_triplet_mask(labels): 得到negative的2D mask, [a, n] 只需a, n不同且有不同的labels ------------------------------------------------ labels: 标签数据,shape = (batch_size, ) mask: negative mask, shape = (batch_size, batch_size) labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1)) mask = tf.logical_not(labels_equal)
def batch_hard_triplet_loss(labels, embeddings, margin, squared=False): batch hard triplet loss of a batch, 每个样本最大的positive距离 - 对应样本最小的negative距离 ------------------------------------ labels: 标签数据,shape = (batch_size,) embeddings: 提取的特征向量, shape = (batch_size, vector_size) triplet_loss: scalar, 一个batch的损失值 pairwise_distances = _pairwise_distance(embeddings) mask_anchor_positive = _get_anchor_positive_triplet_mask(labels) mask_anchor_positive = tf.to_float(mask_anchor_positive) anchor_positive_dist = tf.multiply(mask_anchor_positive, pairwise_distances) hardest_positive_dist = tf.reduce_max(anchor_positive_dist, axis=1, keepdims=True) tf.summary.scalar("hardest_positive_dis", tf.reduce_mean(hardest_positive_dist))'''取每一行最小值得时候,因为invalid [a, n]置为了0, 所以不能直接取,这里对应invalid位置加上每一行的最大值即可,然后再取最小的值''' mask_anchor_negative = _get_anchor_negative_triplet_mask(labels) mask_anchor_negative = tf.to_float(mask_anchor_negative) max_anchor_negative_dist = tf.reduce_max(pairwise_distances, axis=1, keepdims=True) anchor_negative_dist = pairwise_distances + max_anchor_negative_dist * (1.0 - mask_anchor_negative) hardest_negative_dist = tf.reduce_min(anchor_negative_dist, axis=1, keepdims=True) tf.summary.scalar("hardest_negative_dist", tf.reduce_mean(hardest_negative_dist)) triplet_loss = tf.maximum(hardest_positive_dist - hardest_negative_dist + margin, 0.0) triplet_loss = tf.reduce_mean(triplet_loss)
"triplet_strategy": "batch_all",
args = parser.parse_args(argv[1:]) tf.logging.info("创建模型....")with open(args.model_config) as f: config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100) cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params) tf.logging.info("开始训练模型,共{} epochs....".format(params['num_epochs'])) cls.train(input_fn = lambda: train_input_fn(args.data_dir, params)) tf.logging.info("测试集评价模型....") res = cls.evaluate(input_fn = lambda: test_input_fn(args.data_dir, params)) print("评价---{} : {}".format(key, res[key]))
def my_model(features, labels, mode, params): --------------------------------- features: 输入,shape = (batch_size, 784) labels: 输出,shape = (batch_size, ) is_training = (mode == tf.estimator.ModeKeys.TRAIN) images = tf.reshape(images, shape=[-1, params['image_size'], params['image_size'], 1]) with tf.variable_scope("model"): embeddings = build_model(is_training, images, params) if mode == tf.estimator.ModeKeys.PREDICT: predictions = {'embeddings': embeddings}return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions) labels = tf.cast(labels, tf.int64)if params['triplet_strategy'] == 'batch_all': loss, fraction = batch_all_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])elif params['triplet_strategy'] == 'batch_hard': loss = batch_hard_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])raise ValueError("triplet_strategy 配置不正确: {}".format(params['triplet_strategy'])) embedding_mean_norm = tf.reduce_mean(tf.norm(embeddings, axis=1)) tf.summary.scalar("embedding_mean_norm", embedding_mean_norm)with tf.variable_scope("metrics"): eval_metric_ops = {'embedding_mean_norm': tf.metrics.mean(embedding_mean_norm)}if params['triplet_strategy'] == 'batch_all': eval_metric_ops['fraction_positive_triplets'] = tf.metrics.mean(fraction)if mode == tf.estimator.ModeKeys.EVAL:return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=eval_metric_ops) tf.summary.scalar('loss', loss)if params['triplet_strategy'] == "batch_all": tf.summary.scalar('fraction_positive_triplets', fraction) tf.summary.image('train_image', images, max_outputs=1) optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate']) global_step = tf.train.get_global_step()if params['use_batch_norm']:'''如果使用BN,需要估计batch上的均值和方差,tf.get_collection(tf.GraphKeys.UPDATE_OPS)就可以得到 tf.control_dependencies计算完之后再进行里面的操作with tf.control_dependencies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)): train_op = optimizer.minimize(loss, global_step=global_step) train_op = optimizer.minimize(loss, global_step=global_step)return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
def build_model(is_training, images, params): ---------------------------- is_training: bool, 是否是训练阶段,可以从mode中判断 images: (batch_size, 28*28*1), 输入mnist数据 out: 输出的embeddings, shape = (batch_size, 64) num_channel = params['num_channels'] bn_momentum = params['bn_momentum'] channels = [num_channel, num_channel * 2]for i, c in enumerate(channels):with tf.variable_scope("block_{}".format(i)): out = tf.layers.conv2d(out, c, 3, padding='same')if params['use_batch_norm']: out = tf.layers.batch_normalization(out, momentum=bn_momentum, training=is_training) out = tf.layers.max_pooling2d(out, 2, 2)assert out.shape[1:] == [7, 7, num_channel * 2] out = tf.reshape(out, [-1, 7*7*num_channel*2])with tf.variable_scope("fc_1"): out = tf.layers.dense(out, params['embedding_size'])
args = parser.parse_args(argv[1:])with open(args.model_config) as f: tf.logging.info("创建模型....") config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100) cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params) tf.logging.info("预测....") predictions = cls.predict(input_fn=lambda: test_input_fn(args.data_dir, params)) embeddings = np.zeros((10000, params['embedding_size']))for i, p in enumerate(predictions): embeddings[i] = p['embeddings'] tf.logging.info("embeddings shape: {}".format(embeddings.shape))
with tf.Session() as sess: dataset = mnist_dataset.test(args.data_dir) dataset = dataset.map(lambda img, lab: lab) dataset = dataset.batch(10000) labels_tensor = dataset.make_one_shot_iterator().get_next() labels = sess.run(labels_tensor) np.savetxt(os.path.join(args.log_dir, 'metadata.tsv'), labels, fmt='%d')
shutil.copy(args.sprite_filename, args.log_dir)with tf.Session() as sess: embedding_var = tf.Variable(embeddings, name="mnist_embeddings") sess.run(embedding_var.initializer) saver.save(sess, os.path.join(args.log_dir, 'embeddings.ckpt')) summary_writer = tf.summary.FileWriter(args.log_dir) config = projector.ProjectorConfig() embedding = config.embeddings.add() embedding.tensor_name = embedding_var.name embedding.metadata_path = 'metadata.tsv' embedding.sprite.image_path = 'mnist_10k_sprite.png' embedding.sprite.single_image_dim.extend([28, 28]) projector.visualize_embeddings(summary_writer, config)
Reference
https://omoindrot.github.io/triplet-loss#batch-hard-strategy
https://github.com/omoindrot/tensorflow-triplet-loss
https://github.com/lawlite19/Blog-Back-Up
https://github.com/omoindrot/tensorflow-triplet-loss/issues/6
https://github.com/davidsandberg/facenet(原版)
https://blog.csdn.net/u013066730/article/details/88797338