1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
14 #include "boost/filesystem.hpp"
19 #include <string>
20 #include <vector>
21 #include <map>
22 #include <queue>
23 #include <sstream>
24 #include <unordered_map>
26 /*Description
27  this is an evolution of the MicroStateService intended to be run standalone in cmsRun or similar
28  As such, it has to independently create a monitoring thread and run it in each forked process, which needs
29  to be arranged following the standard CMSSW procedure.
 We try to use boost threads for uniformity with the rest of the framework, despite their shortcomings.
 A legend for use by the monitoring process in the DAQ needs to be generated as soon as convenient; since
 no access to the EventProcessor is granted, this has to wait until after beginJob is executed.
 At the same time, we try to save time in the monitoring by avoiding even a single string lookup, using the
 module description pointer as the map key instead.
 As a bonus, we can now add the current path to the monitored status (and possibly associate modules with a path);
 this intermediate info is called the "ministate".
 The general counters and status variables (event number, number of processed events, number of passed and stored
 events, luminosity section, etc.) are also monitored here.
40  NOTA BENE!!! with respect to the MicroStateService, no string or string pointers are used for the microstates.
41  NOTA BENE!!! the state of the edm::EventProcessor cannot be monitored directly from within a service, so a
42  different solution must be identified for that (especially one needs to identify error states).
 NOTA BENE!!! to keep backward compatibility with the MicroStateService, a common base class with an abstract interface
 will be defined, exposing the single method to be used by all other packages (except EventFilter/Processor,
 which should continue to use the concrete class interface).
47 */
49 namespace evf{
52  {
53  struct Encoding
54  {
// Builds an encoding table with `res` reserved slots. current_ starts at reserved_
// and is advanced by update(); current_(reserved_) is safe because reserved_ is
// declared before current_ in the member list.
// The statement guarded by `if (reserved_)` is not visible in this chunk.
// NOTE(review): the destructor delete[]s dummiesForReserved_ when reserved_ != 0, and
// Encoding objects are stored in std::vector<Encoding> (encPath_). No user-defined
// copy constructor / copy assignment is visible here, so copying an Encoding with
// reserved_ != 0 (e.g. on vector reallocation) would delete the same array twice —
// confirm, or make ownership explicit (std::unique_ptr<...[]>) / the type move-only.
// NOTE(review): single-argument constructor is not `explicit`, so unsigned int
// converts to Encoding implicitly; check callers before adding `explicit`.
55  Encoding(unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
56  {
57  if (reserved_)
59  // completeReservedWithDummies();
60  }
62  {
63  if (reserved_)
64  delete[] dummiesForReserved_;
65  }
66  //trick: only encode state when sending it over (i.e. every sec)
67  int encode(const void *add){
68  std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
69  return (it!=quickReference_.end()) ? (*it).second : 0;
70  }
71  const void* decode(unsigned int index){return decoder_[index];}
// Registers `add` for reserved slot `i`. When decoder_ has no entry at index `i`
// yet, `add` is appended. Note that push_back places it at index decoder_.size(),
// which equals `i` only if reserved slots are filled in order — NOTE(review): confirm.
// The line after the commented-out translation_ statement and the body of the
// else-branch are not visible in this chunk.
72  void fillReserved(void* add, unsigned int i){
73  // translation_[*name]=current_;
75  if(decoder_.size()<=i)
76  decoder_.push_back(add);
77  else
79  }
// Takes a state pointer `add`; the body of this function is not visible in this
// chunk, so its behavior cannot be documented from here.
80  void updateReserved(void* add){
83  }
85  {
86  for(unsigned int i = currentReserved_; i<reserved_; i++)
88  }
// Appends a new (non-reserved) state pointer to decoder_ and advances current_.
// NOTE(review): one line between the commented-out translation_ statement and the
// push_back is not visible here. For encode() to find `add`, quickReference_ must
// be updated somewhere — presumably on that line (e.g. with current_) — confirm.
89  void update(void* add){
90  // translation_[*name]=current_;
92  decoder_.push_back(add);
93  current_++;
94  }
95  unsigned int vecsize() {
96  return decoder_.size();
97  }
// pointer -> index lookup consulted by encode(); unknown pointers encode to 0.
98  std::unordered_map<const void *,int> quickReference_;
// index -> pointer table read by decode(); appended to by fillReserved() and update().
99  std::vector<const void *> decoder_;
// number of reserved slots, taken from the constructor argument.
100  unsigned int reserved_;
// starts at reserved_ and is incremented by update(); NOTE(review): signed while
// reserved_ and decoder_ indices are unsigned — confirm the mix is intended.
101  int current_;
104  };
105  public:
107  // the names of the states - some of them are never reached in an online app
109  // Reserved names for microstates
110  // moved into base class in EventFilter/Utilities for compatibility with MicroStateServiceClassic
111  static const std::string nopath_;
// Framework transition callbacks; how they are registered is not visible in this chunk.
119  void jobFailure();
121  void postBeginJob();
122  void postEndJob();
133  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
134  void preEvent(edm::StreamContext const&);
135  void postEvent(edm::StreamContext const&);
// Presumably records lumisection `ls` in exceptionInLS_ and sets exception_detected_
// (both declared in the private section) — implementation not visible; confirm.
143  void setExceptionDetected(unsigned int ls);
145  //this is still needed for use in special functions like DQM which are in turn framework services
// NOTE(review): the declaration the comment above refers to is not visible in this chunk.
// Input-source / file statistics reporters. The per-lumi maps in the private section
// (sourceEventsReport_, accuSize_, lockStatsDuringLumi_, ...) are presumably their
// backing storage — confirm in the implementation.
149  void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);
150  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
151  void startedLookingForFile();
152  void stoppedLookingForFile(unsigned int lumi);
153  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
// Queried by shouldWriteFiles(): number of events processed in `lumi`. When
// `abortFlag` is non-null it presumably receives the same per-lumi flag that
// getAbortFlagForLumi() returns — confirm in the implementation.
154  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag=nullptr);
155  bool getAbortFlagForLumi(unsigned int lumi);
156  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc=nullptr)
157  {
158  unsigned int processed = getEventsProcessedForLumi(lumi);
159  if (proc) *proc = processed;
160  return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
161  }
162  std::string getRunDirName() const { return runDirectory_.stem().string(); }
164  private:
166  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
168  void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID) {
169  //pick up only event count here
170  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
171  }
// Body of the monitoring thread; the thread finishes when this function returns.
// Loops until fmt_.m_stoprequest becomes true. Each iteration:
//  - logs the encoded macro-, mini- and micro-state (mini/micro taken from element 0
//    of the per-stream ministate_ / microstate_ vectors),
//  - works under fmt_.monlock_ (part of that section is not visible in this chunk),
//    possibly writing the CSV fast-path file after releasing the mutex,
//  - increments snapCounter_,
//  - then sleeps for sleepTime_ seconds (POSIX ::sleep takes seconds).
// NOTE(review): ministate_[0] / microstate_[0] are read below without holding
// fmt_.monlock_; if stream threads write them concurrently this is a data race on
// non-atomic pointers — confirm this is intended/benign.
173 void dowork() { // the function to be called in the thread. Thread completes when function returns.
175  while (!fmt_.m_stoprequest) {
176  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
177  << " ms=" << encPath_[0].encode(ministate_[0])
178  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
180  {
181  std::lock_guard<std::mutex> lock(fmt_.monlock_);
186  std::string CSV = fmt_.jsonMonitor_->getCSVString();
187  //release mutex before writing out fast path file
// NOTE(review): `lock` above is a std::lock_guard, whose destructor unlocks
// fmt_.monlock_ again when its scope closes (after snapCounter_++ below). Unlocking
// manually here therefore calls std::mutex::unlock() twice on this path, which is
// undefined behavior, unless a line not visible in this chunk re-locks the mutex.
// Holding the lock in a std::unique_lock<std::mutex> and calling lock.unlock() here
// would release it early safely. On this path snapCounter_++ also runs unlocked.
188  fmt_.monlock_.unlock();
189  if (CSV.size())
190  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
191  }
193  snapCounter_++;
195  }
196  ::sleep(sleepTime_);
197  }
198  }
200  //the actual monitoring thread is held by a separate class object for ease of maintenance
// NOTE(review): the member the comment above refers to (fmt_, used as fmt_.monlock_,
// fmt_.jsonMonitor_, fmt_.m_stoprequest) is not declared in the visible lines.
// Path-state encoders; dowork() logs through encPath_[0].
203  std::vector<Encoding> encPath_;
205  unsigned int nStreams_;
206  unsigned int nThreads_;
208  unsigned int fastMonIntervals_;
// Incremented by dowork() on every monitoring iteration.
209  unsigned int snapCounter_ = 0;
213  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
215  std::map<unsigned int, timeval> lumiStartTime_;//needed for multiplexed begin/end lumis
216  timeval fileLookStart_, fileLookStop_;//this could also be calculated in the input source
218  unsigned int lastGlobalLumi_;
219  std::queue<unsigned int> lastGlobalLumisClosed_;
221  unsigned int lumiFromSource_;
223  //global state
226  //per stream
// Per-stream current-state pointers; dowork() encodes element 0 of ministate_/microstate_.
227  std::vector<const void*> ministate_;
228  std::vector<const void*> microstate_;
229  std::vector<const void*> threadMicrostate_;
231  //variables measuring source statistics (global)
232  //unordered_map is not used because of very few elements stored concurrently
233  std::map<unsigned int, double> avgLeadTime_;
234  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
235  //helpers for source statistics:
236  std::map<unsigned int, unsigned long> accuSize_;
237  std::vector<double> leadTimes_;
238  std::map<unsigned int, std::pair<double,unsigned int>> lockStatsDuringLumi_;
240  //for output module
// Keyed by lumi; the pair presumably holds (processed events, abort flag) consumed by
// getEventsProcessedForLumi()/getAbortFlagForLumi() — confirm.
241  std::map<unsigned int, std::pair<unsigned int,bool>> processedEventsPerLumi_;
243  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
244  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
// NOTE(review): this and collectedPathList_ hold raw std::atomic<bool>* — ownership and
// deallocation are not visible here; confirm they are freed, or hold them in
// std::unique_ptr to make ownership explicit.
245  std::vector<std::atomic<bool>*> streamCounterUpdating_;
247  std::vector<unsigned long> firstEventId_;
248  std::vector<std::atomic<bool>*> collectedPathList_;
249  std::vector<unsigned int> eventCountForPathInit_;
// NOTE(review): std::vector<bool> packs elements into shared words, so concurrent writes
// to different elements (e.g. from different streams) race — confirm access is serialized.
250  std::vector<bool> pathNamesReady_;
254  std::map<unsigned int,unsigned int> sourceEventsReport_;
256  bool threadIDAvailable_ = false;
// NOTE(review): before C++20 a default-constructed std::atomic holds an indeterminate
// value; confirm this and monInit_ are initialized in the constructor.
258  std::atomic<unsigned long> totalEventsProcessed_;
264  bool pathLegendWritten_ = false;
265  unsigned int nOutputModules_ =0;
267  std::atomic<bool> monInit_;
268  bool exception_detected_ = false;
269  std::vector<unsigned int> exceptionInLS_;
// Consulted by shouldWriteFiles(): when true, files are written even for lumis with no processed events.
270  bool emptyLumisectionMode_ = false;
271  };
273 }
275 #endif
