CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Configuration.cc
Go to the documentation of this file.
1 // $Id: Configuration.cc,v 1.47 2012/06/08 10:20:33 mommsen Exp $
3 
11 
12 #include <toolbox/net/Utils.h>
13 
14 #include <sstream>
15 
16 
17 namespace stor
18 {
19  Configuration::Configuration(xdata::InfoSpace* infoSpace,
20  unsigned long instanceNumber) :
21  streamConfigurationChanged_(false),
22  infospaceRunNumber_(0),
23  localRunNumber_(0)
24  {
25  // default values are used to initialize infospace values,
26  // so they should be set first
27  setDiskWritingDefaults(instanceNumber);
34 
41  setupAlarmInfoSpaceParams(infoSpace);
42  }
43 
44  struct DiskWritingParams Configuration::getDiskWritingParams() const
45  {
46  boost::mutex::scoped_lock sl(generalMutex_);
47  return diskWriteParamCopy_;
48  }
49 
50  struct DQMProcessingParams Configuration::getDQMProcessingParams() const
51  {
52  boost::mutex::scoped_lock sl(generalMutex_);
53  return dqmParamCopy_;
54  }
55 
56  struct EventServingParams Configuration::getEventServingParams() const
57  {
58  boost::mutex::scoped_lock sl(generalMutex_);
59  return eventServeParamCopy_;
60  }
61 
62  struct QueueConfigurationParams Configuration::getQueueConfigurationParams() const
63  {
64  boost::mutex::scoped_lock sl(generalMutex_);
65  return queueConfigParamCopy_;
66  }
67 
68  struct WorkerThreadParams Configuration::getWorkerThreadParams() const
69  {
70  boost::mutex::scoped_lock sl(generalMutex_);
71  return workerThreadParamCopy_;
72  }
73 
74  struct ResourceMonitorParams Configuration::getResourceMonitorParams() const
75  {
76  boost::mutex::scoped_lock sl(generalMutex_);
77  return resourceMonitorParamCopy_;
78  }
79 
80  struct AlarmParams Configuration::getAlarmParams() const
81  {
82  boost::mutex::scoped_lock sl(generalMutex_);
83  return alarmParamCopy_;
84  }
85 
87  {
88  boost::mutex::scoped_lock sl(generalMutex_);
97  }
98 
99  unsigned int Configuration::getRunNumber() const
100  {
101  return localRunNumber_;
102  }
103 
105  {
106  boost::mutex::scoped_lock sl(generalMutex_);
108  }
109 
111  {
112  boost::mutex::scoped_lock sl(generalMutex_);
114  }
115 
117  {
118  boost::mutex::scoped_lock sl(generalMutex_);
120  }
121 
122  void Configuration::actionPerformed(xdata::Event& ispaceEvent)
123  {
124  boost::mutex::scoped_lock sl(generalMutex_);
125 
126  if (ispaceEvent.type() == "ItemChangedEvent")
127  {
128  std::string item =
129  dynamic_cast<xdata::ItemChangedEvent&>(ispaceEvent).itemName();
130  if (item == "STparameterSet")
131  {
133  std::string tmpStreamConfiguration = smpset.getAsString();
134 
135  if (tmpStreamConfiguration != previousStreamCfg_)
136  {
138  previousStreamCfg_ = tmpStreamConfiguration;
139  }
140  }
141  }
142  }
143 
145  {
146  boost::mutex::scoped_lock sl(evtStrCfgMutex_);
147  currentEventStreamConfig_ = cfgList;
148  }
149 
151  {
152  boost::mutex::scoped_lock sl(errStrCfgMutex_);
153  currentErrorStreamConfig_ = cfgList;
154  }
155 
157  {
158  boost::mutex::scoped_lock sl(evtStrCfgMutex_);
160  }
161 
163  {
164  boost::mutex::scoped_lock sl(errStrCfgMutex_);
166  }
167 
168  void Configuration::setDiskWritingDefaults(unsigned long instanceNumber)
169  {
171  diskWriteParamCopy_.fileName_ = "storageManager";
173  diskWriteParamCopy_.dbFilePath_ = ""; // use default filePath_+"/log"
185 
187 
188  std::ostringstream oss;
189  oss << instanceNumber;
191 
192  std::string tmpString(toolbox::net::getHostName());
193  // strip domainame
194  std::string::size_type pos = tmpString.find('.');
195  if (pos != std::string::npos) {
196  std::string basename = tmpString.substr(0,pos);
197  tmpString = basename;
198  }
199  diskWriteParamCopy_.hostName_ = tmpString;
200 
202  }
203 
205  {
206  dqmParamCopy_.collateDQM_ = true;
211  }
212 
214  {
221  }
222 
224  {
234  }
235 
237  {
238  // set defaults
239  workerThreadParamCopy_.FPdeqWaitTime_ = boost::posix_time::millisec(250);
240  workerThreadParamCopy_.DWdeqWaitTime_ = boost::posix_time::millisec(500);
241  workerThreadParamCopy_.DQMEPdeqWaitTime_ = boost::posix_time::millisec(500);
242 
246  }
247 
249  {
250  // set defaults
253  resourceMonitorParamCopy_.injectWorkers_.command_ = "/InjectWorker.pl /store/global";
256  resourceMonitorParamCopy_.copyWorkers_.command_ = "CopyManager/CopyWorker.pl";
258  }
259 
261  {
262  // set defaults
267  }
268 
269  void Configuration::
270  setupDiskWritingInfoSpaceParams(xdata::InfoSpace* infoSpace)
271  {
272  // copy the initial defaults into the xdata variables
287 
289 
290 
291  // bind the local xdata variables to the infospace
292  infoSpace->fireItemAvailable("STparameterSet", &streamConfiguration_);
293  infoSpace->fireItemAvailable("fileName", &fileName_);
294  infoSpace->fireItemAvailable("filePath", &filePath_);
295  infoSpace->fireItemAvailable("dbFilePath", &dbFilePath_);
296  infoSpace->fireItemAvailable("otherDiskPaths", &otherDiskPaths_);
297  infoSpace->fireItemAvailable("setupLabel", &setupLabel_);
298  infoSpace->fireItemAvailable("nLogicalDisk", &nLogicalDisk_);
299  infoSpace->fireItemAvailable("maxFileSize", &maxFileSize_);
300  infoSpace->fireItemAvailable("highWaterMark", &highWaterMark_);
301  infoSpace->fireItemAvailable("failHighWaterMark", &failHighWaterMark_);
302  infoSpace->fireItemAvailable("lumiSectionTimeOut", &lumiSectionTimeOut_);
303  infoSpace->fireItemAvailable("fileClosingTestInterval",
305  infoSpace->fireItemAvailable("fileSizeTolerance", &fileSizeTolerance_);
306  infoSpace->fireItemAvailable("faultyEventsStream", &faultyEventsStream_);
307  infoSpace->fireItemAvailable("checkAdler32", &checkAdler32_);
308 
309  // special handling for the stream configuration string (we
310  // want to note when it changes to see if we need to reconfigure
311  // between runs)
312  infoSpace->addItemChangedListener("STparameterSet", this);
313  }
314 
315  void Configuration::
316  setupDQMProcessingInfoSpaceParams(xdata::InfoSpace* infoSpace)
317  {
318  // copy the initial defaults to the xdata variables
320  readyTimeDQM_ = dqmParamCopy_.readyTimeDQM_.total_seconds();
324 
325  // bind the local xdata variables to the infospace
326  infoSpace->fireItemAvailable("collateDQM", &collateDQM_);
327  infoSpace->fireItemAvailable("readyTimeDQM", &readyTimeDQM_);
328  infoSpace->fireItemAvailable("useCompressionDQM", &useCompressionDQM_);
329  infoSpace->fireItemAvailable("compressionLevelDQM", &compressionLevelDQM_);
330  infoSpace->fireItemAvailable("discardDQMUpdatesForOlderLS", &discardDQMUpdatesForOlderLS_);
331  }
332 
333  void Configuration::
334  setupEventServingInfoSpaceParams(xdata::InfoSpace* infoSpace)
335  {
336  // copy the initial defaults to the xdata variables
343 
344  // bind the local xdata variables to the infospace
345  infoSpace->fireItemAvailable("runNumber", &infospaceRunNumber_);
346  infoSpace->fireItemAvailable("activeConsumerTimeout",
348  infoSpace->fireItemAvailable("consumerQueueSize",&consumerQueueSize_);
349  infoSpace->fireItemAvailable("consumerQueuePolicy",&consumerQueuePolicy_);
350  infoSpace->fireItemAvailable("DQMactiveConsumerTimeout",
352  infoSpace->fireItemAvailable("DQMconsumerQueueSize",
354  infoSpace->fireItemAvailable("DQMconsumerQueuePolicy",&_DQMconsumerQueuePolicy);
355  }
356 
357  void Configuration::
358  setupQueueConfigurationInfoSpaceParams(xdata::InfoSpace* infoSpace)
359  {
360  // copy the initial defaults to the xdata variables
370 
371  // bind the local xdata variables to the infospace
372  infoSpace->fireItemAvailable("commandQueueSize", &commandQueueSize_);
373  infoSpace->fireItemAvailable("dqmEventQueueSize", &dqmEventQueueSize_);
374  infoSpace->fireItemAvailable("dqmEventQueueMemoryLimitMB", &dqmEventQueueMemoryLimitMB_);
375  infoSpace->fireItemAvailable("fragmentQueueSize", &fragmentQueueSize_);
376  infoSpace->fireItemAvailable("fragmentQueueMemoryLimitMB", &fragmentQueueMemoryLimitMB_);
377  infoSpace->fireItemAvailable("registrationQueueSize", &registrationQueueSize_);
378  infoSpace->fireItemAvailable("streamQueueSize", &streamQueueSize_);
379  infoSpace->fireItemAvailable("streamQueueMemoryLimitMB", &streamQueueMemoryLimitMB_);
380  infoSpace->fireItemAvailable("fragmentStoreMemoryLimitMB", &fragmentStoreMemoryLimitMB_);
381  }
382 
383  void Configuration::
384  setupWorkerThreadInfoSpaceParams(xdata::InfoSpace* infoSpace)
385  {
386  // copy the initial defaults to the xdata variables
393 
394  // bind the local xdata variables to the infospace
395  infoSpace->fireItemAvailable("FPdeqWaitTime", &FPdeqWaitTime_);
396  infoSpace->fireItemAvailable("DWdeqWaitTime", &DWdeqWaitTime_);
397  infoSpace->fireItemAvailable("DQMEPdeqWaitTime", &DQMEPdeqWaitTime_);
398  infoSpace->fireItemAvailable("staleFragmentTimeOut", &staleFragmentTimeOut_);
399  infoSpace->fireItemAvailable("monitoringSleepSec", &monitoringSleepSec_);
400  infoSpace->fireItemAvailable("throuphputAveragingCycles", &throuphputAveragingCycles_);
401  }
402 
403  void Configuration::
404  setupResourceMonitorInfoSpaceParams(xdata::InfoSpace* infoSpace)
405  {
406  // copy the initial defaults to the xdata variables
414 
415  // bind the local xdata variables to the infospace
416  infoSpace->fireItemAvailable("sataUser", &sataUser_);
417  infoSpace->fireItemAvailable("injectWorkersUser", &injectWorkersUser_);
418  infoSpace->fireItemAvailable("injectWorkersCommand", &injectWorkersCommand_);
419  infoSpace->fireItemAvailable("nInjectWorkers", &nInjectWorkers_);
420  infoSpace->fireItemAvailable("copyWorkersUser", &copyWorkersUser_);
421  infoSpace->fireItemAvailable("copyWorkersCommand", &copyWorkersCommand_);
422  infoSpace->fireItemAvailable("nCopyWorkers", &nCopyWorkers_);
423  }
424 
425  void Configuration::
426  setupAlarmInfoSpaceParams(xdata::InfoSpace* infoSpace)
427  {
428  // copy the initial defaults to the xdata variables
433 
434  // bind the local xdata variables to the infospace
435  infoSpace->fireItemAvailable("isProductionSystem", &isProductionSystem_);
436  infoSpace->fireItemAvailable("careAboutUnwantedEvents", &careAboutUnwantedEvents_);
437  infoSpace->fireItemAvailable("errorEvents", &errorEvents_);
438  infoSpace->fireItemAvailable("unwantedEvents", &unwantedEvents_);
439  }
440 
442  {
445 
448  if ( dbFilePath_.value_.empty() )
449  diskWriteParamCopy_.dbFilePath_ = filePath_.value_ + "/log";
450  else
463 
465 
466 
468  }
469 
471  {
474  boost::posix_time::seconds( static_cast<int>(readyTimeDQM_) );
478  }
479 
481  {
490 
491  // validation
493  {
495  }
497  {
499  }
500  }
501 
503  {
513  }
514 
516  {
520 
524  }
525 
527  {
535  }
536 
538  {
543  }
544 
546  {
548  }
549 
550  void parseStreamConfiguration(std::string cfgString,
551  EvtStrConfigListPtr evtCfgList,
552  ErrStrConfigListPtr errCfgList)
553  {
554  if (cfgString == "") return;
555 
556  PythonProcessDesc py_pdesc(cfgString.c_str());
557  boost::shared_ptr<edm::ProcessDesc> pdesc = py_pdesc.processDesc();
558  boost::shared_ptr<edm::ParameterSet> smPSet = pdesc->getProcessPSet();
559 
560  // loop over each end path
561  std::vector<std::string> allEndPaths =
562  smPSet->getParameter<std::vector<std::string> >("@end_paths");
563  for(std::vector<std::string>::iterator endPathIter = allEndPaths.begin();
564  endPathIter != allEndPaths.end(); ++endPathIter) {
565 
566  // loop over each element in the end path list (not sure why...)
567  std::vector<std::string> anEndPath =
568  smPSet->getParameter<std::vector<std::string> >((*endPathIter));
569  for(std::vector<std::string>::iterator ep2Iter = anEndPath.begin();
570  ep2Iter != anEndPath.end(); ++ep2Iter) {
571 
572  // fetch the end path parameter set
573  edm::ParameterSet endPathPSet =
574  smPSet->getParameter<edm::ParameterSet>((*ep2Iter));
575  if (! endPathPSet.empty()) {
576  std::string mod_type =
577  endPathPSet.getParameter<std::string> ("@module_type");
578  if (mod_type == "EventStreamFileWriter") {
579 
580  std::string streamLabel =
581  endPathPSet.getParameter<std::string> ("streamLabel");
582  int maxFileSizeMB = endPathPSet.getParameter<int> ("maxSize");
583  std::string newRequestedEvents = endPathPSet.getUntrackedParameter("TriggerSelector",std::string());
584  Strings requestedEvents =
586  std::string requestedOMLabel =
587  endPathPSet.getUntrackedParameter<std::string>("SelectHLTOutput",
588  std::string());
589  double fractionToDisk =
590  endPathPSet.getUntrackedParameter<double>("fractionToDisk", 1);
591 
592  EventStreamConfigurationInfo cfgInfo(streamLabel,
593  maxFileSizeMB,
594  newRequestedEvents,
595  requestedEvents,
596  requestedOMLabel,
597  fractionToDisk);
598  evtCfgList->push_back(cfgInfo);
599  }
600  else if (mod_type == "ErrorStreamFileWriter" ||
601  mod_type == "FRDStreamFileWriter") {
602 
603  std::string streamLabel =
604  endPathPSet.getParameter<std::string> ("streamLabel");
605  int maxFileSizeMB = endPathPSet.getParameter<int> ("maxSize");
606 
607  ErrorStreamConfigurationInfo cfgInfo(streamLabel,
608  maxFileSizeMB);
609  errCfgList->push_back(cfgInfo);
610  }
611  }
612  }
613  }
614  }
615 
616 } // namespace stor
617 
std::string previousStreamCfg_
xdata::Double fileSizeTolerance_
xdata::Double monitoringSleepSec_
T getParameter(std::string const &) const
bool empty() const
Definition: ParameterSet.h:219
xdata::String copyWorkersUser_
T getUntrackedParameter(std::string const &, T const &) const
xdata::UnsignedInteger32 streamQueueSize_
utils::Duration_t lumiSectionTimeOut_
Definition: Configuration.h:41
boost::shared_ptr< ErrStrConfigList > ErrStrConfigListPtr
double seconds()
unsigned int dqmEventQueueMemoryLimitMB_
Definition: Configuration.h:91
xdata::Integer nInjectWorkers_
xdata::Integer nLogicalDisk_
unsigned int discardDQMUpdatesForOlderLS_
Definition: Configuration.h:66
xdata::Double lumiSectionTimeOut_
void setupDiskWritingInfoSpaceParams(xdata::InfoSpace *infoSpace)
xdata::String fileName_
xdata::UnsignedInteger32 throuphputAveragingCycles_
xdata::Integer _DQMactiveConsumerTimeout
xdata::Double DQMEPdeqWaitTime_
bool streamConfigurationHasChanged() const
void updateLocalWorkerThreadData()
std::vector< std::string > Strings
Definition: MsgTools.h:18
xdata::Integer compressionLevelDQM_
Duration_t secondsToDuration(double const &seconds)
Definition: Utils.h:140
unsigned int fragmentQueueMemoryLimitMB_
Definition: Configuration.h:93
xdata::Double failHighWaterMark_
xdata::Double FPdeqWaitTime_
xdata::UnsignedInteger32 dqmEventQueueSize_
xdata::Integer maxFileSize_
xdata::Vector< xdata::String > otherDiskPaths_
xdata::Integer nCopyWorkers_
void setupEventServingInfoSpaceParams(xdata::InfoSpace *infoSpace)
xdata::UnsignedInteger32 registrationQueueSize_
unsigned int streamQueueMemoryLimitMB_
Definition: Configuration.h:96
OtherDiskPaths otherDiskPaths_
Definition: Configuration.h:48
xdata::UnsignedInteger32 fragmentQueueSize_
std::string smInstanceString_
Definition: Configuration.h:51
xdata::Double highWaterMark_
xdata::String sataUser_
unsigned int fragmentStoreMemoryLimitMB_
Definition: Configuration.h:97
void updateLocalDQMProcessingData()
xdata::Boolean isProductionSystem_
void updateLocalDiskWritingData()
ErrStrConfigListPtr getCurrentErrorStreamConfig() const
xdata::UnsignedInteger32 infospaceRunNumber_
struct EventServingParams eventServeParamCopy_
xdata::String filePath_
utils::Duration_t monitoringSleepSec_
xdata::Integer _DQMconsumerQueueSize
EvtStrConfigListPtr getCurrentEventStreamConfig() const
uint16_t size_type
boost::mutex evtStrCfgMutex_
boost::mutex generalMutex_
void setupWorkerThreadInfoSpaceParams(xdata::InfoSpace *infoSpace)
std::string consumerQueuePolicy_
Definition: Configuration.h:77
xdata::String consumerQueuePolicy_
utils::Duration_t readyTimeDQM_
Definition: Configuration.h:63
xdata::UnsignedInteger32 fragmentStoreMemoryLimitMB_
xdata::UnsignedInteger32 fragmentQueueMemoryLimitMB_
std::string _DQMconsumerQueuePolicy
Definition: Configuration.h:80
boost::shared_ptr< EvtStrConfigList > EvtStrConfigListPtr
unsigned int throuphputAveragingCycles_
xdata::Double staleFragmentTimeOut_
boost::mutex errStrCfgMutex_
xdata::String copyWorkersCommand_
struct WorkerThreadParams workerThreadParamCopy_
xdata::UnsignedInteger32 commandQueueSize_
void setupResourceMonitorInfoSpaceParams(xdata::InfoSpace *infoSpace)
utils::Duration_t activeConsumerTimeout_
Definition: Configuration.h:75
boost::posix_time::time_duration DQMEPdeqWaitTime_
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
void parseStreamConfiguration(std::string cfgString, EvtStrConfigListPtr evtCfgList, ErrStrConfigListPtr errCfgList)
unsigned int localRunNumber_
xdata::Integer readyTimeDQM_
struct AlarmParams alarmParamCopy_
void getStdVector(xdata::Vector< xdata::String > &, std::vector< std::string > &)
Definition: Utils.cc:105
utils::Duration_t staleFragmentTimeOut_
boost::posix_time::time_duration FPdeqWaitTime_
unsigned int unwantedEvents_
xdata::String dbFilePath_
void setQueueConfigurationDefaults()
void setupAlarmInfoSpaceParams(xdata::InfoSpace *infoSpace)
struct DQMProcessingParams dqmParamCopy_
unsigned int getRunNumber() const
Configuration(xdata::InfoSpace *infoSpace, unsigned long instanceNumber)
xdata::String injectWorkersCommand_
xdata::Boolean collateDQM_
xdata::String faultyEventsStream_
utils::Duration_t _DQMactiveConsumerTimeout
Definition: Configuration.h:78
void setResourceMonitorDefaults()
void updateLocalResourceMonitorData()
xdata::String _DQMconsumerQueuePolicy
boost::shared_ptr< edm::ProcessDesc > processDesc()
std::string faultyEventsStream_
Definition: Configuration.h:44
virtual void actionPerformed(xdata::Event &isEvt)
xdata::Integer activeConsumerTimeout_
void setupDQMProcessingInfoSpaceParams(xdata::InfoSpace *infoSpace)
string const
Definition: compareJSON.py:14
void getXdataVector(const std::vector< std::string > &, xdata::Vector< xdata::String > &)
Definition: Utils.cc:119
xdata::UnsignedInteger32 unwantedEvents_
xdata::UnsignedInteger32 streamQueueMemoryLimitMB_
xdata::UnsignedInteger32 errorEvents_
void setCurrentErrorStreamConfig(ErrStrConfigListPtr cfgList)
utils::Duration_t fileClosingTestInterval_
Definition: Configuration.h:42
void setCurrentEventStreamConfig(EvtStrConfigListPtr cfgList)
xdata::Integer consumerQueueSize_
unsigned int errorEvents_
xdata::Boolean careAboutUnwantedEvents_
xdata::Boolean checkAdler32_
boost::posix_time::time_duration DWdeqWaitTime_
double durationToSeconds(Duration_t const &)
Definition: Utils.h:147
void updateLocalQueueConfigurationData()
xdata::String streamConfiguration_
xdata::UnsignedInteger32 dqmEventQueueMemoryLimitMB_
xdata::String setupLabel_
std::string streamConfiguration_
Definition: Configuration.h:32
void updateLocalEventServingData()
EvtStrConfigListPtr currentEventStreamConfig_
xdata::UnsignedInteger32 discardDQMUpdatesForOlderLS_
xdata::Integer fileClosingTestInterval_
xdata::Boolean useCompressionDQM_
void setupQueueConfigurationInfoSpaceParams(xdata::InfoSpace *infoSpace)
struct ResourceMonitorParams resourceMonitorParamCopy_
struct DiskWritingParams diskWriteParamCopy_
ErrStrConfigListPtr currentErrorStreamConfig_
void setDiskWritingDefaults(unsigned long instanceNumber)
struct QueueConfigurationParams queueConfigParamCopy_
xdata::String injectWorkersUser_
xdata::Double DWdeqWaitTime_