Паттерн Saga c использованием MassTransit
В этой статье кратко о том, что такое Saga, и как с ней работать в MassTransit на платформе .NET.
Преимущества микросервисов оборачиваются и некоторой сложностью: поскольку каждый микросервис имеет свою базу данных, то у нас нет возможности атомарно выполнить транзакцию, касающуюся нескольких микросервисов. На помощь приходит паттерн Saga, которая позволяет разбить долго живущую транзакцию (long-living transaction, LLT) на несколько простых шагов. Каждый из шагов выполняется атомарно и может выполняться в разных сервисах. Сага считается завершенной, если все шаги завершились успешно. Если один из шагов не был успешен, то для ранее выполненных шагов возможно выполнить компенсационные транзакции.
Пример: покупка билета на концерт.
Пусть мы имеем следующие микросервисы:
- Booking Service - резервирование места и отмена бронирования
- Payment Service - платежи
- Document Service - формирование различных документов, в том числе и билетов
Чтобы купить билет в такой системе нам потребуется скоординировать действия всех трех сервисов, при этом отслеживать прогресс выполнения и в случае необходимости выполнять компенсации. Вот как это может выглядеть с применением паттерна Saga:
Для реализации подобной схемы с нуля требуется написать изрядное количество кода, при этом учитывая массу нюансов. Для платформы .NET не так много готовых реализаций саг, в отличие например от Java. Но к счастью есть MassTransit, который помогает избежать написания рутинного кода и, более того, имеет механизмы для решения типовых проблем в такой схеме взаимодействия.
Для реализации саги с MassTransit потребуется сделать следующее:
- описать, что будет представлять собой хранимое состояние саги (реализация интерфейса SagaStateMachineInstance)
- описать сообщения, используемые сагой
- описать граф переходов между состояниями (наследник MassTransitStateMachine<>)
- зарегистрировать сагу в DI контейнере
Хранимое состояние
Чтобы сага завелась, класс состояния саги должен реализовывать интерфейс SagaStateMachineInstance
. Название этого и других интерфейса в MassTransit не содержит префикса I
по историческим причинам.
1public class PurchaseState : SagaStateMachineInstance
2{
3 public Guid CorrelationId { get; set; }
4
5 public int RowNumber { get; set; }
6 public int SeatNumber { get; set; }
7 public decimal Amount { get; set; }
8
9 public string CurrentState { get; set; }
10}
Самое важное свойство здесь это CorrelationId
, которое является ключом для адресации состояния саги. Любое обрабатываемое сагой сообщение сопоставляется с конкретным экземпляром при помощи этого ключа. Кроме того, класс содержит поля, переданные во входной команде, а также поле Amount (стоимость) билета. Все эти поля сохраняются и в микросервисах, где эти данные обрабатываются (Seat, Row в Booking Service, Amount в PaymentService), но для удобства отладки и мониторинга удобно сохранять это и в состоянии саги. Кроме того, некоторые поля отправляются в свой сервис не сразу, а значит должны где-то быть сохранены.
Команды и события
Чтобы сага работала, по выбранному транспорту (шине) должны циркулировать сообщения, которые приводят сагу в действие. Их условно можно разделить на две группы:
- Команды: сообщения, которые приходят на вход микросервиса и принуждают его к выполнению определенного действия
- События: сообщения, которые информируют остальных участников системе о том, что в результате работы микросервиса что-то произошло, например успешно забронировано место или генерация документа прошла с ошибкой. В этом демо Orchestrator порождает команды для остальных микросервисов и слушает их события в своей шине.
1// Commands
2public record PurchaseTicketCommand(Guid OrderId, int RowNumber, int SeatNumber);
3public record ReserveSeatCommand(Guid OrderId, int RowNumber, int SeatNumber);
4public record ReleaseSeatCommand(Guid OrderId);
5public record ProcessPaymentCommand(Guid OrderId, decimal Amount);
6public record RefundPaymentCommand(Guid OrderId);
7public record GenerateTicketCommand(Guid OrderId);
8
9// Events
10public record SeatReserved(Guid OrderId, int RowNumber, int SeatNumber);
11public record SeatReservationFailed(Guid OrderId);
12
13public record PaymentProcessed(Guid OrderId);
14public record PaymentFailed(Guid OrderId);
15
16public record TicketGenerated(Guid OrderId);
17public record TicketGenerationFailed(Guid OrderId);
Команда PurchaseTicketCommand
приходит на вход самому Orchestrator, остальные команды он отправляет сам нижележащим микросервисам.
Эти сообщения должны быть доступны всем микросервисам, участвующим в саге, поэтому они должны быть в отдельном проекте и могут быть оформлены в виде NuGet пакета.
Машина состояний
Самая важная часть логики описывается в классе, наследуемом от MassTransitStateMachine<TState>
, по сути Крис, автор библиотеки, реализовал свой собственный DSL для описания состояний и переходов и выделил его в отдельный NuGet пакет Automatonymous
, а после применил его в проекте MassTransit.
1public class PurchaseStateMachine : MassTransitStateMachine<PurchaseState>
2{
3 // допустимые состояния саги
4 public State ReservationRequested { get; private set; }
5 public State Reserved { get; private set; }
6 public State Paid { get; private set; }
7 public State TicketGenerated { get; private set; }
8 public State Failed { get; private set; }
9
10 //на какие события сага реагирует
11 public Event<PurchaseTicketCommand> ReservationRequestedEvent { get; private set; }
12 public Event<SeatReserved> SeatReservedEvent { get; private set; }
13 public Event<SeatReservationFailed> SeatReservationFailedEvent { get; private set; }
14 public Event<PaymentProcessed> PaymentProcessedEvent { get; private set; }
15 public Event<PaymentFailed> PaymentFailedEvent { get; private set; }
16 public Event<TicketGenerated> TicketGeneratedEvent { get; private set; }
17 public Event<TicketGenerationFailed> TicketGenerationFailedEvent { get; private set; }
18
19 public PurchaseStateMachine()
20 {
21 //указывает, какое поле хранит текущее состояние саги
22 InstanceState(x => x.CurrentState);
23
24 //описывает, как сага может быть создана
25 Initially(
26 //по событию "запрошено бронирование места"
27 When(ReservationRequestedEvent)
28 .Then(ctx =>
29 {
30 //задает начальные значения для полей состояния саги
31 ctx.Saga.RowNumber = ctx.Message.RowNumber;
32 ctx.Saga.SeatNumber = ctx.Message.SeatNumber;
33 ctx.Saga.Amount = Random.Shared.Next(100, 500);
34 })
35 //пересылает сообщение одному из сервисов на выполнение
36 .Send(new Uri("queue:booking-service"), ctx =>
37 new ReserveSeatCommand(ctx.Saga.CorrelationId, ctx.Message.RowNumber, ctx.Message.SeatNumber))
38 //переводит сагу в новое состояние
39 .TransitionTo(ReservationRequested)
40 );
41
42 During(ReservationRequested,
43 When(SeatReservedEvent)
44 .Send(new Uri("queue:payment-service"), ctx =>
45 new ProcessPaymentCommand(ctx.Saga.CorrelationId, ctx.Saga.Amount))
46 .TransitionTo(Reserved)
47 );
48
49 //описывает переход из конкретного состояния
50 During(ReservationRequested,
51 When(SeatReservationFailedEvent)
52 .Then(ctx =>
53 {
54 Console.WriteLine($"Seat reservation failed for Order {ctx.Message.OrderId}");
55 })
56 .TransitionTo(Failed)
57 .Finalize()
58 );
59
60 //сокращено для читаемости. Полный код в репозитории
61 }
62}
Регистрация саги
Чтобы вся механика саги заработала, надо ее зарегистрировать и сконфигурировать. В этом примере сага работает по шине RabbitMQ и сохраняет состояние в PostgreSQL.
1services.AddMassTransit(x =>
2{
3 //настраивает автоматическое именование очередей RabbitMQ в стиле my-favorite-service
4 x.SetKebabCaseEndpointNameFormatter();
5
6 //регистрирует собственно state machine
7 x.AddSagaStateMachine<PurchaseStateMachine, PurchaseState>()
8 //определяет хранение состояния при помощи EF Core в PostgreSQL
9 .EntityFrameworkRepository(r =>
10 {
11 r.ConcurrencyMode = ConcurrencyMode.Optimistic;
12
13 r.AddDbContext<DbContext, TicketDbContext>((provider, builder) =>
14 {
15 builder.UseNpgsql(hostContext.Configuration.GetConnectionString("DefaultConnection"));
16 });
17 });
18
19 //определяет RabbitMQ как транспорт для саги
20 x.UsingRabbitMq((context, cfg) =>
21 {
22 cfg.Host("rabbitmq", "/", h =>
23 {
24 h.Username("guest");
25 h.Password("guest");
26 });
27 //задает конкретную очередь как поставщика сообщений для саги
28 cfg.ReceiveEndpoint("ticket-purchase-saga", e =>
29 {
30 e.ConfigureSaga<PurchaseState>(context);
31 });
32 });
33});
Заключение
В заметке описан самый базовый сценарий использования саги в MassTransit. Когда дело доходит до практики, то всплывает масса нюансов, но к счастью, большинство из них покрывается возможностями библиотеки, надо только копнуть документацию поглубже.
Полный код демо выложен в репозитории.