使用 Go Kit 构建可持久化且具备可观测性的 Saga 编排器


在分布式系统中,原子性是一个绕不开的坎。两阶段提交(2PC)因其同步阻塞和协调者单点问题,在高性能微服务场景下往往显得力不从心。Saga 模式作为一种最终一致性的分布式事务解决方案,通过将长事务拆分为一系列本地事务,并为每个事务提供补偿操作,来保证业务层面的原子性。但理论的优雅背后,是工程实现的泥潭。一个健壮的 Saga 实现必须回答几个棘手的问题:如果编排器节点自身崩溃,如何恢复未完成的事务?如果一个事务流程横跨数十个服务,如何快速定位其中失败的一环?

这正是我们要解决的核心挑战:构建一个不仅能执行 Saga 流程,还能在崩溃后恢复、并且每一步都清晰可追溯的编排器。我们将使用 Go 语言的简洁和 Go Kit 的微服务思想来打造这个组件。

我们的起点不是一个复杂的业务,而是一个简单的接口定义。这套接口是整个 Saga 编排器的基石。

package saga

import "context"

// Step 定义了 Saga 流程中的一个独立步骤。
// 它包含一个正向操作 (Execute) 和一个逆向补偿操作 (Compensate)。
// 在真实项目中,这两个函数通常是调用其他微服务的 gRPC 或 HTTP 客户端。
type Step struct {
	// Name 是步骤的唯一标识符,用于日志记录和状态持久化。
	Name string
	// Execute 是正向操作的执行函数。
	Execute func(ctx context.Context) error
	// Compensate 是补偿操作的执行函数,必须保证幂等性。
	Compensate func(ctx context.Context) error
}

// Definition 代表一个完整的 Saga 流程定义。
type Definition struct {
	// Name 是整个 Saga 流程的名称。
	Name  string
	Steps []Step
}

// Instance 代表一个 Saga 流程的运行时实例。
type Instance struct {
	ID          string
	Definition  *Definition
	CurrentStep int // 当前执行到的步骤索引
	State       State // 当前实例的状态
	// ExecutionLog 记录每个步骤的执行结果,用于补偿决策。
	// 在生产环境中,这可能会被更复杂的结构替代。
	ExecutionLog map[string]error
}

// State 是 Saga 实例的几种可能状态。
type State string

const (
	StatePending      State = "pending"
	StateExecuting    State = "executing"
	StateCompensating State = "compensating"
	StateCompleted    State = "completed"
	StateFailed       State = "failed"
)

这个定义的核心在于 Step 结构,它将一个操作和它的“解药”绑定在一起。而 Instance 则是一个具体的 Saga 流程在运行时的快照,它的 StateCurrentStep 是我们实现崩溃恢复的关键。

编排器的核心职责:状态机与持久化

一个 Saga 编排器本质上是一个状态机管理器。它接收一个 Saga 定义,创建一个实例,然后驱动这个实例根据预设的步骤向前(执行)或向后(补偿)。如果编排器是无状态的,那么一旦进程崩溃,所有进行中的 Saga 状态都会丢失,导致数据不一致。因此,持久化是不可或缺的一环。

我们将设计一个 Orchestrator,它依赖一个 Store 接口来读写 Saga 实例的状态。这种解耦设计使得底层存储可以轻易替换,无论是用于测试的内存存储,还是生产环境的 PostgreSQL 或 etcd。

stateDiagram-v2
    direction LR

    [*] --> Pending: 创建实例
    Pending --> Executing: 开始执行
    Executing --> Executing: 执行下一步成功
    Executing --> Compensating: 步骤执行失败
    Executing --> Completed: 所有步骤成功

    Compensating --> Compensating: 补偿下一步成功
    Compensating --> Failed: 补偿失败 (需要人工干预)
    Compensating --> Failed: 所有步骤补偿成功

    state "需要人工干预" as ManualIntervention
    Compensating --> ManualIntervention: 补偿函数返回严重错误

上图是 Saga 实例的生命周期。我们的编排器代码必须严格遵循这个状态转换图。

package saga

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/google/uuid"
	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
)

