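Our team recently took on a tricky task: building an NLP data-annotation tool where several annotators can work on the same document at once, Google Docs style, labeling entities, extracting relations, and so on. Every change has to sync in real time without conflicts. Traditional lock-based or "last write wins" strategies give a poor experience in such a high-frequency interactive setting, and last-write-wins can silently lose data. That requirement led us straight to CRDTs (Conflict-free Replicated Data Types).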
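The initial idea was simple, but the devil turned out to be in the details. We needed a backend that could reliably hold a large number of long-lived connections, a frontend that could react cleanly to complex state changes, and a CRDT implementation that behaves identically across two language runtimes (Scala on the JVM for the backend, JavaScript in the browser). After some exploration we settled on Scala + Akka Streams for the backend and React + MobX for the frontend. The combination is somewhat unconventional, but both the development experience and the runtime behavior turned out to fit this use case remarkably well.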
Technology choices: why Scala, Akka, and MobX?
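We passed over the mainstream options, Node.js or Spring WebFlux, and chose Scala and Akka mainly for how they handle concurrency and state. A collaborative editing session is essentially a stateful, long-lived stream-processing job, and Akka Streams models exactly that. Its back-pressure and graph DSL let us build robust, predictable pipelines declaratively, covering WebSocket connection setup, message handling, and teardown. More importantly, the Akka actor model is a natural fit for managing each collaborative document's state in isolation: an actor processes one message at a time, so the document state needs no locks or other manual synchronization.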
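On the frontend, once the collaborative state gets complex (document content, every user's cursor position, the annotation set, live NLP results), Redux's boilerplate and manually wired subscriptions become very tedious. MobX's trick is what it calls transparent functional reactive programming (TFRP): you mark state as observable and modify it inside actions, and every React component wrapped in observer that reads that state re-renders automatically, and only when the data it actually read has changed. This "change the state, the UI follows" model freed us from hand-written state-synchronization logic, so we could concentrate on the CRDT algorithm and the business logic itself.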
Core architecture: event stream and CRDT state machine
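The heart of the system is a WebSocket-based event pipeline. Every user action in the frontend (inserting or deleting a character, adding or removing an annotation) is turned into a CRDT operation object and sent to the backend. The backend applies the operation to the document's CRDT model and broadcasts it to every other client in the same session, and each client applies it to its local replica. Because of the algebraic properties CRDTs are built on (commutativity, associativity, idempotence), replicas that have applied the same set of operations end up in the same state, regardless of the order in which concurrent operations arrived.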
sequenceDiagram
participant UserA as Client A (MobX)
participant Server as Scala Backend (Akka)
participant UserB as Client B (MobX)
UserA->>Server: 1. Keystroke 'H' (generates InsertOp)
Server->>Server: 2. Apply InsertOp to document's CRDT state
Server-->>UserA: 3. Ack (optional)
Server-->>UserB: 4. Broadcast InsertOp
UserB->>Server: 5. Keystroke 'i' (generates InsertOp)
Server->>Server: 6. Apply InsertOp to document's CRDT state
Server-->>UserB: 7. Ack (optional)
Server-->>UserA: 8. Broadcast InsertOp
Note over UserA, UserB: Both apply received Ops to local CRDT.
Note over UserA, UserB: Both converge to "Hi", since B placed 'i' after the 'H' it had already received.
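The convergence claim can be checked with a toy model. The sketch below is not the project's actual CRDT: it assumes positions are integer arrays compared lexicographically, with the creating siteId as a tie-breaker, and it only covers inserts. It replays the diagram's two operations in both arrival orders; comparePos, compareChars and applyInsert are illustrative names.

```javascript
// Toy sequence CRDT (assumed scheme, inserts only): each character carries an
// immutable position (array of ints) and the siteId that created it.

// Lexicographic comparison of integer arrays; a strict prefix sorts first.
function comparePos(a, b) {
  for (let i = 0; i < Math.min(a.length, b.length); i++) {
    if (a[i] !== b[i]) return a[i] - b[i];
  }
  return a.length - b.length;
}

// Total order on characters: position first, then siteId as a deterministic tie-break.
function compareChars(x, y) {
  const c = comparePos(x.position, y.position);
  if (c !== 0) return c;
  return x.siteId < y.siteId ? -1 : x.siteId > y.siteId ? 1 : 0;
}

// Inserting keeps the replica sorted; re-applying an op that is already present is a no-op.
function applyInsert(doc, op) {
  if (doc.some(c => compareChars(c, op) === 0)) return doc;
  const i = doc.findIndex(c => compareChars(c, op) > 0);
  const next = doc.slice();
  next.splice(i === -1 ? next.length : i, 0, { char: op.char, position: op.position, siteId: op.siteId });
  return next;
}

const text = doc => doc.map(c => c.char).join("");

// The diagram's scenario: A inserts 'H'; B, having received it, places 'i' after it.
const opH = { type: "InsertOp", char: "H", position: [5], siteId: "A" };
const opI = { type: "InsertOp", char: "i", position: [7], siteId: "B" };

const replica1 = [opH, opI].reduce(applyInsert, []);
const replica2 = [opI, opH].reduce(applyInsert, []);
console.log(text(replica1), text(replica2)); // Hi Hi
```

Delivering the same operation twice is harmless here because of the duplicate check. Deletes would additionally need causal delivery (a delete must not overtake the insert it targets); since every operation passes through one server actor over ordered WebSocket connections, clients see operations in the server's order, which respects causality as long as none are dropped.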
Backend implementation: the power of Akka Streams
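The backend entry point is an Akka HTTP route that upgrades the HTTP request to a WebSocket connection. The neat part is that each WebSocket connection can be modeled as a single Flow[Message, Message, Any]: client messages flow in, server messages flow out.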
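We start a dedicated Akka actor (DocumentSessionActor) for every document session (docId). That actor owns the document's CRDT state and handles every operation on it; each new client connection is routed to the actor for its document.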
1. WebSocket routing and actor integration
// build.sbt dependencies
// "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
// "com.typesafe.akka" %% "akka-stream-typed" % "2.8.0",
// "com.typesafe.akka" %% "akka-http" % "10.5.0",
// "io.circe" %% "circe-core" % "0.14.5",
// "io.circe" %% "circe-generic" % "0.14.5",
// "io.circe" %% "circe-parser" % "0.14.5",
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.typed.scaladsl.ActorSource
import akka.stream.{Materializer, OverflowStrategy}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.parser.decode
import io.circe.syntax._
import io.circe.{Decoder, DecodingFailure, Encoder, Json}
import scala.collection.concurrent.TrieMap
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Simplified CRDT Operations Protocol
sealed trait CrdtOperation
case class InsertOp(char: Char, position: List[Int], siteId: String) extends CrdtOperation
case class DeleteOp(position: List[Int], siteId: String) extends CrdtOperation
case class AnnotateOp(startPos: List[Int], endPos: List[Int], label: String, siteId: String) extends CrdtOperation
// JSON codecs with a "type" discriminator ({"type":"InsertOp", ...}), the shape the frontend sends and expects
object CrdtOperation {
private val insertEnc = deriveEncoder[InsertOp]
private val deleteEnc = deriveEncoder[DeleteOp]
private val annotateEnc = deriveEncoder[AnnotateOp]
private val insertDec = deriveDecoder[InsertOp]
private val deleteDec = deriveDecoder[DeleteOp]
private val annotateDec = deriveDecoder[AnnotateOp]
private def tagged(json: Json, tpe: String): Json = json.deepMerge(Json.obj("type" -> Json.fromString(tpe)))
implicit val encoder: Encoder[CrdtOperation] = Encoder.instance {
case op: InsertOp => tagged(insertEnc(op), "InsertOp")
case op: DeleteOp => tagged(deleteEnc(op), "DeleteOp")
case op: AnnotateOp => tagged(annotateEnc(op), "AnnotateOp")
}
implicit val decoder: Decoder[CrdtOperation] = Decoder.instance { c =>
c.downField("type").as[String].flatMap {
case "InsertOp" => insertDec(c)
case "DeleteOp" => deleteDec(c)
case "AnnotateOp" => annotateDec(c)
case other => Left(DecodingFailure(s"Unknown CRDT operation type: $other", c.history))
}
}
}
// Protocol for communication with Actor
object DocumentSessionActor {
sealed trait Command
final case class UserJoined(userWs: ActorRef[CrdtOperation]) extends Command
final case class UserLeft(userWs: ActorRef[CrdtOperation]) extends Command
final case class OperationReceived(op: CrdtOperation, from: ActorRef[CrdtOperation]) extends Command
}
// `sessionBehavior` builds the per-document actor, e.g. docId => DocumentSessionActor(docId, nlpService)
class CollaborationRoutes(sessionBehavior: String => Behavior[DocumentSessionActor.Command])(implicit system: ActorSystem[_], mat: Materializer) {
private implicit val ec: ExecutionContext = system.executionContext
// Route handlers run concurrently, so the registry must be thread-safe.
// In a real app, this would be managed by a cluster-aware entity manager (e.g. Akka Cluster Sharding).
private val sessionActors = TrieMap.empty[String, ActorRef[DocumentSessionActor.Command]]
private def getOrCreateSessionActor(docId: String): ActorRef[DocumentSessionActor.Command] =
sessionActors.getOrElseUpdate(docId, {
system.log.info(s"Creating new session actor for document: $docId")
// A random name suffix avoids a name clash if two requests race on the same docId.
system.systemActorOf(sessionBehavior(docId), s"doc-session-${java.util.UUID.randomUUID()}")
})
def collaborationFlow(docId: String, siteId: String): Flow[Message, Message, Any] = {
val sessionActor = getOrCreateSessionActor(docId)
// Outgoing side: an actor-backed Source. preMaterialize() hands us its ActorRef before the
// WebSocket flow runs, which solves the classic problem of the incoming Sink not being able
// to see the Source's materialized value.
val (outgoingRef, outgoing) = ActorSource
.actorRef[CrdtOperation](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize = 256,
// Silently dropping ops would make this client diverge; failing the stream
// disconnects it instead, so it can reconnect and resync.
overflowStrategy = OverflowStrategy.fail
)
.map(op => TextMessage(op.asJson.noSpaces)) // Encode outgoing ops to JSON
.preMaterialize()
sessionActor ! DocumentSessionActor.UserJoined(outgoingRef)
system.log.info(s"Site $siteId joined document $docId")
// Incoming side: decode client JSON and forward each op to the session actor, tagged with this
// connection's ref so the actor can skip the sender when broadcasting.
val incoming: Sink[Message, Any] = Flow[Message]
.mapAsync(parallelism = 1) {
// Text frames may arrive streamed; collect each one into a single string.
case tm: TextMessage => tm.toStrict(3.seconds).map(strict => Option(strict.text))
// Binary frames are not part of the protocol, but must still be drained or the connection stalls.
case bm: BinaryMessage => bm.dataStream.runWith(Sink.ignore).map(_ => Option.empty[String])
}
.collect { case Some(json) => json }
.mapConcat { json =>
decode[CrdtOperation](json) match {
case Right(op) => List(op)
case Left(error) =>
system.log.error(s"Failed to decode CRDT operation: $error")
Nil
}
}
.to(Sink.foreach(op => sessionActor ! DocumentSessionActor.OperationReceived(op, outgoingRef)))
// Coupled: when either side terminates (e.g. the client disconnects), both do.
Flow.fromSinkAndSourceCoupled(incoming, outgoing)
.watchTermination() { (_, done) =>
done.onComplete(_ => sessionActor ! DocumentSessionActor.UserLeft(outgoingRef))
}
}
val routes =
path("ws" / "docs" / Segment) { docId =>
parameter("siteId") { siteId =>
handleWebSocketMessages(collaborationFlow(docId, siteId))
}
}
}
2. CRDT and NLP processing logic
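DocumentSessionActor maintains the current state of the document. To keep things simple, we assume a sequence CRDT built on position identifiers (similar to LSEQ): each character is stored together with a position identifier that is unique within the document and comparable with every other identifier, so the document is simply its characters sorted by identifier.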
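The article does not show how new identifiers are allocated. As a minimal sketch, assuming Logoot-style identifiers made of digits in a fixed base (BASE = 32 here, an arbitrary choice), a hypothetical generatePositionBetween could look like this. It requires prev to sort before next and assumes every existing identifier was produced by this function, so none ends in the digit 0; that invariant guarantees there is always room between two neighbors. The frontend's localInsert passes a third siteId argument, which this sketch ignores because ties between sites are broken separately by siteId.

```javascript
// Hypothetical allocator for Logoot-style positions: arrays of digits in [0, BASE).
// prev = [] means "start of document" and next = [] means "end of document".
const BASE = 32;

function generatePositionBetween(prev, next) {
  const pos = [];
  let prevBound = true; // pos so far equals a prefix of prev
  let nextBound = true; // pos so far equals a prefix of next
  for (let d = 0; ; d++) {
    // Exclusive bounds for the digit at depth d.
    const lo = prevBound && d < prev.length ? prev[d] : 0;
    const hi = nextBound && d < next.length ? next[d] : BASE;
    if (hi - lo > 1) {
      // Room at this level: pick a random digit strictly between lo and hi (always >= 1).
      pos.push(lo + 1 + Math.floor(Math.random() * (hi - lo - 1)));
      return pos;
    }
    // No room: copy lo and descend one level.
    pos.push(lo);
    if (d >= prev.length) prevBound = false; // pos now extends past prev, so pos > prev
    if (!(nextBound && d < next.length && next[d] === lo)) nextBound = false;
  }
}
```

Appending at the end of the document over and over makes identifiers grow: each random pick leaves, on average, about half of the remaining room at that level, so sequential typing keeps descending levels. That is the identifier-growth problem that production algorithms such as LSEQ address with smarter allocation strategies.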
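import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Future
import scala.math.Ordering.Implicits._ // lexicographic ordering for List[Int] and tuples
import scala.util.{Failure, Success}
// A simplified CRDT character representation
final case class CrdtChar(char: Char, position: List[Int], siteId: String)
// Hypothetical NLP client: entity spans are [start, end) character offsets into the analysed text
final case class EntitySpan(start: Int, end: Int, label: String)
trait NlpService { def analyze(text: String): Future[Seq[EntitySpan]] }
// Inside object DocumentSessionActor, next to the Command protocol shown earlier:
private final case class NlpCompleted(snapshot: Vector[CrdtChar], spans: Seq[EntitySpan]) extends Command
private final case class NlpFailed(error: Throwable) extends Command
def apply(docId: String, nlpService: NlpService): Behavior[Command] = Behaviors.setup { context =>
var participants = Set.empty[ActorRef[CrdtOperation]]
var document = Vector.empty[CrdtChar] // The state, kept sorted by (position, siteId)
Behaviors.receiveMessage {
case UserJoined(ws) => participants += ws; Behaviors.same
case UserLeft(ws) => participants -= ws; Behaviors.same
case OperationReceived(op, from) =>
// 1-2. Validation omitted; apply the op to the CRDT state (the core of the algorithm).
document = applyOperation(document, op)
// 3. Broadcast to everyone except the sender, which already applied it optimistically.
participants.filterNot(_ == from).foreach(_ ! op)
// 4. Run NLP off the actor thread; pipeToSelf turns the result into a message, so state is never touched from a Future callback.
if (!op.isInstanceOf[AnnotateOp]) {
val snapshot = document
context.pipeToSelf(nlpService.analyze(snapshot.map(_.char).mkString)) {
case Success(spans) => NlpCompleted(snapshot, spans)
case Failure(e) => NlpFailed(e)
}
}
Behaviors.same
case NlpCompleted(snapshot, spans) =>
// Offsets refer to `snapshot`. CRDT positions are stable IDs, so mapping through the
// snapshot stays valid even if the document changed while NLP was running.
spans.filter(s => 0 <= s.start && s.start < s.end && s.end <= snapshot.length).foreach { s =>
val op = AnnotateOp(snapshot(s.start).position, snapshot(s.end - 1).position, s.label, siteId = "nlp") // "nlp": hypothetical server siteId
participants.foreach(_ ! op)
}
Behaviors.same
case NlpFailed(e) =>
context.log.error("NLP analysis failed", e)
Behaviors.same
}
}
// Heavily simplified CRDT apply: insert in (position, siteId) order, delete by position.
private def applyOperation(doc: Vector[CrdtChar], op: CrdtOperation): Vector[CrdtChar] = op match {
case InsertOp(c, pos, site) =>
val (before, after) = doc.span(ch => (ch.position, ch.siteId) < ((pos, site)))
before ++ (CrdtChar(c, pos, site) +: after)
case DeleteOp(pos, _) => doc.filterNot(_.position == pos)
case _: AnnotateOp => doc // Annotations do not change the text
}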
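The challenge here is that the NLP service reports entity locations as character offsets (for example, characters 5 through 10), while our collaborative state is addressed by CRDT position identifiers. Before NLP results can be broadcast, those offsets must be mapped precisely back to CRDT positions. This mapping is non-trivial because the document may have changed between the moment its text was sent for analysis and the moment the results come back.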
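A minimal sketch of that mapping, assuming the server keeps the exact array of CRDT characters whose text it sent to the NLP service (the snapshot) and that NLP offsets are counted in the same unit as one CRDT character. The Scala model stores one Char, i.e. one UTF-16 code unit, per CrdtChar; a service that counts Unicode code points, as Python strings do, would need an extra conversion. spansToAnnotateOps and offsetOf are hypothetical helpers; the field names follow the article's AnnotateOp.

```javascript
// Hypothetical helpers. `snapshot` is the exact array of CRDT characters
// ({ char, position, siteId }) whose text was analysed; spans are [start, end)
// offsets into that text.
function spansToAnnotateOps(snapshot, spans, siteId) {
  const ops = [];
  for (const { start, end, label } of spans) {
    // Skip spans that are empty or fall outside the analysed text.
    if (!(0 <= start && start < end && end <= snapshot.length)) continue;
    ops.push({
      type: "AnnotateOp",
      startPos: snapshot[start].position, // first character of the entity
      endPos: snapshot[end - 1].position, // last character (inclusive)
      label,
      siteId,
    });
  }
  return ops;
}

// Reverse direction, for rendering: map a position back to its current offset,
// or undefined if that character has since been deleted.
function offsetOf(doc, position) {
  const key = position.join(".");
  const i = doc.findIndex(c => c.position.join(".") === key);
  return i === -1 ? undefined : i;
}
```

Because positions are immutable identifiers, an annotation computed on an old snapshot still points at the right characters after other users edit the document; offsetOf only has to look them up again at render time, and characters deleted in the meantime simply no longer resolve.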
Frontend implementation: MobX's reactive magic
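The frontend's DocumentStore is at the center of every interaction. It owns the WebSocket connection, holds the document's CRDT state, and exposes actions that modify that state.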
1. Structure of DocumentStore
// A simplified CRDT library for JS would be used here.
// generatePositionBetween() is assumed to come from it; "./crdt" is a hypothetical module path.
import { makeAutoObservable, runInAction } from "mobx";
import { generatePositionBetween } from "./crdt";
// Represents a character in the CRDT model
class CrdtChar {
constructor(char, position, siteId) {
this.char = char;
this.position = position;
this.siteId = siteId;
this.id = `${siteId}-${position.join('.')}`; // Unique key for React
}
}
export class DocumentStore {
// --- Observables: MobX will track changes to these ---
document = []; // Array of CrdtChar objects
annotations = []; // Array of { startPos, endPos, label }
connectionStatus = "disconnected";
siteId = Math.random().toString(36).substring(2);
ws = null;
constructor(docId) {
makeAutoObservable(this);
this.connect(docId);
}
// --- Actions: Methods that modify the state ---
connect(docId) {
if (this.ws) this.ws.close();
this.ws = new WebSocket(`ws://localhost:8080/ws/docs/${docId}?siteId=${this.siteId}`);
this.connectionStatus = "connecting";
this.ws.onopen = () => {
runInAction(() => {
this.connectionStatus = "connected";
console.log("WebSocket connection established.");
});
};
this.ws.onmessage = (event) => {
const op = JSON.parse(event.data);
this.applyRemoteOperation(op);
};
this.ws.onclose = () => {
runInAction(() => {
this.connectionStatus = "disconnected";
console.log("WebSocket connection closed.");
});
};
this.ws.onerror = (error) => {
console.error("WebSocket error:", error);
runInAction(() => this.connectionStatus = "error");
};
}
// Action triggered by a remote operation from the server
applyRemoteOperation(op) {
// This is where the magic happens. We just modify the observable array,
// and any React component using it will automatically update.
if (op.type === "InsertOp") {
const newChar = new CrdtChar(op.char, op.position, op.siteId);
// Find insert index based on CRDT position comparison
const index = this._findInsertIndex(op.position, op.siteId);
this.document.splice(index, 0, newChar);
} else if (op.type === "DeleteOp") {
this.document = this.document.filter(
char => JSON.stringify(char.position) !== JSON.stringify(op.position)
);
} else if (op.type === "AnnotateOp") {
// AnnotateOp carries startPos/endPos/label directly (see the Scala case class)
this.annotations.push({ startPos: op.startPos, endPos: op.endPos, label: op.label });
}
}
// Action triggered by local user input in the editor
localInsert(char, index) {
// 1. Determine the CRDT position for the new character
const prevPos = this.document[index - 1]?.position || []; // [] = start of document
const nextPos = this.document[index]?.position || []; // [] = end of document
const newPosition = generatePositionBetween(prevPos, nextPos, this.siteId); // CRDT magic
// 2. Create the operation object
const op = { type: "InsertOp", char, position: newPosition, siteId: this.siteId };
// 3. Send to server
if (this.connectionStatus === "connected") {
this.ws.send(JSON.stringify(op));
} else {
// In a real app, queue this operation for when connection is restored
console.warn("Not connected, operation not sent.");
}
// 4. Optimistic local update. This is why the UI feels instant.
const newChar = new CrdtChar(char, newPosition, this.siteId);
this.document.splice(index, 0, newChar);
}
// Binary search for the first character that sorts after (position, siteId):
// positions compare lexicographically (a prefix sorts first), ties are broken by siteId.
_findInsertIndex(position, siteId) {
const compare = (a, b) => {
for (let i = 0; i < Math.min(a.length, b.length); i++) {
if (a[i] !== b[i]) return a[i] - b[i];
}
return a.length - b.length;
};
let lo = 0;
let hi = this.document.length;
while (lo < hi) {
const mid = (lo + hi) >> 1;
const c = this.document[mid];
const cmp = compare(c.position, position) ||
(c.siteId < siteId ? -1 : c.siteId > siteId ? 1 : 0);
if (cmp <= 0) lo = mid + 1; else hi = mid;
}
return lo;
}
// --- Computed value: Derives data from state ---
get text() {
return this.document.map(c => c.char).join("");
}
}
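localInsert currently drops operations while the socket is down; its comment notes that a real app would queue them. A minimal sketch of such a queue, assuming a hypothetical Outbox class owned by the store: it buffers operations while offline and flushes them in order once the connection opens.

```javascript
// Hypothetical outbox for DocumentStore: buffers operations while the socket is
// not open and flushes them, in order, once it is.
class Outbox {
  constructor(send) {
    this.send = send;   // writes one serialized op, e.g. data => store.ws.send(data)
    this.pending = [];  // ops created while offline, oldest first
    this.online = false;
  }

  enqueue(op) {
    if (this.online) this.send(JSON.stringify(op));
    else this.pending.push(op);
  }

  setOnline(online) {
    this.online = online;
    if (!online) return;
    // Preserve order: a DeleteOp must never overtake the InsertOp it refers to.
    while (this.pending.length > 0) this.send(JSON.stringify(this.pending.shift()));
  }
}
```

Wiring it in would mean calling setOnline(true) in ws.onopen, setOnline(false) in onclose and onerror, and enqueue(op) in localInsert; the send callback should read the store's current ws, since connect() replaces the socket. This covers only the outgoing direction: operations other users made while this client was offline still have to be fetched from the server on reconnect, which the protocol shown here does not yet provide.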
2. React Component Integration
In React components we wrap the component with observer from mobx-react-lite, which lets it react to changes in DocumentStore's observable state.
import React from 'react';
import { observer } from 'mobx-react-lite';
import { DocumentStore } from './DocumentStore';
// Assume we have an instance of the store
const docStore = new DocumentStore('doc-123');
const Editor = observer(() => {
// Re-renders whenever docStore.text, docStore.connectionStatus, or docStore.annotations changes.
const handleChange = (e) => {
// Converting arbitrary textarea edits into insert/delete operations is complex.
// For simplicity, only plain typing (inputType "insertText", no selection) is handled;
// any other edit is undone by React, because the textarea is controlled by docStore.text.
const { inputType, data } = e.nativeEvent;
if (inputType !== "insertText" || !data) return;
const start = e.target.selectionStart - data.length; // the caret sits right after the typed text
data.split("").forEach((char, k) => docStore.localInsert(char, start + k));
};
return (
<div>
<div className={`status ${docStore.connectionStatus}`}>
Status: {docStore.connectionStatus}
</div>
{/* This is a very simplified representation. A real editor would use Slate.js or ProseMirror */}
<textarea
value={docStore.text} // The computed value provides the text
onChange={handleChange} // React expects onChange (not onInput) on a controlled field
style={{ width: '100%', height: '300px' }}
/>
<div>
<h3>Annotations (from NLP)</h3>
<ul>
{docStore.annotations.map((anno, i) => <li key={i}>{anno.label}</li>)}
</ul>
</div>
</div>
);
});
export default Editor;
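The handler above only handles simple insertions. One common way to also cover deletions, pastes, and replaced selections is to diff the old and new textarea values: trim the common prefix and suffix, and whatever remains in the middle is one contiguous deletion plus one contiguous insertion. A sketch, with diffText as a hypothetical helper:

```javascript
// Hypothetical helper: turn "old value -> new value" of a textarea into one
// contiguous delete plus one contiguous insert by trimming the common prefix
// and suffix. Covers typing, backspace, paste, and replacing a selection.
function diffText(oldText, newText) {
  let prefix = 0;
  const maxPrefix = Math.min(oldText.length, newText.length);
  while (prefix < maxPrefix && oldText[prefix] === newText[prefix]) prefix++;

  let suffix = 0;
  const maxSuffix = maxPrefix - prefix; // the suffix must not overlap the prefix
  while (
    suffix < maxSuffix &&
    oldText[oldText.length - 1 - suffix] === newText[newText.length - 1 - suffix]
  ) suffix++;

  return {
    index: prefix,                                            // where the change starts
    deleteCount: oldText.length - prefix - suffix,            // chars removed from oldText
    inserted: newText.slice(prefix, newText.length - suffix), // chars added at index
  };
}
```

The result maps directly onto CRDT operations: delete deleteCount characters starting at index, then insert the inserted characters one by one starting at index. The store shown above only has localInsert, so a matching localDelete would have to be added. When an edit is ambiguous, for example typing "a" inside "aa", the prefix rule may attribute it to a different offset than the caret; the resulting text is identical, and selectionStart can be used to break the tie.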
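This completely changed how we work. We no longer manage subscriptions by hand, dispatch updates, or use useEffect to keep state in sync; MobX does all of that for us. When applyRemoteOperation modifies the this.document array, the text computed value is recomputed and the Editor's textarea value updates automatically, with almost no glue code of our own. The development experience is remarkably smooth.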
Open issues and future iterations
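This implementation validates the core architecture, but it is still some way from production. First, the CRDT algorithm we use is heavily simplified. A production-grade sequence CRDT (such as Logoot, LSEQ, or YATA) is considerably more complex: it needs a deliberate identifier-allocation strategy and has to keep the identifier tree from growing explosively as edits pile up in one region.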
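Second, the memory footprint of the Scala backend's DocumentSessionActor is a potential bottleneck. If a document becomes very large, or a collaborative session runs for a long time, the actor's state keeps growing. Possible optimizations include periodically snapshotting the document state and garbage-collecting the operation log.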
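A minimal sketch of the snapshot-plus-compaction idea, assuming a pure applyOp(state, op) function (hypothetical; the actor sketched earlier updates its state directly) and an arbitrary default threshold of 500 operations: only operations received since the last snapshot are kept, and every snapshotEvery operations they are folded into a new snapshot.

```javascript
// Hypothetical sketch: keep a snapshot plus only the operations received since it,
// and fold the log into a new snapshot every `snapshotEvery` operations.
class SnapshottingLog {
  constructor(applyOp, snapshotEvery = 500) {
    this.applyOp = applyOp;           // pure (state, op) => newState
    this.snapshotEvery = snapshotEvery;
    this.snapshot = [];               // state after the first snapshotVersion ops
    this.snapshotVersion = 0;         // number of ops folded into the snapshot
    this.log = [];                    // ops received after the snapshot, in order
  }

  append(op) {
    this.log.push(op);
    if (this.log.length >= this.snapshotEvery) this.compact();
  }

  compact() {
    this.snapshot = this.log.reduce(this.applyOp, this.snapshot);
    this.snapshotVersion += this.log.length;
    this.log = []; // the folded ops can now be garbage-collected
  }

  currentState() {
    return this.log.reduce(this.applyOp, this.snapshot);
  }

  // What a late joiner or reconnecting client needs instead of the full history.
  catchUp() {
    return { snapshot: this.snapshot, version: this.snapshotVersion, ops: this.log.slice() };
  }
}
```

Dropping the folded operations is only safe once no replica still needs them individually; with a single server relaying every operation, a client that joins or reconnects can be sent catchUp() in place of the discarded history.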
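Finally, NLP analysis is currently tightly coupled to CRDT operation handling: it is triggered from inside the session actor on every change. With slow NLP models, this coupling (and any blocking call made on the actor's thread) delays operation broadcasts and hurts real-time responsiveness. A better architecture decouples NLP analysis into an independent service: after applying a change, DocumentSessionActor publishes a DocumentUpdated event to a message queue, a dedicated NLP consumer service processes it, and the results are pushed back to the frontend over a separate channel. This keeps the latency of the core collaborative editing loop unaffected by heavyweight computation.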
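One detail worth handling in such an NLP consumer: DocumentUpdated events arrive at keystroke rate, but only the newest version of each document is worth analysing. A sketch of a coalescing queue, assuming each event carries a docId and a monotonically increasing version number (both hypothetical fields; the article does not define the event's shape):

```javascript
// Hypothetical consumer-side coalescing: keep only the newest pending
// DocumentUpdated event per document, so bursts of keystrokes trigger one analysis.
class CoalescingNlpQueue {
  constructor() {
    this.latest = new Map(); // docId -> newest pending event
  }

  offer(event) {
    const current = this.latest.get(event.docId);
    // Late or duplicate events for an older version are ignored.
    if (!current || event.version > current.version) this.latest.set(event.docId, event);
  }

  // Hand out one job per document; superseded versions are never analysed.
  drain() {
    const jobs = [...this.latest.values()];
    this.latest.clear();
    return jobs;
  }
}
```

If results are anchored to CRDT positions rather than raw offsets, skipping intermediate versions loses nothing: the analysis of the newest snapshot supersedes the older ones.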