Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/DQMEventMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DQMTopLevelFolder.h"
00006 #include "EventFilter/StorageManager/interface/QueueID.h"
00007 #include "EventFilter/StorageManager/interface/SharedResources.h"
00008
00009 #include "IOPool/Streamer/interface/DQMEventMessage.h"
00010 #include "IOPool/Streamer/interface/StreamDQMDeserializer.h"
00011 #include "IOPool/Streamer/interface/StreamDQMSerializer.h"
00012
00013 #include "TROOT.h"
00014
00015 #include "toolbox/net/Utils.h"
00016
00017 #include <sstream>
00018 #include <unistd.h>
00019
00020
00021 namespace stor {
00022
00023 unsigned int DQMTopLevelFolder::sentEvents_(0);
00024
00025 DQMTopLevelFolder::DQMTopLevelFolder
00026 (
00027 const DQMKey& dqmKey,
00028 const QueueIDs& dqmConsumers,
00029 const DQMProcessingParams& dqmParams,
00030 DQMEventMonitorCollection& dqmEventMonColl,
00031 const unsigned int expectedUpdates,
00032 AlarmHandlerPtr alarmHandler
00033 ) :
00034 dqmKey_(dqmKey),
00035 dqmConsumers_(dqmConsumers),
00036 dqmParams_(dqmParams),
00037 dqmEventMonColl_(dqmEventMonColl),
00038 expectedUpdates_(expectedUpdates),
00039 alarmHandler_(alarmHandler),
00040 nUpdates_(0),
00041 updateNumber_(0)
00042 {
00043 gROOT->SetBatch(kTRUE);
00044 dqmEventMonColl_.getNumberOfTopLevelFoldersMQ().addSample(1);
00045 }
00046
00047
00048 DQMTopLevelFolder::~DQMTopLevelFolder()
00049 {
00050 dqmFolders_.clear();
00051 }
00052
00053
00054 void DQMTopLevelFolder::addDQMEvent(const DQMEventMsgView& view)
00055 {
00056 if ( releaseTag_.empty() ) releaseTag_ = view.releaseTag();
00057
00058
00059 updateNumber_ = std::max(updateNumber_, view.updateNumber());
00060 if ( timeStamp_ == edm::Timestamp::invalidTimestamp() )
00061 timeStamp_ = view.timeStamp();
00062 else
00063 timeStamp_ = std::min(timeStamp_, view.timeStamp());
00064
00065 edm::StreamDQMDeserializer deserializer;
00066 std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00067 deserializer.deserializeDQMEvent(view);
00068
00069 addEvent(toTablePtr);
00070
00071 ++nUpdates_;
00072
00073 if (nUpdates_ > expectedUpdates_)
00074 {
00075 std::ostringstream msg;
00076 msg << "Received " << nUpdates_
00077 << " updates for top level folder " << view.topFolderName()
00078 << " and lumi section " << view.lumiSection()
00079 << " whereas only " << expectedUpdates_
00080 << " updates are expected.";
00081 XCEPT_DECLARE(exception::DQMEventProcessing,
00082 sentinelException, msg.str());
00083 alarmHandler_->notifySentinel(AlarmHandler::ERROR, sentinelException);
00084 }
00085
00086 lastUpdate_ = utils::getCurrentTime();
00087
00088 dqmEventMonColl_.getDQMEventSizeMQ().addSample(
00089 static_cast<double>(view.size()) / 0x100000
00090 );
00091 }
00092
00093
00094 bool DQMTopLevelFolder::isReady(const utils::TimePoint_t& now) const
00095 {
00096 if ( nUpdates_ == 0 ) return false;
00097
00098 if ( nUpdates_ == expectedUpdates_ )
00099 {
00100 dqmEventMonColl_.getNumberOfCompleteUpdatesMQ().addSample(1);
00101 return true;
00102 }
00103
00104 if ( now > lastUpdate_ + dqmParams_.readyTimeDQM_ ) return true;
00105
00106 return false;
00107 }
00108
00109
00110 void DQMTopLevelFolder::addEvent(std::auto_ptr<DQMEvent::TObjectTable> toTablePtr)
00111 {
00112 for (
00113 DQMEvent::TObjectTable::const_iterator it = toTablePtr->begin(),
00114 itEnd = toTablePtr->end();
00115 it != itEnd;
00116 ++it
00117 )
00118 {
00119 const std::string folderName = it->first;
00120
00121 DQMFoldersMap::iterator pos = dqmFolders_.lower_bound(folderName);
00122 if ( pos == dqmFolders_.end() || (dqmFolders_.key_comp()(folderName, pos->first)) )
00123 {
00124 pos = dqmFolders_.insert(pos, DQMFoldersMap::value_type(
00125 folderName, DQMFolderPtr( new DQMFolder() )
00126 ));
00127 }
00128 pos->second->addObjects(it->second);
00129 }
00130 }
00131
00132
00133 bool DQMTopLevelFolder::getRecord(DQMTopLevelFolder::Record& record)
00134 {
00135 if ( nUpdates_ == 0 ) return false;
00136
00137 record.clear();
00138 record.tagForEventConsumers(dqmConsumers_);
00139
00140
00141 DQMEvent::TObjectTable table;
00142 const size_t folderSize = populateTable(table);
00143
00144 edm::StreamDQMSerializer serializer;
00145 const size_t sourceSize =
00146 serializer.serializeDQMEvent(table,
00147 dqmParams_.useCompressionDQM_,
00148 dqmParams_.compressionLevelDQM_);
00149
00150
00151 const size_t totalSize =
00152 sourceSize
00153 + sizeof(DQMEventHeader)
00154 + 12*sizeof(uint32_t)
00155 + releaseTag_.length()
00156 + dqmKey_.topLevelFolderName.length()
00157 + folderSize;
00158
00159 DQMEventMsgBuilder builder(
00160 record.getBuffer(totalSize),
00161 totalSize,
00162 dqmKey_.runNumber,
00163 ++sentEvents_,
00164 timeStamp_,
00165 dqmKey_.lumiSection,
00166 updateNumber_,
00167 (uint32_t)serializer.adler32_chksum(),
00168 toolbox::net::getHostName().c_str(),
00169 releaseTag_,
00170 dqmKey_.topLevelFolderName,
00171 table
00172 );
00173 unsigned char* source = serializer.bufferPointer();
00174 std::copy(source,source+sourceSize, builder.eventAddress());
00175 builder.setEventLength(sourceSize);
00176 if ( dqmParams_.useCompressionDQM_ )
00177 {
00178
00179 builder.setCompressionFlag(serializer.currentEventSize());
00180 }
00181 else
00182 {
00183
00184 builder.setCompressionFlag(0);
00185 }
00186
00187 dqmEventMonColl_.getNumberOfUpdatesMQ().addSample(nUpdates_);
00188 dqmEventMonColl_.getServedDQMEventSizeMQ().addSample(
00189 static_cast<double>(record.totalDataSize()) / 0x100000
00190 );
00191
00192 return true;
00193 }
00194
00195
00196 size_t DQMTopLevelFolder::populateTable(DQMEvent::TObjectTable& table) const
00197 {
00198 size_t folderSize = 0;
00199
00200 for ( DQMFoldersMap::const_iterator it = dqmFolders_.begin(), itEnd = dqmFolders_.end();
00201 it != itEnd; ++it )
00202 {
00203 const std::string folderName = it->first;
00204 const DQMFolderPtr folder = it->second;
00205
00206 DQMEvent::TObjectTable::iterator pos = table.lower_bound(folderName);
00207 if ( pos == table.end() || (table.key_comp()(folderName, pos->first)) )
00208 {
00209 std::vector<TObject*> newObjectVector;
00210 pos = table.insert(pos, DQMEvent::TObjectTable::value_type(folderName, newObjectVector));
00211 folderSize += 2*sizeof(uint32_t) + folderName.length();
00212 }
00213 folder->fillObjectVector(pos->second);
00214 }
00215 return folderSize;
00216 }
00217
00218 }
00219