CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Vulture.cc
Go to the documentation of this file.
1 
5 
6 // to handle pt file descriptors left open at fork
7 #include "pt/PeerTransportReceiver.h"
8 #include "pt/PeerTransportAgent.h"
9 
10 #include "toolbox/task/Action.h"
11 #include "toolbox/task/WorkLoop.h"
12 #include "toolbox/task/WorkLoopFactory.h"
13 
14 #include <unistd.h>
15 #ifdef linux
16 #include <sys/prctl.h>
17 #endif
18 #include <signal.h>
19 #include <string.h>
20 #include <sys/wait.h>
21 #include <sys/time.h>
22 
23 #include <sys/stat.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <fstream>
28 
29 #ifdef __APPLE__
30 /* getline implementation is copied from glibc. */
31 
32 #ifndef SIZE_MAX
33 # define SIZE_MAX ((size_t) -1)
34 #endif
35 #ifndef SSIZE_MAX
36 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
37 #endif
38 
39 ssize_t getline (char **lineptr, size_t *n, FILE *fp)
40 {
41  ssize_t result;
42  size_t cur_len = 0;
43 
44  if (lineptr == NULL || n == NULL || fp == NULL)
45  {
46  errno = EINVAL;
47  return -1;
48  }
49 
50  if (*lineptr == NULL || *n == 0)
51  {
52  *n = 120;
53  *lineptr = (char *) malloc (*n);
54  if (*lineptr == NULL)
55  {
56  result = -1;
57  goto end;
58  }
59  }
60 
61  for (;;)
62  {
63  int i;
64 
65  i = getc (fp);
66  if (i == EOF)
67  {
68  result = -1;
69  break;
70  }
71 
72  /* Make enough space for len+1 (for final NUL) bytes. */
73  if (cur_len + 1 >= *n)
74  {
75  size_t needed_max =
76  SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
77  size_t needed = 2 * *n + 1; /* Be generous. */
78  char *new_lineptr;
79 
80  if (needed_max < needed)
81  needed = needed_max;
82  if (cur_len + 1 >= needed)
83  {
84  result = -1;
85  goto end;
86  }
87 
88  new_lineptr = (char *) realloc (*lineptr, needed);
89  if (new_lineptr == NULL)
90  {
91  result = -1;
92  goto end;
93  }
94 
95  *lineptr = new_lineptr;
96  *n = needed;
97  }
98 
99  (*lineptr)[cur_len] = i;
100  cur_len++;
101 
102  if (i == '\n')
103  break;
104  }
105  (*lineptr)[cur_len] = '\0';
106  result = cur_len ? (ssize_t) cur_len : result;
107 
108 end:
109  return result;
110 }
111 #endif
112 
113 namespace evf{
114 
115  const std::string Vulture::FS="/tmp";
116 
117  Vulture::Vulture(bool push)
118  : wlCtrl_(0)
119  , asCtrl_(0)
120  , running_(false)
121  , wlProwl_(0)
122  , asProwl_(0)
123  , prowling_(false)
124  , iDieUrl_("")
125  , updateMode_(push)
126  , vulturePid_(0)
127  , tmp_(0)
128  , newCores_(0)
129  , poster_(0)
130  , mq_(new MasterQueue(vulture_queue_offset))
131  , sq_(0) // this is only defined in the forked process
132  , started_(-1)
133  , stopped_(-1)
134  , handicapped_(false)
135  {
136  // create command file for gdb, if not already there
137  std::ifstream vulture("/tmp/vulture.cmd");
138  if (!vulture.good())
139  {
140  FILE *outf = fopen("/tmp/vulture.cmd","w");
141  fprintf(outf,"where\n");
142  fclose(outf);
143  }
144 
145  }
146 
148  {
149  delete mq_;
150  if(sq_ != 0) delete sq_;
151  if(poster_ != 0) delete poster_;
152  }
153 
155 
156  pid_t retval = fork();
157  if(retval==0){ // we are in the forked process
158  int success = -1;
159 // #ifdef linux
160 // success = prctl( PR_SET_DUMPABLE, 0 );
161 // #endif
162  if(success != 0){
163  std::cout << "Vulture::could not set process undumpable" << std::endl;
164  handicapped_ = true;
165  }
166 #ifdef linux
167  success = prctl( PR_SET_PDEATHSIG, SIGKILL );
168 #endif
169  if(success != 0){
170  std::cout << "Vulture::could not set process death signal" << std::endl;
171  handicapped_ = true;
172  }
173  tmp_ = opendir(FS.c_str());
174 #ifdef linux
175  success = prctl ( PR_SET_NAME , "vulture");
176 #endif
177  if(success != 0){
178  std::cout << "Vulture::could not set process name" << std::endl;
179  handicapped_ = true;
180  }
181 
182  try{
183  pt::PeerTransport * ptr =
184  pt::getPeerTransportAgent()->getPeerTransport("http","soap",pt::Receiver);
185  delete ptr;
186  }
187  catch (pt::exception::PeerTransportNotFound & e ){
188  //do nothing here since we don't know what to do... ?
189  }
190  // freopen("/dev/null","w",stderr);
192  // start the ctrl workloop
193  try {
194  wlCtrl_=
195  toolbox::task::getWorkLoopFactory()->getWorkLoop("Ctrll",
196  "waiting");
197  if (!wlCtrl_->isActive()) wlCtrl_->activate();
198 
199  asCtrl_ = toolbox::task::bind(this,&Vulture::control,
200  "Ctrl");
201  wlCtrl_->submit(asCtrl_);
202  }
203  catch (xcept::Exception& e) {
204  std::cout << "Vulture:constructor - could not start workloop 'Ctrl' for process " << retval << std::endl;
205  }
206  }
207  else{
208  vulturePid_ = retval;
209  }
210  return retval;
211 
212 
213  }
214 
216  if(started_<0){
218  try{
219  mq_->rcvNonBlocking(msg2);
220  started_ = 0;
221  }
222  catch(evf::Exception &e){
223  }
224  } else {started_ = 1;}
225  return started_;
226  }
227 
229  if(stopped_<0){
231  try{
232  mq_->rcvNonBlocking(msg2);
233  stopped_ = 0;
234  }
235  catch(evf::Exception &e){
236  }
237  } else {stopped_ = 1;}
238  return stopped_;
239  }
240 
242 
243  //communicate start-of-run to Vulture
244  vulture_start_message stamsg;
245  strcpy(stamsg.url_,url.c_str());
246  stamsg.run_ = run;
248  memcpy(msg1->mtext,&stamsg,sizeof(vulture_start_message));
249  mq_->post(msg1);
250  stopped_ = -1;
251  return vulturePid_;
252  }
253 
255  {
256 
258  mq_->post(msg1);
259  started_ = -1;
260  return vulturePid_;
261  }
262 
263  pid_t Vulture::kill() // eventually *could* be called by master app - it isn't now
264  {
265  ::kill (vulturePid_, SIGKILL);
266  int sl;
267  pid_t killedOrNot = waitpid(vulturePid_,&sl,WNOHANG);
268  vulturePid_ = 0;
269  return killedOrNot;
270  }
271 
273  {
274  timeval now;
275  gettimeofday(&now,0);
276  lastUpdate_ = now.tv_sec;
277  prowling_ = true;
278  try {
279  wlProwl_=
280  toolbox::task::getWorkLoopFactory()->getWorkLoop("Prowl",
281  "waiting");
282  if (!wlProwl_->isActive()) wlProwl_->activate();
283 
284  asProwl_ = toolbox::task::bind(this,&Vulture::prowling,
285  "Prowl");
286  wlProwl_->submit(asProwl_);
287  }
288  catch (xcept::Exception& e) {
289  std::string msg = "Failed to start workloop 'Prowl'.";
290  XCEPT_RETHROW(evf::Exception,msg,e);
291  }
292 
293  }
294 
295  bool Vulture::control(toolbox::task::WorkLoop*wl)
296  {
297 
298  MsgBuf msg;
299  unsigned long mtype = MSQM_MESSAGE_TYPE_NOP;
300  try{mtype = sq_->rcv(msg);}catch(evf::Exception &e){
301  std::cout << "Vulture::exception on msgrcv for control, bailing out of control workloop - good bye" << std::endl;
302  return false;
303  }
304  mtype = msg->mtype;
305  switch(mtype){
307  {
308 
309  vulture_start_message *sta = (vulture_start_message*)msg->mtext;
310  if(poster_ == 0) poster_ = new CurlPoster(sta->url_);
311  if(poster_->check(sta->run_)){
312  try{
313  startProwling();
314  MsgBuf msg1(0,MSQS_VULTURE_TYPE_ACK) ;
315  sq_->post(msg1);
316  }
317  catch(evf::Exception &e)
318  {
319  std::cout << "Vulture::start - exception in starting prowling workloop " << e.what() << std::endl;
320  //@EM ToDo generate some message here
321  }
322  }else{
323  std::cout << "Vulture::start - could not contact iDie - chech Url - will not start prowling loop" << std::endl;
324  prowling_ = false;
325  }
326 
327  break;
328  }
330  {
331  prowling_ = false;
332  break;
333  }
334  default:
335  {
336  // do nothing @EM ToDo generate an appropriate error message
337  }
338  }
339  return true;
340 
341  }
342 
343  bool Vulture::prowling(toolbox::task::WorkLoop*wl)
344  {
345 
346  if(!prowling_){
347  char messageDie[5];
348  sprintf(messageDie,"Dead");
349  if(poster_==0){
350  std::cout << "Vulture: asked to stop prowling but no poster "
351  << std::endl;
352  return false;
353  }
354  try{
355  poster_->postString(messageDie,5,0,CurlPoster::stack);
356  }
357  catch(evf::Exception &e){
358  //do nothing just swallow the exception
359  }
360  std::cout << "Received STOP message, going to delete poster " << std::endl;
361 // delete poster_;
362 // poster_=0;
363 
364  return false;
365  }
366 
367  newCores_ = 0;
368 
369  struct stat filestat;
370 
371  timeval now;
372  gettimeofday(&now,0);
373 
374  // examine /tmp looking for new coredumps
375  dirent *dirp;
376  while((dirp = readdir(tmp_))!=0){
377  if(strncmp(dirp->d_name,"core",4)==0){
378  stat(dirp->d_name,&filestat);
379  if(filestat.st_mtime > lastUpdate_){
380  currentCoreList_.push_back(dirp->d_name);
381  newCores_++;
382  }
383  }
384  }
385  rewinddir(tmp_);
386  lastUpdate_ = now.tv_sec;
387  try{
388  analyze();
389  }
390  catch(evf::Exception &e){
391  std::cout << "Vulture cannot send to iDie server, bail out " << std::endl;
392  return false;
393  }
394  ::sleep(60);
395  return true;
396  }
397 
399  {
400  // do a first analysis of the coredump
401  if(newCores_==0) return;
402  for(unsigned int i = currentCoreList_.size()-newCores_;
403  i < currentCoreList_.size();
404  i++){
405  std::string command = "gdb /opt/xdaq/bin/xdaq.exe -batch -x /tmp/vulture.cmd -c /tmp/";
406  std::string cmdout;
407  command += currentCoreList_[i];
408  std::string filePathAndName = FS + "/";
409  filePathAndName += currentCoreList_[i];
410  std::string pid =
411  currentCoreList_[i].substr(currentCoreList_[i].find_first_of(".")+1,
412  currentCoreList_[i].length());
413 
414  FILE *ps = popen(command.c_str(),"r");
415  size_t s = 256;
416  char *p=new char[s];
417  bool filter = false;
418  while(getline(&p,&s,ps) != -1){
419  if(strncmp("Core",p,4)==0) filter = true;
420  if(filter)cmdout += p;
421  }
422  delete[] p;
423  pclose(ps);
424  int errsv = 0;
425  int rch = chmod(filePathAndName.c_str(),0777);
426  if(rch != 0){
427  errsv = errno;
428  std::cout << "ERROR: couldn't change corefile access privileges -"
429  << strerror(errsv)<< std::endl;
430  }
431  unsigned int ipid = (unsigned int)atoi(pid.c_str());
432  poster_->postString(cmdout.c_str(),cmdout.length(),ipid, CurlPoster::stack);
433 
434  }
435  }
436 }
437 
bool check(int)
Definition: CurlPoster.cc:125
int stop()
Definition: Vulture.cc:254
char url_[VULTURE_START_MESSAGE_URL_SIZE]
Definition: Vulture.h:32
int i
Definition: DBlmapReader.cc:9
pid_t makeProcess()
Definition: Vulture.cc:154
toolbox::task::WorkLoop * wlProwl_
Definition: Vulture.h:63
#define MSQS_VULTURE_TYPE_ACK
Definition: queue_defs.h:35
virtual ~Vulture()
Definition: Vulture.cc:147
pid_t kill()
Definition: Vulture.cc:263
time_t lastUpdate_
Definition: Vulture.h:71
SlaveQueue * sq_
Definition: Vulture.h:75
static const std::string FS
Definition: Vulture.h:59
#define NULL
Definition: scimark2.h:8
bool prowling(toolbox::task::WorkLoop *)
Definition: Vulture.cc:343
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void postString(const char *, size_t, unsigned int, mode, const std::string &=standard_post_method_)
Definition: CurlPoster.cc:112
void sleep(Duration_t)
Definition: Utils.h:163
int post(MsgBuf &ptr)
Definition: SlaveQueue.cc:18
std::vector< std::string > currentCoreList_
Definition: Vulture.h:70
DIR * tmp_
Definition: Vulture.h:69
#define MSQM_MESSAGE_TYPE_NOP
Definition: queue_defs.h:15
toolbox::task::ActionSignature * asCtrl_
Definition: Vulture.h:61
#define SIZE_MAX
tuple result
Definition: query.py:137
int start(std::string, int=0)
Definition: Vulture.cc:241
toolbox::task::ActionSignature * asProwl_
Definition: Vulture.h:64
#define end
Definition: vmac.h:38
toolbox::task::WorkLoop * wlCtrl_
Definition: Vulture.h:60
bool control(toolbox::task::WorkLoop *)
Definition: Vulture.cc:295
unsigned long rcvNonBlocking(MsgBuf &ptr)
Definition: MasterQueue.cc:82
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool prowling_
Definition: Vulture.h:65
void startProwling()
Definition: Vulture.cc:272
void analyze()
Definition: Vulture.cc:398
CurlPoster * poster_
Definition: Vulture.h:73
#define SSIZE_MAX
#define MSQM_VULTURE_TYPE_STP
Definition: queue_defs.h:23
unsigned long rcv(MsgBuf &ptr)
Definition: SlaveQueue.cc:28
static const int vulture_queue_offset
Definition: Vulture.h:53
int hasStopped()
Definition: Vulture.cc:228
MasterQueue * mq_
Definition: Vulture.h:74
int hasStarted()
Definition: Vulture.cc:215
tuple cout
Definition: gather_cfg.py:121
Vulture(bool)
Definition: Vulture.cc:117
#define MSQM_VULTURE_TYPE_STA
Definition: queue_defs.h:22
unsigned int newCores_
Definition: Vulture.h:72
bool handicapped_
Definition: Vulture.h:78
int post(MsgBuf &ptr)
Definition: MasterQueue.cc:26
int stopped_
Definition: Vulture.h:77
int started_
Definition: Vulture.h:76
pid_t vulturePid_
Definition: Vulture.h:68