마이크로서비스 아키텍처의 장기 실행 작업

in #kr-dev2 years ago

체크아웃 프로세스의 마이크로서비스 구현

장바구니 모놀리식 애플리케이션을 다시 설계하고 비즈니스 기능을 기반으로 여러 마이크로서비스로 분해해 보겠습니다 .

그 결과 이제 체크아웃 프로세스를 지원하는 5개의 마이크로서비스가 있습니다.

  • ShoppingCartApi/checkout끝점을 UI 클라이언트에 노출하고 체크아웃 프로세스를 트리거합니다 .
  • StockValidatorService– 모든 품목의 재고 가용성을 확인합니다.
  • TaxCalculatorService– 세금 기반 온라인 항목 및 고객 주소를 계산합니다.
  • PaymentProcessingService– 신용 카드 세부 정보, 항목 및 계산된 세금을 기반으로 결제를 처리합니다.
  • ReceptGeneratorService– 구매 영수증을 생성하여 저장합니다. 또한 고객과의 이메일 커뮤니케이션을 담당합니다.

이 데모의 경우 마이크로 서비스에는 비즈니스 논리가 포함되어 있지 않습니다. 몇 초가 걸릴 수 있는 프로세스 흐름을 시뮬레이션합니다.

마이크로서비스가 장기 실행 작업을 처리하는 방법

일반적인 시나리오에서 는 ShoppingCartApiHTTP를 통한 REST와 같은 동기 패턴을 사용하여 통신했을 것 StockValidatorService입니다. ReceptGeneratorService그러나 이 경우 최종 사용자는 영수증을 생성하거나 오류 응답을 반환할 때까지 응답을 기다려야 했습니다 . 이로 인해 엔드포인트가 요청을 완료하는 데 6초 이상 걸렸을 것입니다. 따라서 이것은 사용자 경험 관점에서 직관적인 디자인이 아닙니다.

여기에서 우리의 마이크로서비스는 장기 실행 체크아웃 프로세스에 RabbitMq 대기열을 활용하여 마이크로서비스 간에 비동기 통신 패턴을 사용하여 통신함으로써 이 문제를 해결합니다.

마이크로서비스 아키텍처

데모 애플리케이션에서 /checkout엔드포인트는 의 상태 코드를 반환합니다 OK(200). ReceptGeneratorService주문 처리 완료 후 최종 사용자에게 주문 상태가 포함된 이메일을 보냅니다 . 그러나 처리가 끝날 때 결과를 사용할 수 있는 다른 끝점 대신 /checkoutHTTP 상태 코드를 반환하도록 끝점을 설계할 수 있습니다. Accepted(202)``OK(200)그런 다음 클라이언트 응용 프로그램은 응답을 사용할 수 있을 때까지 이 끝점을 호출하도록 선택할 수 있습니다.

결제 프로세스 시작

에는 이미 소개한 구현 과 유사한 엔드포인트 ShoppingCartApi가 있습니다. 체크 아웃 프로세스와 관련된 장기 실행 작업을 시작합니다./checkout

이제 CheckoutProcessor클래스가 메서드를 구현하는 ProcessCheckoutAsync()방법을 살펴보겠습니다.

public class CheckoutProcessor : ICheckoutProcessor
{
    public BlockingCollection<CheckoutItem> CheckoutQueue { get; }

    public CheckoutProcessor()
    {
        CheckoutQueue = new BlockingCollection<CheckoutItem>(new ConcurrentQueue<CheckoutItem>());
    }

    public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request)
    {
        var response = new CheckoutResponse
        {
            OrderId = Guid.NewGuid(),
            OrderStatus = OrderStatus.Inprogress,
            Message = "Your order is in progress," 
                         + "you will receive an email with all details."
        };

        var item = new CheckoutItem
        {
           OrderId = response.OrderId,
           Request = request
        };

        CheckoutQueue.Add(item);

        return Task.FromResult(response);
    }
}

