8
\$\begingroup\$

I recently wrote a concurrent, mutex-less (but not lockfree) queue and wanted to know if it is actually correct, and if there are any particular improvements I could make:

template <typename T>
class concurrent_queue
{
protected:
    T *storage;
    std::size_t s;
    std::atomic<T*> consumer_head, producer_head;

    union alignas(16) dpointer
    {
        struct
        {
            T *ptr;
            std::size_t cnt;
        };
        __int128 val;
    };

    dpointer consumer_pending, producer_pending;

public:
    concurrent_queue(std::size_t s): storage(nullptr)
    {
        storage = static_cast<T*>(::operator new((s+1)*sizeof(T)));

        consumer_head = storage;
        __atomic_store_n(&(consumer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);

        producer_head = storage;

        __atomic_store_n(&(producer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);
        this->s = s + 1;
    }
    ~concurrent_queue()
    {
        while(consumer_head != producer_head)
        {
            ((T*)consumer_head)->~T();
            ++consumer_head;
            if(consumer_head == storage + s)
                consumer_head = storage;
        }
        ::operator delete(storage);
    }

    template <typename U>
    bool push(U&& e)
    {
        while(true)
        {
            dpointer a;
            a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_ACQUIRE);
            auto b = consumer_head.load(std::memory_order_relaxed);
            auto next = a.ptr + 1;
            if(next == storage + s) next = storage;

            if(next == b) return false;
            dpointer newval{next, a.cnt+1};
            if(!__atomic_compare_exchange_n(&(producer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;

            new (a.ptr) T(std::forward<U>(e));

            while(!producer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
            return true;
        }
    }

    template <typename U>
    bool pop(U& result)
    {
        while(true)
        {
            dpointer a;
            a.val = __atomic_load_n(&(consumer_pending.val), __ATOMIC_ACQUIRE);
            auto b = producer_head.load(std::memory_order_relaxed);
            auto next = a.ptr + 1;
            if(next == storage + s) next = storage;

            if(a.ptr == b) return false;
            dpointer newval{next, a.cnt+1};
            if(!__atomic_compare_exchange_n(&(consumer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;

            result = std::move(*(a.ptr));
            (a.ptr)->~T();

            while(!consumer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
            return true;
        }
    }
};

It's platform specific to GCC on x86_64 and a CPU that supports double width CAS right now, but I assume it wouldn't be that hard to adjust it for other platforms.

I've stress tested it with multiple threads pushing and popping, with both POD types and with memory-managing types like std::vector and haven't had any issues so far...before I put in the double width CAS I encountered the ABA problem and had segmentation faults, but with it it seems to be fine.

However, I'm new to all this multithreaded stuff, and wanted someone more experienced than me to tell me if this would work, say, on a system with a weak memory model.

\$\endgroup\$
2
  • \$\begingroup\$ "It's platform specific to GCC on x86_64 [...]" Ok, but can we move away from that? \$\endgroup\$ Commented Jun 5, 2014 at 9:03
  • \$\begingroup\$ @black of course; for 32 bit systems just use a uint64_t instead of __int128, for LL/SC systems a counter isn't even necessary, and for other compilers use the respective equivalent intrinsics. Though, if my memory fences aren't correct it won't work on some systems \$\endgroup\$ Commented Jun 5, 2014 at 12:01

1 Answer 1

4
\$\begingroup\$

Regarding the conditional return types in push() and pop(), it may be clearer to use a different type of infinite loop, such as a for(;;). This will better focus the attention to the loop contents.

You could also use curly braces for the single-line statements. Some of these lines are quite long, so the executed code is at the very end. This should also help distinguish them from the busy-waits.

\$\endgroup\$

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.