LangChain GraphRAG実装ガイド¶
概要¶
LangChain GraphRAGは、Neo4jとpgvectorを統合し、知識グラフとベクトル検索を組み合わせた高度な検索拡張生成(RAG)システムです。ConversationalRetrievalChainを使用して、文脈を保持した対話型AIシステムを構築します。
アーキテクチャ¶
graph TD
A[ユーザークエリ] --> B[LangChain Orchestrator]
B --> C[pgvector 類似検索]
B --> D[Neo4j グラフ検索]
C --> E[関連文書取得]
D --> F[関係性情報取得]
E --> G[LLM コンテキスト統合]
F --> G
G --> H[回答生成]
H --> I[会話履歴保存]
I --> B
技術スタック¶
| コンポーネント | 技術 | 用途 |
|---|---|---|
| ベクトル検索 | PostgreSQL + pgvector | 意味的類似検索 |
| グラフDB | Neo4j | 概念間の関係性管理 |
| オーケストレーター | LangChain | RAGワークフロー制御 |
| LLM | OpenAI GPT-4 / Claude | 回答生成 |
| メモリ | ConversationBufferMemory | 会話履歴管理 |
実装¶
1. 依存関係のインストール¶
pip install langchain langchain-openai langchain-community
pip install neo4j psycopg2-binary pgvector
pip install openai tiktoken
2. Neo4j + pgvector統合サービス¶
# app/infrastructure/langchain/graph_rag_orchestrator.py
from neo4j import GraphDatabase
from langchain.vectorstores import PGVector
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from app.infrastructure.db.vector_repository import VectorRepository
import os
class GraphRAGOrchestrator:
"""Neo4j + pgvector統合RAGシステム"""
def __init__(
self,
neo4j_uri: str = "bolt://localhost:7687",
neo4j_user: str = "neo4j",
neo4j_password: str = "password",
postgres_connection: str = None
):
# Neo4jドライバー初期化
self.driver = GraphDatabase.driver(
neo4j_uri,
auth=(neo4j_user, neo4j_password)
)
# pgvector初期化
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key=os.getenv("OPENAI_API_KEY")
)
self.vectorstore = PGVector(
connection_string=postgres_connection or os.getenv("DATABASE_URL"),
embedding_function=self.embeddings,
collection_name="documents"
)
# LLM初期化
self.llm = ChatOpenAI(
model="gpt-4o",
temperature=0.7,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
# 会話メモリ
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True,
output_key="answer"
)
# ConversationalRetrievalChain構築
self.qa_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=self.vectorstore.as_retriever(search_kwargs={"k": 5}),
memory=self.memory,
return_source_documents=True
)
def query_with_graph_context(self, question: str) -> dict:
"""グラフコンテキストを含めた検索"""
# 1. ベクトル検索で関連文書取得
vector_results = self.vectorstore.similarity_search(question, k=5)
# 2. Neo4jで関係性検索
graph_context = self._search_graph_relations(question)
# 3. コンテキストを統合
enhanced_context = self._merge_contexts(vector_results, graph_context)
# 4. LLMに質問
result = self.qa_chain({
"question": question,
"context": enhanced_context
})
return {
"answer": result["answer"],
"sources": result["source_documents"],
"graph_relations": graph_context,
"chat_history": self.memory.chat_memory.messages
}
def _search_graph_relations(self, query: str) -> list:
"""Neo4jで関連概念を検索"""
with self.driver.session() as session:
# エンティティ抽出(簡易版)
keywords = self._extract_keywords(query)
cypher_query = """
MATCH (n)-[r]->(m)
WHERE n.name IN $keywords OR m.name IN $keywords
RETURN n.name as source, type(r) as relation, m.name as target
LIMIT 10
"""
result = session.run(cypher_query, keywords=keywords)
return [dict(record) for record in result]
def _extract_keywords(self, text: str) -> list:
"""キーワード抽出(実際にはNLPライブラリを使用)"""
# 簡易実装:重要そうな単語を抽出
words = text.split()
return [w for w in words if len(w) > 3]
def _merge_contexts(self, vector_docs: list, graph_relations: list) -> str:
"""ベクトル検索結果とグラフ情報を統合"""
context_parts = []
# ベクトル検索結果
context_parts.append("## 関連文書:")
for doc in vector_docs:
context_parts.append(f"- {doc.page_content}")
# グラフ関係性
if graph_relations:
context_parts.append("\n## 関連する概念:")
for rel in graph_relations:
context_parts.append(
f"- {rel['source']} --[{rel['relation']}]--> {rel['target']}"
)
return "\n".join(context_parts)
def add_document_to_graph(self, content: str, entities: list, relations: list):
"""文書をグラフとベクトルDBの両方に追加"""
# 1. pgvectorに追加
self.vectorstore.add_texts([content])
# 2. Neo4jにエンティティと関係を追加
with self.driver.session() as session:
for entity in entities:
session.run(
"MERGE (n:Entity {name: $name, type: $type})",
name=entity["name"],
type=entity["type"]
)
for rel in relations:
session.run(
"""
MATCH (a:Entity {name: $source})
MATCH (b:Entity {name: $target})
MERGE (a)-[r:RELATES_TO {type: $rel_type}]->(b)
""",
source=rel["source"],
target=rel["target"],
rel_type=rel["type"]
)
def close(self):
"""リソース解放"""
self.driver.close()
3. FastAPI統合¶
# app/routes/graph_rag_routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator
router = APIRouter(prefix="/api/graphrag", tags=["GraphRAG"])