모놀리식 구현과 마찬가지로 CheckoutResponse주문 상태 Inprogress, 관련 메시지 및 새 주문 ID가 있는 클래스의 인스턴스를 만듭니다. 최종 사용자는 이 응답을 받습니다.

최종 사용자에게 응답을 반환하기 전에 코드 CheckoutItem는 생성된 주문 ID 및 기타 요청 속성을 사용하여 클래스의 인스턴스를 만듭니다. 이제 차단 컬렉션이 이 인스턴스를 대기열에 넣습니다. 이 클래스 BlockingCollection<CheckoutItem>의 공용 속성으로 노출됩니다 .CheckoutProcessor

CheckoutItem인스턴스는 다른 마이크로서비스와의 통신을 위한 메시지 본문을 나타냅니다 .

public class CheckoutItem
{
   public Guid OrderId { get; set; }

   public CheckoutRequest Request { get; set; }
}

CheckoutBackroundWorker.NET의 클래스에서 상속하고 추가 BackgroundService처리를 위해 메시지를 RabbitMq 메시지 대기열로 보냅니다.

Code Maze 팀에 합류하여 더 멋진 .NET/C# 콘텐츠를 제작하고 보수를 받고 싶습니까? >> 가입하세요! <<

public CheckoutBackgroundWorker(IMessageProducer<CheckoutItem> producer,
                                ICheckoutProcessor checkoutProcessor)
{
    _producer = producer;
    _checkoutProcessor = checkoutProcessor;
           
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await Task.Factory.StartNew(() =>  Process(stoppingToken), TaskCreationOptions.LongRunning);  
}

private void Process(CancellationToken stoppingToken)
{
    foreach (var item in _checkoutProcessor.CheckoutQueue.GetConsumingEnumerable(stoppingToken))
    {
        _producer.SendMessage(item);
    }
}

여기서는 클래스 BlockingCollection.GetConsumingEnumerableCheckoutQueue속성에 the를 사용합니다. 대기열의 각 항목은 클래스 CheckoutProcessor를 사용하여 메시지 대기열에 게시됩니다 .CheckoutItemProducer

끝점 BlockingCollection호출 시 메시지를 직접 게시하는 대신 여기에서 a가 사용되는 이유가 궁금할 수 있습니다 . /checkout대기열에 메시지를 게시하면 호출자에 대한 응답이 차단됩니다. 더 정확하게 말하면 최종 사용자는 코드가 메시지를 대기열에 게시할 때까지 응답을 받지 못합니다. 최종 사용자가 응답을 받는 동안 메시지 게시 작업이 병렬로 발생할 수 있으므로 이 시나리오에서는 이러한 차단이 필요하지 않습니다.

를 사용하여 동일한 작업을 수행할 수도 있습니다 System.Threading.Channel.

클래스 는 추상 클래스와 인터페이스 를 CheckoutItemProducer구현합니다 .ProducerBase``IMessageProducer

마이크로서비스 간 비동기 통신

이 비즈니스 사례는 재고 확인, 지불 처리 등과 같은 체크아웃 프로세스가 비동기식으로 실행될 수 있음을 나타냅니다. 따라서 우리는 RabbitMq 메시지 브로커를 사용하여 마이크로 서비스 간에 비동기식 통신 모드를 사용할 것입니다.

ShoppingCart 웹 API

엔드포인트를 호출하면 /checkout체크아웃 프로세스가 트리거됩니다. 다른 마이크로 서비스와의 추가 통신은 메시지 대기열을 사용하여 이루어집니다.

public class CheckoutItemProducer : ProducerBase<CheckoutItem>
{
    public CheckoutItemProducer(ConnectionFactory connectionFactory,
                                ILogger<RabbitMqClientBase> logger,
                                ILogger<ProducerBase<CheckoutItem>> producerBaseLogger) 
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "stock-validator";
}  

먼저 클래스 를 사용하여 큐 에 메시지를 ShoppingCartApi보냅니다 .CheckoutItem``stock-validator``CheckoutItemProducer

주식 검사기 서비스 기능

