跳至主要內容
技術

生產環境部署:錯誤處理、限流與可觀測性

生產環境部署:錯誤處理、限流與可觀測性
Claude API & Agent SDK 完全指南 第 14 / 15 篇

本篇是「Claude API & Agent SDK 完全指南」系列的第 14 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。

你的應用在開發環境完美運作,每次 demo 都沒出問題。

然後你把它部署到生產環境,十個用戶同時使用,五分鐘後你收到第一個錯誤報告。

這個場景幾乎每個做 AI 應用的工程師都經歷過。開發環境和生產環境之間有個巨大的鴻溝,而這個鴻溝在 AI API 應用中特別明顯。

開發環境 vs 生產環境的本質差異

並發性:開發環境通常是你一個人在用,偶爾跑幾個測試。生產環境可能有幾十甚至幾百個請求同時進來——這意味著 Rate Limit 問題、資源競爭問題會突然全部冒出來。

錯誤頻率:在開發環境,你的請求大部分都成功。生產環境中,網路問題、Anthropic API 的偶發故障、用戶輸入的邊緣 case——這些都會讓你看到你從來沒見過的錯誤。

可觀測性需求:開發環境你可以直接 print() 來 debug。生產環境你需要結構化的 log、metrics、追蹤系統,因為當出問題的時候,你不在場。

成本壓力:開發環境的錯誤和重試只花你一點時間。生產環境的錯誤和不必要的重試會直接換算成錢。

這一章我要告訴你,一個 AI API 應用要進生產環境,需要做哪些事。

Rate Limits 完整解析

Anthropic 的 Rate Limit 有四個維度,分別計算:

RPM(Requests Per Minute):每分鐘的請求次數限制。對多數用戶,這個限制在 50-2000 RPM 之間,視你的帳戶 tier 而定。

TPM(Tokens Per Minute):每分鐘的 token 使用量限制。這是最常觸發的限制——一個複雜的請求可能用掉 10K tokens,如果很多請求同時進來,很快就會碰到 TPM 上限。

ITPM(Input Tokens Per Minute):輸入 token 的專屬限制,有些 tier 對輸入 token 有獨立的限制。

OTPM(Output Tokens Per Minute):輸出 token 的專屬限制。如果你的應用需要生成大量長文,OTPM 可能是你最先碰到的瓶頸。

查看你的帳戶限制:

# 透過 Anthropic Console API 查看
curl https://api.anthropic.com/v1/organizations/limits \
  -H "x-api-key: $ANTHROPIC_API_KEY" \
  -H "anthropic-version: 2023-06-01"

或是在 console.anthropic.com 的 Settings → Limits 頁面查看。

如果你需要更高的限制,可以在 Console 申請提高——通常需要幾個工作天審核,並且需要說明你的使用場景。

當你碰到 Rate Limit,API 會回傳 HTTP 429 狀態碼,response 中會包含 Retry-After header,告訴你需要等待幾秒。

指數退避的正確實作

碰到 429 的時候,很多初學者的直覺是「等一秒再試」。但這是錯的。

如果你有 100 個並發請求都在等一秒,一秒後它們同時再試,又同時碰到 429,然後又同時等一秒……這個「驚群效應(Thundering Herd)」會讓情況越來越糟。

正確的做法是指數退避(Exponential Backoff)加上隨機 Jitter

import anthropic
import time
import random
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class AnthropicClientWithRetry:
    def __init__(
        self,
        api_key: str,
        max_retries: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
    ):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def create_message(self, **kwargs) -> anthropic.Message:
        """帶有指數退避重試的 message 建立"""
        last_exception = None

        for attempt in range(self.max_retries + 1):
            try:
                response = self.client.messages.create(**kwargs)
                if attempt > 0:
                    logger.info(f"請求在第 {attempt + 1} 次嘗試成功")
                return response

            except anthropic.RateLimitError as e:
                last_exception = e
                if attempt == self.max_retries:
                    logger.error(f"達到最大重試次數 ({self.max_retries}),放棄")
                    raise

                # 從 Retry-After header 取得等待時間(如果有的話)
                retry_after = e.response.headers.get("Retry-After")
                if retry_after:
                    wait_time = float(retry_after)
                else:
                    # 指數退避 + jitter
                    wait_time = min(
                        self.initial_delay * (self.exponential_base ** attempt),
                        self.max_delay
                    )
                    # 加入 ±25% 的隨機 jitter,防止驚群效應
                    wait_time *= (0.75 + random.random() * 0.5)

                logger.warning(
                    f"Rate limit 觸發 (429),等待 {wait_time:.1f} 秒後重試 "
                    f"(第 {attempt + 1}/{self.max_retries} 次)"
                )
                time.sleep(wait_time)

            except anthropic.APIStatusError as e:
                # 5xx 錯誤:Anthropic server 問題,可以重試
                if e.status_code >= 500:
                    last_exception = e
                    if attempt == self.max_retries:
                        raise

                    wait_time = min(
                        self.initial_delay * (self.exponential_base ** attempt),
                        self.max_delay
                    ) * (0.75 + random.random() * 0.5)

                    logger.warning(
                        f"Server error ({e.status_code}),等待 {wait_time:.1f} 秒後重試"
                    )
                    time.sleep(wait_time)
                else:
                    # 4xx 錯誤(除了 429):客戶端錯誤,不要重試
                    raise

            except anthropic.APIConnectionError as e:
                # 網路連線問題,可以重試
                last_exception = e
                if attempt == self.max_retries:
                    raise

                wait_time = min(
                    self.initial_delay * (self.exponential_base ** attempt),
                    self.max_delay
                )
                logger.warning(f"連線錯誤,等待 {wait_time:.1f} 秒後重試")
                time.sleep(wait_time)

        raise last_exception

