test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
4 
13 
14 #include "boost/filesystem.hpp"
15 
18 
19 #include <string>
20 #include <vector>
21 #include <map>
22 #include <queue>
23 #include <sstream>
24 #include <unordered_map>
25 
26 /*Description
27  This is an evolution of the MicroStateService, intended to be run standalone in cmsRun or similar.
28  As such, it has to independently create a monitoring thread and run it in each forked process, which needs
29  to be arranged following the standard CMSSW procedure.
30  We try to use boost threads for uniformity with the rest of the framework, even if they suck a bit.
31  A legend for use by the monitoring process in the DAQ needs to be generated as soon as convenient - since
32  no access to the EventProcessor is granted, this needs to wait until after beginJob is executed.
33  At the same time, we try to spare time in the monitoring by avoiding even a single string lookup and using the
34  moduledesc pointer to key into the map instead.
35  As a bonus, we can now add to the monitored status the current path (and possibly associate modules to a path...)
36  this intermediate info will be called "ministate" :D
37  The general counters and status variables (event number, number of processed events, number of passed and stored
38  events, luminosity section etc.) are also monitored here.
39 
40  NOTA BENE!!! with respect to the MicroStateService, no string or string pointers are used for the microstates.
41  NOTA BENE!!! the state of the edm::EventProcessor cannot be monitored directly from within a service, so a
42  different solution must be identified for that (especially one needs to identify error states).
43  NOTA BENE!!! to keep backward compatibility with the MicroStateService, a common base class with abstract interface,
44  exposing the single method to be used by all other packages (except EventFilter/Processor,
45  which should continue to use the concrete class interface) will be defined
46 
47 */
49 
50 namespace edm {
52 }
53 
54 
55 namespace evf{
56 
58  {
59  struct Encoding
60  {
61  Encoding(unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
62  {
63  if (reserved_)
65  // completeReservedWithDummies();
66  }
68  {
69  if (reserved_)
70  delete[] dummiesForReserved_;
71  }
72  //trick: only encode state when sending it over (i.e. every sec)
// Translate an address into its integer code.
// Looks `add` up in quickReference_ and returns the stored value; an address
// that is not in the map yields 0.
// NOTE(review): whether 0 can also be a genuine code is not visible in this
// listing - confirm before treating 0 strictly as "unknown".
73  int encode(const void *add){
74  std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
75  return (it!=quickReference_.end()) ? (*it).second : 0;
76  }
// Return the address stored at position `index` of decoder_.
// Uses unchecked vector indexing: `index` must be smaller than vecsize().
77  const void* decode(unsigned int index){return decoder_[index];}
78  void fillReserved(const void* add, unsigned int i){
79  // translation_[*name]=current_;
81  if(decoder_.size()<=i)
82  decoder_.push_back(add);
83  else
85  }
86  void updateReserved(const void* add){
89  }
91  {
92  for(unsigned int i = currentReserved_; i<reserved_; i++)
94  }
95  void update(const void* add){
96  // translation_[*name]=current_;
98  decoder_.push_back(add);
99  current_++;
100  }
// Number of entries currently held in decoder_ (converted to unsigned int).
101  unsigned int vecsize() {
102  return decoder_.size();
103  }
104  std::unordered_map<const void *,int> quickReference_;
105  std::vector<const void *> decoder_;
106  unsigned int reserved_;
107  int current_;
110  };
111  public:
112 
113  // the names of the states - some of them are never reached in an online app
116  // Reserved names for microstates
117  // moved into base class in EventFilter/Utilities for compatibility with MicroStateServiceClassic
118  static const std::string nopath_;
121  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
122 
126 
128  void jobFailure();
130 
132  void postBeginJob();
133  void postEndJob();
134 
139 
144  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
145  void preEvent(edm::StreamContext const&);
146  void postEvent(edm::StreamContext const&);
154  void setExceptionDetected(unsigned int ls);
155 
156  //this is still needed for use in special functions like DQM which are in turn framework services
159 
160  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
161  void startedLookingForFile();
162  void stoppedLookingForFile(unsigned int lumi);
163  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
164  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag=nullptr);
165  bool getAbortFlagForLumi(unsigned int lumi);
// Decide whether output should be written for lumisection `lumi`.
//  lumi - lumisection to query.
//  proc - optional out-parameter; when non-null it receives the value
//         returned by getEventsProcessedForLumi(lumi).
// Returns true only if the abort flag for `lumi` is not set AND either the
// processed-event count is non-zero or emptyLumisectionMode_ is enabled.
166  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc=nullptr)
167  {
168  unsigned int processed = getEventsProcessedForLumi(lumi);
169  if (proc) *proc = processed;
170  return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
171  }
// Return runDirectory_.stem() as a string.
// NOTE(review): stem() drops anything after the last '.' of the final path
// component - presumably run directory names contain no dot; confirm.
172  std::string getRunDirName() const { return runDirectory_.stem().string(); }
// Store the given input-source pointer in inputSource_ (plain pointer assignment).
173  void setInputSource(FedRawDataInputSource *inputSource) {inputSource_=inputSource;}
176 
177  private:
178 
179  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
180 
// Snapshot for one stream of lumisection `ls`.
// Simply forwards (ls, streamID) to fmt_.jsonMonitor_->snapStreamAtomic().
181  void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID) {
182  //pick up only event count here
183  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
184  }
185 
// Body of the monitoring loop.
// Sets monInit_, then repeats until fmt_.m_stoprequest becomes true:
//   1. log the current macro/mini/micro/input states (index 0 entries only),
//   2. while holding fmt_.monlock_, collect the CSV string(s) from
//      fmt_.jsonMonitor_, then write them via outputCSV() - one file per
//      stream (fastPathList_) when filePerFwkStream_ is set, otherwise a single
//      file (fastPath_); empty strings are not written,
//   3. increment snapCounter_ and sleep for sleepTime_.
// NOTE(review): this listing omits some original lines (the numbering jumps
// 192->194 and 198->200->202), so statements between those lines are not
// shown here and the brace nesting below looks unbalanced for that reason.
186  void dowork() { // the function to be called in the thread. Thread completes when function returns.
// NOTE(review): memory_order_acquire on exchange() orders only the load half
// of the read-modify-write - confirm that readers of monInit_ do not rely on
// release semantics from this store.
187  monInit_.exchange(true,std::memory_order_acquire);
188  while (!fmt_.m_stoprequest) {
189  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
190  << " ms=" << encPath_[0].encode(ministate_[0])
191  << " us=" << encModule_.encode(microstate_[0])
192  << " is=" << inputStateNames[inputState_]
194  << std::endl;
195 
196  {
// Scoped lock: released automatically when `lock` goes out of scope.
197  std::lock_guard<std::mutex> lock(fmt_.monlock_);
198 
200 
202  if (filePerFwkStream_) {
// Gather every stream's CSV line first so the file writes can happen afterwards.
203  std::vector<std::string> CSVv;
204  for (unsigned int i=0;i<nStreams_;i++) {
205  CSVv.push_back(fmt_.jsonMonitor_->getCSVString((int)i));
206  }
// NOTE(review): `lock` (std::lock_guard) is still in scope here, so its
// destructor will unlock fmt_.monlock_ a second time after this manual
// unlock(). Unlocking a std::mutex the thread does not own is undefined
// behavior - confirm, and consider std::unique_lock with lock.unlock().
207  fmt_.monlock_.unlock();
208  for (unsigned int i=0;i<nStreams_;i++) {
209  if (CSVv[i].size())
210  fmt_.jsonMonitor_->outputCSV(fastPathList_[i],CSVv[i]);
211  }
212  }
213  else {
214  std::string CSV = fmt_.jsonMonitor_->getCSVString();
215  //release mutex before writing out fast path file
// NOTE(review): same manual-unlock-under-lock_guard pattern as in the
// per-stream branch above.
216  fmt_.monlock_.unlock();
217  if (CSV.size())
218  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
219  }
220  }
221 
222  snapCounter_++;
223 
224  }
// Pause between iterations; ::sleep() takes its argument in seconds.
225  ::sleep(sleepTime_);
226  }
227  }
228 
229  //the actual monitoring thread is held by a separate class object for ease of maintenance
232  std::vector<Encoding> encPath_;
236 
237  unsigned int nStreams_;
238  unsigned int nThreads_;
240  unsigned int fastMonIntervals_;
241  unsigned int snapCounter_ = 0;
245 
246  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
247 
248  std::map<unsigned int, timeval> lumiStartTime_;//needed for multiplexed begin/end lumis
249  timeval fileLookStart_, fileLookStop_;//this could also be calculated in the input source
250 
251  unsigned int lastGlobalLumi_;
252  std::queue<unsigned int> lastGlobalLumisClosed_;
255  unsigned int lumiFromSource_;
256 
257  //global state
259 
260  //per stream
261  std::vector<const void*> ministate_;
262  std::vector<const void*> microstate_;
263  std::vector<const void*> threadMicrostate_;
264 
265  //variables measuring source statistics (global)
266  //unordered_map is not used because of very few elements stored concurrently
267  std::map<unsigned int, double> avgLeadTime_;
268  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
269  //helpers for source statistics:
270  std::map<unsigned int, unsigned long> accuSize_;
271  std::vector<double> leadTimes_;
272  std::map<unsigned int, std::pair<double,unsigned int>> lockStatsDuringLumi_;
273 
274  //for output module
275  std::map<unsigned int, std::pair<unsigned int,bool>> processedEventsPerLumi_;
276 
277  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
278  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
279  std::vector<std::atomic<bool>*> streamCounterUpdating_;
280 
281  std::vector<unsigned long> firstEventId_;
282  std::vector<std::atomic<bool>*> collectedPathList_;
283  std::vector<unsigned int> eventCountForPathInit_;
284  std::vector<bool> pathNamesReady_;
285 
287 
288  bool threadIDAvailable_ = false;
289 
290  std::atomic<unsigned long> totalEventsProcessed_;
291 
297  bool pathLegendWritten_ = false;
298  unsigned int nOutputModules_ =0;
299 
300  std::atomic<bool> monInit_;
301  bool exception_detected_ = false;
302  std::vector<unsigned int> exceptionInLS_;
303  bool emptyLumisectionMode_ = false;
304  std::vector<std::string> fastPathList_;
305 
306  };
307 
308 }
309 
310 #endif
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
FastMonitoringThread::InputState inputState_
int i
Definition: DBlmapReader.cc:9
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
void setInState(FastMonitoringThread::InputState inputState)
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
def ls
Definition: eostools.py:348
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void setInStateSup(FastMonitoringThread::InputState inputState)
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
FastMonitoringThread::InputState inputSupervisorState_
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
std::vector< bool > pathNamesReady_
void preModuleBeginJob(edm::ModuleDescription const &)
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
std::vector< const void * > threadMicrostate_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
static const std::string macroStateNames[FastMonitoringThread::MCOUNT]
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< std::string > fastPathList_
void postStreamBeginLumi(edm::StreamContext const &)
void setInputSource(FedRawDataInputSource *inputSource)
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< const void * > microstate_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
boost::filesystem::path workingDirectory_
std::atomic< bool > m_stoprequest
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< Encoding > encPath_
tuple size
Write out results.
edm::ModuleDescription * dummiesForReserved_
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_