9 #ifndef FAIR_MQ_ZMQ_MESSAGE_H
10 #define FAIR_MQ_ZMQ_MESSAGE_H
12 #include <fairmq/zeromq/UnmanagedRegion.h>
13 #include <FairMQLogger.h>
14 #include <FairMQMessage.h>
15 #include <FairMQUnmanagedRegion.h>
26 namespace fair::mq::zmq
39 , fMsg(std::make_unique<zmq_msg_t>())
41 if (zmq_msg_init(fMsg.get()) != 0) {
42 LOG(error) <<
"failed initializing message, reason: " << zmq_strerror(errno);
48 , fAlignment(alignment.alignment)
49 , fMsg(std::make_unique<zmq_msg_t>())
51 if (zmq_msg_init(fMsg.get()) != 0) {
52 LOG(error) <<
"failed initializing message, reason: " << zmq_strerror(errno);
57 : fair::mq::Message(factory)
59 , fMsg(std::make_unique<zmq_msg_t>())
61 if (zmq_msg_init_size(fMsg.get(), size) != 0) {
62 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
66 static std::pair<void*, void*> AllocateAligned(
size_t size,
size_t alignment)
68 char* fullBufferPtr =
static_cast<char*
>(malloc(size + alignment));
70 LOG(error) <<
"failed to allocate buffer with provided size (" << size <<
") and alignment (" << alignment <<
").";
71 throw std::bad_alloc();
74 size_t offset = alignment - (
reinterpret_cast<uintptr_t
>(fullBufferPtr) % alignment);
75 char* alignedPartPtr = fullBufferPtr + offset;
77 return {
static_cast<void*
>(fullBufferPtr),
static_cast<void*
>(alignedPartPtr)};
81 : fair::mq::Message(factory)
82 , fAlignment(alignment.alignment)
83 , fMsg(std::make_unique<zmq_msg_t>())
85 if (fAlignment != 0) {
86 auto ptrs = AllocateAligned(size, fAlignment);
87 if (zmq_msg_init_data(fMsg.get(), ptrs.second, size, [](
void* ,
void* hint) { free(hint); }, ptrs.first) != 0) {
88 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
91 if (zmq_msg_init_size(fMsg.get(), size) != 0) {
92 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
97 Message(
void* data,
const size_t size, fairmq_free_fn* ffn,
void* hint =
nullptr,
FairMQTransportFactory* factory =
nullptr)
98 : fair::mq::Message(factory)
100 , fMsg(std::make_unique<zmq_msg_t>())
102 if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
103 LOG(error) <<
"failed initializing message with data, reason: " << zmq_strerror(errno);
107 Message(UnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint = 0,
FairMQTransportFactory* factory =
nullptr)
108 : fair::mq::Message(factory)
110 , fMsg(std::make_unique<zmq_msg_t>())
115 if (zmq_msg_init_size(fMsg.get(), size) != 0) {
116 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
119 std::memcpy(zmq_msg_data(fMsg.get()), data, size);
121 auto ptr =
static_cast<UnmanagedRegion*
>(region.get());
122 if (ptr->fBulkCallback) {
123 ptr->fBulkCallback({{data, size, hint}});
124 }
else if (ptr->fCallback) {
125 ptr->fCallback(data, size, hint);
135 void Rebuild()
override
138 fMsg = std::make_unique<zmq_msg_t>();
139 if (zmq_msg_init(fMsg.get()) != 0) {
140 LOG(error) <<
"failed initializing message, reason: " << zmq_strerror(errno);
144 void Rebuild(Alignment alignment)
override
147 fAlignment = alignment.alignment;
148 fMsg = std::make_unique<zmq_msg_t>();
149 if (zmq_msg_init(fMsg.get()) != 0) {
150 LOG(error) <<
"failed initializing message, reason: " << zmq_strerror(errno);
154 void Rebuild(
const size_t size)
override
157 fMsg = std::make_unique<zmq_msg_t>();
158 if (zmq_msg_init_size(fMsg.get(), size) != 0) {
159 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
163 void Rebuild(
const size_t size, Alignment alignment)
override
166 fAlignment = alignment.alignment;
167 fMsg = std::make_unique<zmq_msg_t>();
169 if (fAlignment != 0) {
170 auto ptrs = AllocateAligned(size, fAlignment);
171 if (zmq_msg_init_data(fMsg.get(), ptrs.second, size, [](
void* ,
void* hint) { free(hint); }, ptrs.first) != 0) {
172 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
175 if (zmq_msg_init_size(fMsg.get(), size) != 0) {
176 LOG(error) <<
"failed initializing message with size, reason: " << zmq_strerror(errno);
181 void Rebuild(
void* data,
const size_t size, fairmq_free_fn* ffn,
void* hint =
nullptr)
override
184 fMsg = std::make_unique<zmq_msg_t>();
185 if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
186 LOG(error) <<
"failed initializing message with data, reason: " << zmq_strerror(errno);
190 void* GetData()
const override
192 if (zmq_msg_size(fMsg.get()) > 0) {
193 return zmq_msg_data(fMsg.get());
199 size_t GetSize()
const override {
return zmq_msg_size(fMsg.get()); }
206 bool SetUsedSize(
const size_t size)
override
208 if (size == GetSize()) {
211 }
else if (size > GetSize()) {
212 LOG(error) <<
"cannot set used size higher than original.";
215 auto newMsg = std::make_unique<zmq_msg_t>();
216 void* data = GetData();
217 if (zmq_msg_init_data(newMsg.get(), data, size, [](
void* ,
void* obj) {
218 zmq_msg_close(static_cast<zmq_msg_t*>(obj));
219 delete static_cast<zmq_msg_t*>(obj);
220 }, fMsg.get()) != 0) {
221 LOG(error) <<
"failed initializing message with data, reason: " << zmq_strerror(errno);
233 if (fAlignment != 0) {
234 void* data = GetData();
235 size_t size = GetSize();
237 if (data !=
nullptr &&
reinterpret_cast<uintptr_t
>(GetData()) % fAlignment) {
239 auto ptrs = AllocateAligned(size, fAlignment);
240 std::memcpy(ptrs.second, zmq_msg_data(fMsg.get()), size);
242 Rebuild(ptrs.second, size, [](
void* ,
void* hint) { free(hint); }, ptrs.first);
247 Transport GetType()
const override {
return Transport::ZMQ; }
251 const Message& zMsg =
static_cast<const Message&
>(msg);
253 if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0) {
254 LOG(error) <<
"failed copying message, reason: " << zmq_strerror(errno);
259 ~Message()
override { CloseMessage(); }
263 std::unique_ptr<zmq_msg_t> fMsg;
265 zmq_msg_t* GetMessage()
const {
return fMsg.get(); }
269 if (zmq_msg_close(fMsg.get()) != 0) {
270 LOG(error) <<
"failed closing message, reason: " << zmq_strerror(errno);