基于 Raft 协议构建兼容 Memcached 协议的分布式 K-V 存储


在真实项目中,Memcached 依然是许多系统简单、高速缓存层的首选。但它的致命弱点从未改变:纯粹的单点架构。任何一个 Memcached 实例宕机,其上的所有缓存数据都会丢失,并可能引发缓存雪崩,直接冲击后端数据库。虽然客户端可以实现复杂的分片和故障转移逻辑,但这将复杂性转嫁给了业务应用,且无法保证数据在节点间的一致性。

我们面临的挑战是:能否构建一个具备高可用性和数据一致性的缓存集群,同时对现有应用保持 Memcached 协议的兼容性,从而实现无缝替换?这意味着我们需要一个底层的复制状态机,而 Raft 协议正是为此设计的。

初步构想是,用 Raft 协议来保证一系列 K-V 操作指令(如 SET, DELETE)在集群所有节点上以相同的顺序执行。每个节点都维护一个内存中的 K-V 存储。客户端无论连接到哪个节点,其写操作都会被转发到 Raft 集群的 Leader 节点,Leader 成功提交日志后,状态机应用该指令,集群数据达成一致。

技术选型与架构核心

  1. 一致性核心:Apache Ratis
    我们将不重复造轮子,直接采用 Apache Ratis 这一成熟的 Raft 协议 Java 实现。它提供了清晰的 StateMachine 接口,让我们能专注于业务逻辑,即 K-V 操作的实现。

  2. K-V 存储引擎:内存 HashMap
    为了模拟 Memcached 的行为,我们将使用一个简单的 java.util.concurrent.ConcurrentHashMap 作为底层的 K-V 存储。在 Ratis 的 StateMachine 中对它进行操作。

  3. 网络层:Netty
    我们需要一个高性能的网络服务器来解析 Memcached 的文本协议。Netty 是不二之选,它提供了强大的异步事件驱动网络框架,能让我们精确控制协议的编解码过程。

  4. 集群接入层:Caddy
    应用端不应该感知 Raft 集群的 Leader 是谁。我们需要一个智能的代理。Caddy 不仅能代理 HTTP,其 layer4 app 同样可以处理 TCP 流量。我们将利用 Caddy 的健康检查能力,动态地将流量仅转发给 Raft 集群中的 Leader 节点。

  5. 构建与部署:Jib
    最终的服务节点将以容器化形式部署。Jib 可以直接从 Maven 或 Gradle 构建出高度优化的容器镜像,无需 Dockerfile,也无需本地 Docker daemon,非常适合集成到 CI/CD 流程中。

架构流程设计

客户端请求的完整生命周期如下:

sequenceDiagram
    participant Client as 客户端
    participant Caddy as Caddy (TCP 代理)
    participant Follower as Raft Follower 节点
    participant Leader as Raft Leader 节点

    Client->>Caddy: 发送 Memcached 命令 (e.g., SET a 0 0 5\r\nvalue)
    Note over Caddy: 健康检查发现 Leader
    Caddy->>Leader: 转发 TCP 流量
    
    Leader->>Leader: 解析 Memcached 命令
    Leader->>Leader: 封装为 Raft Log Entry
    Leader->>Follower: Replicate Log
    Follower-->>Leader: Acknowledge
    Leader->>Leader: 达到多数派, Commit Log
    Leader->>Leader: 应用到 StateMachine (更新 HashMap)
    Leader-->>Client: 响应 (e.g., STORED\r\n)

    %% 写操作到 Follower 的情况
    Client->>Caddy: (Caddy 配置错误或 Leader 切换瞬间)
    Caddy->>Follower: 转发 TCP 流量
    Follower->>Follower: 解析 Memcached 命令
    Note over Follower: 识别出自身非 Leader
    Follower-->>Client: 返回错误或无响应 (取决于实现)
    Note over Caddy: 健康检查失败,将 Follower 踢出上游

核心实现:Raft 状态机

KeyValueStateMachine.java 是整个系统的核心。它实现了 Ratis 的 StateMachine 接口,所有 Raft 日志的最终应用都在这里发生。

