Building a CRDT-Based Real-Time Collaborative NLP Annotation Engine with Akka Streams and MobX


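Our team recently took on a tricky project: an NLP data annotation tool in which several annotators can work on the same document at the same time, as in Google Docs, doing entity annotation, relation extraction, and similar tasks. Every change had to be synchronized in real time and without conflicts. Traditional strategies such as locking or "last write wins" give a poor experience under this kind of high-frequency interaction. The latter can even lose data, because it silently discards concurrent edits. That requirement led us straight to CRDTs (Conflict-free Replicated Data Types).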

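Our initial idea was simple, but we soon found that the devil was in the details. We needed three things:

- a backend that could reliably handle a large number of long-lived connections;
- a frontend that could respond gracefully to complex state changes;
- a CRDT implementation that works across two runtimes (the JVM on the backend and JavaScript on the frontend).

After some exploration we settled on Scala + Akka Streams for the backend and React + MobX for the frontend. The combination may sound unconventional. In practice, both the development experience and the performance fit this scenario almost as if it had been tailor-made.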

Technology Choices: Why Scala, Akka, and MobX?

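We passed on the mainstream options, Node.js and Spring WebFlux, in favor of Scala and Akka. The main reason was how much weight we placed on concurrency and state management.

A collaborative editing session is, at its core, a stateful, long-lived stream-processing job, and Akka Streams models exactly that. Its back-pressure mechanism and Graph DSL let us build robust, predictable data-flow pipelines declaratively. They handle WebSocket connection setup, message processing, and teardown with little ceremony.

Just as importantly, the Akka actor model is a natural fit for managing the independent state of each collaborative document. Each actor owns its state and processes one message at a time. That gives us isolation by construction and avoids the locks and synchronization that make multithreaded code hard to get right.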

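On the frontend, the collaborative state gets complex: document content, several users' cursor positions, annotation sets, and live NLP analysis results. At that point Redux's boilerplate and manual subscription-based updates become very tedious.

MobX's magic lies in what it calls "transparent functional reactive programming" (TFRP). You mark state as observable and modify it inside actions. Any React component wrapped in observer that reads that state then re-renders automatically, and only when the state it actually read has changed. This "change the state, and the UI updates itself" model freed us from hand-written synchronization logic, so we could concentrate on the CRDT algorithm and the business logic.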

Core Architecture: An Event Stream and a CRDT State Machine

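The core of the system is a WebSocket-based event pipeline:

1. Every user action in the frontend (inserting or deleting a character, adding or removing an annotation) is turned into a CRDT operation object and sent to the backend.
2. The backend applies the operation to the document's CRDT model.
3. The backend broadcasts the operation to every other client in the same session.
4. Each client applies the operation to its local replica of the CRDT model.

CRDTs are built on specific mathematical properties: commutativity, associativity, and idempotence. Because of them, all clients converge to the same state once they have applied the same set of operations, whatever order those operations arrived in. The one ordering constraint is causal: an operation must not be applied before an operation it depends on, such as a delete arriving before the insert that created its character. Routing every operation through a single session actor over ordered WebSocket connections provides this, as long as no messages are dropped.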

sequenceDiagram
    participant UserA as Client A (MobX)
    participant Server as Scala Backend (Akka)
    participant UserB as Client B (MobX)

    UserA->>Server: 1. Keystroke 'H' (generates InsertOp)
    Server->>Server: 2. Apply InsertOp to document's CRDT state
    Server-->>UserA: 3. Ack (optional)
    Server-->>UserB: 4. Broadcast InsertOp

    UserB->>Server: 5. Keystroke 'i' (generates InsertOp)
    Server->>Server: 6. Apply InsertOp to document's CRDT state
    Server-->>UserB: 7. Ack (optional)
    Server-->>UserA: 8. Broadcast InsertOp

    Note over UserA, UserB: Both apply received Ops to local CRDT.
    Note over UserA, UserB: B typed 'i' after receiving 'H', so its position sorts after 'H' and both converge to "Hi".
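To make the convergence argument concrete, here is a minimal JavaScript sketch of the insert-only case from the diagram. It assumes, as in the simplified model used throughout this article, two things:

- a character is identified by its position array plus the siteId that created it;
- document order is lexicographic on positions, with siteId as a tie-breaker.

The Replica class and the helper names are hypothetical. Deletes, which would additionally need tombstones, are left out.

```javascript
// Lexicographic comparison of position arrays; a shorter prefix sorts first.
function comparePositions(a, b) {
  const n = Math.min(a.length, b.length);
  for (let i = 0; i < n; i++) {
    if (a[i] !== b[i]) return a[i] < b[i] ? -1 : 1;
  }
  return Math.sign(a.length - b.length);
}

// Total order on characters: position first, then siteId as a tie-breaker.
function compareChars(a, b) {
  const byPosition = comparePositions(a.position, b.position);
  if (byPosition !== 0) return byPosition;
  return a.siteId < b.siteId ? -1 : a.siteId > b.siteId ? 1 : 0;
}

// Hypothetical insert-only replica: its state is a set of characters keyed by id.
class Replica {
  constructor() {
    this.chars = new Map();
  }

  apply(op) {
    // The same (siteId, position) always names the same character, so applying
    // an operation twice leaves the state unchanged (idempotence).
    this.chars.set(`${op.siteId}-${op.position.join(".")}`, op);
  }

  text() {
    // The set of characters does not depend on arrival order; sorting by
    // identifier recovers the document order.
    return [...this.chars.values()].sort(compareChars).map((c) => c.char).join("");
  }
}

const opH = { type: "InsertOp", char: "H", position: [10], siteId: "A" };
const opI = { type: "InsertOp", char: "i", position: [20], siteId: "B" };

const replicaA = new Replica();
replicaA.apply(opH);
replicaA.apply(opI);

const replicaB = new Replica();
replicaB.apply(opI); // opposite arrival order...
replicaB.apply(opH);
replicaB.apply(opH); // ...plus a duplicate delivery

console.log(replicaA.text(), replicaB.text()); // prints "Hi Hi"
```

Replica B sees the operations in the opposite order and receives one of them twice, yet both replicas read "Hi". Storing characters as a set keyed by identifier makes application order-independent and idempotent. Sorting by position then rebuilds the text.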

Backend Implementation: The Power of Akka Streams

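The backend's entry point is an Akka HTTP route that upgrades HTTP requests to WebSocket connections. The neat part is that each WebSocket connection can be modeled as a Flow[Message, Message, Any]: messages from the client flow in, and messages for the client flow out.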

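Our design starts one dedicated Akka actor (DocumentSessionActor) per document session (docId). This actor holds the document's CRDT state and handles every operation on that document. When a client connects, it is routed to the corresponding actor.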

1. WebSocket Routing and Actor Integration

// build.sbt dependencies
// "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
// "com.typesafe.akka" %% "akka-stream-typed" % "2.8.0",
// "com.typesafe.akka" %% "akka-http" % "10.5.0",
// "io.circe" %% "circe-core" % "0.14.5",
// "io.circe" %% "circe-generic" % "0.14.5",
// "io.circe" %% "circe-parser" % "0.14.5",

import akka.actor.typed.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.typed.scaladsl.ActorSource
import akka.stream.{Materializer, OverflowStrategy}
import io.circe.generic.auto._ // derives the Encoder/Decoder for the CrdtOperation ADT
import io.circe.parser.decode
import io.circe.syntax._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

// Simplified CRDT Operations Protocol
sealed trait CrdtOperation
case class InsertOp(char: Char, position: List[Int], siteId: String) extends CrdtOperation
case class DeleteOp(position: List[Int], siteId: String) extends CrdtOperation
case class AnnotateOp(startPos: List[Int], endPos: List[Int], label: String, siteId: String) extends CrdtOperation

// Protocol for communication with Actor
object DocumentSessionActor {
  sealed trait Command
  final case class UserJoined(userWs: ActorRef[CrdtOperation]) extends Command
  final case class UserLeft(userWs: ActorRef[CrdtOperation]) extends Command
  final case class OperationReceived(op: CrdtOperation, from: ActorRef[CrdtOperation]) extends Command
}

class CollaborationRoutes(implicit system: ActorSystem[_], mat: Materializer) {
  // Needed by the Future callbacks (toStrict, watchTermination) below.
  private implicit val ec: ExecutionContext = system.executionContext

  // A simplified map to hold actors for active documents. Route handlers run
  // concurrently, so access is synchronized. In a real app, this would be
  // managed by a cluster-aware entity manager.
  private val sessionActors =
    scala.collection.mutable.Map.empty[String, ActorRef[DocumentSessionActor.Command]]

  private def getOrCreateSessionActor(docId: String): ActorRef[DocumentSessionActor.Command] =
    sessionActors.synchronized {
      sessionActors.getOrElseUpdate(docId, {
        system.log.info(s"Creating new session actor for document: $docId")
        // Spawn the DocumentSessionActor behavior here (e.g. with system.systemActorOf).
        // Elided in this excerpt: ??? throws NotImplementedError if reached.
        ???
      })
    }

  def collaborationFlow(docId: String, siteId: String): Flow[Message, Message, Any] = {
    val sessionActor = getOrCreateSessionActor(docId)

    // Classic Akka Streams challenge: the sink must tag incoming ops with this
    // connection's outgoing ActorRef, but a Source only yields that ref once it is
    // materialized. preMaterialize() materializes the source up front and returns
    // both the ref and a Source that can still be wired into the Flow.
    val (outgoingActorRef, source) = ActorSource
      .actorRef[CrdtOperation](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize = 256,
        // dropHead silently discards the oldest buffered ops when a client is slow,
        // so that client misses operations and diverges. Production code needs a
        // resync path, or a failing strategy that forces the client to reconnect.
        overflowStrategy = OverflowStrategy.dropHead
      )
      .map(op => TextMessage(op.asJson.noSpaces)) // Encode outgoing ops to JSON
      .preMaterialize()

    sessionActor ! DocumentSessionActor.UserJoined(outgoingActorRef)

    // Incoming side: decode client messages and forward them to the session actor.
    val sink = Flow[Message]
      .mapAsync(parallelism = 1) {
        // Text frames may arrive streamed; collect each into a single string.
        case tm: TextMessage => tm.toStrict(3.seconds).map(strict => Option(strict.text))
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore) // unused frames must still be drained
          Future.successful(Option.empty[String])
      }
      .collect { case Some(jsonString) => jsonString }
      .mapConcat { jsonString =>
        // Decode JSON string to our CRDT operation type; log and skip bad input
        decode[CrdtOperation](jsonString) match {
          case Right(op) => List(op)
          case Left(error) =>
            system.log.error(s"Failed to decode CRDT operation: $error")
            Nil
        }
      }
      .to(Sink.foreach(op => sessionActor ! DocumentSessionActor.OperationReceived(op, outgoingActorRef)))

    // Coupled: when either side terminates (e.g. the client disconnects), the other
    // does too. At that point, unregister this connection from the session actor.
    Flow.fromSinkAndSourceCoupled(sink, source).watchTermination() { (_, done) =>
      done.onComplete(_ => sessionActor ! DocumentSessionActor.UserLeft(outgoingActorRef))
    }
  }

  val routes: Route =
    path("ws" / "docs" / Segment) { docId =>
      parameter("siteId") { siteId =>
        handleWebSocketMessages(collaborationFlow(docId, siteId))
      }
    }
}

2. CRDT and NLP Processing Logic

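DocumentSessionActor maintains the document's current state. To keep things simple, we assume a sequence CRDT based on position identifiers, similar to LSEQ. Each character is more than a character: it also carries a position identifier that is unique within the document and totally ordered.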
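The frontend code in this article calls a generatePositionBetween() helper from an unspecified CRDT library. As a rough illustration of how such identifiers can be allocated, here is a simplified, loosely Logoot-style allocator in JavaScript. Positions are arrays of integer digits compared lexicographically. A new position is found by descending until there is a free digit strictly between the two neighbors.

This is a hypothetical sketch with several assumptions:

- The BASE value and the random digit choice are assumed values, not taken from any library.
- The siteId argument is omitted, because ties are broken by siteId at comparison time.
- Unlike real Logoot or LSEQ, digits are not paired with site identifiers, so the two inputs must differ as digit arrays.

```javascript
// Assumed digit range per level; LSEQ, for example, varies this with depth.
const BASE = 1 << 16;

// Lexicographic comparison of position arrays; a shorter prefix sorts first.
function comparePositions(a, b) {
  const n = Math.min(a.length, b.length);
  for (let i = 0; i < n; i++) {
    if (a[i] !== b[i]) return a[i] < b[i] ? -1 : 1;
  }
  return Math.sign(a.length - b.length);
}

// Returns a new position p with prev < p < next.
// [] as prev means "start of document"; [] as next means "end of document".
// Precondition: prev < next as digit arrays.
function generatePositionBetween(prev, next) {
  const result = [];
  let boundedByNext = true; // result still matches next digit for digit
  for (let depth = 0; ; depth++) {
    const lo = depth < prev.length ? prev[depth] : 0;
    const hi = boundedByNext && depth < next.length ? next[depth] : BASE;
    if (hi - lo > 1) {
      // Free slot at this level: choose a random digit strictly between lo and hi.
      result.push(lo + 1 + Math.floor(Math.random() * (hi - lo - 1)));
      return result;
    }
    // No room at this level: copy lo and descend one level.
    result.push(lo);
    if (lo < hi) boundedByNext = false; // result is now strictly below next
  }
}

// Usage: build a 1,000-character document by inserting at random indexes.
const positions = [];
for (let i = 0; i < 1000; i++) {
  const index = Math.floor(Math.random() * (positions.length + 1));
  const prev = positions[index - 1] || [];
  const next = positions[index] || [];
  positions.splice(index, 0, generatePositionBetween(prev, next));
}
```

Random insertion points are the worst case for ordering. Even so, every neighboring pair stays strictly ordered, because each new digit lies strictly between lo and hi at the first depth that has room.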

import akka.actor.typed.ActorRef
import scala.util.{Failure, Success}

// A simplified CRDT character representation
case class CrdtChar(char: Char, position: List[Int], siteId: String)

// Inside DocumentSessionActor (sketch: the Akka Typed behavior wiring, an implicit
// ExecutionContext, a `log`, and the NlpService type are elided)
class DocumentSessionActor(docId: String, nlpService: NlpService) extends /* ... Akka Actor behavior ... */ {
  private var participants: Set[ActorRef[CrdtOperation]] = Set.empty
  private var document: List[CrdtChar] = List.empty // The state, ordered by position

  // When an OperationReceived(op, from) is received:
  // 1. Validate the operation.
  // 2. Apply the operation to the `document` list. This means locating the correct
  //    insertion point, or the character to delete, by comparing `position` identifiers.
  //    This is the core of the CRDT algorithm.
  // 3. Broadcast the operation to all other participants:
  //    participants.filterNot(_ == from).foreach(_ ! op)
  // 4. Trigger NLP analysis asynchronously.

  private def applyOperation(op: CrdtOperation): Unit = {
    // ... CRDT logic here ...
    val documentChanged = true // based on op
    if (documentChanged) {
      val currentText = document.map(_.char).mkString
      // onComplete runs on another thread, so the callback must not read or mutate
      // actor state such as `participants`. In Akka Typed, context.pipeToSelf turns
      // the Future's outcome into a message handled on the actor's own thread.
      nlpService.analyze(currentText).onComplete {
        case Success(annotations) =>
          // Create AnnotateOp from NLP results and broadcast them.
          // This requires converting character offsets to CRDT positions.
        case Failure(e) => log.error("NLP analysis failed", e)
      }
    }
  }
}

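The challenge here is a mismatch of coordinate systems. The NLP service reports entity locations as character offsets (for example, characters 5 through 10), while our collaborative state is keyed by CRDT position identifiers. Before broadcasting NLP results, we therefore have to translate those offsets back into exact CRDT position identifiers. The mapping is non-trivial because the offsets describe the text as it was when analysis started, and the document may have changed since then.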
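One way to make that mapping robust is to capture the position identifiers together with the text at the moment analysis is requested. The returned offsets are then translated against that snapshot. CRDT positions never change once assigned, so the resulting AnnotateOp still points at the intended characters even if others have edited the document in the meantime. Characters deleted during the analysis would still need separate handling.

This JavaScript sketch matches the frontend code, under these assumptions:

- takeSnapshot and entitiesToAnnotateOps are hypothetical helpers.
- The NLP service returns { start, end, label } spans whose end offset is exclusive.
- Offsets count one unit per CrdtChar.
- "nlp" is a made-up siteId for the NLP service.

```javascript
// Captures the text the NLP service will see together with the CRDT position
// of every character, so offsets can be translated later.
function takeSnapshot(document) {
  return {
    text: document.map((c) => c.char).join(""),
    positions: document.map((c) => c.position),
  };
}

// Translates hypothetical NLP spans { start, end, label } (end exclusive) into
// AnnotateOp objects whose startPos/endPos are CRDT positions (both inclusive).
function entitiesToAnnotateOps(snapshot, entities, siteId) {
  const ops = [];
  for (const { start, end, label } of entities) {
    // Skip empty spans and spans that fall outside the analysed snapshot.
    if (start < 0 || start >= end || end > snapshot.positions.length) continue;
    ops.push({
      type: "AnnotateOp",
      startPos: snapshot.positions[start],
      endPos: snapshot.positions[end - 1], // last character inside the span
      label,
      siteId,
    });
  }
  return ops;
}

// Usage: characters carry increasing positions, as a CRDT would assign them.
const doc = [..."Hello Alice"].map((ch, i) => ({ char: ch, position: [10 * (i + 1)], siteId: "A" }));
const snapshot = takeSnapshot(doc);
const ops = entitiesToAnnotateOps(snapshot, [{ start: 6, end: 11, label: "PERSON" }], "nlp");
// ops[0].startPos is [70] (the "A"), ops[0].endPos is [110] (the final "e")
```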

Frontend Implementation: MobX's Reactive Magic

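The frontend's DocumentStore is the hub of all interaction. It maintains the WebSocket connection, holds the document's CRDT state, and exposes actions for modifying that state.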

1. The DocumentStore Structure

// A simplified CRDT library for JS would be used here, for example one that
// provides generatePositionBetween(prevPos, nextPos, siteId). That helper is
// assumed to be imported from such a library; it is not defined in this file.

import { makeAutoObservable, runInAction } from "mobx";

// Represents a character in the CRDT model
class CrdtChar {
  constructor(char, position, siteId) {
    this.char = char;
    this.position = position;
    this.siteId = siteId;
    this.id = `${siteId}-${position.join('.')}`; // Unique key for React
  }
}

// Total order on CRDT characters: lexicographic on the position arrays (a shorter
// prefix sorts first), with siteId breaking ties between identical positions.
function compareCrdtIds(posA, siteA, posB, siteB) {
  const n = Math.min(posA.length, posB.length);
  for (let i = 0; i < n; i++) {
    if (posA[i] !== posB[i]) return posA[i] - posB[i];
  }
  if (posA.length !== posB.length) return posA.length - posB.length;
  return siteA < siteB ? -1 : siteA > siteB ? 1 : 0;
}

export class DocumentStore {
  // --- Observables: MobX will track changes to these ---
  document = []; // Array of CrdtChar objects, kept in CRDT order
  annotations = []; // Array of { startPos, endPos, label }
  connectionStatus = "disconnected";
  siteId = Math.random().toString(36).substring(2);
  ws = null;

  constructor(docId) {
    makeAutoObservable(this);
    this.connect(docId);
  }

  // --- Actions: Methods that modify the state ---
  connect(docId) {
    if (this.ws) this.ws.close();

    this.ws = new WebSocket(`ws://localhost:8080/ws/docs/${docId}?siteId=${this.siteId}`);
    this.connectionStatus = "connecting";

    this.ws.onopen = () => {
      runInAction(() => {
        this.connectionStatus = "connected";
        console.log("WebSocket connection established.");
      });
    };

    this.ws.onmessage = (event) => {
      // Assumes the server sends flat JSON objects with a "type" field.
      const op = JSON.parse(event.data);
      this.applyRemoteOperation(op);
    };

    this.ws.onclose = () => {
      runInAction(() => {
        this.connectionStatus = "disconnected";
        console.log("WebSocket connection closed.");
      });
    };

    this.ws.onerror = (error) => {
      console.error("WebSocket error:", error);
      runInAction(() => this.connectionStatus = "error");
    };
  }

  // Action triggered by a remote operation from the server
  applyRemoteOperation(op) {
    // We only modify observable state here; any observer component that reads
    // it re-renders automatically.
    if (op.type === "InsertOp") {
      const newChar = new CrdtChar(op.char, op.position, op.siteId);
      // Find the insert index by comparing CRDT identifiers
      const index = this._findInsertIndex(op.position, op.siteId);
      this.document.splice(index, 0, newChar);
    } else if (op.type === "DeleteOp") {
      this.document = this.document.filter(
        (char) => JSON.stringify(char.position) !== JSON.stringify(op.position)
      );
    } else if (op.type === "AnnotateOp") {
      // AnnotateOp carries startPos, endPos and label as top-level fields
      this.annotations.push({ startPos: op.startPos, endPos: op.endPos, label: op.label });
    }
  }

  // Action triggered by local user input in the editor
  localInsert(char, index) {
    // 1. Determine the CRDT position for the new character.
    //    [] stands for the document start (prev) or end (next).
    const prevPos = this.document[index - 1]?.position || []; // Simplified boundary
    const nextPos = this.document[index]?.position || [];     // Simplified boundary
    const newPosition = generatePositionBetween(prevPos, nextPos, this.siteId); // from the CRDT library

    // 2. Create the operation object
    const op = { type: "InsertOp", char, position: newPosition, siteId: this.siteId };

    // 3. Send to server
    if (this.connectionStatus === "connected") {
      this.ws.send(JSON.stringify(op));
    } else {
      // In a real app, queue this operation for when connection is restored
      console.warn("Not connected, operation not sent.");
    }

    // 4. Optimistic local update. This is why the UI feels instant.
    const newChar = new CrdtChar(char, newPosition, this.siteId);
    this.document.splice(index, 0, newChar);
  }

  // Binary search for the first character that does not sort before the given
  // identifier; inserting there keeps the document in CRDT order.
  _findInsertIndex(position, siteId) {
    let lo = 0;
    let hi = this.document.length;
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      const c = this.document[mid];
      if (compareCrdtIds(c.position, c.siteId, position, siteId) < 0) lo = mid + 1;
      else hi = mid;
    }
    return lo;
  }

  // --- Computed value: Derives data from state ---
  get text() {
    return this.document.map(c => c.char).join("");
  }
}
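One detail the store glosses over is the wire format. The store reads and writes flat objects with a type field. The backend, however, encodes and decodes operations with circe. With circe's automatic generic derivation (io.circe.generic.auto._), a sealed-trait value is encoded by default as a single-key object named after the subtype, such as {"InsertOp":{"char":"H","position":[1],"siteId":"a"}}.

There are two ways to reconcile this:

- The server switches to a discriminator field, for example with circe-generic-extras' Configuration.default.withDiscriminator("type").
- The client translates between the two shapes.

The hypothetical toWire/fromWire helpers below sketch the client-side option.

```javascript
// Converts between the client's flat { type, ...fields } operations and the
// single-key wrapper object that circe's default ADT derivation uses.
function toWire(op) {
  const { type, ...fields } = op;
  return JSON.stringify({ [type]: fields });
}

function fromWire(json) {
  const parsed = JSON.parse(json);
  const keys = Object.keys(parsed);
  if (keys.length !== 1) {
    throw new Error(`Expected a single-key wrapper object, got: ${json}`);
  }
  const type = keys[0];
  return { type, ...parsed[type] };
}

// Usage
const wire = toWire({ type: "InsertOp", char: "H", position: [1], siteId: "a" });
console.log(wire); // prints {"InsertOp":{"char":"H","position":[1],"siteId":"a"}}
console.log(fromWire(wire).type); // prints InsertOp
```

In the store, this.ws.send(toWire(op)) and this.applyRemoteOperation(fromWire(event.data)) would then replace the plain JSON.stringify and JSON.parse calls.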

2. React Component Integration

In the React component, we wrap the component in the observer function from mobx-react-lite. This lets the component react to changes in DocumentStore's observable state.

import React from 'react';
import { observer } from 'mobx-react-lite';
import { DocumentStore } from './DocumentStore';

// Assume we have an instance of the store
const docStore = new DocumentStore('doc-123');

const Editor = observer(() => {
  // Re-renders whenever docStore.text, docStore.connectionStatus or
  // docStore.annotations changes, because the render reads all three.

  const handleChange = (e) => {
    // Converting arbitrary textarea edits into insert/delete operations is complex.
    // For simplicity, only single-character insertions are handled here; any other
    // edit is ignored and the controlled value snaps back to docStore.text.
    const { inputType, data } = e.nativeEvent;
    if (inputType !== 'insertText' || !data || data.length !== 1) return;
    // After the input, selectionStart sits just past the inserted character.
    const index = e.target.selectionStart - 1;
    docStore.localInsert(data, index);
  };

  return (
    <div>
      <div className={`status ${docStore.connectionStatus}`}>
        Status: {docStore.connectionStatus}
      </div>
      {/* This is a very simplified representation. A real editor would use Slate.js or ProseMirror */}
      <textarea
        value={docStore.text} // The computed value provides the text
        onChange={handleChange} // React fires onChange on every input event
        style={{ width: '100%', height: '300px' }}
      />
      <div>
        <h3>Annotations (from NLP)</h3>
        <ul>
          {docStore.annotations.map((anno, i) => <li key={i}>{anno.label}</li>)}
        </ul>
      </div>
    </div>
  );
});

export default Editor;

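This completely changed how we work. We no longer manage subscriptions by hand, dispatch updates, or synchronize state with useEffect; MobX handles all of that. When applyRemoteOperation modifies the this.document array, the derived text changes, and the Editor component's textarea value updates automatically, with almost no glue code of our own. The development experience is remarkably smooth.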

Open Issues and Future Iterations

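This implementation validates the feasibility of the core architecture, but it is still some way from production. First, the CRDT algorithm we use is heavily simplified. A production-grade sequence CRDT (such as Logoot, LSEQ, or YATA) is far more complex. It has to deal with identifier allocation strategies, prevent explosive growth of the identifier tree, and more.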
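LSEQ, for instance, tackles identifier growth with two ideas:

- The digit range doubles at each depth of the identifier tree.
- New digits are placed within a small, bounded distance of one neighbor. The "boundary+" strategy stays close to the left neighbor, which favors typing forward. The "boundary-" strategy stays close to the right neighbor, which favors typing backward. The strategy for each depth is chosen at random.

Below is a minimal, hypothetical JavaScript sketch of that allocation scheme. The starting base of 16 and the boundary of 10 are assumed values. Like the simplified model in this article, the sketch leaves out the per-digit site identifiers a real implementation needs.

```javascript
// Assumed parameters: 2^4 = 16 slots at depth 0, doubling at each level.
const INITIAL_BASE_BITS = 4;
const BOUNDARY = 10;

function baseAt(depth) {
  return 2 ** (INITIAL_BASE_BITS + depth);
}

// One randomly chosen strategy per depth, remembered for later allocations.
const strategies = new Map();
function strategyAt(depth) {
  if (!strategies.has(depth)) {
    strategies.set(depth, Math.random() < 0.5 ? "boundary+" : "boundary-");
  }
  return strategies.get(depth);
}

// Picks a digit strictly between lo and hi (requires hi - lo > 1), at most
// BOUNDARY slots away from the neighbor the strategy favors.
function pickDigit(lo, hi, strategy) {
  const step = Math.min(hi - lo - 1, BOUNDARY);
  const offset = 1 + Math.floor(Math.random() * step);
  return strategy === "boundary+" ? lo + offset : hi - offset;
}

// Allocates a position strictly between prev and next ([] = document start/end).
function allocate(prev, next) {
  const result = [];
  let boundedByNext = true; // result still matches next digit for digit
  for (let depth = 0; ; depth++) {
    const lo = depth < prev.length ? prev[depth] : 0;
    const hi = boundedByNext && depth < next.length ? next[depth] : baseAt(depth);
    if (hi - lo > 1) {
      result.push(pickDigit(lo, hi, strategyAt(depth)));
      return result;
    }
    result.push(lo);
    if (lo < hi) boundedByNext = false; // result is now strictly below next
  }
}

// Lexicographic comparison of position arrays; a shorter prefix sorts first.
function comparePositions(a, b) {
  const n = Math.min(a.length, b.length);
  for (let i = 0; i < n; i++) {
    if (a[i] !== b[i]) return a[i] < b[i] ? -1 : 1;
  }
  return Math.sign(a.length - b.length);
}

// Usage: type 300 characters forward and report how deep the identifiers got.
// The result varies with the random strategy choices.
const typed = [];
for (let i = 0; i < 300; i++) {
  typed.push(allocate(typed[typed.length - 1] || [], []));
}
console.log("deepest identifier:", Math.max(...typed.map((p) => p.length)));
```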

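Second, the memory footprint of DocumentSessionActor on the Scala backend is a potential bottleneck. If a document grows very large, or a collaborative session runs for a long time, the actor's state keeps growing. Possible optimizations include periodically snapshotting the document state and garbage-collecting the operation log.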
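Below is a minimal sketch of snapshot-plus-log bookkeeping. It is written in JavaScript to match the other examples here; in the real system it would live inside DocumentSessionActor. OpLog and its methods are hypothetical.

The idea:

- Every operation gets a sequence number.
- Clients acknowledge the sequence numbers they have applied.
- A log entry can be discarded once a snapshot covers it and every connected client has acknowledged it.

The same "everyone has seen it" condition is the usual precondition for safely purging the tombstones of deleted characters in a sequence CRDT.

```javascript
// Hypothetical operation log with snapshots, as a session actor might keep it.
class OpLog {
  constructor() {
    this.entries = [];         // { seq, op } in the order the session applied them
    this.nextSeq = 1;
    this.snapshot = { seq: 0, state: null };
    this.ackedSeq = new Map(); // clientId -> highest seq that client has applied
  }

  append(op) {
    const seq = this.nextSeq++;
    this.entries.push({ seq, op });
    return seq;
  }

  ack(clientId, seq) {
    this.ackedSeq.set(clientId, Math.max(seq, this.ackedSeq.get(clientId) ?? 0));
  }

  // Disconnected clients must be forgotten, or they would block compaction forever.
  leave(clientId) {
    this.ackedSeq.delete(clientId);
  }

  // `state` must reflect exactly the ops up to and including `seq`.
  takeSnapshot(state, seq) {
    this.snapshot = { seq, state };
  }

  // Drops entries that the snapshot covers and every connected client has acknowledged.
  compact() {
    const safeSeq = Math.min(this.snapshot.seq, ...this.ackedSeq.values());
    this.entries = this.entries.filter((e) => e.seq > safeSeq);
  }

  // A client that last applied `lastSeq` replays the retained ops it missed or,
  // if those were compacted away, starts from the snapshot.
  catchUp(lastSeq) {
    const oldestRetained = this.entries.length > 0 ? this.entries[0].seq : this.nextSeq;
    if (lastSeq >= oldestRetained - 1) {
      return { snapshot: null, ops: this.entries.filter((e) => e.seq > lastSeq) };
    }
    return { snapshot: this.snapshot, ops: this.entries.filter((e) => e.seq > this.snapshot.seq) };
  }
}

// Usage
const demo = new OpLog();
demo.append({ type: "InsertOp", char: "H", position: [10], siteId: "A" });
demo.ack("clientA", 1);
demo.takeSnapshot({ text: "H" }, 1);
demo.compact();
console.log(demo.entries.length); // prints 0
```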

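Finally, NLP analysis is still tightly coupled to CRDT operation processing: every change triggers a full-text analysis from inside DocumentSessionActor. The call is issued asynchronously, but slow NLP models let these requests pile up. Handling their results in the same actor then competes with operation broadcasting, which hurts real-time responsiveness.

A better architecture decouples NLP analysis into a separate service:

1. After applying a change, DocumentSessionActor publishes a DocumentUpdated event to a message queue.
2. A dedicated NLP consumer service processes the event.
3. The consumer pushes the results back to the frontend over a separate channel.

This keeps the low latency of collaborative editing unaffected by heavyweight computation.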
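The subtle part of that decoupling is staleness. Results for an old version of the document must not overwrite newer ones, and analyzing every keystroke wastes work.

Below is a minimal JavaScript sketch of a consumer that coalesces updates per document and tags each result with the version it was computed from. It rests on these assumptions:

- LatestVersionNlpWorker, the analyze function, and the DocumentUpdated event shape ({ docId, version, text }) are hypothetical.
- A real deployment would read events from the message queue rather than receive them through a method call.

```javascript
// Hypothetical in-process stand-in for the NLP consumer service.
class LatestVersionNlpWorker {
  constructor(analyze, publishResult) {
    this.analyze = analyze;             // async (text) => [{ start, end, label }]
    this.publishResult = publishResult; // pushes results back over a separate channel
    this.pending = new Map();           // docId -> newest event not yet analyzed
    this.running = new Set();           // docIds currently being analyzed
  }

  onDocumentUpdated(event) {
    // Coalesce: a newer version replaces any pending older one.
    this.pending.set(event.docId, event);
    if (!this.running.has(event.docId)) void this.drain(event.docId);
  }

  async drain(docId) {
    this.running.add(docId);
    try {
      while (this.pending.has(docId)) {
        const event = this.pending.get(docId);
        this.pending.delete(docId);
        try {
          const entities = await this.analyze(event.text);
          // Tag results with their version so receivers can drop stale ones.
          this.publishResult({ docId, version: event.version, entities });
        } catch (err) {
          console.error(`NLP analysis failed for ${docId} v${event.version}`, err);
        }
      }
    } finally {
      this.running.delete(docId);
    }
  }
}

// Usage with a fake analyzer that takes 10 ms per call.
const analyzedTexts = [];
const published = [];
const worker = new LatestVersionNlpWorker(
  async (text) => {
    analyzedTexts.push(text);
    await new Promise((resolve) => setTimeout(resolve, 10));
    return [];
  },
  (result) => published.push(result)
);
worker.onDocumentUpdated({ docId: "doc-123", version: 1, text: "H" });
worker.onDocumentUpdated({ docId: "doc-123", version: 2, text: "Hi" });
worker.onDocumentUpdated({ docId: "doc-123", version: 3, text: "Hi!" });
// Version 2 is superseded before its analysis starts, so only 1 and 3 are analyzed.
```

Because every result carries its version, the receiver (the session actor or the frontend) can discard any result older than a version it has already applied.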

