构建基于容器化 LlamaIndex 与 SSE 的 Google Cloud Functions 流式 RAG 管道


一个生产环境的 RAG (Retrieval-Augmented Generation) 应用,其响应延迟直接决定了用户体验。当用户提交一个查询后,长达数十秒的等待时间是完全不可接受的。问题在于,一个完整的 RAG 流程,包括查询向量化、相似度检索、上下文注入和最终的 LLM 生成,本质上是耗时的。流式响应(Streaming)并非一个可选项,而是一个必需品。

但在无服务器(Serverless)架构,特别是 Google Cloud Functions (GCF) 上实现这一切,会立刻遇到几个棘手的工程问题:

  1. 依赖地狱: LlamaIndex 及其底层依赖(如 PyTorch, Transformers)构成了一个复杂的 Python 环境。GCF 基于 Zip 包的部署方式在这种场景下极其脆弱且难以管理。
  2. 冷启动与模型加载: RAG 应用的核心是索引和模型。对于一个无状态的函数实例,这些资源在每次冷启动时都必须重新加载,这可能导致首次请求的延迟达到无法容忍的程度。
  3. 执行超时: Serverless 函数有严格的执行时间限制。Google Cloud Functions (Gen2) 的上限是 60 分钟,但默认值要短得多。一次复杂的 LLM 流式生成任务很可能触碰到这个天花板。

我们的目标是构建一个能稳定承载 LlamaIndex、实现 SSE 流式输出、并且能可靠部署在 GCF 上的 RAG 服务。第一步的技术选型决策是放弃传统的 Zip 部署,转向 GCF Gen2 对容器镜像的支持。这直接解决了依赖管理问题,并为后续优化提供了坚实的基础。

第一步:项目结构与开发环境规约

在真实项目中,混乱的代码格式和依赖管理是协作的巨大障碍。我们首先要解决这个问题。项目结构如下:

.
├── .github/workflows/         # (可选) CI/CD 流水线
├── .prettierrc.json           # Prettier 配置文件
├── app/                       # 应用代码目录
│   ├── core/                  # 核心业务逻辑
│   │   ├── __init__.py
│   │   └── rag_service.py     # RAG 服务的实现
│   ├── routers/               # API 路由
│   │   ├── __init__.py
│   │   └── stream.py          # SSE 流式接口
│   ├── __init__.py
│   └── main.py                # FastAPI 应用入口
├── tests/                     # 单元测试与集成测试
│   ├── __init__.py
│   └── test_rag_service.py
├── Dockerfile                 # 容器构建文件
├── pyproject.toml             # Python 项目元数据与依赖
└── README.md

pyproject.toml 使用 Poetry 进行依赖管理,这比 requirements.txt 更具确定性。

# pyproject.toml

