Паттерн Saga c использованием MassTransit

В этой статье кратко о том, что такое Saga, и как с ней работать в MassTransit на платформе .NET.

Преимущества микросервисов оборачиваются и некоторой сложностью: поскольку каждый микросервис имеет свою базу данных, то у нас нет возможности атомарно выполнить транзакцию, касающуюся нескольких микросервисов. На помощь приходит паттерн Saga, которая позволяет разбить долго живущую транзакцию (long-living transaction, LLT) на несколько простых шагов. Каждый из шагов выполняется атомарно и может выполняться в разных сервисах. Сага считается завершенной, если все шаги завершились успешно. Если один из шагов не был успешен, то для ранее выполненных шагов возможно выполнить компенсационные транзакции.

Пример: покупка билета на концерт.

Пусть мы имеем следующие микросервисы:

  • Booking Service - резервирование места и отмена бронирования
  • Payment Service - платежи
  • Document Service - формирование различных документов, в том числе и билетов

Чтобы купить билет в такой системе нам потребуется скоординировать действия всех трех сервисов, при этом отслеживать прогресс выполнения и в случае необходимости выполнять компенсации. Вот как это может выглядеть с применением паттерна Saga:

Пример архитектуры

Для реализации подобной схемы с нуля требуется написать изрядное количество кода, при этом учитывая массу нюансов. Для платформы .NET не так много готовых реализаций саг, в отличие например от Java. Но к счастью есть MassTransit, который помогает избежать написания рутинного кода и, более того, имеет механизмы для решения типовых проблем в такой схеме взаимодействия.

Для реализации саги с MassTransit потребуется сделать следующее:

  • описать, что будет представлять собой хранимое состояние саги (реализация интерфейса SagaStateMachineInstance)
  • описать сообщения, используемые сагой
  • описать граф переходов между состояниями (наследник MassTransitStateMachine<>)
  • зарегистрировать сагу в DI контейнере

Хранимое состояние

Чтобы сага завелась, класс состояния саги должен реализовывать интерфейс SagaStateMachineInstance. Название этого и других интерфейса в MassTransit не содержит префикса I по историческим причинам.

 1public class PurchaseState : SagaStateMachineInstance
 2{
 3    public Guid CorrelationId { get; set; }
 4
 5    public int RowNumber { get; set; }
 6    public int SeatNumber { get; set; }
 7    public decimal Amount { get; set; }
 8
 9    public string CurrentState { get; set; }
10}

Самое важное свойство здесь это CorrelationId, которое является ключом для адресации состояния саги. Любое обрабатываемое сагой сообщение сопоставляется с конкретным экземпляром при помощи этого ключа. Кроме того, класс содержит поля, переданные во входной команде, а также поле Amount (стоимость) билета. Все эти поля сохраняются и в микросервисах, где эти данные обрабатываются (Seat, Row в Booking Service, Amount в PaymentService), но для удобства отладки и мониторинга удобно сохранять это и в состоянии саги. Кроме того, некоторые поля отправляются в свой сервис не сразу, а значит должны где-то быть сохранены.

Команды и события

Чтобы сага работала, по выбранному транспорту (шине) должны циркулировать сообщения, которые приводят сагу в действие. Их условно можно разделить на две группы:

  • Команды: сообщения, которые приходят на вход микросервиса и принуждают его к выполнению определенного действия
  • События: сообщения, которые информируют остальных участников системе о том, что в результате работы микросервиса что-то произошло, например успешно забронировано место или генерация документа прошла с ошибкой. В этом демо Orchestrator порождает команды для остальных микросервисов и слушает их события в своей шине.
 1// Commands
 2public record PurchaseTicketCommand(Guid OrderId, int RowNumber, int SeatNumber);
 3public record ReserveSeatCommand(Guid OrderId, int RowNumber, int SeatNumber);
 4public record ReleaseSeatCommand(Guid OrderId);
 5public record ProcessPaymentCommand(Guid OrderId, decimal Amount);
 6public record RefundPaymentCommand(Guid OrderId);
 7public record GenerateTicketCommand(Guid OrderId);
 8
 9// Events
10public record SeatReserved(Guid OrderId, int RowNumber, int SeatNumber);
11public record SeatReservationFailed(Guid OrderId);
12
13public record PaymentProcessed(Guid OrderId);
14public record PaymentFailed(Guid OrderId);
15
16public record TicketGenerated(Guid OrderId);
17public record TicketGenerationFailed(Guid OrderId);

Команда PurchaseTicketCommand приходит на вход самому Orchestrator, остальные команды он отправляет сам нижележащим микросервисам.

Эти сообщения должны быть доступны всем микросервисам, участвующим в саге, поэтому они должны быть в отдельном проекте и могут быть оформлены в виде NuGet пакета.

Машина состояний

