基于Docker Swarm与Redis构建高可用WebSocket广播层的TDD实现


一台服务器上的 WebSocket 服务是脆弱的。当服务的单个实例承载了所有连接时,它不仅是性能瓶颈,更是一个致命的单点故障。在生产环境中,这意味着一次意外的进程崩溃或服务器重启,将导致所有实时会话瞬间中断。为了解决这个问题,我们自然会想到水平扩展——将服务部署为多个副本。Docker Swarm 提供了一个相对轻量的容器编排方案,能轻易实现这一点。但问题随之而来:如果一个用户连接在 node-1 的容器上,而触发其消息的业务逻辑发生在 node-2 的容器上,这条消息该如何跨越容器边界,准确地送达目标用户?

这就是我们面临的第一个,也是最核心的技术痛点:在分布式环境中,如何实现一个无状态、可水平扩展的 WebSocket 消息广播层。 任何依赖于单机内存来维护连接状态的方案,在集群环境下都会失效。

我们的初步构想是引入一个所有服务实例共享的“信使”,一个消息背板(Backplane)。当任何一个实例需要广播消息时,它不再直接推送给本地连接的客户端,而是将消息发布到这个共享背板上。所有实例都订阅这个背板。当它们收到来自背板的消息时,再检查各自本地维护的连接,将消息推送给对应的客户端。

对于这个背板,Redis 的 Pub/Sub 功能是一个非常务实的选择。它足够快,几乎所有技术栈都有成熟的客户端,且运维成本相对较低。虽然它不保证消息必达,但对于许多实时通知、状态同步等场景,这种“尽力而为”的交付模型已经足够。

整个架构的核心逻辑——连接管理、消息发布、背板订阅和本地派发——充满了并发操作和状态变更,极易出错。在真实项目中,依靠手动测试来保证这种分布式逻辑的正确性几乎是不可能的。因此,测试驱动开发(TDD)不是一个选项,而是必需品。它强制我们先定义清楚每个组件的行为和边界,然后通过编码让测试通过,这在构建这类复杂中间件时能提供极大的信心。

TDD起步:从单机模型开始

我们从最核心的组件 Hub 开始。Hub 的职责是管理 WebSocket 连接的生命周期。我们将使用 Go 语言,其内建的并发原语(goroutine 和 channel)非常适合处理这类 I/O 密集型任务。

首先,定义 Hub 的基本行为。我们需要一个测试来验证它能正确地注册、注销客户端,并且在广播时只向已注册的客户端发送消息。

// hub_test.go

package main

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// MockClient 模拟一个真实的WebSocket连接,用于测试
type MockClient struct {
	id      string
	mu      sync.Mutex
	lastMsg []byte
	isClosed bool
}

func (c *MockClient) ID() string {
	return c.id
}

func (c *MockClient) Write(msg []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isClosed {
		return assert.AnError
	}
	c.lastMsg = msg
	return nil
}

func (c *MockClient) Close() {
    c.mu.Lock()
    defer c.mu.Unlock()
	c.isClosed = true
}

func (c *MockClient) LastMessage() []byte {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.lastMsg
}

// TestHub_SingleInstanceBroadcast 验证单实例Hub的基本功能
func TestHub_SingleInstanceBroadcast(t *testing.T) {
	hub := NewHub()
	go hub.Run(context.Background())

	client1 := &MockClient{id: "client-1"}
	client2 := &MockClient{id: "client-2"}
	
	// 注册两个客户端
	hub.Register(client1)
	hub.Register(client2)
	
	// 等待注册事件处理完成
	time.Sleep(100 * time.Millisecond) 

	// 广播消息
	message := []byte("hello world")
	hub.Broadcast(message)

	// 等待广播事件处理完成
	time.Sleep(100 * time.Millisecond)

	// 验证两个客户端都收到了消息
	assert.Equal(t, message, client1.LastMessage())
	assert.Equal(t, message, client2.LastMessage())

	// 注销一个客户端
	hub.Unregister(client1)
	time.Sleep(100 * time.Millisecond)
	
	// 再次广播
	newMessage := []byte("only for client-2")
	hub.Broadcast(newMessage)
	time.Sleep(100 * time.Millisecond)

	// 验证只有client2收到了新消息,client1的消息未变
	assert.Equal(t, message, client1.LastMessage()) // 仍是旧消息
	assert.Equal(t, newMessage, client2.LastMessage())
}

