Python RAG 数据标注和对齐

作者:追风剑情 发布于:2026-4-28 17:42 分类:AI

本例中演示了数据标注及生成答案与标准答案之间的对齐(相似)程度计算。

安装本示例中用到的依赖:
pip install numpy jieba scikit-learn langchain-ollama langchain-chroma chromadb

"""
RAG 数据标注和对齐完整示例(使用 Ollama nomic-embed-text + deepseek-r1:7b)
功能:数据标注、向量检索、答案生成、多维度对齐评估(jieba+TF-IDF + 语义相似度)
"""

import warnings
# 忽略一些警告日志输出
warnings.filterwarnings("ignore", category=UserWarning)
import json
from typing import List, Dict, Any, Optional

import numpy as np
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# 新版 LangChain 推荐从 langchain_ollama 导入
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_core.messages import HumanMessage
from langchain_chroma import Chroma

# ==================== 全局常量配置 ====================
OLLAMA_BASE_URL = "http://localhost:11434"      # Ollama 服务地址
EMBEDDING_MODEL = "nomic-embed-text"            # 用于向量化的嵌入模型
GENERATION_MODEL = "deepseek-r1:7b"             # 用于生成答案的大语言模型

# 停用词集合(中英文常见无意义词)
STOP_WORDS = set([
    '的', '了', '是', '在', '和', '有', '我', '不', '也', '都', '这', '那',
    '就', '而', '与', '更', '最', '又', '及', '或', '对', '被', '把', '让',
    'the', 'of', 'and', 'to', 'in', 'for', 'on', 'with', 'as', 'by', 'at'
])

