Go to the documentation of this file.00001
00003
00004 #include <sstream>
00005
00006 #include "toolbox/net/URN.h"
00007 #include "toolbox/task/Action.h"
00008 #include "toolbox/task/WorkLoopFactory.h"
00009 #include "xcept/tools.h"
00010 #include "xdaq/ApplicationDescriptor.h"
00011 #include "xdata/Event.h"
00012 #include "xdata/InfoSpaceFactory.h"
00013
00014 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00015 #include "EventFilter/StorageManager/interface/MonitoredQuantity.h"
00016 #include "EventFilter/SMProxyServer/interface/Exception.h"
00017 #include "EventFilter/SMProxyServer/interface/StatisticsReporter.h"
00018
00019
00020 namespace smproxy {
00021
00022 StatisticsReporter::StatisticsReporter
00023 (
00024 xdaq::Application *app,
00025 const QueueConfigurationParams& qcp
00026 ) :
00027 app_(app),
00028 alarmHandler_(new stor::AlarmHandler(app)),
00029 monitoringSleepSec_(qcp.monitoringSleepSec_),
00030 dataRetrieverMonCollection_(monitoringSleepSec_, alarmHandler_),
00031 dqmEventMonCollection_(monitoringSleepSec_*5),
00032 eventConsumerMonCollection_(monitoringSleepSec_),
00033 dqmConsumerMonCollection_(monitoringSleepSec_),
00034 doMonitoring_(monitoringSleepSec_>boost::posix_time::seconds(0))
00035 {
00036 reset();
00037 createMonitoringInfoSpace();
00038 collectInfoSpaceItems();
00039 }
00040
00041
00042 void StatisticsReporter::startWorkLoop(std::string workloopName)
00043 {
00044 if ( !doMonitoring_ ) return;
00045
00046 try
00047 {
00048 std::string identifier =
00049 stor::utils::getIdentifier(app_->getApplicationDescriptor());
00050
00051 monitorWL_=
00052 toolbox::task::getWorkLoopFactory()->getWorkLoop(
00053 identifier + workloopName, "waiting");
00054
00055 if ( ! monitorWL_->isActive() )
00056 {
00057 toolbox::task::ActionSignature* monitorAction =
00058 toolbox::task::bind(this, &StatisticsReporter::monitorAction,
00059 identifier + "MonitorAction");
00060 monitorWL_->submit(monitorAction);
00061
00062 lastMonitorAction_ = stor::utils::getCurrentTime();
00063 monitorWL_->activate();
00064 }
00065 }
00066 catch (xcept::Exception& e)
00067 {
00068 std::string msg =
00069 "Failed to start workloop 'StatisticsReporter' with 'MonitorAction'.";
00070 XCEPT_RETHROW(exception::Monitoring, msg, e);
00071 }
00072 }
00073
00074
00075 StatisticsReporter::~StatisticsReporter()
00076 {
00077
00078 doMonitoring_ = false;
00079
00080
00081 if ( monitorWL_ && monitorWL_->isActive() ) monitorWL_->cancel();
00082 }
00083
00084
00085 void StatisticsReporter::createMonitoringInfoSpace()
00086 {
00087
00088
00089 std::ostringstream oss;
00090 oss << "urn:xdaq-monitorable-" << app_->getApplicationDescriptor()->getClassName();
00091
00092 std::string errorMsg =
00093 "Failed to create monitoring info space " + oss.str();
00094
00095 try
00096 {
00097 toolbox::net::URN urn = app_->createQualifiedInfoSpace(oss.str());
00098 xdata::getInfoSpaceFactory()->lock();
00099 infoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00100 xdata::getInfoSpaceFactory()->unlock();
00101 }
00102 catch(xdata::exception::Exception &e)
00103 {
00104 xdata::getInfoSpaceFactory()->unlock();
00105
00106 XCEPT_RETHROW(exception::Infospace, errorMsg, e);
00107 }
00108 catch (...)
00109 {
00110 xdata::getInfoSpaceFactory()->unlock();
00111
00112 errorMsg += " : unknown exception";
00113 XCEPT_RAISE(exception::Infospace, errorMsg);
00114 }
00115 }
00116
00117
00118 void StatisticsReporter::collectInfoSpaceItems()
00119 {
00120 stor::MonitorCollection::InfoSpaceItems infoSpaceItems;
00121 infoSpaceItemNames_.clear();
00122
00123 dataRetrieverMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00124 dqmEventMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00125 eventConsumerMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00126 dqmConsumerMonCollection_.appendInfoSpaceItems(infoSpaceItems);
00127
00128 putItemsIntoInfoSpace(infoSpaceItems);
00129 }
00130
00131
00132 void StatisticsReporter::putItemsIntoInfoSpace
00133 (
00134 stor::MonitorCollection::InfoSpaceItems& items
00135 )
00136 {
00137
00138 for ( stor::MonitorCollection::InfoSpaceItems::const_iterator it = items.begin(),
00139 itEnd = items.end();
00140 it != itEnd;
00141 ++it )
00142 {
00143 try
00144 {
00145
00146 infoSpace_->fireItemAvailable(it->first, it->second);
00147 }
00148 catch(xdata::exception::Exception &e)
00149 {
00150 std::stringstream oss;
00151
00152 oss << "Failed to put " << it->first;
00153 oss << " into info space " << infoSpace_->name();
00154
00155 XCEPT_RETHROW(exception::Monitoring, oss.str(), e);
00156 }
00157
00158
00159 infoSpaceItemNames_.push_back(it->first);
00160 }
00161 }
00162
00163
00164 bool StatisticsReporter::monitorAction(toolbox::task::WorkLoop* wl)
00165 {
00166 stor::utils::sleepUntil(lastMonitorAction_ + monitoringSleepSec_);
00167 lastMonitorAction_ = stor::utils::getCurrentTime();
00168
00169 std::string errorMsg = "Failed to update the monitoring information";
00170
00171 try
00172 {
00173 calculateStatistics();
00174 updateInfoSpace();
00175 }
00176 catch(xcept::Exception &e)
00177 {
00178 LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00179 errorMsg << xcept::stdformat_exception_history(e));
00180
00181 XCEPT_DECLARE_NESTED(exception::Monitoring,
00182 sentinelException, errorMsg, e);
00183 app_->notifyQualified("error", sentinelException);
00184 }
00185 catch(std::exception &e)
00186 {
00187 errorMsg += ": ";
00188 errorMsg += e.what();
00189
00190 LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00191 errorMsg);
00192
00193 XCEPT_DECLARE(exception::Monitoring,
00194 sentinelException, errorMsg);
00195 app_->notifyQualified("error", sentinelException);
00196 }
00197 catch(...)
00198 {
00199 errorMsg += ": Unknown exception";
00200
00201 LOG4CPLUS_ERROR(app_->getApplicationLogger(),
00202 errorMsg);
00203
00204 XCEPT_DECLARE(exception::Monitoring,
00205 sentinelException, errorMsg);
00206 app_->notifyQualified("error", sentinelException);
00207 }
00208
00209 return doMonitoring_;
00210 }
00211
00212
00213 void StatisticsReporter::calculateStatistics()
00214 {
00215 const stor::utils::TimePoint_t now = stor::utils::getCurrentTime();
00216
00217 dataRetrieverMonCollection_.calculateStatistics(now);
00218 dqmEventMonCollection_.calculateStatistics(now);
00219 eventConsumerMonCollection_.calculateStatistics(now);
00220 dqmConsumerMonCollection_.calculateStatistics(now);
00221 }
00222
00223
00224 void StatisticsReporter::updateInfoSpace()
00225 {
00226 std::string errorMsg =
00227 "Failed to update values of items in info space " + infoSpace_->name();
00228
00229
00230 try
00231 {
00232 infoSpace_->lock();
00233
00234 dataRetrieverMonCollection_.updateInfoSpaceItems();
00235 dqmEventMonCollection_.updateInfoSpaceItems();
00236 eventConsumerMonCollection_.updateInfoSpaceItems();
00237 dqmConsumerMonCollection_.updateInfoSpaceItems();
00238
00239 infoSpace_->unlock();
00240 }
00241 catch(std::exception &e)
00242 {
00243 infoSpace_->unlock();
00244
00245 errorMsg += ": ";
00246 errorMsg += e.what();
00247 XCEPT_RAISE(exception::Monitoring, errorMsg);
00248 }
00249 catch (...)
00250 {
00251 infoSpace_->unlock();
00252
00253 errorMsg += " : unknown exception";
00254 XCEPT_RAISE(exception::Monitoring, errorMsg);
00255 }
00256
00257 try
00258 {
00259
00260 infoSpace_->fireItemGroupChanged(infoSpaceItemNames_, this);
00261 }
00262 catch (xdata::exception::Exception &e)
00263 {
00264 XCEPT_RETHROW(exception::Monitoring, errorMsg, e);
00265 }
00266 }
00267
00268
00269 void StatisticsReporter::reset()
00270 {
00271 const stor::utils::TimePoint_t now = stor::utils::getCurrentTime();
00272
00273 dataRetrieverMonCollection_.reset(now);
00274 dqmEventMonCollection_.reset(now);
00275 eventConsumerMonCollection_.reset(now);
00276 dqmConsumerMonCollection_.reset(now);
00277
00278 alarmHandler_->clearAllAlarms();
00279 }
00280
00281
00282 void StatisticsReporter::actionPerformed(xdata::Event& ispaceEvent)
00283 {}
00284
00285 }
00286