FairMQ  1.4.33
C++ Message Queuing Library and Framework
Classes | Public Member Functions | Static Public Attributes | Friends | List of all members
FairMQChannel Class Reference

Wrapper class for FairMQSocket and related methods. More...

#include <FairMQChannel.h>

Classes

struct  ChannelConfigurationError
 

Public Member Functions

 FairMQChannel ()
 Default constructor.
 
 FairMQChannel (const std::string &name)
 
 FairMQChannel (const std::string &type, const std::string &method, const std::string &address)
 
 FairMQChannel (const std::string &name, const std::string &type, std::shared_ptr< FairMQTransportFactory > factory)
 
 FairMQChannel (const std::string &name, const std::string &type, const std::string &method, const std::string &address, std::shared_ptr< FairMQTransportFactory > factory)
 
 FairMQChannel (const std::string &name, int index, const fair::mq::Properties &properties)
 
 FairMQChannel (const FairMQChannel &)
 Copy Constructor.
 
 FairMQChannel (const FairMQChannel &, const std::string &name)
 Copy Constructor (with new name)
 
FairMQChannel & operator= (const FairMQChannel &)
 Assignment operator. More...
 
virtual ~FairMQChannel ()
 Destructor. More...
 
FairMQSocket & GetSocket () const
 
bool Bind (const std::string &address)
 
bool Connect (const std::string &address)
 
std::string GetName () const
 
std::string GetPrefix () const
 
std::string GetIndex () const
 
std::string GetType () const
 
std::string GetMethod () const
 
std::string GetAddress () const
 
std::string GetTransportName () const
 
fair::mq::Transport GetTransportType () const
 
int GetSndBufSize () const
 
int GetRcvBufSize () const
 
int GetSndKernelSize () const
 
int GetRcvKernelSize () const
 
int GetLinger () const
 
int GetRateLogging () const
 
int GetPortRangeMin () const
 
int GetPortRangeMax () const
 
bool GetAutoBind () const
 
void UpdateName (const std::string &name)
 
void UpdateType (const std::string &type)
 
void UpdateMethod (const std::string &method)
 
void UpdateAddress (const std::string &address)
 
void UpdateTransport (const std::string &transport)
 
void UpdateSndBufSize (const int sndBufSize)
 
void UpdateRcvBufSize (const int rcvBufSize)
 
void UpdateSndKernelSize (const int sndKernelSize)
 
void UpdateRcvKernelSize (const int rcvKernelSize)
 
void UpdateLinger (const int duration)
 
void UpdateRateLogging (const int rateLogging)
 
void UpdatePortRangeMin (const int minPort)
 
void UpdatePortRangeMax (const int maxPort)
 
void UpdateAutoBind (const bool autobind)
 
bool IsValid () const
 
bool Validate ()
 
void Init ()
 
bool ConnectEndpoint (const std::string &endpoint)
 
bool BindEndpoint (std::string &endpoint)
 
void Invalidate ()
 invalidates the channel (requires validation to be used again).
 
