FairMQ  1.4.33
C++ Message Queuing Library and Framework
Message.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_ZMQ_MESSAGE_H
10 #define FAIR_MQ_ZMQ_MESSAGE_H
11 
12 #include <fairmq/zeromq/UnmanagedRegion.h>
13 #include <FairMQLogger.h>
14 #include <FairMQMessage.h>
15 #include <FairMQUnmanagedRegion.h>
16 
17 #include <zmq.h>
18 
19 #include <cstddef>
20 #include <cstdlib> // malloc
21 #include <cstring>
22 #include <memory> // make_unique
23 #include <new> // bad_alloc
24 #include <string>
25 
26 namespace fair::mq::zmq
27 {
28 
29 class Socket;
30 
31 class Message final : public fair::mq::Message
32 {
33  friend class Socket;
34 
35  public:
36  Message(FairMQTransportFactory* factory = nullptr)
37  : fair::mq::Message(factory)
38  , fAlignment(0)
39  , fMsg(std::make_unique<zmq_msg_t>())
40  {
41  if (zmq_msg_init(fMsg.get()) != 0) {
42  LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
43  }
44  }
45 
46  Message(Alignment alignment, FairMQTransportFactory* factory = nullptr)
47  : fair::mq::Message(factory)
48  , fAlignment(alignment.alignment)
49  , fMsg(std::make_unique<zmq_msg_t>())
50  {
51  if (zmq_msg_init(fMsg.get()) != 0) {
52  LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
53  }
54  }
55 
56  Message(const size_t size, FairMQTransportFactory* factory = nullptr)
57  : fair::mq::Message(factory)
58  , fAlignment(0)
59  , fMsg(std::make_unique<zmq_msg_t>())
60  {
61  if (zmq_msg_init_size(fMsg.get(), size) != 0) {
62  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
63  }
64  }
65 
66  static std::pair<void*, void*> AllocateAligned(size_t size, size_t alignment)
67  {
68  char* fullBufferPtr = static_cast<char*>(malloc(size + alignment));
69  if (!fullBufferPtr) {
70  LOG(error) << "failed to allocate buffer with provided size (" << size << ") and alignment (" << alignment << ").";
71  throw std::bad_alloc();
72  }
73 
74  size_t offset = alignment - (reinterpret_cast<uintptr_t>(fullBufferPtr) % alignment);
75  char* alignedPartPtr = fullBufferPtr + offset;
76 
77  return {static_cast<void*>(fullBufferPtr), static_cast<void*>(alignedPartPtr)};
78  }
79 
80  Message(const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr)
81  : fair::mq::Message(factory)
82  , fAlignment(alignment.alignment)
83  , fMsg(std::make_unique<zmq_msg_t>())
84  {
85  if (fAlignment != 0) {
86  auto ptrs = AllocateAligned(size, fAlignment);
87  if (zmq_msg_init_data(fMsg.get(), ptrs.second, size, [](void* /* data */, void* hint) { free(hint); }, ptrs.first) != 0) {
88  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
89  }
90  } else {
91  if (zmq_msg_init_size(fMsg.get(), size) != 0) {
92  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
93  }
94  }
95  }
96 
97  Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
98  : fair::mq::Message(factory)
99  , fAlignment(0)
100  , fMsg(std::make_unique<zmq_msg_t>())
101  {
102  if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
103  LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
104  }
105  }
106 
107  Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
108  : fair::mq::Message(factory)
109  , fAlignment(0)
110  , fMsg(std::make_unique<zmq_msg_t>())
111  {
112  // FIXME: make this zero-copy:
113  // simply taking over the provided buffer can casue premature delete, since region could be
114  // destroyed before the message is sent out. Needs lifetime extension for the ZMQ region.
115  if (zmq_msg_init_size(fMsg.get(), size) != 0) {
116  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
117  }
118 
119  std::memcpy(zmq_msg_data(fMsg.get()), data, size);
120  // call region callback
121  auto ptr = static_cast<UnmanagedRegion*>(region.get());
122  if (ptr->fBulkCallback) {
123  ptr->fBulkCallback({{data, size, hint}});
124  } else if (ptr->fCallback) {
125  ptr->fCallback(data, size, hint);
126  }
127 
128  // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0)
129  // {
130  // LOG(error) << "failed initializing message with data, reason: " <<
131  // zmq_strerror(errno);
132  // }
133  }
134 
135  void Rebuild() override
136  {
137  CloseMessage();
138  fMsg = std::make_unique<zmq_msg_t>();
139  if (zmq_msg_init(fMsg.get()) != 0) {
140  LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
141  }
142  }
143 
144  void Rebuild(Alignment alignment) override
145  {
146  CloseMessage();
147  fAlignment = alignment.alignment;
148  fMsg = std::make_unique<zmq_msg_t>();
149  if (zmq_msg_init(fMsg.get()) != 0) {
150  LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
151  }
152  }
153 
154  void Rebuild(const size_t size) override
155  {
156  CloseMessage();
157  fMsg = std::make_unique<zmq_msg_t>();
158  if (zmq_msg_init_size(fMsg.get(), size) != 0) {
159  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
160  }
161  }
162 
163  void Rebuild(const size_t size, Alignment alignment) override
164  {
165  CloseMessage();
166  fAlignment = alignment.alignment;
167  fMsg = std::make_unique<zmq_msg_t>();
168 
169  if (fAlignment != 0) {
170  auto ptrs = AllocateAligned(size, fAlignment);
171  if (zmq_msg_init_data(fMsg.get(), ptrs.second, size, [](void* /* data */, void* hint) { free(hint); }, ptrs.first) != 0) {
172  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
173  }
174  } else {
175  if (zmq_msg_init_size(fMsg.get(), size) != 0) {
176  LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
177  }
178  }
179  }
180 
181  void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
182  {
183  CloseMessage();
184  fMsg = std::make_unique<zmq_msg_t>();
185  if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
186  LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
187  }
188  }
189 
190  void* GetData() const override
191  {
192  if (zmq_msg_size(fMsg.get()) > 0) {
193  return zmq_msg_data(fMsg.get());
194  } else {
195  return nullptr;
196  }
197  }
198 
199  size_t GetSize() const override { return zmq_msg_size(fMsg.get()); }
200 
201  // To emulate shrinking, a new message is created with the new size (ViewMsg), that points to
202  // the original buffer with the new size. Once the "view message" is transfered, the original is
203  // destroyed. Used size is applied only once in ApplyUsedSize, which is called by the socket
204  // before sending. This function just updates the desired size until the actual "resizing"
205  // happens.
206  bool SetUsedSize(const size_t size) override
207  {
208  if (size == GetSize()) {
209  // nothing to do
210  return true;
211  } else if (size > GetSize()) {
212  LOG(error) << "cannot set used size higher than original.";
213  return false;
214  } else {
215  auto newMsg = std::make_unique<zmq_msg_t>();
216  void* data = GetData();
217  if (zmq_msg_init_data(newMsg.get(), data, size, [](void* /* data */, void* obj) {
218  zmq_msg_close(static_cast<zmq_msg_t*>(obj));
219  delete static_cast<zmq_msg_t*>(obj);
220  }, fMsg.get()) != 0) {
221  LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
222  return false;
223  }
224  fMsg.release();
225  fMsg.swap(newMsg);
226  return true;
227  }
228  }
229 
230  void Realign()
231  {
232  // if alignment is provided
233  if (fAlignment != 0) {
234  void* data = GetData();
235  size_t size = GetSize();
236  // if buffer is valid && not already aligned with the given alignment
237  if (data != nullptr && reinterpret_cast<uintptr_t>(GetData()) % fAlignment) {
238  // create new aligned buffer
239  auto ptrs = AllocateAligned(size, fAlignment);
240  std::memcpy(ptrs.second, zmq_msg_data(fMsg.get()), size);
241  // rebuild the message with the new buffer
242  Rebuild(ptrs.second, size, [](void* /* buf */, void* hint) { free(hint); }, ptrs.first);
243  }
244  }
245  }
246 
247  Transport GetType() const override { return Transport::ZMQ; }
248 
249  void Copy(const fair::mq::Message& msg) override
250  {
251  const Message& zMsg = static_cast<const Message&>(msg);
252  // Shares the message buffer between msg and this fMsg.
253  if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0) {
254  LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
255  return;
256  }
257  }
258 
259  ~Message() override { CloseMessage(); }
260 
261  private:
262  size_t fAlignment;
263  std::unique_ptr<zmq_msg_t> fMsg;
264 
265  zmq_msg_t* GetMessage() const { return fMsg.get(); }
266 
267  void CloseMessage()
268  {
269  if (zmq_msg_close(fMsg.get()) != 0) {
270  LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
271  }
272  // reset the message object to allow reuse in Rebuild
273  fMsg.reset(nullptr);
274  fAlignment = 0;
275  }
276 };
277 
278 } // namespace fair::mq::zmq
279 
280 #endif /* FAIR_MQ_ZMQ_MESSAGE_H */
fair::mq::Alignment
Definition: FairMQMessage.h:25
FairMQMessage
Definition: FairMQMessage.h:33
fair::mq::zmq::Message
Definition: Message.h:38
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30

privacy