FairMQ  1.4.33
C++ Message Queuing Library and Framework
Poller.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_ZMQ_POLLER_H
10 #define FAIR_MQ_ZMQ_POLLER_H
11 
12 #include <fairmq/zeromq/Socket.h>
13 #include <fairmq/tools/Strings.h>
14 #include <FairMQChannel.h>
15 #include <FairMQLogger.h>
16 #include <FairMQPoller.h>
17 
18 #include <zmq.h>
19 
20 #include <unordered_map>
21 #include <vector>
22 
23 namespace fair::mq::zmq
24 {
25 
26 class Poller final : public fair::mq::Poller
27 {
28  public:
29  Poller(const std::vector<FairMQChannel>& channels)
30  : fItems()
31  , fNumItems(0)
32  , fOffsetMap()
33  {
34  fNumItems = channels.size();
35  fItems = new zmq_pollitem_t[fNumItems]; // TODO: fix me
36 
37  for (int i = 0; i < fNumItems; ++i) {
38  fItems[i].socket = static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket();
39  fItems[i].fd = 0;
40  fItems[i].revents = 0;
41 
42  int type = 0;
43  size_t size = sizeof(type);
44  zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
45 
46  SetItemEvents(fItems[i], type);
47  }
48  }
49 
50  Poller(const std::vector<FairMQChannel*>& channels)
51  : fItems()
52  , fNumItems(0)
53  , fOffsetMap()
54  {
55  fNumItems = channels.size();
56  fItems = new zmq_pollitem_t[fNumItems];
57 
58  for (int i = 0; i < fNumItems; ++i) {
59  fItems[i].socket = static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket();
60  fItems[i].fd = 0;
61  fItems[i].revents = 0;
62 
63  int type = 0;
64  size_t size = sizeof(type);
65  zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
66 
67  SetItemEvents(fItems[i], type);
68  }
69  }
70 
71  Poller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList)
72  : fItems()
73  , fNumItems(0)
74  , fOffsetMap()
75  {
76  try {
77  int offset = 0;
78  // calculate offsets and the total size of the poll item set
79  for (std::string channel : channelList) {
80  fOffsetMap[channel] = offset;
81  offset += channelsMap.at(channel).size();
82  fNumItems += channelsMap.at(channel).size();
83  }
84 
85  fItems = new zmq_pollitem_t[fNumItems];
86 
87  int index = 0;
88  for (std::string channel : channelList) {
89  for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) {
90  index = fOffsetMap[channel] + i;
91 
92  fItems[index].socket = static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
93  fItems[index].fd = 0;
94  fItems[index].revents = 0;
95 
96  int type = 0;
97  size_t size = sizeof(type);
98  zmq_getsockopt(static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
99 
100  SetItemEvents(fItems[index], type);
101  }
102  }
103  } catch (const std::out_of_range& oor) {
104  LOG(error) << "at least one of the provided channel keys for poller initialization is invalid";
105  LOG(error) << "out of range error: " << oor.what();
106  throw fair::mq::PollerError(fair::mq::tools::ToString("At least one of the provided channel keys for poller initialization is invalid. ", "Out of range error: ", oor.what()));
107  }
108  }
109 
110  Poller(const Poller&) = delete;
111  Poller operator=(const Poller&) = delete;
112 
113  void SetItemEvents(zmq_pollitem_t& item, const int type)
114  {
115  if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) {
116  item.events = ZMQ_POLLIN | ZMQ_POLLOUT;
117  } else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) {
118  item.events = ZMQ_POLLOUT;
119  } else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) {
120  item.events = ZMQ_POLLIN;
121  } else {
122  LOG(error) << "invalid poller configuration, exiting.";
123  throw fair::mq::PollerError("Invalid poller configuration, exiting.");
124  }
125  }
126 
127  void Poll(const int timeout) override
128  {
129  while (true) {
130  if (zmq_poll(fItems, fNumItems, timeout) < 0) {
131  if (errno == ETERM) {
132  LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
133  return;
134  } else if (errno == EINTR) {
135  LOG(debug) << "polling interrupted by system call";
136  continue;
137  } else {
138  LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
139  throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
140  }
141  }
142  break;
143  }
144  }
145 
146  bool CheckInput(const int index) override
147  {
148  if (fItems[index].revents & ZMQ_POLLIN) {
149  return true;
150  }
151 
152  return false;
153  }
154 
155  bool CheckOutput(const int index) override
156  {
157  if (fItems[index].revents & ZMQ_POLLOUT) {
158  return true;
159  }
160 
161  return false;
162  }
163 
164  bool CheckInput(const std::string& channelKey, const int index) override
165  {
166  try {
167  if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) {
168  return true;
169  }
170 
171  return false;
172  } catch (const std::out_of_range& oor) {
173  LOG(error) << "invalid channel key: '" << channelKey << "'";
174  LOG(error) << "out of range error: " << oor.what();
175  throw fair::mq::PollerError(fair::mq::tools::ToString("Invalid channel key '", channelKey, "'. Out of range error: ", oor.what()));
176  }
177  }
178 
179  bool CheckOutput(const std::string& channelKey, const int index) override
180  {
181  try {
182  if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) {
183  return true;
184  }
185 
186  return false;
187  } catch (const std::out_of_range& oor) {
188  LOG(error) << "invalid channel key: '" << channelKey << "'";
189  LOG(error) << "out of range error: " << oor.what();
190  throw fair::mq::PollerError(fair::mq::tools::ToString("Invalid channel key '", channelKey, "'. Out of range error: ", oor.what()));
191  }
192  }
193 
194  ~Poller() override { delete[] fItems; }
195 
196  private:
197  zmq_pollitem_t* fItems;
198  int fNumItems;
199 
200  std::unordered_map<std::string, int> fOffsetMap;
201 };
202 
203 } // namespace fair::mq::zmq
204 
205 #endif /* FAIR_MQ_ZMQ_POLLER_H */
fair::mq::PollerError
Definition: FairMQPoller.h:34
fair::mq::zmq::Poller
Definition: Poller.h:33
fair::mq::zmq::Socket
Definition: Socket.h:34
FairMQPoller
Definition: FairMQPoller.h:16

privacy