一台服务器上的 WebSocket 服务是脆弱的。当服务的单个实例承载了所有连接时,它不仅是性能瓶颈,更是一个致命的单点故障。在生产环境中,这意味着一次意外的进程崩溃或服务器重启,将导致所有实时会话瞬间中断。为了解决这个问题,我们自然会想到水平扩展——将服务部署为多个副本。Docker Swarm 提供了一个相对轻量的容器编排方案,能轻易实现这一点。但问题随之而来:如果一个用户连接在 node-1 的容器上,而触发其消息的业务逻辑发生在 node-2 的容器上,这条消息该如何跨越容器边界,准确地送达目标用户?
这就是我们面临的第一个,也是最核心的技术痛点:在分布式环境中,如何实现一个无状态、可水平扩展的 WebSocket 消息广播层。 任何依赖于单机内存来维护连接状态的方案,在集群环境下都会失效。
我们的初步构想是引入一个所有服务实例共享的“信使”,一个消息背板(Backplane)。当任何一个实例需要广播消息时,它不再直接推送给本地连接的客户端,而是将消息发布到这个共享背板上。所有实例都订阅这个背板。当它们收到来自背板的消息时,再检查各自本地维护的连接,将消息推送给对应的客户端。
对于这个背板,Redis 的 Pub/Sub 功能是一个非常务实的选择。它足够快,几乎所有技术栈都有成熟的客户端,且运维成本相对较低。虽然它不保证消息必达,但对于许多实时通知、状态同步等场景,这种“尽力而为”的交付模型已经足够。
整个架构的核心逻辑——连接管理、消息发布、背板订阅和本地派发——充满了并发操作和状态变更,极易出错。在真实项目中,依靠手动测试来保证这种分布式逻辑的正确性几乎是不可能的。因此,测试驱动开发(TDD)不是一个选项,而是必需品。它强制我们先定义清楚每个组件的行为和边界,然后通过编码让测试通过,这在构建这类复杂中间件时能提供极大的信心。
TDD起步:从单机模型开始
我们从最核心的组件 Hub 开始。Hub 的职责是管理 WebSocket 连接的生命周期。我们将使用 Go 语言,其内建的并发原语(goroutine 和 channel)非常适合处理这类 I/O 密集型任务。
首先,定义 Hub 的基本行为。我们需要一个测试来验证它能正确地注册、注销客户端,并且在广播时只向已注册的客户端发送消息。
// hub_test.go
package main
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// MockClient 模拟一个真实的WebSocket连接,用于测试
type MockClient struct {
id string
mu sync.Mutex
lastMsg []byte
isClosed bool
}
func (c *MockClient) ID() string {
return c.id
}
func (c *MockClient) Write(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.isClosed {
return assert.AnError
}
c.lastMsg = msg
return nil
}
func (c *MockClient) Close() {
c.mu.Lock()
defer c.mu.Unlock()
c.isClosed = true
}
func (c *MockClient) LastMessage() []byte {
c.mu.Lock()
defer c.mu.Unlock()
return c.lastMsg
}
// TestHub_SingleInstanceBroadcast 验证单实例Hub的基本功能
func TestHub_SingleInstanceBroadcast(t *testing.T) {
hub := NewHub()
go hub.Run(context.Background())
client1 := &MockClient{id: "client-1"}
client2 := &MockClient{id: "client-2"}
// 注册两个客户端
hub.Register(client1)
hub.Register(client2)
// 等待注册事件处理完成
time.Sleep(100 * time.Millisecond)
// 广播消息
message := []byte("hello world")
hub.Broadcast(message)
// 等待广播事件处理完成
time.Sleep(100 * time.Millisecond)
// 验证两个客户端都收到了消息
assert.Equal(t, message, client1.LastMessage())
assert.Equal(t, message, client2.LastMessage())
// 注销一个客户端
hub.Unregister(client1)
time.Sleep(100 * time.Millisecond)
// 再次广播
newMessage := []byte("only for client-2")
hub.Broadcast(newMessage)
time.Sleep(100 * time.Millisecond)
// 验证只有client2收到了新消息,client1的消息未变
assert.Equal(t, message, client1.LastMessage()) // 仍是旧消息
assert.Equal(t, newMessage, client2.LastMessage())
}
这个测试定义了 Hub 最基础的功能。现在我们来实现它。Hub 内部使用 channel 来处理并发的注册、注销和广播请求,避免了使用锁的复杂性。
// hub.go
package main
import (
"context"
"log"
)
// Client 定义了WebSocket连接需要实现的接口
type Client interface {
ID() string
Write(msg []byte) error
Close()
}
// Hub 维护活跃的客户端集合,并向它们广播消息
type Hub struct {
clients map[string]Client
broadcast chan []byte
register chan Client
unregister chan Client
}
func NewHub() *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan Client),
unregister: make(chan Client),
clients: make(map[string]Client),
}
}
// Run 启动Hub的消息处理循环,这是Hub的核心
func (h *Hub) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
// 上下文取消,关闭所有客户端连接
for _, client := range h.clients {
client.Close()
}
return
case client := <-h.register:
h.clients[client.ID()] = client
log.Printf("Client registered: %s. Total clients: %d", client.ID(), len(h.clients))
case client := <-h.unregister:
if _, ok := h.clients[client.ID()]; ok {
delete(h.clients, client.ID())
client.Close()
log.Printf("Client unregistered: %s. Total clients: %d", client.ID(), len(h.clients))
}
case message := <-h.broadcast:
// 向所有本地连接的客户端广播消息
for id, client := range h.clients {
if err := client.Write(message); err != nil {
log.Printf("Error writing to client %s: %v", id, err)
// 在实际项目中,这里可能会触发一个注销操作
}
}
}
}
}
// Broadcast 是一个非阻塞的广播方法
func (h *Hub) Broadcast(message []byte) {
select {
case h.broadcast <- message:
default:
// 如果广播通道已满,可以选择丢弃或记录日志
log.Println("Broadcast channel is full. Message dropped.")
}
}
func (h *Hub) Register(client Client) {
h.register <- client
}
func (h *Hub) Unregister(client Client) {
h.unregister <- client
}
运行测试,它应该能顺利通过。我们现在有了一个功能完备但只能在单机运行的 Hub。
引入Redis背板:从单机到集群
现在,我们必须直面分布式环境的挑战。当前的 Broadcast 方法只会将消息发送给连接到 同一个 Hub 实例的客户端。我们需要引入 Redis Pub/Sub 来连接所有 Hub 实例。
我们的 TDD 流程继续。编写一个失败的测试,模拟分布式场景。这个测试需要模拟两个 Hub 实例,一个 Broadcaster 将消息发布到 Redis,然后验证两个 Hub 实例都能收到消息并推送给各自的客户端。为了让测试环境更真实可控,我们可以使用 testcontainers-go 在测试期间启动一个临时的 Redis 容器。
// integration_test.go
package main
import (
"context"
"fmt"
"testing"
"time"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
// setupRedisContainer 启动一个用于集成测试的Redis容器
func setupRedisContainer(ctx context.Context) (redis.UniversalClient, func(), error) {
req := testcontainers.ContainerRequest{
Image: "redis:7-alpine",
ExposedPorts: []string{"6379/tcp"},
WaitingFor: wait.ForLog("Ready to accept connections"),
}
redisContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
if err != nil {
return nil, nil, err
}
host, err := redisContainer.Host(ctx)
if err != nil {
return nil, nil, err
}
port, err := redisContainer.MappedPort(ctx, "6379")
if err != nil {
return nil, nil, err
}
redisAddr := fmt.Sprintf("%s:%s", host, port.Port())
rdb := redis.NewClient(&redis.Options{Addr: redisAddr})
cleanup := func() {
_ = redisContainer.Terminate(ctx)
}
return rdb, cleanup, nil
}
func TestHub_DistributedBroadcast(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode.")
}
ctx := context.Background()
rdb, cleanup, err := setupRedisContainer(ctx)
require.NoError(t, err)
defer cleanup()
redisChannel := "test-broadcast-channel"
// 创建两个模拟不同节点的Hub实例
hub1 := NewDistributedHub(rdb, redisChannel)
hub2 := NewDistributedHub(rdb, redisChannel)
go hub1.Run(ctx)
go hub2.Run(ctx)
// 给每个Hub注册一个客户端
client1 := &MockClient{id: "client-on-hub1"}
client2 := &MockClient{id: "client-on-hub2"}
hub1.Register(client1)
hub2.Register(client2)
time.Sleep(200 * time.Millisecond) // 等待注册和订阅完成
// 使用一个独立的Broadcaster发布消息到Redis
broadcaster := NewRedisBroadcaster(rdb, redisChannel)
message := []byte("distributed hello")
err = broadcaster.Publish(ctx, message)
require.NoError(t, err)
// 等待消息通过Redis传播并被Hub处理
time.Sleep(200 * time.Millisecond)
// 验证两个Hub上的客户端都收到了消息
assert.Equal(t, message, client1.LastMessage())
assert.Equal(t, message, client2.LastMessage())
}
这个测试现在会失败,因为 NewDistributedHub 和 NewRedisBroadcaster 还不存在。我们的任务就是让它通过。
我们需要修改 Hub,让它在启动时订阅 Redis Channel。同时,创建一个新的 RedisBroadcaster 用于发布消息。
// broadcaster.go
package main
import (
"context"
"github.com/redis/go-redis/v9"
)
type Broadcaster interface {
Publish(ctx context.Context, message []byte) error
}
type RedisBroadcaster struct {
client redis.UniversalClient
channel string
}
func NewRedisBroadcaster(client redis.UniversalClient, channel string) *RedisBroadcaster {
return &RedisBroadcaster{
client: client,
channel: channel,
}
}
func (b *RedisBroadcaster) Publish(ctx context.Context, message []byte) error {
return b.client.Publish(ctx, b.channel, message).Err()
}
现在是改造 Hub 的关键部分。我们需要在 Run 方法中增加一个 goroutine 来处理来自 Redis 的订阅消息。当收到消息时,将其推送到内部的 broadcast channel,这样就能复用之前的本地广播逻辑。
// distributed_hub.go (修改后的hub.go)
package main
import (
"context"
"log"
"github.com/redis/go-redis/v9"
)
// ... Client接口定义不变 ...
type DistributedHub struct {
clients map[string]Client
broadcast chan []byte
register chan Client
unregister chan Client
// 新增Redis相关字段
redisClient redis.UniversalClient
channel string
}
func NewDistributedHub(rdb redis.UniversalClient, channel string) *DistributedHub {
return &DistributedHub{
broadcast: make(chan []byte, 256), // 增加缓冲区
register: make(chan Client),
unregister: make(chan Client),
clients: make(map[string]Client),
redisClient: rdb,
channel: channel,
}
}
// subscribeToRedis 启动一个goroutine来监听Redis频道
func (h *DistributedHub) subscribeToRedis(ctx context.Context) {
pubsub := h.redisClient.Subscribe(ctx, h.channel)
// 确保在函数退出时关闭订阅,释放资源
defer pubsub.Close()
// 等待订阅确认
_, err := pubsub.Receive(ctx)
if err != nil {
log.Printf("Failed to subscribe to Redis channel %s: %v", h.channel, err)
return
}
ch := pubsub.Channel()
log.Printf("Successfully subscribed to Redis channel: %s", h.channel)
for {
select {
case <-ctx.Done():
log.Println("Context cancelled, stopping Redis subscription.")
return
case msg := <-ch:
// 从Redis收到消息,推送到内部广播channel
// 注意:这里我们不做任何过滤,所有实例都会收到,然后由本地广播逻辑处理
log.Printf("Received message from Redis: %s", msg.Payload)
h.broadcast <- []byte(msg.Payload)
}
}
}
func (h *DistributedHub) Run(ctx context.Context) {
// 启动Redis订阅goroutine
go h.subscribeToRedis(ctx)
for {
select {
case <-ctx.Done():
for _, client := range h.clients {
client.Close()
}
return
case client := <-h.register:
h.clients[client.ID()] = client
log.Printf("Client registered: %s. Total local clients: %d", client.ID(), len(h.clients))
case client := <-h.unregister:
if _, ok := h.clients[client.ID()]; ok {
delete(h.clients, client.ID())
client.Close()
log.Printf("Client unregistered: %s. Total local clients: %d", client.ID(), len(h.clients))
}
case message := <-h.broadcast:
// 这个逻辑保持不变,它只负责向本地连接的客户端广播
for id, client := range h.clients {
if err := client.Write(message); err != nil {
log.Printf("Error writing to client %s: %v", id, err)
}
}
}
}
}
// Register, Unregister方法保持不变
func (h *DistributedHub) Register(client Client) {
h.register <- client
}
func (h *DistributedHub) Unregister(client Client) {
h.unregister <- client
}
再次运行我们的集成测试,现在它应该能够通过了。这个 TDD 周期验证了我们架构的核心:消息可以通过 Redis 背板在不同的服务实例间正确传递。
部署到 Docker Swarm
我们的代码逻辑已经过验证,现在是时候将它部署到生产模拟环境 Docker Swarm 中了。我们需要一个 Dockerfile 来容器化应用,以及一个 docker-compose.yml 文件来定义服务栈。
Dockerfile:
# Dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# 使用-ldflags "-s -w"来减小最终二进制文件的大小
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /ws-gateway .
FROM alpine:latest
# 安全考量:以非root用户运行
RUN addgroup -S appgroup && adduser -S appuser -G appgroup
USER appuser
WORKDIR /home/appuser
COPY /ws-gateway .
# 暴露端口
EXPOSE 8080
# 启动命令
CMD ["./ws-gateway"]
docker-compose.yml:
# docker-compose.yml
version: "3.8"
services:
redis:
image: "redis:7-alpine"
networks:
- ws-net
deploy:
replicas: 1
placement:
constraints: [node.role == manager] # 将Redis固定在manager节点
ws-gateway:
image: your-repo/ws-gateway:latest # 替换为你的镜像仓库地址
networks:
- ws-net
ports:
- "8080:8080"
environment:
- REDIS_ADDR=redis:6379 # Swarm内部DNS解析
- REDIS_CHANNEL=global_broadcast
- LISTEN_ADDR=:8080
depends_on:
- redis
deploy:
mode: replicated
replicas: 3 # 部署3个副本以测试水平扩展
update_config:
parallelism: 1
delay: 10s
restart_policy:
condition: on-failure
networks:
ws-net:
driver: overlay
attachable: true
部署这个服务栈的命令非常简单:docker stack deploy -c docker-compose.yml websocket-stack。
Docker Swarm 会自动创建3个 ws-gateway 服务的副本和一个 Redis 服务。Swarm 内置的 ingress routing mesh 会将外部对 8080 端口的请求负载均衡到这3个副本上。当一个新的 WebSocket 连接建立时,它会被路由到其中一个健康的容器。由于我们使用了 Redis 背板,无论业务逻辑在哪一个容器中触发广播,所有连接到任何一个容器的客户端都能收到消息。
我们可以通过 docker service logs websocket-stack_ws-gateway -f 来观察日志,看到不同副本中的客户端注册和接收消息的记录。
graph LR
subgraph Client-Side
C1[Client 1]
C2[Client 2]
C3[Client 3]
end
subgraph Docker Swarm Cluster
LB(Ingress Load Balancer)
subgraph Node 1
GW1(Gateway Replica 1)
end
subgraph Node 2
GW2(Gateway Replica 2)
end
subgraph Node 3
GW3(Gateway Replica 3)
end
subgraph Shared Services
Redis(Redis Pub/Sub)
end
GW1 -- subscribes/publishes --> Redis
GW2 -- subscribes/publishes --> Redis
GW3 -- subscribes/publishes --> Redis
end
subgraph Business Logic
API[API Service]
end
C1 -- ws conn --> LB --> GW1
C2 -- ws conn --> LB --> GW2
C3 -- ws conn --> LB --> GW1
API -- HTTP POST /broadcast --> LB --> GW3
GW3 -- 1. Publish to Redis --> Redis
Redis -- 2. Fan-out --> GW1 & GW2 & GW3
GW1 -- 3. Push to local clients --> C1
GW1 -- 3. Push to local clients --> C3
GW2 -- 3. Push to local clients --> C2
这个架构图清晰地展示了消息的流动路径。一个来自外部的广播请求可能被路由到 GW3,但通过 Redis 背板,连接在 GW1 和 GW2 上的客户端也能最终收到消息。
方案局限性与未来迭代方向
我们通过 TDD 构建了一个可水平扩展、容错的 WebSocket 广播层,并成功部署在 Docker Swarm 上。但这套方案并非银弹,它存在一些固有的局限性,在选择使用时必须清楚地认识到。
首先,Redis Pub/Sub 是一种“即发即弃”的消息模型。如果某个 ws-gateway 实例在消息发布时恰好处于离线状态(例如,正在重启或网络分区),它将永久性地错过这条消息。对于需要强一致性保证的场景,例如交易或重要指令通知,这个方案是不适用的。此时,应考虑使用 Redis Streams 或专门的消息队列(如 RabbitMQ, NATS Streaming)作为背板,它们提供了消息持久化和至少一次的投递保证。
其次,当前的实现是一个纯粹的广播系统。所有消息都被发送给所有实例,再由实例分发给所有本地连接的客户端。如果需要实现点对点消息或按频道/房间订阅,架构需要变得更复杂。一种可能的演进方向是为每个用户或每个房间创建独立的 Redis channel,但这会极大地增加 Redis 的订阅负载。一个更优的方案可能是在 Redis 中维护一个映射表(例如 Hash),记录{userID -> nodeID},广播时先查询此表,然后只向目标用户所在的节点发送消息,但这又引入了状态维护的复杂性。
最后,虽然 Docker Swarm 简化了部署,但它在自动伸缩、精细化资源控制和生态系统丰富度方面不如 Kubernetes。当业务规模增长到一定程度,需要更复杂的调度策略、服务网格或可观测性工具时,迁移到 Kubernetes 可能是不可避免的下一步。我们当前的设计是云原生的,更换底层的编排平台并不会对核心代码逻辑产生颠覆性影响,这本身也是一个好的架构特性。