Intro
This time, I was in the mood for concurrent programming (ProducerConsumerSimulation.java). To this end, I have ended up with the code below. My primary concerns are:
- Class design is trash,
- The code contains concurrency bugs.
Code
io.github.coderodde.simulation.producerconsumer.AbstractQueueListener.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.Objects;
/**
 * Base class for objects that observe a {@link BoundedConcurrentQueue}: one
 * callback is provided for pushes and one for pops.
 *
 * @param <E> the type of the elements stored in the queue.
 * @param <R> the type of the result computed from a queue element.
 * @version 1.0.0
 * @since 1.0.0
 */
public abstract class AbstractQueueListener<E, R> {

    /**
     * The queue being observed.
     */
    protected final BoundedConcurrentQueue<E, R> queue;

    /**
     * The function that turns a popped queue element into a result.
     */
    protected final ConsumerAction<E, R> action;

    /**
     * Creates a listener bound to the given queue and action.
     *
     * @param queue  the queue to observe; must not be {@code null}.
     * @param action the action applied to queue elements; must not be
     *               {@code null}.
     * @throws NullPointerException if either argument is {@code null}.
     */
    public AbstractQueueListener(final BoundedConcurrentQueue<E, R> queue,
                                 final ConsumerAction<E, R> action) {
        // Validate both arguments (queue first) before touching any field.
        Objects.requireNonNull(queue, "Input queue is null");
        Objects.requireNonNull(action, "Input action is null");

        this.queue = queue;
        this.action = action;
    }

    /**
     * Invoked for every element that is pushed to the queue.
     *
     * @param thread  the thread performing the push.
     * @param element the element being pushed.
     */
    public abstract void onPush(final AbstractSimulationThread<E, R> thread,
                                final E element);

    /**
     * Invoked for every element that is popped from the queue.
     *
     * @param thread  the consumer thread performing the pop.
     * @param element the element that was popped.
     */
    public abstract void onPop(final ConsumerThread<E, R> thread,
                               final E element);
}
io.github.coderodde.simulation.producerconsumer.AbstractSimulationThread.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.Objects;
/**
 * Common base class of the producer and consumer threads. It only stores the
 * state both kinds of thread need; the actual work is defined by subclasses
 * in {@link #run()}.
 *
 * @param <E> the type of the elements stored in the queue.
 * @param <R> the type of the result computed from a queue element.
 * @version 1.0.0
 * @since 1.0.0
 */
public abstract class AbstractSimulationThread<E, R> extends Thread {

    /**
     * The sentinel element that tells a consumer thread to stop.
     */
    protected final E haltingElement;

    /**
     * The queue this thread operates on.
     */
    protected final BoundedConcurrentQueue<E, R> queue;

    /**
     * The simulation-level identifier of this thread.
     */
    protected final int threadId;

    /**
     * The action associated with queue elements.
     */
    protected final ConsumerAction<E, R> action;

    /**
     * The state shared between producer threads for requesting an exit. It
     * is not validated here and may be {@code null}: {@link ConsumerThread}
     * passes {@code null}.
     */
    protected final SharedProducerThreadState sharedState;

    /**
     * Initializes the state shared by all simulation threads.
     *
     * @param id             the identifier of this thread.
     * @param haltingElement the sentinel element; must not be {@code null}.
     * @param queue          the queue to work on; must not be {@code null}.
     * @param sharedState    the shared producer state; may be {@code null}.
     * @param action         the action; must not be {@code null}.
     * @throws NullPointerException if {@code haltingElement}, {@code queue}
     *                              or {@code action} is {@code null}.
     */
    public AbstractSimulationThread(
            final int id,
            final E haltingElement,
            final BoundedConcurrentQueue<E, R> queue,
            final SharedProducerThreadState sharedState,
            final ConsumerAction<E, R> action) {

        // Checks run in the order: halting element, queue, action.
        Objects.requireNonNull(haltingElement,
                               "The input haltingElement is null");
        Objects.requireNonNull(queue, "The input queue is null");
        Objects.requireNonNull(action, "action is null");

        this.threadId = id;
        this.haltingElement = haltingElement;
        this.queue = queue;
        this.sharedState = sharedState;
        this.action = action;
    }

