基于 AWS SQS 与分布式锁实现有状态 CV 视频流的顺序处理架构


一套用于视频分析的CV算法,部署在数十个无状态的EC2实例上,通过AWS SQS消费任务。这个架构在处理独立图片时运行良好,但在切换到视频流分析后,灾难开始了。视频流的每一帧都带有上下文,例如目标跟踪需要在连续帧之间维持状态。当Worker-A处理完video-123的第5帧后,第6帧可能被空闲的Worker-B抢走,导致跟踪状态丢失,分析结果支离破碎。

初步的想法是使用SQS FIFO队列。它能保证消息的顺序性,但它解决不了“处理器亲和性”问题。FIFO队列确保消息按顺序被消费,却无法保证同一个消息组(例如,来自video-123的所有帧)始终由同一个消费者处理。只要有多个消费者,一个消费者处理完frame_5后,frame_6依然可能被另一个消费者取走。我们需要的是一种机制,能将一个视频流的所有帧“锁定”到单个工作节点上,直到该视频流处理完毕。

这就是引入分布式锁的切入点。我们的目标是构建一个轻量级的处理框架,它能让任何一个工作节点在处理一个视频流的首帧前,先获取该视频流ID的全局锁。只要该节点持有锁,其他节点就不能处理这个流的任何帧,从而强制实现了处理器亲和性与状态维护。

我们选择Redis作为分布式锁的实现,因为它速度快,并且有成熟的库支持。基本逻辑如下:

  1. Worker从SQS拉取一批消息。
  2. 遍历消息,检查每条消息对应的video_id
  3. 尝试为video_id获取一个分布式锁。
  4. 如果获取成功:该Worker成为此视频流的“主控节点”。它将处理这条消息,并继续从队列中寻找属于该video_id的其他消息进行处理。
  5. 如果获取失败:意味着其他Worker正在处理此视频流。该Worker必须忽略这条消息,并且不能将其从队列中删除。让消息的可见性超时(Visibility Timeout)自然到期,它会重新出现在队列中,最终被持有锁的那个主控节点发现并处理。

这个模型的核心在于锁的生命周期管理和SQS消息可见性超时的精妙配合。

架构设计与环境配置

我们将使用Python构建这个处理框架。首先是环境依赖。

requirements.txt:

boto3
redis
python-dotenv
opencv-python-headless # 用于模拟CV处理

配置文件 .env:

# AWS Credentials
AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY
AWS_REGION=us-east-1

# SQS Queue URL
SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/cv-frame-queue

# Redis Connection
REDIS_HOST=localhost
REDIS_PORT=6379

# Worker Configuration
LOCK_TTL_SECONDS=60 # 分布式锁的TTL,防止worker崩溃导致死锁
LOCK_RENEWAL_INTERVAL_SECONDS=20 # 锁续期检查间隔
PROCESSING_IDLE_TIMEOUT_SECONDS=30 # 处理一个流时,如果超过这个时间没新帧,就释放锁

消息体结构约定如下,这是一个投递到SQS的JSON字符串:

{
    "message_id": "uuid-...",
    "video_id": "video-stream-001",
    "frame_id": 125,
    "timestamp": "2023-10-27T10:00:00Z",
    "s3_bucket": "my-video-frames",
    "s3_key": "raw/video-stream-001/frame_0125.jpg"
}

分布式锁管理器的实现

一个健壮的分布式锁管理器是整个系统的基石。它不仅要处理加锁和释放,还必须考虑锁的自动过期(防止死锁)和锁续期(防止长任务执行时锁被意外释放)。

