CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
QueueCollection.h
Go to the documentation of this file.
1 // $Id: QueueCollection.h,v 1.11.2.1 2011/03/07 11:33:04 mommsen Exp $
3 
4 #ifndef EventFilter_StorageManager_QueueCollection_h
5 #define EventFilter_StorageManager_QueueCollection_h
6 
7 #include <vector>
8 #include <limits>
9 
10 #include "boost/bind.hpp"
11 #include "boost/thread/mutex.hpp"
12 #include "boost/shared_ptr.hpp"
13 
15 
26 
27 
28 namespace stor {
29 
43  template <class T>
45  {
46  public:
49 
54 
59  void setExpirationInterval(const QueueID&, const utils::Duration_t&);
61 
68  (
69  const EventConsRegPtr,
71  );
73  (
74  const RegPtr,
76  );
77 
82  void removeQueues();
83 
87  SizeType size() const;
88 
92  void addEvent(T const&);
93 
99  ValueType popEvent(const QueueID&);
100 
106  ValueType popEvent(const ConsumerID&);
107 
111  void clearQueue(const QueueID&);
112 
117 
121  void clearQueues();
122 
126  bool empty(const QueueID&) const;
127 
131  bool full(const QueueID&) const;
132 
137  bool stale(const QueueID&, const utils::TimePoint_t&) const;
138 
142  bool allQueuesStale(const utils::TimePoint_t&) const;
143 
147  SizeType size(const QueueID&) const;
148 
149 
150  private:
153 
154  typedef boost::shared_ptr<ExpirableDiscardNewQueue_t>
156  typedef boost::shared_ptr<ExpirableDiscardOldQueue_t>
158 
159  // These typedefs need to be changed when we move to Boost 1.38
160  typedef boost::mutex::scoped_lock ReadLock_t;
161  typedef boost::mutex::scoped_lock WriteLock_t;
163 
164  // It is possible that one mutex would be better than these
165  // three. Only profiling the application will tell for sure.
169 
170  typedef std::vector<ExpirableDiscardNewQueuePtr> DiscardNewQueues_t;
172  typedef std::vector<ExpirableDiscardOldQueuePtr> DiscardOldQueues_t;
174 
175  typedef std::map<ConsumerID, QueueID> IDLookup_t;
182 
183  /*
184  These functions are declared private and not implemented to
185  prevent their use.
186  */
189 
190  /*
191  These are helper functions used in the implementation.
192  */
193 
194  SizeType enqueueEvent_(QueueID const&, T const&, utils::TimePoint_t const&);
195  QueueID getQueue(const RegPtr, const utils::TimePoint_t&);
196 
197  };
198 
199  //------------------------------------------------------------------
200  // Implementation follows
201  //------------------------------------------------------------------
202 
209  namespace
210  {
211  void throwUnknownQueueid(const QueueID& id)
212  {
213  std::ostringstream msg;
214  msg << "Unable to retrieve queue with signature: ";
215  msg << id;
216  XCEPT_RAISE(exception::UnknownQueueId, msg.str());
217  }
218  } // anonymous namespace
219 
220  template <class T>
222  protectDiscardNewQueues_(),
223  protectDiscardOldQueues_(),
224  protectLookup_(),
225  discardNewQueues_(),
226  discardOldQueues_(),
227  queueIdLookup_(),
228  consumerMonitorCollection_( ccp )
229  { }
230 
231  template <class T>
232  void
234  const QueueID& id,
236  )
237  {
238  switch (id.policy())
239  {
241  {
242  ReadLock_t lock(protectDiscardNewQueues_);
243  try
244  {
245  discardNewQueues_.at(id.index())->setStalenessInterval(interval);
246  }
247  catch(std::out_of_range)
248  {
249  throwUnknownQueueid(id);
250  }
251  break;
252  }
254  {
255  ReadLock_t lock(protectDiscardOldQueues_);
256  try
257  {
258  discardOldQueues_.at(id.index())->setStalenessInterval(interval);
259  }
260  catch(std::out_of_range)
261  {
262  throwUnknownQueueid(id);
263  }
264  break;
265  }
266  default:
267  {
268  throwUnknownQueueid(id);
269  // does not return, no break needed
270  }
271  }
272  }
273 
274  template <class T>
277  {
279  switch (id.policy())
280  {
282  {
283  ReadLock_t lock(protectDiscardNewQueues_);
284  try
285  {
286  result = discardNewQueues_.at(id.index())->stalenessInterval();
287  }
288  catch(std::out_of_range)
289  {
290  throwUnknownQueueid(id);
291  }
292  break;
293  }
295  {
296  ReadLock_t lock(protectDiscardOldQueues_);
297  try
298  {
299  result = discardOldQueues_.at(id.index())->stalenessInterval();
300  }
301  catch(std::out_of_range)
302  {
303  throwUnknownQueueid(id);
304  }
305  break;
306  }
307  default:
308  {
309  throwUnknownQueueid(id);
310  // does not return, no break needed
311  }
312  }
313  return result;
314  }
315 
316  template <class T>
317  QueueID
319  (
320  const EventConsRegPtr reginfo,
321  const utils::TimePoint_t& now
322  )
323  {
324  QueueID qid;
325  const ConsumerID& cid = reginfo->consumerId();
326 
327  // We don't proceed if the given ConsumerID is invalid, or if
328  // we've already seen that value before.
329  if (!cid.isValid()) return qid;
330  WriteLock_t lockLookup(protectLookup_);
331  if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
332 
333  if ( reginfo->uniqueEvents() )
334  {
335  // another consumer wants to share the
336  // queue to get unique events.
337  ReginfoLookup_t::const_iterator it =
338  queueReginfoLookup_.find(reginfo);
339  if ( it != queueReginfoLookup_.end() )
340  {
341  qid = it->second;
342  queueIdLookup_[cid] = qid;
343  return qid;
344  }
345  }
346 
347  qid = getQueue(reginfo, now);
348  queueIdLookup_[cid] = qid;
349  queueReginfoLookup_[reginfo] = qid;
350  return qid;
351  }
352 
353  template <class T>
354  QueueID
356  (
357  const RegPtr reginfo,
358  const utils::TimePoint_t& now
359  )
360  {
361  QueueID qid;
362  const ConsumerID& cid = reginfo->consumerId();
363 
364  // We don't proceed if the given ConsumerID is invalid, or if
365  // we've already seen that value before.
366  if (!cid.isValid()) return qid;
367  WriteLock_t lockLookup(protectLookup_);
368  if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
369  qid = getQueue(reginfo, now);
370  queueIdLookup_[cid] = qid;
371  return qid;
372  }
373 
374  template <class T>
375  QueueID
377  (
378  const RegPtr reginfo,
379  const utils::TimePoint_t& now
380  )
381  {
382  if (reginfo->queuePolicy() == enquing_policy::DiscardNew)
383  {
384  WriteLock_t lock(protectDiscardNewQueues_);
387  reginfo->queueSize(),
388  reginfo->secondsToStale(),
389  now
390  )
391  );
392  discardNewQueues_.push_back(newborn);
393  return QueueID(
395  discardNewQueues_.size()-1
396  );
397  }
398  else if (reginfo->queuePolicy() == enquing_policy::DiscardOld)
399  {
400  WriteLock_t lock(protectDiscardOldQueues_);
403  reginfo->queueSize(),
404  reginfo->secondsToStale(),
405  now
406  )
407  );
408  discardOldQueues_.push_back(newborn);
409  return QueueID(
411  discardOldQueues_.size()-1
412  );
413  }
414  return QueueID();
415  }
416 
417  template <class T>
418  void
420  {
421  clearQueues();
422 
423  WriteLock_t lockDiscardNew(protectDiscardNewQueues_);
424  WriteLock_t lockDiscardOld(protectDiscardOldQueues_);
425  discardNewQueues_.clear();
426  discardOldQueues_.clear();
427 
428  WriteLock_t lockLookup(protectLookup_);
429  queueIdLookup_.clear();
430  }
431 
432  template <class T>
435  {
436  // We obtain locks not because it is unsafe to read the sizes
437  // without locking, but because we want consistent values.
438  ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
439  ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
440  return discardNewQueues_.size() + discardOldQueues_.size();
441  }
442 
443  template <class T>
444  void
446  {
447  ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
448  ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
449 
451  QueueIDs routes = event.getEventConsumerTags();
452 
453  for( QueueIDs::const_iterator it = routes.begin(), itEnd = routes.end();
454  it != itEnd; ++it )
455  {
456  const SizeType droppedEvents = enqueueEvent_( *it, event, now );
457  if ( droppedEvents > 0 )
458  consumerMonitorCollection_.addDroppedEvents( *it, droppedEvents );
459  else
460  consumerMonitorCollection_.addQueuedEventSample( *it, event.totalDataSize() );
461  }
462  }
463 
464  template <class T>
467  {
469  switch (id.policy())
470  {
472  {
473  ReadLock_t lock(protectDiscardNewQueues_);
474  try
475  {
476  if ( discardNewQueues_.at(id.index())->deqNowait(result) )
477  consumerMonitorCollection_.addServedEventSample(id,
478  result.first.totalDataSize());
479  }
480  catch(std::out_of_range)
481  {
482  throwUnknownQueueid(id);
483  }
484  break;
485  }
487  {
488  ReadLock_t lock(protectDiscardOldQueues_);
489  try
490  {
491  if ( discardOldQueues_.at(id.index())->deqNowait(result) )
492  consumerMonitorCollection_.addServedEventSample(id,
493  result.first.totalDataSize());
494  }
495  catch(std::out_of_range)
496  {
497  throwUnknownQueueid(id);
498  }
499  break;
500  }
501  default:
502  {
503  throwUnknownQueueid(id);
504  // does not return, no break needed
505  }
506  }
507  return result;
508  }
509 
510  template <class T>
513  {
515  if (!cid.isValid()) return result;
516  QueueID id;
517  {
518  // Scope to control lifetime of lock.
519  ReadLock_t lock(protectLookup_);
520  IDLookup_t::const_iterator i = queueIdLookup_.find(cid);
521  if (i == queueIdLookup_.end()) return result;
522  id = i->second;
523  }
524  return popEvent(id);
525  }
526 
527  template <class T>
528  void
530  {
531  switch (id.policy())
532  {
534  {
535  ReadLock_t lock(protectDiscardNewQueues_);
536  try
537  {
538  const SizeType clearedEvents =
539  discardNewQueues_.at(id.index())->clear();
540 
541  consumerMonitorCollection_.addDroppedEvents(
542  id, clearedEvents);
543  }
544  catch(std::out_of_range)
545  {
546  throwUnknownQueueid(id);
547  }
548  break;
549  }
551  {
552  ReadLock_t lock(protectDiscardOldQueues_);
553  try
554  {
555  const SizeType clearedEvents =
556  discardOldQueues_.at(id.index())->clear();
557 
558  consumerMonitorCollection_.addDroppedEvents(
559  id, clearedEvents);
560  }
561  catch(std::out_of_range)
562  {
563  throwUnknownQueueid(id);
564  }
565  break;
566  }
567  default:
568  {
569  throwUnknownQueueid(id);
570  // does not return, no break needed
571  }
572  }
573  }
574 
575  template <class T>
576  bool
578  {
579  bool result(false);
580  SizeType clearedEvents;
581 
582  {
583  ReadLock_t lock(protectDiscardNewQueues_);
584  const SizeType numQueues = discardNewQueues_.size();
585  for (SizeType i = 0; i < numQueues; ++i)
586  {
587  if ( discardNewQueues_[i]->clearIfStale(now, clearedEvents) )
588  {
589  consumerMonitorCollection_.addDroppedEvents(
591  clearedEvents
592  );
593  result = true;
594  }
595  }
596  }
597  {
598  ReadLock_t lock(protectDiscardOldQueues_);
599  const SizeType numQueues = discardOldQueues_.size();
600  for (SizeType i = 0; i < numQueues; ++i)
601  {
602  if ( discardOldQueues_[i]->clearIfStale(now, clearedEvents) )
603  {
604  consumerMonitorCollection_.addDroppedEvents(
606  clearedEvents
607  );
608  result = true;
609  }
610  }
611  }
612  return result;
613  }
614 
615  template <class T>
616  void
618  {
619  {
620  ReadLock_t lock(protectDiscardNewQueues_);
621  const SizeType numQueues = discardNewQueues_.size();
622  for (SizeType i = 0; i < numQueues; ++i)
623  {
624  const SizeType clearedEvents =
625  discardNewQueues_[i]->clear();
626 
627  consumerMonitorCollection_.addDroppedEvents(
629  clearedEvents
630  );
631  }
632  }
633  {
634  ReadLock_t lock(protectDiscardOldQueues_);
635  const SizeType numQueues = discardOldQueues_.size();
636  for (SizeType i = 0; i < numQueues; ++i)
637  {
638  const SizeType clearedEvents =
639  discardOldQueues_[i]->clear();
640 
641  consumerMonitorCollection_.addDroppedEvents(
643  clearedEvents
644  );
645  }
646  }
647  }
648 
649  template <class T>
650  bool
652  {
653  bool result(true);
654  switch (id.policy())
655  {
657  {
658  ReadLock_t lock(protectDiscardNewQueues_);
659  try
660  {
661  result = discardNewQueues_.at(id.index())->empty();
662  }
663  catch(std::out_of_range)
664  {
665  throwUnknownQueueid(id);
666  }
667  break;
668  }
670  {
671  ReadLock_t lock(protectDiscardOldQueues_);
672  try
673  {
674  result = discardOldQueues_.at(id.index())->empty();
675  }
676  catch(std::out_of_range)
677  {
678  throwUnknownQueueid(id);
679  }
680  break;
681  }
682  default:
683  {
684  throwUnknownQueueid(id);
685  // does not return, no break needed
686  }
687  }
688  return result;
689  }
690 
691  template <class T>
692  bool
694  {
695  bool result(true);
696  switch (id.policy())
697  {
699  {
700  ReadLock_t lock(protectDiscardNewQueues_);
701  try
702  {
703  result = discardNewQueues_.at(id.index())->full();
704  }
705  catch(std::out_of_range)
706  {
707  throwUnknownQueueid(id);
708  }
709  break;
710  }
712  {
713  ReadLock_t lock(protectDiscardOldQueues_);
714  try
715  {
716  result = discardOldQueues_.at(id.index())->full();
717  }
718  catch(std::out_of_range)
719  {
720  throwUnknownQueueid(id);
721  }
722  break;
723  }
724  default:
725  {
726  throwUnknownQueueid(id);
727  // does not return, no break needed
728  }
729  }
730  return result;
731  }
732 
733  template <class T>
734  bool
736  {
737  bool result(true);
738  switch (id.policy())
739  {
741  {
742  ReadLock_t lock(protectDiscardNewQueues_);
743  try
744  {
745  result = discardNewQueues_.at(id.index())->stale(now);
746  }
747  catch(std::out_of_range)
748  {
749  throwUnknownQueueid(id);
750  }
751  break;
752  }
754  {
755  ReadLock_t lock(protectDiscardOldQueues_);
756  try
757  {
758  result = discardOldQueues_.at(id.index())->stale(now);
759  }
760  catch(std::out_of_range)
761  {
762  throwUnknownQueueid(id);
763  }
764  break;
765  }
766  default:
767  {
768  throwUnknownQueueid(id);
769  // does not return, no break needed
770  }
771  }
772  return result;
773  }
774 
775  template <class T>
776  bool
778  {
779  {
780  ReadLock_t lock(protectDiscardNewQueues_);
781  const SizeType numQueues = discardNewQueues_.size();
782  for (SizeType i = 0; i < numQueues; ++i)
783  {
784  if ( ! discardNewQueues_[i]->stale(now) ) return false;
785  }
786  }
787  {
788  ReadLock_t lock(protectDiscardOldQueues_);
789  const SizeType numQueues = discardOldQueues_.size();
790  for (SizeType i = 0; i < numQueues; ++i)
791  {
792  if ( ! discardOldQueues_[i]->stale(now) ) return false;
793  }
794  }
795  return true;
796  }
797 
798  template <class T>
801  {
802  SizeType result = 0;
803  switch (id.policy())
804  {
806  {
807  ReadLock_t lock(protectDiscardNewQueues_);
808  try
809  {
810  result = discardNewQueues_.at(id.index())->size();
811  }
812  catch(std::out_of_range)
813  {
814  throwUnknownQueueid(id);
815  }
816  break;
817  }
819  {
820  ReadLock_t lock(protectDiscardOldQueues_);
821  try
822  {
823  result = discardOldQueues_.at(id.index())->size();
824  }
825  catch(std::out_of_range)
826  {
827  throwUnknownQueueid(id);
828  }
829  break;
830  }
831  default:
832  {
833  throwUnknownQueueid( id );
834  // does not return, no break needed
835  }
836  }
837  return result;
838  }
839 
840  template <class T>
843  (
844  QueueID const& id,
845  T const& event,
846  utils::TimePoint_t const& now
847  )
848  {
849  switch (id.policy())
850  {
852  {
853  try
854  {
855  return discardNewQueues_.at(id.index())->enqNowait(event,now);
856  }
857  catch(std::out_of_range)
858  {
859  throwUnknownQueueid(id);
860  }
861  break;
862  }
864  {
865  try
866  {
867  return discardOldQueues_.at(id.index())->enqNowait(event,now);
868  }
869  catch(std::out_of_range)
870  {
871  throwUnknownQueueid(id);
872  }
873  break;
874  }
875  default:
876  {
877  throwUnknownQueueid(id);
878  // does not return, no break needed
879  }
880  }
881  return 1; // event could not be entered
882  }
883 
884 } // namespace stor
885 
886 #endif // EventFilter_StorageManager_QueueCollection_h
887 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
int i
Definition: DBlmapReader.cc:9
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
utils::Duration_t getExpirationInterval(const QueueID &id) const
ReadWriteMutex_t protectDiscardOldQueues_
void clearQueue(const QueueID &)
boost::shared_ptr< RegistrationInfoBase > RegPtr
double seconds()
bool full(const QueueID &) const
ReadWriteMutex_t protectLookup_
tuple interval
Definition: MergeJob_cfg.py:20
static boost::mutex mutex
Definition: LHEProxy.cc:11
std::vector< QueueID > QueueIDs
Definition: QueueID.h:80
SizeType enqueueEvent_(QueueID const &, T const &, utils::TimePoint_t const &)
std::vector< ExpirableDiscardNewQueuePtr > DiscardNewQueues_t
void setExpirationInterval(const QueueID &, const utils::Duration_t &)
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
void addEvent(T const &)
std::map< EventConsRegPtr, QueueID, utils::ptrComp< EventConsumerRegistrationInfo > > ReginfoLookup_t
dictionary map
Definition: Association.py:160
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
ExpirableQueue< T, RejectNewest< T > > ExpirableDiscardNewQueue_t
QueueCollection(ConsumerMonitorCollection &)
tuple result
Definition: query.py:137
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
ConsumerMonitorCollection & consumerMonitorCollection_
bool stale(const QueueID &, const utils::TimePoint_t &) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
boost::mutex::scoped_lock WriteLock_t
boost::mutex ReadWriteMutex_t
ValueType popEvent(const QueueID &)
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::shared_ptr< ExpirableDiscardOldQueue_t > ExpirableDiscardOldQueuePtr
bool clearStaleQueues(const utils::TimePoint_t &)
ExpirableQueue< T, KeepNewest< T > > ExpirableDiscardOldQueue_t
bool empty(const QueueID &) const
QueueID createQueue(const EventConsRegPtr, const utils::TimePoint_t &now=utils::getCurrentTime())
QueueCollection & operator=(QueueCollection const &)
boost::mutex::scoped_lock ReadLock_t
ExpirableQueue< T, RejectNewest< T > >::ValueType ValueType
ReginfoLookup_t queueReginfoLookup_
SizeType size() const
std::map< ConsumerID, QueueID > IDLookup_t
QueueID getQueue(const RegPtr, const utils::TimePoint_t &)
std::vector< ExpirableDiscardOldQueuePtr > DiscardOldQueues_t
bool allQueuesStale(const utils::TimePoint_t &) const
boost::shared_ptr< ExpirableDiscardNewQueue_t > ExpirableDiscardNewQueuePtr
bool isValid() const
Definition: ConsumerID.h:34