    /**
     * Gives the simulation-level identifier of this thread.
     *
     * @return the identifier passed to the constructor.
     */
    public int getThreadId() {
        return this.threadId;
    }
}
io.github.coderodde.simulation.producerconsumer.BoundedConcurrentQueue.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.concurrent.Semaphore;
/**
 * A bounded, blocking FIFO queue backed by a circular array and coordinated
 * by three fair semaphores: one counting stored elements, one counting vacant
 * slots and one acting as a mutex around the array state.
 * <p>
 * Listener callbacks are invoked while the mutex is held, so the listener
 * sees a consistent queue. The mutex is not reentrant: a listener must not
 * call {@link #push}, {@link #pop}, {@link #top()} or {@link #size()} on this
 * queue, or the calling thread deadlocks on itself.
 *
 * @param <E> the queue element type.
 * @param <R> the result element type.
 * @version 1.0.0
 * @since 1.0.0
 */
public final class BoundedConcurrentQueue<E, R> {

    /**
     * Counts the elements currently stored. Consumers acquire a permit before
     * reading, so they block while the queue is empty.
     */
    private final Semaphore storedElements;

    /**
     * Counts the vacant slots. Producers acquire a permit before writing, so
     * they block while the queue is full.
     */
    private final Semaphore vacantSlots;

    /**
     * A binary semaphore guarding {@link #array}, {@link #headIndex} and
     * {@link #size}.
     */
    private final Semaphore mutex;

    /**
     * The circular array holding the elements of this queue.
     */
    private final E[] array;

    /**
     * The physical index of the head element, i.e. the next one removed by
     * {@link #pop}.
     */
    private int headIndex = 0;

    /**
     * The number of elements stored in this queue.
     */
    private int size = 0;

    /**
     * The optional listener notified on pushes and pops; may be {@code null}.
     */
    private AbstractQueueListener<E, R> queueListener;

    /**
     * Constructs an empty queue that holds at most {@code capacity} elements.
     *
     * @param capacity the maximum number of elements; must be at least 1.
     * @throws IllegalArgumentException if {@code capacity} is less than 1.
     */
    public BoundedConcurrentQueue(final int capacity) {
        checkCapacity(capacity);
        storedElements = new Semaphore(0, true);
        vacantSlots = new Semaphore(capacity, true);
        mutex = new Semaphore(1, true);

        // Safe: the array never leaves this class and only ever stores E.
        @SuppressWarnings("unchecked")
        final E[] storage = (E[]) new Object[capacity];
        array = storage;
    }

    /**
     * Appends {@code element} to the tail of this queue, blocking while the
     * queue is full.
     *
     * @param element the element to push.
     * @param thread  the thread pushing; forwarded to the listener.
     */
    public void push(final E element,
                     final AbstractSimulationThread<E, R> thread) {
        // Reserve a vacant slot BEFORE writing. The element is announced to
        // consumers only after it is stored; announcing it first would let a
        // consumer read a slot that has not been written yet.
        vacantSlots.acquireUninterruptibly();
        mutex.acquireUninterruptibly();

        try {
            array[logicalIndexToPhysicalIndex(size++)] = element;

            if (queueListener != null) {
                queueListener.onPush(thread, element);
            }
        } finally {
            // The element is stored even if the listener threw, so release
            // both the mutex and the "element available" permit.
            mutex.release();
            storedElements.release();
        }
    }

    /**
     * Removes and returns the head element of this queue, blocking while the
     * queue is empty.
     *
     * @param thread the thread popping; forwarded to the listener.
     *
     * @return the popped element.
     */
    public E pop(final ConsumerThread<E, R> thread) {
        // Claim a stored element BEFORE reading. The slot is handed back to
        // producers only after it is vacated; handing it back first would let
        // a producer overwrite an element that is still in the queue.
        storedElements.acquireUninterruptibly();
        mutex.acquireUninterruptibly();

        final E element;

        try {
            element = array[headIndex];
            array[headIndex] = null; // Do not keep popped elements reachable.
            headIndex = (headIndex + 1) % array.length;
            --size;

            if (queueListener != null) {
                queueListener.onPop(thread, element);
            }
        } finally {
            mutex.release();
            vacantSlots.release();
        }

        return element;
    }

    /**
     * Returns the head element without removing it, blocking while the queue
     * is empty.
     * <p>
     * The element may be popped by another thread right after this method
     * returns; callers that peek and then pop need their own coordination.
     *
     * @return the head element.
     */
    public E top() {
        // Temporarily hold an "element available" permit so the head slot is
        // known to be populated while it is read.
        storedElements.acquireUninterruptibly();
        mutex.acquireUninterruptibly();

        try {
            return array[headIndex];
        } finally {
            mutex.release();
            storedElements.release();
        }
    }

