构建服务于 Next.js 前端的近实时数据管道 Apache Hudi 与 ClickHouse 的架构权衡


我们需要一个直接面向用户的分析仪表盘,它必须在亚秒级内响应复杂的查询,同时后端数据平台需要支持对海量事件流的更新、删除和历史回溯。这是一个典型的、存在于现代数据应用中的核心矛盾:分析查询的速度与数据湖的灵活性和可维护性之间的冲突。

直接将事件流写入 ClickHouse 这样的 OLAP 引擎,虽然查询速度极快,但却是一条单行道。数据变更(例如,用户更新了个人信息,需要回溯修改历史事件中的用户属性)变得异常困难,数据湖的可重处理性也无从谈起。反之,一个纯粹的数据湖(如 S3 上的 Parquet 文件),虽然灵活且成本低廉,但无法满足用户交互所需的低延迟查询。

问题的本质在于,我们需要一个能够同时扮演“事务性数据湖”和“高速分析查询引擎”两个角色的系统。单一技术无法优雅地解决这个问题,因此,一个分层的、各司其职的架构成为必然选择。

方案A:直接写入 OLAP 引擎

这是最直接的方案。Vercel Functions 作为事件接收器,将数据直接写入 ClickHouse。

graph TD
    subgraph Frontend
        A[Next.js App with Valtio]
    end
    subgraph Vercel Platform
        B(Vercel Function: /api/ingest)
    end
    subgraph Data Layer
        C[ClickHouse Cluster]
    end

    A -- "POST event" --> B
    B -- "INSERT INTO events" --> C
    A -- "GET /api/query" --> B
    B -- "SELECT ... FROM events" --> C

优点:

  1. 极低延迟: 数据路径最短,几乎可以实现实时。
  2. 架构简单: 移动部件最少,易于理解和初步实现。

致命缺陷:

  1. 数据可变性差: ClickHouse 对 UPDATEDELETE 的支持(ALTER TABLE ... UPDATE/DELETE)是异步的、重量级的 Mutation 操作,不适用于高频次的单记录更新。修复历史数据或处理 GDPR 删除请求将成为运维噩梦。
  2. 缺乏数据湖能力: 原始数据直接进入专有格式的 OLAP 引擎,失去了在通用计算框架(如 Spark, Flink)中进行复杂 ETL、机器学习特征工程或数据重处理的灵活性。我们被 ClickHouse 锁定了。
  3. 读写强耦合: 写入负载的峰值会直接影响查询性能,因为它们竞争的是同一套集群资源。

在真实项目中,数据永远不是完美的,业务逻辑会变更,数据需要勘误。方案 A 的刚性结构在生产环境中会迅速退化为技术债。

方案B:数据湖与 OLAP 引擎分离的最终选择

我们选择的架构引入了 Apache Hudi 作为核心的中间层,它构建在廉价的对象存储(如 AWS S3)之上,为数据湖带来了数据库的核心能力:事务、更新/删除(Upserts)和增量处理。

graph TD
    subgraph Frontend
        A[Next.js App with Valtio]
    end

    subgraph API & Ingestion Layer
        B[Vercel Function: /api/ingest]
        D[Vercel Function: /api/query]
    end

    subgraph Data Lake on Object Storage
        C{S3 Bucket: Raw Events}
        E[Apache Hudi Table on S3]
    end

    subgraph Data Processing
        F[Spark Job on EMR/Databricks]
    end

    subgraph Serving Layer
        G[ClickHouse Cluster]
    end

    subgraph Sync Process
        H[Python Script: Hudi to ClickHouse]
    end

    A -- "POST event" --> B
    B -- "PutObject" --> C
    F -- "Reads" --> C
    F -- "Upserts" --> E
    H -- "Incremental Pull" --> E
    H -- "Batch Insert" --> G
    A -- "GET /api/query" --> D
    D -- "SELECT ... FROM events" --> G

这个架构将写入路径和查询路径解耦:

  1. 写入路径: Next.js -> Vercel Ingest Function -> S3 Raw Bucket -> Spark/Hudi Job -> Hudi Table
  2. 同步路径: Hudi Table -> Incremental Sync Script -> ClickHouse
  3. 查询路径: Next.js -> Vercel Query Function -> ClickHouse

这种设计通过增加架构深度,换取了系统的鲁棒性、可维护性和扩展性。Hudi 解决了数据湖的可变性问题,ClickHouse 专注于其最擅长的事情——极速查询。

