CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ZombieKillerService.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Services
4 // Class : ZombieKillerService
5 //
6 // Implementation:
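7 // A watcher thread (started at PostBeginJob, stopped at PostEndJob) wakes periodically; if no framework transition has reported progress for too many consecutive checks, the job is terminated with std::terminate().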
8 //
9 // Original Author: Chris Jones
10 // Created: Sat, 22 Mar 2014 16:25:47 GMT
11 //
12 
13 // system include files
14 #include <atomic>
15 #include <thread>
16 #include <mutex>
17 #include <condition_variable>
18 #include <exception>
19 
20 // user include files
25 
26 
27 namespace edm {
29  public:
31 
32  static void fillDescriptions(ConfigurationDescriptions& descriptions);
33 
34  private:
35  const unsigned int m_checkThreshold;
36  const unsigned int m_secsBetweenChecks;
37  std::thread m_watchingThread;
38  std::condition_variable m_jobDoneCondition;
40  bool m_jobDone;
41  std::atomic<bool> m_stillAlive;
42  std::atomic<unsigned int> m_numberChecksWhenNotAlive;
43 
44 
45  void notAZombieYet();
46  void checkForZombie();
47  void startThread();
48  void stopThread();
49  };
50 }
51 
52 using namespace edm;
53 
54 inline
56  return true;
57 }
58 
59 
60 //
61 // constants, enums and typedefs
62 //
63 
64 //
65 // static data member definitions
66 //
67 
68 //
69 // constructors and destructor
70 //
72 m_checkThreshold(iPSet.getUntrackedParameter<unsigned int>("numberOfAllowedFailedChecksInARow")),
73 m_secsBetweenChecks(iPSet.getUntrackedParameter<unsigned int>("secondsBetweenChecks")),
74 m_jobDone(false),
75 m_stillAlive(true),
76 m_numberChecksWhenNotAlive(0)
77 {
78  iRegistry.watchPostBeginJob([this](){ startThread(); } );
79  iRegistry.watchPostEndJob([this]() {stopThread(); } );
80 
81  iRegistry.watchPreSourceRun([this](){notAZombieYet();});
82  iRegistry.watchPostSourceRun([this](){notAZombieYet();});
83 
84  iRegistry.watchPreSourceLumi([this](){notAZombieYet();});
85  iRegistry.watchPostSourceLumi([this](){notAZombieYet();});
86 
87  iRegistry.watchPreSourceEvent([this](StreamID){notAZombieYet();});
88  iRegistry.watchPostSourceEvent([this](StreamID){notAZombieYet();});
89 
90  iRegistry.watchPreModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
91  iRegistry.watchPostModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
92 
93  iRegistry.watchPreModuleEndStream([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
94  iRegistry.watchPostModuleEndStream([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
95 
96  iRegistry.watchPreModuleEndJob([this](ModuleDescription const&) {notAZombieYet();});
97  iRegistry.watchPostModuleEndJob([this](ModuleDescription const&) {notAZombieYet();});
98  iRegistry.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
99  iRegistry.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
100 
103 
104  iRegistry.watchPreModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
105  iRegistry.watchPostModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
106 
109 
110  iRegistry.watchPreModuleStreamEndLumi([this](StreamContext const&, ModuleCallingContext const&){notAZombieYet();});
112 
115 
116  iRegistry.watchPreModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&){notAZombieYet();});
117  iRegistry.watchPostModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&){notAZombieYet();});
118 
121 
122  iRegistry.watchPreModuleGlobalEndLumi([this](GlobalContext const&, ModuleCallingContext const&){notAZombieYet();});
124 
125 
126 }
127 
128 // ZombieKillerService::ZombieKillerService(const ZombieKillerService& rhs)
129 // {
130 // // do actual copying here;
131 // }
132 
133 //ZombieKillerService::~ZombieKillerService()
134 //{
135 //}
136 
137 //
138 // assignment operators
139 //
140 // const ZombieKillerService& ZombieKillerService::operator=(const ZombieKillerService& rhs)
141 // {
142 // //An exception safe implementation is
143 // ZombieKillerService temp(rhs);
144 // swap(rhs);
145 //
146 // return *this;
147 // }
148 
149 //
150 // member functions
151 //
152 void
155  m_stillAlive = true;
156 }
157 
158 void
160  if (not m_stillAlive) {
163  edm::LogError("JobStuck")<<"Too long since the job has last made progress.";
164  std::terminate();
165  } else {
166  edm::LogWarning("JobProgressing")<<"It has been "<<m_numberChecksWhenNotAlive*m_secsBetweenChecks<<" seconds since job seen progressing";
167  }
168  }
169  m_stillAlive = false;
170 }
171 
172 void
174  m_watchingThread = std::thread([this]() {
175 
176  std::unique_lock<std::mutex> lock(m_jobDoneMutex);
177  while(not m_jobDoneCondition.wait_for(lock,
179  [this]()->bool
180  {
181  return m_jobDone;
182  }))
183  {
184  //we timed out
185  checkForZombie();
186  }
187  });
188 }
189 
190 void
191 ZombieKillerService::stopThread() {
192  {
193  std::lock_guard<std::mutex> guard(m_jobDoneMutex);
194  m_jobDone=true;
195  }
196  m_jobDoneCondition.notify_all();
197  m_watchingThread.join();
198 }
199 
200 void
203  desc.addUntracked<unsigned int>("secondsBetweenChecks", 60)->setComment("Number of seconds to wait between checking if progress has been made.");
204  desc.addUntracked<unsigned int>("numberOfAllowedFailedChecksInARow", 3)->setComment("Number of allowed checks in a row with no progress.");
205  descriptions.add("ZombieKillerService", desc);
206 }
207 
208 //
209 // const member functions
210 //
211 
212 //
213 // static member functions
214 //
215 
void watchPostModuleGlobalEndLumi(PostModuleGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleGlobalBeginRun(PreModuleGlobalBeginRun::slot_type const &iSlot)
static void fillDescriptions(ConfigurationDescriptions &descriptions)
double seconds()
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
static boost::mutex mutex
Definition: LHEProxy.cc:11
void watchPostModuleEndStream(PostModuleEndStream::slot_type const &iSlot)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
const unsigned int m_checkThreshold
void watchPostModuleGlobalBeginLumi(PostModuleGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleStreamEndLumi(PostModuleStreamEndLumi::slot_type const &iSlot)
void watchPostModuleStreamBeginRun(PostModuleStreamBeginRun::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void watchPreModuleBeginStream(PreModuleBeginStream::slot_type const &iSlot)
void watchPreModuleGlobalEndRun(PreModuleGlobalEndRun::slot_type const &iSlot)
void watchPostSourceRun(PostSourceRun::slot_type const &iSlot)
void watchPreSourceLumi(PreSourceLumi::slot_type const &iSlot)
void watchPreModuleEndJob(PreModuleEndJob::slot_type const &iSlot)
void watchPreSourceRun(PreSourceRun::slot_type const &iSlot)
void watchPreModuleGlobalBeginLumi(PreModuleGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleStreamEndRun(PostModuleStreamEndRun::slot_type const &iSlot)
void watchPreModuleStreamBeginLumi(PreModuleStreamBeginLumi::slot_type const &iSlot)
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:113
void watchPostModuleBeginStream(PostModuleBeginStream::slot_type const &iSlot)
bool isProcessWideService(ZombieKillerService const *)
std::atomic< unsigned int > m_numberChecksWhenNotAlive
void watchPostSourceLumi(PostSourceLumi::slot_type const &iSlot)
void watchPostModuleGlobalEndRun(PostModuleGlobalEndRun::slot_type const &iSlot)
void watchPostModuleStreamBeginLumi(PostModuleStreamBeginLumi::slot_type const &iSlot)
void watchPreModuleStreamEndLumi(PreModuleStreamEndLumi::slot_type const &iSlot)
void watchPreModuleStreamBeginRun(PreModuleStreamBeginRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreModuleEndStream(PreModuleEndStream::slot_type const &iSlot)
const unsigned int m_secsBetweenChecks
std::atomic< bool > m_stillAlive
void watchPreModuleStreamEndRun(PreModuleStreamEndRun::slot_type const &iSlot)
ZombieKillerService(edm::ParameterSet const &, edm::ActivityRegistry &)
void watchPostModuleGlobalBeginRun(PostModuleGlobalBeginRun::slot_type const &iSlot)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag false
void watchPostModuleEndJob(PostModuleEndJob::slot_type const &iSlot)
void watchPreModuleGlobalEndLumi(PreModuleGlobalEndLumi::slot_type const &iSlot)
std::condition_variable m_jobDoneCondition
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal