31 exception <<
"ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n" 33 <<
"Looking for module label: " <<
moduleLabel() <<
"\n" 36 <<
"This means there is a configuration error.\n" 37 <<
"The module which is asking for this data must be configured to state that it will read this data.";
43 template <
bool callResolver,
typename FUNC>
50 auto presentStatus =
status();
55 auto failedStatusSetter = [
this](
ProductStatus* presentStatus) {
59 *presentStatus = this->
status();
61 std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
70 if(pd->wrapper()->isPresent()) {
81 if(not iFrom) {
return;}
88 }
else if(
original->hasIsProductEqual()) {
89 if(!
original->isProductEqual(iFrom.get())) {
92 <<
"ProductResolver::mergeTheProduct\n" 93 <<
"Two run/lumi products for the same run/lumi which should be equal are not\n" 94 <<
"Using the first, ignoring the second\n" 95 <<
"className = " << bd.className() <<
"\n" 96 <<
"moduleLabel = " << bd.moduleLabel() <<
"\n" 97 <<
"instance = " << bd.productInstanceName() <<
"\n" 98 <<
"process = " << bd.processName() <<
"\n";
103 <<
"ProductResolver::mergeTheProduct\n" 104 <<
"Run/lumi product has neither a mergeProduct nor isProductEqual function\n" 105 <<
"Using the first, ignoring the second in merge\n" 106 <<
"className = " << bd.className() <<
"\n" 107 <<
"moduleLabel = " << bd.moduleLabel() <<
"\n" 108 <<
"instance = " << bd.productInstanceName() <<
"\n" 109 <<
"process = " << bd.processName() <<
"\n";
119 return resolveProductImpl<true>([
this,&
principal,mcc]() {
133 aux_->postModuleDelayedGetSignal_.emit(*(iContext->
getStreamContext()), *iContext); }
136 if(
auto reader=principal.reader()) {
137 std::unique_lock<std::recursive_mutex> guard;
138 if(
auto sr =
reader->sharedResources().second) {
139 guard =std::unique_lock<std::recursive_mutex>(*sr);
154 std::unique_lock<std::recursive_mutex> guard;
155 if(
auto sr =
reader->sharedResources().second) {
156 guard =std::unique_lock<std::recursive_mutex>(*sr);
162 std::unique_ptr<WrapperBase> edp(
reader->getProduct(bk, &principal));
164 if(edp.get() !=
nullptr) {
175 bool skipCurrentProcess,
179 m_waitingTasks.add(waitTask);
181 bool expected =
false;
182 if( m_prefetchRequested.compare_exchange_strong(expected,
true) ) {
184 auto workToDo = [
this, mcc, &
principal, token] () {
188 resolveProductImpl<true>([
this,&
principal,mcc]() {
189 if(principal.branchType() !=
InEvent) {
return; }
190 if(
auto reader = principal.reader()) {
191 std::unique_lock<std::recursive_mutex> guard;
192 if(
auto sr =
reader->sharedResources().second) {
193 guard =std::unique_lock<std::recursive_mutex>(*sr);
202 this->m_waitingTasks.doneWaiting(std::current_exception());
205 this->m_waitingTasks.doneWaiting(
nullptr);
209 if(
auto reader = principal.reader()) {
210 if (
auto shared_res =
reader->sharedResources().first) {
211 queue = &(shared_res->serialQueueChain());
215 queue->
push(workToDo);
220 tbb::task::spawn(*
t);
227 m_prefetchRequested =
false;
228 m_waitingTasks.reset();
246 bool skipCurrentProcess,
249 if (!skipCurrentProcess) {
251 return resolveProductImpl<false>([](){
return;});
258 bool skipCurrentProcess,
262 if(not skipCurrentProcess) {
264 if( not mcc->parent().isAtEndTransition() ) {
268 m_waitingTasks.add(waitTask);
270 bool expected =
false;
271 if(worker_ and prefetchRequested_.compare_exchange_strong(expected,
true)) {
278 [
this](std::exception_ptr
const * iException) {
279 if(
nullptr != iException) {
280 m_waitingTasks.doneWaiting(*iException);
282 m_waitingTasks.doneWaiting(std::exception_ptr());
285 worker_->callWhenDoneAsync(waiting);
293 bool expected =
false;
294 if(prefetchRequested_.compare_exchange_strong(expected,
true)) {
295 m_waitingTasks.doneWaiting(std::exception_ptr());
302 m_waitingTasks.reset();
304 prefetchRequested_ =
false;
317 assert(worker_ !=
nullptr);
323 bool skipCurrentProcess,
326 if (!skipCurrentProcess and worker_) {
327 return resolveProductImpl<true>(
334 auto workCall = [
this,&
event,&parentContext,mcc] () {
339 *(aux_->eventSetup()),
353 std::ostringstream ost;
354 ost <<
"Calling produce method for unscheduled module " 355 << worker_->description().moduleName() <<
"/'" 356 << worker_->description().moduleLabel() <<
"'";
368 bool skipCurrentProcess,
373 if(skipCurrentProcess) {
return; }
374 waitingTasks_.add(waitTask);
375 bool expected =
false;
376 if(prefetchRequested_.compare_exchange_strong(expected,
true)) {
381 [
this](std::exception_ptr
const* iPtr)
386 resolveProductImpl<true>([iPtr]() {
388 std::rethrow_exception(*iPtr);
392 waitingTasks_.doneWaiting(std::current_exception());
395 waitingTasks_.doneWaiting(
nullptr);
402 *(aux_->eventSetup()),
412 prefetchRequested_ =
false;
413 waitingTasks_.reset();
448 if(not prod) {
return;}
468 <<
"It is actually of type " << typeID.className() <<
".\n";
488 auto presentStatus =
status();
550 realProduct_.setProvenance(provRetriever,ph,pid);
554 realProduct_.setProcessHistory(ph);
562 realProduct_.resetProductData_(deleteEarly);
571 <<
"AliasProductResolver::putProduct_() not implemented and should never be called.\n" 572 <<
"Contact a Framework developer\n";
577 <<
"AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 578 <<
"Contact a Framework developer\n";
584 provRetriever_ = provRetriever;
591 return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()):
nullptr;
603 <<
"ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n" 604 <<
"Contact a Framework developer\n";
609 <<
"ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 610 <<
"Contact a Framework developer\n";
630 <<
"ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n" 631 <<
"Contact a Framework developer\n";
636 std::vector<bool>
const& ambiguous,
638 matchingHolders_(matchingHolders),
639 ambiguous_(ambiguous),
641 lastSkipCurrentCheckIndex_(lastCheckIndex_.
load()),
642 prefetchRequested_(
false),
643 skippingPrefetchRequested_(
false),
644 madeAtEnd_{madeAtEnd}
652 bool skipCurrentProcess,
656 return productResolver->
resolveProduct(principal, skipCurrentProcess, sra, mcc);
662 bool skipCurrentProcess,
667 const unsigned int choiceSize =
ambiguous_.size();
670 if( (not skipCurrentProcess) and (
madeAtEnd_ and mcc)) {
671 skipCurrentProcess = not mcc->parent().isAtEndTransition();
681 return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
688 for(
unsigned int k : lookupProcessOrder) {
698 updateCacheIndex =
k;
711 bool skipCurrentProcess,
715 bool timeToMakeAtEnd =
true;
717 timeToMakeAtEnd = mcc->parent().isAtEndTransition();
721 if(not skipCurrentProcess and timeToMakeAtEnd) {
724 bool expected =
false;
731 bool expected =
false;
741 std::exception_ptr iExceptPtr)
const {
742 if( not iSkipCurrentProcess) {
756 unsigned int iResolverIndex,
760 bool iSkipCurrentProcess,
762 resolver_(iResolver),
763 principal_(iPrincipal),
766 serviceToken_(iToken),
767 index_(iResolverIndex),
768 skipCurrentProcess_(iSkipCurrentProcess){}
770 tbb::task*
execute()
override {
771 auto exceptPtr =exceptionPtr();
773 resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
775 if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
776 resolver_->tryPrefetchResolverAsync(index_+1,
794 bool skipCurrentProcess_;
801 bool iSkipCurrentProcess,
802 std::exception_ptr iExceptPtr)
const {
804 auto k = lookupProcessOrder[iProcessingIndex];
806 setCache(iSkipCurrentProcess,
k, iExceptPtr);
813 bool iSkipCurrentProcess)
const {
815 auto k = lookupProcessOrder[iProcessingIndex];
820 setCache(iSkipCurrentProcess, k,
nullptr);
830 bool skipCurrentProcess,
835 auto index = iProcessingIndex;
837 const unsigned int choiceSize =
ambiguous_.size();
839 while(
index < lookupProcessOrder.size()) {
840 auto k = lookupProcessOrder[
index];
852 auto task =
new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
861 task->increment_ref_count();
872 if(0 == task->decrement_ref_count()) {
873 tbb::task::spawn(*task);
880 setCache(skipCurrentProcess, newCacheIndex,
nullptr);
911 <<
"NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n" 912 <<
"Contact a Framework developer\n";
917 <<
"NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n" 918 <<
"Contact a Framework developer\n";
923 <<
"NoProcessProductResolver::productResolved_() not implemented and should never be called.\n" 924 <<
"Contact a Framework developer\n";
929 <<
"NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n" 930 <<
"Contact a Framework developer\n";
935 <<
"NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n" 936 <<
"Contact a Framework developer\n";
941 <<
"NoProcessProductResolver::putProduct_() not implemented and should never be called.\n" 942 <<
"Contact a Framework developer\n";
947 <<
"NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 948 <<
"Contact a Framework developer\n";
953 <<
"NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n" 954 <<
"Contact a Framework developer\n";
959 <<
"NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n" 960 <<
"Contact a Framework developer\n";
965 <<
"NoProcessProductResolver::provenance_() not implemented and should never be called.\n" 966 <<
"Contact a Framework developer\n";
971 <<
"NoProcessProductResolver::connectTo() not implemented and should never be called.\n" 972 <<
"Contact a Framework developer\n";
979 bool skipCurrentProcess,
986 ->resolveProduct(principal,
987 skipCurrentProcess, sra, mcc);
992 bool skipCurrentProcess,
997 ->prefetchAsync(waitTask,principal,
998 skipCurrentProcess, token, sra, mcc);
1020 <<
"SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n" 1021 <<
"Contact a Framework developer\n";
1026 <<
"SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n" 1027 <<
"Contact a Framework developer\n";
1032 <<
"SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n" 1033 <<
"Contact a Framework developer\n";
1038 <<
"SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n" 1039 <<
"Contact a Framework developer\n";
1044 <<
"SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n" 1045 <<
"Contact a Framework developer\n";
1051 <<
"SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n" 1052 <<
"Contact a Framework developer\n";
1057 <<
"SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n" 1058 <<
"Contact a Framework developer\n";
1063 <<
"SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n" 1064 <<
"Contact a Framework developer\n";
1069 <<
"SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n" 1070 <<
"Contact a Framework developer\n";
1075 <<
"SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n" 1076 <<
"Contact a Framework developer\n";
1081 <<
"SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n" 1082 <<
"Contact a Framework developer\n";
void connectTo(ProductResolverBase const &iOther, Principal const *) final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
void setProcessHistory(ProcessHistory const &ph)
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
unsigned int unsetIndexValue() const
void resetProductData_(bool deleteEarly) override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
bool singleProduct_() const override
bool unscheduledWasNotRun_() const override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void throwNullRealProduct() const
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
WaitingTaskList skippingWaitingTasks_
bool singleProduct_() const override
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
BranchDescription const & branchDescription_() const override
bool productResolved_() const final
static unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
void connectTo(ProductResolverBase const &, Principal const *) final
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
void throwProductDeletedException() const
bool productUnavailable_() const final
ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
bool productResolved_() const final
void putProduct(std::unique_ptr< WrapperBase > edp) const
WaitingTaskList waitingTasks_
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
bool singleProduct_() const override
bool productWasDeleted() const
Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
void resetProductData_(bool deleteEarly) override
void setProcessHistory_(ProcessHistory const &ph) override
ProductProvenance const * productProvenancePtr_() const override
bool isFromCurrentProcess() const final
bool productUnavailable_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
ProductProvenance const * productProvenancePtr_() const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
std::atomic< bool > prefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
bool productResolved_() const final
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::type_info const & unvalidatedTypeInfo() const
bool productWasDeleted_() const override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual bool isFromCurrentProcess() const =0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void setProcessHistory_(ProcessHistory const &ph) override
BranchDescription const & branchDescription_() const override
bool productWasDeleted_() const final
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool productUnavailable_() const override
void addContext(std::string const &context)
bool singleProduct_() const override
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
ProductProvenance const * productProvenancePtr_() const final
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
bool productResolved() const
ProductData const & getProductData() const
bool productWasDeleted_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void resetProductData_(bool deleteEarly) override
bool unscheduledWasNotRun_() const override
bool singleProduct_() const final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
static Resolution makeAmbiguous()
void setProcessHistory_(ProcessHistory const &ph) final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
def branchType(schema, name)
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
static HepMC::HEPEVT_Wrapper wrapper
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setFailedStatus() const