1、Spring Data框架集成

1.1、Spring Data框架介绍

Spring Data 是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。

其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce 框架和云计算数据服务。

Spring Data 可以极大的简化 JPA(Elasticsearch„)的写法,可以在几乎不用写实现的情况下,实现对数据的访问和操作。除了 CRUD 外,还包括如分页、排序等一些常用的功能。

Spring Data 的官网:https://spring.io/projects/spring-data

七、Elasticsearch集成 - 图1

Spring Data 常用的功能模块如下:

七、Elasticsearch集成 - 图2

1.2、Spring Data Elasticsearch 介绍

Spring Data Elasticsearch 基于 spring data API 简化 Elasticsearch 操作,将原始操作Elasticsearch 的客户端 API 进行封装 。

Spring Data 为 Elasticsearch 项目提供集成搜索引擎。

Spring Data Elasticsearch POJO 的关键功能区域为中心的模型与 Elastichsearch 交互文档和轻松地编写一个存储索引库数据访问层。

官方网站: https://spring.io/projects/spring-data-elasticsearch

七、Elasticsearch集成 - 图3

1.3、Spring Data Elasticsearch 版本对比

七、Elasticsearch集成 - 图4

Spring boot2.3.x 一般可以兼容 Elasticsearch7.x

1.4、框架集成

  1. 创建 Maven 项目

七、Elasticsearch集成 - 图5

  1. 修改 pom 文件,增加依赖关系
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.atguigu.es</groupId>
  7. <artifactId>es-spring</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <parent>
  10. <artifactId>spring-boot-starter-parent</artifactId>
  11. <groupId>org.springframework.boot</groupId>
  12. <version>2.3.6.RELEASE</version>
  13. </parent>
  14. <properties>
  15. <maven.compiler.source>8</maven.compiler.source>
  16. <maven.compiler.target>8</maven.compiler.target>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>org.projectlombok</groupId>
  21. <artifactId>lombok</artifactId>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-devtools</artifactId>
  30. <scope>runtime</scope>
  31. <optional>true</optional>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.boot</groupId>
  35. <artifactId>spring-boot-starter-test</artifactId>
  36. <scope>test</scope>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-test</artifactId>
  41. </dependency>
  42. <dependency>
  43. <groupId>junit</groupId>
  44. <artifactId>junit</artifactId>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.springframework</groupId>
  48. <artifactId>spring-test</artifactId>
  49. </dependency>
  50. </dependencies>
  51. </project>
  1. 增加配置文件
    在 resources 目录中增加 application.properties 文件
  1. # es 服务地址
  2. elasticsearch.host=127.0.0.1
  3. # es 服务端口
  4. elasticsearch.port=9200
  5. # 配置日志级别,开启 debug 日志
  6. logging.level.com.atguigu.es=debug
  1. SpringBoot 主程序
  1. package com.atguigu.es;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class SpringDataElasticSearchMainApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(SpringDataElasticSearchMainApplication.class, args);
  8. }
  9. }
  1. 配置类
  • ElasticsearchRestTemplate 是 spring-data-elasticsearch 项目中的一个类,和其他 spring 项目中的 template类似。
  • 在新版的 spring-data-elasticsearch 中,ElasticsearchRestTemplate 代替了原来的 ElasticsearchTemplate。
  • 原因是 ElasticsearchTemplate 基于 TransportClient,TransportClient 即将在 8.x 以后的版本中移除。所以,我们推荐使用 ElasticsearchRestTemplate。
  • ElasticsearchRestTemplate 基 于 RestHighLevelClient 客 户 端 的 。 需 要 自 定 义 配 置 类 , 继 承
    AbstractElasticsearchConfiguration,并实现 elasticsearchClient()抽象方法,创建 RestHighLevelClient 对象。
  1. package com.atguigu.es.config;
  2. import lombok.Data;
  3. import org.apache.http.HttpHost;
  4. import org.elasticsearch.client.RestClient;
  5. import org.elasticsearch.client.RestClientBuilder;
  6. import org.elasticsearch.client.RestHighLevelClient;
  7. import org.springframework.boot.context.properties.ConfigurationProperties;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
  10. @ConfigurationProperties(prefix = "elasticsearch")
  11. @Configuration
  12. @Data
  13. public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
  14. private String host;
  15. private Integer port;
  16. @Override
  17. public RestHighLevelClient elasticsearchClient() {
  18. RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
  19. RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
  20. return restHighLevelClient;
  21. }
  22. }
  1. 数据实体类
  1. package com.atguigu.es;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import lombok.ToString;
  6. @Data
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. @ToString
  10. public class product {
  11. private Long id;//商品唯一标识
  12. private String title;//商品名称
  13. private String category;//分类名称
  14. private Double price;//商品价格
  15. private String images;//图片地址
  16. }
  1. DAO 数据访问对象
  1. package com.atguigu.es;
  2. import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
  3. import org.springframework.stereotype.Repository;
  4. @Repository
  5. public interface ProductDao extends ElasticsearchRepository<Product, Long> {
  6. }
  1. 实体类映射操作
  1. package com.atguigu.es;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import lombok.ToString;
  6. import org.springframework.data.annotation.Id;
  7. import org.springframework.data.elasticsearch.annotations.Document;
  8. import org.springframework.data.elasticsearch.annotations.Field;
  9. import org.springframework.data.elasticsearch.annotations.FieldType;
  10. @Data
  11. @NoArgsConstructor
  12. @AllArgsConstructor
  13. @ToString
  14. @Document(indexName = "product", shards = 3, replicas = 1)
  15. public class Product {
  16. //必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
  17. @Id
  18. private Long id;//商品唯一标识
  19. /**
  20. * type : 字段数据类型
  21. * analyzer : 分词器类型
  22. * index : 是否索引(默认:true)
  23. * Keyword : 短语,不进行分词
  24. */
  25. @Field(type = FieldType.Text, analyzer = "ik_max_word")
  26. private String title;//商品名称
  27. @Field(type = FieldType.Keyword)
  28. private String category;//分类名称
  29. @Field(type = FieldType.Double)
  30. private Double price;//商品价格
  31. @Field(type = FieldType.Keyword, index = false)
  32. private String images;//图片地址
  33. }

