6.4 分布式搜索引擎

在 Web 一章中,我们提到 MySQL 很脆弱。数据库系统本身要保证实时和强一致性,所以其功能设计上都是为了满足这种一致性需求。比如 write ahead log 的设计,基于 B + 树实现的索引和数据组织,以及基于 MVCC 实现的事务等等。

关系型数据库一般被用于实现 OLTP 系统,所谓 OLTP,援引 wikipedia:

在线交易处理(OLTP, Online transaction processing)是指透过信息系统、电脑网络及数据库,以线上交易的方式处理一般即时性的作业数据,和更早期传统数据库系统大量批量的作业方式并不相同。OLTP 通常被运用于自动化的数据处理工作,如订单输入、金融业务… 等反复性的日常性交易活动。和其相对的是属于决策分析层次的联机分析处理(OLAP)。

在互联网的业务场景中,也有一些实时性要求不高 (可以接受多秒的延迟),但是查询复杂性却很高的场景。举个例子,在电商的 WMS 系统中,或者在大多数业务场景丰富的 CRM 或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的 WMS 中对一件货物的描述,可能有下面这些字段:

仓库 id,入库时间,库位分区 id,储存货架 id,入库操作员 id,出库操作员 id,库存数量,过期时间,SKU 类型,产品品牌,产品分类,内件数量

除了上述信息,如果商品在仓库内有流转。可能还有有关联的流程 id,当前的流转状态等等。

想像一下,如果我们所经营的是一个大型电商,每天有千万级别的订单,那么在这个数据库中查询和建立合适的索引都是一件非常难的事情。

在 CRM 或客服类系统中,常常有根据关键字进行搜索的需求,大型互联网公司每天会接收数以万计的用户投诉。而考虑到事件溯源,用户的投诉至少要存 2~3 年。又是千万级甚至上亿的数据。根据关键字进行一次 like 查询,可能整个 MySQL 就直接挂掉了。

这时候我们就需要搜索引擎来救场了。

搜索引擎

Elasticsearch 是开源分布式搜索引擎的霸主,其依赖于 Lucene 实现,在部署和运维方面做了很多优化。当今搭建一个分布式搜索引擎比起 Sphinx 的时代已经是容易很多很多了。只要简单配置客户端 IP 和端口就可以了。

倒排列表

虽然 es 是针对搜索场景来定制的,但如前文所言,实际应用中常常用 es 来作为 database 来使用,就是因为倒排列表的特性。可以用比较朴素的观点来理解倒排索引:

posting-list

图 6-10 倒排列表

对 Elasticsearch 中的数据进行查询时,本质就是求多个排好序的序列求交集。非数值类型字段涉及到分词问题,大多数内部使用场景下,我们可以直接使用默认的 bi-gram 分词。什么是 bi-gram 分词呢:

即将所有 TiT(i+1) 组成一个词(在 Elasticsearch 中叫 term),然后再编排其倒排列表,这样我们的倒排列表大概就是这样的:

terms

图 6-11 “今天天气很好” 的分词结果

当用户搜索’天气很好’时,其实就是求:天气、气很、很好三组倒排列表的交集,但这里的相等判断逻辑有些特殊,用伪代码表示一下:

  1. func equal() {
  2. if postEntry.docID of '天气' == postEntry.docID of '气很' &&
  3. postEntry.offset + 1 of '天气' == postEntry.offset of '气很' {
  4. return true
  5. }
  6. if postEntry.docID of '气很' == postEntry.docID of '很好' &&
  7. postEntry.offset + 1 of '气很' == postEntry.offset of '很好' {
  8. return true
  9. }
  10. if postEntry.docID of '天气' == postEntry.docID of '很好' &&
  11. postEntry.offset + 2 of '天气' == postEntry.offset of '很好' {
  12. return true
  13. }
  14. return false
  15. }

多个有序列表求交集的时间复杂度是:O(N*M),N 为给定列表当中元素数最小的集合,M 为给定列表的个数。

在整个算法中起决定作用的一是最短的倒排列表的长度,其次是词数总和,一般词数不会很大(想像一下,你会在搜索引擎里输入几百字来搜索么?),所以起决定性作用的,一般是所有倒排列表中,最短的那一个的长度。

因此,文档总数很多的情况下,搜索词的倒排列表最短的那一个不长时,搜索速度也是很快的。如果用关系型数据库,那就需要按照索引(如果有的话)来慢慢扫描了。