# lock_manager.py
import redis
import time
import logging
import threading

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class RedisLockManager:
    """
    一个基于Redis的分布式锁管理器,支持自动续期。
    真实项目中,你可能会使用更健壮的库,如redlock-py,但这里为了阐述原理,我们手动实现。
    """
    def __init__(self, redis_host, redis_port, lock_ttl_seconds=60, renewal_interval_seconds=20):
        """
        初始化Redis连接和配置。
        
        :param redis_host: Redis主机
        :param redis_port: Redis端口
        :param lock_ttl_seconds: 锁的生存时间(TTL),单位秒
        :param renewal_interval_seconds: 锁续期间隔,必须小于TTL
        """
        self.redis_client = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
        self.lock_ttl = lock_ttl_seconds
        self.renewal_interval = renewal_interval_seconds
        self.active_locks = {}  # 存储当前持有的锁及其续期线程: {'lock_key': renewal_thread}
        self._stop_event = threading.Event()

    def _lock_renewer(self, lock_key: str):
        """
        在后台线程中为给定的锁续期。
        """
        while not self._stop_event.is_set():
            try:
                # 使用pexpire来以毫秒为单位设置过期时间,更精确
                # 如果键存在,则续期成功,返回1
                if self.redis_client.expire(lock_key, self.lock_ttl):
                    logging.info(f"[LockRenewer] 成功为锁 '{lock_key}' 续期 {self.lock_ttl} 秒。")
                else:
                    # 如果键不存在,说明锁可能已被手动释放或因某种原因丢失
                    logging.warning(f"[LockRenewer] 尝试为锁 '{lock_key}' 续期失败,锁可能已不存在。")
                    break # 停止续期
            except redis.exceptions.RedisError as e:
                logging.error(f"[LockRenewer] 为锁 '{lock_key}' 续期时发生Redis错误: {e}")
                break
            
            # 等待下一个续期间隔
            time.sleep(self.renewal_interval)

    def acquire(self, lock_key: str, owner_id: str) -> bool:
        """
        尝试获取一个分布式锁。
        
        :param lock_key: 锁的唯一标识 (例如 'lock:video-stream-001')
        :param owner_id: 锁的持有者标识,用于调试和防止误释放
        :return: 如果成功获取锁,返回True,否则返回False
        """
        # nx=True 表示只有当key不存在时才设置,这保证了原子性
        # ex=self.lock_ttl 表示设置过期时间
        is_acquired = self.redis_client.set(lock_key, owner_id, nx=True, ex=self.lock_ttl)
        
        if is_acquired:
            logging.info(f"Worker '{owner_id}' 成功获取锁 '{lock_key}'")
            # 启动后台续期线程
            self._stop_event.clear()
            renewal_thread = threading.Thread(target=self._lock_renewer, args=(lock_key,))
            renewal_thread.daemon = True
            self.active_locks[lock_key] = renewal_thread
            renewal_thread.start()
            return True
            
        return False

    def release(self, lock_key: str, owner_id: str):
        """
        释放一个分布式锁。
        为了防止误释放,只有当锁的持有者匹配时才执行删除。
        这里使用Lua脚本保证操作的原子性。
        """
        # 停止续期线程
        if lock_key in self.active_locks:
            self._stop_event.set() # 发送停止信号
            renewal_thread = self.active_locks.pop(lock_key)
            renewal_thread.join(timeout=1.0) # 等待线程结束
            logging.info(f"已停止锁 '{lock_key}' 的续期线程。")

        # Lua脚本确保原子性地检查和删除
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        try:
            script = self.redis_client.register_script(lua_script)
            result = script(keys=[lock_key], args=[owner_id])
            if result == 1:
                logging.info(f"Worker '{owner_id}' 成功释放锁 '{lock_key}'")
            else:
                logging.warning(f"Worker '{owner_id}' 尝试释放锁 '{lock_key}' 失败,锁可能已被他人持有或已过期。")
        except redis.exceptions.RedisError as e:
            logging.error(f"释放锁 '{lock_key}' 时发生Redis错误: {e}")

    def cleanup(self):
        """在worker关闭时,清理所有持有的锁。"""
        logging.info("开始清理所有活动的锁...")
        # 复制键以避免在迭代时修改字典
        for lock_key in list(self.active_locks.keys()):
            # 假设worker ID是固定的,这里用一个占位符
            self.release(lock_key, "worker-shutdown-cleanup")

核心处理工作节点的实现

这是整个框架的核心,它协调了SQS消息的消费和分布式锁的生命周期。

# worker.py
import boto3
import json
import logging
import os
import time
import uuid
from dotenv import load_dotenv

from lock_manager import RedisLockManager

# 加载环境变量
load_dotenv()

# 日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(worker_id)s - %(levelname)s - %(message)s')

