HeSQL简介

spark的局限

spark的局限

  • 开发有门槛:需要了解广播变量,RDD算子的专业概念。
  • 无法准确识别小表(配置数据)导致大量shuffle
  • 即使识别出小表,也无唯一主键概念,关联效率低
  • 广播变量的局限:无法自动创建索引,不可重用;不支持RDD与广播变量之间的SQL

业务实现有难度

  • 复杂的广播变量、眼花缭乱的RDD算子
  • 创建各种索引,且不可重用
  • 业务复杂:需要关联多种配置数据,且不可重用

目标

  • 资源成本管控:提高机器利用率
  • 人力成本管控:提高开发效率、运营效率

    SparkSQL与HeSQL对比

    传统方式(SparkSQL)
    image.png
    HeSQL
    image.png

    HeSQL框架

    image.png

效果

资源消耗情况
image.png
执行时间
用户访问分布初步测试(40G数据+300M配置)

  • 更少资源:1/2CPU, 1/4内存, 1/100的IO
  • 更高性能:提高2-10倍
  • 更少的代码:约1/10

旧版作业运行30min
image.png
新版作业运行1.6min(所有维度估计10min)
image.png
image.png

基本概念

  • RDD:Resilient Distributed Datasets(弹性分布式数据集)
  • Dataset/Dataframe: 含schema
  • Driver/Executor/broacast

image.png

quick start

  1. package com.hdata.hesql.demo;
  2. import com.hdata.hesql.core.*;
  3. import com.hdata.hesql.core.helper.Option;
  4. import org.apache.spark.sql.SaveMode;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. /** spark sql union 与广播变量对比
  8. * 测试数据由ConvTool中构造
  9. * 测试数据目录结构:
  10. * /spark/data
  11. * +-config
  12. * +-test_user
  13. * +-test_order
  14. * +-parquet
  15. * Created by heshang on 2021/11/09.
  16. */
  17. public class HeSqlTest extends SparkBaseJob {
  18. private static final Logger LOG = LoggerFactory.getLogger(HeSqlTest.class);
  19. public static void main(String[] args) {
  20. new HeSqlTest().start(args);
  21. }
  22. @Override
  23. protected void execute() throws Exception {
  24. sparkSqlUnionById();
  25. heSqlUnionById();
  26. }
  27. private void sparkSqlUnionById() throws Exception {
  28. long startTime = System.currentTimeMillis();
  29. String minPartitions = nameParams.getOrDefault("txt.minPartitions","1");
  30. HeRDDTool.getHeRDDOfConfFromFile(spark, "test_order", "orderid,userid,productid,num,price,cdate,editor",
  31. new Option("delimiter", ",").set("skip.exception", "true"));
  32. //配置文件比较小,只需要几个分区就可以,这样方便进行广播
  33. HeRDDTool.getHeRDDOfConfFromFile(spark,"test_user", "userid,uname,utype,birth,tel,weight,editor",
  34. new Option("delimiter", ",").set("txt.minPartitions", minPartitions));
  35. String sqlStr = "select o.orderid,o.userid,o.productid,o.num,o.price,o.editor,cdate,u.uname,u.utype,u.birth,u.tel,u.weight "
  36. + "from test_order o left join test_user u on o.userid=u.userid";
  37. HeRDD orderUser = spark.sql(sqlStr, "order_user");
  38. orderUser.getDataset().write().mode(SaveMode.Overwrite).parquet("/spark/data/parquet/order_user1");
  39. LOG.info("testSparkSql usetime:{}s", (System.currentTimeMillis() - startTime) / 1000.0);
  40. }
  41. private void heSqlUnionById() throws Exception {
  42. long startTime = System.currentTimeMillis();
  43. HeRDDTool.getHeRDDOfConfFromFile(spark, "test_order", "orderid,userid,productid,num,price,cdate,editor",
  44. new Option("delimiter", ",").set("skip.exception", "true"));
  45. HeBeanTool.getHeBeanOfConfFromFile("test_user", "userid,uname,utype,birth,tel,weight,editor",
  46. new Option("delimiter", ","));
  47. String sqlStr = "select o.orderid,o.userid,o.productid,o.num,o.price,o.editor,cdate,u.uname,u.utype,u.birth,u.tel,u.weight "
  48. + "from test_order o left join test_user u on o.userid=u.userid";
  49. HeRDD orderUser = spark.sqlOfHeRDD(sqlStr, "order_user");
  50. orderUser.getDataset().write().mode(SaveMode.Overwrite).parquet("/spark/data/parquet/order_user2");
  51. LOG.info("testHeRDD2HeBean usetime:{}s", (System.currentTimeMillis() - startTime) / 1000.0);
  52. }
  53. }

