CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RegistrationInfoBase.cc
Go to the documentation of this file.
1 // $Id: RegistrationInfoBase.cc,v 1.3.4.2 2011/03/07 11:33:05 mommsen Exp $
3 
7 
8 using std::string;
9 
10 namespace stor
11 {
13  (
14  const std::string& consumerName,
15  const std::string& remoteHost,
16  const int& queueSize,
17  const enquing_policy::PolicyTag& queuePolicy,
18  const utils::Duration_t& secondsToStale
19  ) :
20  remoteHost_(remoteHost),
21  consumerName_(consumerName),
22  queueSize_(queueSize),
23  queuePolicy_(queuePolicy),
24  secondsToStale_(secondsToStale),
25  consumerId_(0),
26  lastConsumerContact_(utils::getCurrentTime())
27  { }
28 
30  (
31  const edm::ParameterSet& pset,
32  const std::string& remoteHost,
33  const EventServingParams& eventServingParams,
34  const bool useEventServingParams
35  ) :
36  remoteHost_(remoteHost),
37  consumerId_(0),
38  lastConsumerContact_(utils::getCurrentTime())
39  {
40  try
41  {
42  consumerName_ = pset.getUntrackedParameter<std::string>("consumerName");
43  }
44  catch( edm::Exception& e )
45  {
46  consumerName_ = pset.getUntrackedParameter<std::string>("DQMconsumerName", "Unknown");
47  }
48 
49  try
50  {
51  sourceURL_ = pset.getParameter<std::string>("sourceURL");
52  }
53  catch( edm::Exception& e )
54  {
55  sourceURL_ = pset.getUntrackedParameter<std::string>("sourceURL", "Unknown");
56  }
57 
58  const double maxEventRequestRate = pset.getUntrackedParameter<double>("maxEventRequestRate", 0);
59  if ( maxEventRequestRate > 0 )
60  minEventRequestInterval_ = utils::secondsToDuration(1 / maxEventRequestRate);
61  else
62  minEventRequestInterval_ = boost::posix_time::not_a_date_time;
63 
64  maxConnectTries_ = pset.getUntrackedParameter<int>("maxConnectTries", 300);
65 
66  connectTrySleepTime_ = pset.getUntrackedParameter<int>("connectTrySleepTime", 10);
67 
68  retryInterval_ = pset.getUntrackedParameter<int>("retryInterval", 5);
69 
70  queueSize_ = pset.getUntrackedParameter<int>("queueSize",
71  useEventServingParams ? eventServingParams.consumerQueueSize_ : 0);
72 
73  const std::string policy =
74  pset.getUntrackedParameter<std::string>("queuePolicy",
75  useEventServingParams ? eventServingParams.consumerQueuePolicy_ : "Default");
76  if ( policy == "DiscardNew" )
77  {
78  queuePolicy_ = enquing_policy::DiscardNew;
79  }
80  else if ( policy == "DiscardOld" )
81  {
82  queuePolicy_ = enquing_policy::DiscardOld;
83  }
84  else if ( policy == "Default" )
85  {
86  queuePolicy_ = enquing_policy::Max;
87  }
88  else
89  {
90  XCEPT_RAISE( stor::exception::ConsumerRegistration,
91  "Unknown enqueuing policy: " + policy );
92  }
93 
94  secondsToStale_ = utils::secondsToDuration(
95  pset.getUntrackedParameter<double>("consumerTimeOut", 0)
96  );
97  if ( useEventServingParams && secondsToStale_ < boost::posix_time::seconds(1) )
98  secondsToStale_ = eventServingParams.activeConsumerTimeout_;
99  }
100 
102  {
104 
105  if ( consumerName_ != "Unknown" )
106  pset.addUntrackedParameter<std::string>("consumerName", consumerName_);
107 
108  if ( sourceURL_ != "Unknown" )
109  pset.addParameter<std::string>("sourceURL", sourceURL_);
110 
111  if ( maxConnectTries_ != 300 )
112  pset.addUntrackedParameter<int>("maxConnectTries", maxConnectTries_);
113 
114  if ( connectTrySleepTime_ != 10 )
115  pset.addUntrackedParameter<int>("connectTrySleepTime", connectTrySleepTime_);
116 
117  if ( retryInterval_ != 5 )
118  pset.addUntrackedParameter<int>("retryInterval", retryInterval_);
119 
120  if ( queueSize_ > 0 )
121  pset.addUntrackedParameter<int>("queueSize", queueSize_);
122 
123  if ( ! minEventRequestInterval_.is_not_a_date_time() )
124  {
125  const double rate = 1 / utils::durationToSeconds(minEventRequestInterval_);
126  pset.addUntrackedParameter<double>("maxEventRequestRate", rate);
127  }
128 
129  const double secondsToStale = utils::durationToSeconds(secondsToStale_);
130  if ( secondsToStale > 0 )
131  pset.addUntrackedParameter<double>("consumerTimeOut", secondsToStale);
132 
134  pset.addUntrackedParameter<std::string>("queuePolicy", "DiscardNew");
136  pset.addUntrackedParameter<std::string>("queuePolicy", "DiscardOld");
137 
138  do_appendToPSet(pset);
139 
140  return pset;
141  }
142 
144  {
145  if ( queueSize() != other.queueSize() )
146  return ( queueSize() < other.queueSize() );
147  if ( queuePolicy() != other.queuePolicy() )
148  return ( queuePolicy() < other.queuePolicy() );
149  return ( secondsToStale() < other.secondsToStale() );
150  }
151 
153  {
154  return (
155  queueSize() == other.queueSize() &&
156  queuePolicy() == other.queuePolicy() &&
157  secondsToStale() == other.secondsToStale()
158  );
159  }
160 
162  {
163  return ! ( *this == other );
164  }
165 
166  void RegistrationInfoBase::queueInfo(std::ostream& os) const
167  {
168  os << "Queue type: " << queuePolicy_ <<
169  ", size " << queueSize_ <<
170  ", timeout " << secondsToStale_.total_seconds() << "s";
171  }
172 
173  std::ostream& operator<< (std::ostream& os,
174  RegistrationInfoBase const& ri)
175  {
176  os << "\n Consumer name: " << ri.consumerName()
177  << "\n Consumer id: " << ri.consumerId()
178  << "\n Source URL: " << ri.sourceURL()
179  << "\n Remote Host: " << ri.remoteHost()
180  << "\n Queue id: " << ri.queueId()
181  << "\n Maximum size of queue: " << ri.queueSize()
182  << "\n Policy used if queue is full: " << ri.queuePolicy()
183  << "\n Time until queue becomes stale (seconds): " << ri.secondsToStale().total_seconds();
184  return os;
185  }
186 }
187 
188 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
const QueueID & queueId() const
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
double seconds()
const std::string & consumerName() const
Duration_t secondsToDuration(double const &seconds)
Definition: Utils.h:140
utils::Duration_t minEventRequestInterval_
virtual void do_appendToPSet(edm::ParameterSet &) const =0
virtual bool operator!=(const RegistrationInfoBase &) const
edm::ParameterSet getPSet() const
const std::string & sourceURL() const
void queueInfo(std::ostream &) const
std::string consumerQueuePolicy_
Definition: Configuration.h:77
virtual bool operator==(const RegistrationInfoBase &) const
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
RegistrationInfoBase(const std::string &consumerName, const std::string &remoteHost, const int &queueSize, const enquing_policy::PolicyTag &queuePolicy, const utils::Duration_t &secondsToStale)
std::ostream & operator<<(std::ostream &os, ConsumerID id)
Definition: ConsumerID.h:69
utils::Duration_t activeConsumerTimeout_
Definition: Configuration.h:75
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:139
const int & queueSize() const
void addUntrackedParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:203
const std::string & remoteHost() const
const enquing_policy::PolicyTag & queuePolicy() const
enquing_policy::PolicyTag queuePolicy_
const utils::Duration_t & secondsToStale() const
double durationToSeconds(Duration_t const &)
Definition: Utils.h:147
virtual bool operator<(const RegistrationInfoBase &) const
const ConsumerID & consumerId() const