00001
00002 #include "EventFilter/Utilities/interface/Vulture.h"
00003 #include "EventFilter/Utilities/interface/Exception.h"
00004 #include "EventFilter/Utilities/interface/CurlPoster.h"
00005
00006
00007 #include "pt/PeerTransportReceiver.h"
00008 #include "pt/PeerTransportAgent.h"
00009
00010 #include "toolbox/task/Action.h"
00011 #include "toolbox/task/WorkLoop.h"
00012 #include "toolbox/task/WorkLoopFactory.h"
00013
00014 #include <unistd.h>
00015 #ifdef linux
00016 #include <sys/prctl.h>
00017 #endif
00018 #include <signal.h>
00019 #include <string.h>
00020 #include <sys/wait.h>
00021 #include <sys/time.h>
00022
00023 #include <sys/stat.h>
00024 #include <unistd.h>
00025 #include <stdlib.h>
00026 #include <stdio.h>
00027
00028 #ifdef __APPLE__
00029
00030
00031 #ifndef SIZE_MAX
00032 # define SIZE_MAX ((size_t) -1)
00033 #endif
00034 #ifndef SSIZE_MAX
00035 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
00036 #endif
00037
// Fallback implementation of POSIX getline() for platforms lacking it.
//
// Reads from fp up to and including the next '\n' (or up to end of file),
// stores the NUL-terminated text in *lineptr and returns the number of
// characters stored, not counting the terminating NUL.
//
//   lineptr  in/out: buffer obtained from malloc()/realloc(), or NULL.
//            It is grown with realloc() as needed; the caller free()s it.
//   n        in/out: capacity of *lineptr in bytes.
//   fp       stream to read from.
//
// Returns -1 when nothing could be read (end of file or read error), when
// an argument is NULL (errno = EINVAL), or when the buffer cannot be grown.
ssize_t getline (char **lineptr, size_t *n, FILE *fp)
{
  size_t cur_len = 0;

  if (lineptr == NULL || n == NULL || fp == NULL)
    {
      errno = EINVAL;
      return -1;
    }

  if (*lineptr == NULL || *n == 0)
    {
      // realloc() rather than malloc(): the caller may legitimately hand in
      // a live buffer together with *n == 0, and overwriting the pointer
      // with a fresh allocation would leak that buffer.
      char *initial = (char *) realloc (*lineptr, 120);
      if (initial == NULL)
        return -1;
      *lineptr = initial;
      *n = 120;
    }

  for (;;)
    {
      const int ch = getc (fp);
      if (ch == EOF)
        break;

      // Keep room for this character plus the terminating NUL.
      if (cur_len + 1 >= *n)
        {
          // The returned length must fit in ssize_t, so cap the capacity.
          const size_t needed_max =
            (size_t) SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
          size_t needed = 2 * *n + 1;
          char *grown;

          if (needed_max < needed)
            needed = needed_max;
          if (cur_len + 1 >= needed)
            {
              errno = EOVERFLOW;
              return -1;
            }

          grown = (char *) realloc (*lineptr, needed);
          if (grown == NULL)
            return -1;

          *lineptr = grown;
          *n = needed;
        }

      (*lineptr)[cur_len] = (char) ch;
      cur_len++;

      if (ch == '\n')
        break;
    }

  (*lineptr)[cur_len] = '\0';

  // A partial last line without '\n' is still returned; only report -1
  // when not a single character was read.
  return cur_len ? (ssize_t) cur_len : -1;
}
00110 #endif
00111
00112 namespace evf{
00113
00114 const std::string Vulture::FS="/tmp";
00115
00116 Vulture::Vulture(bool push)
00117 : wlCtrl_(0)
00118 , asCtrl_(0)
00119 , running_(false)
00120 , wlProwl_(0)
00121 , asProwl_(0)
00122 , prowling_(false)
00123 , iDieUrl_("")
00124 , updateMode_(push)
00125 , vulturePid_(0)
00126 , tmp_(0)
00127 , newCores_(0)
00128 , poster_(0)
00129 , mq_(new MasterQueue(vulture_queue_offset))
00130 , sq_(0)
00131 , started_(-1)
00132 , stopped_(-1)
00133 , handicapped_(false)
00134 {
00135
00136 FILE *outf = fopen("/tmp/vulture.cmd","w");
00137 fprintf(outf,"where\n");
00138 fclose(outf);
00139 }
00140
00141 Vulture::~Vulture()
00142 {
00143 delete mq_;
00144 if(sq_ != 0) delete sq_;
00145 if(poster_ != 0) delete poster_;
00146 }
00147
00148 pid_t Vulture::makeProcess(){
00149
00150 pid_t retval = fork();
00151 if(retval==0){
00152 int success = -1;
00153 #ifdef linux
00154 success = prctl( PR_SET_DUMPABLE, 0 );
00155 #endif
00156 if(success != 0){
00157 std::cout << "Vulture::could not set process undumpable" << std::endl;
00158 handicapped_ = true;
00159 }
00160 #ifdef linux
00161 success = prctl( PR_SET_PDEATHSIG, SIGKILL );
00162 #endif
00163 if(success != 0){
00164 std::cout << "Vulture::could not set process death signal" << std::endl;
00165 handicapped_ = true;
00166 }
00167 tmp_ = opendir(FS.c_str());
00168 #ifdef linux
00169 success = prctl ( PR_SET_NAME , "vulture");
00170 #endif
00171 if(success != 0){
00172 std::cout << "Vulture::could not set process name" << std::endl;
00173 handicapped_ = true;
00174 }
00175
00176 try{
00177 pt::PeerTransport * ptr =
00178 pt::getPeerTransportAgent()->getPeerTransport("https","soap",pt::Receiver);
00179 delete ptr;
00180 }
00181 catch (pt::exception::PeerTransportNotFound & e ){
00182
00183 }
00184
00185 sq_ = new SlaveQueue(vulture_queue_offset);
00186
00187 try {
00188 wlCtrl_=
00189 toolbox::task::getWorkLoopFactory()->getWorkLoop("Ctrll",
00190 "waiting");
00191 if (!wlCtrl_->isActive()) wlCtrl_->activate();
00192
00193 asCtrl_ = toolbox::task::bind(this,&Vulture::control,
00194 "Ctrl");
00195 wlCtrl_->submit(asCtrl_);
00196 }
00197 catch (xcept::Exception& e) {
00198 std::cout << "Vulture:constructor - could not start workloop 'Ctrl' for process " << retval << std::endl;
00199 }
00200 }
00201 else{
00202 vulturePid_ = retval;
00203 }
00204 return retval;
00205
00206
00207 }
00208
00209 int Vulture::hasStarted(){
00210 if(started_<0){
00211 MsgBuf msg2(MAX_MSG_SIZE,MSQS_VULTURE_TYPE_ACK);
00212 try{
00213 mq_->rcvNonBlocking(msg2);
00214 started_ = 0;
00215 }
00216 catch(evf::Exception &e){
00217 }
00218 } else {started_ = 1;}
00219 return started_;
00220 }
00221
00222 int Vulture::hasStopped(){
00223 if(stopped_<0){
00224 MsgBuf msg2(MAX_MSG_SIZE,MSQS_VULTURE_TYPE_ACK);
00225 try{
00226 mq_->rcvNonBlocking(msg2);
00227 stopped_ = 0;
00228 }
00229 catch(evf::Exception &e){
00230 }
00231 } else {stopped_ = 1;}
00232 return stopped_;
00233 }
00234
00235 pid_t Vulture::start(std::string url, int run){
00236
00237
00238 vulture_start_message stamsg;
00239 strcpy(stamsg.url_,url.c_str());
00240 stamsg.run_ = run;
00241 MsgBuf msg1(sizeof(vulture_start_message),MSQM_VULTURE_TYPE_STA);
00242 memcpy(msg1->mtext,&stamsg,sizeof(vulture_start_message));
00243 mq_->post(msg1);
00244 stopped_ = -1;
00245 return vulturePid_;
00246 }
00247
00248 pid_t Vulture::stop()
00249 {
00250
00251 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQM_VULTURE_TYPE_STP);
00252 mq_->post(msg1);
00253 started_ = -1;
00254 return vulturePid_;
00255 }
00256
00257 pid_t Vulture::kill()
00258 {
00259 ::kill (vulturePid_, SIGKILL);
00260 int sl;
00261 pid_t killedOrNot = waitpid(vulturePid_,&sl,WNOHANG);
00262 vulturePid_ = 0;
00263 return killedOrNot;
00264 }
00265
00266 void Vulture::startProwling()
00267 {
00268 timeval now;
00269 gettimeofday(&now,0);
00270 lastUpdate_ = now.tv_sec;
00271 prowling_ = true;
00272 try {
00273 wlProwl_=
00274 toolbox::task::getWorkLoopFactory()->getWorkLoop("Prowl",
00275 "waiting");
00276 if (!wlProwl_->isActive()) wlProwl_->activate();
00277
00278 asProwl_ = toolbox::task::bind(this,&Vulture::prowling,
00279 "Prowl");
00280 wlProwl_->submit(asProwl_);
00281 }
00282 catch (xcept::Exception& e) {
00283 std::string msg = "Failed to start workloop 'Prowl'.";
00284 XCEPT_RETHROW(evf::Exception,msg,e);
00285 }
00286
00287 }
00288
00289 bool Vulture::control(toolbox::task::WorkLoop*wl)
00290 {
00291
00292 MsgBuf msg;
00293 unsigned long mtype = MSQM_MESSAGE_TYPE_NOP;
00294 try{mtype = sq_->rcv(msg);}catch(evf::Exception &e){
00295 std::cout << "Vulture::exception on msgrcv for control, bailing out of control workloop - good bye" << std::endl;
00296 return false;
00297 }
00298 mtype = msg->mtype;
00299 switch(mtype){
00300 case MSQM_VULTURE_TYPE_STA:
00301 {
00302
00303 vulture_start_message *sta = (vulture_start_message*)msg->mtext;
00304 if(poster_ == 0) poster_ = new CurlPoster(sta->url_);
00305 if(poster_->check(sta->run_)){
00306 try{
00307 startProwling();
00308 MsgBuf msg1(0,MSQS_VULTURE_TYPE_ACK) ;
00309 sq_->post(msg1);
00310 }
00311 catch(evf::Exception &e)
00312 {
00313 std::cout << "Vulture::start - exception in starting prowling workloop " << e.what() << std::endl;
00314
00315 }
00316 }else{
00317 std::cout << "Vulture::start - could not contact iDie - chech Url - will not start prowling loop" << std::endl;
00318 prowling_ = false;
00319 }
00320
00321 break;
00322 }
00323 case MSQM_VULTURE_TYPE_STP:
00324 {
00325 prowling_ = false;
00326 break;
00327 }
00328 default:
00329 {
00330
00331 }
00332 }
00333 return true;
00334
00335 }
00336
00337 bool Vulture::prowling(toolbox::task::WorkLoop*wl)
00338 {
00339
00340 if(!prowling_){
00341 char messageDie[5];
00342 sprintf(messageDie,"Dead");
00343
00344 try{
00345 poster_->postString(messageDie,5,0,CurlPoster::stack);
00346 }
00347 catch(evf::Exception &e){
00348
00349 }
00350 delete poster_;
00351 poster_=0;
00352
00353 return false;
00354 }
00355
00356 newCores_ = 0;
00357
00358 struct stat filestat;
00359
00360 timeval now;
00361 gettimeofday(&now,0);
00362
00363
00364 dirent *dirp;
00365 while((dirp = readdir(tmp_))!=0){
00366 if(strncmp(dirp->d_name,"core",4)==0){
00367 stat(dirp->d_name,&filestat);
00368 if(filestat.st_mtime > lastUpdate_){
00369 currentCoreList_.push_back(dirp->d_name);
00370 newCores_++;
00371 }
00372 }
00373 }
00374 rewinddir(tmp_);
00375 lastUpdate_ = now.tv_sec;
00376 try{
00377 analyze();
00378 }
00379 catch(evf::Exception &e){
00380 std::cout << "Vulture cannot send to iDie server, bail out " << std::endl;
00381 return false;
00382 }
00383 ::sleep(60);
00384 return true;
00385 }
00386
00387 void Vulture::analyze()
00388 {
00389
00390 if(newCores_==0) return;
00391 for(unsigned int i = currentCoreList_.size()-newCores_;
00392 i < currentCoreList_.size();
00393 i++){
00394 std::string command = "gdb /opt/xdaq/bin/xdaq.exe -batch -x /tmp/vulture.cmd -c /tmp/";
00395 std::string cmdout;
00396 command += currentCoreList_[i];
00397 std::string filePathAndName = FS + "/";
00398 filePathAndName += currentCoreList_[i];
00399 std::string pid =
00400 currentCoreList_[i].substr(currentCoreList_[i].find_first_of(".")+1,
00401 currentCoreList_[i].length());
00402
00403 FILE *ps = popen(command.c_str(),"r");
00404 size_t s = 256;
00405 char *p=new char[s];
00406 bool filter = false;
00407 while(getline(&p,&s,ps) != -1){
00408 if(strncmp("Core",p,4)==0) filter = true;
00409 if(filter)cmdout += p;
00410 }
00411 delete[] p;
00412 pclose(ps);
00413 int errsv = 0;
00414 int rch = chmod(filePathAndName.c_str(),0777);
00415 if(rch != 0){
00416 errsv = errno;
00417 std::cout << "ERROR: couldn't change corefile access privileges -"
00418 << strerror(errsv)<< std::endl;
00419 }
00420 unsigned int ipid = (unsigned int)atoi(pid.c_str());
00421 poster_->postString(cmdout.c_str(),cmdout.length(),ipid, CurlPoster::stack);
00422
00423 }
00424 }
00425 }
00426