class CVStreamProcessor:
    def __init__(self):
        self.worker_id = f"worker-{uuid.uuid4().hex[:8]}"
        self.extra_log_info = {'worker_id': self.worker_id}
        self.logger = logging.LoggerAdapter(logging.getLogger(__name__), self.extra_log_info)

        # 初始化AWS SQS客户端
        self.sqs_client = boto3.client(
            'sqs',
            region_name=os.getenv('AWS_REGION'),
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
        )
        self.queue_url = os.getenv('SQS_QUEUE_URL')

        # 初始化分布式锁管理器
        self.lock_manager = RedisLockManager(
            redis_host=os.getenv('REDIS_HOST'),
            redis_port=int(os.getenv('REDIS_PORT')),
            lock_ttl_seconds=int(os.getenv('LOCK_TTL_SECONDS')),
            renewal_interval_seconds=int(os.getenv('LOCK_RENEWAL_INTERVAL_SECONDS'))
        )
        
        self.processing_idle_timeout = int(os.getenv('PROCESSING_IDLE_TIMEOUT_SECONDS'))
        # 用于跟踪当前正在处理的视频流及其状态
        self.active_streams = {}  # format: {'video_id': {'lock_key': '...', 'last_processed_time': ...}}

    def _process_frame(self, message_body: dict):
        """
        模拟CV处理逻辑。
        在真实场景中,这里会包含下载图片、模型推理等耗时操作。
        """
        video_id = message_body['video_id']
        frame_id = message_body['frame_id']
        self.logger.info(f"开始处理 video_id: {video_id}, frame_id: {frame_id}")
        # 模拟耗时操作
        time.sleep(2) 
        # 这是一个关键的陷阱:如果处理时间超过锁的TTL且没有续期机制,锁将被释放,导致其他worker介入。
        # 我们的LockManager已经通过后台线程解决了这个问题。
        self.logger.info(f"完成处理 video_id: {video_id}, frame_id: {frame_id}")
        return True

    def _delete_sqs_message(self, receipt_handle: str):
        """从SQS队列中删除已处理的消息。"""
        try:
            self.sqs_client.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            self.logger.error(f"删除消息失败 (Receipt: {receipt_handle}): {e}")

    def run(self):
        """主循环,持续从SQS拉取和处理消息。"""
        self.logger.info("Worker已启动,开始监听SQS队列...")
        while True:
            try:
                # 1. 拉取消息
                response = self.sqs_client.receive_message(
                    QueueUrl=self.queue_url,
                    MaxNumberOfMessages=10,  # 一次拉取多条以提高效率
                    WaitTimeSeconds=20,     # 长轮询
                    AttributeNames=['All']
                )

                messages = response.get('Messages', [])
                if not messages:
                    # 如果没有新消息,检查是否有超时的流需要释放锁
                    self._check_and_release_idle_locks()
                    continue

                # 2. 消息分类和锁获取
                for msg in messages:
                    receipt_handle = msg['ReceiptHandle']
                    try:
                        body = json.loads(msg['Body'])
                        video_id = body['video_id']
                        lock_key = f"lock:{video_id}"

                        # 检查是否已持有该流的锁
                        if video_id in self.active_streams:
                            self.logger.info(f"已持有锁 '{lock_key}', 直接处理帧 {body['frame_id']}")
                            if self._process_frame(body):
                                self._delete_sqs_message(receipt_handle)
                                self.active_streams[video_id]['last_processed_time'] = time.time()
                            continue
                        
                        # 尝试获取锁
                        if self.lock_manager.acquire(lock_key, self.worker_id):
                            self.logger.info(f"新获得锁 '{lock_key}', 开始处理视频流 '{video_id}'")
                            self.active_streams[video_id] = {
                                'lock_key': lock_key,
                                'last_processed_time': time.time()
                            }
                            if self._process_frame(body):
                                self._delete_sqs_message(receipt_handle)
                        else:
                            # 获取锁失败,说明其他worker正在处理。
                            # 不做任何事,让消息的Visibility Timeout过期后重新可见。
                            self.logger.info(f"获取锁 '{lock_key}' 失败,跳过消息。")

                    except (json.JSONDecodeError, KeyError) as e:
                        self.logger.error(f"消息格式错误: {e}. 将消息发送到死信队列(通过SQS配置实现)")
                        # 消息格式错误,直接删除,防止队列阻塞
                        self._delete_sqs_message(receipt_handle)
                    except Exception as e:
                        self.logger.critical(f"处理消息时发生未知严重错误: {e}")

                # 3. 处理完一批消息后,检查并释放空闲锁
                self._check_and_release_idle_locks()

            except Exception as e:
                self.logger.error(f"主循环发生异常: {e}")
                time.sleep(5) # 发生异常时短暂休眠,避免CPU空转

    def _check_and_release_idle_locks(self):
        """检查当前持有的锁,如果某个流在一段时间内没有新消息,则释放锁。"""
        now = time.time()
        # 创建副本以安全地在循环中修改字典
        for video_id, state in list(self.active_streams.items()):
            if now - state['last_processed_time'] > self.processing_idle_timeout:
                self.logger.info(f"视频流 '{video_id}' 空闲超时,释放锁 '{state['lock_key']}'")
                self.lock_manager.release(state['lock_key'], self.worker_id)
                del self.active_streams[video_id]

