FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQLmdSampler.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 /*
9  * File: FairMQLmdSampler.h
10  * Author: winckler
11  *
12  * Created on October 27, 2015, 2:07 PM
13  */
14 
15 #ifndef FAIRMQLMDSAMPLER_H
16 #define FAIRMQLMDSAMPLER_H
17 
18 extern "C"
19 {
20 #include "f_evt.h"
21 #include "s_bufhe_swap.h"
22 #include "s_filhe_swap.h"
23 }
24 
25 #include <FairMQDevice.h>
26 #include <FairMQLogger.h>
27 #include <FairMQMessage.h>
28 #include <boost/filesystem.hpp>
29 #include <map>
30 #include <string>
31 #include <tuple>
32 #include <vector>
33 
34 class FairMQLmdSampler : public FairMQDevice
35 {
36  public:
38  : fCurrentFile(0)
39  , fNEvent(0)
40  , fCurrentEvent(0)
41  , fFileNames()
42  , fInputChannel()
43  , fEvent(nullptr)
44  , fBuffer(nullptr)
45  , fEventData(nullptr)
46  , fSubEvent(nullptr)
47  , fInfoHeader(nullptr)
48  , fStop(false)
49  , fMsgCounter(0)
50  , fSubEventChanMap()
51  , fFilename()
52  , fType(0)
53  , fSubType(0)
54  , fProcId(0)
55  , fSubCrate(0)
56  , fControl(0)
57  , fChanName()
58  {}
59 
60  FairMQLmdSampler(const FairMQLmdSampler&) = delete;
62 
63  virtual ~FairMQLmdSampler() {}
64 
65  void AddSubEvtKey(short type,
66  short subType,
67  short procid,
68  short subCrate,
69  short control,
70  const std::string& channelName)
71  {
72  SubEvtKey key(type, subType, procid, subCrate, control);
73  if (fSubEventChanMap.count(key)) {
74  LOG(warn) << "FairMQLmdSampler : subevent header key '(" << type << "," << subType << "," << procid << ","
75  << subCrate << "," << control << ")' has already been defined with channel name '"
76  << fSubEventChanMap.at(key)
77  << "'. it will be overwritten with new channel name = " << channelName;
78  }
79  fSubEventChanMap[key] = channelName;
80  }
81 
82  void AddDir(const std::string& dir)
83  {
84  boost::filesystem::path directory = dir;
85 
86  if (boost::filesystem::exists(directory)) {
87  boost::filesystem::directory_iterator end_itr;
88 
89  for (boost::filesystem::directory_iterator itr(directory); itr != end_itr; ++itr) {
90  if (boost::filesystem::is_regular_file(itr->path())) {
91  std::string currentFile = itr->path().string();
92  AddFile(currentFile);
93  }
94  }
95  } else {
96  LOG(warn) << "FairMQLmdSampler: directory '" << directory.string() << "' not found";
97  }
98  }
99  void AddFile(const std::string& fileName)
100  {
101  boost::filesystem::path filepath = fileName;
102  if (boost::filesystem::exists(filepath)) {
103  fFileNames.push_back(fileName);
104  } else {
105  LOG(warn) << "FairMQLmdSampler: file '" << fileName << "' not found";
106  }
107  }
108 
109  protected:
110  void InitTask()
111  {
112  fFilename = fConfig->GetValue<std::string>("input-file-name");
113  fType = fConfig->GetValue<short>("lmd-type");
114  fSubType = fConfig->GetValue<short>("lmd-sub-type");
115  fProcId = fConfig->GetValue<short>("lmd-proc-id");
116  fSubCrate = fConfig->GetValue<short>("lmd-sub-crate");
117  fControl = fConfig->GetValue<short>("lmd-control");
118  fChanName = fConfig->GetValue<std::string>("lmd-chan-name");
119 
120  AddFile(fFilename);
121  // combination of sub-event header value = one special channel
122  // this channel MUST be defined in the json file for the MQ configuration
123  AddSubEvtKey(fType, fSubType, fProcId, fSubCrate, fControl, fChanName);
124 
125  if (fFileNames.empty()) {
126  throw std::runtime_error(std::string("FairMQLmdSampler::InitTask: No files provided"));
127  }
128 
129  std::string name = fFileNames.at(fCurrentFile);
130  if (!OpenNextFile(name)) {
131  throw std::runtime_error(std::string("FairMQLmdSampler::InitTask: cannot open file ") + name);
132  }
133 
134  fCurrentFile += 1;
135  // Init Counters
136  fNEvent = 0;
137  fCurrentEvent = 0;
138  }
139  void Run()
140  {
141  while (!NewStatePending()) //&& !fStop)
142  {
143  if (1 == ReadEvent()) {
144  break;
145  }
146  }
147  LOG(info) << "Sent " << fMsgCounter << " messages.";
148  }
149 
150  int ReadEvent()
151  {
152  void* evtptr = &fEvent;
153  void* buffptr = &fBuffer;
154 
155  // INTS4 f_evt_get_event(s_evt_channel*, INTS4**, INTS4**); // -> in f_evt.h
156  /*- GETEVT__SUCCESS=0 : success. */
157  /*- GETEVT__FRAGMENT=1: Event fragment found. */
158  /*- GETEVT__NOMORE=3 : No more events. */
159  /*- GETEVT__RDERR=6 : read server or file error */
160  /*- GETEVT__TIMEOUT=9 : when enabled by f_evt_timeout */
161  int status = f_evt_get_event(&fInputChannel, static_cast<INTS4**>(evtptr), static_cast<INTS4**>(buffptr));
162  // int fuEventCounter = fEvent->l_count;
163  // int fCurrentMbsEventNo = fuEventCounter;
164 
165  // LOG(debug) << "STATUS = " << status;
166  if (GETEVT__SUCCESS != status) // if f_evt_get_event not successfull close if nomore evt or look to another
167  // file and start again
168  {
169  // LOG(debug) << "FairMQLmdSampler::ReadEvent()";
170 
171  CHARS* sErrorString = NULL;
172  f_evt_error(status, sErrorString, 0);
173 
174  if (GETEVT__NOMORE == status) {
175  Close();
176  }
177 
178  // fCurrentFile incremented in InitTask once
179  if (fCurrentFile >= static_cast<int>(fFileNames.size())) {
180  fStop = true;
181  return 1;
182  }
183 
184  std::string name = fFileNames.at(fCurrentFile);
185  if (!OpenNextFile(name)) {
186  return 1;
187  } else {
188  fCurrentFile += 1;
189  return ReadEvent();
190  }
191  }
192 
193  // Store Start Times
194  // if (fCurrentEvent==0 )
195  // Unpack((int*)fBuffer, sizeof(s_bufhe), -4, -4, -4, -4, -4);
196  // Decode event header
197  // bool result = false;
198  /*bool result = */
199  // Unpack((int*)fEvent, sizeof(s_ve10_1), -2, -2, -2, -2, -2);
200 
201  int nrSubEvts = f_evt_get_subevent(fEvent, 0, NULL, NULL, NULL);
202  int sebuflength;
203  short setype;
204  short sesubtype;
205  short seprocid;
206  short sesubcrate;
207  short secontrol;
208 
209  // LOG(debug) << "FairMQLmdSampler::ReadEvent => Found " << nrSubEvts << " Sub-event ";
210  // if (fCurrentEvent%10000==0)
211  // cout << " -I- LMD_ANA: evt# " << fCurrentEvent << " n_subevt# " << nrSubEvts << " evt processed# " <<
212  // fNEvent << " : " << fEvent->l_count << endl;
213 
214  for (int i = 1; i <= nrSubEvts; i++) {
215  void* SubEvtptr = &fSubEvent;
216  void* EvtDataptr = &fEventData;
217  int nrlongwords;
218  status = f_evt_get_subevent(
219  fEvent, i, static_cast<int**>(SubEvtptr), static_cast<int**>(EvtDataptr), &nrlongwords);
220 
221  if (status) {
222  return 1;
223  }
224 
225  sebuflength = nrlongwords;
226  setype = fSubEvent->i_type;
227  sesubtype = fSubEvent->i_subtype;
228  seprocid = fSubEvent->i_procid;
229  sesubcrate = fSubEvent->h_subcrate;
230  secontrol = fSubEvent->h_control;
231 
232  // cout << setype << " " << sesubtype << " " << seprocid << " " << sesubcrate << " " << secontrol <<
233  // endl;
234 
235  // Data to send : fEventData
236  SubEvtKey key(setype, sesubtype, seprocid, sesubcrate, secontrol);
237 
238  if (!fSubEventChanMap.count(key)) {
239  // LOG(debug) << "FairMQLmdSampler::ReadEvent: sub-event key not registered";
240  } else {
241  // LOG(debug) << "array size = " << sebuflength;
242  // LOG(debug) << "fEventData = " << *fEventData;
243 
244  std::string chanName = fSubEventChanMap.at(key);
245  // LOG(debug) << "chanName = " << chanName;
246 
247  FairMQParts parts;
248 
249  // send header
250  // std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(fSubEvent, sizeof(fSubEvent),
251  // free_buffer, nullptr)); fChannels.at(chanName).at(0).SendPart(header);
252 
253  int* arraySize = new int(sebuflength);
254 
255  parts.AddPart(NewMessage(
256  arraySize,
257  sizeof(int),
258  [](void* /*data*/, void* hint) { delete static_cast<int*>(hint); },
259  arraySize));
260  parts.AddPart(NewMessage(
261  fEventData,
262  sebuflength,
263  [](void* /*data*/, void* /*hint*/) { /*LOG(debug) << "empty deleter";*/ },
264  nullptr));
265  Send(parts, chanName);
266  fMsgCounter++;
267  /*
268  if (Unpack(fEventData, sebuflength,
269  setype, sesubtype,
270  seprocid, sesubcrate, secontrol))
271  {
272  result = true;
273  }
274  */
275  }
276  }
277 
278  // Increment evt counters.
279  fNEvent++;
280  fCurrentEvent++;
281 
282  /*if (! result)
283  {
284  return 2;
285  }*/
286 
287  return 0;
288  }
289  bool OpenNextFile(const std::string& fileName)
290  {
291  int inputMode = GETEVT__FILE;
292  void* headptr = &fInfoHeader;
293  INTS4 status;
294 
295  LOG(info) << "File " << fileName << " will be opened.";
296 
297  status = f_evt_get_open(
298  inputMode, const_cast<char*>(fileName.c_str()), &fInputChannel, static_cast<char**>(headptr), 1, 1);
299 
300  if (status) {
301  LOG(error) << "File " << fileName << " opening failed.";
302  return false;
303  }
304 
305  LOG(info) << "File " << fileName << " opened.";
306 
307  // Decode File Header
308  // bool result = Unpack((int*)fInfoHeader, sizeof(s_filhe), -4, -4, -4, -4, -4);
309 
310  return true;
311  }
312 
313  void Close()
314  {
315  f_evt_get_close(&fInputChannel);
316  // Unpack((Int_t*)fBuffer, sizeof(s_bufhe), -4, -4, -4, -4, -4);
317  fCurrentEvent = 0;
318  }
319 
320  private:
321  int fCurrentFile;
322  int fNEvent;
323  int fCurrentEvent;
324  std::vector<std::string> fFileNames;
325  s_evt_channel fInputChannel;
326  s_ve10_1* fEvent;
327  s_bufhe* fBuffer;
328  int* fEventData;
329  s_ves10_1* fSubEvent;
330  s_filhe* fInfoHeader;
331  bool fStop;
332  int fMsgCounter;
333  typedef std::tuple<short, short, short, short, short> SubEvtKey;
334  std::map<SubEvtKey, std::string> fSubEventChanMap;
335 
336  std::string fFilename;
337  short fType;
338  short fSubType;
339  short fProcId;
340  short fSubCrate;
341  short fControl;
342  std::string fChanName;
343 };
344 
345 #endif /* !FAIRMQLMDSAMPLER_H */
#define GETEVT__SUCCESS
Definition: f_evt.h:130
INTS4 f_evt_get_subevent(s_ve10_1 *ps_ve10_1, INTS4 l_subevent, INTS4 **pl_se, INTS4 **pl_d, INTS4 *pl_lwords)
Definition: f_evt.c:228
INTS4 f_evt_get_close(s_evt_channel *ps_chan)
Definition: f_evt.c:1189
CHARS h_subcrate
Definition: s_ves10_1.h:35
INTS4 f_evt_get_open(INTS4 l_mode, CHARS *pc_server, s_evt_channel *ps_chan, CHARS **ps_info, INTS4 l_sample, INTS4 l_param)
Definition: f_evt.c:555
INTS2 i_procid
Definition: s_ves10_1.h:34
INTS4 f_evt_error(INTS4 l_error, CHARS *pc_dest, INTS4 l_out)
Definition: f_evt.c:1756
#define GETEVT__NOMORE
Definition: f_evt.h:133
void AddSubEvtKey(short type, short subType, short procid, short subCrate, short control, const std::string &channelName)
int INTS4
Definition: typedefs.h:21
FairMQLmdSampler operator=(const FairMQLmdSampler &)=delete
bool OpenNextFile(const std::string &fileName)
void AddFile(const std::string &fileName)
INTS2 i_type
Definition: s_ves10_1.h:32
virtual ~FairMQLmdSampler()
char CHARS
Definition: typedefs.h:15
INTS4 f_evt_get_event(s_evt_channel *ps_chan, INTS4 **ppl_buffer, INTS4 **ppl_goobuf)
Definition: f_evt.c:987
#define GETEVT__FILE
Definition: f_evt.h:119
INTS2 i_subtype
Definition: s_ves10_1.h:33
CHARS h_control
Definition: s_ves10_1.h:36
void AddDir(const std::string &dir)