A General-purpose Parallel and Heterogeneous Task Programming System

Overview

Taskflow


Taskflow helps you quickly write parallel and heterogeneous task programs in modern C++

Why Taskflow?

Taskflow is faster, more expressive, and easier for drop-in integration than many existing task programming frameworks in handling complex parallel workloads.

Taskflow lets you quickly implement task decomposition strategies that incorporate both regular and irregular compute patterns, together with an efficient work-stealing scheduler to optimize your multithreaded performance.

Static Tasking Dynamic Tasking

Taskflow supports conditional tasking, letting you make rapid control-flow decisions across dependent tasks to implement cycles and conditions that are otherwise difficult to express with existing tools.

Conditional Tasking

Taskflow is composable. You can create large parallel graphs through composition of modular and reusable blocks that are easier to optimize at an individual scope.

Taskflow Composition

Taskflow supports heterogeneous tasking for you to accelerate a wide range of scientific computing applications by harnessing the power of CPU-GPU collaborative computing.

Concurrent CPU-GPU Tasking

Taskflow provides the visualization and tooling needed to profile Taskflow programs.

Taskflow Profiler

We are committed to supporting trustworthy development for both academic and industrial research projects in parallel computing. Check out Who is Using Taskflow and what our users say:

See a quick presentation and visit the documentation to learn more about Taskflow. For technical details, please refer to our IPDPS paper.

Start Your First Taskflow Program

The following program (simple.cpp) creates four tasks A, B, C, and D, where A runs before B and C, and D runs after B and C. When A finishes, B and C can run in parallel.

#include <taskflow/taskflow.hpp>  // Taskflow is header-only

int main(){
  
  tf::Executor executor;
  tf::Taskflow taskflow;

  auto [A, B, C, D] = taskflow.emplace(  // create 4 tasks
    [] () { std::cout << "TaskA\n"; },
    [] () { std::cout << "TaskB\n"; },
    [] () { std::cout << "TaskC\n"; },
    [] () { std::cout << "TaskD\n"; } 
  );                                  
                                      
  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after  B and C
                                      
  executor.run(taskflow).wait(); 

  return 0;
}

Taskflow is header-only, so there is nothing to install. To compile the program, clone the Taskflow project and tell the compiler to include the headers.

~$ git clone https://github.com/taskflow/taskflow.git  # clone it only once
~$ g++ -std=c++17 simple.cpp -I taskflow/taskflow -O2 -pthread -o simple
~$ ./simple
TaskA
TaskC 
TaskB 
TaskD

Visualize Your First Taskflow Program

Taskflow comes with a built-in profiler, TFProf, for you to profile and visualize taskflow programs in an easy-to-use web-based interface.

# run the program with the environment variable TF_ENABLE_PROFILER enabled
~$ TF_ENABLE_PROFILER=simple.json ./simple
~$ cat simple.json
[
{"executor":"0","data":[{"worker":0,"level":0,"data":[{"span":[172,186],"name":"0_0","type":"static"},{"span":[187,189],"name":"0_1","type":"static"}]},{"worker":2,"level":0,"data":[{"span":[93,164],"name":"2_0","type":"static"},{"span":[170,179],"name":"2_1","type":"static"}]}]}
]
# paste the profiling json data to https://taskflow.github.io/tfprof/

In addition to the execution diagram, you can dump the graph to DOT format and visualize it with a number of free GraphViz tools.

// dump the taskflow graph to a DOT format through std::cout
taskflow.dump(std::cout); 

Supported Compilers

To use Taskflow, you only need a compiler that supports C++17:

  • GNU C++ Compiler at least v7.0 with -std=c++17
  • Clang C++ Compiler at least v6.0 with -std=c++17
  • Microsoft Visual Studio at least v19.27 with /std:c++17
  • AppleClang Xcode Version at least v12.0 with -std=c++17
  • Nvidia CUDA Toolkit and Compiler (nvcc) at least v11.1 with -std=c++17
  • Intel C++ Compiler at least v19.0.1 with -std=c++17

Taskflow works on Linux, Windows, and macOS.

Learn More about Taskflow

Visit our project website and documentation to learn more about Taskflow. To get involved:

CppCon20 Tech Talk MUC++ Tech Talk

We are committed to supporting trustworthy development for both academic and industrial research projects in parallel and heterogeneous computing. At the same time, we appreciate all Taskflow contributors!

License

Taskflow is licensed under the MIT License. You are completely free to redistribute work derived from Taskflow.


