构建Go语言驱动的Saga分布式事务协调器并集成tRPC与Vite前端


从单体架构向微服务拆分时,一个原本简单的数据库事务操作会迅速演变成一个棘手的分布式一致性问题。设想一个用户注册流程,它需要原子性地完成三个操作:在user-service中创建用户、在billing-service中创建试用订阅、在notification-service中发送欢迎邮件。如果billing-service调用失败,user-service中已创建的用户数据就成了必须处理的脏数据。在生产项目中,依赖两阶段提交(2PC)协议通常是不可行的,因为它要求所有服务在事务期间锁定资源,这对系统的可用性和性能是巨大的打击。

我们最终选择Saga模式来解决这类问题。具体来说,是采用“编排式Saga”(Orchestration-based Saga),由一个中心的协调器(Orchestrator)来驱动整个事务流程。这种模式不要求服务间同步锁定资源,而是通过一系列的本地事务和对应的“补偿操作”(Compensation)来保证数据的最终一致性。当某个步骤失败时,协调器会反向调用前面所有已成功步骤的补偿操作,从而实现逻辑上的回滚。

本文记录了从零开始构建一个通用的、基于Go语言的内存Saga协调器,并为其搭建一个由tRPC、Vite和UnoCSS驱动的、类型安全的前端监控面板的全过程。

架构设计与技术选型

我们的目标是构建一个能够执行、监控和补偿分布式事务的系统。

graph TD
    subgraph "前端 (Vite + React)"
        A[UI监控面板] -- tRPC调用 --> B(API网关)
    end

    subgraph "后端 (Go)"
        B -- HTTP请求 --> C{Saga协调器}

        C -- 执行/补偿 --> D[User Service]
        C -- 执行/补偿 --> E[Billing Service]
        C -- 执行/补偿 --> F[Notification Service]
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
  1. Saga协调器 (Go): 系统的核心。它接收Saga流程定义,按顺序执行每个步骤。如果中途失败,则执行补偿逻辑。选择Go是因为其出色的并发性能和静态类型系统,非常适合构建高可靠性的后台服务。
  2. API网关 (Go): 一个简单的HTTP服务,作为前端与协调器的桥梁。
  3. tRPC与类型安全: 尽管tRPC是TypeScript生态的原生产物,但其核心思想——“无需代码生成即可实现端到端类型安全”——极具吸引力。在Go与TypeScript的异构栈中,我们无法直接使用tRPC。但我们可以借鉴其模式:在Go中定义API的数据结构,然后在前端项目中手动创建一份完全对应的TypeScript类型定义。这种“契约式”的类型同步,虽有维护成本,但相比于OpenAPI/GraphQL的代码生成方案,在小型项目中更为轻量,且能带来几乎等同于tRPC的开发体验。
  4. Vite + UnoCSS: 前端选择Vite是为了极致的开发效率。UnoCSS则让我们能够以原子化、按需生成的方式快速构建UI,无需在CSS文件和组件之间来回切换,尤其适合开发这种功能性的内部面板。

核心实现:Go Saga协调器

协调器的设计必须足够通用,以便支持不同业务的Saga流程。我们首先定义Saga及其步骤的核心数据结构。

pkg/saga/saga.go

package saga

import (
	"context"
	"log"
	"sync"
)

// StepAction 定义了Saga中一个步骤需要执行的正向操作和补偿操作。
// ctx 用于传递请求上下文,payload 是执行该步骤所需的输入数据。
// 返回值是该步骤的输出,可作为后续步骤的输入。
type StepAction func(ctx context.Context, payload interface{}) (interface{}, error)

// Step 定义了Saga中的一个原子步骤。
type Step struct {
	Name         string     // 步骤名称,用于日志和监控
	Action       StepAction // 正向操作
	Compensation StepAction // 补偿操作
}

// Saga 定义了一个完整的分布式事务流程。
type Saga struct {
	ID    string  // 唯一ID,用于追踪
	Steps []Step  // 事务包含的所有步骤
	state *sagaState // 内部状态,非导出
}

