9 #ifndef FAIRMQDEVICE_H_
10 #define FAIRMQDEVICE_H_
12 #include <StateMachine.h>
13 #include <FairMQTransportFactory.h>
14 #include <fairmq/Transports.h>
15 #include <fairmq/StateQueue.h>
17 #include <FairMQChannel.h>
18 #include <FairMQMessage.h>
19 #include <FairMQParts.h>
20 #include <FairMQUnmanagedRegion.h>
21 #include <FairMQLogger.h>
22 #include <fairmq/ProgOptions.h>
29 #include <unordered_map>
37 #include <fairmq/tools/Version.h>
39 using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
41 using InputMsgCallback = std::function<bool(FairMQMessagePtr&,
int)>;
42 using InputMultipartCallback = std::function<bool(
FairMQParts&,
int)>;
79 template<
typename Serializer,
typename DataType,
typename... Args>
80 void Serialize(
FairMQMessage& msg, DataType&& data, Args&&... args)
const
82 Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
85 template<
typename Deserializer,
typename DataType,
typename... Args>
86 void Deserialize(
FairMQMessage& msg, DataType&& data, Args&&... args)
const
88 Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
97 int64_t
Send(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0,
int sndTimeoutInMs = -1)
99 return GetChannel(channel, index).
Send(msg, sndTimeoutInMs);
108 int64_t
Receive(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0,
int rcvTimeoutInMs = -1)
110 return GetChannel(channel, index).
Receive(msg, rcvTimeoutInMs);
119 int64_t
Send(
FairMQParts& parts,
const std::string& channel,
const int index = 0,
int sndTimeoutInMs = -1)
121 return GetChannel(channel, index).
Send(parts.fParts, sndTimeoutInMs);
130 int64_t
Receive(
FairMQParts& parts,
const std::string& channel,
const int index = 0,
int rcvTimeoutInMs = -1)
132 return GetChannel(channel, index).
Receive(parts.fParts, rcvTimeoutInMs);
142 template<
typename... Args>
143 FairMQMessagePtr NewMessage(Args&&... args)
145 return Transport()->CreateMessage(std::forward<Args>(args)...);
149 template<
typename... Args>
150 FairMQMessagePtr NewMessageFor(
const std::string& channel,
int index, Args&&... args)
152 return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
157 FairMQMessagePtr NewStaticMessage(
const T& data)
159 return Transport()->NewStaticMessage(data);
164 FairMQMessagePtr NewStaticMessageFor(
const std::string& channel,
int index,
const T& data)
166 return GetChannel(channel, index).NewStaticMessage(data);
171 FairMQMessagePtr NewSimpleMessage(
const T& data)
173 return Transport()->NewSimpleMessage(data);
178 FairMQMessagePtr NewSimpleMessageFor(
const std::string& channel,
int index,
const T& data)
180 return GetChannel(channel, index).NewSimpleMessage(data);
184 template<
typename... Args>
185 FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
187 return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...);
191 template<
typename... Args>
192 FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(
const std::string& channel,
int index, Args&&... args)
194 return GetChannel(channel, index).NewUnmanagedRegion(std::forward<Args>(args)...);
197 template<
typename ...Ts>
198 FairMQPollerPtr NewPoller(
const Ts&... inputs)
200 std::vector<std::string> chans{inputs...};
203 if (chans.size() > 1)
205 fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType();
207 for (
unsigned int i = 1; i < chans.size(); ++i)
209 if (type != GetChannel(chans.at(i), 0).Transport()->GetType())
211 LOG(error) <<
"poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
212 throw std::runtime_error(
"poller failed: different transports within same poller are not yet supported.");
217 return GetChannel(chans.at(0), 0).Transport()->CreatePoller(
fChannels, chans);
220 FairMQPollerPtr NewPoller(
const std::vector<FairMQChannel*>& channels)
223 if (channels.size() > 1)
225 fair::mq::Transport type = channels.at(0)->Transport()->GetType();
227 for (
unsigned int i = 1; i < channels.size(); ++i)
229 if (type != channels.at(i)->Transport()->GetType())
231 LOG(error) <<
"poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
232 throw std::runtime_error(
"poller failed: different transports within same poller are not yet supported.");
237 return channels.at(0)->Transport()->CreatePoller(channels);
242 std::shared_ptr<FairMQTransportFactory>
AddTransport(
const fair::mq::Transport transport);
254 void OnData(
const std::string& channelName,
bool (T::* memberFunction)(FairMQMessagePtr& msg,
int index))
256 fDataCallbacks =
true;
257 fMsgInputs.insert(std::make_pair(channelName, [
this, memberFunction](FairMQMessagePtr& msg,
int index)
259 return (
static_cast<T*
>(
this)->*memberFunction)(msg, index);
262 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
264 fInputChannelKeys.push_back(channelName);
268 void OnData(
const std::string& channelName, InputMsgCallback callback)
270 fDataCallbacks =
true;
271 fMsgInputs.insert(make_pair(channelName, callback));
273 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
275 fInputChannelKeys.push_back(channelName);
281 void OnData(
const std::string& channelName,
bool (T::* memberFunction)(
FairMQParts& parts,
int index))
283 fDataCallbacks =
true;
284 fMultipartInputs.insert(std::make_pair(channelName, [
this, memberFunction](
FairMQParts& parts,
int index)
286 return (
static_cast<T*
>(
this)->*memberFunction)(parts, index);
289 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
291 fInputChannelKeys.push_back(channelName);
295 void OnData(
const std::string& channelName, InputMultipartCallback callback)
297 fDataCallbacks =
true;
298 fMultipartInputs.insert(make_pair(channelName, callback));
300 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
302 fInputChannelKeys.push_back(channelName);
306 FairMQChannel& GetChannel(
const std::string& channelName,
const int index = 0)
308 return fChannels.at(channelName).at(index);
309 }
catch (
const std::out_of_range& oor) {
310 LOG(error) <<
"requested channel has not been configured? check channel names/configuration.";
311 LOG(error) <<
"channel: " << channelName <<
", index: " << index;
312 LOG(error) <<
"out of range: " << oor.what();
316 virtual void RegisterChannelEndpoints() {}
318 bool RegisterChannelEndpoint(
const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
320 bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
322 LOG(warn) <<
"Registering channel: name already registered: \"" << channelName <<
"\"";
327 void PrintRegisteredChannels()
329 if (fChannelRegistry.size() < 1) {
330 LOGV(info, verylow) <<
"no channels registered.";
332 for (
const auto& c : fChannelRegistry) {
333 LOGV(info, verylow) << c.first <<
":" << c.second.first <<
":" << c.second.second;
338 void SetId(
const std::string&
id) {
fId = id; }
339 std::string GetId() {
return fId; }
343 void SetNumIoThreads(
int numIoThreads) {
fConfig->
SetProperty(
"io-threads", numIoThreads);}
344 int GetNumIoThreads()
const {
return fConfig->
GetProperty<
int>(
"io-threads", DefaultIOThreads); }
346 void SetNetworkInterface(
const std::string& networkInterface) {
fConfig->
SetProperty(
"network-interface", networkInterface); }
347 std::string GetNetworkInterface()
const {
return fConfig->
GetProperty<std::string>(
"network-interface", DefaultNetworkInterface); }
349 void SetDefaultTransport(
const std::string& name) {
fConfig->
SetProperty(
"transport", name); }
350 std::string GetDefaultTransport()
const {
return fConfig->
GetProperty<std::string>(
"transport", DefaultTransportName); }
352 void SetInitTimeoutInS(
int initTimeoutInS) {
fConfig->
SetProperty(
"init-timeout", initTimeoutInS); }
353 int GetInitTimeoutInS()
const {
return fConfig->
GetProperty<
int>(
"init-timeout", DefaultInitTimeout); }
361 void SetRawCmdLineArgs(
const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
362 std::vector<std::string> GetRawCmdLineArgs()
const {
return fRawCmdLineArgs; }
364 void RunStateMachine()
366 fStateMachine.ProcessWork();
372 template<
typename Rep,
typename Period>
373 bool WaitFor(std::chrono::duration<Rep, Period>
const& duration)
375 return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
380 std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>>
fTransports;
383 std::unordered_map<std::string, std::vector<FairMQChannel>>
fChannels;
387 void AddChannel(
const std::string& name,
FairMQChannel&& channel)
398 virtual void Bind() {}
400 virtual void Connect() {}
429 bool ChangeState(
const fair::mq::Transition transition) {
return fStateMachine.ChangeState(transition); }
435 bool ChangeState(
const std::string& transition) {
return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
441 void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
446 void TransitionTo(
const fair::mq::State state);
454 void SubscribeToStateChange(
const std::string& key, std::function<
void(
const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
464 void SubscribeToNewTransition(
const std::string& key, std::function<
void(
const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
479 static std::string
GetStateName(
const fair::mq::State state) {
return fair::mq::GetStateName(state); }
482 static std::string
GetTransitionName(
const fair::mq::Transition transition) {
return fair::mq::GetTransitionName(transition); }
484 static constexpr
const char* DefaultId =
"";
485 static constexpr
int DefaultIOThreads = 1;
486 static constexpr
const char* DefaultTransportName =
"zeromq";
487 static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::ZMQ;
488 static constexpr
const char* DefaultNetworkInterface =
"default";
489 static constexpr
int DefaultInitTimeout = 120;
490 static constexpr uint64_t DefaultMaxRunTime = 0;
491 static constexpr
float DefaultRate = 0.;
492 static constexpr
const char* DefaultSession =
"default";
495 fair::mq::Transport fDefaultTransportType;
503 void ConnectWrapper();
505 void InitTaskWrapper();
509 void ResetTaskWrapper();
514 void UnblockTransports();
520 void AttachChannels(std::vector<FairMQChannel*>& chans);
523 void HandleSingleChannelInput();
524 void HandleMultipleChannelInput();
525 void HandleMultipleTransportInput();
526 void PollForTransport(
const FairMQTransportFactory* factory,
const std::vector<std::string>& channelKeys);
528 bool HandleMsgInput(
const std::string& chName,
const InputMsgCallback& callback,
int i);
529 bool HandleMultipartInput(
const std::string& chName,
const InputMultipartCallback& callback,
int i);
531 std::vector<FairMQChannel*> fUninitializedBindingChannels;
532 std::vector<FairMQChannel*> fUninitializedConnectingChannels;
535 std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
536 std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
537 std::unordered_map<fair::mq::Transport, std::vector<std::string>> fMultitransportInputs;
538 std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
539 std::vector<std::string> fInputChannelKeys;
540 std::mutex fMultitransportMutex;
541 std::atomic<bool> fMultitransportProceed;
545 uint64_t fMaxRunRuntimeInS;
546 int fInitializationTimeoutInS;
547 std::vector<std::string> fRawCmdLineArgs;
551 std::mutex fTransitionMtx;