import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class KeyValueStateMachine extends BaseStateMachine {

    // 实际的 K-V 存储
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    /**
     * 应用已提交的 Raft 日志。这是唯一可以修改状态机状态的地方。
     * 必须是确定性的,即给定相同的输入,总能产生相同的输出。
     * @param trx Transaction context
     * @return 包含操作结果的 Message
     */
    @Override
    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
        RaftProtos.LogEntryProto entry = trx.getLogEntry();
        String command = entry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8);

        // 解析指令,格式为 "COMMAND:KEY:VALUE" 或 "COMMAND:KEY"
        String[] parts = command.split(":", 3);
        String cmd = parts[0];
        String key = parts.length > 1 ? parts[1] : null;

        String response = "ERROR\r\n"; // 默认错误响应

        try {
            switch (cmd) {
                case "SET":
                    if (parts.length == 3) {
                        String value = parts[2];
                        store.put(key, value);
                        response = "STORED\r\n";
                    } else {
                        response = "CLIENT_ERROR bad command line format for set\r\n";
                    }
                    break;
                case "DELETE":
                    if (key != null && store.remove(key) != null) {
                        response = "DELETED\r\n";
                    } else {
                        response = "NOT_FOUND\r\n";
                    }
                    break;
                default:
                    response = "CLIENT_ERROR unknown command\r\n";
                    break;
            }
        } catch (Exception e) {
            // 在生产环境中,这里需要详尽的日志记录
            response = "SERVER_ERROR " + e.getMessage() + "\r\n";
        }

        return CompletableFuture.completedFuture(Message.valueOf(response));
    }

    /**
     * 处理只读查询请求。这不会通过 Raft 日志,可以直接访问状态机。
     * 为了强一致性,可以转发给 Leader;为了高性能,可以接受轻微的延迟,直接在 Follower 上读。
     * 我们这里实现一个简单的本地读取。
     * @param request The read request
     * @return 包含查询结果的 CompletableFuture
     */
    @Override
    public CompletableFuture<Message> query(Message request) {
        String command = request.getContent().toString(StandardCharsets.UTF_8);
        String[] parts = command.split(":", 2);
        String cmd = parts[0];
        String key = parts.length > 1 ? parts[1] : null;

        if ("GET".equals(cmd) && key != null) {
            String value = store.get(key);
            if (value != null) {
                // 格式: VALUE <key> <flags> <bytes>\r\n<data>\r\nEND\r\n
                String header = String.format("VALUE %s 0 %d\r\n", key, value.getBytes(StandardCharsets.UTF_8).length);
                return CompletableFuture.completedFuture(Message.valueOf(header + value + "\r\nEND\r\n"));
            } else {
                return CompletableFuture.completedFuture(Message.valueOf("END\r\n"));
            }
        }
        return CompletableFuture.completedFuture(Message.valueOf("CLIENT_ERROR bad command line format for get\r\n"));
    }
}

这里的关键在于 applyTransaction 方法。所有写操作(SET, DELETE)都必须封装成 Raft 日志,通过该方法原子地应用到 ConcurrentHashMap 中。而 query 方法用于处理读请求(GET),它可以绕过 Raft 日志,直接读取本地状态,但这可能导致读取到旧数据。在真实项目中,需要根据业务对一致性的要求来决定如何处理读请求,比如提供不同的一致性级别选项(线性读、follower 读等)。

网络层:Memcached 协议解析

使用 Netty 实现协议服务器,核心是 MemcachedProtocolHandler。它负责将 TCP 字节流解码为命令,然后将写命令提交给 Ratis client,将读命令直接查询状态机。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;

import java.nio.charset.StandardCharsets;

public class MemcachedProtocolHandler extends SimpleChannelInboundHandler<String> {

    private final RaftClient raftClient;
    private final KeyValueStateMachine stateMachine; // 用于本地查询

    // 状态机,用于处理多行命令,如 "set"
    private enum State {
        COMMAND,
        DATA
    }

    private State currentState = State.COMMAND;
    private String pendingCommand;
    private int dataLength;

