利用 Vercel 与 EKS 构建混合架构实现 LangChain 异步任务处理及 SQLite 状态持久化


我们面临一个具体的工程挑战:需要部署一个基于 LangChain 的文档问答(RAG)服务。这个服务的核心工作流包含几个计算密集且耗时较长的步骤:接收用户查询、从向量数据库检索相关文档片段、构建复杂的提示(Prompt)、最后调用大语言模型(LLM)生成答案。单个请求的处理时间可能在 15 秒到 2 分钟之间,这取决于文档的复杂度和 LLM 的响应速度。

将这样的服务直接部署为同步 API 显然是不可行的,它会轻易地超出 API 网关和客户端的超时限制。因此,一个异步处理架构是必然选择。问题在于,选择什么样的技术栈来构建这个架构,才能在成本、性能和运维复杂度之间找到最佳平衡点。

方案 A:纯粹的无服务器(Serverless)架构

第一个进入考虑范围的方案是完全基于 AWS Lambda 或 Vercel Functions 的无服务器架构。

  • 入口: 一个 Vercel Function 作为 API Gateway,接收用户请求。
  • 异步通信: 该 Function 将任务参数推送到 Amazon SQS 队列。
  • 处理: 另一个长时间运行的 Lambda Function (配置最大超时15分钟) 消费 SQS 消息,执行完整的 LangChain RAG 流程。
  • 状态与数据: 向量数据存储在专用的向量数据库(如 Pinecone)或通过 pgvector 扩展的 RDS PostgreSQL 中。任务状态通过 DynamoDB 进行跟踪。

优势分析:

  1. 极致的成本效益: 在没有请求时,计算成本几乎为零。这对于流量波动大的应用极具吸引力。
  2. 免运维: 无需管理服务器、操作系统或运行时。开发人员可以专注于业务逻辑。
  3. 自动扩缩容: 平台自动处理并发请求的伸缩。

劣势分析:

  1. 计算限制: 即使是15分钟的超时,对于未来可能更复杂的模型或更长的文档处理任务,也存在风险。更重要的是,无服务器函数通常在 CPU 和内存资源上受到严格限制,这对于需要加载大型本地模型或处理大型数据集的 LangChain 应用来说是个硬伤。
  2. 状态管理复杂性: 无服务器的无状态特性意味着每次调用都需要重新初始化 LangChain 的执行器、模型和工具,这会带来显著的冷启动延迟。虽然有预置并发(Provisioned Concurrency)可以缓解,但这会增加成本,削弱了 Serverless 的核心优势。
  3. 依赖网络 I/O: RAG 流程中的每一步,无论是从 S3 加载数据、查询向量数据库,还是调用外部 LLM API,都严重依赖网络 I/O。在资源受限的函数环境中,高延迟的网络调用会进一步压缩本已紧张的执行时间窗口。

在真实项目中,我们发现纯 Serverless 方案对于这种重量级 AI 推理任务显得力不从心。成本模型在持续高负载下并不优于专有计算实例,而性能瓶颈和初始化开销成为了无法忽视的障碍。

方案 B:完全基于 Kubernetes (EKS) 的架构

另一个极端是将所有组件都容器化,部署在一个 AWS EKS 集群上。

  • 入口: 一个 Kubernetes Service (类型为 LoadBalancer) 暴露一个 Ingress 控制器,将流量路由到 API 服务 Pod。
  • 异步通信: 在集群内部署一个消息队列,如 RabbitMQ 或 NATS。
  • 处理: 一组专用的 Worker Pod,负责消费队列消息并执行 LangChain 任务。可以利用 KEDA (Kubernetes-based Event Driven Autoscaling) 根据队列长度自动伸缩 Worker Pod 的数量。
  • 状态与数据: 同样使用外部的向量数据库和状态数据库。

优势分析:

  1. 强大的计算能力和灵活性: 我们可以为 Worker Pod 分配充足的 vCPU 和内存,甚至挂载 GPU 资源。没有执行时间的硬性限制。
  2. 环境一致性: 从开发到生产,所有环境都基于容器,减少了“在我机器上可以运行”的问题。
  3. 生态系统成熟: Kubernetes 社区提供了丰富的工具来处理日志、监控、服务发现等问题。

劣势分析:

  1. 成本高昂: 即使在没有流量的情况下,也需要为 EKS 控制平面和最小节点组(Node Group)支付固定费用。对于初创项目或内部工具,这是一笔不小的开销。
  2. 运维复杂度: 管理 EKS 集群、配置网络策略、处理节点升级、维护 Helm Charts 等,需要一个专门的 DevOps 团队或具备深厚云原生技能的工程师。
  3. 冷启动与扩容速度: 虽然 KEDA 可以实现自动扩容,但从零个 Pod 扩容到第一个 Pod(或节点扩容)仍然需要分钟级的时间,无法像 Serverless 那样实现毫秒级的即时响应。

