Streaming:打造即時回應的用戶體驗
本篇是「Claude API & Agent SDK 完全指南」系列的第 3 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。
我要先說一個反直覺的事實:使用 streaming 並不會讓 Claude 回答得更快。
Claude 生成每個 token 的速度是一樣的,不管你用不用 streaming。
那為什麼 streaming 很重要?
因為感知等待時間(perceived latency)和實際等待時間是兩件事。
沒有 streaming:使用者看著空白螢幕等了 5 秒,然後一大段文字突然全部出現。他的感受是「等了 5 秒」。
有了 streaming:使用者在你按下送出後 200 毫秒就看到第一個字,然後文字一個接一個地流出來。即使整體等待的總秒數一樣,他的感受是「回應很快」。
這就是為什麼所有主流的 AI 產品(ChatGPT、Claude.ai、Gemini)都預設使用 streaming。如果你在建一個使用者會直接互動的 AI 應用,streaming 幾乎是必要的。
SSE(Server-Sent Events)原理
在深入 Claude 的 streaming 實作之前,先快速了解底層的技術:SSE。
HTTP 請求通常是「請求-回應」的模式:客戶端發送請求,伺服器處理完,送回完整的回應,連線結束。
SSE 是一個例外:客戶端發送請求後,伺服器保持連線開放,持續地推送資料,直到它主動關閉連線。每個推送的資料片段格式如下:
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"你好"}}
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"!"}}
data: [DONE]
每個 data: 行是一個事件,事件之間用空行分隔,[DONE] 表示串流結束。
Claude API 的 streaming 就是建立在 SSE 上的。
Claude Streaming 的事件類型
Claude 的 streaming 不只是「把文字一個字一個字傳過來」,它有一套完整的事件體系,讓你知道現在正在發生什麼。
理解這些事件類型對於正確處理 streaming 回應非常重要:
message_start → 串流開始,包含 message ID 和初始 usage 資訊
content_block_start → 一個新的 content block 開始(例如開始產生文字)
content_block_delta → content block 的一段增量(你的文字就在這裡)
content_block_stop → 一個 content block 結束
message_delta → message 層級的更新(包含 stop_reason 和最終 usage)
message_stop → 整個 message 串流結束
完整的事件流大致是這樣的:
{"type": "message_start", "message": {"id": "msg_01...", "type": "message", "role": "assistant", "content": [], "model": "claude-sonnet-4-6-20251101", "usage": {"input_tokens": 25, "output_tokens": 1}}}
{"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}}
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "!我是"}}
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " Claude"}}
{"type": "content_block_stop", "index": 0}
{"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null}, "usage": {"output_tokens": 42}}
{"type": "message_stop"}
在大多數情況下,你只需要關心 content_block_delta 事件裡的 delta.text——那就是要顯示給使用者的文字。但 message_delta 裡的 stop_reason 也很重要,你需要知道為什麼 Claude 停止了。
Python SDK Streaming 實作
Python SDK 提供了非常優雅的 streaming 介面,使用 context manager 語法:
import anthropic
client = anthropic.Anthropic()
# 使用 stream() context manager
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[
{"role": "user", "content": "請寫一首關於台灣的短詩,大約 100 字。"}
]
) as stream:
# 最簡單的方式:直接迭代文字
for text in stream.text_stream:
print(text, end="", flush=True)
print() # 換行
# 串流結束後,你可以取得完整的 message 物件
final_message = stream.get_final_message()
print(f"\nStop reason: {final_message.stop_reason}")
print(f"Total tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")
stream.text_stream 是一個 generator,每次 yield 一個文字片段。flush=True 確保每個片段立即輸出到 terminal,而不是等緩衝區滿了再印出。
如果你需要更細緻的控制,可以迭代原始的事件流:
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "解釋一下量子纏繞"}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
# 即時處理每個文字片段
process_text_chunk(event.delta.text)
elif event.type == "message_delta":
# 串流結束,取得最終狀態
print(f"Done! Stop reason: {event.delta.stop_reason}")
累積完整回應
有時候你需要同時串流給使用者,同時保存完整的回應文字(例如要存進資料庫):
full_response = []
with client.messages.stream(...) as stream:
for text in stream.text_stream:
full_response.append(text)
yield text # 串流給前端
complete_text = "".join(full_response)
save_to_db(complete_text)
TypeScript / Node.js Streaming
TypeScript SDK 的 streaming API 設計得很對稱:
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
async function streamResponse() {
const stream = client.messages.stream({
model: 'claude-sonnet-4-6',
max_tokens: 1024,
messages: [
{
role: 'user',
content: '請寫一首關於台灣的短詩,大約 100 字。',
},
],
});
// 方法一:監聽 text 事件
stream.on('text', (text) => {
process.stdout.write(text);
});
// 等待串流完成
const finalMessage = await stream.finalMessage();
console.log('\nStop reason:', finalMessage.stop_reason);
console.log('Total tokens:', finalMessage.usage.input_tokens + finalMessage.usage.output_tokens);
}
streamResponse();
或者使用 async iterator 風格:
async function streamWithAsyncIterator() {
const stream = await client.messages.create({
model: 'claude-sonnet-4-6',
max_tokens: 1024,
stream: true, // 關鍵:設定 stream: true
messages: [
{ role: 'user', content: '你好!' },
],
});
for await (const event of stream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
}
}
在 Express.js 中實作 Streaming Endpoint
把 streaming 整合進你的後端 API,讓前端可以接收:
import express from 'express';
import Anthropic from '@anthropic-ai/sdk';
const app = express();
app.use(express.json());
const client = new Anthropic();
app.post('/api/chat/stream', async (req, res) => {
const { messages, systemPrompt } = req.body;
// 設定 SSE 必要的 headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // 給 Nginx 用的,禁用緩衝
try {
const stream = client.messages.stream({
model: 'claude-sonnet-4-6',
max_tokens: 2048,
system: systemPrompt,
messages,
});
// 把每個文字片段轉換成 SSE 格式傳給前端
stream.on('text', (text) => {
// SSE 格式:data: {...}\n\n
res.write(`data: ${JSON.stringify({ type: 'text', text })}\n\n`);
});
// 等待串流完成
const finalMessage = await stream.finalMessage();
// 傳送結束事件
res.write(`data: ${JSON.stringify({
type: 'done',
stop_reason: finalMessage.stop_reason,
usage: finalMessage.usage,
})}\n\n`);
res.end();
} catch (error) {
// 錯誤時通知前端
res.write(`data: ${JSON.stringify({ type: 'error', message: String(error) })}\n\n`);
res.end();
}
});
app.listen(3000);
前端(瀏覽器)接收 SSE
// 前端使用 EventSource 或 fetch 接收 SSE
async function fetchStreamingResponse(userMessage: string) {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [{ role: 'user', content: userMessage }],
}),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 解析 SSE 事件(以 \n\n 分隔)
const lines = buffer.split('\n\n');
buffer = lines.pop() || ''; // 最後一個可能不完整,留到下次
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.type === 'text') {
// 更新 UI:把文字附加到顯示區域
appendToDisplay(data.text);
} else if (data.type === 'done') {
console.log('Stream complete:', data.stop_reason);
} else if (data.type === 'error') {
console.error('Stream error:', data.message);
}
}
}
}
}
在 Next.js 中實作 Streaming
Next.js App Router 原生支援 streaming response,搭配 Claude API 特別好用:
// app/api/chat/route.ts
import { NextRequest } from 'next/server';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
export async function POST(req: NextRequest) {
const { messages } = await req.json();
// 建立一個 TransformStream 來轉換 Claude 的 streaming 輸出
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
const claudeStream = client.messages.stream({
model: 'claude-sonnet-4-6',
max_tokens: 2048,
messages,
});
claudeStream.on('text', (text) => {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ text })}\n\n`));
});
await claudeStream.finalMessage();
controller.enqueue(encoder.encode('data: [DONE]\n\n'));
controller.close();
} catch (error) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ error: String(error) })}\n\n`)
);
controller.close();
}
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
前端使用 Vercel AI SDK 的 useChat hook 可以讓這個流程更簡單。但如果你想自己實作,上面的範例已經足夠了。
Streaming 的錯誤處理與重連策略
Streaming 相比一般的 HTTP 請求有更多需要處理的邊際情況。
常見的失敗模式
- 網路中斷:串流到一半連線斷了
- 伺服器過載:Anthropic 在串流過程中回傳 529
- 逾時:長時間的 streaming 可能觸發某些代理伺服器的逾時設定
簡單的重連實作
import anthropic
import time
def stream_with_retry(client, max_retries=3, **kwargs):
"""帶重試機制的 streaming 呼叫"""
for attempt in range(max_retries):
try:
with client.messages.stream(**kwargs) as stream:
for text in stream.text_stream:
yield text
return # 成功完成,退出
except anthropic.APIConnectionError as e:
if attempt == max_retries - 1:
raise
wait = 2 ** attempt
print(f"Connection error, retrying in {wait}s... (attempt {attempt + 1})")
time.sleep(wait)
except anthropic.APIStatusError as e:
if e.status_code in (429, 529) and attempt < max_retries - 1:
wait = 2 ** attempt
time.sleep(wait)
else:
raise
前端的重連邏輯
async function streamWithReconnect(
messages: Anthropic.Messages.MessageParam[],
maxRetries = 3
) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
await fetchStreamingResponse(messages);
return; // 成功
} catch (error) {
if (attempt === maxRetries - 1) throw error;
const waitMs = Math.pow(2, attempt) * 1000;
console.warn(`Stream failed, retrying in ${waitMs}ms...`);
await new Promise(resolve => setTimeout(resolve, waitMs));
}
}
}
Streaming with Tool Use:特殊處理
當你的應用使用 Tool Use(下一章介紹),streaming 會更複雜。
除了文字的 text_delta,你還會收到 tool use 相關的事件:
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
tools=[weather_tool],
messages=messages
) as stream:
for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "tool_use":
# Claude 正在呼叫工具
tool_name = event.content_block.name
tool_id = event.content_block.id
print(f"Calling tool: {tool_name}")
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
# 普通文字
print(event.delta.text, end="", flush=True)
elif event.delta.type == "input_json_delta":
# Tool 的參數(JSON 格式,逐步串流)
# 不建議在這裡解析,等 message_stop 後再解析完整的
pass
elif event.type == "message_stop":
# 取得完整的 message,包含所有 tool_use blocks
final = stream.get_final_message()
# 處理 tool calls...
我的建議:在 streaming 模式下,不要試圖即時解析 tool input。input_json_delta 是 JSON 的片段,很難即時解析。等到 message_stop 事件後,用 stream.get_final_message() 取得完整的 message,再處理 tool calls。
stop_reason 的意義
每個 streaming 回應最終都會有一個 stop_reason,在 message_delta 事件裡:
with client.messages.stream(...) as stream:
for text in stream.text_stream:
pass # 消費所有文字
final = stream.get_final_message()
match final.stop_reason:
case "end_turn":
# 正常結束:Claude 認為它說完了
pass
case "max_tokens":
# 被截斷:需要告知使用者,或增大 max_tokens 繼續
handle_truncation(final)
case "stop_sequence":
# 遇到你定義的 stop_sequence 而停止
pass
case "tool_use":
# Claude 想要呼叫工具,等待你執行工具後繼續
handle_tool_use(final)
效能優化提示
使用更小的模型:Haiku 比 Sonnet 快 3-5 倍,如果你的應用對延遲很敏感,Haiku 可能是更好的選擇。
減少 system prompt 長度:更長的 system prompt 會增加 time-to-first-token(第一個 token 出現的時間)。
平行請求:如果你有多個不相關的 Claude 呼叫,可以平行發送而不是序列等待:
import asyncio
import anthropic
# 使用 AsyncAnthropic 客戶端
client = anthropic.AsyncAnthropic()
async def parallel_streams(prompts: list[str]):
async def single_stream(prompt):
result = []
async with client.messages.stream(
model="claude-haiku-4-5",
max_tokens=512,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
result.append(text)
return "".join(result)
return await asyncio.gather(*[single_stream(p) for p in prompts])
下一步
Streaming 讓你的 AI 應用在使用者體驗上更接近 Claude.ai 這樣的成熟產品。
但到目前為止,我們的 Claude 都是「說話的機器」——只能輸出文字,沒辦法真正地做事。
下一章,我們來解鎖 Claude 最強大的功能之一:Tool Use(工具使用)。讓 Claude 不只是說話,而是能夠呼叫你的 API、查詢資料庫、執行計算——真正地成為你應用的大腦。