47 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
48 void transform_into(InputIterator
begin, InputIterator
end, ForwardIterator
out, Func
func) {
58 template <
typename FROM,
typename TO,
typename FUNC>
59 void fill_summary(FROM
const& from, TO&
to, FUNC func) {
60 if (to.size() != from.size()) {
62 transform_into(from.begin(), from.end(),
temp.begin(),
func);
65 transform_into(from.begin(), from.end(), to.begin(),
func);
75 std::shared_ptr<ActivityRegistry> areg,
76 std::shared_ptr<TriggerResultInserter> inserter) {
79 ptr->setActivityRegistry(areg);
84 ProductRegistry
const& preg,
85 std::multimap<std::string, Worker*>& branchToReadingWorker) {
87 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly");
88 if (not vBranchesToDeleteEarly.empty()) {
89 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
90 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
91 vBranchesToDeleteEarly.end());
94 auto allBranchNames = preg.allBranchNames();
96 for (
auto&
b : allBranchNames) {
97 b.resize(
b.size() - 1);
99 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
100 std::vector<std::string>
temp;
101 temp.reserve(vBranchesToDeleteEarly.size());
103 std::set_intersection(vBranchesToDeleteEarly.begin(),
104 vBranchesToDeleteEarly.end(),
105 allBranchNames.begin(),
106 allBranchNames.end(),
107 std::back_inserter(temp));
108 vBranchesToDeleteEarly.swap(temp);
109 if (temp.size() != vBranchesToDeleteEarly.size()) {
110 std::vector<std::string> missingProducts;
111 std::set_difference(temp.begin(),
113 vBranchesToDeleteEarly.begin(),
114 vBranchesToDeleteEarly.end(),
115 std::back_inserter(missingProducts));
116 LogInfo
l(
"MissingProductsForCanDeleteEarly");
117 l <<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118 for (
auto const&
n : missingProducts) {
124 for (
auto const&
branch : vBranchesToDeleteEarly) {
125 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
133 typedef std::vector<std::string>
vstring;
138 std::shared_ptr<TriggerResultInserter> inserter,
140 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
141 std::shared_ptr<ModuleRegistry> modReg,
148 std::shared_ptr<ActivityRegistry> areg,
149 std::shared_ptr<ProcessConfiguration> processConfiguration,
150 bool allowEarlyDelete,
153 : workerManager_(modReg, areg, actions),
161 number_of_unscheduled_modules_(0),
163 streamContext_(streamID_, processContext),
164 endpathsAreActive_(
true),
165 skippingEvent_(
false) {
167 bool hasPath =
false;
168 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
169 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
173 for (
auto const& trig_name : pathNames) {
174 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
181 inserter->setTrigResultForStream(streamID.
value(),
results());
190 for (
auto const& end_path_name : endPathNames) {
191 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
198 std::set<std::string> usedWorkerLabels;
200 usedWorkerLabels.insert(worker->description().moduleLabel());
202 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules"));
203 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204 std::vector<std::string> unusedLabels;
205 set_difference(modulesInConfigSet.begin(),
206 modulesInConfigSet.end(),
207 usedWorkerLabels.begin(),
208 usedWorkerLabels.end(),
209 back_inserter(unusedLabels));
210 std::set<std::string> unscheduledLabels;
211 std::vector<std::string> shouldBeUsedLabels;
212 if (!unusedLabels.empty()) {
217 for (
auto const&
label : unusedLabels) {
221 assert(modulePSet !=
nullptr);
223 *modulePSet, preg, &prealloc, processConfiguration,
label, unscheduledLabels, shouldBeUsedLabels);
225 if (!shouldBeUsedLabels.empty()) {
226 std::ostringstream unusedStream;
227 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
228 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229 itLabelEnd = shouldBeUsedLabels.end();
230 itLabel != itLabelEnd;
232 unusedStream <<
",'" << *itLabel <<
"'";
234 LogInfo(
"path") <<
"The following module labels are not assigned to any path:\n" << unusedStream.str() <<
"\n";
246 bool allowEarlyDelete) {
249 if (not allowEarlyDelete)
254 std::multimap<std::string, Worker*> branchToReadingWorker;
255 initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
258 if (branchToReadingWorker.empty()) {
261 const std::vector<std::string> kEmpty;
262 std::map<Worker*, unsigned int> reserveSizeForWorker;
263 unsigned int upperLimitOnReadingWorker = 0;
264 unsigned int upperLimitOnIndicies = 0;
265 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
271 if (!branchToReadingWorker.empty()) {
275 for (
auto const& item : kept[
InEvent]) {
279 --nUniqueBranchesToDelete;
280 branchToReadingWorker.erase(
found.first,
found.second);
287 if (branchToReadingWorker.empty()) {
294 if (
nullptr !=
pset) {
295 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet", kEmpty);
296 if (not branches.empty()) {
297 ++upperLimitOnReadingWorker;
299 for (
auto const&
branch : branches) {
300 auto found = branchToReadingWorker.equal_range(
branch);
302 ++upperLimitOnIndicies;
303 ++reserveSizeForWorker[
w];
304 if (
nullptr ==
found.first->second) {
307 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
314 auto it = branchToReadingWorker.begin();
315 std::vector<std::string> unusedBranches;
316 while (it != branchToReadingWorker.end()) {
317 if (it->second ==
nullptr) {
318 unusedBranches.push_back(it->first);
322 branchToReadingWorker.erase(temp);
327 if (not unusedBranches.empty()) {
329 l <<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 330 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331 for (
auto const&
n : unusedBranches) {
336 if (!branchToReadingWorker.empty()) {
340 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
342 size_t nextOpenIndex = 0;
344 for (
auto& branchAndWorker : branchToReadingWorker) {
345 if (lastBranchName != branchAndWorker.first) {
347 BranchID bid(branchAndWorker.first +
".");
349 lastBranchName = branchAndWorker.first;
351 auto found = alreadySeenWorkers.find(branchAndWorker.second);
352 if (alreadySeenWorkers.end() ==
found) {
357 size_t index = nextOpenIndex;
358 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
362 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(
earlyDeleteHelpers_.back())));
363 nextOpenIndex += nIndices;
373 if (itLast->end() != it->begin()) {
375 unsigned int delta = it->begin() - itLast->end();
376 it->shiftIndexPointers(delta);
390 p.setEarlyDeleteHelpers(alreadySeenWorkers);
393 p.setEarlyDeleteHelpers(alreadySeenWorkers);
402 std::shared_ptr<ProcessConfiguration const> processConfiguration,
406 std::vector<std::string>
const& endPathNames) {
407 vstring modnames = proc_pset.
getParameter<vstring>(pathName);
410 unsigned int placeInPath = 0;
411 for (
auto const&
name : modnames) {
415 else if (
name[0] ==
'-')
420 moduleLabel.erase(0, 1);
424 if (modpset ==
nullptr) {
430 <<
"The unknown module label \"" << moduleLabel <<
"\" appears in " << pathType <<
" \"" << pathName
431 <<
"\"\n please check spelling or remove that label from the path.";
439 std::vector<std::string> allowed_filters = proc_pset.
getUntrackedParameter<vstring>(
"@filters_on_endpaths");
444 <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" 445 << pathName <<
"'.\n" 446 <<
"The return value of the filter will be ignored.\n" 447 <<
"To suppress this warning, either remove the filter from the endpath,\n" 448 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
451 tmpworkers.emplace_back(worker, filterAction, placeInPath);
455 out.swap(tmpworkers);
461 std::shared_ptr<ProcessConfiguration const> processConfiguration,
465 std::vector<std::string>
const& endPathNames) {
467 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, endPathNames);
470 if (!tmpworkers.empty()) {
491 std::shared_ptr<ProcessConfiguration const> processConfiguration,
494 std::vector<std::string>
const& endPathNames) {
496 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, endPathNames);
498 if (!tmpworkers.empty()) {
524 if (worker->description().moduleLabel() == iLabel) {
529 if (
nullptr == found) {
538 std::vector<ModuleDescription const*>
result;
566 results_->at(empty_trig_path) = hltPathStatus;
567 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
592 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
593 auto pathErrorPtr = pathErrorHolder.get();
595 tbb::task::allocate_root(),
596 [iTask,
this, serviceToken, pathError =
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable {
599 std::exception_ptr ptr;
600 if (pathError->load()) {
601 ptr = *pathError->load();
602 delete pathError->load();
604 if ((not ptr) and iPtr) {
615 tbb::task::allocate_root(),
616 [allPathsHolder, pathErrorPtr, &ep, &es,
this, serviceToken](std::exception_ptr
const* iPtr)
mutable {
622 pathErrorPtr->store(
new std::exception_ptr(*iPtr));
655 std::rethrow_exception(*(iExcept.load()));
662 *(iExcept.load()) = std::exception_ptr();
664 *(iExcept.load()) = std::current_exception();
667 *(iExcept.load()) = std::current_exception();
671 if ((not iExcept) and
results_->accept()) {
686 std::ostringstream ost;
687 ost <<
"Processing Event " << ep.
id();
690 iExcept.store(
new std::exception_ptr(std::current_exception()));
694 iExcept.store(
new std::exception_ptr(std::current_exception()));
698 std::exception_ptr ptr;
700 ptr = *iExcept.load();
713 bool const cleaningUpAfterException =
false;
719 iExcept = std::current_exception();
729 iExcept = std::current_exception();
743 std::back_inserter(oLabelsToFill),
744 std::bind(&
Path::name, std::placeholders::_1));
748 TrigPaths::const_iterator itFound = std::find_if(
751 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
753 oLabelsToFill.reserve(itFound->size());
754 for (
size_t i = 0;
i < itFound->size(); ++
i) {
755 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
761 std::vector<ModuleDescription const*>& descriptions,
762 unsigned int hint)
const {
763 descriptions.clear();
765 TrigPaths::const_iterator itFound;
769 if (itFound->name() == iPathLabel)
774 itFound = std::find_if(
777 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
782 descriptions.reserve(itFound->size());
783 for (
size_t i = 0;
i < itFound->size(); ++
i) {
784 descriptions.push_back(itFound->getWorker(
i)->descPtr());
790 std::vector<ModuleDescription const*>& descriptions,
791 unsigned int hint)
const {
792 descriptions.clear();
794 TrigPaths::const_iterator itFound;
798 if (itFound->name() == iEndPathLabel)
803 itFound = std::find_if(
806 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
811 descriptions.reserve(itFound->size());
812 for (
size_t i = 0;
i < itFound->size(); ++
i) {
813 descriptions.push_back(itFound->getWorker(
i)->descPtr());
840 std::vector<ModuleInPathSummary>
temp(sz);
841 for (
size_t i = 0;
i != sz; ++
i) {
847 for (
size_t i = 0;
i != sz; ++
i) {
875 using std::placeholders::_1;
896 ++(earlyDeleteBranchToCount_[
index].count);
905 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
908 unsigned int indexEmpty = 0;
909 unsigned int indexOfPath = 0;
910 for (
auto& pathStatusInserter : pathStatusInserters) {
911 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
915 workerPtr->setActivityRegistry(
actReg_);
924 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
933 for (
auto& endPathStatusInserter : endPathStatusInserters) {
934 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
938 workerPtr->setActivityRegistry(
actReg_);
947 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr, workerPtr.get());
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
void setupOnDemandSystem(Principal &principal, EventSetupImpl const &es)
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
T & get_underlying(propagate_const< T > &)
def unique(seq, keepstr=True)
StreamContext streamContext_
std::list< std::string > const & context() const
static void fillPathSummary(Path const &path, PathSummary &sum)
bool getMapped(key_type const &k, value_type &result) const
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetupImpl const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
unsigned int value() const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
static Registry * instance()
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)