CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMTopLevelFolder.cc
Go to the documentation of this file.
1 // $Id: DQMTopLevelFolder.cc,v 1.5 2011/04/04 16:05:37 mommsen Exp $
3 
8 
12 
13 #include "TROOT.h"
14 
15 #include "toolbox/net/Utils.h"
16 
17 #include <sstream>
18 #include <unistd.h>
19 
20 
21 namespace stor {
22 
// Definition of the static counter shared by all DQMTopLevelFolder instances,
// starting at 0. Within this file it is pre-incremented where the
// DQMEventMsgBuilder arguments are assembled.
23  unsigned int DQMTopLevelFolder::sentEvents_(0);
24 
// Constructor: parameter list, member initialisers and body.
// NOTE(review): the line naming the constructor (listing line 25) is missing
// from this listing; the symbol index at the end of the page lists
// DQMTopLevelFolder(const DQMKey &, const QueueIDs &, const DQMProcessingParams &,
// DQMEventMonitorCollection &, const unsigned int expectedUpdates, AlarmHandlerPtr).
// Each argument is stored in the member of the same name and the three
// counters (nUpdates_, mergeCount_, updateNumber_) start at zero.
26  (
27  const DQMKey& dqmKey,
28  const QueueIDs& dqmConsumers,
29  const DQMProcessingParams& dqmParams,
30  DQMEventMonitorCollection& dqmEventMonColl,
31  const unsigned int expectedUpdates,
32  AlarmHandlerPtr alarmHandler
33  ) :
34  dqmKey_(dqmKey),
35  dqmConsumers_(dqmConsumers),
36  dqmParams_(dqmParams),
37  dqmEventMonColl_(dqmEventMonColl),
38  expectedUpdates_(expectedUpdates),
39  alarmHandler_(alarmHandler),
40  nUpdates_(0),
41  mergeCount_(0),
42  updateNumber_(0)
43  {
    // Switch ROOT to batch mode.
44  gROOT->SetBatch(kTRUE);
    // Add one sample to the top-level-folder monitored quantity.
45  dqmEventMonColl_.getNumberOfTopLevelFoldersMQ().addSample(1);
46  }
47 
48 
// NOTE(review): the signature line (listing line 49) is missing from this
// listing -- presumably the destructor; confirm against the original file.
// The visible body only empties the dqmFolders_ container.
50  {
51  dqmFolders_.clear();
52  }
53 
54 
// Folds one incoming DQM event view (`view`) into this top-level folder.
// NOTE(review): the signature line (listing line 55) is missing from this
// listing; the symbol index at the end of the page lists
// void addDQMEvent(const DQMEventMsgView &), which presumably is this function.
// Listing lines 60-61, 64, 75, 88 and 90 are missing as well, so some of the
// conditions and call heads below are not visible here.
56  {
    // Take the release tag from the first view that supplies one.
57  if ( releaseTag_.empty() ) releaseTag_ = view.releaseTag();
58  // A restarted EP will start counting at 0 again.
59  // Thus, take the maximum of all updates we get.
    // NOTE(review): the `if` that pairs with the `else` below (listing lines
    // 60-61) and the else-branch statement (listing line 64) are not shown.
62  timeStamp_ = view.timeStamp();
63  else
    // Every view adds at least 1 to the merge count, even when it reports 0.
65  mergeCount_ += std::max(1U, view.mergeCount());
66 
    // Deserialize the view into a table of TObjects and hand it to addEvent().
67  edm::StreamDQMDeserializer deserializer;
68  std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
69  deserializer.deserializeDQMEvent(view);
70 
71  addEvent(toTablePtr);
72 
73  ++nUpdates_;
74 
    // NOTE(review): the condition guarding this block (listing line 75) is not
    // shown; the message built here reports more updates than expectedUpdates_.
76  {
77  std::ostringstream msg;
78  msg << "Received " << nUpdates_
79  << " updates for top level folder " << view.topFolderName()
80  << " and lumi section " << view.lumiSection()
81  << " whereas only " << expectedUpdates_
82  << " updates are expected.";
    // Wrap the message in an exception object and pass it to the alarm
    // handler at ERROR level.
83  XCEPT_DECLARE(exception::DQMEventProcessing,
84  sentinelException, msg.str());
85  alarmHandler_->notifySentinel(AlarmHandler::ERROR, sentinelException);
86  }
87 
89 
    // NOTE(review): the head of this call (listing line 90) is not shown. The
    // visible argument is view.size() divided by 0x100000 (2^20) as a double.
91  static_cast<double>(view.size()) / 0x100000
92  );
93  }
94 
95 
// Decides whether this folder is ready, given the time `now`.
// NOTE(review): the signature line (listing line 96) is missing from this
// listing; the symbol index at the end of the page lists
// bool isReady(const utils::TimePoint_t &now) const, which presumably is this
// function.
// Returns false while no update has been received; true once exactly
// expectedUpdates_ updates have arrived, or once `now` is later than
// lastUpdate_ + dqmParams_.readyTimeDQM_; false otherwise.
97  {
98  if ( nUpdates_ == 0 ) return false;
99 
100  if ( nUpdates_ == expectedUpdates_ )
101  {
     // NOTE(review): listing line 102 is not shown here.
103  return true;
104  }
105 
     // Timeout path: ready even though fewer (or more) updates than expected
     // were counted.
106  if ( now > lastUpdate_ + dqmParams_.readyTimeDQM_ ) return true;
107 
108  return false;
109  }
110 
111 
112  void DQMTopLevelFolder::addEvent(std::auto_ptr<DQMEvent::TObjectTable> toTablePtr)
113  {
114  for (
115  DQMEvent::TObjectTable::const_iterator it = toTablePtr->begin(),
116  itEnd = toTablePtr->end();
117  it != itEnd;
118  ++it
119  )
120  {
121  const std::string folderName = it->first;
122 
123  DQMFoldersMap::iterator pos = dqmFolders_.lower_bound(folderName);
124  if ( pos == dqmFolders_.end() || (dqmFolders_.key_comp()(folderName, pos->first)) )
125  {
126  pos = dqmFolders_.insert(pos, DQMFoldersMap::value_type(
127  folderName, DQMFolderPtr( new DQMFolder() )
128  ));
129  }
130  pos->second->addObjects(it->second);
131  }
132  }
133 
134 
// Serializes the collected folders into `record` as one DQM event message.
// NOTE(review): the signature line (listing line 135) is missing from this
// listing, as are listing lines 140, 143, 149-150, 164, 167-168, 172, 178 and
// 189-190. The declaration of `table`, some call arguments and the condition
// of the if/else near the end are therefore not visible here.
// Returns false without touching `record` when no update has been received,
// true after the message has been built.
136  {
137  if ( nUpdates_ == 0 ) return false;
138 
139  record.clear();
141 
142  // Package list of TObjects into a DQMEvent::TObjectTable
     // populateTable() fills `table` and returns the per-folder byte overhead
     // that is added to the buffer size below.
144  const size_t folderSize = populateTable(table);
145 
146  edm::StreamDQMSerializer serializer;
147  const size_t sourceSize =
     // NOTE(review): the remaining arguments (listing lines 149-150) are not
     // shown; the index lists serializeDQMEvent(toTable, use_compression,
     // compression_level).
148  serializer.serializeDQMEvent(table,
151 
152  // Add space for header
     // Buffer size = serialized payload + header struct + 12 uint32_t words
     // + release tag + top-level folder name + per-folder overhead.
153  const size_t totalSize =
154  sourceSize
155  + sizeof(DQMEventHeader)
156  + 12*sizeof(uint32_t)
157  + releaseTag_.length()
158  + dqmKey_.topLevelFolderName.length()
159  + folderSize;
160 
     // The builder writes into a buffer of totalSize bytes obtained from
     // `record`. Several of its arguments are not shown in this listing.
161  DQMEventMsgBuilder builder(
162  record.getBuffer(totalSize),
163  totalSize,
     // The static counter is incremented before its value is passed on.
165  ++sentEvents_,
166  timeStamp_,
169  (uint32_t)serializer.adler32_chksum(),
170  toolbox::net::getHostName().c_str(),
171  releaseTag_,
173  table
174  );
     // Copy the serialized bytes into the builder's event area and record
     // their length.
175  unsigned char* source = serializer.bufferPointer();
176  std::copy(source,source+sourceSize, builder.eventAddress());
177  builder.setEventLength(sourceSize);
     // NOTE(review): the condition selecting between the two branches below
     // (listing line 178) is not shown.
179  {
180  // the "compression flag" contains the uncompressed size
181  builder.setCompressionFlag(serializer.currentEventSize());
182  }
183  else
184  {
185  // a size of 0 indicates no compression
186  builder.setCompressionFlag(0);
187  }
188  builder.setMergeCount(mergeCount_);
     // NOTE(review): the head of this call (listing lines 189-190) is not
     // shown. The visible argument is record.totalDataSize() divided by
     // 0x100000 (2^20) as a double.
191  static_cast<double>(record.totalDataSize()) / 0x100000
192  );
193 
194  return true;
195  }
196 
197 
// Fills `table` with the objects of every folder held in dqmFolders_.
// NOTE(review): the signature line (listing line 198) is missing from this
// listing; the symbol index at the end of the page lists
// size_t populateTable(DQMEvent::TObjectTable &) const, which presumably is
// this function.
// Returns the summed overhead of the entries newly created in `table`:
// 2*sizeof(uint32_t) plus the folder-name length for each one. Names that are
// already present in `table` contribute nothing to the returned size.
199  {
200  size_t folderSize = 0;
201 
202  for ( DQMFoldersMap::const_iterator it = dqmFolders_.begin(), itEnd = dqmFolders_.end();
203  it != itEnd; ++it )
204  {
205  const std::string folderName = it->first;
206  const DQMFolderPtr folder = it->second;
207 
     // lower_bound gives either the existing entry or the insertion point,
     // which is then reused as the hint for insert().
208  DQMEvent::TObjectTable::iterator pos = table.lower_bound(folderName);
209  if ( pos == table.end() || (table.key_comp()(folderName, pos->first)) )
210  {
     // No entry for this name yet: create an empty object vector for it.
211  std::vector<TObject*> newObjectVector;
212  pos = table.insert(pos, DQMEvent::TObjectTable::value_type(folderName, newObjectVector));
213  folderSize += 2*sizeof(uint32_t) + folderName.length();
214  }
     // The folder fills the table entry's vector, whether new or existing.
215  folder->fillObjectVector(pos->second);
216  }
217  return folderSize;
218  }
219 
220 } // namespace stor
221 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
list table
Definition: asciidump.py:386
uint32_t adler32_chksum() const
void addDQMEvent(const DQMEventMsgView &)
size_t populateTable(DQMEvent::TObjectTable &) const
bool isReady(const utils::TimePoint_t &now) const
JetCorrectorParameters::Record record
Definition: classes.h:13
std::vector< QueueID > QueueIDs
Definition: QueueID.h:80
void addSample(const double &value=1)
void * getBuffer(size_t size) const
#define min(a, b)
Definition: mlp_lapack.h:161
static unsigned int sentEvents_
int serializeDQMEvent(DQMEvent::TObjectTable &toTable, bool use_compression, int compression_level)
uint32 updateNumber() const
unsigned int currentEventSize() const
utils::Duration_t readyTimeDQM_
Definition: Configuration.h:63
std::string topFolderName() const
std::auto_ptr< DQMEvent::TObjectTable > deserializeDQMEvent(DQMEventMsgView const &dqmEventView)
void addEvent(std::auto_ptr< DQMEvent::TObjectTable >)
std::string topLevelFolderName
Definition: DQMKey.h:25
const unsigned int expectedUpdates_
const T & max(const T &a, const T &b)
const DQMProcessingParams dqmParams_
const MonitoredQuantity & getNumberOfUpdatesMQ() const
utils::TimePoint_t lastUpdate_
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
unsigned char * bufferPointer() const
Container::value_type value_type
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
Definition: AlarmHandler.h:116
const MonitoredQuantity & getDQMEventSizeMQ() const
DQMTopLevelFolder(const DQMKey &, const QueueIDs &, const DQMProcessingParams &, DQMEventMonitorCollection &, const unsigned int expectedUpdates, AlarmHandlerPtr)
const MonitoredQuantity & getNumberOfCompleteUpdatesMQ() const
std::string releaseTag() const
static Timestamp const & invalidTimestamp()
Definition: Timestamp.cc:83
uint32_t runNumber
Definition: DQMKey.h:23
unsigned long totalDataSize() const
DQMEventMonitorCollection & dqmEventMonColl_
uint32 lumiSection() const
boost::shared_ptr< DQMFolder > DQMFolderPtr
const MonitoredQuantity & getServedDQMEventSizeMQ() const
uint32_t lumiSection
Definition: DQMKey.h:24
std::map< std::string, std::vector< TObject * > > TObjectTable
uint32 size() const
AlarmHandlerPtr alarmHandler_
void tagForEventConsumers(const QueueIDs &ids)
edm::Timestamp timeStamp() const
uint32 mergeCount() const