CMS 3D CMS Logo

FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
5 
6 #include <iostream>
7 #include <vector>
8 #include <thread>
9 #include <mutex>
10 
11 
12 namespace evf{
13 
14  class FastMonitoringService;
15 
17  public:
18  // a copy of the Framework/EventProcessor states
21 
25  //supervisor thread and worker threads state
29  //combined with inWaitInput
34  //combined with inWaitChunk
40 
// MonitorData groups the values this class exposes to a jsoncollector::FastMonitor:
// a set of global monitorables (the fast*J_ members), per-stream / per-thread state
// vectors, and the histogram bin counts handed to the monitor at registration time.
// NOTE(review): this listing is missing several original lines (44-50, 58, 73, 103);
// the declarations of the fast*J_ members, the constructor header and the declaration
// of `p` in registerVariables() are therefore not visible here.
41  struct MonitorData
42  {
43  //fastpath global monitorables
51 
52  unsigned int varIndexThrougput_;
53 
54  //per stream
// microstateEncoded_, ministateEncoded_ and inputState_ are resized to nStreams in
// registerVariables(); threadMicrostateEncoded_ is resized to nThreads; processed_
// receives one pointer per stream.
55  std::vector<unsigned int> microstateEncoded_;
56  std::vector<unsigned int> ministateEncoded_;
57  std::vector<jsoncollector::AtomicMonUInt*> processed_;
59  std::vector<unsigned int> threadMicrostateEncoded_;
60  std::vector<unsigned int> inputState_;
61 
62  //tracking luminosity of a stream
// One entry per stream, initialised to 0 and passed to FastMonitor::commit().
63  std::vector<unsigned int> streamLumi_;
64 
65  //N bins for histograms
// Addresses of these counters are passed as the nBins argument of the
// register*Monitorable calls below; they are not assigned anywhere in this struct.
66  unsigned int macrostateBins_;
67  unsigned int ministateBins_;
68  unsigned int microstateBins_;
69  unsigned int inputstateBins_;
70 
71  //unsigned int prescaleindex_; // ditto
72 
74 
// Body of what appears to be the MonitorData constructor (its header line is absent
// from this listing — TODO confirm): gives every global monitorable an initial value
// (macrostate starts at FastMonitoringThread::sInit, all others at 0) and then
// assigns each one the name passed to setName().
75  fastMacrostateJ_ = FastMonitoringThread::sInit;
76  fastThroughputJ_ = 0;
77  fastAvgLeadTimeJ_ = 0;
78  fastFilesProcessedJ_ = 0;
79  fastLockWaitJ_ = 0;
80  fastLockCountJ_ = 0;
81  fastMacrostateJ_.setName("Macrostate");
82  fastThroughputJ_.setName("Throughput");
83  fastAvgLeadTimeJ_.setName("AverageLeadTime");
84  fastFilesProcessedJ_.setName("FilesProcessed");
85  fastLockWaitJ_.setName("LockWaitUs");
86  fastLockCountJ_.setName("LockCount");
87 
88  fastPathProcessedJ_ = 0;
89  fastPathProcessedJ_.setName("Processed");
90  }
91 
92  //to be called after fast monitor is constructed
// Registers every monitored quantity of this struct with the given FastMonitor and
// sizes the per-stream / per-thread vectors.
//   fm       - monitor that receives the registrations; dereferenced without a null check.
//   nStreams - number of entries created in the per-stream vectors.
//   nThreads - size of threadMicrostateEncoded_; also decides which vector is
//              registered under the name "Microstate".
// NOTE(review): processed_ and streamLumi_ are grown with push_back, so a second call
// on the same object would append another nStreams entries to each.
93  void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
94  //tell FM to track these global variables(for fast and slow monitoring)
// Only the macrostate is registered with NAifZeroUpdates=true and a bin counter.
95  fm->registerGlobalMonitorable(&fastMacrostateJ_,true,&macrostateBins_);
96  fm->registerGlobalMonitorable(&fastThroughputJ_,false);
97  fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_,false);
98  fm->registerGlobalMonitorable(&fastFilesProcessedJ_,false);
99  fm->registerGlobalMonitorable(&fastLockWaitJ_,false);
100  fm->registerGlobalMonitorable(&fastLockCountJ_,false);
101 
// Per stream: zero the counter behind `p`, store the pointer in processed_, and add a
// zero lumi entry. The line declaring `p` is missing from this listing; presumably it
// allocates a jsoncollector::AtomicMonUInt — TODO confirm, and confirm who frees it.
102  for (unsigned int i=0;i<nStreams;i++) {
104  *p=0;
105  processed_.push_back(p);
106  streamLumi_.push_back(0);
107  }
108 
109  microstateEncoded_.resize(nStreams);
110  ministateEncoded_.resize(nStreams);
111  threadMicrostateEncoded_.resize(nThreads);
112  inputState_.resize(nStreams);
// Explicitly resets every inputState_ entry to 0 (covers entries that already existed
// before the resize).
113  for (unsigned int j=0;j<inputState_.size();j++) inputState_[j]=0;
114 
115  //tell FM to track these int vectors
116  fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_,true,&ministateBins_);
117 
// "Microstate" is backed by the per-stream vector when nThreads<=nStreams and by the
// per-thread vector otherwise; both variants share microstateBins_.
118  if (nThreads<=nStreams)//no overlapping in module execution per stream
119  fm->registerStreamMonitorableUIntVec("Microstate",&microstateEncoded_,true,&microstateBins_);
120  else
121  fm->registerStreamMonitorableUIntVec("Microstate",&threadMicrostateEncoded_,true,&microstateBins_);
122 
// Registered with NAifZeroUpdates=false and a null nBins pointer (the literal 0).
123  fm->registerStreamMonitorableUIntVecAtomic("Processed",&processed_,false,0);
124 
125  //input source state tracking (not stream, but other than first item in vector is set to Ignore state)
126  fm->registerStreamMonitorableUIntVec("Inputstate",&inputState_,true,&inputstateBins_);
127 
128  //global cumulative event counter is used for fast path
129  fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
130 
131  //provide vector with updated per stream lumis and let it finish initialization
132  fm->commit(&streamLumi_);
133  }
134  };
135 
136  //constructor
138  }
139 
// Replaces the object held by jsonMonitor_ with a newly constructed
// jsoncollector::FastMonitor; any monitor previously held is destroyed by
// unique_ptr::reset.
//   microStateDefPath     - passed as the first constructor argument of the new monitor.
//   fastMicroStateDefPath - if non-empty, additionally passed to addFastPathDefinition().
// Both calls use the definition group "data" and a trailing `false` argument whose
// meaning is not visible in this listing.
140  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
141  std::string defGroup = "data";
142  jsonMonitor_.reset(new jsoncollector::FastMonitor(microStateDefPath,defGroup,false));
// A size of zero means no fast-path definition is added.
143  if (fastMicroStateDefPath.size())
144  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath,defGroup,false);
145  }
146 
148  assert(!m_thread);
149  m_thread = std::shared_ptr<std::thread>(new std::thread(fp,cp));
150  }
// Requests the monitoring thread to stop and blocks until it has finished.
// Precondition (checked only by assert): m_thread is non-null.
// NOTE(review): m_thread is not reset after the join, so the `assert(!m_thread)`
// visible at listing line 148 would fire if the thread were started again on this
// object, and a second stop() would call join() on an already-joined thread.
151  void stop(){
152  assert(m_thread);
// Presumably polled by the function running on m_thread — that function is not part
// of this listing, TODO confirm.
153  m_stoprequest=true;
154  m_thread->join();
155  }
156 
157  private:
158 
// Set to true by stop() before it joins the thread.
159  std::atomic<bool> m_stoprequest;
// Thread handle: asserted non-null and joined in stop(); assigned a new std::thread
// at listing line 149.
160  std::shared_ptr<std::thread> m_thread;
163 
// Monitor instance owned by this object; (re)created by resetFastMonitor().
164  std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
165 
166  friend class FastMonitoringService;
167  };
168 } //end namespace evf
169 #endif
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:76
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
static boost::mutex mutex
Definition: LHEProxy.cc:11
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:96
std::shared_ptr< std::thread > m_thread
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:63
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::vector< unsigned int > threadMicrostateEncoded_
virtual void setName(std::string name)
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:84
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:110
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
std::atomic< bool > m_stoprequest
std::vector< jsoncollector::AtomicMonUInt * > processed_
std::vector< unsigned int > streamLumi_
std::vector< unsigned int > inputState_
std::vector< unsigned int > ministateEncoded_