It was an interview question:
Should implement a class - BlockingQueue with those constraints:
- 2 Methods - Adding an item and Take an item from the queue.
- The class should support multiple consumers and multiple producers.
The class will getting a bounded size of queue and a maximum of producers and consumers at constructor:
public BlockingQueue(int boundedCapacity, int producers, int consumers)At method Take - if the queue is empty - should to wait till have least one item in the queue.
- At the method Insert - if the queue is full - should to wait till the queue was dequeued least one item.
I've written a queue class for supporting multiple producers and multiple consumers threads.
the class is getting bounded size and number of producers and consumers.
as following:
class BlockingQueue<T>
{
readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
readonly int _boundedCapacity;
readonly SemaphoreSlim _semaphoreSlimProducer;
readonly SemaphoreSlim _semaphoreSlimConsumer;
public BlockingQueue(int boundedCapacity, int producers, int consumers)
{
_boundedCapacity = boundedCapacity;
_semaphoreSlimProducer = new SemaphoreSlim(0, producers);
_semaphoreSlimConsumer = new SemaphoreSlim(consumers);
}
public void Insert(T item)
{
if(item == null) throw new ArgumentNullException("item");
while(Count >= _boundedCapacity)
{
_semaphoreSlimProducer.Wait(1000);
_semaphoreSlimConsumer.Release();
}
_semaphoreSlimProducer.Wait();
_queue.Enqueue(item);
_semaphoreSlimConsumer.Release();
}
public T Take()
{
while(Count == 0)
{
_semaphoreSlimConsumer.Wait(1000);
_semaphoreSlimProducer.Release();
}
_semaphoreSlimConsumer.Wait();
T retval = default(T);
_queue.TryDequeue(out retval);
_semaphoreSlimProducer.Release();
return retval;
}
public int Count
{
get
{
return _queue.Count;
}
}
}
Testing:
private static void TestBlockingQueue()
{
BlockingQueue<string> b = new BlockingQueue<string>(3, 3, 3);
ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
for(int i = 0; i < 10; i++)
{
var t = Task.Factory.StartNew(() => {
string item = "Threadid:Thread.CurrentThread.ManagedThreadId_" + Thread.CurrentThread.ManagedThreadId + "_Count_";
int count = 0;
while(true)
{
item += count++;
b.Insert(item);
Console.WriteLine("insert item: " + item);
}
});
tasks.Add(t);
}
for(int i = 0; i < 10; i++)
{
var t = Task.Factory.StartNew(() => {
while(true)
{
var item = b.Take();
Console.WriteLine("taked item: " + item);
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
}
I will be glad for getting a code review for this implementation.