1.5、索引操作

  1. package com.atguigu.es;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. @RunWith(SpringRunner.class)
  9. @SpringBootTest
  10. public class SpringDataESIndexTest {
  11. //注入 ElasticsearchRestTemplate
  12. @Autowired
  13. private ElasticsearchRestTemplate elasticsearchRestTemplate;
  14. //创建索引并增加映射配置
  15. @Test
  16. public void createIndex() {
  17. //创建索引,系统初始化会自动创建索引
  18. System.out.println("创建索引");
  19. }
  20. @Test
  21. public void deleteIndex() {
  22. //创建索引,系统初始化会自动创建索引
  23. boolean flag = elasticsearchRestTemplate.deleteIndex(Product.class);
  24. System.out.println("删除索引 = " + flag);
  25. }
  26. }

1.6、文档操作

  1. package com.atguigu.es.test;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class SpringDataESProductDaoTest {
  10. @Autowired
  11. private ProductDao productDao;
  12. /**
  13. * 新增
  14. */
  15. @Test
  16. public void save() {
  17. Product product = new Product();
  18. product.setId(2L);
  19. product.setTitle("华为手机");
  20. product.setCategory("手机");
  21. product.setPrice(2999.0);
  22. product.setImages("http://www.atguigu/hw.jpg");
  23. productDao.save(product);
  24. }
  25. }

七、Elasticsearch集成 - 图6

  1. //修改
  2. @Test
  3. public void update(){
  4. Product product = new Product();
  5. product.setId(2L);
  6. product.setTitle("小米手机");
  7. product.setCategory("手机");
  8. product.setPrice(9999.0);
  9. product.setImages("http://www.atguigu/xm.jpg");
  10. productDao.save(product);
  11. }

