12 #include <toolbox/net/Utils.h>
20 unsigned long instanceNumber) :
21 streamConfigurationChanged_(
false),
22 infospaceRunNumber_(0),
46 boost::mutex::scoped_lock sl(generalMutex_);
47 return diskWriteParamCopy_;
52 boost::mutex::scoped_lock sl(generalMutex_);
58 boost::mutex::scoped_lock sl(generalMutex_);
59 return eventServeParamCopy_;
64 boost::mutex::scoped_lock sl(generalMutex_);
65 return queueConfigParamCopy_;
70 boost::mutex::scoped_lock sl(generalMutex_);
71 return workerThreadParamCopy_;
76 boost::mutex::scoped_lock sl(generalMutex_);
77 return resourceMonitorParamCopy_;
82 boost::mutex::scoped_lock sl(generalMutex_);
83 return alarmParamCopy_;
126 if (ispaceEvent.type() ==
"ItemChangedEvent")
129 dynamic_cast<xdata::ItemChangedEvent&
>(ispaceEvent).itemName();
130 if (item ==
"STparameterSet")
133 std::string tmpStreamConfiguration = smpset.
getAsString();
188 std::ostringstream oss;
189 oss << instanceNumber;
192 std::string tmpString(toolbox::net::getHostName());
195 if (pos != std::string::npos) {
196 std::string basename = tmpString.substr(0,pos);
197 tmpString = basename;
293 infoSpace->fireItemAvailable(
"fileName", &
fileName_);
294 infoSpace->fireItemAvailable(
"filePath", &
filePath_);
295 infoSpace->fireItemAvailable(
"dbFilePath", &
dbFilePath_);
297 infoSpace->fireItemAvailable(
"setupLabel", &
setupLabel_);
298 infoSpace->fireItemAvailable(
"nLogicalDisk", &
nLogicalDisk_);
299 infoSpace->fireItemAvailable(
"maxFileSize", &
maxFileSize_);
303 infoSpace->fireItemAvailable(
"fileClosingTestInterval",
307 infoSpace->fireItemAvailable(
"checkAdler32", &
checkAdler32_);
312 infoSpace->addItemChangedListener(
"STparameterSet",
this);
326 infoSpace->fireItemAvailable(
"collateDQM", &
collateDQM_);
327 infoSpace->fireItemAvailable(
"readyTimeDQM", &
readyTimeDQM_);
346 infoSpace->fireItemAvailable(
"activeConsumerTimeout",
350 infoSpace->fireItemAvailable(
"DQMactiveConsumerTimeout",
352 infoSpace->fireItemAvailable(
"DQMconsumerQueueSize",
416 infoSpace->fireItemAvailable(
"sataUser", &
sataUser_);
422 infoSpace->fireItemAvailable(
"nCopyWorkers", &
nCopyWorkers_);
437 infoSpace->fireItemAvailable(
"errorEvents", &
errorEvents_);
554 if (cfgString ==
"")
return;
557 boost::shared_ptr<edm::ProcessDesc> pdesc = py_pdesc.
processDesc();
558 boost::shared_ptr<edm::ParameterSet> smPSet = pdesc->getProcessPSet();
561 std::vector<std::string> allEndPaths =
562 smPSet->getParameter<std::vector<std::string> >(
"@end_paths");
563 for(std::vector<std::string>::iterator endPathIter = allEndPaths.begin();
564 endPathIter != allEndPaths.end(); ++endPathIter) {
567 std::vector<std::string> anEndPath =
568 smPSet->getParameter<std::vector<std::string> >((*endPathIter));
569 for(std::vector<std::string>::iterator ep2Iter = anEndPath.begin();
570 ep2Iter != anEndPath.end(); ++ep2Iter) {
575 if (! endPathPSet.
empty()) {
576 std::string mod_type =
578 if (mod_type ==
"EventStreamFileWriter") {
582 int maxFileSizeMB = endPathPSet.
getParameter<
int> (
"maxSize");
586 std::string requestedOMLabel =
589 double fractionToDisk =
598 evtCfgList->push_back(cfgInfo);
600 else if (mod_type ==
"ErrorStreamFileWriter" ||
601 mod_type ==
"FRDStreamFileWriter") {
605 int maxFileSizeMB = endPathPSet.
getParameter<
int> (
"maxSize");
609 errCfgList->push_back(cfgInfo);
std::string previousStreamCfg_
void updateLocalAlarmData()
xdata::Double fileSizeTolerance_
xdata::Double monitoringSleepSec_
T getParameter(std::string const &) const
xdata::String copyWorkersUser_
T getUntrackedParameter(std::string const &, T const &) const
xdata::UnsignedInteger32 streamQueueSize_
utils::Duration_t lumiSectionTimeOut_
boost::shared_ptr< ErrStrConfigList > ErrStrConfigListPtr
unsigned int dqmEventQueueMemoryLimitMB_
double fileSizeTolerance_
xdata::Integer nInjectWorkers_
xdata::Integer nLogicalDisk_
unsigned int discardDQMUpdatesForOlderLS_
void setDQMProcessingDefaults()
xdata::Double lumiSectionTimeOut_
void setupDiskWritingInfoSpaceParams(xdata::InfoSpace *infoSpace)
xdata::UnsignedInteger32 throuphputAveragingCycles_
xdata::Integer _DQMactiveConsumerTimeout
xdata::Double DQMEPdeqWaitTime_
bool streamConfigurationHasChanged() const
void updateLocalWorkerThreadData()
xdata::Integer compressionLevelDQM_
Duration_t secondsToDuration(double const &seconds)
unsigned int fragmentQueueMemoryLimitMB_
xdata::Double failHighWaterMark_
xdata::Double FPdeqWaitTime_
xdata::UnsignedInteger32 dqmEventQueueSize_
xdata::Integer maxFileSize_
xdata::Vector< xdata::String > otherDiskPaths_
xdata::Integer nCopyWorkers_
void setupEventServingInfoSpaceParams(xdata::InfoSpace *infoSpace)
xdata::UnsignedInteger32 registrationQueueSize_
unsigned int streamQueueMemoryLimitMB_
OtherDiskPaths otherDiskPaths_
xdata::UnsignedInteger32 fragmentQueueSize_
void updateDiskWritingParams()
unsigned int fragmentQueueSize_
std::string smInstanceString_
xdata::Double highWaterMark_
bool streamConfigurationChanged_
unsigned int fragmentStoreMemoryLimitMB_
void updateLocalDQMProcessingData()
xdata::Boolean isProductionSystem_
void updateLocalDiskWritingData()
ErrStrConfigListPtr getCurrentErrorStreamConfig() const
xdata::UnsignedInteger32 infospaceRunNumber_
struct EventServingParams eventServeParamCopy_
utils::Duration_t monitoringSleepSec_
xdata::Integer _DQMconsumerQueueSize
EvtStrConfigListPtr getCurrentEventStreamConfig() const
boost::mutex evtStrCfgMutex_
boost::mutex generalMutex_
void setupWorkerThreadInfoSpaceParams(xdata::InfoSpace *infoSpace)
std::string consumerQueuePolicy_
xdata::String consumerQueuePolicy_
utils::Duration_t readyTimeDQM_
xdata::UnsignedInteger32 fragmentStoreMemoryLimitMB_
xdata::UnsignedInteger32 fragmentQueueMemoryLimitMB_
std::string _DQMconsumerQueuePolicy
boost::shared_ptr< EvtStrConfigList > EvtStrConfigListPtr
void setWorkerThreadDefaults()
std::string getAsString() const
unsigned int registrationQueueSize_
unsigned int throuphputAveragingCycles_
xdata::Double staleFragmentTimeOut_
boost::mutex errStrCfgMutex_
xdata::String copyWorkersCommand_
struct WorkerThreadParams workerThreadParamCopy_
xdata::UnsignedInteger32 commandQueueSize_
void setupResourceMonitorInfoSpaceParams(xdata::InfoSpace *infoSpace)
utils::Duration_t activeConsumerTimeout_
boost::posix_time::time_duration DQMEPdeqWaitTime_
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
void parseStreamConfiguration(std::string cfgString, EvtStrConfigListPtr evtCfgList, ErrStrConfigListPtr errCfgList)
unsigned int localRunNumber_
xdata::Integer readyTimeDQM_
struct AlarmParams alarmParamCopy_
void getStdVector(xdata::Vector< xdata::String > &, std::vector< std::string > &)
utils::Duration_t staleFragmentTimeOut_
boost::posix_time::time_duration FPdeqWaitTime_
unsigned int unwantedEvents_
xdata::String dbFilePath_
void setQueueConfigurationDefaults()
void setupAlarmInfoSpaceParams(xdata::InfoSpace *infoSpace)
WorkerParams copyWorkers_
struct DQMProcessingParams dqmParamCopy_
void updateLocalRunNumberData()
unsigned int getRunNumber() const
unsigned int dqmEventQueueSize_
double failHighWaterMark_
Configuration(xdata::InfoSpace *infoSpace, unsigned long instanceNumber)
xdata::String injectWorkersCommand_
xdata::Boolean collateDQM_
xdata::String faultyEventsStream_
utils::Duration_t _DQMactiveConsumerTimeout
void setResourceMonitorDefaults()
void updateLocalResourceMonitorData()
xdata::String _DQMconsumerQueuePolicy
boost::shared_ptr< edm::ProcessDesc > processDesc()
std::string faultyEventsStream_
bool careAboutUnwantedEvents_
virtual void actionPerformed(xdata::Event &isEvt)
xdata::Integer activeConsumerTimeout_
void setupDQMProcessingInfoSpaceParams(xdata::InfoSpace *infoSpace)
void getXdataVector(const std::vector< std::string > &, xdata::Vector< xdata::String > &)
WorkerParams injectWorkers_
xdata::UnsignedInteger32 unwantedEvents_
xdata::UnsignedInteger32 streamQueueMemoryLimitMB_
xdata::UnsignedInteger32 errorEvents_
void setCurrentErrorStreamConfig(ErrStrConfigListPtr cfgList)
utils::Duration_t fileClosingTestInterval_
void setCurrentEventStreamConfig(EvtStrConfigListPtr cfgList)
xdata::Integer consumerQueueSize_
void setEventServingDefaults()
unsigned int errorEvents_
xdata::Boolean careAboutUnwantedEvents_
xdata::Boolean checkAdler32_
boost::posix_time::time_duration DWdeqWaitTime_
double durationToSeconds(Duration_t const &)
unsigned int commandQueueSize_
void updateLocalQueueConfigurationData()
int _DQMconsumerQueueSize
xdata::String streamConfiguration_
xdata::UnsignedInteger32 dqmEventQueueMemoryLimitMB_
xdata::String setupLabel_
std::string streamConfiguration_
void updateLocalEventServingData()
EvtStrConfigListPtr currentEventStreamConfig_
xdata::UnsignedInteger32 discardDQMUpdatesForOlderLS_
xdata::Integer fileClosingTestInterval_
xdata::Boolean useCompressionDQM_
void setupQueueConfigurationInfoSpaceParams(xdata::InfoSpace *infoSpace)
struct ResourceMonitorParams resourceMonitorParamCopy_
struct DiskWritingParams diskWriteParamCopy_
ErrStrConfigListPtr currentErrorStreamConfig_
unsigned int streamQueueSize_
void setDiskWritingDefaults(unsigned long instanceNumber)
struct QueueConfigurationParams queueConfigParamCopy_
xdata::String injectWorkersUser_
xdata::Double DWdeqWaitTime_