9 #ifndef FAIR_MQ_OFI_SOCKET_H
10 #define FAIR_MQ_OFI_SOCKET_H
12 #include <FairMQSocket.h>
13 #include <FairMQMessage.h>
14 #include <fairmq/ofi/Context.h>
15 #include <fairmq/ofi/ControlMessages.h>
17 #include <asiofi/connected_endpoint.hpp>
18 #include <asiofi/memory_resources.hpp>
19 #include <asiofi/passive_endpoint.hpp>
20 #include <asiofi/semaphore.hpp>
21 #include <boost/asio.hpp>
26 namespace fair::mq::ofi
38 Socket(Context& context,
const std::string& type,
const std::string& name,
const std::string&
id =
"");
39 Socket(
const Socket&) =
delete;
40 Socket operator=(
const Socket&) =
delete;
42 auto GetId() const -> std::
string override {
return fId; }
44 auto Events(uint32_t *events) ->
void override { *events = 0; }
45 auto Bind(
const std::string& address) ->
bool override;
46 auto Connect(
const std::string& address) ->
bool override;
48 auto Send(MessagePtr& msg,
int timeout = 0) -> int64_t
override;
49 auto Receive(MessagePtr& msg,
int timeout = 0) -> int64_t
override;
50 auto Send(std::vector<MessagePtr>& msgVec,
int timeout = 0) -> int64_t
override;
51 auto Receive(std::vector<MessagePtr>& msgVec,
int timeout = 0) -> int64_t
override;
53 auto GetSocket() const ->
void* {
return nullptr; }
55 void SetLinger(
const int value)
override;
56 int GetLinger()
const override;
57 void SetSndBufSize(
const int value)
override;
58 int GetSndBufSize()
const override;
59 void SetRcvBufSize(
const int value)
override;
60 int GetRcvBufSize()
const override;
61 void SetSndKernelSize(
const int value)
override;
62 int GetSndKernelSize()
const override;
63 void SetRcvKernelSize(
const int value)
override;
64 int GetRcvKernelSize()
const override;
66 auto Close() ->
void override;
68 auto SetOption(
const std::string& option,
const void* value,
size_t valueSize) ->
void override;
69 auto GetOption(
const std::string& option,
void* value,
size_t* valueSize) ->
void override;
71 auto GetBytesTx() const ->
unsigned long override {
return fBytesTx; }
72 auto GetBytesRx() const ->
unsigned long override {
return fBytesRx; }
73 auto GetMessagesTx() const ->
unsigned long override {
return fMessagesTx; }
74 auto GetMessagesRx() const ->
unsigned long override {
return fMessagesRx; }
76 static auto GetConstant(
const std::string& constant) -> int;
82 asiofi::allocated_pool_resource fControlMemPool;
83 std::unique_ptr<asiofi::info> fOfiInfo;
84 std::unique_ptr<asiofi::fabric> fOfiFabric;
85 std::unique_ptr<asiofi::domain> fOfiDomain;
86 std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
87 std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
89 std::atomic<unsigned long> fBytesTx;
90 std::atomic<unsigned long> fBytesRx;
91 std::atomic<unsigned long> fMessagesTx;
92 std::atomic<unsigned long> fMessagesRx;
97 std::mutex fSendQueueMutex, fRecvQueueMutex;
98 std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
99 std::vector<MessagePtr> fInflightMultiPartMessage;
100 int64_t fMultiPartRecvCounter;
101 asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
102 std::atomic<bool> fNeedOfiMemoryRegistration;
104 auto InitOfi(Address addr) -> void;
105 auto BindControlEndpoint() -> void;
106 auto BindDataEndpoint() -> void;
107 enum class Band { Control, Data };
108 auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
109 auto SendQueueReader() -> void;
110 auto SendQueueReaderStatic() -> void;
111 auto RecvControlQueueReader() -> void;
112 auto RecvQueueReaderStatic() -> void;
113 auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
114 auto DataMessageReceived(MessagePtr msg) -> void;
117 struct SilentSocketError : SocketError {
using SocketError::SocketError; };