00001
00003
00004 #include <stdio.h>
00005 #include <string>
00006 #include <sstream>
00007 #include <iomanip>
00008 #include <sys/types.h>
00009 #include <sys/stat.h>
00010 #ifdef __APPLE__
00011 #include <sys/param.h>
00012 #include <sys/mount.h>
00013 #else
00014 #include <sys/statfs.h>
00015 #endif
00016 #include <fcntl.h>
00017 #include <dirent.h>
00018 #include <fnmatch.h>
00019 #include <pwd.h>
00020 #include <fstream>
00021 #include <algorithm>
00022
00023 #include <boost/bind.hpp>
00024 #include <boost/regex.hpp>
00025
00026 #include "EventFilter/StorageManager/interface/CurlInterface.h"
00027 #include "EventFilter/StorageManager/interface/Exception.h"
00028 #include "EventFilter/StorageManager/interface/ResourceMonitorCollection.h"
00029 #include "EventFilter/StorageManager/interface/Utils.h"
00030
00031
00032 namespace stor {
00033
00034 ResourceMonitorCollection::ResourceMonitorCollection
00035 (
00036 const utils::Duration_t& updateInterval,
00037 AlarmHandlerPtr ah
00038 ) :
00039 MonitorCollection(updateInterval),
00040 updateInterval_(updateInterval),
00041 alarmHandler_(ah),
00042 numberOfCopyWorkers_(-1),
00043 numberOfInjectWorkers_(-1),
00044 nLogicalDisks_(0),
00045 latchedSataBeastStatus_(-1)
00046 {}
00047
00048
00049 void ResourceMonitorCollection::configureDisks(DiskWritingParams const& dwParams)
00050 {
00051 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00052
00053 dwParams_ = dwParams;
00054
00055 nLogicalDisks_ = std::max(dwParams.nLogicalDisk_, 1);
00056 diskUsageList_.clear();
00057 diskUsageList_.reserve(nLogicalDisks_+dwParams.otherDiskPaths_.size()+1);
00058
00059 for (unsigned int i=0; i<nLogicalDisks_; ++i) {
00060
00061 std::ostringstream pathName;
00062 pathName << dwParams.filePath_;
00063 if( dwParams.nLogicalDisk_ > 0 ) {
00064 pathName << "/" << std::setfill('0') << std::setw(2) << i;
00065 }
00066 addDisk(pathName.str());
00067 }
00068 addDisk(dwParams.dbFilePath_);
00069
00070 if ( alarmParams_.isProductionSystem_ )
00071 {
00072 addOtherDisks();
00073 }
00074 }
00075
00076
00077 void ResourceMonitorCollection::addDisk(const std::string& pathname)
00078 {
00079 if ( pathname.empty() ) return;
00080
00081 DiskUsagePtr diskUsage( new DiskUsage() );
00082 diskUsage->pathName = pathname;
00083 retrieveDiskSize(diskUsage);
00084 diskUsageList_.push_back(diskUsage);
00085 }
00086
00087
00088 void ResourceMonitorCollection::addOtherDisks()
00089 {
00090 for ( DiskWritingParams::OtherDiskPaths::const_iterator
00091 it = dwParams_.otherDiskPaths_.begin(),
00092 itEnd = dwParams_.otherDiskPaths_.end();
00093 it != itEnd;
00094 ++it)
00095 {
00096 addDisk(*it);
00097 }
00098 }
00099
00100
00101 void ResourceMonitorCollection::configureResources
00102 (
00103 ResourceMonitorParams const& rmParams
00104 )
00105 {
00106 rmParams_ = rmParams;
00107 }
00108
00109
00110 void ResourceMonitorCollection::configureAlarms
00111 (
00112 AlarmParams const& alarmParams
00113 )
00114 {
00115 alarmParams_ = alarmParams;
00116 }
00117
00118
00119 void ResourceMonitorCollection::getStats(Stats& stats) const
00120 {
00121 getDiskStats(stats);
00122
00123 stats.numberOfCopyWorkers = numberOfCopyWorkers_;
00124 stats.numberOfInjectWorkers = numberOfInjectWorkers_;
00125
00126 stats.sataBeastStatus = latchedSataBeastStatus_;
00127 }
00128
00129
00130 void ResourceMonitorCollection::getDiskStats(Stats& stats) const
00131 {
00132 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00133
00134 stats.diskUsageStatsList.clear();
00135 stats.diskUsageStatsList.reserve(diskUsageList_.size());
00136 for ( DiskUsagePtrList::const_iterator it = diskUsageList_.begin(),
00137 itEnd = diskUsageList_.end();
00138 it != itEnd;
00139 ++it)
00140 {
00141 DiskUsageStatsPtr diskUsageStats(new DiskUsageStats);
00142 diskUsageStats->diskSize = (*it)->diskSize;
00143 diskUsageStats->absDiskUsage = (*it)->absDiskUsage;
00144 diskUsageStats->relDiskUsage = (*it)->relDiskUsage;
00145 diskUsageStats->pathName = (*it)->pathName;
00146 diskUsageStats->alarmState = (*it)->alarmState;
00147 stats.diskUsageStatsList.push_back(diskUsageStats);
00148 }
00149 }
00150
00151
00152 void ResourceMonitorCollection::do_calculateStatistics()
00153 {
00154 calcDiskUsage();
00155 calcNumberOfCopyWorkers();
00156 calcNumberOfInjectWorkers();
00157 checkSataBeasts();
00158 }
00159
00160
00161 void ResourceMonitorCollection::do_reset()
00162 {
00163 numberOfCopyWorkers_ = -1;
00164 numberOfInjectWorkers_ = -1;
00165 latchedSataBeastStatus_ = -1;
00166
00167 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00168 for ( DiskUsagePtrList::const_iterator it = diskUsageList_.begin(),
00169 itEnd = diskUsageList_.end();
00170 it != itEnd;
00171 ++it)
00172 {
00173 (*it)->absDiskUsage = -1;
00174 (*it)->relDiskUsage = -1;
00175 (*it)->alarmState = AlarmHandler::OKAY;
00176 }
00177 }
00178
00179
00180 void ResourceMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00181 {
00182 infoSpaceItems.push_back(std::make_pair("copyWorkers", ©Workers_));
00183 infoSpaceItems.push_back(std::make_pair("injectWorkers", &injectWorkers_));
00184 infoSpaceItems.push_back(std::make_pair("sataBeastStatus", &sataBeastStatus_));
00185 infoSpaceItems.push_back(std::make_pair("numberOfDisks", &numberOfDisks_));
00186 infoSpaceItems.push_back(std::make_pair("diskPaths", &diskPaths_));
00187 infoSpaceItems.push_back(std::make_pair("totalDiskSpace", &totalDiskSpace_));
00188 infoSpaceItems.push_back(std::make_pair("usedDiskSpace", &usedDiskSpace_));
00189 }
00190
00191
00192 void ResourceMonitorCollection::do_updateInfoSpaceItems()
00193 {
00194 Stats stats;
00195 getStats(stats);
00196
00197 if (stats.numberOfCopyWorkers > 0)
00198 copyWorkers_ = static_cast<xdata::UnsignedInteger32>(stats.numberOfCopyWorkers);
00199 else
00200 copyWorkers_ = 0;
00201
00202 if (stats.numberOfInjectWorkers > 0)
00203 injectWorkers_ = static_cast<xdata::UnsignedInteger32>(stats.numberOfInjectWorkers);
00204 else
00205 injectWorkers_ = 0;
00206
00207 sataBeastStatus_ = stats.sataBeastStatus;
00208 numberOfDisks_ = nLogicalDisks_;
00209
00210 diskPaths_.clear();
00211 totalDiskSpace_.clear();
00212 usedDiskSpace_.clear();
00213
00214 diskPaths_.reserve(stats.diskUsageStatsList.size());
00215 totalDiskSpace_.reserve(stats.diskUsageStatsList.size());
00216 usedDiskSpace_.reserve(stats.diskUsageStatsList.size());
00217
00218 for (DiskUsageStatsPtrList::const_iterator
00219 it = stats.diskUsageStatsList.begin(),
00220 itEnd = stats.diskUsageStatsList.end();
00221 it != itEnd;
00222 ++it)
00223 {
00224 diskPaths_.push_back(
00225 static_cast<xdata::String>( (*it)->pathName )
00226 );
00227 totalDiskSpace_.push_back(
00228 static_cast<xdata::UnsignedInteger32>(
00229 static_cast<unsigned int>( (*it)->diskSize * 1024 )
00230 )
00231 );
00232 usedDiskSpace_.push_back(
00233 static_cast<xdata::UnsignedInteger32>(
00234 static_cast<unsigned int>( (*it)->absDiskUsage * 1024 )
00235 )
00236 );
00237 }
00238
00239 calcDiskUsage();
00240 }
00241
00242
00243 void ResourceMonitorCollection::calcDiskUsage()
00244 {
00245 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00246
00247 for ( DiskUsagePtrList::iterator it = diskUsageList_.begin(),
00248 itEnd = diskUsageList_.end();
00249 it != itEnd;
00250 ++it)
00251 {
00252 retrieveDiskSize(*it);
00253 }
00254 }
00255
00256 void ResourceMonitorCollection::retrieveDiskSize(DiskUsagePtr diskUsage)
00257 {
00258 #if __APPLE__
00259 struct statfs buf;
00260 int retVal = statfs(diskUsage->pathName.c_str(), &buf);
00261 #else
00262 struct statfs64 buf;
00263 int retVal = statfs64(diskUsage->pathName.c_str(), &buf);
00264 #endif
00265 if(retVal==0) {
00266 unsigned int blksize = buf.f_bsize;
00267 diskUsage->diskSize =
00268 static_cast<double>(buf.f_blocks * blksize) / 1024 / 1024 / 1024;
00269 diskUsage->absDiskUsage =
00270 diskUsage->diskSize -
00271 static_cast<double>(buf.f_bavail * blksize) / 1024 / 1024 / 1024;
00272 diskUsage->relDiskUsage = (100 * (diskUsage->absDiskUsage / diskUsage->diskSize));
00273 if ( diskUsage->relDiskUsage > dwParams_.highWaterMark_ )
00274 {
00275 emitDiskSpaceAlarm(diskUsage);
00276 }
00277 else if ( diskUsage->relDiskUsage < dwParams_.highWaterMark_*0.95 )
00278
00279 {
00280 revokeDiskAlarm(diskUsage);
00281 }
00282 }
00283 else
00284 {
00285 emitDiskAlarm(diskUsage, errno);
00286 diskUsage->diskSize = -1;
00287 diskUsage->absDiskUsage = -1;
00288 diskUsage->relDiskUsage = -1;
00289 }
00290 }
00291
00292
00293 void ResourceMonitorCollection::emitDiskAlarm(DiskUsagePtr diskUsage, error_t e)
00294
00295 {
00296 std::string msg;
00297
00298 switch(e)
00299 {
00300 case ENOENT :
00301 diskUsage->alarmState = AlarmHandler::ERROR;
00302 msg = "Cannot access " + diskUsage->pathName + ". Is it mounted?";
00303 break;
00304
00305 default :
00306 diskUsage->alarmState = AlarmHandler::WARNING;
00307 msg = "Failed to retrieve disk space information for " + diskUsage->pathName + ":"
00308 + strerror(e);
00309 }
00310
00311 XCEPT_DECLARE(stor::exception::DiskSpaceAlarm, ex, msg);
00312 alarmHandler_->notifySentinel(diskUsage->alarmState, ex);
00313 }
00314
00315
00316 void ResourceMonitorCollection::emitDiskSpaceAlarm(DiskUsagePtr diskUsage)
00317 {
00318 if ( diskUsage->relDiskUsage > dwParams_.failHighWaterMark_ )
00319 {
00320 failIfImportantDisk(diskUsage);
00321 }
00322
00323 diskUsage->alarmState = AlarmHandler::WARNING;
00324
00325 XCEPT_DECLARE(stor::exception::DiskSpaceAlarm, ex, diskUsage->toString());
00326 alarmHandler_->raiseAlarm(diskUsage->pathName, diskUsage->alarmState, ex);
00327 }
00328
00329
00330 void ResourceMonitorCollection::failIfImportantDisk(DiskUsagePtr diskUsage)
00331 {
00332
00333 DiskWritingParams::OtherDiskPaths::const_iterator begin =
00334 dwParams_.otherDiskPaths_.begin();
00335 DiskWritingParams::OtherDiskPaths::const_iterator end =
00336 dwParams_.otherDiskPaths_.end();
00337 if ( std::find(begin, end, diskUsage->pathName) != end ) return;
00338
00339 diskUsage->alarmState = AlarmHandler::FATAL;
00340 XCEPT_RAISE(stor::exception::DiskSpaceAlarm, diskUsage->toString());
00341 }
00342
00343
00344 void ResourceMonitorCollection::revokeDiskAlarm(DiskUsagePtr diskUsage)
00345 {
00346 diskUsage->alarmState = AlarmHandler::OKAY;
00347
00348 alarmHandler_->revokeAlarm(diskUsage->pathName);
00349 }
00350
00351
00352 void ResourceMonitorCollection::calcNumberOfCopyWorkers()
00353 {
00354 struct passwd* passwd = getpwnam(rmParams_.copyWorkers_.user_.c_str());
00355 if (passwd)
00356 {
00357 numberOfCopyWorkers_ =
00358 getProcessCount(rmParams_.copyWorkers_.command_, passwd->pw_uid);
00359 }
00360 else
00361 {
00362 numberOfCopyWorkers_ = 0;
00363 }
00364
00365 if ( alarmParams_.isProductionSystem_ && rmParams_.copyWorkers_.expectedCount_ >= 0 )
00366 {
00367 checkNumberOfCopyWorkers();
00368 }
00369 }
00370
00371
00372 void ResourceMonitorCollection::checkNumberOfCopyWorkers()
00373 {
00374 const std::string alarmName = "CopyWorkers";
00375
00376 if ( numberOfCopyWorkers_ < rmParams_.copyWorkers_.expectedCount_ )
00377 {
00378 std::ostringstream msg;
00379 msg << "Expected " << rmParams_.copyWorkers_.expectedCount_ <<
00380 " running CopyWorkers, but found " <<
00381 numberOfCopyWorkers_ << ".";
00382 XCEPT_DECLARE(stor::exception::CopyWorkers, ex, msg.str());
00383 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::WARNING, ex);
00384 }
00385 else
00386 {
00387 alarmHandler_->revokeAlarm(alarmName);
00388 }
00389 }
00390
00391
00392 void ResourceMonitorCollection::calcNumberOfInjectWorkers()
00393 {
00394 struct passwd* passwd = getpwnam(rmParams_.injectWorkers_.user_.c_str());
00395 if (passwd)
00396 {
00397 numberOfInjectWorkers_ = getProcessCount(rmParams_.injectWorkers_.command_, passwd->pw_uid);
00398 }
00399 else
00400 {
00401 numberOfInjectWorkers_ = 0;
00402 }
00403
00404 if (
00405 alarmParams_.isProductionSystem_ &&
00406 rmParams_.injectWorkers_.expectedCount_ >= 0
00407 )
00408 {
00409 checkNumberOfInjectWorkers();
00410 }
00411 }
00412
00413
00414 void ResourceMonitorCollection::checkNumberOfInjectWorkers()
00415 {
00416 const std::string alarmName = "InjectWorkers";
00417
00418 if ( numberOfInjectWorkers_ != rmParams_.injectWorkers_.expectedCount_ )
00419 {
00420 std::ostringstream msg;
00421 msg << "Expected " << rmParams_.injectWorkers_.expectedCount_ <<
00422 " running InjectWorkers, but found " <<
00423 numberOfInjectWorkers_ << ".";
00424 XCEPT_DECLARE(stor::exception::InjectWorkers, ex, msg.str());
00425 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::WARNING, ex);
00426 }
00427 else
00428 {
00429 alarmHandler_->revokeAlarm(alarmName);
00430 }
00431 }
00432
00433
00434 void ResourceMonitorCollection::checkSataBeasts()
00435 {
00436 SATABeasts sataBeasts;
00437 if ( getSataBeasts(sataBeasts) )
00438 {
00439 for (
00440 SATABeasts::const_iterator it = sataBeasts.begin(),
00441 itEnd= sataBeasts.end();
00442 it != itEnd;
00443 ++it
00444 )
00445 {
00446 checkSataBeast(*it);
00447 }
00448 }
00449 else
00450 {
00451 latchedSataBeastStatus_ = -1;
00452 }
00453 }
00454
00455
00456 bool ResourceMonitorCollection::getSataBeasts(SATABeasts& sataBeasts)
00457 {
00458 if (! alarmParams_.isProductionSystem_) return false;
00459
00460 std::ifstream in;
00461 in.open( "/proc/mounts" );
00462
00463 if ( ! in.is_open() ) return false;
00464
00465 std::string line;
00466 while( getline(in,line) )
00467 {
00468 size_t pos = line.find("sata");
00469 if ( pos != std::string::npos )
00470 {
00471 std::ostringstream host;
00472 host << "satab-c2c"
00473 << std::setw(2) << std::setfill('0')
00474 << line.substr(pos+4,1)
00475 << "-"
00476 << std::setw(2) << std::setfill('0')
00477 << line.substr(pos+5,1);
00478 sataBeasts.insert(host.str());
00479 }
00480 }
00481 return !sataBeasts.empty();
00482 }
00483
00484
00485 void ResourceMonitorCollection::checkSataBeast(const std::string& sataBeast)
00486 {
00487 if ( ! (checkSataDisks(sataBeast,"-00.cms") || checkSataDisks(sataBeast,"-10.cms")) )
00488 {
00489 XCEPT_DECLARE(stor::exception::SataBeast, ex,
00490 "Failed to connect to SATA beast " + sataBeast);
00491 alarmHandler_->raiseAlarm(sataBeast, AlarmHandler::ERROR, ex);
00492
00493 latchedSataBeastStatus_ = 99999;
00494 }
00495 }
00496
00497
00498 bool ResourceMonitorCollection::checkSataDisks
00499 (
00500 const std::string& sataBeast,
00501 const std::string& hostSuffix
00502 )
00503 {
00504 CurlInterface curlInterface;
00505 CurlInterface::Content content;
00506
00507
00508 if ( rmParams_.sataUser_.empty() ) return true;
00509
00510 const CURLcode returnCode =
00511 curlInterface.getContent(
00512 "https://" + sataBeast + hostSuffix + "/status.asp",rmParams_.sataUser_,
00513 content
00514 );
00515
00516 if (returnCode == CURLE_OK)
00517 {
00518 updateSataBeastStatus(sataBeast, std::string(&content[0]));
00519 return true;
00520 }
00521 else
00522 {
00523 std::ostringstream msg;
00524 msg << "Failed to connect to SATA controller "
00525 << sataBeast << hostSuffix
00526 << ": " << std::string(&content[0]);
00527 XCEPT_DECLARE(stor::exception::SataBeast, ex, msg.str());
00528 alarmHandler_->notifySentinel(AlarmHandler::WARNING, ex);
00529
00530 return false;
00531 }
00532 }
00533
00534
00535 void ResourceMonitorCollection::updateSataBeastStatus
00536 (
00537 const std::string& sataBeast,
00538 const std::string& content
00539 )
00540 {
00541 boost::regex failedEntry(">([^<]* has failed[^<]*)");
00542 boost::regex failedDisk("Hard disk([[:digit:]]+)");
00543 boost::regex failedController("RAID controller ([[:digit:]]+)");
00544 boost::match_results<std::string::const_iterator> matchedEntry, matchedCause;
00545 boost::match_flag_type flags = boost::match_default;
00546
00547 std::string::const_iterator start = content.begin();
00548 std::string::const_iterator end = content.end();
00549
00550 unsigned int newSataBeastStatus = 0;
00551
00552 while( regex_search(start, end, matchedEntry, failedEntry, flags) )
00553 {
00554 std::string errorMsg = matchedEntry[1];
00555 XCEPT_DECLARE(stor::exception::SataBeast, ex, sataBeast+": "+errorMsg);
00556 alarmHandler_->raiseAlarm(sataBeast, AlarmHandler::ERROR, ex);
00557
00558
00559 if ( regex_search(errorMsg, matchedCause, failedDisk) )
00560 {
00561
00562 ++newSataBeastStatus;
00563 }
00564 else if ( regex_search(errorMsg, matchedCause, failedController) )
00565 {
00566
00567 newSataBeastStatus += 100;
00568 }
00569 else
00570 {
00571
00572 newSataBeastStatus += 1000;
00573 }
00574
00575
00576 start = matchedEntry[0].second;
00577
00578 flags |= boost::match_prev_avail;
00579 flags |= boost::match_not_bob;
00580 }
00581
00582 latchedSataBeastStatus_ = newSataBeastStatus;
00583
00584 if (latchedSataBeastStatus_ == 0)
00585 alarmHandler_->revokeAlarm(sataBeast);
00586
00587 }
00588
00589
00590 namespace {
// scandir() selector: returns 1 for directory entries whose name starts
// with a digit 1-9 (used on /proc to pick process directories), else 0.
int filter(const struct dirent *dir)
{
  const int noMatch = fnmatch("[1-9]*", dir->d_name, 0);
  return ( noMatch == 0 ) ? 1 : 0;
}
00595
// Returns true if `filename` can be stat()ed and is owned by `uid`.
bool matchUid(const std::string& filename, const uid_t& uid)
{
  struct stat info;
  if ( stat(filename.c_str(), &info) != 0 ) return false;
  return ( info.st_uid == uid );
}
00602
// Returns true if the process with the given pid (as text) has parent
// pid 1, according to /proc/<pid>/stat.
//
// The stat line has the form "pid (comm) S ppid ...". Since comm may itself
// contain spaces or parentheses, parsing starts at the LAST ')' and skips
// ") S " (4 characters) to reach the parent pid. Any failure to open, read
// or parse the file yields false, as does a read shorter than 80 bytes.
bool isMaster(const char* pid)
{
  if ( pid == 0 ) return false;

  // Build the path directly; it must never be used as a printf format.
  std::string statPath("/proc/");
  statPath += pid;
  statPath += "/stat";

  const int fd = open(statPath.c_str(), O_RDONLY, 0);
  if ( fd == -1 ) return false;

  char buf[800];
  const int num = read(fd, buf, sizeof buf - 1);
  close(fd); // close before any early return so the descriptor cannot leak

  if ( num < 80 ) return false; // also covers read() errors (-1)
  buf[num] = '\0';

  const char* const closingParen = strrchr(buf, ')');
  if ( closingParen == 0 ) return false;
  // The parent pid must start inside the data that was actually read.
  if ( (buf + num) - closingParen < 4 ) return false;

  int ppid = 0;
  const int parsed = sscanf(closingParen + 4, "%d", &ppid);
  return ( parsed == 1 && ppid == 1 );
}
00622
// Returns true if `name` occurs in the command line stored in the file
// `cmdline` (typically /proc/<pid>/cmdline). The file's NUL-separated
// arguments are joined with single spaces (each one followed by a space)
// before searching. If the file cannot be opened the search runs on an
// empty string, so only an empty `name` matches.
bool grep(const std::string& cmdline, const std::string& name)
{
  std::string joined;

  std::ifstream in(cmdline.c_str());
  if ( in.is_open() )
  {
    std::string argument;
    while ( getline(in, argument, '\0') )
    {
      joined += argument;
      joined += " ";
    }
    in.close();
  }

  return ( joined.find(name) != std::string::npos );
}
00643 }
00644
00645
00646 int ResourceMonitorCollection::getProcessCount
00647 (
00648 const std::string& processName,
00649 const int& uid
00650 )
00651 {
00652 int count(0);
00653 struct dirent **namelist;
00654 int n;
00655
00656 #if __APPLE__
00657 return -1;
00658 #else
00659 n = scandir("/proc", &namelist, filter, 0);
00660 #endif
00661 if (n < 0) return -1;
00662
00663 while(n--)
00664 {
00665 std::ostringstream cmdline;
00666 cmdline << "/proc/" << namelist[n]->d_name << "/cmdline";
00667
00668 if ( grep(cmdline.str(), processName) &&
00669 (uid < 0 || matchUid(cmdline.str(), uid)) &&
00670 isMaster(namelist[n]->d_name) )
00671 {
00672 ++count;
00673 }
00674 free(namelist[n]);
00675 }
00676 free(namelist);
00677
00678 return count;
00679 }
00680
00681
00682 std::string ResourceMonitorCollection::DiskUsage::toString()
00683 {
00684 std::ostringstream msg;
00685 msg << std::fixed << std::setprecision(1) <<
00686 "Disk space usage for " << pathName <<
00687 " is " << relDiskUsage << "% (" <<
00688 absDiskUsage << "GB of " <<
00689 diskSize << "GB).";
00690 return msg.str();
00691 }
00692
00693 }
00694