コンテンツにスキップ

Self-Learning RAG(自己学習型RAG)システム

概要

Self-Learning RAGは、回答の信頼度を自己評価し、低信頼度の場合は自動的に再埋め込みやMkDocsドキュメントとの同期を行う、自己改善型の検索拡張生成システムです。

システム構成

graph TD
    A[ユーザークエリ] --> B[RAG検索]
    B --> C[回答生成]
    C --> D[信頼度評価]
    D --> E{信頼度 > 閾値?}
    E -->|Yes| F[回答返却]
    E -->|No| G[自動学習トリガー]
    G --> H[MkDocs同期]
    G --> I[再埋め込み]
    H --> J[Neo4j更新]
    I --> J
    J --> K[学習ログ記録]
    K --> B

コア機能

1. 信頼度スコアリング

回答の品質を以下の指標で評価:

指標 重み 説明
コサイン類似度 0.4 検索結果との意味的類似性
LLM自己評価 0.3 モデル自身の確信度
ソース数 0.2 参照できた情報源の数
文脈一致度 0.1 会話履歴との整合性

2. 自動再埋め込み

信頼度が閾値以下の場合、以下を実行:

  • 関連文書の再取得
  • Embeddingの再生成
  • ベクトルDBへの更新登録

3. MkDocs同期

定期的にMkDocsドキュメントをスキャンし、新規・更新内容をRAGシステムに反映

実装

1. 信頼度評価エンジン

# app/agents/self_learning_rag.py
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator
from app.infrastructure.external.openai_client import OpenAIClient
from datetime import datetime
import json
import os

