ES
Input
| clean_run |
是否应该保留以前的运行状态 |
|
| columns_charset |
特定列的字符编码。此选项将覆盖指定列的选项 |
columns_charset => { "column0" => "ISO-8859-1" } |
| connection_retry_attempts |
尝试连接到数据库的最大次数(默认1) |
|
| jdbc_driver_class |
|
org.postgresql.Driver |
| jdbc_driver_library |
JDBC驱动 |
D:/xx.jar |
| jdbc_connection_string |
|
jdbc:xxx?characterEncoding=utf8&serverTimezone=UTC |
| jdbc_user |
|
postgres |
| jdbc_password |
|
|
| statement |
|
SELECT * from add_bridge where id = 1 |
| parameters |
查询参数的哈希 |
{ "target_id" => "321" } |
| schedule |
设置执行计划(cron 语法)。各字段含义(从左至右):分、时、日、月、星期;全为 * 表示每分钟执行一次 |
schedule => "*/10 * * * *" (每10分钟执行一次) |
| lowercase_column_names |
是否强制使用标识符字段的小写,默认 true |
|
| statement |
要使用参数,请使用命名参数语法 |
"SELECT * FROM xx WHERE id = :target_id" |
| jdbc_default_timezone |
|
Asia/Shanghai |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Pipeline: two scheduled JDBC inputs (PostgreSQL tables logstash_text and test_test),
# each tagged with its own `type`, routed to a separate Elasticsearch index.
input {
  jdbc {
    jdbc_driver_library => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # NOTE(review): characterEncoding / serverTimezone look like MySQL Connector/J
    # options -- confirm the PostgreSQL driver actually honours them.
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/jtstest?characterEncoding=utf8&serverTimezone=GMT%2B8"
    jdbc_user => "postgres"
    # NOTE(review): plaintext credential -- consider the Logstash keystore.
    jdbc_password => "953598751"
    # Run once per minute.
    schedule => "* * * * *"
    # Incremental load: only rows newer than the last tracked `times` value.
    statement => "SELECT * from logstash_text where times > :sql_last_value and times < now()"
    tracking_column => "times"
    id => "logstash_text"
    use_column_value => true
    tracking_column_type => "timestamp"
    # Each jdbc input needs its own state file. Without this both inputs share the
    # default last-run file and overwrite each other's :sql_last_value.
    last_run_metadata_path => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/.logstash_text_last_run"
    jdbc_default_timezone => "Asia/Shanghai"
    type => "type1"
  }
  jdbc {
    jdbc_driver_library => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/jtstest?characterEncoding=utf8&serverTimezone=GMT%2B8"
    jdbc_user => "postgres"
    jdbc_password => "953598751"
    schedule => "* * * * *"
    statement => "SELECT * from test_test where times > :sql_last_value and times < now()"
    tracking_column => "times"
    id => "test_test"
    use_column_value => true
    tracking_column_type => "timestamp"
    # Separate state file for this input (see the note on the first input).
    last_run_metadata_path => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/.test_test_last_run"
    jdbc_default_timezone => "Asia/Shanghai"
    type => "type2"
  }
}

filter {
  # Parse the `message` field as JSON and drop the raw text afterwards.
  json {
    source => "message"
    remove_field => ["message"]
  }
}

