构建从Service Worker到Java后端CV服务的全链路追踪与韧性架构


技术痛点:消失的请求与失控的链路

我们面临一个棘手的生产问题。一个用于现场质检的Web应用,用户通过平板在车间内拍摄零件照片并上传进行CV(计算机视觉)分析。车间的网络环境极不稳定,WiFi信号时断时续。业务方反馈大量“提交失败”和“任务状态未知”的工单。从后端监控看,我们只看到零星的请求入口,根本无法与用户报告的失败案例对应。整个流程,从用户点击上传那一刻,到CV服务处理完毕,对我们来说是一个巨大的黑盒。定位一个请求究竟是在用户设备上就失败了,还是在后端某个环节卡住了,耗时巨大且全凭猜测。

初步诊断指向两个核心矛盾:

  1. 前端韧性缺失:传统的HTTP请求在弱网环境下就是一场灾难。一次网络抖动就足以让一次几十MB的图片上传失败,且无法自动恢复。
  2. 后端可观测性黑洞:我们的Java后端由一个API网关、一个业务逻辑服务和一个独立的CV处理服务构成。服务间通过消息队列解耦,这在提高吞吐量的同时也切断了传统的调用链。一个请求的完整生命周期被分割在不同服务的日志文件中,无法串联。SkyWalking虽然部署了,但其追踪数据在跨越消息队列时出现了断裂,无法形成完整的调用链路。

目标很明确:构建一个既能抵抗网络波动,又能提供端到端可观测性的新架构。

初步构想与架构演进

我们的方案必须同时解决前端的“韧性”和后端的“可观测性”。

  1. 前端改造:引入Service Worker。利用其拦截网络请求和后台同步(Background Sync)的能力,将一次性的上传操作变为一个可保证最终送达的异步任务。即使用户关闭了浏览器,只要网络恢复,Service Worker就能在后台重新尝试上传。
  2. 后端打通:全链路追踪。关键在于如何将一个在浏览器中发起的操作,与其在后端分布式系统中的完整轨迹关联起来。我们需要一个全局唯一的Trace ID,它必须从Service Worker产生,一路透传到API网关、业务服务、消息队列,最终抵达CV处理服务。SkyWalking是实现这一目标的核心工具,但需要我们手动配合,确保追踪上下文(Trace Context)在跨越边界(浏览器->服务器,生产者->消费者)时不丢失。

ORM(我们使用JPA/Hibernate)在其中扮演着“状态记录器”的角色,它需要持久化每一个CV任务的元数据和处理状态,为异步查询和问题排查提供数据基础。

演进后的架构如下:

sequenceDiagram
    participant PWA as 浏览器PWA
    participant SW as Service Worker
    participant GW as API网关 (Java)
    participant MQ as 消息队列
    participant Biz as 业务服务 (Java)
    participant DB as 数据库
    
    PWA->>+SW: 用户发起上传(携带图片)
    Note right of SW: 1. 拦截请求
2. 生成全局Trace ID
3. 将任务存入IndexedDB SW->>PWA: 立即返回“已接收” SW->>SW: 注册后台同步事件('upload-task') alt 网络正常 SW->>+GW: 发起fetch请求 (Header携带Trace ID) else 网络异常 Note right of SW: 请求失败,等待sync事件 end Note over SW, GW: 网络恢复后,sync事件触发... SW->>+GW: 重新发起fetch请求 (Header携带Trace ID) GW->>+Biz: 转发请求 (SkyWalking自动传递Trace Context) Biz->>+DB: ORM: 创建CV任务记录 (状态: PENDING) DB-->>-Biz: 返回任务ID Biz->>+MQ: 发送CV处理消息 (携带Trace Context) MQ-->>-Biz: 确认接收 Biz-->>-GW: 返回任务ID GW-->>-SW: HTTP 202 Accepted Note left of SW: 清理IndexedDB中的任务 SW-->>-PWA: (可选) 通过Push API通知PWA participant CV as CV处理服务 (Java) MQ->>+CV: 消费消息 (恢复Trace Context) Note right of CV: 开始耗时的CV分析 CV->>+DB: ORM: 更新任务状态 (COMPLETED/FAILED) DB-->>-CV: 确认更新 CV-->>-MQ: 确认消费

步骤化实现:代码是架构的最终表达

1. Service Worker:韧性的基石

