I have implemented an event aggregator for our backend.
/// <summary>
/// Dispatches published messages to every subscriber that implements
/// <c>IHandle&lt;T&gt;</c> or <c>IHandleAsync&lt;T&gt;</c> for the message type.
/// </summary>
public class EventAggregator : IEventAggregator
{
    // NOTE(review): presumably WeakReferenceList holds subscribers weakly so that
    // subscribing does not keep them alive — confirm against its implementation.
    private readonly WeakReferenceList<object> _subscribers = new WeakReferenceList<object>();

    /// <summary>Registers <paramref name="subscriber"/> as a potential message handler.</summary>
    /// <param name="subscriber">Object implementing one or more handler interfaces.</param>
    public void Subscribe(object subscriber) => _subscribers.Add(subscriber);

    /// <summary>
    /// Delivers <paramref name="message"/> to all matching subscribers.
    /// Synchronous handlers are invoked first, one after another; asynchronous
    /// handlers are started afterwards.
    /// </summary>
    /// <typeparam name="T">Type of the message being published.</typeparam>
    /// <param name="message">The message handed to every handler.</param>
    /// <returns>
    /// A task that completes once every asynchronous handler that had not already
    /// run to completion has finished, or an already completed task if there is
    /// nothing left to wait for.
    /// </returns>
    public Task PublishAsync<T>(T message) where T : class
    {
        foreach (var syncHandler in _subscribers.OfType<IHandle<T>>())
        {
            syncHandler.Handle(message);
        }

        // Start every async handler, keeping only the tasks that still need to be
        // observed (faulted and cancelled tasks are kept, so their outcome surfaces).
        var pending = _subscribers
            .OfType<IHandleAsync<T>>()
            .Select(asyncHandler => asyncHandler.HandleAsync(message))
            .Where(task => task.Status != TaskStatus.RanToCompletion)
            .ToList();

        return pending.Count == 0
            ? Task.CompletedTask
            : Task.WhenAll(pending);
    }
}
You can then choose to handle the events either asynchronously or synchronously (non-async).
Non-async
/// <summary>
/// Listener that stamps creation metadata on newly added <c>ICreated</c> entities
/// whenever a <c>SavingChangesEvent</c> is published.
/// </summary>
public class CreatedListener : IBusinessContextListener, IHandle<SavingChangesEvent>
{
    /// <summary>Subscribes this listener to the given event aggregator.</summary>
    /// <param name="eventAggregator">Aggregator that will deliver the events.</param>
    public CreatedListener(IEventAggregator eventAggregator)
    {
        eventAggregator.Subscribe(this);
    }

    /// <summary>
    /// Sets <c>CreatedBy</c> (from the context's username) and <c>CreatedUTC</c>
    /// (current UTC time) on every tracked <c>ICreated</c> entry in the Added state.
    /// </summary>
    /// <param name="message">Event carrying the business context being saved.</param>
    public void Handle(SavingChangesEvent message)
    {
        var context = message.Context;
        var addedEntries = context.Db.ChangeTracker
            .Entries<ICreated>()
            .Where(entry => entry.State == EntityState.Added);

        foreach (var entry in addedEntries)
        {
            var entity = entry.Entity;
            entity.CreatedBy = context.Username;
            entity.CreatedUTC = DateTime.UtcNow;
        }
    }
}
Async
/// <summary>
/// External part adapter backed by an Azure file share. Files returned by
/// <see cref="ListResponses"/> are remembered and deleted once a
/// <c>TransactionCommitedEvent</c> is received.
/// </summary>
public class AzureFileStorageExternalAdapter : IExternalPartAdapter, IHandleAsync<TransactionCommitedEvent>
{
    private readonly IBusinessContext _ctx;

    // Every file handed out by ListResponses; deleted in HandleAsync.
    private readonly List<CloudFile> _proccessedFiles = new List<CloudFile>();

    /// <summary>Creates the adapter and subscribes it to the event aggregator.</summary>
    /// <param name="ctx">Business context used to resolve the remote folder name.</param>
    /// <param name="eventAggregator">Aggregator that delivers the commit event.</param>
    public AzureFileStorageExternalAdapter(IBusinessContext ctx, IEventAggregator eventAggregator)
    {
        _ctx = ctx;
        eventAggregator.Subscribe(this);
    }

