ES

Input

clean_run 是否应该保留以前的运行状态
columns_charset 特定列的字符编码。此选项将为指定列覆盖全局的 charset 设置 columns_charset => { "column0" => "ISO-8859-1" }
connection_retry_attempts 尝试连接到数据库的最大次数(默认1)
jdbc_driver_class org.postgresql.Driver
jdbc_driver_library JDBC驱动 D:/xx.jar
jdbc_connection_string jdbc:xxx?characterEncoding=utf8&serverTimezone=UTC
jdbc_user postgres
jdbc_password
statement SELECT * from add_bridge where id = 1
parameters 查询参数的哈希 { "target_id" => "321" }
schedule 设置执行计划,采用 cron 语法,各字段含义(从左至右)为 分、时、日、月、周;全为 * 表示每分钟执行一次,例如 schedule => "*/10 * * * *" (每10分钟执行一次)
lowercase_column_names 是否强制使用标识符字段的小写,默认 true
statement 要使用参数,请使用命名参数语法 “SELECT * FROM xx WHERE id = :target_id”
jdbc_default_timezone Asia/Shanghai
  1. input {
  2. jdbc {
  3. jdbc_driver_library => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/postgresql-42.2.19.jar"
  4. jdbc_driver_class => "org.postgresql.Driver"
  5. jdbc_connection_string => "jdbc:postgresql://localhost:5432/jtstest?characterEncoding=utf8&serverTimezone=GMT%2B8"
  6. jdbc_user => "postgres"
  7. jdbc_password => "953598751"
  8. schedule => "* * * * *"
  9. # statement => "SELECT * from logstash_text where times > :sql_last_value and times < now()"
  10. statement => "SELECT * from logstash_text where times > :sql_last_value and times < now()"
  11. tracking_column => "times"
  12. id => "logstash_text"
  13. use_column_value => true
  14. tracking_column_type => "timestamp"
  15. jdbc_default_timezone =>"Asia/Shanghai"
  16. type => "type1"
  17. }
  18. jdbc {
  19. jdbc_driver_library => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/postgresql-42.2.19.jar"
  20. jdbc_driver_class => "org.postgresql.Driver"
  21. jdbc_connection_string => "jdbc:postgresql://localhost:5432/jtstest?characterEncoding=utf8&serverTimezone=GMT%2B8"
  22. jdbc_user => "postgres"
  23. jdbc_password => "953598751"
  24. schedule => "* * * * *"
  25. statement => "SELECT * from test_test where times > :sql_last_value and times < now()"
  26. tracking_column => "times"
  27. id => "test_test"
  28. use_column_value => true
  29. tracking_column_type => "timestamp"
  30. jdbc_default_timezone =>"Asia/Shanghai"
  31. type => "type2"
  32. }
  33. }
  34. filter {
  35. json {
  36. source => "message"
  37. remove_field => ["message"]
  38. }
  39. }
  40. output {
  41. if[type] == "type1" {
  42. elasticsearch {
  43. hosts => ["localhost:9200"]
  44. index => "index_logstash_text"
  45. id => "id_index_logstash_text"
  46. document_id => "%{id}"
  47. }
  48. }
  49. if[type] == "type2" {
  50. elasticsearch {
  51. hosts => ["localhost:9200"]
  52. index => "index_test_test"
  53. id => "id_index_test_test"
  54. document_id => "%{id}"
  55. }
  56. }
  57. stdout {
  58. # codec => json_lines
  59. codec => rubydebug
  60. }
  61. }

自定义分析器

  1. PUT /my_index
  2. {
  3. "settings": {
  4. "number_of_shards": "2",
  5. "number_of_replicas": "1",
  6. "analysis": {
  7. "analyzer": {
  8. "ik_smart_pinyin": {
  9. "type": "custom",
  10. "tokenizer": "ik_smart",
  11. "filter": [
  12. "my_pinyin",
  13. "word_delimiter"
  14. ]
  15. },
  16. "ik_max_word_pinyin": {
  17. "type": "custom",
  18. "tokenizer": "ik_max_word",
  19. "filter": [
  20. "my_pinyin",
  21. "word_delimiter"
  22. ]
  23. }
  24. },
  25. "filter": {
  26. "my_pinyin": {
  27. "type": "pinyin",
  28. "keep_separate_first_letter": true,
  29. "keep_full_pinyin": true,
  30. "keep_original": true,
  31. "limit_first_letter_length": 16,
  32. "lowercase": true,
  33. "remove_duplicated_term": true
  34. }
  35. }
  36. }
  37. },
  38. "mappings": {
  39. "_source": {
  40. "enabled": true
  41. },
  42. "properties": {
  43. "mmsi": {
  44. "type": "text"
  45. },
  46. "name": {
  47. "analyzer": "ik_max_word_pinyin",
  48. "type": "text"
  49. },
  50. "pinyin": {
  51. "type": "text"
  52. }
  53. }
  54. }
  55. }

