构建从 Qwik 前端到 AWS EKS Prometheus 的定制化时序性能遥测管道


现有的 Real User Monitoring (RUM) 解决方案要么成本高昂,要么无法精确捕捉 Qwik 框架独特的性能指标。Qwik 的核心优势在于其“可恢复性”(Resumability),它绕过了传统框架的“水合”(Hydration)过程,这意味着衡量传统的 DOMContentLoadedload 事件并不能完全反映用户感知的交互延迟。我们需要的是一个能够精确度量从用户交互到第一个 JavaScript 代码块(QRL)被下载并执行所需时间的系统。标准 RUM 工具对此无能为力。

我们的技术栈运行在 AWS EKS 上,Prometheus 作为核心的指标监控系统。将前端遥测数据集成到这个已有的、经过生产验证的体系中,是最符合成本效益和运维习惯的选择。但这立刻引出了核心的技术挑战:Prometheus 是一个基于拉(Pull)模型设计的系统,而前端遥测数据本质上是推送(Push)的。直接使用 Prometheus Pushgateway 是一个常见的诱惑,但在处理大量、短暂的客户端事件时,它是一个已知的反模式,会造成数据一致性问题和成为单点瓶颈。

因此,我们决定自行构建一个轻量级的遥测管道。这个管道包含三部分:

  1. 一个嵌入 Qwik 应用的、非阻塞的遥测客户端。
  2. 一个部署在 EKS 上的、无状态的 Go 语言编写的指标接收与转换服务。
  3. 利用 Prometheus Operator 实现对该服务的自动发现与指标抓取。

最终的目标是能够执行这样的 PromQL 查询,来获取特定页面的 P95 恢复时间:

# 计算过去5分钟内,/product/detail 页面用户感知的Qwik恢复时间(从交互到代码执行)的95分位数
histogram_quantile(0.95, sum(rate(qwik_resume_duration_seconds_bucket{k8s_namespace="production", path="/product/detail"}[5m])) by (le))

第一步:Qwik 端的遥测客户端设计

遥测客户端必须满足两个基本原则:极低的性能开销和数据发送的可靠性。它本身不能成为影响用户体验的因素。

我们将利用 Qwik 的 useVisibleTask$ Hook 在组件首次可见时启动 PerformanceObserver,来监听 Core Web Vitals (LCP, FCP, CLS, FID) 指标。同时,我们将封装 Qwik 的事件监听器,以便在事件触发 QRL 加载时精确计时。

为了保证数据在页面卸载时也能成功发送,navigator.sendBeacon 是不二之_
选择。它会异步地发送一个小型 HTTP 请求,并且不会延迟页面的卸载。

以下是这个遥测客户端的核心实现,我们将其封装为一个 Qwik Hook,方便在根组件中统一调用。

src/hooks/use-performance-telemetry.ts:

import { useVisibleTask$, noSerialize, type NoSerialize } from '@builder.io/qwik';

// 定义需要上报的遥测数据结构
interface TelemetryPayload {
  metricName: string;
  value: number;
  tags: Record<string, string | number>;
  timestamp: number;
}

// 定义一个轻量级的遥测队列
class TelemetryQueue {
  private queue: TelemetryPayload[] = [];
  private endpoint: string;
  private batchSize: number;
  private flushInterval: number;
  private timerId: ReturnType<typeof setInterval> | null = null;

  constructor(endpoint: string, batchSize = 10, flushInterval = 5000) {
    this.endpoint = endpoint;
    this.batchSize = batchSize;
    this.flushInterval = flushInterval;
  }

  public start() {
    // 启动定时器,定期清空队列
    // 在真实项目中,这里可以增加更复杂的逻辑,例如根据网络状态动态调整间隔
    if (this.timerId === null) {
      this.timerId = setInterval(() => this.flush(), this.flushInterval);
    }
  }

  public stop() {
    if (this.timerId !== null) {
      clearInterval(this.timerId);
      this.timerId = null;
    }
    // 确保在停止时,所有剩余数据都被发送
    this.flush();
  }

  public push(payload: Omit<TelemetryPayload, 'timestamp'>) {
    this.queue.push({ ...payload, timestamp: Date.now() });
    if (this.queue.length >= this.batchSize) {
      this.flush();
    }
  }

