CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/StorageManager/src/DQMTopLevelFolder.cc

Go to the documentation of this file.
00001 // $Id: DQMTopLevelFolder.cc,v 1.1.4.4 2011/04/04 12:14:30 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DQMEventMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DQMTopLevelFolder.h"
00006 #include "EventFilter/StorageManager/interface/QueueID.h"
00007 #include "EventFilter/StorageManager/interface/SharedResources.h"
00008 
00009 #include "IOPool/Streamer/interface/DQMEventMessage.h"
00010 #include "IOPool/Streamer/interface/StreamDQMDeserializer.h"
00011 #include "IOPool/Streamer/interface/StreamDQMSerializer.h"
00012 
00013 #include "TROOT.h"
00014 
00015 #include "toolbox/net/Utils.h"
00016 
00017 #include <sstream>
00018 #include <unistd.h>
00019 
00020 
00021 namespace stor {
00022   
00023   unsigned int DQMTopLevelFolder::sentEvents_(0);
00024   
00025   DQMTopLevelFolder::DQMTopLevelFolder
00026   (
00027     const DQMKey& dqmKey,
00028     const QueueIDs& dqmConsumers,
00029     const DQMProcessingParams& dqmParams,
00030     DQMEventMonitorCollection& dqmEventMonColl,
00031     const unsigned int expectedUpdates,
00032     AlarmHandlerPtr alarmHandler
00033   ) :
00034   dqmKey_(dqmKey),
00035   dqmConsumers_(dqmConsumers),
00036   dqmParams_(dqmParams),
00037   dqmEventMonColl_(dqmEventMonColl),
00038   expectedUpdates_(expectedUpdates),
00039   alarmHandler_(alarmHandler),
00040   nUpdates_(0),
00041   updateNumber_(0)
00042   {
00043     gROOT->SetBatch(kTRUE);
00044     dqmEventMonColl_.getNumberOfTopLevelFoldersMQ().addSample(1);
00045   }
00046   
00047   
00048   DQMTopLevelFolder::~DQMTopLevelFolder()
00049   {
00050     dqmFolders_.clear();
00051   }
00052   
00053   
00054   void DQMTopLevelFolder::addDQMEvent(const DQMEventMsgView& view)
00055   {
00056     if ( releaseTag_.empty() ) releaseTag_ = view.releaseTag();
00057     // A restarted EP will start counting at 0 again.
00058     // Thus, take the maximum of all updates we get.
00059     updateNumber_ = std::max(updateNumber_, view.updateNumber());
00060     if ( timeStamp_ == edm::Timestamp::invalidTimestamp() )
00061       timeStamp_ = view.timeStamp();
00062     else
00063       timeStamp_ = std::min(timeStamp_, view.timeStamp());
00064 
00065     edm::StreamDQMDeserializer deserializer;
00066     std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00067       deserializer.deserializeDQMEvent(view);
00068     
00069     addEvent(toTablePtr);
00070     
00071     ++nUpdates_;
00072 
00073     if (nUpdates_ > expectedUpdates_)
00074     {
00075       std::ostringstream msg;
00076       msg << "Received " << nUpdates_
00077         << " updates for top level folder " << view.topFolderName()
00078         << " and lumi section " << view.lumiSection()
00079         << " whereas only " << expectedUpdates_
00080         << " updates are expected.";
00081       XCEPT_DECLARE(exception::DQMEventProcessing,
00082         sentinelException, msg.str());
00083       alarmHandler_->notifySentinel(AlarmHandler::ERROR, sentinelException);
00084     }
00085 
00086     lastUpdate_ = utils::getCurrentTime();
00087     
00088     dqmEventMonColl_.getDQMEventSizeMQ().addSample(
00089       static_cast<double>(view.size()) / 0x100000
00090     );
00091   }
00092   
00093   
00094   bool DQMTopLevelFolder::isReady(const utils::TimePoint_t& now) const
00095   {
00096     if ( nUpdates_ == 0 ) return false;
00097     
00098     if ( nUpdates_ == expectedUpdates_ )
00099     {
00100       dqmEventMonColl_.getNumberOfCompleteUpdatesMQ().addSample(1);
00101       return true;
00102     }
00103     
00104     if ( now > lastUpdate_ + dqmParams_.readyTimeDQM_ ) return true;
00105     
00106     return false;
00107   }
00108   
00109   
00110   void DQMTopLevelFolder::addEvent(std::auto_ptr<DQMEvent::TObjectTable> toTablePtr)
00111   {
00112     for (
00113       DQMEvent::TObjectTable::const_iterator it = toTablePtr->begin(),
00114         itEnd = toTablePtr->end();
00115       it != itEnd; 
00116       ++it
00117     ) 
00118     {
00119       const std::string folderName = it->first;
00120       
00121       DQMFoldersMap::iterator pos = dqmFolders_.lower_bound(folderName);
00122       if ( pos == dqmFolders_.end() || (dqmFolders_.key_comp()(folderName, pos->first)) )
00123       {
00124         pos = dqmFolders_.insert(pos, DQMFoldersMap::value_type(
00125             folderName, DQMFolderPtr( new DQMFolder() )
00126           ));
00127       }
00128       pos->second->addObjects(it->second);
00129     }
00130   }
00131   
00132   
00133   bool DQMTopLevelFolder::getRecord(DQMTopLevelFolder::Record& record)
00134   {
00135     if ( nUpdates_ == 0 ) return false;
00136     
00137     record.clear();
00138     record.tagForEventConsumers(dqmConsumers_);
00139     
00140     // Package list of TObjects into a DQMEvent::TObjectTable
00141     DQMEvent::TObjectTable table;
00142     const size_t folderSize = populateTable(table);
00143     
00144     edm::StreamDQMSerializer serializer;
00145     const size_t sourceSize =
00146       serializer.serializeDQMEvent(table,
00147         dqmParams_.useCompressionDQM_,
00148         dqmParams_.compressionLevelDQM_);
00149     
00150     // Add space for header
00151     const size_t totalSize =
00152       sourceSize
00153       + sizeof(DQMEventHeader)
00154       + 12*sizeof(uint32_t)
00155       + releaseTag_.length()
00156       + dqmKey_.topLevelFolderName.length()
00157     + folderSize;
00158     
00159     DQMEventMsgBuilder builder(
00160       record.getBuffer(totalSize),
00161       totalSize,
00162       dqmKey_.runNumber,
00163       ++sentEvents_,
00164       timeStamp_,
00165       dqmKey_.lumiSection,
00166       updateNumber_,
00167       (uint32_t)serializer.adler32_chksum(),
00168       toolbox::net::getHostName().c_str(),
00169       releaseTag_,
00170       dqmKey_.topLevelFolderName,
00171       table
00172     ); 
00173     unsigned char* source = serializer.bufferPointer();
00174     std::copy(source,source+sourceSize, builder.eventAddress());
00175     builder.setEventLength(sourceSize);
00176     if ( dqmParams_.useCompressionDQM_ ) 
00177     {
00178       // the "compression flag" contains the uncompressed size
00179       builder.setCompressionFlag(serializer.currentEventSize());
00180     }
00181     else
00182     {
00183       // a size of 0 indicates no compression
00184       builder.setCompressionFlag(0);
00185     }
00186     
00187     dqmEventMonColl_.getNumberOfUpdatesMQ().addSample(nUpdates_);
00188     dqmEventMonColl_.getServedDQMEventSizeMQ().addSample(
00189       static_cast<double>(record.totalDataSize()) / 0x100000
00190     );
00191     
00192     return true;
00193   }
00194   
00195   
00196   size_t DQMTopLevelFolder::populateTable(DQMEvent::TObjectTable& table) const
00197   {
00198     size_t folderSize = 0;
00199     
00200     for ( DQMFoldersMap::const_iterator it = dqmFolders_.begin(), itEnd = dqmFolders_.end();
00201           it != itEnd; ++it )
00202     {
00203       const std::string folderName = it->first;
00204       const DQMFolderPtr folder = it->second;
00205       
00206       DQMEvent::TObjectTable::iterator pos = table.lower_bound(folderName);
00207       if ( pos == table.end() || (table.key_comp()(folderName, pos->first)) )
00208       {
00209         std::vector<TObject*> newObjectVector;
00210         pos = table.insert(pos, DQMEvent::TObjectTable::value_type(folderName, newObjectVector));
00211         folderSize += 2*sizeof(uint32_t) + folderName.length();
00212       }
00213       folder->fillObjectVector(pos->second);
00214     }
00215     return folderSize;
00216   }
00217   
00218 } // namespace stor
00219