首先是service-worker.js的实现。这里的核心是fetch事件拦截和sync事件处理。

installactivate阶段是标准的PWA缓存逻辑,此处略过。

fetch事件拦截器

当PWA发起上传请求时,我们拦截它。我们不直接发送,而是将任务持久化到IndexedDB,然后注册一个后台同步任务。这是确保数据不丢失的关键。

// service-worker.js

import { openDB } from 'idb';

const DB_NAME = 'cv-request-queue';
const STORE_NAME = 'requests';

const dbPromise = openDB(DB_NAME, 1, {
  upgrade(db) {
    if (!db.objectStoreNames.contains(STORE_NAME)) {
      db.createObjectStore(STORE_NAME, { keyPath: 'id', autoIncrement: true });
    }
  },
});

// 拦截所有指向 /api/cv/upload 的POST请求
self.addEventListener('fetch', (event) => {
  const url = new URL(event.request.url);
  if (url.pathname === '/api/cv/upload' && event.request.method === 'POST') {
    event.respondWith(
      handleUploadRequest(event.request)
    );
  }
});

async function handleUploadRequest(request) {
  try {
    const requestData = await request.clone().formData(); // 克隆请求体以供后续使用
    const db = await dbPromise;
    
    // 关键:将请求数据存入IndexedDB,而不是立即发送
    await db.put(STORE_NAME, {
      url: request.url,
      method: request.method,
      body: requestData,
      timestamp: Date.now()
    });

    // 注册一个后台同步标签
    await self.registration.sync.register('upload-task');

    // 立即向前端返回一个成功的响应,表示任务已被接受
    // 这里的响应体可以自定义,告知前端任务已进入后台队列
    return new Response(JSON.stringify({ message: 'Request queued for background sync' }), {
      status: 202, // Accepted
      headers: { 'Content-Type': 'application/json' }
    });

  } catch (error) {
    console.error('Failed to queue request:', error);
    // 如果连入队都失败了,则返回错误
    return new Response(JSON.stringify({ error: 'Failed to queue request' }), {
      status: 500,
      headers: { 'Content-Type': 'application/json' }
    });
  }
}

sync事件处理器与Trace ID注入

当网络恢复且浏览器认为时机合适时,sync事件会被触发。在这里,我们从IndexedDB中读取所有待处理的请求并逐一发送。这是注入全链路追踪ID的最佳时机。我们将使用SkyWalking默认的sw8头格式。

// service-worker.js (续)

self.addEventListener('sync', (event) => {
  if (event.tag === 'upload-task') {
    event.waitUntil(processRequestQueue());
  }
});

async function processRequestQueue() {
  const db = await dbPromise;
  const requests = await db.getAll(STORE_NAME);
  if (!requests.length) {
    return;
  }
  
  console.log(`Processing ${requests.length} queued requests...`);

  for (const req of requests) {
    try {
      // 核心:生成并注入SkyWalking的sw8头
      // 格式: 1-{traceId}-{traceSegmentId}-{spanId}-{parentService}-{parentInstance}-{parentEndpoint}-{clientIp}
      const traceId = generateTraceId(); // 自定义一个足够唯一的ID生成器
      const traceSegmentId = generateTraceId();
      const spanId = 0; // 起始span
      const parentService = 'PWA::ServiceWorker';
      const parentInstance = self.registration.scope;
      const parentEndpoint = '/sync/upload-task';
      const sampled = 1; // 1表示采样

      const sw8Header = `${sampled}-${btoa(traceId)}-${btoa(traceSegmentId)}-${spanId}-${btoa(parentService)}-${btoa(parentInstance)}-${btoa(parentEndpoint)}-${btoa('0.0.0.0:0')}`;
      
      const headers = new Headers({ 'sw8': sw8Header });

      const response = await fetch(req.url, {
        method: req.method,
        body: req.body,
        headers: headers
      });

      if (!response.ok) {
        // 如果后端返回非2xx,这是一个业务逻辑错误,不应重试
        console.error(`Request ${req.id} failed with status ${response.status}. It will not be retried.`);
        await db.delete(STORE_NAME, req.id); // 从队列中删除
      } else {
        console.log(`Request ${req.id} successfully sent.`);
        await db.delete(STORE_NAME, req.id); // 成功后删除
      }
    } catch (error) {
      // 网络错误或其他fetch异常,任务会保留在队列中,等待下一次sync事件
      console.error(`Network error for request ${req.id}. It will be retried later.`, error);
      // 中断循环,等待下一次sync
      break; 
    }
  }
}

