并发冲突
场景:现有张三和李四两个用户,同时在系统(多线程)中下单,购买某一件商品。
冲突流程图
流程说明:
- 张三和李四都同时购买了一个商品
- 张三购买商品后,将库存改为100-1=99,刷新到Es中
- 同时,李四内存中获取到的库存也是100,然后也做了减库存的操作,100-1=99,也刷新到Es中
- 这时,实际上是购买了两件商品,但是却只减了一次库存。
结果:库存 = 99是错误的,实际上,库存应该是100-1-1 = 98才对。
普通的Es操作流程
- 先获取到Es中文档数据,也就是商品信息。然后将信息加载到内存中。
- 发生购买后,直接基于内存中的数据进行操作和计算。
- 将计算后的结果回写到Es。
当并发操作的Es的线程越多,或者并发请求越多,或者是读取一份数据后占用的时间越长,这种并发冲突就越明显,如果不做控制,那后面的用户拿到的就是旧数据。
悲观锁
悲观锁,常见于关系型数据库,MySQL
悲观锁图
场景:一般都是数据库级别的行锁或者是表锁
乐观锁
乐观锁图
需要注意的是:乐观锁模式中,数据是不会被锁住的,任意线程都可以操作。在同一时刻,只有一个线程能修改库存成功。
乐观锁与悲观锁的比较
- 悲观锁
- 优点:使用起来比较方便,对程序是透明的(这类锁一般是数据库锁),不需要程序做额外的操作。
- 缺点:并发能力很低,同一时间只有一个线程能操作资源。
乐观锁
每次创建索引的时候,version都是1。 ```json PUT /testindex/_doc/1 { “test”:”test” }
//创建结果 { “_index” : “testindex”, “_type” : “_doc”, “_id” : “1”, “_version” : 1, “result” : “created”, “_shards” : { “total” : 2, “successful” : 2, “failed” : 0 }, “_seq_no” : 0, “_primary_term” : 1 }
- 修改索引时,version会加1
```json
PUT /testindex/_doc/1
{
"test":"test1"
}
//修改结果
{
"_index" : "testindex",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1
}
- 删除时,version也会加1 ```json DELETE /testindex/_doc/1
{ “_index” : “testindex”, “_type” : “_doc”, “_id” : “1”, “_version” : 2, “result” : “deleted”, “_shards” : { “total” : 2, “successful” : 2, “failed” : 0 }, “_seq_no” : 1, “_primary_term” : 1 }
先删除一条document,然后再创建一条同样的document,我们会发现在delete version上加1。<br />从侧面可以证明:删除一个document之后,Es并不是马上物理删除,document的一些版本号信息还是保留着的。
**Es的乐观锁原理**<br />知识点:es的后台,很多的类似于副本分片同步请求,都是多线程异步的。也就是说,多个请求之间是乱序的,可能存在先请求过来的线程后到,后请求过来的线程先到的情况。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/2874668/1607233450539-894edc40-c679-4ba6-9272-37101d3fe4f5.png#align=left&display=inline&height=269&margin=%5Bobject%20Object%5D&name=image.png&originHeight=269&originWidth=451&size=19275&status=done&style=none&width=451)<br />正常流程:
1. 初始写入,field=test1
1. 修改线程1先到,写入,field=test2
1. 修改线程2后到,写入,field=test3
值变动:test1->test2->test3<br />如果,Es没有并发控制。那么流程可能是这样的:
1. 初始写入,field=test1
1. 修改线程1先到P0分片
1. 修改线程2后到P0分片
1. 修改线程2先同步数据到R0副本,test1->test3
1. 修改线程2先同步数据到R0副本,test3->test2
值变动:test1->test3->test2<br />此时,es的数据就错乱了。
加入_version乐观锁控制后。。。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/2874668/1607237796851-b3c41c21-dd98-4472-a223-1802988206ea.png#align=left&display=inline&height=295&margin=%5Bobject%20Object%5D&name=image.png&originHeight=295&originWidth=440&size=27615&status=done&style=none&width=440)<br />`Es知识点`:Es内部的多线程异步并发修改时,是基于自己的_version版本号进行乐观锁并发控制的。<br />针对于上述错误的流程,加了乐观锁后,会变成这样:
- 线程2,修改field=test3后,修改version=2
- 线程1,尝试修改field的值,但是对比version,发现version(version=1)不一致,修改失败。
值变动:test1->test3-/>test2(修改失败)
顺序情况下的乐观锁:<br />如果是线程1,先修改,然后同步R0,这个时候,test1->test2,version=2<br />然后线程2再来修改,同步R0,线程2是能获取到正确的版本号的,因此也能修改成功。
<a name="KBWVP"></a>
#### 乐观锁测试
> 需要注意的是:在es6.7版本前,直接用version=xxx即可,6.7版本后,用if_seq_no和if_primary_term控制。
> if_seq_no字段跟version一样,每次更改,都会加1
> if_primary_term表示文档被分配的地址
插入初始数据,version=1
```json
PUT /testmy/_doc/1
{
"filed":"testmy"
}
{
"_index" : "testmy",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1
}
客户端1,通过if_seq_no=1去更改文档
//6.7版本前
PUT /testmy/_doc/1?version=1
{
"filed":"testmy client1"
}
//6.7版本后
PUT /testmy/_doc/1?if_seq_no=0&if_primary_term=1
{
"filed":"testmy client1"
}
{
"_index" : "testmy",
"_type" : "_doc",
"_id" : "1",
"_version" : 3,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1
}
客户端2,也通过if_seq_no=1去更改
PUT /testmy/_doc/1?if_seq_no=1&if_primary_term=1
{
"field":"testmy client1"
}
//修改失败,报错
{
"error" : {
"root_cause" : [
{
"type" : "version_conflict_engine_exception",
"reason" : "[1]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [4] and primary term [1]",
"index_uuid" : "5aL9f1zXR02B1fAzo0cOWA",
"shard" : "0",
"index" : "testmy"
}
],
"type" : "version_conflict_engine_exception",
"reason" : "[1]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [4] and primary term [1]",
"index_uuid" : "5aL9f1zXR02B1fAzo0cOWA",
"shard" : "0",
"index" : "testmy"
},
"status" : 409
}
external version
- version的external是什么?
external是Es提供的一个feature。如果我们的数据存在数据库中,需要使用数据库中的版本号替代Es的版本号来做乐观锁,那么就可以指定version_type=external。
有无external的区别
- _version:只有当提供的version与Es的version一致时才能修改成功。
- external:只有当提供的version比Es的version大时才能修改成功。
使用external
- 创建一条document ```json PUT /my_test/_doc/1 { “filed”:”version external test” }
{ “_index” : “my_test”, “_type” : “_doc”, “_id” : “1”, “_version” : 1, “result” : “created”, “_shards” : { “total” : 2, “successful” : 1, “failed” : 0 }, “_seq_no” : 0, “_primary_term” : 1 }
2. 客户端1发起一次修改文档的操作(使用了external后,传入的version要比现有文档的version要大)
```json
PUT /my_test/_doc/1?version=2&version_type=external
{
"filed":"version external test client1"
}
{
"_index" : "my_test",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 9,
"_primary_term" : 1,
"found" : true,
"_source" : {
"filed" : "version external test client1"
}
}
- 客户端2发起一次修改文档的操作 ```json PUT /my_test/_doc/1?version=2&version_type=external { “filed”:”version external test client2” }
{ “error” : { “root_cause” : [ { “type” : “version_conflict_engine_exception”, “reason” : “[1]: version conflict, current version [2] is higher or equal to the one provided [2]”, “index_uuid” : “8L1U_tVPRY-K7MPBTLdm6w”, “shard” : “0”, “index” : “my_test” } ], “type” : “version_conflict_engine_exception”, “reason” : “[1]: version conflict, current version [2] is higher or equal to the one provided [2]”, “index_uuid” : “8L1U_tVPRY-K7MPBTLdm6w”, “shard” : “0”, “index” : “my_test” }, “status” : 409 }
上述报错信息表明,加了external后,如果想要更改文档,传入的version必须要比现有的version大才行。
<a name="eAPH7"></a>
#### Partial Update
partial update也叫做部分更新,可以根据用户的需求,更新指定部分字段。
创建&全量替换文档的语法是一样的<br />语法:`PUT /index/type/id`
**全量替换的流程**
1. 应用程序发送一个get请求到,获取到document,展示到前台,供用户查看和修改。
1. 用户在前台页面修改数据,发送到后台。
1. 后台会将用户修改的数据在内存中执行,然后封装好修改后的全量数据。
1. 然后发送put请求到es,进行全量替换。
1. es将旧的document标记为deleted,然后冲洗创建一个新的document
**partial update的语法**
```json
POST /index/_update/id
{
"doc":{
"field":"value" //部分字段
}
}
如果只想修改部分字段,那么使用partial update就很方便了。
图解:全量替换
查询-更新这整个操作可能耗时较长,修改完后写回到es,可能es里面的数据早就被人修改了,所以并发冲突比较多。
图解:partial update
partial update 的优势
- 所有的查询,修改和回写都在同一个shard中进行,避免了过多的网络开销(如果是:全量替换,则需要获取document【第一次】,修改document后,更新【第二次】),提升了性能。
- 介绍了查询和修改中的时间间隔,可有效减少并发冲突。
图解:es的内部操作
在es的内部,全量替换与partial update操作流程基本一致。
全量替换是先将所有字段更新,然后创建一个新的document。
partial update是将部分字段更新,然后创建一个新的document。
Partial Update 并发控制
并发图
流程图解:
- user1和user2都对同一个document发起了一个partial update的请求。
- user1先操作,读取到document的基本信息和version,然后更新指定字段,并创建document,原有版本号+1
- user2拿到version=1版本号和数据去更新,却发现version不一致,更新失败
冲突解决方案
- retry_on_conflict 指定发生冲突后,重试的次数。
策略如下:
- 发生冲突后,再次获取document最新的version
- 基于最新的version去更新,如果成功就ok
- 如果失败,再重复1,和2步骤。如果达到次数都没有成功,按就是标记为失败了。
- version
指定更新的version,如果version满足,则更新成功,否则失败。
Es的script支持
创建文档
PUT /human/_doc/2
{
"num":0,
"tags":[]
}
- 使用内置脚本修改文档 ```json POST /human/_update/2 { “script”: “ctx._source.num+=1” }
//修改后的数据 { “_index” : “human”, “_type” : “_doc”, “_id” : “2”, “_version” : 3, “result” : “updated”, “_shards” : { “total” : 2, “successful” : 2, “failed” : 0 }, “_seq_no” : 20, “_primary_term” : 2 }
2. ~~**外部脚本修改文档(es6.0版本后删除了)**~~
> <a name="0cabaa35"></a>
### File scripts removed
> File scripts have been removed. Instead, use stored scripts. The associated setting `path.scripts` has also been removed.
> [https://www.elastic.co/guide/en/elasticsearch/reference/6.0/breaking_60_scripting_changes.html#_file_scripts_removed](https://www.elastic.co/guide/en/elasticsearch/reference/6.0/breaking_60_scripting_changes.html#_file_scripts_removed)
3. **使用存储在es中的脚本**
编写脚本如下:
```json
POST _scripts/testscript
{
"script":{
"lang": "painless",
"source": "ctx._source.tags.add(params.new_tags)"
}
}
在6.x版本中,es删除了lang=groovy的选项。lang只能指定为painless
Painless was introduced in 5.0, where groovy existed as an additional scripting language. From 6.0 onwards groovy was removed as a scripting language as painless is more secure and much faster. This means in order to run watches on 6.x you need to replace groovy with painless scripts. Hope this helps! —Alex https://discuss.elastic.co/t/error-in-executing-groovy-script-in-watcher-elasticsearch-6-3-1/140924
使用脚本更新指定字段
POST /human/_update/2
{
"script": {
"id": "testscript",
"params": {
"new_tags":"aatags"
}
}
}
id
脚本的唯一标识params
:脚本中的参数
- 使用脚本删除 ```json //编写脚本 POST _scripts/delete_script { “script”:{ “lang”: “painless”, “source”: “ctx.op=ctx._source.num == params.count ? ‘delete’ : ‘none’” } }
//执行脚本 POST /human/_update/2 { “script”: { “id”: “delete_script”, “params”: { “count”:2 } } }
//执行结果 { “_index” : “human”, “_type” : “_doc”, “_id” : “2”, “_version” : 9, “result” : “deleted”, “_shards” : { “total” : 2, “successful” : 2, “failed” : 0 }, “_seq_no” : 26, “_primary_term” : 2 }
5. **使用脚本更新&添加(upsert)**
```json
//使用upsert关键字,如果文档不存在,则创建文档,否则更新。
POST /human/_doc/2
{
"script":"ctx._source.num+=1",
"upsert":{
"num":0,
"tags":[]
}
}
//创建文档
{
"_index" : "human",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 27,
"_primary_term" : 2
}
//更新文档
{
"_index" : "human",
"_type" : "_doc",
"_id" : "2",
"_version" : 3,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 29,
"_primary_term" : 2
}