9 #ifndef FAIR_MQ_ZMQ_CONTEXT_H_
10 #define FAIR_MQ_ZMQ_CONTEXT_H_
12 #include <fairmq/tools/Strings.h>
13 #include <FairMQLogger.h>
14 #include <FairMQUnmanagedRegion.h>
19 #include <condition_variable>
28 namespace fair::mq::zmq
// Exception thrown by Context when the ZeroMQ context cannot be created or
// configured. It carries no state of its own: every std::runtime_error
// constructor is inherited as-is, so it only provides a distinct type to catch.
struct ContextError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};
36 Context(
int numIoThreads)
37 : fZmqCtx(zmq_ctx_new())
42 throw ContextError(tools::ToString(
"failed creating context, reason: ", zmq_strerror(errno)));
45 if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
46 LOG(error) <<
"failed configuring context, reason: " << zmq_strerror(errno);
47 throw ContextError(tools::ToString(
"failed configuring context, reason: ", zmq_strerror(errno)));
50 if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
51 LOG(error) <<
"failed configuring context, reason: " << zmq_strerror(errno);
52 throw ContextError(tools::ToString(
"failed configuring context, reason: ", zmq_strerror(errno)));
55 fRegionEvents.emplace(
true, 0,
nullptr, 0, 0, RegionEvent::local_only);
61 void SubscribeToRegionEvents(RegionEventCallback callback)
63 if (fRegionEventThread.joinable()) {
64 LOG(debug) <<
"Already subscribed. Overwriting previous subscription.";
66 std::lock_guard<std::mutex> lock(fMtx);
67 fRegionEventsSubscriptionActive =
false;
69 fRegionEventsCV.notify_one();
70 fRegionEventThread.join();
72 std::lock_guard<std::mutex> lock(fMtx);
73 fRegionEventCallback = callback;
74 fRegionEventsSubscriptionActive =
true;
75 fRegionEventThread = std::thread(&Context::RegionEventsSubscription,
this);
78 bool SubscribedToRegionEvents()
const {
return fRegionEventThread.joinable(); }
80 void UnsubscribeFromRegionEvents()
82 if (fRegionEventThread.joinable()) {
83 std::unique_lock<std::mutex> lock(fMtx);
84 fRegionEventsSubscriptionActive =
false;
86 fRegionEventsCV.notify_one();
87 fRegionEventThread.join();
89 fRegionEventCallback =
nullptr;
93 void RegionEventsSubscription()
95 std::unique_lock<std::mutex> lock(fMtx);
96 while (fRegionEventsSubscriptionActive) {
98 while (!fRegionEvents.empty()) {
99 auto i = fRegionEvents.front();
100 fRegionEventCallback(i);
103 fRegionEventsCV.wait(lock, [&]() {
return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
107 std::vector<RegionInfo> GetRegionInfo()
const
109 std::lock_guard<std::mutex> lock(fMtx);
113 uint16_t RegionCount()
const
115 std::lock_guard<std::mutex> lock(fMtx);
116 return fRegionCounter;
119 void AddRegion(
bool managed, uint16_t
id,
void* ptr,
size_t size, int64_t userFlags, RegionEvent event)
122 std::lock_guard<std::mutex> lock(fMtx);
124 fRegionInfos.emplace_back(managed,
id, ptr, size, userFlags, event);
125 fRegionEvents.emplace(managed,
id, ptr, size, userFlags, event);
127 fRegionEventsCV.notify_one();
130 void RemoveRegion(uint16_t
id)
133 std::lock_guard<std::mutex> lock(fMtx);
134 auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [
id](
const RegionInfo& i) {
137 if (it != fRegionInfos.end()) {
138 fRegionEvents.push(*it);
139 fRegionEvents.back().event = RegionEvent::destroyed;
140 fRegionInfos.erase(it);
142 LOG(error) <<
"RemoveRegion: given id (" <<
id <<
") not found.";
145 fRegionEventsCV.notify_one();
148 void Interrupt() { fInterrupted.store(
true); }
149 void Resume() { fInterrupted.store(
false); }
151 bool Interrupted() {
return fInterrupted.load(); }
153 void* GetZmqCtx() {
return fZmqCtx; }
157 UnsubscribeFromRegionEvents();
161 if (zmq_ctx_term(fZmqCtx) != 0) {
162 if (errno == EINTR) {
163 LOG(debug) <<
"zmq_ctx_term interrupted by system call, retrying";
172 LOG(error) <<
"context not available for shutdown";
178 mutable std::mutex fMtx;
179 std::atomic<bool> fInterrupted;
181 uint16_t fRegionCounter;
182 std::condition_variable fRegionEventsCV;
183 std::vector<RegionInfo> fRegionInfos;
184 std::queue<RegionInfo> fRegionEvents;
185 std::thread fRegionEventThread;
186 std::function<void(RegionInfo)> fRegionEventCallback;
187 bool fRegionEventsSubscriptionActive;