17 #include <FairMQLogger.h>
18 #include <FairMQMessage.h>
28 , fOutputChannelName(
"data-out")
33 , fInputChain(nullptr)
34 , fEventHeader(nullptr)
35 , fDigiBranch(nullptr)
41 , fReadingRootFiles(false)
47 fFileNames = fConfig->GetValue<std::vector<std::string>>(
"file-name");
48 fMaxIndex = fConfig->GetValue<int64_t>(
50 fOutputChannelName = fConfig->GetValue<std::string>(
"out-channel");
51 fAckChannelName = fConfig->GetValue<std::string>(
"ack-channel");
52 fAggregateLevel = fConfig->GetValue<
int>(
"aggregate");
57 if (fAggregateLevel < 1)
60 LOG(info) <<
"FairMQPixAltSamplerBin::PreRun() started. fAggregateLevel = " << fAggregateLevel;
62 fReadingRootFiles =
false;
66 if (fFileNames.size() == 0)
69 if (fFileNames[0].find(
".root") == std::string::npos)
72 fInputChain =
new TChain(
"fairdata");
74 for (
int ifile = 0; ifile < (int)fFileNames.size(); ifile++) {
75 fInputChain->Add(fFileNames[ifile].c_str());
79 fInputChain->SetBranchAddress(
"EventHeader.", &fEventHeader);
80 fInputChain->SetBranchAddress(
"DigiVector", &fDigiArray, &fDigiBranch);
82 LOG(info) <<
"Loaded " << fFileNames.size() <<
" root files. Chain has now " << fInputChain->GetEntriesFast()
84 fReadingRootFiles =
true;
89 if (fReadingRootFiles) {
100 for (
int iaggr = 0; iaggr < fAggregateLevel; iaggr++) {
102 if (fEventCounter == fMaxIndex) {
103 if (parts.Size() > 0) {
104 Send(parts, fOutputChannelName);
109 if (!fInputFile.is_open()) {
110 if (fCurrentFile == (
int)fFileNames.size()) {
111 if (parts.Size() > 0) {
112 Send(parts, fOutputChannelName);
116 fInputFile.open(fFileNames[fCurrentFile], std::fstream::in | std::fstream::binary);
120 if (!fInputFile.is_open()) {
121 LOG(error) <<
"FairMQPixAltSamplerBin::ConditionalRun fInputFile \"" << fFileNames[fCurrentFile]
122 <<
"\" could not be open!";
123 if (parts.Size() > 0) {
124 Send(parts, fOutputChannelName);
132 fInputFile.read((
char*)head,
sizeof(head));
134 if (fInputFile.eof()) {
135 LOG(info) <<
"End of file reached!";
137 if (fCurrentFile == (
int)fFileNames.size()) {
138 if (parts.Size() > 0) {
139 Send(parts, fOutputChannelName);
147 const int constNofData = head[3] * dataSize;
148 short int dataCont[constNofData];
149 fInputFile.read((
char*)dataCont,
sizeof(dataCont));
155 FairMQMessagePtr msgHeader(
159 parts.AddPart(std::move(msgHeader));
163 FairMQMessagePtr msgDigis(NewMessage(digisSize));
167 for (
int idigi = 0; idigi < head[3]; idigi++) {
169 digiPayload[idigi].
fDetectorID = (int)dataCont[idigi * dataSize + 0];
170 digiPayload[idigi].
fFeID = (int)dataCont[idigi * dataSize + 1];
171 digiPayload[idigi].
fCharge = 1.;
172 digiPayload[idigi].
fCol = (int)dataCont[idigi * dataSize + 2];
173 digiPayload[idigi].
fRow = (int)dataCont[idigi * dataSize + 3];
175 parts.AddPart(std::move(msgDigis));
180 Send(parts, fOutputChannelName);
182 if (fInputFile.eof()) {
183 LOG(info) <<
"End of file reached!";
192 if (fEventCounter == fMaxIndex)
196 fInputChain->GetEntry(fEventCounter);
197 fDigiBranch->GetEntry(fEventCounter);
209 parts.AddPart(std::move(msgHeader));
213 FairMQMessagePtr msgDigis(NewMessage(digisSize));
216 for (
int idigi = 0; idigi < (int)fDigiArray->size(); idigi++) {
218 digiPayload[idigi].
fDetectorID = fDigiArray->at(idigi).fDetectorID;
219 digiPayload[idigi].
fFeID = fDigiArray->at(idigi).fFeID;
220 digiPayload[idigi].
fCharge = fDigiArray->at(idigi).fCharge;
221 digiPayload[idigi].
fCol = fDigiArray->at(idigi).fCol;
222 digiPayload[idigi].
fRow = fDigiArray->at(idigi).fRow;
224 parts.AddPart(std::move(msgDigis));
226 Send(parts, fOutputChannelName);
230 if (fInputFile.eof()) {
231 LOG(info) <<
"End of file reached!";
240 if (fAckChannelName !=
"") {
243 LOG(info) <<
"PostRun() finished!";
248 if (fAckChannelName !=
"") {
250 FairMQMessagePtr ack(NewMessage());
251 if (Receive(ack, fAckChannelName) >= 0) {
254 }
while (fNofRecAcks < fMaxIndex / fAggregateLevel);
256 LOG(info) <<
"Acknowledged " << fNofRecAcks <<
" messages (" << fAggregateLevel <<
" events each) out of "
257 << fMaxIndex <<
" events.";
virtual bool ConditionalRun()
virtual ~FairMQPixAltSamplerBin()