# ==================== JSON 序列化辅助函数 ====================
def json_serializable(obj):
    """将 NumPy 类型转换为 Python 原生类型,以便 JSON 序列化"""
    if isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.bool_):
        return bool(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        # 如果不是可处理的类型,交给默认编码器处理(会抛出异常)
        raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

# ==================== 工具函数 ====================
def chinese_tokenizer(text: str) -> List[str]:
    """
    使用 jieba 进行中文分词,并过滤停用词和单字符词。
    用于 TF-IDF 向量化时的分词回调。
    """
    words = jieba.cut(text, cut_all=False)          # 精确模式分词
    # 只保留长度大于1且不在停用词集合中的词
    return [w for w in words if len(w.strip()) > 1 and w not in STOP_WORDS]

# ==================== 第一步:数据标注 ====================
class DataAnnotator:
    """数据标注器:为文档和问答对添加元数据标签,并生成负样本。"""

    def __init__(self):
        self.annotated_data = []   # 存储所有标注过的数据

    def annotate_documents(self, documents: List[Dict]) -> List[Dict]:
        """
        标注原始文档,添加类别、来源、难度等元信息。
        :param documents: 每个元素包含 'content' 和可选的 'category', 'source'
        :return: 标注后的文档列表,每条带 'metadata' 字段
        """
        annotated_docs = []
        for doc in documents:
            content = doc["content"]
            annotated_doc = {
                "content": content,
                "metadata": {
                    "category": doc.get("category", "通用"),
                    "source": doc.get("source", "未知"),
                    "difficulty": "简单" if len(content) < 50 else ("中等" if len(content) < 200 else "困难"),
                    "length": len(content),
                    "is_relevant": True            # 正样本标记
                }
            }
            annotated_docs.append(annotated_doc)
            self.annotated_data.append(annotated_doc)
        return annotated_docs

    def annotate_qa_pairs(self, qa_pairs: List[Dict]) -> List[Dict]:
        """
        标注问答对,包含问题类型、正确标准答案等。
        :param qa_pairs: 每个元素包含 'question', 'answer', 'difficulty'
        :return: 标注后的问答对列表
        """
        annotated_qa = []
        for qa in qa_pairs:
            question = qa["question"]
            annotated_item = {
                "question": question,
                "correct_answer": qa["answer"],
                "metadata": {
                    "question_type": self._classify_question_type(question),
                    "difficulty": qa.get("difficulty", "中等"),
                    "context_needed": qa.get("context_needed", True),
                }
            }
            annotated_qa.append(annotated_item)
            self.annotated_data.append(annotated_item)
        return annotated_qa

    def create_negative_samples(self, positive_docs: List[Dict], negative_count: int = 2) -> List[Dict]:
        """
        生成负样本(与任何问题都不相关的文档),用于后续检索器的对比训练。
        :param positive_docs: 正样本文档列表(仅用于参考,不直接使用)
        :param negative_count: 生成的负样本数量
        :return: 负样本文档列表
        """
        negative_samples = []
        for i in range(negative_count):
            negative_doc = {
                "content": f"这是第{i+1}个与问题无关的干扰文档内容,例如天气、体育新闻等无关信息。",
                "metadata": {
                    "category": "干扰项",
                    "source": "负样本池",
                    "is_relevant": False,
                    "is_negative_sample": True
                }
            }
            negative_samples.append(negative_doc)
            self.annotated_data.append(negative_doc)
        return negative_samples

    def _classify_question_type(self, question: str) -> str:
        """根据问句中的关键词判断问题类型。"""
        if "什么" in question or "哪些" in question:
            return "事实型"
        elif "为什么" in question:
            return "因果型"
        elif "如何" in question or "怎么" in question:
            return "过程型"
        else:
            return "通用型"

# ==================== 第二步:检索器(使用 Ollama nomic-embed-text)====================
class RAGRetriever:
    """
    基于 Chroma 向量数据库的检索器,使用 Ollama 的 nomic-embed-text 作为嵌入模型。
    """
    def __init__(self, collection_name: str = "knowledge_base", persist_directory: str = "./chroma_db"):
        # 初始化 Ollama 嵌入模型(新版 langchain_ollama)
        self.embeddings = OllamaEmbeddings(
            base_url=OLLAMA_BASE_URL,
            model=EMBEDDING_MODEL
        )
        # 使用 LangChain 的 Chroma 封装,自动处理向量存储和检索
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=self.embeddings,
            persist_directory=persist_directory
        )
        self.persist_directory = persist_directory

    def add_documents(self, documents: List[Dict], doc_ids: Optional[List[str]] = None):
        """
        将标注后的文档添加到向量库中。
        :param documents: 每个元素包含 'content' 和 'metadata' 字段
        :param doc_ids: 可选的文档ID列表,不提供则自动生成
        """
        texts = [doc["content"] for doc in documents]
        metadatas = [doc.get("metadata", {}) for doc in documents]
        if doc_ids is None:
            doc_ids = [f"doc_{i}" for i in range(len(documents))]
        self.vectorstore.add_texts(texts=texts, metadatas=metadatas, ids=doc_ids)
        # 新版 Chroma 会自动持久化,不需要显式调用 persist()
        print(f"✓ 已添加 {len(documents)} 个文档到向量库")

    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """
        根据查询文本检索最相关的文档。
        :param query: 查询字符串
        :param top_k: 返回的文档数量
        :return: 包含 content, metadata, similarity_score 的字典列表
        """
        results = self.vectorstore.similarity_search_with_relevance_scores(query, k=top_k)
        retrieved_docs = []
        for doc, score in results:
            retrieved_docs.append({
                "content": doc.page_content,
                "metadata": doc.metadata,
                "similarity_score": float(score)  # 确保转换为 Python float
            })
        return retrieved_docs

# ==================== 第三步:生成器(使用 Ollama deepseek-r1:7b)====================
class RAGGenerator:
    """基于检索上下文的答案生成器,调用本地 deepseek-r1:7b 模型。"""

    def __init__(self):
        self.llm = ChatOllama(
            base_url=OLLAMA_BASE_URL,
            model=GENERATION_MODEL,
            temperature=0.2,          # 低温度提高确定性
            num_predict=512           # 最大生成 token 数
        )

    def generate(self, question: str, contexts: List[str]) -> str:
        """
        基于检索到的上下文生成答案。
        :param question: 用户问题
        :param contexts: 检索到的相关文档内容列表
        :return: 生成的答案文本
        """
        context_text = "\n\n".join(contexts)
        prompt = f"""你是一个专业的知识问答助手。请严格基于以下【参考信息】回答问题。

【参考信息】
{context_text}

【问题】
{question}

【要求】
1. 仅基于参考信息回答,不要编造。
2. 如果信息不足以回答,请明确说“根据提供的信息无法回答”。
3. 回答要准确、简洁、完整。

【回答】
"""
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return response.content.strip()