核心实现:代码与配置

1. Vercel Function: 事件采集端点 (/api/ingest)

这个 Serverless 函数的职责是接收前端事件,进行初步校验,然后快速地将其存入 S3 的一个“着陆区”(landing zone)。它的设计必须是轻量和高可用的。

pages/api/ingest.ts

import { NextApiRequest, NextApiResponse } from 'next';
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { v4 as uuidv4 } from 'uuid';

// 在生产环境中,S3Client 应该在函数外部初始化以复用连接
const s3Client = new S3Client({
  region: process.env.AWS_S3_REGION,
  credentials: {
    accessKeyId: process.env.AWS_S3_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_S3_SECRET_ACCESS_KEY!,
  },
});

// 定义一个基础的事件结构,实际项目中会使用 Zod 或类似库进行严格校验
interface AnalyticsEvent {
  eventType: string;
  userId: string;
  payload: Record<string, any>;
  clientTimestamp: string; // ISO 8601 format
}

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'POST') {
    res.setHeader('Allow', ['POST']);
    return res.status(405).end(`Method ${req.method} Not Allowed`);
  }

  try {
    const event = req.body as AnalyticsEvent;

    // 关键步骤:生产级校验
    // 这里的校验非常基础,真实项目中应更复杂
    if (!event.eventType || !event.userId || !event.clientTimestamp) {
      console.warn('Invalid event received', { body: req.body });
      return res.status(400).json({ error: 'Invalid event structure.' });
    }

    const serverTimestamp = new Date().toISOString();
    const eventId = uuidv4();

    // 增强事件,加入服务器时间戳和唯一ID
    const enrichedEvent = {
      ...event,
      eventId,
      serverTimestamp,
    };

    const date = new Date(serverTimestamp);
    const year = date.getUTCFullYear();
    const month = String(date.getUTCMonth() + 1).padStart(2, '0');
    const day = String(date.getUTCDate()).padStart(2, '0');
    const hour = String(date.getUTCHours()).padStart(2, '0');
    
    // 按小时分区写入,这是数据湖的标准实践
    const key = `raw/events/year=${year}/month=${month}/day=${day}/hour=${hour}/${eventId}.json`;

    const command = new PutObjectCommand({
      Bucket: process.env.AWS_S3_BUCKET_NAME!,
      Key: key,
      Body: JSON.stringify(enrichedEvent),
      ContentType: 'application/json',
    });

    await s3Client.send(command);

    // 仅在成功写入后返回202 Accepted
    // 客户端不需要等待后续处理
    return res.status(202).json({ status: 'accepted', eventId });

  } catch (error) {
    // 关键步骤:错误处理与日志
    console.error('Failed to ingest event:', error);
    // 避免向客户端暴露过多内部错误细节
    return res.status(500).json({ error: 'Internal server error during ingestion.' });
  }
}

这里的核心考量是:

  • 异步确认: 返回 202 Accepted 告诉客户端请求已被接收,但处理是异步的。这解耦了客户端与后端数据处理。
  • 分区路径: year માતા/day=/hour= 的 S3 Key 结构是 Hive 风格的分区,这对于后续的 Spark/Hudi 高效处理至关重要。
  • 错误处理: 捕获异常并记录,但不对外暴露实现细节。

2. Apache Hudi 作业: 构建事务性数据湖

这是架构的核心。一个周期性运行的 Spark 作业(例如每15分钟一次)会读取 S3 landing zone 中的原始 JSON 文件,并将它们 upsert 到 Hudi 表中。我们选择 COPY_ON_WRITE 模式,因为它为读密集型分析场景提供了更好的性能。

process_events_with_hudi.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit
from pyspark.sql.types import StructType, StructField, StringType, MapType, TimestampType

# Hudi 配置
HUDI_TABLE_NAME = "user_events"
HUDI_BASE_PATH = "s3a://your-datalake-bucket/hudi/user_events"
RAW_DATA_PATH = "s3a://your-ingest-bucket/raw/events/*/*/*/*" # 读取所有分区

