api的简单介绍
UpdateByQueryd的作用类似于数据库的update set xx where a=xxx 语句,但是于事务数据库不同的是,ElasticSearch的UpdateByQueryd并不支持事务,UpdateByQueryd的执行可能会被query 或者 update的报错打断。也就是说我们通过UpdateByQueryd的更新可能打断,造成有一部分数据更新成功,一部分没有更新的情况。
简单使用
dsl
POST twitter/_update_by_query
{
"query": {
"match": {
"city.keyword": "北京"
}
},
"script": {
"source": """
if(ctx._source.containsKey("content")) {
ctx._source.content_length = ctx._source.content.length();
} else {
ctx._source.content_length = 0;
}
ctx._source['uid'] += params['one']";
"""
"params": {
"one": 1
}
}
}UpdateByQueryd
java代码
UpdateByQueryRequest updateRequest = new UpdateByQueryRequest("post_check_2022");
HashMap<String, Object> params = Maps.newHashMap();
check.put("checkTime", DateUtil.currentSeconds());
params.put("check", check);
params.put("status", 2);
updateRequest.setDocTypes("_doc");
updateRequest.setQuery(new TermQueryBuilder("postUuid", "dlm8oeyvpkop7"));
updateRequest.setScript((new Script(ScriptType.INLINE, "painless", "if(ctx._source.checks == null){ctx._source.checks =[params.check]} else {ctx._source.checks.add(params.check)} ctx._source.status =(params.status);ctx._source.lastCheckTime =(params.check.checkTime) ", params)));
restHighLevelClient.updateByQuery(updateRequest, RequestOptions.DEFAULT);
UpdateByQuery的常见问题
版本冲突
版本冲突是UpdateByQuery最常出现的问题,update_by_query在索引启动时获取索引的快照,这意味着如果文档在生成快照的时间和处理索引请求之间发生更改,则会出现版本冲突。1s内多次修改同一个document就会发生,你通过设置version_conflicts=false(会忽略错误),但并未解决问题,还有2种方式解决该问题。
- retries,一直重试,UpdateByQueryRequestBuilder中默认为11次,可见对es是有一定的压力的,我们应该把这个值改小点。
refresh=true,一直去刷盘,当然可以解决准实时的问题,但磁盘消耗是很多的。
脚本超限
报错:org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=circuit_breaking_exception, reason=[script] Too many dynamic script compilations within, max: [75/5m]; please use indexed, or scripts with parameters instead; this limit can be changed by the [script.max_compilations_rate] setting]
解决方法,需要修改es服务端配置
PUT _cluster/settings
{
"transient" : {
"script.max_compilations_rate" : "100/1m"
}
}
执行超时
java.io.IOException: listener timeout,默认的超时时间是30s
代码解决(不建议)
restClient = RestClient.builder(
new HttpHost(GSConstants.P_ES_HOST, GSConstants.P_ES_PORT, "http"))
.setMaxRetryTimeoutMillis(TIMEOUT)
.setHttpClientConfigCallback(new HttpClientConfigCallback(){
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(5*60*1000)//超时时间5分钟
.setSocketTimeout(5*60*1000)//这就是Socket超时时间设置
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
return httpClientBuilder;
}
dsl 加上conflicts=proceed&wait_for_completion=false参数 ```json POST nickname_review_2022/_update_by_query?conflicts=proceed&wait_for_completion=false { “script”: { “source”: “ctx._source.qualityTestingTime = 0; ctx._source.taskId = 0;”, “lang”: “painless” }, “query”: { “term”: {
"qualityTestingStatus": {
"value": "0"
}
} } }
```