As the documentation states, when I disconnect a slot there is a chance that an emission is still running on another thread. This causes trouble in my application, because after disconnecting I cannot be sure that my callback object is no longer in use.
I did not find a solution for this in signals2 (or I overlooked it), so I came up with a special scoped_connection that captures an RAII tracker object in the slot (see the template constructor of PostEmissionSafeConnection); this way the connection knows when all copies of the slot are gone. The RAII tracker object (here: HandlerTrace) updates an atomic reference counter (TraceCounter) to keep track of the slot copies. By the way, it was interesting to see, via that counter, how many copies of the slot object are created during registration - maybe that can be optimized too?
I wonder whether there is a better solution to this problem, or whether this implementation sketch could evolve into a part of the signals2 library. I used C++20 here, but that requirement can be relaxed.
What do you think about this idea?
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <latch>
#include <memory>
#include <thread>
#include <utility>
/// A scoped connection that blocks on destruction (and on move-assignment) until every copy of
/// the connected slot has been destroyed.
///
/// The constructor wraps the user handler in a lambda that also captures a HandlerTrace. Each
/// copy of that lambda increments a shared TraceCounter and each destruction decrements it, so a
/// count of zero means no copy of the handler exists any more.
///
/// The base class methods (disconnect(), connected(), blocked(), ...) can still be used.
///
/// NOTE(review): this presumably relies on signals2 destroying the stored slot once the
/// connection is disconnected and no emission is using it any more - confirm against the Boost
/// version in use.
/// NOTE(review): destroying this object from inside its own handler would presumably block
/// forever, because the running slot copy cannot be released before the handler returns.
class PostEmissionSafeConnection : public boost::signals2::scoped_connection
{
public:
    /// Connects `handler` to `signal`, wrapped together with a trace object.
    ///
    /// @param signal   Object providing `connect(slot)`; the returned connection is stored in the base class.
    /// @param handler  Callable invoked without arguments; it is copied into the slot.
    template <typename TSignal, typename THandler>
    PostEmissionSafeConnection(TSignal &signal, const THandler &handler)
        : counter_(std::make_unique<TraceCounter>())
    {
        auto trackedHandler = [handlerTrace = HandlerTrace(*counter_), handler] { handler(); };
        boost::signals2::scoped_connection::operator=(signal.connect(std::move(trackedHandler)));
    }

    PostEmissionSafeConnection(const PostEmissionSafeConnection &) = delete;
    PostEmissionSafeConnection &operator=(const PostEmissionSafeConnection &) = delete;

    /// Takes over the connection and the counter of `other`; `other` is left without a counter,
    /// so its destructor does not wait.
    PostEmissionSafeConnection(PostEmissionSafeConnection &&other) noexcept
        : boost::signals2::scoped_connection(std::move(other))
        , counter_(std::move(other.counter_))
    { }

    /// Releases the currently held connection (blocking until its handler copies are gone) and
    /// then takes over the connection and the counter of `other`.
    PostEmissionSafeConnection &operator=(PostEmissionSafeConnection &&other) noexcept
    {
        // Without this guard a self-move would disconnect and tear down the live connection.
        if (this == &other)
            return *this;

        Release();
        boost::signals2::scoped_connection::operator=(std::move(other));
        counter_ = std::move(other.counter_);
        return *this;
    }

    /// Disconnects and blocks until all handler copies are released.
    ~PostEmissionSafeConnection() { Release(); }

private:
    // A shared counter which tracks the living HandlerTrace instances.
    // NOTE: This object is on the heap. This pins down its address, so the traces can refer to it
    // via a plain pointer. The connection can be moved, because it also refers to this fixed
    // address counter.
    struct TraceCounter
    {
        // Counts the very first trace; no other trace may exist yet.
        void RegisterCreation() noexcept
        {
            // Relaxed is enough: an increment publishes no data to other threads.
            [[maybe_unused]] const auto previousCount = traceCount_.fetch_add(1, std::memory_order_relaxed);
            assert(previousCount == 0);
        }

        // Counts a trace that was copied from a living trace.
        void RegisterCopy() noexcept
        {
            [[maybe_unused]] const auto previousCount = traceCount_.fetch_add(1, std::memory_order_relaxed);
            assert(previousCount > 0);
        }