Issues
  • _per_thread() data in separate compilation units

    _per_thread() data in separate compilation units

    When a tf::Executor is created in one translation unit and then used in another translation unit (or shared library?), the per-thread data inside the Executor is invalid. This causes asserts like

    ../bundled/taskflow-2.7.0/include/taskflow/core/executor.hpp:976: void tf::Executor::_invoke_dynamic_work_external(tf::Node*, tf::Graph&, bool): Assertion `worker &&
    worker->executor == this' failed.
    

    to trigger.

    The problem is https://github.com/taskflow/taskflow/blob/9d17ee3fb28ef8b9b92cd28d10d7ac840ad33de8/taskflow/core/executor.hpp#L446, which will create a different copy in each translation unit; see also https://stackoverflow.com/questions/185624/static-variables-in-an-inlined-function for more details.

    I am attaching an example that uses a shared library for this. Compile and run with:

    export LD_LIBRARY_PATH=`pwd`
    # 1. make libfoo.so
    g++ --std=c++14 -Wall -I ~/taskflow/installed/include/ -O2 -pthread -fpic -c other.cc
    g++ -shared -o libfoo.so other.o
    
    # 2. compile a.out and link libfoo.so
    g++ --std=c++14 -Wall -I ~/taskflow/installed/include/ -O2 -pthread -o a.out -L. main.cc -lfoo
    
    # 3. run
    ./a.out
    

    source code to reproduce: taskflow-thread-local-bug.zip

    bug 
    opened by tjhei 25
  • Is it possible to create a continuous flow?

    Is it possible to create a continuous flow?

    The situation I am describing is based on my use case, but I have simplified it. So please do not focus on the details unless they are needed to answer.

    I have a flow of data; from our point of view we can see it as a volatile variable I have to read regularly. From the value of the variable I need to compute five functions: f0, f1, f2, f3, and f4. Each takes longer than the previous and uses as input the output of the previous. f0 starts with the volatile variable's value.

    This situation calls for the pipeline pattern: basically, after five readings we are doing all the computations in parallel.

    Playing around with cppflow (great library, by the way) I understood how to make one loop; however, this is not helpful, as with only one reading the computation is basically single-threaded.

    So my question is: is it possible to implement something like this? I would be happy to read the fine manual if the answer is there; just point me in the right direction.

    enhancement help wanted 
    opened by paolobolzoni 20
  • Taskflow reuse

    Taskflow reuse

    I came across cpp-taskflow recently and I find it very interesting and well designed. However, from my point of view, it would be great to have the possibility to reuse a Taskflow graph after its completion.

    To give a bit of background, I work on robot control and we always have a bunch of stuff to compute in a real-time loop (up to 1kHz/1ms) in order to send the robot(s) a new command. Taskflow would fit perfectly to describe and perform all the computations, but having to reconstruct the graph at each iteration is not very elegant and may also bring some latencies due to the dynamic memory allocations that happen behind the scenes.

    Is it something that could be implemented in cpp-taskflow?

    opened by BenjaminNavarro 20
  • Register callback function to run when task is completed

    Register callback function to run when task is completed

    Hi,

    C++ has a bit of a weird API when it comes to std::future and monitoring progress. It is "doable" but certainly not trivial and requires quite a bit of hackery. What I once did was start the tasks from within a sub-thread and let that thread block on the tasks, like your wait_for_all() function. Then, once those tasks were done, the block lifted and the next line would run, in effect notifying some other class that the task is done.

    I'm hoping for an API function like that in taskflow. You now have the functions dispatch/silent_dispatch/wait_for_topologies/wait_for_all. I would like to request a new addition, let's call it dispatch_callback, which would be non-blocking and take one callable as an argument. That callable would be called once all tasks are done processing.

    The implementation could be what I described above as hackery :) But I'm guessing you have much neater options at your disposal, as you're already managing the thread pool behind it.

    For me, it would make this project really useful! Even though I'm merely using it as a thread pool, it simply becomes very easy and convenient to implement multithreading with.

    Cheers, Mark

    opened by markg85 19
  • Add semaphores to limit concurrency for certain tasks

    Add semaphores to limit concurrency for certain tasks

    This is the early stages of implementing a "Semaphore" to limit concurrency in certain sections / tasks of the graph.

    A task can be given the requirement to acquire one or multiple semaphores before executing its work and a task can be given the job to release one or multiple semaphores after finishing its work. A task can acquire and release a semaphore, or just acquire or just release it. Semaphores start with an initial count. As long as that count is above 0, tasks can acquire the semaphore and do their work. If the count is 0 or less, a task trying to acquire the semaphore is not run and instead put into a waiting list of that semaphore. When the semaphore is released by another task, then tasks on that waiting list are scheduled to be run again (trying to acquire their semaphore(s) again).

    I've added a simple example with 5 tasks with no links/dependencies between them; which under normal circumstances would be executed concurrently. The example however has a semaphore with initial count 1, and all tasks need to acquire that semaphore before running and release that semaphore after they're done. This limits the concurrently running tasks to only one. See examples/onlyone.cpp

    Todo:

    • [x] Generally: Test! Right now I just have that single example which runs as expected from a first glance.
    • [x] Add ability for acquire/release to change semaphore counter by another value than 1
    • [x] Measure performance impact when feature is not used.
    • [x] Make sure semaphores don't needlessly block worker threads.

    I'd be happy about general feedback about the implementation so far! Did I follow your coding standards? Any oversights? Suggestions for improvement?

    opened by musteresel 18
  • Schedule an individual task with an executor

    Schedule an individual task with an executor

    We're working to convert the deal.II project (see https://github.com/dealii/dealii) to use cpp-taskflow (#170 was a result of this attempt) and in https://github.com/dealii/dealii/issues/10388 I was wondering whether there is a way to schedule individual tasks with an executor without actually creating a TaskFlow object?

    Right now, we are creating individual tasks using std::async (see https://github.com/dealii/dealii/pull/10389). In essence, this just puts the object into the background. But if cpp-taskflow uses a different thread pool to schedule task flows than the C++ runtime environment, then these two thread pools will get in each other's way. As a consequence, I'm looking for a way to run individual tasks through cpp-taskflow executors instead of std::async. It seems unnecessary to create a whole task graph for a single task, but I can't quite find the right interface to avoid this.

    enhancement 
    opened by bangerth 18
  • Is taskflow suitable for this scenario ?

    Is taskflow suitable for this scenario ?

    This looks like a wonderful project.

    Here is my problem:

    I would like to run an incremental filter on a compressed 2D image i.e. decompress the image in small windows and then filter each window once it is fully decompressed. The uncompressed image is broken down into 64x64 code blocks (each code block has a matching compressed code stream), and the blocks are grouped into overlapping 128x128 windows, where each window covers 4 blocks. All blocks in a window must be decompressed before that window gets filtered.

    Say I have a 192x192 image; then there are 4 overlapping windows.

    Let's say I have four threads, and each thread processes one window. Each thread tries to decompress its blocks, and when they are all decompressed, it applies the filter. But if a block is already being decompressed by another thread, then we must wait until the other thread is done (though while waiting we can steal decompression jobs from another thread).

    So....... could this problem be modelled efficiently with taskflow ?

    One more issue: I would like to set the thread affinity of the threads in the pool, and reserve one core and its hyperthreaded sibling for each window.

    Thanks very much! Aaron

    opened by boxerab 17
  • Passing data from tasks to their successors

    Passing data from tasks to their successors

    Could you suggest an easy way to pass data (return values) from tasks to their successors? Suppose A->C and B->C and C needs the results of A and B to proceed (typical of divide & conquer algorithms - BTW an example of d&q would be really helpful - e.g. TBB has a great example for recursive Fibonacci).

    As far as I understand, I'd need to emplace A and B, getting the futures; then I need to capture the futures in a lambda that is passed to C.

    Alternatively, I'd have to create variables for the results of A and B, capture one of them in each of A and B, and both of them in C. These variables cannot be allocated on the stack, as it can be destroyed before the tasks have a chance to run, so one has to use dynamic memory allocation.

    These approaches have major disadvantages:

    • futures are heavy objects, allocating memory and requiring synchronisation - I believe they are overkill for this scenario: we already know that A and B have finished once C is running, so it is safe to retrieve the return values without synchronisation; moreover, if A and B have not been destroyed yet (my understanding is that clean-up is only performed when the graph terminates), no memory allocation is necessary.
    • The code becomes unnecessarily ugly - I think it would be much cleaner to pass a lambda accepting 2 parameters to C, and guarantee that the order of parameters corresponds to the order of precede() etc. calls.
    opened by vkhomenko 16
  • Memory is not being released.

    Memory is not being released.

    I have been noticing that memory is not being released even though the taskflow goes out of scope. Does anyone know why, in the case below, the memory is not being released?

    struct TestStruct
    {
      using Ptr = std::shared_ptr<TestStruct>;
      using ConstPtr = std::shared_ptr<const TestStruct>;
    
      std::unordered_map<std::string, double> joints;
    };
    
    void runTest()
    {
      std::vector<TestStruct::Ptr> t;
      t.reserve(10000000);
      for (std::size_t j = 0; j < 10000000; j++)
      {
        auto p = std::make_shared<TestStruct>();
        p->joints["joint_1"] = 0;
        p->joints["joint_2"] = 0;
        p->joints["joint_3"] = 0;
        p->joints["joint_4"] = 0;
        p->joints["joint_5"] = 0;
        p->joints["joint_6"] = 0;
        t.push_back(p);
      }
    }
    
    int main(int /*argc*/, char** /*argv*/)
    {
      tf::Executor executor;
    
      {
        tf::Taskflow taskflow;
    
        taskflow.emplace([=]() { runTest(); });
    
        std::future<void> f = executor.run(taskflow);
        f.wait();
      }
    
      std::cout << "Hit enter key to continue!" << std::endl;
      std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
    }
    

    Now if I replace runTest with the function below, it does what I expect: the memory ramps up, then drops back to where it started.

    inline std::mt19937 mersenne{ static_cast<std::mt19937::result_type>(std::time(nullptr)) };
    void runTest()
    {
      std::vector<double> t;
      t.reserve(1000000000);
      for (std::size_t j = 0; j < 1000000000; j++)
      {
        std::uniform_real_distribution<double> sample{ 0, 10 };
        t.push_back(sample(mersenne));
      }
    }
    
    opened by Levi-Armstrong 15
  • Attaching user data to tasks

    Attaching user data to tasks

    Is it possible, or considered for future development, to attach user data to a task? My primary interest is a seemingly simple case where a taskflow consists of tasks, each with a consistent set of boolean flags attached. They are used by conditional tasks to skip over some calculations if certain flags are set in dependencies.

    It seems that Node::name is a similar kind of data. Maybe it could be generalized for such use cases.

    I can also help implement this.

    opened by Endilll 13
  • compile error with latest master + gcc 10

    compile error with latest master + gcc 10

    error:

    /utility/serializer.hpp:469:18: [ skipping 2 instantiation contexts, use -ftemplate-backtrace-limit=0 to disable ]

    in this code:

      if constexpr (std::is_arithmetic_v<typename U::value_type>) {
        _device.write(
          reinterpret_cast<const char*>(t.data()), 
          t.size() * sizeof(typename U::value_type)
        );
        sz += t.size() * sizeof(typename U::value_type);
      } else {
        for(auto&& item : t) {
         sz += _save(item);
        }
      }
    

    this line: sz += _save(item);

    Full error log:

    In file included from /home/aaron/src/grok/src/include/taskflow/core/graph.hpp:9,
                     from /home/aaron/src/grok/src/include/taskflow/core/task.hpp:3,
                     from /home/aaron/src/grok/src/include/taskflow/core/observer.hpp:3,
                     from /home/aaron/src/grok/src/include/taskflow/core/executor.hpp:3,
                     from /home/aaron/src/grok/src/include/taskflow/taskflow.hpp:3,
                     from /home/aaron/src/grok/src/lib/jp2/grk_includes.h:87,
                     from /home/aaron/src/grok/src/lib/jp2/t1/t1_part1/T1Part1.cpp:17:
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp: In instantiation of ‘SizeType tf::Serializer<Device, SizeType>::_save(T&&) [with T = const std::__cxx11::basic_string<char>&; std::enable_if_t<is_std_basic_string_v<typename std::decay<_Up>::type>, void>* <anonymous> = 0; Device = std::basic_ofstream<char>; SizeType = long int]’:
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:422:16:   required from ‘SizeType tf::Serializer<Device, SizeType>::operator()(T&& ...) [with T = {const std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&, const tf::TaskType&, const std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long int, std::ratio<1, 1000000000> > >&, const std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long int, std::ratio<1, 1000000000> > >&}; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/observer.hpp:35:14:   required from ‘auto tf::Segment::save(Archiver&) const [with Archiver = tf::Serializer<std::basic_ofstream<char>, long int>]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:632:16:   required from ‘SizeType tf::Serializer<Device, SizeType>::_save(T&&) [with T = const tf::Segment&; std::enable_if_t<(! is_default_serializable_v<typename std::decay<_Up>::type>), void>* <anonymous> = 0; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:469:18:   [ skipping 2 instantiation contexts, use -ftemplate-backtrace-limit=0 to disable ]
    /home/aaron/src/grok/src/include/taskflow/core/observer.hpp:75:14:   required from ‘auto tf::Timeline::save(Archiver&) const [with Archiver = tf::Serializer<std::basic_ofstream<char>, long int>]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:632:16:   required from ‘SizeType tf::Serializer<Device, SizeType>::_save(T&&) [with T = const tf::Timeline&; std::enable_if_t<(! is_default_serializable_v<typename std::decay<_Up>::type>), void>* <anonymous> = 0; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:469:18:   required from ‘SizeType tf::Serializer<Device, SizeType>::_save(T&&) [with T = const std::vector<tf::Timeline>&; std::enable_if_t<is_std_vector_v<typename std::decay<_Up>::type>, void>* <anonymous> = 0; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:422:16:   required from ‘SizeType tf::Serializer<Device, SizeType>::operator()(T&& ...) [with T = {const std::vector<tf::Timeline, std::allocator<tf::Timeline> >&}; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/observer.hpp:101:14:   required from ‘auto tf::ProfileData::save(Archiver&) const [with Archiver = tf::Serializer<std::basic_ofstream<char>, long int>]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:632:16:   required from ‘SizeType tf::Serializer<Device, SizeType>::_save(T&&) [with T = tf::ProfileData&; std::enable_if_t<(! is_default_serializable_v<typename std::decay<_Up>::type>), void>* <anonymous> = 0; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/../utility/serializer.hpp:422:16:   required from ‘SizeType tf::Serializer<Device, SizeType>::operator()(T&& ...) [with T = {tf::ProfileData&}; Device = std::basic_ofstream<char>; SizeType = long int]’
    /home/aaron/src/grok/src/include/taskflow/core/observer.hpp:686:22:   required from here
    
    opened by boxerab 13
  • How to embed asynchronous pipelines in TaskFlow

    How to embed asynchronous pipelines in TaskFlow

    I'm trying to use TaskFlow for an application that contains a mix of GPU and CPU-based processing; the caveat is that the GPU processing is not using a local GPU/CUDA but is using an NVIDIA Triton inference server (for deep learning).

    So, as part of the computational graph, we need to do inference using a Triton server; as it will take a while for the data to be uploaded, inference being done, and results being retrieved, we would like to do the inference asynchronously (i.e. the worker thread will steal work while waiting for the inference data to arrive).

    I can't seem to figure out how to implement this using the various constructs, largely because I'm still climbing the TF learning curve. I tried async from the executor and defining a Subflow; they don't quite do what I need.

    So, basically, if I'm executing a task - how can I yield back control to the executor while I'm waiting for data to arrive - and once the data arrives, how can I resume the work? (hmm, that does sound like coroutines, doesn't it)

    Question 2: Inference is faster when we combine data into batches. Would there be a way to implement this batching using TF?

    Thanks for the great work!

    Karel

    opened by kzuiderveld 1
  • [Question] Is taskflow for_each_index a drop-in replacement for Intel TBB parallel_for ?

    [Question] Is taskflow for_each_index a drop-in replacement for Intel TBB parallel_for ?

    I am taking a look at a project that uses TBB parallel_for for a few loops. When I tried replacing one of these calls with taskflow for_each_index call, the average duration for the loop increased by around 5X.

    opened by boxerab 18
  • Add ability to set max steals and max yields

    Add ability to set max steals and max yields

    When profiling our application that uses Taskflow, we noticed that std::this_thread::yield(); was causing a lot of cpu spin time. After discussing with @tsung-wei-huang, it sounds like the main problem was that our system uses multiple executors in order to keep resources independent for different types of tasks. He suggested that we try to reduce the number of executors and also that we adjust the max steals and max yields. After testing reducing the max yields and max steals (first 2 commits), this didn't seem to have much of an impact on spin time. As a result, I added an optional sleep after max yields is reached (3rd commit). This does reduce the spin time.

    This PR adds the ability to set the max steals, max yields, and the sleep duration after those are reached. The defaults should work the same as before if you were using all default arguments for the executor. If you were passing in the number of threads (N), simply pass in ((N + 1) << 1) for max steals to maintain the same behavior.

    opened by mpowelson 0
  • Submitting tasks in runtime

    Submitting tasks in runtime

    I was thinking of utilising taskflow for a UDP load balancer. One thread receives UDP packets in a loop. For each packet received, it can create a routing task, i.e. find the destination server from the routing table and send the packet to it. I also want each route to a server to be an independent flow; otherwise, packets on the same route may be reordered by parallel threads.

    However, I am struggling to understand if and how this can be accomplished with the taskflow library. A taskflow cannot be modified during execution, thus I can't dynamically push tasks to it. Should I use the Pipeline? Maybe the library does not allow such use cases. Please advise!

    opened by maxsharabayko 4
  • Deadlock avoidance scheduling with semaphores?

    Deadlock avoidance scheduling with semaphores?

    Assume we have the following example:

    #include <iostream>
    #include "taskflow/taskflow/core/executor.hpp"
    #include "taskflow/taskflow/core/semaphore.hpp"
    #include "taskflow/taskflow/taskflow.hpp"
    
    int main() {
        tf::Executor exec(24);
    
        tf::Taskflow flow;
    
        tf::Semaphore semaphore(8);
    
        for(size_t i = 0; i < 5; ++i) {
    
            tf::Task commonTask = flow.emplace([i]() {
                std::string str;
                str.append("Common task for i = ").append(std::to_string(i)).append("\n");
                std::cout << str;
            }).name(std::string("Common i = ").append(std::to_string(i)));
    
            for(size_t j = 0; j < 8; ++j) {
                tf::Task preTask = flow.emplace([i, j]() {
                    std::string str;
                    str.append("Pre task for i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)).append("\n");
                    std::cout << str;
                }).name(std::string("Pre task i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)));
    
                tf::Task postTask = flow.emplace([i, j]() {
                    std::string str;
                    str.append("Post task for i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)).append("\n");
                    std::cout << str;
                }).name(std::string("Post task i = ").append(std::to_string(i)).append(", j = ").append(std::to_string(j)));
    
                preTask.precede(commonTask);
                postTask.succeed(commonTask);
    
                preTask.acquire(semaphore);
                postTask.release(semaphore);
            }
        }
    
        flow.dump(std::cout);
    
        std::cout << "Start!" << std::endl;
        exec.run(flow).wait();
        std::cout << "End!" << std::endl;
    }
    

    Where we want to limit the concurrency, e.g. due to memory allocation constraints.

    The issue is that, as I feared it would, taskflow schedules itself into a deadlock.

    Output looks like:

    Start!
    Pre task for i = 0, j = 0
    Pre task for i = 0, j = 5
    Pre task for i = 0, j = 4
    Pre task for i = 0, j = 2
    Pre task for i = 0, j = 3
    Pre task for i = 0, j = 7
    Pre task for i = 0, j = 1
    Pre task for i = 0, j = 6
    Common task for i = 0
    Post task for i = 0, j = 7
    Post task for i = 0, j = 0
    Post task for i = 0, j = 2
    Post task for i = 0, j = 3
    Pre task for i = 3, j = 3
    Pre task for i = 1, j = 0
    Post task for i = 0, j = 5
    Post task for i = 0, j = 1
    Pre task for i = 1, j = 3
    Pre task for i = 2, j = 2
    Pre task for i = 1, j = 1
    Post task for i = 0, j = 6
    Pre task for i = 4, j = 1
    Post task for i = 0, j = 4
    Pre task for i = 2, j = 6
    Pre task for i = 1, j = 2
    

    ...and then hangs

    What would be the best way to rewrite this issue and still limit the concurrency?

    Maybe adding another separate acquire (precedes all 8 pre tasks) and release (succeeds all post tasks) task?

    And would it be feasible for taskflow to get a scheduler that understands such issues and avoids them? I could imagine that this is, as always, an NP-complete problem, or would come with great implications.

    Anyway, I think adding a warning to the documentation would be good: warn users that taskflow may schedule itself into deadlocks when using semaphores.

    opened by robinchrist 0
  • [Question]-Task timeout

    [Question]-Task timeout

    Thanks for your great library

    I want to know: is there any way to detect a timeout from a running task?

    For example, you have a taskflow with some routine (subflow). When the application receives a message, it sends it to the taskflow for execution, and each task has a timeout for executing the request. If execution takes too long, we need to mark it as a timeout, send a timeout reply to the requester, and terminate the running task.

    How can we detect the timeout for each task from the main process?

    enhancement 
    opened by mohsenomidi 2