45 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
47 transform_into(InputIterator
begin, InputIterator
end,
48 ForwardIterator
out, Func
func) {
57 template <
typename FROM,
typename TO,
typename FUNC>
59 fill_summary(FROM
const& from, TO&
to, FUNC func) {
60 if(to.size()!=from.size()) {
62 transform_into(from.begin(), from.end(),
temp.begin(),
func);
65 transform_into(from.begin(), from.end(), to.begin(),
func);
75 makeInserter(ExceptionToActionTable
const&
actions,
76 std::shared_ptr<ActivityRegistry> areg,
77 std::shared_ptr<TriggerResultInserter> inserter) {
79 ptr->setActivityRegistry(areg);
85 ProductRegistry
const& preg,
86 std::multimap<std::string,Worker*>& branchToReadingWorker)
89 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly",std::vector<std::string>());
90 if(not vBranchesToDeleteEarly.empty()) {
91 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
92 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
93 vBranchesToDeleteEarly.end());
96 auto allBranchNames = preg.allBranchNames();
98 for(
auto &
b:allBranchNames) {
101 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
102 std::vector<std::string>
temp;
103 temp.reserve(vBranchesToDeleteEarly.size());
105 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
106 allBranchNames.begin(),allBranchNames.end(),
107 std::back_inserter(temp));
108 vBranchesToDeleteEarly.swap(temp);
109 if(temp.size() != vBranchesToDeleteEarly.size()) {
110 std::vector<std::string> missingProducts;
111 std::set_difference(temp.begin(),temp.end(),
112 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
113 std::back_inserter(missingProducts));
114 LogInfo
l(
"MissingProductsForCanDeleteEarly");
115 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
116 for(
auto const&
n:missingProducts){
122 for(
auto const&
branch:vBranchesToDeleteEarly) {
123 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
131 typedef std::vector<std::string>
vstring;
137 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
138 std::shared_ptr<ModuleRegistry> modReg,
145 std::shared_ptr<ActivityRegistry> areg,
146 std::shared_ptr<ProcessConfiguration> processConfiguration,
147 bool allowEarlyDelete,
150 workerManager_(modReg,areg, actions),
158 number_of_unscheduled_modules_(0),
160 streamContext_(streamID_, processContext),
161 endpathsAreActive_(
true),
162 skippingEvent_(
false){
165 bool hasPath =
false;
166 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
167 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
171 for (
auto const& trig_name : pathNames) {
172 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
179 inserter->setTrigResultForStream(streamID.
value(),
results());
188 for (
auto const& end_path_name : endPathNames) {
189 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
196 std::set<std::string> usedWorkerLabels;
198 usedWorkerLabels.insert(worker->description().moduleLabel());
200 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
201 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202 std::vector<std::string> unusedLabels;
203 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
204 usedWorkerLabels.begin(), usedWorkerLabels.end(),
205 back_inserter(unusedLabels));
206 std::set<std::string> unscheduledLabels;
207 std::vector<std::string> shouldBeUsedLabels;
208 if (!unusedLabels.empty()) {
213 for (
auto const&
label : unusedLabels) {
217 assert(modulePSet !=
nullptr);
220 if (!shouldBeUsedLabels.empty()) {
221 std::ostringstream unusedStream;
222 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
223 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
224 itLabelEnd = shouldBeUsedLabels.end();
225 itLabel != itLabelEnd;
227 unusedStream <<
",'" << *itLabel <<
"'";
230 <<
"The following module labels are not assigned to any path:\n" 231 << unusedStream.str()
235 if (!unscheduledLabels.empty()) {
248 bool allowEarlyDelete) {
251 if(not allowEarlyDelete)
return;
255 std::multimap<std::string,Worker*> branchToReadingWorker;
256 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
259 if(branchToReadingWorker.size()==0) {
262 const std::vector<std::string> kEmpty;
263 std::map<Worker*,unsigned int> reserveSizeForWorker;
264 unsigned int upperLimitOnReadingWorker =0;
265 unsigned int upperLimitOnIndicies = 0;
266 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
272 if(branchToReadingWorker.size()>0) {
276 for(
auto const& item: kept[
InEvent]) {
280 --nUniqueBranchesToDelete;
281 branchToReadingWorker.erase(
found.first,
found.second);
288 if(branchToReadingWorker.size()==0) {
296 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
297 if(not branches.empty()) {
298 ++upperLimitOnReadingWorker;
300 for(
auto const&
branch:branches){
301 auto found = branchToReadingWorker.equal_range(
branch);
303 ++upperLimitOnIndicies;
304 ++reserveSizeForWorker[
w];
305 if(
nullptr ==
found.first->second) {
308 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
315 auto it = branchToReadingWorker.begin();
316 std::vector<std::string> unusedBranches;
317 while(it !=branchToReadingWorker.end()) {
318 if(it->second ==
nullptr) {
319 unusedBranches.push_back(it->first);
323 branchToReadingWorker.erase(temp);
328 if(not unusedBranches.empty()) {
330 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 331 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332 for(
auto const&
n:unusedBranches){
337 if(0!=branchToReadingWorker.size()) {
341 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
343 size_t nextOpenIndex = 0;
345 for(
auto& branchAndWorker:branchToReadingWorker) {
346 if(lastBranchName != branchAndWorker.first) {
348 BranchID bid(branchAndWorker.first+
".");
350 lastBranchName = branchAndWorker.first;
352 auto found = alreadySeenWorkers.find(branchAndWorker.second);
353 if(alreadySeenWorkers.end() ==
found) {
358 size_t index = nextOpenIndex;
359 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
362 beginAddress+index+1,
365 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
366 nextOpenIndex +=nIndices;
376 if(itLast->end() != it->begin()) {
378 unsigned int delta = it->begin()- itLast->end();
379 it->shiftIndexPointers(delta);
382 (itLast->end()-beginAddress),
384 (it->begin()-beginAddress));
393 p.setEarlyDeleteHelpers(alreadySeenWorkers);
396 p.setEarlyDeleteHelpers(alreadySeenWorkers);
405 std::shared_ptr<ProcessConfiguration const> processConfiguration,
409 std::vector<std::string>
const& endPathNames) {
410 vstring modnames = proc_pset.
getParameter<vstring>(pathName);
413 unsigned int placeInPath = 0;
414 for (
auto const&
name : modnames) {
431 "The unknown module label \"" << moduleLabel <<
432 "\" appears in " << pathType <<
" \"" << pathName <<
433 "\"\n please check spelling or remove that label from the path.";
441 std::vector<std::string> allowed_filters = proc_pset.
getUntrackedParameter<vstring>(
"@filters_on_endpaths");
446 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << pathName <<
"'.\n" 447 <<
"The return value of the filter will be ignored.\n" 448 <<
"To suppress this warning, either remove the filter from the endpath,\n" 449 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
452 tmpworkers.emplace_back(worker, filterAction, placeInPath);
456 out.swap(tmpworkers);
462 std::shared_ptr<ProcessConfiguration const> processConfiguration,
464 std::vector<std::string>
const& endPathNames) {
466 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, endPathNames);
469 if (!tmpworkers.empty()) {
482 std::shared_ptr<ProcessConfiguration const> processConfiguration,
484 std::vector<std::string>
const& endPathNames) {
486 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, endPathNames);
488 if (!tmpworkers.empty()) {
511 if (worker->description().moduleLabel() == iLabel) {
516 if (
nullptr == found) {
524 std::vector<ModuleDescription const*>
526 std::vector<ModuleDescription const*>
result;
549 results_->at(empty_trig_path) = hltPathStatus;
550 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
575 [iTask,&ep, &es,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 579 std::exception_ptr ptr;
592 it != itEnd; ++ it) {
603 std::rethrow_exception(iExcept);
611 iExcept = std::exception_ptr();
613 iExcept = std::current_exception();
617 iExcept = std::current_exception();
622 if((not iExcept) and
results_->accept()) {
635 std::ostringstream ost;
636 ost <<
"Processing Event " << ep.
id();
639 iExcept = std::current_exception();
642 iExcept = std::current_exception();
652 [iWait,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 656 std::exception_ptr ptr;
682 std::rethrow_exception(iExcept);
685 bool const cleaningUpAfterException =
false;
691 iExcept = std::current_exception();
701 iExcept = std::current_exception();
717 std::back_inserter(oLabelsToFill),
718 std::bind(&
Path::name, std::placeholders::_1));
723 std::vector<std::string>& oLabelsToFill)
const {
724 TrigPaths::const_iterator itFound =
727 std::bind(std::equal_to<std::string>(),
729 std::bind(&
Path::name, std::placeholders::_1)));
731 oLabelsToFill.reserve(itFound->size());
732 for (
size_t i = 0;
i < itFound->size(); ++
i) {
733 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
740 std::vector<ModuleDescription const*>& descriptions,
741 unsigned int hint)
const {
742 descriptions.clear();
744 TrigPaths::const_iterator itFound;
748 if(itFound->name() == iPathLabel) found =
true;
754 std::bind(std::equal_to<std::string>(),
756 std::bind(&
Path::name, std::placeholders::_1)));
760 descriptions.reserve(itFound->size());
761 for (
size_t i = 0;
i < itFound->size(); ++
i) {
762 descriptions.push_back(itFound->getWorker(
i)->descPtr());
769 std::vector<ModuleDescription const*>& descriptions,
770 unsigned int hint)
const {
771 descriptions.clear();
773 TrigPaths::const_iterator itFound;
777 if(itFound->name() == iEndPathLabel) found =
true;
783 std::bind(std::equal_to<std::string>(),
785 std::bind(&
Path::name, std::placeholders::_1)));
786 if (itFound !=
end_paths_.end()) found =
true;
789 descriptions.reserve(itFound->size());
790 for (
size_t i = 0;
i < itFound->size(); ++
i) {
791 descriptions.push_back(itFound->getWorker(
i)->descPtr());
828 std::vector<ModuleInPathSummary>
temp(sz);
829 for (
size_t i = 0;
i != sz; ++
i) {
835 for (
size_t i = 0;
i != sz; ++
i) {
869 using std::placeholders::_1;
895 ++(earlyDeleteBranchToCount_[
index].count);
905 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
909 unsigned int indexEmpty = 0;
910 unsigned int indexOfPath = 0;
911 for(
auto & pathStatusInserter : pathStatusInserters) {
912 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
914 inserterPtr->moduleDescription(),
917 workerPtr->setActivityRegistry(
actReg_);
926 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
936 for(
auto & endPathStatusInserter : endPathStatusInserters) {
937 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
939 inserterPtr->moduleDescription(),
942 workerPtr->setActivityRegistry(
actReg_);
951 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr,
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
ServiceToken presentToken() const
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
T & get_underlying(propagate_const< T > &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
def unique(seq, keepstr=True)
StreamContext streamContext_
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
bool getMapped(key_type const &k, value_type &result) const
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)