使用 Rust 与 WASI 为 Micronaut 应用构建高性能且安全的插件系统


我们的核心业务运行在一个基于 Micronaut 的 JVM 服务上,一直很稳定。但最近一个需求打破了这种平静:我们需要允许第三方合作伙伴动态注入自定义的数据处理逻辑。最初的讨论很直接,要么为每个合作伙伴部署一个独立的微服务,要么硬着头皮上 JNI。前者运维成本太高,网络开销和服务发现的复杂性让我们望而却步;后者则像是在雷区里跳舞,任何一个内存错误都可能导致整个 JVM 崩溃,这是生产环境绝对无法接受的。

我们需要一个方案,既能隔离第三方代码,保证主服务的稳定性,又能提供接近原生的性能,避免引入过高的延迟。这个技术痛点,将我们引向了 WebAssembly (WASM) 和 WASI (WebAssembly System Interface)。

初步构想与技术选型

我们的构想是构建一个插件化的执行引擎。Micronaut 应用作为“宿主 (Host)”,负责加载和管理插件生命周期。插件本身则是编译成 WASM 格式的独立模块,在沙箱环境中运行。这种架构的核心优势在于:

  1. 安全性: WASM 默认在内存沙箱中运行,无法直接访问宿主内存或系统资源,从根本上杜绝了插件代码影响主应用稳定性的可能性。
  2. 高性能: WASM 是一种二进制指令格式,可以被即时 (JIT) 或提前 (AOT) 编译成原生机器码,执行效率远高于解释型语言。
  3. 语言无关: 任何能编译到 WASM 的语言都可以用来编写插件。

基于此,我们的技术栈选型也逐渐清晰:

  • 宿主应用: **Micronaut (Java)**。我们现有的技术栈,其依赖注入和 AOT 编译能力能提供快速的启动和较低的内存占用。
  • 插件语言: Rust。它无 GC、内存安全,且对 WASM 有一流的支持。用 Rust 编写性能敏感的插件,可以避免 JVM 和插件双重 GC 带来的不可预测的停顿。
  • WASM 运行时: Wasmtime。由字节码联盟 (Bytecode Alliance) 支持,性能出色,并且提供了稳定的 Java API,便于在 Micronaut 中集成。
  • 插件存储: **MongoDB (文档型 NoSQL)**。用于存储插件的元数据甚至 Wasm 二进制文件本身,其灵活的 Schema 非常适合这种场景。
  • 管理前端: React。构建一个简单的内部管理界面,用于上传、启用或禁用插件。

架构与执行流程

整个系统的核心交互流程可以被可视化,它清晰地展示了从前端请求到插件执行的完整路径。

sequenceDiagram
    participant FE as React Admin UI
    participant BE as Micronaut Host
    participant Wasm as Wasmtime Runtime
    participant Plugin as Rust Plugin (WASM)
    participant DB as MongoDB

    FE->>+BE: POST /api/plugins/upload (plugin.wasm)
    BE->>+DB: 存储插件二进制文件及元数据
    DB-->>-BE: 返回存储结果
    BE-->>-FE: 上传成功

    Note over BE: 服务启动或按需加载时
    BE->>DB: 查询可用的插件
    DB->>BE: 返回插件列表
    BE->>Wasm: 加载 Wasm 模块并实例化
    Wasm->>BE: 准备就绪的插件实例

    participant Client as API Client
    Client->>+BE: POST /api/process/{pluginName} (requestData)
    BE->>BE: 定位已加载的插件实例
    BE->>+Wasm: 1. 分配共享内存
    BE->>+Wasm: 2. 将 requestData 写入共享内存
    BE->>+Plugin: 3. 调用导出的 `process` 函数 (传递内存指针和长度)
    Plugin->>Plugin: 执行核心处理逻辑
    Plugin->>+Wasm: 4. 将结果写入共享内存
    Plugin-->>-BE: 5. 返回结果的指针和长度
    Wasm-->>-BE: 
    BE->>+Wasm: 6. 从共享内存读取结果
    Wasm-->>-BE: 
    BE-->>-Client: 返回处理结果

步骤化实现:从宿主到插件

我们将整个实现过程分解为几个关键步骤,并展示核心代码。

1. Micronaut 宿主环境搭建

首先,我们需要在 Micronaut 项目中引入 Wasmtime 的 Java 绑定。

build.gradle.kts