는 대기열 StockValidatorService에서 메시지를 소비하고 stock-validator재고 유효성 검사 논리를 실행합니다.

Code Maze 팀에 합류하여 더 멋진 .NET/C# 콘텐츠를 제작하고 보수를 받고 싶습니까? >> 가입하세요! <<

public class StockValidatorConsumer : ConsumerBase, IHostedService
{
    private readonly IMessageProducer<Failure> _failureProducer;
    private readonly IMessageProducer<CheckoutItem> _successProducer;
    private readonly IValidator _validator;
    private readonly ILogger<StockValidatorConsumer> _logger;

    public StockValidatorConsumer(ConnectionFactory connectionFactory, 
                                  ILogger<RabbitMqClientBase> rabbitMqClientBaseLogger,
                                  ILogger<ConsumerBase> consumerBaseLogger,
                                  IMessageProducer<Failure> failureProducer,
                                  IMessageProducer<CheckoutItem> successProducer,
                                  IValidator validator,
                                  ILogger<StockValidatorConsumer> logger)
        : base(connectionFactory, rabbitMqClientBaseLogger, consumerBaseLogger)
    {
        _failureProducer = failureProducer;
        _successProducer = successProducer;
        _validator = validator;
        _logger = logger;
    }

    protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
    {
        try
        {
            var body = Encoding.UTF8.GetString(@event.Body.ToArray());
            var checkoutItem = JsonConvert.DeserializeObject<CheckoutItem>(body);

            if (await _validator.ValidateAsync(checkoutItem.Request.LineItems))
            {
                _successProducer.SendMessage(checkoutItem);
                return;
            }

            var failureMessage = new Failure
            {
                CustomerId = checkoutItem.Request.CustomerId,
                OrderId = checkoutItem.OrderId,
                Message = "Item not available in stock",
                Source = typeof(Program).Assembly.GetName().Name ?? string.Empty
            };

            _failureProducer.SendMessage(failureMessage);
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error while retrieving message from queue.");
        }
        finally
        {
            Channel.BasicAck(@event.DeliveryTag, false);
        }
    }

    protected override string QueueName => "stock-validator";
    ...
    ...
        
}

클래스는 에서 StockValidatorConsumer상속되고 인터페이스 ConsumerBase에 대한 구체적인 구현으로 등록됩니다 .IHostedService

builder.Services.AddHostedService<StockValidatorConsumer>();

builder.Services.AddHostedService<StockValidatorConsumer>();

이렇게 하면 애플리케이션 이 애플리케이션 시작 시 생성자 코드를 StockValidatorService실행합니다 . ConsumerBase생성자 코드에는 메시지 구독 논리가 포함되어 있습니다.

이 데모의 모든 마이크로서비스는 ConsumerBase.

따라서 성공적인 재고 검증의 경우 대기열 에 CheckoutItem메시지를 게시합니다.stock-validation-successful

public class StockValidationSuccessfulProducer : ProducerBase<CheckoutItem>
{        
    public StockValidationSuccessfulProducer(ConnectionFactory connectionFactory,
                                             ILogger<RabbitMqClientBase> logger,
                                             ILogger<ProducerBase<CheckoutItem>> producerBaseLogger) 
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "stock-validation-successful";
}

는 추상 클래스를 StockValidationSuccessfulProducer상속합니다 . ProducerBase이 클래스에는 메시지를 게시하기 위한 논리가 포함되어 있습니다. 이 데모의 모든 마이크로서비스는 ProducerBase클래스를 구현하여 메시지 게시에 대해 동일한 전략을 사용합니다.

그러나 실패하면 Failure메시지를 stock-validation-failure대기열에 게시합니다.

public class Failure
{
    public Guid OrderId { get; set; }
        
    public Guid CustomerId { get; set; }

    public string Message { get; set; }

    public string Source { get; set; }
}

public class StockValidationFailureProducer : ProducerBase<Failure>
{
    public StockValidationFailureProducer(ConnectionFactory connectionFactory, 
                                          ILogger<RabbitMqClientBase> logger, 
                                          ILogger<ProducerBase<Failure>> producerBaseLogger)
            : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "stock-validation-failure";
}

