48 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
49 void transform_into(InputIterator
begin, InputIterator
end, ForwardIterator
out, Func
func) {
59 template <
typename FROM,
typename TO,
typename FUNC>
60 void fill_summary(FROM
const& from, TO&
to, FUNC func) {
61 if (to.size() != from.size()) {
63 transform_into(from.begin(), from.end(),
temp.begin(),
func);
66 transform_into(from.begin(), from.end(), to.begin(),
func);
76 std::shared_ptr<ActivityRegistry>
areg,
77 std::shared_ptr<TriggerResultInserter> inserter) {
80 ptr->setActivityRegistry(areg);
85 ProductRegistry
const&
preg,
86 std::multimap<std::string, Worker*>& branchToReadingWorker) {
88 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly");
89 if (not vBranchesToDeleteEarly.empty()) {
90 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
91 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
92 vBranchesToDeleteEarly.end());
95 auto allBranchNames = preg.allBranchNames();
97 for (
auto&
b : allBranchNames) {
98 b.resize(
b.size() - 1);
100 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
101 std::vector<std::string>
temp;
102 temp.reserve(vBranchesToDeleteEarly.size());
105 vBranchesToDeleteEarly.end(),
106 allBranchNames.begin(),
107 allBranchNames.end(),
108 std::back_inserter(temp));
109 vBranchesToDeleteEarly.swap(temp);
110 if (temp.size() != vBranchesToDeleteEarly.size()) {
111 std::vector<std::string> missingProducts;
114 vBranchesToDeleteEarly.begin(),
115 vBranchesToDeleteEarly.end(),
116 std::back_inserter(missingProducts));
117 LogInfo l(
"MissingProductsForCanDeleteEarly");
118 l <<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
119 for (
auto const&
n : missingProducts) {
125 for (
auto const& branch : vBranchesToDeleteEarly) {
126 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(
nullptr)));
134 typedef std::vector<std::string>
vstring;
139 std::shared_ptr<TriggerResultInserter> inserter,
142 std::shared_ptr<ModuleRegistry> modReg,
149 std::shared_ptr<ActivityRegistry> areg,
151 bool allowEarlyDelete,
154 : workerManager_(modReg, areg, actions),
162 number_of_unscheduled_modules_(0),
164 streamContext_(streamID_, processContext),
165 skippingEvent_(
false) {
167 bool hasPath =
false;
168 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
169 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
173 for (
auto const& trig_name : pathNames) {
174 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
181 inserter->setTrigResultForStream(streamID.
value(),
results());
190 for (
auto const& end_path_name : endPathNames) {
191 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
198 std::set<std::string> usedWorkerLabels;
200 usedWorkerLabels.insert(worker->description()->moduleLabel());
202 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules"));
203 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204 std::vector<std::string> unusedLabels;
206 modulesInConfigSet.end(),
207 usedWorkerLabels.begin(),
208 usedWorkerLabels.end(),
209 back_inserter(unusedLabels));
210 std::set<std::string> unscheduledLabels;
211 std::vector<std::string> shouldBeUsedLabels;
212 if (!unusedLabels.empty()) {
217 for (
auto const&
label : unusedLabels) {
221 assert(modulePSet !=
nullptr);
223 *modulePSet, preg, &prealloc, processConfiguration,
label, unscheduledLabels, shouldBeUsedLabels);
225 if (!shouldBeUsedLabels.empty()) {
226 std::ostringstream unusedStream;
227 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
228 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229 itLabelEnd = shouldBeUsedLabels.end();
230 itLabel != itLabelEnd;
232 unusedStream <<
",'" << *itLabel <<
"'";
234 LogInfo(
"path") <<
"The following module labels are not assigned to any path:\n" << unusedStream.str() <<
"\n";
246 bool allowEarlyDelete) {
249 if (not allowEarlyDelete)
254 std::multimap<std::string, Worker*> branchToReadingWorker;
255 initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
258 if (branchToReadingWorker.empty()) {
261 const std::vector<std::string> kEmpty;
262 std::map<Worker*, unsigned int> reserveSizeForWorker;
263 unsigned int upperLimitOnReadingWorker = 0;
264 unsigned int upperLimitOnIndicies = 0;
265 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
271 if (!branchToReadingWorker.empty()) {
279 --nUniqueBranchesToDelete;
280 branchToReadingWorker.erase(
found.first,
found.second);
287 if (branchToReadingWorker.empty()) {
294 if (
nullptr !=
pset) {
295 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet", kEmpty);
296 if (not branches.empty()) {
297 ++upperLimitOnReadingWorker;
299 for (
auto const& branch : branches) {
300 auto found = branchToReadingWorker.equal_range(branch);
302 ++upperLimitOnIndicies;
303 ++reserveSizeForWorker[
w];
304 if (
nullptr ==
found.first->second) {
307 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
314 auto it = branchToReadingWorker.begin();
315 std::vector<std::string> unusedBranches;
316 while (it != branchToReadingWorker.end()) {
317 if (it->second ==
nullptr) {
318 unusedBranches.push_back(it->first);
322 branchToReadingWorker.erase(temp);
327 if (not unusedBranches.empty()) {
329 l <<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331 for (
auto const&
n : unusedBranches) {
336 if (!branchToReadingWorker.empty()) {
340 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
342 size_t nextOpenIndex = 0;
344 for (
auto& branchAndWorker : branchToReadingWorker) {
345 if (lastBranchName != branchAndWorker.first) {
347 BranchID bid(branchAndWorker.first +
".");
349 lastBranchName = branchAndWorker.first;
351 auto found = alreadySeenWorkers.find(branchAndWorker.second);
352 if (alreadySeenWorkers.end() ==
found) {
357 size_t index = nextOpenIndex;
358 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
362 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(
earlyDeleteHelpers_.back())));
363 nextOpenIndex += nIndices;
373 if (itLast->end() != it->begin()) {
375 unsigned int delta = it->begin() - itLast->end();
376 it->shiftIndexPointers(delta);
390 p.setEarlyDeleteHelpers(alreadySeenWorkers);
393 p.setEarlyDeleteHelpers(alreadySeenWorkers);
406 std::vector<std::string>
const& endPathNames) {
410 unsigned int placeInPath = 0;
411 for (
auto const&
name : modnames) {
413 bool doNotRunConcurrently =
false;
415 if (
name[0] ==
'!') {
417 }
else if (
name[0] ==
'-' or name[0] ==
'+') {
422 doNotRunConcurrently =
true;
427 moduleLabel.erase(0, 1);
432 if (modpset ==
nullptr) {
438 <<
"The unknown module label \"" << moduleLabel <<
"\" appears in " << pathType <<
" \"" << pathName
439 <<
"\"\n please check spelling or remove that label from the path.";
452 <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '"
453 << pathName <<
"'.\n"
454 <<
"The return value of the filter will be ignored.\n"
455 <<
"To suppress this warning, either remove the filter from the endpath,\n"
456 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
459 bool runConcurrently = not doNotRunConcurrently;
461 runConcurrently =
false;
463 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
467 out.swap(tmpworkers);
477 std::vector<std::string>
const& endPathNames) {
479 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, endPathNames);
482 if (!tmpworkers.empty()) {
506 std::vector<std::string>
const& endPathNames) {
508 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, endPathNames);
510 if (!tmpworkers.empty()) {
536 if (worker->description()->moduleLabel() == iLabel) {
541 if (
nullptr == found) {
552 std::vector<ModuleDescription const*>
result;
582 results_->at(empty_trig_path) = hltPathStatus;
583 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
608 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
609 auto pathErrorPtr = pathErrorHolder.get();
612 [iTask,
this, weakToken, pathError =
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable {
615 std::exception_ptr ptr;
616 if (pathError->load()) {
617 ptr = *pathError->load();
618 delete pathError->load();
620 if ((not ptr) and iPtr) {
630 auto pathsDone =
make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info,
this, weakToken](
631 std::exception_ptr
const* iPtr)
mutable {
637 pathErrorPtr->store(
new std::exception_ptr(*iPtr));
676 *(iExcept.load()) = std::exception_ptr();
678 *(iExcept.load()) = std::current_exception();
681 *(iExcept.load()) = std::current_exception();
685 if ((not iExcept) and
results_->accept()) {
699 std::rethrow_exception(expt);
704 std::ostringstream ost;
705 ost <<
"Processing Event " << info.
principal().
id();
708 iExcept.store(
new std::exception_ptr(std::current_exception()));
712 iExcept.store(
new std::exception_ptr(std::current_exception()));
716 std::exception_ptr ptr;
718 ptr = *iExcept.load();
731 bool const cleaningUpAfterException =
false;
737 iExcept = std::current_exception();
745 iExcept = std::current_exception();
759 std::back_inserter(oLabelsToFill),
760 std::bind(&
Path::name, std::placeholders::_1));
764 TrigPaths::const_iterator itFound = std::find_if(
767 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
769 oLabelsToFill.reserve(itFound->size());
770 for (
size_t i = 0;
i < itFound->size(); ++
i) {
771 oLabelsToFill.push_back(itFound->getWorker(
i)->description()->moduleLabel());
777 std::vector<ModuleDescription const*>& descriptions,
778 unsigned int hint)
const {
779 descriptions.clear();
781 TrigPaths::const_iterator itFound;
785 if (itFound->name() == iPathLabel)
790 itFound = std::find_if(
793 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
798 descriptions.reserve(itFound->size());
799 for (
size_t i = 0;
i < itFound->size(); ++
i) {
800 descriptions.push_back(itFound->getWorker(
i)->description());
806 std::vector<ModuleDescription const*>& descriptions,
807 unsigned int hint)
const {
808 descriptions.clear();
810 TrigPaths::const_iterator itFound;
814 if (itFound->name() == iEndPathLabel)
819 itFound = std::find_if(
822 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
827 descriptions.reserve(itFound->size());
828 for (
size_t i = 0;
i < itFound->size(); ++
i) {
829 descriptions.push_back(itFound->getWorker(
i)->description());
852 std::vector<ModuleInPathSummary>
temp(sz);
853 for (
size_t i = 0;
i != sz; ++
i) {
859 for (
size_t i = 0;
i != sz; ++
i) {
887 using std::placeholders::_1;
908 ++(earlyDeleteBranchToCount_[
index].count);
920 unsigned int indexEmpty = 0;
921 unsigned int indexOfPath = 0;
922 for (
auto& pathStatusInserter : pathStatusInserters) {
923 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
927 workerPtr->setActivityRegistry(
actReg_);
936 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
945 for (
auto& endPathStatusInserter : endPathStatusInserters) {
946 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
950 workerPtr->setActivityRegistry(
actReg_);
959 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr, workerPtr.get());
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
ServiceToken lock() const
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
ModuleDescription const * description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
int totalEventsFailed() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
EventPrincipal & principal()
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void setupOnDemandSystem(EventTransitionInfo const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t Func __host__ __device__ V int Func func
std::string const & category() const
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
bool getMapped(key_type const &k, value_type &result) const
FunctorWaitingTask< F > * make_waiting_task(F f)
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
T getParameter(std::string const &) const
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
tbb::task_group * group() const noexcept
constexpr element_type const * get() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
void setupResolvers(Principal &principal)
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
TrigResConstPtr results() const
std::vector< std::string > vstring
tuple size
Write out results.
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)