CMS 3D CMS Logo

EvFBuildingThrottle.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
3 
11 
12 #include <sys/statvfs.h>
13 #include <iostream>
14 #include <thread>
15 #include <mutex>
16 
17 namespace evf {
19  public:
// Codes selecting which directory the throttle watches; the value is read
// from the "dirCode" configuration parameter.
enum Directory {
  mInvalid = 0,  // no directory is selected
  mBase,         // monitor EvFDaqDirector::baseRunDir()
  mBU,           // monitor EvFDaqDirector::buBaseRunDir()
  mCOUNT         // sentinel following the last code
};
// Constructor initialiser list (the signature line is not part of this listing).
// Each setting shown here is read from the untracked parameters of `pset`:
//   "highWaterMark" (double, default 0.8)        - disk usage fraction above which dowork() engages the throttle
//   "lowWaterMark"  (double, default 0.5)        - disk usage fraction below which dowork() releases the throttle
//   "dirCode"       (int, default mBase)         - Directory code that preBeginRun() uses to pick the monitored directory
//   "sleepmSecs"    (unsigned int, default 1000) - pause between two disk checks, in milliseconds
22  : highWaterMark_(pset.getUntrackedParameter<double>("highWaterMark", 0.8)),
23  lowWaterMark_(pset.getUntrackedParameter<double>("lowWaterMark", 0.5)),
25  whatToThrottleOn_(Directory(pset.getUntrackedParameter<int>("dirCode", mBase))),
27  sleep_(pset.getUntrackedParameter<unsigned int>("sleepmSecs", 1000)) {
31  }
33  void preBeginRun(edm::GlobalContext const& gc) {
34  //obtain directory to stat on
35  switch (whatToThrottleOn_) {
36  case mInvalid:
37  //do nothing
38  break;
39  case mBase:
40  baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
41  break;
42  case mBU:
43  baseDir_ = edm::Service<EvFDaqDirector>()->buBaseRunDir();
44  break;
45  default:
46  baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
47  }
48  start();
49  }
50  void postBeginRun(edm::GlobalContext const& gc) {}
51 
52  void postEndRun(edm::GlobalContext const& gc) { stop(); }
53  void preBeginLumi(edm::GlobalContext const& gc) {
54  lock_.lock();
55  lock_.unlock();
56  }
57  bool throttled() const { return throttled_; }
58 
59  private:
60  void dowork() {
62  struct statvfs buf;
63  while (!m_stoprequest) {
64  int retval = statvfs(baseDir_.c_str(), &buf);
65  if (retval != 0) {
66  std::cout << " building throttle - unable to stat " << baseDir_ << std::endl;
67  m_stoprequest = true;
68  continue;
69  }
70  double fraction = 1. - float(buf.f_bfree * buf.f_bsize) / float(buf.f_blocks * buf.f_frsize);
71  bool highwater_ = fraction > highWaterMark_;
72  bool lowwater_ = fraction < lowWaterMark_;
73  if (highwater_ && !throttled_) {
74  lock_.lock();
75  throttled_ = true;
76  std::cout << ">>>>throttling on " << std::endl;
77  }
78  if (lowwater_ && throttled_) {
79  lock_.unlock();
80  throttled_ = false;
81  }
82  std::cout << " building throttle on " << baseDir_ << " is " << fraction * 100 << " %full " << std::endl;
83  //edm::Service<EvFDaqDirector>()->writeDiskAndThrottleStat(fraction,highwater_,lowwater_);
84  ::usleep(sleep_ * 1000);
85  if (edm::shutdown_flag) {
86  std::cout << " Shutdown flag set: stop throttling" << std::endl;
87  break;
88  }
89  }
90  if (throttled_)
91  lock_.unlock();
92  }
93  void start() {
94  assert(!m_thread);
96  m_thread = std::make_shared<std::thread>(std::bind(&EvFBuildingThrottle::dowork, this));
97  std::cout << "throttle thread started - throttle on " << whatToThrottleOn_ << std::endl;
98  }
99  void stop() {
100  assert(m_thread);
101  m_stoprequest = true;
102  m_thread->join();
103  }
104 
// Stop flag polled by the dowork() loop; set by stop() and by dowork() itself when statvfs() fails.
107  std::atomic<bool> m_stoprequest;
// Monitoring thread created by start() and joined by stop().
108  std::shared_ptr<std::thread> m_thread;
// Pause between two disk checks, in milliseconds ("sleepmSecs" parameter); dowork() multiplies it by 1000 for usleep().
114  unsigned int sleep_;
115  };
116 } // namespace evf
117 
118 #endif
Definition: fillJson.h:27
std::atomic< bool > m_stoprequest
static std::mutex mutex
Definition: Proxy.cc:8
volatile std::atomic< bool > shutdown_flag
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void preBeginLumi(edm::GlobalContext const &gc)
assert(be >=bs)
void postBeginRun(edm::GlobalContext const &gc)
static ServiceRegistry & instance()
EvFBuildingThrottle(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::shared_ptr< std::thread > m_thread
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void preBeginRun(edm::GlobalContext const &gc)
ServiceToken presentToken() const
void postEndRun(edm::GlobalContext const &gc)