完全なコードサンプル集¶
概要¶
このドキュメントは、AI実装に必要な全てのコードサンプルを網羅しています。FastAPIルーター、SQL例、Docker設定、テストコードなど、即座に利用できる実装例を提供します。
目次¶
FastAPIルーター実装¶
1. ベクトル検索API¶
# app/routes/vector_routes.py
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import func
from sqlalchemy.orm import Session
from typing import List, Optional
from app.db.database import get_db
from app.services.embedding_service import EmbeddingService
from app.models.document import Document
import logging
import time

router = APIRouter(prefix="/api/vectors", tags=["Vector Search"])
logger = logging.getLogger(__name__)


# === Request/Response Models ===

class EmbedRequest(BaseModel):
    """Request payload for embedding a single document."""

    content: str = Field(..., description="文書内容", min_length=1)
    metadata: Optional[dict] = Field(default=None, description="メタデータ")

    class Config:
        json_schema_extra = {
            "example": {
                "content": "FastAPIはPython製の高速Webフレームワークです。",
                "metadata": {"source": "documentation", "category": "技術"}
            }
        }


class SearchRequest(BaseModel):
    """Request payload for a vector similarity search."""

    query: str = Field(..., description="検索クエリ", min_length=1)
    k: int = Field(default=5, ge=1, le=50, description="取得件数")
    threshold: Optional[float] = Field(default=None, ge=0.0, le=1.0, description="類似度閾値")


class DocumentResponse(BaseModel):
    """A single document as returned by the API."""

    id: int
    content: str
    similarity: Optional[float] = None  # populated only for search results
    metadata: Optional[dict] = None
    created_at: str  # ISO-8601 timestamp


class SearchResponse(BaseModel):
    """Envelope for a set of search results."""

    query: str
    results: List[DocumentResponse]
    count: int
    execution_time: float  # seconds


# === Dependencies ===

def get_embedding_service(db: Session = Depends(get_db)) -> EmbeddingService:
    """Dependency that builds an EmbeddingService bound to the request's DB session."""
    return EmbeddingService(db)


# === Endpoints ===

@router.post("/embed", response_model=dict, status_code=201)
async def embed_document(
    request: EmbedRequest,
    service: EmbeddingService = Depends(get_embedding_service)
):
    """Embed a document and persist it.

    Args:
        request: Document content and optional metadata.

    Returns:
        dict with the created document's id.

    Raises:
        HTTPException: 500 when embedding or persistence fails.
    """
    try:
        doc_id = service.embed_and_save(
            content=request.content,
            metadata=request.metadata
        )
        logger.info("Document embedded successfully: %s", doc_id)
        return {
            "success": True,
            "document_id": doc_id,
            "message": "Document embedded and saved"
        }
    except Exception as e:
        logger.error("Error embedding document: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/search", response_model=SearchResponse)
async def search_similar(
    request: SearchRequest,
    service: EmbeddingService = Depends(get_embedding_service)
):
    """Find documents similar to the query.

    Args:
        request: Query text, result count, and optional similarity threshold.

    Returns:
        Matching documents with similarity scores and timing information.

    Raises:
        HTTPException: 500 when the search fails.
    """
    try:
        start_time = time.time()
        results = service.search_similar(
            query=request.query,
            k=request.k,
            threshold=request.threshold
        )
        execution_time = time.time() - start_time
        documents = [
            DocumentResponse(
                id=doc.id,
                content=doc.content,
                similarity=doc.similarity,
                metadata=doc.metadata,
                created_at=doc.created_at.isoformat()
            )
            for doc in results
        ]
        return SearchResponse(
            query=request.query,
            results=documents,
            count=len(documents),
            execution_time=execution_time
        )
    except Exception as e:
        logger.error("Error searching documents: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/documents/{document_id}", response_model=DocumentResponse)
