JavaSpringBootElasticsearch

一、简介

实际的项目开发过程中,通常基于某些主流框架平台进行技术开发,比如 SpringBoot,以 SpringBoot 整合 ElasticSearch 为例,这里详细的介绍 ElasticSearch 的使用!
SpringBoot 连接 ElasticSearch,主流的方式有以下四种方式

  • 方式一:通过Elastic Transport Client客户端连接 es 服务器,底层基于 TCP 协议通过 transport 模块和远程 ES 服务端通信,不过,从 V7.0 开始官方不建议使用,V8.0开始正式移除。
  • 方式二:通过Elastic Java Low Level Rest Client客户端连接 es 服务器,底层基于 HTTP 协议通过 restful API 来和远程 ES 服务端通信,只提供了最简单最基本的 API,类似于上篇文章中给大家介绍的 API 操作逻辑
  • 方式三:通过Elastic Java High Level Rest Client客户端连接 es 服务器,底层基于Elastic Java Low Level Rest Client客户端做了一层封装,提供了更高级得 API 且和Elastic Transport Client接口及参数保持一致,官方推荐的 es 客户端。
  • 方式四:通过JestClient客户端连接 es 服务器,这是开源社区基于 HTTP 协议开发的一款 es 客户端,官方宣称接口及代码设计比 ES 官方提供的 Rest 客户端更简洁、更合理,更好用,具有一定的 ES 服务端版本兼容性,但是更新速度不是很快,目前 ES 版本已经出到 V7.9,但是JestClient只支持 V1.0~V6.X 版 本的 ES。

还有一个需要大家注意的地方,那就是版本号的兼容!
在开发过程中,大家尤其需要关注一下客户端和服务端的版本号,要尽可能保持一致,比如服务端 es 的版本号是6.8.2,那么连接 es 的客户端版本号,最好也是6.8.2,即使因项目的原因不能保持一致,客户端的版本号必须在6.0.0 ~6.8.2,不要超过服务器的版本号,这样客户端才能保持正常工作,否则会出现很多意想不到的问题,假如客户端是7.0.4的版本号,此时的程序会各种报错,甚至没办法用!
为什么要这样做呢?主要原因就是 es 的服务端,高版本不兼容低版本;es6 和 es7 的某些 API 请求参数结构有着很大的区别,所以客户端和服务端版本号尽量保持一致。

二、代码实践

本文采用的SpringBoot版本号是2.1.0.RELEASE,服务端 es 的版本号是6.8.2,客户端采用的是官方推荐的Elastic Java High Level Rest Client版本号是6.4.2,方便与SpringBoot的版本兼容。

2.1、导入依赖

  1. <!--elasticsearch-->
  2. <dependency>
  3. <groupId>org.elasticsearch</groupId>
  4. <artifactId>elasticsearch</artifactId>
  5. <version>6.4.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.elasticsearch.client</groupId>
  9. <artifactId>elasticsearch-rest-client</artifactId>
  10. <version>6.4.2</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.elasticsearch.client</groupId>
  14. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  15. <version>6.4.2</version>
  16. </dependency>

2.2、配置环境变量

在application.properties全局配置文件中,配置elasticsearch自定义环境变量

  1. elasticsearch.scheme=http
  2. elasticsearch.address=127.0.0.1:9200
  3. elasticsearch.userName=
  4. elasticsearch.userPwd=
  5. elasticsearch.socketTimeout=5000
  6. elasticsearch.connectTimeout=5000
  7. elasticsearch.connectionRequestTimeout=5000

