FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQPrimaryGeneratorDevice.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
16 
17 #include "FairMCEventHeader.h"
18 #include "FairMCSplitEventHeader.h"
19 #include "FairPrimaryGenerator.h"
20 #include "FairStack.h"
21 #include "RootSerializer.h"
22 
23 #include <FairMQLogger.h>
24 #include <FairMQMessage.h>
25 #include <Rtypes.h>
26 #include <TClonesArray.h>
27 #include <utility> // move
28 
29 using namespace std;
30 
32  : fGeneratorChannelName("primariesChannel")
33  , fAckChannelName("")
34  , fRunConditional(true)
35  , fPrimaryGenerator(nullptr)
36  , fMCEventHeader(nullptr)
37  , fStack(nullptr)
38  , fNofEvents(10)
39  , fEventCounter(0)
40  , fChunkSize(0)
41  , fChunkPointer(0)
42  , fAckListener()
43 {}
44 
46 {
47  fAckChannelName = fConfig->GetValue<std::string>("ack-channel");
48 
49  fStack = new FairStack();
50  fMCEventHeader = new FairMCEventHeader();
51  fPrimaryGenerator->SetEvent(fMCEventHeader);
52  fPrimaryGenerator->Init();
53 
54  if (!fRunConditional)
55  OnData(fGeneratorChannelName, &FairMQPrimaryGeneratorDevice::Reply);
56  if (fChunkSize < 0)
57  fChunkSize = 0;
58 }
59 
61 {
62  if (fAckChannelName != "") {
63  fAckListener = thread(&FairMQPrimaryGeneratorDevice::ListenForAcks, this);
64  }
65 }
66 
68 {
69  if (!fRunConditional)
70  return false;
71  return GenerateAndSendData();
72 }
73 
74 bool FairMQPrimaryGeneratorDevice::Reply([[gnu::unused]] FairMQMessagePtr& mPtr, [[gnu::unused]] int /*index*/)
75 {
76  return GenerateAndSendData();
77 }
78 
79 bool FairMQPrimaryGeneratorDevice::GenerateAndSendData()
80 {
81  if (fChunkPointer == 0) {
82  // LOG(INFO) << "Reseting fStack and generating new event!!!";
83  fStack->Reset();
84  fPrimaryGenerator->GenerateEvent(fStack);
85  ++fEventCounter;
86  }
87  if (fEventCounter > fNofEvents)
88  return false;
89 
90  FairMQParts parts;
91 
92  // even if sending in chunks is set, send all of the primaries anyway, the transporter takes care of transporting
93  // needed primaries create FairMCEventHeader, misuse not-yet-set fRunID to store begin
94  TClonesArray* prims = fStack->GetListOfParticles();
95 
97  0, fEventCounter, 1, 0); // RunId will be provided in the Transport from ParameterServer
98  meh->SetNPrim(prims->GetEntries());
99  if (fChunkSize != 0) {
100  meh->SetNofChunks((UInt_t)(prims->GetEntries() / fChunkSize));
101  meh->SetChunkStart(fChunkPointer);
102  meh->SetNPrim(fChunkPointer + fChunkSize);
103  if (fChunkPointer + fChunkSize > prims->GetEntries())
104  meh->SetNPrim(prims->GetEntries() - fChunkPointer);
105  }
106 
107  FairMQMessagePtr messEH(NewMessage());
108  Serialize<RootSerializer>(*messEH, meh);
109  parts.AddPart(std::move(messEH));
110 
111  FairMQMessagePtr mess(NewMessage());
112  Serialize<RootSerializer>(*mess, prims);
113  parts.AddPart(std::move(mess));
114 
115  // LOG(INFO) << "sending event " << fEventCounter << ", chunk starts at " << fChunkPointer;
116  if (Send(parts, fGeneratorChannelName) > 0) {}
117 
118  int numberofparticles = (int)prims->GetEntries();
119 
120  if (fChunkSize != 0) { // should send events in chunks with maximum size of fChunkSize
121  // the whole work should be done after
122  fChunkPointer += fChunkSize;
123  if (fChunkPointer >= numberofparticles) { // it means that already sent all primaries from this event
124  fChunkPointer = 0; // this will cause the reset of the stack and generating new event
125  }
126  }
127 
128  return true;
129 }
130 
132 {
133  if (fAckChannelName != "") {
134  fAckListener.join();
135  }
136 }
137 
139 {
140  if (fAckChannelName != "") {
141  Long64_t numAcks = 0;
142  do {
143  FairMQMessagePtr ack(NewMessage());
144  if (Receive(ack, fAckChannelName) >= 0) {
145  LOG(info) << "RECEIVED ACK!";
146  numAcks++;
147  }
148  } while (numAcks < fNofEvents);
149 
150  LOG(info) << "Acknowledged " << numAcks << " messages.";
151  }
152 }
153 
155 
bool Reply(FairMQMessagePtr &, int)
virtual void Reset()
Definition: FairStack.cxx:309
TClonesArray * GetListOfParticles()
Definition: FairStack.h:200
void SetNPrim(Int_t nPrim)
virtual Bool_t GenerateEvent(FairGenericStack *pStack)
void SetEvent(FairMCEventHeader *event)