コンテンツにスキップ

Arize Phoenix可視化ガイド

概要

Arize Phoenixは、Embedding(埋め込みベクトル)の品質分析、クラスタリング、異常検知を可能にするオープソースの可視化ツールです。UMAP/PCAによる次元削減、MkDocsとの統合により、AIシステムの挙動を直感的に理解できます。

主要機能

機能 説明 用途
UMAP可視化 高次元ベクトルを2D/3Dに投影 クラスタの視覚的確認
PCA分析 主成分分析による次元削減 データの分散理解
クラスタリング K-means/DBSCAN 類似文書のグループ化
異常検知 Outlier detection 品質の低い埋め込み特定
時系列分析 埋め込み変化の追跡 モデル改善の効果測定
MkDocs統合 iframe埋め込み ドキュメント内での可視化

アーキテクチャ

graph LR
    A[PostgreSQL + pgvector] --> B[Phoenix Server]
    C[Neo4j] --> B
    B --> D[UMAP/PCA処理]
    D --> E[可視化レイヤー]
    E --> F[Phoenix UI]
    E --> G[MkDocs iframe]
    H[Streamlit] --> B

インストールと設定

1. 依存関係

pip install arize-phoenix
pip install psycopg2-binary pandas numpy
pip install scikit-learn umap-learn

2. Phoenix Server起動

# app/visualization/phoenix_server.py
import phoenix as px
import pandas as pd
import psycopg2
from datetime import datetime

class PhoenixServer:
    """Phoenix可視化サーバー"""

    def __init__(
        self,
        db_connection: str = "postgresql://appuser:password@localhost:5432/vectors",
        port: int = 6006
    ):
        self.db_connection = db_connection
        self.port = port

    def load_embeddings_from_db(self) -> pd.DataFrame:
        """pgvectorからEmbeddingを読み込み"""
        conn = psycopg2.connect(self.db_connection)

        query = """
        SELECT
            id,
            content,
            embedding::text,
            created_at,
            metadata
        FROM documents
        ORDER BY created_at DESC
        LIMIT 10000
        """

        df = pd.read_sql(query, conn)
        conn.close()

        # Embedding文字列をリストに変換
        df['embedding'] = df['embedding'].apply(self._parse_vector)

        return df

    def _parse_vector(self, vector_str: str) -> list:
        """pgvectorの文字列形式をリストに変換"""
        # "[0.1, 0.2, ...]" -> [0.1, 0.2, ...]
        import ast
        return ast.literal_eval(vector_str.strip('[]'))

    def launch(self):
        """Phoenix UIを起動"""
        df = self.load_embeddings_from_db()

        # Phoenix起動
        session = px.launch_app(
            primary=px.Inferences(
                dataframe=df,
                embedding_feature_column_names={
                    "text_embedding": "embedding"
                },
                text_column_name="content"
            ),
            port=self.port
        )

        print(f"Phoenix UI: http://localhost:{self.port}")
        return session

    def export_for_analysis(self, output_path: str = "exports/embeddings.parquet"):
        """分析用にデータをエクスポート"""
        df = self.load_embeddings_from_db()
        df.to_parquet(output_path)
        print(f"Exported to {output_path}")

3. Docker Compose統合

version: "3.9"

services:
  postgres:
    image: ankane/pgvector:pg16
    environment:
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: vectors
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - backend

  phoenix:
    build:
      context: ./phoenix
      dockerfile: Dockerfile
    environment:
      DATABASE_URL: postgresql://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
    ports:
      - "6006:6006"
    depends_on:
      - postgres
    networks:
      - backend
      - web
    volumes:
      - ./exports:/exports

  api:
    build: .
    environment:
      DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
      PHOENIX_URL: http://phoenix:6006
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - phoenix
    networks:
      - backend
      - web

volumes:
  postgres_data:

networks:
  backend:
  web:

4. Phoenix Dockerfile

# phoenix/Dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN pip install arize-phoenix psycopg2-binary pandas numpy scikit-learn

COPY phoenix_server.py .

EXPOSE 6006

CMD ["python", "phoenix_server.py"]

高度な可視化

1. UMAP 2D/3D可視化

# app/visualization/umap_visualizer.py
import umap
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from sklearn.preprocessing import StandardScaler

