8 #ifndef FAIR_MQ_SHMEM_MESSAGE_H_
9 #define FAIR_MQ_SHMEM_MESSAGE_H_
14 #include "UnmanagedRegion.h"
15 #include <FairMQLogger.h>
16 #include <FairMQMessage.h>
17 #include <FairMQUnmanagedRegion.h>
19 #include <boost/interprocess/mapped_region.hpp>
24 #include <sys/types.h>
// Fragment of a Message constructor whose signature is not visible in this view.
// The meta header starts with three zeros, the manager's segment id and -1
// (presumably size, hint, region id, segment id, handle — TODO confirm against the header struct).
41 , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
// Every constructed message bumps the manager's message counter.
45 fManager.IncrementMsgCounter();
// Fragment of a Message constructor that takes an alignment (signature not visible in this view).
// Meta header: three zeros, the manager's segment id, and -1 in the last slot.
52 , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
// Keep the raw alignment value carried by the `alignment` argument.
53 , fAlignment(alignment.alignment)
// Every constructed message bumps the manager's message counter.
57 fManager.IncrementMsgCounter();
// Fragment of a sized Message constructor (signature not visible in this view).
// Forwards the transport factory to the fair::mq::Message base.
61 : fair::mq::Message(factory)
// Meta header: three zeros, the manager's segment id, and -1 in the last slot.
64 , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
// Obtain a buffer of `size` bytes through InitializeChunk (default alignment argument).
68 InitializeChunk(size);
// Every constructed message bumps the manager's message counter.
69 fManager.IncrementMsgCounter();
// Constructs a message of `size` bytes with an explicit alignment.
// @param manager   shared-memory manager used by this message
// @param size      number of bytes requested from InitializeChunk
// @param alignment wrapper whose `.alignment` member is stored in fAlignment
// @param factory   transport factory forwarded to the base class (may be nullptr)
72 Message(Manager& manager,
const size_t size, Alignment alignment,
FairMQTransportFactory* factory =
nullptr)
73 : fair::mq::Message(factory)
// Meta header: three zeros, the manager's segment id, and -1 in the last slot.
76 , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
77 , fAlignment(alignment.alignment)
// Allocate the payload using the alignment stored just above.
81 InitializeChunk(size, fAlignment);
// Every constructed message bumps the manager's message counter.
82 fManager.IncrementMsgCounter();
// Constructs a message from a caller-owned buffer by copying it into a newly
// initialized chunk.
// @param manager shared-memory manager used by this message
// @param data    source buffer; `size` bytes are read from it
// @param size    number of bytes to allocate and copy
// @param ffn     caller-supplied free function; its use is not visible in this view
//                (presumably invoked on `data` after the copy — TODO confirm)
// @param hint    opaque pointer accompanying `ffn` (defaults to nullptr)
// @param factory transport factory forwarded to the base class (may be nullptr)
85 Message(Manager& manager,
void* data,
const size_t size, fairmq_free_fn* ffn,
void* hint =
nullptr,
FairMQTransportFactory* factory =
nullptr)
86 : fair::mq::Message(factory)
// Meta header: three zeros, the manager's segment id, and -1 in the last slot.
89 , fMeta{0, 0, 0, fManager.GetSegmentId(), -1}
// Copy only when InitializeChunk yields a non-null pointer.
93 if (InitializeChunk(size)) {
94 std::memcpy(fLocalPtr, data, size);
// Every constructed message bumps the manager's message counter.
101 fManager.IncrementMsgCounter();
// Constructs a message that refers to memory inside an unmanaged region; no
// allocation or copy is visible in this constructor.
// @param manager shared-memory manager used by this message
// @param region  region that must contain `data`
// @param data    pointer into the region; stored directly as the local pointer
// @param size    payload size recorded in the meta header
// @param hint    opaque value stored (as an integer) in the meta header
// @param factory transport factory forwarded to the base class (may be nullptr)
// @throws std::runtime_error when `data` fails the region range check below
// NOTE(review): `hint` defaults to literal 0 while sibling overloads use nullptr.
104 Message(Manager& manager, UnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint = 0,
FairMQTransportFactory* factory =
nullptr)
105 : fair::mq::Message(factory)
// Meta header: size, hint reinterpreted as an integer, the region's id, the
// manager's segment id, and -1 in the last slot (overwritten below on success).
108 , fMeta{size,
reinterpret_cast<size_t>(hint),
static_cast<UnmanagedRegion*
>(region.get())->fRegionId, fManager.GetSegmentId(), -1}
109 , fRegionPtr(
nullptr)
110 , fLocalPtr(
static_cast<char*
>(data))
// Range check: region start <= data <= region start + region size.
// NOTE(review): the upper bound is inclusive, so a pointer one past the end is
// accepted, and `size` does not take part in the check — confirm this is intended.
112 if (
reinterpret_cast<const char*
>(data) >=
reinterpret_cast<const char*
>(region->GetData()) &&
113 reinterpret_cast<const char*
>(data) <=
reinterpret_cast<const char*
>(region->GetData()) + region->GetSize()) {
// For region messages the handle holds the byte offset of `data` from the region start.
// NOTE(review): C-style cast to handle_t; a static_cast would express the same conversion.
114 fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(
reinterpret_cast<const char*
>(data) -
reinterpret_cast<const char*
>(region->GetData()));
// Failure path (the enclosing else is not visible in this view): log, then throw.
116 LOG(error) <<
"trying to create region message with data from outside the region";
117 throw std::runtime_error(
"trying to create region message with data from outside the region");
// Every constructed message bumps the manager's message counter.
119 fManager.IncrementMsgCounter();
// Fragment of a Message constructor whose signature and remaining initializers
// are not visible in this view.
123 : fair::mq::Message(factory)
// The region pointer starts out null in this constructor.
127 , fRegionPtr(
nullptr)
// Every constructed message bumps the manager's message counter.
130 fManager.IncrementMsgCounter();
// Copy construction is disabled for Message.
133 Message(const Message&) = delete;
// Copy assignment is disabled as well; declared with the canonical `Message&`
// return type (it previously returned `Message` by value).
134 Message& operator=(const Message&) = delete;
// Argument-less Rebuild override; its body is not visible in this view.
136 void Rebuild()
override
// Rebuild override taking only an alignment; other body lines are not visible in this view.
// @param alignment wrapper whose `.alignment` member replaces fAlignment
142 void Rebuild(Alignment alignment)
override
146 fAlignment = alignment.alignment;
// Rebuild override taking a size; other body lines are not visible in this view.
// @param size number of bytes requested from InitializeChunk
149 void Rebuild(
const size_t size)
override
// Obtain a buffer of `size` bytes (default alignment argument).
153 InitializeChunk(size);
// Rebuild override taking a size and an alignment; other body lines are not
// visible in this view.
// @param size      number of bytes requested from InitializeChunk
// @param alignment wrapper whose `.alignment` member replaces fAlignment
156 void Rebuild(
const size_t size, Alignment alignment)
override
// Store the alignment first so the allocation below uses it.
160 fAlignment = alignment.alignment;
161 InitializeChunk(size, fAlignment);
// Rebuild override that copies a caller-owned buffer into a newly initialized chunk.
// @param data source buffer; `size` bytes are read from it
// @param size number of bytes to allocate and copy
// @param ffn  caller-supplied free function; its use is not visible in this view
//             (presumably invoked on `data` after the copy — TODO confirm)
// @param hint opaque pointer accompanying `ffn` (defaults to nullptr)
164 void Rebuild(
void* data,
const size_t size, fairmq_free_fn* ffn,
void* hint =
nullptr)
override
// Copy only when InitializeChunk yields a non-null pointer.
169 if (InitializeChunk(size)) {
170 std::memcpy(fLocalPtr, data, size);
// Resolves the local address of the payload. The method is const but updates
// the mutable members fLocalPtr and fRegionPtr. Some lines (including the
// return and any else/closing braces) are not visible in this view.
179 void* GetData()
const override
// Region id 0: the payload is addressed through a segment handle.
182 if (fMeta.fRegionId == 0) {
// Only resolve an address when the recorded size is non-zero.
183 if (fMeta.fSize > 0) {
// Return value is discarded; presumably ensures the segment is opened locally — TODO confirm.
184 fManager.GetSegment(fMeta.fSegmentId);
185 fLocalPtr =
reinterpret_cast<char*
>(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId));
// Non-zero region id (enclosing branch not visible): look the region up by id.
190 fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
// Here the handle is added to the region's base address as a byte offset.
192 fLocalPtr =
reinterpret_cast<char*
>(fRegionPtr->fRegion.get_address()) + fMeta.fHandle;
// Reports the payload size recorded in this message's meta header.
203 size_t GetSize() const override
{
    const size_t recordedSize = fMeta.fSize;
    return recordedSize;
}
// Changes the used size of the message to `newSize`.
// Visible cases: equal to the current size, zero, smaller-or-equal (shrink),
// and otherwise an error is logged. Return statements and several
// braces/branches are not visible in this view.
// @param newSize requested size; must not exceed the current fMeta.fSize
205 bool SetUsedSize(
const size_t newSize)
override
// Case 1: size unchanged (branch body not visible).
207 if (newSize == fMeta.fSize) {
209 }
// Case 2: shrink to zero (branch body not visible).
else if (newSize == 0) {
212 }
// Case 3: shrink to a smaller non-zero size.
else if (newSize <= fMeta.fSize) {
// Ask the manager to shrink the existing allocation in place.
215 fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
216 fMeta.fSize = newSize;
218 }
// In-place shrink failed with bad_alloc.
// NOTE(review): caught by non-const reference; `const&` would suffice for e.what().
catch (boost::interprocess::bad_alloc& e) {
// Only reallocate when at least 1000000 would be saved (presumably bytes;
// magic number — TODO confirm the intended threshold).
222 if (fMeta.fSize - newSize >= 1000000) {
223 char* newPtr = fManager.Allocate(newSize, fAlignment);
// Copy the retained prefix into the new buffer, then free the old allocation.
225 std::memcpy(newPtr, fLocalPtr, newSize);
226 fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
// NOTE(review): the handle is recomputed from fLocalPtr; an assignment of newPtr to
// fLocalPtr is not visible in this view — confirm it happens before this line.
228 fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
230 LOG(debug) <<
"could not set used size: " << e.what();
234 fMeta.fSize = newSize;
237 }
// Any other interprocess exception is only logged at debug level.
catch (boost::interprocess::interprocess_exception& e) {
238 LOG(debug) <<
"could not set used size: " << e.what();
// Remaining case (newSize greater than the current size): log an error.
242 LOG(error) <<
"cannot set used size higher than original.";
// Identifies the transport backing this message: always the SHM transport.
247 Transport GetType() const override
{
    const auto transportKind = fair::mq::Transport::SHM;
    return transportKind;
}
// Fragment of a copy routine (signature not visible in this view) that copies
// the payload of `msg` into this message.
// A negative handle means this (target) message holds no allocation yet.
251 if (fMeta.fHandle < 0) {
// Read the source's handle; `msg` is treated as a shared-memory Message.
252 boost::interprocess::managed_shared_memory::handle_t otherHandle =
static_cast<const Message&
>(msg).fMeta.fHandle;
// Allocate a buffer of the source's size and copy the bytes across.
254 if (InitializeChunk(msg.GetSize())) {
255 std::memcpy(GetData(), msg.GetData(), msg.GetSize());
// Logged when the source message is not initialized (the guarding condition is
// not visible; presumably based on otherHandle — TODO confirm).
258 LOG(error) <<
"copy fail: source message not initialized!";
// Logged when this message already has a non-negative handle.
261 LOG(error) <<
"copy fail: target message already initialized!";
// Catch handlers of a try block whose body and enclosing function are not
// visible in this view (presumably the destructor closing the message — TODO confirm).
// Both handlers log the error and do not rethrow in the lines shown.
269 }
catch(SharedMemoryError& sme) {
270 LOG(error) <<
"error closing message: " << sme.what();
271 }
catch(boost::interprocess::lock_exception& le) {
272 LOG(error) <<
"error closing message: " << le.what();
// Region looked up by id; mutable because the const GetData() assigns it.
281 mutable Region* fRegionPtr;
// Local address of the payload; mutable because the const GetData() assigns it.
282 mutable char* fLocalPtr;
// Allocates a payload buffer through the manager and records its handle.
// Other body lines (including the return and any size bookkeeping) are not
// visible in this view.
// @param size      number of bytes to allocate
// @param alignment alignment passed to the manager's Allocate (0 by default)
// @return char* — callers test the result for non-null before copying into fLocalPtr
284 char* InitializeChunk(
const size_t size,
size_t alignment = 0)
286 fLocalPtr = fManager.Allocate(size, alignment);
// Convert the local address into a handle for this message's segment id.
288 fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
// Fragment of the release path (signature, else branches and closing braces
// are not visible in this view).
// Release only when a handle is held and the message has not been queued.
296 if (fMeta.fHandle >= 0 && !fQueued) {
// Region id 0: deallocate the buffer from its segment.
297 if (fMeta.fRegionId == 0) {
// Return value is discarded; presumably ensures the segment is opened locally — TODO confirm.
298 fManager.GetSegment(fMeta.fSegmentId);
299 fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
// Non-zero region id (enclosing branch not visible): look the region up by id.
303 fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
// Hand the block descriptor (handle, size, hint) back to the region.
307 fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
// Warning path (its condition is not visible in this view).
309 LOG(warn) <<
"region ack queue for id " << fMeta.fRegionId <<
" no longer exist. Not sending ack";
// Counterpart of the IncrementMsgCounter() calls made by the constructors.
322 fManager.DecrementMsgCounter();