class SelfLearningRAG:
    """自己学習型RAGシステム"""

    def __init__(
        self,
        confidence_threshold: float = 0.7,
        learning_log_path: str = "data/learning_log.json"
    ):
        self.rag = GraphRAGOrchestrator()
        self.llm = OpenAIClient(api_key=os.getenv("OPENAI_API_KEY"))
        self.confidence_threshold = confidence_threshold
        self.learning_log_path = learning_log_path

    def query_with_self_learning(self, question: str) -> dict:
        """自己学習機能付き検索"""

        # 1. 通常のRAG検索
        result = self.rag.query_with_graph_context(question)

        # 2. 信頼度評価
        confidence_score = self._evaluate_confidence(
            question=question,
            answer=result["answer"],
            sources=result["sources"],
            graph_relations=result["graph_relations"]
        )

        # 3. 低信頼度の場合は自動学習
        if confidence_score < self.confidence_threshold:
            self._trigger_learning(question, result, confidence_score)

        return {
            "answer": result["answer"],
            "confidence": confidence_score,
            "sources": result["sources"],
            "learned": confidence_score < self.confidence_threshold
        }

    def _evaluate_confidence(
        self,
        question: str,
        answer: str,
        sources: list,
        graph_relations: list
    ) -> float:
        """信頼度スコア計算"""

        # 1. コサイン類似度(0.4)
        similarity_score = self._calculate_similarity(question, sources)

        # 2. LLM自己評価(0.3)
        self_eval_score = self._llm_self_evaluation(question, answer)

        # 3. ソース数スコア(0.2)
        source_score = min(len(sources) / 5.0, 1.0)

        # 4. グラフ関係性スコア(0.1)
        graph_score = min(len(graph_relations) / 3.0, 1.0)

        # 重み付け合計
        total_score = (
            similarity_score * 0.4 +
            self_eval_score * 0.3 +
            source_score * 0.2 +
            graph_score * 0.1
        )

        return round(total_score, 3)

    def _calculate_similarity(self, question: str, sources: list) -> float:
        """質問と情報源の類似度計算"""
        if not sources:
            return 0.0

        # Embeddingを使用した類似度計算
        from openai import OpenAI
        client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

        q_embedding = client.embeddings.create(
            model="text-embedding-3-small",
            input=question
        ).data[0].embedding

        similarities = []
        for source in sources[:3]:  # 上位3件で評価
            s_embedding = client.embeddings.create(
                model="text-embedding-3-small",
                input=source.page_content
            ).data[0].embedding

            # コサイン類似度
            similarity = self._cosine_similarity(q_embedding, s_embedding)
            similarities.append(similarity)

        return sum(similarities) / len(similarities) if similarities else 0.0

    def _cosine_similarity(self, vec1: list, vec2: list) -> float:
        """コサイン類似度計算"""
        import numpy as np
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def _llm_self_evaluation(self, question: str, answer: str) -> float:
        """LLMによる自己評価"""
        prompt = f"""
        以下の質問と回答の品質を0.0〜1.0で評価してください。
        評価基準:
        - 回答の正確性
        - 質問への適切な対応
        - 情報の完全性

        質問: {question}
        回答: {answer}

        スコアのみを数値で返してください(例: 0.85)
        """

        try:
            response = self.llm.ask(prompt)
            score = float(response.strip())
            return max(0.0, min(1.0, score))
        except:
            return 0.5  # デフォルト値

    def _trigger_learning(self, question: str, result: dict, confidence: float):
        """自動学習プロセスの実行"""

        learning_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "question": question,
            "confidence": confidence,
            "actions_taken": []
        }

        # 1. MkDocsドキュメントから関連情報を再取得
        new_docs = self._sync_from_mkdocs(question)
        if new_docs:
            learning_entry["actions_taken"].append("mkdocs_sync")

        # 2. 再埋め込み
        if confidence < 0.5:
            self._re_embed_documents(question, result["sources"])
            learning_entry["actions_taken"].append("re_embedding")

        # 3. Neo4jグラフ更新
        self._update_graph_relations(question, new_docs)
        learning_entry["actions_taken"].append("graph_update")

        # 学習ログ記録
        self._log_learning(learning_entry)

    def _sync_from_mkdocs(self, question: str) -> list:
        """MkDocsドキュメントから関連情報を取得"""
        import glob
        from pathlib import Path

        docs_path = os.getenv("MKDOCS_PATH", "docs/")
        relevant_docs = []

        # キーワード抽出
        keywords = self._extract_keywords(question)

        # Markdownファイルをスキャン
        for md_file in glob.glob(f"{docs_path}/**/*.md", recursive=True):
            content = Path(md_file).read_text(encoding="utf-8")

            # キーワードマッチング
            if any(keyword.lower() in content.lower() for keyword in keywords):
                relevant_docs.append({
                    "file": md_file,
                    "content": content
                })

        # 新規文書をRAGに追加
        for doc in relevant_docs:
            self.rag.vectorstore.add_texts([doc["content"]])

        return relevant_docs

    def _extract_keywords(self, text: str) -> list:
        """キーワード抽出(簡易版)"""
        # 実際にはspaCyやTransformersを使用
        import re
        words = re.findall(r'\w+', text)
        return [w for w in words if len(w) > 3]

    def _re_embed_documents(self, question: str, sources: list):
        """文書の再埋め込み"""
        for source in sources:
            # 既存の埋め込みを更新
            self.rag.vectorstore.add_texts(
                [source.page_content],
                metadatas=[{"re_embedded": True, "trigger_question": question}]
            )

    def _update_graph_relations(self, question: str, new_docs: list):
        """Neo4jグラフに新しい関係を追加"""
        if not new_docs:
            return

        with self.rag.driver.session() as session:
            for doc in new_docs:
                # エンティティ抽出(簡易版)
                entities = self._extract_entities(doc["content"])

                for entity in entities:
                    session.run(
                        """
                        MERGE (n:Entity {name: $name})
                        SET n.last_updated = datetime()
                        """,
                        name=entity
                    )

    def _extract_entities(self, text: str) -> list:
        """エンティティ抽出(簡易版)"""
        # 実際にはNERモデルを使用
        import re
        # 大文字で始まる単語を抽出
        entities = re.findall(r'\b[A-Z][a-z]+\b', text)
        return list(set(entities))

    def _log_learning(self, entry: dict):
        """学習ログの記録"""
        logs = []
        if os.path.exists(self.learning_log_path):
            with open(self.learning_log_path, 'r', encoding='utf-8') as f:
                logs = json.load(f)

        logs.append(entry)

        # 最新100件のみ保持
        logs = logs[-100:]

        with open(self.learning_log_path, 'w', encoding='utf-8') as f:
            json.dump(logs, f, ensure_ascii=False, indent=2)

    def get_learning_stats(self) -> dict:
        """学習統計の取得"""
        if not os.path.exists(self.learning_log_path):
            return {"total_learnings": 0, "actions": {}}

        with open(self.learning_log_path, 'r', encoding='utf-8') as f:
            logs = json.load(f)

        actions_count = {}
        for log in logs:
            for action in log.get("actions_taken", []):
                actions_count[action] = actions_count.get(action, 0) + 1

        return {
            "total_learnings": len(logs),
            "actions": actions_count,
            "last_learning": logs[-1] if logs else None
        }

2. スケジューラー(自動同期)

