FairMQ  1.4.33
C++ Message Queuing Library and Framework
FairMQDevice.h
1 /********************************************************************************
2  * Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQDEVICE_H_
10 #define FAIRMQDEVICE_H_
11 
12 #include <StateMachine.h>
13 #include <FairMQTransportFactory.h>
14 #include <fairmq/Transports.h>
15 #include <fairmq/StateQueue.h>
16 
17 #include <FairMQChannel.h>
18 #include <FairMQMessage.h>
19 #include <FairMQParts.h>
20 #include <FairMQUnmanagedRegion.h>
21 #include <FairMQLogger.h>
22 #include <fairmq/ProgOptions.h>
23 
24 #include <vector>
25 #include <memory> // unique_ptr
26 #include <algorithm> // find
27 #include <string>
28 #include <chrono>
29 #include <unordered_map>
30 #include <functional>
31 #include <stdexcept>
32 #include <mutex>
33 #include <atomic>
34 #include <cstddef>
35 #include <utility> // pair
36 
37 #include <fairmq/tools/Version.h>
38 
/// Map from a channel name to the vector of channels stored under that name.
39 using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
40 
/// Callback type for single-message input: takes the message pointer and an int index, returns bool.
41 using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
/// Callback type for multipart input: takes the parts container and an int index, returns bool.
42 using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
43 
namespace fair::mq
{

/// Exception type derived from std::runtime_error; it inherits all of the
/// base class constructors and adds nothing else.
/// NOTE(review): presumably thrown when a transition is requested while another
/// one is still ongoing - confirm against the implementation file.
struct OngoingTransition : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

} // namespace fair::mq
48 
50 {
51  friend class FairMQChannel;
52 
53  public:
/// Default constructor.
55  FairMQDevice();
58 
61 
64 
65  private:
67 
68  public:
/// Copy constructor (disabled)
70  FairMQDevice(const FairMQDevice&) = delete;
/// Default destructor.
74  virtual ~FairMQDevice();
75 
/// Outputs the socket transfer rates.
77  virtual void LogSocketRates();
78 
79  template<typename Serializer, typename DataType, typename... Args>
80  void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
81  {
82  Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
83  }
84 
85  template<typename Deserializer, typename DataType, typename... Args>
86  void Deserialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
87  {
88  Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
89  }
90 
97  int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
98  {
99  return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
100  }
101 
108  int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
109  {
110  return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
111  }
112 
119  int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
120  {
121  return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
122  }
123 
130  int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
131  {
132  return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
133  }
134 
137  {
138  return fTransportFactory.get();
139  }
140 
141  // creates message with the default device transport
142  template<typename... Args>
143  FairMQMessagePtr NewMessage(Args&&... args)
144  {
145  return Transport()->CreateMessage(std::forward<Args>(args)...);
146  }
147 
148  // creates message with the transport of the specified channel
149  template<typename... Args>
150  FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args)
151  {
152  return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
153  }
154 
155  // creates a message that will not be cleaned up after transfer, with the default device transport
156  template<typename T>
157  FairMQMessagePtr NewStaticMessage(const T& data)
158  {
159  return Transport()->NewStaticMessage(data);
160  }
161 
162  // creates a message that will not be cleaned up after transfer, with the transport of the specified channel
163  template<typename T>
164  FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data)
165  {
166  return GetChannel(channel, index).NewStaticMessage(data);
167  }
168 
169  // creates a message with a copy of the provided data, with the default device transport
170  template<typename T>
171  FairMQMessagePtr NewSimpleMessage(const T& data)
172  {
173  return Transport()->NewSimpleMessage(data);
174  }
175 
176  // creates a message with a copy of the provided data, with the transport of the specified channel
177  template<typename T>
178  FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data)
179  {
180  return GetChannel(channel, index).NewSimpleMessage(data);
181  }
182 
183  // creates unamanaged region with the default device transport
184  template<typename... Args>
185  FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
186  {
187  return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...);
188  }
189 
190  // creates unmanaged region with the transport of the specified channel
191  template<typename... Args>
192  FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, Args&&... args)
193  {
194  return GetChannel(channel, index).NewUnmanagedRegion(std::forward<Args>(args)...);
195  }
196 
197  template<typename ...Ts>
198  FairMQPollerPtr NewPoller(const Ts&... inputs)
199  {
200  std::vector<std::string> chans{inputs...};
201 
202  // if more than one channel provided, check compatibility
203  if (chans.size() > 1)
204  {
205  fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType();
206 
207  for (unsigned int i = 1; i < chans.size(); ++i)
208  {
209  if (type != GetChannel(chans.at(i), 0).Transport()->GetType())
210  {
211  LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
212  throw std::runtime_error("poller failed: different transports within same poller are not yet supported.");
213  }
214  }
215  }
216 
217  return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
218  }
219 
220  FairMQPollerPtr NewPoller(const std::vector<FairMQChannel*>& channels)
221  {
222  // if more than one channel provided, check compatibility
223  if (channels.size() > 1)
224  {
225  fair::mq::Transport type = channels.at(0)->Transport()->GetType();
226 
227  for (unsigned int i = 1; i < channels.size(); ++i)
228  {
229  if (type != channels.at(i)->Transport()->GetType())
230  {
231  LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
232  throw std::runtime_error("poller failed: different transports within same poller are not yet supported.");
233  }
234  }
235  }
236 
237  return channels.at(0)->Transport()->CreatePoller(channels);
238  }
239 
// Takes a transport type and returns a shared pointer to a transport factory (defined out of line).
// NOTE(review): presumably creates or looks up the factory in fTransports - confirm in the .cxx.
242  std::shared_ptr<FairMQTransportFactory> AddTransport(const fair::mq::Transport transport);
243 
/// Assigns config to the device.
245  void SetConfig(fair::mq::ProgOptions& config);
248  {
249  return fConfig;
250  }
251 
252  // overload to easily bind member functions
253  template<typename T>
254  void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index))
255  {
256  fDataCallbacks = true;
257  fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index)
258  {
259  return (static_cast<T*>(this)->*memberFunction)(msg, index);
260  }));
261 
262  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
263  {
264  fInputChannelKeys.push_back(channelName);
265  }
266  }
267 
268  void OnData(const std::string& channelName, InputMsgCallback callback)
269  {
270  fDataCallbacks = true;
271  fMsgInputs.insert(make_pair(channelName, callback));
272 
273  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
274  {
275  fInputChannelKeys.push_back(channelName);
276  }
277  }
278 
279  // overload to easily bind member functions
280  template<typename T>
281  void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index))
282  {
283  fDataCallbacks = true;
284  fMultipartInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQParts& parts, int index)
285  {
286  return (static_cast<T*>(this)->*memberFunction)(parts, index);
287  }));
288 
289  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
290  {
291  fInputChannelKeys.push_back(channelName);
292  }
293  }
294 
295  void OnData(const std::string& channelName, InputMultipartCallback callback)
296  {
297  fDataCallbacks = true;
298  fMultipartInputs.insert(make_pair(channelName, callback));
299 
300  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
301  {
302  fInputChannelKeys.push_back(channelName);
303  }
304  }
305 
306  FairMQChannel& GetChannel(const std::string& channelName, const int index = 0)
307  try {
308  return fChannels.at(channelName).at(index);
309  } catch (const std::out_of_range& oor) {
310  LOG(error) << "requested channel has not been configured? check channel names/configuration.";
311  LOG(error) << "channel: " << channelName << ", index: " << index;
312  LOG(error) << "out of range: " << oor.what();
313  throw;
314  }
315 
// Virtual hook with an empty default body.
// NOTE(review): presumably overridden by devices to call RegisterChannelEndpoint() for their channels - confirm.
316  virtual void RegisterChannelEndpoints() {}
317 
318  bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
319  {
320  bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
321  if (!ok) {
322  LOG(warn) << "Registering channel: name already registered: \"" << channelName << "\"";
323  }
324  return ok;
325  }
326 
327  void PrintRegisteredChannels()
328  {
329  if (fChannelRegistry.size() < 1) {
330  LOGV(info, verylow) << "no channels registered.";
331  } else {
332  for (const auto& c : fChannelRegistry) {
333  LOGV(info, verylow) << c.first << ":" << c.second.first << ":" << c.second.second;
334  }
335  }
336  }
337 
338  void SetId(const std::string& id) { fId = id; }
339  std::string GetId() { return fId; }
340 
341  const fair::mq::tools::Version GetVersion() const { return fVersion; }
342 
343  void SetNumIoThreads(int numIoThreads) { fConfig->SetProperty("io-threads", numIoThreads);}
344  int GetNumIoThreads() const { return fConfig->GetProperty<int>("io-threads", DefaultIOThreads); }
345 
346  void SetNetworkInterface(const std::string& networkInterface) { fConfig->SetProperty("network-interface", networkInterface); }
347  std::string GetNetworkInterface() const { return fConfig->GetProperty<std::string>("network-interface", DefaultNetworkInterface); }
348 
349  void SetDefaultTransport(const std::string& name) { fConfig->SetProperty("transport", name); }
350  std::string GetDefaultTransport() const { return fConfig->GetProperty<std::string>("transport", DefaultTransportName); }
351 
352  void SetInitTimeoutInS(int initTimeoutInS) { fConfig->SetProperty("init-timeout", initTimeoutInS); }
353  int GetInitTimeoutInS() const { return fConfig->GetProperty<int>("init-timeout", DefaultInitTimeout); }
354 
357  void SetTransport(const std::string& transport) { fConfig->SetProperty("transport", transport); }
359  std::string GetTransportName() const { return fConfig->GetProperty<std::string>("transport", DefaultTransportName); }
360 
361  void SetRawCmdLineArgs(const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
362  std::vector<std::string> GetRawCmdLineArgs() const { return fRawCmdLineArgs; }
363 
364  void RunStateMachine()
365  {
366  fStateMachine.ProcessWork();
367  };
368 
372  template<typename Rep, typename Period>
373  bool WaitFor(std::chrono::duration<Rep, Period> const& duration)
374  {
375  return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
376  }
377 
378  protected:
/// Default transport factory.
379  std::shared_ptr<FairMQTransportFactory> fTransportFactory;
/// Container for transports.
380  std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports;
381 
382  public:
/// Device channels.
383  std::unordered_map<std::string, std::vector<FairMQChannel>> fChannels;
/// Internal program options configuration.
384  std::unique_ptr<fair::mq::ProgOptions> fInternalConfig;
386 
387  void AddChannel(const std::string& name, FairMQChannel&& channel)
388  {
389  fConfig->AddChannel(name, channel);
390  }
391 
392  protected:
/// Device ID.
393  std::string fId;
394 
/// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
396  virtual void Init() {}
397 
// Virtual hook with an empty default body.
398  virtual void Bind() {}
399 
// Virtual hook with an empty default body.
400  virtual void Connect() {}
401 
/// Task initialization (can be overloaded in child classes)
403  virtual void InitTask() {}
404 
/// Runs the device (to be overloaded in child classes)
406  virtual void Run() {}
407 
/// Called in the RUNNING state once before executing the Run()/ConditionalRun() method.
409  virtual void PreRun() {}
410 
/// Called during RUNNING state repeatedly until it returns false or device state changes.
/// The default implementation returns false.
412  virtual bool ConditionalRun() { return false; }
413 
/// Called in the RUNNING state once after executing the Run()/ConditionalRun() method.
415  virtual void PostRun() {}
416 
/// Resets the user task (to be overloaded in child classes)
418  virtual void ResetTask() {}
419 
/// Resets the device (can be overloaded in child classes)
421  virtual void Reset() {}
422 
423  public:
429  bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); }
435  bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
436 
438  fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
441  void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
444  void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
445 
// Declared here, defined out of line.
// NOTE(review): presumably drives the device through the transitions needed to reach `state` - confirm in the .cxx.
446  void TransitionTo(const fair::mq::State state);
447 
454  void SubscribeToStateChange(const std::string& key, std::function<void(const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
457  void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); }
458 
464  void SubscribeToNewTransition(const std::string& key, std::function<void(const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
467  void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }
468 
470  bool NewStatePending() const { return fStateMachine.NewStatePending(); }
471 
473  fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); }
475  std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); }
476 
479  static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); }
482  static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); }
483 
// Compile-time defaults of the device. Several are passed as fallback values by the getters of this class.
484  static constexpr const char* DefaultId = "";
485  static constexpr int DefaultIOThreads = 1; // fallback passed by GetNumIoThreads()
486  static constexpr const char* DefaultTransportName = "zeromq"; // fallback passed by GetDefaultTransport()/GetTransportName()
487  static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::ZMQ;
488  static constexpr const char* DefaultNetworkInterface = "default"; // fallback passed by GetNetworkInterface()
489  static constexpr int DefaultInitTimeout = 120; // fallback passed by GetInitTimeoutInS()
490  static constexpr uint64_t DefaultMaxRunTime = 0; // NOTE(review): presumably 0 means "no limit" - confirm
491  static constexpr float DefaultRate = 0.; // NOTE(review): presumably 0 means "no rate limit" - confirm
492  static constexpr const char* DefaultSession = "default";
493 
494  private:
495  fair::mq::Transport fDefaultTransportType;
// State machine that the public state/transition methods of this class delegate to.
496  fair::mq::StateMachine fStateMachine;
497 
// Out-of-line wrappers.
// NOTE(review): presumably each one wraps the user hook of the same name (Init(), Bind(), ...) - confirm in the .cxx.
499  void InitWrapper();
501  void BindWrapper();
503  void ConnectWrapper();
505  void InitTaskWrapper();
507  void RunWrapper();
509  void ResetTaskWrapper();
511  void ResetWrapper();
512 
514  void UnblockTransports();
515 
// Empty body.
517  void Exit() {}
518 
520  void AttachChannels(std::vector<FairMQChannel*>& chans);
521  bool AttachChannel(FairMQChannel& ch);
522 
// Input handling, defined out of line.
// NOTE(review): presumably dispatch received data to the callbacks registered via OnData() - confirm in the .cxx.
523  void HandleSingleChannelInput();
524  void HandleMultipleChannelInput();
525  void HandleMultipleTransportInput();
526  void PollForTransport(const FairMQTransportFactory* factory, const std::vector<std::string>& channelKeys);
527 
528  bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i);
529  bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i);
530 
531  std::vector<FairMQChannel*> fUninitializedBindingChannels;
532  std::vector<FairMQChannel*> fUninitializedConnectingChannels;
533 
// Set to true by every OnData() overload.
534  bool fDataCallbacks;
// Channel name -> single-message callback, filled by OnData().
535  std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
// Channel name -> multipart callback, filled by OnData().
536  std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
537  std::unordered_map<fair::mq::Transport, std::vector<std::string>> fMultitransportInputs;
// Channel name -> (minNumSubChannels, maxNumSubChannels), filled by RegisterChannelEndpoint().
538  std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
// Names of channels that have a callback registered; OnData() adds each name at most once.
539  std::vector<std::string> fInputChannelKeys;
540  std::mutex fMultitransportMutex;
541  std::atomic<bool> fMultitransportProceed;
542 
// Returned (by copy) from GetVersion().
543  const fair::mq::tools::Version fVersion;
544  float fRate;
545  uint64_t fMaxRunRuntimeInS;
546  int fInitializationTimeoutInS;
// Set by SetRawCmdLineArgs(), returned by GetRawCmdLineArgs().
547  std::vector<std::string> fRawCmdLineArgs;
548 
// Used by WaitForNextState() and WaitForState().
549  fair::mq::StateQueue fStateQueue;
550 
// NOTE(review): presumably fTransitionMtx guards fTransitioning - confirm in the .cxx.
551  std::mutex fTransitionMtx;
552  bool fTransitioning;
553 };
554 
555 #endif /* FAIRMQDEVICE_H_ */
FairMQDevice::GetStateName
static std::string GetStateName(const fair::mq::State state)
Returns name of the given state as a string.
Definition: FairMQDevice.h:479
fair::mq::ProgOptions
Definition: ProgOptions.h:41
FairMQDevice::GetCurrentState
fair::mq::State GetCurrentState() const
Returns the current state.
Definition: FairMQDevice.h:473
FairMQDevice::Run
virtual void Run()
Runs the device (to be overloaded in child classes)
Definition: FairMQDevice.h:406
fair::mq::tools::Version
Definition: Version.h:25
FairMQDevice::fChannels
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:383
FairMQDevice::ChangeState
bool ChangeState(const fair::mq::Transition transition)
Request a device state transition.
Definition: FairMQDevice.h:429
FairMQDevice::ResetTask
virtual void ResetTask()
Resets the user task (to be overloaded in child classes)
Definition: FairMQDevice.h:418
FairMQDevice::GetConfig
fair::mq::ProgOptions * GetConfig() const
Get pointer to the config.
Definition: FairMQDevice.h:247
fair::mq
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
FairMQDevice::WaitForState
void WaitForState(const std::string &state)
waits for the specified state to occur
Definition: FairMQDevice.h:444
FairMQDevice::GetTransitionName
static std::string GetTransitionName(const fair::mq::Transition transition)
Returns name of the given transition as a string.
Definition: FairMQDevice.h:482
FairMQDevice::Receive
int64_t Receive(FairMQParts &parts, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:130
FairMQParts
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
FairMQDevice::UnsubscribeFromNewTransition
void UnsubscribeFromNewTransition(const std::string &key)
Unsubscribe from state transitions.
Definition: FairMQDevice.h:467
FairMQDevice::ConditionalRun
virtual bool ConditionalRun()
Called during RUNNING state repeatedly until it returns false or device state changes.
Definition: FairMQDevice.h:412
fair::mq::ProgOptions::AddChannel
void AddChannel(const std::string &name, const FairMQChannel &channel)
Takes the provided channel and creates properties based on it.
Definition: ProgOptions.cxx:357
fair::mq::ProgOptions::GetProperty
T GetProperty(const std::string &key) const
Read config property, throw if no property with this key exists.
Definition: ProgOptions.h:69
FairMQDevice::operator=
FairMQDevice operator=(const FairMQDevice &)=delete
Assignment operator (disabled)
FairMQDevice::fId
std::string fId
Device ID.
Definition: FairMQDevice.h:393
fair::mq::OngoingTransition
Definition: FairMQDevice.h:46
FairMQDevice::Send
int64_t Send(FairMQParts &parts, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:119
FairMQDevice::AddTransport
std::shared_ptr< FairMQTransportFactory > AddTransport(const fair::mq::Transport transport)
Definition: FairMQDevice.cxx:653
FairMQDevice::Transport
auto Transport() const -> FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:136
FairMQDevice::Reset
virtual void Reset()
Resets the device (can be overloaded in child classes)
Definition: FairMQDevice.h:421
FairMQDevice::FairMQDevice
FairMQDevice()
Default constructor.
Definition: FairMQDevice.cxx:58
FairMQChannel::Receive
int64_t Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:270
fair::mq::ProgOptions::SetProperty
void SetProperty(const std::string &key, T val)
Set config property.
Definition: ProgOptions.h:136
FairMQChannel::Send
int64_t Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:260
FairMQDevice::InitTask
virtual void InitTask()
Task initialization (can be overloaded in child classes)
Definition: FairMQDevice.h:403
fair::mq::StateMachine
Definition: StateMachine.h:29
FairMQDevice::Receive
int64_t Receive(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:108
FairMQDevice::WaitForNextState
fair::mq::State WaitForNextState()
waits for the next state (any) to occur
Definition: FairMQDevice.h:438
FairMQDevice::fTransports
std::unordered_map< fair::mq::Transport, std::shared_ptr< FairMQTransportFactory > > fTransports
Container for transports.
Definition: FairMQDevice.h:380
FairMQDevice::PreRun
virtual void PreRun()
Called in the RUNNING state once before executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.h:409
FairMQDevice::SubscribeToNewTransition
void SubscribeToNewTransition(const std::string &key, std::function< void(const fair::mq::Transition)> callback)
Subscribe with a callback to incoming state transitions.
Definition: FairMQDevice.h:464
FairMQDevice::GetTransportName
std::string GetTransportName() const
Gets the default transport name.
Definition: FairMQDevice.h:359
FairMQDevice::WaitFor
bool WaitFor(std::chrono::duration< Rep, Period > const &duration)
Definition: FairMQDevice.h:373
FairMQDevice::Send
int64_t Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:97
FairMQDevice::fConfig
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:385
FairMQDevice::FairMQDevice
FairMQDevice(const FairMQDevice &)=delete
Copy constructor (disabled)
FairMQDevice::PostRun
virtual void PostRun()
Called in the RUNNING state once after executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.h:415
FairMQDevice::SubscribeToStateChange
void SubscribeToStateChange(const std::string &key, std::function< void(const fair::mq::State)> callback)
Subscribe with a callback to state changes.
Definition: FairMQDevice.h:454
FairMQDevice::LogSocketRates
virtual void LogSocketRates()
Outputs the socket transfer rates.
Definition: FairMQDevice.cxx:678
FairMQDevice::Init
virtual void Init()
Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
Definition: FairMQDevice.h:396
FairMQDevice::NewStatePending
bool NewStatePending() const
Returns true if a new state has been requested, signaling the current handler to stop.
Definition: FairMQDevice.h:470
FairMQChannel
Wrapper class for FairMQSocket and related methods.
Definition: FairMQChannel.h:35
FairMQDevice::~FairMQDevice
virtual ~FairMQDevice()
Default destructor.
Definition: FairMQDevice.cxx:804
FairMQMessage
Definition: FairMQMessage.h:33
FairMQDevice::fInternalConfig
std::unique_ptr< fair::mq::ProgOptions > fInternalConfig
Internal program options configuration.
Definition: FairMQDevice.h:384
FairMQDevice::GetCurrentStateName
std::string GetCurrentStateName() const
Returns the name of the current state as a string.
Definition: FairMQDevice.h:475
FairMQDevice::SetTransport
void SetTransport(const std::string &transport)
Definition: FairMQDevice.h:357
FairMQDevice::SetConfig
void SetConfig(fair::mq::ProgOptions &config)
Assigns config to the device.
Definition: FairMQDevice.cxx:672
FairMQDevice::ChangeState
bool ChangeState(const std::string &transition)
Request a device state transition.
Definition: FairMQDevice.h:435
FairMQDevice::WaitForState
void WaitForState(fair::mq::State state)
waits for the specified state to occur
Definition: FairMQDevice.h:441
fair::mq::StateQueue
Definition: StateQueue.h:30
FairMQDevice::fTransportFactory
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:379
FairMQDevice
Definition: FairMQDevice.h:50
FairMQDevice::UnsubscribeFromStateChange
void UnsubscribeFromStateChange(const std::string &key)
Unsubscribe from state changes.
Definition: FairMQDevice.h:457
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30

privacy