9 #ifndef FAIRMQPIXELTASKPROCESSOR_H_
10 #define FAIRMQPIXELTASKPROCESSOR_H_
18 #include <FairMQDevice.h>
19 #include <FairMQParts.h>
29 : fInputChannelName(
"data-in")
30 , fOutputChannelName(
"data-out")
31 , fParamChannelName(
"param")
32 , fEventHeader(nullptr)
33 , fMCEventHeader(nullptr)
36 , fStaticParameters(false)
69 TObject* objectToKeep =
nullptr;
74 std::vector<TObject*> tempObjects;
75 for (
int ipart = 0; ipart < parts.Size(); ipart++) {
76 TObject* obj =
nullptr;
77 Deserialize<RootSerializer>(*parts.At(ipart), obj);
78 tempObjects.push_back(obj);
80 if (strcmp(tempObjects.back()->GetName(),
"EventHeader.") == 0) {
83 if (strcmp(tempObjects.back()->GetName(),
"MCEventHeader.") == 0) {
86 fInput->Add(tempObjects.back());
90 if (fStaticParameters ==
false || fCurrentRunId == -1) {
93 fNewRunId = fEventHeader->
GetRunId();
94 }
else if (fMCEventHeader) {
95 fNewRunId = fMCEventHeader->
GetRunID();
100 if (fNewRunId != fCurrentRunId) {
101 fCurrentRunId = fNewRunId;
103 fFairTask->InitMQ(fParCList);
105 LOG(info) <<
"Parameters updated, back to ProcessData(" << parts.Size() <<
" parts!)";
112 fFairTask->ExecMQ(fInput, fOutput);
114 if (!fDataToKeep.empty()) {
115 objectToKeep = (fInput->FindObject(fDataToKeep.c_str()))->Clone();
117 fOutput->Add(objectToKeep);
120 FairMQParts partsOut;
123 FairMQMessagePtr mess(NewMessage());
124 Serialize<RootSerializer>(*mess, fEventHeader);
125 partsOut.AddPart(std::move(mess));
126 }
else if (fMCEventHeader) {
127 FairMQMessagePtr mess(NewMessage());
128 Serialize<RootSerializer>(*mess, fMCEventHeader);
129 partsOut.AddPart(std::move(mess));
132 for (
int iobj = 0; iobj < fOutput->GetEntries(); iobj++) {
133 FairMQMessagePtr mess(NewMessage());
134 Serialize<RootSerializer>(*mess, fOutput->At(iobj));
135 partsOut.AddPart(std::move(mess));
138 Send(partsOut, fOutputChannelName);
143 for (
unsigned int ipart = 0; ipart < tempObjects.size(); ipart++) {
144 if (tempObjects[ipart]) {
145 delete tempObjects[ipart];
156 fDataToKeep = fConfig->GetValue<std::string>(
"keep-data");
157 fInputChannelName = fConfig->GetValue<std::string>(
"in-channel");
158 fOutputChannelName = fConfig->GetValue<std::string>(
"out-channel");
159 fParamChannelName = fConfig->GetValue<std::string>(
"par-channel");
160 fStaticParameters = fConfig->GetValue<
bool>(
"static-pars");
163 fFairTask->SetStreamProcessing(kTRUE);
165 fParCList =
new TList();
166 fParCList->Add(fGeoPar);
167 fFairTask->GetParList(fParCList);
169 fOutput =
new TList();
170 fInput =
new TList();
177 LOG(info) <<
"FairMQPixelTaskProcessor<T>::PostRun() Received " << fReceivedMsgs <<
" and sent " << fSentMsgs
182 std::string fInputChannelName;
183 std::string fOutputChannelName;
184 std::string fParamChannelName;
186 void UpdateParameters()
188 for (
int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
190 fParCList->Remove(tempObj);
191 fParCList->AddAt(UpdateParameter(tempObj), iparC);
197 std::string paramName = thisPar->
GetName();
199 std::string* reqStr =
new std::string(paramName +
"," + std::to_string(fCurrentRunId));
200 LOG(debug) <<
"Requesting parameter \"" << paramName <<
"\" for Run ID " << fCurrentRunId <<
" (" << thisPar
203 FairMQMessagePtr req(NewMessage(
204 const_cast<char*>(reqStr->c_str()),
206 [](
void* ,
void* hint) {
delete static_cast<std::string*
>(hint); },
208 FairMQMessagePtr rep(NewMessage());
210 if (Send(req, fParamChannelName) > 0) {
211 if (Receive(rep, fParamChannelName) > 0) {
213 Deserialize<RootSerializer>(*rep, thisPar);
214 LOG(info) <<
"Received parameter" << paramName <<
" from the server (" << thisPar <<
")";
227 bool fStaticParameters;
231 std::string fDataToKeep;
233 int fReceivedMsgs = 0;
virtual const char * GetName() const
bool ProcessData(FairMQParts &parts, int)
void SetParamChannelName(const std::string &tstr)
void SetOutputChannelName(const std::string &tstr)
void SetInputChannelName(const std::string &tstr)
virtual ~FairMQPixelTaskProcessor()
FairMQPixelTaskProcessor()
void SetDataToKeep(const std::string &tStr)
void SetStaticParameters(bool tbool)