C++11 thread safe, multi-producer, multi-consumer blocking queue, stack & priority queue class

Overview

BlockingCollection

BlockingCollection is a C++11 thread safe collection class that provides the following features:

  • Modeled after .NET BlockingCollection class.
  • Implementation of classic Producer/Consumer pattern (i.e. condition variable, mutex);
  • Adding and taking of items from multiple threads.
  • Optional maximum capacity.
  • Insertion and removal operations that block when collection is empty or full.
  • Insertion and removal "try" operations that do not block or that block up to a specified period of time.
  • Insertion and removal 'bulk" operations that allow more than one element to be added or taken at once.
  • Priority-based insertion and removal operations.
  • Encapsulates any collection type that satisfy the ProducerConsumerCollection requirement.
  • Minimizes sleeps, wake ups and lock contention by managing an active subset of producer and consumer threads.
  • Pluggable condition variable and lock types.
  • Range-based loop support.

Bounding and Blocking Support

BlockingCollection supports bounding and blocking. Bounding means that you can set the maximum capacity of the collection. Bounding is important in certain scenarios because it enables you to control the maximum size of the collection in memory, and it prevents the producing threads from moving too far ahead of the consuming threads.

Multiple threads or tasks can add elements to the collection, and if the collection reaches its specified maximum capacity, the producing threads will block until an element is removed. Multiple consumers can remove elements, and if the collection becomes empty, the consuming threads will block until a producer adds an item. A producing thread can call the complete_adding method to indicate that no more elements will be added. Consumers monitor the is_completed property to know when the collection is empty and no more elements will be added. The following example shows a simple BlockingCollection with a bounded capacity of 100. A producer task adds items to the collection as long as some external condition is true, and then calls complete_adding. The consumer task takes items until the is_completed property is true.

// A bounded collection. It can hold no more 
// than 100 items at once.
BlockingCollection<Data*> collection(100);

// a simple blocking consumer
std::thread consumer_thread([&collection]() {

  while (!collection.is_completed())
  {
      Data* data;
      
      // take will block if there is no data to be taken
      auto status = collection.take(data);
      
      if(status == BlockingCollectionStatus::Ok)
      {
          process(data);
      }
      
      // Status can also return BlockingCollectionStatus::Completed meaning take was called 
      // on a completed collection. Some other thread can call complete_adding after we pass 
      // the is_completed check but before we call take. In this example, we can ignore that
      // status since the loop will break on the next iteration.
  }
});

// a simple blocking producer
std::thread producer_thread([&collection]() {

  while (moreItemsToAdd)
  {
      Data* data = GetData(data);
      
      // blocks if collection.size() == collection.bounded_capacity()
      collection.add(data);
  }
  
  // let consumer know we are done
  collection.complete_adding();
});

Timed Blocking Operations

In timed blocking try_add and try_take operations on bounded collections, the method tries to add or take an item. If an item is available it is placed into the variable that was passed in by reference, and the method returns Ok. If no item is retrieved after a specified time-out period the method returns TimedOut. The thread is then free to do some other useful work before trying again to access the collection.

  BlockingCollection<Data*> collection(100);
  
  Data *data;
  
  // if the collection is empty this method returns immediately
  auto status = collection.try_take(data);
  
  // if the collection is still empty after 1 sec this method returns immediately
  status = collection.try_take(data, std::chrono::milliseconds(1000));
  
  // in both case status will return BlockingCollectionStatus::TimedOut if 
  // try_take times out waiting for data to become available

Bulk Operations

BlockingCollection's add and take operations are all thread safe. But it accomplishes this by using a mutex. To minimize mutex contention when adding or taking elements BlockingCollection supports bulk operations. It is usually much cheaper to acquire the mutex and then to add or take a whole batch of elements in one go, than it is to acquire and release the mutex for every add and take.

BlockingCollection<Data*> collection(100);

std::array<Data*, 20> arr;

size_t taken;

auto status = collection.try_take_bulk(arr, arr.size(), taken);

// try_take_bulk will update taken with actual number of items taken

Specifying the Collection Type

