CMS 3D CMS Logo

FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
12 
13 #include <filesystem>
14 
16 
17 #include <string>
18 #include <vector>
19 #include <map>
20 #include <queue>
21 #include <sstream>
22 #include <unordered_map>
23 
24 /*Description
25  this is an evolution of the MicroStateService intended to be run in standalone multi-threaded cmsRun jobs
26  A legenda for use by the monitoring process in the DAQ needs to be generated at beginJob (when first available).
27  We try to spare CPU time in the monitoring by avoiding even a single string lookup and using the
28  moduledesc pointer to key into the map instead and no string or string pointers are used for the microstates.
29  Only a pointer value is stored using relaxed ordering at the time of module execution which is fast.
30  At snapshot time only (every few seconds) we do the map lookup to produce snapshot.
31  Path names use a similar logic. However path names are not accessible in the same way as later so they need to be
32  when starting to run associated to the memory location of path name strings as accessible when path is executed.
33  Path intermediate info will be called "ministate" :D
34  The general counters and status variables (event number, number of processed events, number of passed and stored
35  events, luminosity section etc.) are also monitored here.
36 
37  N.B. MicroStateService is referenced by a common base class which is now trivial.
38  It's complete removal will be completed in the future commit.
39 */
40 
42 class DAQSource;
43 
44 namespace edm {
46 }
47 
48 namespace evf {
49 
50  class FastMonitoringThread;
51 
52  namespace FastMonState {
53 
54  enum Microstate {
55  mInvalid = 0,
66  };
67 
68  enum Macrostate {
69  sInit = 0,
82  };
83 
84  enum InputState : short {
85  inIgnore = 0,
103  //supervisor thread and worker threads state
118  //combined with inWaitInput
134  //combined with inWaitChunk
153  };
154  } // namespace FastMonState
155 
157  public:
158  // the names of the states - some of them are never reached in an online app
162  // Reserved names for microstates
163  static const std::string nopath_;
165  ~FastMonitoringService() override;
166  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
167 
171 
173  void jobFailure();
175 
177  void postBeginJob();
178  void postEndJob();
179 
184 
189  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
190  void preEvent(edm::StreamContext const&);
191  void postEvent(edm::StreamContext const&);
201  void setExceptionDetected(unsigned int ls);
202 
203  //this is still needed for use in special functions like DQM which are in turn framework services
206 
207  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
208  void startedLookingForFile();
209  void stoppedLookingForFile(unsigned int lumi);
210  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
211  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
212  bool getAbortFlagForLumi(unsigned int lumi);
213  bool exceptionDetected() const;
214  bool isExceptionOnData(unsigned int ls);
215  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
216  unsigned int processed = getEventsProcessedForLumi(lumi);
217  if (proc)
218  *proc = processed;
219  return !getAbortFlagForLumi(lumi);
220  }
221  std::string getRunDirName() const { return runDirectory_.stem().string(); }
222  void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
223  void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
224  void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
226 
227  private:
228  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
229 
230  void snapshotRunner();
231 
232  //the actual monitoring thread is held by a separate class object for ease of maintenance
233  std::shared_ptr<FastMonitoringThread> fmt_;
234  //Encoding encModule_;
235  //std::vector<Encoding> encPath_;
238  std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
239  std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
240 
241  unsigned int nStreams_;
242  unsigned int nThreads_;
244  unsigned int fastMonIntervals_;
245  unsigned int snapCounter_ = 0;
249 
250  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
251 
252  std::map<unsigned int, timeval> lumiStartTime_; //needed for multiplexed begin/end lumis
253  timeval fileLookStart_, fileLookStop_; //this could also be calculated in the input source
254 
255  unsigned int lastGlobalLumi_;
256  std::atomic<bool> isInitTransition_;
257  unsigned int lumiFromSource_;
258 
259  //variables measuring source statistics (global)
260  //unordered_map is not used because of very few elements stored concurrently
261  std::map<unsigned int, double> avgLeadTime_;
262  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
263  //helpers for source statistics:
264  std::map<unsigned int, unsigned long> accuSize_;
265  std::vector<double> leadTimes_;
266  std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
267 
268  //for output module
269  std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
270 
271  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
272  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
273  std::vector<std::atomic<bool>*> streamCounterUpdating_;
274 
275  std::vector<std::atomic<bool>*> collectedPathList_;
276  std::vector<bool> pathNamesReady_;
277 
279 
280  bool threadIDAvailable_ = false;
281 
282  std::atomic<unsigned long> totalEventsProcessed_;
283 
289  unsigned int nOutputModules_ = 0;
290 
291  std::atomic<bool> monInit_;
292  bool exception_detected_ = false;
293  std::atomic<bool> has_source_exception_ = false;
294  std::atomic<bool> has_data_exception_ = false;
295  std::vector<unsigned int> exceptionInLS_;
296  std::vector<std::string> fastPathList_;
297 
298  bool verbose_ = false;
299  };
300 
301 } // namespace evf
302 
303 #endif
std::atomic< bool > has_data_exception_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::atomic< bool > isInitTransition_
void setExceptionDetected(unsigned int ls)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
void setMicroState(FastMonState::Microstate)
bool isExceptionOnData(unsigned int ls)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::vector< bool > pathNamesReady_
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::vector< std::atomic< bool > * > collectedPathList_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::atomic< FastMonState::InputState > inputSupervisorState_
static const std::string nopath_
std::filesystem::path runDirectory_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< std::string > fastPathList_
void postStreamBeginLumi(edm::StreamContext const &)
void setInputSource(FedRawDataInputSource *inputSource)
void setInputSource(DAQSource *inputSource)
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
void setInStateSup(FastMonState::InputState inputState)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
static const std::string macroStateNames[FastMonState::MCOUNT]
std::atomic< bool > has_source_exception_
HLT enums.
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
bool getAbortFlagForLumi(unsigned int lumi)
std::shared_ptr< FastMonitoringThread > fmt_
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=nullptr)
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void setInState(FastMonState::InputState inputState)