FairMQ  1.4.33
C++ Message Queuing Library and Framework
Public Member Functions | Static Public Member Functions | Public Attributes | Static Public Attributes | Protected Member Functions | Protected Attributes | Friends | List of all members
FairMQDevice Class Reference
Inheritance diagram for FairMQDevice:
Inheritance graph
[legend]
Collaboration diagram for FairMQDevice:
Collaboration graph
[legend]

Public Member Functions

 FairMQDevice ()
 Default constructor.
 
 FairMQDevice (fair::mq::ProgOptions &config)
 Constructor with external fair::mq::ProgOptions.
 
 FairMQDevice (const fair::mq::tools::Version version)
 Constructor that sets the version.
 
 FairMQDevice (fair::mq::ProgOptions &config, const fair::mq::tools::Version version)
 Constructor that sets the version and external fair::mq::ProgOptions.
 
 FairMQDevice (const FairMQDevice &)=delete
 Copy constructor (disabled)
 
FairMQDevice operator= (const FairMQDevice &)=delete
 Assignment operator (disabled)
 
virtual ~FairMQDevice ()
 Default destructor.
 
virtual void LogSocketRates ()
 Outputs the socket transfer rates.
 
template<typename Serializer , typename DataType , typename... Args>
void Serialize (FairMQMessage &msg, DataType &&data, Args &&... args) const
 
template<typename Deserializer , typename DataType , typename... Args>
void Deserialize (FairMQMessage &msg, DataType &&data, Args &&... args) const
 
