FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixAltTaskProcessorBin.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQPIXALTTASKPROCESSORBIN_H_
10 #define FAIRMQPIXALTTASKPROCESSORBIN_H_
11 
12 #include "FairEventHeader.h"
13 #include "FairGeoParSet.h"
14 #include "FairParGenericSet.h"
15 #include "PixelDigi.h"
16 #include "PixelPayload.h"
17 #include "RootSerializer.h"
18 
19 #include <FairMQDevice.h>
20 #include <FairMQParts.h>
21 #include <TClonesArray.h>
22 #include <TList.h>
23 #include <string>
24 
25 template<typename T>
26 class FairMQPixAltTaskProcessorBin : public FairMQDevice
27 {
28  public:
30  : FairMQDevice()
31  , fInputChannelName("data-in")
32  , fOutputChannelName("data-out")
33  , fParamChannelName("param")
34  , fEventHeader(NULL)
35  , fNewRunId(1)
36  , fCurrentRunId(-1)
37  , fDataToKeep("")
38  , fReceivedMsgs(0)
39  , fSentMsgs(0)
40  , fFairTask(NULL)
41  , fParCList(NULL)
42  , fGeoPar(nullptr)
43  {}
44 
46  {
47  delete fGeoPar;
48  fGeoPar = nullptr;
49  delete fFairTask;
50  }
51 
52  void SetDataToKeep(const std::string& tStr) { fDataToKeep = tStr; }
53 
54  void SetInputChannelName(const std::string& tstr) { fInputChannelName = tstr; }
55  void SetOutputChannelName(const std::string& tstr) { fOutputChannelName = tstr; }
56  void SetParamChannelName(const std::string& tstr) { fParamChannelName = tstr; }
57 
58  protected:
59  bool ProcessData(FairMQParts& parts, int)
60  {
61  // LOG(debug)<<"message received with " << parts.Size() << " parts!";
62  fReceivedMsgs++;
63 
64  if (parts.Size() == 0)
65  return 0; // probably impossible, but still check
66 
67  // expecting even number of parts in the form: header,data,header,data,header,data and so on...
68  int nPPE = 2; // nof parts per event
69 
70  if (parts.Size() % nPPE >= 1)
71  LOG(info) << "received " << parts.Size() << " parts, will ignore last part!!!";
72 
73  // creating output multipart message
74  FairMQParts partsOut;
75 
76  for (int ievent = 0; ievent < parts.Size() / nPPE; ievent++) {
77  // the first part should be the event header
78  PixelPayload::EventHeader* payloadE =
79  static_cast<PixelPayload::EventHeader*>(parts.At(nPPE * ievent)->GetData());
80  // LOG(debug) << "GOT EVENT " << payloadE->fMCEntryNo << " OF RUN " << payloadE->fRunId << " (part " <<
81  // payloadE->fPartNo << ")";
82 
83  fNewRunId = payloadE->fRunId;
84  if (fNewRunId != fCurrentRunId) {
85  fCurrentRunId = fNewRunId;
86  UpdateParameters();
87  fFairTask->InitMQ(fParCList);
88 
89  LOG(info) << "Parameters updated, back to ProcessData(" << parts.Size() << " parts!)";
90  }
91 
92  // the second part should the TClonesArray with necessary data... now assuming Digi
93  PixelPayload::Digi* payloadD = static_cast<PixelPayload::Digi*>(parts.At(nPPE * ievent + 1)->GetData());
94  int digiArraySize = parts.At(nPPE * ievent + 1)->GetSize();
95  int nofDigis = digiArraySize / sizeof(PixelPayload::Digi);
96 
97  // LOG(debug) << " EVENT HAS " << nofDigis << " DIGIS!!!";
98 
99  // create eventHeader part
101  header->fRunId = payloadE->fRunId;
102  header->fMCEntryNo = payloadE->fMCEntryNo;
103  header->fPartNo = payloadE->fPartNo;
104  FairMQMessagePtr msgHeader(
105  NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
106  delete static_cast<PixelPayload::EventHeader*>(data);
107  }));
108  partsOut.AddPart(std::move(msgHeader));
109 
110  // create part with hits
111  int hitsSize = nofDigis * sizeof(PixelPayload::Hit);
112 
113  FairMQMessagePtr msgTCA = NewMessage(hitsSize);
114 
115  PixelPayload::Hit* hitPayload = static_cast<PixelPayload::Hit*>(msgTCA->GetData());
116 
117  // actually find hits
118  int nofHits = 0;
119  fFairTask->ExecMQ(payloadD, nofDigis, hitPayload, nofHits);
120 
121  partsOut.AddPart(std::move(msgTCA));
122  }
123 
124  Send(partsOut, fOutputChannelName);
125  fSentMsgs++;
126 
127  return true;
128  }
129 
130  virtual void Init()
131  {
132  fDataToKeep = fConfig->GetValue<std::string>("keep-data");
133  fInputChannelName = fConfig->GetValue<std::string>("in-channel");
134  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
135  fParamChannelName = fConfig->GetValue<std::string>("par-channel");
136 
137  // fHitFinder->InitMQ(fRootParFileName,fAsciiParFileName);
138  fFairTask = new T();
139  fFairTask->SetStreamProcessing(kTRUE);
140  fGeoPar = new FairGeoParSet("FairGeoParSet");
141  fParCList = new TList();
142  fParCList->Add(fGeoPar);
143  fFairTask->GetParList(fParCList);
144 
145  OnData(fInputChannelName, &FairMQPixAltTaskProcessorBin<T>::ProcessData);
146  }
147 
148  virtual void PostRun()
149  {
150  LOG(info) << "FairMQPixAltTaskProcessorBin<T>::PostRun() Received " << fReceivedMsgs << " and sent "
151  << fSentMsgs << " messages!";
152  }
153 
154  private:
155  std::string fInputChannelName;
156  std::string fOutputChannelName;
157  std::string fParamChannelName;
158 
159  void UpdateParameters()
160  {
161  for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
162  FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
163  fParCList->Remove(tempObj);
164  fParCList->AddAt(UpdateParameter(tempObj), iparC);
165  }
166  }
167 
168  FairParGenericSet* UpdateParameter(FairParGenericSet* thisPar)
169  {
170  std::string paramName = thisPar->GetName();
171 
172  std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
173  LOG(warn) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar
174  << ")";
175  FairMQMessagePtr req(NewMessage(
176  const_cast<char*>(reqStr->c_str()),
177  reqStr->length(),
178  [](void* /*data*/, void* obj) { delete static_cast<std::string*>(obj); },
179  reqStr));
180  FairMQMessagePtr rep(NewMessage());
181 
182  if (Send(req, fParamChannelName) > 0) {
183  if (Receive(rep, fParamChannelName) > 0) {
184  thisPar = nullptr;
185  Deserialize<RootSerializer>(*rep, thisPar);
186  LOG(info) << "Received parameter" << paramName << " from the server (" << thisPar << ")";
187  return thisPar;
188  }
189  }
190  return nullptr;
191  }
192 
193  FairEventHeader* fEventHeader;
194 
195  int fNewRunId;
196  int fCurrentRunId;
197 
198  std::string fDataToKeep;
199 
200  int fReceivedMsgs = 0;
201  int fSentMsgs = 0;
202 
203  T* fFairTask;
204  TList* fParCList;
205  FairGeoParSet* fGeoPar;
206 
209 };
210 
211 #endif /* FAIRMQPIXALTTASKPROCESSOR_H_ */
virtual const char * GetName() const
Definition: FairParSet.h:38
void SetInputChannelName(const std::string &tstr)
bool ProcessData(FairMQParts &parts, int)
void SetParamChannelName(const std::string &tstr)
void SetDataToKeep(const std::string &tStr)
void SetOutputChannelName(const std::string &tstr)