9 #ifndef FAIRMQPIXALTTASKPROCESSORBIN_H_
10 #define FAIRMQPIXALTTASKPROCESSORBIN_H_
19 #include <FairMQDevice.h>
20 #include <FairMQParts.h>
21 #include <TClonesArray.h>
31 , fInputChannelName(
"data-in")
32 , fOutputChannelName(
"data-out")
33 , fParamChannelName(
"param")
64 if (parts.Size() == 0)
70 if (parts.Size() % nPPE >= 1)
71 LOG(info) <<
"received " << parts.Size() <<
" parts, will ignore last part!!!";
76 for (
int ievent = 0; ievent < parts.Size() / nPPE; ievent++) {
83 fNewRunId = payloadE->
fRunId;
84 if (fNewRunId != fCurrentRunId) {
85 fCurrentRunId = fNewRunId;
87 fFairTask->InitMQ(fParCList);
89 LOG(info) <<
"Parameters updated, back to ProcessData(" << parts.Size() <<
" parts!)";
94 int digiArraySize = parts.At(nPPE * ievent + 1)->GetSize();
104 FairMQMessagePtr msgHeader(
108 partsOut.AddPart(std::move(msgHeader));
113 FairMQMessagePtr msgTCA = NewMessage(hitsSize);
119 fFairTask->ExecMQ(payloadD, nofDigis, hitPayload, nofHits);
121 partsOut.AddPart(std::move(msgTCA));
124 Send(partsOut, fOutputChannelName);
132 fDataToKeep = fConfig->GetValue<std::string>(
"keep-data");
133 fInputChannelName = fConfig->GetValue<std::string>(
"in-channel");
134 fOutputChannelName = fConfig->GetValue<std::string>(
"out-channel");
135 fParamChannelName = fConfig->GetValue<std::string>(
"par-channel");
139 fFairTask->SetStreamProcessing(kTRUE);
141 fParCList =
new TList();
142 fParCList->Add(fGeoPar);
143 fFairTask->GetParList(fParCList);
150 LOG(info) <<
"FairMQPixAltTaskProcessorBin<T>::PostRun() Received " << fReceivedMsgs <<
" and sent "
151 << fSentMsgs <<
" messages!";
155 std::string fInputChannelName;
156 std::string fOutputChannelName;
157 std::string fParamChannelName;
159 void UpdateParameters()
161 for (
int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
163 fParCList->Remove(tempObj);
164 fParCList->AddAt(UpdateParameter(tempObj), iparC);
170 std::string paramName = thisPar->
GetName();
172 std::string* reqStr =
new std::string(paramName +
"," + std::to_string(fCurrentRunId));
173 LOG(warn) <<
"Requesting parameter \"" << paramName <<
"\" for Run ID " << fCurrentRunId <<
" (" << thisPar
175 FairMQMessagePtr req(NewMessage(
176 const_cast<char*>(reqStr->c_str()),
178 [](
void* ,
void* obj) {
delete static_cast<std::string*
>(obj); },
180 FairMQMessagePtr rep(NewMessage());
182 if (Send(req, fParamChannelName) > 0) {
183 if (Receive(rep, fParamChannelName) > 0) {
185 Deserialize<RootSerializer>(*rep, thisPar);
186 LOG(info) <<
"Received parameter" << paramName <<
" from the server (" << thisPar <<
")";
198 std::string fDataToKeep;
200 int fReceivedMsgs = 0;
virtual const char * GetName() const
FairMQPixAltTaskProcessorBin()
void SetInputChannelName(const std::string &tstr)
bool ProcessData(FairMQParts &parts, int)
virtual ~FairMQPixAltTaskProcessorBin()
void SetParamChannelName(const std::string &tstr)
void SetDataToKeep(const std::string &tStr)
void SetOutputChannelName(const std::string &tstr)