When you create a BlockingCollection object, you can specify not only the bounded capacity but also the type of collection to use. For example, you could specify a QueueContainer<T> object for first in, first out (FIFO) behavior, or a StackContainer<T> object for last in, first out (LIFO) behavior. You can use any collection class that supports the ProducerConsumerCollection requirement. The default collection type for BlockingCollection is QueueContainer<T>. The following code example shows how to create a BlockingCollection of strings that has a capacity of 1000 and uses a StackContainer<T>

BlockingCollection<std::string, StackContainer<std::string>> stack(1000);

Type aliases are also available:

BlockingQueue<std::string> blocking_queue;
BlockingStack<std::string> blocking_stack;

Priority based Insertion and Removal

PriorityBlockingCollection offers the same functionality found in BlockingCollection<T, PriorityQueue>. But the add/try_add methods add items to the collection based on their priority - (0 is lowest priority).

FIFO order is maintained when items of the same priority are added consecutively. And the take/try_take methods return the highest priority items in FIFO order.

In addition, PriorityBlockingCollection adds additional methods (i.e. take_prio/try_take_prio) for taking the lowest priority items.

PriorityBlockingCollection's default priority comparer expects that the objects being compared have overloaded < and > operators. If this is not the case then you can provide your own comparer implementation like in the following example.

struct PriorityItem {
  PriorityItem(int priority) : Priority(priority) 
  {}
  
  int Priority;
};

class CustomComparer {
public:
  CustomComparer() {
  }

  int operator() (const PriorityItem &item, const PriorityItem &new_item) {
    if (item.Priority < new_item.Priority)
      return -1;
    else if (item.Priority > new_item.Priority)
      return 1;
    else
      return 0;
  }
};

using CustomPriorityContainer = PriorityContainer<PriorityItem, CustomComparer>;

PriorityBlockingCollection<PriorityItem, CustomPriorityContainer> collection;

Range-based for loop Support

BlockingCollection provides an iterator that enables consumers to use for(auto item : collection) { ... }to remove items until the collection is completed, which means it is empty and no more items will be added. For more information, see

BlockingCollection<Data> collection(100);

// a simple blocking consumer using range-base loop
std::thread consumer_thread([&collection]() {
        
    for(auto data : collection) {
        process(data);
    }    
});

ProducerConsumerCollection Requirement

In order for a container to be used with the BlockingCollection it must meet the ProducerConsumerCollection requirement. The ProducerConsumerCollection requires that all the following method signatures must be supported:

  • size_type size()
  • bool try_add(const value_type& element)
  • bool try_add(value_type&& element)
  • bool try_take(value_type& element)
  • template <typename... Args> bool try_emplace(Args&&... args)

BlockingCollection currently supports three containers:

  • QueueContainer
  • StackContainer
  • PriorityContainer

Performance Optimizations

BlockingCollection can behave like most condition variable based collections. That is, it will by default issue a signal each time a element is added or taken from its underlying Container. But this approach leads to poor application scaling and performance.

So in the interest of performance BlockingCollection can be configured to maintain a subset of active threads that are currently adding and taking elements. This is important because it allows BlockingCollection not to have to issue a signal each time an element is added or taken. Instead in the case of consumers it issues a signal only when an element is taken and there are no active consumers, or when the Container's element count starts to grow beyond a threshold level. And in the case of producers, BlockingCollection will issue a signal only when an element is added and there are no active producers or when the Container's available capacity starts to grow beyond a threshold level.

In both cases, this approach greatly improves performance and makes it more predictable.

Two strategy classes are responsible for implementing the behavior just described.

  1. NotEmptySignalStrategy

    • implements conditions under which a "not empty" condition variable should issue a signal
  2. NotFullSignalStrategy

    • implements conditions under which a "not full" condition variable should issue a signal

NotEmptySignalStrategy

This strategy will return true under two conditions.

  1. All consumers are currently not active (i.e. waiting)
  2. Number of active consumers < total consumers AND number of item in collection per active consumer is > threshold value
