DEV Community

Deniss Sudak

How to build multi-step message processing

Let’s say you have a service that, in response to some event, performs a series of operations that can’t be wrapped into an atomic transaction. For example, this service might integrate with an accounting system and, in response to a sale event, create an invoice (Step 1) and send that invoice to a customer (Step 2). If either step fails, processing of the sale event should be retried. However, if Step 1 succeeds but emailing the invoice fails, the retry should run only the email step and must not create a duplicate invoice.

The idea is simple: persist the status of each processing step (e.g. invoice creation and emailing) in a database, and during a retry, resume from the failed step.

These statuses are fetched from the database using an ID derived from the received message. A status can have one of three values:

  • PROCESSING
  • SUCCESS
  • TRY_AGAIN

SUCCESS means that the step has completed successfully and must not be executed again.

TRY_AGAIN means that the step has failed in a way that is safe to retry. For example, our accounting system returned a 4xx error, indicating that the action wasn’t performed.

PROCESSING is set temporarily while the step is in progress. If the step succeeds, we set it to SUCCESS. If it fails in a way that leaves it unclear whether a retry is safe (e.g. a 5xx error, after which the invoice may or may not have been created), the status remains PROCESSING. In this case, the entire workflow is locked and cannot proceed until a human (or automation) determines what happened and updates the state.
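To make the three statuses concrete, here is a minimal Java sketch of how a worker might decide what to do with a step, and how an HTTP response from the external system might map to a status. The names `StepAction`, `StepDecision`, `decide` and `statusAfterCall` are hypothetical and not taken from the linked repository. Treating every response that is neither 2xx nor 4xx as "outcome unknown" is an assumption that extends the 4xx and 5xx examples above.

```java
import java.util.Optional;

/** The three step statuses described above. */
enum StepStatus { PROCESSING, SUCCESS, TRY_AGAIN }

/** What a worker should do with a step (hypothetical helper type). */
enum StepAction { RUN, SKIP, HALT }

final class StepDecision {
    private StepDecision() {}

    /**
     * Decides what to do with a step based on its stored status.
     * An empty Optional means the step has never been attempted.
     */
    static StepAction decide(Optional<StepStatus> stored) {
        if (!stored.isPresent()) {
            return StepAction.RUN;          // first attempt
        }
        switch (stored.get()) {
            case SUCCESS:
                return StepAction.SKIP;     // already done, never run again
            case TRY_AGAIN:
                return StepAction.RUN;      // failed in a way that is safe to retry
            default:
                return StepAction.HALT;     // PROCESSING: outcome unknown, needs manual resolution
        }
    }

    /** Maps an HTTP status code from the external system to a step status (assumed mapping). */
    static StepStatus statusAfterCall(int httpStatus) {
        if (httpStatus >= 200 && httpStatus < 300) {
            return StepStatus.SUCCESS;
        }
        if (httpStatus >= 400 && httpStatus < 500) {
            return StepStatus.TRY_AGAIN;    // request rejected, nothing was created
        }
        return StepStatus.PROCESSING;       // e.g. 5xx: leave the status as it is
    }
}

class StepStatusDemo {
    public static void main(String[] args) {
        System.out.println(StepDecision.decide(Optional.empty()));
        System.out.println(StepDecision.decide(Optional.of(StepStatus.SUCCESS)));
        System.out.println(StepDecision.statusAfterCall(503));
    }
}
```

The important design choice is that PROCESSING is never retried automatically: a step stuck in that state halts the workflow instead of risking a duplicate side effect.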

Processing context

Every step may produce data that is required by the next step. For example, sending the invoice requires an invoice ID, which is generated by the external accounting system in the first step.

Data produced by the successful steps (like the invoice ID) will be stored in the message processing context. This way, when the message is retried, all the data is already there and ready to use.

Here’s an example of an event processing record:

{
  "eventId": "sale-1",
  "createdInvoiceId": "abc-xyz-123",
  "stepProcessingStatuses": [
    {
      "stepCode": "create-new-invoice",
      "status": "SUCCESS"
    },
    {
      "stepCode": "email-invoice",
      "status": "TRY_AGAIN"
    }
  ],
  "locked": false
}

Here, “sale-1” is the ID of the event being processed, and it is the ID used to look up the record when the event is retried. If the “email-invoice” step fails, this record will be fetched during the second attempt. It will show that “create-new-invoice” was successful and should be skipped. The “email-invoice” step will then reuse the previously stored “createdInvoiceId”.
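The resume behaviour described above can be sketched in plain Java. This is an in-memory illustration only: `ProcessingRecord`, `Workflow` and `RetryableStepException` are hypothetical names, the record lives in a map instead of a database, and a real service would persist every status change. The step codes and the invoice ID are the ones from the example record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical exception a step throws when it failed in a way that is safe to retry. */
class RetryableStepException extends RuntimeException {
    RetryableStepException(String message) { super(message); }
}

/** In-memory stand-in for the stored event processing record. */
class ProcessingRecord {
    final String eventId;
    final Map<String, String> context = new HashMap<>();            // e.g. createdInvoiceId
    final Map<String, String> stepStatuses = new LinkedHashMap<>(); // stepCode -> status

    ProcessingRecord(String eventId) { this.eventId = eventId; }
}

/** Ordered list of steps; each step reads from and writes to the record's context. */
class Workflow {
    private final Map<String, Consumer<Map<String, String>>> steps = new LinkedHashMap<>();

    Workflow step(String stepCode, Consumer<Map<String, String>> action) {
        steps.put(stepCode, action);
        return this;
    }

    /** Runs every step that has not succeeded yet. Returns true once all steps are SUCCESS. */
    boolean run(ProcessingRecord record) {
        for (Map.Entry<String, Consumer<Map<String, String>>> step : steps.entrySet()) {
            String code = step.getKey();
            String status = record.stepStatuses.get(code);
            if ("SUCCESS".equals(status)) continue;        // done on an earlier attempt: skip
            if ("PROCESSING".equals(status)) return false; // outcome unknown: needs manual resolution
            record.stepStatuses.put(code, "PROCESSING");   // a real service would persist each change
            try {
                step.getValue().accept(record.context);
                record.stepStatuses.put(code, "SUCCESS");
            } catch (RetryableStepException e) {
                record.stepStatuses.put(code, "TRY_AGAIN");
                return false;                              // stop here; the message will be retried
            }
        }
        return true;
    }
}

class ResumeDemo {
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Workflow workflow = new Workflow()
            .step("create-new-invoice", ctx -> {
                calls.add("create");
                ctx.put("createdInvoiceId", "abc-xyz-123");
            })
            .step("email-invoice", ctx -> {
                calls.add("email " + ctx.get("createdInvoiceId"));
                // Simulate a retryable failure on the first email attempt only.
                if (calls.size() == 2) throw new RetryableStepException("mail server rejected the request");
            });

        ProcessingRecord record = new ProcessingRecord("sale-1");
        System.out.println(workflow.run(record) + " " + record.stepStatuses); // first attempt
        System.out.println(workflow.run(record) + " " + record.stepStatuses); // retry
        System.out.println(calls);
    }
}
```

In the demo the first attempt stops at “email-invoice”, and the retry skips “create-new-invoice” and reuses the stored “createdInvoiceId”, so the invoice is created only once. Any exception other than `RetryableStepException` propagates and leaves the step in PROCESSING, which matches the "unclear outcome" case.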

What’s “locked”?

To ensure that the same step isn’t executed more than once, we must prevent multiple workers from processing the same message simultaneously.

This might not be an issue depending on your message queue. For example, AWS SQS FIFO queues are designed for exactly-once processing.

It’s less critical if duplicate execution isn’t a big deal. For instance, in a multi-step AI pipeline where each step processes data from the previous one, concurrent processing might just result in some extra API usage — not the end of the world.

However, if your queue comes with an at-least-once delivery guarantee (like standard AWS SQS) and you don’t want to create duplicate invoices, then you need a way to lock an event’s processing against other workers.

At a high level, this simply means that a worker has to acquire a lock before processing an event and release it once it’s done with the event. If any other worker picks up the message while it’s locked, that worker ignores the event.

Pick a database that guarantees atomic updates, so only one worker can successfully acquire the lock at a time.
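As a rough illustration of the acquire and release flow, here is an in-memory Java sketch in which `ConcurrentHashMap.putIfAbsent` stands in for the database’s atomic conditional update. `EventLocks` and `Worker` are hypothetical names, and a real implementation would perform the same check-and-set against the stored record’s “locked” field.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for a lock stored in a database with atomic conditional updates. */
class EventLocks {
    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    /** Atomically takes the lock; returns false if another worker already holds it. */
    boolean tryAcquire(String eventId, String workerId) {
        return owners.putIfAbsent(eventId, workerId) == null;
    }

    /** Releases the lock, but only if this worker is the one holding it. */
    void release(String eventId, String workerId) {
        owners.remove(eventId, workerId);
    }
}

class Worker {
    /** Returns true if the event was processed, false if it was ignored because it is locked. */
    static boolean handle(EventLocks locks, String eventId, String workerId, Runnable processing) {
        if (!locks.tryAcquire(eventId, workerId)) {
            return false;                       // another worker is on it: ignore the event
        }
        try {
            processing.run();
            return true;
        } finally {
            locks.release(eventId, workerId);   // always release in this simplified sketch
        }
    }
}

class LockDemo {
    public static void main(String[] args) {
        EventLocks locks = new EventLocks();
        System.out.println(locks.tryAcquire("sale-1", "worker-a")); // first worker gets the lock
        System.out.println(locks.tryAcquire("sale-1", "worker-b")); // second worker is refused
        locks.release("sale-1", "worker-a");
        System.out.println(locks.tryAcquire("sale-1", "worker-b")); // free again after release
    }
}
```

This sketch always releases the lock when processing ends. It does not model the case described earlier, where a step is left in PROCESSING and the workflow stays locked until someone resolves it.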

Code example

I’ve written a working example that demonstrates this pattern using Java Spring, AWS SQS and DynamoDB: https://github.com/denissudak/multi-step-event-processing

DynamoDB is a good choice for this use case.

  • Condition expressions support the locking functionality that we need.
  • Flexible schema allows any shape of processing context data (like “createdInvoiceId”).
  • You can use a single table to store processing records for multiple event types.

Even if you don’t work with Java, I would still suggest checking it out as it communicates the idea via working code.

I hope you find it useful. Enjoy!
