在真实项目中,Memcached 依然是许多系统简单、高速缓存层的首选。但它的致命弱点从未改变:纯粹的单点架构。任何一个 Memcached 实例宕机,其上的所有缓存数据都会丢失,并可能引发缓存雪崩,直接冲击后端数据库。虽然客户端可以实现复杂的分片和故障转移逻辑,但这将复杂性转嫁给了业务应用,且无法保证数据在节点间的一致性。
我们面临的挑战是:能否构建一个具备高可用性和数据一致性的缓存集群,同时对现有应用保持 Memcached 协议的兼容性,从而实现无缝替换?这意味着我们需要一个底层的复制状态机,而 Raft 协议正是为此设计的。
初步构想是,用 Raft 协议来保证一系列 K-V 操作指令(如 SET, DELETE)在集群所有节点上以相同的顺序执行。每个节点都维护一个内存中的 K-V 存储。客户端无论连接到哪个节点,其写操作都会被转发到 Raft 集群的 Leader 节点,Leader 成功提交日志后,状态机应用该指令,集群数据达成一致。
技术选型与架构核心
一致性核心:Apache Ratis
我们将不重复造轮子,直接采用 Apache Ratis 这一成熟的 Raft 协议 Java 实现。它提供了清晰的StateMachine接口,让我们能专注于业务逻辑,即 K-V 操作的实现。K-V 存储引擎:内存 HashMap
为了模拟 Memcached 的行为,我们将使用一个简单的java.util.concurrent.ConcurrentHashMap作为底层的 K-V 存储。在 Ratis 的StateMachine中对它进行操作。网络层:Netty
我们需要一个高性能的网络服务器来解析 Memcached 的文本协议。Netty 是不二之选,它提供了强大的异步事件驱动网络框架,能让我们精确控制协议的编解码过程。集群接入层:Caddy
应用端不应该感知 Raft 集群的 Leader 是谁。我们需要一个智能的代理。Caddy 不仅能代理 HTTP,其layer4app 同样可以处理 TCP 流量。我们将利用 Caddy 的健康检查能力,动态地将流量仅转发给 Raft 集群中的 Leader 节点。构建与部署:Jib
最终的服务节点将以容器化形式部署。Jib 可以直接从 Maven 或 Gradle 构建出高度优化的容器镜像,无需Dockerfile,也无需本地 Docker daemon,非常适合集成到 CI/CD 流程中。
架构流程设计
客户端请求的完整生命周期如下:
sequenceDiagram
participant Client as 客户端
participant Caddy as Caddy (TCP 代理)
participant Follower as Raft Follower 节点
participant Leader as Raft Leader 节点
Client->>Caddy: 发送 Memcached 命令 (e.g., SET a 0 0 5\r\nvalue)
Note over Caddy: 健康检查发现 Leader
Caddy->>Leader: 转发 TCP 流量
Leader->>Leader: 解析 Memcached 命令
Leader->>Leader: 封装为 Raft Log Entry
Leader->>Follower: Replicate Log
Follower-->>Leader: Acknowledge
Leader->>Leader: 达到多数派, Commit Log
Leader->>Leader: 应用到 StateMachine (更新 HashMap)
Leader-->>Client: 响应 (e.g., STORED\r\n)
%% 写操作到 Follower 的情况
Client->>Caddy: (Caddy 配置错误或 Leader 切换瞬间)
Caddy->>Follower: 转发 TCP 流量
Follower->>Follower: 解析 Memcached 命令
Note over Follower: 识别出自身非 Leader
Follower-->>Client: 返回错误或无响应 (取决于实现)
Note over Caddy: 健康检查失败,将 Follower 踢出上游
核心实现:Raft 状态机
KeyValueStateMachine.java 是整个系统的核心。它实现了 Ratis 的 StateMachine 接口,所有 Raft 日志的最终应用都在这里发生。
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
public class KeyValueStateMachine extends BaseStateMachine {
// 实际的 K-V 存储
private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
/**
* 应用已提交的 Raft 日志。这是唯一可以修改状态机状态的地方。
* 必须是确定性的,即给定相同的输入,总能产生相同的输出。
* @param trx Transaction context
* @return 包含操作结果的 Message
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
RaftProtos.LogEntryProto entry = trx.getLogEntry();
String command = entry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8);
// 解析指令,格式为 "COMMAND:KEY:VALUE" 或 "COMMAND:KEY"
String[] parts = command.split(":", 3);
String cmd = parts[0];
String key = parts.length > 1 ? parts[1] : null;
String response = "ERROR\r\n"; // 默认错误响应
try {
switch (cmd) {
case "SET":
if (parts.length == 3) {
String value = parts[2];
store.put(key, value);
response = "STORED\r\n";
} else {
response = "CLIENT_ERROR bad command line format for set\r\n";
}
break;
case "DELETE":
if (key != null && store.remove(key) != null) {
response = "DELETED\r\n";
} else {
response = "NOT_FOUND\r\n";
}
break;
default:
response = "CLIENT_ERROR unknown command\r\n";
break;
}
} catch (Exception e) {
// 在生产环境中,这里需要详尽的日志记录
response = "SERVER_ERROR " + e.getMessage() + "\r\n";
}
return CompletableFuture.completedFuture(Message.valueOf(response));
}
/**
* 处理只读查询请求。这不会通过 Raft 日志,可以直接访问状态机。
* 为了强一致性,可以转发给 Leader;为了高性能,可以接受轻微的延迟,直接在 Follower 上读。
* 我们这里实现一个简单的本地读取。
* @param request The read request
* @return 包含查询结果的 CompletableFuture
*/
@Override
public CompletableFuture<Message> query(Message request) {
String command = request.getContent().toString(StandardCharsets.UTF_8);
String[] parts = command.split(":", 2);
String cmd = parts[0];
String key = parts.length > 1 ? parts[1] : null;
if ("GET".equals(cmd) && key != null) {
String value = store.get(key);
if (value != null) {
// 格式: VALUE <key> <flags> <bytes>\r\n<data>\r\nEND\r\n
String header = String.format("VALUE %s 0 %d\r\n", key, value.getBytes(StandardCharsets.UTF_8).length);
return CompletableFuture.completedFuture(Message.valueOf(header + value + "\r\nEND\r\n"));
} else {
return CompletableFuture.completedFuture(Message.valueOf("END\r\n"));
}
}
return CompletableFuture.completedFuture(Message.valueOf("CLIENT_ERROR bad command line format for get\r\n"));
}
}
这里的关键在于 applyTransaction 方法。所有写操作(SET, DELETE)都必须封装成 Raft 日志,通过该方法原子地应用到 ConcurrentHashMap 中。而 query 方法用于处理读请求(GET),它可以绕过 Raft 日志,直接读取本地状态,但这可能导致读取到旧数据。在真实项目中,需要根据业务对一致性的要求来决定如何处理读请求,比如提供不同的一致性级别选项(线性读、follower 读等)。
网络层:Memcached 协议解析
使用 Netty 实现协议服务器,核心是 MemcachedProtocolHandler。它负责将 TCP 字节流解码为命令,然后将写命令提交给 Ratis client,将读命令直接查询状态机。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import java.nio.charset.StandardCharsets;
public class MemcachedProtocolHandler extends SimpleChannelInboundHandler<String> {
private final RaftClient raftClient;
private final KeyValueStateMachine stateMachine; // 用于本地查询
// 状态机,用于处理多行命令,如 "set"
private enum State {
COMMAND,
DATA
}
private State currentState = State.COMMAND;
private String pendingCommand;
private int dataLength;
public MemcachedProtocolHandler(RaftClient raftClient, KeyValueStateMachine stateMachine) {
this.raftClient = raftClient;
this.stateMachine = stateMachine;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if (currentState == State.DATA) {
handleData(ctx, msg);
return;
}
String[] parts = msg.trim().split("\\s+");
String command = parts[0].toUpperCase();
switch (command) {
case "SET":
handleSetCommand(ctx, parts);
break;
case "GET":
handleGetCommand(ctx, parts);
break;
case "DELETE":
handleDeleteCommand(ctx, parts);
break;
case "QUIT":
ctx.close();
break;
default:
ctx.writeAndFlush("CLIENT_ERROR unsupported command\r\n");
}
}
private void handleSetCommand(ChannelHandlerContext ctx, String[] parts) {
if (parts.length != 5) {
ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'set'\r\n");
return;
}
try {
this.dataLength = Integer.parseInt(parts[4]);
this.pendingCommand = String.format("SET:%s", parts[1]); // Key
this.currentState = State.DATA;
} catch (NumberFormatException e) {
ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'set'\r\n");
}
}
private void handleData(ChannelHandlerContext ctx, String data) {
// 简单实现,未严格检查字节长度,生产代码需要更健壮
if (pendingCommand != null) {
String raftMessage = pendingCommand + ":" + data.trim();
try {
// 写操作必须通过 Raft Client 提交
RaftClientReply reply = raftClient.io().send(Message.valueOf(raftMessage));
if (reply.isSuccess()) {
ctx.writeAndFlush(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
} else {
// 这里需要处理 Raft 提交失败的情况,例如 NOT_LEADER 异常
ctx.writeAndFlush("SERVER_ERROR " + reply.getException().getMessage() + "\r\n");
}
} catch (Exception e) {
ctx.writeAndFlush("SERVER_ERROR raft submission failed\r\n");
}
}
resetState();
}
private void handleGetCommand(ChannelHandlerContext ctx, String[] parts) {
// GET <key>
if (parts.length != 2) {
ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'get'\r\n");
return;
}
String key = parts[1];
String raftQuery = "GET:" + key;
// 读操作直接查询本地状态机,以获得更高性能
stateMachine.query(Message.valueOf(raftQuery))
.thenAccept(reply -> ctx.writeAndFlush(reply.getContent().toString(StandardCharsets.UTF_8)));
}
private void handleDeleteCommand(ChannelHandlerContext ctx, String[] parts) {
// DELETE <key>
if (parts.length != 2) {
ctx.writeAndFlush("CLIENT_ERROR bad command line format for 'delete'\r\n");
return;
}
String key = parts[1];
String raftMessage = "DELETE:" + key;
try {
RaftClientReply reply = raftClient.io().send(Message.valueOf(raftMessage));
ctx.writeAndFlush(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
} catch (Exception e) {
ctx.writeAndFlush("SERVER_ERROR raft submission failed\r\n");
}
}
private void resetState() {
this.currentState = State.COMMAND;
this.pendingCommand = null;
this.dataLength = 0;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 生产级代码应记录异常
cause.printStackTrace();
ctx.close();
}
}
这个处理器是一个简化的实现,它展示了如何区分读写命令,并将它们分别路由到 Raft 提交路径和本地查询路径。一个常见的坑在于对多行命令(如set)的处理,需要一个简单的状态机来管理。
集群接入:Caddy 作为 Leader 感知代理
手动将客户端指向 Leader 节点是不可行的。我们需要一个代理,它能自动发现 Leader 并将流量只导向它。Caddy 的 layer4 app 和 http 健康检查可以巧妙地实现这一点。虽然 Memcached 是 TCP 协议,但我们可以让 Ratis 节点额外暴露一个 HTTP 端点,仅用于健康检查。当节点是 Leader 时,该端点返回 200;否则返回 503。
Caddyfile 配置如下:
# Caddyfile
{
# 启用 layer4 app,用于处理原始 TCP/UDP 流量
"apps": {
"layer4": {
"servers": {
"memcached_cluster": {
"listen": [":11211"], # 对外暴露的 Memcached 端口
"routes": [
{
"match": [{}],
"handle": [
{
"handler": "proxy",
# 负载均衡策略:只选择第一个健康的上游
"load_balancing": {
"policy": "first"
},
"upstreams": [
# 定义 Raft 节点池
{"dial": "raft-node-1:11211"},
{"dial": "raft-node-2:11211"},
{"dial": "raft-node-3:11211"}
],
# 核心:主动健康检查
"health_checks": {
"active": {
"path": "/health", # Ratis 节点暴露的 HTTP 健康检查端点
"port": "8080", # HTTP 端口
"interval": "5s",
"timeout": "2s"
}
}
}
]
}
]
}
}
}
}
}
这个配置的精妙之处在于:
-
layer4处理 TCP 流量,保持了 Memcached 协议的原始性。 -
health_checks.active会定期请求每个上游节点的/health接口。 - 我们编码 Ratis 节点,只有 Leader 节点的
/health才返回HTTP 200。 -
"policy": "first"确保了 Caddy 总是将流量发送到上游列表中第一个健康(即 Leader)的节点。当 Leader 宕机或降级为 Follower,健康检查会失败,Caddy 会自动切换到下一个变为健康的节点(新选举出的 Leader)。
容器化构建:使用 Jib
最后一步是将我们的 Java 应用打包成容器镜像。相比传统的 Dockerfile,Jib 对 Java 应用的优化更胜一筹。它能自动将应用分为多个层(依赖、资源、类文件),当代码变更时,只需重新构建最上层的类文件层,极大地提升了构建速度。
在 pom.xml 中配置 Jib 插件:
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>3.3.2</version>
<configuration>
<from>
<!-- 使用一个精简的基础镜像 -->
<image>eclipse-temurin:17-jre-focal</image>
</from>
<to>
<!-- 目标镜像名称 -->
<image>my-registry/raft-memcached-node:1.0.0</image>
</to>
<container>
<!-- 暴露 Memcached 协议端口和健康检查端口 -->
<ports>
<port>11211</port>
<port>8080</port>
</ports>
<mainClass>com.mycompany.raft.Main</mainClass>
</container>
</configuration>
</plugin>
只需运行 mvn compile jib:build,Jib 就会在无需 Docker daemon 的情况下完成镜像的构建和推送。这对于在没有 Docker-in-Docker 权限的 CI 环境中尤其有用。
方案的局限性与未来路径
这个实现验证了核心架构的可行性,但在生产环境中还存在诸多不足。
首先,GET 请求直接读取本地状态机,这是一种性能与一致性的权衡。它可能读取到尚未应用最新日志的旧数据。要实现线性一致性读,GET 请求也必须通过 Raft 协议,或者使用 Leader 租约(Lease Read)等优化机制,但这会显著增加读请求的延迟。
其次,当前的实现没有处理 Raft 集群的成员变更。在生产系统中,节点的增删是常态,需要一套安全的流程来动态调整 Raft Group 配置,Ratis 本身支持此功能,但需要在应用层进行集成。
再者,状态机快照(Snapshot)机制缺失。随着运行时间增长,Raft 日志会无限膨胀,导致新节点加入或节点重启后的恢复时间过长。定期对内存中的 HashMap 进行快照并截断旧日志是必须实现的功能。
最后,Memcached 协议支持 cas、incr/decr 等原子操作,这些操作需要更复杂的 applyTransaction 逻辑来保证原子性,简单的 put 和 remove 无法满足。这指明了未来功能迭代的方向,即在 Raft 的一致性保证下,实现更丰富的原子命令。