template<size_t ItemsPerThread = 16> struct NotEmptySignalStrategy {

    bool should_signal(size_t active_workers, size_t total_workers, size_t item_count, size_t /*capacity*/) const {
        return active_workers == 0 || (active_workers < total_workers && item_count / active_workers > ItemsPerThread);
    }
};

NotFullSignalStrategy

This strategy will return true under two conditions.

  1. All producers are currently not active (i.e. waiting)
  2. Number of active producers < total producers AND current available capacity per active producer is > threshold value
template<size_t ItemsPerThread = 16> struct NotFullSignalStrategy {

    bool should_signal(size_t active_workers, size_t total_workers, size_t item_count, size_t capacity) const {
        return (active_workers == 0 || (active_workers < total_workers && (capacity - item_count) / active_workers > ItemsPerThread));
    }
};

Note that in both strategies the threshold value (i.e. ItemsPerThread) can be specified. And that completely new strategies can be used for both "no empty" and "not full" use cases by creating a new strategy that implements the required method signature.

bool should_signal(size_t active_workers, size_t total_workers, size_t item_count, size_t capacity)

Attaching/Detaching Consumers & Producers

In order for BlockingCollection to maintain a subset of active threads it exposes attach_producer and attach_consumer member function. The calling thread can call either of those functions to attach itself to the BlockingCollection as either a producer or consumer respectively. Note that the thread should remember to detach itself in both cases.

BlockingCollection<Data> collection(100);
  
std::thread consumer_thread([&collection]() {

  collection.attach_consumer();
  
  int item;
  
  for(int i = 0; i < 10; i++)
    collection.take(item);
    
  collection.detach_consumer();
});

Consumer & Producer Guards

In order to mitigate forgetting to attach or detach from a BlockingCollection the BlockingCollection Guard classes (i.e. ProducerGuard and ConsumerGuard) can be used for this purpose. Both Guard classes are a RAII-style mechanism for attaching a thread to the BlockingCollection and detaching it when the thread terminates. As well as in exception scenarios.

In the following examples, ConsumerGuard and ProducerGuard automatically attach and detach the std::threads to the BlockingCollection.

BlockingCollection<int> collection;
  
std::thread consumer_thread([&collection]() {

  ConsumerGuard<BlockingCollection<int>> Guard(collection);
  
  int item;
  
  for(int i = 0; i < 10; i++)
    collection.take(item);
});
std::thread producer_thread([&collection]() {

  ProducerGuard<BlockingCollection<int>> Guard(collection);

  for(int i = 0; i < 10; i++)
    collection.add(i+1);
});

Pluggable Condition Variable and Lock Types

The BlockingCollection class by default will use std::condition_variable and std::mutex classes. But those two synchronization primitives can be overridden by specializing ConditionVarTraits.

template <typename ConditionVarType,  typename LockType>
struct ConditionVarTraits;

template <>
struct ConditionVarTraits<std::condition_variable, std::mutex>
{
    static void initialize(std::condition_variable& cond_var) {
    }

    static void signal(std::condition_variable& cond_var) {
        cond_var.notify_one();
    }

    static void broadcast(std::condition_variable& cond_var) {
        cond_var.notify_all();
    }

    static void wait(std::condition_variable& cond_var, std::unique_lock<std::mutex>& lock) {
        cond_var.wait(lock);
    }

    template<class Rep, class Period> static bool wait_for(std::condition_variable& cond_var, std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& rel_time) {
        return std::cv_status::timeout == cond_var.wait_for(lock, rel_time);
    }
};

In the following example, ConditionVarTraits is specialized to use Win32 CONDITION_VARIABLE and SRW_LOCK.

Note that Win32's SRWLOCK synchronization primitive requires a wrapper class so that it can meet the BasicLockable requirements needed by std::unique_lock.

class WIN32_SRWLOCK {
public:
    WIN32_SRWLOCK() {
        InitializeSRWLock(&srw_);
    }

    void lock() {
        AcquireSRWLockExclusive(&srw_);
    }

    void unlock() {
        ReleaseSRWLockExclusive(&srw_);
    }

    SRWLOCK& native_handle() {
        return srw_;
    }
private:
    SRWLOCK srw_;
};