// sagaState 追踪Saga的执行状态。
type sagaState struct {
	mu            sync.RWMutex
	currentStep   int
	payloads      map[int]interface{} // 存储每个成功步骤的输出
	lastError     error
	isCompensating bool
}

// NewSaga 创建一个新的Saga实例。
func NewSaga(id string, steps []Step) *Saga {
	return &Saga{
		ID:    id,
		Steps: steps,
		state: &sagaState{
			currentStep: -1, // -1表示尚未开始
			payloads:    make(map[int]interface{}),
		},
	}
}

// Executor 是Saga的执行引擎。
type Executor struct{}

func NewExecutor() *Executor {
	return &Executor{}
}

// Execute 开始执行一个Saga流程。
// initialPayload 是Saga第一个步骤的输入数据。
func (e *Executor) Execute(ctx context.Context, s *Saga, initialPayload interface{}) error {
	log.Printf("[Saga %s] Starting execution", s.ID)
	s.state.mu.Lock()
	s.state.currentStep = 0
	s.state.mu.Unlock()

	var currentPayload = initialPayload

	for i, step := range s.Steps {
		s.state.mu.Lock()
		s.state.currentStep = i
		s.state.mu.Unlock()

		log.Printf("[Saga %s] Executing step '%s' (%d/%d)", s.ID, step.Name, i+1, len(s.Steps))

		// 执行正向操作
		output, err := step.Action(ctx, currentPayload)
		if err != nil {
			log.Printf("[Saga %s] Step '%s' failed: %v. Starting compensation.", s.ID, step.Name, err)
			s.state.mu.Lock()
			s.state.lastError = err
			s.state.mu.Unlock()
			
			// 如果任何一个步骤失败,立即启动补偿流程
			e.compensate(ctx, s)
			return err
		}

		// 存储该步骤的输出,作为下一步的输入
		s.state.mu.Lock()
		s.state.payloads[i] = output
		s.state.mu.Unlock()
		currentPayload = output
	}
    
    s.state.mu.Lock()
    s.state.currentStep = -2 // -2 表示执行成功
    s.state.mu.Unlock()
	log.Printf("[Saga %s] Execution completed successfully", s.ID)
	return nil
}

// compensate 从失败步骤的前一步开始,反向执行所有已成功步骤的补偿操作。
func (e *Executor) compensate(ctx context.Context, s *Saga) {
	s.state.mu.Lock()
	if s.state.isCompensating {
		s.state.mu.Unlock()
		return
	}
	s.state.isCompensating = true
	startCompensationFrom := s.state.currentStep - 1
	s.state.mu.Unlock()

	log.Printf("[Saga %s] Starting compensation from step %d", s.ID, startCompensationFrom)

	for i := startCompensationFrom; i >= 0; i-- {
		step := s.Steps[i]
		log.Printf("[Saga %s] Compensating for step '%s'", s.ID, step.Name)

		// 获取执行该步骤正向操作时的输入,以便补偿操作使用。
		// 注意:补偿操作的输入是其对应正向操作的输出。
		var compensationPayload interface{}
		s.state.mu.RLock()
		if i > 0 {
			// 对于非第一步,其输入是上一步的输出
			compensationPayload = s.state.payloads[i-1]
		}
		s.state.mu.RUnlock()

		if _, err := step.Compensation(ctx, compensationPayload); err != nil {
			// 这是一个严重的错误,补偿失败。
			// 在真实项目中,这里需要有重试机制、告警和人工干预流程。
			log.Printf("[Saga %s] CRITICAL: Compensation for step '%s' failed: %v. Manual intervention required.", s.ID, step.Name, err)
            s.state.mu.Lock()
            s.state.currentStep = -3 // -3 表示补偿失败
            s.state.mu.Unlock()
			// 即使一个补偿失败,我们仍然尝试继续补偿剩下的步骤
		}
	}
    
    s.state.mu.Lock()
    if s.state.currentStep != -3 { // 如果没有补偿失败
        s.state.currentStep = -4 // -4 表示补偿成功
    }
    s.state.isCompensating = false
    s.state.mu.Unlock()
	log.Printf("[Saga %s] Compensation finished.", s.ID)
}

