总览

image.png

1. Presto简介

1. 基本概念

Presto是一个开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。Presto的设计和编写完全是为了解决像Facebook这样规模的商业数据仓库的交互式分析和处理速度的问题。
注意:虽然Presto可以解析SQL,但它不是一个标准的数据库。不是MySQL、Oracle的代替品,也不能用来处理在线事务(OLTP)。

2. 应用场景

Presto支持在线数据查询,包括Hive,关系数据库(MySQL、Oracle)以及专有数据存储。一条Presto查询可以将多个数据源的数据进行合并,可以跨越整个组织进行分析。Presto主要用来处理响应时间小于**1秒到几分钟的场景**。

3. 架构

Presto是一个运行在多台服务器上的分布式系统。完整安装包括一个Coordinator和多个Worker。由客户端提交查询,从Presto命令行CLI提交到Coordinator。Coordinator进行解析,分析并执行查询计划,然后分发处理队列到Worker。Presto有两类服务器:Coordinator和Worker。
Presto基础入门 - 图2

1. Coordinator

Coordinator服务器是用来解析语句,执行计划分析和管理Presto的Worker结点。Presto安装必须有一个Coordinator和多个Worker。如果用于开发环境和测试,则一个Presto实例可以同时担任这两个角色。
Coordinator跟踪每个Work的活动情况并协调查询语句的执行。Coordinator为每个查询建立模型,模型包含多个Stage,每个Stage再转为Task分发到不同的Worker上执行。
Coordinator与Worker、Client通信是通过REST API。

2. Worker

Worker是负责执行任务和处理数据。Worker从Connector获取数据。Worker之间会交换中间数据。Coordinator是负责从Worker获取结果并返回最终结果给Client。
当Worker启动时,会广播自己去发现 Coordinator,并告知 Coordinator它是可用,随时可以接受Task。
Worker与Coordinator、Worker通信是通过REST API。

3. 数据源

  • Connector

Connector是适配器,用于Presto和数据源(如Hive、RDBMS)的连接。可以认为类似JDBC那样,但却是Presto的SPI的实现,使用标准的API来与不同的数据源交互。
Presto有几个内建Connector:JMX的Connector、System Connector(用于访问内建的System Table)、Hive的Connector、TPCH(用于TPC-H基准数据)。还有很多第三方的Connector,所以Presto可以访问不同数据源的数据。
每个Catalog都有一个特定的Connector。如果你使用Catalog配置文件,会发现每个文件都必须包含connector.name属性,用于指定Catalog管理器(创建特定的Connector使用)。一个或多个Catalog用同样的Connector是访问同样的数据库。例如,有两个Hive集群。可以在一个Presto集群上配置两个Catalog,两个Catalog都是用Hive Connector,从而达到可以查询两个Hive集群。

  • Catalog

一个Catalog包含Schema和Connector。例如,配置JMX的Catalog,通过JXM Connector访问JXM信息。当执行一条SQL语句时,可以同时运行在多个Catalog。
Presto处理Table时,是通过表的完全限定(fully-qualified)名来找到Catalog。例如,一个表的权限定名是hive.test_data.test,则“test”是表名,“test_data”是Schema,“hive”是Catalog。
Catalog的定义文件是在Presto的配置目录中。

  • Schema

Schema是用于组织Table。把Catalog和Schema结合在一起来包含一组的表。当通过Presto访问Hive或MySQL时,一个Schema会同时转为Hive和MySQL的同等概念。

  • Table

Table跟关系型的表定义一样,但数据和表的映射是交给Connector。

4. 数据模型

1. Presto三层表结构

  • Catalog:对应某一类数据源,例如Hive的数据,或MySql的数据。
  • Schema:对应MySql中的数据库。
  • Table:对应MySql中的表。

image.png

2. Presto存储单元

  • Page:多行数据的集合,包含多个列的数据,内部仅提供逻辑行,实际以列式存储。
  • Block:一列数据,根据不同类型的数据,通常采取不同的编码方式,了解这些编码方式,有助于自己的存储系统对接presto。

    3. Block

  • Array类型Block