这个测试定义了 Hub 最基础的功能。现在我们来实现它。Hub 内部使用 channel 来处理并发的注册、注销和广播请求,避免了使用锁的复杂性。

// hub.go

package main

import (
	"context"
	"log"
)

// Client 定义了WebSocket连接需要实现的接口
type Client interface {
	ID() string
	Write(msg []byte) error
	Close()
}

// Hub 维护活跃的客户端集合,并向它们广播消息
type Hub struct {
	clients    map[string]Client
	broadcast  chan []byte
	register   chan Client
	unregister chan Client
}

func NewHub() *Hub {
	return &Hub{
		broadcast:  make(chan []byte),
		register:   make(chan Client),
		unregister: make(chan Client),
		clients:    make(map[string]Client),
	}
}

// Run 启动Hub的消息处理循环,这是Hub的核心
func (h *Hub) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// 上下文取消,关闭所有客户端连接
			for _, client := range h.clients {
				client.Close()
			}
			return
		case client := <-h.register:
			h.clients[client.ID()] = client
			log.Printf("Client registered: %s. Total clients: %d", client.ID(), len(h.clients))
		case client := <-h.unregister:
			if _, ok := h.clients[client.ID()]; ok {
				delete(h.clients, client.ID())
				client.Close()
				log.Printf("Client unregistered: %s. Total clients: %d", client.ID(), len(h.clients))
			}
		case message := <-h.broadcast:
			// 向所有本地连接的客户端广播消息
			for id, client := range h.clients {
				if err := client.Write(message); err != nil {
					log.Printf("Error writing to client %s: %v", id, err)
					// 在实际项目中,这里可能会触发一个注销操作
				}
			}
		}
	}
}

// Broadcast 是一个非阻塞的广播方法
func (h *Hub) Broadcast(message []byte) {
	select {
	case h.broadcast <- message:
	default:
		// 如果广播通道已满,可以选择丢弃或记录日志
		log.Println("Broadcast channel is full. Message dropped.")
	}
}

func (h *Hub) Register(client Client) {
	h.register <- client
}

func (h *Hub) Unregister(client Client) {
	h.unregister <- client
}

运行测试,它应该能顺利通过。我们现在有了一个功能完备但只能在单机运行的 Hub

引入Redis背板:从单机到集群

现在,我们必须直面分布式环境的挑战。当前的 Broadcast 方法只会将消息发送给连接到 同一个 Hub 实例的客户端。我们需要引入 Redis Pub/Sub 来连接所有 Hub 实例。

我们的 TDD 流程继续。编写一个失败的测试,模拟分布式场景。这个测试需要模拟两个 Hub 实例,一个 Broadcaster 将消息发布到 Redis,然后验证两个 Hub 实例都能收到消息并推送给各自的客户端。为了让测试环境更真实可控,我们可以使用 testcontainers-go 在测试期间启动一个临时的 Redis 容器。

// integration_test.go

package main

import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/redis/go-redis/v9"
    "github.com/stretchr/testify/require"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