在开发环境中少量数据的情况下,执行时间区别不大,主要区别是shuffle。SparkSQL占用大量shuffle,而HeSQL没有任何shuffle。
这里写spark作业,只需要继承SparkBaseJob并实现execute接口即可,其他的操作只需要写SQL语句。

SparkBaseJob

  • 封装基本配置:从app.properties中读取,主要配置项如下:

    1. # 这个一般在windows下运行时才需要设置,
    2. # 如果没有设置这个可以通过 -Dspark.driver.host=192.168.56.1 设置
    3. # 系统上线时一般需要删除这个配置
    4. spark.driver.host=192.168.56.1
    5. spark.master=spark://node01:7077
    6. spark.eventLog.enabled=true
    7. spark.eventLog.dir=hdfs://node01:9000/history
    8. spark.eventLog.compress=true
  • 初始化参数解析与spark运行环境

  • 方便本机调试

提交作业是自动加载本地打包的jar文件,是开发可以在本地运行main函数的方式进行调试,提高开发效率。

MetaManager

大数据作业经常要处理文本或接口数据,这些数据没有元数据信息,来源也不同,不方便进行SQL方式操作,所以我们要用MetaManager进行管理这些数据。目前使用简单管理方式,即编程的方式。将来可以用平台的方式,即web界面方式注册、管理元数据。
从hive/clickhouse/mysql读取的数据不需要经过MetaManager,因为他们已经有元数据。

  • view

即HeBean或HeRDD的名字,与spark中view概念类似
如果是配置文件则对应配置文件名,或目录名

  • 默认值

    DEFAULT_STR_IFNULL = "";
    DEFAULT_INT    _IFNULL = "0";
    

    对应数据不存在是默认是空(NULL),这样容易导致程序异常,或者业务处理异常,比如SQL无法关联,很多时候我们可以通过设置默认值解决。

  • 读取数据时相关option

delimiter: 分隔符集合,每个字符都是分隔符,不是作为一个整体。默认空格。
skip.exception:忽略解析异常,被忽略的值被设置为null, 即使nullable=false也会忽略该异常*

即skip.exception=true或nullable=true时,如果没有设置默认值,数据缺失时都会设置为null

skip.empty:忽略空行,包括只有空格的行。默认为true

HeRDD/HeBean

HeRDD HeBean
作用 封装数据的注册,便于SmartSQL操作 封装数据的注册,便于SmartSQL操作
是否含schema
本质 本质上是RDD,对Dataset的简单封装 本质上是Bean,标准化的Bean
使用场景 适合数据量大的数据 适合数据量小(配置文件等)的数据

HeBean与广播变量的区别

  • HeBean会被自动广播出去
  • HeBean会智能的自动创建索引,包括单主键索引与多主键索引,支持非唯一索引

HeBean主要结构:

public class HeBean implements Serializable {
    private List<Row> data;//GenericRow
    private StructType schema;
    //索引名字=建立索引的列名序号,多个列用逗号分隔
    //一次只支持一个索引,多个索引也是需要重新广播,所以多个索引没有太大意义
    private String indexName;
    Map<String, List<Row>> indexes = new HashMap<>();//非唯一索引
    Map<String, Row> uniqIndexes = new HashMap<>();  //唯一索引
}

HeRDD主要结构:

public class HeRDD implements Serializable {
    private StructType schema;
    private Dataset<Row> dataset;
    private String view;
    private boolean global = false;
}

HeSparkSession

HeSparkSession是SparkSession的子类,SparkSession的功能HeSparkSession都有。
下面介绍一下它的主要的接口。

HeRDD sql(String sqlContext, String view, boolean global)

SparkSession的封装,增加对view的注册。

HeRDD sqlOfHeRDD(String sql, String view, JoinType joinType)

HeRDD与HeBean之间的关联,这是HeSQL的核心。
目前仅支持左连接,此时HeBean会被广播出去,并自动创建索引
左连接的效率优化点:
1、自动广播
2、一对一关联时避免右表的全表扫描
3、使用数组下标进行索引而不是字段名字
4、广播变量广播之前会智能的建立需要的索引
注意:
- 主要解决sparksql没有主键导致过多shuffle问题
- 不支持HeRDD与HeBean之间的内联接,需要左联接之后再过滤空值的方式实现内联。
- 不支持三个即以上表的关联
- 字段顺序未必与sql语句中一致
- 一般大文件使用HeRDD,小文件使用HeBean
- 支持列别名

HeBean sqlOfHeBean(String sql, String view, JoinType joinType)

HeBean与HeBean之间的关联,这样避免了定义一堆的POJO对象。
HeBean之间可以进行内联接、左联接,连接时不产生shuffle,因为数据都在driver执行

核心数据结构

