FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BaseMQFileSink.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 /*
10  * File: BaseMQFileSink.h
11  * Author: winckler
12  *
13  * Created on October 7, 2014, 6:06 PM
14  */
15 
16 #ifndef BASEMQFILESINK_H
17 #define BASEMQFILESINK_H
18 
19 #include <FairMQDevice.h>
20 #include <FairMQLogger.h>
21 
22 template<typename InputPolicy, typename OutputPolicy>
24  : public FairMQDevice
25  , public InputPolicy
26  , public OutputPolicy
27 {
28  public:
30  : InputPolicy()
31  , OutputPolicy()
32  , fInputChanName()
33  {}
34 
35  virtual ~BaseMQFileSink() {}
36 
37  template<typename... Args>
38  void InitInputData(Args&&... args)
39  {
40  InputPolicy::Create(std::forward<Args>(args)...);
41  }
42 
43  protected:
44  virtual void InitTask()
45  {
46  fInputChanName = fConfig->GetValue<std::string>("in-chan-name");
47  OutputPolicy::SetFileProperties(*fConfig);
48  OutputPolicy::InitOutputFile();
49  }
50 
51  virtual void Run()
52  {
53  int receivedMsg = 0;
54  while (!NewStatePending()) {
55  std::unique_ptr<FairMQMessage> msg(NewMessage());
56  if (Receive(msg, fInputChanName) > 0) {
57  FairMQDevice::Deserialize<typename InputPolicy::DeserializerType>(
58  *msg, InputPolicy::fInput); // get data from message.
59  OutputPolicy::Serialize(InputPolicy::fInput); // put data into output.
60  receivedMsg++;
61  }
62  }
63 
64  LOG(info) << "Received " << receivedMsg << " messages!";
65  }
66 
67  private:
68  std::string fInputChanName;
69 };
70 
71 #endif /* BASEMQFILESINK_H */
virtual void Run()
virtual void InitTask()
virtual ~BaseMQFileSink()
void InitInputData(Args &&...args)