FairMQ  1.4.33
C++ Message Queuing Library and Framework
Region.h
1 /********************************************************************************
2 * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3 * *
4 * This software is distributed under the terms of the *
5 * GNU Lesser General Public Licence (LGPL) version 3, *
6 * copied verbatim in the file "LICENSE" *
7 ********************************************************************************/
15 #ifndef FAIR_MQ_SHMEM_REGION_H_
16 #define FAIR_MQ_SHMEM_REGION_H_
17 
18 #include "Common.h"
19 
20 #include <FairMQLogger.h>
21 #include <FairMQUnmanagedRegion.h>
22 #include <fairmq/tools/Strings.h>
23 
24 #include <boost/filesystem.hpp>
25 #include <boost/process.hpp>
26 #include <boost/date_time/posix_time/posix_time.hpp>
27 #include <boost/interprocess/managed_shared_memory.hpp>
28 #include <boost/interprocess/file_mapping.hpp>
29 #include <boost/interprocess/ipc/message_queue.hpp>
30 
31 #include <algorithm> // min
32 #include <atomic>
33 #include <thread>
34 #include <memory> // make_unique
35 #include <mutex>
36 #include <condition_variable>
37 #include <unordered_map>
38 #include <cerrno>
39 #include <chrono>
40 #include <ios>
41 
42 namespace fair::mq::shmem
43 {
44 
45 struct Region
46 {
47  Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
48  : fRemote(remote)
49  , fLinger(100)
50  , fStop(false)
51  , fName("fmq_" + shmId + "_rg_" + std::to_string(id))
52  , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
53  , fShmemObject()
54  , fFile(nullptr)
55  , fFileMapping()
56  , fQueue(nullptr)
57  , fAcksReceiver()
58  , fAcksSender()
59  , fCallback(callback)
60  , fBulkCallback(bulkCallback)
61  {
62  using namespace boost::interprocess;
63 
64  if (path != "") {
65  fName = std::string(path + fName);
66 
67  if (!fRemote) {
68  // create a file
69  std::filebuf fbuf;
70  if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
71  // set the size
72  fbuf.pubseekoff(size - 1, std::ios_base::beg);
73  fbuf.sputc(0);
74  }
75  }
76 
77  fFile = fopen(fName.c_str(), "r+");
78 
79  if (!fFile) {
80  LOG(error) << "Failed to initialize file: " << fName;
81  LOG(error) << "errno: " << errno << ": " << strerror(errno);
82  throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
83  }
84  fFileMapping = file_mapping(fName.c_str(), read_write);
85  LOG(debug) << "shmem: initialized file: " << fName;
86  fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
87  } else {
88  if (fRemote) {
89  fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
90  } else {
91  fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
92  fShmemObject.truncate(size);
93  }
94  fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
95  }
96 
97  InitializeQueues();
98  StartSendingAcks();
99  LOG(debug) << "shmem: initialized region: " << fName;
100  }
101 
102  Region() = delete;
103 
104  Region(const Region&) = delete;
105  Region(Region&&) = delete;
106 
107  void InitializeQueues()
108  {
109  using namespace boost::interprocess;
110 
111  if (fRemote) {
112  fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
113  } else {
114  fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
115  }
116  LOG(debug) << "shmem: initialized region queue: " << fQueueName;
117  }
118 
119  void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
120  void SendAcks()
121  {
122  std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
123  size_t blocksToSend = 0;
124 
125  while (true) {
126  blocksToSend = 0;
127  {
128  std::unique_lock<std::mutex> lock(fBlockMtx);
129 
130  // try to get <fAckBunchSize> blocks
131  if (fBlocksToFree.size() < fAckBunchSize) {
132  fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
133  }
134 
135  // send whatever blocks we have
136  blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
137 
138  copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
139  fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
140  }
141 
142  if (blocksToSend > 0) {
143  while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
144  // receiver slow? yield and try again...
145  std::this_thread::yield();
146  }
147  // LOG(debug) << "Sent " << blocksToSend << " blocks.";
148  } else { // blocksToSend == 0
149  if (fStop) {
150  break;
151  }
152  }
153  }
154 
155  LOG(trace) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
156  << " blocks left to send: " << blocksToSend << ").";
157  }
158 
159  void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
160  void ReceiveAcks()
161  {
162  unsigned int priority;
163  boost::interprocess::message_queue::size_type recvdSize;
164  std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
165  std::vector<fair::mq::RegionBlock> result;
166  result.reserve(fAckBunchSize);
167 
168  while (true) {
169  uint32_t timeout = 100;
170  bool leave = false;
171  if (fStop) {
172  timeout = fLinger;
173  leave = true;
174  }
175  auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout);
176 
177  while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) {
178  const auto numBlocks = recvdSize / sizeof(RegionBlock);
179  // LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ").";
180  if (fBulkCallback) {
181  result.clear();
182  for (size_t i = 0; i < numBlocks; i++) {
183  result.emplace_back(reinterpret_cast<char*>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast<void*>(blocks[i].fHint));
184  }
185  fBulkCallback(result);
186  } else if (fCallback) {
187  for (size_t i = 0; i < numBlocks; i++) {
188  fCallback(reinterpret_cast<char*>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast<void*>(blocks[i].fHint));
189  }
190  }
191  }
192 
193  if (leave) {
194  break;
195  }
196  }
197 
198  LOG(trace) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
199  }
200 
201  void ReleaseBlock(const RegionBlock& block)
202  {
203  std::unique_lock<std::mutex> lock(fBlockMtx);
204 
205  fBlocksToFree.emplace_back(block);
206 
207  if (fBlocksToFree.size() >= fAckBunchSize) {
208  lock.unlock();
209  fBlockSendCV.notify_one();
210  }
211  }
212 
213  void SetLinger(uint32_t linger) { fLinger = linger; }
214  uint32_t GetLinger() const { return fLinger; }
215 
216  ~Region()
217  {
218  fStop = true;
219 
220  if (fAcksSender.joinable()) {
221  fBlockSendCV.notify_one();
222  fAcksSender.join();
223  }
224 
225  if (!fRemote) {
226  if (fAcksReceiver.joinable()) {
227  fAcksReceiver.join();
228  }
229 
230  if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
231  LOG(debug) << "Region '" << fName << "' destroyed.";
232  }
233 
234  if (boost::interprocess::file_mapping::remove(fName.c_str())) {
235  LOG(debug) << "File mapping '" << fName << "' destroyed.";
236  }
237 
238  if (fFile) {
239  fclose(fFile);
240  }
241 
242  if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
243  LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
244  }
245  } else {
246  // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
247  LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
248  }
249 
250  LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
251  }
252 
253  bool fRemote;
254  uint32_t fLinger;
255  std::atomic<bool> fStop;
256  std::string fName;
257  std::string fQueueName;
258  boost::interprocess::shared_memory_object fShmemObject;
259  FILE* fFile;
260  boost::interprocess::file_mapping fFileMapping;
261  boost::interprocess::mapped_region fRegion;
262 
263  std::mutex fBlockMtx;
264  std::condition_variable fBlockSendCV;
265  std::vector<RegionBlock> fBlocksToFree;
266  const std::size_t fAckBunchSize = 256;
267  std::unique_ptr<boost::interprocess::message_queue> fQueue;
268 
269  std::thread fAcksReceiver;
270  std::thread fAcksSender;
271  RegionCallback fCallback;
272  RegionBulkCallback fBulkCallback;
273 };
274 
275 } // namespace fair::mq::shmem
276 
277 #endif /* FAIR_MQ_SHMEM_REGION_H_ */
FairMQRegionBlock
Definition: FairMQUnmanagedRegion.h:56
fair::mq::shmem::Region
Definition: Region.h:52
fair::mq::shmem
Definition: Common.h:33

privacy