// Store 是 Saga 实例的持久化存储接口。
// 在真实项目中,其实现会对接数据库或 KV 存储。
type Store interface {
	// Save 保存或更新一个 Saga 实例的状态。
	Save(ctx context.Context, instance *Instance) error
	// Load 通过 ID 加载一个 Saga 实例。
	Load(ctx context.Context, id string) (*Instance, error)
	// ListUnfinished 列出所有未完成(Executing 或 Compensating)的 Saga 实例。
	// 这是崩溃恢复的关键。
	ListUnfinished(ctx context.Context) ([]*Instance, error)
}

// Orchestrator 是 Saga 流程的驱动核心。
type Orchestrator struct {
	logger       log.Logger
	tracer       opentracing.Tracer
	store        Store
	definitions  map[string]*Definition
	mu           sync.RWMutex
}

// NewOrchestrator 创建一个新的编排器实例。
func NewOrchestrator(logger log.Logger, tracer opentracing.Tracer, store Store) *Orchestrator {
	return &Orchestrator{
		logger:      log.With(logger, "component", "saga_orchestrator"),
		tracer:      tracer,
		store:       store,
		definitions: make(map[string]*Definition),
	}
}

// RegisterDefinition 注册一个 Saga 定义,使其可以被执行。
func (o *Orchestrator) RegisterDefinition(def *Definition) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.definitions[def.Name] = def
}

// ExecuteSaga 启动一个新的 Saga 流程。
// 这是整个流程的入口点。
func (o *Orchestrator) ExecuteSaga(ctx context.Context, sagaName string) (string, error) {
	o.mu.RLock()
	def, ok := o.definitions[sagaName]
	o.mu.RUnlock()
	if !ok {
		return "", fmt.Errorf("saga definition '%s' not found", sagaName)
	}

	instance := &Instance{
		ID:           uuid.NewString(),
		Definition:   def,
		CurrentStep:  -1, // -1 表示还未开始
		State:        StatePending,
		ExecutionLog: make(map[string]error),
	}

	// 持久化初始状态,这是保证可恢复性的第一步。
	if err := o.store.Save(ctx, instance); err != nil {
		level.Error(o.logger).Log("msg", "failed to save initial saga instance", "saga_id", instance.ID, "err", err)
		return "", err
	}
	level.Info(o.logger).Log("msg", "starting new saga", "saga_id", instance.ID, "saga_name", sagaName)

	// 异步执行,避免阻塞调用方。
	go o.run(instance)

	return instance.ID, nil
}

// run 是 Saga 的核心执行循环。
func (o *Orchestrator) run(instance *Instance) {
	// 为整个 Saga 实例的运行创建一个新的上下文和 tracing span。
	span, ctx := o.startSagaSpan(context.Background(), instance)
	defer span.Finish()

	// 1. 正向执行阶段
	instance.State = StateExecuting
	instance.CurrentStep = 0
	if err := o.store.Save(ctx, instance); err != nil {
		level.Error(o.logger).Log("msg", "failed to update saga state to executing", "saga_id", instance.ID, "err", err)
		// 无法持久化状态,这是一个严重问题,标记为失败并终止。
		o.failSaga(ctx, instance, err)
		return
	}

	executeSuccess := true
	for i := range instance.Definition.Steps {
		instance.CurrentStep = i
		step := instance.Definition.Steps[i]
		
		err := o.executeStep(ctx, instance, &step)
		instance.ExecutionLog[step.Name] = err

		// 在每一步执行后都进行持久化,这是崩溃恢复的关键点。
		// 代价是性能开销,但在一致性要求高的场景下是必要的。
		if saveErr := o.store.Save(ctx, instance); saveErr != nil {
			level.Error(o.logger).Log("msg", "failed to save saga progress, aborting", "saga_id", instance.ID, "step", step.Name, "err", saveErr)
			o.failSaga(ctx, instance, saveErr)
			return
		}

		if err != nil {
			level.Warn(o.logger).Log("msg", "saga step failed, starting compensation", "saga_id", instance.ID, "step", step.Name, "err", err)
			executeSuccess = false
			break
		}
	}
	
	// 2. 决策与补偿阶段
	if executeSuccess {
		// 所有步骤成功
		instance.State = StateCompleted
		if err := o.store.Save(ctx, instance); err != nil {
			level.Error(o.logger).Log("msg", "failed to mark saga as completed", "saga_id", instance.ID, "err", err)
			o.failSaga(ctx, instance, err)
		} else {
			level.Info(o.logger).Log("msg", "saga completed successfully", "saga_id", instance.ID)
			span.SetTag("saga.state", "completed")
		}
	} else {
		// 发生错误,需要补偿
		o.compensate(ctx, instance)
	}
}