// 简化的唯一ID生成器
function generateTraceId() {
  return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}

一个常见的错误是:在fetch拦截器中直接尝试发送请求,如果失败再存入队列。这种做法在弱网环境下会导致大量不必要的失败尝试,并可能因为页面刷新而丢失请求。正确的做法是“先入队,后发送”。

2. Java后端:接收Trace并跨越消息队列

我们的后端是基于Spring Boot的。首先确保项目中已引入SkyWalking Agent的依赖,并配置好启动参数。

API网关/业务服务

这个服务接收来自Service Worker的请求。好消息是,只要sw8头存在,SkyWalking Java Agent会自动解析它,并创建或延续一个追踪链路。我们几乎不需要为此编写任何代码。

// CvTaskController.java
@RestController
@RequestMapping("/api/cv")
@Slf4j
public class CvTaskController {

    @Autowired
    private CvTaskService cvTaskService;

    @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public ResponseEntity<Map<String, String>> submitCvTask(@RequestParam("image") MultipartFile image) {
        // SkyWalking Agent 在进入此方法前已自动处理了 sw8 header
        // 当前线程已处于一个被追踪的上下文中
        log.info("Received new CV task submission.");

        try {
            String taskId = cvTaskService.createAndDispatchTask(image);
            log.info("CV task created with ID: {}", taskId);
            return ResponseEntity.accepted().body(Map.of("taskId", taskId));
        } catch (IOException e) {
            log.error("Failed to process uploaded image.", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                 .body(Map.of("error", "Failed to process image file."));
        }
    }
}

ORM实体与服务层

使用JPA定义我们的任务实体。

// CvTask.java
@Entity
@Table(name = "cv_tasks")
@Data
public class CvTask {

    @Id
    @GeneratedValue(generator = "uuid2")
    @GenericGenerator(name = "uuid2", strategy = "org.hibernate.id.UUIDGenerator")
    private String id;

    private String originalFilename;
    private String storagePath;
    
    @Enumerated(EnumType.STRING)
    private TaskStatus status;

    private String result;

    @CreationTimestamp
    private Instant createdAt;

    @UpdateTimestamp
    private Instant updatedAt;

    public enum TaskStatus {
        PENDING, PROCESSING, COMPLETED, FAILED
    }
}

服务层的关键在于,在将任务发送到消息队列之前,必须捕获当前的SkyWalking追踪上下文,并将其附加到消息中。这是打通链路的第二个关键点。

// CvTaskService.java
@Service
@Slf4j
public class CvTaskService {

    @Autowired
    private CvTaskRepository taskRepository;
    
    // 假设使用KafkaTemplate,其他MQ客户端同理
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String CV_TOPIC = "cv-analysis-topic";

    @Transactional
    public String createAndDispatchTask(MultipartFile image) throws IOException {
        // 1. 保存文件(此处省略具体实现)
        String storagePath = saveImageToFileSystem(image);

        // 2. 使用ORM创建任务记录
        CvTask task = new CvTask();
        task.setOriginalFilename(image.getOriginalFilename());
        task.setStoragePath(storagePath);
        task.setStatus(CvTask.TaskStatus.PENDING);
        CvTask savedTask = taskRepository.save(task);
        log.info("Task {} saved to DB with status PENDING.", savedTask.getId());

        // 3. 准备消息
        CvTaskMessage message = new CvTaskMessage(savedTask.getId(), savedTask.getStoragePath());
        
        // 4. 核心:捕获并传递SkyWalking上下文
        // SkyWalking的 Kafka-plugin 会自动完成这个过程,但为了展示原理,我们手动演示
        // 在真实项目中,确保引入了 skywalking-kafka-plugin 依赖即可。
        // 如果是自定义或不支持的MQ,则需要手动操作:
        String traceContext = ContextManager.getGlobalTraceId(); // 获取Trace ID
        ProducerRecord<String, String> record = new ProducerRecord<>(CV_TOPIC, GSON.toJson(message));
        // 将traceId注入到消息头中
        record.headers().add("sw8-trace-id", traceContext.getBytes(StandardCharsets.UTF_8)); 
        
        log.info("Dispatching task {} to Kafka. Trace context attached.", savedTask.getId());
        kafkaTemplate.send(record);

        return savedTask.getId();
    }
    