int64_t Send (FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
 
int64_t Receive (FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
 
int64_t Send (std::vector< FairMQMessagePtr > &msgVec, int sndTimeoutInMs=-1)
 
int64_t Receive (std::vector< FairMQMessagePtr > &msgVec, int rcvTimeoutInMs=-1)
 
int64_t Send (FairMQParts &parts, int sndTimeoutInMs=-1)
 
int64_t Receive (FairMQParts &parts, int rcvTimeoutInMs=-1)
 
unsigned long GetBytesTx () const
 
unsigned long GetBytesRx () const
 
unsigned long GetMessagesTx () const
 
unsigned long GetMessagesRx () const
 
auto Transport () -> FairMQTransportFactory *
 
template<typename... Args>
FairMQMessagePtr NewMessage (Args &&... args)
 
template<typename T >
FairMQMessagePtr NewSimpleMessage (const T &data)
 
template<typename T >
FairMQMessagePtr NewStaticMessage (const T &data)
 
template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegion (Args &&... args)
 

Static Public Attributes

static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT
 
static constexpr const char * DefaultTransportName = "default"
 
static constexpr const char * DefaultName = ""
 
static constexpr const char * DefaultType = "unspecified"
 
static constexpr const char * DefaultMethod = "unspecified"
 
static constexpr const char * DefaultAddress = "unspecified"
 
static constexpr int DefaultSndBufSize = 1000
 
static constexpr int DefaultRcvBufSize = 1000
 
static constexpr int DefaultSndKernelSize = 0
 
static constexpr int DefaultRcvKernelSize = 0
 
static constexpr int DefaultLinger = 500
 
static constexpr int DefaultRateLogging = 1
 
static constexpr int DefaultPortRangeMin = 22000
 
static constexpr int DefaultPortRangeMax = 23000
 
static constexpr bool DefaultAutoBind = true
 

Friends

class FairMQDevice
 

Detailed Description

Wrapper class for FairMQSocket and related methods.

The class is not thread-safe.

Constructor & Destructor Documentation

◆ FairMQChannel() [1/4]

FairMQChannel::FairMQChannel ( const std::string &  name)

Constructor

Parameters
name: Channel name

◆ FairMQChannel() [2/4]

FairMQChannel::FairMQChannel ( const std::string &  type,
const std::string &  method,
const std::string &  address 
)

Constructor

Parameters
type: Socket type (push/pull/pub/sub/xpub/xsub/pair/req/rep/dealer/router)
method: Socket method (bind/connect)
address: Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")

◆ FairMQChannel() [3/4]

FairMQChannel::FairMQChannel ( const std::string &  name,
const std::string &  type,
std::shared_ptr< FairMQTransportFactory >  factory 
)

Constructor

Parameters
name: Channel name
type: Socket type (push/pull/pub/sub/xpub/xsub/pair/req/rep/dealer/router)
factory: TransportFactory

◆ FairMQChannel() [4/4]

FairMQChannel::FairMQChannel ( const std::string &  name,
const std::string &  type,
const std::string &  method,
const std::string &  address,
std::shared_ptr< FairMQTransportFactory >  factory 
)

Constructor

Parameters
name: Channel name
type: Socket type (push/pull/pub/sub/xpub/xsub/pair/req/rep/dealer/router)
method: Socket method (bind/connect)
address: Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
factory: TransportFactory

◆ ~FairMQChannel()

virtual FairMQChannel::~FairMQChannel ( )
inline virtual

Destructor.

Member Function Documentation

◆ GetAddress()

std::string FairMQChannel::GetAddress ( ) const
inline

Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")

Returns
Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")

◆ GetAutoBind()

bool FairMQChannel::GetAutoBind ( ) const
inline

Get the automatic binding setting (pick random port if bind fails)

Returns
true/false, true if automatic binding is enabled

◆ GetIndex()

std::string FairMQChannel::GetIndex ( ) const
inline

Get channel index

Returns
Returns channel index (e.g. 0 in "data[0]")

◆ GetLinger()

int FairMQChannel::GetLinger ( ) const
inline

Get linger duration (in milliseconds)

Returns
Returns linger duration (in milliseconds)

◆ GetMethod()

std::string FairMQChannel::GetMethod ( ) const
inline

Get socket method

Returns
Returns socket method (bind/connect)

◆ GetName()

std::string FairMQChannel::GetName ( ) const
inline

Get channel name

Returns
Returns full channel name (e.g. "data[0]")

◆ GetPortRangeMax()

int FairMQChannel::GetPortRangeMax ( ) const
inline

Get end of the port range for automatic binding

Returns
end of the port range

◆ GetPortRangeMin()

int FairMQChannel::GetPortRangeMin ( ) const
inline

Get start of the port range for automatic binding

Returns
start of the port range

◆ GetPrefix()

std::string FairMQChannel::GetPrefix ( ) const
inline

Get channel prefix

Returns
Returns channel prefix (e.g. "data" in "data[0]")

◆ GetRateLogging()

int FairMQChannel::GetRateLogging ( ) const
inline

Get socket rate logging interval (in seconds)

Returns
Returns socket rate logging interval (in seconds)

◆ GetRcvBufSize()

int FairMQChannel::GetRcvBufSize ( ) const
inline

Get socket receive buffer size (in number of messages)

Returns
Returns socket receive buffer size (in number of messages)

◆ GetRcvKernelSize()

int FairMQChannel::GetRcvKernelSize ( ) const
inline

Get socket kernel receive buffer size (in bytes)

Returns
Returns socket kernel receive buffer size (in bytes)

◆ GetSndBufSize()

int FairMQChannel::GetSndBufSize ( ) const
inline

Get socket send buffer size (in number of messages)

Returns
Returns socket send buffer size (in number of messages)

◆ GetSndKernelSize()

int FairMQChannel::GetSndKernelSize ( ) const
inline

Get socket kernel send buffer size (in bytes)

Returns
Returns socket kernel send buffer size (in bytes)

◆ GetTransportName()

std::string FairMQChannel::GetTransportName ( ) const
inline

Get channel transport name ("default", "zeromq" or "shmem")

Returns
Returns channel transport name (e.g. "default", "zeromq" or "shmem")

◆ GetTransportType()

fair::mq::Transport FairMQChannel::GetTransportType ( ) const
inline

Get channel transport type

Returns
Returns channel transport type

◆ GetType()

std::string FairMQChannel::GetType ( ) const
inline

Get socket type

Returns
Returns socket type (push/pull/pub/sub/xpub/xsub/pair/req/rep/dealer/router)

◆ IsValid()

bool FairMQChannel::IsValid ( ) const
inline

Checks if the configured channel settings are valid. This only checks the validity flag and does not run the full validation (as opposed to Validate()).

Returns
true if channel settings are valid, false otherwise.

◆ operator=()

FairMQChannel & FairMQChannel::operator= ( const FairMQChannel &  chan)

Assignment operator.

◆ Receive() [1/3]

int64_t FairMQChannel::Receive ( FairMQMessagePtr &  msg,
int  rcvTimeoutInMs = -1 
)
inline

Receives a message from the socket queue.

Parameters
msg: Reference to a unique_ptr to a FairMQMessage
rcvTimeoutInMs: receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
Returns
Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Receive() [2/3]

int64_t FairMQChannel::Receive ( FairMQParts &  parts,
int  rcvTimeoutInMs = -1 
)
inline

Receive FairMQParts

Parameters
parts: FairMQParts reference
rcvTimeoutInMs: receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
Returns
Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Receive() [3/3]

int64_t FairMQChannel::Receive ( std::vector< FairMQMessagePtr > &  msgVec,
int  rcvTimeoutInMs = -1 
)
inline

Receive a vector of messages

Parameters
msgVec: message vector reference
rcvTimeoutInMs: receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
Returns
Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Send() [1/3]

int64_t FairMQChannel::Send ( FairMQMessagePtr &  msg,
int  sndTimeoutInMs = -1 
)
inline

Sends a message to the socket queue.

Parameters
msg: Reference to a unique_ptr to a FairMQMessage
sndTimeoutInMs: send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
Returns
Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Send() [2/3]

int64_t FairMQChannel::Send ( FairMQParts &  parts,
int  sndTimeoutInMs = -1 
)
inline

Send FairMQParts

Parameters
parts: FairMQParts reference
sndTimeoutInMs: send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
Returns
Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ Send() [3/3]

int64_t FairMQChannel::Send ( std::vector< FairMQMessagePtr > &  msgVec,
int  sndTimeoutInMs = -1 
)
inline

Send a vector of messages

Parameters
msgVec: message vector reference
sndTimeoutInMs: send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
Returns
Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)

◆ UpdateAddress()

void FairMQChannel::UpdateAddress ( const std::string &  address)
inline

Set socket address

Parameters
address: Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")

◆ UpdateAutoBind()

void FairMQChannel::UpdateAutoBind ( const bool  autobind)
inline

Set automatic binding (pick random port if bind fails)

Parameters
autobind: true/false, true to enable automatic binding

◆ UpdateLinger()

void FairMQChannel::UpdateLinger ( const int  duration)
inline

Set linger duration (in milliseconds)

Parameters
duration: linger duration (in milliseconds)

◆ UpdateMethod()

void FairMQChannel::UpdateMethod ( const std::string &  method)
inline

Set socket method

Parameters
method: Socket method (bind/connect)

◆ UpdateName()

void FairMQChannel::UpdateName ( const std::string &  name)
inline

Set channel name

Parameters
name: Arbitrary channel name

◆ UpdatePortRangeMax()

void FairMQChannel::UpdatePortRangeMax ( const int  maxPort)
inline

Set end of the port range for automatic binding

Parameters
maxPort: end of the port range

◆ UpdatePortRangeMin()

void FairMQChannel::UpdatePortRangeMin ( const int  minPort)
inline

Set start of the port range for automatic binding

Parameters
minPort: start of the port range

◆ UpdateRateLogging()

void FairMQChannel::UpdateRateLogging ( const int  rateLogging)
inline

Set socket rate logging interval (in seconds)

Parameters
rateLogging: Socket rate logging interval (in seconds)

◆ UpdateRcvBufSize()

void FairMQChannel::UpdateRcvBufSize ( const int  rcvBufSize)
inline

Set socket receive buffer size

Parameters
rcvBufSize: Socket receive buffer size (in number of messages)

◆ UpdateRcvKernelSize()

void FairMQChannel::UpdateRcvKernelSize ( const int  rcvKernelSize)
inline

Set socket kernel receive buffer size (in bytes)

Parameters
rcvKernelSize: Socket kernel receive buffer size (in bytes)

◆ UpdateSndBufSize()

void FairMQChannel::UpdateSndBufSize ( const int  sndBufSize)
inline

Set socket send buffer size

Parameters
sndBufSize: Socket send buffer size (in number of messages)

◆ UpdateSndKernelSize()

void FairMQChannel::UpdateSndKernelSize ( const int  sndKernelSize)
inline

Set socket kernel send buffer size (in bytes)

Parameters
sndKernelSize: Socket kernel send buffer size (in bytes)

◆ UpdateTransport()

void FairMQChannel::UpdateTransport ( const std::string &  transport)
inline

Set channel transport

Parameters
transport: transport string ("default", "zeromq" or "shmem")

◆ UpdateType()

void FairMQChannel::UpdateType ( const std::string &  type)
inline

Set socket type

Parameters
type: Socket type (push/pull/pub/sub/xpub/xsub/pair/req/rep/dealer/router)

◆ Validate()

bool FairMQChannel::Validate ( )

Validates channel configuration

Returns
true if channel settings are valid, false otherwise.

The documentation for this class was generated from the following files:

privacy