這個實作處理了三種可重試的情況:Rate Limit(429)、Server Error(5xx)、連線錯誤。每次重試的等待時間是上一次的 2 倍,再加上隨機 jitter,避免驚群效應。

實際上,Anthropic 的官方 Python SDK 已經內建了類似的重試邏輯:

# SDK 內建重試,最多重試 2 次
client = anthropic.Anthropic(
    api_key="...",
    max_retries=2,  # 預設就是 2
    timeout=30.0,   # 請求 timeout(秒)
)

但我推薦自己控制重試邏輯,因為你可以加入更細緻的 logging 和 metrics,知道重試發生的頻率——頻繁的重試是一個需要關注的信號。

錯誤分類與對應策略

不同類型的錯誤需要不同的處理策略。

可重試的錯誤:

錯誤類型HTTP 狀態碼說明策略
RateLimitError429超過 Rate Limit指數退避重試
InternalServerError500Anthropic server 問題短暫等待後重試
ServiceUnavailableError503服務維護或過載長時間等待後重試
APIConnectionErrorN/A網路問題重試
APITimeoutErrorN/A請求逾時重試(考慮增加 timeout)

不可重試的錯誤(客戶端錯誤):

錯誤類型HTTP 狀態碼常見原因策略
AuthenticationError401API Key 無效或已撤銷檢查 API Key,告警
PermissionDeniedError403沒有使用某個功能的權限檢查帳戶設定
NotFoundError404請求了不存在的資源修復 bug
BadRequestError400請求格式錯誤修復 bug,可能需要記錄 request
RequestTooLargeError413輸入超過 context window截斷輸入,記錄
from anthropic import (
    Anthropic,
    AuthenticationError,
    PermissionDeniedError,
    RateLimitError,
    BadRequestError,
    APIConnectionError,
    InternalServerError,
)

def handle_anthropic_error(e: Exception, context: dict) -> str:
    """統一的錯誤處理,回傳用戶友好的訊息"""

    if isinstance(e, AuthenticationError):
        # 這是嚴重問題——API Key 有問題,要告警給工程師
        logger.critical("API Key 認證失敗,需要立即檢查", extra=context)
        # 觸發 PagerDuty/Slack 告警
        alert_on_call_engineer("API Key 認證失敗")
        return "服務暫時無法使用,我們已收到通知並正在處理"

    elif isinstance(e, RateLimitError):
        # 已在重試邏輯中處理,到這裡代表重試用盡了
        logger.error("Rate limit 重試用盡", extra=context)
        return "目前服務繁忙,請稍後再試"

    elif isinstance(e, BadRequestError):
        # 可能是用戶輸入有問題,也可能是 bug
        error_msg = str(e)
        if "too many tokens" in error_msg.lower():
            logger.warning("輸入過長", extra={**context, "error": error_msg})
            return "您的輸入太長,請縮短後再試"
        else:
            logger.error("Bad request", extra={**context, "error": error_msg})
            return "請求格式有誤,請重試"

    elif isinstance(e, InternalServerError):
        logger.error("Anthropic server error", extra=context)
        return "AI 服務暫時不穩定,請稍後再試"

    else:
        logger.exception("未預期的錯誤", extra=context)
        return "發生未知錯誤,請重試"

API Key 安全管理

API Key 洩漏是 AI 應用最常見的安全事故之一。我見過有人把 API Key 直接 hardcode 在程式碼裡,commit 到 GitHub,然後一週內被人濫用了幾萬塊錢。

永遠不要做的事:

# ❌ 錯誤:hardcode 在程式碼中
client = anthropic.Anthropic(api_key="sk-ant-api03-xxxxx")

# ❌ 錯誤:commit 到 git
# 就算你之後刪掉,git history 還是看得到

正確做法:

# ✅ 從環境變數讀取
import os
import anthropic