# Hudi 写入选项
# 生产环境中这些配置需要仔细调优
hudi_options = {
    'hoodie.table.name': HUDI_TABLE_NAME,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE', # 读优化
    'hoodie.datasource.write.recordkey.field': 'eventId', # 唯一记录键
    'hoodie.datasource.write.partitionpath.field': 'eventDate', # 分区字段
    'hoodie.datasource.write.precombine.field': 'serverTimestamp', # 合并时选择最新记录的依据
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.hive_sync.enable': 'true', # 如果使用Hive Metastore
    'hoodie.datasource.hive_sync.table': HUDI_TABLE_NAME,
    'hoodie.datasource.hive_sync.database': 'analytics',
    'hoodie.datasource.hive_sync.partition_fields': 'eventDate',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.compact.inline': 'false', # 可配置异步的compaction
    'hoodie.parquet.compression.codec': 'snappy'
}

def main():
    spark = SparkSession.builder \
        .appName("Hudi Events Ingestion") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .getOrCreate()

    # 从S3读取原始JSON数据
    # 在生产中,你需要一个机制来只读取新的文件,避免重复处理
    # 例如,使用 SQS 事件通知或维护一个已处理文件的清单
    raw_df = spark.read.text(RAW_DATA_PATH)
    
    # 定义JSON Schema
    schema = StructType([
        StructField("eventId", StringType(), False),
        StructField("eventType", StringType(), True),
        StructField("userId", StringType(), True),
        StructField("payload", MapType(StringType(), StringType()), True),
        StructField("clientTimestamp", StringType(), True),
        StructField("serverTimestamp", StringType(), False),
    ])

    # 解析JSON并进行转换
    events_df = raw_df.select(from_json(col("value"), schema).alias("data")).select("data.*")
    
    # 添加分区列 eventDate,从 serverTimestamp 转换而来
    transformed_df = events_df.withColumn(
        "serverTimestamp", col("serverTimestamp").cast(TimestampType())
    ).withColumn(
        "eventDate", col("serverTimestamp").cast("date").cast("string")
    )

    if transformed_df.count() == 0:
        print("No new events to process.")
        return

    # 将数据写入 Hudi 表
    print(f"Writing {transformed_df.count()} records to Hudi table {HUDI_TABLE_NAME}...")
    transformed_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(HUDI_BASE_PATH)
        
    print("Hudi write operation completed.")

if __name__ == "__main__":
    main()

这段 PySpark 脚本的几个关键点:

  • Record Key (eventId): Hudi 使用它来唯一标识一条记录,实现 upsert
  • Precombine Key (serverTimestamp): 当多条记录有相同的 recordKey 时,Hudi 会保留 precombineKey 值最大的那条。这对于处理延迟或重复事件至关重要。
  • Partition Path (eventDate): 物理上将数据按天组织在 S3 上,极大提升了按时间范围查询的性能。

3. 同步 Hudi 到 ClickHouse

Hudi 的一个强大功能是增量查询。我们可以编写一个简单的 Python 脚本,定期查询自上次同步以来的所有变更(新增和更新),然后批量写入 ClickHouse。

sync_hudi_to_clickhouse.py

import os
from datetime import datetime, timedelta
from clickhouse_driver import Client
from pyspark.sql import SparkSession

# --- 配置 ---
HUDI_BASE_PATH = "s3a://your-datalake-bucket/hudi/user_events"
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_TABLE = "analytics.events_local" # 使用本地表进行写入
STATE_FILE = "/opt/sync/last_sync_commit_time.txt" # 存储上次同步点

def get_last_sync_commit_time():
    """读取上次同步的Hudi commit时间"""
    if not os.path.exists(STATE_FILE):
        # 首次运行时,从1天前开始同步
        return (datetime.now() - timedelta(days=1)).strftime("%Y%m%d%H%M%S")
    with open(STATE_FILE, 'r') as f:
        return f.read().strip()

def save_last_sync_commit_time(commit_time):
    """保存本次同步的Hudi commit时间"""
    with open(STATE_FILE, 'w') as f:
        f.write(commit_time)

