文档冲突

当我们使用index API更新文档,可以一次性读取原始文档,做我们的修改,然后重新索引整个文档。最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。

很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数据复制到Elasticsearch中并使其可被搜索。也许两个人同时更改相同的文档的几率很小。或者对于我们的业务来说偶尔丢失更改并不是很严重的问题。

但有时丢失了一个变更就是非常严重的。试想我们使用Elasticsearch 存储我们网上商城商品库存的数量,每次我们卖一个商品的时候,我们在 Elasticsearch 中将库存数量减少。有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。假设有两个web程序并行运行,每一个都同时处理所有商品的销售。

image.png

web_1 对stock_count所做的更改已经丢失,因为 web_2不知道它的 stock_count的拷贝已经过期。结果我们会认为有超过商品的实际数量的库存,因为卖给顾客的库存商品并不存在,我们将让他们非常失望。

变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:

悲观并发控制:这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。
乐观并发控制:Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。

乐观并发控制

Elasticsearch是分布式的。当文档创建、更新或删除时,新版本的文档必须复制到集群中的其他节点。Elasticsearch也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许顺序是乱的。Elasticsearch需要一种方法确保文档的旧版本不会覆盖新的版本。

当我们之前讨论index , GET和DELETE请求时,我们指出每个文档都有一个_version(版本号),当文档被修改时版本号递增。Elasticsearch使用这个version号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

我们可以利用version号来确保应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version号来达到这个目的。如果该版本不是当前版本号,我们的请求将会失败。

老的版本es使用version,但是新版本不支持了,会报下面的错误,提示我们用if_seq _no和if _primary_term

  1. #PUT http://127.0.0.1:9200/shopping/_create/1001
  1. {
  2. "_index": "shopping",
  3. "_type": "_doc",
  4. "_id": "1001",
  5. "_version": 1,
  6. "result": "created",
  7. "_shards": {
  8. "total": 2,
  9. "successful": 1,
  10. "failed": 0
  11. },
  12. "_seq_no": 10,
  13. "_primary_term": 15
  14. }
  1. #POST http://127.0.0.1:9200/shopping/_update/1001
  2. {
  3. "doc":{
  4. "title":"华为手机"
  5. }
  6. }
  1. {
  2. "_index": "shopping",
  3. "_type": "_doc",
  4. "_id": "1001",
  5. "_version": 2,
  6. "result": "updated",
  7. "_shards": {
  8. "total": 2,
  9. "successful": 1,
  10. "failed": 0
  11. },
  12. "_seq_no": 11,
  13. "_primary_term": 15
  14. }
  1. #POST http://127.0.0.1:9200/shopping/_update/1001?version=1
  2. {
  3. "doc":{
  4. "title":"华为手机2"
  5. }
  6. }
  1. {
  2. "error": {
  3. "root_cause": [
  4. {
  5. "type": "action_request_validation_exception",
  6. "reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
  7. }
  8. ],
  9. "type": "action_request_validation_exception",
  10. "reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
  11. },
  12. "status": 400
  13. }
  1. #POST http://127.0.0.1:9200/shopping/_update/1001?if_seq_no=11&if_primary_term=15
  2. {
  3. "doc":{
  4. "title":"华为手机2"
  5. }
  6. }
  1. {
  2. "_index": "shopping",
  3. "_type": "_doc",
  4. "_id": "1001",
  5. "_version": 3,
  6. "result": "updated",
  7. "_shards": {
  8. "total": 2,
  9. "successful": 1,
  10. "failed": 0
  11. },
  12. "_seq_no": 12,
  13. "_primary_term": 16
  14. }