Batch API:大量任務的高效非同步處理
本篇是「Claude API & Agent SDK 完全指南」系列的第 8 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。
前幾章我們用的都是「即時 API」——你發一個請求,馬上等到回應。這種模式適合互動式應用,但對某些場景來說是浪費:
你有 5,000 份客服對話需要分類。你有 10,000 筆產品描述需要翻譯。你有 2,000 份合約需要提取關鍵條款。
這些任務都是非即時的。你不需要每個回應即時到達,你只需要在某個時間點「全部完成」。
對這類場景,用普通 Messages API 一個一個送,不只慢(受 rate limit 限制),還貴。Batch API 是為這個痛點設計的。
Batch API 是什麼?
Batch API(正式名稱 Message Batches API)讓你一次提交大量請求,Claude 在後台非同步處理,最長 24 小時內完成。
與普通 Messages API 的主要差異:
| 維度 | Messages API(即時) | Batch API(非同步) |
|---|---|---|
| 回應方式 | 即時(同步或 streaming) | 非同步,輪詢狀態 |
| 完成時間 | 毫秒到秒 | 分鐘到 24 小時 |
| 價格 | 標準 | 50% 折扣 |
| Rate limit | 受即時 rate limit 限制 | 獨立的 batch quota |
| 適用場景 | 互動式應用 | 離線批次處理 |
50% 折扣是很吸引人的數字。但重點是:你換掉的是即時性。如果你的任務不需要即時回應,這筆交換非常划算。
適用 vs 不適用場景
在決定用 Batch API 之前,先確認你的場景是否合適。
非常適合:
- 大規模文字分類(客服對話、評論、文章)
- 批次翻譯(幾百到幾千份文件)
- 資料集標注(AI 訓練資料生成)
- 報告自動生成(定期跑批次)
- SEO 內容生成(批次生成商品描述)
- 文件資料提取(合約、發票、報告)
不適合:
- 用戶等待即時回應的互動功能
- 需要根據 Claude 回應動態調整下一步的 agentic 任務
- 任何對時間敏感的任務
我在生產環境的規則是:如果用戶不在等你,就考慮 Batch API。
API 使用流程
Batch API 的使用分三步:
1. create_batch() ← 提交批次任務
↓
2. poll_status() ← 輪詢直到完成
↓
3. get_results() ← 下載並處理結果
Step 1:建立 Batch(Create Batch)
import anthropic
client = anthropic.Anthropic()
# 準備你的任務列表
requests = [
{
"custom_id": "review-001", # 你自定義的 ID,用來對應結果
"params": {
"model": "claude-haiku-4-5", # Batch API 通常用 Haiku 降成本
"max_tokens": 256,
"messages": [
{
"role": "user",
"content": "請將以下評論分類為「正面」、「負面」或「中性」,只回答一個詞:\n\n「這款產品品質很好,但運送太慢了。」"
}
]
}
},
{
"custom_id": "review-002",
"params": {
"model": "claude-haiku-4-5",
"max_tokens": 256,
"messages": [
{
"role": "user",
"content": "請將以下評論分類為「正面」、「負面」或「中性」,只回答一個詞:\n\n「完全不值這個價格,已申請退款。」"
}
]
}
},
# ... 最多 10,000 個請求
]
# 建立批次
batch = client.messages.batches.create(requests=requests)
print(f"Batch ID: {batch.id}")
print(f"狀態: {batch.processing_status}") # 初始為 "in_progress"
print(f"請求數量: {batch.request_counts.processing}")
custom_id 非常重要。Batch API 不保證回應的順序。你只能透過 custom_id 來對應你的請求和 Claude 的回應。建議用你資料庫的主鍵或其他唯一標識符。
Step 2:輪詢狀態(Poll Status)
import time
def wait_for_batch(client: anthropic.Anthropic, batch_id: str, poll_interval: int = 60) -> anthropic.types.MessageBatch:
"""輪詢直到 batch 完成,每隔 poll_interval 秒檢查一次"""
while True:
batch = client.messages.batches.retrieve(batch_id)
print(f"[{time.strftime('%H:%M:%S')}] 狀態: {batch.processing_status} | "
f"處理中: {batch.request_counts.processing} | "
f"成功: {batch.request_counts.succeeded} | "
f"失敗: {batch.request_counts.errored}")
if batch.processing_status == "ended":
return batch
time.sleep(poll_interval)
# 等待完成(通常幾分鐘到幾十分鐘)
completed_batch = wait_for_batch(client, batch.id)
print(f"\nBatch 完成!")
print(f"成功: {completed_batch.request_counts.succeeded}")
print(f"失敗: {completed_batch.request_counts.errored}")
print(f"過期: {completed_batch.request_counts.expired}")
processing_status 的可能值:
in_progress:處理中ended:全部完成(不管成功或失敗)
request_counts 的欄位:
processing:還在處理的請求數succeeded:成功的請求數errored:失敗的請求數canceled:被取消的請求數expired:超時未處理的請求數
Step 3:讀取結果(Retrieve Results)
def process_batch_results(client: anthropic.Anthropic, batch_id: str) -> dict[str, str | None]:
"""讀取並處理 batch 結果,返回 {custom_id: 回應文字} 的映射"""
results = {}
for result in client.messages.batches.results(batch_id):
custom_id = result.custom_id
if result.result.type == "succeeded":
# 成功:提取回應文字
message = result.result.message
results[custom_id] = message.content[0].text
elif result.result.type == "errored":
# 失敗:記錄錯誤
error = result.result.error
print(f"[ERROR] {custom_id}: {error.type} - {error.message}")
results[custom_id] = None
elif result.result.type == "expired":
# 超時:這個請求沒有被處理
print(f"[EXPIRED] {custom_id}: 請求超時未處理")
results[custom_id] = None
return results
# 處理結果
results = process_batch_results(client, batch.id)
for custom_id, classification in results.items():
if classification:
print(f"{custom_id}: {classification}")
else:
print(f"{custom_id}: 處理失敗")
JSONL 格式(進階)
除了用 SDK 直接建立請求陣列,Batch API 也支援 JSONL(JSON Lines)格式。每一行是一個 JSON 物件:
{"custom_id": "review-001", "params": {"model": "claude-haiku-4-5", "max_tokens": 256, "messages": [{"role": "user", "content": "分類這則評論:好用"}]}}
{"custom_id": "review-002", "params": {"model": "claude-haiku-4-5", "max_tokens": 256, "messages": [{"role": "user", "content": "分類這則評論:很差"}]}}
JSONL 格式適合:
- 事先把任務批次寫入檔案
- 用其他工具(例如 Python pandas)生成大量任務
- 跨不同服務傳遞任務佇列
Python 完整範例:批次評論分類系統
這是一個真實可用的批次評論分類系統,包含資料準備、提交、輪詢和結果處理的完整流程:
import anthropic
import time
import json
from pathlib import Path
def classify_reviews_batch(reviews: list[dict]) -> dict[str, str]:
"""
批次分類評論
reviews: [{"id": "001", "text": "評論內容"}, ...]
returns: {"001": "正面", "002": "負面", ...}
"""
client = anthropic.Anthropic()
# --- Step 1: 建立請求列表 ---
classification_prompt = """請將以下評論分類。只能回答「正面」、「負面」或「中性」三者之一,不要任何其他說明。
評論:{review}"""
requests = [
{
"custom_id": review["id"],
"params": {
"model": "claude-haiku-4-5",
"max_tokens": 16, # 分類任務只需要很少 tokens
"messages": [
{
"role": "user",
"content": classification_prompt.format(review=review["text"])
}
]
}
}
for review in reviews
]
print(f"提交 {len(requests)} 個評論分類請求...")
# --- Step 2: 建立 batch ---
batch = client.messages.batches.create(requests=requests)
batch_id = batch.id
print(f"Batch ID: {batch_id}")
# 儲存 batch ID(以防程式中途崩潰,可以恢復)
Path("batch_id.txt").write_text(batch_id)
# --- Step 3: 輪詢狀態 ---
print("等待 batch 完成(每 30 秒檢查一次)...")
start_time = time.time()
while True:
batch_status = client.messages.batches.retrieve(batch_id)
elapsed = int(time.time() - start_time)
counts = batch_status.request_counts
print(f"[{elapsed}s] 處理中: {counts.processing} | 成功: {counts.succeeded} | 失敗: {counts.errored}")
if batch_status.processing_status == "ended":
break
time.sleep(30)
# --- Step 4: 處理結果 ---
results = {}
failed_ids = []
for result in client.messages.batches.results(batch_id):
if result.result.type == "succeeded":
text = result.result.message.content[0].text.strip()
# 標準化輸出(以防 Claude 回答了「正面。」或「這是正面」等格式)
if "正面" in text:
results[result.custom_id] = "正面"
elif "負面" in text:
results[result.custom_id] = "負面"
else:
results[result.custom_id] = "中性"
else:
failed_ids.append(result.custom_id)
results[result.custom_id] = "未知"
if failed_ids:
print(f"\n警告:{len(failed_ids)} 個請求失敗: {failed_ids[:5]}...")
total_time = int(time.time() - start_time)
print(f"\n完成!耗時 {total_time} 秒,成功 {len(requests) - len(failed_ids)}/{len(requests)}")
return results
if __name__ == "__main__":
# 測試資料
sample_reviews = [
{"id": "review-001", "text": "產品品質很好,已經回購三次了!"},
{"id": "review-002", "text": "送達時包裝破損,產品也壞了,非常失望。"},
{"id": "review-003", "text": "還可以,跟描述差不多,沒有特別驚喜。"},
{"id": "review-004", "text": "客服態度超好,解決問題很快速!"},
{"id": "review-005", "text": "等了兩週才到,但產品本身質量不錯。"},
]
classifications = classify_reviews_batch(sample_reviews)
print("\n分類結果:")
for review_id, classification in classifications.items():
print(f" {review_id}: {classification}")
TypeScript 完整範例
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
interface Review {
id: string;
text: string;
}
async function classifyReviewsBatch(
reviews: Review[]
): Promise<Record<string, string>> {
// Step 1: 建立請求
const requests: Anthropic.Messages.MessageCreateParamsNonStreaming[] =
reviews.map((review) => ({
custom_id: review.id,
params: {
model: "claude-haiku-4-5",
max_tokens: 16,
messages: [
{
role: "user" as const,
content: `請將以下評論分類。只能回答「正面」、「負面」或「中性」三者之一。\n\n評論:${review.text}`,
},
],
},
}));
console.log(`提交 ${requests.length} 個評論分類請求...`);
// Step 2: 建立 batch
const batch = await client.messages.batches.create({ requests });
console.log(`Batch ID: ${batch.id}`);
// Step 3: 輪詢狀態
let batchStatus = batch;
while (batchStatus.processing_status !== "ended") {
await new Promise((resolve) => setTimeout(resolve, 30_000)); // 等 30 秒
batchStatus = await client.messages.batches.retrieve(batch.id);
const counts = batchStatus.request_counts;
console.log(
`處理中: ${counts.processing} | 成功: ${counts.succeeded} | 失敗: ${counts.errored}`
);
}
// Step 4: 處理結果
const results: Record<string, string> = {};
const failedIds: string[] = [];
for await (const result of await client.messages.batches.results(batch.id)) {
if (result.result.type === "succeeded") {
const text = (
result.result.message.content[0] as Anthropic.Messages.TextBlock
).text.trim();
if (text.includes("正面")) {
results[result.custom_id] = "正面";
} else if (text.includes("負面")) {
results[result.custom_id] = "負面";
} else {
results[result.custom_id] = "中性";
}
} else {
failedIds.push(result.custom_id);
results[result.custom_id] = "未知";
}
}
if (failedIds.length > 0) {
console.warn(`${failedIds.length} 個請求失敗`);
}
return results;
}
// 使用範例
(async () => {
const reviews: Review[] = [
{ id: "r001", text: "產品品質很好!" },
{ id: "r002", text: "送達時包裝破損。" },
{ id: "r003", text: "還可以,普通。" },
];
const results = await classifyReviewsBatch(reviews);
console.log("分類結果:", results);
})();
錯誤處理:部分失敗的 Batch
Batch API 的一個重要特性:即使部分請求失敗,整個 batch 仍然正常完成。處理完成後,你需要自行處理失敗的請求。
def handle_batch_results_with_retry(
client: anthropic.Anthropic,
batch_id: str,
max_retries: int = 2
) -> dict[str, str | None]:
"""
處理 batch 結果,對失敗的請求自動重試(使用即時 API)
"""
results = {}
failed_requests = []
# 第一輪:處理所有結果
for result in client.messages.batches.results(batch_id):
if result.result.type == "succeeded":
results[result.custom_id] = result.result.message.content[0].text
else:
error_type = result.result.type # "errored" or "expired"
print(f"[{error_type}] {result.custom_id}")
failed_requests.append(result.custom_id)
results[result.custom_id] = None
# 第二輪:對失敗的請求用即時 API 重試
if failed_requests and max_retries > 0:
print(f"\n對 {len(failed_requests)} 個失敗請求進行重試...")
# 這裡你需要保留原始請求的映射,根據 custom_id 找回原始 params
# (在實際應用中,你會從資料庫或原始列表中查找)
for failed_id in failed_requests:
try:
# 用即時 API 重試
retry_response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=16,
messages=[{"role": "user", "content": f"...重試請求內容..."}]
)
results[failed_id] = retry_response.content[0].text
except Exception as e:
print(f"重試失敗 {failed_id}: {e}")
return results
Limits 與最佳實踐
Limits(截至 2026):
- 單次 batch:最多 10,000 個請求
- 每個請求最大:32MB(JSONL 行)
- 整個 batch 最大:200MB(未壓縮 JSONL)
- Batch 保留時間:29 天(超過後自動刪除)
- 最長處理時間:24 小時
最佳實踐:
-
選對模型:Batch 任務大多是分類、提取、翻譯等任務,Claude Haiku 4.5 夠用,比 Opus 便宜 20 倍
-
善用 max_tokens 最小化:批次分類任務 max_tokens 設 16-32 就夠,不要設 1024
-
設計冪等的 custom_id:用你的資料主鍵(例如資料庫 ID)作為 custom_id,方便重跑
-
儲存 batch_id:程式中途崩潰也能繼續輪詢
-
不要阻塞等待:Batch 可能需要幾小時,設計為非阻塞的背景工作,完成後發通知
-
分批提交超大任務:10,000 個請求以上的任務,拆成多個 batch
def split_into_batches(items: list, batch_size: int = 5000) -> list[list]:
"""將大型任務拆分為多個 batch"""
return [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
成本計算
用 Batch API 相比普通 Messages API,輸入和輸出 tokens 都打 5 折。
以一個批次翻譯任務為例:
- 1,000 份文件,每份平均 2,000 tokens
- 翻譯輸出平均 2,200 tokens
- 使用 Claude Haiku 4.5(假設 $0.80/MTok 輸入,$4/MTok 輸出)
| 方式 | 輸入費用 | 輸出費用 | 總計 |
|---|---|---|---|
| Messages API | $1.60 | $8.80 | $10.40 |
| Batch API(5折) | $0.80 | $4.40 | $5.20 |
| 節省 | 50%($5.20) |
在大量任務上,Batch API 是個非常划算的選擇。
Batch API 解決了「大量 + 非即時」這個使用場景。但如果你的任務更複雜——需要動態決策、多步驟推理、呼叫外部工具——光靠 Messages API 或 Batch API 就不夠了。
下一章,我們進入這本書的重頭戲:Agent SDK。我們要從「呼叫 API 得到答案」升級到「讓 AI 自主完成複雜任務」。