ES
Input
clean_run |
是否应该保留以前的运行状态 |
|
columns_charset |
特定列的字符编码。此选项将覆盖指定列的选项 |
columns_charset => { “column0” => “ISO-8859-1” } |
connection_retry_attempts |
尝试连接到数据库的最大次数(默认1) |
|
jdbc_driver_class |
|
org.postgresql.Driver |
jdbc_driver_library |
JDBC驱动 |
D:/xx.jar |
jdbc_connection_string |
|
jdbc:xxx?characterEncoding=utf8&serverTimezone=UTC |
jdbc_user |
|
postgres |
jdbc_password |
|
|
statement |
|
SELECT * from add_bridge where id = 1 |
parameters |
查询参数的哈希 |
{ “target_id” => “321” } |
schedule |
设置监听间隔 各字段含义(从左至右)分、时、天、月、年,全为*默认含义为每分钟都更新 |
schedule => “10 “ (10分钟执行一次) |
lowercase_column_names |
是否强制使用标识符字段的小写,默认 true |
|
statement |
要使用参数,请使用命名参数语法 |
“SELECT * FROM xx WHERE id = :target_id” |
jdbc_default_timezone |
|
Asia/Shanghai |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
input {
jdbc {
jdbc_driver_library => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/postgresql-42.2.19.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://localhost:5432/jtstest?characterEncoding=utf8&serverTimezone=GMT%2B8"
jdbc_user => "postgres"
jdbc_password => "953598751"
schedule => "* * * * *"
# statement => "SELECT * from logstash_text where times > :sql_last_value and times < now()"
statement => "SELECT * from logstash_text where times > :sql_last_value and times < now()"
tracking_column => "times"
id => "logstash_text"
use_column_value => true
tracking_column_type => "timestamp"
jdbc_default_timezone =>"Asia/Shanghai"
type => "type1"
}
jdbc {
jdbc_driver_library => "D:/code/elasticsearch/logstash-7.11.1-windows-x86_64/logstash-7.11.1/test/postgresql-42.2.19.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://localhost:5432/jtstest?characterEncoding=utf8&serverTimezone=GMT%2B8"
jdbc_user => "postgres"
jdbc_password => "953598751"
schedule => "* * * * *"
statement => "SELECT * from test_test where times > :sql_last_value and times < now()"
tracking_column => "times"
id => "test_test"
use_column_value => true
tracking_column_type => "timestamp"
jdbc_default_timezone =>"Asia/Shanghai"
type => "type2"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if[type] == "type1" {
elasticsearch {
hosts => ["localhost:9200"]
index => "index_logstash_text"
id => "id_index_logstash_text"
document_id => "%{id}"
}
}
if[type] == "type2" {
elasticsearch {
hosts => ["localhost:9200"]
index => "index_test_test"
id => "id_index_test_test"
document_id => "%{id}"
}
}
stdout {
# codec => json_lines
codec => rubydebug
}
}
自定义分析器
PUT /my_index
{
"settings": {
"number_of_shards": "2",
"number_of_replicas": "1",
"analysis": {
"analyzer": {
"ik_smart_pinyin": {
"type": "custom",
"tokenizer": "ik_smart",
"filter": [
"my_pinyin",
"word_delimiter"
]
},
"ik_max_word_pinyin": {
"type": "custom",
"tokenizer": "ik_max_word",
"filter": [
"my_pinyin",
"word_delimiter"
]
}
},
"filter": {
"my_pinyin": {
"type": "pinyin",
"keep_separate_first_letter": true,
"keep_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"lowercase": true,
"remove_duplicated_term": true
}
}
}
},
"mappings": {
"_source": {
"enabled": true
},
"properties": {
"mmsi": {
"type": "text"
},
"name": {
"analyzer": "ik_max_word_pinyin",
"type": "text"
},
"pinyin": {
"type": "text"
}
}
}
}
自定义模板
############################### template ##################
{
"index_patterns": "index_*",
"order": 1000,
"settings": {
"number_of_shards": "2",
"number_of_replicas": "1"
},
"mappings": {
"_source": {
"enabled": true
},
"properties": {
"mmsi": {
"type": "text"
},
"name": {
"type": "text"
},
"k_name":{
"type":"keyword"
},
"address":{
"type":"text"
},
"p_name":{
"type":"text"
}
}
},
"aliases": {}
}
############################### template ##################
input {
jdbc {
jdbc_driver_library => "D:/elk/logstash-7.11.1/test/postgresql-42.2.19.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://localhost:5432/waterway?characterEncoding=utf8&serverTimezone=GMT%2B8"
jdbc_user => "postgres"
jdbc_password => "953598751"
schedule => "*/20 * * * * *"
statement => "select id,st_astext(ST_Force2D(geom)) locations,name,pinyin p_name from add_anchor"
tracking_column => "id"
tracking_column_type => "numeric"
id => "add_padd_anchorort"
use_column_value => true
jdbc_default_timezone =>"Asia/Shanghai"
type => "add_anchor"
}
}
filter {
mutate {
add_field => { "k_name" => "%{name}" }
}
mutate {
remove_field => ["@version","@timestamp"]
}
}
output {
elasticsearch {
hosts => ["121.196.158.48:9200"]
index => "index_add_anchor"
document_id => "%{id}"
template =>"D:/elk/logstash-7.11.1/test/template/hd.json"
template_name => "index_my"
template_overwrite => true
# user => "elastic"
# password => "953598751"
}
}
JavaUtil
@Component
public class EsUtil {
private static RestHighLevelClient client;
private EsUtil(@Autowired RestHighLevelClient restHighLevelClient){
client = restHighLevelClient;
}
/**
* 创建索引
* @param indexName 索引名称
*/
public static boolean createIndex(String indexName) throws IOException {
GetIndexRequest getRequest = new GetIndexRequest(indexName);
if(client.indices().exists(getRequest, RequestOptions.DEFAULT)){
System.out.println("索引已经存在");
return false;
}
if(client.indices().create(new CreateIndexRequest(indexName),RequestOptions.DEFAULT).isAcknowledged()){
System.out.println("索引创建成功");
return true;
}
return false;
}
/**
* 判断所以是否已经存在
* @param indexName 索引名称
*/
public static boolean indexExists(String indexName){
GetIndexRequest getRequest = new GetIndexRequest(indexName);
try {
if(client.indices().exists(getRequest, RequestOptions.DEFAULT)){
System.out.println("索引已经存在");
return false;
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 创建索引,对中文字段进行 ik+pinyin 分词
* @param indexName 索引名称
*/
public static boolean createIndexIkPinyin(String indexName){
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
// ik + pinyin 分词
createIndexRequest.settings("{\n" +
" \"number_of_shards\": \"2\",\n" +
" \"number_of_replicas\": \"1\",\n" +
" \"analysis\": {\n" +
" \"analyzer\": {\n" +
" \"ik_max_word_pinyin\": {\n" +
" \"type\": \"custom\",\n" +
" \"tokenizer\": \"ik_max_word\",\n" +
" \"filter\": [\n" +
" \"my_pinyin\",\n" +
" \"word_delimiter_graph\"\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"filter\": {\n" +
" \"my_pinyin\": {\n" +
" \"type\": \"pinyin\",\n" +
" \"keep_separate_first_letter\": true,\n" +
" \"keep_full_pinyin\": true,\n" +
" \"keep_original\": true,\n" +
" \"limit_first_letter_length\": 16,\n" +
" \"lowercase\": true,\n" +
" \"remove_duplicated_term\": true\n" +
" }\n" +
" }\n" +
" }\n" +
" }",XContentType.JSON);
createIndexRequest.mapping("{\n" +
" \"_source\": {\n" +
" \"enabled\": true\n" +
" },\n" +
" \"properties\": {\n" +
" \"mmsi\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"name\": {\n" +
" \"analyzer\": \"ik_max_word_pinyin\",\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"pinyin\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
" }",XContentType.JSON);
CreateIndexResponse createIndexResponse = null;
try {
createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
assert createIndexResponse != null;
return createIndexResponse.isAcknowledged();
}
/**
* 删除索引
* @param indexName 索引名称
*/
public static boolean deleteIndex(String indexName){
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
AcknowledgedResponse delete = null;
try {
delete = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
assert delete != null;
return delete.isAcknowledged();
}
/**
* 根据索引和 id 删除一条数据
* @param indexName 索引名称
* @param id 文档 id
*/
public static boolean deleteIndexId(String indexName,String id){
DeleteRequest request = new DeleteRequest(indexName, id);
DeleteResponse delete1 = null;
try {
delete1 = client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
assert delete1 != null;
return delete1.getResult() == DocWriteResponse.Result.DELETED;
}
/**
* 根据索引和 id 删除一条数据
* @param indexName 索引
* @param ids 文档 id
*/
public static boolean bulkDeleteIndexId(String indexName, Set<String> ids) {
BulkRequest bulkRequest = new BulkRequest();
for(String id : ids){
bulkRequest.add(new DeleteRequest(indexName).id(id));
}
BulkResponse bulk = null;
try {
bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
assert bulk != null;
return !bulk.hasFailures();
}
/**
* 插入数据
* @param indexName 索引
* @param id 文档 id
* @param data 数据,JSON格式
*/
public static void insertData(String indexName, String id,String data) {
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.id(id).source(data, XContentType.JSON);
try {
client.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 批量插入数据
* @param indexName 索引
* @param data 数据(json)-- id,source
*/
public static boolean bulkInsertData(String indexName, Map<String,String> data){
BulkRequest bulkRequest = new BulkRequest();
for(Map.Entry<String,String> entry : data.entrySet()){
bulkRequest.add(new IndexRequest(indexName).id(entry.getKey()).source(entry.getValue(),XContentType.JSON));
}
BulkResponse bulk = null;
try {
bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
assert bulk != null;
return !bulk.hasFailures();
}
/**
* 查询数据
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/java-rest-high-search.html
* @param indexName 索引
*/
public static Object search(String indexName) throws IOException {
SearchRequest searchRequest = new SearchRequest(indexName);
// 查询条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
searchSourceBuilder.timeout(new TimeValue(3, TimeUnit.SECONDS));
// // 按 _score 降序排序(默认)
// searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
// // 按 _id 字段升序排序
// searchSourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC));
// 设置返回结果中包含或排除那些字段
// String[] includeFields = new String[] {"title", "innerObject.*"};
// String[] excludeFields = new String[] {"user"};
// searchSourceBuilder.fetchSource(includeFields, excludeFields);
searchRequest.source(searchSourceBuilder);
SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = search.getHits();
TotalHits totalHits = hits.getTotalHits();
System.out.println(totalHits.value);
System.out.println(hits.getMaxScore());
SortField[] sortFields = hits.getSortFields();
SearchHit[] hits1 = hits.getHits();
for(SearchHit hit : hits1){
System.out.println(hit.getIndex());
System.out.println(hit.getId());
}
return "";
}
}