跳至主要內容
技術

Multi-Agent 系統:Orchestrator 與 Subagent 設計模式

Multi-Agent 系統:Orchestrator 與 Subagent 設計模式
Claude API & Agent SDK 完全指南 第 11 / 15 篇

本篇是「Claude API & Agent SDK 完全指南」系列的第 11 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。

我第一次嘗試用單一 agent 做市場分析報告,是個讓我難忘的失敗經驗。

那個 agent 需要做的事情包括:搜尋競爭對手資訊、爬取相關新聞、分析財務數據、查詢用戶評論、整合所有資料,最後寫成一份 20 頁的報告。我把所有工具都給了它,寫了一個超詳細的 system prompt,然後讓它跑。

結果呢?

大約在搜尋了 15 個競爭對手、累積了大量搜尋結果之後,agent 開始「迷路」了。它忘記自己在做什麼,開始重複搜尋已經搜尋過的東西,最後生成了一份混亂、重複、前後矛盾的報告。Context window 被塞滿了大量原始搜尋結果,根本沒有空間做真正的分析思考。

這就是單一 agent 的天花板。

先講一件容易被誤導的事:Anthropic 沒有 handoff() 這種「agent 互相交接」的原語——那是 OpenAI Agents SDK 的形狀。在 Claude 這邊,multi-agent 的真相樸素很多:你在自己的程式碼裡當 orchestrator,依判斷呼叫各個 subagent。每個 subagent 不是什麼神祕物件,就是一個帶專屬 system prompt +(可選)一組工具的 Claude 呼叫或 agentic loop。沒有框架幫你「自動交接」,routing 邏輯就是你寫的 if、或是你讓 Claude 回傳「該派誰」的一次呼叫。本章所有程式碼都只用官方 anthropic SDK(pip install anthropic),照抄就能跑。

為什麼單一 Agent 不夠用

要理解 multi-agent 的必要性,先要理解單一 agent 的三個根本限制:

第一:Context Window 是有限資源。 Claude 的 context window 很大(claude-opus-4-8 有 1M tokens),但真實任務消耗 context 的速度驚人。搜尋結果、文件內容、對話歷史……每一樣都在佔用空間。當 context 接近上限,模型的推理品質會下降——它開始「忘記」前面說過的事情,做出不一致的決策。

第二:無法平行化。 一個 agent 是線性執行的——做完 A 才能做 B。但很多任務本質上可以平行:分析五個競爭對手的時候,為什麼不讓五個 subagent 同時去分析?

第三:難以深度專業化。 一個 agent 要同時會搜尋、分析、寫作,必然每樣都只能做到 generalist 水準。但如果一個 subagent 專注做搜尋、另一個專注做分析,每個的提示可以針對該任務深度優化。

Multi-agent 系統就是解這三個問題的架構。

Orchestrator-Worker 模式

這是 multi-agent 系統最常見也最實用的模式。

結構很簡單:一個 Orchestrator 負責規劃和協調,多個 Worker(subagent)負責執行具體任務

Orchestrator 做什麼?

  • 接收最初的任務目標
  • 拆解任務、決定執行順序
  • 分配子任務給各個 Worker
  • 收集 Worker 的結果
  • 整合最終輸出

Worker 做什麼?

  • 接收一個明確、具體的子任務
  • 用自己的工具和能力完成它
  • 把結果回傳給 Orchestrator
  • 不需要知道更大的任務目標是什麼

我認為這個分工的關鍵是:Worker 應該是「愚蠢」的——它只管把分配到的任務做好,不需要理解全局。這樣設計讓每個 Worker 的 context window 保持乾淨,只裝著跟當前子任務相關的資訊。

這裡要把抽象落地成程式碼:在 Claude 的世界裡,「一個 agent」具體就是「一個帶專屬 system prompt(+可選工具)的 Claude 呼叫」。Orchestrator 是你的 Python 程式本身——它持有判斷邏輯,決定何時呼叫哪個 subagent 函式。沒有任何框架在背後偷偷幫你做這件事,全部都是你看得到的程式碼。

