コンテンツにスキップ

FastAPI + pgvector によるRAG実装ガイド

概要

このガイドでは、FastAPIPostgreSQL (pgvector)を使用したRAG(Retrieval-Augmented Generation)システムの構築方法を詳しく解説します。

対象読者

  • FastAPIの基本を理解している
  • PostgreSQLの基礎知識がある
  • RAGシステムを実装したい開発者

学習目標

  1. pgvectorの導入と設定
  2. FastAPIでのRAG API実装
  3. Embeddingの生成と保存
  4. 類似検索の実装
  5. LLMとの統合

システムアーキテクチャ

graph LR
    A[ユーザー] --> B[FastAPI]
    B --> C{ユースケース層}
    C --> D[Embedding Service]
    C --> E[Search Service]
    C --> F[RAG Service]
    D --> G[(PostgreSQL + pgvector)]
    E --> G
    F --> G
    F --> H[OpenAI API]
    H --> I[回答生成]
    I --> A

環境構築

必要なツール

  • Docker & Docker Compose
  • Python 3.11+
  • PostgreSQL 16 (pgvector拡張)
  • OpenAI API Key

ディレクトリ構成

project-root/
├── app/
│   ├── main.py
│   ├── core/
│   │   ├── config.py
│   │   └── database.py
│   ├── models/
│   │   └── document.py
│   ├── services/
│   │   ├── embedding_service.py
│   │   ├── search_service.py
│   │   └── rag_service.py
│   ├── api/
│   │   └── routes.py
│   └── schemas/
│       └── document_schema.py
├── docker-compose.yaml
├── Dockerfile
├── requirements.txt
└── .env

Docker環境のセットアップ

docker-compose.yaml

version: "3.9"

services:
  # PostgreSQL with pgvector
  db:
    image: ankane/pgvector:latest
    container_name: postgres_pgvector
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

  # FastAPI Application
  api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: fastapi_rag
    environment:
      - DATABASE_URL=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    ports:
      - "8000:8000"
    depends_on:
      db:
        condition: service_healthy
    volumes:
      - ./app:/app
    restart: unless-stopped

volumes:
  postgres_data:

.env ファイル

# Database
POSTGRES_USER=raguser
POSTGRES_PASSWORD=ragpassword
POSTGRES_DB=ragdb

# OpenAI
OPENAI_API_KEY=sk-your-api-key-here

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# 依存関係をコピーしてインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# アプリケーションコードをコピー
COPY ./app /app

# ポート公開
EXPOSE 8000

# アプリケーション起動
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

requirements.txt

fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
pgvector==0.2.4
pydantic==2.5.0
pydantic-settings==2.1.0
openai==1.3.0
python-dotenv==1.0.0

init.sql

-- pgvector拡張を有効化
CREATE EXTENSION IF NOT EXISTS vector;

-- ドキュメントテーブル作成
CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1536),  -- OpenAI text-embedding-3-small の次元数
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- インデックス作成(HNSW - 高速検索用)
CREATE INDEX IF NOT EXISTS idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops);

-- 全文検索用インデックス
CREATE INDEX IF NOT EXISTS idx_documents_content
ON documents USING gin(to_tsvector('japanese', content));

アプリケーション実装

1. 設定ファイル(app/core/config.py)

from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    # Database
    DATABASE_URL: str

    # OpenAI
    OPENAI_API_KEY: str
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    CHAT_MODEL: str = "gpt-4o-mini"

    # Application
    APP_NAME: str = "RAG API"
    DEBUG: bool = True

    class Config:
        env_file = ".env"


@lru_cache()
def get_settings() -> Settings:
    return Settings()

2. データベース接続(app/core/database.py)

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from app.core.config import get_settings

settings = get_settings()

# SQLAlchemyエンジン作成
engine = create_engine(
    settings.DATABASE_URL,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20
)

# セッションファクトリー
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# ベースクラス
Base = declarative_base()


