FastAPI + pgvector によるRAG実装ガイド¶
概要¶
このガイドでは、FastAPIとPostgreSQL (pgvector)を使用したRAG(Retrieval-Augmented Generation)システムの構築方法を詳しく解説します。
対象読者¶
- FastAPIの基本を理解している
- PostgreSQLの基礎知識がある
- RAGシステムを実装したい開発者
学習目標¶
- pgvectorの導入と設定
- FastAPIでのRAG API実装
- Embeddingの生成と保存
- 類似検索の実装
- LLMとの統合
システムアーキテクチャ¶
graph LR
A[ユーザー] --> B[FastAPI]
B --> C{ユースケース層}
C --> D[Embedding Service]
C --> E[Search Service]
C --> F[RAG Service]
D --> G[(PostgreSQL + pgvector)]
E --> G
F --> G
F --> H[OpenAI API]
H --> I[回答生成]
I --> A
環境構築¶
必要なツール¶
- Docker & Docker Compose
- Python 3.11+
- PostgreSQL 16 (pgvector拡張)
- OpenAI API Key
ディレクトリ構成¶
project-root/
├── app/
│ ├── main.py
│ ├── core/
│ │ ├── config.py
│ │ └── database.py
│ ├── models/
│ │ └── document.py
│ ├── services/
│ │ ├── embedding_service.py
│ │ ├── search_service.py
│ │ └── rag_service.py
│ ├── api/
│ │ └── routes.py
│ └── schemas/
│ └── document_schema.py
├── docker-compose.yaml
├── Dockerfile
├── requirements.txt
└── .env
Docker環境のセットアップ¶
docker-compose.yaml¶
version: "3.9"
services:
# PostgreSQL with pgvector
db:
image: ankane/pgvector:latest
container_name: postgres_pgvector
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# FastAPI Application
api:
build:
context: .
dockerfile: Dockerfile
container_name: fastapi_rag
environment:
- DATABASE_URL=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
- OPENAI_API_KEY=${OPENAI_API_KEY}
ports:
- "8000:8000"
depends_on:
db:
condition: service_healthy
volumes:
- ./app:/app
restart: unless-stopped
volumes:
postgres_data:
.env ファイル¶
# Database
POSTGRES_USER=raguser
POSTGRES_PASSWORD=ragpassword
POSTGRES_DB=ragdb
# OpenAI
OPENAI_API_KEY=sk-your-api-key-here
Dockerfile¶
FROM python:3.11-slim
WORKDIR /app
# 依存関係をコピーしてインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションコードをコピー
COPY ./app /app
# ポート公開
EXPOSE 8000
# アプリケーション起動
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
requirements.txt¶
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
pgvector==0.2.4
pydantic==2.5.0
pydantic-settings==2.1.0
openai==1.3.0
python-dotenv==1.0.0
init.sql¶
-- pgvector拡張を有効化
CREATE EXTENSION IF NOT EXISTS vector;
-- ドキュメントテーブル作成
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
embedding vector(1536), -- OpenAI text-embedding-3-small の次元数
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- インデックス作成(HNSW - 高速検索用)
CREATE INDEX IF NOT EXISTS idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops);
-- 全文検索用インデックス
CREATE INDEX IF NOT EXISTS idx_documents_content
ON documents USING gin(to_tsvector('japanese', content));
アプリケーション実装¶
1. 設定ファイル(app/core/config.py)¶
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# Database
DATABASE_URL: str
# OpenAI
OPENAI_API_KEY: str
EMBEDDING_MODEL: str = "text-embedding-3-small"
CHAT_MODEL: str = "gpt-4o-mini"
# Application
APP_NAME: str = "RAG API"
DEBUG: bool = True
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
return Settings()
2. データベース接続(app/core/database.py)¶
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from app.core.config import get_settings
settings = get_settings()
# SQLAlchemyエンジン作成
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
pool_size=10,
max_overflow=20
)
# セッションファクトリー
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# ベースクラス
Base = declarative_base()
def get_db():
"""データベースセッションを取得"""
db = SessionLocal()
try:
yield db
finally:
db.close()
3. モデル定義(app/models/document.py)¶
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from sqlalchemy.dialects.postgresql import JSONB
from pgvector.sqlalchemy import Vector
from app.core.database import Base
class Document(Base):
"""ドキュメントモデル"""
__tablename__ = "documents"
id = Column(Integer, primary_key=True, index=True)
content = Column(Text, nullable=False)
embedding = Column(Vector(1536)) # OpenAI embedding dimension
metadata = Column(JSONB, default={})
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
def __repr__(self):
return f"<Document(id={self.id}, content={self.content[:50]}...)>"
4. スキーマ定義(app/schemas/document_schema.py)¶
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from datetime import datetime
class DocumentBase(BaseModel):
"""ドキュメントの基本スキーマ"""
content: str = Field(..., min_length=1, description="ドキュメントの内容")
metadata: Optional[Dict[str, Any]] = Field(default={}, description="メタデータ")
class DocumentCreate(DocumentBase):
"""ドキュメント作成用スキーマ"""
pass
class DocumentResponse(DocumentBase):
"""ドキュメントレスポンススキーマ"""
id: int
created_at: datetime
class Config:
from_attributes = True
class SearchRequest(BaseModel):
"""検索リクエストスキーマ"""
query: str = Field(..., min_length=1, description="検索クエリ")
limit: int = Field(default=5, ge=1, le=20, description="取得件数")
threshold: Optional[float] = Field(default=None, description="類似度の閾値")
class SearchResult(BaseModel):
"""検索結果スキーマ"""
id: int
content: str
distance: float
metadata: Dict[str, Any]
class RAGRequest(BaseModel):
"""RAGリクエストスキーマ"""
question: str = Field(..., min_length=1, description="質問")
max_context_docs: int = Field(default=3, ge=1, le=10, description="コンテキストに含める文書数")
class RAGResponse(BaseModel):
"""RAGレスポンススキーマ"""
question: str
answer: str
sources: List[SearchResult]
model: str
5. Embeddingサービス(app/services/embedding_service.py)¶
import openai
from typing import List
import numpy as np
from app.core.config import get_settings
settings = get_settings()
openai.api_key = settings.OPENAI_API_KEY
class EmbeddingService:
"""Embedding生成サービス"""
def __init__(self):
self.model = settings.EMBEDDING_MODEL
def create_embedding(self, text: str) -> List[float]:
"""テキストからEmbeddingを生成
Args:
text: Embedding化するテキスト
Returns:
Embeddingベクトル(1536次元)
"""
try:
response = openai.embeddings.create(
model=self.model,
input=text
)
return response.data[0].embedding
except Exception as e:
raise RuntimeError(f"Embedding生成エラー: {str(e)}")
def create_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""複数テキストからEmbeddingを一括生成
Args:
texts: Embedding化するテキストのリスト
Returns:
Embeddingベクトルのリスト
"""
try:
response = openai.embeddings.create(
model=self.model,
input=texts
)
return [data.embedding for data in response.data]
except Exception as e:
raise RuntimeError(f"バッチEmbedding生成エラー: {str(e)}")
@staticmethod
def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
"""コサイン類似度を計算
Args:
vec1: ベクトル1
vec2: ベクトル2
Returns:
コサイン類似度(-1〜1)
"""
vec1_np = np.array(vec1)
vec2_np = np.array(vec2)
dot_product = np.dot(vec1_np, vec2_np)
norm_vec1 = np.linalg.norm(vec1_np)
norm_vec2 = np.linalg.norm(vec2_np)
if norm_vec1 == 0 or norm_vec2 == 0:
return 0.0
return dot_product / (norm_vec1 * norm_vec2)
6. 検索サービス(app/services/search_service.py)¶
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import List, Optional
from app.models.document import Document
from app.services.embedding_service import EmbeddingService
class SearchService:
"""ベクトル検索サービス"""
def __init__(self, db: Session):
self.db = db
self.embedding_service = EmbeddingService()
def search_similar(
self,
query: str,
limit: int = 5,
threshold: Optional[float] = None
) -> List[dict]:
"""類似ドキュメントを検索
Args:
query: 検索クエリ
limit: 取得件数
threshold: 類似度の閾値(Noneの場合は全て返す)
Returns:
検索結果のリスト
"""
# クエリをEmbedding化
query_embedding = self.embedding_service.create_embedding(query)
# pgvectorで類似検索(コサイン距離)
sql = text("""
SELECT
id,
content,
metadata,
embedding <=> :query_vector AS distance
FROM documents
WHERE (:threshold IS NULL OR embedding <=> :query_vector < :threshold)
ORDER BY distance ASC
LIMIT :limit
""")
result = self.db.execute(
sql,
{
"query_vector": query_embedding,
"limit": limit,
"threshold": threshold
}
)
return [
{
"id": row.id,
"content": row.content,
"distance": float(row.distance),
"metadata": row.metadata or {}
}
for row in result
]
def search_by_text(
self,
query: str,
limit: int = 5
) -> List[Document]:
"""全文検索
Args:
query: 検索クエリ
limit: 取得件数
Returns:
検索結果のリスト
"""
sql = text("""
SELECT *
FROM documents
WHERE to_tsvector('japanese', content) @@ plainto_tsquery('japanese', :query)
LIMIT :limit
""")
result = self.db.execute(sql, {"query": query, "limit": limit})
return result.all()
def hybrid_search(
self,
query: str,
limit: int = 5,
vector_weight: float = 0.7
) -> List[dict]:
"""ハイブリッド検索(ベクトル + 全文検索)
Args:
query: 検索クエリ
limit: 取得件数
vector_weight: ベクトル検索の重み(0〜1)
Returns:
検索結果のリスト
"""
# ベクトル検索
vector_results = self.search_similar(query, limit=limit * 2)
# 全文検索
text_results = self.search_by_text(query, limit=limit * 2)
# スコアの正規化と統合
# (実装は簡略化しています)
combined_results = {}
for result in vector_results:
doc_id = result["id"]
score = (1 - result["distance"]) * vector_weight
combined_results[doc_id] = {
"content": result["content"],
"metadata": result["metadata"],
"score": score
}
for result in text_results:
doc_id = result.id
if doc_id in combined_results:
combined_results[doc_id]["score"] += (1 - vector_weight)
else:
combined_results[doc_id] = {
"content": result.content,
"metadata": result.metadata or {},
"score": (1 - vector_weight)
}
# スコアでソート
sorted_results = sorted(
combined_results.items(),
key=lambda x: x[1]["score"],
reverse=True
)[:limit]
return [
{
"id": doc_id,
"content": data["content"],
"score": data["score"],
"metadata": data["metadata"]
}
for doc_id, data in sorted_results
]
7. RAGサービス(app/services/rag_service.py)¶
import openai
from sqlalchemy.orm import Session
from typing import List, Dict, Any
from app.services.search_service import SearchService
from app.core.config import get_settings
settings = get_settings()
openai.api_key = settings.OPENAI_API_KEY
class RAGService:
"""RAG(検索拡張生成)サービス"""
def __init__(self, db: Session):
self.db = db
self.search_service = SearchService(db)
self.chat_model = settings.CHAT_MODEL
def generate_answer(
self,
question: str,
max_context_docs: int = 3
) -> Dict[str, Any]:
"""質問に対する回答を生成
Args:
question: ユーザーの質問
max_context_docs: コンテキストに含める文書数
Returns:
回答と使用したソースのdict
"""
# 関連文書を検索
search_results = self.search_service.search_similar(
query=question,
limit=max_context_docs
)
if not search_results:
return {
"question": question,
"answer": "関連する情報が見つかりませんでした。",
"sources": [],
"model": self.chat_model
}
# コンテキストを構築
context = self._build_context(search_results)
# プロンプトを生成
prompt = self._build_prompt(question, context)
# OpenAI APIで回答生成
try:
response = openai.chat.completions.create(
model=self.chat_model,
messages=[
{
"role": "system",
"content": "あなたは親切なアシスタントです。与えられたコンテキスト情報を基に、正確で分かりやすい回答を提供してください。"
},
{
"role": "user",
"content": prompt
}
],
temperature=0.7,
max_tokens=1000
)
answer = response.choices[0].message.content
return {
"question": question,
"answer": answer,
"sources": search_results,
"model": self.chat_model
}
except Exception as e:
raise RuntimeError(f"回答生成エラー: {str(e)}")
def _build_context(self, search_results: List[dict]) -> str:
"""検索結果からコンテキストを構築
Args:
search_results: 検索結果のリスト
Returns:
コンテキスト文字列
"""
context_parts = []
for i, result in enumerate(search_results, 1):
context_parts.append(
f"[文書{i}]\n{result['content']}\n"
)
return "\n".join(context_parts)
def _build_prompt(self, question: str, context: str) -> str:
"""プロンプトを構築
Args:
question: 質問
context: コンテキスト
Returns:
プロンプト文字列
"""
return f"""以下のコンテキスト情報を参考にして、質問に答えてください。
コンテキストに関連する情報がない場合は、正直にそう伝えてください。
コンテキスト:
{context}
質問: {question}
回答:"""
def stream_answer(
self,
question: str,
max_context_docs: int = 3
):
"""ストリーミング形式で回答を生成
Args:
question: ユーザーの質問
max_context_docs: コンテキストに含める文書数
Yields:
回答のチャンク
"""
# 関連文書を検索
search_results = self.search_service.search_similar(
query=question,
limit=max_context_docs
)
if not search_results:
yield "関連する情報が見つかりませんでした。"
return
# コンテキストとプロンプトを構築
context = self._build_context(search_results)
prompt = self._build_prompt(question, context)
# ストリーミングで回答生成
try:
stream = openai.chat.completions.create(
model=self.chat_model,
messages=[
{
"role": "system",
"content": "あなたは親切なアシスタントです。"
},
{
"role": "user",
"content": prompt
}
],
temperature=0.7,
max_tokens=1000,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"エラーが発生しました: {str(e)}"
8. APIルート(app/api/routes.py)¶
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.core.database import get_db
from app.models.document import Document
from app.schemas.document_schema import (
DocumentCreate,
DocumentResponse,
SearchRequest,
SearchResult,
RAGRequest,
RAGResponse
)
from app.services.embedding_service import EmbeddingService
from app.services.search_service import SearchService
from app.services.rag_service import RAGService
router = APIRouter(prefix="/api", tags=["RAG API"])
@router.post("/documents", response_model=DocumentResponse)
def create_document(
document: DocumentCreate,
db: Session = Depends(get_db)
):
"""ドキュメントを作成してEmbeddingを生成"""
try:
# Embeddingを生成
embedding_service = EmbeddingService()
embedding = embedding_service.create_embedding(document.content)
# ドキュメントを保存
db_document = Document(
content=document.content,
embedding=embedding,
metadata=document.metadata
)
db.add(db_document)
db.commit()
db.refresh(db_document)
return db_document
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@router.get("/documents", response_model=List[DocumentResponse])
def list_documents(
skip: int = 0,
limit: int = 10,
db: Session = Depends(get_db)
):
"""ドキュメント一覧を取得"""
documents = db.query(Document).offset(skip).limit(limit).all()
return documents
@router.get("/documents/{document_id}", response_model=DocumentResponse)
def get_document(
document_id: int,
db: Session = Depends(get_db)
):
"""特定のドキュメントを取得"""
document = db.query(Document).filter(Document.id == document_id).first()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
return document
@router.post("/search", response_model=List[SearchResult])
def search_documents(
request: SearchRequest,
db: Session = Depends(get_db)
):
"""ベクトル検索を実行"""
try:
search_service = SearchService(db)
results = search_service.search_similar(
query=request.query,
limit=request.limit,
threshold=request.threshold
)
return results
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/rag", response_model=RAGResponse)
def rag_query(
request: RAGRequest,
db: Session = Depends(get_db)
):
"""RAG方式で質問に回答"""
try:
rag_service = RAGService(db)
result = rag_service.generate_answer(
question=request.question,
max_context_docs=request.max_context_docs
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/rag/stream")
async def rag_query_stream(
request: RAGRequest,
db: Session = Depends(get_db)
):
"""RAG方式でストリーミング回答"""
from fastapi.responses import StreamingResponse
try:
rag_service = RAGService(db)
def generate():
for chunk in rag_service.stream_answer(
question=request.question,
max_context_docs=request.max_context_docs
):
yield chunk
return StreamingResponse(generate(), media_type="text/plain")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
9. メインアプリケーション(app/main.py)¶
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import get_settings
from app.api.routes import router
from app.core.database import engine, Base
settings = get_settings()
# テーブル作成
Base.metadata.create_all(bind=engine)
# FastAPIアプリ作成
app = FastAPI(
title=settings.APP_NAME,
description="FastAPI + pgvector によるRAGシステム",
version="1.0.0",
debug=settings.DEBUG
)
# CORS設定
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ルーター登録
app.include_router(router)
@app.get("/")
def root():
"""ヘルスチェック"""
return {
"message": "RAG API is running",
"status": "ok"
}
@app.get("/health")
def health_check():
"""詳細なヘルスチェック"""
return {
"status": "healthy",
"database": "connected",
"version": "1.0.0"
}
起動と動作確認¶
1. Docker環境の起動¶
2. API動作確認¶
# ヘルスチェック
curl http://localhost:8000/health
# ドキュメント作成
curl -X POST http://localhost:8000/api/documents \
-H "Content-Type: application/json" \
-d '{
"content": "PostgreSQLは強力なオープンソースのリレーショナルデータベース管理システムです。",
"metadata": {"category": "database"}
}'
# 類似検索
curl -X POST http://localhost:8000/api/search \
-H "Content-Type: application/json" \
-d '{
"query": "データベース管理システムについて",
"limit": 5
}'
# RAG質問応答
curl -X POST http://localhost:8000/api/rag \
-H "Content-Type: application/json" \
-d '{
"question": "PostgreSQLの特徴は何ですか?",
"max_context_docs": 3
}'
3. Swagger UI¶
ブラウザで以下のURLにアクセス:
実装例: Pythonクライアント¶
import requests
class RAGClient:
"""RAG APIクライアント"""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url
def create_document(self, content: str, metadata: dict = None):
"""ドキュメントを作成"""
response = requests.post(
f"{self.base_url}/api/documents",
json={
"content": content,
"metadata": metadata or {}
}
)
return response.json()
def search(self, query: str, limit: int = 5):
"""類似検索"""
response = requests.post(
f"{self.base_url}/api/search",
json={
"query": query,
"limit": limit
}
)
return response.json()
def ask(self, question: str, max_context_docs: int = 3):
"""RAG質問応答"""
response = requests.post(
f"{self.base_url}/api/rag",
json={
"question": question,
"max_context_docs": max_context_docs
}
)
return response.json()
# 使用例
if __name__ == "__main__":
client = RAGClient()
# ドキュメント追加
doc = client.create_document(
content="FastAPIは高速で使いやすいPython製Webフレームワークです。",
metadata={"category": "framework"}
)
print(f"作成したドキュメントID: {doc['id']}")
# 検索
results = client.search("Pythonのフレームワーク")
print(f"検索結果: {len(results)}件")
# 質問応答
answer = client.ask("FastAPIの特徴を教えてください")
print(f"回答: {answer['answer']}")
パフォーマンス最適化¶
インデックスの選択¶
-- IVFFlat インデックス(中規模向け)
CREATE INDEX idx_embedding_ivf ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- HNSW インデックス(大規模向け)
CREATE INDEX idx_embedding_hnsw ON documents
USING hnsw (embedding vector_cosine_ops);
バッチ処理¶
def batch_insert_documents(contents: List[str], batch_size: int = 100):
"""ドキュメントをバッチ挿入"""
embedding_service = EmbeddingService()
for i in range(0, len(contents), batch_size):
batch = contents[i:i + batch_size]
# バッチでEmbedding生成
embeddings = embedding_service.create_embeddings_batch(batch)
# バルク挿入
documents = [
Document(content=content, embedding=embedding)
for content, embedding in zip(batch, embeddings)
]
db.bulk_save_objects(documents)
db.commit()
LangChainとの統合¶
from langchain.vectorstores import PGVector
from langchain.embeddings import OpenAIEmbeddings
# PGVector初期化
embeddings = OpenAIEmbeddings()
vectorstore = PGVector(
connection_string="postgresql://user:pass@localhost:5432/ragdb",
embedding_function=embeddings,
collection_name="documents"
)
# ドキュメント追加
vectorstore.add_texts(
texts=["PostgreSQLは強力なRDBMS"],
metadatas=[{"category": "database"}]
)
# 検索
docs = vectorstore.similarity_search("データベース", k=5)
まとめ¶
このガイドでは、FastAPIとpgvectorを使用したRAGシステムの完全な実装を解説しました。
重要なポイント¶
- pgvector: PostgreSQL拡張として簡単に導入可能
- Embedding: OpenAI APIで高品質なベクトル生成
- 検索: コサイン距離での高速類似検索
- RAG: コンテキストを活用したLLM回答生成
- スケーラビリティ: インデックスとバッチ処理で最適化
次のステップ¶
- 認証機能: JWT認証の追加
- キャッシング: Redisでの検索結果キャッシュ
- モニタリング: Prometheus + Grafanaでの監視
- テスト: pytestでの自動テスト追加
- デプロイ: Kubernetes環境への展開
この実装をベースに、用途に応じてカスタマイズしていくことが可能です。