FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixelTaskProcessor.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQPIXELTASKPROCESSOR_H_
10 #define FAIRMQPIXELTASKPROCESSOR_H_
11 
12 #include "FairEventHeader.h"
13 #include "FairGeoParSet.h"
14 #include "FairMCEventHeader.h"
15 #include "FairParGenericSet.h"
16 #include "RootSerializer.h"
17 
18 #include <FairMQDevice.h>
19 #include <FairMQParts.h>
20 #include <TList.h>
21 #include <string>
22 #include <vector>
23 
24 template<typename T>
25 class FairMQPixelTaskProcessor : public FairMQDevice
26 {
27  public:
29  : fInputChannelName("data-in")
30  , fOutputChannelName("data-out")
31  , fParamChannelName("param")
32  , fEventHeader(nullptr)
33  , fMCEventHeader(nullptr)
34  , fInput(nullptr)
35  , fOutput(nullptr)
36  , fStaticParameters(false)
37  , fNewRunId(1)
38  , fCurrentRunId(-1)
39  , fDataToKeep("")
40  , fReceivedMsgs(0)
41  , fSentMsgs(0)
42  , fFairTask(nullptr)
43  , fParCList(nullptr)
44  , fGeoPar(nullptr)
45  {}
46 
48  {
49  delete fGeoPar;
50  fGeoPar = nullptr;
51  delete fInput;
52  fInput = nullptr;
53  delete fOutput;
54  fOutput = nullptr;
55  delete fFairTask;
56  }
57 
58  void SetDataToKeep(const std::string& tStr) { fDataToKeep = tStr; }
59 
60  void SetInputChannelName(const std::string& tstr) { fInputChannelName = tstr; }
61  void SetOutputChannelName(const std::string& tstr) { fOutputChannelName = tstr; }
62  void SetParamChannelName(const std::string& tstr) { fParamChannelName = tstr; }
63 
64  void SetStaticParameters(bool tbool) { fStaticParameters = tbool; }
65 
66  protected:
67  bool ProcessData(FairMQParts& parts, int)
68  {
69  TObject* objectToKeep = nullptr;
70 
71  // LOG(debug)<<"message received with " << parts.Size() << " parts.";
72  fReceivedMsgs++;
73 
74  std::vector<TObject*> tempObjects;
75  for (int ipart = 0; ipart < parts.Size(); ipart++) {
76  TObject* obj = nullptr;
77  Deserialize<RootSerializer>(*parts.At(ipart), obj);
78  tempObjects.push_back(obj);
79  // LOG(trace) << "got TObject with name \"" << tempObjects[ipart]->GetName() << "\".";
80  if (strcmp(tempObjects.back()->GetName(), "EventHeader.") == 0) {
81  fEventHeader = (FairEventHeader*)(tempObjects.back());
82  }
83  if (strcmp(tempObjects.back()->GetName(), "MCEventHeader.") == 0) {
84  fMCEventHeader = (FairMCEventHeader*)(tempObjects.back());
85  } else {
86  fInput->Add(tempObjects.back());
87  }
88  }
89 
90  if (fStaticParameters == false || fCurrentRunId == -1) {
91  // TODO: create fEventHeader form fMCEventHeader, if not there
92  if (fEventHeader) {
93  fNewRunId = fEventHeader->GetRunId();
94  } else if (fMCEventHeader) {
95  fNewRunId = fMCEventHeader->GetRunID();
96  }
97 
98  // LOG(debug)<<"got event header with run = " << fNewRunId;
99 
100  if (fNewRunId != fCurrentRunId) {
101  fCurrentRunId = fNewRunId;
102  UpdateParameters();
103  fFairTask->InitMQ(fParCList);
104 
105  LOG(info) << "Parameters updated, back to ProcessData(" << parts.Size() << " parts!)";
106  }
107  }
108 
109  // Execute hit finder task
110  fOutput->Clear();
111  // LOG(info) << " The blocking line... analyzing event " << fEventHeader->GetMCEntryNumber();
112  fFairTask->ExecMQ(fInput, fOutput);
113 
114  if (!fDataToKeep.empty()) {
115  objectToKeep = (fInput->FindObject(fDataToKeep.c_str()))->Clone();
116  if (objectToKeep)
117  fOutput->Add(objectToKeep);
118  }
119 
120  FairMQParts partsOut;
121 
122  if (fEventHeader) {
123  FairMQMessagePtr mess(NewMessage());
124  Serialize<RootSerializer>(*mess, fEventHeader);
125  partsOut.AddPart(std::move(mess));
126  } else if (fMCEventHeader) {
127  FairMQMessagePtr mess(NewMessage());
128  Serialize<RootSerializer>(*mess, fMCEventHeader);
129  partsOut.AddPart(std::move(mess));
130  }
131 
132  for (int iobj = 0; iobj < fOutput->GetEntries(); iobj++) {
133  FairMQMessagePtr mess(NewMessage());
134  Serialize<RootSerializer>(*mess, fOutput->At(iobj));
135  partsOut.AddPart(std::move(mess));
136  }
137 
138  Send(partsOut, fOutputChannelName);
139  fSentMsgs++;
140 
141  fInput->Clear();
142 
143  for (unsigned int ipart = 0; ipart < tempObjects.size(); ipart++) {
144  if (tempObjects[ipart]) {
145  delete tempObjects[ipart];
146  }
147  }
148 
149  tempObjects.clear();
150 
151  return true;
152  }
153 
154  virtual void Init()
155  {
156  fDataToKeep = fConfig->GetValue<std::string>("keep-data");
157  fInputChannelName = fConfig->GetValue<std::string>("in-channel");
158  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
159  fParamChannelName = fConfig->GetValue<std::string>("par-channel");
160  fStaticParameters = fConfig->GetValue<bool>("static-pars");
161 
162  fFairTask = new T();
163  fFairTask->SetStreamProcessing(kTRUE);
164  fGeoPar = new FairGeoParSet("FairGeoParSet");
165  fParCList = new TList();
166  fParCList->Add(fGeoPar);
167  fFairTask->GetParList(fParCList);
168 
169  fOutput = new TList();
170  fInput = new TList();
171 
172  OnData(fInputChannelName, &FairMQPixelTaskProcessor<T>::ProcessData);
173  }
174 
175  virtual void PostRun()
176  {
177  LOG(info) << "FairMQPixelTaskProcessor<T>::PostRun() Received " << fReceivedMsgs << " and sent " << fSentMsgs
178  << " messages!";
179  }
180 
181  private:
182  std::string fInputChannelName;
183  std::string fOutputChannelName;
184  std::string fParamChannelName;
185 
186  void UpdateParameters()
187  {
188  for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
189  FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
190  fParCList->Remove(tempObj);
191  fParCList->AddAt(UpdateParameter(tempObj), iparC);
192  }
193  }
194 
195  FairParGenericSet* UpdateParameter(FairParGenericSet* thisPar)
196  {
197  std::string paramName = thisPar->GetName();
198 
199  std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
200  LOG(debug) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar
201  << ")";
202 
203  FairMQMessagePtr req(NewMessage(
204  const_cast<char*>(reqStr->c_str()),
205  reqStr->length(),
206  [](void* /* data */, void* hint) { delete static_cast<std::string*>(hint); },
207  reqStr));
208  FairMQMessagePtr rep(NewMessage());
209 
210  if (Send(req, fParamChannelName) > 0) {
211  if (Receive(rep, fParamChannelName) > 0) {
212  thisPar = nullptr;
213  Deserialize<RootSerializer>(*rep, thisPar);
214  LOG(info) << "Received parameter" << paramName << " from the server (" << thisPar << ")";
215  return thisPar;
216  }
217  }
218 
219  return nullptr;
220  }
221 
222  FairEventHeader* fEventHeader;
223  FairMCEventHeader* fMCEventHeader;
224  TList* fInput;
225  TList* fOutput;
226 
227  bool fStaticParameters;
228  int fNewRunId;
229  int fCurrentRunId;
230 
231  std::string fDataToKeep;
232 
233  int fReceivedMsgs = 0;
234  int fSentMsgs = 0;
235 
236  T* fFairTask;
237  TList* fParCList;
238  FairGeoParSet* fGeoPar;
239 
242 };
243 
244 #endif /* FAIRMQPIXELTASKPROCESSOR_H_ */
UInt_t GetRunID() const
virtual const char * GetName() const
Definition: FairParSet.h:38
bool ProcessData(FairMQParts &parts, int)
void SetParamChannelName(const std::string &tstr)
void SetOutputChannelName(const std::string &tstr)
void SetInputChannelName(const std::string &tstr)
void SetDataToKeep(const std::string &tStr)