// executeStep 封装了单步的执行、日志和追踪逻辑。
func (o *Orchestrator) executeStep(ctx context.Context, instance *Instance, step *Step) error {
	span, stepCtx := o.startStepSpan(ctx, instance, step.Name, "execute")
	defer span.Finish()

	level.Info(o.logger).Log("msg", "executing step", "saga_id", instance.ID, "step", step.Name)
	
	err := step.Execute(stepCtx)
	if err != nil {
		ext.Error.Set(span, true)
		span.LogKV("event", "error", "message", err.Error())
		level.Error(o.logger).Log("msg", "step execution failed", "saga_id", instance.ID, "step", step.Name, "err", err)
	} else {
		level.Info(o.logger).Log("msg", "step executed successfully", "saga_id", instance.ID, "step", step.Name)
	}
	return err
}

// compensate 是补偿流程的驱动器。
func (o *Orchestrator) compensate(ctx context.Context, instance *Instance) {
    sagaSpan := opentracing.SpanFromContext(ctx)
    if sagaSpan != nil {
        sagaSpan.SetTag("saga.state", "compensating")
    }

	instance.State = StateCompensating
	if err := o.store.Save(ctx, instance); err != nil {
		level.Error(o.logger).Log("msg", "failed to update saga state to compensating", "saga_id", instance.ID, "err", err)
		o.failSaga(ctx, instance, err)
		return
	}

	// 从失败步骤的前一个步骤开始,反向补偿。
	for i := instance.CurrentStep; i >= 0; i-- {
		// 只有成功执行的步骤才需要补偿。
		step := instance.Definition.Steps[i]
		if instance.ExecutionLog[step.Name] == nil {
			err := o.compensateStep(ctx, instance, &step)
			if err != nil {
				// 补偿失败是一个严重问题,通常需要人工介入。
				level.Error(o.logger).Log("msg", "COMPENSATION FAILED, MANUAL INTERVENTION REQUIRED", "saga_id", instance.ID, "step", step.Name, "err", err)
				o.failSaga(ctx, instance, err)
				return // 停止补偿流程
			}
		}
	}
	
	// 所有补偿成功完成
	level.Warn(o.logger).Log("msg", "saga fully compensated", "saga_id", instance.ID)
	instance.State = StateFailed // 最终状态是失败,因为整个业务逻辑未完成
	if err := o.store.Save(ctx, instance); err != nil {
		level.Error(o.logger).Log("msg", "failed to mark saga as failed after compensation", "saga_id", instance.ID, "err", err)
	}
    if sagaSpan != nil {
        sagaSpan.SetTag("saga.state", "failed")
    }
}


// compensateStep 封装了单步的补偿、日志和追踪逻辑。
func (o *Orchestrator) compensateStep(ctx context.Context, instance *Instance, step *Step) error {
	span, stepCtx := o.startStepSpan(ctx, instance, step.Name, "compensate")
	defer span.Finish()

	level.Warn(o.logger).Log("msg", "compensating step", "saga_id", instance.ID, "step", step.Name)

	err := step.Compensate(stepCtx)
	if err != nil {
		ext.Error.Set(span, true)
		span.LogKV("event", "error", "message", err.Error())
		level.Error(o.logger).Log("msg", "step compensation failed", "saga_id", instance.ID, "step", step.Name, "err", err)
	} else {
		level.Info(o.logger).Log("msg", "step compensated successfully", "saga_id", instance.ID, "step", step.Name)
	}
	return err
}


