CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/Utilities/src/Vulture.cc

Go to the documentation of this file.
00001 
00002 #include "EventFilter/Utilities/interface/Vulture.h"
00003 #include "EventFilter/Utilities/interface/Exception.h"
00004 #include "EventFilter/Utilities/interface/CurlPoster.h"
00005 
00006 // to handle pt file descriptors left open at fork
00007 #include "pt/PeerTransportReceiver.h"
00008 #include "pt/PeerTransportAgent.h"
00009 
00010 #include "toolbox/task/Action.h"
00011 #include "toolbox/task/WorkLoop.h"
00012 #include "toolbox/task/WorkLoopFactory.h"
00013 
00014 #include <unistd.h>
00015 #ifdef linux
00016 #include <sys/prctl.h>
00017 #endif
00018 #include <signal.h>
00019 #include <string.h>
00020 #include <sys/wait.h>
00021 #include <sys/time.h>
00022 
00023 #include <sys/stat.h>
00024 #include <unistd.h>
00025 #include <stdlib.h>
00026 #include <stdio.h>
00027 
00028 #ifdef __APPLE__
00029 /* getline implementation is copied from glibc. */
00030 
00031 #ifndef SIZE_MAX
00032 # define SIZE_MAX ((size_t) -1)
00033 #endif
00034 #ifndef SSIZE_MAX
00035 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
00036 #endif
00037 
/* Fallback getline() for platforms without one (logic follows glibc).
 *
 * Reads characters from fp up to and including the next '\n' (or up to EOF)
 * into *lineptr, growing the buffer with realloc() as needed and keeping the
 * current capacity in *n.  The stored text is NUL-terminated.
 *
 * Returns the number of characters stored (excluding the NUL), or -1 when
 * nothing could be read, an allocation failed, or an argument was NULL
 * (errno is set to EINVAL in the latter case).
 */
