我们需要一个直接面向用户的分析仪表盘,它必须在亚秒级内响应复杂的查询,同时后端数据平台需要支持对海量事件流的更新、删除和历史回溯。这是一个典型的、存在于现代数据应用中的核心矛盾:分析查询的速度与数据湖的灵活性和可维护性之间的冲突。
直接将事件流写入 ClickHouse 这样的 OLAP 引擎,虽然查询速度极快,但却是一条单行道。数据变更(例如,用户更新了个人信息,需要回溯修改历史事件中的用户属性)变得异常困难,数据湖的可重处理性也无从谈起。反之,一个纯粹的数据湖(如 S3 上的 Parquet 文件),虽然灵活且成本低廉,但无法满足用户交互所需的低延迟查询。
问题的本质在于,我们需要一个能够同时扮演“事务性数据湖”和“高速分析查询引擎”两个角色的系统。单一技术无法优雅地解决这个问题,因此,一个分层的、各司其职的架构成为必然选择。
方案A:直接写入 OLAP 引擎
这是最直接的方案。Vercel Functions 作为事件接收器,将数据直接写入 ClickHouse。
graph TD subgraph Frontend A[Next.js App with Valtio] end subgraph Vercel Platform B(Vercel Function: /api/ingest) end subgraph Data Layer C[ClickHouse Cluster] end A -- "POST event" --> B B -- "INSERT INTO events" --> C A -- "GET /api/query" --> B B -- "SELECT ... FROM events" --> C
优点:
- 极低延迟: 数据路径最短,几乎可以实现实时。
- 架构简单: 移动部件最少,易于理解和初步实现。
致命缺陷:
- 数据可变性差: ClickHouse 对
UPDATE
和DELETE
的支持(ALTER TABLE ... UPDATE/DELETE
)是异步的、重量级的 Mutation 操作,不适用于高频次的单记录更新。修复历史数据或处理 GDPR 删除请求将成为运维噩梦。 - 缺乏数据湖能力: 原始数据直接进入专有格式的 OLAP 引擎,失去了在通用计算框架(如 Spark, Flink)中进行复杂 ETL、机器学习特征工程或数据重处理的灵活性。我们被 ClickHouse 锁定了。
- 读写强耦合: 写入负载的峰值会直接影响查询性能,因为它们竞争的是同一套集群资源。
在真实项目中,数据永远不是完美的,业务逻辑会变更,数据需要勘误。方案 A 的刚性结构在生产环境中会迅速退化为技术债。
方案B:数据湖与 OLAP 引擎分离的最终选择
我们选择的架构引入了 Apache Hudi 作为核心的中间层,它构建在廉价的对象存储(如 AWS S3)之上,为数据湖带来了数据库的核心能力:事务、更新/删除(Upserts)和增量处理。
graph TD subgraph Frontend A[Next.js App with Valtio] end subgraph API & Ingestion Layer B[Vercel Function: /api/ingest] D[Vercel Function: /api/query] end subgraph Data Lake on Object Storage C{S3 Bucket: Raw Events} E[Apache Hudi Table on S3] end subgraph Data Processing F[Spark Job on EMR/Databricks] end subgraph Serving Layer G[ClickHouse Cluster] end subgraph Sync Process H[Python Script: Hudi to ClickHouse] end A -- "POST event" --> B B -- "PutObject" --> C F -- "Reads" --> C F -- "Upserts" --> E H -- "Incremental Pull" --> E H -- "Batch Insert" --> G A -- "GET /api/query" --> D D -- "SELECT ... FROM events" --> G
这个架构将写入路径和查询路径解耦:
- 写入路径:
Next.js -> Vercel Ingest Function -> S3 Raw Bucket -> Spark/Hudi Job -> Hudi Table
- 同步路径:
Hudi Table -> Incremental Sync Script -> ClickHouse
- 查询路径:
Next.js -> Vercel Query Function -> ClickHouse
这种设计通过增加架构深度,换取了系统的鲁棒性、可维护性和扩展性。Hudi 解决了数据湖的可变性问题,ClickHouse 专注于其最擅长的事情——极速查询。
核心实现:代码与配置
1. Vercel Function: 事件采集端点 (/api/ingest
)
这个 Serverless 函数的职责是接收前端事件,进行初步校验,然后快速地将其存入 S3 的一个“着陆区”(landing zone)。它的设计必须是轻量和高可用的。
pages/api/ingest.ts
import { NextApiRequest, NextApiResponse } from 'next';
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { v4 as uuidv4 } from 'uuid';
// 在生产环境中,S3Client 应该在函数外部初始化以复用连接
const s3Client = new S3Client({
region: process.env.AWS_S3_REGION,
credentials: {
accessKeyId: process.env.AWS_S3_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_S3_SECRET_ACCESS_KEY!,
},
});
// 定义一个基础的事件结构,实际项目中会使用 Zod 或类似库进行严格校验
interface AnalyticsEvent {
eventType: string;
userId: string;
payload: Record<string, any>;
clientTimestamp: string; // ISO 8601 format
}
export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
if (req.method !== 'POST') {
res.setHeader('Allow', ['POST']);
return res.status(405).end(`Method ${req.method} Not Allowed`);
}
try {
const event = req.body as AnalyticsEvent;
// 关键步骤:生产级校验
// 这里的校验非常基础,真实项目中应更复杂
if (!event.eventType || !event.userId || !event.clientTimestamp) {
console.warn('Invalid event received', { body: req.body });
return res.status(400).json({ error: 'Invalid event structure.' });
}
const serverTimestamp = new Date().toISOString();
const eventId = uuidv4();
// 增强事件,加入服务器时间戳和唯一ID
const enrichedEvent = {
...event,
eventId,
serverTimestamp,
};
const date = new Date(serverTimestamp);
const year = date.getUTCFullYear();
const month = String(date.getUTCMonth() + 1).padStart(2, '0');
const day = String(date.getUTCDate()).padStart(2, '0');
const hour = String(date.getUTCHours()).padStart(2, '0');
// 按小时分区写入,这是数据湖的标准实践
const key = `raw/events/year=${year}/month=${month}/day=${day}/hour=${hour}/${eventId}.json`;
const command = new PutObjectCommand({
Bucket: process.env.AWS_S3_BUCKET_NAME!,
Key: key,
Body: JSON.stringify(enrichedEvent),
ContentType: 'application/json',
});
await s3Client.send(command);
// 仅在成功写入后返回202 Accepted
// 客户端不需要等待后续处理
return res.status(202).json({ status: 'accepted', eventId });
} catch (error) {
// 关键步骤:错误处理与日志
console.error('Failed to ingest event:', error);
// 避免向客户端暴露过多内部错误细节
return res.status(500).json({ error: 'Internal server error during ingestion.' });
}
}
这里的核心考量是:
- 异步确认: 返回
202 Accepted
告诉客户端请求已被接收,但处理是异步的。这解耦了客户端与后端数据处理。 - 分区路径:
year માતા/day=/hour=
的 S3 Key 结构是 Hive 风格的分区,这对于后续的 Spark/Hudi 高效处理至关重要。 - 错误处理: 捕获异常并记录,但不对外暴露实现细节。
2. Apache Hudi 作业: 构建事务性数据湖
这是架构的核心。一个周期性运行的 Spark 作业(例如每15分钟一次)会读取 S3 landing zone 中的原始 JSON 文件,并将它们 upsert
到 Hudi 表中。我们选择 COPY_ON_WRITE
模式,因为它为读密集型分析场景提供了更好的性能。
process_events_with_hudi.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit
from pyspark.sql.types import StructType, StructField, StringType, MapType, TimestampType
# Hudi 配置
HUDI_TABLE_NAME = "user_events"
HUDI_BASE_PATH = "s3a://your-datalake-bucket/hudi/user_events"
RAW_DATA_PATH = "s3a://your-ingest-bucket/raw/events/*/*/*/*" # 读取所有分区
# Hudi 写入选项
# 生产环境中这些配置需要仔细调优
hudi_options = {
'hoodie.table.name': HUDI_TABLE_NAME,
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE', # 读优化
'hoodie.datasource.write.recordkey.field': 'eventId', # 唯一记录键
'hoodie.datasource.write.partitionpath.field': 'eventDate', # 分区字段
'hoodie.datasource.write.precombine.field': 'serverTimestamp', # 合并时选择最新记录的依据
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.hive_sync.enable': 'true', # 如果使用Hive Metastore
'hoodie.datasource.hive_sync.table': HUDI_TABLE_NAME,
'hoodie.datasource.hive_sync.database': 'analytics',
'hoodie.datasource.hive_sync.partition_fields': 'eventDate',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.compact.inline': 'false', # 可配置异步的compaction
'hoodie.parquet.compression.codec': 'snappy'
}
def main():
spark = SparkSession.builder \
.appName("Hudi Events Ingestion") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
# 从S3读取原始JSON数据
# 在生产中,你需要一个机制来只读取新的文件,避免重复处理
# 例如,使用 SQS 事件通知或维护一个已处理文件的清单
raw_df = spark.read.text(RAW_DATA_PATH)
# 定义JSON Schema
schema = StructType([
StructField("eventId", StringType(), False),
StructField("eventType", StringType(), True),
StructField("userId", StringType(), True),
StructField("payload", MapType(StringType(), StringType()), True),
StructField("clientTimestamp", StringType(), True),
StructField("serverTimestamp", StringType(), False),
])
# 解析JSON并进行转换
events_df = raw_df.select(from_json(col("value"), schema).alias("data")).select("data.*")
# 添加分区列 eventDate,从 serverTimestamp 转换而来
transformed_df = events_df.withColumn(
"serverTimestamp", col("serverTimestamp").cast(TimestampType())
).withColumn(
"eventDate", col("serverTimestamp").cast("date").cast("string")
)
if transformed_df.count() == 0:
print("No new events to process.")
return
# 将数据写入 Hudi 表
print(f"Writing {transformed_df.count()} records to Hudi table {HUDI_TABLE_NAME}...")
transformed_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save(HUDI_BASE_PATH)
print("Hudi write operation completed.")
if __name__ == "__main__":
main()
这段 PySpark 脚本的几个关键点:
- Record Key (
eventId
): Hudi 使用它来唯一标识一条记录,实现upsert
。 - Precombine Key (
serverTimestamp
): 当多条记录有相同的recordKey
时,Hudi 会保留precombineKey
值最大的那条。这对于处理延迟或重复事件至关重要。 - Partition Path (
eventDate
): 物理上将数据按天组织在 S3 上,极大提升了按时间范围查询的性能。
3. 同步 Hudi 到 ClickHouse
Hudi 的一个强大功能是增量查询。我们可以编写一个简单的 Python 脚本,定期查询自上次同步以来的所有变更(新增和更新),然后批量写入 ClickHouse。
sync_hudi_to_clickhouse.py
import os
from datetime import datetime, timedelta
from clickhouse_driver import Client
from pyspark.sql import SparkSession
# --- 配置 ---
HUDI_BASE_PATH = "s3a://your-datalake-bucket/hudi/user_events"
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_TABLE = "analytics.events_local" # 使用本地表进行写入
STATE_FILE = "/opt/sync/last_sync_commit_time.txt" # 存储上次同步点
def get_last_sync_commit_time():
"""读取上次同步的Hudi commit时间"""
if not os.path.exists(STATE_FILE):
# 首次运行时,从1天前开始同步
return (datetime.now() - timedelta(days=1)).strftime("%Y%m%d%H%M%S")
with open(STATE_FILE, 'r') as f:
return f.read().strip()
def save_last_sync_commit_time(commit_time):
"""保存本次同步的Hudi commit时间"""
with open(STATE_FILE, 'w') as f:
f.write(commit_time)
def main():
spark = SparkSession.builder.appName("HudiToClickHouseSync").getOrCreate()
# 1. 获取 Hudi 表的最新 commit 时间
hudi_commits = spark.getCommits(HUDI_BASE_PATH)
if not hudi_commits:
print("No commits found in Hudi table. Exiting.")
return
latest_commit_time = hudi_commits[0]
# 2. 获取上次同步点
begin_time = get_last_sync_commit_time()
if begin_time >= latest_commit_time:
print(f"No new commits since {begin_time}. Exiting.")
return
print(f"Syncing changes from commit > {begin_time} up to {latest_commit_time}")
# 3. 执行 Hudi 增量查询
hudi_inc_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", begin_time) \
.option("hoodie.datasource.read.end.instanttime", latest_commit_time) \
.load(HUDI_BASE_PATH)
# 选择需要的列并丢弃Hudi元数据列
columns_to_sync = ["eventId", "eventType", "userId", "payload", "clientTimestamp", "serverTimestamp"]
data_to_sync_df = hudi_inc_df.select(columns_to_sync)
if data_to_sync_df.count() == 0:
print("No new data in the incremental view. Updating commit time.")
save_last_sync_commit_time(latest_commit_time)
return
# 4. 批量写入 ClickHouse
# .collect() 可能导致OOM,生产环境应使用 .toLocalIterator() 或分批处理
data_to_insert = [row.asDict() for row in data_to_sync_df.collect()]
print(f"Inserting/Updating {len(data_to_insert)} records into ClickHouse...")
client = Client(host=CLICKHOUSE_HOST)
# ClickHouse 的 ReplacingMergeTree 引擎能够根据排序键处理更新
client.execute(
f"INSERT INTO {CLICKHOUSE_TABLE} VALUES",
data_to_insert,
types_check=True
)
print("Insertion complete.")
# 5. 更新同步点
save_last_sync_commit_time(latest_commit_time)
print(f"Successfully synced up to commit {latest_commit_time}.")
if __name__ == "__main__":
main()
这里的 ClickHouse 表引擎选择至关重要。我们会使用 ReplacingMergeTree
。
-- ClickHouse DDL
CREATE TABLE analytics.events_local ON CLUSTER '{cluster}'
(
`eventId` String,
`eventType` String,
`userId` String,
`payload` Map(String, String),
`clientTimestamp` DateTime,
`serverTimestamp` DateTime,
`sign` Int8 -- 用于 ReplacingMergeTree
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/analytics.events_local', '{replica}', serverTimestamp)
PARTITION BY toYYYYMM(serverTimestamp)
ORDER BY (userId, eventType, eventId);
-- 分布式表,用于查询
CREATE TABLE analytics.events ON CLUSTER '{cluster}' AS analytics.events_local
ENGINE = Distributed('{cluster}', analytics, events_local, rand());
ReplicatedReplacingMergeTree
会在后台合并数据时,对于排序键(ORDER BY
子句)相同的行,只保留版本字段(这里是 serverTimestamp
)最新的那一行,从而优雅地实现了 upsert
语义。
4. Vercel Function: 查询端点 (/api/query
)
这个函数负责接收前端的查询请求,将其转换为 ClickHouse SQL,然后返回结果。
pages/api/query.ts
import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@clickhouse/client-web';
const client = createClient({
host: process.env.CLICKHOUSE_HOST,
username: process.env.CLICKHOUSE_USER || 'default',
password: process.env.CLICKHOUSE_PASSWORD || '',
database: 'analytics',
});
export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
if (req.method !== 'GET') {
return res.status(405).end();
}
try {
const { userId, dateFrom, dateTo, eventType } = req.query;
// 参数校验是必须的
if (!userId || typeof userId !== 'string') {
return res.status(400).json({ error: 'userId is required.' });
}
// ... 其他参数校验
// 构建动态查询和参数,防止SQL注入
let query = `
SELECT
eventType,
count() as eventCount,
toStartOfDay(serverTimestamp) as day
FROM analytics.events
WHERE userId = {userId:String}
`;
const queryParams: Record<string, unknown> = { userId };
if (dateFrom && typeof dateFrom === 'string') {
query += ` AND serverTimestamp >= {dateFrom:DateTime}`;
queryParams.dateFrom = dateFrom;
}
if (dateTo && typeof dateTo === 'string') {
query += ` AND serverTimestamp <= {dateTo:DateTime}`;
queryParams.dateTo = dateTo;
}
if (eventType && typeof eventType === 'string') {
query += ` AND eventType = {eventType:String}`;
queryParams.eventType = eventType;
}
query += `
GROUP BY eventType, day
ORDER BY day ASC
`;
const resultSet = await client.query({
query,
query_params: queryParams,
format: 'JSONEachRow',
});
const data = await resultSet.json();
return res.status(200).json(data);
} catch (error) {
console.error('ClickHouse query failed:', error);
return res.status(500).json({ error: 'Failed to query analytics data.' });
}
}
5. Next.js 与 Valtio 前端状态管理
前端的挑战在于管理复杂的、相互依赖的查询过滤器,并以响应式的方式更新UI。Valtio 以其基于 proxy
的极简 API 在此场景下表现出色。
lib/store.ts
import { proxy } from 'valtio';
import { startOfDay, subDays } from 'date-fns';
interface FilterState {
dateRange: {
from: Date;
to: Date;
};
selectedUserId: string | null;
selectedEventType: string | null;
}
// Valtio store 是一个简单的 JS 对象,用 proxy() 包裹
export const filters = proxy<FilterState>({
dateRange: {
from: subDays(startOfDay(new Date()), 7),
to: new Date(),
},
selectedUserId: 'user-123',
selectedEventType: null,
});
// 派生状态或 action 可以是普通函数
export function setDateRange(from: Date, to: Date) {
filters.dateRange.from = from;
filters.dateRange.to = to;
}
export function setSelectedUser(userId: string) {
filters.selectedUserId = userId;
}
components/AnalyticsDashboard.tsx
import { useSnapshot } from 'valtio';
import useSWR from 'swr';
import { filters, setDateRange } from '../lib/store';
const fetcher = (url: string) => fetch(url).then((res) => res.json());
function AnalyticsDashboard() {
// useSnapshot 创建了对 store 的响应式订阅
// 当 filters 对象的任何属性变化时,组件都会重渲染
const snap = useSnapshot(filters);
// 构建基于当前 filter 状态的 SWR key
const query = new URLSearchParams({
userId: snap.selectedUserId || '',
dateFrom: snap.dateRange.from.toISOString(),
dateTo: snap.dateRange.to.toISOString(),
});
if (snap.selectedEventType) {
query.set('eventType', snap.selectedEventType);
}
const { data, error } = useSWR(`/api/query?${query.toString()}`, fetcher);
// ... UI for Date pickers and other filters
// 例如,一个日期选择器组件的回调函数
const handleDateChange = (newFrom: Date, newTo: Date) => {
// 直接调用 action 函数修改 store,Valtio 会处理好重渲染
setDateRange(newFrom, newTo);
};
if (error) return <div>Failed to load data</div>;
if (!data) return <div>Loading...</div>;
return (
<div>
{/* 渲染图表或数据表格 */}
<pre>{JSON.stringify(data, null, 2)}</pre>
</div>
);
}
export default AnalyticsDashboard;
Valtio 的美妙之处在于其透明性。你只需改变 filters
代理对象上的属性,useSnapshot
就会自动侦测到变化并触发组件的重新渲染。与 Redux 等库相比,它极大地减少了样板代码。
架构的局限性与未来展望
这套架构并非没有成本。它的主要局限性在于:
- 延迟: 这是一套“近实时”系统。数据可见性的延迟取决于 Spark/Hudi 作业和同步脚本的运行频率,通常在分钟级别。它不适用于需要秒级甚至毫秒级延迟的场景。
- 运维复杂性: 引入了 Spark 和一个独立的同步进程,相比直接写入 OLAP 的方案,运维成本和监控要求都更高。你需要管理 Spark 集群的资源、监控作业失败,并确保同步过程的健壮性。
- 成本: 虽然对象存储非常便宜,但持续运行 Spark 集群进行微批处理可能会产生显著的计算成本,需要进行仔细的成本优化和资源调度。
未来的优化路径可能包括:
- 走向流式处理: 使用 Apache Flink 和 Hudi Sink 替换批处理的 Spark 作业,可以将数据写入 Hudi 的延迟从分钟级降低到秒级。
- ClickHouse 物化视图: 对于一些固定的、高频的查询模式,可以在 ClickHouse 内部创建物化视图,进一步加速查询,并将部分聚合计算的压力从查询时转移到写入时。
- 数据质量监控: 在 Hudi 作业中集成数据质量校验(如 Deequ),确保在数据进入可查询层之前发现并隔离坏数据,这是生产级数据管道的必备环节。