클래스 는 StockValidationFailureProducer실패 메시지 게시를 담당합니다.

Code Maze 팀에 합류하여 더 멋진 .NET/C# 콘텐츠를 제작하고 보수를 받고 싶습니까? >> 가입하세요! <<

세금 계산기 서비스 기능

에는 대기열 메시지 의 소비자 TaxCalculatorService인 백그라운드 작업자 클래스 가 포함되어 있습니다.StockValidationSuccessConsumer``stock-validation-successful

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
        var body = Encoding.UTF8.GetString(@event.Body.ToArray());
        var checkoutItem = JsonConvert.DeserializeObject<CheckoutItem>(body);

        var tax = await _calculator.CalculateTaxAsync(checkoutItem.Request.CustomerId, 
                                                      checkoutItem.Request.LineItems);

        var taxMessage = new Tax
        {
            OrderId = checkoutItem.OrderI
            Request = checkoutItem.Request,
            TaxAmount = tax
        };

        _taxProducer.SendMessage(taxMessage);
               
     }
     catch (Exception ex)
     {
        _logger.LogCritical(ex, "Error while retrieving message from queue.");
     }
     finally
     {
        Channel.BasicAck(@event.DeliveryTag, false);
     }
}
 
protected override string QueueName => "stock-validation-successful";

메시지를 받으면 세금을 계산하고 Tax세금 금액이 포함된 메시지를 tax대기열에 게시합니다.

public class Tax : CheckoutItem
{
   public int TaxAmount { get; set; }
}

public class TaxProducer : ProducerBase<Tax>
{
    public TaxProducer(ConnectionFactory connectionFactory, 
                       ILogger<RabbitMqClientBase> logger, 
                       ILogger<ProducerBase<Tax>> producerBaseLogger) 
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "tax";       
}

여기에서 Tax메시지 클래스는 에서 파생되며 CheckoutItem추가 속성을 갖습니다 TaxAmount. TaxProducer클래스는 이 메시지를 대기열에 게시하는 일을 담당합니다 .

지불 처리 서비스 기능

백그라운드 작업자 클래스 를 사용하여 큐 PaymentProcessingService에서 메시지를 수신합니다 . 메시지가 도착하면 메시지를 소비하고 총 금액을 계산한 다음 지불 처리를 시도합니다.tax``TaxConsumer

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
        var body = Encoding.UTF8.GetString(@event.Body.ToArray());
        var taxMessage = JsonConvert.DeserializeObject<Tax>(body);
               
        var amount = taxMessage.Request.LineItems.Sum(li => li.Quantity * li.Price) + taxMessage.TaxAmount;
                
        if (await _paymentProcessor.ProcessAsync(taxMessage.Request.CustomerId, 
                                                 taxMessage.Request.PaymentInfo, 
                                                 amount))
        {
            var paymentSuccessMessage = new PaymentSuccess
            {
                 OrderId = taxMessage.OrderId,
                 Request = taxMessage.Request,
                 Amount = amount
            };
            _successProducer.SendMessage(paymentSuccessMessage);
            return;
         }

         var failureMessage = new Failure
         {
            CustomerId = taxMessage.Request.CustomerId,
            OrderId = taxMessage.OrderId,
            Message = "Payment failure",
            Source = typeof(Program).Assembly.GetName().Name ?? string.Empty
         };

         _failureProducer.SendMessage(failureMessage);
     }
     catch (Exception ex)
     {
         _logger.LogCritical(ex, "Error while retrieving message from queue.");
     }
     finally
     {
          Channel.BasicAck(@event.DeliveryTag, false);
     }
}
 
protected override string QueueName => "tax";

PaymentSuccess이제 결제가 성공적으로 처리되면 금액이 포함된 메시지를 payment-successful대기열 에 게시합니다 .

public class PaymentSuccess : CheckoutItem
{
    public int Amount { get; set; }
}

