マルチエージェントシステムガイド
概要
マルチエージェントシステムは、複数の専門AIエージェントが協調して複雑なタスクを解決する高度なAIアーキテクチャです。CrewAI、Intelligent Knowledge Agent、Cognitive Memoryを組み合わせることで、人間のチームのような協調的な問題解決を実現します。
システム構成
graph TD
A[ユーザータスク] --> B[Coordinator Agent]
B --> C[Tech Expert Agent]
B --> D[Business Analyst Agent]
B --> E[Security Advisor Agent]
C --> F[Knowledge Graph]
D --> F
E --> F
F --> G[Cognitive Memory]
G --> H[Meta Evaluator]
H --> I[統合された回答]
I --> J[自己反省ループ]
J --> G
コアコンポーネント
| コンポーネント |
技術 |
役割 |
| エージェント管理 |
CrewAI / LangGraph |
エージェントの役割と連携制御 |
| 知識ベース |
Neo4j + pgvector |
共有知識グラフ |
| メモリシステム |
Cognitive Memory |
思考プロセスの記録と学習 |
| 評価システム |
Meta Evaluator |
出力品質の評価と改善 |
| オーケストレーター |
LangChain |
ワークフロー制御 |
実装
1. CrewAIベースのマルチエージェント
# app/agents/multi_agent_system.py
from crewai import Agent, Task, Crew, Process
from langchain.chat_models import ChatOpenAI
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator
import os
class MultiAgentSystem:
"""マルチエージェント協調システム"""
def __init__(self):
self.llm = ChatOpenAI(
model="gpt-4o",
temperature=0.7,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
self.rag = GraphRAGOrchestrator()
# エージェント定義
self.agents = self._create_agents()
def _create_agents(self) -> dict:
"""専門エージェントの作成"""
# 技術エキスパート
tech_expert = Agent(
role="Technical Expert",
goal="技術的な正確性と実装可能性を評価する",
backstory="""
あなたは15年の経験を持つシニアエンジニアです。
アーキテクチャ、セキュリティ、パフォーマンスに精通しています。
""",
llm=self.llm,
verbose=True,
allow_delegation=True
)
# ビジネスアナリスト
business_analyst = Agent(
role="Business Analyst",
goal="ビジネス価値とROIを評価する",
backstory="""
あなたはビジネス戦略の専門家です。
コスト、市場価値、競争優位性の観点から提案を評価します。
""",
llm=self.llm,
verbose=True,
allow_delegation=True
)
# セキュリティアドバイザー
security_advisor = Agent(
role="Security Advisor",
goal="セキュリティリスクと対策を特定する",
backstory="""
あなたはサイバーセキュリティの専門家です。
脆弱性、コンプライアンス、データ保護の観点から評価します。
""",
llm=self.llm,
verbose=True,
allow_delegation=True
)
# メタオブザーバー(調整役)
meta_observer = Agent(
role="Meta Observer",
goal="各エージェントの意見を統合し、最終判断を下す",
backstory="""
あなたはプロジェクトマネージャーです。
技術、ビジネス、セキュリティの観点を総合的に判断します。
""",
llm=self.llm,
verbose=True,
allow_delegation=False
)
return {
"tech": tech_expert,
"business": business_analyst,
"security": security_advisor,
"meta": meta_observer
}
def execute_collaborative_task(self, task_description: str) -> dict:
"""協調タスクの実行"""
# 1. 知識グラフから関連情報を取得
context = self.rag.query_with_graph_context(task_description)
# 2. 各エージェントにタスクを割り当て
tech_task = Task(
description=f"""
以下の提案を技術的観点から評価してください:
{task_description}
参考情報:
{context['answer']}
""",
agent=self.agents["tech"],
expected_output="技術的評価レポート(実装可能性、リスク、推奨事項)"
)
business_task = Task(
description=f"""
以下の提案をビジネス観点から評価してください:
{task_description}
参考情報:
{context['answer']}
""",
agent=self.agents["business"],
expected_output="ビジネス評価レポート(ROI、市場価値、優先度)"
)
security_task = Task(
description=f"""
以下の提案をセキュリティ観点から評価してください:
{task_description}
参考情報:
{context['answer']}
""",
agent=self.agents["security"],
expected_output="セキュリティ評価レポート(リスク、対策、コンプライアンス)"
)
# 3. メタタスク(統合)
meta_task = Task(
description="""
技術、ビジネス、セキュリティの各評価を統合し、
最終的な推奨事項をまとめてください。
""",
agent=self.agents["meta"],
expected_output="統合レポート(総合評価、推奨アクション、優先順位)",
context=[tech_task, business_task, security_task]
)
# 4. Crewの作成と実行
crew = Crew(
agents=[
self.agents["tech"],
self.agents["business"],
self.agents["security"],
self.agents["meta"]
],
tasks=[tech_task, business_task, security_task, meta_task],
process=Process.sequential, # 順次実行
verbose=True
)
# 実行
result = crew.kickoff()
return {
"task": task_description,
"result": result,
"tech_evaluation": tech_task.output,
"business_evaluation": business_task.output,
"security_evaluation": security_task.output,
"final_recommendation": meta_task.output
}
def parallel_research(self, topics: list[str]) -> dict:
"""並列リサーチタスク"""
research_agents = []
research_tasks = []
for i, topic in enumerate(topics):
# 各トピックごとにリサーチャーを作成
researcher = Agent(
role=f"Researcher {i+1}",
goal=f"{topic}について調査し、要約する",
backstory="あなたは専門的なリサーチャーです。",
llm=self.llm,
verbose=False
)
task = Task(
description=f"{topic}について調査し、重要なポイントをまとめてください。",
agent=researcher,
expected_output="調査レポート(要約、重要事項、参考情報)"
)
research_agents.append(researcher)
research_tasks.append(task)
# 統合タスク
synthesizer = Agent(
role="Research Synthesizer",
goal="各リサーチ結果を統合する",
backstory="あなたは情報統合の専門家です。",
llm=self.llm,
verbose=False
)
synthesis_task = Task(
description="各リサーチ結果を統合し、全体像をまとめてください。",
agent=synthesizer,
expected_output="統合リサーチレポート",
context=research_tasks
)
# 並列実行
crew = Crew(
agents=research_agents + [synthesizer],
tasks=research_tasks + [synthesis_task],
process=Process.parallel, # 並列実行
verbose=True
)
result = crew.kickoff()
return {
"topics": topics,
"synthesis": result
}
2. Cognitive Memory(認知メモリ)
# app/agents/cognitive_memory_agent.py
from datetime import datetime
import json
import os
from pathlib import Path
class CognitiveMemoryAgent:
"""AIの思考プロセスを記録・分析するメタ記憶システム"""
def __init__(self, memory_path: str = "data/cognitive_memory.json"):
self.memory_path = memory_path
self.rag = GraphRAGOrchestrator()
self.llm = OpenAIClient(api_key=os.getenv("OPENAI_API_KEY"))
def _load_memory(self) -> list:
"""メモリ読み込み"""
if not os.path.exists(self.memory_path):
return []
with open(self.memory_path, 'r', encoding='utf-8') as f:
return json.load(f)
def _save_memory(self, memory: list):
"""メモリ保存"""
Path(self.memory_path).parent.mkdir(parents=True, exist_ok=True)
with open(self.memory_path, 'w', encoding='utf-8') as f:
json.dump(memory, f, ensure_ascii=False, indent=2)
def process_with_memory(self, question: str) -> dict:
"""メモリを活用した思考プロセス"""
# 1. 過去の類似質問を検索
memory = self._load_memory()
similar_memories = self._find_similar_memories(question, memory)
# 2. RAG検索
result = self.rag.query_with_graph_context(question)
# 3. メタ評価(自己反省)
meta_summary = self._meta_evaluate(question, result, similar_memories)
# 4. メモリに記録
memory_entry = {
"timestamp": datetime.utcnow().isoformat(),
"question": question,
"answer": result["answer"],
"meta_summary": meta_summary,
"confidence": self._calculate_confidence(result),
"similar_past_questions": [m["question"] for m in similar_memories]
}
memory.append(memory_entry)
self._save_memory(memory[-100:]) # 最新100件のみ保持
return {
"answer": result["answer"],
"meta_analysis": meta_summary,
"learning_from_past": len(similar_memories) > 0
}
def _find_similar_memories(self, question: str, memory: list) -> list:
"""類似する過去の記憶を検索"""
if not memory:
return []
# 簡易的な類似度計算(実際にはEmbeddingを使用)
similar = []
for mem in memory[-50:]: # 直近50件から検索
similarity = self._text_similarity(question, mem["question"])
if similarity > 0.7:
similar.append(mem)
return similar[:3] # 上位3件
def _text_similarity(self, text1: str, text2: str) -> float:
"""テキスト類似度(簡易版)"""
# 実際にはEmbeddingのコサイン類似度を使用
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union) if union else 0.0
def _meta_evaluate(
self,
question: str,
result: dict,
similar_memories: list
) -> str:
"""メタ評価(思考の反省)"""
prompt = f"""
以下の思考プロセスを分析し、改善点を提案してください:
質問: {question}
回答: {result['answer']}
参照元: {len(result['sources'])}件
過去の類似質問: {len(similar_memories)}件
分析項目:
1. 回答の妥当性
2. 情報源の適切性
3. 過去の学習の活用度
4. 改善すべき点
"""
return self.llm.ask(prompt)
def _calculate_confidence(self, result: dict) -> float:
"""信頼度計算"""
source_score = min(len(result["sources"]) / 5.0, 1.0)
graph_score = min(len(result.get("graph_relations", [])) / 3.0, 1.0)
return (source_score + graph_score) / 2.0
def get_memory_stats(self) -> dict:
"""メモリ統計"""
memory = self._load_memory()
if not memory:
return {"total": 0}
avg_confidence = sum(m.get("confidence", 0) for m in memory) / len(memory)
return {
"total_memories": len(memory),
"avg_confidence": avg_confidence,
"date_range": {
"first": memory[0]["timestamp"],
"last": memory[-1]["timestamp"]
}
}
3. Intelligent Knowledge Agent
# app/agents/intelligent_knowledge_agent.py
from crewai import Agent, Task, Crew
from neo4j import GraphDatabase
class IntelligentKnowledgeAgent:
"""Neo4jグラフ上のノードをエージェント化"""
def __init__(
self,
neo4j_uri: str = "bolt://localhost:7687",
neo4j_user: str = "neo4j",
neo4j_password: str = "password"
):
self.driver = GraphDatabase.driver(
neo4j_uri,
auth=(neo4j_user, neo4j_password)
)
self.llm = ChatOpenAI(
model="gpt-4o",
openai_api_key=os.getenv("OPENAI_API_KEY")
)
def create_agents_from_graph(self, domain: str) -> list:
"""グラフから動的にエージェントを生成"""
with self.driver.session() as session:
# ドメインに関連するノードを取得
result = session.run("""
MATCH (n:Entity)-[r]->(m:Entity)
WHERE n.domain = $domain OR m.domain = $domain
RETURN DISTINCT n.name as name, n.expertise as expertise
LIMIT 5
""", domain=domain)
agents = []
for record in result:
agent = Agent(
role=f"{record['name']} Expert",
goal=f"{record['expertise']}の観点から分析する",
backstory=f"あなたは{record['name']}の専門家です。",
llm=self.llm,
verbose=False
)
agents.append(agent)
return agents
def graph_collaborative_reasoning(self, question: str, domain: str) -> dict:
"""グラフベースの協調推論"""
# 1. ドメインから動的にエージェントを生成
agents = self.create_agents_from_graph(domain)
if not agents:
return {"error": "No agents found for domain"}
# 2. 各エージェントにタスク割り当て
tasks = []
for agent in agents:
task = Task(
description=f"{question}\n\n{agent.role}の観点から回答してください。",
agent=agent,
expected_output="専門的な分析結果"
)
tasks.append(task)
# 3. 統合エージェント
coordinator = Agent(
role="Knowledge Coordinator",
goal="各専門家の意見を統合する",
backstory="あなたは知識統合の専門家です。",
llm=self.llm,
verbose=True
)
coordination_task = Task(
description="各専門家の意見を統合し、包括的な回答を作成してください。",
agent=coordinator,
expected_output="統合された回答",
context=tasks
)
# 4. Crew実行
crew = Crew(
agents=agents + [coordinator],
tasks=tasks + [coordination_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
# 5. グラフに結果を記録
self._save_reasoning_to_graph(question, result, domain)
return {
"question": question,
"answer": result,
"agents_used": [a.role for a in agents]
}
def _save_reasoning_to_graph(self, question: str, answer: str, domain: str):
"""推論結果をグラフに保存"""
with self.driver.session() as session:
session.run("""
MERGE (q:Question {text: $question})
MERGE (a:Answer {text: $answer})
MERGE (d:Domain {name: $domain})
MERGE (q)-[:HAS_ANSWER]->(a)
MERGE (q)-[:IN_DOMAIN]->(d)
SET q.timestamp = datetime()
""", question=question, answer=answer, domain=domain)
4. FastAPI統合
# app/routes/multi_agent_routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.agents.multi_agent_system import MultiAgentSystem
from app.agents.cognitive_memory_agent import CognitiveMemoryAgent
from app.agents.intelligent_knowledge_agent import IntelligentKnowledgeAgent
router = APIRouter(prefix="/api/agents", tags=["Multi-Agent"])
class TaskRequest(BaseModel):
description: str
domain: str = "general"
class ResearchRequest(BaseModel):
topics: list[str]
def get_multi_agent():
return MultiAgentSystem()
def get_cognitive_memory():
return CognitiveMemoryAgent()
def get_knowledge_agent():
return IntelligentKnowledgeAgent()
@router.post("/collaborative-task")
async def execute_task(
request: TaskRequest,
system: MultiAgentSystem = Depends(get_multi_agent)
):
"""協調タスク実行"""
try:
result = system.execute_collaborative_task(request.description)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/parallel-research")
async def parallel_research(
request: ResearchRequest,
system: MultiAgentSystem = Depends(get_multi_agent)
):
"""並列リサーチ"""
try:
result = system.parallel_research(request.topics)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/cognitive-query")
async def cognitive_query(
request: TaskRequest,
agent: CognitiveMemoryAgent = Depends(get_cognitive_memory)
):
"""認知メモリ付き検索"""
try:
result = agent.process_with_memory(request.description)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/memory-stats")
async def memory_stats(
agent: CognitiveMemoryAgent = Depends(get_cognitive_memory)
):
"""メモリ統計取得"""
return agent.get_memory_stats()
@router.post("/graph-reasoning")
async def graph_reasoning(
request: TaskRequest,
agent: IntelligentKnowledgeAgent = Depends(get_knowledge_agent)
):
"""グラフベース推論"""
try:
result = agent.graph_collaborative_reasoning(
request.description,
request.domain
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
利用シーン
| シーン |
使用エージェント |
効果 |
| 技術設計レビュー |
Tech + Business + Security |
多角的評価 |
| 市場調査 |
並列リサーチャー |
効率的情報収集 |
| 意思決定支援 |
Meta Observer |
統合的判断 |
| 学習システム |
Cognitive Memory |
継続的改善 |
| ドメイン特化分析 |
Knowledge Agent |
専門知識活用 |
ベストプラクティス
- 役割の明確化: 各エージェントの専門性を明確に定義
- 適切な粒度: タスクを適切な大きさに分割
- メモリ活用: 過去の学習を効果的に利用
- 評価ループ: Meta Evaluatorで品質を継続改善
- グラフ連携: Neo4jで知識を構造化して共有
パフォーマンス最適化
並列実行の最大化
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.parallel, # 並列実行
max_rpm=10 # RPM制限
)
キャッシング
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_agent_response(task_hash: str):
"""エージェント応答のキャッシュ"""
pass
トラブルシューティング
問題1: エージェント間の矛盾
# Meta Evaluatorで矛盾を解消
def resolve_conflicts(responses: list) -> str:
"""矛盾する意見の調停"""
pass
問題2: レスポンスタイムアウト
# タイムアウト設定
crew = Crew(
agents=agents,
tasks=tasks,
timeout=300 # 5分
)
参考リソース