  public flush() {
    if (this.queue.length === 0) {
      return;
    }

    const batch = this.queue.slice();
    this.queue = [];

    // 使用 sendBeacon API,它保证了即使页面卸载,请求也能大概率被发出
    // 这是 RUM 监控的关键技术点,避免丢失用户离开页面前的最后一些性能数据
    try {
      const data = JSON.stringify(batch);
      // navigator.sendBeacon 的一个限制是它不支持复杂的 Content-Type
      // 因此我们使用 text/plain,并在服务端解析
      // 另一个备选方案是使用 keepalive fetch
      if (!navigator.sendBeacon(this.endpoint, data)) {
          console.error('Telemetry beacon failed to queue.');
          // 在这里可以实现回退逻辑,例如将失败的数据存入 localStorage
      }
    } catch (e) {
      // 捕获 JSON 序列化等错误
      console.error('Failed to send telemetry data:', e);
    }
  }
}

// Qwik Hook 实现
export const usePerformanceTelemetry = () => {
  let telemetryQueue: NoSerialize<TelemetryQueue>;

  // useVisibleTask$ 会在组件在浏览器中可见时执行
  // 这是启动客户端监控逻辑的理想位置
  useVisibleTask$(({ cleanup }) => {
    // '/api/telemetry' 是我们部署在EKS上的接收服务的入口
    telemetryQueue = noSerialize(new TelemetryQueue('/api/telemetry/ingest'));
    if (!telemetryQueue) return;

    telemetryQueue.start();

    const commonTags = {
      path: window.location.pathname,
      sdkVersion: '0.1.0',
      // 在真实项目中,可以添加更多维度,如用户ID、AB测试分组等
      // 但要极其小心高基数问题
      // userAgent: navigator.userAgent, // 注意:UA 会导致基数爆炸,通常在服务端解析
      connectionType: (navigator as any).connection?.effectiveType || 'unknown'
    };
    
    // 1. 监听 Core Web Vitals
    const observer = new PerformanceObserver((list) => {
      for (const entry of list.getEntries()) {
        telemetryQueue?.push({
          metricName: `web_vitals_${entry.name.toLowerCase()}`,
          value: (entry as any).value,
          tags: commonTags,
        });
      }
    });
    observer.observe({ type: 'lcp', buffered: true });
    observer.observe({ type: 'fid', buffered: true });
    observer.observe({ type: 'cls', buffered: true });

    // 2. 监听自定义 Qwik 恢复时间 (这是一个示例,具体实现需要深入Qwik内部)
    // 假设我们有一个全局事件总线来发布此类事件
    const handleQwikResume = (evt: CustomEvent) => {
      const { duration, component } = evt.detail;
      telemetryQueue?.push({
        metricName: 'qwik_resume_duration_seconds',
        value: duration / 1000, // 转换为秒
        tags: { ...commonTags, component },
      });
    };
    document.addEventListener('qwikResume', handleQwikResume as EventListener);


    // 3. 监听页面卸载事件,确保所有数据都被发送
    const handleVisibilityChange = () => {
      if (document.visibilityState === 'hidden') {
        telemetryQueue?.flush();
      }
    };
    document.addEventListener('visibilitychange', handleVisibilityChange);

    // cleanup 函数会在组件卸载时调用
    cleanup(() => {
      observer.disconnect();
      document.removeEventListener('qwikResume', handleQwikResume as EventListener);
      document.removeEventListener('visibilitychange', handleVisibilityChange);

      telemetryQueue?.stop();
    });
  });
};

在应用的根组件 src/routes/layout.tsx 中使用这个 Hook:

import { component$, Slot } from '@builder.io/qwik';
import { usePerformanceTelemetry } from '~/hooks/use-performance-telemetry';

export default component$(() => {
  // 在这里调用 Hook,它将自动为整个应用启动监控
  usePerformanceTelemetry();

  return <Slot />;
});

第二步:Go 语言指标接收与转换服务

这个服务的职责是:

  1. 提供一个 HTTP POST /ingest 端点接收来自 Qwik 客户端的 JSON 数据。
  2. 将 JSON 数据转换为 Prometheus 指标格式。
  3. 维护一个并发安全(thread-safe)的内存指标注册表。
  4. 提供一个 HTTP GET /metrics 端点供 Prometheus Server 抓取。

这里的关键设计在于如何处理高基数(High Cardinality)标签。如果我们将 pathcomponent 甚至用户ID作为标签,每个独特的组合都会创建一个新的时间序列。在流量巨大的网站上,这会迅速耗尽 Prometheus 的内存。

我们的策略是:

  • path 进行规范化。例如,将 /products/123/products/456 归一化为 /products/:id。这必须在接收服务中完成。
  • 严格限制标签数量和基数。任何基数可能超过几百的维度(如 session_id)都绝对不能作为标签。
  • 使用 Histogram 类型来记录延迟,而不是 GaugeCounter。Histogram 可以在客户端聚合数据,只暴露分桶计数,极大地降低了存储和查询压力。