自定义模板

  1. ############################### template ##################
  2. {
  3. "index_patterns": "index_*",
  4. "order": 1000,
  5. "settings": {
  6. "number_of_shards": "2",
  7. "number_of_replicas": "1"
  8. },
  9. "mappings": {
  10. "_source": {
  11. "enabled": true
  12. },
  13. "properties": {
  14. "mmsi": {
  15. "type": "text"
  16. },
  17. "name": {
  18. "type": "text"
  19. },
  20. "k_name":{
  21. "type":"keyword"
  22. },
  23. "address":{
  24. "type":"text"
  25. },
  26. "p_name":{
  27. "type":"text"
  28. }
  29. }
  30. },
  31. "aliases": {}
  32. }
  33. ############################### template ##################
  34. input {
  35. jdbc {
  36. jdbc_driver_library => "D:/elk/logstash-7.11.1/test/postgresql-42.2.19.jar"
  37. jdbc_driver_class => "org.postgresql.Driver"
  38. jdbc_connection_string => "jdbc:postgresql://localhost:5432/waterway?characterEncoding=utf8&serverTimezone=GMT%2B8"
  39. jdbc_user => "postgres"
  40. jdbc_password => "953598751"
  41. schedule => "*/20 * * * * *"
  42. statement => "select id,st_astext(ST_Force2D(geom)) locations,name,pinyin p_name from add_anchor"
  43. tracking_column => "id"
  44. tracking_column_type => "numeric"
  45. id => "add_padd_anchorort"
  46. use_column_value => true
  47. jdbc_default_timezone =>"Asia/Shanghai"
  48. type => "add_anchor"
  49. }
  50. }
  51. filter {
  52. mutate {
  53. add_field => { "k_name" => "%{name}" }
  54. }
  55. mutate {
  56. remove_field => ["@version","@timestamp"]
  57. }
  58. }
  59. output {
  60. elasticsearch {
  61. hosts => ["121.196.158.48:9200"]
  62. index => "index_add_anchor"
  63. document_id => "%{id}"
  64. template =>"D:/elk/logstash-7.11.1/test/template/hd.json"
  65. template_name => "index_my"
  66. template_overwrite => true
  67. # user => "elastic"
  68. # password => "953598751"
  69. }
  70. }

