I want to code the following scenario:
- One Producer: produces n (n=100 here) objects
- m Consumers (m = 5 here): consumes k (k = 10 here) objects at a time in a round robin fashion; Consumer 1 first consumes the first 10 objects followed by Consumer 2 consuming another 10 objects and so on. This means that consumer 1 has to wait until consumer 5 is done consuming 10 objects.
I have written the code and it is working, but I feel like it is sub-optimal, might have bugs and has room for improvement.
public static void main(String[] args) {
ConsumerProducer cp = new ConsumerProducer();
cp.StartconsumerProducer();
}
// Creates Threads and waits for them to finish till timeout
public class ConsumerProducer {
public void StartconsumerProducer() {
ConsumerProducerMonitor mon = new ConsumerProducerMonitor();
List threads = new ArrayList();
// Create a producer
Thread p1 = new Thread(new Producer(mon), "P1");
p1.start();
// Create consumer 1
Thread c1 = new Thread(new Consumer(mon), "C1");
c1.start();
// Create consumer 2
Thread c2 = new Thread(new Consumer(mon), "C2");
c2.start();
// Create consumer 3
Thread c3 = new Thread(new Consumer(mon), "C3");
c3.start();
threads.add(p1);
threads.add(c1);
threads.add(c2);
threads.add(c3);
for (int i = 0; i < threads.size(); ++i) {
try {
((Thread)threads.get(i)).join(20000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
// Producer class
public class Producer implements Runnable {
ConsumerProducerMonitor mon;
Producer(ConsumerProducerMonitor mon) {
this.mon = mon;
}
@Override
public void run() {
// while (true) {
for (int i = 0; i < 100; ++i) {
mon.put(i, Thread.currentThread().getName());
}
}
}
// Consumer Class
public class Consumer implements Runnable {
Consumer(ConsumerProducerMonitor mon) {
this.mon = mon;
}
@Override
public void run() {
int ret = 0;
while (ret >= 0) {
ret = mon.get(Thread.currentThread().getName());
}
}
private final ConsumerProducerMonitor mon;
}
// Produces and consumes items in round robin fashion. Uses Semaphores for the same.
public class ConsumerProducerMonitor {
// produces items
public synchronized void put(int item, String threadName) {
if (isProduced) {
return;
}
this.item = item;
System.out.println("Producer " + threadName + " put Item: " + this.item);
if (this.item == 99) {
isProduced = true;
this.notifyAll();
}
}
private void consumeItems(String threadName) {
for (int i = 0; i < 10; ++i) {
if (this.item < 0) {
return;
}
this.item--;
}
System.out.println("Consumer " + threadName + " consumed Items from " + (this.item + 10) + ", to " + this.item);
if (!sem.tryAcquire()) {
System.out.println("Failed to aquire semaphore for consumer: " + threadName);
}
}
// consumes item
public synchronized int get(String threadName) {
if (!isProduced) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println("Caught Interrupted Exceptino while waiting to consume item: " + e.getMessage());
}
}
if (this.item < 0) {
sem.release(NUM_SEMAPHORES);
return this.item;
}
if (isConsuming) {
try {
this.wait();
isConsuming = true;
} catch (InterruptedException e) {
System.out.println("Caught Interrupted Exceptino while waiting to consume item: " + e.getMessage());
}
}
switch (sem.availablePermits()) {
case 1:
if (threadName.equals("C3")) {
consumeItems(threadName);
if (threadName.equals("C3")) {
sem.release(NUM_SEMAPHORES);
}
}
break;
case 2:
if (threadName.equals("C2")) {
consumeItems(threadName);
}
break;
case 3:
if (threadName.equals("C1")) {
consumeItems(threadName);
}
break;
default:
break;
}
isConsuming = false;
this.notifyAll();
return this.item;
}
private static int NUM_SEMAPHORES = 3;
private final Semaphore sem = new Semaphore(NUM_SEMAPHORES);
private boolean isProduced = false;
private boolean isConsuming = false;
int item;
}
Output:
Producer P1 put Item: 0 Producer P1 put Item: 1 Producer P1 put Item: 2 Producer P1 put Item: 3 Producer P1 put Item: 4 Producer P1 put Item: 5 ... Producer P1 put Item: 99 Consumer C1 consumed Items from 99, to 89 Consumer C2 consumed Items from 89, to 79 Consumer C3 consumed Items from 79, to 69 Consumer C1 consumed Items from 69, to 59 Consumer C2 consumed Items from 59, to 49 Consumer C3 consumed Items from 49, to 39 Consumer C1 consumed Items from 39, to 29 Consumer C2 consumed Items from 29, to 19 Consumer C3 consumed Items from 19, to 9 Consumer C1 consumed Items from 9, to -1