Data Structure Fundamentals
LoD-Tensor
SelectedRows
Tensor ids;
ids.numel() is the total number of elements in the Tensor, i.e. the product of the entries of the TensorShape.
For example, if ids.dims() = [1, 3, 5], then ids.numel() is 15.
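As a minimal standalone sketch (not the Paddle implementation), the element count can be computed by multiplying the dimensions together:

#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

// Standalone sketch: the element count is the product of the dims.
int64_t Numel(const std::vector<int64_t>& dims) {
  return std::accumulate(dims.begin(), dims.end(), int64_t{1},
                         std::multiplies<int64_t>());
}

// Numel({1, 3, 5}) == 15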
Suppose the EmbeddingTable has size 10^9 × 32. When stored as SelectedRows, SelectedRows does not materialize a 10^9 × 32 dense matrix up front.
First, the ids here are already the values after feature_id % dict_size; suppose they are:
[828, 12, 2376, 8910]
SelectedRows converts them into a dense representation, i.e. a contiguous Tensor of shape [4, 32]. To turn the sparse ids into dense tensor indices, a mapping table id_to_index is needed: each id is translated through id_to_index into an index into the Tensor.

void SelectedRows::Get(const framework::Tensor& ids, framework::Tensor* value,
                       bool auto_grown, bool is_test, bool fliter_index,
                       bool need_count) {
  PADDLE_ENFORCE(value->IsInitialized(),
                 "The value tensor should be initialized.");
  // value_width is the embedding_size, e.g. 32 or 64.
  int64_t value_width = value_->numel() / value_->dims()[0];
  // Flatten ids and copy them into all_ids.
  std::vector<int64_t> all_ids(ids.numel());
  auto* ids_data = ids.data<int64_t>();
  const size_t ids_num = ids.numel();
  for (size_t i = 0; i < ids_num; ++i) {
    all_ids[i] = ids_data[i];
  }
  // Look up all_ids to get each feature's index into value_,
  // stored in id_indexes.
  std::vector<int64_t> id_indexes(ids.numel());
  GetIndexsByIds(all_ids, &id_indexes, auto_grown, is_test, fliter_index,
                 need_count);
  // table_height is the current length of the dense embedding table,
  // i.e. the upper bound on any valid index into value_.
  int64_t table_height = value_->dims()[0];
  // Walk over id_indexes and copy each looked-up row into value.
  // value is the output Tensor; all results are laid out in it
  // back to back (flattened).
  for (size_t i = 0; i < ids_num; ++i) {
    auto id = ids_data[i];
    int64_t index = id_indexes[i];
    // index must be smaller than table_height.
    PADDLE_ENFORCE_LT(index, table_height,
                      "index should be less than table height");
    // index < 0 means the id is not in the embedding table
    // (e.g. first access); fill its value_width-wide slice with zeros.
    if (index < 0) {
      VLOG(5) << "id " << id << " not in the table, return 0";
      framework::VisitDataType(
          value_->type(),
          /*
           * Fill a slice of a Tensor:
           * @value: pointer to the data being initialized;
           * @i * value_width: offset from the start of value;
           * @value_width: number of elements to fill;
           * @0.0: the fill value.
           */
          TensorFillVisitor(value, i * value_width, value_width, 0.0));
    } else {
      framework::VisitDataType(
          value_->type(),
          /*
           * Copy a slice of a Tensor:
           * @value: destination pointer;
           * @i * value_width: destination offset;
           * @*value_.get(): source data;
           * @index * value_width: source offset;
           * @value_width: number of elements to copy.
           */
          TensorCopyVisitor(value, i * value_width, *value_.get(),
                            index * value_width, value_width));
    }
  }
}
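A hypothetical call site might look like the sketch below. The setup (the name table, CPUPlace, the four ids) is illustrative, not taken from the Paddle source; it assumes the usual fluid headers and an existing SelectedRows instance table.

// Illustrative only: how Get might be driven for the ids above.
framework::Tensor ids;
ids.Resize(framework::make_ddim({4}));
auto* ids_data = ids.mutable_data<int64_t>(platform::CPUPlace());
int64_t raw_ids[] = {828, 12, 2376, 8910};
std::copy(raw_ids, raw_ids + 4, ids_data);

// Output buffer: one value_width-wide row per id, here [4, 32].
framework::Tensor out;
out.Resize(framework::make_ddim({4, 32}));
out.mutable_data<float>(platform::CPUPlace());  // satisfies IsInitialized()

table.Get(ids, &out, /*auto_grown=*/true, /*is_test=*/false,
          /*fliter_index=*/false, /*need_count=*/false);
// out now holds the four embedding rows, laid out back to back.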
Now look at AutoGrownIndex, which grows the SelectedRows automatically; the main complication is locking for concurrent access.
int64_t SelectedRows::AutoGrownIndex(int64_t key, bool auto_grown, bool is_test) {
  // Take the read lock.
  rwlock_->RDLock();
  auto iter = id_to_index_.find(key);
  if (iter == id_to_index_.end()) {
    // Miss: release the read lock first ...
    rwlock_->UNLock();
    ...
    // ... then take the write lock.
    rwlock_->WRLock();
    auto map_size = id_to_index_.size();
    auto vector_size = rows_.size();
    ...
    // Look the key up again under the write lock.
    auto write_iter = id_to_index_.find(key);
    if (write_iter == id_to_index_.end()) {
      // Still missing: grow the table.
      int row_num = rows_.size();
      // Append the new key to rows_.
      rows_.push_back(key);
      auto index = static_cast<int64_t>(rows_.size() - 1);
      id_to_index_[key] = index;
      rwlock_->UNLock();
      return index;
    } else {
      // Found: return the existing index.
      auto index = write_iter->second;
      rwlock_->UNLock();
      return index;
    }
  } else {
    // Hit: unlock and return.
    auto index = iter->second;
    rwlock_->UNLock();
    return index;
  }
}
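The release-read-lock / take-write-lock / re-check sequence above is the classic double-checked locking pattern. Here is a self-contained sketch of the same idea using std::shared_mutex instead of Paddle's RWLock (illustrative, not the Paddle code):

#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

// Generic double-checked lookup-or-grow, mirroring AutoGrownIndex.
class GrowableIndex {
 public:
  int64_t GetOrGrow(int64_t key) {
    {
      std::shared_lock<std::shared_mutex> rd(mu_);  // read lock
      auto it = id_to_index_.find(key);
      if (it != id_to_index_.end()) return it->second;
    }  // read lock released here
    std::unique_lock<std::shared_mutex> wr(mu_);  // write lock
    // Re-check: another writer may have inserted the key in between.
    auto it = id_to_index_.find(key);
    if (it != id_to_index_.end()) return it->second;
    rows_.push_back(key);
    int64_t index = static_cast<int64_t>(rows_.size() - 1);
    id_to_index_[key] = index;
    return index;
  }

 private:
  std::shared_mutex mu_;
  std::unordered_map<int64_t, int64_t> id_to_index_;
  std::vector<int64_t> rows_;
};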
Improvement: shard id_to_index, and rework the lock-release logic.
class SelectedRows {
 public:
  SelectedRows(const std::vector<int64_t>& rows, const int64_t& height)
      : rows_(rows), height_(height) {
    value_.reset(new Tensor());
    rwlock_.reset(new RWLock);
  }

  SelectedRows() {
    height_ = 0;
    value_.reset(new Tensor());
    rwlock_.reset(new RWLock);
  }

  platform::Place place() const { return value_->place(); }

  const Tensor& value() const { return *value_; }

  Tensor* mutable_value() { return value_.get(); }

  int64_t height() const { return height_; }

  void set_height(int64_t height) { height_ = height; }

  const Vector<int64_t>& rows() const { return rows_; }

  Vector<int64_t>* mutable_rows() { return &rows_; }

  void set_rows(const Vector<int64_t>& rows) { rows_ = rows; }

  /*
   * @brief Get the index of key in rows
   *
   * @return -1 if the key does not exist.
   */
  int64_t Index(int64_t key) const {
    auto it = std::find(rows_.begin(), rows_.end(), key);
    if (it == rows_.end()) {
      return -1;
    }
    return static_cast<int64_t>(std::distance(rows_.begin(), it));
  }

  /*
   * @brief whether the table has the specified key.
   *
   * @return true if the key exists.
   */
  bool HasKey(int64_t key) const;

  /*
   * @brief Get value by the key list.
   * Note!!! this interface is only used when selected_rows is used as
   * parameters for distribute lookup table.
   *
   * @return a list of pairs containing the non-existing keys and the
   * indexes in the value
   */
  void Get(const framework::Tensor& ids, framework::Tensor* value,
           bool auto_grown = false, bool is_test = false);

  /*
   * @brief Get the index of the key from the id_to_index_ map. If the key
   * does not exist, add the key into id_to_index_.
   *
   * Note!!! this interface is only used when selected_rows is used as
   * parameters for distribute lookup table.
   *
   * @return index of the key.
   */
  int64_t AutoGrownIndex(int64_t key, bool auto_grown, bool is_test = false);

  /*
   * @brief Get the index of the key from the id_to_index_ map.
   */
  inline int64_t GetIndexFromId(int64_t key) {
    auto iter = id_to_index_.find(key);
    if (iter == id_to_index_.end()) {
      return -1;
    } else {
      return iter->second;
    }
  }

  void SyncIndex();

  /*
   * @brief Get complete Dims before
   */
  DDim GetCompleteDims() const {
    std::vector<int64_t> dims = vectorize(value_->dims());
    dims[0] = height_;
    return make_ddim(dims);
  }

 private:
  // Notice: rows can be duplicate. We can have {0, 4, 7, 0, 5, 7, 9} here.
  // SelectedRows are simply concatenated when added together. Only when a
  // SelectedRows is added to a Tensor will the duplicate rows be handled.
  Vector<int64_t> rows_;
  std::unordered_map<int64_t, int64_t>
      id_to_index_;  // should not be used when rows_ has duplicate members
  std::unique_ptr<Tensor> value_{nullptr};
  int64_t height_;  // height indicates the underlying tensor's height
  std::unique_ptr<RWLock> rwlock_{nullptr};
};
DataShard
Each DataShard holds a portion of id_to_index, stored in a fixed-size map.
The real index is recovered each time as:
index = shard_id * shard_size + shard_offset
shard_num is passed in as an environment variable.
shard_size = dict_size / shard_num

// With the SelectedRows already built, regenerate the DataShards from
// id_to_index. Here shard_size = dict_size / shard_num, where dict_size
// is the 10^9 above.
void ReconstructShardIndexAfterLoad() {
  int64_t shard_size = value_->dims()[0] / shard_num_;
}
inline int64_t DataShard::GetIndexById(int64_t id, bool auto_grown,
                                       bool is_test, bool fliter_index,
                                       bool need_count) {
  rwlock_->RDLock();
  auto iter = id_to_offset_.find(id);
  if (iter == id_to_offset_.end()) {
    rwlock_->UNLock();
    if (auto_grown) {
      rwlock_->WRLock();
      if (need_count) {
        id_to_count_[id] += 1;
      }
      if (fliter_index) {
        // Filter out low-frequency ids: anything seen fewer than
        // 10 times is not admitted into the shard.
        if (id_to_count_[id] < 10) {
          VLOG(3) << "fliter key " << id;
          rwlock_->UNLock();
          return -1;
        }
      }
      // Admit the id: its offset within the shard is the current map size.
      auto shard_offset = id_to_offset_.size();
      PADDLE_ENFORCE_LT(shard_offset, shard_size_, "shard is full!");
      id_to_offset_[id] = shard_offset;
      int64_t offset = shard_id_ * shard_size_ + shard_offset;
      rwlock_->UNLock();
      return offset;
    } else {
      return -1;
    }
  } else {
    int64_t offset = shard_id_ * shard_size_ + iter->second;
    rwlock_->UNLock();
    return offset;
  }
}
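Putting the arithmetic together, here is a minimal sketch of the shard routing. Note that the routing rule shard_id = id % shard_num is an assumption for illustration; only the offset formula above comes from the text.

#include <cstdint>

// Sketch of the global-index arithmetic for a sharded id_to_index.
// shard_offset is assumed to come from the owning shard's map
// (id_to_offset_ in the code above).
int64_t GlobalIndex(int64_t id, int64_t shard_num, int64_t shard_size,
                    int64_t shard_offset) {
  int64_t shard_id = id % shard_num;  // assumed routing rule
  // Each shard owns the contiguous index range
  // [shard_id * shard_size, (shard_id + 1) * shard_size).
  return shard_id * shard_size + shard_offset;
}

// Example: dict_size = 10^9 and shard_num = 8 give shard_size = 1.25 * 10^8;
// an id routed to shard 3 at offset 42 maps to 3 * 125000000 + 42 = 375000042.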
Data Reader
By data granularity, readers divide into BatchReader and SampleReader: the former returns a batch of data per call, the latter a single sample. The data format is a NumPy array or a LoDTensor.
By networking mode, they divide into the synchronous feed-style Reader and the asynchronous PyReader.
Synchronous vs. asynchronous here is relative to model training and inference.
Synchronous mode is generally recommended only for debugging a model, because it is slow.