FairMQ  1.4.33
C++ Message Queuing Library and Framework
Context.h
1 /********************************************************************************
2  * Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_ZMQ_CONTEXT_H_
10 #define FAIR_MQ_ZMQ_CONTEXT_H_
11 
12 #include <fairmq/tools/Strings.h>
13 #include <FairMQLogger.h>
14 #include <FairMQUnmanagedRegion.h>
15 
16 #include <zmq.h>
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <functional>
21 #include <mutex>
22 #include <queue>
23 #include <stdexcept>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 namespace fair::mq::zmq
29 {
30 
31 struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; };
32 
33 class Context
34 {
35  public:
36  Context(int numIoThreads)
37  : fZmqCtx(zmq_ctx_new())
38  , fInterrupted(false)
39  , fRegionCounter(1)
40  {
41  if (!fZmqCtx) {
42  throw ContextError(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
43  }
44 
45  if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
46  LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
47  throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno)));
48  }
49 
50  if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
51  LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
52  throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno)));
53  }
54 
55  fRegionEvents.emplace(true, 0, nullptr, 0, 0, RegionEvent::local_only);
56  }
57 
58  Context(const Context&) = delete;
59  Context operator=(const Context&) = delete;
60 
61  void SubscribeToRegionEvents(RegionEventCallback callback)
62  {
63  if (fRegionEventThread.joinable()) {
64  LOG(debug) << "Already subscribed. Overwriting previous subscription.";
65  {
66  std::lock_guard<std::mutex> lock(fMtx);
67  fRegionEventsSubscriptionActive = false;
68  }
69  fRegionEventsCV.notify_one();
70  fRegionEventThread.join();
71  }
72  std::lock_guard<std::mutex> lock(fMtx);
73  fRegionEventCallback = callback;
74  fRegionEventsSubscriptionActive = true;
75  fRegionEventThread = std::thread(&Context::RegionEventsSubscription, this);
76  }
77 
78  bool SubscribedToRegionEvents() const { return fRegionEventThread.joinable(); }
79 
80  void UnsubscribeFromRegionEvents()
81  {
82  if (fRegionEventThread.joinable()) {
83  std::unique_lock<std::mutex> lock(fMtx);
84  fRegionEventsSubscriptionActive = false;
85  lock.unlock();
86  fRegionEventsCV.notify_one();
87  fRegionEventThread.join();
88  lock.lock();
89  fRegionEventCallback = nullptr;
90  }
91  }
92 
93  void RegionEventsSubscription()
94  {
95  std::unique_lock<std::mutex> lock(fMtx);
96  while (fRegionEventsSubscriptionActive) {
97 
98  while (!fRegionEvents.empty()) {
99  auto i = fRegionEvents.front();
100  fRegionEventCallback(i);
101  fRegionEvents.pop();
102  }
103  fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
104  }
105  }
106 
107  std::vector<RegionInfo> GetRegionInfo() const
108  {
109  std::lock_guard<std::mutex> lock(fMtx);
110  return fRegionInfos;
111  }
112 
113  uint16_t RegionCount() const
114  {
115  std::lock_guard<std::mutex> lock(fMtx);
116  return fRegionCounter;
117  }
118 
119  void AddRegion(bool managed, uint16_t id, void* ptr, size_t size, int64_t userFlags, RegionEvent event)
120  {
121  {
122  std::lock_guard<std::mutex> lock(fMtx);
123  ++fRegionCounter;
124  fRegionInfos.emplace_back(managed, id, ptr, size, userFlags, event);
125  fRegionEvents.emplace(managed, id, ptr, size, userFlags, event);
126  }
127  fRegionEventsCV.notify_one();
128  }
129 
130  void RemoveRegion(uint16_t id)
131  {
132  {
133  std::lock_guard<std::mutex> lock(fMtx);
134  auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const RegionInfo& i) {
135  return i.id == id;
136  });
137  if (it != fRegionInfos.end()) {
138  fRegionEvents.push(*it);
139  fRegionEvents.back().event = RegionEvent::destroyed;
140  fRegionInfos.erase(it);
141  } else {
142  LOG(error) << "RemoveRegion: given id (" << id << ") not found.";
143  }
144  }
145  fRegionEventsCV.notify_one();
146  }
147 
148  void Interrupt() { fInterrupted.store(true); }
149  void Resume() { fInterrupted.store(false); }
150  void Reset() {}
151  bool Interrupted() { return fInterrupted.load(); }
152 
153  void* GetZmqCtx() { return fZmqCtx; }
154 
155  ~Context()
156  {
157  UnsubscribeFromRegionEvents();
158 
159  if (fZmqCtx) {
160  while (true) {
161  if (zmq_ctx_term(fZmqCtx) != 0) {
162  if (errno == EINTR) {
163  LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
164  continue;
165  } else {
166  fZmqCtx = nullptr;
167  }
168  }
169  break;
170  }
171  } else {
172  LOG(error) << "context not available for shutdown";
173  }
174  }
175 
176  private:
177  void* fZmqCtx;
178  mutable std::mutex fMtx;
179  std::atomic<bool> fInterrupted;
180 
181  uint16_t fRegionCounter;
182  std::condition_variable fRegionEventsCV;
183  std::vector<RegionInfo> fRegionInfos;
184  std::queue<RegionInfo> fRegionEvents;
185  std::thread fRegionEventThread;
186  std::function<void(RegionInfo)> fRegionEventCallback;
187  bool fRegionEventsSubscriptionActive;
188 };
189 
190 } // namespace fair::mq::zmq
191 
192 #endif /* FAIR_MQ_ZMQ_CONTEXT_H_ */
fair::mq::zmq::Context
Definition: Context.h:40
fair::mq::zmq::ContextError
Definition: Context.h:37

privacy