一套用于视频分析的CV算法,部署在数十个无状态的EC2实例上,通过AWS SQS消费任务。这个架构在处理独立图片时运行良好,但在切换到视频流分析后,灾难开始了。视频流的每一帧都带有上下文,例如目标跟踪需要在连续帧之间维持状态。当Worker-A处理完video-123的第5帧后,第6帧可能被空闲的Worker-B抢走,导致跟踪状态丢失,分析结果支离破碎。
初步的想法是使用SQS FIFO队列。它能保证消息的顺序性,但它解决不了“处理器亲和性”问题。FIFO队列确保消息按顺序被消费,却无法保证同一个消息组(例如,来自video-123的所有帧)始终由同一个消费者处理。只要有多个消费者,一个消费者处理完frame_5后,frame_6依然可能被另一个消费者取走。我们需要的是一种机制,能将一个视频流的所有帧“锁定”到单个工作节点上,直到该视频流处理完毕。
这就是引入分布式锁的切入点。我们的目标是构建一个轻量级的处理框架,它能让任何一个工作节点在处理一个视频流的首帧前,先获取该视频流ID的全局锁。只要该节点持有锁,其他节点就不能处理这个流的任何帧,从而强制实现了处理器亲和性与状态维护。
我们选择Redis作为分布式锁的实现,因为它速度快,并且有成熟的库支持。基本逻辑如下:
- Worker从SQS拉取一批消息。
- 遍历消息,检查每条消息对应的
video_id。 - 尝试为
video_id获取一个分布式锁。 - 如果获取成功:该Worker成为此视频流的“主控节点”。它将处理这条消息,并继续从队列中寻找属于该
video_id的其他消息进行处理。 - 如果获取失败:意味着其他Worker正在处理此视频流。该Worker必须忽略这条消息,并且不能将其从队列中删除。让消息的可见性超时(Visibility Timeout)自然到期,它会重新出现在队列中,最终被持有锁的那个主控节点发现并处理。
这个模型的核心在于锁的生命周期管理和SQS消息可见性超时的精妙配合。
架构设计与环境配置
我们将使用Python构建这个处理框架。首先是环境依赖。
requirements.txt:
boto3
redis
python-dotenv
opencv-python-headless # 用于模拟CV处理
配置文件 .env:
# AWS Credentials
AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY
AWS_REGION=us-east-1
# SQS Queue URL
SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/cv-frame-queue
# Redis Connection
REDIS_HOST=localhost
REDIS_PORT=6379
# Worker Configuration
LOCK_TTL_SECONDS=60 # 分布式锁的TTL,防止worker崩溃导致死锁
LOCK_RENEWAL_INTERVAL_SECONDS=20 # 锁续期检查间隔
PROCESSING_IDLE_TIMEOUT_SECONDS=30 # 处理一个流时,如果超过这个时间没新帧,就释放锁
消息体结构约定如下,这是一个投递到SQS的JSON字符串:
{
"message_id": "uuid-...",
"video_id": "video-stream-001",
"frame_id": 125,
"timestamp": "2023-10-27T10:00:00Z",
"s3_bucket": "my-video-frames",
"s3_key": "raw/video-stream-001/frame_0125.jpg"
}
分布式锁管理器的实现
一个健壮的分布式锁管理器是整个系统的基石。它不仅要处理加锁和释放,还必须考虑锁的自动过期(防止死锁)和锁续期(防止长任务执行时锁被意外释放)。
# lock_manager.py
import redis
import time
import logging
import threading
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class RedisLockManager:
"""
一个基于Redis的分布式锁管理器,支持自动续期。
真实项目中,你可能会使用更健壮的库,如redlock-py,但这里为了阐述原理,我们手动实现。
"""
def __init__(self, redis_host, redis_port, lock_ttl_seconds=60, renewal_interval_seconds=20):
"""
初始化Redis连接和配置。
:param redis_host: Redis主机
:param redis_port: Redis端口
:param lock_ttl_seconds: 锁的生存时间(TTL),单位秒
:param renewal_interval_seconds: 锁续期间隔,必须小于TTL
"""
self.redis_client = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
self.lock_ttl = lock_ttl_seconds
self.renewal_interval = renewal_interval_seconds
self.active_locks = {} # 存储当前持有的锁及其续期线程: {'lock_key': renewal_thread}
self._stop_event = threading.Event()
def _lock_renewer(self, lock_key: str):
"""
在后台线程中为给定的锁续期。
"""
while not self._stop_event.is_set():
try:
# 使用pexpire来以毫秒为单位设置过期时间,更精确
# 如果键存在,则续期成功,返回1
if self.redis_client.expire(lock_key, self.lock_ttl):
logging.info(f"[LockRenewer] 成功为锁 '{lock_key}' 续期 {self.lock_ttl} 秒。")
else:
# 如果键不存在,说明锁可能已被手动释放或因某种原因丢失
logging.warning(f"[LockRenewer] 尝试为锁 '{lock_key}' 续期失败,锁可能已不存在。")
break # 停止续期
except redis.exceptions.RedisError as e:
logging.error(f"[LockRenewer] 为锁 '{lock_key}' 续期时发生Redis错误: {e}")
break
# 等待下一个续期间隔
time.sleep(self.renewal_interval)
def acquire(self, lock_key: str, owner_id: str) -> bool:
"""
尝试获取一个分布式锁。
:param lock_key: 锁的唯一标识 (例如 'lock:video-stream-001')
:param owner_id: 锁的持有者标识,用于调试和防止误释放
:return: 如果成功获取锁,返回True,否则返回False
"""
# nx=True 表示只有当key不存在时才设置,这保证了原子性
# ex=self.lock_ttl 表示设置过期时间
is_acquired = self.redis_client.set(lock_key, owner_id, nx=True, ex=self.lock_ttl)
if is_acquired:
logging.info(f"Worker '{owner_id}' 成功获取锁 '{lock_key}'")
# 启动后台续期线程
self._stop_event.clear()
renewal_thread = threading.Thread(target=self._lock_renewer, args=(lock_key,))
renewal_thread.daemon = True
self.active_locks[lock_key] = renewal_thread
renewal_thread.start()
return True
return False
def release(self, lock_key: str, owner_id: str):
"""
释放一个分布式锁。
为了防止误释放,只有当锁的持有者匹配时才执行删除。
这里使用Lua脚本保证操作的原子性。
"""
# 停止续期线程
if lock_key in self.active_locks:
self._stop_event.set() # 发送停止信号
renewal_thread = self.active_locks.pop(lock_key)
renewal_thread.join(timeout=1.0) # 等待线程结束
logging.info(f"已停止锁 '{lock_key}' 的续期线程。")
# Lua脚本确保原子性地检查和删除
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
try:
script = self.redis_client.register_script(lua_script)
result = script(keys=[lock_key], args=[owner_id])
if result == 1:
logging.info(f"Worker '{owner_id}' 成功释放锁 '{lock_key}'")
else:
logging.warning(f"Worker '{owner_id}' 尝试释放锁 '{lock_key}' 失败,锁可能已被他人持有或已过期。")
except redis.exceptions.RedisError as e:
logging.error(f"释放锁 '{lock_key}' 时发生Redis错误: {e}")
def cleanup(self):
"""在worker关闭时,清理所有持有的锁。"""
logging.info("开始清理所有活动的锁...")
# 复制键以避免在迭代时修改字典
for lock_key in list(self.active_locks.keys()):
# 假设worker ID是固定的,这里用一个占位符
self.release(lock_key, "worker-shutdown-cleanup")
核心处理工作节点的实现
这是整个框架的核心,它协调了SQS消息的消费和分布式锁的生命周期。
# worker.py
import boto3
import json
import logging
import os
import time
import uuid
from dotenv import load_dotenv
from lock_manager import RedisLockManager
# 加载环境变量
load_dotenv()
# 日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(worker_id)s - %(levelname)s - %(message)s')
class CVStreamProcessor:
def __init__(self):
self.worker_id = f"worker-{uuid.uuid4().hex[:8]}"
self.extra_log_info = {'worker_id': self.worker_id}
self.logger = logging.LoggerAdapter(logging.getLogger(__name__), self.extra_log_info)
# 初始化AWS SQS客户端
self.sqs_client = boto3.client(
'sqs',
region_name=os.getenv('AWS_REGION'),
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
)
self.queue_url = os.getenv('SQS_QUEUE_URL')
# 初始化分布式锁管理器
self.lock_manager = RedisLockManager(
redis_host=os.getenv('REDIS_HOST'),
redis_port=int(os.getenv('REDIS_PORT')),
lock_ttl_seconds=int(os.getenv('LOCK_TTL_SECONDS')),
renewal_interval_seconds=int(os.getenv('LOCK_RENEWAL_INTERVAL_SECONDS'))
)
self.processing_idle_timeout = int(os.getenv('PROCESSING_IDLE_TIMEOUT_SECONDS'))
# 用于跟踪当前正在处理的视频流及其状态
self.active_streams = {} # format: {'video_id': {'lock_key': '...', 'last_processed_time': ...}}
def _process_frame(self, message_body: dict):
"""
模拟CV处理逻辑。
在真实场景中,这里会包含下载图片、模型推理等耗时操作。
"""
video_id = message_body['video_id']
frame_id = message_body['frame_id']
self.logger.info(f"开始处理 video_id: {video_id}, frame_id: {frame_id}")
# 模拟耗时操作
time.sleep(2)
# 这是一个关键的陷阱:如果处理时间超过锁的TTL且没有续期机制,锁将被释放,导致其他worker介入。
# 我们的LockManager已经通过后台线程解决了这个问题。
self.logger.info(f"完成处理 video_id: {video_id}, frame_id: {frame_id}")
return True
def _delete_sqs_message(self, receipt_handle: str):
"""从SQS队列中删除已处理的消息。"""
try:
self.sqs_client.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle
)
except Exception as e:
self.logger.error(f"删除消息失败 (Receipt: {receipt_handle}): {e}")
def run(self):
"""主循环,持续从SQS拉取和处理消息。"""
self.logger.info("Worker已启动,开始监听SQS队列...")
while True:
try:
# 1. 拉取消息
response = self.sqs_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10, # 一次拉取多条以提高效率
WaitTimeSeconds=20, # 长轮询
AttributeNames=['All']
)
messages = response.get('Messages', [])
if not messages:
# 如果没有新消息,检查是否有超时的流需要释放锁
self._check_and_release_idle_locks()
continue
# 2. 消息分类和锁获取
for msg in messages:
receipt_handle = msg['ReceiptHandle']
try:
body = json.loads(msg['Body'])
video_id = body['video_id']
lock_key = f"lock:{video_id}"
# 检查是否已持有该流的锁
if video_id in self.active_streams:
self.logger.info(f"已持有锁 '{lock_key}', 直接处理帧 {body['frame_id']}")
if self._process_frame(body):
self._delete_sqs_message(receipt_handle)
self.active_streams[video_id]['last_processed_time'] = time.time()
continue
# 尝试获取锁
if self.lock_manager.acquire(lock_key, self.worker_id):
self.logger.info(f"新获得锁 '{lock_key}', 开始处理视频流 '{video_id}'")
self.active_streams[video_id] = {
'lock_key': lock_key,
'last_processed_time': time.time()
}
if self._process_frame(body):
self._delete_sqs_message(receipt_handle)
else:
# 获取锁失败,说明其他worker正在处理。
# 不做任何事,让消息的Visibility Timeout过期后重新可见。
self.logger.info(f"获取锁 '{lock_key}' 失败,跳过消息。")
except (json.JSONDecodeError, KeyError) as e:
self.logger.error(f"消息格式错误: {e}. 将消息发送到死信队列(通过SQS配置实现)")
# 消息格式错误,直接删除,防止队列阻塞
self._delete_sqs_message(receipt_handle)
except Exception as e:
self.logger.critical(f"处理消息时发生未知严重错误: {e}")
# 3. 处理完一批消息后,检查并释放空闲锁
self._check_and_release_idle_locks()
except Exception as e:
self.logger.error(f"主循环发生异常: {e}")
time.sleep(5) # 发生异常时短暂休眠,避免CPU空转
def _check_and_release_idle_locks(self):
"""检查当前持有的锁,如果某个流在一段时间内没有新消息,则释放锁。"""
now = time.time()
# 创建副本以安全地在循环中修改字典
for video_id, state in list(self.active_streams.items()):
if now - state['last_processed_time'] > self.processing_idle_timeout:
self.logger.info(f"视频流 '{video_id}' 空闲超时,释放锁 '{state['lock_key']}'")
self.lock_manager.release(state['lock_key'], self.worker_id)
del self.active_streams[video_id]
if __name__ == '__main__':
processor = CVStreamProcessor()
try:
processor.run()
except KeyboardInterrupt:
processor.logger.info("接收到关闭信号,正在清理资源...")
processor.lock_manager.cleanup()
架构流程的可视化
我们可以使用Mermaid.js来描绘一个Worker获取锁并处理消息的成功序列。
sequenceDiagram
participant Worker A
participant SQS
participant Redis
Worker A->>SQS: receive_message()
SQS-->>Worker A: [msg1(v1, f1), msg2(v2, f1)]
Worker A->>Redis: SET lock:video-1 nx ex 60
Note right of Worker A: 尝试为 video-1 获取锁
Redis-->>Worker A: OK (获取成功)
Worker A->>Worker A: 启动后台线程为 lock:video-1 续期
Worker A->>Worker A: process_frame(v1, f1)
Worker A->>SQS: delete_message(msg1)
Worker A->>Redis: SET lock:video-2 nx ex 60
Note right of Worker A: 尝试为 video-2 获取锁
Redis-->>Worker A: nil (获取失败, 被其他Worker持有)
Note right of Worker A: 忽略 msg2, 等待其超时后重现
Worker A->>SQS: receive_message()
SQS-->>Worker A: [msg3(v1, f2)]
Note right of Worker A: 已持有 lock:video-1
Worker A->>Worker A: process_frame(v1, f2)
Worker A->>SQS: delete_message(msg3)
loop 空闲超时检查
Note over Worker A: 超过30秒没有 video-1 的新帧
Worker A->>Redis: DEL lock:video-1
Note right of Worker A: 释放锁
end
单元测试思路
对这种分布式系统进行测试是复杂的,但我们可以对关键组件进行单元测试。
- LockManager测试:
-
test_acquire_and_release: 模拟一个客户端成功获取锁,然后成功释放。验证Redis中key的创建和删除。 -
test_acquire_locked: 模拟客户端A获取锁后,客户端B尝试获取同一个锁,应失败。 -
test_lock_ttl: 客户端A获取锁后,不续期,等待超过TTL时间,验证Redis中的key自动过期。然后客户端B应能成功获取该锁。 test_lock_renewal: 客户端A获取锁,验证续期线程能成功延长锁的TTL。需要使用mock来控制time.sleep。
.
-
- Worker逻辑测试:
- 使用
moto库来模拟AWS SQS服务。 -
test_process_new_stream: 模拟队列中有一个新视频流的消息,验证worker能成功获取锁、处理消息、删除消息,并将video_id加入active_streams。 -
test_process_existing_stream: 模拟worker已持有一个锁,队列中来了属于该流的新消息,验证worker能直接处理并删除。 -
test_skip_locked_stream: 模拟队列中的消息对应的锁已被其他worker持有(通过预先在mock的Redis中设置锁),验证worker会跳过该消息,并且不会删除它。 -
test_idle_lock_release: 模拟一个流处理完成后,在指定时间内没有新消息,验证worker会自动释放锁并从active_streams中移除。
- 使用
方案的局限性与未来展望
此架构虽然解决了核心的顺序处理和状态维持问题,但并非没有代价和局限性。
首先,引入Redis增加了系统的复杂性和一个单点故障(尽管Redis可以通过哨兵或集群模式实现高可用)。每次处理新视频流前都需要一次网络往返来获取锁,这会带来微小的延迟。
其次,这种“抢占式”的锁机制可能导致某些worker过载,而其他worker相对空闲。如果某个视频流的帧率极高,持有该流锁的worker将持续繁忙,而其他worker可能因为无法获取该锁而无法处理该流的消息,造成资源分配不均。
最后,锁的TTL和续期机制需要仔细调优。TTL太短,长任务可能因续期失败而中断;TTL太长,worker崩溃后需要更长时间才能恢复处理。processing_idle_timeout也需要根据业务场景的帧间隔来设定,设得太短可能过早释放锁,太长则会延迟其他worker接管的机会。
一个可能的优化路径是,放弃纯粹的抢占模型,引入一个轻量级的调度器或协调器。该协调器可以基于worker的负载和视频流的特性,主动地将某个视频流“分配”给一个特定的worker一段时间,而不是让所有worker去竞争。这可以通过在Redis中维护一个video_id -> worker_id的映射表实现,但这将演变成一个更复杂的自定义调度系统。