1. Spring Data
目的:简化操作,实际就是将对索引、文档等的操作映射成对象进行操作
1.1 版本对照
目前最新的 Spring Boot 对应 Elasticsearch 7.6.2;Spring Boot 2.3.x 一般可以兼容 Elasticsearch 7.x。
1.2 集成
1.2.1 依赖
<!-- Inherit from spring-boot-starter-parent for version management -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.6.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- Spring Data Elasticsearch starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    <!-- DevTools -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <!-- Spring Boot test starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Spring Boot test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-test</artifactId>
    </dependency>
    <!-- JUnit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
    </dependency>
    <!-- Spring test -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
    </dependency>
</dependencies>
1.2.2 整合
1.2.2.1 配置文件
# Elasticsearch cluster nodes and access scheme
elasticsearch:
  nodes: hadoop101:9200,hadoop102:9200,hadoop103:9200
  schema: http
1.2.2.2 启动程序
/**
 * Spring Boot entry point for the Spring Data Elasticsearch demo application.
 *
 * @author JShawn 2021/8/10 13:05
 */
@SpringBootApplication
public class SpringDataElasticSearchMainApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataElasticSearchMainApplication.class, args);
    }
}
1.2.2.3 实体类
实体类需要配置mapping映射关系
@Id —— 指定 **_id**
@Document —— 指定**索引的名称、分片、副本**等信息
@Field —— 指定**字段的数据类型、分词器类型、是否索引、是否单独存储**等信息
/**
 * Product entity mapped to the Elasticsearch "product" index.
 *
 * Annotation notes:
 * - {@code @Id}: required; this field is the globally unique identifier,
 *   equivalent to "_id" in Elasticsearch.
 * - {@code @Field}:
 *   type     : field data type
 *   analyzer : analyzer type
 *   index    : whether the field is indexed (default: true)
 *   Keyword  : exact phrase, not analyzed/tokenized
 *   store    : whether the field is stored separately
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "product", shards = 3, replicas = 1)
public class Product {

    /**
     * Unique product identifier.
     */
    @Id
    private Long id;

    /**
     * Product name; analyzed with the ik_max_word Chinese analyzer.
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String title;

    /**
     * Category name; keyword, matched exactly (not analyzed).
     */
    @Field(type = FieldType.Keyword)
    private String category;

    /**
     * Product price.
     */
    @Field(type = FieldType.Double)
    private Double price;

    /**
     * Image URL; not indexed (never searched on), keyword type.
     */
    @Field(type = FieldType.Keyword, index = false)
    private String images;
}
1.2.2.4 配置类
ElasticsearchRestTemplate 是 spring-data-elasticsearch 项目中的一个类,和其他 spring 项目中的 template类似。
在新版的 spring-data-elasticsearch 中,ElasticsearchRestTemplate 代替了原来的 ElasticsearchTemplate。原因是 ElasticsearchTemplate 基于 TransportClient,而 TransportClient 将在 8.x 以后的版本中移除,所以推荐使用 ElasticsearchRestTemplate。ElasticsearchRestTemplate 基于 RestHighLevelClient 客户端。需要自定义配置类,继承
**AbstractElasticsearchConfiguration**,并实现**elasticsearchClient()**抽象方法,创建 RestHighLevelClient 对象。
/*** @author JShawn 2021/8/10 13:07*/@ConfigurationProperties(prefix = "elasticsearch")@Configuration@Datapublic class ElasticsearchConfig extends AbstractElasticsearchConfiguration {/*** 集群地址*/private String nodes;/*** 访问方式*/private String schema;@Overridepublic RestHighLevelClient elasticsearchClient() {// 1. 获取ES客户端// 1. ES集群地址配置String[] clusters = nodes.split(",");HttpHost[] hosts = new HttpHost[clusters.length];for (int i = 0; i < clusters.length; i++) {String[] address = clusters[i].split(":");HttpHost httpHost = new HttpHost(address[0], Integer.parseInt(address[1]), schema);hosts[i] = httpHost;}// 2. 创建客户端RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(hosts));return restHighLevelClient;}}
1.2.2.5 数据访问层
跟其他SpringData整合一样,继承一个Repository接口
/**
 * Data-access layer for {@link Product} documents.
 *
 * Extends {@code ElasticsearchRepository} (same pattern as other Spring Data
 * integrations) to inherit standard CRUD and paging operations.
 *
 * @author JShawn 2021/8/10 13:19
 */
@Repository
public interface ProductDao extends ElasticsearchRepository<Product, Long> {
}
1.2.2.6 索引CRUD
/**
 * Index CRUD tests.
 *
 * @author JShawn 2021/8/10 17:15
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {

    /**
     * Inject the ElasticsearchRestTemplate.
     */
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * 1. Create the index with its mapping (the mapping is declared on the
     * entity class). When the ElasticsearchRestTemplate is initialized,
     * ProductDao is initialized as well; while reading the Product class,
     * the index is created automatically if it does not exist yet.
     */
    @Test
    public void createIndex() {
        // The index is created automatically during system initialization.
        System.out.println("创建索引");
    }

    /**
     * 2. Delete the index.
     */
    @Test
    public void deleteIndex() {
        // Deletes the index derived from the Product entity's @Document mapping.
        boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
        System.out.println("删除索引 = " + flg);
    }
}
1.2.2.7 文档CRUD
/*** 文档CRUD测试** @author JShawn 2021/8/10 17:23*/@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringDataESProductDaoTest {@Autowiredprivate ProductDao productDao;/*** 1. 新增文档* save()*/@Testpublic void save(){Product product = new Product();product.setId(2L);product.setTitle("华为手机");product.setCategory("手机");product.setPrice(2999.0);product.setImages("http://www.jshawn/hw.jpg");productDao.save(product);}/*** 2. 修改文档* 也是save(),存在id对应的记录就修改,没有就新增*/@Testpublic void update(){Product product = new Product();product.setId(1L);product.setTitle("小米 2 手机");product.setCategory("手机");product.setPrice(9999.0);product.setImages("http://www.jshawn/xm.jpg");productDao.save(product);}/*** 3. 根据 id 查询文档*/@Testpublic void findById(){Product product = productDao.findById(1L).get();System.out.println(product);}/*** 4. 查询所有文档*/@Testpublic void findAll(){Iterable<Product> products = productDao.findAll();for (Product product : products) {System.out.println(product);}}/*** 5. 删除*/@Testpublic void delete(){Product product = new Product();product.setId(2L);productDao.delete(product);}/*** 6. 批量新增*/@Testpublic void saveAll(){List<Product> productList = new ArrayList<>();for (int i = 0; i < 10; i++) {Product product = new Product();product.setId(Long.valueOf(i));product.setTitle("["+i+"]小米手机");product.setCategory("手机");product.setPrice(1999.0+i);product.setImages("http://www.atguigu/xm.jpg");productList.add(product);}productDao.saveAll(productList);}/*** 7. 分页查询*/@Testpublic void findByPageable(){// 设置排序(排序方式,正序还是倒序,排序的 id)Sort sort = Sort.by(Sort.Direction.DESC,"id");// 当前页,特殊:******第一页从 0 开始,1 表示第二页******int currentPage=0;// 每页显示多少条int pageSize = 5;// 设置查询分页PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);// 分页查询Page<Product> productPage = productDao.findAll(pageRequest);for (Product Product : productPage.getContent()) {System.out.println(Product);}}}
1.2.2.8 高级搜索
更多操作看Bos中的SpringData整合ES或Java API
/**
 * Document search tests —— 更多操作看Bos中的SpringData整合ES或Java API
 *
 * @author JShawn 2021/8/12 0:47
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESSearchTest {

    @Autowired
    private ProductDao productDao;

    /**
     * 1. term query.
     * search(termQueryBuilder) runs the search; the argument is a query
     * builder object.
     */
    @Test
    public void termQuery() {
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
        Iterable<Product> products = productDao.search(termQueryBuilder);
        for (Product product : products) {
            System.out.println(product);
        }
    }

    /**
     * 2. term query with paging.
     */
    @Test
    public void termQueryByPage() {
        int currentPage = 0;
        int pageSize = 5;
        // Build the page request (first page is 0).
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
        Iterable<Product> products = productDao.search(termQueryBuilder, pageRequest);
        for (Product product : products) {
            System.out.println(product);
        }
    }
}
2. Spark Streaming
前提:需要Scala环境
2.1 依赖
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <!-- Spark core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- Elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.0</version>
    </dependency>
    <!-- Elasticsearch high-level REST client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8.0</version>
    </dependency>
    <!-- Elasticsearch depends on log4j 2.x -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <!--
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.1</version>
    </dependency>
    -->
    <!--
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    -->
</dependencies>
2.2 集成
/**
 * Spark Streaming -> Elasticsearch integration demo.
 *
 * Reads whitespace-separated "id value" lines from a socket and indexes each
 * record into the "sparkstreaming" index.
 *
 * Fix: the original created (and leaked on failure) a new
 * RestHighLevelClient for EVERY record inside rdd.foreach. The client is now
 * created once per partition via foreachPartition and closed in a finally
 * block.
 */
object SparkStreamingESTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    ds.foreachRDD(rdd => {
      println("*************** " + new Date())
      // One client per partition instead of one per record.
      rdd.foreachPartition(iter => {
        val client = new RestHighLevelClient(
          RestClient.builder(new HttpHost("localhost", 9200, "http")))
        try {
          iter.foreach(data => {
            // Build the index request.
            val request = new IndexRequest()
            // First token is the document id, second is the payload.
            val ss = data.split(" ")
            println("ss = " + ss.mkString(","))
            request.index("sparkstreaming").id(ss(0))
            val productJson =
              s"""
                 | { "data":"${ss(1)}" }
                 |""".stripMargin
            // Document body is JSON.
            request.source(productJson, XContentType.JSON)
            // Send the request synchronously and log the response.
            val response = client.index(request, RequestOptions.DEFAULT)
            System.out.println("_index:" + response.getIndex())
            System.out.println("_id:" + response.getId())
            System.out.println("_result:" + response.getResult())
          })
        } finally {
          // Always release the client, even if indexing a record failed.
          client.close()
        }
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
3. Flink
3.1 依赖
<!-- Flink Elasticsearch connector.
     Fix: the original used flink-connector-elasticsearch7_2.11, which mixes
     the Scala 2.11 binary with the _2.12 artifacts below — aligned to 2.12. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- Flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
3.2 集成
/*** Flink 集成 ES** @author JShawn 2021/8/17 22:02*/public class FlinkElasticSearchTest {public static void main(String[] args) throws Exception {//1. 构建Flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. sourceDataStream<String> source = env.socketTextStream("localhost", 7777);// 3.sink// 3.1 配置ES集群ArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("hadoop101", 9200));httpHosts.add(new HttpHost("hadoop102", 9200));httpHosts.add(new HttpHost("hadoop103", 9200));// 3.2 构建ES的SinkElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder(httpHosts, new ElasticsearchSinkFunction<String>() {@Overridepublic void process(String source, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {// 3.3 封装数据HashMap<String, String> map = Maps.newHashMap();map.put("data", source);// 创建请求,作为向ES发起的写入命令IndexRequest indexRequest = Requests.indexRequest().index("flink-es").source(map);// 向index发送请求requestIndexer.add(indexRequest);}});// 3.5 可选,由于默认是批处理形式,因此需要设置批处理刷新阈值为1,有一条写一条esSinkBuilder.setBulkFlushMaxActions(1);// 3.6 绑定source和sinksource.addSink(esSinkBuilder.build());// 3.7 执行jobenv.execute("flink-es");}}
