Паттерн Fan-Out/Fan-In с использованием MassTransit
В предыдущем примере реализации саги оркестратор дожидается ответа от каждого сервиса прежде чем отправить команду следующему. Это надежный подход, который позволяет четко контролировать процесс выполнения, в результате все операции выполняются строго последовательно. Однако иногда результаты шага не требуются на следующем, а значит шаги можно запускать параллельно и тем самым сильно сокращать общее время выполнения операции.
Для таких ситуаций в распреденной архитектуре прижился термин fan-out fan-in, то есть поток выполнения сначала разветвляется, а потом вновь соединяется в одну нить.
Применение Fan-Out/Fan-In
Если применить Fan-Out/Fan-In к исходной реализации саги, то все команды и события, пересылаемые между сервисами, остаются прежними, однако нам не требуется отдельное состояние на каждый шаг, выполняемый в вызываемом сервисе, достаточно начального состояния и двух конечных
1public class PurchaseStateMachine : MassTransitStateMachine<PurchaseState>
2{
3 public State ReservationRequested { get; private set; }
4 public State Failed { get; private set; }
5 public State Completed { get; private set; }
6 //...
7}
Мы по-прежнему хотим выполнять компенсационные действия как и в последовательной реализации саги, а значит нам надо запоминать, какие из параллельных операций выполнились и с каким статусом, поэтому в хранимое состояние саги добавляем новые поля:
1public class PurchaseState : SagaStateMachineInstance
2{
3 //...
4 public StepResult PaymentStepResult { get; set; }
5 public StepResult BookingStepResult { get; set; }
6 public StepResult TicketGenerationStepRsult { get; set; }
7 //...
8}
где перечисление StepResult
имеет следующий вид:
1public enum StepResult { Pending, Success, Failed }
Теперь когда оркестратор получает запрос на проведение операции, он отправляет сразу три команды и переходит в следующее состояние:
1Initially(
2 When(ReservationRequestedEvent)
3 .Then(ctx =>
4 {
5 ctx.Saga.RowNumber = ctx.Message.RowNumber;
6 ctx.Saga.SeatNumber = ctx.Message.SeatNumber;
7 ctx.Saga.Amount = Random.Shared.Next(100, 500);
8 })
9 .Send(new Uri("queue:booking-service"), ctx =>
10 new ReserveSeatCommand(ctx.Saga.CorrelationId, ctx.Message.RowNumber, ctx.Message.SeatNumber))
11 .Send(new Uri("queue:payment-service"), ctx =>
12 new ProcessPaymentCommand(ctx.Saga.CorrelationId, ctx.Saga.Amount))
13 .Send(new Uri("queue:document-service"), ctx =>
14 new GenerateTicketCommand(ctx.Saga.CorrelationId))
15 .TransitionTo(ReservationRequested)
16);
Когда же приходят ответы от вызываемых сервисов, то нужно заполнить результат и проверить, все ли сервисы отработали. Если да, то надо перевести сагу в финальное состояние:
1During(ReservationRequested,
2 When(SeatReservedEvent)
3 .Then(ctx => ctx.Saga.BookingStepResult = StepResult.Success)
4 .ThenAsync(CheckIfAllCompletedOrFailed),
5 When(PaymentProcessedEvent)
6 .Then(ctx => ctx.Saga.PaymentStepResult = StepResult.Success)
7 .ThenAsync(CheckIfAllCompletedOrFailed),
8 When(TicketGeneratedEvent)
9 .Then(ctx => ctx.Saga.TicketGenerationStepRsult = StepResult.Success)
10 .ThenAsync(CheckIfAllCompletedOrFailed),
11 When(SeatReservationFailedEvent)
12 .Then(ctx => ctx.Saga.BookingStepResult = StepResult.Failed)
13 .ThenAsync(CheckIfAllCompletedOrFailed),
14 When(PaymentFailedEvent)
15 .Then(ctx => ctx.Saga.PaymentStepResult = StepResult.Failed)
16 .ThenAsync(CheckIfAllCompletedOrFailed),
17 When(TicketGenerationFailedEvent)
18 .Then(ctx => ctx.Saga.TicketGenerationStepRsult = StepResult.Failed)
19 .ThenAsync(CheckIfAllCompletedOrFailed)
20);
Метод CheckIfAllCompletedOrFailed
как раз и занимается проверкой результатов и переводом в финальное состояние:
1private async Task CheckIfAllCompletedOrFailed(BehaviorContext<PurchaseState> context)
2{
3 var saga = context.Saga;
4 if (saga.AllStepsAreCompleted)
5 {
6 if (saga.AllStepsAreSuccessful)
7 {
8 saga.CurrentState = Completed.Name;
9 Console.WriteLine($"Order {saga.CorrelationId} was processed successfully.");
10 }
11 else
12 {
13 await PerformCompensations(context);
14 saga.CurrentState = Failed.Name;
15 Console.WriteLine($"Order {saga.CorrelationId} processing failed.");
16 }
17 }
18}
Компенсацию проводим только там, где сервис отработал и компенсация поддерживается, в нашем случае это PaymentService и BookingService:
1private async Task PerformCompensations(BehaviorContext<PurchaseState> context)
2{
3 var saga = context.Saga;
4
5 if (saga.PaymentStepResult == StepResult.Success)
6 await context.Send(new Uri("queue:payment-service"), new RefundPaymentCommand(saga.CorrelationId));
7
8 if (saga.BookingStepResult == StepResult.Success)
9 await context.Send(new Uri("queue:booking-service"), new ReleaseSeatCommand(saga.CorrelationId));
10}
Все остальное можно оставить неизменным, и этого достаточно, чтобы работа выполнялась параллельно.
Composite Event
К слову MassTransit имеет интересную конструкцию Composite Event
, которая позволяет выполнить действие, когда получены несколько указанных событий. Например:
1CompositeEvent(() => OrderReady, x => x.ReadyEventStatus, SubmitOrder, OrderAccepted);
Этот код генерирует событие OrderReady
в тот момент, когда получены все события из списка: SubmitOrder
, OrderAccepted
. Порядок событий не имеет значения, OrderReady
будет порожден, когда придет последнее из событий. Для хранения информации о том, какие события уже получены, используется поле, указанное вторым параметром - ReadyEventStatus
, это integer поле, которое используется как битовая маска.
Это удобная конструкция, и возникает соблазн использовать ее для реализации Fan-Out/Fan-In, однако этот подход позволяет реагировать только на события об успехе от вызываемых сервисов, а значит выполнить компенсации будет проблематично.
Заключение
Fan-Out/Fan-In - отличная вещь, которая позволяет сократить общее время выполнения саги, запустив некоторые шаги параллельно. Полный код демо в репозитории.