A C++17 thread pool for high-performance scientific computing.

Overview


A C++17 Thread Pool for High-Performance Scientific Computing

Barak Shoshany
Department of Physics, Brock University,
1812 Sir Isaac Brock Way, St. Catharines, Ontario, L2S 3A1, Canada
[email protected] | https://baraksh.com/
Companion paper: arXiv:2105.00613
DOI: doi:10.5281/zenodo.4742687

Abstract

We present a modern C++17-compatible thread pool implementation, built from scratch with high-performance scientific computing in mind. The thread pool is implemented as a single lightweight and self-contained class, and does not have any dependencies other than the C++17 standard library, thus allowing a great degree of portability. In particular, our implementation does not utilize OpenMP or any other high-level multithreading APIs, and thus gives the programmer precise low-level control over the details of the parallelization, which permits more robust optimizations. The thread pool was extensively tested on both AMD and Intel CPUs with up to 40 cores and 80 threads. This paper provides motivation, detailed usage instructions, and performance tests. The code is freely available in the GitHub repository. This README.md file contains roughly the same content as the companion paper.

Introduction

Motivation

Multithreading is essential for modern high-performance computing. Since C++11, the C++ standard library has included built-in low-level multithreading support using constructs such as std::thread. However, std::thread creates a new thread each time it is called, which can have a significant performance overhead. Furthermore, it is possible to create more threads than the hardware can handle simultaneously, potentially resulting in a substantial slowdown.

This library contains a thread pool class, thread_pool, which avoids these issues by creating a fixed pool of threads once and for all, and then reusing the same threads to perform different tasks throughout the lifetime of the pool. By default, the number of threads in the pool is equal to the maximum number of threads that the hardware can run in parallel.

The user submits tasks to be executed into a queue. Whenever a thread becomes available, it pops a task from the queue and executes it. Each task is automatically assigned an std::future, which can be used to wait for the task to finish executing and/or obtain its eventual return value.

In addition to std::thread, the C++ standard library also offers the higher-level construct std::async, which may internally utilize a thread pool - but this is not guaranteed, and in fact, currently only the MSVC implementation of std::async uses a thread pool, while GCC and Clang do not. Using our custom-made thread pool class instead of std::async allows the user more control, transparency, and portability.

High-level multithreading APIs, such as OpenMP, allow simple one-line automatic parallelization of C++ code, but they do not give the user precise low-level control over the details of the parallelization. The thread pool class presented here allows the programmer to perform and manage the parallelization at the lowest level, and thus permits more robust optimizations, which can be used to achieve considerably higher performance.

As demonstrated in the performance tests below, using our thread pool class we were able to saturate the upper bound of expected speedup for matrix multiplication and generation of random matrices. These performance tests were performed on 12-core / 24-thread and 40-core / 80-thread systems using GCC on Linux.

Overview of features

  • Fast:
    • Built from scratch with maximum performance in mind.
    • Suitable for use in high-performance computing nodes with a very large number of CPU cores.
    • Compact code, to reduce both compilation time and binary size.
    • Reusing threads avoids the overhead of creating and destroying them for individual tasks.
    • A task queue ensures that there are never more threads running in parallel than allowed by the hardware.
  • Lightweight:
    • Only ~180 lines of code, excluding comments, blank lines, and the two optional helper classes.
    • Single header file: simply #include "thread_pool.hpp".
    • Header-only: no need to install or build the library.
    • Self-contained: no external requirements or dependencies. Does not require OpenMP or any other multithreading APIs. Only uses the C++ standard library, and works with any C++17-compliant compiler.
  • Easy to use:
    • Very simple operation, using a handful of member functions.
    • Every task submitted to the queue automatically generates an std::future, which can be used to wait for the task to finish executing and/or obtain its eventual return value.
    • Optionally, tasks may also be submitted without generating a future, sacrificing convenience for greater performance.
    • The code is thoroughly documented using Doxygen comments - not only the interface, but also the implementation, in case the user would like to make modifications.
    • The included test program thread_pool_test.cpp can be used to perform comprehensive automated tests and benchmarks, and also serves as an extensive example of how to properly use the package.
  • Additional features:
    • Automatically parallelize a loop into any number of parallel tasks.
    • Easily wait for all tasks in the queue to complete.
    • Change the number of threads in the pool safely and on-the-fly as needed.
    • Fine-tune the sleep duration of each thread's worker function for optimal performance.
    • Monitor the number of queued and/or running tasks.
    • Pause and resume popping new tasks out of the queue.
    • Catch exceptions thrown by the submitted tasks.
    • Synchronize output to a stream from multiple threads in parallel using the synced_stream helper class.
    • Easily measure execution time for benchmarking purposes using the timer helper class.
    • Under continuous and active development. Bug reports and feature requests are welcome, and should be made via GitHub issues.

Compiling and compatibility

This library should successfully compile on any C++17 standard-compliant compiler, on all operating systems and architectures for which such a compiler is available. Compatibility was verified with a 12-core / 24-thread AMD Ryzen 9 3900X CPU at 3.8 GHz using the following compilers and platforms:

In addition, this library was tested on a Compute Canada node equipped with two 20-core / 40-thread Intel Xeon Gold 6148 CPUs at 2.4 GHz (for a total of 40 cores and 80 threads), running CentOS Linux 7.6.1810, using the following compilers:

The test program thread_pool_test.cpp was compiled without warnings (with the warning flags -Wall -Wpedantic -Wextra -Wconversion -Weffc++ in GCC/Clang and /W4 in MSVC), executed, and successfully completed all automated tests using all of the compilers and systems mentioned above.

As this library requires C++17 features, the code must be compiled with C++17 support:

  • For GCC, Clang, or ICC, use the -std=c++17 flag. On Linux, you will also need to use the -pthread flag to enable the POSIX threads library.
  • For MSVC, use /std:c++17.

For maximum performance, it is recommended to compile with all available compiler optimizations:

  • For GCC, Clang, or ICC, use the -O3 flag.
  • For MSVC, use /O2.
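
For example, a complete GCC command on Linux, combining the flags above (the file name my_program.cpp is only a placeholder), might look like this:

g++ my_program.cpp -std=c++17 -O3 -pthread -o my_program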

Getting started

Including the library

To use the thread pool library, simply download the latest release from the GitHub repository, place the single header file thread_pool.hpp in the desired folder, and include it in your program:

#include "thread_pool.hpp"

The thread pool will now be accessible via the thread_pool class.

Installing using vcpkg

If you are using the vcpkg C/C++ library manager, you can easily download and install this package with the following commands.

On Linux/macOS:

./vcpkg install bshoshany-thread-pool

On Windows:

.\vcpkg install bshoshany-thread-pool:x86-windows bshoshany-thread-pool:x64-windows

The thread pool will then be available automatically in the build system you integrated vcpkg with (e.g. MSBuild or CMake). Simply write #include "thread_pool.hpp" in any project to use the thread pool, without having to copy the file into the project first. I will update the vcpkg port with each new release, so it will be updated automatically when you run vcpkg upgrade.

Please see the vcpkg repository for more information on how to use vcpkg.

Constructors

The default constructor creates a thread pool with as many threads as the hardware can handle concurrently, as reported by the implementation via std::thread::hardware_concurrency(). With a hyperthreaded CPU, this will be twice the number of CPU cores. This is probably the constructor you want to use. For example:

// Constructs a thread pool with as many threads as available in the hardware.
thread_pool pool;

Optionally, a number of threads different from the hardware concurrency can be specified as an argument to the constructor. However, note that adding more threads than the hardware can handle will not improve performance, and in fact will most likely hinder it. This option exists in order to allow using fewer threads than the hardware concurrency, in cases where you wish to leave some threads available for other processes. For example:

// Constructs a thread pool with only 12 threads.
thread_pool pool(12);

If your program's main thread only submits tasks to the thread pool and waits for them to finish, and does not perform any computationally intensive tasks on its own, then it is recommended to use the default value for the number of threads. This ensures that all of the threads available in the hardware will be put to work while the main thread waits.

However, if your main thread does perform computationally intensive tasks on its own, then it is recommended to use the value std::thread::hardware_concurrency() - 1 for the number of threads. In this case, the main thread plus the thread pool will together take up exactly all the threads available in the hardware.
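
For example, the following leaves one hardware thread free for the main thread's own computations:

// Constructs a thread pool with one thread less than the hardware concurrency.
thread_pool pool(std::thread::hardware_concurrency() - 1);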

Getting and resetting the number of threads in the pool

The member function get_thread_count() returns the number of threads in the pool. This will be equal to std::thread::hardware_concurrency() if the default constructor was used.

It is generally unnecessary to change the number of threads in the pool after it has been created, since the whole point of a thread pool is that you only create the threads once. However, if needed, this can be done, safely and on-the-fly, using the reset() member function.

reset() will wait for all currently running tasks to be completed, but will leave the rest of the tasks in the queue. Then it will destroy the thread pool and create a new one with the desired new number of threads, as specified in the function's argument (or the hardware concurrency if no argument is given). The new thread pool will then resume executing the tasks that remained in the queue and any new submitted tasks.
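
As a brief sketch of how these two member functions fit together:

// Check how many threads are currently in the pool.
std::cout << "The pool has " << pool.get_thread_count() << " threads.\n";
// Safely change the number of threads to 4; tasks still in the queue are preserved.
pool.reset(4);
// Change the number of threads back to the hardware concurrency.
pool.reset();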

Finding the version of the package

If desired, the version of this package may be read at compile time from the macro THREAD_POOL_VERSION. The value will be a string containing the version number and release date. For example:

std::cout << "Thread pool library version is " << THREAD_POOL_VERSION << ".\n";

Sample output:

Thread pool library version is v2.0.0 (2021-08-14).

Submitting and waiting for tasks

Submitting tasks to the queue with futures

A task can be any function, with zero or more arguments, and with or without a return value. Once a task has been submitted to the queue, it will be executed as soon as a thread becomes available. Tasks are executed in the order that they were submitted (first-in, first-out).

The member function submit() is used to submit tasks to the queue. The first argument is the function to execute, and the rest of the arguments are the arguments to pass to the function, if any. The return value is an std::future associated with the task. For example:

// Submit a task without arguments to the queue, and get a future for it.
auto my_future = pool.submit(task);
// Submit a task with one argument to the queue, and get a future for it.
auto my_future = pool.submit(task, arg);
// Submit a task with two arguments to the queue, and get a future for it.
auto my_future = pool.submit(task, arg1, arg2);

Using auto for the return value of submit() is recommended, since it means the compiler will automatically detect which instance of the template std::future to use. The value of the future depends on whether the function has a return value or not:

  • If the submitted function has a return value, then the future will be set to that value when the function finishes its execution.
  • If the submitted function does not have a return value, then the future will be an std::future<bool>, which will be set to true when the function finishes its execution (see the sketch below).
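
For example, here is a minimal sketch of the second case, where the hypothetical function do_something() takes no arguments and returns nothing:

// Submit a task with no return value; the future is an std::future<bool>.
std::future<bool> my_future = pool.submit(do_something);
// get() returns true once the task has finished executing.
bool finished = my_future.get();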

To wait until the future's value becomes available, use the member function wait(). To obtain the value itself, use the member function get(), which will also automatically wait for the future if it's not ready yet. For example:

// Submit a task and get a future.
auto my_future = pool.submit(task);
// Do some other stuff while the task is executing.
do_stuff();
// Get the task's return value from the future, waiting for it to finish running if needed.
auto my_return_value = my_future.get();

Submitting tasks to the queue without futures

Usually, it is best to submit a task to the queue using submit(). This allows you to wait for the task to finish and/or get its return value later. However, sometimes a future is not needed, for example when you just want to "set and forget" a certain task, or if the task already communicates with the main thread or with other tasks without using futures, such as via references or pointers. In such cases, you may wish to avoid the overhead involved in assigning a future to the task in order to increase performance.

The member function push_task() allows you to submit a task to the queue without generating a future for it. The task can have any number of arguments, but it cannot have a return value. For example:

// Submit a task without arguments or return value to the queue.
pool.push_task(task);
// Submit a task with one argument and no return value to the queue.
pool.push_task(task, arg);
// Submit a task with two arguments and no return value to the queue.
pool.push_task(task, arg1, arg2);

Manually waiting for all tasks to complete

To wait for a single submitted task to complete, use submit() and then use the wait() or get() member functions of the obtained future. However, in cases where you need to wait until all submitted tasks finish their execution, or if the tasks have been submitted without futures using push_task(), you can use the member function wait_for_tasks().

Consider, for example, the following code:

thread_pool pool;
size_t a[100];
for (size_t i = 0; i < 100; i++)
    pool.push_task([&a, i] { a[i] = i * i; });
std::cout << a[50];

The output will most likely be garbage, since the task that modifies a[50] has not yet finished executing by the time we try to access that element (in fact, that task is probably still waiting in the queue). One solution would be to use submit() instead of push_task(), but perhaps we don't want the overhead of generating 100 different futures. Instead, simply adding the line

pool.wait_for_tasks();

after the for loop will ensure - as efficiently as possible - that all tasks have finished running before we attempt to access any elements of the array a, and the code will print out the value 2500 as expected. (Note, however, that wait_for_tasks() will wait for all the tasks in the queue, including those that are unrelated to the for loop. Using parallelize_loop() would make much more sense in this particular case, as it will wait only for the tasks related to the loop.)

Parallelizing loops

Consider the following loop:

for (T i = start; i < end; i++)
    do_something(i);

where:

  • T is any signed or unsigned integer type.
  • The loop is over the range [start, end), i.e. inclusive of start but exclusive of end.
  • do_something() is an operation performed for each loop index i, such as modifying an array with end - start elements.

This loop may be automatically parallelized and submitted to the thread pool's queue using the member function parallelize_loop() as follows:

auto loop = [](const T &a, const T &b)
{
    for (T i = a; i < b; i++)
        do_something(i);
};
pool.parallelize_loop(start, end, loop, n);

The range of indices [start, end) will be divided into n blocks of the form [a, b). For example, if the range is [0, 9) and there are 3 blocks, then the blocks will be the ranges [0, 3), [3, 6), and [6, 9). If possible, the blocks will be equal in size, otherwise the last block may be a bit longer. Then, a task will be submitted for each block, consisting of the function loop() with its two arguments being the start and end of the range [a, b) of each block. The main thread will then wait until all tasks generated by parallelize_loop() finish executing (and only those tasks - not any other tasks that also happen to be in the queue).

In the example above, the lambda function loop was defined separately for clarity. In practice, the lambda function will usually be defined within the argument itself, as in the example below. loop can also be an ordinary function (with no return value) instead of a lambda function, but that may be less useful, since typically one would like to capture some of the surrounding variables, as below.

If the fourth argument n is not specified, the number of blocks will be equal to the number of threads in the pool. For best performance, it is recommended to do your own benchmarks to find the optimal number of blocks for each loop (you can use the timer helper class - see below). Using fewer tasks than there are threads may be preferred if you are also running other tasks in parallel. Using more tasks than there are threads may improve performance in some cases.
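
For instance, here is a minimal benchmarking sketch using the timer helper class described below, assuming start, end, and loop are defined as in the previous example:

// Measure the execution time of parallelize_loop() for several block counts.
timer tmr;
for (size_t n = 1; n <= 64; n *= 2)
{
    tmr.start();
    pool.parallelize_loop(start, end, loop, n);
    tmr.stop();
    std::cout << n << " blocks took " << tmr.ms() << " ms.\n";
}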

As a simple example, the following code will calculate the squares of all integers from 0 to 99. Since there are 10 threads, and we did not specify a fourth argument, the loop will be divided into 10 blocks, each calculating 10 squares:

#include "thread_pool.hpp"

int main()
{
    thread_pool pool(10);
    uint32_t squares[100];
    pool.parallelize_loop(0, 100,
                          [&squares](const uint32_t &a, const uint32_t &b)
                          {
                              for (uint32_t i = a; i < b; i++)
                                  squares[i] = i * i;
                          });
    std::cout << "16^2 = " << squares[16] << '\n';
    std::cout << "32^2 = " << squares[32] << '\n';
}

The output should be:

16^2 = 256
32^2 = 1024

Helper classes

Synchronizing printing to an output stream

When printing to an output stream from multiple threads in parallel, the output may become garbled. For example, consider this code:

thread_pool pool;
for (auto i = 1; i <= 5; i++)
    pool.push_task([i] {
        std::cout << "Task no. " << i << " executing.\n";
    });

The output may look as follows:

Task no. Task no. 2Task no. 5 executing.
Task no.  executing.
Task no. 1 executing.
4 executing.
3 executing.

The reason is that, although each individual insertion to std::cout is thread-safe, there is no mechanism in place to ensure subsequent insertions from the same thread are printed contiguously.

The helper class synced_stream is designed to eliminate such synchronization issues. The constructor takes one optional argument, specifying the output stream to print to. If no argument is supplied, std::cout will be used:

// Construct a synced stream that will print to std::cout.
synced_stream sync_out;
// Construct a synced stream that will print to the output stream my_stream.
synced_stream sync_out(my_stream);

The member function print() takes an arbitrary number of arguments, which are inserted into the stream one by one, in the order they were given. println() does the same, but also prints a newline character \n at the end, for convenience. A mutex is used to synchronize this process, so that any other calls to print() or println() using the same synced_stream object must wait until the previous call has finished.

As an example, this code:

synced_stream sync_out;
thread_pool pool;
for (auto i = 1; i <= 5; i++)
    pool.push_task([i, &sync_out] {
        sync_out.println("Task no. ", i, " executing.");
    });

will print out:

Task no. 1 executing.
Task no. 2 executing.
Task no. 3 executing.
Task no. 4 executing.
Task no. 5 executing.

Warning: Always create the synced_stream object before the thread_pool object, as we did in this example. When the thread_pool object goes out of scope, it waits for the remaining tasks to be executed. If the synced_stream object goes out of scope before the thread_pool object, then any tasks using the synced_stream will crash. Since objects are destructed in the opposite order of construction, creating the synced_stream object before the thread_pool object ensures that the synced_stream is always available to the tasks, even while the pool is destructing.

Measuring execution time

If you are using a thread pool, then your code is most likely performance-critical. Achieving maximum performance requires performing a considerable amount of benchmarking to determine the optimal settings and algorithms. Therefore, it is important to be able to measure the execution time of various computations and operations under different conditions.

For example, you may be interested in figuring out:

  • The optimal number of threads in the pool.
  • The optimal number of tasks to divide a specific operation into, either using parallelize_loop() or manually.
  • The optimal sleep duration for the worker functions.

The helper class timer provides a simple way to measure execution time. It is very straightforward to use:

  1. Create a new timer object.
  2. Immediately before you execute the computation that you want to time, call the start() member function.
  3. Immediately after the computation ends, call the stop() member function.
  4. Use the member function ms() to obtain the elapsed time for the computation in milliseconds.

For example:

timer tmr;
tmr.start();
do_something();
tmr.stop();
std::cout << "The elapsed time was " << tmr.ms() << " ms.\n";

Other features

Setting the worker function's sleep duration

The worker function is the function that controls the execution of tasks by each thread. It loops continuously, and with each iteration of the loop, checks if there are any tasks in the queue. If it finds a task, it pops it out of the queue and executes it. If it does not find a task, it will wait for a bit, by calling std::this_thread::sleep_for(), and then check the queue again. The public member variable sleep_duration controls the duration, in microseconds, that the worker function sleeps for when it cannot find a task in the queue.

The default value of sleep_duration is 1000 microseconds, or 1 millisecond. In our benchmarks, lower values resulted in high CPU usage when the workers were idle. The value of 1000 microseconds was roughly the minimum value needed to reduce the idle CPU usage to a negligible amount.

In addition, in our benchmarks this value resulted in moderately improved performance compared to lower values, since the workers check the queue - which is a costly process - less frequently. On the other hand, increasing the value even more could potentially cause the workers to spend too much time sleeping and not pick up tasks from the queue quickly enough, so 1000 is the "sweet spot".

However, please note that this value is likely unique to the particular system our benchmarks were performed on, and your own optimal value would depend on factors such as your OS and C++ implementation, the type, complexity, and average duration of the tasks submitted to the pool, and whether there are any other programs running at the same time. Therefore, it is strongly recommended to do your own benchmarks and find the value that works best for you.

If sleep_duration is set to 0, then the worker function will execute std::this_thread::yield() instead of sleeping if there are no tasks in the queue. This will suggest to the OS that it should put this thread on hold and allow other threads to run instead. However, this also causes the worker functions to have high CPU usage when idle. On the other hand, for some applications this setting may provide better performance than sleeping - again, do your own benchmarks and find what works best for you.
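
Tuning the sleep duration is a one-line change. For example:

// Worker threads will now sleep for 500 microseconds whenever the queue is empty.
pool.sleep_duration = 500;
// Worker threads will now call std::this_thread::yield() instead of sleeping.
pool.sleep_duration = 0;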

Monitoring the tasks

Sometimes you may wish to monitor what is happening with the tasks you submitted to the pool. This may be done using three member functions:

  • get_tasks_queued() gets the number of tasks currently waiting in the queue to be executed by the threads.
  • get_tasks_running() gets the number of tasks currently being executed by the threads.
  • get_tasks_total() gets the total number of unfinished tasks - either still in the queue, or running in a thread.
  • Note that get_tasks_running() == get_tasks_total() - get_tasks_queued().

These functions are demonstrated in the following program:

#include "thread_pool.hpp"

synced_stream sync_out;
thread_pool pool(4);

void sleep_half_second(const size_t &i)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out.println("Task ", i, " done.");
}

void monitor_tasks()
{
    sync_out.println(pool.get_tasks_total(),
                     " tasks total, ",
                     pool.get_tasks_running(),
                     " tasks running, ",
                     pool.get_tasks_queued(),
                     " tasks queued.");
}

int main()
{
    for (size_t i = 0; i < 12; i++)
        pool.push_task(sleep_half_second, i);
    monitor_tasks();
    std::this_thread::sleep_for(std::chrono::milliseconds(750));
    monitor_tasks();
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    monitor_tasks();
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    monitor_tasks();
}

Assuming you have at least 4 hardware threads (so that 4 tasks can run concurrently), the output will be similar to:

12 tasks total, 0 tasks running, 12 tasks queued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
8 tasks total, 4 tasks running, 4 tasks queued.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
4 tasks total, 4 tasks running, 0 tasks queued.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
0 tasks total, 0 tasks running, 0 tasks queued.

Pausing the workers

Sometimes you may wish to temporarily pause the execution of tasks, or perhaps you want to submit tasks to the queue but only start executing them at a later time. You can do this using the public member variable paused.

When paused is set to true, the workers will temporarily stop popping new tasks out of the queue. However, any tasks already being executed will keep running until they are done, since the thread pool has no control over the internal code of your tasks. If you need to pause a task in the middle of its execution, you must do that manually by programming your own pause mechanism into the task itself. To resume popping tasks, set paused back to its default value of false.
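
As a rough sketch of such a user-defined mechanism (the atomic flag and the hypothetical function process_chunk() are purely illustrative, not part of the library):

// A user-defined flag; the pool itself knows nothing about it.
std::atomic<bool> task_paused(false);
pool.push_task([&task_paused] {
    for (int chunk = 0; chunk < 100; chunk++)
    {
        // Sleep while the user-defined pause flag is set.
        while (task_paused)
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        process_chunk(chunk); // Hypothetical function that performs one chunk of work.
    }
});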

Here is an example of pausing and resuming the pool:

#include "thread_pool.hpp"

synced_stream sync_out;
thread_pool pool(4);

void sleep_half_second(const size_t &i)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out.println("Task ", i, " done.");
}

int main()
{
    for (size_t i = 0; i < 8; i++)
        pool.push_task(sleep_half_second, i);
    sync_out.println("Submitted 8 tasks.");
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    pool.paused = true;
    sync_out.println("Pool paused.");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    for (size_t i = 8; i < 12; i++)
        pool.push_task(sleep_half_second, i);
    sync_out.println("Submitted 4 more tasks.");
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    pool.paused = false;
    sync_out.println("Pool resumed.");
}

Assuming you have at least 4 hardware threads, the output will be similar to:

Submitted 8 tasks.
Pool paused.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Still paused...
Submitted 4 more tasks.
Still paused...
Pool resumed.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.

Here is what happened. We initially submitted a total of 8 tasks to the queue. Since we waited for 250ms before pausing, the first 4 tasks have already started running, so they kept running until they finished. While the pool was paused, we submitted 4 more tasks to the queue, but they just waited at the end of the queue. When we resumed, the remaining 4 initial tasks were executed, followed by the 4 new tasks.

While the workers are paused, wait_for_tasks() will wait for the running tasks instead of all tasks (otherwise it would wait forever). This is demonstrated by the following program:

#include "thread_pool.hpp"

synced_stream sync_out;
thread_pool pool(4);

void sleep_half_second(const size_t &i)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out.println("Task ", i, " done.");
}

int main()
{
    for (size_t i = 0; i < 8; i++)
        pool.push_task(sleep_half_second, i);
    sync_out.println("Submitted 8 tasks. Waiting for them to complete.");
    pool.wait_for_tasks();
    for (size_t i = 8; i < 20; i++)
        pool.push_task(sleep_half_second, i);
    sync_out.println("Submitted 12 more tasks.");
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    pool.paused = true;
    sync_out.println("Pool paused. Waiting for the ", pool.get_tasks_running(), " running tasks to complete.");
    pool.wait_for_tasks();
    sync_out.println("All running tasks completed. ", pool.get_tasks_queued(), " tasks still queued.");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    pool.paused = false;
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    sync_out.println("Pool resumed. Waiting for the remaining ", pool.get_tasks_total(), " tasks (", pool.get_tasks_running(), " running and ", pool.get_tasks_queued(), " queued) to complete.");
    pool.wait_for_tasks();
    sync_out.println("All tasks completed.");
}

The output should be similar to:

Submitted 8 tasks. Waiting for them to complete.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
Submitted 12 more tasks.
Pool paused. Waiting for the 4 running tasks to complete.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
All running tasks completed. 8 tasks still queued.
Still paused...
Still paused...
Pool resumed. Waiting for the remaining 8 tasks (4 running and 4 queued) to complete.
Task 12 done.
Task 13 done.
Task 14 done.
Task 15 done.
Task 16 done.
Task 17 done.
Task 18 done.
Task 19 done.
All tasks completed.

The first wait_for_tasks(), which was called with paused == false, waited for all 8 tasks, both running and queued. The second wait_for_tasks(), which was called with paused == true, only waited for the 4 running tasks, while the other 8 tasks remained queued, and were not executed since the pool was paused. Finally, the third wait_for_tasks(), which was called with paused == false, waited for the remaining 8 tasks, both running and queued.

Warning: If the thread pool is destroyed while paused, any tasks still in the queue will never be executed.

Exception handling

submit() catches any exceptions thrown by the submitted task and forwards them to the corresponding future. They can then be caught when invoking the get() member function of the future. For example:

#include "thread_pool.hpp"

double inverse(const double &x)
{
    if (x == 0)
        throw std::runtime_error("Division by zero!");
    else
        return 1 / x;
}

int main()
{
    thread_pool pool;
    auto my_future = pool.submit(inverse, 0);
    try
    {
        double result = my_future.get();
        std::cout << "The result is: " << result << '\n';
    }
    catch (const std::exception &e)
    {
        std::cout << "Caught exception: " << e.what() << '\n';
    }
}

The output will be:

Caught exception: Division by zero!

Testing the package

The included file thread_pool_test.cpp will perform automated tests of all aspects of the package, and benchmark some multithreaded matrix operations. The output will be printed both to std::cout and to a file named thread_pool_test-yyyy-mm-dd_hh.mm.ss.log based on the current date and time. In addition, the code is thoroughly documented, and is meant to serve as an extensive example of how to properly use the package.

Please make sure to:

  1. Compile thread_pool_test.cpp with optimization flags enabled (e.g. -O3 on GCC / Clang or /O2 on MSVC), as in the example command after this list.
  2. Run the test without any other applications, especially multithreaded applications, running in parallel.
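
For example, on Linux with GCC, a suitable command (the output file name is only a placeholder) would be:

g++ thread_pool_test.cpp -std=c++17 -O3 -pthread -Wall -Wpedantic -Wextra -Wconversion -Weffc++ -o thread_pool_test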

If any of the tests fail, please submit a bug report including the exact specifications of your system (OS, CPU, compiler, etc.) and the generated log file.

Automated tests

A sample output of a successful run of the automated tests is as follows:

A C++17 Thread Pool for High-Performance Scientific Computing
(c) 2021 Barak Shoshany ([email protected]) (http://baraksh.com)
GitHub: https://github.com/bshoshany/thread-pool

Thread pool library version is v2.0.0 (2021-08-14).
Hardware concurrency is 24.
Generating log file: thread_pool_test-2021-08-14_23.34.25.log.

Important: Please do not run any other applications, especially multithreaded applications, in parallel with this test!

====================================
Checking that the constructor works:
====================================
Checking that the thread pool reports a number of threads equal to the hardware concurrency...
-> PASSED!
Checking that the manually counted number of unique thread IDs is equal to the reported number of threads...
-> PASSED!

============================
Checking that reset() works:
============================
Checking that after reset() the thread pool reports a number of threads equal to half the hardware concurrency...
-> PASSED!
Checking that after reset() the manually counted number of unique thread IDs is equal to the reported number of threads...
-> PASSED!
Checking that after a second reset() the thread pool reports a number of threads equal to the hardware concurrency...
-> PASSED!
Checking that after a second reset() the manually counted number of unique thread IDs is equal to the reported number of threads...
-> PASSED!

================================
Checking that push_task() works:
================================
Checking that push_task() works for a function with no arguments or return value...
-> PASSED!
Checking that push_task() works for a function with one argument and no return value...
-> PASSED!
Checking that push_task() works for a function with two arguments and no return value...
-> PASSED!

=============================
Checking that submit() works:
=============================
Checking that submit() works for a function with no arguments or return value...
-> PASSED!
Checking that submit() works for a function with one argument and no return value...
-> PASSED!
Checking that submit() works for a function with two arguments and no return value...
-> PASSED!
Checking that submit() works for a function with no arguments and a return value...
-> PASSED!
Checking that submit() works for a function with one argument and a return value...
-> PASSED!
Checking that submit() works for a function with two arguments and a return value...
-> PASSED!

=======================================
Checking that wait_for_tasks() works...
=======================================
-> PASSED!

=======================================
Checking that parallelize_loop() works:
=======================================
Verifying that a loop from -2064 to 551 with 4 tasks modifies all indices...
-> PASSED!
Verifying that a loop from -658 to -77 with 19 tasks modifies all indices...
-> PASSED!
Verifying that a loop from 1512 to -1046 with 1 task modifies all indices...
-> PASSED!
Verifying that a loop from -2334 to -1770 with 23 tasks modifies all indices...
-> PASSED!
Verifying that a loop from 1775 to -1242 with 13 tasks modifies all indices...
-> PASSED!
Verifying that a loop from 846 to -506 with 14 tasks modifies all indices...
-> PASSED!
Verifying that a loop from -301 to -2111 with 5 tasks modifies all indices...
-> PASSED!
Verifying that a loop from 1758 to -1602 with 11 tasks modifies all indices...
-> PASSED!
Verifying that a loop from 94 to -1103 with 24 tasks modifies all indices...
-> PASSED!
Verifying that a loop from 612 to 2026 with 13 tasks modifies all indices...
-> PASSED!

======================================================
Checking that different values of sleep_duration work:
======================================================
Submitting tasks with sleep_duration = 0 microseconds...
-> PASSED!
Submitting tasks with sleep_duration = 1909 microseconds...
-> PASSED!
Submitting tasks with sleep_duration = 469 microseconds...
-> PASSED!
Submitting tasks with sleep_duration = 964 microseconds...
-> PASSED!
Submitting tasks with sleep_duration = 1946 microseconds...
-> PASSED!
Submitting tasks with sleep_duration = 773 microseconds...
-> PASSED!
Resetting sleep_duration to the default value (1000 microseconds).

====================================
Checking that task monitoring works:
====================================
Resetting pool to 4 threads.
Submitting 12 tasks.
After submission, should have: 12 tasks total, 4 tasks running, 8 tasks queued...
-> PASSED!
Task 1 released.
Task 3 released.
Task 0 released.
Task 2 released.
After releasing 4 tasks, should have: 8 tasks total, 4 tasks running, 4 tasks queued...
Task 5 released.
Task 4 released.
Task 7 released.
Task 6 released.
-> PASSED!
After releasing 4 more tasks, should have: 4 tasks total, 4 tasks running, 0 tasks queued...
-> PASSED!
Task 11 released.
Task 8 released.
Task 9 released.
Task 10 released.
After releasing the final 4 tasks, should have: 0 tasks total, 0 tasks running, 0 tasks queued...
-> PASSED!
Resetting pool to 24 threads.

============================
Checking that pausing works:
============================
Resetting pool to 4 threads.
Pausing pool.
Submitting 12 tasks, each one waiting for 200ms.
Immediately after submission, should have: 12 tasks total, 0 tasks running, 12 tasks queued...
-> PASSED!
300ms later, should still have: 12 tasks total, 0 tasks running, 12 tasks queued...
-> PASSED!
Unpausing pool.
Task 1 done.
Task 2 done.
Task 3 done.
Task 0 done.
300ms later, should have: 8 tasks total, 4 tasks running, 4 tasks queued...
-> PASSED!
Pausing pool and using wait_for_tasks() to wait for the running tasks.
Task 7 done.
Task 5 done.
Task 6 done.
Task 4 done.
After waiting, should have: 4 tasks total, 0 tasks running, 4 tasks queued...
-> PASSED!
200ms later, should still have: 4 tasks total, 0 tasks running, 4 tasks queued...
-> PASSED!
Unpausing pool and using wait_for_tasks() to wait for all tasks.
Task 9 done.
Task 8 done.
Task 10 done.
Task 11 done.
After waiting, should have: 0 tasks total, 0 tasks running, 0 tasks queued...
-> PASSED!
Resetting pool to 24 threads.

=======================================
Checking that exception handling works:
=======================================
-> PASSED!

============================================================
Testing that matrix operations produce the expected results:
============================================================
Using matrices of size 240x240 with a total of 57600 elements.
Adding two matrices (single-threaded)...
Adding two matrices (multithreaded)...
Comparing the results...
-> PASSED!
Transposing a matrix (single-threaded)...
Transposing a matrix (multithreaded)...
Comparing the results...
-> PASSED!
Multiplying two matrices (single-threaded)...
Multiplying two matrices (multithreaded)...
Comparing the results...
-> PASSED!

++++++++++++++++++++++++++++++
SUCCESS: Passed all 46 checks!
++++++++++++++++++++++++++++++

Performance tests

If all checks passed, thread_pool_test.cpp will perform benchmarking of multithreaded matrix operations. Here we will present the results obtained with two different systems.

AMD Ryzen 9 3900X (24 threads)

The first test was performed on a high-end desktop computer equipped with a 12-core / 24-thread AMD Ryzen 9 3900X CPU at 3.8 GHz and 32 GB of DDR4 RAM at 3600 MHz, compiled using GCC v11.2.0 on Windows 10 build 19043.1165 with the -O3 compiler flag. The thread pool used 22 out of 24 threads, leaving 2 threads free for the operating system - which in our tests increased performance, presumably since all 22 threads could be dedicated entirely to the test. The output was as follows:

===================================
Performing matrix performance test:
===================================
Using 22 out of 24 threads.
Determining the optimal sleep duration........................
Result: The optimal sleep duration is 300 microseconds.

Adding two 4400x4400 matrices 20 times:
With   1  task, mean execution time was   39.3 ms with standard deviation  2.4 ms.
With   5 tasks, mean execution time was   21.2 ms with standard deviation  1.7 ms.
With  11 tasks, mean execution time was   20.4 ms with standard deviation  1.1 ms.
With  22 tasks, mean execution time was   18.3 ms with standard deviation  1.3 ms.
With  44 tasks, mean execution time was   17.4 ms with standard deviation  0.7 ms.
With  88 tasks, mean execution time was   18.0 ms with standard deviation  1.0 ms.
Maximum speedup obtained: 2.3x.

Transposing one 4400x4400 matrix 20 times:
With   1  task, mean execution time was  139.8 ms with standard deviation  3.0 ms.
With   5 tasks, mean execution time was   38.2 ms with standard deviation  2.4 ms.
With  11 tasks, mean execution time was   23.3 ms with standard deviation  1.8 ms.
With  22 tasks, mean execution time was   18.9 ms with standard deviation  1.6 ms.
With  44 tasks, mean execution time was   19.5 ms with standard deviation  1.5 ms.
With  88 tasks, mean execution time was   18.1 ms with standard deviation  0.7 ms.
Maximum speedup obtained: 7.7x.

Multiplying two 550x550 matrices 20 times:
With   1  task, mean execution time was  165.2 ms with standard deviation  2.5 ms.
With   5 tasks, mean execution time was   35.9 ms with standard deviation  1.0 ms.
With  11 tasks, mean execution time was   17.6 ms with standard deviation  0.5 ms.
With  22 tasks, mean execution time was   10.2 ms with standard deviation  0.7 ms.
With  44 tasks, mean execution time was   16.1 ms with standard deviation  1.4 ms.
With  88 tasks, mean execution time was   15.4 ms with standard deviation  0.7 ms.
Maximum speedup obtained: 16.2x.

Generating random 4400x4400 matrix 20 times:
With   1  task, mean execution time was  244.7 ms with standard deviation  2.6 ms.
With   5 tasks, mean execution time was   51.5 ms with standard deviation  1.5 ms.
With  11 tasks, mean execution time was   25.7 ms with standard deviation  0.9 ms.
With  22 tasks, mean execution time was   19.1 ms with standard deviation  2.7 ms.
With  44 tasks, mean execution time was   17.2 ms with standard deviation  2.1 ms.
With  88 tasks, mean execution time was   15.8 ms with standard deviation  1.0 ms.
Maximum speedup obtained: 15.5x.

Overall, multithreading provided speedups of up to 16.2x.

+++++++++++++++++++++++++++++++++++++++
Thread pool performance test completed!
+++++++++++++++++++++++++++++++++++++++

Here are some lessons we can learn from these results:

  • For simple element-wise operations such as addition, multithreading improves performance very modestly, only by a factor of 2.3, even when utilizing 22 threads in parallel. This is because compiler optimizations already parallelize simple loops fairly well on their own. Omitting the -O3 optimization flag, we observed a 6.8x speedup for addition. However, the user will most likely be compiling with optimizations turned on anyway.
  • Transposition enjoys a moderate 7.7x speedup with multithreading. Note that transposition requires reading memory in non-sequential order, jumping between the rows of the source matrix, which is why, compared to sequential operations such as addition, it is much slower when single-threaded, and benefits more from multithreading.
  • Matrix multiplication and random matrix generation, which are more complicated operations that cannot be automatically parallelized by compiler optimizations, gain the most out of multithreading - with a very significant speedup by a factor of around 16 on average. Given that the test CPU only has 12 physical cores, and hyperthreading can generally produce no more than a 30% performance improvement, a 16x speedup is about as good as can be expected.
  • Using as many tasks as there are threads almost always provides the best performance. Although in some cases 44 or 88 tasks seem to provide a slightly lower mean execution time compared to 22 tasks, the difference is less than 1 standard deviation in all cases.

Dual Intel Xeon Gold 6148 (80 threads)

The second test was performed on a Compute Canada node equipped with dual 20-core / 40-thread Intel Xeon Gold 6148 CPUs at 2.4 GHz (for a total of 40 cores and 80 threads) and 202 GB of RAM, compiled using GCC v9.4.0 on CentOS Linux 7.6.1810 with the -O3 compiler flag. The thread pool consisted of 78 threads. The output was as follows:

===================================
Performing matrix performance test:
===================================
Using 78 out of 80 threads.
Determining the optimal sleep duration........................
Result: The optimal sleep duration is 1000 microseconds.

Adding two 15600x15600 matrices 20 times:
With   1  task, mean execution time was  846.1 ms with standard deviation 40.2 ms.
With  19 tasks, mean execution time was   88.1 ms with standard deviation  8.6 ms.
With  39 tasks, mean execution time was   73.5 ms with standard deviation  4.8 ms.
With  78 tasks, mean execution time was   67.3 ms with standard deviation  2.2 ms.
With 156 tasks, mean execution time was   64.9 ms with standard deviation  2.3 ms.
With 312 tasks, mean execution time was   65.8 ms with standard deviation  1.5 ms.
Maximum speedup obtained: 13.0x.

Transposing one 15600x15600 matrix 20 times:
With   1  task, mean execution time was 1689.4 ms with standard deviation 75.3 ms.
With  19 tasks, mean execution time was  155.3 ms with standard deviation 19.7 ms.
With  39 tasks, mean execution time was  115.0 ms with standard deviation 10.8 ms.
With  78 tasks, mean execution time was   99.0 ms with standard deviation  6.0 ms.
With 156 tasks, mean execution time was   96.2 ms with standard deviation  1.6 ms.
With 312 tasks, mean execution time was   97.8 ms with standard deviation  1.7 ms.
Maximum speedup obtained: 17.6x.

Multiplying two 1950x1950 matrices 20 times:
With   1  task, mean execution time was 15415.1 ms with standard deviation 672.5 ms.
With  19 tasks, mean execution time was 1152.5 ms with standard deviation 62.8 ms.
With  39 tasks, mean execution time was  537.9 ms with standard deviation  4.1 ms.
With  78 tasks, mean execution time was  292.3 ms with standard deviation 42.5 ms.
With 156 tasks, mean execution time was  936.4 ms with standard deviation 15.8 ms.
With 312 tasks, mean execution time was  951.2 ms with standard deviation 22.3 ms.
Maximum speedup obtained: 52.7x.

Generating random 15600x15600 matrix 20 times:
With   1  task, mean execution time was 4318.3 ms with standard deviation  6.3 ms.
With  19 tasks, mean execution time was  260.8 ms with standard deviation 15.1 ms.
With  39 tasks, mean execution time was  156.1 ms with standard deviation  1.6 ms.
With  78 tasks, mean execution time was   86.2 ms with standard deviation  1.9 ms.
With 156 tasks, mean execution time was   84.8 ms with standard deviation  0.4 ms.
With 312 tasks, mean execution time was   85.2 ms with standard deviation  1.3 ms.
Maximum speedup obtained: 51.0x.

Overall, multithreading provided speedups of up to 52.7x.

+++++++++++++++++++++++++++++++++++++++
Thread pool performance test completed!
+++++++++++++++++++++++++++++++++++++++

The speedup of around 51.9x on average for matrix multiplication and random matrix generation again matches the estimation of a 30% improvement in performance over the 40 physical CPU cores due to hyperthreading, which indicates that we are once again saturating the maximum possible performance of our system.

Issue and pull request policy

This package is under continuous and active development. If you encounter any bugs, or if you would like to request any additional features, please feel free to open a new issue on GitHub and I will look into it as soon as I can.

Contributions are always welcome. However, I release my projects in cumulative updates after editing them locally on my system, so my policy is not to accept any pull requests. If you open a pull request, and I decide to incorporate it into the code, I will first perform some tests to ensure that the change doesn't break anything, and then merge it into the next release of the project, possibly together with some other changes, and along with a version bump and a corresponding note in CHANGELOG.md with a link to the pull request.

Copyright and citing

Copyright (c) 2021 Barak Shoshany. Licensed under the MIT license.

If you use this library in published research, please cite it as follows:

  • Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)

You can use the following BibTeX entry:

@article{Shoshany2021_ThreadPool,
    archiveprefix = {arXiv},
    author        = {Barak Shoshany},
    doi           = {10.5281/zenodo.4742687},
    eid           = {arXiv:2105.00613},
    eprint        = {2105.00613},
    journal       = {arXiv e-prints},
    keywords      = {Computer Science - Distributed, Parallel, and Cluster Computing, D.1.3, D.1.5},
    month         = {May},
    primaryclass  = {cs.DC},
    title         = {{A C++17 Thread Pool for High-Performance Scientific Computing}},
    year          = {2021}
}
Comments
  • Cannot be used inside the class

    Cannot be used inside the class

    I found out that I can't call threadpool inside class. If I call it in main function in app (like your example) or in dll like this:

    void sleep_half_second(const size_t& i, synced_stream* sync_out)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        sync_out->println("Task ", i, " done.");
    }
    
    BOOL APIENTRY DllMain( HMODULE hModule,
                           DWORD  ul_reason_for_call,
                           LPVOID lpReserved
                         )
    {
        switch (ul_reason_for_call)
        {
        case DLL_PROCESS_ATTACH:
        case DLL_THREAD_ATTACH:
        case DLL_THREAD_DETACH:
        case DLL_PROCESS_DETACH:
            break;
        }
        thread_pool* pool;
        synced_stream sync_out;
        int i = 1;
        pool = new thread_pool(12);
        pool->push_task(sleep_half_second, i, &sync_out);
    
        return TRUE;
    }
    

    It's OK, doesn't generate error c2064. But if using it in class, it still generates error c2064.

    Please help me fix this, I need to call it inside the class

    doubleRsi.zip Thank you very much

    opened by tuandat64 17
  • link errors with 3.2.0

    link errors with 3.2.0

    The following two lines cause link error (using Visual Studio + mingw64 or remote ubuntu g++12) because of multiple defined symbols :

    std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
    std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);
    
    

    Direct compilation (ubuntu g++ or msvc) don't trigger linker problems.

    However, changing these lines to

    inline static std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
    inline static std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);
    
    

    seems more correct, don't hurt already working builds, and fixes the linking error.

    bug 
    opened by reder2000 13
  • Is there anyway to get and pass thread IDs or pointers to tasks?

    Is there anyway to get and pass thread IDs or pointers to tasks?

    I divide data by the number of threads. Each thread will update its divided data. All will merge at the end. By doing that I can avoid using locks. However I need to know which thread is used to run a task. It could be whatever such as thread IDs or thread pointers. Thanks

    opened by nguyenpham 13
  • [TEST] thread_pool_test.cpp

    [TEST] thread_pool_test.cpp

    System information

    • CPU model, architecture, # of cores and threads: 11th Gen Intel® Core™ i7-11390H @ 3.40GHz × 8

    • Operating system: Ubuntu 20.04.4 LTS

    • Name and version of C++ compiler: GCC 9.4.0

    • Full command used for compiling, including all compiler flags: gcc -Wall -Wpedantic -Wextra -Wconversion -Weffc++ thread_pool_test.cpp

    • Thread pool library version: latest

    Log file

    http://codepad.org/uwXjtRzH

    bug 
    opened by brunolnetto 12
  • make worker threads robust and able to catch catastrophic failures for more controlled general application shutdown and post mortem.

    make worker threads robust and able to catch catastrophic failures for more controlled general application shutdown and post mortem.

    Note

    This work builds upon https://github.com/bshoshany/thread-pool/pull/76, hence it is merged as part of this commit chain to the individual commit deltas small, comprehensible and mergeable.

    Adding virtual member function workerthread_main() which is used to wrap every worker thread's worker() method: this prepares us for two features to be submitted:

    1. per thread catastrophic failure handling, where worker threads gain the ability to detect catastrophic failures, e.g. uncaught exceptions and hardware faults, and help to terminate the application in a controlled fashion.

    2. allowing applications to use a derived class which is used to provide advanced per-worker-thread setup and shutdown procedures which are application-specific. (Hence the virtual method.)


    Merge branch 'patch-7' (i.e. https://github.com/bshoshany/thread-pool/pull/76 ) -- patch-7 is a mandatory prerequisite for this work.


    added the get_alive_threads_count() monitoring feature, which tracks the worker threads 'life': this is in preparation of the catastrophic failure handling feature.


    Revision: 598f66ff8f5e183c12f378daca0593f71d43c7ff

    Phase 1 of the per-workerthread catastrophic failure handling code: this contains the platform-generic part, which is able to catch unhandled exceptions and report those to any custom workerthread_main() reporter.

    This is the inverse of commit ad63b02290e4debeeae388bbce925426cb023071 as it was easier to remove the features one-by-one when I was extracting this bit of work from the dev commits, then revert those onto here.

    Originally From SHA-1: 12253b6610ed2f237660abeb91446beab27f63d1

    augmentations:

    • thread workers now wait for a signal to wake up when in 'pause mode', as they always already were in normal run mode.

    [...]

    Note that, at least under Windows 10, threads MAY be nuked silently under fatal/crash/exception conditions, in which case the thread disappears even before it was able to do anything simple, like updating the alive_threads_count count, because the thread worker code simply will cease to run and exist. Fortunately, thread::is_joinable() detects this fact -- which was a very important reason to have wait_for_tasks() be a wait_for_tasks()-timed poll loop, as we CANNOT guarantee on all OSes that worker threads will be able to wake up and act once a severe-enough fault condition has occurred in the application/task code. From our own observations, it already seems sufficient to directly, bluntly, call the standard exit(N) RTL API to have this happen to you: no exception of any kind will be reported then, yet all threads will be joinable as they will have vanished already.

    WARNING: hence, get_alive_threads_count() will be unsafely optimistic, and depending on that number at such disparate times will surely cause your application to lock up on exit on semi-random occasions. This is completely independent of the running state, as it is driven by external factors.

    • catch unexpected C++ and SEH/hardware exceptions occurring in your tasks/worker-threads in the outer layers of the worker thread: as this is a catastrophic, fatal, condition anyway (your application state is largely unpredictable by then already), the thread will terminate, but we now have a fighting chance of catching and reporting such errors at least. As C++ and SEH exception handling cannot co-exist in a single function, we have the following call chain, where each wraps the next one: workerthread_main() --> __worker_SEH() --> __worker() --> worker(), where worker() is the core threadpool thread code, waiting for and executing tasks once they arrive.

      When you need special handling of this (fatal) scenario in your application, you can create a derived class and override workerthread_main() with your own. Observe the code comments when you do, to ensure continued proper operation of the thread pool. (__worker_SEH() returns a boolean indicating whether its termination was normal or abnormal (i.e. a catastrophic failure), and you may pass a string to it, which will then have been filled with the relevant and available error information. The given implementation prints this info to STDERR -- but you can replace that behaviour in your override.)
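
    A minimal, hypothetical sketch of the wrapper chain described above (not the actual patch; the reporting details are illustrative, and the double-underscore prefixes are dropped here since such names are reserved in C++). SEH handling (__try/__except) is MSVC/Windows-only:

        #include <exception>
        #include <iostream>
        #include <string>
        #include <windows.h> // MSVC/Windows only: provides EXCEPTION_EXECUTE_HANDLER

        void worker(); // the core thread pool loop, defined elsewhere

        // Innermost wrapper: catches C++ exceptions thrown by tasks.
        bool worker_cpp(std::string &error_report)
        {
            try
            {
                worker();
                return true; // normal termination
            }
            catch (const std::exception &e)
            {
                error_report = e.what();
                return false; // abnormal (catastrophic) termination
            }
        }

        // Middle wrapper: catches SEH/hardware exceptions (bus errors, DIV/0, ...).
        // A function using __try may not contain objects with destructors, which
        // is one reason C++ and SEH handling must live in separate functions.
        bool worker_SEH(std::string &error_report)
        {
            __try
            {
                return worker_cpp(error_report);
            }
            __except (EXCEPTION_EXECUTE_HANDLER)
            {
                error_report = "SEH/hardware exception in worker thread";
                return false;
            }
        }

        // Outermost wrapper: virtual in the PR, so applications can override it.
        void workerthread_main()
        {
            std::string error_report;
            if (!worker_SEH(error_report))
                std::cerr << error_report << std::endl; // default: report to STDERR
        }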


    Revision: 57d067a405c646f8f1e43f1c949b2c57bb7b7b66

    Add support for negative thread_count: a 'this many fewer than the number of CPU cores' setup feature.

    From the original: SHA-1: 12253b6610ed2f237660abeb91446beab27f63d1

    • now accepts NEGATIVE thread_counts during pool creation (or reset()) to create a pool that occupies all CPU cores MINUS |thread_count|. This is useful when you want the pool to occupy ALMOST all cores, but leave one or more free for other, unrelated, jobs. (A sketch of this behaviour appears below.)

    ... Which is a feature due to the results from SHA-1: de3247306c4ae9e370e740579deb6e235ef96b9a: see https://github.com/bshoshany/thread-pool/pull/74 and https://github.com/bshoshany/thread-pool/pull/75 for that bit of the work.

    This is produced as a revert of commit 3109b5c146f1ce50855b92ba57a8324a160588b3 as it was easier to remove features from an existing and tested-ok rig than build up from scratch on a new tree.
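
    A minimal sketch of how such a negative thread_count could be interpreted (hypothetical helper, not the actual patch):

        #include <algorithm>
        #include <thread>

        // A positive count is used as-is; a non-positive count means
        // "all hardware threads minus |thread_count|", clamped to at least 1.
        unsigned int determine_thread_count(const int thread_count)
        {
            const int hardware = static_cast<int>(std::thread::hardware_concurrency());
            if (thread_count > 0)
                return static_cast<unsigned int>(thread_count);
            return static_cast<unsigned int>(std::max(1, hardware + thread_count));
        }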

    Revision: 635472fff414f6b3d82fa98aaeb7147ddc70ba02

    wait_for_tasks(): remove the need to use YIELD by slowly growing the poll period until it hits a preconfigured ceiling (1 second); this code is generally only relevant in abnormal application terminations, but helps to keep it civil at those times.

    This reverts commit f7551e41cd5c85d724dfb47b3d94ec03afc76798 as it was easier to remove features from an existing and known-good code rig than build up from scratch on a different tree, so produced a remove patch there and then reverted it onto here. *Ach.*🤷‍♀️

    Revision: 34b6aa53e3453ea39209e6510b945f16b9e10dd7

    add the original ASSERTion checks, which document and test the implicit assumptions at critical places in the code. We explicitly decided NOT to use the standard RTL <assert.h> et al., as those are very inflexible, not allowing one, for example, to continue debugging an application after an assertion fails. These checks are engineered to be replaced by application/test-specific macros; their default implementation (on Windows at least) will wake up your attached debugger -- if you were running one.

    Works nicely for me at least with a MSVC2019 dev rig. 😄

    Again, this is another inverse of a commit: 72424f2c65e1b2cbe36c781b5dc35971b66d5417. It was easier to remove features from a known-good rig and then revert those commits onto here.🤷‍♀️

    Revision: 97b10acc32ac21b50089a31e7a1c5b919f3add59

    Phase 2 of the per-worker-thread catastrophic failure handling and controlled termination: adding MS-Windows-specific SEH handling, which allows an application to catch and process hardware failures of various kinds, e.g. bus errors and DIV/0. This allows advanced applications to cope with internal failures and shut down in a controlled fashion - even while the state of a large chunk of the application may arguably already be 'in limbo'.

    "adding the MSVC-specific extra SEH catastrophic failure handling code. Based on the already-present generic skeleton rig, so the change is minimal. This basically fleshes out the __worker_SEH() wrapper method."

    This reverts commit 4c0774a860a1534aae19424893dfe619b21fcac8. It was easier to remove features from a known-good rig than to build up from scratch on a different tree, so we created the removal commit there, then reverted that one onto here.

    Revision: bb550265245caae76e137531baf1b8527c85265b

    Continuation of SHA-1: 59c4193a44d9bdb858a1d66ca90ea9c62bf1331a, adding the omitted original ASSERTion checks, which are applicable now that the SEH handling code in __worker_SEH() has finally been introduced.


    🥴😱 whoopsie! forgot to add the get_alive_threads_count() test which was added to check the new added API. Done now. ✅


    Testing

    This was tested as part of a larger work (see the previous PRs) after hunting down shutdown issues (application lockups, etc.) in a large application.

    Tested this code via your provided test code rig; see my own fork and the referenced commits, which point into it.

    Tested on AMD Ryzen 3700X, 128GB RAM, latest Win10/64, latest MSVC2019 dev environment. Using in-house project files which use a (in-house) standardized set of optimizations.

    Additional information

    TBD

    The patches are hopefully largely self-explanatory. Where deemed useful, the original commit messages from the dev fork have been referenced and included.

    opened by GerHobbelt 9
  • [BUG] VS 2022 Win32 crash (compiler bug ?)

    Describe the bug

    This simple code crashes with the VS 2022 Win32 target, BUT it works with the same compiler targeting x64.

    Minimal working example

        thread_pool pool;
    

    "A breakpoint statement (__debugbreak() statement or similar call) was executed in NelSon-gui.exe."

    Behavior

    The code works perfectly on the x64 target, but the same code crashes on the Win32 target.

    System information

    • CPU model, architecture, # of cores and threads: AMD 4800H
    • Operating system: Windows 10
    • Name and version of C++ compiler: VS 2022 Version 17.2.1
    • Full command used for compiling, including all compiler flags:
    /permissive- /MP /ifcOutput "Release\" /GS /GL /analyze- /W3 /Gy /Zc:wchar_t /I"D:\Developpements\Github\nelson\modules/os_functions/src/include" /I"D:\Developpements\Github\nelson\modules/dynamic_link/src/include" /I"D:\Developpements\Github\nelson\modules/characters_encoding/src/include" /I"D:\Developpements\Github\nelson\modules/i18n/src/include" /I"D:\Developpements\Github\nelson\modules/interpreter/src/include" /I"D:\Developpements\Github\nelson\modules/types/src/include" /I"D:\Developpements\Github\nelson\modules/api_nelson/src/include" /I"D:\Developpements\Github\nelson\modules/stream_manager/src/include" /I"D:\Developpements\Github\nelson\modules/error_manager/src/include" /I"D:\Developpements\Github\nelson\modules/nelson_manager/src/include" /I"D:\Developpements\Github\nelson\../NelSon-thirdparty-Win32/Boost" /Zi /Gm- /O2 /sdl /Fd"Release\vc143.pdb" /Zc:inline /fp:precise /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "_USRDLL" /D "NLSOS_FUNCTIONS_EXPORTS" /D "_WINDLL" /D "_UNICODE" /D "UNICODE" /errorReport:prompt /WX- /Zc:forScope /Gd /Oy- /Oi /MD /openmp /std:c++17 /FC /Fa"Release\" /EHsc /nologo /Fo"Release\" /Fp"Release\libnlsOs_functions.pch" /diagnostics:column 
    
    • Thread pool library version: github master

    (Please note that only the latest version of the thread pool library is supported.)

    Additional information

    Not sure, but it must be more of a compiler bug than a code bug :( Reported for feedback and eventually a workaround

    bug 
    opened by Nelson-numerical-software 8
  • Compiling on Amazon Linux

    Describe the bug

    I am trying to compile my code, which uses your thread_pool library. The code compiles and runs without any problems on my own PC (WSL2 and GCC). When I compile on Amazon Linux 2, I get the compile-time errors attached in the file below. The code and compile commands are exactly the same. Since your library is based on C++17, I wonder why there is a compile error. Any suggestions as to the reasons for the errors?

    Minimal working example

    not yet.

    Behavior

    errors.txt

    System information

    • CPU model, architecture, # of cores and threads: AWS EC2 t2.micro
    • Operating system: Amazon Linux 2
    • Name and version of C++ compiler: GCC
    • Full command used for compiling, including all compiler flags: -std=c++17 -march=native -O3 -pthread
    • Thread pool library version: 3.0.0

    Additional information

    The same compile errors occur with both versions 2 and 3.

    bug 
    opened by danchitnis 7
  • How to build using Microsoft Visual Studio 2019 ?

    I have not written any code yet. Simply, I want to (first) build and run your thread_pool_test.cpp file using MSVS 2019 (latest update 16.11.9).

    Below, I'm listing the first few compiler errors, and I also provide the compiler switches that are set (including /std:c++17).

    Thanks in advance for help!

    Manfred Sever.

    The top of the compiler output (errors) is:

        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(176,24): error C2039: 'scoped_lock': is not a member of 'std'
        1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\queue(22): message : see declaration of 'std'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(172,10): message : This diagnostic occurred in the compiler generated function 'void thread_pool::push_task(const F &)'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,20): error C2039: 'scoped_lock': is not a member of 'std'
        1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\queue(22): message : see declaration of 'std'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,1): error C4430: missing type specifier - int assumed. Note: C++ does not support default-int
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,32): error C2146: syntax error: missing ';' before identifier 'lock'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,32): error C2672: 'lock': no matching overloaded function found
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,1): error C2780: 'void std::lock(_Lock0 &,_Lock1 &,_LockN &...)': expects 3 arguments - 1 provided
        1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\mutex(427): message : see declaration of 'std::lock'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,20): error C2039: 'scoped_lock': is not a member of 'std'
        1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\queue(22): message : see declaration of 'std'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,1): error C4430: missing type specifier - int assumed. Note: C++ does not support default-int
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,32): error C2146: syntax error: missing ';' before identifier 'lock'
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,32): error C2672: 'lock': no matching overloaded function found
        1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,1): error C2780: 'void std::lock(_Lock0 &,_Lock1 &,_LockN &...)': expects 3 arguments - 1 provided

    The compiler options I have set are: /JMC /permissive- /ifcOutput "x64\Debug" /GS /W4 /Zc:wchar_t /ZI /Gm- /O2 /sdl /Fd"x64\Debug\vc142.pdb" /Zc:inline /fp:precise /D "_DEBUG" /D "_CONSOLE" /D "_UNICODE" /D "UNICODE" /errorReport:prompt /WX- /Zc:forScope /RTC1 /Gd /MDd /std:c++17 /FC /Fa"x64\Debug" /EHsc /nologo /Fo"x64\Debug" /Fp"x64\Debug\testThreadPool.pch" /diagnostics:column

    opened by MDMS24 7
  • Replaced user-configured sleep time with condition variables

    Describe the changes

    I thought it would be useful to do away with the arbitrary sleeps in this library. I've replaced the sleeps with std::condition_variables.

    • In the case when a worker thread is idle and waiting for more work, the worker thread will wait on a condition variable (task_available_condition_variable) to be notified once work is added.
    • In the case of wait_for_tasks, the thread will wait on a different condition variable (tasks_done_condition_variable). Each time a worker thread finishes some work, it will notify on this condition variable, possibly waking up wait_for_tasks if all of the work is actually completed.
    • In the case of parallelize_loop, yet another condition variable (loop_done_condition_variable; this time, a local variable) is waited on to synchronize the blocks of the loop. Once a block of the loop is completed, if it was the last block, it will notify the condition variable.

    Having paused as a public data member of the class posed an issue with the condition variables: if the user un-pauses execution, the pool should wake up all worker threads. For this reason, I made paused private and provided public functions to modify the value.

    Previously, tasks was protected by an std::mutex (now named tasks_mutex) and tasks_total was an std::atomic. Since tasks_total is tested under the condition variable, it made sense to make it no longer atomic and to move it under the protection of the tasks_mutex.
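
    A minimal sketch of the mechanism described above (the names follow this description, but the details are illustrative rather than the actual patch):

        #include <condition_variable>
        #include <cstddef>
        #include <functional>
        #include <mutex>
        #include <queue>

        std::mutex tasks_mutex;
        std::condition_variable task_available_condition_variable;
        std::condition_variable tasks_done_condition_variable;
        std::queue<std::function<void()>> tasks;
        std::size_t tasks_total = 0; // protected by tasks_mutex, no longer atomic
        bool running = true;         // also protected by tasks_mutex

        void worker()
        {
            while (true)
            {
                std::unique_lock<std::mutex> lock(tasks_mutex);
                // Idle workers sleep here instead of polling with a sleep time.
                task_available_condition_variable.wait(
                    lock, [] { return !tasks.empty() || !running; });
                if (!running)
                    return;
                std::function<void()> task = std::move(tasks.front());
                tasks.pop();
                lock.unlock(); // run the task without holding the lock
                task();
                lock.lock();
                --tasks_total;
                // Possibly wakes up wait_for_tasks(), if all work is now done.
                tasks_done_condition_variable.notify_all();
            }
        }

        void wait_for_tasks()
        {
            std::unique_lock<std::mutex> lock(tasks_mutex);
            tasks_done_condition_variable.wait(lock, [] { return tasks_total == 0; });
        }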

    I adapted thread_pool_test.cpp for the changes to the interface.

    The file thread_pool_test.cpp used carriage returns and linefeeds for line endings, while thread_pool.hpp only used linefeeds. I removed the carriage returns from thread_pool_test.cpp to make them consistent.

    Testing

    Have you tested the new code using the provided automated test program thread_pool_test.cpp and/or performed any other tests to ensure that it works correctly? If so, please provide information about the test system(s):

    Yes, I built thread_pool_test.cpp and ran it hundreds of times (to hopefully cover different possible thread-execution orders), built by two different compilers:

    • Compiler g++ version 8.4.0, flags -std=c++17 -pthread -O3
    • Compiler MSVC 14.29, with flag /std:c++17 in default debug mode (/Od)
    • Compiler MSVC 14.29, with flag /std:c++17 in default release mode (/O2)

    I also created my own little load testing tool which creates a bunch of tasks (which just sleep) with random delays.

    Machine details:

    • Intel i7-8700K
    • Windows 10 (and Ubuntu 18.04 running in WSL 1)
    opened by SandSnip3r 7
  • Use semaphores instead of sleeping

    Have you considered using a semaphore to wait for available tasks instead of using a sleep? I'm not sure if it's a better method; I'm interested in your opinion on the matter more than anything! Lightweight semaphore here: https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h

    This article is interesting, particularly point 2: https://preshing.com/20150316/semaphores-are-surprisingly-versatile/
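
    For illustration, a minimal sketch of a semaphore-based wait (using C++20's std::counting_semaphore for brevity, although the library itself targets C++17; the LightweightSemaphore linked above would be a C++11-compatible substitute, and shutdown handling is omitted):

        #include <functional>
        #include <mutex>
        #include <queue>
        #include <semaphore> // C++20

        std::counting_semaphore<> task_count{0}; // counts the tasks in the queue
        std::mutex tasks_mutex;
        std::queue<std::function<void()>> tasks;

        void push_task(std::function<void()> task)
        {
            {
                const std::scoped_lock lock(tasks_mutex);
                tasks.push(std::move(task));
            }
            task_count.release(); // wakes exactly one waiting worker
        }

        void worker()
        {
            while (true)
            {
                task_count.acquire(); // blocks until a task is available; no polling
                std::function<void()> task;
                {
                    const std::scoped_lock lock(tasks_mutex);
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                task();
            }
        }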

    opened by tommitytom 7
  • Suggestion about wake-up mechanism of worker

    Currently, the worker function's wake-up mechanism is essentially polling: it uses sleep_or_yield() to yield the CPU.

      void worker() {
        while (running) {
          std::function<void()> task;
          if (!paused && pop_task(task)) {
            task();
            tasks_total--;
          } else {
            sleep_or_yield();
          }
        }
      }
    

    How about using a std::condition_variable wait/notify wake-up mechanism? Thanks. Something like below: the worker function is changed to wait for a notification:

      void worker() {
        while (running) {
          std::unique_lock lock(queue_mutex);
          // Sleep until a task arrives or the pool is shutting down.
          tasks_cv.wait(lock, [this] {
            return !this->is_tasks_queue_empty() || !this->running;
          });
          std::function<void()> task;
          // pop_task() is assumed not to lock queue_mutex itself here.
          if (!paused && pop_task(task)) {
            lock.unlock(); // do not hold the lock while running the task
            task();
            tasks_total--;
          }
        }
      }
    

    Wake up the workers by CV notification when a task comes in, or when the thread pool is reset or shut down:

      template <typename F>
      void push_task(const F &task) {
        tasks_total++;
        {
          const std::scoped_lock lock(queue_mutex);
          tasks.push(std::function<void()>(task));
        }
        tasks_cv.notify_all();
      }
    
      void reset(const ui32 &_thread_count = std::thread::hardware_concurrency()) {
        bool was_paused = paused;
        paused = true;
        wait_for_tasks();
        running = false;
        tasks_cv.notify_all(); // wake any workers blocked on the CV so they can exit
        destroy_threads();
        thread_count =
            _thread_count ? _thread_count : std::thread::hardware_concurrency();
        threads.reset(new std::thread[thread_count]);
        paused = was_paused;
        running = true;   // must be set before creating the threads, or the new
        create_threads(); // workers may see running == false and exit immediately
      }
    
      ~thread_pool() {
        wait_for_tasks();
        running = false;
        tasks_cv.notify_all();
        destroy_threads();
      }
    
    opened by BenjaminChi007 7
  • Remove unnecessary copies of submitted functions to the threadpool.

    Pull request policy (please read)

    Contributions are always welcome. However, I release my projects in cumulative updates after editing and testing them locally on my system, so my policy is not to accept any pull requests. If you open a pull request, and I decide to incorporate your suggestion into the project, I will first modify your code to comply with the project's coding conventions (formatting, syntax, naming, comments, programming practices, etc.), and perform some tests to ensure that the change doesn't break anything. I will then merge it into the next release of the project, possibly together with some other changes. The new release will also include a note in CHANGELOG.md with a link to your pull request, and modifications to the documentation in README.md as needed.

    Describe the changes

    Currently, the threadpool performs two unnecessary copies: one inside submit(), and another inside push_task().

    This PR addresses these unnecessary copies by adding std::move to the two std::functions created in these methods when they're used.

    I added a test for submit() that covers these unnecessary copies. Please note that the submitted closures must remain copyable because they're put into std::function.
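
    A minimal sketch of the pattern (illustrative, not the actual patch):

        #include <functional>
        #include <queue>
        #include <utility>

        std::queue<std::function<void()>> tasks; // illustrative task queue

        template <typename F>
        void push_task(const F &task)
        {
            // One copy into std::function is unavoidable, since the queue stores
            // std::function objects and the closure must remain copyable...
            std::function<void()> task_function(task);
            // ...but moving it into the queue avoids duplicating the (possibly
            // large) closure state a second time:
            tasks.push(std::move(task_function));
        }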

    Testing

    Have you tested the new code using the provided automated test program and/or performed any other tests to ensure that it works correctly? If so, please provide information about the test system(s):

    I ran both versions of tests, and wrote a new test that ensures the copies are actually avoided.

    • CPU model, architecture, # of cores and threads:
    • Operating system: Windows 10 Enterprise, version 10.0.19044
    • Name and version of C++ compiler: Visual Studio 2019, version 16.11.19
    • Full command used for compiling, including all compiler flags: cl BS_thread_pool_test.cpp /std:c++17 /permissive- /O2 /W4 /EHsc /Fe:BS_thread_pool_test.exe, cl BS_thread_pool_light_test.cpp /std:c++17 /permissive- /O2 /W4 /EHsc /Fe:BS_thread_pool_light_test.exe

    Additional information

    Example code that will benefit from this change:

    
    #include "BS_thread_pool.hpp"
    #include <vector>

    int main() {
        BS::thread_pool pool(12);
        pool.submit(
            [big_data{std::vector<int>(100'000)}] {
                // do stuff
                return big_data.size();
            }).get();
        return 0;
    }
    
    opened by radekvit 1
Releases(v3.3.0)
  • v3.3.0(Aug 3, 2022)

    v3.3.0 (2022-08-03)

    • BS_thread_pool.hpp:
      • The public member variable paused of BS::thread_pool has been made private for future-proofing (in case future versions implement a more involved pausing mechanism) and better encapsulation. It is now accessible only via the pause(), unpause(), and is_paused() member functions. In other words:
        • Replace pool.paused = true with pool.pause().
        • Replace pool.paused = false with pool.unpause().
        • Replace if (pool.paused) (or similar) with if (pool.is_paused()).
      • The public member variable f of BS::multi_future has been renamed to futures for clarity, and has been made private for encapsulation and simplification purposes. Instead of operating on the vector futures itself, you can now use the [] operator of the BS::multi_future to access the future at a specific index directly, or the push_back() member function to append a new future to the list. The size() member function tells you how many futures are currently stored in the object.
      • The explicit casts of std::endl and std::flush, added in v3.2.0 to enable flushing a BS::synced_stream, caused ODR (One Definition Rule) violations if BS_thread_pool.hpp was included in two different translation units, since they were mistakenly not defined as inline. To fix this, I decided to make them static members of BS::synced_stream instead of global variables, which also makes the code better organized in my opinion. These objects can now be accessed as BS::synced_stream::endl and BS::synced_stream::flush. I also added an example for how to use them in README.md. See #64. (A brief sketch also appears at the end of these release notes.)
    • BS_thread_pool_light.hpp:
      • This package started out as a very lightweight thread pool, but over time has expanded to include many additional features, and at the time of writing it has a total of 340 lines of code, including all the helper classes. Therefore, I have decided to bundle a light version of the thread pool in a separate and stand-alone header file, BS_thread_pool_light.hpp, with only 170 lines of code (half the size of the full package). This file does not contain any of the helper classes, only a new BS::thread_pool_light class, which is a minimal thread pool with only the 5 most basic member functions:
        • get_thread_count()
        • push_loop()
        • push_task()
        • submit()
        • wait_for_tasks()
      • A separate test program BS_thread_pool_light_test.cpp tests only the features of the lightweight BS::thread_pool_light class. In the spirit of minimalism, it does not generate a log file and does not do any benchmarks.
      • To be perfectly clear, each header file is 100% stand-alone. If you wish to use the full package, you only need BS_thread_pool.hpp, and if you wish to use the light version, you only need BS_thread_pool_light.hpp. Only a single header file needs to be included in your project.
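
    A brief sketch of the new usage described in these notes (illustrative only):

        #include "BS_thread_pool.hpp"

        int main()
        {
            BS::thread_pool pool;
            BS::synced_stream sync_out;
            pool.pause(); // replaces pool.paused = true
            sync_out.print("Paused? ", pool.is_paused(), BS::synced_stream::endl);
            pool.unpause(); // replaces pool.paused = false
        }
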
    Source code(tar.gz)
    Source code(zip)
  • v3.2.0(Jul 29, 2022)

    v3.2.0 (2022-07-28)

    • BS_thread_pool.hpp:
      • Main BS::thread_pool class:
        • Added a new member function, push_loop(), which does the same thing as parallelize_loop(), except that it does not return a BS::multi_future with the futures for each block. Just like push_task() vs. submit(), this avoids the overhead of creating the futures, but the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
        • push_task() and submit() now utilize perfect forwarding in order to support more types of tasks - in particular member functions, which in previous versions could not be submitted unless wrapped in a lambda. To submit a member function, use the syntax submit(&class::function, &object, args). More information can be found in README.md. See #9.
        • push_loop() and parallelize_loop() now have overloads where the first argument (the first index in the loop) is omitted, in which case it is assumed to be 0. This is for convenience, as the case where the first index is 0 is very common. (A short combined sketch of these features appears at the end of these release notes.)
      • Helper classes:
        • BS::synced_stream now utilizes perfect forwarding in the member functions print() and println().
        • Previously, it was impossible to pass the flushing manipulators std::endl and std::flush to print() and println(), since the compiler could not figure out which template specializations to use. The new objects BS::endl and BS::flush are explicit casts of these manipulators, whose sole purpose is to enable passing them to print() and println().
        • BS::multi_future::get() now rethrows exceptions generated by the futures, even if the futures return void. See #62.
        • Added a new helper class, BS::blocks, which is used by parallelize_loop() and push_loop() to divide a range into blocks. This class is not documented in README.md, as it most likely will not be of interest to most users, but it is still publicly available, in case you want to parallelize something manually but still benefit from the built-in algorithm for splitting a range into blocks.
    • BS_thread_pool_test.cpp:
      • Added plenty of new tests for the new features described above.
      • Fixed a bug in count_unique_threads() that caused it to get stuck on certain systems.
      • dual_println() now also flushes the stream using BS::endl, so that if the test gets stuck, the log file will still contain everything up to that point. (Note: It is a common misconception that std::endl and '\n' are interchangeable. std::endl not only prints a newline character, it also flushes the stream, which is not always desirable, as it may reduce performance.)
      • The performance test has been modified as follows:
        • Instead of generating random vectors using std::mersenne_twister_engine, which proved to be inconsistent across different compilers and systems, the test now generates each element via an arbitrarily-chosen numerical operation. In my testing, this provided much more consistent results.
        • Instead of using a hard-coded vector size, a suitable vector size is now determined dynamically at runtime.
        • Instead of using parallelize_loop(), the test now uses the new push_loop() function to squeeze out a bit more performance.
        • Instead of setting the test parameters to achieve a fixed single-threaded mean execution time of 300 ms, the test now aims to achieve a fixed multi-threaded mean execution time of 50 ms when the number of blocks is equal to the number of threads. This allows for more reliable results on very fast CPUs with a very large number of threads, where the mean execution time when using all the threads could previously be below a statistically significant value.
        • The number of vectors is now restricted to be a multiple of the number of threads, so that the blocks are always all of the same size.
    • README.md:
      • Added instructions and examples for the new features described above.
      • Rewrote the documentation for parallelize_loop() to make it clearer.
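
    A short combined sketch of the new push_loop() overload and member-function submission described above (the counter class is a hypothetical example):

        #include "BS_thread_pool.hpp"
        #include <cstddef>
        #include <future>
        #include <vector>

        struct counter // hypothetical example class
        {
            int add(const int x) { return x + 1; }
        };

        int main()
        {
            BS::thread_pool pool;
            std::vector<int> v(1000);
            // The first index is omitted, so the loop runs over [0, v.size()):
            pool.push_loop(v.size(),
                [&v](const std::size_t start, const std::size_t end)
                {
                    for (std::size_t i = start; i < end; ++i)
                        v[i] = static_cast<int>(i);
                });
            pool.wait_for_tasks(); // push_loop() returns no futures, so wait explicitly
            counter c;
            std::future<int> fut = pool.submit(&counter::add, &c, 41);
            return fut.get() == 42 ? 0 : 1;
        }
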
    Source code(tar.gz)
    Source code(zip)
  • v3.1.0(Jul 13, 2022)

    v3.1.0 (2022-07-13)

    • BS_thread_pool.hpp:
      • Fixed an issue where wait_for_tasks() would sometimes get stuck if push_task() was executed immediately before wait_for_tasks().
      • Both the thread pool constructor and the reset() member function now determine the number of threads to use in the pool as follows. If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied, then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns a non-positive number for some reason, then the pool will be created with just one thread. See #51 and #52.
      • Added the [[nodiscard]] attribute to classes and class members, in order to warn the user when accidentally discarding an important return value, such as a future or the return value of a function with no useful side-effects. For example, if you use submit() and don't save the future it returns, the compiler will now generate a warning. (If a future is not needed, then you should use push_task() instead.)
      • Removed the explicit specifier from all constructors, as it prevented the default constructor from being used with static class members. See #48.
    • BS_thread_pool_test.cpp:
      • Improved count_unique_threads() using condition variables, to ensure that each thread in the pool runs at least one task, regardless of how quickly the tasks run.
      • When appropriate, check() now explicitly reports what the obtained result was and what it was expected to be.
      • check_task_monitoring() and check_pausing() now explicitly report the results of the monitoring at each step.
      • Changed all instances of std::vector<std::atomic<bool>> to std::unique_ptr<std::atomic<bool>[]>. See #44.
      • Converted a few more C-style casts to C++ cast expressions.
    • README.md:
      • Added instructions for using this package with the Conan C/C++ package manager. Please refer to this package's page on ConanCenter to learn how to use Conan to include this package in your project with various build systems.
    • If you found this project useful, please consider starring it on GitHub! This allows me to see how many people are using my code, and motivates me to keep working to improve it.
    Source code(tar.gz)
    Source code(zip)
  • v3.0.0(May 31, 2022)

    v3.0.0 (2022-05-30)

    • This is a major new release with many changes and improvements! Please note that code written using previous releases will need to be slightly modified to work with the new release. The changes needed to migrate to the new API are explicitly indicated below for your convenience.
    • Breaking changes to the library header file:
      • The header file has been renamed to BS_thread_pool.hpp to avoid potential conflict with other thread pool libraries.
        • API migration: The library must now be included by invoking #include "BS_thread_pool.hpp".
      • All the definitions in the library, including the thread_pool class and the helper classes, are now located in the namespace BS. This namespace will also be used for my other C++ projects, and is intended to ensure consistency between my projects while avoiding potential name conflicts with other libraries.
        • API migration: The thread pool class should now be invoked as BS::thread_pool. Alternatively, it is possible to employ using BS::thread_pool or even using namespace BS and then invoke thread_pool directly. Same for the BS::synced_stream and BS::timer helper classes.
      • The macro THREAD_POOL_VERSION, which contains the version number and release date of the library, has been renamed to BS_THREAD_POOL_VERSION to avoid potential conflicts.
        • API migration: The version must now be read from the macro BS_THREAD_POOL_VERSION.
      • The public member sleep_duration has been removed. The thread pool now uses condition variables instead of sleep to facilitate waiting. This significantly improves performance (by 10%-50% in my testing), drastically decreases idle CPU utilization, and eliminates the need to set an optimal sleep time. This was a highly-requested change; see issue #1, issue #12, and pull request #23.
        • API migration: Remove any code that relates to the public member sleep_duration.
      • The template specializations for submit() have been merged. Now instead of two versions, one for functions with a return value and one for functions without a return value, there is just one version, which can accept any function. This makes the code more compact (and elegant). If a function with no return value is submitted, an std::future<void> is returned (the previous version returned an std::future<bool>).
        • API migration: To wait for a task with no return value, simply call wait() or get() on the corresponding std::future<void>.
      • parallelize_loop() now returns a future in the form of a new BS::multi_future helper class template. The member function wait() of this future allows waiting until all of the loop's blocks finish executing. In previous versions, calling parallelize_loop() both parallelized the loop and waited for the blocks to finish; now it is possible to do other stuff while the loop executes.
        • API migration: Since parallelize_loop() no longer automatically blocks, you should either store the result in a BS::multi_future object and call its wait() member function, or simply call parallelize_loop().wait() to reproduce the old behavior. (A sketch appears at the end of these release notes.)
    • Non-breaking changes to the library header file:
      • It is now possible to use parallelize_loop() with functions that have return values and get these values from all blocks at once through the get() member function of the BS::multi_future.
      • The template specializations for push_task() have been merged. Now instead of two versions, one for functions with arguments and one for functions without arguments, there is just one version, which can accept any function.
      • Constructors have been made explicit. See issue #28.
      • submit() now uses std::make_shared instead of new to create the shared pointer. This means only one memory allocation is performed instead of two, which should improve performance. In addition, all unique pointers are now created using std::make_unique.
      • A new helper class template, BS::multi_future, has been added. It's basically just a wrapper around std::vector<std::future<T>>. This class is used by the new implementation of parallelize_loop() to allow waiting for the entire loop, consisting of multiple tasks with their corresponding futures, to finish executing.
      • BS::multi_future can also be used independently to handle multiple futures at once. For example, you can now keep track of several groups of tasks by storing their futures inside separate BS::multi_future objects and use either wait() to wait for all tasks in a specific group to finish or get() to get an std::vector with the return values of every task in the group.
      • Integer types are now chosen in a smarter way to improve portability, allow for better compatibility with 32-bit systems, and prevent potential conversion errors.
      • Added a new type, BS::concurrency_t, equal to the return type of std::thread::hardware_concurrency(). This is probably pointless, since the C++ standard requires this to be unsigned int, but it seems to me to make the code slightly more portable, in case some non-conforming compiler chooses to use a different integer type.
      • C-style casts have been converted to C++ cast expressions for added clarity.
      • Miscellaneous minor optimizations and style improvements.
    • Changes to the test program:
      • The program has been renamed to BS_thread_pool_test.cpp to avoid potential conflict with other thread pool libraries.
      • The program now returns EXIT_FAILURE if any of the tests failed, for automation purposes. See pull request #42.
      • Fixed incorrect check order in check_task_monitoring(). See pull request #43.
      • Added a new test for parallelize_loop() with a return value.
      • Improved some of the tests to make them more reliable. For example, count_unique_threads() now uses futures (stored in a BS::multi_future<void> object).
      • The program now uses std::vector instead of matrices, for both consistency checks and benchmarks, in order to simplify the code and considerably reduce its length.
      • The benchmarks have been simplified. There's now only one test: filling a specific number of vectors of fixed size with random values. This may be replaced with something more practical in a future release, but at least on the systems I've tested on, it does demonstrate a very significant multi-threading speedup.
      • In addition to multi-threaded tests with different numbers of tasks, the benchmark now also includes a single-threaded test. This allows for more accurate benchmarks compared to previous versions, as the (slight) parallelization overhead is now taken into account when calculating the maximum speedup.
      • The program decides how many vectors to use for benchmarking by testing how many are needed to reach a target duration in the single-threaded test. This ensures that the test takes approximately the same amount of time on different systems, and is thus more consistent and portable.
      • Miscellaneous minor optimizations and style improvements.
    • Changes to README.md:
      • Many sections have been rewritten and/or polished.
      • Explanations and examples of all the new features have been added.
      • Added an acknowledgements section.
    • Miscellaneous changes:
      • Added a CITATION.bib file (in BibTeX format) to the GitHub repository. You can use it to easily cite this package if you use it in any research papers.
      • Added a CITATION.cff file (in YAML format) to the GitHub repository. This should add an option to get a citation in different formats directly from GitHub repository by clicking on "cite this repository" on the sidebar to the right.
      • Added templates for GitHub issues and pull requests.
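
    A sketch of the new non-blocking parallelize_loop() described above (the per-index work is a hypothetical placeholder):

        #include "BS_thread_pool.hpp"

        void do_something(const int) {} // hypothetical per-index work

        int main()
        {
            BS::thread_pool pool;
            BS::multi_future<void> loop_future = pool.parallelize_loop(0, 100,
                [](const int start, const int end)
                {
                    for (int i = start; i < end; ++i)
                        do_something(i);
                });
            // ... do other stuff while the loop executes ...
            loop_future.wait(); // or: pool.parallelize_loop(...).wait() for the old behavior
        }
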
    Source code(tar.gz)
    Source code(zip)
  • v2.0.0(Aug 15, 2021)

    • v2.0.0 (2021-08-14)
      • From now on, version numbers will adhere to the Semantic Versioning specification in the format major.minor.patch.
      • A file named thread_pool_test.cpp has been added to the package. It will perform automated tests of all aspects of the package, and benchmark some multithreaded matrix operations. Please run it on your system and submit a bug report if any of the tests fail. In addition, the code is thoroughly documented, and is meant to serve as an extensive example of how to properly use the package.
      • The package is now available through vcpkg. Instructions for how to install it have been added to README.md. See this pull request.
      • The package now defines a macro THREAD_POOL_VERSION, which returns the version number and release date of the thread pool library as a string.
      • parallelize_loop() has undergone some major changes (and is now incompatible with v1.x):
        • The second argument is now the index after the last index, instead of the last index itself. This is more consistent with C++ conventions (e.g. standard library algorithms) where the range is always [first, last). For example, for an array with n indices, instead of parallelize_loop(0, n - 1, ...) you should now write parallelize_loop(0, n, ...).
        • The loop function is now only called once per block, instead of once per index, as was the case before. This should provide a performance boost due to significantly reducing the number of function calls, and it also allows you to conserve resources by using them only once per block instead of once per index (an example can be found in the random_matrix_generator class in thread_pool_test.cpp). It also means that loop now takes two arguments: the first index in the block and the index after the last index in the block. Thus, loop(start, end) should typically involve a loop of the form for (T i = start; i < end; i++). (A short sketch appears below.)
        • The first and last indices can now be of two different integer types. Previously, parallelize_loop(0, i, ...) did not work if i was not an int, because 0 was interpreted as int, and the two arguments had to be of the same type. Therefore, one had to use casting, e.g. parallelize_loop((size_t)0, i), to make it work. Now this is no longer necessary; the common type is inferred automatically using std::common_type_t.
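
    A short sketch of these v2.0.0 semantics: the half-open range [first, last), the once-per-block loop function, and the automatically inferred common index type (illustrative only):

        #include "thread_pool.hpp" // the pre-v3.0.0 header and class names
        #include <cmath>
        #include <cstddef>
        #include <vector>

        int main()
        {
            thread_pool pool;
            std::vector<double> data(1000);
            // 0 is an int and data.size() is a std::size_t; the common type is inferred.
            pool.parallelize_loop(0, data.size(),
                [&data](const std::size_t start, const std::size_t end)
                {
                    // Called once per block, not once per index.
                    for (std::size_t i = start; i < end; ++i)
                        data[i] = std::sqrt(static_cast<double>(i));
                });
            return 0;
        }
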
    Source code(tar.gz)
    Source code(zip)
  • v1.9(Jul 30, 2021)

  • v1.8(Jul 28, 2021)

    • v1.8 (2021-07-28)
      • The version history has become too long to be included in README.md, so I moved it to a separate file, CHANGELOG.md.
      • A button to open this repository directly in Visual Studio Code has been added to the badges in README.md.
      • An internal variable named promise has been renamed to task_promise to avoid any potential errors in case the user invokes using namespace std.
      • submit() now catches exceptions thrown by the submitted task and forwards them to the future. See this issue.
      • Eliminated compiler warnings that appeared when using the -Weffc++ flag in GCC. See this pull request.
    Source code(tar.gz)
    Source code(zip)
  • v1.7(Jun 2, 2021)

  • v1.6(May 26, 2021)

    • Version 1.6 (2021-05-26)
      • Since MSVC does not interpret and as && by default, the previous release did not compile with MSVC unless the /permissive- or /Za compiler flags were used. This has been fixed in this version, and the code now successfully compiles with GCC, Clang, and MSVC. See this pull request.
    Source code(tar.gz)
    Source code(zip)
  • v1.5(May 8, 2021)

    • Version 1.5 (2021-05-07)
      • This library now has a DOI for citation purposes. Information on how to cite it in publications has been added to the source code and to README.md.
      • Added GitHub badges to README.md.
    Source code(tar.gz)
    Source code(zip)
  • v1.4(May 5, 2021)

    • Version 1.4 (2021-05-05)
      • Added three new public member functions to monitor the tasks submitted to the pool:
        • get_tasks_queued() gets the number of tasks currently waiting in the queue to be executed by the threads.
        • get_tasks_running() gets the number of tasks currently being executed by the threads.
        • get_tasks_total() gets the total number of unfinished tasks - either still in the queue, or running in a thread.
        • Note that get_tasks_running() == get_tasks_total() - get_tasks_queued().
        • Renamed the private member variable tasks_waiting to tasks_total to make its purpose clearer.
      • Added an option to temporarily pause the workers:
        • When public member variable paused is set to true, the workers temporarily stop popping new tasks out of the queue, although any tasks already executed will keep running until they are done. Set to false again to resume popping tasks.
        • While the workers are paused, wait_for_tasks() will wait for the running tasks instead of all tasks (otherwise it would wait forever).
        • By utilizing the new pausing mechanism, reset() can now change the number of threads on-the-fly while there are still tasks waiting in the queue. The new thread pool will resume executing tasks from the queue once it is created.
      • parallelize_loop() and wait_for_tasks() now have the same behavior as the worker function with regards to waiting for tasks to complete. If the relevant tasks are not yet complete, then before checking again, they will sleep for sleep_duration microseconds, unless that variable is set to zero, in which case they will call std::this_thread::yield(). This should improve performance and reduce CPU usage.
      • Merged this commit: Fixed weird error when using MSVC and including windows.h.
      • The README.md file has been reorganized and expanded.
    Source code(tar.gz)
    Source code(zip)
  • v1.3(May 4, 2021)

    • Version 1.3 (2021-05-03)
      • Fixed this issue: Removed std::move from the return statement in push_task(). This previously generated a -Wpessimizing-move warning in Clang. The assembly code generated by the compiler seems to be the same before and after this change, presumably because the compiler eliminates the std::move automatically, but this change gets rid of the Clang warning.
      • Fixed this issue: Removed a debugging message printed to std::cout, which was left in the code by mistake.
      • Fixed this issue: parallelize_loop() no longer sends references for the variables start and stop when calling push_task(), which may lead to undefined behavior.
      • A companion paper is now published at arXiv:2105.00613, including additional information such as performance tests on systems with up to 80 hardware threads. The README.md has been updated, and it is now roughly identical in content to the paper.
    Source code(tar.gz)
    Source code(zip)
  • v1.2(Apr 30, 2021)

    • Version 1.2 (2021-04-29)
      • The worker function, which controls the execution of tasks by each thread, now sleeps by default instead of yielding. Previously, when the worker could not find any tasks in the queue, it called std::this_thread::yield() and then tried again. However, this caused the workers to have high CPU usage when idle, as reported by some users. Now, when the worker function cannot find a task to run, it instead sleeps for a duration given by the public member variable sleep_duration (in microseconds) before checking the queue again. The default value is 1000 microseconds, which I found to be optimal in terms of both CPU usage and performance, but your own optimal value may be different.
      • If the constructor is called with an argument of zero for the number of threads, then the default value, std::thread::hardware_concurrency(), is used instead.
      • Added a simple helper class, timer, which can be used to measure execution time for benchmarking purposes.
      • Improved and expanded the documentation.
    Source code(tar.gz)
    Source code(zip)
  • v1.1(Apr 24, 2021)
