Building a Saga Compensating-Transaction Gateway to Manage a Legacy Oracle System, with Cypress for Consistency Assertions


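We face an awkward reality: a giant Oracle database that carries the core business, and a new microservice built on Node.js. The business requires that an operation spanning both systems be atomic: either everything succeeds, or everything rolls back to the initial state. The concrete scenario: create a "contract" record in the new microservice, then deduct the "quota" held in a legacy Oracle table as part of the same operation. Two-phase commit (2PC) was rejected from the start because of its exposure to network partitions and because it would be highly invasive to the legacy system.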

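The risk of inconsistent data is obvious. If the Oracle quota deduction fails after the contract has been created, we have conjured up a contract that cannot be fulfilled, which is a production incident. Conversely, if we deduct the quota first and contract creation then fails, we lose quota that should still have been available.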

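The initial idea is to introduce a lightweight orchestration gateway dedicated to this kind of cross-system transaction. The gateway acts as the Orchestrator in the Saga pattern: it drives the whole flow, handles failures, and runs compensating actions. This avoids having the services communicate over an event bus (a choreography-based Saga), because we cannot retrofit that enormous Oracle system to emit or listen for events. A central coordinator is the only pragmatic choice.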
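
To make the orchestrator's job concrete before any infrastructure is involved, here is a minimal in-memory sketch of the control flow described above: run the steps in order, and on the first failure undo the steps that already completed, newest first. The names `Step`, `Outcome`, and `runSaga` are hypothetical and exist only for this illustration; the real gateway later in the article also persists every transition to a log.

```typescript
// Hypothetical in-memory model of an orchestrated saga; no real services are called.
interface Step<C> {
  name: string;
  execute: (ctx: C) => Promise<void>;
  compensate: (ctx: C) => Promise<void>;
}

type Outcome =
  | { ok: true }
  | { ok: false; failedStep: string; compensated: string[] };

async function runSaga<C>(steps: Step<C>[], ctx: C): Promise<Outcome> {
  const completed: Step<C>[] = [];
  for (const step of steps) {
    try {
      await step.execute(ctx);
      completed.push(step);
    } catch {
      // Undo only the steps that actually finished, in reverse order.
      const compensated: string[] = [];
      for (const done of completed.slice().reverse()) {
        await done.compensate(ctx);
        compensated.push(done.name);
      }
      return { ok: false, failedStep: step.name, compensated };
    }
  }
  return { ok: true };
}
```

This sketch deliberately ignores what happens when a compensation itself throws; the article returns to that case in its closing discussion of limitations.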

The technology choices are as follows:

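  1. Saga orchestration gateway: Node.js + TypeScript + Fastify. We chose Node.js because its non-blocking I/O model suits orchestration work that is dominated by network calls and light on computation. Fastify is known for high performance and low overhead.
  2. Saga log storage: a separate PostgreSQL database. We need a reliable place to persist the state of every step of a Saga transaction. During failure recovery, the orchestrator can tell from the log which steps have completed and which compensating actions need to run. We chose PostgreSQL over a log table inside Oracle to separate concerns and to avoid adding load to the core database.
  3. Oracle interaction: the official node-oracledb driver. It is the most direct and reliable way to talk to Oracle.
  4. Consistency verification: Cypress for end-to-end (E2E) testing. This is a key decision. Unit or integration tests cannot fully cover the whole distributed flow, especially when simulating network failures or service outages. We need a tool that can simulate user behavior and also "reach through" to the backend databases to assert on state. Cypress's cy.task and cy.intercept capabilities make it a good fit for verifying that the data ends up consistent.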

Step-by-Step Implementation: From Saga Definition to Cypress Assertions

1. Define the Saga Flow and Log Structure

First, we create the Saga log table in PostgreSQL. This table is the heart of the whole compensation mechanism.

-- DDL for Saga Log in PostgreSQL
CREATE TABLE "saga_execution_logs" (
    "id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    "saga_name" VARCHAR(255) NOT NULL,
    "correlation_id" UUID NOT NULL,
    "current_step" INT NOT NULL DEFAULT 0,
    "status" VARCHAR(50) NOT NULL, -- PENDING, EXECUTING, COMPENSATING, SUCCEEDED, FAILED
    "payload" JSONB NOT NULL,
    "step_logs" JSONB DEFAULT '[]'::jsonb,
    "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    "updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_saga_correlation_id ON "saga_execution_logs"("correlation_id");
CREATE INDEX idx_saga_status ON "saga_execution_logs"("status");

-- step_logs column will store an array of objects like:
-- { "step": "CREATE_CONTRACT", "status": "SUCCEEDED", "timestamp": "...", "details": {} }
-- { "step": "DEDUCT_ORACLE_QUOTA", "status": "FAILED", "timestamp": "...", "details": { "message": "..." } }

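The correlation_id is critical: it uniquely identifies one complete business operation. step_logs records the execution details of every step, which gives us the basis for debugging and recovery.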
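
As an illustration of how step_logs can drive recovery, the sketch below derives which steps still need to be undone from the logged entries alone. The type `StepLogEntry` and the function `pendingCompensations` are hypothetical names for this example; the status values are the ones the orchestrator in this article writes (SUCCEEDED, FAILED, COMPENSATED).

```typescript
// One element of the step_logs JSONB array (only the fields used here).
interface StepLogEntry {
  step: string;
  status: 'SUCCEEDED' | 'FAILED' | 'COMPENSATED';
  timestamp: string;
}

// Returns the steps that succeeded but have no COMPENSATED entry yet,
// newest first, which is the order in which they would be undone.
function pendingCompensations(stepLogs: StepLogEntry[]): string[] {
  const compensated = new Set(
    stepLogs.filter((e) => e.status === 'COMPENSATED').map((e) => e.step),
  );
  return stepLogs
    .filter((e) => e.status === 'SUCCEEDED' && !compensated.has(e.step))
    .map((e) => e.step)
    .reverse();
}
```

A recovery job could call this for sagas that were interrupted mid-compensation and resume from the first entry of the returned list. Whether such a job exists is outside what this article implements.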

Next, we define the stages of the Saga in code. Each stage has an execute method and a compensate method.

// src/sagas/contract-creation.saga.ts

import { OracleService } from '../services/oracle.service';
import { ContractService } from '../services/contract.service';

// Payload for this specific saga
interface ContractCreationPayload {
  userId: string;
  amount: number;
  contractId?: string; // Will be populated after the first step
}

// Represents a single step in the saga
interface SagaStep {
  name: string;
  execute: (payload: ContractCreationPayload) => Promise<Partial<ContractCreationPayload>>;
  compensate: (payload: ContractCreationPayload) => Promise<void>;
}

// The complete Saga definition
export const ContractCreationSaga: SagaStep[] = [
  {
    name: 'CREATE_CONTRACT',
    execute: async (payload) => {
      // In a real project, this service would call the microservice API
      const contract = await ContractService.create(payload.userId, payload.amount);
      console.log(`Step [CREATE_CONTRACT] succeeded for user ${payload.userId}.`);
      // Pass the newly created contractId to the next step
      return { contractId: contract.id };
    },
    compensate: async (payload) => {
      if (!payload.contractId) {
        // A common trap: compensation might be called for a step that never completed.
        // The payload might not have the necessary ID. Robust checks are crucial.
        console.warn(`Compensation [CREATE_CONTRACT]: contractId not found, skipping.`);
        return;
      }
      await ContractService.delete(payload.contractId);
      console.log(`Compensation [CREATE_CONTRACT] succeeded for contract ${payload.contractId}.`);
    },
  },
  {
    name: 'DEDUCT_ORACLE_QUOTA',
    execute: async (payload) => {
      // This service encapsulates the 'node-oracledb' logic
      await OracleService.deductQuota(payload.userId, payload.amount);
      console.log(`Step [DEDUCT_ORACLE_QUOTA] succeeded for user ${payload.userId}.`);
      return {}; // No payload change
    },
    compensate: async (payload) => {
      // The compensation logic must be idempotent. 
      // Re-running it should not add the quota twice.
      // This is often handled inside the OracleService by using specific transaction logic.
      await OracleService.addQuota(payload.userId, payload.amount);
      console.log(`Compensation [DEDUCT_ORACLE_QUOTA] succeeded for user ${payload.userId}.`);
    },
  },
];
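
The comment in the DEDUCT_ORACLE_QUOTA compensation asks for idempotency. One common way to get it is to record an idempotency key for each compensation and skip the operation if the key has been seen. The sketch below models that idea entirely in memory; `QuotaStore`, `refundOnce`, and the key format are assumptions for illustration, and in a real system the key set would have to live in the same database transaction as the quota update.

```typescript
// In-memory stand-in for a quota table plus a table of applied compensation keys.
class QuotaStore {
  private quotas = new Map<string, number>();
  private appliedKeys = new Set<string>();

  constructor(initial: Record<string, number>) {
    for (const userId of Object.keys(initial)) {
      this.quotas.set(userId, initial[userId]);
    }
  }

  getQuota(userId: string): number {
    return this.quotas.get(userId) ?? 0;
  }

  // Adds the amount back at most once per key.
  // Returns true if the refund was applied, false if the key was already recorded.
  refundOnce(key: string, userId: string, amount: number): boolean {
    if (this.appliedKeys.has(key)) return false;
    this.quotas.set(userId, this.getQuota(userId) + amount);
    this.appliedKeys.add(key);
    return true;
  }
}

// Usage: derive the key from the saga's correlation id and the step name.
const store = new QuotaStore({ 'test-user-123': 900 });
const key = 'c0ffee:DEDUCT_ORACLE_QUOTA';
store.refundOnce(key, 'test-user-123', 100); // applied
store.refundOnce(key, 'test-user-123', 100); // skipped, same key
```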

2. Build the Saga Orchestrator Gateway

The core of the gateway is a SagaOrchestrator service, which drives the execution of the Saga.

// src/services/saga.orchestrator.ts

import { Pool } from 'pg';
import { ContractCreationSaga } from '../sagas/contract-creation.saga';

const pgPool = new Pool({ /* ... connection config ... */ });

export class SagaOrchestrator {
  
  public async start(sagaName: string, correlationId: string, initialPayload: object) {
    // 1. Create the initial saga log entry
    await pgPool.query(
      'INSERT INTO saga_execution_logs (saga_name, correlation_id, status, payload) VALUES ($1, $2, $3, $4)',
      [sagaName, correlationId, 'PENDING', initialPayload]
    );
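    // Run the saga in the background so the initial request is not blocked.
    // Attach a catch handler: acquiring the connection happens before execute()'s
    // own try block, so a rejection there would otherwise go unhandled.
    this.execute(correlationId).catch((err) => {
      console.error(`Saga ${correlationId} could not be executed:`, err);
    });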
    return { status: 'Saga started', correlationId };
  }

  private async execute(correlationId: string) {
    const client = await pgPool.connect();
    try {
      // Use a transaction to ensure log updates are atomic
      await client.query('BEGIN');

      const { rows } = await client.query(
        'SELECT * FROM saga_execution_logs WHERE correlation_id = $1 FOR UPDATE',
        [correlationId]
      );
      const log = rows[0];
      
      if (!log || log.status !== 'PENDING') {
        // Already processed or in another state
        await client.query('COMMIT');
        // Do not release here: the finally block releases the client exactly once.
        return;
      }

      await client.query(
        'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE id = $2',
        ['EXECUTING', log.id]
      );
      await client.query('COMMIT');

      // The actual execution logic
      const sagaDefinition = ContractCreationSaga; // In a real app, this would be dynamically selected
      let currentPayload = log.payload;

      for (let i = 0; i < sagaDefinition.length; i++) {
        const step = sagaDefinition[i];
        try {
          // Execute the step
          const result = await step.execute(currentPayload);
          currentPayload = { ...currentPayload, ...result };
          
          // Log step success
          await this.logStepState(correlationId, step.name, 'SUCCEEDED', {});

        } catch (error) {
          console.error(`Saga step [${step.name}] failed for ${correlationId}:`, error);
          await this.logStepState(correlationId, step.name, 'FAILED', { message: (error as Error).message });
          
          // If a step fails, trigger compensation (i is the index of the failed step)
          await this.compensate(correlationId, i, currentPayload);
          // Do not release here: the finally block releases the client exactly once.
          return;
        }
      }

      // If all steps succeed
      await pgPool.query(
        'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE correlation_id = $2',
        ['SUCCEEDED', correlationId]
      );

    } catch (e) {
      console.error('Orchestrator internal error:', e);
      // Here you need a retry mechanism or move the saga to a dead-letter queue
      await client.query('ROLLBACK');
    } finally {
      if(client) client.release();
    }
  }

  private async compensate(correlationId: string, failedStepIndex: number, payload: any) {
    await pgPool.query(
      'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE correlation_id = $2',
      ['COMPENSATING', correlationId]
    );

    const sagaDefinition = ContractCreationSaga;
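    // Compensate backwards, starting from the step before the failed one.
    // The failed step never completed, so undoing it would corrupt state
    // (for example, adding back quota that was never deducted).
    for (let i = failedStepIndex - 1; i >= 0; i--) {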
      const step = sagaDefinition[i];
      try {
        await step.compensate(payload);
        await this.logStepState(correlationId, step.name, 'COMPENSATED', {});
      } catch (error) {
        console.error(`Compensation for step [${step.name}] failed for ${correlationId}:`, error);
        // This is a critical failure. The saga is now in a failed state and requires manual intervention.
        await pgPool.query(
          'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE correlation_id = $2',
          ['FAILED', correlationId]
        );
        return;
      }
    }
     console.log(`Saga ${correlationId} successfully compensated.`);
  }

  private async logStepState(correlationId: string, stepName: string, status: string, details: object) {
    const stepLog = {
      step: stepName,
      status: status,
      timestamp: new Date().toISOString(),
      details: details
    };
    await pgPool.query(
      `UPDATE saga_execution_logs 
       SET step_logs = step_logs || $1::jsonb, updated_at = NOW() 
       WHERE correlation_id = $2`,
      [JSON.stringify(stepLog), correlationId]
    );
  }
}

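This orchestrator is the core of the whole system: it handles state transitions, forward execution, and backward compensation. A common mistake is to forget the database lock (FOR UPDATE) in the execute method; under high concurrency that can lead to the same Saga being processed by two instances at once.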
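
The status transitions the orchestrator performs can be written down as an explicit table, which makes illegal moves easy to reject. The sketch below reads the table off the orchestrator code in this article (which, as written, leaves a successfully compensated saga in COMPENSATING). The helper names `canTransition` and `guardedStatusUpdate`, and the extra `AND status = $3` predicate, are assumptions for illustration rather than part of the original implementation.

```typescript
type SagaStatus = 'PENDING' | 'EXECUTING' | 'COMPENSATING' | 'SUCCEEDED' | 'FAILED';

// Transitions performed by the orchestrator; anything else is rejected.
const allowedTransitions: Record<SagaStatus, SagaStatus[]> = {
  PENDING: ['EXECUTING'],
  EXECUTING: ['SUCCEEDED', 'COMPENSATING'],
  COMPENSATING: ['FAILED'],
  SUCCEEDED: [],
  FAILED: [],
};

function canTransition(from: SagaStatus, to: SagaStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

// Builds a { text, values } query that only updates the row if it is still
// in the expected status, so a stale writer updates zero rows.
function guardedStatusUpdate(correlationId: string, from: SagaStatus, to: SagaStatus) {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal saga transition ${from} -> ${to}`);
  }
  return {
    text:
      'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() ' +
      'WHERE correlation_id = $2 AND status = $3',
    values: [to, correlationId, from],
  };
}
```

Checking the affected row count of such an update gives a second line of defence alongside FOR UPDATE: if it is zero, another instance has already moved the saga on.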

3. Resilient Design for Oracle Interaction

The interaction with the legacy Oracle system is the most fragile link. The code here has to be robust.

// src/services/oracle.service.ts
import oracledb from 'oracledb';

// Production configuration requires careful pool management
oracledb.initOracleClient({ libDir: '/opt/oracle/instantclient_21_7' });

const dbConfig = { /* ... user, password, connectString ... */ };
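let poolPromise: Promise<oracledb.Pool> | undefined;

// Cache the promise rather than the pool, so concurrent first calls
// share a single createPool() instead of each creating their own pool.
function getPool(): Promise<oracledb.Pool> {
  if (!poolPromise) {
    poolPromise = oracledb.createPool(dbConfig).catch((err) => {
      poolPromise = undefined; // allow a later call to retry pool creation
      throw err;
    });
  }
  return poolPromise;
}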

export class OracleService {
  static async deductQuota(userId: string, amount: number): Promise<void> {
    const connection = await (await getPool()).getConnection();
    try {
      // The WHERE clause ensures we don't go below zero.
      // The check and the decrement happen in one statement, so concurrent
      // deductions against a shared quota cannot overdraw it.
      const result = await connection.execute(
        `UPDATE USER_QUOTAS SET QUOTA = QUOTA - :amount 
         WHERE USER_ID = :userId AND QUOTA >= :amount`,
        { userId, amount },
        { autoCommit: true } // autoCommit is often discouraged, but for a single statement saga step it can simplify things.
                                // In more complex logic, manual transaction control is a must.
      );

      if (result.rowsAffected === 0) {
        // This is a business logic failure, not a technical one.
        throw new Error(`Insufficient quota for user ${userId} or user not found.`);
      }
    } catch (err) {
      console.error('Oracle deductQuota failed:', err);
      // Re-throw to let the orchestrator catch it
      throw err;
    } finally {
      if (connection) {
        await connection.close();
      }
    }
  }

  static async addQuota(userId: string, amount: number): Promise<void> {
    const connection = await (await getPool()).getConnection();
    try {
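      // Compensation should be safe to re-run, but note that this increment
      // is NOT idempotent on its own: running it twice adds the quota twice.
      // The caller must ensure it runs at most once per saga, or dedupe on a saga key.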
      await connection.execute(
        `UPDATE USER_QUOTAS SET QUOTA = QUOTA + :amount WHERE USER_ID = :userId`,
        { userId, amount },
        { autoCommit: true }
      );
    } catch (err) {
      console.error('Oracle addQuota (compensation) failed:', err);
      // Compensation failure is critical and requires alerting.
      throw err;
    } finally {
      if (connection) {
        await connection.close();
      }
    }
  }
}
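
One resilience measure that often accompanies this kind of service is a bounded retry with exponential backoff for failures that are known to be transient. The helper below is a generic sketch, not part of the original service: `withRetry`, `RetryOptions`, and the `isTransient` predicate are hypothetical, and which Oracle errors count as transient is left to the caller. It should only wrap operations that are safe to repeat; retrying deductQuota after an ambiguous failure (for example, a timeout after the commit went through) could deduct twice.

```typescript
interface RetryOptions {
  attempts: number; // total tries, including the first one
  baseDelayMs: number; // delay before the second try; doubles after each failure
  isTransient: (err: unknown) => boolean;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(op: () => Promise<T>, opts: RetryOptions): Promise<T> {
  if (opts.attempts < 1) {
    throw new RangeError('attempts must be at least 1');
  }
  let lastError: unknown;
  for (let attempt = 0; attempt < opts.attempts; attempt++) {
    try {
      return await op();
    } catch (err) {
      lastError = err;
      const isLastAttempt = attempt === opts.attempts - 1;
      // Give up immediately on non-transient errors or when out of attempts.
      if (isLastAttempt || !opts.isTransient(err)) throw err;
      await sleep(opts.baseDelayMs * 2 ** attempt);
    }
  }
  throw lastError; // unreachable; keeps the return type total
}
```

A business failure such as "insufficient quota" should make `isTransient` return false, so that it reaches the orchestrator and triggers compensation instead of being retried.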

4. End-to-End Consistency Assertions with Cypress

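This is the bridge between architecture and quality assurance. The Cypress tests we write do more than simulate UI clicks; more importantly, they verify the final state of the entire distributed system.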
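
It helps to state the "final state" being verified as a single invariant: the quota remaining in Oracle must equal the initial quota minus the amounts of the contracts that actually exist. The sketch below expresses that as a pure function; `Snapshot` and `isConsistent` are hypothetical names, and the model assumes a single user whose quota is changed only by this Saga.

```typescript
interface Snapshot {
  initialQuota: number;
  oracleQuota: number; // QUOTA as read from USER_QUOTAS
  contractAmounts: number[]; // amounts of the contracts that exist for the user
}

// Invariant: every existing contract is backed by exactly its amount of deducted quota.
function isConsistent(s: Snapshot): boolean {
  const committed = s.contractAmounts.reduce((sum, amount) => sum + amount, 0);
  return s.oracleQuota === s.initialQuota - committed;
}

// The two failure modes from the introduction both violate the invariant:
isConsistent({ initialQuota: 1000, oracleQuota: 1000, contractAmounts: [100] }); // false: contract without deduction
isConsistent({ initialQuota: 1000, oracleQuota: 900, contractAmounts: [] }); // false: quota lost without a contract
```

Both the success path and the compensation path should leave the system in a state where this function returns true.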

First, configure Cypress so that it can talk to the backend databases.

// cypress/plugins/index.js
const { Pool } = require('pg');
const oracledb = require('oracledb');

const pgPool = new Pool({ /* ... */ });
// Oracle pool setup similar to the application

module.exports = (on, config) => {
  on('task', {
    async queryPg(query) {
      const client = await pgPool.connect();
      try {
        const res = await client.query(query);
        return res.rows;
      } finally {
        client.release();
      }
    },
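    async queryOracle(query) {
      // This is crucial for verifying the state of the legacy system.
      // Connection settings are elided here, as in the application config.
      const connection = await oracledb.getConnection({ /* ... user, password, connectString ... */ });
      try {
        const result = await connection.execute(query, [], {
          outFormat: oracledb.OUT_FORMAT_OBJECT, // rows as { COLUMN_NAME: value } objects
          autoCommit: true, // so seed INSERTs are visible to the gateway
        });
        // A Cypress task must not resolve to undefined; DML statements return no rows.
        return result.rows || null;
      } finally {
        await connection.close();
      }
    },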
    async cleanUp(userId) {
       // Helper to reset state between tests
       // ... delete contract from new service DB ...
       // ... reset quota in Oracle ...
       return null;
    }
  });
};
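
Because queryPg passes its argument straight to client.query, it can also receive a `{ text, values }` query object, which pg accepts in place of a SQL string. That keeps test inputs out of the SQL text. The helper below is a small sketch of that idea; `PgQuery` and `sagaStatusQuery` are hypothetical names introduced for this example.

```typescript
// Mirrors the { text, values } query config accepted by pg's client.query().
interface PgQuery {
  text: string;
  values: unknown[];
}

// Builds the status lookup used by the tests, with the id passed as a bind value.
function sagaStatusQuery(correlationId: string): PgQuery {
  return {
    text: 'SELECT status, step_logs FROM saga_execution_logs WHERE correlation_id = $1',
    values: [correlationId],
  };
}

// In a spec this would be used as: cy.task('queryPg', sagaStatusQuery(correlationId))
```

The object is plain JSON, so it satisfies the requirement that cy.task arguments be serializable.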

Now write the test spec. We test two core scenarios: the success path and the failure-compensation path.

// cypress/integration/contract_creation.spec.js
// Note: cy.waitUntil is not built into Cypress; it is provided by the cypress-wait-until plugin.

describe('Contract Creation Saga E2E', () => {
  const userId = 'test-user-123';
  const initialQuota = 1000;
  const deductAmount = 100;

  beforeEach(() => {
    // Reset state before each test run
    cy.task('cleanUp', userId); 
    // Seed initial state in Oracle
    cy.task('queryOracle', `INSERT INTO USER_QUOTAS (USER_ID, QUOTA) VALUES ('${userId}', ${initialQuota})`);
  });

  it('should successfully create a contract and deduct quota from Oracle', () => {
    // Action: Trigger the saga via the gateway's API endpoint
    cy.request('POST', 'http://localhost:3000/api/contracts', {
      userId,
      amount: deductAmount,
    }).then((response) => {
      expect(response.status).to.eq(200);
      const { correlationId } = response.body;
      
      // Assertions: We need to poll until the async saga completes
      cy.waitUntil(() => 
        cy.task('queryPg', `SELECT status FROM saga_execution_logs WHERE correlation_id = '${correlationId}'`)
          .then(rows => rows[0]?.status === 'SUCCEEDED'),
        { timeout: 10000, interval: 500 }
      );

      // Verify final state in the new service's DB
      cy.task('queryPg', `SELECT * FROM contracts WHERE user_id = '${userId}'`)
        .then(contracts => {
          expect(contracts).to.have.lengthOf(1);
          expect(contracts[0].amount).to.eq(deductAmount);
        });

      // **CRITICAL ASSERTION**: Verify final state in Oracle
      cy.task('queryOracle', `SELECT QUOTA FROM USER_QUOTAS WHERE USER_ID = '${userId}'`)
        .then(results => {
          expect(results[0].QUOTA).to.eq(initialQuota - deductAmount);
        });
    });
  });

  it('should trigger compensation and restore Oracle quota when a later step fails', () => {
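    // Setup: simulate a failure of a hypothetical "notification" service,
    // which would be the third step in the saga.
    // Caveat: cy.intercept only stubs requests issued by the browser. It does not
    // affect cy.request or server-to-server calls, so if the gateway calls the
    // notification service itself, the failure must be injected on the backend instead.
    cy.intercept('POST', '/api/notifications', { statusCode: 500 }).as('failNotification');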

    // Action
    cy.request({
      method: 'POST',
      url: 'http://localhost:3000/api/contracts',
      body: { userId, amount: deductAmount },
      failOnStatusCode: false // Prevent Cypress from failing the test on a non-2xx response
    }).then((response) => {
      const { correlationId } = response.body;

      // Assertions: Poll until the saga is fully compensated or failed
      cy.waitUntil(() =>
        cy.task('queryPg', `SELECT status, step_logs FROM saga_execution_logs WHERE correlation_id = '${correlationId}'`)
          .then(rows => {
            const status = rows[0]?.status;
            const logs = rows[0]?.step_logs || []; // the row may not be visible yet
            // Wait for every completed step to be compensated, so the state
            // assertions below do not race against the remaining compensations.
            const isCompensated = (name) =>
              logs.some(log => log.step === name && log.status === 'COMPENSATED');
            return (status === 'COMPENSATING' || status === 'FAILED')
              && isCompensated('DEDUCT_ORACLE_QUOTA')
              && isCompensated('CREATE_CONTRACT');
          }),
        { timeout: 10000, interval: 500 }
      );

      // Verify the contract was deleted (compensation for the first step)
      cy.task('queryPg', `SELECT * FROM contracts WHERE user_id = '${userId}'`)
        .then(contracts => {
          expect(contracts).to.have.lengthOf(0);
        });

      // **CRITICAL ASSERTION**: Verify Oracle quota was restored
      cy.task('queryOracle', `SELECT QUOTA FROM USER_QUOTAS WHERE USER_ID = '${userId}'`)
        .then(results => {
          expect(results[0].QUOTA).to.eq(initialQuota);
        });
    });
  });
});
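
The polling in both tests follows a simple pattern: check a condition, sleep, and give up after a deadline. For readers who want the same behaviour outside Cypress (for example in a smoke-test script), the following sketch shows the pattern in plain TypeScript. `pollUntil` is a hypothetical helper, not a Cypress or plugin API, and is not a drop-in replacement for cy.waitUntil.

```typescript
// Repeatedly evaluates `check` until it resolves to true or the timeout elapses.
async function pollUntil(
  check: () => Promise<boolean>,
  opts: { timeoutMs: number; intervalMs: number },
): Promise<void> {
  const deadline = Date.now() + opts.timeoutMs;
  while (true) {
    if (await check()) return;
    if (Date.now() >= deadline) {
      throw new Error(`Condition not met within ${opts.timeoutMs} ms`);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, opts.intervalMs));
  }
}
```

The condition is always evaluated at least once, and the timeout is only checked between attempts, so a slow `check` can overrun the deadline by one call.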

Final Result and Architecture Diagram

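With this flow in place, we have built a standalone, reusable Saga orchestration gateway. It decouples the complex distributed-transaction logic from the core business services. When a new cross-system transaction requirement appears, we only need to define a new Saga flow, without changing the code of existing services.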

sequenceDiagram
    participant Client
    participant Saga Gateway
    participant Contract Service
    participant Oracle DB
    participant Saga Log (Postgres)

    Client->>+Saga Gateway: POST /api/contracts (userId, amount)
    Saga Gateway->>+Saga Log (Postgres): CREATE log (status: PENDING)
    Saga Gateway-->>-Client: 200 OK (correlationId)

    Note right of Saga Gateway: Async Execution Starts
    Saga Gateway->>Saga Log (Postgres): UPDATE log (status: EXECUTING)
    
    Saga Gateway->>+Contract Service: CreateContract(userId, amount)
    Contract Service-->>-Saga Gateway: contractId
    Saga Gateway->>Saga Log (Postgres): LOG step 'CREATE_CONTRACT' SUCCEEDED

    Saga Gateway->>+Oracle DB: UPDATE USER_QUOTAS SET QUOTA = ...
    Oracle DB-->>-Saga Gateway: OK
    Saga Gateway->>Saga Log (Postgres): LOG step 'DEDUCT_ORACLE_QUOTA' SUCCEEDED

    Saga Gateway->>Saga Log (Postgres): UPDATE log (status: SUCCEEDED)

    %% ---- Compensation Path ---- %%
    alt Failure during Oracle deduct
        Client->>+Saga Gateway: POST /api/contracts
        Saga Gateway->>+Saga Log (Postgres): CREATE log (status: PENDING)
        Saga Gateway-->>-Client: 200 OK

        Saga Gateway->>+Contract Service: CreateContract(...)
        Contract Service-->>-Saga Gateway: contractId
        Saga Gateway->>Saga Log (Postgres): LOG step 'CREATE_CONTRACT' SUCCEEDED

        Saga Gateway->>+Oracle DB: UPDATE USER_QUOTAS...
        Oracle DB-->>-Saga Gateway: Fails (e.g., lock timeout)
        Saga Gateway->>Saga Log (Postgres): LOG step 'DEDUCT_ORACLE_QUOTA' FAILED
        Saga Gateway->>Saga Log (Postgres): UPDATE log (status: COMPENSATING)

        Note right of Saga Gateway: Compensation logic triggered
        Saga Gateway->>+Contract Service: DeleteContract(contractId)
        Contract Service-->>-Saga Gateway: OK
        Saga Gateway->>Saga Log (Postgres): LOG step 'CREATE_CONTRACT' COMPENSATED
    end

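The current implementation is not without limitations. First, the Saga orchestration gateway is itself a single point of failure; in production it needs to be deployed as a highly available cluster and rely on PostgreSQL's locking to ensure that only one instance processes a given Saga at a time. Second, a failed compensation (for example, addQuota also fails) leaves the transaction in a "poison pill" state that requires manual intervention, so monitoring and alerting must be in place to catch such events. Finally, as the number of Saga flows grows, dynamically loading and managing Saga definitions becomes a new challenge and may call for a more elaborate definition store and versioning mechanism. Future iterations will focus on improving the gateway's resilience and observability.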
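
As a starting point for the definition-management problem mentioned above, a registry keyed by saga name and version is one possible shape. The sketch below is speculative: `SagaRegistry` is a hypothetical class, and it assumes the version a saga started with would be stored alongside saga_name in the log row (a column the current DDL does not have), so that in-flight sagas keep running against the definition they began with.

```typescript
// Generic over the step type so it can hold the SagaStep arrays defined earlier.
class SagaRegistry<S> {
  private definitions = new Map<string, S[]>();

  private static key(name: string, version: number): string {
    return `${name}@${version}`;
  }

  // Registers an immutable definition; re-registering the same version is an error.
  register(name: string, version: number, steps: S[]): void {
    const key = SagaRegistry.key(name, version);
    if (this.definitions.has(key)) {
      throw new Error(`Saga ${key} is already registered`);
    }
    this.definitions.set(key, steps.slice());
  }

  // Looks up the exact version a saga instance was started with.
  resolve(name: string, version: number): S[] {
    const steps = this.definitions.get(SagaRegistry.key(name, version));
    if (!steps) {
      throw new Error(`Unknown saga ${SagaRegistry.key(name, version)}`);
    }
    return steps;
  }
}
```

With something like this in place, the hard-coded `ContractCreationSaga` reference in the orchestrator could become a lookup by the name and version recorded in the log.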

