Available Approaches

To migrate multiple indices from one cluster to another, several approaches are available:

  • Snapshot: snapshot the index data to a file system (HDFS, or local disk + NAS) and restore it directly on the target cluster. This requires setting path.repo to the repository directory; path.repo is a static setting, so the nodes must be restarted.
  • Reindex from remote: requires configuring trust between the two clusters (whitelisting the remote hosts via reindex.remote.whitelist on the target), plus a restart.

This article mainly tests a third option: the Logstash approach.
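For reference, the snapshot option above can be sketched with the snapshot REST API (hypothetical hosts, repository name, and paths; assumes the repository directory is already listed in path.repo on every node):

```
## register a shared filesystem repository (directory must be listed in path.repo)
curl -XPUT "http://source-es:9200/_snapshot/my_backup" \
  -H 'Content-Type: application/json' \
  -d '{"type": "fs", "settings": {"location": "/mnt/nas/es_backup"}}'

## snapshot the indices to migrate
curl -XPUT "http://source-es:9200/_snapshot/my_backup/snap_1?wait_for_completion=true" \
  -H 'Content-Type: application/json' \
  -d '{"indices": "test_*"}'

## on the target cluster, register the same repository, then restore
curl -XPOST "http://target-es:9200/_snapshot/my_backup/snap_1/_restore"
```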

Migration Process

Prerequisites

Both clusters run Elasticsearch 6.8.0, and the same version of Logstash is used.

Migration between source and target clusters of different versions was not tested.

Indices and their mappings can be created in advance. If an index does not exist when synchronization starts, it is created automatically, but its mapping is then generated by dynamic mapping, which may not match the source.
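Pre-creating the index with an explicit mapping avoids relying on dynamic mapping, e.g. (a sketch with a hypothetical index name, type name, and fields, using the ES 6.x single-type mapping syntax):

```
curl -XPUT "http://target-es:9200/test_user_fix" \
  -H 'Content-Type: application/json' \
  -d '{
    "settings": {"number_of_shards": 5, "number_of_replicas": 1},
    "mappings": {
      "_doc": {
        "properties": {
          "name":       {"type": "keyword"},
          "created_at": {"type": "date"}
        }
      }
    }
  }'
```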

Logstash Configuration File

Both input and output use the Elasticsearch plugin:
Elasticsearch plugin usage guide

Full Configuration

File name: 1.conf

  input {
    elasticsearch {
      hosts => ["xxx:9200"]
      index => "test_*"    ## collects every index whose name starts with "test_"
      size => 5000
      scroll => "50m"
      docinfo => true
    }
  }
  filter {
  }
  output {
    elasticsearch {
      hosts => ["xxxx-1.es.amazonaws.com:443"]
      ssl => true
      user => "xxx"
      password => "xxxx"
      pool_max => 5000
      pool_max_per_route => 500
      index => "%{[@metadata][_index]}_fix"    ## append _fix to the target index name; drop _fix to reuse the original name
      document_type => "%{[@metadata][_type]}"
      document_id => "%{[@metadata][_id]}"
      ilm_enabled => false
    }
  }

Configuration Notes

The input.elasticsearch.index parameter only accepts a string; wildcards can be used in it.
If the source cluster has no username/password, the authentication parameters can be omitted.
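If the source cluster does require authentication, the same user/password options can be added to the input block as well (a sketch; placeholder values as in the configuration above):

```
input {
  elasticsearch {
    hosts => ["xxx:9200"]
    index => "test_*"
    user => "xxx"
    password => "xxxx"
    size => 5000
    scroll => "50m"
    docinfo => true
  }
}
```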

Running the Migration

Command

  ## -w  number of pipeline workers (defaults to the number of CPU cores)
  ## -b  number of events per batch
  ## -u  maximum time (ms) to wait before flushing an undersized batch
  ./bin/logstash -f 1.conf -w 10 -b 500 -u 120

Output

  [root@vm10-0-0-207 logstash-6.8.0]# sh bin/logstash -f 1.conf -w 10 -b 500 -u 120
  Sending Logstash logs to /data/apps/logstash-6.8.0/logs which is now configured via log4j2.properties
  [2022-02-28T17:15:53,765][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
  [2022-02-28T17:15:53,779][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.8.0"}
  [2022-02-28T17:15:58,297][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch pool_max_per_route=>500, hosts=>[//dg-es2.db.sdns.kscbigdata.cloud:19201], ilm_enabled=>"false", index=>"%{[@metadata][_index]}", id=>"d7ef501749515c455c9e25003b5b91577798ee00c98616aef4bfc23bb1876939", document_id=>"%{[@metadata][_id]}", pool_max=>5000, document_type=>"%{[@metadata][_type]}", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_fb4c9d2b-20c4-401d-ae3b-c78b58a4c52e", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, ilm_rollover_alias=>"logstash", ilm_pattern=>"{now/d}-000001", ilm_policy=>"logstash-policy", action=>"index", ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
  [2022-02-28T17:15:58,338][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>10, "pipeline.batch.size"=>500, "pipeline.batch.delay"=>120}
  [2022-02-28T17:15:58,654][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://dg-es2.db.sdns.kscbigdata.cloud:19201/]}}
  [2022-02-28T17:15:58,807][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://dg-es2.db.sdns.kscbigdata.cloud:19201/"}
  [2022-02-28T17:15:58,849][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
  [2022-02-28T17:15:58,852][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
  [2022-02-28T17:15:58,870][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//dg-es2.db.sdns.kscbigdata.cloud:19201"]}
  [2022-02-28T17:15:58,877][INFO ][logstash.outputs.elasticsearch] Using default mapping template
  [2022-02-28T17:15:58,898][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
  [2022-02-28T17:15:59,204][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x37b943cd sleep>"}
  [2022-02-28T17:15:59,247][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
  [2022-02-28T17:15:59,513][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
  [2022-02-28T17:16:30,592][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x37b943cd run>"}
  [2022-02-28T17:16:30,695][INFO ][logstash.runner ] Logstash shut down.
  [root@vm10-0-0-207 logstash-6.8.0]#

Failure Case

The first run used sh bin/logstash -f config/dam-dg.conf -w 50 -b 5000 -u 120
and failed with:

  [2022-02-28T17:06:45,416][INFO ][logstash.outputs.elasticsearch] Retrying individual bulk actions that failed or were rejected by the previous bulk request. {:count=>49}
  java.lang.OutOfMemoryError: Java heap space
  Dumping heap to java_pid92221.hprof ...
  Heap dump file created [1550816807 bytes in 29.064 secs]

After reducing the worker count and batch size to the values shown above, the run completed successfully.
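If large batches are genuinely needed, an alternative to shrinking -w/-b is to enlarge the Logstash JVM heap, since in-flight memory grows roughly with workers × batch size × event size. A sketch of config/jvm.options (the 4g value is illustrative, not a recommendation):

```
## config/jvm.options (excerpt)
-Xms4g
-Xmx4g
```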

Verification

Comparing search results between the two clusters showed the data to be fully consistent.
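Per-index document counts can also be compared directly, e.g. (hypothetical hosts; refresh the target first so recent writes are visible in the counts):

```
curl -XPOST "http://target-es:9200/test_*_fix/_refresh"
curl -s "http://source-es:9200/_cat/count/test_*?v"
curl -s "http://target-es:9200/_cat/count/test_*_fix?v"
```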