CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 
8 #include "EvFDaqDirector.h"
9 
10 namespace evf{
11 
13  {"Init","JobReady","RunGiven","Running",
14  "Stopping","Done","JobEnded","Error","ErrorEnded","End",
15  "Invalid"};
16 
18 
20  edm::ActivityRegistry& reg) :
21  MicroStateService(iPS,reg)
22  ,encModule_(33)
23  ,encPath_(0)
24  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
25  //,rootDirectory_(iPS.getUntrackedParameter<string>("rootDirectory", "/data"))
26  ,microstateDefPath_(iPS.getUntrackedParameter<string>("microstateDefPath", "/tmp/def.jsd"))
27  ,outputDefPath_(iPS.getUntrackedParameter<string>("outputDefPath", "/tmp/def.jsd"))
28  ,fastName_(iPS.getUntrackedParameter<string>("fastName", "states"))
29  ,slowName_(iPS.getUntrackedParameter<string>("slowName", "lumi"))
30  {
35  fmt_.m_data.accuSize_ = 0;
49 
53  for(unsigned int i = 0; i < (mCOUNT); i++)
55  encPath_.update((void*)&nopath_);
57 
58  fmt_.m_data.macrostateJ_.setName("Macrostate");
59  fmt_.m_data.ministateJ_.setName("Ministate");
60  fmt_.m_data.microstateJ_.setName("Microstate");
61  fmt_.m_data.processedJ_.setName("Processed");
62  fmt_.m_data.throughputJ_.setName("Throughput");
63  fmt_.m_data.avgLeadTimeJ_.setName("AverageLeadTime");
64  fmt_.m_data.filesProcessedDuringLumi_.setName("FilesProcessed");
65  vector<JsonMonitorable*> monParams;
66  monParams.push_back(&fmt_.m_data.macrostateJ_);
67  monParams.push_back(&fmt_.m_data.ministateJ_);
68  monParams.push_back(&fmt_.m_data.microstateJ_);
69  monParams.push_back(&fmt_.m_data.processedJ_);
70  monParams.push_back(&fmt_.m_data.throughputJ_);
71  monParams.push_back(&fmt_.m_data.avgLeadTimeJ_);
72  monParams.push_back(&fmt_.m_data.filesProcessedDuringLumi_);
73 
74  // The run dir should be set via the configuration
75  // For now, just grab the latest run directory available
76 
77  // FIND RUN DIRECTORY
78  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->findHighestRunDir());
79  workingDirectory_ = runDirectory_ = runDirectory;
80  workingDirectory_ /= "mon";
81 
82  bool foundMonDir = false;
83  if ( boost::filesystem::is_directory(workingDirectory_))
84  foundMonDir=true;
85  if (!foundMonDir) {
86  std::cout << "<MON> DIR NOT FOUND!" << std::endl;
87  boost::filesystem::create_directories(workingDirectory_);
88  }
89 
90  std::ostringstream fastFileName;
91 
92  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
94  fast /= fastFileName.str();
95  fastPath_ = fast.string();
96 
97  /*
98  * initialize the fast monitor with:
99  * vector of pointers to monitorable parameters
100  * path to definition
101  *
102  */
103  std::cout
104  << "FastMonitoringService: initializing FastMonitor with microstate def path: "
106  << encPath_.current_ + 1 << " " << encModule_.current_ + 1
107  << std::endl;
108 
109  fmt_.m_data.jsonMonitor_.reset(
110  new FastMonitor(monParams, microstateDefPath_));
111 
113  }
114 
115 
117  {
118  }
119 
121  {
122  //build a map of modules keyed by their module description address
123  //here we need to treat output modules in a special way so they can be easily singled out
124  if(desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EventStreamFileWriter" ||
125  desc.moduleName() == "PoolOutputModule")
126  encModule_.updateReserved((void*)&desc);
127  else
128  encModule_.update((void*)&desc);
129  }
130 
132  {
133  //bonus track, now monitoring path execution too...
134  // here we are forced to use string keys...
135  std::cout << ">>>>>>>>>>>>>>>>>>>>>>>>>>update path map with " << pathName << std::endl;
136  encPath_.update((void*)&pathName);
137  }
138 
140  {
141  std::cout << "path legenda*****************" << std::endl;
142  std::cout << makePathLegenda() << std::endl;
144  }
145 
147  {
148  //bonus track, now monitoring path execution too...
150  }
151 
153 
154  std::ostringstream ost;
155  for(int i = 0;
156  i < encPath_.current_;
157  i++)
158  ost<<i<<"="<<*((std::string *)(encPath_.decode(i)))<<" ";
159  return ost.str();
160  }
161 
163 
164  std::ostringstream ost;
165  for(int i = 0;
167  i++)
168  {
169  // std::cout << "for i = " << i << std::endl;
170  ost<<i<<"="<<((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()<<" ";
171  }
172  return ost.str();
173  }
174 
176  {
177  // boost::mutex::scoped_lock sl(lock_);
178  std::cout << "path legenda*****************" << std::endl;
179  std::cout << makePathLegenda() << std::endl;
180  std::cout << "module legenda***************" << std::endl;
181  std::cout << makeModuleLegenda() << std::endl;
183  }
184 
186  {
187  // boost::mutex::scoped_lock sl(lock_);
189  fmt_.stop();
190  }
191 
193  {
194  std::cout << "FastMonitoringService: Pre-begin LUMI: " << iID.luminosityBlock() << std::endl;
195  fmt_.monlock_.lock();
196  fmt_.m_data.lumisection_ = (unsigned int) iID.luminosityBlock();
197  gettimeofday(&lumiStartTime_, 0);
198  fmt_.monlock_.unlock();
199  }
200 
202  {
203  std::cout << "FastMonitoringService: LUMI: " << iID.luminosityBlock() << " ended! Writing JSON information..." << std::endl;
204  fmt_.monlock_.lock();
205  gettimeofday(&lumiStopTime_, 0);
206 
207  // Compute throughput
208  unsigned int secondsForLumi = lumiStopTime_.tv_sec - lumiStartTime_.tv_sec;
209  fmt_.m_data.throughputJ_.value() = double(fmt_.m_data.accuSize_) / double(secondsForLumi) / double(1024*1024);
210 
211  std::cout
212  << ">>> >>> FastMonitoringService: processed event count for this lumi = "
213  << fmt_.m_data.processedJ_.value() << " time = " << secondsForLumi
214  << " size = " << fmt_.m_data.accuSize_ << " thr = " << fmt_.m_data.throughputJ_.value() << std::endl;
215  fmt_.m_data.jsonMonitor_->snap(true, fastPath_);
216  // create file name for slow monitoring file
217  std::stringstream slowFileName;
218  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
219  << fmt_.m_data.lumisection_ << "_pid" << std::setfill('0')
220  << std::setw(5) << getpid() << ".jsn";
222  slow /= slowFileName.str();
223  fmt_.m_data.jsonMonitor_->outputFullHistoDataPoint(slow.string());
225 
226  fmt_.m_data.processedJ_ = 0;
227  fmt_.m_data.accuSize_ = 0;
231  leadTimes_.clear();
232  fmt_.monlock_.unlock();
233  }
234 
236  const edm::Timestamp& iTime)
237  {
238  // boost::mutex::scoped_lock sl(lock_);
239  }
240 
242  {
243  // boost::mutex::scoped_lock sl(lock_);
245  fmt_.monlock_.lock();
247  fmt_.monlock_.unlock();
248  }
249 
251  {
252  // boost::mutex::scoped_lock sl(lock_);
254  }
255 
257  {
258  // boost::mutex::scoped_lock sl(lock_);
260  }
261 
263  {
264  // boost::mutex::scoped_lock sl(lock_);
265  fmt_.m_data.microstate_ = &desc;
266  }
267 
269  {
270  // boost::mutex::scoped_lock sl(lock_);
271  fmt_.m_data.microstate_ = &desc;
272  }
274  {
275  // boost::mutex::scoped_lock sl(lock_);
277  }
279  {
280  // boost::mutex::scoped_lock sl(lock_);
282  }
283 
284  void FastMonitoringService::accummulateFileSize(unsigned long fileSize) {
285  fmt_.monlock_.lock();
286  //std::cout << "--> ACCUMMULATING size: " << fileSize << std::endl;
287  fmt_.m_data.accuSize_ += fileSize;
289  fmt_.monlock_.unlock();
290  }
291 
293  gettimeofday(&fileLookStart_, 0);
294  /*
295  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
296  << fileLookStart_.tv_usec / 1000.0 << std::endl;
297  */
298  }
299 
301  gettimeofday(&fileLookStop_, 0);
302  /*
303  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
304  << fileLookStop_.tv_usec / 1000.0 << std::endl;
305  */
306  double elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000.0; // sec to ms
307  elapsedTime += (fileLookStop_.tv_usec - fileLookStart_.tv_usec) / 1000.0; // us to ms
308  // add this to lead times for this lumi
309  leadTimes_.push_back(elapsedTime);
310 
311  // recompute average lead time for this lumi
312  if (leadTimes_.size() == 1) fmt_.m_data.avgLeadTimeJ_ = leadTimes_[0];
313  else {
314  double totTime = 0;
315  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
316  fmt_.m_data.avgLeadTimeJ_ = totTime / leadTimes_.size();
317  }
318  }
319 
320  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi) {
321  return processedEventsPerLumi_[lumi];
322 }
323 
324 } //end namespace evf
325 
void watchPostBeginRun(PostBeginRun::slot_type const &iSlot)
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
void preBeginLumi(edm::LuminosityBlockID const &iID, edm::Timestamp const &iTime)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
tuple lumi
Definition: fjr2json.py:35
void preEventProcessing(const edm::EventID &, const edm::Timestamp &)
void watchPostModule(PostModule::slot_type const &iSlot)
void watchPreProcessEvent(PreProcessEvent::slot_type const &iSlot)
void watchPreEndLumi(PreEndLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::unordered_map< unsigned int, int > processedEventsPerLumi_
void watchPreModule(PreModule::slot_type const &iSlot)
void prePathBeginRun(const std::string &pathName)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preEndLumi(edm::LuminosityBlockID const &iID, edm::Timestamp const &iTime)
void watchPostProcessEvent(PostProcessEvent::slot_type const &iSlot)
void watchPrePathBeginRun(PrePathBeginRun::slot_type const &iSlot)
boost::shared_ptr< FastMonitor > jsonMonitor_
static const std::string nopath_
void postBeginRun(edm::Run const &, edm::EventSetup const &)
static const std::string macroStateNames[FastMonitoringThread::MCOUNT]
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void preProcessPath(const std::string &pathName)
void preModule(const edm::ModuleDescription &)
LuminosityBlockNumber_t luminosityBlock() const
std::vector< double > leadTimes_
void postModule(const edm::ModuleDescription &)
void preModuleBeginJob(const edm::ModuleDescription &desc)
boost::filesystem::path workingDirectory_
void watchPreBeginLumi(PreBeginLumi::slot_type const &iSlot)
tuple cout
Definition: gather_cfg.py:121
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
const void * decode(unsigned int index)
void postEventProcessing(const edm::Event &, const edm::EventSetup &)
void accummulateFileSize(unsigned long fileSize)
void watchPreProcessPath(PreProcessPath::slot_type const &iSlot)
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
Definition: Run.h:41
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal