构建基于 Phoenix Channels 和 Tornado 的异构实时 GitHub CI 日志流系统


最初的需求听起来很简单:为我们的 CI/CD 流程构建一个实时的日志查看器。当 GitHub Actions 触发一个工作流时,我们希望开发人员能在一个 Web 界面上看到实时滚动的日志输出。最初的原型是一个简单的 Python Flask 应用,它接收 GitHub Webhook,执行一个子进程,然后通过轮询端点暴露日志文件的最新内容。这个方案在开发环境中运行良好,但在第一次有超过十个开发者同时关注一个紧急修复的构建时,它就彻底崩溃了。服务器的 CPU 占用率飙升,日志更新延迟严重,用户体验极差。

问题根源在于轮询机制和 Python 在处理大量并发长连接时的固有挑战。即便切换到像 Tornado 这样的异步框架,直接管理成千上万个 WebSocket 连接对于单个 Python 进程来说,仍然是一个沉重的负担,尤其是在 GIL 的限制下,资源调度会变得非常复杂。

我们的核心构建和分析脚本是用 Python 编写的,这是一个无法改变的技术现实。重写这些复杂的脚本是不可行的。我们需要一个能将 Python 的业务逻辑能力与一个真正为高并发实时通信而生的平台结合起来的方案。这让我们把目光投向了 Elixir 和 Phoenix 框架。Phoenix 建立在 Erlang/OTP 之上,其轻量级进程模型和为每个 WebSocket 连接分配一个独立进程的设计,使其能够以极低的资源消耗轻松处理数十万甚至上百万的并发连接。

最终的技术决策是构建一个异构系统:使用 Phoenix 作为面向客户端的实时通信网关,专门处理海量的 WebSocket 连接和消息广播;同时,保留我们现有的 Python 脚本,并将其封装在 Tornado 应用中,作为后端的工作节点(Worker),负责执行实际的 CI 任务。两者之间通过一个轻量级的消息队列(我们选择了 Redis Pub/Sub)进行解耦和通信。

架构概览

整个数据流被设计为单向且清晰的。

graph TD
    A[GitHub Webhook] --> B{Phoenix Endpoint};
    B --> C[Redis Pub/Sub: `ci:jobs`];
    D[Tornado Worker Pool] -- Subscribes --> C;
    D -- Spawns --> E[Python CI Script];
    E -- stdout/stderr --> D;
    D -- Publishes logs --> F[Redis Pub/Sub: `ci:logs:job_id`];
    G[Browser via WebSocket] -- Joins Channel --> H{Phoenix Channel `logs:job_id`};
    H -- Subscribes --> F;
    F --> H;
    H -- Pushes logs --> G;
  1. GitHub Webhook -> Phoenix: GitHub Actions 触发的 Webhook 请求被发送到 Phoenix 应用的一个专用控制器。
  2. Phoenix -> Redis (Jobs): Phoenix 控制器验证请求后,将一个包含仓库信息和任务 ID 的作业消息发布到 Redis 的 ci:jobs 频道。它的职责到此为止,立即返回响应给 GitHub。
  3. Redis (Jobs) -> Tornado Worker: 一个或多个 Tornado Worker 实例订阅 ci:jobs 频道。收到新作业消息后,其中一个 Worker 会接手处理。
  4. Tornado Worker -> CI Script: Worker 启动一个子进程来执行实际的 Python 构建/分析脚本,并异步地读取其 stdoutstderr
  5. Tornado Worker -> Redis (Logs): Worker 将从子进程读取到的每一行日志,都发布到与该作业 ID 对应的特定 Redis 频道,例如 ci:logs:some-unique-job-id
  6. Browser -> Phoenix Channel: 前端页面通过 WebSocket 连接到 Phoenix,并请求加入一个与作业 ID 匹配的 Channel,如 logs:some-unique-job-id
  7. Phoenix Channel -> Redis (Logs): Phoenix Channel 进程在用户加入时,会订阅对应的 Redis 日志频道 ci:logs:some-unique-job-id
  8. Redis -> Phoenix -> Browser: 当 Tornado Worker 发布日志到 Redis 时,Phoenix Channel 进程会接收到消息,并立即通过 WebSocket 将其广播给所有订阅了该 Channel 的客户端。

