FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixAltSamplerBin.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "FairMQPixAltSamplerBin.h"
16 
17 #include <FairMQLogger.h>
18 #include <FairMQMessage.h>
19 #include <TBranch.h>
20 #include <TChain.h>
21 #include <cstddef>
22 #include <utility>
23 
24 using namespace std;
25 
27  : FairMQDevice()
28  , fOutputChannelName("data-out")
29  , fAckChannelName("")
30  , fFileNames()
31  , fInputFile()
32  , fCurrentFile(0)
33  , fInputChain(nullptr)
34  , fEventHeader(nullptr)
35  , fDigiBranch(nullptr)
36  , fDigiArray(nullptr)
37  , fAggregateLevel(1)
38  , fMaxIndex(-1)
39  , fEventCounter(0)
40  , fNofRecAcks(0)
41  , fReadingRootFiles(false)
42  , fAckListener()
43 {}
44 
46 {
47  fFileNames = fConfig->GetValue<std::vector<std::string>>("file-name");
48  fMaxIndex = fConfig->GetValue<int64_t>(
49  "max-index"); // RK 2818.05.28 int64_t will print "Unknown value", but the value is still passed
50  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
51  fAckChannelName = fConfig->GetValue<std::string>("ack-channel");
52  fAggregateLevel = fConfig->GetValue<int>("aggregate");
53 }
54 
56 {
57  if (fAggregateLevel < 1)
58  fAggregateLevel = 1;
59 
60  LOG(info) << "FairMQPixAltSamplerBin::PreRun() started. fAggregateLevel = " << fAggregateLevel;
61 
62  fReadingRootFiles = false;
63 
64  fAckListener = std::thread(&FairMQPixAltSamplerBin::ListenForAcks, this);
65 
66  if (fFileNames.size() == 0)
67  return;
68 
69  if (fFileNames[0].find(".root") == std::string::npos)
70  return;
71 
72  fInputChain = new TChain("fairdata");
73 
74  for (int ifile = 0; ifile < (int)fFileNames.size(); ifile++) {
75  fInputChain->Add(fFileNames[ifile].c_str());
76  }
77 
78  fEventHeader = new PixelPayload::EventHeader();
79  fInputChain->SetBranchAddress("EventHeader.", &fEventHeader);
80  fInputChain->SetBranchAddress("DigiVector", &fDigiArray, &fDigiBranch);
81 
82  LOG(info) << "Loaded " << fFileNames.size() << " root files. Chain has now " << fInputChain->GetEntriesFast()
83  << " entries";
84  fReadingRootFiles = true;
85 }
86 
88 {
89  if (fReadingRootFiles) {
90  return ReadRootFile();
91  } else {
92  return ReadBinFile();
93  }
94 }
95 
97 {
98  FairMQParts parts;
99 
100  for (int iaggr = 0; iaggr < fAggregateLevel; iaggr++) {
101 
102  if (fEventCounter == fMaxIndex) { // check if reached event limit
103  if (parts.Size() > 0) {
104  Send(parts, fOutputChannelName);
105  }
106  return false;
107  }
108 
109  if (!fInputFile.is_open()) { // file not there
110  if (fCurrentFile == (int)fFileNames.size()) { // this is last file
111  if (parts.Size() > 0) {
112  Send(parts, fOutputChannelName);
113  }
114  return false;
115  }
116  fInputFile.open(fFileNames[fCurrentFile], std::fstream::in | std::fstream::binary);
117  fCurrentFile++;
118  }
119 
120  if (!fInputFile.is_open()) { // wrong file name
121  LOG(error) << "FairMQPixAltSamplerBin::ConditionalRun fInputFile \"" << fFileNames[fCurrentFile]
122  << "\" could not be open!";
123  if (parts.Size() > 0) {
124  Send(parts, fOutputChannelName);
125  }
126  return false;
127  }
128 
129  std::string buffer;
130 
131  int head[4]; // runId, MCEntryNo, PartNo, NofDigis
132  fInputFile.read((char*)head, sizeof(head));
133 
134  if (fInputFile.eof()) {
135  LOG(info) << "End of file reached!";
136  fInputFile.close();
137  if (fCurrentFile == (int)fFileNames.size()) { // this is the last file
138  if (parts.Size() > 0) {
139  Send(parts, fOutputChannelName);
140  }
141  return false;
142  } else
143  return true;
144  }
145 
146  int dataSize = 4; // detId, feId, col, row
147  const int constNofData = head[3] * dataSize;
148  short int dataCont[constNofData];
149  fInputFile.read((char*)dataCont, sizeof(dataCont));
150 
152  header->fRunId = head[0];
153  header->fMCEntryNo = head[1];
154  header->fPartNo = head[2];
155  FairMQMessagePtr msgHeader(
156  NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
157  delete static_cast<PixelPayload::EventHeader*>(data);
158  }));
159  parts.AddPart(std::move(msgHeader));
160 
161  size_t digisSize = head[3] * sizeof(PixelPayload::Digi);
162 
163  FairMQMessagePtr msgDigis(NewMessage(digisSize));
164 
165  PixelPayload::Digi* digiPayload = static_cast<PixelPayload::Digi*>(msgDigis->GetData());
166 
167  for (int idigi = 0; idigi < head[3]; idigi++) {
168  new (&digiPayload[idigi]) PixelPayload::Digi();
169  digiPayload[idigi].fDetectorID = (int)dataCont[idigi * dataSize + 0];
170  digiPayload[idigi].fFeID = (int)dataCont[idigi * dataSize + 1];
171  digiPayload[idigi].fCharge = 1.;
172  digiPayload[idigi].fCol = (int)dataCont[idigi * dataSize + 2];
173  digiPayload[idigi].fRow = (int)dataCont[idigi * dataSize + 3];
174  }
175  parts.AddPart(std::move(msgDigis));
176 
177  fEventCounter++;
178  }
179 
180  Send(parts, fOutputChannelName);
181 
182  if (fInputFile.eof()) {
183  LOG(info) << "End of file reached!";
184  fInputFile.close();
185  }
186 
187  return true;
188 }
189 
191 {
192  if (fEventCounter == fMaxIndex)
193  return false;
194 
195  // fill the input data containers
196  fInputChain->GetEntry(fEventCounter);
197  fDigiBranch->GetEntry(fEventCounter);
198 
199  // create output multipart message
200  FairMQParts parts;
201 
203  header->fRunId = fEventHeader->fRunId;
204  header->fMCEntryNo = fEventHeader->fMCEntryNo;
205  header->fPartNo = fEventHeader->fPartNo;
206  FairMQMessagePtr msgHeader(NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
207  delete static_cast<PixelPayload::EventHeader*>(data);
208  }));
209  parts.AddPart(std::move(msgHeader));
210 
211  size_t digisSize = sizeof(PixelPayload::Digi) * fDigiArray->size();
212 
213  FairMQMessagePtr msgDigis(NewMessage(digisSize));
214  PixelPayload::Digi* digiPayload = static_cast<PixelPayload::Digi*>(msgDigis->GetData());
215 
216  for (int idigi = 0; idigi < (int)fDigiArray->size(); idigi++) {
217  new (&digiPayload[idigi]) PixelPayload::Digi();
218  digiPayload[idigi].fDetectorID = fDigiArray->at(idigi).fDetectorID;
219  digiPayload[idigi].fFeID = fDigiArray->at(idigi).fFeID;
220  digiPayload[idigi].fCharge = fDigiArray->at(idigi).fCharge;
221  digiPayload[idigi].fCol = fDigiArray->at(idigi).fCol;
222  digiPayload[idigi].fRow = fDigiArray->at(idigi).fRow;
223  }
224  parts.AddPart(std::move(msgDigis));
225 
226  Send(parts, fOutputChannelName);
227 
228  fEventCounter++;
229 
230  if (fInputFile.eof()) {
231  LOG(info) << "End of file reached!";
232  fInputFile.close();
233  }
234 
235  return true;
236 }
237 
239 {
240  if (fAckChannelName != "") {
241  fAckListener.join();
242  }
243  LOG(info) << "PostRun() finished!";
244 }
245 
247 {
248  if (fAckChannelName != "") {
249  do {
250  FairMQMessagePtr ack(NewMessage());
251  if (Receive(ack, fAckChannelName) >= 0) {
252  fNofRecAcks++;
253  }
254  } while (fNofRecAcks < fMaxIndex / fAggregateLevel);
255 
256  LOG(info) << "Acknowledged " << fNofRecAcks << " messages (" << fAggregateLevel << " events each) out of "
257  << fMaxIndex << " events.";
258  }
259 }
260