15 #ifndef FAIRMQSAMPLER_H_
16 #define FAIRMQSAMPLER_H_
25 #include <FairMQDevice.h>
26 #include <FairMQLogger.h>
// Template parameter: Task is the type instantiated with `new Task()` further down in this file.
42 template<
typename Task>
// Constructor member-initializer fragment (other initializers are missing from this excerpt).
// The run object and the task pointer start out null.
47 : fFairRunAna(nullptr)
48 , fSamplerTask(nullptr)
// Default channel names; both can be replaced later by non-empty
// "out-channel" / "ack-channel" configuration values.
54 , fOutChannelName(
"data1")
55 , fAckChannelName(
"ack")
// Task initialization fragment (enclosing function signature is not visible in this excerpt).
70 LOG(info) <<
"Initializing Task...";
// NOTE(review): raw `new`; the matching delete/ownership transfer is not visible here.
// Confirm who owns fSamplerTask (it is handed to fFairRunAna->AddTask below).
73 fSamplerTask =
new Task();
// Pull the sampler settings out of the device configuration (fConfig).
75 fInputFile = fConfig->GetValue<std::string>(
"input-file");
76 fParFile = fConfig->GetValue<std::string>(
"parameter-file");
77 fBranch = fConfig->GetValue<std::string>(
"branch");
78 fChainInput = fConfig->GetValue<
int>(
"chain-input");
// Optional channel-name overrides: a non-empty configured value replaces the
// member's current name, an empty one leaves it untouched.
80 std::string outChannelName = fConfig->GetValue<std::string>(
"out-channel");
81 std::string ackChannelName = fConfig->GetValue<std::string>(
"ack-channel");
// NOTE(review): `!= ""` works but `!outChannelName.empty()` states the intent more directly.
83 if (outChannelName !=
"") {
84 fOutChannelName = outChannelName;
86 if (ackChannelName !=
"") {
87 fAckChannelName = ackChannelName;
// Runs fChainInput times; the loop body is missing from this excerpt
// (presumably it adds further input files to the source - TODO confirm).
95 for (
int i = 0; i < fChainInput; ++i) {
// Output file name = input file name with ".out.root" appended.
// Its consumer is not visible here (presumably the run's sink - TODO confirm).
101 TString output = fInputFile;
102 output.Append(
".out.root");
// Register the freshly created task with the run object.
105 fFairRunAna->
AddTask(fSamplerTask);
// Only open a parameter input when a parameter file was configured.
107 if (fParFile !=
"") {
110 parInput->
open(TString(fParFile).Data());
// Presumably the else-branch of the check above (the `else` line itself is missing here).
114 LOG(warn) <<
"Parameter file not provided. Starting without RuntimeDB.";
121 LOG(info) <<
"Task initialized.";
// fNumEvents is assigned on a line that is not part of this excerpt.
122 LOG(info) <<
"Number of events to process: " << fNumEvents;
// Take the start timestamp; it is subtracted from fEnd later to log the elapsed time in milliseconds.
127 fStart = std::chrono::high_resolution_clock::now();
// Per-event send fragment (presumably the body of ConditionalRun() - the signature is not visible here).
// Message to be sent; the line that fills it is missing from this excerpt.
133 FairMQMessagePtr msg;
// Process the entry whose index equals the number of messages sent so far.
136 fFairRunAna->
RunMQ(fSentMsgs);
// A non-negative return from Send() is treated as success.
139 if (Send(msg, fOutChannelName) >= 0) {
// All configured events have been sent; the branch body is not visible here
// (presumably it ends the run loop - TODO confirm).
141 if (fSentMsgs == fNumEvents) {
// End of a try block whose body is outside this excerpt. A std::exception raised while
// ending the AckListener thread is logged; no rethrow is visible in this excerpt.
// NOTE(review): the handler does not modify `e`; catching by `const std::exception&` is the usual form.
153 }
catch (std::exception& e) {
154 LOG(error) <<
"Exception when ending AckListener thread: " << e.what();
// Acknowledgement-receiving fragment (enclosing function signature is not visible in this excerpt).
// Counter reported in the log line below; its increment is on a line missing from this excerpt
// (presumably inside the successful-Receive branch - TODO confirm).
169 uint64_t numAcks = 0;
// At most one receive attempt per expected event.
170 for (Long64_t eventNr = 0; eventNr < fNumEvents; ++eventNr) {
// Fresh message to receive the next acknowledgement into.
171 FairMQMessagePtr ack(NewMessage());
// A non-negative return from Receive() is treated as success.
172 if (Receive(ack, fAckChannelName) >= 0) {
// Checked inside the loop; the branch body is missing here
// (presumably it leaves the loop when a state change is pending - TODO confirm).
176 if (NewStatePending()) {
// Stop timestamp; the elapsed time (fEnd - fStart) is logged as a double in milliseconds.
181 fEnd = std::chrono::high_resolution_clock::now();
182 LOG(info) <<
"Acknowledged " << numAcks
183 <<
" messages in: " << std::chrono::duration<double, std::milli>(fEnd - fStart).count() <<
"ms.";
// Timestamps bracketing the run; their difference is logged in milliseconds.
189 std::chrono::high_resolution_clock::time_point fStart;
190 std::chrono::high_resolution_clock::time_point fEnd;
// Value of the "input-file" configuration key; also the base of the ".out.root" output name.
191 std::string fInputFile;
// Value of the "parameter-file" configuration key; when empty, no parameter input is opened.
192 std::string fParFile;
// Channel passed to Send(); defaults to "data1", replaced by a non-empty "out-channel" value.
194 std::string fOutChannelName;
// Channel passed to Receive(); defaults to "ack", replaced by a non-empty "ack-channel" value.
195 std::string fAckChannelName;
// Thread referred to by the "Exception when ending AckListener thread" log message;
// where it is started and joined is not visible in this excerpt.
199 std::thread fAckListener;
void AddFile(TString FileName)
list of container factories
virtual bool ConditionalRun()
static FairRootManager * Instance()
void SetSink(FairSink *tempSink)
void RunMQ(Long64_t entry)
void SetEventIndex(Long64_t eventIndex)
FairRuntimeDb * GetRuntimeDb(void)
void SetTransport(std::shared_ptr< FairMQTransportFactory > factory)
virtual void AddTask(FairTask *t)
Bool_t open(const Text_t *fname, Option_t *option="READ", const Text_t *ftitle="", Int_t compress=1)
void GetPayload(std::unique_ptr< FairMQMessage > &msg)
void SetBranch(const std::string &branch)
FairMQSampler operator=(const FairMQSampler &)=delete
virtual void SetSource(FairSource *tempSource)
Bool_t setFirstInput(FairParIo *)