Operations and Best Practices

Overview

This document is a comprehensive guide to operating the AI implementation system, covering backup strategy, security measures, and troubleshooting.

Table of Contents

  1. Backup Strategy
  2. Security Measures
  3. Performance Optimization
  4. Monitoring and Observability
  5. Troubleshooting
  6. Deployment

Backup Strategy

1. Automated PostgreSQL Backups

Configuration

# docker-compose.yaml
services:
  db-backup:
    image: postgres:16
    container_name: db_backup
    depends_on:
      - postgres
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      S3_BUCKET: ${S3_BUCKET_NAME}
    volumes:
      - ./backups:/backups
      - ./scripts/backup.sh:/backup.sh
    # Note: cron does not pass the container's environment variables to jobs by
    # default, so they are appended to /etc/environment before cron starts.
    entrypoint: >
      bash -c "
        apt-get update && apt-get install -y awscli cron &&
        printenv | grep -E 'POSTGRES_|AWS_|S3_' >> /etc/environment &&
        echo '0 3 * * * bash /backup.sh >> /backups/backup.log 2>&1' | crontab - &&
        cron -f
      "
    networks:
      - backend

Backup Script

#!/bin/bash
# scripts/backup.sh

set -e

# Configuration
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups"
FILENAME="postgres_backup_${TIMESTAMP}.dump"
S3_PATH="s3://${S3_BUCKET}/postgres-backups/"

# Logging
echo "[$(date)] Starting backup..."

# Dump the database with pg_dump
PGPASSWORD=${POSTGRES_PASSWORD} pg_dump \
    -h postgres \
    -U ${POSTGRES_USER} \
    -Fc \
    ${POSTGRES_DB} \
    > ${BACKUP_DIR}/${FILENAME}

# Compress
gzip ${BACKUP_DIR}/${FILENAME}

# Upload to S3
aws s3 cp ${BACKUP_DIR}/${FILENAME}.gz ${S3_PATH}

# Delete local backups older than 7 days
find ${BACKUP_DIR} -type f -mtime +7 -delete

# Delete S3 backups older than 30 days
aws s3 ls ${S3_PATH} | while read -r line; do
    createDate=$(echo $line|awk '{print $1" "$2}')
    createDate=$(date -d "$createDate" +%s)
    olderThan=$(date -d "30 days ago" +%s)
    if [[ $createDate -lt $olderThan ]]; then
        fileName=$(echo $line|awk '{print $4}')
        if [[ $fileName != "" ]]; then
            aws s3 rm ${S3_PATH}${fileName}
        fi
    fi
done

echo "[$(date)] Backup completed: ${FILENAME}.gz"

Restore Procedure

# Download the backup file from S3
aws s3 cp s3://your-bucket/postgres-backups/postgres_backup_20250110_030000.dump.gz .

# Decompress
gunzip postgres_backup_20250110_030000.dump.gz

# Restore
docker exec -i postgres psql -U appuser -d postgres -c "DROP DATABASE IF EXISTS appdb;"
docker exec -i postgres psql -U appuser -d postgres -c "CREATE DATABASE appdb;"
docker exec -i postgres pg_restore -U appuser -d appdb < postgres_backup_20250110_030000.dump

2. Neo4j Backup

#!/bin/bash
# scripts/neo4j_backup.sh

TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="neo4j_backup_${TIMESTAMP}.dump"

# Dump the Neo4j database (the database must not be in use while dumping).
# Note: neo4j-admin 5.x treats --to-path as a directory and writes
# /backups/neo4j.dump there, so the file is renamed afterwards.
docker exec neo4j neo4j-admin database dump neo4j \
    --to-path=/backups
docker exec neo4j mv /backups/neo4j.dump /backups/${BACKUP_FILE}

# Upload to S3 (assumes ./backups on the host is the volume mounted at /backups in the neo4j container)
docker run --rm \
    -v $(pwd)/backups:/backups \
    -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
    -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
    amazon/aws-cli \
    s3 cp /backups/${BACKUP_FILE} s3://${S3_BUCKET}/neo4j-backups/

echo "Neo4j backup completed: ${BACKUP_FILE}"

3. Backup Monitoring

# app/monitoring/backup_checker.py
import boto3
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class BackupMonitor:
    """バックアップ監視"""

    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name

    def check_recent_backup(self, prefix: str, hours: int = 24) -> dict:
        """直近のバックアップ確認"""

        try:
            response = self.s3.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=prefix
            )

            if 'Contents' not in response:
                return {
                    "status": "error",
                    "message": "No backups found"
                }

            # Most recent backup
            latest = max(response['Contents'], key=lambda x: x['LastModified'])
            age = datetime.now(latest['LastModified'].tzinfo) - latest['LastModified']

            if age > timedelta(hours=hours):
                return {
                    "status": "warning",
                    "message": f"Latest backup is {age.total_seconds() / 3600:.1f} hours old",
                    "last_backup": latest['LastModified'].isoformat()
                }

            return {
                "status": "ok",
                "message": "Recent backup found",
                "last_backup": latest['LastModified'].isoformat(),
                "age_hours": age.total_seconds() / 3600
            }

        except Exception as e:
            logger.error(f"Backup check failed: {e}")
            return {
                "status": "error",
                "message": str(e)
            }
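
A minimal usage sketch (the bucket name and prefixes are placeholders; adjust them to your environment):

# Check both backup prefixes and print the result of each check.
from app.monitoring.backup_checker import BackupMonitor

monitor = BackupMonitor(bucket_name="your-bucket")

for prefix in ["postgres-backups/", "neo4j-backups/"]:
    status = monitor.check_recent_backup(prefix, hours=24)
    print(prefix, status["status"], status["message"])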

Security Measures

1. Database Security

Hardening the PostgreSQL Configuration

# docker-compose.yaml
services:
  postgres:
    image: ankane/pgvector:pg16
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_HOST_AUTH_METHOD: scram-sha-256
      POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256"
    volumes:
      - ./postgres/postgresql.conf:/etc/postgresql/postgresql.conf
    command: postgres -c config_file=/etc/postgresql/postgresql.conf
    networks:
      - backend  # internal network only
    # No published ports (access only via SSH tunnel)

postgresql.conf

# postgres/postgresql.conf

# Connection settings
listen_addresses = '*'
max_connections = 100

# Security
ssl = on
ssl_cert_file = '/etc/ssl/certs/server.crt'
ssl_key_file = '/etc/ssl/private/server.key'
password_encryption = scram-sha-256

# Logging
logging_collector = on
log_destination = 'stderr'
log_directory = '/var/log/postgresql'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_statement = 'all'    # verbose; consider 'ddl' or 'mod' in production
log_duration = on
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '

# Performance
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
work_mem = 16MB

2. API Authentication and JWT

JWT Configuration

# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
import os

# Configuration
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """アクセストークン生成"""
    to_encode = data.copy()

    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access"
    })

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def create_refresh_token(data: dict):
    """リフレッシュトークン生成"""
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "refresh"
    })

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str) -> Optional[dict]:
    """Verify a token; returns the payload, or None if it is invalid or expired"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        return None
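
A sketch of using verify_token to protect an endpoint (assumes FastAPI; the /auth/token URL and route paths are placeholders):

# app/api/protected.py (sketch)
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

from app.core.security import verify_token

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
router = APIRouter()

def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    """Resolve the current user from a bearer access token."""
    payload = verify_token(token)
    if payload is None or payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return payload

@router.get("/me")
def read_me(user: dict = Depends(get_current_user)):
    return {"sub": user.get("sub")}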

3. Rate Limiting

# app/core/rate_limiter.py
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio

class RateLimiter(BaseHTTPMiddleware):
    """レート制限ミドルウェア"""

    def __init__(self, app, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)
        # Started lazily on the first request, once an event loop is running.
        self.cleanup_task = None

    async def dispatch(self, request: Request, call_next):
        # Start the periodic cleanup task on the first request
        # (an event loop is guaranteed to be running here).
        if self.cleanup_task is None:
            self.cleanup_task = asyncio.create_task(self._cleanup_old_requests())

        client_ip = request.client.host

        # Current time
        now = datetime.utcnow()

        # Keep only requests from the last minute
        minute_ago = now - timedelta(minutes=1)
        self.requests[client_ip] = [
            req_time for req_time in self.requests[client_ip]
            if req_time > minute_ago
        ]

        # Rate limit check: return 429 directly (exceptions raised inside
        # BaseHTTPMiddleware are not handled by the app's HTTPException handlers)
        if len(self.requests[client_ip]) >= self.requests_per_minute:
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests. Please try again later."}
            )

        # Record this request
        self.requests[client_ip].append(now)

        response = await call_next(request)
        return response

    async def _cleanup_old_requests(self):
        """古いリクエスト記録のクリーンアップ"""
        while True:
            await asyncio.sleep(60)  # every minute
            now = datetime.utcnow()
            hour_ago = now - timedelta(hours=1)

            for ip in list(self.requests.keys()):
                self.requests[ip] = [
                    req_time for req_time in self.requests[ip]
                    if req_time > hour_ago
                ]

                if not self.requests[ip]:
                    del self.requests[ip]

4. Security Headers

# app/core/security_headers.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """セキュリティヘッダー追加"""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"

        return response
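
A sketch of registering both middlewares on the application (the app/main.py module path is an assumption; adjust it to your project layout):

# app/main.py (sketch)
from fastapi import FastAPI

from app.core.rate_limiter import RateLimiter
from app.core.security_headers import SecurityHeadersMiddleware

app = FastAPI()

# The middleware added last runs outermost.
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(RateLimiter, requests_per_minute=60)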

Performance Optimization

1. Database Optimization

Indexing Strategy

-- Vector search index (IVFFlat)
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- HNSW (faster queries, but higher memory use)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Additional indexes for metadata filters and time-based sorting
CREATE INDEX documents_metadata_idx ON documents USING gin(metadata);
CREATE INDEX documents_created_at_idx ON documents(created_at DESC);

-- Partial index
CREATE INDEX active_documents_idx ON documents(id) WHERE metadata->>'active' = 'true';
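
Both index types also expose query-time parameters in pgvector (ivfflat.probes and hnsw.ef_search) that trade recall for speed. A minimal sketch of setting them per connection with SQLAlchemy; the values shown are illustrative starting points:

# Tune pgvector recall/speed for the current connection before searching.
from sqlalchemy import text
from app.db.database import engine

with engine.connect() as conn:
    conn.execute(text("SET ivfflat.probes = 10"))   # IVFFlat: more probes = better recall, slower
    conn.execute(text("SET hnsw.ef_search = 40"))   # HNSW: larger ef_search = better recall, slower
    # ...run the similarity query on this same connection/session...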

Query Optimization

# app/services/optimized_search.py
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.models.document import Document

class OptimizedSearchService:
    """最適化された検索サービス"""

    def __init__(self, db: Session):
        self.db = db

    def search_with_pagination(
        self,
        embedding: list,
        page: int = 1,
        page_size: int = 20,
        threshold: float = 0.7
    ):
        """ページネーション付き検索"""

        # Compute similarity in a subquery.
        # Uses the pgvector comparator (renders the <=> operator) rather than a
        # plain function call, so the vector index can be used.
        distance_subquery = select(
            Document.id,
            Document.content,
            Document.metadata,
            Document.created_at,
            (1 - Document.embedding.cosine_distance(embedding)).label("similarity")
        ).where(
            Document.embedding.cosine_distance(embedding) < (1 - threshold)
        ).subquery()

        # Pagination
        offset = (page - 1) * page_size
        query = select(distance_subquery).order_by(
            distance_subquery.c.similarity.desc()
        ).limit(page_size).offset(offset)

        results = self.db.execute(query).all()

        return {
            "results": results,
            "page": page,
            "page_size": page_size
        }

    def batch_embed(self, documents: list, batch_size: int = 100):
        """バッチ埋め込み"""
        from openai import OpenAI

        client = OpenAI()

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]

            # Generate embeddings for the whole batch in one API call
            response = client.embeddings.create(
                model="text-embedding-3-small",
                input=[doc["content"] for doc in batch]
            )

            # Save to the database
            for j, embedding_data in enumerate(response.data):
                doc = Document(
                    content=batch[j]["content"],
                    embedding=embedding_data.embedding,
                    metadata=batch[j].get("metadata", {})
                )
                self.db.add(doc)

            self.db.commit()
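
A usage sketch for the paginated search (the query is embedded with the same model used at ingestion time; session handling is simplified):

# Embed the query text, then fetch the first page of similar documents.
from openai import OpenAI

from app.db.database import SessionLocal
from app.services.optimized_search import OptimizedSearchService

client = OpenAI()
query_embedding = client.embeddings.create(
    model="text-embedding-3-small",
    input=["How do I restore a backup?"]
).data[0].embedding

db = SessionLocal()
try:
    service = OptimizedSearchService(db)
    page = service.search_with_pagination(query_embedding, page=1, page_size=20)
    for row in page["results"]:
        print(row.id, round(row.similarity, 3))
finally:
    db.close()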

2. Caching Strategy

# app/core/cache.py
from functools import lru_cache
import hashlib
import json
import redis
from typing import Any, Optional

class CacheManager:
    """キャッシュマネージャー"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    def get(self, key: str) -> Optional[Any]:
        """キャッシュ取得"""
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """キャッシュ設定"""
        self.redis.setex(
            key,
            ttl,
            json.dumps(value, ensure_ascii=False)
        )

    def delete(self, key: str):
        """キャッシュ削除"""
        self.redis.delete(key)

    def generate_key(self, prefix: str, *args, **kwargs) -> str:
        """キャッシュキー生成"""
        data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        hash_value = hashlib.md5(data.encode()).hexdigest()
        return f"{prefix}:{hash_value}"

# Usage example
cache = CacheManager()

def cached_search(query: str, k: int = 5):
    """キャッシュ付き検索"""
    cache_key = cache.generate_key("search", query, k)

    # Check the cache
    cached_result = cache.get(cache_key)
    if cached_result:
        return cached_result

    # Run the search (perform_search stands in for your application's actual search function)
    result = perform_search(query, k)

    # Cache the result for one hour
    cache.set(cache_key, result, ttl=3600)

    return result
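
Because cache keys are hashes of the query arguments, entries cannot be invalidated per document; one simple approach (a sketch, assuming redis-py) is to drop everything under the "search:" prefix whenever documents are added or updated:

def invalidate_search_cache(cache: CacheManager, pattern: str = "search:*"):
    """Delete all cached search results after the document set changes."""
    for key in cache.redis.scan_iter(match=pattern):
        cache.redis.delete(key)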

3. Asynchronous Processing

# app/tasks/async_tasks.py
from celery import Celery
from app.services.embedding_service import EmbeddingService
from app.db.database import SessionLocal

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def embed_document_async(content: str, metadata: dict = None):
    """非同期文書埋め込み"""
    db = SessionLocal()
    try:
        service = EmbeddingService(db)
        doc_id = service.embed_and_save(content, metadata)
        return {"success": True, "document_id": doc_id}
    finally:
        db.close()

@celery_app.task
def bulk_reindex(document_ids: list):
    """バルク再インデックス"""
    db = SessionLocal()
    try:
        service = EmbeddingService(db)
        for doc_id in document_ids:
            service.reindex_document(doc_id)
        return {"success": True, "processed": len(document_ids)}
    finally:
        db.close()
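
A sketch of enqueueing the embedding task from an API endpoint and polling its status (the /documents routes are placeholders):

# Enqueue embedding work and check on it later via the task id.
from fastapi import APIRouter

from app.tasks.async_tasks import celery_app, embed_document_async

router = APIRouter()

@router.post("/documents")
def create_document(payload: dict):
    task = embed_document_async.delay(payload["content"], payload.get("metadata"))
    return {"task_id": task.id}

@router.get("/documents/tasks/{task_id}")
def get_task_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return {"task_id": task_id, "state": result.state}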

Monitoring and Observability

1. Grafana Dashboard Configuration

# grafana/dashboards/ai-system.json
{
  "dashboard": {
    "title": "AI System Monitoring",
    "panels": [
      {
        "title": "API Response Time",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Database Connections",
        "targets": [
          {
            "expr": "pg_stat_activity_count"
          }
        ]
      },
      {
        "title": "Embedding Queue Size",
        "targets": [
          {
            "expr": "celery_queue_length{queue=\"embeddings\"}"
          }
        ]
      }
    ]
  }
}
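
The panels above assume the API exports Prometheus metrics such as http_request_duration_seconds (pg_stat_activity_count and the Celery queue length come from separate exporters, e.g. postgres_exporter). A minimal sketch of exporting the request-duration histogram with prometheus_client; the module path is an assumption:

# app/monitoring/metrics.py (sketch)
import time

from fastapi import FastAPI, Request
from prometheus_client import Histogram, make_asgi_app

REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "path"],
)

def setup_metrics(app: FastAPI) -> None:
    """Attach a request-duration middleware and expose /metrics for Prometheus."""

    @app.middleware("http")
    async def record_request_duration(request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        REQUEST_DURATION.labels(request.method, request.url.path).observe(
            time.perf_counter() - start
        )
        return response

    app.mount("/metrics", make_asgi_app())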

2. Alert Configuration

# app/monitoring/alerts.py
import logging
from typing import Callable
import smtplib
from email.mime.text import MIMEText

logger = logging.getLogger(__name__)

class AlertManager:
    """アラート管理"""

    def __init__(self, smtp_host: str, smtp_port: int, email_from: str, email_to: list):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.email_from = email_from
        self.email_to = email_to

    def send_alert(self, subject: str, message: str):
        """アラート送信"""
        try:
            msg = MIMEText(message)
            msg["Subject"] = subject
            msg["From"] = self.email_from
            msg["To"] = ", ".join(self.email_to)

            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.send_message(msg)

            logger.info(f"Alert sent: {subject}")

        except Exception as e:
            logger.error(f"Failed to send alert: {e}")

    def check_backup_status(self):
        """バックアップステータスチェック"""
        from app.monitoring.backup_checker import BackupMonitor

        monitor = BackupMonitor(bucket_name="your-bucket")
        status = monitor.check_recent_backup("postgres-backups/", hours=24)

        if status["status"] in ["warning", "error"]:
            self.send_alert(
                subject="⚠️ Backup Alert",
                message=f"Backup status: {status['message']}"
            )

    def check_database_health(self):
        """データベースヘルスチェック"""
        from sqlalchemy import text
        from app.db.database import engine

        try:
            with engine.connect() as conn:
                result = conn.execute(text("SELECT 1"))
                result.fetchone()

        except Exception as e:
            self.send_alert(
                subject="🚨 Database Down",
                message=f"Database connection failed: {str(e)}"
            )
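
A sketch of running these checks on a schedule with a plain loop (the SMTP settings, addresses, and 5-minute interval are placeholders; a cron job or Celery beat task would work just as well):

# Run the backup and database health checks every 5 minutes.
import time

from app.monitoring.alerts import AlertManager

alert_manager = AlertManager(
    smtp_host="localhost",
    smtp_port=25,
    email_from="alerts@example.com",
    email_to=["ops@example.com"],
)

while True:
    alert_manager.check_backup_status()
    alert_manager.check_database_health()
    time.sleep(300)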

Troubleshooting

1. Common Problems and Solutions

Problem: the pgvector extension is missing

# Check
docker exec -it postgres psql -U appuser -d appdb -c "SELECT * FROM pg_extension WHERE extname = 'vector';"

# Install
docker exec -it postgres psql -U appuser -d appdb -c "CREATE EXTENSION IF NOT EXISTS vector;"

Problem: out of memory

# docker-compose.yaml
services:
  postgres:
    deploy:
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G

Problem: connection timeouts

# app/db/database.py
engine = create_engine(
    DATABASE_URL,
    pool_size=20,          # persistent connections kept in the pool
    max_overflow=10,       # extra connections allowed under load
    pool_timeout=30,       # seconds to wait for a free connection
    pool_recycle=3600,     # recycle connections older than 1 hour
    pool_pre_ping=True     # drop stale connections before using them
)

2. Log Analysis

# app/monitoring/log_analyzer.py
import json
import re
from collections import Counter
from datetime import datetime, timedelta

class LogAnalyzer:
    """ログ分析"""

    def analyze_errors(self, log_file: str, hours: int = 24):
        """エラー分析"""
        errors = []
        cutoff = datetime.utcnow() - timedelta(hours=hours)

        with open(log_file, 'r') as f:
            for line in f:
                try:
                    log = json.loads(line)

                    if log.get("level") == "ERROR":
                        log_time = datetime.fromisoformat(log["timestamp"])

                        if log_time > cutoff:
                            errors.append(log)

                except json.JSONDecodeError:
                    continue

        # Aggregate errors
        error_counts = Counter([e.get("message", "Unknown") for e in errors])

        return {
            "total_errors": len(errors),
            "error_types": dict(error_counts.most_common(10)),
            "recent_errors": errors[-10:]
        }
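
A short usage sketch (the log path is a placeholder and assumes the application writes JSON-formatted log lines):

# Summarize errors from the last 24 hours of the application log.
from app.monitoring.log_analyzer import LogAnalyzer

analyzer = LogAnalyzer()
summary = analyzer.analyze_errors("/var/log/app/app.log", hours=24)
print(f"{summary['total_errors']} errors in the last 24 hours")
for message, count in summary["error_types"].items():
    print(f"{count:5d}  {message}")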

Deployment

1. CI/CD Configuration

# .github/workflows/deploy.yml
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Login to Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          tags: ghcr.io/${{ github.repository }}:latest

      - name: Deploy to VPS
        uses: appleboy/ssh-action@master
        with:
          host: ${{ secrets.VPS_HOST }}
          username: ${{ secrets.VPS_USER }}
          key: ${{ secrets.VPS_SSH_KEY }}
          script: |
            cd /opt/app
            docker-compose pull
            docker-compose up -d
            docker system prune -f

2. Zero-Downtime Deployment

#!/bin/bash
# scripts/rolling_update.sh

set -e

echo "Starting rolling update..."

# Pull the new image
docker-compose pull api

# Start a second container on the new image
docker-compose up -d --scale api=2 --no-recreate

# Give the new container time to start (replace with a real health check against your API if one is available)
sleep 10

# Stop the old container (docker ps lists newest containers first, so the old one is last)
OLD_CONTAINER=$(docker ps -q -f name=api -f status=running | tail -n 1)
docker stop $OLD_CONTAINER

# Clean up
docker system prune -f

echo "Rolling update completed!"

Summary

Implementing the operational best practices described in this document will help you build and run a secure, reliable AI system.

Checklist

  • Automated backups configured
  • Backup uploads to S3 verified
  • Security headers implemented
  • JWT authentication implemented
  • Rate limiting configured
  • Database indexes optimized
  • Caching strategy implemented
  • Grafana monitoring dashboard set up
  • Alerts configured
  • CI/CD pipeline in place
  • Documentation maintained

Review this checklist regularly to keep the system healthy.