CMS 3D CMS Logo

FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
12 
13 #include "boost/filesystem.hpp"
14 
17 
18 #include <string>
19 #include <vector>
20 #include <map>
21 #include <queue>
22 #include <sstream>
23 #include <unordered_map>
24 
25 /*Description
26  this is an evolution of the MicroStateService intended to be run standalone in cmsRun or similar
27  As such, it has to independently create a monitoring thread and run it in each forked process, which needs
28  to be arranged following the standard CMSSW procedure.
29  We try to use boost threads for uniformity with the rest of the framework, even if they suck a bit.
30  A legenda for use by the monitoring process in the DAQ needs to be generated as soon as convenient - since
31  no access to the EventProcessor is granted, this needs to wait until after beginJob is executed.
32  At the same time, we try to spare time in the monitoring by avoiding even a single string lookup and using the
33  moduledesc pointer to key into the map instead.
34  As a bonus, we can now add to the monitored status the current path (and possibly associate modules to a path...)
35  this intermediate info will be called "ministate" :D
36  The general counters and status variables (event number, number of processed events, number of passed and stored
37  events, luminosity section etc.) are also monitored here.
38 
39  NOTA BENE!!! with respect to the MicroStateService, no string or string pointers are used for the microstates.
40  NOTA BENE!!! the state of the edm::EventProcessor cannot be monitored directly from within a service, so a
41  different solution must be identified for that (especially one needs to identify error states).
42  NOTA BENE!!! to keep backward compatibility with the MicroStateService, a common base class with abstract interface,
43  exposing the single method to be used by all other packages (except EventFilter/Processor,
44  which should continue to use the concrete class interface) will be defined
45 
46 */
48 
49 namespace edm {
51 }
52 
53 namespace evf {
54 
// Copy-constructible wrapper around std::atomic<T>. std::atomic itself cannot be
// copied, which prevents its direct use as a std::vector element; this wrapper
// copies by loading the current value of the source.
// Assignment from a value and conversion back to T both use relaxed memory
// ordering, i.e. they give atomicity of the single value only and no ordering
// guarantees with respect to other memory operations.
template <typename T>
struct ContainableAtomic {
  // Value-initializes the stored value (zero / null pointer for scalar T).
  ContainableAtomic() : m_value{} {}
  // Intentionally implicit: allows containers to be filled directly from T values.
  ContainableAtomic(T iValue) : m_value(iValue) {}
  // Copies the value currently held by iOther (the copy itself is not an atomic
  // transaction between the two objects).
  ContainableAtomic(ContainableAtomic<T> const& iOther) : m_value(iOther.m_value.load()) {}
  // Stores a new value. The parameter is T (not const void*), so the wrapper
  // is assignable for every instantiation, e.g. ContainableAtomic<unsigned int>.
  ContainableAtomic<T>& operator=(T iValue) {
    m_value.store(iValue, std::memory_order_relaxed);
    return *this;
  }
  // Reads the current value; const so it also works through const references.
  operator T() const { return m_value.load(std::memory_order_relaxed); }

  std::atomic<T> m_value;
};
68 
70  struct Encoding {
71  Encoding(unsigned int res) : reserved_(res), current_(reserved_), currentReserved_(0) {
72  if (reserved_)
73  dummiesForReserved_ = new edm::ModuleDescription[reserved_];
74  // completeReservedWithDummies();
75  }
77  if (reserved_)
78  delete[] dummiesForReserved_;
79  }
80  //trick: only encode state when sending it over (i.e. every sec)
81  int encode(const void* add) {
82  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
83  return (it != quickReference_.end()) ? (*it).second : 0;
84  }
85  const void* decode(unsigned int index) { return decoder_[index]; }
86  void fillReserved(const void* add, unsigned int i) {
87  // translation_[*name]=current_;
88  quickReference_[add] = i;
89  if (decoder_.size() <= i)
90  decoder_.push_back(add);
91  else
92  decoder_[currentReserved_] = add;
93  }
94  void updateReserved(const void* add) {
95  fillReserved(add, currentReserved_);
96  currentReserved_++;
97  }
99  for (unsigned int i = currentReserved_; i < reserved_; i++)
100  fillReserved(dummiesForReserved_ + i, i);
101  }
102  void update(const void* add) {
103  // translation_[*name]=current_;
104  quickReference_[add] = current_;
105  decoder_.push_back(add);
106  current_++;
107  }
108  unsigned int vecsize() { return decoder_.size(); }
109  std::unordered_map<const void*, int> quickReference_;
110  std::vector<const void*> decoder_;
111  unsigned int reserved_;
112  int current_;
115  };
116 
117  public:
118  // the names of the states - some of them are never reached in an online app
119  static const std::string macroStateNames[FastMonitoringThread::MCOUNT];
120  static const std::string inputStateNames[FastMonitoringThread::inCOUNT];
121  // Reserved names for microstates
122  // moved into base class in EventFilter/Utilities for compatibility with MicroStateServiceClassic
123  static const std::string nopath_;
125  ~FastMonitoringService() override;
126  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
127 
128  std::string makePathLegendaJson();
129  std::string makeModuleLegendaJson();
130  std::string makeInputLegendaJson();
131 
132  void preallocate(edm::service::SystemBounds const&);
133  void jobFailure();
134  void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
135 
136  void preModuleBeginJob(edm::ModuleDescription const&);
137  void postBeginJob();
138  void postEndJob();
139 
140  void postGlobalBeginRun(edm::GlobalContext const&);
141  void preGlobalBeginLumi(edm::GlobalContext const&);
142  void preGlobalEndLumi(edm::GlobalContext const&);
143  void postGlobalEndLumi(edm::GlobalContext const&);
144 
145  void preStreamBeginLumi(edm::StreamContext const&);
146  void postStreamBeginLumi(edm::StreamContext const&);
147  void preStreamEndLumi(edm::StreamContext const&);
148  void postStreamEndLumi(edm::StreamContext const&);
149  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
150  void preEvent(edm::StreamContext const&);
151  void postEvent(edm::StreamContext const&);
152  void preSourceEvent(edm::StreamID);
153  void postSourceEvent(edm::StreamID);
154  void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
155  void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
156  void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
157  void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
158  void preSourceEarlyTermination(edm::TerminationOrigin);
159  void setExceptionDetected(unsigned int ls);
160 
161  //this is still needed for use in special functions like DQM which are in turn framework services
162  void setMicroState(MicroStateService::Microstate) override;
163  void setMicroState(edm::StreamID, MicroStateService::Microstate) override;
164 
165  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
166  void startedLookingForFile();
167  void stoppedLookingForFile(unsigned int lumi);
168  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
169  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
170  bool getAbortFlagForLumi(unsigned int lumi);
171  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
172  unsigned int processed = getEventsProcessedForLumi(lumi);
173  if (proc)
174  *proc = processed;
175  return !getAbortFlagForLumi(lumi);
176  }
177  std::string getRunDirName() const { return runDirectory_.stem().string(); }
178  void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
179  void setInState(FastMonitoringThread::InputState inputState) { inputState_ = inputState; }
180  void setInStateSup(FastMonitoringThread::InputState inputState) { inputSupervisorState_ = inputState; }
181 
182  private:
183  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
184 
185  void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID) {
186  //pick up only event count here
187  fmt_.jsonMonitor_->snapStreamAtomic(ls, streamID);
188  }
189 
190  void dowork() { // the function to be called in the thread. Thread completes when function returns.
191  monInit_.exchange(true, std::memory_order_acquire);
192  while (!fmt_.m_stoprequest) {
193  edm::LogInfo("FastMonitoringService")
194  << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
195  << " ms=" << encPath_[0].encode(ministate_[0]) << " us=" << encModule_.encode(microstate_[0])
196  << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_] << std::endl;
197 
198  {
199  std::lock_guard<std::mutex> lock(fmt_.monlock_);
200 
201  doSnapshot(lastGlobalLumi_, false);
202 
203  if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
204  if (filePerFwkStream_) {
205  std::vector<std::string> CSVv;
206  for (unsigned int i = 0; i < nStreams_; i++) {
207  CSVv.push_back(fmt_.jsonMonitor_->getCSVString((int)i));
208  }
209  fmt_.monlock_.unlock();
210  for (unsigned int i = 0; i < nStreams_; i++) {
211  if (!CSVv[i].empty())
212  fmt_.jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
213  }
214  } else {
215  std::string CSV = fmt_.jsonMonitor_->getCSVString();
216  //release mutex before writing out fast path file
217  fmt_.monlock_.unlock();
218  if (!CSV.empty())
219  fmt_.jsonMonitor_->outputCSV(fastPath_, CSV);
220  }
221  }
222 
223  snapCounter_++;
224  }
225  ::sleep(sleepTime_);
226  }
227  }
228 
229  //the actual monitoring thread is held by a separate class object for ease of maintenance
232  std::vector<Encoding> encPath_;
233  FedRawDataInputSource* inputSource_ = nullptr;
234  std::atomic<FastMonitoringThread::InputState> inputState_{FastMonitoringThread::InputState::inInit};
235  std::atomic<FastMonitoringThread::InputState> inputSupervisorState_{FastMonitoringThread::InputState::inInit};
236 
237  unsigned int nStreams_;
238  unsigned int nThreads_;
240  unsigned int fastMonIntervals_;
241  unsigned int snapCounter_ = 0;
242  std::string microstateDefPath_, fastMicrostateDefPath_;
243  std::string fastName_, fastPath_, slowName_;
245 
246  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
247 
248  std::map<unsigned int, timeval> lumiStartTime_; //needed for multiplexed begin/end lumis
249  timeval fileLookStart_, fileLookStop_; //this could also be calculated in the input source
250 
251  unsigned int lastGlobalLumi_;
252  std::atomic<bool> isInitTransition_;
253  unsigned int lumiFromSource_;
254 
255  //global state
256  std::atomic<FastMonitoringThread::Macrostate> macrostate_;
257 
258  //per stream
259  std::vector<ContainableAtomic<const void*>> ministate_;
260  std::vector<ContainableAtomic<const void*>> microstate_;
261  std::vector<ContainableAtomic<const void*>> threadMicrostate_;
262 
263  //variables measuring source statistics (global)
264  //unordered_map is not used because of very few elements stored concurrently
265  std::map<unsigned int, double> avgLeadTime_;
266  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
267  //helpers for source statistics:
268  std::map<unsigned int, unsigned long> accuSize_;
269  std::vector<double> leadTimes_;
270  std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
271 
272  //for output module
273  std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
274 
275  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
276  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
277  std::vector<std::atomic<bool>*> streamCounterUpdating_;
278 
279  std::vector<unsigned long> firstEventId_;
280  std::vector<std::atomic<bool>*> collectedPathList_;
281  std::vector<ContainableAtomic<unsigned int>> eventCountForPathInit_;
282  std::vector<bool> pathNamesReady_;
283 
285 
286  bool threadIDAvailable_ = false;
287 
288  std::atomic<unsigned long> totalEventsProcessed_;
289 
295  bool pathLegendWritten_ = false;
296  unsigned int nOutputModules_ = 0;
297 
298  std::atomic<bool> monInit_;
299  bool exception_detected_ = false;
300  std::vector<unsigned int> exceptionInLS_;
301  std::vector<std::string> fastPathList_;
302  };
303 
304 } // namespace evf
305 
306 #endif
Definition: fillJson.h:27
ContainableAtomic(ContainableAtomic< T > const &iOther)
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
std::map< unsigned int, timeval > lumiStartTime_
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
std::map< unsigned int, unsigned long > accuSize_
void setInState(FastMonitoringThread::InputState inputState)
std::vector< std::atomic< bool > * > streamCounterUpdating_
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void setInStateSup(FastMonitoringThread::InputState inputState)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
ContainableAtomic< T > & operator=(const void *iValue)
Definition: Electron.h:6
std::vector< bool > pathNamesReady_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::vector< std::atomic< bool > * > collectedPathList_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, double > avgLeadTime_
static const std::string nopath_
std::vector< ContainableAtomic< const void * > > threadMicrostate_
std::vector< unsigned long > firstEventId_
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
void setInputSource(FedRawDataInputSource *inputSource)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
def ls(path, rec=False)
Definition: eostools.py:349
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::atomic< unsigned long > totalEventsProcessed_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
def load(fileName)
Definition: svgfig.py:547
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< double > leadTimes_
HLT enums.
boost::filesystem::path workingDirectory_
long double T
const void * decode(unsigned int index)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
std::vector< unsigned int > exceptionInLS_