技术痛点:消失的请求与失控的链路
我们面临一个棘手的生产问题。一个用于现场质检的Web应用,用户通过平板在车间内拍摄零件照片并上传进行CV(计算机视觉)分析。车间的网络环境极不稳定,WiFi信号时断时续。业务方反馈大量“提交失败”和“任务状态未知”的工单。从后端监控看,我们只看到零星的请求入口,根本无法与用户报告的失败案例对应。整个流程,从用户点击上传那一刻,到CV服务处理完毕,对我们来说是一个巨大的黑盒。定位一个请求究竟是在用户设备上就失败了,还是在后端某个环节卡住了,耗时巨大且全凭猜测。
初步诊断指向两个核心矛盾:
- 前端韧性缺失:传统的HTTP请求在弱网环境下就是一场灾难。一次网络抖动就足以让一次几十MB的图片上传失败,且无法自动恢复。
- 后端可观测性黑洞:我们的Java后端由一个API网关、一个业务逻辑服务和一个独立的CV处理服务构成。服务间通过消息队列解耦,这在提高吞吐量的同时也切断了传统的调用链。一个请求的完整生命周期被分割在不同服务的日志文件中,无法串联。SkyWalking虽然部署了,但其追踪数据在跨越消息队列时出现了断裂,无法形成完整的调用链路。
目标很明确:构建一个既能抵抗网络波动,又能提供端到端可观测性的新架构。
初步构想与架构演进
我们的方案必须同时解决前端的“韧性”和后端的“可观测性”。
- 前端改造:引入Service Worker。利用其拦截网络请求和后台同步(Background Sync)的能力,将一次性的上传操作变为一个可保证最终送达的异步任务。即使用户关闭了浏览器,只要网络恢复,Service Worker就能在后台重新尝试上传。
- 后端打通:全链路追踪。关键在于如何将一个在浏览器中发起的操作,与其在后端分布式系统中的完整轨迹关联起来。我们需要一个全局唯一的Trace ID,它必须从Service Worker产生,一路透传到API网关、业务服务、消息队列,最终抵达CV处理服务。SkyWalking是实现这一目标的核心工具,但需要我们手动配合,确保追踪上下文(Trace Context)在跨越边界(浏览器->服务器,生产者->消费者)时不丢失。
ORM(我们使用JPA/Hibernate)在其中扮演着“状态记录器”的角色,它需要持久化每一个CV任务的元数据和处理状态,为异步查询和问题排查提供数据基础。
演进后的架构如下:
sequenceDiagram
participant PWA as 浏览器PWA
participant SW as Service Worker
participant GW as API网关 (Java)
participant MQ as 消息队列
participant Biz as 业务服务 (Java)
participant DB as 数据库
PWA->>+SW: 用户发起上传(携带图片)
Note right of SW: 1. 拦截请求
2. 生成全局Trace ID
3. 将任务存入IndexedDB
SW->>PWA: 立即返回“已接收”
SW->>SW: 注册后台同步事件('upload-task')
alt 网络正常
SW->>+GW: 发起fetch请求 (Header携带Trace ID)
else 网络异常
Note right of SW: 请求失败,等待sync事件
end
Note over SW, GW: 网络恢复后,sync事件触发...
SW->>+GW: 重新发起fetch请求 (Header携带Trace ID)
GW->>+Biz: 转发请求 (SkyWalking自动传递Trace Context)
Biz->>+DB: ORM: 创建CV任务记录 (状态: PENDING)
DB-->>-Biz: 返回任务ID
Biz->>+MQ: 发送CV处理消息 (携带Trace Context)
MQ-->>-Biz: 确认接收
Biz-->>-GW: 返回任务ID
GW-->>-SW: HTTP 202 Accepted
Note left of SW: 清理IndexedDB中的任务
SW-->>-PWA: (可选) 通过Push API通知PWA
participant CV as CV处理服务 (Java)
MQ->>+CV: 消费消息 (恢复Trace Context)
Note right of CV: 开始耗时的CV分析
CV->>+DB: ORM: 更新任务状态 (COMPLETED/FAILED)
DB-->>-CV: 确认更新
CV-->>-MQ: 确认消费
步骤化实现:代码是架构的最终表达
1. Service Worker:韧性的基石
首先是service-worker.js的实现。这里的核心是fetch事件拦截和sync事件处理。
install和activate阶段是标准的PWA缓存逻辑,此处略过。
fetch事件拦截器
当PWA发起上传请求时,我们拦截它。我们不直接发送,而是将任务持久化到IndexedDB,然后注册一个后台同步任务。这是确保数据不丢失的关键。
// service-worker.js
import { openDB } from 'idb';
const DB_NAME = 'cv-request-queue';
const STORE_NAME = 'requests';
const dbPromise = openDB(DB_NAME, 1, {
upgrade(db) {
if (!db.objectStoreNames.contains(STORE_NAME)) {
db.createObjectStore(STORE_NAME, { keyPath: 'id', autoIncrement: true });
}
},
});
// 拦截所有指向 /api/cv/upload 的POST请求
self.addEventListener('fetch', (event) => {
const url = new URL(event.request.url);
if (url.pathname === '/api/cv/upload' && event.request.method === 'POST') {
event.respondWith(
handleUploadRequest(event.request)
);
}
});
async function handleUploadRequest(request) {
try {
const requestData = await request.clone().formData(); // 克隆请求体以供后续使用
const db = await dbPromise;
// 关键:将请求数据存入IndexedDB,而不是立即发送
await db.put(STORE_NAME, {
url: request.url,
method: request.method,
body: requestData,
timestamp: Date.now()
});
// 注册一个后台同步标签
await self.registration.sync.register('upload-task');
// 立即向前端返回一个成功的响应,表示任务已被接受
// 这里的响应体可以自定义,告知前端任务已进入后台队列
return new Response(JSON.stringify({ message: 'Request queued for background sync' }), {
status: 202, // Accepted
headers: { 'Content-Type': 'application/json' }
});
} catch (error) {
console.error('Failed to queue request:', error);
// 如果连入队都失败了,则返回错误
return new Response(JSON.stringify({ error: 'Failed to queue request' }), {
status: 500,
headers: { 'Content-Type': 'application/json' }
});
}
}
sync事件处理器与Trace ID注入
当网络恢复且浏览器认为时机合适时,sync事件会被触发。在这里,我们从IndexedDB中读取所有待处理的请求并逐一发送。这是注入全链路追踪ID的最佳时机。我们将使用SkyWalking默认的sw8头格式。
// service-worker.js (续)
self.addEventListener('sync', (event) => {
if (event.tag === 'upload-task') {
event.waitUntil(processRequestQueue());
}
});
async function processRequestQueue() {
const db = await dbPromise;
const requests = await db.getAll(STORE_NAME);
if (!requests.length) {
return;
}
console.log(`Processing ${requests.length} queued requests...`);
for (const req of requests) {
try {
// 核心:生成并注入SkyWalking的sw8头
// 格式: 1-{traceId}-{traceSegmentId}-{spanId}-{parentService}-{parentInstance}-{parentEndpoint}-{clientIp}
const traceId = generateTraceId(); // 自定义一个足够唯一的ID生成器
const traceSegmentId = generateTraceId();
const spanId = 0; // 起始span
const parentService = 'PWA::ServiceWorker';
const parentInstance = self.registration.scope;
const parentEndpoint = '/sync/upload-task';
const sampled = 1; // 1表示采样
const sw8Header = `${sampled}-${btoa(traceId)}-${btoa(traceSegmentId)}-${spanId}-${btoa(parentService)}-${btoa(parentInstance)}-${btoa(parentEndpoint)}-${btoa('0.0.0.0:0')}`;
const headers = new Headers({ 'sw8': sw8Header });
const response = await fetch(req.url, {
method: req.method,
body: req.body,
headers: headers
});
if (!response.ok) {
// 如果后端返回非2xx,这是一个业务逻辑错误,不应重试
console.error(`Request ${req.id} failed with status ${response.status}. It will not be retried.`);
await db.delete(STORE_NAME, req.id); // 从队列中删除
} else {
console.log(`Request ${req.id} successfully sent.`);
await db.delete(STORE_NAME, req.id); // 成功后删除
}
} catch (error) {
// 网络错误或其他fetch异常,任务会保留在队列中,等待下一次sync事件
console.error(`Network error for request ${req.id}. It will be retried later.`, error);
// 中断循环,等待下一次sync
break;
}
}
}
// 简化的唯一ID生成器
function generateTraceId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
一个常见的错误是:在fetch拦截器中直接尝试发送请求,如果失败再存入队列。这种做法在弱网环境下会导致大量不必要的失败尝试,并可能因为页面刷新而丢失请求。正确的做法是“先入队,后发送”。
2. Java后端:接收Trace并跨越消息队列
我们的后端是基于Spring Boot的。首先确保项目中已引入SkyWalking Agent的依赖,并配置好启动参数。
API网关/业务服务
这个服务接收来自Service Worker的请求。好消息是,只要sw8头存在,SkyWalking Java Agent会自动解析它,并创建或延续一个追踪链路。我们几乎不需要为此编写任何代码。
// CvTaskController.java
@RestController
@RequestMapping("/api/cv")
@Slf4j
public class CvTaskController {
@Autowired
private CvTaskService cvTaskService;
@PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public ResponseEntity<Map<String, String>> submitCvTask(@RequestParam("image") MultipartFile image) {
// SkyWalking Agent 在进入此方法前已自动处理了 sw8 header
// 当前线程已处于一个被追踪的上下文中
log.info("Received new CV task submission.");
try {
String taskId = cvTaskService.createAndDispatchTask(image);
log.info("CV task created with ID: {}", taskId);
return ResponseEntity.accepted().body(Map.of("taskId", taskId));
} catch (IOException e) {
log.error("Failed to process uploaded image.", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("error", "Failed to process image file."));
}
}
}
ORM实体与服务层
使用JPA定义我们的任务实体。
// CvTask.java
@Entity
@Table(name = "cv_tasks")
@Data
public class CvTask {
@Id
@GeneratedValue(generator = "uuid2")
@GenericGenerator(name = "uuid2", strategy = "org.hibernate.id.UUIDGenerator")
private String id;
private String originalFilename;
private String storagePath;
@Enumerated(EnumType.STRING)
private TaskStatus status;
private String result;
@CreationTimestamp
private Instant createdAt;
@UpdateTimestamp
private Instant updatedAt;
public enum TaskStatus {
PENDING, PROCESSING, COMPLETED, FAILED
}
}
服务层的关键在于,在将任务发送到消息队列之前,必须捕获当前的SkyWalking追踪上下文,并将其附加到消息中。这是打通链路的第二个关键点。
// CvTaskService.java
@Service
@Slf4j
public class CvTaskService {
@Autowired
private CvTaskRepository taskRepository;
// 假设使用KafkaTemplate,其他MQ客户端同理
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String CV_TOPIC = "cv-analysis-topic";
@Transactional
public String createAndDispatchTask(MultipartFile image) throws IOException {
// 1. 保存文件(此处省略具体实现)
String storagePath = saveImageToFileSystem(image);
// 2. 使用ORM创建任务记录
CvTask task = new CvTask();
task.setOriginalFilename(image.getOriginalFilename());
task.setStoragePath(storagePath);
task.setStatus(CvTask.TaskStatus.PENDING);
CvTask savedTask = taskRepository.save(task);
log.info("Task {} saved to DB with status PENDING.", savedTask.getId());
// 3. 准备消息
CvTaskMessage message = new CvTaskMessage(savedTask.getId(), savedTask.getStoragePath());
// 4. 核心:捕获并传递SkyWalking上下文
// SkyWalking的 Kafka-plugin 会自动完成这个过程,但为了展示原理,我们手动演示
// 在真实项目中,确保引入了 skywalking-kafka-plugin 依赖即可。
// 如果是自定义或不支持的MQ,则需要手动操作:
String traceContext = ContextManager.getGlobalTraceId(); // 获取Trace ID
ProducerRecord<String, String> record = new ProducerRecord<>(CV_TOPIC, GSON.toJson(message));
// 将traceId注入到消息头中
record.headers().add("sw8-trace-id", traceContext.getBytes(StandardCharsets.UTF_8));
log.info("Dispatching task {} to Kafka. Trace context attached.", savedTask.getId());
kafkaTemplate.send(record);
return savedTask.getId();
}
// ... 省略文件保存等辅助方法
}
这里的坑在于:很多MQ的客户端插件(如Spring Cloud Stream)在发送消息时会创建新的线程,这会导致ThreadLocal中存储的SkyWalking上下文丢失。必须使用SkyWalking官方提供的插件(如apm-kafka-plugin),或者像上面那样手动捕获并注入TraceContext,然后在消费端手动恢复。幸运的是,主流MQ的官方插件已经帮我们处理好了这一切。
3. CV处理服务:恢复Trace并完成闭环
这个服务消费消息,执行CV分析,并更新数据库状态。
// CvTaskConsumer.java
@Service
@Slf4j
public class CvTaskConsumer {
@Autowired
private CvTaskRepository taskRepository;
// 假设这是我们的CV分析引擎
@Autowired
private CvAnalysisEngine analysisEngine;
@KafkaListener(topics = "cv-analysis-topic", groupId = "cv-processor-group")
public void handleCvTask(String messageJson, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// SkyWalking的Kafka插件会自动从消息头中读取上下文,并在此线程中恢复它。
// 所以从这里开始的所有日志和下游调用都会被关联到同一个Trace。
log.info("Consumed CV task message.");
CvTaskMessage message = GSON.fromJson(messageJson, CvTaskMessage.class);
// 1. 更新ORM实体状态为PROCESSING
CvTask task = taskRepository.findById(message.getTaskId()).orElse(null);
if (task == null) {
log.error("Task {} not found in DB. Discarding message.", message.getTaskId());
return;
}
task.setStatus(CvTask.TaskStatus.PROCESSING);
taskRepository.save(task);
log.info("Task {} status updated to PROCESSING.", task.getId());
try {
// 2. 模拟耗时的CV处理
// 这一步的耗时和内部调用都会被SkyWalking监控
String result = analysisEngine.analyze(task.getStoragePath());
// 3. 使用ORM更新最终结果
task.setResult(result);
task.setStatus(CvTask.TaskStatus.COMPLETED);
taskRepository.save(task);
log.info("Task {} processing completed successfully.", task.getId());
} catch (Exception e) {
log.error("CV analysis failed for task {}.", task.getId(), e);
task.setStatus(CvTask.TaskStatus.FAILED);
task.setResult(e.getMessage());
taskRepository.save(task);
}
}
}
现在,当我们在SkyWalking的UI中搜索由Service Worker生成的traceId时,我们将看到一条完整的链路:
- 一个起始于
PWA::ServiceWorker的LocalSpan。 - 一个
EntrySpan,表示请求进入API网关(CvTaskController)。 - 一个
ExitSpan,表示业务服务向Kafka发送消息。 - 一个
EntrySpan,表示CV处理服务从Kafka消费消息。 - 在CV服务内部的多个
LocalSpan,对应数据库的读写操作。 - 所有这些Span都属于同一个Trace ID,并按时间线和父子关系正确地组织在一起。
方案的局限性与未来展望
这套架构解决了最初的韧性和可观测性问题,但在真实项目中,它并非银弹。
首先,前端的追踪仍然是“半手动”的。我们只记录了从Service Worker发出请求那一刻开始的链路。用户在页面上的交互、数据在IndexedDB中等待的时间等,都没有被包含进来。要实现真正的前端到后端(Real User Monitoring)的完整链路,需要引入像OpenTelemetry JS这样的标准化前端监控库,它可以更精细地捕获浏览器端的性能指标并与后端Trace关联。
其次,后台同步(Background Sync)的调度是由浏览器决定的,我们无法精确控制其执行时机,对于有时效性要求的任务可能不适用。更复杂的场景可能需要结合Web Push API,由服务器在网络恢复时主动通知客户端开始上传。
最后,ORM在此架构中虽然工作良好,但在极高并发的场景下,对任务状态的频繁更新可能会成为数据库瓶颈。对于状态流转,未来可以探索使用事件溯源(Event Sourcing)模式,将状态变更记录为一系列事件,从而提高写入性能和系统的可审计性。