在程式碼裡做 routing:orchestrator 把任務交給 subagent

先把每個 subagent 寫成一個普通的 Python 函式。每個函式內部就是一次 client.messages.create(),帶自己的 system prompt:

import anthropic

client = anthropic.Anthropic()  # 從 ANTHROPIC_API_KEY 讀金鑰

def search_subagent(query: str) -> str:
    """搜尋 subagent:只管整理事實,不做分析。"""
    resp = client.messages.create(
        model="claude-opus-4-8",
        max_tokens=2000,
        system=(
            "你是一個專業的網路搜尋整理 agent。\n"
            "任務:根據給定的查詢詞,整理相關事實。\n"
            "輸出格式:\n"
            "- 列出找到的關鍵事實,每條一行\n"
            "- 注明資訊來源\n"
            "- 不要做分析,只整理事實\n"
            "- 字數控制在 500 字以內"
        ),
        messages=[{"role": "user", "content": query}],
    )
    return next(b.text for b in resp.content if b.type == "text")

def analysis_subagent(raw_data: str) -> str:
    """分析 subagent:根據原始資料做深度分析。"""
    resp = client.messages.create(
        model="claude-opus-4-8",
        max_tokens=4000,
        system=(
            "你是一個商業分析 agent。\n"
            "任務:根據提供的原始資料,進行深度分析。\n"
            "分析框架:市場規模和趨勢 / 主要競爭者優劣勢 / 機會與威脅 / 建議行動方向"
        ),
        messages=[{"role": "user", "content": raw_data}],
    )
    return next(b.text for b in resp.content if b.type == "text")

注意這裡沒有 Agent(...) 類別、沒有 Runner、更沒有 handoff()「agent」只是個帶 system prompt 的函式,「交接」就是 orchestrator 呼叫下一個函式並把上一個的輸出傳進去。

那 orchestrator 怎麼決定要派給誰?最簡單的版本,當任務流程固定時,直接用 Python 控制流程:

def orchestrator(research_question: str) -> str:
    """Orchestrator:規劃 → 搜尋 → 分析 → 撰寫。流程由程式碼掌控。"""
    # 步驟 1:先用一次 Claude 呼叫把問題拆成要搜尋的主題
    plan_resp = client.messages.create(
        model="claude-opus-4-8",
        max_tokens=1000,
        system="你是研究規劃 agent。把使用者的研究問題拆成 3-5 個具體的搜尋查詢詞,每行一個,不要多餘文字。",
        messages=[{"role": "user", "content": research_question}],
    )
    plan_text = next(b.text for b in plan_resp.content if b.type == "text")
    queries = [line.strip() for line in plan_text.splitlines() if line.strip()]

    # 步驟 2:對每個主題呼叫 search subagent
    search_results = [search_subagent(q) for q in queries]
    combined = "\n\n".join(search_results)

    # 步驟 3:把彙整後的資料交給 analysis subagent
    return analysis_subagent(combined)

這就是「routing」的本質:orchestrator 是你寫的程式,它根據判斷呼叫對應的 subagent 函式。上一個 subagent 的輸出(搜尋結果)變成下一個 subagent 的輸入(分析資料)——這就是 Claude 世界裡的「交接」,沒有任何魔法。

Routing 判斷怎麼設計

上面的範例流程是寫死的(固定先搜尋、再分析)。但很多時候 orchestrator 需要動態決定要派給哪個 subagent——例如客服系統收到一句話,要判斷該交給「退款 agent」還是「技術支援 agent」。

這種動態 routing 有兩種真實做法。

做法一:規則 / 關鍵字(最便宜、最可預測)

def route_by_keyword(user_message: str) -> str:
    text = user_message.lower()
    if any(k in text for k in ["退款", "退費", "refund"]):
        return "refund"
    if any(k in text for k in ["當機", "錯誤", "bug", "壞掉"]):
        return "tech_support"
    return "general"