    /**
     * Returns the number of elements currently stored in this queue.
     *
     * @return the number of stored elements.
     */
    public int size() {
        mutex.acquireUninterruptibly();

        try {
            return size;
        } finally {
            mutex.release();
        }
    }

    /**
     * Returns the textual representation of this queue, head element first.
     * <p>
     * This method deliberately does not take the mutex: the listener in this
     * project formats the queue from inside {@code onPush}/{@code onPop},
     * where the mutex is already held, and acquiring the non-reentrant mutex
     * again would deadlock. Outside a listener callback the result is only a
     * best-effort snapshot.
     *
     * @return the textual representation.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("[");

        for (int index = 0; index < size; ++index) {
            if (index > 0) {
                sb.append(", ");
            }

            sb.append(array[logicalIndexToPhysicalIndex(index)]);
        }

        return sb.append("]").toString();
    }

    /**
     * Sets the listener notified on pushes and pops. Nothing here
     * synchronizes the assignment, so set the listener before any thread
     * starts using the queue.
     *
     * @param queueListener the listener to set; {@code null} disables
     *                      notifications.
     */
    public void setQueueNotifier(
            final AbstractQueueListener<E, R> queueListener) {
        this.queueListener = queueListener;
    }

    /**
     * Converts the logical index of an element (0 is the head) to its
     * physical index in the circular array.
     *
     * @param index the logical index of an element.
     *
     * @return the physical index of that element.
     */
    private int logicalIndexToPhysicalIndex(final int index) {
        return (headIndex + index) % array.length;
    }

    /**
     * Checks that the input capacity is sensible (at least 1).
     *
     * @param capacity the capacity to validate.
     * @throws IllegalArgumentException if {@code capacity} is less than 1.
     */
    private static void checkCapacity(final int capacity) {
        if (capacity < 1) {
            final String exceptionMessage =
                    String.format("capacity(%d) < 1", capacity);

            throw new IllegalArgumentException(exceptionMessage);
        }
    }
}
io.github.coderodde.simulation.producerconsumer.ConsumerAction.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.function.Function;
/**
 * A named alias for a function of type {@code E -> R}: the action that turns
 * a queue element into a result. It adds no methods of its own; the single
 * abstract method is {@link Function#apply(Object)}.
 *
 * @param <E> the argument element type.
 * @param <R> the result element type.
 * @version 1.0.0
 * @since 1.0.0
 */
@FunctionalInterface
public interface ConsumerAction<E, R> extends Function<E, R> {
}
io.github.coderodde.simulation.producerconsumer.ConsumerThread.java:
package io.github.coderodde.simulation.producerconsumer;
/**
 * A consumer thread: it repeatedly removes the head element of the queue
 * until the halting element shows up at the head. The halting element itself
 * is left in the queue so that every other consumer can see it as well.
 *
 * @param <E> the element type.
 * @param <R> the result type.
 * @version 1.0.0
 * @since 1.0.0
 */
public final class ConsumerThread<E, R> extends AbstractSimulationThread<E, R> {

    /**
     * Constructs this consumer thread. Consumers pass {@code null} as the
     * shared producer state to the base class.
     *
     * @param id             the ID of this consumer thread.
     * @param haltingElement the element indicating this thread should exit.
     * @param queue          the target queue.
     * @param action         the pop action.
     */
    public ConsumerThread(final int id,
                          final E haltingElement,
                          final BoundedConcurrentQueue<E, R> queue,
                          final ConsumerAction<E, R> action) {
        super(id,
              haltingElement,
              queue,
              null,
              action);
    }

    /**
     * Consumes elements until the halting element is at the head of the
     * queue, then prints the exit status and returns.
     */
    @Override
    public void run() {
        while (consumeNext()) {
            // Keep consuming until the halting element is at the head.
        }

        System.out.printf("[STATUS] Consumer %d exited.\n", threadId);
    }

    /**
     * Peeks at the head of the queue and pops it unless it is the halting
     * element.
     * <p>
     * Peek and pop run under the queue's monitor, which all consumers of the
     * same queue share. Without it two consumers could both peek the same
     * regular element, and the second pop could then remove the halting
     * element, leaving the remaining consumers blocked forever. Producers do
     * not take this monitor, so a consumer waiting inside {@code top()} for
     * an element does not stop them from pushing.
     *
     * @return {@code true} if an element was consumed, {@code false} if the
     *         halting element is at the head and this thread must exit.
     */
    private boolean consumeNext() {
        synchronized (queue) {
            final E element = queue.top();

            // haltingElement is never null (checked by the base class), so
            // this ordering cannot throw even if the queue yields null.
            if (haltingElement.equals(element)) {
                return false;
            }

            queue.pop(this);
            return true;
        }
    }

