目的(痛点):
Elasticsearch 同集群、不同集群间索引的复制,按检索条件复制索引文档,备份索引数据等。
方案:
1.克隆(Clone index API):
文档导航位置:REST APIs » Index APIs » Clone index API
说明:同集群,可用于 Mount snapshot 后的索引复制。2.reindex(Reindex API):
文档导航位置:REST APIs » Document APIs » Reindex API
说明:可用于同集群、远程集群的文档按检索条件复制。远程集群复制不能使用索引分片读取。3.快照(Snapshot and restore APIs)
文档导航位置:REST APIs » Snapshot and restore APIs
说明:需借助外部分布式储存库,如 HDFS。可用于索引不同集群间备份、恢复。不支持索引检索条件复制。4.logstash
说明:使用 logstash 对 ES 索引进行复制转移,支持索引分片读取,按检索条件复制。5.自我脚本实现
说明:通过脚本程序按时间字段分片查询,或者使用 Scroll API 轮询遍历。
Scroll API:
文档导航位置:REST APIs » Search APIs » Scroll API
reindex 和 logstash 内部间接使用 Scroll API 实现。
拓展:
1.reindex 示例:
目标服务器 `elasticsearch.yml` 配置:
reindex.remote.whitelist: "10.10.100.100:*,10.10.200.200:*"
POST _reindex?wait_for_completion=false { "conflicts": "proceed", "source": { "remote": { "host": "http://10.10.100.1:9201" }, "index": "index-20220309", "slice": { "id": 0, "max": 2 }, "query": { "bool": { "must": [ { "terms": { "api_num": [ "100", "101", "102" ] } } ] } }, "sort": { "o_l_time": "asc" } }, "dest": { "index": "core.index-20220309" }, "script": { "source": "ctx._source.remove('field-a');ctx._source.remove('field-b');" } }
注:Reindexing from remote clusters does not support manual or automatic slicing.
slices 方式一(自动分片):
_reindex?slices=20&refresh
slices 方式二(手工分片):
```
"slice": {
"id": 0,
"max": 20
},
```
推荐:slice 数 与 索引主分片数一致。
2.logstash 配置示例:
① 创建 `es_sync.conf`:
input { elasticsearch { hosts => "10.10.100.1:9201" index => "index-20220309" query => '{"query": {"bool": {"must": [{"terms": {"api_num": ["100", "101", "102"] } } ] } }, "sort": {"o_l_time": "asc"} }' size => 1000 scroll => "3m" docinfo => true docinfo_target => "[@metadata][doc]" slices => 20 } } filter { mutate { remove_field => ["@version", "field-a", "field-b"] } } output { elasticsearch { hosts => "10.10.200.200:9202" index => "core.index-20220309" document_type => "%{[@metadata][doc][_type]}" document_id => "%{[@metadata][doc][_id]}" } }
② 启动 logstash:
bin/logstash -f es_sync.conf
③ 创建 `runner.py ` Python 脚本:
import datetime import os try: os.rename("/data/logstash/logstash-7.12.1/es_sync.conf", "/data/logstash/logstash-7.12.1/es_sync.conf.bak") except OSError: pass date_str = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y%m%d") with open("/data/logstash/logstash-7.12.1/es_sync.conf.tpl") as f, open("/data/logstash/logstash-7.12.1/es_sync.conf", "w+") as f2: s = f.read().encode("utf-8") s2 = s.replace("${date_str}", date_str) f2.write(s2) res = os.system("sh /data/logstash/logstash-7.12.1/bin/logstash -f /data/logstash/logstash-7.12.1/es_sync.conf") print(res)
④ 创建 crontab 任务:
3 3 * * * /usr/bin/python /data/logstash/logstash-7.12.1/runner.py >> /data/logstash/logstash-7.12.1/log.txt 2>&1
⑤ 监控任务执行结果(今日凌晨复制昨日数据,校对昨日数据):
POST index-20220309/_count { "query": { "bool": { "must": [ { "terms": { "action": [ "100", "101", "102" ] } } ] } } } GET core.index-20220309/_count
注:索引时间可计算,即 index-20220309 可写为 `%3Ccore.index-%7Bnow%2Fd-1d%7ByyyyMMdd%7C%2B08%3A00%7D%7D%3E`。
文档导航位置:REST APIs » API conventions » Date math support in index names
3.实践案例:
reindex 425115575 数据量复制 21448181 数据到远程集群,用时 1.07 h
logstash 425115575 数据量复制 21448181 数据到远程集群,用时 8 min。(slices => ${索引分片数})