43 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
45 transform_into(InputIterator
begin, InputIterator
end,
46 ForwardIterator
out, Func
func) {
55 template <
typename FROM,
typename TO,
typename FUNC>
57 fill_summary(FROM
const& from, TO&
to, FUNC func) {
58 if(to.size()!=from.size()) {
60 transform_into(from.begin(), from.end(),
temp.begin(),
func);
63 transform_into(from.begin(), from.end(), to.begin(),
func);
73 makeInserter(ExceptionToActionTable
const&
actions,
74 std::shared_ptr<ActivityRegistry> areg,
75 std::shared_ptr<TriggerResultInserter> inserter) {
77 ptr->setActivityRegistry(areg);
83 ProductRegistry
const& preg,
84 std::multimap<std::string,Worker*>& branchToReadingWorker)
87 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly",std::vector<std::string>());
88 if(not vBranchesToDeleteEarly.empty()) {
89 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
90 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
91 vBranchesToDeleteEarly.end());
94 auto allBranchNames = preg.allBranchNames();
96 for(
auto &
b:allBranchNames) {
99 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
100 std::vector<std::string>
temp;
101 temp.reserve(vBranchesToDeleteEarly.size());
103 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
104 allBranchNames.begin(),allBranchNames.end(),
105 std::back_inserter(temp));
106 vBranchesToDeleteEarly.swap(temp);
107 if(temp.size() != vBranchesToDeleteEarly.size()) {
108 std::vector<std::string> missingProducts;
109 std::set_difference(temp.begin(),temp.end(),
110 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
111 std::back_inserter(missingProducts));
112 LogInfo
l(
"MissingProductsForCanDeleteEarly");
113 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
114 for(
auto const&
n:missingProducts){
120 for(
auto const& branch:vBranchesToDeleteEarly) {
121 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(
nullptr)));
129 typedef std::vector<std::string>
vstring;
134 std::shared_ptr<ModuleRegistry> modReg,
141 std::shared_ptr<ActivityRegistry> areg,
142 std::shared_ptr<ProcessConfiguration> processConfiguration,
143 bool allowEarlyDelete,
146 workerManager_(modReg,areg, actions),
148 trig_name_list_(tns.getTrigPaths()),
149 end_path_name_list_(tns.getEndPaths()),
156 number_of_unscheduled_modules_(0),
158 streamContext_(streamID_, processContext),
159 endpathsAreActive_(
true),
160 skippingEvent_(
false){
163 bool hasPath =
false;
167 vstring labelsOnTriggerPaths;
169 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), &labelsOnTriggerPaths);
176 inserter->setTrigResultForStream(streamID.
value(),
results());
186 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
191 std::set<std::string> usedWorkerLabels;
193 usedWorkerLabels.insert(worker->description().moduleLabel());
195 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
196 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
197 std::vector<std::string> unusedLabels;
198 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
199 usedWorkerLabels.begin(), usedWorkerLabels.end(),
200 back_inserter(unusedLabels));
201 std::set<std::string> unscheduledLabels;
202 std::vector<std::string> shouldBeUsedLabels;
203 if (!unusedLabels.empty()) {
208 for (
auto const&
label : unusedLabels) {
212 assert(modulePSet !=
nullptr);
215 if (!shouldBeUsedLabels.empty()) {
216 std::ostringstream unusedStream;
217 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
218 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
219 itLabelEnd = shouldBeUsedLabels.end();
220 itLabel != itLabelEnd;
222 unusedStream <<
",'" << *itLabel <<
"'";
225 <<
"The following module labels are not assigned to any path:\n" 226 << unusedStream.str()
230 if (!unscheduledLabels.empty()) {
243 bool allowEarlyDelete) {
246 if(not allowEarlyDelete)
return;
250 std::multimap<std::string,Worker*> branchToReadingWorker;
251 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
254 if(branchToReadingWorker.size()==0) {
257 const std::vector<std::string> kEmpty;
258 std::map<Worker*,unsigned int> reserveSizeForWorker;
259 unsigned int upperLimitOnReadingWorker =0;
260 unsigned int upperLimitOnIndicies = 0;
261 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
267 if(branchToReadingWorker.size()>0) {
271 for(
auto const& item: kept[
InEvent]) {
275 --nUniqueBranchesToDelete;
276 branchToReadingWorker.erase(
found.first,
found.second);
283 if(branchToReadingWorker.size()==0) {
291 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
292 if(not branches.empty()) {
293 ++upperLimitOnReadingWorker;
295 for(
auto const& branch:branches){
296 auto found = branchToReadingWorker.equal_range(branch);
298 ++upperLimitOnIndicies;
299 ++reserveSizeForWorker[
w];
300 if(
nullptr ==
found.first->second) {
303 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
310 auto it = branchToReadingWorker.begin();
311 std::vector<std::string> unusedBranches;
312 while(it !=branchToReadingWorker.end()) {
313 if(it->second ==
nullptr) {
314 unusedBranches.push_back(it->first);
318 branchToReadingWorker.erase(temp);
323 if(not unusedBranches.empty()) {
325 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 326 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
327 for(
auto const&
n:unusedBranches){
332 if(0!=branchToReadingWorker.size()) {
336 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
338 size_t nextOpenIndex = 0;
340 for(
auto& branchAndWorker:branchToReadingWorker) {
341 if(lastBranchName != branchAndWorker.first) {
343 BranchID bid(branchAndWorker.first+
".");
345 lastBranchName = branchAndWorker.first;
347 auto found = alreadySeenWorkers.find(branchAndWorker.second);
348 if(alreadySeenWorkers.end() ==
found) {
353 size_t index = nextOpenIndex;
354 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
357 beginAddress+index+1,
360 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
361 nextOpenIndex +=nIndices;
371 if(itLast->end() != it->begin()) {
373 unsigned int delta = it->begin()- itLast->end();
374 it->shiftIndexPointers(delta);
377 (itLast->end()-beginAddress),
379 (it->begin()-beginAddress));
388 p.setEarlyDeleteHelpers(alreadySeenWorkers);
391 p.setEarlyDeleteHelpers(alreadySeenWorkers);
400 std::shared_ptr<ProcessConfiguration const> processConfiguration,
404 vstring* labelsOnPaths) {
405 vstring modnames = proc_pset.
getParameter<vstring>(pathName);
408 unsigned int placeInPath = 0;
409 for (
auto const&
name : modnames) {
411 if (labelsOnPaths) labelsOnPaths->push_back(
name);
428 "The unknown module label \"" << moduleLabel <<
429 "\" appears in " << pathType <<
" \"" << pathName <<
430 "\"\n please check spelling or remove that label from the path.";
438 std::vector<std::string> allowed_filters = proc_pset.
getUntrackedParameter<vstring>(
"@filters_on_endpaths");
443 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << pathName <<
"'.\n" 444 <<
"The return value of the filter will be ignored.\n" 445 <<
"To suppress this warning, either remove the filter from the endpath,\n" 446 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
449 tmpworkers.emplace_back(worker, filterAction, placeInPath);
453 out.swap(tmpworkers);
459 std::shared_ptr<ProcessConfiguration const> processConfiguration,
461 vstring* labelsOnTriggerPaths) {
462 using std::placeholders::_1;
465 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, labelsOnTriggerPaths);
467 for (PathWorkers::iterator wi(tmpworkers.begin()),
468 we(tmpworkers.end()); wi != we; ++wi) {
469 holder.push_back(wi->getWorker());
473 if (!tmpworkers.empty()) {
485 std::shared_ptr<ProcessConfiguration const> processConfiguration,
487 using std::placeholders::_1;
489 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, 0);
492 for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
493 holder.push_back(wi->getWorker());
496 if (!tmpworkers.empty()) {
515 if (worker->description().moduleLabel() == iLabel) {
520 if (
nullptr == found) {
528 std::vector<ModuleDescription const*>
530 std::vector<ModuleDescription const*>
result;
560 [iTask,&ep, &es,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 564 std::exception_ptr ptr;
577 it != itEnd; ++ it) {
588 std::rethrow_exception(iExcept);
596 iExcept = std::exception_ptr();
598 iExcept = std::current_exception();
602 iExcept = std::current_exception();
607 if((not iExcept) and
results_->accept()) {
620 std::ostringstream ost;
621 ost <<
"Processing Event " << ep.
id();
624 iExcept = std::current_exception();
627 iExcept = std::current_exception();
637 [iWait,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 641 std::exception_ptr ptr;
667 std::rethrow_exception(iExcept);
670 bool const cleaningUpAfterException =
false;
676 iExcept = std::current_exception();
686 iExcept = std::current_exception();
702 std::back_inserter(oLabelsToFill),
703 std::bind(&
Path::name, std::placeholders::_1));
718 std::vector<std::string>& oLabelsToFill)
const {
719 TrigPaths::const_iterator itFound =
722 std::bind(std::equal_to<std::string>(),
724 std::bind(&
Path::name, std::placeholders::_1)));
726 oLabelsToFill.reserve(itFound->size());
727 for (
size_t i = 0;
i < itFound->size(); ++
i) {
728 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
735 std::vector<ModuleDescription const*>& descriptions,
736 unsigned int hint)
const {
737 descriptions.clear();
739 TrigPaths::const_iterator itFound;
743 if(itFound->name() == iPathLabel) found =
true;
749 std::bind(std::equal_to<std::string>(),
751 std::bind(&
Path::name, std::placeholders::_1)));
755 descriptions.reserve(itFound->size());
756 for (
size_t i = 0;
i < itFound->size(); ++
i) {
757 descriptions.push_back(itFound->getWorker(
i)->descPtr());
764 std::vector<ModuleDescription const*>& descriptions,
765 unsigned int hint)
const {
766 descriptions.clear();
768 TrigPaths::const_iterator itFound;
772 if(itFound->name() == iEndPathLabel) found =
true;
778 std::bind(std::equal_to<std::string>(),
780 std::bind(&
Path::name, std::placeholders::_1)));
781 if (itFound !=
end_paths_.end()) found =
true;
784 descriptions.reserve(itFound->size());
785 for (
size_t i = 0;
i < itFound->size(); ++
i) {
786 descriptions.push_back(itFound->getWorker(
i)->descPtr());
823 std::vector<ModuleInPathSummary>
temp(sz);
824 for (
size_t i = 0;
i != sz; ++
i) {
830 for (
size_t i = 0;
i != sz; ++
i) {
864 using std::placeholders::_1;
890 ++(earlyDeleteBranchToCount_[
index].count);
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > Workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
void endStream(StreamID iID, StreamContext &streamContext)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
virtual void replaceModuleFor(Worker *) const =0
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
void addToAllWorkers(Worker *w)
vstring end_path_name_list_
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
ServiceToken presentToken() const
std::string const & moduleLabel() const
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
def unique(seq, keepstr=True)
StreamContext streamContext_
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
bool getMapped(key_type const &k, value_type &result) const
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
vstring empty_trig_path_names_
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::atomic< bool > skippingEvent_
std::string const & name() const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
std::vector< ModuleInPathSummary > moduleInPathSummaries
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)