跳至主要內容
技術

Streaming:打造即時回應的用戶體驗

Streaming:打造即時回應的用戶體驗
Claude API & Agent SDK 完全指南 第 3 / 15 篇

本篇是「Claude API & Agent SDK 完全指南」系列的第 3 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。

我要先說一個反直覺的事實:使用 streaming 並不會讓 Claude 回答得更快

Claude 生成每個 token 的速度是一樣的,不管你用不用 streaming。

那為什麼 streaming 很重要?

因為感知等待時間(perceived latency)和實際等待時間是兩件事

沒有 streaming:使用者看著空白螢幕等了 5 秒,然後一大段文字突然全部出現。他的感受是「等了 5 秒」。

有了 streaming:使用者在你按下送出後 200 毫秒就看到第一個字,然後文字一個接一個地流出來。即使整體等待的總秒數一樣,他的感受是「回應很快」。

這就是為什麼所有主流的 AI 產品(ChatGPT、Claude.ai、Gemini)都預設使用 streaming。如果你在建一個使用者會直接互動的 AI 應用,streaming 幾乎是必要的。

SSE(Server-Sent Events)原理

在深入 Claude 的 streaming 實作之前,先快速了解底層的技術:SSE。

HTTP 請求通常是「請求-回應」的模式:客戶端發送請求,伺服器處理完,送回完整的回應,連線結束。

SSE 是一個例外:客戶端發送請求後,伺服器保持連線開放,持續地推送資料,直到它主動關閉連線。每個推送的資料片段格式如下:

data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"你好"}}

data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"!"}}

data: [DONE]

每個 data: 行是一個事件,事件之間用空行分隔,[DONE] 表示串流結束。

Claude API 的 streaming 就是建立在 SSE 上的。

Claude Streaming 的事件類型

Claude 的 streaming 不只是「把文字一個字一個字傳過來」,它有一套完整的事件體系,讓你知道現在正在發生什麼。

理解這些事件類型對於正確處理 streaming 回應非常重要:

message_start          → 串流開始,包含 message ID 和初始 usage 資訊
content_block_start    → 一個新的 content block 開始(例如開始產生文字)
content_block_delta    → content block 的一段增量(你的文字就在這裡)
content_block_stop     → 一個 content block 結束
message_delta          → message 層級的更新(包含 stop_reason 和最終 usage)
message_stop           → 整個 message 串流結束

完整的事件流大致是這樣的:

{"type": "message_start", "message": {"id": "msg_01...", "type": "message", "role": "assistant", "content": [], "model": "claude-sonnet-4-6-20251101", "usage": {"input_tokens": 25, "output_tokens": 1}}}

{"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}

{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}}
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "!我是"}}
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " Claude"}}

{"type": "content_block_stop", "index": 0}

{"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null}, "usage": {"output_tokens": 42}}

{"type": "message_stop"}

在大多數情況下,你只需要關心 content_block_delta 事件裡的 delta.text——那就是要顯示給使用者的文字。但 message_delta 裡的 stop_reason 也很重要,你需要知道為什麼 Claude 停止了。

Python SDK Streaming 實作

Python SDK 提供了非常優雅的 streaming 介面,使用 context manager 語法:

import anthropic

client = anthropic.Anthropic()

# 使用 stream() context manager
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "請寫一首關於台灣的短詩,大約 100 字。"}
    ]
) as stream:
    # 最簡單的方式:直接迭代文字
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # 換行

# 串流結束後,你可以取得完整的 message 物件
final_message = stream.get_final_message()
print(f"\nStop reason: {final_message.stop_reason}")
print(f"Total tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")

stream.text_stream 是一個 generator,每次 yield 一個文字片段。flush=True 確保每個片段立即輸出到 terminal,而不是等緩衝區滿了再印出。

如果你需要更細緻的控制,可以迭代原始的事件流:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "解釋一下量子纏繞"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                # 即時處理每個文字片段
                process_text_chunk(event.delta.text)
        elif event.type == "message_delta":
            # 串流結束,取得最終狀態
            print(f"Done! Stop reason: {event.delta.stop_reason}")

累積完整回應

有時候你需要同時串流給使用者,同時保存完整的回應文字(例如要存進資料庫):

full_response = []

with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        full_response.append(text)
        yield text  # 串流給前端

complete_text = "".join(full_response)
save_to_db(complete_text)