        // Counts the destruction of a trace.
        // The counter must not be touched after the decrement: once the count is zero, the waiting
        // owner is free to destroy this object.
        void UnRegister() noexcept
        {
            // Release pairs with the acquire load in WaitForLastHandlerTrace(), so everything the
            // handler did happens-before the waiter continues.
            [[maybe_unused]] const auto previousCount = traceCount_.fetch_sub(1, std::memory_order_release);
            assert(previousCount > 0);
        }

        // Blocks the caller, polling once per millisecond, until no trace is alive.
        void WaitForLastHandlerTrace() const
        {
            using namespace std::literals;
            while (traceCount_.load(std::memory_order_acquire) != 0)
                std::this_thread::sleep_for(1ms);
        }

        std::atomic<std::size_t> traceCount_ = 0;
    };

    // RAII object captured in the handler, so that we can keep track of the living handlers.
    // A moved-from trace holds no counter and is not counted.
    struct HandlerTrace
    {
        HandlerTrace() = delete;

        explicit HandlerTrace(TraceCounter &counter)
            : counter_(&counter)
        {
            counter_->RegisterCreation();
        }

        HandlerTrace(const HandlerTrace &other)
            : counter_(other.counter_)
        {
            // `other` may be moved-from, in which case there is nothing to count.
            if (counter_)
                counter_->RegisterCopy();
        }

        HandlerTrace &operator=(const HandlerTrace &other)
        {
            if (this != &other)
            {
                // Register with the new counter first, so the count cannot drop to zero in
                // between when both traces share one counter.
                if (other.counter_)
                    other.counter_->RegisterCopy();
                Detach();
                counter_ = other.counter_;
            }
            return *this;
        }

        HandlerTrace(HandlerTrace &&other) noexcept
            : counter_(std::exchange(other.counter_, nullptr))
        { }

        HandlerTrace &operator=(HandlerTrace &&other) noexcept
        {
            if (this != &other)
            {
                // Give up the trace held so far; otherwise its count would leak and the owner
                // would wait forever.
                Detach();
                counter_ = std::exchange(other.counter_, nullptr);
            }
            return *this;
        }

        ~HandlerTrace() { Detach(); }

        // Unregisters from the counter (if any) and forgets it.
        void Detach() noexcept
        {
            if (auto *const held = std::exchange(counter_, nullptr))
                held->UnRegister();
        }

        TraceCounter *counter_ = nullptr;
    };

    // Disconnects the slot and blocks until all of its copies are destroyed.
    // Does nothing when the counter has been moved away or already released.
    void Release()
    {
        if (!counter_)
            return;
        disconnect();
        counter_->WaitForLastHandlerTrace();
        counter_.reset();
    }

    std::unique_ptr<TraceCounter> counter_;
};
// Verifies that destroying the connection while an emission is running on another thread
// blocks until the handler has finished.
TEST(SignalTest, PostUnsubscriptionEmission)
{
    using namespace std::literals;

    // Everything the handler and the emitting thread refer to is declared outside the inner
    // scope, so it outlives both the connection and the thread.
    boost::signals2::signal<void()> signal;
    std::latch handlerStarted(2);
    std::atomic<bool> handlerFinished{false};
    std::thread signalSenderThread;
    {
        PostEmissionSafeConnection connection(signal, [&handlerStarted, &handlerFinished] {
            handlerStarted.count_down();
            // Keep the handler busy long enough for the main thread to disconnect meanwhile.
            std::this_thread::sleep_for(500ms);
            handlerFinished = true;
        });
        signalSenderThread = std::thread([&signal] { signal(); });

        // Wait until the emission has really entered the handler.
        handlerStarted.arrive_and_wait();
        connection.disconnect();
        // The connection blocks its destruction until all handler instances are destroyed.
    }
    // Checked before joining: only the blocking destructor can have made this true by now.
    EXPECT_TRUE(handlerFinished.load());

    // Joined (not detached), so `signal` is not destroyed while the thread is still inside signal().
    signalSenderThread.join();
}