async def get_document(
    document_id: int,
    db: Session = Depends(get_db)
):
    """Fetch a single document by id.

    Args:
        document_id: Primary key of the document.

    Raises:
        HTTPException: 404 when the document does not exist.
    """
    doc = db.query(Document).filter(Document.id == document_id).first()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")
    return DocumentResponse(
        id=doc.id,
        content=doc.content,
        metadata=doc.metadata,
        created_at=doc.created_at.isoformat()
    )


@router.delete("/documents/{document_id}", status_code=204)
async def delete_document(
    document_id: int,
    db: Session = Depends(get_db)
):
    """Delete a document by id.

    Args:
        document_id: Primary key of the document.

    Raises:
        HTTPException: 404 when the document does not exist.
    """
    doc = db.query(Document).filter(Document.id == document_id).first()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")
    db.delete(doc)
    db.commit()
    logger.info("Document deleted: %s", document_id)


@router.get("/stats", response_model=dict)
async def get_stats(db: Session = Depends(get_db)):
    """Collection statistics: total documents and newest document timestamp."""
    total_docs = db.query(func.count(Document.id)).scalar()
    latest_doc = db.query(func.max(Document.created_at)).scalar()
    return {
        "total_documents": total_docs,
        "latest_document": latest_doc.isoformat() if latest_doc else None
    }
2. RAGエンドポイント¶
# app/routes/rag_routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
from app.agents.self_learning_rag import SelfLearningRAG
import logging
import time

router = APIRouter(prefix="/api/rag", tags=["RAG"])
logger = logging.getLogger(__name__)


# === Models ===

class RAGRequest(BaseModel):
    """RAG query payload."""

    question: str = Field(..., description="質問", min_length=1)
    # NOTE(review): context_k and temperature are accepted but never forwarded
    # to the RAG call in rag_query below — confirm whether they should be.
    context_k: int = Field(default=5, ge=1, le=20, description="コンテキスト取得件数")
    temperature: float = Field(default=0.7, ge=0.0, le=2.0, description="LLM温度")


class RAGResponse(BaseModel):
    """RAG answer envelope."""

    question: str
    answer: str
    sources: List[dict]
    confidence: float
    learned: bool
    execution_time: float  # seconds


# === Dependencies ===

def get_rag_system() -> SelfLearningRAG:
    """Dependency that provides a RAG system instance (one per request)."""
    return SelfLearningRAG()


# === Endpoints ===

@router.post("/query", response_model=RAGResponse)
async def rag_query(
    request: RAGRequest,
    rag: SelfLearningRAG = Depends(get_rag_system)
):
    """Run a RAG query.

    Args:
        request: Question and generation parameters.

    Returns:
        Answer, sources, and confidence.

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        start_time = time.time()
        result = rag.query_with_self_learning(request.question)
        execution_time = time.time() - start_time
        return RAGResponse(
            question=request.question,
            answer=result["answer"],
            sources=[
                {"content": src.page_content, "metadata": src.metadata}
                for src in result.get("sources", [])
            ],
            confidence=result.get("confidence", 0.0),
            learned=result.get("learned", False),
            execution_time=execution_time
        )
    except Exception as e:
        logger.error("RAG query error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/learning-stats", response_model=dict)
async def get_learning_stats(rag: SelfLearningRAG = Depends(get_rag_system)):
    """Return learning statistics from the RAG system."""
    return rag.get_learning_stats()


@router.post("/manual-sync", response_model=dict)
async def manual_sync(rag: SelfLearningRAG = Depends(get_rag_system)):
    """Manually trigger an MkDocs sync.

    Raises:
        HTTPException: 500 when the sync fails.
    """
    try:
        docs = rag._sync_from_mkdocs("")
        return {
            "success": True,
            "synced_documents": len(docs),
            "message": "Sync completed successfully"
        }
    except Exception as e:
        logger.error("Manual sync error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
3. 認証API¶
# app/routes/auth_routes.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime, timedelta, timezone
from typing import Optional  # fixed: Optional was used below but never imported
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.db.database import get_db
from app.models.user import User
import os

router = APIRouter(prefix="/api/auth", tags=["Authentication"])

# === Configuration ===
# NOTE: the fallback value is for local development only; always provide
# SECRET_KEY via the environment in production.
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")


# === Models ===

class UserCreate(BaseModel):
    """Registration payload."""

    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50)
    password: str = Field(..., min_length=8)


class UserResponse(BaseModel):
    """Public view of a user account."""

    id: int
    email: str
    username: str
    is_active: bool
    created_at: datetime


class Token(BaseModel):
    """OAuth2 bearer token response."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Claims extracted from a decoded JWT."""

    username: Optional[str] = None