output {
  # Route by the `type` set on each input; the row's `id` column becomes the document id,
  # so re-reading a row overwrites the existing document instead of duplicating it.
  if [type] == "type1" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "index_logstash_text"
      id => "id_index_logstash_text"
      document_id => "%{id}"
    }
  }
  if [type] == "type2" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "index_test_test"
      id => "id_index_test_test"
      document_id => "%{id}"
    }
  }
  stdout {
    # codec => json_lines
    codec => rubydebug
  }
}
自定义分析器
# Create my_index with two custom ik + pinyin analyzers and an explicit mapping.
PUT /my_index
{
  "settings": {
    "number_of_shards": "2",
    "number_of_replicas": "1",
    "analysis": {
      "analyzer": {
        "ik_smart_pinyin": {
          "type": "custom",
          "tokenizer": "ik_smart",
          "filter": [
            "my_pinyin",
            "word_delimiter"
          ]
        },
        "ik_max_word_pinyin": {
          "type": "custom",
          "tokenizer": "ik_max_word",
          "filter": [
            "my_pinyin",
            "word_delimiter"
          ]
        }
      },
      "filter": {
        "my_pinyin": {
          "type": "pinyin",
          "keep_separate_first_letter": true,
          "keep_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "lowercase": true,
          "remove_duplicated_term": true
        }
      }
    }
  },
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "mmsi": {
        "type": "text"
      },
      "name": {
        "analyzer": "ik_max_word_pinyin",
        "type": "text"
      },
      "pinyin": {
        "type": "text"
      }
    }
  }
}
自定义模板
############################### template ##################
{
  "index_patterns": "index_*",
  "order": 1000,
  "settings": {
    "number_of_shards": "2",
    "number_of_replicas": "1"
  },
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "mmsi": {
        "type": "text"
      },
      "name": {
        "type": "text"
      },
      "k_name": {
        "type": "keyword"
      },
      "address": {
        "type": "text"
      },
      "p_name": {
        "type": "text"
      }
    }
  },
  "aliases": {}
}
############################### template ##################
# Pipeline: load add_anchor rows from PostgreSQL into index_add_anchor,
# installing a custom index template from a local file.
input {
  jdbc {
    jdbc_driver_library => "D:/elk/logstash-7.11.1/test/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/waterway?characterEncoding=utf8&serverTimezone=GMT%2B8"
    jdbc_user => "postgres"
    jdbc_password => "953598751"
    # Six-field schedule; presumably the leading field is seconds (every 20 s) -- confirm.
    schedule => "*/20 * * * * *"
    # Geometry is exported as 2D WKT text in `locations`; `pinyin` is renamed to `p_name`.
    # NOTE(review): the statement has no :sql_last_value filter, so every run selects
    # the whole table and the tracking_* options below do not narrow the query.
    statement => "select id,st_astext(ST_Force2D(geom)) locations,name,pinyin p_name from add_anchor"
    tracking_column => "id"
    tracking_column_type => "numeric"
    id => "add_padd_anchorort"
    use_column_value => true
    jdbc_default_timezone => "Asia/Shanghai"
    type => "add_anchor"
  }
}

filter {
  # Copy `name` into `k_name` (mapped as keyword in the template above).
  mutate {
    add_field => {
      "k_name" => "%{name}"
    }
  }
  # Drop the Logstash bookkeeping fields before indexing.
  mutate {
    remove_field => ["@version","@timestamp"]
  }
}

