日志系统的成本在服务规模化后会急剧膨胀。市面上的SaaS解决方案虽然功能强大,但其基于摄取量和保留时间的定价模型,对于日志密集型应用而言是一笔巨大的开销。当我们的一个实时分析服务的日志量达到每日数十亿条时,每月数千美元的账单迫使我们寻找一个自建、高性能且成本可控的替代方案。目标很明确:一个能够处理高并发写入、提供快速查询能力、并且部署和维护成本极低的日志平台。
这套系统的核心是一个轻量级的日志摄取服务。它必须能够以极低的资源消耗处理海量的HTTP请求,对传入的JSON日志进行最少的处理,然后高效地批量写入ClickHouse。Go和Rust是常见的选项,但我们最终选择了Zig。原因在于其对内存的精细控制能力、无隐藏的控制流、以及极其简单的交叉编译,这使得构建一个无GC、内存占用稳定、单文件部署的静态二进制文件成为可能,完美契合我们在DigitalOcean小型Droplet上运行的目标。
我们的技术栈组合如下:
- 日志摄取与查询代理: Zig语言编写的HTTP服务。
- 数据存储与分析: DigitalOcean Managed ClickHouse。
- 基础设施: DigitalOcean Droplet (用于Zig服务) + Spaces (用于静态UI)。
- 前端查询界面: 一个极简的Headless UI,通过API与Zig服务通信。
架构概览
整个系统的数据流非常直接,分为写入路径和查询路径。
graph TD
subgraph "写入路径 (High-Throughput Ingestion)"
A[Applications] -- JSON over HTTP --> B(Zig Ingestion Service on DigitalOcean Droplet);
B -- Native TCP Protocol Batch Insert --> C(Managed ClickHouse Cluster);
end
subgraph "查询路径 (Ad-hoc Querying)"
D[Developer's Browser] --> E{Headless UI on DigitalOcean Spaces};
E -- Query API Request --> F(Zig Query Proxy on DigitalOcean Droplet);
F -- HTTP GET Request --> C;
C -- Query Result --> F;
F -- JSON Response --> E;
end
写入路径是性能关键。应用通过HTTP POST将JSON格式的日志发送到Zig服务。服务在内存中累积日志,达到一定阈值(如10000条或1秒)后,通过ClickHouse的原生TCP协议将数据块高效地批量写入。原生协议避免了HTTP的开销和JSON解析的成本,是实现高吞吐量的关键。
查询路径则注重功能性。一个基于Headless UI组件库(如Radix UI或Headless UI by Tailwind Labs)构建的静态单页应用,允许工程师编写SQL风格的查询。请求被发送到Zig服务的另一个端点,该端点充当查询代理,将请求转发给ClickHouse的HTTP接口,并将结果以JSON格式返回给前端。
ClickHouse Schema设计
在ClickHouse中,一个好的表结构设计是性能的基石。对于日志数据,分区和排序是两个最重要的优化点。
一个常见的错误是使用过于精细的分区,比如按小时分区。这会导致大量的分区目录,对元数据管理和查询性能造成负面影响。对于大多数场景,按天分区是最佳实践。
排序键(ORDER BY)决定了数据在物理上如何存储和索引。查询时,如果WHERE子句能够利用排序键,ClickHouse可以极大地减少需要扫描的数据量。我们将项目ID和时间戳作为主排序键,因为绝大多数查询都会按项目和时间范围进行过滤。
-- Run against DigitalOcean Managed ClickHouse
CREATE TABLE default.logs (
    -- Event timestamp, millisecond precision
    `timestamp` DateTime64(3, 'UTC'),
    -- Service/project identifier, used for data isolation and query filtering
    `project_id` LowCardinality(String),
    -- Log severity. A column has exactly one type: the original
    -- "LowCardinality(String) Enum8(...)" is a syntax error, and Enum8 is
    -- already dictionary-encoded, so LowCardinality would add nothing.
    `level` Enum8('DEBUG'=0, 'INFO'=1, 'WARN'=2, 'ERROR'=3, 'FATAL'=4),
    -- Main log message body
    `message` String,
    -- Dynamic metadata stored as a raw JSON string; parse at query time
    -- with the JSONExtract* family of functions
    `metadata` String,
    -- Skip indexes to accelerate lookups on specific fields
    INDEX idx_project_id project_id TYPE set(0) GRANULARITY 1,
    INDEX idx_message message TYPE tokenbf_v1(8192, 3, 0) GRANULARITY 1
)
ENGINE = MergeTree()
-- Daily partitions; finer (e.g. hourly) partitioning creates too many parts
PARTITION BY toYYYYMMDD(timestamp)
-- Primary sort key: physical storage order and sparse primary index
ORDER BY (project_id, timestamp)
-- Retention policy: drop data older than 30 days
TTL toDate(timestamp) + INTERVAL 30 DAY
SETTINGS index_granularity = 8192;
这里的metadata字段我们选择用String类型存储原始JSON。虽然ClickHouse支持JSON对象类型,但在写入密集型场景下,将其作为字符串处理可以简化客户端逻辑并提高写入吞吐量。查询时可以使用JSONExtract*系列函数进行解析。
Zig日志摄取服务的实现
这是整个系统的核心。我们将用Zig的标准库构建一个异步HTTP服务器,并自己实现一个简化的ClickHouse原生TCP协议客户端。
项目结构:
.
├── build.zig // Zig构建脚本
└── src
├── main.zig // 程序入口和HTTP服务器
├── clickhouse.zig // ClickHouse原生客户端
└── types.zig // 数据结构定义
build.zig
Zig的构建系统非常强大,无需外部构建工具。我们定义一个可执行文件并链接标准库。
const std = @import("std");

/// Build script: produces the `log-ingester` executable and wires up a
/// `zig build run` step that forwards any trailing `-- args`.
pub fn build(b: *std.Build) void {
    // Standard -Dtarget=... / -Doptimize=... options from the CLI.
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const ingester = b.addExecutable(.{
        .name = "log-ingester",
        .root_source_file = .{ .path = "src/main.zig" },
        .target = target,
        .optimize = optimize,
    });
    b.installArtifact(ingester);

    // Run the freshly installed binary; depend on install so the artifact
    // under zig-out is the one being executed.
    const run_artifact = b.addRunArtifact(ingester);
    run_artifact.step.dependOn(b.getInstallStep());
    if (b.args) |forwarded_args| {
        run_artifact.addArgs(forwarded_args);
    }

    const run_step = b.step("run", "Run the application");
    run_step.dependOn(&run_artifact.step);
}
src/types.zig
定义与ClickHouse表结构对应的日志条目结构体。
const std = @import("std");
/// Log severity levels. The numeric values deliberately match the Enum8
/// definition of the `level` column in the ClickHouse `default.logs` table,
/// so @intFromEnum can be written to the wire directly.
pub const LogLevel = enum(u8) {
    DEBUG = 0,
    INFO = 1,
    WARN = 2,
    ERROR = 3,
    FATAL = 4,
};
/// One log record, mirroring the columns of `default.logs`.
/// The slices are borrowed views — a LogEntry does not own its strings;
/// whoever parsed the incoming JSON keeps the backing memory alive.
pub const LogEntry = struct {
    timestamp: u64, // Unix timestamp in milliseconds
    project_id: []const u8, // service/project identifier
    level: LogLevel,
    message: []const u8, // main log message body
    metadata: []const u8, // raw JSON object, stored as-is
};
src/clickhouse.zig
这是技术挑战最大的部分。我们需要实现一个客户端来与ClickHouse原生TCP端口(通常是9000)通信。协议比较复杂,但对于写入操作,我们只需要实现其核心部分:握手、发送查询、发送数据块。
```zig
// src/clickhouse.zig
const std = @import("std");
const types = @import("types.zig");
const LogEntry = types.LogEntry;
// ClickHouse wire protocol constants
// Client -> server packet type codes (native TCP protocol, port 9000).
const CLIENT_HELLO: u8 = 0;
const CLIENT_QUERY: u8 = 1;
const CLIENT_DATA: u8 = 2;
// Server -> client packet type codes.
const SERVER_HELLO: u8 = 0;
const SERVER_DATA: u8 = 1;
const SERVER_EXCEPTION: u8 = 2;
const SERVER_END_OF_STREAM: u8 = 4;
// Protocol revision advertised in the client hello; determines which
// optional fields (e.g. server timezone) the server sends back.
const REVISION = 54455; // Use a recent revision number
// A simplified client for batch inserting data
pub const Client = struct {
conn: std.net.Stream,
// std.io.Writer / std.io.Reader are generic type factories, not concrete
// types usable as fields; stream.writer()/stream.reader() return the
// stream's own wrapper types, which is what must be stored here.
writer: std.net.Stream.Writer,
reader: std.net.Stream.Reader,
allocator: std.mem.Allocator,
/// Open a TCP connection to the ClickHouse native port and wrap it in a
/// Client. Caller must call deinit() to close the connection.
/// NOTE(review): Address.resolveIp only parses numeric IP addresses; a
/// managed ClickHouse *hostname* would need std.net.getAddressList — confirm
/// how `host` is supplied by callers.
pub fn connect(allocator: std.mem.Allocator, host: []const u8, port: u16) !Client {
    const address = try std.net.Address.resolveIp(host, port);
    // const: the stream handle is never reassigned after creation.
    const stream = try std.net.tcpConnectToAddress(address);
    return Client{
        .conn = stream,
        .writer = stream.writer(),
        .reader = stream.reader(),
        .allocator = allocator,
    };
}
/// Close the underlying TCP connection. The client must not be used
/// afterwards; deallocation cannot fail, so this returns void.
pub fn deinit(self: *Client) void {
    self.conn.close();
}
// Handshake with the ClickHouse server
// Handshake with the ClickHouse server: send ClientHello, read ServerHello.
// Returns error.HandshakeFailed if the server answers with anything else.
// NOTE(review): a fresh bufferedReader is created here and again in
// batchInsert; any bytes read ahead are lost between the two — confirm the
// server never pipelines extra data after the hello.
fn handshake(self: *Client, db: []const u8, user: []const u8, password: []const u8) !void {
    var bw = std.io.bufferedWriter(self.writer);
    const writer = bw.writer();
    // Client Hello: packet type, client name, version triple, credentials.
    // (std.zig.c_translation.Uleb128 does not exist; std.leb is the std
    // ULEB128 codec used by the native protocol's varints.)
    try std.leb.writeULEB128(writer, @as(u64, CLIENT_HELLO));
    try writeString(writer, "log-ingester-zig");
    try std.leb.writeULEB128(writer, @as(u64, 1)); // Major
    try std.leb.writeULEB128(writer, @as(u64, 0)); // Minor
    try std.leb.writeULEB128(writer, @as(u64, REVISION));
    try writeString(writer, db);
    try writeString(writer, user);
    try writeString(writer, password);
    try bw.flush();
    // Server Hello
    var br = std.io.bufferedReader(self.reader);
    const reader = br.reader();
    const packet_type = try readUleb128(reader);
    if (packet_type != SERVER_HELLO) {
        std.log.err("expected SERVER_HELLO, got {d}", .{packet_type});
        return error.HandshakeFailed;
    }
    // Read and free the server info we do not use. readString allocates, so
    // discarding the slice without freeing (as the original did) leaks.
    const server_name = try readString(reader, self.allocator);
    self.allocator.free(server_name);
    _ = try readUleb128(reader); // major
    _ = try readUleb128(reader); // minor
    _ = try readUleb128(reader); // revision
    // Revisions >= 54338 additionally send the server timezone.
    if (REVISION >= 54338) {
        const timezone = try readString(reader, self.allocator);
        self.allocator.free(timezone);
    }
}
// Main public method for inserting a batch of logs
// Insert a batch of logs over the native protocol.
// In a real project the handshake should happen once when the connection is
// established; for simplicity it is performed per batch here.
pub fn batchInsert(self: *Client, logs: []const LogEntry) !void {
    // Credentials come from the environment (crucial for production).
    // getenv returns an optional, not an error union, so the fallback must
    // be `orelse` — the original `.? catch "default"` does not compile.
    const db_name = std.os.getenv("CH_DB") orelse "default";
    const db_user = std.os.getenv("CH_USER") orelse "default";
    const db_pass = std.os.getenv("CH_PASSWORD") orelse "";
    try self.handshake(db_name, db_user, db_pass);
    var bw = std.io.bufferedWriter(self.writer);
    const writer = bw.writer();
    var br = std.io.bufferedReader(self.reader);
    const reader = br.reader();
    // 1. Send Query packet: type, query id, client info, settings, query.
    try std.leb.writeULEB128(writer, @as(u64, CLIENT_QUERY));
    try writeString(writer, ""); // query id
    // Client info block.
    // NOTE(review): the exact client-info field layout is revision-dependent;
    // verify this sequence against the native protocol spec for REVISION 54455.
    try writer.writeByte(1); // query kind (initial query) — confirm
    try writeString(writer, ""); // initial user — confirm
    try writeString(writer, ""); // initial query id — confirm
    try writeString(writer, "log-ingester-zig");
    try std.leb.writeULEB128(writer, @as(u64, 1)); // Major
    try std.leb.writeULEB128(writer, @as(u64, 0)); // Minor
    try std.leb.writeULEB128(writer, @as(u64, REVISION));
    try std.leb.writeULEB128(writer, @as(u64, 2)); // interface: TCP
    // Settings: an empty string terminates the (empty) settings list.
    try writeString(writer, "");
    // Query processing stage: 2 = Complete.
    try std.leb.writeULEB128(writer, @as(u64, 2));
    // The INSERT itself. "FORMAT BlockInfo" is not a ClickHouse format —
    // the data follows as native blocks, so a plain VALUES clause is used.
    try writeString(writer, "INSERT INTO default.logs VALUES");
    // Compression: disabled.
    try writer.writeByte(0);
    try bw.flush();
    // 2. Wait for the server's Data packet (its empty schema block) that
    // signals we may start sending data.
    var packet_type = try readUleb128(reader);
    while (packet_type != SERVER_DATA) {
        if (packet_type == SERVER_EXCEPTION) {
            const message = try readString(reader, self.allocator);
            defer self.allocator.free(message);
            std.log.err("Server exception during query prep: {s}", .{message});
            return error.QueryFailed;
        }
        // Discard unexpected packet content. A production client needs to
        // parse progress/profile packets properly instead of this guess.
        const table = try readString(reader, self.allocator); // temp table name, might be empty
        self.allocator.free(table);
        _ = try readBlock(reader, self.allocator);
        packet_type = try readUleb128(reader);
    }
    const schema_table = try readString(reader, self.allocator); // temp table name
    self.allocator.free(schema_table);
    // NOTE(review): the schema block following that name is not consumed
    // here — confirm the server tolerates interleaving our blocks with it.
    // 3. Send the data block, then an empty block to signal end of data.
    try std.leb.writeULEB128(writer, @as(u64, CLIENT_DATA));
    try writeString(writer, ""); // temp table name
    try self.writeLogsBlock(writer, logs);
    try std.leb.writeULEB128(writer, @as(u64, CLIENT_DATA));
    try writeString(writer, "");
    try self.writeLogsBlock(writer, &[_]LogEntry{});
    try bw.flush();
    // 4. Drain packets until End-of-Stream confirms the insert.
    packet_type = try readUleb128(reader);
    while (packet_type != SERVER_END_OF_STREAM) {
        if (packet_type == SERVER_EXCEPTION) {
            const message = try readString(reader, self.allocator);
            defer self.allocator.free(message);
            std.log.err("Server exception during data send: {s}", .{message});
            return error.InsertFailed;
        }
        // Ignore other packets (progress, profile info, ...).
        packet_type = try readUleb128(reader);
    }
}
// Serialize one native-format block for the `default.logs` schema: BlockInfo,
// column/row counts, then each column as (name, type, values).
fn writeLogsBlock(self: *Client, writer: anytype, logs: []const LogEntry) !void {
    _ = self; // no client state is needed to encode a block
    // BlockInfo is a sequence of (field_number, value) pairs ended by a 0
    // field number:
    //   field 1: is_overflows (u8)  — false here
    //   field 2: bucket_num   (i32) — -1 means "not bucketed"
    // The original wrote bare bytes 1 and 2 with no values and no terminator,
    // which desynchronizes the stream.
    try std.leb.writeULEB128(writer, @as(u64, 1));
    try writer.writeByte(0); // is_overflows = false
    try std.leb.writeULEB128(writer, @as(u64, 2));
    try writer.writeInt(i32, -1, .little); // bucket_num
    try std.leb.writeULEB128(writer, @as(u64, 0)); // end of BlockInfo
    const num_columns: u64 = 5;
    const num_rows: u64 = logs.len;
    try std.leb.writeULEB128(writer, num_columns);
    try std.leb.writeULEB128(writer, num_rows);
    // --- Column 1: timestamp (DateTime64 travels as its Int64 tick count) ---
    try writeString(writer, "timestamp");
    try writeString(writer, "DateTime64(3, 'UTC')");
    for (logs) |log| {
        try writer.writeInt(i64, @intCast(log.timestamp), .little);
    }
    // --- Column 2: project_id ---
    // Sent as plain String: the native LowCardinality serialization is a
    // dictionary format, not length-prefixed strings, so declaring
    // LowCardinality(String) here while writing raw strings would corrupt the
    // block. The server converts String -> LowCardinality(String) on insert.
    try writeString(writer, "project_id");
    try writeString(writer, "String");
    for (logs) |log| {
        try writeString(writer, log.project_id);
    }
    // --- Column 3: level (Enum8 travels as its underlying 8-bit value) ---
    try writeString(writer, "level");
    try writeString(writer, "Enum8('DEBUG'=0, 'INFO'=1, 'WARN'=2, 'ERROR'=3, 'FATAL'=4)");
    for (logs) |log| {
        try writer.writeInt(u8, @intFromEnum(log.level), .little);
    }
    // --- Column 4: message ---
    try writeString(writer, "message");
    try writeString(writer, "String");
    for (logs) |log| {
        try writeString(writer, log.message);
    }
    // --- Column 5: metadata (raw JSON string) ---
    try writeString(writer, "metadata");
    try writeString(writer, "String");
    for (logs) |log| {
        try writeString(writer, log.metadata);
    }
}
// Helper functions for reading/writing ClickHouse protocol primitives
// Write a ClickHouse wire string: ULEB128 length prefix followed by the raw
// bytes. (std.zig.c_translation.Uleb128 does not exist; std.leb does.)
fn writeString(writer: anytype, s: []const u8) !void {
    try std.leb.writeULEB128(writer, s.len);
    try writer.writeAll(s);
}
// Read a ULEB128-encoded u64 — the native protocol's varint encoding for
// packet types, counts, and string lengths.
fn readUleb128(reader: anytype) !u64 {
    return std.leb.readULEB128(u64, reader);
}
fn readString(reader: anytype, allocator: std.mem.Allocator)