public class HeSparkSession extends SparkSession {
    private static Map<String, Broadcast<HeBean>> broadcastedHeBeans = new HashMap<>();
    private static Map<String, HeBean> heBeanMap = new HashMap<>();
    private static Map<String, HeRDD> heRDDMap = new HashMap<>();
    private static Option globalOption = new Option();
}
  • broadcastedHeBeans 已经广播的heBean变量
  • heBeanMap heBean索引表,主要用于查找view中的schema
  • heRDDMap heRDD索引表,主要用于查找view中的schema

使用例子参考demo下文件。

HeRDDTool/HeBeanTool

这两个工具主要封装从各种存储中读取数据的方法。
有对应的测试用例,查看用例即可。

UDF

现实中业务通常很复杂,只凭SQL很难实现。这个可以使用UDF实现,业务人员在SQL中调用udf实现业务需求。
我们也推荐使用UDF方式,因为它是可重用的,使业务代码简介,好维护。
UDF的定义加入到HeUDF.java即可,框架会进行自动注册。

窗口函数

获取本每个班级每个学科的前三名同学的姓名和成绩:

SELECT class,subject,name,grade,
    row_number() over(partition by class,subject order by grade desc) top 
    FROM v1 where top <=3

使用窗口函数会使原本复杂的SQL语句变得简单。
参考例子WindowFunction。

Parquetation

起因

  • 读取文本文件需要读取所有内容,不管是不是需要使用到
  • cache/persist无法起到效果

    数据太大,占用大量内存,产生大量GC 内存保存不了,最终交换大硬盘,产生大量IO

  • 业务逻辑需要考虑大量配置的关联

    处理复杂 效率低 同一天的数据使用同一个时刻的配置,数据不准确

parquet文件

  • 列式存储,只取需要的列,适合统计分析
  • 数据压缩,大大较少IO与内存

    框架设计

  • 统一将源数据与配置关联之后保存为parquet文件

    目标是下游的业务主要处理一到两个parquet文件即可

  • 将parquet处理标准化,包括输入参数、输出文件命名方式、保存路径

image.png
优点:

  • 业务开发与底层开发分开,业务人员只需要写SQL就可以,专人负责parquetation的开发。
  • 提高开发效率,只需要HeSQL进行开发,甚至可以不需要开发,提供界面进行处理即可。
  • 提高运营效率:减少了大量重复作业,以及相关的运维。
  • 提高资源利用率:parquet的列式存储与数据压缩大大提高性能。

本地调试

为了提高开发效率和调试程序找bug,我们需要在本地搭建开发调试文件,即可以在本地运行main函数方式进行调试,而不是仅通过打印日志实现。

环境安装配置

下面是搭建本地调试环境的关键的地方:

  • hadoop集群

推荐使用vagrant搭建集群,具体参考前面部分。

  • spark集群

集群搭建参考前面部分。要注意spark.driver.host的配置,这是为了executor与driver能够通信。
为了让executor能够找到工程中的包,需要将本地包设置到环境中,下面这段代码可以参考:

String appFilePath = SparkBaseJob.class.getClassLoader().getResource("app.properties").getFile();
// 判断是不是jar包spark_submit模式还是在IDE中直接运行的调试模式
if(!appFilePath.contains(".jar!")){
    // 非spark_submit模式,为了能够进行调试,需要将jar包setJars
    String userDir = System.getProperty("user.dir");
    // 包名要根据实际情况修改
    sparkConf.setJars(new String[]{userDir+ File.separator+"target"+File.separator+"hesql-1.0-all.jar"});
}

注意需要先mvn package ,再运行main函数。

排除重复包

直接对工程打包会导致jar包很大,有的甚至打包几百M,而很多依赖的jar包实际上是没有必要放到里面的,因为他们集群环境中已经有了,我们应该如何排除呢?
假如你的spark安装在 /opt/hdata/spark ,参考下面的代码进行jar包排除:

#!/bin/bash

cd /opt/hdata/spark
for f in `ls jars`;do
    # 去掉版本号
    jar=${f%-*}
    # depend.list 通过 mvn dependency:tree 获取
    cat depend.list|grep ":${jar}$"|awk '{print "<exclude>"$0"</exclude>"}'
done

将经过处理之后的输出放到maven-assembly-plugin插件的配置文件中:

<?xml version="1.0" encoding="UTF-8"?>
<assembly>
    <id>all</id>
    <includeBaseDirectory>false</includeBaseDirectory>
    <formats>
        <format>jar</format>
    </formats>
    <fileSets>
        <fileSet>
            <directory>${project.build.directory}/classes</directory>
            <outputDirectory>/</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <excludes>
                <!-要排除的包放到这里-->
            </excludes>
            <unpack>true</unpack>
            <scope>runtime</scope>
            <useProjectArtifact>false</useProjectArtifact>
            <useTransitiveDependencies>false</useTransitiveDependencies>
        </dependencySet>
    </dependencySets>
</assembly>

经过处理之后,最后的jar包可能只有几M,这样会大大提高开发和运行效率。