48 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
49 void transform_into(InputIterator
begin, InputIterator
end, ForwardIterator
out, Func
func) {
59 template <
typename FROM,
typename TO,
typename FUNC>
60 void fill_summary(FROM
const& from, TO&
to, FUNC func) {
61 if (to.size() != from.size()) {
63 transform_into(from.begin(), from.end(),
temp.begin(),
func);
66 transform_into(from.begin(), from.end(), to.begin(),
func);
76 std::shared_ptr<ActivityRegistry>
areg,
77 std::shared_ptr<TriggerResultInserter> inserter) {
80 ptr->setActivityRegistry(areg);
84 void initializeBranchToReadingWorker(std::vector<std::string>
const& branchesToDeleteEarly,
85 ProductRegistry
const&
preg,
86 std::multimap<std::string, Worker*>& branchToReadingWorker) {
87 auto vBranchesToDeleteEarly = branchesToDeleteEarly;
89 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
90 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
91 vBranchesToDeleteEarly.end());
94 auto allBranchNames = preg.allBranchNames();
96 for (
auto&
b : allBranchNames) {
97 b.resize(
b.size() - 1);
99 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
100 std::vector<std::string>
temp;
101 temp.reserve(vBranchesToDeleteEarly.size());
104 vBranchesToDeleteEarly.end(),
105 allBranchNames.begin(),
106 allBranchNames.end(),
107 std::back_inserter(temp));
108 vBranchesToDeleteEarly.swap(temp);
109 if (temp.size() != vBranchesToDeleteEarly.size()) {
110 std::vector<std::string> missingProducts;
113 vBranchesToDeleteEarly.begin(),
114 vBranchesToDeleteEarly.end(),
115 std::back_inserter(missingProducts));
116 LogInfo l(
"MissingProductsForCanDeleteEarly");
117 l <<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118 for (
auto const&
n : missingProducts) {
124 for (
auto const& branch : vBranchesToDeleteEarly) {
125 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(
nullptr)));
132 typedef std::vector<std::string>
vstring;
137 std::shared_ptr<TriggerResultInserter> inserter,
140 std::shared_ptr<ModuleRegistry> modReg,
147 std::shared_ptr<ActivityRegistry> areg,
151 : workerManager_(modReg, areg, actions),
159 number_of_unscheduled_modules_(0),
161 streamContext_(streamID_, processContext),
162 skippingEvent_(
false) {
163 bool hasPath =
false;
164 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
165 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
169 for (
auto const& trig_name : pathNames) {
170 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
177 inserter->setTrigResultForStream(streamID.
value(),
results());
186 for (
auto const& end_path_name : endPathNames) {
187 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
194 std::set<std::string> usedWorkerLabels;
196 usedWorkerLabels.insert(worker->description()->moduleLabel());
198 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules"));
199 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
200 std::vector<std::string> unusedLabels;
202 modulesInConfigSet.end(),
203 usedWorkerLabels.begin(),
204 usedWorkerLabels.end(),
205 back_inserter(unusedLabels));
206 std::set<std::string> unscheduledLabels;
207 std::vector<std::string> shouldBeUsedLabels;
208 if (!unusedLabels.empty()) {
213 for (
auto const&
label : unusedLabels) {
217 assert(modulePSet !=
nullptr);
219 *modulePSet, preg, &prealloc, processConfiguration,
label, unscheduledLabels, shouldBeUsedLabels);
221 if (!shouldBeUsedLabels.empty()) {
222 std::ostringstream unusedStream;
223 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
224 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
225 itLabelEnd = shouldBeUsedLabels.end();
226 itLabel != itLabelEnd;
228 unusedStream <<
",'" << *itLabel <<
"'";
230 LogInfo(
"path") <<
"The following module labels are not assigned to any path:\n" << unusedStream.str() <<
"\n";
237 std::vector<std::string>
const& branchesToDeleteEarly,
240 std::multimap<std::string, Worker*> branchToReadingWorker;
241 initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
243 const std::vector<std::string> kEmpty;
244 std::map<Worker*, unsigned int> reserveSizeForWorker;
245 unsigned int upperLimitOnReadingWorker = 0;
246 unsigned int upperLimitOnIndicies = 0;
247 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
253 if (!branchToReadingWorker.empty()) {
261 --nUniqueBranchesToDelete;
262 branchToReadingWorker.erase(
found.first,
found.second);
269 if (branchToReadingWorker.empty()) {
276 if (
nullptr !=
pset) {
277 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet", kEmpty);
278 if (not branches.empty()) {
279 ++upperLimitOnReadingWorker;
281 for (
auto const& branch : branches) {
282 auto found = branchToReadingWorker.equal_range(branch);
284 ++upperLimitOnIndicies;
285 ++reserveSizeForWorker[
w];
286 if (
nullptr ==
found.first->second) {
289 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
296 auto it = branchToReadingWorker.begin();
297 std::vector<std::string> unusedBranches;
298 while (it != branchToReadingWorker.end()) {
299 if (it->second ==
nullptr) {
300 unusedBranches.push_back(it->first);
304 branchToReadingWorker.erase(temp);
309 if (not unusedBranches.empty()) {
311 l <<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
312 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
313 for (
auto const&
n : unusedBranches) {
318 if (!branchToReadingWorker.empty()) {
322 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
324 size_t nextOpenIndex = 0;
326 for (
auto& branchAndWorker : branchToReadingWorker) {
327 if (lastBranchName != branchAndWorker.first) {
329 BranchID bid(branchAndWorker.first +
".");
331 lastBranchName = branchAndWorker.first;
333 auto found = alreadySeenWorkers.find(branchAndWorker.second);
334 if (alreadySeenWorkers.end() ==
found) {
339 size_t index = nextOpenIndex;
340 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
344 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(
earlyDeleteHelpers_.back())));
345 nextOpenIndex += nIndices;
355 if (itLast->end() != it->begin()) {
357 unsigned int delta = it->begin() - itLast->end();
358 it->shiftIndexPointers(delta);
372 p.setEarlyDeleteHelpers(alreadySeenWorkers);
375 p.setEarlyDeleteHelpers(alreadySeenWorkers);
388 std::vector<std::string>
const& endPathNames) {
392 unsigned int placeInPath = 0;
393 for (
auto const&
name : modnames) {
395 bool doNotRunConcurrently =
false;
397 if (
name[0] ==
'!') {
399 }
else if (
name[0] ==
'-' or name[0] ==
'+') {
404 doNotRunConcurrently =
true;
409 moduleLabel.erase(0, 1);
414 if (modpset ==
nullptr) {
420 <<
"The unknown module label \"" << moduleLabel <<
"\" appears in " << pathType <<
" \"" << pathName
421 <<
"\"\n please check spelling or remove that label from the path.";
434 <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '"
435 << pathName <<
"'.\n"
436 <<
"The return value of the filter will be ignored.\n"
437 <<
"To suppress this warning, either remove the filter from the endpath,\n"
438 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
441 bool runConcurrently = not doNotRunConcurrently;
443 runConcurrently =
false;
445 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
449 out.swap(tmpworkers);
459 std::vector<std::string>
const& endPathNames) {
461 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, endPathNames);
464 if (!tmpworkers.empty()) {
488 std::vector<std::string>
const& endPathNames) {
490 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, endPathNames);
492 if (!tmpworkers.empty()) {
518 if (worker->description()->moduleLabel() == iLabel) {
523 if (
nullptr == found) {
534 std::vector<ModuleDescription const*>
result;
564 results_->at(empty_trig_path) = hltPathStatus;
565 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
590 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
591 auto pathErrorPtr = pathErrorHolder.get();
594 [iTask,
this, weakToken, pathError =
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable {
597 std::exception_ptr ptr;
598 if (pathError->load()) {
599 ptr = *pathError->load();
600 delete pathError->load();
602 if ((not ptr) and iPtr) {
612 auto pathsDone =
make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info,
this, weakToken](
613 std::exception_ptr
const* iPtr)
mutable {
619 pathErrorPtr->store(
new std::exception_ptr(*iPtr));
658 *(iExcept.load()) = std::exception_ptr();
660 *(iExcept.load()) = std::current_exception();
663 *(iExcept.load()) = std::current_exception();
667 if ((not iExcept) and
results_->accept()) {
681 std::rethrow_exception(expt);
686 std::ostringstream ost;
687 ost <<
"Processing Event " << info.
principal().
id();
690 iExcept.store(
new std::exception_ptr(std::current_exception()));
694 iExcept.store(
new std::exception_ptr(std::current_exception()));
698 std::exception_ptr ptr;
700 ptr = *iExcept.load();
713 bool const cleaningUpAfterException =
false;
719 iExcept = std::current_exception();
727 iExcept = std::current_exception();
741 std::back_inserter(oLabelsToFill),
742 std::bind(&
Path::name, std::placeholders::_1));
746 TrigPaths::const_iterator itFound = std::find_if(
749 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
751 oLabelsToFill.reserve(itFound->size());
752 for (
size_t i = 0;
i < itFound->size(); ++
i) {
753 oLabelsToFill.push_back(itFound->getWorker(
i)->description()->moduleLabel());
759 std::vector<ModuleDescription const*>& descriptions,
760 unsigned int hint)
const {
761 descriptions.clear();
763 TrigPaths::const_iterator itFound;
767 if (itFound->name() == iPathLabel)
772 itFound = std::find_if(
775 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
780 descriptions.reserve(itFound->size());
781 for (
size_t i = 0;
i < itFound->size(); ++
i) {
782 descriptions.push_back(itFound->getWorker(
i)->description());
788 std::vector<ModuleDescription const*>& descriptions,
789 unsigned int hint)
const {
790 descriptions.clear();
792 TrigPaths::const_iterator itFound;
796 if (itFound->name() == iEndPathLabel)
801 itFound = std::find_if(
804 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
809 descriptions.reserve(itFound->size());
810 for (
size_t i = 0;
i < itFound->size(); ++
i) {
811 descriptions.push_back(itFound->getWorker(
i)->description());
834 std::vector<ModuleInPathSummary>
temp(sz);
835 for (
size_t i = 0;
i != sz; ++
i) {
841 for (
size_t i = 0;
i != sz; ++
i) {
869 using std::placeholders::_1;
890 ++(earlyDeleteBranchToCount_[
index].count);
902 unsigned int indexEmpty = 0;
903 unsigned int indexOfPath = 0;
904 for (
auto& pathStatusInserter : pathStatusInserters) {
905 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
909 workerPtr->setActivityRegistry(
actReg_);
918 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
927 for (
auto& endPathStatusInserter : endPathStatusInserters) {
928 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
932 workerPtr->setActivityRegistry(
actReg_);
941 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr, workerPtr.get());
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
ServiceToken lock() const
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
ModuleDescription const * description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
int totalEventsFailed() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
EventPrincipal & principal()
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void setupOnDemandSystem(EventTransitionInfo const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t Func __host__ __device__ V int Func func
std::string const & category() const
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
oneapi::tbb::task_group * group() const noexcept
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
bool getMapped(key_type const &k, value_type &result) const
FunctorWaitingTask< F > * make_waiting_task(F f)
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
T getParameter(std::string const &) const
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
constexpr element_type const * get() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::atomic< bool > skippingEvent_
std::string const & name() const
void setupResolvers(Principal &principal)
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
TrigResConstPtr results() const
std::vector< std::string > vstring
tuple size
Write out results.
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)