跳至主要內容
技術

完整案例:從 0 到 1 打造 AI 客服系統

完整案例:從 0 到 1 打造 AI 客服系統
Claude API & Agent SDK 完全指南 第 15 / 15 篇

本篇是「Claude API & Agent SDK 完全指南」系列的第 15 / 15 篇。你可以從系列總覽開始閱讀,也可以直接接著看本文。

在結束之前,讓我們把整本書學到的東西全部用上。

這一章不是玩具範例——是一個你可以真正部署的 AI 客服系統。它會用到這本書介紹過的所有核心技術:Messages API、Tool Use、Prompt Caching、Streaming、Multi-Agent 架構,以及第十三章的成本優化策略。

我把這個系統叫做 Helios(太陽神,意味著「照亮問題」)。

系統需求

Helios 需要做這幾件事:

  1. 接收用戶問題:透過 REST API 接受問題,支援 Streaming 回覆
  2. 查詢知識庫(RAG):在公司的知識庫中搜尋相關文件,作為回覆的依據
  3. 判斷是否需要轉人工:某些問題(退款、投訴、技術問題)應該轉給真人處理
  4. 生成高品質回覆:基於知識庫內容,用自然語言回覆用戶

非功能性需求:

  • P95 延遲 < 10 秒(包含 streaming 開始前的等待)
  • 每月 API 成本 < $500(假設 1000 名日活用戶)
  • 不記錄用戶的個人資訊

技術架構

用戶


FastAPI Server (Cloud Run)

 ├── 分類 Agent (Haiku) ─── 判斷問題類型

 ├── 知識庫查詢工具 ─────── pgvector (Cloud SQL)

 ├── 回覆 Agent (Sonnet) ── 生成回覆 + Streaming

 └── 升級工具 ───────────── Zendesk API (建立工單)

Redis ─── 對話歷史快取(TTL: 1 小時)

技術選擇說明:

  • FastAPI:Python 非同步框架,原生支援 async/await,搭配 Anthropic SDK 的 streaming 最方便
  • pgvector:PostgreSQL 的向量搜尋擴充,不需要另外建立 Pinecone/Weaviate 等獨立服務,維護成本低
  • Redis:對話歷史的暫存,TTL 設 1 小時,超過就重置
  • Cloud Run:無伺服器容器部署,只有請求時才消耗資源,成本低

資料流設計

一個用戶問題的完整旅程:

1. 用戶 POST /chat {"session_id": "xxx", "message": "我的訂單什麼時候到?"}

2. 從 Redis 讀取對話歷史

3. 分類 Agent(Haiku)判斷:
   - 問題類型(訂單查詢 / 退款 / 技術問題 / 一般諮詢)
   - 是否需要轉人工(高風險問題)

4a. 如果需要轉人工 → 建立 Zendesk 工單 → 回覆「已為您轉接專員」
4b. 如果不需要轉人工 → 繼續步驟 5

5. 回覆 Agent(Sonnet):
   - 呼叫知識庫搜尋工具
   - 根據搜尋結果生成回覆
   - Streaming 回覆給用戶

6. 把這輪對話加入 Redis(對話歷史)

環境設定

# 建立 Poetry 專案
poetry new helios-support
cd helios-support

# 安裝依賴
poetry add anthropic fastapi uvicorn redis psycopg2-binary pgvector
poetry add python-dotenv pydantic

# .env
ANTHROPIC_API_KEY=sk-ant-api03-xxx
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql://user:pass@localhost:5432/helios
ZENDESK_SUBDOMAIN=yourcompany
ZENDESK_EMAIL=support@yourcompany.com
ZENDESK_API_TOKEN=xxx

知識庫(RAG 層)

首先建立知識庫的資料存取層:

# helios/knowledge_base.py
import psycopg2
from pgvector.psycopg2 import register_vector
import anthropic
import os
from dataclasses import dataclass

client = anthropic.Anthropic()

@dataclass
class Document:
    id: str
    title: str
    content: str
    similarity: float

def embed_text(text: str) -> list[float]:
    """使用 Anthropic 的 embedding API 把文字轉成向量"""
    # 注意:Anthropic 目前沒有自己的 embedding model
    # 建議使用 OpenAI text-embedding-3-small 或 Google text-embedding-004
    # 這裡假設你已經設定好 embedding function
    # 使用你偏好的 embedding service
    raise NotImplementedError("請替換成你的 embedding service")

