Smart queue that executes tasks in a threadpool-like manner

Overview

execq

execq is a task-based approach to processing data that builds on the threadpool idea with extended features. It supports different task sources and executes tasks in parallel on N threads (according to hardware concurrency).

  • task providers are queues and streams that allow tasks to be executed in different ways
  • queues let you process an object by just 'putting it into the queue'
  • supports serial and concurrent queues
  • work as you like: submitting an object to a queue returns a std::future that (unlike std::async) does not block on destruction and can be used to get the result once the object has been processed
  • maintains an optimal thread count to avoid excess CPU thread context switches
  • runs tasks from different queues/streams 'by turn', avoiding starvation of tasks that were added very late
  • designed to process multiple non-blocking tasks (generally you do not want to sleep/wait inside a task-processing function)
  • C++11 compliant

Queues and Streams

execq deals with concurrent task execution using two approaches: queue-based and stream-based.

You are free to use multiple queues and streams, and any combination of them!

1.1 Queue-based approach

Designed to process objects in a 'push-and-forget' manner. Objects pushed into the queues are processed as soon as any thread is ready to handle them.

ExecutionQueue combines a usual queue, the synchronization mechanisms around it, and execution inside a threadpool.

Internally, ExecutionQueue tracks the tasks being executed. When destroyed, the queue marks all running and pending tasks as 'canceled'. Even if a task is canceled before execution, it is not discarded: it is still called on its turn, but with 'isCanceled' == true.
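For illustration, a minimal sketch of the cancellation behavior (using the same API as the examples below; the processing function is a placeholder):

#include <execq/execq.h>

#include <iostream>

// Pending tasks are not discarded when the queue is destroyed:
// each one is still invoked, but with isCanceled == true.
void ProcessNumber(const std::atomic_bool& isCanceled, int&& number)
{
    if (isCanceled)
    {
        std::cout << "Canceled, releasing object: " << number << '\n';
        return;
    }
    std::cout << "Processing object: " << number << '\n';
}

int main()
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    {
        auto queue = execq::CreateConcurrentExecutionQueue<void, int>(pool, &ProcessNumber);
        for (int i = 0; i < 1000; i++)
            queue->push(i);
    } // queue destroyed here: tasks still pending run with isCanceled == true
    
    return 0;
}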

ExecutionQueue can be:

  • concurrent: processes objects in parallel on multiple threads // CreateConcurrentExecutionQueue
  • serial: processes objects strictly in 'one-after-another' order. You can be sure that no two tasks are executed simultaneously // CreateSerialExecutionQueue

execq lets you create an 'IExecutionQueue' instance (either serial or concurrent) to process your objects on a specific IExecutionPool.

IExecutionPool is a kind of opaque threadpool. The same IExecutionPool object is usually shared by multiple queues and streams.

There is no need to write your own queue and the synchronization around it - it is all done inside!

#include <execq/execq.h>

#include <iostream>

// The function is called in parallel on the next free thread
// with the next object from the queue.
void ProcessObject(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (isCanceled)
    {
        std::cout << "Queue has been canceled. Skipping object...";
        return;
    }
    
    std::cout << "Processing object: " << object << '\n';
}

int main(void)
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    
    std::unique_ptr<execq::IExecutionQueue<void(std::string)>> queue = execq::CreateConcurrentExecutionQueue<void, std::string>(pool, &ProcessObject);
    
    queue->push("qwe");
    queue->push("some string");
    queue->push("");
    
    // when destroyed, queue waits until all tasks are executed
    
    return 0;
}
Standalone serial queue

Sometimes you may need just a single-threaded queue implementation to process things in the right order. For this purpose, you can create a pool-independent serial queue.

#include <execq/execq.h>

#include <iostream>

// The function is called serially, one object at a time,
// with the next object from the queue.
void ProcessObjectOneByOne(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (isCanceled)
    {
        std::cout << "Queue has been canceled. Skipping object...";
        return;
    }
    
    std::cout << "Processing object: " << object << '\n';
}

int main(void)
{
    std::unique_ptr<execq::IExecutionQueue<void(std::string)>> queue = execq::CreateSerialExecutionQueue<void, std::string>(&ProcessObjectOneByOne);
    
    queue->push("qwe");
    queue->push("some string");
    queue->push("");
    
    // when destroyed, queue waits until all tasks are executed
    
    return 0;
}

1.2 Queue-based approach: future inside!

When an object is pushed into any ExecutionQueue, a std::future is returned. The future object is bound to the pushed object and refers to the result of its processing. Note: the returned std::future objects can simply be discarded; they do not block in the std::future destructor.

#include <execq/execq.h>

#include <future>
#include <iostream>

// The function is called in parallel on the next free thread
// with the next object from the queue.
size_t GetStringSize(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (isCanceled)
    {
        std::cout << "Queue has been canceled. Skipping object...";
        return 0;
    }
    
    std::cout << "Processing object: " << object << '\n';
    
    return object.size();
}

int main(void)
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    
    std::unique_ptr<execq::IExecutionQueue<size_t(std::string)>> queue = execq::CreateConcurrentExecutionQueue<size_t, std::string>(pool, &GetStringSize);
    
    std::future<size_t> future1 = queue->push("qwe");
    std::future<size_t> future2 = queue->push("some string");
    std::future<size_t> future3 = queue->push("hello future");
    
    const size_t totalSize = future1.get() + future2.get() + future3.get();

    return 0;
}

execq supports std::future, so you can just wait until the object has been processed.

2. Stream-based approach

Designed to process an uncountable number of tasks as fast as possible, i.e. the next task is processed whenever a thread becomes available.

execq lets you create an 'IExecutionStream' object that executes your code each time a thread in the pool is ready to execute the next task. This approach should be considered the most effective way to process an unlimited (or almost unlimited) number of tasks.

#include <execq/execq.h>

#include <chrono>
#include <iostream>
#include <thread>

// The function is called each time a thread is ready to execute the next task.
// It is called only if the stream is started.
void ProcessNextObject(const std::atomic_bool& isCanceled)
{
    if (isCanceled)
    {
        std::cout << "Stream has been canceled. Skipping...";
        return;
    }
    
    static std::atomic_int s_someObject { 0 };
    
    const int nextObject = s_someObject++;
    
    std::cout << "Processing object: " << nextObject << '\n';
}

int main(void)
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    
    std::unique_ptr<execq::IExecutionStream> stream = execq::CreateExecutionStream(pool, &ProcessNextObject);
    
    stream->start();
    
    // For example purposes only. In a real application, a RunLoop/EventLoop
    // would usually live here (if in 'main').
    // Wait until some objects are processed.
    std::this_thread::sleep_for(std::chrono::seconds(5));
    
    return 0;
}

Design principles & Tech. details

Consider using a single ExecutionPool object (across the whole application) with multiple queues and streams. Combine queues and streams freely to achieve your goals. Feel free to push tasks to a queue or control a stream even from inside its own callback.
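For example, one pool can back several queues at the same time. A minimal sketch (the processing functions are placeholders; the pool-bound serial queue uses the overload described in section 1.1):

#include <execq/execq.h>

#include <iostream>

void ProcessNumber(const std::atomic_bool& isCanceled, int&& number)
{
    if (!isCanceled)
        std::cout << "number: " << number << '\n';
}

void ProcessString(const std::atomic_bool& isCanceled, std::string&& string)
{
    if (!isCanceled)
        std::cout << "string: " << string << '\n';
}

int main()
{
    // Single pool for the whole application...
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    
    // ...shared by multiple queues (streams can share it the same way).
    auto numbers = execq::CreateConcurrentExecutionQueue<void, int>(pool, &ProcessNumber);
    auto strings = execq::CreateSerialExecutionQueue<void, std::string>(pool, &ProcessString);
    
    numbers->push(42);
    strings->push("hello");
    
    return 0;
}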

'by-turn' execution

execq deals with the tasks of queues and streams in a special way to avoid starvation.

Consider a simple example: there are 2 queues. First, 100 objects are pushed to queue #1. After that, 1 object is pushed to queue #2.

Now a few tasks from queue #1 are being executed. But the next task to be executed will be the task from queue #2, and only then the remaining tasks from queue #1.
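A minimal sketch of that scenario (the processing functions are placeholders):

#include <execq/execq.h>

#include <iostream>

void ProcessFirst(const std::atomic_bool& isCanceled, int&& object)
{
    if (!isCanceled)
        std::cout << "queue #1: " << object << '\n';
}

void ProcessSecond(const std::atomic_bool& isCanceled, int&& object)
{
    if (!isCanceled)
        std::cout << "queue #2: " << object << '\n';
}

int main()
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    auto queue1 = execq::CreateConcurrentExecutionQueue<void, int>(pool, &ProcessFirst);
    auto queue2 = execq::CreateConcurrentExecutionQueue<void, int>(pool, &ProcessSecond);
    
    for (int i = 0; i < 100; i++)
        queue1->push(i);  // 100 objects queued first
    queue2->push(0);      // queued last...
    
    // ...but 'by-turn' scheduling executes the queue #2 task on the next
    // free thread, before draining the remaining queue #1 tasks.
    return 0;
}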

Avoiding queue starvation

Some tasks can be very time-consuming, which means they may occupy all pool threads for a long time. This causes starvation: no tasks from other queues are executed until one of the running tasks finishes.

To prevent this, each queue and stream additionally has its own thread. This is a kind of 'insurance' thread, on which tasks from the queue/stream can be executed even if all the pool's threads are busy for a long time.
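A minimal sketch of the effect (the long task sleeps only to simulate heavy computation; remember that blocking inside task-processing functions is discouraged):

#include <execq/execq.h>

#include <chrono>
#include <iostream>
#include <thread>

// Simulates a heavy, long-running computation that occupies a pool thread.
void LongTask(const std::atomic_bool& isCanceled, int&&)
{
    if (isCanceled)
        return;
    std::this_thread::sleep_for(std::chrono::seconds(2));
}

void QuickTask(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (!isCanceled)
        std::cout << "still responsive: " << object << '\n';
}

int main()
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();
    auto slowQueue = execq::CreateConcurrentExecutionQueue<void, int>(pool, &LongTask);
    auto quickQueue = execq::CreateConcurrentExecutionQueue<void, std::string>(pool, &QuickTask);
    
    // Saturate the pool threads with long-running tasks...
    for (int i = 0; i < 16; i++)
        slowQueue->push(i);
    
    // ...yet this task is not stuck behind them: quickQueue's own
    // 'insurance' thread can pick it up while the pool is busy.
    quickQueue->push("hello");
    
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}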

Work to be done

  • Replace the use of std::packaged_task with reference counting

Tests

By default, unit tests are off. To enable them, just add the CMake option -DEXECQ_TESTING_ENABLE=ON.
