At the start of the project we faced a classic heterogeneous-system integration problem. The core business runs on a stable Java system that has, for years, published inter-system event notifications through ActiveMQ. Now the data science team needed to consume those events and run real-time inference with their latest Python models, while the operations team wanted a modern web dashboard that shows the inference result for every event in real time and raises alerts on anomalies. The technology split was clear-cut: Java keeps producing, Python owns the algorithms, and the dashboard team picked Elixir's Phoenix framework for its outstanding concurrency and real-time communication.

The first idea was to have a Python service consume from ActiveMQ and write results through a REST API into the Phoenix backend's database, with the frontend polling for updates. That design was rejected quickly: the latency and server load of polling are unacceptable for "real-time" monitoring. What we needed was an end-to-end, low-latency streaming pipeline from event production to result display.

The heart of the challenge is gluing together three very different ecosystems (the JVM's message queue, Python's data-processing stack, and the BEAM VM's real-time communication) into a reliable, maintainable, high-performance "inference bus".
Technology Choices: STOMP and Phoenix Channels Meet in the Middle
In real projects, the choice of protocol often matters more than the choice of framework.
Why drop JMS in favor of STOMP?
ActiveMQ is a standard implementation of JMS (Java Message Service), but JMS clients are typically heavyweight and poorly supported outside the Java world. We were not going to pull a complex cross-language bridge like JPype into the Python service just to consume a message; the maintenance cost and instability would be enormous.
STOMP (Simple Text Oriented Messaging Protocol) is a lightweight, text-based protocol designed to simplify interoperability with message brokers. ActiveMQ supports STOMP natively, Python has the mature `stomp.py` library, and the Elixir community has clients as well. Choosing STOMP means our Python service talks to ActiveMQ as a first-class citizen, with no Java dependencies at all.
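To make "text-based" concrete, this is roughly what a STOMP 1.2 SEND frame looks like on the wire (the destination and body here are illustrative): a command line, headers, a blank line, the body, and a NUL terminator. Anything that can write these bytes over TCP can talk to ActiveMQ.

```python
# A STOMP SEND frame, spelled out as a Python string. The protocol is just
# text: command, headers, blank line, body, then a NUL byte to end the frame.
frame = (
    "SEND\n"
    "destination:/queue/ai.events.raw\n"
    "content-type:application/json\n"
    "\n"
    '{"request_id":"abc-123"}'
    "\x00"
)
print(frame)
```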
Why Phoenix Channels rather than plain WebSockets?

Phoenix Channels is far more than a thin wrapper around WebSocket. It provides a complete publish/subscribe model with topic isolation, authorization, heartbeat detection, and automatic reconnection. That lets us create separate topics for separate data streams, for example `inference:results` for normal results and `inference:alerts` for anomaly alerts. The Python service connects to Phoenix as a client and pushes processing results to a given topic; the Phoenix server then broadcasts those messages to every web client subscribed to that topic. This pattern fully decouples the Python service from the complex business of managing frontend connections.
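For orientation, here is a sketch of the JSON frames a non-browser client exchanges with Phoenix Channels over its WebSocket transport when using the V1 serializer (the format the consumer built below speaks); the topic names and payloads are illustrative:

```python
# Phoenix Channels V1 wire frames are plain JSON maps. A client joins a topic
# with "phx_join", then sends arbitrary events on it; heartbeats go to the
# reserved "phoenix" topic. (The V2 serializer uses positional arrays instead.)
import json

join_frame = {"topic": "inference:results", "event": "phx_join",
              "payload": {"token": "..."}, "ref": "1"}
push_frame = {"topic": "inference:results", "event": "new_result",
              "payload": {"request_id": "abc"}, "ref": "2"}
heartbeat_frame = {"topic": "phoenix", "event": "heartbeat",
                   "payload": {}, "ref": "3"}

for frame in (join_frame, push_frame, heartbeat_frame):
    print(json.dumps(frame))  # exactly what goes over the WebSocket
```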
The final architecture looks like this:
```mermaid
graph TD
    A[Java Legacy System] -- JMS --> B((ActiveMQ));
    B -- STOMP --> C{Python Inference Service};
    C -- AI Model --> C;
    C -- WebSocket --> D{Phoenix Server};
    D -- Phoenix Channels --> E[Web Dashboard UI];
    subgraph "Python Consumer"
        C
    end
    subgraph "Elixir/Phoenix Backend"
        D
    end
    subgraph "Browser"
        E
    end
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#9cf,stroke:#333,stroke-width:2px
```
Step 1: Expose a STOMP Connector on ActiveMQ
In production, any change to middleware must be made carefully. We need to add STOMP support without affecting the existing JMS clients, which is done by editing ActiveMQ's core configuration file, `conf/activemq.xml`. Locate the `<transportConnectors>` section and add a new `<transportConnector>` definition:
```xml
<!-- conf/activemq.xml -->
<transportConnectors>
    <!-- Keep existing connectors for JMS, etc. -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <!--
      This is our new STOMP connector.
      - uri: Binds to all network interfaces on port 61613.
      - transport.defaultHeartBeat: the default heart-beat interval pair offered to STOMP clients.
      - transport.hb_grace_period_multiplier: A crucial setting for network stability.
        It makes the heartbeat grace period more lenient, preventing premature disconnection
        on networks with minor latency spikes. A value of 1.5 is a safe starting point.
      Note: ampersands in connector URIs must be XML-escaped as &amp;amp;.
    -->
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?transport.defaultHeartBeat=5000,5000&amp;transport.hb_grace_period_multiplier=1.5"/>
</transportConnectors>
```
The pitfall here is the heartbeat setting `transport.defaultHeartBeat=5000,5000`. The first `5000` means the broker expects a heartbeat from the client every 5 seconds; the second means the broker will send the client a heartbeat every 5 seconds. This is essential for detecting dead connections promptly, but on an unstable network either side can miss a beat and wrongly conclude the other has gone away. `hb_grace_period_multiplier=1.5` provides a buffer, tolerating delays of up to 1.5 heartbeat intervals and making connections more resilient.
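To see how the two sides' numbers combine, here is a small worked sketch of STOMP 1.2 heart-beat negotiation as the spec defines it: each side advertises a (can-send, want-to-receive) pair, and each direction settles on the larger of the sender's can-send value and the receiver's want-to-receive value. The Python client configured in Step 2 will ask for (10000, 10000).

```python
# A minimal sketch of STOMP 1.2 heart-beat negotiation (per the STOMP spec).
def negotiated_heartbeats(client_hb, server_hb):
    cx, cy = client_hb  # client: can send every cx ms, wants to receive every cy ms
    sx, sy = server_hb  # server: can send every sx ms, wants to receive every sy ms
    client_to_server = 0 if cx == 0 or sy == 0 else max(cx, sy)
    server_to_client = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return client_to_server, server_to_client

# Our broker advertises 5000,5000; the Python client asks for 10000,10000,
# so both directions settle on a 10-second beat, and the broker's
# hb_grace_period_multiplier=1.5 tolerates roughly 15s of silence.
print(negotiated_heartbeats((10000, 10000), (5000, 5000)))  # (10000, 10000)
```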
Step 2: Build a Resilient Python Inference Consumer
This is the heart of the entire pipeline. It must not only process messages correctly, but also stay robust through ActiveMQ restarts, network jitter, model inference failures, and every other kind of trouble. We will build it as a service named `inference_consumer.py`.
Project layout:

```text
/inference_service
|-- inference_consumer.py
|-- model.py
|-- config.py
|-- requirements.txt
```
`requirements.txt`:

```text
stomp.py==8.1.0
websockets==12.0
```
`config.py` - centralized configuration:

```python
# config.py
import os

# ActiveMQ STOMP configuration
ACTIVEMQ_HOSTS = [('localhost', 61613)]
ACTIVEMQ_USER = os.getenv('ACTIVEMQ_USER', 'admin')
ACTIVEMQ_PASSWORD = os.getenv('ACTIVEMQ_PASSWORD', 'admin')

# The queue our service will listen to
SOURCE_QUEUE = '/queue/ai.events.raw'
# Dead Letter Queue for failed messages
DLQ_QUEUE = '/queue/ai.events.dlq'

# Heartbeat settings in milliseconds (client-side)
HEARTBEATS = (10000, 10000)

# Phoenix Channels WebSocket configuration
PHOENIX_WS_URL = 'ws://localhost:4000/socket/websocket'
# A token for the Python service to authenticate with Phoenix.
# In a real system, this should be a securely managed JWT or API key.
PHOENIX_AUTH_TOKEN = os.getenv('PHOENIX_SERVICE_TOKEN', 'supersecret')
# The topic to push results to
PHOENIX_TOPIC = 'service:inference'
```
`model.py` - a mock AI model:

```python
# model.py
import logging
import random
import time


class InferenceError(Exception):
    """Custom exception for model inference failures."""
    pass


def run_inference(payload: dict) -> dict:
    """
    Simulates a time-consuming and potentially failing AI model inference.
    In a real application, this would call a scikit-learn, tensorflow, or pytorch model.
    """
    request_id = payload.get('request_id', 'unknown')
    logging.info(f"[{request_id}] Starting inference...")

    # Simulate processing time
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)

    # Simulate potential failures
    if random.random() < 0.05:  # 5% failure rate
        logging.error(f"[{request_id}] Simulated model failure.")
        raise InferenceError("Model prediction failed due to internal error.")

    # Simulate a successful inference result
    result = {
        'request_id': request_id,
        'original_payload': payload,
        'prediction': {
            'class': random.choice(['A', 'B', 'C']),
            'confidence': round(random.uniform(0.7, 0.99), 4)
        },
        'processing_ms': int(processing_time * 1000),
        'status': 'SUCCESS'
    }
    logging.info(f"[{request_id}] Inference successful.")
    return result
```
`inference_consumer.py` - the core logic:

This is the most critical piece, covering connection management, message processing, error handling, and communication with Phoenix.
```python
# inference_consumer.py
import asyncio
import json
import logging
import uuid
from threading import Lock

import stomp
import websockets

import config
from model import run_inference, InferenceError

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class PhoenixPusher:
    """Handles the WebSocket connection and message pushing to Phoenix Channels."""

    def __init__(self, uri, topic, auth_token):
        self._uri = uri
        self._topic = topic
        self._auth_params = {"token": auth_token}
        self._websocket = None
        self._ref = 0
        self._join_ref = None
        self._lock = asyncio.Lock()
        self._connected = asyncio.Event()

    async def connect(self):
        while True:
            try:
                logging.info("Connecting to Phoenix WebSocket...")
                # vsn=1.0.0 selects the V1 serializer, whose frames are the
                # plain JSON maps used below (the V2 serializer expects
                # positional arrays instead).
                full_uri = f"{self._uri}?vsn=1.0.0"
                self._websocket = await websockets.connect(full_uri)
                self._connected.set()
                logging.info("Successfully connected to Phoenix WebSocket.")
                # Background tasks: handle incoming frames, and keep the
                # transport alive (Phoenix drops silent connections).
                asyncio.create_task(self._listen())
                asyncio.create_task(self._heartbeat_loop())
                await self._join_topic()
                break
            except (OSError, websockets.exceptions.WebSocketException) as e:
                logging.error(f"Phoenix connection failed: {e}. Retrying in 5 seconds...")
                self._connected.clear()
                await asyncio.sleep(5)

    async def _listen(self):
        try:
            async for message in self._websocket:
                msg_data = json.loads(message)
                # Confirm our join request; other incoming messages
                # (e.g. heartbeat replies) could be handled here too.
                if (msg_data.get("event") == "phx_reply"
                        and msg_data.get("ref") == self._join_ref
                        and msg_data.get("payload", {}).get("status") == "ok"):
                    logging.info(f"Successfully joined Phoenix topic: {self._topic}")
        except websockets.exceptions.ConnectionClosed:
            logging.warning("Phoenix WebSocket connection closed. Triggering reconnect.")
            self._connected.clear()
            asyncio.create_task(self.connect())

    async def _heartbeat_loop(self):
        # Phoenix expects periodic heartbeats on the reserved "phoenix" topic;
        # without them the server closes the transport (60s timeout by default).
        try:
            while True:
                await asyncio.sleep(30)
                async with self._lock:
                    self._ref += 1
                    await self._websocket.send(json.dumps({
                        "topic": "phoenix", "event": "heartbeat",
                        "payload": {}, "ref": str(self._ref)
                    }))
        except websockets.exceptions.ConnectionClosed:
            pass  # _listen handles the reconnect

    async def _join_topic(self):
        async with self._lock:
            self._ref += 1
            self._join_ref = str(self._ref)
            join_msg = {
                "topic": self._topic,
                "event": "phx_join",
                "payload": self._auth_params,
                "ref": self._join_ref
            }
            await self._websocket.send(json.dumps(join_msg))

    async def push(self, event: str, payload: dict):
        if not self._connected.is_set():
            logging.warning("Cannot push message, Phoenix not connected. Waiting...")
            await self._connected.wait()  # Wait until the connection is re-established
        async with self._lock:
            push_msg = {
                "topic": self._topic,
                "event": event,
                "payload": payload,
                "ref": str(uuid.uuid4())
            }
            try:
                await self._websocket.send(json.dumps(push_msg))
                logging.info(f"Pushed event '{event}' to Phoenix topic '{self._topic}'.")
            except websockets.exceptions.ConnectionClosed:
                logging.error("Failed to push to Phoenix, connection is closed.")
                # The listener task will handle reconnection
                self._connected.clear()


class InferenceListener(stomp.ConnectionListener):
    """STOMP listener that processes messages from ActiveMQ."""

    def __init__(self, conn, pusher, loop):
        self.conn = conn
        self.pusher = pusher
        self.loop = loop  # the asyncio loop running in the main thread
        self.ack_lock = Lock()  # stomp.py is multi-threaded; serialize acks

    def on_error(self, frame):
        logging.error(f'Received an error frame: {frame.body}')

    def on_disconnected(self):
        logging.warning("Disconnected from ActiveMQ. The main loop will reconnect.")

    def on_message(self, frame):
        message_id = frame.headers.get('message-id')
        subscription = frame.headers.get('subscription')
        try:
            data = json.loads(frame.body)
            logging.info(f"Received message {message_id} with payload: {data}")

            # Run inference
            result = run_inference(data)

            # Hand the push over to the asyncio event loop and wait for it
            asyncio.run_coroutine_threadsafe(
                self.pusher.push("new_result", result), self.loop
            ).result()
        except json.JSONDecodeError:
            logging.error(f"Failed to decode JSON for message {message_id}. Body: {frame.body}")
            # Poison pill: park the raw body on the DLQ so it is not lost.
            self.conn.send(destination=config.DLQ_QUEUE, body=frame.body,
                           headers={'failure-reason': 'json-decode-error'})
        except InferenceError as e:
            logging.error(f"Inference failed for message {message_id}: {e}")
            # Build a failure report and push it to Phoenix for alerting
            failure_payload = {
                "request_id": data.get("request_id", "unknown"),
                "error": str(e),
                "original_payload": data,
                "status": "FAILED"
            }
            asyncio.run_coroutine_threadsafe(
                self.pusher.push("inference_failed", failure_payload), self.loop
            ).result()
        except Exception as e:
            logging.critical(f"Unhandled exception processing message {message_id}: {e}", exc_info=True)
            # For unknown errors we also don't want to lose the message.
            # A robust system would route it to a DLQ; for now we log it critically.
        finally:
            # Acknowledge the message so ActiveMQ removes it from the queue.
            # This is CRITICAL. ack='client-individual' means we control when a
            # message counts as "done". If the script crashes before this line,
            # the message will be redelivered.
            with self.ack_lock:
                self.conn.ack(message_id, subscription)
            logging.info(f"Acknowledged message {message_id}.")


async def main():
    """Set up both connections and run indefinitely."""
    pusher = PhoenixPusher(config.PHOENIX_WS_URL, config.PHOENIX_TOPIC, config.PHOENIX_AUTH_TOKEN)
    # Run the Phoenix connection in the background
    asyncio.create_task(pusher.connect())

    conn = stomp.Connection(host_and_ports=config.ACTIVEMQ_HOSTS, heartbeats=config.HEARTBEATS)
    conn.set_listener('', InferenceListener(conn, pusher, asyncio.get_running_loop()))

    while True:
        try:
            if not conn.is_connected():
                logging.info("Connecting to ActiveMQ...")
                conn.connect(config.ACTIVEMQ_USER, config.ACTIVEMQ_PASSWORD, wait=True)
                # ack='client-individual' is the key to manual acknowledgement.
                conn.subscribe(destination=config.SOURCE_QUEUE, id=1, ack='client-individual')
                logging.info(f"Connected to ActiveMQ and subscribed to {config.SOURCE_QUEUE}")
            # Use asyncio.sleep, not time.sleep: this coroutine shares the event
            # loop with the Phoenix pusher and must not block it.
            await asyncio.sleep(1)
        except stomp.exception.ConnectFailedException:
            logging.error("Failed to connect to ActiveMQ. Retrying in 10 seconds...")
            await asyncio.sleep(10)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutting down consumer.")
```
The design of this Python code centers on resilience:

- `PhoenixPusher` owns the WebSocket connection to Phoenix and carries full automatic-reconnect logic. It uses an `asyncio.Event` to track connection state, so that when the link drops, pushes wait until the reconnect succeeds.
- `InferenceListener` is the `stomp.py` callback class; the key part is the `on_message` implementation:
  - Exhaustive exception handling: JSON parse failures, model inference failures, and unknown exceptions are all distinguished.
  - `client-individual` acknowledgement: `conn.ack()` is called only after a message has been fully handled (success or failure). If the script crashes mid-processing, ActiveMQ redelivers the message to another consumer instance, so no message is lost.
  - Async integration: `stomp.py` is thread-based while `websockets` is `asyncio`-based, so push tasks are handed safely from `stomp.py`'s worker threads to the `asyncio` event loop via `asyncio.run_coroutine_threadsafe` (isolated in the sketch below).
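Since the thread-to-loop handoff is the subtlest part, here is a minimal self-contained sketch of the same pattern in isolation (the names `push` and `worker` are illustrative):

```python
# A worker thread (standing in for a stomp.py listener thread) submits a
# coroutine to the event loop and blocks until it completes.
import asyncio
import threading

async def push(msg: str) -> str:
    await asyncio.sleep(0.1)          # stand-in for a websocket send
    return f"pushed {msg}"

def worker(loop: asyncio.AbstractEventLoop) -> None:
    # Called from a non-asyncio thread, just like InferenceListener.on_message.
    future = asyncio.run_coroutine_threadsafe(push("hello"), loop)
    print(future.result(timeout=5))   # blocks this thread, not the event loop

async def main() -> None:
    loop = asyncio.get_running_loop()
    threading.Thread(target=worker, args=(loop,)).start()
    await asyncio.sleep(1)            # keep the loop alive for the demo

asyncio.run(main())
```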
Step 3: Set Up the Phoenix Server to Receive Data
The Phoenix side is comparatively simple: the main work is defining a Channel that handles the Python service's connection and broadcasts its messages.
1. Create the Channel

```shell
$ mix phx.gen.channel Inference
```
2. Update `lib/your_app_web/channels/user_socket.ex`

We need to declare the service channel route and add authentication logic so that only our Python service can connect as a service. While we are here, we also route the `public:*` topics that the dashboard will join later; without that route, the browser's `socket.channel("public:results")` join would fail with an unmatched-topic error.
```elixir
# lib/your_app_web/channels/user_socket.ex
defmodule YourAppWeb.UserSocket do
  use Phoenix.Socket

  # Channel for regular browser clients
  channel "room:*", YourAppWeb.RoomChannel
  # Public topics the dashboard subscribes to (broadcast-only, see below)
  channel "public:*", YourAppWeb.InferenceChannel
  # Channel for our internal Python service
  channel "service:*", YourAppWeb.InferenceChannel

  @impl true
  def connect(%{"token" => token}, socket) do
    # This is a simplified auth mechanism. In production, use a proper JWT library.
    case verify_token(token) do
      {:ok, service_id} ->
        {:ok, assign(socket, :service_id, service_id)}

      :error ->
        :error
    end
  end

  # Fallback for connections without a token (e.g., browser clients).
  # For this example we allow anonymous browser connections; a real app
  # would authenticate users here.
  def connect(_params, socket) do
    {:ok, assign(socket, :user_id, "guest-#{:rand.uniform(1000)}")}
  end

  @impl true
  def id(socket), do: "users_socket:#{socket.assigns[:service_id] || socket.assigns[:user_id]}"

  # In a real system, this would involve cryptographic verification.
  defp verify_token(token) do
    # Compare with the secret token from config
    service_token = Application.get_env(:your_app, :service_auth_token)

    if token == service_token do
      {:ok, "python_inference_service"}
    else
      :error
    end
  end
end
```
Don't forget to add `service_auth_token` to `config/config.exs`, for example `config :your_app, service_auth_token: System.get_env("PHOENIX_SERVICE_TOKEN", "supersecret")`.
3. Implement `lib/your_app_web/channels/inference_channel.ex`
```elixir
# lib/your_app_web/channels/inference_channel.ex
defmodule YourAppWeb.InferenceChannel do
  use YourAppWeb, :channel

  alias YourAppWeb.Endpoint

  # Only our verified service may join the private service topic.
  # The token itself was already checked in UserSocket.connect/2.
  @impl true
  def join("service:inference", %{"token" => _token}, socket) do
    if socket.assigns[:service_id] do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # Browser clients may join the public, broadcast-only topics.
  def join("public:" <> _subtopic, _payload, socket) do
    {:ok, socket}
  end

  # These handle incoming messages FROM the Python service. Pattern-matching
  # on :service_id ensures browser clients on public topics cannot inject events.
  @impl true
  def handle_in("new_result", payload, %{assigns: %{service_id: _}} = socket) do
    # Broadcast the result to all subscribed browser clients on a different topic.
    # This separation keeps browser clients off the service topic.
    Endpoint.broadcast("public:results", "new_result", payload)
    {:noreply, socket}
  end

  def handle_in("inference_failed", payload, %{assigns: %{service_id: _}} = socket) do
    # Broadcast failures to a dedicated alert topic
    Endpoint.broadcast("public:alerts", "inference_failed", payload)
    {:noreply, socket}
  end

  # Catch-all for any other events
  def handle_in(_event, _payload, socket) do
    {:noreply, socket}
  end
end
```
A common mistake is to let web clients and the backend service share the same topic. The best practice here is isolation, which is what the code above implements (a quick way to verify it follows this list):

- The Python service joins, and pushes to, the private topic `service:inference`.
- When `InferenceChannel` receives a message, it rebroadcasts it to public topics such as `public:results` and `public:alerts`.
- Web clients are only allowed to subscribe to those public topics.
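To sanity-check the isolation, a quick hypothetical probe (assuming the Phoenix server above is running on localhost:4000) can attempt both joins: the public join should reply with status "ok", while a tokenless join on the service topic should come back as an error reply, because `UserSocket` never assigned `:service_id` and no join clause matches.

```python
# probe_isolation.py - attempt to join both topics as an anonymous client.
import asyncio
import json

import websockets

async def try_join(topic: str) -> dict:
    async with websockets.connect("ws://localhost:4000/socket/websocket") as ws:
        await ws.send(json.dumps({"topic": topic, "event": "phx_join",
                                  "payload": {}, "ref": "1"}))
        return json.loads(await ws.recv())  # the phx_reply for our join

async def main() -> None:
    print(await try_join("public:results"))      # expect status "ok"
    print(await try_join("service:inference"))   # expect an error reply

asyncio.run(main())
```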
4. Frontend JavaScript

In `assets/js/socket.js`, add the client logic to connect and listen on the public topics:
```javascript
// assets/js/socket.js
import {Socket} from "phoenix"

let socket = new Socket("/socket", {params: {}}) // No token for guest browsers
socket.connect()

// Channel for successful results
let resultsChannel = socket.channel("public:results", {})
resultsChannel.on("new_result", payload => {
  console.log("Received new inference result:", payload);
  // Update the DOM, e.g., add a new row to the results table
  const resultsTable = document.getElementById("results-table-body");
  if (resultsTable) {
    const row = resultsTable.insertRow(0); // Insert at the top
    row.insertCell(0).innerText = payload.request_id;
    row.insertCell(1).innerText = payload.prediction.class;
    row.insertCell(2).innerText = payload.prediction.confidence;
    row.insertCell(3).innerText = `${payload.processing_ms} ms`;
    row.classList.add('new-row-animation');
  }
})

resultsChannel.join()
  .receive("ok", resp => { console.log("Joined public:results successfully", resp) })
  .receive("error", resp => { console.log("Unable to join public:results", resp) })

// Channel for alerts
let alertsChannel = socket.channel("public:alerts", {})
alertsChannel.on("inference_failed", payload => {
  console.error("Received inference failure alert:", payload);
  // Show a toast notification or update an alert panel
  const alertPanel = document.getElementById("alerts-panel");
  if (alertPanel) {
    const alertDiv = document.createElement("div");
    alertDiv.className = "alert alert-danger";
    alertDiv.innerText = `[${payload.request_id}] FAILED: ${payload.error}`;
    alertPanel.prepend(alertDiv);
  }
})

alertsChannel.join()
  .receive("ok", resp => { console.log("Joined public:alerts successfully", resp) })
  .receive("error", resp => { console.log("Unable to join public:alerts", resp) })

export default socket
```
With that, the end-to-end pipeline is open: events produced by the Java system flow through ActiveMQ, are consumed and scored by the Python service, and land on the web dashboard within milliseconds.
Limitations and Future Optimization Paths
The architecture solves the core problem, but several aspects still need hardening for production.
Missing backpressure:

The current data flow is a pure push model. If the upstream Java system bursts out a flood of events and the Python consumer can't keep up, messages pile up in ActiveMQ; that much is a message queue doing its job. But if Python pushes to Phoenix faster than Phoenix can broadcast, or than the frontend can render, memory pressure builds on the Phoenix server. A more complete design would establish a simple backpressure signal between Phoenix and Python: when Phoenix is overloaded, it could tell the Python service over the Channel to pause pulling from ActiveMQ (a sketch follows below).
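A minimal sketch of such a pause/resume hook, layered on the consumer from Step 2 (the names `pause_event`, `handle_control`, and `on_message_guard` are hypothetical, not part of the earlier code):

```python
# If Phoenix pushed "pause"/"resume" control events down the service channel,
# PhoenixPusher._listen could toggle a threading.Event that on_message checks
# before doing any work.
import threading

pause_event = threading.Event()
pause_event.set()  # set == "OK to process"

def handle_control(event: str) -> None:
    # Would be called from PhoenixPusher._listen on a control message.
    if event == "pause":
        pause_event.clear()
    elif event == "resume":
        pause_event.set()

def on_message_guard() -> None:
    # First line of InferenceListener.on_message in this variant: block the
    # stomp.py listener thread until Phoenix says go. Because the current
    # message is not yet ack'ed (client-individual ack plus the prefetch
    # limit), ActiveMQ stops delivering more, and the backlog stays in the
    # broker rather than in our process.
    pause_event.wait()
```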
Horizontal scaling of the Python consumer:

The Python service is currently a single instance. To raise throughput, we can lean on the competing-consumers pattern of ActiveMQ queues and simply start several consumer instances; they will pull from the same queue automatically, load-balancing among themselves. This does require the AI model to be stateless, or to manage its state in a distributed fashion.
Data contracts and serialization:

Every hop currently exchanges JSON, which is flexible but offers no strict schema validation; in a large system, one renamed field or changed type can break the whole pipeline. A binary format such as Protobuf or Avro would add strong typing, speed up serialization and deserialization, and shrink the payloads on the wire.
Observability:

We have logs, but no metrics and no tracing. The Python service should integrate a Prometheus client and expose message throughput, the distribution of inference latency, and error rates. Distributed tracing via OpenTelemetry would let us follow a single event from ActiveMQ all the way to the dashboard and measure its full lifecycle and latency, which is essential for hunting down performance bottlenecks. A sketch of the metrics half is below.
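As a concrete starting point, a minimal sketch using prometheus_client (an extra dependency, not in the requirements.txt above; the metric names are illustrative):

```python
# Counters and a histogram the consumer could update inside on_message.
from prometheus_client import Counter, Histogram, start_http_server

MESSAGES = Counter("inference_messages_total", "Messages consumed", ["outcome"])
LATENCY = Histogram("inference_seconds", "Model inference latency")

start_http_server(9100)  # exposes /metrics for Prometheus to scrape on :9100

# Inside on_message, roughly:
#   with LATENCY.time():
#       result = run_inference(data)
#   MESSAGES.labels(outcome="success").inc()
# and MESSAGES.labels(outcome="failed").inc() in the error branches.
```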