// ... other dependencies
implementation("io.micronaut.data:micronaut-data-mongodb")
implementation("io.wasmtime:wasmtime-java:6.0.0")
// ...

接下来,我们创建一个 PluginExecutorService,这是整个插件系统的核心。它负责加载、管理和执行 WASM 插件。

PluginExecutorService.java

package com.example.host;

import io.micronaut.context.annotation.Context;
import io.micronaut.core.annotation.NonNull;
import io.wasmtime.*;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
@Context // Eagerly initialized on startup
public class PluginExecutorService {

    private static final Logger LOG = LoggerFactory.getLogger(PluginExecutorService.class);

    // Wasmtime 核心对象,Engine 是线程安全的,可以共享
    private final Engine engine = Engine.create();
    private final Map<String, Module> compiledModules = new ConcurrentHashMap<>();

    /**
     * 加载并编译一个 WASM 插件。
     * 在真实项目中,`wasmBytes` 会从数据库或文件系统加载。
     * 这里我们只做编译和缓存,实例化是执行时的事情。
     * @param name 插件的唯一标识
     * @param wasmBytes 插件的 .wasm 文件字节码
     */
    public void loadAndCompilePlugin(@NonNull String name, @NonNull byte[] wasmBytes) {
        try {
            LOG.info("Compiling plugin: {}", name);
            Module module = Module.compile(engine, wasmBytes);
            compiledModules.put(name, module);
        } catch (WasmtimeException e) {
            LOG.error("Failed to compile plugin: {}", name, e);
            // 生产环境中应有更完善的错误处理策略
        }
    }

    /**
     * 执行指定的插件。
     * 这是与 WASM 交互的核心所在。
     * @param pluginName 要执行的插件名
     * @param inputJson 输入的 JSON 字符串
     * @return 插件处理后的输出 JSON 字符串
     */
    public Optional<String> executePlugin(@NonNull String pluginName, @NonNull String inputJson) {
        Module module = compiledModules.get(pluginName);
        if (module == null) {
            LOG.warn("Plugin not found: {}", pluginName);
            return Optional.empty();
        }

        // 使用 try-with-resources 确保 Wasmtime 资源被正确释放
        // Store, Linker, Instance 都不是线程安全的,每次执行都需要创建
        try (Store<Void> store = Store.create(engine, null);
             Linker linker = new Linker(engine)) {

            // WASI 的标准配置,允许插件使用 stdout 等基本功能
            WasiCtx wasi = new WasiCtxBuilder().inheritStdout().inheritStderr().build();
            linker.wasmtimeWasi(wasi);

            // 实例化模块
            Instance instance = linker.instantiate(store, module);
            Memory memory = instance.getMemory(store, "memory")
                    .orElseThrow(() -> new RuntimeException("WASM module must export a memory."));

            // 1. 在 WASM 内存中为输入数据分配空间
            // 我们需要调用插件导出的 `allocate` 函数来实现
            Function allocFunc = instance.getFunction(store, "allocate")
                    .orElseThrow(() -> new RuntimeException("Function 'allocate' not found in plugin."));
            
            byte[] inputBytes = inputJson.getBytes(StandardCharsets.UTF_8);
            // 调用 allocate 获取内存指针
            Val[] results = allocFunc.call(store, Val.fromI32(inputBytes.length));
            if (results.length == 0 || results[0].getType() != Val.Type.I32) {
                throw new RuntimeException("allocate function did not return a valid pointer.");
            }
            int inputPtr = results[0].i32();

            // 2. 将输入数据写入 WASM 的共享内存
            ByteBuffer buffer = memory.buffer(store);
            buffer.position(inputPtr);
            buffer.put(inputBytes);
            
            // 3. 调用插件的核心处理函数
            Function processFunc = instance.getFunction(store, "process")
                    .orElseThrow(() -> new RuntimeException("Function 'process' not found in plugin."));
            
            // `process` 函数返回一个 64 位整数,高 32 位是结果指针,低 32 位是结果长度
            Val[] processResults = processFunc.call(store, Val.fromI32(inputPtr), Val.fromI32(inputBytes.length));
            if (processResults.length == 0 || processResults[0].getType() != Val.Type.I64) {
                 throw new RuntimeException("process function did not return a valid result pack.");
            }
            long resultPack = processResults[0].i64();
            int resultPtr = (int) (resultPack >> 32);
            int resultLen = (int) resultPack;
            
            // 4. 从共享内存中读取结果
            byte[] outputBytes = new byte[resultLen];
            buffer.position(resultPtr);
            buffer.get(outputBytes);
            
            // 5. 释放 WASM 内部为输入和输出分配的内存
            Function deallocFunc = instance.getFunction(store, "deallocate")
                .orElseThrow(() -> new RuntimeException("Function 'deallocate' not found in plugin."));
            deallocFunc.call(store, Val.fromI32(inputPtr), Val.fromI32(inputBytes.length));
            deallocFunc.call(store, Val.fromI32(resultPtr), Val.fromI32(resultLen));

            return Optional.of(new String(outputBytes, StandardCharsets.UTF_8));

        } catch (WasmtimeException e) {
            LOG.error("Error executing plugin: {}", pluginName, e);
            return Optional.empty();
        }
    }
}