template <>
struct ConditionVarTraits<CONDITION_VARIABLE, WIN32_SRWLOCK>
{
    static void initialize(CONDITION_VARIABLE& cond_var) {
      InitializeConditionVariable(&cond_var);
    }

    static void signal(CONDITION_VARIABLE& cond_var) {
      WakeConditionVariable(&cond_var);
    }

    static void broadcast(CONDITION_VARIABLE& cond_var) {
      WakeAllConditionVariable(&cond_var);
    }

    static void wait(CONDITION_VARIABLE& cond_var, std::unique_lock<WIN32_SRWLOCK>& lock) {
      SleepConditionVariableSRW(&cond_var, &lock.mutex()->native_handle(), INFINITE, 0);
    }

    template<class Rep, class Period> static bool wait_for(CONDITION_VARIABLE& cond_var, std::unique_lock<WIN32_SRWLOCK>& lock, const std::chrono::duration<Rep, Period>& rel_time) {
    
      DWORD milliseconds = static_cast<DWORD>(rel_time.count());

      if (!SleepConditionVariableSRW(&cond_var, &lock.mutex()->native_handle(), milliseconds, 0)) {
        if (GetLastError() == ERROR_TIMEOUT)
          return true;
      }
      return false;
    }
};

Condition Variable Generator

The BlockingCollection class uses the ConditionVariableGenerator template to generate the condition variable and lock types it will use. In addition, the template also generates the strategy classes.

template<typename ThreadContainerType, typename NotFullSignalStrategy, typename NotEmptySignalStrategy, typename ConditionVarType, typename LockType> 
struct ConditionVariableGenerator {

    using NotFullType = ConditionVariable<ThreadContainerType, NotFullSignalStrategy, ConditionVarType, LockType>;
    using NotEmptyType = ConditionVariable<ThreadContainerType, NotEmptySignalStrategy, ConditionVarType, LockType>;

    using lock_type = LockType;
};

By default, the BlockingCollection class will use the following ConditionVariableGenerator type alias.

using StdConditionVariableGenerator = ConditionVariableGenerator<ThreadContainer<std::thread::id>, NotFullSignalStrategy<16>, NotEmptySignalStrategy<16>, std::condition_variable, std::mutex>;

But it can easily be replaced by something else such as the following.

using Win32ConditionVariableGenerator = ConditionVariableGenerator<ThreadContainer<std::thread::id>, NotFullSignalStrategy<16>, NotEmptySignalStrategy<16>, CONDITION_VARIABLE, WIN32_SRWLOCK>;

A custom condition variable generator can be used like so:

BlockingCollection<int, QueueContainer<int>, Win32ConditionVariableGenerator> collection;

References

  1. ^ a b c Microsoft Docs, "BlockingCollection Overview" link

BlockingCollection implements optimizations described in the following paper:

[2007] Hewlett Packard Development Company, L.P "Techniques for Improving the Scalability of Applications Using POSIX Thread Condition Variables" pdf

License

BlockingCollection uses the GPLv3 license that is available here.

Issues
Owner
Code Ex Machina LLC
Code Ex Machina LLC
A bounded multi-producer multi-consumer concurrent queue written in C++11

MPMCQueue.h A bounded multi-producer multi-consumer concurrent queue written in C++11. It's battle hardened and used daily in production: In the Frost

Erik Rigtorp 719 Jun 19, 2022
Thread pool - Thread pool using std::* primitives from C++17, with optional priority queue/greenthreading for POSIX.

thread_pool Thread pool using std::* primitives from C++11. Also includes a class for a priority thread pool. Requires concepts and C++17, including c

Tyler Hardin 74 Jun 15, 2022
A fast single-producer, single-consumer lock-free queue for C++

