15 #ifndef FAIR_MQ_SHMEM_MANAGER_H_
16 #define FAIR_MQ_SHMEM_MANAGER_H_
22 #include <FairMQLogger.h>
23 #include <FairMQMessage.h>
24 #include <fairmq/ProgOptions.h>
25 #include <fairmq/tools/Strings.h>
27 #include <boost/date_time/posix_time/posix_time.hpp>
28 #include <boost/filesystem.hpp>
29 #include <boost/interprocess/ipc/message_queue.hpp>
30 #include <boost/interprocess/managed_shared_memory.hpp>
31 #include <boost/interprocess/sync/named_condition.hpp>
32 #include <boost/interprocess/sync/named_mutex.hpp>
33 #include <boost/process.hpp>
34 #include <boost/variant.hpp>
37 #include <condition_variable>
45 #include <unordered_map>
57 Manager(std::string shmId, std::string deviceId,
size_t size,
const ProgOptions* config)
58 : fShmId(std::move(shmId))
59 , fSegmentId(config ? config->GetProperty<uint16_t>(
"shm-segment-id", 0) : 0)
60 , fDeviceId(std::move(deviceId))
62 , fManagementSegment(boost::interprocess::open_or_create, std::string(
"fmq_" + fShmId +
"_mng").c_str(), 6553600)
63 , fShmVoidAlloc(fManagementSegment.get_segment_manager())
64 , fShmMtx(boost::interprocess::open_or_create, std::string(
"fmq_" + fShmId +
"_mtx").c_str())
65 , fRegionEventsCV(boost::interprocess::open_or_create, std::string(
"fmq_" + fShmId +
"_cv").c_str())
66 , fRegionEventsSubscriptionActive(false)
67 , fNumObservedEvents(0)
68 , fDeviceCounter(nullptr)
69 , fEventCounter(nullptr)
70 , fShmSegments(nullptr)
71 , fShmRegions(nullptr)
74 #ifdef FAIRMQ_DEBUG_MODE
76 , fShmMsgCounters(nullptr)
79 , fSendHeartbeats(true)
80 , fThrowOnBadAlloc(config ? config->GetProperty<bool>(
"shm-throw-bad-alloc", true) : true)
81 , fNoCleanup(config ? config->GetProperty<bool>(
"shm-no-cleanup", false) : false)
83 using namespace boost::interprocess;
85 bool mlockSegment =
false;
86 bool zeroSegment =
false;
87 bool autolaunchMonitor =
false;
88 std::string allocationAlgorithm(
"rbtree_best_fit");
90 mlockSegment = config->GetProperty<
bool>(
"shm-mlock-segment", mlockSegment);
91 zeroSegment = config->GetProperty<
bool>(
"shm-zero-segment", zeroSegment);
92 autolaunchMonitor = config->GetProperty<
bool>(
"shm-monitor", autolaunchMonitor);
93 allocationAlgorithm = config->GetProperty<std::string>(
"shm-allocation", allocationAlgorithm);
95 LOG(debug) <<
"ProgOptions not available! Using defaults.";
98 if (autolaunchMonitor) {
103 std::stringstream ss;
104 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
106 fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
108 fEventCounter = fManagementSegment.find<
EventCounter>(unique_instance).first;
111 LOG(debug) <<
"event counter found: " << fEventCounter->fCount;
113 LOG(debug) <<
"no event counter found, creating one and initializing with 0";
114 fEventCounter = fManagementSegment.construct<
EventCounter>(unique_instance)(0);
115 LOG(debug) <<
"initialized event counter with: " << fEventCounter->fCount;
119 auto it = fShmSegments->find(fSegmentId);
120 if (it == fShmSegments->end()) {
122 if (allocationAlgorithm ==
"rbtree_best_fit") {
123 fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string(
"fmq_" + fShmId +
"_m_" + std::to_string(fSegmentId)).c_str(), size));
124 fShmSegments->emplace(fSegmentId, AllocationAlgorithm::rbtree_best_fit);
125 }
else if (allocationAlgorithm ==
"simple_seq_fit") {
126 fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, std::string(
"fmq_" + fShmId +
"_m_" + std::to_string(fSegmentId)).c_str(), size));
127 fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
130 (fEventCounter->fCount)++;
133 if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
134 fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string(
"fmq_" + fShmId +
"_m_" + std::to_string(fSegmentId)).c_str()));
135 if (allocationAlgorithm !=
"rbtree_best_fit") {
136 LOG(warn) <<
"Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm <<
". Ignoring requested setting.";
137 allocationAlgorithm =
"rbtree_best_fit";
140 fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, std::string(
"fmq_" + fShmId +
"_m_" + std::to_string(fSegmentId)).c_str()));
141 if (allocationAlgorithm !=
"simple_seq_fit") {
142 LOG(warn) <<
"Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm <<
". Ignoring requested setting.";
143 allocationAlgorithm =
"simple_seq_fit";
148 ss <<
"shared memory segment '" <<
"fmq_" << fShmId <<
"_m_" << fSegmentId <<
"'."
149 <<
" Size: " << boost::apply_visitor(
SegmentSize{}, fSegments.at(fSegmentId)) <<
" bytes."
150 <<
" Available: " << boost::apply_visitor(
SegmentFreeMemory{}, fSegments.at(fSegmentId)) <<
" bytes."
151 <<
" Allocation algorithm: " << allocationAlgorithm;
152 LOG(debug) << ss.str();
153 }
catch(interprocess_exception& bie) {
154 LOG(error) <<
"Failed to create/open shared memory segment (" <<
"fmq_" << fShmId <<
"_m_" << fSegmentId <<
"): " << bie.what();
155 throw std::runtime_error(tools::ToString(
"Failed to create/open shared memory segment (",
"fmq_", fShmId,
"_m_", fSegmentId,
"): ", bie.what()));
159 LOG(debug) <<
"Locking the managed segment memory pages...";
160 if (mlock(boost::apply_visitor(
SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(
SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
161 LOG(error) <<
"Could not lock the managed segment memory. Code: " << errno <<
", reason: " << strerror(errno);
163 LOG(debug) <<
"Successfully locked the managed segment memory pages.";
166 LOG(debug) <<
"Zeroing the managed segment free memory...";
168 LOG(debug) <<
"Successfully zeroed the managed segment free memory.";
171 fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
173 fDeviceCounter = fManagementSegment.find<
DeviceCounter>(unique_instance).first;
175 if (fDeviceCounter) {
176 LOG(debug) <<
"device counter found, with value of " << fDeviceCounter->fCount <<
". incrementing.";
177 (fDeviceCounter->fCount)++;
178 LOG(debug) <<
"incremented device counter, now: " << fDeviceCounter->fCount;
180 LOG(debug) <<
"no device counter found, creating one and initializing with 1";
181 fDeviceCounter = fManagementSegment.construct<
DeviceCounter>(unique_instance)(1);
182 LOG(debug) <<
"initialized device counter with: " << fDeviceCounter->fCount;
185 #ifdef FAIRMQ_DEBUG_MODE
186 fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
187 fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
191 fHeartbeatThread = std::thread(&Manager::SendHeartbeats,
this);
199 static void StartMonitor(
const std::string&
id)
201 using namespace boost::interprocess;
203 named_mutex monitorStatus(open_only, std::string(
"fmq_" +
id +
"_ms").c_str());
204 LOG(debug) <<
"Found fairmq-shmmonitor for shared memory id " << id;
205 }
catch (interprocess_exception&) {
206 LOG(debug) <<
"no fairmq-shmmonitor found for shared memory id " <<
id <<
", starting...";
207 auto env = boost::this_process::environment();
209 std::vector<boost::filesystem::path> ownPath = boost::this_process::path();
211 if (
const char* fmqp = getenv(
"FAIRMQ_PATH")) {
212 ownPath.insert(ownPath.begin(), boost::filesystem::path(fmqp));
215 boost::filesystem::path p = boost::process::search_path(
"fairmq-shmmonitor", ownPath);
218 boost::process::spawn(p,
"-x",
"--shmid",
id,
"-d",
"-t",
"2000", env);
222 named_mutex monitorStatus(open_only, std::string(
"fmq_" +
id +
"_ms").c_str());
223 LOG(debug) <<
"Started fairmq-shmmonitor for shared memory id " << id;
225 }
catch (interprocess_exception&) {
226 std::this_thread::sleep_for(std::chrono::milliseconds(10));
227 if (++numTries > 1000) {
228 LOG(error) <<
"Did not get response from fairmq-shmmonitor after " << 10 * 1000 <<
" milliseconds. Exiting.";
229 throw std::runtime_error(tools::ToString(
"Did not get response from fairmq-shmmonitor after ", 10 * 1000,
" milliseconds. Exiting."));
234 LOG(warn) <<
"could not find fairmq-shmmonitor in the path";
239 void Interrupt() { fInterrupted.store(
true); }
240 void Resume() { fInterrupted.store(
false); }
243 if (fMsgCounter.load() != 0) {
244 LOG(error) <<
"Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
245 throw MessageError(tools::ToString(
"Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
248 bool Interrupted() {
return fInterrupted.load(); }
250 std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(
const size_t size,
251 const int64_t userFlags,
252 RegionCallback callback,
253 RegionBulkCallback bulkCallback,
254 const std::string& path =
"",
257 using namespace boost::interprocess;
259 std::pair<mapped_region*, uint16_t> result;
263 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
265 RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
268 LOG(debug) <<
"region counter found, with value of " << rc->fCount <<
". incrementing.";
270 LOG(debug) <<
"incremented region counter, now: " << rc->fCount;
272 LOG(debug) <<
"no region counter found, creating one and initializing with 1";
273 rc = fManagementSegment.construct<RegionCounter>(unique_instance)(1);
274 LOG(debug) <<
"initialized region counter with: " << rc->fCount;
279 auto it = fRegions.find(
id);
280 if (it != fRegions.end()) {
281 LOG(error) <<
"Trying to create a region that already exists";
282 return {
nullptr,
id};
286 fShmRegions->emplace(
id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
288 auto r = fRegions.emplace(
id, std::make_unique<Region>(fShmId,
id, size,
false, callback, bulkCallback, path, flags));
291 r.first->second->StartReceivingAcks();
292 result.first = &(r.first->second->fRegion);
295 (fEventCounter->fCount)++;
297 fRegionEventsCV.notify_all();
301 }
catch (interprocess_exception& e) {
302 LOG(error) <<
"cannot create region. Already created/not cleaned up?";
303 LOG(error) << e.what();
308 Region* GetRegion(
const uint16_t
id)
310 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
311 return GetRegionUnsafe(
id);
314 Region* GetRegionUnsafe(
const uint16_t
id)
317 auto it = fRegions.find(
id);
318 if (it != fRegions.end()) {
319 return it->second.get();
323 RegionInfo regionInfo = fShmRegions->at(
id);
324 std::string path = regionInfo.fPath.c_str();
325 int flags = regionInfo.fFlags;
328 auto r = fRegions.emplace(
id, std::make_unique<Region>(fShmId,
id, 0,
true,
nullptr,
nullptr, path, flags));
329 return r.first->second.get();
330 }
catch (std::out_of_range& oor) {
331 LOG(error) <<
"Could not get remote region with id '" <<
id <<
"'. Does the region creator run with the same session id?";
332 LOG(error) << oor.what();
334 }
catch (boost::interprocess::interprocess_exception& e) {
335 LOG(warn) <<
"Could not get remote region for id '" <<
id <<
"'";
341 void RemoveRegion(
const uint16_t
id)
345 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
346 fShmRegions->at(
id).fDestroyed =
true;
347 (fEventCounter->fCount)++;
349 fRegionEventsCV.notify_all();
352 std::vector<fair::mq::RegionInfo> GetRegionInfo()
354 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
355 return GetRegionInfoUnsafe();
358 std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe()
360 std::vector<fair::mq::RegionInfo> result;
362 for (
const auto& e : *fShmRegions) {
364 info.managed =
false;
366 info.flags = e.second.fUserFlags;
367 info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
368 if (!e.second.fDestroyed) {
369 auto region = GetRegionUnsafe(info.id);
370 info.ptr = region->fRegion.get_address();
371 info.size = region->fRegion.get_size();
376 result.push_back(info);
379 for (
const auto& e : *fShmSegments) {
386 info.event = RegionEvent::created;
387 info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
388 info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
389 result.push_back(info);
390 }
catch (
const std::out_of_range& oor) {
391 LOG(error) <<
"could not find segment with id " << e.first;
392 LOG(error) << oor.what();
399 void SubscribeToRegionEvents(RegionEventCallback callback)
401 if (fRegionEventThread.joinable()) {
402 LOG(debug) <<
"Already subscribed. Overwriting previous subscription.";
403 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
404 fRegionEventsSubscriptionActive =
false;
406 fRegionEventsCV.notify_all();
407 fRegionEventThread.join();
409 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
410 fRegionEventCallback = callback;
411 fRegionEventsSubscriptionActive =
true;
412 fRegionEventThread = std::thread(&Manager::RegionEventsSubscription,
this);
415 bool SubscribedToRegionEvents() {
return fRegionEventThread.joinable(); }
417 void UnsubscribeFromRegionEvents()
419 if (fRegionEventThread.joinable()) {
420 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
421 fRegionEventsSubscriptionActive =
false;
423 fRegionEventsCV.notify_all();
424 fRegionEventThread.join();
426 fRegionEventCallback =
nullptr;
430 void RegionEventsSubscription()
432 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
433 while (fRegionEventsSubscriptionActive) {
434 auto infos = GetRegionInfoUnsafe();
435 for (
const auto& i : infos) {
436 auto el = fObservedRegionEvents.find({i.id, i.managed});
437 if (el == fObservedRegionEvents.end()) {
438 fRegionEventCallback(i);
439 fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
440 ++fNumObservedEvents;
442 if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
443 fRegionEventCallback(i);
444 el->second = i.event;
445 ++fNumObservedEvents;
453 fRegionEventsCV.wait(lock, [&] {
return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
457 void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
458 void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
460 #ifdef FAIRMQ_DEBUG_MODE
461 void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
462 void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
465 boost::interprocess::named_mutex& GetMtx() {
return fShmMtx; }
467 void SendHeartbeats()
469 std::string controlQueueName(
"fmq_" + fShmId +
"_cq");
470 std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
471 while (fSendHeartbeats) {
473 boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str());
474 boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100);
475 if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) {
476 fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() {
return !fSendHeartbeats; });
478 LOG(debug) <<
"control queue timeout";
480 }
catch (boost::interprocess::interprocess_exception& ie) {
481 fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() {
return !fSendHeartbeats; });
487 bool ThrowingOnBadAlloc()
const {
return fThrowOnBadAlloc; }
489 void GetSegment(uint16_t
id)
491 auto it = fSegments.find(
id);
492 if (it == fSegments.end()) {
495 SegmentInfo segmentInfo = fShmSegments->at(
id);
496 LOG(debug) <<
"Located segment with id '" <<
id <<
"'";
498 using namespace boost::interprocess;
500 if (segmentInfo.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
501 fSegments.emplace(
id, RBTreeBestFitSegment(open_only, std::string(
"fmq_" + fShmId +
"_m_" + std::to_string(
id)).c_str()));
503 fSegments.emplace(
id, SimpleSeqFitSegment(open_only, std::string(
"fmq_" + fShmId +
"_m_" + std::to_string(
id)).c_str()));
505 }
catch (std::out_of_range& oor) {
506 LOG(error) <<
"Could not get segment with id '" <<
id <<
"': " << oor.what();
507 }
catch (boost::interprocess::interprocess_exception& bie) {
508 LOG(error) <<
"Could not get segment with id '" <<
id <<
"': " << bie.what();
513 boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(
const void* ptr, uint16_t segmentId)
const
515 return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId));
517 void* GetAddressFromHandle(
const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
const
519 return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId));
522 char* Allocate(
const size_t size,
size_t alignment = 0)
527 while (ptr ==
nullptr) {
532 size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId));
533 if (size > segmentSize) {
534 throw MessageBadAlloc(tools::ToString(
"Requested message size (", size,
") exceeds segment size (", segmentSize,
")"));
536 if (alignment == 0) {
537 ptr =
reinterpret_cast<char*
>(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(fSegmentId)));
539 ptr =
reinterpret_cast<char*
>(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(fSegmentId)));
541 }
catch (boost::interprocess::bad_alloc& ba) {
543 if (ThrowingOnBadAlloc()) {
544 throw MessageBadAlloc(tools::ToString(
"shmem: could not create a message of size ", size,
", alignment: ", (alignment != 0) ? std::to_string(alignment) :
"default",
", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId))));
547 std::this_thread::sleep_for(std::chrono::milliseconds(50));
554 #ifdef FAIRMQ_DEBUG_MODE
555 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
556 IncrementShmMsgCounter(fSegmentId);
557 if (fMsgDebug->count(fSegmentId) == 0) {
558 (*fMsgDebug).emplace(fSegmentId, fShmVoidAlloc);
560 (*fMsgDebug).at(fSegmentId).emplace(
561 static_cast<size_t>(GetHandleFromAddress(ptr, fSegmentId)),
562 MsgDebug(getpid(), size, std::chrono::system_clock::now().time_since_epoch().count())
570 void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
572 boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
573 #ifdef FAIRMQ_DEBUG_MODE
574 boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
575 DecrementShmMsgCounter(segmentId);
577 (*fMsgDebug).at(segmentId).erase(handle);
578 }
catch(
const std::out_of_range& oor) {
579 LOG(debug) <<
"could not locate debug container for " << segmentId <<
": " << oor.what();
584 char* ShrinkInPlace(
size_t newSize,
char* localPtr, uint16_t segmentId)
586 return boost::apply_visitor(SegmentBufferShrink{newSize, localPtr}, fSegments.at(segmentId));
589 uint16_t GetSegmentId()
const {
return fSegmentId; }
593 using namespace boost::interprocess;
594 bool lastRemoved =
false;
596 UnsubscribeFromRegionEvents();
599 std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
600 fSendHeartbeats =
false;
602 fHeartbeatsCV.notify_one();
603 if (fHeartbeatThread.joinable()) {
604 fHeartbeatThread.join();
608 boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
610 (fDeviceCounter->fCount)--;
612 if (fDeviceCounter->fCount == 0) {
613 LOG(debug) <<
"Last segment user, " << (fNoCleanup ?
"skipping removal (--shm-no-cleanup is true)." :
"removing segment.");
616 LOG(debug) <<
"Other segment users present (" << fDeviceCounter->fCount <<
"), skipping removal.";
618 }
catch (interprocess_exception& e) {
619 LOG(error) <<
"Manager could not acquire lock: " << e.what();
622 if (lastRemoved && !fNoCleanup) {
630 std::string fDeviceId;
631 std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
632 boost::interprocess::managed_shared_memory fManagementSegment;
633 VoidAlloc fShmVoidAlloc;
634 boost::interprocess::named_mutex fShmMtx;
636 boost::interprocess::named_condition fRegionEventsCV;
637 std::thread fRegionEventThread;
638 bool fRegionEventsSubscriptionActive;
640 std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
641 uint64_t fNumObservedEvents;
643 DeviceCounter* fDeviceCounter;
644 EventCounter* fEventCounter;
645 Uint16SegmentInfoHashMap* fShmSegments;
646 Uint16RegionInfoHashMap* fShmRegions;
647 std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
649 std::atomic<bool> fInterrupted;
650 std::atomic<int32_t> fMsgCounter;
651 #ifdef FAIRMQ_DEBUG_MODE
652 Uint16MsgDebugMapHashMap* fMsgDebug;
653 Uint16MsgCounterHashMap* fShmMsgCounters;
656 std::thread fHeartbeatThread;
657 bool fSendHeartbeats;
658 std::mutex fHeartbeatsMtx;
659 std::condition_variable fHeartbeatsCV;
661 bool fThrowOnBadAlloc;