这种架构的优势在于职责分离。Phoenix 只做它最擅长的事:管理海量连接。Tornado 也只做它必须做的事:运行 Python 代码。Redis 则充当了两者之间高效、解耦的缓冲层。

Phoenix: 构建 WebSocket 网关

首先,我们初始化一个新的 Phoenix 项目,并添加 Redis 客户端依赖 redix

# mix.exs
def deps do
  [
    # ...
    {:redix, "~> 1.1"}
  ]
end

我们需要一个 Redis 连接进程,用于发布和订阅。在 application.ex 中,我们将其作为监督树的一部分启动。

# lib/ci_viewer/application.ex
defmodule CiViewer.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      CiViewer.Repo,
      CiViewerWeb.Telemetry,
      {Phoenix.PubSub, name: CiViewer.PubSub},
      # 启动一个命名为 :redix_pub 的 Redix 进程
      {Redix, name: :redix_pub, url: "redis://localhost:6379/1"},
      # 启动一个命名为 :redix_sub 的 Redix 进程,用于订阅
      {Redix, name: :redix_sub, url: "redis://localhost:6379/1"},
      CiViewerWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: CiViewer.Supervisor]
    Supervisor.start_link(children, opts)
  end
  # ...
end

在真实项目中,Redis URL 应当从环境变量中读取。这里为了简化,直接硬编码。

接下来是处理 GitHub Webhook 的控制器。这个控制器非常轻量,它的唯一工作就是将作业信息推送到 Redis。

# lib/ci_viewer_web/controllers/github_controller.ex
defmodule CiViewerWeb.Controllers.GitHubController do
  use CiViewerWeb, :controller
  require Logger

  def webhook(conn, %{"repository" => %{"full_name" => repo}, "head_commit" => %{"id" => commit_id}}) do
    job_id = "#{repo |> String.replace("/", "-")}-#{commit_id}"
    payload = %{repo: repo, commit_id: commit_id, job_id: job_id} |> Jason.encode!()

    # 使用 :redix_pub 连接发布作业
    case Redix.command(:redix_pub, ["PUBLISH", "ci:jobs", payload]) do
      {:ok, _} ->
        Logger.info("Published job #{job_id} to Redis")
        # 返回 job_id,以便前端可以立即开始监听
        json(conn, %{status: "ok", job_id: job_id})
      {:error, reason} ->
        Logger.error("Failed to publish job to Redis: #{inspect(reason)}")
        conn
        |> put_status(:internal_server_error)
        |> json(%{status: "error", message: "Internal server error"})
    end
  end

  # 处理其他无效的 webhook payload
  def webhook(conn, _) do
    conn
    |> put_status(:bad_request)
    |> json(%{status: "error", message: "Invalid payload"})
  end
end

核心部分是 LogChannel。它负责处理客户端的 WebSocket 连接。

