FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixelSampler.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "FairMQPixelSampler.h"
16 
17 #include "FairFileSource.h"
18 #include "FairRunAna.h"
19 #include "FairSource.h"
20 #include "RootSerializer.h"
21 
22 #include <FairMQLogger.h>
23 #include <FairMQMessage.h>
24 #include <Rtypes.h>
25 #include <TClonesArray.h>
26 #include <TObject.h>
27 #include <cstring>
28 #include <utility> // move
29 
30 using namespace std;
31 
33  : FairMQDevice() // member-initializer list of a constructor; the signature line is absent from this listing
34  , fOutputChannelName("data-out") // default channel name; replaced by the "out-channel" config value during initialisation
35  , fAckChannelName("") // empty string means "no acknowledgement channel": the ack listener thread is then never started or joined
36  , fRunAna(nullptr)
37  , fSource(nullptr) // while still null at initialisation time, a FairFileSource is created from the configured file names
38  , fInputObjects()
39  , fNObjects(0) // number of slots of fInputObjects that hold an activated object
40  , fMaxIndex(-1) // a negative value means "ask the source for its event count" during initialisation
41  , fEventCounter(0) // index of the next event to read from the source
42  , fBranchNames()
43  , fFileNames()
44  , fAckListener()
45 {}
46 
48 {
    // Initialisation body (the signature line is absent from this listing).
    // Reads the device configuration, opens the input source, activates the
    // requested branches and determines how many events will be sent.

    // Pull all settings from the configuration object.
49  fFileNames = fConfig->GetValue<std::vector<std::string>>("file-name");
50  fMaxIndex = fConfig->GetValue<int64_t>("max-index");
51  fBranchNames = fConfig->GetValue<std::vector<std::string>>("branch-name");
52  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
53  fAckChannelName = fConfig->GetValue<std::string>("ack-channel");
54 
    // NOTE(review): allocated with a raw new and never used or deleted in the
    // code visible here - confirm who owns this object and why it is needed.
55  fRunAna = new FairRunAna();
56 
    // Only build a file source when none has been set already. The first file
    // name constructs the source, the remaining ones are appended to it.
    // NOTE(review): at(0) is expected to throw if "file-name" is empty
    // (assuming fFileNames is a std::vector) - confirm that is the intended failure mode.
57  if (fSource == nullptr) {
58  fSource = new FairFileSource(fFileNames.at(0).c_str());
59  for (unsigned int ifile = 1; ifile < fFileNames.size(); ifile++) {
    // NOTE(review): C-style downcast; it is only applied to the FairFileSource created just above.
60  ((FairFileSource*)fSource)->AddFile(fFileNames.at(ifile));
61  }
62  }
63 
    // NOTE(review): the result of Init() is ignored here - confirm whether a failed Init should abort.
64  fSource->Init();
65  LOG(info) << "Going to request " << fBranchNames.size() << " branches:";
    // Ask the source for every configured branch. The object pointer is written
    // into the next free slot of fInputObjects; the slot is only consumed
    // (fNObjects incremented) when the pointer is non-null afterwards.
    // NOTE(review): no upper bound on the fInputObjects index is checked here;
    // the capacity is declared outside this listing - verify it cannot overflow.
66  for (unsigned int ibrn = 0; ibrn < fBranchNames.size(); ibrn++) {
67  LOG(info) << " requesting branch \"" << fBranchNames[ibrn] << "\"";
68  int branchStat = fSource->ActivateObject((TObject**)&fInputObjects[fNObjects],
69  fBranchNames[ibrn].c_str()); // should check the status...
70  if (fInputObjects[fNObjects]) {
71  LOG(info) << "Activated object \"" << fInputObjects[fNObjects] << "\" with name \"" << fBranchNames[ibrn]
72  << "\" (" << branchStat << "), it got name: \"" << fInputObjects[fNObjects]->GetName() << "\"";
    // If the object's own name differs from the requested branch name and
    // the object is a TClonesArray, rename it to the branch name.
73  if (strcmp(fInputObjects[fNObjects]->GetName(), fBranchNames[ibrn].c_str()))
74  if (strcmp(fInputObjects[fNObjects]->ClassName(), "TClonesArray") == 0)
75  ((TClonesArray*)fInputObjects[fNObjects])->SetName(fBranchNames[ibrn].c_str());
76  fNObjects++;
77  }
78  }
79 
    // A negative "max-index" means no explicit limit was configured:
    // fall back to the event count reported by the source.
80  if (fMaxIndex < 0)
81  fMaxIndex = fSource->CheckMaxEventNo();
82  LOG(info) << "Input source has " << fMaxIndex << " events.";
83 }
84 
86 {
    // PreRun body (the signature line is absent from this listing; the name is
    // taken from the log message below).
    // Starts the acknowledgement listener thread, but only when an ack channel
    // name has been configured. The same condition guards the join() in the
    // PostRun body, so the thread is joined exactly when it was started.
87  if (fAckChannelName != "") {
88  fAckListener = thread(&FairMQPixelSampler::ListenForAcks, this);
89  }
90 
91  LOG(info) << "FairMQPixelSampler::PreRun() finished!";
92 }
93 
95 {
    // Body of a bool-returning step function (the signature line is absent from
    // this listing; presumably ConditionalRun() - TODO confirm).
    // Each call reads one event from the source and sends it as one multipart
    // message. Returns true after an event was handed to Send(), false when the
    // event limit is reached or the source reports a read failure.

    // Stop once the configured/derived number of events has been processed.
    // NOTE(review): the comparison is '==', so a counter that starts above
    // fMaxIndex would never hit this exit - confirm that cannot happen.
96  if (fEventCounter == fMaxIndex)
97  return false;
98 
99  Int_t readEventReturn = fSource->ReadEvent(fEventCounter);
100 
    // Any non-zero result from ReadEvent ends the run.
101  if (readEventReturn != 0)
102  return false;
103 
104  FairMQParts parts;
105 
    // One message part per activated input object, in activation order.
106  for (int iobj = 0; iobj < fNObjects; iobj++) {
107  FairMQMessagePtr mess(NewMessage());
108  Serialize<RootSerializer>(*mess, fInputObjects[iobj]);
109  parts.AddPart(std::move(mess));
110  }
111 
    // NOTE(review): the result of Send() is ignored, so the event counter
    // advances even if sending failed - confirm this is intended.
112  Send(parts, fOutputChannelName);
113 
114  fEventCounter++;
115 
116  return true;
117 }
118 
120 {
    // PostRun body (the signature line is absent from this listing; the name is
    // taken from the log message below).
    // Waits for the acknowledgement listener thread to finish. The guard is the
    // same condition under which the PreRun body starts the thread.
    // NOTE(review): join() blocks until the listener loop has counted fMaxIndex
    // acknowledgements - verify the listener can terminate if fewer arrive.
121  if (fAckChannelName != "") {
122  fAckListener.join();
123  }
124 
125  LOG(info) << "PostRun() finished!";
126 }
127 
129 {
    // ListenForAcks body (the signature line is absent from this listing; this
    // is the function run by the fAckListener thread).
    // Counts acknowledgement messages received on the ack channel until the
    // count reaches fMaxIndex, then logs the total.
130  if (fAckChannelName != "") {
131  Long64_t numAcks = 0;
    // The do/while always attempts at least one Receive, even if fMaxIndex is 0.
132  do {
133  unique_ptr<FairMQMessage> ack(NewMessage());
    // Only a non-negative Receive result is counted as an acknowledgement.
    // NOTE(review): a negative result neither counts nor ends the loop; if
    // Receive keeps failing (e.g. presumably after the device is interrupted)
    // this loop would spin without terminating - confirm Receive's error
    // semantics and whether an exit path is needed.
134  if (Receive(ack, fAckChannelName) >= 0) {
135  numAcks++;
136  }
137  } while (numAcks < fMaxIndex);
138 
139  LOG(info) << "Acknowledged " << numAcks << " messages.";
140  }
141 }
142 
void AddFile(TString FileName)
virtual Int_t ReadEvent(UInt_t=0)=0
virtual Bool_t Init()=0
virtual Int_t CheckMaxEventNo(Int_t=0)
Definition: FairSource.h:50
virtual Bool_t ActivateObject(TObject **, const char *)
Definition: FairSource.h:43
virtual bool ConditionalRun()