FairMQ  1.4.33
C++ Message Queuing Library and Framework
Socket.h
1 /********************************************************************************
2  * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_ZMQ_SOCKET_H
10 #define FAIR_MQ_ZMQ_SOCKET_H
11 
12 #include <FairMQLogger.h>
13 #include <FairMQMessage.h>
14 #include <FairMQSocket.h>
15 #include <fairmq/tools/Strings.h>
16 #include <fairmq/zeromq/Context.h>
17 #include <fairmq/zeromq/Message.h>
18 
19 #include <zmq.h>
20 
21 #include <atomic>
22 #include <memory> // unique_ptr, make_unique
23 
24 namespace fair::mq::zmq
25 {
26 
27 class Socket final : public fair::mq::Socket
28 {
29  public:
30  Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, FairMQTransportFactory* factory = nullptr)
31  : fair::mq::Socket(factory)
32  , fCtx(ctx)
33  , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type)))
34  , fId(id + "." + name + "." + type)
35  , fBytesTx(0)
36  , fBytesRx(0)
37  , fMessagesTx(0)
38  , fMessagesRx(0)
39  , fTimeout(100)
40  {
41  if (fSocket == nullptr) {
42  LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
43  throw SocketError(tools::ToString("Unavailable transport requested: ", type));
44  }
45 
46  if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) {
47  LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
48  }
49 
50  // Tell socket to try and send/receive outstanding messages for <linger> milliseconds before
51  // terminating. Default value for ZeroMQ is -1, which is to wait forever.
52  int linger = 1000;
53  if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
54  LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
55  }
56 
57  if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
58  LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
59  }
60 
61  if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
62  LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
63  }
64 
65  if (type == "sub") {
66  if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) {
67  LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
68  }
69  }
70 
71  LOG(debug) << "Created socket " << GetId();
72  }
73 
74  Socket(const Socket&) = delete;
75  Socket operator=(const Socket&) = delete;
76 
77  std::string GetId() const override { return fId; }
78 
79  bool Bind(const std::string& address) override
80  {
81  // LOG(debug) << "Binding socket " << fId << " on " << address;
82 
83  if (zmq_bind(fSocket, address.c_str()) != 0) {
84  if (errno == EADDRINUSE) {
85  // do not print error in this case, this is handled by FairMQDevice in case no
86  // connection could be established after trying a number of random ports from a range.
87  return false;
88  }
89  LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
90  return false;
91  }
92 
93  return true;
94  }
95 
96  bool Connect(const std::string& address) override
97  {
98  // LOG(debug) << "Connecting socket " << fId << " on " << address;
99 
100  if (zmq_connect(fSocket, address.c_str()) != 0) {
101  LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
102  return false;
103  }
104 
105  return true;
106  }
107 
108  bool ShouldRetry(int flags, int timeout, int& elapsed) const
109  {
110  if ((flags & ZMQ_DONTWAIT) == 0) {
111  if (timeout > 0) {
112  elapsed += fTimeout;
113  if (elapsed >= timeout) {
114  return false;
115  }
116  }
117  return true;
118  } else {
119  return false;
120  }
121  }
122 
123  int HandleErrors() const
124  {
125  if (zmq_errno() == ETERM) {
126  LOG(debug) << "Terminating socket " << fId;
127  return static_cast<int>(TransferCode::error);
128  } else {
129  LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
130  return static_cast<int>(TransferCode::error);
131  }
132  }
133 
134  int64_t Send(MessagePtr& msg, const int timeout = -1) override
135  {
136  int flags = 0;
137  if (timeout == 0) {
138  flags = ZMQ_DONTWAIT;
139  }
140  int elapsed = 0;
141 
142  int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());
143 
144  while (true) {
145  int nbytes = zmq_msg_send(static_cast<Message*>(msg.get())->GetMessage(), fSocket, flags);
146  if (nbytes >= 0) {
147  fBytesTx += actualBytes;
148  ++fMessagesTx;
149  return actualBytes;
150  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
151  if (fCtx.Interrupted()) {
152  return static_cast<int>(TransferCode::interrupted);
153  } else if (ShouldRetry(flags, timeout, elapsed)) {
154  continue;
155  } else {
156  return static_cast<int>(TransferCode::timeout);
157  }
158  } else {
159  return HandleErrors();
160  }
161  }
162  }
163 
164  int64_t Receive(MessagePtr& msg, const int timeout = -1) override
165  {
166  int flags = 0;
167  if (timeout == 0) {
168  flags = ZMQ_DONTWAIT;
169  }
170  int elapsed = 0;
171 
172  while (true) {
173  int nbytes = zmq_msg_recv(static_cast<Message*>(msg.get())->GetMessage(), fSocket, flags);
174  if (nbytes >= 0) {
175  static_cast<Message*>(msg.get())->Realign();
176  int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());
177  fBytesRx += actualBytes;
178  ++fMessagesRx;
179  return actualBytes;
180  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
181  if (fCtx.Interrupted()) {
182  return static_cast<int>(TransferCode::interrupted);
183  } else if (ShouldRetry(flags, timeout, elapsed)) {
184  continue;
185  } else {
186  return static_cast<int>(TransferCode::timeout);
187  }
188  } else {
189  return HandleErrors();
190  }
191  }
192  }
193 
194  int64_t Send(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec, const int timeout = -1) override
195  {
196  int flags = 0;
197  if (timeout == 0) {
198  flags = ZMQ_DONTWAIT;
199  }
200 
201  const unsigned int vecSize = msgVec.size();
202 
203  // Sending vector typicaly handles more then one part
204  if (vecSize > 1) {
205  int elapsed = 0;
206 
207  while (true) {
208  int64_t totalSize = 0;
209  bool repeat = false;
210 
211  for (unsigned int i = 0; i < vecSize; ++i) {
212  int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
213  if (nbytes >= 0) {
214  totalSize += nbytes;
215  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
216  if (fCtx.Interrupted()) {
217  return static_cast<int>(TransferCode::interrupted);
218  } else if (ShouldRetry(flags, timeout, elapsed)) {
219  repeat = true;
220  break;
221  } else {
222  return static_cast<int>(TransferCode::timeout);
223  }
224  } else {
225  return HandleErrors();
226  }
227  }
228 
229  if (repeat) {
230  continue;
231  }
232 
233  // store statistics on how many messages have been sent (handle all parts as a single message)
234  ++fMessagesTx;
235  fBytesTx += totalSize;
236  return totalSize;
237  }
238  } else if (vecSize == 1) { // If there's only one part, send it as a regular message
239  return Send(msgVec.back(), timeout);
240  } else { // if the vector is empty, something might be wrong
241  LOG(warn) << "Will not send empty vector";
242  return static_cast<int>(TransferCode::error);
243  }
244  }
245 
246  int64_t Receive(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec, const int timeout = -1) override
247  {
248  int flags = 0;
249  if (timeout == 0) {
250  flags = ZMQ_DONTWAIT;
251  }
252  int elapsed = 0;
253 
254  while (true) {
255  int64_t totalSize = 0;
256  int more = 0;
257  bool repeat = false;
258 
259  do {
260  FairMQMessagePtr part = std::make_unique<Message>(GetTransport());
261 
262  int nbytes = zmq_msg_recv(static_cast<Message*>(part.get())->GetMessage(), fSocket, flags);
263  if (nbytes >= 0) {
264  static_cast<Message*>(part.get())->Realign();
265  msgVec.push_back(move(part));
266  totalSize += nbytes;
267  } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
268  if (fCtx.Interrupted()) {
269  return static_cast<int>(TransferCode::interrupted);
270  } else if (ShouldRetry(flags, timeout, elapsed)) {
271  repeat = true;
272  break;
273  } else {
274  return static_cast<int>(TransferCode::timeout);
275  }
276  } else {
277  return HandleErrors();
278  }
279 
280  size_t moreSize = sizeof(more);
281  zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize);
282  } while (more);
283 
284  if (repeat) {
285  continue;
286  }
287 
288  // store statistics on how many messages have been received (handle all parts as a single message)
289  ++fMessagesRx;
290  fBytesRx += totalSize;
291  return totalSize;
292  }
293  }
294 
295  void* GetSocket() const { return fSocket; }
296 
297  void Close() override
298  {
299  // LOG(debug) << "Closing socket " << fId;
300 
301  if (fSocket == nullptr) {
302  return;
303  }
304 
305  if (zmq_close(fSocket) != 0) {
306  LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
307  }
308 
309  fSocket = nullptr;
310  }
311 
312  void SetOption(const std::string& option, const void* value, size_t valueSize) override
313  {
314  if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
315  LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno);
316  }
317  }
318 
319  void GetOption(const std::string& option, void* value, size_t* valueSize) override
320  {
321  if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
322  LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno);
323  }
324  }
325 
326  void Events(uint32_t* events) override
327  {
328  size_t eventsSize = sizeof(uint32_t);
329  if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
330  throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
331  }
332  }
333 
334  void SetLinger(const int value) override
335  {
336  if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
337  throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
338  }
339  }
340 
341  int GetLinger() const override
342  {
343  int value = 0;
344  size_t valueSize = sizeof(value);
345  if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
346  throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
347  }
348  return value;
349  }
350 
351  void SetSndBufSize(const int value) override
352  {
353  if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
354  throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
355  }
356  }
357 
358  int GetSndBufSize() const override
359  {
360  int value = 0;
361  size_t valueSize = sizeof(value);
362  if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
363  throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
364  }
365  return value;
366  }
367 
368  void SetRcvBufSize(const int value) override
369  {
370  if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) {
371  throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
372  }
373  }
374 
375  int GetRcvBufSize() const override
376  {
377  int value = 0;
378  size_t valueSize = sizeof(value);
379  if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
380  throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
381  }
382  return value;
383  }
384 
385  void SetSndKernelSize(const int value) override
386  {
387  if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) {
388  throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
389  }
390  }
391 
392  int GetSndKernelSize() const override
393  {
394  int value = 0;
395  size_t valueSize = sizeof(value);
396  if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
397  throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
398  }
399  return value;
400  }
401 
402  void SetRcvKernelSize(const int value) override
403  {
404  if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) {
405  throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
406  }
407  }
408 
409  int GetRcvKernelSize() const override
410  {
411  int value = 0;
412  size_t valueSize = sizeof(value);
413  if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
414  throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
415  }
416  return value;
417  }
418 
419  unsigned long GetBytesTx() const override { return fBytesTx; }
420  unsigned long GetBytesRx() const override { return fBytesRx; }
421  unsigned long GetMessagesTx() const override { return fMessagesTx; }
422  unsigned long GetMessagesRx() const override { return fMessagesRx; }
423 
424  static int GetConstant(const std::string& constant)
425  {
426  if (constant == "") return 0;
427  if (constant == "sub") return ZMQ_SUB;
428  if (constant == "pub") return ZMQ_PUB;
429  if (constant == "xsub") return ZMQ_XSUB;
430  if (constant == "xpub") return ZMQ_XPUB;
431  if (constant == "push") return ZMQ_PUSH;
432  if (constant == "pull") return ZMQ_PULL;
433  if (constant == "req") return ZMQ_REQ;
434  if (constant == "rep") return ZMQ_REP;
435  if (constant == "dealer") return ZMQ_DEALER;
436  if (constant == "router") return ZMQ_ROUTER;
437  if (constant == "pair") return ZMQ_PAIR;
438 
439  if (constant == "snd-hwm") return ZMQ_SNDHWM;
440  if (constant == "rcv-hwm") return ZMQ_RCVHWM;
441  if (constant == "snd-size") return ZMQ_SNDBUF;
442  if (constant == "rcv-size") return ZMQ_RCVBUF;
443  if (constant == "snd-more") return ZMQ_SNDMORE;
444  if (constant == "rcv-more") return ZMQ_RCVMORE;
445 
446  if (constant == "linger") return ZMQ_LINGER;
447 
448  if (constant == "fd") return ZMQ_FD;
449  if (constant == "events")
450  return ZMQ_EVENTS;
451  if (constant == "pollin")
452  return ZMQ_POLLIN;
453  if (constant == "pollout")
454  return ZMQ_POLLOUT;
455 
456  throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
457  }
458 
459  ~Socket() override { Close(); }
460 
461  private:
462  Context& fCtx;
463  void* fSocket;
464  std::string fId;
465  std::atomic<unsigned long> fBytesTx;
466  std::atomic<unsigned long> fBytesRx;
467  std::atomic<unsigned long> fMessagesTx;
468  std::atomic<unsigned long> fMessagesRx;
469 
470  int fTimeout;
471 };
472 
473 } // namespace fair::mq::zmq
474 
475 #endif /* FAIR_MQ_ZMQ_SOCKET_H */
FairMQSocket
Definition: FairMQSocket.h:36
fair::mq::zmq::Socket
Definition: Socket.h:34
fair::mq::SocketError
Definition: FairMQSocket.h:92
fair::mq::zmq::Socket::Events
void Events(uint32_t *events) override
Definition: Socket.h:338
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30

privacy