# lib/ci_viewer_web/channels/log_channel.ex
defmodule CiViewerWeb.Channels.LogChannel do
  use Phoenix.Channel
  require Logger

  # 客户端加入 channel, topic 格式为 "logs:job_id"
  def join("logs:" <> job_id, _payload, socket) do
    Logger.info("Client joined logs channel for job #{job_id}")
    # 启动一个任务来监听 Redis
    {:ok, _pid} = Task.start_link(fn -> listen_for_logs(job_id, self()) end)
    {:ok, %{status: "joined", job_id: job_id}, assign(socket, :job_id, job_id)}
  end

  # 服务器端的消息处理,这里我们不需要客户端发消息
  def handle_in(_, _, socket) do
    {:noreply, socket}
  end

  # 这是在独立进程中运行的监听函数
  defp listen_for_logs(job_id, channel_pid) do
    log_channel = "ci:logs:#{job_id}"
    # 使用一个独立的 Redix 连接进行订阅,避免阻塞
    {:ok, sub_conn} = Redix.start_link(url: "redis://localhost:6379/1")
    Redix.command!(sub_conn, ["SUBSCRIBE", log_channel])
    Logger.info("Subscribed to Redis channel #{log_channel}")

    # 循环接收 Redis 消息
    loop_receive_messages(sub_conn, channel_pid)
  end

  defp loop_receive_messages(sub_conn, channel_pid) do
    receive do
      # Redix 将 Redis Pub/Sub 消息封装为这种格式
      {:redix, ^sub_conn, ["message", _log_channel, log_line]} ->
        # 将日志推送到 Phoenix Channel
        push_log(channel_pid, log_line)
        loop_receive_messages(sub_conn, channel_pid)

      {:redix_disconnect, ^sub_conn, _reason} ->
        Logger.error("Redis subscriber disconnected.")
        # 在真实项目中,这里应该有重连逻辑

      _other ->
        Logger.warn("Received unexpected message in Redis listener")
        loop_receive_messages(sub_conn, channel_pid)
    end
  end

  # 将消息推送到 channel 进程,再由它广播出去
  defp push_log(channel_pid, log_line) do
    endpoint = CiViewerWeb.Endpoint
    # 使用 pid 直接向 channel 进程发送消息
    # 避免了 Phoenix PubSub 的内部开销,因为我们知道确切的进程
    endpoint.broadcast_from(channel_pid, "new_log", %{line: log_line})
  end
end

注意这里的 listen_for_logs 函数。每个加入 Channel 的客户端都会触发启动一个 Task。这个 Task 独立于 Channel 进程,负责创建一个新的 Redis 连接并进入一个阻塞的接收循环。这种模式确保了 Channel 进程本身不会因为等待 Redis 消息而被阻塞,可以继续响应其他 Channel 事件。

最后,在 socket.ex 中定义 channel 路由:

# lib/ci_viewer_web/channels/user_socket.ex
channel "logs:*", CiViewerWeb.Channels.LogChannel

Tornado: 运行 Python 作业的 Worker

Tornado Worker 是一个独立的 Python 应用。它需要 tornadoaioredis 库。

# worker.py
import asyncio
import logging
import signal
import os
import aioredis
import json
from tornado.process import Subprocess
from tornado.iostream import StreamClosedError

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/1")
JOBS_CHANNEL = "ci:jobs"

