CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/StorageManager/src/RegistrationInfoBase.cc

Go to the documentation of this file.
00001 // $Id: RegistrationInfoBase.cc,v 1.4 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h"
00005 #include "EventFilter/StorageManager/interface/Exception.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 
00008 using std::string;
00009 
00010 namespace stor
00011 {
00012   RegistrationInfoBase::RegistrationInfoBase
00013   (
00014     const std::string& consumerName,
00015     const std::string& remoteHost,
00016     const int& queueSize,
00017     const enquing_policy::PolicyTag& queuePolicy,
00018     const utils::Duration_t& secondsToStale
00019   ) :
00020   remoteHost_(remoteHost),
00021   consumerName_(consumerName),
00022   queueSize_(queueSize),
00023   queuePolicy_(queuePolicy),
00024   secondsToStale_(secondsToStale),
00025   consumerId_(0),
00026   lastConsumerContact_(utils::getCurrentTime())
00027   { }
00028 
00029   RegistrationInfoBase::RegistrationInfoBase
00030   (
00031     const edm::ParameterSet& pset,
00032     const std::string& remoteHost,
00033     const EventServingParams& eventServingParams,
00034     const bool useEventServingParams
00035   ) :
00036   remoteHost_(remoteHost),
00037   consumerId_(0),
00038   lastConsumerContact_(utils::getCurrentTime())
00039   {
00040     try
00041     {
00042       consumerName_ = pset.getUntrackedParameter<std::string>("consumerName");
00043     }
00044     catch( edm::Exception& e )
00045     {
00046       consumerName_ = pset.getUntrackedParameter<std::string>("DQMconsumerName", "Unknown");
00047     }
00048 
00049     try
00050     {
00051       sourceURL_ = pset.getParameter<std::string>("sourceURL");
00052     }
00053     catch( edm::Exception& e )
00054     {
00055       sourceURL_ = pset.getUntrackedParameter<std::string>("sourceURL", "Unknown");
00056     }
00057 
00058     const double maxEventRequestRate = pset.getUntrackedParameter<double>("maxEventRequestRate", 0);
00059     if ( maxEventRequestRate > 0 )
00060       minEventRequestInterval_ = utils::secondsToDuration(1 / maxEventRequestRate);
00061     else
00062       minEventRequestInterval_ = boost::posix_time::not_a_date_time;
00063 
00064     maxConnectTries_ = pset.getUntrackedParameter<int>("maxConnectTries", 300);
00065 
00066     connectTrySleepTime_ = pset.getUntrackedParameter<int>("connectTrySleepTime", 10);
00067 
00068     retryInterval_ = pset.getUntrackedParameter<int>("retryInterval", 5);
00069 
00070     queueSize_ = pset.getUntrackedParameter<int>("queueSize",
00071       useEventServingParams ? eventServingParams.consumerQueueSize_ : 0);
00072 
00073     const std::string policy =
00074       pset.getUntrackedParameter<std::string>("queuePolicy",
00075         useEventServingParams ? eventServingParams.consumerQueuePolicy_ : "Default");
00076     if ( policy == "DiscardNew" )
00077     {
00078       queuePolicy_ = enquing_policy::DiscardNew;
00079     }
00080     else if ( policy == "DiscardOld" )
00081     {
00082       queuePolicy_ = enquing_policy::DiscardOld;
00083     }
00084     else if ( policy == "Default" )
00085     {
00086       queuePolicy_ = enquing_policy::Max;
00087     }
00088     else
00089     {
00090       XCEPT_RAISE( stor::exception::ConsumerRegistration,
00091         "Unknown enqueuing policy: " + policy );
00092     }
00093 
00094     secondsToStale_ = utils::secondsToDuration(
00095       pset.getUntrackedParameter<double>("consumerTimeOut", 0)
00096     );
00097     if ( useEventServingParams && secondsToStale_ < boost::posix_time::seconds(1) )
00098       secondsToStale_ = eventServingParams.activeConsumerTimeout_;
00099   }
00100 
00101   edm::ParameterSet RegistrationInfoBase::getPSet() const
00102   {
00103     edm::ParameterSet pset;
00104 
00105     if ( consumerName_ != "Unknown" )
00106       pset.addUntrackedParameter<std::string>("consumerName", consumerName_);
00107 
00108     if ( sourceURL_  != "Unknown" )
00109       pset.addParameter<std::string>("sourceURL", sourceURL_);
00110 
00111     if ( maxConnectTries_ != 300 )
00112       pset.addUntrackedParameter<int>("maxConnectTries", maxConnectTries_);
00113     
00114     if ( connectTrySleepTime_ != 10 )
00115       pset.addUntrackedParameter<int>("connectTrySleepTime", connectTrySleepTime_);
00116 
00117     if ( retryInterval_ != 5 )
00118       pset.addUntrackedParameter<int>("retryInterval", retryInterval_);
00119     
00120     if ( queueSize_ > 0 )
00121       pset.addUntrackedParameter<int>("queueSize", queueSize_);
00122     
00123     if ( ! minEventRequestInterval_.is_not_a_date_time() )
00124     {
00125       const double rate = 1 / utils::durationToSeconds(minEventRequestInterval_);
00126       pset.addUntrackedParameter<double>("maxEventRequestRate", rate);
00127     }
00128 
00129     const double secondsToStale = utils::durationToSeconds(secondsToStale_);
00130     if ( secondsToStale > 0 )
00131       pset.addUntrackedParameter<double>("consumerTimeOut", secondsToStale);
00132 
00133     if ( queuePolicy_ == enquing_policy::DiscardNew )
00134       pset.addUntrackedParameter<std::string>("queuePolicy", "DiscardNew");
00135     if ( queuePolicy_ == enquing_policy::DiscardOld )
00136       pset.addUntrackedParameter<std::string>("queuePolicy", "DiscardOld");
00137 
00138     do_appendToPSet(pset);
00139 
00140     return pset;
00141   }
00142 
00143   bool RegistrationInfoBase::operator<(const RegistrationInfoBase& other) const
00144   {
00145     if ( queueSize() != other.queueSize() )
00146       return ( queueSize() < other.queueSize() );
00147     if ( queuePolicy() != other.queuePolicy() )
00148       return ( queuePolicy() < other.queuePolicy() );
00149     return ( secondsToStale() < other.secondsToStale() );
00150   }
00151 
00152   bool RegistrationInfoBase::operator==(const RegistrationInfoBase& other) const
00153   {
00154     return (
00155       queueSize() == other.queueSize() &&
00156       queuePolicy() == other.queuePolicy() &&
00157       secondsToStale() == other.secondsToStale()
00158     );
00159   }
00160 
00161   bool RegistrationInfoBase::operator!=(const RegistrationInfoBase& other) const
00162   {
00163     return ! ( *this == other );
00164   }
00165 
00166   void RegistrationInfoBase::queueInfo(std::ostream& os) const
00167   {
00168     os << "Queue type: " << queuePolicy_ <<
00169       ", size " << queueSize_ << 
00170       ", timeout " << secondsToStale_.total_seconds() << "s";
00171   }
00172 
00173   std::ostream& operator<< (std::ostream& os,
00174                             RegistrationInfoBase const& ri)
00175   {
00176     os << "\n Consumer name: " << ri.consumerName()
00177       << "\n Consumer id: " << ri.consumerId()
00178       << "\n Source URL: " << ri.sourceURL()
00179       << "\n Remote Host: " << ri.remoteHost()
00180       << "\n Queue id: " << ri.queueId()
00181       << "\n Maximum size of queue: " << ri.queueSize()
00182       << "\n Policy used if queue is full: " << ri.queuePolicy()
00183       << "\n Time until queue becomes stale (seconds): " << ri.secondsToStale().total_seconds();
00184     return os;
00185   }
00186 }
00187 
00188