CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMTopLevelFolder.cc
Go to the documentation of this file.
1 // $Id: DQMTopLevelFolder.cc,v 1.4 2011/04/04 12:03:30 mommsen Exp $
3 
8 
12 
13 #include "TROOT.h"
14 
15 #include "toolbox/net/Utils.h"
16 
17 #include <sstream>
18 #include <unistd.h>
19 
20 
21 namespace stor {
22 
23  unsigned int DQMTopLevelFolder::sentEvents_(0);
24 
26  (
27  const DQMKey& dqmKey,
28  const QueueIDs& dqmConsumers,
29  const DQMProcessingParams& dqmParams,
30  DQMEventMonitorCollection& dqmEventMonColl,
31  const unsigned int expectedUpdates,
32  AlarmHandlerPtr alarmHandler
33  ) :
34  dqmKey_(dqmKey),
35  dqmConsumers_(dqmConsumers),
36  dqmParams_(dqmParams),
37  dqmEventMonColl_(dqmEventMonColl),
38  expectedUpdates_(expectedUpdates),
39  alarmHandler_(alarmHandler),
40  nUpdates_(0),
41  updateNumber_(0)
42  {
43  gROOT->SetBatch(kTRUE);
44  dqmEventMonColl_.getNumberOfTopLevelFoldersMQ().addSample(1);
45  }
46 
47 
49  {
50  dqmFolders_.clear();
51  }
52 
53 
55  {
56  if ( releaseTag_.empty() ) releaseTag_ = view.releaseTag();
57  // A restarted EP will start counting at 0 again.
58  // Thus, take the maximum of all updates we get.
61  timeStamp_ = view.timeStamp();
62  else
64 
65  edm::StreamDQMDeserializer deserializer;
66  std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
67  deserializer.deserializeDQMEvent(view);
68 
69  addEvent(toTablePtr);
70 
71  ++nUpdates_;
72 
74  {
75  std::ostringstream msg;
76  msg << "Received " << nUpdates_
77  << " updates for top level folder " << view.topFolderName()
78  << " and lumi section " << view.lumiSection()
79  << " whereas only " << expectedUpdates_
80  << " updates are expected.";
81  XCEPT_DECLARE(exception::DQMEventProcessing,
82  sentinelException, msg.str());
83  alarmHandler_->notifySentinel(AlarmHandler::ERROR, sentinelException);
84  }
85 
87 
89  static_cast<double>(view.size()) / 0x100000
90  );
91  }
92 
93 
95  {
96  if ( nUpdates_ == 0 ) return false;
97 
98  if ( nUpdates_ == expectedUpdates_ )
99  {
101  return true;
102  }
103 
104  if ( now > lastUpdate_ + dqmParams_.readyTimeDQM_ ) return true;
105 
106  return false;
107  }
108 
109 
110  void DQMTopLevelFolder::addEvent(std::auto_ptr<DQMEvent::TObjectTable> toTablePtr)
111  {
112  for (
113  DQMEvent::TObjectTable::const_iterator it = toTablePtr->begin(),
114  itEnd = toTablePtr->end();
115  it != itEnd;
116  ++it
117  )
118  {
119  const std::string folderName = it->first;
120 
121  DQMFoldersMap::iterator pos = dqmFolders_.lower_bound(folderName);
122  if ( pos == dqmFolders_.end() || (dqmFolders_.key_comp()(folderName, pos->first)) )
123  {
124  pos = dqmFolders_.insert(pos, DQMFoldersMap::value_type(
125  folderName, DQMFolderPtr( new DQMFolder() )
126  ));
127  }
128  pos->second->addObjects(it->second);
129  }
130  }
131 
132 
134  {
135  if ( nUpdates_ == 0 ) return false;
136 
137  record.clear();
139 
140  // Package list of TObjects into a DQMEvent::TObjectTable
142  const size_t folderSize = populateTable(table);
143 
144  edm::StreamDQMSerializer serializer;
145  const size_t sourceSize =
146  serializer.serializeDQMEvent(table,
149 
150  // Add space for header
151  const size_t totalSize =
152  sourceSize
153  + sizeof(DQMEventHeader)
154  + 12*sizeof(uint32_t)
155  + releaseTag_.length()
156  + dqmKey_.topLevelFolderName.length()
157  + folderSize;
158 
159  DQMEventMsgBuilder builder(
160  record.getBuffer(totalSize),
161  totalSize,
163  ++sentEvents_,
164  timeStamp_,
167  (uint32_t)serializer.adler32_chksum(),
168  toolbox::net::getHostName().c_str(),
169  releaseTag_,
171  table
172  );
173  unsigned char* source = serializer.bufferPointer();
174  std::copy(source,source+sourceSize, builder.eventAddress());
175  builder.setEventLength(sourceSize);
177  {
178  // the "compression flag" contains the uncompressed size
179  builder.setCompressionFlag(serializer.currentEventSize());
180  }
181  else
182  {
183  // a size of 0 indicates no compression
184  builder.setCompressionFlag(0);
185  }
186 
189  static_cast<double>(record.totalDataSize()) / 0x100000
190  );
191 
192  return true;
193  }
194 
195 
197  {
198  size_t folderSize = 0;
199 
200  for ( DQMFoldersMap::const_iterator it = dqmFolders_.begin(), itEnd = dqmFolders_.end();
201  it != itEnd; ++it )
202  {
203  const std::string folderName = it->first;
204  const DQMFolderPtr folder = it->second;
205 
206  DQMEvent::TObjectTable::iterator pos = table.lower_bound(folderName);
207  if ( pos == table.end() || (table.key_comp()(folderName, pos->first)) )
208  {
209  std::vector<TObject*> newObjectVector;
210  pos = table.insert(pos, DQMEvent::TObjectTable::value_type(folderName, newObjectVector));
211  folderSize += 2*sizeof(uint32_t) + folderName.length();
212  }
213  folder->fillObjectVector(pos->second);
214  }
215  return folderSize;
216  }
217 
218 } // namespace stor
219 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
list table
Definition: asciidump.py:386
uint32_t adler32_chksum() const
void addDQMEvent(const DQMEventMsgView &)
size_t populateTable(DQMEvent::TObjectTable &) const
bool isReady(const utils::TimePoint_t &now) const
JetCorrectorParameters::Record record
Definition: classes.h:11
std::vector< QueueID > QueueIDs
Definition: QueueID.h:80
void addSample(const double &value=1)
void * getBuffer(size_t size) const
#define min(a, b)
Definition: mlp_lapack.h:161
Container::value_type value_type
static unsigned int sentEvents_
int serializeDQMEvent(DQMEvent::TObjectTable &toTable, bool use_compression, int compression_level)
uint32 updateNumber() const
unsigned int currentEventSize() const
utils::Duration_t readyTimeDQM_
Definition: Configuration.h:63
std::string topFolderName() const
std::auto_ptr< DQMEvent::TObjectTable > deserializeDQMEvent(DQMEventMsgView const &dqmEventView)
void addEvent(std::auto_ptr< DQMEvent::TObjectTable >)
std::string topLevelFolderName
Definition: DQMKey.h:25
const unsigned int expectedUpdates_
const T & max(const T &a, const T &b)
const DQMProcessingParams dqmParams_
const MonitoredQuantity & getNumberOfUpdatesMQ() const
utils::TimePoint_t lastUpdate_
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
unsigned char * bufferPointer() const
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
Definition: AlarmHandler.h:91
const MonitoredQuantity & getDQMEventSizeMQ() const
DQMTopLevelFolder(const DQMKey &, const QueueIDs &, const DQMProcessingParams &, DQMEventMonitorCollection &, const unsigned int expectedUpdates, AlarmHandlerPtr)
const MonitoredQuantity & getNumberOfCompleteUpdatesMQ() const
std::string releaseTag() const
static Timestamp const & invalidTimestamp()
Definition: Timestamp.cc:83
uint32_t runNumber
Definition: DQMKey.h:23
unsigned long totalDataSize() const
DQMEventMonitorCollection & dqmEventMonColl_
uint32 lumiSection() const
boost::shared_ptr< DQMFolder > DQMFolderPtr
const MonitoredQuantity & getServedDQMEventSizeMQ() const
uint32_t lumiSection
Definition: DQMKey.h:24
std::map< std::string, std::vector< TObject * > > TObjectTable
uint32 size() const
AlarmHandlerPtr alarmHandler_
void tagForEventConsumers(const QueueIDs &ids)
edm::Timestamp timeStamp() const