main.go:

package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
	"regexp"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// TelemetryPayload 对应 Qwik 客户端发送的结构
type TelemetryPayload struct {
	MetricName string            `json:"metricName"`
	Value      float64           `json:"value"`
	Tags       map[string]string `json:"tags"`
	Timestamp  int64             `json:"timestamp"` // 暂时未使用,但保留用于未来调试
}

// MetricConverter 负责将遥测数据转换为 Prometheus 指标
// 它内部维护一个动态的指标注册表,以处理不同名称和标签的指标
type MetricConverter struct {
	mu           sync.RWMutex
	histograms   map[string]*prometheus.HistogramVec
	pathRegex    *regexp.Regexp
}

func NewMetricConverter() *MetricConverter {
	// 这个正则表达式用于将动态路径参数化,是控制基数的关键
	// 例子: /users/123 -> /users/:id
	// 在生产环境中,这个规则集会更复杂
	regex := regexp.MustCompile(`/\d+`)

	return &MetricConverter{
		histograms: make(map[string]*prometheus.HistogramVec),
		pathRegex:  regex,
	}
}

// normalizePath 是一个关键的函数,用于降低 'path' 标签的基数
func (c *MetricConverter) normalizePath(path string) string {
	return c.pathRegex.ReplaceAllString(path, "/:id")
}

// IngestHandler 处理来自客户端的遥测数据
func (c *MetricConverter) IngestHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is accepted", http.StatusMethodNotAllowed)
		return
	}
	
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Error reading request body", http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	var payloads []TelemetryPayload
	if err := json.Unmarshal(body, &payloads); err != nil {
		http.Error(w, "Error decoding JSON", http.StatusBadRequest)
		log.Printf("Failed to decode JSON: %v, body: %s", err, string(body))
		return
	}

	for _, p := range payloads {
		c.processPayload(p)
	}

	w.WriteHeader(http.StatusOK)
}

func (c *MetricConverter) processPayload(p TelemetryPayload) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 规范化标签,特别是 'path'
	if path, ok := p.Tags["path"]; ok {
		p.Tags["path"] = c.normalizePath(path)
	}
	
	// 动态创建或获取 HistogramVec
	// 在真实项目中,我们应该对 metricName 和 tags 进行白名单验证,防止客户端恶意创建大量指标
	labels := make([]string, 0, len(p.Tags))
	for k := range p.Tags {
		labels = append(labels, k)
	}

	histo, ok := c.histograms[p.MetricName]
	if !ok {
		// 定义直方图的 buckets。这个选择对性能分析至关重要。
		// 对于 Web Vitals,我们需要更关注用户感知的低延迟范围。
		buckets := prometheus.DefBuckets
		if p.MetricName == "qwik_resume_duration_seconds" {
			buckets = []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5}
		}

		histo = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    p.MetricName,
				Help:    "Histogram for " + p.MetricName,
				Buckets: buckets,
			},
			labels,
		)
		// 注册新的 HistogramVec
		prometheus.MustRegister(histo)
		c.histograms[p.MetricName] = histo
	}

	// 观测数据点
	histo.With(p.Tags).Observe(p.Value)
}


