CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
4 
13 
14 #include "boost/filesystem.hpp"
15 
18 
19 #include <string>
20 #include <vector>
21 #include <map>
22 #include <queue>
23 #include <sstream>
24 #include <unordered_map>
25 
26 /*Description
27  This is an evolution of the MicroStateService, intended to be run standalone in cmsRun or similar.
28  As such, it has to independently create a monitoring thread and run it in each forked process, which needs
29  to be arranged following the standard CMSSW procedure.
30  We try to use boost threads for uniformity with the rest of the framework, even if they suck a bit.
31  A legend for use by the monitoring process in the DAQ needs to be generated as soon as convenient - since
32  no access to the EventProcessor is granted, this needs to wait until after beginJob is executed.
33  At the same time, we try to spare time in the monitoring by avoiding even a single string lookup and using the
34  moduledesc pointer to key into the map instead.
35  As a bonus, we can now add to the monitored status the current path (and possibly associate modules to a path...)
36  this intermediate info will be called "ministate" :D
37  The general counters and status variables (event number, number of processed events, number of passed and stored
38  events, luminosity section etc.) are also monitored here.
39 
40  NOTA BENE!!! with respect to the MicroStateService, no string or string pointers are used for the microstates.
41  NOTA BENE!!! the state of the edm::EventProcessor cannot be monitored directly from within a service, so a
42  different solution must be identified for that (especially one needs to identify error states).
43  NOTA BENE!!! to keep backward compatibility with the MicroStateService, a common base class with abstract interface,
44  exposing the single method to be used by all other packages (except EventFilter/Processor,
45  which should continue to use the concrete class interface) will be defined
46 
47 */
49 
50 namespace edm {
52 }
53 
54 
55 namespace evf{
56 
58  {
59  struct Encoding
60  {
61  Encoding(unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
62  {
63  if (reserved_)
65  // completeReservedWithDummies();
66  }
68  {
69  if (reserved_)
70  delete[] dummiesForReserved_;
71  }
72  //trick: only encode state when sending it over (i.e. every sec)
73  int encode(const void *add){
74  std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
75  return (it!=quickReference_.end()) ? (*it).second : 0;
76  }
77  const void* decode(unsigned int index){return decoder_[index];}
78  void fillReserved(const void* add, unsigned int i){
79  // translation_[*name]=current_;
81  if(decoder_.size()<=i)
82  decoder_.push_back(add);
83  else
85  }
86  void updateReserved(const void* add){
89  }
91  {
92  for(unsigned int i = currentReserved_; i<reserved_; i++)
94  }
95  void update(const void* add){
96  // translation_[*name]=current_;
98  decoder_.push_back(add);
99  current_++;
100  }
101  unsigned int vecsize() {
102  return decoder_.size();
103  }
104  std::unordered_map<const void *,int> quickReference_;
105  std::vector<const void *> decoder_;
106  unsigned int reserved_;
107  int current_;
110  };
111  public:
112 
113  // the names of the states - some of them are never reached in an online app
115  // Reserved names for microstates
116  // moved into base class in EventFilter/Utilities for compatibility with MicroStateServiceClassic
117  static const std::string nopath_;
120  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
121 
124 
126  void jobFailure();
128 
130  void postBeginJob();
131  void postEndJob();
132 
137 
142  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
143  void preEvent(edm::StreamContext const&);
144  void postEvent(edm::StreamContext const&);
152  void setExceptionDetected(unsigned int ls);
153 
154  //this is still needed for use in special functions like DQM which are in turn framework services
157 
158  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
159  void startedLookingForFile();
160  void stoppedLookingForFile(unsigned int lumi);
161  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
162  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag=nullptr);
163  bool getAbortFlagForLumi(unsigned int lumi);
164  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc=nullptr)
165  {
166  unsigned int processed = getEventsProcessedForLumi(lumi);
167  if (proc) *proc = processed;
168  return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
169  }
170  std::string getRunDirName() const { return runDirectory_.stem().string(); }
171  void setInputSource(FedRawDataInputSource *inputSource) {inputSource_=inputSource;}
172 
173  private:
174 
175  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
176 
177  void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID) {
178  //pick up only event count here
179  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
180  }
181 
182  void dowork() { // the function to be called in the thread. Thread completes when function returns.
183  monInit_.exchange(true,std::memory_order_acquire);
184  while (!fmt_.m_stoprequest) {
185  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
186  << " ms=" << encPath_[0].encode(ministate_[0])
187  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
188 
189  {
190  std::lock_guard<std::mutex> lock(fmt_.monlock_);
191 
193 
195  std::string CSV = fmt_.jsonMonitor_->getCSVString();
196  //release mutex before writing out fast path file
197  fmt_.monlock_.unlock();
198  if (CSV.size())
199  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
200  }
201 
202  snapCounter_++;
203 
204  }
205  ::sleep(sleepTime_);
206  }
207  }
208 
209  //the actual monitoring thread is held by a separate class object for ease of maintenance
212  std::vector<Encoding> encPath_;
214 
215  unsigned int nStreams_;
216  unsigned int nThreads_;
218  unsigned int fastMonIntervals_;
219  unsigned int snapCounter_ = 0;
222 
223  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
224 
225  std::map<unsigned int, timeval> lumiStartTime_;//needed for multiplexed begin/end lumis
226  timeval fileLookStart_, fileLookStop_;//this could also be calculated in the input source
227 
228  unsigned int lastGlobalLumi_;
229  std::queue<unsigned int> lastGlobalLumisClosed_;
231  unsigned int lumiFromSource_;
232 
233  //global state
235 
236  //per stream
237  std::vector<const void*> ministate_;
238  std::vector<const void*> microstate_;
239  std::vector<const void*> threadMicrostate_;
240 
241  //variables measuring source statistics (global)
242  //unordered_map is not used because of very few elements stored concurrently
243  std::map<unsigned int, double> avgLeadTime_;
244  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
245  //helpers for source statistics:
246  std::map<unsigned int, unsigned long> accuSize_;
247  std::vector<double> leadTimes_;
248  std::map<unsigned int, std::pair<double,unsigned int>> lockStatsDuringLumi_;
249 
250  //for output module
251  std::map<unsigned int, std::pair<unsigned int,bool>> processedEventsPerLumi_;
252 
253  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
254  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
255  std::vector<std::atomic<bool>*> streamCounterUpdating_;
256 
257  std::vector<unsigned long> firstEventId_;
258  std::vector<std::atomic<bool>*> collectedPathList_;
259  std::vector<unsigned int> eventCountForPathInit_;
260  std::vector<bool> pathNamesReady_;
261 
263 
264  bool threadIDAvailable_ = false;
265 
266  std::atomic<unsigned long> totalEventsProcessed_;
267 
272  bool pathLegendWritten_ = false;
273  unsigned int nOutputModules_ =0;
274 
275  std::atomic<bool> monInit_;
276  bool exception_detected_ = false;
277  std::vector<unsigned int> exceptionInLS_;
278  bool emptyLumisectionMode_ = false;
279  };
280 
281 }
282 
283 #endif
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
int i
Definition: DBlmapReader.cc:9
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
def ls
Definition: eostools.py:348
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
std::vector< bool > pathNamesReady_
void preModuleBeginJob(edm::ModuleDescription const &)
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
std::vector< const void * > threadMicrostate_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
static const std::string macroStateNames[FastMonitoringThread::MCOUNT]
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void setInputSource(FedRawDataInputSource *inputSource)
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< const void * > microstate_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
boost::filesystem::path workingDirectory_
std::atomic< bool > m_stoprequest
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_