构建异构系统统一可观测性平面 APISIX 日志插件与 OpenSearch 的集成架构


我们面临的局面并不罕见:一个历经数年演进的系统,核心业务逻辑分散在两个主要技术栈中——一个是用 Ruby on Rails 编写的庞大单体应用,负责处理核心交易和用户管理;另一个则是由多个 PHP-FPM 服务构成的集群,处理着内容管理、营销活动等外围功能。运维团队的日常充满了挑战,其中最棘手的就是日志问题。

两个技术栈的日志系统完全独立。Rails 应用使用 Lograge 将日志输出为 JSON 格式的单行文本,存储在本地磁盘;而 PHP 服务则通过 Monolog 将日志推送到一个老旧的 Syslog-ng 服务器。当一个用户请求跨越了这两个系统时,追踪其完整生命周期几乎是不可能完成的任务。排查线上问题时,工程师需要在两个不同的系统、使用两种不同的查询方式(grep 和 Syslog 查询)手动关联时间戳和有限的上下文信息,效率极低,且极易出错。

graph TD
    subgraph 用户请求
        direction LR
        U(User) --> GW(Legacy Nginx)
    end

    subgraph 后端服务
        direction LR
        GW --> R(Ruby on Rails Monolith)
        GW --> P1(PHP Service 1)
        GW --> P2(PHP Service 2)
    end

    subgraph 日志孤岛
        direction TB
        R --> RF(Log File on Disk)
        P1 --> SY(Syslog-ng Server)
        P2 --> SY
    end

    style RF fill:#f9f,stroke:#333,stroke-width:2px
    style SY fill:#ccf,stroke:#333,stroke-width:2px

这个架构的痛点显而易见:缺乏统一的日志视图、查询和分析能力弱、无法进行跨服务链路追踪、日志格式不统一导致解析成本高昂。我们需要一个统一的可观测性平面。

方案A:传统的代理模式 (Agent-based Approach)

第一个进入考虑范围的方案是行业内的标准做法:在每一台运行 Rails 和 PHP 应用的服务器上部署日志采集代理,例如 Filebeat 或 Fluentd。这些代理负责监控本地日志文件或 Syslog 输出,然后将数据统一发送到一个集中的存储与分析系统,比如 OpenSearch。

优势分析:

  1. 解耦: 日志采集与应用程序完全解耦。应用只需要关注将日志写入本地即可,无需关心下游的处理逻辑。这对于改造遗留系统来说,侵入性较低。
  2. 成熟生态: Filebeat 和 Fluentd 拥有庞大的社区和丰富的插件,能够处理各种复杂的日志格式和输出目标。
  3. 可靠性: 这些代理经过了大规模生产环境的考验,具备磁盘缓冲、背压等机制,能有效防止数据丢失。

劣势分析:

  1. 运维成本: 我们需要在数十台服务器上部署、配置、监控和升级这些代理。随着服务器数量的增加,这会变成一个沉重的负担。
  2. 资源消耗: 每个代理都会占用所在服务器的 CPU 和内存资源。对于一些已经处于性能临界点的老旧服务,这可能会成为压垮骆驼的最后一根稻草。
  3. 格式治理困难: 虽然代理可以解析和转换日志,但源头的日志格式仍然由各个开发团队自行决定。我们无法从根本上强制推行一个统一的、富含上下文的日志结构。Rails 团队和 PHP 团队对“结构化日志”的理解可能完全不同。
  4. 上下文信息缺失: 代理只能采集到应用自身记录的信息。对于请求头、响应状态码、处理耗时等网关层面的关键信息,应用日志中可能并未完整记录,或者记录的字段名不统一。

在真实项目中,运维成本和治理困难是两个最致命的问题。我们很快意识到,即使引入了代理,我们大部分时间仍在处理数据清洗和格式对齐,而不是在分析数据。

方案B:网关层日志拦截与增强 (Gateway-centric Approach)

既然所有外部流量的入口都将统一收敛到 API 网关,我们是否可以利用网关来完成日志的采集、格式化和发送?我们将技术选型锁定在了 Apache APISIX,因为它基于 Nginx 和 LuaJIT,性能卓越,并且拥有一个高度可扩展的插件生态。

这个方案的核心思想是:开发一个自定义的 APISIX 插件,该插件在请求的 log 阶段被触发。它会捕获请求和响应的全部信息,按照我们预定义的统一 Schema 组装成一个 JSON 对象,然后直接异步推送到 OpenSearch 集群。