    // ... 省略文件保存等辅助方法
}

这里的坑在于:很多MQ的客户端插件(如Spring Cloud Stream)在发送消息时会创建新的线程,这会导致ThreadLocal中存储的SkyWalking上下文丢失。必须使用SkyWalking官方提供的插件(如apm-kafka-plugin),或者像上面那样手动捕获并注入TraceContext,然后在消费端手动恢复。幸运的是,主流MQ的官方插件已经帮我们处理好了这一切。

3. CV处理服务:恢复Trace并完成闭环

这个服务消费消息,执行CV分析,并更新数据库状态。

// CvTaskConsumer.java
@Service
@Slf4j
public class CvTaskConsumer {

    @Autowired
    private CvTaskRepository taskRepository;
    
    // 假设这是我们的CV分析引擎
    @Autowired
    private CvAnalysisEngine analysisEngine;

    @KafkaListener(topics = "cv-analysis-topic", groupId = "cv-processor-group")
    public void handleCvTask(String messageJson, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        // SkyWalking的Kafka插件会自动从消息头中读取上下文,并在此线程中恢复它。
        // 所以从这里开始的所有日志和下游调用都会被关联到同一个Trace。
        log.info("Consumed CV task message.");
        CvTaskMessage message = GSON.fromJson(messageJson, CvTaskMessage.class);

        // 1. 更新ORM实体状态为PROCESSING
        CvTask task = taskRepository.findById(message.getTaskId()).orElse(null);
        if (task == null) {
            log.error("Task {} not found in DB. Discarding message.", message.getTaskId());
            return;
        }
        task.setStatus(CvTask.TaskStatus.PROCESSING);
        taskRepository.save(task);
        log.info("Task {} status updated to PROCESSING.", task.getId());

        try {
            // 2. 模拟耗时的CV处理
            // 这一步的耗时和内部调用都会被SkyWalking监控
            String result = analysisEngine.analyze(task.getStoragePath());

            // 3. 使用ORM更新最终结果
            task.setResult(result);
            task.setStatus(CvTask.TaskStatus.COMPLETED);
            taskRepository.save(task);
            log.info("Task {} processing completed successfully.", task.getId());

        } catch (Exception e) {
            log.error("CV analysis failed for task {}.", task.getId(), e);
            task.setStatus(CvTask.TaskStatus.FAILED);
            task.setResult(e.getMessage());
            taskRepository.save(task);
        }
    }
}

现在,当我们在SkyWalking的UI中搜索由Service Worker生成的traceId时,我们将看到一条完整的链路:

  1. 一个起始于PWA::ServiceWorkerLocalSpan
  2. 一个EntrySpan,表示请求进入API网关(CvTaskController)。
  3. 一个ExitSpan,表示业务服务向Kafka发送消息。
  4. 一个EntrySpan,表示CV处理服务从Kafka消费消息。
  5. 在CV服务内部的多个LocalSpan,对应数据库的读写操作。
  6. 所有这些Span都属于同一个Trace ID,并按时间线和父子关系正确地组织在一起。

方案的局限性与未来展望

这套架构解决了最初的韧性和可观测性问题,但在真实项目中,它并非银弹。

首先,前端的追踪仍然是“半手动”的。我们只记录了从Service Worker发出请求那一刻开始的链路。用户在页面上的交互、数据在IndexedDB中等待的时间等,都没有被包含进来。要实现真正的前端到后端(Real User Monitoring)的完整链路,需要引入像OpenTelemetry JS这样的标准化前端监控库,它可以更精细地捕获浏览器端的性能指标并与后端Trace关联。

其次,后台同步(Background Sync)的调度是由浏览器决定的,我们无法精确控制其执行时机,对于有时效性要求的任务可能不适用。更复杂的场景可能需要结合Web Push API,由服务器在网络恢复时主动通知客户端开始上传。

最后,ORM在此架构中虽然工作良好,但在极高并发的场景下,对任务状态的频繁更新可能会成为数据库瓶颈。对于状态流转,未来可以探索使用事件溯源(Event Sourcing)模式,将状态变更记录为一系列事件,从而提高写入性能和系统的可审计性。


  目录