I've written a queue that supports one producer and multiple consumer threads. The idea is that the queue creates a configurable number of long-running consumer threads. Internally, I'm using a BlockingCollection to solve the producer-consumer problem. I've done a little testing via a console application, and it seems to work. Can somebody review the code and let me know if there are any flaws?
The code can be found on GitHub.
Example usage:
/// <summary>
/// Demo driver: builds a queue with ten worker threads, enqueues the integers
/// 0 through 9999, waits five seconds, and then shuts the queue down.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    const int itemCount = 10000;

    // Callback executed by a worker thread for every item it consumes.
    Action<int> reportConsumption = value =>
        Console.WriteLine($"Consumed {value} from thread {Thread.CurrentThread.Name}, id: {Thread.CurrentThread.ManagedThreadId}");

    var consumerQueue = new QueueWithMultipleConsumerThreads<int>(10, reportConsumption);

    // Feed the queue with some entries.
    var next = 0;
    while (next < itemCount)
    {
        consumerQueue.Enque(next);
        next++;
    }

    // Leave the workers some time to process the entries before stopping them.
    Thread.Sleep(TimeSpan.FromSeconds(5));
    consumerQueue.Shutdown();
}
QueueWithMultipleConsumerThreads:
/// <summary>
/// A queue whose items are handed to a fixed number of dedicated background threads.
/// Each thread runs its own <see cref="Worker{T}"/>, and all workers take items from
/// the same internal <see cref="BlockingCollection{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class QueueWithMultipleConsumerThreads<T>
{
    // Upper bound, in milliseconds, that Shutdown waits for each individual thread.
    private const int JoinTimeoutMilliseconds = 1000;

    private readonly ConcurrentBag<Thread> threads = new ConcurrentBag<Thread>();
    private readonly ConcurrentBag<Worker<T>> workers = new ConcurrentBag<Worker<T>>();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    /// <summary>
    /// Creates the queue and immediately starts the requested number of worker threads.
    /// </summary>
    /// <param name="numberOfWorkerThreads">How many worker threads to start; must be greater than zero.</param>
    /// <param name="actionToBeCalled">Callback passed to every worker.</param>
    /// <exception cref="ArgumentException"><paramref name="numberOfWorkerThreads"/> is zero.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="actionToBeCalled"/> is null.</exception>
    public QueueWithMultipleConsumerThreads(uint numberOfWorkerThreads, Action<T> actionToBeCalled)
    {
        if (numberOfWorkerThreads == 0)
        {
            throw new ArgumentException($"{nameof(numberOfWorkerThreads)} must be > 0");
        }

        if (actionToBeCalled == null)
        {
            throw new ArgumentNullException(nameof(actionToBeCalled));
        }

        var index = 0;
        while (index < numberOfWorkerThreads)
        {
            this.StartWorker($"Worker thread {index}", actionToBeCalled);
            index++;
        }
    }

    /// <summary>Adds an item to the internal collection.</summary>
    /// <param name="item">The item to add.</param>
    public void Enque(T item)
    {
        this.queue.Add(item);
    }

    /// <summary>Returns the number of items currently held by the internal collection.</summary>
    public int Count() => this.queue.Count;

    /// <summary>
    /// Requests every worker to stop, then waits up to one second per thread for it to finish.
    /// </summary>
    public void Shutdown()
    {
        Worker<T> worker;
        while (this.workers.TryTake(out worker))
        {
            worker?.RequestStop();
        }

        Thread thread;
        while (this.threads.TryTake(out thread))
        {
            thread?.Join(JoinTimeoutMilliseconds);
        }
    }

    // Creates one worker bound to the shared collection, wraps it in a named background
    // thread, registers both for Shutdown, and starts the thread.
    private void StartWorker(string threadName, Action<T> actionToBeCalled)
    {
        var logger = LogManager.GetLogger(threadName);
        var worker = new Worker<T>(this.queue, threadName, actionToBeCalled, logger);
        var thread = new Thread(worker.DoWork)
        {
            IsBackground = true,
            Name = threadName
        };

        this.workers.Add(worker);
        this.threads.Add(thread);
        thread.Start();
    }
}
/// <summary>
/// A worker receives a collection to take elements from. After an element was successfully retrieved,
/// it invokes the action supplied to the constructor with that element.
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T">Type of the elements taken from the collection.</typeparam>
public class Worker<T>
{
    // Written by the caller of RequestStop and read by the thread running DoWork;
    // volatile keeps the read in the loop condition from being cached.
    private volatile bool shouldStop;
    private readonly BlockingCollection<T> collection;
    private readonly string workerName;
    private readonly Action<T> actionToBeCalled;
    private readonly CancellationToken cancelationToken;
    private readonly CancellationTokenSource cancelationTokenSource;
    private readonly ILog logger;

    /// <summary>
    /// Creates a worker bound to the given collection.
    /// </summary>
    /// <param name="collection">Collection the worker takes elements from.</param>
    /// <param name="workerName">Name used as a prefix in log messages.</param>
    /// <param name="actionToBeCalled">Callback invoked with every element taken.</param>
    /// <param name="logger">Logger receiving the worker's debug and warning messages.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection)); }
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName)); }
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled)); }
        if (logger == null) { throw new ArgumentNullException(nameof(logger)); }

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    /// <summary>
    /// Takes elements from the collection and passes each one to the callback until
    /// <see cref="RequestStop"/> is called, or until the collection can no longer deliver
    /// elements (completed for adding and empty, or disposed).
    /// An exception thrown by the callback is logged as a warning and does not end the loop.
    /// </summary>
    public void DoWork()
    {
        while (!this.shouldStop)
        {
            T item;
            try
            {
                // Take blocks until an element is available or the token is cancelled.
                item = this.collection.Take(this.cancelationToken);
            }
            catch (OperationCanceledException)
            {
                // Expected path: RequestStop cancelled the blocking Take. Not an error.
                break;
            }
            catch (ObjectDisposedException exception)
            {
                // Every further Take would throw immediately; retrying would only spin.
                this.logger.Warn($"[{this.workerName}]: Collection no longer usable: {exception}");
                break;
            }
            catch (InvalidOperationException) when (this.collection.IsCompleted)
            {
                // Completed for adding and drained: no element will ever arrive again.
                this.logger.Debug($"[{this.workerName}]: Collection completed, no more items to take");
                break;
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
                continue;
            }

            try
            {
                this.actionToBeCalled(item);
            }
            catch (Exception exception)
            {
                // A failing callback must not take the worker down; log and move on.
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    }

    /// <summary>
    /// Asks the worker to stop: sets the stop flag and cancels a pending blocking Take.
    /// An element whose callback is already running is not interrupted.
    /// </summary>
    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");

        // Publish the flag before cancelling so the worker thread, once woken by the
        // cancellation, can never observe a cancelled token with the flag still unset.
        this.shouldStop = true;
        this.cancelationTokenSource.Cancel();
    }
}