public class PaymentSuccessfulProducer : ProducerBase<PaymentSuccess>
{       
    public PaymentSuccessfulProducer(ConnectionFactory connectionFactory,
                                     ILogger<RabbitMqClientBase> logger,
                                     ILogger<ProducerBase<PaymentSuccess>> producerBaseLogger
            : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "payment-successful";
}

메시지 클래스 는 PaymentSuccess에서 상속되며 CheckoutItem추가 속성을 갖습니다 Amount. PaymentSuccessfulProducer할당된 대기열에 결제 성공 메시지를 게시합니다 .

그러나 결제 실패 시 Failure메시지를 payment-failure대기열에 게시합니다.

public class PaymentFailureProducer : ProducerBase<Failure>
{
    public PaymentFailureProducer(ConnectionFactory connectionFactory,
                                  ILogger<RabbitMqClientBase> logger,
                                  ILogger<ProducerBase<Failure>> producerBaseLogger)
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "payment-failure";
}

클래스 는 PaymentFailureProducer실패 메시지를 payment-failure대기열에 게시합니다.

영수증 생성기 서비스 기능

ReceiptGeneratorService개의 대기열에 대한 소비자입니다. 그들은 stock-validation-failure, payment-failure, 및 payment-successful:

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
        var body = Encoding.UTF8.GetString(@event.Body.ToArray());
        var failureMessage = JsonConvert.DeserializeObject<Failure>(body);

        await _generator.ProcessFailuresAsync(failureMessage.CustomerId, 
                                              failureMessage.OrderId, 
                                              failureMessage.Message);               
    }
    catch (Exception ex)
    {
       _logger.LogCritical(ex, "Error while retrieving message from queue.");
    }
    finally
    {
       Channel.BasicAck(@event.DeliveryTag, false);
    }
}

따라서 이것은 PaymentFailureConsumerStockValidationFailureConsumer클래스에 의해 구현된 실패 시나리오에 대한 공통 코드입니다. 이 경우 클래스의 ProcessFailuresAsync()메서드 ReceiptGenerator가 실행됩니다. 이 방법은 실패 이유를 저장하고 사용자에게 주문 상태를 알리는 이메일을 보내는 것을 시뮬레이트합니다.

PaymentSuccessfulConsumer이제 클래스 를 살펴보겠습니다 .

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
         var body = Encoding.UTF8.GetString(@event.Body.ToArray());
         var successMessage = JsonConvert.DeserializeObject<PaymentSuccess>(body);

         await _generator.GenerateAsync(successMessage.Request.CustomerId, 
                                        successMessage.OrderId, 
                                        successMessage.Amount);

    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Error while retrieving message from queue.");
    }
    finally
    {
        Channel.BasicAck(@event.DeliveryTag, false);
    }
}

protected override string QueueName => "payment-successful";

마지막으로 결제가 성공 하면 클래스 GenerateAsync()에서 메서드를 실행합니다. ReciptGenerator이 방법은 영수증 생성 및 저장을 시뮬레이션하고 필요한 세부 정보가 포함된 이메일을 사용자에게 보냅니다.

마이크로 서비스의 생산자 및 소비자 클래스는 대부분의 RabbitMq 상호 작용을 추상화하는 각각의 기본 클래스에서 파생됩니다. 이제 자세히 살펴보겠습니다.

비동기 메시지 처리에 RabbitMq 사용

RabbitMQ.Client우리는 RabbitMq와의 인터페이스를 위해 NuGet 패키지를 사용하고 있습니다.

PM> Install-Package RabbitMQ.Client -Version 6.2.4

PM> Install-Package RabbitMQ.Client -Version 6.2.4

모든 마이크로 서비스는 RabbitMq 생산자 및 소비자 코드를 구현합니다. 코드 중복을 피하기 위해 몇 가지 기본 클래스를 만들었습니다.

Code Maze 팀에 합류하여 더 멋진 .NET/C# 콘텐츠를 제작하고 보수를 받고 싶습니까? >> 가입하세요! <<

