CMS 3D CMS Logo

FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
12 
13 #include <filesystem>
14 
16 
17 #include <string>
18 #include <vector>
19 #include <map>
20 #include <queue>
21 #include <sstream>
22 #include <unordered_map>
23 
24 /*Description
25  this is an evolution of the MicroStateService intended to be run in standalone multi-threaded cmsRun jobs
26  A legend for use by the monitoring process in the DAQ needs to be generated at beginJob (when first available).
27  We try to spare CPU time in the monitoring by avoiding even a single string lookup: the moduledesc pointer
28  is used as the key into the map instead, and no strings or string pointers are used for the microstates.
29  Only a pointer value is stored using relaxed ordering at the time of module execution which is fast.
30  At snapshot time only (every few seconds) we do the map lookup to produce snapshot.
31  Path names use a similar logic. However, path names are not accessible in the same way later on, so when starting
32  to run they need to be associated with the memory location of the path name strings as accessible when the path is executed.
33  Path intermediate info will be called "ministate" :D
34  The general counters and status variables (event number, number of processed events, number of passed and stored
35  events, luminosity section etc.) are also monitored here.
36 
37  N.B. MicroStateService is referenced by a common base class which is now trivial.
38  Its complete removal will be done in a future commit.
39 */
40 
42 
43 namespace edm {
45 }
46 
47 namespace evf {
48 
49  class FastMonitoringThread;
50 
51  namespace FastMonState {
52 
53  enum Microstate {
54  mInvalid = 0,
65  };
66 
67  enum Macrostate {
68  sInit = 0,
81  };
82 
83  enum InputState : short {
84  inIgnore = 0,
102  //supervisor thread and worker threads state
117  //combined with inWaitInput
133  //combined with inWaitChunk
152  };
153  } // namespace FastMonState
154 
156  public:
157  // the names of the states - some of them are never reached in an online app
161  // Reserved names for microstates
162  static const std::string nopath_;
164  ~FastMonitoringService() override;
165  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
166 
170 
172  void jobFailure();
174 
176  void postBeginJob();
177  void postEndJob();
178 
183 
188  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
189  void preEvent(edm::StreamContext const&);
190  void postEvent(edm::StreamContext const&);
200  void setExceptionDetected(unsigned int ls);
201 
202  //this is still needed for use in special functions like DQM which are in turn framework services
205 
206  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
207  void startedLookingForFile();
208  void stoppedLookingForFile(unsigned int lumi);
209  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
210  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
211  bool getAbortFlagForLumi(unsigned int lumi);
212  bool exceptionDetected() const;
213  bool isExceptionOnData(unsigned int ls);
214  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {  // returns the negation of getAbortFlagForLumi(lumi)
215  unsigned int processed = getEventsProcessedForLumi(lumi);  // looked up with the default abortFlag argument (nullptr)
216  if (proc)  // proc is an optional out-parameter; nothing is written when it is null
217  *proc = processed;
218  return !getAbortFlagForLumi(lumi);
219  }
220  std::string getRunDirName() const { return runDirectory_.stem().string(); }  // stem() of the runDirectory_ path, returned as a std::string
221  void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }  // stores the given pointer in inputSource_
222  void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }  // assigns to the std::atomic inputState_ member
224 
225  private:
226  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
227 
228  void snapshotRunner();
229 
230  //the actual monitoring thread is held by a separate class object for ease of maintenance
231  std::shared_ptr<FastMonitoringThread> fmt_;
232  //Encoding encModule_;
233  //std::vector<Encoding> encPath_;
235  std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
236  std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
237 
238  unsigned int nStreams_;
239  unsigned int nThreads_;
241  unsigned int fastMonIntervals_;
242  unsigned int snapCounter_ = 0;
246 
247  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
248 
249  std::map<unsigned int, timeval> lumiStartTime_; //needed for multiplexed begin/end lumis
250  timeval fileLookStart_, fileLookStop_; //this could also be calculated in the input source
251 
252  unsigned int lastGlobalLumi_;
253  std::atomic<bool> isInitTransition_;
254  unsigned int lumiFromSource_;
255 
256  //variables measuring source statistics (global)
257  //unordered_map is not used because of very few elements stored concurrently
258  std::map<unsigned int, double> avgLeadTime_;
259  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
260  //helpers for source statistics:
261  std::map<unsigned int, unsigned long> accuSize_;
262  std::vector<double> leadTimes_;
263  std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
264 
265  //for output module
266  std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
267 
268  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
269  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
270  std::vector<std::atomic<bool>*> streamCounterUpdating_;
271 
272  std::vector<std::atomic<bool>*> collectedPathList_;
273  std::vector<bool> pathNamesReady_;
274 
276 
277  bool threadIDAvailable_ = false;
278 
279  std::atomic<unsigned long> totalEventsProcessed_;
280 
286  unsigned int nOutputModules_ = 0;
287 
288  std::atomic<bool> monInit_;
289  bool exception_detected_ = false;
290  std::atomic<bool> has_source_exception_ = false;
291  std::atomic<bool> has_data_exception_ = false;
292  std::vector<unsigned int> exceptionInLS_;
293  std::vector<std::string> fastPathList_;
294  };
295 
296 } // namespace evf
297 
298 #endif
std::atomic< bool > has_data_exception_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::atomic< bool > isInitTransition_
void setExceptionDetected(unsigned int ls)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
void setMicroState(FastMonState::Microstate)
bool isExceptionOnData(unsigned int ls)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::vector< bool > pathNamesReady_
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::vector< std::atomic< bool > * > collectedPathList_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::atomic< FastMonState::InputState > inputSupervisorState_
static const std::string nopath_
std::filesystem::path runDirectory_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< std::string > fastPathList_
void postStreamBeginLumi(edm::StreamContext const &)
void setInputSource(FedRawDataInputSource *inputSource)
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
void setInStateSup(FastMonState::InputState inputState)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
static const std::string macroStateNames[FastMonState::MCOUNT]
std::atomic< bool > has_source_exception_
HLT enums.
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
bool getAbortFlagForLumi(unsigned int lumi)
std::shared_ptr< FastMonitoringThread > fmt_
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=nullptr)
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void setInState(FastMonState::InputState inputState)