FairMQ  1.4.33
C++ Message Queuing Library and Framework
Manager.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIR_MQ_SHMEM_MANAGER_H_
16 #define FAIR_MQ_SHMEM_MANAGER_H_
17 
18 #include "Common.h"
19 #include "Region.h"
20 #include "Monitor.h"
21 
22 #include <FairMQLogger.h>
23 #include <FairMQMessage.h>
24 #include <fairmq/ProgOptions.h>
25 #include <fairmq/tools/Strings.h>
26 
27 #include <boost/date_time/posix_time/posix_time.hpp>
28 #include <boost/filesystem.hpp>
29 #include <boost/interprocess/ipc/message_queue.hpp>
30 #include <boost/interprocess/managed_shared_memory.hpp>
31 #include <boost/interprocess/sync/named_condition.hpp>
32 #include <boost/interprocess/sync/named_mutex.hpp>
33 #include <boost/process.hpp>
34 #include <boost/variant.hpp>
35 
36 #include <cstdlib> // getenv
37 #include <condition_variable>
38 #include <memory> // make_unique
39 #include <mutex>
40 #include <set>
41 #include <sstream>
42 #include <stdexcept>
43 #include <string>
44 #include <thread>
45 #include <unordered_map>
46 #include <utility> // pair
47 #include <vector>
48 
49 #include <sys/mman.h> // mlock
50 
51 namespace fair::mq::shmem
52 {
53 
54 class Manager
55 {
56  public:
57  Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
58  : fShmId(std::move(shmId))
59  , fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
60  , fDeviceId(std::move(deviceId))
61  , fSegments()
62  , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
63  , fShmVoidAlloc(fManagementSegment.get_segment_manager())
64  , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
65  , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
66  , fRegionEventsSubscriptionActive(false)
67  , fNumObservedEvents(0)
68  , fDeviceCounter(nullptr)
69  , fEventCounter(nullptr)
70  , fShmSegments(nullptr)
71  , fShmRegions(nullptr)
72  , fInterrupted(false)
73  , fMsgCounter(0)
74 #ifdef FAIRMQ_DEBUG_MODE
75  , fMsgDebug(nullptr)
76  , fShmMsgCounters(nullptr)
77 #endif
78  , fHeartbeatThread()
79  , fSendHeartbeats(true)
80  , fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
81  , fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
82  {
83  using namespace boost::interprocess;
84 
85  bool mlockSegment = false;
86  bool zeroSegment = false;
87  bool autolaunchMonitor = false;
88  std::string allocationAlgorithm("rbtree_best_fit");
89  if (config) {
90  mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
91  zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
92  autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
93  allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm);
94  } else {
95  LOG(debug) << "ProgOptions not available! Using defaults.";
96  }
97 
98  if (autolaunchMonitor) {
99  StartMonitor(fShmId);
100  }
101 
102  {
103  std::stringstream ss;
104  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
105 
106  fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
107 
108  fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
109 
110  if (fEventCounter) {
111  LOG(debug) << "event counter found: " << fEventCounter->fCount;
112  } else {
113  LOG(debug) << "no event counter found, creating one and initializing with 0";
114  fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
115  LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
116  }
117 
118  try {
119  auto it = fShmSegments->find(fSegmentId);
120  if (it == fShmSegments->end()) {
121  // no segment with given id exists, creating
122  if (allocationAlgorithm == "rbtree_best_fit") {
123  fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size));
124  fShmSegments->emplace(fSegmentId, AllocationAlgorithm::rbtree_best_fit);
125  } else if (allocationAlgorithm == "simple_seq_fit") {
126  fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size));
127  fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
128  }
129  ss << "Created ";
130  (fEventCounter->fCount)++;
131  } else {
132  // found segment with the given id, opening
133  if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
134  fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str()));
135  if (allocationAlgorithm != "rbtree_best_fit") {
136  LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
137  allocationAlgorithm = "rbtree_best_fit";
138  }
139  } else {
140  fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str()));
141  if (allocationAlgorithm != "simple_seq_fit") {
142  LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
143  allocationAlgorithm = "simple_seq_fit";
144  }
145  }
146  ss << "Opened ";
147  }
148  ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
149  << " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)) << " bytes."
150  << " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)) << " bytes."
151  << " Allocation algorithm: " << allocationAlgorithm;
152  LOG(debug) << ss.str();
153  } catch(interprocess_exception& bie) {
154  LOG(error) << "Failed to create/open shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what();
155  throw std::runtime_error(tools::ToString("Failed to create/open shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what()));
156  }
157 
158  if (mlockSegment) {
159  LOG(debug) << "Locking the managed segment memory pages...";
160  if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
161  LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
162  }
163  LOG(debug) << "Successfully locked the managed segment memory pages.";
164  }
165  if (zeroSegment) {
166  LOG(debug) << "Zeroing the managed segment free memory...";
167  boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
168  LOG(debug) << "Successfully zeroed the managed segment free memory.";
169  }
170 
171  fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
172 
173  fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
174 
175  if (fDeviceCounter) {
176  LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
177  (fDeviceCounter->fCount)++;
178  LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
179  } else {
180  LOG(debug) << "no device counter found, creating one and initializing with 1";
181  fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
182  LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
183  }
184 
185 #ifdef FAIRMQ_DEBUG_MODE
186  fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
187  fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
188 #endif
189  }
190 
191  fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
192  }
193 
194  Manager() = delete;
195 
196  Manager(const Manager&) = delete;
197  Manager operator=(const Manager&) = delete;
198 
199  static void StartMonitor(const std::string& id)
200  {
201  using namespace boost::interprocess;
202  try {
203  named_mutex monitorStatus(open_only, std::string("fmq_" + id + "_ms").c_str());
204  LOG(debug) << "Found fairmq-shmmonitor for shared memory id " << id;
205  } catch (interprocess_exception&) {
206  LOG(debug) << "no fairmq-shmmonitor found for shared memory id " << id << ", starting...";
207  auto env = boost::this_process::environment();
208 
209  std::vector<boost::filesystem::path> ownPath = boost::this_process::path();
210 
211  if (const char* fmqp = getenv("FAIRMQ_PATH")) {
212  ownPath.insert(ownPath.begin(), boost::filesystem::path(fmqp));
213  }
214 
215  boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
216 
217  if (!p.empty()) {
218  boost::process::spawn(p, "-x", "--shmid", id, "-d", "-t", "2000", env);
219  int numTries = 0;
220  do {
221  try {
222  named_mutex monitorStatus(open_only, std::string("fmq_" + id + "_ms").c_str());
223  LOG(debug) << "Started fairmq-shmmonitor for shared memory id " << id;
224  break;
225  } catch (interprocess_exception&) {
226  std::this_thread::sleep_for(std::chrono::milliseconds(10));
227  if (++numTries > 1000) {
228  LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting.";
229  throw std::runtime_error(tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting."));
230  }
231  }
232  } while (true);
233  } else {
234  LOG(warn) << "could not find fairmq-shmmonitor in the path";
235  }
236  }
237  }
238 
239  void Interrupt() { fInterrupted.store(true); }
240  void Resume() { fInterrupted.store(false); }
241  void Reset()
242  {
243  if (fMsgCounter.load() != 0) {
244  LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
245  throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
246  }
247  }
248  bool Interrupted() { return fInterrupted.load(); }
249 
250  std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(const size_t size,
251  const int64_t userFlags,
252  RegionCallback callback,
253  RegionBulkCallback bulkCallback,
254  const std::string& path = "",
255  int flags = 0)
256  {
257  using namespace boost::interprocess;
258  try {
259  std::pair<mapped_region*, uint16_t> result;
260 
261  {
262  uint16_t id = 0;
263  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
264 
265  RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
266 
267  if (rc) {
268  LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing.";
269  (rc->fCount)++;
270  LOG(debug) << "incremented region counter, now: " << rc->fCount;
271  } else {
272  LOG(debug) << "no region counter found, creating one and initializing with 1";
273  rc = fManagementSegment.construct<RegionCounter>(unique_instance)(1);
274  LOG(debug) << "initialized region counter with: " << rc->fCount;
275  }
276 
277  id = rc->fCount;
278 
279  auto it = fRegions.find(id);
280  if (it != fRegions.end()) {
281  LOG(error) << "Trying to create a region that already exists";
282  return {nullptr, id};
283  }
284 
285  // create region info
286  fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
287 
288  auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
289  // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
290 
291  r.first->second->StartReceivingAcks();
292  result.first = &(r.first->second->fRegion);
293  result.second = id;
294 
295  (fEventCounter->fCount)++;
296  }
297  fRegionEventsCV.notify_all();
298 
299  return result;
300 
301  } catch (interprocess_exception& e) {
302  LOG(error) << "cannot create region. Already created/not cleaned up?";
303  LOG(error) << e.what();
304  throw;
305  }
306  }
307 
308  Region* GetRegion(const uint16_t id)
309  {
310  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
311  return GetRegionUnsafe(id);
312  }
313 
314  Region* GetRegionUnsafe(const uint16_t id)
315  {
316  // remote region could actually be a local one if a message originates from this device (has been sent out and returned)
317  auto it = fRegions.find(id);
318  if (it != fRegions.end()) {
319  return it->second.get();
320  } else {
321  try {
322  // get region info
323  RegionInfo regionInfo = fShmRegions->at(id);
324  std::string path = regionInfo.fPath.c_str();
325  int flags = regionInfo.fFlags;
326  // LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
327 
328  auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, path, flags));
329  return r.first->second.get();
330  } catch (std::out_of_range& oor) {
331  LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
332  LOG(error) << oor.what();
333  return nullptr;
334  } catch (boost::interprocess::interprocess_exception& e) {
335  LOG(warn) << "Could not get remote region for id '" << id << "'";
336  return nullptr;
337  }
338  }
339  }
340 
341  void RemoveRegion(const uint16_t id)
342  {
343  fRegions.erase(id);
344  {
345  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
346  fShmRegions->at(id).fDestroyed = true;
347  (fEventCounter->fCount)++;
348  }
349  fRegionEventsCV.notify_all();
350  }
351 
352  std::vector<fair::mq::RegionInfo> GetRegionInfo()
353  {
354  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
355  return GetRegionInfoUnsafe();
356  }
357 
358  std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe()
359  {
360  std::vector<fair::mq::RegionInfo> result;
361 
362  for (const auto& e : *fShmRegions) {
364  info.managed = false;
365  info.id = e.first;
366  info.flags = e.second.fUserFlags;
367  info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
368  if (!e.second.fDestroyed) {
369  auto region = GetRegionUnsafe(info.id);
370  info.ptr = region->fRegion.get_address();
371  info.size = region->fRegion.get_size();
372  } else {
373  info.ptr = nullptr;
374  info.size = 0;
375  }
376  result.push_back(info);
377  }
378 
379  for (const auto& e : *fShmSegments) {
380  // make sure any segments in the session are found
381  GetSegment(e.first);
382  try {
384  info.managed = true;
385  info.id = e.first;
386  info.event = RegionEvent::created;
387  info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
388  info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
389  result.push_back(info);
390  } catch (const std::out_of_range& oor) {
391  LOG(error) << "could not find segment with id " << e.first;
392  LOG(error) << oor.what();
393  }
394  }
395 
396  return result;
397  }
398 
399  void SubscribeToRegionEvents(RegionEventCallback callback)
400  {
401  if (fRegionEventThread.joinable()) {
402  LOG(debug) << "Already subscribed. Overwriting previous subscription.";
403  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
404  fRegionEventsSubscriptionActive = false;
405  lock.unlock();
406  fRegionEventsCV.notify_all();
407  fRegionEventThread.join();
408  }
409  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
410  fRegionEventCallback = callback;
411  fRegionEventsSubscriptionActive = true;
412  fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
413  }
414 
415  bool SubscribedToRegionEvents() { return fRegionEventThread.joinable(); }
416 
417  void UnsubscribeFromRegionEvents()
418  {
419  if (fRegionEventThread.joinable()) {
420  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
421  fRegionEventsSubscriptionActive = false;
422  lock.unlock();
423  fRegionEventsCV.notify_all();
424  fRegionEventThread.join();
425  lock.lock();
426  fRegionEventCallback = nullptr;
427  }
428  }
429 
430  void RegionEventsSubscription()
431  {
432  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
433  while (fRegionEventsSubscriptionActive) {
434  auto infos = GetRegionInfoUnsafe();
435  for (const auto& i : infos) {
436  auto el = fObservedRegionEvents.find({i.id, i.managed});
437  if (el == fObservedRegionEvents.end()) {
438  fRegionEventCallback(i);
439  fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
440  ++fNumObservedEvents;
441  } else {
442  if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
443  fRegionEventCallback(i);
444  el->second = i.event;
445  ++fNumObservedEvents;
446  } else {
447  // LOG(debug) << "ignoring event for id" << i.id << ":";
448  // LOG(debug) << "incoming event: " << i.event;
449  // LOG(debug) << "stored event: " << el->second;
450  }
451  }
452  }
453  fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
454  }
455  }
456 
457  void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
458  void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
459 
#ifdef FAIRMQ_DEBUG_MODE
/// Debug mode only: increments the shared message counter of the given segment.
/// The counter entry is created by operator[] if it does not exist yet.
void IncrementShmMsgCounter(uint16_t segmentId)
{
    auto& counter = (*fShmMsgCounters)[segmentId];
    ++(counter.fCount);
}

