コンテンツにスキップ

ログ・監視設定完全ガイド

概要

本プロジェクトは、構造化ログ可観測性(Observability)を重視し、Loki + Grafanaによる集中ログ管理を実現します。

アーキテクチャ

graph LR
    subgraph Application
        API[FastAPI<br/>JSON Logger]
        MIDDLEWARE[Logging Middleware<br/>Request ID付与]
    end

    subgraph Log Collection
        FILE[/var/log/app/<br/>fastapi.log]
        PROMTAIL[Promtail<br/>Log Collector]
    end

    subgraph Log Storage
        LOKI[Loki<br/>Log Aggregation]
    end

    subgraph Visualization
        GRAFANA[Grafana<br/>Dashboard]
    end

    API --> MIDDLEWARE
    MIDDLEWARE --> FILE
    FILE --> PROMTAIL
    PROMTAIL --> LOKI
    LOKI --> GRAFANA

    style API fill:#87CEEB
    style LOKI fill:#FFB6C1
    style GRAFANA fill:#90EE90

ログ設計の原則

構造化ログの必須フィールド

フィールド 説明
timestamp ログ発生時刻(ISO8601) 2025-01-15T10:30:45.123Z
level ログレベル INFO, WARNING, ERROR
request_id リクエスト追跡ID(UUID) 7b3f5c2a-1234-5678-9abc-def012345678
event イベント種別 request, response, error
service サービス名 fastapi_app
user_id ユーザーID(認証済みの場合) 12345
method HTTPメソッド GET, POST
url リクエストURL /api/users/1
status_code HTTPステータス 200, 404, 500
duration_ms 処理時間(ミリ秒) 123.45
error エラー情報 ValueError: User not found

FastAPI ログミドルウェア実装

app/core/logging.py

# app/core/logging.py
import logging
import json
import time
import uuid
from typing import Callable
from datetime import datetime

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.datastructures import Headers
from pythonjsonlogger import jsonlogger

# ============================================================
# ログ設定
# ============================================================

LOG_FILE = "logs/fastapi.log"

# JSONフォーマッタ設定
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """カスタムJSONフォーマッタ"""

    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)

        # タイムスタンプをISO8601形式に
        if not log_record.get('timestamp'):
            log_record['timestamp'] = datetime.utcnow().isoformat() + 'Z'

        # ログレベル
        if log_record.get('level'):
            log_record['level'] = log_record['level'].upper()
        else:
            log_record['level'] = record.levelname

        # サービス名
        log_record['service'] = 'fastapi_app'


# ロガー設定
logger = logging.getLogger("fastapi_json")
logger.setLevel(logging.INFO)

# ファイルハンドラ
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setLevel(logging.INFO)

# JSONフォーマッタ適用
formatter = CustomJsonFormatter(
    '%(timestamp)s %(level)s %(name)s %(message)s'
)
file_handler.setFormatter(formatter)

# コンソールハンドラ(開発用)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)

# ハンドラ追加
logger.addHandler(file_handler)
logger.addHandler(console_handler)


# ============================================================
# リクエストID管理
# ============================================================

class RequestContextFilter(logging.Filter):
    """リクエストコンテキストをログに追加"""

    def filter(self, record):
        # request_idがない場合はデフォルト値
        if not hasattr(record, "request_id"):
            record.request_id = "-"
        if not hasattr(record, "user_id"):
            record.user_id = "-"
        return True


logger.addFilter(RequestContextFilter())


# ============================================================
# ログミドルウェア
# ============================================================