优势分析:

  1. 零侵入与集中治理: 后端 Rails 和 PHP 服务完全无需改动。所有日志格式的定义、修改和升级都在 APISIX 的插件中集中完成。我们可以强制要求所有通过网关的日志都包含 request_id, client_ip, user_agent, upstream_latency 等关键字段。
  2. 丰富的上下文: APISIX 作为流量入口,天然拥有最完整的请求上下文信息,包括请求头、响应头、TLS 握手信息、客户端 IP、URI、请求体、响应体、以及上下游延迟等。这些信息对于问题排查至关重要,而应用层往往会忽略。
  3. 性能优势: 日志处理的开销从应用服务器转移到了专用的网关节点。APISIX 的异步处理机制和 LuaJIT 的高性能确保了日志收集不会阻塞正常的请求处理。
  4. 简化架构: 无需在应用服务器上部署和维护任何代理,极大地降低了系统复杂度和运维成本。

劣势分析:

  1. 无法覆盖内部日志: 此方案只能捕获通过网关的请求-响应日志。对于应用内部的异步任务(如 Rails 中的 Sidekiq job)或纯粹的内部服务间调用(如果它们不经过网关),此方案无法覆盖。
  2. 网关成为关键依赖: 日志系统的稳定性现在强依赖于 APISIX 及其自定义插件。插件中的任何 bug 或性能问题都可能影响到整个日志链路。
  3. 网络开销: APISIX 节点需要与 OpenSearch 集群建立网络连接并推送数据。在高流量下,这可能成为一个新的瓶颈。

决策与理由

权衡之后,我们决定采用方案B。理由如下:当前最大的痛点是面向用户请求的跨服务追踪,方案B完美地解决了这个问题。对于其无法覆盖内部日志的缺陷,我们认为可以接受:首先,80% 的问题都发生在请求-响应链路上;其次,对于关键的异步任务,我们仍然可以保留一个轻量级的本地日志,作为辅助排查手段。我们相信,一个能解决80%问题的、简洁高效的架构,远胜于一个试图解决100%问题但复杂臃肿的架构。

我们将这个新的架构命名为“统一可观测性平面”,其核心是 APISIX 和 OpenSearch。

graph TD
    subgraph 统一入口与处理
        direction LR
        U(User) --> A(APISIX Gateway)
        A -- route --> R(Ruby on Rails Monolith)
        A -- route --> P(PHP Services)
    end

    subgraph 统一日志流
        A -- "Custom opensearch-logger Plugin (Async Batch POST)" --> OS(OpenSearch Cluster)
    end

    subgraph 统一查询与分析
        direction LR
        OS --> Dashboard(Analytics Dashboard
Built with Turbopack) OS --> Alerting(OpenSearch Alerting) OS --> Dev(Developer Query Interface) end style A fill:#f60,stroke:#333,stroke-width:2px style OS fill:#069,stroke:#333,stroke-width:2px

核心实现:APISIX 自定义日志插件

我们用 Lua 编写了一个名为 opensearch-logger 的 APISIX 插件。这个插件是整个架构的核心。

1. 插件代码 (opensearch-logger.lua)

代码必须是生产级的,考虑到了批量发送、错误处理和异步HTTP客户端。

-- opensearch-logger.lua
local core = require("apisix.core")
local http = require("resty.http")
local plugin = require("apisix.plugin")
local json = require("cjson.safe")
local batch_processor = require("apisix.core.batch_processor")

-- 定义插件的 schema,用于在 Dashboard 中配置
local schema = {
    type = "object",
    properties = {
        endpoint = {
            type = "string",
            description = "OpenSearch _bulk API endpoint, e.g., http://opensearch-node1:9200/_bulk",
        },
        index_prefix = {
            type = "string",
            description = "Prefix for the OpenSearch index, e.g., apisix-logs-",
            default = "apisix-logs-"
        },
        auth_user = {
            type = "string",
            description = "Basic auth username for OpenSearch."
        },
        auth_password = {
            type = "string",
            description = "Basic auth password for OpenSearch."
        },
        batch_max_size = {
            type = "integer",
            description = "Max number of logs to buffer before sending.",
            default = 1000
        },
        inactive_timeout = {
            type = "integer",
            description = "Max seconds to wait before flushing buffer.",
            default = 5
        }
    },
    required = {"endpoint"}
}

local _M = {
    version = "0.1",
    priority = 50, -- 日志插件优先级通常较低
    name = "opensearch-logger",
    schema = schema
}

