# Operations and Best Practices

## Overview

This document is a comprehensive guide to operating the AI system, covering backup strategy, security measures, and troubleshooting.
## Backup Strategy

### 1. Automated PostgreSQL Backups

#### Configuration
```yaml
# docker-compose.yaml
services:
  db-backup:
    image: postgres:16
    container_name: db_backup
    depends_on:
      - postgres
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      S3_BUCKET: ${S3_BUCKET_NAME}
    volumes:
      - ./backups:/backups
      - ./scripts/backup.sh:/backup.sh
    # Note: cron does not inherit the container environment, so the entrypoint
    # persists it to /etc/environment before starting the daemon.
    entrypoint: >
      bash -c "
      apt-get update && apt-get install -y awscli cron &&
      chmod +x /backup.sh &&
      printenv | grep -E 'POSTGRES|AWS|S3' > /etc/environment &&
      echo '0 3 * * * /backup.sh >> /var/log/backup.log 2>&1' | crontab - &&
      cron -f
      "
    networks:
      - backend
```
#### Backup Script

```bash
#!/bin/bash
# scripts/backup.sh
set -e

# Configuration
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups"
FILENAME="postgres_backup_${TIMESTAMP}.dump"
S3_PATH="s3://${S3_BUCKET}/postgres-backups/"

echo "[$(date)] Starting backup..."

# Dump the database in custom format
PGPASSWORD=${POSTGRES_PASSWORD} pg_dump \
  -h postgres \
  -U "${POSTGRES_USER}" \
  -Fc \
  "${POSTGRES_DB}" \
  > "${BACKUP_DIR}/${FILENAME}"

# Compress
gzip "${BACKUP_DIR}/${FILENAME}"

# Upload to S3
aws s3 cp "${BACKUP_DIR}/${FILENAME}.gz" "${S3_PATH}"

# Delete local backups older than 7 days
find "${BACKUP_DIR}" -type f -mtime +7 -delete

# Delete S3 backups older than 30 days
aws s3 ls "${S3_PATH}" | while read -r line; do
  createDate=$(echo "$line" | awk '{print $1" "$2}')
  createDate=$(date -d "$createDate" +%s)
  olderThan=$(date -d "30 days ago" +%s)
  if [[ $createDate -lt $olderThan ]]; then
    fileName=$(echo "$line" | awk '{print $4}')
    if [[ $fileName != "" ]]; then
      aws s3 rm "${S3_PATH}${fileName}"
    fi
  fi
done

echo "[$(date)] Backup completed: ${FILENAME}.gz"
```
#### Restore Procedure

```bash
# Download the backup file from S3
aws s3 cp s3://your-bucket/postgres-backups/postgres_backup_20250110_030000.dump.gz .

# Decompress
gunzip postgres_backup_20250110_030000.dump.gz

# Restore
docker exec -i postgres psql -U appuser -d postgres -c "DROP DATABASE IF EXISTS appdb;"
docker exec -i postgres psql -U appuser -d postgres -c "CREATE DATABASE appdb;"
docker exec -i postgres pg_restore -U appuser -d appdb < postgres_backup_20250110_030000.dump
```
### 2. Neo4j Backups

```bash
#!/bin/bash
# scripts/neo4j_backup.sh
set -e

TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="neo4j_backup_${TIMESTAMP}.dump"

# Dump the database (Neo4j 5.x: --to-path takes a directory and writes <database>.dump;
# the database must not be in use while it is dumped)
docker exec neo4j neo4j-admin database dump neo4j --to-path=/backups
docker exec neo4j mv /backups/neo4j.dump /backups/${BACKUP_FILE}

# Upload to S3
docker run --rm \
  -v $(pwd)/backups:/backups \
  -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
  -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
  amazon/aws-cli \
  s3 cp /backups/${BACKUP_FILE} s3://${S3_BUCKET}/neo4j-backups/

echo "Neo4j backup completed: ${BACKUP_FILE}"
```
### 3. Backup Monitoring

```python
# app/monitoring/backup_checker.py
import boto3
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)


class BackupMonitor:
    """Monitors backup freshness in S3."""

    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name

    def check_recent_backup(self, prefix: str, hours: int = 24) -> dict:
        """Check that a backup newer than `hours` exists under the prefix."""
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=prefix
            )
            if 'Contents' not in response:
                return {
                    "status": "error",
                    "message": "No backups found"
                }

            # Most recent backup object
            latest = max(response['Contents'], key=lambda x: x['LastModified'])
            age = datetime.now(latest['LastModified'].tzinfo) - latest['LastModified']

            if age > timedelta(hours=hours):
                return {
                    "status": "warning",
                    "message": f"Latest backup is {age.total_seconds() / 3600:.1f} hours old",
                    "last_backup": latest['LastModified'].isoformat()
                }

            return {
                "status": "ok",
                "message": "Recent backup found",
                "last_backup": latest['LastModified'].isoformat(),
                "age_hours": age.total_seconds() / 3600
            }
        except Exception as e:
            logger.error(f"Backup check failed: {e}")
            return {
                "status": "error",
                "message": str(e)
            }
```
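A small runner script makes it easy to call this check from cron or a CI job. The sketch below is illustrative; the bucket name, prefixes, and script path are placeholders.

```python
# scripts/check_backups.py — example runner (bucket name and prefixes are placeholders)
from app.monitoring.backup_checker import BackupMonitor


def main():
    monitor = BackupMonitor(bucket_name="your-bucket")
    for prefix in ("postgres-backups/", "neo4j-backups/"):
        result = monitor.check_recent_backup(prefix, hours=24)
        print(f"{prefix}: {result['status']} - {result['message']}")


if __name__ == "__main__":
    main()
```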
## Security Measures

### 1. Database Security

#### Hardening the PostgreSQL Configuration

```yaml
# docker-compose.yaml
services:
  postgres:
    image: ankane/pgvector:pg16
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_HOST_AUTH_METHOD: scram-sha-256
      POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256"
    volumes:
      - ./postgres/postgresql.conf:/etc/postgresql/postgresql.conf
    command: postgres -c config_file=/etc/postgresql/postgresql.conf
    networks:
      - backend  # not exposed outside the internal network
    # No published ports (access only via an SSH tunnel)
```
#### postgresql.conf

```conf
# postgres/postgresql.conf

# Connection settings
listen_addresses = '*'
max_connections = 100

# Security
ssl = on
ssl_cert_file = '/etc/ssl/certs/server.crt'
ssl_key_file = '/etc/ssl/private/server.key'
password_encryption = scram-sha-256

# Logging
logging_collector = on
log_destination = 'stderr'
log_directory = '/var/log/postgresql'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_statement = 'all'        # verbose; consider 'ddl' or 'mod' in production
log_duration = on
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '

# Performance
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
work_mem = 16MB
```
### 2. API Authentication and JWT

#### JWT Configuration

```python
# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
import os

# Settings
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed access token."""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access"
    })
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(data: dict):
    """Create a signed refresh token."""
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "refresh"
    })
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str) -> Optional[dict]:
    """Decode and validate a token; return the payload, or None if invalid."""
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
```
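To protect routes with these helpers, one option is a FastAPI dependency along the following lines. This is a sketch: the module path, the `tokenUrl`, and the claim names are assumptions, not part of the original code.

```python
# app/core/auth_dependency.py — sketch of a route-protection dependency
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

from app.core.security import verify_token

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")  # assumed login endpoint


async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    """Validate the bearer token and return its payload."""
    payload = verify_token(token)
    if payload is None or payload.get("type") != "access":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return payload

# Usage in a router:
# @router.get("/me")
# async def read_me(user: dict = Depends(get_current_user)):
#     return {"user": user.get("sub")}
```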
### 3. Rate Limiting

```python
# app/core/rate_limiter.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio


class RateLimiter(BaseHTTPMiddleware):
    """Simple in-memory, per-IP rate-limiting middleware."""

    def __init__(self, app, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)
        # Started lazily in dispatch(): no event loop is running yet at construction time
        self.cleanup_task = None

    async def dispatch(self, request: Request, call_next):
        if self.cleanup_task is None:
            self.cleanup_task = asyncio.create_task(self._cleanup_old_requests())

        client_ip = request.client.host
        now = datetime.utcnow()

        # Keep only the requests from the last minute
        minute_ago = now - timedelta(minutes=1)
        self.requests[client_ip] = [
            req_time for req_time in self.requests[client_ip]
            if req_time > minute_ago
        ]

        # Enforce the limit; return the response directly, because exceptions
        # raised inside BaseHTTPMiddleware bypass FastAPI's exception handlers
        if len(self.requests[client_ip]) >= self.requests_per_minute:
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests. Please try again later."}
            )

        # Record this request
        self.requests[client_ip].append(now)
        return await call_next(request)

    async def _cleanup_old_requests(self):
        """Periodically drop request records older than one hour."""
        while True:
            await asyncio.sleep(60)  # every minute
            hour_ago = datetime.utcnow() - timedelta(hours=1)
            for ip in list(self.requests.keys()):
                self.requests[ip] = [
                    req_time for req_time in self.requests[ip]
                    if req_time > hour_ago
                ]
                if not self.requests[ip]:
                    del self.requests[ip]
```
### 4. Security Headers

```python
# app/core/security_headers.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Adds standard security headers to every response."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
        return response
```
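Both middlewares have to be registered on the application object. A minimal wiring sketch (the `app/main.py` module is an assumption):

```python
# app/main.py — sketch: registering both middlewares
from fastapi import FastAPI

from app.core.rate_limiter import RateLimiter
from app.core.security_headers import SecurityHeadersMiddleware

app = FastAPI()
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(RateLimiter, requests_per_minute=60)
```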
## Performance Optimization

### 1. Database Optimization

#### Index Strategy

```sql
-- Vector search index (IVFFlat)
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- HNSW (faster queries, but higher memory use)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Supporting indexes
CREATE INDEX documents_metadata_idx ON documents USING gin(metadata);
CREATE INDEX documents_created_at_idx ON documents(created_at DESC);

-- Partial index
CREATE INDEX active_documents_idx ON documents(id) WHERE metadata->>'active' = 'true';
```
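To confirm that queries actually hit these indexes, and to trade recall against speed for IVFFlat, you can inspect the plan and adjust `ivfflat.probes`. A rough sketch using the project's SQLAlchemy engine (the probe count and query are illustrative):

```python
# Sketch: checking that the vector index is used and tuning IVFFlat recall
from sqlalchemy import text

from app.db.database import engine

with engine.connect() as conn:
    # More probes = better recall, slower queries (IVFFlat only)
    conn.execute(text("SET ivfflat.probes = 10"))
    plan = conn.execute(text(
        "EXPLAIN ANALYZE "
        "SELECT id FROM documents "
        "ORDER BY embedding <=> (SELECT embedding FROM documents LIMIT 1) "
        "LIMIT 10"
    ))
    for row in plan:
        print(row[0])
```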
#### Query Optimization

```python
# app/services/optimized_search.py
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.models.document import Document


class OptimizedSearchService:
    """Search service with pagination and batched embedding."""

    def __init__(self, db: Session):
        self.db = db

    def search_with_pagination(
        self,
        embedding: list,
        page: int = 1,
        page_size: int = 20,
        threshold: float = 0.7
    ):
        """Vector similarity search with pagination."""
        # Compute the distance in a subquery
        distance_subquery = select(
            Document.id,
            Document.content,
            Document.metadata,
            Document.created_at,
            (1 - func.cosine_distance(Document.embedding, embedding)).label("similarity")
        ).where(
            func.cosine_distance(Document.embedding, embedding) < (1 - threshold)
        ).subquery()

        # Pagination
        offset = (page - 1) * page_size
        query = select(distance_subquery).order_by(
            distance_subquery.c.similarity.desc()
        ).limit(page_size).offset(offset)

        results = self.db.execute(query).all()
        return {
            "results": results,
            "page": page,
            "page_size": page_size
        }

    def batch_embed(self, documents: list, batch_size: int = 100):
        """Embed documents in batches and persist them."""
        from openai import OpenAI
        client = OpenAI()

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # Generate embeddings for the whole batch in one API call
            response = client.embeddings.create(
                model="text-embedding-3-small",
                input=[doc["content"] for doc in batch]
            )

            # Persist to the database
            for j, embedding_data in enumerate(response.data):
                doc = Document(
                    content=batch[j]["content"],
                    embedding=embedding_data.embedding,
                    metadata=batch[j].get("metadata", {})
                )
                self.db.add(doc)
            self.db.commit()
```
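A possible call site, assuming the query string is embedded with the same model used for indexing (session handling and the query text are illustrative):

```python
# Sketch: calling the search service
from openai import OpenAI

from app.db.database import SessionLocal
from app.services.optimized_search import OptimizedSearchService

client = OpenAI()
db = SessionLocal()

query_embedding = client.embeddings.create(
    model="text-embedding-3-small",
    input=["How do I restore a backup?"],
).data[0].embedding

service = OptimizedSearchService(db)
page1 = service.search_with_pagination(query_embedding, page=1, page_size=20)
print(f"page 1: {len(page1['results'])} hits")
```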
### 2. Caching Strategy

```python
# app/core/cache.py
import hashlib
import json
from typing import Any, Optional

import redis


class CacheManager:
    """Thin JSON cache on top of Redis."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    def get(self, key: str) -> Optional[Any]:
        """Fetch and deserialize a cached value."""
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Serialize and store a value with a TTL (seconds)."""
        self.redis.setex(
            key,
            ttl,
            json.dumps(value, ensure_ascii=False)
        )

    def delete(self, key: str):
        """Remove a cached value."""
        self.redis.delete(key)

    def generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Build a deterministic cache key from the call arguments."""
        data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        hash_value = hashlib.md5(data.encode()).hexdigest()
        return f"{prefix}:{hash_value}"


# Usage example
cache = CacheManager()


def cached_search(query: str, k: int = 5):
    """Search with a one-hour cache in front of it."""
    cache_key = cache.generate_key("search", query, k)

    # Cache hit?
    cached_result = cache.get(cache_key)
    if cached_result:
        return cached_result

    # Run the actual search (perform_search is the application's search function)
    result = perform_search(query, k)

    # Cache for one hour
    cache.set(cache_key, result, ttl=3600)
    return result
```
### 3. Asynchronous Processing

```python
# app/tasks/async_tasks.py
from celery import Celery

from app.db.database import SessionLocal
from app.services.embedding_service import EmbeddingService

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)


@celery_app.task
def embed_document_async(content: str, metadata: dict = None):
    """Embed a single document asynchronously."""
    db = SessionLocal()
    try:
        service = EmbeddingService(db)
        doc_id = service.embed_and_save(content, metadata)
        return {"success": True, "document_id": doc_id}
    finally:
        db.close()


@celery_app.task
def bulk_reindex(document_ids: list):
    """Re-index a list of documents in bulk."""
    db = SessionLocal()
    try:
        service = EmbeddingService(db)
        for doc_id in document_ids:
            service.reindex_document(doc_id)
        return {"success": True, "processed": len(document_ids)}
    finally:
        db.close()
```
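Calling code enqueues these tasks with `.delay()` and only blocks on the result where a synchronous answer is really needed; a short sketch:

```python
# Sketch: enqueueing the tasks from application code
from app.tasks.async_tasks import bulk_reindex, embed_document_async

# Fire-and-forget: returns immediately with an AsyncResult handle
result = embed_document_async.delay("Document text to embed", {"source": "upload"})
print(result.id)

# Block (with a timeout) only where a synchronous answer is genuinely needed
print(result.get(timeout=30))  # e.g. {"success": True, "document_id": ...}

bulk_reindex.delay([1, 2, 3])
```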
## Monitoring

### 1. Grafana Dashboard Configuration

The dashboard definition is stored at `grafana/dashboards/ai-system.json`:

```json
{
  "dashboard": {
    "title": "AI System Monitoring",
    "panels": [
      {
        "title": "API Response Time",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Database Connections",
        "targets": [
          {
            "expr": "pg_stat_activity_count"
          }
        ]
      },
      {
        "title": "Embedding Queue Size",
        "targets": [
          {
            "expr": "celery_queue_length{queue=\"embeddings\"}"
          }
        ]
      }
    ]
  }
}
```
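The first panel assumes the API exports an `http_request_duration_seconds` histogram. One way to provide it is a small `prometheus_client` integration like the sketch below (module path, labels, and endpoint name are assumptions); the Celery queue-length metric would similarly need its own exporter.

```python
# app/core/metrics.py — sketch: exporting the request-duration histogram queried above
import time

from fastapi import FastAPI, Request
from prometheus_client import Histogram, make_asgi_app

REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "path"],  # beware of high-cardinality paths in real deployments
)


def add_metrics(app: FastAPI) -> None:
    @app.middleware("http")
    async def observe_requests(request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        REQUEST_DURATION.labels(request.method, request.url.path).observe(
            time.perf_counter() - start
        )
        return response

    # Prometheus scrapes this endpoint
    app.mount("/metrics", make_asgi_app())
```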
### 2. Alert Configuration

```python
# app/monitoring/alerts.py
import logging
import smtplib
from email.mime.text import MIMEText

logger = logging.getLogger(__name__)


class AlertManager:
    """Sends operational alerts by email."""

    def __init__(self, smtp_host: str, smtp_port: int, email_from: str, email_to: list):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.email_from = email_from
        self.email_to = email_to

    def send_alert(self, subject: str, message: str):
        """Send an alert email."""
        try:
            msg = MIMEText(message)
            msg["Subject"] = subject
            msg["From"] = self.email_from
            msg["To"] = ", ".join(self.email_to)

            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.send_message(msg)

            logger.info(f"Alert sent: {subject}")
        except Exception as e:
            logger.error(f"Failed to send alert: {e}")

    def check_backup_status(self):
        """Check backup freshness and alert if stale."""
        from app.monitoring.backup_checker import BackupMonitor

        monitor = BackupMonitor(bucket_name="your-bucket")
        status = monitor.check_recent_backup("postgres-backups/", hours=24)

        if status["status"] in ["warning", "error"]:
            self.send_alert(
                subject="⚠️ Backup Alert",
                message=f"Backup status: {status['message']}"
            )

    def check_database_health(self):
        """Alert if the database is unreachable."""
        from sqlalchemy import text
        from app.db.database import engine

        try:
            with engine.connect() as conn:
                result = conn.execute(text("SELECT 1"))
                result.fetchone()
        except Exception as e:
            self.send_alert(
                subject="🚨 Database Down",
                message=f"Database connection failed: {str(e)}"
            )
```
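These checks still need something to run them periodically. A minimal runner sketch (SMTP settings, addresses, and the interval are placeholders):

```python
# scripts/run_checks.py — sketch: running the checks on a fixed interval
import time

from app.monitoring.alerts import AlertManager


def main():
    alerts = AlertManager(
        smtp_host="localhost",
        smtp_port=25,
        email_from="alerts@example.com",
        email_to=["ops@example.com"],
    )
    while True:
        alerts.check_backup_status()
        alerts.check_database_health()
        time.sleep(15 * 60)  # every 15 minutes


if __name__ == "__main__":
    main()
```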
## Troubleshooting

### 1. Common Problems and Solutions

#### Problem: the pgvector extension is missing

```bash
# Check whether the extension is installed
docker exec -it postgres psql -U appuser -d appdb -c "SELECT * FROM pg_extension WHERE extname = 'vector';"

# Install it
docker exec -it postgres psql -U appuser -d appdb -c "CREATE EXTENSION IF NOT EXISTS vector;"
```
#### Problem: out of memory

```yaml
# docker-compose.yaml
services:
  postgres:
    deploy:
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G
```
#### Problem: connection timeouts

```python
# app/db/database.py
from sqlalchemy import create_engine

engine = create_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True  # discard stale connections before handing them out
)
```
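Connection errors at startup (the database container not being ready yet) are a related symptom; a simple retry loop at application startup is usually enough. A sketch, with arbitrary retry parameters:

```python
# Sketch: retrying the first connection at application startup
import time

from sqlalchemy import text

from app.db.database import engine

for attempt in range(10):
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        break
    except Exception as exc:
        print(f"Database not ready (attempt {attempt + 1}): {exc}")
        time.sleep(3)
else:
    raise RuntimeError("Database never became reachable")
```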
### 2. Log Analysis

```python
# app/monitoring/log_analyzer.py
import json
from collections import Counter
from datetime import datetime, timedelta


class LogAnalyzer:
    """Analyzes structured (JSON-lines) application logs."""

    def analyze_errors(self, log_file: str, hours: int = 24):
        """Summarize ERROR-level entries from the last `hours` hours."""
        errors = []
        cutoff = datetime.utcnow() - timedelta(hours=hours)

        with open(log_file, 'r') as f:
            for line in f:
                try:
                    log = json.loads(line)
                    if log.get("level") == "ERROR":
                        log_time = datetime.fromisoformat(log["timestamp"])
                        if log_time > cutoff:
                            errors.append(log)
                except json.JSONDecodeError:
                    continue

        # Aggregate by message
        error_counts = Counter([e.get("message", "Unknown") for e in errors])
        return {
            "total_errors": len(errors),
            "error_types": dict(error_counts.most_common(10)),
            "recent_errors": errors[-10:]
        }
```
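A possible way to use the analyzer in a daily report (the log path is a placeholder):

```python
# Sketch: printing a daily error summary
from app.monitoring.log_analyzer import LogAnalyzer

analyzer = LogAnalyzer()
report = analyzer.analyze_errors("/var/log/app/app.log", hours=24)

print(f"Errors in the last 24h: {report['total_errors']}")
for message, count in report["error_types"].items():
    print(f"{count:5d}  {message}")
```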
## Deployment

### 1. CI/CD Configuration

```yaml
# .github/workflows/deploy.yml
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Login to Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          tags: ghcr.io/${{ github.repository }}:latest

      - name: Deploy to VPS
        uses: appleboy/ssh-action@master
        with:
          host: ${{ secrets.VPS_HOST }}
          username: ${{ secrets.VPS_USER }}
          key: ${{ secrets.VPS_SSH_KEY }}
          script: |
            cd /opt/app
            docker-compose pull
            docker-compose up -d
            docker system prune -f
```
### 2. Zero-Downtime Deployment

```bash
#!/bin/bash
# scripts/rolling_update.sh
set -e

echo "Starting rolling update..."

# Pull the new image
docker-compose pull api

# Start an additional container with the new image
docker-compose up -d --scale api=2 --no-recreate

# Wait for the new container to come up
sleep 10

# Stop the old container (docker ps lists newest first, so the oldest is last)
OLD_CONTAINER=$(docker ps -q -f name=api -f status=running | tail -n 1)
docker stop $OLD_CONTAINER
docker rm $OLD_CONTAINER

# Clean up
docker system prune -f

echo "Rolling update completed!"
```
## Summary

Implementing the operational best practices described in this document will help you build and run a secure, reliable AI system.

### Checklist

- Automated backups configured
- Backup uploads to S3 verified
- Security headers implemented
- JWT authentication implemented
- Rate limiting configured
- Database indexes optimized
- Caching strategy implemented
- Grafana monitoring dashboard configured
- Alerts configured
- CI/CD pipeline in place
- Documentation maintained

Review this checklist regularly to keep the system healthy.