I. Introduction

1. The ES architecture at a glance

[Figure 1: ES architecture diagram]

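II. Client connection methods supported by ES

1. REST API

  Requests go over HTTP: for example, a GET issued from a browser, a REST request sent with a tool such as Postman, or an HTTP client call made from Java.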
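As a minimal sketch of the "HTTP call from Java" case, the snippet below builds the URL of a search endpoint and, in `main`, sends a GET with the JDK's `HttpURLConnection`. The class name `RestGetSketch`, the node address `localhost:9200`, and the index name `book13` are assumptions chosen for illustration; any reachable node and existing index would do.

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

class RestGetSketch {
    // Builds the URL of the _search endpoint for one index.
    static String searchUrl(String host, int port, String index) {
        return "http://" + host + ":" + port + "/" + index + "/_search";
    }

    public static void main(String[] args) throws Exception {
        // Assumed node address and index name; adjust to your cluster.
        URL url = new URL(searchUrl("localhost", 9200, "book13"));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
            // Read the whole JSON response body as one string.
            String body = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
            System.out.println(conn.getResponseCode() + " " + body);
        } finally {
            conn.disconnect();
        }
    }
}
```

Running `main` needs a live node; without one, the connection attempt fails with an exception.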

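2. Transport connection

  A socket connection made with the official TransportClient, which is built on Netty.
Note: the ES roadmap deprecates TransportClient starting with version 7.0 and removes it entirely in 8.0; the High Level REST Client is its replacement.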

3. ES provides clients for many programming languages

  [Figure 2: list of ES clients by programming language]
See the official site for details:
https://www.elastic.co/guide/en/elasticsearch/client/index.html

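III. Java REST Client overview

1. ES provides two Java REST clients

Java Low Level REST Client: the low-level client talks to the cluster over HTTP; you marshal the request JSON and parse the response JSON yourself. It is compatible with all ES versions.
Java High Level REST Client: the high-level client is built on the low-level one and adds APIs that marshal the request JSON and parse the response JSON for you. Its version must match the ES server version, otherwise version problems can occur.
The official recommendation is the high-level client; with the low-level client you have to know the REST API precisely yourself.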
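To make "marshal the request JSON yourself" concrete, here is a small sketch of what that work looks like with the low-level approach: escaping values and assembling a query body by hand. The class `ManualJsonSketch` and its methods are hypothetical helpers written for this illustration, not part of any ES client.

```java
class ManualJsonSketch {
    // Escapes the characters that must not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Assembles the body of a match query as a JSON string.
    static String matchQuery(String field, String text) {
        return "{\"query\":{\"match\":{\"" + escape(field) + "\":\""
                + escape(text) + "\"}}}";
    }

    public static void main(String[] args) {
        System.out.println(matchQuery("message", "trying out Elasticsearch"));
    }
}
```

The high-level client removes exactly this kind of string handling by offering request and response objects instead.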

2. Java Low Level REST Client

Features, Maven setup, and usage: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html
API docs: https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html

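3. Java High Level REST Client

It was added in 6.0.0; its goal is to handle requests and responses in an object-oriented Java style.
Every API can be called synchronously or asynchronously. A synchronous method returns a result object directly; an asynchronous method carries the suffix Async (for example, createAsync) and reports its result through a listener argument.
Compatibility notes:
It requires Java 1.8 and depends on the Elasticsearch core project.
Use a client version that matches the ES server version.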
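The difference between the two calling styles can be sketched without any ES dependency. In the snippet below, `Listener` is a hypothetical stand-in with the same two callbacks as the client's listener, and `ping`/`pingAsync` are made-up operations; only the calling pattern is the point.

```java
import java.util.concurrent.CompletableFuture;

class SyncAsyncSketch {
    // Hypothetical stand-in for a listener with success and failure callbacks.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Synchronous style: the caller blocks and gets the result object back.
    static String ping(String node) {
        return "pong from " + node;
    }

    // Asynchronous style: returns immediately; the listener is notified later.
    static CompletableFuture<Void> pingAsync(String node, Listener<String> listener) {
        return CompletableFuture.runAsync(() -> {
            String result;
            try {
                result = ping(node);
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        });
    }

    public static void main(String[] args) {
        System.out.println(ping("node-1"));
        pingAsync("node-1", new Listener<String>() {
            @Override public void onResponse(String r) { System.out.println("async: " + r); }
            @Override public void onFailure(Exception e) { e.printStackTrace(); }
        }).join(); // wait only so the demo does not exit before the callback runs
    }
}
```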

4. Maven dependency for the Java High Level REST Client

  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
  </dependency>

5. Put log4j2.xml where it is compiled into the classes directory

  <?xml version="1.0" encoding="UTF-8"?>
  <configuration status="OFF">
    <appenders>
      <Console name="Console" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
      </Console>
    </appenders>
    <loggers>
      <root level="info">
        <appender-ref ref="Console"/>
      </root>
    </loggers>
  </configuration>

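6. Initializing the Java High Level REST Client

  RestHighLevelClient client = new RestHighLevelClient(
      RestClient.builder(
          new HttpHost("localhost", 9200, "http"),
          new HttpHost("localhost", 9201, "http")));

When given the addresses of several cluster nodes, the client load-balances its requests across that set of addresses.
When the client is no longer needed, remember to close it:

  client.close();

For the API and usage examples, see: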
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.3/java-rest-high-create-index.html
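The idea of spreading requests across the configured node addresses can be illustrated with a plain round-robin rotation. This is only a conceptual sketch under the assumption of a simple rotation; the class `RoundRobinSketch` is hypothetical, and it is not the client's actual implementation, which also has to deal with failed nodes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> nodes) {
        this.nodes = nodes;
    }

    // Returns the node for the next request, cycling through the list.
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int i = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(i);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(
                Arrays.asList("localhost:9200", "localhost:9201"));
        for (int n = 0; n < 4; n++) {
            System.out.println(rr.next());
        }
    }
}
```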

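IV. Java High Level REST Client usage examples

Preparation (the log4j2 Maven dependencies and the logging configuration file must be in place, otherwise the demos report errors on the console):
Before writing the examples, add the Java client whose version matches the ES server to the Maven project.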

  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.1</version>
  </dependency>
  <!-- Logging -->
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.11.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.11.1</version>
  </dependency>

The helper below creates the client used by all demos. It points at a single local node; pass more HttpHost entries to the builder to have requests load-balanced across several nodes:

  package com.es.demo;

  import org.apache.http.HttpHost;
  import org.elasticsearch.client.RestClient;
  import org.elasticsearch.client.RestHighLevelClient;

  public class InitClient {
      // Returns a new client; the caller is responsible for closing it.
      public static RestHighLevelClient getClient() {
          RestHighLevelClient client = new RestHighLevelClient(
                  RestClient.builder(
                          new HttpHost("localhost", 9200, "http")));
          return client;
      }
  }

1. Creating an index

  package com.es.demo;

  import org.elasticsearch.action.ActionListener; // needed only by the commented-out async variant
  import org.elasticsearch.action.admin.indices.alias.Alias;
  import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.settings.Settings;
  import org.elasticsearch.common.xcontent.XContentType;

  public class CreateIndexDemo {
      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient()) {
              // 1. Create the request with the index name
              CreateIndexRequest request = new CreateIndexRequest("book8");
              // 2. Index settings
              request.settings(Settings.builder()
                      .put("index.number_of_shards", 5)    // number of primary shards
                      .put("index.number_of_replicas", 2)  // number of replicas
                      .put("analysis.analyzer.default.tokenizer", "standard")
              );
              // 3. Index mapping
              request.mapping("_doc",
                      " {\n" +
                      " \"_doc\": {\n" +
                      " \"properties\": {\n" +
                      " \"message\": {\n" +
                      " \"type\": \"text\"\n" +
                      " }\n" +
                      " }\n" +
                      " }\n" +
                      " }",
                      XContentType.JSON);
              // 4. Index alias
              request.alias(new Alias("lab1"));
              // 5. Send the request
              // 5.1 Synchronously
              CreateIndexResponse response = client.indices().create(request);
              // Handle the response
              boolean acknowledged = response.isAcknowledged();
              boolean shardsAcknowledged = response.isShardsAcknowledged();
              System.out.println("Result ---------------");
              System.out.println("acknowledged:" + acknowledged);
              System.out.println("shardsAcknowledged:" + shardsAcknowledged);
              // 5.2 Asynchronously
              /* ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                  @Override
                  public void onResponse(CreateIndexResponse createIndexResponse) {
                      boolean acknowledged = createIndexResponse.isAcknowledged();
                      boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
                      System.out.println("Result ---------------");
                      System.out.println("acknowledged:" + acknowledged);
                      System.out.println("shardsAcknowledged:" + shardsAcknowledged);
                  }
                  @Override
                  public void onFailure(Exception e) {
                      e.printStackTrace();
                  }
              };
              client.indices().createAsync(request, listener);*/
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }

Result:

  Result ---------------
  acknowledged:true
  shardsAcknowledged:true

The official documentation gives four ways to supply the JSON:

  package com.es.demo;

  // Several imports are used only by the commented-out alternatives below.
  import org.elasticsearch.action.ActionListener;
  import org.elasticsearch.action.admin.indices.alias.Alias;
  import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.settings.Settings;
  import org.elasticsearch.common.xcontent.XContentBuilder;
  import org.elasticsearch.common.xcontent.XContentFactory;
  import org.elasticsearch.common.xcontent.XContentType;
  import java.util.HashMap;
  import java.util.Map;

  public class CreateIndexDemo {
      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient()) {
              // 1. Create the request with the index name
              CreateIndexRequest request = new CreateIndexRequest("book13");
              // 2. Index settings
              /*request.settings(Settings.builder().put("index.number_of_shards", 5)
                      .put("index.number_of_replicas", 2) // number of replicas
                      .put("analysis.analyzer.default.tokenizer", "standard")
              );*/
              // 3. Index mapping
              // 3.1 Option 1: pass a JSON string directly
              /* request.mapping("_doc",
                      " {\n" +
                      " \"_doc\": {\n" +
                      " \"properties\": {\n" +
                      " \"message\": {\n" +
                      " \"type\": \"text\"\n" +
                      " }\n" +
                      " }\n" +
                      " }\n" +
                      " }",
                      XContentType.JSON);*/
              // 3.2 Option 2: wrap the mapping in a Map
              /* Map<String, Object> jsonMap = new HashMap<>();
              Map<String, Object> message = new HashMap<>();
              message.put("type", "text");
              Map<String, Object> properties = new HashMap<>();
              properties.put("message", message);
              Map<String, Object> _doc = new HashMap<>();
              _doc.put("properties", properties);
              jsonMap.put("_doc", _doc);
              request.mapping("_doc", jsonMap);*/
              // 3.3 Option 3: use XContentBuilder
              /* XContentBuilder builder = XContentFactory.jsonBuilder();
              builder.startObject();
              {
                  builder.startObject("_doc");
                  {
                      builder.startObject("properties");
                      {
                          builder.startObject("message");
                          {
                              builder.field("type", "text");
                          }
                          builder.endObject();
                          builder.startObject("message1");
                          {
                              builder.field("type", "text");
                          }
                          builder.endObject();
                      }
                      builder.endObject();
                  }
                  builder.endObject();
              }
              builder.endObject();
              request.mapping("_doc", builder);*/
              // 3.4 Option 4: pass the whole index definition (settings, mappings, aliases) as one JSON source
              request.source("{\n" +
                      " \"settings\" : {\n" +
                      " \"number_of_shards\" : 1,\n" +
                      " \"number_of_replicas\" : 0\n" +
                      " },\n" +
                      " \"mappings\" : {\n" +
                      " \"_doc\" : {\n" +
                      " \"properties\" : {\n" +
                      " \"message\" : { \"type\" : \"text\" },\n" +
                      " \"message1\" : { \"type\" : \"text\" }\n" +
                      " }\n" +
                      " }\n" +
                      " },\n" +
                      " \"aliases\" : {\n" +
                      " \"lab2\" : {}\n" +
                      " }\n" +
                      "}", XContentType.JSON);
              // 4. Index alias (already part of the source in option 4)
              //request.alias(new Alias("lab1"));
              // 5. Send the request
              // 5.1 Synchronously
              CreateIndexResponse response = client.indices().create(request);
              // Handle the response
              boolean acknowledged = response.isAcknowledged();
              boolean shardsAcknowledged = response.isShardsAcknowledged();
              System.out.println("Result ---------------");
              System.out.println("acknowledged:" + acknowledged);
              System.out.println("shardsAcknowledged:" + shardsAcknowledged);
              // 5.2 Asynchronously
              /* ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                  @Override
                  public void onResponse(CreateIndexResponse createIndexResponse) {
                      boolean acknowledged = createIndexResponse.isAcknowledged();
                      boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
                      System.out.println("Result ---------------");
                      System.out.println("acknowledged:" + acknowledged);
                      System.out.println("shardsAcknowledged:" + shardsAcknowledged);
                  }
                  @Override
                  public void onFailure(Exception e) {
                      e.printStackTrace();
                  }
              };
              client.indices().createAsync(request, listener);*/
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }

For more usage, see the official Java index API documentation.
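The Map-based option and the JSON-string option describe the same structure. As a rough illustration, the sketch below serializes a nested Map into the JSON text it stands for. `MapToJsonSketch` is a hypothetical, deliberately minimal serializer (no string escaping, no arrays) written for this demonstration; it is not how the client converts maps internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MapToJsonSketch {
    // Serializes nested maps whose leaves are strings, numbers, or booleans.
    static String toJson(Object value) {
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append('"').append(e.getKey()).append("\":")
                  .append(toJson(e.getValue()));
            }
            return sb.append('}').toString();
        }
        if (value instanceof Number || value instanceof Boolean) {
            return String.valueOf(value);
        }
        // Strings are quoted; escaping is omitted because the demo values need none.
        return "\"" + value + "\"";
    }

    // Same structure as the Map-based mapping: _doc -> properties -> message -> type.
    static Map<String, Object> messageMapping() {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("type", "text");
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("message", message);
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("properties", properties);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("_doc", doc);
        return root;
    }

    public static void main(String[] args) {
        System.out.println(toJson(messageMapping()));
    }
}
```

`LinkedHashMap` is used here only to keep the key order stable in the printed text.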

2. Indexing a document

Indexing a document means putting document data into an index. It resembles inserting a row into a database table, where one row corresponds to one document.

  package com.es.demo;

  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.ElasticsearchException;
  import org.elasticsearch.action.DocWriteResponse;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.action.index.IndexResponse;
  import org.elasticsearch.action.support.replication.ReplicationResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.xcontent.XContentType;
  import org.elasticsearch.rest.RestStatus;

  // Note: the commented-out alternatives below need extra imports
  // (java.util.Map, java.util.HashMap, java.util.Date, XContentBuilder, XContentFactory,
  // TimeValue, DocWriteRequest, ActionListener) before they can be enabled.
  public class IndexDocumentDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient()) {
              // 1. Create the index request
              IndexRequest request = new IndexRequest(
                      "mess",  // index
                      "_doc",  // mapping type
                      "1");    // document id
              // 2. Prepare the document data
              // Option 1: pass a JSON string directly
              String jsonString = "{" +
                      "\"user\":\"kimchy\"," +
                      "\"postDate\":\"2013-01-30\"," +
                      "\"message\":\"trying out Elasticsearch\"" +
                      "}";
              request.source(jsonString, XContentType.JSON);
              // Option 2: represent the document as a Map
              /*
              Map<String, Object> jsonMap = new HashMap<>();
              jsonMap.put("user", "kimchy");
              jsonMap.put("postDate", new Date());
              jsonMap.put("message", "trying out Elasticsearch");
              request.source(jsonMap);
              */
              // Option 3: build the document with XContentBuilder
              /*
              XContentBuilder builder = XContentFactory.jsonBuilder();
              builder.startObject();
              {
                  builder.field("user", "kimchy");
                  builder.field("postDate", new Date());
                  builder.field("message", "trying out Elasticsearch");
              }
              builder.endObject();
              request.source(builder);
              */
              // Option 4: pass key-value pairs directly
              /*
              request.source("user", "kimchy",
                      "postDate", new Date(),
                      "message", "trying out Elasticsearch");
              */
              // 3. Other optional settings
              /*
              request.routing("routing");                      // routing value
              request.timeout(TimeValue.timeValueSeconds(1));  // how long to wait for the primary shard
              request.setRefreshPolicy("wait_for");            // refresh policy
              request.version(2);                              // version number
              request.opType(DocWriteRequest.OpType.CREATE);   // operation type
              */
              // 4. Send the request
              IndexResponse indexResponse = null;
              try {
                  // Synchronously
                  indexResponse = client.index(request);
              } catch (ElasticsearchException e) {
                  // Catch and handle the exception.
                  // Check for a version conflict, or a create against an existing document.
                  if (e.status() == RestStatus.CONFLICT) {
                      logger.error("Conflict; add conflict handling here!\n" + e.getDetailedMessage());
                  }
                  logger.error("Indexing failed", e);
              }
              // 5. Handle the response
              if (indexResponse != null) {
                  String index = indexResponse.getIndex();
                  String type = indexResponse.getType();
                  String id = indexResponse.getId();
                  long version = indexResponse.getVersion();
                  if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                      System.out.println("Document created!");
                  } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                      System.out.println("Document updated!");
                  }
                  // Shard-level information
                  ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                  if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                      // Fewer shards succeeded than the total; handle that case here
                  }
                  // If a replica shard failed, the failure reason is available
                  if (shardInfo.getFailed() > 0) {
                      for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                          String reason = failure.reason();
                          System.out.println("Replica failure reason: " + reason);
                      }
                  }
              }
              // Sending the index request asynchronously
              /*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                  @Override
                  public void onResponse(IndexResponse indexResponse) {
                  }
                  @Override
                  public void onFailure(Exception e) {
                  }
              };
              client.indexAsync(request, listener);
              */
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }

Result:

  Document created!

See the official API documentation for details.
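At the REST level, the outcomes handled in the demo surface as HTTP status codes: 201 for a newly created document, 200 when an existing document was overwritten, and 409 for a conflict. The sketch below maps those codes to labels; the class `IndexStatusSketch` and its label strings are hypothetical names used only for this illustration.

```java
class IndexStatusSketch {
    // Maps the HTTP status of an index request to a short outcome label.
    static String describe(int httpStatus) {
        switch (httpStatus) {
            case 201: return "created";   // the document did not exist before
            case 200: return "updated";   // an existing document was overwritten
            case 409: return "conflict";  // version conflict, or create on an existing id
            default:  return "other";
        }
    }

    public static void main(String[] args) {
        for (int status : new int[] {201, 200, 409, 500}) {
            System.out.println(status + " -> " + describe(status));
        }
    }
}
```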

3. Getting a document

  package com.es.demo;

  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.ElasticsearchException;
  import org.elasticsearch.action.get.GetRequest;
  import org.elasticsearch.action.get.GetResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.Strings;
  import org.elasticsearch.rest.RestStatus;
  import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
  import java.io.IOException;
  import java.util.Map;

  public class GetDocumentDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient()) {
              // 1. Create the get request
              GetRequest request = new GetRequest(
                      "book13",  // index
                      "_doc",    // mapping type
                      "1");      // document id
              // 2. Optional settings
              //request.routing("routing");
              //request.version(2);
              //request.fetchSourceContext(new FetchSourceContext(false)); // whether to fetch _source
              // Choose which fields to return
              String[] includes = new String[]{"message", "*Date", "user"};
              String[] excludes = Strings.EMPTY_ARRAY;
              FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
              request.fetchSourceContext(fetchSourceContext);
              // Fetch stored fields
              /*request.storedFields("message");
              GetResponse getResponse = client.get(request);
              String message = getResponse.getField("message").getValue();*/
              // 3. Send the request
              GetResponse getResponse = null;
              try {
                  // Synchronously
                  getResponse = client.get(request);
              } catch (ElasticsearchException e) {
                  // A missing document does not throw; NOT_FOUND here typically means the index is missing
                  if (e.status() == RestStatus.NOT_FOUND) {
                      logger.error("Not found (for example, the index does not exist)");
                  }
                  if (e.status() == RestStatus.CONFLICT) {
                      logger.error("Version conflict on get; add conflict handling here!");
                  }
                  logger.error("Failed to get the document", e);
              }
              // 4. Handle the response
              if (getResponse != null) {
                  String index = getResponse.getIndex();
                  String type = getResponse.getType();
                  String id = getResponse.getId();
                  if (getResponse.isExists()) { // the document exists
                      long version = getResponse.getVersion();
                      String sourceAsString = getResponse.getSourceAsString();        // source as a String
                      Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // source as a Map
                      byte[] sourceAsBytes = getResponse.getSourceAsBytes();          // source as a byte array
                      logger.info("index:" + index + " type:" + type + " id:" + id);
                      logger.info(sourceAsString);
                  } else {
                      logger.error("No document found with this id");
                  }
              }
              // Sending the get request asynchronously
              /*
              ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                  @Override
                  public void onResponse(GetResponse getResponse) {
                  }
                  @Override
                  public void onFailure(Exception e) {
                  }
              };
              client.getAsync(request, listener);
              */
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }

Result:

  20:15:40.943 [main] INFO - index:book13 type:_doc id:1
  20:15:40.943 [main] INFO - {"postDate":"2013-01-30","message":"trying out Elasticsearch","user":"kimchy"}
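The include patterns used in the demo ("message", "*Date", "user") select fields by name, with `*` acting as a wildcard. The sketch below imitates that selection on a plain Map to show which fields such patterns keep. `SourceFilterSketch` is a hypothetical simplification that handles top-level field names only; the server-side filtering is more capable.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class SourceFilterSketch {
    // Treats '*' as "any run of characters" and everything else literally.
    static boolean matches(String pattern, String field) {
        String regex = Pattern.quote(pattern).replace("*", "\\E.*\\Q");
        return field.matches(regex);
    }

    // Keeps only the entries whose key matches at least one include pattern.
    static Map<String, Object> include(Map<String, Object> source, String... includes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            for (String p : includes) {
                if (matches(p, e.getKey())) {
                    out.put(e.getKey(), e.getValue());
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");
        source.put("internalNote", "not selected");
        System.out.println(include(source, "message", "*Date", "user"));
    }
}
```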

4. Bulk

Bulk indexing puts many documents into an index in one request. It resembles inserting several rows into a database table at once, where one row corresponds to one document.
BulkDemo.java

  package com.es.demo;

  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.action.DocWriteRequest;
  import org.elasticsearch.action.DocWriteResponse;
  import org.elasticsearch.action.bulk.BulkItemResponse;
  import org.elasticsearch.action.bulk.BulkRequest;
  import org.elasticsearch.action.bulk.BulkResponse;
  import org.elasticsearch.action.delete.DeleteResponse;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.action.index.IndexResponse;
  import org.elasticsearch.action.update.UpdateResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.xcontent.XContentType;
  import java.io.IOException;
  import java.util.Date;

  public class BulkDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient()) {
              // 1. Create the bulk request and add operations
              BulkRequest request = new BulkRequest();
              request.add(new IndexRequest("book13", "_doc", "1")
                      .source(XContentType.JSON, "postDate", new Date()));
              request.add(new IndexRequest("book13", "_doc", "2")
                      .source(XContentType.JSON, "user", "liming"));
              request.add(new IndexRequest("book13", "_doc", "3")
                      .source(XContentType.JSON, "message", "add a doc"));
              /*
              request.add(new DeleteRequest("mess", "_doc", "3"));
              request.add(new UpdateRequest("mess", "_doc", "2")
                      .doc(XContentType.JSON, "other", "test"));
              request.add(new IndexRequest("mess", "_doc", "4")
                      .source(XContentType.JSON, "field", "baz"));
              */
              // 2. Optional settings
              /*
              request.timeout("2m");
              request.setRefreshPolicy("wait_for");
              request.waitForActiveShards(2);
              */
              // 3. Send the request
              // Synchronously
              BulkResponse bulkResponse = client.bulk(request);
              // 4. Handle the response
              if (bulkResponse != null) {
                  for (BulkItemResponse bulkItemResponse : bulkResponse) {
                      DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                      if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                              || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                          IndexResponse indexResponse = (IndexResponse) itemResponse;
                          // TODO handle a successful index operation
                          logger.info("Created successfully");
                      } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                          UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                          // TODO handle a successful update
                          logger.info("Updated successfully");
                      } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                          DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                          // TODO handle a successful delete
                          logger.info("Deleted successfully");
                      }
                  }
              }
              // Sending the bulk request asynchronously
              /*
              ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                  @Override
                  public void onResponse(BulkResponse bulkResponse) {
                  }
                  @Override
                  public void onFailure(Exception e) {
                  }
              };
              client.bulkAsync(request, listener);
              */
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }

Result:

  20:25:41.726 [main] INFO - Created successfully
  20:25:41.730 [main] INFO - Created successfully
  20:25:41.730 [main] INFO - Created successfully

Running the demo repeatedly still logs the "created" message, never the "updated" one. This is not a bug: the branch is chosen by getOpType(), which reflects the requested operation (INDEX), not its outcome. From the second run on, each item actually overwrites an existing document, which shows up as result=updated in the response; check getResult() to tell the two cases apart. The version below prints each response to make that visible:

  package com.es.demo;

  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.action.DocWriteRequest;
  import org.elasticsearch.action.DocWriteResponse;
  import org.elasticsearch.action.bulk.BulkItemResponse;
  import org.elasticsearch.action.bulk.BulkRequest;
  import org.elasticsearch.action.bulk.BulkResponse;
  import org.elasticsearch.action.delete.DeleteResponse;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.action.index.IndexResponse;
  import org.elasticsearch.action.update.UpdateResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.xcontent.XContentType;
  import java.io.IOException;
  import java.util.Date;

  public class BulkDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient()) {
              // 1. Create the bulk request and add operations
              BulkRequest request = new BulkRequest();
              request.add(new IndexRequest("book13", "_doc", "1")
                      .source(XContentType.JSON, "postDate", new Date()));
              request.add(new IndexRequest("book13", "_doc", "2")
                      .source(XContentType.JSON, "user", "liming"));
              request.add(new IndexRequest("book13", "_doc", "3")
                      .source(XContentType.JSON, "message", "add a doc"));
              /*
              request.add(new DeleteRequest("mess", "_doc", "3"));
              request.add(new UpdateRequest("mess", "_doc", "2")
                      .doc(XContentType.JSON, "other", "test"));
              request.add(new IndexRequest("mess", "_doc", "4")
                      .source(XContentType.JSON, "field", "baz"));
              */
              // 2. Optional settings
              /*
              request.timeout("2m");
              request.setRefreshPolicy("wait_for");
              request.waitForActiveShards(2);
              */
              // 3. Send the request
              // Synchronously
              BulkResponse bulkResponse = client.bulk(request);
              // 4. Handle the response
              if (bulkResponse != null) {
                  for (BulkItemResponse bulkItemResponse : bulkResponse) {
                      // A failed item has no response object, so report it and move on
                      if (bulkItemResponse.isFailed()) {
                          logger.error("Item failed: {}", bulkItemResponse.getFailureMessage());
                          continue;
                      }
                      DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                      if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                              || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                          IndexResponse indexResponse = (IndexResponse) itemResponse;
                          // TODO handle a successful index operation
                          logger.info("Created successfully,{}", indexResponse.toString());
                      } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                          UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                          // TODO handle a successful update
                          logger.info("Updated successfully,{}", updateResponse.toString());
                      } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                          DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                          // TODO handle a successful delete
                          logger.info("Deleted successfully,{}", deleteResponse.toString());
                      }
                  }
              }
              // Sending the bulk request asynchronously
              /*
              ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                  @Override
                  public void onResponse(BulkResponse bulkResponse) {
                  }
                  @Override
                  public void onFailure(Exception e) {
                  }
              };
              client.bulkAsync(request, listener);
              */
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }

Result:

  20:31:44.095 [main] INFO - Created successfully,IndexResponse[index=book13,type=_doc,id=1,version=6,result=updated,seqNo=13,primaryTerm=1,shards={"total":1,"successful":1,"failed":0}]
  20:31:44.099 [main] INFO - Created successfully,IndexResponse[index=book13,type=_doc,id=2,version=5,result=updated,seqNo=14,primaryTerm=1,shards={"total":1,"successful":1,"failed":0}]
  20:31:44.099 [main] INFO - Created successfully,IndexResponse[index=book13,type=_doc,id=3,version=5,result=updated,seqNo=15,primaryTerm=1,shards={"total":1,"successful":1,"failed":0}]

For comparison, the same data queried with Postman:

  {
    "took": 1,
    "timed_out": false,
    "_shards": {
      "total": 1,
      "successful": 1,
      "skipped": 0,
      "failed": 0
    },
    "hits": {
      "total": 3,
      "max_score": 1,
      "hits": [
        {
          "_index": "book13",
          "_type": "_doc",
          "_id": "1",
          "_score": 1,
          "_source": {
            "postDate": "2018-09-09T12:25:41.302Z"
          }
        },
        {
          "_index": "book13",
          "_type": "_doc",
          "_id": "2",
          "_score": 1,
          "_source": {
            "user": "liming"
          }
        },
        {
          "_index": "book13",
          "_type": "_doc",
          "_id": "3",
          "_score": 1,
          "_source": {
            "message": "add a doc"
          }
        }
      ]
    }
  }
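Over REST, a bulk request like the one in BulkDemo travels as newline-delimited JSON: each index operation is one metadata line followed by one source line, and every line ends with a newline. The sketch below assembles such a body by hand to show the shape; `BulkBodySketch` is a hypothetical helper for illustration and does no JSON escaping.

```java
class BulkBodySketch {
    // One index action: a metadata line followed by the document source line.
    static String indexAction(String index, String type, String id, String sourceJson) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n" + sourceJson + "\n";
    }

    public static void main(String[] args) {
        // Two of the documents from the demo, expressed as a raw bulk body.
        String body = indexAction("book13", "_doc", "2", "{\"user\":\"liming\"}")
                + indexAction("book13", "_doc", "3", "{\"message\":\"add a doc\"}");
        System.out.print(body);
    }
}
```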

5. search

搜索数据
SearchDemo.java

  1. package com.es.demo;
  2. import java.io.IOException;
  3. import java.util.Map;
  4. import java.util.concurrent.TimeUnit;
  5. import org.apache.logging.log4j.LogManager;
  6. import org.apache.logging.log4j.Logger;
  7. import org.elasticsearch.action.search.SearchRequest;
  8. import org.elasticsearch.action.search.SearchResponse;
  9. import org.elasticsearch.action.search.ShardSearchFailure;
  10. import org.elasticsearch.client.RestHighLevelClient;
  11. import org.elasticsearch.common.unit.TimeValue;
  12. import org.elasticsearch.index.query.QueryBuilders;
  13. import org.elasticsearch.rest.RestStatus;
  14. import org.elasticsearch.search.SearchHit;
  15. import org.elasticsearch.search.SearchHits;
  16. import org.elasticsearch.search.builder.SearchSourceBuilder;
  17. /**
  18. *
  19. * @Description: 搜索数据
  20. * @author lgs
  21. * @date 2018年6月23日
  22. *
  23. */
  24. public class SearchDemo {
  25. private static Logger logger = LogManager.getRootLogger();
  26. public static void main(String[] args) {
  27. try (RestHighLevelClient client = InitClient.getClient();) {
  28. // 1、创建search请求
  29. //SearchRequest searchRequest = new SearchRequest();
  30. SearchRequest searchRequest = new SearchRequest("book13");
  31. searchRequest.types("_doc");
  32. // 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
  33. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  34. //构造QueryBuilder
  35. /*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
  36. .fuzziness(Fuzziness.AUTO)
  37. .prefixLength(3)
  38. .maxExpansions(10);
  39. sourceBuilder.query(matchQueryBuilder);*/
  40. sourceBuilder.query(QueryBuilders.termQuery("user", "liming"));
  41. sourceBuilder.from(0);
  42. sourceBuilder.size(10);
  43. sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
  44. //是否返回_source字段
  45. //sourceBuilder.fetchSource(false);
  46. //设置返回哪些字段
  47. /*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
  48. String[] excludeFields = new String[] {"_type"};
  49. sourceBuilder.fetchSource(includeFields, excludeFields);*/
  50. //指定排序
  51. //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
  52. //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));
  53. // 设置返回 profile
  54. //sourceBuilder.profile(true);
  55. //将请求体加入到请求中
  56. searchRequest.source(sourceBuilder);
  57. // 可选的设置
  58. //searchRequest.routing("routing");
  59. // 高亮设置
  60. /*
  61. HighlightBuilder highlightBuilder = new HighlightBuilder();
  62. HighlightBuilder.Field highlightTitle =
  63. new HighlightBuilder.Field("title");
  64. highlightTitle.highlighterType("unified");
  65. highlightBuilder.field(highlightTitle);
  66. HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
  67. highlightBuilder.field(highlightUser);
  68. sourceBuilder.highlighter(highlightBuilder);*/
  69. //加入聚合
  70. /*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
  71. .field("company.keyword");
  72. aggregation.subAggregation(AggregationBuilders.avg("average_age")
  73. .field("age"));
  74. sourceBuilder.aggregation(aggregation);*/
  75. //做查询建议
  76. /*SuggestionBuilder termSuggestionBuilder =
  77. SuggestBuilders.termSuggestion("user").text("kmichy");
  78. SuggestBuilder suggestBuilder = new SuggestBuilder();
  79. suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
  80. sourceBuilder.suggest(suggestBuilder);*/
  81. //3、发送请求
  82. SearchResponse searchResponse = client.search(searchRequest);
  83. //4、处理响应
  84. //搜索结果状态信息
  85. RestStatus status = searchResponse.status();
  86. TimeValue took = searchResponse.getTook();
  87. Boolean terminatedEarly = searchResponse.isTerminatedEarly();
  88. boolean timedOut = searchResponse.isTimedOut();
  89. //分片搜索情况
  90. int totalShards = searchResponse.getTotalShards();
  91. int successfulShards = searchResponse.getSuccessfulShards();
  92. int failedShards = searchResponse.getFailedShards();
  93. for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
  94. // failures should be handled here
  95. }
  96. //处理搜索命中文档结果
  97. SearchHits hits = searchResponse.getHits();
  98. long totalHits = hits.getTotalHits();
  99. float maxScore = hits.getMaxScore();
  100. SearchHit[] searchHits = hits.getHits();
  101. for (SearchHit hit : searchHits) {
  102. // do something with the SearchHit
  103. String index = hit.getIndex();
  104. String type = hit.getType();
  105. String id = hit.getId();
  106. float score = hit.getScore();
  107. //取_source字段值
  108. String sourceAsString = hit.getSourceAsString(); //取成json串
  109. Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map对象
  110. //从map中取字段值
  111. /*
  112. String documentTitle = (String) sourceAsMap.get("title");
  113. List<Object> users = (List<Object>) sourceAsMap.get("user");
  114. Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
  115. */
  116. logger.info("index:" + index + " type:" + type + " id:" + id);
  117. logger.info(sourceAsString);
  118. //取高亮结果
  119. /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
  120. HighlightField highlight = highlightFields.get("title");
  121. Text[] fragments = highlight.fragments();
  122. String fragmentString = fragments[0].string();*/
  123. }
  124. // 获取聚合结果
  125. /*
  126. Aggregations aggregations = searchResponse.getAggregations();
  127. Terms byCompanyAggregation = aggregations.get("by_company");
  128. Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
  129. Avg averageAge = elasticBucket.getAggregations().get("average_age");
  130. double avg = averageAge.getValue();
  131. */
  132. // 获取建议结果
  133. /*Suggest suggest = searchResponse.getSuggest();
  134. TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
  135. for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
  136. for (TermSuggestion.Entry.Option option : entry) {
  137. String suggestText = option.getText().string();
  138. }
  139. }
  140. */
  141. // Send the search request asynchronously
  142. /*
  143. ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
  144. @Override
  145. public void onResponse(SearchResponse searchResponse) {
  146. // handle the result
  147. }
  148. @Override
  149. public void onFailure(Exception e) {
  150. // handle the failure
  151. }
  152. };
  153. client.searchAsync(searchRequest, listener);
  154. */
  155. } catch (IOException e) {
  156. logger.error(e);
  157. }
  158. }
  159. }

Result:
21:05:50.762 [main] INFO - index:book13 type:_doc id:2
21:05:50.766 [main] INFO - {"user":"liming"}

6. Highlight

HighlightDemo.java

  package com.es.demo;

  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.action.search.SearchRequest;
  import org.elasticsearch.action.search.SearchResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.common.text.Text;
  import org.elasticsearch.index.query.QueryBuilder;
  import org.elasticsearch.index.query.QueryBuilders;
  import org.elasticsearch.rest.RestStatus;
  import org.elasticsearch.search.SearchHit;
  import org.elasticsearch.search.SearchHits;
  import org.elasticsearch.search.builder.SearchSourceBuilder;
  import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
  import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
  import java.io.IOException;
  import java.util.Map;

  public class HighlightDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient();) {
              // 1. Create the search request
              SearchRequest searchRequest = new SearchRequest("book1");
              // 2. Build the request body with SearchSourceBuilder; its methods cover every kind of query
              SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
              // Build the query
              QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("name", "test");
              sourceBuilder.query(matchQueryBuilder);
              // Paging
              /*sourceBuilder.from(0);
              sourceBuilder.size(5);*/
              // Highlight settings
              HighlightBuilder highlightBuilder = new HighlightBuilder();
              highlightBuilder.requireFieldMatch(false).field("name").field("age")
                      .preTags("<strong>").postTags("</strong>");
              // Different fields can use different settings, e.g. different tags
              /*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
              highlightTitle.preTags("<strong>").postTags("</strong>");
              highlightBuilder.field(highlightTitle);
              HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
              highlightContent.preTags("<b>").postTags("</b>");
              highlightBuilder.field(highlightContent).requireFieldMatch(false);*/
              sourceBuilder.highlighter(highlightBuilder);
              searchRequest.source(sourceBuilder);
              // 3. Send the request
              SearchResponse searchResponse = client.search(searchRequest);
              // 4. Handle the response
              if (RestStatus.OK.equals(searchResponse.status())) {
                  // Process the matching documents
                  SearchHits hits = searchResponse.getHits();
                  long totalHits = hits.getTotalHits();
                  SearchHit[] searchHits = hits.getHits();
                  for (SearchHit hit : searchHits) {
                      String index = hit.getIndex();
                      String type = hit.getType();
                      String id = hit.getId();
                      float score = hit.getScore();
                      // Read the _source field
                      //String sourceAsString = hit.getSourceAsString(); // as a JSON string
                      Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map
                      // Read individual fields from the map
                      /*String title = (String) sourceAsMap.get("title");
                      String content = (String) sourceAsMap.get("content"); */
                      logger.info("index:" + index + " type:" + type + " id:" + id);
                      logger.info("sourceMap : " + sourceAsMap);
                      // Read the highlight results
                      Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                      HighlightField highlight = highlightFields.get("name");
                      if (highlight != null) {
                          Text[] fragments = highlight.fragments(); // a multi-valued field yields several fragments
                          if (fragments != null && fragments.length > 0) {
                              String fragmentString = fragments[0].string();
                              // The log label says "title", but the highlighted field is "name"
                              logger.info("title highlight : " + fragmentString);
                              // The highlighted string could replace the same field in sourceAsMap before returning to the caller
                              //sourceAsMap.put("name", fragmentString);
                          }
                      }
                      highlight = highlightFields.get("age");
                      if (highlight != null) {
                          Text[] fragments = highlight.fragments(); // a multi-valued field yields several fragments
                          if (fragments != null && fragments.length > 0) {
                              String fragmentString = fragments[0].string();
                              // The log label says "content", but the highlighted field is "age"
                              logger.info("content highlight : " + fragmentString);
                              // The highlighted string could replace the same field in sourceAsMap before returning to the caller
                              //sourceAsMap.put("age", fragmentString);
                          }
                      }
                  }
              }
          } catch (IOException e) {
              logger.error(e);
          }
      }
  }

Result:

  21:13:29.702 [main] INFO - index:book1 type:english id:5oVDQ2UBRzBxBrDgtIl0
  21:13:29.706 [main] INFO - sourceMap : {name=test goog my money, addr=中国, class=dsfdsf, age=12}
  21:13:29.706 [main] INFO - title highlight : <strong>test</strong> goog my money
  21:13:29.706 [main] INFO - index:book1 type:english id:6IUkUmUBRzBxBrDgFok2
  21:13:29.710 [main] INFO - sourceMap : {name=test goog my money, addr=中国, class=dsfdsf, age=[14, 54, 45, 34]}
  21:13:29.710 [main] INFO - title highlight : <strong>test</strong> goog my money
  21:13:29.710 [main] INFO - index:book1 type:english id:32
  21:13:29.710 [main] INFO - sourceMap : {name=test, age=1}
  21:13:29.710 [main] INFO - title highlight : <strong>test</strong>
  21:13:29.710 [main] INFO - index:book1 type:english id:33
  21:13:29.710 [main] INFO - sourceMap : {name=test, age=1}
  21:13:29.710 [main] INFO - title highlight : <strong>test</strong>
  21:13:29.710 [main] INFO - index:book1 type:english id:54UiUmUBRzBxBrDgfIl9
  21:13:29.710 [main] INFO - sourceMap : {name=test goog my money, addr=中国, class=dsfdsf, age=[11, 13, 14]}
  21:13:29.710 [main] INFO - title highlight : <strong>test</strong> goog my money
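
The comment in HighlightDemo mentions replacing a field in sourceAsMap with its highlighted string before handing the hit back to the caller. Here is a minimal sketch of that step using only plain Java collections. The `highlights` map stands in for the fragments read from `hit.getHighlightFields()`, and `applyHighlights` is a hypothetical helper name chosen for illustration, not part of the ES client.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HighlightMergeSketch {

    // Returns a copy of source in which every field that has at least one
    // highlight fragment is replaced by its first fragment.
    static Map<String, Object> applyHighlights(Map<String, Object> source,
                                               Map<String, List<String>> highlights) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        for (Map.Entry<String, List<String>> e : highlights.entrySet()) {
            List<String> fragments = e.getValue();
            // Guard against null or empty fragment lists before reading index 0.
            if (fragments != null && !fragments.isEmpty() && merged.containsKey(e.getKey())) {
                merged.put(e.getKey(), fragments.get(0));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("name", "test goog my money");
        source.put("age", 12);

        Map<String, List<String>> highlights = new LinkedHashMap<>();
        highlights.put("name", Arrays.asList("<strong>test</strong> goog my money"));

        System.out.println(applyHighlights(source, highlights));
    }
}
```

The helper copies the map instead of mutating it, so the original _source values stay available if the caller needs both forms.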

7. Suggest (query suggestions)

SuggestDemo.java (this demo covers only term spelling correction and prefix auto-completion)

  1. package com.es.demo;
  2. import java.io.IOException;
  3. import org.apache.logging.log4j.LogManager;
  4. import org.apache.logging.log4j.Logger;
  5. import org.elasticsearch.action.search.SearchRequest;
  6. import org.elasticsearch.action.search.SearchResponse;
  7. import org.elasticsearch.client.RestHighLevelClient;
  8. import org.elasticsearch.rest.RestStatus;
  9. import org.elasticsearch.search.builder.SearchSourceBuilder;
  10. import org.elasticsearch.search.suggest.Suggest;
  11. import org.elasticsearch.search.suggest.SuggestBuilder;
  12. import org.elasticsearch.search.suggest.SuggestBuilders;
  13. import org.elasticsearch.search.suggest.SuggestionBuilder;
  14. import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
  15. import org.elasticsearch.search.suggest.term.TermSuggestion;
  16. /**
  17. *
  18. * @Description: query suggestions
  19. * @author lgs
  20. * @date 2018-06-23
  21. *
  22. */
  23. public class SuggestDemo {
  24. private static Logger logger = LogManager.getRootLogger();
  25. // Term suggester (spell check): detects misspelled input and suggests the correct term, e.g. appel -> apple
  26. public static void termSuggest() {
  27. try (RestHighLevelClient client = InitClient.getClient();) {
  28. // 1. Create the search request
  29. //SearchRequest searchRequest = new SearchRequest();
  30. SearchRequest searchRequest = new SearchRequest("book1");
  31. // 2. Build the request body with SearchSourceBuilder; its methods cover every kind of query
  32. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  33. sourceBuilder.size(0);
  34. // Add the suggestion
  35. // Term suggestion
  36. SuggestionBuilder termSuggestionBuilder =
  37. SuggestBuilders.termSuggestion("name").text("text");
  38. SuggestBuilder suggestBuilder = new SuggestBuilder();
  39. suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
  40. sourceBuilder.suggest(suggestBuilder);
  41. searchRequest.source(sourceBuilder);
  42. // 3. Send the request
  43. SearchResponse searchResponse = client.search(searchRequest);
  44. // 4. Handle the response
  45. // Status of the search response
  46. if(RestStatus.OK.equals(searchResponse.status())) {
  47. // Read the suggest results
  48. Suggest suggest = searchResponse.getSuggest();
  49. TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
  50. for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
  51. logger.info("text: " + entry.getText().string());
  52. for (TermSuggestion.Entry.Option option : entry) {
  53. String suggestText = option.getText().string();
  54. logger.info(" suggest option : " + suggestText);
  55. }
  56. }
  57. }
  58. /*
  59. "suggest": {
  60. "my-suggestion": [
  61. {
  62. "text": "tring",
  63. "offset": 0,
  64. "length": 5,
  65. "options": [
  66. {
  67. "text": "trying",
  68. "score": 0.8,
  69. "freq": 1
  70. }
  71. ]
  72. },
  73. {
  74. "text": "out",
  75. "offset": 6,
  76. "length": 3,
  77. "options": []
  78. },
  79. {
  80. "text": "elasticsearch",
  81. "offset": 10,
  82. "length": 13,
  83. "options": []
  84. }
  85. ]
  86. }*/
  87. } catch (IOException e) {
  88. logger.error(e);
  89. }
  90. }
  91. // Completion suggester: proposes likely words or phrases from what the user has typed so far
  92. public static void completionSuggester() {
  93. try (RestHighLevelClient client = InitClient.getClient();) {
  94. // 1. Create the search request
  95. //SearchRequest searchRequest = new SearchRequest();
  96. SearchRequest searchRequest = new SearchRequest("book5");
  97. // 2. Build the request body with SearchSourceBuilder; its methods cover every kind of query
  98. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  99. sourceBuilder.size(0);
  100. // Add the suggestion
  101. // Auto-completion
  102. /*POST music/_search?pretty
  103. {
  104. "suggest": {
  105. "song-suggest" : {
  106. "prefix" : "lucene s",
  107. "completion" : {
  108. "field" : "suggest" ,
  109. "skip_duplicates": true
  110. }
  111. }
  112. }
  113. }*/
  114. SuggestionBuilder termSuggestionBuilder =
  115. SuggestBuilders.completionSuggestion("suggest").prefix("tes")
  116. .skipDuplicates(true);
  117. SuggestBuilder suggestBuilder = new SuggestBuilder();
  118. suggestBuilder.addSuggestion("song-suggest", termSuggestionBuilder);
  119. sourceBuilder.suggest(suggestBuilder);
  120. searchRequest.source(sourceBuilder);
  121. // 3. Send the request
  122. SearchResponse searchResponse = client.search(searchRequest);
  123. // 4. Handle the response
  124. // Status of the search response
  125. if(RestStatus.OK.equals(searchResponse.status())) {
  126. // Read the suggest results
  127. Suggest suggest = searchResponse.getSuggest();
  128. CompletionSuggestion termSuggestion = suggest.getSuggestion("song-suggest");
  129. for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) {
  130. logger.info("text: " + entry.getText().string());
  131. for (CompletionSuggestion.Entry.Option option : entry) {
  132. String suggestText = option.getText().string();
  133. logger.info(" suggest option : " + suggestText);
  134. }
  135. }
  136. }
  137. } catch (IOException e) {
  138. logger.error(e);
  139. }
  140. }
  141. public static void main(String[] args) {
  142. termSuggest();
  143. logger.info("--------------------------------------");
  144. completionSuggester();
  145. }
  146. }

Result:

  21:24:40.416 [main] INFO - text: text
  21:24:40.420 [main] INFO - suggest option : test
  21:24:40.420 [main] INFO - suggest option : term
  21:24:40.420 [main] INFO - --------------------------------------
  21:24:40.624 [main] INFO - text: tes
  21:24:40.624 [main] INFO - suggest option : test english
  21:24:40.624 [main] INFO - suggest option : test my book1
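
The term suggester proposes candidates by edit distance from the input term, which is why `text` yields `test` and `term` in the output above. As a rough illustration only, the sketch below computes plain Levenshtein distance. Elasticsearch's own string distance and ranking are more involved, and `EditDistanceSketch` is a hypothetical helper, not part of the client.

```java
class EditDistanceSketch {

    // Classic dynamic-programming Levenshtein distance: the minimum number of
    // single-character insertions, deletions, and substitutions turning a into b.
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // distance from the empty prefix of a
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // distance to the empty prefix of b
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    public static void main(String[] args) {
        String input = "text";
        for (String candidate : new String[] {"test", "term", "elasticsearch"}) {
            System.out.println(input + " -> " + candidate + " : " + levenshtein(input, candidate));
        }
    }
}
```

In this sketch `test` is one edit away from `text` and `term` is two, so both fall within the suggester's default `max_edits` of 2, while a distant word such as `elasticsearch` does not.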

8. Aggregation

AggregationDemo.java

  package com.es.demo;

  import java.io.IOException;
  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.action.search.SearchRequest;
  import org.elasticsearch.action.search.SearchResponse;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.rest.RestStatus;
  import org.elasticsearch.search.aggregations.AggregationBuilders;
  import org.elasticsearch.search.aggregations.Aggregations;
  import org.elasticsearch.search.aggregations.BucketOrder;
  import org.elasticsearch.search.aggregations.bucket.terms.Terms;
  import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
  import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
  import org.elasticsearch.search.aggregations.metrics.avg.Avg;
  import org.elasticsearch.search.builder.SearchSourceBuilder;

  public class AggregationDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          try (RestHighLevelClient client = InitClient.getClient();) {
              // 1. Create the search request
              //SearchRequest searchRequest = new SearchRequest();
              SearchRequest searchRequest = new SearchRequest("book1");
              // 2. Build the request body with SearchSourceBuilder; its methods cover every kind of query
              SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
              sourceBuilder.size(0);
              // Add the aggregation
              // Terms aggregation: group by the values of the age field,
              // ordered ascending by the average_balance sub-aggregation
              TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                      .field("age").order(BucketOrder.aggregation("average_balance", true));
              // Sub-aggregation: average per bucket (named average_balance, but computed on the age field here)
              aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                      .field("age"));
              sourceBuilder.aggregation(aggregation);
              searchRequest.source(sourceBuilder);
              // 3. Send the request
              SearchResponse searchResponse = client.search(searchRequest);
              // 4. Handle the response
              // Status of the search response
              if (RestStatus.OK.equals(searchResponse.status())) {
                  // Read the aggregation results
                  Aggregations aggregations = searchResponse.getAggregations();
                  Terms byAgeAggregation = aggregations.get("by_age");
                  logger.info("aggregation by_age 结果");
                  logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                  logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                  logger.info("------------------------------------");
                  for (Bucket buck : byAgeAggregation.getBuckets()) {
                      logger.info("key: " + buck.getKeyAsNumber());
                      logger.info("docCount: " + buck.getDocCount());
                      logger.info("docCountError: " + buck.getDocCountError());
                      // Read the sub-aggregation
                      Avg averageBalance = buck.getAggregations().get("average_balance");
                      logger.info("average_balance: " + averageBalance.getValue());
                      logger.info("------------------------------------");
                  }
                  // Look up a single bucket directly by key
                  /*Bucket bucket = byAgeAggregation.getBucketByKey("24");
                  Avg averageBalance = bucket.getAggregations().get("average_balance");
                  double avg = averageBalance.getValue();*/
              }
          } catch (IOException e) {
              logger.error(e);
          }
      }
  }

Result:

  22:58:24.681 [main] INFO - aggregation by_age 结果
  22:58:24.685 [main] INFO - docCountError: 0
  22:58:24.685 [main] INFO - sumOfOtherDocCounts: 1
  22:58:24.685 [main] INFO - ------------------------------------
  22:58:24.685 [main] INFO - key: 1
  22:58:24.685 [main] INFO - docCount: 11
  22:58:24.685 [main] INFO - docCountError: 0
  22:58:24.685 [main] INFO - average_balance: 1.0
  22:58:24.685 [main] INFO - ------------------------------------
  22:58:24.685 [main] INFO - key: 12
  22:58:24.685 [main] INFO - docCount: 16
  22:58:24.685 [main] INFO - docCountError: 0
  22:58:24.685 [main] INFO - average_balance: 12.0
  22:58:24.685 [main] INFO - ------------------------------------
  22:58:24.685 [main] INFO - key: 11
  22:58:24.685 [main] INFO - docCount: 1
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 12.666666666666666
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 13
  22:58:24.689 [main] INFO - docCount: 2
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 12.75
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 16
  22:58:24.689 [main] INFO - docCount: 1
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 16.0
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 21
  22:58:24.689 [main] INFO - docCount: 1
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 21.0
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 14
  22:58:24.689 [main] INFO - docCount: 2
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 26.428571428571427
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 33
  22:58:24.689 [main] INFO - docCount: 1
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 33.0
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 34
  22:58:24.689 [main] INFO - docCount: 1
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 36.75
  22:58:24.689 [main] INFO - ------------------------------------
  22:58:24.689 [main] INFO - key: 45
  22:58:24.689 [main] INFO - docCount: 1
  22:58:24.689 [main] INFO - docCountError: 0
  22:58:24.689 [main] INFO - average_balance: 36.75
  22:58:24.689 [main] INFO - ------------------------------------
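
Some averages in this output do not equal their bucket key, for example key 11 with an average of about 12.67. A likely explanation is that `age` is multi-valued in some documents (the highlight output earlier shows `age=[11, 13, 14]`). Such a document lands in one bucket per value, and the `avg` sub-aggregation averages every `age` value of the documents in that bucket. The in-memory sketch below reproduces the effect with three made-up documents. `TermsAvgSketch` and its sample data are assumptions for illustration and do not call Elasticsearch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class TermsAvgSketch {

    // One bucket of the simulated terms aggregation.
    static final class Bucket {
        final int key;
        final long docCount;
        final double avg;

        Bucket(int key, long docCount, double avg) {
            this.key = key;
            this.docCount = docCount;
            this.avg = avg;
        }
    }

    // Groups documents by each distinct value of their (possibly multi-valued)
    // age field, averages ALL age values of the documents in each bucket, and
    // sorts the buckets by that average in ascending order.
    static List<Bucket> termsWithAvg(List<List<Integer>> docs) {
        Map<Integer, List<List<Integer>>> docsByKey = new TreeMap<>();
        for (List<Integer> ages : docs) {
            // A document is counted once per distinct value it holds.
            for (Integer age : new TreeSet<>(ages)) {
                docsByKey.computeIfAbsent(age, k -> new ArrayList<>()).add(ages);
            }
        }
        List<Bucket> buckets = new ArrayList<>();
        for (Map.Entry<Integer, List<List<Integer>>> e : docsByKey.entrySet()) {
            long sum = 0;
            long count = 0;
            for (List<Integer> ages : e.getValue()) {
                for (int age : ages) {
                    sum += age;
                    count++;
                }
            }
            buckets.add(new Bucket(e.getKey(), e.getValue().size(), (double) sum / count));
        }
        buckets.sort(Comparator.comparingDouble((Bucket b) -> b.avg));
        return buckets;
    }

    public static void main(String[] args) {
        List<List<Integer>> docs = Arrays.asList(
                Arrays.asList(11, 13, 14),      // hypothetical multi-valued document
                Arrays.asList(13),              // hypothetical single-valued document
                Arrays.asList(14, 54, 45, 34)); // hypothetical multi-valued document
        for (Bucket b : termsWithAvg(docs)) {
            System.out.println("key: " + b.key + " docCount: " + b.docCount + " avg: " + b.avg);
        }
    }
}
```

With these three documents, the bucket for key 11 holds one document whose values average to 38/3, which matches the pattern in the real output.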

9. Official documentation

QueryBuilder for each query type:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html
AggregationBuilder for each aggregation type:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html

10. Source code

https://github.com/Star-Lordxing/ES-java-client-api

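V. Java Client

1. About the Java Client

The Java client uses TransportClient, and every operation is asynchronous by nature (you can pass a listener or work with the returned Future).
Note: the ES roadmap deprecates TransportClient starting with version 7.0 and removes it entirely in version 8.0, replacing it with the High Level REST Client.
Most operation APIs in the High Level REST Client are the same as in the Java client; the main difference is the connection setup, i.e. the InitClient code.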
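
The two asynchronous styles mentioned above (blocking on a returned future, or registering a listener) can be tried out without a cluster. The sketch below uses only `java.util.concurrent.CompletableFuture` as a stand-in. `fakeIndex` is a hypothetical method that simulates a request, and nothing here is a TransportClient API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class AsyncStylesSketch {

    // Hypothetical stand-in for an asynchronous client call.
    static CompletableFuture<String> fakeIndex(String id) {
        return CompletableFuture.supplyAsync(() -> "indexed:" + id);
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Style 1: take the future and block until the response arrives,
        // comparable to client.index(request).get() in the demos.
        String response = fakeIndex("11").get();
        System.out.println(response);

        // Style 2: register a callback that receives either the response or
        // the failure, comparable to an ActionListener's onResponse / onFailure.
        CompletableFuture<String> pending = fakeIndex("12").whenComplete((r, e) -> {
            if (e != null) {
                System.err.println("failed: " + e.getMessage());
            } else {
                System.out.println(r);
            }
        });
        pending.get(); // only so the demo does not exit before the callback runs
    }
}
```

The blocking style is simpler for short demos, while the callback style keeps the calling thread free.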

2. Official learning resources

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

3. Compatibility

Use a client version that matches the version of the ES server.

4. Java Client Maven integration

  <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>transport</artifactId>
      <version>6.3.1</version>
  </dependency>
  <!-- logging -->
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.11.1</version>
  </dependency>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.11.1</version>
  </dependency>

5. Put log4j2.xml on the classpath (compiled into the classes directory)

  <?xml version="1.0" encoding="UTF-8"?>
  <configuration status="OFF">
      <appenders>
          <Console name="Console" target="SYSTEM_OUT">
              <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
          </Console>
      </appenders>
      <loggers>
          <root level="info">
              <appender-ref ref="Console"/>
          </root>
      </loggers>
  </configuration>

VI. Transport API usage examples

1. Create the connection: InitClient.java

  package com.es.demo;

  import org.elasticsearch.client.transport.TransportClient;
  import org.elasticsearch.common.settings.Settings;
  import org.elasticsearch.common.transport.TransportAddress;
  import org.elasticsearch.transport.client.PreBuiltTransportClient;
  import java.net.InetAddress;
  import java.net.UnknownHostException;

  public class InitClient {
      private static TransportClient client;

      public static TransportClient getClient() throws UnknownHostException {
          if (client == null) {
              //client = new PreBuiltTransportClient(Settings.EMPTY)
              // Settings for connecting to the cluster
              Settings settings = Settings.builder()
                      .put("cluster.name", "my-application") // required if the cluster name is not the default "elasticsearch"
                      .put("client.transport.sniff", false) // automatic sniffing, disabled here
                      .build();
              client = new PreBuiltTransportClient(settings)
                      //.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
                      .addTransportAddress(new TransportAddress(InetAddress.getByName("start.com"), 9300));
              // Available connection settings
              /*
              cluster.name
                  The cluster name; must be set if it is not the default "elasticsearch".
              client.transport.sniff
                  Set to true to sniff the whole cluster and add its nodes to the connection list automatically.
              client.transport.ignore_cluster_name
                  Set to true to ignore cluster name validation of connected nodes. (since 0.19.4)
              client.transport.ping_timeout
                  The time to wait for a ping response from a node. Defaults to 5s.
              client.transport.nodes_sampler_interval
                  How often to sample / ping the nodes listed and connected. Defaults to 5s.
              */
          }
          return client;
      }
  }

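client.transport.sniff is set to false here (automatic sniffing off); on my local single-node setup, setting it to true caused an error. Note that the demos below call InitDemo.getClient() and live in package com.study.es_java_client, so rename this class or the call sites consistently before compiling.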

2. Create index

CreateIndexDemo.java

  package com.study.es_java_client;

  import java.io.IOException;
  import java.util.concurrent.ExecutionException;
  import org.elasticsearch.action.admin.indices.alias.Alias;
  import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  import org.elasticsearch.client.transport.TransportClient;
  import org.elasticsearch.common.settings.Settings;
  import org.elasticsearch.common.xcontent.XContentType;

  public class CreateIndexDemo {
      public static void main(String[] args) {
          // This part differs from the REST client
          try (TransportClient client = InitDemo.getClient();) {
              // 1. Create the create-index request
              CreateIndexRequest request = new CreateIndexRequest("mess");
              // 2. Set the index settings
              request.settings(Settings.builder().put("index.number_of_shards", 3) // number of shards
                      .put("index.number_of_replicas", 2) // number of replicas
                      .put("analysis.analyzer.default.tokenizer", "ik_smart") // default tokenizer
              );
              // 3. Set the index mappings
              request.mapping("_doc",
                      " {\n" +
                      " \"_doc\": {\n" +
                      " \"properties\": {\n" +
                      " \"message\": {\n" +
                      " \"type\": \"text\"\n" +
                      " }\n" +
                      " }\n" +
                      " }\n" +
                      " }",
                      XContentType.JSON);
              // 4. Set an alias for the index
              request.alias(new Alias("mmm"));
              // 5. Send the request; this part differs from the REST client
              CreateIndexResponse createIndexResponse = client.admin().indices()
                      .create(request).get();
              // 6. Handle the response
              boolean acknowledged = createIndexResponse.isAcknowledged();
              boolean shardsAcknowledged = createIndexResponse
                      .isShardsAcknowledged();
              System.out.println("acknowledged = " + acknowledged);
              System.out.println("shardsAcknowledged = " + shardsAcknowledged);
              // Sending the request with a listener instead
              /*ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                  @Override
                  public void onResponse(
                          CreateIndexResponse createIndexResponse) {
                      // 6. Handle the response
                      boolean acknowledged = createIndexResponse.isAcknowledged();
                      boolean shardsAcknowledged = createIndexResponse
                              .isShardsAcknowledged();
                      System.out.println("acknowledged = " + acknowledged);
                      System.out.println(
                              "shardsAcknowledged = " + shardsAcknowledged);
                  }
                  @Override
                  public void onFailure(Exception e) {
                      System.out.println("Failed to create the index: " + e.getMessage());
                  }
              };
              client.admin().indices().create(request, listener);
              */
          } catch (IOException | InterruptedException | ExecutionException e) {
              e.printStackTrace();
          }
      }
  }

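3. Index document

Indexing a document means putting document data into an index, much like inserting a row into a database table: one row corresponds to one document.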
IndexDocumentDemo.java

  1. package com.study.es_java_client;
  2. import java.io.IOException;
  3. import java.util.concurrent.ExecutionException;
  4. import org.apache.logging.log4j.LogManager;
  5. import org.apache.logging.log4j.Logger;
  6. import org.elasticsearch.ElasticsearchException;
  7. import org.elasticsearch.action.DocWriteResponse;
  8. import org.elasticsearch.action.index.IndexRequest;
  9. import org.elasticsearch.action.index.IndexResponse;
  10. import org.elasticsearch.action.support.replication.ReplicationResponse;
  11. import org.elasticsearch.client.transport.TransportClient;
  12. import org.elasticsearch.common.xcontent.XContentType;
  13. import org.elasticsearch.rest.RestStatus;
  14. public class IndexDocumentDemo {
  15. private static Logger logger = LogManager.getRootLogger();
  16. public static void main(String[] args) {
  17. // This part differs from the REST client
  18. try (TransportClient client = InitDemo.getClient();) {
  19. // 1. Create the index request
  20. IndexRequest request = new IndexRequest(
  21. "mess", // index
  22. "_doc", // mapping type
  23. "11"); // document id
  24. // 2. Prepare the document data
  25. // Option 1: pass a JSON string directly
  26. String jsonString = "{" +
  27. "\"user\":\"kimchy\"," +
  28. "\"postDate\":\"2013-01-30\"," +
  29. "\"message\":\"trying out Elasticsearch\"" +
  30. "}";
  31. request.source(jsonString, XContentType.JSON);
  32. // Option 2: represent the document as a Map
  33. /*
  34. Map<String, Object> jsonMap = new HashMap<>();
  35. jsonMap.put("user", "kimchy");
  36. jsonMap.put("postDate", new Date());
  37. jsonMap.put("message", "trying out Elasticsearch");
  38. request.source(jsonMap);
  39. */
  40. // Option 3: build the document with XContentBuilder
  41. /*
  42. XContentBuilder builder = XContentFactory.jsonBuilder();
  43. builder.startObject();
  44. {
  45. builder.field("user", "kimchy");
  46. builder.field("postDate", new Date());
  47. builder.field("message", "trying out Elasticsearch");
  48. }
  49. builder.endObject();
  50. request.source(builder);
  51. */
  52. // Option 4: pass key-value pairs directly
  53. /*
  54. request.source("user", "kimchy",
  55. "postDate", new Date(),
  56. "message", "trying out Elasticsearch");
  57. */
  58. // 3. Other optional settings
  59. /*
  60. request.routing("routing"); // routing value
  61. request.timeout(TimeValue.timeValueSeconds(1)); // how long to wait for the primary shard
  62. request.setRefreshPolicy("wait_for"); // refresh policy
  63. request.version(2); // version number
  64. request.opType(DocWriteRequest.OpType.CREATE); // operation type
  65. */
  66. // 4. Send the request
  67. IndexResponse indexResponse = null;
  68. try {
  69. // Option 1: client.index returns an ActionFuture<IndexResponse>; call get() to obtain the response
  70. indexResponse = client.index(request).get();
  71. // Option 2: client.prepareIndex creates the IndexRequest internally
  72. /*IndexResponse indexResponse = client.prepareIndex("mess","_doc","11")
  73. .setSource(jsonString, XContentType.JSON)
  74. .get();*/
  75. // Option 3: request + listener
  76. //client.index(request, listener);
  77. } catch(ElasticsearchException e) {
  78. // Catch and handle the exception
  79. // Check for a version conflict, or a create on a document that already exists
  80. if (e.status() == RestStatus.CONFLICT) {
  81. logger.error("Conflict; put your conflict handling logic here!\n" + e.getDetailedMessage());
  82. }
  83. logger.error("Indexing failed", e);
  84. }catch (InterruptedException | ExecutionException e) {
  85. logger.error("Indexing failed", e);
  86. }
  87. // 5. Handle the response
  88. if(indexResponse != null) {
  89. String index = indexResponse.getIndex();
  90. String type = indexResponse.getType();
  91. String id = indexResponse.getId();
  92. long version = indexResponse.getVersion();
  93. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
  94. System.out.println("Document created; put your handling logic here.");
  95. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  96. System.out.println("Document updated; put your handling logic here.");
  97. }
  98. // Shard-level information
  99. ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
  100. if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // not every shard copy succeeded; handle that case here
  101. }
  102. // If any shard copy failed, the failure reasons are available
  103. if (shardInfo.getFailed() > 0) {
  104. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  105. String reason = failure.reason();
  106. System.out.println("Shard copy failure reason: " + reason);
  107. }
  108. }
  109. }
  110. // Listener style
  111. /*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
  112. @Override
  113. public void onResponse(IndexResponse indexResponse) {
  114. }
  115. @Override
  116. public void onFailure(Exception e) {
  117. }
  118. };
  119. client.index(request, listener);
  120. */
  121. } catch (IOException e) {
  122. e.printStackTrace();
  123. }
  124. }
  125. }

4. Get document

Retrieve a document.
GetDocumentDemo.java

  package com.study.es_java_client;

  import java.io.IOException;
  import java.util.Map;
  import java.util.concurrent.ExecutionException;
  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.elasticsearch.ElasticsearchException;
  import org.elasticsearch.action.get.GetRequest;
  import org.elasticsearch.action.get.GetResponse;
  import org.elasticsearch.client.transport.TransportClient;
  import org.elasticsearch.common.Strings;
  import org.elasticsearch.rest.RestStatus;
  import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

  public class GetDocumentDemo {
      private static Logger logger = LogManager.getRootLogger();

      public static void main(String[] args) {
          // This part differs from the REST client
          try (TransportClient client = InitDemo.getClient();) {
              // 1. Create the get request
              GetRequest request = new GetRequest(
                      "mess", // index
                      "_doc", // mapping type
                      "11"); // document id
              // 2. Optional settings
              //request.routing("routing");
              //request.version(2);
              //request.fetchSourceContext(new FetchSourceContext(false)); // whether to fetch the _source field
              // Choose which fields to return
              String[] includes = new String[]{"message", "*Date"};
              String[] excludes = Strings.EMPTY_ARRAY;
              FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
              request.fetchSourceContext(fetchSourceContext);
              // This form works as well
              /*String[] includes = Strings.EMPTY_ARRAY;
              String[] excludes = new String[]{"message"};
              FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
              request.fetchSourceContext(fetchSourceContext);*/
              // Read stored fields
              /*request.storedFields("message");
              GetResponse getResponse = client.get(request).get();
              String message = getResponse.getField("message").getValue();*/
              // 3. Send the request
              GetResponse getResponse = null;
              try {
                  getResponse = client.get(request).get();
              } catch (ElasticsearchException e) {
                  if (e.status() == RestStatus.NOT_FOUND) {
                      logger.error("No document found with this id");
                  }
                  if (e.status() == RestStatus.CONFLICT) {
                      logger.error("Version conflict on get; put your conflict handling logic here!");
                  }
                  logger.error("Failed to get the document", e);
              } catch (InterruptedException | ExecutionException e) {
                  logger.error("Failed to get the document", e);
              }
              // 4. Handle the response
              if (getResponse != null) {
                  String index = getResponse.getIndex();
                  String type = getResponse.getType();
                  String id = getResponse.getId();
                  if (getResponse.isExists()) { // the document exists
                      long version = getResponse.getVersion();
                      String sourceAsString = getResponse.getSourceAsString(); // as a String
                      Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // as a Map
                      byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // as a byte array
                      logger.info("index:" + index + " type:" + type + " id:" + id);
                      logger.info(sourceAsString);
                  } else {
                      logger.error("No document found with this id");
                  }
              }
              // Send the get request asynchronously
              /*
              ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                  @Override
                  public void onResponse(GetResponse getResponse) {
                  }
                  @Override
                  public void onFailure(Exception e) {
                  }
              };
              client.get(request, listener);
              */
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }
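
The `includes` and `excludes` arrays passed to `FetchSourceContext` accept wildcard patterns such as `*Date`. As a rough illustration of what that filtering amounts to, here is a stdlib-only sketch over a flat map. `filterSource` and `matches` are hypothetical helpers. The sketch handles only `*` on top-level keys and is not how Elasticsearch implements source filtering.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class SourceFilterSketch {

    // True if key matches a pattern in which '*' stands for any run of characters.
    static boolean matches(String pattern, String key) {
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i])); // literal text between wildcards
        }
        return Pattern.matches(regex.toString(), key);
    }

    static boolean matchesAny(String[] patterns, String key) {
        for (String p : patterns) {
            if (matches(p, key)) {
                return true;
            }
        }
        return false;
    }

    // Keeps a key when it matches an include pattern (or no includes are given)
    // and matches no exclude pattern.
    static Map<String, Object> filterSource(Map<String, Object> source,
                                            String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            boolean included = includes.length == 0 || matchesAny(includes, e.getKey());
            if (included && !matchesAny(excludes, e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");

        System.out.println(filterSource(source, new String[] {"message", "*Date"}, new String[0]));
    }
}
```

With the document indexed earlier, the include list `{"message", "*Date"}` keeps `message` and `postDate` and drops `user`.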

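5. Bulk

Bulk indexing puts many documents into an index in one request, much like batch-inserting multiple rows into a database table: one row corresponds to one document.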
BulkDemo.java

    package com.study.es_java_client;

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.DocWriteRequest;
    import org.elasticsearch.action.DocWriteResponse;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentType;

    public class BulkDemo {

        private static Logger logger = LogManager.getRootLogger();

        public static void main(String[] args) {
            // Unlike the REST client examples, this demo uses the TransportClient
            try (TransportClient client = InitDemo.getClient()) {
                // 1. Create the bulk request
                BulkRequest request = new BulkRequest();
                request.add(new IndexRequest("mess", "_doc", "1")
                        .source(XContentType.JSON, "field", "foo"));
                request.add(new IndexRequest("mess", "_doc", "2")
                        .source(XContentType.JSON, "field", "bar"));
                request.add(new IndexRequest("mess", "_doc", "3")
                        .source(XContentType.JSON, "field", "baz"));
                // Other operation types can be mixed in
                // (requires importing DeleteRequest and UpdateRequest)
                /*
                request.add(new DeleteRequest("mess", "_doc", "3"));
                request.add(new UpdateRequest("mess", "_doc", "2")
                        .doc(XContentType.JSON, "other", "test"));
                request.add(new IndexRequest("mess", "_doc", "4")
                        .source(XContentType.JSON, "field", "baz"));
                */

                // 2. Optional settings
                /*
                request.timeout("2m");
                request.setRefreshPolicy("wait_for");
                request.waitForActiveShards(2);
                */

                // 3. Send the request (synchronously)
                BulkResponse bulkResponse = client.bulk(request).get();

                // 4. Process the response
                if (bulkResponse != null) {
                    for (BulkItemResponse bulkItemResponse : bulkResponse) {
                        if (bulkItemResponse.isFailed()) {
                            // getResponse() is null for a failed item, so handle failures first
                            logger.error("Bulk item failed: " + bulkItemResponse.getFailureMessage());
                            continue;
                        }
                        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                        if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                                || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                            IndexResponse indexResponse = (IndexResponse) itemResponse;
                            // TODO handle a successful index/create
                        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                            // TODO handle a successful update
                        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                            // TODO handle a successful delete
                        }
                    }
                }

                // Send the bulk request asynchronously
                // (TransportClient takes the listener as a second argument; requires importing ActionListener)
                /*
                ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                    @Override
                    public void onResponse(BulkResponse bulkResponse) {
                    }
                    @Override
                    public void onFailure(Exception e) {
                    }
                };
                client.bulk(request, listener);
                */
            } catch (IOException | InterruptedException | ExecutionException e) {
                logger.error("Bulk request failed", e);
            }
        }
    }
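
For comparison with the REST API mentioned earlier, the same three index operations can be expressed as a `_bulk` request body, which is newline-delimited JSON: one action line followed by one source line per operation, with a trailing newline. The following is a minimal sketch using only the JDK; the class name `BulkBodySketch` and its methods are hypothetical, and the string escaping is deliberately simplistic (a real application would use a JSON library).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BulkBodySketch {

    /** Escapes backslashes and double quotes; enough for this illustration only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds a _bulk body that indexes one single-field document per map entry (id -> value). */
    static String buildBody(String index, String type, String field, Map<String, String> docsById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> doc : docsById.entrySet()) {
            // Action line: which operation, and on which index/type/id
            body.append("{\"index\":{\"_index\":").append(quote(index))
                .append(",\"_type\":").append(quote(type))
                .append(",\"_id\":").append(quote(doc.getKey())).append("}}\n");
            // Source line: the document itself
            body.append("{").append(quote(field)).append(":")
                .append(quote(doc.getValue())).append("}\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "foo");
        docs.put("2", "bar");
        docs.put("3", "baz");
        System.out.print(buildBody("mess", "_doc", "field", docs));
    }
}
```

Each document occupies exactly two lines in the body, which is why the format must not be pretty-printed.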

6. Search

Searching for data.
SearchDemo.java

    package com.study.es_java_client;

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.ShardSearchFailure;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public class SearchDemo {

        private static Logger logger = LogManager.getRootLogger();

        public static void main(String[] args) {
            try (TransportClient client = InitDemo.getClient()) {
                // 1. Create the search request
                //SearchRequest searchRequest = new SearchRequest();
                SearchRequest searchRequest = new SearchRequest("bank");
                searchRequest.types("_doc");

                // 2. Build the request body with SearchSourceBuilder.
                //    Its methods cover all the ways of constructing a query.
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

                // Build the QueryBuilder
                /*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                        .fuzziness(Fuzziness.AUTO)
                        .prefixLength(3)
                        .maxExpansions(10);
                sourceBuilder.query(matchQueryBuilder);*/
                sourceBuilder.query(QueryBuilders.termQuery("age", 24));
                sourceBuilder.from(0);
                sourceBuilder.size(10);
                sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

                // Whether to return the _source field
                //sourceBuilder.fetchSource(false);

                // Select which fields to return
                /*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
                String[] excludeFields = new String[] {"_type"};
                sourceBuilder.fetchSource(includeFields, excludeFields);*/

                // Sorting
                //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
                //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));

                // Return profiling information
                //sourceBuilder.profile(true);

                // Attach the body to the request
                searchRequest.source(sourceBuilder);

                // Optional settings
                //searchRequest.routing("routing");

                // Highlighting
                /*
                HighlightBuilder highlightBuilder = new HighlightBuilder();
                HighlightBuilder.Field highlightTitle =
                        new HighlightBuilder.Field("title");
                highlightTitle.highlighterType("unified");
                highlightBuilder.field(highlightTitle);
                HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
                highlightBuilder.field(highlightUser);
                sourceBuilder.highlighter(highlightBuilder);*/

                // Aggregations
                /*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
                        .field("company.keyword");
                aggregation.subAggregation(AggregationBuilders.avg("average_age")
                        .field("age"));
                sourceBuilder.aggregation(aggregation);*/

                // Query suggestions
                /*SuggestionBuilder termSuggestionBuilder =
                        SuggestBuilders.termSuggestion("user").text("kmichy");
                SuggestBuilder suggestBuilder = new SuggestBuilder();
                suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
                sourceBuilder.suggest(suggestBuilder);*/

                // 3. Send the request
                SearchResponse searchResponse = client.search(searchRequest).get();

                // 4. Process the response
                // Overall status of the search
                RestStatus status = searchResponse.status();
                TimeValue took = searchResponse.getTook();
                Boolean terminatedEarly = searchResponse.isTerminatedEarly();
                boolean timedOut = searchResponse.isTimedOut();

                // Per-shard results
                int totalShards = searchResponse.getTotalShards();
                int successfulShards = searchResponse.getSuccessfulShards();
                int failedShards = searchResponse.getFailedShards();
                for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                    // failures should be handled here
                }

                // Process the matching documents
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();
                float maxScore = hits.getMaxScore();
                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {
                    // do something with the SearchHit
                    String index = hit.getIndex();
                    String type = hit.getType();
                    String id = hit.getId();
                    float score = hit.getScore();

                    // Read the _source field
                    String sourceAsString = hit.getSourceAsString(); // as a JSON string
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map

                    // Read individual fields from the map
                    /*
                    String documentTitle = (String) sourceAsMap.get("title");
                    List<Object> users = (List<Object>) sourceAsMap.get("user");
                    Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                    */
                    logger.info("index:" + index + " type:" + type + " id:" + id);
                    logger.info(sourceAsString);

                    // Read the highlight results
                    /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                    HighlightField highlight = highlightFields.get("title");
                    Text[] fragments = highlight.fragments();
                    String fragmentString = fragments[0].string();*/
                }

                // Read the aggregation results
                /*
                Aggregations aggregations = searchResponse.getAggregations();
                Terms byCompanyAggregation = aggregations.get("by_company");
                Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
                Avg averageAge = elasticBucket.getAggregations().get("average_age");
                double avg = averageAge.getValue();
                */

                // Read the suggestion results
                /*Suggest suggest = searchResponse.getSuggest();
                TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
                for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                    for (TermSuggestion.Entry.Option option : entry) {
                        String suggestText = option.getText().string();
                    }
                }
                */

                // Send the search request asynchronously
                // (TransportClient takes the listener as a second argument)
                /*
                ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                    @Override
                    public void onResponse(SearchResponse searchResponse) {
                        // handle the result
                    }
                    @Override
                    public void onFailure(Exception e) {
                        // handle the failure
                    }
                };
                client.search(searchRequest, listener);
                */
            } catch (IOException | InterruptedException | ExecutionException e) {
                logger.error("Search request failed", e);
            }
        }
    }
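
The `from` and `size` calls in the demo above control paging. As a small sketch of how a page number could be translated into a `from` offset, the helper below also guards against the default `index.max_result_window` limit of 10000, beyond which Elasticsearch rejects `from + size` requests. The class `PagingSketch` and the method `fromFor` are hypothetical names; only the arithmetic is the point.

```java
final class PagingSketch {

    /** Default value of the index.max_result_window setting. */
    static final int MAX_RESULT_WINDOW = 10_000;

    /** Returns the `from` offset for a 1-based page number and a page size. */
    static int fromFor(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size;
        if (from + size > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size exceeds the result window: " + (from + size));
        }
        return (int) from;
    }

    public static void main(String[] args) {
        // Page 3 with 10 hits per page starts at offset 20
        System.out.println(fromFor(3, 10));
    }
}
```

The returned value would be passed to `sourceBuilder.from(...)` together with `sourceBuilder.size(size)`.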

7. Highlight

HighlightDemo.java

    package com.study.es_java_client;

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.text.Text;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
    import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;

    public class HighlightDemo {

        private static Logger logger = LogManager.getRootLogger();

        public static void main(String[] args) {
            try (TransportClient client = InitDemo.getClient()) {
                // 1. Create the search request
                SearchRequest searchRequest = new SearchRequest("hl_test");

                // 2. Build the request body with SearchSourceBuilder.
                //    Its methods cover all the ways of constructing a query.
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

                // Build the QueryBuilder
                QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "lucene solr");
                sourceBuilder.query(matchQueryBuilder);

                // Paging
                /*sourceBuilder.from(0);
                sourceBuilder.size(5);*/

                // Highlighting
                HighlightBuilder highlightBuilder = new HighlightBuilder();
                highlightBuilder.requireFieldMatch(false).field("title").field("content")
                        .preTags("<strong>").postTags("</strong>");

                // Each field can have its own settings, such as different tags
                /*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
                highlightTitle.preTags("<strong>").postTags("</strong>");
                highlightBuilder.field(highlightTitle);
                HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
                highlightContent.preTags("<b>").postTags("</b>");
                highlightBuilder.field(highlightContent).requireFieldMatch(false);*/
                sourceBuilder.highlighter(highlightBuilder);
                searchRequest.source(sourceBuilder);

                // 3. Send the request
                SearchResponse searchResponse = client.search(searchRequest).get();

                // 4. Process the response
                if (RestStatus.OK.equals(searchResponse.status())) {
                    // Process the matching documents
                    SearchHits hits = searchResponse.getHits();
                    long totalHits = hits.getTotalHits();
                    SearchHit[] searchHits = hits.getHits();
                    for (SearchHit hit : searchHits) {
                        String index = hit.getIndex();
                        String type = hit.getType();
                        String id = hit.getId();
                        float score = hit.getScore();

                        // Read the _source field
                        //String sourceAsString = hit.getSourceAsString(); // as a JSON string
                        Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map

                        // Read individual fields from the map
                        /*String title = (String) sourceAsMap.get("title");
                        String content = (String) sourceAsMap.get("content"); */
                        logger.info("index:" + index + " type:" + type + " id:" + id);
                        logger.info("sourceMap : " + sourceAsMap);

                        // Read the highlight results
                        Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                        HighlightField highlight = highlightFields.get("title");
                        if (highlight != null) {
                            Text[] fragments = highlight.fragments(); // multi-valued fields yield several fragments
                            if (fragments != null && fragments.length > 0) {
                                String fragmentString = fragments[0].string();
                                logger.info("title highlight : " + fragmentString);
                                // The highlighted string can replace the field in sourceAsMap
                                // before the map is returned to the caller
                                //sourceAsMap.put("title", fragmentString);
                            }
                        }
                        highlight = highlightFields.get("content");
                        if (highlight != null) {
                            Text[] fragments = highlight.fragments(); // multi-valued fields yield several fragments
                            if (fragments != null && fragments.length > 0) {
                                String fragmentString = fragments[0].string();
                                logger.info("content highlight : " + fragmentString);
                                // The highlighted string can replace the field in sourceAsMap
                                // before the map is returned to the caller
                                //sourceAsMap.put("content", fragmentString);
                            }
                        }
                    }
                }
            } catch (IOException | InterruptedException | ExecutionException e) {
                logger.error("Highlight search failed", e);
            }
        }
    }
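
The commented-out `sourceAsMap.put(...)` lines in the demo hint at a common follow-up step: returning the document with its highlighted fragments substituted for the original field values. A minimal sketch of that merge, using plain `Map` and `List` values in place of `SearchHit` and `HighlightField` so that it runs on its own, might look like the following. The class name, the method name, and the `" ... "` separator used to join multiple fragments are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HighlightMergeSketch {

    /**
     * Returns a copy of the source map in which every field that has at least one
     * highlight fragment is replaced by its fragments joined with " ... ".
     * The input map is left untouched.
     */
    static Map<String, Object> withHighlights(Map<String, Object> source,
                                              Map<String, List<String>> highlights) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        for (Map.Entry<String, List<String>> entry : highlights.entrySet()) {
            List<String> fragments = entry.getValue();
            if (fragments != null && !fragments.isEmpty()) {
                merged.put(entry.getKey(), String.join(" ... ", fragments));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "lucene solr intro");
        source.put("content", "plain text");

        Map<String, List<String>> highlights = new LinkedHashMap<>();
        highlights.put("title", Arrays.asList("<strong>lucene</strong> <strong>solr</strong> intro"));

        System.out.println(withHighlights(source, highlights));
    }
}
```

Working on a copy keeps the original `_source` available, which can matter when the unhighlighted value is still needed elsewhere.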

8. Suggest (query suggestions)

SuggestDemo.java

    package com.study.es_java_client;

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.suggest.Suggest;
    import org.elasticsearch.search.suggest.SuggestBuilder;
    import org.elasticsearch.search.suggest.SuggestBuilders;
    import org.elasticsearch.search.suggest.SuggestionBuilder;
    import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
    import org.elasticsearch.search.suggest.term.TermSuggestion;

    public class SuggestDemo {

        private static Logger logger = LogManager.getRootLogger();

        // Spell checking (term suggester)
        public static void termSuggest(TransportClient client) {
            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("mess");

            // 2. Build the request body with SearchSourceBuilder.
            //    Its methods cover all the ways of constructing a query.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(0);

            // Query suggestion: term suggester
            SuggestionBuilder<?> termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy");
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);
            searchRequest.source(sourceBuilder);

            try {
                // 3. Send the request
                SearchResponse searchResponse = client.search(searchRequest).get();

                // 4. Process the response
                // Overall status of the search
                if (RestStatus.OK.equals(searchResponse.status())) {
                    // Read the suggestion results
                    Suggest suggest = searchResponse.getSuggest();
                    TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
                    for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                        logger.info("text: " + entry.getText().string());
                        for (TermSuggestion.Entry.Option option : entry) {
                            String suggestText = option.getText().string();
                            logger.info(" suggest option : " + suggestText);
                        }
                    }
                }
            } catch (InterruptedException | ExecutionException e) {
                logger.error("Term suggest request failed", e);
            }

            // Sample shape of a term suggester response:
            /*
            "suggest": {
              "my-suggestion": [
                {
                  "text": "tring",
                  "offset": 0,
                  "length": 5,
                  "options": [
                    {
                      "text": "trying",
                      "score": 0.8,
                      "freq": 1
                    }
                  ]
                },
                {
                  "text": "out",
                  "offset": 6,
                  "length": 3,
                  "options": []
                },
                {
                  "text": "elasticsearch",
                  "offset": 10,
                  "length": 13,
                  "options": []
                }
              ]
            }*/
        }

        // Auto-completion (completion suggester)
        public static void completionSuggester(TransportClient client) {
            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("music");

            // 2. Build the request body with SearchSourceBuilder.
            //    Its methods cover all the ways of constructing a query.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(0);

            // Query suggestion: auto-completion. Equivalent REST request:
            /*POST music/_search?pretty
            {
              "suggest": {
                "song-suggest" : {
                  "prefix" : "lucene s",
                  "completion" : {
                    "field" : "suggest" ,
                    "skip_duplicates": true
                  }
                }
              }
            }*/
            SuggestionBuilder<?> completionSuggestionBuilder =
                    SuggestBuilders.completionSuggestion("suggest").prefix("lucene s")
                            .skipDuplicates(true);
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("song-suggest", completionSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);
            searchRequest.source(sourceBuilder);

            try {
                // 3. Send the request
                SearchResponse searchResponse = client.search(searchRequest).get();

                // 4. Process the response
                // Overall status of the search
                if (RestStatus.OK.equals(searchResponse.status())) {
                    // Read the suggestion results
                    Suggest suggest = searchResponse.getSuggest();
                    CompletionSuggestion completionSuggestion = suggest.getSuggestion("song-suggest");
                    for (CompletionSuggestion.Entry entry : completionSuggestion.getEntries()) {
                        logger.info("text: " + entry.getText().string());
                        for (CompletionSuggestion.Entry.Option option : entry) {
                            String suggestText = option.getText().string();
                            logger.info(" suggest option : " + suggestText);
                        }
                    }
                }
            } catch (InterruptedException | ExecutionException e) {
                logger.error("Completion suggest request failed", e);
            }
        }

        public static void main(String[] args) {
            try (TransportClient client = InitDemo.getClient()) {
                termSuggest(client);
                logger.info("--------------------------------------");
                completionSuggester(client);
            } catch (IOException e) {
                logger.error("Failed to initialize the client", e);
            }
        }
    }
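
The term suggester proposes indexed terms that lie within a small edit distance of the input, which is why a misspelling such as "kmichy" can yield "kimchy". To make the notion of edit distance concrete, here is a minimal sketch of the classic Levenshtein distance in plain Java. It is only an illustration of the idea: the actual suggester relies on Lucene's own string-distance implementations, and the class name below is hypothetical.

```java
final class EditDistanceSketch {

    /** Levenshtein distance: minimum number of single-character insertions, deletions, substitutions. */
    static int distance(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        // Turning the empty prefix of a into the first j characters of b takes j insertions
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j;
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // deleting the first i characters of a
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    public static void main(String[] args) {
        System.out.println(distance("kmichy", "kimchy")); // two substitutions
        System.out.println(distance("tring", "trying"));  // one insertion
    }
}
```

Under plain Levenshtein distance the swapped letters in "kmichy" count as two edits; variants that treat a transposition as a single edit would score it as one.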

9. Aggregation

AggregationDemo.java

    package com.study.es_java_client;

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.Aggregations;
    import org.elasticsearch.search.aggregations.BucketOrder;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.avg.Avg;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public class AggregationDemo {

        private static Logger logger = LogManager.getRootLogger();

        public static void main(String[] args) {
            try (TransportClient client = InitDemo.getClient()) {
                // 1. Create the search request
                //SearchRequest searchRequest = new SearchRequest();
                SearchRequest searchRequest = new SearchRequest("bank");

                // 2. Build the request body with SearchSourceBuilder.
                //    Its methods cover all the ways of constructing a query.
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.size(0);

                // Add the aggregation: group by the values of the age field,
                // ordered by the average balance of each group (ascending)
                TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                        .field("age").order(BucketOrder.aggregation("average_balance", true));
                // Compute the average balance of each group
                aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                        .field("balance"));
                sourceBuilder.aggregation(aggregation);
                searchRequest.source(sourceBuilder);

                // 3. Send the request
                SearchResponse searchResponse = client.search(searchRequest).get();

                // 4. Process the response
                // Overall status of the search
                if (RestStatus.OK.equals(searchResponse.status())) {
                    // Read the aggregation results
                    Aggregations aggregations = searchResponse.getAggregations();
                    Terms byAgeAggregation = aggregations.get("by_age");
                    logger.info("aggregation by_age result");
                    logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                    logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                    logger.info("------------------------------------");
                    for (Bucket buck : byAgeAggregation.getBuckets()) {
                        logger.info("key: " + buck.getKeyAsNumber());
                        logger.info("docCount: " + buck.getDocCount());
                        //logger.info("docCountError: " + buck.getDocCountError());
                        // Read the sub-aggregation
                        Avg averageBalance = buck.getAggregations().get("average_balance");
                        logger.info("average_balance: " + averageBalance.getValue());
                        logger.info("------------------------------------");
                    }
                    // Fetch a single bucket directly by its key
                    /*Bucket bucket24 = byAgeAggregation.getBucketByKey("24");
                    Avg averageBalance24 = bucket24.getAggregations().get("average_balance");
                    double avg = averageBalance24.getValue();*/
                }
            } catch (IOException | InterruptedException | ExecutionException e) {
                logger.error("Aggregation request failed", e);
            }
        }
    }
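
To make clear what the `by_age` terms aggregation with its `average_balance` sub-aggregation computes, the same grouping can be sketched over an in-memory list with Java streams: group accounts by age, average the balance per group, and sort the groups by that average in ascending order. The `Account` class and the sample values are hypothetical; Elasticsearch performs the equivalent work across shards and, by default, returns only the top buckets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TermsAvgSketch {

    /** Hypothetical document with the two fields used by the aggregation. */
    static final class Account {
        final int age;
        final double balance;

        Account(int age, double balance) {
            this.age = age;
            this.balance = balance;
        }
    }

    /** Groups by age, averages the balance per group, and orders groups by that average (ascending). */
    static List<Map.Entry<Integer, Double>> averageBalanceByAge(List<Account> accounts) {
        Map<Integer, Double> avgByAge = accounts.stream()
                .collect(Collectors.groupingBy(a -> a.age,
                        Collectors.averagingDouble(a -> a.balance)));
        List<Map.Entry<Integer, Double>> buckets = new ArrayList<>(avgByAge.entrySet());
        buckets.sort(Map.Entry.comparingByValue());
        return buckets;
    }

    public static void main(String[] args) {
        List<Account> accounts = Arrays.asList(
                new Account(24, 100.0), new Account(24, 300.0),
                new Account(30, 50.0), new Account(40, 1000.0));
        for (Map.Entry<Integer, Double> bucket : averageBalanceByAge(accounts)) {
            System.out.println("key: " + bucket.getKey() + " average_balance: " + bucket.getValue());
        }
    }
}
```

Each list entry plays the role of one `Bucket` in the demo: the key corresponds to `getKeyAsNumber()` and the value to the `Avg` sub-aggregation.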

10. Official documentation

Document API (document operations):
https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs.html
Search API:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-search.html

11. Source code

https://github.com/Star-Lordxing/ES-java-client-api

VII. Integrating with Spring

1. Spring integration: see the official documentation

Official documentation:
https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/
Code repository:
https://github.com/spring-projects/spring-data-elasticsearch

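2. Spring Boot integration

Reference blog post: https://blog.csdn.net/yejingtao703/article/details/78414874
At the time of writing, the latest Spring integration package only supported ES 5.5, so the native client packages provided by ES are recommended instead.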

References
Elasticsearch API
https://www.cnblogs.com/leeSmall/p/9218779.html