def main():
    spark = SparkSession.builder.appName("HudiToClickHouseSync").getOrCreate()

    # 1. 获取 Hudi 表的最新 commit 时间
    hudi_commits = spark.getCommits(HUDI_BASE_PATH)
    if not hudi_commits:
        print("No commits found in Hudi table. Exiting.")
        return
    latest_commit_time = hudi_commits[0]

    # 2. 获取上次同步点
    begin_time = get_last_sync_commit_time()
    
    if begin_time >= latest_commit_time:
        print(f"No new commits since {begin_time}. Exiting.")
        return

    print(f"Syncing changes from commit > {begin_time} up to {latest_commit_time}")

    # 3. 执行 Hudi 增量查询
    hudi_inc_df = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", begin_time) \
        .option("hoodie.datasource.read.end.instanttime", latest_commit_time) \
        .load(HUDI_BASE_PATH)

    # 选择需要的列并丢弃Hudi元数据列
    columns_to_sync = ["eventId", "eventType", "userId", "payload", "clientTimestamp", "serverTimestamp"]
    data_to_sync_df = hudi_inc_df.select(columns_to_sync)

    if data_to_sync_df.count() == 0:
        print("No new data in the incremental view. Updating commit time.")
        save_last_sync_commit_time(latest_commit_time)
        return

    # 4. 批量写入 ClickHouse
    # .collect() 可能导致OOM,生产环境应使用 .toLocalIterator() 或分批处理
    data_to_insert = [row.asDict() for row in data_to_sync_df.collect()]
    
    print(f"Inserting/Updating {len(data_to_insert)} records into ClickHouse...")
    client = Client(host=CLICKHOUSE_HOST)
    
    # ClickHouse 的 ReplacingMergeTree 引擎能够根据排序键处理更新
    client.execute(
        f"INSERT INTO {CLICKHOUSE_TABLE} VALUES",
        data_to_insert,
        types_check=True
    )
    print("Insertion complete.")

    # 5. 更新同步点
    save_last_sync_commit_time(latest_commit_time)
    print(f"Successfully synced up to commit {latest_commit_time}.")

if __name__ == "__main__":
    main()

这里的 ClickHouse 表引擎选择至关重要。我们会使用 ReplacingMergeTree

-- ClickHouse DDL
CREATE TABLE analytics.events_local ON CLUSTER '{cluster}'
(
    `eventId` String,
    `eventType` String,
    `userId` String,
    `payload` Map(String, String),
    `clientTimestamp` DateTime,
    `serverTimestamp` DateTime,
    `sign` Int8 -- 用于 ReplacingMergeTree
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/analytics.events_local', '{replica}', serverTimestamp)
PARTITION BY toYYYYMM(serverTimestamp)
ORDER BY (userId, eventType, eventId);

-- 分布式表,用于查询
CREATE TABLE analytics.events ON CLUSTER '{cluster}' AS analytics.events_local
ENGINE = Distributed('{cluster}', analytics, events_local, rand());

ReplicatedReplacingMergeTree 会在后台合并数据时,对于排序键(ORDER BY 子句)相同的行,只保留版本字段(这里是 serverTimestamp)最新的那一行,从而优雅地实现了 upsert 语义。

4. Vercel Function: 查询端点 (/api/query)

这个函数负责接收前端的查询请求,将其转换为 ClickHouse SQL,然后返回结果。

pages/api/query.ts

import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@clickhouse/client-web';

const client = createClient({
  host: process.env.CLICKHOUSE_HOST,
  username: process.env.CLICKHOUSE_USER || 'default',
  password: process.env.CLICKHOUSE_PASSWORD || '',
  database: 'analytics',
});

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'GET') {
    return res.status(405).end();
  }

  try {
    const { userId, dateFrom, dateTo, eventType } = req.query;

    // 参数校验是必须的
    if (!userId || typeof userId !== 'string') {
      return res.status(400).json({ error: 'userId is required.' });
    }
    // ... 其他参数校验

    // 构建动态查询和参数,防止SQL注入
    let query = `
      SELECT 
        eventType,
        count() as eventCount,
        toStartOfDay(serverTimestamp) as day
      FROM analytics.events
      WHERE userId = {userId:String}
    `;
    const queryParams: Record<string, unknown> = { userId };

    if (dateFrom && typeof dateFrom === 'string') {
      query += ` AND serverTimestamp >= {dateFrom:DateTime}`;
      queryParams.dateFrom = dateFrom;
    }
    if (dateTo && typeof dateTo === 'string') {
      query += ` AND serverTimestamp <= {dateTo:DateTime}`;
      queryParams.dateTo = dateTo;
    }
     if (eventType && typeof eventType === 'string') {
      query += ` AND eventType = {eventType:String}`;
      queryParams.eventType = eventType;
    }

    query += `
      GROUP BY eventType, day
      ORDER BY day ASC
    `;

    const resultSet = await client.query({
      query,
      query_params: queryParams,
      format: 'JSONEachRow',
    });
    
    const data = await resultSet.json();

    return res.status(200).json(data);

  } catch (error) {
    console.error('ClickHouse query failed:', error);
    return res.status(500).json({ error: 'Failed to query analytics data.' });
  }
}

