FairMQ  1.4.33
C++ Message Queuing Library and Framework
TransportFactory.h
1 /********************************************************************************
2  * Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
10 #define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
11 
12 #include "Manager.h"
13 #include "Common.h"
14 #include "Message.h"
15 #include "Socket.h"
16 #include "Poller.h"
17 #include "UnmanagedRegion.h"
18 
19 #include <FairMQTransportFactory.h>
20 #include <fairmq/ProgOptions.h>
21 #include <FairMQLogger.h>
22 #include <fairmq/tools/Strings.h>
23 
24 #include <boost/version.hpp>
25 
26 #include <zmq.h>
27 
28 #include <memory> // unique_ptr, make_unique
29 #include <string>
30 #include <vector>
31 #include <stdexcept>
32 
33 namespace fair::mq::shmem
34 {
35 
36 class TransportFactory final : public fair::mq::TransportFactory
37 {
38  public:
39  TransportFactory(const std::string& deviceId = "", const ProgOptions* config = nullptr)
40  : fair::mq::TransportFactory(deviceId)
41  , fZmqCtx(zmq_ctx_new())
42  , fManager(nullptr)
43  {
44  int major, minor, patch;
45  zmq_version(&major, &minor, &patch);
46  LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
47  << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
48 
49  if (!fZmqCtx) {
50  throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
51  }
52 
53  int numIoThreads = 1;
54  std::string sessionName = "default";
55  size_t segmentSize = 2ULL << 30;
56  std::string allocationAlgorithm("rbtree_best_fit");
57  if (config) {
58  numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
59  sessionName = config->GetProperty<std::string>("session", sessionName);
60  segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
61  allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm);
62  } else {
63  LOG(debug) << "ProgOptions not available! Using defaults.";
64  }
65 
66  if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") {
67  LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'";
68  throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
69  }
70 
71  std::string shmId = makeShmIdStr(sessionName);
72  LOG(debug) << "Generated shmid '" << shmId << "' out of session id '" << sessionName << "'.";
73 
74  try {
75  if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
76  LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
77  }
78 
79  // Set the maximum number of allowed sockets on the context.
80  if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
81  LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
82  }
83 
84  fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
85  } catch (boost::interprocess::interprocess_exception& e) {
86  LOG(error) << "Could not initialize shared memory transport: " << e.what();
87  throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
88  } catch (const std::exception& e) {
89  LOG(error) << "Could not initialize shared memory transport: " << e.what();
90  throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
91  }
92  }
93 
94  TransportFactory(const TransportFactory&) = delete;
95  TransportFactory operator=(const TransportFactory&) = delete;
96 
97  MessagePtr CreateMessage() override
98  {
99  return std::make_unique<Message>(*fManager, this);
100  }
101 
102  MessagePtr CreateMessage(Alignment alignment) override
103  {
104  return std::make_unique<Message>(*fManager, alignment, this);
105  }
106 
107  MessagePtr CreateMessage(const size_t size) override
108  {
109  return std::make_unique<Message>(*fManager, size, this);
110  }
111 
112  MessagePtr CreateMessage(const size_t size, Alignment alignment) override
113  {
114  return std::make_unique<Message>(*fManager, size, alignment, this);
115  }
116 
117  MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
118  {
119  return std::make_unique<Message>(*fManager, data, size, ffn, hint, this);
120  }
121 
122  MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override
123  {
124  return std::make_unique<Message>(*fManager, region, data, size, hint, this);
125  }
126 
127  SocketPtr CreateSocket(const std::string& type, const std::string& name) override
128  {
129  return std::make_unique<Socket>(*fManager, type, name, GetId(), fZmqCtx, this);
130  }
131 
132  PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override
133  {
134  return std::make_unique<Poller>(channels);
135  }
136 
137  PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override
138  {
139  return std::make_unique<Poller>(channels);
140  }
141 
142  PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override
143  {
144  return std::make_unique<Poller>(channelsMap, channelList);
145  }
146 
147  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
148  {
149  return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
150  }
151 
152  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
153  {
154  return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
155  }
156 
157  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
158  {
159  return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
160  }
161 
162  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
163  {
164  return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
165  }
166 
167  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
168  {
169  return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
170  }
171 
172  void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
173  bool SubscribedToRegionEvents() override { return fManager->SubscribedToRegionEvents(); }
174  void UnsubscribeFromRegionEvents() override { fManager->UnsubscribeFromRegionEvents(); }
175  std::vector<fair::mq::RegionInfo> GetRegionInfo() override { return fManager->GetRegionInfo(); }
176 
177  Transport GetType() const override { return fair::mq::Transport::SHM; }
178 
179  void Interrupt() override { fManager->Interrupt(); }
180  void Resume() override { fManager->Resume(); }
181  void Reset() override { fManager->Reset(); }
182 
183  ~TransportFactory() override
184  {
185  LOG(debug) << "Destroying Shared Memory transport...";
186 
187  if (fZmqCtx) {
188  while (true) {
189  if (zmq_ctx_term(fZmqCtx) != 0) {
190  if (errno == EINTR) {
191  LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
192  continue;
193  } else {
194  fZmqCtx = nullptr;
195  }
196  }
197  break;
198  }
199  } else {
200  LOG(error) << "context not available for shutdown";
201  }
202  }
203 
204  private:
205  void* fZmqCtx;
206  std::unique_ptr<Manager> fManager;
207 };
208 
209 } // namespace fair::mq::shmem
210 
211 #endif /* FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ */
fair::mq::Alignment
Definition: FairMQMessage.h:25
fair::mq::shmem::TransportFactory::CreateMessage
MessagePtr CreateMessage() override
Create empty FairMQMessage (for receiving)
Definition: TransportFactory.h:109
fair::mq::shmem::TransportFactory
Definition: TransportFactory.h:43
fair::mq::shmem::TransportFactory::CreatePoller
PollerPtr CreatePoller(const std::vector< FairMQChannel > &channels) const override
Create a poller for a single channel (all subchannels)
Definition: TransportFactory.h:144
fair::mq::shmem::TransportFactory::GetType
Transport GetType() const override
Get transport type.
Definition: TransportFactory.h:189
fair::mq::shmem::TransportFactory::UnsubscribeFromRegionEvents
void UnsubscribeFromRegionEvents() override
Unsubscribe from region events.
Definition: TransportFactory.h:186
fair::mq::shmem::TransportFactory::SubscribedToRegionEvents
bool SubscribedToRegionEvents() override
Check if there is an active subscription to region events.
Definition: TransportFactory.h:185
fair::mq::shmem::SharedMemoryError
Definition: Common.h:41
fair::mq::shmem::TransportFactory::CreateUnmanagedRegion
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback=nullptr, const std::string &path="", int flags=0) override
Create new UnmanagedRegion.
Definition: TransportFactory.h:159
fair::mq::shmem::TransportFactory::CreateSocket
SocketPtr CreateSocket(const std::string &type, const std::string &name) override
Create a socket.
Definition: TransportFactory.h:139
fair::mq::shmem::TransportFactory::SubscribeToRegionEvents
void SubscribeToRegionEvents(RegionEventCallback callback) override
Subscribe to region events (creation, destruction, ...)
Definition: TransportFactory.h:184
fair::mq::shmem
Definition: Common.h:33
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30

privacy