# ==================== 第四步:对齐评估器(jieba+TF-IDF + 语义相似度)====================
class AlignmentEvaluator:
    """
    评估生成答案与标准答案的对齐程度。
    采用两种方式:
      1. TF-IDF + 余弦相似度(基于 jieba 分词)
      2. 语义相似度(使用 Ollama nomic-embed-text 计算向量余弦相似度)
    最终综合评分。
    """
    def __init__(self):
        # TF-IDF 向量化器,使用自定义中文分词器
        self.tfidf_vectorizer = TfidfVectorizer(
            tokenizer=chinese_tokenizer,
            lowercase=False,
            smooth_idf=True,      # IDF 平滑
            sublinear_tf=True,    # 使用 1+log(tf) 压缩词频
            use_idf=True,
            max_df=0.85,          # 忽略过于频繁的词(>85%文档)
            min_df=1
        )
        # 用于语义相似度的嵌入模型(和检索器保持一致,使用常量)
        self.embeddings = OllamaEmbeddings(
            base_url=OLLAMA_BASE_URL,
            model=EMBEDDING_MODEL
        )

    def evaluate_tfidf_similarity(self, generated: str, annotated: str) -> float:
        """
        计算两个文本的 TF-IDF 余弦相似度(百分比)。
        """
        try:
            # 将两个文本进行 TF-IDF 向量化
            tfidf_matrix = self.tfidf_vectorizer.fit_transform([generated, annotated])
            sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
            return float(sim[0][0] * 100)  # 确保返回 Python float
        except Exception as e:
            print(f"TF-IDF 计算出错: {e}")
            return 0.0

    def evaluate_semantic_similarity(self, generated: str, annotated: str) -> float:
        """
        使用 nomic-embed-text 计算两个文本的语义相似度(余弦相似度百分比)。
        """
        # 获取两个文本的向量表示
        vec_gen = self.embeddings.embed_query(generated)
        vec_ann = self.embeddings.embed_query(annotated)
        # 计算余弦相似度
        norm_gen = np.linalg.norm(vec_gen)
        norm_ann = np.linalg.norm(vec_ann)
        if norm_gen == 0 or norm_ann == 0:
            return 0.0
        similarity = np.dot(vec_gen, vec_ann) / (norm_gen * norm_ann)
        return float(similarity * 100)  # 确保返回 Python float

    def align_and_evaluate(self, generated: str, annotated: str) -> Dict[str, Any]:
        """
        综合评估对齐程度。
        :return: 包含各维度得分和最终结论的字典(所有值均为 Python 原生类型)
        """
        tfidf_score = self.evaluate_tfidf_similarity(generated, annotated)
        semantic_score = self.evaluate_semantic_similarity(generated, annotated)
        # 综合得分:语义权重 0.6,TF-IDF 权重 0.4(可以根据实际调整)
        combined_score = semantic_score * 0.6 + tfidf_score * 0.4
        is_aligned = bool(combined_score >= 60)   # 显式转换为 Python bool
        # 确定对齐等级
        if combined_score >= 80:
            level = "完全对齐"
        elif combined_score >= 60:
            level = "部分对齐"
        else:
            level = "未对齐"
        return {
            "tfidf_similarity": round(tfidf_score, 2),
            "semantic_similarity": round(semantic_score, 2),
            "combined_score": round(combined_score, 2),
            "alignment_level": level,
            "is_aligned": is_aligned
        }

