FairMQ  1.4.33
C++ Message Queuing Library and Framework
FairMQChannel.h
1 /********************************************************************************
2  * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQCHANNEL_H_
10 #define FAIRMQCHANNEL_H_
11 
12 #include <FairMQTransportFactory.h>
13 #include <FairMQUnmanagedRegion.h>
14 #include <FairMQSocket.h>
15 #include <fairmq/Transports.h>
16 #include <FairMQParts.h>
17 #include <fairmq/Properties.h>
18 #include <FairMQMessage.h>
19 
20 #include <string>
21 #include <memory> // unique_ptr, shared_ptr
22 #include <vector>
23 #include <mutex>
24 #include <stdexcept>
25 #include <utility> // std::move
26 #include <cstdint> // int64_t
27 
35 {
36  friend class FairMQDevice;
37 
38  public:
40  FairMQChannel();
41 
44  FairMQChannel(const std::string& name);
45 
50  FairMQChannel(const std::string& type, const std::string& method, const std::string& address);
51 
56  FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
57 
64  FairMQChannel(const std::string& name, const std::string& type, const std::string& method, const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);
65 
66  FairMQChannel(const std::string& name, int index, const fair::mq::Properties& properties);
67 
70 
72  FairMQChannel(const FairMQChannel&, const std::string& name);
73 
75  // FairMQChannel(FairMQChannel&&) = delete;
76 
79 
81  // FairMQChannel& operator=(FairMQChannel&&) = delete;
82 
84  virtual ~FairMQChannel() { /* LOG(warn) << "Destroying channel '" << fName << "'"; */ }
85 
86  struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
87 
88  FairMQSocket& GetSocket() const { assert(fSocket); return *fSocket; }
89 
90  bool Bind(const std::string& address)
91  {
92  fMethod = "bind";
93  fAddress = address;
94  return fSocket->Bind(address);
95  }
96 
97  bool Connect(const std::string& address)
98  {
99  fMethod = "connect";
100  fAddress = address;
101  return fSocket->Connect(address);
102  }
103 
106  std::string GetName() const { return fName; }
107 
110  std::string GetPrefix() const
111  {
112  std::string prefix = fName;
113  prefix = prefix.erase(fName.rfind('['));
114  return prefix;
115  }
116 
119  std::string GetIndex() const
120  {
121  std::string indexStr = fName;
122  indexStr.erase(indexStr.rfind(']'));
123  indexStr.erase(0, indexStr.rfind('[') + 1);
124  return indexStr;
125  }
126 
129  std::string GetType() const { return fType; }
130 
133  std::string GetMethod() const { return fMethod; }
134 
137  std::string GetAddress() const { return fAddress; }
138 
141  std::string GetTransportName() const { return fair::mq::TransportName(fTransportType); }
142 
145  fair::mq::Transport GetTransportType() const { return fTransportType; }
146 
149  int GetSndBufSize() const { return fSndBufSize; }
150 
153  int GetRcvBufSize() const { return fRcvBufSize; }
154 
157  int GetSndKernelSize() const { return fSndKernelSize; }
158 
161  int GetRcvKernelSize() const { return fRcvKernelSize; }
162 
165  int GetLinger() const { return fLinger; }
166 
169  int GetRateLogging() const { return fRateLogging; }
170 
173  int GetPortRangeMin() const { return fPortRangeMin; }
174 
177  int GetPortRangeMax() const { return fPortRangeMax; }
178 
181  bool GetAutoBind() const { return fAutoBind; }
182 
185  void UpdateName(const std::string& name) { fName = name; Invalidate(); }
186 
189  void UpdateType(const std::string& type) { fType = type; Invalidate(); }
190 
193  void UpdateMethod(const std::string& method) { fMethod = method; Invalidate(); }
194 
197  void UpdateAddress(const std::string& address) { fAddress = address; Invalidate(); }
198 
201  void UpdateTransport(const std::string& transport) { fTransportType = fair::mq::TransportType(transport); Invalidate(); }
202 
205  void UpdateSndBufSize(const int sndBufSize) { fSndBufSize = sndBufSize; Invalidate(); }
206 
209  void UpdateRcvBufSize(const int rcvBufSize) { fRcvBufSize = rcvBufSize; Invalidate(); }
210 
213  void UpdateSndKernelSize(const int sndKernelSize) { fSndKernelSize = sndKernelSize; Invalidate(); }
214 
217  void UpdateRcvKernelSize(const int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); }
218 
221  void UpdateLinger(const int duration) { fLinger = duration; Invalidate(); }
222 
225  void UpdateRateLogging(const int rateLogging) { fRateLogging = rateLogging; Invalidate(); }
226 
229  void UpdatePortRangeMin(const int minPort) { fPortRangeMin = minPort; Invalidate(); }
230 
233  void UpdatePortRangeMax(const int maxPort) { fPortRangeMax = maxPort; Invalidate(); }
234 
237  void UpdateAutoBind(const bool autobind) { fAutoBind = autobind; Invalidate(); }
238 
241  bool IsValid() const { return fValid; }
242 
245  bool Validate();
246 
247  void Init();
248 
249  bool ConnectEndpoint(const std::string& endpoint);
250 
251  bool BindEndpoint(std::string& endpoint);
252 
254  void Invalidate() { fValid = false; }
255 
260  int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
261  {
262  CheckSendCompatibility(msg);
263  return fSocket->Send(msg, sndTimeoutInMs);
264  }
265 
270  int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
271  {
272  CheckReceiveCompatibility(msg);
273  return fSocket->Receive(msg, rcvTimeoutInMs);
274  }
275 
280  int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
281  {
282  CheckSendCompatibility(msgVec);
283  return fSocket->Send(msgVec, sndTimeoutInMs);
284  }
285 
290  int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
291  {
292  CheckReceiveCompatibility(msgVec);
293  return fSocket->Receive(msgVec, rcvTimeoutInMs);
294  }
295 
300  int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
301  {
302  return Send(parts.fParts, sndTimeoutInMs);
303  }
304 
309  int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
310  {
311  return Receive(parts.fParts, rcvTimeoutInMs);
312  }
313 
314  unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
315  unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); }
316  unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
317  unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); }
318 
319  auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); };
320 
321  template<typename... Args>
322  FairMQMessagePtr NewMessage(Args&&... args)
323  {
324  return Transport()->CreateMessage(std::forward<Args>(args)...);
325  }
326 
327  template<typename T>
328  FairMQMessagePtr NewSimpleMessage(const T& data)
329  {
330  return Transport()->NewSimpleMessage(data);
331  }
332 
333  template<typename T>
334  FairMQMessagePtr NewStaticMessage(const T& data)
335  {
336  return Transport()->NewStaticMessage(data);
337  }
338 
339  template<typename... Args>
340  FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
341  {
342  return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...);
343  }
344 
345  static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
346  static constexpr const char* DefaultTransportName = "default";
347  static constexpr const char* DefaultName = "";
348  static constexpr const char* DefaultType = "unspecified";
349  static constexpr const char* DefaultMethod = "unspecified";
350  static constexpr const char* DefaultAddress = "unspecified";
351  static constexpr int DefaultSndBufSize = 1000;
352  static constexpr int DefaultRcvBufSize = 1000;
353  static constexpr int DefaultSndKernelSize = 0;
354  static constexpr int DefaultRcvKernelSize = 0;
355  static constexpr int DefaultLinger = 500;
356  static constexpr int DefaultRateLogging = 1;
357  static constexpr int DefaultPortRangeMin = 22000;
358  static constexpr int DefaultPortRangeMax = 23000;
359  static constexpr bool DefaultAutoBind = true;
360 
361  private:
362  std::shared_ptr<FairMQTransportFactory> fTransportFactory;
363  fair::mq::Transport fTransportType;
364  std::unique_ptr<FairMQSocket> fSocket;
365 
366  std::string fName;
367  std::string fType;
368  std::string fMethod;
369  std::string fAddress;
370  int fSndBufSize;
371  int fRcvBufSize;
372  int fSndKernelSize;
373  int fRcvKernelSize;
374  int fLinger;
375  int fRateLogging;
376  int fPortRangeMin;
377  int fPortRangeMax;
378  bool fAutoBind;
379 
380  bool fValid;
381 
382  bool fMultipart;
383 
384  void CheckSendCompatibility(FairMQMessagePtr& msg)
385  {
386  if (fTransportType != msg->GetType()) {
387  FairMQMessagePtr msgWrapper(NewMessage(
388  msg->GetData(),
389  msg->GetSize(),
390  [](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
391  msg.get()
392  ));
393  msg.release();
394  msg = move(msgWrapper);
395  }
396  }
397 
398  void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
399  {
400  for (auto& msg : msgVec) {
401  if (fTransportType != msg->GetType()) {
402 
403  FairMQMessagePtr msgWrapper(NewMessage(
404  msg->GetData(),
405  msg->GetSize(),
406  [](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
407  msg.get()
408  ));
409  msg.release();
410  msg = move(msgWrapper);
411  }
412  }
413  }
414 
415  void CheckReceiveCompatibility(FairMQMessagePtr& msg)
416  {
417  if (fTransportType != msg->GetType()) {
418  FairMQMessagePtr newMsg(NewMessage());
419  msg = move(newMsg);
420  }
421  }
422 
423  void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
424  {
425  for (auto& msg : msgVec) {
426  if (fTransportType != msg->GetType()) {
427 
428  FairMQMessagePtr newMsg(NewMessage());
429  msg = move(newMsg);
430  }
431  }
432  }
433 
434  void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
435  {
436  fTransportFactory = factory;
437  fTransportType = factory->GetType();
438  }
439 };
440 
441 #endif /* FAIRMQCHANNEL_H_ */
FairMQChannel::UpdatePortRangeMax
void UpdatePortRangeMax(const int maxPort)
Definition: FairMQChannel.h:233
FairMQSocket
Definition: FairMQSocket.h:36
FairMQChannel::FairMQChannel
FairMQChannel(const FairMQChannel &, const std::string &name)
Copy Constructor (with new name)
FairMQChannel::~FairMQChannel
virtual ~FairMQChannel()
Move assignment operator.
Definition: FairMQChannel.h:84
FairMQChannel::Validate
bool Validate()
Definition: FairMQChannel.cxx:163
FairMQChannel::GetTransportType
fair::mq::Transport GetTransportType() const
Definition: FairMQChannel.h:145
FairMQChannel::GetLinger
int GetLinger() const
Definition: FairMQChannel.h:165
FairMQChannel::GetIndex
std::string GetIndex() const
Definition: FairMQChannel.h:119
FairMQParts
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
FairMQChannel::Send
int64_t Send(FairMQParts &parts, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:300
FairMQChannel::GetAddress
std::string GetAddress() const
Definition: FairMQChannel.h:137
FairMQChannel::ChannelConfigurationError
Definition: FairMQChannel.h:86
FairMQChannel::UpdateAutoBind
void UpdateAutoBind(const bool autobind)
Definition: FairMQChannel.h:237
FairMQChannel::UpdateRcvKernelSize
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.h:217
FairMQChannel::UpdateAddress
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.h:197
FairMQChannel::GetTransportName
std::string GetTransportName() const
Definition: FairMQChannel.h:141
FairMQChannel::GetAutoBind
bool GetAutoBind() const
Definition: FairMQChannel.h:181
FairMQChannel::GetRateLogging
int GetRateLogging() const
Definition: FairMQChannel.h:169
FairMQChannel::GetPrefix
std::string GetPrefix() const
Definition: FairMQChannel.h:110
FairMQChannel::UpdateRateLogging
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.h:225
FairMQChannel::Receive
int64_t Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:270
FairMQChannel::Send
int64_t Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:260
FairMQChannel::Invalidate
void Invalidate()
invalidates the channel (requires validation to be used again).
Definition: FairMQChannel.h:254
FairMQChannel::GetRcvKernelSize
int GetRcvKernelSize() const
Definition: FairMQChannel.h:161
FairMQChannel::UpdateMethod
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.h:193
FairMQChannel::UpdateTransport
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.h:201
FairMQChannel::GetSndBufSize
int GetSndBufSize() const
Definition: FairMQChannel.h:149
FairMQChannel::UpdateLinger
void UpdateLinger(const int duration)
Definition: FairMQChannel.h:221
FairMQChannel::FairMQChannel
FairMQChannel(const std::string &name, const std::string &type, std::shared_ptr< FairMQTransportFactory > factory)
FairMQChannel::UpdateSndBufSize
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.h:205
FairMQChannel::UpdateSndKernelSize
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.h:213
FairMQChannel::Receive
int64_t Receive(FairMQParts &parts, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:309
FairMQChannel::UpdateName
void UpdateName(const std::string &name)
Definition: FairMQChannel.h:185
FairMQChannel::UpdateRcvBufSize
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.h:209
FairMQChannel::operator=
FairMQChannel & operator=(const FairMQChannel &)
Move constructor.
Definition: FairMQChannel.cxx:135
FairMQChannel::IsValid
bool IsValid() const
Definition: FairMQChannel.h:241
FairMQChannel::GetType
std::string GetType() const
Definition: FairMQChannel.h:129
FairMQChannel
Wrapper class for FairMQSocket and related methods.
Definition: FairMQChannel.h:35
FairMQChannel::FairMQChannel
FairMQChannel(const std::string &name, const std::string &type, const std::string &method, const std::string &address, std::shared_ptr< FairMQTransportFactory > factory)
FairMQChannel::GetSndKernelSize
int GetSndKernelSize() const
Definition: FairMQChannel.h:157
FairMQChannel::FairMQChannel
FairMQChannel(const std::string &type, const std::string &method, const std::string &address)
FairMQChannel::Receive
int64_t Receive(std::vector< FairMQMessagePtr > &msgVec, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:290
FairMQChannel::GetPortRangeMax
int GetPortRangeMax() const
Definition: FairMQChannel.h:177
FairMQChannel::FairMQChannel
FairMQChannel(const std::string &name)
FairMQChannel::Send
int64_t Send(std::vector< FairMQMessagePtr > &msgVec, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:280
FairMQChannel::GetPortRangeMin
int GetPortRangeMin() const
Definition: FairMQChannel.h:173
FairMQChannel::UpdatePortRangeMin
void UpdatePortRangeMin(const int minPort)
Definition: FairMQChannel.h:229
FairMQChannel::FairMQChannel
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:51
FairMQChannel::GetMethod
std::string GetMethod() const
Definition: FairMQChannel.h:133
FairMQDevice
Definition: FairMQDevice.h:50
FairMQChannel::UpdateType
void UpdateType(const std::string &type)
Definition: FairMQChannel.h:189
FairMQChannel::GetName
std::string GetName() const
Definition: FairMQChannel.h:106
FairMQTransportFactory
Definition: FairMQTransportFactory.h:30
FairMQChannel::GetRcvBufSize
int GetRcvBufSize() const
Definition: FairMQChannel.h:153

privacy