9 #ifndef FAIR_MQ_ZMQ_TRANSPORTFACTORY_H
10 #define FAIR_MQ_ZMQ_TRANSPORTFACTORY_H
12 #include <fairmq/zeromq/Context.h>
13 #include <fairmq/zeromq/Message.h>
14 #include <fairmq/zeromq/Socket.h>
15 #include <fairmq/zeromq/Poller.h>
16 #include <fairmq/zeromq/UnmanagedRegion.h>
17 #include <FairMQTransportFactory.h>
18 #include <fairmq/ProgOptions.h>
24 namespace fair::mq::zmq
30 TransportFactory(
const std::string&
id =
"",
const ProgOptions* config =
nullptr)
34 int major, minor, patch;
35 zmq_version(&major, &minor, &patch);
36 LOG(debug) <<
"Transport: Using ZeroMQ library, version: " << major <<
"." << minor <<
"." << patch;
39 fCtx = std::make_unique<Context>(config->GetProperty<
int>(
"io-threads", 1));
41 LOG(debug) <<
"fair::mq::ProgOptions not available! Using defaults.";
42 fCtx = std::make_unique<Context>(1);
51 return std::make_unique<Message>(
this);
56 return std::make_unique<Message>(alignment,
this);
61 return std::make_unique<Message>(size,
this);
66 return std::make_unique<Message>(size, alignment,
this);
69 MessagePtr
CreateMessage(
void* data,
const size_t size, fairmq_free_fn* ffn,
void* hint =
nullptr)
override
71 return std::make_unique<Message>(data, size, ffn, hint,
this);
74 MessagePtr
CreateMessage(UnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint = 0)
override
76 return std::make_unique<Message>(region, data, size, hint,
this);
79 SocketPtr
CreateSocket(
const std::string& type,
const std::string& name)
override
81 return std::make_unique<Socket>(*fCtx, type, name, GetId(),
this);
84 PollerPtr
CreatePoller(
const std::vector<FairMQChannel>& channels)
const override
86 return std::make_unique<Poller>(channels);
89 PollerPtr
CreatePoller(
const std::vector<FairMQChannel*>& channels)
const override
91 return std::make_unique<Poller>(channels);
94 PollerPtr
CreatePoller(
const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap,
const std::vector<std::string>& channelList)
const override
96 return std::make_unique<Poller>(channelsMap, channelList);
99 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, RegionCallback callback,
const std::string& path =
"",
int flags = 0)
override
104 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size, RegionBulkCallback bulkCallback,
const std::string& path =
"",
int flags = 0)
override
109 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size,
const int64_t userFlags, RegionCallback callback,
const std::string& path =
"",
int flags = 0)
override
114 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size,
const int64_t userFlags, RegionBulkCallback bulkCallback,
const std::string& path =
"",
int flags = 0)
override
119 UnmanagedRegionPtr
CreateUnmanagedRegion(
const size_t size,
const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback,
const std::string&,
int)
121 UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback,
this);
123 fCtx->AddRegion(
false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
127 void SubscribeToRegionEvents(RegionEventCallback callback)
override { fCtx->SubscribeToRegionEvents(callback); }
130 std::vector<RegionInfo> GetRegionInfo()
override {
return fCtx->GetRegionInfo(); }
132 Transport
GetType()
const override {
return Transport::ZMQ; }
134 void Interrupt()
override { fCtx->Interrupt(); }
135 void Resume()
override { fCtx->Resume(); }
136 void Reset()
override { fCtx->Reset(); }
138 ~TransportFactory()
override { LOG(debug) <<
"Destroying ZeroMQ transport..."; }
141 std::unique_ptr<Context> fCtx;