FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParameterMQServer.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
#include "ParameterMQServer.h"

#include "FairParAsciiFileIo.h"
#include "FairParGenericSet.h"
#include "FairParRootFileIo.h"
#include "FairRunIdGenerator.h"
#include "FairRuntimeDb.h"
#include "RootSerializer.h"

#include <FairMQLogger.h>
#include <Rtypes.h>
#include <TGeoManager.h>
#include <cstdlib>   // getenv
#include <exception>
#include <string>
28 
29 using namespace std;
30 
32  : fRtdb(FairRuntimeDb::instance())
33  , fRunId(0)
34  , fNofSimDevices(0)
35  , fFirstInputName("first_input.root")
36  , fFirstInputType("ROOT")
37  , fSecondInputName("")
38  , fSecondInputType("ROOT")
39  , fOutputName("")
40  , fOutputType("ROOT")
41  , fRequestChannelName("data")
42  , fUpdateChannelName("")
43 {}
44 
46 {
47  fRequestChannelName = fConfig->GetValue<string>("channel-name");
48  fUpdateChannelName = fConfig->GetValue<string>("update-channel-name");
49 
50  if (fRequestChannelName != "") {
51  OnData(fRequestChannelName, &ParameterMQServer::ProcessRequest);
52  }
53  if (fUpdateChannelName != "") {
54  OnData(fUpdateChannelName, &ParameterMQServer::ProcessUpdate);
55  }
56 }
57 
59 {
60  fFirstInputName = fConfig->GetValue<string>("first-input-name");
61  fFirstInputType = fConfig->GetValue<string>("first-input-type");
62  fSecondInputName = fConfig->GetValue<string>("second-input-name");
63  fSecondInputType = fConfig->GetValue<string>("second-input-type");
64  fOutputName = fConfig->GetValue<string>("output-name");
65  fOutputType = fConfig->GetValue<string>("output-type");
66 
67  if (::getenv("DDS_SESSION_ID")) {
68  std::string ddsSessionId = ::getenv("DDS_SESSION_ID");
69  if (fOutputName.length() > 5) {
70  ddsSessionId = "." + ddsSessionId + ".root";
71  fOutputName.replace(fOutputName.length() - 5, 5, ddsSessionId.c_str());
72  }
73  }
74 
75  if (fRtdb != 0) {
76  // Set first input
77  if (fFirstInputName != "") {
78  if (fFirstInputType == "ROOT") {
79  FairParRootFileIo* par1R = new FairParRootFileIo();
80  par1R->open(fFirstInputName.data(), "UPDATE");
81  fRtdb->setFirstInput(par1R);
82  } else if (fFirstInputType == "ASCII") {
84  par1A->open(fFirstInputName.data(), "in");
85  fRtdb->setFirstInput(par1A);
86  }
87  }
88 
89  // Set second input
90  if (fSecondInputName != "") {
91  if (fSecondInputType == "ROOT") {
92  FairParRootFileIo* par2R = new FairParRootFileIo();
93  par2R->open(fSecondInputName.data(), "UPDATE");
94  fRtdb->setSecondInput(par2R);
95  } else if (fSecondInputType == "ASCII") {
97  par2A->open(fSecondInputName.data(), "in");
98  fRtdb->setSecondInput(par2A);
99  }
100  }
101 
102  // Set output
103  if (fUpdateChannelName == "") {
104  if (fOutputName != "") {
105  if (fOutputType == "ROOT") {
107  parOut->open(fOutputName.data());
108  fRtdb->setOutput(parOut);
109  }
110 
111  fRtdb->saveOutput();
112  }
113  }
114  }
115 }
116 
117 bool ParameterMQServer::ProcessRequest(FairMQMessagePtr& req, int /*index*/)
118 {
119  string parameterName = "";
120  FairParGenericSet* par = nullptr;
121 
122  string reqStr(static_cast<char*>(req->GetData()), req->GetSize());
123  LOG(info) << "Received parameter request from client: \"" << reqStr << "\"";
124 
125  size_t pos = reqStr.rfind(",");
126  string newParameterName = reqStr.substr(0, pos);
127  int runId = stoi(reqStr.substr(pos + 1));
128  LOG(info) << "Parameter name: " << newParameterName;
129  LOG(info) << "Run ID: " << runId;
130 
131  LOG(info) << "Retrieving parameter...";
132  // Check if the parameter name has changed to avoid getting same container repeatedly
133  if (newParameterName != parameterName) {
134  parameterName = newParameterName;
135  par = static_cast<FairParGenericSet*>(fRtdb->getContainer(parameterName.c_str()));
136  }
137  fRtdb->initContainers(runId);
138 
139  LOG(info) << "Sending following parameter to the client:";
140  if (par) {
141  par->print();
142 
143  FairMQMessagePtr rep(NewMessage());
144  Serialize<RootSerializer>(*rep, par);
145 
146  if (Send(rep, fRequestChannelName, 0) < 0) {
147  LOG(ERROR) << "failed sending reply";
148  return false;
149  }
150  } else {
151  LOG(ERROR) << "Parameter uninitialized! Sending empty message back";
152  // Send an empty message back to keep the REQ/REP cycle
153  FairMQMessagePtr rep(NewMessage());
154  if (Send(rep, fRequestChannelName, 0) < 0) {
155  LOG(ERROR) << "failed sending reply";
156  return false;
157  }
158  }
159 
160  return true;
161 }
162 
163 bool ParameterMQServer::ProcessUpdate(FairMQMessagePtr& update, int /*index*/)
164 {
165  gGeoManager =
166  nullptr; // FairGeoParSet update deletes previous geometry because of resetting gGeoManager, so let's NULL it
167 
168  std::string* text;
169 
170  LOG(info) << "got process update message with size = " << update->GetSize() << " !";
171  if (update->GetSize() < 20) {
172  std::string repString = string(static_cast<char*>(update->GetData()), update->GetSize());
173  LOG(info) << "Received string " << repString << " !";
174  if (fNofSimDevices == 0) {
175  FairRunIdGenerator genid;
176  fRunId = genid.generateId();
177  }
178  string messageToSend = to_string(fRunId) + "_" + to_string(fNofSimDevices);
179  text = new string(messageToSend);
180  fNofSimDevices += 1;
181  LOG(info) << "Replying with \"" << messageToSend << "\"";
182  } else {
183  // get the run id coded in the description of FairParSet
184  FairParGenericSet* newPar = nullptr;
185  Deserialize<RootSerializer>(*update, newPar);
186  std::string parDescr = std::string(newPar->getDescription());
187  uint runId = 0;
188  if (parDescr.find("RUNID") != std::string::npos) {
189  parDescr.erase(0, parDescr.find("RUNID") + 5);
190  runId = atoi(parDescr.data());
191  if (parDescr.find("RUNID") != std::string::npos) {
192  parDescr.erase(0, parDescr.find("RUNID") + 5);
193  }
194  }
195  fRtdb->initContainers(runId);
196 
197  newPar->setChanged(true); // trigger writing to file
198  newPar->setStatic(true); // to get rid of error
199  newPar->Print();
200 
201  if (fRtdb->addContainer(newPar)) {
202  text = new string("SUCCESS");
203  } else {
204  text = new string("FAIL");
205  }
206 
207  Bool_t kParameterMerged = kTRUE;
208  FairParRootFileIo* parOut = new FairParRootFileIo(kParameterMerged);
209  parOut->open(fOutputName.data());
210  fRtdb->setOutput(parOut);
211  fRtdb->saveOutput();
212  fRtdb->closeOutput();
213  }
214 
215  FairMQMessagePtr msg(NewMessage(
216  const_cast<char*>(text->c_str()),
217  text->length(),
218  [](void* /*data*/, void* object) { delete static_cast<string*>(object); },
219  text));
220 
221  if (Send(msg, fUpdateChannelName) < 0) {
222  return false;
223  }
224  return true;
225 }
226 
Bool_t kParameterMerged
list of container factories
Definition: FairRuntimeDb.h:24
Bool_t open(const Text_t *fname, const Text_t *status="in")
bool ProcessUpdate(FairMQMessagePtr &, int)
void setStatic(Bool_t flag=kTRUE)
Definition: FairParSet.h:66
unsigned int generateId(void)
void setChanged(Bool_t flag=kTRUE)
Definition: FairParSet.h:72
virtual void print()
Definition: FairParSet.cxx:94
Bool_t setSecondInput(FairParIo *)
const char * getDescription() const
Definition: FairParSet.h:81
FairParSet * getContainer(const Text_t *)
Bool_t initContainers(Int_t runId, Int_t refId=-1, const Text_t *fileName="")
void closeOutput(void)
FairParRootFileIo * parOut
Bool_t addContainer(FairParSet *)
virtual void InitTask()
FairMQExParamsParOne * par
Bool_t setOutput(FairParIo *)
Bool_t open(const Text_t *fname, Option_t *option="READ", const Text_t *ftitle="", Int_t compress=1)
bool ProcessRequest(FairMQMessagePtr &, int)
Bool_t setFirstInput(FairParIo *)
void saveOutput(void)