class QueryRequest(BaseModel):
    """Request body for POST /api/graphrag/query."""

    question: str


class DocumentRequest(BaseModel):
    """Request body for POST /api/graphrag/add-document."""

    content: str
    entities: list[dict]
    relations: list[dict]


# BUG FIX: the original dependency built a new GraphRAGOrchestrator per
# request, opening a fresh Neo4j driver every time (connection leak) and
# discarding the conversation memory between requests.  Share one instance.
_orchestrator: GraphRAGOrchestrator | None = None


def get_orchestrator() -> GraphRAGOrchestrator:
    """FastAPI dependency returning a lazily created, shared orchestrator."""
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = GraphRAGOrchestrator()
    return _orchestrator


@router.post("/query")
async def query_graph_rag(
    request: QueryRequest,
    orchestrator: GraphRAGOrchestrator = Depends(get_orchestrator)
):
    """GraphRAG search endpoint: returns answer, source texts and graph relations."""
    try:
        result = orchestrator.query_with_graph_context(request.question)
        return {
            "success": True,
            "answer": result["answer"],
            "sources": [doc.page_content for doc in result["sources"]],
            "graph_relations": result["graph_relations"]
        }
    except Exception as e:
        # NOTE(review): str(e) may expose internals — consider a generic
        # message in production.
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/add-document")
async def add_document(
    request: DocumentRequest,
    orchestrator: GraphRAGOrchestrator = Depends(get_orchestrator)
):
    """Add a document to both the vector store and the knowledge graph."""
    try:
        orchestrator.add_document_to_graph(
            content=request.content,
            entities=request.entities,
            relations=request.relations
        )
        return {"success": True, "message": "Document added successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
4. Docker Compose設定¶
# Compose stack: pgvector PostgreSQL, Neo4j (APOC + GDS), FastAPI app.
# Structure reconstructed — YAML indentation was lost in extraction.
version: "3.9"

services:
  # PostgreSQL with the pgvector extension preinstalled
  postgres:
    image: ankane/pgvector:pg16
    environment:
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: vectors
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - backend
    ports:
      - "5432:5432"  # NOTE(review): exposes the DB to the host; remove in production

  # Neo4j graph database
  neo4j:
    image: neo4j:5.15.0
    environment:
      NEO4J_AUTH: neo4j/${NEO4J_PASSWORD}
      NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
      NEO4J_apoc_export_file_enabled: "true"
      NEO4J_apoc_import_file_enabled: "true"
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    networks:
      - backend
    ports:
      - "7474:7474"  # HTTP (browser UI)
      - "7687:7687"  # Bolt (driver protocol)

  # FastAPI application
  api:
    build: .
    environment:
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
      NEO4J_URI: bolt://neo4j:7687
      NEO4J_USER: neo4j
      NEO4J_PASSWORD: ${NEO4J_PASSWORD}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    depends_on:
      - postgres
      - neo4j
    ports:
      - "8000:8000"
    networks:
      - backend
      - web

volumes:
  postgres_data:
  neo4j_data:
  neo4j_logs:

networks:
  backend:
  web:
使用例¶
基本的な検索¶
# Usage example: ad-hoc query against the orchestrator.
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator

# Initialize with defaults (localhost Neo4j, DATABASE_URL for pgvector)
orchestrator = GraphRAGOrchestrator()

# Run a combined vector + graph search
result = orchestrator.query_with_graph_context(
    "FastAPIとPostgreSQLの連携方法を教えてください"
)

print("回答:", result["answer"])
print("参照元:", result["sources"])
print("関連概念:", result["graph_relations"])
文書とグラフの同時登録¶
# Register a document in pgvector and its entities/relations in Neo4j in one
# call; each relation's "type" becomes a property on the RELATES_TO edge.
orchestrator.add_document_to_graph(
    content="FastAPIはPython製の高速Webフレームワークです。",
    entities=[
        {"name": "FastAPI", "type": "Framework"},
        {"name": "Python", "type": "Language"}
    ],
    relations=[
        {"source": "FastAPI", "target": "Python", "type": "BUILT_WITH"}
    ]
)
パフォーマンス最適化¶
1. ベクトルインデックス作成¶
-- Create an IVFFlat index for cosine-distance search on the embedding column.
-- lists = 100 is a starting point; tune to roughly rows/1000 and run ANALYZE
-- after bulk loads so the planner uses the index.
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
2. Neo4jインデックス¶
// Index entity names for fast MATCH lookups
// (BUG FIX: Cypher comments use "//", not the SQL-style "--")
CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name);
// Index the "type" property on RELATES_TO relationships
CREATE INDEX relation_type IF NOT EXISTS FOR ()-[r:RELATES_TO]-() ON (r.type);
3. キャッシュ戦略¶
from functools import lru_cache
class GraphRAGOrchestrator:
    def _cached_graph_search(self, query_hash: str):
        """Return graph-search results for *query_hash*, cached per instance.

        BUG FIX: the original applied @lru_cache directly to an instance
        method, which keys the cache on ``self`` and keeps every instance
        alive for the cache's lifetime (ruff B019).  A per-instance dict
        avoids that leak and dies with the orchestrator.
        """
        # Lazily create the cache so __init__ needs no changes
        cache = getattr(self, "_graph_search_cache", None)
        if cache is None:
            cache = self._graph_search_cache = {}
        if query_hash not in cache:
            cache[query_hash] = self._search_graph_relations(query_hash)
        return cache[query_hash]
トラブルシューティング¶
問題1: Neo4j接続エラー¶
問題2: pgvector拡張が見つからない¶
-- Check that the pgvector extension is available in this PostgreSQL install
SELECT * FROM pg_available_extensions WHERE name = 'vector';
-- Enable the extension in the current database
CREATE EXTENSION IF NOT EXISTS vector;
問題3: メモリ不足¶
ベストプラクティス¶
- エンティティ抽出の自動化: spaCyやTransformersを使用
- 関係性の階層化: Neo4jで複数レベルの関係を管理
- ハイブリッド検索: スコアリングでベクトルとグラフの重み調整
- バッチ処理: 大量文書は非同期で処理
- モニタリング: Phoenix + Grafanaで検索品質を監視