对于我们的场景,一个始终在线的 EKS 集群来处理可能每天只有几次调用高峰的任务,无疑是一种资源浪费。

最终选择:Vercel + EKS + SQLite 的混合架构

权衡利弊后,我们决定采用一种混合架构,结合两者的优点,同时引入一个非传统的组件——SQLite,来解决一个特定的性能瓶颈。

graph TD
    subgraph "用户侧"
        Client[客户端]
    end

    subgraph "Vercel Edge Network"
        VercelFunc[Vercel Function API Endpoint]
    end

    subgraph "AWS Cloud"
        SQS[Amazon SQS Queue]
        S3[S3 Bucket for Litestream]
        subgraph "AWS EKS Cluster"
            direction LR
            Deployment[K8s Deployment]
            Pod[Worker Pod]
            Deployment --> Pod
            Pod -- polls --> SQS
            subgraph Pod
                direction TB
                AppContainer[LangChain App Container]
                SidecarContainer[Litestream Sidecar]
                AppContainer -- reads/writes --> SharedVolume
                SidecarContainer -- watches & replicates --> SharedVolume
            end
            SharedVolume[Shared Volume
/data/app.db] SidecarContainer --> S3 end end Client -- HTTP POST /api/process --> VercelFunc VercelFunc -- Enqueue Job --> SQS VercelFunc -- HTTP 202 Accepted (jobId) --> Client

这个架构的核心思想是 职责分离

  • Vercel Functions 作为轻量级入口: 利用 Vercel 的全球边缘网络和即时扩容能力,处理所有面向用户的 API 请求。它只做一件事:验证请求、生成一个唯一的任务 ID、将任务载荷推送到 SQS 队列,然后立即向客户端返回 202 Accepted 响应和任务 ID。这个过程极快,成本极低。
  • EKS 作为重量级计算后端: 一个规模可以缩减到零的 EKS 节点组(使用 Cluster Autoscaler 和 Karpenter)承载着我们真正的 LangChain Worker。Worker Pod 由一个 Deployment 管理,其副本数由 KEDA 根据 SQS 队列中的可见消息数量动态调整。当没有任务时,Worker Pod 数量为0;当任务涌入时,KEDA 会在几秒钟内启动新的 Pod。
  • SQLite + Litestream 作为低延迟状态存储: 这是整个架构中最具争议也最有价值的部分。对于我们的 RAG 应用,向量数据和元数据一旦生成,在一段时间内是只读的。每次任务启动都从远程数据库(如 RDS)拉取GB级别的数据,会引入显著的网络延迟和成本。我们的解决方案是:将这部分数据预先打包成一个 SQLite 数据库文件。在 Worker Pod 启动时,Litestream sidecar 会从 S3 将最新的数据库副本恢复到 Pod 内的一个共享卷(Shared Volume)上。LangChain 应用直接读取这个本地文件,实现了纳秒级的本地磁盘 I/O,完全消除了网络延迟。如果任务过程中有状态更新(例如记录处理进度),应用会写入本地 SQLite 数据库,Litestream 会自动、持续地将变更流式传输回 S3,为其他 Pod 或未来的 Pod 实例提供状态一致性。

核心实现概览

1. Vercel Function (Next.js API Route)

这里的代码必须是生产级的,包含完整的类型定义、环境变量处理和错误捕获。

src/pages/api/process.ts:

import { NextApiRequest, NextApiResponse } from 'next';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { randomUUID } from 'crypto';

