FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQChunkMerger.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "FairMQChunkMerger.h"
16 
17 #include "FairMCPoint.h"
18 #include "FairMCSplitEventHeader.h"
19 #include "FairMCTrack.h"
20 #include "RootSerializer.h"
21 
22 #include <FairMQLogger.h>
23 #include <TClonesArray.h>
24 #include <TObject.h>
25 #include <algorithm>
26 #include <cstring>
27 #include <vector>
28 
29 using namespace std;
30 
32  : FairMQDevice()
33  , fInputChannelName("data-in")
34  , fOutputChannelName("data-out")
35  , fNofPartsPerEventMap()
36  , fObjectMap()
37  , fEvRIPair()
38  , fEvCOPair()
39  , fRet()
40  , fNofReceivedMessages(0)
41  , fNofSentMessages(0)
42  , fNofParts(3)
43  , fMCSplitEventHeader(nullptr)
44 {}
45 
47 {
48  fInputChannelName = fConfig->GetValue<std::string>("in-channel");
49  fOutputChannelName = fConfig->GetValue<std::string>("out-channel");
50 
51  OnData(fInputChannelName, &FairMQChunkMerger::MergeData);
52 }
53 
54 bool FairMQChunkMerger::MergeData(FairMQParts& parts, int /*index*/)
55 {
56  bool printInfo = false;
57  int nofReceivedParts = 0; // if set to -1, the data seems to be duplicated
58  int nofExpectedParts = 1;
59 
60  fNofReceivedMessages++;
61  // dataDuplicationFlag = false;
62 
63  std::vector<TClonesArray*> tcaVector;
64  for (int ipart = 0; ipart < parts.Size(); ++ipart) {
65  TObject* tempObject = nullptr;
66  Deserialize<RootSerializer>(*parts.At(ipart), tempObject);
67 
68  // LOG(INFO) << "Got object " << tempObject->ClassName() << " named " << tempObject->GetName();
69  if (strcmp(tempObject->GetName(), "MCEventHeader.") == 0) {
70  fMCSplitEventHeader = dynamic_cast<FairMCSplitEventHeader*>(tempObject);
71  // LOG(info) << "GOT PART [" << fMCSplitEventHeader->GetRunID() << "][" <<
72  // fMCSplitEventHeader->GetEventID() << "][" << fMCSplitEventHeader->GetChunkStart() << "]";
73  fEvRIPair.first = fMCSplitEventHeader->GetEventID();
74  fEvRIPair.second = fMCSplitEventHeader->GetRunID();
75  fEvCOPair.first = fMCSplitEventHeader->GetChunkStart();
76  nofExpectedParts = fMCSplitEventHeader->GetNofChunks();
77 
78  fRet = fObjectMap.equal_range(fEvRIPair);
79  for (auto& it = fRet.first; it != fRet.second; ++it) {
80  // LOG(info) << "comparing " << it->second.first << " and " << fEvCOPair.first;
81  if (it->second.first == fMCSplitEventHeader->GetChunkStart())
82  LOG(fatal) << "got part starting at " << fEvCOPair.first << " again!!!";
83  }
84 
85  auto it2 = fNofPartsPerEventMap.find(fEvRIPair);
86  if (it2 == fNofPartsPerEventMap.end()) {
87  fNofPartsPerEventMap[fEvRIPair] = 1;
88  nofReceivedParts = 1;
89  } else {
90  it2->second += 1;
91  nofReceivedParts = it2->second;
92  }
93  } else {
94  tcaVector.push_back(dynamic_cast<TClonesArray*>(tempObject));
95  }
96  }
97 
98  // not all parts are there yet, have to put them in buffer
99  if (nofReceivedParts != nofExpectedParts) {
100  // LOG(info) << "not all parts are yet here (got " << nofReceivedParts << " out of " << nofExpectedParts <<
101  // ")... adding to (size = " << fObjectMap.size() << ")"; LOG(info) << "+" << fMCSplitEventHeader->GetName() <<
102  // "[" << fEvRIPair.second << "][" << fEvRIPair.first << "][" << fEvCOPair.first << "]";
103  for (int iarray = 0; iarray < tcaVector.size(); ++iarray) {
104  LOG(debug) << "+ [" << fEvRIPair.second << "][" << fEvRIPair.first << "][" << fEvCOPair.first << "] "
105  << tcaVector[iarray]->GetName();
106  fEvCOPair.second = (dynamic_cast<TObject*>(tcaVector[iarray]));
107  fObjectMap.insert(std::pair<std::pair<int, int>, std::pair<int, TObject*>>(fEvRIPair, fEvCOPair));
108  }
109  // LOG(info) << " now we have fObjectMap (size = " << fObjectMap.size() << ")";
110  if (printInfo)
111  LOG(info) << ">> [" << fMCSplitEventHeader->GetRunID() << "][" << fMCSplitEventHeader->GetEventID() << "]["
112  << fMCSplitEventHeader->GetChunkStart() << "] Received: " << fNofReceivedMessages
113  << " // Buffered: " << fObjectMap.size() << " // Sent: " << fNofSentMessages << " <<";
114  } else {
115  // got all the parts of the event, have to combine and send message, consisting of objects from fObjectMap
116  [[gnu::unused]] int currentEventPart = fMCSplitEventHeader->GetChunkStart();
117  fRet = fObjectMap.equal_range(fEvRIPair);
118  std::vector<int> trackShift;
119  LOG(debug) << "- [" << fEvRIPair.second << "][" << fEvRIPair.first << "][ALL]";
120  for (int iarray = 0; iarray < tcaVector.size(); ++iarray) {
121  if (strcmp(tcaVector[iarray]->GetName(), "MCTrack") != 0)
122  continue; // want only MCTrack array to renumber tracks and get track shifts...
123  // LOG(info) << "BEFORE ADDING, TCA \"" << tcaVector[iarray]->GetName() << "\" has " <<
124  // tcaVector[iarray]->GetEntries() << " entries.";
125  TClonesArray* arrayToAdd;
126  for (auto& it = fRet.first; it != fRet.second; ++it) {
127  if (it->second.first == fMCSplitEventHeader->GetChunkStart())
128  continue;
129  if (strcmp(tcaVector[iarray]->GetName(), it->second.second->GetName()) == 0) {
130  trackShift.push_back(tcaVector[iarray]->GetEntries());
131  arrayToAdd = dynamic_cast<TClonesArray*>(it->second.second);
132  for (int iobj = 0; iobj < arrayToAdd->GetEntries(); ++iobj) {
133  FairMCTrack* temp = dynamic_cast<FairMCTrack*>(arrayToAdd->At(iobj));
134  if (temp->GetMotherId() >= 0) {
135  temp->SetMotherId(temp->GetMotherId() + trackShift.back());
136  }
137  }
138  tcaVector[iarray]->AbsorbObjects(arrayToAdd);
139  // LOG(info) << "FOUND ONE!, TCA has now " << tcaVector[iarray]->GetEntries() << " entries.";
140  }
141  }
142  }
143 
144  for (int iarray = 0; iarray < tcaVector.size(); ++iarray) {
145  if (strcmp(tcaVector[iarray]->GetName(), "MCTrack") == 0)
146  continue; // MCTrack already done, renumber all _other_ arrays...
147  // LOG(info) << "BEFORE ADDING, TCA \"" << tcaVector[iarray]->GetName() << "\" has " <<
148  // tcaVector[iarray]->GetEntries() << " entries.";
149  int addedArray = 0;
150  TClonesArray* arrayToAdd;
151 
152  for (auto& it = fRet.first; it != fRet.second; ++it) {
153  if (it->second.first == fMCSplitEventHeader->GetChunkStart())
154  continue;
155  if (strcmp(tcaVector[iarray]->GetName(), it->second.second->GetName()) == 0) {
156  int objShift = trackShift[addedArray++];
157  // LOG(INFO) << "trying to add " << tcaVector[iarray]->GetName() << " and " <<
158  // it->second.second->GetName() << "(shift = " << objShift << ")";
159  arrayToAdd = dynamic_cast<TClonesArray*>(it->second.second);
160  for (int iobj = 0; iobj < arrayToAdd->GetEntries(); ++iobj) {
161  FairMCPoint* temp = dynamic_cast<FairMCPoint*>(arrayToAdd->At(iobj));
162  temp->SetTrackID(temp->GetTrackID() + objShift);
163  }
164  }
165  tcaVector[iarray]->AbsorbObjects(arrayToAdd);
166  // LOG(info) << "FOUND ONE!, TCA has now " << tcaVector[iarray]->GetEntries() << "
167  // entries.";
168  }
169  }
170  fObjectMap.erase(fRet.first, fRet.second);
171 
172  FairMQParts partsOut;
173 
174  fMCSplitEventHeader->SetNofChunks(1);
175  fMCSplitEventHeader->SetChunkStart(0);
176 
177  FairMQMessagePtr messEH(NewMessage());
178  Serialize<RootSerializer>(*messEH, fMCSplitEventHeader);
179  partsOut.AddPart(std::move(messEH));
180 
181  for (int iarray = 0; iarray < tcaVector.size(); ++iarray) {
182  FairMQMessagePtr mess(NewMessage());
183  Serialize<RootSerializer>(*mess, tcaVector[iarray]);
184  partsOut.AddPart(std::move(mess));
185  }
186  // LOG(info) << "created output message with " << partsOut.Size() << " parts.";
187  Send(partsOut, fOutputChannelName);
188  fNofSentMessages++;
189  if (printInfo)
190  LOG(info) << ">> [" << fMCSplitEventHeader->GetRunID() << "][" << fMCSplitEventHeader->GetEventID() << "]["
191  << fMCSplitEventHeader->GetChunkStart() << "] Received: " << fNofReceivedMessages
192  << " // Buffered: " << fObjectMap.size() << " // Sent: " << fNofSentMessages << " <<";
193  }
194 
195  for (auto it = fObjectMap.begin(); it != fObjectMap.end(); ++it) {
196  LOG(debug) << "= [" << it->first.second << "][" << it->first.first << "][" << it->second.first << "] "
197  << it->second.second->GetName();
198  }
199  return true;
200 }
201 
UInt_t GetRunID() const
Int_t GetMotherId() const
Definition: FairMCTrack.h:74
void SetMotherId(Int_t id)
Definition: FairMCTrack.h:95
bool MergeData(FairMQParts &, int)
Int_t GetTrackID() const
event identifier
Definition: FairMCPoint.h:58
virtual void SetTrackID(Int_t id)
Definition: FairMCPoint.h:74
UInt_t GetEventID() const
run identifier