29 exception <<
"ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
31 <<
"Looking for module label: " <<
moduleLabel() <<
"\n"
34 <<
"This means there is a configuration error.\n"
35 <<
"The module which is asking for this data must be configured to state that it will read this data.";
41 template <
bool callResolver,
typename FUNC>
48 auto presentStatus =
status();
53 auto failedStatusSetter = [
this](
ProductStatus* presentStatus) {
57 *presentStatus = this->
status();
59 std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
68 if(pd->wrapper()->isPresent()) {
79 if(not iFrom) {
return;}
86 }
else if(
original->hasIsProductEqual()) {
87 if(!
original->isProductEqual(iFrom.get())) {
90 <<
"ProductResolver::mergeTheProduct\n"
91 <<
"Two run/lumi products for the same run/lumi which should be equal are not\n"
92 <<
"Using the first, ignoring the second\n"
93 <<
"className = " << bd.className() <<
"\n"
94 <<
"moduleLabel = " << bd.moduleLabel() <<
"\n"
95 <<
"instance = " << bd.productInstanceName() <<
"\n"
96 <<
"process = " << bd.processName() <<
"\n";
101 <<
"ProductResolver::mergeTheProduct\n"
102 <<
"Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
103 <<
"Using the first, ignoring the second in merge\n"
104 <<
"className = " << bd.className() <<
"\n"
105 <<
"moduleLabel = " << bd.moduleLabel() <<
"\n"
106 <<
"instance = " << bd.productInstanceName() <<
"\n"
107 <<
"process = " << bd.processName() <<
"\n";
117 return resolveProductImpl<true>([
this,&
principal,mcc]() {
131 aux_->postModuleDelayedGetSignal_.emit(*(iContext->
getStreamContext()), *iContext); }
134 if(
auto reader=principal.reader()) {
135 std::unique_lock<std::recursive_mutex> guard;
136 if(
auto sr =
reader->sharedResources().second) {
137 guard =std::unique_lock<std::recursive_mutex>(*sr);
152 std::unique_lock<std::recursive_mutex> guard;
153 if(
auto sr =
reader->sharedResources().second) {
154 guard =std::unique_lock<std::recursive_mutex>(*sr);
160 std::unique_ptr<WrapperBase> edp(
reader->getProduct(bk, &principal));
162 if(edp.get() !=
nullptr) {
185 template<
typename ALLOC,
typename F>
192 bool skipCurrentProcess,
197 bool expected =
false;
205 resolveProductImpl<true>([
this,&
principal,mcc]() {
206 if(principal.branchType() !=
InEvent) {
return; }
207 if(
auto reader = principal.reader()) {
208 std::unique_lock<std::recursive_mutex> guard;
209 if(
auto sr =
reader->sharedResources().second) {
210 guard =std::unique_lock<std::recursive_mutex>(*sr);
226 if(
auto reader = principal.reader()) {
227 if (
auto shared_res =
reader->sharedResources().first) {
228 queue = &(shared_res->serialQueueChain());
232 queue->
push(workToDo);
237 tbb::task::spawn(*
t);
263 bool skipCurrentProcess,
266 if (!skipCurrentProcess) {
268 return resolveProductImpl<false>([](){
return;});
275 bool skipCurrentProcess,
278 if(not skipCurrentProcess) {
281 bool expected =
false;
289 [
this](std::exception_ptr
const * iException) {
290 if(
nullptr != iException) {
304 bool expected =
false;
334 bool skipCurrentProcess,
337 if (!skipCurrentProcess and
worker_) {
338 return resolveProductImpl<true>(
345 auto workCall = [
this,&
event,&parentContext,mcc] () {
364 std::ostringstream ost;
365 ost <<
"Calling produce method for unscheduled module "
366 << worker_->description().moduleName() <<
"/'"
367 << worker_->description().moduleLabel() <<
"'";
379 bool skipCurrentProcess,
383 if(skipCurrentProcess) {
return; }
385 bool expected =
false;
391 [
this,&principal, skipCurrentProcess,sra,mcc](std::exception_ptr
const* iPtr)
396 resolveProductImpl<true>([iPtr]() {
398 std::rethrow_exception(*iPtr);
465 if(not prod) {
return;}
485 <<
"It is actually of type " << typeID.className() <<
".\n";
505 auto presentStatus =
status();
588 <<
"AliasProductResolver::putProduct_() not implemented and should never be called.\n"
589 <<
"Contact a Framework developer\n";
594 <<
"AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
595 <<
"Contact a Framework developer\n";
620 <<
"ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
621 <<
"Contact a Framework developer\n";
626 <<
"ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
627 <<
"Contact a Framework developer\n";
632 std::vector<bool>
const& ambiguous) :
633 matchingHolders_(matchingHolders),
634 ambiguous_(ambiguous),
636 lastSkipCurrentCheckIndex_(lastCheckIndex_.
load()),
637 prefetchRequested_(
false),
638 skippingPrefetchRequested_(
false) {
645 bool skipCurrentProcess,
649 return productResolver->
resolveProduct(principal, skipCurrentProcess, sra, mcc);
655 bool skipCurrentProcess,
660 const unsigned int choiceSize =
ambiguous_.size();
669 return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
677 for(
unsigned int k : lookupProcessOrder) {
687 updateCacheIndex =
k;
700 bool skipCurrentProcess,
703 if(not skipCurrentProcess) {
706 bool expected =
false;
713 bool expected =
false;
723 std::exception_ptr iExceptPtr)
const {
724 if( not iSkipCurrentProcess) {
738 unsigned int iResolverIndex,
742 bool iSkipCurrentProcess,
744 resolver_(iResolver),
745 principal_(iPrincipal),
748 serviceToken_(iToken),
749 index_(iResolverIndex),
750 skipCurrentProcess_(iSkipCurrentProcess){}
752 tbb::task*
execute()
override {
753 auto exceptPtr =exceptionPtr();
755 resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
757 if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
758 resolver_->tryPrefetchResolverAsync(index_+1,
770 NoProcessProductResolver
const* resolver_;
771 Principal
const* principal_;
773 ModuleCallingContext
const* mcc_;
776 bool skipCurrentProcess_;
783 bool iSkipCurrentProcess,
784 std::exception_ptr iExceptPtr)
const {
786 auto k = lookupProcessOrder[iProcessingIndex];
788 setCache(iSkipCurrentProcess,
k, iExceptPtr);
795 bool iSkipCurrentProcess)
const {
797 auto k = lookupProcessOrder[iProcessingIndex];
802 setCache(iSkipCurrentProcess, k,
nullptr);
812 bool skipCurrentProcess,
817 auto index = iProcessingIndex;
819 const unsigned int choiceSize =
ambiguous_.size();
821 while(
index < lookupProcessOrder.size()) {
822 auto k = lookupProcessOrder[
index];
834 auto task =
new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
843 task->increment_ref_count();
853 if(0 == task->decrement_ref_count()) {
854 tbb::task::spawn(*task);
861 setCache(skipCurrentProcess, newCacheIndex,
nullptr);
892 <<
"NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
893 <<
"Contact a Framework developer\n";
898 <<
"NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
899 <<
"Contact a Framework developer\n";
904 <<
"NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
905 <<
"Contact a Framework developer\n";
910 <<
"NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
911 <<
"Contact a Framework developer\n";
916 <<
"NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
917 <<
"Contact a Framework developer\n";
922 <<
"NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
923 <<
"Contact a Framework developer\n";
928 <<
"NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
929 <<
"Contact a Framework developer\n";
934 <<
"NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
935 <<
"Contact a Framework developer\n";
940 <<
"NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
941 <<
"Contact a Framework developer\n";
946 <<
"NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
947 <<
"Contact a Framework developer\n";
952 <<
"NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
953 <<
"Contact a Framework developer\n";
960 bool skipCurrentProcess,
967 ->resolveProduct(principal,
968 skipCurrentProcess, sra, mcc);
973 bool skipCurrentProcess,
977 ->prefetchAsync(waitTask,principal,
978 skipCurrentProcess, sra, mcc);
1000 <<
"SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1001 <<
"Contact a Framework developer\n";
1006 <<
"SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1007 <<
"Contact a Framework developer\n";
1012 <<
"SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1013 <<
"Contact a Framework developer\n";
1018 <<
"SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1019 <<
"Contact a Framework developer\n";
1024 <<
"SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1025 <<
"Contact a Framework developer\n";
1031 <<
"SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1032 <<
"Contact a Framework developer\n";
1037 <<
"SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
1038 <<
"Contact a Framework developer\n";
1043 <<
"SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1044 <<
"Contact a Framework developer\n";
1049 <<
"SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1050 <<
"Contact a Framework developer\n";
1055 <<
"SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1056 <<
"Contact a Framework developer\n";
1061 <<
"SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1062 <<
"Contact a Framework developer\n";
virtual ProductProvenance const * productProvenancePtr_() const override
ProducedProductResolver & realProduct_
virtual void setupUnscheduled(UnscheduledConfigurator const &) overridefinal
std::string const & branchName() const
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
virtual void resetProductData_(bool deleteEarly) override
virtual bool productWasDeleted_() const override
Provenance const * provenance() const
void callWhenDoneAsync(WaitingTask *task)
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
void setProcessHistory(ProcessHistory const &ph)
virtual bool singleProduct_() const override
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
virtual void setProcessHistory_(ProcessHistory const &ph) overridefinal
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
WaitingTaskList m_waitingTasks
unsigned int unsetIndexValue() const
std::shared_ptr< BranchDescription const > bd_
virtual bool productResolved_() const overridefinal
virtual void resetProductData_(bool deleteEarly) override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
virtual void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
WaitingTaskList skippingWaitingTasks_
virtual void resetProductData_(bool deleteEarly) override
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
ProductProvenance const * productProvenance() const
virtual bool unscheduledWasNotRun_() const override
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual bool singleProduct_() const override
virtual bool productUnavailable_() const override
std::atomic< bool > prefetchRequested_
EventSetup const * eventSetup() const
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
static unsigned int kUnsetOffset
virtual BranchDescription const & branchDescription_() const override
ServiceToken presentToken() const
virtual void resetProductData_(bool deleteEarly) override
std::string const & processName() const
virtual Provenance const * provenance_() const override
void throwProductDeletedException() const
static unsigned int kMissingOffset
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
WaitingTaskList waitingTasks_
virtual bool singleProduct_() const override
virtual bool productResolved_() const overridefinal
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
virtual void resetFailedFromThisProcess() override
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
virtual bool isFromCurrentProcess() const =0
virtual bool isFromCurrentProcess() const overridefinal
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
virtual BranchDescription const & branchDescription_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
task * execute() override
bool productWasDeleted() const
std::vector< unsigned int > const & lookupProcessOrder() const
std::atomic< bool > prefetchRequested_
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
virtual void resetProductData_(bool deleteEarly) override
static ServiceRegistry & instance()
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
virtual bool productUnavailable_() const overridefinal
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setProcessHistory(ProcessHistory const &ph)
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual ProductProvenance const * productProvenancePtr_() const overridefinal
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) overridefinal
virtual bool productWasDeleted_() const overridefinal
UnscheduledAuxiliary const * auxiliary() const
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) overridefinal
virtual void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
virtual bool singleProduct_() const override
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
virtual void setProcessHistory_(ProcessHistory const &ph) override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void addContext(std::string const &context)
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
std::vector< ProductResolverIndex > matchingHolders_
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const overridefinal
virtual bool singleProduct_() const overridefinal
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
virtual bool unscheduledWasNotRun_() const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual void setProcessHistory_(ProcessHistory const &ph) override
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
virtual bool productWasDeleted_() const override
virtual void setupUnscheduled(UnscheduledConfigurator const &) overridefinal
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) overridefinal
bool productResolved() const
ProductData const & getProductData() const
virtual Provenance const * provenance_() const override
virtual void connectTo(ProductResolverBase const &, Principal const *) overridefinal
volatile std::atomic< bool > shutdown_flag false
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
virtual void resetProductData_(bool deleteEarly) override
virtual ProductProvenance const * productProvenancePtr_() const override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous)
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
virtual bool productUnavailable_() const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual ProductProvenance const * productProvenancePtr_() const override
static Resolution makeAmbiguous()
virtual bool productResolved_() const overridefinal
tuple size
Write out results.
WaitingTaskList waitingTasks_
void emit(Args &&...args) const
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
static HepMC::HEPEVT_Wrapper wrapper
ProductResolverIndex realResolverIndex_
void setFailedStatus() const
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override