FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQUnpacker.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 /*
9  * File: FairMQUnpacker.h
10  * Author: winckler
11  *
12  * Created on October 27, 2015, 2:07 PM
13  */
14 
15 #ifndef FAIRMQUNPACKER_H
16 #define FAIRMQUNPACKER_H
17 
18 #include "RootSerializer.h"
19 
20 #include <FairMQDevice.h>
21 #include <map>
22 #include <stdexcept>
23 #include <string>
24 #include <tuple>
25 
26 template<typename UnpackerType, typename SerializationType = RootSerializer>
27 class FairMQUnpacker : public FairMQDevice
28 {
29  public:
32  , fUnpacker(nullptr)
34  , fType(0)
35  , fSubType(0)
36  , fProcId(0)
37  , fSubCrate(0)
38  , fControl(0)
39  , fInputChanName()
40  , fOutputChanName()
41  {}
42 
43  FairMQUnpacker(const FairMQUnpacker&) = delete;
44  FairMQUnpacker operator=(const FairMQUnpacker&) = delete;
45 
46  virtual ~FairMQUnpacker() { delete fUnpacker; }
47 
48  void AddSubEvtKey(short type,
49  short subType,
50  short procid,
51  short subCrate,
52  short control,
53  const std::string& channelName)
54  {
55  if (fSubEventChanMap.size() > 0) {
56  LOG(error) << "Only one input channel allowed for this device";
57  } else {
58  SubEvtKey key(type, subType, procid, subCrate, control);
59  fInputChannelName = channelName;
60 
62  LOG(warn) << "FairMQLmdSampler : subevent header key '(" << type << "," << subType << "," << procid
63  << "," << subCrate << "," << control << ")' has already been defined. "
64  << "It will be overwritten with new channel name = " << fInputChannelName;
65  }
67  }
68  }
69 
70  protected:
71  void InitTask()
72  {
73  fType = fConfig->GetValue<short>("lmd-type");
74  fSubType = fConfig->GetValue<short>("lmd-sub-type");
75  fProcId = fConfig->GetValue<short>("lmd-proc-id");
76  fSubCrate = fConfig->GetValue<short>("lmd-sub-crate");
77  fControl = fConfig->GetValue<short>("lmd-control");
78  fInputChanName = fConfig->GetValue<std::string>("lmd-chan-name");
79  fOutputChanName = fConfig->GetValue<std::string>("out-chan-name");
80 
81  // combination of sub-event header value = one special channel
82  // this channel MUST be defined in the json file for the MQ configuration
83  AddSubEvtKey(fType, fSubType, fProcId, fSubCrate, fControl, fInputChanName);
84 
85  // check if subevt map is configured
86  if (fInputChannelName.empty() || fSubEventChanMap.size() == 0) {
87  throw std::runtime_error(std::string("Sub-event map not configured."));
88  }
89 
90  // check if given channel exist
91  if (!fChannels.count(fInputChannelName)) {
92  throw std::runtime_error(std::string("MQ-channel name '") + fInputChannelName
93  + "' does not exist. Check the MQ-channel configuration");
94  }
95 
96  short setype;
97  short sesubtype;
98  short seprocid;
99  short sesubcrate;
100  short secontrol;
101  std::tie(setype, sesubtype, seprocid, sesubcrate, secontrol) = fSubEventChanMap.at(fInputChannelName);
102  fUnpacker = new UnpackerType(setype, sesubtype, seprocid, sesubcrate, secontrol);
103  // fUnpacker->Init(); // a priori not needed -> only required for Registering in FairRootManager
104  }
105 
106  void Run()
107  {
108  FairMQChannel& inputChannel = fChannels.at(fInputChannelName).at(0);
109 
110  while (!NewStatePending()) {
111  FairMQMessagePtr msgSize(NewMessage());
112  FairMQMessagePtr msg(NewMessage());
113 
114  if (inputChannel.Receive(msgSize) >= 0) {
115  if (inputChannel.Receive(msg) >= 0) {
116  int dataSize = *(static_cast<int*>(msgSize->GetData()));
117  int* subEvtPtr = static_cast<int*>(msg->GetData());
118 
119  // LOG(debug) << "array size = " << dataSize;
120  // if (dataSize > 0)
121  // {
122  // LOG(debug) << "first element in array = " << *subEvtPtr;
123  // }
124 
125  fUnpacker->DoUnpack(subEvtPtr, dataSize);
126  Serialize<SerializationType>(*msg, fUnpacker->GetOutputData());
127  Send(msg, fOutputChanName);
128  fUnpacker->Reset();
129  }
130  }
131  }
132  }
133 
134  typedef std::tuple<short, short, short, short, short> SubEvtKey;
135  std::map<std::string, SubEvtKey> fSubEventChanMap;
136 
137  UnpackerType* fUnpacker;
138  std::string fInputChannelName;
139 
140  short fType;
141  short fSubType;
142  short fProcId;
143  short fSubCrate;
144  short fControl;
145  std::string fInputChanName;
146  std::string fOutputChanName;
147 };
148 
149 #endif /* !FAIRMQUNPACKER_H */
virtual ~FairMQUnpacker()
std::string fInputChanName
std::string fInputChannelName
void AddSubEvtKey(short type, short subType, short procid, short subCrate, short control, const std::string &channelName)
std::map< std::string, SubEvtKey > fSubEventChanMap
std::string fOutputChanName
std::tuple< short, short, short, short, short > SubEvtKey
UnpackerType * fUnpacker
FairMQUnpacker operator=(const FairMQUnpacker &)=delete