// setupRedisContainer 启动一个用于集成测试的Redis容器
func setupRedisContainer(ctx context.Context) (redis.UniversalClient, func(), error) {
    req := testcontainers.ContainerRequest{
        Image:        "redis:7-alpine",
        ExposedPorts: []string{"6379/tcp"},
        WaitingFor:   wait.ForLog("Ready to accept connections"),
    }
    redisContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    })
    if err != nil {
        return nil, nil, err
    }

    host, err := redisContainer.Host(ctx)
    if err != nil {
        return nil, nil, err
    }
    port, err := redisContainer.MappedPort(ctx, "6379")
    if err != nil {
        return nil, nil, err
    }

    redisAddr := fmt.Sprintf("%s:%s", host, port.Port())
    rdb := redis.NewClient(&redis.Options{Addr: redisAddr})

    cleanup := func() {
        _ = redisContainer.Terminate(ctx)
    }

    return rdb, cleanup, nil
}

func TestHub_DistributedBroadcast(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test in short mode.")
    }

    ctx := context.Background()
    rdb, cleanup, err := setupRedisContainer(ctx)
    require.NoError(t, err)
    defer cleanup()

    redisChannel := "test-broadcast-channel"

    // 创建两个模拟不同节点的Hub实例
    hub1 := NewDistributedHub(rdb, redisChannel)
    hub2 := NewDistributedHub(rdb, redisChannel)
    
    go hub1.Run(ctx)
    go hub2.Run(ctx)
    
    // 给每个Hub注册一个客户端
    client1 := &MockClient{id: "client-on-hub1"}
    client2 := &MockClient{id: "client-on-hub2"}
    hub1.Register(client1)
    hub2.Register(client2)
    time.Sleep(200 * time.Millisecond) // 等待注册和订阅完成

    // 使用一个独立的Broadcaster发布消息到Redis
    broadcaster := NewRedisBroadcaster(rdb, redisChannel)
    message := []byte("distributed hello")
    err = broadcaster.Publish(ctx, message)
    require.NoError(t, err)
    
    // 等待消息通过Redis传播并被Hub处理
    time.Sleep(200 * time.Millisecond)

    // 验证两个Hub上的客户端都收到了消息
    assert.Equal(t, message, client1.LastMessage())
    assert.Equal(t, message, client2.LastMessage())
}

这个测试现在会失败,因为 NewDistributedHubNewRedisBroadcaster 还不存在。我们的任务就是让它通过。

我们需要修改 Hub,让它在启动时订阅 Redis Channel。同时,创建一个新的 RedisBroadcaster 用于发布消息。

// broadcaster.go

package main

import (
	"context"

	"github.com/redis/go-redis/v9"
)

type Broadcaster interface {
	Publish(ctx context.Context, message []byte) error
}

type RedisBroadcaster struct {
	client  redis.UniversalClient
	channel string
}

func NewRedisBroadcaster(client redis.UniversalClient, channel string) *RedisBroadcaster {
	return &RedisBroadcaster{
		client:  client,
		channel: channel,
	}
}

func (b *RedisBroadcaster) Publish(ctx context.Context, message []byte) error {
	return b.client.Publish(ctx, b.channel, message).Err()
}

现在是改造 Hub 的关键部分。我们需要在 Run 方法中增加一个 goroutine 来处理来自 Redis 的订阅消息。当收到消息时,将其推送到内部的 broadcast channel,这样就能复用之前的本地广播逻辑。

// distributed_hub.go (修改后的hub.go)

package main

import (
	"context"
	"log"
	
	"github.com/redis/go-redis/v9"
)

// ... Client接口定义不变 ...

type DistributedHub struct {
	clients    map[string]Client
	broadcast  chan []byte
	register   chan Client
	unregister chan Client

	// 新增Redis相关字段
	redisClient redis.UniversalClient
	channel     string
}

func NewDistributedHub(rdb redis.UniversalClient, channel string) *DistributedHub {
	return &DistributedHub{
		broadcast:   make(chan []byte, 256), // 增加缓冲区
		register:    make(chan Client),
		unregister:  make(chan Client),
		clients:     make(map[string]Client),
		redisClient: rdb,
		channel:     channel,
	}
}

