CMS 3D CMS Logo

FastMonitoringService.h
Go to the documentation of this file.
1 #ifndef EvFFastMonitoringService_H
2 #define EvFFastMonitoringService_H 1
3 
13 
14 #include <filesystem>
15 
17 
18 #include <string>
19 #include <vector>
20 #include <map>
21 #include <queue>
22 #include <sstream>
23 #include <unordered_map>
24 #include "oneapi/tbb/task_arena.h"
25 #include "oneapi/tbb/task_scheduler_observer.h"
26 
27 /*Description
28  This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
29  A legend for use by the monitoring process in the DAQ needs to be generated at beginJob (when first available).
30  We try to spare CPU time in the monitoring by avoiding even a single string lookup: the
31  moduledesc pointer is used to key into the map instead, and no strings or string pointers are used for the microstates.
32  Only a pointer value is stored, using relaxed ordering, at the time of module execution, which is fast.
33  Only at snapshot time (every few seconds) do we perform the map lookup to produce the snapshot.
34  The general counters and status variables (event number, number of processed events, number of passed and stored
35  events, luminosity section, etc.) are also monitored here.
36 */
37 
39 class DAQSource;
40 
41 namespace edm {
43 }
44 
45 namespace evf {
46 
47  template <typename T>
50  class ConcurrencyTracker;
51 
52  namespace FastMonState {
53 
54  enum Microstate {
55  mInvalid = 0,
70  };
71 
72  enum Macrostate {
73  sInit = 0,
86  };
87 
88  enum InputState : short {
89  inIgnore = 0,
107  //supervisor thread and worker threads state
122  //combined with inWaitInput
138  //combined with inWaitChunk
157  };
158  } // namespace FastMonState
159 
161  //reserve output module space
163 
165  public:
166  // the names of the states - some of them are never reached in an online app
170  // Reserved names for microstates
173  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
174 
177 
179  void jobFailure();
181 
183  void postBeginJob();
184  void postEndJob();
185 
190 
195  void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
196  void preEvent(edm::StreamContext const&);
197  void postEvent(edm::StreamContext const&);
207  void setExceptionDetected(unsigned int ls);
208 
209  void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
210  void startedLookingForFile();
211  void stoppedLookingForFile(unsigned int lumi);
212  void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
213  unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
214  bool getAbortFlagForLumi(unsigned int lumi);
215  bool exceptionDetected() const;
216  bool isExceptionOnData(unsigned int ls);
// Tells the caller whether files should be written for luminosity section `lumi`.
// The processed-event count for `lumi` is always fetched via getEventsProcessedForLumi();
// when `proc` is non-null the count is also stored through it (optional output).
// Returns the negation of getAbortFlagForLumi(lumi).
// NOTE(review): the count and the abort flag are obtained with two separate calls, although
// getEventsProcessedForLumi() also accepts an abortFlag out-parameter - presumably the two
// lookups could be combined; confirm against the out-of-line definitions before changing.
217  bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
218  unsigned int processed = getEventsProcessedForLumi(lumi);
219  if (proc)
220  *proc = processed;
221  return !getAbortFlagForLumi(lumi);
222  }
// Returns the stem of runDirectory_ (its last path component with any final extension removed)
// as a std::string.
223  std::string getRunDirName() const { return runDirectory_.stem().string(); }
// Stores the given FedRawDataInputSource pointer in inputSource_ (the pointer is only copied here).
224  void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
// Overload for DAQSource: stores the given pointer in daqInputSource_ (the pointer is only copied here).
225  void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
// Assigns `inputState` to inputState_. inputState_ is a std::atomic, so this assignment is an
// atomic store with the default (sequentially consistent) memory ordering.
226  void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
228  //available for other modules
230 
// Returns tbb::this_task_arena::current_thread_index() for the calling thread, converted to unsigned int.
// NOTE(review): the oneTBB call returns a signed int and, per its documentation, can yield
// task_arena::not_initialized (-1) for a thread that has not joined an arena; that value would
// convert to a very large unsigned number here - confirm callers guard against it.
231  static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
232 
233  private:
234  void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
235 
236  void snapshotRunner();
237 
// Returns the numeric stream id of a stream context, i.e. sc.streamID().value().
238  static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
239 
// Overload taking the StreamID directly: returns sid.value().
240  static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
241 
242  //the actual monitoring thread is held by a separate class object for ease of maintenance
243  std::unique_ptr<FastMonitoringThread> fmt_;
244  std::unique_ptr<ConcurrencyTracker> ct_;
245  //Encoding encModule_;
246  //std::vector<Encoding> encPath_;
249  std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
250  std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
251 
252  unsigned int nStreams_ = 0;
253  unsigned int nMonThreads_ = 0;
254  unsigned int nThreads_ = 0;
258  unsigned int fastMonIntervals_;
259  unsigned int snapCounter_ = 0;
262 
263  //variables that are used by/monitored by FastMonitoringThread / FastMonitor
264 
265  std::map<unsigned int, timeval> lumiStartTime_; //needed for multiplexed begin/end lumis
266  timeval fileLookStart_, fileLookStop_; //this could also be calculated in the input source
267 
268  unsigned int lastGlobalLumi_;
269  std::atomic<bool> isInitTransition_;
270  unsigned int lumiFromSource_;
271 
272  //variables measuring source statistics (global)
273  //unordered_map is not used because of very few elements stored concurrently
274  std::map<unsigned int, double> avgLeadTime_;
275  std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
276  //helpers for source statistics:
277  std::map<unsigned int, unsigned long> accuSize_;
278  std::vector<double> leadTimes_;
279  std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
280 
281  //for output module
282  std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
283 
284  //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
285  //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
286  std::vector<std::atomic<bool>*> streamCounterUpdating_;
287 
289 
290  bool threadIDAvailable_ = false;
291 
292  std::atomic<unsigned long> totalEventsProcessed_;
293 
297  unsigned int nOutputModules_ = 0;
298 
299  std::atomic<bool> monInit_;
300  bool exception_detected_ = false;
301  std::atomic<bool> has_source_exception_ = false;
302  std::atomic<bool> has_data_exception_ = false;
303  std::vector<unsigned int> exceptionInLS_;
304 
305  //per stream
306  std::vector<ContainableAtomic<const void*>> microstate_;
307  std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
308  //per thread
309  std::vector<ContainableAtomic<const void*>> tmicrostate_;
310  std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
311 
312  bool verbose_ = false;
313  };
314 
315 } // namespace evf
316 
317 #endif
std::atomic< bool > has_data_exception_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
void setExceptionDetected(unsigned int ls)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
bool isExceptionOnData(unsigned int ls)
std::unique_ptr< FastMonitoringThread > fmt_
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
constexpr int nSpecialModules
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamEndLumi(edm::StreamContext const &)
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
Definition: StreamContext.h:55
std::filesystem::path runDirectory_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
static unsigned int getSID(edm::StreamContext const &sc)
void setInputSource(FedRawDataInputSource *inputSource)
std::unique_ptr< ConcurrencyTracker > ct_
void setInputSource(DAQSource *inputSource)
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
void setInStateSup(FastMonState::InputState inputState)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void setTMicrostate(FastMonState::Microstate m)
constexpr int nReservedModules
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void stoppedLookingForFile(unsigned int lumi)
static const std::string macroStateNames[FastMonState::MCOUNT]
static unsigned int getSID(edm::StreamID const &sid)
std::atomic< bool > has_source_exception_
HLT enums.
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
unsigned int value() const
Definition: StreamID.h:43
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=nullptr)
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void setInState(FastMonState::InputState inputState)
std::vector< ContainableAtomic< const void * > > tmicrostate_