/// Debug mode only: decrements the shared message counter of the given segment.
/// The counter entry is created by operator[] if it does not exist yet.
void DecrementShmMsgCounter(uint16_t segmentId)
{
    auto& counter = (*fShmMsgCounters)[segmentId];
    --(counter.fCount);
}
#endif
464 
465  boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
466 
467  void SendHeartbeats()
468  {
469  std::string controlQueueName("fmq_" + fShmId + "_cq");
470  std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
471  while (fSendHeartbeats) {
472  try {
473  boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str());
474  boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100);
475  if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) {
476  fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fSendHeartbeats; });
477  } else {
478  LOG(debug) << "control queue timeout";
479  }
480  } catch (boost::interprocess::interprocess_exception& ie) {
481  fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() { return !fSendHeartbeats; });
482  // LOG(debug) << "no " << controlQueueName << " found";
483  }
484  }
485  }
486 
487  bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }
488 
489  void GetSegment(uint16_t id)
490  {
491  auto it = fSegments.find(id);
492  if (it == fSegments.end()) {
493  try {
494  // get region info
495  SegmentInfo segmentInfo = fShmSegments->at(id);
496  LOG(debug) << "Located segment with id '" << id << "'";
497 
498  using namespace boost::interprocess;
499 
500  if (segmentInfo.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
501  fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(id)).c_str()));
502  } else {
503  fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(id)).c_str()));
504  }
505  } catch (std::out_of_range& oor) {
506  LOG(error) << "Could not get segment with id '" << id << "': " << oor.what();
507  } catch (boost::interprocess::interprocess_exception& bie) {
508  LOG(error) << "Could not get segment with id '" << id << "': " << bie.what();
509  }
510  }
511  }
512 
513  boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const
514  {
515  return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId));
516  }
517  void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const
518  {
519  return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId));
520  }
521 
522  char* Allocate(const size_t size, size_t alignment = 0)
523  {
524  char* ptr = nullptr;
525  // tools::RateLimiter rateLimiter(20);
526 
527  while (ptr == nullptr) {
528  try {
529  // boost::interprocess::managed_shared_memory::size_type actualSize = size;
530  // char* hint = 0; // unused for boost::interprocess::allocate_new
531  // ptr = fSegments.at(fSegmentId).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
532  size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId));
533  if (size > segmentSize) {
534  throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
535  }
536  if (alignment == 0) {
537  ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(fSegmentId)));
538  } else {
539  ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(fSegmentId)));
540  }
541  } catch (boost::interprocess::bad_alloc& ba) {
542  // LOG(warn) << "Shared memory full...";
543  if (ThrowingOnBadAlloc()) {
544  throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId))));
545  }
546  // rateLimiter.maybe_sleep();
547  std::this_thread::sleep_for(std::chrono::milliseconds(50));
548  if (Interrupted()) {
549  return ptr;
550  } else {
551  continue;
552  }
553  }
554 #ifdef FAIRMQ_DEBUG_MODE
555  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
556  IncrementShmMsgCounter(fSegmentId);
557  if (fMsgDebug->count(fSegmentId) == 0) {
558  (*fMsgDebug).emplace(fSegmentId, fShmVoidAlloc);
559  }
560  (*fMsgDebug).at(fSegmentId).emplace(
561  static_cast<size_t>(GetHandleFromAddress(ptr, fSegmentId)),
562  MsgDebug(getpid(), size, std::chrono::system_clock::now().time_since_epoch().count())
563  );
564 #endif
565  }
566 
567  return ptr;
568  }
569 
570  void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
571  {
572  boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
573 #ifdef FAIRMQ_DEBUG_MODE
574  boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
575  DecrementShmMsgCounter(segmentId);
576  try {
577  (*fMsgDebug).at(segmentId).erase(handle);
578  } catch(const std::out_of_range& oor) {
579  LOG(debug) << "could not locate debug container for " << segmentId << ": " << oor.what();
580  }
581 #endif
582  }
583 
584  char* ShrinkInPlace(size_t newSize, char* localPtr, uint16_t segmentId)
585  {
586  return boost::apply_visitor(SegmentBufferShrink{newSize, localPtr}, fSegments.at(segmentId));
587  }
588 
589  uint16_t GetSegmentId() const { return fSegmentId; }
590 
591  ~Manager()
592  {
593  using namespace boost::interprocess;
594  bool lastRemoved = false;
595 
596  UnsubscribeFromRegionEvents();
597 
598  {
599  std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
600  fSendHeartbeats = false;
601  }
602  fHeartbeatsCV.notify_one();
603  if (fHeartbeatThread.joinable()) {
604  fHeartbeatThread.join();
605  }
606 
607  try {
608  boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
609 
610  (fDeviceCounter->fCount)--;
611 
612  if (fDeviceCounter->fCount == 0) {
613  LOG(debug) << "Last segment user, " << (fNoCleanup ? "skipping removal (--shm-no-cleanup is true)." : "removing segment.");
614  lastRemoved = true;
615  } else {
616  LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
617  }
618  } catch (interprocess_exception& e) {
619  LOG(error) << "Manager could not acquire lock: " << e.what();
620  }
621 
622  if (lastRemoved && !fNoCleanup) {
623  Monitor::Cleanup(ShmId{fShmId});
624  }
625  }
626 
627  private:
628  std::string fShmId;
629  uint16_t fSegmentId;
630  std::string fDeviceId;
631  std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
632  boost::interprocess::managed_shared_memory fManagementSegment;
633  VoidAlloc fShmVoidAlloc;
634  boost::interprocess::named_mutex fShmMtx;
635 
636  boost::interprocess::named_condition fRegionEventsCV;
637  std::thread fRegionEventThread;
638  bool fRegionEventsSubscriptionActive;
639  std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
640  std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
641  uint64_t fNumObservedEvents;
642 
643  DeviceCounter* fDeviceCounter;
644  EventCounter* fEventCounter;
645  Uint16SegmentInfoHashMap* fShmSegments;
646  Uint16RegionInfoHashMap* fShmRegions;
647  std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
648 
649  std::atomic<bool> fInterrupted;
650  std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
651 #ifdef FAIRMQ_DEBUG_MODE
652  Uint16MsgDebugMapHashMap* fMsgDebug;
653  Uint16MsgCounterHashMap* fShmMsgCounters;
654 #endif
655 
656  std::thread fHeartbeatThread;
657  bool fSendHeartbeats;
658  std::mutex fHeartbeatsMtx;
659  std::condition_variable fHeartbeatsCV;
660 
661  bool fThrowOnBadAlloc;
662  bool fNoCleanup;
663 };
664 
665 } // namespace fair::mq::shmem
666 
667 #endif /* FAIR_MQ_SHMEM_MANAGER_H_ */
fair::mq::shmem::DeviceCounter
Definition: Common.h:105
fair::mq::shmem::EventCounter
Definition: Common.h:114
FairMQRegionInfo
Definition: FairMQUnmanagedRegion.h:29
fair::mq::shmem::SegmentMemoryZeroer
Definition: Common.h:238
fair::mq::shmem::SegmentFreeMemory
Definition: Common.h:244
fair::mq::shmem::Manager
Definition: Manager.h:61
fair::mq::shmem::SegmentSize
Definition: Common.h:226
fair::mq::shmem::SegmentAddress
Definition: Common.h:232
fair::mq::shmem::Monitor::Cleanup
static std::vector< std::pair< std::string, bool > > Cleanup(const ShmId &shmId, bool verbose=true)
Cleanup all shared memory artifacts created by devices.
Definition: Monitor.cxx:466
fair::mq::shmem
Definition: Common.h:33

privacy