MPMCQueue.h

A bounded multi-producer multi-consumer concurrent queue written in C++11.

It's battle hardened and used daily in production.

It's been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

Example

MPMCQueue<int> q(10);
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
q.push(1);
q.push(2);
t1.join();
t2.join();

Usage

  • MPMCQueue<T>(size_t capacity);

    Constructs a new MPMCQueue holding items of type T with capacity capacity.

  • void emplace(Args &&... args);

    Enqueue an item using in-place construction. Blocks if queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using in-place construction. Returns true on success and false if queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if queue is full.

  • void pop(T &v);

    Dequeue an item by copying or moving the item into v. Blocks if queue is empty.

  • bool try_pop(T &v);

    Try to dequeue an item by copying or moving the item into v. Returns true on success and false if the queue is empty.

All operations except construction and destruction are thread safe.
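The nothrow condition on the move overloads above can be checked directly with type traits. A small illustration (ThrowingCopy is a made-up type, not part of the library):

```cpp
#include <string>
#include <type_traits>

// Illustrative type: its copy constructor is not noexcept, so it does not
// satisfy the nothrow requirement of the move overloads.
struct ThrowingCopy {
  ThrowingCopy() = default;
  ThrowingCopy(const ThrowingCopy &) {} // may throw
};

// An rvalue std::string is nothrow-constructible, so the move overloads of
// push()/try_push() participate in overload resolution for it.
static_assert(std::is_nothrow_constructible<std::string, std::string &&>::value,
              "std::string rvalue: move overload participates");
// A throwing copy does not satisfy the condition, so only the copy
// overloads apply for this type.
static_assert(!std::is_nothrow_constructible<ThrowingCopy, const ThrowingCopy &>::value,
              "ThrowingCopy lvalue: move overload does not participate");
```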

Implementation

Memory layout

Enqueue:

  1. Acquire next write ticket from head.
  2. Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the readers we are done writing.

Dequeue:

  1. Acquire next read ticket from tail.
  2. Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the writers we are done reading.
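The two protocols above can be sketched as a minimal, non-optimized implementation. Names and details are illustrative, not the actual MPMCQueue.h code, which adds cache-line alignment, exception safety, and allocator handling:

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Minimal sketch of the ticket/turn protocol described above.
template <typename T> class TicketQueue {
  struct Slot {
    std::atomic<std::size_t> turn{0};
    T value;
  };
  const std::size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<std::size_t> head_{0};
  std::atomic<std::size_t> tail_{0};

public:
  explicit TicketQueue(std::size_t capacity)
      : capacity_(capacity), slots_(capacity) {}

  void push(const T &v) {
    const std::size_t t = head_.fetch_add(1);     // 1. acquire write ticket
    Slot &slot = slots_[t % capacity_];
    const std::size_t turn = 2 * (t / capacity_); // 2. wait for our turn
    while (slot.turn.load(std::memory_order_acquire) != turn) {}
    slot.value = v;
    slot.turn.store(turn + 1, std::memory_order_release); // 3. done writing
  }

  void pop(T &v) {
    const std::size_t t = tail_.fetch_add(1);         // 1. acquire read ticket
    Slot &slot = slots_[t % capacity_];
    const std::size_t turn = 2 * (t / capacity_) + 1; // 2. wait for our turn
    while (slot.turn.load(std::memory_order_acquire) != turn) {}
    v = slot.value;
    slot.turn.store(turn + 1, std::memory_order_release); // 3. done reading
  }
};
```

Since a slot's turn counter only ever increases, a writer for generation n and a reader for generation n can never observe each other's turn value out of order.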

Testing

Testing concurrency algorithms is hard. I'm using two approaches to test the implementation:

  • A single threaded test verifying that the functionality works as intended, including that element constructors and destructors are invoked correctly.
  • A multithreaded fuzz test verifying that all elements are enqueued and dequeued correctly under heavy contention.

TODO

  • Add allocator support so that the queue can be used with huge pages and shared memory
  • Add benchmarks and compare to boost::lockfree::queue and others
  • Use C++20 concepts instead of static_assert if available
  • Use std::hardware_destructive_interference_size if available
  • Add API for zero-copy dequeue and batch dequeue operations

About

This project was created by Erik Rigtorp <[email protected]>.

Comments
  • How is this queue lock-free?


    A thread that blocks after the call to fetch_add() will block later producers and consumers that use the same slot. As objects are enqueued and dequeued, the ring wraps around and comes back to earlier slots. For enqueue, such an event can be pushed far into the future, since the queue capacity can be made arbitrarily large (as memory allows), but not for dequeue. Blocking could possibly be avoided if producers and consumers synchronized with each other (e.g. use CAS to update the slot and restart the operation, allocating a new index, if the slot doesn't have the expected value).

    opened by WonderfulVoid 9
  • Add try_consume_until_current_head


    Inspired by your try_consume PR I added a try_consume_until_current_head method that consumes all elements of the queue that were added before the current head.

    My use case is a macro recorder. The queue captures commands from different threads. Then, when the user triggers the macro, all recorded commands are executed. However it's possible that while the commands belonging to the first recording are still being consumed, commands from a second recording are already being recorded, and they shouldn't be executed until next time the macro is triggered. With try_consume or try_pop I cannot guarantee this.

    I'm not sure what the memory ordering should be for the head_.load at the beginning of the function. Can you help here?

    I'm not sure what your policy is on new features; let me know if you think this doesn't belong in the project.

    opened by ferranpujolcamins 8
  • long latency when push times are much bigger than queue size.


    Hi

    I tried a test pushing 1000000 times into the queue. When the queue size is small, like 64 or 1024, the latency is quite long; when the queue size is big, like 65535, the latency becomes low.

    Queue size 1024, push 10000 times: 0.3 us per push. Queue size 1024, push 1000000 times: 13 us per push.

        #include <atomic>
        #include <chrono>
        #include <iostream>
        #include <thread>
        #include <vector>
        #include <rigtorp/MPMCQueue.h>

        using namespace std;
        using namespace chrono;

        // Consumer_num and test_size were not shown in the original report;
        // these values are assumed for reproduction.
        const int Consumer_num = 4;
        const int test_size = 1000000;

        vector<thread> threads;
        rigtorp::MPMCQueue<int *> q(1024);
        atomic<int> push_times{0};
        atomic<int> pop_times{0};
    
        {
            steady_clock::time_point t1 = steady_clock::now();
            for(int i=0; i<Consumer_num; ++i) {
                threads.emplace_back(
                    [&] {
                        while(pop_times.load()<test_size) {
                            int* temp;
                            q.pop(temp);
                            pop_times.fetch_add(1);
                        }
                    }
                );
            }
            while(push_times<test_size) {
            int* temp = nullptr; // initialized; pushing an indeterminate pointer is UB
                q.push(temp);
                push_times.fetch_add(1);
            }
            steady_clock::time_point t2 = steady_clock::now();
            duration<double, micro> time_span = (duration<double, micro>)(t2 - t1);
    
            this_thread::sleep_for(chrono::microseconds(500));
            cout<<"rigtorp::MPMCQueue: push " << test_size << " data, per cost us:" << (time_span).count()/test_size <<endl;
            if(push_times.load() != pop_times.load()) {
                cout<<"warning: push_times=" << push_times.load() << ", pop_times=" << pop_times.load() <<endl;
            }
        }
        exit(0);
    
    
    opened by colderleo 4
  • Does this work on shared memory?


    For example, Linux shm_open, or MPI_Win_allocate_shared, combined with placement new:

    const int N = 10; // capacity
    std::byte* buffer;
    /* allocate buffer as shm on main proc */
    if (/*main proc*/) {
        auto* q = new (buffer) MPMCQueue<int>(N); // placement new returns a pointer
        // consume
    } else {
        auto* q = new (buffer) MPMCQueue<int>(N);
        // produce
    }
    
    opened by Jerry-Ma 4
  • Fix memory leak on MPMCQueue's construction with null capacity.


    If the given 'capacity' is < 1, you create a memory leak by throwing after memory allocation, since '~MPMCQueue' will not be called. I suggest moving the check into a static member function invoked before 'slots_' is initialized.
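    The suggested fix can be sketched like this (class and member names are illustrative, not the actual MPMCQueue.h code): validating the capacity in a helper that runs in the member initializer list before any allocation means the exception propagates before memory is obtained, so nothing leaks.

```cpp
#include <cstddef>
#include <stdexcept>

// Sketch of the suggested fix: the check runs in the member initializer
// list before the allocating member, so throwing cannot leak.
class BoundedBuffer {
  static std::size_t checkedCapacity(std::size_t capacity) {
    if (capacity < 1) {
      throw std::invalid_argument("capacity < 1");
    }
    return capacity;
  }

  std::size_t capacity_;
  int *slots_;

public:
  explicit BoundedBuffer(std::size_t capacity)
      : capacity_(checkedCapacity(capacity)), // throws before slots_ allocates
        slots_(new int[capacity_]) {}
  ~BoundedBuffer() { delete[] slots_; }

  BoundedBuffer(const BoundedBuffer &) = delete;
  BoundedBuffer &operator=(const BoundedBuffer &) = delete;
};
```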

    opened by Akkarinn 3
  • cannot convert ‘Message*’ to ‘rigtorp::mpmc::Slot<Message>*’


    Hi, I am trying to use MPMCQueue with a custom allocator. But I am getting the two following errors:

    /Path/To/Project/Debug/external/include/rigtorp/MPMCQueue.h:129:33: error: cannot convert ‘Message*’ to ‘rigtorp::mpmc::Slot<Message>*’ in assignment
      129 |     slots_ = allocator_.allocate(capacity_ + 1);
    /Path/To/Project/Debug/external/include/rigtorp/MPMCQueue.h:134:29: error: cannot convert ‘rigtorp::mpmc::Slot<Message>*’ to ‘CommMemoryAllocator<Message>::pointer’ {aka ‘Message*’}
      134 |       allocator_.deallocate(slots_, capacity_ + 1);
    

    This is how I define the allocator and pass it to MPMCQueue:

    template <typename T>
    class CommMemoryAllocator
    {
        public:
            typedef size_t size_type;
            typedef ptrdiff_t difference_type;
            typedef T* pointer;
            typedef const T* const_pointer;
            typedef T& reference;
            typedef const T& const_reference;
            typedef T value_type;
    
            T* allocate(size_type num, const void* hint = nullptr);
    
            void deallocate(pointer ptr, size_type num);
    };
    
    template <typename T>
    using comm_mqueue = rigtorp::MPMCQueue<T, CommMemoryAllocator<T>>;
    

    Then inside one of my classes I actually use it like so:

    comm_mqueue<Message> downward_queue{20};
    

    I don't actually use the queues yet (so no pushing or popping), I am getting these errors directly from this simple use.

    Is there something I am missing, or is this a bug? I am using Linux, gcc/g++ 11.1.0, and this is being compiled as C++17.

    opened by mewais 2
  • Efficient block while pushing/popping


    I noticed that standard C++20 has recently added wait and notify functions to the atomic library:

    • wait blocks the thread until notified and the atomic value changes

    • notify_all notifies all threads blocked waiting on the atomic object

    Consider an empty queue: a thread popping an item blocks in a while(!is_it_changed) loop, consuming a lot of CPU if no other thread enqueues an item promptly. The same applies when pushing to a full queue.

    In my opinion, a thread blocked dequeuing from an empty queue (or enqueuing to a full queue) should go to sleep after checking for changes a few times; when another thread enqueues (or dequeues), it wakes the sleeping thread(s) to re-check. This kind of change notification is often more efficient than simple polling.

    Though my rough implementation seems to work and passes MPMCQueueTest with no significant performance effect, I may not have considered all the situations in synchronizing threads. Could you give some suggestions? Thanks!
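    The C++11 building block for this sleep/wake pattern is a condition_variable (C++20's std::atomic wait/notify expresses the same idea without the mutex). A minimal illustrative sketch, not tied to the queue's internals:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Sketch of "sleep instead of spin": the consumer blocks on a
// condition_variable until the producer signals; no CPU is burned waiting.
int produce_consume() {
  std::mutex m;
  std::condition_variable cv;
  bool has_item = false;
  int item = 0, result = 0;

  std::thread consumer([&] {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&] { return has_item; }); // sleeps until notified
    result = item;
  });

  {
    std::lock_guard<std::mutex> lock(m);
    item = 42;
    has_item = true;
  }
  cv.notify_all(); // wake the sleeping consumer

  consumer.join();
  return result;
}
```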

    opened by Unbinilium 2
  • CPU usage in pop block


    Uses ~30% CPU waiting in pop. Compiled in VS2017.

    sample code:

    auto q = std::make_shared<rigtorp::MPMCQueue<int>>(10);
    while(true) {
        int v;
        q->pop(v);
        switch(v) {
        case 4726:
            LOGV << "event=" << v << " user removed";
            break;
        case 4724:
        LOGV << "event=" << v << " user password manually reset";
            break;
        default:
        LOGV << "event=" << v << " unknown";
            break;
        }
    }
    


    opened by hdhog 2
  • Feature request: is LIFO possible?


    This is a very nice MPMC queue! Is it possible to add LIFO functionality? It looks as if pop() could be modified to use head_.fetch_sub(1) but I am uncertain if this breaks something?

    opened by emmenlau 2
  • Can just set the turn to true or false to represent reading or writing is done


    According to the implementation.

    Enqueue:

    • Acquire next write ticket from head.
    • Wait for our turn (true) to write slot (ticket % capacity).
    • Set turn = false to inform the readers we are done writing.

    Dequeue:

    • Acquire next read ticket from tail.
    • Wait for our turn (false) to read slot (ticket % capacity).
    • Set turn = true to inform the writers we are done reading.

    Does this work?

    Thank you.

    opened by haicoder 1
  • Beginner Question


    I'm running into a compile issue using a struct containing a std::string as the type when constructing a new MPMCQueue.

    For example, using a struct:

    struct MyEvent {
    	std::string MyString;
    	long MyLong;
    };
    

    and trying to construct a MPMCQueue:

    MPMCQueue<MyEvent> EventQueue(100);

    I get a compiler message:

    "T must be nothrow copy constructible".

    If I remove the std::string from the struct and replace it with another long for example, it compiles and works perfectly. It also works perfectly if I use just a std::string in constructing the queue. But not when it's used inside the struct.

    Any advice would be appreciated.
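    The likely cause: std::string's copy constructor allocates and may throw, so a struct containing one is not nothrow copy constructible, which is what the static_assert rejects. Moving a std::string is noexcept on mainstream implementations, so passing rvalues (q.push(std::move(ev)) or emplace) should satisfy the move-based overloads instead. A quick check of the traits involved:

```cpp
#include <string>
#include <type_traits>

struct MyEvent {
  std::string MyString;
  long MyLong;
};

// Copying MyEvent copies the std::string, which may allocate and throw.
static_assert(!std::is_nothrow_copy_constructible<MyEvent>::value,
              "copying may throw");
// Moving a std::string is noexcept, so move-based overloads remain usable.
static_assert(std::is_nothrow_move_constructible<MyEvent>::value,
              "moving cannot throw");
```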

    opened by baylf2000 1
  • Modify some array index operations


    Hi, Erik: It's a great honor to see the lock-free project you released on GitHub. The algorithm is very elegant, and I've learned a lot; thank you. In the code of the MPMC queue, I found two points in the handling of the array index and the turn count that can further improve performance. I was inspired by the kfifo queue in the Linux kernel code. A brief description follows:

    // first point:  
    constexpr size_t idx(size_t i) const noexcept { 
        return i % capacity_;
    }
    
    // second point:
    constexpr size_t turn(size_t i) const noexcept {
        return i / capacity_;
    }
    

    Every time you access the array index, you need a '%' operation. If the capacity of the queue is an integer power of 2, a '&' operation can be used instead to locate the array position, which is faster. Like this:

    constexpr size_t idx(size_t i) const noexcept {
        return i & (capacity_ - 1);
    }
    /*
    	if capacity is 8, i is 20 sometimes. Expressed in binary:
    	---------------------
    	 8:   0 0 0 0  1 0 0 0
       8-1:   0 0 0 0  0 1 1 1
    	20:   0 0 0 1  0 1 0 0
    	----------------------
    	that, 20 & ( 8 - 1) is 4, same as 20 % 8. 
    	      0 0 0 0  0 1 0 0  
    */
    

    The handling of the turn count is similar: each time the turn is calculated, a division is needed. If the capacity keeps the above property, a bit shift can replace the division. The process is as follows:

    // capacity_ is an integer power of 2; precompute its logarithm:
    size_t power_of_capacity_ = ::log2(capacity_); 
    constexpr size_t turn(size_t i) const noexcept {
        return  i >> power_of_capacity_;
    }
    /*
    	if capacity is 8, i is 35 sometimes. Expressed in binary:
    	---------------------
    	 8:   0 0 0 0  1 0 0 0  (power_of_capacity: 3)
    	35:   0 0 1 0  0 0 1 1
    	----------------------
    	that: 35 >> 3 is 4, same as 35 / 8. 
    	      0 0 0 0  0 1 0 0
    */
    

    The above two points are what I want to express. At the same time, I have carefully modified your code ^_^. I tried to remove the capacity variable and replace it with its mask value, and made some modifications to the constructor: if the passed capacity parameter is not an integer power of 2, I round it up. This means the capacity assignment moves out of the member initializer list, and the const attribute of the capacity member is removed.

    These changes have one impact: the user's capacity parameter is likely not the actual amount of memory the queue allocates. Please evaluate.

    I have passed the test program under the src directory with the above modifications, which may need to be improved in other aspects.

    These are some of my considerations. I hope to see your guidance and reply.

    PS: The Linux kernel version I read is linux-4.4.293. Here is the location of the kfifo code, a lightweight SPSC queue using memory barriers.

    kfifo.h:    linux-4.4.293\include\linux\kfifo.h
    kfifo.c:    linux-4.4.293\lib\kfifo.c
    
    opened by Yankefei 0