# SDK 預設就會讀 ANTHROPIC_API_KEY 環境變數
client = anthropic.Anthropic()  # 自動讀取 os.environ["ANTHROPIC_API_KEY"]

# 或明確指定
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

在生產環境使用 Secret Manager:

# Google Cloud Secret Manager 範例
from google.cloud import secretmanager

def get_api_key() -> str:
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{PROJECT_ID}/secrets/anthropic-api-key/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")

# 在應用啟動時(不是每次請求時)取得 key
ANTHROPIC_API_KEY = get_api_key()
anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)

另一個重要建議:為不同環境使用不同的 API Key。開發環境、staging 環境、生產環境各自一個 Key。這樣當某個環境的 Key 洩漏,其他環境不受影響,而且你可以透過 Anthropic Console 查看不同 Key 的使用量,更容易追蹤問題。

Logging 策略

Log 要記什麼?不記什麼?這個問題比大多數人想的複雜。

要記錄的:

  • 請求 ID(x-request-id header)——這是 debug 的生命線
  • 使用的 model、max_tokens
  • 回應的 token 使用量(input/output/cache)
  • 請求延遲(從送出到收到回應的時間)
  • 錯誤類型和錯誤訊息
  • 用戶 ID 或 session ID(匿名的,用於追蹤使用模式)

不要記錄的:

  • 用戶的完整 prompt(可能包含個人資訊)
  • API Key
  • 任何 PII(個人識別資訊)
import anthropic
import time
import logging
import uuid

logger = logging.getLogger(__name__)
client = anthropic.Anthropic()

def call_api_with_logging(
    messages: list,
    system: str,
    model: str = "claude-3-5-sonnet-20241022",
    user_id: str = None,
    feature: str = "unknown"
) -> anthropic.Message:
    """帶有完整 logging 的 API 呼叫"""
    request_id = str(uuid.uuid4())
    start_time = time.time()

    # 記錄請求(注意:不記錄 prompt 內容)
    logger.info("api_request_started", extra={
        "request_id": request_id,
        "feature": feature,
        "model": model,
        "user_id": user_id,  # 應該是匿名 ID,不是真實用戶資料
        "message_count": len(messages),
        # 記錄 prompt 的 token 估算,但不記錄內容
        "estimated_input_chars": sum(len(str(m.get("content", ""))) for m in messages),
    })

    try:
        response = client.messages.create(
            model=model,
            max_tokens=1024,
            system=system,
            messages=messages,
        )

        latency_ms = (time.time() - start_time) * 1000
        usage = response.usage

        # 從回應 headers 取得 Anthropic 的 request ID(用於聯繫 support)
        anthropic_request_id = response._request_id  # SDK 中的 request ID

        logger.info("api_request_succeeded", extra={
            "request_id": request_id,
            "anthropic_request_id": anthropic_request_id,
            "feature": feature,
            "model": model,
            "latency_ms": round(latency_ms),
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
            "cache_read_tokens": getattr(usage, 'cache_read_input_tokens', 0),
            "cache_creation_tokens": getattr(usage, 'cache_creation_input_tokens', 0),
            "stop_reason": response.stop_reason,
        })

        return response

    except Exception as e:
        latency_ms = (time.time() - start_time) * 1000
        logger.error("api_request_failed", extra={
            "request_id": request_id,
            "feature": feature,
            "model": model,
            "latency_ms": round(latency_ms),
            "error_type": type(e).__name__,
            "error_message": str(e)[:200],  # 截斷,避免 log 過大
        })
        raise

OpenTelemetry 整合

結構化 logging 只是開始。在生產環境,你還需要分散式追蹤(Distributed Tracing)——尤其是當你的 AI 功能只是更大系統的一部分時。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import anthropic

# 設定 OpenTelemetry
provider = TracerProvider()
otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("ai-service")
client = anthropic.Anthropic()

def answer_question(user_question: str, context: str, user_id: str) -> str:
    """帶有 OpenTelemetry 追蹤的 API 呼叫"""
    with tracer.start_as_current_span("claude_api_call") as span:
        # 設定 span 屬性(不包含個人資訊)
        span.set_attribute("ai.model", "claude-3-5-sonnet-20241022")
        span.set_attribute("ai.feature", "question_answering")
        span.set_attribute("user.id", user_id)

        try:
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                system="你是一個有幫助的助手。",
                messages=[{"role": "user", "content": user_question}]
            )

            # 記錄結果 metrics
            usage = response.usage
            span.set_attribute("ai.input_tokens", usage.input_tokens)
            span.set_attribute("ai.output_tokens", usage.output_tokens)
            span.set_attribute("ai.stop_reason", response.stop_reason)

            return response.content[0].text

        except Exception as e:
            span.record_exception(e)
            span.set_status(trace.StatusCode.ERROR, str(e))
            raise

