1. Spring Data

目的:简化操作,实际就是将对索引、文档等的操作映射成对象进行操作

1.1 版本对照

目前最新的 Spring Boot 对应 Elasticsearch 7.6.2,Spring Boot 2.3.x 一般可以兼容 Elasticsearch 7.x

image.png

1.2 集成

1.2.1 依赖

  1. <!-- 继承Spring-boot-starter-parent -->
  2. <parent>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-parent</artifactId>
  5. <version>2.3.6.RELEASE</version>
  6. <relativePath/>
  7. </parent>
  8. <properties>
  9. <maven.compiler.source>8</maven.compiler.source>
  10. <maven.compiler.target>8</maven.compiler.target>
  11. </properties>
  12. <dependencies>
  13. <!-- lombok -->
  14. <dependency>
  15. <groupId>org.projectlombok</groupId>
  16. <artifactId>lombok</artifactId>
  17. </dependency>
  18. <!-- spring-data-es的startrer -->
  19. <dependency>
  20. <groupId>org.springframework.boot</groupId>
  21. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  22. </dependency>
  23. <!-- devtools-->
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-devtools</artifactId>
  27. <scope>runtime</scope>
  28. <optional>true</optional>
  29. </dependency>
  30. <!--spring test的starter-->
  31. <dependency>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-starter-test</artifactId>
  34. <scope>test</scope>
  35. </dependency>
  36. <!--spring boot test-->
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-test</artifactId>
  40. </dependency>
  41. <!--junit-->
  42. <dependency>
  43. <groupId>junit</groupId>
  44. <artifactId>junit</artifactId>
  45. </dependency>
  46. <!-- spring test -->
  47. <dependency>
  48. <groupId>org.springframework</groupId>
  49. <artifactId>spring-test</artifactId>
  50. </dependency>
  51. </dependencies>

1.2.2 整合

1.2.2.1 配置文件

  1. # 集群环境及访问方式
  2. elasticsearch:
  3. nodes: hadoop101:9200,hadoop102:9200,hadoop103:9200
  4. schema: http

1.2.2.2 启动程序

  1. /**
  2. * @author JShawn 2021/8/10 13:05
  3. */
  4. @SpringBootApplication
  5. public class SpringDataElasticSearchMainApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(SpringDataElasticSearchMainApplication.class, args);
  8. }
  9. }

1.2.2.3 实体类

实体类需要配置 mapping 映射关系:@Id —— 指定 **_id**;@Document —— 指定**索引的名称、分片、副本**等信息;@Field —— 指定**字段的数据类型、分词器类型、是否索引、是否单独存储**等信息

  1. /**
  2. * @Id
  3. * 必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
  4. * @FiledType:
  5. * type : 字段数据类型
  6. * analyzer : 分词器类型
  7. * index : 是否索引(默认:true)
  8. * Keyword : 短语,不进行分词
  9. * store:是否单独存储
  10. */
  11. @Data
  12. @NoArgsConstructor
  13. @AllArgsConstructor
  14. @ToString
  15. @Document(indexName = "product", shards = 3, replicas = 1)
  16. public class Product {
  17. /**
  18. * 商品唯一标识
  19. */
  20. @Id
  21. private Long id;
  22. /**
  23. * 商品名称
  24. */
  25. @Field(type = FieldType.Text, analyzer = "ik_max_word")
  26. private String title;
  27. /**
  28. * 分类名称
  29. */
  30. @Field(type = FieldType.Keyword)
  31. private String category;
  32. /**
  33. * 商品价格
  34. */
  35. @Field(type = FieldType.Double)
  36. private Double price;
  37. /**
  38. * 图片地址
  39. */
  40. @Field(type = FieldType.Keyword, index = false)
  41. private String images;
  42. }

1.2.2.4 配置类

ElasticsearchRestTemplate 是 spring-data-elasticsearch 项目中的一个类,和其他 spring 项目中的 template类似。

