コンテンツにスキップ

マルチエージェントシステムガイド

概要

マルチエージェントシステムは、複数の専門AIエージェントが協調して複雑なタスクを解決する高度なAIアーキテクチャです。CrewAI、Intelligent Knowledge Agent、Cognitive Memoryを組み合わせることで、人間のチームのような協調的な問題解決を実現します。

システム構成

graph TD
    A[ユーザータスク] --> B[Coordinator Agent]
    B --> C[Tech Expert Agent]
    B --> D[Business Analyst Agent]
    B --> E[Security Advisor Agent]
    C --> F[Knowledge Graph]
    D --> F
    E --> F
    F --> G[Cognitive Memory]
    G --> H[Meta Evaluator]
    H --> I[統合された回答]
    I --> J[自己反省ループ]
    J --> G

コアコンポーネント

コンポーネント 技術 役割
エージェント管理 CrewAI / LangGraph エージェントの役割と連携制御
知識ベース Neo4j + pgvector 共有知識グラフ
メモリシステム Cognitive Memory 思考プロセスの記録と学習
評価システム Meta Evaluator 出力品質の評価と改善
オーケストレーター LangChain ワークフロー制御

実装

1. CrewAIベースのマルチエージェント

# app/agents/multi_agent_system.py
from crewai import Agent, Task, Crew, Process
from langchain.chat_models import ChatOpenAI
from app.infrastructure.langchain.graph_rag_orchestrator import GraphRAGOrchestrator
import os

class MultiAgentSystem:
    """マルチエージェント協調システム"""

    def __init__(self):
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0.7,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        self.rag = GraphRAGOrchestrator()

        # エージェント定義
        self.agents = self._create_agents()

    def _create_agents(self) -> dict:
        """専門エージェントの作成"""

        # 技術エキスパート
        tech_expert = Agent(
            role="Technical Expert",
            goal="技術的な正確性と実装可能性を評価する",
            backstory="""
            あなたは15年の経験を持つシニアエンジニアです。
            アーキテクチャ、セキュリティ、パフォーマンスに精通しています。
            """,
            llm=self.llm,
            verbose=True,
            allow_delegation=True
        )

        # ビジネスアナリスト
        business_analyst = Agent(
            role="Business Analyst",
            goal="ビジネス価値とROIを評価する",
            backstory="""
            あなたはビジネス戦略の専門家です。
            コスト、市場価値、競争優位性の観点から提案を評価します。
            """,
            llm=self.llm,
            verbose=True,
            allow_delegation=True
        )

        # セキュリティアドバイザー
        security_advisor = Agent(
            role="Security Advisor",
            goal="セキュリティリスクと対策を特定する",
            backstory="""
            あなたはサイバーセキュリティの専門家です。
            脆弱性、コンプライアンス、データ保護の観点から評価します。
            """,
            llm=self.llm,
            verbose=True,
            allow_delegation=True
        )

        # メタオブザーバー(調整役)
        meta_observer = Agent(
            role="Meta Observer",
            goal="各エージェントの意見を統合し、最終判断を下す",
            backstory="""
            あなたはプロジェクトマネージャーです。
            技術、ビジネス、セキュリティの観点を総合的に判断します。
            """,
            llm=self.llm,
            verbose=True,
            allow_delegation=False
        )

        return {
            "tech": tech_expert,
            "business": business_analyst,
            "security": security_advisor,
            "meta": meta_observer
        }

    def execute_collaborative_task(self, task_description: str) -> dict:
        """協調タスクの実行"""

        # 1. 知識グラフから関連情報を取得
        context = self.rag.query_with_graph_context(task_description)

        # 2. 各エージェントにタスクを割り当て
        tech_task = Task(
            description=f"""
            以下の提案を技術的観点から評価してください:
            {task_description}

            参考情報:
            {context['answer']}
            """,
            agent=self.agents["tech"],
            expected_output="技術的評価レポート(実装可能性、リスク、推奨事項)"
        )

        business_task = Task(
            description=f"""
            以下の提案をビジネス観点から評価してください:
            {task_description}

            参考情報:
            {context['answer']}
            """,
            agent=self.agents["business"],
            expected_output="ビジネス評価レポート(ROI、市場価値、優先度)"
        )

        security_task = Task(
            description=f"""
            以下の提案をセキュリティ観点から評価してください:
            {task_description}

            参考情報:
            {context['answer']}
            """,
            agent=self.agents["security"],
            expected_output="セキュリティ評価レポート(リスク、対策、コンプライアンス)"
        )

        # 3. メタタスク(統合)
        meta_task = Task(
            description="""
            技術、ビジネス、セキュリティの各評価を統合し、
            最終的な推奨事項をまとめてください。
            """,
            agent=self.agents["meta"],
            expected_output="統合レポート(総合評価、推奨アクション、優先順位)",
            context=[tech_task, business_task, security_task]
        )

        # 4. Crewの作成と実行
        crew = Crew(
            agents=[
                self.agents["tech"],
                self.agents["business"],
                self.agents["security"],
                self.agents["meta"]
            ],
            tasks=[tech_task, business_task, security_task, meta_task],
            process=Process.sequential,  # 順次実行
            verbose=True
        )

        # 実行
        result = crew.kickoff()

        return {
            "task": task_description,
            "result": result,
            "tech_evaluation": tech_task.output,
            "business_evaluation": business_task.output,
            "security_evaluation": security_task.output,
            "final_recommendation": meta_task.output
        }

    def parallel_research(self, topics: list[str]) -> dict:
        """並列リサーチタスク"""

        research_agents = []
        research_tasks = []

        for i, topic in enumerate(topics):
            # 各トピックごとにリサーチャーを作成
            researcher = Agent(
                role=f"Researcher {i+1}",
                goal=f"{topic}について調査し、要約する",
                backstory="あなたは専門的なリサーチャーです。",
                llm=self.llm,
                verbose=False
            )

            task = Task(
                description=f"{topic}について調査し、重要なポイントをまとめてください。",
                agent=researcher,
                expected_output="調査レポート(要約、重要事項、参考情報)"
            )

            research_agents.append(researcher)
            research_tasks.append(task)

        # 統合タスク
        synthesizer = Agent(
            role="Research Synthesizer",
            goal="各リサーチ結果を統合する",
            backstory="あなたは情報統合の専門家です。",
            llm=self.llm,
            verbose=False
        )

        synthesis_task = Task(
            description="各リサーチ結果を統合し、全体像をまとめてください。",
            agent=synthesizer,
            expected_output="統合リサーチレポート",
            context=research_tasks
        )

        # 並列実行
        crew = Crew(
            agents=research_agents + [synthesizer],
            tasks=research_tasks + [synthesis_task],
            process=Process.parallel,  # 並列実行
            verbose=True
        )

        result = crew.kickoff()

        return {
            "topics": topics,
            "synthesis": result
        }

