Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/DQMEventMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DQMTopLevelFolder.h"
00006 #include "EventFilter/StorageManager/interface/QueueID.h"
00007 #include "EventFilter/StorageManager/interface/SharedResources.h"
00008
00009 #include "IOPool/Streamer/interface/DQMEventMessage.h"
00010 #include "IOPool/Streamer/interface/StreamDQMDeserializer.h"
00011 #include "IOPool/Streamer/interface/StreamDQMSerializer.h"
00012
00013 #include "TROOT.h"
00014
00015 #include "toolbox/net/Utils.h"
00016
00017 #include <sstream>
00018 #include <unistd.h>
00019
00020
00021 namespace stor {
00022
00023 unsigned int DQMTopLevelFolder::sentEvents_(0);
00024
00025 DQMTopLevelFolder::DQMTopLevelFolder
00026 (
00027 const DQMKey& dqmKey,
00028 const QueueIDs& dqmConsumers,
00029 const DQMProcessingParams& dqmParams,
00030 DQMEventMonitorCollection& dqmEventMonColl,
00031 const unsigned int expectedUpdates,
00032 AlarmHandlerPtr alarmHandler
00033 ) :
00034 dqmKey_(dqmKey),
00035 dqmConsumers_(dqmConsumers),
00036 dqmParams_(dqmParams),
00037 dqmEventMonColl_(dqmEventMonColl),
00038 expectedUpdates_(expectedUpdates),
00039 alarmHandler_(alarmHandler),
00040 nUpdates_(0),
00041 mergeCount_(0),
00042 updateNumber_(0)
00043 {
00044 gROOT->SetBatch(kTRUE);
00045 dqmEventMonColl_.getNumberOfTopLevelFoldersMQ().addSample(1);
00046 }
00047
00048
00049 DQMTopLevelFolder::~DQMTopLevelFolder()
00050 {
00051 dqmFolders_.clear();
00052 }
00053
00054
00055 void DQMTopLevelFolder::addDQMEvent(const DQMEventMsgView& view)
00056 {
00057 if ( releaseTag_.empty() ) releaseTag_ = view.releaseTag();
00058
00059
00060 updateNumber_ = std::max(updateNumber_, view.updateNumber());
00061 if ( timeStamp_ == edm::Timestamp::invalidTimestamp() )
00062 timeStamp_ = view.timeStamp();
00063 else
00064 timeStamp_ = std::min(timeStamp_, view.timeStamp());
00065 mergeCount_ += std::max(1U, view.mergeCount());
00066
00067 edm::StreamDQMDeserializer deserializer;
00068 std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00069 deserializer.deserializeDQMEvent(view);
00070
00071 addEvent(toTablePtr);
00072
00073 ++nUpdates_;
00074
00075 if (nUpdates_ > expectedUpdates_)
00076 {
00077 std::ostringstream msg;
00078 msg << "Received " << nUpdates_
00079 << " updates for top level folder " << view.topFolderName()
00080 << " and lumi section " << view.lumiSection()
00081 << " whereas only " << expectedUpdates_
00082 << " updates are expected.";
00083 XCEPT_DECLARE(exception::DQMEventProcessing,
00084 sentinelException, msg.str());
00085 alarmHandler_->notifySentinel(AlarmHandler::ERROR, sentinelException);
00086 }
00087
00088 lastUpdate_ = utils::getCurrentTime();
00089
00090 dqmEventMonColl_.getDQMEventSizeMQ().addSample(
00091 static_cast<double>(view.size()) / 0x100000
00092 );
00093 }
00094
00095
00096 bool DQMTopLevelFolder::isReady(const utils::TimePoint_t& now) const
00097 {
00098 if ( nUpdates_ == 0 ) return false;
00099
00100 if ( nUpdates_ == expectedUpdates_ )
00101 {
00102 dqmEventMonColl_.getNumberOfCompleteUpdatesMQ().addSample(1);
00103 return true;
00104 }
00105
00106 if ( now > lastUpdate_ + dqmParams_.readyTimeDQM_ ) return true;
00107
00108 return false;
00109 }
00110
00111
00112 void DQMTopLevelFolder::addEvent(std::auto_ptr<DQMEvent::TObjectTable> toTablePtr)
00113 {
00114 for (
00115 DQMEvent::TObjectTable::const_iterator it = toTablePtr->begin(),
00116 itEnd = toTablePtr->end();
00117 it != itEnd;
00118 ++it
00119 )
00120 {
00121 const std::string folderName = it->first;
00122
00123 DQMFoldersMap::iterator pos = dqmFolders_.lower_bound(folderName);
00124 if ( pos == dqmFolders_.end() || (dqmFolders_.key_comp()(folderName, pos->first)) )
00125 {
00126 pos = dqmFolders_.insert(pos, DQMFoldersMap::value_type(
00127 folderName, DQMFolderPtr( new DQMFolder() )
00128 ));
00129 }
00130 pos->second->addObjects(it->second);
00131 }
00132 }
00133
00134
00135 bool DQMTopLevelFolder::getRecord(DQMTopLevelFolder::Record& record)
00136 {
00137 if ( nUpdates_ == 0 ) return false;
00138
00139 record.clear();
00140 record.tagForEventConsumers(dqmConsumers_);
00141
00142
00143 DQMEvent::TObjectTable table;
00144 const size_t folderSize = populateTable(table);
00145
00146 edm::StreamDQMSerializer serializer;
00147 const size_t sourceSize =
00148 serializer.serializeDQMEvent(table,
00149 dqmParams_.useCompressionDQM_,
00150 dqmParams_.compressionLevelDQM_);
00151
00152
00153 const size_t totalSize =
00154 sourceSize
00155 + sizeof(DQMEventHeader)
00156 + 12*sizeof(uint32_t)
00157 + releaseTag_.length()
00158 + dqmKey_.topLevelFolderName.length()
00159 + folderSize;
00160
00161 DQMEventMsgBuilder builder(
00162 record.getBuffer(totalSize),
00163 totalSize,
00164 dqmKey_.runNumber,
00165 ++sentEvents_,
00166 timeStamp_,
00167 dqmKey_.lumiSection,
00168 updateNumber_,
00169 (uint32_t)serializer.adler32_chksum(),
00170 toolbox::net::getHostName().c_str(),
00171 releaseTag_,
00172 dqmKey_.topLevelFolderName,
00173 table
00174 );
00175 unsigned char* source = serializer.bufferPointer();
00176 std::copy(source,source+sourceSize, builder.eventAddress());
00177 builder.setEventLength(sourceSize);
00178 if ( dqmParams_.useCompressionDQM_ )
00179 {
00180
00181 builder.setCompressionFlag(serializer.currentEventSize());
00182 }
00183 else
00184 {
00185
00186 builder.setCompressionFlag(0);
00187 }
00188 builder.setMergeCount(mergeCount_);
00189 dqmEventMonColl_.getNumberOfUpdatesMQ().addSample(nUpdates_);
00190 dqmEventMonColl_.getServedDQMEventSizeMQ().addSample(
00191 static_cast<double>(record.totalDataSize()) / 0x100000
00192 );
00193
00194 return true;
00195 }
00196
00197
00198 size_t DQMTopLevelFolder::populateTable(DQMEvent::TObjectTable& table) const
00199 {
00200 size_t folderSize = 0;
00201
00202 for ( DQMFoldersMap::const_iterator it = dqmFolders_.begin(), itEnd = dqmFolders_.end();
00203 it != itEnd; ++it )
00204 {
00205 const std::string folderName = it->first;
00206 const DQMFolderPtr folder = it->second;
00207
00208 DQMEvent::TObjectTable::iterator pos = table.lower_bound(folderName);
00209 if ( pos == table.end() || (table.key_comp()(folderName, pos->first)) )
00210 {
00211 std::vector<TObject*> newObjectVector;
00212 pos = table.insert(pos, DQMEvent::TObjectTable::value_type(folderName, newObjectVector));
00213 folderSize += 2*sizeof(uint32_t) + folderName.length();
00214 }
00215 folder->fillObjectVector(pos->second);
00216 }
00217 return folderSize;
00218 }
00219
00220 }
00221