CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
5 
6 #include <iostream>
7 #include <vector>
8 #include <thread>
9 #include <mutex>
10 
11 using namespace jsoncollector;
12 
13 namespace evf{
14 
16 
18  public:
19  // a copy of the Framework/EventProcessor states
    // Enumerators are numbered consecutively starting from sInit = 0, so the
    // trailing MCOUNT equals the number of enumerators listed before it (12).
    // NOTE(review): MCOUNT looks like a count sentinel rather than a real state -- confirm against its users.
20  enum Macrostate { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
21  sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid,MCOUNT};
22  struct MonitorData
23  {
24  //fastpath global monitorables
29 
30  unsigned int varIndexThrougput_;
31 
32  //per stream
33  std::vector<unsigned int> microstateEncoded_;
34  std::vector<unsigned int> ministateEncoded_;
35  std::vector<AtomicMonUInt*> processed_;
37  std::vector<unsigned int> threadMicrostateEncoded_;
38 
39  //tracking luminosity of a stream
40  std::vector<unsigned int> streamLumi_;
41 
42  //N bins for histograms
43  unsigned int macrostateBins_;
44  unsigned int ministateBins_;
45  unsigned int microstateBins_;
46 
47  //unsigned int prescaleindex_; // ditto
48 
50 
51  fastMacrostateJ_ = FastMonitoringThread::sInit;
52  fastThroughputJ_ = 0;
53  fastAvgLeadTimeJ_ = 0;
54  fastFilesProcessedJ_ = 0;
55  fastMacrostateJ_.setName("Macrostate");
56  fastThroughputJ_.setName("Throughput");
57  fastAvgLeadTimeJ_.setName("AverageLeadTime");
58  fastFilesProcessedJ_.setName("FilesProcessed");
59 
60  fastPathProcessedJ_ = 0;
61  fastPathProcessedJ_.setName("Processed");
62  }
63 
64  //to be called after fast monitor is constructed
65  void registerVariables(FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
    // Registers this struct's monitorables with fm and sizes the per-stream and
    // per-thread vectors: nStreams entries for the stream vectors, nThreads
    // entries for threadMicrostateEncoded_. Finishes by calling fm->commit().
    // In the FastMonitor register* calls below, the boolean argument is
    // NAifZeroUpdates and the optional pointer argument is nBins.
    // Note: the loop appends with push_back, so calling this a second time would
    // add further entries to processed_ and streamLumi_ instead of replacing them.
66  //tell FM to track these global variables(for fast and slow monitoring)
67  fm->registerGlobalMonitorable(&fastMacrostateJ_,true,&macrostateBins_);
68  fm->registerGlobalMonitorable(&fastThroughputJ_,false);
69  fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_,false);
70  fm->registerGlobalMonitorable(&fastFilesProcessedJ_,false);
71 
    // Per stream: one counter (set to 0) appended to processed_ and one lumi
    // slot (initially 0) appended to streamLumi_.
    // NOTE(review): the declaration of p (listing line 73) is missing from this
    // view; presumably it allocates an AtomicMonUInt -- confirm in the real header.
72  for (unsigned int i=0;i<nStreams;i++) {
74  *p=0;
75  processed_.push_back(p);
76  streamLumi_.push_back(0);
77  }
78 
79  microstateEncoded_.resize(nStreams);
80  ministateEncoded_.resize(nStreams);
81  threadMicrostateEncoded_.resize(nThreads);
82 
83  //tell FM to track these int vectors
84  fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_,true,&ministateBins_);
85 
    // "Microstate" is backed by the per-stream vector when nThreads <= nStreams,
    // otherwise by the per-thread vector; both use microstateBins_.
86  if (nThreads<=nStreams)//no overlapping in module execution per stream
87  fm->registerStreamMonitorableUIntVec("Microstate",&microstateEncoded_,true,&microstateBins_);
88  else
89  fm->registerStreamMonitorableUIntVec("Microstate",&threadMicrostateEncoded_,true,&microstateBins_);
90 
    // The trailing 0 is the nBins pointer argument, i.e. a null pointer.
91  fm->registerStreamMonitorableUIntVecAtomic("Processed",&processed_,false,0);
92 
93  //global cumulative event counter is used for fast path
94  fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
95 
96  //provide vector with updated per stream lumis and let it finish initialization
97  fm->commit(&streamLumi_);
98  }
99  };
100 
101  //constructor
102  FastMonitoringThread() : m_stoprequest(false) {
     // Only clears the stop-request flag. m_thread and jsonMonitor_ are not in the
     // initializer list, so both start out empty (no thread, no monitor).
103  }
104 
105  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
     // Replaces the FastMonitor held by jsonMonitor_ with a new one constructed from
     // microStateDefPath; reset() destroys any monitor held previously.
     // Both definitions are registered under the group name "data".
     // NOTE(review): the meaning of the trailing `false` argument is not visible
     // in this file -- check the FastMonitor declaration.
106  std::string defGroup = "data";
107  jsonMonitor_.reset(new FastMonitor(microStateDefPath,defGroup,false));
     // The fast-path definition is optional: an empty path skips it.
108  if (fastMicroStateDefPath.size())
109  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath,defGroup,false);
110  }
111 
113  assert(!m_thread);
114  m_thread = boost::shared_ptr<std::thread>(new std::thread(fp,cp));
115  }
116  void stop(){
     // Sets the stop-request flag, then blocks until the monitoring thread has
     // finished (join). A thread must already exist: the assert checks that
     // m_thread is non-null.
     // NOTE(review): the thread function is presumably expected to poll
     // m_stoprequest and return; that function is not visible here -- confirm.
     // NOTE(review): m_thread is not reset after join(), so a second stop() would
     // call join() on a thread that is no longer joinable.
117  assert(m_thread);
118  m_stoprequest=true;
119  m_thread->join();
120  }
121 
122  private:
123 
124  std::atomic<bool> m_stoprequest;
125  boost::shared_ptr<std::thread> m_thread;
128 
129  std::unique_ptr<FastMonitor> jsonMonitor_;
130 
131  friend class FastMonitoringService;
132  };
133 } //end namespace evf
134 #endif
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:75
int i
Definition: DBlmapReader.cc:9
std::vector< AtomicMonUInt * > processed_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
static boost::mutex mutex
Definition: LHEProxy.cc:11
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:95
boost::shared_ptr< std::thread > m_thread
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:62
std::vector< unsigned int > threadMicrostateEncoded_
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:83
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:109
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::atomic< bool > m_stoprequest
std::unique_ptr< FastMonitor > jsonMonitor_
volatile std::atomic< bool > shutdown_flag false
std::vector< unsigned int > streamLumi_
std::vector< unsigned int > ministateEncoded_