[tool.poetry]
name = "serverless-rag-stream"
version = "0.1.0"
description = "Streaming RAG pipeline on GCF"
authors = ["Your Name <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.104.1"
uvicorn = {extras = ["standard"], version = "^0.23.2"}
llama-index = "^0.8.54"
google-cloud-storage = "^2.13.0"
# torch 和 torchvision 需要根据你的基础镜像和硬件(CPU/GPU)指定版本
# 对于 Cloud Functions (CPU),使用 CPU 版本
torch = {version = "2.1.0+cpu", source = "pytorch_cpu"}
torchvision = {version = "0.16.0+cpu", source = "pytorch_cpu"}

[[tool.poetry.source]]
name = "pytorch_cpu"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

这里的坑在于,torch 的安装需要特别指定源,以确保我们拉取的是 CPU 版本,这对于在 GCF 这种通用计算环境中运行至关重要。

为了确保代码风格的一致性,我们引入 Prettier。它不仅仅用于前端,通过插件,可以格式化 JSON、YAML、Dockerfile 等,确保基础设施配置文件也保持规范。

// .prettierrc.json
{
  "semi": false,
  "singleQuote": true,
  "trailingComma": "es5",
  "printWidth": 80
}

第二步:核心 RAG 服务与索引持久化

无状态是 Serverless 的核心原则,但我们的索引文件是有状态的。一个常见的错误是试图将索引打包进容器镜像,这会导致镜像臃肿,并且索引无法动态更新。正确的做法是将索引存储在外部持久化服务中,如 Google Cloud Storage (GCS)。

服务在冷启动时,会从 GCS 下载索引文件到函数实例的临时文件系统中 (/tmp)。

# app/core/rag_service.py

import os
import logging
from pathlib import Path
from typing import AsyncGenerator

from llama_index import (
    StorageContext,
    load_index_from_storage,
    VectorStoreIndex,
    ServiceContext,
)
from llama_index.llms import OpenAI # 示例使用 OpenAI, 可替换
from google.cloud import storage

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量获取配置
GCS_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")
INDEX_GCS_PATH = os.getenv("INDEX_GCS_PATH", "rag_indices/default")
LOCAL_INDEX_PATH = "/tmp/rag_index"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

class RAGService:
    """
    封装 LlamaIndex RAG 核心逻辑.
    处理索引的加载、缓存和流式查询.
    """
    _index: VectorStoreIndex = None
    _storage_client: storage.Client = None

    def __init__(self):
        if not GCS_BUCKET_NAME:
            raise ValueError("GCS_BUCKET_NAME 环境变量未设置")
        if not OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY 环境变量未设置")
        
        # 在真实项目中, 这里的 LLM 和 ServiceContext 会有更复杂的配置
        # 例如模型参数、embedding model 等
        self._service_context = ServiceContext.from_defaults(
            llm=OpenAI(model="gpt-4-1106-preview", api_key=OPENAI_API_KEY, temperature=0.1)
        )

    def _download_index_from_gcs(self):
        """
        从 GCS 下载索引文件到本地临时目录.
        这是一个阻塞 IO 操作, 只应在冷启动时执行一次.
        """
        logging.info(f"开始从 gs://{GCS_BUCKET_NAME}/{INDEX_GCS_PATH} 下载索引...")
        if self._storage_client is None:
            self._storage_client = storage.Client()

        bucket = self._storage_client.bucket(GCS_BUCKET_NAME)
        blobs = bucket.list_blobs(prefix=INDEX_GCS_PATH)
        
        index_path = Path(LOCAL_INDEX_PATH)
        index_path.mkdir(parents=True, exist_ok=True)
        
        download_count = 0
        for blob in blobs:
            if not blob.name.endswith('/'): # 忽略文件夹对象
                destination_uri = index_path / Path(blob.name).relative_to(INDEX_GCS_PATH)
                destination_uri.parent.mkdir(parents=True, exist_ok=True)
                blob.download_to_filename(str(destination_uri))
                download_count += 1
        
        if download_count == 0:
            logging.warning("在 GCS 路径中未找到任何索引文件.")
            return False

        logging.info(f"索引下载完成, 共 {download_count} 个文件.")
        return True

    def _load_index(self):
        """
        加载索引. 如果本地已存在, 直接加载; 否则, 从 GCS 下载.
        这是一个同步方法, 确保在服务接受请求前索引已就绪.
        """
        try:
            logging.info(f"尝试从本地路径 {LOCAL_INDEX_PATH} 加载索引...")
            storage_context = StorageContext.from_defaults(persist_dir=LOCAL_INDEX_PATH)
            self._index = load_index_from_storage(storage_context, service_context=self._service_context)
            logging.info("从本地缓存成功加载索引.")
        except Exception:
            logging.info("本地索引加载失败, 尝试从 GCS 下载.")
            if self._download_index_from_gcs():
                storage_context = StorageContext.from_defaults(persist_dir=LOCAL_INDEX_PATH)
                self._index = load_index_from_storage(storage_context, service_context=self._service_context)
                logging.info("从 GCS 下载后成功加载索引.")
            else:
                # 在真实项目中, 这里可能需要一个回退机制, 比如创建一个空索引
                # 或者直接抛出异常使函数实例启动失败.
                logging.error("无法从任何来源加载索引, RAG 服务不可用.")
                raise RuntimeError("Failed to load RAG index.")

    def get_index(self) -> VectorStoreIndex:
        """
        单例模式获取索引实例.
        确保索引只在第一次请求时加载一次 (对于每个函数实例).
        """
        if self._index is None:
            self._load_index()
        return self._index

    async def stream_chat(self, query: str) -> AsyncGenerator[str, None]:
        """
        执行流式 RAG 查询.
        返回一个异步生成器, 逐个产出 LLM 生成的 token.
        """
        index = self.get_index()
        # similarity_top_k 控制检索的文档数量, 这是性能和成本的关键参数
        chat_engine = index.as_chat_engine(chat_mode="condense_plus_context", similarity_top_k=3)
        
        streaming_response = await chat_engine.astream_chat(query)
        
        async for token in streaming_response.async_response_gen():
            yield token

# 创建一个全局单例, GCF 的每个实例将持有这个对象
# 这利用了 Python 模块的缓存机制, 在同一个函数实例的多次调用之间共享 RAGService 实例
rag_service_instance = RAGService()

此代码段的核心设计是 rag_service_instance 这个全局单例。GCF 在处理完一个请求后,会“冻结”函数实例,并在下一个请求到来时“解冻”。只要实例不被回收,全局变量的状态就会被保留。这意味着 RAGService 实例及其加载的索引 _index 可以在多次调用之间复用,极大地降低了后续请求的延迟。

第三步:实现 SSE 流式 API

Server-Sent Events (SSE) 是一种轻量级的、单向的服务器推送技术,非常适合实现 LLM 的流式输出。FastAPI 对此提供了出色的支持。

# app/routers/stream.py

import asyncio
import json
import logging
from fastapi import APIRouter, Request, HTTPException
from sse_starlette.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

from app.core.rag_service import rag_service_instance

router = APIRouter()

class ChatRequest(BaseModel):
    query: str

async def rag_event_generator(query: str) -> AsyncGenerator[ServerSentEvent, None]:
    """
    将 RAG 服务的 token 流包装成 SSE 事件流.
    """
    try:
        # 这里的 for 循环会一直等待 RAG 服务的下一个 token
        async for token in rag_service_instance.stream_chat(query):
            # SSE 规范要求数据是字符串. 我们用 JSON 包装.
            # 这样未来可以方便地扩展, 比如附带 token 的元数据.
            yield ServerSentEvent(data=json.dumps({"token": token}))
            # 短暂的休眠可以让事件循环处理其他任务, 避免在高速生成时阻塞
            await asyncio.sleep(0.01)
        
        # 流结束时, 发送一个特殊的 'done' 事件
        yield ServerSentEvent(data=json.dumps({"status": "done"}), event="end")

    except Exception as e:
        logging.error(f"RAG 流处理异常: {e}", exc_info=True)
        # 在流中向客户端报告错误
        error_payload = json.dumps({"error": "处理请求时发生内部错误."})
        yield ServerSentEvent(data=error_payload, event="error")

@router.post("/stream-chat")
async def stream_chat_endpoint(chat_request: ChatRequest):
    """
    接收用户查询并返回一个 SSE 流式响应.
    """
    if not chat_request.query:
        raise HTTPException(status_code=400, detail="查询内容不能为空")
    
    # EventSourceResponse 是 sse-starlette 提供的核心功能
    return EventSourceResponse(rag_event_generator(chat_request.query))

# 在应用主入口加载路由
# app/main.py

from fastapi import FastAPI
from app.routers import stream

app = FastAPI(
    title="Serverless RAG Streaming API",
    description="使用 LlamaIndex, FastAPI 和 GCF 实现的流式 RAG 服务",
    version="1.0.0",
)

# 在应用启动时就尝试加载索引, 从而将冷启动的 IO 延迟前置
@app.on_event("startup")
def on_startup():
    from app.core.rag_service import rag_service_instance
    try:
        # 这个调用会触发 GCS 下载和索引加载
        rag_service_instance.get_index()
    except Exception as e:
        # 如果启动失败, 函数将无法处理请求, 这是一种快速失败的策略
        logging.critical(f"应用启动失败: 无法加载 RAG 索引. {e}", exc_info=True)
        # 在 GCF 环境中, 抛出异常将导致实例被标记为不健康
        raise

app.include_router(stream.router, prefix="/api/v1")

@app.get("/health")
def health_check():
    return {"status": "ok"}

这里的关键是 EventSourceResponse。它接收一个异步生成器,并将其转换为一个符合 SSE 规范的 HTTP 响应。我们还设计了错误处理和流结束的信令,这在生产环境中是必不可少的。在 on_startup 事件中预加载索引,虽然会增加冷启动时间,但保证了第一个用户请求能被更快地处理,而不是在请求处理路径中才进行耗时的 IO 操作。

第四步:容器化与部署

Dockerfile 是将这一切打包的关键。一个常见的错误是构建一个包含所有构建工具和中间产物的臃肿镜像。在 Serverless 场景下,镜像大小直接影响冷启动速度和成本。我们必须使用多阶段构建来优化。

# Dockerfile

# ---- Stage 1: Builder ----
# 使用一个包含完整构建工具的镜像来安装依赖
FROM python:3.11-slim as builder

# 设置工作目录
WORKDIR /app

# 安装 Poetry
RUN pip install poetry

# 仅复制依赖定义文件
COPY pyproject.toml poetry.lock ./

# 安装依赖项, --no-root 确保项目本身不被安装
# --only main 只安装生产依赖, 忽略开发依赖 (如 pytest)
# 这一步会利用 Docker 的层缓存, 只要 pyproject.toml 和 poetry.lock 不变, 就不需要重新安装
RUN poetry config virtualenvs.create false && \
    poetry install --no-interaction --no-ansi --no-root --only main

# ---- Stage 2: Final Image ----
# 使用一个非常精简的基础镜像
FROM python:3.11-slim

# 设置环境变量, 确保 Python 输出不被缓冲
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# Gunicorn/Uvicorn 将绑定到这个端口
ENV PORT 8080

WORKDIR /app

# 从 builder 阶段复制已安装的依赖
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# 复制应用代码
COPY ./app ./app

# GCF 需要一个 web 服务器来启动应用.
# 我们使用 uvicorn. Gunicorn + uvicorn worker 也是一个常见的选择, 可以提供更好的并发管理.
# GCF 会自动注入 PORT 环境变量.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

这个 Dockerfile 的精髓在于:

  1. 多阶段构建: builder 阶段处理了所有繁重的依赖安装,最终镜像只包含必要的运行时文件和 site-packages,体积大大减小。
  2. 层缓存: 先复制 pyproject.toml 再安装依赖,可以最大限度地利用 Docker 的缓存。只有在依赖变更时,安装步骤才会重新运行。
  3. 生产服务器: 使用 uvicorn 作为 ASGI 服务器。在 GCF 环境中,它会被 Functions Framework 自动管理。

部署到 Google Cloud Functions 的命令如下:

# 确保你已经通过 gcloud auth login 登录
# 并通过 gcloud config set project [YOUR_PROJECT_ID] 设置了项目

# 启用必要的服务
gcloud services enable \
    run.googleapis.com \
    cloudfunctions.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com

# 创建一个 Artifact Registry 仓库来存储 Docker 镜像
gcloud artifacts repositories create rag-images \
    --repository-format=docker \
    --location=asia-east1 # 选择离你近的区域

# 构建并推送镜像到 Artifact Registry
# GCF 会使用 Cloud Build 在云端完成这个过程
gcloud functions deploy rag-stream-service \
    --gen2 \
    --runtime=python311 \
    --region=asia-east1 \
    --source=. \
    --entry-point=app \
    --trigger-http \
    --allow-unauthenticated \
    --set-env-vars="GCS_BUCKET_NAME=your-rag-indices-bucket,INDEX_GCS_PATH=rag_indices/default,OPENAI_API_KEY=your_openai_key" \
    --memory=4Gi \
    --cpu=2 \
    --timeout=900s # 设置 15 分钟超时

这里的部署命令利用了 GCF 的源部署功能,它会自动使用 Cloud Build 根据 Dockerfile 构建镜像并推送到 Artifact Registry。我们必须为函数分配足够的内存和 CPU,因为 LlamaIndex 和 PyTorch 是资源密集型的。同时,将超时时间设置为一个较大的值(例如900秒)是应对长查询的关键。

graph TD
    subgraph Client
        A[Browser/Mobile App]
    end

    subgraph Google Cloud
        A -- HTTPS POST /api/v1/stream-chat --> B(Cloud Functions HTTP Trigger)
        B -- Invokes --> C{Function Instance}
        
        subgraph C
            D[Container: FastAPI/Uvicorn]
            D -- Cold Start --> E{RAGService Singleton}
            E -- Loads Index --> F[GCS Bucket: /rag_indices]
            D -- Query --> G[LlamaIndex Chat Engine]
            G -- Generates Tokens --> H[External LLM API]
            H -- Tokens --> G
            G -- Yields Tokens --> D
        end

        C -- SSE Stream (text/event-stream) --> A
    end
    
    style C fill:#f9f,stroke:#333,stroke-width:2px

局限性与未来迭代路径

这套架构解决了核心的依赖管理和流式响应问题,但在生产环境中,它并非终点。

  1. 冷启动延迟依然存在: 尽管索引加载被优化,但容器镜像的拉取、Python 解释器的启动、FastAPI 应用的初始化仍需要时间,对于延迟敏感的应用,首次请求的延迟可能仍在 5-15 秒之间。解决方案可以是配置 GCF 的 min-instances 为 1 或更高,但这会带来持续的成本,本质上是用资金换取延迟。
  2. 超时硬顶: 虽然 GCF Gen2 的超时时间可以设置到 60 分钟,但 SSE 依赖于一个不中断的 HTTP 连接。任何中间网络代理(如公司的防火墙)都可能提前切断长连接。更健壮的方案可能需要引入 WebSocket,或者在客户端实现自动重连逻辑。
  3. 成本考量: LlamaIndex 和 LLM 的调用是计算密集型的。GCF 的计费模型基于 vCPU-秒和 GB-秒。长时间运行的流式请求可能会产生高昂的费用。对于高并发场景,Google Cloud Run 可能是一个更具成本效益的选择,因为它提供了更好的并发控制和更灵活的扩缩容策略。
  4. 无状态的挑战: 当前实现是无状态的。要支持多轮对话,必须引入外部存储(如 Firestore 或 Redis)来管理对话历史。这将增加架构的复杂性,需要在 RAGService 中引入会话管理逻辑。

  目录