构建高吞吐量异步科学计算管道集成Solid.js Celery与读写分离数据库


我们的初始系统是一个标准的单体Web应用,用户通过界面提交参数,后端同步执行一个基于SciPy的蒙特卡洛模拟。当模拟规模较小时,一切尚可。但随着计算复杂度的增加,请求处理时间从几秒飙升到数分钟,直接导致HTTP网关超时。前端界面完全冻结,用户无法进行任何操作,也无法得知计算进度,体验极差。更糟糕的是,并发执行的几个计算任务就能轻易耗尽服务器的CPU和内存资源,拖垮整个服务。这个技术痛点是典型的计算密集型任务与Web服务耦合过深导致的系统性崩溃。

初步构想与技术选型决策

解决这个问题的核心思路是解耦。必须将耗时的计算任务从同步的HTTP请求-响应循环中剥离出去,转为异步处理。

  1. 异步任务队列: Celery
    在Python生态中,Celery是处理分布式异步任务的事实标准。它的稳定性和功能集,如任务路由、定时任务、工作流(Chains, Chords, Groups),使其成为不二之选。我们选择Redis作为其Broker和Result Backend,因为它轻量、快速,并且能很好地支持我们后续需要构建的实时消息通知机制。

  2. 前端实时反馈: Solid.js 与 WebSocket
    任务异步化后,新的问题是如何将后端任务的实时进度反馈给前端。传统的HTTP轮询方式对服务器压力大,且实时性差。Server-Sent Events (SSE) 是个不错的单向通信方案,但WebSocket提供了双向通信能力,为未来可能的交互式控制(如暂停、取消任务)预留了接口。
    在前端框架选择上,我们放弃了React。对于需要频繁、细粒度更新UI(如实时更新的进度条、数据点)的场景,Solid.js的无虚拟DOM、细粒度响应式模型能提供卓越的性能,避免不必要的组件重渲染,这正是我们所需要的。

  3. 数据持久化瓶颈: 读写分离
    模拟过程会产生海量的数据点,需要在任务执行期间持续写入数据库。同时,用户可能随时刷新页面或查看历史结果,产生大量的读请求。单一数据库实例很快就会在混合读写负载下不堪重负。引入主从复制的读写分离架构是解决此问题的经典方案。所有Celery worker节点的写操作都指向主库(Primary),而所有来自前端的数据查询请求都路由到从库(Replica)。这能有效分散数据库压力,保证写入的吞吐量和查询的响应速度。

架构设计

整个系统的核心数据流如下:

sequenceDiagram
    participant User as 用户 (Solid.js)
    participant API as 后端API (FastAPI)
    participant Redis as Redis (Broker/PubSub)
    participant Celery as Celery Worker
    participant DB_P as 主数据库 (Write)
    participant DB_R as 从数据库 (Read)

    User->>+API: POST /simulations (启动模拟)
    API->>Redis: publish(task)
    API-->>-User: { "task_id": "..." }
    
    Celery->>+Redis: fetch(task)
    Note over Celery: 执行SciPy计算
    loop 持续计算
        Celery->>DB_P: INSERT results
        Celery->>Redis: PUBLISH progress (task_id, progress)
    end
    
    User->>API: WebSocket /ws/{task_id}
    API->>Redis: SUBSCRIBE channel:{task_id}
    Note right of API: 监听进度更新
    
    Redis-->>API: on_message(progress)
    API-->>User: WSSend(progress_data)
    Note left of User: 界面实时更新
    
    User->>+API: GET /results/{task_id}
    API->>DB_R: SELECT * FROM results
    API-->>-User: { "results": [...] }

步骤化实现:代码与解析

1. 后端核心:Celery与SciPy任务

首先是Celery的配置。我们创建一个celery_app实例,并配置好Broker和Result Backend。

project/worker/celery_config.py

import os

# 使用环境变量配置,这在生产环境中是最佳实践
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# Celery 配置
broker_url = REDIS_URL
result_backend = REDIS_URL

# 任务序列化和接受的内容类型
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True

# 结果过期时间,例如1天
result_expires = 86400

# 任务路由,虽然此示例简单,但在复杂系统中非常有用
# task_routes = {
#     'project.worker.tasks.run_monte_carlo_simulation': {'queue': 'simulations'},
# }

project/worker/celery_instance.py

from celery import Celery

# 创建Celery应用实例
celery_app = Celery("scientific_tasks")

# 从配置文件加载配置
celery_app.config_from_object("project.worker.celery_config")

# 自动发现任务
celery_app.autodiscover_tasks(["project.worker.tasks"])

接下来是核心的计算任务。这个任务模拟一个简化的粒子衰变过程,它会分批次运行,并在每个批次结束后更新进度和保存数据。注意,任务内部直接处理了数据库写入和进度上报的逻辑。