5. Next.js 与 Valtio 前端状态管理

前端的挑战在于管理复杂的、相互依赖的查询过滤器,并以响应式的方式更新UI。Valtio 以其基于 proxy 的极简 API 在此场景下表现出色。

lib/store.ts

import { proxy } from 'valtio';
import { startOfDay, subDays } from 'date-fns';

interface FilterState {
  dateRange: {
    from: Date;
    to: Date;
  };
  selectedUserId: string | null;
  selectedEventType: string | null;
}

// Valtio store 是一个简单的 JS 对象,用 proxy() 包裹
export const filters = proxy<FilterState>({
  dateRange: {
    from: subDays(startOfDay(new Date()), 7),
    to: new Date(),
  },
  selectedUserId: 'user-123',
  selectedEventType: null,
});

// 派生状态或 action 可以是普通函数
export function setDateRange(from: Date, to: Date) {
  filters.dateRange.from = from;
  filters.dateRange.to = to;
}

export function setSelectedUser(userId: string) {
    filters.selectedUserId = userId;
}

components/AnalyticsDashboard.tsx

import { useSnapshot } from 'valtio';
import useSWR from 'swr';
import { filters, setDateRange } from '../lib/store';

const fetcher = (url: string) => fetch(url).then((res) => res.json());

function AnalyticsDashboard() {
  // useSnapshot 创建了对 store 的响应式订阅
  // 当 filters 对象的任何属性变化时,组件都会重渲染
  const snap = useSnapshot(filters);

  // 构建基于当前 filter 状态的 SWR key
  const query = new URLSearchParams({
    userId: snap.selectedUserId || '',
    dateFrom: snap.dateRange.from.toISOString(),
    dateTo: snap.dateRange.to.toISOString(),
  });
  if (snap.selectedEventType) {
    query.set('eventType', snap.selectedEventType);
  }
  
  const { data, error } = useSWR(`/api/query?${query.toString()}`, fetcher);

  // ... UI for Date pickers and other filters
  // 例如,一个日期选择器组件的回调函数
  const handleDateChange = (newFrom: Date, newTo: Date) => {
    // 直接调用 action 函数修改 store,Valtio 会处理好重渲染
    setDateRange(newFrom, newTo);
  };
  
  if (error) return <div>Failed to load data</div>;
  if (!data) return <div>Loading...</div>;

  return (
    <div>
      {/* 渲染图表或数据表格 */}
      <pre>{JSON.stringify(data, null, 2)}</pre>
    </div>
  );
}

export default AnalyticsDashboard;

Valtio 的美妙之处在于其透明性。你只需改变 filters 代理对象上的属性,useSnapshot 就会自动侦测到变化并触发组件的重新渲染。与 Redux 等库相比,它极大地减少了样板代码。

架构的局限性与未来展望

这套架构并非没有成本。它的主要局限性在于:

  1. 延迟: 这是一套“近实时”系统。数据可见性的延迟取决于 Spark/Hudi 作业和同步脚本的运行频率,通常在分钟级别。它不适用于需要秒级甚至毫秒级延迟的场景。
  2. 运维复杂性: 引入了 Spark 和一个独立的同步进程,相比直接写入 OLAP 的方案,运维成本和监控要求都更高。你需要管理 Spark 集群的资源、监控作业失败,并确保同步过程的健壮性。
  3. 成本: 虽然对象存储非常便宜,但持续运行 Spark 集群进行微批处理可能会产生显著的计算成本,需要进行仔细的成本优化和资源调度。

未来的优化路径可能包括:

  • 走向流式处理: 使用 Apache Flink 和 Hudi Sink 替换批处理的 Spark 作业,可以将数据写入 Hudi 的延迟从分钟级降低到秒级。
  • ClickHouse 物化视图: 对于一些固定的、高频的查询模式,可以在 ClickHouse 内部创建物化视图,进一步加速查询,并将部分聚合计算的压力从查询时转移到写入时。
  • 数据质量监控: 在 Hudi 作业中集成数据质量校验(如 Deequ),确保在数据进入可查询层之前发现并隔离坏数据,这是生产级数据管道的必备环节。

  目录