Building a Dockerized, Raft-Based Distributed Secret Store Node from Scratch, with Encrypted Logs


Early in the project we needed a lightweight, reliable way to synchronize critical configuration across a few core services, mainly dynamic feature flags and some sensitive credentials. We evaluated etcd, Consul, and even ZooKeeper, but their overall architecture was far heavier than this requirement called for: adopting any of them meant taking on an entire operational stack. The team's pain point was clear: we needed a screwdriver, not a Swiss-army machine shop. So an idea surfaced: could we build our own minimal, Raft-based distributed KV store in Go, purpose-built for this one scenario? And since it would hold sensitive data, the Raft log itself had to be encrypted.

The core challenges of this idea were:

  1. Implementing Raft correctly: Raft sounds simple, but the engineering realities of concurrency, timers, and network partitions are full of traps.
  2. Secure key management: how do we encrypt the Raft log, and how do we distribute and manage the encryption key itself without turning it into a new single point of failure?
  3. Deployability and testability: how do we easily spin up a multi-node cluster locally for functional and fault-tolerance testing? Docker looked like the natural answer.

The end goal was a single Go binary that can start as any node of the cluster via environment variables or config files, with Docker Compose bringing up a fault-tolerant three-node cluster in one command.

Initial Design: The State Machine and Core Data Structures

At its heart, Raft is a consensus algorithm for managing a replicated log. Every node is a state machine that, at any moment, is in one of three states: Follower, Candidate, or Leader.

stateDiagram-v2
    [*] --> Follower
    Follower --> Candidate: election timeout
    Candidate --> Candidate: election timeout
    Candidate --> Follower: discovers leader or new term
    Candidate --> Leader: receives votes from majority
    Leader --> Follower: discovers server with higher term

First, we need to define the node's core data structures. A common mistake in real projects is to cram all state into one giant struct. It is better to clearly separate volatile state, persistent data, and inter-node communication.

// node.go

package main

import (
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"
)

// NodeState defines the role of a node in the Raft cluster.
type NodeState int

const (
	Follower NodeState = iota
	Candidate
	Leader
)

// LogEntry represents a command to be applied to the state machine.
type LogEntry struct {
	Term    int
	Command interface{} // Plaintext command (e.g. a SetCommand); kept for illustration only.
	// In our secure store, only the encrypted form below is actually replicated and persisted.
	EncryptedCommand []byte
}

// RaftNode is the core structure for a single node in the Raft cluster.
type RaftNode struct {
	mu sync.Mutex

	// Node's identity and cluster configuration
	id      string   // Unique ID for this node
	peers   []string // Addresses of other nodes in the cluster, including self
	server  *http.Server // HTTP server for RPCs

	// Persistent state on all servers (Updated on stable storage before responding to RPCs)
	currentTerm int
	votedFor    string
	log         []LogEntry

	// Volatile state on all servers
	commitIndex int
	lastApplied int

	// Volatile state on leaders (Reinitialized after election)
	nextIndex  map[string]int
	matchIndex map[string]int

	// Node's current state
	state NodeState

	// Timers
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer

	// Channels for communication and control
	shutdownCh chan struct{}
	applyCh    chan LogEntry // Channel to send committed entries to the state machine

	// The application's state machine (a simple K/V store)
	kvStore map[string]string

	// Cryptography components
	encryptionKey []byte // 32 bytes for AES-256
}

func NewRaftNode(id string, peers []string, key []byte) *RaftNode {
	node := &RaftNode{
		id:            id,
		peers:         peers,
		currentTerm:   0,
		votedFor:      "",
		log:           make([]LogEntry, 1), // Dummy entry at index 0
		commitIndex:   0,
		lastApplied:   0,
		state:         Follower,
		shutdownCh:    make(chan struct{}),
		applyCh:       make(chan LogEntry, 128),
		kvStore:       make(map[string]string),
		encryptionKey: key,
	}

	// Create both timers up front, in the stopped state, so the Run loop can
	// always select on their channels and other goroutines can safely Reset them.
	node.electionTimer = time.NewTimer(time.Hour)
	node.electionTimer.Stop()
	node.heartbeatTimer = time.NewTimer(time.Hour)
	node.heartbeatTimer.Stop()

	// nextIndex and matchIndex are initialized when this node becomes a leader.

	log.Printf("[Node %s] Initialized in Follower state.", node.id)
	return node
}

// electionTimeout generates a random duration for the election timer.
// This is critical to prevent split votes where multiple candidates start elections simultaneously.
func (n *RaftNode) electionTimeout() time.Duration {
	// A common practice is to use a base timeout + a random interval.
	// e.g., 150-300ms range.
	return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

func (n *RaftNode) resetElectionTimer() {
	// Reuse the timer created in NewRaftNode so the Run loop keeps selecting on
	// the same channel. If the timer already fired but the tick was not consumed,
	// drain it before Reset, as the time.Timer documentation recommends.
	if !n.electionTimer.Stop() {
		select {
		case <-n.electionTimer.C:
		default:
		}
	}
	n.electionTimer.Reset(n.electionTimeout())
	log.Printf("[Node %s] Election timer reset.", n.id)
}

The randomization in electionTimeout is critical. In production, if every node used the same fixed timeout, they would very likely all time out after a network hiccup at the same moment, all become Candidates, split the vote, time out together again, and livelock indefinitely.

Core Logic: Elections and Heartbeats

Raft's lifecycle is driven by elections and log replication. We start with the node's Run loop, the heart of the whole state machine.

// node_run.go

func (n *RaftNode) Run() {
	n.resetElectionTimer()

	for {
		select {
		case <-n.shutdownCh:
			log.Printf("[Node %s] Shutting down.", n.id)
			return

		case <-n.electionTimer.C:
			// Election timeout fired, transition to Candidate state and start election.
			n.mu.Lock()
			n.becomeCandidate()
			n.mu.Unlock()

		case <-n.heartbeatTimer.C:
			// This case is only active for Leaders.
			// The timer fires, leader needs to send AppendEntries (heartbeats) to followers.
			n.mu.Lock()
			if n.state == Leader {
				n.sendHeartbeats()
				// Reset the timer for the next heartbeat interval.
				n.heartbeatTimer.Reset(50 * time.Millisecond)
			}
			n.mu.Unlock()
		}
	}
}

func (n *RaftNode) becomeCandidate() {
	// This function must be called with the lock held.
	log.Printf("[Node %s] Election timeout, becoming Candidate for term %d.", n.id, n.currentTerm+1)
	n.state = Candidate
	n.currentTerm++
	n.votedFor = n.id

	// Reset timer for the new election cycle.
	n.resetElectionTimer()

	// Send RequestVote RPCs to all other peers.
	votesNeeded := (len(n.peers) / 2) + 1
	var votesGranted int32 = 1 // Vote for self.

	// The RPC plumbing is only sketched in comments below, so reference the
	// counters explicitly to keep this file compiling.
	_, _ = votesNeeded, votesGranted

	for _, peer := range n.peers {
		if peer == n.id {
			continue
		}

		go func(peerAddr string) {
			// In a real implementation, you'd use a proper RPC client.
			// For simplicity, we'll outline the logic.
			// args := &RequestVoteArgs{...}
			// reply := &RequestVoteReply{}
			// if !call(peerAddr, "RaftNode.RequestVote", args, reply) {
			//     return
			// }
			//
			// Simplified vote counting (requires "sync/atomic"):
			// if reply.VoteGranted {
			//     if atomic.AddInt32(&votesGranted, 1) >= int32(votesNeeded) {
			//         n.becomeLeader()
			//     }
			// } else if reply.Term > n.currentTerm {
			//     n.mu.Lock()
			//     n.becomeFollower(reply.Term)
			//     n.mu.Unlock()
			// }
		}(peer)
	}
}

func (n *RaftNode) becomeLeader() {
	// This function should be called after winning an election.
	n.mu.Lock()
	defer n.mu.Unlock()

	// A candidate only becomes a leader if it is still a candidate.
	// It might have received an AppendEntries from a new leader in the meantime.
	if n.state != Candidate {
		return
	}

	log.Printf("[Node %s] Won election, becoming Leader for term %d.", n.id, n.currentTerm)
	n.state = Leader

	// Initialize leader-specific volatile state.
	n.nextIndex = make(map[string]int)
	n.matchIndex = make(map[string]int)
	lastLogIndex := len(n.log) - 1
	for _, peer := range n.peers {
		n.nextIndex[peer] = lastLogIndex + 1
		n.matchIndex[peer] = 0
	}

	// Stop the election timer and arm the heartbeat timer. Reusing the timer
	// created in NewRaftNode keeps the Run loop selecting on the same channel.
	n.electionTimer.Stop()
	n.heartbeatTimer.Reset(50 * time.Millisecond)

	// Send initial empty AppendEntries to assert authority.
	n.sendHeartbeats()
}


func (n *RaftNode) sendHeartbeats() {
	// A simplified placeholder for sending AppendEntries RPCs to all followers.
	// This function must be called with the lock held.
	log.Printf("[Node %s] Sending heartbeats to peers.", n.id)
	for _, peer := range n.peers {
		if peer == n.id {
			continue
		}
		go func(peerAddr string) {
			// Construct AppendEntriesArgs, potentially with no entries for a heartbeat.
			// Send the RPC.
			// Handle the reply: if follower's term is higher, convert to follower.
			// Otherwise, update nextIndex and matchIndex on success.
		}(peer)
	}
}

func (n *RaftNode) becomeFollower(term int) {
	// This function must be called with the lock held.
	log.Printf("[Node %s] Discovered higher term %d, becoming Follower.", n.id, term)
	n.state = Follower
	n.currentTerm = term
	n.votedFor = "" // Clear vote for the new term.
	
	// If we were a leader, stop sending heartbeats.
	if n.heartbeatTimer != nil {
		n.heartbeatTimer.Stop()
	}
	// All nodes reset their election timer when becoming a follower.
	n.resetElectionTimer()
}
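
The comments above mention RequestVoteArgs, RequestVoteReply, and the AppendEntries messages without ever defining them. Here is a minimal sketch of those RPC types, with fields taken from the Raft paper (the file name rpc.go is just a convention for this sketch):

// rpc.go (hypothetical file name for this sketch)

package main

// RequestVoteArgs / RequestVoteReply follow the fields in the Raft paper.
type RequestVoteArgs struct {
	Term         int    // Candidate's term.
	CandidateID  string // Candidate requesting the vote.
	LastLogIndex int    // Index of the candidate's last log entry.
	LastLogTerm  int    // Term of the candidate's last log entry.
}

type RequestVoteReply struct {
	Term        int  // currentTerm of the responder, for the candidate to update itself.
	VoteGranted bool // True means the candidate received the vote.
}

// AppendEntriesArgs doubles as the heartbeat when Entries is empty.
type AppendEntriesArgs struct {
	Term         int        // Leader's term.
	LeaderID     string     // So followers can redirect clients.
	PrevLogIndex int        // Index of the log entry immediately preceding the new ones.
	PrevLogTerm  int        // Term of the entry at PrevLogIndex.
	Entries      []LogEntry // Log entries to store (empty for a heartbeat).
	LeaderCommit int        // Leader's commitIndex.
}

type AppendEntriesReply struct {
	Term    int  // currentTerm of the responder.
	Success bool // True if the follower had an entry matching PrevLogIndex/PrevLogTerm.
}

Whether these messages travel over net/rpc, gRPC, or plain HTTP+JSON is orthogonal to the algorithm; only the field semantics matter.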

Log Encryption and Applying Entries to the State Machine

Now for the core key-management part: our Raft log entries must be encrypted. A common mistake is to encrypt only at the RPC layer, which treats the symptom rather than the cause, because the log is still plaintext once it hits disk. The right approach is to encrypt the Command field when the Leader creates the LogEntry, keeping only the encrypted EncryptedCommand byte slice.

We will use AES-256-GCM, an authenticated encryption mode with associated data (AEAD) that provides confidentiality, integrity, and authenticity at once.

// crypto.go

package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/json"
	"errors"
	"io"
)

// In a production system, the command would be more structured.
// For this example, we'll use a simple struct.
type SetCommand struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}


func (n *RaftNode) encryptCommand(cmd interface{}) ([]byte, error) {
	plaintext, err := json.Marshal(cmd)
	if err != nil {
		return nil, err
	}

	block, err := aes.NewCipher(n.encryptionKey)
	if err != nil {
		return nil, err
	}

	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}

	nonce := make([]byte, gcm.NonceSize())
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, err
	}

	// Encrypt the data. The nonce is prepended to the ciphertext.
	ciphertext := gcm.Seal(nonce, nonce, plaintext, nil)
	return ciphertext, nil
}


func (n *RaftNode) decryptCommand(ciphertext []byte) (*SetCommand, error) {
	block, err := aes.NewCipher(n.encryptionKey)
	if err != nil {
		return nil, err
	}

	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}

	nonceSize := gcm.NonceSize()
	if len(ciphertext) < nonceSize {
		return nil, errors.New("ciphertext too short")
	}

	nonce, encryptedMessage := ciphertext[:nonceSize], ciphertext[nonceSize:]
	plaintext, err := gcm.Open(nil, nonce, encryptedMessage, nil)
	if err != nil {
		// This is a critical error, it could mean data corruption or a key mismatch.
		return nil, err
	}

	var cmd SetCommand
	if err := json.Unmarshal(plaintext, &cmd); err != nil {
		return nil, err
	}

	return &cmd, nil
}

After the Leader receives a client request, it creates a new LogEntry, encrypts it, and replicates it to the Followers. Once the entry is acknowledged (committed) by a majority, every node, including the Leader, sends it to its own state machine through the applyCh channel. The state machine must decrypt the entry before applying it.

// node_apply.go

// This is the loop that applies committed log entries to the KV store state machine.
func (n *RaftNode) applyLoop() {
	for entry := range n.applyCh {
		// The entry here is already committed by the Raft algorithm.
		// We need to decrypt its command before applying.
		cmd, err := n.decryptCommand(entry.EncryptedCommand)
		if err != nil {
			// This is a fatal error: it implies a bug, data corruption, or a key
			// mismatch between nodes. log.Fatalf exits the process so the operator
			// can investigate before the replicas' state machines diverge.
			log.Fatalf("[Node %s] FATAL: Failed to decrypt log entry: %v", n.id, err)
		}

		n.mu.Lock()
		log.Printf("[Node %s] Applying command: SET %s = %s", n.id, cmd.Key, cmd.Value)
		n.kvStore[cmd.Key] = cmd.Value
		n.lastApplied++
		n.mu.Unlock()
	}
}

// Client interaction endpoint (example)
func (n *RaftNode) handleSetRequest(w http.ResponseWriter, r *http.Request) {
	n.mu.Lock()
	if n.state != Leader {
		// Redirect to leader or return an error.
		// A robust implementation would cache the leader's address.
		http.Error(w, "Not a leader", http.StatusBadGateway)
		n.mu.Unlock()
		return
	}

	// For simplicity, reading from query params. Production would use POST body.
	key := r.URL.Query().Get("key")
	value := r.URL.Query().Get("value")
	if key == "" {
		http.Error(w, "Key is required", http.StatusBadRequest)
		n.mu.Unlock()
		return
	}
	
	cmd := SetCommand{Key: key, Value: value}
	encryptedCmd, err := n.encryptCommand(cmd)
	if err != nil {
		http.Error(w, "Internal encryption error", http.StatusInternalServerError)
		n.mu.Unlock()
		return
	}

	entry := LogEntry{
		Term:             n.currentTerm,
		EncryptedCommand: encryptedCmd,
	}
	n.log = append(n.log, entry)
	
	// The lock is released here to allow the background replication to proceed
	// while the client waits for the commit. A more advanced implementation
	// would use a channel to wait for the entry to be applied.
	n.mu.Unlock()

	// ... [Logic to replicate the entry and wait for commit] ...
	
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("OK"))
}
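
The /get endpoint used in the smoke test later on never appears above, so here is a minimal sketch of it (the handler name is an assumption of this sketch, not shown in the original code). It reads from the node's local, already-applied state, so a follower may return slightly stale data; strongly consistent reads would have to be routed through the leader, which is out of scope here.

// A minimal read endpoint (sketch). It serves whatever this node has already
// applied to its local kvStore, which may lag slightly behind the leader.
func (n *RaftNode) handleGetRequest(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	if key == "" {
		http.Error(w, "Key is required", http.StatusBadRequest)
		return
	}

	n.mu.Lock()
	value, ok := n.kvStore[key]
	n.mu.Unlock()

	if !ok {
		http.Error(w, "Key not found", http.StatusNotFound)
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(value))
}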

The tricky part is how to distribute the encryptionKey. In a real project it must never be hard-coded. The most pragmatic approach is to inject it through an environment variable when the container starts, which fits Docker naturally. A more advanced setup would pull the key dynamically at startup from HashiCorp Vault or a cloud provider's KMS. For this self-contained system, an environment variable is an acceptable, simple starting point.
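
As a concrete illustration of the environment-variable approach, here is a small, hypothetical helper (the file name keyload.go and the optional hex form are assumptions of this sketch, not something the rest of the code relies on) that loads and validates the key at startup:

// keyload.go (hypothetical helper for this sketch)

package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"os"
)

// loadEncryptionKey reads the AES-256 key from the ENCRYPTION_KEY environment
// variable. It accepts either a raw 32-byte string or 64 hex characters, so
// binary keys can still be passed through the environment.
func loadEncryptionKey() ([]byte, error) {
	raw := os.Getenv("ENCRYPTION_KEY")
	if raw == "" {
		return nil, errors.New("ENCRYPTION_KEY is not set")
	}
	if len(raw) == 32 {
		return []byte(raw), nil
	}
	if len(raw) == 64 {
		key, err := hex.DecodeString(raw)
		if err != nil {
			return nil, fmt.Errorf("ENCRYPTION_KEY looks hex-encoded but is invalid: %w", err)
		}
		return key, nil
	}
	return nil, fmt.Errorf("ENCRYPTION_KEY must be 32 raw bytes or 64 hex chars, got %d bytes", len(raw))
}

The main.go shown later only checks the raw 32-byte form; wiring in a helper like this would be a drop-in change.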

Dockerization and Cluster Deployment

A single binary cannot validate distributed consistency. We need an easy way to bring up a cluster, and Docker plus Docker Compose are the perfect tools.

First, the Dockerfile:

# Dockerfile

# Use the official Golang image to create a build artifact.
FROM golang:1.21-alpine as builder

WORKDIR /app

# Copy go mod and sum files
COPY go.mod go.sum ./
# Download all dependencies.
RUN go mod download

# Copy the source code
COPY . .

# Build the Go app, creating a static binary.
# CGO_ENABLED=0 is important for creating a truly portable static binary.
# -ldflags "-s -w" strips debugging information, reducing binary size.
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /raft-node .


# Use a minimal alpine image for a small footprint.
FROM alpine:latest

# Copy the static binary from the builder stage.
COPY --from=builder /raft-node /raft-node

# This is the command that will be run when the container starts.
ENTRYPOINT ["/raft-node"]

Next is docker-compose.yml, the glue that holds everything together. It defines three services, each running our Go program, on the same Docker network so they can reach each other by service name (node1, node2, node3).

# docker-compose.yml

version: '3.8'

services:
  node1:
    build: .
    container_name: node1
    ports:
      - "8081:8080"
    environment:
      - NODE_ID=node1
      - PEERS=node1:8080,node2:8080,node3:8080
      - ENCRYPTION_KEY=example32bytelongsecretkey123456 # Must be exactly 32 bytes for AES-256
    command: ["-id", "node1", "-peers", "node1:8080,node2:8080,node3:8080"]
    networks:
      - raft-net

  node2:
    build: .
    container_name: node2
    ports:
      - "8082:8080"
    environment:
      - NODE_ID=node2
      - PEERS=node1:8080,node2:8080,node3:8080
      - ENCRYPTION_KEY=example32bytelongsecretkey123456
    command: ["-id", "node2", "-peers", "node1:8080,node2:8080,node3:8080"]
    networks:
      - raft-net

  node3:
    build: .
    container_name: node3
    ports:
      - "8083:8080"
    environment:
      - NODE_ID=node3
      - PEERS=node1:8080,node2:8080,node3:8080
      - ENCRYPTION_KEY=example32bytelongsecretkey123456
    command: ["-id", "node3", "-peers", "node1:8080,node2:8080,node3:8080"]
    networks:
      - raft-net

networks:
  raft-net:

In the main program, main.go, we parse these command-line flags and read the encryption key from the environment:

// main.go

package main

import (
	"flag"
	"log"
	"os"
	"strings"
)

func main() {
	id := flag.String("id", "", "Node ID")
	peersStr := flag.String("peers", "", "Comma-separated list of peer addresses")
	// The key is read from the ENCRYPTION_KEY environment variable, which is how
	// docker-compose injects it. A -key flag remains as an override for local
	// testing; in production the key should come from a secret manager instead.
	keyFlag := flag.String("key", "", "32-byte AES-256 key (overrides ENCRYPTION_KEY)")
	flag.Parse()

	if *id == "" || *peersStr == "" {
		log.Fatal("Both -id and -peers arguments are required.")
	}

	key := os.Getenv("ENCRYPTION_KEY")
	if *keyFlag != "" {
		key = *keyFlag
	}
	if len(key) != 32 {
		log.Fatal("The encryption key must be exactly 32 bytes long for AES-256.")
	}

	peers := strings.Split(*peersStr, ",")

	node := NewRaftNode(*id, peers, []byte(key))

	// In a complete implementation, start an RPC server here (e.g., net/rpc or gRPC)
	// and the HTTP server for client interactions.

	// Start the main loop and the apply loop in goroutines.
	go node.Run()
	go node.applyLoop()

	// ... code to start HTTP/RPC server ...

	// Block forever. A real app would handle graceful shutdown.
	select {}
}

With docker-compose up we can bring up a three-node cluster in one shot. We can write data through one node with curl "http://localhost:8081/set?key=config1&value=alpha", then simulate a Leader crash with docker-compose stop node1. A few seconds later the cluster elects a new Leader (say node2, now reachable on localhost:8082), and curl "http://localhost:8082/get?key=config1" or curl "http://localhost:8083/get?key=config1" confirms the data is still present and readable, demonstrating the system's fault tolerance.

Remaining Issues and Where This Fits

Although this from-scratch system validates the core concepts, it has a long way to go before production use. First, log persistence is not implemented: a node loses all state on restart. An embedded KV store such as BoltDB or RocksDB is needed to persist the Raft log along with currentTerm and votedFor. Second, log compaction and snapshotting are indispensable; otherwise the log grows without bound, eventually exhausting disk and slowing down the startup of new nodes.
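
To make "persistent state" concrete, here is a rough sketch (the file name persist.go, JSON on disk, and the rename trick are all assumptions of this sketch; a real node would use BoltDB or RocksDB with proper fsync) of which fields must reach stable storage before a node answers an RPC:

// persist.go (sketch only; a real node would use BoltDB/RocksDB and fsync)

package main

import (
	"encoding/json"
	"os"
)

// persistentState mirrors the fields the Raft paper requires on stable storage.
type persistentState struct {
	CurrentTerm int        `json:"current_term"`
	VotedFor    string     `json:"voted_for"`
	Log         []LogEntry `json:"log"`
}

// persist must be called with n.mu held, before answering the RPC that
// modified currentTerm, votedFor, or the log.
func (n *RaftNode) persist(path string) error {
	data, err := json.Marshal(persistentState{
		CurrentTerm: n.currentTerm,
		VotedFor:    n.votedFor,
		Log:         n.log,
	})
	if err != nil {
		return err
	}
	// Write to a temp file and rename for a crude form of atomicity.
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// restore loads previously persisted state, if any, at startup.
func (n *RaftNode) restore(path string) error {
	data, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // First boot: nothing to restore.
		}
		return err
	}
	var ps persistentState
	if err := json.Unmarshal(data, &ps); err != nil {
		return err
	}
	n.currentTerm = ps.CurrentTerm
	n.votedFor = ps.VotedFor
	n.log = ps.Log
	return nil
}

On startup, restore would run before Run(); persist would be called after every change to these three fields. Log compaction would later replace the persisted log prefix with a snapshot.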

Membership changes are also missing. The cluster members are statically configured and cannot be added or removed at runtime. Supporting this requires the joint consensus algorithm described in the Raft paper, one of the most engineering-intensive parts of the protocol.

In addition, the client interaction logic is overly simple. A robust client should handle Leader forwarding automatically and retry when a write fails; a sketch of one follows.
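
The function below (the name setWithRetry and the node list are assumptions of this sketch; the endpoint path matches the /set handler above) simply walks the known nodes and retries until one of them accepts the write as leader, which is good enough for a handful of nodes:

// client.go (sketch of a retrying client; addresses and endpoint paths follow
// the examples in this post, not a published API)

package main

import (
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// setWithRetry tries each node in turn until one accepts the write as leader.
func setWithRetry(nodes []string, key, value string, attempts int) error {
	client := &http.Client{Timeout: 2 * time.Second}
	for i := 0; i < attempts; i++ {
		for _, node := range nodes {
			endpoint := fmt.Sprintf("http://%s/set?key=%s&value=%s",
				node, url.QueryEscape(key), url.QueryEscape(value))
			resp, err := client.Get(endpoint)
			if err != nil {
				continue // Node unreachable; try the next one.
			}
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil // Accepted by the current leader.
			}
			// StatusBadGateway ("Not a leader") or anything else: try the next node.
		}
		time.Sleep(200 * time.Millisecond) // Give the cluster time to elect a leader.
	}
	return fmt.Errorf("write %q failed after %d attempts", key, attempts)
}

Usage would look like setWithRetry([]string{"localhost:8081", "localhost:8082", "localhost:8083"}, "config1", "alpha", 5), which keeps working across a leader failover as long as a majority of nodes is up.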

So the value of this project is not that it can replace etcd, but that it lays bare the core trade-offs of building a distributed consensus system: the theoretical correctness of the algorithm, the engineering details of the implementation, the complexity of concurrency control, the integration of security, and the convenience of deployment and operations. It shows that combining distributed consensus (Raft), key management (log encryption), and containerized deployment (Docker) can produce a controllable, secure, and lightweight purpose-built solution. Its sweet spot is small, bounded scenarios that are sensitive to external dependencies yet need strong consistency.