2.3、创建 elasticsearch 的 config 类

  1. @Configuration
  2. public class ElasticsearchConfiguration {
  3. private static final Logger log = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
  4. private static final int ADDRESS_LENGTH = 2;
  5. @Value("${elasticsearch.scheme:http}")
  6. private String scheme;
  7. @Value("${elasticsearch.address}")
  8. private String address;
  9. @Value("${elasticsearch.userName}")
  10. private String userName;
  11. @Value("${elasticsearch.userPwd}")
  12. private String userPwd;
  13. @Value("${elasticsearch.socketTimeout:5000}")
  14. private Integer socketTimeout;
  15. @Value("${elasticsearch.connectTimeout:5000}")
  16. private Integer connectTimeout;
  17. @Value("${elasticsearch.connectionRequestTimeout:5000}")
  18. private Integer connectionRequestTimeout;
  19. /**
  20. * 初始化客户端
  21. * @return
  22. */
  23. @Bean(name = "restHighLevelClient")
  24. public RestHighLevelClient restClientBuilder() {
  25. HttpHost[] hosts = Arrays.stream(address.split(","))
  26. .map(this::buildHttpHost)
  27. .filter(Objects::nonNull)
  28. .toArray(HttpHost[]::new);
  29. RestClientBuilder restClientBuilder = RestClient.builder(hosts);
  30. // 异步参数配置
  31. restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
  32. httpClientBuilder.setDefaultCredentialsProvider(buildCredentialsProvider());
  33. return httpClientBuilder;
  34. });
  35. // 异步连接延时配置
  36. restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
  37. requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);
  38. requestConfigBuilder.setSocketTimeout(socketTimeout);
  39. requestConfigBuilder.setConnectTimeout(connectTimeout);
  40. return requestConfigBuilder;
  41. });
  42. return new RestHighLevelClient(restClientBuilder);
  43. }
  44. /**
  45. * 根据配置创建HttpHost
  46. * @param s
  47. * @return
  48. */
  49. private HttpHost buildHttpHost(String s) {
  50. String[] address = s.split(":");
  51. if (address.length == ADDRESS_LENGTH) {
  52. String ip = address[0];
  53. int port = Integer.parseInt(address[1]);
  54. return new HttpHost(ip, port, scheme);
  55. } else {
  56. return null;
  57. }
  58. }
  59. /**
  60. * 构建认证服务
  61. * @return
  62. */
  63. private CredentialsProvider buildCredentialsProvider(){
  64. final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  65. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName,
  66. userPwd));
  67. return credentialsProvider;
  68. }
  69. }

至此,客户端配置完毕,项目启动的时候,会自动注入到Spring的ioc容器里面。

2.4、索引管理

es 中最重要的就是索引库,客户端如何创建呢?

创建索引

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 创建索引(简单模式)
  8. * @throws IOException
  9. */
  10. @Test
  11. public void createIndex() throws IOException {
  12. CreateIndexRequest request = new CreateIndexRequest("cs_index");
  13. CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
  14. System.out.println(response.isAcknowledged());
  15. }
  16. /**
  17. * 创建索引(复杂模式)
  18. * 可以直接把对应的文档结构也一并初始化
  19. * @throws IOException
  20. */
  21. @Test
  22. public void createIndexComplete() throws IOException {
  23. CreateIndexRequest request = new CreateIndexRequest();
  24. //索引名称
  25. request.index("cs_index");
  26. //索引配置
  27. Settings settings = Settings.builder()
  28. .put("index.number_of_shards", 3)
  29. .put("index.number_of_replicas", 1)
  30. .build();
  31. request.settings(settings);
  32. //映射结构字段
  33. Map<String, Object> properties = new HashMap();
  34. properties.put("id", ImmutableBiMap.of("type", "text"));
  35. properties.put("name", ImmutableBiMap.of("type", "text"));
  36. properties.put("sex", ImmutableBiMap.of("type", "text"));
  37. properties.put("age", ImmutableBiMap.of("type", "long"));
  38. properties.put("city", ImmutableBiMap.of("type", "text"));
  39. properties.put("createTime", ImmutableBiMap.of("type", "long"));
  40. Map<String, Object> mapping = new HashMap<>();
  41. mapping.put("properties", properties);
  42. //添加一个默认类型
  43. System.out.println(JSON.toJSONString(request));
  44. request.mapping("_doc",mapping);
  45. CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
  46. System.out.println(response.isAcknowledged());
  47. }
  48. }