这段代码是整个系统的核心。它演示了与 WASM 模块之间最关键的交互:通过共享内存进行数据交换。我们没有直接传递复杂的 Java 对象,而是约定了一套基于指针和长度的底层通信协议。插件必须导出 allocate, deallocate, 和 process 这三个函数,这是宿主与插件之间的契apropos。

2. Rust 插件的开发

现在我们切换到 Rust 侧,编写一个符合上述契约的插件。

Cargo.toml

[package]
name = "data-transformer-plugin"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

[lib]
crate-type = ["cdylib"]

cdylib 告诉 Rust 编译器我们正在创建一个可以被其他语言链接的动态库,这对于生成 WASM 模块至关重要。

src/lib.rs

use std::mem;
use std::os::raw::c_void;
use serde::{Deserialize, Serialize};

// 定义输入和输出的数据结构
#[derive(Serialize, Deserialize)]
struct InputData {
    user_id: u32,
    event_type: String,
    payload: serde_json::Value,
}

#[derive(Serialize, Deserialize)]
struct OutputData {
    user_id: u32,
    processed: bool,
    timestamp: u64,
    summary: String,
}

/// 全局分配器是 Rust 与外部世界交互内存的标准方式。
/// Wasmtime/WASI 会提供底层的内存管理。
/// 我们需要导出一个 C ABI 兼容的函数来分配内存。
#[no_mangle]
pub extern "C" fn allocate(size: usize) -> *mut c_void {
    let mut buffer = Vec::with_capacity(size);
    let ptr = buffer.as_mut_ptr();
    // 阻止 Rust 在函数结束时释放这块内存,因为所有权已经转移给了调用方(宿主)
    mem::forget(buffer); 
    ptr
}

/// 导出 deallocate 函数,让宿主可以归还内存所有权,由 Rust 的内存管理器回收
#[no_mangle]
pub extern "C" fn deallocate(ptr: *mut c_void, size: usize) {
    unsafe {
        let _ = Vec::from_raw_parts(ptr, 0, size);
    }
}

/// 插件的核心逻辑
/// 接收一个指向输入数据的指针和其长度
/// 返回一个打包了结果指针和长度的 i64
#[no_mangle]
pub extern "C" fn process(ptr: *mut u8, len: usize) -> i64 {
    // 1. 从共享内存中安全地读取输入数据
    let input_bytes = unsafe { std::slice::from_raw_parts(ptr, len) };
    let input_str = std::str::from_utf8(input_bytes).expect("Invalid UTF-8 from host");

    // 2. 反序列化并执行业务逻辑
    let input: InputData = match serde_json::from_str(input_str) {
        Ok(data) => data,
        Err(_) => {
            // 在真实的错误处理中,应该返回一个表示错误的特定值
            return 0; // 简化处理
        }
    };
    
    let summary = format!(
        "Processed event '{}' for user {}",
        input.event_type, input.user_id
    );

    let output = OutputData {
        user_id: input.user_id,
        processed: true,
        timestamp: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
        summary,
    };
    
    // 3. 序列化结果并将其写入一块新的内存区域
    let output_bytes = serde_json::to_vec(&output).unwrap();
    let result_ptr = output_bytes.as_ptr() as *mut c_void;
    let result_len = output_bytes.len();
    
    // 同样,阻止 Rust 释放这块内存,因为宿主需要读取它
    mem::forget(output_bytes);

    // 4. 将指针和长度打包成一个 i64 返回
    // 指针在高 32 位,长度在低 32 位
    ((result_ptr as i64) << 32) | (result_len as i64)
}

