# Self-Learning RAG System

## Overview

Self-Learning RAG is a self-improving retrieval-augmented generation system that scores the confidence of its own answers and, when confidence is low, automatically re-embeds content and synchronizes with the MkDocs documentation.
## System Architecture

```mermaid
graph TD
    A[User query] --> B[RAG retrieval]
    B --> C[Answer generation]
    C --> D[Confidence evaluation]
    D --> E{Confidence > threshold?}
    E -->|Yes| F[Return answer]
    E -->|No| G[Trigger automatic learning]
    G --> H[MkDocs sync]
    G --> I[Re-embedding]
    H --> J[Neo4j update]
    I --> J
    J --> K[Record learning log]
    K --> B
```
## Core Features

### 1. Confidence Scoring

Answer quality is evaluated using the following metrics:

| Metric | Weight | Description |
|---|---|---|
| Cosine similarity | 0.4 | Semantic similarity to the retrieved results |
| LLM self-evaluation | 0.3 | The model's own confidence in its answer |
| Source count | 0.2 | Number of sources the answer could reference |
| Graph relations | 0.1 | Coverage of related Neo4j graph relations |
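
For illustration, here is the weighted sum computed for one hypothetical answer (the per-metric scores are made up; the weights are the ones above):

```python
# Illustrative confidence calculation with made-up per-metric scores
weights = {"similarity": 0.4, "self_eval": 0.3, "sources": 0.2, "graph": 0.1}
scores  = {"similarity": 0.82, "self_eval": 0.75, "sources": 0.6, "graph": 1.0}

confidence = sum(weights[k] * scores[k] for k in weights)
print(round(confidence, 3))  # -> 0.773
```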
### 2. Automatic Re-Embedding

When the confidence score falls below the threshold, the system:

- Re-fetches the related documents
- Regenerates their embeddings
- Upserts the results into the vector DB
### 3. MkDocs Synchronization

The MkDocs documentation is scanned periodically, and new or updated content is fed back into the RAG system.
## Implementation

### 1. Confidence Evaluation Engine

```python
# app/agents/self_learning_rag.py
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator
from app.infrastructure.external.openai_client import OpenAIClient
from datetime import datetime, timezone
import json
import os

class SelfLearningRAG:
    """Self-learning RAG system."""

    def __init__(
        self,
        confidence_threshold: float = 0.7,
        learning_log_path: str = "data/learning_log.json"
    ):
        self.rag = GraphRAGOrchestrator()
        self.llm = OpenAIClient(api_key=os.getenv("OPENAI_API_KEY"))
        self.confidence_threshold = confidence_threshold
        self.learning_log_path = learning_log_path

    def query_with_self_learning(self, question: str) -> dict:
        """Retrieval with self-learning enabled."""
        # 1. Regular RAG retrieval
        result = self.rag.query_with_graph_context(question)

        # 2. Confidence evaluation
        confidence_score = self._evaluate_confidence(
            question=question,
            answer=result["answer"],
            sources=result["sources"],
            graph_relations=result["graph_relations"]
        )

        # 3. Trigger automatic learning when confidence is low
        if confidence_score < self.confidence_threshold:
            self._trigger_learning(question, result, confidence_score)

        return {
            "answer": result["answer"],
            "confidence": confidence_score,
            "sources": result["sources"],
            "learned": confidence_score < self.confidence_threshold
        }

    def _evaluate_confidence(
        self,
        question: str,
        answer: str,
        sources: list,
        graph_relations: list
    ) -> float:
        """Compute the weighted confidence score."""
        # 1. Cosine similarity (weight 0.4)
        similarity_score = self._calculate_similarity(question, sources)

        # 2. LLM self-evaluation (weight 0.3)
        self_eval_score = self._llm_self_evaluation(question, answer)

        # 3. Source-count score (weight 0.2), saturating at 5 sources
        source_score = min(len(sources) / 5.0, 1.0)

        # 4. Graph-relation score (weight 0.1), saturating at 3 relations
        graph_score = min(len(graph_relations) / 3.0, 1.0)

        # Weighted sum
        total_score = (
            similarity_score * 0.4 +
            self_eval_score * 0.3 +
            source_score * 0.2 +
            graph_score * 0.1
        )
        return round(total_score, 3)

    def _calculate_similarity(self, question: str, sources: list) -> float:
        """Similarity between the question and its sources."""
        if not sources:
            return 0.0

        # Embedding-based similarity
        from openai import OpenAI
        client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

        q_embedding = client.embeddings.create(
            model="text-embedding-3-small",
            input=question
        ).data[0].embedding

        similarities = []
        for source in sources[:3]:  # evaluate against the top 3 sources
            s_embedding = client.embeddings.create(
                model="text-embedding-3-small",
                input=source.page_content
            ).data[0].embedding
            # Cosine similarity
            similarity = self._cosine_similarity(q_embedding, s_embedding)
            similarities.append(similarity)

        return sum(similarities) / len(similarities) if similarities else 0.0

    def _cosine_similarity(self, vec1: list, vec2: list) -> float:
        """Cosine similarity between two vectors."""
        import numpy as np
        return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

    def _llm_self_evaluation(self, question: str, answer: str) -> float:
        """Have the LLM grade its own answer."""
        prompt = f"""
        Rate the quality of the following question-answer pair on a scale of 0.0 to 1.0.

        Criteria:
        - Accuracy of the answer
        - How directly it addresses the question
        - Completeness of the information

        Question: {question}
        Answer: {answer}

        Return only the numeric score (e.g. 0.85).
        """
        try:
            response = self.llm.ask(prompt)
            score = float(response.strip())
            return max(0.0, min(1.0, score))
        except (ValueError, AttributeError):
            return 0.5  # fall back to a neutral score if parsing fails

    def _trigger_learning(self, question: str, result: dict, confidence: float):
        """Run the automatic learning process."""
        learning_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "question": question,
            "confidence": confidence,
            "actions_taken": []
        }

        # 1. Re-fetch related content from the MkDocs documentation
        new_docs = self._sync_from_mkdocs(question)
        if new_docs:
            learning_entry["actions_taken"].append("mkdocs_sync")

        # 2. Re-embed when confidence is very low
        if confidence < 0.5:
            self._re_embed_documents(question, result["sources"])
            learning_entry["actions_taken"].append("re_embedding")

        # 3. Update the Neo4j graph
        self._update_graph_relations(question, new_docs)
        learning_entry["actions_taken"].append("graph_update")

        # Record the learning log
        self._log_learning(learning_entry)

    def _sync_from_mkdocs(self, question: str) -> list:
        """Pull related content from the MkDocs documentation."""
        import glob
        from pathlib import Path

        docs_path = os.getenv("MKDOCS_PATH", "docs/")
        relevant_docs = []

        # Keyword extraction (an empty question yields no keywords,
        # which we treat as "match everything" for full scans)
        keywords = self._extract_keywords(question)

        # Scan the Markdown files
        for md_file in glob.glob(f"{docs_path}/**/*.md", recursive=True):
            content = Path(md_file).read_text(encoding="utf-8")
            # Keyword matching
            if not keywords or any(keyword.lower() in content.lower() for keyword in keywords):
                relevant_docs.append({
                    "file": md_file,
                    "content": content
                })

        # Add the new documents to the RAG store
        for doc in relevant_docs:
            self.rag.vectorstore.add_texts([doc["content"]])

        return relevant_docs

    def _extract_keywords(self, text: str) -> list:
        """Keyword extraction (naive version)."""
        # A production system would use spaCy or a Transformers model instead
        import re
        words = re.findall(r'\w+', text)
        return [w for w in words if len(w) > 3]

    def _re_embed_documents(self, question: str, sources: list):
        """Re-embed the given documents."""
        for source in sources:
            # Re-register the text so its embedding is refreshed
            self.rag.vectorstore.add_texts(
                [source.page_content],
                metadatas=[{"re_embedded": True, "trigger_question": question}]
            )

    def _update_graph_relations(self, question: str, new_docs: list):
        """Add new relations to the Neo4j graph."""
        if not new_docs:
            return

        with self.rag.driver.session() as session:
            for doc in new_docs:
                # Entity extraction (naive version)
                entities = self._extract_entities(doc["content"])
                for entity in entities:
                    session.run(
                        """
                        MERGE (n:Entity {name: $name})
                        SET n.last_updated = datetime()
                        """,
                        name=entity
                    )

    def _extract_entities(self, text: str) -> list:
        """Entity extraction (naive version)."""
        # A production system would use an NER model instead
        import re
        # Pick out capitalized words
        entities = re.findall(r'\b[A-Z][a-z]+\b', text)
        return list(set(entities))

    def _log_learning(self, entry: dict):
        """Append an entry to the learning log."""
        logs = []
        if os.path.exists(self.learning_log_path):
            with open(self.learning_log_path, 'r', encoding='utf-8') as f:
                logs = json.load(f)

        logs.append(entry)
        # Keep only the most recent 100 entries
        logs = logs[-100:]

        with open(self.learning_log_path, 'w', encoding='utf-8') as f:
            json.dump(logs, f, ensure_ascii=False, indent=2)

    def get_learning_stats(self) -> dict:
        """Return learning statistics."""
        if not os.path.exists(self.learning_log_path):
            return {"total_learnings": 0, "actions": {}}

        with open(self.learning_log_path, 'r', encoding='utf-8') as f:
            logs = json.load(f)

        actions_count = {}
        for log in logs:
            for action in log.get("actions_taken", []):
                actions_count[action] = actions_count.get(action, 0) + 1

        return {
            "total_learnings": len(logs),
            "actions": actions_count,
            "last_learning": logs[-1] if logs else None
        }
```
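
A minimal usage sketch, assuming `OPENAI_API_KEY` is set and the backends behind `GraphRAGOrchestrator` (vector store, Neo4j) are reachable:

```python
# Minimal usage sketch -- assumes the environment above is configured.
rag = SelfLearningRAG(confidence_threshold=0.7)

result = rag.query_with_self_learning("How is the confidence score computed?")
print(result["confidence"])  # weighted score, e.g. 0.773
print(result["learned"])     # True if a learning cycle was triggered
```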
### 2. Scheduler (Automatic Sync)

```python
# app/schedulers/mkdocs_sync_scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from app.agents.self_learning_rag import SelfLearningRAG
import logging

logger = logging.getLogger(__name__)

def setup_mkdocs_sync():
    """Set up the periodic MkDocs sync."""
    scheduler = BackgroundScheduler()
    rag = SelfLearningRAG()

    def sync_job():
        """Sync job."""
        logger.info("Starting MkDocs sync...")
        try:
            # Scan all documents (an empty query matches everything)
            docs = rag._sync_from_mkdocs("")
            logger.info(f"Synced {len(docs)} documents")
        except Exception as e:
            logger.error(f"Sync failed: {e}")

    # Run every day at 02:00
    scheduler.add_job(sync_job, 'cron', hour=2, minute=0)
    scheduler.start()
    return scheduler
```
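
For the standalone `sync-scheduler` container below to work (it runs `python -m app.schedulers.mkdocs_sync_scheduler`), the module also needs an entry point that starts the scheduler and keeps the process alive; a minimal sketch:

```python
# Hypothetical entry point for `python -m app.schedulers.mkdocs_sync_scheduler`:
# start the scheduler, then block so the container stays up.
if __name__ == "__main__":
    import time
    logging.basicConfig(level=logging.INFO)
    scheduler = setup_mkdocs_sync()
    try:
        while True:
            time.sleep(60)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
```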
### 3. FastAPI Integration

```python
# app/routes/self_learning_routes.py
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from app.agents.self_learning_rag import SelfLearningRAG

router = APIRouter(prefix="/api/self-learning", tags=["Self-Learning RAG"])

class QueryRequest(BaseModel):
    question: str

def get_rag():
    return SelfLearningRAG()

@router.post("/query")
async def query_with_learning(
    request: QueryRequest,
    rag: SelfLearningRAG = Depends(get_rag)
):
    """Self-learning RAG query."""
    result = rag.query_with_self_learning(request.question)
    return result

@router.get("/stats")
async def get_stats(rag: SelfLearningRAG = Depends(get_rag)):
    """Fetch learning statistics."""
    return rag.get_learning_stats()

@router.post("/manual-sync")
async def manual_sync(rag: SelfLearningRAG = Depends(get_rag)):
    """Run a full MkDocs sync manually (an empty query matches all documents)."""
    docs = rag._sync_from_mkdocs("")
    return {"synced_documents": len(docs)}
```
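
Assuming the router is mounted on an app served at `localhost:8000` (as in the Compose file below), the endpoints can be smoke-tested like this:

```python
# Exercise the endpoints; assumes the API is reachable at localhost:8000.
import requests

resp = requests.post(
    "http://localhost:8000/api/self-learning/query",
    json={"question": "What does the confidence threshold control?"},
)
print(resp.json())  # {"answer": ..., "confidence": ..., "sources": ..., "learned": ...}

print(requests.get("http://localhost:8000/api/self-learning/stats").json())
```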
### 4. Docker Compose Configuration

```yaml
version: "3.9"

services:
  api:
    build: .
    environment:
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      MKDOCS_PATH: /docs
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
    volumes:
      - ./docs:/docs:ro   # mount the MkDocs documentation
      - ./data:/app/data  # learning-log storage
    depends_on:
      - postgres
      - neo4j
    ports:
      - "8000:8000"
    networks:
      - backend

  # Worker running the scheduled sync
  sync-scheduler:
    build: .
    command: python -m app.schedulers.mkdocs_sync_scheduler
    environment:
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      MKDOCS_PATH: /docs
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
    volumes:
      - ./docs:/docs:ro
      - ./data:/app/data
    depends_on:
      - postgres
      - neo4j
    networks:
      - backend

networks:
  backend:
```
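
The `postgres` and `neo4j` services referenced by `depends_on` are not shown above; a minimal sketch of what they might look like, using stock images and the credentials implied by `DATABASE_URL`:

```yaml
# Hypothetical definitions for the backing services (they would sit under
# the same `services:` key); image tags are illustrative.
  postgres:
    image: postgres:16  # a pgvector-enabled image may be needed for the vector store
    environment:
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: vectors
    networks:
      - backend

  neo4j:
    image: neo4j:5
    networks:
      - backend
```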
## Monitoring

### Learning Dashboard

```python
# app/ui/learning_dashboard.py
import streamlit as st
import pandas as pd
import json

st.title("Self-Learning RAG Dashboard")

# Load the learning log
with open("data/learning_log.json", "r") as f:
    logs = json.load(f)

df = pd.DataFrame(logs)

# Summary metrics
st.metric("Total learning runs", len(logs))
st.metric("Average confidence", round(df["confidence"].mean(), 3))

# Action distribution
actions = []
for log in logs:
    actions.extend(log.get("actions_taken", []))
action_df = pd.DataFrame({"action": actions})
st.bar_chart(action_df["action"].value_counts())

# Confidence over time
df["timestamp"] = pd.to_datetime(df["timestamp"])
st.line_chart(df.set_index("timestamp")["confidence"])
```
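
Assuming a standard Streamlit setup, the dashboard can be launched with `streamlit run app/ui/learning_dashboard.py`; it reads the same `data/learning_log.json` that the API container writes via the mounted `./data` volume.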
## Performance Optimization

### 1. Caching Strategy

Repeated questions do not need a fresh confidence evaluation; the result can be memoized on a hash of the normalized question:

```python
from functools import lru_cache
import hashlib

class SelfLearningRAG:
    @staticmethod
    def _question_hash(question: str) -> str:
        """Stable cache key: SHA-256 of the normalized question."""
        return hashlib.sha256(question.strip().lower().encode("utf-8")).hexdigest()

    @lru_cache(maxsize=200)
    def _cached_confidence_eval(self, question_hash: str):
        """Memoize confidence evaluations keyed by question hash."""
        ...  # would delegate to _evaluate_confidence for the cached question
```
### 2. Batch Processing

```python
def batch_re_embed(documents: list, batch_size: int = 50):
    """Yield documents in fixed-size batches for re-embedding."""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        # Hand one batch at a time to the embedding pipeline
        yield batch
```
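
A consumer would then push each batch through the vector store in a single call; a sketch, where `all_documents` and `vectorstore` are hypothetical handles:

```python
# Hypothetical consumer: one add_texts call per batch instead of per document.
for batch in batch_re_embed(all_documents, batch_size=50):
    vectorstore.add_texts([doc["content"] for doc in batch])
```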
## Best Practices

- Threshold tuning: adjust the confidence threshold to fit the domain (see the sketch after this list)
- Regular maintenance: refresh stale embeddings periodically
- Log analysis: analyze learning patterns to drive improvements
- Incremental learning: prefer differential updates over full re-learning
- A/B testing: compare answer quality before and after learning
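
The threshold is just the constructor parameter shown earlier:

```python
# Stricter domains can raise the threshold so more answers trigger learning;
# 0.8 is an illustrative value, not a recommendation.
rag = SelfLearningRAG(confidence_threshold=0.8)
```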
## Troubleshooting

### Issue 1: Over-Learning

```python
# Throttle the learning frequency
def _trigger_learning(self, question: str, result: dict, confidence: float):
    # Skip re-learning when the same question was learned recently
    if self._recently_learned(question):
        return
    ...
```
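
`_recently_learned` is not defined in the snippet above; a minimal sketch based on the learning log, assuming a fixed cooldown window:

```python
from datetime import datetime, timedelta, timezone

def _recently_learned(self, question: str, window_minutes: int = 60) -> bool:
    """Return True if the same question appears in the log within the window."""
    # os and json are module-level imports, as in the engine file above
    if not os.path.exists(self.learning_log_path):
        return False
    with open(self.learning_log_path, encoding="utf-8") as f:
        logs = json.load(f)
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
    return any(
        log["question"] == question
        and datetime.fromisoformat(log["timestamp"]) >= cutoff
        for log in logs
    )
```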