15 #ifndef FAIR_MQ_SHMEM_REGION_H_
16 #define FAIR_MQ_SHMEM_REGION_H_
20 #include <FairMQLogger.h>
21 #include <FairMQUnmanagedRegion.h>
22 #include <fairmq/tools/Strings.h>
24 #include <boost/filesystem.hpp>
25 #include <boost/process.hpp>
26 #include <boost/date_time/posix_time/posix_time.hpp>
27 #include <boost/interprocess/managed_shared_memory.hpp>
28 #include <boost/interprocess/file_mapping.hpp>
29 #include <boost/interprocess/ipc/message_queue.hpp>
36 #include <condition_variable>
37 #include <unordered_map>
47 Region(
const std::string& shmId, uint16_t
id, uint64_t size,
bool remote, RegionCallback callback, RegionBulkCallback bulkCallback,
const std::string& path,
int flags)
51 , fName(
"fmq_" + shmId +
"_rg_" + std::to_string(id))
52 , fQueueName(
"fmq_" + shmId +
"_rgq_" + std::to_string(id))
60 , fBulkCallback(bulkCallback)
62 using namespace boost::interprocess;
65 fName = std::string(path + fName);
70 if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
72 fbuf.pubseekoff(size - 1, std::ios_base::beg);
77 fFile = fopen(fName.c_str(),
"r+");
80 LOG(error) <<
"Failed to initialize file: " << fName;
81 LOG(error) <<
"errno: " << errno <<
": " << strerror(errno);
82 throw std::runtime_error(tools::ToString(
"Failed to initialize file for shared memory region: ", strerror(errno)));
84 fFileMapping = file_mapping(fName.c_str(), read_write);
85 LOG(debug) <<
"shmem: initialized file: " << fName;
86 fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
89 fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
91 fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
92 fShmemObject.truncate(size);
94 fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
99 LOG(debug) <<
"shmem: initialized region: " << fName;
107 void InitializeQueues()
109 using namespace boost::interprocess;
112 fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
114 fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize *
sizeof(
RegionBlock));
116 LOG(debug) <<
"shmem: initialized region queue: " << fQueueName;
119 void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks,
this); }
122 std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
123 size_t blocksToSend = 0;
128 std::unique_lock<std::mutex> lock(fBlockMtx);
131 if (fBlocksToFree.size() < fAckBunchSize) {
132 fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
136 blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
138 copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
139 fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
142 if (blocksToSend > 0) {
143 while (!fQueue->try_send(blocks.get(), blocksToSend *
sizeof(RegionBlock), 0) && !fStop) {
145 std::this_thread::yield();
155 LOG(trace) <<
"AcksSender for " << fName <<
" leaving " <<
"(blocks left to free: " << fBlocksToFree.size() <<
", "
156 <<
" blocks left to send: " << blocksToSend <<
").";
159 void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks,
this); }
162 unsigned int priority;
163 boost::interprocess::message_queue::size_type recvdSize;
164 std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
165 std::vector<fair::mq::RegionBlock> result;
166 result.reserve(fAckBunchSize);
169 uint32_t timeout = 100;
175 auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout);
177 while (fQueue->timed_receive(blocks.get(), fAckBunchSize *
sizeof(RegionBlock), recvdSize, priority, rcvTill)) {
178 const auto numBlocks = recvdSize /
sizeof(RegionBlock);
182 for (
size_t i = 0; i < numBlocks; i++) {
183 result.emplace_back(
reinterpret_cast<char*
>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize,
reinterpret_cast<void*
>(blocks[i].fHint));
185 fBulkCallback(result);
186 }
else if (fCallback) {
187 for (
size_t i = 0; i < numBlocks; i++) {
188 fCallback(
reinterpret_cast<char*
>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize,
reinterpret_cast<void*
>(blocks[i].fHint));
198 LOG(trace) <<
"AcksReceiver for " << fName <<
" leaving (remaining queue size: " << fQueue->get_num_msg() <<
").";
201 void ReleaseBlock(
const RegionBlock& block)
203 std::unique_lock<std::mutex> lock(fBlockMtx);
205 fBlocksToFree.emplace_back(block);
207 if (fBlocksToFree.size() >= fAckBunchSize) {
209 fBlockSendCV.notify_one();
213 void SetLinger(uint32_t linger) { fLinger = linger; }
214 uint32_t GetLinger()
const {
return fLinger; }
220 if (fAcksSender.joinable()) {
221 fBlockSendCV.notify_one();
226 if (fAcksReceiver.joinable()) {
227 fAcksReceiver.join();
230 if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
231 LOG(debug) <<
"Region '" << fName <<
"' destroyed.";
234 if (boost::interprocess::file_mapping::remove(fName.c_str())) {
235 LOG(debug) <<
"File mapping '" << fName <<
"' destroyed.";
242 if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
243 LOG(debug) <<
"Region queue '" << fQueueName <<
"' destroyed.";
247 LOG(debug) <<
"Region queue '" << fQueueName <<
"' is remote, no cleanup necessary";
250 LOG(debug) <<
"Region '" << fName <<
"' (" << (fRemote ?
"remote" :
"local") <<
") destructed.";
255 std::atomic<bool> fStop;
257 std::string fQueueName;
258 boost::interprocess::shared_memory_object fShmemObject;
260 boost::interprocess::file_mapping fFileMapping;
261 boost::interprocess::mapped_region fRegion;
263 std::mutex fBlockMtx;
264 std::condition_variable fBlockSendCV;
265 std::vector<RegionBlock> fBlocksToFree;
266 const std::size_t fAckBunchSize = 256;
267 std::unique_ptr<boost::interprocess::message_queue> fQueue;
269 std::thread fAcksReceiver;
270 std::thread fAcksSender;
271 RegionCallback fCallback;
272 RegionBulkCallback fBulkCallback;