if __name__ == '__main__':
    processor = CVStreamProcessor()
    try:
        processor.run()
    except KeyboardInterrupt:
        processor.logger.info("接收到关闭信号,正在清理资源...")
        processor.lock_manager.cleanup()

架构流程的可视化

我们可以使用Mermaid.js来描绘一个Worker获取锁并处理消息的成功序列。

sequenceDiagram
    participant Worker A
    participant SQS
    participant Redis

    Worker A->>SQS: receive_message()
    SQS-->>Worker A: [msg1(v1, f1), msg2(v2, f1)]

    Worker A->>Redis: SET lock:video-1 nx ex 60
    Note right of Worker A: 尝试为 video-1 获取锁
    Redis-->>Worker A: OK (获取成功)
    Worker A->>Worker A: 启动后台线程为 lock:video-1 续期

    Worker A->>Worker A: process_frame(v1, f1)
    Worker A->>SQS: delete_message(msg1)
    
    Worker A->>Redis: SET lock:video-2 nx ex 60
    Note right of Worker A: 尝试为 video-2 获取锁
    Redis-->>Worker A: nil (获取失败, 被其他Worker持有)
    Note right of Worker A: 忽略 msg2, 等待其超时后重现

    Worker A->>SQS: receive_message()
    SQS-->>Worker A: [msg3(v1, f2)]
    
    Note right of Worker A: 已持有 lock:video-1
    Worker A->>Worker A: process_frame(v1, f2)
    Worker A->>SQS: delete_message(msg3)
    
    loop 空闲超时检查
        Note over Worker A: 超过30秒没有 video-1 的新帧
        Worker A->>Redis: DEL lock:video-1
        Note right of Worker A: 释放锁
    end

单元测试思路

对这种分布式系统进行测试是复杂的,但我们可以对关键组件进行单元测试。

  1. LockManager测试:
    • test_acquire_and_release: 模拟一个客户端成功获取锁,然后成功释放。验证Redis中key的创建和删除。
    • test_acquire_locked: 模拟客户端A获取锁后,客户端B尝试获取同一个锁,应失败。
    • test_lock_ttl: 客户端A获取锁后,不续期,等待超过TTL时间,验证Redis中的key自动过期。然后客户端B应能成功获取该锁。
    • test_lock_renewal: 客户端A获取锁,验证续期线程能成功延长锁的TTL。需要使用mock来控制time.sleep
      .
  2. Worker逻辑测试:
    • 使用moto库来模拟AWS SQS服务。
    • test_process_new_stream: 模拟队列中有一个新视频流的消息,验证worker能成功获取锁、处理消息、删除消息,并将video_id加入active_streams
    • test_process_existing_stream: 模拟worker已持有一个锁,队列中来了属于该流的新消息,验证worker能直接处理并删除。
    • test_skip_locked_stream: 模拟队列中的消息对应的锁已被其他worker持有(通过预先在mock的Redis中设置锁),验证worker会跳过该消息,并且不会删除它。
    • test_idle_lock_release: 模拟一个流处理完成后,在指定时间内没有新消息,验证worker会自动释放锁并从active_streams中移除。

方案的局限性与未来展望

此架构虽然解决了核心的顺序处理和状态维持问题,但并非没有代价和局限性。

首先,引入Redis增加了系统的复杂性和一个单点故障(尽管Redis可以通过哨兵或集群模式实现高可用)。每次处理新视频流前都需要一次网络往返来获取锁,这会带来微小的延迟。

其次,这种“抢占式”的锁机制可能导致某些worker过载,而其他worker相对空闲。如果某个视频流的帧率极高,持有该流锁的worker将持续繁忙,而其他worker可能因为无法获取该锁而无法处理该流的消息,造成资源分配不均。

最后,锁的TTL和续期机制需要仔细调优。TTL太短,长任务可能因续期失败而中断;TTL太长,worker崩溃后需要更长时间才能恢复处理。processing_idle_timeout也需要根据业务场景的帧间隔来设定,设得太短可能过早释放锁,太长则会延迟其他worker接管的机会。

一个可能的优化路径是,放弃纯粹的抢占模型,引入一个轻量级的调度器或协调器。该协调器可以基于worker的负载和视频流的特性,主动地将某个视频流“分配”给一个特定的worker一段时间,而不是让所有worker去竞争。这可以通过在Redis中维护一个video_id -> worker_id的映射表实现,但这将演变成一个更复杂的自定义调度系统。


  目录