TypeScript / Node.js Streaming

TypeScript SDK 的 streaming API 設計得很對稱:

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

async function streamResponse() {
  const stream = client.messages.stream({
    model: 'claude-sonnet-4-6',
    max_tokens: 1024,
    messages: [
      {
        role: 'user',
        content: '請寫一首關於台灣的短詩,大約 100 字。',
      },
    ],
  });

  // 方法一:監聽 text 事件
  stream.on('text', (text) => {
    process.stdout.write(text);
  });

  // 等待串流完成
  const finalMessage = await stream.finalMessage();
  console.log('\nStop reason:', finalMessage.stop_reason);
  console.log('Total tokens:', finalMessage.usage.input_tokens + finalMessage.usage.output_tokens);
}

streamResponse();

或者使用 async iterator 風格:

async function streamWithAsyncIterator() {
  const stream = await client.messages.create({
    model: 'claude-sonnet-4-6',
    max_tokens: 1024,
    stream: true,  // 關鍵:設定 stream: true
    messages: [
      { role: 'user', content: '你好!' },
    ],
  });

  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  }
}

在 Express.js 中實作 Streaming Endpoint

把 streaming 整合進你的後端 API,讓前端可以接收:

import express from 'express';
import Anthropic from '@anthropic-ai/sdk';

const app = express();
app.use(express.json());

const client = new Anthropic();

app.post('/api/chat/stream', async (req, res) => {
  const { messages, systemPrompt } = req.body;

  // 設定 SSE 必要的 headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');  // 給 Nginx 用的,禁用緩衝

  try {
    const stream = client.messages.stream({
      model: 'claude-sonnet-4-6',
      max_tokens: 2048,
      system: systemPrompt,
      messages,
    });

    // 把每個文字片段轉換成 SSE 格式傳給前端
    stream.on('text', (text) => {
      // SSE 格式:data: {...}\n\n
      res.write(`data: ${JSON.stringify({ type: 'text', text })}\n\n`);
    });

    // 等待串流完成
    const finalMessage = await stream.finalMessage();

    // 傳送結束事件
    res.write(`data: ${JSON.stringify({
      type: 'done',
      stop_reason: finalMessage.stop_reason,
      usage: finalMessage.usage,
    })}\n\n`);

    res.end();

  } catch (error) {
    // 錯誤時通知前端
    res.write(`data: ${JSON.stringify({ type: 'error', message: String(error) })}\n\n`);
    res.end();
  }
});

app.listen(3000);

前端(瀏覽器)接收 SSE

// 前端使用 EventSource 或 fetch 接收 SSE

async function fetchStreamingResponse(userMessage: string) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages: [{ role: 'user', content: userMessage }],
    }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // 解析 SSE 事件(以 \n\n 分隔)
    const lines = buffer.split('\n\n');
    buffer = lines.pop() || '';  // 最後一個可能不完整,留到下次

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        if (data.type === 'text') {
          // 更新 UI:把文字附加到顯示區域
          appendToDisplay(data.text);
        } else if (data.type === 'done') {
          console.log('Stream complete:', data.stop_reason);
        } else if (data.type === 'error') {
          console.error('Stream error:', data.message);
        }
      }
    }
  }
}

在 Next.js 中實作 Streaming

Next.js App Router 原生支援 streaming response,搭配 Claude API 特別好用:

// app/api/chat/route.ts
import { NextRequest } from 'next/server';
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

