CMS 3D CMS Logo

EvFBuildingThrottle.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
3 
11 
12 #include <sys/statvfs.h>
13 #include <atomic>
14 
15 #include "boost/thread/thread.hpp"
16 
17 
18 namespace evf{
20  {
21  public:
22  enum Directory { mInvalid = 0, mBase, mBU, mCOUNT};
24  edm::ActivityRegistry& reg )
25  : highWaterMark_(pset.getUntrackedParameter<double>("highWaterMark",0.8))
26  , lowWaterMark_(pset.getUntrackedParameter<double>("lowWaterMark",0.5))
28  , whatToThrottleOn_(Directory(pset.getUntrackedParameter<int>("dirCode",mBase)))
29  , throttled_(false)
30  , sleep_( pset.getUntrackedParameter<unsigned int>("sleepmSecs",1000))
31  {
35  }
38  //obtain directory to stat on
39  switch(whatToThrottleOn_){
40  case mInvalid:
41  //do nothing
42  break;
43  case mBase:
44  baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
45  break;
46  case mBU:
47  baseDir_ = edm::Service<EvFDaqDirector>()->buBaseRunDir();
48  break;
49  default:
50  baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
51  }
52  start();
53  }
55  }
56 
57  void postEndRun(edm::GlobalContext const& gc){
58  stop();
59  }
61  lock_.lock();
62  lock_.unlock();
63  }
64  bool throttled() const {return throttled_;}
65  private:
66  void dowork(){
68  struct statvfs buf;
69  while(!m_stoprequest){
70  int retval = statvfs(baseDir_.c_str(),&buf);
71  if(retval != 0){
72  std::cout << " building throttle - unable to stat " << baseDir_ << std::endl;
73  m_stoprequest=true;
74  continue;
75  }
76  double fraction = 1.-float(buf.f_bfree*buf.f_bsize)/float(buf.f_blocks*buf.f_frsize);
77  bool highwater_ = fraction>highWaterMark_;
78  bool lowwater_ = fraction<lowWaterMark_;
79  if(highwater_ && !throttled_){ lock_.lock(); throttled_ = true;std::cout << ">>>>throttling on " << std::endl;}
80  if(lowwater_ && throttled_){ lock_.unlock(); throttled_ = false;}
81  std::cout << " building throttle on " << baseDir_ << " is " << fraction*100 << " %full " << std::endl;
82  //edm::Service<EvFDaqDirector>()->writeDiskAndThrottleStat(fraction,highwater_,lowwater_);
83  ::usleep(sleep_*1000);
84  if (edm::shutdown_flag) {
85  std::cout << " Shutdown flag set: stop throttling" << std::endl;
86  break;
87  }
88  }
89  if (throttled_) lock_.unlock();
90  }
91  void start(){
92  assert(!m_thread);
94  m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&EvFBuildingThrottle::dowork,this)));
95  std::cout << "throttle thread started - throttle on " << whatToThrottleOn_ << std::endl;
96  }
97  void stop(){
98  assert(m_thread);
99  m_stoprequest=true;
100  m_thread->join();
101  }
102 
105  std::atomic<bool> m_stoprequest;
106  boost::shared_ptr<boost::thread> m_thread;
112  unsigned int sleep_;
113  };
114 }
115 
116 #endif
117 
static boost::mutex mutex
Definition: Proxy.cc:11
std::atomic< bool > m_stoprequest
volatile std::atomic< bool > shutdown_flag
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void preBeginLumi(edm::GlobalContext const &gc)
ServiceToken presentToken() const
void postBeginRun(edm::GlobalContext const &gc)
static ServiceRegistry & instance()
boost::shared_ptr< boost::thread > m_thread
EvFBuildingThrottle(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void preBeginRun(edm::GlobalContext const &gc)
void postEndRun(edm::GlobalContext const &gc)
def operate(timelog, memlog, json_f, num)