30 exception <<
"ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n" 32 <<
"Looking for module label: " <<
moduleLabel() <<
"\n" 35 <<
"This means there is a configuration error.\n" 36 <<
"The module which is asking for this data must be configured to state that it will read this data.";
42 template <
bool callResolver,
typename FUNC>
49 auto presentStatus =
status();
54 auto failedStatusSetter = [
this](
ProductStatus* presentStatus) {
58 *presentStatus = this->
status();
60 std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
69 if(pd->wrapper()->isPresent()) {
80 if(not iFrom) {
return;}
87 }
else if(
original->hasIsProductEqual()) {
88 if(!
original->isProductEqual(iFrom.get())) {
91 <<
"ProductResolver::mergeTheProduct\n" 92 <<
"Two run/lumi products for the same run/lumi which should be equal are not\n" 93 <<
"Using the first, ignoring the second\n" 94 <<
"className = " << bd.className() <<
"\n" 95 <<
"moduleLabel = " << bd.moduleLabel() <<
"\n" 96 <<
"instance = " << bd.productInstanceName() <<
"\n" 97 <<
"process = " << bd.processName() <<
"\n";
102 <<
"ProductResolver::mergeTheProduct\n" 103 <<
"Run/lumi product has neither a mergeProduct nor isProductEqual function\n" 104 <<
"Using the first, ignoring the second in merge\n" 105 <<
"className = " << bd.className() <<
"\n" 106 <<
"moduleLabel = " << bd.moduleLabel() <<
"\n" 107 <<
"instance = " << bd.productInstanceName() <<
"\n" 108 <<
"process = " << bd.processName() <<
"\n";
118 return resolveProductImpl<true>([
this,&
principal,mcc]() {
132 aux_->postModuleDelayedGetSignal_.emit(*(iContext->
getStreamContext()), *iContext); }
135 if(
auto reader=principal.reader()) {
136 std::unique_lock<std::recursive_mutex> guard;
137 if(
auto sr =
reader->sharedResources().second) {
138 guard =std::unique_lock<std::recursive_mutex>(*sr);
153 std::unique_lock<std::recursive_mutex> guard;
154 if(
auto sr =
reader->sharedResources().second) {
155 guard =std::unique_lock<std::recursive_mutex>(*sr);
161 std::unique_ptr<WrapperBase> edp(
reader->getProduct(bk, &principal));
163 if(edp.get() !=
nullptr) {
174 bool skipCurrentProcess,
177 m_waitingTasks.add(waitTask);
179 bool expected =
false;
180 if( m_prefetchRequested.compare_exchange_strong(expected,
true) ) {
184 auto workToDo = [
this, mcc, &
principal, token] () {
187 resolveProductImpl<true>([
this,&
principal,mcc]() {
188 if(principal.branchType() !=
InEvent) {
return; }
189 if(
auto reader = principal.reader()) {
190 std::unique_lock<std::recursive_mutex> guard;
191 if(
auto sr =
reader->sharedResources().second) {
192 guard =std::unique_lock<std::recursive_mutex>(*sr);
201 this->m_waitingTasks.doneWaiting(std::current_exception());
204 this->m_waitingTasks.doneWaiting(
nullptr);
208 if(
auto reader = principal.reader()) {
209 if (
auto shared_res =
reader->sharedResources().first) {
210 queue = &(shared_res->serialQueueChain());
214 queue->
push(workToDo);
219 tbb::task::spawn(*
t);
226 m_prefetchRequested =
false;
227 m_waitingTasks.reset();
245 bool skipCurrentProcess,
248 if (!skipCurrentProcess) {
250 return resolveProductImpl<false>([](){
return;});
257 bool skipCurrentProcess,
260 if(not skipCurrentProcess) {
261 m_waitingTasks.add(waitTask);
263 bool expected =
false;
264 if(worker_ and prefetchRequested_.compare_exchange_strong(expected,
true)) {
271 [
this](std::exception_ptr
const * iException) {
272 if(
nullptr != iException) {
273 m_waitingTasks.doneWaiting(*iException);
275 m_waitingTasks.doneWaiting(std::exception_ptr());
278 worker_->callWhenDoneAsync(waiting);
286 bool expected =
false;
287 if(prefetchRequested_.compare_exchange_strong(expected,
true)) {
288 m_waitingTasks.doneWaiting(std::exception_ptr());
295 m_waitingTasks.reset();
297 prefetchRequested_ =
false;
310 assert(worker_ !=
nullptr);
316 bool skipCurrentProcess,
319 if (!skipCurrentProcess and worker_) {
320 return resolveProductImpl<true>(
327 auto workCall = [
this,&
event,&parentContext,mcc] () {
332 *(aux_->eventSetup()),
346 std::ostringstream ost;
347 ost <<
"Calling produce method for unscheduled module " 348 << worker_->description().moduleName() <<
"/'" 349 << worker_->description().moduleLabel() <<
"'";
361 bool skipCurrentProcess,
365 if(skipCurrentProcess) {
return; }
366 waitingTasks_.add(waitTask);
367 bool expected =
false;
368 if(prefetchRequested_.compare_exchange_strong(expected,
true)) {
373 [
this,&principal, skipCurrentProcess,sra,mcc](std::exception_ptr
const* iPtr)
378 resolveProductImpl<true>([iPtr]() {
380 std::rethrow_exception(*iPtr);
384 waitingTasks_.doneWaiting(std::current_exception());
387 waitingTasks_.doneWaiting(
nullptr);
394 *(aux_->eventSetup()),
403 prefetchRequested_ =
false;
404 waitingTasks_.reset();
447 if(not prod) {
return;}
467 <<
"It is actually of type " << typeID.className() <<
".\n";
487 auto presentStatus =
status();
549 realProduct_.setProvenance(provRetriever,ph,pid);
553 realProduct_.setProcessHistory(ph);
561 realProduct_.resetProductData_(deleteEarly);
570 <<
"AliasProductResolver::putProduct_() not implemented and should never be called.\n" 571 <<
"Contact a Framework developer\n";
576 <<
"AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 577 <<
"Contact a Framework developer\n";
583 provRetriever_ = provRetriever;
590 return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()):
nullptr;
602 <<
"ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n" 603 <<
"Contact a Framework developer\n";
608 <<
"ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 609 <<
"Contact a Framework developer\n";
629 <<
"ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n" 630 <<
"Contact a Framework developer\n";
635 std::vector<bool>
const& ambiguous) :
636 matchingHolders_(matchingHolders),
637 ambiguous_(ambiguous),
639 lastSkipCurrentCheckIndex_(lastCheckIndex_.
load()),
640 prefetchRequested_(
false),
641 skippingPrefetchRequested_(
false) {
648 bool skipCurrentProcess,
652 return productResolver->
resolveProduct(principal, skipCurrentProcess, sra, mcc);
658 bool skipCurrentProcess,
663 const unsigned int choiceSize =
ambiguous_.size();
672 return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
680 for(
unsigned int k : lookupProcessOrder) {
690 updateCacheIndex =
k;
703 bool skipCurrentProcess,
706 if(not skipCurrentProcess) {
709 bool expected =
false;
716 bool expected =
false;
726 std::exception_ptr iExceptPtr)
const {
727 if( not iSkipCurrentProcess) {
741 unsigned int iResolverIndex,
745 bool iSkipCurrentProcess,
747 resolver_(iResolver),
748 principal_(iPrincipal),
751 serviceToken_(iToken),
752 index_(iResolverIndex),
753 skipCurrentProcess_(iSkipCurrentProcess){}
755 tbb::task*
execute()
override {
756 auto exceptPtr =exceptionPtr();
758 resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
760 if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
761 resolver_->tryPrefetchResolverAsync(index_+1,
779 bool skipCurrentProcess_;
786 bool iSkipCurrentProcess,
787 std::exception_ptr iExceptPtr)
const {
789 auto k = lookupProcessOrder[iProcessingIndex];
791 setCache(iSkipCurrentProcess,
k, iExceptPtr);
798 bool iSkipCurrentProcess)
const {
800 auto k = lookupProcessOrder[iProcessingIndex];
805 setCache(iSkipCurrentProcess, k,
nullptr);
815 bool skipCurrentProcess,
820 auto index = iProcessingIndex;
822 const unsigned int choiceSize =
ambiguous_.size();
824 while(
index < lookupProcessOrder.size()) {
825 auto k = lookupProcessOrder[
index];
837 auto task =
new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
846 task->increment_ref_count();
856 if(0 == task->decrement_ref_count()) {
857 tbb::task::spawn(*task);
864 setCache(skipCurrentProcess, newCacheIndex,
nullptr);
895 <<
"NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n" 896 <<
"Contact a Framework developer\n";
901 <<
"NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n" 902 <<
"Contact a Framework developer\n";
907 <<
"NoProcessProductResolver::productResolved_() not implemented and should never be called.\n" 908 <<
"Contact a Framework developer\n";
913 <<
"NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n" 914 <<
"Contact a Framework developer\n";
919 <<
"NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n" 920 <<
"Contact a Framework developer\n";
925 <<
"NoProcessProductResolver::putProduct_() not implemented and should never be called.\n" 926 <<
"Contact a Framework developer\n";
931 <<
"NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 932 <<
"Contact a Framework developer\n";
937 <<
"NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n" 938 <<
"Contact a Framework developer\n";
943 <<
"NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n" 944 <<
"Contact a Framework developer\n";
949 <<
"NoProcessProductResolver::provenance_() not implemented and should never be called.\n" 950 <<
"Contact a Framework developer\n";
955 <<
"NoProcessProductResolver::connectTo() not implemented and should never be called.\n" 956 <<
"Contact a Framework developer\n";
963 bool skipCurrentProcess,
970 ->resolveProduct(principal,
971 skipCurrentProcess, sra, mcc);
976 bool skipCurrentProcess,
980 ->prefetchAsync(waitTask,principal,
981 skipCurrentProcess, sra, mcc);
1003 <<
"SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n" 1004 <<
"Contact a Framework developer\n";
1009 <<
"SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n" 1010 <<
"Contact a Framework developer\n";
1015 <<
"SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n" 1016 <<
"Contact a Framework developer\n";
1021 <<
"SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n" 1022 <<
"Contact a Framework developer\n";
1027 <<
"SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n" 1028 <<
"Contact a Framework developer\n";
1034 <<
"SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n" 1035 <<
"Contact a Framework developer\n";
1040 <<
"SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 1041 <<
"Contact a Framework developer\n";
1046 <<
"SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n" 1047 <<
"Contact a Framework developer\n";
1052 <<
"SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n" 1053 <<
"Contact a Framework developer\n";
1058 <<
"SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n" 1059 <<
"Contact a Framework developer\n";
1064 <<
"SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n" 1065 <<
"Contact a Framework developer\n";
virtual bool productResolved_() const override final
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
virtual void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
void setProcessHistory(ProcessHistory const &ph)
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
virtual bool productUnavailable_() const override final
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
unsigned int unsetIndexValue() const
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetProductData_(bool deleteEarly) override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
virtual void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
virtual bool singleProduct_() const override
virtual bool unscheduledWasNotRun_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void throwNullRealProduct() const
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override final
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
WaitingTaskList skippingWaitingTasks_
virtual bool productWasDeleted_() const override final
virtual bool singleProduct_() const override
virtual void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
virtual BranchDescription const & branchDescription_() const override
static unsigned int kUnsetOffset
ServiceToken presentToken() const
virtual void resetProductData_(bool deleteEarly) override
std::string const & processName() const
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void throwProductDeletedException() const
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
void putProduct(std::unique_ptr< WrapperBase > edp) const
WaitingTaskList waitingTasks_
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void resetFailedFromThisProcess() override
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
virtual void setProcessHistory_(ProcessHistory const &ph) override final
virtual bool isFromCurrentProcess() const override final
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
virtual bool singleProduct_() const override
bool productWasDeleted() const
virtual Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
virtual void resetProductData_(bool deleteEarly) override
static ServiceRegistry & instance()
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual ProductProvenance const * productProvenancePtr_() const override
virtual bool productUnavailable_() const override
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< bool > prefetchRequested_
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual bool productWasDeleted_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override final
virtual bool isFromCurrentProcess() const =0
virtual ProductProvenance const * productProvenancePtr_() const override final
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
virtual bool singleProduct_() const override final
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
virtual bool productResolved_() const override final
DelayedReader * reader() const
UnscheduledAuxiliary const * auxiliary() const
virtual void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
virtual bool productUnavailable_() const override
void addContext(std::string const &context)
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual bool singleProduct_() const override
std::vector< ProductResolverIndex > matchingHolders_
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
BranchDescription const & branchDescription() const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
virtual bool productResolved_() const override final
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool productResolved() const
ProductData const & getProductData() const
virtual bool productWasDeleted_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void resetProductData_(bool deleteEarly) override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous)
virtual bool unscheduledWasNotRun_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
static Resolution makeAmbiguous()
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
def branchType(schema, name)
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
static HepMC::HEPEVT_Wrapper wrapper
virtual void connectTo(ProductResolverBase const &, Principal const *) override final
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setFailedStatus() const
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final