CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
4 
13 
14 #include "boost/filesystem.hpp"
15 
18 
19 #include <string>
20 #include <vector>
21 #include <map>
22 #include <queue>
23 #include <sstream>
24 #include <unordered_map>
25 
26 /*Description
27  This is an evolution of the MicroStateService, intended to be run standalone in cmsRun or a similar executable.
28  As such, it has to independently create a monitoring thread and run it in each forked process, which needs
29  to be arranged following the standard CMSSW procedure.
30  We try to use boost threads for uniformity with the rest of the framework, despite their shortcomings.
31  A legend for use by the monitoring process in the DAQ needs to be generated as soon as convenient; since
32  no access to the EventProcessor is granted, this has to wait until after beginJob has executed.
33  At the same time, we try to save time in the monitoring by avoiding even a single string lookup, using the
34  module-description (edm::ModuleDescription) pointer as the key into the map instead.
35  As a bonus, we can now add the current path to the monitored status (and possibly associate modules with a path).
36  This intermediate information is called the "ministate".
37  The general counters and status variables (event number, number of processed events, number of passed and stored
38  events, luminosity section, etc.) are also monitored here.
39 
40  NOTA BENE!!! With respect to the MicroStateService, no strings or string pointers are used for the microstates.
41  NOTA BENE!!! The state of the edm::EventProcessor cannot be monitored directly from within a service, so a
42  different solution must be found for that (in particular, error states need to be identified).
43  NOTA BENE!!! To keep backward compatibility with the MicroStateService, a common base class with an abstract interface
44  will be defined, exposing the single method to be used by all other packages (except EventFilter/Processor,
45  which should continue to use the concrete class interface).
46 
47 */
48 
49 namespace evf{
50 
52  {
// Encoding: maps addresses (const void*) to integer codes (quickReference_, read by encode())
// and integer codes back to addresses (decoder_, read by decode()). The first reserved_ codes
// are apparently reserved slots filled through fillReserved(); update() appends further entries.
53  struct Encoding
54  {
// res: number of reserved codes. current_ starts at reserved_ and currentReserved_ at 0.
// current_(reserved_) is well-defined because reserved_ is declared before current_
// (members are initialized in declaration order, not initializer-list order).
55  Encoding(unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
56  {
// NOTE: the statement guarded by this `if` (listing line 58) is not visible here. The destructor
// delete[]s dummiesForReserved_ under the same condition, so it presumably allocates that array
// with new[] — TODO confirm.
57  if (reserved_)
59  // completeReservedWithDummies();
60  }
62  {
// Destructor (its signature, listing line 61, is not visible here): frees the dummy array
// when reserved codes were requested (reserved_ != 0).
// NOTE(review): Encoding owns dummiesForReserved_ through a raw pointer, but no copy/move
// constructor or assignment operator is visible. The implicit copy operations therefore copy
// the pointer, and the user-declared destructor suppresses implicit moves.
// Encoding objects are stored by value in std::vector<Encoding> (encPath_), so any copy
// (e.g. on vector reallocation) would end in a double delete[].
// Consider deleting the copy operations and adding moves, or holding the array in
// std::unique_ptr<edm::ModuleDescription[]>.
63  if (reserved_)
64  delete[] dummiesForReserved_;
65  }
66  //trick: only encode state when sending it over (i.e. every sec)
67  int encode(const void *add){
68  std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
69  return (it!=quickReference_.end()) ? (*it).second : 0;
70  }
71  const void* decode(unsigned int index){return decoder_[index];}
// Apparently registers address `add` under reserved code `i`.
// If decoder_ has no slot i yet, add is appended, which places it at index decoder_.size().
// Otherwise the else-branch statement (listing line 78, not visible here) presumably
// overwrites slot i — TODO confirm.
// Listing line 74 (not visible) presumably records add -> i in quickReference_ — TODO confirm.
// NOTE(review): when decoder_.size() < i, push_back stores add at index decoder_.size(),
// not at i, so decode(i) would not return add. This is only correct if callers pass
// consecutive i starting from the current size. A robust form would be
// decoder_.resize(i + 1) followed by decoder_[i] = add.
72  void fillReserved(void* add, unsigned int i){
73  // translation_[*name]=current_;
75  if(decoder_.size()<=i)
76  decoder_.push_back(add);
77  else
79  }
// updateReserved: its body (listing lines 81-82) is not visible here.
80  void updateReserved(void* add){
83  }
// The signature of the next member (listing line 84) is not visible. The commented-out call
// "completeReservedWithDummies();" in the constructor suggests that name — TODO confirm.
// It loops over codes [currentReserved_, reserved_), apparently the reserved slots not yet
// filled. Its loop body (listing line 87) is not visible; it presumably fills each slot from
// dummiesForReserved_ — TODO confirm.
85  {
86  for(unsigned int i = currentReserved_; i<reserved_; i++)
88  }
// Registers a further (presumably non-reserved) address: appends it to decoder_ and
// advances current_.
// Listing line 91 is not visible; it presumably records add -> current_ in quickReference_
// (cf. the commented-out translation_ line) — TODO confirm.
// NOTE(review): add lands at decoder_ index decoder_.size() - 1. That matches current_ only if
// decoder_.size() == current_ on entry, i.e. all reserved slots were filled first.
89  void update(void* add){
90  // translation_[*name]=current_;
92  decoder_.push_back(add);
93  current_++;
94  }
95  unsigned int vecsize() {
96  return decoder_.size();
97  }
// Address -> code lookup, read by encode().
98  std::unordered_map<const void *,int> quickReference_;
// Code -> address table, read by decode(); appended to by fillReserved() and update().
99  std::vector<const void *> decoder_;
// Number of reserved codes requested at construction.
100  unsigned int reserved_;
// Counter initialized to reserved_ and incremented by update().
101  int current_;
// (Listing lines 102-103 are not visible. The constructor initializes currentReserved_ and the
// destructor deletes dummiesForReserved_, so those members are presumably declared there.)
104  };
105  public:
106 
// NOTE: this listing omits several declarations of this section (listing lines 108, 112-113,
// 115-116, 118, 120, 124-127, 129-132, 136-139 and 142-143 are not shown).
107  // the names of the states - some of them are never reached in an online app
109  // Reserved names for microstates
110  // moved into base class in EventFilter/Utilities for compatibility with MicroStateServiceClassic
111  static const std::string nopath_;
114 
117 
// Job-level framework callbacks (definitions not visible here).
119  void jobFailure();
121  void postBeginJob();
122  void postEndJob();
123 
128 
// Per-event callbacks, receiving the framework's stream/path context objects.
133  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
134  void preEvent(edm::StreamContext const&);
135  void postEvent(edm::StreamContext const&);
140 
141  //this is still needed for use in special functions like DQM which are in turn framework services
144 
// Input-source / output bookkeeping keyed by lumi number (definitions not visible here).
// Presumably these feed sourceEventsReport_, accuSize_, fileLookStart_/fileLookStop_ and
// processedEventsPerLumi_ — TODO confirm.
145  void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);
146  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
147  void startedLookingForFile();
148  void stoppedLookingForFile(unsigned int lumi);
149  unsigned int getEventsProcessedForLumi(unsigned int lumi);
150  std::string getRunDirName() const { return runDirectory_.stem().string(); }
151 
152  private:
153 
// Defined out of line (not visible here). Parameter meanings are inferred from the names only:
// ls is presumably the lumisection number, and isGlobalEOL flags the global end-of-lumi
// snapshot — TODO confirm.
154  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
155 
156  void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID) {
157  //pick up only event count here
158  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
159  }
160 
// Monitoring-thread body. It loops until fmt_.m_stoprequest becomes true. Each iteration:
//  - logs the current macro/mini/micro state codes (stream 0 only);
//  - takes fmt_.monlock_;
//  - inside a block whose opening line is not visible, fetches the CSV string from
//    fmt_.jsonMonitor_ and writes it to fastPath_ when non-empty;
//  - increments snapCounter_;
//  - finally sleeps sleepTime_ seconds (POSIX ::sleep) outside the lock scope.
// NOTE: listing lines 171 and 173 are not visible here. The '}' on listing line 179 closes a
// block opened on line 173 (presumably a condition controlling when the CSV is produced
// — TODO confirm).
161  void dowork() { // the function to be called in the thread. Thread completes when function returns.
// NOTE(review): this exchange() acts as a store signalling "monitoring started", but
// std::memory_order_acquire gives a store no release semantics. A thread that later reads
// monInit_ == true is not guaranteed to see writes made before this point. Use
// std::memory_order_release (or the default seq_cst) if monInit_ is meant to publish state
// — confirm intent.
162  monInit_.exchange(true,std::memory_order_acquire);
163  while (!fmt_.m_stoprequest) {
// Indexes element 0 of encPath_, ministate_ and microstate_: these vectors must be non-empty
// whenever this thread runs.
164  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
165  << " ms=" << encPath_[0].encode(ministate_[0])
166  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
167 
168  {
// NOTE(review): BUG — this lock_guard unlocks fmt_.monlock_ again in its destructor at the
// closing brace of this scope (listing line 183). When the explicit fmt_.monlock_.unlock()
// below has run, the mutex is unlocked a second time by a thread that no longer owns it,
// which is undefined behavior for std::mutex. Fix: declare
//   std::unique_lock<std::mutex> lock(fmt_.monlock_);
// and replace the manual call with
//   lock.unlock();
// unique_lock tracks ownership and skips the unlock in its destructor.
169  std::lock_guard<std::mutex> lock(fmt_.monlock_);
170 
172 
174  std::string CSV = fmt_.jsonMonitor_->getCSVString();
175  //release mutex before writing out fast path file
176  fmt_.monlock_.unlock();
177  if (CSV.size())
178  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
179  }
180 
// Still inside the lock scope, but whenever the manual unlock above ran, this increment
// happens without holding fmt_.monlock_.
181  snapCounter_++;
182 
183  }
184  ::sleep(sleepTime_);
185  }
186  }
187 
188  //the actual monitoring thread is held by a separate class object for ease of maintenance
// (Listing lines 189-190 are not visible. The code references fmt_ and encModule_, which are
// presumably declared there — TODO confirm.)
// dowork() uses encPath_[0] together with ministate_[0], which suggests one Encoding per
// stream — TODO confirm.
// NOTE(review): Encoding frees a raw array with delete[] in its destructor and has no visible
// copy/move members. Copying elements of this vector (e.g. on reallocation) risks a
// double delete[].
191  std::vector<Encoding> encPath_;
192 
193  unsigned int nStreams_;
194  unsigned int nThreads_;
196  unsigned int fastMonIntervals_;
// Incremented once per monitoring cycle by dowork().
197  unsigned int snapCounter_ = 0;
200 
201  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
202 
203  std::map<unsigned int, timeval> lumiStartTime_;//needed for multiplexed begin/end lumis
204  timeval fileLookStart_, fileLookStop_;//this could also be calculated in the input source
205 
206  unsigned int lastGlobalLumi_;
207  std::queue<unsigned int> lastGlobalLumisClosed_;
209  unsigned int lumiFromSource_;
210 
211  //global state
213 
214  //per stream
// Read as ministate_[0] / microstate_[0] by dowork() and passed to Encoding::encode().
215  std::vector<const void*> ministate_;
216  std::vector<const void*> microstate_;
217  std::vector<const void*> threadMicrostate_;
218 
219  //variables measuring source statistics (global)
220  //unordered_map is not used because of very few elements stored concurrently
221  std::map<unsigned int, double> avgLeadTime_;
222  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
223  //helpers for source statistics:
224  std::map<unsigned int, unsigned long> accuSize_;
225  std::vector<double> leadTimes_;
226 
227  //for output module
228  std::map<unsigned int, unsigned int> processedEventsPerLumi_;
229 
230  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
231  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
// NOTE(review): this vector and collectedPathList_ below hold raw std::atomic<bool>* pointers.
// Their allocation and release are not visible here. If they are owned by this class, prefer
// std::vector<std::unique_ptr<std::atomic<bool>>> so they are released automatically.
232  std::vector<std::atomic<bool>*> streamCounterUpdating_;
233 
234  std::vector<unsigned long> firstEventId_;
235  std::vector<std::atomic<bool>*> collectedPathList_;
236  std::vector<unsigned int> eventCountForPathInit_;
// NOTE(review): std::vector<bool> packs elements into shared bits. Concurrent writes to
// different elements from different threads are a data race. If this is written per stream
// concurrently (writers not visible here), use std::vector<char> or atomics instead.
237  std::vector<bool> pathNamesReady_;
238 
240 
241  std::map<unsigned int,unsigned int> sourceEventsReport_;
242 
243  bool threadIDAvailable_ = false;
244 
// NOTE(review): no in-class initializer. Before C++20, a default-constructed std::atomic holds
// an indeterminate value, so this must be initialized in the constructor (not visible here)
// — confirm.
245  std::atomic<unsigned long> totalEventsProcessed_;
246 
249  bool pathLegendWritten_ = false;
250 
// Set to true by dowork() when the monitoring thread starts. It has no in-class initializer,
// so the same initialization caveat as totalEventsProcessed_ applies.
251  std::atomic<bool> monInit_;
252  };
253 
254 }
255 
256 #endif
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
int i
Definition: DBlmapReader.cc:9
boost::filesystem::path runDirectory_
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
std::string getRunDirName() const
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
std::vector< bool > pathNamesReady_
unsigned int getEventsProcessedForLumi(unsigned int lumi)
void preModuleBeginJob(edm::ModuleDescription const &)
std::vector< std::atomic< bool > * > collectedPathList_
tuple path
else: Piece not in the list, fine.
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
static const std::string nopath_
std::vector< const void * > threadMicrostate_
std::unordered_map< const void *, int > quickReference_
std::vector< unsigned long > firstEventId_
std::vector< const void * > decoder_
static const std::string macroStateNames[FastMonitoringThread::MCOUNT]
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
std::vector< const void * > microstate_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
tuple events
Definition: patZpeak.py:19
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > processedEventsPerLumi_
std::atomic< bool > m_stoprequest
std::map< unsigned int, unsigned int > sourceEventsReport_
std::unique_ptr< FastMonitor > jsonMonitor_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void fillReserved(void *add, unsigned int i)