45 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
47 transform_into(InputIterator
begin, InputIterator
end,
48 ForwardIterator
out, Func
func) {
57 template <
typename FROM,
typename TO,
typename FUNC>
59 fill_summary(FROM
const& from, TO&
to, FUNC func) {
60 if(to.size()!=from.size()) {
62 transform_into(from.begin(), from.end(),
temp.begin(),
func);
65 transform_into(from.begin(), from.end(), to.begin(),
func);
75 makeInserter(ExceptionToActionTable
const&
actions,
76 std::shared_ptr<ActivityRegistry> areg,
77 std::shared_ptr<TriggerResultInserter> inserter) {
79 ptr->setActivityRegistry(areg);
85 ProductRegistry
const& preg,
86 std::multimap<std::string,Worker*>& branchToReadingWorker)
89 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly",std::vector<std::string>());
90 if(not vBranchesToDeleteEarly.empty()) {
91 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
92 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
93 vBranchesToDeleteEarly.end());
96 auto allBranchNames = preg.allBranchNames();
98 for(
auto &
b:allBranchNames) {
101 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
102 std::vector<std::string>
temp;
103 temp.reserve(vBranchesToDeleteEarly.size());
105 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
106 allBranchNames.begin(),allBranchNames.end(),
107 std::back_inserter(temp));
108 vBranchesToDeleteEarly.swap(temp);
109 if(temp.size() != vBranchesToDeleteEarly.size()) {
110 std::vector<std::string> missingProducts;
111 std::set_difference(temp.begin(),temp.end(),
112 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
113 std::back_inserter(missingProducts));
114 LogInfo
l(
"MissingProductsForCanDeleteEarly");
115 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
116 for(
auto const&
n:missingProducts){
122 for(
auto const&
branch:vBranchesToDeleteEarly) {
123 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
131 typedef std::vector<std::string>
vstring;
137 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
138 std::shared_ptr<ModuleRegistry> modReg,
145 std::shared_ptr<ActivityRegistry> areg,
146 std::shared_ptr<ProcessConfiguration> processConfiguration,
147 bool allowEarlyDelete,
150 workerManager_(modReg,areg, actions),
158 number_of_unscheduled_modules_(0),
160 streamContext_(streamID_, processContext),
161 endpathsAreActive_(
true),
162 skippingEvent_(
false){
165 bool hasPath =
false;
166 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
167 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
171 for (
auto const& trig_name : pathNames) {
172 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
179 inserter->setTrigResultForStream(streamID.
value(),
results());
188 for (
auto const& end_path_name : endPathNames) {
189 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
196 std::set<std::string> usedWorkerLabels;
198 usedWorkerLabels.insert(worker->description().moduleLabel());
200 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
201 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202 std::vector<std::string> unusedLabels;
203 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
204 usedWorkerLabels.begin(), usedWorkerLabels.end(),
205 back_inserter(unusedLabels));
206 std::set<std::string> unscheduledLabels;
207 std::vector<std::string> shouldBeUsedLabels;
208 if (!unusedLabels.empty()) {
213 for (
auto const&
label : unusedLabels) {
217 assert(modulePSet !=
nullptr);
220 if (!shouldBeUsedLabels.empty()) {
221 std::ostringstream unusedStream;
222 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
223 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
224 itLabelEnd = shouldBeUsedLabels.end();
225 itLabel != itLabelEnd;
227 unusedStream <<
",'" << *itLabel <<
"'";
230 <<
"The following module labels are not assigned to any path:\n" 231 << unusedStream.str()
235 if (!unscheduledLabels.empty()) {
248 bool allowEarlyDelete) {
251 if(not allowEarlyDelete)
return;
255 std::multimap<std::string,Worker*> branchToReadingWorker;
256 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
259 if(branchToReadingWorker.empty()) {
262 const std::vector<std::string> kEmpty;
263 std::map<Worker*,unsigned int> reserveSizeForWorker;
264 unsigned int upperLimitOnReadingWorker =0;
265 unsigned int upperLimitOnIndicies = 0;
266 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
272 if(!branchToReadingWorker.empty()) {
276 for(
auto const& item: kept[
InEvent]) {
280 --nUniqueBranchesToDelete;
281 branchToReadingWorker.erase(
found.first,
found.second);
288 if(branchToReadingWorker.empty()) {
296 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
297 if(not branches.empty()) {
298 ++upperLimitOnReadingWorker;
300 for(
auto const&
branch:branches){
301 auto found = branchToReadingWorker.equal_range(
branch);
303 ++upperLimitOnIndicies;
304 ++reserveSizeForWorker[
w];
305 if(
nullptr ==
found.first->second) {
308 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
315 auto it = branchToReadingWorker.begin();
316 std::vector<std::string> unusedBranches;
317 while(it !=branchToReadingWorker.end()) {
318 if(it->second ==
nullptr) {
319 unusedBranches.push_back(it->first);
323 branchToReadingWorker.erase(temp);
328 if(not unusedBranches.empty()) {
330 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 331 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332 for(
auto const&
n:unusedBranches){
337 if(!branchToReadingWorker.empty()) {
341 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
343 size_t nextOpenIndex = 0;
345 for(
auto& branchAndWorker:branchToReadingWorker) {
346 if(lastBranchName != branchAndWorker.first) {
348 BranchID bid(branchAndWorker.first+
".");
350 lastBranchName = branchAndWorker.first;
352 auto found = alreadySeenWorkers.find(branchAndWorker.second);
353 if(alreadySeenWorkers.end() ==
found) {
358 size_t index = nextOpenIndex;
359 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
362 beginAddress+index+1,
365 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
366 nextOpenIndex +=nIndices;
376 if(itLast->end() != it->begin()) {
378 unsigned int delta = it->begin()- itLast->end();
379 it->shiftIndexPointers(delta);
382 (itLast->end()-beginAddress),
384 (it->begin()-beginAddress));
393 p.setEarlyDeleteHelpers(alreadySeenWorkers);
396 p.setEarlyDeleteHelpers(alreadySeenWorkers);
405 std::shared_ptr<ProcessConfiguration const> processConfiguration,
409 std::vector<std::string>
const& endPathNames) {
410 vstring modnames = proc_pset.
getParameter<vstring>(pathName);
413 unsigned int placeInPath = 0;
414 for (
auto const&
name : modnames) {
425 if (modpset ==
nullptr) {
431 "The unknown module label \"" << moduleLabel <<
432 "\" appears in " << pathType <<
" \"" << pathName <<
433 "\"\n please check spelling or remove that label from the path.";
441 std::vector<std::string> allowed_filters = proc_pset.
getUntrackedParameter<vstring>(
"@filters_on_endpaths");
446 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << pathName <<
"'.\n" 447 <<
"The return value of the filter will be ignored.\n" 448 <<
"To suppress this warning, either remove the filter from the endpath,\n" 449 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
452 tmpworkers.emplace_back(worker, filterAction, placeInPath);
456 out.swap(tmpworkers);
462 std::shared_ptr<ProcessConfiguration const> processConfiguration,
464 std::vector<std::string>
const& endPathNames) {
466 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, endPathNames);
469 if (!tmpworkers.empty()) {
482 std::shared_ptr<ProcessConfiguration const> processConfiguration,
484 std::vector<std::string>
const& endPathNames) {
486 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, endPathNames);
488 if (!tmpworkers.empty()) {
511 if (worker->description().moduleLabel() == iLabel) {
516 if (
nullptr == found) {
524 std::vector<ModuleDescription const*>
526 std::vector<ModuleDescription const*>
result;
549 results_->at(empty_trig_path) = hltPathStatus;
550 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
576 [iTask,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 580 std::exception_ptr ptr;
592 [allPathsHolder,&ep, &es,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 596 std::exception_ptr ptr;
615 it != itEnd; ++ it) {
626 std::rethrow_exception(iExcept);
634 iExcept = std::exception_ptr();
636 iExcept = std::current_exception();
640 iExcept = std::current_exception();
645 if((not iExcept) and
results_->accept()) {
661 std::ostringstream ost;
662 ost <<
"Processing Event " << ep.
id();
665 iExcept = std::current_exception();
670 iExcept = std::current_exception();
686 std::rethrow_exception(iExcept);
689 bool const cleaningUpAfterException =
false;
695 iExcept = std::current_exception();
705 iExcept = std::current_exception();
721 std::back_inserter(oLabelsToFill),
722 std::bind(&
Path::name, std::placeholders::_1));
727 std::vector<std::string>& oLabelsToFill)
const {
728 TrigPaths::const_iterator itFound =
731 std::bind(std::equal_to<std::string>(),
733 std::bind(&
Path::name, std::placeholders::_1)));
735 oLabelsToFill.reserve(itFound->size());
736 for (
size_t i = 0;
i < itFound->size(); ++
i) {
737 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
744 std::vector<ModuleDescription const*>& descriptions,
745 unsigned int hint)
const {
746 descriptions.clear();
748 TrigPaths::const_iterator itFound;
752 if(itFound->name() == iPathLabel) found =
true;
758 std::bind(std::equal_to<std::string>(),
760 std::bind(&
Path::name, std::placeholders::_1)));
764 descriptions.reserve(itFound->size());
765 for (
size_t i = 0;
i < itFound->size(); ++
i) {
766 descriptions.push_back(itFound->getWorker(
i)->descPtr());
773 std::vector<ModuleDescription const*>& descriptions,
774 unsigned int hint)
const {
775 descriptions.clear();
777 TrigPaths::const_iterator itFound;
781 if(itFound->name() == iEndPathLabel) found =
true;
787 std::bind(std::equal_to<std::string>(),
789 std::bind(&
Path::name, std::placeholders::_1)));
790 if (itFound !=
end_paths_.end()) found =
true;
793 descriptions.reserve(itFound->size());
794 for (
size_t i = 0;
i < itFound->size(); ++
i) {
795 descriptions.push_back(itFound->getWorker(
i)->descPtr());
832 std::vector<ModuleInPathSummary>
temp(sz);
833 for (
size_t i = 0;
i != sz; ++
i) {
839 for (
size_t i = 0;
i != sz; ++
i) {
873 using std::placeholders::_1;
899 ++(earlyDeleteBranchToCount_[
index].count);
909 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
913 unsigned int indexEmpty = 0;
914 unsigned int indexOfPath = 0;
915 for(
auto & pathStatusInserter : pathStatusInserters) {
916 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
918 inserterPtr->moduleDescription(),
921 workerPtr->setActivityRegistry(
actReg_);
930 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
940 for(
auto & endPathStatusInserter : endPathStatusInserters) {
941 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
943 inserterPtr->moduleDescription(),
946 workerPtr->setActivityRegistry(
actReg_);
955 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr,
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
ServiceToken presentToken() const
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
T & get_underlying(propagate_const< T > &)
def unique(seq, keepstr=True)
StreamContext streamContext_
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
bool getMapped(key_type const &k, value_type &result) const
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)