JavaUtil

  1. @Component
  2. public class EsUtil {
  3. private static RestHighLevelClient client;
  4. private EsUtil(@Autowired RestHighLevelClient restHighLevelClient){
  5. client = restHighLevelClient;
  6. }
  7. /**
  8. * 创建索引
  9. * @param indexName 索引名称
  10. */
  11. public static boolean createIndex(String indexName) throws IOException {
  12. GetIndexRequest getRequest = new GetIndexRequest(indexName);
  13. if(client.indices().exists(getRequest, RequestOptions.DEFAULT)){
  14. System.out.println("索引已经存在");
  15. return false;
  16. }
  17. if(client.indices().create(new CreateIndexRequest(indexName),RequestOptions.DEFAULT).isAcknowledged()){
  18. System.out.println("索引创建成功");
  19. return true;
  20. }
  21. return false;
  22. }
  23. /**
  24. * 判断所以是否已经存在
  25. * @param indexName 索引名称
  26. */
  27. public static boolean indexExists(String indexName){
  28. GetIndexRequest getRequest = new GetIndexRequest(indexName);
  29. try {
  30. if(client.indices().exists(getRequest, RequestOptions.DEFAULT)){
  31. System.out.println("索引已经存在");
  32. return false;
  33. }
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. return true;
  38. }
  39. /**
  40. * 创建索引,对中文字段进行 ik+pinyin 分词
  41. * @param indexName 索引名称
  42. */
  43. public static boolean createIndexIkPinyin(String indexName){
  44. CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
  45. // ik + pinyin 分词
  46. createIndexRequest.settings("{\n" +
  47. " \"number_of_shards\": \"2\",\n" +
  48. " \"number_of_replicas\": \"1\",\n" +
  49. " \"analysis\": {\n" +
  50. " \"analyzer\": {\n" +
  51. " \"ik_max_word_pinyin\": {\n" +
  52. " \"type\": \"custom\",\n" +
  53. " \"tokenizer\": \"ik_max_word\",\n" +
  54. " \"filter\": [\n" +
  55. " \"my_pinyin\",\n" +
  56. " \"word_delimiter_graph\"\n" +
  57. " ]\n" +
  58. " }\n" +
  59. " },\n" +
  60. " \"filter\": {\n" +
  61. " \"my_pinyin\": {\n" +
  62. " \"type\": \"pinyin\",\n" +
  63. " \"keep_separate_first_letter\": true,\n" +
  64. " \"keep_full_pinyin\": true,\n" +
  65. " \"keep_original\": true,\n" +
  66. " \"limit_first_letter_length\": 16,\n" +
  67. " \"lowercase\": true,\n" +
  68. " \"remove_duplicated_term\": true\n" +
  69. " }\n" +
  70. " }\n" +
  71. " }\n" +
  72. " }",XContentType.JSON);
  73. createIndexRequest.mapping("{\n" +
  74. " \"_source\": {\n" +
  75. " \"enabled\": true\n" +
  76. " },\n" +
  77. " \"properties\": {\n" +
  78. " \"mmsi\": {\n" +
  79. " \"type\": \"text\"\n" +
  80. " },\n" +
  81. " \"name\": {\n" +
  82. " \"analyzer\": \"ik_max_word_pinyin\",\n" +
  83. " \"type\": \"text\"\n" +
  84. " },\n" +
  85. " \"pinyin\": {\n" +
  86. " \"type\": \"text\"\n" +
  87. " }\n" +
  88. " }\n" +
  89. " }",XContentType.JSON);
  90. CreateIndexResponse createIndexResponse = null;
  91. try {
  92. createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  93. } catch (IOException e) {
  94. e.printStackTrace();
  95. }
  96. assert createIndexResponse != null;
  97. return createIndexResponse.isAcknowledged();
  98. }
  99. /**
  100. * 删除索引
  101. * @param indexName 索引名称
  102. */
  103. public static boolean deleteIndex(String indexName){
  104. DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
  105. AcknowledgedResponse delete = null;
  106. try {
  107. delete = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
  108. } catch (IOException e) {
  109. e.printStackTrace();
  110. }
  111. assert delete != null;
  112. return delete.isAcknowledged();
  113. }
  114. /**
  115. * 根据索引和 id 删除一条数据
  116. * @param indexName 索引名称
  117. * @param id 文档 id
  118. */
  119. public static boolean deleteIndexId(String indexName,String id){
  120. DeleteRequest request = new DeleteRequest(indexName, id);
  121. DeleteResponse delete1 = null;
  122. try {
  123. delete1 = client.delete(request, RequestOptions.DEFAULT);
  124. } catch (IOException e) {
  125. e.printStackTrace();
  126. }
  127. assert delete1 != null;
  128. return delete1.getResult() == DocWriteResponse.Result.DELETED;
  129. }
  130. /**
  131. * 根据索引和 id 删除一条数据
  132. * @param indexName 索引
  133. * @param ids 文档 id
  134. */
  135. public static boolean bulkDeleteIndexId(String indexName, Set<String> ids) {
  136. BulkRequest bulkRequest = new BulkRequest();
  137. for(String id : ids){
  138. bulkRequest.add(new DeleteRequest(indexName).id(id));
  139. }
  140. BulkResponse bulk = null;
  141. try {
  142. bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  143. } catch (IOException e) {
  144. e.printStackTrace();
  145. }
  146. assert bulk != null;
  147. return !bulk.hasFailures();
  148. }
  149. /**
  150. * 插入数据
  151. * @param indexName 索引
  152. * @param id 文档 id
  153. * @param data 数据,JSON格式
  154. */
  155. public static void insertData(String indexName, String id,String data) {
  156. IndexRequest indexRequest = new IndexRequest(indexName);
  157. indexRequest.id(id).source(data, XContentType.JSON);
  158. try {
  159. client.index(indexRequest, RequestOptions.DEFAULT);
  160. } catch (IOException e) {
  161. e.printStackTrace();
  162. }
  163. }
  164. /**
  165. * 批量插入数据
  166. * @param indexName 索引
  167. * @param data 数据(json)-- id,source
  168. */
  169. public static boolean bulkInsertData(String indexName, Map<String,String> data){
  170. BulkRequest bulkRequest = new BulkRequest();
  171. for(Map.Entry<String,String> entry : data.entrySet()){
  172. bulkRequest.add(new IndexRequest(indexName).id(entry.getKey()).source(entry.getValue(),XContentType.JSON));
  173. }
  174. BulkResponse bulk = null;
  175. try {
  176. bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  177. } catch (IOException e) {
  178. e.printStackTrace();
  179. }
  180. assert bulk != null;
  181. return !bulk.hasFailures();
  182. }
  183. /**
  184. * 查询数据
  185. * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/java-rest-high-search.html
  186. * @param indexName 索引
  187. */
  188. public static Object search(String indexName) throws IOException {
  189. SearchRequest searchRequest = new SearchRequest(indexName);
  190. // 查询条件
  191. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  192. searchSourceBuilder.query(QueryBuilders.matchAllQuery());
  193. searchSourceBuilder.from(0);
  194. searchSourceBuilder.size(10);
  195. searchSourceBuilder.timeout(new TimeValue(3, TimeUnit.SECONDS));
  196. // // 按 _score 降序排序(默认)
  197. // searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
  198. // // 按 _id 字段升序排序
  199. // searchSourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC));
  200. // 设置返回结果中包含或排除那些字段
  201. // String[] includeFields = new String[] {"title", "innerObject.*"};
  202. // String[] excludeFields = new String[] {"user"};
  203. // searchSourceBuilder.fetchSource(includeFields, excludeFields);
  204. searchRequest.source(searchSourceBuilder);
  205. SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
  206. SearchHits hits = search.getHits();
  207. TotalHits totalHits = hits.getTotalHits();
  208. System.out.println(totalHits.value);
  209. System.out.println(hits.getMaxScore());
  210. SortField[] sortFields = hits.getSortFields();
  211. SearchHit[] hits1 = hits.getHits();
  212. for(SearchHit hit : hits1){
  213. System.out.println(hit.getIndex());
  214. System.out.println(hit.getId());
  215. }
  216. return "";
  217. }
  218. }