FairMQ  1.4.33
C++ Message Queuing Library and Framework
Socket.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 #ifndef FAIR_MQ_SHMEM_SOCKET_H_
9 #define FAIR_MQ_SHMEM_SOCKET_H_
10 
11 #include "Common.h"
12 #include "Manager.h"
13 #include "Message.h"
14 
15 #include <FairMQSocket.h>
16 #include <FairMQMessage.h>
17 #include <FairMQLogger.h>
18 #include <fairmq/tools/Strings.h>
19 
20 #include <zmq.h>
21 
22 #include <atomic>
23 #include <memory> // make_unique
24 
26 
27 namespace fair::mq::shmem
28 {
29 
30 struct ZMsg
31 {
32  ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
33  explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); }
34  ~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); }
35 
36  void* Data() { return zmq_msg_data(&fMsg); }
37  size_t Size() { return zmq_msg_size(&fMsg); }
38  zmq_msg_t* Msg() { return &fMsg; }
39 
40  zmq_msg_t fMsg;
41 };
42 
43 class Socket final : public fair::mq::Socket
44 {
45  public:
46  Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, FairMQTransportFactory* fac = nullptr)
47  : fair::mq::Socket(fac)
48  , fSocket(nullptr)
49  , fManager(manager)
50  , fId(id + "." + name + "." + type)
51  , fBytesTx(0)
52  , fBytesRx(0)
53  , fMessagesTx(0)
54  , fMessagesRx(0)
55  , fTimeout(100)
56  {
57  assert(context);
58 
59  if (type == "sub" || type == "pub") {
60  LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
61  throw SocketError("PUB/SUB socket type is not supported for shared memory transport");
62  }
63 
64  fSocket = zmq_socket(context, GetConstant(type));
65 
66  if (fSocket == nullptr) {
67  LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
68  throw SocketError(tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno)));
69  }
70 
71  if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) {
72  LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
73  }
74 
75  // Tell socket to try and send/receive outstanding messages for <linger> milliseconds before terminating.
76  // Default value for ZeroMQ is -1, which is to wait forever.
77  int linger = 1000;
78  if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
79  LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
80  }
81 
82  if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
83  LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
84  }
85 
86  if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
87  LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
88  }
89 
90  // if (type == "sub")
91  // {
92  // if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
93  // {
94  // LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
95  // }
96  // }
97  LOG(debug) << "Created socket " << GetId();
98  }
99 
100  Socket(const Socket&) = delete;
101  Socket operator=(const Socket&) = delete;
102 
103  std::string GetId() const override { return fId; }
104 
105  bool Bind(const std::string& address) override
106  {
107  // LOG(info) << "binding socket " << fId << " on " << address;
108  if (zmq_bind(fSocket, address.c_str()) != 0) {
109  if (errno == EADDRINUSE) {
110  // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
111  return false;
112  }
113  LOG(error) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
114  return false;
115  }
116  return true;
117  }
118 
119  bool Connect(const std::string& address) override
120  {
121  // LOG(info) << "connecting socket " << fId << " on " << address;
122  if (zmq_connect(fSocket, address.c_str()) != 0) {
123  LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
124  return false;
125  }
126  return true;
127  }
128 
129  bool ShouldRetry(int flags, int timeout, int& elapsed) const
130  {
131  if ((flags & ZMQ_DONTWAIT) == 0) {
132  if (timeout > 0) {
133  elapsed += fTimeout;
134  if (elapsed >= timeout) {
135  return false;
136  }
137  }
138  return true;
139  } else {
140  return false;
141  }
142  }
143 
144  int HandleErrors() const
145  {
146  if (zmq_errno() == ETERM) {
147  LOG(debug) << "Terminating socket " << fId;
148  return static_cast<int>(TransferCode::error);
149  } else {
150  LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
151  return static_cast<int>(TransferCode::error);
152  }
153  }
154 
155  int64_t Send(MessagePtr& msg, const int timeout = -1) override
156  {
157  int flags = 0;
158  if (timeout == 0) {
159  flags = ZMQ_DONTWAIT;
160  }
161  int elapsed = 0;
162 
163  Message* shmMsg = static_cast<Message*>(msg.get());
164  ZMsg zmqMsg(sizeof(MetaHeader));
165  std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
166 
167  while (true) {
168  int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
169  if (nbytes > 0) {
170  shmMsg->fQueued = true;
171  ++fMessagesTx;
172  size_t size = msg->GetSize();
173  fBytesTx += size;
174  return size;
175  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
176  if (fManager.Interrupted()) {
177  return static_cast<int>(TransferCode::interrupted);
178  } else if (ShouldRetry(flags, timeout, elapsed)) {
179  continue;
180  } else {
181  return static_cast<int>(TransferCode::timeout);
182  }
183  } else {
184  return HandleErrors();
185  }
186  }
187 
188  return static_cast<int>(TransferCode::error);
189  }
190 
191  int64_t Receive(MessagePtr& msg, const int timeout = -1) override
192  {
193  int flags = 0;
194  if (timeout == 0) {
195  flags = ZMQ_DONTWAIT;
196  }
197  int elapsed = 0;
198 
199  ZMsg zmqMsg;
200 
201  while (true) {
202  Message* shmMsg = static_cast<Message*>(msg.get());
203  int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
204  if (nbytes > 0) {
205  // check for number of received messages. must be 1
206  if (nbytes != sizeof(MetaHeader)) {
207  throw SocketError(
208  tools::ToString("Received message is not a valid FairMQ shared memory message. ",
209  "Possibly due to a misconfigured transport on the sender side. ",
210  "Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
211  }
212 
213  MetaHeader* hdr = static_cast<MetaHeader*>(zmqMsg.Data());
214  size_t size = hdr->fSize;
215  shmMsg->fMeta = *hdr;
216 
217  fBytesRx += size;
218  ++fMessagesRx;
219  return size;
220  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
221  if (fManager.Interrupted()) {
222  return static_cast<int>(TransferCode::interrupted);
223  } else if (ShouldRetry(flags, timeout, elapsed)) {
224  continue;
225  } else {
226  return static_cast<int>(TransferCode::timeout);
227  }
228  } else {
229  return HandleErrors();
230  }
231  }
232  }
233 
234  int64_t Send(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
235  {
236  int flags = 0;
237  if (timeout == 0) {
238  flags = ZMQ_DONTWAIT;
239  }
240  int elapsed = 0;
241 
242  // put it into zmq message
243  const unsigned int vecSize = msgVec.size();
244  ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
245 
246  // prepare the message with shm metas
247  MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
248 
249  for (auto& msg : msgVec) {
250  Message* shmMsg = static_cast<Message*>(msg.get());
251  std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
252  }
253 
254  while (true) {
255  int64_t totalSize = 0;
256  int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
257  if (nbytes > 0) {
258  assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
259 
260  for (auto& msg : msgVec) {
261  Message* shmMsg = static_cast<Message*>(msg.get());
262  shmMsg->fQueued = true;
263  totalSize += shmMsg->fMeta.fSize;
264  }
265 
266  // store statistics on how many messages have been sent
267  fMessagesTx++;
268  fBytesTx += totalSize;
269 
270  return totalSize;
271  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
272  if (fManager.Interrupted()) {
273  return static_cast<int>(TransferCode::interrupted);
274  } else if (ShouldRetry(flags, timeout, elapsed)) {
275  continue;
276  } else {
277  return static_cast<int>(TransferCode::timeout);
278  }
279  } else {
280  return HandleErrors();
281  }
282  }
283 
284  return static_cast<int>(TransferCode::error);
285  }
286 
287  int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
288  {
289  int flags = 0;
290  if (timeout == 0) {
291  flags = ZMQ_DONTWAIT;
292  }
293  int elapsed = 0;
294 
295  ZMsg zmqMsg;
296 
297  while (true) {
298  int64_t totalSize = 0;
299  int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
300  if (nbytes > 0) {
301  MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data());
302  const auto hdrVecSize = zmqMsg.Size();
303 
304  assert(hdrVecSize > 0);
305  if (hdrVecSize % sizeof(MetaHeader) != 0) {
306  throw SocketError(
307  tools::ToString("Received message is not a valid FairMQ shared memory message. ",
308  "Possibly due to a misconfigured transport on the sender side. ",
309  "Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
310  }
311 
312  const auto numMessages = hdrVecSize / sizeof(MetaHeader);
313  msgVec.reserve(numMessages);
314 
315  for (size_t m = 0; m < numMessages; m++) {
316  // create new message (part)
317  msgVec.emplace_back(std::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
318  Message* shmMsg = static_cast<Message*>(msgVec.back().get());
319  totalSize += shmMsg->GetSize();
320  }
321 
322  // store statistics on how many messages have been received (handle all parts as a single message)
323  fMessagesRx++;
324  fBytesRx += totalSize;
325 
326  return totalSize;
327  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
328  if (fManager.Interrupted()) {
329  return static_cast<int>(TransferCode::interrupted);
330  } else if (ShouldRetry(flags, timeout, elapsed)) {
331  continue;
332  } else {
333  return static_cast<int>(TransferCode::timeout);
334  }
335  } else {
336  return HandleErrors();
337  }
338  }
339 
340  return static_cast<int>(TransferCode::error);
341  }
342 
343  void* GetSocket() const { return fSocket; }
344 
345  void Close() override
346  {
347  // LOG(debug) << "Closing socket " << fId;
348 
349  if (fSocket == nullptr) {
350  return;
351  }
352 
353  if (zmq_close(fSocket) != 0) {
354  LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
355  }
356 
357  fSocket = nullptr;
358  }
359 
360  void SetOption(const std::string& option, const void* value, size_t valueSize) override
361  {
362  if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
363  LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno);
364  }
365  }
366 
367  void GetOption(const std::string& option, void* value, size_t* valueSize) override
368  {
369  if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
370  LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno);
371  }
372  }
373 
374  void SetLinger(const int value) override
375  {
376  if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
377  throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
378  }
379  }
380 
381  void Events(uint32_t* events) override
382  {
383  size_t eventsSize = sizeof(uint32_t);
384  if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
385  throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
386  }
387  }
388 
389  int GetLinger() const override
390  {
391  int value = 0;
392  size_t valueSize = sizeof(value);
393  if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
394  throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
395  }
396  return value;
397  }
398 
399  void SetSndBufSize(const int value) override
400  {
401  if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
402  throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
403  }
404  }
405 
406  int GetSndBufSize() const override
407  {
408  int value = 0;
409  size_t valueSize = sizeof(value);
410  if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
411  throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
412  }
413  return value;
414  }
415 
416  void SetRcvBufSize(const int value) override
417  {
418  if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) {
419  throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
420  }
421  }
422 
423  int GetRcvBufSize() const override
424  {
425  int value = 0;
426  size_t valueSize = sizeof(value);
427  if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
428  throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
429  }
430  return value;
431  }
432 
433  void SetSndKernelSize(const int value) override
434  {
435  if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) {
436  throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
437  }
438  }
439 
440  int GetSndKernelSize() const override
441  {
442  int value = 0;
443  size_t valueSize = sizeof(value);
444  if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
445  throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
446  }
447  return value;
448  }
449 
450  void SetRcvKernelSize(const int value) override
451  {
452  if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) {
453  throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
454  }
455  }
456 
457  int GetRcvKernelSize() const override
458  {
459  int value = 0;
460  size_t valueSize = sizeof(value);
461  if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
462  throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
463  }
464  return value;
465  }
466 
467  unsigned long GetBytesTx() const override { return fBytesTx; }
468  unsigned long GetBytesRx() const override { return fBytesRx; }
469  unsigned long GetMessagesTx() const override { return fMessagesTx; }
470  unsigned long GetMessagesRx() const override { return fMessagesRx; }
471 
472  static int GetConstant(const std::string& constant)
473  {
474  if (constant == "") return 0;
475  if (constant == "sub") return ZMQ_SUB;
476  if (constant == "pub") return ZMQ_PUB;
477  if (constant == "xsub") return ZMQ_XSUB;
478  if (constant == "xpub") return ZMQ_XPUB;
479  if (constant == "push") return ZMQ_PUSH;
480  if (constant == "pull") return ZMQ_PULL;
481  if (constant == "req") return ZMQ_REQ;
482  if (constant == "rep") return ZMQ_REP;
483  if (constant == "dealer") return ZMQ_DEALER;
484  if (constant == "router") return ZMQ_ROUTER;
485  if (constant == "pair") return ZMQ_PAIR;
486 
487  if (constant == "snd-hwm") return ZMQ_SNDHWM;
488  if (constant == "rcv-hwm") return ZMQ_RCVHWM;
489  if (constant == "snd-size") return ZMQ_SNDBUF;
490  if (constant == "rcv-size") return ZMQ_RCVBUF;
491  if (constant == "snd-more") return ZMQ_SNDMORE;
492  if (constant == "rcv-more") return ZMQ_RCVMORE;
493 
494  if (constant == "linger") return ZMQ_LINGER;
495  if (constant == "no-block") return ZMQ_DONTWAIT;
496  if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
497 
498  if (constant == "fd") return ZMQ_FD;
499  if (constant == "events")
500  return ZMQ_EVENTS;
501  if (constant == "pollin")
502  return ZMQ_POLLIN;
503  if (constant == "pollout")
504  return ZMQ_POLLOUT;
505 
506  throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
507  }
508 
509  ~Socket() override { Close(); }
510 
511  private:
512  void* fSocket;
513  Manager& fManager;
514  std::string fId;
515  std::atomic<unsigned long> fBytesTx;
516  std::atomic<unsigned long> fBytesRx;
517  std::atomic<unsigned long> fMessagesTx;
518  std::atomic<unsigned long> fMessagesRx;
519 
520  int fTimeout;
521 };
522 
523 } // namespace fair::mq::shmem
524 
525 #endif /* FAIR_MQ_SHMEM_SOCKET_H_ */
FairMQSocket
Definition: FairMQSocket.h:36
fair::mq::shmem::Socket
Definition: Socket.h:44
fair::mq::shmem::Socket::Events
void Events(uint32_t *events) override
Definition: Socket.h:381
fair::mq::shmem::Manager
Definition: Manager.h:61
fair::mq::SocketError
Definition: FairMQSocket.h:92
fair::mq::shmem::ZMsg
Definition: Socket.h:31
fair::mq::shmem::MetaHeader
Definition: Common.h:132
fair::mq::shmem::Message
Definition: Message.h:39
fair::mq::shmem
Definition: Common.h:33
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30

privacy