Skip to main content

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly theirthere is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency.? I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However, let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queusqueues that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly their is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency. I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queus that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly there is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency? I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However, let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queues that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

added 8 characters in body
Source Link
Josh L
  • 103
  • 5

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method willThe following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly their is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency. I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queus that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly their is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency. I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queus that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly their is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency. I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queus that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

Source Link
Josh L
  • 103
  • 5

Dealing with eventual consistency when persisting and publishing events

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }
   
    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.

  2. If storing my events in the event store succeeds but suddenly their is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency. I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queus that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??