能用規則就用規則。它零成本、零延遲、可單元測試。但語意一複雜(「我用不了所以想退錢」既是技術又是退款)規則就會失準。

做法二:用 structured output 讓 Claude 回傳「該派誰」

讓一次 Claude 呼叫只做分類這一件事,並用 messages.parse() 把回傳約束成你定義的 schema,拿到已驗證的物件:

from pydantic import BaseModel
from typing import Literal

class RouteDecision(BaseModel):
    target: Literal["refund", "tech_support", "general"]
    reason: str

def route_with_claude(user_message: str) -> RouteDecision:
    resp = client.messages.parse(
        model="claude-opus-4-8",
        max_tokens=512,
        system=(
            "你是客服路由 agent。判斷使用者訊息該交給哪個專責 agent:"
            "refund(退款相關)、tech_support(技術問題)、general(其他)。"
        ),
        messages=[{"role": "user", "content": user_message}],
        output_format=RouteDecision,
    )
    return resp.parsed_output  # 已驗證的 RouteDecision

# orchestrator 拿到決策後,呼叫對應的 subagent 函式
def dispatch(user_message: str) -> str:
    decision = route_with_claude(user_message)
    handlers = {
        "refund": handle_refund,
        "tech_support": handle_tech_support,
        "general": handle_general,
    }
    return handlers[decision.target](user_message)

messages.parse() 會把回應約束成 RouteDecision 並驗證,所以 decision.target 一定是那三個合法值之一——你可以放心拿它當 dict 的 key 去查表,不必擔心模型回了個你沒處理的字串。

我的經驗是:routing 決策越明確越好。不要寫一個模糊的 system prompt 叫 Claude「自己看著辦」,而是把可選項目(refund / tech_support / general)和判準清楚列出來,用 structured output 鎖死回傳格式。模糊的指令讓模型有太多解釋空間,容易產生不預期的行為——這點跟設計工具的 tool_choice 是同樣的道理。

平行 Agent 執行

順序執行已經很有用了,但真正的威力來自平行執行

如果我要分析五個競爭對手,沒有理由讓搜尋 subagent 一個一個來——讓五個搜尋呼叫同時跑,總時間從 5x 變成 1x。

關鍵是用 AsyncAnthropicasyncio.gather。每個 subagent 是一個 async 函式,gather 讓它們並行:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def research_competitor(competitor_name: str) -> dict:
    """對單一競爭對手做一次 subagent 呼叫。"""
    resp = await client.messages.create(
        model="claude-opus-4-8",
        max_tokens=2000,
        system=(
            f"你專門研究 {competitor_name} 這家公司。收集以下資訊:\n"
            "- 公司規模和市場定位\n"
            "- 主要產品和定價\n"
            "- 近期動態(過去 6 個月)\n"
            "- 用戶評價\n"
            "輸出結構化的 Markdown。"
        ),
        messages=[{"role": "user", "content": f"研究 {competitor_name} 的詳細資訊"}],
    )
    text = next(b.text for b in resp.content if b.type == "text")
    return {"competitor": competitor_name, "data": text}

async def parallel_market_research(competitors: list[str]) -> list[dict]:
    """平行研究所有競爭對手——同時發起所有呼叫。"""
    tasks = [research_competitor(c) for c in competitors]
    return await asyncio.gather(*tasks)

async def main():
    competitors = ["Notion", "Obsidian", "Roam Research", "Logseq", "Capacities"]
    print(f"開始平行研究 {len(competitors)} 個競爭對手...")
    research_data = await parallel_market_research(competitors)

    # orchestrator 把所有結果彙整,再丟一次 Claude 做市場分析
    combined = "\n\n".join(f"## {r['competitor']}\n{r['data']}" for r in research_data)
    analysis = await client.messages.create(
        model="claude-opus-4-8",
        max_tokens=4000,
        system="你是市場分析 agent。根據競爭對手資料,撰寫市場競爭分析報告。",
        messages=[{"role": "user", "content": combined}],
    )
    print(next(b.text for b in analysis.content if b.type == "text"))