# === Utilities ===

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against its bcrypt hash."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash a plaintext password with bcrypt."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to encode (e.g. {"sub": username}).
        expires_delta: Token lifetime; defaults to 15 minutes.

    Returns:
        The encoded JWT string.
    """
    to_encode = data.copy()
    # Timezone-aware UTC; naive datetime.utcnow() is deprecated.
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Resolve the authenticated user from the bearer token.

    Raises:
        HTTPException: 401 when the token is invalid or the user is unknown.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")  # may be None on a malformed token
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = db.query(User).filter(User.username == token_data.username).first()
    if user is None:
        raise credentials_exception
    return user


# === Endpoints ===

@router.post("/register", response_model=UserResponse, status_code=201)
async def register(user: UserCreate, db: Session = Depends(get_db)):
    """Register a new user.

    Args:
        user: Registration payload.

    Returns:
        The created user.

    Raises:
        HTTPException: 400 when the email or username is already taken.
    """
    # Reject duplicate email or username
    existing_user = db.query(User).filter(
        (User.email == user.email) | (User.username == user.username)
    ).first()
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail="Email or username already registered"
        )

    # Create the user with a hashed password (plaintext is never stored)
    hashed_password = get_password_hash(user.password)
    db_user = User(
        email=user.email,
        username=user.username,
        hashed_password=hashed_password
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return UserResponse(
        id=db_user.id,
        email=db_user.email,
        username=db_user.username,
        is_active=db_user.is_active,
        created_at=db_user.created_at
    )


@router.post("/token", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """Authenticate and issue an access token.

    Args:
        form_data: Username and password (OAuth2 password flow).

    Returns:
        A bearer access token.

    Raises:
        HTTPException: 401 on bad credentials.
    """
    user = db.query(User).filter(User.username == form_data.username).first()
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@router.get("/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user's own profile."""
    return UserResponse(
        id=current_user.id,
        email=current_user.email,
        username=current_user.username,
        is_active=current_user.is_active,
        created_at=current_user.created_at
    )
データベース設定¶
1. SQLAlchemy設定¶
# app/db/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
import os

# Database URL; override via the DATABASE_URL environment variable.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://appuser:password@localhost:5432/appdb"
)

# Engine; NullPool is used for compatibility with pgvector.
engine = create_engine(
    DATABASE_URL,
    poolclass=NullPool,
    echo=os.getenv("DEBUG") == "1",  # SQL echo only in debug mode
)

# Session factory bound to the engine.
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)

# Declarative base shared by all ORM models.
Base = declarative_base()


def get_db():
    """FastAPI dependency that yields a DB session and closes it after the request."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
2. モデル定義¶
# app/models/document.py
from sqlalchemy import Column, Integer, Text, DateTime, JSON
from sqlalchemy.sql import func
from pgvector.sqlalchemy import Vector
from app.db.database import Base


class Document(Base):
    """A stored document together with its embedding vector."""

    __tablename__ = "documents"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    # 1536 dimensions matches OpenAI text-embedding-3-small.
    embedding = Column(Vector(1536), nullable=False)
    # Fixed: default=dict (a factory) instead of default={} so rows do not
    # share a single mutable dict.
    # NOTE(review): "metadata" is a reserved attribute name on SQLAlchemy
    # declarative classes (it collides with Base.metadata) and raises
    # InvalidRequestError at class definition time — rename the attribute
    # (e.g. doc_metadata = Column("metadata", JSON, default=dict)) and
    # update callers accordingly.
    metadata = Column(JSON, default=dict)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def __repr__(self):
        return f"<Document(id={self.id}, content='{self.content[:50]}...')>"
# app/models/user.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from app.db.database import Base


class User(Base):
    """Application user account."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)  # bcrypt hash, never plaintext
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"
3. マイグレーション¶
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
from app.db.database import Base
from app.models import document, user  # noqa: F401 — registers all models on Base.metadata
import os

