DEV Community

Cover image for Implementando o padrão Outbox Pattern com MySQL e usando Brighter
Rafael Andrade
Rafael Andrade

Posted on

Implementando o padrão Outbox Pattern com MySQL e usando Brighter

Introdução

Este guia demonstra como implementar o Padrão de Caixa de Saída (Outbox Pattern) com MySQL e .NET 8 usando a biblioteca Brighter para garantir consistência transacional entre atualizações de banco de dados e publicação de mensagens.

Projeto

O objetivo é processar um comando CreateNewOrder que publique dois eventos (OrderPlaced, OrderPaid) somente se a transação for bem-sucedida. Se ocorrer um erro (ex.: violação de regra de negócio), ambas as alterações no banco de dados e publicações de mensagens serão revertidas.

Requisitos

Mensagens

Para este projeto, usaremos estas 3 mensagens: CreateNewOrder, OrderPlaced e OrderPaid

public class CreateNewOrder() : Command(Guid.NewGuid())
{
    public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}
public class OrderPaid() : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Mapeadores de Mensagens

Como apenas os eventos OrderPlaced e OrderPaid são publicados no RabbitMQ, precisamos implementar mapeadores para eles usando serialização JSON

public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
    public Message MapToMessage(OrderPlaced request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "order-placed";
        header.MessageType = MessageType.MT_EVENT;
        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }
    public OrderPlaced MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
    }
}

public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
    public Message MapToMessage(OrderPaid request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "order-paid";
        header.MessageType = MessageType.MT_EVENT;
        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }
    public OrderPaid MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
    }
}
Enter fullscreen mode Exit fullscreen mode

Manipuladores de Requisições

Para OrderPlaced e OrderPaid vamos registrar logs da mensagem recebida.

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
    public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
        return base.HandleAsync(command, cancellationToken);
    }
}
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
    public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} paid", command.OrderId);
        return base.HandleAsync(command, cancellationToken);
    }
}
Enter fullscreen mode Exit fullscreen mode

Criar Novo Pedido

O manipulador CreateNewOrder vai esperar 10ms para emular um processo, depois publica o OrderPlaced, se o valor for divisível por 3 lança uma exceção (simulando um erro de negócio), caso contrário publica OrderPaid.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    IUnitOfWork unitOfWork,
    ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
    public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
    {
        await unitOfWork.BeginTransactionAsync(cancellationToken);
        try
        {
            string id = Guid.NewGuid().ToString();
            logger.LogInformation("Creating a new order: {OrderId}", id);
            await Task.Delay(10, cancellationToken); // emulando um processo
            _ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken);
            if (command.Value % 3 == 0)
            {
                throw new InvalidOperationException("invalid value");
            }
            _ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken);
            await unitOfWork.CommitAsync(cancellationToken);
            return await base.HandleAsync(command, cancellationToken);
        }
        catch
        {
            logger.LogError("Invalid data");
            await unitOfWork.RollbackAsync(cancellationToken);
            throw;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Entendimento Chave:

  • DepositPostAsync armazena mensagens na caixa de saída dentro da mesma transação dos dados de negócio.

  • Se ocorrer uma exceção (ex.: InvalidOperationException), a transação é revertida, garantindo que não haja mensagens órfãs.

Configurando MySQL

Para integrar o Padrão de Caixa de Saída com MySQL, primeiro garanta que a tabela outbox_messages exista.

1. Esquema da Tabela SQL

CREATE TABLE IF NOT EXISTS `outbox_messages`(
  `MessageId` CHAR(36) NOT NULL,
  `Topic` VARCHAR(255) NOT NULL,
  `MessageType` VARCHAR(32) NOT NULL,
  `Timestamp` TIMESTAMP(3) NOT NULL,
  `CorrelationId` CHAR(36) NULL,
  `ReplyTo` VARCHAR(255) NULL,
  `ContentType` VARCHAR(128) NULL,
  `Dispatched` TIMESTAMP(3) NULL,
  `HeaderBag` TEXT NOT NULL,
  `Body` TEXT NOT NULL,
  `Created` TIMESTAMP(3) NOT NULL DEFAULT NOW(3),
  `CreatedID` INT(11) NOT NULL AUTO_INCREMENT,
   UNIQUE(`CreatedID`),
   PRIMARY KEY (`MessageId`)
);
Enter fullscreen mode Exit fullscreen mode

2. Configuração de Injeção de Dependência

Registre a caixa de saída e transação.

services
    .AddServiceActivator(opt => { /* Configuração de assinatura (ver artigo anterior) */ })
    .UseMySqlOutbox(new MySqlConfiguration(ConnectionString, "outbox_messages"), typeof(MySqlConnectionProvider), ServiceLifetime.Scoped))
    .UseMySqTransactionConnectionProvider(typeof(MySqlConnectionProvider))
    .UseOutboxSweeper(opt => opt.BatchSize = 10);
Enter fullscreen mode Exit fullscreen mode

