An implementation of the Actor, Publish-Subscribe, and CSP models in one rather small C++ framework, with performance, quality, and stability proven by years in production.

What is SObjectizer?

SObjectizer is one of a few cross-platform, open-source "actor frameworks" for C++. Besides the Actor Model, it also supports the Publish-Subscribe Model and CSP-like channels. The goal of SObjectizer is to significantly simplify the development of concurrent and multithreaded applications in C++.

SObjectizer allows the creation of a concurrent app as a set of agent-objects which interact with each other through asynchronous messages. It handles message dispatching and provides a working context for message processing. It also lets you tune both by choosing among various ready-to-use dispatchers.

What distinguishes SObjectizer?

Maturity. SObjectizer is based on ideas that were put forward in 1995-2000. SObjectizer itself has been in development since 2002, and SObjectizer-5 has been evolving continuously since 2010.

Stability. From the very beginning SObjectizer was used for business-critical applications, and some of them are still being used in production. Breaking changes in SObjectizer are rare, and we approach them very carefully.

Cross-platform. SObjectizer runs on Windows, Linux, FreeBSD, macOS and Android.

Easy-to-use. SObjectizer provides an API that is easy to understand and easy to use, with a lot of examples in the SObjectizer distribution and plenty of information in the project's Wiki.

Free. SObjectizer is distributed under the BSD-3-Clause license, so it can be used in the development of proprietary commercial software for free.

SObjectizer is not like TBB, taskflow or HPX

SObjectizer is often compared with tools like Intel Threading Building Blocks, taskflow, HPX, and similar ones. Such a comparison is just useless.

All those tools are intended for solving tasks from the Parallel Computing area: they reduce computation time by utilizing several CPU cores. For example, re-encoding a video file from one format to another may take one hour on one CPU core but only 15 minutes on four cores. That is the main goal of Parallel Computing.

SObjectizer is intended for a slightly different area: Concurrent Computing. The main goal of SObjectizer is the simplification of doing many different tasks at once. Sometimes there is no need to use more than just one CPU core for that. But if there are several CPU cores, then SObjectizer makes the handling of those tasks and the interaction between them much easier.

The tricky part is the fact that Parallel- and Concurrent Computing use the same concurrency mechanisms and primitives (like threads, mutexes, atomics, and so on) under the hood. But from the high-level point of view Parallel- and Concurrent Computing are used for very different tasks.

As examples of applications that were or could be implemented on top of SObjectizer, we can list a multithreaded proxy server, an automatic control system, an MQ broker, a database server, and so on.

Show me the code!

HelloWorld example

This is the classic "Hello, World" example expressed with SObjectizer's agents:

#include <so_5/all.hpp>

#include <iostream>

class hello_actor final : public so_5::agent_t {
public:
   using so_5::agent_t::agent_t;

   void so_evt_start() override {
      std::cout << "Hello, World!" << std::endl;
      // Finish work of example.
      so_deregister_agent_coop_normally();
   }
};

int main() {
   // Launch SObjectizer.
   so_5::launch([](so_5::environment_t & env) {
         // Add a hello_actor instance in a new cooperation.
         env.register_agent_as_coop( env.make_agent<hello_actor>() );
      });

   return 0;
}

Ping-Pong example

Let's look at a more interesting example with two agents and a message exchange between them. It is another famous example for actor frameworks, "Ping-Pong":

#include <so_5/all.hpp>

#include <iostream>

struct ping {
   int counter_;
};

struct pong {
   int counter_;
};

class pinger final : public so_5::agent_t {
   so_5::mbox_t ponger_;

   void on_pong(mhood_t<pong> cmd) {
      if(cmd->counter_ > 0)
         so_5::send<ping>(ponger_, cmd->counter_ - 1);
      else
         so_deregister_agent_coop_normally();
   }

public:
   pinger(context_t ctx) : so_5::agent_t{std::move(ctx)} {}

   void set_ponger(const so_5::mbox_t mbox) { ponger_ = mbox; }

   void so_define_agent() override {
      so_subscribe_self().event( &pinger::on_pong );
   }

   void so_evt_start() override {
      so_5::send<ping>(ponger_, 1000);
   }
};

class ponger final : public so_5::agent_t {
   const so_5::mbox_t pinger_;
   int pings_received_{};

public:
   ponger(context_t ctx, so_5::mbox_t pinger)
      :  so_5::agent_t{std::move(ctx)}
      ,  pinger_{std::move(pinger)}
   {}

   void so_define_agent() override {
      so_subscribe_self().event(
         [this](mhood_t<ping> cmd) {
            ++pings_received_;
            so_5::send<pong>(pinger_, cmd->counter_);
         });
   }

   void so_evt_finish() override {
      std::cout << "pings received: " << pings_received_ << std::endl;
   }
};

int main() {
   so_5::launch([](so_5::environment_t & env) {
         env.introduce_coop([](so_5::coop_t & coop) {
               auto pinger_actor = coop.make_agent<pinger>();
               auto ponger_actor = coop.make_agent<ponger>(
                     pinger_actor->so_direct_mbox());

               pinger_actor->set_ponger(ponger_actor->so_direct_mbox());
            });
      });

   return 0;
}

All agents in the code above work on the same worker thread. How can they be bound to different worker threads?

It is very simple. Just use an appropriate dispatcher:

int main() {
   so_5::launch([](so_5::environment_t & env) {
         env.introduce_coop(
            so_5::disp::active_obj::make_dispatcher(env).binder(),
            [](so_5::coop_t & coop) {
               auto pinger_actor = coop.make_agent<pinger>();
               auto ponger_actor = coop.make_agent<ponger>(
                     pinger_actor->so_direct_mbox());

               pinger_actor->set_ponger(ponger_actor->so_direct_mbox());
            });
      });

   return 0;
}

Pub/Sub example

SObjectizer supports the Pub/Sub model via multi-producer/multi-consumer message boxes. A message sent to such a message box will be received by all subscribers of that message type:

#include <so_5/all.hpp>

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

using namespace std::literals;

struct acquired_value {
   std::chrono::steady_clock::time_point acquired_at_;
   int value_;
};

class producer final : public so_5::agent_t {
   const so_5::mbox_t board_;
   so_5::timer_id_t timer_;
   int counter_{};

   struct acquisition_time final : public so_5::signal_t {};

   void on_timer(mhood_t<acquisition_time>) {
      // Publish the next value for all consumers.
      so_5::send<acquired_value>(
            board_, std::chrono::steady_clock::now(), ++counter_);
   }

public:
   producer(context_t ctx, so_5::mbox_t board)
      :  so_5::agent_t{std::move(ctx)}
      ,  board_{std::move(board)}
   {}

   void so_define_agent() override {
      so_subscribe_self().event(&producer::on_timer);
   }

   void so_evt_start() override {
      // The agent will periodically receive the acquisition_time signal
      // without an initial delay and with a period of 750ms.
      timer_ = so_5::send_periodic<acquisition_time>(*this, 0ms, 750ms);
   }
};

class consumer final : public so_5::agent_t {
   const so_5::mbox_t board_;
   const std::string name_;

   void on_value(mhood_t<acquired_value> cmd) {
      std::cout << name_ << ": " << cmd->value_ << std::endl;
   }

public:
   consumer(context_t ctx, so_5::mbox_t board, std::string name)
      :  so_5::agent_t{std::move(ctx)}
      ,  board_{std::move(board)}
      ,  name_{std::move(name)}
   {}

   void so_define_agent() override {
      so_subscribe(board_).event(&consumer::on_value);
   }
};

int main() {
   so_5::launch([](so_5::environment_t & env) {
         auto board = env.create_mbox();
         env.introduce_coop([board](so_5::coop_t & coop) {
               coop.make_agent<producer>(board);
               coop.make_agent<consumer>(board, "first"s);
               coop.make_agent<consumer>(board, "second"s);
            });

         std::this_thread::sleep_for(std::chrono::seconds(4));
         env.stop();
      });

   return 0;
}

BlinkingLed example

All agents in SObjectizer are finite-state machines. Almost all the functionality of hierarchical finite-state machines (HSM) is supported: child states and handler inheritance, on_enter/on_exit handlers, state timeouts, and deep and shallow state history. The only exception is orthogonal states.

Let's see what an agent that implements the following statechart can look like:

[Figure: Blinking Led Statechart]

This is a very simple example that demonstrates an agent for the statechart shown above:

#include <so_5/all.hpp>

#include <chrono>
#include <iostream>
#include <thread>

using namespace std::literals;

class blinking_led final : public so_5::agent_t {
   state_t off{ this }, blinking{ this },
      blink_on{ initial_substate_of{ blinking } },
      blink_off{ substate_of{ blinking } };

public :
   struct turn_on_off : public so_5::signal_t {};

   blinking_led(context_t ctx) : so_5::agent_t{std::move(ctx)} {
      this >>= off;

      off.just_switch_to<turn_on_off>(blinking);

      blinking.just_switch_to<turn_on_off>(off);

      blink_on
         .on_enter([]{ std::cout << "ON" << std::endl; })
         .on_exit([]{ std::cout << "off" << std::endl; })
         .time_limit(1250ms, blink_off);

      blink_off
         .time_limit(750ms, blink_on);
   }
};

int main()
{
   so_5::launch([](so_5::environment_t & env) {
      so_5::mbox_t m;
      env.introduce_coop([&](so_5::coop_t & coop) {
            auto led = coop.make_agent< blinking_led >();
            m = led->so_direct_mbox();
         });

      const auto pause = [](auto duration) {
         std::this_thread::sleep_for(duration);
      };

      std::cout << "Turn blinking on for 10s" << std::endl;
      so_5::send<blinking_led::turn_on_off>(m);
      pause(10s);

      std::cout << "Turn blinking off for 5s" << std::endl;
      so_5::send<blinking_led::turn_on_off>(m);
      pause(5s);

      std::cout << "Turn blinking on for 5s" << std::endl;
      so_5::send<blinking_led::turn_on_off>(m);
      pause(5s);

      std::cout << "Stopping..." << std::endl;
      env.stop();
   } );

   return 0;
}

CSP-like Ping-Pong example

SObjectizer makes it possible to write concurrent applications even without agents: plain threads and CSP-like channels are enough.

This is a plain-thread implementation of the Ping-Pong example (please note that main() is not exception-safe):

#include <so_5/all.hpp>

#include <iostream>
#include <thread>

struct ping {
   int counter_;
};

struct pong {
   int counter_;
};

void pinger_proc(so_5::mchain_t self_ch, so_5::mchain_t ping_ch) {
   so_5::send<ping>(ping_ch, 1000);

   // Read all messages until the channel is closed.
   so_5::receive( so_5::from(self_ch).handle_all(),
      [&](so_5::mhood_t<pong> cmd) {
         if(cmd->counter_ > 0)
            so_5::send<ping>(ping_ch, cmd->counter_ - 1);
         else {
            // Channels have to be closed to break `receive` calls.
            so_5::close_drop_content(self_ch);
            so_5::close_drop_content(ping_ch);
         }
      });
}

void ponger_proc(so_5::mchain_t self_ch, so_5::mchain_t pong_ch) {
   int pings_received{};

   // Read all messages until the channel is closed.
   so_5::receive( so_5::from(self_ch).handle_all(),
      [&](so_5::mhood_t<ping> cmd) {
         ++pings_received;
         so_5::send<pong>(pong_ch, cmd->counter_);
      });

   std::cout << "pings received: " << pings_received << std::endl;
}

int main() {
   so_5::wrapped_env_t sobj;

   auto pinger_ch = so_5::create_mchain(sobj);
   auto ponger_ch = so_5::create_mchain(sobj);

   std::thread pinger{pinger_proc, pinger_ch, ponger_ch};
   std::thread ponger{ponger_proc, ponger_ch, pinger_ch};

   ponger.join();
   pinger.join();

   return 0;
}

Another CSP example with a Golang-like select() statement

SObjectizer provides a select() function that is similar to Golang's select statement. This function allows waiting for incoming messages from several message chains. It also allows waiting until a message chain is ready to accept a new outgoing message. So select() makes it possible to do non-blocking send() calls while handling incoming messages as long as the target message chain is full.

Here is a Fibonacci calculation example that uses select() as a back-pressure mechanism (the number producer thread waits if the number reader thread has not read the previous number yet). Note also that the main() function in this example is exception-safe.

#include <so_5/all.hpp>

#include <chrono>
#include <iostream>
#include <thread>

using namespace std;
using namespace std::chrono_literals;
using namespace so_5;

struct quit {};

void fibonacci( mchain_t values_ch, mchain_t quit_ch )
{
   int x = 0, y = 1;
   mchain_select_result_t r;
   do
   {
      r = select(
         from_all().handle_n(1),
         // Sends a new message of type 'int' with value 'x' inside
         // when values_ch is ready for a new outgoing message.
         send_case( values_ch, message_holder_t<int>::make(x),
               [&x, &y] { // This block of code will be called after the send().
                  auto old_x = x;
                  x = y; y = old_x + y;
               } ),
         // Receive a 'quit' message from quit_ch if it is here.
         receive_case( quit_ch, [](quit){} ) );
   }
   // Continue the loop while we send something and receive nothing.
   while( r.was_sent() && !r.was_handled() );
}

int main()
{
   wrapped_env_t sobj;

   thread fibonacci_thr;
   auto thr_joiner = auto_join( fibonacci_thr );

   // The chain for Fibonacci number will have limited capacity.
   auto values_ch = create_mchain( sobj, 1s, 1,
         mchain_props::memory_usage_t::preallocated,
         mchain_props::overflow_reaction_t::abort_app );

   auto quit_ch = create_mchain( sobj );
   auto ch_closer = auto_close_drop_content( values_ch, quit_ch );

   fibonacci_thr = thread{ fibonacci, values_ch, quit_ch };

   // Read the first 10 numbers from values_ch.
   receive( from( values_ch ).handle_n( 10 ),
         // And show every number to the standard output.
         []( int v ) { cout << v << endl; } );

   send< quit >( quit_ch );
}

Want to know more?

More information about SObjectizer can be found in the corresponding section of the project's Wiki.

Limitations

SObjectizer is an in-process message dispatching framework. It doesn't support distributed applications out of the box, but external tools and libraries can be used for that. Please take a look at our mosquitto_transport experiment: https://github.com/Stiffstream/mosquitto_transport

Obtaining and building

SObjectizer can be checked out from GitHub. Archives with SObjectizer's source code can be downloaded from GitHub or from SourceForge.

There are two ways to build SObjectizer: with the Mxx_ru tool or with CMake.

NOTE. The Android platform has been supported since v.5.5.15.2. Building for Android is possible with CMake only. See the corresponding section below.

SObjectizer can also be installed and used via vcpkg and Conan dependency managers. See the appropriate sections below.

SObjectizer-5.7 requires C++17!

The 5.7-branch of SObjectizer requires C++17.

If you need support for C++14 or C++11, take a look at older versions of SObjectizer on SourceForge, or contact stiffstream to discuss porting SObjectizer-5.7 to older C++ standards.

Building via Mxx_ru

NOTE. This is the standard way of building SObjectizer; it is the one used in the SObjectizer development process.

Building SObjectizer this way requires the Ruby language and the Mxx_ru tool. Install Ruby and then install Mxx_ru via the RubyGems command:

gem install Mxx_ru

If you already have Mxx_ru installed please update to at least version 1.6.14.6:

gem update Mxx_ru

SObjectizer can be obtained from Git repository on GitHub:

git clone https://github.com/stiffstream/sobjectizer

To build SObjectizer:

cd sobjectizer/dev
ruby build.rb

Both the static and the shared SObjectizer libraries will be built. The libraries will be placed into the target/release subdirectory.

If you want to build just shared library:

cd sobjectizer/dev
ruby so_5/prj.rb

Or if you want to build just static library:

cd sobjectizer/dev
ruby so_5/prj_s.rb

To build SObjectizer with all tests and samples:

cd sobjectizer/dev
ruby build_all.rb

Please note that under FreeBSD it may be necessary to define the LD_LIBRARY_PATH environment variable. The actual build command sequence under FreeBSD could then be as follows:

cd sobjectizer/dev
export LD_LIBRARY_PATH=target/release
ruby build_all.rb

Building the HTML documentation for SObjectizer requires the Doxygen tool. If it is installed, then run:

cd sobjectizer/doxygen
doxygen

Generated html-files will be located in sobjectizer/dev/doc/html.

NOTE. If you do not specify MXX_RU_CPP_TOOLSET yourself, Mxx_ru will try to detect your C++ toolset automatically. If you want to use a C++ compiler that is not the default one on your system, please define the MXX_RU_CPP_TOOLSET environment variable manually. It could look like:

export MXX_RU_CPP_TOOLSET="clang_linux compiler_name=clang++-6 linker_name=clang++-6"

More information about tuning Mxx_ru for your needs can be found in the corresponding documentation.

Building via CMake

To build SObjectizer via CMake it is necessary to have CMake and some knowledge of how to use it. The following steps are just a demonstration. For more detailed information about the CMake build system for SObjectizer see dev/cmake/CmakeQuickHowto.txt

To get and build SObjectizer under Linux/FreeBSD, run the following from the command line:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release ../dev
cmake --build . --config Release
cmake --build . --config Release --target install

Those commands will create all the necessary Makefiles and then build SObjectizer. If it is necessary to build examples and tests too, use

cmake -DBUILD_ALL=ON -DCMAKE_INSTALL_PREFIX=target ../dev

When the install step has finished, './target' will contain two subfolders: './bin' with samples and './lib' with the shared library libso.5.x.x.so

The CMake build system currently supports these options:

  • SOBJECTIZER_BUILD_STATIC. Enable building SObjectizer as a static library [default: ON]
  • SOBJECTIZER_BUILD_SHARED. Enable building SObjectizer as a shared library [default: ON]
  • BUILD_ALL. Enable building examples and tests [default: OFF]
  • BUILD_EXAMPLES. Enable building examples [default: OFF]
  • BUILD_TESTS. Enable building tests [default: OFF]

Please note that if BUILD_ALL or BUILD_EXAMPLES or BUILD_TESTS is turned ON then both SOBJECTIZER_BUILD_STATIC and SOBJECTIZER_BUILD_SHARED must be turned ON. It means that if SOBJECTIZER_BUILD_STATIC or SOBJECTIZER_BUILD_SHARED is turned OFF then BUILD_ALL/BUILD_EXAMPLES/BUILD_TESTS all must be turned OFF.

To build SObjectizer under Windows with MS Visual Studio 2017 from the command line:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release -G "Visual Studio 15 2017" ../dev
cmake --build . --config Release
cmake --build . --config Release --target install

If it is necessary to build examples too, use BUILD_ALL in the cmake invocation:

cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release -DBUILD_ALL=ON -G "Visual Studio 15 2017" ../dev

Since v.5.5.24 SObjectizer provides sobjectizer-config.cmake files. These files are automatically installed into the <target>/lib/cmake/sobjectizer subfolder. This makes it possible to use SObjectizer via CMake's find_package command.

Building for Android

Building for Android is possible with a reasonably recent Android NDK or with CrystaX NDK.

Building with Android NDK

You need the Android SDK and Android NDK installed on your system, as well as an appropriate version of CMake. You also need the environment variables ANDROID_HOME and ANDROID_NDK to be set properly. Then you can issue the following commands:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DBUILD_ALL=ON -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release \
     -DCMAKE_TOOLCHAIN_FILE=${ANDROID_NDK}/build/cmake/android.toolchain.cmake \
     -G Ninja \
     -DANDROID_ABI=arm64-v8a \
     -DANDROID_NDK=${ANDROID_NDK} \
     -DANDROID_NATIVE_API_LEVEL=23 \
     -DANDROID_TOOLCHAIN=clang \
     ../dev
cmake --build . --config Release
cmake --build . --config Release --target install

Building with CrystaX NDK

You need CrystaX NDK v.10.4.0 or higher already installed on your system. CMake is used for building SObjectizer:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
export NDK=/path/to/the/crystax-ndk
cmake -DBUILD_ALL=ON -DCMAKE_INSTALL_PREFIX=result -DCMAKE_TOOLCHAIN_FILE=$NDK/cmake/toolchain.cmake -DANDROID_ABI=arm64-v8a ../dev
make
make test
make install

Using C++ Dependency Managers

Using via vcpkg

To use SObjectizer via vcpkg it is necessary to do the following steps.

Install sobjectizer package:

vcpkg install sobjectizer

Add the following lines into your CMakeLists.txt file:

find_package(sobjectizer CONFIG REQUIRED)
target_link_libraries(your_target sobjectizer::SharedLib) # or sobjectizer::StaticLib

Using via Conan

Installing SObjectizer And Adding It To conanfile.txt

To use SObjectizer via Conan it is necessary to do the following steps:

Add the corresponding remote to your conan:

conan remote add stiffstream https://api.bintray.com/conan/stiffstream/public

Add SObjectizer to conanfile.txt of your project:

[requires]
sobjectizer/[email protected]/stable

It may also be necessary to specify the shared option for SObjectizer. For example, to build SObjectizer as a static library:

[options]
sobjectizer:shared=False

Install dependencies for your project:

conan install SOME_PATH --build=missing

Adding SObjectizer To Your CMakeLists.txt

Please note that SObjectizer should be added to your CMakeLists.txt via find_package command:

...
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

find_package(sobjectizer CONFIG REQUIRED)
...
target_link_libraries(your_target sobjectizer::SharedLib) # Or sobjectizer::StaticLib

License

SObjectizer is distributed under the 3-clause BSD license. For license information please see the LICENSE file.

Comments
  • Agent shutdown

    Hi, I have a simple question: is it possible to stop an agent from inside of that agent? In other words, can an agent signal to the environment that it wants to be stopped/terminated/finished in a normal way?

    so_deregister_agent_coop_normally() and so_deregister_agent_coop() are close to what I need, however such functions deregister the entire cooperation. My current approach is based on parent-child relations:

    • create a "root" cooperation
    • for each agent, create a child cooperation and register the agent at that
    • when an agent needs to be deregistered, call so_deregister_agent_coop_normally() (this will result in shutting down that agent only)

    Is that a good approach?

    Just to give you an example of why I need this: I have an agent that communicates with a child process. When the child process sends back a particular data or it just terminates, that agent must be stopped.

    Many thanks.

    PS I hope to see you in 1-hour at the meetup :)

    opened by ilpropheta 25
  • Best practices to get a timeout?

    Hi everyone, my adventures with SObjectizer continue and I am very happy with it. I have a question that I suppose is related to agent states, but I need your expert advice.

    Suppose I have an agent that subscribes to a mailbox of images (e.g. OpenCV Mat) and shows them in real-time with OpenCV's imshow (it's just for troubleshooting and not intended for production):

    //                                                   v-- this is by value for simplicity
    so_subscribe(channel).event([](cv::Mat image) {
         imshow("Live Show", image);
         cv::waitKey(1);
    });
    

    The problem with this code is that not receiving images for some time causes the window to hang (the behavior is intended because of cv::waitKey). In another implementation (not based on SObjectizer) I have a way to get into another lambda when the channel does not get data for a specified amount of time.

    Imagine something like this:

    so_subscribe(channel)
               .event([](cv::Mat image) {
                    imshow("Live Show", image);
                    cv::waitKey(1);
               })
               .on_timeout(200ms, []{
                    cv::destroyAllWindows();
               });
    

    What is the best way to get something like this in SObjectizer?

    Many thanks!

    Marco

    opened by ilpropheta 18
  • Mbox with contention?

    Hi, I hope it's the right place for asking questions. Please, forgive me if this question is silly, I am learning the basics of SObjectizer.

    I have understood that a Mbox enables to dispatch messages to several consumers. As explained here, every consumer receives a copy of each message.

    Keeping N consumers, I am wondering if it's possible to have, instead, that every message is received and processed exactly by one consumer, the "first that is free" (clearly, this is a simplification). This way we have contention on the Mbox. It's the classical pattern "N jobs, M workers".

    How can I properly model this pattern with SObjectizer?

    Many thanks,

    Marco

    opened by ilpropheta 15
  • Handling any message type

    Hi, I have a very simple question: is it possible to make a subscription handler that will be called regardless of the message type?

    Something like this:

    so_subscribe(board_).event([](any_message_t m){
       // do something
    });
    
    // ...
    send<int>(board_, 5); // handler called
    send<MyType>(board_, MyType{}); // handler called
    // ...
    

    What's the recommended practice here?

    Thanks.

    opened by ilpropheta 12
  • Synchronous shutdown

    I experiment with switching to sobjectizer in an existing project. By now, the objects in this project get created and destroyed dynamically and they have internal threading (event loops) that get shutdown and waited for in the destructor.

    I removed those internal threads and replaced them by companion actors that do the event handling. That works, but I want to make sure that an actor has actually shut down, before finishing the destructor. From what I see, this is currently not happening in a thread-safe manner...

    Example code

    
    class MyClassCompanion : public so_5::agent_t {
        MyClass& _parent;
        // do things with parent on events
    };
    
    
    class MyClass {
        MyClassCompanion* _actor;
        MyClass() {
                    so_5::introduce_child_coop(env.get_root_coop(), [&](so_5::coop_t& coop) {
                        _actor = coop.make_agent<MyClassCompanion>(*this);
                    });
        }
    
        ~MyClass() {
              _actor->so_deregister_agent_coop_normally();
        }
    };
    

    When I run valgrind on my solution I see, that sometimes the actor is still accessing its _parent, although the destructor had finished. That leads me to the conclusion that so_deregister_agent_coop_normally is not synchronous, that after it returns something is still running...

    1. is so_deregister_agent_coop_normally thread-safe and synchronous?
    2. is the way of spawning agents in the constructor correct? is that thread-safe?
    opened by gabm 10
  • Classical endless loop?

    Hi, this is probably a simple question but I am wondering which are your recommended answers.

    I need to encapsulate a classical endless loop into a SObjectizer agent. Something like this:

    class Worker
    {
    public:
       Worker()
       {
         // A lambda cannot be assigned to a thread directly; start a thread that runs it.
         m_thread = thread([this]{
             while (!m_stop)
             {
                // ... do something
             }
          });
       }
    
       void StopAndWait()
       {
          m_stop = true;
          m_thread.join();
       }
    
    private:
       thread m_thread;
       atomic<bool> m_stop = false;
    };
    

    Also, I would like to remove "StopAndWait" and let SObjectizer handle shutdown automatically as it happens normally.

    I guess this can be modeled with states. Do you recommend any common implementations?

    Many thanks.

    opened by ilpropheta 7
  • The application consumes almost 100% CPU (1 thread) all the time.

    Hi, I am using SObjectizer in our projects. I have some observation on the CPU utilisation. The following is a simple code with one thread handling messages on 2 channels.

    // Channel 1 -> process a request and post a message to channel 2
    // Channel 2 -> process a request and post a message to channel 1

    And the loop continues.

    Observation: The application consumes almost 100% CPU (1 thread) all the time.

    Query: Please help make it CPU efficient / event based? I admit I am still learning the SObjectizer library.

    Sample Code:

    #include <thread>
    #include <memory>
    #include <random>
    
    // SObjectizer is one of a few cross-platform and OpenSource "actor frameworks" for C++. 
    // But SObjectizer supports not only Actor Model, but also Publish-Subscribe Model and CSP-like channels. 
    // ver 5.7
    #include <so_5/all.hpp>
    
    // Message structure
    struct poll_device_events_request_t { };
    
    void thread_func(so_5::mchain_t primary_channel, so_5::mchain_t secondary_channel)
    {
      // The stuff necessary for random pause generation.
      std::random_device rd;
      std::mt19937 generator{rd()};
      std::uniform_int_distribution<> pause_generator{5000, 10000}; // Delay between 5 - 10 seconds.

      // This flag will be set to 'true' when some of channels will be closed.
      bool stop = false;
      auto prepared = so_5::prepare_select(
        so_5::from_all().handle_all()
          // If some channel gets closed we should set out 'stop' flag.
          .on_close([&stop](const auto &) { stop = true; })
          // A predicate for stopping select() function.
          .stop_on([&stop]{ return stop; }),
    
        // Read and handle poll_device_events_request messages from channel_.
        so_5::receive_case(primary_channel, 
          [&](poll_device_events_request_t){
            
            std::cout << "Received message on primary channel..." << std::endl;
            // Do some work.
            
            // Generate a random pause.
            const std::chrono::milliseconds pause{pause_generator(generator)};
            
            // Send a message to secondary channel - delayed message.
            so_5::send_delayed<poll_device_events_request_t>(secondary_channel, pause);
          }
        ),
        so_5::receive_case(secondary_channel, 
          [&](poll_device_events_request_t){
            std::cout << "Received message on secondary channel..." << std::endl;
            // Do some work.
            
            // Generate a random pause for processing of that request.
            const std::chrono::milliseconds pause{pause_generator(generator)};
            
            // Delay processing of that request by using delayed message.
            so_5::send_delayed<poll_device_events_request_t>(primary_channel, pause);
          }
        )
      );
      
      while (!stop) {
        so_5::select( prepared );
      }
    }
    
    // Let's create two channels - 
    // Channel 1 -> process a request and post a message to channel 2
    // Channel 2 -> process a request and post a message to channel 1
      
    int main() {
      std::shared_ptr<so_5::wrapped_env_t> s_node_env_obj_ = std::make_shared<so_5::wrapped_env_t>();
      
      auto primary_channel_ = so_5::create_mchain(*s_node_env_obj_);
      auto primary_channel_closer = so_5::auto_close_drop_content(primary_channel_);
    	
      auto secondary_channel_ = so_5::create_mchain(*s_node_env_obj_);
      auto secondary_channel_closer = so_5::auto_close_drop_content(secondary_channel_);
      
      // Thread objects for workers.
      std::thread nthread_ {thread_func, primary_channel_, secondary_channel_};
      auto nthread_joiner = so_5::auto_join( nthread_ );
    
      // Initiate the first message.
      so_5::send<poll_device_events_request_t>(primary_channel_);
    
      nthread_.join();
      return 0;
    }
    
    opened by vidcentum 7
  • Any way to wait until an agent is ready to receive messages?

    Hi, I have a very simple question. Suppose I am testing an application where I have some agents registered to a global environment. I would like to be sure that such agents are ready to get messages from channels they have subscribed to. This way I can generate fake messages and make expectations.

    At the moment, I just register them and wait - say - 10ms to be on the safe side. However, this is not portable and can fail sporadically on CI builds. In general, I don't like "sleep" of any sort.

    Is it possible to wait for such a "state" without adding custom logic? For example, agents have the ensure_binding_finished method. Is this what I am looking for?

    For the moment, I can't use the experimental "testing" capabilities in SObjectizer since I have some stable code that can't easily be adapted to the "scenario" semantics.

    Thanks!

    opened by ilpropheta 6
  • Drop oldest agent message limit?

    Hi everybody, speaking about message limits, I know that limit_then_drop makes the agent ignore new messages. I am wondering if it's possible to drop oldest messages somehow (as it happens with message chains' overflow policy so_5::mchain_props::overflow_reaction_t::remove_oldest).

    Many thanks!

    Marco

    opened by ilpropheta 6
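
The drop-oldest behaviour asked about in this issue can be pictured without SObjectizer at all. What follows is a minimal standard-library sketch, not SObjectizer code: `drop_oldest_buffer` and `example_drop_oldest` are hypothetical names invented for illustration. It only mimics the policy the issue attributes to `so_5::mchain_props::overflow_reaction_t::remove_oldest`, where a full queue evicts its oldest item to make room for a new one.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical helper for illustration only; not part of SObjectizer.
template <typename T>
class drop_oldest_buffer {
public:
  explicit drop_oldest_buffer(std::size_t capacity) : capacity_{capacity} {
    assert(capacity_ > 0 && "capacity must be positive");
  }

  // Stores a new item. Returns true if the oldest item was evicted to make room.
  bool push(T item) {
    bool evicted = false;
    if (items_.size() == capacity_) {
      items_.pop_front();
      evicted = true;
    }
    items_.push_back(std::move(item));
    return evicted;
  }

  // Removes and returns the oldest stored item. The buffer must not be empty.
  T pop() {
    assert(!items_.empty());
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

  std::size_t size() const { return items_.size(); }

private:
  std::size_t capacity_;
  std::deque<T> items_;
};

// Usage: with room for two items, pushing a third evicts the first.
inline void example_drop_oldest() {
  drop_oldest_buffer<int> buffer(2);
  buffer.push(1);
  buffer.push(2);
  const bool evicted = buffer.push(3);
  assert(evicted);
  assert(buffer.pop() == 2);
  assert(buffer.pop() == 3);
}
```

The sketch is single-threaded on purpose. A real agent queue would also need synchronization, which is exactly the part a framework is expected to provide.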
  • Generic message limits for all message types

    Hello.

    I've read about message limits https://github.com/Stiffstream/sobjectizer/wiki/SO-5.7-InDepth-Message-Limits and a post on Habrahabr about the collector-performer pattern.

    But I was a little surprised that message limits can be defined only for specific message types. It seems that it's not possible (at least not easily) to define message limits for all message types in an mbox.

    Is there any way to do this in SObjectizer? For now I see only one option: derive all message types from some class generic_message_t and then define limit_then_X<generic_message_t> (I hope it works).

    opened by zamazan4ik 6
  • Compilation error with Clang 10

    Hi! SObjectizer 5.7.0 cannot be compiled with Clang 10 (seems like the reason is in the updated libstdc++). I've got the following errors:

    In file included from /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.cpp:13:
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:427:13: error: no type named 'string_view' in namespace 'std'
            const std::string_view data_sources_name_base,
                  ~~~~~^
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:458:13: error: no type named 'string_view' in namespace 'std'
            const std::string_view data_sources_name_base,
                  ~~~~~^
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:491:37: error: no member named 'string_view' in namespace 'std'
                    return make_dispatcher( env, std::string_view{}, thread_count );
                                                 ~~~~~^
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:524:10: error: no member named 'string_view' in namespace 'std'
                                    std::string_view{},
                                    ~~~~~^
    In file included from /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.cpp:12:
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:412:13: error: no type named 'string_view' in namespace 'std'
            const std::string_view data_sources_name_base,
                  ~~~~~^
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:443:13: error: no type named 'string_view' in namespace 'std'
            const std::string_view data_sources_name_base,
                  ~~~~~^
    /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:476:37: error: no member named 'string_view' in namespace 'std'
                    return make_dispatcher( env, std::string_view{}, thread_count );
                                                 ~~~~~^
    3 errors generated.
    

    As far as I can see, you just forgot to add #include <string_view> to pub.hpp.

    Hope it'll help.

    opened by zamazan4ik 6
  • Should exception_t::raise receive error_descr as string_view instead of std::string?

    Here: https://github.com/Stiffstream/sobjectizer/blob/3ac0ca9c255e72cc4a5cc657378fd456fe7cf0d1/dev/so_5/exception.cpp#L17-L21

    It seems that so_5::exception_t::raise can receive std::string_view instead of std::string.

    opened by eao197 0
  • [idea] Passing of some message_sink_t to subscribe_event_handler instead of agent_t

    NOTE. This idea is for the next big release of SObjectizer (probably breaking compatibility with 5.7-branch). It's unknown at the moment when work on that big release could be started.

    In the current version of SObjectizer, only agents can subscribe to messages passed to message boxes. This is reflected in the signature of the abstract_message_box_t::subscribe_event_handler method:

    //! Add the message handler.
    virtual void
    subscribe_event_handler(
    	//! Message type.
    	const std::type_index & type_index,
    	//! Optional message limit for that message type.
    	const message_limit::control_block_t * limit,
    	//! Agent-subscriber.
    	agent_t & subscriber ) = 0;
    

    This prevents some interesting tricks like the chaining of mboxes. For example, there could be a standard MPMC mbox to which some messages are sent. An instance of so_5::extra::mboxes::round_robin mbox could be subscribed to that MPMC mbox as a subscriber. And one of the subscribers to the round_robin mbox could be a so_5::extra::mboxes::retained_mbox. And so on.

    It seems that there could be an abstract interface named message_sink_t that can be used as a destination for messages. Class agent_t would implement that interface. Message boxes and message chains could implement that interface too.

    That would make it possible to subscribe an mbox to another mbox, which in turn opens up the possibility of creating chains of mboxes like the one described above.

    I think it can make SObjectizer even more flexible. But this idea is at a very early stage and needs additional thinking and experimenting.

    opened by eao197 2
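
The chaining described in this idea can be pictured with a small standard-library sketch. It is only an illustration under stated assumptions, not a proposal for the real design: `message_sink_t` borrows the name suggested in the issue, while `broadcast_mbox`, `round_robin_mbox`, `recording_agent`, the `push` method and the string payload are all hypothetical and unrelated to SObjectizer's actual classes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical interface named after the idea in the issue; not SObjectizer's API.
struct message_sink_t {
  virtual ~message_sink_t() = default;
  virtual void push(const std::string& msg) = 0;
};

// Delivers every message to every subscriber (fan-out, like an MPMC mbox).
class broadcast_mbox final : public message_sink_t {
  std::vector<message_sink_t*> subscribers_;
public:
  void subscribe(message_sink_t& sink) { subscribers_.push_back(&sink); }
  void push(const std::string& msg) override {
    for (auto* sink : subscribers_) sink->push(msg);
  }
};

// Delivers each message to exactly one subscriber, taking turns.
class round_robin_mbox final : public message_sink_t {
  std::vector<message_sink_t*> subscribers_;
  std::size_t next_ = 0;
public:
  void subscribe(message_sink_t& sink) { subscribers_.push_back(&sink); }
  void push(const std::string& msg) override {
    if (subscribers_.empty()) return;
    subscribers_[next_]->push(msg);
    next_ = (next_ + 1) % subscribers_.size();
  }
};

// Stand-in for an agent: it just records what it receives.
class recording_agent final : public message_sink_t {
public:
  std::vector<std::string> received;
  void push(const std::string& msg) override { received.push_back(msg); }
};

// Usage: a round-robin mbox is subscribed to a broadcast mbox like any other sink.
inline void example_chained_mboxes() {
  broadcast_mbox top;
  round_robin_mbox workers;
  recording_agent first;
  recording_agent second;
  workers.subscribe(first);
  workers.subscribe(second);
  top.subscribe(workers);

  top.push("m1");
  top.push("m2");
  assert(first.received.size() == 1);
  assert(second.received.size() == 1);
}
```

Because every destination implements the same interface, the broadcast mbox does not need to know whether a subscriber is an agent or another mbox.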
  • Use different message types or different mboxes?

    I'm quite impressed by SObjectizer and I want to convert my application to use it. However, at the moment I'm having trouble understanding the design I should follow when defining messages and mboxes:

    When should I use different message types and when should I use the same types on different mailboxes?

    My application controls a number of identical devices (the devices are not known until start-up). Most of the messages to and from the devices are of the same basic type (a triplet of real numbers), although they mean different things. My idea is to have several actors which listen to messages from all devices and do some state estimation, and a few which consume both estimations and observations and act as controllers.

    In my message broker (MQTT) I have one topic per device (e.g. "device1/measured", "device1/setpoint", "device1/status") because that's the way MQTT is supposed to be used, but I don't believe it is a good idea to have one mbox per device in the application.

    I guess my options are:

    1. One generic "board" mbox, with different message subclasses to identify the topic of the message (e.g. define "Measured", "Setpoint", "Status", and derive both "Measured" and "Setpoint" from the same class).
    2. One mbox per topic, using the same class for messages whose data is essentially the same (e.g. have "Measured" and "Setpoint" mboxes, and publish the same type of objects to both).
    3. A combination of (1) and (2): several mboxes and different message classes too.

    Regards,

    Juan.

    opened by jcarrano 4
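
Option (2) from this question, one destination per topic with a shared payload type, can be sketched in plain C++. This is an illustration only and makes no claim about which option suits SObjectizer best: `triplet`, `topic_board`, `subscribe` and `publish` are hypothetical names, and the device name is carried as an ordinary argument rather than as part of the topic.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// The same payload type is used for every topic: a triplet of real numbers.
struct triplet {
  double x;
  double y;
  double z;
};

// Hypothetical stand-in for a set of per-topic mboxes; not SObjectizer's API.
class topic_board {
  using callback = std::function<void(const std::string& device, const triplet& value)>;
  std::map<std::string, std::vector<callback>> subscribers_;
public:
  void subscribe(const std::string& topic, callback cb) {
    subscribers_[topic].push_back(std::move(cb));
  }

  // Delivers the value to all subscribers of the topic and returns how many there were.
  std::size_t publish(const std::string& topic, const std::string& device,
                      const triplet& value) const {
    const auto it = subscribers_.find(topic);
    if (it == subscribers_.end()) return 0;
    for (const auto& cb : it->second) cb(device, value);
    return it->second.size();
  }
};

// Usage: the topic, not the message type, tells "measured" from "setpoint".
inline void example_topic_board() {
  topic_board board;
  double measured_x = 0.0;
  int setpoints = 0;
  board.subscribe("measured",
                  [&](const std::string&, const triplet& v) { measured_x += v.x; });
  board.subscribe("setpoint", [&](const std::string&, const triplet&) { ++setpoints; });

  const triplet value{1.5, 0.0, 0.0};
  board.publish("measured", "device1", value);
  assert(measured_x == 1.5);
  assert(setpoints == 0);
}
```

The trade-off this makes visible: the compiler no longer distinguishes a measurement from a setpoint, so a handler subscribed to the wrong topic goes unnoticed, whereas distinct message types as in option (1) would catch that at compile time.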
  • A possibility to subscribe an event handler to a base type of a message

    NOTE. This issue is just a reminder that SObjectizer needs a solution for use cases like the one described in #24. I don't know how and when this will be implemented (or whether it can be implemented at all), but the reminder will help to define the scope of a new SObjectizer release when appropriate resources become available.

    The task is to allow writing something like this:

    class image_base : public so_5::message_t {...};
    class image_vendor_A : public image_base {...};
    class image_vendor_B : public image_base {...};
    class image_vendor_C : public image_base {...};
    
    // The first handler.
    void first_handler::so_define_agent() {
      so_subscribe_self()
        // Handles only one type of images.
        .event([this](mhood_t<image_vendor_A> cmd) {...})
        // All other messages are just logged.
        .event([this](mhood_t<image_base> cmd) { log_image(*cmd); });
    }
    ...
    // Another handler.
    void second_handler::so_define_agent() {
      so_subscribe_self()
        // Handles two types.
        .event([this](mhood_t<image_vendor_B> cmd) {...})
        .event([this](mhood_t<image_vendor_C> cmd) {...})
        // All other messages are just redirected as is.
        .event([this](mhood_t<image_base> cmd) {
          so_5::send(new_dest, cmd);
        });
    }
    
    // Yet another handler.
    void third_handler::so_define_agent() {
      // Redirects all messages to another destination.
      so_subscribe_self().event([this](mhood_t<image_base> cmd) {
        so_5::send(new_dest, cmd);
      });
    }
    

    Additional requirement. It would be great to have that functionality for handling messages from mchains too:

    so_5::receive(from(mchain).handle_all(),
      [](so_5::mhood_t<image_vendor_A> cmd) {...},
      [](so_5::mhood_t<image_vendor_C> cmd) {...},
      [new_dest](so_5::mhood_t<image_base> cmd) { so_5::send(new_dest, cmd); });
    
    opened by eao197 1
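
The lookup order wished for in this issue (most specific handler first, base-type handler as a fallback) can be sketched outside SObjectizer. The following is a minimal illustration under stated assumptions: `handler_table`, `on`, `on_base` and `dispatch` are hypothetical names, the message classes are simplified stand-ins that do not derive from `so_5::message_t`, and only an exact type match or the single base fallback is supported, not arbitrary intermediate base classes.

```cpp
#include <cassert>
#include <functional>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

// Simplified stand-ins for the message hierarchy from the issue.
struct image_base { virtual ~image_base() = default; };
struct image_vendor_A : image_base {};
struct image_vendor_B : image_base {};

// Hypothetical dispatcher: exact-type handlers first, then the base-type fallback.
class handler_table {
  using callback = std::function<void(const image_base&)>;
  std::unordered_map<std::type_index, callback> exact_;
  callback fallback_;
public:
  // Registers a handler for exactly Msg (which must derive from image_base).
  template <typename Msg, typename F>
  void on(F handler) {
    exact_[std::type_index(typeid(Msg))] =
        [handler](const image_base& m) mutable { handler(static_cast<const Msg&>(m)); };
  }

  // Registers the handler used when no exact-type handler exists.
  void on_base(callback handler) { fallback_ = std::move(handler); }

  // Returns true if some handler was called.
  bool dispatch(const image_base& m) const {
    const auto it = exact_.find(std::type_index(typeid(m)));  // dynamic type of m
    if (it != exact_.end()) {
      it->second(m);
      return true;
    }
    if (fallback_) {
      fallback_(m);
      return true;
    }
    return false;
  }
};

// Usage: vendor A has its own handler, everything else goes to the base handler.
inline void example_base_fallback() {
  handler_table table;
  int vendor_a_seen = 0;
  int logged = 0;
  table.on<image_vendor_A>([&](const image_vendor_A&) { ++vendor_a_seen; });
  table.on_base([&](const image_base&) { ++logged; });

  table.dispatch(image_vendor_A{});  // exact match
  table.dispatch(image_vendor_B{});  // falls back to the base handler
  assert(vendor_a_seen == 1);
  assert(logged == 1);
}
```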
  • User payload for event handler subscription

    SObjectizer allows agents to be prioritized, but often that is not enough. There are two other levels of prioritization. The first is the level of messages: a dispatcher can define a specific class (derived from message_t) from which prioritized messages must inherit, and resolve message priority through that class. This is great, but may sometimes be too expensive. The other is the level of event handlers. It is powerful enough and cheap, and it is the best solution for my case. Unfortunately, there is currently no way to use that level. I had to patch SObjectizer to pass a priority through subscription_bind_t::event and store it in execution_hint_t, but I wish SObjectizer had a native way to do it.

    An optional argument of subscription_bind_t::event for user payload looks like a suitable way to do this without breaking backward compatibility. It may also open up other possibilities for custom dispatchers.

    opened by zumm 10
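
What handler-level priority would mean for a dispatcher can be sketched with a plain priority queue. This is an assumption-laden illustration, not the patch mentioned in the issue: `demand`, `by_priority`, `priority_demand_queue` and `example_handler_priority` are hypothetical names, and the integer priority simply stands in for the user payload that the issue proposes to attach to a subscription.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical demand: a ready-to-run handler call plus its subscription's priority.
struct demand {
  int priority;              // larger values run first
  unsigned long long seq;    // keeps FIFO order among equal priorities
  std::function<void()> call;
};

// Orders demands so that the queue's top is the highest priority, oldest first.
struct by_priority {
  bool operator()(const demand& a, const demand& b) const {
    if (a.priority != b.priority) return a.priority < b.priority;
    return a.seq > b.seq;
  }
};

class priority_demand_queue {
  std::priority_queue<demand, std::vector<demand>, by_priority> queue_;
  unsigned long long next_seq_ = 0;
public:
  void push(int priority, std::function<void()> call) {
    queue_.push(demand{priority, next_seq_++, std::move(call)});
  }

  // Runs all queued handlers, highest priority first.
  void run_all() {
    while (!queue_.empty()) {
      auto call = queue_.top().call;  // copy, because top() returns a const reference
      queue_.pop();
      call();
    }
  }
};

// Usage: the high-priority handler runs first even though it was queued later.
inline void example_handler_priority() {
  priority_demand_queue queue;
  std::string order;
  queue.push(1, [&order] { order += "low;"; });
  queue.push(5, [&order] { order += "high;"; });
  queue.run_all();
  assert(order == "high;low;");
}
```

The sequence number is there so that handlers with equal priority keep their arrival order, which `std::priority_queue` alone does not guarantee.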
  • Default dispatcher binder of child coops

    There is a way to set a dispatcher binder for a coop. This binder will be used by default for all agents of that coop instead of the default binder from the environment infrastructure. Is there a way to force child coops of that coop to use this binder by default too?

    opened by zumm 3
Releases (v.5.7.4.2)
Owner
Stiffstream
We create and support OpenSource-tools to make C++ development easier
A framework for decentralised production and redemption of single-use codes

Xenium Xenium is a noninteractive protocol for producing and redeeming single-use claim codes. What this means in practice is that you can have a devi

Nick Johnson 31 Sep 24, 2022
Laughably simple Actor concurrency framework for C++20

Light Actor Framework Concurrency is a breeze. Also a nightmare, if you ever used synchronization techniques. Mostly a nightmare, though. This tiny li

Josip Palavra 93 Dec 27, 2022
High Performance Linux C++ Network Programming Framework based on IO Multiplexing and Thread Pool

Kingpin is a C++ network programming framework based on TCP/IP + epoll + pthread, aims to implement a library for the high concurrent servers and clie

null 23 Oct 19, 2022
Concurrency Kit 2.1k Jan 4, 2023
A small C OpenCL wrapper

oclkit, plain and stupid OpenCL helper oclkit is a small set of C functions, to avoid writing the same OpenCL boiler plate over and over again, yet ke

Matthias Vogelgesang 15 Jul 22, 2022
Small library helping you with basic stuff like getting metrics out of your code, thread naming, etc.

CommonPP commonpp is a multi purpose library easing very few operations like: Getting metrics out of your program (counter, gauge, statistical descrip

Thomas Sanchez 28 Oct 31, 2022
Bolt is a C++ template library optimized for GPUs. Bolt provides high-performance library implementations for common algorithms such as scan, reduce, transform, and sort.

Bolt is a C++ template library optimized for heterogeneous computing. Bolt is designed to provide high-performance library implementations for common

null 360 Dec 27, 2022
Kokkos C++ Performance Portability Programming EcoSystem: The Programming Model - Parallel Execution and Memory Abstraction

Kokkos: Core Libraries Kokkos Core implements a programming model in C++ for writing performance portable applications targeting all major HPC platfor

Kokkos 1.2k Jan 5, 2023
A C++17 thread pool for high-performance scientific computing.

We present a modern C++17-compatible thread pool implementation, built from scratch with high-performance scientific computing in mind. The thread pool is implemented as a single lightweight and self-contained class, and does not have any dependencies other than the C++17 standard library, thus allowing a great degree of portability

Barak Shoshany 1.1k Jan 4, 2023
C++-based high-performance parallel environment execution engine for general RL environments.

EnvPool is a highly parallel reinforcement learning environment execution engine which significantly outperforms existing environment executors. With

Sea AI Lab 709 Dec 30, 2022
Thread-pool-cpp - High performance C++11 thread pool

thread-pool-cpp It is highly scalable and fast. It is header only. No external dependencies, only standard library needed. It implements both work-ste

Andrey Kubarkov 542 Dec 17, 2022
Async++ concurrency framework for C++11

Async++ Async++ is a lightweight concurrency framework for C++11. The concept was inspired by the Microsoft PPL library and the N3428 C++ standard pro

Amanieu d'Antras 1.1k Dec 30, 2022
A task scheduling framework designed for the needs of game developers.

Intel Games Task Scheduler (GTS) To the documentation. Introduction GTS is a C++ task scheduling framework for multi-processor platforms. It is design

null 424 Jan 3, 2023
Elle - The Elle coroutine-based asynchronous C++ development framework.

Elle, the coroutine-based asynchronous C++ development framework Elle is a collection of libraries, written in modern C++ (C++14). It contains a rich

Infinit 466 Jan 1, 2023
KRATOS Multiphysics ("Kratos") is a framework for building parallel, multi-disciplinary simulation software

KRATOS Multiphysics ("Kratos") is a framework for building parallel, multi-disciplinary simulation software, aiming at modularity, extensibility, and high performance. Kratos is written in C++, and counts with an extensive Python interface.

KratosMultiphysics 774 Dec 29, 2022
Multi-backend implementation of SYCL for CPUs and GPUs

hipSYCL - a SYCL implementation for CPUs and GPUs hipSYCL is a modern SYCL implementation targeting CPUs and GPUs, with a focus on leveraging existing

Aksel Alpay 609 Dec 26, 2022
Arcana.cpp - Arcana.cpp is a collection of helpers and utility code for low overhead, cross platform C++ implementation of task-based asynchrony.

Arcana.cpp Arcana is a collection of general purpose C++ utilities with no code that is specific to a particular project or specialized technology are

Microsoft 67 Nov 23, 2022
Parallel-util - Simple header-only implementation of "parallel for" and "parallel map" for C++11

parallel-util A single-header implementation of parallel_for, parallel_map, and parallel_exec using C++11. This library is based on multi-threading on

Yuki Koyama 27 Jun 24, 2022
Fast, generalized, implementation of the Chase-Lev lock-free work-stealing deque for C++17

riften::Deque A bleeding-edge lock-free, single-producer multi-consumer, Chase-Lev work stealing deque as presented in the paper "Dynamic Circular Wor

Conor Williams 120 Dec 22, 2022