Building a Cross-Language AI Inference Bus: Integrating ActiveMQ STOMP, Python Data Processing, and Phoenix Channels


Early in the project we faced a classic heterogeneous-integration problem. The core business runs on a stable Java system that has published inter-system events through ActiveMQ for years. Now the data science team needs to consume those events and run real-time inference with their latest Python models, while the operations team wants a modern web dashboard that displays each event's inference result live and raises alerts on anomalies. The stack split cleanly: Java keeps producing, Python owns the algorithms, and the dashboard team picked Elixir's Phoenix framework for its strength in concurrency and real-time communication.

The first idea was for a Python service to consume from ActiveMQ and write results into the Phoenix backend's database over a REST API, with the front end polling for updates. That plan was rejected quickly: the latency and server load of polling are unacceptable for monitoring that claims to be "real-time". What we needed was an end-to-end, low-latency streaming pipeline from event production to result display.

The heart of the challenge is gluing together three very different ecosystems, the JVM's message queue, Python's data-processing stack, and the BEAM's real-time communication, into a reliable, maintainable, high-performance "inference bus".

Technology Decisions: How STOMP and Channels Meet in the Middle

In real projects, the choice of protocol often matters more than the choice of framework.

  1. Why drop JMS in favor of STOMP?
    ActiveMQ is a standard implementation of JMS (Java Message Service), but JMS clients are typically heavyweight and poorly supported outside Java. We were not about to pull JPype or a similar cross-language bridge into the Python service just to consume a message; the maintenance burden and instability would be enormous.
    STOMP (Simple Text Oriented Messaging Protocol) is a lightweight, text-based protocol designed specifically to simplify interoperability with message brokers. ActiveMQ supports STOMP natively, Python has the mature stomp.py library, and the Elixir community has clients as well. Choosing STOMP means our Python service talks to ActiveMQ as a first-class citizen, with zero Java dependencies. (A sample of the raw wire format follows this list.)

  2. Why Phoenix Channels rather than a plain WebSocket?
    Phoenix Channels is much more than a thin wrapper over WebSocket. It ships with a complete publish/subscribe model, topic isolation, authorization, heartbeats, and automatic reconnection. That lets us create distinct topics for distinct data streams, for example inference:results for normal results and inference:alerts for anomaly alerts. The Python service connects to Phoenix as a client and pushes processed results to a given topic; the Phoenix server then broadcasts those messages to every web client subscribed to that topic. This model fully decouples the Python service from the front end's connection management. (The JSON frame format our pusher emits is shown below as well.)
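To make "text-oriented" concrete, here is roughly what the two frames our consumer sends at startup look like on the wire. Headers are plain key:value lines, and each frame ends with a NUL byte, shown here as ^@:

CONNECT
accept-version:1.2
host:localhost
login:admin
passcode:admin
heart-beat:10000,10000

^@
SUBSCRIBE
id:1
destination:/queue/ai.events.raw
ack:client-individual

^@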
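On the Phoenix side, under the v1 serializer (the wire format the Python pusher in this post speaks), every Channels frame is a flat JSON map, easy to produce from any language. The join frame our service sends looks like this:

{"topic": "service:inference", "event": "phx_join", "payload": {"token": "supersecret"}, "ref": "0"}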

The final architecture, as a Mermaid diagram:

graph TD
    A[Java Legacy System] -- JMS --> B((ActiveMQ));
    B -- STOMP --> C{Python Inference Service};
    C -- AI Model --> C;
    C -- WebSocket --> D{Phoenix Server};
    D -- Phoenix Channels --> E[Web Dashboard UI];

    subgraph "Python Consumer"
        C
    end

    subgraph "Elixir/Phoenix Backend"
        D
    end

    subgraph "Browser"
        E
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#9cf,stroke:#333,stroke-width:2px

Step 1: Configure ActiveMQ to Expose a STOMP Connector

In production, any change to shared middleware deserves caution. We need to add STOMP support without disturbing the existing JMS clients, which is done by editing ActiveMQ's main configuration file, conf/activemq.xml.

Find the <transportConnectors> section and add a new <transportConnector> definition:

<!-- conf/activemq.xml -->
<transportConnectors>
    <!-- Keep existing connectors for JMS, etc. -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

    <!-- 
      This is our new STOMP connector.
      - uri: Binds to all network interfaces on port 61613.
      - wireFormat.maxFrameSize: Increase default frame size for potentially large payloads.
      - transport.hb_grace_period_multiplier: A crucial setting for network stability.
        It allows the heartbeat grace period to be more lenient, preventing premature disconnection
        on networks with minor latency spikes. A value of 1.5 is a safe starting point.
    -->
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?transport.defaultHeartBeat=5000,5000&amp;transport.hb_grace_period_multiplier=1.5"/>
</transportConnectors>

The trap here is the heartbeat setting transport.defaultHeartBeat=5000,5000. The first 5000 means the broker expects the client to send a heartbeat every 5 seconds; the second means the broker will send the client one every 5 seconds. This matters for detecting dead connections promptly, but on an unstable network either side may wrongly decide the other is gone just because a heartbeat arrived late. hb_grace_period_multiplier=1.5 adds a buffer: the broker tolerates up to 1.5 times the heartbeat interval (here 5000 ms × 1.5 = 7500 ms of silence) before declaring the connection dead, which makes the link considerably more resilient.

Step 2: Build a Resilient Python Inference Consumer

This is the heart of the pipeline. It must not only process messages correctly, it must stay healthy through ActiveMQ restarts, network jitter, model inference failures, and every other mishap.

We will build a service called inference_consumer.py.

Project structure:

/inference_service
|-- inference_consumer.py
|-- model.py
|-- config.py
|-- requirements.txt

requirements.txt:

stomp.py==8.1.0
websockets==12.0

config.py - centralized configuration

# config.py
import os

# ActiveMQ STOMP configuration
ACTIVEMQ_HOSTS = [('localhost', 61613)]
ACTIVEMQ_USER = os.getenv('ACTIVEMQ_USER', 'admin')
ACTIVEMQ_PASSWORD = os.getenv('ACTIVEMQ_PASSWORD', 'admin')
# The queue our service will listen to
SOURCE_QUEUE = '/queue/ai.events.raw'
# Dead Letter Queue for failed messages
DLQ_QUEUE = '/queue/ai.events.dlq'
# Heartbeat settings in milliseconds (client-side)
HEARTBEATS = (10000, 10000)

# Phoenix Channels WebSocket configuration
PHOENIX_WS_URL = 'ws://localhost:4000/socket/websocket'
# A token for the Python service to authenticate with Phoenix
# In a real system, this should be a securely managed JWT or API key.
PHOENIX_AUTH_TOKEN = os.getenv('PHOENIX_SERVICE_TOKEN', 'supersecret')
# The topic to push results to
PHOENIX_TOPIC = 'service:inference'

model.py - a mock AI model

# model.py
import random
import time
import json
import logging

class InferenceError(Exception):
    """Custom exception for model inference failures."""
    pass

def run_inference(payload: dict) -> dict:
    """
    Simulates a time-consuming and potentially failing AI model inference.
    In a real application, this would call a scikit-learn, tensorflow, or pytorch model.
    """
    request_id = payload.get('request_id', 'unknown')
    logging.info(f"[{request_id}] Starting inference...")

    # Simulate processing time
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)

    # Simulate potential failures
    if random.random() < 0.05: # 5% failure rate
        logging.error(f"[{request_id}] Simulated model failure.")
        raise InferenceError("Model prediction failed due to internal error.")

    # Simulate successful inference result
    result = {
        'request_id': request_id,
        'original_payload': payload,
        'prediction': {
            'class': random.choice(['A', 'B', 'C']),
            'confidence': round(random.uniform(0.7, 0.99), 4)
        },
        'processing_ms': int(processing_time * 1000),
        'status': 'SUCCESS'
    }
    logging.info(f"[{request_id}] Inference successful.")
    return result

inference_consumer.py - the core logic

This is the critical piece: connection management, message processing, error handling, and the bridge to Phoenix.

# inference_consumer.py
import stomp
import json
import logging
import asyncio
import websockets
import uuid
from threading import Lock

import config
from model import run_inference, InferenceError

# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class PhoenixPusher:
    """Handles WebSocket connection and message pushing to Phoenix Channels."""
    def __init__(self, uri, topic, auth_token):
        self._uri = uri
        self._topic = topic
        self._auth_params = {"token": auth_token}
        self._websocket = None
        self._ref = 0
        self._lock = asyncio.Lock()
        self._connected = asyncio.Event()

    async def connect(self):
        while True:
            try:
                logging.info("Connecting to Phoenix WebSocket...")
                # The 'vsn=2.0.0' parameter is required by Phoenix
                full_uri = f"{self._uri}?vsn=2.0.0"
                self._websocket = await websockets.connect(full_uri)
                self._connected.set()
                logging.info("Successfully connected to Phoenix WebSocket.")
                # Start background tasks: one to handle incoming frames,
                # one to send the heartbeat Phoenix expects from clients.
                asyncio.create_task(self._listen())
                asyncio.create_task(self._heartbeat())
                await self._join_topic()
                break
            except (websockets.exceptions.ConnectionClosedError, OSError) as e:
                logging.error(f"Phoenix connection failed: {e}. Retrying in 5 seconds...")
                self._connected.clear()
                await asyncio.sleep(5)
    
    async def _heartbeat(self):
        # Phoenix drops sockets that stay silent (60s timeout by default),
        # so send the reserved "phoenix" topic heartbeat well within that.
        while True:
            await asyncio.sleep(30)
            if not self._connected.is_set():
                break
            try:
                await self._websocket.send(json.dumps(
                    {"topic": "phoenix", "event": "heartbeat", "payload": {}, "ref": "hb"}))
            except websockets.exceptions.ConnectionClosed:
                break

    async def _listen(self):
        try:
            async for message in self._websocket:
                # Watch for the reply that acknowledges our join request
                msg_data = json.loads(message)
                if msg_data.get("event") == "phx_reply" and msg_data.get("payload", {}).get("status") == "ok":
                    if msg_data.get("ref") == str(self._ref - 1):  # matches our join ref
                        logging.info(f"Successfully joined Phoenix topic: {self._topic}")
                # Other incoming messages can be handled here
        except websockets.exceptions.ConnectionClosed:
            logging.warning("Phoenix WebSocket connection closed. Triggering reconnect.")
            self._connected.clear()
            asyncio.create_task(self.connect())


    async def _join_topic(self):
        async with self._lock:
            join_msg = {
                "topic": self._topic,
                "event": "phx_join",
                "payload": self._auth_params,
                "ref": str(self._ref)
            }
            await self._websocket.send(json.dumps(join_msg))
            self._ref += 1

    async def push(self, event: str, payload: dict):
        if not self._connected.is_set():
            logging.warning("Cannot push message, Phoenix not connected. Waiting...")
            await self._connected.wait() # Wait until connection is re-established

        async with self._lock:
            ref = str(uuid.uuid4())
            push_msg = {
                "topic": self._topic,
                "event": event,
                "payload": payload,
                "ref": ref
            }
            try:
                await self._websocket.send(json.dumps(push_msg))
                logging.info(f"Pushed event '{event}' to Phoenix topic '{self._topic}'.")
            except websockets.exceptions.ConnectionClosed:
                 logging.error("Failed to push to Phoenix, connection is closed.")
                 # The listener will handle reconnection
                 self._connected.clear()


class InferenceListener(stomp.ConnectionListener):
    """STOMP Listener for processing messages from ActiveMQ."""
    def __init__(self, conn, pusher):
        self.conn = conn
        self.pusher = pusher
        # Constructed inside main(), so the running loop is captured here.
        self.loop = asyncio.get_running_loop()
        self.ack_lock = Lock() # stomp.py is multi-threaded, lock acknowledgements

    def on_error(self, frame):
        logging.error(f'Received an error frame: {frame.body}')

    def on_disconnected(self):
        logging.warning("Disconnected from ActiveMQ. Reconnection logic in main loop will handle this.")

    def on_message(self, frame):
        message_id = frame.headers.get('message-id')
        subscription = frame.headers.get('subscription')
        
        try:
            data = json.loads(frame.body)
            logging.info(f"Received message {message_id} with payload: {data}")

            # Run inference
            result = run_inference(data)
            
            # Push to Phoenix via the asyncio event loop
            asyncio.run_coroutine_threadsafe(
                self.pusher.push("new_result", result), self.loop
            ).result() # Wait for the push to complete

        except json.JSONDecodeError:
            logging.error(f"Failed to decode JSON for message {message_id}. Body: {frame.body}")
            # This is a poison pill. Send to DLQ.
            # Here we should also implement logic to send the raw message body to DLQ
        except InferenceError as e:
            logging.error(f"Inference failed for message {message_id}: {e}")
            # Create a failure report and push to Phoenix for alerting
            failure_payload = {
                "request_id": data.get("request_id", "unknown"),
                "error": str(e),
                "original_payload": data,
                "status": "FAILED"
            }
            asyncio.run_coroutine_threadsafe(
                self.pusher.push("inference_failed", failure_payload), self.loop
            ).result()
        except Exception as e:
            logging.critical(f"Unhandled exception processing message {message_id}: {e}", exc_info=True)
            # For unknown errors, we also don't want to lose the message.
            # A robust system would put it into a DLQ. For now, we log it critically.
        finally:
            # Acknowledge the message so ActiveMQ removes it from the queue.
            # This is CRITICAL. `ack='client-individual'` means we control when a message is considered "done".
            # If the script crashes before this line, the message will be redelivered.
            with self.ack_lock:
                self.conn.ack(message_id, subscription)
            logging.info(f"Acknowledged message {message_id}.")


async def main():
    """Main function to setup connections and run indefinitely."""
    pusher = PhoenixPusher(config.PHOENIX_WS_URL, config.PHOENIX_TOPIC, config.PHOENIX_AUTH_TOKEN)
    # Run Phoenix connection in the background
    asyncio.create_task(pusher.connect())

    conn = stomp.Connection(host_and_ports=config.ACTIVEMQ_HOSTS, heartbeats=config.HEARTBEATS)
    conn.set_listener('', InferenceListener(conn, pusher))
    
    while True:
        try:
            if not conn.is_connected():
                logging.info("Connecting to ActiveMQ...")
                # conn.connect(wait=True) blocks, so run it in a worker thread
                # to keep the event loop (and the Phoenix pusher) responsive.
                await asyncio.to_thread(
                    conn.connect, config.ACTIVEMQ_USER, config.ACTIVEMQ_PASSWORD, wait=True)
                # `ack='client-individual'` is the key to manual acknowledgement.
                conn.subscribe(destination=config.SOURCE_QUEUE, id=1, ack='client-individual')
                logging.info(f"Connected to ActiveMQ and subscribed to {config.SOURCE_QUEUE}")
            # Never block here with time.sleep(): that would stall the event
            # loop and deadlock every run_coroutine_threadsafe(...).result().
            await asyncio.sleep(1)
        except stomp.exception.ConnectFailedException:
            logging.error("Failed to connect to ActiveMQ. Retrying in 10 seconds...")
            await asyncio.sleep(10)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutting down consumer.")

The design of this Python code centers on resilience:

  • PhoenixPusher owns the WebSocket connection to Phoenix, including full auto-reconnect logic. It uses an asyncio.Event to track connection state, so that while the link is down, pushes wait for the reconnect instead of failing.
  • InferenceListener is the stomp.py callback class. The heart of it is on_message:
    • Exhaustive exception handling: JSON decoding failures, model inference failures, and unknown exceptions are each handled separately.
    • client-individual acknowledgement: conn.ack() is called only once a message has been fully processed, whether it succeeded or failed. If the script crashes mid-processing, ActiveMQ redelivers the message to another consumer instance, so no message is lost.
    • Async integration: stomp.py is thread-based, while websockets is asyncio-based. asyncio.run_coroutine_threadsafe hands the push coroutine safely from the stomp.py worker thread to the asyncio event loop; the standalone sketch after this list shows the pattern in isolation.
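
Because that thread-to-loop handoff is the least obvious part, here is the pattern stripped of STOMP and WebSockets entirely, a self-contained sketch with print standing in for the real push:

# bridge_sketch.py - the thread -> asyncio handoff pattern in isolation
import asyncio
import threading

async def push(msg):
    await asyncio.sleep(0.1)  # stands in for the websocket send
    print(f"pushed: {msg}")

async def main():
    loop = asyncio.get_running_loop()

    def worker():
        # Runs on a plain thread, like a stomp.py listener callback: submit
        # the coroutine to the loop, then block this thread (and only this
        # thread) until the push completes.
        future = asyncio.run_coroutine_threadsafe(push("hello"), loop)
        future.result()

    threading.Thread(target=worker).start()
    await asyncio.sleep(1)  # keep the loop alive long enough for the demo

asyncio.run(main())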

Step 3: Set Up the Phoenix Server to Receive Data

The Phoenix side is comparatively simple: define a Channel that handles the Python service's connection and broadcasts its messages.

1. Generate the Channel

$ mix phx.gen.channel Inference

2. Edit lib/your_app_web/channels/user_socket.ex
We route a dedicated service:* namespace on the socket and add authentication logic so that only our Python service is allowed in.

# lib/your_app_web/channels/user_socket.ex
defmodule YourAppWeb.UserSocket do
  use Phoenix.Socket

  # Channel for regular browser clients
  channel "room:*", YourAppWeb.RoomChannel

  # Channel for our internal Python service
  channel "service:*", YourAppWeb.InferenceChannel

  # Read-only topics the browser dashboard subscribes to. Without this
  # route, joins to "public:results" / "public:alerts" would be rejected.
  channel "public:*", YourAppWeb.InferenceChannel

  @impl true
  def connect(%{"token" => token}, socket) do
    # This is a simplified auth mechanism. In production, use a proper JWT library.
    case verify_token(token) do
      {:ok, service_id} ->
        {:ok, assign(socket, :service_id, service_id)}
      :error ->
        :error
    end
  end

  # Fallback for connections without a token (e.g., browser clients)
  def connect(_params, socket) do
    # For this example, we allow anonymous browser connections.
    # In a real app, you would have user authentication here.
    {:ok, assign(socket, :user_id, "guest-#{:rand.uniform(1000)}")}
  end

  @impl true
  # Access syntax, since only one of :service_id / :user_id is ever assigned.
  def id(socket), do: "users_socket:#{socket.assigns[:service_id] || socket.assigns[:user_id]}"

  # In a real system, this would involve cryptographic verification.
  defp verify_token(token) do
    # Compare with the secret token from config
    service_token = Application.get_env(:your_app, :service_auth_token)
    if token == service_token do
      {:ok, "python_inference_service"}
    else
      :error
    end
  end
end

Don't forget to add service_auth_token to config/config.exs. A minimal entry, assuming the app is named :your_app and reusing the same environment variable as the Python side, looks like this:
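
# config/config.exs
config :your_app,
  service_auth_token: System.get_env("PHOENIX_SERVICE_TOKEN", "supersecret")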

3. Implement lib/your_app_web/channels/inference_channel.ex

# lib/your_app_web/channels/inference_channel.ex
defmodule YourAppWeb.InferenceChannel do
  use YourAppWeb, :channel
  alias YourAppWeb.Endpoint

  @impl true
  def join("service:inference", %{"token" => _token}, socket) do
    # The authorization already happened in UserSocket.connect/2.
    # Only our verified service may join this private topic.
    if socket.assigns[:service_id] do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # Browser dashboards join the public topics read-only; results arrive
  # through Endpoint.broadcast/3 below, so there is nothing to do on join.
  def join("public:" <> _subtopic, _payload, socket) do
    {:ok, socket}
  end

  # Handles incoming messages FROM the Python service. Matching on the
  # socket topic ensures clients joined via "public:*" cannot inject results.
  @impl true
  def handle_in("new_result", payload, %{topic: "service:inference"} = socket) do
    # Broadcast the result to all subscribed browser clients on a different topic.
    # This separation keeps browser clients off the service topic entirely.
    Endpoint.broadcast("public:results", "new_result", payload)
    {:noreply, socket}
  end

  @impl true
  def handle_in("inference_failed", payload, %{topic: "service:inference"} = socket) do
    # Broadcast failures to a dedicated alert topic
    Endpoint.broadcast("public:alerts", "inference_failed", payload)
    {:noreply, socket}
  end

  # Catch-all for any other events
  def handle_in(_event, _payload, socket) do
    {:noreply, socket}
  end
end

A common mistake is to have web clients and the backend service share one topic. The best practice here is isolation:

  • The Python service joins and pushes to the private topic service:inference.
  • On receipt, InferenceChannel rebroadcasts to public topics such as public:results and public:alerts.
  • Web clients may only subscribe to those public topics (hence the public:* route added to UserSocket).

4. Front-end JavaScript
In assets/js/socket.js, add the client logic that connects and listens on the public topics.

// assets/js/socket.js
import {Socket} from "phoenix"

let socket = new Socket("/socket", {params: {}}) // No token for guest browser
socket.connect()

// Channel for successful results
let resultsChannel = socket.channel("public:results", {})
resultsChannel.on("new_result", payload => {
  console.log("Received new inference result:", payload);
  // Code to update the DOM, e.g., add a new row to a table
  const resultsTable = document.getElementById("results-table-body");
  if (resultsTable) {
    const row = resultsTable.insertRow(0); // Insert at the top
    row.insertCell(0).innerText = payload.request_id;
    row.insertCell(1).innerText = payload.prediction.class;
    row.insertCell(2).innerText = payload.prediction.confidence;
    row.insertCell(3).innerText = `${payload.processing_ms} ms`;
    row.classList.add('new-row-animation');
  }
})

resultsChannel.join()
  .receive("ok", resp => { console.log("Joined public:results successfully", resp) })
  .receive("error", resp => { console.log("Unable to join public:results", resp) })


// Channel for alerts
let alertsChannel = socket.channel("public:alerts", {})
alertsChannel.on("inference_failed", payload => {
    console.error("Received inference failure alert:", payload);
    // Code to show a toast notification or update an alert panel
    const alertPanel = document.getElementById("alerts-panel");
    if(alertPanel){
        const alertDiv = document.createElement("div");
        alertDiv.className = "alert alert-danger";
        alertDiv.innerText = `[${payload.request_id}] FAILED: ${payload.error}`;
        alertPanel.prepend(alertDiv);
    }
})

alertsChannel.join()
  .receive("ok", resp => { console.log("Joined public:alerts successfully", resp) })
  .receive("error", resp => { console.log("Unable to join public:alerts", resp) })


export default socket

With that, the end-to-end pipeline is open. An event born in the Java system flows through ActiveMQ, is consumed and scored by the Python service, and its result appears on the web dashboard within milliseconds.

Limitations and Paths for Future Optimization

The architecture solves the core problem, but several areas still need hardening for production.

  1. Missing backpressure:
    The data flow today is a pure push model. If the upstream Java system bursts and the Python consumer falls behind, messages pile up in ActiveMQ, which is a queue doing exactly its job. The real risk is further downstream: if Python pushes to Phoenix faster than Phoenix can broadcast or the front end can render, memory pressure builds on the Phoenix server. A more complete design would add a simple backpressure signal between Phoenix and Python, for example having Phoenix tell the service over the Channel to pause pulling from ActiveMQ when load runs high; a sketch follows after this list.

  2. Horizontal scaling of the Python consumer:
    The Python service is currently a single instance. To raise throughput we can lean on the queue's competing-consumers pattern: simply start several consumer instances, and they will pull from the same queue, load-balancing automatically. This does require the AI model to be stateless, or to keep its state in a distributed store.

  3. Data contracts and serialization:
    Every hop currently exchanges JSON, which is flexible but offers no schema validation. In a large system, a single renamed field or changed type can break the entire pipeline. Introducing a binary serialization format such as Protobuf or Avro would bring strong typing, faster (de)serialization, and smaller payloads on the wire.

  4. Observability:
    We have logs, but no metrics and no tracing. The Python service should embed a Prometheus client and expose message throughput, the distribution of inference latency, and error rates; distributed tracing via OpenTelemetry would then let us follow a single event from ActiveMQ all the way to the dashboard and measure its end-to-end latency, which is invaluable when hunting bottlenecks. The metrics half is sketched after this list.
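
On the backpressure point, here is a minimal sketch of one possible pause/resume hook. It assumes hypothetical "pause" / "resume" events pushed by Phoenix over the service channel; nothing in the code above implements them yet:

# backpressure_sketch.py - hypothetical pause/resume hook, not wired up above
import stomp

import config

def on_phoenix_signal(event: str, conn: stomp.Connection):
    """React to a load signal received over the Phoenix channel."""
    if event == "pause":
        # Dropping the subscription halts deliveries; messages accumulate
        # safely in ActiveMQ until we resume, which is what queues are for.
        conn.unsubscribe(id=1)
    elif event == "resume":
        conn.subscribe(destination=config.SOURCE_QUEUE, id=1,
                       ack='client-individual')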
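
And for observability, a minimal sketch of the metrics side using the prometheus_client package (an extra dependency not in requirements.txt above; the metric names are illustrative):

# metrics_sketch.py - minimal Prometheus instrumentation sketch
from prometheus_client import Counter, Histogram, start_http_server

MESSAGES = Counter('inference_messages_total',
                   'Messages consumed from ActiveMQ', ['status'])
LATENCY = Histogram('inference_duration_seconds',
                    'Model inference latency in seconds')

# Expose http://localhost:8000/metrics for Prometheus to scrape.
start_http_server(8000)

# Inside InferenceListener.on_message, the model call would be wrapped like:
#
#   with LATENCY.time():
#       result = run_inference(data)
#   MESSAGES.labels(status='success').inc()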

