00001
00003
00004 #include <stdio.h>
00005 #include <string>
00006 #include <sstream>
00007 #include <iomanip>
00008 #include <sys/types.h>
00009 #include <sys/stat.h>
00010 #ifdef __APPLE__
00011 #include <sys/param.h>
00012 #include <sys/mount.h>
00013 #else
00014 #include <sys/statfs.h>
00015 #endif
00016 #include <fcntl.h>
00017 #include <dirent.h>
00018 #include <fnmatch.h>
00019 #include <pwd.h>
00020 #include <fstream>
00021 #include <algorithm>
00022
00023 #include <boost/bind.hpp>
00024 #include <boost/regex.hpp>
00025
00026 #include "EventFilter/StorageManager/interface/CurlInterface.h"
00027 #include "EventFilter/StorageManager/interface/Exception.h"
00028 #include "EventFilter/StorageManager/interface/ResourceMonitorCollection.h"
00029 #include "EventFilter/StorageManager/interface/Utils.h"
00030
00031
00032 namespace stor {
00033
00034 ResourceMonitorCollection::ResourceMonitorCollection
00035 (
00036 const utils::Duration_t& updateInterval,
00037 AlarmHandlerPtr ah
00038 ) :
00039 MonitorCollection(updateInterval),
00040 updateInterval_(updateInterval),
00041 alarmHandler_(ah),
00042 numberOfCopyWorkers_(-1),
00043 numberOfInjectWorkers_(-1),
00044 nLogicalDisks_(0),
00045 latchedSataBeastStatus_(-1)
00046 {}
00047
00048
00049 void ResourceMonitorCollection::configureDisks(DiskWritingParams const& dwParams)
00050 {
00051 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00052
00053 dwParams_ = dwParams;
00054
00055 nLogicalDisks_ = std::max(dwParams.nLogicalDisk_, 1);
00056 diskUsageList_.clear();
00057 diskUsageList_.reserve(nLogicalDisks_+dwParams.otherDiskPaths_.size()+1);
00058
00059 for (unsigned int i=0; i<nLogicalDisks_; ++i) {
00060
00061 std::ostringstream pathName;
00062 pathName << dwParams.filePath_;
00063 if( dwParams.nLogicalDisk_ > 0 ) {
00064 pathName << "/" << std::setfill('0') << std::setw(2) << i;
00065 }
00066 addDisk(pathName.str());
00067 }
00068 addDisk(dwParams.dbFilePath_);
00069
00070 if ( alarmParams_.isProductionSystem_ )
00071 {
00072 addOtherDisks();
00073 }
00074 }
00075
00076
00077 void ResourceMonitorCollection::addDisk(const std::string& pathname)
00078 {
00079 if ( pathname.empty() ) return;
00080
00081 DiskUsagePtr diskUsage( new DiskUsage() );
00082 diskUsage->pathName = pathname;
00083 retrieveDiskSize(diskUsage);
00084 diskUsageList_.push_back(diskUsage);
00085 }
00086
00087
00088 void ResourceMonitorCollection::addOtherDisks()
00089 {
00090 for ( DiskWritingParams::OtherDiskPaths::const_iterator
00091 it = dwParams_.otherDiskPaths_.begin(),
00092 itEnd = dwParams_.otherDiskPaths_.end();
00093 it != itEnd;
00094 ++it)
00095 {
00096 addDisk(*it);
00097 }
00098 }
00099
00100
00101 void ResourceMonitorCollection::configureResources
00102 (
00103 ResourceMonitorParams const& rmParams
00104 )
00105 {
00106 rmParams_ = rmParams;
00107 }
00108
00109
00110 void ResourceMonitorCollection::configureAlarms
00111 (
00112 AlarmParams const& alarmParams
00113 )
00114 {
00115 alarmParams_ = alarmParams;
00116 }
00117
00118
00119 void ResourceMonitorCollection::getStats(Stats& stats) const
00120 {
00121 getDiskStats(stats);
00122
00123 stats.numberOfCopyWorkers = numberOfCopyWorkers_;
00124 stats.numberOfInjectWorkers = numberOfInjectWorkers_;
00125
00126 stats.sataBeastStatus = latchedSataBeastStatus_;
00127 }
00128
00129
00130 void ResourceMonitorCollection::getDiskStats(Stats& stats) const
00131 {
00132 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00133
00134 stats.diskUsageStatsList.clear();
00135 stats.diskUsageStatsList.reserve(diskUsageList_.size());
00136 for ( DiskUsagePtrList::const_iterator it = diskUsageList_.begin(),
00137 itEnd = diskUsageList_.end();
00138 it != itEnd;
00139 ++it)
00140 {
00141 DiskUsageStatsPtr diskUsageStats(new DiskUsageStats);
00142 diskUsageStats->diskSize = (*it)->diskSize;
00143 diskUsageStats->absDiskUsage = (*it)->absDiskUsage;
00144 diskUsageStats->relDiskUsage = (*it)->relDiskUsage;
00145 diskUsageStats->pathName = (*it)->pathName;
00146 diskUsageStats->alarmState = (*it)->alarmState;
00147 stats.diskUsageStatsList.push_back(diskUsageStats);
00148 }
00149 }
00150
00151
00152 void ResourceMonitorCollection::do_calculateStatistics()
00153 {
00154 calcDiskUsage();
00155 calcNumberOfCopyWorkers();
00156 calcNumberOfInjectWorkers();
00157 checkSataBeasts();
00158 }
00159
00160
00161 void ResourceMonitorCollection::do_reset()
00162 {
00163 numberOfCopyWorkers_ = -1;
00164 numberOfInjectWorkers_ = -1;
00165 latchedSataBeastStatus_ = -1;
00166
00167 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00168 for ( DiskUsagePtrList::const_iterator it = diskUsageList_.begin(),
00169 itEnd = diskUsageList_.end();
00170 it != itEnd;
00171 ++it)
00172 {
00173 (*it)->absDiskUsage = -1;
00174 (*it)->relDiskUsage = -1;
00175 (*it)->alarmState = AlarmHandler::OKAY;
00176 }
00177 }
00178
00179
00180 void ResourceMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00181 {
00182 infoSpaceItems.push_back(std::make_pair("copyWorkers", ©Workers_));
00183 infoSpaceItems.push_back(std::make_pair("injectWorkers", &injectWorkers_));
00184 infoSpaceItems.push_back(std::make_pair("sataBeastStatus", &sataBeastStatus_));
00185 infoSpaceItems.push_back(std::make_pair("numberOfDisks", &numberOfDisks_));
00186 infoSpaceItems.push_back(std::make_pair("diskPaths", &diskPaths_));
00187 infoSpaceItems.push_back(std::make_pair("totalDiskSpace", &totalDiskSpace_));
00188 infoSpaceItems.push_back(std::make_pair("usedDiskSpace", &usedDiskSpace_));
00189 }
00190
00191
00192 void ResourceMonitorCollection::do_updateInfoSpaceItems()
00193 {
00194 Stats stats;
00195 getStats(stats);
00196
00197 if (stats.numberOfCopyWorkers > 0)
00198 copyWorkers_ = static_cast<xdata::UnsignedInteger32>(stats.numberOfCopyWorkers);
00199 else
00200 copyWorkers_ = 0;
00201
00202 if (stats.numberOfInjectWorkers > 0)
00203 injectWorkers_ = static_cast<xdata::UnsignedInteger32>(stats.numberOfInjectWorkers);
00204 else
00205 injectWorkers_ = 0;
00206
00207 sataBeastStatus_ = stats.sataBeastStatus;
00208 numberOfDisks_ = nLogicalDisks_;
00209
00210 const size_t statsCount = stats.diskUsageStatsList.size();
00211 const size_t infospaceCount = diskPaths_.size();
00212
00213 if ( statsCount != infospaceCount )
00214 {
00215 diskPaths_.resize(statsCount);
00216 totalDiskSpace_.resize(statsCount);
00217 usedDiskSpace_.resize(statsCount);
00218 }
00219
00220 for (size_t i=0; i < statsCount; ++i)
00221 {
00222 diskPaths_.at(i) = static_cast<xdata::String>(stats.diskUsageStatsList.at(i)->pathName);
00223 totalDiskSpace_.at(i) = static_cast<xdata::UnsignedInteger32>(
00224 static_cast<unsigned int>(stats.diskUsageStatsList.at(i)->diskSize * 1024));
00225 usedDiskSpace_.at(i) = static_cast<xdata::UnsignedInteger32>(
00226 static_cast<unsigned int>(stats.diskUsageStatsList.at(i)->absDiskUsage * 1024));
00227 }
00228
00229 calcDiskUsage();
00230 }
00231
00232
00233 void ResourceMonitorCollection::calcDiskUsage()
00234 {
00235 boost::mutex::scoped_lock sl(diskUsageListMutex_);
00236
00237 for ( DiskUsagePtrList::iterator it = diskUsageList_.begin(),
00238 itEnd = diskUsageList_.end();
00239 it != itEnd;
00240 ++it)
00241 {
00242 retrieveDiskSize(*it);
00243 }
00244 }
00245
00246 void ResourceMonitorCollection::retrieveDiskSize(DiskUsagePtr diskUsage)
00247 {
00248 #if __APPLE__
00249 struct statfs buf;
00250 int retVal = statfs(diskUsage->pathName.c_str(), &buf);
00251 #else
00252 struct statfs64 buf;
00253 int retVal = statfs64(diskUsage->pathName.c_str(), &buf);
00254 #endif
00255 if(retVal==0) {
00256 unsigned int blksize = buf.f_bsize;
00257 diskUsage->diskSize =
00258 static_cast<double>(buf.f_blocks * blksize) / 1024 / 1024 / 1024;
00259 diskUsage->absDiskUsage =
00260 diskUsage->diskSize -
00261 static_cast<double>(buf.f_bavail * blksize) / 1024 / 1024 / 1024;
00262 diskUsage->relDiskUsage = (100 * (diskUsage->absDiskUsage / diskUsage->diskSize));
00263 if ( diskUsage->relDiskUsage > dwParams_.highWaterMark_ )
00264 {
00265 emitDiskSpaceAlarm(diskUsage);
00266 }
00267 else if ( diskUsage->relDiskUsage < dwParams_.highWaterMark_*0.95 )
00268
00269 {
00270 revokeDiskAlarm(diskUsage);
00271 }
00272 }
00273 else
00274 {
00275 emitDiskAlarm(diskUsage, errno);
00276 diskUsage->diskSize = -1;
00277 diskUsage->absDiskUsage = -1;
00278 diskUsage->relDiskUsage = -1;
00279 }
00280 }
00281
00282
00283 void ResourceMonitorCollection::emitDiskAlarm(DiskUsagePtr diskUsage, error_t e)
00284
00285 {
00286 std::string msg;
00287
00288 switch(e)
00289 {
00290 case ENOENT :
00291 diskUsage->alarmState = AlarmHandler::ERROR;
00292 msg = "Cannot access " + diskUsage->pathName + ". Is it mounted?";
00293 break;
00294
00295 default :
00296 diskUsage->alarmState = AlarmHandler::WARNING;
00297 msg = "Failed to retrieve disk space information for " + diskUsage->pathName + ":"
00298 + strerror(e);
00299 }
00300
00301 XCEPT_DECLARE(stor::exception::DiskSpaceAlarm, ex, msg);
00302 alarmHandler_->notifySentinel(diskUsage->alarmState, ex);
00303 }
00304
00305
00306 void ResourceMonitorCollection::emitDiskSpaceAlarm(DiskUsagePtr diskUsage)
00307 {
00308 if ( diskUsage->relDiskUsage > dwParams_.failHighWaterMark_ )
00309 {
00310 failIfImportantDisk(diskUsage);
00311 }
00312
00313 diskUsage->alarmState = AlarmHandler::WARNING;
00314
00315 XCEPT_DECLARE(stor::exception::DiskSpaceAlarm, ex, diskUsage->toString());
00316 alarmHandler_->raiseAlarm(diskUsage->pathName, diskUsage->alarmState, ex);
00317 }
00318
00319
00320 void ResourceMonitorCollection::failIfImportantDisk(DiskUsagePtr diskUsage)
00321 {
00322
00323 DiskWritingParams::OtherDiskPaths::const_iterator begin =
00324 dwParams_.otherDiskPaths_.begin();
00325 DiskWritingParams::OtherDiskPaths::const_iterator end =
00326 dwParams_.otherDiskPaths_.end();
00327 if ( std::find(begin, end, diskUsage->pathName) != end ) return;
00328
00329 diskUsage->alarmState = AlarmHandler::FATAL;
00330 XCEPT_RAISE(stor::exception::DiskSpaceAlarm, diskUsage->toString());
00331 }
00332
00333
00334 void ResourceMonitorCollection::revokeDiskAlarm(DiskUsagePtr diskUsage)
00335 {
00336 diskUsage->alarmState = AlarmHandler::OKAY;
00337
00338 alarmHandler_->revokeAlarm(diskUsage->pathName);
00339 }
00340
00341
00342 void ResourceMonitorCollection::calcNumberOfCopyWorkers()
00343 {
00344 struct passwd* passwd = getpwnam(rmParams_.copyWorkers_.user_.c_str());
00345 if (passwd)
00346 {
00347 numberOfCopyWorkers_ =
00348 getProcessCount(rmParams_.copyWorkers_.command_, passwd->pw_uid);
00349 }
00350 else
00351 {
00352 numberOfCopyWorkers_ = 0;
00353 }
00354
00355 if ( alarmParams_.isProductionSystem_ && rmParams_.copyWorkers_.expectedCount_ >= 0 )
00356 {
00357 checkNumberOfCopyWorkers();
00358 }
00359 }
00360
00361
00362 void ResourceMonitorCollection::checkNumberOfCopyWorkers()
00363 {
00364 const std::string alarmName = "CopyWorkers";
00365
00366 if ( numberOfCopyWorkers_ < rmParams_.copyWorkers_.expectedCount_ )
00367 {
00368 std::ostringstream msg;
00369 msg << "Expected " << rmParams_.copyWorkers_.expectedCount_ <<
00370 " running CopyWorkers, but found " <<
00371 numberOfCopyWorkers_ << ".";
00372 XCEPT_DECLARE(stor::exception::CopyWorkers, ex, msg.str());
00373 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::WARNING, ex);
00374 }
00375 else
00376 {
00377 alarmHandler_->revokeAlarm(alarmName);
00378 }
00379 }
00380
00381
00382 void ResourceMonitorCollection::calcNumberOfInjectWorkers()
00383 {
00384 struct passwd* passwd = getpwnam(rmParams_.injectWorkers_.user_.c_str());
00385 if (passwd)
00386 {
00387 numberOfInjectWorkers_ = getProcessCount(rmParams_.injectWorkers_.command_, passwd->pw_uid);
00388 }
00389 else
00390 {
00391 numberOfInjectWorkers_ = 0;
00392 }
00393
00394 if (
00395 alarmParams_.isProductionSystem_ &&
00396 rmParams_.injectWorkers_.expectedCount_ >= 0
00397 )
00398 {
00399 checkNumberOfInjectWorkers();
00400 }
00401 }
00402
00403
00404 void ResourceMonitorCollection::checkNumberOfInjectWorkers()
00405 {
00406 const std::string alarmName = "InjectWorkers";
00407
00408 if ( numberOfInjectWorkers_ != rmParams_.injectWorkers_.expectedCount_ )
00409 {
00410 std::ostringstream msg;
00411 msg << "Expected " << rmParams_.injectWorkers_.expectedCount_ <<
00412 " running InjectWorkers, but found " <<
00413 numberOfInjectWorkers_ << ".";
00414 XCEPT_DECLARE(stor::exception::InjectWorkers, ex, msg.str());
00415 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::WARNING, ex);
00416 }
00417 else
00418 {
00419 alarmHandler_->revokeAlarm(alarmName);
00420 }
00421 }
00422
00423
00424 void ResourceMonitorCollection::checkSataBeasts()
00425 {
00426 SATABeasts sataBeasts;
00427 if ( getSataBeasts(sataBeasts) )
00428 {
00429 for (
00430 SATABeasts::const_iterator it = sataBeasts.begin(),
00431 itEnd= sataBeasts.end();
00432 it != itEnd;
00433 ++it
00434 )
00435 {
00436 checkSataBeast(*it);
00437 }
00438 }
00439 else
00440 {
00441 latchedSataBeastStatus_ = -1;
00442 }
00443 }
00444
00445
00446 bool ResourceMonitorCollection::getSataBeasts(SATABeasts& sataBeasts)
00447 {
00448 if (! alarmParams_.isProductionSystem_) return false;
00449
00450 std::ifstream in;
00451 in.open( "/proc/mounts" );
00452
00453 if ( ! in.is_open() ) return false;
00454
00455 std::string line;
00456 while( getline(in,line) )
00457 {
00458 size_t pos = line.find("sata");
00459 if ( pos != std::string::npos )
00460 {
00461 std::ostringstream host;
00462 host << "satab-c2c"
00463 << std::setw(2) << std::setfill('0')
00464 << line.substr(pos+4,1)
00465 << "-"
00466 << std::setw(2) << std::setfill('0')
00467 << line.substr(pos+5,1);
00468 sataBeasts.insert(host.str());
00469 }
00470 }
00471 return !sataBeasts.empty();
00472 }
00473
00474
00475 void ResourceMonitorCollection::checkSataBeast(const std::string& sataBeast)
00476 {
00477 if ( ! (checkSataDisks(sataBeast,"-00.cms") || checkSataDisks(sataBeast,"-10.cms")) )
00478 {
00479 XCEPT_DECLARE(stor::exception::SataBeast, ex,
00480 "Failed to connect to SATA beast " + sataBeast);
00481 alarmHandler_->raiseAlarm(sataBeast, AlarmHandler::ERROR, ex);
00482
00483 latchedSataBeastStatus_ = 99999;
00484 }
00485 }
00486
00487
00488 bool ResourceMonitorCollection::checkSataDisks
00489 (
00490 const std::string& sataBeast,
00491 const std::string& hostSuffix
00492 )
00493 {
00494 CurlInterface curlInterface;
00495 CurlInterface::Content content;
00496
00497
00498 if ( rmParams_.sataUser_.empty() ) return true;
00499
00500 const CURLcode returnCode =
00501 curlInterface.getContent(
00502 "https://" + sataBeast + hostSuffix + "/status.asp",rmParams_.sataUser_,
00503 content
00504 );
00505
00506 if (returnCode == CURLE_OK)
00507 {
00508 updateSataBeastStatus(sataBeast, std::string(&content[0]));
00509 return true;
00510 }
00511 else
00512 {
00513 std::ostringstream msg;
00514 msg << "Failed to connect to SATA controller "
00515 << sataBeast << hostSuffix
00516 << " with user name '" << rmParams_.sataUser_
00517 << "': " << std::string(&content[0]);
00518 XCEPT_DECLARE(stor::exception::SataBeast, ex, msg.str());
00519 alarmHandler_->notifySentinel(AlarmHandler::WARNING, ex);
00520
00521 return false;
00522 }
00523 }
00524
00525
00526 void ResourceMonitorCollection::updateSataBeastStatus
00527 (
00528 const std::string& sataBeast,
00529 const std::string& content
00530 )
00531 {
00532 boost::regex failedEntry(">([^<]* has failed[^<]*)");
00533 boost::regex failedDisk("Hard disk([[:digit:]]+)");
00534 boost::regex failedController("RAID controller ([[:digit:]]+)");
00535 boost::match_results<std::string::const_iterator> matchedEntry, matchedCause;
00536 boost::match_flag_type flags = boost::match_default;
00537
00538 std::string::const_iterator start = content.begin();
00539 std::string::const_iterator end = content.end();
00540
00541 unsigned int newSataBeastStatus = 0;
00542
00543 while( regex_search(start, end, matchedEntry, failedEntry, flags) )
00544 {
00545 std::string errorMsg = matchedEntry[1];
00546 XCEPT_DECLARE(stor::exception::SataBeast, ex, sataBeast+": "+errorMsg);
00547 alarmHandler_->raiseAlarm(sataBeast, AlarmHandler::ERROR, ex);
00548
00549
00550 if ( regex_search(errorMsg, matchedCause, failedDisk) )
00551 {
00552
00553 ++newSataBeastStatus;
00554 }
00555 else if ( regex_search(errorMsg, matchedCause, failedController) )
00556 {
00557
00558 newSataBeastStatus += 100;
00559 }
00560 else
00561 {
00562
00563 newSataBeastStatus += 1000;
00564 }
00565
00566
00567 start = matchedEntry[0].second;
00568
00569 flags |= boost::match_prev_avail;
00570 flags |= boost::match_not_bob;
00571 }
00572
00573 latchedSataBeastStatus_ = newSataBeastStatus;
00574
00575 if (latchedSataBeastStatus_ == 0)
00576 alarmHandler_->revokeAlarm(sataBeast);
00577
00578 }
00579
00580
00581 namespace {
/**
 * Selection function for scandir(): accept only directory entries whose
 * name matches the shell pattern "[1-9]*".
 *
 * @return 1 if the entry is accepted, 0 otherwise
 */
int filter(const struct dirent *dir)
{
  const bool matches = ( fnmatch("[1-9]*", dir->d_name, 0) == 0 );
  return matches ? 1 : 0;
}
00586
/**
 * Check whether the given file is owned by the given user id.
 *
 * @return true if the file status could be read and its owner is uid
 */
bool matchUid(const std::string& filename, const uid_t& uid)
{
  struct stat info;
  if ( stat(filename.c_str(), &info) != 0 ) return false;
  return ( info.st_uid == uid );
}
00593
/**
 * Check whether the process with the given pid is a child of process 1.
 *
 * The parent pid is read from /proc/<pid>/stat: it is the number found
 * 4 characters behind the last ')' of the line.
 *
 * @param pid  process id as string, as found in /proc
 * @return true if the parent pid could be read and is 1; false if the
 *         stat file cannot be opened, holds fewer than 80 bytes or
 *         cannot be parsed
 */
bool isMaster(const char* pid)
{
  std::ostringstream statfile;
  statfile << "/proc/" << pid << "/stat";

  const int fd = open(statfile.str().c_str(), O_RDONLY, 0);
  if ( fd == -1 ) return false;

  // The descriptor is closed right after reading, so that no return
  // path below can leak it
  char buf[800];
  const int num = read(fd, buf, sizeof buf - 1);
  close(fd);

  if ( num < 80 ) return false;
  buf[num] = '\0';

  // The command name is enclosed in parentheses and may itself contain
  // ')', hence the search for the last one
  const char* tmp = strrchr(buf, ')');
  if ( ! tmp || (buf + num) - tmp < 4 ) return false;

  int ppid = 0;
  return ( sscanf(tmp + 4, "%d", &ppid) == 1 && ppid == 1 );
}
00613
/**
 * Check whether a command line file contains the given text.
 *
 * The file is read as a sequence of NUL-terminated arguments, which are
 * joined with a single space behind each argument before searching. A
 * file which cannot be opened is treated as empty.
 *
 * @param cmdline  path of the file to read
 * @param name     text to look for
 * @return true if name is found in the joined command line
 */
bool grep(const std::string& cmdline, const std::string& name)
{
  std::string joined;

  std::ifstream in( cmdline.c_str() );
  if ( in.is_open() )
  {
    for ( std::string arg; getline(in, arg, '\0'); )
    {
      joined += arg;
      joined += " ";
    }
    in.close();
  }

  const std::string::size_type where = joined.find(name);
  return ( where != std::string::npos );
}
00634 }
00635
00636
00637 int ResourceMonitorCollection::getProcessCount
00638 (
00639 const std::string& processName,
00640 const int& uid
00641 )
00642 {
00643 int count(0);
00644 struct dirent **namelist;
00645 int n;
00646
00647 #if __APPLE__
00648 return -1;
00649 #else
00650 n = scandir("/proc", &namelist, filter, 0);
00651 #endif
00652 if (n < 0) return -1;
00653
00654 while(n--)
00655 {
00656 std::ostringstream cmdline;
00657 cmdline << "/proc/" << namelist[n]->d_name << "/cmdline";
00658
00659 if ( grep(cmdline.str(), processName) &&
00660 (uid < 0 || matchUid(cmdline.str(), uid)) &&
00661 isMaster(namelist[n]->d_name) )
00662 {
00663 ++count;
00664 }
00665 free(namelist[n]);
00666 }
00667 free(namelist);
00668
00669 return count;
00670 }
00671
00672
00673 std::string ResourceMonitorCollection::DiskUsage::toString()
00674 {
00675 std::ostringstream msg;
00676 msg << std::fixed << std::setprecision(1) <<
00677 "Disk space usage for " << pathName <<
00678 " is " << relDiskUsage << "% (" <<
00679 absDiskUsage << "GB of " <<
00680 diskSize << "GB).";
00681 return msg.str();
00682 }
00683
00684 }
00685