栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

【Elasticsearch】使用Python完成对ES的插入操作

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Elasticsearch】使用Python完成对ES的插入操作

实现思路

1.Python搭建Flask服务,编写ES脚本。
2.Java 通过 HTTP 请求调用上述 Flask 接口,完成对 ES 的插入操作。

环境配置

Elasticsearch 7.16.0(Python 端依赖:与之对应的 elasticsearch 7.x 客户端、Flask、jieba)

具体代码实现 ESObject模板
import hashlib
import json
import random
import secrets
import string
import time

import jieba
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from flask import Flask, request, jsonify, Response

# The Flask application that exposes the Elasticsearch HTTP endpoints.
server = Flask(__name__)
# Keep non-ASCII (e.g. Chinese) text unescaped in JSON responses.
# NOTE(review): Flask >= 2.3 no longer reads JSON_AS_ASCII; there the
# equivalent is ``server.json.ensure_ascii = False`` — confirm the Flask version.
server.config.update(JSON_AS_ASCII=False)


class ESObject:
    """Small convenience wrapper around an Elasticsearch client bound to one index.

    Each instance opens its own client in ``__init__`` and sends every call to
    ``self.index_name`` (and ``self.index_type`` where the API takes a
    ``doc_type`` / ``_type``).

    NOTE(review): the default ``port=9300`` is Elasticsearch's node-to-node
    transport port, whereas this client talks HTTP (9200 by default, which is
    what the callers in this listing pass). The default is kept for backward
    compatibility — pass the HTTP port explicitly.
    """

    def __init__(self, index_name, index_type, host='127.0.0.1', port=9300):
        """Remember the target index/type and connect to ``host:port``.

        :param index_name: index to operate on (roughly a "database").
        :param index_type: document type used as ``doc_type``/``_type``
            (roughly a "table"; normally "_doc" on Elasticsearch 7).
        :param host: Elasticsearch host name or IP address.
        :param port: Elasticsearch HTTP port.
        """
        self.host = host
        self.port = port
        self.index_name = index_name
        self.index_type = index_type

        self.es_obj = self.connect_elasticsearch()

    def set_index_name(self, index_name):
        """Direct subsequent calls at another index."""
        self.index_name = index_name

    def set_index_type(self, index_type):
        """Use another document type in subsequent calls."""
        self.index_type = index_type

    def connect_elasticsearch(self):
        """Create a client for ``self.host:self.port`` and report whether it answers.

        A failed ``ping()`` is only printed; the client is returned either way.

        :return: the ``Elasticsearch`` client.
        """
        # NOTE(review): the 7.x client documents its constructor-level timeout as
        # ``timeout``; confirm ``request_timeout`` is honoured by the installed version.
        client = Elasticsearch([{'host': self.host, 'port': self.port}], request_timeout=60, max_retries=3,
                               retry_on_timeout=True)
        if client.ping():
            print('The connection is successful!')
        else:
            print("Error: ES could not connect!")
        return client

    def create_index(self, settings):
        """Create ``self.index_name`` with *settings* unless it already exists.

        Check the result at e.g. http://localhost:9200/<index_name>/_mappings.

        :param settings: create-index request body (``settings`` / ``mappings``).
        :return: True if the index exists afterwards (created now or already
            present); False if the check or the creation failed (error printed).
        """
        created = False
        try:
            if not self.es_obj.indices.exists(index=self.index_name):
                # No ignore=400 here: the exists() check already prevents re-creating
                # the index, and ignoring HTTP 400 would also hide invalid
                # settings/mappings while still reporting success.
                self.es_obj.indices.create(index=self.index_name, body=settings)
                print("Created Index")
            created = True
        except Exception as ex:
            print(str(ex))
        # Returned outside of ``finally`` so that non-Exception errors
        # (e.g. KeyboardInterrupt) are not silently swallowed.
        return created

    def delete_index(self):
        """Delete ``self.index_name`` if it exists; errors are printed, not raised."""
        try:
            if self.es_obj.indices.exists(index=self.index_name):
                # ignore=[400, 404]: do not fail if the index disappears between
                # the exists() check and the delete call.
                self.es_obj.indices.delete(index=self.index_name, ignore=[400, 404])
                print("Deleted Index")
        except Exception as ex:
            print(str(ex))

    def store_record(self, record):
        """Index one document; Elasticsearch generates its ``_id``.

        :param record: document source (a JSON-serialisable dict).
        :return: the index API response, or None if indexing failed (error printed).
        """
        try:
            outcome = self.es_obj.index(index=self.index_name, doc_type=self.index_type, body=record)
            print(outcome['result'])
            return outcome
        except Exception as ex:
            print("Error in indexing data")
            print(str(ex))
            return None

    def store_record_list(self, record_list):
        """Index documents one request at a time (see ``bulk_index_data`` for batching)."""
        for record in record_list:
            self.store_record(record)

    def bulk_index_data(self, record_list):
        """Insert all *record_list* documents with a single bulk helper call.

        :param record_list: iterable of document sources.
        :return: number of successfully performed actions (also printed).
        :raises: errors from ``elasticsearch.helpers.bulk`` (``raise_on_error=True``).
        """
        actions = [
            {
                "_index": self.index_name,
                "_type": self.index_type,
                # "_id" is omitted so that Elasticsearch generates one.
                "_source": record,
            }
            for record in record_list
        ]
        success, _ = bulk(self.es_obj, actions, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)
        return success

    def get_data_by_id(self, id):
        """Fetch one document by its ``_id``.

        :return: the get API response (contains ``_id``, ``found``, ``_source``, ...).
        """
        # The get API returns the document itself, not a search result, so there
        # is no "hits" key to unwrap.
        return self.es_obj.get(index=self.index_name, doc_type=self.index_type, id=id)

    def get_data_by_body(self, search):
        """Run a search request body against the index.

        :param search: search request body (e.g. ``{"query": ..., "size": ...}``).
        :return: the ``hits`` section of the response; its ``hits`` list holds the matches.
        """
        res = self.es_obj.search(index=self.index_name, body=search)
        return res['hits']

    def update_data(self, id, body):
        """Update the document with the given ``_id`` using an update API *body*."""
        return self.es_obj.update(index=self.index_name, doc_type=self.index_type, id=id, body=body)

    def delete_type_data(self):
        """Delete every document of ``self.index_type`` in the index (match_all)."""
        query_object = {'query': {'match_all': {}}}
        # The client keyword is ``index``; ``index_name`` is not a valid argument.
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query_object)

    def delete_index_data(self, id):
        """Delete the document with the given ``_id``; return the delete API response."""
        return self.es_obj.delete(index=self.index_name, doc_type=self.index_type, id=id)

    # Original (misspelled) name, kept so that existing callers keep working.
    delect_index_data = delete_index_data

    def delete_by_query(self, query):
        """Delete all documents matching the request body *query*."""
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query)


