ログ・監視設定完全ガイド
概要
本プロジェクトは、構造化ログと可観測性(Observability)を重視し、Loki + Grafanaによる集中ログ管理を実現します。
アーキテクチャ
graph LR
subgraph Application
API[FastAPI<br/>JSON Logger]
MIDDLEWARE[Logging Middleware<br/>Request ID付与]
end
subgraph Log Collection
FILE[/var/log/app/<br/>fastapi.log]
PROMTAIL[Promtail<br/>Log Collector]
end
subgraph Log Storage
LOKI[Loki<br/>Log Aggregation]
end
subgraph Visualization
GRAFANA[Grafana<br/>Dashboard]
end
API --> MIDDLEWARE
MIDDLEWARE --> FILE
FILE --> PROMTAIL
PROMTAIL --> LOKI
LOKI --> GRAFANA
style API fill:#87CEEB
style LOKI fill:#FFB6C1
style GRAFANA fill:#90EE90
ログ設計の原則
構造化ログの必須フィールド
| フィールド |
説明 |
例 |
| timestamp |
ログ発生時刻(ISO8601) |
2025-01-15T10:30:45.123Z |
| level |
ログレベル |
INFO, WARNING, ERROR |
| request_id |
リクエスト追跡ID(UUID) |
7b3f5c2a-1234-5678-9abc-def012345678 |
| event |
イベント種別 |
request, response, error |
| service |
サービス名 |
fastapi_app |
| user_id |
ユーザーID(認証済みの場合) |
12345 |
| method |
HTTPメソッド |
GET, POST |
| url |
リクエストURL |
/api/users/1 |
| status_code |
HTTPステータス |
200, 404, 500 |
| duration_ms |
処理時間(ミリ秒) |
123.45 |
| error |
エラー情報 |
ValueError: User not found |
FastAPI ログミドルウェア実装
app/core/logging.py
# app/core/logging.py
import logging
import json
import time
import uuid
from typing import Callable
from datetime import datetime
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.datastructures import Headers
from pythonjsonlogger import jsonlogger
# ============================================================
# ログ設定
# ============================================================
LOG_FILE = "logs/fastapi.log"
# JSONフォーマッタ設定
class CustomJsonFormatter(jsonlogger.JsonFormatter):
"""カスタムJSONフォーマッタ"""
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
# タイムスタンプをISO8601形式に
if not log_record.get('timestamp'):
log_record['timestamp'] = datetime.utcnow().isoformat() + 'Z'
# ログレベル
if log_record.get('level'):
log_record['level'] = log_record['level'].upper()
else:
log_record['level'] = record.levelname
# サービス名
log_record['service'] = 'fastapi_app'
# ロガー設定
logger = logging.getLogger("fastapi_json")
logger.setLevel(logging.INFO)
# ファイルハンドラ
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setLevel(logging.INFO)
# JSONフォーマッタ適用
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(name)s %(message)s'
)
file_handler.setFormatter(formatter)
# コンソールハンドラ(開発用)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)
# ハンドラ追加
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# ============================================================
# リクエストID管理
# ============================================================
class RequestContextFilter(logging.Filter):
"""リクエストコンテキストをログに追加"""
def filter(self, record):
# request_idがない場合はデフォルト値
if not hasattr(record, "request_id"):
record.request_id = "-"
if not hasattr(record, "user_id"):
record.user_id = "-"
return True
logger.addFilter(RequestContextFilter())
# ============================================================
# ログミドルウェア
# ============================================================
class LoggingMiddleware(BaseHTTPMiddleware):
"""
FastAPIログミドルウェア
機能:
- リクエストID自動付与
- リクエスト/レスポンスの完全ログ記録
- 処理時間計測
- ユーザーID追跡(認証済みの場合)
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
# ============================================================
# リクエスト処理開始
# ============================================================
request_id = str(uuid.uuid4())
request.state.request_id = request_id
# 開始時刻
start_time = time.time()
# ユーザーID取得(認証済みの場合)
user_id = getattr(request.state, "user_id", None)
# リクエストボディ取得
body = await self._get_request_body(request)
# リクエストログ
logger.info(
"Request received",
extra={
"request_id": request_id,
"user_id": user_id,
"event": "request",
"method": request.method,
"url": str(request.url),
"path": request.url.path,
"query_params": dict(request.query_params),
"headers": self._sanitize_headers(request.headers),
"client_ip": request.client.host if request.client else None,
"body": body
}
)
# ============================================================
# アプリケーション処理
# ============================================================
try:
response = await call_next(request)
except Exception as exc:
# エラーログ
duration_ms = (time.time() - start_time) * 1000
logger.error(
f"Request failed: {str(exc)}",
extra={
"request_id": request_id,
"user_id": user_id,
"event": "error",
"method": request.method,
"url": str(request.url),
"duration_ms": round(duration_ms, 2),
"error_type": type(exc).__name__,
"error_message": str(exc)
},
exc_info=True
)
raise
# ============================================================
# レスポンス処理
# ============================================================
duration_ms = (time.time() - start_time) * 1000
# レスポンスボディ取得
response_body = await self._get_response_body(response)
# レスポンスログ
logger.info(
"Request completed",
extra={
"request_id": request_id,
"user_id": user_id,
"event": "response",
"method": request.method,
"url": str(request.url),
"status_code": response.status_code,
"duration_ms": round(duration_ms, 2),
"response_body": response_body
}
)
# カスタムヘッダーにリクエストID追加
response.headers["X-Request-ID"] = request_id
return response
# ============================================================
# ヘルパーメソッド
# ============================================================
async def _get_request_body(self, request: Request) -> str:
"""リクエストボディを取得"""
try:
body = await request.body()
if body:
# JSON解析を試みる
try:
return json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
return body.decode("utf-8", errors="ignore")
return ""
except Exception:
return "<unreadable>"
async def _get_response_body(self, response: Response) -> str:
"""レスポンスボディを取得"""
try:
response_body = b""
async for chunk in response.body_iterator:
response_body += chunk
# ボディを再設定(消費したため)
response.body_iterator = self._iterate_in_chunks(response_body)
# JSON解析を試みる
try:
return json.loads(response_body.decode("utf-8"))
except json.JSONDecodeError:
return response_body.decode("utf-8", errors="ignore")
except Exception:
return "<unreadable>"
@staticmethod
async def _iterate_in_chunks(body: bytes):
"""ボディをチャンクで返すジェネレータ"""
yield body
def _sanitize_headers(self, headers: Headers) -> dict:
"""
ヘッダーをサニタイズ(機密情報を除外)
除外するヘッダー:
- Authorization
- Cookie
- X-API-Key
"""
sanitized = dict(headers)
sensitive_headers = ["authorization", "cookie", "x-api-key"]
for header in sensitive_headers:
if header in sanitized:
sanitized[header] = "***REDACTED***"
return sanitized
# ============================================================
# アプリケーション固有のロガー
# ============================================================
def get_logger(name: str) -> logging.Logger:
"""
アプリケーション固有のロガーを取得
Usage:
logger = get_logger(__name__)
logger.info("User created", extra={"user_id": 123})
"""
app_logger = logging.getLogger(name)
app_logger.setLevel(logging.INFO)
app_logger.addHandler(file_handler)
app_logger.addHandler(console_handler)
app_logger.addFilter(RequestContextFilter())
return app_logger
FastAPIアプリケーションへの組み込み
# app/main.py
from fastapi import FastAPI
from app.core.logging import LoggingMiddleware, get_logger
from app.interfaces.routers import user_router
# ロガー取得
logger = get_logger(__name__)
# FastAPIアプリケーション
app = FastAPI(
title="DDD FastAPI Template",
version="1.0.0"
)
# ログミドルウェア追加
app.add_middleware(LoggingMiddleware)
# ルーター登録
app.include_router(user_router.router)
@app.get("/health")
async def health_check():
"""ヘルスチェック"""
logger.info("Health check requested")
return {"status": "ok"}
@app.on_event("startup")
async def startup_event():
"""起動時処理"""
logger.info("Application starting up")
@app.on_event("shutdown")
async def shutdown_event():
"""シャットダウン時処理"""
logger.info("Application shutting down")
ビジネスロジック内でのログ記録
# app/application/user_summary_usecase.py
from app.core.logging import get_logger
logger = get_logger(__name__)
class UserSummaryUseCase:
"""ユーザーサマリーユースケース"""
def execute(self, user_id: int) -> dict:
"""
ユーザーサマリー取得
Args:
user_id: ユーザーID
Returns:
ユーザーサマリー情報
"""
logger.info(
"Fetching user summary",
extra={"user_id": user_id, "event": "usecase_start"}
)
try:
# ビジネスロジック実行
profile = self.corporate_repo.find_by_id(user_id)
if not profile:
logger.warning(
f"User not found: {user_id}",
extra={"user_id": user_id, "event": "user_not_found"}
)
raise ValueError(f"User {user_id} not found")
auth_info = self.auth_repo.find_by_id(user_id)
result = {
"user_id": profile.id,
"name": profile.name,
"email": auth_info.email if auth_info else profile.email,
"is_premium": profile.is_premium_user()
}
logger.info(
"User summary fetched successfully",
extra={
"user_id": user_id,
"event": "usecase_success",
"is_premium": result["is_premium"]
}
)
return result
except Exception as e:
logger.error(
f"Error fetching user summary: {str(e)}",
extra={
"user_id": user_id,
"event": "usecase_error",
"error_type": type(e).__name__
},
exc_info=True
)
raise
Loki + Grafana 設定
Loki設定
# observability/loki/local-config.yaml
auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
common:
instance_addr: 127.0.0.1
path_prefix: /loki
storage:
filesystem:
chunks_directory: /loki/chunks
rules_directory: /loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
max_size_mb: 100
schema_config:
configs:
- from: 2024-01-01
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h
ruler:
alertmanager_url: http://localhost:9093
# 保持期間設定(7日間)
limits_config:
retention_period: 168h
# コンパクション設定
compactor:
working_directory: /loki/compactor
compaction_interval: 10m
retention_enabled: true
retention_delete_delay: 2h
retention_delete_worker_count: 150
Promtail設定
# observability/promtail/config.yml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
# FastAPIログ収集
- job_name: fastapi
static_configs:
- targets:
- localhost
labels:
job: fastapi
service: fastapi_app
environment: production
__path__: /var/log/app/*.log
# JSON解析パイプライン
pipeline_stages:
# JSONパース
- json:
expressions:
timestamp: timestamp
level: level
service: service
request_id: request_id
user_id: user_id
event: event
method: method
url: url
status_code: status_code
duration_ms: duration_ms
error: error
# タイムスタンプ抽出
- timestamp:
source: timestamp
format: RFC3339
# ラベル追加
- labels:
level:
service:
event:
request_id:
# 出力フィルター(ERRORのみ別ストリーム)
- match:
selector: '{level="ERROR"}'
stages:
- labels:
severity: error
Grafana ダッシュボード設定
{
"dashboard": {
"title": "FastAPI Logs Dashboard",
"panels": [
{
"title": "リクエスト数(時系列)",
"targets": [
{
"expr": "sum(count_over_time({job=\"fastapi\", event=\"request\"}[5m]))",
"legendFormat": "Requests"
}
],
"type": "graph"
},
{
"title": "エラー率",
"targets": [
{
"expr": "sum(count_over_time({job=\"fastapi\", level=\"ERROR\"}[5m])) / sum(count_over_time({job=\"fastapi\"}[5m]))",
"legendFormat": "Error Rate"
}
],
"type": "graph"
},
{
"title": "平均レスポンスタイム",
"targets": [
{
"expr": "avg_over_time({job=\"fastapi\", event=\"response\"} | json | duration_ms != \"\" | unwrap duration_ms [5m])",
"legendFormat": "Avg Response Time (ms)"
}
],
"type": "graph"
},
{
"title": "ステータスコード分布",
"targets": [
{
"expr": "sum by (status_code) (count_over_time({job=\"fastapi\", event=\"response\"} | json | status_code != \"\" [5m]))",
"legendFormat": "{{status_code}}"
}
],
"type": "pie"
},
{
"title": "最近のエラーログ",
"targets": [
{
"expr": "{job=\"fastapi\", level=\"ERROR\"}"
}
],
"type": "logs"
}
]
}
}
ログクエリ例(LogQL)
基本クエリ
# 全ログ取得
{job="fastapi"}
# エラーログのみ
{job="fastapi", level="ERROR"}
# 特定リクエストID
{job="fastapi"} | json | request_id="7b3f5c2a-1234-5678-9abc-def012345678"
# 特定ユーザーのログ
{job="fastapi"} | json | user_id="12345"
# HTTPメソッドフィルター
{job="fastapi", event="request"} | json | method="POST"
集計クエリ
# 5分間のリクエスト数
sum(count_over_time({job="fastapi", event="request"}[5m]))
# エンドポイント別リクエスト数
sum by (url) (count_over_time({job="fastapi", event="request"}[1h]))
# 平均レスポンスタイム
avg_over_time({job="fastapi", event="response"} | json | unwrap duration_ms [5m])
# P95レスポンスタイム
quantile_over_time(0.95, {job="fastapi", event="response"} | json | unwrap duration_ms [5m])
# エラー率
sum(count_over_time({job="fastapi", level="ERROR"}[5m]))
/
sum(count_over_time({job="fastapi"}[5m]))
高度なクエリ
# 遅いリクエスト(500ms以上)
{job="fastapi", event="response"}
| json
| duration_ms > 500
# 特定エンドポイントのエラー
{job="fastapi", level="ERROR"}
| json
| url =~ "/api/users/.*"
# ユーザー別エラー数
sum by (user_id) (
count_over_time({job="fastapi", level="ERROR"} | json | user_id != "" [1h])
)
アラート設定
Loki Ruler設定
# observability/loki/rules/alerts.yml
groups:
- name: fastapi_alerts
interval: 1m
rules:
# エラー率が5%超過
- alert: HighErrorRate
expr: |
sum(rate({job="fastapi", level="ERROR"}[5m]))
/
sum(rate({job="fastapi"}[5m])) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }}"
# レスポンスタイムが1秒超過
- alert: SlowResponse
expr: |
avg_over_time({job="fastapi", event="response"}
| json
| unwrap duration_ms [5m]) > 1000
for: 10m
labels:
severity: warning
annotations:
summary: "Slow response time detected"
description: "Average response time is {{ $value }}ms"
# リクエスト数急増
- alert: HighRequestRate
expr: |
sum(rate({job="fastapi", event="request"}[5m])) > 1000
for: 5m
labels:
severity: info
annotations:
summary: "High request rate detected"
description: "Request rate is {{ $value }} req/s"
ログ出力例
リクエストログ
{
"timestamp": "2025-01-15T10:30:45.123Z",
"level": "INFO",
"service": "fastapi_app",
"request_id": "7b3f5c2a-1234-5678-9abc-def012345678",
"user_id": "12345",
"event": "request",
"method": "POST",
"url": "http://localhost:8000/api/users",
"path": "/api/users",
"query_params": {},
"headers": {
"content-type": "application/json",
"user-agent": "Mozilla/5.0",
"authorization": "***REDACTED***"
},
"client_ip": "172.20.0.1",
"body": {
"name": "田中太郎",
"email": "tanaka@example.com"
}
}
レスポンスログ
{
"timestamp": "2025-01-15T10:30:45.456Z",
"level": "INFO",
"service": "fastapi_app",
"request_id": "7b3f5c2a-1234-5678-9abc-def012345678",
"user_id": "12345",
"event": "response",
"method": "POST",
"url": "http://localhost:8000/api/users",
"status_code": 201,
"duration_ms": 123.45,
"response_body": {
"id": 1,
"name": "田中太郎",
"email": "tanaka@example.com"
}
}
エラーログ
{
"timestamp": "2025-01-15T10:35:12.789Z",
"level": "ERROR",
"service": "fastapi_app",
"request_id": "8c4a6d3b-5678-9012-3456-789abcdef012",
"user_id": "12345",
"event": "error",
"method": "GET",
"url": "http://localhost:8000/api/users/999",
"duration_ms": 45.67,
"error_type": "ValueError",
"error_message": "User 999 not found",
"traceback": "Traceback (most recent call last):\n File ..."
}
運用ベストプラクティス
ログレベルの使い分け
| レベル |
用途 |
例 |
| DEBUG |
開発時の詳細情報 |
変数の値、関数呼び出し |
| INFO |
通常の運用情報 |
リクエスト/レスポンス、正常処理完了 |
| WARNING |
警告(処理は継続) |
リトライ、非推奨機能の使用 |
| ERROR |
エラー(処理失敗) |
バリデーションエラー、DB接続失敗 |
| CRITICAL |
致命的エラー |
システムダウン、データ破損 |
ログ保持期間
| 環境 |
保持期間 |
理由 |
| 開発環境 |
3日 |
ディスク節約 |
| ステージング |
7日 |
テスト結果確認 |
| 本番環境 |
30日 |
トラブルシューティング、監査 |
| アーカイブ |
1年以上 |
S3/GCSに長期保存 |
パフォーマンス最適化
# 開発環境: 詳細ログ
if settings.ENV == "development":
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
# ログサンプリング(高トラフィック時)
import random
if random.random() < 0.1: # 10%のみログ記録
logger.info("Sampled request", extra={"sampled": True})
トラブルシューティング
ログが表示されない
# ログファイル確認
docker exec -it api ls -lh /app/logs/
# Promtailログ確認
docker logs promtail
# Lokiログ確認
docker logs loki
# Lokiエンドポイント確認
curl http://localhost:3100/ready
ログクエリが遅い
# インデックス確認
{job="fastapi"} | json | __error__=""
# ラベル最適化(フィルタリング前にラベルで絞る)
{job="fastapi", level="ERROR"} | json
まとめ
| 項目 |
内容 |
| ログ形式 |
構造化JSON |
| リクエスト追跡 |
UUID(request_id) |
| 収集 |
Promtail → Loki |
| 可視化 |
Grafana Dashboard |
| 保持期間 |
7日(本番30日) |
| アラート |
Loki Ruler |
次のステップ: Docker環境設定 | テスト・CI/CD