def get_db():
    """データベースセッションを取得"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

3. モデル定義(app/models/document.py)

from sqlalchemy import Column, Integer, String, Text, DateTime, func
from sqlalchemy.dialects.postgresql import JSONB
from pgvector.sqlalchemy import Vector
from app.core.database import Base


class Document(Base):
    """ドキュメントモデル"""
    __tablename__ = "documents"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    embedding = Column(Vector(1536))  # OpenAI embedding dimension
    metadata = Column(JSONB, default={})
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def __repr__(self):
        return f"<Document(id={self.id}, content={self.content[:50]}...)>"

4. スキーマ定義(app/schemas/document_schema.py)

from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from datetime import datetime


class DocumentBase(BaseModel):
    """ドキュメントの基本スキーマ"""
    content: str = Field(..., min_length=1, description="ドキュメントの内容")
    metadata: Optional[Dict[str, Any]] = Field(default={}, description="メタデータ")


class DocumentCreate(DocumentBase):
    """ドキュメント作成用スキーマ"""
    pass


class DocumentResponse(DocumentBase):
    """ドキュメントレスポンススキーマ"""
    id: int
    created_at: datetime

    class Config:
        from_attributes = True


class SearchRequest(BaseModel):
    """検索リクエストスキーマ"""
    query: str = Field(..., min_length=1, description="検索クエリ")
    limit: int = Field(default=5, ge=1, le=20, description="取得件数")
    threshold: Optional[float] = Field(default=None, description="類似度の閾値")


class SearchResult(BaseModel):
    """検索結果スキーマ"""
    id: int
    content: str
    distance: float
    metadata: Dict[str, Any]


class RAGRequest(BaseModel):
    """RAGリクエストスキーマ"""
    question: str = Field(..., min_length=1, description="質問")
    max_context_docs: int = Field(default=3, ge=1, le=10, description="コンテキストに含める文書数")


class RAGResponse(BaseModel):
    """RAGレスポンススキーマ"""
    question: str
    answer: str
    sources: List[SearchResult]
    model: str

5. Embeddingサービス(app/services/embedding_service.py)

import openai
from typing import List
import numpy as np
from app.core.config import get_settings

settings = get_settings()
openai.api_key = settings.OPENAI_API_KEY


class EmbeddingService:
    """Embedding生成サービス"""

    def __init__(self):
        self.model = settings.EMBEDDING_MODEL

    def create_embedding(self, text: str) -> List[float]:
        """テキストからEmbeddingを生成

        Args:
            text: Embedding化するテキスト

        Returns:
            Embeddingベクトル(1536次元)
        """
        try:
            response = openai.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            raise RuntimeError(f"Embedding生成エラー: {str(e)}")

    def create_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """複数テキストからEmbeddingを一括生成

        Args:
            texts: Embedding化するテキストのリスト

        Returns:
            Embeddingベクトルのリスト
        """
        try:
            response = openai.embeddings.create(
                model=self.model,
                input=texts
            )
            return [data.embedding for data in response.data]
        except Exception as e:
            raise RuntimeError(f"バッチEmbedding生成エラー: {str(e)}")

    @staticmethod
    def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
        """コサイン類似度を計算

        Args:
            vec1: ベクトル1
            vec2: ベクトル2

        Returns:
            コサイン類似度(-1〜1)
        """
        vec1_np = np.array(vec1)
        vec2_np = np.array(vec2)

        dot_product = np.dot(vec1_np, vec2_np)
        norm_vec1 = np.linalg.norm(vec1_np)
        norm_vec2 = np.linalg.norm(vec2_np)

        if norm_vec1 == 0 or norm_vec2 == 0:
            return 0.0

        return dot_product / (norm_vec1 * norm_vec2)

6. 検索サービス(app/services/search_service.py)

from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import List, Optional
from app.models.document import Document
from app.services.embedding_service import EmbeddingService


class SearchService:
    """ベクトル検索サービス"""

    def __init__(self, db: Session):
        self.db = db
        self.embedding_service = EmbeddingService()

    def search_similar(
        self,
        query: str,
        limit: int = 5,
        threshold: Optional[float] = None
    ) -> List[dict]:
        """類似ドキュメントを検索

        Args:
            query: 検索クエリ
            limit: 取得件数
            threshold: 類似度の閾値(Noneの場合は全て返す)

        Returns:
            検索結果のリスト
        """
        # クエリをEmbedding化
        query_embedding = self.embedding_service.create_embedding(query)

        # pgvectorで類似検索(コサイン距離)
        sql = text("""
            SELECT
                id,
                content,
                metadata,
                embedding <=> :query_vector AS distance
            FROM documents
            WHERE (:threshold IS NULL OR embedding <=> :query_vector < :threshold)
            ORDER BY distance ASC
            LIMIT :limit
        """)

        result = self.db.execute(
            sql,
            {
                "query_vector": query_embedding,
                "limit": limit,
                "threshold": threshold
            }
        )

        return [
            {
                "id": row.id,
                "content": row.content,
                "distance": float(row.distance),
                "metadata": row.metadata or {}
            }
            for row in result
        ]

    def search_by_text(
        self,
        query: str,
        limit: int = 5
    ) -> List[Document]:
        """全文検索

        Args:
            query: 検索クエリ
            limit: 取得件数

        Returns:
            検索結果のリスト
        """
        sql = text("""
            SELECT *
            FROM documents
            WHERE to_tsvector('japanese', content) @@ plainto_tsquery('japanese', :query)
            LIMIT :limit
        """)

        result = self.db.execute(sql, {"query": query, "limit": limit})
        return result.all()

    def hybrid_search(
        self,
        query: str,
        limit: int = 5,
        vector_weight: float = 0.7
    ) -> List[dict]:
        """ハイブリッド検索(ベクトル + 全文検索)

        Args:
            query: 検索クエリ
            limit: 取得件数
            vector_weight: ベクトル検索の重み(0〜1)

        Returns:
            検索結果のリスト
        """
        # ベクトル検索
        vector_results = self.search_similar(query, limit=limit * 2)

        # 全文検索
        text_results = self.search_by_text(query, limit=limit * 2)

        # スコアの正規化と統合
        # (実装は簡略化しています)
        combined_results = {}

        for result in vector_results:
            doc_id = result["id"]
            score = (1 - result["distance"]) * vector_weight
            combined_results[doc_id] = {
                "content": result["content"],
                "metadata": result["metadata"],
                "score": score
            }

        for result in text_results:
            doc_id = result.id
            if doc_id in combined_results:
                combined_results[doc_id]["score"] += (1 - vector_weight)
            else:
                combined_results[doc_id] = {
                    "content": result.content,
                    "metadata": result.metadata or {},
                    "score": (1 - vector_weight)
                }

        # スコアでソート
        sorted_results = sorted(
            combined_results.items(),
            key=lambda x: x[1]["score"],
            reverse=True
        )[:limit]

        return [
            {
                "id": doc_id,
                "content": data["content"],
                "score": data["score"],
                "metadata": data["metadata"]
            }
            for doc_id, data in sorted_results
        ]

7. RAGサービス(app/services/rag_service.py)

import openai
from sqlalchemy.orm import Session
from typing import List, Dict, Any
from app.services.search_service import SearchService
from app.core.config import get_settings

settings = get_settings()
openai.api_key = settings.OPENAI_API_KEY


class RAGService:
    """RAG(検索拡張生成)サービス"""

    def __init__(self, db: Session):
        self.db = db
        self.search_service = SearchService(db)
        self.chat_model = settings.CHAT_MODEL

    def generate_answer(
        self,
        question: str,
        max_context_docs: int = 3
    ) -> Dict[str, Any]:
        """質問に対する回答を生成

        Args:
            question: ユーザーの質問
            max_context_docs: コンテキストに含める文書数

        Returns:
            回答と使用したソースのdict
        """
        # 関連文書を検索
        search_results = self.search_service.search_similar(
            query=question,
            limit=max_context_docs
        )

        if not search_results:
            return {
                "question": question,
                "answer": "関連する情報が見つかりませんでした。",
                "sources": [],
                "model": self.chat_model
            }

        # コンテキストを構築
        context = self._build_context(search_results)

        # プロンプトを生成
        prompt = self._build_prompt(question, context)

        # OpenAI APIで回答生成
        try:
            response = openai.chat.completions.create(
                model=self.chat_model,
                messages=[
                    {
                        "role": "system",
                        "content": "あなたは親切なアシスタントです。与えられたコンテキスト情報を基に、正確で分かりやすい回答を提供してください。"
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                temperature=0.7,
                max_tokens=1000
            )

            answer = response.choices[0].message.content

            return {
                "question": question,
                "answer": answer,
                "sources": search_results,
                "model": self.chat_model
            }

        except Exception as e:
            raise RuntimeError(f"回答生成エラー: {str(e)}")

    def _build_context(self, search_results: List[dict]) -> str:
        """検索結果からコンテキストを構築

        Args:
            search_results: 検索結果のリスト

        Returns:
            コンテキスト文字列
        """
        context_parts = []
        for i, result in enumerate(search_results, 1):
            context_parts.append(
                f"[文書{i}]\n{result['content']}\n"
            )

        return "\n".join(context_parts)

    def _build_prompt(self, question: str, context: str) -> str:
        """プロンプトを構築

        Args:
            question: 質問
            context: コンテキスト

        Returns:
            プロンプト文字列
        """
        return f"""以下のコンテキスト情報を参考にして、質問に答えてください。