// GetState 返回Saga的当前状态,用于API查询。
func (s *Saga) GetState() map[string]interface{} {
	s.state.mu.RLock()
	defer s.state.mu.RUnlock()

	status := "Running"
	currentStepName := ""
	if s.state.currentStep >= 0 && s.state.currentStep < len(s.Steps) {
		currentStepName = s.Steps[s.state.currentStep].Name
	}

	switch s.state.currentStep {
	case -1:
		status = "Pending"
	case -2:
		status = "Succeeded"
	case -3:
		status = "CompensationFailed"
	case -4:
		status = "Compensated"
	}
    
    if s.state.isCompensating && status == "Running" {
        status = "Compensating"
    }

	var errMsg string
	if s.state.lastError != nil {
		errMsg = s.state.lastError.Error()
	}

	return map[string]interface{}{
		"id":              s.ID,
		"status":          status,
		"currentStep":     s.state.currentStep,
		"currentStepName": currentStepName,
		"totalSteps":      len(s.Steps),
		"error":           errMsg,
	}
}

这个协调器是内存态的,对于演示足够,但在生产环境中,sagaState必须持久化到Redis或数据库中,以防止协调器进程崩溃导致事务状态丢失。

业务场景:用户注册流程

现在,我们用这个协调器来编排前面提到的用户注册流程。

cmd/server/main.go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/google/uuid"
	"your_module/pkg/saga" // 替换为你的模块路径
)

// --- 模拟微服务 ---

// MockUserService 模拟用户服务
type MockUserService struct{}

func (s *MockUserService) CreateUser(ctx context.Context, email string) (string, error) {
	log.Printf("USER_SERVICE: Creating user with email: %s", email)
	time.Sleep(500 * time.Millisecond)
	userID := fmt.Sprintf("user-%d", rand.Intn(10000))
	log.Printf("USER_SERVICE: User %s created.", userID)
	return userID, nil
}

func (s *MockUserService) DeleteUser(ctx context.Context, userID string) error {
	log.Printf("USER_SERVICE: Compensating - Deleting user %s.", userID)
	time.Sleep(200 * time.Millisecond)
	return nil
}

// MockBillingService 模拟计费服务
type MockBillingService struct{ ShouldFail bool }

func (s *MockBillingService) CreateSubscription(ctx context.Context, userID string) (string, error) {
	log.Printf("BILLING_SERVICE: Creating subscription for user %s", userID)
	time.Sleep(500 * time.Millisecond)
	if s.ShouldFail {
		log.Println("BILLING_SERVICE: Intentionally failing subscription creation.")
		return "", fmt.Errorf("insufficient funds")
	}
	subID := fmt.Sprintf("sub-%d", rand.Intn(10000))
	log.Printf("BILLING_SERVICE: Subscription %s created.", subID)
	return subID, nil
}

func (s *MockBillingService) CancelSubscription(ctx context.Context, userID string) error {
	log.Printf("BILLING_SERVICE: Compensating - Cancelling subscription for user %s", userID)
	time.Sleep(200 * time.Millisecond)
	return nil
}

// --- Saga 定义 ---

var (
	sagaStore = make(map[string]*saga.Saga)
	storeMu   sync.RWMutex
	executor  = saga.NewExecutor()
)

