A General-purpose Parallel and Heterogeneous Task Programming System

Overview

Taskflow


Taskflow helps you quickly write parallel and heterogeneous task programs in modern C++

Why Taskflow?

Taskflow is faster, more expressive, and easier to drop into existing code than many existing task programming frameworks when handling complex parallel workloads.

Taskflow lets you quickly implement task decomposition strategies that incorporate both regular and irregular compute patterns, together with an efficient work-stealing scheduler to optimize your multithreaded performance.

Static Tasking Dynamic Tasking

Taskflow supports conditional tasking for you to make rapid control-flow decisions across dependent tasks to implement cycles and conditions that were otherwise difficult to do with existing tools.

Conditional Tasking

Taskflow is composable. You can create large parallel graphs through composition of modular and reusable blocks that are easier to optimize at an individual scope.

Taskflow Composition

Taskflow supports heterogeneous tasking for you to accelerate a wide range of scientific computing applications by harnessing the power of CPU-GPU collaborative computing.

Concurrent CPU-GPU Tasking

Taskflow provides visualization and tooling needed for profiling Taskflow programs.

Taskflow Profiler

We are committed to supporting trustworthy development for both academic and industrial research projects in parallel computing. Check out Who is Using Taskflow and what our users say:

See a quick presentation and visit the documentation to learn more about Taskflow. For technical details, please refer to our TPDS paper.

Start Your First Taskflow Program

The following program (simple.cpp) creates four tasks A, B, C, and D, where A runs before B and C, and D runs after B and C. When A finishes, B and C can run in parallel.

#include <taskflow/taskflow.hpp>  // Taskflow is header-only

int main(){
  
  tf::Executor executor;
  tf::Taskflow taskflow;

  auto [A, B, C, D] = taskflow.emplace(  // create four tasks
    [] () { std::cout << "TaskA\n"; },
    [] () { std::cout << "TaskB\n"; },
    [] () { std::cout << "TaskC\n"; },
    [] () { std::cout << "TaskD\n"; } 
  );                                  
                                      
  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after  B and C
                                      
  executor.run(taskflow).wait(); 

  return 0;
}

Taskflow is header-only, so there is nothing to install. To compile the program, clone the Taskflow project and tell the compiler to include the headers.

~$ git clone https://github.com/taskflow/taskflow.git  # clone it only once
~$ g++ -std=c++17 simple.cpp -I taskflow/taskflow -O2 -pthread -o simple
~$ ./simple
TaskA
TaskC 
TaskB 
TaskD

Visualize Your First Taskflow Program

Taskflow comes with a built-in profiler, TFProf, for you to profile and visualize taskflow programs in an easy-to-use web-based interface.

# run the program with the environment variable TF_ENABLE_PROFILER enabled
~$ TF_ENABLE_PROFILER=simple.json ./simple
~$ cat simple.json
[
{"executor":"0","data":[{"worker":0,"level":0,"data":[{"span":[172,186],"name":"0_0","type":"static"},{"span":[187,189],"name":"0_1","type":"static"}]},{"worker":2,"level":0,"data":[{"span":[93,164],"name":"2_0","type":"static"},{"span":[170,179],"name":"2_1","type":"static"}]}]}
]
# paste the profiling json data to https://taskflow.github.io/tfprof/

In addition to the execution diagram, you can dump the graph in DOT format and visualize it with a number of free GraphViz tools.

// dump the taskflow graph to a DOT format through std::cout
taskflow.dump(std::cout); 
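
For the four-task example above, the dumped graph is a plain GraphViz digraph along these lines (a sketch, not verbatim output; the actual dump uses auto-generated node identifiers unless you assign names with .name()):

```dot
digraph Taskflow {
  A -> B;
  A -> C;
  B -> D;
  C -> D;
}
```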

Express Task Graph Parallelism

Taskflow empowers users with both static and dynamic task graph constructions to express end-to-end parallelism in a task graph that embeds in-graph control flow.

  1. Create a Subflow Graph
  2. Integrate Control Flow to a Task Graph
  3. Offload a Task to a GPU
  4. Compose Task Graphs
  5. Launch Asynchronous Tasks
  6. Execute a Taskflow
  7. Leverage Standard Parallel Algorithms

Create a Subflow Graph

Taskflow supports dynamic tasking for you to create a subflow graph from the execution of a task to perform dynamic parallelism. The following program spawns a task dependency graph parented at task B.

tf::Task A = taskflow.emplace([](){}).name("A");  
tf::Task C = taskflow.emplace([](){}).name("C");  
tf::Task D = taskflow.emplace([](){}).name("D");  

tf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { 
  tf::Task B1 = subflow.emplace([](){}).name("B1");  
  tf::Task B2 = subflow.emplace([](){}).name("B2");  
  tf::Task B3 = subflow.emplace([](){}).name("B3");  
  B3.succeed(B1, B2);  // B3 runs after B1 and B2
}).name("B");

A.precede(B, C);  // A runs before B and C
D.succeed(B, C);  // D runs after  B and C

Integrate Control Flow to a Task Graph

Taskflow supports conditional tasking for you to make rapid control-flow decisions across dependent tasks to implement cycles and conditions in an end-to-end task graph.

tf::Task init = taskflow.emplace([](){}).name("init");
tf::Task stop = taskflow.emplace([](){}).name("stop");

// create a condition task that returns 0 or 1 at random
tf::Task cond = taskflow.emplace(
  [](){ return std::rand() % 2; }
).name("cond");

init.precede(cond);

// create a feedback loop: the return value indexes into the successors
// in the order of precede, so 0 loops back to cond and 1 goes to stop
cond.precede(cond, stop);

Offload a Task to a GPU

Taskflow supports GPU tasking for you to accelerate a wide range of scientific computing applications by harnessing the power of CPU-GPU collaborative computing using CUDA.

__global__ void saxpy(size_t N, float alpha, float* dx, float* dy) {
  int i = blockIdx.x*blockDim.x + threadIdx.x;
  if (i < N) {
    dy[i] = alpha*dx[i] + dy[i];
  }
}
tf::Task cudaflow = taskflow.emplace([&](tf::cudaFlow& cf) {

  // data copy tasks
  tf::cudaTask h2d_x = cf.copy(dx, hx.data(), N).name("h2d_x");
  tf::cudaTask h2d_y = cf.copy(dy, hy.data(), N).name("h2d_y");
  tf::cudaTask d2h_x = cf.copy(hx.data(), dx, N).name("d2h_x");
  tf::cudaTask d2h_y = cf.copy(hy.data(), dy, N).name("d2h_y");
  
  // kernel task with parameters to launch the saxpy kernel
  tf::cudaTask kernel = cf.kernel(
    (N+255)/256, 256, 0, saxpy, N, 2.0f, dx, dy
  ).name("saxpy");

  kernel.succeed(h2d_x, h2d_y)
        .precede(d2h_x, d2h_y);
}).name("cudaFlow");

Taskflow also supports SYCL, a general-purpose heterogeneous programming model, to program GPU tasks in a single-source C++ environment using the task graph-based approach.

tf::Task syclflow = taskflow.emplace_on([&](tf::syclFlow& sf){
  tf::syclTask h2d_x = sf.copy(dx, hx.data(), N).name("h2d_x");
  tf::syclTask h2d_y = sf.copy(dy, hy.data(), N).name("h2d_y");
  tf::syclTask d2h_x = sf.copy(hx.data(), dx, N).name("d2h_x");
  tf::syclTask d2h_y = sf.copy(hy.data(), dy, N).name("d2h_y");
  tf::syclTask saxpy = sf.parallel_for(sycl::range<1>(N), 
    [=] (sycl::id<1> id) {
      dx[id] = 2.0f * dx[id] + dy[id];
    }
  ).name("saxpy");
  saxpy.succeed(h2d_x, h2d_y)
       .precede(d2h_x, d2h_y);
}, sycl_queue).name("syclFlow");

Compose Task Graphs

Taskflow is composable. You can create large parallel graphs through composition of modular and reusable blocks that are easier to optimize at an individual scope.

tf::Taskflow f1, f2;

// create taskflow f1 of two tasks
tf::Task f1A = f1.emplace([]() { std::cout << "Task f1A\n"; })
                 .name("f1A");
tf::Task f1B = f1.emplace([]() { std::cout << "Task f1B\n"; })
                 .name("f1B");

// create taskflow f2 with one module task composed of f1
tf::Task f2A = f2.emplace([]() { std::cout << "Task f2A\n"; })
                 .name("f2A");
tf::Task f2B = f2.emplace([]() { std::cout << "Task f2B\n"; })
                 .name("f2B");
tf::Task f2C = f2.emplace([]() { std::cout << "Task f2C\n"; })
                 .name("f2C");

tf::Task f1_module_task = f2.composed_of(f1)
                            .name("module");

f1_module_task.succeed(f2A, f2B)
              .precede(f2C);

Launch Asynchronous Tasks

Taskflow supports asynchronous tasking. You can launch tasks asynchronously to incorporate independent, dynamic parallelism in your taskflows.

tf::Executor executor;
tf::Taskflow taskflow;

// create asynchronous tasks directly from an executor
tf::Future<std::optional<int>> future = executor.async([](){ 
  std::cout << "async task returns 1\n";
  return 1;
}); 
executor.silent_async([](){ std::cout << "async task of no return\n"; });

// launch an asynchronous task from a running task
taskflow.emplace([&](){
  executor.async([](){ std::cout << "async task within a task\n"; });
});

executor.run(taskflow).wait();

Execute a Taskflow

The executor provides several thread-safe methods to run a taskflow. You can run a taskflow once, multiple times, or until a stopping criterion is met. These methods are non-blocking and return a tf::Future<void> that lets you query the execution status.

// runs the taskflow once
tf::Future<void> run_once = executor.run(taskflow); 

// wait on this run to finish
run_once.get();

// run the taskflow four times
executor.run_n(taskflow, 4);

// run the taskflow until the predicate returns true (here, five times)
executor.run_until(taskflow, [counter=5]() mutable { return --counter == 0; });

// block the executor until all submitted taskflows complete
executor.wait_for_all();

Leverage Standard Parallel Algorithms

Taskflow defines algorithms for you to quickly express common parallel patterns using standard C++ syntax, such as parallel iterations, parallel reductions, and parallel sort.

// standard parallel CPU algorithms
tf::Task task1 = taskflow.for_each( // assign each element to 100 in parallel
  first, last, [] (auto& i) { i = 100; }    
);
tf::Task task2 = taskflow.reduce(   // reduce a range of items in parallel
  first, last, init, [] (auto a, auto b) { return a + b; }
);
tf::Task task3 = taskflow.sort(     // sort a range of items in parallel
  first, last, [] (auto a, auto b) { return a < b; }
);

// standard parallel GPU algorithms
tf::cudaTask cuda1 = cudaflow.for_each( // assign each element to 100 on GPU
  dfirst, dlast, [] __device__ (auto i) { i = 100; }
);
tf::cudaTask cuda2 = cudaflow.reduce(   // reduce a range of items on GPU
  dfirst, dlast, init, [] __device__ (auto a, auto b) { return a + b; }
);
tf::cudaTask cuda3 = cudaflow.sort(     // sort a range of items on GPU
  dfirst, dlast, [] __device__ (auto a, auto b) { return a < b; }
);

Supported Compilers

To use Taskflow, you only need a compiler that supports C++17:

  • GNU C++ Compiler at least v8.4 with -std=c++17
  • Clang C++ Compiler at least v6.0 with -std=c++17
  • Microsoft Visual Studio at least v19.27 with /std:c++17
  • Apple Clang (Xcode) at least v12.0 with -std=c++17
  • Nvidia CUDA Toolkit and Compiler (nvcc) at least v11.1 with -std=c++17
  • Intel C++ Compiler at least v19.0.1 with -std=c++17
  • Intel DPC++ Clang Compiler at least v13.0.0 with -std=c++17 and SYCL 2020

Taskflow works on Linux, Windows, and macOS.

Learn More about Taskflow

Visit our project website and documentation to learn more about Taskflow. To get involved:

  • CppCon20 Tech Talk
  • MUC++ Tech Talk

We are committed to supporting trustworthy development for both academic and industrial research projects in parallel and heterogeneous computing. At the same time, we appreciate all Taskflow contributors!

License

Taskflow is licensed under the MIT License. You are completely free to redistribute work derived from Taskflow.


Issues
  • _per_thread() data in separate compilation units


    When a tf::Executor is created in one translation unit and then used in another translation unit (or shared library?), the per-thread data inside the Executor is invalid. This causes assertions like

    ../bundled/taskflow-2.7.0/include/taskflow/core/executor.hpp:976: void tf::Executor::_invoke_dynamic_work_external(tf::Node*, tf::Graph&, bool): Assertion `worker &&
    worker->executor == this' failed.
    

    to trigger.

    The problem is https://github.com/taskflow/taskflow/blob/9d17ee3fb28ef8b9b92cd28d10d7ac840ad33de8/taskflow/core/executor.hpp#L446, which creates a different copy in each translation unit; see https://stackoverflow.com/questions/185624/static-variables-in-an-inlined-function for more details.

    I am attaching an example that uses a shared library for this. Compile and run with:

    export LD_LIBRARY_PATH=`pwd`
    # 1. make libfoo.so
    g++ --std=c++14 -Wall -I ~/taskflow/installed/include/ -O2 -pthread -fpic -c other.cc
    g++ -shared -o libfoo.so other.o
    
    # 2. compile a.out and link libfoo.so
    g++ --std=c++14 -Wall -I ~/taskflow/installed/include/ -O2 -pthread -o a.out -L. main.cc -lfoo
    
    # 3. run
    ./a.out
    

    source code to reproduce: taskflow-thread-local-bug.zip

    bug 
    opened by tjhei 25
  • [Question] Is taskflow for_each_index a drop-in replacement for Intel TBB parallel_for ?


    I am taking a look at a project that uses TBB parallel_for for a few loops. When I tried replacing one of these calls with taskflow for_each_index call, the average duration for the loop increased by around 5X.

    opened by boxerab 22
  • Is it possible to create a continuous flow?


    The situation I am describing is based on my use case, but it has been simplified. So, please, do not focus on the details unless they are needed to answer.

    I have a flow of data; from our point of view we can see it as a volatile variable I have to read regularly. From the value of the variable I need to compute five functions: f0, f1, f2, f3, and f4. Each takes longer than the previous one and uses as input the output of the previous one. f0 starts from the volatile variable's value.

    This situation calls for the pipeline pattern, basically after five readings we are doing all the computations in parallel.

    Playing around with cppflow (great library, by the way) I understood how to make one loop, however this is not helpful as if there is only one reading then the computation is basically single threaded.

    So my question is: is it possible to implement something like this? I would be happy to read the fine manual if the answer is there; just point me in the right direction.

    enhancement help wanted 
    opened by paolobolzoni 20
  • Taskflow reuse


    I came across cpp-taskflow recently and I find it very interesting and well designed. However, from my point of view, it would be great to have the possibility to reuse a Taskflow graph after its completion.

    To give a bit of background, I work on robot control and we always have a bunch of stuff to compute in a real-time loop (up to 1kHz/1ms) in order to send the robot(s) a new command. Taskflow would fit perfectly to describe and perform all the computations, but having to reconstruct the graph at each iteration is not very elegant and may also bring some latencies due to the dynamic memory allocations that happen behind the scenes.

    Is it something that could be implemented in cpp-taskflow?

    opened by BenjaminNavarro 20
  • Register callback function to run when task is completed


    Hi,

    C++ has a bit of a weird API when it comes to std::future and monitoring progress. It is "doable" but certainly not trivial and requires quite a bit of hackery. What I once did is start the tasks from within a sub-thread and let that thread block on the tasks, like your wait_for_all() function. Then, once those tasks were done, the block lifted and the next line would run to, in effect, notify some other class that the task is done.

    I'm hoping for an API function like that in taskflow. You now have the functions dispatch/silent_dispatch/wait_for_topologies/wait_for_all. I would like to request a new addition to those; let's call it dispatch_callback, which would be non-blocking and take one callable as an argument. That callable would be called once all tasks are done processing.

    The implementation could be what I described above as hackery :) But I'm guessing you have much neater options at your disposal, as you're already managing the threadpool behind it.

    For me, it would make this project really useful! Even though I'm merely using it as a threadpool, it simply becomes very easy and convenient to implement multithreading in something :)

    Cheers, Mark

    opened by markg85 19
  • Add semaphores to limit concurrency for certain tasks


    This is the early stages of implementing a "Semaphore" to limit concurrency in certain sections / tasks of the graph.

    A task can be given the requirement to acquire one or multiple semaphores before executing its work and a task can be given the job to release one or multiple semaphores after finishing its work. A task can acquire and release a semaphore, or just acquire or just release it. Semaphores start with an initial count. As long as that count is above 0, tasks can acquire the semaphore and do their work. If the count is 0 or less, a task trying to acquire the semaphore is not run and instead put into a waiting list of that semaphore. When the semaphore is released by another task, then tasks on that waiting list are scheduled to be run again (trying to acquire their semaphore(s) again).

    I've added a simple example with 5 tasks with no links/dependencies between them; which under normal circumstances would be executed concurrently. The example however has a semaphore with initial count 1, and all tasks need to acquire that semaphore before running and release that semaphore after they're done. This limits the concurrently running tasks to only one. See examples/onlyone.cpp

    Todo:

    • [x] Generally: Test! Right now I just have that single example which runs as expected from a first glance.
    • [x] Add ability for acquire/release to change semaphore counter by another value than 1
    • [x] Measure performance impact when feature is not used.
    • [x] Make sure semaphores don't needlessly block worker threads.

    I'd be happy about general feedback about the implementation so far! Did I follow your coding standards? Any oversights? Suggestions for improvement?

    opened by musteresel 18
  • Schedule an individual task with an executor


    We're working to convert the deal.II project (see https://github.com/dealii/dealii) to use cpp-taskflow (#170 was a result of this attempt), and in https://github.com/dealii/dealii/issues/10388 I was wondering whether there is a way to schedule individual tasks with an executor without actually creating a Taskflow object?

    Right now, we are creating individual tasks using std::async (see https://github.com/dealii/dealii/pull/10389). In essence, this just puts the object into the background. But if cpp-taskflow uses a different thread pool to schedule task flows than the C++ runtime environment, then these two thread pools will get in each other's way. As a consequence, I'm looking for a way to run individual tasks through cpp-taskflow executors instead of std::async. It seems unnecessary to create a whole task graph for a single task, but I can't quite find the right interface to avoid this.

    enhancement 
    opened by bangerth 18
  • Is taskflow suitable for this scenario ?


    This looks like a wonderful project.

    Here is my problem:

    I would like to run an incremental filter on a compressed 2D image i.e. decompress the image in small windows and then filter each window once it is fully decompressed. The uncompressed image is broken down into 64x64 code blocks (each code block has a matching compressed code stream), and the blocks are grouped into overlapping 128x128 windows, where each window covers 4 blocks. All blocks in a window must be decompressed before that window gets filtered.

    Say I have a 192x192 image; then there are 4 overlapping windows.

    Let's say I have four threads, and each thread processes one window. Each thread tries to decompress its blocks, and when they are all decompressed, it applies the filter. But if a block is already being decompressed by another thread, then we must wait until the other thread is done (though while waiting we can steal decompress jobs from some other thread).

    So... could this problem be modelled efficiently with taskflow?

    One more issue: I would like to set the thread affinity of the threads in the pool, and reserve one core and its hyperthreaded sibling for each window.

    Thanks very much! Aaron

    opened by boxerab 17
  • Passing data from tasks to their successors


    Could you suggest an easy way to pass data (return values) from tasks to their successors? Suppose A->C and B->C and C needs the results of A and B to proceed (typical of divide & conquer algorithms - BTW an example of d&c would be really helpful - e.g. TBB has a great example for recursive Fibonacci).

    As far as I understand, I'd need to emplace A and B, getting the futures; then I need to capture the futures in a lambda that is passed to C.

    Alternatively, I'd have to create variables for the results of A and B, and capture one of them in A and B, and both of them in C. These variables cannot be allocated on the stack, as it can get destroyed before the tasks have a chance to run, so one has to use dynamic memory allocation.

    These approaches have major disadvantages:

    • futures are heavy objects, allocating memory and requiring synchronisation - I believe they are an overkill for this scenario: We already know that A and B have stopped once C is running, so it is safe to retrieve the return values without synchronisation; moreover, if A and B have not been destroyed yet (my understanding is that the clean-up is only performed when the graph terminates), no memory allocation is necessary.
    • The code becomes unnecessarily ugly - I think it would be much cleaner to pass a lambda accepting 2 parameters to C, and guarantee that the order of parameters corresponds to the order of precede() etc. calls.
    opened by vkhomenko 16
  • Memory is not being released.


    I have been noticing that memory is not being released even though the taskflow goes out of scope. Does anyone know why, in the case below, the memory is not being released?

    struct TestStruct
    {
      using Ptr = std::shared_ptr<TestStruct>;
      using ConstPtr = std::shared_ptr<const TestStruct>;
    
      std::unordered_map<std::string, double> joints;
    };
    
    void runTest()
    {
      std::vector<TestStruct::Ptr> t;
      t.reserve(10000000);
      for (std::size_t j = 0; j < 10000000; j++)
      {
        auto p = std::make_shared<TestStruct>();
        p->joints["joint_1"] = 0;
        p->joints["joint_2"] = 0;
        p->joints["joint_3"] = 0;
        p->joints["joint_4"] = 0;
        p->joints["joint_5"] = 0;
        p->joints["joint_6"] = 0;
        t.push_back(p);
      }
    }
    
    int main(int /*argc*/, char** /*argv*/)
    {
      tf::Executor executor;
    
      {
        tf::Taskflow taskflow;
    
        taskflow.emplace([=]() { runTest(); });
    
        std::future<void> f = executor.run(taskflow);
        f.wait();
      }
    
      std::cout << "Hit enter key to continue!" << std::endl;
      std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
    }
    

    Now if I replace runTest with the function below, it does what I expect: the memory ramps up and then drops back to where it started.

    inline std::mt19937 mersenne{ static_cast<std::mt19937::result_type>(std::time(nullptr)) };
    void runTest()
    {
      std::vector<double> t;
      t.reserve(1000000000);
      for (std::size_t j = 0; j < 1000000000; j++)
      {
        std::uniform_real_distribution<double> sample{ 0, 10 };
        t.push_back(sample(mersenne));
      }
    }
    
    opened by Levi-Armstrong 15
  • Attaching user data to tasks


    Is it possible, or considered for future development, to attach user data to a task? My primary interest is a seemingly simple case where a taskflow consists of tasks, each with a consistent set of boolean flags attached to it. They are used by conditional tasks to skip over some calculations if certain flags are set in dependencies.

    It seems that Node::name is a similar kind of data. Maybe it could be generalized for such use cases.

    I can also help implementing this.

    opened by Endilll 13
  • Add ability to set max steals and max yields


    When profiling our application that uses Taskflow, we noticed that std::this_thread::yield(); was causing a lot of CPU spin time. After discussing with @tsung-wei-huang, it sounds like the main problem was that our system uses multiple executors in order to keep resources independent for different types of tasks. He suggested that we try to reduce the number of executors and also that we adjust the max steals and max yields. After testing reducing the max yields and max steals (first 2 commits), this didn't seem to have much of an impact on spin time. As a result, I added an optional sleep after max yields is reached (3rd commit). This does reduce the spin time.

    This PR adds the ability to set the max steals, max yields, and the sleep duration after those are reached. The defaults should work the same as before if you were using all default arguments for the executor. If you were passing in the number of threads (N), simply pass in ((N + 1) << 1) for max steals to maintain the same behavior.

    opened by mpowelson 0
  • Submitting tasks in runtime


    I was thinking of using taskflow for a UDP load balancer. One thread receives UDP packets in a loop. For each packet received, it can create a routing task, i.e. find the destination server from the routing table and send the packet to it. I also want each route to a server to be an independent flow; otherwise, packets from the same route may be reordered by parallel threads.

    However, I am struggling to understand if and how it can be accomplished with the taskflow library. A taskflow cannot be modified during execution, thus I can't dynamically push tasks to it. Should I use the Pipeline? Maybe the library does not allow such use cases. Please advise!

    opened by maxsharabayko 4
  • Deadlock avoidance scheduling with semaphores?


    Assume we have the following example:

    #include <iostream>
    #include "taskflow/taskflow/core/executor.hpp"
    #include "taskflow/taskflow/core/semaphore.hpp"
    #include "taskflow/taskflow/taskflow.hpp"
    
    int main() {
        tf::Executor exec(24);
    
        tf::Taskflow flow;
    
        tf::Semaphore semaphore(8);
    
        for(size_t i = 0; i < 5; ++i) {
    
            tf::Task commonTask = flow.emplace([i]() {
                std::string str;
                str.append("Common task for i = ").append(std::to_string(i)).append("\n");
                std::cout << str;
            }).name(std::string("Common i = ").append(std::to_string(i)));
    
            for(size_t j = 0; j < 8; ++j) {
                tf::Task preTask = flow.emplace([i, j]() {
                    std::string str;
                    str.append("Pre task for i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)).append("\n");
                    std::cout << str;
                }).name(std::string("Pre task i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)));
    
                tf::Task postTask = flow.emplace([i, j]() {
                    std::string str;
                    str.append("Post task for i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)).append("\n");
                    std::cout << str;
                }).name(std::string("Post task i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)));
    
                preTask.precede(commonTask);
                postTask.succeed(commonTask);
    
                preTask.acquire(semaphore);
                postTask.release(semaphore);
            }
        }
    
        flow.dump(std::cout);
    
        std::cout << "Start!" << std::endl;
        exec.run(flow).wait();
        std::cout << "End!" << std::endl;
    }
    

    Here we want to limit the concurrency, e.g. due to memory allocation constraints.

    The issue is that, as I feared it would, taskflow schedules itself into a deadlock.

    Output looks like:

    Start!
    Pre task for i = 0, j = 0
    Pre task for i = 0, j = 5
    Pre task for i = 0, j = 4
    Pre task for i = 0, j = 2
    Pre task for i = 0, j = 3
    Pre task for i = 0, j = 7
    Pre task for i = 0, j = 1
    Pre task for i = 0, j = 6
    Common task for i = 0
    Post task for i = 0, j = 7
    Post task for i = 0, j = 0
    Post task for i = 0, j = 2
    Post task for i = 0, j = 3
    Pre task for i = 3, j = 3
    Pre task for i = 1, j = 0
    Post task for i = 0, j = 5
    Post task for i = 0, j = 1
    Pre task for i = 1, j = 3
    Pre task for i = 2, j = 2
    Pre task for i = 1, j = 1
    Post task for i = 0, j = 6
    Pre task for i = 4, j = 1
    Post task for i = 0, j = 4
    Pre task for i = 2, j = 6
    Pre task for i = 1, j = 2
    

    ...and then hangs

    What would be the best way to rewrite this issue and still limit the concurrency?

    Maybe adding another separate acquire (precedes all 8 pre tasks) and release (succeeds all post tasks) task?

    And would it be feasible for taskflow to get a scheduler that understands such issue and avoids them? I could imagine that this is, as always, an NP-complete issue or would come with great implications.

    Anyway, I think adding a warning to the documentation would be good: warn users that taskflow may schedule itself into deadlocks when using semaphores.

    opened by robinchrist 0
  • [Question]-Task timeout


    Thanks for your great library

    I want to know: is there any way to detect a timeout from a running task?

    For example, you have a taskflow with some routine (subflow). When the application receives a message, it sends it to the taskflow for execution, and each task has some timeout for executing the request. If the execution takes too long, we need to mark it as a timeout, send a timeout reply to the requester, and terminate the running task.

    How can we detect the timeout for each task from the main process?

    enhancement 
    opened by mohsenomidi 2
  • Non-global ObjectPool for control over memory footprint


    My understanding of the internals of taskflow is quite limited. So pardon if this is a dumb question.

    I'm using taskflow as an asynchronous simulation engine. I'm only using static tasks, although quite a lot of them.

    I have a struct more or less like this:

    struct TaskflowContext {
        tf::Executor executor;
        tf::Taskflow taskflow;
        tf::Future future;
    };
    

    (Maybe this is irrelevant for this question but I use the future to be able to cancel the running simulation.)

    What happens is that once the taskflow finishes executing a large task graph, I notice that there is still a substantial amount of memory that isn't released. I implemented a shrink function that reassigns the tf::Taskflow after completion, and that frees some memory, but only around 10% of the total usage.

    I also tried destroying and restarting the executor (by wrapping it in a std::optional<tf::Executor>) but that didn't work either.

    This led me to find tf::ObjectPool<Node> node_pool;. Is this what I'm looking for? Is it a global object, and if so, can it be cleared safely at some point?

    Alternatively, could it be made possible to pass your own node_pool reference to the tf::Executor and tf::Taskflow constructors? That way, the user would have control over the entire memory footprint of the taskflow context.

    enhancement 
    opened by sunbubble 5
  • Hierarchical Pipeline

    Hi Tsung,

    I made the changes based on our discussion.

    Please find the attached sample program, demonstrating my use case.

    Please review the code and suggest improvements.

    Please use the following command to compile the sample:

        g++ -fdiagnostics-color=always -std=c++17 -stdlib=libc++ -g -I/opt/taskflow-examples/include -I/opt/taskflow-examples/../taskflow /opt/taskflow-examples/simple.cpp -o /opt/taskflow-examples/simple

    opened by sureshgl 3
Releases: v3.4.0
Owner: Taskflow
Similar projects:

• Parallel-util: simple header-only implementation of "parallel for" and "parallel map" for C++11 (Yuki Koyama, 27 stars, Jun 24, 2022)
• Cpp-taskflow: modern C++ parallel task programming library (4 stars, Mar 30, 2021)
• Fiber Tasking Lib: a library for task-based multi-threading; executes task graphs with arbitrary dependencies (RichieSams, 775 stars, Aug 7, 2022)
• enkiTS: a permissively licensed C and C++ task scheduler for creating parallel programs; requires C++11 (Doug Binks, 1.3k stars, Aug 6, 2022)
• ArrayFire: a general-purpose GPU library for parallel and massively parallel architectures (ArrayFire, 3.9k stars, Aug 9, 2022)
• EnvPool: C++-based high-performance parallel execution engine for general RL environments (Sea AI Lab, 571 stars, Aug 5, 2022)
• Kokkos: C++ performance-portability programming ecosystem for parallel execution and memory abstraction (Kokkos, 1.1k stars, Aug 10, 2022)
• task_system: the task system presented in "Better Code: Concurrency" by Sean Parent (Pranav, 28 stars, Jun 30, 2022)
• jobxx: lightweight C++ task system (Sean Middleditch, 77 stars, May 28, 2022)
• ParallelComputingPlayground: different programming techniques for parallel computing on CPU and GPU (Morten Nobel-Jørgensen, 2 stars, May 16, 2020)
• OOX: Out-of-Order Executor library; another approach to an efficient and scalable tasking API and task scheduling (Intel Corporation, 17 stars, Aug 9, 2022)
• UIBK Parallel Programming Lab: material for the 2021 Parallel Systems course exercises (12 stars, May 6, 2022)
• Arcana.cpp: helpers and utilities for low-overhead, cross-platform C++ task-based asynchrony (Microsoft, 64 stars, Aug 10, 2022)
• Intel Games Task Scheduler (GTS): a task scheduling framework designed for the needs of game developers (419 stars, Aug 14, 2022)
• Marl: a hybrid thread/fiber task scheduler written in C++11 (Google, 1.4k stars, Aug 4, 2022)
• transwarp: a header-only C++ library for task concurrency (Christian Blume, 590 stars, Aug 6, 2022)
• SquidTasks: a header-only C++14 coroutine-based task library for games (Tim Ambrogi Saxon, 58 stars, Jul 25, 2022)
• parallel-hashmap: a family of header-only, very fast and memory-friendly hashmap and btree containers (Gregory Popovitch, 1.5k stars, Aug 9, 2022)
• Quantum: a scalable C++ coroutine framework and parallel execution engine built on Boost coroutines (Bloomberg, 447 stars, Jul 25, 2022)