def search_knowledge_base(
    query: str,
    limit: int = 3,
    similarity_threshold: float = 0.7
) -> list[Document]:
    """在知識庫中搜尋相關文件"""
    query_vector = embed_text(query)

    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    register_vector(conn)

    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, title, content,
                       1 - (embedding <=> %s::vector) AS similarity
                FROM knowledge_base
                WHERE 1 - (embedding <=> %s::vector) > %s
                ORDER BY similarity DESC
                LIMIT %s
            """, (query_vector, query_vector, similarity_threshold, limit))

            rows = cur.fetchall()
            return [
                Document(id=row[0], title=row[1], content=row[2], similarity=row[3])
                for row in rows
            ]
    finally:
        conn.close()

工具定義

接下來定義 Agent 可以呼叫的工具:

# helios/tools.py
import anthropic
import requests
import json
import os
from .knowledge_base import search_knowledge_base

# 工具定義(傳給 Anthropic API 的 tools 參數)
SUPPORT_TOOLS = [
    {
        "name": "search_knowledge_base",
        "description": """搜尋公司知識庫,找到跟用戶問題相關的文件。
        在回覆任何問題之前,應該先用這個工具搜尋相關資訊。
        如果找不到相關資訊,誠實告訴用戶你不知道,不要猜測。""",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜尋查詢詞,應該是描述問題的關鍵詞"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "escalate_to_human",
        "description": """把用戶的問題升級給真人客服專員處理。
        在以下情況應該使用這個工具:
        - 用戶要求退款
        - 用戶表達強烈不滿或憤怒
        - 涉及帳號安全問題
        - 問題超出你的知識範疇,用戶急需解決""",
        "input_schema": {
            "type": "object",
            "properties": {
                "reason": {
                    "type": "string",
                    "description": "升級原因的簡短說明(給客服專員看的,不是給用戶看的)"
                },
                "urgency": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "緊急程度"
                }
            },
            "required": ["reason", "urgency"]
        }
    }
]

def execute_tool(tool_name: str, tool_input: dict) -> str:
    """執行工具並回傳結果"""
    if tool_name == "search_knowledge_base":
        docs = search_knowledge_base(tool_input["query"])
        if not docs:
            return "知識庫中找不到相關資訊。"

        results = []
        for doc in docs:
            results.append(f"## {doc.title}\n{doc.content[:500]}")
        return "\n\n".join(results)

    elif tool_name == "escalate_to_human":
        ticket_id = create_zendesk_ticket(
            reason=tool_input["reason"],
            urgency=tool_input["urgency"]
        )
        return json.dumps({
            "escalated": True,
            "ticket_id": ticket_id,
            "message": f"已建立工單 #{ticket_id},專員將在 24 小時內與您聯繫"
        })

    return f"ERROR: 未知工具 {tool_name}"

def create_zendesk_ticket(reason: str, urgency: str) -> str:
    """建立 Zendesk 工單(簡化版)"""
    url = f"https://{os.environ['ZENDESK_SUBDOMAIN']}.zendesk.com/api/v2/tickets.json"
    auth = (
        f"{os.environ['ZENDESK_EMAIL']}/token",
        os.environ["ZENDESK_API_TOKEN"]
    )

    priority_map = {"low": "low", "medium": "normal", "high": "urgent"}

    response = requests.post(url, auth=auth, json={
        "ticket": {
            "subject": "AI 客服升級",
            "comment": {"body": f"升級原因:{reason}"},
            "priority": priority_map.get(urgency, "normal"),
            "tags": ["ai-escalated"]
        }
    })

    if response.ok:
        return str(response.json()["ticket"]["id"])
    return "UNKNOWN"

對話管理

# helios/conversation.py
import redis
import json
import os
from datetime import timedelta

redis_client = redis.from_url(os.environ["REDIS_URL"])
SESSION_TTL = timedelta(hours=1)

def get_conversation_history(session_id: str) -> list[dict]:
    """從 Redis 讀取對話歷史"""
    data = redis_client.get(f"session:{session_id}")
    if data:
        return json.loads(data)
    return []

def save_conversation_history(session_id: str, messages: list[dict]):
    """把對話歷史存到 Redis"""
    redis_client.setex(
        f"session:{session_id}",
        SESSION_TTL,
        json.dumps(messages, ensure_ascii=False)
    )

def build_messages_with_cache(
    history: list[dict],
    new_message: str
) -> list[dict]:
    """建立帶有 Prompt Caching 標記的訊息列表"""
    messages = history.copy()

    # 如果歷史超過 6 輪,截斷最舊的
    if len(messages) > 12:  # 6 輪 = 12 條訊息
        messages = messages[-12:]

    # 對倒數第二條訊息(最新的 assistant 訊息)加快取標記
    for i in range(len(messages) - 1, -1, -1):
        if messages[i]["role"] == "assistant":
            msg = messages[i].copy()
            content = msg["content"]
            if isinstance(content, str):
                msg["content"] = [
                    {
                        "type": "text",
                        "text": content,
                        "cache_control": {"type": "ephemeral"}
                    }
                ]
            messages[i] = msg
            break

    messages.append({"role": "user", "content": new_message})
    return messages

核心 Agent 邏輯

# helios/agent.py
import anthropic
from typing import AsyncIterator
from .tools import SUPPORT_TOOLS, execute_tool
from .conversation import (
    get_conversation_history,
    save_conversation_history,
    build_messages_with_cache
)

client = anthropic.Anthropic()

# 大型知識庫摘要(幾千 tokens,固定內容,適合快取)
COMPANY_CONTEXT = """
你是 Helios 電商平台的 AI 客服助手。

