FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixelTaskProcessorBin.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQPIXELTASKPROCESSORBIN_H_
10 #define FAIRMQPIXELTASKPROCESSORBIN_H_
11 
12 #include "FairEventHeader.h"
13 #include "FairGeoParSet.h"
14 #include "FairParGenericSet.h"
15 #include "PixelDigi.h"
16 #include "PixelHit.h"
17 #include "PixelPayload.h"
18 #include "RootSerializer.h"
19 
20 #include <FairMQDevice.h>
21 #include <FairMQParts.h>
22 #include <TClonesArray.h>
23 #include <TList.h>
24 #include <string>
25 
26 template<typename T>
27 class FairMQPixelTaskProcessorBin : public FairMQDevice
28 {
29  public:
31  : fInputChannelName("data-in")
32  , fOutputChannelName("data-out")
33  , fParamChannelName("param")
34  , fEventHeader(nullptr)
35  , fInput(nullptr)
36  , fOutput(nullptr)
37  , fInputArray(nullptr)
38  , fOutputArray(nullptr)
39  , fStaticParameters(false)
40  , fNewRunId(1)
41  , fCurrentRunId(-1)
42  , fDataToKeep("")
43  , fReceivedMsgs(0)
44  , fSentMsgs(0)
45  , fFairTask(nullptr)
46  , fParCList(nullptr)
47  , fGeoPar(nullptr)
48  {}
49 
51  {
52  delete fGeoPar;
53  fGeoPar = nullptr;
54  delete fInput;
55  fInput = nullptr;
56  delete fOutput;
57  fOutput = nullptr;
58  delete fInputArray;
59  fInputArray = nullptr;
60  delete fOutputArray;
61  fOutputArray = nullptr;
62  delete fFairTask;
63  }
64 
65  void SetDataToKeep(const std::string& str) { fDataToKeep = str; }
66 
67  void SetInputChannelName(const std::string& str) { fInputChannelName = str; }
68  void SetOutputChannelName(const std::string& str) { fOutputChannelName = str; }
69  void SetParamChannelName(const std::string& str) { fParamChannelName = str; }
70 
71  void SetStaticParameters(bool tbool) { fStaticParameters = tbool; }
72 
73  protected:
// Handler for one multipart message (registered for the input channel via OnData in Init()).
//
// Steps, in order:
//   1. part 0 is read as a PixelPayload::EventHeader;
//   2. if parameters are not static, or no run id has been seen yet, and the run id in the
//      header differs from fCurrentRunId, the parameters are re-requested (UpdateParameters)
//      and the task is re-initialised with them (InitMQ);
//   3. part 1 is read as a packed array of PixelPayload::Digi and converted into PixelDigi
//      objects inside fInputArray;
//   4. the task is executed via ExecMQ(fInput, fOutput);
//   5. a reply is sent on fOutputChannelName: an event header copied from the incoming one,
//      followed by one binary PixelPayload::Hit array per "PixelHits" object found in fOutput.
//
// Returns 0 (false) when the message has no parts, true otherwise. The result of Send() is
// not inspected; fSentMsgs is incremented regardless.
//
// NOTE(review): only Size() == 0 is rejected, yet parts.At(1) is dereferenced below, so a
// message with exactly one part is not guarded against.
// NOTE(review): the declaration of `header` and the size argument passed to NewMessage for
// the header message are not visible in this listing (source lines 122 and 129 are missing).
74  bool ProcessData(FairMQParts& parts, int)
75  {
76  // LOG(debug) << "message received with " << parts.Size() << " parts!";
77  fReceivedMsgs++;
78 
79  if (parts.Size() == 0)
80  return 0; // probably impossible, but still check
81 
82  // the first part should be the event header
83  PixelPayload::EventHeader* payloadE = static_cast<PixelPayload::EventHeader*>(parts.At(0)->GetData());
84  // LOG(debug) << "GOT EVENT " << payloadE->fMCEntryNo << " OF RUN " << payloadE->fRunId << " (part " <<
85  // payloadE->fPartNo << ")";
86 
  // Refresh parameters on a run-id change; with static parameters this happens only for the
  // very first message (fCurrentRunId still -1).
87  if (fStaticParameters == false || fCurrentRunId == -1) {
88  fNewRunId = payloadE->fRunId;
89  if (fNewRunId != fCurrentRunId) {
90  fCurrentRunId = fNewRunId;
91  UpdateParameters();
92  fFairTask->InitMQ(fParCList);
93 
94  LOG(info) << "Parameters updated, back to ProcessData(" << parts.Size() << " parts!)";
95  }
96  }
97 
98  // the second part should be the TClonesArray with necessary data... now assuming Digi
99  PixelPayload::Digi* payloadD = static_cast<PixelPayload::Digi*>(parts.At(1)->GetData());
100  int digiArraySize = parts.At(1)->GetSize();
  // number of digis = payload size in bytes / size of one packed digi (integer division)
101  int nofDigis = digiArraySize / sizeof(PixelPayload::Digi);
102 
  // Rebuild the input array from scratch: one PixelDigi per packed digi, constructed in place
  // with -1 as the first constructor argument.
103  fInputArray->Clear();
104  for (int idigi = 0; idigi < nofDigis; idigi++) {
105  new ((*fInputArray)[idigi]) PixelDigi(-1,
106  payloadD[idigi].fDetectorID,
107  payloadD[idigi].fFeID,
108  payloadD[idigi].fCol,
109  payloadD[idigi].fRow,
110  payloadD[idigi].fCharge);
111  }
112 
113  // LOG(debug) << " EVENT HAS " << nofDigis << " DIGIS!!!";
114 
115  // Execute hit finder task
116  fOutput->Clear();
117  // LOG(info) << " The blocking line... analyzing event " << fEventHeader->GetMCEntryNumber();
118  fFairTask->ExecMQ(fInput, fOutput);
119 
120  FairMQParts partsOut;
121 
  // Copy the incoming header fields into the outgoing header.
123  header->fRunId = payloadE->fRunId;
124  header->fMCEntryNo = payloadE->fMCEntryNo;
125  header->fPartNo = payloadE->fPartNo;
126 
  // The callback handed to NewMessage deletes the header object passed as `data`.
127  FairMQMessagePtr msgHeader(NewMessage(
128  header,
130  [](void* data, void* /*hint*/) { delete static_cast<PixelPayload::EventHeader*>(data); },
131  nullptr));
132  partsOut.AddPart(std::move(msgHeader));
133 
  // Serialise every output object named "PixelHits" as a flat PixelPayload::Hit array.
134  for (int iobj = 0; iobj < fOutput->GetEntries(); iobj++) {
135  if (strcmp(fOutput->At(iobj)->GetName(), "PixelHits") == 0) {
136  Int_t nofEntries = ((TClonesArray*)fOutput->At(iobj))->GetEntries();
137  size_t hitsSize = nofEntries * sizeof(PixelPayload::Hit);
138 
139  FairMQMessagePtr msgTCA(NewMessage(hitsSize));
140 
141  PixelPayload::Hit* hitPayload = static_cast<PixelPayload::Hit*>(msgTCA->GetData());
142 
143  for (int ihit = 0; ihit < nofEntries; ihit++) {
144  PixelHit* hit = static_cast<PixelHit*>(((TClonesArray*)fOutput->At(iobj))->At(ihit));
  // NOTE(review): a skipped slot is never written, so it keeps whatever NewMessage
  // allocated - presumably uninitialised bytes; confirm.
145  if (!hit) {
146  continue;
147  }
148  new (&hitPayload[ihit]) PixelPayload::Hit();
149  hitPayload[ihit].fDetectorID = hit->GetDetectorID();
150  hitPayload[ihit].posX = hit->GetX();
151  hitPayload[ihit].posY = hit->GetY();
152  hitPayload[ihit].posZ = hit->GetZ();
153  hitPayload[ihit].dposX = hit->GetDx();
154  hitPayload[ihit].dposY = hit->GetDy();
155  hitPayload[ihit].dposZ = hit->GetDz();
156  }
157  // LOG(debug) << "second part has size = " << hitsSize;
158  partsOut.AddPart(std::move(msgTCA));
159 
160  // LOG(debug) << "Output array should have " << nofEntries << "hits";
161  }
162  }
163 
164  Send(partsOut, fOutputChannelName);
165  fSentMsgs++;
166 
167  return true;
168  }
169 
170  virtual void Init()
171  {
172  fDataToKeep = fConfig->GetValue<std::string>("keep-data");
173  fInputChannelName = fConfig->GetValue<std::string>("in-channel");
174  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
175  fParamChannelName = fConfig->GetValue<std::string>("par-channel");
176  fStaticParameters = fConfig->GetValue<bool>("static-pars");
177 
178  // fHitFinder->InitMQ(fRootParFileName,fAsciiParFileName);
179  fFairTask = new T();
180  fFairTask->SetStreamProcessing(kTRUE);
181  fGeoPar = new FairGeoParSet("FairGeoParSet");
182  fParCList = new TList();
183  fParCList->Add(fGeoPar);
184  fFairTask->GetParList(fParCList);
185 
186  fOutput = new TList();
187  fInput = new TList();
188 
189  fInputArray = new TClonesArray("PixelDigi");
190  fInputArray->SetName("PixelDigis");
191  fInput->Add(fInputArray);
192  // fOutputArray = new TClonesArray("PixelHit");
193  // fOutputArray->SetName("PixelHits");
194 
195  OnData(fInputChannelName, &FairMQPixelTaskProcessorBin<T>::ProcessData);
196  }
197 
198  virtual void PostRun()
199  {
200  LOG(info) << "FairMQPixelTaskProcessorBin<T>::PostRun() Received " << fReceivedMsgs << " and sent " << fSentMsgs
201  << " messages!";
202  }
203 
204  private:
205  std::string fInputChannelName;
206  std::string fOutputChannelName;
207  std::string fParamChannelName;
208 
209  void UpdateParameters()
210  {
211  for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
212  FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
213  fParCList->Remove(tempObj);
214  fParCList->AddAt(UpdateParameter(tempObj), iparC);
215  }
216  }
217 
218  FairParGenericSet* UpdateParameter(FairParGenericSet* thisPar)
219  {
220  std::string paramName = thisPar->GetName();
221 
222  std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
223  LOG(warn) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar
224  << ")";
225  FairMQMessagePtr req(NewMessage(
226  const_cast<char*>(reqStr->c_str()),
227  reqStr->length(),
228  [](void* /* data */, void* hint) { delete static_cast<std::string*>(hint); },
229  reqStr));
230  FairMQMessagePtr rep(NewMessage());
231 
232  if (Send(req, fParamChannelName) > 0) {
233  if (Receive(rep, fParamChannelName) > 0) {
234  thisPar = nullptr;
235  Deserialize<RootSerializer>(*rep, thisPar);
236  LOG(info) << "Received parameter" << paramName << " from the server (" << thisPar << ")";
237  return thisPar;
238  }
239  }
240 
241  return nullptr;
242  }
243 
244  FairEventHeader* fEventHeader;
245  TList* fInput;
246  TList* fOutput;
247 
248  TClonesArray* fInputArray;
249  TClonesArray* fOutputArray;
250 
251  bool fStaticParameters;
252  int fNewRunId;
253  int fCurrentRunId;
254 
255  std::string fDataToKeep;
256 
257  int fReceivedMsgs = 0;
258  int fSentMsgs = 0;
259 
260  T* fFairTask;
261  TList* fParCList;
262  FairGeoParSet* fGeoPar;
263 
266 };
267 
268 #endif /* FAIRMQPIXELTASKPROCESSORBIN_H_ */
Double_t GetDz() const
Definition: FairHit.h:44
Double_t GetZ() const
Definition: FairHit.h:50
virtual const char * GetName() const
Definition: FairParSet.h:38
Double_t GetX() const
Definition: FairHit.h:48
void SetParamChannelName(const std::string &str)
bool ProcessData(FairMQParts &parts, int)
Int_t GetDetectorID() const
Definition: FairHit.h:47
Double_t GetDy() const
Definition: FairHit.h:43
void SetInputChannelName(const std::string &str)
Double_t GetY() const
Definition: FairHit.h:49
void SetDataToKeep(const std::string &str)
void SetOutputChannelName(const std::string &str)
Double_t GetDx() const
Definition: FairHit.h:42