27 #include "boost/bind.hpp"
28 #include "boost/ref.hpp"
46 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
48 transform_into(InputIterator
begin, InputIterator
end,
49 ForwardIterator
out, Func
func) {
// Helper template: fills `to` from `from` by running transform_into with `func`.
// NOTE(review): this listing omits several original lines (the return type, the
// declaration of `temp`, and the code between/after the two calls), so the exact
// branch structure must be confirmed against the full file.
58 template <
typename FROM,
typename TO,
typename FUNC>
60 fill_summary(FROM
const& from, TO&
to, FUNC func) {
// Size mismatch between destination and source: the transform below writes into
// `temp` instead of directly into `to`.
61 if(to.size()!=from.size()) {
63 transform_into(from.begin(), from.end(),
temp.begin(),
func);
// Second visible call writes straight into `to`'s existing elements
// (presumably the equal-size branch; the `else` itself is not visible here).
66 transform_into(from.begin(), from.end(), to.begin(),
func);
76 makeInserter(ExceptionToActionTable
const&
actions,
77 boost::shared_ptr<ActivityRegistry>
areg,
78 TriggerResultInserter* inserter) {
80 ptr->setActivityRegistry(areg);
// Returns whether `s` is found in `v` by delegating to std::binary_search.
// std::binary_search requires the range to be sorted (operator< ordering here,
// since no comparator is passed); callers must supply a sorted vector.
84 bool binary_search_string(std::vector<std::string>
const&
v,
std::string const&
s) {
85 return std::binary_search(v.begin(), v.end(),
s);
90 ProductRegistry
const&
preg,
91 std::multimap<std::string,Worker*>& branchToReadingWorker)
// Read the untracked 'canDeleteEarly' list from `opts`; an empty vector is the default.
94 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly",std::vector<std::string>());
95 if(not vBranchesToDeleteEarly.empty()) {
// Sort and drop adjacent duplicates: the std::set_* algorithms used below need
// sorted input ranges.
96 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
97 vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
98 vBranchesToDeleteEarly.end());
// `auto` (no reference) makes this a local copy, so the edits below do not touch
// the registry's own data.
101 auto allBranchNames =
preg.allBranchNames();
// Drop the last character of every branch name.
// NOTE(review): presumably a trailing '.', since a "." is appended again where a
// BranchID is built from these names later in the file — confirm. An empty name
// would make size()-1 wrap around.
103 for(
auto &
b:allBranchNames) {
104 b.resize(
b.size()-1);
106 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
107 std::vector<std::string>
temp;
108 temp.reserve(vBranchesToDeleteEarly.size());
// temp := requested names that also appear among the (trimmed) registry names.
110 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
111 allBranchNames.begin(),allBranchNames.end(),
112 std::back_inserter(temp));
// After the swap, vBranchesToDeleteEarly holds the intersection and `temp` holds
// the original request list.
113 vBranchesToDeleteEarly.swap(temp);
// A size difference means some requested names were not found in the registry.
114 if(temp.size() != vBranchesToDeleteEarly.size()) {
115 std::vector<std::string> missingProducts;
// missingProducts := requested names minus the ones that survived the intersection.
116 std::set_difference(temp.begin(),temp.end(),
117 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
118 std::back_inserter(missingProducts));
119 LogInfo
l(
"MissingProductsForCanDeleteEarly");
120 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
// Loop over the missing names (loop body not visible in this listing).
121 for(
auto const&
n:missingProducts){
// Seed the output multimap: every surviving branch name maps to a null Worker*.
127 for(
auto const& branch:vBranchesToDeleteEarly) {
128 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(
nullptr)));
136 typedef std::vector<std::string>
vstring;
141 boost::shared_ptr<ModuleRegistry> modReg,
148 boost::shared_ptr<ActivityRegistry> areg,
150 bool allowEarlyDelete,
153 workerManager_(modReg,areg, actions),
155 trig_name_list_(tns.getTrigPaths()),
156 end_path_name_list_(tns.getEndPaths()),
164 number_of_unscheduled_modules_(0),
166 streamContext_(streamID_, processContext),
171 bool hasPath =
false;
177 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results_, &labelsOnTriggerPaths);
194 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
// Labels of workers already in use. The insert below takes each label from
// worker->description().moduleLabel(); the enclosing loop is not visible here.
199 std::set<std::string> usedWorkerLabels;
201 usedWorkerLabels.insert(worker->description().moduleLabel());
// Module labels listed under "@all_modules" in the process parameter set, copied
// into a std::set so both inputs to set_difference are sorted.
203 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
204 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
205 std::vector<std::string> unusedLabels;
// unusedLabels := configured labels minus labels of workers in use.
206 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
207 usedWorkerLabels.begin(), usedWorkerLabels.end(),
208 back_inserter(unusedLabels));
// Two output buckets for the unused labels; the rule that sorts a label into one
// or the other depends on lines not visible in this listing.
211 std::set<std::string> unscheduledLabels;
212 std::vector<std::string> shouldBeUsedLabels;
213 if (!unusedLabels.empty()) {
218 for (
auto const&
label : unusedLabels) {
219 if (allowUnscheduled) {
// NOTE(review): how modulePSet is obtained is not visible here.
223 assert(modulePSet !=
nullptr);
227 shouldBeUsedLabels.push_back(
label);
// Build a comma-separated list of single-quoted labels. front() is written first,
// so the loop can start at begin() + 1 and prefix each remaining label with ",".
// begin() + 1 is valid because the vector is known to be non-empty here.
230 if (!shouldBeUsedLabels.empty()) {
231 std::ostringstream unusedStream;
232 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
233 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
234 itLabelEnd = shouldBeUsedLabels.end();
235 itLabel != itLabelEnd;
237 unusedStream <<
",'" << *itLabel <<
"'";
240 <<
"The following module labels are not assigned to any path:\n"
241 << unusedStream.str()
245 if (!unscheduledLabels.empty()) {
258 bool allowEarlyDelete) {
261 if(not allowEarlyDelete)
return;
// Branch name -> reading worker(s). Filled by initializeBranchToReadingWorker from
// `opts` and `preg`.
265 std::multimap<std::string,Worker*> branchToReadingWorker;
266 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
// Empty-map check; the body of this branch is not visible in this listing.
269 if(branchToReadingWorker.size()==0) {
// kEmpty is the default value used when reading 'mightGet' below.
272 const std::vector<std::string> kEmpty;
// Per-worker count of listed branches that matched an entry in the map.
273 std::map<Worker*,unsigned int> reserveSizeForWorker;
274 unsigned int upperLimitOnReadingWorker =0;
275 unsigned int upperLimitOnIndicies = 0;
276 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
// Visit every module holder. For each item in kept[InEvent], the matching range
// is looked up by branch name, the unique-branch counter is decremented and the
// range is erased from the map.
// NOTE(review): where `kept` comes from and the condition guarding the
// decrement/erase are on lines not visible here — confirm.
279 modReg.
forAllModuleHolders([
this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
280 auto comm = iHolder->createOutputModuleCommunicator();
282 if(branchToReadingWorker.size()>0) {
286 for(
auto const& item: kept[
InEvent]) {
287 auto found = branchToReadingWorker.equal_range(item->branchName());
289 --nUniqueBranchesToDelete;
290 branchToReadingWorker.erase(
found.first,
found.second);
297 if(branchToReadingWorker.size()==0) {
// Read a module's untracked 'mightGet' list (default: kEmpty). The loop that
// supplies `pset` and `w` is not visible in this listing.
305 auto branches = pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
306 if(not branches.empty()) {
307 ++upperLimitOnReadingWorker;
309 for(
auto const& branch:branches){
310 auto found = branchToReadingWorker.equal_range(branch);
// Counters bumped for a listed branch; the guard around them is not visible here.
312 ++upperLimitOnIndicies;
313 ++reserveSizeForWorker[
w];
// Visible handling: a test for whether the first matching entry still has a null
// worker, and an insert of an additional (branch, w) pair.
// NOTE(review): the lines between them (presumably assigning `w` to the null
// slot, then an else) are not visible — confirm.
314 if(
nullptr ==
found.first->second) {
317 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
// Prune: entries whose Worker* is still nullptr are recorded as unused and erased.
// NOTE(review): `temp` is defined on a line not visible here; presumably a saved
// iterator so `it` can advance before the erase — confirm.
324 auto it = branchToReadingWorker.begin();
325 std::vector<std::string> unusedBranches;
326 while(it !=branchToReadingWorker.end()) {
327 if(it->second ==
nullptr) {
328 unusedBranches.push_back(it->first);
332 branchToReadingWorker.erase(temp);
// Report the unused branch names (the logger `l` is declared on a line not visible here).
337 if(not unusedBranches.empty()) {
339 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
340 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
341 for(
auto const&
n:unusedBranches){
// Continue only if at least one (branch, worker) pair remains.
346 if(0!=branchToReadingWorker.size()) {
// Worker -> address of the EarlyDeleteHelper created for it.
350 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
352 size_t nextOpenIndex = 0;
// The multimap iterates in key order, so a change of key marks the start of a new
// branch; its BranchID is built from the stored name with "." appended.
354 for(
auto& branchAndWorker:branchToReadingWorker) {
355 if(lastBranchName != branchAndWorker.first) {
357 BranchID bid(branchAndWorker.first+
".");
359 lastBranchName = branchAndWorker.first;
// First time this worker is seen: a block of nIndices index slots is taken starting
// at nextOpenIndex, and the address of the helper at earlyDeleteHelpers_.back() is
// remembered for the worker.
// NOTE(review): those stored addresses stay valid only if earlyDeleteHelpers_ (a
// std::vector) never reallocates afterwards — confirm it is reserved up front.
361 auto found = alreadySeenWorkers.find(branchAndWorker.second);
362 if(alreadySeenWorkers.end() ==
found) {
367 size_t index = nextOpenIndex;
368 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
371 beginAddress+index+1,
374 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
375 nextOpenIndex +=nIndices;
// When the previous helper's end() does not coincide with this helper's begin(),
// the gap between them is passed to shiftIndexPointers on the later helper.
385 if(itLast->end() != it->begin()) {
387 unsigned int delta = it->begin()- itLast->end();
388 it->shiftIndexPointers(delta);
391 (itLast->end()-beginAddress),
393 (it->begin()-beginAddress));
// Hand the worker -> helper map to `p`.
// NOTE(review): the call appears twice, presumably once per loop over two path
// collections whose loop headers are not visible here — confirm.
402 p.setEarlyDeleteHelpers(alreadySeenWorkers);
405 p.setEarlyDeleteHelpers(alreadySeenWorkers);
422 unsigned int placeInPath = 0;
423 for (
auto const& name : modnames) {
425 if (labelsOnPaths) labelsOnPaths->push_back(name);
442 "The unknown module label \"" << moduleLabel <<
443 "\" appears in " << pathType <<
" \"" << name <<
444 "\"\n please check spelling or remove that label from the path.";
457 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << name <<
"'.\n"
458 <<
"The return value of the filter will be ignored.\n"
459 <<
"To suppress this warning, either remove the filter from the endpath,\n"
460 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
463 tmpworkers.emplace_back(worker, filterAction, placeInPath);
467 out.swap(tmpworkers);
475 vstring* labelsOnTriggerPaths) {
478 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, labelsOnTriggerPaths);
480 for (PathWorkers::iterator wi(tmpworkers.begin()),
481 we(tmpworkers.end()); wi != we; ++wi) {
482 holder.push_back(wi->getWorker());
486 if (!tmpworkers.empty()) {
504 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, 0);
507 for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
508 holder.push_back(wi->getWorker());
511 if (!tmpworkers.empty()) {
532 if (worker->description().moduleLabel() == iLabel) {
537 if (
nullptr == found) {
541 iMod->replaceModuleFor(found);
545 std::vector<ModuleDescription const*>
547 std::vector<ModuleDescription const*>
result;
562 std::back_inserter(oLabelsToFill),
568 std::vector<std::string>& oLabelsToFill)
const {
// Locate the path entry to report on. Only part of the search expression is
// visible (a boost::bind around std::equal_to<std::string>).
// NOTE(review): which values are compared, and how a failed search is handled
// before itFound is dereferenced, are on lines not visible here — confirm.
569 TrigPaths::const_iterator itFound =
572 boost::bind(std::equal_to<std::string>(),
// Reserve itFound->size() entries, then append the module label of each worker on
// the found path in index order (0 .. size()-1).
576 oLabelsToFill.reserve(itFound->size());
577 for (
size_t i = 0;
i < itFound->size(); ++
i) {
578 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
615 std::vector<ModuleInPathSummary>
temp(sz);
616 for (
size_t i = 0;
i != sz; ++
i) {
622 for (
size_t i = 0;
i != sz; ++
i) {
676 std::vector<ModuleInPathTimingSummary>
temp(sz);
677 for (
size_t i = 0;
i != sz; ++
i) {
683 for (
size_t i = 0;
i != sz; ++
i) {
739 ++(earlyDeleteBranchToCount_[
index].second);
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > Workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void endStream(StreamID iID, StreamContext &streamContext)
int totalEventsFailed() const
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
volatile bool endpathsAreActive_
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, bool useStopwatch, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
std::string const & moduleName() const
void addToAllWorkers(Worker *w)
vstring end_path_name_list_
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::vector< WorkerSummary > workerSummaries
std::string const & moduleLabel() const
std::pair< double, double > timeCpuReal() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleInPathTimingSummary > moduleInPathSummaries
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
static void fillPathTimingSummary(Path const &path, PathTimingSummary &sum)
std::vector< WorkerInPath > PathWorkers
boost::shared_ptr< ActivityRegistry > actReg_
boost::shared_ptr< Worker > WorkerPtr
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
wantSummary_(tns.wantSummary())
EventTimingSummary eventSummary
static void fillWorkerTimingSummary(Worker const *pw, WorkerTimingSummary &sum)
static void fillModuleInPathTimingSummary(Path const &path, size_t which, ModuleInPathTimingSummary &sum)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
static void fillPathSummary(Path const &path, PathSummary &sum)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
std::vector< PathTimingSummary > trigPathSummaries
bool getMapped(key_type const &k, value_type &result) const
vstring empty_trig_path_names_
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
WorkerPtr results_inserter_
unsigned int value() const
boost::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
boost::shared_ptr< HLTGlobalStatus > TrigResPtr
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
std::pair< double, double > timeCpuReal() const
StreamSchedule(TriggerResultInserter *inserter, boost::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
void setTrigResultForStream(unsigned int iStreamIndex, const TrigResPtr &trptr)
void getTriggerTimingReport(TriggerTimingReport &rep) const
static void fillWorkerTimingSummaryAux(Worker const &w, WorkerTimingSummary &sum)
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::string const & name() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
std::vector< ModuleInPathSummary > moduleInPathSummaries
void addToAllWorkers(Worker *w, bool useStopwatch)
std::vector< WorkerTimingSummary > workerSummaries
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
std::vector< std::string > vstring
tuple size
Write out results.
static Registry * instance()
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)