2. Cognitive Memory(認知メモリ)

# app/agents/cognitive_memory_agent.py
from datetime import datetime
import json
import os
from pathlib import Path

class CognitiveMemoryAgent:
    """AIの思考プロセスを記録・分析するメタ記憶システム"""

    def __init__(self, memory_path: str = "data/cognitive_memory.json"):
        self.memory_path = memory_path
        self.rag = GraphRAGOrchestrator()
        self.llm = OpenAIClient(api_key=os.getenv("OPENAI_API_KEY"))

    def _load_memory(self) -> list:
        """メモリ読み込み"""
        if not os.path.exists(self.memory_path):
            return []

        with open(self.memory_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _save_memory(self, memory: list):
        """メモリ保存"""
        Path(self.memory_path).parent.mkdir(parents=True, exist_ok=True)

        with open(self.memory_path, 'w', encoding='utf-8') as f:
            json.dump(memory, f, ensure_ascii=False, indent=2)

    def process_with_memory(self, question: str) -> dict:
        """メモリを活用した思考プロセス"""

        # 1. 過去の類似質問を検索
        memory = self._load_memory()
        similar_memories = self._find_similar_memories(question, memory)

        # 2. RAG検索
        result = self.rag.query_with_graph_context(question)

        # 3. メタ評価(自己反省)
        meta_summary = self._meta_evaluate(question, result, similar_memories)

        # 4. メモリに記録
        memory_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "question": question,
            "answer": result["answer"],
            "meta_summary": meta_summary,
            "confidence": self._calculate_confidence(result),
            "similar_past_questions": [m["question"] for m in similar_memories]
        }

        memory.append(memory_entry)
        self._save_memory(memory[-100:])  # 最新100件のみ保持

        return {
            "answer": result["answer"],
            "meta_analysis": meta_summary,
            "learning_from_past": len(similar_memories) > 0
        }

    def _find_similar_memories(self, question: str, memory: list) -> list:
        """類似する過去の記憶を検索"""
        if not memory:
            return []

        # 簡易的な類似度計算(実際にはEmbeddingを使用)
        similar = []
        for mem in memory[-50:]:  # 直近50件から検索
            similarity = self._text_similarity(question, mem["question"])
            if similarity > 0.7:
                similar.append(mem)

        return similar[:3]  # 上位3件

    def _text_similarity(self, text1: str, text2: str) -> float:
        """テキスト類似度(簡易版)"""
        # 実際にはEmbeddingのコサイン類似度を使用
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        intersection = words1.intersection(words2)
        union = words1.union(words2)

        return len(intersection) / len(union) if union else 0.0

    def _meta_evaluate(
        self,
        question: str,
        result: dict,
        similar_memories: list
    ) -> str:
        """メタ評価(思考の反省)"""

        prompt = f"""
        以下の思考プロセスを分析し、改善点を提案してください:

        質問: {question}
        回答: {result['answer']}
        参照元: {len(result['sources'])}

        過去の類似質問: {len(similar_memories)}

        分析項目:
        1. 回答の妥当性
        2. 情報源の適切性
        3. 過去の学習の活用度
        4. 改善すべき点
        """

        return self.llm.ask(prompt)

    def _calculate_confidence(self, result: dict) -> float:
        """信頼度計算"""
        source_score = min(len(result["sources"]) / 5.0, 1.0)
        graph_score = min(len(result.get("graph_relations", [])) / 3.0, 1.0)

        return (source_score + graph_score) / 2.0

    def get_memory_stats(self) -> dict:
        """メモリ統計"""
        memory = self._load_memory()

        if not memory:
            return {"total": 0}

        avg_confidence = sum(m.get("confidence", 0) for m in memory) / len(memory)

        return {
            "total_memories": len(memory),
            "avg_confidence": avg_confidence,
            "date_range": {
                "first": memory[0]["timestamp"],
                "last": memory[-1]["timestamp"]
            }
        }

