Skip to main content
3 of 5
Changed the impl to track write index as well.

Lock-Free Ring Implementation

I am wondering if someone can take a look at my lock-free, circular ring implementation which I use to implement a background logger.

The CircularRing pre-allocates LoggableEntity elements and stores them in an plain array. Then, I have multiple producer threads which will enqueue elements and single consumer thread which will dequeue elements in a bulk and write them to a storage device.

I track the index where the next produced item and the next consumed item should come from. Once the capacity is reached, the indices wrap.

My goals behind this implementations are the following:

  1. Create a lock free, thread-safe data structure which can store pre-allocated elements.

  2. Producer should never blocks to enqueue but rather wrap if the capacity is reached.

  3. The relative order of elements within a producer (thread) must be respected. For example, if producer1 and producer2 produces A, B, C and P, Q, R at the same time; any interspersing of the elements is fine as long as the relative order is maintained.

  4. Finally, when the consumer index is equal to the producer index; the consumer waits by busy spinning.

Edit

public final class CircularRing{
    
    private final int ringCapacity;
    private final AtomicInteger writerIndex;
    private final AtomicInteger producerIndex;
    private final LoggableEntity[] ringBuffer;
    
    private final static int DEFAULT_RING_CAPACITY  = SIXTY_FOUR * SIXTY_FOUR;
    private final static Logger LOGGER              = LoggerFactory.getLogger("Ring");

    public CircularRing( ){
        this( DEFAULT_RING_CAPACITY );
    }    
        
    
    public CircularRing( int ringCapacity ){
       
        if( !powerOfTwo( ringCapacity ) ){
            throw new IllegalArgumentException("Ring capacity [" + ringCapacity + "] must be a power of 2.");
        }
        
        this.ringCapacity     = ringCapacity;
        this.writerIndex      = new AtomicInteger( NEGATIVE_ONE );
        this.producerIndex    = new AtomicInteger( NEGATIVE_ONE );
        this.ringBuffer       = new LoggableEntity[ ringCapacity ];
    
        for( int i=ZERO; i< ringCapacity; i++ ){
            ringBuffer[ i ] = new LoggableEntity( );
        }
        
        LOGGER.info("Successfully created Ring with a capacity of {}.", ringCapacity );
    
    }
    
   
    protected final int getWriterCount(){
        return writerIndex.get();
    }
    
    
    protected final LoggableEntity poll( int index ){
        return ringBuffer[ index ];
    }
    
    
    public final int getRingCapacity(){
        return ringCapacity;
    }
    
    
    public final LoggableEntity pollNext(){
        for( ;; ){
            int current = producerIndex.get();
            int next    = current + ONE;
            
            if( producerIndex.compareAndSet(current, next) ){
                next    = (next % ringCapacity);    
                return ringBuffer[ next ];
            }
        }
    }
    
    
    public final void offer( LoggableEntity entity ){
        for( ;; ){
            int current = writerIndex.get();
            int next    = current + ONE;
           
            if( writerIndex.compareAndSet(current, next) ){
                next    = (next % ringCapacity);
                ringBuffer[ next ]  = entity;
                return;
            }
        }
    }
       
}

public final class RingLogger{
        
        private volatile boolean keepLogging;
        
        private final CircularRing eRing;
        private final BackgroundLogger backLogger;
        private final ExecutorService executor;
            
        private final static int DEFAULT_BLOCK_SIZE = THIRTY_TWO * SIXTY_FOUR;
        private final static Logger LOGGER          = LoggerFactory.getLogger( "RingLogger" ); 
        
        
        public RingLogger( int ringSize, LogWriter writer ) throws IOException{
            this( DEFAULT_BLOCK_SIZE, ringSize, writer, new DefaultMessageFormatter() );
        }
        
        public RingLogger( int blockSize, int ringSize, LogWriter writer ) throws IOException{
            this( blockSize, ringSize, writer, new DefaultMessageFormatter() );
        }
        
        
        public RingLogger( int blockSize, int ringSize, LogWriter writer, MessageFormatter formatter ) throws IOException{
            this.eRing          = new CircularRing( ringSize );
            this.backLogger     = new BackgroundLogger( formatter, writer );
            this.executor       = Executors.newCachedThreadPool( );
        }
        
        
        public final void init(){
            keepLogging = true;
            executor.execute( backLogger );
            
            LOGGER.info("Successfully initialized logger.");
        }
            
        
        public final LoggableEntity pollNext( ){
            return eRing.pollNext( );
        }
        
                
        public final void offer( LoggableEntity data ){
            eRing.offer( data );
        }
            
       
        public final void stop(){
            try {
                LOGGER.info("Received request to stop.");
                executor.shutdown();
                executor.awaitTermination( TWO, TimeUnit.SECONDS );
                keepLogging = false;
                
            }catch( Exception e ){
                LOGGER.warn("Exception while stopping logger.", e);
            }
        
        }
       
    
            
            private final class BackgroundLogger implements Runnable{
        
                private int ringWriterCount     = NEGATIVE_ONE;
                private int logWriterCount      = NEGATIVE_ONE;
                
                private final int blockSize;
                private final int ringCapacity;
                private final LogWriter writer;
                private final MessageFormatter formatter;
                private final ReusableStringBuilder builder;
                
                public BackgroundLogger( MessageFormatter formatter, LogWriter writer ){
                    this.writer         = writer;
                    this.formatter      = formatter;
                    this.blockSize      = writer.getBlockSize();
                    this.builder        = new ReusableStringBuilder();
                    this.ringCapacity   = eRing.getRingCapacity();
                }
                
                
                public void run(){
                    
                    while( keepLogging ){
        
                        while ( keepLogging && ( logWriterCount == (ringWriterCount = circularRing.getWriterCount()) ) ){
                            LockSupport.parkNanos( ONE );
                        }
                    
                        int entityCount = 0;
                              
                        while( (entityCount < blockSize) && (logWriterCount < ringWriterCount) ){
                            
                            int writerIndex = logWriterCount + ONE;
                            writerIndex     = (writerIndex >= ringCapacity) ? (writerIndex % ringCapacity) : writerIndex;
                   
                            LoggableEntity data = circularRing.poll( writerIndex );
                            formatter.formatMessage( builder, data );
                        
                            ++logWriterCount;
                            ++entityCount;
                        }
                    
                        writer.writeBuffered( builder );
                        builder.wipeout();
                    
                    }
                
                    LOGGER.info("Background Logger thread successfully stopped.");
                }
                
            }
                
        }

public final class LoggableEntity{
        
            private Level level;
            private String msg;
            
            public void debug( String msg ){
                populateLog( DEBUG, name );
            }
            
            public void info( String name, String msg ){
                populateLog( INFO, name );
            }
        
            private final void populateLog( Level level, String msg ){
                this.level     = level;
                this.msg       = msg;
            }
        
           public final Level getLevel( ){
                return level;
            }
            
            
            public final String getMessage( ){
                return msg;
            }
        
        }

public class Tester{


    public static void main( String[] args ) throws Exception{

            int iteration       = 16;
            int blockSize       = 8;
        String TEMPLATE     = "This is a simulated dummy log message number ";
          
            LogWriter writer    = new DefaultLogWriter( blockSize, "Test.log");
            RingLogger logger   = new RingLogger( iteration, writer );
            logger.init();
            
            for( int i =0; i < iteration; i++ ){
               LoggableEntity data = logger.pollNext( );
               data.debug( NAME, TEMPLATE + i );
               logger.offer( data );
            }
     
            logger.stop();

  }

}

Any comments, pointers etc are appreciated. Thank you and happy holidays.

-Cheers