完整案例:從 0 到 1 打造 AI 客服系統
本篇是「Claude API & Agent SDK 完全指南」系列的第 15 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。
在結束之前,讓我們把整本書學到的東西全部用上。
這一章不是玩具範例——是一個你可以真正部署的 AI 客服系統。它會用到這本書介紹過的所有核心技術:Messages API、Tool Use、Prompt Caching、Streaming、Multi-Agent 架構,以及第十三章的成本優化策略。
我把這個系統叫做 Helios(太陽神,意味著「照亮問題」)。
系統需求
Helios 需要做這幾件事:
- 接收用戶問題:透過 REST API 接受問題,支援 Streaming 回覆
- 查詢知識庫(RAG):在公司的知識庫中搜尋相關文件,作為回覆的依據
- 判斷是否需要轉人工:某些問題(退款、投訴、技術問題)應該轉給真人處理
- 生成高品質回覆:基於知識庫內容,用自然語言回覆用戶
非功能性需求:
- P95 延遲 < 10 秒(包含 streaming 開始前的等待)
- 每月 API 成本 < $500(假設 1000 名日活用戶)
- 不記錄用戶的個人資訊
技術架構
用戶
│
▼
FastAPI Server (Cloud Run)
│
├── 分類 Agent (Haiku) ─── 判斷問題類型
│
├── 知識庫查詢工具 ─────── pgvector (Cloud SQL)
│
├── 回覆 Agent (Sonnet) ── 生成回覆 + Streaming
│
└── 升級工具 ───────────── Zendesk API (建立工單)
Redis ─── 對話歷史快取(TTL: 1 小時)
技術選擇說明:
- FastAPI:Python 非同步框架,原生支援 async/await,搭配 Anthropic SDK 的 streaming 最方便
- pgvector:PostgreSQL 的向量搜尋擴充,不需要另外建立 Pinecone/Weaviate 等獨立服務,維護成本低
- Redis:對話歷史的暫存,TTL 設 1 小時,超過就重置
- Cloud Run:無伺服器容器部署,只有請求時才消耗資源,成本低
資料流設計
一個用戶問題的完整旅程:
1. 用戶 POST /chat {"session_id": "xxx", "message": "我的訂單什麼時候到?"}
↓
2. 從 Redis 讀取對話歷史
↓
3. 分類 Agent(Haiku)判斷:
- 問題類型(訂單查詢 / 退款 / 技術問題 / 一般諮詢)
- 是否需要轉人工(高風險問題)
↓
4a. 如果需要轉人工 → 建立 Zendesk 工單 → 回覆「已為您轉接專員」
4b. 如果不需要轉人工 → 繼續步驟 5
↓
5. 回覆 Agent(Sonnet):
- 呼叫知識庫搜尋工具
- 根據搜尋結果生成回覆
- Streaming 回覆給用戶
↓
6. 把這輪對話加入 Redis(對話歷史)
環境設定
# 建立 Poetry 專案
poetry new helios-support
cd helios-support
# 安裝依賴
poetry add anthropic fastapi uvicorn redis psycopg2-binary pgvector
poetry add python-dotenv pydantic
# .env
ANTHROPIC_API_KEY=sk-ant-api03-xxx
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql://user:pass@localhost:5432/helios
ZENDESK_SUBDOMAIN=yourcompany
ZENDESK_EMAIL=support@yourcompany.com
ZENDESK_API_TOKEN=xxx
知識庫(RAG 層)
首先建立知識庫的資料存取層:
# helios/knowledge_base.py
import psycopg2
from pgvector.psycopg2 import register_vector
import anthropic
import os
from dataclasses import dataclass
client = anthropic.Anthropic()
@dataclass
class Document:
id: str
title: str
content: str
similarity: float
def embed_text(text: str) -> list[float]:
"""使用 Anthropic 的 embedding API 把文字轉成向量"""
# 注意:Anthropic 目前沒有自己的 embedding model
# 建議使用 OpenAI text-embedding-3-small 或 Google text-embedding-004
# 這裡假設你已經設定好 embedding function
# 使用你偏好的 embedding service
raise NotImplementedError("請替換成你的 embedding service")
def search_knowledge_base(
query: str,
limit: int = 3,
similarity_threshold: float = 0.7
) -> list[Document]:
"""在知識庫中搜尋相關文件"""
query_vector = embed_text(query)
conn = psycopg2.connect(os.environ["DATABASE_URL"])
register_vector(conn)
try:
with conn.cursor() as cur:
cur.execute("""
SELECT id, title, content,
1 - (embedding <=> %s::vector) AS similarity
FROM knowledge_base
WHERE 1 - (embedding <=> %s::vector) > %s
ORDER BY similarity DESC
LIMIT %s
""", (query_vector, query_vector, similarity_threshold, limit))
rows = cur.fetchall()
return [
Document(id=row[0], title=row[1], content=row[2], similarity=row[3])
for row in rows
]
finally:
conn.close()
工具定義
接下來定義 Agent 可以呼叫的工具:
# helios/tools.py
import anthropic
import requests
import json
import os
from .knowledge_base import search_knowledge_base
# 工具定義(傳給 Anthropic API 的 tools 參數)
SUPPORT_TOOLS = [
{
"name": "search_knowledge_base",
"description": """搜尋公司知識庫,找到跟用戶問題相關的文件。
在回覆任何問題之前,應該先用這個工具搜尋相關資訊。
如果找不到相關資訊,誠實告訴用戶你不知道,不要猜測。""",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜尋查詢詞,應該是描述問題的關鍵詞"
}
},
"required": ["query"]
}
},
{
"name": "escalate_to_human",
"description": """把用戶的問題升級給真人客服專員處理。
在以下情況應該使用這個工具:
- 用戶要求退款
- 用戶表達強烈不滿或憤怒
- 涉及帳號安全問題
- 問題超出你的知識範疇,用戶急需解決""",
"input_schema": {
"type": "object",
"properties": {
"reason": {
"type": "string",
"description": "升級原因的簡短說明(給客服專員看的,不是給用戶看的)"
},
"urgency": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "緊急程度"
}
},
"required": ["reason", "urgency"]
}
}
]
def execute_tool(tool_name: str, tool_input: dict) -> str:
"""執行工具並回傳結果"""
if tool_name == "search_knowledge_base":
docs = search_knowledge_base(tool_input["query"])
if not docs:
return "知識庫中找不到相關資訊。"
results = []
for doc in docs:
results.append(f"## {doc.title}\n{doc.content[:500]}")
return "\n\n".join(results)
elif tool_name == "escalate_to_human":
ticket_id = create_zendesk_ticket(
reason=tool_input["reason"],
urgency=tool_input["urgency"]
)
return json.dumps({
"escalated": True,
"ticket_id": ticket_id,
"message": f"已建立工單 #{ticket_id},專員將在 24 小時內與您聯繫"
})
return f"ERROR: 未知工具 {tool_name}"
def create_zendesk_ticket(reason: str, urgency: str) -> str:
"""建立 Zendesk 工單(簡化版)"""
url = f"https://{os.environ['ZENDESK_SUBDOMAIN']}.zendesk.com/api/v2/tickets.json"
auth = (
f"{os.environ['ZENDESK_EMAIL']}/token",
os.environ["ZENDESK_API_TOKEN"]
)
priority_map = {"low": "low", "medium": "normal", "high": "urgent"}
response = requests.post(url, auth=auth, json={
"ticket": {
"subject": "AI 客服升級",
"comment": {"body": f"升級原因:{reason}"},
"priority": priority_map.get(urgency, "normal"),
"tags": ["ai-escalated"]
}
})
if response.ok:
return str(response.json()["ticket"]["id"])
return "UNKNOWN"
對話管理
# helios/conversation.py
import redis
import json
import os
from datetime import timedelta
redis_client = redis.from_url(os.environ["REDIS_URL"])
SESSION_TTL = timedelta(hours=1)
def get_conversation_history(session_id: str) -> list[dict]:
"""從 Redis 讀取對話歷史"""
data = redis_client.get(f"session:{session_id}")
if data:
return json.loads(data)
return []
def save_conversation_history(session_id: str, messages: list[dict]):
"""把對話歷史存到 Redis"""
redis_client.setex(
f"session:{session_id}",
SESSION_TTL,
json.dumps(messages, ensure_ascii=False)
)
def build_messages_with_cache(
history: list[dict],
new_message: str
) -> list[dict]:
"""建立帶有 Prompt Caching 標記的訊息列表"""
messages = history.copy()
# 如果歷史超過 6 輪,截斷最舊的
if len(messages) > 12: # 6 輪 = 12 條訊息
messages = messages[-12:]
# 對倒數第二條訊息(最新的 assistant 訊息)加快取標記
for i in range(len(messages) - 1, -1, -1):
if messages[i]["role"] == "assistant":
msg = messages[i].copy()
content = msg["content"]
if isinstance(content, str):
msg["content"] = [
{
"type": "text",
"text": content,
"cache_control": {"type": "ephemeral"}
}
]
messages[i] = msg
break
messages.append({"role": "user", "content": new_message})
return messages
核心 Agent 邏輯
# helios/agent.py
import anthropic
from typing import AsyncIterator
from .tools import SUPPORT_TOOLS, execute_tool
from .conversation import (
get_conversation_history,
save_conversation_history,
build_messages_with_cache
)
client = anthropic.Anthropic()
# 大型知識庫摘要(幾千 tokens,固定內容,適合快取)
COMPANY_CONTEXT = """
你是 Helios 電商平台的 AI 客服助手。
關於我們的服務:
- 商品交貨時間:標準配送 3-5 個工作天,快速配送 1-2 個工作天
- 退貨政策:收到商品 7 天內可申請退貨,商品需保持原狀
- 退款時間:申請通過後 5-10 個工作天退款到原付款方式
- 客服時間:週一到週五 9:00-18:00
常見問題處理準則:
[更多公司政策內容...]
"""
SYSTEM_PROMPT_BASE = """你是 Helios 電商平台的 AI 客服助手,名字叫 Helios。
你的職責:
1. 用友善、專業的態度回應用戶問題
2. 在回覆前,用 search_knowledge_base 工具查詢相關資訊
3. 只根據知識庫中的資訊回覆,不要猜測或編造
4. 遇到退款、投訴、帳號安全問題,主動使用 escalate_to_human 工具
回覆風格:
- 簡潔清晰,避免廢話
- 中文回覆,但保留必要的技術術語
- 如果不知道答案,誠實說明並提供升級選項
"""
async def stream_support_response(
session_id: str,
user_message: str
) -> AsyncIterator[str]:
"""
處理用戶問題,Streaming 回覆。
這個 generator 會 yield 回覆的文字片段。
"""
# 讀取對話歷史
history = get_conversation_history(session_id)
messages = build_messages_with_cache(history, user_message)
# Agent 的 agentic loop
current_messages = messages.copy()
final_response_parts = []
assistant_message_content = []
while True:
# 呼叫 API(使用 streaming)
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=[
{
"type": "text",
"text": SYSTEM_PROMPT_BASE,
},
{
"type": "text",
"text": COMPANY_CONTEXT,
"cache_control": {"type": "ephemeral"} # 快取公司資訊
}
],
tools=SUPPORT_TOOLS,
messages=current_messages,
) as stream:
current_tool_calls = []
current_text = ""
for event in stream:
if hasattr(event, 'type'):
if event.type == 'content_block_start':
if hasattr(event.content_block, 'type'):
if event.content_block.type == 'text':
pass # 準備收文字
elif event.content_block.type == 'tool_use':
current_tool_calls.append({
"id": event.content_block.id,
"name": event.content_block.name,
"input": ""
})
elif event.type == 'content_block_delta':
if hasattr(event.delta, 'text'):
# 文字片段:直接 yield 給用戶
text_chunk = event.delta.text
current_text += text_chunk
final_response_parts.append(text_chunk)
yield text_chunk
elif hasattr(event.delta, 'partial_json'):
# 工具呼叫的參數(累積)
if current_tool_calls:
current_tool_calls[-1]["input"] += event.delta.partial_json
# 取得完整的 response
final_response = stream.get_final_message()
# 檢查 stop_reason
if final_response.stop_reason == "end_turn":
# 完成!把這輪對話加入歷史
assistant_content = []
if current_text:
assistant_content.append({"type": "text", "text": current_text})
current_messages.append({
"role": "assistant",
"content": assistant_content if assistant_content else "好的。"
})
# 儲存更新後的對話歷史
save_conversation_history(session_id, current_messages)
return
elif final_response.stop_reason == "tool_use":
# 需要執行工具
tool_results = []
for content_block in final_response.content:
if content_block.type == "tool_use":
tool_name = content_block.name
tool_input = content_block.input
# 執行工具
result = execute_tool(tool_name, tool_input)
tool_results.append({
"type": "tool_result",
"tool_use_id": content_block.id,
"content": result
})
# 如果是升級工具,向用戶發送通知
if tool_name == "escalate_to_human":
import json
result_data = json.loads(result)
yield f"\n\n我已經為您建立工單(#{result_data['ticket_id']}),我們的專員將盡快與您聯繫。"
# 把工具呼叫和結果加入訊息
current_messages.append({
"role": "assistant",
"content": final_response.content
})
current_messages.append({
"role": "user",
"content": tool_results
})
# 繼續 loop,讓 agent 根據工具結果繼續生成回覆
else:
# max_tokens 或其他停止原因
yield "\n\n(回覆已截斷,請繼續詢問)"
return
FastAPI Server
# helios/main.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
import logging
from .agent import stream_support_response
app = FastAPI(title="Helios AI Support")
logger = logging.getLogger(__name__)
class ChatRequest(BaseModel):
session_id: str
message: str
class Config:
# 限制輸入長度,防止超長輸入
max_anystr_length = 2000
@app.post("/chat")
async def chat(request: ChatRequest):
"""接受用戶問題,返回 Streaming 回覆"""
# 基本輸入驗證
if len(request.message.strip()) < 2:
raise HTTPException(status_code=400, detail="問題太短,請重新輸入")
if len(request.session_id) > 100:
raise HTTPException(status_code=400, detail="無效的 session ID")
async def generate():
try:
async for chunk in stream_support_response(
session_id=request.session_id,
user_message=request.message
):
# SSE 格式
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.exception(f"Error in chat stream: {e}")
yield f"data: 抱歉,發生了技術問題,請稍後再試。\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # 禁用 Nginx 緩衝,確保 streaming 正常
}
)
@app.get("/health")
async def health_check():
"""健康檢查 endpoint"""
return {"status": "ok"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
成本追蹤整合
把第十三章的成本追蹤整合進來:
# helios/metrics.py
import time
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class RequestMetrics:
session_id: str
model: str
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_creation_tokens: int = 0
tool_calls: int = 0
escalated: bool = False
latency_ms: float = 0.0
def log(self):
# 計算成本
input_cost = self.input_tokens * 3.0 / 1_000_000
output_cost = self.output_tokens * 15.0 / 1_000_000
cache_read_cost = self.cache_read_tokens * 0.30 / 1_000_000
total_cost = input_cost + output_cost + cache_read_cost
cache_hit_rate = (
self.cache_read_tokens / max(self.input_tokens, 1) * 100
)
logger.info("request_completed", extra={
"session_id": self.session_id[:8] + "...", # 截斷,避免洩漏
"model": self.model,
"input_tokens": self.input_tokens,
"output_tokens": self.output_tokens,
"cache_read_tokens": self.cache_read_tokens,
"cache_hit_rate_pct": round(cache_hit_rate, 1),
"tool_calls": self.tool_calls,
"escalated": self.escalated,
"latency_ms": round(self.latency_ms),
"estimated_cost_usd": round(total_cost, 6),
})
部署到 Cloud Run
Dockerfile:
FROM python:3.12-slim
WORKDIR /app
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && poetry install --no-dev
COPY helios/ ./helios/
EXPOSE 8080
CMD ["poetry", "run", "uvicorn", "helios.main:app", "--host", "0.0.0.0", "--port", "8080"]
部署指令:
# Build 和 Push 到 Google Container Registry
gcloud builds submit --tag gcr.io/YOUR_PROJECT/helios-support
# 部署到 Cloud Run
gcloud run deploy helios-support \
--image gcr.io/YOUR_PROJECT/helios-support \
--platform managed \
--region asia-east1 \
--memory 512Mi \
--concurrency 80 \
--set-secrets "ANTHROPIC_API_KEY=anthropic-api-key:latest" \
--set-env-vars "REDIS_URL=redis://10.x.x.x:6379" \
--set-env-vars "DATABASE_URL=postgresql://user:pass@10.x.x.x/helios" \
--allow-unauthenticated
幾個 Cloud Run 的注意事項:
--concurrency 80 代表每個 Cloud Run instance 同時處理 80 個請求。AI streaming 請求會長時間佔用連線,這個值可以根據你的實際情況調整。
Secrets(API Key、資料庫密碼)用 Google Secret Manager 管理,透過 --set-secrets 注入,不要放在環境變數裡直接傳。
成本估算
這個系統在 1000 名日活用戶、每人平均 5 個問題的情況下:
每天請求數:5,000 每次請求的 token 估算:
- Input(含 COMPANY_CONTEXT):~3,000 tokens,其中 ~2,500 可以快取
- Output:~300 tokens
- 快取命中後,有效輸入成本:(500 × $3 + 2500 × $0.30) / 1,000,000 = $0.00225 / 請求
每天成本:5,000 × ($0.00225 + 300 × $15/1,000,000) = $11.25 + $22.50 = $33.75/天
每月成本:~$1,000/月
這比第十三章的目標 $500/月 高一點。要進一步降低,可以:
- 把分類用的 Haiku 做 routing,簡單問題完全用 Haiku 回答(降至 ~$600)
- 提高快取命中率(確保 5 分鐘內有足夠的請求維持快取熱度)
這本書的旅程
你剛讀完了「Claude API & Agent SDK 完全指南」的最後一章。
我想用幾段話回顧一下我們走過的路程。
這本書是 Claude 三部曲的第三本。第一本(Claude 使用指南)告訴你怎麼跟 Claude 對話。第二本(Claude Code 完全指南)告訴你怎麼讓 AI 幫你寫程式碼。這本書——第三本——告訴你怎麼用 Claude 建造東西:真正的應用、真正的系統、真正可以服務用戶的產品。
從第一章的第一個 client.messages.create() 呼叫,到這一章的多 agent、streaming、RAG 整合系統——中間走了很長的路。
如果你真的把這本書的範例都跑過了,你現在應該能做這些事:
- 用 Messages API 建立任何對話型應用
- 用 Tool Use 讓 AI 操控外部系統
- 用 Prompt Caching 把成本降到最低
- 用 Streaming 讓用戶體驗更流暢
- 用 Agent SDK 建立多 agent 協作系統
- 開發自己的 MCP Server
- 把這一切部署到生產環境,並且知道怎麼監控和維護
接下來呢?
AI 應用開發這個領域正在高速演進。Anthropic 每隔幾個月就會推出新功能——更好的模型、更低的成本、更強的 agent 能力。我的建議是:
建立一個真實的東西。不要停在讀書和做練習。找一個你真正有需求的問題,用你在這本書學到的技術解決它。你在解決真實問題的過程中學到的東西,遠比讀任何書都多。
關注官方文件。Anthropic 的 docs.anthropic.com 是最可靠的資訊來源,這本書的範例和定價資訊可能因版本更新而改變,但官方文件永遠是最新的。
加入社群。Anthropic 的 Discord、各種 AI 開發者社群——這些地方有很多正在解決和你類似問題的開發者。
最後,我想說:
我花了很多時間在這三本書裡,試著把「如何真正用好 Claude」說清楚——不是表面的功能介紹,而是讓你能真正建造東西的深度知識。寫作的過程其實也是我重新整理自己理解的過程。
如果這本書對你有幫助,最好的感謝方式,就是去建造一個真實的東西。
祝你的 AI 應用上線順利。
—— Bobo