コンテンツにスキップ

LangChain GraphRAG実装ガイド

概要

LangChain GraphRAGは、Neo4jとpgvectorを統合し、知識グラフとベクトル検索を組み合わせた高度な検索拡張生成(RAG)システムです。ConversationalRetrievalChainを使用して、文脈を保持した対話型AIシステムを構築します。

アーキテクチャ

graph TD
    A[ユーザークエリ] --> B[LangChain Orchestrator]
    B --> C[pgvector 類似検索]
    B --> D[Neo4j グラフ検索]
    C --> E[関連文書取得]
    D --> F[関係性情報取得]
    E --> G[LLM コンテキスト統合]
    F --> G
    G --> H[回答生成]
    H --> I[会話履歴保存]
    I --> B

技術スタック

コンポーネント 技術 用途
ベクトル検索 PostgreSQL + pgvector 意味的類似検索
グラフDB Neo4j 概念間の関係性管理
オーケストレーター LangChain RAGワークフロー制御
LLM OpenAI GPT-4 / Claude 回答生成
メモリ ConversationBufferMemory 会話履歴管理

実装

1. 依存関係のインストール

pip install langchain langchain-openai langchain-community
pip install neo4j psycopg2-binary pgvector
pip install openai tiktoken

2. Neo4j + pgvector統合サービス

# app/infrastructure/langchain/graph_rag_orchestrator.py
from neo4j import GraphDatabase
from langchain.vectorstores import PGVector
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from app.infrastructure.db.vector_repository import VectorRepository
import os

class GraphRAGOrchestrator:
    """Neo4j + pgvector統合RAGシステム"""

    def __init__(
        self,
        neo4j_uri: str = "bolt://localhost:7687",
        neo4j_user: str = "neo4j",
        neo4j_password: str = "password",
        postgres_connection: str = None
    ):
        # Neo4jドライバー初期化
        self.driver = GraphDatabase.driver(
            neo4j_uri,
            auth=(neo4j_user, neo4j_password)
        )

        # pgvector初期化
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        self.vectorstore = PGVector(
            connection_string=postgres_connection or os.getenv("DATABASE_URL"),
            embedding_function=self.embeddings,
            collection_name="documents"
        )

        # LLM初期化
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0.7,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        # 会話メモリ
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"
        )

        # ConversationalRetrievalChain構築
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 5}),
            memory=self.memory,
            return_source_documents=True
        )

    def query_with_graph_context(self, question: str) -> dict:
        """グラフコンテキストを含めた検索"""

        # 1. ベクトル検索で関連文書取得
        vector_results = self.vectorstore.similarity_search(question, k=5)

        # 2. Neo4jで関係性検索
        graph_context = self._search_graph_relations(question)

        # 3. コンテキストを統合
        enhanced_context = self._merge_contexts(vector_results, graph_context)

        # 4. LLMに質問
        result = self.qa_chain({
            "question": question,
            "context": enhanced_context
        })

        return {
            "answer": result["answer"],
            "sources": result["source_documents"],
            "graph_relations": graph_context,
            "chat_history": self.memory.chat_memory.messages
        }

    def _search_graph_relations(self, query: str) -> list:
        """Neo4jで関連概念を検索"""
        with self.driver.session() as session:
            # エンティティ抽出(簡易版)
            keywords = self._extract_keywords(query)

            cypher_query = """
            MATCH (n)-[r]->(m)
            WHERE n.name IN $keywords OR m.name IN $keywords
            RETURN n.name as source, type(r) as relation, m.name as target
            LIMIT 10
            """

            result = session.run(cypher_query, keywords=keywords)
            return [dict(record) for record in result]

    def _extract_keywords(self, text: str) -> list:
        """キーワード抽出(実際にはNLPライブラリを使用)"""
        # 簡易実装:重要そうな単語を抽出
        words = text.split()
        return [w for w in words if len(w) > 3]

    def _merge_contexts(self, vector_docs: list, graph_relations: list) -> str:
        """ベクトル検索結果とグラフ情報を統合"""
        context_parts = []

        # ベクトル検索結果
        context_parts.append("## 関連文書:")
        for doc in vector_docs:
            context_parts.append(f"- {doc.page_content}")

        # グラフ関係性
        if graph_relations:
            context_parts.append("\n## 関連する概念:")
            for rel in graph_relations:
                context_parts.append(
                    f"- {rel['source']} --[{rel['relation']}]--> {rel['target']}"
                )

        return "\n".join(context_parts)

    def add_document_to_graph(self, content: str, entities: list, relations: list):
        """文書をグラフとベクトルDBの両方に追加"""

        # 1. pgvectorに追加
        self.vectorstore.add_texts([content])

        # 2. Neo4jにエンティティと関係を追加
        with self.driver.session() as session:
            for entity in entities:
                session.run(
                    "MERGE (n:Entity {name: $name, type: $type})",
                    name=entity["name"],
                    type=entity["type"]
                )

            for rel in relations:
                session.run(
                    """
                    MATCH (a:Entity {name: $source})
                    MATCH (b:Entity {name: $target})
                    MERGE (a)-[r:RELATES_TO {type: $rel_type}]->(b)
                    """,
                    source=rel["source"],
                    target=rel["target"],
                    rel_type=rel["type"]
                )

    def close(self):
        """リソース解放"""
        self.driver.close()

