CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
12 
13 #include <filesystem>
14 
16 
17 #include <string>
18 #include <vector>
19 #include <map>
20 #include <queue>
21 #include <sstream>
22 #include <unordered_map>
23 
24 /*Description
25  this is an evolution of the MicroStateService intended to be run in standalone multi-threaded cmsRun jobs
26  A legend for use by the monitoring process in the DAQ needs to be generated at beginJob (when first available).
27  We try to spare CPU time in the monitoring by avoiding even a single string lookup and using the
28  moduledesc pointer to key into the map instead and no string or string pointers are used for the microstates.
29  Only a pointer value is stored using relaxed ordering at the time of module execution which is fast.
30  At snapshot time only (every few seconds) we do the map lookup to produce snapshot.
31  Path names use similar logic. However, path names are not accessible in the same way later on, so when the job
32  starts running they need to be associated with the memory location of the path name strings as seen when the path is executed.
33  Path intermediate info will be called "ministate" :D
34  The general counters and status variables (event number, number of processed events, number of passed and stored
35  events, luminosity section etc.) are also monitored here.
36 
37  N.B. MicroStateService is referenced by a common base class which is now trivial.
38  Its complete removal will be done in a future commit.
39 */
40 
42 
43 namespace edm {
45 }
46 
47 namespace evf {
48 
49  class FastMonitoringThread;
50 
51  namespace FastMonState {
52 
53  enum Microstate {
54  mInvalid = 0,
65  };
66 
67  enum Macrostate {
68  sInit = 0,
81  };
82 
83  enum InputState : short {
84  inIgnore = 0,
102  //supervisor thread and worker threads state
117  //combined with inWaitInput
133  //combined with inWaitChunk
152  };
153  } // namespace FastMonState
154 
156  public:
157  // the names of the states - some of them are never reached in an online app
161  // Reserved names for microstates
162  static const std::string nopath_;
164  ~FastMonitoringService() override;
165  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
166 
170 
172  void jobFailure();
174 
176  void postBeginJob();
177  void postEndJob();
178 
183 
188  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
189  void preEvent(edm::StreamContext const&);
190  void postEvent(edm::StreamContext const&);
200  void setExceptionDetected(unsigned int ls);
201 
202  //this is still needed for use in special functions like DQM which are in turn framework services
205 
206  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
207  void startedLookingForFile();
208  void stoppedLookingForFile(unsigned int lumi);
209  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
210  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
211  bool getAbortFlagForLumi(unsigned int lumi);
212  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
213  unsigned int processed = getEventsProcessedForLumi(lumi);
214  if (proc)
215  *proc = processed;
216  return !getAbortFlagForLumi(lumi);
217  }
218  std::string getRunDirName() const { return runDirectory_.stem().string(); }
219  void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
220  void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
222 
223  private:
224  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
225 
226  void snapshotRunner();
227 
228  //the actual monitoring thread is held by a separate class object for ease of maintenance
229  std::shared_ptr<FastMonitoringThread> fmt_;
230  //Encoding encModule_;
231  //std::vector<Encoding> encPath_;
233  std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
234  std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
235 
236  unsigned int nStreams_;
237  unsigned int nThreads_;
239  unsigned int fastMonIntervals_;
240  unsigned int snapCounter_ = 0;
244 
245  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
246 
247  std::map<unsigned int, timeval> lumiStartTime_; //needed for multiplexed begin/end lumis
248  timeval fileLookStart_, fileLookStop_; //this could also be calculated in the input source
249 
250  unsigned int lastGlobalLumi_;
251  std::atomic<bool> isInitTransition_;
252  unsigned int lumiFromSource_;
253 
254  //variables measuring source statistics (global)
255  //unordered_map is not used because of very few elements stored concurrently
256  std::map<unsigned int, double> avgLeadTime_;
257  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
258  //helpers for source statistics:
259  std::map<unsigned int, unsigned long> accuSize_;
260  std::vector<double> leadTimes_;
261  std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
262 
263  //for output module
264  std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
265 
266  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
267  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
268  std::vector<std::atomic<bool>*> streamCounterUpdating_;
269 
270  std::vector<std::atomic<bool>*> collectedPathList_;
271  std::vector<bool> pathNamesReady_;
272 
274 
275  bool threadIDAvailable_ = false;
276 
277  std::atomic<unsigned long> totalEventsProcessed_;
278 
284  unsigned int nOutputModules_ = 0;
285 
286  std::atomic<bool> monInit_;
287  bool exception_detected_ = false;
288  std::vector<unsigned int> exceptionInLS_;
289  std::vector<std::string> fastPathList_;
290  };
291 
292 } // namespace evf
293 
294 #endif
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
static const std::string inputStateNames[FastMonState::inCOUNT]
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::atomic< bool > isInitTransition_
void setExceptionDetected(unsigned int ls)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
std::string getRunDirName() const
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
def ls
Definition: eostools.py:349
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
void setMicroState(FastMonState::Microstate)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::vector< bool > pathNamesReady_
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::vector< std::atomic< bool > * > collectedPathList_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::atomic< FastMonState::InputState > inputSupervisorState_
static const std::string nopath_
std::filesystem::path runDirectory_
list lumi
Definition: dqmdumpme.py:53
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< std::string > fastPathList_
void postStreamBeginLumi(edm::StreamContext const &)
void setInputSource(FedRawDataInputSource *inputSource)
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
void setInStateSup(FastMonState::InputState inputState)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
static const std::string macroStateNames[FastMonState::MCOUNT]
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
bool getAbortFlagForLumi(unsigned int lumi)
std::shared_ptr< FastMonitoringThread > fmt_
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=nullptr)
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void setInState(FastMonState::InputState inputState)