Introduction
This guide demonstrates how to implement the Outbox Pattern with MySQL and .NET 8 using the Brighter library to ensure transactional consistency between database updates and message publishing.
Project
The goal is to process a CreateNewOrder command that publishes two events (OrderPlaced, OrderPaid) only if the transaction succeeds. If an error occurs (e.g., a business rule violation), both the database changes and message publications are rolled back.
Requirements
- .NET 8+
- Podman (or Docker) to run local containers:
  - MySQL
  - RabbitMQ
- Basic knowledge of using Brighter with RabbitMQ
- NuGet packages
Messages
For this project, we need three messages: CreateNewOrder, OrderPlaced, and OrderPaid.
public class CreateNewOrder() : Command(Guid.NewGuid())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
}
Message Mappers
Since only the OrderPlaced and OrderPaid events are published to RabbitMQ, we need to implement mappers for them using JSON serialization:
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
public Message MapToMessage(OrderPlaced request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-placed";
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPlaced MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
}
}
public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
public Message MapToMessage(OrderPaid request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-paid";
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPaid MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
}
}
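Both mappers delegate to System.Text.Json, so the message body is plain JSON. As a minimal sketch of that round trip, the snippet below uses a hypothetical stand-in class, OrderPlacedDto, so that it runs without Brighter; the real OrderPlaced event also carries the properties of its Event base class (such as Id), which would appear in the JSON too.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for OrderPlaced so the sketch runs without Brighter.
public sealed class OrderPlacedDto
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}

public static class MapperRoundTrip
{
    // Mirrors MapToMessage: serialize the event into the JSON message body.
    public static string ToBody(OrderPlacedDto request) =>
        JsonSerializer.Serialize(request);

    // Mirrors MapToRequest: deserialize from the UTF-8 bytes of the body.
    public static OrderPlacedDto FromBody(byte[] utf8Body) =>
        JsonSerializer.Deserialize<OrderPlacedDto>(utf8Body)!;
}
```

With the default serializer options, ToBody(new OrderPlacedDto { OrderId = "abc", Value = 12.5m }) yields {"OrderId":"abc","Value":12.5}.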
Request Handlers
For OrderPlaced and OrderPaid, the handlers simply log the received message.
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
return base.HandleAsync(command, cancellationToken);
}
}
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} paid", command.OrderId);
return base.HandleAsync(command, cancellationToken);
}
}
Create New Order
The CreateNewOrder handler waits for 10 ms to emulate processing, then deposits OrderPlaced in the outbox. If the value is divisible by 3, it throws an exception (emulating a business error); otherwise it deposits OrderPaid.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
IUnitOfWork unitOfWork,
ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
{
await unitOfWork.BeginTransactionAsync(cancellationToken);
try
{
string id = Guid.NewGuid().ToString();
logger.LogInformation("Creating a new order: {OrderId}", id);
await Task.Delay(10, cancellationToken); // emulating a process
_ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken);
if (command.Value % 3 == 0)
{
throw new InvalidOperationException("invalid value");
}
_ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken);
await unitOfWork.CommitAsync(cancellationToken);
return await base.HandleAsync(command, cancellationToken);
}
catch
{
logger.LogError("Invalid data");
await unitOfWork.RollbackAsync(cancellationToken);
throw;
}
}
}
Key Insight:
- DepositPostAsync stores messages in the outbox within the same transaction as business data.
- If an exception occurs (e.g., InvalidOperationException), the transaction rolls back, ensuring no orphaned messages.
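To make the rollback behaviour concrete, here is a toy in-memory model of the same control flow. It is not Brighter's implementation: ToyOutbox and ToyOrderFlow are hypothetical names, and the "transaction" is just a staging list that is either promoted on commit or discarded on rollback.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model, NOT Brighter's implementation: staged messages become
// visible only on Commit and are discarded on Rollback.
public sealed class ToyOutbox
{
    private readonly List<string> _staged = new();
    private readonly List<string> _committed = new();

    public IReadOnlyList<string> Committed => _committed;

    public void Deposit(string message) => _staged.Add(message);

    public void Commit()
    {
        _committed.AddRange(_staged);
        _staged.Clear();
    }

    public void Rollback() => _staged.Clear();
}

public static class ToyOrderFlow
{
    // Same steps as CreateNewOrderHandler: deposit OrderPlaced, fail when the
    // value is divisible by 3, otherwise deposit OrderPaid and commit.
    public static bool TryCreateOrder(ToyOutbox outbox, decimal value)
    {
        try
        {
            outbox.Deposit("OrderPlaced");
            if (value % 3 == 0)
            {
                throw new InvalidOperationException("invalid value");
            }
            outbox.Deposit("OrderPaid");
            outbox.Commit();
            return true;
        }
        catch (InvalidOperationException)
        {
            outbox.Rollback();
            return false;
        }
    }
}
```

Calling TryCreateOrder with a value of 9 leaves Committed empty, even though OrderPlaced was deposited before the failure; a value of 10 commits both messages together.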
Configuring MySQL
To integrate the Outbox Pattern with MySQL, first ensure the outbox_messages table exists.
1. SQL Table Schema
CREATE TABLE IF NOT EXISTS `outbox_messages`(
`MessageId` CHAR(36) NOT NULL,
`Topic` VARCHAR(255) NOT NULL,
`MessageType` VARCHAR(32) NOT NULL,
`Timestamp` TIMESTAMP(3) NOT NULL,
`CorrelationId` CHAR(36) NULL,
`ReplyTo` VARCHAR(255) NULL,
`ContentType` VARCHAR(128) NULL,
`Dispatched` TIMESTAMP(3) NULL,
`HeaderBag` TEXT NOT NULL,
`Body` TEXT NOT NULL,
`Created` TIMESTAMP(3) NOT NULL DEFAULT NOW(3),
`CreatedID` INT(11) NOT NULL AUTO_INCREMENT,
UNIQUE(`CreatedID`),
PRIMARY KEY (`MessageId`)
);
2. Dependency Injection Setup
Register the outbox and the transaction connection provider.
services
.AddServiceActivator(opt =>
{
// Subscription setup (see previous article)
})
.UseMySqlOutbox(new MySqlConfiguration(ConnectionString, "outbox_messages"), typeof(MySqlConnectionProvider), ServiceLifetime.Scoped)
.UseMySqTransactionConnectionProvider(typeof(MySqlConnectionProvider))
.UseOutboxSweeper(opt => opt.BatchSize = 10);
Why This Works:
- UseMySqlOutbox links the outbox to MySQL.
- UseOutboxSweeper configures background polling for undelivered messages.
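When debugging the sweeper, it can help to check how many messages are still waiting in the outbox. The following is a rough sketch rather than part of Brighter: OutboxDiagnostics is a hypothetical helper, it assumes the MySqlConnector package, and it assumes that a NULL value in the Dispatched column of the schema above marks a message that has not been sent yet.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MySqlConnector;

// Hypothetical diagnostics helper; not part of Brighter.
public static class OutboxDiagnostics
{
    // Assumption: a NULL `Dispatched` value marks a message not yet sent.
    private const string PendingSql =
        "SELECT COUNT(*) FROM `outbox_messages` WHERE `Dispatched` IS NULL";

    public static async Task<long> CountPendingAsync(
        string connectionString,
        CancellationToken cancellationToken = default)
    {
        await using var connection = new MySqlConnection(connectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = new MySqlCommand(PendingSql, connection);
        object? result = await command.ExecuteScalarAsync(cancellationToken);
        return Convert.ToInt64(result);
    }
}
```

A count that keeps growing while RabbitMQ is reachable would suggest the sweeper is not running or cannot publish.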
3. Transaction Management
To ensure atomicity between business logic and message publishing in Brighter, implement IMySqlTransactionConnectionProvider and IUnitOfWork for shared transaction context. This guarantees that messages are only stored in the outbox if the database transaction commits successfully.
a. MySqlConnectionProvider
public class MySqlConnectionProvider(MySqlUnitOfWork sqlConnection) : IMySqlTransactionConnectionProvider
{
private readonly MySqlUnitOfWork _sqlConnection = sqlConnection;
public MySqlConnection GetConnection()
{
return _sqlConnection.Connection;
}
public Task<MySqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(_sqlConnection.Connection);
}
public MySqlTransaction? GetTransaction()
{
return _sqlConnection.Transaction;
}
public bool HasOpenTransaction => _sqlConnection.Transaction != null;
public bool IsSharedConnection => true;
}
b. Unit of work
Next, we define a new interface called IUnitOfWork:
public interface IUnitOfWork
{
Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable);
Task CommitAsync(CancellationToken cancellationToken);
Task RollbackAsync(CancellationToken cancellationToken);
}
c. MySqlUnitOfWork Implementation
public class MySqlUnitOfWork : IUnitOfWork
{
public MySqlUnitOfWork(MySqlConfiguration configuration)
{
Connection = new(configuration.ConnectionString);
Connection.Open();
}
public MySqlConnection Connection { get; }
public MySqlTransaction? Transaction { get; private set; }
public async Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable)
{
if (Transaction == null)
{
Transaction = await Connection.BeginTransactionAsync(isolationLevel, cancellationToken);
}
}
public async Task CommitAsync(CancellationToken cancellationToken)
{
if (Transaction != null)
{
await Transaction.CommitAsync(cancellationToken);
}
}
public async Task RollbackAsync(CancellationToken cancellationToken)
{
if (Transaction != null)
{
await Transaction.RollbackAsync(cancellationToken);
}
}
public Task<MySqlCommand> CreateSqlCommandAsync(string sql, MySqlParameter[] parameters, CancellationToken cancellationToken)
{
var command = Connection.CreateCommand();
if (Transaction != null)
{
command.Transaction = Transaction;
}
command.CommandText = sql;
if (parameters.Length > 0)
{
command.Parameters.AddRange(parameters);
}
return Task.FromResult(command);
}
}
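CreateSqlCommandAsync is what lets business data join the same transaction as the outbox. As a sketch of how a handler might use it, the hypothetical OrderRepository below writes an order row through the unit of work. The orders table and its columns are assumptions for illustration, and the using directive assumes the MySqlConnector package.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MySqlConnector;

// Hypothetical repository: the `orders` table and its columns are assumptions.
public class OrderRepository(MySqlUnitOfWork unitOfWork)
{
    public async Task InsertAsync(string orderId, decimal value, CancellationToken cancellationToken)
    {
        const string sql = "INSERT INTO `orders` (`Id`, `Value`) VALUES (@id, @value)";
        var parameters = new[]
        {
            new MySqlParameter("@id", orderId),
            new MySqlParameter("@value", value)
        };

        // The command is bound to the unit of work's connection and, when one
        // has been started, to its open transaction.
        await using var command = await unitOfWork.CreateSqlCommandAsync(sql, parameters, cancellationToken);
        await command.ExecuteNonQueryAsync(cancellationToken);
    }
}
```

If the repository is registered as scoped and called between BeginTransactionAsync and CommitAsync in CreateNewOrderHandler, the order row and the deposited messages commit or roll back together.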
d. Register Services in Dependency Injection
services
.AddScoped<MySqlUnitOfWork, MySqlUnitOfWork>()
.TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<MySqlUnitOfWork>());
Conclusion
By implementing the Outbox Pattern with Brighter and MySQL, we’ve demonstrated how to achieve transactional consistency between database updates and message publishing. This approach ensures that:
- Messages are only published if the transaction commits successfully
  - Using DepositPostAsync, messages like OrderPlaced and OrderPaid are stored in the outbox_messages table within the same transaction as business data. If the handler fails (e.g., due to a simulated error), the transaction rolls back, and no orphaned messages are sent.
  - Brighter's IMySqlTransactionConnectionProvider guarantees that database updates and message deposits share the same transaction.
- Fault Tolerance via the Outbox Sweeper
  - The UseOutboxSweeper polls for undelivered messages and retries them until acknowledged by RabbitMQ. This decouples message publishing from the handler’s execution, ensuring reliability even if the broker is temporarily unavailable.
- Decoupled Architecture
  - Applications focus on local transactions, while Brighter handles message delivery asynchronously. This avoids tight coupling to the messaging infrastructure and simplifies scalability.
This implementation showcases how Brighter abstracts complexity, enabling developers to focus on business logic while ensuring reliability in distributed systems. For production use, pair this pattern with monitoring tools (e.g., Prometheus) and dead-letter queues (DLQs) to handle poisoned messages, and add an index on the Dispatched and Timestamp columns of the outbox table.