class LoggingMiddleware(BaseHTTPMiddleware):
    """
    FastAPIログミドルウェア

    機能:
    - リクエストID自動付与
    - リクエスト/レスポンスの完全ログ記録
    - 処理時間計測
    - ユーザーID追跡(認証済みの場合)
    """

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        # ============================================================
        # リクエスト処理開始
        # ============================================================
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        # 開始時刻
        start_time = time.time()

        # ユーザーID取得(認証済みの場合)
        user_id = getattr(request.state, "user_id", None)

        # リクエストボディ取得
        body = await self._get_request_body(request)

        # リクエストログ
        logger.info(
            "Request received",
            extra={
                "request_id": request_id,
                "user_id": user_id,
                "event": "request",
                "method": request.method,
                "url": str(request.url),
                "path": request.url.path,
                "query_params": dict(request.query_params),
                "headers": self._sanitize_headers(request.headers),
                "client_ip": request.client.host if request.client else None,
                "body": body
            }
        )

        # ============================================================
        # アプリケーション処理
        # ============================================================
        try:
            response = await call_next(request)
        except Exception as exc:
            # エラーログ
            duration_ms = (time.time() - start_time) * 1000
            logger.error(
                f"Request failed: {str(exc)}",
                extra={
                    "request_id": request_id,
                    "user_id": user_id,
                    "event": "error",
                    "method": request.method,
                    "url": str(request.url),
                    "duration_ms": round(duration_ms, 2),
                    "error_type": type(exc).__name__,
                    "error_message": str(exc)
                },
                exc_info=True
            )
            raise

        # ============================================================
        # レスポンス処理
        # ============================================================
        duration_ms = (time.time() - start_time) * 1000

        # レスポンスボディ取得
        response_body = await self._get_response_body(response)

        # レスポンスログ
        logger.info(
            "Request completed",
            extra={
                "request_id": request_id,
                "user_id": user_id,
                "event": "response",
                "method": request.method,
                "url": str(request.url),
                "status_code": response.status_code,
                "duration_ms": round(duration_ms, 2),
                "response_body": response_body
            }
        )

        # カスタムヘッダーにリクエストID追加
        response.headers["X-Request-ID"] = request_id

        return response

    # ============================================================
    # ヘルパーメソッド
    # ============================================================

    async def _get_request_body(self, request: Request) -> str:
        """リクエストボディを取得"""
        try:
            body = await request.body()
            if body:
                # JSON解析を試みる
                try:
                    return json.loads(body.decode("utf-8"))
                except json.JSONDecodeError:
                    return body.decode("utf-8", errors="ignore")
            return ""
        except Exception:
            return "<unreadable>"

    async def _get_response_body(self, response: Response) -> str:
        """レスポンスボディを取得"""
        try:
            response_body = b""
            async for chunk in response.body_iterator:
                response_body += chunk

            # ボディを再設定(消費したため)
            response.body_iterator = self._iterate_in_chunks(response_body)

            # JSON解析を試みる
            try:
                return json.loads(response_body.decode("utf-8"))
            except json.JSONDecodeError:
                return response_body.decode("utf-8", errors="ignore")
        except Exception:
            return "<unreadable>"

    @staticmethod
    async def _iterate_in_chunks(body: bytes):
        """ボディをチャンクで返すジェネレータ"""
        yield body

    def _sanitize_headers(self, headers: Headers) -> dict:
        """
        ヘッダーをサニタイズ(機密情報を除外)

        除外するヘッダー:
        - Authorization
        - Cookie
        - X-API-Key
        """
        sanitized = dict(headers)
        sensitive_headers = ["authorization", "cookie", "x-api-key"]

        for header in sensitive_headers:
            if header in sanitized:
                sanitized[header] = "***REDACTED***"

        return sanitized


# ============================================================
# アプリケーション固有のロガー
# ============================================================

def get_logger(name: str) -> logging.Logger:
    """
    アプリケーション固有のロガーを取得

    Usage:
        logger = get_logger(__name__)
        logger.info("User created", extra={"user_id": 123})
    """
    app_logger = logging.getLogger(name)
    app_logger.setLevel(logging.INFO)
    app_logger.addHandler(file_handler)
    app_logger.addHandler(console_handler)
    app_logger.addFilter(RequestContextFilter())
    return app_logger

FastAPIアプリケーションへの組み込み

# app/main.py
from fastapi import FastAPI
from app.core.logging import LoggingMiddleware, get_logger
from app.interfaces.routers import user_router

# ロガー取得
logger = get_logger(__name__)

# FastAPIアプリケーション
app = FastAPI(
    title="DDD FastAPI Template",
    version="1.0.0"
)

# ログミドルウェア追加
app.add_middleware(LoggingMiddleware)

# ルーター登録
app.include_router(user_router.router)


@app.get("/health")
async def health_check():
    """ヘルスチェック"""
    logger.info("Health check requested")
    return {"status": "ok"}


@app.on_event("startup")
async def startup_event():
    """起動時処理"""
    logger.info("Application starting up")


@app.on_event("shutdown")
async def shutdown_event():
    """シャットダウン時処理"""
    logger.info("Application shutting down")

ビジネスロジック内でのログ記録

# app/application/user_summary_usecase.py
from app.core.logging import get_logger

logger = get_logger(__name__)


class UserSummaryUseCase:
    """ユーザーサマリーユースケース"""

    def execute(self, user_id: int) -> dict:
        """
        ユーザーサマリー取得

        Args:
            user_id: ユーザーID

        Returns:
            ユーザーサマリー情報
        """
        logger.info(
            "Fetching user summary",
            extra={"user_id": user_id, "event": "usecase_start"}
        )

        try:
            # ビジネスロジック実行
            profile = self.corporate_repo.find_by_id(user_id)

            if not profile:
                logger.warning(
                    f"User not found: {user_id}",
                    extra={"user_id": user_id, "event": "user_not_found"}
                )
                raise ValueError(f"User {user_id} not found")

            auth_info = self.auth_repo.find_by_id(user_id)

            result = {
                "user_id": profile.id,
                "name": profile.name,
                "email": auth_info.email if auth_info else profile.email,
                "is_premium": profile.is_premium_user()
            }

            logger.info(
                "User summary fetched successfully",
                extra={
                    "user_id": user_id,
                    "event": "usecase_success",
                    "is_premium": result["is_premium"]
                }
            )

            return result

        except Exception as e:
            logger.error(
                f"Error fetching user summary: {str(e)}",
                extra={
                    "user_id": user_id,
                    "event": "usecase_error",
                    "error_type": type(e).__name__
                },
                exc_info=True
            )
            raise