def secret_key(length=30):
    """
    Generate a random string of ASCII letters and digits.

    Characters are drawn with the ``secrets`` module, which is suitable for
    security-sensitive tokens (the ``random`` module is not).
    :param length: number of characters; values <= 0 give an empty string
        (the previous ``while length`` loop never terminated for negatives).
    :return: [length] long random string.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def hash_code(*args, **kwargs):
    """
    Return a 64-character SHA-256 hex digest of the given values.

    Positional values are stringified and concatenated in order, followed by
    the keyword-argument *values* (keys are ignored) in call order. With no
    arguments at all, the current local time formatted as ``%Y%m%d%H%M%S``
    is hashed instead, so that result changes once per second.
    :param args: values to hash, in order.
    :param kwargs: further values to hash; only their values are used.
    :return: 64-character lowercase hexadecimal digest.
    """
    if not args and not kwargs:
        # "%S" (seconds) is portable; the previous "%s" is a platform-specific
        # extension that is not supported everywhere (e.g. Windows).
        text = time.strftime("%Y%m%d%H%M%S")
    else:
        text = "".join(str(value) for value in (*args, *kwargs.values()))
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


if __name__ == '__main__':
    # Blocking call: serve on all interfaces, port 8660, without the debugger.
    # NOTE(review): route handlers defined further down in this listing are only
    # registered if their definitions execute before this line — keep this guard
    # at the very end of the module.
    server.run(debug=False, port=8660, host='0.0.0.0')

插入函数及接口
def _build_record(text, file_name):
    """Return the Elasticsearch document for one content string of *file_name*."""
    return {
        "es_id": secret_key(),
        "content": text,
        "file_name": file_name,
        # jieba word segmentation of the text, tokens joined by single spaces.
        "jieba_content": " ".join(jieba.lcut(text)),
    }


def _bulk_timed(es, record_list, done_message):
    """Bulk-insert *record_list* via *es* (skipped when empty), then print *done_message* and elapsed seconds."""
    start = time.perf_counter()
    if record_list:  # an empty bulk request would do nothing useful
        es.bulk_index_data(record_list)
    print(done_message)
    print(str(time.perf_counter() - start))


def es_insert(datas, batch_size=10000):
    """Segment each content string with jieba and bulk-insert it into Elasticsearch.

    :param datas: request payload; must contain "filename" (str) and
        "contentList" (iterable of str, one document per item).
    :param batch_size: number of records sent per bulk request (default 10000).
    :return: "1" on success, "0" if any step raised (the error is printed).
    """
    try:
        # Roughly equivalent to a database.
        index_name = "index_name"
        # Roughly equivalent to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)

        # To (re)create the index before inserting, uncomment and adapt.
        # Elasticsearch 7 mappings are typeless (no type-name level), and
        # refresh_interval belongs inside "settings":
        # es.delete_index()
        # settings = {
        #     "settings": {
        #         "number_of_shards": 5,
        #         "number_of_replicas": 0,
        #         "refresh_interval": "20s",
        #     },
        #     "mappings": {
        #         "dynamic": "strict",
        #         "properties": {
        #             "es_id": {"type": "text"},
        #             "content": {"type": "text"},
        #             "file_name": {"type": "text"},
        #             "jieba_content": {"type": "text"},
        #         },
        #     },
        # }
        # es.create_index(settings)

        # Insert.
        file_name = datas["filename"]
        record_list = []
        for text in datas["contentList"]:
            record_list.append(_build_record(text, file_name))
            # Flush full batches so request size and memory use stay bounded.
            if len(record_list) >= batch_size:
                _bulk_timed(es, record_list, "Finished!")
                record_list = []
        print("record_list finished")
        _bulk_timed(es, record_list, "success finished!")
        return "1"
    except Exception as e:
        print(e)
        return "0"


# An explicit endpoint is required: another view in this module is also named
# ``question_regex``, and Flask raises an AssertionError when two different view
# functions are registered under the same endpoint name.
@server.route("/es_insert", methods=['get', 'post'], endpoint="es_insert")
def question_regex():
    """HTTP endpoint: insert the posted document contents into Elasticsearch.

    Expects a UTF-8 JSON request body with "filename" and "contentList".
    Responds "fail" when the body is missing or is not valid UTF-8 JSON,
    otherwise with the "1"/"0" status string returned by ``es_insert``.
    """
    if not request.data:
        return "fail"
    # Parse the parameters passed by the caller.
    try:
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:  # covers both JSONDecodeError and UnicodeDecodeError
        return "fail"
    res_code = es_insert(data)
    print(res_code)
    return Response(str(res_code))
拓展思路

ESObject 是一个模板类,除插入外还封装了查询、更新、删除等方法。为这些方法编写对应的 Flask 接口后,即可由 Java 调用,实现删除、查询等更多操作。

拓展删除操作示例
def es_delete_by_id(p_file_name):
    """Delete every document whose ``file_name`` equals *p_file_name*.

    Candidates are fetched with a ``match_phrase`` query (at most 10000 hits
    per call), filtered on exact equality of ``_source.file_name``, and then
    deleted one by one by ``_id``.

    :param p_file_name: file name whose documents should be removed.
    :return: "1" on success, "0" if any step raised (the error is printed).
    """
    try:
        # Roughly equivalent to a database.
        index_name = "index_name"
        # Roughly equivalent to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)
        # A real search body is required here (a bare integer is not a valid
        # query). "size" raises the hit limit from Elasticsearch's default of 10.
        search = {"query": {"match_phrase": {"file_name": p_file_name}}, "size": 10000}
        hits = es.get_data_by_body(search)['hits']

        deleted_names = set()
        for hit in hits:
            # The phrase query is analysed, so confirm an exact match before deleting.
            if hit['_source'].get('file_name') == p_file_name:
                # Method name as spelled in ESObject.
                es.delect_index_data(hit['_id'])
                deleted_names.add(p_file_name)
        print(list(deleted_names))
        return "1"
    except Exception as e:
        print(e)
        return "0"


# An explicit endpoint is required: another view in this module is also named
# ``question_regex``, and Flask raises an AssertionError when two different view
# functions are registered under the same endpoint name.
@server.route("/es_delete", methods=['get', 'post'], endpoint="es_delete")
def question_regex():
    """HTTP endpoint: delete all documents belonging to the posted file name.

    Expects a UTF-8 JSON object body with a "filename" key. Responds "fail"
    when the body is missing, is not valid UTF-8 JSON, or lacks "filename";
    otherwise with the "1"/"0" status string returned by ``es_delete_by_id``.
    """
    if not request.data:
        return "fail"
    try:
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:  # covers both JSONDecodeError and UnicodeDecodeError
        return "fail"
    print(data)

    if not isinstance(data, dict) or "filename" not in data:
        return "fail"
    l_res_code = es_delete_by_id(data["filename"])
    return Response(str(l_res_code))
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1040929.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号