A bounded single-producer single-consumer wait-free and lock-free queue written in C++11

Overview

SPSCQueue.h

A single producer single consumer wait-free and lock-free fixed size queue written in C++11.

Example

SPSCQueue<int> q(2);
auto t = std::thread([&] {
  while (!q.front());
  std::cout << *q.front() << std::endl;
  q.pop();
});
q.push(1);
t.join();

See src/SPSCQueueExample.cpp for the full example.

Usage

  • SPSCQueue<T>(size_t capacity);

    Create a SPSCQueue holding items of type T with capacity capacity. The capacity needs to be at least 2.

  • void emplace(Args &&... args);

    Enqueue an item using in-place construction. Blocks if the queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using in-place construction. Returns true on success and false if the queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Returns true on success and false if the queue is full. Participates in overload resolution only if std::is_constructible<T, P&&>::value == true.

  • T *front();

    Return pointer to front of queue. Returns nullptr if queue is empty.

  • void pop();

    Dequeue the first element of the queue. Invalid to call if the queue is empty. Requires std::is_nothrow_destructible<T>::value == true.

Only a single writer thread can perform enqueue operations and only a single reader thread can perform dequeue operations. Any other usage is invalid.

Huge page support

In addition to supporting custom allocation through the standard allocator interface, this library also supports the standard proposal P0401R3 "Providing size feedback in the Allocator interface". This allows convenient use of huge pages without wasting any allocated space. Using size feedback is only supported when C++17 is enabled.

The library currently doesn't include a huge page allocator since the APIs for allocating huge pages are platform dependent and handling of huge page size and NUMA awareness is application specific.

Below is an example huge page allocator for Linux:

#include <sys/mman.h>

template <typename T> struct Allocator {
  using value_type = T;

  struct AllocationResult {
    T *ptr;
    size_t count;
  };

  // Round the allocation up to a multiple of the 2 MiB huge page size.
  size_t roundup(size_t n) { return (((n - 1) >> 21) + 1) << 21; }

  AllocationResult allocate_at_least(size_t n) {
    size_t count = roundup(sizeof(T) * n);
    auto p = static_cast<T *>(mmap(nullptr, count, PROT_READ | PROT_WRITE,
                                   MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
                                   -1, 0));
    if (p == MAP_FAILED) {
      throw std::bad_alloc();
    }
    return {p, count / sizeof(T)};
  }

  void deallocate(T *p, size_t n) { munmap(p, roundup(sizeof(T) * n)); }
};

See src/SPSCQueueExampleHugepages.cpp for the full example on how to use huge pages on Linux.

Implementation

Memory layout

The underlying implementation is a ring buffer.

Care has been taken to avoid any issues with false sharing. The head and tail pointers are aligned and padded to the false sharing range (cache line size). The slots buffer is padded with the false sharing range at its beginning and end.
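As a rough illustration of this layout, the producer and consumer indices can each be placed on their own cache line. This is a simplified sketch, not the library's actual code: the RingSketch name, the hard-coded 64-byte cache line size, and the modulo indexing are assumptions made for the example.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

template <typename T> class RingSketch {
  static constexpr size_t kCacheLineSize = 64; // assumed false sharing range

public:
  // One extra slot distinguishes the full state from the empty state.
  explicit RingSketch(size_t capacity)
      : capacity_(capacity + 1), slots_(capacity + 1) {}

  bool try_push(const T &v) {
    size_t tail = tail_.load(std::memory_order_relaxed);
    size_t next = (tail + 1) % capacity_;
    if (next == head_.load(std::memory_order_acquire))
      return false; // full
    slots_[tail] = v;
    tail_.store(next, std::memory_order_release);
    return true;
  }

  bool try_pop(T &v) {
    size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire))
      return false; // empty
    v = slots_[head];
    head_.store((head + 1) % capacity_, std::memory_order_release);
    return true;
  }

private:
  const size_t capacity_;
  std::vector<T> slots_;
  // Each index lives on its own cache line so the consumer's updates to
  // head_ never invalidate the line the producer keeps tail_ on.
  alignas(kCacheLineSize) std::atomic<size_t> head_{0};
  alignas(kCacheLineSize) std::atomic<size_t> tail_{0};
};
```

Without the alignas padding both indices could share one cache line, and every pop would force the producer's core to refetch the line holding tail_.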

Testing

Testing lock-free algorithms is hard. I'm using two approaches to test the implementation:

  • A single-threaded test verifying that the functionality works as intended, including that element constructors and destructors are invoked correctly.
  • A multithreaded fuzz test verifying that all elements are enqueued and dequeued correctly under heavy contention.

Benchmarks

Throughput benchmark measures throughput between 2 threads for a SPSCQueue<int> of size 256.

Latency benchmark measures round trip time between 2 threads communicating using 2 queues of type SPSCQueue<int>.

The following numbers are for a 2 socket machine with 2 x Intel(R) Xeon(R) CPU E5-2620 0 @ 2.00GHz.

| NUMA Node / Core / Hyper-Thread | Throughput (ops/ms) | Latency RTT (ns) |
| ------------------------------- | ------------------: | ---------------: |
| #0,#0,#0 & #0,#0,#1             | 63942               | 60               |
| #0,#0,#0 & #0,#1,#0             | 37739               | 238              |
| #0,#0,#0 & #1,#0,#0             | 25744               | 768              |

Cited by

SPSCQueue has been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

About

This project was created by Erik Rigtorp <[email protected]>.

Comments
  • add T * back() and pop_back actions

    at the moment, front() and pop only modify the front of the queue

    eg

    add 1 add 2 [1,2] front() == 1 pop [2,0] push 1 [2,1]

    but I need to be able to modify the back of the queue instead

    add 1 add 2 [1,2] back() == 2 pop_back [1,0] push 2 [1,2]

    opened by mgood7123 6
  • Add try_pop() = front() + pop() operation

    ...as found in MPMCQueue. This is a very common operation. Invoking front() and pop() subsequently would be inefficient.

    Hi, Erik. If you agree I would like to propose to use your implementation for some use cases in Mixxx. Maintaining our own custom SPSC queue implementation doesn't make any sense and the C ring buffer from PortAudio that we also use is error-prone and dangerous for non-POD types.

    Do you have any plans to align the API of both SPSCQueue and MPMCQueue?

    opened by uklotzde 5
  • Error with __cpp_lib_hardware_interference_size on MacOS llvm-g++ 12.x

    The check for __cpp_lib_hardware_interference_size in include/rigtorp/SPSCQueue.h seems to incorrectly indicate this feature is available in MacOS llvm-g++ but it's not really implemented.

    Not sure how this should be checked correctly, but for example following post suggests instead

    #if __cpp_lib_hardware_interference_size >= 201603

    This was seen on MacOS 10.15 with xcode CLI tools 12.1 and llvm-g++ version clang-1200.0.32.21

    opened by hile 3
  • The alignment of Slot?

    I think the alignment of Slot can avoid false sharing between items, like how we do in MPMCQueue. Should we also use an alignment of Slot in an SPSCQueue?

    opened by constroy 2
  • optimize front() for burst of data

    currently, front() always does head_.load(std::memory_order_acquire). I think it can load(std::memory_order_relaxed) first, then try to load(std::memory_order_acquire). This will help when the producer has a lot of data coming in at the same time.

    Facebook folly queue has a pull request for this. https://github.com/facebook/folly/pull/1133. Not sure why it was not accepted into master.

    opened by azuresigma 1
  • Provide a way to let consumer know push failed

    The use case is, if consumer is slow to keep up with incoming events in the queue, consumer can know about it and decide to quit, instead of making bad decisions based on stale information.

    One easy way I can think of, is to have a failedPushCount that can be checked by user. User can decide to continue if failedPushCount(number of dropped messages) is small and stop if the number exceeds a pre-defined threshold.

    It is up to the main implementation. The most important feature I am looking for is to notify consumer if push fails.

    Thanks.

    opened by azuresigma 1
  • capacity

    Example: for a 'capacity' of 10 provided to the ctor, only 9 items can be pushed to the queue. I understand this is not a bug, but at least the documentation should state so. The best would have been: for a capacity of X specified by the client, use X - 1 as real capacity. But it's too late to change. This would also have made it clearer why the capacity must be at least 2.

    opened by Philippe91 1
  • Add peek & clear functions

    I wasn't sure if the changes I made are working correctly, so I added the instructions I used to the README as well - feel free to correct if I am mistaken :)

    opened by xeruf 0
  • Benchmarking against other queues ?

    such as https://github.com/cameron314/readerwriterqueue ; it would be interesting to have a complete set of benchmarks for SPSC queues like https://max0x7ba.github.io/atomic_queue/html/benchmarks.html does for MPMC queues

    opened by jcelerier 0
  • Assertion failed

    Hi!

    I've been using SPSCQueue for some time and just updated to the latest version and now hit this:

    SPSCQueue.hpp:188: void Rigtorp::SPSCQueue<T, Allocator>::pop() [with T = PacBio::CCS::Chunk; Allocator = std::allocator<PacBio::CCS::Chunk>]: Assertion `writeIdx_.load(std::memory_order_acquire) != readIdx' failed.
    

    Any advice? Thank you!

    opened by armintoepfer 3
  • replace (std::memory_order_release, std::memory_order_acquire) with (std::memory_order_relaxed)?

      readIdx_.store(nextReadIdx, std::memory_order_release);

      while (nextWriteIdx == readIdxCache_) {
        readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      }

    replace (std::memory_order_release, std::memory_order_acquire) with (std::memory_order_relaxed)?

    opened by lglgdouble 0
  • mmap IPC allocator example

    Hi there,

    I'm trying to use your awesome queue to do inter-process shared memory communication on linux.

    I think it can be done with some modification to the huge page allocator example, but I couldn't come up with a correct implementation.

    If IPC is possible, could you please add an IPC demo to the example? Many thanks.

    opened by PengyiPan 0
  • Small things

    First of all, kudos to your cache-aware indices. Though the underlying logic is not obvious (!), it does work and improves performance.

    Two small things, not new to version 1.1:

    1. The line char padding_[kCacheLineSize - sizeof(writeIdxCache_)] has no effect, as the overall structure already has a quantized size because of the alignas(kCacheLineSize) members.

    2. The way you compute kPadding is not optimal: if sizeof(T) == kCacheLineSize, then kPadding is 1, while 0 would be better.

    static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;

    opened by Philippe91 3
Releases (v1.1)
  • v1.1(Jul 22, 2021)

    This release significantly increases throughput by adding head and tail caches.

    Benchmark results for an AMD Ryzen 9 3900X 12-Core Processor; the 2 threads are running on different cores on the same chiplet:

    | Queue                        | Throughput (ops/ms) | Latency RTT (ns) |
    | ---------------------------- | ------------------: | ---------------: |
    | SPSCQueue                    | 362723              | 133              |
    | boost::lockfree::spsc        | 209877              | 222              |
    | folly::ProducerConsumerQueue | 148818              | 147              |
