43 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
45 transform_into(InputIterator
begin, InputIterator
end,
46 ForwardIterator
out, Func
func) {
55 template <
typename FROM,
typename TO,
typename FUNC>
57 fill_summary(FROM
const& from, TO&
to, FUNC func) {
58 if(to.size()!=from.size()) {
60 transform_into(from.begin(), from.end(),
temp.begin(),
func);
63 transform_into(from.begin(), from.end(), to.begin(),
func);
73 makeInserter(ExceptionToActionTable
const&
actions,
74 std::shared_ptr<ActivityRegistry> areg,
75 std::shared_ptr<TriggerResultInserter> inserter) {
77 ptr->setActivityRegistry(areg);
83 ProductRegistry
const& preg,
84 std::multimap<std::string,Worker*>& branchToReadingWorker)
87 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly",std::vector<std::string>());
88 if(not vBranchesToDeleteEarly.empty()) {
89 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
90 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
91 vBranchesToDeleteEarly.end());
94 auto allBranchNames = preg.allBranchNames();
96 for(
auto &
b:allBranchNames) {
99 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
100 std::vector<std::string>
temp;
101 temp.reserve(vBranchesToDeleteEarly.size());
103 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
104 allBranchNames.begin(),allBranchNames.end(),
105 std::back_inserter(temp));
106 vBranchesToDeleteEarly.swap(temp);
107 if(temp.size() != vBranchesToDeleteEarly.size()) {
108 std::vector<std::string> missingProducts;
109 std::set_difference(temp.begin(),temp.end(),
110 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
111 std::back_inserter(missingProducts));
112 LogInfo
l(
"MissingProductsForCanDeleteEarly");
113 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
114 for(
auto const&
n:missingProducts){
120 for(
auto const& branch:vBranchesToDeleteEarly) {
121 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(
nullptr)));
129 typedef std::vector<std::string>
vstring;
134 std::shared_ptr<ModuleRegistry> modReg,
141 std::shared_ptr<ActivityRegistry> areg,
142 std::shared_ptr<ProcessConfiguration> processConfiguration,
143 bool allowEarlyDelete,
146 workerManager_(modReg,areg, actions),
148 trig_name_list_(tns.getTrigPaths()),
149 end_path_name_list_(tns.getEndPaths()),
156 number_of_unscheduled_modules_(0),
158 streamContext_(streamID_, processContext),
159 endpathsAreActive_(
true),
160 skippingEvent_(
false){
163 bool hasPath =
false;
167 vstring labelsOnTriggerPaths;
169 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), &labelsOnTriggerPaths);
176 inserter->setTrigResultForStream(streamID.
value(),
results());
186 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
191 std::set<std::string> usedWorkerLabels;
193 usedWorkerLabels.insert(worker->description().moduleLabel());
195 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
196 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
197 std::vector<std::string> unusedLabels;
198 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
199 usedWorkerLabels.begin(), usedWorkerLabels.end(),
200 back_inserter(unusedLabels));
203 std::set<std::string> unscheduledLabels;
204 std::vector<std::string> shouldBeUsedLabels;
205 if (!unusedLabels.empty()) {
210 for (
auto const&
label : unusedLabels) {
211 if (allowUnscheduled) {
215 assert(modulePSet !=
nullptr);
219 shouldBeUsedLabels.push_back(
label);
222 if (!shouldBeUsedLabels.empty()) {
223 std::ostringstream unusedStream;
224 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
225 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
226 itLabelEnd = shouldBeUsedLabels.end();
227 itLabel != itLabelEnd;
229 unusedStream <<
",'" << *itLabel <<
"'";
232 <<
"The following module labels are not assigned to any path:\n" 233 << unusedStream.str()
237 if (!unscheduledLabels.empty()) {
250 bool allowEarlyDelete) {
253 if(not allowEarlyDelete)
return;
257 std::multimap<std::string,Worker*> branchToReadingWorker;
258 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
261 if(branchToReadingWorker.size()==0) {
264 const std::vector<std::string> kEmpty;
265 std::map<Worker*,unsigned int> reserveSizeForWorker;
266 unsigned int upperLimitOnReadingWorker =0;
267 unsigned int upperLimitOnIndicies = 0;
268 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
274 if(branchToReadingWorker.size()>0) {
278 for(
auto const& item: kept[
InEvent]) {
282 --nUniqueBranchesToDelete;
283 branchToReadingWorker.erase(
found.first,
found.second);
290 if(branchToReadingWorker.size()==0) {
298 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
299 if(not branches.empty()) {
300 ++upperLimitOnReadingWorker;
302 for(
auto const& branch:branches){
303 auto found = branchToReadingWorker.equal_range(branch);
305 ++upperLimitOnIndicies;
306 ++reserveSizeForWorker[
w];
307 if(
nullptr ==
found.first->second) {
310 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
317 auto it = branchToReadingWorker.begin();
318 std::vector<std::string> unusedBranches;
319 while(it !=branchToReadingWorker.end()) {
320 if(it->second ==
nullptr) {
321 unusedBranches.push_back(it->first);
325 branchToReadingWorker.erase(temp);
330 if(not unusedBranches.empty()) {
332 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 333 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
334 for(
auto const&
n:unusedBranches){
339 if(0!=branchToReadingWorker.size()) {
343 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
345 size_t nextOpenIndex = 0;
347 for(
auto& branchAndWorker:branchToReadingWorker) {
348 if(lastBranchName != branchAndWorker.first) {
350 BranchID bid(branchAndWorker.first+
".");
352 lastBranchName = branchAndWorker.first;
354 auto found = alreadySeenWorkers.find(branchAndWorker.second);
355 if(alreadySeenWorkers.end() ==
found) {
360 size_t index = nextOpenIndex;
361 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
364 beginAddress+index+1,
367 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
368 nextOpenIndex +=nIndices;
378 if(itLast->end() != it->begin()) {
380 unsigned int delta = it->begin()- itLast->end();
381 it->shiftIndexPointers(delta);
384 (itLast->end()-beginAddress),
386 (it->begin()-beginAddress));
395 p.setEarlyDeleteHelpers(alreadySeenWorkers);
398 p.setEarlyDeleteHelpers(alreadySeenWorkers);
407 std::shared_ptr<ProcessConfiguration const> processConfiguration,
411 vstring* labelsOnPaths) {
412 vstring modnames = proc_pset.
getParameter<vstring>(pathName);
415 unsigned int placeInPath = 0;
416 for (
auto const&
name : modnames) {
418 if (labelsOnPaths) labelsOnPaths->push_back(
name);
435 "The unknown module label \"" << moduleLabel <<
436 "\" appears in " << pathType <<
" \"" << pathName <<
437 "\"\n please check spelling or remove that label from the path.";
445 std::vector<std::string> allowed_filters = proc_pset.
getUntrackedParameter<vstring>(
"@filters_on_endpaths");
450 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << pathName <<
"'.\n" 451 <<
"The return value of the filter will be ignored.\n" 452 <<
"To suppress this warning, either remove the filter from the endpath,\n" 453 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
456 tmpworkers.emplace_back(worker, filterAction, placeInPath);
460 out.swap(tmpworkers);
466 std::shared_ptr<ProcessConfiguration const> processConfiguration,
468 vstring* labelsOnTriggerPaths) {
469 using std::placeholders::_1;
472 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, labelsOnTriggerPaths);
474 for (PathWorkers::iterator wi(tmpworkers.begin()),
475 we(tmpworkers.end()); wi != we; ++wi) {
476 holder.push_back(wi->getWorker());
480 if (!tmpworkers.empty()) {
492 std::shared_ptr<ProcessConfiguration const> processConfiguration,
494 using std::placeholders::_1;
496 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, 0);
499 for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
500 holder.push_back(wi->getWorker());
503 if (!tmpworkers.empty()) {
522 if (worker->description().moduleLabel() == iLabel) {
527 if (
nullptr == found) {
535 std::vector<ModuleDescription const*>
537 std::vector<ModuleDescription const*>
result;
567 [iTask,&ep, &es,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 571 std::exception_ptr ptr;
584 it != itEnd; ++ it) {
595 std::rethrow_exception(iExcept);
603 iExcept = std::exception_ptr();
605 iExcept = std::current_exception();
609 iExcept = std::current_exception();
614 if((not iExcept) and
results_->accept()) {
626 ex.
addContext(
"Calling produce method for module TriggerResultInserter");
627 std::ostringstream ost;
628 ost <<
"Processing " << ep.
id();
630 iExcept = std::current_exception();
633 iExcept = std::current_exception();
643 [iWait,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 647 std::exception_ptr ptr;
673 std::rethrow_exception(iExcept);
676 bool const cleaningUpAfterException =
false;
682 iExcept = std::current_exception();
692 iExcept = std::current_exception();
708 std::back_inserter(oLabelsToFill),
709 std::bind(&
Path::name, std::placeholders::_1));
724 std::vector<std::string>& oLabelsToFill)
const {
725 TrigPaths::const_iterator itFound =
728 std::bind(std::equal_to<std::string>(),
730 std::bind(&
Path::name, std::placeholders::_1)));
732 oLabelsToFill.reserve(itFound->size());
733 for (
size_t i = 0;
i < itFound->size(); ++
i) {
734 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
741 std::vector<ModuleDescription const*>& descriptions,
742 unsigned int hint)
const {
743 descriptions.clear();
745 TrigPaths::const_iterator itFound;
749 if(itFound->name() == iPathLabel) found =
true;
755 std::bind(std::equal_to<std::string>(),
757 std::bind(&
Path::name, std::placeholders::_1)));
761 descriptions.reserve(itFound->size());
762 for (
size_t i = 0;
i < itFound->size(); ++
i) {
763 descriptions.push_back(itFound->getWorker(
i)->descPtr());
770 std::vector<ModuleDescription const*>& descriptions,
771 unsigned int hint)
const {
772 descriptions.clear();
774 TrigPaths::const_iterator itFound;
778 if(itFound->name() == iEndPathLabel) found =
true;
784 std::bind(std::equal_to<std::string>(),
786 std::bind(&
Path::name, std::placeholders::_1)));
787 if (itFound !=
end_paths_.end()) found =
true;
790 descriptions.reserve(itFound->size());
791 for (
size_t i = 0;
i < itFound->size(); ++
i) {
792 descriptions.push_back(itFound->getWorker(
i)->descPtr());
829 std::vector<ModuleInPathSummary>
temp(sz);
830 for (
size_t i = 0;
i != sz; ++
i) {
836 for (
size_t i = 0;
i != sz; ++
i) {
871 using std::placeholders::_1;
897 ++(earlyDeleteBranchToCount_[
index].count);
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > Workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
void endStream(StreamID iID, StreamContext &streamContext)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
virtual void replaceModuleFor(Worker *) const =0
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
void addToAllWorkers(Worker *w)
vstring end_path_name_list_
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
ServiceToken presentToken() const
std::string const & moduleLabel() const
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
def unique(seq, keepstr=True)
StreamContext streamContext_
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
bool getMapped(key_type const &k, value_type &result) const
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
vstring empty_trig_path_names_
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::atomic< bool > skippingEvent_
std::string const & name() const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
std::vector< ModuleInPathSummary > moduleInPathSummaries
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)