// subscribeToRedis 启动一个goroutine来监听Redis频道
func (h *DistributedHub) subscribeToRedis(ctx context.Context) {
	pubsub := h.redisClient.Subscribe(ctx, h.channel)
	// 确保在函数退出时关闭订阅,释放资源
	defer pubsub.Close()
	
	// 等待订阅确认
	_, err := pubsub.Receive(ctx)
	if err != nil {
		log.Printf("Failed to subscribe to Redis channel %s: %v", h.channel, err)
		return
	}

	ch := pubsub.Channel()
	log.Printf("Successfully subscribed to Redis channel: %s", h.channel)

	for {
		select {
		case <-ctx.Done():
			log.Println("Context cancelled, stopping Redis subscription.")
			return
		case msg := <-ch:
			// 从Redis收到消息,推送到内部广播channel
			// 注意:这里我们不做任何过滤,所有实例都会收到,然后由本地广播逻辑处理
			log.Printf("Received message from Redis: %s", msg.Payload)
			h.broadcast <- []byte(msg.Payload)
		}
	}
}

func (h *DistributedHub) Run(ctx context.Context) {
    // 启动Redis订阅goroutine
	go h.subscribeToRedis(ctx)

	for {
		select {
		case <-ctx.Done():
			for _, client := range h.clients {
				client.Close()
			}
			return
		case client := <-h.register:
			h.clients[client.ID()] = client
			log.Printf("Client registered: %s. Total local clients: %d", client.ID(), len(h.clients))
		case client := <-h.unregister:
			if _, ok := h.clients[client.ID()]; ok {
				delete(h.clients, client.ID())
				client.Close()
				log.Printf("Client unregistered: %s. Total local clients: %d", client.ID(), len(h.clients))
			}
		case message := <-h.broadcast:
			// 这个逻辑保持不变,它只负责向本地连接的客户端广播
			for id, client := range h.clients {
				if err := client.Write(message); err != nil {
					log.Printf("Error writing to client %s: %v", id, err)
				}
			}
		}
	}
}
// Register, Unregister方法保持不变
func (h *DistributedHub) Register(client Client) {
	h.register <- client
}

func (h *DistributedHub) Unregister(client Client) {
	h.unregister <- client
}

再次运行我们的集成测试,现在它应该能够通过了。这个 TDD 周期验证了我们架构的核心:消息可以通过 Redis 背板在不同的服务实例间正确传递。

部署到 Docker Swarm

我们的代码逻辑已经过验证,现在是时候将它部署到生产模拟环境 Docker Swarm 中了。我们需要一个 Dockerfile 来容器化应用,以及一个 docker-compose.yml 文件来定义服务栈。

Dockerfile:

# Dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .
# 使用-ldflags "-s -w"来减小最终二进制文件的大小
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /ws-gateway .

FROM alpine:latest

# 安全考量:以非root用户运行
RUN addgroup -S appgroup && adduser -S appuser -G appgroup
USER appuser

WORKDIR /home/appuser
COPY --from=builder /ws-gateway .

# 暴露端口
EXPOSE 8080

# 启动命令
CMD ["./ws-gateway"]

docker-compose.yml:

# docker-compose.yml
version: "3.8"

services:
  redis:
    image: "redis:7-alpine"
    networks:
      - ws-net
    deploy:
      replicas: 1
      placement:
        constraints: [node.role == manager] # 将Redis固定在manager节点

  ws-gateway:
    image: your-repo/ws-gateway:latest # 替换为你的镜像仓库地址
    networks:
      - ws-net
    ports:
      - "8080:8080"
    environment:
      - REDIS_ADDR=redis:6379 # Swarm内部DNS解析
      - REDIS_CHANNEL=global_broadcast
      - LISTEN_ADDR=:8080
    depends_on:
      - redis
    deploy:
      mode: replicated
      replicas: 3 # 部署3个副本以测试水平扩展
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure

networks:
  ws-net:
    driver: overlay
    attachable: true

