CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
stor::QueueCollection< T > Class Template Reference

#include <QueueCollection.h>

Public Types

typedef ExpirableQueue< T,
RejectNewest< T > >::SizeType 
SizeType
 
typedef ExpirableQueue< T,
RejectNewest< T > >::ValueType 
ValueType
 

Public Member Functions

void addEvent (T const &)
 
bool allQueuesStale (const utils::TimePoint_t &) const
 
void clearQueue (const QueueID &)
 
void clearQueues ()
 
bool clearStaleQueues (const utils::TimePoint_t &)
 
QueueID createQueue (const EventConsRegPtr, const utils::TimePoint_t &now=utils::getCurrentTime())
 
QueueID createQueue (const RegPtr, const utils::TimePoint_t &now=utils::getCurrentTime())
 
bool empty (const QueueID &) const
 
bool full (const QueueID &) const
 
utils::Duration_t getExpirationInterval (const QueueID &id) const
 
ValueType popEvent (const QueueID &)
 
ValueType popEvent (const ConsumerID &)
 
 QueueCollection (ConsumerMonitorCollection &)
 
void removeQueues ()
 
void setExpirationInterval (const QueueID &, const utils::Duration_t &)
 
SizeType size () const
 
SizeType size (const QueueID &) const
 
bool stale (const QueueID &, const utils::TimePoint_t &) const
 

Private Types

typedef std::vector
< ExpirableDiscardNewQueuePtr > 
DiscardNewQueues_t
 
typedef std::vector
< ExpirableDiscardOldQueuePtr > 
DiscardOldQueues_t
 
typedef ExpirableQueue< T,
RejectNewest< T > > 
ExpirableDiscardNewQueue_t
 
typedef boost::shared_ptr
< ExpirableDiscardNewQueue_t > 
ExpirableDiscardNewQueuePtr
 
typedef ExpirableQueue< T,
KeepNewest< T > > 
ExpirableDiscardOldQueue_t
 
typedef boost::shared_ptr
< ExpirableDiscardOldQueue_t > 
ExpirableDiscardOldQueuePtr
 
typedef std::map< ConsumerID,
QueueID > 
IDLookup_t
 
typedef boost::mutex::scoped_lock ReadLock_t
 
typedef boost::mutex ReadWriteMutex_t
 
typedef std::map
< EventConsRegPtr, QueueID,
utils::ptrComp
< EventConsumerRegistrationInfo > > 
ReginfoLookup_t
 
typedef boost::mutex::scoped_lock WriteLock_t
 

Private Member Functions

SizeType enqueueEvent_ (QueueID const &, T const &, utils::TimePoint_t const &)
 
QueueID getQueue (const RegPtr, const utils::TimePoint_t &)
 
QueueCollection & operator= (QueueCollection const &)
 
 QueueCollection (QueueCollection const &)
 

Private Attributes

ConsumerMonitorCollection & consumerMonitorCollection_
 
DiscardNewQueues_t discardNewQueues_
 
DiscardOldQueues_t discardOldQueues_
 
ReadWriteMutex_t protectDiscardNewQueues_
 
ReadWriteMutex_t protectDiscardOldQueues_
 
ReadWriteMutex_t protectLookup_
 
IDLookup_t queueIdLookup_
 
ReginfoLookup_t queueReginfoLookup_
 

Detailed Description

template<class T>
class stor::QueueCollection< T >

Class template QueueCollection provides a collection of ConcurrentQueue<T>.

The class T must implement a method getEventConsumerTags() const, returning a std::vector<QueueID> which lists the QueueIDs of the queues to which an object of that class should be added.

Author:
mommsen
Revision:
1.12
Date:
2011/03/07 15:31:32

Definition at line 44 of file QueueCollection.h.

Member Typedef Documentation

template<class T>
typedef std::vector<ExpirableDiscardNewQueuePtr> stor::QueueCollection< T >::DiscardNewQueues_t
private

Definition at line 170 of file QueueCollection.h.

template<class T>
typedef std::vector<ExpirableDiscardOldQueuePtr> stor::QueueCollection< T >::DiscardOldQueues_t
private

Definition at line 172 of file QueueCollection.h.

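template<class T>
typedef ExpirableQueue<T, RejectNewest<T> > stor::QueueCollection< T >::ExpirableDiscardNewQueue_t
private

Definition at line 151 of file QueueCollection.h.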

template<class T>
typedef boost::shared_ptr<ExpirableDiscardNewQueue_t> stor::QueueCollection< T >::ExpirableDiscardNewQueuePtr
private

Definition at line 155 of file QueueCollection.h.

template<class T>
typedef ExpirableQueue<T, KeepNewest<T> > stor::QueueCollection< T >::ExpirableDiscardOldQueue_t
private

Definition at line 152 of file QueueCollection.h.

template<class T>
typedef boost::shared_ptr<ExpirableDiscardOldQueue_t> stor::QueueCollection< T >::ExpirableDiscardOldQueuePtr
private

Definition at line 157 of file QueueCollection.h.

template<class T>
typedef std::map<ConsumerID, QueueID> stor::QueueCollection< T >::IDLookup_t
private

Definition at line 175 of file QueueCollection.h.

template<class T>
typedef boost::mutex::scoped_lock stor::QueueCollection< T >::ReadLock_t
private

Definition at line 160 of file QueueCollection.h.

template<class T>
typedef boost::mutex stor::QueueCollection< T >::ReadWriteMutex_t
private

Definition at line 162 of file QueueCollection.h.

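template<class T>
typedef std::map<EventConsRegPtr, QueueID, utils::ptrComp<EventConsumerRegistrationInfo> > stor::QueueCollection< T >::ReginfoLookup_t
private

Definition at line 179 of file QueueCollection.h.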

template<class T>
typedef ExpirableQueue<T, RejectNewest<T> >::SizeType stor::QueueCollection< T >::SizeType

Definition at line 47 of file QueueCollection.h.

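template<class T>
typedef ExpirableQueue<T, RejectNewest<T> >::ValueType stor::QueueCollection< T >::ValueType

Definition at line 48 of file QueueCollection.h.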

template<class T>
typedef boost::mutex::scoped_lock stor::QueueCollection< T >::WriteLock_t
private

Definition at line 161 of file QueueCollection.h.

Constructor & Destructor Documentation

template<class T >
stor::QueueCollection< T >::QueueCollection ( ConsumerMonitorCollection & )

A default-constructed QueueCollection contains no queues.

Definition at line 221 of file QueueCollection.h.

221  :
224  protectLookup_(),
227  queueIdLookup_(),
229  { }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
ReadWriteMutex_t protectLookup_
ConsumerMonitorCollection & consumerMonitorCollection_
ReadWriteMutex_t protectDiscardNewQueues_
template<class T>
stor::QueueCollection< T >::QueueCollection ( QueueCollection< T > const &  )
private

Member Function Documentation

template<class T >
void stor::QueueCollection< T >::addEvent ( T const &  event)

Add an event to all queues matching the specifications.

Definition at line 445 of file QueueCollection.h.

References stor::utils::getCurrentTime().

446  {
447  ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
448  ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
449 
451  QueueIDs routes = event.getEventConsumerTags();
452 
453  for( QueueIDs::const_iterator it = routes.begin(), itEnd = routes.end();
454  it != itEnd; ++it )
455  {
456  const SizeType droppedEvents = enqueueEvent_( *it, event, now );
457  if ( droppedEvents > 0 )
458  consumerMonitorCollection_.addDroppedEvents( *it, droppedEvents );
459  else
460  consumerMonitorCollection_.addQueuedEventSample( *it, event.totalDataSize() );
461  }
462  }
TimePoint_t getCurrentTime()
Definition: Utils.h:158
void addQueuedEventSample(const QueueID &, const unsigned int &data_size)
ReadWriteMutex_t protectDiscardOldQueues_
std::vector< QueueID > QueueIDs
Definition: QueueID.h:80
SizeType enqueueEvent_(QueueID const &, T const &, utils::TimePoint_t const &)
void addDroppedEvents(const QueueID &, const size_t &count)
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
ConsumerMonitorCollection & consumerMonitorCollection_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
bool stor::QueueCollection< T >::allQueuesStale ( const utils::TimePoint_t now) const

Returns true if all queues are stale at the given time.

Definition at line 777 of file QueueCollection.h.

References i, and CommonMethods::lock().

778  {
779  {
781  const SizeType numQueues = discardNewQueues_.size();
782  for (SizeType i = 0; i < numQueues; ++i)
783  {
784  if ( ! discardNewQueues_[i]->stale(now) ) return false;
785  }
786  }
787  {
789  const SizeType numQueues = discardOldQueues_.size();
790  for (SizeType i = 0; i < numQueues; ++i)
791  {
792  if ( ! discardOldQueues_[i]->stale(now) ) return false;
793  }
794  }
795  return true;
796  }
int i
Definition: DBlmapReader.cc:9
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
bool stale(const QueueID &, const utils::TimePoint_t &) const
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
void stor::QueueCollection< T >::clearQueue ( const QueueID id)

Clear the queue with the given QueueID.

Definition at line 529 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), and evf::utils::policy.

530  {
531  switch (id.policy())
532  {
534  {
536  try
537  {
538  const SizeType clearedEvents =
539  discardNewQueues_.at(id.index())->clear();
540 
542  id, clearedEvents);
543  }
544  catch(std::out_of_range)
545  {
546  throwUnknownQueueid(id);
547  }
548  break;
549  }
551  {
553  try
554  {
555  const SizeType clearedEvents =
556  discardOldQueues_.at(id.index())->clear();
557 
559  id, clearedEvents);
560  }
561  catch(std::out_of_range)
562  {
563  throwUnknownQueueid(id);
564  }
565  break;
566  }
567  default:
568  {
569  throwUnknownQueueid(id);
570  // does not return, no break needed
571  }
572  }
573  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
void addDroppedEvents(const QueueID &, const size_t &count)
ConsumerMonitorCollection & consumerMonitorCollection_
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
void stor::QueueCollection< T >::clearQueues ( )

Clear all the contained queues.

Definition at line 617 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, i, and CommonMethods::lock().

618  {
619  {
621  const SizeType numQueues = discardNewQueues_.size();
622  for (SizeType i = 0; i < numQueues; ++i)
623  {
624  const SizeType clearedEvents =
625  discardNewQueues_[i]->clear();
626 
628  QueueID(enquing_policy::DiscardNew, i),
629  clearedEvents
630  );
631  }
632  }
633  {
635  const SizeType numQueues = discardOldQueues_.size();
636  for (SizeType i = 0; i < numQueues; ++i)
637  {
638  const SizeType clearedEvents =
639  discardOldQueues_[i]->clear();
640 
642  QueueID(enquing_policy::DiscardOld, i),
643  clearedEvents
644  );
645  }
646  }
647  }
int i
Definition: DBlmapReader.cc:9
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
void addDroppedEvents(const QueueID &, const size_t &count)
ConsumerMonitorCollection & consumerMonitorCollection_
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
bool stor::QueueCollection< T >::clearStaleQueues ( const utils::TimePoint_t now)

Clear all queues which are stale at the specified point in time.

Definition at line 577 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, i, CommonMethods::lock(), and query::result.

578  {
579  bool result(false);
580  SizeType clearedEvents;
581 
582  {
584  const SizeType numQueues = discardNewQueues_.size();
585  for (SizeType i = 0; i < numQueues; ++i)
586  {
587  if ( discardNewQueues_[i]->clearIfStale(now, clearedEvents) )
588  {
590  QueueID(enquing_policy::DiscardNew, i),
591  clearedEvents
592  );
593  result = true;
594  }
595  }
596  }
597  {
599  const SizeType numQueues = discardOldQueues_.size();
600  for (SizeType i = 0; i < numQueues; ++i)
601  {
602  if ( discardOldQueues_[i]->clearIfStale(now, clearedEvents) )
603  {
605  QueueID(enquing_policy::DiscardOld, i),
606  clearedEvents
607  );
608  result = true;
609  }
610  }
611  }
612  return result;
613  }
int i
Definition: DBlmapReader.cc:9
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
void addDroppedEvents(const QueueID &, const size_t &count)
tuple result
Definition: query.py:137
ConsumerMonitorCollection & consumerMonitorCollection_
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
QueueID stor::QueueCollection< T >::createQueue ( const EventConsRegPtr  reginfo,
const utils::TimePoint_t now = utils::getCurrentTime() 
)

Create a new contained queue, with the given policy and given maximum size. It returns a unique identifier to later identify requests originating from this consumer.

Definition at line 319 of file QueueCollection.h.

References stor::ConsumerID::isValid().

323  {
324  QueueID qid;
325  const ConsumerID& cid = reginfo->consumerId();
326 
327  // We don't proceed if the given ConsumerID is invalid, or if
328  // we've already seen that value before.
329  if (!cid.isValid()) return qid;
330  WriteLock_t lockLookup(protectLookup_);
331  if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
332 
333  if ( reginfo->uniqueEvents() )
334  {
335  // another consumer wants to share the
336  // queue to get unique events.
337  ReginfoLookup_t::const_iterator it =
338  queueReginfoLookup_.find(reginfo);
339  if ( it != queueReginfoLookup_.end() )
340  {
341  qid = it->second;
342  queueIdLookup_[cid] = qid;
343  return qid;
344  }
345  }
346 
347  qid = getQueue(reginfo, now);
348  queueIdLookup_[cid] = qid;
349  queueReginfoLookup_[reginfo] = qid;
350  return qid;
351  }
ReadWriteMutex_t protectLookup_
boost::mutex::scoped_lock WriteLock_t
ReginfoLookup_t queueReginfoLookup_
QueueID getQueue(const RegPtr, const utils::TimePoint_t &)
template<class T >
QueueID stor::QueueCollection< T >::createQueue ( const RegPtr  reginfo,
const utils::TimePoint_t now = utils::getCurrentTime() 
)

Definition at line 356 of file QueueCollection.h.

References stor::ConsumerID::isValid().

360  {
361  QueueID qid;
362  const ConsumerID& cid = reginfo->consumerId();
363 
364  // We don't proceed if the given ConsumerID is invalid, or if
365  // we've already seen that value before.
366  if (!cid.isValid()) return qid;
367  WriteLock_t lockLookup(protectLookup_);
368  if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
369  qid = getQueue(reginfo, now);
370  queueIdLookup_[cid] = qid;
371  return qid;
372  }
ReadWriteMutex_t protectLookup_
boost::mutex::scoped_lock WriteLock_t
QueueID getQueue(const RegPtr, const utils::TimePoint_t &)
template<class T >
bool stor::QueueCollection< T >::empty ( const QueueID id) const

Test to see if the queue with the given QueueID is empty.

Definition at line 651 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.

Referenced by Vispa.Gui.VispaWidget.TextField::setAutosizeFont(), and Vispa.Gui.VispaWidget.TextField::setAutotruncate().

652  {
653  bool result(true);
654  switch (id.policy())
655  {
657  {
659  try
660  {
661  result = discardNewQueues_.at(id.index())->empty();
662  }
663  catch(std::out_of_range)
664  {
665  throwUnknownQueueid(id);
666  }
667  break;
668  }
670  {
672  try
673  {
674  result = discardOldQueues_.at(id.index())->empty();
675  }
676  catch(std::out_of_range)
677  {
678  throwUnknownQueueid(id);
679  }
680  break;
681  }
682  default:
683  {
684  throwUnknownQueueid(id);
685  // does not return, no break needed
686  }
687  }
688  return result;
689  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
tuple result
Definition: query.py:137
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
QueueCollection< T >::SizeType stor::QueueCollection< T >::enqueueEvent_ ( QueueID const &  id,
T const &  event,
utils::TimePoint_t const &  now 
)
private

Definition at line 843 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, and evf::utils::policy.

848  {
849  switch (id.policy())
850  {
852  {
853  try
854  {
855  return discardNewQueues_.at(id.index())->enqNowait(event,now);
856  }
857  catch(std::out_of_range)
858  {
859  throwUnknownQueueid(id);
860  }
861  break;
862  }
864  {
865  try
866  {
867  return discardOldQueues_.at(id.index())->enqNowait(event,now);
868  }
869  catch(std::out_of_range)
870  {
871  throwUnknownQueueid(id);
872  }
873  break;
874  }
875  default:
876  {
877  throwUnknownQueueid(id);
878  // does not return, no break needed
879  }
880  }
881  return 1; // event could not be entered
882  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
template<class T >
bool stor::QueueCollection< T >::full ( const QueueID id) const

Test to see if the queue with the given QueueID is full.

Definition at line 693 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.

694  {
695  bool result(true);
696  switch (id.policy())
697  {
699  {
701  try
702  {
703  result = discardNewQueues_.at(id.index())->full();
704  }
705  catch(std::out_of_range)
706  {
707  throwUnknownQueueid(id);
708  }
709  break;
710  }
712  {
714  try
715  {
716  result = discardOldQueues_.at(id.index())->full();
717  }
718  catch(std::out_of_range)
719  {
720  throwUnknownQueueid(id);
721  }
722  break;
723  }
724  default:
725  {
726  throwUnknownQueueid(id);
727  // does not return, no break needed
728  }
729  }
730  return result;
731  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
tuple result
Definition: query.py:137
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
utils::Duration_t stor::QueueCollection< T >::getExpirationInterval ( const QueueID id) const

Definition at line 276 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, query::result, and seconds().

277  {
279  switch (id.policy())
280  {
282  {
284  try
285  {
286  result = discardNewQueues_.at(id.index())->stalenessInterval();
287  }
288  catch(std::out_of_range)
289  {
290  throwUnknownQueueid(id);
291  }
292  break;
293  }
295  {
297  try
298  {
299  result = discardOldQueues_.at(id.index())->stalenessInterval();
300  }
301  catch(std::out_of_range)
302  {
303  throwUnknownQueueid(id);
304  }
305  break;
306  }
307  default:
308  {
309  throwUnknownQueueid(id);
310  // does not return, no break needed
311  }
312  }
313  return result;
314  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
double seconds()
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
tuple result
Definition: query.py:137
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
QueueID stor::QueueCollection< T >::getQueue ( const RegPtr  reginfo,
const utils::TimePoint_t now 
)
private

Definition at line 377 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, CommonMethods::lock(), and cmsPerfSuiteHarvest::now.

381  {
382  if (reginfo->queuePolicy() == enquing_policy::DiscardNew)
383  {
387  reginfo->queueSize(),
388  reginfo->secondsToStale(),
389  now
390  )
391  );
392  discardNewQueues_.push_back(newborn);
393  return QueueID(
395  discardNewQueues_.size()-1
396  );
397  }
398  else if (reginfo->queuePolicy() == enquing_policy::DiscardOld)
399  {
403  reginfo->queueSize(),
404  reginfo->secondsToStale(),
405  now
406  )
407  );
408  discardOldQueues_.push_back(newborn);
409  return QueueID(
411  discardOldQueues_.size()-1
412  );
413  }
414  return QueueID();
415  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
ExpirableQueue< T, RejectNewest< T > > ExpirableDiscardNewQueue_t
boost::mutex::scoped_lock WriteLock_t
ReadWriteMutex_t protectDiscardNewQueues_
boost::shared_ptr< ExpirableDiscardOldQueue_t > ExpirableDiscardOldQueuePtr
ExpirableQueue< T, KeepNewest< T > > ExpirableDiscardOldQueue_t
boost::shared_ptr< ExpirableDiscardNewQueue_t > ExpirableDiscardNewQueuePtr
template<class T>
QueueCollection& stor::QueueCollection< T >::operator= ( QueueCollection< T > const &  )
private
template<class T >
QueueCollection< T >::ValueType stor::QueueCollection< T >::popEvent ( const QueueID id)

Remove and return an event from the queue for the consumer with the given id. If there is no event in that queue, an empty event is returned.

Definition at line 466 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, CommonMethods::lock(), evf::utils::policy, and query::result.

467  {
469  switch (id.policy())
470  {
472  {
474  try
475  {
476  if ( discardNewQueues_.at(id.index())->deqNowait(result) )
478  result.first.totalDataSize());
479  }
480  catch(std::out_of_range)
481  {
482  throwUnknownQueueid(id);
483  }
484  break;
485  }
487  {
489  try
490  {
491  if ( discardOldQueues_.at(id.index())->deqNowait(result) )
493  result.first.totalDataSize());
494  }
495  catch(std::out_of_range)
496  {
497  throwUnknownQueueid(id);
498  }
499  break;
500  }
501  default:
502  {
503  throwUnknownQueueid(id);
504  // does not return, no break needed
505  }
506  }
507  return result;
508  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
tuple result
Definition: query.py:137
ConsumerMonitorCollection & consumerMonitorCollection_
ReadWriteMutex_t protectDiscardNewQueues_
void addServedEventSample(const QueueID &, const unsigned int &data_size)
boost::mutex::scoped_lock ReadLock_t
ExpirableQueue< T, RejectNewest< T > >::ValueType ValueType
template<class T >
QueueCollection< T >::ValueType stor::QueueCollection< T >::popEvent ( const ConsumerID cid)

Remove and return an event from the queue for the consumer with the given ConsumerID. If there is no event in that queue, an empty event is returned.

Definition at line 512 of file QueueCollection.h.

References i, stor::ConsumerID::isValid(), CommonMethods::lock(), and query::result.

513  {
515  if (!cid.isValid()) return result;
516  QueueID id;
517  {
518  // Scope to control lifetime of lock.
520  IDLookup_t::const_iterator i = queueIdLookup_.find(cid);
521  if (i == queueIdLookup_.end()) return result;
522  id = i->second;
523  }
524  return popEvent(id);
525  }
int i
Definition: DBlmapReader.cc:9
ReadWriteMutex_t protectLookup_
tuple result
Definition: query.py:137
ValueType popEvent(const QueueID &)
boost::mutex::scoped_lock ReadLock_t
ExpirableQueue< T, RejectNewest< T > >::ValueType ValueType
template<class T >
void stor::QueueCollection< T >::removeQueues ( )

Remove all contained queues. Note that this has the effect of clearing all the queues as well.

Definition at line 419 of file QueueCollection.h.

420  {
421  clearQueues();
422 
423  WriteLock_t lockDiscardNew(protectDiscardNewQueues_);
424  WriteLock_t lockDiscardOld(protectDiscardOldQueues_);
425  discardNewQueues_.clear();
426  discardOldQueues_.clear();
427 
428  WriteLock_t lockLookup(protectLookup_);
429  queueIdLookup_.clear();
430  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
ReadWriteMutex_t protectLookup_
boost::mutex::scoped_lock WriteLock_t
ReadWriteMutex_t protectDiscardNewQueues_
template<class T >
void stor::QueueCollection< T >::setExpirationInterval ( const QueueID id,
const utils::Duration_t interval 
)

Set or get the time in seconds that the queue with the given id can be unused (by a consumer) before becoming stale.

Definition at line 233 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), and evf::utils::policy.

237  {
238  switch (id.policy())
239  {
241  {
243  try
244  {
245  discardNewQueues_.at(id.index())->setStalenessInterval(interval);
246  }
247  catch(std::out_of_range)
248  {
249  throwUnknownQueueid(id);
250  }
251  break;
252  }
254  {
256  try
257  {
258  discardOldQueues_.at(id.index())->setStalenessInterval(interval);
259  }
260  catch(std::out_of_range)
261  {
262  throwUnknownQueueid(id);
263  }
264  break;
265  }
266  default:
267  {
268  throwUnknownQueueid(id);
269  // does not return, no break needed
270  }
271  }
272  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
tuple interval
Definition: MergeJob_cfg.py:20
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
QueueCollection< T >::SizeType stor::QueueCollection< T >::size ( void  ) const

Return the number of queues in the collection.

Definition at line 434 of file QueueCollection.h.

435  {
436  // We obtain locks not because it is unsafe to read the sizes
437  // without locking, but because we want consistent values.
438  ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
439  ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
440  return discardNewQueues_.size() + discardOldQueues_.size();
441  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
QueueCollection< T >::SizeType stor::QueueCollection< T >::size ( const QueueID id) const

Get number of elements in queue

Definition at line 800 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.

801  {
802  SizeType result = 0;
803  switch (id.policy())
804  {
806  {
808  try
809  {
810  result = discardNewQueues_.at(id.index())->size();
811  }
812  catch(std::out_of_range)
813  {
814  throwUnknownQueueid(id);
815  }
816  break;
817  }
819  {
821  try
822  {
823  result = discardOldQueues_.at(id.index())->size();
824  }
825  catch(std::out_of_range)
826  {
827  throwUnknownQueueid(id);
828  }
829  break;
830  }
831  default:
832  {
833  throwUnknownQueueid( id );
834  // does not return, no break needed
835  }
836  }
837  return result;
838  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
tuple result
Definition: query.py:137
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t
template<class T >
bool stor::QueueCollection< T >::stale ( const QueueID id,
const utils::TimePoint_t now 
) const

Test to see if the queue with the given QueueID is stale at the given time.

Definition at line 735 of file QueueCollection.h.

References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.

736  {
737  bool result(true);
738  switch (id.policy())
739  {
741  {
743  try
744  {
745  result = discardNewQueues_.at(id.index())->stale(now);
746  }
747  catch(std::out_of_range)
748  {
749  throwUnknownQueueid(id);
750  }
751  break;
752  }
754  {
756  try
757  {
758  result = discardOldQueues_.at(id.index())->stale(now);
759  }
760  catch(std::out_of_range)
761  {
762  throwUnknownQueueid(id);
763  }
764  break;
765  }
766  default:
767  {
768  throwUnknownQueueid(id);
769  // does not return, no break needed
770  }
771  }
772  return result;
773  }
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
ReadWriteMutex_t protectDiscardOldQueues_
tuple result
Definition: query.py:137
ReadWriteMutex_t protectDiscardNewQueues_
boost::mutex::scoped_lock ReadLock_t

Member Data Documentation

template<class T>
ConsumerMonitorCollection& stor::QueueCollection< T >::consumerMonitorCollection_
private

Definition at line 181 of file QueueCollection.h.

template<class T>
DiscardNewQueues_t stor::QueueCollection< T >::discardNewQueues_
private

Definition at line 171 of file QueueCollection.h.

template<class T>
DiscardOldQueues_t stor::QueueCollection< T >::discardOldQueues_
private

Definition at line 173 of file QueueCollection.h.

template<class T>
ReadWriteMutex_t stor::QueueCollection< T >::protectDiscardNewQueues_
mutable private

Definition at line 166 of file QueueCollection.h.

template<class T>
ReadWriteMutex_t stor::QueueCollection< T >::protectDiscardOldQueues_
mutable private

Definition at line 167 of file QueueCollection.h.

template<class T>
ReadWriteMutex_t stor::QueueCollection< T >::protectLookup_
mutable private

Definition at line 168 of file QueueCollection.h.

template<class T>
IDLookup_t stor::QueueCollection< T >::queueIdLookup_
private

Definition at line 176 of file QueueCollection.h.

template<class T>
ReginfoLookup_t stor::QueueCollection< T >::queueReginfoLookup_
private

Definition at line 180 of file QueueCollection.h.