编译这个 Rust 项目:

cargo build --target wasm32-wasi --release

这将在 target/wasm32-wasi/release/ 目录下生成 data_transformer_plugin.wasm 文件。这个文件就是我们的插件。

3. 数据持久化与前端交互

在 Micronaut 端,我们使用 Micronaut Data MongoDB 来管理插件。

PluginEntity.java

package com.example.host.data;

import io.micronaut.data.annotation.GeneratedValue;
import io.micronaut.data.annotation.Id;
import io.micronaut.data.annotation.MappedEntity;

@MappedEntity
public class PluginEntity {
    @Id
    @GeneratedValue
    private String id;
    private String name;
    private String version;
    private boolean enabled;
    private byte[] wasmBinary;
    // getters and setters...
}

PluginRepository.java

package com.example.host.data;

import io.micronaut.data.mongodb.repository.MongoRepository;
import java.util.List;

public interface PluginRepository extends MongoRepository<PluginEntity, String> {
    List<PluginEntity> findByEnabled(boolean enabled);
}

在应用启动时,我们可以加载所有启用的插件:

// In a service that depends on PluginRepository and PluginExecutorService
@EventListener
public void onStartup(StartupEvent event) {
    LOG.info("Loading enabled plugins from database...");
    pluginRepository.findByEnabled(true).forEach(plugin -> {
        LOG.info("Loading plugin: {} version {}", plugin.getName(), plugin.getVersion());
        pluginExecutorService.loadAndCompilePlugin(plugin.getName(), plugin.getWasmBinary());
    });
}

一个简单的 React 前端可以提供一个文件上传接口,调用 Micronaut 的 Controller 将 .wasm 文件和元数据保存到 MongoDB。

// A simple React component for uploading a plugin
const PluginUploader = () => {
  const handleUpload = async (event) => {
    const file = event.target.files[0];
    if (!file) return;

    const formData = new FormData();
    formData.append('name', 'new-plugin-from-ui');
    formData.append('version', '1.0.0');
    formData.append('file', file);

    try {
      const response = await fetch('/api/plugins/upload', {
        method: 'POST',
        body: formData,
      });
      if (response.ok) {
        alert('Plugin uploaded successfully!');
      } else {
        alert('Upload failed.');
      }
    } catch (error) {
      console.error('Error uploading plugin:', error);
    }
  };

  return (
    <div>
      <h2>Upload WASM Plugin</h2>
      <input type="file" accept=".wasm" onChange={handleUpload} />
    </div>
  );
};

这个流程闭环了插件的整个生命周期:从开发、编译,到通过管理界面上传,再到被宿主应用加载并执行。

遗留问题与未来迭代路径

这个架构虽然解决了核心的安全和性能问题,但在真实生产环境中,它并非银弹。

首先,宿主和插件之间的通信机制目前还比较原始。通过共享内存传递序列化后的 JSON 字符串,意味着双方都有序列化和反序列化的开销。对于性能要求极高的场景,可以考虑使用更高效的二进制序列化格式,如 Protocol Buffers 或 FlatBuffers。更进一步,可以探索 Wasmtime 对 WebAssembly Component Model 的支持,它旨在提供更高级别、类型安全的跨语言接口,从而摆脱手动管理内存的复杂性。

其次,插件的“冷启动”是一个需要考虑的问题。从数据库加载 WASM 字节码,然后由 Wasmtime JIT 编译成机器码,这个过程会产生一定的延迟。对于需要处理突发流量的系统,可以设计一套插件预热或池化机制,在流量到达前就准备好一定数量的插件实例。

最后,可观测性是一个挑战。当插件内部发生错误或性能下降时,我们如何有效地进行调试和监控?目前我们只能依赖插件通过 WASI 写入的 stdout/stderr,但这远远不够。未来的迭代需要将宿主的可观测性能力(如分布式追踪、指标监控)“注入”到沙箱环境中,让插件可以调用宿主提供的函数来报告自身的健康状况。这需要更精细的 Linker 配置和接口设计。

尽管存在这些待优化的点,但 Micronaut 结合 Rust 与 WASI 的方案,为在 JVM 生态中安全、高效地执行外部逻辑提供了一个强大且现代的范例。它在隔离性、性能和多语言支持之间取得了出色的平衡。


  目录