Arize Phoenix可視化ガイド¶
概要¶
Arize Phoenixは、Embedding(埋め込みベクトル)の品質分析、クラスタリング、異常検知を可能にするオープソースの可視化ツールです。UMAP/PCAによる次元削減、MkDocsとの統合により、AIシステムの挙動を直感的に理解できます。
主要機能¶
| 機能 | 説明 | 用途 |
|---|---|---|
| UMAP可視化 | 高次元ベクトルを2D/3Dに投影 | クラスタの視覚的確認 |
| PCA分析 | 主成分分析による次元削減 | データの分散理解 |
| クラスタリング | K-means/DBSCAN | 類似文書のグループ化 |
| 異常検知 | Outlier detection | 品質の低い埋め込み特定 |
| 時系列分析 | 埋め込み変化の追跡 | モデル改善の効果測定 |
| MkDocs統合 | iframe埋め込み | ドキュメント内での可視化 |
アーキテクチャ¶
graph LR
A[PostgreSQL + pgvector] --> B[Phoenix Server]
C[Neo4j] --> B
B --> D[UMAP/PCA処理]
D --> E[可視化レイヤー]
E --> F[Phoenix UI]
E --> G[MkDocs iframe]
H[Streamlit] --> B
インストールと設定¶
1. 依存関係¶
pip install arize-phoenix
pip install psycopg2-binary pandas numpy
pip install scikit-learn umap-learn
2. Phoenix Server起動¶
# app/visualization/phoenix_server.py
import phoenix as px
import pandas as pd
import psycopg2
from datetime import datetime
class PhoenixServer:
"""Phoenix可視化サーバー"""
def __init__(
self,
db_connection: str = "postgresql://appuser:password@localhost:5432/vectors",
port: int = 6006
):
self.db_connection = db_connection
self.port = port
def load_embeddings_from_db(self) -> pd.DataFrame:
"""pgvectorからEmbeddingを読み込み"""
conn = psycopg2.connect(self.db_connection)
query = """
SELECT
id,
content,
embedding::text,
created_at,
metadata
FROM documents
ORDER BY created_at DESC
LIMIT 10000
"""
df = pd.read_sql(query, conn)
conn.close()
# Embedding文字列をリストに変換
df['embedding'] = df['embedding'].apply(self._parse_vector)
return df
def _parse_vector(self, vector_str: str) -> list:
"""pgvectorの文字列形式をリストに変換"""
# "[0.1, 0.2, ...]" -> [0.1, 0.2, ...]
import ast
return ast.literal_eval(vector_str.strip('[]'))
def launch(self):
"""Phoenix UIを起動"""
df = self.load_embeddings_from_db()
# Phoenix起動
session = px.launch_app(
primary=px.Inferences(
dataframe=df,
embedding_feature_column_names={
"text_embedding": "embedding"
},
text_column_name="content"
),
port=self.port
)
print(f"Phoenix UI: http://localhost:{self.port}")
return session
def export_for_analysis(self, output_path: str = "exports/embeddings.parquet"):
"""分析用にデータをエクスポート"""
df = self.load_embeddings_from_db()
df.to_parquet(output_path)
print(f"Exported to {output_path}")
3. Docker Compose統合¶
version: "3.9"
services:
postgres:
image: ankane/pgvector:pg16
environment:
POSTGRES_USER: appuser
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: vectors
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- backend
phoenix:
build:
context: ./phoenix
dockerfile: Dockerfile
environment:
DATABASE_URL: postgresql://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
ports:
- "6006:6006"
depends_on:
- postgres
networks:
- backend
- web
volumes:
- ./exports:/exports
api:
build: .
environment:
DATABASE_URL: postgresql+psycopg2://appuser:${POSTGRES_PASSWORD}@postgres:5432/vectors
PHOENIX_URL: http://phoenix:6006
ports:
- "8000:8000"
depends_on:
- postgres
- phoenix
networks:
- backend
- web
volumes:
postgres_data:
networks:
backend:
web:
4. Phoenix Dockerfile¶
# phoenix/Dockerfile
FROM python:3.11-slim
WORKDIR /app
RUN pip install arize-phoenix psycopg2-binary pandas numpy scikit-learn
COPY phoenix_server.py .
EXPOSE 6006
CMD ["python", "phoenix_server.py"]
高度な可視化¶
1. UMAP 2D/3D可視化¶
# app/visualization/umap_visualizer.py
import umap
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from sklearn.preprocessing import StandardScaler
class UMAPVisualizer:
"""UMAP可視化エンジン"""
def __init__(self, n_components: int = 2, n_neighbors: int = 15):
self.n_components = n_components
self.n_neighbors = n_neighbors
self.reducer = None
def fit_transform(self, embeddings: pd.DataFrame) -> pd.DataFrame:
"""Embeddingを2D/3Dに変換"""
# 正規化
scaler = StandardScaler()
embeddings_scaled = scaler.fit_transform(embeddings)
# UMAP変換
self.reducer = umap.UMAP(
n_components=self.n_components,
n_neighbors=self.n_neighbors,
min_dist=0.1,
metric='cosine'
)
reduced = self.reducer.fit_transform(embeddings_scaled)
# DataFrameに変換
columns = [f"UMAP_{i+1}" for i in range(self.n_components)]
df_reduced = pd.DataFrame(reduced, columns=columns)
return df_reduced
def plot_2d(
self,
df: pd.DataFrame,
color_by: str = None,
title: str = "UMAP 2D Visualization"
):
"""2Dプロット"""
fig = px.scatter(
df,
x="UMAP_1",
y="UMAP_2",
color=color_by,
hover_data=df.columns,
title=title,
width=1200,
height=800
)
fig.update_traces(marker=dict(size=5, opacity=0.7))
return fig
def plot_3d(
self,
df: pd.DataFrame,
color_by: str = None,
title: str = "UMAP 3D Visualization"
):
"""3Dプロット"""
fig = px.scatter_3d(
df,
x="UMAP_1",
y="UMAP_2",
z="UMAP_3",
color=color_by,
hover_data=df.columns,
title=title,
width=1200,
height=800
)
fig.update_traces(marker=dict(size=3, opacity=0.6))
return fig
def plot_density(self, df: pd.DataFrame):
"""密度ヒートマップ"""
fig = go.Figure(data=go.Histogram2d(
x=df["UMAP_1"],
y=df["UMAP_2"],
colorscale="Viridis",
nbinsx=50,
nbinsy=50
))
fig.update_layout(
title="Embedding Density Heatmap",
width=1000,
height=800
)
return fig
2. クラスタリング分析¶
# app/visualization/clustering_analyzer.py
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import silhouette_score
import pandas as pd
import numpy as np
class ClusteringAnalyzer:
"""クラスタリング分析"""
def kmeans_clustering(
self,
embeddings: np.ndarray,
n_clusters: int = 5
) -> dict:
"""K-meansクラスタリング"""
kmeans = KMeans(
n_clusters=n_clusters,
random_state=42,
n_init=10
)
labels = kmeans.fit_predict(embeddings)
silhouette = silhouette_score(embeddings, labels)
return {
"labels": labels,
"centers": kmeans.cluster_centers_,
"silhouette_score": silhouette,
"inertia": kmeans.inertia_
}
def dbscan_clustering(
self,
embeddings: np.ndarray,
eps: float = 0.5,
min_samples: int = 5
) -> dict:
"""DBSCANクラスタリング"""
dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine')
labels = dbscan.fit_predict(embeddings)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = list(labels).count(-1)
return {
"labels": labels,
"n_clusters": n_clusters,
"n_noise_points": n_noise
}
def find_optimal_k(
self,
embeddings: np.ndarray,
k_range: range = range(2, 11)
) -> dict:
"""最適なクラスタ数を探索(エルボー法)"""
inertias = []
silhouettes = []
for k in k_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = kmeans.fit_predict(embeddings)
inertias.append(kmeans.inertia_)
silhouettes.append(silhouette_score(embeddings, labels))
return {
"k_values": list(k_range),
"inertias": inertias,
"silhouettes": silhouettes
}
def get_cluster_statistics(
self,
df: pd.DataFrame,
labels: np.ndarray
) -> pd.DataFrame:
"""クラスタごとの統計"""
df_with_labels = df.copy()
df_with_labels['cluster'] = labels
stats = df_with_labels.groupby('cluster').agg({
'id': 'count',
'content': lambda x: ' '.join(x[:3]) # 代表的なテキスト
}).rename(columns={'id': 'count', 'content': 'sample_texts'})
return stats
3. 異常検知¶
# app/visualization/anomaly_detector.py
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import numpy as np
class AnomalyDetector:
"""Embedding異常検知"""
def isolation_forest(
self,
embeddings: np.ndarray,
contamination: float = 0.1
) -> dict:
"""Isolation Forestによる異常検知"""
clf = IsolationForest(
contamination=contamination,
random_state=42
)
predictions = clf.fit_predict(embeddings)
scores = clf.score_samples(embeddings)
# -1が異常、1が正常
anomalies = np.where(predictions == -1)[0]
return {
"predictions": predictions,
"scores": scores,
"anomaly_indices": anomalies,
"n_anomalies": len(anomalies)
}
def local_outlier_factor(
self,
embeddings: np.ndarray,
n_neighbors: int = 20,
contamination: float = 0.1
) -> dict:
"""LOFによる異常検知"""
lof = LocalOutlierFactor(
n_neighbors=n_neighbors,
contamination=contamination
)
predictions = lof.fit_predict(embeddings)
scores = lof.negative_outlier_factor_
anomalies = np.where(predictions == -1)[0]
return {
"predictions": predictions,
"scores": scores,
"anomaly_indices": anomalies,
"n_anomalies": len(anomalies)
}
def detect_distribution_shift(
self,
old_embeddings: np.ndarray,
new_embeddings: np.ndarray
) -> dict:
"""分布シフトの検知"""
from scipy.stats import ks_2samp
# 各次元ごとにKS検定
p_values = []
for i in range(old_embeddings.shape[1]):
stat, p_value = ks_2samp(
old_embeddings[:, i],
new_embeddings[:, i]
)
p_values.append(p_value)
# p値が低い次元が多ければ分布シフトあり
shift_detected = np.mean(p_values) < 0.05
return {
"shift_detected": shift_detected,
"mean_p_value": np.mean(p_values),
"shifted_dimensions": np.where(np.array(p_values) < 0.05)[0]
}
MkDocs統合¶
1. 可視化ページの作成¶
<!-- docs/visualization/embeddings.md -->
# Embedding可視化
AIモデルのベクトル埋め込み(pgvector)を **Arize Phoenix** でリアルタイム可視化しています。
## インタラクティブダッシュボード
<iframe
src="http://localhost:6006"
width="100%"
height="800px"
frameborder="0"
allowfullscreen
style="border: 1px solid #ddd; border-radius: 8px;"
></iframe>
## 分析機能
### UMAP 2D/3D投影
- 高次元ベクトルを2次元/3次元に圧縮して可視化
- クラスタの形成を直感的に理解
### クラスタリング
- K-meansやDBSCANによる自動グループ化
- 類似文書の発見
### 異常検知
- 品質の低い埋め込みを特定
- モデルの改善ポイントを発見
## 使い方
1. **検索**: 上部の検索バーで特定の文書を探索
2. **フィルタ**: 日付やメタデータでフィルタリング
3. **詳細**: ポイントをクリックして詳細情報を表示
2. Streamlit統合ダッシュボード¶
# app/ui/embedding_dashboard.py
import streamlit as st
import pandas as pd
import psycopg2
from app.visualization.umap_visualizer import UMAPVisualizer
from app.visualization.clustering_analyzer import ClusteringAnalyzer
from app.visualization.anomaly_detector import AnomalyDetector
import plotly.express as px
st.set_page_config(page_title="Embedding分析", layout="wide")
st.title("🔍 Embedding可視化ダッシュボード")
# サイドバー
with st.sidebar:
st.header("設定")
# データソース選択
data_limit = st.slider("データ件数", 100, 10000, 1000)
# 可視化設定
viz_type = st.selectbox(
"可視化タイプ",
["UMAP 2D", "UMAP 3D", "PCA", "密度ヒートマップ"]
)
# クラスタリング設定
enable_clustering = st.checkbox("クラスタリング有効")
if enable_clustering:
n_clusters = st.slider("クラスタ数", 2, 10, 5)
# 異常検知
enable_anomaly = st.checkbox("異常検知有効")
# データ読み込み
@st.cache_data
def load_data(limit: int):
conn = psycopg2.connect(st.secrets["database_url"])
query = f"""
SELECT id, content, embedding::text, created_at
FROM documents
ORDER BY created_at DESC
LIMIT {limit}
"""
df = pd.read_sql(query, conn)
conn.close()
# Embedding変換
import ast
df['embedding'] = df['embedding'].apply(
lambda x: ast.literal_eval(x.strip('[]'))
)
return df
df = load_data(data_limit)
# 統計情報
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("総文書数", len(df))
with col2:
st.metric("次元数", len(df['embedding'].iloc[0]))
with col3:
avg_length = df['content'].str.len().mean()
st.metric("平均文字数", f"{avg_length:.0f}")
with col4:
st.metric("最新更新", df['created_at'].max().strftime("%Y-%m-%d"))
# Embedding抽出
embeddings = pd.DataFrame(df['embedding'].tolist())
# 可視化
st.header("可視化")
visualizer = UMAPVisualizer(
n_components=3 if viz_type == "UMAP 3D" else 2
)
reduced = visualizer.fit_transform(embeddings)
df_viz = pd.concat([df[['content']], reduced], axis=1)
# クラスタリング
if enable_clustering:
analyzer = ClusteringAnalyzer()
clustering_result = analyzer.kmeans_clustering(
embeddings.values,
n_clusters=n_clusters
)
df_viz['cluster'] = clustering_result['labels']
st.info(f"シルエットスコア: {clustering_result['silhouette_score']:.3f}")
# 異常検知
if enable_anomaly:
detector = AnomalyDetector()
anomaly_result = detector.isolation_forest(embeddings.values)
df_viz['is_anomaly'] = anomaly_result['predictions'] == -1
st.warning(f"異常検知: {anomaly_result['n_anomalies']}件")
# プロット
color_by = None
if enable_clustering:
color_by = 'cluster'
elif enable_anomaly:
color_by = 'is_anomaly'
if viz_type == "UMAP 2D":
fig = visualizer.plot_2d(df_viz, color_by=color_by)
elif viz_type == "UMAP 3D":
fig = visualizer.plot_3d(df_viz, color_by=color_by)
elif viz_type == "密度ヒートマップ":
fig = visualizer.plot_density(df_viz)
st.plotly_chart(fig, use_container_width=True)
# データテーブル
st.header("データ詳細")
st.dataframe(df_viz, use_container_width=True)
# クラスタ統計
if enable_clustering:
st.header("クラスタ統計")
stats = analyzer.get_cluster_statistics(df, clustering_result['labels'])
st.dataframe(stats)
パフォーマンス最適化¶
1. データのサンプリング¶
def sample_data(df: pd.DataFrame, max_size: int = 5000) -> pd.DataFrame:
"""大量データの場合はサンプリング"""
if len(df) > max_size:
return df.sample(n=max_size, random_state=42)
return df
2. キャッシング¶
from functools import lru_cache
@lru_cache(maxsize=10)
def cached_umap_transform(embeddings_hash: str):
"""UMAP変換結果をキャッシュ"""
pass
3. 増分更新¶
def incremental_umap_update(
existing_reduced: np.ndarray,
new_embeddings: np.ndarray,
reducer: umap.UMAP
) -> np.ndarray:
"""新しいデータポイントのみ変換"""
return reducer.transform(new_embeddings)
ベストプラクティス¶
- 適切なサンプリング: 10,000件以上はサンプリング推奨
- 定期的な再計算: モデル更新時はUMAPを再計算
- メタデータ活用: 色分けで時系列や品質を可視化
- 異常検知の活用: 定期的に品質をモニタリング
- MkDocs統合: ドキュメント内に可視化を埋め込み
トラブルシューティング¶
問題1: メモリ不足¶
# バッチ処理で対応
def batch_transform(embeddings, batch_size=1000):
for i in range(0, len(embeddings), batch_size):
yield reducer.transform(embeddings[i:i+batch_size])