FairMQ  1.4.33
C++ Message Queuing Library and Framework
Poller.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 #ifndef FAIR_MQ_SHMEM_POLLER_H_
9 #define FAIR_MQ_SHMEM_POLLER_H_
10 
11 #include "Socket.h"
12 #include <fairmq/tools/Strings.h>
13 #include <FairMQChannel.h>
14 #include <FairMQLogger.h>
15 #include <FairMQPoller.h>
16 
17 #include <zmq.h>
18 
19 #include <unordered_map>
20 #include <vector>
21 
22 class FairMQChannel;
23 
24 namespace fair::mq::shmem
25 {
26 
27 class Poller final : public fair::mq::Poller
28 {
29  public:
30  Poller(const std::vector<FairMQChannel>& channels)
31  : fItems()
32  , fNumItems(0)
33  , fOffsetMap()
34  {
35  fNumItems = channels.size();
36  fItems = new zmq_pollitem_t[fNumItems];
37 
38  for (int i = 0; i < fNumItems; ++i) {
39  fItems[i].socket = static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket();
40  fItems[i].fd = 0;
41  fItems[i].revents = 0;
42 
43  int type = 0;
44  size_t size = sizeof(type);
45  zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
46 
47  SetItemEvents(fItems[i], type);
48  }
49  }
50 
51  Poller(const std::vector<FairMQChannel*>& channels)
52  : fItems()
53  , fNumItems(0)
54  , fOffsetMap()
55  {
56  fNumItems = channels.size();
57  fItems = new zmq_pollitem_t[fNumItems];
58 
59  for (int i = 0; i < fNumItems; ++i) {
60  fItems[i].socket = static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket();
61  fItems[i].fd = 0;
62  fItems[i].revents = 0;
63 
64  int type = 0;
65  size_t size = sizeof(type);
66  zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
67 
68  SetItemEvents(fItems[i], type);
69  }
70  }
71 
72  Poller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList)
73  : fItems()
74  , fNumItems(0)
75  , fOffsetMap()
76  {
77  try {
78  int offset = 0;
79  // calculate offsets and the total size of the poll item set
80  for (std::string channel : channelList) {
81  fOffsetMap[channel] = offset;
82  offset += channelsMap.at(channel).size();
83  fNumItems += channelsMap.at(channel).size();
84  }
85 
86  fItems = new zmq_pollitem_t[fNumItems];
87 
88  int index = 0;
89  for (std::string channel : channelList) {
90  for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) {
91  index = fOffsetMap[channel] + i;
92 
93  fItems[index].socket = static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
94  fItems[index].fd = 0;
95  fItems[index].revents = 0;
96 
97  int type = 0;
98  size_t size = sizeof(type);
99  zmq_getsockopt(static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
100 
101  SetItemEvents(fItems[index], type);
102  }
103  }
104  } catch (const std::out_of_range& oor) {
105  LOG(error) << "At least one of the provided channel keys for poller initialization is invalid." << " Out of range error: " << oor.what();
106  throw fair::mq::PollerError(fair::mq::tools::ToString("At least one of the provided channel keys for poller initialization is invalid. ", "Out of range error: ", oor.what()));
107  }
108  }
109 
110  Poller(const Poller&) = delete;
111  Poller operator=(const Poller&) = delete;
112 
113  void SetItemEvents(zmq_pollitem_t& item, const int type)
114  {
115  if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) {
116  item.events = ZMQ_POLLIN | ZMQ_POLLOUT;
117  } else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) {
118  item.events = ZMQ_POLLOUT;
119  } else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) {
120  item.events = ZMQ_POLLIN;
121  } else {
122  LOG(error) << "invalid poller configuration, exiting.";
123  throw fair::mq::PollerError("Invalid poller configuration, exiting.");
124  }
125  }
126 
127  void Poll(const int timeout) override
128  {
129  while (true) {
130  if (zmq_poll(fItems, fNumItems, timeout) < 0) {
131  if (errno == ETERM) {
132  LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
133  return;
134  } else if (errno == EINTR) {
135  LOG(debug) << "polling interrupted by system call";
136  continue;
137  } else {
138  LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
139  throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
140  }
141  }
142  break;
143  }
144  }
145 
146  bool CheckInput(const int index) override
147  {
148  if (fItems[index].revents & ZMQ_POLLIN) {
149  return true;
150  }
151 
152  return false;
153  }
154 
155  bool CheckOutput(const int index) override
156  {
157  if (fItems[index].revents & ZMQ_POLLOUT) {
158  return true;
159  }
160 
161  return false;
162  }
163 
164  bool CheckInput(const std::string& channelKey, const int index) override
165  {
166  try {
167  if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) {
168  return true;
169  }
170 
171  return false;
172  } catch (const std::out_of_range& oor) {
173  LOG(error) << "invalid channel key: '" << channelKey << "'";
174  LOG(error) << "out of range error: " << oor.what();
175  throw fair::mq::PollerError(fair::mq::tools::ToString("Invalid channel key '", channelKey, "'. Out of range error: ", oor.what()));
176  }
177  }
178 
179  bool CheckOutput(const std::string& channelKey, const int index) override
180  {
181  try {
182  if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) {
183  return true;
184  }
185 
186  return false;
187  } catch (const std::out_of_range& oor) {
188  LOG(error) << "invalid channel key: '" << channelKey << "'";
189  LOG(error) << "out of range error: " << oor.what();
190  throw fair::mq::PollerError(fair::mq::tools::ToString("Invalid channel key '", channelKey, "'. Out of range error: ", oor.what()));
191  }
192  }
193 
194  ~Poller() override { delete[] fItems; }
195 
196  private:
197  zmq_pollitem_t* fItems;
198  int fNumItems;
199 
200  std::unordered_map<std::string, int> fOffsetMap;
201 };
202 
203 } // namespace fair::mq::shmem
204 
205 #endif /* FAIR_MQ_SHMEM_POLLER_H_ */
fair::mq::PollerError
Definition: FairMQPoller.h:34
fair::mq::shmem::Socket
Definition: Socket.h:44
FairMQPoller
Definition: FairMQPoller.h:16
FairMQChannel
Wrapper class for FairMQSocket and related methods.
Definition: FairMQChannel.h:35
fair::mq::shmem::Poller
Definition: Poller.h:28
fair::mq::shmem
Definition: Common.h:33

privacy