# app/schedulers/mkdocs_sync_scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from app.agents.self_learning_rag import SelfLearningRAG
import logging

logger = logging.getLogger(__name__)

def setup_mkdocs_sync():
    """MkDocs定期同期のセットアップ"""
    scheduler = BackgroundScheduler()
    rag = SelfLearningRAG()

    def sync_job():
        """同期ジョブ"""
        logger.info("Starting MkDocs sync...")
        try:
            # 全ドキュメントをスキャン
            docs = rag._sync_from_mkdocs("")
            logger.info(f"Synced {len(docs)} documents")
        except Exception as e:
            logger.error(f"Sync failed: {e}")

    # 毎日深夜2時に実行
    scheduler.add_job(sync_job, 'cron', hour=2, minute=0)
    scheduler.start()

    return scheduler

3. FastAPI統合

# app/routes/self_learning_routes.py
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from app.agents.self_learning_rag import SelfLearningRAG

router = APIRouter(prefix="/api/self-learning", tags=["Self-Learning RAG"])

class QueryRequest(BaseModel):
    question: str

def get_rag():
    return SelfLearningRAG()

@router.post("/query")
async def query_with_learning(
    request: QueryRequest,
    rag: SelfLearningRAG = Depends(get_rag)
):
    """自己学習型RAG検索"""
    result = rag.query_with_self_learning(request.question)
    return result

@router.get("/stats")
async def get_stats(rag: SelfLearningRAG = Depends(get_rag)):
    """学習統計取得"""
    return rag.get_learning_stats()

@router.post("/manual-sync")
async def manual_sync(rag: SelfLearningRAG = Depends(get_rag)):
    """手動でMkDocs同期を実行"""
    docs = rag._sync_from_mkdocs("")
    return {"synced_documents": len(docs)}

4. Docker Compose設定

version: "3.9"

services:
  api:
    build: .
    environment:
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      MKDOCS_PATH: /docs
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
    volumes:
      - ./docs:/docs:ro  # MkDocsドキュメントをマウント
      - ./data:/app/data  # 学習ログ保存
    depends_on:
      - postgres
      - neo4j
    ports:
      - "8000:8000"
    networks:
      - backend

  # Cron for scheduled sync
  sync-scheduler:
    build: .
    command: python -m app.schedulers.mkdocs_sync_scheduler
    environment:
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      MKDOCS_PATH: /docs
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
    volumes:
      - ./docs:/docs:ro
      - ./data:/app/data
    depends_on:
      - postgres
      - neo4j
    networks:
      - backend

networks:
  backend:

監視とモニタリング

学習ダッシュボード

# app/ui/learning_dashboard.py
import streamlit as st
import pandas as pd
import json
from datetime import datetime

st.title("Self-Learning RAG ダッシュボード")

# 学習ログ読み込み
with open("data/learning_log.json", "r") as f:
    logs = json.load(f)

df = pd.DataFrame(logs)

# 統計表示
st.metric("総学習回数", len(logs))
st.metric("平均信頼度", df["confidence"].mean())

# アクション分布
actions = []
for log in logs:
    actions.extend(log.get("actions_taken", []))

action_df = pd.DataFrame({"action": actions})
st.bar_chart(action_df["action"].value_counts())

# 時系列グラフ
df["timestamp"] = pd.to_datetime(df["timestamp"])
st.line_chart(df.set_index("timestamp")["confidence"])

パフォーマンス最適化

1. キャッシング戦略

from functools import lru_cache
import hashlib

class SelfLearningRAG:
    @lru_cache(maxsize=200)
    def _cached_confidence_eval(self, question_hash: str):
        """信頼度評価をキャッシュ"""
        pass

2. バッチ処理

def batch_re_embed(documents: list, batch_size: int = 50):
    """バッチでの再埋め込み"""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        # バッチ処理
        yield batch

ベストプラクティス

  1. 閾値の調整: ドメインに応じて信頼度閾値を調整
  2. 定期メンテナンス: 古い埋め込みは定期的に更新
  3. ログ分析: 学習パターンを分析して改善
  4. 増分学習: 全体再学習ではなく差分更新を優先
  5. A/Bテスト: 学習前後で回答品質を比較

トラブルシューティング

問題1: 過学習

# 学習頻度の制限
def _trigger_learning(self, question: str, result: dict, confidence: float):
    # 同一質問の短時間内の再学習を防止
    if self._recently_learned(question):
        return

問題2: ストレージ不足

# 古いログの削除
find data/learning_log.json -type f -mtime +30 -delete

参考リソース