Introdução
Este guia demonstra como implementar o Padrão Outbox (Caixa de Saída) com SQLite e .NET 8 usando a biblioteca Brighter, garantindo consistência transacional entre atualizações no banco de dados e publicação de mensagens.
Projeto
O objetivo é processar um comando CreateNewOrder
que publique dois eventos (OrderPlaced
, OrderPaid
) somente se a transação for bem-sucedida. Se ocorrer um erro (por exemplo, violação de regra de negócio), tanto as alterações no banco de dados quanto as publicações de mensagens serão revertidas.
Requisitos
- .NET 8+
- Podman (ou Docker) para executar containers locais:
- RabbitMQ
- Conhecimento sobre Brighter e RabbitMQ
- Pacotes NuGet:
Mensagens
Para este projeto, usaremos as seguintes mensagens: CreateNewOrder
, OrderPlaced
e OrderPaid
.
public class CreateNewOrder() : Command(Guid.NewGuid())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
}
Mapeadores de Mensagens
Como apenas os eventos OrderPlaced
e OrderPaid
são publicados no RabbitMQ, precisamos implementar mapeadores usando serialização JSON:
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
public Message MapToMessage(OrderPlaced request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-placed";
header.MessageType = MessageType.MT_EVENT;
header.ReplyTo = ""; // Devido a um bug na implementação do SQLite
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPlaced MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
}
}
public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
public Message MapToMessage(OrderPaid request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-paid";
header.MessageType = MessageType.MT_EVENT;
header.ReplyTo = ""; // Devido a um bug na implementação do SQLite
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPaid MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
}
}
Observação: Devido a um bug na implementação atual do outbox no SQLite, é necessário definir ReplyTo
como vazio; será corrigido na versão 10.
Manipuladores de Requisição
Para OrderPlaced
e OrderPaid
, vamos registrar logs das mensagens recebidas:
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} foi realizado com valor {OrderValue}", command.OrderId, command.Value);
return base.HandleAsync(command, cancellationToken);
}
}
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} foi pago", command.OrderId);
return base.HandleAsync(command, cancellationToken);
}
}
Criar Novo Pedido
O manipulador CreateNewOrder
aguardará 10ms para simular um processo, publicará OrderPlaced
, e se o valor for divisível por 3, lançará uma exceção (simulando um erro de negócio). Caso contrário, publicará OrderPaid
.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
IUnitOfWork unitOfWork,
ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
{
await unitOfWork.BeginTransactionAsync(cancellationToken);
try
{
string id = Guid.NewGuid().ToString();
logger.LogInformation("Criando um novo pedido: {OrderId}", id);
await Task.Delay(10, cancellationToken); // Simulando um processo
_ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken);
if (command.Value % 3 == 0)
{
throw new InvalidOperationException("valor inválido");
}
_ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken);
await unitOfWork.CommitAsync(cancellationToken);
return await base.HandleAsync(command, cancellationToken);
}
catch
{
logger.LogError("Dados inválidos");
await unitOfWork.RollbackAsync(cancellationToken);
throw;
}
}
}
Destaques:
-
DepositPostAsync
armazena mensagens no outbox dentro da mesma transação dos dados de negócio. - Se ocorrer uma exceção (ex.:
InvalidOperationException
), a transação é revertida, evitando mensagens órfãs.
Configuração do SQLite
Para integrar o Outbox Pattern com SQLite, primeiro garanta que a tabela outbox_messages
exista.
1. Esquema da Tabela SQL
CREATE TABLE IF NOT EXISTS "outbox_messages"(
[MessageId] TEXT NOT NULL COLLATE NOCASE,
[Topic] TEXT NULL,
[MessageType] TEXT NULL,
[Timestamp] TEXT NULL,
[CorrelationId] TEXT NULL,
[ReplyTo] TEXT NULL,
[ContentType] TEXT NULL,
[Dispatched] TEXT NULL,
[HeaderBag] TEXT NULL,
[Body] TEXT NULL,
CONSTRAINT[PK_MessageId] PRIMARY KEY([MessageId])
);
2. Configuração de Injeção de Dependência
Registre o outbox e a transação:
services
.AddServiceActivator(opt => { /* Configuração de assinatura (ver artigo anterior) */ })
.UseSqlOutbox(new SqliteConfiguration(ConnectionString, "outbox_messages"), typeof(SqliteConnectionProvider), ServiceLifetime.Scoped))
.UseSqliteTransactionConnectionProvider(typeof(SqliteConnectionProvider))
.UseOutboxSweeper(opt => opt.BatchSize = 10);
Funcionamento:
-
UseSqliteOutbox
vincula o outbox ao SQLite. -
UseOutboxSweeper
configura a verificação em segundo plano para mensagens não entregues.
3. Gerenciamento de Transações
Para garantir atomicidade entre lógica de negócio e publicação de mensagens no Brighter, implemente ISqliteTransactionConnectionProvider
e IUnitOfWork
para compartilhar o contexto da transação.
a. SqliteConnectionProvider
public class SqliteConnectionProvider(SqliteUnitOfWork sqlConnection) : ISqliteTransactionConnectionProvider
{
public SqliteConnection GetConnection()
{
return sqlConnection.Connection;
}
public Task<SqliteConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(sqlConnection.Connection);
}
public SqliteTransaction? GetTransaction()
{
return sqlConnection.Transaction;
}
public bool HasOpenTransaction => sqlConnection.Transaction != null;
public bool IsSharedConnection => true;
}
b. Unidade de Trabalho (IUnitOfWork)
public interface IUnitOfWork
{
Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable);
Task CommitAsync(CancellationToken cancellationToken);
Task RollbackAsync(CancellationToken cancellationToken);
}
c. Implementação MySqlUnitOfWork
public class SqliteUnitOfWork : IUnitOfWork
{
public SqliteUnitOfWork(SqliteConfiguration configuration)
{
Connection = new(configuration.ConnectionString);
Connection.Open();
}
public SqliteConnection Connection { get; }
public SqliteTransaction? Transaction { get; private set; }
public async Task BeginTransactionAsync(IsolationLevel isolationLevel = IsolationLevel.Serializable)
{
if (Transaction == null)
{
Transaction = (SqliteTransaction)await Connection.BeginTransactionAsync(isolationLevel);
}
}
public async Task CommitAsync(CancellationToken cancellationToken)
{
if (Transaction != null)
{
await Transaction.CommitAsync(cancellationToken);
}
}
public async Task RollbackAsync(CancellationToken cancellationToken)
{
if (Transaction != null)
{
await Transaction.RollbackAsync(cancellationToken);
}
}
}
d. Registrar Serviços na Injeção de Dependência
services
.AddScoped<SqliteUnitOfWork, SqliteUnitOfWork>()
.TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqliteUnitOfWork>());
Conclusão
Ao implementar o Outbox Pattern com Brighter e SQLite, demonstramos como garantir consistência transacional entre atualizações no banco de dados e publicação de mensagens. Essa abordagem assegura que:
-
Mensagens são publicadas apenas se a transação for confirmada:
- Usando
DepositPostAsync
, mensagens comoOrderPlaced
eOrderPaid
são armazenadas na tabelaoutbox_messages
dentro da mesma transação dos dados de negócio. Se o manipulador falhar (ex.: erro simulado), a transação é revertida, e nenhuma mensagem órfã é enviada. - O
ISqliteTransactionConnectionProvider
do Brighter garante que atualizações no banco de dados e depósitos de mensagens compartilhem a mesma transação.
- Usando
-
Tolerância a Falhas via Outbox Sweeper:
- O
UseOutboxSweeper
verifica periodicamente mensagens não entregues e tenta reenviá-las até que sejam reconhecidas pelo RabbitMQ. Isso desacopla a publicação de mensagens da execução do manipulador, garantindo confiabilidade mesmo se o broker estiver temporariamente indisponível.
- O
-
Arquitetura Desacoplada:
- Aplicações focam em transações locais, enquanto o Brighter lida com a entrega de mensagens assincronamente. Isso evita acoplamento com a infraestrutura de mensagens e simplifica a escalabilidade.
Essa implementação mostra como o Brighter abstrai a complexidade, permitindo que os desenvolvedores se concentrem na lógica de negócio enquanto garantem confiabilidade em sistemas distribuídos.
Top comments (0)