Python RAG Elasticsearch

作者:追风剑情 发布于:2026-5-19 16:47 分类:AI

在 RAG 模型中,Elasticsearch 是一个强大的工具,用于实现高效的文档检索。Elasticsearch 是一个基于 Lucene 的开源搜索引擎,提供了分布式、可扩展的搜索和分析能力。Elasticsearch 的主要特点如下。

  • 全文搜索: Elasticsearch 支持复杂的全文搜索功能,包括对文档内容的分词、同义词处理、短语匹配等。它能够理解和处理自然语言查询,从而提供相关性较高的检索结果。
  • 分布式架构: Elasticsearch 是一个分布式搜索引擎,能够在多台机器上运行,支持大规模数据集的存储和查询。它通过分片和副本机制实现数据的分布和冗余,提升了系统的可靠性和性能。
  • 快速检索: Elasticsearch 使用倒排索引(inverted index)来加速检索过程。这种索引结构将文档中的每个词与包含该词的文档关联起来,从而支持快速的搜索和筛选。
  • 强大的查询语言: Elasticsearch 提供了功能丰富的查询语言(DSL),允许用户构建复杂的搜索查询,如布尔查询、范围查询、聚合查询等,以满足不同的检索需求。
  • 实时数据处理: Elasticsearch 支持近实时的数据索引和搜索,能够在数据更新后迅速反映最新的搜索结果,适用于动态内容搜索和实时分析。

一、安装 elasticsearch 服务器

下载地址 https://mirrors.huaweicloud.com/elasticsearch/

修改配置文件(config/elasticsearch.yml)以支持 http 访问,默认仅支持 https 访问。

# 设置为false,关闭安装检测
xpack.security.enabled: false
xpack.security.enrollment.enabled: true

# Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents
xpack.security.http.ssl:
# 关闭ssl
  enabled: false
  keystore.path: certs/http.p12  

二、安装中文分词插件

下载地址 https://release.infinilabs.com/analysis-ik/stable/
注意:要下载与 elasticsearch 版本号相同的 elasticsearch-analysis-ik

手动将 elasticsearch-analysis-ik 解压到 {elasticsearch安装目录}\plugins\ik 下。

启动 bin\elasticsearch.bat

用浏览器访问 http://localhost:9200/ 测试服务器是否可正常访问。

在 PowerShell 中执行 curl.exe -X GET "localhost:9200/_cat/plugins?v" 测试中文分词插件是否已生效。

三、安装Python插件

pip install elasticsearch -i https://pypi.tuna.tsinghua.edu.cn/simple

示例

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import requests
import time

# ========== 初始化本地 Elasticsearch ==========
# 确保 Elasticsearch 已安装并运行在本地
es = Elasticsearch(hosts=["http://localhost:9200"], request_timeout=30)

# ========== 初始化本地 Ollama 模型(替代 DeepSeek API)==========
OLLAMA_GENERATE_URL = "http://localhost:11434/api/generate"
GENERATE_MODEL = "deepseek-r1:7b"  # 使用您本地的 DeepSeek 模型

def extend_text_with_ollama(text):
    """使用本地 Ollama 模型扩展文本"""
    prompt = f"""请扩展以下文本,提供不同的视角和细节。扩展文本应至少包含150个词。

文本: {text}

请从多个角度(如技术发展、社会影响、未来趋势等)进行扩展。"""

    response = requests.post(
        OLLAMA_GENERATE_URL,
        json={"model": GENERATE_MODEL, "prompt": prompt, "stream": False}
    )
    if response.status_code == 200:
        return response.json()["response"]
    else:
        return f"生成失败: {response.status_code}"

# 索引名称
index_name = 'documents'

# 创建索引
def create_index(index_name):
    try:
        # 检查索引是否存在,如果存在则删除(为了演示效果)
        if es.indices.exists(index=index_name):
            es.indices.delete(index=index_name)
            print(f"已删除旧索引 '{index_name}'")
        
        # 创建新索引,配置中文分词器(需要安装 ik 分词插件)
        es.indices.create(
            index=index_name,
            body={
                "settings": {
                    "number_of_shards": 1,
                    "number_of_replicas": 0,
                    "analysis": {
                        "analyzer": {
                            "ik_smart_analyzer": {
                                "type": "ik_smart"
                            }
                        }
                    }
                },
                "mappings": {
                    "properties": {
                        "text": {
                            "type": "text",
                            "analyzer": "ik_smart_analyzer"  # 使用中文分词
                        }
                    }
                }
            }
        )
        print(f"索引 '{index_name}' 创建成功")
    except Exception as e:
        print(f"创建索引时出错: {e}")

# 批量插入文档
def insert_documents(index_name, documents):
    actions = [
        {
            "_index": index_name,
            "_source": {
                "text": doc
            }
        }
        for doc in documents
    ]
    try:
        success, failed = bulk(es, actions, stats_only=True)
        print(f"文档插入成功: {success} 条,失败: {failed} 条")
    except Exception as e:
        print(f"插入文档时出错: {e}")

# 查询文档
def search_documents(index_name, query):
    try:
        response = es.search(
            index=index_name,
            body={
                "query": {
                    "match": {  # 使用 match 查询(会进行分词)
                        "text": query
                    }
                },
                "size": 10  # 返回最多10条结果
            }
        )
        return response
    except Exception as e:
        print(f"查询时出错: {e}")
        return None

# 示例文档(中文)
documents = [
    "人工智能正在改变世界。",
    "人工智能正在革新包括医疗和金融在内的各个行业。",
    "人工智能对技术产生了深远的影响。",
    "机器学习和深度学习是人工智能的子领域。",
    "人工智能算法正在快速发展。",
    "人工智能在技术领域的未来前景广阔。"
]

# 用户查询
query = "人工智能如何影响技术?"

# 执行流程
print("========== 1. 创建索引 ==========")
create_index(index_name)

print("\n========== 2. 插入文档 ==========")
insert_documents(index_name, documents)

# 等待索引刷新
time.sleep(1)

print("\n========== 3. 执行检索 ==========")
response = search_documents(index_name, query)

if response:
    print(f"查询: {query}")
    print(f"共找到 {response['hits']['total']['value']} 条结果\n")
    print("检索结果:")
    for i, hit in enumerate(response['hits']['hits']):
        print(f"  {i+1}. 得分: {hit['_score']:.4f}")
        print(f"     文档: {hit['_source']['text']}")
    
    # 获取最相关的文档
    if response['hits']['hits']:
        most_relevant_doc = response['hits']['hits'][0]['_source']['text']
        print(f"\n最相关文档: {most_relevant_doc}")
        
        print("\n========== 4. 使用本地 DeepSeek 扩展文本 ==========")
        extended_text = extend_text_with_ollama(most_relevant_doc)
        print(f"\n扩展后的文本:\n{extended_text}")
else:
    print("查询失败")

测试运行
11111.png

标签: AI

Powered by emlog  蜀ICP备18021003号-1   sitemap

川公网安备 51019002001593号