FairMQ  1.4.33
C++ Message Queuing Library and Framework
StateQueue.h
1 /********************************************************************************
2  * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQSTATEQUEUE_H_
10 #define FAIRMQSTATEQUEUE_H_
11 
12 #include <fairmq/States.h>
13 
14 #include <queue>
15 #include <mutex>
16 #include <chrono>
17 #include <utility> // pair
18 #include <condition_variable>
19 
20 namespace fair::mq
21 {
22 
23 class StateQueue
24 {
25  public:
26  StateQueue() {}
27  ~StateQueue() {}
28 
29  fair::mq::State WaitForNext()
30  {
31  std::unique_lock<std::mutex> lock(fMtx);
32  while (fStates.empty()) {
33  fCV.wait_for(lock, std::chrono::milliseconds(50));
34  }
35 
36  fair::mq::State state = fStates.front();
37 
38  if (state == fair::mq::State::Error) {
39  throw DeviceErrorState("Controlled device transitioned to error state.");
40  }
41 
42  fStates.pop();
43  return state;
44  }
45 
46  template<typename Rep, typename Period>
47  std::pair<bool, fair::mq::State> WaitForNext(std::chrono::duration<Rep, Period> const& duration)
48  {
49  std::unique_lock<std::mutex> lock(fMtx);
50  fCV.wait_for(lock, duration);
51 
52  if (fStates.empty()) {
53  return { false, fair::mq::State::Ok };
54  }
55 
56  fair::mq::State state = fStates.front();
57 
58  if (state == fair::mq::State::Error) {
59  throw DeviceErrorState("Controlled device transitioned to error state.");
60  }
61 
62  fStates.pop();
63  return { true, state };
64  }
65 
66  void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} }
67 
68  void Push(fair::mq::State state)
69  {
70  {
71  std::lock_guard<std::mutex> lock(fMtx);
72  fStates.push(state);
73  }
74  fCV.notify_all();
75  }
76 
77  void Clear()
78  {
79  std::lock_guard<std::mutex> lock(fMtx);
80  fStates = std::queue<fair::mq::State>();
81  }
82 
83  private:
84  std::queue<fair::mq::State> fStates;
85  std::mutex fMtx;
86  std::condition_variable fCV;
87 };
88 
89 } // namespace fair::mq
90 
91 #endif /* FAIRMQSTATEQUEUE_H_ */
fair::mq
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
fair::mq::DeviceErrorState
Definition: States.h:66

privacy