删除索引

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 删除索引
  8. * @throws IOException
  9. */
  10. @Test
  11. public void deleteIndex() throws IOException {
  12. DeleteIndexRequest request = new DeleteIndexRequest("cs_index1");
  13. AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
  14. System.out.println(response.isAcknowledged());
  15. }
  16. }

查询索引

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 查询索引
  8. * @throws IOException
  9. */
  10. @Test
  11. public void getIndex() throws IOException {
  12. // 创建请求
  13. GetIndexRequest request = new GetIndexRequest();
  14. request.indices("cs_index");
  15. // 执行请求,获取响应
  16. GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
  17. System.out.println(response.toString());
  18. }
  19. }

查询索引是否存在

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 检查索引是否存在
  8. * @throws IOException
  9. */
  10. @Test
  11. public void exists() throws IOException {
  12. // 创建请求
  13. GetIndexRequest request = new GetIndexRequest();
  14. request.indices("cs_index");
  15. // 执行请求,获取响应
  16. boolean response = client.indices().exists(request, RequestOptions.DEFAULT);
  17. System.out.println(response);
  18. }
  19. }

查询所有的索引名称

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 查询所有的索引名称
  8. * @throws IOException
  9. */
  10. @Test
  11. public void getAllIndices() throws IOException {
  12. GetAliasesRequest request = new GetAliasesRequest();
  13. GetAliasesResponse response = client.indices().getAlias(request,RequestOptions.DEFAULT);
  14. Map<String, Set<AliasMetaData>> map = response.getAliases();
  15. Set<String> indices = map.keySet();
  16. for (String key : indices) {
  17. System.out.println(key);
  18. }
  19. }
  20. }

查询索引映射字段

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 查询索引映射字段
  8. * @throws IOException
  9. */
  10. @Test
  11. public void getMapping() throws IOException {
  12. GetMappingsRequest request = new GetMappingsRequest();
  13. request.indices("cs_index");
  14. request.types("_doc");
  15. GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
  16. System.out.println(response.toString());
  17. }
  18. }

添加索引映射字段

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class IndexJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 添加索引映射字段
  8. * @throws IOException
  9. */
  10. @Test
  11. public void addMapping() throws IOException {
  12. PutMappingRequest request = new PutMappingRequest();
  13. request.indices("cs_index");
  14. request.type("_doc");
  15. //添加字段
  16. Map<String, Object> properties = new HashMap();
  17. properties.put("accountName", ImmutableBiMap.of("type", "keyword"));
  18. Map<String, Object> mapping = new HashMap<>();
  19. mapping.put("properties", properties);
  20. request.source(mapping);
  21. PutMappingResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
  22. System.out.println(response.isAcknowledged());
  23. }
  24. }

2.5、文档管理

所谓文档,就是向索引里面添加数据,方便进行数据查询,详细操作内容,请看下文!

添加文档

  1. public class UserDocument {
  2. private String id;
  3. private String name;
  4. private String sex;
  5. private Integer age;
  6. private String city;
  7. private Date createTime;
  8. //省略get、set...
  9. }
  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class DocJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 添加文档
  8. * @throws IOException
  9. */
  10. @Test
  11. public void addDocument() throws IOException {
  12. // 创建对象
  13. UserDocument user = new UserDocument();
  14. user.setId("1");
  15. user.setName("里斯");
  16. user.setCity("武汉");
  17. user.setSex("男");
  18. user.setAge(20);
  19. user.setCreateTime(new Date());
  20. // 创建索引,即获取索引
  21. IndexRequest request = new IndexRequest();
  22. // 外层参数
  23. request.id("1");
  24. request.index("cs_index");
  25. request.type("_doc");
  26. request.timeout(TimeValue.timeValueSeconds(1));
  27. // 存入对象
  28. request.source(JSON.toJSONString(user), XContentType.JSON);
  29. // 发送请求
  30. System.out.println(request.toString());
  31. IndexResponse response = client.index(request, RequestOptions.DEFAULT);
  32. System.out.println(response.toString());
  33. }
  34. }