# ==================== 第五步:主流程 ====================
def main():
    print("=" * 60)
    print("RAG 完整示例:数据标注 + 检索 + 生成 + 对齐评估")
    print(f"Ollama 地址: {OLLAMA_BASE_URL}")
    print(f"嵌入模型: {EMBEDDING_MODEL}")
    print(f"生成模型: {GENERATION_MODEL}")
    print("=" * 60)

    # ---------- 1. 准备原始数据 ----------
    print("\n【步骤1】准备原始数据...")
    raw_documents = [
        {
            "content": "人工智能(AI)是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。这些任务包括学习、推理、问题解决、感知和语言理解。",
            "category": "技术",
            "source": "AI百科"
        },
        {
            "content": "机器学习是人工智能的一个子领域,专注于开发能够从数据中学习并改进的算法。深度学习是机器学习的一个分支,使用多层神经网络进行学习。",
            "category": "技术",
            "source": "ML教程"
        },
        {
            "content": "DeepSeek 是由深度求索公司开发的先进大语言模型。它支持中文和英文等多种语言,能够进行对话、问答、代码生成等多种任务。",
            "category": "产品",
            "source": "产品文档"
        }
    ]
    raw_qa_pairs = [
        {
            "question": "什么是人工智能?",
            "answer": "人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。",
            "difficulty": "简单"
        },
        {
            "question": "机器学习与深度学习有什么关系?",
            "answer": "深度学习是机器学习的一个分支,机器学习又是人工智能的子领域。深度学习使用多层神经网络从数据中学习。",
            "difficulty": "中等"
        }
    ]

    # ---------- 2. 数据标注 ----------
    print("\n【步骤2】数据标注...")
    annotator = DataAnnotator()
    annotated_docs = annotator.annotate_documents(raw_documents)
    annotated_qa = annotator.annotate_qa_pairs(raw_qa_pairs)
    negative_samples = annotator.create_negative_samples(annotated_docs, negative_count=2)
    all_docs = annotated_docs + negative_samples
    print(f"✓ 标注完成:{len(annotated_docs)} 个文档,{len(annotated_qa)} 个问答对,{len(negative_samples)} 个负样本")

    # ---------- 3. 构建检索器并添加文档 ----------
    print("\n【步骤3】构建向量检索库(使用 nomic-embed-text)...")
    retriever = RAGRetriever()
    retriever.add_documents(all_docs)

    # ---------- 4. 初始化生成器 ----------
    print("\n【步骤4】初始化生成器(deepseek-r1:7b)...")
    generator = RAGGenerator()

    # ---------- 5. 初始化评估器 ----------
    evaluator = AlignmentEvaluator()

    # ---------- 6. 测试问答 ----------
    print("\n【步骤5】开始问答测试与对齐评估...")
    test_questions = [
        "什么是人工智能?",
        "机器学习是什么?",
        "DeepSeek 是什么?"
    ]

    results = []
    for question in test_questions:
        print(f"\n{'─'*50}")
        print(f"问题: {question}")

        # 检索相关文档
        retrieved = retriever.retrieve(question, top_k=3)
        print(f"检索到 {len(retrieved)} 个相关文档 (相似度分数: {[round(d['similarity_score'],3) for d in retrieved]})")

        # 提取上下文(所有检索到的文档都可作为上下文)
        contexts = [doc["content"] for doc in retrieved]
        # 生成答案
        generated = generator.generate(question, contexts)
        print(f"生成答案: {generated}")

        # 获取标准答案(从标注数据中查找)
        annotated_answer = None
        for qa in annotated_qa:
            if qa["question"] == question:
                annotated_answer = qa["correct_answer"]
                break
        # 针对 "DeepSeek 是什么?" 临时补充标准答案
        if question == "DeepSeek 是什么?" and not annotated_answer:
            annotated_answer = "DeepSeek是由深度求索公司开发的先进大语言模型,支持多种语言和任务。"

        if annotated_answer:
            print(f"标准答案: {annotated_answer}")
            alignment = evaluator.align_and_evaluate(generated, annotated_answer)
            print(f"\n【对齐评估结果】")
            print(f"  TF-IDF 相似度: {alignment['tfidf_similarity']}%")
            print(f"  语义相似度: {alignment['semantic_similarity']}%")
            print(f"  综合得分: {alignment['combined_score']}%")
            print(f"  对齐等级: {alignment['alignment_level']}")
            results.append({
                "question": question,
                "generated": generated,
                "annotated": annotated_answer,
                "alignment": alignment
            })
        else:
            print("未找到标准答案,跳过评估")

    # ---------- 7. 总结报告 ----------
    print("\n" + "=" * 60)
    print("【最终总结】")
    print("=" * 60)
    if results:
        aligned_count = sum(1 for r in results if r["alignment"]["is_aligned"])
        print(f"测试问题总数: {len(results)}")
        print(f"对齐成功数: {aligned_count}")
        print(f"对齐成功率: {aligned_count/len(results)*100:.1f}%")
    else:
        print("未完成评估。")

    # 保存标注数据到JSON文件(使用自定义序列化函数处理 NumPy 类型)
    with open("annotated_data.json", "w", encoding="utf-8") as f:
        json.dump({
            "documents": annotated_docs,
            "qa_pairs": annotated_qa,
            "test_results": results
        }, f, ensure_ascii=False, indent=2, default=json_serializable)
    print("\n✓ 标注数据已保存到 annotated_data.json")

if __name__ == "__main__":
    main()

运行测试
22222.png

标签: AI

Powered by emlog  蜀ICP备18021003号-1   sitemap

川公网安备 51019002001593号