1. 使用 Python 搭建 Flask 服务，编写 Elasticsearch（ES）操作脚本。
2. 通过 Java 调用 Python 接口，完成对 ES 的插入操作。
环境版本：Elasticsearch 7.16.0
# --- Implementation: the ESObject template ---
import hashlib
import json
import random
import secrets
import string
import time

import jieba
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from flask import Flask, Response, jsonify, request

server = Flask(__name__)
# Emit non-ASCII (e.g. Chinese) characters as-is in JSON responses.
server.config["JSON_AS_ASCII"] = False


class ESObject:
    """Thin wrapper around an ``Elasticsearch`` client bound to one index and type.

    NOTE(review): every caller in this file passes port 9200 (the HTTP port
    used in the URLs below); the 9300 default looks like a mistake but is kept
    for backward compatibility.
    """

    def __init__(self, index_name, index_type, host='127.0.0.1', port=9300):
        self.host = host
        self.port = port
        # Roughly the "database".
        self.index_name = index_name
        # Roughly the "table"; "_doc" in this file.
        self.index_type = index_type
        self.es_obj = self.connect_elasticsearch()

    def set_index_name(self, index_name):
        self.index_name = index_name

    def set_index_type(self, index_type):
        self.index_type = index_type

    def connect_elasticsearch(self):
        """Create the client and ping the cluster once.

        The client is returned even when the ping fails; only a message is printed.
        :return: the ``Elasticsearch`` client.
        """
        _es = Elasticsearch([{'host': self.host, 'port': self.port}],
                            request_timeout=60, max_retries=3, retry_on_timeout=True)
        if _es.ping():
            print('The connection is successful!')
        else:
            print("Error: ES could not connect!")
        return _es

    def create_index(self, settings):
        """Create the index (the "database") if it does not exist yet.

        Verify by visiting "http://localhost:9200/entities/_mappings".
        :param settings: index-creation request body (settings / mappings).
        :return: True only if this call created the index; False if it already
            existed, the request was rejected, or an error was printed.
        """
        created = False
        try:
            if not self.es_obj.indices.exists(self.index_name):
                # After the exists() check, ignore=400 only matters if the index
                # appears in between; it stops that from raising.
                response = self.es_obj.indices.create(index=self.index_name, ignore=400, body=settings)
                if response.get("acknowledged"):
                    print("Created Index")
                    created = True
                else:
                    # With ignore=400 a rejected request returns its error body
                    # instead of raising, so report it rather than claim success.
                    print(str(response))
        except Exception as ex:
            print(str(ex))
        # Returned outside `finally` so unexpected BaseExceptions are not swallowed.
        return created

    def delete_index(self):
        """Delete the whole index if it exists; errors are printed, not raised."""
        try:
            if self.es_obj.indices.exists(self.index_name):
                # ignore=[400, 404] keeps a missing index from aborting the program.
                self.es_obj.indices.delete(index=self.index_name, ignore=[400, 404])
                print("Deleted Index")
        except Exception as ex:
            print(str(ex))

    def store_record(self, record):
        """Index a single document.

        :return: the index response, or None if indexing raised (error printed).
        """
        try:
            outcome = self.es_obj.index(index=self.index_name, doc_type=self.index_type, body=record)
            print(outcome['result'])
            return outcome
        except Exception as ex:
            print("Error in indexing data")
            print(str(ex))
            return None

    def store_record_list(self, record_list):
        """Index documents one request at a time (see bulk_index_data for batching)."""
        for record in record_list:
            self.store_record(record)

    def bulk_index_data(self, record_list):
        """Index all records with a single ``helpers.bulk`` call.

        Document ``_id`` values are left for Elasticsearch to generate.
        :param record_list: iterable of dicts, each used as a document ``_source``.
        :return: None; prints the number of successful actions.
        """
        actions = [
            {"_index": self.index_name, "_type": self.index_type, "_source": record}
            for record in record_list
        ]
        success, _ = bulk(self.es_obj, actions, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def get_data_by_id(self, id):
        """Fetch one document by id.

        :return: the GET response dict (``_id``, ``found``, ``_source``, ...).
        """
        # A GET response has no "hits" key (that belongs to search responses),
        # so the former ``res['hits']`` always raised KeyError.
        return self.es_obj.get(index=self.index_name, doc_type=self.index_type, id=id)

    def get_data_by_body(self, search):
        """Run a search with the given request body and return its ``hits`` section."""
        # doc_type is deliberately not passed: types are deprecated for search in ES 7.
        res = self.es_obj.search(index=self.index_name, body=search)
        return res['hits']

    def update_data(self, id, body):
        """Apply an Update API request body to the document with this id."""
        return self.es_obj.update(index=self.index_name, doc_type=self.index_type, id=id, body=body)

    def delete_type_data(self):
        """Delete every document in the index via a match_all delete-by-query."""
        query_object = {'query': {'match_all': {}}}
        # The client keyword is `index`; the former `index_name=` raised TypeError.
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query_object)

    def delete_index_data(self, id):
        """Delete a single document by id."""
        return self.es_obj.delete(index=self.index_name, doc_type=self.index_type, id=id)

    # Backward-compatible alias for the original, misspelled method name.
    delect_index_data = delete_index_data

    def delete_by_query(self, query):
        """Delete all documents matching the given query body."""
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query)


def secret_key(length=30):
    """Generate a random key made of ASCII letters and digits.

    :param length: number of characters; zero or negative yields "".
    :return: the generated key.
    """
    alphabet = string.ascii_letters + string.digits
    # `secrets` draws from the OS CSPRNG; range() also fixes the old
    # `while length:` loop, which never terminated for a negative length.
    return ''.join(secrets.choice(alphabet) for _ in range(length))