在新版的 spring-data-elasticsearch 中,ElasticsearchRestTemplate 代替了原来的 ElasticsearchTemplate。原因是 ElasticsearchTemplate 基于 TransportClient,而 TransportClient 将在 8.x 以后的版本中移除,所以推荐使用 ElasticsearchRestTemplate。ElasticsearchRestTemplate 是基于 RestHighLevelClient 客户端的。需要自定义配置类,继承 **AbstractElasticsearchConfiguration**,并实现 **elasticsearchClient()** 抽象方法,创建 RestHighLevelClient 对象。

  1. /**
  2. * @author JShawn 2021/8/10 13:07
  3. */
  4. @ConfigurationProperties(prefix = "elasticsearch")
  5. @Configuration
  6. @Data
  7. public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
  8. /**
  9. * 集群地址
  10. */
  11. private String nodes;
  12. /**
  13. * 访问方式
  14. */
  15. private String schema;
  16. @Override
  17. public RestHighLevelClient elasticsearchClient() {
  18. // 1. 获取ES客户端
  19. // 1. ES集群地址配置
  20. String[] clusters = nodes.split(",");
  21. HttpHost[] hosts = new HttpHost[clusters.length];
  22. for (int i = 0; i < clusters.length; i++) {
  23. String[] address = clusters[i].split(":");
  24. HttpHost httpHost = new HttpHost(address[0], Integer.parseInt(address[1]), schema);
  25. hosts[i] = httpHost;
  26. }
  27. // 2. 创建客户端
  28. RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(hosts));
  29. return restHighLevelClient;
  30. }
  31. }

1.2.2.5 数据访问层

跟其他SpringData整合一样,继承一个Repository接口

  1. /**
  2. * @author JShawn 2021/8/10 13:19
  3. */
  4. @Repository
  5. public interface ProductDao extends ElasticsearchRepository<Product,Long> {
  6. }

1.2.2.6 索引CRUD

  1. /**
  2. * 索引CRUD测试
  3. *
  4. * @author JShawn 2021/8/10 17:15
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest
  8. public class SpringDataESIndexTest {
  9. /**
  10. * 注入 ElasticsearchRestTemplate
  11. */
  12. @Autowired
  13. private ElasticsearchRestTemplate elasticsearchRestTemplate;
  14. /**
  15. * 1. 创建索引并增加映射配置(映射已在实体类中进行配置)
  16. * 在初始化ElasticsearchRestTemplate时,也要初始化ProductDao
  17. * 因此在读取Product类时,如果发现此索引没被创建,会自动创建
  18. */
  19. @Test
  20. public void createIndex(){
  21. //创建索引,系统初始化会自动创建索引
  22. System.out.println("创建索引");
  23. }
  24. /**
  25. * 2. 删除索引
  26. */
  27. @Test
  28. public void deleteIndex(){
  29. //创建索引,系统初始化会自动创建索引
  30. boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
  31. System.out.println("删除索引 = " + flg);
  32. }
  33. }

