CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/Utilities/src/Vulture.cc

Go to the documentation of this file.
00001 
00002 #include "EventFilter/Utilities/interface/Vulture.h"
00003 #include "EventFilter/Utilities/interface/Exception.h"
00004 #include "EventFilter/Utilities/interface/CurlPoster.h"
00005 
00006 // to handle pt file descriptors left open at fork
00007 #include "pt/PeerTransportReceiver.h"
00008 #include "pt/PeerTransportAgent.h"
00009 
00010 #include "toolbox/task/Action.h"
00011 #include "toolbox/task/WorkLoop.h"
00012 #include "toolbox/task/WorkLoopFactory.h"
00013 
00014 #include <unistd.h>
00015 #ifdef linux
00016 #include <sys/prctl.h>
00017 #endif
00018 #include <signal.h>
00019 #include <string.h>
00020 #include <sys/wait.h>
00021 #include <sys/time.h>
00022 
00023 #include <sys/stat.h>
00024 #include <unistd.h>
00025 #include <stdlib.h>
00026 #include <stdio.h>
00027 #include <fstream>
00028 
00029 #ifdef __APPLE__
00030 /* getline implementation is copied from glibc. */
00031 
00032 #ifndef SIZE_MAX
00033 # define SIZE_MAX ((size_t) -1)
00034 #endif
00035 #ifndef SSIZE_MAX
00036 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
00037 #endif
00038 
00039 ssize_t getline (char **lineptr, size_t *n, FILE *fp)
00040 {
00041     ssize_t result;
00042     size_t cur_len = 0;
00043 
00044     if (lineptr == NULL || n == NULL || fp == NULL)
00045     {
00046         errno = EINVAL;
00047         return -1;
00048    }
00049 
00050     if (*lineptr == NULL || *n == 0)
00051     {
00052         *n = 120;
00053         *lineptr = (char *) malloc (*n);
00054         if (*lineptr == NULL)
00055         {
00056             result = -1;
00057             goto end;
00058         }
00059     }
00060 
00061     for (;;)
00062     {
00063         int i;
00064 
00065         i = getc (fp);
00066         if (i == EOF)
00067         {
00068             result = -1;
00069             break;
00070         }
00071 
00072         /* Make enough space for len+1 (for final NUL) bytes.  */
00073         if (cur_len + 1 >= *n)
00074         {
00075             size_t needed_max =
00076                 SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
00077             size_t needed = 2 * *n + 1;   /* Be generous. */
00078             char *new_lineptr;
00079 
00080             if (needed_max < needed)
00081                 needed = needed_max;
00082             if (cur_len + 1 >= needed)
00083             {
00084                 result = -1;
00085                 goto end;
00086             }
00087 
00088             new_lineptr = (char *) realloc (*lineptr, needed);
00089             if (new_lineptr == NULL)
00090             {
00091                 result = -1;
00092                 goto end;
00093             }
00094 
00095             *lineptr = new_lineptr;
00096             *n = needed;
00097         }
00098 
00099         (*lineptr)[cur_len] = i;
00100         cur_len++;
00101 
00102         if (i == '\n')
00103             break;
00104     }
00105     (*lineptr)[cur_len] = '\0';
00106     result = cur_len ? (ssize_t) cur_len : result;
00107 
00108 end:
00109     return result;
00110 }
00111 #endif
00112 
00113 namespace evf{
00114 
00115   const std::string Vulture::FS="/tmp";
00116 
00117   Vulture::Vulture(bool push) 
00118     : wlCtrl_(0)
00119     , asCtrl_(0)
00120     , running_(false)
00121     , wlProwl_(0)
00122     , asProwl_(0)
00123     , prowling_(false)
00124     , iDieUrl_("")
00125     , updateMode_(push)
00126     , vulturePid_(0)
00127     , tmp_(0)
00128     , newCores_(0)
00129     , poster_(0)
00130     , mq_(new MasterQueue(vulture_queue_offset))
00131     , sq_(0) // this is only defined in the forked process
00132     , started_(-1)
00133     , stopped_(-1)
00134     , handicapped_(false)
00135   {
00136     // create command file for gdb, if not already there
00137         std::ifstream vulture("/tmp/vulture.cmd");
00138         if (!vulture.good())
00139         {
00140                 FILE *outf = fopen("/tmp/vulture.cmd","w");
00141                 fprintf(outf,"where\n");
00142                 fclose(outf);
00143         }
00144 
00145   }
00146   
00147   Vulture::~Vulture()
00148   {
00149     delete mq_;
00150     if(sq_ != 0) delete sq_;
00151     if(poster_ != 0) delete poster_;
00152   }
00153 
00154   pid_t Vulture::makeProcess(){
00155 
00156     pid_t retval = fork();
00157     if(retval==0){ // we are in the forked process
00158       int success = -1;
00159 // #ifdef linux
00160 //       success = prctl( PR_SET_DUMPABLE, 0 );
00161 // #endif
00162       if(success != 0){
00163         std::cout << "Vulture::could not set process undumpable" << std::endl;
00164         handicapped_ = true;
00165       }
00166 #ifdef linux
00167       success = prctl( PR_SET_PDEATHSIG, SIGKILL );
00168 #endif
00169       if(success != 0){
00170         std::cout << "Vulture::could not set process death signal" << std::endl;
00171         handicapped_ = true;    
00172       }
00173       tmp_ = opendir(FS.c_str());
00174 #ifdef linux
00175       success = prctl ( PR_SET_NAME , "vulture");
00176 #endif
00177       if(success != 0){
00178         std::cout << "Vulture::could not set process name" << std::endl;
00179         handicapped_ = true;    
00180       }
00181 
00182       try{
00183         pt::PeerTransport * ptr =
00184           pt::getPeerTransportAgent()->getPeerTransport("http","soap",pt::Receiver);
00185         delete ptr;
00186       }
00187       catch (pt::exception::PeerTransportNotFound & e ){
00188         //do nothing here since we don't know what to do... ?
00189       }
00190       //      freopen("/dev/null","w",stderr);
00191       sq_ = new SlaveQueue(vulture_queue_offset);
00192       // start the ctrl workloop
00193       try {
00194         wlCtrl_=
00195           toolbox::task::getWorkLoopFactory()->getWorkLoop("Ctrll",
00196                                                            "waiting");
00197         if (!wlCtrl_->isActive()) wlCtrl_->activate();
00198         
00199         asCtrl_ = toolbox::task::bind(this,&Vulture::control,
00200                                        "Ctrl");
00201         wlCtrl_->submit(asCtrl_);
00202       }
00203       catch (xcept::Exception& e) {
00204         std::cout << "Vulture:constructor - could not start workloop 'Ctrl' for process " << retval << std::endl;
00205       }
00206     }
00207     else{
00208       vulturePid_ = retval;
00209     }
00210     return retval;
00211 
00212 
00213   }
00214 
00215   int Vulture::hasStarted(){
00216     if(started_<0){
00217       MsgBuf msg2(MAX_MSG_SIZE,MSQS_VULTURE_TYPE_ACK);
00218       try{
00219         mq_->rcvNonBlocking(msg2);
00220         started_ = 0;
00221       }
00222       catch(evf::Exception &e){
00223       }
00224     } else {started_ = 1;}
00225     return started_;    
00226   }
00227 
00228   int Vulture::hasStopped(){
00229     if(stopped_<0){
00230       MsgBuf msg2(MAX_MSG_SIZE,MSQS_VULTURE_TYPE_ACK);
00231       try{
00232         mq_->rcvNonBlocking(msg2);
00233         stopped_ = 0;
00234       }
00235       catch(evf::Exception &e){
00236       }
00237     } else {stopped_ = 1;}
00238     return stopped_;    
00239   }
00240 
00241   pid_t Vulture::start(std::string url, int run){
00242 
00243     //communicate start-of-run to Vulture
00244     vulture_start_message stamsg;
00245     strcpy(stamsg.url_,url.c_str()); 
00246     stamsg.run_ = run;
00247     MsgBuf msg1(sizeof(vulture_start_message),MSQM_VULTURE_TYPE_STA);
00248     memcpy(msg1->mtext,&stamsg,sizeof(vulture_start_message));
00249     mq_->post(msg1);
00250     stopped_ = -1;
00251     return vulturePid_;
00252   }
00253   
00254   pid_t Vulture::stop()
00255   {
00256 
00257     MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQM_VULTURE_TYPE_STP);
00258     mq_->post(msg1);
00259     started_ = -1;
00260     return vulturePid_;
00261   }
00262 
00263  pid_t Vulture::kill() // eventually *could* be called by master app - it isn't now
00264   {
00265     ::kill (vulturePid_, SIGKILL);
00266     int sl;
00267     pid_t killedOrNot = waitpid(vulturePid_,&sl,WNOHANG);
00268     vulturePid_ = 0;
00269     return killedOrNot;
00270   }
00271 
00272   void Vulture::startProwling()
00273   {
00274     timeval now;
00275     gettimeofday(&now,0);
00276     lastUpdate_ = now.tv_sec;
00277     prowling_ = true;
00278     try {
00279       wlProwl_=
00280         toolbox::task::getWorkLoopFactory()->getWorkLoop("Prowl",
00281                                                          "waiting");
00282       if (!wlProwl_->isActive()) wlProwl_->activate();
00283       
00284       asProwl_ = toolbox::task::bind(this,&Vulture::prowling,
00285                                          "Prowl");
00286       wlProwl_->submit(asProwl_);
00287     }
00288     catch (xcept::Exception& e) {
00289       std::string msg = "Failed to start workloop 'Prowl'.";
00290       XCEPT_RETHROW(evf::Exception,msg,e);
00291     }
00292 
00293   }
00294 
00295   bool Vulture::control(toolbox::task::WorkLoop*wl)
00296   {
00297 
00298     MsgBuf msg;
00299     unsigned long mtype = MSQM_MESSAGE_TYPE_NOP;
00300     try{mtype = sq_->rcv(msg);}catch(evf::Exception &e){
00301       std::cout << "Vulture::exception on msgrcv for control, bailing out of control workloop - good bye" << std::endl;
00302       return false;
00303     }
00304     mtype = msg->mtype;
00305     switch(mtype){
00306     case MSQM_VULTURE_TYPE_STA:
00307       {
00308 
00309         vulture_start_message *sta = (vulture_start_message*)msg->mtext;
00310         if(poster_ == 0) poster_ = new CurlPoster(sta->url_);
00311         if(poster_->check(sta->run_)){
00312           try{
00313             startProwling();
00314             MsgBuf msg1(0,MSQS_VULTURE_TYPE_ACK) ;
00315             sq_->post(msg1);
00316           }
00317           catch(evf::Exception &e)
00318             {
00319               std::cout << "Vulture::start - exception in starting prowling workloop " << e.what() << std::endl;
00320               //@EM ToDo generate some message here
00321             }     
00322         }else{
00323           std::cout << "Vulture::start - could not contact iDie - chech Url - will not start prowling loop" << std::endl;
00324           prowling_ = false;
00325         }
00326       
00327         break;
00328       }
00329     case MSQM_VULTURE_TYPE_STP:
00330       {
00331         prowling_ = false;
00332         break;
00333       }
00334     default:
00335       {
00336         // do nothing @EM ToDo generate an appropriate error message
00337       }
00338     }
00339     return true;
00340     
00341   }
00342 
00343   bool Vulture::prowling(toolbox::task::WorkLoop*wl)
00344   {
00345 
00346     if(!prowling_){
00347       char messageDie[5];
00348       sprintf(messageDie,"Dead");
00349       if(poster_==0){
00350         std::cout << "Vulture: asked to stop prowling but no poster " 
00351                   << std::endl;
00352         return false;
00353       }
00354       try{
00355         poster_->postString(messageDie,5,0,CurlPoster::stack);
00356       }
00357       catch(evf::Exception &e){
00358           //do nothing just swallow the exception
00359       }
00360       std::cout << "Received STOP message, going to delete poster " << std::endl;
00361 //       delete poster_;
00362 //       poster_=0;
00363       
00364       return false;
00365     }
00366     
00367     newCores_ = 0;
00368     
00369     struct stat filestat;    
00370     
00371     timeval now;
00372     gettimeofday(&now,0);
00373     
00374     // examine /tmp looking for new coredumps
00375     dirent *dirp;
00376     while((dirp = readdir(tmp_))!=0){
00377       if(strncmp(dirp->d_name,"core",4)==0){
00378         stat(dirp->d_name,&filestat);
00379         if(filestat.st_mtime > lastUpdate_){
00380           currentCoreList_.push_back(dirp->d_name);
00381           newCores_++;
00382         }
00383       }
00384     }
00385     rewinddir(tmp_);
00386     lastUpdate_ = now.tv_sec;
00387     try{
00388       analyze();
00389     }
00390     catch(evf::Exception &e){
00391       std::cout << "Vulture cannot send to iDie server, bail out " << std::endl;
00392       return false;
00393     }
00394     ::sleep(60);
00395     return true;
00396   }
00397 
00398   void Vulture::analyze()
00399   {
00400     // do a first analysis of the coredump
00401     if(newCores_==0) return;
00402     for(unsigned int i = currentCoreList_.size()-newCores_; 
00403         i < currentCoreList_.size();
00404         i++){
00405       std::string command = "gdb /opt/xdaq/bin/xdaq.exe -batch -x /tmp/vulture.cmd -c /tmp/";
00406       std::string cmdout;
00407       command += currentCoreList_[i];
00408       std::string filePathAndName = FS + "/";
00409       filePathAndName += currentCoreList_[i];
00410       std::string pid = 
00411         currentCoreList_[i].substr(currentCoreList_[i].find_first_of(".")+1,
00412                                    currentCoreList_[i].length());
00413 
00414       FILE *ps = popen(command.c_str(),"r");
00415       size_t s = 256;
00416       char *p=new char[s];
00417       bool filter = false;
00418       while(getline(&p,&s,ps) != -1){
00419         if(strncmp("Core",p,4)==0) filter = true;
00420         if(filter)cmdout += p;
00421       }
00422       delete[] p;
00423       pclose(ps);
00424       int errsv = 0;
00425       int rch = chmod(filePathAndName.c_str(),0777);
00426       if(rch != 0){
00427         errsv = errno;
00428         std::cout << "ERROR: couldn't change corefile access privileges -" 
00429                   << strerror(errsv)<< std::endl;
00430       }
00431       unsigned int ipid = (unsigned int)atoi(pid.c_str());
00432       poster_->postString(cmdout.c_str(),cmdout.length(),ipid, CurlPoster::stack); 
00433       
00434     }
00435   }
00436 }
00437