// buildRegistrationSaga 构建用户注册的Saga流程
func buildRegistrationSaga(sagaID string, billingShouldFail bool) *saga.Saga {
	userService := &MockUserService{}
	billingService := &MockBillingService{ShouldFail: billingShouldFail}

	steps := []saga.Step{
		{
			Name: "CreateUser",
			Action: func(ctx context.Context, payload interface{}) (interface{}, error) {
				email := payload.(string)
				return userService.CreateUser(ctx, email)
			},
			Compensation: func(ctx context.Context, payload interface{}) (interface{}, error) {
				// 补偿操作的输入是上一步的输出,这里是 initialPayload
				// 但CreateUser的补偿只需要知道userID,而userID是它的输出。
				// 这是一个设计上的权衡。为了简化,我们假设补偿能从上下文中获取必要信息。
				// 在真实系统中,Action的输出(如userID)需要被捕获并用于补偿。
				// 我们的Executor实现尚未处理好这一点,这是一个待改进项。
				// 此处我们仅作演示,实际应传递userID。
				return nil, userService.DeleteUser(ctx, "some-user-id-from-context")
			},
		},
		{
			Name: "CreateSubscription",
			Action: func(ctx context.Context, payload interface{}) (interface{}, error) {
				userID := payload.(string) // payload是上一步(CreateUser)的输出
				return billingService.CreateSubscription(ctx, userID)
			},
			Compensation: func(ctx context.Context, payload interface{}) (interface{}, error) {
				userID := payload.(string)
				return nil, billingService.CancelSubscription(ctx, userID)
			},
		},
		// ...可以添加发送邮件等步骤...
	}
	return saga.NewSaga(sagaID, steps)
}

// --- HTTP API ---

func main() {
	http.HandleFunc("/api/register", handleRegister)
	http.HandleFunc("/api/saga-status", handleSagaStatus)

	// 设置CORS,用于开发环境
	corsHandler := func(h http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("Access-Control-Allow-Origin", "*")
			w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
			w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
			if r.Method == "OPTIONS" {
				w.WriteHeader(http.StatusOK)
				return
			}
			h.ServeHTTP(w, r)
		})
	}
	
	log.Println("Server starting on :8080...")
	log.Fatal(http.ListenAndServe(":8080", corsHandler(http.DefaultServeMux)))
}

type RegisterRequest struct {
	Email      string `json:"email"`
	ShouldFail bool   `json:"shouldFail"`
}

func handleRegister(w http.ResponseWriter, r *http.Request) {
	var req RegisterRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	sagaID := uuid.New().String()
	registrationSaga := buildRegistrationSaga(sagaID, req.ShouldFail)

	storeMu.Lock()
	sagaStore[sagaID] = registrationSaga
	storeMu.Unlock()

	// 异步执行Saga
	go executor.Execute(context.Background(), registrationSaga, req.Email)

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"sagaId": sagaID})
}

