业务分析团队需要对存储在 Hadoop HDFS 上的数十 TB 原始日志进行探索性分析,但前端应用(React/Vue)无法直接与 Hive 或 Impala 进行交互。传统的数据接口开发模式面临两个核心矛盾:前端需要灵活的数据查询能力,而后端长达数分钟的同步 Hadoop 查询会彻底阻塞 Node.js 事件循环,导致服务不可用。
我们的目标是构建一个中间层网关,它必须满足:
- 为客户端提供现代化的 GraphQL 接口,允许按需获取数据。
- 能够处理长时间运行的 Hadoop 查询(30秒到10分钟不等)而不会阻塞主服务。
- 向客户端提供查询任务的实时状态反馈。
- 架构上具备水平扩展能力,以应对未来增长的查询负载。
直接暴露一个接受 HiveQL 字符串的 RESTful 端点,然后同步执行查询,是最直观但也是最不可行的方案。一个长时间的 HTTP 请求会轻易导致客户端、负载均衡器和 NestJS 服务本身的超时。更糟糕的是,在查询执行期间,该 NestJS 实例的事件循环将被完全占用,无法处理任何其他请求。
另一种常见方案是由数据团队预先计算好各种维度的聚合数据,存入 Elasticsearch 或 ClickHouse 等高性能 OLAP 引擎中。这种模式适用于固定的报表需求,但完全扼杀了业务分析团队所必需的探索性查询能力。
最终我们选择的架构是:一个异步查询网关。客户端通过 GraphQL 提交一个查询请求,网关立即响应一个唯一的任务ID。网关将实际的 Hadoop 查询作为一个后台作业来处理。客户端则使用任务ID通过另一个 GraphQL 端点轮询查询状态,直到任务完成并获取最终结果。
架构决策与流程
该架构的核心是将查询的“提交”与“结果获取”两个阶段完全解耦。我们使用 Redis 作为任务队列和状态存储的中间件,解耦了API服务和执行查询的工作进程。
sequenceDiagram
participant Client as GraphQL Client
participant Gateway as NestJS Gateway (API)
participant Queue as Redis (BullMQ)
participant Worker as NestJS Worker
participant Hadoop as Hadoop Cluster (Hive)
participant Store as Result Store (Redis/S3)
Client->>+Gateway: mutation submitQuery($query: String!)
Gateway->>Gateway: 生成唯一的 jobId
Gateway->>+Queue: Enqueue({jobId, query})
Gateway-->>-Client: Response: { jobId }
Note right of Worker: Worker进程持续监听队列
Queue-->>+Worker: Dequeue Job
Worker->>Worker: 更新任务状态为 'RUNNING' (Redis)
Worker->>+Hadoop: 执行 HiveQL 查询 (长时间操作)
Hadoop-->>-Worker: 返回查询结果 (e.g., CSV/JSON)
Worker->>+Store: 存储查询结果,以 jobId 为 key
Worker->>Worker: 更新任务状态为 'COMPLETED' (Redis)
deactivate Worker
Note over Client, Gateway: 客户端开始轮询
loop Every 5 seconds
Client->>+Gateway: query getStatus($jobId: ID!)
Gateway->>Store: 读取 jobId 的状态和结果URI
Gateway-->>-Client: Response: { status, resultUrl? }
end
这个设计有几个关键优势:
- API服务高可用: NestJS Gateway 只负责接收请求、入队和状态查询,这些都是毫秒级的操作,事件循环永远不会被阻塞。
- 负载隔离: 计算密集型的 Hive 查询由独立的 Worker 进程池处理。Worker 的数量可以根据 Hadoop 集群的承载能力和查询并发需求独立扩展,不影响 API 服务的性能。
- 健壮性: 即使 Worker 进程因执行一个恶劣的查询而崩溃,也只会影响该任务本身。借助 BullMQ 等成熟的任务队列库,我们可以实现任务重试、失败记录等机制。
核心实现:NestJS 网关与 Worker
我们将项目分为两个 NestJS 应用:api-gateway 和 query-worker。它们共享一部分 DTOs 和服务定义。
1. 网关 (api-gateway) 的 GraphQL 定义
首先定义 GraphQL Schema。我们需要两个核心操作:一个 mutation 用于提交查询,一个 query 用于获取状态和结果。
schema.graphql
type Query {
"""
根据任务ID获取查询状态和结果。
"""
getQueryStatus(jobId: ID!): QueryJob
}
type Mutation {
"""
提交一个新的Hadoop查询任务。
"""
submitHiveQuery(query: String!): QueryJob
}
"""
代表一个查询任务的状态。
"""
type QueryJob {
jobId: ID!
status: JobStatus!
createdAt: String!
completedAt: String
errorMessage: String
resultUrl: String # 结果存储在S3或其他地方的URL
resultPreview: String # 对于小结果集,可以直接返回部分预览
}
enum JobStatus {
PENDING
RUNNING
COMPLETED
FAILED
CANCELLED
}
2. 网关:任务提交 Resolver
这个 Resolver 注入了 BullMQ 的 Queue 对象。它的唯一职责就是将任务推送到名为 hive-queries 的队列中,然后立即返回 jobId 和初始状态。
query.resolver.ts
import { InjectQueue } from '@nestjs/bullmq';
import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import { QueryJob, JobStatus } from './dto/query-job.dto'; // GraphQL DTOs
import { JobStateCacheService } from './job-state-cache.service';
@Resolver(() => QueryJob)
export class QueryResolver {
constructor(
@InjectQueue('hive-queries') private readonly hiveQueryQueue: Queue,
private readonly jobStateCache: JobStateCacheService,
) {}
@Mutation(() => QueryJob)
async submitHiveQuery(@Args('query') query: string): Promise<QueryJob> {
const jobId = uuidv4();
const initialJobState: QueryJob = {
jobId,
status: JobStatus.PENDING,
createdAt: new Date().toISOString(),
};
// 1. 先将初始状态写入Redis缓存,以便客户端可以立即查询
await this.jobStateCache.setJobState(jobId, initialJobState);
// 2. 将任务推送到队列,供Worker消费
// 我们将jobId作为BullMQ的job id,方便追踪
await this.hiveQueryQueue.add('execute-hive-query', {
jobId,
query,
});
// 3. 立即返回初始状态
return initialJobState;
}
@Query(() => QueryJob)
async getQueryStatus(@Args('jobId') jobId: string): Promise<QueryJob | null> {
// 直接从Redis缓存中获取最新状态
const jobState = await this.jobStateCache.getJobState(jobId);
if (!jobState) {
// 在真实项目中,这里应该抛出一个GraphQL NotFoundException
throw new Error(`Job with ID ${jobId} not found.`);
}
return jobState;
}
}
job-state-cache.service.ts 是一个简单的 Redis 封装,用于读写任务状态。
import { Injectable } from '@nestjs/common';
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import Redis from 'ioredis';
import { QueryJob } from './dto/query-job.dto';
const JOB_STATE_KEY_PREFIX = 'hive-job-state:';
@Injectable()
export class JobStateCacheService {
constructor(@InjectRedis() private readonly redis: Redis) {}
private getKey(jobId: string): string {
return `${JOB_STATE_KEY_PREFIX}${jobId}`;
}
async setJobState(jobId: string, state: QueryJob): Promise<void> {
// 设置一个合理的过期时间,例如7天,防止废弃数据堆积
await this.redis.set(this.getKey(jobId), JSON.stringify(state), 'EX', 60 * 60 * 24 * 7);
}
async getJobState(jobId: string): Promise<QueryJob | null> {
const data = await this.redis.get(this.getKey(jobId));
if (!data) {
return null;
}
return JSON.parse(data) as QueryJob;
}
async updateJobState(jobId: string, updates: Partial<QueryJob>): Promise<QueryJob> {
const currentState = await this.getJobState(jobId);
if (!currentState) {
throw new Error(`Cannot update non-existent job state for ${jobId}`);
}
const newState = { ...currentState, ...updates };
await this.setJobState(jobId, newState);
return newState;
}
}
3. Worker:任务处理器
Worker 应用的核心是一个 BullMQ 的 Processor。它监听 hive-queries 队列,并在收到任务时执行。
hive-query.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { JobStateCacheService } from '../shared/job-state-cache.service';
import { HiveExecutionService } from './hive-execution.service';
import { JobStatus, QueryJob } from '../shared/dto/query-job.dto';
import { Logger } from '@nestjs/common';
@Processor('hive-queries', {
// 根据机器核心数和Hadoop集群并发能力设置
concurrency: 4,
})
export class HiveQueryProcessor extends WorkerHost {
private readonly logger = new Logger(HiveQueryProcessor.name);
constructor(
private readonly jobStateCache: JobStateCacheService,
private readonly hiveExecutor: HiveExecutionService,
) {
super();
}
async process(job: Job<{ jobId: string; query: string }>): Promise<any> {
const { jobId, query } = job.data;
this.logger.log(`[${jobId}] Starting to process query.`);
try {
// 更新状态为 RUNNING
await this.jobStateCache.updateJobState(jobId, { status: JobStatus.RUNNING });
// 核心执行逻辑
const result = await this.hiveExecutor.executeQuery(jobId, query);
// 任务成功,更新状态和结果
await this.jobStateCache.updateJobState(jobId, {
status: JobStatus.COMPLETED,
completedAt: new Date().toISOString(),
resultUrl: result.url,
resultPreview: result.preview,
errorMessage: null,
});
this.logger.log(`[${jobId}] Processing completed successfully.`);
return { status: 'OK', resultUrl: result.url };
} catch (error) {
this.logger.error(`[${jobId}] Processing failed`, error.stack);
// 任务失败,记录错误信息
await this.jobStateCache.updateJobState(jobId, {
status: JobStatus.FAILED,
completedAt: new Date().toISOString(),
errorMessage: error.message || 'An unknown error occurred.',
});
// 向上抛出异常,BullMQ会根据配置决定是否重试
throw error;
}
}
// 监听事件,用于日志和监控
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.log(`Job ${job.id} has completed.`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`Job ${job.id} has failed with error: ${err.message}`);
}
}
4. Worker:与 Hadoop 的交互
hive-execution.service.ts 封装了与 Hive 交互的底层细节。在生产环境中,这通常通过 node-hive、jdbc 或直接调用 beeline 命令行工具来实现。这里我们使用 exec 调用 beeline 作为示例,因为它不依赖复杂的驱动配置,更具通用性。
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { exec } from 'child_process';
import * as fs from 'fs/promises';
import * as path from 'path';
interface HiveQueryResult {
url: string | null; // S3 or internal storage URL
preview: string | null; // First N lines of the result
}
@Injectable()
export class HiveExecutionService {
private readonly logger = new Logger(HiveExecutionService.name);
private readonly hiveHost: string;
private readonly hivePort: number;
private readonly resultDir: string;
constructor(private readonly configService: ConfigService) {
this.hiveHost = this.configService.get<string>('HIVE_HOST');
this.hivePort = this.configService.get<number>('HIVE_PORT');
this.resultDir = path.resolve(this.configService.get<string>('RESULT_DIR', './query_results'));
}
async executeQuery(jobId: string, query: string): Promise<HiveQueryResult> {
await fs.mkdir(this.resultDir, { recursive: true });
const outputFilePath = path.join(this.resultDir, `${jobId}.csv`);
const sanitizedQuery = query.replace(/"/g, '\\"').replace(/;/g, '');
// 生产环境中,连接字符串和认证方式会更复杂,例如使用 Kerberos
const connectionUrl = `jdbc:hive2://${this.hiveHost}:${this.hivePort}/default`;
const command = `beeline -u "${connectionUrl}" -n hadoop --outputformat=csv2 -e "${sanitizedQuery}" > ${outputFilePath}`;
this.logger.debug(`Executing command for job ${jobId}: ${command}`);
return new Promise((resolve, reject) => {
const proc = exec(command, { timeout: 10 * 60 * 1000 /* 10 minutes timeout */ });
proc.on('error', (err) => {
reject(new Error(`Failed to start beeline process: ${err.message}`));
});
proc.on('exit', async (code, signal) => {
if (code === 0) {
try {
const preview = await this.getResultPreview(outputFilePath);
// 在真实项目中,这里会将 outputFilePath 上传到 S3 并返回 S3 URL
const resultUrl = `/results/${jobId}.csv`;
resolve({ url: resultUrl, preview });
} catch (readError) {
reject(new Error(`Query executed but failed to read result file: ${readError.message}`));
}
} else {
// 尝试读取 stderr 获取Hive的错误信息
// 注意:beeline 可能会将错误信息输出到 stdout
const errorLog = await fs.readFile(outputFilePath, 'utf-8').catch(() => 'Failed to read error log.');
reject(new Error(`Beeline process exited with code ${code}, signal ${signal}. Log: ${errorLog.substring(0, 500)}`));
}
});
});
}
private async getResultPreview(filePath: string, lines = 10): Promise<string | null> {
try {
const content = await fs.readFile(filePath, 'utf-8');
return content.split('\n').slice(0, lines).join('\n');
} catch (e) {
this.logger.warn(`Could not generate preview for ${filePath}: ${e.message}`);
return null;
}
}
}
5. 客户端实现轮询
客户端(例如使用 Apollo Client 的 React 应用)在提交查询后,会启动一个轮询逻辑。
useHiveQuery.ts (React Hook 示例)
import { gql, useMutation, useQuery } from '@apollo/client';
import { useState, useEffect } from 'react';
const SUBMIT_HIVE_QUERY = gql`
mutation submitHiveQuery($query: String!) {
submitHiveQuery(query: $query) {
jobId
status
}
}
`;
const GET_QUERY_STATUS = gql`
query getQueryStatus($jobId: ID!) {
getQueryStatus(jobId: $jobId) {
jobId
status
resultUrl
resultPreview
errorMessage
completedAt
}
}
`;
export function useHiveQuery() {
const [jobId, setJobId] = useState<string | null>(null);
const [submitQuery, { loading: isSubmitting }] = useMutation(SUBMIT_HIVE_QUERY);
const { data, loading, error, startPolling, stopPolling } = useQuery(GET_QUERY_STATUS, {
variables: { jobId },
skip: !jobId, // 如果没有jobId,则跳过此查询
pollInterval: 5000, // 每5秒轮询一次
});
const job = data?.getQueryStatus;
// 当任务状态变为最终态时停止轮询
useEffect(() => {
if (job && ['COMPLETED', 'FAILED', 'CANCELLED'].includes(job.status)) {
stopPolling();
}
}, [job, stopPolling]);
const executeQuery = async (query: string) => {
try {
const result = await submitQuery({ variables: { query } });
const newJobId = result.data?.submitHiveQuery?.jobId;
if (newJobId) {
setJobId(newJobId);
startPolling(5000);
}
} catch (e) {
console.error("Failed to submit query:", e);
}
};
return {
job,
isLoading: loading || isSubmitting,
error,
executeQuery,
};
}
架构的局限性与未来迭代路径
这个架构有效地解决了长查询阻塞服务的问题,但它并非银弹。首先,轮询机制会给网关带来持续的、可预见的负载,虽然单次请求开销极低,但在大规模并发查询下也需要关注。一个可行的优化是引入 WebSocket,当任务完成时由服务器主动推送消息给客户端,从而消除轮询。
其次,目前的方案对用户提交的查询内容没有做任何限制。一个恶意的或编写拙劣的查询(如无LIMIT的全表扫描)可能会消耗大量 Hadoop 集群资源,影响其他任务。未来的迭代需要加入查询校验与改写层,例如:强制添加 LIMIT、禁止某些高危操作、甚至基于查询的复杂度进行初步的成本估算和配额限制。
最后,结果存储目前较为简单。对于大型结果集,直接通过网关下载并不高效。更稳健的做法是将结果文件从 Worker 直接上传到对象存储(如 S3),并生成一个有时效性的预签名 URL,客户端通过此 URL 直接从 S3 下载数据,进一步减轻网关的带宽压力。