    /**
     * Returns the name of this thread.
     *
     * @return the name of this thread.
     */
    @Override
    public String toString() {
        return "Consumer thread";
    }
}
io.github.coderodde.simulation.producerconsumer.ElementProvider.java:
package io.github.coderodde.simulation.producerconsumer;
/**
 * A source of queue elements. Besides handing out elements it also names the
 * sentinel ("halting") element.
 *
 * @param <E> the type of the provided elements.
 * @version 1.0.0
 * @since 1.0.0
 */
public interface ElementProvider<E> {

    /**
     * Supplies the next element that should be pushed to a queue.
     *
     * @return the next element to push.
     */
    E provide();

    /**
     * Gives the halting element of this provider.
     *
     * @return the halting element.
     */
    E getHaltingElement();
}
io.github.coderodde.simulation.producerconsumer.ProducerThread.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.Objects;
/**
 * A producer thread: it keeps pushing elements obtained from an
 * {@link ElementProvider} to the queue until the provider hands out the
 * halting element or another producer has requested the halt.
 *
 * @param <E> the element type.
 * @param <R> the result type.
 * @version 1.0.0
 * @since 1.0.0
 */
public final class ProducerThread<E, R> extends AbstractSimulationThread<E, R> {

    /**
     * The element provider.
     */
    private final ElementProvider<E> elementProvider;

    /**
     * Constructs this producer thread.
     *
     * @param id              the ID of this producer thread.
     * @param elementProvider the element provider; must not be {@code null}.
     * @param queue           the target queue.
     * @param sharedState     the state shared by all producers; must not be
     *                        {@code null}.
     * @param action          the action.
     * @throws NullPointerException if {@code elementProvider} or
     *                              {@code sharedState} is {@code null}.
     */
    public ProducerThread(final int id,
                          final ElementProvider<E> elementProvider,
                          final BoundedConcurrentQueue<E, R> queue,
                          final SharedProducerThreadState sharedState,
                          final ConsumerAction<E, R> action) {
        super(id,
              Objects.requireNonNull(elementProvider,
                                     "elementProvider is null")
                     .getHaltingElement(),
              queue,
              sharedState,
              action);

        // A producer cannot run without the shared state; fail here rather
        // than later inside the running thread.
        Objects.requireNonNull(sharedState, "sharedState is null");
        this.elementProvider = elementProvider;
    }

    /**
     * Produces elements until the halt is requested, then prints the exit
     * status. The status line is printed on every exit path, including the
     * one taken by the producer that pushes the halting element.
     */
    @Override
    public void run() {
        while (produceNext()) {
            // Keep producing until the halt is requested.
        }

        System.out.printf("[STATUS] Producer %d exited.\n", threadId);
    }

    /**
     * Performs one production step: checks the halt flag, obtains an element
     * and pushes it.
     * <p>
     * The whole step runs under the monitor of the shared state. This makes
     * the halting element the last element ever pushed, and pushed exactly
     * once. Without the lock a second producer could push another halting
     * element, or a regular element behind the halting one. Consumers stop
     * at the halting element, so those elements would never be consumed and
     * a push on a full queue would block forever. Consumers never take this
     * monitor, so a producer blocked in {@code push} on a full queue is still
     * released by the next pop.
     *
     * @return {@code true} if a regular element was pushed, {@code false} if
     *         this thread must exit.
     */
    private boolean produceNext() {
        synchronized (sharedState) {
            if (sharedState.isHaltRequested()) {
                return false;
            }

            final E element = elementProvider.provide();

            if (Objects.equals(element, haltingElement)) {
                sharedState.requestHalt();
                queue.push(haltingElement, this);
                return false;
            }

            queue.push(element, this);
            return true;
        }
    }

    /**
     * Returns the name of this thread.
     *
     * @return the name of this thread.
     */
    @Override
    public String toString() {
        return "Producer thread";
    }
}
io.github.coderodde.simulation.producerconsumer.SharedProducerThreadState.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * State shared between producer threads: a single flag telling them whether
 * they have to exit.
 *
 * @version 1.0.0
 * @since 1.0.0
 */
public final class SharedProducerThreadState {

    /**
     * Holds {@code true} once the halt has been requested; starts out as
     * {@code false}.
     */
    private final AtomicBoolean haltRequested = new AtomicBoolean();