class CIWorker:
    def __init__(self):
        self.redis_pub = None
        self.redis_sub = None
        self.shutdown = False

    async def setup(self):
        """初始化 Redis 连接。"""
        try:
            self.redis_pub = await aioredis.from_url(REDIS_URL, decode_responses=True)
            self.redis_sub = self.redis_pub.pubsub()
            logging.info(f"Connected to Redis at {REDIS_URL}")
        except Exception as e:
            logging.error(f"Could not connect to Redis: {e}")
            raise

    async def run_job(self, job_data):
        """执行一个 CI 作业。"""
        repo = job_data.get("repo")
        job_id = job_data.get("job_id")
        if not repo or not job_id:
            logging.warning(f"Invalid job data received: {job_data}")
            return

        log_channel = f"ci:logs:{job_id}"
        logging.info(f"Starting job {job_id} for repo {repo}")

        try:
            # 模拟一个会持续输出日志的脚本
            # 在真实项目中,这里会是你的 build/test/deploy 脚本
            process = Subprocess(
                ['python', '-u', 'mock_ci_script.py', repo],
                stdout=Subprocess.STREAM,
                stderr=Subprocess.STREAM
            )
            
            await self.publish_log(log_channel, f"[SYSTEM] Job '{job_id}' started.")
            
            # 并发读取 stdout 和 stderr
            await asyncio.gather(
                self.stream_logs(process.stdout, log_channel, "[STDOUT]"),
                self.stream_logs(process.stderr, log_channel, "[STDERR]")
            )
            
            exit_code = await process.wait_for_exit(raise_error=False)
            await self.publish_log(log_channel, f"[SYSTEM] Job finished with exit code {exit_code}.")
            logging.info(f"Job {job_id} finished with exit code {exit_code}")

        except Exception as e:
            logging.error(f"Error running job {job_id}: {e}")
            await self.publish_log(log_channel, f"[SYSTEM] Job failed with an internal error: {e}")

    async def stream_logs(self, stream, log_channel, prefix):
        """从子进程流中读取日志并发布到 Redis。"""
        try:
            async for line in stream:
                log_line = line.decode('utf-8').strip()
                await self.publish_log(log_channel, f"{prefix} {log_line}")
        except StreamClosedError:
            pass # 流关闭是正常现象
        except Exception as e:
            logging.error(f"Error streaming logs for {log_channel}: {e}")

    async def publish_log(self, channel, message):
        """发布单条日志到 Redis。"""
        try:
            await self.redis_pub.publish(channel, message)
        except Exception as e:
            logging.error(f"Failed to publish log to {channel}: {e}")

    async def listen_for_jobs(self):
        """监听 `ci:jobs` 频道并分派作业。"""
        await self.redis_sub.subscribe(JOBS_CHANNEL)
        logging.info(f"Subscribed to '{JOBS_CHANNEL}' channel. Waiting for jobs...")
        
        while not self.shutdown:
            try:
                message = await self.redis_sub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if message and message.get("type") == "message":
                    job_data = json.loads(message['data'])
                    # 不等待作业完成,立即开始处理下一个消息
                    asyncio.create_task(self.run_job(job_data))
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logging.error(f"Error processing message from Redis: {e}")
    
    def handle_shutdown(self, sig, frame):
        """处理关闭信号。"""
        logging.info("Shutdown signal received. Exiting gracefully...")
        self.shutdown = True

    async def close(self):
        """关闭所有连接。"""
        if self.redis_sub:
            await self.redis_sub.close()
        if self.redis_pub:
            await self.redis_pub.close()
        logging.info("Redis connections closed.")

async def main():
    worker = CIWorker()
    signal.signal(signal.SIGINT, worker.handle_shutdown)
    signal.signal(signal.SIGTERM, worker.handle_shutdown)
    
    try:
        await worker.setup()
        await worker.listen_for_jobs()
    finally:
        await worker.close()

if __name__ == "__main__":
    asyncio.run(main())

这个 Worker 的关键在于它完全是异步的。它使用 aioredis 异步地等待作业消息,使用 tornado.process.Subprocessasync for 异步地读取子进程的输出流。这使得单个 Worker 进程可以同时管理多个正在运行的 CI 脚本,极大地提高了资源利用率。

为了配合 Worker,我们还需要一个模拟的 CI 脚本:

# mock_ci_script.py
import time
import sys
import random

def run():
    repo = sys.argv[1] if len(sys.argv) > 1 else "unknown/repo"
    print(f"Cloning repository {repo}...")
    time.sleep(1)
    print("Installing dependencies...")
    for i in range(5):
        print(f"  - Installing package {i+1}...")
        time.sleep(random.uniform(0.2, 0.5))
    
    print("Running tests...")
    time.sleep(1)
    if random.random() < 0.2:
        print("Test failed: Assertion Error in test_critical_feature", file=sys.stderr)
        sys.exit(1)
    
    print("Tests passed.")
    print("Building artifact...")
    time.sleep(2)
    print("Build successful.")
    sys.exit(0)

if __name__ == "__main__":
    run()