    /// <summary>
    /// Lists the files in the "In:&lt;execution context name&gt;" remote folder, records
    /// them as processed and downloads the content of each one.
    /// </summary>
    /// <returns>A task yielding the raw bytes of every file found.</returns>
    public Task<IEnumerable<byte[]>> ListResponses()
    {
        var remoteFolder = GetRemoteFolder($"In:{_ctx.ExecutionContext.Name}");

        // Directories are skipped: only items that are CloudFile instances are kept.
        var remoteFiles = remoteFolder.ListFilesAndDirectories()
            .OfType<CloudFile>()
            .ToList();

        // Files are registered before the downloads are started.
        _proccessedFiles.AddRange(remoteFiles);

        return remoteFiles
            .Select(async remoteFile =>
            {
                using (var source = await remoteFile.OpenReadAsync())
                using (var buffer = new MemoryStream())
                {
                    await source.CopyToAsync(buffer);
                    buffer.Position = 0;
                    return buffer.ToArray();
                }
            })
            .WhenAll();
    }

    /// <summary>Deletes every file previously returned by <see cref="ListResponses"/>.</summary>
    /// <param name="message">The commit event (not inspected).</param>
    /// <returns>
    /// A task covering all delete operations, or a completed task when no files
    /// have been recorded.
    /// </returns>
    public Task HandleAsync(TransactionCommitedEvent message)
    {
        if (_proccessedFiles.Count > 0)
        {
            return _proccessedFiles
                .Select(processed => processed.DeleteAsync())
                .WhenAll();
        }

        return Task.CompletedTask;
    }
}
The publisher has to be async even though only non-async listeners are currently listening:
/// <summary>
/// Unit-of-work wrapper around a <c>DbContext</c> that publishes
/// <c>SavingChangesEvent</c> before saving and <c>TransactionCommitedEvent</c>
/// after committing, and owns the lifetime of the current database transaction.
/// </summary>
public class BusinessContext : IBusinessContextController, IDisposable
{
    private readonly IEventAggregator _eventAggregator;

    // Non-null only while a transaction is open; reset once it is committed,
    // rolled back or disposed so that a new transaction can be started.
    private IDbContextTransaction _dbContextTransaction;

    /// <summary>Creates the context.</summary>
    /// <param name="ctx">The underlying database context.</param>
    /// <param name="ctxListeners">
    /// Not used directly; taking the dependency forces the listeners to be
    /// constructed (and thereby subscribed) before this context is used.
    /// </param>
    /// <param name="eventAggregator">Aggregator used to publish lifecycle events.</param>
    /// <exception cref="ApplicationException"><paramref name="ctxListeners"/> is null.</exception>
    public BusinessContext(DbContext ctx, IEnumerable<IBusinessContextListener> ctxListeners, IEventAggregator eventAggregator)
    {
        _eventAggregator = eventAggregator;
        Db = ctx;
        if (ctxListeners == null)
        {
            throw new ApplicationException("Do not remove this dependency, it ensures that listeners are awake");
        }
    }

    public DbContext Db { get; }

    public string Username { get; set; }

    public ExecutionContext ExecutionContext { get; set; }

    /// <summary>Publishes <c>SavingChangesEvent</c>, then persists pending changes.</summary>
    public async Task SaveChangesAsync()
    {
        await _eventAggregator.PublishAsync(new SavingChangesEvent(this));
        await Db.SaveChangesAsync();
    }

    /// <summary>Begins a database transaction unless one is already open.</summary>
    public async Task StartTransactionAsync()
    {
        if (_dbContextTransaction != null) return;
        _dbContextTransaction = await Db.Database.BeginTransactionAsync();
    }

    /// <summary>Disposes the open transaction, if any.</summary>
    public void Dispose()
    {
        ReleaseTransaction();
    }

    /// <summary>
    /// Commits the open transaction (if any) and then publishes
    /// <c>TransactionCommitedEvent</c>.
    /// </summary>
    /// <returns>The task returned by the event aggregator for the published event.</returns>
    public Task CommitTransactionAsync()
    {
        if (_dbContextTransaction != null)
        {
            try
            {
                _dbContextTransaction.Commit();
            }
            finally
            {
                // Whether or not the commit succeeded the transaction is finished:
                // release it so that a failing event handler followed by
                // RollbackTransaction() does not touch a completed transaction,
                // and so that StartTransactionAsync() can open a new one.
                ReleaseTransaction();
            }
        }

        return _eventAggregator.PublishAsync(new TransactionCommitedEvent());
    }

    /// <summary>Rolls back the open transaction; does nothing when none is open.</summary>
    public void RollbackTransaction()
    {
        if (_dbContextTransaction == null) return;

        try
        {
            _dbContextTransaction.Rollback();
        }
        finally
        {
            ReleaseTransaction();
        }
    }

    /// <summary>Disposes the current transaction and forgets it.</summary>
    private void ReleaseTransaction()
    {
        _dbContextTransaction?.Dispose();
        _dbContextTransaction = null;
    }
}
Bonus question: the interface IBusinessContextListener is an empty marker interface that is only used to mark classes that have no other purpose than to be an event listener (otherwise they would not be 'awake' and their Handle methods would not be invoked). Is this a solid design?