asyncio.run(main())

這個範例中,五個搜尋呼叫同時送出,每個都有自己獨立的 context(互不污染)。整個研究時間從原本的線性累加,變成等於最慢那個呼叫的回應時間。

一個實務提醒:平行度不要無限放大。同時送出幾十個請求會撞到 rate limit(anthropic SDK 預設會自動重試 429,但重試本身也要時間)。常見做法是用 asyncio.Semaphore 把同時在跑的呼叫數壓在合理範圍:

sem = asyncio.Semaphore(5)  # 最多 5 個同時在跑

async def research_competitor(competitor_name: str) -> dict:
    async with sem:
        resp = await client.messages.create(...)
        ...

Agent 間的通訊設計:共用狀態 vs 把輸出當輸入

當多個 subagent 需要協作,它們之間的資訊傳遞方式直接影響系統的可靠性。在 Claude 的世界裡,這純粹是程式碼層面的資料傳遞,不是什麼框架機制——你有兩種風格可選。

**把上一個的輸出當下一個的輸入(直接傳遞)**是最直接的方式:orchestrator 拿到 subagent A 的回傳值(一段文字),直接當作 subagent B 的 messages 內容傳進去。前面 orchestrator()search_results 串接後丟給 analysis_subagent 就是這種。

優點:清晰、可追蹤、容易除錯——資料流就是你的函式呼叫鏈。 缺點:如果 subagent A 的輸出很大,整段塞進 subagent B 的 prompt 會佔用大量 context(和成本)。

**共用狀態(shared dict / 外部儲存)**則是讓各個 subagent 把結果寫進一個共用的資料結構,orchestrator 再從裡面挑需要的給下一個 subagent。小規模時就是一個 Python dict;跨程序或要持久化時才升級成 Redis、資料庫或檔案。

import json
import redis  # 跨程序 / 要持久化時才需要

# --- 小規模:一個共用 dict 就夠 ---
shared_state: dict[str, str] = {}

def search_into_state(topic: str) -> None:
    """subagent 把結果寫進共用狀態,而不是直接回傳一大包。"""
    shared_state[topic] = search_subagent(f"研究 {topic}")

# orchestrator 之後只挑需要的 key 餵給下一個 subagent
def summarize_topic(topic: str) -> str:
    raw = shared_state.get(topic, "")
    return analysis_subagent(raw)


# --- 跨程序 / 要持久化:外部儲存 ---
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def save_research_result(key: str, data: dict) -> None:
    redis_client.setex(f"research:{key}", 3600, json.dumps(data, ensure_ascii=False))

def load_research_result(key: str) -> dict:
    raw = redis_client.get(f"research:{key}")
    return json.loads(raw) if raw else {}

我的建議是:對於小資料(< 1000 tokens)用直接傳遞;對於大資料(文件、長報告)用共用狀態存原文,只把摘要在函式之間傳遞。混合使用效果最好:subagent 的摘要直接回傳給 orchestrator,完整的原始資料存在外部,需要時才用 key 撈回來。

防止 Agent 失控

這是我認為 multi-agent 系統設計中最被忽視的部分。因為 orchestrator 是你寫的程式,所有的安全閥也都是你自己加的——沒有框架會替你擋。

Agent 會失控的情況:

  • 無限迴圈(某個 subagent 內部跑 agentic loop,一直呼叫工具卻不收斂)
  • 超出預算(持續呼叫昂貴的工具或反覆送大 prompt)
  • 發散行為(subagent 偏離原始目標,做了大量不相關的事)

每個 subagent 的 max_iterations / token 上限 / 逾時是第一道防線。如果某個 subagent 內部是手寫的 agentic loop(要用工具的那種),一定要自己加迴圈計數器跳出;單次呼叫則用 max_tokens 封頂、用 SDK 的 timeout 設逾時:

import anthropic

# 單次呼叫的逾時可以掛在 client 上,或單次 with_options 覆寫
client = anthropic.Anthropic(timeout=60.0)  # 秒

def bounded_tool_subagent(user_input: str, tools, max_iterations: int = 8) -> str:
    """手寫 agentic loop 的 subagent,自己加上 max_iterations 安全閥。"""
    messages = [{"role": "user", "content": user_input}]

    for _ in range(max_iterations):
        response = client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4000,          # 單次回應 token 上限
            tools=tools,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        # Claude 要呼叫工具:執行後回灌結果,繼續迴圈
        tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for tool in tool_use_blocks:
            result = execute_tool(tool.name, tool.input)  # 你的實作
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool.id,
                "content": result,
            })
        messages.append({"role": "user", "content": tool_results})

    # 撞到 max_iterations 還沒結束:明確中止,不要無限跑下去
    return "ERROR: subagent 達到最大迭代次數,未能收斂"

小提醒:手寫 loop 一定要看 stop_reasonend_turn 代表 Claude 講完了;tool_use 代表它要呼叫工具(你要回灌結果再續);max_tokens 代表被 max_tokens 截斷了。max_iterations你的 loop 計數器,不是 API 欄位——這正是手寫 loop 防無限循環的正解。

Orchestrator 的總預算與步數上限是第二道防線。orchestrator 自己要記帳:總共派了幾個 subagent、累積花了多少 token,超過上限就停。我自己的做法是包一個簡單的 tracker:

class BudgetTracker:
    def __init__(self, max_tokens: int = 200_000, max_steps: int = 30):
        self.max_tokens = max_tokens
        self.max_steps = max_steps
        self.used_tokens = 0
        self.steps = 0

    def charge(self, response) -> None:
        """每次 subagent 呼叫後,把實際用量記進去。"""
        self.steps += 1
        self.used_tokens += response.usage.input_tokens + response.usage.output_tokens
        if self.steps > self.max_steps:
            raise RuntimeError(f"orchestrator 超過步數上限 {self.max_steps}")
        if self.used_tokens > self.max_tokens:
            raise RuntimeError(f"orchestrator 超過 token 預算 {self.max_tokens}")

每個 response 物件都帶 usage.input_tokensusage.output_tokens,所以你拿到的是實際用量,不是估算。orchestrator 在每次呼叫 subagent 之後 budget.charge(response),超標就丟例外中止整個流程。

明確的終止條件是第三道防線。每個 subagent 的 system prompt 都應該明確定義什麼時候算「完成」:

SEARCH_SYSTEM = """你的任務是搜尋並整理某主題的相關資訊。

完成條件(達到任一條件即完成,立即回傳結果):
- 已整理出 5 個以上相關事實
- 已涵蓋主題的主要面向
- 已累積約 500 字的資料

不要持續追求「完美」的資料。整理到足夠就停下來回傳。
"""

Tracing 和可觀測性

Multi-agent 系統最難除錯的地方,就是你不知道哪個 subagent 在做什麼、為什麼做這個決定。Anthropic 沒有 enable_tracing 那種一行開啟的 tracing API——但要做到可觀測,其實有幾個樸實又夠用的真實手段。

第一:用環境變數打開 SDK 的 debug log。ANTHROPIC_LOG=debug,SDK 就會把每個 HTTP 請求/回應印出來,你能看到實際送出去的 body 和收到的內容:

ANTHROPIC_LOG=debug python market_research.py

第二:記錄每個呼叫的 response._request_id 每個 response 都帶一個 request id,回報問題給 Anthropic 支援時用得上,自己對帳哪個 subagent 的哪次呼叫出事也靠它:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("orchestrator")