应用于固定宽度的类型,例如int,long,double。Block由两部分组成:

  • boolean valueIsNull[]:表示每一行是否有值。
  • T values[]:每一行的具体值。
    • 可变宽度的Block

应用于String类数据,由三部分信息组成:

  • Slice:所有行的数据拼接起来的字符串。
  • int offsets[]:每一行数据的起始便宜位置。每一行的长度等于下一行的起始便宜减去当前行的起始便宜。
  • boolean valueIsNull[]:表示某一行是否有值。如果有某一行无值,那么这一行的便宜量等于上一行的偏移量。
    • 固定宽度的String类型的Block

所有行的数据拼接成一长串Slice,每一行的长度固定。

  • 字典Block

对于某些列,distinct值较少,适合使用字典保存。主要有两部分组成:

  • 字典:可以是任意一种类型的block(甚至可以嵌套一个字典block),block中的每一行按照顺序排序编号。
  • int ids[]:表示每一行数据对应的value在字典中的编号。在查找时,首先找到某一行的id,然后到字典中获取真实的值。

    5. 执行查询过程

  1. 提交查询过程。

用户使用Cli提交一个查询语句后,使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。
image.png

  1. SQL编译过程。

Presto与Hive一样,使用Antlr编写SQL语法,语法规则定义在Statement.g和StatementBuilder.g两个文件中。 如下图中所示从SQL编译为最终的物理执行计划大概分为5部,最终生成在每个Worker节点上运行的LocalExecutionPlan,这里不详细介绍SQL解析为逻辑执行计划的过程,通过一个SQL语句来理解查询计划生成之后的计算过程。
Presto基础入门 - 图5
示例:

  1. select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10;

逻辑执行计划:
Presto基础入门 - 图6

  1. 物理执行计划。

image.png

6. Presto性能比较

image.png

1. Presto、MapReduce性能比较

Presto中SQL运行过程:MapReduce vs Presto。
Presto基础入门 - 图9

  • 优点
  1. Presto与Hive对比,都能够处理PB级别的海量数据分析,但Presto是基于内存运算,减少没必要的硬盘IO,所以更快。
  2. 能够连接多个数据源,跨数据源连表查,如从Hive查询大量网站访问记录,然后从Mysql中匹配出设备信息。
  3. 部署也比Hive简单,因为Hive是基于HDFS的,需要先部署HDFS。

image.png

  • 缺点
  1. 虽然能够处理PB级别的海量数据分析,但不是代表Presto把PB级别都放在内存中计算的。而是根据场景,如count,avg等聚合运算,是边读数据边计算,再清内存,再读数据再计算,这种耗的内存并不高。但是连表查,就可能产生大量的临时数据,因此速度会变慢,反而Hive此时会更擅长。
  2. 为了达到实时查询,可能会想到用它直连MySql来操作查询,这效率并不会提升,瓶颈依然在MySql,此时还引入网络瓶颈,所以会比原本直接操作数据库要慢。

    2. Presto、Impala性能比较

    参考:https://blog.csdn.net/u012551524/article/details/79124532

  3. 与hive实时共享元数据,impala需要用另外定时任务广播元数据,新生成的数据,用impala不能立即查询。

  4. 没有出现操作大数据集有时挂掉的情况。
  5. presto与hive都由fackbook开源,兼容性应该会更好点。

测试过程比较简单,分为四个场景sql查询:

查询 查询语句 数据量(压缩前,snappy)
query1 select sum(pv) from d_op_behavior_host_text_snappy 35G
query2 select siteid,sum(pv) as pv1 from d_op_behavior_host_text_snappy
where pv>0 group by siteid order by pv1 desc limit 11;
35G
query3 select count(*) from dwd.d_ad_3rd_party_fancy_all_data where
thisdate=’2015-11-10’ and hour=’17’;
200G
query4 select count(*) from dwd.d_ad_impression where thisdate>=’2015-09-01’ and thisdate<=’2015-10-31’ 200G

测试结果对比如下:

查询 工具 第一次执行时间 第二次执行时间
query1 impala 4.82s 5.56s
presto 6s 5s
query2 impala 12.79s 12s
presto 15s 13s
query3 impala 挂掉 挂掉
presto 63s 58s
query4 impala 131s 148s
presto 136s 128s

2. Presto安装部署

1. 环境需求

Presto的基本需求

  • Linux or Mac OS X
  • Java 8, 64-bit
  • Python 2.4+

    2. 连接器

    Presto支持插接式连接器提供的数据。各连接器的设计需求会有所不同。Presto支持从以下版本的Hadoop中读取Hive数据:

  • Apache Hadoop 1.x

  • Apache Hadoop 2.x
  • Cloudera CDH 4
  • Cloudera CDH 5

此外,需要有远程的Hive元数据,不支持本地或嵌入模式。 Presto不使用MapReduce,只需要HDFS。

3. 文件类型

Presto支持以下文件类型:

  • Text
  • SequenceFile
  • RCFile
  • ORC

    4. 安装Presto

    参考:《基于CentOS7安装Presto-0.219

    3. SQL语法差异

    Presto、Hive、Spark的SQL语法差异:

  • Presto使用ANSI SQL语法和语义。

  • Hive使用类似SQL的语言,称为HiveQL,它在MySQL(与ANSI SQL有很多不同)之后进行了松散的建模。
  • Spark SQL支持HiveQL语法。 | 比较项 | Hive & Spark | Presto | | —- | —- | —- | | 字段类型 | String | Varchar | | 列名 | date等关键字不能作为列名、别名,不支持中文别名 | 支持中文别名 | | 日期格式 | yyyy-MM-dd HH:mm:ss | %Y-%m-%d %H:%i:%S | | 日期函数(from_unixtime) | 只支持from_unixtime(unix_time, format) | 支持from_unixtime(unixtime)、from_unixtime(unixtime, string)、from_unixtime(unixtime, hours, minutes) | | 日期函数(date_add) | date_add(start_date, num_days) | date_add(unit, value, timestamp) | | JSON处理 | select get_json_object(json, ‘$.book’); | select json_extract_scalar(json, ‘$.book’);
    注意:该函数返回值是一个string类型,其还有一个函数json_extract是直接返回一个json串,所以使用的时候要明确取的是什么类型的值 | | 从Map中根据Key查找Value | get_json_object(to_json(extra),’$.is_baomai’)
    SparkSQL从2.4.0版本开始提供了element_at(map,key)函数,上述方式,先转成json,再用get_json_object(json, ‘$.book’)获取value | element_at(map,key) | | 列转行 | select student, score from tests lateral view explode(split(scores, ‘,’)) t as score; | select student, score from tests cross json unnest(split(scores, ‘,’) as t (score); | | 复杂分组 | select origin_state, origin_zip, sum(package_weight) from shipping group by origin_state,origin_zip with rollup; | select origin_state, origin_zip, sum(package_weight) from shipping group by rollup (origin_state, origin_zip);
    — Presto语法
    select origin_state, origin_zip, sum(package_weight) from shipping group by grouping sets ((origin_state, origin_zip), (origin_state), ()); | | group by和order by | group by和order by不支持结果列顺序号,必须写列名或表达式。
    示例:GROUP BY substr(dt,1,7) order by substr(dt,1,7); | 同时支持列名、表达式和结果列顺序号。
    示例:SELECT count(), nationkey FROM customer GROUP BY 2;
    SELECT count(
    ), nationkey FROM customer GROUP BY nationkey; |

4. 透明混算

Presto是FaceBook开源的支持多数据源混合查询的计算引擎,它是基于内存的查询,查询速度快,支持多数据源在一条SQL语句中混合查询。

■ Hive & MySQL

实现见参考章节相关连接。

■ GreenPlum & MySQL

GreenPlum是基于PostgreSQL关系型数据库的,MPP架构的分布式数据库。实现见参考章节相关连接。

■ Elasticsearch & MySQL

实现见参考章节相关连接。

■ SQLServer & MongoDB & MySQL

实现见参考章节相关连接。

5. Presto优化

01. 数据存储优化

  1. 合理设置分区。

与Hive类似,Presto会根据元信息读取分区数据,合理的分区能减少Presto数据读取量,提升查询性能。

  1. 使用列式存储。

Presto对ORC文件读取做了特定优化,因此在Hive中创建Presto使用的表时,建议采用ORC格式存储。相对于Parquet,Presto对ORC支持更好。

  1. 使用压缩。

数据压缩可以减少节点间数据传输对IO带宽压力,对于即席查询需要快速解压,建议采用Snappy压缩。

  1. 预先排序。

对于已经排序的数据,在查询的数据过滤阶段,ORC格式支持跳过读取不必要的数据。比如对于经常需要过滤的字段可以预先排序。

  1. INSERT INTO table nation_orc partition(p) SELECT * FROM nation SORT BY n_name;

如果需要过滤n_name字段,则性能将提升。

  1. SELECT count(*) FROM nation_orc WHERE n_name=’AUSTRALIA’;

02. 查询SQL优化

  1. 只选择使用必要的字段。

由于采用列式存储,选择需要的字段可加快字段的读取、减少数据量。避免采用*读取所有字段。

  1. [GOOD]: SELECT time,user,host FROM tbl
  2. [BAD]: SELECT * FROM tbl
  1. 过滤条件必须加上分区字段。

对于有分区的表,where语句中优先使用分区字段进行过滤。acct_day是分区字段,visit_time是具体访问时间。

  1. [GOOD]: SELECT time,user,host FROM tbl where acct_day=20171101
  2. [BAD]: SELECT * FROM tbl where visit_time=20171101
  1. Group By语句优化。

合理安排Group by语句中字段顺序对性能有一定提升。将Group By语句中字段按照每个字段distinct数据多少进行降序排列。

  1. [GOOD]: SELECT GROUP BY uid, gender
  2. [BAD]: SELECT GROUP BY gender, uid
  1. Order by时使用Limit。

Order by需要扫描数据到单个worker节点进行排序,导致单个worker需要大量内存。如果是查询Top N或者Bottom N,使用limit可减少排序计算和内存压力。

  1. [GOOD]: SELECT * FROM tbl ORDER BY time LIMIT 100
  2. [BAD]: SELECT * FROM tbl ORDER BY time
  1. 使用近似聚合函数。

Presto有一些近似聚合函数,对于允许有少量误差的查询场景,使用这些函数对查询性能有大幅提升。比如使用approx_distinct() 函数比Count(distinct x)有大概2.3%的误差。

  1. SELECT approx_distinct(user_id) FROM access
  1. 用regexp_like代替多个like语句。

Presto查询优化器没有对多个like语句进行优化,使用regexp_like对性能有较大提升。

  1. -------------------------------[GOOD]-------------------------------
  2. SELECT
  3. ...
  4. FROM
  5. access
  6. WHERE
  7. regexp_like(method, 'GET|POST|PUT|DELETE')
  8. -------------------------------[BAD]-------------------------------
  9. SELECT
  10. ...
  11. FROM
  12. access
  13. WHERE
  14. method LIKE '%GET%' OR
  15. method LIKE '%POST%' OR
  16. method LIKE '%PUT%' OR
  17. method LIKE '%DELETE%'
  1. 使用Join语句时将大表放在左边。

Presto中join的默认算法是broadcast join,即将join左边的表分割到多个worker,然后将join右边的表数据整个复制一份发送到每个worker进行计算。如果右边的表数据量太大,则可能会报内存溢出错误。

  1. [GOOD] SELECT ... FROM large_table l join small_table s on l.id = s.id
  2. [BAD] SELECT ... FROM small_table s join large_table l on l.id = s.id
  1. 使用Rank函数代替row_number函数来获取Top N。

在进行一些分组排序场景时,使用rank函数性能更好。

  1. -------------------------------[GOOD]-------------------------------
  2. SELECT checksum(rnk)
  3. FROM (
  4. SELECT rank() OVER (PARTITION BY l_orderkey, l_partkey ORDER BY l_shipdate DESC) AS rnk
  5. FROM lineitem
  6. ) t
  7. WHERE rnk = 1
  8. -------------------------------[BAD]-------------------------------
  9. SELECT checksum(rnk)
  10. FROM (
  11. SELECT row_number() OVER (PARTITION BY l_orderkey, l_partkey ORDER BY l_shipdate DESC) AS rnk
  12. FROM lineitem
  13. ) t
  14. WHERE rnk = 1

03. 无缝替换Hive表

如果之前的Hive表没有用到ORC和snappy,那么怎么无缝替换而不影响线上的应用?
示例:如下一个Hive表:

  1. CREATE TABLE bdc_dm.res_category(
  2. channel_id1 int comment '1级渠道id',
  3. province string COMMENT '省',
  4. city string comment '市',
  5. uv int comment 'uv'
  6. )
  7. comment 'example'
  8. partitioned by (landing_date int COMMENT '日期:yyyymmdd')
  9. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  10. COLLECTION ITEMS TERMINATED BY ','
  11. MAP KEYS TERMINATED BY ':'
  12. LINES TERMINATED BY '\n';

建立对应的orc表:

  1. CREATE TABLE bdc_dm.res_category_orc(
  2. channel_id1 int comment '1级渠道id',
  3. province string COMMENT '省',
  4. city string comment '市',
  5. uv int comment 'uv'
  6. )
  7. comment 'example'
  8. partitioned by (landing_date int COMMENT '日期:yyyymmdd')
  9. row format delimited fields terminated by '\t'
  10. stored as orc
  11. TBLPROPERTIES ("orc.compress"="SNAPPY");

先将数据灌入orc表,然后更换表名:

  1. -- 导入ORC
  2. insert overwrite table bdc_dm.res_category_orc partition(landing_date)
  3. select * from bdc_dm.res_category where landing_date >= 20171001;
  4. ALTER TABLE bdc_dm.res_category RENAME TO bdc_dm.res_category_tmp;
  5. ALTER TABLE bdc_dm.res_category_orc RENAME TO bdc_dm.res_category;

其中rescategory_tmp是一个备份表,若线上运行一段时间后没有出现问题,则可以删除该表。
注意:ORC和Parquet都支持列式存储,但是ORC对Presto支持更好(Parquet对Impala支持更好),对于列式存储而言,存储文件为二进制的,对于经常增删字段的表,建议不要使用列式存储(修改文件元数据代价大)。对比数据仓库,dwd层建议不要使用ORC,而dm层则建议使用。_

04. 数据统计优化

很多的时候,在Presto上对数据库跨库查询,例如Mysql数据库。这个时候Presto的做法是从MySQL数据库端拉取最基本的数据,然后再去做进一步的处理,例如统计等聚合操作。
示例:

  1. SELECT count(id) FROM table_1 WHERE condition=1;

上面的SQL语句会分为3个步骤进行:

  1. Presto发起到Mysql数据库进行查询。

    1. SELECT id FROM table_1 WHERE condition=1;
  2. 对结果进行count计算。

  3. 返回结果。

所以说,对于Presto来说,其跨库查询的瓶颈是在数据拉取这个步骤。若要提高数据统计的速度,可考虑把Mysql中相关的数据表定期转移到HDFS中,并转存为高效的列式存储格式ORC。
所以定时归档是一个很好的选择,这里还要注意,在归档的时候我们要选择一个归档字段,如果是按日归档,我们可以用日期作为这个字段的值,采用“yyyyMMdd”的形式,例如:“20180123”。
一般创建归档数据库的SQL语句如下:

  1. CREATE TABLE IF NOT EXISTS table_1 (
  2. id INTEGER,
  3. ........
  4. partition_date INTEGER
  5. )WITH ( format = 'ORC', partitioned_by = ARRAY['partition_date'] );

查看创建的库结构:

  1. SHOW CREATE TABLE table_1; /*Only Presto*/

带有分区的表创建完成之后,每天只要更新分区字段partition_date就可以了,聪明的Presto就能将数据放置到规划好的分区了。如果要查看一个数据表的分区字段是什么,可以下面的语句:

  1. SHOW PARTITIONS FROM table_1 /*Only Presto*/

05. 分区字段过滤优化

如果数据被规当到HDFS中,并带有分区字段。在每次查询归档表的时候,要带上分区字段作为过滤条件,这样可以加快查询速度。因为有了分区字段作为查询条件,就能帮助Presto避免全区扫描,减少Presto需要扫描的HDFS的文件数。

06. WITH语句优化

使用Presto分析统计数据时,可考虑把多次查询合并为一次查询,用Presto提供的子查询完成。这点和我们熟知的MySQL的使用不是很一样。
示例:

  1. WITH subquery_1 AS (
  2. SELECT a1, a2, a3
  3. FROM Table_1
  4. WHERE a3 between 20180101 and 20180131
  5. ), /*子查询subquery_1,注意:多个子查询需要用逗号分隔*/
  6. subquery_2 AS (
  7. SELECT b1, b2, b3
  8. FROM Table_2
  9. WHERE b3 between 20180101 and 20180131
  10. ) /*最后一个子查询后不要带逗号,不然会报错。*/
  11. SELECT
  12. subquery_1.a1, subquery_1.a2,
  13. subquery_2.b1, subquery_2.b2
  14. FROM subquery_1
  15. JOIN subquery_2
  16. ON subquery_1.a3 = subquery_2.b3;

07. 子查询优化

利用子查询,减少读表的次数,尤其是大数据量的表。具体做法是,将使用频繁的表作为一个子查询抽离出来,避免多次read。

08. 查询字段优化

一定要避免在查询中使用“SELECT *”这样的语句,尽量只查询需要的字段,换位思考,如果让你去查询数据是不是告诉你的越具体,工作效率越高呢。对于我们的数据库而言也是这样,任务越明确,工作效率越高。对于要查询全部字段的需求也是这样,没有偷懒的捷径,把它们都写出来。

09. join查询优化

join左边尽量放小数据量的表,而且最好是重复关联键少的表。

10. 注意事项

  • 字段名引用

Presto中的字段名引用使用双引号分割,这个要区别于MySQL的反引号“`”。当然,可以不加这个双引号。

  • 时间函数

timestamp进行比较的时候,需要添加timestamp关键字,而MySQL中对timestamp可以直接进行比较。

  1. /*MySQL的写法 */
  2. SELECT t FROM a WHERE t > '2017-01-01 00:00:00';
  3. /*Presto中的写法*/
  4. SELECT t FROM a WHERE t > timestamp '2017-01-01 00:00:00';
  • MD5函数的使用

Presto中MD5函数传入的是binary类型,返回的也是binary类型,要对字符串进行MD5操作时,需要转换。

  1. SELECT to_hex(md5(to_utf8('1212')));
  • 覆写

Presto中不支持“insert overwrite”语法,只能先“delete”,然后“insert into”。

  • 压缩

Presto中对ORC文件格式进行了针对性优化,但在impala中目前不支持ORC格式的表,Hive中支持ORC格式的表,所以想用列式存储的时候可以优先考虑ORC格式。Presto目前支持parquet格式,支持查询,但不支持insert。

参考

SegmentFault(思否):Presto上使用SQL遇到的坑
https://segmentfault.com/a/1190000013120454?utm_source=tag-newest
CSDN:SpringBoot整合Presto实现多数据源操作数据(Hive & MySQL)
https://blog.csdn.net/qq_37933018/article/details/104663400
CSDN:Presto实现多数据源混合查询(Greenplum & MySQL)
https://blog.csdn.net/Sunhighlight/article/details/89043885
程序员大本营:搭建Presto集群进行跨多类型数据库联表查询(SQLServer & MySQL & MongoDB & Redis)
https://www.pianshen.com/article/60041233028
博客园:Presto实现多数据源混合查询(Elasticsearch & MySQL)
https://www.cnblogs.com/yzlsthl/p/11805102.html