    /**
     * Tells whether the producer threads sharing this state must halt.
     *
     * @return {@code true} if the halt has been requested, {@code false}
     *         otherwise.
     */
    public boolean isHaltRequested() {
        final boolean requested = haltRequested.get();
        return requested;
    }

    /**
     * Requests the halt. Calling this more than once has no further effect.
     */
    public void requestHalt() {
        haltRequested.set(true);
    }
}
io.github.coderodde.simulation.producerconsumer.Simulator.java:
package io.github.coderodde.simulation.producerconsumer;
import java.util.ArrayList;
import java.util.List;
/**
 * Orchestrates a producer/consumer simulation: it creates the configured
 * number of consumer and producer threads, starts them and waits until all
 * of them have finished.
 *
 * @param <E> the queue element type.
 * @param <R> the result element type.
 * @version 1.0.0
 * @since 1.0.0
 */
public final class Simulator<E, R> {

    /**
     * How many producer threads to start.
     */
    private final int numberOfProducerThreads;

    /**
     * How many consumer threads to start.
     */
    private final int numberOfConsumerThreads;

    /**
     * The queue shared by all threads.
     */
    private BoundedConcurrentQueue<E, R> queue;

    /**
     * The source of the elements and of the halting element.
     */
    private ElementProvider<E> elementProvider;

    /**
     * The listener installed on {@link #queue} when the simulation runs.
     */
    private AbstractQueueListener<E, R> queueListener;

    /**
     * The action handed to every thread.
     */
    private ConsumerAction<E, R> action;

    /**
     * Creates a simulator for the given thread counts.
     *
     * @param numberOfProducerThreads the number of producers; at least 1.
     * @param numberOfConsumerThreads the number of consumers; at least 1.
     * @throws IllegalArgumentException if either count is less than 1.
     */
    public Simulator(final int numberOfProducerThreads,
                     final int numberOfConsumerThreads) {
        // The producer count is validated first, then the consumer count.
        this.numberOfProducerThreads =
                requireAtLeastOne(numberOfProducerThreads,
                                  "numberOfProducerThreads");

        this.numberOfConsumerThreads =
                requireAtLeastOne(numberOfConsumerThreads,
                                  "numberOfConsumerThreads");
    }

    /**
     * Sets the queue the threads work on.
     *
     * @param queue the target queue to set.
     */
    public void setQueue(final BoundedConcurrentQueue<E, R> queue) {
        this.queue = queue;
    }

    /**
     * Sets the element provider.
     *
     * @param elementProducer the element provider to set.
     */
    public void setElementProvider(final ElementProvider<E> elementProducer) {
        this.elementProvider = elementProducer;
    }

    /**
     * Sets the queue listener.
     *
     * @param queueListener the queue listener to set.
     */
    public void setQueueListener(
            final AbstractQueueListener<E, R> queueListener) {
        this.queueListener = queueListener;
    }

    /**
     * Sets the consumer action.
     *
     * @param action the action to set.
     */
    public void setAction(final ConsumerAction<E, R> action) {
        this.action = action;
    }

    /**
     * Runs the simulation: installs the listener on the queue, starts all
     * consumers, then all producers, and finally waits for the consumers and
     * then for the producers to finish.
     */
    public void run() {
        queue.setQueueNotifier(queueListener);

        final SharedProducerThreadState haltState =
                new SharedProducerThreadState();

        final List<ConsumerThread<E, R>> consumers =
                new ArrayList<>(numberOfConsumerThreads);

        final List<ProducerThread<E, R>> producers =
                new ArrayList<>(numberOfProducerThreads);

        for (int id = 0; id < numberOfConsumerThreads; ++id) {
            final ConsumerThread<E, R> consumer =
                    new ConsumerThread<>(id,
                                         elementProvider.getHaltingElement(),
                                         queue,
                                         action);
            consumers.add(consumer);
            consumer.start();
        }

        for (int id = 0; id < numberOfProducerThreads; ++id) {
            final ProducerThread<E, R> producer =
                    new ProducerThread<>(id,
                                         elementProvider,
                                         queue,
                                         haltState,
                                         action);
            producers.add(producer);
            producer.start();
        }

        joinAll(consumers);
        joinAll(producers);

        System.out.println("[STATUS] Simulation done!");
    }