コンテキストに関連する情報がない場合は、正直にそう伝えてください。

コンテキスト:
{context}

質問: {question}

回答:"""

    def stream_answer(
        self,
        question: str,
        max_context_docs: int = 3
    ):
        """ストリーミング形式で回答を生成

        Args:
            question: ユーザーの質問
            max_context_docs: コンテキストに含める文書数

        Yields:
            回答のチャンク
        """
        # 関連文書を検索
        search_results = self.search_service.search_similar(
            query=question,
            limit=max_context_docs
        )

        if not search_results:
            yield "関連する情報が見つかりませんでした。"
            return

        # コンテキストとプロンプトを構築
        context = self._build_context(search_results)
        prompt = self._build_prompt(question, context)

        # ストリーミングで回答生成
        try:
            stream = openai.chat.completions.create(
                model=self.chat_model,
                messages=[
                    {
                        "role": "system",
                        "content": "あなたは親切なアシスタントです。"
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                temperature=0.7,
                max_tokens=1000,
                stream=True
            )

            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content

        except Exception as e:
            yield f"エラーが発生しました: {str(e)}"

8. APIルート(app/api/routes.py)

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.core.database import get_db
from app.models.document import Document
from app.schemas.document_schema import (
    DocumentCreate,
    DocumentResponse,
    SearchRequest,
    SearchResult,
    RAGRequest,
    RAGResponse
)
from app.services.embedding_service import EmbeddingService
from app.services.search_service import SearchService
from app.services.rag_service import RAGService

router = APIRouter(prefix="/api", tags=["RAG API"])


@router.post("/documents", response_model=DocumentResponse)
def create_document(
    document: DocumentCreate,
    db: Session = Depends(get_db)
):
    """ドキュメントを作成してEmbeddingを生成"""
    try:
        # Embeddingを生成
        embedding_service = EmbeddingService()
        embedding = embedding_service.create_embedding(document.content)

        # ドキュメントを保存
        db_document = Document(
            content=document.content,
            embedding=embedding,
            metadata=document.metadata
        )
        db.add(db_document)
        db.commit()
        db.refresh(db_document)

        return db_document

    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/documents", response_model=List[DocumentResponse])
def list_documents(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db)
):
    """ドキュメント一覧を取得"""
    documents = db.query(Document).offset(skip).limit(limit).all()
    return documents


@router.get("/documents/{document_id}", response_model=DocumentResponse)
def get_document(
    document_id: int,
    db: Session = Depends(get_db)
):
    """特定のドキュメントを取得"""
    document = db.query(Document).filter(Document.id == document_id).first()
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")
    return document


@router.post("/search", response_model=List[SearchResult])
def search_documents(
    request: SearchRequest,
    db: Session = Depends(get_db)
):
    """ベクトル検索を実行"""
    try:
        search_service = SearchService(db)
        results = search_service.search_similar(
            query=request.query,
            limit=request.limit,
            threshold=request.threshold
        )
        return results

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/rag", response_model=RAGResponse)
def rag_query(
    request: RAGRequest,
    db: Session = Depends(get_db)
):
    """RAG方式で質問に回答"""
    try:
        rag_service = RAGService(db)
        result = rag_service.generate_answer(
            question=request.question,
            max_context_docs=request.max_context_docs
        )
        return result

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/rag/stream")
async def rag_query_stream(
    request: RAGRequest,
    db: Session = Depends(get_db)
):
    """RAG方式でストリーミング回答"""
    from fastapi.responses import StreamingResponse

    try:
        rag_service = RAGService(db)

        def generate():
            for chunk in rag_service.stream_answer(
                question=request.question,
                max_context_docs=request.max_context_docs
            ):
                yield chunk

        return StreamingResponse(generate(), media_type="text/plain")

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

9. メインアプリケーション(app/main.py)

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import get_settings
from app.api.routes import router
from app.core.database import engine, Base

settings = get_settings()

# テーブル作成
Base.metadata.create_all(bind=engine)

# FastAPIアプリ作成
app = FastAPI(
    title=settings.APP_NAME,
    description="FastAPI + pgvector によるRAGシステム",
    version="1.0.0",
    debug=settings.DEBUG
)

# CORS設定
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ルーター登録
app.include_router(router)


@app.get("/")
def root():
    """ヘルスチェック"""
    return {
        "message": "RAG API is running",
        "status": "ok"
    }


@app.get("/health")
def health_check():
    """詳細なヘルスチェック"""
    return {
        "status": "healthy",
        "database": "connected",
        "version": "1.0.0"
    }

起動と動作確認

1. Docker環境の起動

# プロジェクトディレクトリで実行
docker-compose up -d

# ログ確認
docker-compose logs -f api

2. API動作確認

# ヘルスチェック
curl http://localhost:8000/health

# ドキュメント作成
curl -X POST http://localhost:8000/api/documents \
  -H "Content-Type: application/json" \
  -d '{
    "content": "PostgreSQLは強力なオープンソースのリレーショナルデータベース管理システムです。",
    "metadata": {"category": "database"}
  }'

# 類似検索
curl -X POST http://localhost:8000/api/search \
  -H "Content-Type: application/json" \
  -d '{
    "query": "データベース管理システムについて",
    "limit": 5
  }'

# RAG質問応答
curl -X POST http://localhost:8000/api/rag \
  -H "Content-Type: application/json" \
  -d '{
    "question": "PostgreSQLの特徴は何ですか?",
    "max_context_docs": 3
  }'

3. Swagger UI

ブラウザで以下のURLにアクセス:

http://localhost:8000/docs

実装例: Pythonクライアント

import requests

class RAGClient:
    """RAG APIクライアント"""

    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url

    def create_document(self, content: str, metadata: dict = None):
        """ドキュメントを作成"""
        response = requests.post(
            f"{self.base_url}/api/documents",
            json={
                "content": content,
                "metadata": metadata or {}
            }
        )
        return response.json()

    def search(self, query: str, limit: int = 5):
        """類似検索"""
        response = requests.post(
            f"{self.base_url}/api/search",
            json={
                "query": query,
                "limit": limit
            }
        )
        return response.json()

    def ask(self, question: str, max_context_docs: int = 3):
        """RAG質問応答"""
        response = requests.post(
            f"{self.base_url}/api/rag",
            json={
                "question": question,
                "max_context_docs": max_context_docs
            }
        )
        return response.json()


# 使用例
if __name__ == "__main__":
    client = RAGClient()

    # ドキュメント追加
    doc = client.create_document(
        content="FastAPIは高速で使いやすいPython製Webフレームワークです。",
        metadata={"category": "framework"}
    )
    print(f"作成したドキュメントID: {doc['id']}")

    # 検索
    results = client.search("Pythonのフレームワーク")
    print(f"検索結果: {len(results)}件")

    # 質問応答
    answer = client.ask("FastAPIの特徴を教えてください")
    print(f"回答: {answer['answer']}")

パフォーマンス最適化

インデックスの選択

-- IVFFlat インデックス(中規模向け)
CREATE INDEX idx_embedding_ivf ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- HNSW インデックス(大規模向け)
CREATE INDEX idx_embedding_hnsw ON documents
USING hnsw (embedding vector_cosine_ops);

バッチ処理

def batch_insert_documents(contents: List[str], batch_size: int = 100):
    """ドキュメントをバッチ挿入"""
    embedding_service = EmbeddingService()

    for i in range(0, len(contents), batch_size):
        batch = contents[i:i + batch_size]

        # バッチでEmbedding生成
        embeddings = embedding_service.create_embeddings_batch(batch)

        # バルク挿入
        documents = [
            Document(content=content, embedding=embedding)
            for content, embedding in zip(batch, embeddings)
        ]
        db.bulk_save_objects(documents)
        db.commit()

LangChainとの統合

from langchain.vectorstores import PGVector
from langchain.embeddings import OpenAIEmbeddings

# PGVector初期化
embeddings = OpenAIEmbeddings()
vectorstore = PGVector(
    connection_string="postgresql://user:pass@localhost:5432/ragdb",
    embedding_function=embeddings,
    collection_name="documents"
)

# ドキュメント追加
vectorstore.add_texts(
    texts=["PostgreSQLは強力なRDBMS"],
    metadatas=[{"category": "database"}]
)

# 検索
docs = vectorstore.similarity_search("データベース", k=5)

まとめ

このガイドでは、FastAPIとpgvectorを使用したRAGシステムの完全な実装を解説しました。

重要なポイント

  1. pgvector: PostgreSQL拡張として簡単に導入可能
  2. Embedding: OpenAI APIで高品質なベクトル生成
  3. 検索: コサイン距離での高速類似検索
  4. RAG: コンテキストを活用したLLM回答生成
  5. スケーラビリティ: インデックスとバッチ処理で最適化

次のステップ

  • 認証機能: JWT認証の追加
  • キャッシング: Redisでの検索結果キャッシュ
  • モニタリング: Prometheus + Grafanaでの監視
  • テスト: pytestでの自動テスト追加
  • デプロイ: Kubernetes環境への展開

この実装をベースに、用途に応じてカスタマイズしていくことが可能です。