FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixelSamplerBin.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "FairMQPixelSamplerBin.h"
16 
17 #include "FairEventHeader.h"
18 #include "FairFileSource.h"
19 #include "FairRunAna.h"
20 #include "PixelDigi.h"
21 #include "PixelPayload.h"
22 
23 #include <FairMQLogger.h>
24 #include <Rtypes.h> // for Int_t, Long64_t
25 #include <TClonesArray.h>
26 #include <TObject.h>
27 #include <cstring>
28 #include <utility> // move
29 
30 using namespace std;
31 
33  : FairMQDevice()
34  , fOutputChannelName("data-out")
35  , fAckChannelName("")
36  , fRunAna(nullptr)
37  , fSource(nullptr)
38  , fInputObjects()
39  , fNObjects(0)
40  , fMaxIndex(-1)
41  , fEventCounter(0)
42  , fBranchNames()
43  , fFileNames()
44  , fAckListener()
45 {}
46 
48 {
49  fFileNames = fConfig->GetValue<std::vector<std::string>>("file-name");
50  fMaxIndex = fConfig->GetValue<int64_t>("max-index");
51  fBranchNames = fConfig->GetValue<std::vector<std::string>>("branch-name");
52  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
53  fAckChannelName = fConfig->GetValue<std::string>("ack-channel");
54 
55  fRunAna = new FairRunAna();
56  if (fFileNames.size() > 0) {
57  fSource = new FairFileSource(fFileNames.at(0).c_str());
58  for (unsigned int ifile = 1; ifile < fFileNames.size(); ifile++)
59  fSource->AddFile(fFileNames.at(ifile));
60  }
61  fSource->Init();
62  LOG(info) << "Going to request " << fBranchNames.size() << " branches:";
63  for (unsigned int ibrn = 0; ibrn < fBranchNames.size(); ibrn++) {
64  LOG(info) << " requesting branch \"" << fBranchNames[ibrn] << "\"";
65  int branchStat = fSource->ActivateObject((TObject**)&fInputObjects[fNObjects],
66  fBranchNames[ibrn].c_str()); // should check the status...
67  if (fInputObjects[fNObjects]) {
68  LOG(info) << "Activated object \"" << fInputObjects[fNObjects] << "\" with name \"" << fBranchNames[ibrn]
69  << "\" (" << branchStat << ")";
70  fNObjects++;
71  }
72  }
73  if (fMaxIndex < 0)
74  fMaxIndex = fSource->CheckMaxEventNo();
75  LOG(info) << "Input source has " << fMaxIndex << " events.";
76 }
77 
79 {
80  LOG(info) << "FairMQPixelSampler::PreRun() started!";
81 
82  fAckListener = thread(&FairMQPixelSamplerBin::ListenForAcks, this);
83 }
84 
86 {
87  if (fEventCounter == fMaxIndex)
88  return false;
89 
90  int readEventReturn = 0;
91  readEventReturn = fSource->ReadEvent(fEventCounter);
92 
93  if (readEventReturn != 0)
94  return false;
95 
96  FairMQParts parts;
97 
98  for (int iobj = 0; iobj < fNObjects; iobj++) {
99  if (strcmp(fInputObjects[iobj]->GetName(), "EventHeader.") == 0) {
101  header->fRunId = ((FairEventHeader*)fInputObjects[iobj])->GetRunId();
102  header->fMCEntryNo = ((FairEventHeader*)fInputObjects[iobj])->GetMCEntryNumber();
103  header->fPartNo = 0;
104  FairMQMessagePtr msgHeader(
105  NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
106  delete static_cast<PixelPayload::EventHeader*>(data);
107  }));
108  parts.AddPart(std::move(msgHeader));
109  // LOG(debug) << "-----------------------------";
110  // LOG(debug) << "first part has size = " << sizeof(PixelPayload::EventHeader);
111  } else {
112  Int_t nofEntries = ((TClonesArray*)fInputObjects[iobj])->GetEntries();
113  size_t digisSize = nofEntries * sizeof(PixelPayload::Digi);
114 
115  FairMQMessagePtr msgTCA(NewMessage(digisSize));
116 
117  PixelPayload::Digi* digiPayload = static_cast<PixelPayload::Digi*>(msgTCA->GetData());
118 
119  for (int idigi = 0; idigi < nofEntries; idigi++) {
120  PixelDigi* digi = static_cast<PixelDigi*>(((TClonesArray*)fInputObjects[iobj])->At(idigi));
121  if (!digi) {
122  continue;
123  }
124  new (&digiPayload[idigi]) PixelPayload::Digi();
125  digiPayload[idigi].fDetectorID = digi->GetDetectorID();
126  digiPayload[idigi].fFeID = digi->GetFeID();
127  digiPayload[idigi].fCharge = digi->GetCharge();
128  digiPayload[idigi].fCol = digi->GetCol();
129  digiPayload[idigi].fRow = digi->GetRow();
130  }
131  // LOG(debug) << "second part has size = " << digisSize;
132  parts.AddPart(std::move(msgTCA));
133  }
134  }
135 
136  // LOG(debug) << "sending data with " << parts.Size() << " parts";
137  Send(parts, fOutputChannelName);
138 
139  fEventCounter++;
140 
141  return true;
142 }
143 
145 {
146  if (fAckChannelName != "") {
147  fAckListener.join();
148  }
149 
150  LOG(info) << "PostRun() finished!";
151 }
152 
154 {
155  if (fAckChannelName != "") {
156  Long64_t numAcks = 0;
157  do {
158  FairMQMessagePtr ack(NewMessage());
159  if (Receive(ack, fAckChannelName) >= 0) {
160  numAcks++;
161  }
162  } while (numAcks < fMaxIndex);
163 
164  LOG(info) << "Acknowledged " << numAcks << " messages.";
165  }
166 }
167 
void AddFile(TString FileName)
virtual Int_t CheckMaxEventNo(Int_t EvtEnd=0)
Int_t ReadEvent(UInt_t i=0)
Int_t GetRow()
Definition: PixelDigi.h:49
virtual Bool_t ActivateObject(TObject **obj, const char *BrName)
Int_t GetDetectorID()
Definition: PixelDigi.h:45
Int_t GetCol()
Definition: PixelDigi.h:48
Double_t GetCharge()
Definition: PixelDigi.h:47
Int_t GetFeID()
Definition: PixelDigi.h:46