    /**
     * Waits for every thread in the list, in list order. If the waiting
     * thread is interrupted, the stack trace is printed and the JVM exits
     * with status 1.
     *
     * @param threads the threads to wait for.
     */
    private static void joinAll(final List<? extends Thread> threads) {
        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (final InterruptedException ex) {
                ex.printStackTrace();
                System.exit(1);
            }
        }
    }

    /**
     * Checks that a requested thread count is at least one.
     *
     * @param count         the requested number of threads.
     * @param parameterName the parameter name used in the exception message.
     *
     * @return {@code count}, unchanged.
     * @throws IllegalArgumentException if {@code count} is less than 1.
     */
    private static int requireAtLeastOne(final int count,
                                         final String parameterName) {
        if (count >= 1) {
            return count;
        }

        throw new IllegalArgumentException(
                String.format("%s(%d) < 1", parameterName, count));
    }
}
io.github.coderodde.simulation.producerconsumer.demo.ProducerConsumerSimulationDemo.java:
package io.github.coderodde.simulation.producerconsumer.demo;
import io.github.coderodde.simulation.producerconsumer.BoundedConcurrentQueue;
import io.github.coderodde.simulation.producerconsumer.Simulator;
import io.github.coderodde.simulation.producerconsumer.impl.FibonacciConsumerAction;
import io.github.coderodde.simulation.producerconsumer.impl.LongQueueListener;
import io.github.coderodde.simulation.producerconsumer.impl.LongElementProvider;
import java.math.BigInteger;
/**
 * Entry point of the producer/consumer demonstration: wires a queue, an
 * element provider, a Fibonacci action and a printing listener into a
 * {@link Simulator} and runs it.
 *
 * @version 1.0.0
 * @since 1.0.0
 */
public final class ProducerConsumerSimulationDemo {

    private static final int DEFAULT_NUMBER_OF_PRODUCERS = 3;
    private static final int DEFAULT_NUMBER_OF_CONSUMERS = 5;
    private static final int DEFAULT_QUEUE_CAPACITY = 6;
    private static final long DEFAULT_HALTING_ELEMENT = -1;
    private static final long LONG_BOUND = 31;
    private static final long TOTAL_OPERATIONS = 20;

    /**
     * Builds the simulation from the default settings above and runs it.
     *
     * @param args ignored.
     */
    public static void main(String[] args) {
        final BoundedConcurrentQueue<Long, BigInteger> sharedQueue =
                new BoundedConcurrentQueue<>(DEFAULT_QUEUE_CAPACITY);

        final FibonacciConsumerAction fibonacciAction =
                new FibonacciConsumerAction();

        final LongElementProvider provider =
                new LongElementProvider(LONG_BOUND,
                                        TOTAL_OPERATIONS,
                                        DEFAULT_HALTING_ELEMENT);

        final LongQueueListener listener =
                new LongQueueListener(sharedQueue,
                                      fibonacciAction,
                                      DEFAULT_NUMBER_OF_CONSUMERS,
                                      DEFAULT_NUMBER_OF_PRODUCERS,
                                      DEFAULT_HALTING_ELEMENT);

        final Simulator<Long, BigInteger> simulation =
                new Simulator<>(DEFAULT_NUMBER_OF_PRODUCERS,
                                DEFAULT_NUMBER_OF_CONSUMERS);

        simulation.setQueue(sharedQueue);
        simulation.setElementProvider(provider);
        simulation.setQueueListener(listener);
        simulation.setAction(fibonacciAction);
        simulation.run();
    }
}
io.github.coderodde.simulation.producerconsumer.impl.FibonacciConsumerAction.java:
package io.github.coderodde.simulation.producerconsumer.impl;
import io.github.coderodde.simulation.producerconsumer.ConsumerAction;
import java.math.BigInteger;
/**
 * A consumer action that maps an index to the Fibonacci number at that
 * index.
 *
 * @version 1.0.0
 * @since 1.0.0
 */
public final class FibonacciConsumerAction
        implements ConsumerAction<Long, BigInteger> {

    /**
     * Computes the Fibonacci number for the given index.
     *
     * @param arg the index of the Fibonacci number; unboxed, so it must not
     *            be {@code null}.
     *
     * @return {@code arg}th Fibonacci number.
     */
    @Override
    public BigInteger apply(final Long arg) {
        return fibonacci(arg);
    }

    /**
     * Computes {@code index}th Fibonacci number iteratively.
     *
     * @param index the index of the number. Starts from zero (0). For a
     *              negative index the loop does not run and zero is returned.
     *
     * @return {@code index}th Fibonacci number.
     */
    private static BigInteger fibonacci(final long index) {
        BigInteger a = BigInteger.ZERO;
        BigInteger b = BigInteger.ONE;

        // The counter is a long like the bound: an int counter would wrap
        // around for index > Integer.MAX_VALUE and never reach the bound.
        for (long i = 0; i < index; ++i) {
            final BigInteger c = a.add(b);
            a = b;
            b = c;
        }

        return a;
    }
}
io.github.coderodde.simulation.producerconsumer.impl.LongElementProvider.java:
package io.github.coderodde.simulation.producerconsumer.impl;
import io.github.coderodde.simulation.producerconsumer.ElementProvider;
import java.util.concurrent.ThreadLocalRandom;
/**
 * An element provider that hands out a fixed number of random {@code long}
 * values and, once that budget is used up, only the halting element.
 * <p>
 * {@link #provide()} is synchronized because one instance is shared by all
 * producer threads.
 *
 * @version 1.0.0
 * @since 1.0.0
 */