七、Elasticsearch集成 - 图7

  1. //根据 id 查询
  2. @Test
  3. public void findById() {
  4. Product product = productDao.findById(2L).get();
  5. System.out.println(product);
  6. }

七、Elasticsearch集成 - 图8

  1. //查询所有
  2. @Test
  3. public void findAll() {
  4. Iterable<Product> products = productDao.findAll();
  5. for (Product product : products) {
  6. System.out.println(product);
  7. }
  8. }
  1. //删除
  2. @Test
  3. public void delete() {
  4. Product product = new Product();
  5. product.setId(2L);
  6. productDao.delete(product);
  7. }
  1. //批量新增
  2. @Test
  3. public void saveAll() {
  4. List<Product> productList = new ArrayList<>();
  5. for (int i = 0; i < 10; i++) {
  6. Product product = new Product();
  7. product.setId(Long.valueOf(i));
  8. product.setTitle("[" + i + "]小米手机");
  9. product.setCategory("手机");
  10. product.setPrice(1999.0 + i);
  11. product.setImages("http://www.atguigu/xm.jpg");
  12. productList.add(product);
  13. }
  14. productDao.saveAll(productList);
  15. }
  1. //分页查询
  2. @Test
  3. public void findByPageable() {
  4. //设置排序(排序方式,正序还是倒序,排序的 id)
  5. Sort sort = Sort.by(Sort.Direction.DESC, "id");
  6. int currentPage = 0;//当前页,第一页从 0 开始,1 表示第二页
  7. int pageSize = 5;//每页显示多少条
  8. //设置查询分页
  9. PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
  10. //分页查询
  11. Page<Product> productPage = productDao.findAll(pageRequest);
  12. for (Product Product : productPage.getContent()) {
  13. System.out.println(Product);
  14. }
  15. }

1.7、文档搜索

  1. package com.atguigu.es.test;
  2. import org.elasticsearch.index.query.QueryBuilders;
  3. import org.elasticsearch.index.query.TermQueryBuilder;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.data.domain.PageRequest;
  9. import org.springframework.test.context.junit4.SpringRunner;
  10. @RunWith(SpringRunner.class)
  11. @SpringBootTest
  12. public class SpringDataESSearchTest {
  13. @Autowired
  14. private ProductDao productDao;
  15. /**
  16. * term 查询
  17. * search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
  18. */
  19. @Test
  20. public void termQuery() {
  21. TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
  22. Iterable<Product> products = productDao.search(termQueryBuilder);
  23. for (Product product : products) {
  24. System.out.println(product);
  25. }
  26. }
  27. /**
  28. * term 查询加分页
  29. */
  30. @Test
  31. public void termQueryByPage() {
  32. TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
  33. //设置查询分页
  34. int currentPage = 0;
  35. int pageSize = 5;
  36. PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
  37. Iterable<Product> products = productDao.search(termQueryBuilder, pageRequest);
  38. for (Product product : products) {
  39. System.out.println(product);
  40. }
  41. }
  42. }

2、 Spark Streaming 框架集成

2.1 Spark Streaming 框架介绍

Spark Streaming 是 Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。

数据可以从许多来源获取,如 Kafka,Flume,Kinesis 或 TCP sockets,
并且可以使用复杂的算法进行处理,这些算法使用诸如 map,reduce,join 和 window 等高级函数表示。

最后,处理后的数据可以推送到文件系统,数据库等。

实际上,您可以将Spark 的机器学习和图形处理算法应用于数据流。

七、Elasticsearch集成 - 图9

3、Flink框架集成

3.1、 Flink 框架介绍

七、Elasticsearch集成 - 图10

Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。

但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而凸显的更加明显:

  • 数据精准一次性处理(Exactly-Once)
  • 乱序数据,迟到数据
  • 低延迟,高吞吐,准确性
  • 容错性

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。

慢慢地,随着这些问题的解决,Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公司在 2015 年改进 Flink,并创建了内部分支 Blink,目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。