27 #include "boost/bind.hpp"
28 #include "boost/ref.hpp"
46 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
48 transform_into(InputIterator
begin, InputIterator
end,
49 ForwardIterator
out, Func func) {
50 for (; begin !=
end; ++
begin, ++
out) func(*begin, *out);
58 template <
typename FROM,
typename TO,
typename FUNC>
60 fill_summary(FROM
const& from, TO&
to, FUNC func) {
61 if(to.size()!=from.size()) {
63 transform_into(from.begin(), from.end(),
temp.begin(), func);
66 transform_into(from.begin(), from.end(), to.begin(), func);
76 makeInserter(ExceptionToActionTable
const&
actions,
77 boost::shared_ptr<ActivityRegistry>
areg,
78 TriggerResultInserter* inserter) {
80 ptr->setActivityRegistry(areg);
86 ProductRegistry
const&
preg,
87 std::multimap<std::string,Worker*>& branchToReadingWorker)
90 auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly",std::vector<std::string>());
91 if(not vBranchesToDeleteEarly.empty()) {
92 std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
93 vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
94 vBranchesToDeleteEarly.end());
97 auto allBranchNames =
preg.allBranchNames();
99 for(
auto &
b:allBranchNames) {
100 b.resize(
b.size()-1);
102 std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
103 std::vector<std::string>
temp;
104 temp.reserve(vBranchesToDeleteEarly.size());
106 std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
107 allBranchNames.begin(),allBranchNames.end(),
108 std::back_inserter(temp));
109 vBranchesToDeleteEarly.swap(temp);
110 if(temp.size() != vBranchesToDeleteEarly.size()) {
111 std::vector<std::string> missingProducts;
112 std::set_difference(temp.begin(),temp.end(),
113 vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
114 std::back_inserter(missingProducts));
115 LogInfo
l(
"MissingProductsForCanDeleteEarly");
116 l<<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
117 for(
auto const&
n:missingProducts){
123 for(
auto const& branch:vBranchesToDeleteEarly) {
124 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(
nullptr)));
132 typedef std::vector<std::string>
vstring;
137 boost::shared_ptr<ModuleRegistry> modReg,
144 boost::shared_ptr<ActivityRegistry> areg,
146 bool allowEarlyDelete,
149 workerManager_(modReg,areg, actions),
151 trig_name_list_(tns.getTrigPaths()),
152 end_path_name_list_(tns.getEndPaths()),
160 number_of_unscheduled_modules_(0),
162 streamContext_(streamID_, processContext),
167 bool hasPath =
false;
173 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results_, &labelsOnTriggerPaths);
190 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
195 std::set<std::string> usedWorkerLabels;
197 usedWorkerLabels.insert(worker->description().moduleLabel());
199 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string> >(
"@all_modules"));
200 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
201 std::vector<std::string> unusedLabels;
202 set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
203 usedWorkerLabels.begin(), usedWorkerLabels.end(),
204 back_inserter(unusedLabels));
207 std::set<std::string> unscheduledLabels;
208 std::vector<std::string> shouldBeUsedLabels;
209 if (!unusedLabels.empty()) {
214 for (
auto const&
label : unusedLabels) {
215 if (allowUnscheduled) {
219 assert(modulePSet !=
nullptr);
223 shouldBeUsedLabels.push_back(
label);
226 if (!shouldBeUsedLabels.empty()) {
227 std::ostringstream unusedStream;
228 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
229 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
230 itLabelEnd = shouldBeUsedLabels.end();
231 itLabel != itLabelEnd;
233 unusedStream <<
",'" << *itLabel <<
"'";
236 <<
"The following module labels are not assigned to any path:\n"
237 << unusedStream.str()
241 if (!unscheduledLabels.empty()) {
254 bool allowEarlyDelete) {
257 if(not allowEarlyDelete)
return;
261 std::multimap<std::string,Worker*> branchToReadingWorker;
262 initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
265 if(branchToReadingWorker.size()==0) {
268 const std::vector<std::string> kEmpty;
269 std::map<Worker*,unsigned int> reserveSizeForWorker;
270 unsigned int upperLimitOnReadingWorker =0;
271 unsigned int upperLimitOnIndicies = 0;
272 unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
278 if(branchToReadingWorker.size()>0) {
282 for(
auto const& item: kept[
InEvent]) {
283 auto found = branchToReadingWorker.equal_range(item->branchName());
285 --nUniqueBranchesToDelete;
286 branchToReadingWorker.erase(
found.first,
found.second);
293 if(branchToReadingWorker.size()==0) {
301 auto branches = pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet",kEmpty);
302 if(not branches.empty()) {
303 ++upperLimitOnReadingWorker;
305 for(
auto const& branch:branches){
306 auto found = branchToReadingWorker.equal_range(branch);
308 ++upperLimitOnIndicies;
309 ++reserveSizeForWorker[
w];
310 if(
nullptr ==
found.first->second) {
313 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
320 auto it = branchToReadingWorker.begin();
321 std::vector<std::string> unusedBranches;
322 while(it !=branchToReadingWorker.end()) {
323 if(it->second ==
nullptr) {
324 unusedBranches.push_back(it->first);
328 branchToReadingWorker.erase(temp);
333 if(not unusedBranches.empty()) {
335 l<<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
336 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
337 for(
auto const&
n:unusedBranches){
342 if(0!=branchToReadingWorker.size()) {
346 std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
348 size_t nextOpenIndex = 0;
350 for(
auto& branchAndWorker:branchToReadingWorker) {
351 if(lastBranchName != branchAndWorker.first) {
353 BranchID bid(branchAndWorker.first+
".");
355 lastBranchName = branchAndWorker.first;
357 auto found = alreadySeenWorkers.find(branchAndWorker.second);
358 if(alreadySeenWorkers.end() ==
found) {
363 size_t index = nextOpenIndex;
364 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
367 beginAddress+index+1,
370 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(
earlyDeleteHelpers_.back())));
371 nextOpenIndex +=nIndices;
381 if(itLast->end() != it->begin()) {
383 unsigned int delta = it->begin()- itLast->end();
384 it->shiftIndexPointers(delta);
387 (itLast->end()-beginAddress),
389 (it->begin()-beginAddress));
398 p.setEarlyDeleteHelpers(alreadySeenWorkers);
401 p.setEarlyDeleteHelpers(alreadySeenWorkers);
418 unsigned int placeInPath = 0;
419 for (
auto const& name : modnames) {
421 if (labelsOnPaths) labelsOnPaths->push_back(name);
438 "The unknown module label \"" << moduleLabel <<
439 "\" appears in " << pathType <<
" \"" << name <<
440 "\"\n please check spelling or remove that label from the path.";
453 <<
"The EDFilter '" << worker->
description().
moduleName() <<
"' with module label '" << moduleLabel <<
"' appears on EndPath '" << name <<
"'.\n"
454 <<
"The return value of the filter will be ignored.\n"
455 <<
"To suppress this warning, either remove the filter from the endpath,\n"
456 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
459 tmpworkers.emplace_back(worker, filterAction, placeInPath);
463 out.swap(tmpworkers);
471 vstring* labelsOnTriggerPaths) {
474 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
false, tmpworkers, labelsOnTriggerPaths);
476 for (PathWorkers::iterator wi(tmpworkers.begin()),
477 we(tmpworkers.end()); wi != we; ++wi) {
478 holder.push_back(wi->getWorker());
482 if (!tmpworkers.empty()) {
500 fillWorkers(proc_pset, preg, prealloc, processConfiguration, name,
true, tmpworkers, 0);
503 for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
504 holder.push_back(wi->getWorker());
507 if (!tmpworkers.empty()) {
528 if (worker->description().moduleLabel() == iLabel) {
533 if (
nullptr == found) {
541 std::vector<ModuleDescription const*>
543 std::vector<ModuleDescription const*>
result;
558 std::back_inserter(oLabelsToFill),
564 std::vector<std::string>& oLabelsToFill)
const {
565 TrigPaths::const_iterator itFound =
568 boost::bind(std::equal_to<std::string>(),
572 oLabelsToFill.reserve(itFound->size());
573 for (
size_t i = 0;
i < itFound->size(); ++
i) {
574 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
611 std::vector<ModuleInPathSummary>
temp(sz);
612 for (
size_t i = 0;
i != sz; ++
i) {
618 for (
size_t i = 0;
i != sz; ++
i) {
672 std::vector<ModuleInPathTimingSummary>
temp(sz);
673 for (
size_t i = 0;
i != sz; ++
i) {
679 for (
size_t i = 0;
i != sz; ++
i) {
735 ++(earlyDeleteBranchToCount_[
index].second);
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > Workers
ModuleDescription const & description() const
std::vector< int > empty_trig_paths_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
int totalEventsFailed() const
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
volatile bool endpathsAreActive_
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, bool useStopwatch, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
std::string const & moduleName() const
void addToAllWorkers(Worker *w)
vstring end_path_name_list_
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
std::vector< WorkerSummary > workerSummaries
std::string const & moduleLabel() const
std::pair< double, double > timeCpuReal() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleInPathTimingSummary > moduleInPathSummaries
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
static void fillPathTimingSummary(Path const &path, PathTimingSummary &sum)
std::vector< WorkerInPath > PathWorkers
boost::shared_ptr< ActivityRegistry > actReg_
boost::shared_ptr< Worker > WorkerPtr
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
tuple path
else: Piece not in the list, fine.
std::vector< PathSummary > trigPathSummaries
EventSummary eventSummary
wantSummary_(tns.wantSummary())
EventTimingSummary eventSummary
static void fillWorkerTimingSummary(Worker const *pw, WorkerTimingSummary &sum)
static void fillModuleInPathTimingSummary(Path const &path, size_t which, ModuleInPathTimingSummary &sum)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
static void fillPathSummary(Path const &path, PathSummary &sum)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
std::vector< PathTimingSummary > trigPathSummaries
bool getMapped(key_type const &k, value_type &result) const
vstring empty_trig_path_names_
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
WorkerPtr results_inserter_
unsigned int value() const
boost::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
boost::shared_ptr< HLTGlobalStatus > TrigResPtr
bool search_all(ForwardSequence const &s, Datum const &d)
int timesVisited(size_type i) const
std::pair< double, double > timeCpuReal() const
StreamSchedule(TriggerResultInserter *inserter, boost::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
void setTrigResultForStream(unsigned int iStreamIndex, const TrigResPtr &trptr)
void getTriggerTimingReport(TriggerTimingReport &rep) const
static void fillWorkerTimingSummaryAux(Worker const &w, WorkerTimingSummary &sum)
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::string const & name() const
Worker const * getWorker(size_type i) const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
std::vector< ModuleInPathSummary > moduleInPathSummaries
void addToAllWorkers(Worker *w, bool useStopwatch)
std::vector< WorkerTimingSummary > workerSummaries
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
std::vector< std::string > vstring
tuple size
Write out results.
static Registry * instance()
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)