CMS 3D CMS Logo

FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
4 
13 
14 #include "boost/filesystem.hpp"
15 
18 
19 #include <string>
20 #include <vector>
21 #include <map>
22 #include <queue>
23 #include <sstream>
24 #include <unordered_map>
25 
26 /*Description
27  this is an evolution of the MicroStateService intended to be run standalone in cmsRun or similar
28  As such, it has to independently create a monitoring thread and run it in each forked process, which needs
29  to be arranged following the standard CMSSW procedure.
30  We try to use boost threads for uniformity with the rest of the framework, even if they suck a bit.
31  A legenda for use by the monitoring process in the DAQ needs to be generated as soon as convenient - since
32  no access to the EventProcessor is granted, this needs to wait until after beginJob is executed.
33  At the same time, we try to spare time in the monitoring by avoiding even a single string lookup and using the
34  moduledesc pointer to key into the map instead.
35  As a bonus, we can now add to the monitored status the current path (and possibly associate modules to a path...)
36  this intermediate info will be called "ministate" :D
37  The general counters and status variables (event number, number of processed events, number of passed and stored
38  events, luminosity section etc. are also monitored here.
39 
40  NOTA BENE!!! with respect to the MicroStateService, no string or string pointers are used for the microstates.
41  NOTA BENE!!! the state of the edm::EventProcessor cannot be monitored directly from within a service, so a
42  different solution must be identified for that (especially one needs to identify error states).
43  NOTA BENE!!! to keep backward compatibility with the MicroStateService, a common base class with abstract interface,
44  exposing the single method to be used by all other packages (except EventFilter/Processor,
45  which should continue to use the concrete class interface) will be defined
46 
47 */
49 
50 namespace edm {
52 }
53 
54 
55 namespace evf{
56 
57  template<typename T>
59  ContainableAtomic(): m_value{} {}
60  ContainableAtomic(T iValue): m_value(iValue) {}
61  ContainableAtomic(ContainableAtomic<T> const& iOther): m_value(iOther.m_value.load()) {}
62  ContainableAtomic<T>& operator=(const void* iValue) {
63  m_value.store(iValue,std::memory_order_relaxed);
64  return *this;
65  }
66  operator T() { return m_value.load(std::memory_order_relaxed); }
67 
68  std::atomic<T> m_value;
69  };
70 
72  {
73  struct Encoding
74  {
75  Encoding(unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
76  {
77  if (reserved_)
78  dummiesForReserved_ = new edm::ModuleDescription[reserved_];
79  // completeReservedWithDummies();
80  }
82  {
83  if (reserved_)
84  delete[] dummiesForReserved_;
85  }
86  //trick: only encode state when sending it over (i.e. every sec)
87  int encode(const void *add){
88  std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
89  return (it!=quickReference_.end()) ? (*it).second : 0;
90  }
91  const void* decode(unsigned int index){return decoder_[index];}
92  void fillReserved(const void* add, unsigned int i){
93  // translation_[*name]=current_;
94  quickReference_[add]=i;
95  if(decoder_.size()<=i)
96  decoder_.push_back(add);
97  else
98  decoder_[currentReserved_] = add;
99  }
100  void updateReserved(const void* add){
101  fillReserved(add,currentReserved_);
102  currentReserved_++;
103  }
105  {
106  for(unsigned int i = currentReserved_; i<reserved_; i++)
107  fillReserved(dummiesForReserved_+i,i);
108  }
109  void update(const void* add){
110  // translation_[*name]=current_;
111  quickReference_[add]=current_;
112  decoder_.push_back(add);
113  current_++;
114  }
115  unsigned int vecsize() {
116  return decoder_.size();
117  }
118  std::unordered_map<const void *,int> quickReference_;
119  std::vector<const void *> decoder_;
120  unsigned int reserved_;
121  int current_;
124  };
125  public:
126 
127  // the names of the states - some of them are never reached in an online app
128  static const std::string macroStateNames[FastMonitoringThread::MCOUNT];
129  static const std::string inputStateNames[FastMonitoringThread::inCOUNT];
130  // Reserved names for microstates
131  // moved into base class in EventFilter/Utilities for compatibility with MicroStateServiceClassic
132  static const std::string nopath_;
134  ~FastMonitoringService() override;
135  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
136 
137  std::string makePathLegendaJson();
138  std::string makeModuleLegendaJson();
139  std::string makeInputLegendaJson();
140 
141  void preallocate(edm::service::SystemBounds const&);
142  void jobFailure();
143  void preBeginJob(edm::PathsAndConsumesOfModulesBase const&,edm::ProcessContext const& pc);
144 
145  void preModuleBeginJob(edm::ModuleDescription const&);
146  void postBeginJob();
147  void postEndJob();
148 
149  void postGlobalBeginRun(edm::GlobalContext const&);
150  void preGlobalBeginLumi(edm::GlobalContext const&);
151  void preGlobalEndLumi(edm::GlobalContext const&);
152  void postGlobalEndLumi(edm::GlobalContext const&);
153 
154  void preStreamBeginLumi(edm::StreamContext const&);
155  void postStreamBeginLumi(edm::StreamContext const&);
156  void preStreamEndLumi(edm::StreamContext const&);
157  void postStreamEndLumi(edm::StreamContext const&);
158  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
159  void preEvent(edm::StreamContext const&);
160  void postEvent(edm::StreamContext const&);
161  void preSourceEvent(edm::StreamID);
162  void postSourceEvent(edm::StreamID);
163  void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
164  void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
165  void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
166  void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
167  void preSourceEarlyTermination(edm::TerminationOrigin);
168  void setExceptionDetected(unsigned int ls);
169 
170  //this is still needed for use in special functions like DQM which are in turn framework services
171  void setMicroState(MicroStateService::Microstate) override;
172  void setMicroState(edm::StreamID, MicroStateService::Microstate) override;
173 
174  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
175  void startedLookingForFile();
176  void stoppedLookingForFile(unsigned int lumi);
177  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
178  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag=nullptr);
179  bool getAbortFlagForLumi(unsigned int lumi);
180  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc=nullptr)
181  {
182  unsigned int processed = getEventsProcessedForLumi(lumi);
183  if (proc) *proc = processed;
184  return !getAbortFlagForLumi(lumi);
185  }
186  std::string getRunDirName() const { return runDirectory_.stem().string(); }
187  void setInputSource(FedRawDataInputSource *inputSource) {inputSource_=inputSource;}
188  void setInState(FastMonitoringThread::InputState inputState) {inputState_=inputState;}
189  void setInStateSup(FastMonitoringThread::InputState inputState) {inputSupervisorState_=inputState;}
190 
191  private:
192 
193  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
194 
195  void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID) {
196  //pick up only event count here
197  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
198  }
199 
200  void dowork() { // the function to be called in the thread. Thread completes when function returns.
201  monInit_.exchange(true,std::memory_order_acquire);
202  while (!fmt_.m_stoprequest) {
203  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
204  << " ms=" << encPath_[0].encode(ministate_[0])
205  << " us=" << encModule_.encode(microstate_[0])
206  << " is=" << inputStateNames[inputState_]
207  << " iss="<< inputStateNames[inputSupervisorState_]
208  << std::endl;
209 
210  {
211  std::lock_guard<std::mutex> lock(fmt_.monlock_);
212 
213  doSnapshot(lastGlobalLumi_,false);
214 
215  if (fastMonIntervals_ && (snapCounter_%fastMonIntervals_)==0) {
216  if (filePerFwkStream_) {
217  std::vector<std::string> CSVv;
218  for (unsigned int i=0;i<nStreams_;i++) {
219  CSVv.push_back(fmt_.jsonMonitor_->getCSVString((int)i));
220  }
221  fmt_.monlock_.unlock();
222  for (unsigned int i=0;i<nStreams_;i++) {
223  if (!CSVv[i].empty())
224  fmt_.jsonMonitor_->outputCSV(fastPathList_[i],CSVv[i]);
225  }
226  }
227  else {
228  std::string CSV = fmt_.jsonMonitor_->getCSVString();
229  //release mutex before writing out fast path file
230  fmt_.monlock_.unlock();
231  if (!CSV.empty())
232  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
233  }
234  }
235 
236  snapCounter_++;
237 
238  }
239  ::sleep(sleepTime_);
240  }
241  }
242 
243  //the actual monitoring thread is held by a separate class object for ease of maintenance
246  std::vector<Encoding> encPath_;
247  FedRawDataInputSource * inputSource_ = nullptr;
248  std::atomic<FastMonitoringThread::InputState> inputState_ { FastMonitoringThread::InputState::inInit };
249  std::atomic<FastMonitoringThread::InputState> inputSupervisorState_ { FastMonitoringThread::InputState::inInit };
250 
251  unsigned int nStreams_;
252  unsigned int nThreads_;
254  unsigned int fastMonIntervals_;
255  unsigned int snapCounter_ = 0;
256  std::string microstateDefPath_, fastMicrostateDefPath_;
257  std::string fastName_, fastPath_, slowName_;
259 
260  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
261 
262  std::map<unsigned int, timeval> lumiStartTime_;//needed for multiplexed begin/end lumis
263  timeval fileLookStart_, fileLookStop_;//this could also be calculated in the input source
264 
265  unsigned int lastGlobalLumi_;
266  std::atomic<bool> isInitTransition_;
267  unsigned int lumiFromSource_;
268 
269  //global state
270  std::atomic<FastMonitoringThread::Macrostate> macrostate_;
271 
272  //per stream
273  std::vector<ContainableAtomic<const void*>> ministate_;
274  std::vector<ContainableAtomic<const void*>> microstate_;
275  std::vector<ContainableAtomic<const void*>> threadMicrostate_;
276 
277  //variables measuring source statistics (global)
278  //unordered_map is not used because of very few elements stored concurrently
279  std::map<unsigned int, double> avgLeadTime_;
280  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
281  //helpers for source statistics:
282  std::map<unsigned int, unsigned long> accuSize_;
283  std::vector<double> leadTimes_;
284  std::map<unsigned int, std::pair<double,unsigned int>> lockStatsDuringLumi_;
285 
286  //for output module
287  std::map<unsigned int, std::pair<unsigned int,bool>> processedEventsPerLumi_;
288 
289  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
290  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
291  std::vector<std::atomic<bool>*> streamCounterUpdating_;
292 
293  std::vector<unsigned long> firstEventId_;
294  std::vector<std::atomic<bool>*> collectedPathList_;
295  std::vector<ContainableAtomic<unsigned int>> eventCountForPathInit_;
296  std::vector<bool> pathNamesReady_;
297 
299 
300  bool threadIDAvailable_ = false;
301 
302  std::atomic<unsigned long> totalEventsProcessed_;
303 
309  bool pathLegendWritten_ = false;
310  unsigned int nOutputModules_ =0;
311 
312  std::atomic<bool> monInit_;
313  bool exception_detected_ = false;
314  std::vector<unsigned int> exceptionInLS_;
315  std::vector<std::string> fastPathList_;
316 
317  };
318 
319 }
320 
321 #endif
Definition: fillJson.h:27
ContainableAtomic(ContainableAtomic< T > const &iOther)
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
std::map< unsigned int, timeval > lumiStartTime_
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
TrainProcessor *const proc
Definition: MVATrainer.cc:101
std::map< unsigned int, unsigned long > accuSize_
void setInState(FastMonitoringThread::InputState inputState)
std::vector< std::atomic< bool > * > streamCounterUpdating_
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void setInStateSup(FastMonitoringThread::InputState inputState)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
ContainableAtomic< T > & operator=(const void *iValue)
Definition: Electron.h:6
std::vector< bool > pathNamesReady_
std::vector< std::atomic< bool > * > collectedPathList_
std::map< unsigned int, double > avgLeadTime_
static const std::string nopath_
std::vector< ContainableAtomic< const void * > > threadMicrostate_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
void setInputSource(FedRawDataInputSource *inputSource)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
def ls(path, rec=False)
Definition: eostools.py:349
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::atomic< unsigned long > totalEventsProcessed_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
def load(fileName)
Definition: svgfig.py:547
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< double > leadTimes_
HLT enums.
boost::filesystem::path workingDirectory_
long double T
const void * decode(unsigned int index)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
std::vector< unsigned int > exceptionInLS_