1.2.2.7 文档CRUD

  1. /**
  2. * 文档CRUD测试
  3. *
  4. * @author JShawn 2021/8/10 17:23
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest
  8. public class SpringDataESProductDaoTest {
  9. @Autowired
  10. private ProductDao productDao;
  11. /**
  12. * 1. 新增文档
  13. * save()
  14. */
  15. @Test
  16. public void save(){
  17. Product product = new Product();
  18. product.setId(2L);
  19. product.setTitle("华为手机");
  20. product.setCategory("手机");
  21. product.setPrice(2999.0);
  22. product.setImages("http://www.jshawn/hw.jpg");
  23. productDao.save(product);
  24. }
  25. /**
  26. * 2. 修改文档
  27. * 也是save(),存在id对应的记录就修改,没有就新增
  28. */
  29. @Test
  30. public void update(){
  31. Product product = new Product();
  32. product.setId(1L);
  33. product.setTitle("小米 2 手机");
  34. product.setCategory("手机");
  35. product.setPrice(9999.0);
  36. product.setImages("http://www.jshawn/xm.jpg");
  37. productDao.save(product);
  38. }
  39. /**
  40. * 3. 根据 id 查询文档
  41. */
  42. @Test
  43. public void findById(){
  44. Product product = productDao.findById(1L).get();
  45. System.out.println(product);
  46. }
  47. /**
  48. * 4. 查询所有文档
  49. */
  50. @Test
  51. public void findAll(){
  52. Iterable<Product> products = productDao.findAll();
  53. for (Product product : products) {
  54. System.out.println(product);
  55. }
  56. }
  57. /**
  58. * 5. 删除
  59. */
  60. @Test
  61. public void delete(){
  62. Product product = new Product();
  63. product.setId(2L);
  64. productDao.delete(product);
  65. }
  66. /**
  67. * 6. 批量新增
  68. */
  69. @Test
  70. public void saveAll(){
  71. List<Product> productList = new ArrayList<>();
  72. for (int i = 0; i < 10; i++) {
  73. Product product = new Product();
  74. product.setId(Long.valueOf(i));
  75. product.setTitle("["+i+"]小米手机");
  76. product.setCategory("手机");
  77. product.setPrice(1999.0+i);
  78. product.setImages("http://www.atguigu/xm.jpg");
  79. productList.add(product);
  80. }
  81. productDao.saveAll(productList);
  82. }
  83. /**
  84. * 7. 分页查询
  85. */
  86. @Test
  87. public void findByPageable(){
  88. // 设置排序(排序方式,正序还是倒序,排序的 id)
  89. Sort sort = Sort.by(Sort.Direction.DESC,"id");
  90. // 当前页,特殊:******第一页从 0 开始,1 表示第二页******
  91. int currentPage=0;
  92. // 每页显示多少条
  93. int pageSize = 5;
  94. // 设置查询分页
  95. PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
  96. // 分页查询
  97. Page<Product> productPage = productDao.findAll(pageRequest);
  98. for (Product Product : productPage.getContent()) {
  99. System.out.println(Product);
  100. }
  101. }
  102. }

1.2.2.8 高级搜索

更多操作看Bos中的SpringData整合ES或Java API

  1. /**
  2. * 文档搜索测试------更多操作看Bos中的SpringData整合ES或Java API
  3. *
  4. * @author JShawn 2021/8/12 0:47
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest
  8. public class SpringDataESSearchTest {
  9. @Autowired
  10. private ProductDao productDao;
  11. /**
  12. * 1. term 查询
  13. * search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
  14. */
  15. @Test
  16. public void termQuery(){
  17. TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
  18. Iterable<Product> products = productDao.search(termQueryBuilder);
  19. for (Product product : products) {
  20. System.out.println(product);
  21. }
  22. }
  23. /**
  24. * 2. term 查询加分页
  25. */
  26. @Test
  27. public void termQueryByPage(){
  28. int currentPage= 0 ;
  29. int pageSize = 5;
  30. //设置查询分页
  31. PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
  32. TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
  33. Iterable<Product> products = productDao.search(termQueryBuilder,pageRequest);
  34. for (Product product : products) {
  35. System.out.println(product);
  36. }
  37. }
  38. }

2. Spark Streaming

前提:需要Scala环境

2.1 依赖

  1. <properties>
  2. <maven.compiler.source>8</maven.compiler.source>
  3. <maven.compiler.target>8</maven.compiler.target>
  4. </properties>
  5. <dependencies>
  6. <!-- Spark -->
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-core_2.12</artifactId>
  10. <version>3.0.0</version>
  11. </dependency>
  12. <!-- Spark Streaming -->
  13. <dependency>
  14. <groupId>org.apache.spark</groupId>
  15. <artifactId>spark-streaming_2.12</artifactId>
  16. <version>3.0.0</version>
  17. </dependency>
  18. <!-- ElasticSearch -->
  19. <dependency>
  20. <groupId>org.elasticsearch</groupId>
  21. <artifactId>elasticsearch</artifactId>
  22. <version>7.8.0</version>
  23. </dependency>
  24. <!-- elasticsearch 的高级客户端 -->
  25. <dependency>
  26. <groupId>org.elasticsearch.client</groupId>
  27. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  28. <version>7.8.0</version>
  29. </dependency>
  30. <!-- elasticsearch 依赖 2.x 的 log4j -->
  31. <dependency>
  32. <groupId>org.apache.logging.log4j</groupId>
  33. <artifactId>log4j-api</artifactId>
  34. <version>2.8.2</version>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.apache.logging.log4j</groupId>
  38. <artifactId>log4j-core</artifactId>
  39. <version>2.8.2</version>
  40. </dependency>
  41. <!-- <dependency>-->
  42. <!-- <groupId>com.fasterxml.jackson.core</groupId>-->
  43. <!-- <artifactId>jackson-databind</artifactId>-->
  44. <!-- <version>2.11.1</version>-->
  45. <!-- </dependency>-->
  46. <!-- &lt;!&ndash; junit 单元测试 &ndash;&gt;-->
  47. <!-- <dependency>-->
  48. <!-- <groupId>junit</groupId>-->
  49. <!-- <artifactId>junit</artifactId>-->
  50. <!-- <version>4.12</version>-->
  51. <!-- </dependency>-->
  52. </dependencies>