public final class LongElementProvider implements ElementProvider<Long> {

    /**
     * The exclusive upper bound of the provided values.
     */
    private final long bound;

    /**
     * The number of elements to provide before requesting halting.
     */
    private final long totalNumberOfOperations;

    /**
     * The number of elements provided so far. Guarded by the monitor of this
     * object.
     */
    private long numberOfOperationsPerformed = 0;

    /**
     * The halting element indicating that consumer threads reading this from
     * the queue must exit.
     */
    private final long haltingElement;

    /**
     * Constructs this element provider.
     *
     * @param bound                   the exclusive upper bound of the
     *                                provided values.
     * @param totalNumberOfOperations the total number of elements to provide.
     * @param haltingElement          the halting element.
     */
    public LongElementProvider(final long bound,
                               final long totalNumberOfOperations,
                               final long haltingElement) {
        this.bound = bound;
        this.totalNumberOfOperations = totalNumberOfOperations;
        this.haltingElement = haltingElement;
    }

    /**
     * Returns the halting element.
     *
     * @return the halting element.
     */
    @Override
    public Long getHaltingElement() {
        return haltingElement;
    }

    /**
     * Provides a randomly chosen element while the budget lasts and the
     * halting element afterwards.
     * <p>
     * The check-and-increment of the counter is a read-modify-write on state
     * shared by all producers. Without synchronization, updates can be lost
     * and more than {@code totalNumberOfOperations} elements handed out.
     *
     * @return a randomly chosen element, or the halting element once
     *         {@code totalNumberOfOperations} elements have been provided.
     */
    @Override
    public synchronized Long provide() {
        if (numberOfOperationsPerformed < totalNumberOfOperations) {
            // Count only real elements so the counter stops at the total.
            ++numberOfOperationsPerformed;
            return ThreadLocalRandom.current().nextLong(bound);
        }

        return haltingElement;
    }
}
io.github.coderodde.simulation.producerconsumer.impl.LongQueueListener.java:
package io.github.coderodde.simulation.producerconsumer.impl;
import io.github.coderodde.simulation.producerconsumer.AbstractQueueListener;
import io.github.coderodde.simulation.producerconsumer.AbstractSimulationThread;
import io.github.coderodde.simulation.producerconsumer.BoundedConcurrentQueue;
import io.github.coderodde.simulation.producerconsumer.ConsumerThread;
import java.math.BigInteger;
/**
 * A queue listener that prints one line to standard output for every push
 * and every pop. The pop line also contains the result of applying the
 * action to the popped element.
 *
 * @version 1.0.0
 * @since 1.0.0
 */