project/worker/tasks.py

import time
import uuid
import logging
from celery import shared_task
from celery.utils.log import get_task_logger
import numpy as np
from scipy import stats
import redis
import json

from ..db.session import get_db_session
from ..db.models import SimulationResult
from .celery_instance import celery_app

# 获取 Celery 的 logger
logger = get_task_logger(__name__)
logger.setLevel(logging.INFO)

# 连接到 Redis 用于发布/订阅
# 这里的 Redis 连接应该使用连接池,并且与 Celery Broker 分开,避免干扰
redis_client = redis.Redis.from_url(celery_app.conf.broker_url, decode_responses=True)

@shared_task(bind=True)
def run_monte_carlo_simulation(self, total_particles, decay_probability, batches):
    """
    一个耗时的蒙特卡洛模拟任务。
    
    :param self: Celery 任务实例,通过 bind=True 注入
    :param total_particles: 模拟的总粒子数
    :param decay_probability: 每个时间步长内单个粒子的衰变概率
    :param batches: 将总任务拆分成多少个批次来执行
    """
    task_id = self.request.id
    logger.info(f"[{task_id}] Simulation started with {total_particles} particles and {batches} batches.")
    
    particles_per_batch = total_particles // batches
    
    # 获取数据库会话,这里依赖于我们定义的会话管理
    # 这里的session将会指向主库
    db_session = get_db_session('primary')

    try:
        for i in range(batches):
            start_time = time.time()
            
            # 使用 SciPy/NumPy 进行核心计算
            # 模拟在这一批次中衰变的粒子数,这是一个二项分布
            decayed_count = np.random.binomial(particles_per_batch, decay_probability)
            
            # 模拟衰变能量分布(例如,使用正态分布)
            energies = stats.norm.rvs(loc=100, scale=15, size=decayed_count)
            
            # 将结果批量写入数据库
            results_to_save = [
                SimulationResult(
                    task_id=task_id,
                    batch_num=i + 1,
                    energy_level=float(energy)
                ) for energy in energies
            ]
            db_session.add_all(results_to_save)
            db_session.commit()

            progress = (i + 1) / batches * 100
            
            # 更新Celery任务状态,附加自定义元数据
            meta = {
                'progress': f"{progress:.2f}",
                'current_batch': i + 1,
                'total_batches': batches,
                'batch_duration': f"{time.time() - start_time:.4f}s"
            }
            self.update_state(state='PROGRESS', meta=meta)
            
            # 通过 Redis Pub/Sub 发布实时进度
            # 频道名称与 task_id 关联
            channel = f"task_progress:{task_id}"
            redis_client.publish(channel, json.dumps(meta))
            
            logger.info(f"[{task_id}] Batch {i+1}/{batches} completed. Progress: {progress:.2f}%")

        logger.info(f"[{task_id}] Simulation completed successfully.")
        return {'status': 'Completed', 'total_particles': total_particles, 'task_id': task_id}
    except Exception as e:
        logger.error(f"[{task_id}] Simulation failed: {e}", exc_info=True)
        db_session.rollback()
        # 任务失败时也需要更新状态
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        # 也可以通过Pub/Sub通知前端失败
        channel = f"task_progress:{task_id}"
        redis_client.publish(channel, json.dumps({'status': 'FAILURE', 'error': str(e)}))
        # 重新引发异常,让Celery知道任务失败了
        raise
    finally:
        db_session.close()

这里的关键点:

  • bind=True: 允许我们将任务实例 self 注入到函数中,从而可以调用 self.update_state()
  • 分批处理: 将大任务分解为小批次是至关重要的。这使得进度更新和数据持久化可以增量进行,避免了在任务结束时一次性写入大量数据导致的数据库压力和内存问题。
  • 状态更新与消息发布: 我们同时使用了 self.update_state 和 Redis Pub/Sub。update_state 是 Celery 的标准机制,用于存储最终结果和状态,可供以后查询。Redis Pub/Sub 则是一个推送机制,用于将进度实时、低延迟地广播给监听者(我们的 API 服务器)。

2. 数据层:实现读写分离

我们使用SQLAlchemy。实现读写分离的关键是创建一个自定义的Session管理器,它能根据操作类型(读/写)选择不同的数据库引擎。

project/db/database.py

import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

# 从环境变量中获取数据库连接信息
PRIMARY_DB_URL = os.getenv("PRIMARY_DB_URL", "postgresql://user:pass@localhost:5432/primary_db")
REPLICA_DB_URL = os.getenv("REPLICA_DB_URL", "postgresql://user:pass@localhost:5433/replica_db")

