The GitHub address for the loss code is https://github.com/lawlite19/Blog-Back-Up/blob/master/code/triplet-loss/triplet_loss_np.py#L103
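For reference, here is a minimal NumPy sketch of the batch-all triplet loss (using squared distances for simplicity); this is an illustrative reconstruction written for this article, not necessarily the exact code at the link above:

import numpy as np

def triplet_loss_np(labels, embeddings, margin=0.5):
    # Squared pairwise distances via ||a||^2 - 2<a, b> + ||b||^2
    dot = embeddings @ embeddings.T
    sq = np.diag(dot)
    dist = np.maximum(sq[:, None] - 2.0 * dot + sq[None, :], 0.0)
    # loss[i, j, k] = d(i, j) - d(i, k) + margin for anchor i, positive j, negative k
    loss = dist[:, :, None] - dist[:, None, :] + margin
    # Valid triplets: i, j, k distinct, labels[i] == labels[j], labels[i] != labels[k]
    eye = np.eye(len(labels), dtype=bool)
    distinct = (~eye)[:, :, None] & (~eye)[:, None, :] & (~eye)[None, :, :]
    same = labels[None, :] == labels[:, None]
    mask = distinct & same[:, :, None] & (~same)[:, None, :]
    # Keep only the triplets with strictly positive loss, then average over them
    loss = np.maximum(loss * mask, 0.0)
    num_positive = np.sum(loss > 1e-16)
    return np.sum(loss) / (num_positive + 1e-16)

print(triplet_loss_np(np.array([0, 0, 1, 1]), np.random.rand(4, 8)))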

[Figures 10–16 from the original post omitted]

def _pairwise_distance(embeddings, squared=False):
    '''Compute the matrix of pairwise distances between all embeddings.
    ------------------------------------------
    embeddings: feature vectors, shape (batch_size, vector_size)
    distances: pairwise distance matrix, shape (batch_size, batch_size)
    '''
    dot_product = tf.matmul(embeddings, tf.transpose(embeddings))
    # ||a - b||^2 = ||a||^2 - 2<a, b> + ||b||^2; the squared norms are the diagonal
    square_norm = tf.diag_part(dot_product)
    distances = tf.expand_dims(square_norm, axis=1) - 2.0 * dot_product + tf.expand_dims(square_norm, axis=0)
    # Guard against small negative values caused by floating-point error
    distances = tf.maximum(distances, 0.0)
    if not squared:
        # sqrt has an infinite gradient at 0, so add a small epsilon where the
        # distance is exactly 0, take the root, then zero those entries again
        mask = tf.to_float(tf.equal(distances, 0.0))
        distances = distances + mask * 1e-16
        distances = tf.sqrt(distances)
        distances = distances * (1.0 - mask)
    return distances
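A quick sanity check of the distance computation (assuming TensorFlow 1.x, where tf.diag_part and tf.to_float exist):

import tensorflow as tf

emb = tf.constant([[0.0, 0.0], [3.0, 4.0]])
with tf.Session() as sess:
    print(sess.run(_pairwise_distance(emb)))   # [[0. 5.] [5. 0.]]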

[Figure 17 from the original post omitted]

def _get_triplet_mask(labels):
    '''Build a 3D mask over [a, p, n] that is True exactly where the triplet (a, p, n) is valid.
    ----------------------------------
    labels: labels of the training batch, shape = (batch_size,)
    mask: 3D mask, shape = (batch_size, batch_size, batch_size)
    '''
    # i, j, k must be three distinct indices
    indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
    indices_not_equal = tf.logical_not(indices_equal)
    i_not_equal_j = tf.expand_dims(indices_not_equal, 2)
    i_not_equal_k = tf.expand_dims(indices_not_equal, 1)
    j_not_equal_k = tf.expand_dims(indices_not_equal, 0)
    distinct_indices = tf.logical_and(tf.logical_and(i_not_equal_j, i_not_equal_k), j_not_equal_k)
    # labels[i] == labels[j] (anchor/positive) and labels[i] != labels[k] (anchor/negative)
    label_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
    i_equal_j = tf.expand_dims(label_equal, 2)
    i_equal_k = tf.expand_dims(label_equal, 1)
    valid_labels = tf.logical_and(i_equal_j, tf.logical_not(i_equal_k))
    mask = tf.logical_and(distinct_indices, valid_labels)
    return mask
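The same mask logic rendered in NumPy on a toy batch, to make the broadcasting concrete (labels = [0, 0, 1], so the only valid triplets are (0, 1, 2) and (1, 0, 2)):

import numpy as np

labels = np.array([0, 0, 1])
eye = np.eye(3, dtype=bool)
# distinct[i, j, k] = (i != j) and (i != k) and (j != k)
distinct = (~eye)[:, :, None] & (~eye)[:, None, :] & (~eye)[None, :, :]
label_eq = labels[None, :] == labels[:, None]
# valid[i, j, k] = (labels[i] == labels[j]) and (labels[i] != labels[k])
valid = label_eq[:, :, None] & ~label_eq[:, None, :]
print(np.argwhere(distinct & valid))   # [[0 1 2] [1 0 2]]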

[Figures 18–19 from the original post omitted]

def batch_all_triplet_loss(labels, embeddings, margin, squared=False):
    '''Triplet loss over all valid triplets in the batch (the "batch all" strategy).
    -------------------------------
    labels:     label data, shape = (batch_size,)
    embeddings: extracted feature vectors, shape = (batch_size, vector_size)
    triplet_loss: scalar, loss over the batch
    fraction_positive_triplets: fraction of valid triplets with positive loss
    '''
    pairwise_dis = _pairwise_distance(embeddings, squared=squared)
    # anchor_positive_dist[i, j, :] = d(i, j); anchor_negative_dist[i, :, k] = d(i, k)
    anchor_positive_dist = tf.expand_dims(pairwise_dis, 2)
    assert anchor_positive_dist.shape[2] == 1, "{}".format(anchor_positive_dist.shape)
    anchor_negative_dist = tf.expand_dims(pairwise_dis, 1)
    assert anchor_negative_dist.shape[1] == 1, "{}".format(anchor_negative_dist.shape)
    # Broadcast to loss[i, j, k] = d(i, j) - d(i, k) + margin
    triplet_loss = anchor_positive_dist - anchor_negative_dist + margin
    # Zero out the entries that do not correspond to a valid triplet
    mask = _get_triplet_mask(labels)
    mask = tf.to_float(mask)
    triplet_loss = tf.multiply(mask, triplet_loss)
    triplet_loss = tf.maximum(triplet_loss, 0.0)
    # Count the triplets with strictly positive loss
    valid_triplets = tf.to_float(tf.greater(triplet_loss, 1e-16))
    num_positive_triplets = tf.reduce_sum(valid_triplets)
    num_valid_triplets = tf.reduce_sum(mask)
    fraction_positive_triplets = num_positive_triplets / (num_valid_triplets + 1e-16)
    # Average only over the triplets that still have positive loss
    triplet_loss = tf.reduce_sum(triplet_loss) / (num_positive_triplets + 1e-16)
    return triplet_loss, fraction_positive_triplets
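A minimal usage sketch (TensorFlow 1.x; the labels and embeddings here are made up for illustration):

import numpy as np
import tensorflow as tf

labels = tf.constant([0, 0, 1, 1], dtype=tf.int64)
embeddings = tf.constant(np.random.rand(4, 64), dtype=tf.float32)
loss, fraction = batch_all_triplet_loss(labels, embeddings, margin=0.5)
with tf.Session() as sess:
    print(sess.run([loss, fraction]))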

[Figure 20 from the original post omitted]


def _get_anchor_positive_triplet_mask(labels):
    '''Build the 2D mask of valid positives: [a, p] is True iff a != p and a, p share a label.
    ------------------------------------------------
    labels: label data, shape = (batch_size,)
    mask: valid positive mask, shape = (batch_size, batch_size)
    '''
    indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
    indices_not_equal = tf.logical_not(indices_equal)
    labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
    mask = tf.logical_and(indices_not_equal, labels_equal)
    return mask

[Figure 21 from the original post omitted]

def _get_anchor_negative_triplet_mask(labels):
    '''Build the 2D negative mask: [a, n] is True iff a and n have different labels.
    ------------------------------------------------
    labels: label data, shape = (batch_size,)
    mask: negative mask, shape = (batch_size, batch_size)
    '''
    labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
    mask = tf.logical_not(labels_equal)
    return mask

[Figure 22 from the original post omitted]

def batch_hard_triplet_loss(labels, embeddings, margin, squared=False):
    '''Batch-hard triplet loss: for each sample, take its largest positive distance
    minus its smallest negative distance.
    ------------------------------------
    labels:     label data, shape = (batch_size,)
    embeddings: extracted feature vectors, shape = (batch_size, vector_size)
    triplet_loss: scalar, loss over the batch
    '''
    pairwise_distances = _pairwise_distance(embeddings, squared=squared)
    # Hardest positive: zero out invalid [a, p] entries, then take the row-wise max
    mask_anchor_positive = _get_anchor_positive_triplet_mask(labels)
    mask_anchor_positive = tf.to_float(mask_anchor_positive)
    anchor_positive_dist = tf.multiply(mask_anchor_positive, pairwise_distances)
    hardest_positive_dist = tf.reduce_max(anchor_positive_dist, axis=1, keepdims=True)
    tf.summary.scalar("hardest_positive_dist", tf.reduce_mean(hardest_positive_dist))
    # Hardest negative: the invalid [a, n] entries were set to 0, so we cannot take
    # the row minimum directly; add each row's maximum to the invalid positions
    # first, then take the row-wise min
    mask_anchor_negative = _get_anchor_negative_triplet_mask(labels)
    mask_anchor_negative = tf.to_float(mask_anchor_negative)
    max_anchor_negative_dist = tf.reduce_max(pairwise_distances, axis=1, keepdims=True)
    anchor_negative_dist = pairwise_distances + max_anchor_negative_dist * (1.0 - mask_anchor_negative)
    hardest_negative_dist = tf.reduce_min(anchor_negative_dist, axis=1, keepdims=True)
    tf.summary.scalar("hardest_negative_dist", tf.reduce_mean(hardest_negative_dist))
    triplet_loss = tf.maximum(hardest_positive_dist - hardest_negative_dist + margin, 0.0)
    triplet_loss = tf.reduce_mean(triplet_loss)
    return triplet_loss
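To see why the row minimum needs this shift trick, here is the same computation in NumPy on a toy distance matrix for labels = [0, 0, 1]:

import numpy as np

dist = np.array([[0.0, 1.0, 4.0],
                 [1.0, 0.0, 2.0],
                 [4.0, 2.0, 0.0]])
neg_mask = np.array([[0, 0, 1],
                     [0, 0, 1],
                     [1, 1, 0]], dtype=float)   # [a, n] valid iff labels differ
row_max = dist.max(axis=1, keepdims=True)
# Push the invalid entries above every real distance, then take the row min
shifted = dist + row_max * (1.0 - neg_mask)
print(shifted.min(axis=1))   # [4. 2. 2.] -- the true hardest-negative distances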

"triplet_strategy": "batch_all",

    args = parser.parse_args(argv[1:])
    tf.logging.info("Creating the model....")
    with open(args.model_config) as f:
        params = json.load(f)  # load the hyper-parameters from the JSON config
    config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100)
    cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params)
    tf.logging.info("Training the model for {} epochs....".format(params['num_epochs']))
    cls.train(input_fn=lambda: train_input_fn(args.data_dir, params))
    tf.logging.info("Evaluating on the test set....")
    res = cls.evaluate(input_fn=lambda: test_input_fn(args.data_dir, params))
    for key in res:
        print("Evaluation---{} : {}".format(key, res[key]))
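Assuming argparse flags that match the attribute names used above (model_dir, data_dir and model_config all appear in the snippet; the script name itself is a guess), training would be launched roughly like:

python train.py --model_dir experiments/batch_all --data_dir data/mnist --model_config experiments/batch_all/params.json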

[Figure 23 from the original post omitted]

def my_model(features, labels, mode, params):
    '''Model function for the Estimator.
    ---------------------------------
    features: input, shape = (batch_size, 784)
    labels:   ground-truth labels, shape = (batch_size,)
    '''
    is_training = (mode == tf.estimator.ModeKeys.TRAIN)
    images = features
    images = tf.reshape(images, shape=[-1, params['image_size'], params['image_size'], 1])
    with tf.variable_scope("model"):
        embeddings = build_model(is_training, images, params)
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {'embeddings': embeddings}
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
    labels = tf.cast(labels, tf.int64)
    # Pick the triplet mining strategy
    if params['triplet_strategy'] == 'batch_all':
        loss, fraction = batch_all_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])
    elif params['triplet_strategy'] == 'batch_hard':
        loss = batch_hard_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])
    else:
        raise ValueError("Invalid triplet_strategy: {}".format(params['triplet_strategy']))
    # Summaries and evaluation metrics
    embedding_mean_norm = tf.reduce_mean(tf.norm(embeddings, axis=1))
    tf.summary.scalar("embedding_mean_norm", embedding_mean_norm)
    with tf.variable_scope("metrics"):
        eval_metric_ops = {'embedding_mean_norm': tf.metrics.mean(embedding_mean_norm)}
        if params['triplet_strategy'] == 'batch_all':
            eval_metric_ops['fraction_positive_triplets'] = tf.metrics.mean(fraction)
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=eval_metric_ops)
    tf.summary.scalar('loss', loss)
    if params['triplet_strategy'] == "batch_all":
        tf.summary.scalar('fraction_positive_triplets', fraction)
    tf.summary.image('train_image', images, max_outputs=1)
    optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
    global_step = tf.train.get_global_step()
    if params['use_batch_norm']:
        # With batch norm, the batch mean/variance update ops live in
        # tf.GraphKeys.UPDATE_OPS; tf.control_dependencies makes sure they run
        # before the training step
        with tf.control_dependencies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)):
            train_op = optimizer.minimize(loss, global_step=global_step)
    else:
        train_op = optimizer.minimize(loss, global_step=global_step)
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

[Figure 24 from the original post omitted]

def build_model(is_training, images, params):
    '''Build the network that maps images to embeddings.
    ----------------------------
    is_training: bool, whether this is the training phase (can be derived from mode)
    images:     (batch_size, 28*28*1), input mnist data
    out: output embeddings, shape = (batch_size, 64)
    '''
    out = images
    num_channel = params['num_channels']
    bn_momentum = params['bn_momentum']
    # Two conv blocks; the channel count doubles in the second block
    channels = [num_channel, num_channel * 2]
    for i, c in enumerate(channels):
        with tf.variable_scope("block_{}".format(i)):
            out = tf.layers.conv2d(out, c, 3, padding='same')
            if params['use_batch_norm']:
                out = tf.layers.batch_normalization(out, momentum=bn_momentum, training=is_training)
            out = tf.nn.relu(out)
            out = tf.layers.max_pooling2d(out, 2, 2)
    # Two 2x2 poolings bring 28x28 down to 7x7
    assert out.shape[1:] == [7, 7, num_channel * 2]
    out = tf.reshape(out, [-1, 7 * 7 * num_channel * 2])
    with tf.variable_scope("fc_1"):
        out = tf.layers.dense(out, params['embedding_size'])
    return out
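A quick shape walk-through of this network, assuming image_size = 28, num_channels = 32 and embedding_size = 64:

# (batch, 28, 28, 1)  -> block_0 (conv + pool) -> (batch, 14, 14, 32)
#                      -> block_1 (conv + pool) -> (batch, 7, 7, 64)
#                      -> reshape               -> (batch, 3136)
#                      -> fc_1 (dense)          -> (batch, 64)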

[Figures 25–37 from the original post omitted]


    args = parser.parse_args(argv[1:])
    with open(args.model_config) as f:
        params = json.load(f)  # load the hyper-parameters from the JSON config
    tf.logging.info("Creating the model....")
    config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100)
    cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params)
    tf.logging.info("Predicting....")
    predictions = cls.predict(input_fn=lambda: test_input_fn(args.data_dir, params))
    # Collect the embeddings of the 10000 test images into one array
    embeddings = np.zeros((10000, params['embedding_size']))
    for i, p in enumerate(predictions):
        embeddings[i] = p['embeddings']
    tf.logging.info("embeddings shape: {}".format(embeddings.shape))

[Figure 38 from the original post omitted]

    with tf.Session() as sess:
        # Read out the 10000 test labels in a single batch
        dataset = mnist_dataset.test(args.data_dir)
        dataset = dataset.map(lambda img, lab: lab)
        dataset = dataset.batch(10000)
        labels_tensor = dataset.make_one_shot_iterator().get_next()
        labels = sess.run(labels_tensor)
        # Write one label per line for the TensorBoard projector
        np.savetxt(os.path.join(args.log_dir, 'metadata.tsv'), labels, fmt='%d')
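For a single-column metadata file the projector expects no header row, so metadata.tsv is just one integer label per line, aligned row-for-row with the embedding matrix (the actual values depend on the dataset order):

7
2
1
0
4
...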

[Figure 39 from the original post omitted]

    # Copy the sprite image next to the log files
    shutil.copy(args.sprite_filename, args.log_dir)
    with tf.Session() as sess:
        # Store the embeddings in a checkpointed variable the projector can read
        embedding_var = tf.Variable(embeddings, name="mnist_embeddings")
        saver = tf.train.Saver()  # a Saver is needed for the save call below
        sess.run(embedding_var.initializer)
        saver.save(sess, os.path.join(args.log_dir, 'embeddings.ckpt'))
        # Point the TensorBoard projector at the variable, labels, and sprite
        summary_writer = tf.summary.FileWriter(args.log_dir)
        config = projector.ProjectorConfig()
        embedding = config.embeddings.add()
        embedding.tensor_name = embedding_var.name
        embedding.metadata_path = 'metadata.tsv'
        embedding.sprite.image_path = 'mnist_10k_sprite.png'
        embedding.sprite.single_image_dim.extend([28, 28])
        projector.visualize_embeddings(summary_writer, config)
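After this script runs, the projector can be opened with the standard TensorBoard CLI, pointing it at the same log directory:

tensorboard --logdir <log_dir>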

[Figures 40–43 from the original post omitted]

Reference

https://omoindrot.github.io/triplet-loss#batch-hard-strategy

https://github.com/omoindrot/tensorflow-triplet-loss

https://github.com/lawlite19/Blog-Back-Up

https://github.com/omoindrot/tensorflow-triplet-loss/issues/6

https://github.com/davidsandberg/facenet (original)

https://blog.csdn.net/u013066730/article/details/88797338