    public MemcachedProtocolHandler(RaftClient raftClient, KeyValueStateMachine stateMachine) {
        this.raftClient = raftClient;
        this.stateMachine = stateMachine;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if (currentState == State.DATA) {
            handleData(ctx, msg);
            return;
        }

        String[] parts = msg.trim().split("\\s+");
        String command = parts[0].toUpperCase();

        switch (command) {
            case "SET":
                handleSetCommand(ctx, parts);
                break;
            case "GET":
                handleGetCommand(ctx, parts);
                break;
            case "DELETE":
                handleDeleteCommand(ctx, parts);
                break;
            case "QUIT":
                ctx.close();
                break;
            default:
                ctx.writeAndFlush("CLIENT_ERROR unsupported command\r\n");
        }
    }

    private void handleSetCommand(ChannelHandlerContext ctx, String[] parts) {
        if (parts.length != 5) {
            ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'set'\r\n");
            return;
        }
        try {
            this.dataLength = Integer.parseInt(parts[4]);
            this.pendingCommand = String.format("SET:%s", parts[1]); // Key
            this.currentState = State.DATA;
        } catch (NumberFormatException e) {
            ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'set'\r\n");
        }
    }
    
    private void handleData(ChannelHandlerContext ctx, String data) {
        // 简单实现,未严格检查字节长度,生产代码需要更健壮
        if (pendingCommand != null) {
            String raftMessage = pendingCommand + ":" + data.trim();
            try {
                // 写操作必须通过 Raft Client 提交
                RaftClientReply reply = raftClient.io().send(Message.valueOf(raftMessage));
                if (reply.isSuccess()) {
                    ctx.writeAndFlush(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
                } else {
                    // 这里需要处理 Raft 提交失败的情况,例如 NOT_LEADER 异常
                    ctx.writeAndFlush("SERVER_ERROR " + reply.getException().getMessage() + "\r\n");
                }
            } catch (Exception e) {
                 ctx.writeAndFlush("SERVER_ERROR raft submission failed\r\n");
            }
        }
        resetState();
    }

    private void handleGetCommand(ChannelHandlerContext ctx, String[] parts) {
        // GET <key>
        if (parts.length != 2) {
            ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'get'\r\n");
            return;
        }
        String key = parts[1];
        String raftQuery = "GET:" + key;
        
        // 读操作直接查询本地状态机,以获得更高性能
        stateMachine.query(Message.valueOf(raftQuery))
                .thenAccept(reply -> ctx.writeAndFlush(reply.getContent().toString(StandardCharsets.UTF_8)));
    }

    private void handleDeleteCommand(ChannelHandlerContext ctx, String[] parts) {
        // DELETE <key>
        if (parts.length != 2) {
             ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'delete'\r\n");
            return;
        }
        String key = parts[1];
        String raftMessage = "DELETE:" + key;
        try {
            RaftClientReply reply = raftClient.io().send(Message.valueOf(raftMessage));
            ctx.writeAndFlush(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
        } catch (Exception e) {
            ctx.writeAndFlush("SERVER_ERROR raft submission failed\r\n");
        }
    }

    private void resetState() {
        this.currentState = State.COMMAND;
        this.pendingCommand = null;
        this.dataLength = 0;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 生产级代码应记录异常
        cause.printStackTrace();
        ctx.close();
    }
}

这个处理器是一个简化的实现,它展示了如何区分读写命令,并将它们分别路由到 Raft 提交路径和本地查询路径。一个常见的坑在于对多行命令(如set)的处理,需要一个简单的状态机来管理。

集群接入:Caddy 作为 Leader 感知代理

手动将客户端指向 Leader 节点是不可行的。我们需要一个代理,它能自动发现 Leader 并将流量只导向它。Caddy 的 layer4 app 和 http 健康检查可以巧妙地实现这一点。虽然 Memcached 是 TCP 协议,但我们可以让 Ratis 节点额外暴露一个 HTTP 端点,仅用于健康检查。当节点是 Leader 时,该端点返回 200;否则返回 503。

Caddyfile 配置如下:

# Caddyfile

{
    # 启用 layer4 app,用于处理原始 TCP/UDP 流量
    "apps": {
        "layer4": {
            "servers": {
                "memcached_cluster": {
                    "listen": [":11211"], # 对外暴露的 Memcached 端口
                    "routes": [
                        {
                            "match": [{}],
                            "handle": [
                                {
                                    "handler": "proxy",
                                    # 负载均衡策略:只选择第一个健康的上游
                                    "load_balancing": {
                                        "policy": "first"
                                    },
                                    "upstreams": [
                                        # 定义 Raft 节点池
                                        {"dial": "raft-node-1:11211"},
                                        {"dial": "raft-node-2:11211"},
                                        {"dial": "raft-node-3:11211"}
                                    ],
                                    # 核心:主动健康检查
                                    "health_checks": {
                                        "active": {
                                            "path": "/health", # Ratis 节点暴露的 HTTP 健康检查端点
                                            "port": "8080",    # HTTP 端口
                                            "interval": "5s",
                                            "timeout": "2s"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        }
    }
}

这个配置的精妙之处在于:

  1. layer4 处理 TCP 流量,保持了 Memcached 协议的原始性。
  2. health_checks.active 会定期请求每个上游节点的 /health 接口。
  3. 我们编码 Ratis 节点,只有 Leader 节点的 /health 才返回 HTTP 200
  4. "policy": "first" 确保了 Caddy 总是将流量发送到上游列表中第一个健康(即 Leader)的节点。当 Leader 宕机或降级为 Follower,健康检查会失败,Caddy 会自动切换到下一个变为健康的节点(新选举出的 Leader)。

容器化构建:使用 Jib

最后一步是将我们的 Java 应用打包成容器镜像。相比传统的 Dockerfile,Jib 对 Java 应用的优化更胜一筹。它能自动将应用分为多个层(依赖、资源、类文件),当代码变更时,只需重新构建最上层的类文件层,极大地提升了构建速度。

pom.xml 中配置 Jib 插件:

<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>3.3.2</version>
    <configuration>
        <from>
            <!-- 使用一个精简的基础镜像 -->
            <image>eclipse-temurin:17-jre-focal</image>
        </from>
        <to>
            <!-- 目标镜像名称 -->
            <image>my-registry/raft-memcached-node:1.0.0</image>
        </to>
        <container>
            <!-- 暴露 Memcached 协议端口和健康检查端口 -->
            <ports>
                <port>11211</port>
                <port>8080</port>
            </ports>
            <mainClass>com.mycompany.raft.Main</mainClass>
        </container>
    </configuration>
</plugin>

只需运行 mvn compile jib:build,Jib 就会在无需 Docker daemon 的情况下完成镜像的构建和推送。这对于在没有 Docker-in-Docker 权限的 CI 环境中尤其有用。

方案的局限性与未来路径

这个实现验证了核心架构的可行性,但在生产环境中还存在诸多不足。

首先,GET 请求直接读取本地状态机,这是一种性能与一致性的权衡。它可能读取到尚未应用最新日志的旧数据。要实现线性一致性读,GET 请求也必须通过 Raft 协议,或者使用 Leader 租约(Lease Read)等优化机制,但这会显著增加读请求的延迟。

其次,当前的实现没有处理 Raft 集群的成员变更。在生产系统中,节点的增删是常态,需要一套安全的流程来动态调整 Raft Group 配置,Ratis 本身支持此功能,但需要在应用层进行集成。

再者,状态机快照(Snapshot)机制缺失。随着运行时间增长,Raft 日志会无限膨胀,导致新节点加入或节点重启后的恢复时间过长。定期对内存中的 HashMap 进行快照并截断旧日志是必须实现的功能。

最后,Memcached 协议支持 casincr/decr 等原子操作,这些操作需要更复杂的 applyTransaction 逻辑来保证原子性,简单的 putremove 无法满足。这指明了未来功能迭代的方向,即在 Raft 的一致性保证下,实现更丰富的原子命令。


  目录