output {
  elasticsearch {
    hosts => ["121.196.158.48:9200"]
    index => "index_add_anchor"
    document_id => "%{id}"
    # Install/overwrite the template named index_my from the given file.
    template => "D:/elk/logstash-7.11.1/test/template/hd.json"
    template_name => "index_my"
    template_overwrite => true
    # user => "elastic"
    # password => "953598751"
  }
}
JavaUtil
@Componentpublic class EsUtil { private static RestHighLevelClient client; private EsUtil(@Autowired RestHighLevelClient restHighLevelClient){ client = restHighLevelClient; } /** * 创建索引 * @param indexName 索引名称 */ public static boolean createIndex(String indexName) throws IOException { GetIndexRequest getRequest = new GetIndexRequest(indexName); if(client.indices().exists(getRequest, RequestOptions.DEFAULT)){ System.out.println("索引已经存在"); return false; } if(client.indices().create(new CreateIndexRequest(indexName),RequestOptions.DEFAULT).isAcknowledged()){ System.out.println("索引创建成功"); return true; } return false; } /** * 判断所以是否已经存在 * @param indexName 索引名称 */ public static boolean indexExists(String indexName){ GetIndexRequest getRequest = new GetIndexRequest(indexName); try { if(client.indices().exists(getRequest, RequestOptions.DEFAULT)){ System.out.println("索引已经存在"); return false; } } catch (IOException e) { e.printStackTrace(); } return true; } /** * 创建索引,对中文字段进行 ik+pinyin 分词 * @param indexName 索引名称 */ public static boolean createIndexIkPinyin(String indexName){ CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); // ik + pinyin 分词 createIndexRequest.settings("{\n" + " \"number_of_shards\": \"2\",\n" + " \"number_of_replicas\": \"1\",\n" + " \"analysis\": {\n" + " \"analyzer\": {\n" + " \"ik_max_word_pinyin\": {\n" + " \"type\": \"custom\",\n" + " \"tokenizer\": \"ik_max_word\",\n" + " \"filter\": [\n" + " \"my_pinyin\",\n" + " \"word_delimiter_graph\"\n" + " ]\n" + " }\n" + " },\n" + " \"filter\": {\n" + " \"my_pinyin\": {\n" + " \"type\": \"pinyin\",\n" + " \"keep_separate_first_letter\": true,\n" + " \"keep_full_pinyin\": true,\n" + " \"keep_original\": true,\n" + " \"limit_first_letter_length\": 16,\n" + " \"lowercase\": true,\n" + " \"remove_duplicated_term\": true\n" + " }\n" + " }\n" + " }\n" + " }",XContentType.JSON); createIndexRequest.mapping("{\n" + " \"_source\": {\n" + " \"enabled\": true\n" + " },\n" + " \"properties\": {\n" + 
" \"mmsi\": {\n" + " \"type\": \"text\"\n" + " },\n" + " \"name\": {\n" + " \"analyzer\": \"ik_max_word_pinyin\",\n" + " \"type\": \"text\"\n" + " },\n" + " \"pinyin\": {\n" + " \"type\": \"text\"\n" + " }\n" + " }\n" + " }",XContentType.JSON); CreateIndexResponse createIndexResponse = null; try { createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } assert createIndexResponse != null; return createIndexResponse.isAcknowledged(); } /** * 删除索引 * @param indexName 索引名称 */ public static boolean deleteIndex(String indexName){ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); AcknowledgedResponse delete = null; try { delete = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } assert delete != null; return delete.isAcknowledged(); } /** * 根据索引和 id 删除一条数据 * @param indexName 索引名称 * @param id 文档 id */ public static boolean deleteIndexId(String indexName,String id){ DeleteRequest request = new DeleteRequest(indexName, id); DeleteResponse delete1 = null; try { delete1 = client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } assert delete1 != null; return delete1.getResult() == DocWriteResponse.Result.DELETED; } /** * 根据索引和 id 删除一条数据 * @param indexName 索引 * @param ids 文档 id */ public static boolean bulkDeleteIndexId(String indexName, Set<String> ids) { BulkRequest bulkRequest = new BulkRequest(); for(String id : ids){ bulkRequest.add(new DeleteRequest(indexName).id(id)); } BulkResponse bulk = null; try { bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } assert bulk != null; return !bulk.hasFailures(); } /** * 插入数据 * @param indexName 索引 * @param id 文档 id * @param data 数据,JSON格式 */ public static void insertData(String indexName, String id,String data) { IndexRequest indexRequest = new IndexRequest(indexName); 
indexRequest.id(id).source(data, XContentType.JSON); try { client.index(indexRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } /** * 批量插入数据 * @param indexName 索引 * @param data 数据(json)-- id,source */ public static boolean bulkInsertData(String indexName, Map<String,String> data){ BulkRequest bulkRequest = new BulkRequest(); for(Map.Entry<String,String> entry : data.entrySet()){ bulkRequest.add(new IndexRequest(indexName).id(entry.getKey()).source(entry.getValue(),XContentType.JSON)); } BulkResponse bulk = null; try { bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } assert bulk != null; return !bulk.hasFailures(); } /** * 查询数据 * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/java-rest-high-search.html * @param indexName 索引 */ public static Object search(String indexName) throws IOException { SearchRequest searchRequest = new SearchRequest(indexName); // 查询条件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); searchSourceBuilder.from(0); searchSourceBuilder.size(10); searchSourceBuilder.timeout(new TimeValue(3, TimeUnit.SECONDS));// // 按 _score 降序排序(默认)// searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));// // 按 _id 字段升序排序// searchSourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC)); // 设置返回结果中包含或排除那些字段// String[] includeFields = new String[] {"title", "innerObject.*"};// String[] excludeFields = new String[] {"user"};// searchSourceBuilder.fetchSource(includeFields, excludeFields); searchRequest.source(searchSourceBuilder); SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = search.getHits(); TotalHits totalHits = hits.getTotalHits(); System.out.println(totalHits.value); System.out.println(hits.getMaxScore()); SortField[] sortFields = hits.getSortFields(); SearchHit[] hits1 = hits.getHits(); 
for(SearchHit hit : hits1){ System.out.println(hit.getIndex()); System.out.println(hit.getId()); } return ""; }}