基于Ruby与Redis构建分布式有状态WebSocket应用层防火墙


我们的实时推送服务节点从2个扩展到10个后,一个潜藏已久的问题终于爆发了。传统的网络防火墙策略,即在L4层面对端口和IP进行访问控制,对于我们基于WebSocket的应用层攻击束手无策。攻击者通过代理IP池,建立大量合法连接,然后发送海量小尺寸的垃圾数据包,或者构造畸形的JSON负载。这些攻击流量轻松穿透了边界防火墙,直接打到我们的Ruby on Rails ActionCable服务上,导致应用服务器CPU飙升,最终造成部分节点响应延迟,甚至服务不可用。

问题很明确:我们需要一个L7层的应用防火墙,它必须理解WebSocket协议,并能对消息内容和频率进行状态化检测。

方案A:单体应用内的In-Process中间件

初步的构想是在Rails应用内部实现一个中间件,比如一个Rack Middleware或者直接挂载在ActionCable的连接钩子上。

# /app/middleware/simple_websocket_firewall.rb
class SimpleWebsocketFirewall
  def initialize(app)
    @app = app
    # 使用内存哈希表存储IP访问频率
    @ip_tracker = {}
    @lock = Mutex.new
  end

  def call(env)
    # 仅对WebSocket连接进行处理
    if Faye::WebSocket.websocket?(env)
      ip = env['REMOTE_ADDR']
      
      @lock.synchronize do
        @ip_tracker[ip] ||= { count: 0, first_seen: Time.now }
        
        # 简单的速率限制:10秒内超过100次连接请求则拒绝
        if Time.now - @ip_tracker[ip][:first_seen] > 10
          @ip_tracker[ip] = { count: 1, first_seen: Time.now }
        else
          @ip_tracker[ip][:count] += 1
        end
        
        if @ip_tracker[ip][:count] > 100
          # 返回403 Forbidden,终止连接升级
          return [403, {'Content-Type' => 'text/plain'}, ['Forbidden']]
        end
      end
    end
    
    @app.call(env)
  end
end

这种方法的优势是实现简单,零外部依赖。但它的缺陷是致命的:状态无法在分布式节点间共享。我们的服务部署在多个Pod上,负载均衡器会将用户的连接随机分配到任意一个节点。攻击者可以轻易地通过轮询连接不同的节点来绕过单个节点内的内存速率限制。每个节点都只有一个关于流量的局部视图,无法形成全局的防御策略。在真实项目中,这种方案几乎没有实用价值。

方案B:专用的外部API网关或WAF

另一个方向是引入一个重量级的外部组件,例如使用Nginx + Lua脚本,或者采购商业的Web应用防火墙(WAF)产品,让它来终结WebSocket连接,并对流量进行深度包检测。

这种方案的优点在于功能强大、专业。商业WAF通常内置了丰富的规则集来防御常见的Web攻击。但它也带来了新的复杂性:

  1. 运维成本高昂: 部署和维护一套高可用的网关集群本身就是一项复杂的工程。
  2. 灵活性受限: 通用型WAF产品可能无法很好地理解我们应用特有的业务逻辑。例如,我们可能需要根据消息体内的action字段来做更精细的限流,这在通用WAF上配置起来非常困难甚至不可能。
  3. 延迟增加: 所有流量都必须经过一个额外的网络跳点,对于延迟敏感的实时应用来说,这可能无法接受。
  4. 技术锁定: 一旦采用特定厂商的解决方案,后续的迁移和定制都会变得非常困难。

在评估了运维开销和灵活性之后,我们放弃了这个选项。我们需要的是一个既能感知分布式状态,又能与我们的应用逻辑紧密结合的轻量级解决方案。

最终选择:构建基于Redis的分布式有状态防火墙中间件

我们的最终决策是自行构建一个分布式的应用层防火墙。它以Ruby Gem的形式存在,无缝集成到现有的Rails应用中。其核心是利用一个所有节点共享的外部状态存储——Redis,来同步和维护每个连接的安全上下文。

这个方案兼顾了性能、灵活性和分布式特性。Redis的高性能读写和丰富的数据结构为我们提供了实现复杂规则的基础,同时它作为我们技术栈的已有组件,不会引入新的运维负担。

架构设计

整体架构如下,防火墙中间件嵌入在每个Ruby应用节点中,所有节点共享同一个Redis实例来同步安全状态。

