FairMQ  1.4.33
C++ Message Queuing Library and Framework
Message.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 #ifndef FAIR_MQ_SHMEM_MESSAGE_H_
9 #define FAIR_MQ_SHMEM_MESSAGE_H_
10 
11 #include "Common.h"
12 #include "Manager.h"
13 #include "Region.h"
14 #include "UnmanagedRegion.h"
15 #include <FairMQLogger.h>
16 #include <FairMQMessage.h>
17 #include <FairMQUnmanagedRegion.h>
18 
19 #include <boost/interprocess/mapped_region.hpp>
20 
21 #include <cstddef> // size_t
22 #include <atomic>
23 
24 #include <sys/types.h> // getpid
25 #include <unistd.h> // pid_t
26 
27 namespace fair::mq::shmem
28 {
29 
30 class Socket;
31 
32 class Message final : public fair::mq::Message
33 {
34  friend class Socket;
35 
36  public:
37  Message(Manager& manager, FairMQTransportFactory* factory = nullptr)
38  : fair::mq::Message(factory)
39  , fManager(manager)
40  , fQueued(false)
41  , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
42  , fRegionPtr(nullptr)
43  , fLocalPtr(nullptr)
44  {
45  fManager.IncrementMsgCounter();
46  }
47 
48  Message(Manager& manager, Alignment alignment, FairMQTransportFactory* factory = nullptr)
49  : fair::mq::Message(factory)
50  , fManager(manager)
51  , fQueued(false)
52  , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
53  , fAlignment(alignment.alignment)
54  , fRegionPtr(nullptr)
55  , fLocalPtr(nullptr)
56  {
57  fManager.IncrementMsgCounter();
58  }
59 
60  Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr)
61  : fair::mq::Message(factory)
62  , fManager(manager)
63  , fQueued(false)
64  , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
65  , fRegionPtr(nullptr)
66  , fLocalPtr(nullptr)
67  {
68  InitializeChunk(size);
69  fManager.IncrementMsgCounter();
70  }
71 
72  Message(Manager& manager, const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr)
73  : fair::mq::Message(factory)
74  , fManager(manager)
75  , fQueued(false)
76  , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
77  , fAlignment(alignment.alignment)
78  , fRegionPtr(nullptr)
79  , fLocalPtr(nullptr)
80  {
81  InitializeChunk(size, fAlignment);
82  fManager.IncrementMsgCounter();
83  }
84 
85  Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
86  : fair::mq::Message(factory)
87  , fManager(manager)
88  , fQueued(false)
89  , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
90  , fRegionPtr(nullptr)
91  , fLocalPtr(nullptr)
92  {
93  if (InitializeChunk(size)) {
94  std::memcpy(fLocalPtr, data, size);
95  if (ffn) {
96  ffn(data, hint);
97  } else {
98  free(data);
99  }
100  }
101  fManager.IncrementMsgCounter();
102  }
103 
104  Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
105  : fair::mq::Message(factory)
106  , fManager(manager)
107  , fQueued(false)
108  , fMeta{size, reinterpret_cast<size_t>(hint), static_cast<UnmanagedRegion*>(region.get())->fRegionId, fManager.GetSegmentId(), -1}
109  , fRegionPtr(nullptr)
110  , fLocalPtr(static_cast<char*>(data))
111  {
112  if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
113  reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
114  fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
115  } else {
116  LOG(error) << "trying to create region message with data from outside the region";
117  throw std::runtime_error("trying to create region message with data from outside the region");
118  }
119  fManager.IncrementMsgCounter();
120  }
121 
122  Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory = nullptr)
123  : fair::mq::Message(factory)
124  , fManager(manager)
125  , fQueued(false)
126  , fMeta{hdr}
127  , fRegionPtr(nullptr)
128  , fLocalPtr(nullptr)
129  {
130  fManager.IncrementMsgCounter();
131  }
132 
133  Message(const Message&) = delete;
134  Message operator=(const Message&) = delete;
135 
136  void Rebuild() override
137  {
138  CloseMessage();
139  fQueued = false;
140  }
141 
142  void Rebuild(Alignment alignment) override
143  {
144  CloseMessage();
145  fQueued = false;
146  fAlignment = alignment.alignment;
147  }
148 
149  void Rebuild(const size_t size) override
150  {
151  CloseMessage();
152  fQueued = false;
153  InitializeChunk(size);
154  }
155 
156  void Rebuild(const size_t size, Alignment alignment) override
157  {
158  CloseMessage();
159  fQueued = false;
160  fAlignment = alignment.alignment;
161  InitializeChunk(size, fAlignment);
162  }
163 
164  void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
165  {
166  CloseMessage();
167  fQueued = false;
168 
169  if (InitializeChunk(size)) {
170  std::memcpy(fLocalPtr, data, size);
171  if (ffn) {
172  ffn(data, hint);
173  } else {
174  free(data);
175  }
176  }
177  }
178 
179  void* GetData() const override
180  {
181  if (!fLocalPtr) {
182  if (fMeta.fRegionId == 0) {
183  if (fMeta.fSize > 0) {
184  fManager.GetSegment(fMeta.fSegmentId);
185  fLocalPtr = reinterpret_cast<char*>(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId));
186  } else {
187  fLocalPtr = nullptr;
188  }
189  } else {
190  fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
191  if (fRegionPtr) {
192  fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fMeta.fHandle;
193  } else {
194  // LOG(warn) << "could not get pointer from a region message";
195  fLocalPtr = nullptr;
196  }
197  }
198  }
199 
200  return fLocalPtr;
201  }
202 
203  size_t GetSize() const override { return fMeta.fSize; }
204 
205  bool SetUsedSize(const size_t newSize) override
206  {
207  if (newSize == fMeta.fSize) {
208  return true;
209  } else if (newSize == 0) {
210  Deallocate();
211  return true;
212  } else if (newSize <= fMeta.fSize) {
213  try {
214  try {
215  fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
216  fMeta.fSize = newSize;
217  return true;
218  } catch (boost::interprocess::bad_alloc& e) {
219  // if shrinking fails (can happen due to boost alignment requirements):
220  // unused size >= 1000000 bytes: reallocate fully
221  // unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction
222  if (fMeta.fSize - newSize >= 1000000) {
223  char* newPtr = fManager.Allocate(newSize, fAlignment);
224  if (newPtr) {
225  std::memcpy(newPtr, fLocalPtr, newSize);
226  fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
227  fLocalPtr = newPtr;
228  fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
229  } else {
230  LOG(debug) << "could not set used size: " << e.what();
231  return false;
232  }
233  }
234  fMeta.fSize = newSize;
235  return true;
236  }
237  } catch (boost::interprocess::interprocess_exception& e) {
238  LOG(debug) << "could not set used size: " << e.what();
239  return false;
240  }
241  } else {
242  LOG(error) << "cannot set used size higher than original.";
243  return false;
244  }
245  }
246 
247  Transport GetType() const override { return fair::mq::Transport::SHM; }
248 
249  void Copy(const fair::mq::Message& msg) override
250  {
251  if (fMeta.fHandle < 0) {
252  boost::interprocess::managed_shared_memory::handle_t otherHandle = static_cast<const Message&>(msg).fMeta.fHandle;
253  if (otherHandle) {
254  if (InitializeChunk(msg.GetSize())) {
255  std::memcpy(GetData(), msg.GetData(), msg.GetSize());
256  }
257  } else {
258  LOG(error) << "copy fail: source message not initialized!";
259  }
260  } else {
261  LOG(error) << "copy fail: target message already initialized!";
262  }
263  }
264 
265  ~Message() override
266  {
267  try {
268  CloseMessage();
269  } catch(SharedMemoryError& sme) {
270  LOG(error) << "error closing message: " << sme.what();
271  } catch(boost::interprocess::lock_exception& le) {
272  LOG(error) << "error closing message: " << le.what();
273  }
274  }
275 
276  private:
277  Manager& fManager;
278  bool fQueued;
279  MetaHeader fMeta;
280  size_t fAlignment;
281  mutable Region* fRegionPtr;
282  mutable char* fLocalPtr;
283 
284  char* InitializeChunk(const size_t size, size_t alignment = 0)
285  {
286  fLocalPtr = fManager.Allocate(size, alignment);
287  if (fLocalPtr) {
288  fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
289  fMeta.fSize = size;
290  }
291  return fLocalPtr;
292  }
293 
294  void Deallocate()
295  {
296  if (fMeta.fHandle >= 0 && !fQueued) {
297  if (fMeta.fRegionId == 0) {
298  fManager.GetSegment(fMeta.fSegmentId);
299  fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
300  fMeta.fHandle = -1;
301  } else {
302  if (!fRegionPtr) {
303  fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
304  }
305 
306  if (fRegionPtr) {
307  fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
308  } else {
309  LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
310  }
311  }
312  }
313  fLocalPtr = nullptr;
314  fMeta.fSize = 0;
315  }
316 
317  void CloseMessage()
318  {
319  Deallocate();
320  fAlignment = 0;
321 
322  fManager.DecrementMsgCounter(); // TODO: put this to debug mode
323  }
324 };
325 
326 } // namespace fair::mq::shmem
327 
328 #endif /* FAIR_MQ_SHMEM_MESSAGE_H_ */
fair::mq::Alignment
Definition: FairMQMessage.h:25
fair::mq::shmem::Message
Definition: Message.h:39
FairMQMessage
Definition: FairMQMessage.h:33
fair::mq::shmem
Definition: Common.h:33
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30

privacy