2.2 集成

  1. object SparkStreamingESTest {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
  4. val ssc = new StreamingContext(sparkConf, Seconds(3))
  5. val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  6. ds.foreachRDD(
  7. rdd => {
  8. println("*************** " + new Date())
  9. rdd.foreach(
  10. data => {
  11. val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
  12. // 新增文档 - 请求对象
  13. val request = new IndexRequest();
  14. // 设置索引及唯一性标识
  15. val ss = data.split(" ")
  16. println("ss = " + ss.mkString(","))
  17. request.index("sparkstreaming").id(ss(0));
  18. val productJson =
  19. s"""
  20. | { "data":"${ss(1)}" }
  21. |""".stripMargin;
  22. // 添加文档数据,数据格式为 JSON 格式
  23. request.source(productJson,XContentType.JSON);
  24. // 客户端发送请求,获取响应对象
  25. val response = client.index(request, RequestOptions.DEFAULT);
  26. System.out.println("_index:" + response.getIndex());
  27. System.out.println("_id:" + response.getId());
  28. System.out.println("_result:" + response.getResult());
  29. client.close()
  30. }
  31. )
  32. }
  33. )
  34. ssc.start()
  35. ssc.awaitTermination()
  36. }
  37. }

3. Flink

可以参考Flink 五、Flink的流处理API 4.3 ElasticSearch

3.1 依赖

  1. <!-- Flink集成ES所需包 -->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
  5. <version>1.12.0</version>
  6. </dependency>
  7. <!-- Flink -->
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-scala_2.12</artifactId>
  11. <version>1.12.0</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.flink</groupId>
  15. <artifactId>flink-streaming-scala_2.12</artifactId>
  16. <version>1.12.0</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.flink</groupId>
  20. <artifactId>flink-clients_2.12</artifactId>
  21. <version>1.12.0</version>
  22. </dependency>

3.2 集成

  1. /**
  2. * Flink 集成 ES
  3. *
  4. * @author JShawn 2021/8/17 22:02
  5. */
  6. public class FlinkElasticSearchTest {
  7. public static void main(String[] args) throws Exception {
  8. //1. 构建Flink环境
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. // 2. source
  11. DataStream<String> source = env.socketTextStream("localhost", 7777);
  12. // 3.sink
  13. // 3.1 配置ES集群
  14. ArrayList<HttpHost> httpHosts = new ArrayList<>();
  15. httpHosts.add(new HttpHost("hadoop101", 9200));
  16. httpHosts.add(new HttpHost("hadoop102", 9200));
  17. httpHosts.add(new HttpHost("hadoop103", 9200));
  18. // 3.2 构建ES的Sink
  19. ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder(httpHosts, new ElasticsearchSinkFunction<String>() {
  20. @Override
  21. public void process(String source, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
  22. // 3.3 封装数据
  23. HashMap<String, String> map = Maps.newHashMap();
  24. map.put("data", source);
  25. // 创建请求,作为向ES发起的写入命令
  26. IndexRequest indexRequest = Requests.indexRequest()
  27. .index("flink-es")
  28. .source(map);
  29. // 向index发送请求
  30. requestIndexer.add(indexRequest);
  31. }
  32. });
  33. // 3.5 可选,由于默认是批处理形式,因此需要设置批处理刷新阈值为1,有一条写一条
  34. esSinkBuilder.setBulkFlushMaxActions(1);
  35. // 3.6 绑定source和sink
  36. source.addSink(esSinkBuilder.build());
  37. // 3.7 执行job
  38. env.execute("flink-es");
  39. }
  40. }