ssize_t getline (char **lineptr, size_t *n, FILE *fp)
{
    if (!lineptr || !n || !fp)
    {
        errno = EINVAL;
        return -1;
    }

    /* No usable buffer supplied: start with a small one of our own. */
    if (!*lineptr || !*n)
    {
        *n = 120;
        *lineptr = static_cast<char *>(malloc (*n));
        if (!*lineptr)
            return -1;
    }

    /* Largest buffer size we are ever willing to request. */
    const size_t hard_limit =
        SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;

    size_t used = 0;
    bool more = true;
    while (more)
    {
        const int ch = getc (fp);
        if (ch == EOF)
            break;

        /* Need room for this character plus the final NUL. */
        if (used + 1 >= *n)
        {
            size_t wanted = 2 * *n + 1;   /* Be generous. */
            if (wanted > hard_limit)
                wanted = hard_limit;
            if (used + 1 >= wanted)
                return -1;

            char *grown = static_cast<char *>(realloc (*lineptr, wanted));
            if (!grown)
                return -1;

            *lineptr = grown;
            *n = wanted;
        }

        (*lineptr)[used] = ch;
        ++used;

        more = (ch != '\n');
    }

    (*lineptr)[used] = '\0';
    if (used == 0)
        return -1;
    return (ssize_t) used;
}
00110 #endif
00111 
00112 namespace evf{
00113 
00114   const std::string Vulture::FS="/tmp";
00115 
00116   Vulture::Vulture(bool push) 
00117     : wlCtrl_(0)
00118     , asCtrl_(0)
00119     , running_(false)
00120     , wlProwl_(0)
00121     , asProwl_(0)
00122     , prowling_(false)
00123     , iDieUrl_("")
00124     , updateMode_(push)
00125     , vulturePid_(0)
00126     , tmp_(0)
00127     , newCores_(0)
00128     , poster_(0)
00129     , mq_(new MasterQueue(vulture_queue_offset))
00130     , sq_(0) // this is only defined in the forked process
00131     , started_(-1)
00132     , stopped_(-1)
00133     , handicapped_(false)
00134   {
00135     // create command file for gdb
00136     FILE *outf = fopen("/tmp/vulture.cmd","w");
00137     fprintf(outf,"where\n");
00138     fclose(outf);
00139   }
00140   
00141   Vulture::~Vulture()
00142   {
00143     delete mq_;
00144     if(sq_ != 0) delete sq_;
00145     if(poster_ != 0) delete poster_;
00146   }
00147 
00148   pid_t Vulture::makeProcess(){
00149 
00150     pid_t retval = fork();
00151     if(retval==0){ // we are in the forked process
00152       int success = -1;
00153 #ifdef linux
00154       success = prctl( PR_SET_DUMPABLE, 0 );
00155 #endif
00156       if(success != 0){
00157         std::cout << "Vulture::could not set process undumpable" << std::endl;
00158         handicapped_ = true;
00159       }
00160 #ifdef linux
00161       success = prctl( PR_SET_PDEATHSIG, SIGKILL );
00162 #endif
00163       if(success != 0){
00164         std::cout << "Vulture::could not set process death signal" << std::endl;
00165         handicapped_ = true;    
00166       }
00167       tmp_ = opendir(FS.c_str());
00168 #ifdef linux
00169       success = prctl ( PR_SET_NAME , "vulture");
00170 #endif
00171       if(success != 0){
00172         std::cout << "Vulture::could not set process name" << std::endl;
00173         handicapped_ = true;    
00174       }
00175 
00176       try{
00177         pt::PeerTransport * ptr =
00178           pt::getPeerTransportAgent()->getPeerTransport("https","soap",pt::Receiver);
00179         delete ptr;
00180       }
00181       catch (pt::exception::PeerTransportNotFound & e ){
00182         //do nothing here since we don't know what to do... ?
00183       }
00184       //      freopen("/dev/null","w",stderr);
00185       sq_ = new SlaveQueue(vulture_queue_offset);
00186       // start the ctrl workloop
00187       try {
00188         wlCtrl_=
00189           toolbox::task::getWorkLoopFactory()->getWorkLoop("Ctrll",
00190                                                            "waiting");
00191         if (!wlCtrl_->isActive()) wlCtrl_->activate();
00192         
00193         asCtrl_ = toolbox::task::bind(this,&Vulture::control,
00194                                        "Ctrl");
00195         wlCtrl_->submit(asCtrl_);
00196       }
00197       catch (xcept::Exception& e) {
00198         std::cout << "Vulture:constructor - could not start workloop 'Ctrl' for process " << retval << std::endl;
00199       }
00200     }
00201     else{
00202       vulturePid_ = retval;
00203     }
00204     return retval;
00205 
00206 
00207   }
00208 
00209   int Vulture::hasStarted(){
00210     if(started_<0){
00211       MsgBuf msg2(MAX_MSG_SIZE,MSQS_VULTURE_TYPE_ACK);
00212       try{
00213         mq_->rcvNonBlocking(msg2);
00214         started_ = 0;
00215       }
00216       catch(evf::Exception &e){
00217       }
00218     } else {started_ = 1;}
00219     return started_;    
00220   }
00221 
00222   int Vulture::hasStopped(){
00223     if(stopped_<0){
00224       MsgBuf msg2(MAX_MSG_SIZE,MSQS_VULTURE_TYPE_ACK);
00225       try{
00226         mq_->rcvNonBlocking(msg2);
00227         stopped_ = 0;
00228       }
00229       catch(evf::Exception &e){
00230       }
00231     } else {stopped_ = 1;}
00232     return stopped_;    
00233   }
00234 
00235   pid_t Vulture::start(std::string url, int run){
00236 
00237     //communicate start-of-run to Vulture
00238     vulture_start_message stamsg;
00239     strcpy(stamsg.url_,url.c_str()); 
00240     stamsg.run_ = run;
00241     MsgBuf msg1(sizeof(vulture_start_message),MSQM_VULTURE_TYPE_STA);
00242     memcpy(msg1->mtext,&stamsg,sizeof(vulture_start_message));
00243     mq_->post(msg1);
00244     stopped_ = -1;
00245     return vulturePid_;
00246   }
00247   
00248   pid_t Vulture::stop()
00249   {
00250 
00251     MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQM_VULTURE_TYPE_STP);
00252     mq_->post(msg1);
00253     started_ = -1;
00254     return vulturePid_;
00255   }
00256 
00257  pid_t Vulture::kill() // eventually *could* be called by master app - it isn't now
00258   {
00259     ::kill (vulturePid_, SIGKILL);
00260     int sl;
00261     pid_t killedOrNot = waitpid(vulturePid_,&sl,WNOHANG);
00262     vulturePid_ = 0;
00263     return killedOrNot;
00264   }
00265 
00266   void Vulture::startProwling()
00267   {
00268     timeval now;
00269     gettimeofday(&now,0);
00270     lastUpdate_ = now.tv_sec;
00271     prowling_ = true;
00272     try {
00273       wlProwl_=
00274         toolbox::task::getWorkLoopFactory()->getWorkLoop("Prowl",
00275                                                          "waiting");
00276       if (!wlProwl_->isActive()) wlProwl_->activate();
00277       
00278       asProwl_ = toolbox::task::bind(this,&Vulture::prowling,
00279                                          "Prowl");
00280       wlProwl_->submit(asProwl_);
00281     }
00282     catch (xcept::Exception& e) {
00283       std::string msg = "Failed to start workloop 'Prowl'.";
00284       XCEPT_RETHROW(evf::Exception,msg,e);
00285     }
00286 
00287   }
00288 
00289   bool Vulture::control(toolbox::task::WorkLoop*wl)
00290   {
00291 
00292     MsgBuf msg;
00293     unsigned long mtype = MSQM_MESSAGE_TYPE_NOP;
00294     try{mtype = sq_->rcv(msg);}catch(evf::Exception &e){
00295       std::cout << "Vulture::exception on msgrcv for control, bailing out of control workloop - good bye" << std::endl;
00296       return false;
00297     }
00298     mtype = msg->mtype;
00299     switch(mtype){
00300     case MSQM_VULTURE_TYPE_STA:
00301       {
00302 
00303         vulture_start_message *sta = (vulture_start_message*)msg->mtext;
00304         if(poster_ == 0) poster_ = new CurlPoster(sta->url_);
00305         if(poster_->check(sta->run_)){
00306           try{
00307             startProwling();
00308             MsgBuf msg1(0,MSQS_VULTURE_TYPE_ACK) ;
00309             sq_->post(msg1);
00310           }
00311           catch(evf::Exception &e)
00312             {
00313               std::cout << "Vulture::start - exception in starting prowling workloop " << e.what() << std::endl;
00314               //@EM ToDo generate some message here
00315             }     
00316         }else{
00317           std::cout << "Vulture::start - could not contact iDie - chech Url - will not start prowling loop" << std::endl;
00318           prowling_ = false;
00319         }
00320       
00321         break;
00322       }
00323     case MSQM_VULTURE_TYPE_STP:
00324       {
00325         prowling_ = false;
00326         break;
00327       }
00328     default:
00329       {
00330         // do nothing @EM ToDo generate an appropriate error message
00331       }
00332     }
00333     return true;
00334     
00335   }
00336 
00337   bool Vulture::prowling(toolbox::task::WorkLoop*wl)
00338   {
00339 
00340     if(!prowling_){
00341       char messageDie[5];
00342       sprintf(messageDie,"Dead");
00343       
00344       try{
00345         poster_->postString(messageDie,5,0,CurlPoster::stack);
00346       }
00347       catch(evf::Exception &e){
00348           //do nothing just swallow the exception
00349       }
00350       delete poster_;
00351       poster_=0;
00352       
00353       return false;
00354     }
00355     
00356     newCores_ = 0;
00357     
00358     struct stat filestat;    
00359     
00360     timeval now;
00361     gettimeofday(&now,0);
00362     
00363     // examine /tmp looking for new coredumps
00364     dirent *dirp;
00365     while((dirp = readdir(tmp_))!=0){
00366       if(strncmp(dirp->d_name,"core",4)==0){
00367         stat(dirp->d_name,&filestat);
00368         if(filestat.st_mtime > lastUpdate_){
00369           currentCoreList_.push_back(dirp->d_name);
00370           newCores_++;
00371         }
00372       }
00373     }
00374     rewinddir(tmp_);
00375     lastUpdate_ = now.tv_sec;
00376     try{
00377       analyze();
00378     }
00379     catch(evf::Exception &e){
00380       std::cout << "Vulture cannot send to iDie server, bail out " << std::endl;
00381       return false;
00382     }
00383     ::sleep(60);
00384     return true;
00385   }
00386 
00387   void Vulture::analyze()
00388   {
00389     // do a first analysis of the coredump
00390     if(newCores_==0) return;
00391     for(unsigned int i = currentCoreList_.size()-newCores_; 
00392         i < currentCoreList_.size();
00393         i++){
00394       std::string command = "gdb /opt/xdaq/bin/xdaq.exe -batch -x /tmp/vulture.cmd -c /tmp/";
00395       std::string cmdout;
00396       command += currentCoreList_[i];
00397       std::string filePathAndName = FS + "/";
00398       filePathAndName += currentCoreList_[i];
00399       std::string pid = 
00400         currentCoreList_[i].substr(currentCoreList_[i].find_first_of(".")+1,
00401                                    currentCoreList_[i].length());
00402 
00403       FILE *ps = popen(command.c_str(),"r");
00404       size_t s = 256;
00405       char *p=new char[s];
00406       bool filter = false;
00407       while(getline(&p,&s,ps) != -1){
00408         if(strncmp("Core",p,4)==0) filter = true;
00409         if(filter)cmdout += p;
00410       }
00411       delete[] p;
00412       pclose(ps);
00413       int errsv = 0;
00414       int rch = chmod(filePathAndName.c_str(),0777);
00415       if(rch != 0){
00416         errsv = errno;
00417         std::cout << "ERROR: couldn't change corefile access privileges -" 
00418                   << strerror(errsv)<< std::endl;
00419       }
00420       unsigned int ipid = (unsigned int)atoi(pid.c_str());
00421       poster_->postString(cmdout.c_str(),cmdout.length(),ipid, CurlPoster::stack); 
00422       
00423     }
00424   }
00425 }
00426