CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
5 
6 #include <iostream>
7 #include <vector>
8 #include <thread>
9 #include <mutex>
10 
11 
12 namespace evf{
13 
14  class FastMonitoringService;
15 
17  public:
18  // a copy of the Framework/EventProcessor states
21  struct MonitorData
22  {
23  //fastpath global monitorables
30 
31  unsigned int varIndexThrougput_;
32 
33  //per stream
34  std::vector<unsigned int> microstateEncoded_;
35  std::vector<unsigned int> ministateEncoded_;
36  std::vector<jsoncollector::AtomicMonUInt*> processed_;
38  std::vector<unsigned int> threadMicrostateEncoded_;
39 
40  //tracking luminosity of a stream
41  std::vector<unsigned int> streamLumi_;
42 
43  //N bins for histograms
44  unsigned int macrostateBins_;
45  unsigned int ministateBins_;
46  unsigned int microstateBins_;
47 
48  //unsigned int prescaleindex_; // ditto
49 
51 
53  fastThroughputJ_ = 0;
56  fastLockWaitJ_ = 0;
57  fastLockCountJ_ = 0;
58  fastMacrostateJ_.setName("Macrostate");
59  fastThroughputJ_.setName("Throughput");
60  fastAvgLeadTimeJ_.setName("AverageLeadTime");
61  fastFilesProcessedJ_.setName("FilesProcessed");
62  fastLockWaitJ_.setName("LockWaitUs");
63  fastLockCountJ_.setName("LockCount");
64 
66  fastPathProcessedJ_.setName("Processed");
67  }
68 
69  //to be called after fast monitor is constructed
70  void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
71  //tell FM to track these global variables(for fast and slow monitoring)
78 
79  for (unsigned int i=0;i<nStreams;i++) {
81  *p=0;
82  processed_.push_back(p);
83  streamLumi_.push_back(0);
84  }
85 
86  microstateEncoded_.resize(nStreams);
87  ministateEncoded_.resize(nStreams);
88  threadMicrostateEncoded_.resize(nThreads);
89 
90  //tell FM to track these int vectors
92 
93  if (nThreads<=nStreams)//no overlapping in module execution per stream
95  else
97 
98  fm->registerStreamMonitorableUIntVecAtomic("Processed",&processed_,false,0);
99 
100  //global cumulative event counter is used for fast path
102 
103  //provide vector with updated per stream lumis and let it finish initialization
104  fm->commit(&streamLumi_);
105  }
106  };
107 
108  //constructor
110  }
111 
112  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
113  std::string defGroup = "data";
114  jsonMonitor_.reset(new jsoncollector::FastMonitor(microStateDefPath,defGroup,false));
115  if (fastMicroStateDefPath.size())
116  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath,defGroup,false);
117  }
118 
120  assert(!m_thread);
121  m_thread = boost::shared_ptr<std::thread>(new std::thread(fp,cp));
122  }
123  void stop(){
124  assert(m_thread);
125  m_stoprequest=true;
126  m_thread->join();
127  }
128 
129  private:
130 
131  std::atomic<bool> m_stoprequest;
132  boost::shared_ptr<std::thread> m_thread;
135 
136  std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
137 
138  friend class FastMonitoringService;
139  };
140 } //end namespace evf
141 #endif
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:75
int i
Definition: DBlmapReader.cc:9
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
static boost::mutex mutex
Definition: LHEProxy.cc:11
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:95
assert(m_qm.get())
boost::shared_ptr< std::thread > m_thread
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:62
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::vector< unsigned int > threadMicrostateEncoded_
virtual void setName(std::string name)
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:83
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:109
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
std::atomic< bool > m_stoprequest
std::vector< jsoncollector::AtomicMonUInt * > processed_
volatile std::atomic< bool > shutdown_flag false
std::vector< unsigned int > streamLumi_
std::vector< unsigned int > ministateEncoded_