Por Que Funciona:

  • UseMySqlOutbox vincula a caixa de saída ao SQL Server.
  • UseOutboxSweeper configura a verificação em segundo plano para mensagens não entregues.

3. Gestão de Transações

Para garantir atomicidade entre lógica de negócio e publicação de mensagens na Brighter, implemente IMySqlTransactionConnectionProvider e IUnitOfWork para compartilhar o contexto de transação. Isso garante que mensagens só sejam armazenadas na caixa de saída se a transação do banco de dados for confirmada com sucesso.

a. SqlConnectionProvider
public class MySqlConnectionProvider(MySqlUnitOfWork sqlConnection) : IMySqlTransactionConnectionProvider
{
    private readonly MySqlUnitOfWork _sqlConnection = sqlConnection;
    public MySqlConnection GetConnection()
    {
        return _sqlConnection.Connection;
    }
    public Task<MySqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_sqlConnection.Connection);
    }
    public MySqlTransaction? GetTransaction()
    {
        return _sqlConnection.Transaction;
    }
    public bool HasOpenTransaction => _sqlConnection.Transaction != null;
    public bool IsSharedConnection => true;
}
Enter fullscreen mode Exit fullscreen mode
b. Unit of Work

E finalmente precisamos criar uma nova interface chamada IUnitOfWork

public interface IUnitOfWork
{
    Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable);
    Task CommitAsync(CancellationToken cancellationToken);
    Task RollbackAsync(CancellationToken cancellationToken);
}
Enter fullscreen mode Exit fullscreen mode
c. Implementação do SqlUnitOfWork
public class MySqlUnitOfWork : IUnitOfWork
{
    public MySqlUnitOfWork(MySqlConfiguration configuration)
    {
        Connection = new(configuration.ConnectionString);
        Connection.Open();
    }
    public MySqlConnection Connection { get; }
    public MySqlTransaction? Transaction { get; private set; }
    public async Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable)
    {
        if (Transaction == null)
        {
            Transaction = await Connection.BeginTransactionAsync(isolationLevel);
        }
    }
    public async Task CommitAsync(CancellationToken cancellationToken)
    {
        if (Transaction != null)
        {
            await Transaction.CommitAsync(cancellationToken);
        }
    }
    public async Task RollbackAsync(CancellationToken cancellationToken)
    {
        if (Transaction != null)
        {
            await Transaction.RollbackAsync(cancellationToken);
        }
    }
    public Task<MySqlCommand> CreateSqlCommandAsync(string sql, MySqlParameter[] parameters, CancellationToken cancellationToken)
    {
        var command = Connection.CreateCommand();
        if (Transaction != null)
        {
            command.Transaction = Transaction;
        }
        command.CommandText = sql;
        if (parameters.Length > 0)
        {
            command.Parameters.AddRange(parameters);
        }
        return Task.FromResult(command);
    }
}
Enter fullscreen mode Exit fullscreen mode
d. Registrar Serviços na Injeção de Dependência
services
    .AddScoped<MySqlUnitOfWork, MySqlUnitOfWork>()
    .TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>());
Enter fullscreen mode Exit fullscreen mode

Conclusão

Ao implementar o Padrão de Caixa de Saída com Brighter e MySQL, demonstramos como alcançar consistência transacional entre atualizações de banco de dados e publicação de mensagens. Essa abordagem garante que:

  1. Mensagens são publicadas somente se a transação for confirmada com sucesso
    • Usando DepositPostAsync, mensagens como OrderPlaced e OrderPaid são armazenadas na tabela outbox_messages dentro da mesma transação dos dados de negócio. Se o manipulador falhar (ex.: erro simulado), a transação é revertida e nenhuma mensagem órfã é enviada.
    • O IMySqlTransactionConnectionProvider da Brighter garante que atualizações de banco de dados e depósitos de mensagens compartilhem a mesma transação.
  2. Tolerância a Falhas via Outbox Sweeper
    • O UseOutboxSweeper verifica periodicamente mensagens não entregues e tenta reenviá-las até que sejam reconhecidas pelo RabbitMQ. Isso desacopla a publicação de mensagens da execução do manipulador, garantindo confiabilidade mesmo que o broker esteja temporariamente indisponível.
  3. Arquitetura Desacoplada
    • Aplicações focam em transações locais, enquanto a Brighter trata a entrega de mensagens de forma assíncrona. Isso evita acoplamento com a infraestrutura de mensagens e simplifica a escalabilidade. Esta implementação mostra como a Brighter abstrai complexidades, permitindo que desenvolvedores se concentrem na lógica de negócio enquanto garantem confiabilidade em sistemas distribuídos. Para uso em produção, combine esse padrão com ferramentas de monitoramento (ex.: Prometheus), filas de mensagens mortas (DLQs) para tratar mensagens corrompidas e adicione índices na tabela de caixa de saída nas colunas Dispatched e Timestamp.

Referência

Top comments (0)