Latency 監控

API 延遲是用戶體驗的核心指標。claude-3-5-sonnet 的典型延遲在 1-10 秒之間,視 prompt 長度和輸出長度而定。

你應該監控 P50(中位數)、P95(第 95 百分位)、P99——不只是平均值。平均值會隱藏問題,P99 才能反映最差的用戶體驗。

import time
from collections import deque
import threading
import statistics

class LatencyTracker:
    def __init__(self, window_size: int = 1000):
        self.latencies = deque(maxlen=window_size)
        self.lock = threading.Lock()

    def record(self, latency_ms: float):
        with self.lock:
            self.latencies.append(latency_ms)

    def get_percentiles(self) -> dict:
        with self.lock:
            if not self.latencies:
                return {}
            sorted_latencies = sorted(self.latencies)
            n = len(sorted_latencies)
            return {
                "p50": sorted_latencies[int(n * 0.50)],
                "p95": sorted_latencies[int(n * 0.95)],
                "p99": sorted_latencies[int(n * 0.99)],
                "max": sorted_latencies[-1],
                "count": n,
            }

latency_tracker = LatencyTracker()

如果你的 P99 延遲突然從 5 秒跳到 30 秒,這是一個嚴重的信號——可能是 Anthropic 服務有問題,或是你的 prompt 突然變長了。

Fallback 策略:降級到更便宜的模型

當 Sonnet 不可用或響應過慢,降級到 Haiku 是一個有效的 fallback:

import anthropic
from typing import Optional

client = anthropic.Anthropic()

MODELS = {
    "primary": "claude-3-5-sonnet-20241022",
    "fallback": "claude-3-5-haiku-20241022",
}

def create_message_with_fallback(
    messages: list,
    system: str,
    max_tokens: int = 1024,
    timeout: float = 30.0
) -> tuple[anthropic.Message, str]:
    """先嘗試主要模型,失敗則降級"""

    for model_tier, model in MODELS.items():
        try:
            response = client.messages.create(
                model=model,
                max_tokens=max_tokens,
                system=system,
                messages=messages,
                timeout=timeout,
            )
            if model_tier == "fallback":
                logger.warning(f"使用了 fallback 模型: {model}")
            return response, model_tier

        except anthropic.InternalServerError as e:
            if model_tier == "primary":
                logger.warning(f"主要模型 {model} 不可用,嘗試 fallback")
                continue
            else:
                raise  # fallback 也失敗了,真的出問題了

        except anthropic.APITimeoutError:
            if model_tier == "primary":
                logger.warning(f"主要模型 {model} 超時,嘗試 fallback")
                continue
            else:
                raise

生產環境 10 項 Checklist

在部署你的 AI 應用到生產環境之前,確認這 10 項:

[1] API Key 安全

  • API Key 存放在環境變數或 Secret Manager,不在程式碼中
  • 不同環境使用不同的 API Key
  • .gitignore 有排除 .env 檔案

[2] 錯誤處理

  • 所有 API 呼叫都有 try/except
  • 429 錯誤有指數退避重試
  • 4xx 客戶端錯誤不重試
  • 錯誤訊息對用戶友好(不暴露技術細節)

[3] Rate Limit 管理

  • 了解你帳戶的 RPM 和 TPM 限制
  • 有機制監控目前的使用量
  • 有 queue 或 semaphore 控制最大並發請求數

[4] Timeout 設定

  • 所有請求都有設定合理的 timeout
  • Streaming 請求有設定 connection timeout

[5] Logging

  • 每次 API 呼叫都記錄延遲和 token 使用量
  • 錯誤有完整的 context 資訊
  • Log 中不包含 PII 或 API Key

[6] 監控和告警

  • P95/P99 延遲有告警
  • 錯誤率有告警
  • API 費用有告警(設定每日/每月上限)

[7] 輸入驗證

  • 用戶輸入有長度限制(防止 context window 爆炸)
  • 有基本的輸入清理

[8] Fallback 機制

  • 主要模型不可用時有降級方案
  • 完全降級時有友好的錯誤提示

[9] 成本控制

  • 啟用 Prompt Caching(如果適合)
  • max_tokens 設定合理的上限
  • 有成本監控 dashboard

[10] 運作驗證

  • 有 health check endpoint(確認 Anthropic API 可達)
  • 有 staging 環境,部署前先驗證
  • 有 rollback 計劃

完成這十項,你的 AI 應用才算真正準備好面對生產環境的挑戰。


這本書的前十四章,我們從最基礎的 Messages API 一路走到 multi-agent 系統、MCP Server、成本優化、生產部署。現在是最後一章——把所有這些拼在一起,建立一個真實的、可以部署的 AI 客服系統。

留言討論

esc
輸入關鍵字搜尋文章...
查看收藏 →