FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPixelMerger.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "FairMQPixelMerger.h"
16 
17 #include "PixelEventHeader.h"
18 #include "RootSerializer.h"
19 
20 #include <FairMQLogger.h>
21 #include <TClonesArray.h>
22 #include <TObject.h>
23 #include <cstring> // strcmp
24 
25 using namespace std;
26 
28  : fInputChannelName("data-in")
29  , fOutputChannelName("data-out")
30  , fNofPartsPerEventMap()
31  , fObjectMap()
32  , fEvRIPair()
33  , fEvRIPartTrio()
34  , fRet()
35  , fNofReceivedMessages(0)
36  , fNofSentMessages(0)
37  , fNofParts(3)
38  , fEventHeader(nullptr)
39 {}
40 
41 void FairMQPixelMerger::Init() { OnData(fInputChannelName, &FairMQPixelMerger::MergeData); }
42 
43 bool FairMQPixelMerger::MergeData(FairMQParts& parts, int /*index*/)
44 {
45  bool printInfo = false;
46  int nofReceivedParts = 0; // if set to -1, the data seems to be duplicated
47 
48  fNofReceivedMessages++;
49  // dataDuplicationFlag = false;
50  TObject* tempObject;
51  TClonesArray* tempArrays[10];
52  int nofArrays = 0;
53  // LOG(debug) <<
54  // "******************************************************************************************************";
55  for (int ipart = 0; ipart < parts.Size(); ipart++) {
56  tempObject = nullptr;
57  Deserialize<RootSerializer>(*parts.At(ipart), tempObject);
58  if (strcmp(tempObject->GetName(), "EventHeader.") == 0) {
59  fEventHeader = (PixelEventHeader*)tempObject;
60  // LOG(debug) << "GOT PART [" << fEventHeader->GetRunId() << "][" << fEventHeader->GetMCEntryNumber() <<
61  // "][" << fEventHeader->GetPartNo() << "]";
62 
63  // setting how many parts were received...
64  fEvRIPair.first = fEventHeader->GetMCEntryNumber();
65  fEvRIPair.second = fEventHeader->GetRunId();
66  fEvRIPartTrio.first = fEvRIPair;
67  fEvRIPartTrio.second = fEventHeader->GetPartNo();
68 
69  if (fObjectMap.find(fEvRIPartTrio) != fObjectMap.end()) {
70  LOG(info) << "FairMQPixelMerger::Run(), shouldn't happen, already got objects for part "
71  << fEvRIPartTrio.second << ", event " << fEvRIPair.first << ", run " << fEvRIPair.second
72  << ". Skipping this message!!!";
73  nofReceivedParts = -1;
74  break; // break the for(ipart) loop, as nothing else is left to do
75  }
76 
77  auto it2 = fNofPartsPerEventMap.find(fEvRIPair);
78  if (it2 == fNofPartsPerEventMap.end()) {
79  // LOG(debug) << "FIRST PART OF event " << fEvRIPair.first;
80  fNofPartsPerEventMap[fEvRIPair] = 1;
81  nofReceivedParts = 1;
82  } else {
83  it2->second += 1;
84  nofReceivedParts = it2->second;
85  }
86  // LOG(debug) << " got " << nofReceivedParts << " parts of event " << fEvRIPair.first;
87  } else {
88  tempArrays[nofArrays] = (TClonesArray*)tempObject;
89  nofArrays++;
90  }
91  } // end the for(ipart) loop, should have received TCAs in tempArrays and PixelEventHeader
92  if (nofReceivedParts == -1)
93  return true;
94 
95  // not all parts are there yet, have to put them in buffer
96  if (nofReceivedParts != fNofParts) {
97  // LOG(debug) << "not all parts are yet here... adding to (size = " << fObjectMap.size() << ")";
98  // LOG(debug) << "+" << fEventHeader->GetName() << "[" << fEvRIPartTrio.first.second << "][" <<
99  // fEvRIPartTrio.first.first << "][" << fEvRIPartTrio.second << "]";
100  fObjectMap.insert(
101  pair<pair<pair<int, int>, int>, TObject*>(fEvRIPartTrio, static_cast<TObject*>(fEventHeader)));
102  for (int iarray = 0; iarray < nofArrays; iarray++) {
103  // LOG(debug) << "+" << tempArrays[iarray]->GetName() << "[" << fEvRIPartTrio.first.second << "][" <<
104  // fEvRIPartTrio.first.first << "][" << fEvRIPartTrio.second << "]";
105  fObjectMap.insert(
106  pair<pair<pair<int, int>, int>, TObject*>(fEvRIPartTrio, static_cast<TObject*>(tempArrays[iarray])));
107  }
108  // LOG(debug) << " now we have fObjectMap (size = " << fObjectMap.size() << ")";
109  if (printInfo)
110  LOG(info) << ">> [" << fEventHeader->GetRunId() << "][" << fEventHeader->GetMCEntryNumber() << "]["
111  << fEventHeader->GetPartNo() << "] Received: " << fNofReceivedMessages
112  << " // Buffered: " << fObjectMap.size() << " // Sent: " << fNofSentMessages << " <<";
113  } else { // got all the parts of the event, have to combine and send message, consisting of objects from
114  // tempArrays
115  int currentEventPart = fEventHeader->GetPartNo();
116  for (int iarray = 0; iarray < nofArrays; iarray++) {
117  // LOG(debug) << "BEFORE ADDING, TCA \"" << tempArrays[iarray]->GetName() << "\" has " <<
118  // tempArrays[iarray]->GetEntries() << " entries.";
119  TClonesArray* arrayToAdd;
120 
121  for (int ieventpart = 0; ieventpart < fNofParts; ieventpart++) {
122  if (ieventpart == currentEventPart)
123  continue;
124  fEvRIPartTrio.second = ieventpart;
125  fRet = fObjectMap.equal_range(fEvRIPartTrio);
126  for (auto& it = fRet.first; it != fRet.second; ++it) {
127  if (tempArrays[iarray]->GetName() == it->second->GetName()) {
128  arrayToAdd = (TClonesArray*)it->second;
129  tempArrays[iarray]->AbsorbObjects(arrayToAdd);
130  // LOG(debug) << "FOUND ONE!, TCA has now " << tempArrays[iarray]->GetEntries() << " entries.";
131  }
132  }
133  }
134  }
135  for (int ieventpart = 0; ieventpart < fNofParts; ieventpart++) {
136  if (ieventpart == currentEventPart)
137  continue;
138  fEvRIPartTrio.second = ieventpart;
139  fRet = fObjectMap.equal_range(fEvRIPartTrio);
140  fObjectMap.erase(fRet.first, fRet.second);
141  }
142 
143  FairMQMessagePtr messageTCA[10];
144  FairMQParts partsOut;
145 
146  FairMQMessagePtr messFEH(NewMessage());
147  Serialize<RootSerializer>(*messFEH, fEventHeader);
148  partsOut.AddPart(std::move(messFEH));
149  for (int iarray = 0; iarray < nofArrays; iarray++) {
150  messageTCA[iarray] = NewMessage();
151  Serialize<RootSerializer>(*messageTCA[iarray], tempArrays[iarray]);
152  partsOut.AddPart(std::move(messageTCA[iarray]));
153  }
154  Send(partsOut, fOutputChannelName);
155  fNofSentMessages++;
156  if (printInfo)
157  LOG(info) << ">> [" << fEventHeader->GetRunId() << "][" << fEventHeader->GetMCEntryNumber() << "]["
158  << fEventHeader->GetPartNo() << "] Received: " << fNofReceivedMessages
159  << " // Buffered: " << fObjectMap.size() << " // Sent: " << fNofSentMessages << " <<";
160  }
161 
162  return true;
163 }
164 
Int_t GetMCEntryNumber()
bool MergeData(FairMQParts &, int)