func handleSagaStatus(w http.ResponseWriter, r *http.Request) {
	sagaID := r.URL.Query().Get("id")
	if sagaID == "" {
		http.Error(w, "saga id is required", http.StatusBadRequest)
		return
	}

	storeMu.RLock()
	s, ok := sagaStore[sagaID]
	storeMu.RUnlock()

	if !ok {
		http.Error(w, "saga not found", http.NotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(s.GetState())
}

这个main.go文件模拟了两个微服务,并定义了一个包含两个步骤的Saga流程。它提供了两个API端点:一个用于触发Saga,另一个用于查询其状态。注意,为了演示补偿流程,我们特意在API请求中加入了一个shouldFail布尔值。

前端实现:tRPC驱动的监控面板

前端项目使用 pnpm create vite 初始化,选择React + TypeScript模板。

1. 定义类型契约

借鉴tRPC的思想,我们首先创建类型定义文件,手动与Go后端的数据结构保持同步。

src/api/types.ts

// 这个文件是Go后端API响应的手动类型镜像
export interface SagaState {
  id: string;
  status: 'Pending' | 'Running' | 'Succeeded' | 'Compensating' | 'Compensated' | 'CompensationFailed';
  currentStep: number;
  currentStepName: string;
  totalSteps: number;
  error: string | null;
}

export interface RegisterResponse {
  sagaId: string;
}

这个SagaState接口精确地描述了Go后端/api/saga-status端点的JSON响应结构。

2. “伪”tRPC客户端

我们使用@tanstack/react-query来管理API请求的状态,并封装一个类似tRPC的客户端。

src/api/client.ts

import { QueryClient } from '@tanstack/react-query';
import { SagaState, RegisterResponse } from './types';

const API_BASE_URL = 'http://localhost:8080';

export const queryClient = new QueryClient();

// 手动实现的、具有类型提示的API客户端
const createApiClient = () => {
  const post = async <TRequest, TResponse>(path: string, body: TRequest): Promise<TResponse> => {
    const res = await fetch(`${API_BASE_URL}${path}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    });
    if (!res.ok) {
      throw new Error(`API Error: ${res.statusText}`);
    }
    return res.json() as Promise<TResponse>;
  };

  const get = async <TResponse>(path: string): Promise<TResponse> => {
    const res = await fetch(`${API_BASE_URL}${path}`);
    if (!res.ok) {
      throw new Error(`API Error: ${res.statusText}`);
    }
    return res.json() as Promise<TResponse>;
  };

  return {
    register: {
      mutate: (variables: { email: string; shouldFail: boolean }) =>
        post<typeof variables, RegisterResponse>('/api/register', variables),
    },
    sagaStatus: {
      query: (variables: { id: string }) =>
        get<SagaState>(`/api/saga-status?id=${variables.id}`),
    },
  };
};

export const api = createApiClient();

这个api对象暴露了与后端方法对应的函数,并利用TypeScript泛型确保了请求体和响应的类型安全,实现了tRPC的核心价值。

3. UI组件与UnoCSS

最后,我们用React和UnoCSS构建UI。UnoCSS的配置极为简单,在vite.config.ts中添加插件即可,无需单独的CSS文件。

src/App.tsx

import { useState } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { api } from './api/client';
import { SagaState } from './api/types';

// UnoCSS 在JSX中直接使用原子化class
// e.g., `p-8` is padding: 2rem, `max-w-xl` is max-width: 36rem
// `mx-auto` is margin-left/right: auto, `font-mono` is font-family: monospace
// `flex`, `items-center`, `gap-4` for flexbox layout

const statusStyles: Record<SagaState['status'], string> = {
  Pending: 'bg-gray-100 text-gray-800',
  Running: 'bg-blue-100 text-blue-800 animate-pulse',
  Succeeded: 'bg-green-100 text-green-800',
  Compensating: 'bg-yellow-100 text-yellow-800 animate-pulse',
  Compensated: 'bg-orange-100 text-orange-800',
  CompensationFailed: 'bg-red-200 text-red-900',
};

function SagaMonitor({ sagaId }: { sagaId: string }) {
  const { data, error, isLoading } = useQuery({
    queryKey: ['sagaStatus', sagaId],
    queryFn: () => api.sagaStatus.query({ id: sagaId }),
    refetchInterval: 1000, // 每秒轮询一次状态
    enabled: !!sagaId,
  });

  if (!sagaId) return null;
  if (isLoading) return <div className="p-4 rounded-lg bg-gray-50">Loading saga status...</div>;
  if (error) return <div className="p-4 rounded-lg bg-red-100 text-red-800">Error: {error.message}</div>;

  const progressPercentage = data && data.totalSteps > 0 
    ? ((data.currentStep + 1) / data.totalSteps) * 100 
    : 0;

  return (
    <div className="mt-6 p-4 border rounded-lg shadow-sm font-mono text-sm">
      <div className="flex justify-between items-center mb-4">
        <h3 className="font-bold">Saga Status</h3>
        <span className={`px-2 py-1 text-xs font-semibold rounded-full ${statusStyles[data!.status]}`}>
          {data?.status}
        </span>
      </div>
      <div className="text-gray-500 mb-2 truncate">ID: {data?.id}</div>
      <div className="space-y-2">
        <div>Step: {data?.currentStepName || 'N/A'} ({data ? data.currentStep + 1 : 0}/{data?.totalSteps})</div>
        <div className="w-full bg-gray-200 rounded-full h-2.5">
          <div className="bg-blue-600 h-2.5 rounded-full" style={{ width: `${progressPercentage}%` }}></div>
        </div>
        {data?.error && <div className="mt-2 p-2 bg-red-50 text-red-700 rounded">Error: {data.error}</div>}
      </div>
    </div>
  );
}

export default function App() {
  const [email, setEmail] = useState('[email protected]');
  const [shouldFail, setShouldFail] = useState(false);
  const [sagaId, setSagaId] = useState<string | null>(null);

  const mutation = useMutation({
    mutationFn: api.register.mutate,
    onSuccess: (data) => {
      setSagaId(data.sagaId);
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    setSagaId(null); // Reset previous saga monitoring
    mutation.mutate({ email, shouldFail });
  };

  return (
    <main className="p-8 max-w-xl mx-auto bg-white rounded-xl shadow-md mt-10">
      <h1 className="text-2xl font-bold mb-4">Saga Pattern Demo</h1>
      <form onSubmit={handleSubmit} className="space-y-4">
        <div>
          <label htmlFor="email" className="block text-sm font-medium text-gray-700">Email</label>
          <input
            id="email"
            type="email"
            value={email}
            onChange={(e) => setEmail(e.target.value)}
            className="mt-1 block w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-indigo-500 focus:border-indigo-500"
            required
          />
        </div>
        <div className="flex items-center">
          <input
            id="shouldFail"
            type="checkbox"
            checked={shouldFail}
            onChange={(e) => setShouldFail(e.target.checked)}
            className="h-4 w-4 text-indigo-600 border-gray-300 rounded focus:ring-indigo-500"
          />
          <label htmlFor="shouldFail" className="ml-2 block text-sm text-gray-900">
            Force failure in Billing Service
          </label>
        </div>
        <button
          type="submit"
          disabled={mutation.isLoading}
          className="w-full flex justify-center py-2 px-4 border border-transparent rounded-md shadow-sm text-sm font-medium text-white bg-indigo-600 hover:bg-indigo-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-indigo-500 disabled:bg-gray-400"
        >
          {mutation.isLoading ? 'Processing...' : 'Register User'}
        </button>
      </form>
      {mutation.isError && (
        <div className="mt-4 p-3 bg-red-100 text-red-800 rounded">
          {(mutation.error as Error).message}
        </div>
      )}
      <SagaMonitor sagaId={sagaId!} />
    </main>
  );
}

运行Go后端和Vite前端后,我们得到一个功能完备的界面。当不勾选“Force failure”时,Saga流程会顺利执行到底,状态最终变为Succeeded。当勾选后,流程会在第二步失败,然后状态会变为Compensating,最后变为Compensated,控制台日志会清晰地打印出补偿操作的执行过程。

局限与展望

这个实现虽然验证了核心思路,但在生产环境中还存在诸多不足。首先,Saga协调器本身是单点且内存态的,无法容忍宕机。一个健壮的实现需要将Saga状态机持久化,并考虑协调器自身的高可用部署。

其次,我们手动同步Go与TypeScript类型的方式,在项目规模扩大、API增多时会成为一个维护痛点。可以考虑引入基于OpenAPI或Protobuf的规范,利用代码生成工具来自动化这一过程,但这会牺牲掉tRPC模式的轻量性。

最后,补偿逻辑的可靠性是Saga模式的基石。当前的实现中,如果一个补偿操作自身失败,系统将进入一个需要人工介入的危险状态。生产级的Saga协调器必须包含针对补偿失败的重试、告警和死信队列机制。未来的迭代方向将是解决这些可靠性与可维护性问题,例如,将协调器的状态存储移至PostgreSQL,并引入基于指数退避的重试逻辑。


  目录