Самая важная часть логики описывается в классе, наследуемом от MassTransitStateMachine<TState>, по сути Крис, автор библиотеки, реализовал свой собственный DSL для описания состояний и переходов и выделил его в отдельный NuGet пакет Automatonymous, а после применил его в проекте MassTransit.

 1public class PurchaseStateMachine : MassTransitStateMachine<PurchaseState>
 2{
 3    // допустимые состояния саги
 4    public State ReservationRequested { get; private set; }
 5    public State Reserved { get; private set; }
 6    public State Paid { get; private set; }
 7    public State TicketGenerated { get; private set; }
 8    public State Failed { get; private set; }
 9
10    //на какие события сага реагирует
11    public Event<PurchaseTicketCommand> ReservationRequestedEvent { get; private set; }
12    public Event<SeatReserved> SeatReservedEvent { get; private set; }
13    public Event<SeatReservationFailed> SeatReservationFailedEvent { get; private set; }
14    public Event<PaymentProcessed> PaymentProcessedEvent { get; private set; }
15    public Event<PaymentFailed> PaymentFailedEvent { get; private set; }
16    public Event<TicketGenerated> TicketGeneratedEvent { get; private set; }
17    public Event<TicketGenerationFailed> TicketGenerationFailedEvent { get; private set; }
18
19    public PurchaseStateMachine()
20    {
21        //указывает, какое поле хранит текущее состояние саги
22        InstanceState(x => x.CurrentState);
23
24        //описывает, как сага может быть создана
25        Initially(
26            //по событию "запрошено бронирование места"
27            When(ReservationRequestedEvent)
28                .Then(ctx =>
29                {
30                    //задает начальные значения для полей состояния саги
31                    ctx.Saga.RowNumber = ctx.Message.RowNumber;
32                    ctx.Saga.SeatNumber = ctx.Message.SeatNumber;
33                    ctx.Saga.Amount = Random.Shared.Next(100, 500);
34                })
35                //пересылает сообщение одному из сервисов на выполнение
36                .Send(new Uri("queue:booking-service"), ctx =>
37                    new ReserveSeatCommand(ctx.Saga.CorrelationId, ctx.Message.RowNumber, ctx.Message.SeatNumber))
38                //переводит сагу в новое состояние
39                .TransitionTo(ReservationRequested)
40        );
41
42        During(ReservationRequested,
43            When(SeatReservedEvent)
44                .Send(new Uri("queue:payment-service"), ctx =>
45                    new ProcessPaymentCommand(ctx.Saga.CorrelationId, ctx.Saga.Amount))
46                .TransitionTo(Reserved)
47        );
48
49        //описывает переход из конкретного состояния
50        During(ReservationRequested,
51            When(SeatReservationFailedEvent)
52                .Then(ctx =>
53                {
54                    Console.WriteLine($"Seat reservation failed for Order {ctx.Message.OrderId}");
55                })
56                .TransitionTo(Failed)
57                .Finalize()
58        );
59
60        //сокращено для читаемости. Полный код в репозитории
61    }
62}

Регистрация саги

Чтобы вся механика саги заработала, надо ее зарегистрировать и сконфигурировать. В этом примере сага работает по шине RabbitMQ и сохраняет состояние в PostgreSQL.

 1services.AddMassTransit(x =>
 2{
 3    //настраивает автоматическое именование очередей RabbitMQ в стиле my-favorite-service
 4    x.SetKebabCaseEndpointNameFormatter();
 5
 6    //регистрирует собственно state machine
 7    x.AddSagaStateMachine<PurchaseStateMachine, PurchaseState>()
 8    //определяет хранение состояния при помощи EF Core в PostgreSQL
 9    .EntityFrameworkRepository(r =>
10        {
11            r.ConcurrencyMode = ConcurrencyMode.Optimistic;
12
13            r.AddDbContext<DbContext, TicketDbContext>((provider, builder) =>
14            {
15                builder.UseNpgsql(hostContext.Configuration.GetConnectionString("DefaultConnection"));
16            });
17        });
18
19    //определяет RabbitMQ как транспорт для саги
20    x.UsingRabbitMq((context, cfg) =>
21    {
22        cfg.Host("rabbitmq", "/", h =>
23        {
24            h.Username("guest");
25            h.Password("guest");
26        });
27        //задает конкретную очередь как поставщика сообщений для саги
28        cfg.ReceiveEndpoint("ticket-purchase-saga", e =>
29        {
30            e.ConfigureSaga<PurchaseState>(context);
31        });
32    });
33});

Заключение

В заметке описан самый базовый сценарий использования саги в MassTransit. Когда дело доходит до практики, то всплывает масса нюансов, но к счастью, большинство из них покрывается возможностями библиотеки, надо только копнуть документацию поглубже.

Полный код демо выложен в репозитории.