// 确保在 Vercel 环境变量中配置了这些值
const sqsClient = new SQSClient({
  region: process.env.AWS_REGION!,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

const SQS_QUEUE_URL = process.env.SQS_QUEUE_URL!;

interface ProcessRequestBody {
  query: string;
  document_scope: string[];
}

interface ApiResponse {
  jobId?: string;
  message: string;
}

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse<ApiResponse>
) {
  if (req.method !== 'POST') {
    return res.status(405).json({ message: 'Method Not Allowed' });
  }

  try {
    const { query, document_scope } = req.body as ProcessRequestBody;

    // 服务端输入验证
    if (!query || typeof query !== 'string' || query.length < 5) {
      return res.status(400).json({ message: 'Invalid "query" parameter.' });
    }
    if (!Array.isArray(document_scope) || document_scope.length === 0) {
      return res.status(400).json({ message: 'Invalid "document_scope" parameter.' });
    }

    const jobId = randomUUID();
    const taskPayload = {
      jobId,
      query,
      document_scope,
      submittedAt: new Date().toISOString(),
    };

    const command = new SendMessageCommand({
      QueueUrl: SQS_QUEUE_URL,
      MessageBody: JSON.stringify(taskPayload),
      MessageGroupId: 'langchain-rag-tasks', // 对于 FIFO 队列
      MessageDeduplicationId: jobId, // 对于 FIFO 队列
    });
    
    // 这里的日志在 Vercel Functions 的控制台可见
    console.log(`Sending job ${jobId} to SQS...`);
    await sqsClient.send(command);
    console.log(`Job ${jobId} successfully sent.`);

    return res.status(202).json({ jobId, message: 'Processing started.' });

  } catch (error) {
    console.error('Failed to enqueue task:', error);
    // 避免向客户端暴露详细的内部错误
    return res.status(500).json({ message: 'Internal Server Error' });
  }
}

这段代码简洁而健壮。它承担了 API 入口的全部职责,并将状态和计算完全外包给了 AWS。

2. EKS Worker Deployment

这是架构的核心。我们需要一个 Kubernetes Deployment YAML 文件,它定义了主应用容器和 Litestream sidecar 容器,以及它们如何共享数据卷。

k8s/worker-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-rag-worker
  namespace: ai-processing
spec:
  replicas: 0 # KEDA 将会管理副本数
  selector:
    matchLabels:
      app: langchain-rag-worker
  template:
    metadata:
      labels:
        app: langchain-rag-worker
    spec:
      containers:
      - name: langchain-app
        image: your-account.dkr.ecr.us-west-2.amazonaws.com/langchain-worker:latest
        env:
        - name: DATABASE_PATH
          value: "/data/app.db"
        - name: AWS_REGION
          value: "us-west-2"
        - name: SQS_QUEUE_URL
          value: "https://sqs.us-west-2.amazonaws.com/..."
        # 假设使用 IAM Roles for Service Accounts (IRSA) 来授予 Pod 访问 SQS 的权限
        volumeMounts:
        - name: db-storage
          mountPath: /data
      
      - name: litestream-sidecar
        image: litestream/litestream:latest
        args:
        - "replicate"
        # 这里的命令会先尝试从 S3 恢复最新数据库,然后持续监听本地文件变更并复制到 S3
        - "-config"
        - "/etc/litestream.yml"
        env:
        # Litestream 需要 AWS 凭证来访问 S3
        - name: LITESTREAM_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: aws-credentials
              key: access-key-id
        - name: LITESTREAM_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: aws-credentials
              key: secret-access-key
        volumeMounts:
        - name: db-storage
          mountPath: /data
        - name: litestream-config
          mountPath: /etc

      volumes:
      - name: db-storage
        emptyDir: {} # Pod 内的临时存储,生命周期与 Pod 相同
      - name: litestream-config
        configMap:
          name: litestream-configmap

配套的 litestream.yml ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: litestream-configmap
  namespace: ai-processing
data:
  litestream.yml: |
    # litestream.yml
    dbs:
      - path: /data/app.db
        replicas:
          - type: s3
            bucket: "your-litestream-backup-bucket"
            path: "rag_db"
            region: "us-west-2"

3. Python Worker Script

这个 Python 脚本是运行在 langchain-app 容器中的主程序。它使用 boto3 轮询 SQS,并使用 langchainsqlite3 处理任务。

worker/main.py:

import os
import json
import logging
import sqlite3
import time
import boto3
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS # 假设使用 FAISS,它可以轻松序列化/反序列化
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import OpenAI

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量获取配置
DATABASE_PATH = os.getenv("DATABASE_PATH", "/data/app.db")
SQS_QUEUE_URL = os.getenv("SQS_QUEUE_URL")
AWS_REGION = os.getenv("AWS_REGION")

# 初始化 AWS SQS 客户端
sqs = boto3.client("sqs", region_name=AWS_REGION)

class RagProcessor:
    def __init__(self, db_path):
        self.db_path = db_path
        self._vector_store = None
        self._qa_chain = None
        
        # 这里的关键点是:确保数据库文件存在。
        # Litestream sidecar 应该在容器启动时已经从 S3 恢复了它。
        if not os.path.exists(self.db_path):
            logging.error(f"Database file not found at {self.db_path}. Litestream might have failed.")
            # 这种情况下,Pod 应该失败并重启
            raise FileNotFoundError(f"Database not restored at {self.db_path}")
            
        logging.info("Initializing vector store from local SQLite DB...")
        self._initialize_components()

    def _initialize_components(self):
        """
        从本地 SQLite 加载或构建 LangChain 组件。
        这是一个示例,真实场景可能更复杂。
        例如,FAISS 索引可以被序列化并存储在 SQLite 的 BLOB 字段中。
        """
        try:
            # 假设我们有一个函数可以从 SQLite DB 加载 FAISS 索引
            # conn = sqlite3.connect(self.db_path)
            # serialized_index = conn.execute("SELECT data FROM vector_store WHERE id = 1").fetchone()[0]
            # conn.close()
            # self._vector_store = FAISS.deserialize_from_bytes(serialized_index, OpenAIEmbeddings())

            # 为简化示例,这里我们假设 FAISS 索引文件与 db 在同一目录
            # Litestream 也可以同步多个文件
            self._vector_store = FAISS.load_local("faiss_index", OpenAIEmbeddings())
            self._qa_chain = RetrievalQA.from_chain_type(
                llm=OpenAI(),
                chain_type="stuff",
                retriever=self._vector_store.as_retriever()
            )
            logging.info("LangChain components initialized successfully.")
        except Exception as e:
            logging.error(f"Failed to initialize LangChain components: {e}")
            raise

    def process(self, query: str) -> str:
        if not self._qa_chain:
            raise Exception("QA chain is not initialized.")
        logging.info(f"Processing query: '{query}'")
        result = self._qa_chain.run(query)
        return result

def poll_sqs():
    processor = RagProcessor(DATABASE_PATH)
    logging.info("Worker started, polling SQS queue...")
    
    while True:
        try:
            response = sqs.receive_message(
                QueueUrl=SQS_QUEUE_URL,
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20, # 长轮询
                AttributeNames=['All'],
                MessageAttributeNames=['All']
            )

            if "Messages" in response:
                message = response["Messages"][0]
                receipt_handle = message["ReceiptHandle"]
                
                try:
                    task = json.loads(message["Body"])
                    job_id = task.get("jobId", "unknown")
                    logging.info(f"Received job {job_id}.")

                    answer = processor.process(task["query"])
                    
                    logging.info(f"Job {job_id} completed. Answer: {answer[:100]}...")
                    # 可以在这里将结果写入 S3 或另一个数据库
                    
                    # 任务处理成功后,从队列中删除消息
                    sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
                    logging.info(f"Deleted message for job {job_id} from SQS.")

                except Exception as e:
                    logging.error(f"Error processing message: {e}. Message will be returned to queue.")
                    # 不删除消息,它将在可见性超时后重新出现在队列中
                    # 在生产中,需要配置死信队列(DLQ)来处理持续失败的消息
                    time.sleep(10) # 避免快速失败循环
            else:
                logging.info("No messages in queue. Waiting...")
        except Exception as e:
            logging.error(f"An error occurred during SQS polling: {e}")
            time.sleep(10) # 在出现网络等问题时稍作等待

if __name__ == "__main__":
    poll_sqs()

架构的扩展性与局限性

这个混合架构的优势在于它为特定类型的工作负载提供了极高的性价比和性能。它并非万能药,其适用边界和局限性也非常明确。

局限性:

  1. 写密集型场景不适用: Litestream 本质上是一个单主复制系统。它非常适合读密集、单点写入或批处理写入的场景。如果多个 Worker Pod 需要频繁地、并发地写入同一个 SQLite 数据库,将会导致数据分叉和冲突。在这种情况下,必须回归到传统的、支持并发控制的分布式数据库(如 RDS, DynamoDB)。
  2. 数据一致性是最终一致性: Litestream 将变更异步复制到 S3。这意味着如果一个 Pod 写入数据后立即崩溃,而此时数据尚未完全同步到 S3,那么新启动的 Pod 可能会恢复到一个稍旧的状态。对于我们的 RAG 任务,丢失几秒钟的处理状态通常是可以接受的,但对于需要强事务保证的系统,这是不可行的。
  3. 恢复时间(RTO): 如果 SQLite 数据库非常大(例如几十GB),从 S3 恢复到新 Pod 的过程可能会花费几分钟时间。这会影响到系统的弹性伸缩速度,即第一个任务的处理延迟会包含这个数据恢复时间。

未来迭代路径:
如果业务发展,需要更强的写入并发或更低的数据恢复时间,这个架构可以平滑演进。核心的 Vercel -> SQS -> EKS 异步处理流程保持不变,只需将 Worker Pod 的数据层从 SQLite + Litestream 替换为对 RDS AuroraDynamoDB 的直接读写。这种替换是局部的,不影响整个架构的骨架。这证明了当前设计作为一个起点,具备良好的演化潜力,让我们能够在项目初期以极低的成本和极高的读取性能启动,同时保留了未来向更复杂、更强大的数据存储方案迁移的通道。


  目录