def traced_subagent(name: str, system: str, user: str) -> str:
    resp = client.messages.create(
        model="claude-opus-4-8",
        max_tokens=2000,
        system=system,
        messages=[{"role": "user", "content": user}],
    )
    log.info(
        "[%s] request_id=%s stop_reason=%s in=%d out=%d",
        name,
        resp._request_id,
        resp.stop_reason,
        resp.usage.input_tokens,
        resp.usage.output_tokens,
    )
    return next(b.text for b in resp.content if b.type == "text")

第三:自己存結構化 log。 我在生產環境的做法是,在 orchestrator 每次呼叫 subagent 的前後各記一筆——subagent 名稱、輸入摘要、request_idstop_reason、token 用量、耗時——寫進 JSON log 或送進你既有的觀測系統(OpenTelemetry、Grafana 之類)。因為 orchestrator 是你寫的程式,這些埋點全在你掌控之內,不需要任何特殊 API。這讓我能事後看到「哪個 subagent 慢、哪次呼叫吃掉最多 token、哪個決策走錯」,對優化 system prompt 非常有幫助。

完整範例:三個 subagent 協作產出市場分析報告

把前面所有概念整合起來,這是我實際在生產環境使用的市場研究系統。三個 subagent——市場規模 / 競品 / 風險,各有專屬 system prompt,用 asyncio.gather 平行跑,最後 orchestrator 把三份結果丟給一次 Claude 彙整成報告。整段只用真實 anthropic SDK,照抄就能跑:

import asyncio
import logging
from anthropic import AsyncAnthropic

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("market-research")

client = AsyncAnthropic(timeout=60.0)

MODEL = "claude-opus-4-8"
sem = asyncio.Semaphore(3)  # 同時最多 3 個 subagent 在跑

# =====================
# 三個 subagent:各有專屬 system prompt
# =====================

MARKET_SIZE_SYSTEM = """你是市場規模分析 subagent。
任務:估算並描述目標市場的規模與成長趨勢。
請涵蓋:TAM / SAM / SOM 概念性估算、近 3 年成長率、主要驅動因素。
輸出:結構化 Markdown,800-1000 字,只談市場規模相關,不要離題。
完成條件:涵蓋上述面向即停,不要追求完美數字。"""

COMPETITOR_SYSTEM = """你是競品分析 subagent。
任務:分析目標市場的主要競爭對手。
請涵蓋:3-5 個主要玩家、各自定位與優劣勢、產品差異化、市場份額概況。
分析框架:Porter's Five Forces 的精簡版。
輸出:結構化 Markdown,800-1000 字,只談競品,不要離題。"""

RISK_SYSTEM = """你是風險分析 subagent。
任務:盤點進入此市場的主要風險。
請涵蓋:法規 / 技術 / 市場 / 營運四類風險,各舉具體例子並標示嚴重度(高/中/低)。
輸出:結構化 Markdown,800-1000 字,只談風險,不要離題。"""

SYNTHESIS_SYSTEM = """你是市場研究 orchestrator 的彙整 agent。
任務:整合「市場規模」「競品」「風險」三份分析,產出一份執行摘要。
輸出(共約 600 字):
- 市場概況(2-3 句)
- 主要洞察(3-5 條,跨三份報告交叉得出)
- 風險提示(2-3 條)
- 建議行動(2-3 條)
不要照抄三份原文,要綜合、要有觀點。"""


async def run_subagent(name: str, system: str, user: str) -> str:
    """一個 subagent = 一次帶專屬 system prompt 的 Claude 呼叫。"""
    async with sem:
        resp = await client.messages.create(
            model=MODEL,
            max_tokens=4000,
            system=system,
            messages=[{"role": "user", "content": user}],
        )
    log.info(
        "[%s] request_id=%s stop_reason=%s in=%d out=%d",
        name, resp._request_id, resp.stop_reason,
        resp.usage.input_tokens, resp.usage.output_tokens,
    )
    if resp.stop_reason == "refusal":
        return f"[{name}] 安全拒答,已略過。"
    return next(b.text for b in resp.content if b.type == "text")


