The CircularRingCircularRing pre-allocates LoggableEntityLoggableEntity elements and stores them in an plain arrayAtomicReferenceArray. Then, I have multiple producer threadsmultiple producer threads which will enqueue elements and single consumer threadsingle consumer thread which will dequeue elements in a bulk and write them to a storage device.
Create a lock free, thread-safe data structure which can store pre-allocated elements.
Producer should never blockswait to enqueue but rather wrap if the capacity is reached.
The relative order of elements within a producer (thread) must be respected. For example, if producer1
producer1and producer2producer2produces A, B, C and P, Q, R at the same time; any interspersing of the elements is fine as long as the relative order is maintained.Finally, when the consumer index is equal to the producer index; the consumer waits by busy spinning.
public final class LoggableEntity {
private long time;
private String message;
public static final LoggableEntity getInstance() {
return new LoggableEntity();
}
//Setters and Getters
}
public final class CircularRing {
private final int ringCapacity;
private final int ringCapacityIndex;
private final AtomicInteger consumerIndex;
private final AtomicInteger producerIndex;
private final AtomicReferenceArray<LoggableEntity> ringBuffer;
private final static int DEFAULT_BUFFER_CAPACITY = 1024;
private final static int DEFAULT_RING_CAPACITY = 16 * 1024;
public CircularRing( ) {
this( DEFAULT_RING_CAPACITY, DEFAULT_BUFFER_CAPACITY );
}
public CircularRing( int ringCapacity, int bufferCapacity ) {
this.ringCapacity = ringCapacity;
this.ringCapacityIndex = ringCapacity -1;
this.consumerIndex = new AtomicInteger( -1 );
this.producerIndex = new AtomicInteger( -1 );
this.ringBuffer = new AtomicReferenceArray<LoggableEntity>( ringCapacity );
for( int i =0; i< ringCapacity; i++ ) {
addLazily( i , LoggableEntity.getInstance() );
}
}
public final int getCurrentConsumerIndex() {
return consumerIndex.get();
}
public final int getNextConsumerIndex() {
return incrementModAndGet( consumerIndex );
}
private final int incrementModAndGet( AtomicInteger aInt ) {
if ( aInt.get() < ringCapacityIndex ) {
return aInt.incrementAndGet();
} else {
for (;;) {
int current = aInt.get();
int next = (current + 1) % ringCapacity;
if( aInt.compareAndSet(current, next) )
return next;
}
}
}
public final int getCurrentProducerIndex() {
return producerIndex.get();
}
public final int getNextProducerIndex() {
return incrementModAndGet( producerIndex );
}
public final LoggableEntity poll( int index ) {
return ringBuffer.get( index );
}
public final void addLazily( int index, LoggableEntity entity ) {
ringBuffer.lazySet( index, entity );
}
EditLogger
public final class CircularRing{
private final int ringCapacity;
private final AtomicInteger writerIndex;
private final AtomicInteger producerIndex;
private final LoggableEntity[] ringBuffer;
private final static int DEFAULT_RING_CAPACITY = SIXTY_FOUR * SIXTY_FOUR;
private final static Logger LOGGER = LoggerFactory.getLogger("Ring");
public CircularRing(CircularSmartLogger ){
this( DEFAULT_RING_CAPACITY );
}
public CircularRing( int ringCapacity ){
if( !powerOfTwo( ringCapacity ) ){
throw new IllegalArgumentException("Ring capacity [" + ringCapacity + "] must be a power of 2.");
}
this.ringCapacity = ringCapacity;
this.writerIndex = new AtomicInteger( NEGATIVE_ONE );
this.producerIndex = new AtomicInteger( NEGATIVE_ONE );
this.ringBuffer = newprivate LoggableEntity[volatile ringCapacityboolean ];keepLogging;
for( int i=ZERO; i<private ringCapacity;final i++CircularRing ){circularRing;
ringBuffer[ i ] =private newfinal LoggableEntity(ExecutorService );executor;
private final BackgroundCircularLogger }backLogger;
public CircularSmartLogger( int bulkSize ){
LOGGER this.infocircularRing = new CircularRing("Successfully);
created Ring with a capacity of {} this.",backLogger ringCapacity = new BackgroundCircularLogger( bulkSize );
this.executor = Executors.newCachedThreadPool();
}
protectedpublic final intvoid getWriterCountinit() {
returnkeepLogging writerIndex= true;
executor.getexecute( backLogger );
}
protectedpublic final LoggableEntity poll( int indexgetNextProducerIndex( ) {
return ringBuffer[ index ];circularRing.getNextProducerIndex();
}
public final intLoggableEntity getRingCapacitypoll( int index ) {
return ringCapacity;circularRing.poll( index );
}
public final LoggableEntityvoid pollNextaddLazily(){
int index, for(LoggableEntity ;;data ) {
int current = producerIndexcircularRing.get();
int next = current + ONE;
ifaddLazily( producerIndex.compareAndSet(currentindex, next) ){
next = (next %data ringCapacity);
return ringBuffer[ next ];
}
}
}
public final void offer( LoggableEntity entity ){
for( ;; ){
int current = writerIndex.get();
int next = current + ONE;
public final if(void writerIndex.compareAndSetstop(current, next) ){
next = (next % ringCapacity);
ringBuffer[ next ] keepLogging = entity;
return;
}
}false;
}
}
public final class RingLogger{
private volatile boolean keepLogging;
private final CircularRing eRing;
private final BackgroundLogger backLogger;
class privateBackgroundCircularLogger finalimplements ExecutorServiceRunnable executor;{
private final static int DEFAULT_BLOCK_SIZE = THIRTY_TWO * SIXTY_FOUR;pIndex;
private final static Logger LOGGER = LoggerFactory.getLogger( "RingLogger" );
public RingLogger( int ringSize, LogWriter writer ) throws IOException{
this( DEFAULT_BLOCK_SIZE, ringSize, writer, new DefaultMessageFormatter() );
}
public RingLogger( int blockSize, int ringSize, LogWriter writer ) throws IOException{
this( blockSize, ringSize, writer, new DefaultMessageFormatter() );
}
public RingLogger( int blockSize, int ringSize, LogWriter writer, MessageFormatter formatter ) throws IOException{
this.eRing = new CircularRing( ringSize );
this.backLogger = new BackgroundLogger( formatter, writer );
this.executor = Executors.newCachedThreadPool( );
}
public final void init(){
keepLogging = true;
executor.execute( backLogger );
LOGGER.info("Successfully initialized logger.");
}
public final LoggableEntity pollNext( ){
return eRing.pollNext( );
}
public final void offer( LoggableEntity data ){
eRing.offer( data );
}
cIndex;
publicprivate final void stop(){
try {
LOGGER.info("Received request to stop.");
executor.shutdown();
executor.awaitTermination( TWO, TimeUnit.SECONDS );
keepLogging =int false;bulkSize;
public }catchBackgroundCircularLogger( Exceptionint ebulkSize ) {
this.pIndex LOGGER.warn("Exception while= stopping-1;
logger.", e);
this.cIndex } = -1;
this.bulkSize = bulkSize;
}
private final class BackgroundLogger implements Runnable{
private int ringWriterCount = NEGATIVE_ONE;
private int logWriterCount = NEGATIVE_ONE;
private final int blockSize;
private final int ringCapacity;
private final LogWriter writer;
private final MessageFormatter formatter;@Override
public privatevoid finalrun( ReusableStringBuilder) builder;{
public BackgroundLoggerwhile( MessageFormatter formatter, LogWriter writerkeepLogging ) {
this.writer = writer;
this.formatter while ( (cIndex = formatter;
thiscircularRing.blockSize getCurrentConsumerIndex()) == (pIndex = writercircularRing.getBlockSizegetCurrentProducerIndex();
this.builder = new) ReusableStringBuilder(); {
thisLockSupport.ringCapacity parkNanos( =1L eRing.getRingCapacity();
}
int items = pIndex - cIndex;
items = ( items > bulkSize ) ? bulkSize : items;
public void run(){
while( keepLogging ){
while ( keepLogging && ( logWriterCount == (ringWriterCount = circularRing.getWriterCount()) ) ){
for LockSupport.parkNanos( ONE );
}
int entityCount = 0;
i while(=0; (entityCounti < blockSize) && (logWriterCountitems; <i++ ringWriterCount) ){
int writerIndex = logWriterCount + ONE;
writerIndex = (writerIndex >= ringCapacity) ? (writerIndex % ringCapacity) : writerIndex;
nextIdx LoggableEntity data = circularRing.pollgetNextConsumerIndex( writerIndex );
formatter.formatMessage( builder,LoggableEntity data );
++logWriterCount;
++entityCount;
}
= writercircularRing.writeBufferedpoll( builder );
nextIdx builder.wipeout();
}
LOGGER.info("Background Logger thread successfully stopped.");
}
}
}
public final class LoggableEntity{
private Level level;
private String msg;
public void debug( String msg ){
populateLog( DEBUG, name );
}
public void info( String name, String msg ){
populateLog( INFO, name );
}
private final void populateLog( Level level, String msg ){
this.level = level;
this.msg = msg;
}
public final Level getLevel( ){
return level;
}
public final String getMessage( ){
return msg;
}
}
public class Tester{
public static void main( String[] args ) throws Exception{
int iteration = 16;
int blockSize = 8;
String TEMPLATE //Write attributes of ="data" "Thisit isto a simulated dummy log message number ";
LogWriter writer = new DefaultLogWriter( blockSize, "Test.log");
RingLogger logger = new RingLogger( iteration, writer );
logger.init();
for( int i =0; i < iteration; i++ ){
LoggableEntity data = logger.pollNext(Storage );Device
data.debug( NAME, TEMPLATE + i} );
logger.offer( data );
}
}
} logger.stop();
}
}Tester
Any comments, pointers etc are appreciated. Thank you and happy holidays.
public class CircularRingTester {
public static void main( String[] args ) {
CircularSmartLogger logger = new CircularSmartLogger( 50 );
logger.init();
int next = logger.getNextProducerIndex();
LoggableEntity data = logger.poll( next );
data.setTime( System.currentTimeMillis() );
data.setMessage("This is a simulated message.");
logger.addLazily( next, data );
}
}
-Cheers Any comments, pointers etc are appreciated.