小🌰 : Label encoding

模型特征按照是否连续可以分为两类:连续性数值特征和离散型特征,离散型特征往往以字符串的形式存在,比如用户兴趣特征就包括体育、政治、军事和娱乐等。对于很多机器学习算法来说,字符串类型的数据是不能直接消费的,需要转换为数值才行,例如把体育、政治、军事、娱乐映射为 0、1、2、3,这个过程在机器学习领域有个术语就叫 Label encoding。

📖 1:

  1. // 函数定义 - 反例
  2. def findIndex(filePath: String, interest: String): Int = {
  3. val source = Source.fromFile(filePath, "UTF-8")
  4. val lines = source.getLines().toArray
  5. source.close()
  6. val searchMap = lines.zip(0 until lines.size).toMap
  7. searchMap.getOrElse(interest, -1)
  8. }
  9. // Dataset中调用
  10. findIndex(filePath, "体育-篮球-NBA-湖人")

📖 2:

  1. // 函数定义 - 正例1:高阶函数
  2. val findIndex: (String) => (String) => Int = {
  3. (filePath) =>
  4. val source = Source.fromFile(filePath, "UTF-8")
  5. val lines = source.getLines().toArray
  6. source.close()
  7. val searchMap = lines.zip(0 until lines.size).toMap
  8. (interest) => searchMap.getOrElse(interest, -1)
  9. }
  10. val partFunc = findIndex(filePath)
  11. partFunc("体育-篮球-NBA-湖人")
  1. 处理函数定义为高阶函数,形参是模版文件路径,返回结果是从用户兴趣到索引的函数
  2. 封装千亿样本的Dataset所调用的函数,不是第一份代码中的findIndex,而是用模版文件调用findIndex得到的partFunc,partFunc是行参为兴趣,结果为索引的普通标量函数
  3. 用户代码先在Driver端用模版文件调用这个高阶函数,完成第一步计算建立字典的过程,同时输出一个只带一个形参的标量函数,这个标量函数携带了刚刚建好的映射字典。最后,Dataset将这个标量函数作用于千亿样本之上做Label encoding。这样,函数的第一步计算只在driver端计算一次,分发给集群中所有executors的任务中封装的是携带了字典的标量函数。然后在Execturos端,executors在各自的数据分片上调用函数,省去了扫描模版文件,建立字典的开销。最后,我们只需要把样本中的用户兴趣传递进去,函数就能以O(1)的查询效率返回数值结果

User Memory性能隐患

在运行时,这个函数在 Driver 端会被封装到一个又一个的 Task 中去,随后 Driver 把这些 Task 分发到 Executor,Executor 接收到任务之后,交由线程池去执行。这个时候,每个 Task 就像是一架架小飞机,携带着代码“乘客”和数据“行李”(searchMap),从 Driver 飞往 Executor。Task 小飞机在 Executor 机场着陆之后,代码“乘客”乘坐出租车或是机场大巴,去往 JVM stack;数据“行李”则由专人堆放在 JVM Heap,也就是我们常说的堆内内存。

  1. Task分发过程中的网络开销
  2. Executor的User Memory需要寄存这份同样的数据(数据大小?用户自定义的数据结构往往是用于辅助函数完成计算任务的,所以函数执行完毕之后,它携带的数据结构的生命周期也就告一段落。因此,只需要关注executor在同一时间可以并行处理的task数量,也就是executor的线程池大小。那么User memory至少需要提供#threads * #size这么大的内存空间)

📖 3:

  1. // 广播变量
  2. val source = Source.fromFile(filePath, "UTF-8")
  3. val lines = source.getLines().toArray
  4. source.close()
  5. val searchMap = lines.zip(0 until lines.size).toMap
  6. val bcSearchMap = sparkSession.sparkContext.broadcast(searchMap)
  7. bcSearchMap.value.getOrElse("体育-蓝湖-NBA-湖人", -1)

小飞机之前需要携带函数findIndex,现在则换成了一位“匿名的乘客”:一个读取广播变量并调用其getOrElse方法的匿名函数。由于这位匿名乘客将大件行李托运给了“联邦广播快递公司”的专用货机,因此,task小飞机着陆后,没有任何行李需要寄存到User Memory

广播变量所携带的数据内容会物化到MemoryStore中去,以executor为粒度为所有task提供唯一的一份数据拷贝。MemoryStore产生的内存占用会被记入到Storage Memory上。由于广播变量分发和存储以executor为粒度,因此每个executor消耗的内存空间,就是searchMap一份数据拷贝的大小