CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/StorageManager/src/StatisticsReporter.cc

Go to the documentation of this file.
00001 // $Id: StatisticsReporter.cc,v 1.21 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include <sstream>
00005 
00006 #include "toolbox/net/URN.h"
00007 #include "toolbox/task/Action.h"
00008 #include "toolbox/task/WorkLoopFactory.h"
00009 #include "xcept/tools.h"
00010 #include "xdaq/ApplicationDescriptor.h"
00011 #include "xdata/Event.h"
00012 #include "xdata/InfoSpaceFactory.h"
00013 
00014 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00015 #include "EventFilter/StorageManager/interface/Exception.h"
00016 #include "EventFilter/StorageManager/interface/MonitoredQuantity.h"
00017 #include "EventFilter/StorageManager/interface/QueueID.h"
00018 #include "EventFilter/StorageManager/interface/SharedResources.h"
00019 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00020 #include "EventFilter/StorageManager/interface/Utils.h"
00021 
00022 
00023 namespace stor {
00024   
00025   StatisticsReporter::StatisticsReporter
00026   (
00027     xdaq::Application *app,
00028     SharedResourcesPtr sr
00029   ) :
00030   app_(app),
00031   alarmHandler_(new AlarmHandler(app)),
00032   sharedResources_(sr),
00033   monitoringSleepSec_(sr->configuration_->
00034     getWorkerThreadParams().monitoringSleepSec_),
00035   runMonCollection_(monitoringSleepSec_, alarmHandler_, sr),
00036   fragMonCollection_(monitoringSleepSec_),
00037   filesMonCollection_(monitoringSleepSec_*5),
00038   streamsMonCollection_(monitoringSleepSec_),
00039   dataSenderMonCollection_(monitoringSleepSec_, alarmHandler_),
00040   dqmEventMonCollection_(monitoringSleepSec_*5),
00041   resourceMonCollection_(monitoringSleepSec_*600, alarmHandler_),
00042   stateMachineMonCollection_(monitoringSleepSec_),
00043   eventConsumerMonCollection_(monitoringSleepSec_),
00044   dqmConsumerMonCollection_(monitoringSleepSec_),
00045   throughputMonCollection_(monitoringSleepSec_,
00046     sr->configuration_->getWorkerThreadParams().throuphputAveragingCycles_),
00047   monitorWL_(0),
00048   doMonitoring_(monitoringSleepSec_>boost::posix_time::seconds(0))
00049   {
00050     reset();
00051     createMonitoringInfoSpace();
00052     collectInfoSpaceItems();
00053     addRunInfoQuantitiesToApplicationInfoSpace();
00054   }
00055   
00056   
00057   void StatisticsReporter::startWorkLoop(std::string workloopName)
00058   {
00059     if ( !doMonitoring_ ) return;
00060     
00061     try
00062     {
00063       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00064       
00065       monitorWL_=
00066         toolbox::task::getWorkLoopFactory()->getWorkLoop(
00067           identifier + workloopName, "waiting");
00068       
00069       if ( ! monitorWL_->isActive() )
00070       {
00071         toolbox::task::ActionSignature* monitorAction = 
00072           toolbox::task::bind(this, &StatisticsReporter::monitorAction, 
00073             identifier + "MonitorAction");
00074         monitorWL_->submit(monitorAction);
00075         
00076         lastMonitorAction_ = utils::getCurrentTime();
00077         monitorWL_->activate();
00078       }
00079     }
00080     catch (xcept::Exception& e)
00081     {
00082       const std::string msg =
00083         "Failed to start workloop 'StatisticsReporter' with 'MonitorAction'.";
00084       XCEPT_RETHROW(stor::exception::Monitoring, msg, e);
00085     }
00086   }
00087   
00088   
00089   StatisticsReporter::~StatisticsReporter()
00090   {
00091     // Stop the monitoring activity
00092     doMonitoring_ = false;
00093     
00094     // Cancel the workloop (will wait until the action has finished)
00095     if ( monitorWL_ && monitorWL_->isActive() ) monitorWL_->cancel();
00096   }
00097   
00098   
00099   void StatisticsReporter::createMonitoringInfoSpace()
00100   {
00101     // Create an infospace which can be monitored.
00102     // The naming follows the old SM scheme.
00103     // In future, the instance number should be included.
00104     
00105     std::ostringstream oss;
00106     oss << "urn:xdaq-monitorable-" << app_->getApplicationDescriptor()->getClassName();
00107     
00108     std::string errorMsg =
00109       "Failed to create monitoring info space " + oss.str();
00110     
00111     try
00112     {
00113       toolbox::net::URN urn = app_->createQualifiedInfoSpace(oss.str());
00114       xdata::getInfoSpaceFactory()->lock();
00115       infoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00116       xdata::getInfoSpaceFactory()->unlock();
00117     }
00118     catch(xdata::exception::Exception &e)
00119     {
00120       xdata::getInfoSpaceFactory()->unlock();
00121       
00122       XCEPT_RETHROW(stor::exception::Infospace, errorMsg, e);
00123     }
00124     catch (...)
00125     {
00126       xdata::getInfoSpaceFactory()->unlock();
00127       
00128       errorMsg += " : unknown exception";
00129       XCEPT_RAISE(stor::exception::Infospace, errorMsg);
00130     }
00131   }
00132   
00133   
00134   void StatisticsReporter::collectInfoSpaceItems()
00135   {
00136     MonitorCollection::InfoSpaceItems infoSpaceItems;
00137     infoSpaceItemNames_.clear();
00138     
00139     runMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00140     fragMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00141     filesMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00142     streamsMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00143     dataSenderMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00144     dqmEventMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00145     resourceMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00146     stateMachineMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00147     eventConsumerMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00148     dqmConsumerMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00149     throughputMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00150     
00151     putItemsIntoInfoSpace(infoSpaceItems);
00152   }
00153   
00154   
00155   void StatisticsReporter::putItemsIntoInfoSpace(MonitorCollection::InfoSpaceItems& items)
00156   {
00157     
00158     for ( MonitorCollection::InfoSpaceItems::const_iterator it = items.begin(),
00159             itEnd = items.end();
00160           it != itEnd;
00161           ++it )
00162     {
00163       try
00164       {
00165         // fireItemAvailable locks the infospace internally
00166         infoSpace_->fireItemAvailable(it->first, it->second);
00167       }
00168       catch(xdata::exception::Exception &e)
00169       {
00170         std::stringstream oss;
00171         
00172         oss << "Failed to put " << it->first;
00173         oss << " into info space " << infoSpace_->name();
00174         
00175         XCEPT_RETHROW(stor::exception::Monitoring, oss.str(), e);
00176       }
00177       
00178       // keep a list of info space names for the fireItemGroupChanged
00179       infoSpaceItemNames_.push_back(it->first);
00180     }
00181   }
00182   
00183   
00184   void StatisticsReporter::addRunInfoQuantitiesToApplicationInfoSpace()
00185   {
00186     xdata::InfoSpace *infoSpace = app_->getApplicationInfoSpace();
00187     
00188     // bind the local xdata variables to the infospace
00189     infoSpace->fireItemAvailable("stateName", &stateName_);
00190     infoSpace->fireItemAvailable("storedEvents", &storedEvents_);
00191     infoSpace->fireItemAvailable("closedFiles", &closedFiles_);
00192     
00193     // spacial handling for the monitoring values requested by the HLTSFM
00194     // we want to assure that the values are current when they are queried
00195     infoSpace->addItemRetrieveListener("stateName", this);
00196     infoSpace->addItemRetrieveListener("storedEvents", this);
00197     infoSpace->addItemRetrieveListener("closedFiles", this);
00198   }
00199   
00200   
00201   bool StatisticsReporter::monitorAction(toolbox::task::WorkLoop* wl)
00202   {
00203     utils::sleepUntil(lastMonitorAction_ + monitoringSleepSec_);
00204     lastMonitorAction_ = utils::getCurrentTime();
00205     
00206     std::string errorMsg = "Failed to update the monitoring information";
00207     
00208     try
00209     {
00210       calculateStatistics();
00211       updateInfoSpace();
00212     }
00213     catch(exception::DiskSpaceAlarm &e)
00214     {
00215       sharedResources_->moveToFailedState(e);
00216     }
00217     catch(xcept::Exception &e)
00218     {
00219       LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00220         errorMsg << xcept::stdformat_exception_history(e));
00221       
00222       XCEPT_DECLARE_NESTED(stor::exception::Monitoring,
00223         sentinelException, errorMsg, e);
00224       app_->notifyQualified("error", sentinelException);
00225     }
00226     catch(std::exception &e)
00227     {
00228       errorMsg += ": ";
00229       errorMsg += e.what();
00230       
00231       LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00232         errorMsg);
00233       
00234       XCEPT_DECLARE(stor::exception::Monitoring,
00235         sentinelException, errorMsg);
00236       app_->notifyQualified("error", sentinelException);
00237     }
00238     catch(...)
00239     {
00240       errorMsg += ": Unknown exception";
00241       
00242       LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00243         errorMsg);
00244       
00245       XCEPT_DECLARE(stor::exception::Monitoring,
00246         sentinelException, errorMsg);
00247       app_->notifyQualified("error", sentinelException);
00248     }
00249     
00250     return doMonitoring_;
00251   }
00252   
00253   
00254   void StatisticsReporter::calculateStatistics()
00255   {
00256     const utils::TimePoint_t now = utils::getCurrentTime();
00257     
00258     runMonCollection_.calculateStatistics(now);
00259     fragMonCollection_.calculateStatistics(now);
00260     filesMonCollection_.calculateStatistics(now);
00261     streamsMonCollection_.calculateStatistics(now);
00262     dataSenderMonCollection_.calculateStatistics(now);
00263     dqmEventMonCollection_.calculateStatistics(now);
00264     resourceMonCollection_.calculateStatistics(now);
00265     stateMachineMonCollection_.calculateStatistics(now);
00266     eventConsumerMonCollection_.calculateStatistics(now);
00267     dqmConsumerMonCollection_.calculateStatistics(now);
00268     throughputMonCollection_.calculateStatistics(now);
00269   }
00270   
00271   
00272   void StatisticsReporter::updateInfoSpace()
00273   {
00274     std::string errorMsg =
00275       "Failed to update values of items in info space " + infoSpace_->name();
00276     
00277     // Lock the infospace to assure that all items are consistent
00278     try
00279     {
00280       infoSpace_->lock();
00281       
00282       runMonCollection_.updateInfoSpaceItems();
00283       fragMonCollection_.updateInfoSpaceItems();
00284       filesMonCollection_.updateInfoSpaceItems();
00285       streamsMonCollection_.updateInfoSpaceItems();
00286       dataSenderMonCollection_.updateInfoSpaceItems();
00287       dqmEventMonCollection_.updateInfoSpaceItems();
00288       resourceMonCollection_.updateInfoSpaceItems();
00289       stateMachineMonCollection_.updateInfoSpaceItems();
00290       eventConsumerMonCollection_.updateInfoSpaceItems();
00291       dqmConsumerMonCollection_.updateInfoSpaceItems();
00292       throughputMonCollection_.updateInfoSpaceItems();
00293       
00294       infoSpace_->unlock();
00295     }
00296     catch(std::exception &e)
00297     {
00298       infoSpace_->unlock();
00299       
00300       errorMsg += ": ";
00301       errorMsg += e.what();
00302       XCEPT_RAISE(stor::exception::Monitoring, errorMsg);
00303     }
00304     catch (...)
00305     {
00306       infoSpace_->unlock();
00307       
00308       errorMsg += " : unknown exception";
00309       XCEPT_RAISE(stor::exception::Monitoring, errorMsg);
00310     }
00311     
00312     try
00313     {
00314       // The fireItemGroupChanged locks the infospace
00315       infoSpace_->fireItemGroupChanged(infoSpaceItemNames_, this);
00316     }
00317     catch (xdata::exception::Exception &e)
00318     {
00319       XCEPT_RETHROW(stor::exception::Monitoring, errorMsg, e);
00320     }
00321   }
00322   
00323   
00324   void StatisticsReporter::reset()
00325   {
00326     const utils::TimePoint_t now = utils::getCurrentTime();
00327     
00328     // do not reset the stateMachineMonCollection, as we want to
00329     // keep the state machine history
00330     runMonCollection_.reset(now);
00331     fragMonCollection_.reset(now);
00332     filesMonCollection_.reset(now);
00333     streamsMonCollection_.reset(now);
00334     dataSenderMonCollection_.reset(now);
00335     dqmEventMonCollection_.reset(now);
00336     resourceMonCollection_.reset(now);
00337     eventConsumerMonCollection_.reset(now);
00338     dqmConsumerMonCollection_.reset(now);
00339     throughputMonCollection_.reset(now);
00340     
00341     alarmHandler_->clearAllAlarms();
00342   }
00343   
00344   
00345   void StatisticsReporter::actionPerformed(xdata::Event& ispaceEvent)
00346   {
00347     if (ispaceEvent.type() == "ItemRetrieveEvent")
00348     {
00349       std::string item =
00350         dynamic_cast<xdata::ItemRetrieveEvent&>(ispaceEvent).itemName();
00351       if (item == "closedFiles")
00352       {
00353         filesMonCollection_.updateInfoSpaceItems();
00354         try
00355         {
00356           closedFiles_.setValue( *(infoSpace_->find("closedFiles")) );
00357         }
00358         catch(xdata::exception::Exception& e)
00359         {
00360           closedFiles_ = 0;
00361         }
00362       }
00363       else if (item == "storedEvents")
00364       {
00365         streamsMonCollection_.updateInfoSpaceItems();
00366         try
00367         {
00368           storedEvents_.setValue( *(infoSpace_->find("storedEvents")) );
00369         }
00370         catch(xdata::exception::Exception& e)
00371         {
00372           storedEvents_ = 0;
00373         }
00374       } 
00375       else if (item == "stateName")
00376       {
00377         stateMachineMonCollection_.updateInfoSpaceItems();
00378         try
00379         {
00380           stateName_.setValue( *(infoSpace_->find("stateName")) );
00381         }
00382         catch(xdata::exception::Exception& e)
00383         {
00384           stateName_ = "unknown";
00385         }
00386       } 
00387     }
00388   }
00389   
00390 } // namespace stor
00391