查询 DSL

es 定义了一套查询 DSL,当我们把 es 当数据库使用时,需要用到其 bool 查询。举个例子:

  1. {
  2. "query": {
  3. "bool": {
  4. "must": [
  5. {
  6. "match": {
  7. "field_1": {
  8. "query": "1",
  9. "type": "phrase"
  10. }
  11. }
  12. },
  13. {
  14. "match": {
  15. "field_2": {
  16. "query": "2",
  17. "type": "phrase"
  18. }
  19. }
  20. },
  21. {
  22. "match": {
  23. "field_3": {
  24. "query": "3",
  25. "type": "phrase"
  26. }
  27. }
  28. },
  29. {
  30. "match": {
  31. "field_4": {
  32. "query": "4",
  33. "type": "phrase"
  34. }
  35. }
  36. }
  37. ]
  38. }
  39. },
  40. "from": 0,
  41. "size": 1
  42. }

看起来比较麻烦,但表达的意思很简单:

  1. if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
  2. return true
  3. }

用 bool should query 可以表示 or 的逻辑:

  1. {
  2. "query": {
  3. "bool": {
  4. "should": [
  5. {
  6. "match": {
  7. "field_1": {
  8. "query": "1",
  9. "type": "phrase"
  10. }
  11. }
  12. },
  13. {
  14. "match": {
  15. "field_2": {
  16. "query": "3",
  17. "type": "phrase"
  18. }
  19. }
  20. }
  21. ]
  22. }
  23. },
  24. "from": 0,
  25. "size": 1
  26. }

这里表示的是类似:

  1. if field_1 == 1 || field_2 == 2 {
  2. return true
  3. }

这些 Go 代码里 if 后面跟着的表达式在编程语言中有专有名词来表达 Boolean Expression

  1. 4 > 1
  2. 5 == 2
  3. 3 <i && x> 10

es 的 Bool Query 方案,就是用 json 来表达了这种程序语言中的 Boolean Expression,为什么可以这么做呢?因为 json 本身是可以表达树形结构的,我们的程序代码在被编译器 parse 之后,也会变成 AST,而 AST 抽象语法树,顾名思义,就是树形结构。理论上 json 能够完备地表达一段程序代码被 parse 之后的结果。这里的 Boolean Expression 被编译器 Parse 之后也会生成差不多的树形结构,而且只是整个编译器实现的一个很小的子集。

基于 client SDK 做开发

初始化:

  1. // 选用 elastic 版本时
  2. // 注意与自己使用的 elasticsearch 要对应
  3. import (
  4. elastic "gopkg.in/olivere/elastic.v3"
  5. )
  6. var esClient *elastic.Client
  7. func initElasticsearchClient(host string, port string) {
  8. var err error
  9. esClient, err = elastic.NewClient(
  10. elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)),
  11. elastic.SetMaxRetries(3),
  12. )
  13. if err != nil {
  14. // log error
  15. }
  16. }

插入:

  1. func insertDocument(db string, table string, obj map[string]interface{}) {
  2. id := obj["id"]
  3. var indexName, typeName string
  4. // 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type
  5. // 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段
  6. // 所以单个 index 内容过多会导致性能问题
  7. // 在新版本中 type 已经废弃
  8. // 为了让不同表的数据落入不同的 index,这里我们用 table+name 作为 index 的名字
  9. indexName = fmt.Sprintf("%v_%v", db, table)
  10. typeName = table
  11. // 正常情况
  12. res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do()
  13. if err != nil {
  14. // handle error
  15. } else {
  16. // insert success
  17. }
  18. }

获取:

  1. func query(indexName string, typeName string) (*elastic.SearchResult, error) {
  2. // 通过 bool must 和 bool should 添加 bool 查询条件
  3. q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1),
  4. elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m")))
  5. q = q.Should(
  6. elastic.NewMatchPhraseQuery("name", "alex"),
  7. elastic.NewMatchPhraseQuery("name", "xargin"),
  8. )
  9. searchService := esClient.Search(indexName).Type(typeName)
  10. res, err := searchService.Query(q).Do()
  11. if err != nil {
  12. // log error
  13. return nil, err
  14. }
  15. return res, nil
  16. }

删除:

  1. func deleteDocument(
  2. indexName string, typeName string, obj map[string]interface{},
  3. ) {
  4. id := obj["id"]
  5. res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do()
  6. if err != nil {
  7. // handle error
  8. } else {
  9. // delete success
  10. }
  11. }

