• 数据结构基础
  • 数据Reader

    数据结构基础

    LoD-Tensor

    LoD-Tensor

    SelectedRow

    Tensor ids;
    ids.numel()就是 Tensor 元素的总和,也就是 TensorShape 相乘的结果。
    比如的 ids.dims() = [1,3,5], 那么 ids.numel() 就是 15.
    假设 EmbeddingTable 的大小为 10^9 32,如果以 SelectRows 去存储,SelectRow 不会在一开始就生成一张 10^9 32 的二维矩阵。
    首先这里的 ids 已经是 feature_id % dict_size 之后的值,假设为:
    [828, 12 , 2376, 8910]
    SelectedRows 会将其转化为一个稠密的向量表示,也就是一个连续的Tensor, shape为[4, 32],为了将稀疏的 ids 转化为稠密的 TensorIndex,需要一张影射表,id_to_index。ids 通过 id_to_index 转化为 Tensor 的 Index。

    1. void SelectedRows::Get(const framework::Tensor& ids, framework::Tensor* value,
    2. bool auto_grown, bool is_test, bool fliter_index, bool need_count) {
    3. PADDLE_ENFORCE(value->IsInitialized(),
    4. "The value tensor should be initialized.");
    5. // value_width 就是embedding_size。比如32或64
    6. int64_t value_width = value_->numel() / value_->dims()[0];
    7. // 将 ids 展开并拷贝到 all_ids
    8. std::vector<int64_t> all_ids(ids.numel());
    9. auto* ids_data = ids.data<int64_t>();
    10. const size_t ids_num = ids.numel();
    11. for (auto i = 0; i < ids_num; ++i) {
    12. all_ids[i] = ids_data[i];
    13. }
    14. // 根据 all_ids 查询 index,获取对应的feature在 value_ 里的 index,存在 id_indexes 里
    15. std::vector<int64_t> id_indexes(ids.numel());
    16. GetIndexsByIds(all_ids, &id_indexes, auto_grown, is_test, fliter_index, need_count);
    17. // table_height 是当前稠密embedding表的长度,也就是 value_ 的 index 的最大值
    18. int64_t table_height = value_->dims()[0];
    19. // 遍历 id_indexes,将对应的tensor信息取出来放在value里,
    20. // value是一个存放输出结果的Tensor,所有查询结果的tensor是打平放在value里的。
    21. for (int i = 0; i < ids_num; ++i) {
    22. auto id = ids.data<int64_t>()[i];
    23. int64_t index = id_indexes[i];
    24. // index 应该是要小于 table_height 的
    25. PADDLE_ENFORCE_LT(index, table_height,
    26. "index should be less then table height");
    27. // index 如果小于0,说明是第一次访问embedding table,此时将这个index 对应的 32 维向量加进去
    28. if (index < 0) {
    29. VLOG(5) << "id " << id << " not in the table, return 0";
    30. framework::VisitDataType(
    31. value_->type(),
    32. /*
    33. * 初始化一个 Tensor
    34. * @value: 需要初始化数据的指针起始地址;
    35. * @i*value_width: 从起始地址开始的偏移量;
    36. * @value_width: 需要填充的长度
    37. * @0.0: 初始化的值。
    38. */
    39. TensorFillVisitor(value, i * value_width, value_width, 0.0));
    40. } else {
    41. framework::VisitDataType(
    42. value_->type(),
    43. /*
    44. * 拷贝Tensor
    45. * @value: 目的地址的指针
    46. * @i*value_width: 目的地址偏移量
    47. * @*value_.get(): 源数据指针地址
    48. * @index * value_width: 源数据地址偏移量
    49. * @value_width: 需要拷贝的数据长度
    50. */
    51. TensorCopyVisitor(value, i * value_width, *value_.get(),
    52. index * value_width, value_width));
    53. }
    54. }
    55. }

    再看看 AutoGrownIndex, 自动扩容 SelectedRows,主要会涉及并发访问的加锁。

    int64_t SelectedRows::AutoGrownIndex(int64_t key, bool auto_grown,
                                       bool is_test) {
    // 加读锁
    rwlock_->RDLock();
    auto iter = id_to_index_.find(key);
    if (iter == id_to_index_.end()) {
      // 没查到,先释放读锁
      rwlock_->UNLock();
      ...
      // 然后加写锁
      rwlock_->WRLock();
      auto map_size = id_to_index_.size();
      auto vector_size = rows_.size();
      ...
      // 再查一遍
      auto write_iter = id_to_index_.find(key);
      // 没查到,需要扩容
      if (write_iter == id_to_index_.end()) {
        int row_num = rows_.size();
        // 新增一个 key 到 rows_ 中
        rows_.push_back(key);
        auto index = static_cast<int64_t>(rows_.size() - 1);
        id_to_index_[key] = index;
        rwlock_->UNLock();
        return index;
      } else {
      // 查到了,直接返回
        auto index = write_iter->second;
        rwlock_->UNLock();
        return index;
      }
    } else {
      // 查到了,解锁返回
      auto index = iter->second;
      rwlock_->UNLock();
      return index;
    }
    }
    

    改进:对 id_to_index 做 shard, 释放锁的逻辑

    class SelectedRows {
    public:
    SelectedRows(const std::vector<int64_t>& rows, const int64_t& height)
        : rows_(rows), height_(height) {
      value_.reset(new Tensor());
      rwlock_.reset(new RWLock);
    }
    SelectedRows() {
      height_ = 0;
      value_.reset(new Tensor());
      rwlock_.reset(new RWLock);
    }
    platform::Place place() const { return value_->place(); }
    const Tensor& value() const { return *value_; }
    Tensor* mutable_value() { return value_.get(); }
    int64_t height() const { return height_; }
    void set_height(int64_t height) { height_ = height; }
    const Vector<int64_t>& rows() const { return rows_; }
    Vector<int64_t>* mutable_rows() { return &rows_; }
    void set_rows(const Vector<int64_t>& rows) { rows_ = rows; }
    /*
     * @brief Get the index of key in rows
     *
     * @return -1 if the key does not exists.
     */
    int64_t Index(int64_t key) const {
      auto it = std::find(rows_.begin(), rows_.end(), key);
      if (it == rows_.end()) {
        return -1;
      }
      return static_cast<int64_t>(std::distance(rows_.begin(), it));
    }
    /*
     * @brief whether has the specified key in the table.
     *
     * @return true if the key is exists.
     */
    bool HasKey(int64_t key) const;
    /*
     * @brief Get value by the key list.
     * Note!!! this interface is only used when selected_rows is used as
     * parameters
     * for distribute lookup table.
     *
     * @return a list of pair which contains the non-exists key and the index in
     * the value
     */
    void Get(const framework::Tensor& ids, framework::Tensor* value,
             bool auto_grown = false, bool is_test = false);
    /*
     * @brief Get the index of the key from id_to_index_ map. If the key not
     * exist,
     * add the key into id_to_index_.
     *
     * Note!!! this interface is only used when selected_rows is used as
     * parameters
     * for distribute lookup table.
     *
     * @return index of the key.
     */
    int64_t AutoGrownIndex(int64_t key, bool auto_grown, bool is_test = false);
    /*
     * @brief Get the index of the key from id_to_index_ map.
     */
    inline int64_t GetIndexFromId(int64_t key) {
      auto iter = id_to_index_.find(key);
      if (iter == id_to_index_.end()) {
        return -1;
      } else {
        return iter->second;
      }
    }
    void SyncIndex();
    /*
     * @brief Get complete Dims before
     */
    DDim GetCompleteDims() const {
      std::vector<int64_t> dims = vectorize(value_->dims());
      dims[0] = height_;
      return make_ddim(dims);
    }
    private:
    // Notice: rows can be duplicate. We can have {0, 4, 7, 0, 5, 7, 9} here.
    // SelectedRows are simply concated when adding together. Until a
    // SelectedRows add a Tensor, will the duplicate rows be handled.
    Vector<int64_t> rows_;
    std::unordered_map<int64_t, int64_t>
        id_to_index_;  // should not be used when rows_ has duplicate member
    std::unique_ptr<Tensor> value_{nullptr};
    int64_t height_;  // height indicates the underline tensor's height
    std::unique_ptr<RWLock> rwlock_{nullptr};
    };
    

    DataShard
    每个 DataShard 保存一部分的 id_to_index,用大小固定的 map 存储 id_to_index,
    每次获取的真实的index 时:
    index = shard_id * shard_size + shard_offset
    shard_num 是作为环境变量传入的.
    shard_size=dict_size/shard_num

    // selectRows 已经建立的情况下,重新根据 id_to_index 生成 DataShards,
    // 此时 shard_size=dict_size/shard_num
    // dict_size 就是 10^9
    void ReconstructShardIndexAfterLoad(){
      int64_t shard_size = value_->dims()[0] / shard_num_;
    }
    
    inline int64_t DataShard::GetIndexById(int64_t id, bool auto_grown, bool is_test, bool fliter_index, bool need_count) {
    rwlock_->RDLock();
    auto iter = id_to_offset_.find(id);
    if (iter == id_to_offset_.end()) {
      rwlock_->UNLock();
      if (auto_grown) {
        rwlock_->WRLock();
        if (need_count) {
          id_to_count_[id] += 1;
        }
        if (fliter_index) {
          if (id_to_count_[id] < 10) {
            VLOG(3) << "fliter key " << id;
            rwlock_->UNLock();
            return -1;
          }
        }
        auto shard_offset = id_to_offset_.size();
        PADDLE_ENFORCE_LT(shard_offset, shard_size_, "shard is full!");
        id_to_offset_[id] = shard_offset;
        int64_t offset = shard_id_ * shard_size_ + shard_offset;
        rwlock_->UNLock();
        return offset;
      } else {
        return -1;
      }
    } else {
      int64_t offset = shard_id_ * shard_size_ + iter->second;
      rwlock_->UNLock();
      return offset;
    }
    }
    

    数据Reader

    Reader从维度上分为 BatchReader 和 SampleReader, 前者返回一个batch 的数据,后者返回单条样本的数据。数据的格式为 Numpy Array 或 LoDTensor。
    从网络上分为同步Feed的Reader和异步的PyReader。
    同步异步是相对于模型训练预测而言的。
    同步模式一般只推荐调试模型时用,因为速度很慢。