int64_t Send (FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
 
int64_t Receive (FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
 
int64_t Send (FairMQParts &parts, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
 
int64_t Receive (FairMQParts &parts, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
 
auto Transport () const -> FairMQTransportFactory *
 Getter for default transport factory.
 
template<typename... Args>
FairMQMessagePtr NewMessage (Args &&... args)
 
template<typename... Args>
FairMQMessagePtr NewMessageFor (const std::string &channel, int index, Args &&... args)
 
template<typename T >
FairMQMessagePtr NewStaticMessage (const T &data)
 
template<typename T >
FairMQMessagePtr NewStaticMessageFor (const std::string &channel, int index, const T &data)
 
template<typename T >
FairMQMessagePtr NewSimpleMessage (const T &data)
 
template<typename T >
FairMQMessagePtr NewSimpleMessageFor (const std::string &channel, int index, const T &data)
 
template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegion (Args &&... args)
 
template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor (const std::string &channel, int index, Args &&... args)
 
template<typename ... Ts>
FairMQPollerPtr NewPoller (const Ts &... inputs)
 
FairMQPollerPtr NewPoller (const std::vector< FairMQChannel * > &channels)
 
std::shared_ptr< FairMQTransportFactoryAddTransport (const fair::mq::Transport transport)
 
void SetConfig (fair::mq::ProgOptions &config)
 Assigns config to the device.
 
fair::mq::ProgOptionsGetConfig () const
 Get pointer to the config.
 
template<typename T >
void OnData (const std::string &channelName, bool(T::*memberFunction)(FairMQMessagePtr &msg, int index))
 
void OnData (const std::string &channelName, InputMsgCallback callback)
 
template<typename T >
void OnData (const std::string &channelName, bool(T::*memberFunction)(FairMQParts &parts, int index))
 
void OnData (const std::string &channelName, InputMultipartCallback callback)
 
FairMQChannelGetChannel (const std::string &channelName, const int index=0)
 
virtual void RegisterChannelEndpoints ()
 
bool RegisterChannelEndpoint (const std::string &channelName, uint16_t minNumSubChannels=1, uint16_t maxNumSubChannels=1)
 
void PrintRegisteredChannels ()
 
void SetId (const std::string &id)
 
std::string GetId ()
 
const fair::mq::tools::Version GetVersion () const
 
void SetNumIoThreads (int numIoThreads)
 
int GetNumIoThreads () const
 
void SetNetworkInterface (const std::string &networkInterface)
 
std::string GetNetworkInterface () const
 
void SetDefaultTransport (const std::string &name)
 
std::string GetDefaultTransport () const
 
void SetInitTimeoutInS (int initTimeoutInS)
 
int GetInitTimeoutInS () const
 
void SetTransport (const std::string &transport)
 
std::string GetTransportName () const
 Gets the default transport name.
 
void SetRawCmdLineArgs (const std::vector< std::string > &args)
 
std::vector< std::string > GetRawCmdLineArgs () const
 
void RunStateMachine ()
 
template<typename Rep , typename Period >
bool WaitFor (std::chrono::duration< Rep, Period > const &duration)
 
void AddChannel (const std::string &name, FairMQChannel &&channel)
 
bool ChangeState (const fair::mq::Transition transition)
 Request a device state transition. More...
 
bool ChangeState (const std::string &transition)
 Request a device state transition. More...
 
fair::mq::State WaitForNextState ()
 waits for the next state (any) to occur
 
void WaitForState (fair::mq::State state)
 waits for the specified state to occur More...
 
void WaitForState (const std::string &state)
 waits for the specified state to occur More...
 
void TransitionTo (const fair::mq::State state)
 
void SubscribeToStateChange (const std::string &key, std::function< void(const fair::mq::State)> callback)
 Subscribe with a callback to state changes. More...
 
void UnsubscribeFromStateChange (const std::string &key)
 Unsubscribe from state changes. More...
 
void SubscribeToNewTransition (const std::string &key, std::function< void(const fair::mq::Transition)> callback)
 Subscribe with a callback to incoming state transitions. More...
 
void UnsubscribeFromNewTransition (const std::string &key)
 Unsubscribe from state transitions. More...
 
bool NewStatePending () const
 Returns true if a new state has been requested, signaling the current handler to stop.
 
fair::mq::State GetCurrentState () const
 Returns the current state.
 
std::string GetCurrentStateName () const
 Returns the name of the current state as a string.
 

Static Public Member Functions

static std::string GetStateName (const fair::mq::State state)
 Returns name of the given state as a string. More...
 
static std::string GetTransitionName (const fair::mq::Transition transition)
 Returns name of the given transition as a string. More...
 

Public Attributes

std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
 Device channels.
 
std::unique_ptr< fair::mq::ProgOptionsfInternalConfig
 Internal program options configuration.
 
fair::mq::ProgOptionsfConfig
 Pointer to config (internal or external)
 

Static Public Attributes

static constexpr const char * DefaultId = ""
 
static constexpr int DefaultIOThreads = 1
 
static constexpr const char * DefaultTransportName = "zeromq"
 
static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::ZMQ
 
static constexpr const char * DefaultNetworkInterface = "default"
 
static constexpr int DefaultInitTimeout = 120
 
static constexpr uint64_t DefaultMaxRunTime = 0
 
static constexpr float DefaultRate = 0.
 
static constexpr const char * DefaultSession = "default"
 

Protected Member Functions

virtual void Init ()
 Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
 
virtual void Bind ()
 
virtual void Connect ()
 
virtual void InitTask ()
 Task initialization (can be overloaded in child classes)
 
virtual void Run ()
 Runs the device (to be overloaded in child classes)
 
virtual void PreRun ()
 Called in the RUNNING state once before executing the Run()/ConditionalRun() method.
 
virtual bool ConditionalRun ()
 Called during RUNNING state repeatedly until it returns false or device state changes.
 
virtual void PostRun ()
 Called in the RUNNING state once after executing the Run()/ConditionalRun() method.
 
virtual void ResetTask ()
 Resets the user task (to be overloaded in child classes)
 
virtual void Reset ()
 Resets the device (can be overloaded in child classes)
 

Protected Attributes

std::shared_ptr< FairMQTransportFactoryfTransportFactory
 Default transport factory.
 
std::unordered_map< fair::mq::Transport, std::shared_ptr< FairMQTransportFactory > > fTransports
 Container for transports.
 
std::string fId
 Device ID.
 

Friends

class FairMQChannel
 

Member Function Documentation

◆ AddTransport()

shared_ptr< FairMQTransportFactory > FairMQDevice::AddTransport ( const fair::mq::Transport  transport)

Adds a transport to the device if it doesn't exist

Parameters
transportTransport string ("zeromq"/"shmem")

◆ ChangeState() [1/2]

bool FairMQDevice::ChangeState ( const fair::mq::Transition  transition)
inline

Request a device state transition.

Parameters
transitionstate transition

The state transition may not happen immediately, but when the current state evaluates the pending transition event and terminates. In other words, the device states are scheduled cooperatively.

◆ ChangeState() [2/2]

bool FairMQDevice::ChangeState ( const std::string &  transition)
inline

Request a device state transition.

Parameters
transitionstate transition

The state transition may not happen immediately, but when the current state evaluates the pending transition event and terminates. In other words, the device states are scheduled cooperatively.

◆ GetStateName()

static std::string FairMQDevice::GetStateName ( const fair::mq::State  state)
inlinestatic

Returns name of the given state as a string.

Parameters
statestate

◆ GetTransitionName()

static std::string FairMQDevice::GetTransitionName ( const fair::mq::Transition  transition)
inlinestatic

Returns name of the given transition as a string.

Parameters
transitiontransition

◆ Receive() [1/2]

int64_t FairMQDevice::Receive ( FairMQMessagePtr &  msg,
const std::string &  channel,
const int  index = 0,
int  rcvTimeoutInMs = -1 
)
inline

Shorthand method to receive msg on chan at index i

Parameters
msgmessage reference
chanchannel name
ichannel index
rcvTimeoutInMsreceive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
Returns
Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Receive() [2/2]

int64_t FairMQDevice::Receive ( FairMQParts parts,
const std::string &  channel,
const int  index = 0,
int  rcvTimeoutInMs = -1 
)
inline

Shorthand method to receive FairMQParts on chan at index i

Parameters
partsparts reference
chanchannel name
ichannel index
rcvTimeoutInMsreceive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
Returns
Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Send() [1/2]

int64_t FairMQDevice::Send ( FairMQMessagePtr &  msg,
const std::string &  channel,
const int  index = 0,
int  sndTimeoutInMs = -1 
)
inline

Shorthand method to send msg on chan at index i

Parameters
msgmessage reference
chanchannel name
ichannel index
sndTimeoutInMssend timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
Returns
Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Send() [2/2]

int64_t FairMQDevice::Send ( FairMQParts parts,
const std::string &  channel,
const int  index = 0,
int  sndTimeoutInMs = -1 
)
inline

Shorthand method to send FairMQParts on chan at index i

Parameters
partsparts reference
chanchannel name
ichannel index
sndTimeoutInMssend timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
Returns
Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ SetTransport()

void FairMQDevice::SetTransport ( const std::string &  transport)
inline

Sets the default transport for the device

Parameters
transportTransport string ("zeromq"/"shmem")

◆ SubscribeToNewTransition()

void FairMQDevice::SubscribeToNewTransition ( const std::string &  key,
std::function< void(const fair::mq::Transition)>  callback 
)
inline

Subscribe with a callback to incoming state transitions.

Parameters
keyid to identify your subscription
callbackcallback (called with the incoming transition as the parameter) The callback is called when new transition is initiated. The callback is called from the thread that initiates the transition (via ChangeState).

◆ SubscribeToStateChange()

void FairMQDevice::SubscribeToStateChange ( const std::string &  key,
std::function< void(const fair::mq::State)>  callback 
)
inline

Subscribe with a callback to state changes.

Parameters
keyid to identify your subscription
callbackcallback (called with the new state as the parameter)

The callback is called at the beginning of a new state. The callback is called from the thread the state is running in.

◆ UnsubscribeFromNewTransition()

void FairMQDevice::UnsubscribeFromNewTransition ( const std::string &  key)
inline

Unsubscribe from state transitions.

Parameters
keyid (that was used when subscribing)

◆ UnsubscribeFromStateChange()

void FairMQDevice::UnsubscribeFromStateChange ( const std::string &  key)
inline

Unsubscribe from state changes.

Parameters
keyid (that was used when subscribing)

◆ WaitFor()

template<typename Rep , typename Period >
bool FairMQDevice::WaitFor ( std::chrono::duration< Rep, Period > const &  duration)
inline

Wait for the supplied amount of time or for interruption. If interrupted, returns false, otherwise true.

Parameters
durationwait duration

◆ WaitForState() [1/2]

void FairMQDevice::WaitForState ( const std::string &  state)
inline

waits for the specified state to occur

Parameters
statestate to wait for

◆ WaitForState() [2/2]

void FairMQDevice::WaitForState ( fair::mq::State  state)
inline

waits for the specified state to occur

Parameters
statestate to wait for

The documentation for this class was generated from the following files:

privacy