因为 Lucene 的性质,本质上搜索引擎内的数据是不可变的,所以如果要对文档进行更新,Lucene 内部是按照 id 进行完全覆盖 (本质是取同一 id 最新的 segment 中的数据) 的操作,所以与插入的情况是一样的。

使用 es 作为数据库使用时,需要注意,因为 es 有索引合并的操作,所以数据插入到 es 中到可以查询的到需要一段时间(由 es 的 refresh_interval 决定)。所以千万不要把 es 当成强一致的关系型数据库来使用。

将 sql 转换为 DSL

比如我们有一段 bool 表达式,user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1),写成 SQL 是如下形式:

  1. select * from xxx where user_id = 1 and (
  2. product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
  3. )

写成 es 的 DSL 是如下形式:

  1. {
  2. "query": {
  3. "bool": {
  4. "must": [
  5. {
  6. "match": {
  7. "user_id": {
  8. "query": "1",
  9. "type": "phrase"
  10. }
  11. }
  12. },
  13. {
  14. "match": {
  15. "product_id": {
  16. "query": "1",
  17. "type": "phrase"
  18. }
  19. }
  20. },
  21. {
  22. "bool": {
  23. "should": [
  24. {
  25. "match": {
  26. "star_num": {
  27. "query": "4",
  28. "type": "phrase"
  29. }
  30. }
  31. },
  32. {
  33. "match": {
  34. "star_num": {
  35. "query": "5",
  36. "type": "phrase"
  37. }
  38. }
  39. }
  40. ]
  41. }
  42. },
  43. {
  44. "match": {
  45. "banned": {
  46. "query": "1",
  47. "type": "phrase"
  48. }
  49. }
  50. }
  51. ]
  52. }
  53. },
  54. "from": 0,
  55. "size": 1
  56. }

es 的 DSL 虽然很好理解,但是手写起来非常费劲。前面提供了基于 SDK 的方式来写,但也不足够灵活。

SQL 的 where 部分就是 boolean expression。我们之前提到过,这种 bool 表达式在被解析之后,和 es 的 DSL 的结构长得差不多,我们能不能直接通过这种 “差不多” 的猜测来直接帮我们把 SQL 转换成 DSL 呢?

当然可以,我们把 SQL 的 where 被 Parse 之后的结构和 es 的 DSL 的结构做个对比:

ast

图 6-12 AST 和 DSL 之间的对应关系

既然结构上完全一致,逻辑上我们就可以相互转换。我们以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,限于篇幅,本文中就不给出示例了,读者朋友可以查看:

github.com/cch123/elasticsql

来学习具体的实现。

异构数据同步

在实际应用中,我们很少直接向搜索引擎中写入数据。更为常见的方式是,将 MySQL 或其它关系型数据中的数据同步到搜索引擎中。而搜索引擎的使用方只能对数据进行查询,无法进行修改和删除。

常见的同步方案有两种:

通过时间戳进行增量数据同步

sync to es

图 6-13 基于时间戳的数据同步

这种同步方式与业务强绑定,例如 WMS 系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从 MySQL 的出库单表中,把最近十分钟创建的所有出库单取出,批量存入 es 中,取数据的操作需要执行的逻辑可以表达为下面的 SQL:

  1. select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);

当然,考虑到边界情况,我们可以让这个时间段的数据与前一次的有一些重叠:

  1. select * from wms_orders where update_time >= date_sub(
  2. now(), interval 11 minute
  3. );

取最近 11 分钟有变动的数据覆盖更新到 es 中。这种方案的缺点显而易见,我们必须要求业务数据严格遵守一定的规范。比如这里的,必须要有 update_time 字段,并且每次创建和更新都要保证该字段有正确的时间值。否则我们的同步逻辑就会丢失数据。

通过 binlog 进行数据同步

binlog-sync

图 6-13 基于 binlog 的数据同步

业界使用较多的是阿里开源的 Canal,来进行 binlog 解析与同步。canal 会伪装成 MySQL 的从库,然后解析好行格式的 binlog,再以更容易解析的格式(例如 json)发送到消息队列。

由下游的 Kafka 消费者负责把上游数据表的自增主键作为 es 的文档的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 Row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。

这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入 es 的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去定制消费者的逻辑,这就不是通用系统讨论的范畴了。