A single-producer, single-consumer lock-free queue for C++ This mini-repository has my very own implementation of a lock-free queue (that I designed f

Cameron 2.7k Jun 23, 2022
A bounded single-producer single-consumer wait-free and lock-free queue written in C++11

SPSCQueue.h A single producer single consumer wait-free and lock-free fixed size queue written in C++11. Example SPSCQueue<int> q(2); auto t = std::th

Erik Rigtorp 499 Jun 21, 2022
Concurrency Kit 2.1k Jun 18, 2022
Simple and fast C library implementing a thread-safe API to manage hash-tables, linked lists, lock-free ring buffers and queues

libhl C library implementing a set of APIs to efficiently manage some basic data structures such as : hashtables, linked lists, queues, trees, ringbuf

Andrea Guzzo 381 Jun 15, 2022
Thread-pool - Thread pool implementation using c++11 threads

Table of Contents Introduction Build instructions Thread pool Queue Submit function Thread worker Usage example Use case#1 Use case#2 Use case#3 Futur

Mariano Trebino 595 Jun 26, 2022
Thread-pool-cpp - High performance C++11 thread pool

thread-pool-cpp It is highly scalable and fast. It is header only. No external dependencies, only standard library needed. It implements both work-ste

Andrey Kubarkov 531 May 27, 2022
Sqrt OS is a simulation of an OS scheduler and memory manager using different scheduling algorithms including Highest Priority First (non-preemptive), Shortest Remaining Time Next, and Round Robin

A CPU scheduler determines an order for the execution of its scheduled processes; it decides which process will run according to a certain data structure that keeps track of the processes in the system and their status.

null 11 Jul 14, 2021
Vgpu unlock - Unlock vGPU functionality for consumer grade GPUs.

vgpu_unlock Unlock vGPU functionality for consumer-grade Nvidia GPUs. Important! This tool is not guarenteed to work out of the box in some cases, so

Jonathan Johansson 3.2k Jun 20, 2022
Code from https://queue.acm.org/detail.cfm?id=3448307 unzipped

Copyright (C) 2020-2021 Terence Kelly. All rights reserved. Author contact: [email protected], [email protected], [email protected] Adde

Breck Yunits 21 May 30, 2021
Smart queue that executes tasks in threadpool-like manner

execq execq is kind of task-based approach of processing data using threadpool idea with extended features. It supports different task sources and mai

Vladimir (Alkenso) 32 May 24, 2022
This is a study on how to do create a queue via IPC (inter-process communication)

IPC queue This is a study on how to do create a queue via IPC (inter-process communication). Please take a look at the examples of producer and consum

Tarcísio Zotelli Ferraz 2 Jan 10, 2022
A hybrid thread / fiber task scheduler written in C++ 11

Marl Marl is a hybrid thread / fiber task scheduler written in C++ 11. About Marl is a C++ 11 library that provides a fluent interface for running tas

Google 1.4k Jun 24, 2022
A easy to use multithreading thread pool library for C. It is a handy stream like job scheduler with an automatic garbage collector. This is a multithreaded job scheduler for non I/O bound computation.

A easy to use multithreading thread pool library for C. It is a handy stream-like job scheduler with an automatic garbage collector for non I/O bound computation.

Hyoung Min Suh 12 Jun 4, 2022
A C++17 thread pool for high-performance scientific computing.

We present a modern C++17-compatible thread pool implementation, built from scratch with high-performance scientific computing in mind. The thread pool is implemented as a single lightweight and self-contained class, and does not have any dependencies other than the C++17 standard library, thus allowing a great degree of portability

Barak Shoshany 743 Jun 18, 2022
An easy to use C++ Thread Pool

mvThreadPool (This library is available under a free and permissive license) mvThreadPool is a simple to use header only C++ threadpool based on work

Jonathan Hoffstadt 30 Jun 7, 2022
Fork of rpmalloc to be used with single thread applications and old C compilers

srpmalloc - Small rpmalloc This is a fork of rpmalloc, with the intent to be used in single threaded applications only, with old C99 compilers, and in

Eduardo Bart 6 Apr 30, 2022
An ultra-simple thread pool implementation for running void() functions in multiple worker threads

void_thread_pool.cpp © 2021 Dr Sebastien Sikora. [email protected] Updated 06/11/2021. What is it? void_thread_pool.cpp is an ultra-simple

Seb Sikora 1 Nov 19, 2021