CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
5 
6 #include <iostream>
7 #include <vector>
8 #include <thread>
9 #include <mutex>
10 
11 using namespace jsoncollector;
12 
13 namespace evf{
14 
16 
18  public:
19  // a copy of the Framework/EventProcessor states
     // Enumerators are numbered consecutively starting from sInit = 0, so the
     // trailing MCOUNT evaluates to the number of states declared before it (12).
     // NOTE(review): MCOUNT is presumably the histogram bin count for the
     // macrostate monitorable -- confirm where macrostateBins_ is assigned.
20  enum Macrostate { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
21  sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid,MCOUNT};
22  struct MonitorData
23  {
24  //fastpath global monitorables
31 
32  unsigned int varIndexThrougput_;
33 
34  //per stream
35  std::vector<unsigned int> microstateEncoded_;
36  std::vector<unsigned int> ministateEncoded_;
37  std::vector<AtomicMonUInt*> processed_;
39  std::vector<unsigned int> threadMicrostateEncoded_;
40 
41  //tracking luminosity of a stream
42  std::vector<unsigned int> streamLumi_;
43 
44  //N bins for histograms
45  unsigned int macrostateBins_;
46  unsigned int ministateBins_;
47  unsigned int microstateBins_;
48 
49  //unsigned int prescaleindex_; // ditto
50 
52 
53  fastMacrostateJ_ = FastMonitoringThread::sInit;
54  fastThroughputJ_ = 0;
55  fastAvgLeadTimeJ_ = 0;
56  fastFilesProcessedJ_ = 0;
57  fastLockWaitJ_ = 0;
58  fastLockCountJ_ = 0;
59  fastMacrostateJ_.setName("Macrostate");
60  fastThroughputJ_.setName("Throughput");
61  fastAvgLeadTimeJ_.setName("AverageLeadTime");
62  fastFilesProcessedJ_.setName("FilesProcessed");
63  fastLockWaitJ_.setName("LockWaitUs");
64  fastLockCountJ_.setName("LockCount");
65 
66  fastPathProcessedJ_ = 0;
67  fastPathProcessedJ_.setName("Processed");
68  }
69 
70  //to be called after fast monitor is constructed
     // Registers this struct's monitorables with the given FastMonitor, sizes the
     // per-stream / per-thread state vectors, and ends by calling fm->commit().
     //   fm       - monitor to register with; dereferenced without a null check.
     //   nStreams - number of entries appended to processed_ and streamLumi_, and
     //              the new size of microstateEncoded_ and ministateEncoded_.
     //   nThreads - the new size of threadMicrostateEncoded_.
     // processed_ and streamLumi_ are filled with push_back, so each call appends
     // another nStreams entries instead of replacing the existing ones.
71  void registerVariables(FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
72  //tell FM to track these global variables (for fast and slow monitoring)
     // only the macrostate is registered with NAifZeroUpdates=true and a bin count
73  fm->registerGlobalMonitorable(&fastMacrostateJ_,true,&macrostateBins_);
74  fm->registerGlobalMonitorable(&fastThroughputJ_,false);
75  fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_,false);
76  fm->registerGlobalMonitorable(&fastFilesProcessedJ_,false);
77  fm->registerGlobalMonitorable(&fastLockWaitJ_,false);
78  fm->registerGlobalMonitorable(&fastLockCountJ_,false);
79 
     // one zero-initialised counter and one lumi slot per stream
80  for (unsigned int i=0;i<nStreams;i++) {
     // NOTE(review): the declaration of p (listing line 81) is missing from this
     // listing; presumably `AtomicMonUInt *p = new AtomicMonUInt;` -- confirm
     // against the real header. If so, nothing visible here frees these pointers.
82  *p=0;
83  processed_.push_back(p);
84  streamLumi_.push_back(0);
85  }
86 
87  microstateEncoded_.resize(nStreams);
88  ministateEncoded_.resize(nStreams);
89  threadMicrostateEncoded_.resize(nThreads);
90 
91  //tell FM to track these int vectors
92  fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_,true,&ministateBins_);
93 
     // "Microstate" is backed by the per-stream vector when there are no more
     // threads than streams, otherwise by the per-thread vector
94  if (nThreads<=nStreams)//no overlapping in module execution per stream
95  fm->registerStreamMonitorableUIntVec("Microstate",&microstateEncoded_,true,&microstateBins_);
96  else
97  fm->registerStreamMonitorableUIntVec("Microstate",&threadMicrostateEncoded_,true,&microstateBins_);
98 
     // last argument is the nBins pointer; 0 here means no bin count is supplied
99  fm->registerStreamMonitorableUIntVecAtomic("Processed",&processed_,false,0);
100 
101  //global cumulative event counter is used for fast path
102  fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
103 
104  //provide vector with updated per stream lumis and let it finish initialization
105  fm->commit(&streamLumi_);
106  }
107  };
108 
109  //constructor
     // Initialises the stop-request flag to false. m_thread and jsonMonitor_ are
     // not in the initialiser list, so both start out empty.
110  FastMonitoringThread() : m_stoprequest(false) {
111  }
112 
113  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
     // Replaces jsonMonitor_ with a new FastMonitor built from microStateDefPath
     // and the definition group "data"; a previously held monitor is destroyed by
     // the unique_ptr reset.
     //   microStateDefPath     - passed as the first FastMonitor constructor argument.
     //   fastMicroStateDefPath - when non-empty, additionally registered through
     //                           addFastPathDefinition() with the same group.
     // NOTE(review): the meaning of the trailing `false` argument in both calls is
     // not visible in this file -- see FastMonitor's declaration.
114  std::string defGroup = "data";
115  jsonMonitor_.reset(new FastMonitor(microStateDefPath,defGroup,false));
116  if (fastMicroStateDefPath.size())
117  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath,defGroup,false);
118  }
119 
121  assert(!m_thread);
122  m_thread = boost::shared_ptr<std::thread>(new std::thread(fp,cp));
123  }
124  void stop(){
     // Sets the stop-request flag and then blocks until the monitoring thread has
     // been joined. m_thread must already hold a thread (created in start()).
     // NOTE(review): the assert is the only guard -- if it is compiled out
     // (NDEBUG) and m_thread is empty, the join below dereferences a null pointer.
125  assert(m_thread);
     // NOTE(review): presumably polled by the thread function, which is not
     // visible in this file -- confirm in FastMonitoringService.
126  m_stoprequest=true;
     // m_thread is not reset afterwards, so a second stop() would call join() on
     // an already-joined std::thread, which throws std::system_error.
127  m_thread->join();
128  }
129 
130  private:
131 
132  std::atomic<bool> m_stoprequest;
133  boost::shared_ptr<std::thread> m_thread;
136 
137  std::unique_ptr<FastMonitor> jsonMonitor_;
138 
139  friend class FastMonitoringService;
140  };
141 } //end namespace evf
142 #endif
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:75
int i
Definition: DBlmapReader.cc:9
std::vector< AtomicMonUInt * > processed_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
static boost::mutex mutex
Definition: LHEProxy.cc:11
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:95
assert(m_qm.get())
boost::shared_ptr< std::thread > m_thread
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:62
std::vector< unsigned int > threadMicrostateEncoded_
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:83
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:109
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::atomic< bool > m_stoprequest
std::unique_ptr< FastMonitor > jsonMonitor_
volatile std::atomic< bool > shutdown_flag false
std::vector< unsigned int > streamLumi_
std::vector< unsigned int > ministateEncoded_