9 #ifndef FAIR_MQ_ZMQ_POLLER_H
10 #define FAIR_MQ_ZMQ_POLLER_H
12 #include <fairmq/zeromq/Socket.h>
13 #include <fairmq/tools/Strings.h>
14 #include <FairMQChannel.h>
15 #include <FairMQLogger.h>
16 #include <FairMQPoller.h>
20 #include <unordered_map>
23 namespace fair::mq::zmq
29 Poller(
const std::vector<FairMQChannel>& channels)
34 fNumItems = channels.size();
35 fItems =
new zmq_pollitem_t[fNumItems];
37 for (
int i = 0; i < fNumItems; ++i) {
38 fItems[i].socket =
static_cast<const Socket*
>(&(channels.at(i).GetSocket()))->GetSocket();
40 fItems[i].revents = 0;
43 size_t size =
sizeof(type);
44 zmq_getsockopt(
static_cast<const Socket*
>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
46 SetItemEvents(fItems[i], type);
50 Poller(
const std::vector<FairMQChannel*>& channels)
55 fNumItems = channels.size();
56 fItems =
new zmq_pollitem_t[fNumItems];
58 for (
int i = 0; i < fNumItems; ++i) {
59 fItems[i].socket =
static_cast<const Socket*
>(&(channels.at(i)->GetSocket()))->GetSocket();
61 fItems[i].revents = 0;
64 size_t size =
sizeof(type);
65 zmq_getsockopt(
static_cast<const Socket*
>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
67 SetItemEvents(fItems[i], type);
71 Poller(
const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap,
const std::vector<std::string>& channelList)
79 for (std::string channel : channelList) {
80 fOffsetMap[channel] = offset;
81 offset += channelsMap.at(channel).size();
82 fNumItems += channelsMap.at(channel).size();
85 fItems =
new zmq_pollitem_t[fNumItems];
88 for (std::string channel : channelList) {
89 for (
unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) {
90 index = fOffsetMap[channel] + i;
92 fItems[index].socket =
static_cast<const Socket*
>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
94 fItems[index].revents = 0;
97 size_t size =
sizeof(type);
98 zmq_getsockopt(
static_cast<const Socket*
>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
100 SetItemEvents(fItems[index], type);
103 }
catch (
const std::out_of_range& oor) {
104 LOG(error) <<
"at least one of the provided channel keys for poller initialization is invalid";
105 LOG(error) <<
"out of range error: " << oor.what();
106 throw fair::mq::PollerError(fair::mq::tools::ToString(
"At least one of the provided channel keys for poller initialization is invalid. ",
"Out of range error: ", oor.what()));
110 Poller(
const Poller&) =
delete;
111 Poller operator=(
const Poller&) =
delete;
113 void SetItemEvents(zmq_pollitem_t& item,
const int type)
115 if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) {
116 item.events = ZMQ_POLLIN | ZMQ_POLLOUT;
117 }
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) {
118 item.events = ZMQ_POLLOUT;
119 }
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) {
120 item.events = ZMQ_POLLIN;
122 LOG(error) <<
"invalid poller configuration, exiting.";
127 void Poll(
const int timeout)
override
130 if (zmq_poll(fItems, fNumItems, timeout) < 0) {
131 if (errno == ETERM) {
132 LOG(debug) <<
"polling exited, reason: " << zmq_strerror(errno);
134 }
else if (errno == EINTR) {
135 LOG(debug) <<
"polling interrupted by system call";
138 LOG(error) <<
"polling failed, reason: " << zmq_strerror(errno);
139 throw fair::mq::PollerError(fair::mq::tools::ToString(
"Polling failed, reason: ", zmq_strerror(errno)));
146 bool CheckInput(
const int index)
override
148 if (fItems[index].revents & ZMQ_POLLIN) {
155 bool CheckOutput(
const int index)
override
157 if (fItems[index].revents & ZMQ_POLLOUT) {
164 bool CheckInput(
const std::string& channelKey,
const int index)
override
167 if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) {
172 }
catch (
const std::out_of_range& oor) {
173 LOG(error) <<
"invalid channel key: '" << channelKey <<
"'";
174 LOG(error) <<
"out of range error: " << oor.what();
175 throw fair::mq::PollerError(fair::mq::tools::ToString(
"Invalid channel key '", channelKey,
"'. Out of range error: ", oor.what()));
179 bool CheckOutput(
const std::string& channelKey,
const int index)
override
182 if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) {
187 }
catch (
const std::out_of_range& oor) {
188 LOG(error) <<
"invalid channel key: '" << channelKey <<
"'";
189 LOG(error) <<
"out of range error: " << oor.what();
190 throw fair::mq::PollerError(fair::mq::tools::ToString(
"Invalid channel key '", channelKey,
"'. Out of range error: ", oor.what()));
194 ~Poller()
override {
delete[] fItems; }
197 zmq_pollitem_t* fItems;
200 std::unordered_map<std::string, int> fOffsetMap;