DEV Community

Cover image for Implementing the Outbox Pattern with PostgreSQL in Brighter
Rafael Andrade
Rafael Andrade

Posted on

Implementing the Outbox Pattern with PostgreSQL in Brighter

Introduction

Building on our previous exploration of the outbox pattern with SQL Server, this article adapts the pattern to PostgreSQL while addressing its limitations. The goal is to ensure transactional consistency between database updates and message publishing. We’ll use .NET 8, Brighter, and PostgreSQL to handle order creation and event publishing across distributed systems.

Project

The main idea of this project is to send a command to create an order, when the order is create, it'll send 2 messages OrderPlaced & OrderPaid, in case we have a failure, we shouldn't send any message.

Requirement

Messages

For this project we will need these 3 message: CreateNewOrder, OrderPlaced and OrderPaid

public class CreateNewOrder() : Command(Guid.NewGuid())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Message Mappers

Since only OrderPlaced and OrderPaid events are published to RabbitMQ, we need to implement mappers for them using JSON serialization

public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
    public Message MapToMessage(OrderPlaced request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "order-placed";
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPlaced MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
    }
}

public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
    public Message MapToMessage(OrderPaid request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "order-paid";
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPaid MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
    }
}
Enter fullscreen mode Exit fullscreen mode

Request Handlers

For OrderPlaced and OrderPaid we are going to log the received message.

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
    public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
        return base.HandleAsync(command, cancellationToken);
    }
}

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
    public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} paid", command.OrderId);
        return base.HandleAsync(command, cancellationToken);
    }
}
Enter fullscreen mode Exit fullscreen mode

Create New Order

The CreateNewOrder handler is going to wait for 10ms to emulate a process, then publish the OrderPlaced, if the value is mod 3 throw an exception (emulation a business error), otherwise publish OrderPaid.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
    public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
    {
        try
        {
            string id = Guid.NewGuid().ToString();
            logger.LogInformation("Creating a new order: {OrderId}", id);

            await Task.Delay(10, cancellationToken); // emulating an process

            _ = commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
            if (command.Value % 3 == 0)
            {
                throw new InvalidOperationException("invalid value");
            }

            _ = commandProcessor.DepositPost(new OrderPaid { OrderId = id });

            return await base.HandleAsync(command, cancellationToken);
        }
        catch
        {
            logger.LogError("Invalid data");
            throw;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Configuring PostgresSql

To integrate the Outbox Pattern with PostgresSql, first ensure the OutboxMessages table exists.

1. SQL Table Schema

CREATE TABLE IF NOT EXISTS "outboxmessages"
(
  "id" BIGSERIAL NOT NULL,
  "messageid" UUID NOT NULL,
  "topic" VARCHAR(255) NULL,
  "messagetype" VARCHAR(32) NULL,
  "timestamp" TIMESTAMP NULL,
  "correlationid" UUID NULL,
  "replyto" VARCHAR(255) NULL,
  "contenttype" VARCHAR(128) NULL,
  "dispatched" TIMESTAMP NULL,
  "headerbag" TEXT NULL,
  "body" TEXT NULL,
  PRIMARY KEY (Id)
);
Enter fullscreen mode Exit fullscreen mode

2. Dependency Injection Setup

Register the outbox and transaction.

services
    .AddServiceActivator(opt => { // Subscription setup (see previous article) })
    .UsePostgreSqlOutbox(new PostgreSqlOutboxConfiguration(ConnectionString, "OutboxMessages"))
    .UseOutboxSweeper(opt => opt.BatchSize = 10);
Enter fullscreen mode Exit fullscreen mode

Why This Works:

  • UsePostgreSqlOutbox links the outbox to PostgreSql.
  • UseOutboxSweeper configures background polling for undelivered messages.

3. Limitations and Workarounds

  1. Sync-Only Operations: Brighter’s PostgreSQL outbox does not support async methods.
  2. Transaction Barriers: Messages and database updates cannot share transactions, increasing the risk of inconsistency during failures(I tried to make it work, but I was always getting error about transaction was aborted).

Future Improvements
Brighter v10 aims to align PostgreSQL’s outbox implementation with SQL Server’s capabilities, including async support and transactional guarantees.

Conclusion

While PostgreSQL’s current Brighter integration has limitations(such as the lack of async support and transactional consistency) it remains viable for scenarios with moderate throughput. For critical workloads requiring robust transactional guarantees, SQL Server offers a mature implementation of the outbox pattern with proven consistency. By understanding these trade-offs, teams can select the appropriate tools based on their scalability and reliability requirements.

Reference

Top comments (0)