Паттерн Fan-Out/Fan-In с использованием MassTransit

В предыдущем примере реализации саги оркестратор дожидается ответа от каждого сервиса прежде чем отправить команду следующему. Это надежный подход, который позволяет четко контролировать процесс выполнения, в результате все операции выполняются строго последовательно. Однако иногда результаты шага не требуются на следующем, а значит шаги можно запускать параллельно и тем самым сильно сокращать общее время выполнения операции.

Для таких ситуаций в распреденной архитектуре прижился термин fan-out fan-in, то есть поток выполнения сначала разветвляется, а потом вновь соединяется в одну нить.

Применение Fan-Out/Fan-In

Если применить Fan-Out/Fan-In к исходной реализации саги, то все команды и события, пересылаемые между сервисами, остаются прежними, однако нам не требуется отдельное состояние на каждый шаг, выполняемый в вызываемом сервисе, достаточно начального состояния и двух конечных

1public class PurchaseStateMachine : MassTransitStateMachine<PurchaseState>
2{
3    public State ReservationRequested { get; private set; }
4    public State Failed { get; private set; }
5    public State Completed { get; private set; }
6    //...
7}

Мы по-прежнему хотим выполнять компенсационные действия как и в последовательной реализации саги, а значит нам надо запоминать, какие из параллельных операций выполнились и с каким статусом, поэтому в хранимое состояние саги добавляем новые поля:

1public class PurchaseState : SagaStateMachineInstance
2{
3    //...
4    public StepResult PaymentStepResult { get; set; }
5    public StepResult BookingStepResult { get; set; }
6    public StepResult TicketGenerationStepRsult { get; set; }
7    //...
8}

где перечисление StepResult имеет следующий вид:

1public enum StepResult { Pending, Success, Failed }

Теперь когда оркестратор получает запрос на проведение операции, он отправляет сразу три команды и переходит в следующее состояние:

 1Initially(
 2    When(ReservationRequestedEvent)
 3        .Then(ctx =>
 4        {
 5            ctx.Saga.RowNumber = ctx.Message.RowNumber;
 6            ctx.Saga.SeatNumber = ctx.Message.SeatNumber;
 7            ctx.Saga.Amount = Random.Shared.Next(100, 500);
 8        })
 9        .Send(new Uri("queue:booking-service"), ctx =>
10            new ReserveSeatCommand(ctx.Saga.CorrelationId, ctx.Message.RowNumber, ctx.Message.SeatNumber))
11        .Send(new Uri("queue:payment-service"), ctx =>
12            new ProcessPaymentCommand(ctx.Saga.CorrelationId, ctx.Saga.Amount))
13        .Send(new Uri("queue:document-service"), ctx =>
14            new GenerateTicketCommand(ctx.Saga.CorrelationId))
15        .TransitionTo(ReservationRequested)
16);

Когда же приходят ответы от вызываемых сервисов, то нужно заполнить результат и проверить, все ли сервисы отработали. Если да, то надо перевести сагу в финальное состояние:

 1During(ReservationRequested,
 2    When(SeatReservedEvent)
 3        .Then(ctx => ctx.Saga.BookingStepResult = StepResult.Success)
 4        .ThenAsync(CheckIfAllCompletedOrFailed),
 5    When(PaymentProcessedEvent)
 6        .Then(ctx => ctx.Saga.PaymentStepResult = StepResult.Success)
 7        .ThenAsync(CheckIfAllCompletedOrFailed),
 8    When(TicketGeneratedEvent)
 9        .Then(ctx => ctx.Saga.TicketGenerationStepRsult = StepResult.Success)
10        .ThenAsync(CheckIfAllCompletedOrFailed),
11    When(SeatReservationFailedEvent)
12        .Then(ctx => ctx.Saga.BookingStepResult = StepResult.Failed)
13        .ThenAsync(CheckIfAllCompletedOrFailed),
14    When(PaymentFailedEvent)
15        .Then(ctx => ctx.Saga.PaymentStepResult = StepResult.Failed)
16        .ThenAsync(CheckIfAllCompletedOrFailed),
17    When(TicketGenerationFailedEvent)
18        .Then(ctx => ctx.Saga.TicketGenerationStepRsult = StepResult.Failed)
19        .ThenAsync(CheckIfAllCompletedOrFailed)
20);

Метод CheckIfAllCompletedOrFailed как раз и занимается проверкой результатов и переводом в финальное состояние:

 1private async Task CheckIfAllCompletedOrFailed(BehaviorContext<PurchaseState> context)
 2{
 3    var saga = context.Saga;
 4    if (saga.AllStepsAreCompleted)
 5    {
 6        if (saga.AllStepsAreSuccessful)
 7        {
 8            saga.CurrentState = Completed.Name;
 9            Console.WriteLine($"Order {saga.CorrelationId} was processed successfully.");
10        }
11        else
12        {
13            await PerformCompensations(context);
14            saga.CurrentState = Failed.Name;
15            Console.WriteLine($"Order {saga.CorrelationId} processing failed.");
16        }
17    }
18}

Компенсацию проводим только там, где сервис отработал и компенсация поддерживается, в нашем случае это PaymentService и BookingService:

 1private async Task PerformCompensations(BehaviorContext<PurchaseState> context)
 2{
 3    var saga = context.Saga;
 4
 5    if (saga.PaymentStepResult == StepResult.Success)
 6        await context.Send(new Uri("queue:payment-service"), new RefundPaymentCommand(saga.CorrelationId));
 7
 8    if (saga.BookingStepResult == StepResult.Success)
 9        await context.Send(new Uri("queue:booking-service"), new ReleaseSeatCommand(saga.CorrelationId));
10}

Все остальное можно оставить неизменным, и этого достаточно, чтобы работа выполнялась параллельно.

Composite Event

К слову MassTransit имеет интересную конструкцию Composite Event, которая позволяет выполнить действие, когда получены несколько указанных событий. Например:

1CompositeEvent(() => OrderReady, x => x.ReadyEventStatus, SubmitOrder, OrderAccepted);

Этот код генерирует событие OrderReady в тот момент, когда получены все события из списка: SubmitOrder, OrderAccepted. Порядок событий не имеет значения, OrderReady будет порожден, когда придет последнее из событий. Для хранения информации о том, какие события уже получены, используется поле, указанное вторым параметром - ReadyEventStatus, это integer поле, которое используется как битовая маска.

Это удобная конструкция, и возникает соблазн использовать ее для реализации Fan-Out/Fan-In, однако этот подход позволяет реагировать только на события об успехе от вызываемых сервисов, а значит выполнить компенсации будет проблематично.

Заключение

Fan-Out/Fan-In - отличная вещь, которая позволяет сократить общее время выполнения саги, запустив некоторые шаги параллельно. Полный код демо в репозитории.