9 #ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
10 #define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
17 #include "UnmanagedRegion.h"
19 #include <FairMQTransportFactory.h>
20 #include <fairmq/ProgOptions.h>
21 #include <FairMQLogger.h>
22 #include <fairmq/tools/Strings.h>
24 #include <boost/version.hpp>
39 TransportFactory(
const std::string& deviceId =
"",
const ProgOptions* config =
nullptr)
40 : fair::mq::TransportFactory(deviceId)
41 , fZmqCtx(zmq_ctx_new())
44 int major, minor, patch;
45 zmq_version(&major, &minor, &patch);
46 LOG(debug) <<
"Transport: Using ZeroMQ (" << major <<
"." << minor <<
"." << patch <<
") & "
47 <<
"boost::interprocess (" << (BOOST_VERSION / 100000) <<
"." << (BOOST_VERSION / 100 % 1000) <<
"." << (BOOST_VERSION % 100) <<
")";
50 throw std::runtime_error(tools::ToString(
"failed creating context, reason: ", zmq_strerror(errno)));
54 std::string sessionName =
"default";
55 size_t segmentSize = 2ULL << 30;
56 std::string allocationAlgorithm(
"rbtree_best_fit");
58 numIoThreads = config->GetProperty<
int>(
"io-threads", numIoThreads);
59 sessionName = config->GetProperty<std::string>(
"session", sessionName);
60 segmentSize = config->GetProperty<
size_t>(
"shm-segment-size", segmentSize);
61 allocationAlgorithm = config->GetProperty<std::string>(
"shm-allocation", allocationAlgorithm);
63 LOG(debug) <<
"ProgOptions not available! Using defaults.";
66 if (allocationAlgorithm !=
"rbtree_best_fit" && allocationAlgorithm !=
"simple_seq_fit") {
67 LOG(error) <<
"Provided shared memory allocation algorithm '" << allocationAlgorithm <<
"' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'";
68 throw SharedMemoryError(tools::ToString(
"Provided shared memory allocation algorithm '", allocationAlgorithm,
"' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
71 std::string shmId = makeShmIdStr(sessionName);
72 LOG(debug) <<
"Generated shmid '" << shmId <<
"' out of session id '" << sessionName <<
"'.";
75 if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
76 LOG(error) <<
"failed configuring context, reason: " << zmq_strerror(errno);
80 if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
81 LOG(error) <<
"failed configuring context, reason: " << zmq_strerror(errno);
84 fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
85 }
catch (boost::interprocess::interprocess_exception& e) {
86 LOG(error) <<
"Could not initialize shared memory transport: " << e.what();
87 throw std::runtime_error(tools::ToString(
"Could not initialize shared memory transport: ", e.what()));
88 }
catch (
const std::exception& e) {
89 LOG(error) <<
"Could not initialize shared memory transport: " << e.what();
90 throw std::runtime_error(tools::ToString(
"Could not initialize shared memory transport: ", e.what()));
99 return std::make_unique<Message>(*fManager,
this);
104 return std::make_unique<Message>(*fManager, alignment,
this);
109 return std::make_unique<Message>(*fManager, size,
this);
114 return std::make_unique<Message>(*fManager, size, alignment,
this);
117 MessagePtr
CreateMessage(
void* data,
const size_t size, fairmq_free_fn* ffn,
void* hint =
nullptr)
override
119 return std::make_unique<Message>(*fManager, data, size, ffn, hint,
this);
122 MessagePtr
CreateMessage(UnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint = 0)
override
124 return std::make_unique<Message>(*fManager, region, data, size, hint,
this);
127 SocketPtr
CreateSocket(
const std::string& type,
const std::string& name)
override
129 return std::make_unique<Socket>(*fManager, type, name, GetId(), fZmqCtx,
this);
132 PollerPtr
CreatePoller(
const std::vector<FairMQChannel>& channels)
const override
134 return std::make_unique<Poller>(channels);
137 PollerPtr
CreatePoller(
const std::vector<FairMQChannel*>& channels)
const override
139 return std::make_unique<Poller>(channels);
142 PollerPtr
CreatePoller(
const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap,
const std::vector<std::string>& channelList)
const override
144 return std::make_unique<Poller>(channelsMap, channelList);
147 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, RegionCallback callback =
nullptr,
const std::string& path =
"",
int flags = 0)
override
152 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, RegionBulkCallback bulkCallback =
nullptr,
const std::string& path =
"",
int flags = 0)
override
157 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, int64_t userFlags, RegionCallback callback =
nullptr,
const std::string& path =
"",
int flags = 0)
override
162 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback =
nullptr,
const std::string& path =
"",
int flags = 0)
override
167 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback,
const std::string& path,
int flags)
169 return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags,
this);
172 void SubscribeToRegionEvents(RegionEventCallback callback)
override { fManager->SubscribeToRegionEvents(callback); }
175 std::vector<fair::mq::RegionInfo> GetRegionInfo()
override {
return fManager->GetRegionInfo(); }
177 Transport
GetType()
const override {
return fair::mq::Transport::SHM; }
179 void Interrupt()
override { fManager->Interrupt(); }
180 void Resume()
override { fManager->Resume(); }
181 void Reset()
override { fManager->Reset(); }
183 ~TransportFactory()
override
185 LOG(debug) <<
"Destroying Shared Memory transport...";
189 if (zmq_ctx_term(fZmqCtx) != 0) {
190 if (errno == EINTR) {
191 LOG(debug) <<
"zmq_ctx_term interrupted by system call, retrying";
200 LOG(error) <<
"context not available for shutdown";
206 std::unique_ptr<Manager> fManager;