-- 插件初始化函数,创建 batch_processor 实例
function _M.init(conf)
    local processor_key = core.utils.md5(_M.name .. conf.endpoint)

    -- 每个配置创建一个独立的 batch processor
    -- 这是关键,确保不同路由的日志配置互不干扰
    local processor = batch_processor.new({
        key = processor_key,
        batch_max_size = conf.batch_max_size,
        inactive_timeout = conf.inactive_timeout,
    })

    -- 定义批处理函数,这是实际发送数据到 OpenSearch 的地方
    local function batch_handler(entries)
        local ok, err

        -- 构造 OpenSearch 的 _bulk API 请求体
        -- 格式: action_and_meta_data\n optional_source\n
        local body_parts = {}
        local index_name = conf.index_prefix .. os.date("!%Y.%m.%d")
        for _, entry in ipairs(entries) do
            -- 动作和元数据行
            table.insert(body_parts, json.encode({ index = { _index = index_name } }))
            -- 文档源数据行
            table.insert(body_parts, entry)
        end
        local body = table.concat(body_parts, "\n") .. "\n"

        -- 创建 HTTP 客户端实例
        local httpc, err = http.new()
        if not httpc then
            core.log.error("failed to create http client: ", err)
            return
        end

        -- 设置请求头和认证
        local headers = {
            ["Content-Type"] = "application/x-ndjson"
        }
        if conf.auth_user and conf.auth_password then
            local auth_str = core.base64.encode(conf.auth_user .. ":" .. conf.auth_password)
            headers["Authorization"] = "Basic " .. auth_str
        end

        -- 发起请求
        -- 这里的超时设置非常重要,防止慢请求阻塞 worker 进程
        local res, err = httpc:request_uri(conf.endpoint, {
            method = "POST",
            body = body,
            headers = headers,
            timeout = 10000 -- 10秒超时
        })

        if not res then
            core.log.error("failed to send logs to OpenSearch: ", err)
            return
        }

        if res.status >= 300 then
            core.log.error("OpenSearch responded with status ", res.status, ", body: ", res.body)
            return
        }

        -- 可选:检查 bulk response 是否有错误
        local resp_body, json_err = json.decode(res.body)
        if not resp_body or json_err then
             core.log.warn("failed to decode OpenSearch bulk response: ", json_err)
             return
        end
        if resp_body.errors then
            core.log.error("some logs failed to be indexed in OpenSearch. ", res.body)
        end
    end

    processor:set_handler(batch_handler)
    return true
end

-- log 阶段执行,这是插件的核心逻辑
function _M.log(conf, ctx)
    -- 收集日志信息
    local log_entry = {
        ["@timestamp"] = core.time.now() * 1000, -- OpenSearch 倾向于使用毫秒级时间戳
        server_ip = core.request.get_server_addr(),
        client_ip = core.request.get_ip(),
        request = {
            method = ctx.var.request_method,
            uri = ctx.var.request_uri,
            host = ctx.var.host,
            headers = core.request.get_headers(),
            size = ctx.var.request_length
        },
        response = {
            status = ctx.var.status,
            size = ctx.var.response_length,
            headers = core.response.get_headers()
        },
        latencies = {
            request = ctx.var.request_time * 1000,
            upstream = ctx.var.upstream_response_time * 1000,
            apisix = ctx.latency_breakdown and ctx.latency_breakdown.apisix or (ctx.var.request_time - ctx.var.upstream_response_time) * 1000
        },
        upstream = ctx.var.upstream_addr,
        route_id = ctx.route_id,
        service_id = ctx.service_id,
        -- 通过 ngx.ctx 传递的自定义数据,例如 trace_id
        trace_id = ngx.ctx.trace_id 
    }

    -- 移除敏感信息,例如 Authorization 头
    if log_entry.request.headers and log_entry.request.headers.authorization then
        log_entry.request.headers.authorization = "[REDACTED]"
    end

    -- 序列化并推送到批处理器
    local log_str, err = json.encode(log_entry)
    if err then
        core.log.error("failed to encode log entry: ", err)
        return
    end

    local processor = batch_processor.get(core.utils.md5(_M.name .. conf.endpoint))
    if not processor then
        core.log.error("could not get batch processor instance")
        return
    end
    
    local ok, err = processor:push(log_str)
    if not ok then
        core.log.error("failed to push log to batch processor: ", err)
    end
end

return _M

2. OpenSearch 索引模板

为了确保 OpenSearch 能正确地索引我们的数据,我们需要预先定义一个索引模板。这能避免 OpenSearch 动态映射时将 IP 地址映射为 text,或者将状态码映射为 text 等问题。

PUT _index_template/apisix_logs_template
{
  "index_patterns": ["apisix-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "apisix_log_policy",
      "index.lifecycle.rollover_alias": "apisix-logs" 
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "server_ip": { "type": "ip" },
        "client_ip": { "type": "ip" },
        "request": {
          "properties": {
            "method": { "type": "keyword" },
            "uri": { "type": "wildcard" },
            "host": { "type": "keyword" },
            "size": { "type": "long" }
          }
        },
        "response": {
          "properties": {
            "status": { "type": "integer" },
            "size": { "type": "long" }
          }
        },
        "latencies": {
          "properties": {
            "request": { "type": "float" },
            "upstream": { "type": "float" },
            "apisix": { "type": "float" }
          }
        },
        "upstream": { "type": "keyword" },
        "route_id": { "type": "keyword" },
        "service_id": { "type": "keyword" },
        "trace_id": { "type": "keyword" }
      }
    }
  }
}

