如果使用Spark SQL,可以用registerJavaUDAF,对分组的数据,调用Java进行运算
如果是DataFrame,可以用pandas的UDF,应用到每组数据上:
对DataFrame进行group,得到GroupedData对象,也有很多方法可以用,比如agg/apply都支持pandas_udf,注意可能需要full shuffle,而且单个group需要在同一个节点加载进入内存
对DataFrame指定某些列,用cube方法,也可以变成GroupedData
cube生成的结果除了包括groupby的内容之外,也包含整个集合,比如一个维度就是null,相当于*,全部内容,做count等agg操作的时候对全体做,两个维度就有[null,null],还有[具体数值,null],对这些分类也做相应的汇总。
而groupby就没有自动生成这样“全选”的数据行。
对于group可能只能使用pandas进行处理,主要分两类
1 GROUPED_AGG,输入是一列,每组只能生成一个数字,调用是agg
输出类似于withColumn把结果贴到现有df右边,
2 GROUPED_MAP,输入是DataFrame,每组生成一个DataFrame,调用是apply
输出替代输入的df,最终多个df竖着合并起来,
可参考如何对GroupedData使用pandasUDF
参考DataBricks的PandasUDF入门
可以按时间窗口进行Group,参考sql.functions.window
pandas与spark的交互使用Arrow,参考详情
因此使用spark中相应功能之前,需要安装PyArrow,conda install pyarrow即可
spark使用的arrow版本小于0.15,arrow在0.15版本升级了协议,需要强制指定spark使用旧的arrow协议,或者安装<0.15版本的pyarrow
参考文档
参考Jira报错
参考SO的方法
arrow使跨语言的调用变的更快,不再需要多余转换(搜索:spark pyarrow efficiency)
视频介绍(Spark AI Summit 2020)
文字介绍(3秒变成57毫秒)