20 #include <FairMQLogger.h>
21 #include <TClonesArray.h>
28 : fInputChannelName(
"data-in")
29 , fOutputChannelName(
"data-out")
30 , fNofPartsPerEventMap()
35 , fNofReceivedMessages(0)
38 , fEventHeader(nullptr)
45 bool printInfo =
false;
46 int nofReceivedParts = 0;
48 fNofReceivedMessages++;
51 TClonesArray* tempArrays[10];
55 for (
int ipart = 0; ipart < parts.Size(); ipart++) {
57 Deserialize<RootSerializer>(*parts.At(ipart), tempObject);
58 if (strcmp(tempObject->GetName(),
"EventHeader.") == 0) {
65 fEvRIPair.second = fEventHeader->
GetRunId();
66 fEvRIPartTrio.first = fEvRIPair;
67 fEvRIPartTrio.second = fEventHeader->
GetPartNo();
69 if (fObjectMap.find(fEvRIPartTrio) != fObjectMap.end()) {
70 LOG(info) <<
"FairMQPixelMerger::Run(), shouldn't happen, already got objects for part "
71 << fEvRIPartTrio.second <<
", event " << fEvRIPair.first <<
", run " << fEvRIPair.second
72 <<
". Skipping this message!!!";
73 nofReceivedParts = -1;
77 auto it2 = fNofPartsPerEventMap.find(fEvRIPair);
78 if (it2 == fNofPartsPerEventMap.end()) {
80 fNofPartsPerEventMap[fEvRIPair] = 1;
84 nofReceivedParts = it2->second;
88 tempArrays[nofArrays] = (TClonesArray*)tempObject;
92 if (nofReceivedParts == -1)
96 if (nofReceivedParts != fNofParts) {
101 pair<pair<pair<int, int>,
int>, TObject*>(fEvRIPartTrio, static_cast<TObject*>(fEventHeader)));
102 for (
int iarray = 0; iarray < nofArrays; iarray++) {
106 pair<pair<pair<int, int>,
int>, TObject*>(fEvRIPartTrio, static_cast<TObject*>(tempArrays[iarray])));
111 << fEventHeader->
GetPartNo() <<
"] Received: " << fNofReceivedMessages
112 <<
" // Buffered: " << fObjectMap.size() <<
" // Sent: " << fNofSentMessages <<
" <<";
115 int currentEventPart = fEventHeader->
GetPartNo();
116 for (
int iarray = 0; iarray < nofArrays; iarray++) {
119 TClonesArray* arrayToAdd;
121 for (
int ieventpart = 0; ieventpart < fNofParts; ieventpart++) {
122 if (ieventpart == currentEventPart)
124 fEvRIPartTrio.second = ieventpart;
125 fRet = fObjectMap.equal_range(fEvRIPartTrio);
126 for (
auto& it = fRet.first; it != fRet.second; ++it) {
127 if (tempArrays[iarray]->GetName() == it->second->GetName()) {
128 arrayToAdd = (TClonesArray*)it->second;
129 tempArrays[iarray]->AbsorbObjects(arrayToAdd);
135 for (
int ieventpart = 0; ieventpart < fNofParts; ieventpart++) {
136 if (ieventpart == currentEventPart)
138 fEvRIPartTrio.second = ieventpart;
139 fRet = fObjectMap.equal_range(fEvRIPartTrio);
140 fObjectMap.erase(fRet.first, fRet.second);
143 FairMQMessagePtr messageTCA[10];
144 FairMQParts partsOut;
146 FairMQMessagePtr messFEH(NewMessage());
147 Serialize<RootSerializer>(*messFEH, fEventHeader);
148 partsOut.AddPart(std::move(messFEH));
149 for (
int iarray = 0; iarray < nofArrays; iarray++) {
150 messageTCA[iarray] = NewMessage();
151 Serialize<RootSerializer>(*messageTCA[iarray], tempArrays[iarray]);
152 partsOut.AddPart(std::move(messageTCA[iarray]));
154 Send(partsOut, fOutputChannelName);
158 << fEventHeader->
GetPartNo() <<
"] Received: " << fNofReceivedMessages
159 <<
" // Buffered: " << fObjectMap.size() <<
" // Sent: " << fNofSentMessages <<
" <<";
bool MergeData(FairMQParts &, int)
virtual ~FairMQPixelMerger()