FairMQ
1.4.33
C++ Message Queuing Library and Framework
|
9 #ifndef FAIRMQCHANNEL_H_
10 #define FAIRMQCHANNEL_H_
12 #include <FairMQTransportFactory.h>
13 #include <FairMQUnmanagedRegion.h>
14 #include <FairMQSocket.h>
15 #include <fairmq/Transports.h>
16 #include <FairMQParts.h>
17 #include <fairmq/Properties.h>
18 #include <FairMQMessage.h>
50 FairMQChannel(
const std::string& type,
const std::string& method,
const std::string& address);
56 FairMQChannel(
const std::string& name,
const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
64 FairMQChannel(
const std::string& name,
const std::string& type,
const std::string& method,
const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);
66 FairMQChannel(
const std::string& name,
int index,
const fair::mq::Properties& properties);
88 FairMQSocket& GetSocket()
const { assert(fSocket);
return *fSocket; }
90 bool Bind(
const std::string& address)
94 return fSocket->Bind(address);
97 bool Connect(
const std::string& address)
101 return fSocket->Connect(address);
112 std::string prefix = fName;
113 prefix = prefix.erase(fName.rfind(
'['));
121 std::string indexStr = fName;
122 indexStr.erase(indexStr.rfind(
']'));
123 indexStr.erase(0, indexStr.rfind(
'[') + 1);
249 bool ConnectEndpoint(
const std::string& endpoint);
251 bool BindEndpoint(std::string& endpoint);
260 int64_t
Send(FairMQMessagePtr& msg,
int sndTimeoutInMs = -1)
262 CheckSendCompatibility(msg);
263 return fSocket->Send(msg, sndTimeoutInMs);
270 int64_t
Receive(FairMQMessagePtr& msg,
int rcvTimeoutInMs = -1)
272 CheckReceiveCompatibility(msg);
273 return fSocket->Receive(msg, rcvTimeoutInMs);
280 int64_t
Send(std::vector<FairMQMessagePtr>& msgVec,
int sndTimeoutInMs = -1)
282 CheckSendCompatibility(msgVec);
283 return fSocket->Send(msgVec, sndTimeoutInMs);
290 int64_t
Receive(std::vector<FairMQMessagePtr>& msgVec,
int rcvTimeoutInMs = -1)
292 CheckReceiveCompatibility(msgVec);
293 return fSocket->Receive(msgVec, rcvTimeoutInMs);
302 return Send(parts.fParts, sndTimeoutInMs);
311 return Receive(parts.fParts, rcvTimeoutInMs);
314 unsigned long GetBytesTx()
const {
return fSocket->GetBytesTx(); }
315 unsigned long GetBytesRx()
const {
return fSocket->GetBytesRx(); }
316 unsigned long GetMessagesTx()
const {
return fSocket->GetMessagesTx(); }
317 unsigned long GetMessagesRx()
const {
return fSocket->GetMessagesRx(); }
321 template<
typename... Args>
322 FairMQMessagePtr NewMessage(Args&&... args)
324 return Transport()->CreateMessage(std::forward<Args>(args)...);
328 FairMQMessagePtr NewSimpleMessage(
const T& data)
330 return Transport()->NewSimpleMessage(data);
334 FairMQMessagePtr NewStaticMessage(
const T& data)
336 return Transport()->NewStaticMessage(data);
339 template<
typename... Args>
340 FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
342 return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...);
345 static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
346 static constexpr
const char* DefaultTransportName =
"default";
347 static constexpr
const char* DefaultName =
"";
348 static constexpr
const char* DefaultType =
"unspecified";
349 static constexpr
const char* DefaultMethod =
"unspecified";
350 static constexpr
const char* DefaultAddress =
"unspecified";
351 static constexpr
int DefaultSndBufSize = 1000;
352 static constexpr
int DefaultRcvBufSize = 1000;
353 static constexpr
int DefaultSndKernelSize = 0;
354 static constexpr
int DefaultRcvKernelSize = 0;
355 static constexpr
int DefaultLinger = 500;
356 static constexpr
int DefaultRateLogging = 1;
357 static constexpr
int DefaultPortRangeMin = 22000;
358 static constexpr
int DefaultPortRangeMax = 23000;
359 static constexpr
bool DefaultAutoBind =
true;
362 std::shared_ptr<FairMQTransportFactory> fTransportFactory;
363 fair::mq::Transport fTransportType;
364 std::unique_ptr<FairMQSocket> fSocket;
369 std::string fAddress;
384 void CheckSendCompatibility(FairMQMessagePtr& msg)
386 if (fTransportType != msg->GetType()) {
387 FairMQMessagePtr msgWrapper(NewMessage(
390 [](
void* ,
void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
394 msg = move(msgWrapper);
398 void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
400 for (
auto& msg : msgVec) {
401 if (fTransportType != msg->GetType()) {
403 FairMQMessagePtr msgWrapper(NewMessage(
406 [](
void* ,
void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
410 msg = move(msgWrapper);
415 void CheckReceiveCompatibility(FairMQMessagePtr& msg)
417 if (fTransportType != msg->GetType()) {
418 FairMQMessagePtr newMsg(NewMessage());
423 void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
425 for (
auto& msg : msgVec) {
426 if (fTransportType != msg->GetType()) {
428 FairMQMessagePtr newMsg(NewMessage());
434 void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
436 fTransportFactory = factory;
437 fTransportType = factory->GetType();
void UpdatePortRangeMax(const int maxPort)
Definition: FairMQChannel.h:233
Definition: FairMQSocket.h:36
FairMQChannel(const FairMQChannel &, const std::string &name)
Copy Constructor (with new name)
virtual ~FairMQChannel()
Move assignment operator.
Definition: FairMQChannel.h:84
bool Validate()
Definition: FairMQChannel.cxx:163
fair::mq::Transport GetTransportType() const
Definition: FairMQChannel.h:145
int GetLinger() const
Definition: FairMQChannel.h:165
std::string GetIndex() const
Definition: FairMQChannel.h:119
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
int64_t Send(FairMQParts &parts, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:300
std::string GetAddress() const
Definition: FairMQChannel.h:137
Definition: FairMQChannel.h:86
void UpdateAutoBind(const bool autobind)
Definition: FairMQChannel.h:237
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.h:217
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.h:197
std::string GetTransportName() const
Definition: FairMQChannel.h:141
bool GetAutoBind() const
Definition: FairMQChannel.h:181
int GetRateLogging() const
Definition: FairMQChannel.h:169
std::string GetPrefix() const
Definition: FairMQChannel.h:110
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.h:225
int64_t Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:270
int64_t Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:260
void Invalidate()
invalidates the channel (requires validation to be used again).
Definition: FairMQChannel.h:254
int GetRcvKernelSize() const
Definition: FairMQChannel.h:161
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.h:193
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.h:201
int GetSndBufSize() const
Definition: FairMQChannel.h:149
void UpdateLinger(const int duration)
Definition: FairMQChannel.h:221
FairMQChannel(const std::string &name, const std::string &type, std::shared_ptr< FairMQTransportFactory > factory)
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.h:205
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.h:213
int64_t Receive(FairMQParts &parts, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:309
void UpdateName(const std::string &name)
Definition: FairMQChannel.h:185
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.h:209
FairMQChannel & operator=(const FairMQChannel &)
Move constructor.
Definition: FairMQChannel.cxx:135
bool IsValid() const
Definition: FairMQChannel.h:241
std::string GetType() const
Definition: FairMQChannel.h:129
Wrapper class for FairMQSocket and related methods.
Definition: FairMQChannel.h:35
FairMQChannel(const std::string &name, const std::string &type, const std::string &method, const std::string &address, std::shared_ptr< FairMQTransportFactory > factory)
int GetSndKernelSize() const
Definition: FairMQChannel.h:157
FairMQChannel(const std::string &type, const std::string &method, const std::string &address)
int64_t Receive(std::vector< FairMQMessagePtr > &msgVec, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:290
int GetPortRangeMax() const
Definition: FairMQChannel.h:177
FairMQChannel(const std::string &name)
int64_t Send(std::vector< FairMQMessagePtr > &msgVec, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:280
int GetPortRangeMin() const
Definition: FairMQChannel.h:173
void UpdatePortRangeMin(const int minPort)
Definition: FairMQChannel.h:229
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:51
std::string GetMethod() const
Definition: FairMQChannel.h:133
Definition: FairMQDevice.h:50
void UpdateType(const std::string &type)
Definition: FairMQChannel.h:189
std::string GetName() const
Definition: FairMQChannel.h:106
Definition: FairMQTransportFactory.h:30
int GetRcvBufSize() const
Definition: FairMQChannel.h:153
privacy