22 #include <FairMQLogger.h>
23 #include <TClonesArray.h>
33 , fInputChannelName(
"data-in")
34 , fOutputChannelName(
"data-out")
35 , fNofPartsPerEventMap()
40 , fNofReceivedMessages(0)
43 , fMCSplitEventHeader(nullptr)
48 fInputChannelName = fConfig->GetValue<std::string>(
"in-channel");
49 fOutputChannelName = fConfig->GetValue<std::string>(
"out-channel");
56 bool printInfo =
false;
57 int nofReceivedParts = 0;
58 int nofExpectedParts = 1;
60 fNofReceivedMessages++;
63 std::vector<TClonesArray*> tcaVector;
64 for (
int ipart = 0; ipart < parts.Size(); ++ipart) {
65 TObject* tempObject =
nullptr;
66 Deserialize<RootSerializer>(*parts.At(ipart), tempObject);
69 if (strcmp(tempObject->GetName(),
"MCEventHeader.") == 0) {
73 fEvRIPair.first = fMCSplitEventHeader->
GetEventID();
74 fEvRIPair.second = fMCSplitEventHeader->
GetRunID();
78 fRet = fObjectMap.equal_range(fEvRIPair);
79 for (
auto& it = fRet.first; it != fRet.second; ++it) {
82 LOG(fatal) <<
"got part starting at " << fEvCOPair.first <<
" again!!!";
85 auto it2 = fNofPartsPerEventMap.find(fEvRIPair);
86 if (it2 == fNofPartsPerEventMap.end()) {
87 fNofPartsPerEventMap[fEvRIPair] = 1;
91 nofReceivedParts = it2->second;
94 tcaVector.push_back(dynamic_cast<TClonesArray*>(tempObject));
99 if (nofReceivedParts != nofExpectedParts) {
103 for (
int iarray = 0; iarray < tcaVector.size(); ++iarray) {
104 LOG(debug) <<
"+ [" << fEvRIPair.second <<
"][" << fEvRIPair.first <<
"][" << fEvCOPair.first <<
"] "
105 << tcaVector[iarray]->GetName();
106 fEvCOPair.second = (
dynamic_cast<TObject*
>(tcaVector[iarray]));
107 fObjectMap.insert(std::pair<std::pair<int, int>, std::pair<int, TObject*>>(fEvRIPair, fEvCOPair));
111 LOG(info) <<
">> [" << fMCSplitEventHeader->
GetRunID() <<
"][" << fMCSplitEventHeader->
GetEventID() <<
"]["
112 << fMCSplitEventHeader->
GetChunkStart() <<
"] Received: " << fNofReceivedMessages
113 <<
" // Buffered: " << fObjectMap.size() <<
" // Sent: " << fNofSentMessages <<
" <<";
116 [[gnu::unused]]
int currentEventPart = fMCSplitEventHeader->
GetChunkStart();
117 fRet = fObjectMap.equal_range(fEvRIPair);
118 std::vector<int> trackShift;
119 LOG(debug) <<
"- [" << fEvRIPair.second <<
"][" << fEvRIPair.first <<
"][ALL]";
120 for (
int iarray = 0; iarray < tcaVector.size(); ++iarray) {
121 if (strcmp(tcaVector[iarray]->GetName(),
"MCTrack") != 0)
125 TClonesArray* arrayToAdd;
126 for (
auto& it = fRet.first; it != fRet.second; ++it) {
127 if (it->second.first == fMCSplitEventHeader->
GetChunkStart())
129 if (strcmp(tcaVector[iarray]->GetName(), it->second.second->GetName()) == 0) {
130 trackShift.push_back(tcaVector[iarray]->GetEntries());
131 arrayToAdd =
dynamic_cast<TClonesArray*
>(it->second.second);
132 for (
int iobj = 0; iobj < arrayToAdd->GetEntries(); ++iobj) {
138 tcaVector[iarray]->AbsorbObjects(arrayToAdd);
144 for (
int iarray = 0; iarray < tcaVector.size(); ++iarray) {
145 if (strcmp(tcaVector[iarray]->GetName(),
"MCTrack") == 0)
150 TClonesArray* arrayToAdd;
152 for (
auto& it = fRet.first; it != fRet.second; ++it) {
153 if (it->second.first == fMCSplitEventHeader->
GetChunkStart())
155 if (strcmp(tcaVector[iarray]->GetName(), it->second.second->GetName()) == 0) {
156 int objShift = trackShift[addedArray++];
159 arrayToAdd =
dynamic_cast<TClonesArray*
>(it->second.second);
160 for (
int iobj = 0; iobj < arrayToAdd->GetEntries(); ++iobj) {
165 tcaVector[iarray]->AbsorbObjects(arrayToAdd);
170 fObjectMap.erase(fRet.first, fRet.second);
172 FairMQParts partsOut;
177 FairMQMessagePtr messEH(NewMessage());
178 Serialize<RootSerializer>(*messEH, fMCSplitEventHeader);
179 partsOut.AddPart(std::move(messEH));
181 for (
int iarray = 0; iarray < tcaVector.size(); ++iarray) {
182 FairMQMessagePtr mess(NewMessage());
183 Serialize<RootSerializer>(*mess, tcaVector[iarray]);
184 partsOut.AddPart(std::move(mess));
187 Send(partsOut, fOutputChannelName);
190 LOG(info) <<
">> [" << fMCSplitEventHeader->
GetRunID() <<
"][" << fMCSplitEventHeader->
GetEventID() <<
"]["
191 << fMCSplitEventHeader->
GetChunkStart() <<
"] Received: " << fNofReceivedMessages
192 <<
" // Buffered: " << fObjectMap.size() <<
" // Sent: " << fNofSentMessages <<
" <<";
195 for (
auto it = fObjectMap.begin(); it != fObjectMap.end(); ++it) {
196 LOG(debug) <<
"= [" << it->first.second <<
"][" << it->first.first <<
"][" << it->second.first <<
"] "
197 << it->second.second->GetName();
Int_t GetMotherId() const
virtual ~FairMQChunkMerger()
void SetMotherId(Int_t id)
bool MergeData(FairMQParts &, int)
Int_t GetTrackID() const
event identifier
virtual void SetTrackID(Int_t id)