CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/StorageManager/src/DQMTopLevelFolder.cc

Go to the documentation of this file.
00001 // $Id: DQMTopLevelFolder.cc,v 1.5 2011/04/04 16:05:37 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DQMEventMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DQMTopLevelFolder.h"
00006 #include "EventFilter/StorageManager/interface/QueueID.h"
00007 #include "EventFilter/StorageManager/interface/SharedResources.h"
00008 
00009 #include "IOPool/Streamer/interface/DQMEventMessage.h"
00010 #include "IOPool/Streamer/interface/StreamDQMDeserializer.h"
00011 #include "IOPool/Streamer/interface/StreamDQMSerializer.h"
00012 
00013 #include "TROOT.h"
00014 
00015 #include "toolbox/net/Utils.h"
00016 
00017 #include <sstream>
00018 #include <unistd.h>
00019 
00020 
00021 namespace stor {
00022   
00023   unsigned int DQMTopLevelFolder::sentEvents_(0);
00024   
00025   DQMTopLevelFolder::DQMTopLevelFolder
00026   (
00027     const DQMKey& dqmKey,
00028     const QueueIDs& dqmConsumers,
00029     const DQMProcessingParams& dqmParams,
00030     DQMEventMonitorCollection& dqmEventMonColl,
00031     const unsigned int expectedUpdates,
00032     AlarmHandlerPtr alarmHandler
00033   ) :
00034   dqmKey_(dqmKey),
00035   dqmConsumers_(dqmConsumers),
00036   dqmParams_(dqmParams),
00037   dqmEventMonColl_(dqmEventMonColl),
00038   expectedUpdates_(expectedUpdates),
00039   alarmHandler_(alarmHandler),
00040   nUpdates_(0),
00041   mergeCount_(0),
00042   updateNumber_(0)
00043   {
00044     gROOT->SetBatch(kTRUE);
00045     dqmEventMonColl_.getNumberOfTopLevelFoldersMQ().addSample(1);
00046   }
00047   
00048   
00049   DQMTopLevelFolder::~DQMTopLevelFolder()
00050   {
00051     dqmFolders_.clear();
00052   }
00053   
00054   
00055   void DQMTopLevelFolder::addDQMEvent(const DQMEventMsgView& view)
00056   {
00057     if ( releaseTag_.empty() ) releaseTag_ = view.releaseTag();
00058     // A restarted EP will start counting at 0 again.
00059     // Thus, take the maximum of all updates we get.
00060     updateNumber_ = std::max(updateNumber_, view.updateNumber());
00061     if ( timeStamp_ == edm::Timestamp::invalidTimestamp() )
00062       timeStamp_ = view.timeStamp();
00063     else
00064       timeStamp_ = std::min(timeStamp_, view.timeStamp());
00065     mergeCount_ += std::max(1U, view.mergeCount());
00066 
00067     edm::StreamDQMDeserializer deserializer;
00068     std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00069       deserializer.deserializeDQMEvent(view);
00070     
00071     addEvent(toTablePtr);
00072     
00073     ++nUpdates_;
00074 
00075     if (nUpdates_ > expectedUpdates_)
00076     {
00077       std::ostringstream msg;
00078       msg << "Received " << nUpdates_
00079         << " updates for top level folder " << view.topFolderName()
00080         << " and lumi section " << view.lumiSection()
00081         << " whereas only " << expectedUpdates_
00082         << " updates are expected.";
00083       XCEPT_DECLARE(exception::DQMEventProcessing,
00084         sentinelException, msg.str());
00085       alarmHandler_->notifySentinel(AlarmHandler::ERROR, sentinelException);
00086     }
00087 
00088     lastUpdate_ = utils::getCurrentTime();
00089     
00090     dqmEventMonColl_.getDQMEventSizeMQ().addSample(
00091       static_cast<double>(view.size()) / 0x100000
00092     );
00093   }
00094   
00095   
00096   bool DQMTopLevelFolder::isReady(const utils::TimePoint_t& now) const
00097   {
00098     if ( nUpdates_ == 0 ) return false;
00099     
00100     if ( nUpdates_ == expectedUpdates_ )
00101     {
00102       dqmEventMonColl_.getNumberOfCompleteUpdatesMQ().addSample(1);
00103       return true;
00104     }
00105     
00106     if ( now > lastUpdate_ + dqmParams_.readyTimeDQM_ ) return true;
00107     
00108     return false;
00109   }
00110   
00111   
00112   void DQMTopLevelFolder::addEvent(std::auto_ptr<DQMEvent::TObjectTable> toTablePtr)
00113   {
00114     for (
00115       DQMEvent::TObjectTable::const_iterator it = toTablePtr->begin(),
00116         itEnd = toTablePtr->end();
00117       it != itEnd; 
00118       ++it
00119     ) 
00120     {
00121       const std::string folderName = it->first;
00122       
00123       DQMFoldersMap::iterator pos = dqmFolders_.lower_bound(folderName);
00124       if ( pos == dqmFolders_.end() || (dqmFolders_.key_comp()(folderName, pos->first)) )
00125       {
00126         pos = dqmFolders_.insert(pos, DQMFoldersMap::value_type(
00127             folderName, DQMFolderPtr( new DQMFolder() )
00128           ));
00129       }
00130       pos->second->addObjects(it->second);
00131     }
00132   }
00133   
00134   
00135   bool DQMTopLevelFolder::getRecord(DQMTopLevelFolder::Record& record)
00136   {
00137     if ( nUpdates_ == 0 ) return false;
00138     
00139     record.clear();
00140     record.tagForEventConsumers(dqmConsumers_);
00141     
00142     // Package list of TObjects into a DQMEvent::TObjectTable
00143     DQMEvent::TObjectTable table;
00144     const size_t folderSize = populateTable(table);
00145     
00146     edm::StreamDQMSerializer serializer;
00147     const size_t sourceSize =
00148       serializer.serializeDQMEvent(table,
00149         dqmParams_.useCompressionDQM_,
00150         dqmParams_.compressionLevelDQM_);
00151     
00152     // Add space for header
00153     const size_t totalSize =
00154       sourceSize
00155       + sizeof(DQMEventHeader)
00156       + 12*sizeof(uint32_t)
00157       + releaseTag_.length()
00158       + dqmKey_.topLevelFolderName.length()
00159     + folderSize;
00160     
00161     DQMEventMsgBuilder builder(
00162       record.getBuffer(totalSize),
00163       totalSize,
00164       dqmKey_.runNumber,
00165       ++sentEvents_,
00166       timeStamp_,
00167       dqmKey_.lumiSection,
00168       updateNumber_,
00169       (uint32_t)serializer.adler32_chksum(),
00170       toolbox::net::getHostName().c_str(),
00171       releaseTag_,
00172       dqmKey_.topLevelFolderName,
00173       table
00174     ); 
00175     unsigned char* source = serializer.bufferPointer();
00176     std::copy(source,source+sourceSize, builder.eventAddress());
00177     builder.setEventLength(sourceSize);
00178     if ( dqmParams_.useCompressionDQM_ ) 
00179     {
00180       // the "compression flag" contains the uncompressed size
00181       builder.setCompressionFlag(serializer.currentEventSize());
00182     }
00183     else
00184     {
00185       // a size of 0 indicates no compression
00186       builder.setCompressionFlag(0);
00187     }
00188     builder.setMergeCount(mergeCount_);
00189     dqmEventMonColl_.getNumberOfUpdatesMQ().addSample(nUpdates_);
00190     dqmEventMonColl_.getServedDQMEventSizeMQ().addSample(
00191       static_cast<double>(record.totalDataSize()) / 0x100000
00192     );
00193     
00194     return true;
00195   }
00196   
00197   
00198   size_t DQMTopLevelFolder::populateTable(DQMEvent::TObjectTable& table) const
00199   {
00200     size_t folderSize = 0;
00201     
00202     for ( DQMFoldersMap::const_iterator it = dqmFolders_.begin(), itEnd = dqmFolders_.end();
00203           it != itEnd; ++it )
00204     {
00205       const std::string folderName = it->first;
00206       const DQMFolderPtr folder = it->second;
00207       
00208       DQMEvent::TObjectTable::iterator pos = table.lower_bound(folderName);
00209       if ( pos == table.end() || (table.key_comp()(folderName, pos->first)) )
00210       {
00211         std::vector<TObject*> newObjectVector;
00212         pos = table.insert(pos, DQMEvent::TObjectTable::value_type(folderName, newObjectVector));
00213         folderSize += 2*sizeof(uint32_t) + folderName.length();
00214       }
00215       folder->fillObjectVector(pos->second);
00216     }
00217     return folderSize;
00218   }
00219   
00220 } // namespace stor
00221