00001
00003
00004 #ifndef EventFilter_StorageManager_QueueCollection_h
00005 #define EventFilter_StorageManager_QueueCollection_h
00006
00007 #include <vector>
00008 #include <limits>
00009
00010 #include "boost/bind.hpp"
00011 #include "boost/thread/mutex.hpp"
00012 #include "boost/shared_ptr.hpp"
00013
00014 #include "FWCore/Utilities/interface/Algorithms.h"
00015
00016 #include "EventFilter/StorageManager/interface/ConcurrentQueue.h"
00017 #include "EventFilter/StorageManager/interface/ConsumerID.h"
00018 #include "EventFilter/StorageManager/interface/EnquingPolicyTag.h"
00019 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h"
00020 #include "EventFilter/StorageManager/interface/Exception.h"
00021 #include "EventFilter/StorageManager/interface/ExpirableQueue.h"
00022 #include "EventFilter/StorageManager/interface/QueueID.h"
00023 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h"
00024 #include "EventFilter/StorageManager/interface/Utils.h"
00025 #include "EventFilter/StorageManager/interface/ConsumerMonitorCollection.h"
00026
00027
00028 namespace stor {
00029
00043 template <class T>
00044 class QueueCollection
00045 {
00046 public:
00047 typedef typename ExpirableQueue<T, RejectNewest<T> >::SizeType SizeType;
00048 typedef typename ExpirableQueue<T, RejectNewest<T> >::ValueType ValueType;
00049
00053 QueueCollection(ConsumerMonitorCollection&);
00054
00059 void setExpirationInterval(const QueueID&, const utils::Duration_t&);
00060 utils::Duration_t getExpirationInterval(const QueueID& id) const;
00061
00067 QueueID createQueue
00068 (
00069 const EventConsRegPtr,
00070 const utils::TimePoint_t& now = utils::getCurrentTime()
00071 );
00072 QueueID createQueue
00073 (
00074 const RegPtr,
00075 const utils::TimePoint_t& now = utils::getCurrentTime()
00076 );
00077
00082 void removeQueues();
00083
00087 SizeType size() const;
00088
00092 void addEvent(T const&);
00093
00099 ValueType popEvent(const QueueID&);
00100
00106 ValueType popEvent(const ConsumerID&);
00107
00111 void clearQueue(const QueueID&);
00112
00116 bool clearStaleQueues(const utils::TimePoint_t&);
00117
00121 void clearQueues();
00122
00126 bool empty(const QueueID&) const;
00127
00131 bool full(const QueueID&) const;
00132
00137 bool stale(const QueueID&, const utils::TimePoint_t&) const;
00138
00142 bool allQueuesStale(const utils::TimePoint_t&) const;
00143
00147 SizeType size(const QueueID&) const;
00148
00149
00150 private:
00151 typedef ExpirableQueue<T, RejectNewest<T> > ExpirableDiscardNewQueue_t;
00152 typedef ExpirableQueue<T, KeepNewest<T> > ExpirableDiscardOldQueue_t;
00153
00154 typedef boost::shared_ptr<ExpirableDiscardNewQueue_t>
00155 ExpirableDiscardNewQueuePtr;
00156 typedef boost::shared_ptr<ExpirableDiscardOldQueue_t>
00157 ExpirableDiscardOldQueuePtr;
00158
00159
00160 typedef boost::mutex::scoped_lock ReadLock_t;
00161 typedef boost::mutex::scoped_lock WriteLock_t;
00162 typedef boost::mutex ReadWriteMutex_t;
00163
00164
00165
00166 mutable ReadWriteMutex_t protectDiscardNewQueues_;
00167 mutable ReadWriteMutex_t protectDiscardOldQueues_;
00168 mutable ReadWriteMutex_t protectLookup_;
00169
00170 typedef std::vector<ExpirableDiscardNewQueuePtr> DiscardNewQueues_t;
00171 DiscardNewQueues_t discardNewQueues_;
00172 typedef std::vector<ExpirableDiscardOldQueuePtr> DiscardOldQueues_t;
00173 DiscardOldQueues_t discardOldQueues_;
00174
00175 typedef std::map<ConsumerID, QueueID> IDLookup_t;
00176 IDLookup_t queueIdLookup_;
00177 typedef std::map<EventConsRegPtr, QueueID,
00178 utils::ptrComp<EventConsumerRegistrationInfo>
00179 > ReginfoLookup_t;
00180 ReginfoLookup_t queueReginfoLookup_;
00181 ConsumerMonitorCollection& consumerMonitorCollection_;
00182
00183
00184
00185
00186
00187 QueueCollection(QueueCollection const&);
00188 QueueCollection& operator=(QueueCollection const&);
00189
00190
00191
00192
00193
00194 SizeType enqueueEvent_(QueueID const&, T const&, utils::TimePoint_t const&);
00195 QueueID getQueue(const RegPtr, const utils::TimePoint_t&);
00196
00197 };
00198
00199
00200
00201
00202
00209 namespace
00210 {
00211 void throwUnknownQueueid(const QueueID& id)
00212 {
00213 std::ostringstream msg;
00214 msg << "Unable to retrieve queue with signature: ";
00215 msg << id;
00216 XCEPT_RAISE(exception::UnknownQueueId, msg.str());
00217 }
00218 }
00219
00220 template <class T>
00221 QueueCollection<T>::QueueCollection(ConsumerMonitorCollection& ccp ) :
00222 protectDiscardNewQueues_(),
00223 protectDiscardOldQueues_(),
00224 protectLookup_(),
00225 discardNewQueues_(),
00226 discardOldQueues_(),
00227 queueIdLookup_(),
00228 consumerMonitorCollection_( ccp )
00229 { }
00230
00231 template <class T>
00232 void
00233 QueueCollection<T>::setExpirationInterval(
00234 const QueueID& id,
00235 const utils::Duration_t& interval
00236 )
00237 {
00238 switch (id.policy())
00239 {
00240 case enquing_policy::DiscardNew:
00241 {
00242 ReadLock_t lock(protectDiscardNewQueues_);
00243 try
00244 {
00245 discardNewQueues_.at(id.index())->setStalenessInterval(interval);
00246 }
00247 catch(std::out_of_range)
00248 {
00249 throwUnknownQueueid(id);
00250 }
00251 break;
00252 }
00253 case enquing_policy::DiscardOld:
00254 {
00255 ReadLock_t lock(protectDiscardOldQueues_);
00256 try
00257 {
00258 discardOldQueues_.at(id.index())->setStalenessInterval(interval);
00259 }
00260 catch(std::out_of_range)
00261 {
00262 throwUnknownQueueid(id);
00263 }
00264 break;
00265 }
00266 default:
00267 {
00268 throwUnknownQueueid(id);
00269
00270 }
00271 }
00272 }
00273
00274 template <class T>
00275 utils::Duration_t
00276 QueueCollection<T>::getExpirationInterval(const QueueID& id) const
00277 {
00278 utils::Duration_t result = boost::posix_time::seconds(0);
00279 switch (id.policy())
00280 {
00281 case enquing_policy::DiscardNew:
00282 {
00283 ReadLock_t lock(protectDiscardNewQueues_);
00284 try
00285 {
00286 result = discardNewQueues_.at(id.index())->stalenessInterval();
00287 }
00288 catch(std::out_of_range)
00289 {
00290 throwUnknownQueueid(id);
00291 }
00292 break;
00293 }
00294 case enquing_policy::DiscardOld:
00295 {
00296 ReadLock_t lock(protectDiscardOldQueues_);
00297 try
00298 {
00299 result = discardOldQueues_.at(id.index())->stalenessInterval();
00300 }
00301 catch(std::out_of_range)
00302 {
00303 throwUnknownQueueid(id);
00304 }
00305 break;
00306 }
00307 default:
00308 {
00309 throwUnknownQueueid(id);
00310
00311 }
00312 }
00313 return result;
00314 }
00315
00316 template <class T>
00317 QueueID
00318 QueueCollection<T>::createQueue
00319 (
00320 const EventConsRegPtr reginfo,
00321 const utils::TimePoint_t& now
00322 )
00323 {
00324 QueueID qid;
00325 const ConsumerID& cid = reginfo->consumerId();
00326
00327
00328
00329 if (!cid.isValid()) return qid;
00330 WriteLock_t lockLookup(protectLookup_);
00331 if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
00332
00333 if ( reginfo->uniqueEvents() )
00334 {
00335
00336
00337 ReginfoLookup_t::const_iterator it =
00338 queueReginfoLookup_.find(reginfo);
00339 if ( it != queueReginfoLookup_.end() )
00340 {
00341 qid = it->second;
00342 queueIdLookup_[cid] = qid;
00343 return qid;
00344 }
00345 }
00346
00347 qid = getQueue(reginfo, now);
00348 queueIdLookup_[cid] = qid;
00349 queueReginfoLookup_[reginfo] = qid;
00350 return qid;
00351 }
00352
00353 template <class T>
00354 QueueID
00355 QueueCollection<T>::createQueue
00356 (
00357 const RegPtr reginfo,
00358 const utils::TimePoint_t& now
00359 )
00360 {
00361 QueueID qid;
00362 const ConsumerID& cid = reginfo->consumerId();
00363
00364
00365
00366 if (!cid.isValid()) return qid;
00367 WriteLock_t lockLookup(protectLookup_);
00368 if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
00369 qid = getQueue(reginfo, now);
00370 queueIdLookup_[cid] = qid;
00371 return qid;
00372 }
00373
00374 template <class T>
00375 QueueID
00376 QueueCollection<T>::getQueue
00377 (
00378 const RegPtr reginfo,
00379 const utils::TimePoint_t& now
00380 )
00381 {
00382 if (reginfo->queuePolicy() == enquing_policy::DiscardNew)
00383 {
00384 WriteLock_t lock(protectDiscardNewQueues_);
00385 ExpirableDiscardNewQueuePtr newborn(
00386 new ExpirableDiscardNewQueue_t(
00387 reginfo->queueSize(),
00388 reginfo->secondsToStale(),
00389 now
00390 )
00391 );
00392 discardNewQueues_.push_back(newborn);
00393 return QueueID(
00394 enquing_policy::DiscardNew,
00395 discardNewQueues_.size()-1
00396 );
00397 }
00398 else if (reginfo->queuePolicy() == enquing_policy::DiscardOld)
00399 {
00400 WriteLock_t lock(protectDiscardOldQueues_);
00401 ExpirableDiscardOldQueuePtr newborn(
00402 new ExpirableDiscardOldQueue_t(
00403 reginfo->queueSize(),
00404 reginfo->secondsToStale(),
00405 now
00406 )
00407 );
00408 discardOldQueues_.push_back(newborn);
00409 return QueueID(
00410 enquing_policy::DiscardOld,
00411 discardOldQueues_.size()-1
00412 );
00413 }
00414 return QueueID();
00415 }
00416
00417 template <class T>
00418 void
00419 QueueCollection<T>::removeQueues()
00420 {
00421 clearQueues();
00422
00423 WriteLock_t lockDiscardNew(protectDiscardNewQueues_);
00424 WriteLock_t lockDiscardOld(protectDiscardOldQueues_);
00425 discardNewQueues_.clear();
00426 discardOldQueues_.clear();
00427
00428 WriteLock_t lockLookup(protectLookup_);
00429 queueIdLookup_.clear();
00430 }
00431
00432 template <class T>
00433 typename QueueCollection<T>::SizeType
00434 QueueCollection<T>::size() const
00435 {
00436
00437
00438 ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
00439 ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
00440 return discardNewQueues_.size() + discardOldQueues_.size();
00441 }
00442
00443 template <class T>
00444 void
00445 QueueCollection<T>::addEvent(T const& event)
00446 {
00447 ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
00448 ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
00449
00450 utils::TimePoint_t now = utils::getCurrentTime();
00451 QueueIDs routes = event.getEventConsumerTags();
00452
00453 for( QueueIDs::const_iterator it = routes.begin(), itEnd = routes.end();
00454 it != itEnd; ++it )
00455 {
00456 const SizeType droppedEvents = enqueueEvent_( *it, event, now );
00457 if ( droppedEvents > 0 )
00458 consumerMonitorCollection_.addDroppedEvents( *it, droppedEvents );
00459 else
00460 consumerMonitorCollection_.addQueuedEventSample( *it, event.totalDataSize() );
00461 }
00462 }
00463
00464 template <class T>
00465 typename QueueCollection<T>::ValueType
00466 QueueCollection<T>::popEvent(const QueueID& id)
00467 {
00468 ValueType result;
00469 switch (id.policy())
00470 {
00471 case enquing_policy::DiscardNew:
00472 {
00473 ReadLock_t lock(protectDiscardNewQueues_);
00474 try
00475 {
00476 if ( discardNewQueues_.at(id.index())->deqNowait(result) )
00477 consumerMonitorCollection_.addServedEventSample(id,
00478 result.first.totalDataSize());
00479 }
00480 catch(std::out_of_range)
00481 {
00482 throwUnknownQueueid(id);
00483 }
00484 break;
00485 }
00486 case enquing_policy::DiscardOld:
00487 {
00488 ReadLock_t lock(protectDiscardOldQueues_);
00489 try
00490 {
00491 if ( discardOldQueues_.at(id.index())->deqNowait(result) )
00492 consumerMonitorCollection_.addServedEventSample(id,
00493 result.first.totalDataSize());
00494 }
00495 catch(std::out_of_range)
00496 {
00497 throwUnknownQueueid(id);
00498 }
00499 break;
00500 }
00501 default:
00502 {
00503 throwUnknownQueueid(id);
00504
00505 }
00506 }
00507 return result;
00508 }
00509
00510 template <class T>
00511 typename QueueCollection<T>::ValueType
00512 QueueCollection<T>::popEvent(const ConsumerID& cid)
00513 {
00514 ValueType result;
00515 if (!cid.isValid()) return result;
00516 QueueID id;
00517 {
00518
00519 ReadLock_t lock(protectLookup_);
00520 IDLookup_t::const_iterator i = queueIdLookup_.find(cid);
00521 if (i == queueIdLookup_.end()) return result;
00522 id = i->second;
00523 }
00524 return popEvent(id);
00525 }
00526
00527 template <class T>
00528 void
00529 QueueCollection<T>::clearQueue(const QueueID& id)
00530 {
00531 switch (id.policy())
00532 {
00533 case enquing_policy::DiscardNew:
00534 {
00535 ReadLock_t lock(protectDiscardNewQueues_);
00536 try
00537 {
00538 const SizeType clearedEvents =
00539 discardNewQueues_.at(id.index())->clear();
00540
00541 consumerMonitorCollection_.addDroppedEvents(
00542 id, clearedEvents);
00543 }
00544 catch(std::out_of_range)
00545 {
00546 throwUnknownQueueid(id);
00547 }
00548 break;
00549 }
00550 case enquing_policy::DiscardOld:
00551 {
00552 ReadLock_t lock(protectDiscardOldQueues_);
00553 try
00554 {
00555 const SizeType clearedEvents =
00556 discardOldQueues_.at(id.index())->clear();
00557
00558 consumerMonitorCollection_.addDroppedEvents(
00559 id, clearedEvents);
00560 }
00561 catch(std::out_of_range)
00562 {
00563 throwUnknownQueueid(id);
00564 }
00565 break;
00566 }
00567 default:
00568 {
00569 throwUnknownQueueid(id);
00570
00571 }
00572 }
00573 }
00574
00575 template <class T>
00576 bool
00577 QueueCollection<T>::clearStaleQueues(const utils::TimePoint_t& now)
00578 {
00579 bool result(false);
00580 SizeType clearedEvents;
00581
00582 {
00583 ReadLock_t lock(protectDiscardNewQueues_);
00584 const SizeType numQueues = discardNewQueues_.size();
00585 for (SizeType i = 0; i < numQueues; ++i)
00586 {
00587 if ( discardNewQueues_[i]->clearIfStale(now, clearedEvents) )
00588 {
00589 consumerMonitorCollection_.addDroppedEvents(
00590 QueueID(enquing_policy::DiscardNew, i),
00591 clearedEvents
00592 );
00593 result = true;
00594 }
00595 }
00596 }
00597 {
00598 ReadLock_t lock(protectDiscardOldQueues_);
00599 const SizeType numQueues = discardOldQueues_.size();
00600 for (SizeType i = 0; i < numQueues; ++i)
00601 {
00602 if ( discardOldQueues_[i]->clearIfStale(now, clearedEvents) )
00603 {
00604 consumerMonitorCollection_.addDroppedEvents(
00605 QueueID(enquing_policy::DiscardOld, i),
00606 clearedEvents
00607 );
00608 result = true;
00609 }
00610 }
00611 }
00612 return result;
00613 }
00614
00615 template <class T>
00616 void
00617 QueueCollection<T>::clearQueues()
00618 {
00619 {
00620 ReadLock_t lock(protectDiscardNewQueues_);
00621 const SizeType numQueues = discardNewQueues_.size();
00622 for (SizeType i = 0; i < numQueues; ++i)
00623 {
00624 const SizeType clearedEvents =
00625 discardNewQueues_[i]->clear();
00626
00627 consumerMonitorCollection_.addDroppedEvents(
00628 QueueID(enquing_policy::DiscardNew, i),
00629 clearedEvents
00630 );
00631 }
00632 }
00633 {
00634 ReadLock_t lock(protectDiscardOldQueues_);
00635 const SizeType numQueues = discardOldQueues_.size();
00636 for (SizeType i = 0; i < numQueues; ++i)
00637 {
00638 const SizeType clearedEvents =
00639 discardOldQueues_[i]->clear();
00640
00641 consumerMonitorCollection_.addDroppedEvents(
00642 QueueID(enquing_policy::DiscardOld, i),
00643 clearedEvents
00644 );
00645 }
00646 }
00647 }
00648
00649 template <class T>
00650 bool
00651 QueueCollection<T>::empty(const QueueID& id) const
00652 {
00653 bool result(true);
00654 switch (id.policy())
00655 {
00656 case enquing_policy::DiscardNew:
00657 {
00658 ReadLock_t lock(protectDiscardNewQueues_);
00659 try
00660 {
00661 result = discardNewQueues_.at(id.index())->empty();
00662 }
00663 catch(std::out_of_range)
00664 {
00665 throwUnknownQueueid(id);
00666 }
00667 break;
00668 }
00669 case enquing_policy::DiscardOld:
00670 {
00671 ReadLock_t lock(protectDiscardOldQueues_);
00672 try
00673 {
00674 result = discardOldQueues_.at(id.index())->empty();
00675 }
00676 catch(std::out_of_range)
00677 {
00678 throwUnknownQueueid(id);
00679 }
00680 break;
00681 }
00682 default:
00683 {
00684 throwUnknownQueueid(id);
00685
00686 }
00687 }
00688 return result;
00689 }
00690
00691 template <class T>
00692 bool
00693 QueueCollection<T>::full(const QueueID& id) const
00694 {
00695 bool result(true);
00696 switch (id.policy())
00697 {
00698 case enquing_policy::DiscardNew:
00699 {
00700 ReadLock_t lock(protectDiscardNewQueues_);
00701 try
00702 {
00703 result = discardNewQueues_.at(id.index())->full();
00704 }
00705 catch(std::out_of_range)
00706 {
00707 throwUnknownQueueid(id);
00708 }
00709 break;
00710 }
00711 case enquing_policy::DiscardOld:
00712 {
00713 ReadLock_t lock(protectDiscardOldQueues_);
00714 try
00715 {
00716 result = discardOldQueues_.at(id.index())->full();
00717 }
00718 catch(std::out_of_range)
00719 {
00720 throwUnknownQueueid(id);
00721 }
00722 break;
00723 }
00724 default:
00725 {
00726 throwUnknownQueueid(id);
00727
00728 }
00729 }
00730 return result;
00731 }
00732
00733 template <class T>
00734 bool
00735 QueueCollection<T>::stale(const QueueID& id, const utils::TimePoint_t& now) const
00736 {
00737 bool result(true);
00738 switch (id.policy())
00739 {
00740 case enquing_policy::DiscardNew:
00741 {
00742 ReadLock_t lock(protectDiscardNewQueues_);
00743 try
00744 {
00745 result = discardNewQueues_.at(id.index())->stale(now);
00746 }
00747 catch(std::out_of_range)
00748 {
00749 throwUnknownQueueid(id);
00750 }
00751 break;
00752 }
00753 case enquing_policy::DiscardOld:
00754 {
00755 ReadLock_t lock(protectDiscardOldQueues_);
00756 try
00757 {
00758 result = discardOldQueues_.at(id.index())->stale(now);
00759 }
00760 catch(std::out_of_range)
00761 {
00762 throwUnknownQueueid(id);
00763 }
00764 break;
00765 }
00766 default:
00767 {
00768 throwUnknownQueueid(id);
00769
00770 }
00771 }
00772 return result;
00773 }
00774
00775 template <class T>
00776 bool
00777 QueueCollection<T>::allQueuesStale(const utils::TimePoint_t& now) const
00778 {
00779 {
00780 ReadLock_t lock(protectDiscardNewQueues_);
00781 const SizeType numQueues = discardNewQueues_.size();
00782 for (SizeType i = 0; i < numQueues; ++i)
00783 {
00784 if ( ! discardNewQueues_[i]->stale(now) ) return false;
00785 }
00786 }
00787 {
00788 ReadLock_t lock(protectDiscardOldQueues_);
00789 const SizeType numQueues = discardOldQueues_.size();
00790 for (SizeType i = 0; i < numQueues; ++i)
00791 {
00792 if ( ! discardOldQueues_[i]->stale(now) ) return false;
00793 }
00794 }
00795 return true;
00796 }
00797
00798 template <class T>
00799 typename QueueCollection<T>::SizeType
00800 QueueCollection<T>::size(const QueueID& id) const
00801 {
00802 SizeType result = 0;
00803 switch (id.policy())
00804 {
00805 case enquing_policy::DiscardNew:
00806 {
00807 ReadLock_t lock(protectDiscardNewQueues_);
00808 try
00809 {
00810 result = discardNewQueues_.at(id.index())->size();
00811 }
00812 catch(std::out_of_range)
00813 {
00814 throwUnknownQueueid(id);
00815 }
00816 break;
00817 }
00818 case enquing_policy::DiscardOld:
00819 {
00820 ReadLock_t lock(protectDiscardOldQueues_);
00821 try
00822 {
00823 result = discardOldQueues_.at(id.index())->size();
00824 }
00825 catch(std::out_of_range)
00826 {
00827 throwUnknownQueueid(id);
00828 }
00829 break;
00830 }
00831 default:
00832 {
00833 throwUnknownQueueid( id );
00834
00835 }
00836 }
00837 return result;
00838 }
00839
00840 template <class T>
00841 typename QueueCollection<T>::SizeType
00842 QueueCollection<T>::enqueueEvent_
00843 (
00844 QueueID const& id,
00845 T const& event,
00846 utils::TimePoint_t const& now
00847 )
00848 {
00849 switch (id.policy())
00850 {
00851 case enquing_policy::DiscardNew:
00852 {
00853 try
00854 {
00855 return discardNewQueues_.at(id.index())->enqNowait(event,now);
00856 }
00857 catch(std::out_of_range)
00858 {
00859 throwUnknownQueueid(id);
00860 }
00861 break;
00862 }
00863 case enquing_policy::DiscardOld:
00864 {
00865 try
00866 {
00867 return discardOldQueues_.at(id.index())->enqNowait(event,now);
00868 }
00869 catch(std::out_of_range)
00870 {
00871 throwUnknownQueueid(id);
00872 }
00873 break;
00874 }
00875 default:
00876 {
00877 throwUnknownQueueid(id);
00878
00879 }
00880 }
00881 return 1;
00882 }
00883
00884 }
00885
00886 #endif // EventFilter_StorageManager_QueueCollection_h
00887