47 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
49 transform_into(InputIterator
begin, InputIterator
end,
50 ForwardIterator
out, Func
func) {
59 template <
typename FROM,
typename TO,
typename FUNC>
61 fill_summary(FROM
const& from, TO&
to, FUNC func) {
62 if(to.size()!=from.size()) {
64 transform_into(from.begin(), from.end(),
temp.begin(),
func);
67 transform_into(from.begin(), from.end(), to.begin(),
func);
77 makeInserter(ExceptionToActionTable
const&
actions,
78 std::shared_ptr<ActivityRegistry> areg,
79 std::shared_ptr<TriggerResultInserter> inserter) {
81 ptr->setActivityRegistry(areg);
87 ProductRegistry
const& preg,
88 std::multimap<std::string,Worker*>& branchToReadingWorker)
91 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly");
92 if(not vBranchesToDeleteEarly.empty()) {
93 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
94 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
95 vBranchesToDeleteEarly.end());
98 auto allBranchNames = preg.allBranchNames();
100 for(
auto &
b:allBranchNames) {
101 b.resize(
b.size()-1);
103 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
104 std::vector<std::string>
temp;
105 temp.reserve(vBranchesToDeleteEarly.size());
107 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
108 allBranchNames.begin(),allBranchNames.end(),
109 std::back_inserter(temp));
110 vBranchesToDeleteEarly.swap(temp);
111 if(temp.size() != vBranchesToDeleteEarly.size()) {
112 std::vector<std::string> missingProducts;
113 std::set_difference(temp.begin(),temp.end(),
114 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
115 std::back_inserter(missingProducts));
116 LogInfo
l(
"MissingProductsForCanDeleteEarly");
117 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118 for(
auto const&
n:missingProducts){
124 for(
auto const&
branch:vBranchesToDeleteEarly) {
125 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
133 typedef std::vector<std::string>
vstring;
139 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
140 std::shared_ptr<ModuleRegistry> modReg,
147 std::shared_ptr<ActivityRegistry> areg,
148 std::shared_ptr<ProcessConfiguration> processConfiguration,
149 bool allowEarlyDelete,
152 workerManager_(modReg,areg, actions),
160 number_of_unscheduled_modules_(0),
162 streamContext_(streamID_, processContext),
163 endpathsAreActive_(
true),
164 skippingEvent_(
false){
167 bool hasPath =
false;
168 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
169 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
173 for (
auto const& trig_name : pathNames) {
174 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
181 inserter->setTrigResultForStream(streamID.
value(),
results());
190 for (
auto const& end_path_name : endPathNames) {
191 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
198 std::set<std::string> usedWorkerLabels;
200 usedWorkerLabels.insert(worker->description().moduleLabel());
202 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
203 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204 std::vector<std::string> unusedLabels;
205 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
206 usedWorkerLabels.begin(), usedWorkerLabels.end(),
207 back_inserter(unusedLabels));
208 std::set<std::string> unscheduledLabels;
209 std::vector<std::string> shouldBeUsedLabels;
210 if (!unusedLabels.empty()) {
215 for (
auto const&
label : unusedLabels) {
219 assert(modulePSet !=
nullptr);
222 if (!shouldBeUsedLabels.empty()) {
223 std::ostringstream unusedStream;
224 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
225 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
226 itLabelEnd = shouldBeUsedLabels.end();
227 itLabel != itLabelEnd;
229 unusedStream <<
",'" << *itLabel <<
"'";
232 <<
"The following module labels are not assigned to any path:\n" 233 << unusedStream.str()
237 if (!unscheduledLabels.empty()) {
250 bool allowEarlyDelete) {
253 if(not allowEarlyDelete)
return;
257 std::multimap<std::string,Worker*> branchToReadingWorker;
258 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
261 if(branchToReadingWorker.empty()) {
264 const std::vector<std::string> kEmpty;
265 std::map<Worker*,unsigned int> reserveSizeForWorker;
266 unsigned int upperLimitOnReadingWorker =0;
267 unsigned int upperLimitOnIndicies = 0;
268 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
274 if(!branchToReadingWorker.empty()) {
278 for(
auto const& item: kept[
InEvent]) {
282 --nUniqueBranchesToDelete;
283 branchToReadingWorker.erase(
found.first,
found.second);
290 if(branchToReadingWorker.empty()) {
298 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
299 if(not branches.empty()) {
300 ++upperLimitOnReadingWorker;
302 for(
auto const&
branch:branches){
303 auto found = branchToReadingWorker.equal_range(
branch);
305 ++upperLimitOnIndicies;
306 ++reserveSizeForWorker[
w];
307 if(
nullptr ==
found.first->second) {
310 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
317 auto it = branchToReadingWorker.begin();
318 std::vector<std::string> unusedBranches;
319 while(it !=branchToReadingWorker.end()) {
320 if(it->second ==
nullptr) {
321 unusedBranches.push_back(it->first);
325 branchToReadingWorker.erase(temp);
330 if(not unusedBranches.empty()) {
332 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 333 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
334 for(
auto const&
n:unusedBranches){
339 if(!branchToReadingWorker.empty()) {
343 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
345 size_t nextOpenIndex = 0;
347 for(
auto& branchAndWorker:branchToReadingWorker) {
348 if(lastBranchName != branchAndWorker.first) {
350 BranchID bid(branchAndWorker.first+
".");
352 lastBranchName = branchAndWorker.first;
354 auto found = alreadySeenWorkers.find(branchAndWorker.second);
355 if(alreadySeenWorkers.end() ==
found) {
360 size_t index = nextOpenIndex;
361 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
364 beginAddress+index+1,
367 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
368 nextOpenIndex +=nIndices;
378 if(itLast->end() != it->begin()) {
380 unsigned int delta = it->begin()- itLast->end();
381 it->shiftIndexPointers(delta);
384 (itLast->end()-beginAddress),
386 (it->begin()-beginAddress));
395 p.setEarlyDeleteHelpers(alreadySeenWorkers);
398 p.setEarlyDeleteHelpers(alreadySeenWorkers);
407 std::shared_ptr<ProcessConfiguration const> processConfiguration,
411 std::vector<std::string>
const& endPathNames) {
412 vstring modnames = proc_pset.
getParameter<vstring>(pathName);
415 unsigned int placeInPath = 0;
416 for (
auto const&
name : modnames) {
427 if (modpset ==
nullptr) {
433 "The unknown module label \"" << moduleLabel <<
434 "\" appears in " << pathType <<
" \"" << pathName <<
435 "\"\n please check spelling or remove that label from the path.";
443 std::vector<std::string> allowed_filters = proc_pset.
getUntrackedParameter<vstring>(
"@filters_on_endpaths");
448 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << pathName <<
"'.\n" 449 <<
"The return value of the filter will be ignored.\n" 450 <<
"To suppress this warning, either remove the filter from the endpath,\n" 451 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
454 tmpworkers.emplace_back(worker, filterAction, placeInPath);
458 out.swap(tmpworkers);
464 std::shared_ptr<ProcessConfiguration const> processConfiguration,
466 std::vector<std::string>
const& endPathNames) {
468 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, endPathNames);
471 if (!tmpworkers.empty()) {
484 std::shared_ptr<ProcessConfiguration const> processConfiguration,
486 std::vector<std::string>
const& endPathNames) {
488 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, endPathNames);
490 if (!tmpworkers.empty()) {
513 if (worker->description().moduleLabel() == iLabel) {
518 if (
nullptr == found) {
526 std::vector<ModuleDescription const*>
528 std::vector<ModuleDescription const*>
result;
555 results_->at(empty_trig_path) = hltPathStatus;
556 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
581 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
582 auto pathErrorPtr = pathErrorHolder.get();
584 [iTask,
this,serviceToken,pathError=
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable 588 std::exception_ptr ptr;
589 if(pathError->load()) {
590 ptr = *pathError->load();
591 delete pathError->load();
593 if( (not ptr) and iPtr) {
604 [allPathsHolder,pathErrorPtr,&ep, &es,
this,serviceToken](std::exception_ptr
const* iPtr)
mutable 611 pathErrorPtr->store(
new std::exception_ptr(*iPtr) );
628 it != itEnd; ++ it) {
646 std::rethrow_exception(*(iExcept.load()));
654 *(iExcept.load()) = std::exception_ptr();
656 *(iExcept.load()) = std::current_exception();
660 *(iExcept.load()) = std::current_exception();
665 if((not iExcept) and
results_->accept()) {
681 std::ostringstream ost;
682 ost <<
"Processing Event " << ep.
id();
685 iExcept.store(
new std::exception_ptr(std::current_exception()));
690 iExcept.store(
new std::exception_ptr(std::current_exception()));
694 std::exception_ptr ptr;
696 ptr = *iExcept.load();
710 std::rethrow_exception(iExcept);
713 bool const cleaningUpAfterException =
false;
719 iExcept = std::current_exception();
729 iExcept = std::current_exception();
744 std::back_inserter(oLabelsToFill),
745 std::bind(&
Path::name, std::placeholders::_1));
750 std::vector<std::string>& oLabelsToFill)
const {
751 TrigPaths::const_iterator itFound =
754 std::bind(std::equal_to<std::string>(),
756 std::bind(&
Path::name, std::placeholders::_1)));
758 oLabelsToFill.reserve(itFound->size());
759 for (
size_t i = 0;
i < itFound->size(); ++
i) {
760 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
767 std::vector<ModuleDescription const*>& descriptions,
768 unsigned int hint)
const {
769 descriptions.clear();
771 TrigPaths::const_iterator itFound;
775 if(itFound->name() == iPathLabel) found =
true;
781 std::bind(std::equal_to<std::string>(),
783 std::bind(&
Path::name, std::placeholders::_1)));
787 descriptions.reserve(itFound->size());
788 for (
size_t i = 0;
i < itFound->size(); ++
i) {
789 descriptions.push_back(itFound->getWorker(
i)->descPtr());
796 std::vector<ModuleDescription const*>& descriptions,
797 unsigned int hint)
const {
798 descriptions.clear();
800 TrigPaths::const_iterator itFound;
804 if(itFound->name() == iEndPathLabel) found =
true;
810 std::bind(std::equal_to<std::string>(),
812 std::bind(&
Path::name, std::placeholders::_1)));
813 if (itFound !=
end_paths_.end()) found =
true;
816 descriptions.reserve(itFound->size());
817 for (
size_t i = 0;
i < itFound->size(); ++
i) {
818 descriptions.push_back(itFound->getWorker(
i)->descPtr());
855 std::vector<ModuleInPathSummary>
temp(sz);
856 for (
size_t i = 0;
i != sz; ++
i) {
862 for (
size_t i = 0;
i != sz; ++
i) {
896 using std::placeholders::_1;
922 ++(earlyDeleteBranchToCount_[
index].count);
932 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
936 unsigned int indexEmpty = 0;
937 unsigned int indexOfPath = 0;
938 for(
auto & pathStatusInserter : pathStatusInserters) {
939 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
941 inserterPtr->moduleDescription(),
944 workerPtr->setActivityRegistry(
actReg_);
953 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
963 for(
auto & endPathStatusInserter : endPathStatusInserters) {
964 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
966 inserterPtr->moduleDescription(),
969 workerPtr->setActivityRegistry(
actReg_);
978 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr,
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
T & get_underlying(propagate_const< T > &)
def unique(seq, keepstr=True)
StreamContext streamContext_
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
bool getMapped(key_type const &k, value_type &result) const
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
TrigResConstPtr results() const
std::vector< std::string > vstring
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)