async def market_research(topic: str) -> str:
    """Orchestrator:平行派三個 subagent,再彙整成報告。"""
    # 步驟 1:三個分析 subagent 平行跑(互不依賴,所以可平行)
    market_size, competitors, risks = await asyncio.gather(
        run_subagent("market_size", MARKET_SIZE_SYSTEM, topic),
        run_subagent("competitors", COMPETITOR_SYSTEM, topic),
        run_subagent("risks", RISK_SYSTEM, topic),
    )

    # 步驟 2:orchestrator 把三份結果當輸入,丟一次 Claude 彙整
    combined = (
        f"# 研究主題\n{topic}\n\n"
        f"# 市場規模分析\n{market_size}\n\n"
        f"# 競品分析\n{competitors}\n\n"
        f"# 風險分析\n{risks}\n"
    )
    report = await run_subagent("synthesis", SYNTHESIS_SYSTEM, combined)
    return report


if __name__ == "__main__":
    question = (
        "我想進入台灣的「知識管理 SaaS」市場(Notion 競爭對手區間)。"
        "請分析市場規模、主要競爭對手定位,以及進入的主要風險。"
    )
    result = asyncio.run(market_research(question))
    print(result)

這個版本和我最初那次失敗的單一 agent 形成鮮明對比:每個 subagent 的 context 都只裝它自己那塊(市場規模 / 競品 / 風險),互不污染;三份分析平行跑,總時間約等於最慢那份;最後彙整是一次乾淨的呼叫,輸入是三份結構化摘要而不是一堆原始搜尋雜訊。orchestrator(market_research 函式)從頭到尾都是你看得懂、改得動、測得了的程式碼。

如果某個 subagent 內部需要用工具(例如真的去搜尋網路),就把那個 run_subagent 換成前面「防止失控」那段的手寫 agentic loop 版本(帶 max_iterations),其餘結構不變。

設計原則總結

經過多個 multi-agent 系統的開發和踩坑,我整理出幾條核心原則:

1. 每個 subagent 的職責要清晰到「用一句話說清楚」。 如果你需要用三句話才能解釋一個 subagent 做什麼,它的職責可能太模糊了——對應到程式碼,就是它的 system prompt 該再聚焦。

2. Routing 判斷要明確到「條件成立就派、不成立就不派」。 別讓 orchestrator「自己看著辦」。能用規則就用規則;需要語意判斷時,用 messages.parse() 的 structured output 把回傳鎖死成你定義的合法選項,再拿去查表呼叫對應 subagent。

3. Subagent 之間靠資料傳遞協作,沒有魔法交接。 上一個的輸出就是下一個的輸入(小資料直接傳),或寫進共用狀態(大資料存外部、只傳摘要)。「handoff」在 Claude 這邊就是你的一行函式呼叫。

4. 每個 subagent 都要有明確的「完成條件」+安全閥。 單次呼叫用 max_tokens 封頂、timeout 設逾時;內部跑 loop 的就自己加 max_iterations。orchestrator 層再加總預算(用 response.usage 記實際 token)和步數上限。沒有框架會替你擋,安全閥都是你自己加的。

5. 先做順序版,再優化成平行版。 平行執行(AsyncAnthropicasyncio.gather)更複雜、更難除錯,也更容易撞 rate limit(記得用 Semaphore 控制併發)。先確認順序版的邏輯正確,再改成平行。

6. 可觀測性靠你自己埋。 ANTHROPIC_LOG=debug 看原始請求、記每次呼叫的 response._request_idusage、把這些寫進你的結構化 log。Anthropic 沒有一鍵 tracing,但因為 orchestrator 是你的程式,埋點全在掌控之內。


下一章,我們來看 multi-agent 系統的另一面:如果你想讓自己的服務成為別人 agent 可以呼叫的工具,你需要開發自己的 MCP Server。

留言討論

esc
輸入關鍵字搜尋文章...
查看收藏 →