CMS 3D CMS Logo

FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
13 
14 #include <filesystem>
15 
17 
18 #include <string>
19 #include <vector>
20 #include <map>
21 #include <queue>
22 #include <sstream>
23 #include <unordered_map>
24 #include "oneapi/tbb/task_arena.h"
25 #include "oneapi/tbb/task_scheduler_observer.h"
26 
27 /*Description
28  This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
29  A legend for use by the monitoring process in the DAQ needs to be generated at beginJob (when first available).
30  We try to spare CPU time in the monitoring by avoiding even a single string lookup: the module description
31  (edm::ModuleDescription) pointer is used as the map key instead, and no strings or string pointers are used for the microstates.
32  Only a pointer value is stored (with relaxed ordering) at the time of module execution, which is fast.
33  At snapshot time only (every few seconds) we do the map lookup to produce snapshot.
34  The general counters and status variables (event number, number of processed events, number of passed and stored
35  events, luminosity section etc.) are also monitored here.
36 
37  N.B. MicroStateService is referenced by a common base class, which is now trivial.
38  Its complete removal will be done in a future commit.
39 */
40 
42 class DAQSource;
43 
44 namespace edm {
46 }
47 
48 namespace evf {
49 
50  template <typename T>
53  class ConcurrencyTracker;
54 
55  namespace FastMonState {
56 
57  enum Microstate {
58  mInvalid = 0,
73  };
74 
75  enum Macrostate {
76  sInit = 0,
89  };
90 
91  enum InputState : short {
92  inIgnore = 0,
110  //supervisor thread and worker threads state
125  //combined with inWaitInput
141  //combined with inWaitChunk
160  };
161  } // namespace FastMonState
162 
164  //reserve output module space
166 
168  public:
169  // the names of the states - some of them are never reached in an online app
173  // Reserved names for microstates
175  ~FastMonitoringService() override;
176  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
177 
180 
182  void jobFailure();
184 
186  void postBeginJob();
187  void postEndJob();
188 
193 
198  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
199  void preEvent(edm::StreamContext const&);
200  void postEvent(edm::StreamContext const&);
210  void setExceptionDetected(unsigned int ls);
211 
212  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
213  void startedLookingForFile();
214  void stoppedLookingForFile(unsigned int lumi);
215  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
216  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
217  bool getAbortFlagForLumi(unsigned int lumi);
218  bool exceptionDetected() const;
219  bool isExceptionOnData(unsigned int ls);
220  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
221  unsigned int processed = getEventsProcessedForLumi(lumi);
222  if (proc)
223  *proc = processed;
224  return !getAbortFlagForLumi(lumi);
225  }
226  std::string getRunDirName() const { return runDirectory_.stem().string(); }
227  void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
228  void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
229  void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
231  //available for other modules
233 
234  static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
235 
236  private:
237  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
238 
239  void snapshotRunner();
240 
241  static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
242 
243  static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
244 
245  //the actual monitoring thread is held by a separate class object for ease of maintenance
246  std::unique_ptr<FastMonitoringThread> fmt_;
247  std::unique_ptr<ConcurrencyTracker> ct_;
248  //Encoding encModule_;
249  //std::vector<Encoding> encPath_;
252  std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
253  std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
254 
255  unsigned int nStreams_ = 0;
256  unsigned int nMonThreads_ = 0;
257  unsigned int nThreads_ = 0;
261  unsigned int fastMonIntervals_;
262  unsigned int snapCounter_ = 0;
265 
266  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
267 
268  std::map<unsigned int, timeval> lumiStartTime_; //needed for multiplexed begin/end lumis
269  timeval fileLookStart_, fileLookStop_; //this could also be calculated in the input source
270 
271  unsigned int lastGlobalLumi_;
272  std::atomic<bool> isInitTransition_;
273  unsigned int lumiFromSource_;
274 
275  //variables measuring source statistics (global)
276  //unordered_map is not used because of very few elements stored concurrently
277  std::map<unsigned int, double> avgLeadTime_;
278  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
279  //helpers for source statistics:
280  std::map<unsigned int, unsigned long> accuSize_;
281  std::vector<double> leadTimes_;
282  std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
283 
284  //for output module
285  std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
286 
287  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
288  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
289  std::vector<std::atomic<bool>*> streamCounterUpdating_;
290 
292 
293  bool threadIDAvailable_ = false;
294 
295  std::atomic<unsigned long> totalEventsProcessed_;
296 
300  unsigned int nOutputModules_ = 0;
301 
302  std::atomic<bool> monInit_;
303  bool exception_detected_ = false;
304  std::atomic<bool> has_source_exception_ = false;
305  std::atomic<bool> has_data_exception_ = false;
306  std::vector<unsigned int> exceptionInLS_;
307 
308  //per stream
309  std::vector<ContainableAtomic<const void*>> microstate_;
310  std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
311  //per thread
312  std::vector<ContainableAtomic<const void*>> tmicrostate_;
313  std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
314 
315  bool verbose_ = false;
316  };
317 
318 } // namespace evf
319 
320 #endif
std::atomic< bool > has_data_exception_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
void setExceptionDetected(unsigned int ls)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
bool isExceptionOnData(unsigned int ls)
std::unique_ptr< FastMonitoringThread > fmt_
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
constexpr int nSpecialModules
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamEndLumi(edm::StreamContext const &)
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
Definition: StreamContext.h:55
std::filesystem::path runDirectory_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
static unsigned int getSID(edm::StreamContext const &sc)
void setInputSource(FedRawDataInputSource *inputSource)
std::unique_ptr< ConcurrencyTracker > ct_
void setInputSource(DAQSource *inputSource)
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
void setInStateSup(FastMonState::InputState inputState)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void setTMicrostate(FastMonState::Microstate m)
constexpr int nReservedModules
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
static const std::string macroStateNames[FastMonState::MCOUNT]
static unsigned int getSID(edm::StreamID const &sid)
std::atomic< bool > has_source_exception_
HLT enums.
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
unsigned int value() const
Definition: StreamID.h:43
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=nullptr)
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void setInState(FastMonState::InputState inputState)
std::vector< ContainableAtomic< const void * > > tmicrostate_