export async function POST(req: NextRequest) {
  const { messages } = await req.json();

  // 建立一個 TransformStream 來轉換 Claude 的 streaming 輸出
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      try {
        const claudeStream = client.messages.stream({
          model: 'claude-sonnet-4-6',
          max_tokens: 2048,
          messages,
        });

        claudeStream.on('text', (text) => {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify({ text })}\n\n`));
        });

        await claudeStream.finalMessage();
        controller.enqueue(encoder.encode('data: [DONE]\n\n'));
        controller.close();

      } catch (error) {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({ error: String(error) })}\n\n`)
        );
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

前端使用 Vercel AI SDK 的 useChat hook 可以讓這個流程更簡單。但如果你想自己實作,上面的範例已經足夠了。

Streaming 的錯誤處理與重連策略

Streaming 相比一般的 HTTP 請求有更多需要處理的邊際情況。

常見的失敗模式

  1. 網路中斷:串流到一半連線斷了
  2. 伺服器過載:Anthropic 在串流過程中回傳 529
  3. 逾時:長時間的 streaming 可能觸發某些代理伺服器的逾時設定

簡單的重連實作

import anthropic
import time

def stream_with_retry(client, max_retries=3, **kwargs):
    """帶重試機制的 streaming 呼叫"""
    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for text in stream.text_stream:
                    yield text
                return  # 成功完成,退出

        except anthropic.APIConnectionError as e:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Connection error, retrying in {wait}s... (attempt {attempt + 1})")
            time.sleep(wait)

        except anthropic.APIStatusError as e:
            if e.status_code in (429, 529) and attempt < max_retries - 1:
                wait = 2 ** attempt
                time.sleep(wait)
            else:
                raise

前端的重連邏輯

async function streamWithReconnect(
  messages: Anthropic.Messages.MessageParam[],
  maxRetries = 3
) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      await fetchStreamingResponse(messages);
      return;  // 成功
    } catch (error) {
      if (attempt === maxRetries - 1) throw error;
      const waitMs = Math.pow(2, attempt) * 1000;
      console.warn(`Stream failed, retrying in ${waitMs}ms...`);
      await new Promise(resolve => setTimeout(resolve, waitMs));
    }
  }
}

Streaming with Tool Use:特殊處理

當你的應用使用 Tool Use(下一章介紹),streaming 會更複雜。

除了文字的 text_delta,你還會收到 tool use 相關的事件:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    tools=[weather_tool],
    messages=messages
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                # Claude 正在呼叫工具
                tool_name = event.content_block.name
                tool_id = event.content_block.id
                print(f"Calling tool: {tool_name}")

        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                # 普通文字
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                # Tool 的參數(JSON 格式,逐步串流)
                # 不建議在這裡解析,等 message_stop 後再解析完整的
                pass

        elif event.type == "message_stop":
            # 取得完整的 message,包含所有 tool_use blocks
            final = stream.get_final_message()
            # 處理 tool calls...

我的建議:在 streaming 模式下,不要試圖即時解析 tool inputinput_json_delta 是 JSON 的片段,很難即時解析。等到 message_stop 事件後,用 stream.get_final_message() 取得完整的 message,再處理 tool calls。

stop_reason 的意義

每個 streaming 回應最終都會有一個 stop_reason,在 message_delta 事件裡:

with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        pass  # 消費所有文字

    final = stream.get_final_message()
    match final.stop_reason:
        case "end_turn":
            # 正常結束:Claude 認為它說完了
            pass
        case "max_tokens":
            # 被截斷:需要告知使用者,或增大 max_tokens 繼續
            handle_truncation(final)
        case "stop_sequence":
            # 遇到你定義的 stop_sequence 而停止
            pass
        case "tool_use":
            # Claude 想要呼叫工具,等待你執行工具後繼續
            handle_tool_use(final)

效能優化提示

使用更小的模型:Haiku 比 Sonnet 快 3-5 倍,如果你的應用對延遲很敏感,Haiku 可能是更好的選擇。

減少 system prompt 長度:更長的 system prompt 會增加 time-to-first-token(第一個 token 出現的時間)。

平行請求:如果你有多個不相關的 Claude 呼叫,可以平行發送而不是序列等待:

import asyncio
import anthropic

# 使用 AsyncAnthropic 客戶端
client = anthropic.AsyncAnthropic()

async def parallel_streams(prompts: list[str]):
    async def single_stream(prompt):
        result = []
        async with client.messages.stream(
            model="claude-haiku-4-5",
            max_tokens=512,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            async for text in stream.text_stream:
                result.append(text)
        return "".join(result)

    return await asyncio.gather(*[single_stream(p) for p in prompts])

下一步

Streaming 讓你的 AI 應用在使用者體驗上更接近 Claude.ai 這樣的成熟產品。

但到目前為止,我們的 Claude 都是「說話的機器」——只能輸出文字,沒辦法真正地做事。

下一章,我們來解鎖 Claude 最強大的功能之一:Tool Use(工具使用)。讓 Claude 不只是說話,而是能夠呼叫你的 API、查詢資料庫、執行計算——真正地成為你應用的大腦。

留言討論

esc
輸入關鍵字搜尋文章...
查看收藏 →