Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h"
00005 #include "EventFilter/StorageManager/interface/Exception.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007
00008 using std::string;
00009
00010 namespace stor
00011 {
00012 RegistrationInfoBase::RegistrationInfoBase
00013 (
00014 const std::string& consumerName,
00015 const std::string& remoteHost,
00016 const int& queueSize,
00017 const enquing_policy::PolicyTag& queuePolicy,
00018 const utils::Duration_t& secondsToStale
00019 ) :
00020 remoteHost_(remoteHost),
00021 consumerName_(consumerName),
00022 queueSize_(queueSize),
00023 queuePolicy_(queuePolicy),
00024 secondsToStale_(secondsToStale),
00025 consumerId_(0),
00026 lastConsumerContact_(utils::getCurrentTime())
00027 { }
00028
00029 RegistrationInfoBase::RegistrationInfoBase
00030 (
00031 const edm::ParameterSet& pset,
00032 const std::string& remoteHost,
00033 const EventServingParams& eventServingParams,
00034 const bool useEventServingParams
00035 ) :
00036 remoteHost_(remoteHost),
00037 consumerId_(0),
00038 lastConsumerContact_(utils::getCurrentTime())
00039 {
00040 try
00041 {
00042 consumerName_ = pset.getUntrackedParameter<std::string>("consumerName");
00043 }
00044 catch( edm::Exception& e )
00045 {
00046 consumerName_ = pset.getUntrackedParameter<std::string>("DQMconsumerName", "Unknown");
00047 }
00048
00049 try
00050 {
00051 sourceURL_ = pset.getParameter<std::string>("sourceURL");
00052 }
00053 catch( edm::Exception& e )
00054 {
00055 sourceURL_ = pset.getUntrackedParameter<std::string>("sourceURL", "Unknown");
00056 }
00057
00058 const double maxEventRequestRate = pset.getUntrackedParameter<double>("maxEventRequestRate", 0);
00059 if ( maxEventRequestRate > 0 )
00060 minEventRequestInterval_ = utils::secondsToDuration(1 / maxEventRequestRate);
00061 else
00062 minEventRequestInterval_ = boost::posix_time::not_a_date_time;
00063
00064 maxConnectTries_ = pset.getUntrackedParameter<int>("maxConnectTries", 300);
00065
00066 connectTrySleepTime_ = pset.getUntrackedParameter<int>("connectTrySleepTime", 10);
00067
00068 retryInterval_ = pset.getUntrackedParameter<int>("retryInterval", 5);
00069
00070 queueSize_ = pset.getUntrackedParameter<int>("queueSize",
00071 useEventServingParams ? eventServingParams.consumerQueueSize_ : 0);
00072
00073 const std::string policy =
00074 pset.getUntrackedParameter<std::string>("queuePolicy",
00075 useEventServingParams ? eventServingParams.consumerQueuePolicy_ : "Default");
00076 if ( policy == "DiscardNew" )
00077 {
00078 queuePolicy_ = enquing_policy::DiscardNew;
00079 }
00080 else if ( policy == "DiscardOld" )
00081 {
00082 queuePolicy_ = enquing_policy::DiscardOld;
00083 }
00084 else if ( policy == "Default" )
00085 {
00086 queuePolicy_ = enquing_policy::Max;
00087 }
00088 else
00089 {
00090 XCEPT_RAISE( stor::exception::ConsumerRegistration,
00091 "Unknown enqueuing policy: " + policy );
00092 }
00093
00094 secondsToStale_ = utils::secondsToDuration(
00095 pset.getUntrackedParameter<double>("consumerTimeOut", 0)
00096 );
00097 if ( useEventServingParams && secondsToStale_ < boost::posix_time::seconds(1) )
00098 secondsToStale_ = eventServingParams.activeConsumerTimeout_;
00099 }
00100
00101 edm::ParameterSet RegistrationInfoBase::getPSet() const
00102 {
00103 edm::ParameterSet pset;
00104
00105 if ( consumerName_ != "Unknown" )
00106 pset.addUntrackedParameter<std::string>("consumerName", consumerName_);
00107
00108 if ( sourceURL_ != "Unknown" )
00109 pset.addParameter<std::string>("sourceURL", sourceURL_);
00110
00111 if ( maxConnectTries_ != 300 )
00112 pset.addUntrackedParameter<int>("maxConnectTries", maxConnectTries_);
00113
00114 if ( connectTrySleepTime_ != 10 )
00115 pset.addUntrackedParameter<int>("connectTrySleepTime", connectTrySleepTime_);
00116
00117 if ( retryInterval_ != 5 )
00118 pset.addUntrackedParameter<int>("retryInterval", retryInterval_);
00119
00120 if ( queueSize_ > 0 )
00121 pset.addUntrackedParameter<int>("queueSize", queueSize_);
00122
00123 if ( ! minEventRequestInterval_.is_not_a_date_time() )
00124 {
00125 const double rate = 1 / utils::durationToSeconds(minEventRequestInterval_);
00126 pset.addUntrackedParameter<double>("maxEventRequestRate", rate);
00127 }
00128
00129 const double secondsToStale = utils::durationToSeconds(secondsToStale_);
00130 if ( secondsToStale > 0 )
00131 pset.addUntrackedParameter<double>("consumerTimeOut", secondsToStale);
00132
00133 if ( queuePolicy_ == enquing_policy::DiscardNew )
00134 pset.addUntrackedParameter<std::string>("queuePolicy", "DiscardNew");
00135 if ( queuePolicy_ == enquing_policy::DiscardOld )
00136 pset.addUntrackedParameter<std::string>("queuePolicy", "DiscardOld");
00137
00138 do_appendToPSet(pset);
00139
00140 return pset;
00141 }
00142
00143 bool RegistrationInfoBase::operator<(const RegistrationInfoBase& other) const
00144 {
00145 if ( queueSize() != other.queueSize() )
00146 return ( queueSize() < other.queueSize() );
00147 if ( queuePolicy() != other.queuePolicy() )
00148 return ( queuePolicy() < other.queuePolicy() );
00149 return ( secondsToStale() < other.secondsToStale() );
00150 }
00151
00152 bool RegistrationInfoBase::operator==(const RegistrationInfoBase& other) const
00153 {
00154 return (
00155 queueSize() == other.queueSize() &&
00156 queuePolicy() == other.queuePolicy() &&
00157 secondsToStale() == other.secondsToStale()
00158 );
00159 }
00160
00161 bool RegistrationInfoBase::operator!=(const RegistrationInfoBase& other) const
00162 {
00163 return ! ( *this == other );
00164 }
00165
00166 void RegistrationInfoBase::queueInfo(std::ostream& os) const
00167 {
00168 os << "Queue type: " << queuePolicy_ <<
00169 ", size " << queueSize_ <<
00170 ", timeout " << secondsToStale_.total_seconds() << "s";
00171 }
00172
00173 std::ostream& operator<< (std::ostream& os,
00174 RegistrationInfoBase const& ri)
00175 {
00176 os << "\n Consumer name: " << ri.consumerName()
00177 << "\n Consumer id: " << ri.consumerId()
00178 << "\n Source URL: " << ri.sourceURL()
00179 << "\n Remote Host: " << ri.remoteHost()
00180 << "\n Queue id: " << ri.queueId()
00181 << "\n Maximum size of queue: " << ri.queueSize()
00182 << "\n Policy used if queue is full: " << ri.queuePolicy()
00183 << "\n Time until queue becomes stale (seconds): " << ri.secondsToStale().total_seconds();
00184 return os;
00185 }
00186 }
00187
00188