部署这个服务栈的命令非常简单:docker stack deploy -c docker-compose.yml websocket-stack

Docker Swarm 会自动创建3个 ws-gateway 服务的副本和一个 Redis 服务。Swarm 内置的 ingress routing mesh 会将外部对 8080 端口的请求负载均衡到这3个副本上。当一个新的 WebSocket 连接建立时,它会被路由到其中一个健康的容器。由于我们使用了 Redis 背板,无论业务逻辑在哪一个容器中触发广播,所有连接到任何一个容器的客户端都能收到消息。

我们可以通过 docker service logs websocket-stack_ws-gateway -f 来观察日志,看到不同副本中的客户端注册和接收消息的记录。

graph LR
    subgraph Client-Side
        C1[Client 1]
        C2[Client 2]
        C3[Client 3]
    end

    subgraph Docker Swarm Cluster
        LB(Ingress Load Balancer)

        subgraph Node 1
            GW1(Gateway Replica 1)
        end
        subgraph Node 2
            GW2(Gateway Replica 2)
        end
        subgraph Node 3
            GW3(Gateway Replica 3)
        end

        subgraph Shared Services
            Redis(Redis Pub/Sub)
        end

        GW1 -- subscribes/publishes --> Redis
        GW2 -- subscribes/publishes --> Redis
        GW3 -- subscribes/publishes --> Redis
    end
    
    subgraph Business Logic
        API[API Service]
    end

    C1 -- ws conn --> LB --> GW1
    C2 -- ws conn --> LB --> GW2
    C3 -- ws conn --> LB --> GW1
    
    API -- HTTP POST /broadcast --> LB --> GW3
    
    GW3 -- 1. Publish to Redis --> Redis
    Redis -- 2. Fan-out --> GW1 & GW2 & GW3
    
    GW1 -- 3. Push to local clients --> C1
    GW1 -- 3. Push to local clients --> C3
    GW2 -- 3. Push to local clients --> C2

这个架构图清晰地展示了消息的流动路径。一个来自外部的广播请求可能被路由到 GW3,但通过 Redis 背板,连接在 GW1GW2 上的客户端也能最终收到消息。

方案局限性与未来迭代方向

我们通过 TDD 构建了一个可水平扩展、容错的 WebSocket 广播层,并成功部署在 Docker Swarm 上。但这套方案并非银弹,它存在一些固有的局限性,在选择使用时必须清楚地认识到。

首先,Redis Pub/Sub 是一种“即发即弃”的消息模型。如果某个 ws-gateway 实例在消息发布时恰好处于离线状态(例如,正在重启或网络分区),它将永久性地错过这条消息。对于需要强一致性保证的场景,例如交易或重要指令通知,这个方案是不适用的。此时,应考虑使用 Redis Streams 或专门的消息队列(如 RabbitMQ, NATS Streaming)作为背板,它们提供了消息持久化和至少一次的投递保证。

其次,当前的实现是一个纯粹的广播系统。所有消息都被发送给所有实例,再由实例分发给所有本地连接的客户端。如果需要实现点对点消息或按频道/房间订阅,架构需要变得更复杂。一种可能的演进方向是为每个用户或每个房间创建独立的 Redis channel,但这会极大地增加 Redis 的订阅负载。一个更优的方案可能是在 Redis 中维护一个映射表(例如 Hash),记录{userID -> nodeID},广播时先查询此表,然后只向目标用户所在的节点发送消息,但这又引入了状态维护的复杂性。

最后,虽然 Docker Swarm 简化了部署,但它在自动伸缩、精细化资源控制和生态系统丰富度方面不如 Kubernetes。当业务规模增长到一定程度,需要更复杂的调度策略、服务网格或可观测性工具时,迁移到 Kubernetes 可能是不可避免的下一步。我们当前的设计是云原生的,更换底层的编排平台并不会对核心代码逻辑产生颠覆性影响,这本身也是一个好的架构特性。


  目录