func main() {
	converter := NewMetricConverter()

	// 启动一个 goroutine 来定期清理旧的、不活跃的指标,防止内存泄漏
	// Prometheus Go client v1.12.1+ 提供了更优雅的 Unregister/Reset 方式
	// 这里是一个简化的演示
	go func() {
		for {
			time.Sleep(15 * time.Minute)
			// 在真实场景中,这里应该有逻辑来识别和移除长时间未更新的时间序列
			// 例如,使用 `promhttp.HandlerOpts{DisableCompression: true}` 并解析输出
			log.Println("Periodic metric cleanup would run here.")
		}
	}()

	http.HandleFunc("/ingest", converter.IngestHandler)
	http.Handle("/metrics", promhttp.Handler())
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	log.Println("Starting telemetry ingest server on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

对应的 Dockerfile:

FROM golang:1.21-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o telemetry-ingestor .

FROM alpine:latest

RUN apk --no-cache add ca-certificates

WORKDIR /root/

COPY --from=builder /app/telemetry-ingestor .

EXPOSE 8080

CMD ["./telemetry-ingestor"]

第三步:部署到 AWS EKS 并集成 Prometheus

我们假设环境中已经安装了 Prometheus Operator,这是在 Kubernetes 中管理 Prometheus 实例的标准方式。

  1. 创建 Deployment 和 Service

    k8s/deployment.yaml:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: telemetry-ingestor
      namespace: monitoring
      labels:
        app: telemetry-ingestor
    spec:
      replicas: 2 # 至少两个副本保证高可用
      selector:
        matchLabels:
          app: telemetry-ingestor
      template:
        metadata:
          labels:
            app: telemetry-ingestor
        spec:
          containers:
          - name: ingestor
            # 替换为你的镜像仓库地址
            image: 123456789012.dkr.ecr.us-east-1.amazonaws.com/telemetry-ingestor:v0.1.0
            ports:
            - containerPort: 8080
              name: http
            resources:
              requests:
                cpu: "100m"
                memory: "128Mi"
              limits:
                cpu: "500m"
                memory: "256Mi"
            livenessProbe:
              httpGet:
                path: /healthz
                port: 8080
              initialDelaySeconds: 5
              periodSeconds: 10
            readinessProbe:
              httpGet:
                path: /healthz
                port: 8080
              initialDelaySeconds: 5
              periodSeconds: 10
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: telemetry-ingestor-svc
      namespace: monitoring
      labels:
        app: telemetry-ingestor
    spec:
      selector:
        app: telemetry-ingestor
      ports:
      - name: http
        port: 8080
        targetPort: http
  2. 创建 ServiceMonitor 让 Prometheus 发现

    k8s/servicemonitor.yaml:

    apiVersion: monitoring.coreos.com/v1
    kind: ServiceMonitor
    metadata:
      name: telemetry-ingestor-monitor
      namespace: monitoring
      labels:
        # 这个标签必须与你的 Prometheus Operator 的 serviceMonitorSelector 匹配
        release: prometheus
    spec:
      selector:
        matchLabels:
          app: telemetry-ingestor
      namespaceSelector:
        matchNames:
        - monitoring
      endpoints:
      - port: http
        path: /metrics
        interval: 15s # 抓取间隔

    应用这些 YAML 文件后,Prometheus Operator 会自动配置 Prometheus Server 来抓取我们 telemetry-ingestor-svc 服务的 /metrics 端点。

数据流架构图

整个遥测数据管道的流向可以用下图清晰地表示:

graph TD
    subgraph Browser
        A[Qwik App] -- 1. navigator.sendBeacon --> B{/api/telemetry/ingest};
    end
    
    subgraph AWS EKS Cluster
        B -- 2. Ingress/LoadBalancer --> C[telemetry-ingestor Service];
        C -- 3. Pod-to-Pod --> D[telemetry-ingestor Pods];
        E[Prometheus Server] -- 4. Scrapes /metrics --> D;
        F[Grafana] -- 5. PromQL Query --> E;
    end

    G[Developer/SRE] -- 6. Views Dashboard --> F;

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#f0ad4e,stroke:#333,stroke-width:2px
    style F fill:#5cb85c,stroke:#333,stroke-width:2px

局限性与未来迭代路径

这套自建管道虽然灵活且成本低廉,但并非完美。在真实生产环境中,它存在几个需要正视的局限性:

  1. 基数控制的脆弱性: 当前的路径规范化和标签白名单机制完全依赖于 telemetry-ingestor 服务的硬编码逻辑。如果前端应用引入了新的、未被预期的动态 URL 格式,可能会瞬间导致基数爆炸,进而影响整个 Prometheus 集群的稳定性。这需要严格的代码审查和持续的监控来防范。

  2. 数据采样与聚合的缺失: 目前的方案是全量上报。对于流量极高的应用,这可能会给用户的上行带宽和接收服务的处理能力带来压力。一个更健壮的系统应该在客户端实现智能采样,或者在边缘节点(如果使用CDN)进行初步聚合。

  3. 单点责任: telemetry-ingestor 服务承担了数据接收、转换、规范化和暴露的多重职责。更符合云原生理念的架构是将其拆分。例如,使用一个专门的接收网关,将数据写入到 Kafka 或 Kinesis 这样的消息队列中,再由一个或多个消费者应用进行处理和转换。这提高了系统的弹性和可扩展性。

未来的迭代方向可以考虑引入 OpenTelemetry Collector。将 telemetry-ingestor 替换为一个配置化的 OpenTelemetry Collector,它可以通过其 prometheusremotewrite 导出器直接将数据写入 Prometheus 或兼容的存储后端(如 Thanos, Mimir),同时利用其丰富的处理器(processors)来实现更复杂的采样、过滤和属性修改,从而将大部分定制化逻辑从代码中剥离到配置里,大大提升了可维护性。


  目录