3. Intelligent Knowledge Agent

# app/agents/intelligent_knowledge_agent.py
from crewai import Agent, Task, Crew
from neo4j import GraphDatabase

class IntelligentKnowledgeAgent:
    """Neo4jグラフ上のノードをエージェント化"""

    def __init__(
        self,
        neo4j_uri: str = "bolt://localhost:7687",
        neo4j_user: str = "neo4j",
        neo4j_password: str = "password"
    ):
        self.driver = GraphDatabase.driver(
            neo4j_uri,
            auth=(neo4j_user, neo4j_password)
        )

        self.llm = ChatOpenAI(
            model="gpt-4o",
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

    def create_agents_from_graph(self, domain: str) -> list:
        """グラフから動的にエージェントを生成"""

        with self.driver.session() as session:
            # ドメインに関連するノードを取得
            result = session.run("""
                MATCH (n:Entity)-[r]->(m:Entity)
                WHERE n.domain = $domain OR m.domain = $domain
                RETURN DISTINCT n.name as name, n.expertise as expertise
                LIMIT 5
            """, domain=domain)

            agents = []
            for record in result:
                agent = Agent(
                    role=f"{record['name']} Expert",
                    goal=f"{record['expertise']}の観点から分析する",
                    backstory=f"あなたは{record['name']}の専門家です。",
                    llm=self.llm,
                    verbose=False
                )
                agents.append(agent)

            return agents

    def graph_collaborative_reasoning(self, question: str, domain: str) -> dict:
        """グラフベースの協調推論"""

        # 1. ドメインから動的にエージェントを生成
        agents = self.create_agents_from_graph(domain)

        if not agents:
            return {"error": "No agents found for domain"}

        # 2. 各エージェントにタスク割り当て
        tasks = []
        for agent in agents:
            task = Task(
                description=f"{question}\n\n{agent.role}の観点から回答してください。",
                agent=agent,
                expected_output="専門的な分析結果"
            )
            tasks.append(task)

        # 3. 統合エージェント
        coordinator = Agent(
            role="Knowledge Coordinator",
            goal="各専門家の意見を統合する",
            backstory="あなたは知識統合の専門家です。",
            llm=self.llm,
            verbose=True
        )

        coordination_task = Task(
            description="各専門家の意見を統合し、包括的な回答を作成してください。",
            agent=coordinator,
            expected_output="統合された回答",
            context=tasks
        )

        # 4. Crew実行
        crew = Crew(
            agents=agents + [coordinator],
            tasks=tasks + [coordination_task],
            process=Process.sequential,
            verbose=True
        )

        result = crew.kickoff()

        # 5. グラフに結果を記録
        self._save_reasoning_to_graph(question, result, domain)

        return {
            "question": question,
            "answer": result,
            "agents_used": [a.role for a in agents]
        }

    def _save_reasoning_to_graph(self, question: str, answer: str, domain: str):
        """推論結果をグラフに保存"""
        with self.driver.session() as session:
            session.run("""
                MERGE (q:Question {text: $question})
                MERGE (a:Answer {text: $answer})
                MERGE (d:Domain {name: $domain})
                MERGE (q)-[:HAS_ANSWER]->(a)
                MERGE (q)-[:IN_DOMAIN]->(d)
                SET q.timestamp = datetime()
            """, question=question, answer=answer, domain=domain)

4. FastAPI統合

# app/routes/multi_agent_routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.agents.multi_agent_system import MultiAgentSystem
from app.agents.cognitive_memory_agent import CognitiveMemoryAgent
from app.agents.intelligent_knowledge_agent import IntelligentKnowledgeAgent

router = APIRouter(prefix="/api/agents", tags=["Multi-Agent"])

class TaskRequest(BaseModel):
    description: str
    domain: str = "general"

class ResearchRequest(BaseModel):
    topics: list[str]

def get_multi_agent():
    return MultiAgentSystem()

def get_cognitive_memory():
    return CognitiveMemoryAgent()

def get_knowledge_agent():
    return IntelligentKnowledgeAgent()

@router.post("/collaborative-task")
async def execute_task(
    request: TaskRequest,
    system: MultiAgentSystem = Depends(get_multi_agent)
):
    """協調タスク実行"""
    try:
        result = system.execute_collaborative_task(request.description)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/parallel-research")
async def parallel_research(
    request: ResearchRequest,
    system: MultiAgentSystem = Depends(get_multi_agent)
):
    """並列リサーチ"""
    try:
        result = system.parallel_research(request.topics)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/cognitive-query")
async def cognitive_query(
    request: TaskRequest,
    agent: CognitiveMemoryAgent = Depends(get_cognitive_memory)
):
    """認知メモリ付き検索"""
    try:
        result = agent.process_with_memory(request.description)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.get("/memory-stats")
async def memory_stats(
    agent: CognitiveMemoryAgent = Depends(get_cognitive_memory)
):
    """メモリ統計取得"""
    return agent.get_memory_stats()

@router.post("/graph-reasoning")
async def graph_reasoning(
    request: TaskRequest,
    agent: IntelligentKnowledgeAgent = Depends(get_knowledge_agent)
):
    """グラフベース推論"""
    try:
        result = agent.graph_collaborative_reasoning(
            request.description,
            request.domain
        )
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

利用シーン

シーン 使用エージェント 効果
技術設計レビュー Tech + Business + Security 多角的評価
市場調査 並列リサーチャー 効率的情報収集
意思決定支援 Meta Observer 統合的判断
学習システム Cognitive Memory 継続的改善
ドメイン特化分析 Knowledge Agent 専門知識活用

ベストプラクティス

  1. 役割の明確化: 各エージェントの専門性を明確に定義
  2. 適切な粒度: タスクを適切な大きさに分割
  3. メモリ活用: 過去の学習を効果的に利用
  4. 評価ループ: Meta Evaluatorで品質を継続改善
  5. グラフ連携: Neo4jで知識を構造化して共有

パフォーマンス最適化

並列実行の最大化

crew = Crew(
    agents=agents,
    tasks=tasks,
    process=Process.parallel,  # 並列実行
    max_rpm=10  # RPM制限
)

キャッシング

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_agent_response(task_hash: str):
    """エージェント応答のキャッシュ"""
    pass

トラブルシューティング

問題1: エージェント間の矛盾

# Meta Evaluatorで矛盾を解消
def resolve_conflicts(responses: list) -> str:
    """矛盾する意見の調停"""
    pass

問題2: レスポンスタイムアウト

# タイムアウト設定
crew = Crew(
    agents=agents,
    tasks=tasks,
    timeout=300  # 5分
)

参考リソース