RabbitMqClientBase클래스는 RabbitMq 서버에 연결하기 위한 논리를 캡슐화합니다.

protected RabbitMqClientBase(ConnectionFactory connectionFactory, 
                             ILogger<RabbitMqClientBase> logger)
{
    _connectionFactory = connectionFactor
    _logger = logger
    Connect();
}

private void Connect()
{
    if (_connection == null || _connection.IsOpen == false)
    {
         _connection = _connectionFactory.CreateConnection();
    }

    if (Channel == null || Channel.IsOpen == false)
    {
         Channel = _connection.CreateModel();                
         Channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false);                
    }
}

생성자는 메서드를 호출하고 Connect()연결이 이미 설정되었는지 확인합니다. 그렇지 않은 경우 연결을 만듭니다. 이 방법은 또한 RabbitMq 서버에 대기열이 아직 없는 경우 대기열을 생성합니다.

DI 컨테이너에 싱글톤으로 등록하고 ConnectionFactory생성자를 통해 RabbitMqClientBase클래스에 주입합니다.

builder.Services.AddSingleton(sp =>
{
    var uri = new Uri("URL FOR RABBITMQ SERVER");
    return new ConnectionFactory
    {
        Uri = uri
    };
});

RabbitMq 서버/클러스터 설치에 따라 자리 표시자 RabbitMq 서버 URL을 원래 엔드포인트로 바꿉니다.

ProducerBase 코드

ProducerBase이제 모든 메시지 생성자가 상속하는 클래스를 구현하는 방법을 알아보겠습니다 .

public interface IMessageProducer<in T>
{
    void SendMessage(T message);
}

public abstract class ProducerBase<T> : RabbitMqClientBase, IMessageProducer<T>
{
    private readonly ILogger<ProducerBase<T>> _logger;

    protected ProducerBase(ConnectionFactory connectionFactory,
                           ILogger<RabbitMqClientBase> logger,
                           ILogger<ProducerBase<T>> producerBaseLogger) 
       : base(connectionFactory, logger)
    {
        _logger = producerBaseLogger;
    }

    public virtual void SendMessage(T message)
    {
        try
        {
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
            var properties = Channel.CreateBasicProperties();                
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
            Channel.BasicPublish(exchange: "", routingKey: QueueName, body: body, basicProperties: properties);
         }
         catch (Exception ex)
         {
            _logger.LogCritical(ex, "Error while publishing");
         }
    }
}

클래스 는 ProducerBase클래스에서 상속됩니다 RabbitMqClientBase. 애플리케이션이 메시지를 보내기 전에 RabbitMQ 서버와의 연결을 설정하도록 합니다.

이 클래스는 또한 인터페이스 SendMessage()에서 메서드를 구현합니다. IMessageProducer이 메서드는 메시지를 byte[]POCO의 메시지로 변환하고 메시지에 대한 일부 메타데이터 속성을 설정합니다. 그런 다음 클래스 의 QueueName속성이 나타내는 대기열에 메시지를 게시합니다 .RabbitMqClientBase

이전 섹션에서 볼 수 있듯이 마이크로 서비스 프로젝트의 생산자 클래스는 클래스에서 상속 ProducerBase하고 속성을 재정 QueueName의하여 원하는 대기열에 게시하기만 하면 됩니다.

Code Maze 팀에 합류하여 더 멋진 .NET/C# 콘텐츠를 제작하고 보수를 받고 싶습니까? >> 가입하세요! <<

ConsumerBase 코드

ConsumerBase모든 메시지 소비자가 상속 하는 클래스를 살펴보겠습니다 .

public abstract class ConsumerBase : RabbitMqClientBase
{ 
    protected ConsumerBase(ConnectionFactory connectionFactory,
                           ILogger<RabbitMqClientBase> logger,
                           ILogger<ConsumerBase> consumerLogger) 
       : base(connectionFactory, logger)
    {            
          
       try
       {
           var consumer = new AsyncEventingBasicConsumer(Channel);
           consumer.Received += OnMessageReceived;
           Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        }
        catch (Exception ex)
        {
           consumerLogger.LogCritical(ex, "Error while consuming message");
        }
   }
}
  