这个脚本使用 time.sleep 来模拟耗时操作,并通过 printstdoutstderr 输出信息。-u 标志在 worker.py 中调用时非常重要,它禁用了 Python 的输出缓冲,确保每一行日志都能被实时捕获。

前端: esbuild 驱动的轻量级视图

前端部分力求简单。我们不需要庞大的框架,只需要一个能够连接 WebSocket 并将日志渲染到页面的脚本。esbuild 是完成这个任务的完美工具,它快如闪电。

package.json 配置:

{
  "name": "ci-viewer-frontend",
  "version": "1.0.0",
  "scripts": {
    "build": "esbuild js/app.js --bundle --outfile=../priv/static/assets/app.js --sourcemap",
    "watch": "esbuild js/app.js --bundle --outfile=../priv/static/assets/app.js --sourcemap --watch"
  },
  "dependencies": {
    "phoenix": "file:../deps/phoenix"
  },
  "devDependencies": {
    "esbuild": "^0.19.5"
  }
}

我们直接从 Phoenix 的 deps 目录引用 phoenix JS 库。

前端核心逻辑 assets/js/app.js:

import { Socket } from "phoenix";

// 从 URL 中获取 job_id
const getJobId = () => {
    const params = new URLSearchParams(window.location.search);
    return params.get('job_id');
};

const jobId = getJobId();
const logContainer = document.getElementById("log-container");

if (jobId) {
    logContainer.innerHTML = `<p class="system">Connecting to logs for job: ${jobId}...</p>`;

    // 连接到 Phoenix Socket
    let socket = new Socket("/socket", { params: { token: window.userToken } });
    socket.connect();

    // 创建并加入 Channel
    let channel = socket.channel(`logs:${jobId}`, {});

    channel.on("new_log", payload => {
        const logLine = document.createElement("div");
        logLine.classList.add("log-line");
        if (payload.line.startsWith("[SYSTEM]")) {
            logLine.classList.add("system");
        } else if (payload.line.startsWith("[STDERR]")) {
            logLine.classList.add("stderr");
        }
        logLine.textContent = payload.line;
        logContainer.appendChild(logLine);
        // 自动滚动到底部
        logContainer.scrollTop = logContainer.scrollHeight;
    });

    channel.join()
        .receive("ok", resp => {
            logContainer.querySelector('.system').textContent = `Successfully connected. Waiting for logs...`;
        })
        .receive("error", resp => {
            logContainer.querySelector('.system').textContent = `Unable to join channel: ${resp.reason}`;
            logContainer.querySelector('.system').classList.add('stderr');
        });

} else {
    logContainer.innerHTML = `<p class="system stderr">No job_id specified in URL. Please add ?job_id=... to the URL.</p>`;
}

这个脚本的功能很纯粹:连接 WebSocket,加入正确的 Channel,监听 new_log 事件,然后将接收到的日志行添加到 DOM 中。

局限与未来路径

这个架构虽然解决了最初的高并发问题,但它并非完美。首先,Redis Pub/Sub 是一种“即发即弃”的模型。如果在 Tornado Worker 发布日志时,Phoenix 节点恰好重启或网络中断,那么这条日志就会永久丢失。对于需要严格保证日志完整性的场景,应该用 RabbitMQ 或 Kafka 替换 Redis Pub/Sub,利用它们的消息持久化和确认机制。

其次,当前的错误处理比较基础。一个生产级的 Worker 需要更完善的机制来捕获 CI 脚本的异常、超时和资源耗尽等情况,并将这些明确的失败状态通过 Redis 传递给 Phoenix,最终展示给用户。

最后,系统的状态是瞬时的。一旦作业完成,日志流就停止了。为了提供历史日志查询功能,Tornado Worker 在作业结束时,应将完整的日志和最终状态(成功/失败)持久化到数据库(如 PostgreSQL)或日志存储系统(如 Elasticsearch)中。这将引入一个新的数据读取路径,但与实时流路径是正交的。


  目录