class UMAPVisualizer:
    """UMAP可視化エンジン"""

    def __init__(self, n_components: int = 2, n_neighbors: int = 15):
        self.n_components = n_components
        self.n_neighbors = n_neighbors
        self.reducer = None

    def fit_transform(self, embeddings: pd.DataFrame) -> pd.DataFrame:
        """Embeddingを2D/3Dに変換"""

        # 正規化
        scaler = StandardScaler()
        embeddings_scaled = scaler.fit_transform(embeddings)

        # UMAP変換
        self.reducer = umap.UMAP(
            n_components=self.n_components,
            n_neighbors=self.n_neighbors,
            min_dist=0.1,
            metric='cosine'
        )

        reduced = self.reducer.fit_transform(embeddings_scaled)

        # DataFrameに変換
        columns = [f"UMAP_{i+1}" for i in range(self.n_components)]
        df_reduced = pd.DataFrame(reduced, columns=columns)

        return df_reduced

    def plot_2d(
        self,
        df: pd.DataFrame,
        color_by: str = None,
        title: str = "UMAP 2D Visualization"
    ):
        """2Dプロット"""
        fig = px.scatter(
            df,
            x="UMAP_1",
            y="UMAP_2",
            color=color_by,
            hover_data=df.columns,
            title=title,
            width=1200,
            height=800
        )

        fig.update_traces(marker=dict(size=5, opacity=0.7))
        return fig

    def plot_3d(
        self,
        df: pd.DataFrame,
        color_by: str = None,
        title: str = "UMAP 3D Visualization"
    ):
        """3Dプロット"""
        fig = px.scatter_3d(
            df,
            x="UMAP_1",
            y="UMAP_2",
            z="UMAP_3",
            color=color_by,
            hover_data=df.columns,
            title=title,
            width=1200,
            height=800
        )

        fig.update_traces(marker=dict(size=3, opacity=0.6))
        return fig

    def plot_density(self, df: pd.DataFrame):
        """密度ヒートマップ"""
        fig = go.Figure(data=go.Histogram2d(
            x=df["UMAP_1"],
            y=df["UMAP_2"],
            colorscale="Viridis",
            nbinsx=50,
            nbinsy=50
        ))

        fig.update_layout(
            title="Embedding Density Heatmap",
            width=1000,
            height=800
        )

        return fig

2. クラスタリング分析

# app/visualization/clustering_analyzer.py
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import silhouette_score
import pandas as pd
import numpy as np

class ClusteringAnalyzer:
    """クラスタリング分析"""

    def kmeans_clustering(
        self,
        embeddings: np.ndarray,
        n_clusters: int = 5
    ) -> dict:
        """K-meansクラスタリング"""

        kmeans = KMeans(
            n_clusters=n_clusters,
            random_state=42,
            n_init=10
        )

        labels = kmeans.fit_predict(embeddings)
        silhouette = silhouette_score(embeddings, labels)

        return {
            "labels": labels,
            "centers": kmeans.cluster_centers_,
            "silhouette_score": silhouette,
            "inertia": kmeans.inertia_
        }

    def dbscan_clustering(
        self,
        embeddings: np.ndarray,
        eps: float = 0.5,
        min_samples: int = 5
    ) -> dict:
        """DBSCANクラスタリング"""

        dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine')
        labels = dbscan.fit_predict(embeddings)

        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = list(labels).count(-1)

        return {
            "labels": labels,
            "n_clusters": n_clusters,
            "n_noise_points": n_noise
        }

    def find_optimal_k(
        self,
        embeddings: np.ndarray,
        k_range: range = range(2, 11)
    ) -> dict:
        """最適なクラスタ数を探索(エルボー法)"""

        inertias = []
        silhouettes = []

        for k in k_range:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(embeddings)

            inertias.append(kmeans.inertia_)
            silhouettes.append(silhouette_score(embeddings, labels))

        return {
            "k_values": list(k_range),
            "inertias": inertias,
            "silhouettes": silhouettes
        }

    def get_cluster_statistics(
        self,
        df: pd.DataFrame,
        labels: np.ndarray
    ) -> pd.DataFrame:
        """クラスタごとの統計"""

        df_with_labels = df.copy()
        df_with_labels['cluster'] = labels

        stats = df_with_labels.groupby('cluster').agg({
            'id': 'count',
            'content': lambda x: ' '.join(x[:3])  # 代表的なテキスト
        }).rename(columns={'id': 'count', 'content': 'sample_texts'})

        return stats

