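We face a thorny reality: a giant Oracle database that carries the core business, and a new microservice built on Node.js. The business requires an operation spanning both systems to be atomic: either everything succeeds, or everything is rolled back to the initial state. Concretely, the new microservice creates a "contract" record, and then a "quota" in a legacy Oracle table must be deducted to match. Two-phase commit (2PC) was ruled out from the start because of its vulnerability to network partitions and how invasive it would be for the legacy system.
The risk of inconsistent data is obvious. If the contract is created but the Oracle quota deduction fails, we have conjured up a contract that can never be fulfilled, which is a production incident. Conversely, if the quota is deducted first and contract creation then fails, we lose business quota that should have remained available.
Our initial idea was to introduce a lightweight orchestration gateway dedicated to this kind of cross-system transaction. The gateway acts as the Orchestrator in the Saga pattern: it drives the whole flow, handles failures, and runs the compensating actions. This avoids having the services talk to each other over an event bus (a Choreography-based Saga), which was not an option anyway, because the massive Oracle system cannot be modified to emit or consume events. A central coordinator was the only pragmatic choice. Note that a Saga does not provide ACID atomicity: intermediate states are visible to other readers, and the "all or nothing" outcome is reached eventually, through compensation.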
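To make the orchestrator's contract concrete before any database is involved, here is a minimal in-memory sketch of orchestrated saga semantics. It is an illustration under simplifying assumptions (untyped payload, no persistence, no retries), not the gateway's code: steps run in order, each step's result is merged into the payload, and on the first failure only the steps that already completed are compensated, newest first.

```typescript
type Payload = Record<string, unknown>;

interface Step {
  name: string;
  execute: (payload: Payload) => Promise<Payload>; // returns fields to merge into the payload
  compensate: (payload: Payload) => Promise<void>;
}

type Outcome =
  | { status: 'SUCCEEDED'; payload: Payload }
  | { status: 'COMPENSATED'; failedStep: string; compensated: string[] };

async function runSaga(steps: Step[], initial: Payload): Promise<Outcome> {
  let payload: Payload = { ...initial };
  const completed: Step[] = [];
  for (const step of steps) {
    try {
      payload = { ...payload, ...(await step.execute(payload)) };
      completed.push(step);
    } catch {
      // Undo only what actually happened, newest first; the failed step is skipped.
      // A compensation that throws propagates to the caller (the real gateway marks the saga FAILED).
      const compensated: string[] = [];
      for (const done of [...completed].reverse()) {
        await done.compensate(payload);
        compensated.push(done.name);
      }
      return { status: 'COMPENSATED', failedStep: step.name, compensated };
    }
  }
  return { status: 'SUCCEEDED', payload };
}
```

For example, if the second of two steps throws, runSaga resolves to a COMPENSATED outcome whose compensated list contains only the first step's name.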
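The technology choices were as follows:
- Saga orchestration gateway: Node.js + TypeScript + Fastify. Node.js was chosen because its non-blocking I/O model suits orchestration work, which consists mostly of network calls with little computation. Fastify is known for its high performance and low overhead.
- Saga log store: a dedicated PostgreSQL database. We need a reliable place to persist the state of every step of each saga. During failure recovery, the orchestrator can read the log to learn which steps have completed and which compensations must run. We chose PostgreSQL over a log table inside Oracle to keep concerns separate and avoid putting extra load on the core database.
- Oracle interaction: the official node-oracledb driver, the most direct and reliable way to talk to Oracle.
- Consistency verification: end-to-end (E2E) tests with Cypress. This is a key decision. Unit and integration tests cannot cover the whole distributed flow, especially when simulating network failures or service outages. We need a tool that exercises the system from the outside the way a client would, and that can also reach down to the database layer to assert on state. Cypress's cy.task, which runs arbitrary Node.js code (including database queries) on behalf of a test, makes it a good fit for verifying eventual consistency. cy.intercept helps with stubbing requests that the browser makes, but it cannot intercept the gateway's server-to-server calls (nor requests sent with cy.request), so backend failures have to be provoked by other means, such as test data.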
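Step-by-Step Implementation: From Saga Definition to Cypress Assertions
1. Defining the Saga Flow and the Log Schema
First, we create the saga log table in PostgreSQL. This table is the backbone of the entire compensation mechanism.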
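-- DDL for Saga Log in PostgreSQL
-- gen_random_uuid() is built in since PostgreSQL 13; older versions need the pgcrypto extension.
CREATE TABLE "saga_execution_logs" (
"id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"saga_name" VARCHAR(255) NOT NULL,
"correlation_id" UUID NOT NULL,
"current_step" INT NOT NULL DEFAULT 0,
"status" VARCHAR(50) NOT NULL, -- PENDING, EXECUTING, COMPENSATING, SUCCEEDED, FAILED
"payload" JSONB NOT NULL,
-- NOT NULL matters: step_logs || <entry> evaluates to NULL when step_logs is NULL, silently dropping every entry
"step_logs" JSONB NOT NULL DEFAULT '[]'::jsonb,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- UNIQUE: correlation_id identifies exactly one business operation, and the orchestrator reads rows[0]
CREATE UNIQUE INDEX idx_saga_correlation_id ON "saga_execution_logs"("correlation_id");
CREATE INDEX idx_saga_status ON "saga_execution_logs"("status");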
-- step_logs column will store an array of objects like:
-- { "step": "CREATE_CONTRACT", "status": "SUCCEEDED", "timestamp": "..." }
-- { "step": "DEDUCT_ORACLE_QUOTA", "status": "FAILED", "error": "...", "timestamp": "..." }
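The correlation_id is essential: it uniquely identifies one complete business operation. step_logs records the details of every step, which is what debugging and recovery rely on.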
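Because step_logs is an append-only list, it is enough to reconstruct what still needs undoing after a crash. The sketch below is hypothetical (StepLogEntry and stepsPendingCompensation are not part of the gateway code that follows): it derives, from one saga's step_logs, the steps that succeeded but have not been compensated yet, in the reverse order in which a recovering orchestrator would compensate them.

```typescript
// One element of step_logs. Field names follow the DDL comment example; the
// orchestrator's logStepState also writes a free-form `details` object.
interface StepLogEntry {
  step: string;
  status: 'SUCCEEDED' | 'FAILED' | 'COMPENSATED';
  timestamp: string;
  error?: string;
  details?: unknown;
}

// Hypothetical recovery helper: steps that succeeded and were not compensated,
// newest first, i.e. the order in which they must be undone.
function stepsPendingCompensation(logs: StepLogEntry[]): string[] {
  const compensated = new Set(
    logs.filter((e) => e.status === 'COMPENSATED').map((e) => e.step),
  );
  return logs
    .filter((e) => e.status === 'SUCCEEDED' && !compensated.has(e.step))
    .map((e) => e.step)
    .reverse();
}
```

A set of compensated step names suffices here because each step runs at most once per saga; a design that retries steps would need per-attempt identifiers in the log.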
Next, we define each stage of the saga in code. Every stage has an execute method (the forward action) and a compensate method (the compensating action).
// src/sagas/contract-creation.saga.ts
import { OracleService } from '../services/oracle.service';
import { ContractService } from '../services/contract.service';
// Payload for this specific saga
interface ContractCreationPayload {
userId: string;
amount: number;
contractId?: string; // Will be populated after the first step
}
// Represents a single step in the saga
interface SagaStep {
name: string;
execute: (payload: ContractCreationPayload) => Promise<Partial<ContractCreationPayload>>;
compensate: (payload: ContractCreationPayload) => Promise<void>;
}
// The complete Saga definition
export const ContractCreationSaga: SagaStep[] = [
{
name: 'CREATE_CONTRACT',
execute: async (payload) => {
// In a real project, this service would call the microservice API
const contract = await ContractService.create(payload.userId, payload.amount);
console.log(`Step [CREATE_CONTRACT] succeeded for user ${payload.userId}.`);
// Pass the newly created contractId to the next step
return { contractId: contract.id };
},
compensate: async (payload) => {
if (!payload.contractId) {
// A common trap: compensation might be called for a step that never completed.
// The payload might not have the necessary ID. Robust checks are crucial.
console.warn(`Compensation [CREATE_CONTRACT]: contractId not found, skipping.`);
return;
}
await ContractService.delete(payload.contractId);
console.log(`Compensation [CREATE_CONTRACT] succeeded for contract ${payload.contractId}.`);
},
},
{
name: 'DEDUCT_ORACLE_QUOTA',
execute: async (payload) => {
// This service encapsulates the 'node-oracledb' logic
await OracleService.deductQuota(payload.userId, payload.amount);
console.log(`Step [DEDUCT_ORACLE_QUOTA] succeeded for user ${payload.userId}.`);
return {}; // No payload change
},
compensate: async (payload) => {
// The compensation logic should be idempotent: re-running it must not add the quota twice.
// Note that OracleService.addQuota (below) is a plain increment and is not idempotent by itself;
// safe retries need extra bookkeeping, such as a record of compensations already applied.
await OracleService.addQuota(payload.userId, payload.amount);
console.log(`Compensation [DEDUCT_ORACLE_QUOTA] succeeded for user ${payload.userId}.`);
},
},
];
2. Building the Saga Orchestrator Gateway
At the heart of the gateway is a SagaOrchestrator service, which drives saga execution.
// src/services/saga.orchestrator.ts
import { Pool } from 'pg';
import { ContractCreationSaga } from '../sagas/contract-creation.saga';
const pgPool = new Pool({ /* ... connection config ... */ });
export class SagaOrchestrator {
public async start(sagaName: string, correlationId: string, initialPayload: object) {
// 1. Create the initial saga log entry
await pgPool.query(
'INSERT INTO saga_execution_logs (saga_name, correlation_id, status, payload) VALUES ($1, $2, $3, $4)',
[sagaName, correlationId, 'PENDING', initialPayload]
);
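// Execute the saga asynchronously so the initial request is not blocked.
// The promise is not awaited, so catch here to avoid an unhandled rejection.
this.execute(correlationId).catch((err) => {
console.error(`Saga ${correlationId} failed to run:`, err);
});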
return { status: 'Saga started', correlationId };
}
private async execute(correlationId: string) {
const client = await pgPool.connect();
try {
// Use a transaction to ensure log updates are atomic
await client.query('BEGIN');
const { rows } = await client.query(
'SELECT * FROM saga_execution_logs WHERE correlation_id = $1 FOR UPDATE',
[correlationId]
);
const log = rows[0];
if (!log || log.status !== 'PENDING') {
// Already claimed by another instance or in a later state
await client.query('COMMIT');
return; // the finally block releases the client; releasing here too would release it twice
}
await client.query(
'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE id = $2',
['EXECUTING', log.id]
);
await client.query('COMMIT');
// The actual execution logic
const sagaDefinition = ContractCreationSaga; // In a real app, this would be dynamically selected
let currentPayload = log.payload;
for (let i = 0; i < sagaDefinition.length; i++) {
const step = sagaDefinition[i];
try {
// Execute the step
const result = await step.execute(currentPayload);
currentPayload = { ...currentPayload, ...result };
// Log step success
await this.logStepState(correlationId, step.name, 'SUCCEEDED', {});
} catch (error) {
console.error(`Saga step [${step.name}] failed for ${correlationId}:`, error);
await this.logStepState(correlationId, step.name, 'FAILED', { message: (error as Error).message });
// If a step fails, hand over to compensation
await this.compensate(correlationId, i, currentPayload);
return; // the finally block releases the client; releasing here too would release it twice
}
}
// If all steps succeed
await pgPool.query(
'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE correlation_id = $2',
['SUCCEEDED', correlationId]
);
} catch (e) {
console.error('Orchestrator internal error:', e);
// Here you need a retry mechanism or move the saga to a dead-letter queue
await client.query('ROLLBACK');
} finally {
if(client) client.release();
}
}
private async compensate(correlationId: string, failedStepIndex: number, payload: any) {
await pgPool.query(
'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE correlation_id = $2',
['COMPENSATING', correlationId]
);
const sagaDefinition = ContractCreationSaga;
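// Compensate backwards, starting with the last step that completed. The failed step itself
// is skipped: a DEDUCT_ORACLE_QUOTA rejected for insufficient quota deducted nothing, so
// running addQuota for it would create quota out of thin air. (A step that fails ambiguously,
// e.g. a timeout after Oracle committed, needs idempotent compensation or reconciliation.)
for (let i = failedStepIndex - 1; i >= 0; i--) {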
const step = sagaDefinition[i];
try {
await step.compensate(payload);
await this.logStepState(correlationId, step.name, 'COMPENSATED', {});
} catch (error) {
console.error(`Compensation for step [${step.name}] failed for ${correlationId}:`, error);
// This is a critical failure. The saga is now in a failed state and requires manual intervention.
await pgPool.query(
'UPDATE saga_execution_logs SET status = $1, updated_at = NOW() WHERE correlation_id = $2',
['FAILED', correlationId]
);
return;
}
}
console.log(`Saga ${correlationId} successfully compensated.`);
}
private async logStepState(correlationId: string, stepName: string, status: string, details: object) {
const stepLog = {
step: stepName,
status: status,
timestamp: new Date().toISOString(),
details: details
};
await pgPool.query(
`UPDATE saga_execution_logs
SET step_logs = step_logs || $1::jsonb, updated_at = NOW()
WHERE correlation_id = $2`,
[JSON.stringify(stepLog), correlationId]
);
}
}
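This orchestrator is the core of the whole system: it handles state transitions, forward execution, and backward compensation. A common mistake is to leave out the row lock (FOR UPDATE) in execute; under high concurrency, the same saga can then be picked up by two instances at once. Taking the lock, checking for PENDING, and switching to EXECUTING inside one transaction ensures that exactly one instance claims each saga.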
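An alternative worth knowing, sketched here under the assumption that the same saga_execution_logs table is used, folds the lock, the PENDING check, and the switch to EXECUTING into a single conditional UPDATE ... RETURNING. Under PostgreSQL's default READ COMMITTED isolation, a second instance racing on the same row waits for the first to commit, re-evaluates the WHERE clause against the updated row, and matches nothing. Queryable and claimSaga are illustrative names; node-postgres' Pool#query should satisfy the interface structurally.

```typescript
// Minimal structural interface; node-postgres' Pool and PoolClient expose a
// compatible query(text, values) method whose result carries `rows`.
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rows: any[] }>;
}

// Atomically move a saga from PENDING to EXECUTING. Returns the claimed row,
// or null if another instance got there first or the saga is already past PENDING.
async function claimSaga(db: Queryable, correlationId: string): Promise<Record<string, unknown> | null> {
  const res = await db.query(
    `UPDATE saga_execution_logs
        SET status = 'EXECUTING', updated_at = NOW()
      WHERE correlation_id = $1 AND status = 'PENDING'
      RETURNING *`,
    [correlationId],
  );
  return res.rows[0] ?? null;
}
```

If claimSaga returns null, the caller simply returns. No explicit BEGIN/COMMIT or dedicated client is needed, because the single statement is atomic on its own.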
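3. Designing Resilient Oracle Interaction
Interaction with the legacy Oracle system is the most fragile link in the chain, so this code has to be robust.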
// src/services/oracle.service.ts
import oracledb from 'oracledb';
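// Production configuration requires careful pool management.
// initOracleClient() switches node-oracledb to Thick mode (Oracle Instant Client); since
// node-oracledb 6, the default Thin mode needs no client libraries and this call is optional.
oracledb.initOracleClient({ libDir: '/opt/oracle/instantclient_21_7' });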
const dbConfig = { /* ... user, password, connectString ... */ };
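// Cache the promise, not the pool: concurrent first calls then share a single
// createPool() call instead of each creating (and leaking) its own pool.
let poolPromise: Promise<oracledb.Pool> | undefined;
function getPool(): Promise<oracledb.Pool> {
if (!poolPromise) {
poolPromise = oracledb.createPool(dbConfig);
}
return poolPromise;
}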
export class OracleService {
static async deductQuota(userId: string, amount: number): Promise<void> {
const connection = await (await getPool()).getConnection();
try {
// The guarded WHERE clause makes the balance check and the decrement one atomic statement,
// so concurrent deductions cannot push the quota below zero (no separate read, no lost update).
const result = await connection.execute(
`UPDATE USER_QUOTAS SET QUOTA = QUOTA - :amount
WHERE USER_ID = :userId AND QUOTA >= :amount`,
{ userId, amount },
{ autoCommit: true } // autoCommit is often discouraged, but for a single statement saga step it can simplify things.
// In more complex logic, manual transaction control is a must.
);
if (result.rowsAffected === 0) {
// This is a business logic failure, not a technical one.
throw new Error(`Insufficient quota for user ${userId} or user not found.`);
}
} catch (err) {
console.error('Oracle deductQuota failed:', err);
// Re-throw to let the orchestrator catch it
throw err;
} finally {
if (connection) {
await connection.close();
}
}
}
static async addQuota(userId: string, amount: number): Promise<void> {
const connection = await (await getPool()).getConnection();
try {
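// Compensation may be retried, so it should be idempotent, but this increment is NOT:
// running it twice restores the quota twice. Safe retries need a record of applied compensations.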
await connection.execute(
`UPDATE USER_QUOTAS SET QUOTA = QUOTA + :amount WHERE USER_ID = :userId`,
{ userId, amount },
{ autoCommit: true }
);
} catch (err) {
console.error('Oracle addQuota (compensation) failed:', err);
// Compensation failure is critical and requires alerting.
throw err;
} finally {
if (connection) {
await connection.close();
}
}
}
}
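As written, addQuota is a plain increment, so a retried compensation would restore the quota twice. One common remedy, shown here as an in-memory model rather than Oracle code, is to record each applied compensation under an idempotency key (for example, the saga's correlation_id plus the step name) and skip the refund when the key is already present. In Oracle, the applied set would be a table with a unique key (a hypothetical SAGA_COMPENSATIONS), inserted in the same transaction as the UPDATE (autoCommit: false, then connection.commit()) so the marker and the refund commit together. The saga's compensate functions currently receive only the payload, so the correlation_id would have to be added to it; QuotaStore and addQuotaOnce are illustrative names.

```typescript
// In-memory model of an idempotent compensation. In a real system the `applied`
// set is a table with a unique key, written in the same transaction as the quota
// update, so that both commit together or not at all.
class QuotaStore {
  private quotas = new Map<string, number>();
  private applied = new Set<string>(); // idempotency keys of compensations already applied

  constructor(initial: Record<string, number>) {
    for (const [user, q] of Object.entries(initial)) this.quotas.set(user, q);
  }

  quotaOf(userId: string): number | undefined {
    return this.quotas.get(userId);
  }

  // Returns true if the refund was applied now, false if it had been applied before.
  addQuotaOnce(idempotencyKey: string, userId: string, amount: number): boolean {
    if (this.applied.has(idempotencyKey)) return false;
    const current = this.quotas.get(userId);
    if (current === undefined) throw new Error(`user ${userId} not found`);
    this.quotas.set(userId, current + amount);
    this.applied.add(idempotencyKey);
    return true;
  }
}
```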
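4. End-to-End Consistency Assertions with Cypress
This is the bridge between the architecture and quality assurance. The Cypress tests we write do more than simulate UI clicks; above all, they verify the final state of the entire distributed system.
First, configure Cypress so it can talk to the backend databases. The file below uses the plugins file of Cypress versions before 10; in Cypress 10 and later, the same on('task', ...) registration goes into setupNodeEvents in cypress.config.js.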
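// cypress/plugins/index.js
const { Pool } = require('pg');
const oracledb = require('oracledb');
const pgPool = new Pool({ /* ... */ });
// Oracle pool setup similar to the application (created lazily and shared)
const oracleConfig = { /* ... user, password, connectString ... */ };
let oraclePoolPromise;
function getOraclePool() {
if (!oraclePoolPromise) oraclePoolPromise = oracledb.createPool(oracleConfig);
return oraclePoolPromise;
}
// Runs one statement against Oracle. Cypress tasks must resolve to a value or null, never
// undefined, so DML (which returns no rows) resolves to its affected-row count instead.
async function runOracle(sql, binds = {}) {
const connection = await (await getOraclePool()).getConnection();
try {
const result = await connection.execute(sql, binds, {
outFormat: oracledb.OUT_FORMAT_OBJECT, // rows as { COLUMN_NAME: value }, e.g. results[0].QUOTA
autoCommit: true // seed and cleanup writes must be visible to the gateway immediately
});
return result.rows ?? { rowsAffected: result.rowsAffected ?? 0 };
} finally {
await connection.close();
}
}
module.exports = (on, config) => {
on('task', {
async queryPg(query) {
const client = await pgPool.connect();
try {
const res = await client.query(query);
return res.rows;
} finally {
client.release();
}
},
queryOracle(query) {
// Crucial for verifying (and seeding) the state of the legacy system
return runOracle(query);
},
async cleanUp(userId) {
// Reset state between tests. Assumes the contracts table is reachable through pgPool,
// as the specs' queryPg assertions on contracts imply.
await pgPool.query('DELETE FROM contracts WHERE user_id = $1', [userId]);
// beforeEach re-inserts the quota row, so remove it here
await runOracle('DELETE FROM USER_QUOTAS WHERE USER_ID = :userId', { userId });
return null;
}
});
};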
Now for the test specs. We cover the two core scenarios: the success path and the compensation path after a failure.
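// cypress/integration/contract_creation.spec.js
// cy.waitUntil comes from the third-party cypress-wait-until plugin (import 'cypress-wait-until' in the support file)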
describe('Contract Creation Saga E2E', () => {
const userId = 'test-user-123';
const initialQuota = 1000;
const deductAmount = 100;
beforeEach(() => {
// Reset state before each test run
cy.task('cleanUp', userId);
// Seed initial state in Oracle
cy.task('queryOracle', `INSERT INTO USER_QUOTAS (USER_ID, QUOTA) VALUES ('${userId}', ${initialQuota})`);
});
it('should successfully create a contract and deduct quota from Oracle', () => {
// Action: Trigger the saga via the gateway's API endpoint
cy.request('POST', 'http://localhost:3000/api/contracts', {
userId,
amount: deductAmount,
}).then((response) => {
expect(response.status).to.eq(200);
const { correlationId } = response.body;
// Assertions: We need to poll until the async saga completes
cy.waitUntil(() =>
cy.task('queryPg', `SELECT status FROM saga_execution_logs WHERE correlation_id = '${correlationId}'`)
.then(rows => rows[0]?.status === 'SUCCEEDED'),
{ timeout: 10000, interval: 500 }
);
// Verify final state in the new service's DB
cy.task('queryPg', `SELECT * FROM contracts WHERE user_id = '${userId}'`)
.then(contracts => {
expect(contracts).to.have.lengthOf(1);
// node-postgres returns NUMERIC columns as strings, so normalize before comparing
expect(Number(contracts[0].amount)).to.eq(deductAmount);
});
// **CRITICAL ASSERTION**: Verify final state in Oracle
cy.task('queryOracle', `SELECT QUOTA FROM USER_QUOTAS WHERE USER_ID = '${userId}'`)
.then(results => {
expect(results[0].QUOTA).to.eq(initialQuota - deductAmount);
});
});
});
it('should compensate the created contract when the Oracle quota deduction fails', () => {
// Setup: cy.intercept cannot fail the gateway's server-side calls, so the failure is provoked
// with data instead: requesting more than the seeded quota makes the guarded UPDATE in
// deductQuota match no row, and the DEDUCT_ORACLE_QUOTA step throws.
const excessiveAmount = initialQuota + 1;
// Action
cy.request({
method: 'POST',
url: 'http://localhost:3000/api/contracts',
body: { userId, amount: excessiveAmount },
failOnStatusCode: false // Prevent Cypress from failing the test on a non-2xx response
}).then((response) => {
const { correlationId } = response.body;
// Assertions: poll until compensation has finished. The orchestrator leaves a fully
// compensated saga in COMPENSATING; FAILED would mean a compensation itself failed.
cy.waitUntil(() =>
cy.task('queryPg', `SELECT status, step_logs FROM saga_execution_logs WHERE correlation_id = '${correlationId}'`)
.then(rows => {
const status = rows[0]?.status;
const logs = rows[0]?.step_logs ?? [];
// The contract step must have been compensated
const compensatedStep = logs.find(log => log.step === 'CREATE_CONTRACT' && log.status === 'COMPENSATED');
return status === 'COMPENSATING' && Boolean(compensatedStep);
}),
{ timeout: 10000, interval: 500 }
);
// Verify the contract was deleted (compensation for the first step)
cy.task('queryPg', `SELECT * FROM contracts WHERE user_id = '${userId}'`)
.then(contracts => {
expect(contracts).to.have.lengthOf(0);
});
// **CRITICAL ASSERTION**: Oracle quota is exactly as seeded: nothing deducted, nothing refunded
cy.task('queryOracle', `SELECT QUOTA FROM USER_QUOTAS WHERE USER_ID = '${userId}'`)
.then(results => {
expect(results[0].QUOTA).to.eq(initialQuota);
});
});
});
});
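The polling that cy.waitUntil performs in these specs is also handy outside Cypress, for example in a Node integration test or an operations script that waits for a saga to settle. pollUntil below is a hypothetical helper, not part of any library used here: it calls an async check until the check returns a value other than undefined, and rejects once the timeout elapses.

```typescript
// Hypothetical helper: poll `check` until it yields a value other than undefined,
// waiting intervalMs between attempts; rejects once timeoutMs has elapsed.
async function pollUntil<T>(
  check: () => Promise<T | undefined>,
  { timeoutMs, intervalMs }: { timeoutMs: number; intervalMs: number },
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const value = await check();
    if (value !== undefined) return value;
    if (Date.now() >= deadline) {
      throw new Error(`pollUntil timed out after ${timeoutMs} ms`);
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

In a Node test, check would run SELECT status FROM saga_execution_logs for the correlation_id and return the status once it is SUCCEEDED or COMPENSATING, and undefined otherwise.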
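Final Result and Architecture Diagram
With this flow, we built a standalone, reusable Saga orchestration gateway that decouples complex distributed-transaction logic from the core business services. When a new cross-system transaction requirement comes up, we only need to define a new saga flow, without touching the code of existing services, as long as they already expose the forward and compensating operations the new saga needs.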
sequenceDiagram
participant Client
participant GW as Saga Gateway
participant CS as Contract Service
participant ORA as Oracle DB
participant LOG as Saga Log (Postgres)
Client->>+GW: POST /api/contracts (userId, amount)
GW->>LOG: CREATE log (status: PENDING)
GW-->>-Client: 200 OK (correlationId)
Note right of GW: Async Execution Starts
GW->>LOG: UPDATE log (status: EXECUTING)
GW->>+CS: CreateContract(userId, amount)
CS-->>-GW: contractId
GW->>LOG: LOG step 'CREATE_CONTRACT' SUCCEEDED
GW->>+ORA: UPDATE USER_QUOTAS SET QUOTA = ...
ORA-->>-GW: OK
GW->>LOG: LOG step 'DEDUCT_ORACLE_QUOTA' SUCCEEDED
GW->>LOG: UPDATE log (status: SUCCEEDED)
%% ---- Compensation Path ---- %%
alt Failure during Oracle deduct
Client->>+GW: POST /api/contracts
GW->>LOG: CREATE log (status: PENDING)
GW-->>-Client: 200 OK
GW->>+CS: CreateContract(...)
CS-->>-GW: contractId
GW->>LOG: LOG step 'CREATE_CONTRACT' SUCCEEDED
GW->>+ORA: UPDATE USER_QUOTAS...
ORA-->>-GW: Fails (e.g., lock timeout)
GW->>LOG: LOG step 'DEDUCT_ORACLE_QUOTA' FAILED
GW->>LOG: UPDATE log (status: COMPENSATING)
Note right of GW: Compensation logic triggered
GW->>+CS: DeleteContract(contractId)
CS-->>-GW: OK
GW->>LOG: LOG step 'CREATE_CONTRACT' COMPENSATED
end
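The orchestrator above hard-codes ContractCreationSaga and notes that a real app would select the definition dynamically. A minimal sketch of that selection, assuming definitions are registered in code at startup and looked up by the saga_name column, could look like this; SagaRegistry and its methods are hypothetical. Rejecting duplicate step names matters because step_logs identifies steps by name.

```typescript
interface SagaStep<P> {
  name: string;
  execute: (payload: P) => Promise<Partial<P>>;
  compensate: (payload: P) => Promise<void>;
}

// Hypothetical registry: maps saga_name (as stored in saga_execution_logs) to its steps.
class SagaRegistry {
  private readonly sagas = new Map<string, SagaStep<any>[]>();

  register<P>(sagaName: string, steps: SagaStep<P>[]): void {
    if (this.sagas.has(sagaName)) throw new Error(`Saga ${sagaName} is already registered`);
    const names = steps.map((s) => s.name);
    // step_logs refers to steps by name, so names must be unique within a saga
    if (new Set(names).size !== names.length) throw new Error(`Saga ${sagaName} has duplicate step names`);
    this.sagas.set(sagaName, steps);
  }

  get(sagaName: string): SagaStep<any>[] {
    const steps = this.sagas.get(sagaName);
    if (!steps) throw new Error(`Unknown saga: ${sagaName}`);
    return steps;
  }
}
```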
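The current implementation has its limitations. First, the Saga orchestration gateway is itself a single point of failure; in production it must run as a highly available cluster, relying on PostgreSQL row locks to ensure that only one instance processes a given saga at a time. Because that lock is held only while a saga is being claimed, a saga whose instance crashes mid-execution stays in EXECUTING, so a recovery job that resumes or compensates such sagas from step_logs is still needed. Second, a failed compensation (for example, addQuota failing as well) leaves the transaction in a "poison pill" state that requires manual intervention, so a matching monitoring and alerting setup must be in place to catch these events. Finally, as the number of saga flows grows, dynamically loading and managing saga definitions becomes a challenge of its own and may call for a more sophisticated store and versioning scheme for saga definitions. Future iterations will focus on improving the gateway's resilience and observability.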
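Before escalating a saga to FAILED, it is common to retry the failing operation a bounded number of times, as the comment in the orchestrator's catch block anticipates. The sketch below is an assumption, not part of the gateway: withRetry applies exponential backoff, and the hypothetical BusinessRuleError marks failures such as insufficient quota that no retry can fix, so they surface immediately. Retrying a compensation is only safe if that compensation is idempotent.

```typescript
// Marks failures that retrying cannot fix (e.g. insufficient quota).
class BusinessRuleError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'BusinessRuleError';
    // Keeps instanceof working when compiling to ES5 targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Retry an async operation with exponential backoff. Business-rule errors and the
// last attempt's error are re-thrown, so the caller can mark the saga FAILED and alert.
async function withRetry<T>(
  op: () => Promise<T>,
  { attempts, baseDelayMs }: { attempts: number; baseDelayMs: number },
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      if (err instanceof BusinessRuleError || attempt >= attempts) throw err;
      const delay = baseDelayMs * 2 ** (attempt - 1); // baseDelayMs, then 2x, 4x, ...
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```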