// failSaga 将 Saga 标记为最终失败状态,通常在发生不可恢复错误时调用。
func (o *Orchestrator) failSaga(ctx context.Context, instance *Instance, err error) {
	instance.State = StateFailed
	if saveErr := o.store.Save(ctx, instance); saveErr != nil {
		level.Error(o.logger).Log("msg", "CRITICAL: failed to mark saga as failed", "saga_id", instance.ID, "original_err", err, "save_err", saveErr)
	}
    span := opentracing.SpanFromContext(ctx)
    if span != nil {
        span.SetTag("saga.state", "failed")
        ext.Error.Set(span, true)
    }
}

可观测性:用日志和追踪照亮黑盒

上面的代码中,我们已经集成了 Go Kit 的 log 和 OpenTracing API。这不是装饰,而是 Saga 实现中不可或缺的一部分。在一个复杂的分布式事务中,如果无法回答“当前进行到哪一步?”、“为什么会失败?”、“补偿流程是否执行?”,那么这个系统就是不可维护的。

我们通过两个辅助函数来创建结构化的 Span:

package saga

import (
	// ... imports from above
	"github.com/opentracing/opentracing-go/log"
)

func (o *Orchestrator) startSagaSpan(ctx context.Context, instance *Instance) (opentracing.Span, context.Context) {
	span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, o.tracer, "Saga."+instance.Definition.Name)
	span.SetTag("saga.id", instance.ID)
	span.SetTag("saga.name", instance.Definition.Name)
	span.SetTag("saga.state", string(instance.State))
	return span, ctx
}

func (o *Orchestrator) startStepSpan(ctx context.Context, instance *Instance, stepName, operation string) (opentracing.Span, context.Context) {
	parentSpan := opentracing.SpanFromContext(ctx)
	span := o.tracer.StartSpan(
		"SagaStep."+stepName,
		opentracing.ChildOf(parentSpan.Context()),
	)
	span.SetTag("saga.id", instance.ID)
	span.SetTag("step.name", stepName)
	span.SetTag("step.operation", operation)
	logFields := []log.Field{
		log.String("saga_id", instance.ID),
		log.String("step_name", stepName),
		log.String("operation", operation),
	}
	span.LogFields(logFields...)
	return span, opentracing.ContextWithSpan(ctx, span)
}

通过这种方式,每次 Saga 执行都会在 Jaeger 或 Zipkin 这样的追踪系统中留下一条完整的轨迹。每个 ExecuteCompensate 操作都是父级 Saga Span 下的子 Span。当问题发生时,运维人员可以清晰地看到整个调用链,哪个步骤耗时最长,哪个步骤返回了错误,以及补偿逻辑是否被正确触发。

崩溃恢复:从持久化状态中重生

到目前为止,我们设计的编排器可以在单次运行中处理成功和失败。但真正的考验是当编排器进程本身被终止时。这就是 Store 接口中 ListUnfinished 方法的用武之地。

我们需要在编排器启动时增加一个恢复逻辑:

// 在 Orchestrator 结构体中添加一个 WaitGroup
type Orchestrator struct {
	// ... 其他字段
	wg           sync.WaitGroup
}

// ... NewOrchestrator 中初始化 wg

// Recover 未完成的 Saga 流程。
// 这个方法应该在服务启动时调用。
func (o *Orchestrator) Recover(ctx context.Context) error {
	level.Info(o.logger).Log("msg", "starting recovery of unfinished sagas")

	instances, err := o.store.ListUnfinished(ctx)
	if err != nil {
		level.Error(o.logger).Log("msg", "failed to list unfinished sagas for recovery", "err", err)
		return err
	}

	if len(instances) == 0 {
		level.Info(o.logger).Log("msg", "no unfinished sagas found to recover")
		return nil
	}

	level.Info(o.logger).Log("msg", "found unfinished sagas", "count", len(instances))

	for _, instance := range instances {
		// 必须重新加载定义,因为持久化的实例可能不包含它
		def, ok := o.definitions[instance.Definition.Name]
		if !ok {
			level.Error(o.logger).Log("msg", "saga definition not found for recovering instance, skipping", "saga_id", instance.ID, "saga_name", instance.Definition.Name)
			continue
		}
		instance.Definition = def
		
		level.Info(o.logger).Log("msg", "recovering saga", "saga_id", instance.ID, "state", instance.State, "step", instance.CurrentStep)
		
		// 异步恢复执行
		o.wg.Add(1)
		go func(inst *Instance) {
			defer o.wg.Done()
			if inst.State == StateExecuting {
				o.resumeExecution(inst)
			} else if inst.State == StateCompensating {
				o.resumeCompensation(inst)
			}
		}(instance)
	}

	return nil
}

