16 #ifndef BASEMQFILESINK_H
17 #define BASEMQFILESINK_H
19 #include <FairMQDevice.h>
20 #include <FairMQLogger.h>
// NOTE(review): this span looks like an incomplete extract of the
// BaseMQFileSink class template. The class header, all braces and several
// member signatures are not visible here, and every fragment starts with a
// number that appears to be a line number from the original file. The
// comments below describe only what the visible statements do.
//
// Template parameters of the sink: an input policy and an output policy.
// Presumably the class derives from both (members are reached through
// InputPolicy:: / OutputPolicy:: below) -- TODO confirm against the full file.
22 template<
typename InputPolicy,
typename OutputPolicy>
// Variadic member template: perfectly forwards its arguments to
// InputPolicy::Create. Its signature appears to be the InitInputData
// declaration shown at the end of this extract -- TODO confirm.
37 template<
typename... Args>
40 InputPolicy::Create(std::forward<Args>(args)...);
// Configuration step: read the "in-chan-name" option from fConfig as a
// std::string and store it in fInputChanName, then hand the configuration to
// the output policy and ask it to initialise its output file.
46 fInputChanName = fConfig->GetValue<std::string>(
"in-chan-name");
47 OutputPolicy::SetFileProperties(*fConfig);
48 OutputPolicy::InitOutputFile();
// Receive loop: repeats until NewStatePending() returns true.
54 while (!NewStatePending()) {
// A fresh message object is created on every iteration.
55 std::unique_ptr<FairMQMessage> msg(NewMessage());
// Only a Receive() result greater than zero is processed; any other result
// skips the deserialize/serialize step for this iteration.
56 if (Receive(msg, fInputChanName) > 0) {
// Decode the message into InputPolicy::fInput using the deserializer type
// named by the input policy.
57 FairMQDevice::Deserialize<typename InputPolicy::DeserializerType>(
58 *msg, InputPolicy::fInput);
// Pass the decoded input to the output policy.
59 OutputPolicy::Serialize(InputPolicy::fInput);
// Log the value of receivedMsg.
// NOTE(review): the declaration and update of receivedMsg are not visible in
// this extract -- presumably a counter incremented inside the loop; confirm.
64 LOG(info) <<
"Received " << receivedMsg <<
" messages!";
// Name of the input channel; assigned from the "in-chan-name" option above and
// passed to Receive().
68 std::string fInputChanName;
// NOTE(review): the two signatures below appear without bodies and out of
// their original position; they look like the virtual destructor and the
// variadic InitInputData member whose body fragment is shown above.
virtual ~BaseMQFileSink()
void InitInputData(Args &&...args)