3. 異常検知

# app/visualization/anomaly_detector.py
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import numpy as np

class AnomalyDetector:
    """Embedding異常検知"""

    def isolation_forest(
        self,
        embeddings: np.ndarray,
        contamination: float = 0.1
    ) -> dict:
        """Isolation Forestによる異常検知"""

        clf = IsolationForest(
            contamination=contamination,
            random_state=42
        )

        predictions = clf.fit_predict(embeddings)
        scores = clf.score_samples(embeddings)

        # -1が異常、1が正常
        anomalies = np.where(predictions == -1)[0]

        return {
            "predictions": predictions,
            "scores": scores,
            "anomaly_indices": anomalies,
            "n_anomalies": len(anomalies)
        }

    def local_outlier_factor(
        self,
        embeddings: np.ndarray,
        n_neighbors: int = 20,
        contamination: float = 0.1
    ) -> dict:
        """LOFによる異常検知"""

        lof = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=contamination
        )

        predictions = lof.fit_predict(embeddings)
        scores = lof.negative_outlier_factor_

        anomalies = np.where(predictions == -1)[0]

        return {
            "predictions": predictions,
            "scores": scores,
            "anomaly_indices": anomalies,
            "n_anomalies": len(anomalies)
        }

    def detect_distribution_shift(
        self,
        old_embeddings: np.ndarray,
        new_embeddings: np.ndarray
    ) -> dict:
        """分布シフトの検知"""
        from scipy.stats import ks_2samp

        # 各次元ごとにKS検定
        p_values = []
        for i in range(old_embeddings.shape[1]):
            stat, p_value = ks_2samp(
                old_embeddings[:, i],
                new_embeddings[:, i]
            )
            p_values.append(p_value)

        # p値が低い次元が多ければ分布シフトあり
        shift_detected = np.mean(p_values) < 0.05

        return {
            "shift_detected": shift_detected,
            "mean_p_value": np.mean(p_values),
            "shifted_dimensions": np.where(np.array(p_values) < 0.05)[0]
        }

MkDocs統合

1. 可視化ページの作成

<!-- docs/visualization/embeddings.md -->
# Embedding可視化

AIモデルのベクトル埋め込み(pgvector)を **Arize Phoenix** でリアルタイム可視化しています。

## インタラクティブダッシュボード

<iframe
    src="http://localhost:6006"
    width="100%"
    height="800px"
    frameborder="0"
    allowfullscreen
    style="border: 1px solid #ddd; border-radius: 8px;"
></iframe>

## 分析機能

### UMAP 2D/3D投影
- 高次元ベクトルを2次元/3次元に圧縮して可視化
- クラスタの形成を直感的に理解

### クラスタリング
- K-meansやDBSCANによる自動グループ化
- 類似文書の発見

### 異常検知
- 品質の低い埋め込みを特定
- モデルの改善ポイントを発見

## 使い方

1. **検索**: 上部の検索バーで特定の文書を探索
2. **フィルタ**: 日付やメタデータでフィルタリング
3. **詳細**: ポイントをクリックして詳細情報を表示

2. Streamlit統合ダッシュボード

# app/ui/embedding_dashboard.py
import streamlit as st
import pandas as pd
import psycopg2
from app.visualization.umap_visualizer import UMAPVisualizer
from app.visualization.clustering_analyzer import ClusteringAnalyzer
from app.visualization.anomaly_detector import AnomalyDetector
import plotly.express as px

st.set_page_config(page_title="Embedding分析", layout="wide")

st.title("🔍 Embedding可視化ダッシュボード")

# サイドバー
with st.sidebar:
    st.header("設定")

    # データソース選択
    data_limit = st.slider("データ件数", 100, 10000, 1000)

    # 可視化設定
    viz_type = st.selectbox(
        "可視化タイプ",
        ["UMAP 2D", "UMAP 3D", "PCA", "密度ヒートマップ"]
    )

    # クラスタリング設定
    enable_clustering = st.checkbox("クラスタリング有効")
    if enable_clustering:
        n_clusters = st.slider("クラスタ数", 2, 10, 5)

    # 異常検知
    enable_anomaly = st.checkbox("異常検知有効")

