总览
1. Presto简介
1. 基本概念
Presto是一个开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。Presto的设计和编写完全是为了解决像Facebook这样规模的商业数据仓库的交互式分析和处理速度的问题。
注意:虽然Presto可以解析SQL,但它不是一个标准的数据库。不是MySQL、Oracle的代替品,也不能用来处理在线事务(OLTP)。
2. 应用场景
Presto支持在线数据查询,包括Hive,关系数据库(MySQL、Oracle)以及专有数据存储。一条Presto查询可以将多个数据源的数据进行合并,可以跨越整个组织进行分析。Presto主要用来处理响应时间小于**1秒到几分钟的场景**。
3. 架构
Presto是一个运行在多台服务器上的分布式系统。完整安装包括一个Coordinator和多个Worker。由客户端提交查询,从Presto命令行CLI提交到Coordinator。Coordinator进行解析,分析并执行查询计划,然后分发处理队列到Worker。Presto有两类服务器:Coordinator和Worker。
1. Coordinator
Coordinator服务器是用来解析语句,执行计划分析和管理Presto的Worker结点。Presto安装必须有一个Coordinator和多个Worker。如果用于开发环境和测试,则一个Presto实例可以同时担任这两个角色。
Coordinator跟踪每个Work的活动情况并协调查询语句的执行。Coordinator为每个查询建立模型,模型包含多个Stage,每个Stage再转为Task分发到不同的Worker上执行。
Coordinator与Worker、Client通信是通过REST API。
2. Worker
Worker是负责执行任务和处理数据。Worker从Connector获取数据。Worker之间会交换中间数据。Coordinator是负责从Worker获取结果并返回最终结果给Client。
当Worker启动时,会广播自己去发现 Coordinator,并告知 Coordinator它是可用,随时可以接受Task。
Worker与Coordinator、Worker通信是通过REST API。
3. 数据源
- Connector
Connector是适配器,用于Presto和数据源(如Hive、RDBMS)的连接。可以认为类似JDBC那样,但却是Presto的SPI的实现,使用标准的API来与不同的数据源交互。
Presto有几个内建Connector:JMX的Connector、System Connector(用于访问内建的System Table)、Hive的Connector、TPCH(用于TPC-H基准数据)。还有很多第三方的Connector,所以Presto可以访问不同数据源的数据。
每个Catalog都有一个特定的Connector。如果你使用Catalog配置文件,会发现每个文件都必须包含connector.name属性,用于指定Catalog管理器(创建特定的Connector使用)。一个或多个Catalog用同样的Connector是访问同样的数据库。例如,有两个Hive集群。可以在一个Presto集群上配置两个Catalog,两个Catalog都是用Hive Connector,从而达到可以查询两个Hive集群。
- Catalog
一个Catalog包含Schema和Connector。例如,配置JMX的Catalog,通过JXM Connector访问JXM信息。当执行一条SQL语句时,可以同时运行在多个Catalog。
Presto处理Table时,是通过表的完全限定(fully-qualified)名来找到Catalog。例如,一个表的权限定名是hive.test_data.test,则“test”是表名,“test_data”是Schema,“hive”是Catalog。
Catalog的定义文件是在Presto的配置目录中。
- Schema
Schema是用于组织Table。把Catalog和Schema结合在一起来包含一组的表。当通过Presto访问Hive或MySQL时,一个Schema会同时转为Hive和MySQL的同等概念。
- Table
Table跟关系型的表定义一样,但数据和表的映射是交给Connector。
4. 数据模型
1. Presto三层表结构
- Catalog:对应某一类数据源,例如Hive的数据,或MySql的数据。
- Schema:对应MySql中的数据库。
- Table:对应MySql中的表。
2. Presto存储单元
- Page:多行数据的集合,包含多个列的数据,内部仅提供逻辑行,实际以列式存储。
Block:一列数据,根据不同类型的数据,通常采取不同的编码方式,了解这些编码方式,有助于自己的存储系统对接presto。
3. Block
Array类型Block
应用于固定宽度的类型,例如int,long,double。Block由两部分组成:
- boolean valueIsNull[]:表示每一行是否有值。
- T values[]:每一行的具体值。
- 可变宽度的Block
应用于String类数据,由三部分信息组成:
- Slice:所有行的数据拼接起来的字符串。
- int offsets[]:每一行数据的起始便宜位置。每一行的长度等于下一行的起始便宜减去当前行的起始便宜。
- boolean valueIsNull[]:表示某一行是否有值。如果有某一行无值,那么这一行的便宜量等于上一行的偏移量。
- 固定宽度的String类型的Block
所有行的数据拼接成一长串Slice,每一行的长度固定。
- 字典Block
对于某些列,distinct值较少,适合使用字典保存。主要有两部分组成:
- 字典:可以是任意一种类型的block(甚至可以嵌套一个字典block),block中的每一行按照顺序排序编号。
- int ids[]:表示每一行数据对应的value在字典中的编号。在查找时,首先找到某一行的id,然后到字典中获取真实的值。
5. 执行查询过程
- 提交查询过程。
用户使用Cli提交一个查询语句后,使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。
- SQL编译过程。
Presto与Hive一样,使用Antlr编写SQL语法,语法规则定义在Statement.g和StatementBuilder.g两个文件中。 如下图中所示从SQL编译为最终的物理执行计划大概分为5部,最终生成在每个Worker节点上运行的LocalExecutionPlan,这里不详细介绍SQL解析为逻辑执行计划的过程,通过一个SQL语句来理解查询计划生成之后的计算过程。
示例:
select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10;
逻辑执行计划:
- 物理执行计划。
6. Presto性能比较
1. Presto、MapReduce性能比较
Presto中SQL运行过程:MapReduce vs Presto。
- 优点
- Presto与Hive对比,都能够处理PB级别的海量数据分析,但Presto是基于内存运算,减少没必要的硬盘IO,所以更快。
- 能够连接多个数据源,跨数据源连表查,如从Hive查询大量网站访问记录,然后从Mysql中匹配出设备信息。
- 部署也比Hive简单,因为Hive是基于HDFS的,需要先部署HDFS。
- 缺点
- 虽然能够处理PB级别的海量数据分析,但不是代表Presto把PB级别都放在内存中计算的。而是根据场景,如count,avg等聚合运算,是边读数据边计算,再清内存,再读数据再计算,这种耗的内存并不高。但是连表查,就可能产生大量的临时数据,因此速度会变慢,反而Hive此时会更擅长。
为了达到实时查询,可能会想到用它直连MySql来操作查询,这效率并不会提升,瓶颈依然在MySql,此时还引入网络瓶颈,所以会比原本直接操作数据库要慢。
2. Presto、Impala性能比较
参考:https://blog.csdn.net/u012551524/article/details/79124532
与hive实时共享元数据,impala需要用另外定时任务广播元数据,新生成的数据,用impala不能立即查询。
- 没有出现操作大数据集有时挂掉的情况。
- presto与hive都由fackbook开源,兼容性应该会更好点。
测试过程比较简单,分为四个场景sql查询:
查询 | 查询语句 | 数据量(压缩前,snappy) |
---|---|---|
query1 | select sum(pv) from d_op_behavior_host_text_snappy | 35G |
query2 | select siteid,sum(pv) as pv1 from d_op_behavior_host_text_snappy where pv>0 group by siteid order by pv1 desc limit 11; |
35G |
query3 | select count(*) from dwd.d_ad_3rd_party_fancy_all_data where thisdate=’2015-11-10’ and hour=’17’; |
200G |
query4 | select count(*) from dwd.d_ad_impression where thisdate>=’2015-09-01’ and thisdate<=’2015-10-31’ | 200G |
测试结果对比如下:
查询 | 工具 | 第一次执行时间 | 第二次执行时间 |
---|---|---|---|
query1 | impala | 4.82s | 5.56s |
presto | 6s | 5s | |
query2 | impala | 12.79s | 12s |
presto | 15s | 13s | |
query3 | impala | 挂掉 | 挂掉 |
presto | 63s | 58s | |
query4 | impala | 131s | 148s |
presto | 136s | 128s |
2. Presto安装部署
1. 环境需求
Presto的基本需求
- Linux or Mac OS X
- Java 8, 64-bit
-
2. 连接器
Presto支持插接式连接器提供的数据。各连接器的设计需求会有所不同。Presto支持从以下版本的Hadoop中读取Hive数据:
Apache Hadoop 1.x
- Apache Hadoop 2.x
- Cloudera CDH 4
- Cloudera CDH 5
此外,需要有远程的Hive元数据,不支持本地或嵌入模式。 Presto不使用MapReduce,只需要HDFS。
3. 文件类型
Presto支持以下文件类型:
- Text
- SequenceFile
- RCFile
-
4. 安装Presto
3. SQL语法差异
Presto、Hive、Spark的SQL语法差异:
Presto使用ANSI SQL语法和语义。
- Hive使用类似SQL的语言,称为HiveQL,它在MySQL(与ANSI SQL有很多不同)之后进行了松散的建模。
- Spark SQL支持HiveQL语法。
| 比较项 | Hive & Spark | Presto |
| —- | —- | —- |
| 字段类型 | String | Varchar |
| 列名 | date等关键字不能作为列名、别名,不支持中文别名 | 支持中文别名 |
| 日期格式 | yyyy-MM-dd HH:mm:ss | %Y-%m-%d %H:%i:%S |
| 日期函数(from_unixtime) | 只支持from_unixtime(unix_time, format) | 支持from_unixtime(unixtime)、from_unixtime(unixtime, string)、from_unixtime(unixtime, hours, minutes) |
| 日期函数(date_add) | date_add(start_date, num_days) | date_add(unit, value, timestamp) |
| JSON处理 | select get_json_object(json, ‘$.book’); | select json_extract_scalar(json, ‘$.book’);
注意:该函数返回值是一个string类型,其还有一个函数json_extract是直接返回一个json串,所以使用的时候要明确取的是什么类型的值 | | 从Map中根据Key查找Value | get_json_object(to_json(extra),’$.is_baomai’)
SparkSQL从2.4.0版本开始提供了element_at(map,key)函数,上述方式,先转成json,再用get_json_object(json, ‘$.book’)获取value | element_at(map,key) | | 列转行 | select student, score from tests lateral view explode(split(scores, ‘,’)) t as score; | select student, score from tests cross json unnest(split(scores, ‘,’) as t (score); | | 复杂分组 | select origin_state, origin_zip, sum(package_weight) from shipping group by origin_state,origin_zip with rollup; | select origin_state, origin_zip, sum(package_weight) from shipping group by rollup (origin_state, origin_zip);
— Presto语法
select origin_state, origin_zip, sum(package_weight) from shipping group by grouping sets ((origin_state, origin_zip), (origin_state), ()); | | group by和order by | group by和order by不支持结果列顺序号,必须写列名或表达式。
示例:GROUP BY substr(dt,1,7) order by substr(dt,1,7); | 同时支持列名、表达式和结果列顺序号。
示例:SELECT count(), nationkey FROM customer GROUP BY 2;
SELECT count(), nationkey FROM customer GROUP BY nationkey; |
4. 透明混算
Presto是FaceBook开源的支持多数据源混合查询的计算引擎,它是基于内存的查询,查询速度快,支持多数据源在一条SQL语句中混合查询。
■ Hive & MySQL
■ GreenPlum & MySQL
GreenPlum是基于PostgreSQL关系型数据库的,MPP架构的分布式数据库。实现见参考章节相关连接。
■ Elasticsearch & MySQL
■ SQLServer & MongoDB & MySQL
5. Presto优化
01. 数据存储优化
- 合理设置分区。
与Hive类似,Presto会根据元信息读取分区数据,合理的分区能减少Presto数据读取量,提升查询性能。
- 使用列式存储。
Presto对ORC文件读取做了特定优化,因此在Hive中创建Presto使用的表时,建议采用ORC格式存储。相对于Parquet,Presto对ORC支持更好。
- 使用压缩。
数据压缩可以减少节点间数据传输对IO带宽压力,对于即席查询需要快速解压,建议采用Snappy压缩。
- 预先排序。
对于已经排序的数据,在查询的数据过滤阶段,ORC格式支持跳过读取不必要的数据。比如对于经常需要过滤的字段可以预先排序。
INSERT INTO table nation_orc partition(p) SELECT * FROM nation SORT BY n_name;
如果需要过滤n_name字段,则性能将提升。
SELECT count(*) FROM nation_orc WHERE n_name=’AUSTRALIA’;
02. 查询SQL优化
- 只选择使用必要的字段。
由于采用列式存储,选择需要的字段可加快字段的读取、减少数据量。避免采用*读取所有字段。
[GOOD]: SELECT time,user,host FROM tbl
[BAD]: SELECT * FROM tbl
- 过滤条件必须加上分区字段。
对于有分区的表,where语句中优先使用分区字段进行过滤。acct_day是分区字段,visit_time是具体访问时间。
[GOOD]: SELECT time,user,host FROM tbl where acct_day=20171101
[BAD]: SELECT * FROM tbl where visit_time=20171101
- Group By语句优化。
合理安排Group by语句中字段顺序对性能有一定提升。将Group By语句中字段按照每个字段distinct数据多少进行降序排列。
[GOOD]: SELECT GROUP BY uid, gender
[BAD]: SELECT GROUP BY gender, uid
- Order by时使用Limit。
Order by需要扫描数据到单个worker节点进行排序,导致单个worker需要大量内存。如果是查询Top N或者Bottom N,使用limit可减少排序计算和内存压力。
[GOOD]: SELECT * FROM tbl ORDER BY time LIMIT 100
[BAD]: SELECT * FROM tbl ORDER BY time
- 使用近似聚合函数。
Presto有一些近似聚合函数,对于允许有少量误差的查询场景,使用这些函数对查询性能有大幅提升。比如使用approx_distinct() 函数比Count(distinct x)有大概2.3%的误差。
SELECT approx_distinct(user_id) FROM access
- 用regexp_like代替多个like语句。
Presto查询优化器没有对多个like语句进行优化,使用regexp_like对性能有较大提升。
-------------------------------[GOOD]-------------------------------
SELECT
...
FROM
access
WHERE
regexp_like(method, 'GET|POST|PUT|DELETE')
-------------------------------[BAD]-------------------------------
SELECT
...
FROM
access
WHERE
method LIKE '%GET%' OR
method LIKE '%POST%' OR
method LIKE '%PUT%' OR
method LIKE '%DELETE%'
- 使用Join语句时将大表放在左边。
Presto中join的默认算法是broadcast join,即将join左边的表分割到多个worker,然后将join右边的表数据整个复制一份发送到每个worker进行计算。如果右边的表数据量太大,则可能会报内存溢出错误。
[GOOD] SELECT ... FROM large_table l join small_table s on l.id = s.id
[BAD] SELECT ... FROM small_table s join large_table l on l.id = s.id
- 使用Rank函数代替row_number函数来获取Top N。
在进行一些分组排序场景时,使用rank函数性能更好。
-------------------------------[GOOD]-------------------------------
SELECT checksum(rnk)
FROM (
SELECT rank() OVER (PARTITION BY l_orderkey, l_partkey ORDER BY l_shipdate DESC) AS rnk
FROM lineitem
) t
WHERE rnk = 1
-------------------------------[BAD]-------------------------------
SELECT checksum(rnk)
FROM (
SELECT row_number() OVER (PARTITION BY l_orderkey, l_partkey ORDER BY l_shipdate DESC) AS rnk
FROM lineitem
) t
WHERE rnk = 1
03. 无缝替换Hive表
如果之前的Hive表没有用到ORC和snappy,那么怎么无缝替换而不影响线上的应用?
示例:如下一个Hive表:
CREATE TABLE bdc_dm.res_category(
channel_id1 int comment '1级渠道id',
province string COMMENT '省',
city string comment '市',
uv int comment 'uv'
)
comment 'example'
partitioned by (landing_date int COMMENT '日期:yyyymmdd')
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
建立对应的orc表:
CREATE TABLE bdc_dm.res_category_orc(
channel_id1 int comment '1级渠道id',
province string COMMENT '省',
city string comment '市',
uv int comment 'uv'
)
comment 'example'
partitioned by (landing_date int COMMENT '日期:yyyymmdd')
row format delimited fields terminated by '\t'
stored as orc
TBLPROPERTIES ("orc.compress"="SNAPPY");
先将数据灌入orc表,然后更换表名:
-- 导入ORC
insert overwrite table bdc_dm.res_category_orc partition(landing_date)
select * from bdc_dm.res_category where landing_date >= 20171001;
ALTER TABLE bdc_dm.res_category RENAME TO bdc_dm.res_category_tmp;
ALTER TABLE bdc_dm.res_category_orc RENAME TO bdc_dm.res_category;
其中rescategory_tmp是一个备份表,若线上运行一段时间后没有出现问题,则可以删除该表。
注意:ORC和Parquet都支持列式存储,但是ORC对Presto支持更好(Parquet对Impala支持更好),对于列式存储而言,存储文件为二进制的,对于经常增删字段的表,建议不要使用列式存储(修改文件元数据代价大)。对比数据仓库,dwd层建议不要使用ORC,而dm层则建议使用。_
04. 数据统计优化
很多的时候,在Presto上对数据库跨库查询,例如Mysql数据库。这个时候Presto的做法是从MySQL数据库端拉取最基本的数据,然后再去做进一步的处理,例如统计等聚合操作。
示例:
SELECT count(id) FROM table_1 WHERE condition=1;
上面的SQL语句会分为3个步骤进行:
Presto发起到Mysql数据库进行查询。
SELECT id FROM table_1 WHERE condition=1;
对结果进行count计算。
- 返回结果。
所以说,对于Presto来说,其跨库查询的瓶颈是在数据拉取这个步骤。若要提高数据统计的速度,可考虑把Mysql中相关的数据表定期转移到HDFS中,并转存为高效的列式存储格式ORC。
所以定时归档是一个很好的选择,这里还要注意,在归档的时候我们要选择一个归档字段,如果是按日归档,我们可以用日期作为这个字段的值,采用“yyyyMMdd”的形式,例如:“20180123”。
一般创建归档数据库的SQL语句如下:
CREATE TABLE IF NOT EXISTS table_1 (
id INTEGER,
........
partition_date INTEGER
)WITH ( format = 'ORC', partitioned_by = ARRAY['partition_date'] );
查看创建的库结构:
SHOW CREATE TABLE table_1; /*Only Presto*/
带有分区的表创建完成之后,每天只要更新分区字段partition_date就可以了,聪明的Presto就能将数据放置到规划好的分区了。如果要查看一个数据表的分区字段是什么,可以下面的语句:
SHOW PARTITIONS FROM table_1 /*Only Presto*/
05. 分区字段过滤优化
如果数据被规当到HDFS中,并带有分区字段。在每次查询归档表的时候,要带上分区字段作为过滤条件,这样可以加快查询速度。因为有了分区字段作为查询条件,就能帮助Presto避免全区扫描,减少Presto需要扫描的HDFS的文件数。
06. WITH语句优化
使用Presto分析统计数据时,可考虑把多次查询合并为一次查询,用Presto提供的子查询完成。这点和我们熟知的MySQL的使用不是很一样。
示例:
WITH subquery_1 AS (
SELECT a1, a2, a3
FROM Table_1
WHERE a3 between 20180101 and 20180131
), /*子查询subquery_1,注意:多个子查询需要用逗号分隔*/
subquery_2 AS (
SELECT b1, b2, b3
FROM Table_2
WHERE b3 between 20180101 and 20180131
) /*最后一个子查询后不要带逗号,不然会报错。*/
SELECT
subquery_1.a1, subquery_1.a2,
subquery_2.b1, subquery_2.b2
FROM subquery_1
JOIN subquery_2
ON subquery_1.a3 = subquery_2.b3;
07. 子查询优化
利用子查询,减少读表的次数,尤其是大数据量的表。具体做法是,将使用频繁的表作为一个子查询抽离出来,避免多次read。
08. 查询字段优化
一定要避免在查询中使用“SELECT *”这样的语句,尽量只查询需要的字段,换位思考,如果让你去查询数据是不是告诉你的越具体,工作效率越高呢。对于我们的数据库而言也是这样,任务越明确,工作效率越高。对于要查询全部字段的需求也是这样,没有偷懒的捷径,把它们都写出来。
09. join查询优化
join左边尽量放小数据量的表,而且最好是重复关联键少的表。
10. 注意事项
- 字段名引用
Presto中的字段名引用使用双引号分割,这个要区别于MySQL的反引号“`”。当然,可以不加这个双引号。
- 时间函数
timestamp进行比较的时候,需要添加timestamp关键字,而MySQL中对timestamp可以直接进行比较。
/*MySQL的写法 */
SELECT t FROM a WHERE t > '2017-01-01 00:00:00';
/*Presto中的写法*/
SELECT t FROM a WHERE t > timestamp '2017-01-01 00:00:00';
- MD5函数的使用
Presto中MD5函数传入的是binary类型,返回的也是binary类型,要对字符串进行MD5操作时,需要转换。
SELECT to_hex(md5(to_utf8('1212')));
- 覆写
Presto中不支持“insert overwrite”语法,只能先“delete”,然后“insert into”。
- 压缩
Presto中对ORC文件格式进行了针对性优化,但在impala中目前不支持ORC格式的表,Hive中支持ORC格式的表,所以想用列式存储的时候可以优先考虑ORC格式。Presto目前支持parquet格式,支持查询,但不支持insert。
参考
SegmentFault(思否):Presto上使用SQL遇到的坑
https://segmentfault.com/a/1190000013120454?utm_source=tag-newest
CSDN:SpringBoot整合Presto实现多数据源操作数据(Hive & MySQL)
https://blog.csdn.net/qq_37933018/article/details/104663400
CSDN:Presto实现多数据源混合查询(Greenplum & MySQL)
https://blog.csdn.net/Sunhighlight/article/details/89043885
程序员大本营:搭建Presto集群进行跨多类型数据库联表查询(SQLServer & MySQL & MongoDB & Redis)
https://www.pianshen.com/article/60041233028
博客园:Presto实现多数据源混合查询(Elasticsearch & MySQL)
https://www.cnblogs.com/yzlsthl/p/11805102.html