Building a Structured Log Ingestion and Query System with Zig and ClickHouse on DigitalOcean


The cost of a logging system balloons once a service scales. SaaS solutions are capable, but their pricing models, based on ingest volume and retention, make them a major expense for log-heavy applications. When one of our real-time analytics services reached billions of log lines per day, the multi-thousand-dollar monthly bill pushed us to find a self-hosted, high-performance, cost-controlled alternative. The goal was clear: a logging platform that handles highly concurrent writes, answers queries quickly, and costs very little to deploy and maintain.

At the core of the system is a lightweight log ingestion service. It must handle a massive volume of HTTP requests on a tiny resource budget, do minimal processing on incoming JSON logs, and batch-write them efficiently to ClickHouse. Go and Rust are the usual candidates, but we ultimately chose Zig: fine-grained control over memory, no hidden control flow, and trivially simple cross-compilation. That makes it possible to build a GC-free, memory-stable, single-file static binary, a perfect fit for running on a small DigitalOcean Droplet.

Our stack:

  • Log ingestion and query proxy: an HTTP service written in Zig.
  • Data storage and analytics: DigitalOcean Managed ClickHouse.
  • Infrastructure: a DigitalOcean Droplet (for the Zig service) + Spaces (for the static UI).
  • Frontend query interface: a minimal Headless UI that talks to the Zig service over its API.

Architecture Overview

The data flow through the system is straightforward, split into a write path and a query path.

graph TD
    subgraph "写入路径 (High-Throughput Ingestion)"
        A[Applications] -- JSON over HTTP --> B(Zig Ingestion Service on DigitalOcean Droplet);
        B -- Native TCP Protocol Batch Insert --> C(Managed ClickHouse Cluster);
    end

    subgraph "查询路径 (Ad-hoc Querying)"
        D[Developer's Browser] --> E{Headless UI on DigitalOcean Spaces};
        E -- Query API Request --> F(Zig Query Proxy on DigitalOcean Droplet);
        F -- HTTP GET Request --> C;
        C -- Query Result --> F;
        F -- JSON Response --> E;
    end

The write path is performance-critical. Applications send JSON-formatted logs to the Zig service via HTTP POST. The service accumulates logs in memory and, once a threshold is reached (say, 10,000 entries or 1 second), batch-writes the block to ClickHouse over its native TCP protocol. The native protocol avoids HTTP overhead and server-side JSON parsing, which is the key to high throughput.
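
To make the contract concrete, here is what a batch POST might look like. The /ingest path, port, and payload shape are conventions invented for this article, not a standard; note that metadata travels as a pre-serialized JSON string, matching the String column defined later:

```sh
# Hypothetical ingestion endpoint on the Droplet; the service accepts a
# JSON array so one request can carry a whole client-side batch.
curl -X POST "http://<droplet-ip>:8080/ingest" \
  -H "Content-Type: application/json" \
  -d '[{"timestamp": 1718000000123,
        "project_id": "checkout-api",
        "level": "INFO",
        "message": "order created",
        "metadata": "{\"order_id\": 42}"}]'
```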

The query path favors functionality instead. A static single-page app built on a headless component library (such as Radix UI, or Headless UI from Tailwind Labs) lets engineers write SQL-style queries. Requests go to a second endpoint on the Zig service, which acts as a query proxy: it forwards the request to ClickHouse's HTTP interface and returns the result to the frontend as JSON.
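
Stripped of the proxy, the relayed request boils down to a plain ClickHouse HTTP call. The hostname and credentials below are placeholders; the X-ClickHouse-* headers and the FORMAT JSON clause are standard features of ClickHouse's HTTP interface:

```sh
# What the Zig proxy effectively forwards to Managed ClickHouse (HTTPS, port 8443).
curl "https://<clickhouse-host>:8443/" \
  -H "X-ClickHouse-User: default" \
  -H "X-ClickHouse-Key: <password>" \
  --data-binary "SELECT level, count() AS n FROM default.logs
                 WHERE project_id = 'checkout-api' GROUP BY level FORMAT JSON"
```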

ClickHouse Schema Design

In ClickHouse, a good table schema is the foundation of performance. For log data, partitioning and sort order are the two most important levers.

A common mistake is partitioning too finely, for example by hour. That produces a huge number of partition directories, which hurts both metadata management and query performance. For most workloads, daily partitions are the right default.
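
An easy way to sanity-check a partitioning choice on a live table is to count parts per partition in the system.parts table; an explosion of tiny parts is the classic symptom of over-fine partitioning (the table name matches the schema defined below):

```sql
SELECT partition, count() AS parts, sum(rows) AS total_rows
FROM system.parts
WHERE database = 'default' AND table = 'logs' AND active
GROUP BY partition
ORDER BY partition DESC;
```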

The sort key (ORDER BY) determines how data is physically stored and indexed. When a query's WHERE clause can use the sort key, ClickHouse can skip most of the data it would otherwise scan. We use the project ID plus the timestamp as the primary sort key, because the vast majority of queries filter by project and time range.
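
For example, the bread-and-butter query below lines up with that key exactly, so the primary index prunes most granules before any column data is read (the project name and time window are illustrative):

```sql
SELECT timestamp, level, message
FROM default.logs
WHERE project_id = 'checkout-api'
  AND timestamp >= now() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
LIMIT 100;
```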

-- Run against DigitalOcean Managed ClickHouse
CREATE TABLE default.logs (
    -- Timestamp, millisecond precision
    `timestamp` DateTime64(3, 'UTC'),

    -- Service/project identifier, used for data isolation and query filtering
    `project_id` LowCardinality(String),

    -- Log level; Enum8 stores a single byte per row
    `level` Enum8('DEBUG' = 0, 'INFO' = 1, 'WARN' = 2, 'ERROR' = 3, 'FATAL' = 4),

    -- Log message body
    `message` String,

    -- Dynamic metadata, stored as a raw JSON string
    `metadata` String,

    -- Token bloom filter to accelerate substring search on message.
    -- project_id needs no skip index: it already leads the primary key.
    INDEX idx_message message TYPE tokenbf_v1(8192, 3, 0) GRANULARITY 1
)
ENGINE = MergeTree()
-- Partition by day; hourly partitions would create far too many parts
PARTITION BY toYYYYMMDD(timestamp)
-- Primary sort key: physical storage order and the primary index
ORDER BY (project_id, timestamp)
-- Retention policy: keep the last 30 days
TTL toDate(timestamp) + INTERVAL 30 DAY
SETTINGS index_granularity = 8192;

For the metadata field we chose to store the raw JSON as a String. ClickHouse does have a JSON object type, but in a write-heavy scenario, treating it as a string keeps client logic simple and improves ingest throughput. At query time, the JSONExtract* family of functions parses it on demand.
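
For instance, digging a field out of metadata at query time looks like this (JSONExtractString is a ClickHouse built-in; the order_id key is just an example). If a field is queried constantly, it is worth promoting to a materialized column instead:

```sql
SELECT
    timestamp,
    JSONExtractString(metadata, 'order_id') AS order_id,
    message
FROM default.logs
WHERE project_id = 'checkout-api'
  AND level = 'ERROR'
  AND timestamp >= now() - INTERVAL 1 DAY
LIMIT 50;
```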

Implementing the Zig Log Ingestion Service

This is the core of the system. We build an HTTP server on Zig's standard library and hand-roll a simplified client for ClickHouse's native TCP protocol.

Project layout:

.
├── build.zig          // Zig build script
└── src
    ├── main.zig       // entry point and HTTP server
    ├── clickhouse.zig // ClickHouse native protocol client
    └── types.zig      // data structure definitions

build.zig
Zig's build system is powerful enough on its own; no external build tooling is required. We define a single executable target. (The script targets the Zig 0.12-era API; recent releases spell the root source file as b.path("src/main.zig") instead of .{ .path = ... }.)

const std = @import("std");

pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const exe = b.addExecutable(.{
        .name = "log-ingester",
        .root_source_file = .{ .path = "src/main.zig" },
        .target = target,
        .optimize = optimize,
    });

    b.installArtifact(exe);

    const run_cmd = b.addRunArtifact(exe);
    run_cmd.step.dependOn(b.getInstallStep());

    if (b.args) |args| {
        run_cmd.addArgs(args);
    }

    const run_step = b.step("run", "Run the application");
    run_step.dependOn(&run_cmd.step);
}
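
The payoff of standardTargetOptions is that producing the static Linux binary for the Droplet is one command from any development machine. The paths below are the zig build defaults; the Droplet address is a placeholder:

```sh
# musl target => fully static binary, no libc to install on the Droplet
zig build -Dtarget=x86_64-linux-musl -Doptimize=ReleaseSafe
scp zig-out/bin/log-ingester root@<droplet-ip>:/usr/local/bin/
```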

src/types.zig
Define the log entry struct that mirrors the ClickHouse table schema.

const std = @import("std");

pub const LogLevel = enum(u8) {
    DEBUG = 0,
    INFO = 1,
    WARN = 2,
    ERROR = 3,
    FATAL = 4,
};

pub const LogEntry = struct {
    timestamp: u64, // Unix timestamp in milliseconds
    project_id: []const u8,
    level: LogLevel,
    message: []const u8,
    metadata: []const u8,
};
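
Before entries reach the ClickHouse client, the HTTP handler has to decode request bodies into this struct. A minimal sketch with std.json (Zig 0.12-era API; it assumes, as in the ingestion example above, that clients send metadata pre-serialized as a JSON string):

```zig
const std = @import("std");
const types = @import("types.zig");

/// Parse a JSON array of log entries from a request body. The returned
/// Parsed value owns every decoded string; call deinit() once the batch
/// has been handed off.
fn parseBatch(
    allocator: std.mem.Allocator,
    body: []const u8,
) !std.json.Parsed([]types.LogEntry) {
    // std.json decodes []const u8 fields from JSON strings and matches
    // enum fields by tag name ("INFO" -> LogLevel.INFO).
    return std.json.parseFromSlice([]types.LogEntry, allocator, body, .{
        .ignore_unknown_fields = true,
    });
}
```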

src/clickhouse.zig
This is the most technically demanding piece. We need a client that speaks to ClickHouse's native TCP port (9000 on a default install; managed offerings typically expose the native protocol only over TLS, often on port 9440, which this plaintext sketch does not handle). The protocol is involved, but for writes we only need its core: the handshake, sending a query, and sending data blocks.

```zig
// src/clickhouse.zig
//
// A hand-rolled subset of the ClickHouse native TCP protocol, written
// against Zig 0.12/0.13 std APIs. This is a sketch: it speaks plaintext
// TCP (no TLS) and should be checked against the ClickHouse sources
// before you trust it with production traffic.
const std = @import("std");
const types = @import("types.zig");
const LogEntry = types.LogEntry;

// ClickHouse wire protocol packet codes
const CLIENT_HELLO: u64 = 0;
const CLIENT_QUERY: u64 = 1;
const CLIENT_DATA: u64 = 2;
const SERVER_HELLO: u64 = 0;
const SERVER_DATA: u64 = 1;
const SERVER_EXCEPTION: u64 = 2;
const SERVER_PROGRESS: u64 = 3;
const SERVER_END_OF_STREAM: u64 = 5; // not 4: Pong is 4
const SERVER_TABLE_COLUMNS: u64 = 11; // sent before the header block on INSERT

const REVISION = 54455; // protocol revision we announce; it gates optional fields

// A simplified client for batch-inserting data
pub const Client = struct {
    conn: std.net.Stream,
    writer: std.net.Stream.Writer,
    reader: std.net.Stream.Reader,
    allocator: std.mem.Allocator,

    pub fn connect(allocator: std.mem.Allocator, host: []const u8, port: u16) !Client {
        // tcpConnectToHost resolves DNS names; a Managed ClickHouse
        // hostname is not an IP literal, so resolveIp would not work here.
        const stream = try std.net.tcpConnectToHost(allocator, host, port);

        return Client{
            .conn = stream,
            .writer = stream.writer(),
            .reader = stream.reader(),
            .allocator = allocator,
        };
    }

    pub fn deinit(self: *Client) void {
        self.conn.close();
    }

    // Handshake with the ClickHouse server
    fn handshake(self: *Client, db: []const u8, user: []const u8, password: []const u8) !void {
        var bw = std.io.bufferedWriter(self.writer);
        const writer = bw.writer();

        // Client Hello
        try writeUleb128(writer, CLIENT_HELLO);
        try writeString(writer, "log-ingester-zig");
        try writeUleb128(writer, 1); // client version major
        try writeUleb128(writer, 0); // client version minor
        try writeUleb128(writer, REVISION);
        try writeString(writer, db);
        try writeString(writer, user);
        try writeString(writer, password);
        try bw.flush();

        // Server Hello. We read the stream unbuffered on purpose: a buffered
        // reader created per call could swallow bytes a later call needs.
        const reader = self.reader;
        const packet_type = try readUleb128(reader);
        if (packet_type != SERVER_HELLO) {
            std.log.err("expected SERVER_HELLO, got {d}", .{packet_type});
            return error.HandshakeFailed;
        }

        // Read and discard the server info. Which fields are present depends
        // on the negotiated revision (min of both sides); the gates below
        // assume a modern server, so that min is our own REVISION.
        self.allocator.free(try readString(reader, self.allocator)); // server name
        _ = try readUleb128(reader); // version major
        _ = try readUleb128(reader); // version minor
        _ = try readUleb128(reader); // server revision
        if (REVISION >= 54058) { // server timezone
            self.allocator.free(try readString(reader, self.allocator));
        }
        if (REVISION >= 54372) { // server display name
            self.allocator.free(try readString(reader, self.allocator));
        }
        if (REVISION >= 54401) { // server version patch
            _ = try readUleb128(reader);
        }
    }

    // Main public method for inserting a batch of logs
    pub fn batchInsert(self: *Client, logs: []const LogEntry) !void {
        // In a real project the handshake belongs in connect(), done once per
        // connection; we do it per batch here for simplicity. Credentials come
        // from env vars so they never land in the binary or the repo.
        // (std.posix.getenv is Zig 0.12+; older releases use std.os.getenv.)
        const db_name = std.posix.getenv("CH_DB") orelse "default";
        const db_user = std.posix.getenv("CH_USER") orelse "default";
        const db_pass = std.posix.getenv("CH_PASSWORD") orelse "";
        try self.handshake(db_name, db_user, db_pass);

        var bw = std.io.bufferedWriter(self.writer);
        const writer = bw.writer();
        const reader = self.reader;

        // 1. Send the Query packet. The field order below follows protocol
        // revision 54455; if you bump REVISION, cross-check it against
        // Connection::sendQuery / ClientInfo::write in the ClickHouse sources.
        try writeUleb128(writer, CLIENT_QUERY);
        try writeString(writer, ""); // query id (empty: server assigns one)

        // ClientInfo
        try writer.writeByte(1); // query kind: INITIAL_QUERY
        try writeString(writer, ""); // initial user
        try writeString(writer, ""); // initial query id
        try writeString(writer, "0.0.0.0:0"); // initial address
        try writer.writeInt(u64, 0, .little); // initial query start time, fixed 8 bytes (>= 54449)
        try writer.writeByte(1); // interface: TCP
        try writeString(writer, ""); // os user
        try writeString(writer, ""); // client hostname
        try writeString(writer, "log-ingester-zig"); // client name
        try writeUleb128(writer, 1); // client version major
        try writeUleb128(writer, 0); // client version minor
        try writeUleb128(writer, REVISION);
        try writeString(writer, ""); // quota key (>= 54060)
        try writeUleb128(writer, 0); // distributed depth (>= 54448)
        try writeUleb128(writer, 0); // client version patch (>= 54401)
        try writer.writeByte(0); // no OpenTelemetry trace context (>= 54442)
        try writeUleb128(writer, 0); // parallel replicas: collaborate_with_initiator (>= 54453)
        try writeUleb128(writer, 0); // parallel replicas: participating replicas
        try writeUleb128(writer, 0); // parallel replicas: current replica number

        // Settings are (name, flags, value) triples; an empty name ends them.
        try writeString(writer, "");

        try writeString(writer, ""); // interserver secret (>= 54441)
        try writeUleb128(writer, 2); // query processing stage: Complete
        try writeUleb128(writer, 0); // compression: disabled

        // The actual query. No FORMAT clause: the data follows as native blocks.
        try writeString(writer, "INSERT INTO default.logs VALUES");

        // An empty block terminates the (unused) external-tables section.
        try writeUleb128(writer, CLIENT_DATA);
        try writeString(writer, ""); // temp table name
        try writeEmptyBlock(writer);
        try bw.flush();

        // 2. The server answers with an empty "header" block describing the
        // target table; wait for it, skipping informational packets.
        var packet_type = try readUleb128(reader);
        while (packet_type != SERVER_DATA) {
            switch (packet_type) {
                SERVER_EXCEPTION => {
                    try readException(reader, self.allocator);
                    return error.QueryFailed;
                },
                SERVER_PROGRESS => try skipProgress(reader),
                SERVER_TABLE_COLUMNS => {
                    // Column-defaults metadata for the INSERT; discard it.
                    self.allocator.free(try readString(reader, self.allocator));
                    self.allocator.free(try readString(reader, self.allocator));
                },
                else => return error.UnexpectedPacket,
            }
            packet_type = try readUleb128(reader);
        }
        self.allocator.free(try readString(reader, self.allocator)); // temp table name
        try readBlock(reader, self.allocator); // discard the header block

        // 3. Send our data block, then an empty block to signal end of data.
        try writeUleb128(writer, CLIENT_DATA);
        try writeString(writer, ""); // temp table name
        try writeLogsBlock(writer, logs);

        try writeUleb128(writer, CLIENT_DATA);
        try writeString(writer, "");
        try writeEmptyBlock(writer);
        try bw.flush();

        // 4. Wait for End of Stream
        packet_type = try readUleb128(reader);
        while (packet_type != SERVER_END_OF_STREAM) {
            switch (packet_type) {
                SERVER_EXCEPTION => {
                    try readException(reader, self.allocator);
                    return error.InsertFailed;
                },
                SERVER_PROGRESS => try skipProgress(reader),
                else => return error.UnexpectedPacket,
            }
            packet_type = try readUleb128(reader);
        }
    }

    // Serialize a batch of LogEntry values as one native-format block.
    fn writeLogsBlock(writer: anytype, logs: []const LogEntry) !void {
        try writeBlockInfo(writer);

        const num_columns: u64 = 5;
        const num_rows: u64 = logs.len;
        try writeUleb128(writer, num_columns);
        try writeUleb128(writer, num_rows);

        // Each column: name, type, a custom-serialization flag (revisions
        // >= 54454 expect this byte), then the raw column values.

        // --- Column 1: timestamp ---
        try writeString(writer, "timestamp");
        try writeString(writer, "DateTime64(3, 'UTC')");
        try writer.writeByte(0); // no custom serialization
        for (logs) |log| {
            // DateTime64(3) is an Int64 tick count in milliseconds.
            try writer.writeInt(i64, @intCast(log.timestamp), .little);
        }

        // --- Column 2: project_id ---
        // Declared as plain String: LowCardinality has its own dictionary
        // serialization in the native format, so we lean on the server to
        // cast String -> LowCardinality(String) on insert.
        try writeString(writer, "project_id");
        try writeString(writer, "String");
        try writer.writeByte(0);
        for (logs) |log| {
            try writeString(writer, log.project_id);
        }

        // --- Column 3: level ---
        try writeString(writer, "level");
        try writeString(writer, "Enum8('DEBUG' = 0, 'INFO' = 1, 'WARN' = 2, 'ERROR' = 3, 'FATAL' = 4)");
        try writer.writeByte(0);
        for (logs) |log| {
            // Enum8 is serialized as its raw Int8 value.
            try writer.writeByte(@intFromEnum(log.level));
        }

        // --- Column 4: message ---
        try writeString(writer, "message");
        try writeString(writer, "String");
        try writer.writeByte(0);
        for (logs) |log| {
            try writeString(writer, log.message);
        }

        // --- Column 5: metadata ---
        try writeString(writer, "metadata");
        try writeString(writer, "String");
        try writer.writeByte(0);
        for (logs) |log| {
            try writeString(writer, log.metadata);
        }
    }
};

// Helpers for the ClickHouse protocol primitives: varints are unsigned
// LEB128, strings are varint-length-prefixed bytes.
fn writeUleb128(writer: anytype, value: u64) !void {
    var v = value;
    while (true) {
        const byte: u8 = @truncate(v & 0x7f);
        v >>= 7;
        if (v == 0) return writer.writeByte(byte);
        try writer.writeByte(byte | 0x80);
    }
}

fn readUleb128(reader: anytype) !u64 {
    var result: u64 = 0;
    var shift: u6 = 0;
    while (true) {
        const byte = try reader.readByte();
        result |= @as(u64, byte & 0x7f) << shift;
        if (byte & 0x80 == 0) return result;
        shift += 7; // malformed input past 10 bytes trips safety checks
    }
}

fn writeString(writer: anytype, s: []const u8) !void {
    try writeUleb128(writer, s.len);
    try writer.writeAll(s);
}

fn readString(reader: anytype, allocator: std.mem.Allocator) ![]u8 {
    const len = try readUleb128(reader);
    const buf = try allocator.alloc(u8, @intCast(len));
    errdefer allocator.free(buf);
    try reader.readNoEof(buf);
    return buf;
}

// BlockInfo is a sequence of (field_number, value) pairs ended by field 0.
fn writeBlockInfo(writer: anytype) !void {
    try writeUleb128(writer, 1);
    try writer.writeByte(0); // field 1: is_overflows = false
    try writeUleb128(writer, 2);
    try writer.writeInt(i32, -1, .little); // field 2: bucket_num = -1 (none)
    try writeUleb128(writer, 0); // end of BlockInfo
}

// A 0x0 block terminates the external-tables section and, on INSERT,
// signals end of data.
fn writeEmptyBlock(writer: anytype) !void {
    try writeBlockInfo(writer);
    try writeUleb128(writer, 0); // columns
    try writeUleb128(writer, 0); // rows
}

// Read and discard a server-sent block. We only ever expect the empty
// header block here, so non-empty blocks are rejected outright.
fn readBlock(reader: anytype, allocator: std.mem.Allocator) !void {
    while (true) { // BlockInfo
        const field = try readUleb128(reader);
        if (field == 0) break;
        switch (field) {
            1 => _ = try reader.readByte(), // is_overflows
            2 => _ = try reader.readInt(i32, .little), // bucket_num
            else => return error.UnexpectedBlockInfoField,
        }
    }
    const num_columns = try readUleb128(reader);
    const num_rows = try readUleb128(reader);
    if (num_rows != 0) return error.UnsupportedNonEmptyBlock;
    var i: u64 = 0;
    while (i < num_columns) : (i += 1) {
        allocator.free(try readString(reader, allocator)); // column name
        allocator.free(try readString(reader, allocator)); // column type
        _ = try reader.readByte(); // custom-serialization flag (>= 54454)
    }
}

// Exception packet: code Int32, then name/message/stack-trace strings,
// then a has_nested byte (a full client would recurse on it).
fn readException(reader: anytype, allocator: std.mem.Allocator) !void {
    const code = try reader.readInt(i32, .little);
    const name = try readString(reader, allocator);
    defer allocator.free(name);
    const message = try readString(reader, allocator);
    defer allocator.free(message);
    allocator.free(try readString(reader, allocator)); // stack trace
    _ = try reader.readByte(); // has_nested
    std.log.err("ClickHouse exception {d} {s}: {s}", .{ code, name, message });
}

// Progress packet at revision 54455: read_rows, read_bytes,
// total_rows_to_read, written_rows, written_bytes, all varints.
fn skipProgress(reader: anytype) !void {
    var i: usize = 0;
    while (i < 5) : (i += 1) {
        _ = try readUleb128(reader);
    }
}
```
