我们面临的局面并不罕见:一个历经数年演进的系统,核心业务逻辑分散在两个主要技术栈中——一个是用 Ruby on Rails 编写的庞大单体应用,负责处理核心交易和用户管理;另一个则是由多个 PHP-FPM 服务构成的集群,处理着内容管理、营销活动等外围功能。运维团队的日常充满了挑战,其中最棘手的就是日志问题。
两个技术栈的日志系统完全独立。Rails 应用使用 Lograge
将日志输出为 JSON 格式的单行文本,存储在本地磁盘;而 PHP 服务则通过 Monolog
将日志推送到一个老旧的 Syslog-ng 服务器。当一个用户请求跨越了这两个系统时,追踪其完整生命周期几乎是不可能完成的任务。排查线上问题时,工程师需要在两个不同的系统、使用两种不同的查询方式(grep
和 Syslog 查询)手动关联时间戳和有限的上下文信息,效率极低,且极易出错。
graph TD subgraph 用户请求 direction LR U(User) --> GW(Legacy Nginx) end subgraph 后端服务 direction LR GW --> R(Ruby on Rails Monolith) GW --> P1(PHP Service 1) GW --> P2(PHP Service 2) end subgraph 日志孤岛 direction TB R --> RF(Log File on Disk) P1 --> SY(Syslog-ng Server) P2 --> SY end style RF fill:#f9f,stroke:#333,stroke-width:2px style SY fill:#ccf,stroke:#333,stroke-width:2px
这个架构的痛点显而易见:缺乏统一的日志视图、查询和分析能力弱、无法进行跨服务链路追踪、日志格式不统一导致解析成本高昂。我们需要一个统一的可观测性平面。
方案A:传统的代理模式 (Agent-based Approach)
第一个进入考虑范围的方案是行业内的标准做法:在每一台运行 Rails 和 PHP 应用的服务器上部署日志采集代理,例如 Filebeat 或 Fluentd。这些代理负责监控本地日志文件或 Syslog 输出,然后将数据统一发送到一个集中的存储与分析系统,比如 OpenSearch。
优势分析:
- 解耦: 日志采集与应用程序完全解耦。应用只需要关注将日志写入本地即可,无需关心下游的处理逻辑。这对于改造遗留系统来说,侵入性较低。
- 成熟生态: Filebeat 和 Fluentd 拥有庞大的社区和丰富的插件,能够处理各种复杂的日志格式和输出目标。
- 可靠性: 这些代理经过了大规模生产环境的考验,具备磁盘缓冲、背压等机制,能有效防止数据丢失。
劣势分析:
- 运维成本: 我们需要在数十台服务器上部署、配置、监控和升级这些代理。随着服务器数量的增加,这会变成一个沉重的负担。
- 资源消耗: 每个代理都会占用所在服务器的 CPU 和内存资源。对于一些已经处于性能临界点的老旧服务,这可能会成为压垮骆驼的最后一根稻草。
- 格式治理困难: 虽然代理可以解析和转换日志,但源头的日志格式仍然由各个开发团队自行决定。我们无法从根本上强制推行一个统一的、富含上下文的日志结构。Rails 团队和 PHP 团队对“结构化日志”的理解可能完全不同。
- 上下文信息缺失: 代理只能采集到应用自身记录的信息。对于请求头、响应状态码、处理耗时等网关层面的关键信息,应用日志中可能并未完整记录,或者记录的字段名不统一。
在真实项目中,运维成本和治理困难是两个最致命的问题。我们很快意识到,即使引入了代理,我们大部分时间仍在处理数据清洗和格式对齐,而不是在分析数据。
方案B:网关层日志拦截与增强 (Gateway-centric Approach)
既然所有外部流量的入口都将统一收敛到 API 网关,我们是否可以利用网关来完成日志的采集、格式化和发送?我们将技术选型锁定在了 Apache APISIX,因为它基于 Nginx 和 LuaJIT,性能卓越,并且拥有一个高度可扩展的插件生态。
这个方案的核心思想是:开发一个自定义的 APISIX 插件,该插件在请求的 log
阶段被触发。它会捕获请求和响应的全部信息,按照我们预定义的统一 Schema 组装成一个 JSON 对象,然后直接异步推送到 OpenSearch 集群。
优势分析:
- 零侵入与集中治理: 后端 Rails 和 PHP 服务完全无需改动。所有日志格式的定义、修改和升级都在 APISIX 的插件中集中完成。我们可以强制要求所有通过网关的日志都包含
request_id
,client_ip
,user_agent
,upstream_latency
等关键字段。 - 丰富的上下文: APISIX 作为流量入口,天然拥有最完整的请求上下文信息,包括请求头、响应头、TLS 握手信息、客户端 IP、URI、请求体、响应体、以及上下游延迟等。这些信息对于问题排查至关重要,而应用层往往会忽略。
- 性能优势: 日志处理的开销从应用服务器转移到了专用的网关节点。APISIX 的异步处理机制和 LuaJIT 的高性能确保了日志收集不会阻塞正常的请求处理。
- 简化架构: 无需在应用服务器上部署和维护任何代理,极大地降低了系统复杂度和运维成本。
劣势分析:
- 无法覆盖内部日志: 此方案只能捕获通过网关的请求-响应日志。对于应用内部的异步任务(如 Rails 中的 Sidekiq job)或纯粹的内部服务间调用(如果它们不经过网关),此方案无法覆盖。
- 网关成为关键依赖: 日志系统的稳定性现在强依赖于 APISIX 及其自定义插件。插件中的任何 bug 或性能问题都可能影响到整个日志链路。
- 网络开销: APISIX 节点需要与 OpenSearch 集群建立网络连接并推送数据。在高流量下,这可能成为一个新的瓶颈。
决策与理由
权衡之后,我们决定采用方案B。理由如下:当前最大的痛点是面向用户请求的跨服务追踪,方案B完美地解决了这个问题。对于其无法覆盖内部日志的缺陷,我们认为可以接受:首先,80% 的问题都发生在请求-响应链路上;其次,对于关键的异步任务,我们仍然可以保留一个轻量级的本地日志,作为辅助排查手段。我们相信,一个能解决80%问题的、简洁高效的架构,远胜于一个试图解决100%问题但复杂臃肿的架构。
我们将这个新的架构命名为“统一可观测性平面”,其核心是 APISIX 和 OpenSearch。
graph TD subgraph 统一入口与处理 direction LR U(User) --> A(APISIX Gateway) A -- route --> R(Ruby on Rails Monolith) A -- route --> P(PHP Services) end subgraph 统一日志流 A -- "Custom opensearch-logger Plugin (Async Batch POST)" --> OS(OpenSearch Cluster) end subgraph 统一查询与分析 direction LR OS --> Dashboard(Analytics Dashboard
Built with Turbopack) OS --> Alerting(OpenSearch Alerting) OS --> Dev(Developer Query Interface) end style A fill:#f60,stroke:#333,stroke-width:2px style OS fill:#069,stroke:#333,stroke-width:2px
核心实现:APISIX 自定义日志插件
我们用 Lua 编写了一个名为 opensearch-logger
的 APISIX 插件。这个插件是整个架构的核心。
1. 插件代码 (opensearch-logger.lua
)
代码必须是生产级的,考虑到了批量发送、错误处理和异步HTTP客户端。
-- opensearch-logger.lua
local core = require("apisix.core")
local http = require("resty.http")
local plugin = require("apisix.plugin")
local json = require("cjson.safe")
local batch_processor = require("apisix.core.batch_processor")
-- 定义插件的 schema,用于在 Dashboard 中配置
local schema = {
type = "object",
properties = {
endpoint = {
type = "string",
description = "OpenSearch _bulk API endpoint, e.g., http://opensearch-node1:9200/_bulk",
},
index_prefix = {
type = "string",
description = "Prefix for the OpenSearch index, e.g., apisix-logs-",
default = "apisix-logs-"
},
auth_user = {
type = "string",
description = "Basic auth username for OpenSearch."
},
auth_password = {
type = "string",
description = "Basic auth password for OpenSearch."
},
batch_max_size = {
type = "integer",
description = "Max number of logs to buffer before sending.",
default = 1000
},
inactive_timeout = {
type = "integer",
description = "Max seconds to wait before flushing buffer.",
default = 5
}
},
required = {"endpoint"}
}
local _M = {
version = "0.1",
priority = 50, -- 日志插件优先级通常较低
name = "opensearch-logger",
schema = schema
}
-- 插件初始化函数,创建 batch_processor 实例
function _M.init(conf)
local processor_key = core.utils.md5(_M.name .. conf.endpoint)
-- 每个配置创建一个独立的 batch processor
-- 这是关键,确保不同路由的日志配置互不干扰
local processor = batch_processor.new({
key = processor_key,
batch_max_size = conf.batch_max_size,
inactive_timeout = conf.inactive_timeout,
})
-- 定义批处理函数,这是实际发送数据到 OpenSearch 的地方
local function batch_handler(entries)
local ok, err
-- 构造 OpenSearch 的 _bulk API 请求体
-- 格式: action_and_meta_data\n optional_source\n
local body_parts = {}
local index_name = conf.index_prefix .. os.date("!%Y.%m.%d")
for _, entry in ipairs(entries) do
-- 动作和元数据行
table.insert(body_parts, json.encode({ index = { _index = index_name } }))
-- 文档源数据行
table.insert(body_parts, entry)
end
local body = table.concat(body_parts, "\n") .. "\n"
-- 创建 HTTP 客户端实例
local httpc, err = http.new()
if not httpc then
core.log.error("failed to create http client: ", err)
return
end
-- 设置请求头和认证
local headers = {
["Content-Type"] = "application/x-ndjson"
}
if conf.auth_user and conf.auth_password then
local auth_str = core.base64.encode(conf.auth_user .. ":" .. conf.auth_password)
headers["Authorization"] = "Basic " .. auth_str
end
-- 发起请求
-- 这里的超时设置非常重要,防止慢请求阻塞 worker 进程
local res, err = httpc:request_uri(conf.endpoint, {
method = "POST",
body = body,
headers = headers,
timeout = 10000 -- 10秒超时
})
if not res then
core.log.error("failed to send logs to OpenSearch: ", err)
return
}
if res.status >= 300 then
core.log.error("OpenSearch responded with status ", res.status, ", body: ", res.body)
return
}
-- 可选:检查 bulk response 是否有错误
local resp_body, json_err = json.decode(res.body)
if not resp_body or json_err then
core.log.warn("failed to decode OpenSearch bulk response: ", json_err)
return
end
if resp_body.errors then
core.log.error("some logs failed to be indexed in OpenSearch. ", res.body)
end
end
processor:set_handler(batch_handler)
return true
end
-- log 阶段执行,这是插件的核心逻辑
function _M.log(conf, ctx)
-- 收集日志信息
local log_entry = {
["@timestamp"] = core.time.now() * 1000, -- OpenSearch 倾向于使用毫秒级时间戳
server_ip = core.request.get_server_addr(),
client_ip = core.request.get_ip(),
request = {
method = ctx.var.request_method,
uri = ctx.var.request_uri,
host = ctx.var.host,
headers = core.request.get_headers(),
size = ctx.var.request_length
},
response = {
status = ctx.var.status,
size = ctx.var.response_length,
headers = core.response.get_headers()
},
latencies = {
request = ctx.var.request_time * 1000,
upstream = ctx.var.upstream_response_time * 1000,
apisix = ctx.latency_breakdown and ctx.latency_breakdown.apisix or (ctx.var.request_time - ctx.var.upstream_response_time) * 1000
},
upstream = ctx.var.upstream_addr,
route_id = ctx.route_id,
service_id = ctx.service_id,
-- 通过 ngx.ctx 传递的自定义数据,例如 trace_id
trace_id = ngx.ctx.trace_id
}
-- 移除敏感信息,例如 Authorization 头
if log_entry.request.headers and log_entry.request.headers.authorization then
log_entry.request.headers.authorization = "[REDACTED]"
end
-- 序列化并推送到批处理器
local log_str, err = json.encode(log_entry)
if err then
core.log.error("failed to encode log entry: ", err)
return
end
local processor = batch_processor.get(core.utils.md5(_M.name .. conf.endpoint))
if not processor then
core.log.error("could not get batch processor instance")
return
end
local ok, err = processor:push(log_str)
if not ok then
core.log.error("failed to push log to batch processor: ", err)
end
end
return _M
2. OpenSearch 索引模板
为了确保 OpenSearch 能正确地索引我们的数据,我们需要预先定义一个索引模板。这能避免 OpenSearch 动态映射时将 IP 地址映射为 text,或者将状态码映射为 text 等问题。
PUT _index_template/apisix_logs_template
{
"index_patterns": ["apisix-logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "apisix_log_policy",
"index.lifecycle.rollover_alias": "apisix-logs"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"server_ip": { "type": "ip" },
"client_ip": { "type": "ip" },
"request": {
"properties": {
"method": { "type": "keyword" },
"uri": { "type": "wildcard" },
"host": { "type": "keyword" },
"size": { "type": "long" }
}
},
"response": {
"properties": {
"status": { "type": "integer" },
"size": { "type": "long" }
}
},
"latencies": {
"properties": {
"request": { "type": "float" },
"upstream": { "type": "float" },
"apisix": { "type": "float" }
}
},
"upstream": { "type": "keyword" },
"route_id": { "type": "keyword" },
"service_id": { "type": "keyword" },
"trace_id": { "type": "keyword" }
}
}
}
}
我们还定义了索引生命周期管理 (ILM) 策略 apisix_log_policy
,用于自动滚动索引(例如每天或每50GB)并将超过90天的旧数据删除,防止磁盘空间耗尽。
3. 在路由上启用插件
最后,我们为指向 Ruby on Rails 和 PHP 服务的路由启用此插件。
# 示例:为 Rails 服务应用日志插件
curl -i http://127.0.0.1:9180/apisix/admin/routes/1 -X PUT -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -d '
{
"uri": "/api/rails/*",
"upstream": {
"type": "roundrobin",
"nodes": {
"rails-server:3000": 1
}
},
"plugins": {
"opensearch-logger": {
"endpoint": "http://opensearch-node1:9200/_bulk",
"index_prefix": "apisix-logs-rails-",
"auth_user": "admin",
"auth_password": "your_password",
"batch_max_size": 500,
"inactive_timeout": 2
},
"proxy-rewrite": {
"regex_uri": ["/api/rails/(.*)", "/$1"]
}
}
}'
对 PHP 服务也进行类似配置,只需更改 uri
, upstream
和 index_prefix
即可。现在,所有流经 APISIX 的请求都会被自动、结构化地记录到 OpenSearch 中。
前端展现与 Turbopack
日志收集完成后,我们需要一个高效的方式来查询和可视化这些数据。虽然 OpenSearch Dashboards 提供了开箱即用的能力,但我们希望为开发和运维团队构建一个更定制化、更贴合业务场景的分析平台。
我们选择使用 Next.js 和 Turbopack 来构建这个前端项目。选择 Turbopack 的原因在于其极致的开发性能。在处理复杂的数据可视化组件和频繁的代码迭代时,传统 Webpack 的热更新速度会成为瓶颈。Turbopack 基于 Rust 编写,其增量编译引擎能提供近乎瞬时的开发反馈,这对于需要快速验证查询逻辑和图表展现效果的场景来说,极大地提升了开发体验。
前端应用通过一个轻量的后端 BFF (Backend for Frontend) 层与 OpenSearch 的 REST API 交互。BFF 负责将前端的分析请求转换为 OpenSearch 的 DSL 查询。
例如,要统计过去1小时内 Rails 服务的 API 错误率(HTTP 5xx)并按分钟聚合,前端会发送一个请求,BFF 则会生成如下的 OpenSearch 查询:
POST /apisix-logs-rails-*/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{ "range": { "@timestamp": { "gte": "now-1h/m", "lte": "now/m" } } },
{ "range": { "response.status": { "gte": 500 } } }
]
}
},
"aggs": {
"errors_over_time": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1m",
"min_doc_count": 0
}
}
}
}
这种由统一、结构化的日志数据驱动的分析能力,是之前基于 grep
的工作流完全无法比拟的。
架构的局限性与未来展望
这个基于 APISIX 的中心化日志架构并非银弹。它的主要局限性在于无法覆盖非请求驱动的业务逻辑,例如 Rails 应用中的 Sidekiq 异步任务。对于这类任务,我们目前的策略是让它们继续输出结构化日志到本地文件,并为这些特定的服务器部署一个轻量级的 Filebeat 代理作为补充。重要的是,这些日志也必须遵循我们在网关层定义的日志 Schema,特别是要包含 trace_id
,以便能在 OpenSearch 中与主请求链路进行关联。
未来的演进方向是明确的:
- 引入分布式追踪: 当前的
trace_id
是通过 APISIX 的request-id
插件生成的,只能在网关和直连的后端服务之间传递。下一步是引入完整的 OpenTelemetry 支持,让trace_id
和span_id
能够贯穿整个分布式调用链,包括服务之间的内部调用和异步任务,从而实现真正的全链路追踪。APISIX 社区已经提供了opentelemetry
插件,这为我们铺平了道路。 - 日志管道的鲁棒性: 在当前架构中,如果 OpenSearch 集群短暂不可用,APISIX 的
batch_processor
会在内存中缓存一部分日志,但如果长时间中断或 APISIX 进程重启,日志会丢失。对于金融等要求日志绝不丢失的场景,需要在 APISIX 和 OpenSearch 之间引入一个高可用的消息队列(如 Kafka 或 Pulsar)作为缓冲层。APISIX 可以将日志推送到 Kafka,然后由一个独立的消费者组负责将数据写入 OpenSearch,进一步解耦和增强系统的韧性。
这个架构的实施,标志着我们从被动的、分散的日志处理,迈向了主动的、统一的可观测性体系建设。它不仅极大地提升了问题排查的效率,更重要的是,通过对高质量、结构化数据的分析,我们开始能够度量服务质量、洞察用户行为,为业务决策提供了坚实的数据基础。