FairMQ  1.4.33
C++ Message Queuing Library and Framework
Socket.h
1 /********************************************************************************
2  * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_OFI_SOCKET_H
10 #define FAIR_MQ_OFI_SOCKET_H
11 
12 #include <FairMQSocket.h>
13 #include <FairMQMessage.h>
14 #include <fairmq/ofi/Context.h>
15 #include <fairmq/ofi/ControlMessages.h>
16 
17 #include <asiofi/connected_endpoint.hpp>
18 #include <asiofi/memory_resources.hpp>
19 #include <asiofi/passive_endpoint.hpp>
20 #include <asiofi/semaphore.hpp>
21 #include <boost/asio.hpp>
22 #include <memory> // unique_ptr
23 #include <mutex>
24 
25 
26 namespace fair::mq::ofi
27 {
28 
35 class Socket final : public fair::mq::Socket
36 {
37  public:
38  Socket(Context& context, const std::string& type, const std::string& name, const std::string& id = "");
39  Socket(const Socket&) = delete;
40  Socket operator=(const Socket&) = delete;
41 
42  auto GetId() const -> std::string override { return fId; }
43 
44  auto Events(uint32_t *events) -> void override { *events = 0; }
45  auto Bind(const std::string& address) -> bool override;
46  auto Connect(const std::string& address) -> bool override;
47 
48  auto Send(MessagePtr& msg, int timeout = 0) -> int64_t override;
49  auto Receive(MessagePtr& msg, int timeout = 0) -> int64_t override;
50  auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
51  auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
52 
53  auto GetSocket() const -> void* { return nullptr; }
54 
55  void SetLinger(const int value) override;
56  int GetLinger() const override;
57  void SetSndBufSize(const int value) override;
58  int GetSndBufSize() const override;
59  void SetRcvBufSize(const int value) override;
60  int GetRcvBufSize() const override;
61  void SetSndKernelSize(const int value) override;
62  int GetSndKernelSize() const override;
63  void SetRcvKernelSize(const int value) override;
64  int GetRcvKernelSize() const override;
65 
66  auto Close() -> void override;
67 
68  auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override;
69  auto GetOption(const std::string& option, void* value, size_t* valueSize) -> void override;
70 
71  auto GetBytesTx() const -> unsigned long override { return fBytesTx; }
72  auto GetBytesRx() const -> unsigned long override { return fBytesRx; }
73  auto GetMessagesTx() const -> unsigned long override { return fMessagesTx; }
74  auto GetMessagesRx() const -> unsigned long override { return fMessagesRx; }
75 
76  static auto GetConstant(const std::string& constant) -> int;
77 
78  ~Socket() override;
79 
80  private:
81  Context& fContext;
82  asiofi::allocated_pool_resource fControlMemPool;
83  std::unique_ptr<asiofi::info> fOfiInfo;
84  std::unique_ptr<asiofi::fabric> fOfiFabric;
85  std::unique_ptr<asiofi::domain> fOfiDomain;
86  std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
87  std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
88  std::string fId;
89  std::atomic<unsigned long> fBytesTx;
90  std::atomic<unsigned long> fBytesRx;
91  std::atomic<unsigned long> fMessagesTx;
92  std::atomic<unsigned long> fMessagesRx;
93  Address fRemoteAddr;
94  Address fLocalAddr;
95  int fSndTimeout;
96  int fRcvTimeout;
97  std::mutex fSendQueueMutex, fRecvQueueMutex;
98  std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
99  std::vector<MessagePtr> fInflightMultiPartMessage;
100  int64_t fMultiPartRecvCounter;
101  asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
102  std::atomic<bool> fNeedOfiMemoryRegistration;
103 
104  auto InitOfi(Address addr) -> void;
105  auto BindControlEndpoint() -> void;
106  auto BindDataEndpoint() -> void;
107  enum class Band { Control, Data };
108  auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
109  auto SendQueueReader() -> void;
110  auto SendQueueReaderStatic() -> void;
111  auto RecvControlQueueReader() -> void;
112  auto RecvQueueReaderStatic() -> void;
113  auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
114  auto DataMessageReceived(MessagePtr msg) -> void;
115 }; /* class Socket */
116 
117 struct SilentSocketError : SocketError { using SocketError::SocketError; };
118 
119 } // namespace fair::mq::ofi
120 
121 #endif /* FAIR_MQ_OFI_SOCKET_H */
FairMQSocket
Definition: FairMQSocket.h:36
fair::mq::ofi::Socket::Events
auto Events(uint32_t *events) -> void override
Definition: Socket.h:50

privacy