FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQSampler.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQSAMPLER_H_
16 #define FAIRMQSAMPLER_H_
17 
18 #include "FairFileSource.h"
19 #include "FairMQSamplerTask.h"
20 #include "FairParRootFileIo.h"
21 #include "FairRootFileSink.h"
22 #include "FairRunAna.h"
23 #include "FairRuntimeDb.h"
24 
25 #include <FairMQDevice.h>
26 #include <FairMQLogger.h>
27 #include <chrono>
28 #include <thread>
29 
42 template<typename Task>
43 class FairMQSampler : public FairMQDevice
44 {
45  public:
47  : fFairRunAna(nullptr)
48  , fSamplerTask(nullptr)
49  , fStart()
50  , fEnd()
51  , fInputFile()
52  , fParFile()
53  , fBranch()
54  , fOutChannelName("data1")
55  , fAckChannelName("ack")
56  , fNumEvents(0)
57  , fChainInput(0)
58  , fSentMsgs(0)
59  , fAckListener()
60  {}
61 
62  FairMQSampler(const FairMQSampler&) = delete;
63  FairMQSampler operator=(const FairMQSampler&) = delete;
64 
65  virtual ~FairMQSampler() {}
66 
67  protected:
68  virtual void InitTask()
69  {
70  LOG(info) << "Initializing Task...";
71 
72  fFairRunAna = new FairRunAna();
73  fSamplerTask = new Task();
74 
75  fInputFile = fConfig->GetValue<std::string>("input-file");
76  fParFile = fConfig->GetValue<std::string>("parameter-file");
77  fBranch = fConfig->GetValue<std::string>("branch");
78  fChainInput = fConfig->GetValue<int>("chain-input");
79 
80  std::string outChannelName = fConfig->GetValue<std::string>("out-channel");
81  std::string ackChannelName = fConfig->GetValue<std::string>("ack-channel");
82  // check if the returned value actually exists, for the compatibility with old devices.
83  if (outChannelName != "") {
84  fOutChannelName = outChannelName;
85  }
86  if (ackChannelName != "") {
87  fAckChannelName = ackChannelName;
88  }
89 
90  fSamplerTask->SetBranch(fBranch);
91  fSamplerTask->SetTransport(fTransportFactory);
92 
93  FairFileSource* source = new FairFileSource(TString(fInputFile));
94  // Adds the same file to the input. The output will still be a single file.
95  for (int i = 0; i < fChainInput; ++i) {
96  source->AddFile(fInputFile);
97  }
98 
99  fFairRunAna->SetSource(source);
100 
101  TString output = fInputFile;
102  output.Append(".out.root");
103  fFairRunAna->SetSink(new FairRootFileSink(output.Data()));
104 
105  fFairRunAna->AddTask(fSamplerTask);
106 
107  if (fParFile != "") {
108  FairRuntimeDb* rtdb = fFairRunAna->GetRuntimeDb();
109  FairParRootFileIo* parInput = new FairParRootFileIo();
110  parInput->open(TString(fParFile).Data());
111  rtdb->setFirstInput(parInput);
112  rtdb->print();
113  } else {
114  LOG(warn) << "Parameter file not provided. Starting without RuntimeDB.";
115  }
116 
117  fFairRunAna->Init();
118  // fFairRunAna->Run(0, 0);
119  fNumEvents = int((FairRootManager::Instance()->GetInChain())->GetEntries());
120 
121  LOG(info) << "Task initialized.";
122  LOG(info) << "Number of events to process: " << fNumEvents;
123  }
124 
125  virtual void PreRun()
126  {
127  fStart = std::chrono::high_resolution_clock::now();
128  fAckListener = std::thread(&FairMQSampler::ListenForAcks, this);
129  }
130 
131  virtual bool ConditionalRun()
132  {
133  FairMQMessagePtr msg;
134 
135  fSamplerTask->SetEventIndex(fSentMsgs);
136  fFairRunAna->RunMQ(fSentMsgs);
137  fSamplerTask->GetPayload(msg);
138 
139  if (Send(msg, fOutChannelName) >= 0) {
140  ++fSentMsgs;
141  if (fSentMsgs == fNumEvents) {
142  return false;
143  }
144  }
145 
146  return true;
147  }
148 
149  virtual void PostRun()
150  {
151  try {
152  fAckListener.join();
153  } catch (std::exception& e) {
154  LOG(error) << "Exception when ending AckListener thread: " << e.what();
155  exit(EXIT_FAILURE);
156  }
157  }
158 
159  virtual void ResetTask()
160  {
161  if (fFairRunAna) {
162  fFairRunAna->TerminateRun();
163  }
164  delete fSamplerTask;
165  }
166 
168  {
169  uint64_t numAcks = 0;
170  for (Long64_t eventNr = 0; eventNr < fNumEvents; ++eventNr) {
171  FairMQMessagePtr ack(NewMessage());
172  if (Receive(ack, fAckChannelName) >= 0) {
173  ++numAcks;
174  }
175 
176  if (NewStatePending()) {
177  break;
178  }
179  }
180 
181  fEnd = std::chrono::high_resolution_clock::now();
182  LOG(info) << "Acknowledged " << numAcks
183  << " messages in: " << std::chrono::duration<double, std::milli>(fEnd - fStart).count() << "ms.";
184  }
185 
186  private:
187  FairRunAna* fFairRunAna;
188  FairMQSamplerTask* fSamplerTask;
189  std::chrono::high_resolution_clock::time_point fStart;
190  std::chrono::high_resolution_clock::time_point fEnd;
191  std::string fInputFile; // Filename of a root file containing the simulated digis.
192  std::string fParFile;
193  std::string fBranch; // The name of the sub-detector branch to stream the digis from.
194  std::string fOutChannelName;
195  std::string fAckChannelName;
196  int fNumEvents;
197  int fChainInput;
198  int fSentMsgs;
199  std::thread fAckListener;
200 };
201 
202 #endif /* FAIRMQSAMPLER_H_ */
void AddFile(TString FileName)
list of container factories
Definition: FairRuntimeDb.h:24
void TerminateRun()
Definition: FairRunAna.cxx:602
virtual bool ConditionalRun()
void Init()
Definition: FairRunAna.cxx:127
virtual void InitTask()
Definition: FairMQSampler.h:68
virtual ~FairMQSampler()
Definition: FairMQSampler.h:65
virtual void ResetTask()
static FairRootManager * Instance()
void print(void)
void SetSink(FairSink *tempSink)
Definition: FairRun.h:84
void RunMQ(Long64_t entry)
Definition: FairRunAna.cxx:478
virtual void PostRun()
void SetEventIndex(Long64_t eventIndex)
FairRuntimeDb * GetRuntimeDb(void)
Definition: FairRun.h:80
void SetTransport(std::shared_ptr< FairMQTransportFactory > factory)
virtual void AddTask(FairTask *t)
Definition: FairRun.cxx:70
Bool_t open(const Text_t *fname, Option_t *option="READ", const Text_t *ftitle="", Int_t compress=1)
void GetPayload(std::unique_ptr< FairMQMessage > &msg)
void SetBranch(const std::string &branch)
FairMQSampler operator=(const FairMQSampler &)=delete
virtual void SetSource(FairSource *tempSource)
Definition: FairRunAna.h:70
virtual void PreRun()
void ListenForAcks()
Bool_t setFirstInput(FairParIo *)