graph TD
    subgraph "用户端"
        Client1
        Client2
        Client3
    end

    subgraph "基础设施"
        LB[负载均衡器]
    end

    subgraph "应用集群 (Kubernetes Pods)"
        Node1[Rails App 1]
        Node2[Rails App 2]
        Node3[Rails App 3]
    end
    
    subgraph "共享状态存储"
        Redis[(Redis)]
    end

    Client1 --> LB
    Client2 --> LB
    Client3 --> LB

    LB --> Node1
    LB --> Node2
    LB --> Node3

    Node1 -- "读/写安全状态" --> Redis
    Node2 -- "读/写安全状态" --> Redis
    Node3 -- "读/写安全状态" --> Redis
    
    style Node1 text-align:left,padding:10px
    style Node2 text-align:left,padding:10px
    style Node3 text-align:left,padding:10px

    Node1_Content["- ActionCable
- Firewall Middleware"] Node2_Content["- ActionCable
- Firewall Middleware"] Node3_Content["- ActionCable
- Firewall Middleware"] Node1 --- Node1_Content Node2 --- Node2_Content Node3 --- Node3_Content

核心实现

我们将这个防火墙封装成一个名为 StatefulWsFirewall 的Gem。

1. 中间件入口 (lib/stateful_ws_firewall/middleware.rb)

这是流量的第一道关卡。我们将其挂载到ActionCable的中间件栈上。它负责在WebSocket连接建立时初始化安全上下文,并在收到每条消息时执行规则检查。

# lib/stateful_ws_firewall/middleware.rb
require 'faye/websocket'
require_relative 'state'
require_relative 'rule_engine'
require_relative 'logger'

module StatefulWsFirewall
  class Middleware
    def initialize(app)
      @app = app
    end

    def call(env)
      return @app.call(env) unless Faye::WebSocket.websocket?(env)
      
      # 从env中获取唯一连接标识和IP
      connection_id = env['action_cable.connection_identifier'] || SecureRandom.uuid
      ip_address = env['REMOTE_ADDR']

      state = State.new(connection_id, ip_address)
      engine = RuleEngine.new(state)

      # 检查连接建立规则
      unless engine.evaluate(:connect)
        Logger.warn("Connection rejected for #{ip_address}. Reason: #{engine.last_rejection_reason}")
        return [403, { 'Content-Type' => 'text/plain' }, ['Forbidden']]
      end
      
      # 包装原始WebSocket,以便拦截消息
      ws = Faye::WebSocket.new(env)
      
      ws.on :message do |event|
        begin
          parsed_data = JSON.parse(event.data)
          # 检查消息规则
          unless engine.evaluate(:message, payload: parsed_data)
            Logger.warn("Message rejected for #{connection_id}. Reason: #{engine.last_rejection_reason}")
            # 主动关闭连接,而不是仅仅忽略消息
            ws.close(1008, 'Policy Violation') 
            next
          end
          # 规则通过,将消息转发给应用
          # 这里需要一种机制将消息传递给ActionCable,实际实现中会更复杂
          # 为简化,我们假设这里有一个转发器
          # ActionCable.server.broadcast("some_channel", { data: event.data })
        rescue JSON::ParserError
          Logger.warn("Malformed JSON received from #{connection_id}. Closing connection.")
          ws.close(1003, 'Unsupported Data')
        end
      end

      ws.on :close do |event|
        # 清理状态
        state.cleanup
        Logger.info("Connection closed for #{connection_id}. Code: #{event.code}, Reason: #{event.reason}")
      end
      
      # 返回WebSocket的响应
      ws.rack_response
    end
  end
end

这里的关键点在于,我们没有直接让ActionCable处理WebSocket,而是自己创建了一个Faye::WebSocket实例来代理它。这使得我们能完全控制消息的生命周期,从而在消息被ActionCable消费前执行我们的防火墙逻辑。

2. 状态管理器 (lib/stateful_ws_firewall/state.rb)

状态管理器是整个系统的核心,它负责与Redis交互。我们使用Redis的Hash来存储每个连接的状态,并利用Lua脚本保证操作的原子性,这对于高并发场景下的计数和检查至关重要。

# lib/stateful_ws_firewall/state.rb
require 'redis'
require 'json'

module StatefulWsFirewall
  class State
    # Lua脚本用于原子性地增加计数器并返回新值
    # ARGV[1]: key, ARGV[2]: field, ARGV[3]: increment_by
    ATOMIC_HINCRBY = <<~LUA
      local current_val = redis.call('HINCRBY', ARGV[1], ARGV[2], ARGV[3])
      return current_val
    LUA

    def initialize(connection_id, ip_address)
      @redis = StatefulWsFirewall.redis
      @conn_id = connection_id
      @ip = ip_address
      @conn_key = "wsfw:conn:#{@conn_id}"
      @ip_key = "wsfw:ip:#{@ip}"

      # 初始化连接状态,设置5分钟过期时间
      @redis.hmset(@conn_key, 'ip', @ip, 'connected_at', Time.now.to_i)
      @redis.expire(@conn_key, 300)
    end

    # 原子性地增加某个计数器
    def increment(scope, counter_name, amount = 1, ttl: 60)
      key = (scope == :connection) ? @conn_key : @ip_key
      
      # 首次写入时设置过期时间
      @redis.expire(key, ttl) unless @redis.ttl(key) > 0

      # 使用evalsha避免每次都传输脚本
      sha = @redis.script(:load, ATOMIC_HINCRBY)
      @redis.evalsha(sha, argv: [key, counter_name.to_s, amount.to_s]).to_i
    end
    
    def get_value(scope, field_name)
      key = (scope == :connection) ? @conn_key : @ip_key
      @redis.hget(key, field_name.to_s)
    end

    def cleanup
      # 在连接关闭时,清理连接特定的状态
      # IP级别的状态通常需要保留一段时间用于分析
      @redis.del(@conn_key)
    end
  end
end

一个常见的错误是在Ruby客户端中执行 “GET -> a++ -> SET” 的操作,这是一个典型的竞态条件。在分布式环境中,必须使用Redis事务(MULTI/EXEC)或Lua脚本来确保原子性。我们选择Lua,因为EVALSHA可以缓存脚本,性能更高。

3. 规则引擎 (lib/stateful_ws_firewall/rule_engine.rb)

规则引擎是防火墙的大脑。我们设计了一个简单的DSL,让开发者可以在配置文件中方便地定义规则。

# lib/stateful_ws_firewall/rule_engine.rb
module StatefulWsFirewall
  class RuleEngine
    attr_reader :last_rejection_reason

    def initialize(state)
      @state = state
      @rules = StatefulWsFirewall.config.rules
      @last_rejection_reason = nil
    end

    def evaluate(event_type, context = {})
      rules_for_event = @rules.fetch(event_type, [])
      
      rules_for_event.all? do |rule|
        # 每个规则是一个lambda,返回true或false
        # 这种设计允许极高的灵活性
        result = rule.call(@state, context)
        unless result
          # 记录失败原因,用于日志和调试
          # 在真实的实现中,规则本身应该能提供更详细的失败信息
          @last_rejection_reason = "Rule failed: #{rule.source_location.join(':')}"
        end
        result
      end
    end
  end
end

4. 配置 (config/initializers/stateful_ws_firewall.rb)

将所有可配置项集中管理,包括Redis连接和防火墙规则。

# config/initializers/stateful_ws_firewall.rb
require 'stateful_ws_firewall'

StatefulWsFirewall.configure do |config|
  # Redis连接配置
  config.redis = Redis.new(url: ENV['REDIS_URL'])

  # 定义防火墙规则
  config.rules = {
    # 连接建立时的规则
    connect: [
      # 规则1: 单个IP在60秒内最多建立10个连接
      lambda do |state, context|
        count = state.increment(:ip, :connection_count, 1, ttl: 60)
        count <= 10
      end
    ],
    # 收到消息时的规则
    message: [
      # 规则2: 单个连接每10秒最多发送50条消息
      lambda do |state, context|
        count = state.increment(:connection, :message_count, 1, ttl: 10)
        count <= 50
      end,
      # 规则3: 消息体必须包含 'action' 字段
      lambda do |state, context|
        payload = context[:payload]
        payload.is_a?(Hash) && payload.key?('action')
      end,
      # 规则4: 'chat' action 的消息体长度不能超过1024字节
      lambda do |state, context|
        payload = context[:payload]
        return true unless payload['action'] == 'chat'
        
        payload.to_json.bytesize <= 1024
      end
    ]
  }
end

# 将中间件插入到ActionCable的连接栈中
Rails.application.config.after_initialize do
  ActionCable.server.config.middleware.use(StatefulWsFirewall::Middleware)
end

这种基于Lambda的规则定义方式非常强大,它允许我们访问state对象和消息上下文context,从而实现任何复杂的、与业务逻辑相关的校验。

单元测试思路

对于这样的核心安全组件,测试至关重要。

  • RuleEngine: 单元测试应覆盖所有规则。使用double来模拟State对象,验证引擎在不同状态和消息负载下是否返回了正确的truefalse
  • State: 这是集成测试。需要一个真实的Redis实例。测试increment的原子性(可以在多线程环境中测试),检查cleanup是否正确删除了键,以及TTL是否按预期设置。
  • Middleware: 进行请求级别的测试。使用rack-test或类似的工具模拟WebSocket连接请求,验证被拒绝的连接是否返回403,以及合法的连接是否能成功建立。

架构的扩展性与局限性

当前这套实现已经能有效地防御我们遇到的应用层DoS攻击。它的扩展性也很好,我们可以轻易地添加更复杂的规则,例如:

  • 基于用户身份(而不仅仅是IP)的限流。
  • 检测消息负载中的恶意模式(SQL注入、XSS等)。
  • 与外部威胁情报库联动,动态封禁恶意IP。

然而,这个方案也并非银弹。它的主要局限性在于:

  1. Redis成为潜在瓶颈: 尽管Redis性能极高,但在超大规模的连接场景下(百万级并发连接),频繁的读写操作仍然可能使其成为瓶颈或单点故障。可能需要引入Redis集群或考虑其他更高性能的键值存储。
  2. 网络延迟: 每次消息处理都需要与Redis进行一次网络往返,这会给消息处理增加固定的延迟。虽然在局域网内通常在1毫秒以内,但在对延迟要求极为严苛的场景(如高频交易)下可能需要权衡。
  3. 规则复杂度的性能影响: 过于复杂的规则(例如需要进行大量计算或多次Redis查询)会拖慢消息处理速度,进而影响整个应用的吞吐量。规则的设计必须始终将性能考虑在内。

最终,这个自研的分布式防火墙中间件,是在特定业务场景、现有技术栈和运维成本之间做出的一个务实权衡。它用相对较低的成本,精准地解决了传统防火墙在现代分布式实时应用面前的无力。


  目录