22 #include <FairMQLogger.h>
23 #include <FairMQMessage.h>
25 #include <TClonesArray.h>
34 , fOutputChannelName(
"data-out")
49 fFileNames = fConfig->GetValue<std::vector<std::string>>(
"file-name");
50 fMaxIndex = fConfig->GetValue<int64_t>(
"max-index");
51 fBranchNames = fConfig->GetValue<std::vector<std::string>>(
"branch-name");
52 fOutputChannelName = fConfig->GetValue<std::string>(
"out-channel");
53 fAckChannelName = fConfig->GetValue<std::string>(
"ack-channel");
57 if (fSource ==
nullptr) {
59 for (
unsigned int ifile = 1; ifile < fFileNames.size(); ifile++) {
65 LOG(info) <<
"Going to request " << fBranchNames.size() <<
" branches:";
66 for (
unsigned int ibrn = 0; ibrn < fBranchNames.size(); ibrn++) {
67 LOG(info) <<
" requesting branch \"" << fBranchNames[ibrn] <<
"\"";
68 int branchStat = fSource->
ActivateObject((TObject**)&fInputObjects[fNObjects],
69 fBranchNames[ibrn].c_str());
70 if (fInputObjects[fNObjects]) {
71 LOG(info) <<
"Activated object \"" << fInputObjects[fNObjects] <<
"\" with name \"" << fBranchNames[ibrn]
72 <<
"\" (" << branchStat <<
"), it got name: \"" << fInputObjects[fNObjects]->GetName() <<
"\"";
73 if (strcmp(fInputObjects[fNObjects]->GetName(), fBranchNames[ibrn].c_str()))
74 if (strcmp(fInputObjects[fNObjects]->ClassName(),
"TClonesArray") == 0)
75 ((TClonesArray*)fInputObjects[fNObjects])->SetName(fBranchNames[ibrn].c_str());
82 LOG(info) <<
"Input source has " << fMaxIndex <<
" events.";
87 if (fAckChannelName !=
"") {
91 LOG(info) <<
"FairMQPixelSampler::PreRun() finished!";
96 if (fEventCounter == fMaxIndex)
99 Int_t readEventReturn = fSource->
ReadEvent(fEventCounter);
101 if (readEventReturn != 0)
106 for (
int iobj = 0; iobj < fNObjects; iobj++) {
107 FairMQMessagePtr mess(NewMessage());
108 Serialize<RootSerializer>(*mess, fInputObjects[iobj]);
109 parts.AddPart(std::move(mess));
112 Send(parts, fOutputChannelName);
121 if (fAckChannelName !=
"") {
125 LOG(info) <<
"PostRun() finished!";
130 if (fAckChannelName !=
"") {
131 Long64_t numAcks = 0;
133 unique_ptr<FairMQMessage> ack(NewMessage());
134 if (Receive(ack, fAckChannelName) >= 0) {
137 }
while (numAcks < fMaxIndex);
139 LOG(info) <<
"Acknowledged " << numAcks <<
" messages.";
void AddFile(TString FileName)
virtual Int_t ReadEvent(UInt_t=0)=0
virtual ~FairMQPixelSampler()
virtual Int_t CheckMaxEventNo(Int_t=0)
virtual Bool_t ActivateObject(TObject **, const char *)
virtual bool ConditionalRun()