// resumeExecution 从中断处继续执行 Saga。
func (o *Orchestrator) resumeExecution(instance *Instance) {
    // 恢复逻辑与 run 类似,但从 instance.CurrentStep 开始
    // ...
    // 为了简洁,这里直接调用 run,一个更优的实现会拆分 run 方法
    // 以便能从任意步骤开始,而不是从头开始。
    // 这里我们用一个简化的方式,重新触发整个流程,
    // 但让已完成的步骤快速跳过(如果它们是幂等的)。
    // 在真实项目中,你需要一个更精细的 resume 函数。
    level.Info(o.logger).Log("msg", "resuming execution for saga", "saga_id", instance.ID)
    o.run(instance) // 简化处理,实际应为 o.resume(instance, instance.CurrentStep)
}

// resumeCompensation 从中断处继续补偿 Saga。
func (o *Orchestrator) resumeCompensation(instance *Instance) {
    span, ctx := o.startSagaSpan(context.Background(), instance)
    defer span.Finish()
    level.Info(o.logger).Log("msg", "resuming compensation for saga", "saga_id", instance.ID)
    o.compensate(ctx, instance)
}

注意:一个完整的 resumeExecution 实现会更复杂,它需要从 instance.CurrentStep 开始继续循环,而不是简单地重跑整个 run 函数。这里的调用是为了演示恢复流程的入口。

实践中的权衡与局限

我们构建的这个编排器解决了 Saga 模式的核心工程挑战:状态持久化、崩溃恢复和可观测性。然而,它并非银弹,在真实项目中应用时,还需要考虑以下几点:

  1. 存储层的选型:我们使用的 Store 接口隐藏了实现细节。生产环境中,需要一个支持事务性写入的存储(如 PostgreSQL),以确保 Saga 状态的更新本身是原子的。使用 etcd 这样的分布式 KV 存储也是一个不错的选择,可以利用其 CAS (Compare-And-Swap) 操作来避免并发冲突。

  2. 并发执行与隔离:当前的实现通过 go o.run(instance) 异步执行每个 Saga,但编排器内部没有对并发执行的 Saga 数量做限制。在高并发场景下,可能需要一个 worker pool 模型来管理资源。同时,Saga 模式本身不提供事务隔离性,业务必须能容忍在事务执行期间的中间状态。

  3. 补偿操作的幂等性:这是 Saga 模式的硬性要求。如果补偿操作不是幂等的(例如,一个退款操作被执行了两次),崩溃恢复机制反而会造成更大的问题。设计补偿接口时,必须在业务层面保证这一点,例如通过检查操作是否已被执行过来实现。

  4. **编排 vs. 协同 (Orchestration vs. Choreography)**:我们实现的是一个中心化的编排模型。它的优点是逻辑集中,易于监控和管理。缺点是引入了一个单点(尽管可以做高可用部署),并且与服务存在一定的耦合。另一种选择是协同模型,服务通过监听事件来触发下一步操作。协同模型更松散,但端到端的事务流程变得难以追踪,更像是一个黑盒。技术选型需要根据团队和业务的复杂度来权衡。

这个基于 Go Kit 的 Saga 编排器实现,虽然只是一个骨架,但它展示了如何将一个理论模式转化为一个可靠且可维护的工程组件。在微服务架构中,处理分布式事务的复杂性无法避免,但通过精心设计,我们可以构建出足够健壮的工具来驯服这头猛兽。


  目录