# データ読み込み
@st.cache_data
def load_data(limit: int):
    conn = psycopg2.connect(st.secrets["database_url"])
    query = f"""
    SELECT id, content, embedding::text, created_at
    FROM documents
    ORDER BY created_at DESC
    LIMIT {limit}
    """
    df = pd.read_sql(query, conn)
    conn.close()

    # Embedding変換
    import ast
    df['embedding'] = df['embedding'].apply(
        lambda x: ast.literal_eval(x.strip('[]'))
    )

    return df

df = load_data(data_limit)

# 統計情報
col1, col2, col3, col4 = st.columns(4)
with col1:
    st.metric("総文書数", len(df))
with col2:
    st.metric("次元数", len(df['embedding'].iloc[0]))
with col3:
    avg_length = df['content'].str.len().mean()
    st.metric("平均文字数", f"{avg_length:.0f}")
with col4:
    st.metric("最新更新", df['created_at'].max().strftime("%Y-%m-%d"))

# Embedding抽出
embeddings = pd.DataFrame(df['embedding'].tolist())

# 可視化
st.header("可視化")

visualizer = UMAPVisualizer(
    n_components=3 if viz_type == "UMAP 3D" else 2
)

reduced = visualizer.fit_transform(embeddings)
df_viz = pd.concat([df[['content']], reduced], axis=1)

# クラスタリング
if enable_clustering:
    analyzer = ClusteringAnalyzer()
    clustering_result = analyzer.kmeans_clustering(
        embeddings.values,
        n_clusters=n_clusters
    )
    df_viz['cluster'] = clustering_result['labels']

    st.info(f"シルエットスコア: {clustering_result['silhouette_score']:.3f}")

# 異常検知
if enable_anomaly:
    detector = AnomalyDetector()
    anomaly_result = detector.isolation_forest(embeddings.values)
    df_viz['is_anomaly'] = anomaly_result['predictions'] == -1

    st.warning(f"異常検知: {anomaly_result['n_anomalies']}件")

# プロット
color_by = None
if enable_clustering:
    color_by = 'cluster'
elif enable_anomaly:
    color_by = 'is_anomaly'

if viz_type == "UMAP 2D":
    fig = visualizer.plot_2d(df_viz, color_by=color_by)
elif viz_type == "UMAP 3D":
    fig = visualizer.plot_3d(df_viz, color_by=color_by)
elif viz_type == "密度ヒートマップ":
    fig = visualizer.plot_density(df_viz)

st.plotly_chart(fig, use_container_width=True)

# データテーブル
st.header("データ詳細")
st.dataframe(df_viz, use_container_width=True)

# クラスタ統計
if enable_clustering:
    st.header("クラスタ統計")
    stats = analyzer.get_cluster_statistics(df, clustering_result['labels'])
    st.dataframe(stats)

パフォーマンス最適化

1. データのサンプリング

def sample_data(df: pd.DataFrame, max_size: int = 5000) -> pd.DataFrame:
    """大量データの場合はサンプリング"""
    if len(df) > max_size:
        return df.sample(n=max_size, random_state=42)
    return df

2. キャッシング

from functools import lru_cache

@lru_cache(maxsize=10)
def cached_umap_transform(embeddings_hash: str):
    """UMAP変換結果をキャッシュ"""
    pass

3. 増分更新

def incremental_umap_update(
    existing_reduced: np.ndarray,
    new_embeddings: np.ndarray,
    reducer: umap.UMAP
) -> np.ndarray:
    """新しいデータポイントのみ変換"""
    return reducer.transform(new_embeddings)

ベストプラクティス

  1. 適切なサンプリング: 10,000件以上はサンプリング推奨
  2. 定期的な再計算: モデル更新時はUMAPを再計算
  3. メタデータ活用: 色分けで時系列や品質を可視化
  4. 異常検知の活用: 定期的に品質をモニタリング
  5. MkDocs統合: ドキュメント内に可視化を埋め込み

トラブルシューティング

問題1: メモリ不足

# バッチ処理で対応
def batch_transform(embeddings, batch_size=1000):
    for i in range(0, len(embeddings), batch_size):
        yield reducer.transform(embeddings[i:i+batch_size])

問題2: 可視化が遅い

# 次元削減のパラメータ調整
n_neighbors=5  # デフォルト15から減らす
min_dist=0.3   # 広げて計算を軽量化

参考リソース