在为一组内部 Serverless API 提供安全防护时,我们评估了标准的 AWS WAF。它功能强大,但对于我们这种需要高度定制化业务规则、且预算敏感的项目而言,其成本模型和规则集的僵化成了主要障碍。常规的 AWS WAF 规则按月计费,每条规则处理的请求数也单独计费,当规则集变得复杂时,成本会线性攀升。更重要的是,我们的一些防护逻辑需要根据业务上下文动态调整,例如,只对特定租户的特定API端点启用SQL注入检测,这种精细化控制在标准WAF中难以实现。
技术痛点很明确:我们需要一个低成本、可编程、与 API Gateway 无缝集成的 Web Application Firewall (WAF) 层。它必须能够轻松地由开发团队维护,并且其规则能够像应用程序代码一样进行版本控制。
我们的初步构想是利用 AWS Lambda 的事件驱动特性,在请求到达业务逻辑 Lambda 之前插入一个安全检查层。API Gateway 的 Lambda Authorizer (曾称 Custom Authorizer) 似乎是实现这一点的理想挂载点。它天生就是为了在 API Gateway 层面执行自定义认证和授权逻辑,但其本质是一个同步调用的 Lambda,完全可以扩展用于执行任何前置检查,包括安全过滤。如果这个 Authorizer Lambda 能返回一个 Deny 策略,API Gateway 就会直接以 403 Forbidden 拒绝请求,实现了 WAF 的核心功能。
技术选型与架构决策
我们决定使用 Python 作为实现语言,因为它在 Lambda 环境中性能均衡且生态成熟。为了避免在 Lambda handler 中编写混乱的 if/else 逻辑,我们引入了 Flask。这看起来有些反直觉——在一个无服务器函数中运行一个 Web 框架。但这里的关键在于,我们并不运行 Flask 的 HTTP 服务器,而是利用其强大的上下文管理、请求对象封装和蓝图(Blueprints)等特性来组织我们的 WAF 规则逻辑。这让我们的 WAF 代码结构清晰,易于测试和扩展,就像一个迷你的 Web 应用。
最终的请求处理流程如下:
sequenceDiagram
participant Client as 客户端
participant APIGW as API Gateway
participant WafLambda as WAF Lambda (Authorizer)
participant AppLambda as 业务逻辑 Lambda
Client->>+APIGW: 发起 API 请求 (携带 Headers, Query Params)
APIGW->>+WafLambda: 调用 Lambda Authorizer, 传入请求上下文
Note over WafLambda: 1. Flask App 初始化
2. 将 API GW 事件转换为 Flask Request
3. 执行规则链
WafLambda-->>-APIGW: 返回 IAM Policy (Allow / Deny)
alt 请求被拒绝
APIGW-->>Client: 返回 403 Forbidden
else 请求被允许
APIGW->>+AppLambda: 转发原始请求
AppLambda-->>-APIGW: 返回业务处理结果
APIGW-->>-Client: 返回最终响应
end
这种架构的核心优势在于:
- 可编程性: 所有WAF规则都是Python代码,可以实现任意复杂的逻辑。
- 成本效益: 只为实际执行的请求付费,没有固定的月费。对于流量不均的应用,成本优势明显。
- 无缝集成: 作为 Lambda Authorizer,它与 API Gateway 原生集成,无需对下游服务做任何改造。
- 可维护性: 使用 Flask 组织代码,并可将规则配置外部化存储,便于管理和迭代。
核心实现:构建 Flask-based WAF Lambda
我们的项目结构如下:
waf-lambda/
├── app/
│ ├── __init__.py # Flask App 工厂
│ ├── rules/ # 规则引擎目录
│ │ ├── __init__.py
│ │ ├── base.py # 规则基类
│ │ ├── sql_injection.py # SQL注入检测规则
│ │ └── header_check.py # 请求头校验规则
│ ├── services/
│ │ └── config_loader.py # 规则配置加载器
│ └── main.py # Flask 路由和核心逻辑
├── handler.py # Lambda handler 入口
├── serverless.yml # Serverless Framework 配置文件
├── requirements.txt
└── tests/
├── ...
1. Lambda Handler 与 Flask App 的桥梁
handler.py 是 Lambda 的入口点,它的职责是将 API Gateway 传递过来的 event 对象转换为 Flask 应用能够理解的 WSGI 请求。我们使用 serverless-wsgi 这个库来简化这个过程。
# handler.py
import serverless_wsgi
from app.main import app
def handle(event, context):
"""
Lambda handler function.
It uses serverless-wsgi to handle the API Gateway event
and pass it to the Flask application.
"""
# 这里我们只关心 Authorizer event, 并且将其转换为 Flask 能处理的格式
# Lambda Authorizer 的事件结构与标准的 Proxy Integration 不同
# 为了复用 Flask 的 request 对象,我们需要手动构造一个类似 HTTP 请求的上下文
# 构造一个伪路径,以便 Flask 路由匹配
path = f"/check/{event.get('httpMethod', 'GET')}{event.get('path', '/')}"
# 将关键信息放入 headers,供 Flask 内部访问
headers = event.get('headers', {})
headers['X-Request-Context-MethodArn'] = event.get('methodArn')
# 将 event 包装成 wsgi 兼容的格式
wsgi_event = {
"path": path,
"httpMethod": "GET", # Authorizer 总是 GET
"headers": headers,
"queryStringParameters": event.get('queryStringParameters', {}),
"requestContext": event.get('requestContext', {})
}
return serverless_wsgi.handle(wsgi_event, context, app)
2. Flask 应用核心与规则引擎
app/main.py 是我们的 Flask 应用核心。它不处理真正的 HTTP 响应,而是利用 Flask 的请求处理流程来执行 WAF 规则。它的最终输出是一个包含 IAM Policy 的 JSON 字符串。
# app/main.py
import json
import logging
from flask import Flask, request, jsonify
from .services.config_loader import ConfigLoader
from .rules import get_all_rules
# 初始化日志
logger = logging.getLogger()
logger.setLevel(logging.INFO)
app = Flask(__name__)
config_loader = ConfigLoader()
# 加载所有定义的规则类
ALL_RULES = {rule.__name__: rule for rule in get_all_rules()}
def generate_policy(principal_id, effect, resource):
"""生成 API Gateway 所需的 IAM Policy 文档"""
return {
'principalId': principal_id,
'policyDocument': {
'Version': '2012-10-17',
'Statement': [{
'Action': 'execute-api:Invoke',
'Effect': effect,
'Resource': resource
}]
}
}
@app.route('/check/<path:subpath>', methods=['GET'])
def check_request(subpath):
"""
核心 WAF 检查路由。所有 Authorizer 请求都将进入这里。
"""
# 从伪造的请求头中获取真实的 methodArn
method_arn = request.headers.get('X-Request-Context-MethodArn')
if not method_arn:
logger.error("Method ARN not found in request context.")
# Fail-closed: 出现内部错误时,默认拒绝请求
return jsonify(generate_policy('user', 'Deny', '*'))
try:
# 从配置服务加载当前 API (methodArn) 需要应用的规则
# 这里的规则配置可以来自 S3, DynamoDB, 或 SSM Parameter Store
active_rules_config = config_loader.get_rules_for_resource(method_arn)
# 遍历并执行规则
for rule_config in active_rules_config:
rule_name = rule_config.get('name')
params = rule_config.get('params', {})
rule_class = ALL_RULES.get(rule_name)
if not rule_class:
logger.warning(f"Rule '{rule_name}' not found. Skipping.")
continue
# 实例化规则并执行检查
rule_instance = rule_class(**params)
is_violated, reason = rule_instance.check(request)
if is_violated:
# 规则被触发,记录详细日志并立即拒绝请求
log_payload = {
"message": "WAF rule violated",
"rule_name": rule_name,
"reason": reason,
"source_ip": request.headers.get('X-Forwarded-For', '').split(',')[0],
"path": subpath,
"method_arn": method_arn
}
# 使用结构化日志
logger.info(json.dumps(log_payload))
# 返回 Deny Policy
return jsonify(generate_policy('user', 'Deny', method_arn))
# 所有规则检查通过
logger.debug(f"All WAF checks passed for {method_arn}")
return jsonify(generate_policy('user', 'Allow', method_arn))
except Exception as e:
logger.exception(f"An unexpected error occurred during WAF check for {method_arn}")
# 在生产中,需要有更精细的错误处理和告警机制
# 默认采用 fail-closed 策略保证安全
return jsonify(generate_policy('user', 'Deny', method_arn))
3. 可扩展的规则设计
每个规则都是一个独立的类,继承自一个基类,这使得添加新规则变得非常简单。
# app/rules/base.py
from abc import ABC, abstractmethod
from flask import Request
class BaseRule(ABC):
"""所有 WAF 规则的抽象基类"""
def __init__(self, **kwargs):
# 可以在这里接收规则的参数
pass
@abstractmethod
def check(self, request: Request) -> (bool, str):
"""
执行检查逻辑。
:param request: Flask request object.
:return: A tuple (is_violated, reason).
is_violated (bool): True if the rule is violated.
reason (str): A description of why it was violated.
"""
raise NotImplementedError
# app/rules/__init__.py
# 动态加载所有规则
import pkgutil
import inspect
from .base import BaseRule
def get_all_rules():
rules = []
for loader, name, is_pkg in pkgutil.walk_packages(__path__):
module = loader.find_module(name).load_module(name)
for member_name, member_obj in inspect.getmembers(module):
if inspect.isclass(member_obj) and issubclass(member_obj, BaseRule) and member_obj is not BaseRule:
rules.append(member_obj)
return rules
一个具体的SQL注入检测规则示例如下,这里我们使用一个简化的正则匹配作为演示。在真实项目中,应该使用更成熟的库。
# app/rules/sql_injection.py
import re
from flask import Request
from .base import BaseRule
class SQLInjectionRule(BaseRule):
"""
一个基础的 SQL 注入模式检测规则。
"""
def __init__(self, sensitivity='medium'):
super().__init__()
# 这是一个非常简化的示例模式
# 生产环境需要更复杂、更全面的模式库
self.patterns = [
r"(\%27)|(\')|(\-\-)|(\%23)|(#)", # basic SQL comment and string termination
r"\b(ALTER|CREATE|DELETE|DROP|EXEC|INSERT|MERGE|SELECT|UPDATE|UNION)\b" # SQL keywords
]
self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.patterns]
def check(self, request: Request) -> (bool, str):
# 检查所有查询参数
for key, value in request.args.items():
for pattern in self.compiled_patterns:
if pattern.search(value):
reason = f"SQLi pattern '{pattern.pattern}' found in query parameter '{key}'"
return True, reason
# 生产环境中还应检查请求体、路径、部分Headers
# (此处为简化示例,省略了对 body 的检查)
return False, ""
4. 外部化规则配置
硬编码规则是不可接受的。我们需要一个机制来动态加载规则。这里我们设计一个 ConfigLoader,它可以从 AWS SSM Parameter Store 或 S3 文件中读取配置。
# app/services/config_loader.py
import os
import json
import boto3
from functools import lru_cache
class ConfigLoader:
"""
负责加载 WAF 规则配置。
配置可以存储在 S3 或 SSM Parameter Store 中。
"""
def __init__(self):
self.config_source = os.environ.get('CONFIG_SOURCE', 'SSM')
self.config_path = os.environ.get('CONFIG_PATH', '/my-app/waf/rules')
# 在 Lambda 的全局作用域初始化 boto3 客户端以复用连接
if self.config_source == 'SSM':
self.client = boto3.client('ssm')
elif self.config_source == 'S3':
# S3 client initialization...
pass
@lru_cache(maxsize=1) # 缓存配置以避免每次调用都重新拉取
def _load_config(self):
"""
从源加载原始配置。lru_cache 保证在单个 Lambda 容器生命周期内只加载一次。
"""
if self.config_source == 'SSM':
try:
parameter = self.client.get_parameter(Name=self.config_path, WithDecryption=True)
return json.loads(parameter['Parameter']['Value'])
except Exception as e:
# 关键:配置加载失败时,必须有降级策略
# 可能是返回一个空的规则集,或者一个最小化的安全规则集
print(f"Failed to load config from SSM: {e}")
return {"default": []} # 降级为无规则
# S3 加载逻辑...
return {"default": []}
def get_rules_for_resource(self, method_arn: str) -> list:
"""
根据 method_arn 匹配适用的规则集。
支持通配符匹配。
"""
config = self._load_config()
# 这里的匹配逻辑可以非常灵活,例如:
# arn:aws:execute-api:us-east-1:123456789012:abcdef123/prod/POST/users
# 精确匹配
if method_arn in config:
return config[method_arn]
# 通配符匹配 (简化示例)
parts = method_arn.split('/')
# e.g., arn:aws:execute-api:us-east-1:...:abcdef123/prod/POST/*
wildcard_arn = '/'.join(parts[:-1]) + '/*'
if wildcard_arn in config:
return config[wildcard_arn]
# 返回默认规则集
return config.get("default", [])
一个示例的 SSM 参数存储的 JSON 配置:
{
"default": [
{ "name": "HeaderCheckRule", "params": { "required_headers": ["X-Request-ID"] } }
],
"arn:aws:execute-api:us-east-1:*:*/prod/POST/sensitive_data": [
{ "name": "HeaderCheckRule", "params": { "required_headers": ["X-Request-ID", "X-Tenant-ID"] } },
{ "name": "SQLInjectionRule", "params": { "sensitivity": "high" } }
],
"arn:aws:execute-api:us-east-1:*:*/prod/GET/*": [
{ "name": "HeaderCheckRule", "params": { "required_headers": ["X-Request-ID"] } }
]
}
5. Serverless Framework 配置
最后,serverless.yml 将所有部分串联起来,定义 Lambda 函数和 API Gateway Authorizer。
# serverless.yml
service: programmable-api-waf
provider:
name: aws
runtime: python3.9
region: us-east-1
iam:
role:
statements:
# 允许 Lambda 读取 SSM 参数
- Effect: "Allow"
Action:
- "ssm:GetParameter"
Resource: "arn:aws:ssm:us-east-1:*:parameter/my-app/waf/rules"
functions:
apiWafAuthorizer:
handler: handler.handle
environment:
CONFIG_SOURCE: SSM
CONFIG_PATH: /my-app/waf/rules
# 如果有必要,可以启用 Provisioned Concurrency 来缓解冷启动
# provisionedConcurrency: 2
# 将此 authorizer 应用到你的 API Gateway endpoints
# 这里是一个示例,展示如何在一个具体的 HTTP API 上使用它
resources:
Resources:
MyHttpApi:
Type: AWS::ApiGatewayV2::Api
Properties:
Name: MyApiWithCustomWaf
ProtocolType: HTTP
MyApiAuthorizer:
Type: AWS::ApiGatewayV2::Authorizer
Properties:
ApiId: !Ref MyHttpApi
AuthorizerType: REQUEST
Name: WafLambdaAuthorizer
IdentitySource:
- "$request.header.Authorization" # 必须指定一个 Identity Source
AuthorizerUri:
Fn::Join:
- ""
- - "arn:aws:apigateway:"
- !Ref AWS::Region
- ":lambda:path/2015-03-31/functions/"
- !GetAtt ApiWafAuthorizerLambdaFunction.Arn
- "/invocations"
AuthorizerPayloadFormatVersion: "2.0" # 使用 2.0 payload
EnableSimpleResponses: true # 简化响应格式
# ... 在你的路由上引用这个 Authorizer
# MyApiRoute:
# Type: AWS::ApiGatewayV2::Route
# Properties:
# ApiId: !Ref MyHttpApi
# RouteKey: "GET /data"
# AuthorizationType: CUSTOM
# AuthorizerId: !Ref MyApiAuthorizer
# Target: !Join ...
局限性与未来展望
这个方案虽然灵活且成本可控,但也存在一些需要正视的局限性。
首先是性能。Lambda 的冷启动会给请求增加延迟。对于需要极低延迟的 API,这个 WAF 层的延迟可能是个问题。使用 Provisioned Concurrency 可以缓解此问题,但这会带来额外的固定成本,削弱了纯按需计费的优势。Lambda Authorizer 的结果可以被 API Gateway 缓存,对于基于 Token 或 Headers 的、在一定时间内不变的请求,这能极大地提升性能。但对于我们这种需要检查每个请求具体参数的场景,缓存 TTL 只能设为 0,无法利用缓存优势。
其次是功能完整性。这个轻量级 WAF 无法像成熟的商业 WAF 产品那样提供复杂的威胁情报、机器学习驱动的异常检测或对 L7 DDoS 的全面防护。它的强项在于可编程的业务逻辑安全,而非通用的威胁防护。
未来的优化路径可以探索几个方向。可以将对性能要求极高的规则(如复杂的正则表达式匹配)用 Go 或 Rust 编写成 Lambda Layer,通过 Python 的 CFFI 调用,以提升执行效率。规则配置的管理可以做得更完善,比如构建一个简单的管理界面来动态更新 S3 或 DynamoDB 中的规则集,并实现版本控制。最后,WAF 的日志应被送入集中的安全分析平台(如 OpenSearch 或 Splunk),与其他安全信号关联,形成更完整的安全态势感知能力。