9 #ifndef FAIR_MQ_ZMQ_SOCKET_H
10 #define FAIR_MQ_ZMQ_SOCKET_H
12 #include <FairMQLogger.h>
13 #include <FairMQMessage.h>
14 #include <FairMQSocket.h>
15 #include <fairmq/tools/Strings.h>
16 #include <fairmq/zeromq/Context.h>
17 #include <fairmq/zeromq/Message.h>
24 namespace fair::mq::zmq
/// Creates a ZeroMQ socket of the requested type and applies its initial options.
///
/// The socket id is composed as "<id>.<name>.<type>" and is also installed as
/// the ZMQ_IDENTITY of the socket.
///
/// @param ctx     context whose GetZmqCtx() supplies the ZeroMQ context for zmq_socket().
/// @param type    socket type name, translated to a ZeroMQ constant by GetConstant().
/// @param name    second component of the socket id.
/// @param id      first component of the socket id.
/// @param factory forwarded to the fair::mq::Socket base class; defaults to nullptr.
/// @throws SocketError if zmq_socket() returns nullptr; GetConstant() throws
///         SocketError for a type name it does not recognize.
///
/// Failures while setting the individual socket options are logged, not thrown.
// NOTE(review): several member initializers (including the ones for fCtx and
// fTimeout), the declaration of `linger`, and the condition guarding the
// ZMQ_SUBSCRIBE call are not visible in this excerpt.
30 Socket(Context& ctx,
const std::string& type,
const std::string& name,
const std::string&
id,
FairMQTransportFactory* factory =
nullptr)
31 : fair::mq::Socket(factory)
// zmq_socket() yields nullptr on failure; that case is handled in the body.
33 , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type)))
34 , fId(id +
"." + name +
"." + type)
// Socket creation failure is the only error in the constructor that throws.
41 if (fSocket ==
nullptr) {
42 LOG(error) <<
"Failed creating socket " << fId <<
", reason: " << zmq_strerror(errno);
43 throw SocketError(tools::ToString(
"Unavailable transport requested: ", type));
// Use the composed id as the ZeroMQ identity of this socket.
46 if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) {
47 LOG(error) <<
"Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
53 if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger,
sizeof(linger)) != 0) {
54 LOG(error) <<
"Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
// The same fTimeout value is used for both the send and the receive timeout.
57 if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout,
sizeof(fTimeout)) != 0) {
58 LOG(error) <<
"Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
61 if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout,
sizeof(fTimeout)) != 0) {
62 LOG(error) <<
"Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
// A zero-length filter subscribes to every incoming message.
66 if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE,
nullptr, 0) != 0) {
67 LOG(error) <<
"Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
71 LOG(debug) <<
"Created socket " << GetId();
77 std::string GetId()
const override {
return fId; }
/// Binds the socket to an endpoint with zmq_bind().
///
/// @param address endpoint string handed to zmq_bind().
///
/// A failure with errno == EADDRINUSE takes a separate branch that comes
/// before the error log; every other failure is logged with the socket id,
/// the address and the ZeroMQ error text.
// NOTE(review): the return statements (and the body of the EADDRINUSE branch)
// are not visible in this excerpt.
79 bool Bind(
const std::string& address)
override
83 if (zmq_bind(fSocket, address.c_str()) != 0) {
84 if (errno == EADDRINUSE) {
89 LOG(error) <<
"Failed binding socket " << fId <<
", address: " << address <<
", reason: " << zmq_strerror(errno);
/// Connects the socket to an endpoint with zmq_connect().
///
/// @param address endpoint string handed to zmq_connect().
///
/// A failure is logged with the socket id, the address and the ZeroMQ error text.
// NOTE(review): the return statements are not visible in this excerpt.
96 bool Connect(
const std::string& address)
override
100 if (zmq_connect(fSocket, address.c_str()) != 0) {
101 LOG(error) <<
"Failed connecting socket " << fId <<
", address: " << address <<
", reason: " << zmq_strerror(errno);
/// Helper consulted by the Send()/Receive() overloads after a transfer failed
/// with EAGAIN or EINTR, to decide whether the transfer is attempted again.
///
/// @param flags   ZeroMQ transfer flags; the first check tests whether ZMQ_DONTWAIT is unset.
/// @param timeout limit that `elapsed` is compared against.
/// @param elapsed running total owned by the caller, passed by reference.
// NOTE(review): the statements that update `elapsed` and all return
// statements are not visible in this excerpt; presumably the
// `elapsed >= timeout` check is nested inside the ZMQ_DONTWAIT check.
108 bool ShouldRetry(
int flags,
int timeout,
int& elapsed)
const
110 if ((flags & ZMQ_DONTWAIT) == 0) {
113 if (elapsed >= timeout) {
123 int HandleErrors()
const
125 if (zmq_errno() == ETERM) {
126 LOG(debug) <<
"Terminating socket " << fId;
127 return static_cast<int>(TransferCode::error);
129 LOG(error) <<
"Failed transfer on socket " << fId <<
", errno: " << errno <<
", reason: " << zmq_strerror(errno);
130 return static_cast<int>(TransferCode::error);
/// Sends a single message over the socket with zmq_msg_send().
///
/// @param msg     message to send; it is downcast to the zeromq Message to
///                reach the underlying ZeroMQ message object.
/// @param timeout forwarded to ShouldRetry(); defaults to -1.
/// @return For a failure with EAGAIN/EINTR: TransferCode::interrupted when the
///         context reports Interrupted(), or TransferCode::timeout (presumably
///         once ShouldRetry() declines another attempt). Other failures
///         return the result of HandleErrors().
// NOTE(review): the declarations of `flags` and `elapsed`, the condition that
// selects ZMQ_DONTWAIT, the retry loop and the success return are not visible
// in this excerpt.
134 int64_t Send(MessagePtr& msg,
const int timeout = -1)
override
138 flags = ZMQ_DONTWAIT;
// The size is captured before the send and added to fBytesTx afterwards.
142 int64_t actualBytes = zmq_msg_size(
static_cast<Message*
>(msg.get())->GetMessage());
145 int nbytes = zmq_msg_send(
static_cast<Message*
>(msg.get())->GetMessage(), fSocket, flags);
147 fBytesTx += actualBytes;
150 }
else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
151 if (fCtx.Interrupted()) {
152 return static_cast<int>(TransferCode::interrupted);
153 }
else if (ShouldRetry(flags, timeout, elapsed)) {
156 return static_cast<int>(TransferCode::timeout);
// Any error other than EAGAIN/EINTR is reported through HandleErrors().
159 return HandleErrors();
/// Receives a single message from the socket with zmq_msg_recv().
///
/// @param msg     message object that receives the data; it is downcast to
///                the zeromq Message to reach the underlying ZeroMQ message.
/// @param timeout forwarded to ShouldRetry(); defaults to -1.
/// @return For a failure with EAGAIN/EINTR: TransferCode::interrupted when the
///         context reports Interrupted(), or TransferCode::timeout (presumably
///         once ShouldRetry() declines another attempt). Other failures
///         return the result of HandleErrors().
// NOTE(review): the declarations of `flags` and `elapsed`, the condition that
// selects ZMQ_DONTWAIT, the retry loop and the success return are not visible
// in this excerpt.
164 int64_t Receive(MessagePtr& msg,
const int timeout = -1)
override
168 flags = ZMQ_DONTWAIT;
173 int nbytes = zmq_msg_recv(
static_cast<Message*
>(msg.get())->GetMessage(), fSocket, flags);
// Realign() runs on the received message before its size is added to fBytesRx.
175 static_cast<Message*
>(msg.get())->Realign();
176 int64_t actualBytes = zmq_msg_size(
static_cast<Message*
>(msg.get())->GetMessage());
177 fBytesRx += actualBytes;
180 }
else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
181 if (fCtx.Interrupted()) {
182 return static_cast<int>(TransferCode::interrupted);
183 }
else if (ShouldRetry(flags, timeout, elapsed)) {
186 return static_cast<int>(TransferCode::timeout);
// Any error other than EAGAIN/EINTR is reported through HandleErrors().
189 return HandleErrors();
/// Sends a vector of messages as the parts of one multipart ZeroMQ message.
///
/// @param msgVec  parts to send, in order.
/// @param timeout forwarded to ShouldRetry() (and to the single-message Send()
///                for a one-element vector); defaults to -1.
/// @return For a failure with EAGAIN/EINTR: TransferCode::interrupted when the
///         context reports Interrupted(), or TransferCode::timeout (presumably
///         once ShouldRetry() declines another attempt). Other failures
///         return the result of HandleErrors(). An empty vector yields
///         TransferCode::error after a warning.
// NOTE(review): the declarations of `flags` and `elapsed`, the retry loop, the
// accumulation into `totalSize` and the success return are not visible in this
// excerpt.
194 int64_t Send(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec,
const int timeout = -1)
override
198 flags = ZMQ_DONTWAIT;
// NOTE(review): size() returns size_t; storing it in unsigned int narrows on
// platforms where size_t is wider.
201 const unsigned int vecSize = msgVec.size();
208 int64_t totalSize = 0;
// Every part except the last one is sent with ZMQ_SNDMORE added to the flags.
211 for (
unsigned int i = 0; i < vecSize; ++i) {
212 int nbytes = zmq_msg_send(
static_cast<Message*
>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
215 }
else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
216 if (fCtx.Interrupted()) {
217 return static_cast<int>(TransferCode::interrupted);
218 }
else if (ShouldRetry(flags, timeout, elapsed)) {
222 return static_cast<int>(TransferCode::timeout);
225 return HandleErrors();
235 fBytesTx += totalSize;
238 }
else if (vecSize == 1) {
// A single-element vector is delegated to the single-message overload.
239 return Send(msgVec.back(), timeout);
// Nothing to send: warn and report an error.
241 LOG(warn) <<
"Will not send empty vector";
242 return static_cast<int>(TransferCode::error);
/// Receives the parts of a multipart ZeroMQ message and appends them to a vector.
///
/// @param msgVec  vector that each received part is pushed onto.
/// @param timeout forwarded to ShouldRetry(); defaults to -1.
/// @return For a failure with EAGAIN/EINTR: TransferCode::interrupted when the
///         context reports Interrupted(), or TransferCode::timeout (presumably
///         once ShouldRetry() declines another attempt). Other failures
///         return the result of HandleErrors().
// NOTE(review): the declarations of `flags`, `elapsed` and `more`, the
// surrounding loops, the accumulation into `totalSize` and the success return
// are not visible in this excerpt.
246 int64_t Receive(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec,
const int timeout = -1)
override
250 flags = ZMQ_DONTWAIT;
255 int64_t totalSize = 0;
// Each part is received into a newly created Message bound to this socket's transport.
260 FairMQMessagePtr part = std::make_unique<Message>(GetTransport());
262 int nbytes = zmq_msg_recv(
static_cast<Message*
>(part.get())->GetMessage(), fSocket, flags);
264 static_cast<Message*
>(part.get())->Realign();
// NOTE(review): `move` is unqualified here and is found via argument-dependent
// lookup; std::move would state the intent explicitly.
265 msgVec.push_back(move(part));
267 }
else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
268 if (fCtx.Interrupted()) {
269 return static_cast<int>(TransferCode::interrupted);
270 }
else if (ShouldRetry(flags, timeout, elapsed)) {
274 return static_cast<int>(TransferCode::timeout);
277 return HandleErrors();
// ZMQ_RCVMORE reports whether further parts of the current message follow.
// NOTE(review): the return value of zmq_getsockopt() is not checked here.
280 size_t moreSize =
sizeof(more);
281 zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize);
290 fBytesRx += totalSize;
295 void* GetSocket()
const {
return fSocket; }
/// Closes the underlying ZeroMQ socket with zmq_close().
///
/// A failing zmq_close() is logged with the socket id and the ZeroMQ error
/// text; it is not thrown. The destructor calls this function.
// NOTE(review): the body of the `fSocket == nullptr` guard and any statements
// after the zmq_close() call are not visible in this excerpt.
297 void Close()
override
301 if (fSocket ==
nullptr) {
305 if (zmq_close(fSocket) != 0) {
306 LOG(error) <<
"Failed closing socket " << fId <<
", reason: " << zmq_strerror(errno);
312 void SetOption(
const std::string& option,
const void* value,
size_t valueSize)
override
314 if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
315 LOG(error) <<
"Failed setting socket option, reason: " << zmq_strerror(errno);
319 void GetOption(
const std::string& option,
void* value,
size_t* valueSize)
override
321 if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
322 LOG(error) <<
"Failed getting socket option, reason: " << zmq_strerror(errno);
326 void Events(uint32_t* events)
override
328 size_t eventsSize =
sizeof(uint32_t);
329 if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
330 throw SocketError(tools::ToString(
"failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
334 void SetLinger(
const int value)
override
336 if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value,
sizeof(value)) < 0) {
337 throw SocketError(tools::ToString(
"failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
/// Reads the ZMQ_LINGER option of the socket.
///
/// @throws SocketError if zmq_getsockopt() fails.
// NOTE(review): the declaration of `value` and the return statement are not
// visible in this excerpt.
341 int GetLinger()
const override
344 size_t valueSize =
sizeof(value);
345 if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
346 throw SocketError(tools::ToString(
"failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
351 void SetSndBufSize(
const int value)
override
353 if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value,
sizeof(value)) < 0) {
354 throw SocketError(tools::ToString(
"failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
/// Reads the ZMQ_SNDHWM option of the socket (the counterpart of SetSndBufSize()).
///
/// @throws SocketError if zmq_getsockopt() fails.
// NOTE(review): the declaration of `value` and the return statement are not
// visible in this excerpt.
358 int GetSndBufSize()
const override
361 size_t valueSize =
sizeof(value);
362 if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
363 throw SocketError(tools::ToString(
"failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
368 void SetRcvBufSize(
const int value)
override
370 if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value,
sizeof(value)) < 0) {
371 throw SocketError(tools::ToString(
"failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
/// Reads the ZMQ_RCVHWM option of the socket (the counterpart of SetRcvBufSize()).
///
/// @throws SocketError if zmq_getsockopt() fails.
// NOTE(review): the declaration of `value` and the return statement are not
// visible in this excerpt.
375 int GetRcvBufSize()
const override
378 size_t valueSize =
sizeof(value);
379 if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
380 throw SocketError(tools::ToString(
"failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
385 void SetSndKernelSize(
const int value)
override
387 if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value,
sizeof(value)) < 0) {
388 throw SocketError(tools::ToString(
"failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
/// Reads the ZMQ_SNDBUF option of the socket.
///
/// @throws SocketError if zmq_getsockopt() fails.
// NOTE(review): the declaration of `value` and the return statement are not
// visible in this excerpt.
392 int GetSndKernelSize()
const override
395 size_t valueSize =
sizeof(value);
396 if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
397 throw SocketError(tools::ToString(
"failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
402 void SetRcvKernelSize(
const int value)
override
404 if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value,
sizeof(value)) < 0) {
405 throw SocketError(tools::ToString(
"failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
/// Reads the ZMQ_RCVBUF option of the socket.
///
/// @throws SocketError if zmq_getsockopt() fails.
// NOTE(review): the declaration of `value` and the return statement are not
// visible in this excerpt.
409 int GetRcvKernelSize()
const override
412 size_t valueSize =
sizeof(value);
413 if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
414 throw SocketError(tools::ToString(
"failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
419 unsigned long GetBytesTx()
const override {
return fBytesTx; }
420 unsigned long GetBytesRx()
const override {
return fBytesRx; }
421 unsigned long GetMessagesTx()
const override {
return fMessagesTx; }
422 unsigned long GetMessagesRx()
const override {
return fMessagesRx; }
/// Translates a textual socket-type, option or flag name into the matching
/// ZeroMQ constant.
///
/// @param constant name to translate; the empty string maps to 0.
/// @return the ZeroMQ constant that belongs to the name.
/// @throws SocketError if the name is not one of the recognized strings.
424 static int GetConstant(
const std::string& constant)
426 if (constant ==
"")
return 0;
// Socket type names.
427 if (constant ==
"sub")
return ZMQ_SUB;
428 if (constant ==
"pub")
return ZMQ_PUB;
429 if (constant ==
"xsub")
return ZMQ_XSUB;
430 if (constant ==
"xpub")
return ZMQ_XPUB;
431 if (constant ==
"push")
return ZMQ_PUSH;
432 if (constant ==
"pull")
return ZMQ_PULL;
433 if (constant ==
"req")
return ZMQ_REQ;
434 if (constant ==
"rep")
return ZMQ_REP;
435 if (constant ==
"dealer")
return ZMQ_DEALER;
436 if (constant ==
"router")
return ZMQ_ROUTER;
437 if (constant ==
"pair")
return ZMQ_PAIR;
// Option and flag names.
439 if (constant ==
"snd-hwm")
return ZMQ_SNDHWM;
440 if (constant ==
"rcv-hwm")
return ZMQ_RCVHWM;
441 if (constant ==
"snd-size")
return ZMQ_SNDBUF;
442 if (constant ==
"rcv-size")
return ZMQ_RCVBUF;
443 if (constant ==
"snd-more")
return ZMQ_SNDMORE;
444 if (constant ==
"rcv-more")
return ZMQ_RCVMORE;
446 if (constant ==
"linger")
return ZMQ_LINGER;
448 if (constant ==
"fd")
return ZMQ_FD;
// NOTE(review): the return statements of the "events", "pollin" and "pollout"
// branches are not visible in this excerpt.
449 if (constant ==
"events")
451 if (constant ==
"pollin")
453 if (constant ==
"pollout")
// No name matched: report the offending argument.
456 throw SocketError(tools::ToString(
"GetConstant called with an invalid argument: ", constant));
459 ~Socket()
override { Close(); }
// Byte counters: fBytesTx is increased by the Send() overloads and fBytesRx by
// the Receive() overloads; they are read through GetBytesTx()/GetBytesRx().
465 std::atomic<unsigned long> fBytesTx;
466 std::atomic<unsigned long> fBytesRx;
// Message counters read through GetMessagesTx()/GetMessagesRx().
// NOTE(review): the statements that update them are not visible in this excerpt.
467 std::atomic<unsigned long> fMessagesTx;
468 std::atomic<unsigned long> fMessagesRx;