3. FastAPI統合

# app/routes/graph_rag_routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator

router = APIRouter(prefix="/api/graphrag", tags=["GraphRAG"])

class QueryRequest(BaseModel):
    question: str

class DocumentRequest(BaseModel):
    content: str
    entities: list[dict]
    relations: list[dict]

def get_orchestrator():
    """依存性注入"""
    return GraphRAGOrchestrator()

@router.post("/query")
async def query_graph_rag(
    request: QueryRequest,
    orchestrator: GraphRAGOrchestrator = Depends(get_orchestrator)
):
    """GraphRAG検索エンドポイント"""
    try:
        result = orchestrator.query_with_graph_context(request.question)
        return {
            "success": True,
            "answer": result["answer"],
            "sources": [doc.page_content for doc in result["sources"]],
            "graph_relations": result["graph_relations"]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/add-document")
async def add_document(
    request: DocumentRequest,
    orchestrator: GraphRAGOrchestrator = Depends(get_orchestrator)
):
    """文書追加エンドポイント"""
    try:
        orchestrator.add_document_to_graph(
            content=request.content,
            entities=request.entities,
            relations=request.relations
        )
        return {"success": True, "message": "Document added successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

4. Docker Compose設定

version: "3.9"

services:
  # PostgreSQL + pgvector
  postgres:
    image: ankane/pgvector:pg16
    environment:
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: vectors
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - backend
    ports:
      - "5432:5432"

  # Neo4j
  neo4j:
    image: neo4j:5.15.0
    environment:
      NEO4J_AUTH: neo4j/${NEO4J_PASSWORD}
      NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
      NEO4J_apoc_export_file_enabled: "true"
      NEO4J_apoc_import_file_enabled: "true"
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    networks:
      - backend
    ports:
      - "7474:7474"  # HTTP
      - "7687:7687"  # Bolt

  # FastAPI
  api:
    build: .
    environment:
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
      NEO4J_URI: bolt://neo4j:7687
      NEO4J_USER: neo4j
      NEO4J_PASSWORD: ${NEO4J_PASSWORD}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    depends_on:
      - postgres
      - neo4j
    ports:
      - "8000:8000"
    networks:
      - backend
      - web

volumes:
  postgres_data:
  neo4j_data:
  neo4j_logs:

networks:
  backend:
  web:

使用例

基本的な検索

from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator

# 初期化
orchestrator = GraphRAGOrchestrator()

# 検索実行
result = orchestrator.query_with_graph_context(
    "FastAPIとPostgreSQLの連携方法を教えてください"
)

print("回答:", result["answer"])
print("参照元:", result["sources"])
print("関連概念:", result["graph_relations"])

文書とグラフの同時登録

orchestrator.add_document_to_graph(
    content="FastAPIはPython製の高速Webフレームワークです。",
    entities=[
        {"name": "FastAPI", "type": "Framework"},
        {"name": "Python", "type": "Language"}
    ],
    relations=[
        {"source": "FastAPI", "target": "Python", "type": "BUILT_WITH"}
    ]
)

パフォーマンス最適化

1. ベクトルインデックス作成

-- pgvectorのインデックス作成
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

2. Neo4jインデックス

-- エンティティ名にインデックス
CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name);

-- 関係タイプにインデックス
CREATE INDEX relation_type IF NOT EXISTS FOR ()-[r:RELATES_TO]-() ON (r.type);

3. キャッシュ戦略

from functools import lru_cache

class GraphRAGOrchestrator:
    @lru_cache(maxsize=100)
    def _cached_graph_search(self, query_hash: str):
        """グラフ検索結果をキャッシュ"""
        # 実装...
        pass

トラブルシューティング

問題1: Neo4j接続エラー

# Neo4jの起動確認
docker logs neo4j

# 接続テスト
cypher-shell -u neo4j -p password

問題2: pgvector拡張が見つからない

-- 拡張の確認
SELECT * FROM pg_available_extensions WHERE name = 'vector';

-- 拡張のインストール
CREATE EXTENSION IF NOT EXISTS vector;

問題3: メモリ不足

# docker-compose.yamlでメモリ制限を調整
services:
  neo4j:
    deploy:
      resources:
        limits:
          memory: 2G

ベストプラクティス

  1. エンティティ抽出の自動化: spaCyやTransformersを使用
  2. 関係性の階層化: Neo4jで複数レベルの関係を管理
  3. ハイブリッド検索: スコアリングでベクトルとグラフの重み調整
  4. バッチ処理: 大量文書は非同期で処理
  5. モニタリング: Phoenix + Grafanaで検索品質を監視

参考リソース