CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EvFBuildingThrottle.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
3 
10 
11 #include <sys/statvfs.h>
12 
13 #include "boost/thread/thread.hpp"
14 
15 
16 namespace evf{
18  {
19  public:
20  enum Directory { mInvalid = 0, mBase, mBU, mCOUNT};
22  edm::ActivityRegistry& reg )
23  : highWaterMark_(pset.getUntrackedParameter<double>("highWaterMark",0.8))
24  , lowWaterMark_(pset.getUntrackedParameter<double>("lowWaterMark",0.5))
26  , whatToThrottleOn_(Directory(pset.getUntrackedParameter<int>("dirCode",mBase)))
27  , throttled_(false)
28  , sleep_( pset.getUntrackedParameter<unsigned int>("sleepmSecs",1000))
29  {
33  }
36  //obtain directory to stat on
37  switch(whatToThrottleOn_){
38  case mInvalid:
39  //do nothing
40  break;
41  case mBase:
42  baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
43  break;
44  case mBU:
45  baseDir_ = edm::Service<EvFDaqDirector>()->buBaseRunDir();
46  break;
47  default:
48  baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
49  }
50  start();
51  }
53  }
54 
55  void postEndRun(edm::GlobalContext const& gc){
56  stop();
57  }
59  lock_.lock();
60  lock_.unlock();
61  }
62  bool throttled() const {return throttled_;}
63  private:
64  void dowork(){
66  struct statvfs buf;
67  while(!m_stoprequest){
68  int retval = statvfs(baseDir_.c_str(),&buf);
69  if(retval != 0){
70  std::cout << " building throttle - unable to stat " << baseDir_ << std::endl;
71  m_stoprequest=true;
72  continue;
73  }
74  double fraction = 1.-float(buf.f_bfree*buf.f_bsize)/float(buf.f_blocks*buf.f_frsize);
75  bool highwater_ = fraction>highWaterMark_;
76  bool lowwater_ = fraction<lowWaterMark_;
77  if(highwater_ && !throttled_){ lock_.lock(); throttled_ = true;std::cout << ">>>>throttling on " << std::endl;}
78  if(lowwater_ && throttled_){ lock_.unlock(); throttled_ = false;}
79  std::cout << " building throttle on " << baseDir_ << " is " << fraction*100 << " %full " << std::endl;
80  //edm::Service<EvFDaqDirector>()->writeDiskAndThrottleStat(fraction,highwater_,lowwater_);
81  ::usleep(sleep_*1000);
82  }
83  }
84  void start(){
85  assert(!m_thread);
87  m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&EvFBuildingThrottle::dowork,this)));
88  std::cout << "throttle thread started - throttle on " << whatToThrottleOn_ << std::endl;
89  }
90  void stop(){
92  m_stoprequest=true;
93  m_thread->join();
94  }
95 
97  double lowWaterMark_;
98  volatile bool m_stoprequest;
99  boost::shared_ptr<boost::thread> m_thread;
105  unsigned int sleep_;
106  };
107 }
108 
109 #endif
110 
static boost::mutex mutex
Definition: LHEProxy.cc:11
assert(m_qm.get())
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void preBeginLumi(edm::GlobalContext const &gc)
ServiceToken presentToken() const
void postBeginRun(edm::GlobalContext const &gc)
static ServiceRegistry & instance()
boost::shared_ptr< boost::thread > m_thread
EvFBuildingThrottle(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void preBeginRun(edm::GlobalContext const &gc)
tuple cout
Definition: gather_cfg.py:145
volatile std::atomic< bool > shutdown_flag false
void postEndRun(edm::GlobalContext const &gc)