9 #ifndef FAIRMQPIXELTASKPROCESSORBIN_H_
10 #define FAIRMQPIXELTASKPROCESSORBIN_H_
20 #include <FairMQDevice.h>
21 #include <FairMQParts.h>
22 #include <TClonesArray.h>
31 : fInputChannelName(
"data-in")
32 , fOutputChannelName(
"data-out")
33 , fParamChannelName(
"param")
34 , fEventHeader(nullptr)
37 , fInputArray(nullptr)
38 , fOutputArray(nullptr)
39 , fStaticParameters(false)
59 fInputArray =
nullptr;
61 fOutputArray =
nullptr;
79 if (parts.Size() == 0)
87 if (fStaticParameters ==
false || fCurrentRunId == -1) {
88 fNewRunId = payloadE->
fRunId;
89 if (fNewRunId != fCurrentRunId) {
90 fCurrentRunId = fNewRunId;
92 fFairTask->InitMQ(fParCList);
94 LOG(info) <<
"Parameters updated, back to ProcessData(" << parts.Size() <<
" parts!)";
100 int digiArraySize = parts.At(1)->GetSize();
103 fInputArray->Clear();
104 for (
int idigi = 0; idigi < nofDigis; idigi++) {
105 new ((*fInputArray)[idigi])
PixelDigi(-1,
106 payloadD[idigi].fDetectorID,
107 payloadD[idigi].fFeID,
108 payloadD[idigi].fCol,
109 payloadD[idigi].fRow,
110 payloadD[idigi].fCharge);
118 fFairTask->ExecMQ(fInput, fOutput);
120 FairMQParts partsOut;
127 FairMQMessagePtr msgHeader(NewMessage(
132 partsOut.AddPart(std::move(msgHeader));
134 for (
int iobj = 0; iobj < fOutput->GetEntries(); iobj++) {
135 if (strcmp(fOutput->At(iobj)->GetName(),
"PixelHits") == 0) {
136 Int_t nofEntries = ((TClonesArray*)fOutput->At(iobj))->GetEntries();
139 FairMQMessagePtr msgTCA(NewMessage(hitsSize));
143 for (
int ihit = 0; ihit < nofEntries; ihit++) {
144 PixelHit* hit =
static_cast<PixelHit*
>(((TClonesArray*)fOutput->At(iobj))->At(ihit));
150 hitPayload[ihit].
posX = hit->
GetX();
151 hitPayload[ihit].
posY = hit->
GetY();
152 hitPayload[ihit].
posZ = hit->
GetZ();
158 partsOut.AddPart(std::move(msgTCA));
164 Send(partsOut, fOutputChannelName);
172 fDataToKeep = fConfig->GetValue<std::string>(
"keep-data");
173 fInputChannelName = fConfig->GetValue<std::string>(
"in-channel");
174 fOutputChannelName = fConfig->GetValue<std::string>(
"out-channel");
175 fParamChannelName = fConfig->GetValue<std::string>(
"par-channel");
176 fStaticParameters = fConfig->GetValue<
bool>(
"static-pars");
180 fFairTask->SetStreamProcessing(kTRUE);
182 fParCList =
new TList();
183 fParCList->Add(fGeoPar);
184 fFairTask->GetParList(fParCList);
186 fOutput =
new TList();
187 fInput =
new TList();
189 fInputArray =
new TClonesArray(
"PixelDigi");
190 fInputArray->SetName(
"PixelDigis");
191 fInput->Add(fInputArray);
200 LOG(info) <<
"FairMQPixelTaskProcessorBin<T>::PostRun() Received " << fReceivedMsgs <<
" and sent " << fSentMsgs
205 std::string fInputChannelName;
206 std::string fOutputChannelName;
207 std::string fParamChannelName;
209 void UpdateParameters()
211 for (
int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
213 fParCList->Remove(tempObj);
214 fParCList->AddAt(UpdateParameter(tempObj), iparC);
220 std::string paramName = thisPar->
GetName();
222 std::string* reqStr =
new std::string(paramName +
"," + std::to_string(fCurrentRunId));
223 LOG(warn) <<
"Requesting parameter \"" << paramName <<
"\" for Run ID " << fCurrentRunId <<
" (" << thisPar
225 FairMQMessagePtr req(NewMessage(
226 const_cast<char*>(reqStr->c_str()),
228 [](
void* ,
void* hint) {
delete static_cast<std::string*
>(hint); },
230 FairMQMessagePtr rep(NewMessage());
232 if (Send(req, fParamChannelName) > 0) {
233 if (Receive(rep, fParamChannelName) > 0) {
235 Deserialize<RootSerializer>(*rep, thisPar);
236 LOG(info) <<
"Received parameter" << paramName <<
" from the server (" << thisPar <<
")";
248 TClonesArray* fInputArray;
249 TClonesArray* fOutputArray;
251 bool fStaticParameters;
255 std::string fDataToKeep;
257 int fReceivedMsgs = 0;
virtual const char * GetName() const
FairMQPixelTaskProcessorBin()
virtual ~FairMQPixelTaskProcessorBin()
void SetParamChannelName(const std::string &str)
bool ProcessData(FairMQParts &parts, int)
Int_t GetDetectorID() const
void SetStaticParameters(bool tbool)
void SetInputChannelName(const std::string &str)
void SetDataToKeep(const std::string &str)
void SetOutputChannelName(const std::string &str)