def hash_code(*args, **kwargs):
    """Return the 64-character hex SHA-256 digest of the given arguments.

    Positional args are stringified in order, followed by keyword values in
    insertion order (keys are ignored). With no arguments at all, the current
    local time (to the second) is hashed instead.
    """
    if not args and not kwargs:
        # "%S" is seconds; the former "%s" is not supported by every
        # platform's strftime.
        text = time.strftime("%Y%m%d%H%M%S")
    else:
        text = ''.join(str(arg) for arg in args) + ''.join(str(value) for value in kwargs.values())
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


if __name__ == '__main__':
    # NOTE(review): route functions are only registered if their definitions
    # execute before this call, so keep this guard at the very end of the module.
    server.run(host='0.0.0.0', port=8660, debug=False)

# --- Insert function and HTTP endpoint ---
def _timed_bulk(es, record_list, done_message):
    """Bulk-index *record_list* through *es*, then print *done_message* and the elapsed seconds."""
    start = time.time()
    es.bulk_index_data(record_list)
    print(done_message)
    print(str(time.time() - start))


def es_insert(datas, batch_size=10000):
    """Tokenize each content string with jieba and bulk-index it into Elasticsearch.

    :param datas: request payload. The keys read here are ``"filename"`` and
        ``"contentList"`` (an iterable of strings passed to jieba).
    :param batch_size: records buffered before a bulk request is sent.
        Defaults to the previously hard-coded 10000; values below 1 are treated as 1.
    :return: "1" on success, "0" if anything raised (the error is printed).
    """
    try:
        # Roughly equivalent to a database.
        index_name = "index_name"
        # Roughly equivalent to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)

        # Optional one-off setup: drop and recreate the index with an explicit mapping.
        # In ES 7.x, refresh_interval belongs under "settings", and "mappings"
        # must not be nested under a type name.
        # es.delete_index()
        # settings = {
        #     "settings": {
        #         "number_of_shards": 5,
        #         "number_of_replicas": 0,
        #         "refresh_interval": "20s",
        #     },
        #     "mappings": {
        #         "dynamic": "strict",
        #         "properties": {
        #             "es_id": {"type": "text"},
        #             "content": {"type": "text"},
        #             "file_name": {"type": "text"},
        #             "jieba_content": {"type": "text"},
        #         },
        #     },
        # }
        # es.create_index(settings)

        # Insert.
        batch_size = max(1, int(batch_size))
        file_name = datas["filename"]
        record_list = []
        for data in datas["contentList"]:
            record_list.append({
                "es_id": secret_key(),
                "content": data,
                "file_name": file_name,
                # jieba tokens joined by spaces; presumably so the analyzer
                # sees jieba's word boundaries.
                "jieba_content": " ".join(jieba.lcut(data)),
            })
            if len(record_list) >= batch_size:
                _timed_bulk(es, record_list, "Finished!")
                record_list = []
        print("record_list finished")
        # Flush the remainder (possibly empty).
        _timed_bulk(es, record_list, "success finished!")
        return "1"
    except Exception as e:
        print(e)
        return "0"


@server.route("/es_insert", methods=['get', 'post'])
def question_regex():
    """HTTP entry point: decode the JSON request body and pass it to es_insert.

    :return: "fail" for an empty or non-JSON body, otherwise a Response
        carrying es_insert's "1"/"0" status string.
    """
    if not request.data:
        return "fail"
    # Parameters passed in by the caller (e.g. the Java client).
    try:
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError; report
        # failure instead of an HTTP 500.
        return "fail"
    res_code = es_insert(data)
    print(res_code)
    return Response(str(res_code))

# --- Further ideas ---
ESObject 是一个通用模板，其中还封装了许多其他方法。通过 Java 调用相应的接口，还可以实现删除、查询等更多操作。
# --- Extension example: delete documents ---


def es_delete_by_id(p_file_name):
    """Delete every document whose ``_source.file_name`` equals *p_file_name*.

    Despite the function name, documents are matched by file name; each match
    is then deleted by its ``_id``. Only the first 10000 search hits are examined.
    :param p_file_name: exact file name to delete.
    :return: "1" on success, "0" if anything raised (the error is printed).
    """
    try:
        # Roughly equivalent to a database.
        index_name = "index_name"
        # Roughly equivalent to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)
        # The search body must be a dict; the former bare `10000` was not a
        # valid request body. 10000 is presumably chosen to match ES's default
        # max_result_window; larger indexes would need scroll or delete_by_query.
        search = {"query": {"match_all": {}}, "size": 10000}
        hits = es.get_data_by_body(search)['hits']
        deleted_names = set()
        for hit in hits:
            file_name = hit['_source'].get('file_name')
            if file_name == p_file_name:
                # ESObject's single-document delete; the class defines it
                # under the spelling `delect_index_data`.
                es.delect_index_data(hit['_id'])
                deleted_names.add(file_name)
        print(list(deleted_names))
        return "1"
    except Exception as e:
        print(e)
        return "0"


# An explicit endpoint avoids Flask's "View function mapping is overwriting an
# existing endpoint function" error, since /es_insert's view is also named
# question_regex.
@server.route("/es_delete", methods=['get', 'post'], endpoint="es_delete")
def question_regex():
    """HTTP entry point: read ``filename`` from the JSON body and delete its documents.

    :return: "fail" for an empty, non-JSON, or filename-less body, otherwise a
        Response carrying es_delete_by_id's "1"/"0" status string.
    """
    if not request.data:
        return "fail"
    try:
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:
        return "fail"
    print(data)
    l_delete_file_name = data.get("filename") if isinstance(data, dict) else None
    if l_delete_file_name is None:
        return "fail"
    l_res_code = es_delete_by_id(l_delete_file_name)
    return Response(str(l_res_code))