更新文档

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class DocJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 更新文档(按需修改)
  8. * @throws IOException
  9. */
  10. @Test
  11. public void updateDocument() throws IOException {
  12. // 创建对象
  13. UserDocument user = new UserDocument();
  14. user.setId("2");
  15. user.setName("程咬金");
  16. user.setCreateTime(new Date());
  17. // 创建索引,即获取索引
  18. UpdateRequest request = new UpdateRequest();
  19. // 外层参数
  20. request.id("2");
  21. request.index("cs_index");
  22. request.type("_doc");
  23. request.timeout(TimeValue.timeValueSeconds(1));
  24. // 存入对象
  25. request.doc(JSON.toJSONString(user), XContentType.JSON);
  26. // 发送请求
  27. System.out.println(request.toString());
  28. UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
  29. System.out.println(response.toString());
  30. }
  31. }

删除文档

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class DocJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 删除文档
  8. * @throws IOException
  9. */
  10. @Test
  11. public void deleteDocument() throws IOException {
  12. // 创建索引,即获取索引
  13. DeleteRequest request = new DeleteRequest();
  14. // 外层参数
  15. request.id("1");
  16. request.index("cs_index");
  17. request.type("_doc");
  18. request.timeout(TimeValue.timeValueSeconds(1));
  19. // 发送请求
  20. System.out.println(request.toString());
  21. DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
  22. System.out.println(response.toString());
  23. }
  24. }

查询文档是不是存在

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class DocJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 查询文档是不是存在
  8. * @throws IOException
  9. */
  10. @Test
  11. public void exists() throws IOException {
  12. // 创建索引,即获取索引
  13. GetRequest request = new GetRequest();
  14. // 外层参数
  15. request.id("3");
  16. request.index("cs_index");
  17. request.type("_doc");
  18. // 发送请求
  19. System.out.println(request.toString());
  20. boolean response = client.exists(request, RequestOptions.DEFAULT);
  21. System.out.println(response);
  22. }
  23. }

通过 ID 查询指定文档

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class DocJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 通过ID,查询指定文档
  8. * @throws IOException
  9. */
  10. @Test
  11. public void getById() throws IOException {
  12. // 创建索引,即获取索引
  13. GetRequest request = new GetRequest();
  14. // 外层参数
  15. request.id("1");
  16. request.index("cs_index");
  17. request.type("_doc");
  18. // 发送请求
  19. System.out.println(request.toString());
  20. GetResponse response = client.get(request, RequestOptions.DEFAULT);
  21. System.out.println(response.toString());
  22. }
  23. }

批量添加文档

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest(classes = ElasticSearchApplication.class)
  3. public class DocJunit {
  4. @Autowired
  5. private RestHighLevelClient client;
  6. /**
  7. * 批量添加文档
  8. * @throws IOException
  9. */
  10. @Test
  11. public void batchAddDocument() throws IOException {
  12. // 批量请求
  13. BulkRequest bulkRequest = new BulkRequest();
  14. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  15. // 创建对象
  16. List<UserDocument> userArrayList = new ArrayList<>();
  17. userArrayList.add(new UserDocument("张三", "男", 30, "武汉"));
  18. userArrayList.add(new UserDocument("里斯", "女", 31, "北京"));
  19. userArrayList.add(new UserDocument("王五", "男", 32, "武汉"));
  20. userArrayList.add(new UserDocument("赵六", "女", 33, "长沙"));
  21. userArrayList.add(new UserDocument("七七", "男", 34, "武汉"));
  22. // 添加请求
  23. for (int i = 0; i < userArrayList.size(); i++) {
  24. userArrayList.get(i).setId(String.valueOf(i));
  25. IndexRequest indexRequest = new IndexRequest();
  26. // 外层参数
  27. indexRequest.id(String.valueOf(i));
  28. indexRequest.index("cs_index");
  29. indexRequest.type("_doc");
  30. indexRequest.timeout(TimeValue.timeValueSeconds(1));
  31. indexRequest.source(JSON.toJSONString(userArrayList.get(i)), XContentType.JSON);
  32. bulkRequest.add(indexRequest);
  33. }
  34. // 执行请求
  35. BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  36. System.out.println(response.status());
  37. }
  38. }