FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FairMQProcessor.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQPROCESSOR_H_
16 #define FAIRMQPROCESSOR_H_
17 
18 #include "FairMQProcessorTask.h"
19 
20 #include <FairMQDevice.h>
21 #include <FairMQLogger.h>
22 
23 template<typename Task>
24 class FairMQProcessor : public FairMQDevice
25 {
26  public:
28  : fProcessorTask(new Task)
29  , fInChannelName("data1")
30  , fOutChannelName("data2")
31  , fReceivedMsgs(0)
32  , fSentMsgs(0)
33  {}
34 
36  FairMQProcessor(const FairMQProcessor&) = delete;
37  FairMQProcessor operator=(const FairMQProcessor&) = delete;
38 
39  virtual ~FairMQProcessor() {}
40 
41  protected:
42  virtual void InitTask()
43  {
44  std::string inChannelName = fConfig->GetValue<std::string>("in-channel");
45  std::string outChannelName = fConfig->GetValue<std::string>("out-channel");
46  // check if the returned value actually exists, for the compatibility with old devices.
47  if (inChannelName != "") {
48  fInChannelName = inChannelName;
49  }
50  if (outChannelName != "") {
51  fOutChannelName = outChannelName;
52  }
53 
54  fProcessorTask->InitTask();
55  OnData(fInChannelName, [this](FairMQMessagePtr& msg, int /*index*/) {
56  ++fReceivedMsgs;
57  fProcessorTask->SetPayload(msg);
58  fProcessorTask->Exec();
59  fProcessorTask->GetPayload(msg);
60 
61  Send(msg, fOutChannelName);
62  ++fSentMsgs;
63 
64  return true;
65  });
66  }
67 
68  virtual void PostRun() { LOG(info) << "Received " << fReceivedMsgs << " and sent " << fSentMsgs << " messages!"; }
69 
70  private:
71  std::unique_ptr<FairMQProcessorTask> fProcessorTask;
72  std::string fInChannelName;
73  std::string fOutChannelName;
74  int fReceivedMsgs;
75  int fSentMsgs;
76 };
77 
78 #endif /* FAIRMQPROCESSOR_H_ */
virtual void InitTask()
virtual ~FairMQProcessor()
FairMQProcessor operator=(const FairMQProcessor &)=delete
virtual void PostRun()