# 创建主库和从库的引擎
# 生产环境中需要配置连接池参数,如 pool_size, max_overflow
engines = {
    'primary': create_engine(PRIMARY_DB_URL, pool_recycle=3600),
    'replica': create_engine(REPLICA_DB_URL, pool_recycle=3600),
}

# 创建一个自定义的 Session 类,用于路由
class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # 如果是写操作(flush),或者明确指定了使用主库,则返回主库引擎
        if self._flushing or kw.get('_bind', 'primary') == 'primary':
            print("DB INFO: Routing to PRIMARY")
            return engines['primary']
        else:
            # 默认为读操作,路由到从库
            print("DB INFO: Routing to REPLICA")
            return engines['replica']

# 创建 session factory
SessionFactory = sessionmaker(class_=RoutingSession)

project/db/session.py

from .database import SessionFactory

def get_db_session(bind_key='replica'):
    """
    提供一个数据库会话。
    Celery worker 写入时应显式请求 'primary'。
    API 读取时默认使用 'replica'。
    """
    if bind_key == 'primary':
        # Celery Worker 在写入时,我们强制它使用主库的会话
        return SessionFactory(_bind='primary')
    
    # API 读取时默认使用从库
    return SessionFactory()

这种实现方式简单有效。Celery worker 在需要写入数据时,会调用 get_db_session('primary') 获取一个绑定到主库的会शिव会。FastAPI 依赖注入系统在处理读请求时,默认调用 get_db_session(),获取到的是一个会自动路由到从库的会话。

3. API层:FastAPI与WebSocket粘合剂

API层负责接收用户请求、启动Celery任务,并通过WebSocket将进度转发给前端。

project/main.py

import asyncio
import json
import logging
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from pydantic import BaseModel
import redis.asyncio as aioredis
from sqlalchemy.orm import Session

from .worker.tasks import run_monte_carlo_simulation
from .db.session import get_db_session
from .db import crud  # 假设crud模块包含了数据库查询函数

app = FastAPI()
redis_client = None

# 应用启动时,初始化异步Redis连接
@app.on_event("startup")
async def startup_event():
    global redis_client
    redis_client = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)

# 应用关闭时,关闭连接
@app.on_event("shutdown")
async def shutdown_event():
    if redis_client:
        await redis_client.close()

class SimulationRequest(BaseModel):
    total_particles: int = 1_000_000
    decay_probability: float = 0.05
    batches: int = 100

@app.post("/simulations")
def start_simulation(request: SimulationRequest):
    """启动一个新的模拟任务"""
    task = run_monte_carlo_simulation.delay(
        total_particles=request.total_particles,
        decay_probability=request.decay_probability,
        batches=request.batches
    )
    return {"task_id": task.id}

@app.get("/results/{task_id}")
def get_simulation_results(task_id: str, db: Session = Depends(get_db_session)):
    """
    获取模拟结果。此请求会路由到只读副本。
    """
    # 这里 get_db_session() 默认返回连接到replica的会话
    results = crud.get_results_by_task_id(db, task_id)
    task_status = run_monte_carlo_simulation.AsyncResult(task_id)
    
    return {
        "task_id": task_id,
        "status": task_status.state,
        "progress_info": task_status.info,
        "results_count": len(results),
        # 在生产中,这里应该做分页处理,而不是返回所有结果
        "results_sample": [res.energy_level for res in results[:10]]
    }

async def redis_listener(websocket: WebSocket, task_id: str):
    """一个独立的协程,用于监听Redis Pub/Sub并推送消息"""
    channel = f"task_progress:{task_id}"
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(channel)
    try:
        while True:
            # 使用 `listen` 方法异步等待消息
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None)
            if message:
                data = json.loads(message['data'])
                await websocket.send_json(data)
                # 如果任务结束或失败,可以终止监听
                if data.get('status') in ['FAILURE', 'Completed']:
                    break
    except Exception as e:
        logging.error(f"Error in Redis listener for {task_id}: {e}")
    finally:
        await pubsub.unsubscribe(channel)