Loki + Grafana 設定

Loki設定

# observability/loki/local-config.yaml
auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096

common:
  instance_addr: 127.0.0.1
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        max_size_mb: 100

schema_config:
  configs:
    - from: 2024-01-01
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h

ruler:
  alertmanager_url: http://localhost:9093

# 保持期間設定(7日間)
limits_config:
  retention_period: 168h

# コンパクション設定
compactor:
  working_directory: /loki/compactor
  compaction_interval: 10m
  retention_enabled: true
  retention_delete_delay: 2h
  retention_delete_worker_count: 150

Promtail設定

# observability/promtail/config.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  # FastAPIログ収集
  - job_name: fastapi
    static_configs:
      - targets:
          - localhost
        labels:
          job: fastapi
          service: fastapi_app
          environment: production
          __path__: /var/log/app/*.log

    # JSON解析パイプライン
    pipeline_stages:
      # JSONパース
      - json:
          expressions:
            timestamp: timestamp
            level: level
            service: service
            request_id: request_id
            user_id: user_id
            event: event
            method: method
            url: url
            status_code: status_code
            duration_ms: duration_ms
            error: error

      # タイムスタンプ抽出
      - timestamp:
          source: timestamp
          format: RFC3339

      # ラベル追加
      - labels:
          level:
          service:
          event:
          request_id:

      # 出力フィルター(ERRORのみ別ストリーム)
      - match:
          selector: '{level="ERROR"}'
          stages:
            - labels:
                severity: error

Grafana ダッシュボード設定

{
  "dashboard": {
    "title": "FastAPI Logs Dashboard",
    "panels": [
      {
        "title": "リクエスト数(時系列)",
        "targets": [
          {
            "expr": "sum(count_over_time({job=\"fastapi\", event=\"request\"}[5m]))",
            "legendFormat": "Requests"
          }
        ],
        "type": "graph"
      },
      {
        "title": "エラー率",
        "targets": [
          {
            "expr": "sum(count_over_time({job=\"fastapi\", level=\"ERROR\"}[5m])) / sum(count_over_time({job=\"fastapi\"}[5m]))",
            "legendFormat": "Error Rate"
          }
        ],
        "type": "graph"
      },
      {
        "title": "平均レスポンスタイム",
        "targets": [
          {
            "expr": "avg_over_time({job=\"fastapi\", event=\"response\"} | json | duration_ms != \"\" | unwrap duration_ms [5m])",
            "legendFormat": "Avg Response Time (ms)"
          }
        ],
        "type": "graph"
      },
      {
        "title": "ステータスコード分布",
        "targets": [
          {
            "expr": "sum by (status_code) (count_over_time({job=\"fastapi\", event=\"response\"} | json | status_code != \"\" [5m]))",
            "legendFormat": "{{status_code}}"
          }
        ],
        "type": "pie"
      },
      {
        "title": "最近のエラーログ",
        "targets": [
          {
            "expr": "{job=\"fastapi\", level=\"ERROR\"}"
          }
        ],
        "type": "logs"
      }
    ]
  }
}

ログクエリ例(LogQL)

基本クエリ

# 全ログ取得
{job="fastapi"}

# エラーログのみ
{job="fastapi", level="ERROR"}

# 特定リクエストID
{job="fastapi"} | json | request_id="7b3f5c2a-1234-5678-9abc-def012345678"

# 特定ユーザーのログ
{job="fastapi"} | json | user_id="12345"

# HTTPメソッドフィルター
{job="fastapi", event="request"} | json | method="POST"

集計クエリ

# 5分間のリクエスト数
sum(count_over_time({job="fastapi", event="request"}[5m]))

# エンドポイント別リクエスト数
sum by (url) (count_over_time({job="fastapi", event="request"}[1h]))

# 平均レスポンスタイム
avg_over_time({job="fastapi", event="response"} | json | unwrap duration_ms [5m])

# P95レスポンスタイム
quantile_over_time(0.95, {job="fastapi", event="response"} | json | unwrap duration_ms [5m])

# エラー率
sum(count_over_time({job="fastapi", level="ERROR"}[5m]))
  /
sum(count_over_time({job="fastapi"}[5m]))

高度なクエリ

# 遅いリクエスト(500ms以上)
{job="fastapi", event="response"}
  | json
  | duration_ms > 500

# 特定エンドポイントのエラー
{job="fastapi", level="ERROR"}
  | json
  | url =~ "/api/users/.*"

# ユーザー別エラー数
sum by (user_id) (
  count_over_time({job="fastapi", level="ERROR"} | json | user_id != "" [1h])
)

アラート設定

Loki Ruler設定

# observability/loki/rules/alerts.yml
groups:
  - name: fastapi_alerts
    interval: 1m
    rules:
      # エラー率が5%超過
      - alert: HighErrorRate
        expr: |
          sum(rate({job="fastapi", level="ERROR"}[5m]))
            /
          sum(rate({job="fastapi"}[5m])) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      # レスポンスタイムが1秒超過
      - alert: SlowResponse
        expr: |
          avg_over_time({job="fastapi", event="response"}
            | json
            | unwrap duration_ms [5m]) > 1000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Slow response time detected"
          description: "Average response time is {{ $value }}ms"

      # リクエスト数急増
      - alert: HighRequestRate
        expr: |
          sum(rate({job="fastapi", event="request"}[5m])) > 1000
        for: 5m
        labels:
          severity: info
        annotations:
          summary: "High request rate detected"
          description: "Request rate is {{ $value }} req/s"

ログ出力例

リクエストログ

{
  "timestamp": "2025-01-15T10:30:45.123Z",
  "level": "INFO",
  "service": "fastapi_app",
  "request_id": "7b3f5c2a-1234-5678-9abc-def012345678",
  "user_id": "12345",
  "event": "request",
  "method": "POST",
  "url": "http://localhost:8000/api/users",
  "path": "/api/users",
  "query_params": {},
  "headers": {
    "content-type": "application/json",
    "user-agent": "Mozilla/5.0",
    "authorization": "***REDACTED***"
  },
  "client_ip": "172.20.0.1",
  "body": {
    "name": "田中太郎",
    "email": "tanaka@example.com"
  }
}

レスポンスログ

{
  "timestamp": "2025-01-15T10:30:45.456Z",
  "level": "INFO",
  "service": "fastapi_app",
  "request_id": "7b3f5c2a-1234-5678-9abc-def012345678",
  "user_id": "12345",
  "event": "response",
  "method": "POST",
  "url": "http://localhost:8000/api/users",
  "status_code": 201,
  "duration_ms": 123.45,
  "response_body": {
    "id": 1,
    "name": "田中太郎",
    "email": "tanaka@example.com"
  }
}

エラーログ

{
  "timestamp": "2025-01-15T10:35:12.789Z",
  "level": "ERROR",
  "service": "fastapi_app",
  "request_id": "8c4a6d3b-5678-9012-3456-789abcdef012",
  "user_id": "12345",
  "event": "error",
  "method": "GET",
  "url": "http://localhost:8000/api/users/999",
  "duration_ms": 45.67,
  "error_type": "ValueError",
  "error_message": "User 999 not found",
  "traceback": "Traceback (most recent call last):\n  File ..."
}

運用ベストプラクティス

ログレベルの使い分け

レベル 用途
DEBUG 開発時の詳細情報 変数の値、関数呼び出し
INFO 通常の運用情報 リクエスト/レスポンス、正常処理完了
WARNING 警告(処理は継続) リトライ、非推奨機能の使用
ERROR エラー(処理失敗) バリデーションエラー、DB接続失敗
CRITICAL 致命的エラー システムダウン、データ破損

ログ保持期間

環境 保持期間 理由
開発環境 3日 ディスク節約
ステージング 7日 テスト結果確認
本番環境 30日 トラブルシューティング、監査
アーカイブ 1年以上 S3/GCSに長期保存

パフォーマンス最適化

# 開発環境: 詳細ログ
if settings.ENV == "development":
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

# ログサンプリング(高トラフィック時)
import random

if random.random() < 0.1:  # 10%のみログ記録
    logger.info("Sampled request", extra={"sampled": True})

トラブルシューティング

ログが表示されない

# ログファイル確認
docker exec -it api ls -lh /app/logs/

# Promtailログ確認
docker logs promtail

# Lokiログ確認
docker logs loki

# Lokiエンドポイント確認
curl http://localhost:3100/ready

ログクエリが遅い

# インデックス確認
{job="fastapi"} | json | __error__=""

# ラベル最適化(フィルタリング前にラベルで絞る)
{job="fastapi", level="ERROR"} | json

まとめ

項目 内容
ログ形式 構造化JSON
リクエスト追跡 UUID(request_id)
収集 Promtail → Loki
可視化 Grafana Dashboard
保持期間 7日(本番30日)
アラート Loki Ruler

次のステップ: Docker環境設定 | テスト・CI/CD