# Alembic Config object (values come from alembic.ini)
config = context.config

# Configure Python logging from the ini file when present
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata target for autogenerate support
target_metadata = Base.metadata

# Fixed: set_main_option(None) raises a TypeError when DATABASE_URL is unset;
# only override the ini-file URL when the environment variable is provided.
database_url = os.getenv("DATABASE_URL")
if database_url:
    config.set_main_option("sqlalchemy.url", database_url)


def run_migrations_offline():
    """Run migrations in offline mode (emit SQL without a live connection)."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations against a live database connection."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
4. 初期化SQL¶
-- migrations/001_init.sql
-- Install the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- documents table
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1536) NOT NULL,  -- matches OpenAI text-embedding-3-small
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE
);

-- Approximate nearest-neighbour index (cosine distance)
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Full-text search index.
-- Fixed: 'japanese' is not a built-in PostgreSQL text search configuration
-- (stock PostgreSQL ships 'simple' plus the Snowball stemmer languages), so
-- the original statement fails on the ankane/pgvector image. 'simple' works
-- everywhere; install a Japanese tokenizer (e.g. PGroonga) and switch the
-- configuration for proper Japanese search.
CREATE INDEX documents_content_idx ON documents USING gin(to_tsvector('simple', content));

-- users table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(50) UNIQUE NOT NULL,
    hashed_password VARCHAR(255) NOT NULL,
    is_active BOOLEAN DEFAULT TRUE,
    is_superuser BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE
);

-- NOTE: the UNIQUE constraints above already create indexes on email and
-- username; these explicit indexes are redundant but harmless.
CREATE INDEX users_email_idx ON users(email);
CREATE INDEX users_username_idx ON users(username);
Docker構成¶
1. 完全なDocker Compose¶
# docker-compose.yaml
version: "3.9"

services:
  # PostgreSQL + pgvector
  postgres:
    image: ankane/pgvector:pg16
    container_name: postgres_db
    environment:
      POSTGRES_USER: ${POSTGRES_USER:-appuser}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB:-appdb}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./migrations:/docker-entrypoint-initdb.d
    ports:
      - "5432:5432"
    networks:
      - backend
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-appuser}"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Neo4j
  neo4j:
    image: neo4j:5.15.0
    container_name: neo4j_db
    environment:
      NEO4J_AUTH: neo4j/${NEO4J_PASSWORD}
      NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
      NEO4J_dbms_memory_heap_initial__size: 512m
      NEO4J_dbms_memory_heap_max__size: 2G
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    ports:
      - "7474:7474"  # HTTP
      - "7687:7687"  # Bolt
    networks:
      - backend
    restart: unless-stopped

  # FastAPI Application
  api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: fastapi_app
    environment:
      DATABASE_URL: postgresql+psycopg2://${POSTGRES_USER:-appuser}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB:-appdb}
      NEO4J_URI: bolt://neo4j:7687
      NEO4J_USER: neo4j
      NEO4J_PASSWORD: ${NEO4J_PASSWORD}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      SECRET_KEY: ${SECRET_KEY}
      MKDOCS_PATH: /docs
    volumes:
      - ./app:/app/app
      - ./docs:/docs:ro
      - ./data:/app/data
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
      neo4j:
        condition: service_started
    networks:
      - backend
      - web
    restart: unless-stopped
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

  # Arize Phoenix
  phoenix:
    image: python:3.11-slim
    container_name: phoenix_viz
    working_dir: /app
    volumes:
      - ./phoenix:/app
      - ./exports:/exports
    ports:
      - "6006:6006"
    environment:
      DATABASE_URL: postgresql://appuser:${POSTGRES_PASSWORD}@postgres:5432/appdb
    # NOTE(review): installing packages at container start is slow and
    # non-reproducible — consider a dedicated Dockerfile for this service.
    command: >
      bash -c "
      pip install arize-phoenix psycopg2-binary pandas numpy &&
      python phoenix_server.py
      "
    depends_on:
      - postgres
    networks:
      - backend
      - web
    restart: unless-stopped

  # MkDocs
  mkdocs:
    image: squidfunk/mkdocs-material:latest
    container_name: mkdocs_docs
    volumes:
      - ./docs:/docs
      # NOTE(review): the image's working directory is /docs — confirm
      # mkdocs.yml is picked up when mounted at / rather than /docs.
      - ./mkdocs.yml:/mkdocs.yml
    ports:
      - "8080:8000"
    command: serve -a 0.0.0.0:8000
    networks:
      - web
    restart: unless-stopped

  # Grafana
  grafana:
    image: grafana/grafana:latest
    container_name: grafana_monitoring
    environment:
      GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_PASSWORD}
      GF_INSTALL_PLUGINS: grafana-clock-panel
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"
    networks:
      - backend
      - web
    restart: unless-stopped

  # Loki
  loki:
    image: grafana/loki:latest
    container_name: loki_logs
    volumes:
      - ./loki/config.yaml:/etc/loki/config.yaml
      - loki_data:/loki
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/config.yaml
    networks:
      - backend
    restart: unless-stopped

volumes:
  postgres_data:
  neo4j_data:
  neo4j_logs:
  grafana_data:
  loki_data:

networks:
  backend:
    driver: bridge
  web:
    driver: bridge
2. Dockerfile¶
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System packages (compiler for native wheels, psql client, curl for healthcheck)
RUN apt-get update && apt-get install -y \
    build-essential \
    postgresql-client \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies — copied before the app code to leverage layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY ./app /app/app

EXPOSE 8000

# Container healthcheck hits the app's /health endpoint
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start the app (no --reload in production images)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
3. requirements.txt¶
# requirements.txt
# All versions pinned for reproducible builds; install with: pip install -r requirements.txt
# FastAPI
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.3
pydantic-settings==2.1.0
# Database
sqlalchemy==2.0.25
psycopg2-binary==2.9.9
pgvector==0.2.4
alembic==1.13.1
# Neo4j
neo4j==5.16.0
# LangChain
langchain==0.1.0
langchain-community==0.0.13
langchain-openai==0.0.2
# OpenAI
openai==1.10.0
# Authentication
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
# Utilities
python-dotenv==1.0.0
httpx==0.26.0
aiofiles==23.2.1
# AI/ML
arize-phoenix==2.0.0
numpy==1.26.3
pandas==2.1.4
scikit-learn==1.4.0
umap-learn==0.5.5
# CrewAI
crewai==0.11.0
# Testing
pytest==7.4.4
pytest-asyncio==0.23.3
factory-boy==3.3.0
faker==22.0.0
# Monitoring
structlog==24.1.0
4. .env.example¶
# .env.example
# Copy this file to .env and fill in real values.
# Never commit the resulting .env file to version control.
# PostgreSQL
POSTGRES_USER=appuser
POSTGRES_PASSWORD=change-me-in-production
POSTGRES_DB=appdb
# Neo4j
NEO4J_PASSWORD=change-me-in-production
# OpenAI
OPENAI_API_KEY=sk-your-openai-api-key
# Security
SECRET_KEY=your-secret-key-change-in-production
# Grafana
GRAFANA_PASSWORD=admin
# MkDocs
MKDOCS_PATH=./docs
# Application
DEBUG=0
LOG_LEVEL=INFO
テストコード¶
1. テスト設定¶
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.main import app
from app.db.database import Base, get_db

# In-memory SQLite, shared across connections via StaticPool.
# NOTE(review): models that use pgvector's Vector column type cannot be
# created on SQLite — confirm these tests only exercise models with
# SQLite-compatible column types.
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)


@pytest.fixture
def db():
    """Fresh schema per test; the session is closed and tables dropped afterwards."""
    Base.metadata.create_all(bind=engine)
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()
        Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client(db):
    """TestClient with get_db overridden to use the test session."""
    def override_get_db():
        try:
            yield db
        finally:
            pass  # session lifecycle is owned by the db fixture

    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as test_client:
        yield test_client
    # Fixed: clear the override so it does not leak into other tests.
    app.dependency_overrides.clear()


@pytest.fixture
def test_user(db):
    """A persisted user with a known plaintext password ('testpassword123')."""
    from app.models.user import User
    from passlib.context import CryptContext

    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    user = User(
        email="test@example.com",
        username="testuser",
        hashed_password=pwd_context.hash("testpassword123")
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user
2. APIテスト¶
# tests/test_api.py
import pytest
from fastapi import status


def test_health_check(client):
    """The health endpoint reports a healthy service."""
    response = client.get("/health")
    assert response.status_code == status.HTTP_200_OK
    assert response.json()["status"] == "healthy"


def test_register_user(client):
    """Registering a new user returns the created account."""
    response = client.post(
        "/api/auth/register",
        json={
            "email": "newuser@example.com",
            "username": "newuser",
            "password": "password123"
        }
    )
    assert response.status_code == status.HTTP_201_CREATED
    data = response.json()
    assert data["email"] == "newuser@example.com"
    assert data["username"] == "newuser"


def test_login(client, test_user):
    """Valid credentials yield a bearer token."""
    response = client.post(
        "/api/auth/token",
        data={
            "username": "testuser",
            "password": "testpassword123"
        }
    )
    assert response.status_code == status.HTTP_200_OK
    data = response.json()
    assert "access_token" in data
    assert data["token_type"] == "bearer"


def test_embed_document(client, test_user):
    """An authenticated client can embed a document."""
    # Log in to obtain a token
    login_response = client.post(
        "/api/auth/token",
        data={"username": "testuser", "password": "testpassword123"}
    )
    token = login_response.json()["access_token"]

    # Embed a document with the bearer token
    response = client.post(
        "/api/vectors/embed",
        json={
            "content": "This is a test document.",
            "metadata": {"source": "test"}
        },
        headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == status.HTTP_201_CREATED
    assert response.json()["success"] is True
ユーティリティ¶
1. ロガー設定¶
# app/core/logging_config.py
import logging
import json
from datetime import datetime, timezone
from typing import Any, Dict


class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects."""

    def format(self, record: logging.LogRecord) -> str:
        log_data: Dict[str, Any] = {
            # Timezone-aware UTC; naive datetime.utcnow() is deprecated.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Propagate a correlation id when middleware attached one via extra=.
        if hasattr(record, "request_id"):
            log_data["request_id"] = record.request_id
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data, ensure_ascii=False)


def setup_logging() -> logging.Logger:
    """Configure the root logger with a JSON stream handler.

    Idempotent: calling this more than once does not attach duplicate handlers.

    Returns:
        The configured root logger.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    # Fixed: guard against attaching a second JSON handler on repeat calls.
    if not any(isinstance(h.formatter, JSONFormatter) for h in logger.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger
2. ミドルウェア¶
# app/core/middleware.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
import uuid

logger = logging.getLogger(__name__)


class LoggingMiddleware(BaseHTTPMiddleware):
    """Log every request with a generated correlation id and timing info."""

    async def dispatch(self, request: Request, call_next):
        # Correlation id; also exposed to handlers via request.state.
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time

        logger.info(
            "Request completed",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "status_code": response.status_code,
                "process_time": process_time
            }
        )
        # Surface id and timing to clients for debugging.
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = str(process_time)
        return response
このサンプル集を参考に、プロジェクトの要件に合わせてカスタマイズしてください。