關於我們的服務:
- 商品交貨時間:標準配送 3-5 個工作天,快速配送 1-2 個工作天
- 退貨政策:收到商品 7 天內可申請退貨,商品需保持原狀
- 退款時間:申請通過後 5-10 個工作天退款到原付款方式
- 客服時間:週一到週五 9:00-18:00

常見問題處理準則:
[更多公司政策內容...]
"""

SYSTEM_PROMPT_BASE = """你是 Helios 電商平台的 AI 客服助手,名字叫 Helios。

你的職責:
1. 用友善、專業的態度回應用戶問題
2. 在回覆前,用 search_knowledge_base 工具查詢相關資訊
3. 只根據知識庫中的資訊回覆,不要猜測或編造
4. 遇到退款、投訴、帳號安全問題,主動使用 escalate_to_human 工具

回覆風格:
- 簡潔清晰,避免廢話
- 中文回覆,但保留必要的技術術語
- 如果不知道答案,誠實說明並提供升級選項
"""

async def stream_support_response(
    session_id: str,
    user_message: str
) -> AsyncIterator[str]:
    """
    處理用戶問題,Streaming 回覆。
    這個 generator 會 yield 回覆的文字片段。
    """
    # 讀取對話歷史
    history = get_conversation_history(session_id)
    messages = build_messages_with_cache(history, user_message)

    # Agent 的 agentic loop
    current_messages = messages.copy()
    final_response_parts = []
    assistant_message_content = []

    while True:
        # 呼叫 API(使用 streaming)
        with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system=[
                {
                    "type": "text",
                    "text": SYSTEM_PROMPT_BASE,
                },
                {
                    "type": "text",
                    "text": COMPANY_CONTEXT,
                    "cache_control": {"type": "ephemeral"}  # 快取公司資訊
                }
            ],
            tools=SUPPORT_TOOLS,
            messages=current_messages,
        ) as stream:

            current_tool_calls = []
            current_text = ""

            for event in stream:
                if hasattr(event, 'type'):
                    if event.type == 'content_block_start':
                        if hasattr(event.content_block, 'type'):
                            if event.content_block.type == 'text':
                                pass  # 準備收文字
                            elif event.content_block.type == 'tool_use':
                                current_tool_calls.append({
                                    "id": event.content_block.id,
                                    "name": event.content_block.name,
                                    "input": ""
                                })

                    elif event.type == 'content_block_delta':
                        if hasattr(event.delta, 'text'):
                            # 文字片段:直接 yield 給用戶
                            text_chunk = event.delta.text
                            current_text += text_chunk
                            final_response_parts.append(text_chunk)
                            yield text_chunk

                        elif hasattr(event.delta, 'partial_json'):
                            # 工具呼叫的參數(累積)
                            if current_tool_calls:
                                current_tool_calls[-1]["input"] += event.delta.partial_json

            # 取得完整的 response
            final_response = stream.get_final_message()

        # 檢查 stop_reason
        if final_response.stop_reason == "end_turn":
            # 完成!把這輪對話加入歷史
            assistant_content = []
            if current_text:
                assistant_content.append({"type": "text", "text": current_text})

            current_messages.append({
                "role": "assistant",
                "content": assistant_content if assistant_content else "好的。"
            })

            # 儲存更新後的對話歷史
            save_conversation_history(session_id, current_messages)
            return

        elif final_response.stop_reason == "tool_use":
            # 需要執行工具
            tool_results = []

            for content_block in final_response.content:
                if content_block.type == "tool_use":
                    tool_name = content_block.name
                    tool_input = content_block.input

                    # 執行工具
                    result = execute_tool(tool_name, tool_input)

                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": content_block.id,
                        "content": result
                    })

                    # 如果是升級工具,向用戶發送通知
                    if tool_name == "escalate_to_human":
                        import json
                        result_data = json.loads(result)
                        yield f"\n\n我已經為您建立工單(#{result_data['ticket_id']}),我們的專員將盡快與您聯繫。"

            # 把工具呼叫和結果加入訊息
            current_messages.append({
                "role": "assistant",
                "content": final_response.content
            })
            current_messages.append({
                "role": "user",
                "content": tool_results
            })
            # 繼續 loop,讓 agent 根據工具結果繼續生成回覆

        else:
            # max_tokens 或其他停止原因
            yield "\n\n(回覆已截斷,請繼續詢問)"
            return

FastAPI Server

# helios/main.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
import logging

from .agent import stream_support_response

app = FastAPI(title="Helios AI Support")
logger = logging.getLogger(__name__)

class ChatRequest(BaseModel):
    session_id: str
    message: str

    class Config:
        # 限制輸入長度,防止超長輸入
        max_anystr_length = 2000

@app.post("/chat")
async def chat(request: ChatRequest):
    """接受用戶問題,返回 Streaming 回覆"""

    # 基本輸入驗證
    if len(request.message.strip()) < 2:
        raise HTTPException(status_code=400, detail="問題太短,請重新輸入")

    if len(request.session_id) > 100:
        raise HTTPException(status_code=400, detail="無效的 session ID")

    async def generate():
        try:
            async for chunk in stream_support_response(
                session_id=request.session_id,
                user_message=request.message
            ):
                # SSE 格式
                yield f"data: {chunk}\n\n"
            yield "data: [DONE]\n\n"

        except Exception as e:
            logger.exception(f"Error in chat stream: {e}")
            yield f"data: 抱歉,發生了技術問題,請稍後再試。\n\n"
            yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 緩衝,確保 streaming 正常
        }
    )

@app.get("/health")
async def health_check():
    """健康檢查 endpoint"""
    return {"status": "ok"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)

成本追蹤整合

把第十三章的成本追蹤整合進來:

# helios/metrics.py
import time
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class RequestMetrics:
    session_id: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_creation_tokens: int = 0
    tool_calls: int = 0
    escalated: bool = False
    latency_ms: float = 0.0

    def log(self):
        # 計算成本
        input_cost = self.input_tokens * 3.0 / 1_000_000
        output_cost = self.output_tokens * 15.0 / 1_000_000
        cache_read_cost = self.cache_read_tokens * 0.30 / 1_000_000
        total_cost = input_cost + output_cost + cache_read_cost

        cache_hit_rate = (
            self.cache_read_tokens / max(self.input_tokens, 1) * 100
        )

        logger.info("request_completed", extra={
            "session_id": self.session_id[:8] + "...",  # 截斷,避免洩漏
            "model": self.model,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "cache_read_tokens": self.cache_read_tokens,
            "cache_hit_rate_pct": round(cache_hit_rate, 1),
            "tool_calls": self.tool_calls,
            "escalated": self.escalated,
            "latency_ms": round(self.latency_ms),
            "estimated_cost_usd": round(total_cost, 6),
        })

部署到 Cloud Run

Dockerfile:

FROM python:3.12-slim

WORKDIR /app

COPY pyproject.toml poetry.lock ./
RUN pip install poetry && poetry install --no-dev

COPY helios/ ./helios/

EXPOSE 8080
CMD ["poetry", "run", "uvicorn", "helios.main:app", "--host", "0.0.0.0", "--port", "8080"]

部署指令:

# Build 和 Push 到 Google Container Registry
gcloud builds submit --tag gcr.io/YOUR_PROJECT/helios-support

# 部署到 Cloud Run
gcloud run deploy helios-support \
  --image gcr.io/YOUR_PROJECT/helios-support \
  --platform managed \
  --region asia-east1 \
  --memory 512Mi \
  --concurrency 80 \
  --set-secrets "ANTHROPIC_API_KEY=anthropic-api-key:latest" \
  --set-env-vars "REDIS_URL=redis://10.x.x.x:6379" \
  --set-env-vars "DATABASE_URL=postgresql://user:pass@10.x.x.x/helios" \
  --allow-unauthenticated

幾個 Cloud Run 的注意事項:

--concurrency 80 代表每個 Cloud Run instance 同時處理 80 個請求。AI streaming 請求會長時間佔用連線,這個值可以根據你的實際情況調整。

Secrets(API Key、資料庫密碼)用 Google Secret Manager 管理,透過 --set-secrets 注入,不要放在環境變數裡直接傳。

成本估算

這個系統在 1000 名日活用戶、每人平均 5 個問題的情況下:

每天請求數:5,000 每次請求的 token 估算:

  • Input(含 COMPANY_CONTEXT):~3,000 tokens,其中 ~2,500 可以快取
  • Output:~300 tokens
  • 快取命中後,有效輸入成本:(500 × $3 + 2500 × $0.30) / 1,000,000 = $0.00225 / 請求

每天成本:5,000 × ($0.00225 + 300 × $15/1,000,000) = $11.25 + $22.50 = $33.75/天

每月成本:~$1,000/月

這比第十三章的目標 $500/月 高一點。要進一步降低,可以:

  1. 把分類用的 Haiku 做 routing,簡單問題完全用 Haiku 回答(降至 ~$600)
  2. 提高快取命中率(確保 5 分鐘內有足夠的請求維持快取熱度)

這本書的旅程

你剛讀完了「Claude API & Agent SDK 完全指南」的最後一章。

我想用幾段話回顧一下我們走過的路程。

這本書是 Claude 三部曲的第三本。第一本(Claude 使用指南)告訴你怎麼跟 Claude 對話。第二本(Claude Code 完全指南)告訴你怎麼讓 AI 幫你寫程式碼。這本書——第三本——告訴你怎麼用 Claude 建造東西:真正的應用、真正的系統、真正可以服務用戶的產品。

從第一章的第一個 client.messages.create() 呼叫,到這一章的多 agent、streaming、RAG 整合系統——中間走了很長的路。

如果你真的把這本書的範例都跑過了,你現在應該能做這些事:

  • 用 Messages API 建立任何對話型應用
  • 用 Tool Use 讓 AI 操控外部系統
  • 用 Prompt Caching 把成本降到最低
  • 用 Streaming 讓用戶體驗更流暢
  • 用 Agent SDK 建立多 agent 協作系統
  • 開發自己的 MCP Server
  • 把這一切部署到生產環境,並且知道怎麼監控和維護

接下來呢?

AI 應用開發這個領域正在高速演進。Anthropic 每隔幾個月就會推出新功能——更好的模型、更低的成本、更強的 agent 能力。我的建議是:

建立一個真實的東西。不要停在讀書和做練習。找一個你真正有需求的問題,用你在這本書學到的技術解決它。你在解決真實問題的過程中學到的東西,遠比讀任何書都多。

關注官方文件。Anthropic 的 docs.anthropic.com 是最可靠的資訊來源,這本書的範例和定價資訊可能因版本更新而改變,但官方文件永遠是最新的。

加入社群。Anthropic 的 Discord、各種 AI 開發者社群——這些地方有很多正在解決和你類似問題的開發者。

最後,我想說:

我花了很多時間在這三本書裡,試著把「如何真正用好 Claude」說清楚——不是表面的功能介紹,而是讓你能真正建造東西的深度知識。寫作的過程其實也是我重新整理自己理解的過程。

如果這本書對你有幫助,最好的感謝方式,就是去建造一個真實的東西。

祝你的 AI 應用上線順利。

—— Bobo

留言討論

esc
輸入關鍵字搜尋文章...
查看收藏 →