我们还定义了索引生命周期管理 (ILM) 策略 apisix_log_policy,用于自动滚动索引(例如每天或每50GB)并将超过90天的旧数据删除,防止磁盘空间耗尽。

3. 在路由上启用插件

最后,我们为指向 Ruby on Rails 和 PHP 服务的路由启用此插件。

# 示例:为 Rails 服务应用日志插件
curl -i http://127.0.0.1:9180/apisix/admin/routes/1 -X PUT -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -d '
{
    "uri": "/api/rails/*",
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "rails-server:3000": 1
        }
    },
    "plugins": {
        "opensearch-logger": {
            "endpoint": "http://opensearch-node1:9200/_bulk",
            "index_prefix": "apisix-logs-rails-",
            "auth_user": "admin",
            "auth_password": "your_password",
            "batch_max_size": 500,
            "inactive_timeout": 2
        },
        "proxy-rewrite": {
            "regex_uri": ["/api/rails/(.*)", "/$1"]
        }
    }
}'

对 PHP 服务也进行类似配置,只需更改 uri, upstreamindex_prefix 即可。现在,所有流经 APISIX 的请求都会被自动、结构化地记录到 OpenSearch 中。

前端展现与 Turbopack

日志收集完成后,我们需要一个高效的方式来查询和可视化这些数据。虽然 OpenSearch Dashboards 提供了开箱即用的能力,但我们希望为开发和运维团队构建一个更定制化、更贴合业务场景的分析平台。

我们选择使用 Next.js 和 Turbopack 来构建这个前端项目。选择 Turbopack 的原因在于其极致的开发性能。在处理复杂的数据可视化组件和频繁的代码迭代时,传统 Webpack 的热更新速度会成为瓶颈。Turbopack 基于 Rust 编写,其增量编译引擎能提供近乎瞬时的开发反馈,这对于需要快速验证查询逻辑和图表展现效果的场景来说,极大地提升了开发体验。

前端应用通过一个轻量的后端 BFF (Backend for Frontend) 层与 OpenSearch 的 REST API 交互。BFF 负责将前端的分析请求转换为 OpenSearch 的 DSL 查询。

例如,要统计过去1小时内 Rails 服务的 API 错误率(HTTP 5xx)并按分钟聚合,前端会发送一个请求,BFF 则会生成如下的 OpenSearch 查询:

POST /apisix-logs-rails-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-1h/m", "lte": "now/m" } } },
        { "range": { "response.status": { "gte": 500 } } }
      ]
    }
  },
  "aggs": {
    "errors_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "1m",
        "min_doc_count": 0
      }
    }
  }
}

这种由统一、结构化的日志数据驱动的分析能力,是之前基于 grep 的工作流完全无法比拟的。

架构的局限性与未来展望

这个基于 APISIX 的中心化日志架构并非银弹。它的主要局限性在于无法覆盖非请求驱动的业务逻辑,例如 Rails 应用中的 Sidekiq 异步任务。对于这类任务,我们目前的策略是让它们继续输出结构化日志到本地文件,并为这些特定的服务器部署一个轻量级的 Filebeat 代理作为补充。重要的是,这些日志也必须遵循我们在网关层定义的日志 Schema,特别是要包含 trace_id,以便能在 OpenSearch 中与主请求链路进行关联。

未来的演进方向是明确的:

  1. 引入分布式追踪: 当前的 trace_id 是通过 APISIX 的 request-id 插件生成的,只能在网关和直连的后端服务之间传递。下一步是引入完整的 OpenTelemetry 支持,让 trace_idspan_id 能够贯穿整个分布式调用链,包括服务之间的内部调用和异步任务,从而实现真正的全链路追踪。APISIX 社区已经提供了 opentelemetry 插件,这为我们铺平了道路。
  2. 日志管道的鲁棒性: 在当前架构中,如果 OpenSearch 集群短暂不可用,APISIX 的 batch_processor 会在内存中缓存一部分日志,但如果长时间中断或 APISIX 进程重启,日志会丢失。对于金融等要求日志绝不丢失的场景,需要在 APISIX 和 OpenSearch 之间引入一个高可用的消息队列(如 Kafka 或 Pulsar)作为缓冲层。APISIX 可以将日志推送到 Kafka,然后由一个独立的消费者组负责将数据写入 OpenSearch,进一步解耦和增强系统的韧性。

这个架构的实施,标志着我们从被动的、分散的日志处理,迈向了主动的、统一的可观测性体系建设。它不仅极大地提升了问题排查的效率,更重要的是,通过对高质量、结构化数据的分析,我们开始能够度量服务质量、洞察用户行为,为业务决策提供了坚实的数据基础。


  目录