This note implements only the Skip-gram model with Negative Sampling. Reference: https://wmathor.com/index.php/archives/1435/

    • Skip-gram diagram

    [Figure 2 from the reference: the Skip-gram architecture]
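    For reference (my own restatement; the figure itself is not reproduced), the skip-gram model maximizes the average log-probability of the context words given each center word, with window size C:

    \[
    \frac{1}{T}\sum_{t=1}^{T}\ \sum_{-C \le j \le C,\, j \ne 0} \log p(w_{t+j} \mid w_t)
    \]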

    • Negative Sampling

    [Figure 3 from the reference: Negative Sampling]
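    Stated in the standard form (again my own restatement, since the figure is not reproduced), the negative-sampling loss for a center word c, a context word o, and K negative words w_1, ..., w_K drawn from a noise distribution P_n is

    \[
    -\log \sigma\!\left(u_o^{\top} v_c\right) \;-\; \sum_{k=1}^{K} \log \sigma\!\left(-u_{w_k}^{\top} v_c\right),
    \qquad w_k \sim P_n(w) \propto U(w)^{3/4}
    \]

    where v is the center-word (in_embed) vector, u is the context-word (out_embed) vector, and σ is the sigmoid. In the forward pass below, log_pos and log_neg are the batched versions of these two terms, and the noise distribution is word_freqs, the unigram counts raised to the 3/4 power.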

    • Code

    Imports and hyperparameters

    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.utils.data as tud
    from collections import Counter
    import numpy as np
    import random
    import scipy.spatial.distance           # used later for cosine distance in find_nearest
    from sklearn.metrics.pairwise import cosine_similarity

    random.seed(1)
    np.random.seed(1)
    torch.manual_seed(1)

    C = 3                   # context window: number of words on each side of the center word
    K = 15                  # number of negative samples per positive word
    epochs = 2
    MAX_VOCAB_SIZE = 10000  # vocabulary size, including '<UNK>'
    EMBEDDING_SIZE = 100
    batch_size = 32
    lr = 0.2                # note: the Adam optimizer below is created with lr=1e-3 instead

    Data processing

    with open('text8.train.txt') as f:
        text = f.read()                          # read the raw training text
    text = text.lower().split()                  # lowercase and split into a list of words

    vocab_dict = dict(Counter(text).most_common(MAX_VOCAB_SIZE - 1))      # word -> count for the most frequent words
    vocab_dict['<UNK>'] = len(text) - np.sum(list(vocab_dict.values()))   # every remaining (rare) word is mapped to '<UNK>'

    word2idx = {word: i for i, word in enumerate(vocab_dict.keys())}
    idx2word = {i: word for i, word in enumerate(vocab_dict.keys())}

    word_counts = np.array([count for count in vocab_dict.values()], dtype=np.float32)
    word_freqs = word_counts / np.sum(word_counts)
    word_freqs = word_freqs ** (3. / 4.)         # unigram distribution raised to the 3/4 power

    class WordEmbeddingDataset(tud.Dataset):
        def __init__(self, text, word2idx, word_freqs):
            ''' text: a list of words, all text from the training dataset
                word2idx: the dictionary from word to index
                word_freqs: the frequency of each word
            '''
            super(WordEmbeddingDataset, self).__init__()  # initialize the parent class, then override __len__ and __getitem__
            self.text_encoded = [word2idx.get(word, word2idx['<UNK>']) for word in text]  # encode each word as an index; words not in the vocabulary map to '<UNK>'
            self.text_encoded = torch.LongTensor(self.text_encoded)  # nn.Embedding expects LongTensor indices
            self.word2idx = word2idx
            self.word_freqs = torch.Tensor(word_freqs)

        def __len__(self):
            return len(self.text_encoded)  # total number of words, i.e. the number of items

        def __getitem__(self, idx):
            ''' This function returns the following data for training:
                - the center word
                - the positive words around the center word
                - K randomly sampled negative words per positive word
            '''
            center_words = self.text_encoded[idx]  # the center word
            pos_indices = list(range(idx - C, idx)) + list(range(idx + 1, idx + C + 1))  # indices of the C words on each side of the center word
            pos_indices = [i % len(self.text_encoded) for i in pos_indices]  # wrap around with modulo to avoid indexing out of range
            pos_words = self.text_encoded[pos_indices]  # tensor of context (positive) word indices
            neg_words = torch.multinomial(self.word_freqs, K * pos_words.shape[0], True)
            # torch.multinomial draws K * pos_words.shape[0] samples from self.word_freqs and returns their indices
            # sampling is done with replacement, and larger values in self.word_freqs mean a higher probability of being drawn
            # for every positive word we sample K negative words; pos_words.shape[0] is the number of positive words
            # the while loop resamples until neg_words contains none of the context words
            while len(set(pos_words.numpy().tolist()) & set(neg_words.numpy().tolist())) > 0:
                neg_words = torch.multinomial(self.word_freqs, K * pos_words.shape[0], True)
            return center_words, pos_words, neg_words

    dataset = WordEmbeddingDataset(text, word2idx, word_freqs)
    dataloader = tud.DataLoader(dataset, batch_size, shuffle=True)
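
    A quick sanity check of the dataset (my own sketch, not from the reference; it assumes text8.train.txt has been loaded as above):

    # Sketch: pull one item and check its shapes, with C = 3 and K = 15 as set above
    center, pos, neg = dataset[100]
    print(center.shape, pos.shape, neg.shape)
    # expected: torch.Size([]) torch.Size([6]) torch.Size([90])
    # i.e. a scalar center-word index, 2*C positive words, 2*C*K negative words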

    Build the model
    Note: any given word can appear both as a center word and as a context word, so each word is represented by two vectors. The weights learned by in_embed are each word's embedding as a center word; the weights learned by out_embed are each word's embedding as a context word.

    class EmbeddingModel(nn.Module):
        def __init__(self, vocab_size, embed_size):
            super(EmbeddingModel, self).__init__()
            self.vocab_size = vocab_size
            self.embed_size = embed_size
            self.in_embed = nn.Embedding(self.vocab_size, self.embed_size)   # center-word embeddings
            self.out_embed = nn.Embedding(self.vocab_size, self.embed_size)  # context-word embeddings

        def forward(self, input_labels, pos_labels, neg_labels):
            ''' input_labels: center words, [batch_size]
                pos_labels: positive words, [batch_size, (window_size * 2)]
                neg_labels: negative words, [batch_size, (window_size * 2 * K)]
                return: loss, [batch_size]
            '''
            input_embedding = self.in_embed(input_labels)    # [batch_size, embed_size]
            pos_embedding = self.out_embed(pos_labels)       # [batch_size, (window * 2), embed_size]
            neg_embedding = self.out_embed(neg_labels)       # [batch_size, (window * 2 * K), embed_size]

            input_embedding = input_embedding.unsqueeze(2)          # [batch_size, embed_size, 1]
            pos_dot = torch.bmm(pos_embedding, input_embedding)     # [batch_size, (window * 2), 1]
            pos_dot = pos_dot.squeeze(2)                            # [batch_size, (window * 2)]
            neg_dot = torch.bmm(neg_embedding, -input_embedding)    # [batch_size, (window * 2 * K), 1]
            neg_dot = neg_dot.squeeze(2)                            # [batch_size, (window * 2 * K)]

            log_pos = F.logsigmoid(pos_dot).sum(1)  # .sum() would collapse to a single number; .sum(1) keeps one value per sample
            log_neg = F.logsigmoid(neg_dot).sum(1)
            loss = log_pos + log_neg
            return -loss                            # negative log-likelihood, to be minimized

        def input_embedding(self):
            return self.in_embed.weight.detach().numpy()

    model = EmbeddingModel(MAX_VOCAB_SIZE, EMBEDDING_SIZE)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
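
    A small shape check of the model (my own sketch; it uses random indices rather than real data) confirming that forward returns one loss value per sample:

    # Sketch: forward pass on a fake batch of random word indices
    fake_center = torch.randint(0, MAX_VOCAB_SIZE, (batch_size,))
    fake_pos = torch.randint(0, MAX_VOCAB_SIZE, (batch_size, 2 * C))
    fake_neg = torch.randint(0, MAX_VOCAB_SIZE, (batch_size, 2 * C * K))
    print(model(fake_center, fake_pos, fake_neg).shape)  # torch.Size([32]): one loss per sample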

    Training

    for e in range(epochs):
        for i, (input_labels, pos_labels, neg_labels) in enumerate(dataloader):
            input_labels = input_labels.long()
            pos_labels = pos_labels.long()
            neg_labels = neg_labels.long()

            optimizer.zero_grad()
            loss = model(input_labels, pos_labels, neg_labels).mean()  # average the per-sample losses
            loss.backward()
            optimizer.step()

            if i % 100 == 0:
                print('epoch', e, 'iteration', i, loss.item())

    embedding_weights = model.input_embedding()  # trained center-word embeddings as a numpy array
    torch.save(model.state_dict(), "embedding-{}.th".format(EMBEDDING_SIZE))
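
    If the weights are needed later, the saved state dict can be restored into a freshly built model (my own sketch; the filename follows the format string above):

    # Sketch: reload the trained weights into a new model instance
    model2 = EmbeddingModel(MAX_VOCAB_SIZE, EMBEDDING_SIZE)
    model2.load_state_dict(torch.load("embedding-{}.th".format(EMBEDDING_SIZE)))
    embedding_weights = model2.input_embedding()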

    Application: finding the most similar words

    def find_nearest(word):
        index = word2idx[word]
        embedding = embedding_weights[index]
        # cosine distance between this word's embedding and every embedding in the vocabulary
        cos_dis = np.array([scipy.spatial.distance.cosine(e, embedding) for e in embedding_weights])
        return [idx2word[i] for i in cos_dis.argsort()[:10]]  # the 10 closest words (the query word itself comes first)
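
    A usage sketch (my own example; the query words are assumed to be in word2idx, and the actual neighbors depend on the trained weights):

    # Sketch: print the nearest neighbors of a few query words
    for w in ['two', 'america', 'computer']:
        print(w, find_nearest(w))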