00001
00003
00004 #include <sstream>
00005
00006 #include "toolbox/net/URN.h"
00007 #include "toolbox/task/Action.h"
00008 #include "toolbox/task/WorkLoopFactory.h"
00009 #include "xcept/tools.h"
00010 #include "xdaq/ApplicationDescriptor.h"
00011 #include "xdata/Event.h"
00012 #include "xdata/InfoSpaceFactory.h"
00013
00014 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00015 #include "EventFilter/StorageManager/interface/Exception.h"
00016 #include "EventFilter/StorageManager/interface/MonitoredQuantity.h"
00017 #include "EventFilter/StorageManager/interface/QueueID.h"
00018 #include "EventFilter/StorageManager/interface/SharedResources.h"
00019 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00020 #include "EventFilter/StorageManager/interface/Utils.h"
00021
00022
00023 namespace stor {
00024
00025 StatisticsReporter::StatisticsReporter
00026 (
00027 xdaq::Application *app,
00028 SharedResourcesPtr sr
00029 ) :
00030 app_(app),
00031 alarmHandler_(new AlarmHandler(app)),
00032 sharedResources_(sr),
00033 monitoringSleepSec_(sr->configuration_->
00034 getWorkerThreadParams().monitoringSleepSec_),
00035 runMonCollection_(monitoringSleepSec_, alarmHandler_, sr),
00036 fragMonCollection_(monitoringSleepSec_),
00037 filesMonCollection_(monitoringSleepSec_*5),
00038 streamsMonCollection_(monitoringSleepSec_),
00039 dataSenderMonCollection_(monitoringSleepSec_, alarmHandler_),
00040 dqmEventMonCollection_(monitoringSleepSec_*5),
00041 resourceMonCollection_(monitoringSleepSec_*600, alarmHandler_),
00042 stateMachineMonCollection_(monitoringSleepSec_),
00043 eventConsumerMonCollection_(monitoringSleepSec_),
00044 dqmConsumerMonCollection_(monitoringSleepSec_),
00045 throughputMonCollection_(monitoringSleepSec_,
00046 sr->configuration_->getWorkerThreadParams().throuphputAveragingCycles_),
00047 monitorWL_(0),
00048 doMonitoring_(monitoringSleepSec_>boost::posix_time::seconds(0))
00049 {
00050 reset();
00051 createMonitoringInfoSpace();
00052 collectInfoSpaceItems();
00053 addRunInfoQuantitiesToApplicationInfoSpace();
00054 }
00055
00056
00057 void StatisticsReporter::startWorkLoop(std::string workloopName)
00058 {
00059 if ( !doMonitoring_ ) return;
00060
00061 try
00062 {
00063 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00064
00065 monitorWL_=
00066 toolbox::task::getWorkLoopFactory()->getWorkLoop(
00067 identifier + workloopName, "waiting");
00068
00069 if ( ! monitorWL_->isActive() )
00070 {
00071 toolbox::task::ActionSignature* monitorAction =
00072 toolbox::task::bind(this, &StatisticsReporter::monitorAction,
00073 identifier + "MonitorAction");
00074 monitorWL_->submit(monitorAction);
00075
00076 lastMonitorAction_ = utils::getCurrentTime();
00077 monitorWL_->activate();
00078 }
00079 }
00080 catch (xcept::Exception& e)
00081 {
00082 const std::string msg =
00083 "Failed to start workloop 'StatisticsReporter' with 'MonitorAction'.";
00084 XCEPT_RETHROW(stor::exception::Monitoring, msg, e);
00085 }
00086 }
00087
00088
00089 StatisticsReporter::~StatisticsReporter()
00090 {
00091
00092 doMonitoring_ = false;
00093
00094
00095 if ( monitorWL_ && monitorWL_->isActive() ) monitorWL_->cancel();
00096 }
00097
00098
00099 void StatisticsReporter::createMonitoringInfoSpace()
00100 {
00101
00102
00103
00104
00105 std::ostringstream oss;
00106 oss << "urn:xdaq-monitorable-" << app_->getApplicationDescriptor()->getClassName();
00107
00108 std::string errorMsg =
00109 "Failed to create monitoring info space " + oss.str();
00110
00111 try
00112 {
00113 toolbox::net::URN urn = app_->createQualifiedInfoSpace(oss.str());
00114 xdata::getInfoSpaceFactory()->lock();
00115 infoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00116 xdata::getInfoSpaceFactory()->unlock();
00117 }
00118 catch(xdata::exception::Exception &e)
00119 {
00120 xdata::getInfoSpaceFactory()->unlock();
00121
00122 XCEPT_RETHROW(stor::exception::Infospace, errorMsg, e);
00123 }
00124 catch (...)
00125 {
00126 xdata::getInfoSpaceFactory()->unlock();
00127
00128 errorMsg += " : unknown exception";
00129 XCEPT_RAISE(stor::exception::Infospace, errorMsg);
00130 }
00131 }
00132
00133
00134 void StatisticsReporter::collectInfoSpaceItems()
00135 {
00136 MonitorCollection::InfoSpaceItems infoSpaceItems;
00137 infoSpaceItemNames_.clear();
00138
00139 runMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00140 fragMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00141 filesMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00142 streamsMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00143 dataSenderMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00144 dqmEventMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00145 resourceMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00146 stateMachineMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00147 eventConsumerMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00148 dqmConsumerMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00149 throughputMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00150
00151 putItemsIntoInfoSpace(infoSpaceItems);
00152 }
00153
00154
00155 void StatisticsReporter::putItemsIntoInfoSpace(MonitorCollection::InfoSpaceItems& items)
00156 {
00157
00158 for ( MonitorCollection::InfoSpaceItems::const_iterator it = items.begin(),
00159 itEnd = items.end();
00160 it != itEnd;
00161 ++it )
00162 {
00163 try
00164 {
00165
00166 infoSpace_->fireItemAvailable(it->first, it->second);
00167 }
00168 catch(xdata::exception::Exception &e)
00169 {
00170 std::stringstream oss;
00171
00172 oss << "Failed to put " << it->first;
00173 oss << " into info space " << infoSpace_->name();
00174
00175 XCEPT_RETHROW(stor::exception::Monitoring, oss.str(), e);
00176 }
00177
00178
00179 infoSpaceItemNames_.push_back(it->first);
00180 }
00181 }
00182
00183
00184 void StatisticsReporter::addRunInfoQuantitiesToApplicationInfoSpace()
00185 {
00186 xdata::InfoSpace *infoSpace = app_->getApplicationInfoSpace();
00187
00188
00189 infoSpace->fireItemAvailable("stateName", &stateName_);
00190 infoSpace->fireItemAvailable("storedEvents", &storedEvents_);
00191 infoSpace->fireItemAvailable("closedFiles", &closedFiles_);
00192
00193
00194
00195 infoSpace->addItemRetrieveListener("stateName", this);
00196 infoSpace->addItemRetrieveListener("storedEvents", this);
00197 infoSpace->addItemRetrieveListener("closedFiles", this);
00198 }
00199
00200
00201 bool StatisticsReporter::monitorAction(toolbox::task::WorkLoop* wl)
00202 {
00203 utils::sleepUntil(lastMonitorAction_ + monitoringSleepSec_);
00204 lastMonitorAction_ = utils::getCurrentTime();
00205
00206 std::string errorMsg = "Failed to update the monitoring information";
00207
00208 try
00209 {
00210 calculateStatistics();
00211 updateInfoSpace();
00212 }
00213 catch(exception::DiskSpaceAlarm &e)
00214 {
00215 sharedResources_->moveToFailedState(e);
00216 }
00217 catch(xcept::Exception &e)
00218 {
00219 LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00220 errorMsg << xcept::stdformat_exception_history(e));
00221
00222 XCEPT_DECLARE_NESTED(stor::exception::Monitoring,
00223 sentinelException, errorMsg, e);
00224 app_->notifyQualified("error", sentinelException);
00225 }
00226 catch(std::exception &e)
00227 {
00228 errorMsg += ": ";
00229 errorMsg += e.what();
00230
00231 LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00232 errorMsg);
00233
00234 XCEPT_DECLARE(stor::exception::Monitoring,
00235 sentinelException, errorMsg);
00236 app_->notifyQualified("error", sentinelException);
00237 }
00238 catch(...)
00239 {
00240 errorMsg += ": Unknown exception";
00241
00242 LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00243 errorMsg);
00244
00245 XCEPT_DECLARE(stor::exception::Monitoring,
00246 sentinelException, errorMsg);
00247 app_->notifyQualified("error", sentinelException);
00248 }
00249
00250 return doMonitoring_;
00251 }
00252
00253
00254 void StatisticsReporter::calculateStatistics()
00255 {
00256 const utils::TimePoint_t now = utils::getCurrentTime();
00257
00258 runMonCollection_.calculateStatistics(now);
00259 fragMonCollection_.calculateStatistics(now);
00260 filesMonCollection_.calculateStatistics(now);
00261 streamsMonCollection_.calculateStatistics(now);
00262 dataSenderMonCollection_.calculateStatistics(now);
00263 dqmEventMonCollection_.calculateStatistics(now);
00264 resourceMonCollection_.calculateStatistics(now);
00265 stateMachineMonCollection_.calculateStatistics(now);
00266 eventConsumerMonCollection_.calculateStatistics(now);
00267 dqmConsumerMonCollection_.calculateStatistics(now);
00268 throughputMonCollection_.calculateStatistics(now);
00269 }
00270
00271
00272 void StatisticsReporter::updateInfoSpace()
00273 {
00274 std::string errorMsg =
00275 "Failed to update values of items in info space " + infoSpace_->name();
00276
00277
00278 try
00279 {
00280 infoSpace_->lock();
00281
00282 runMonCollection_.updateInfoSpaceItems();
00283 fragMonCollection_.updateInfoSpaceItems();
00284 filesMonCollection_.updateInfoSpaceItems();
00285 streamsMonCollection_.updateInfoSpaceItems();
00286 dataSenderMonCollection_.updateInfoSpaceItems();
00287 dqmEventMonCollection_.updateInfoSpaceItems();
00288 resourceMonCollection_.updateInfoSpaceItems();
00289 stateMachineMonCollection_.updateInfoSpaceItems();
00290 eventConsumerMonCollection_.updateInfoSpaceItems();
00291 dqmConsumerMonCollection_.updateInfoSpaceItems();
00292 throughputMonCollection_.updateInfoSpaceItems();
00293
00294 infoSpace_->unlock();
00295 }
00296 catch(std::exception &e)
00297 {
00298 infoSpace_->unlock();
00299
00300 errorMsg += ": ";
00301 errorMsg += e.what();
00302 XCEPT_RAISE(stor::exception::Monitoring, errorMsg);
00303 }
00304 catch (...)
00305 {
00306 infoSpace_->unlock();
00307
00308 errorMsg += " : unknown exception";
00309 XCEPT_RAISE(stor::exception::Monitoring, errorMsg);
00310 }
00311
00312 try
00313 {
00314
00315 infoSpace_->fireItemGroupChanged(infoSpaceItemNames_, this);
00316 }
00317 catch (xdata::exception::Exception &e)
00318 {
00319 XCEPT_RETHROW(stor::exception::Monitoring, errorMsg, e);
00320 }
00321 }
00322
00323
00324 void StatisticsReporter::reset()
00325 {
00326 const utils::TimePoint_t now = utils::getCurrentTime();
00327
00328
00329
00330 runMonCollection_.reset(now);
00331 fragMonCollection_.reset(now);
00332 filesMonCollection_.reset(now);
00333 streamsMonCollection_.reset(now);
00334 dataSenderMonCollection_.reset(now);
00335 dqmEventMonCollection_.reset(now);
00336 resourceMonCollection_.reset(now);
00337 eventConsumerMonCollection_.reset(now);
00338 dqmConsumerMonCollection_.reset(now);
00339 throughputMonCollection_.reset(now);
00340
00341 alarmHandler_->clearAllAlarms();
00342 }
00343
00344
00345 void StatisticsReporter::actionPerformed(xdata::Event& ispaceEvent)
00346 {
00347 if (ispaceEvent.type() == "ItemRetrieveEvent")
00348 {
00349 std::string item =
00350 dynamic_cast<xdata::ItemRetrieveEvent&>(ispaceEvent).itemName();
00351 if (item == "closedFiles")
00352 {
00353 filesMonCollection_.updateInfoSpaceItems();
00354 try
00355 {
00356 closedFiles_.setValue( *(infoSpace_->find("closedFiles")) );
00357 }
00358 catch(xdata::exception::Exception& e)
00359 {
00360 closedFiles_ = 0;
00361 }
00362 }
00363 else if (item == "storedEvents")
00364 {
00365 streamsMonCollection_.updateInfoSpaceItems();
00366 try
00367 {
00368 storedEvents_.setValue( *(infoSpace_->find("storedEvents")) );
00369 }
00370 catch(xdata::exception::Exception& e)
00371 {
00372 storedEvents_ = 0;
00373 }
00374 }
00375 else if (item == "stateName")
00376 {
00377 stateMachineMonCollection_.updateInfoSpaceItems();
00378 try
00379 {
00380 stateName_.setValue( *(infoSpace_->find("stateName")) );
00381 }
00382 catch(xdata::exception::Exception& e)
00383 {
00384 stateName_ = "unknown";
00385 }
00386 }
00387 }
00388 }
00389
00390 }
00391