클래스도 클래스 에서 ConsumerBase상속됩니다 RabbitMqClientBase. 생성자는 인스턴스를 인스턴스화 하고 메서드를 메시지 도착에 대한 콜백으로 AsyncEventingBasicConsumer등록합니다 . OnMessageReceived()OnMessageReceived()방법은 여기에서 추상으로 정의됩니다. 마지막으로 from BasicConsume()이 나타내는 대기열의 메서드를 호출합니다 .QueueName``RabbitMqClientBase

이전 섹션에서 볼 수 있듯이. 개별 프로젝트의 소비자 클래스는 이 클래스에서 상속되며 비즈니스 논리에 ConsumerBase따라 메서드를 재정의합니다 . OnMessageReceived()또한 QueueName의도한 대기열에서 사용할 속성을 재정의합니다.

IHostedService소비자가 from .NET을 구현하여 응용 프로그램 시작 시 생성자에서 코드를 실행하는 것을 이미 살펴보았습니다 . 이것은 또한 응용 프로그램이 중지될 때 클래스 Dispose()에서 메서드를 호출하여 RabbitMq 연결이 정상적으로 닫히도록 합니다.RabbitMqClientBase

public void Dispose()
{
    try
    {
        Channel?.Close();
        Channel?.Dispose();
        Channel = null;

        _connection?.Close();
        _connection?.Dispose();
        _connection = null;
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Cannot dispose RabbitMQ channel or connection");
    }
}

모든 소비자는 인터페이스 의 StopAsync()메서드에서 이 메서드를 호출합니다 .IHostedService

사용자 경험

이제 이전 기사의 동일한 요청을 엔드 /checkout포인트 로 보내 보겠습니다 . 이것은 두 개의 카트 항목의 결제를 나타냅니다.

웹 API 엔드포인트는 성공적인 응답을 반환합니다.

{
  "orderId": "82db8a24-57d0-469d-a4e2-33525f4a8679",
  "orderStatus": "Inprogress",
  "message": "Your order is in progress, you will receive an email with all details."
}

Request duration
128 MS

이제 엔드포인트가 최종 사용자에게 응답을 반환하는 데 훨씬 적은 시간이 걸린다는 것을 알 수 있습니다. 나머지 체크아웃 처리는 사용자에게 전송된 응답과 관계없이 각 책임 마이크로서비스에서 발생합니다. 이것은 개별 마이크로 서비스의 콘솔 로그 항목에서 분명합니다.

Code Maze 팀에 합류하여 더 멋진 .NET/C# 콘텐츠를 제작하고 보수를 받고 싶습니까? >> 가입하세요! <<

info: Microservice.StockValidatorService.Validator[0]
      Stock is validated.

info: Microservice.TaxCalculatorService.Calculator[0]
      Customer lookup completed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6.
info: Microservice.TaxCalculatorService.Calculator[0]
      Tax value calculated for customer is 36.

info: Microservice.PaymentProcessingService.PaymentProcessor[0]
      Payment of 196 has been processed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6

info: Microservice.ReceiptGeneratorService.ReceiptGenerator[0]
      Receipt Generated and Order Status persisted in DB.
info: Microservice.ReceiptGeneratorService.ReceiptGenerator[0]
      Email is sent with Order Status and receipt.

결론

이 기사에서 사용된 데모 마이크로 서비스는 간결함을 위해 몇 가지 모범 사례와 비즈니스 논리를 무시하고 주로 장기 실행 작업 실행에 중점을 둡니다. 그러나 여기서는 엔터프라이즈 수준 애플리케이션과 관련된 견고성을 염두에 두고 대부분의 코드를 구조화했습니다.

출처 : https://code-maze.com/long-running-tasks-microservices/

Sort:  

[광고] STEEM 개발자 커뮤니티에 참여 하시면, 다양한 혜택을 받을 수 있습니다.