public final class LongQueueListener
        extends AbstractQueueListener<Long, BigInteger> {

    /**
     * The {@code printf} format of the line printed on a push.
     */
    private final String pushFormat;

    /**
     * The {@code printf} format of the line printed on a pop.
     */
    private final String popFormat;

    /**
     * Creates the listener and builds both message formats. The field width
     * of the thread ID is the number of characters of the respective thread
     * count; the field width of the element is the number of characters of
     * the halting element.
     *
     * @param queue          the target queue.
     * @param action         the consumer action.
     * @param consumerCount  the number of consumers.
     * @param producerCount  the number of producers.
     * @param haltingElement the halting element.
     */
    public LongQueueListener(
            final BoundedConcurrentQueue<Long, BigInteger> queue,
            final FibonacciConsumerAction action,
            final int consumerCount,
            final int producerCount,
            final long haltingElement) {

        super(queue, action);

        final int producerIdWidth = Integer.toString(producerCount).length();
        final int consumerIdWidth = Integer.toString(consumerCount).length();
        final int elementWidth = Long.toString(haltingElement).length();

        this.pushFormat = "%s %" + producerIdWidth
                        + "d produced %" + elementWidth
                        + "d: %s\n";

        this.popFormat = "%s %" + consumerIdWidth
                       + "d consumed %" + elementWidth
                       + "d: %s: Result = %s\n";
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onPush(
            final AbstractSimulationThread<Long, BigInteger> thread,
            final Long element) {

        final String threadName = thread.toString();
        final int id = thread.getThreadId();

        System.out.printf(pushFormat, threadName, id, element, queue);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onPop(final ConsumerThread<Long, BigInteger> thread,
                      final Long element) {

        final String threadName = thread.toString();
        final int id = thread.getThreadId();
        final Object result;

        if (action != null) {
            result = action.apply(element);
        } else {
            result = "null";
        }

        System.out.printf(popFormat, threadName, id, element, queue, result);
    }
}
Typical output
Producer thread 1 produced 9: [9]
Producer thread 0 produced 30: [9, 30]
Producer thread 2 produced 0: [9, 30, 0]
Producer thread 1 produced 6: [9, 30, 0, 6]
Producer thread 0 produced 7: [9, 30, 0, 6, 7]
Producer thread 2 produced 22: [9, 30, 0, 6, 7, 22]
Consumer thread 0 consumed 9: [30, 0, 6, 7, 22]: Result = 34
Consumer thread 1 consumed 30: [0, 6, 7, 22]: Result = 832040
Consumer thread 2 consumed 0: [6, 7, 22]: Result = 0
Producer thread 1 produced 16: [6, 7, 22, 16]
Consumer thread 4 consumed 6: [7, 22, 16]: Result = 8
Producer thread 0 produced 9: [7, 22, 16, 9]
Consumer thread 3 consumed 7: [22, 16, 9]: Result = 13
Producer thread 2 produced 24: [22, 16, 9, 24]
Producer thread 1 produced 5: [22, 16, 9, 24, 5]
Producer thread 0 produced 7: [22, 16, 9, 24, 5, 7]
Consumer thread 0 consumed 22: [16, 9, 24, 5, 7]: Result = 17711
Producer thread 2 produced 27: [16, 9, 24, 5, 7, 27]
Consumer thread 1 consumed 16: [9, 24, 5, 7, 27]: Result = 987
Consumer thread 2 consumed 9: [24, 5, 7, 27]: Result = 34
Producer thread 1 produced 20: [24, 5, 7, 27, 20]
Consumer thread 4 consumed 24: [5, 7, 27, 20]: Result = 46368
Producer thread 0 produced 11: [5, 7, 27, 20, 11]
Consumer thread 3 consumed 5: [7, 27, 20, 11]: Result = 5
Producer thread 2 produced 26: [7, 27, 20, 11, 26]
Producer thread 1 produced 1: [7, 27, 20, 11, 26, 1]
Consumer thread 0 consumed 7: [27, 20, 11, 26, 1]: Result = 13
Producer thread 0 produced 6: [27, 20, 11, 26, 1, 6]
Consumer thread 1 consumed 27: [20, 11, 26, 1, 6]: Result = 196418
Producer thread 2 produced 20: [20, 11, 26, 1, 6, 20]
Consumer thread 2 consumed 20: [11, 26, 1, 6, 20]: Result = 6765
Producer thread 1 produced 29: [11, 26, 1, 6, 20, 29]
[STATUS] Producer 1 exited.
Consumer thread 4 consumed 11: [26, 1, 6, 20, 29]: Result = 89
Consumer thread 3 consumed 26: [1, 6, 20, 29]: Result = 121393
Producer thread 0 produced 30: [1, 6, 20, 29, 30]
[STATUS] Producer 0 exited.
Producer thread 2 produced -1: [1, 6, 20, 29, 30, -1]
Consumer thread 0 consumed 1: [6, 20, 29, 30, -1]: Result = 1
Consumer thread 1 consumed 6: [20, 29, 30, -1]: Result = 8
Consumer thread 2 consumed 20: [29, 30, -1]: Result = 6765
Consumer thread 4 consumed 29: [30, -1]: Result = 514229
Consumer thread 3 consumed 30: [-1]: Result = 832040
[STATUS] Consumer 0 exited.
[STATUS] Consumer 3 exited.
[STATUS] Consumer 4 exited.
[STATUS] Consumer 1 exited.
[STATUS] Consumer 2 exited.
[STATUS] Simulation done!
Critique request
Please, tell me anything that comes to mind. Thanks in advance! :-)