@app.websocket("/ws/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str):
    """WebSocket端点,用于实时通信"""
    await websocket.accept()
    
    # 启动Redis监听器任务
    listener_task = asyncio.create_task(redis_listener(websocket, task_id))
    
    try:
        # 保持连接,可以接收来自客户端的消息(例如,取消任务的指令)
        while True:
            # 这个循环是为了保持连接,实际的推送逻辑在 listener_task 中
            await websocket.receive_text() 
    except WebSocketDisconnect:
        logging.info(f"WebSocket disconnected for task {task_id}")
    finally:
        # 清理工作
        listener_task.cancel()
        try:
            await listener_task
        except asyncio.CancelledError:
            logging.info(f"Redis listener for {task_id} cancelled.")

4. 前端展现:Solid.js的响应式魔法

前端使用Solid.js来创建一个能与WebSocket通信并响应式更新UI的组件。

src/SimulationTracker.tsx

import { createSignal, onCleanup, createEffect } from 'solid-js';

const SimulationTracker = () => {
    const [taskId, setTaskId] = createSignal(null);
    const [progress, setProgress] = createSignal("0.00");
    const [status, setStatus] = createSignal("Idle");
    const [batchInfo, setBatchInfo] = createSignal("");
    const [socket, setSocket] = createSignal<WebSocket | null>(null);
    const [isSimulating, setIsSimulating] = createSignal(false);
    
    const startSimulation = async () => {
        if (isSimulating()) return;
        setIsSimulating(true);
        setStatus("Starting...");
        
        try {
            const response = await fetch('/simulations', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({
                    total_particles: 5_000_000,
                    decay_probability: 0.02,
                    batches: 200
                })
            });
            const data = await response.json();
            setTaskId(data.task_id);
        } catch (error) {
            console.error("Failed to start simulation:", error);
            setStatus("Error starting simulation");
            setIsSimulating(false);
        }
    };

    // 当 taskId 变化时,建立 WebSocket 连接
    createEffect(() => {
        const currentTaskId = taskId();
        if (!currentTaskId) return;

        // 如果存在旧的连接,先关闭
        if (socket()) {
            socket().close();
        }

        const ws = new WebSocket(`ws://localhost:8000/ws/${currentTaskId}`);
        setSocket(ws);

        ws.onopen = () => {
            console.log("WebSocket connection established.");
            setStatus("Simulation in progress...");
        };

        ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            if (data.progress) {
                setProgress(data.progress);
            }
            if(data.current_batch && data.total_batches){
                setBatchInfo(`Batch ${data.current_batch}/${data.total_batches} done in ${data.batch_duration}`);
            }
            if (data.status === 'FAILURE') {
                setStatus(`Failed: ${data.error}`);
                setIsSimulating(false);
                ws.close();
            }
        };

        ws.onclose = () => {
            console.log("WebSocket connection closed.");
            // 只有在任务还在进行中时才认为是意外关闭
            if (isSimulating() && status() !== "Completed") {
                setStatus("Connection lost. Please check results manually.");
            }
            setIsSimulating(false);
        };

        ws.onerror = (error) => {
            console.error("WebSocket error:", error);
            setStatus("Connection error.");
            setIsSimulating(false);
        };
    });

    // 组件卸载时清理WebSocket连接
    onCleanup(() => {
        if (socket()) {
            socket().close();
        }
    });

    return (
        <div class="container">
            <h1>Asynchronous Scientific Simulation</h1>
            <button onClick={startSimulation} disabled={isSimulating()}>
                {isSimulating() ? 'Simulation Running...' : 'Start New Simulation'}
            </button>
            
            {taskId() && (
                <div class="status-panel">
                    <h3>Task ID: {taskId()}</h3>
                    <p><strong>Status:</strong> {status()}</p>
                    <div class="progress-bar-container">
                        <div class="progress-bar" style={{ width: `${progress()}%` }}>
                            {progress()}%
                        </div>
                    </div>
                    <p class="batch-info">{batchInfo()}</p>
                </div>
            )}
        </div>
    );
};

export default SimulationTracker;

Solid.js的 createEffect 完美地处理了副作用。当 taskId 这个signal发生变化时,createEffect 会自动运行,清理旧的WebSocket连接并建立新的连接。UI元素(如进度条的宽度)直接绑定到signal (progress()),当WebSocket消息更新signal时,UI会自动、高效地更新,无需任何手动DOM操作。

局限性与未来迭代路径

这套架构解决了最初的性能瓶颈,但并非完美。在生产环境中,仍有几个方面值得深入优化:

  1. 读写分离的复制延迟:如果用户在任务刚写入一个批次后立刻查询结果,由于主从复制的延迟,可能无法立即看到最新的数据。对于要求强一致性的读操作,API层需要有一种机制可以强制从主库读取,但这会牺牲读写分离带来的部分优势。
  2. 任务的健壮性:当前的任务失败处理比较简单。一个生产级的系统需要为Celery任务配置更复杂的重试策略(如指数退避)、死信队列来处理无法恢复的失败任务,以及更完善的事务管理。
  3. WebSocket连接管理:随着用户量增加,API服务器需要管理成千上万的WebSocket长连接,这会消耗大量内存。可以考虑使用专门的WebSocket网关服务,或者在水平扩展API实例时,确保Redis Pub/Sub的消息能被正确路由到持有对应WebSocket连接的实例上。
  4. 工作流编排:对于更复杂的科学计算,可能涉及多个相互依赖的步骤。可以利用Celery的chainchord来编排一个复杂的有向无环图(DAG)工作流,实现更复杂的计算逻辑。

  目录