// transform_into: function template taking an iterator pair (`begin`, `end`),
// a destination iterator `out`, and a callable `func`.
// NOTE(review): the body is not visible in this view; how `func` is applied
// to the elements should be confirmed against the full definition.
47 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
48 void transform_into(InputIterator
begin, InputIterator
end, ForwardIterator
out, Func
func) {
// fill_summary: given a source container `from` (read-only), a destination
// container `to` (mutable) and a callable `func`, runs transform_into over
// the range from.begin()..from.end().
58 template <
typename FROM,
typename TO,
typename FUNC>
59 void fill_summary(FROM
const& from, TO&
to, FUNC
func) {
// When the destination's size differs from the source's, the transform
// writes into `temp` instead of `to`.
// NOTE(review): `temp` is declared on a line not visible here — confirm its
// type/size and how its contents reach `to`.
60 if (
to.size() != from.size()) {
62 transform_into(from.begin(), from.end(),
temp.begin(),
func);
// This call writes directly into `to`.
// NOTE(review): presumably the equal-size branch — confirm.
65 transform_into(from.begin(), from.end(),
to.begin(),
func);
// Trailing parameters: the activity registry and the TriggerResultInserter,
// both received as shared_ptr by value.
75 std::shared_ptr<ActivityRegistry> areg,
76 std::shared_ptr<TriggerResultInserter> inserter) {
// Hand the activity registry to the object `ptr` refers to.
// NOTE(review): `ptr` is created on a line not visible here.
79 ptr->setActivityRegistry(areg);
// Populates `branchToReadingWorker` with one entry (mapped to a null Worker*)
// for each branch name listed in the untracked 'canDeleteEarly' parameter
// that is also among the branch names reported by the product registry `preg`.
// Requested names that the registry does not report are written to a LogInfo message.
84 ProductRegistry
const& preg,
85 std::multimap<std::string, Worker*>& branchToReadingWorker) {
// Read the requested branch names from the options.
87 auto vBranchesToDeleteEarly =
opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly");
88 if (not vBranchesToDeleteEarly.empty()) {
// Sort and remove duplicates; the sorted order is also required by the
// set algorithms used below.
89 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
90 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
91 vBranchesToDeleteEarly.end());
94 auto allBranchNames = preg.allBranchNames();
// Drop the last character of every registry branch name before comparing.
// NOTE(review): assumes every name is non-empty — `b.size() - 1` would wrap
// around for an empty string; confirm against ProductRegistry.
96 for (
auto&
b : allBranchNames) {
97 b.resize(
b.size() - 1);
99 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
100 std::vector<std::string>
temp;
101 temp.reserve(vBranchesToDeleteEarly.size());
// Keep only the requested names that the registry also reports.
103 std::set_intersection(vBranchesToDeleteEarly.begin(),
104 vBranchesToDeleteEarly.end(),
105 allBranchNames.begin(),
106 allBranchNames.end(),
107 std::back_inserter(
temp));
// After the swap, `temp` holds the originally requested names and
// `vBranchesToDeleteEarly` holds the intersection, so a size mismatch
// means at least one requested name was not found.
108 vBranchesToDeleteEarly.swap(
temp);
109 if (
temp.size() != vBranchesToDeleteEarly.size()) {
// Collect the requested names that are missing from the intersection.
110 std::vector<std::string> missingProducts;
111 std::set_difference(
temp.begin(),
113 vBranchesToDeleteEarly.begin(),
114 vBranchesToDeleteEarly.end(),
115 std::back_inserter(missingProducts));
116 LogInfo
l(
"MissingProductsForCanDeleteEarly");
117 l <<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118 for (
auto const&
n : missingProducts) {
// Register every surviving branch with a null worker pointer.
124 for (
auto const&
branch : vBranchesToDeleteEarly) {
125 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
// Shorthand alias for a vector of strings.
133 typedef std::vector<std::string>
vstring;
// Constructor fragment: part of the parameter list, part of the member
// initialiser list, and the visible parts of the body that fill trigger
// paths and end paths and report configured modules no worker uses.
// NOTE(review): many lines of this definition are not visible here.
138 std::shared_ptr<TriggerResultInserter> inserter,
140 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
141 std::shared_ptr<ModuleRegistry> modReg,
148 std::shared_ptr<ActivityRegistry> areg,
149 std::shared_ptr<ProcessConfiguration> processConfiguration,
150 bool allowEarlyDelete,
// Member initialisers visible in this view.
153 : workerManager_(modReg, areg,
actions),
161 number_of_unscheduled_modules_(0),
163 streamContext_(streamID_, processContext),
164 endpathsAreActive_(
true),
165 skippingEvent_(
false) {
167 bool hasPath =
false;
169 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
// One fillTrigPath call per entry of `pathNames`.
173 for (
auto const& trig_name :
pathNames) {
174 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
// One fillEndPath call per end-path name.
190 for (
auto const& end_path_name : endPathNames) {
191 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
// Gather the module labels of workers, then compute which labels from the
// '@all_modules' parameter are not among them.
198 std::set<std::string> usedWorkerLabels;
200 usedWorkerLabels.insert(worker->description().moduleLabel());
202 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules"));
203 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204 std::vector<std::string> unusedLabels;
// unusedLabels = modulesInConfigSet minus usedWorkerLabels (both are
// std::set, hence already sorted as set_difference requires).
205 set_difference(modulesInConfigSet.begin(),
206 modulesInConfigSet.end(),
207 usedWorkerLabels.begin(),
208 usedWorkerLabels.end(),
209 back_inserter(unusedLabels));
210 std::set<std::string> unscheduledLabels;
211 std::vector<std::string> shouldBeUsedLabels;
212 if (!unusedLabels.empty()) {
217 for (
auto const&
label : unusedLabels) {
221 assert(modulePSet !=
nullptr);
223 *modulePSet, preg, &prealloc, processConfiguration,
label, unscheduledLabels, shouldBeUsedLabels);
// Build a comma-separated list of single-quoted labels and log it.
225 if (!shouldBeUsedLabels.empty()) {
226 std::ostringstream unusedStream;
227 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
// The first label was written above, so iteration starts at begin() + 1.
228 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229 itLabelEnd = shouldBeUsedLabels.end();
230 itLabel != itLabelEnd;
232 unusedStream <<
",'" << *itLabel <<
"'";
234 LogInfo(
"path") <<
"The following module labels are not assigned to any path:\n" << unusedStream.str() <<
"\n";
// Early-delete setup fragment: builds a branch-name -> reading-worker
// multimap, discards branches no worker reads, and hands the per-worker
// helper map to objects `p` through setEarlyDeleteHelpers.
// NOTE(review): many lines of this definition are not visible here.
246 bool allowEarlyDelete) {
249 if (not allowEarlyDelete)
254 std::multimap<std::string, Worker*> branchToReadingWorker;
255 initializeBranchToReadingWorker(
opts, preg, branchToReadingWorker);
258 if (branchToReadingWorker.empty()) {
// Bookkeeping used later to size the helper storage.
261 const std::vector<std::string> kEmpty;
262 std::map<Worker*, unsigned int> reserveSizeForWorker;
263 unsigned int upperLimitOnReadingWorker = 0;
264 unsigned int upperLimitOnIndicies = 0;
265 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
271 if (!branchToReadingWorker.empty()) {
// Remove the entries in the range `found` and count one fewer unique branch.
// NOTE(review): `found` is obtained on a line not visible here.
279 --nUniqueBranchesToDelete;
280 branchToReadingWorker.erase(
found.first,
found.second);
287 if (branchToReadingWorker.empty()) {
294 if (
nullptr !=
pset) {
// Branches this module declares in its untracked 'mightGet' list
// (defaults to the empty list).
295 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet", kEmpty);
296 if (not branches.empty()) {
297 ++upperLimitOnReadingWorker;
299 for (
auto const&
branch : branches) {
300 auto found = branchToReadingWorker.equal_range(
branch);
302 ++upperLimitOnIndicies;
303 ++reserveSizeForWorker[
w];
304 if (
nullptr ==
found.first->second) {
// Add another entry for the same branch name, mapped to worker `w`.
307 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
// Walk the multimap; entries whose worker is still null are recorded as
// unused and erased.
314 auto it = branchToReadingWorker.begin();
315 std::vector<std::string> unusedBranches;
316 while (it != branchToReadingWorker.end()) {
317 if (it->second ==
nullptr) {
318 unusedBranches.push_back(it->first);
// NOTE(review): `temp` is set on a line not visible here — presumably a
// copy of `it` taken before advancing; confirm.
322 branchToReadingWorker.erase(
temp);
327 if (not unusedBranches.empty()) {
329 l <<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331 for (
auto const&
n : unusedBranches) {
336 if (!branchToReadingWorker.empty()) {
340 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
342 size_t nextOpenIndex = 0;
344 for (
auto& branchAndWorker : branchToReadingWorker) {
// A new branch name starts here; its BranchID is built from the name with
// a trailing "." appended.
345 if (lastBranchName != branchAndWorker.first) {
347 BranchID bid(branchAndWorker.first +
".");
349 lastBranchName = branchAndWorker.first;
351 auto found = alreadySeenWorkers.find(branchAndWorker.second);
// First time this worker is seen: it is given `nIndices` index slots
// starting at `nextOpenIndex`, and the most recently added helper is
// recorded for it.
352 if (alreadySeenWorkers.end() ==
found) {
357 size_t index = nextOpenIndex;
358 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
362 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(
earlyDeleteHelpers_.back())));
363 nextOpenIndex += nIndices;
// If the current helper's range does not begin where the previous one
// ends, shift its index pointers by the difference.
373 if (itLast->end() != it->begin()) {
375 unsigned int delta = it->begin() - itLast->end();
376 it->shiftIndexPointers(
delta);
// Pass the worker -> helper map to `p` (two separate call sites).
390 p.setEarlyDeleteHelpers(alreadySeenWorkers);
393 p.setEarlyDeleteHelpers(alreadySeenWorkers);
// Worker-list builder fragment: iterates over the module names of a path,
// inspects the first character of each name, reports unknown labels, and
// appends an entry (worker, filter action, position, concurrency flag) to a
// temporary list that is swapped into `out` at the end.
// NOTE(review): many lines of this definition are not visible here.
402 std::shared_ptr<ProcessConfiguration const> processConfiguration,
406 std::vector<std::string>
const& endPathNames) {
410 unsigned int placeInPath = 0;
411 for (
auto const&
name : modnames) {
413 bool doNotRunConcurrently =
false;
// The first character of the name is tested for the prefixes '!', '-' and '+'.
415 if (
name[0] ==
'!') {
417 }
else if (
name[0] ==
'-' or name[0] ==
'+') {
// NOTE(review): the condition guarding this assignment is not visible here.
422 doNotRunConcurrently =
true;
// No parameter set was found for this label: an error message is composed.
432 if (modpset ==
nullptr) {
438 <<
"The unknown module label \"" <<
moduleLabel <<
"\" appears in " << pathType <<
" \"" <<
pathName
439 <<
"\"\n please check spelling or remove that label from the path.";
// Message text about a filter placed on an EndPath.
452 <<
"' with module label '" <<
moduleLabel <<
"' appears on EndPath '"
454 <<
"The return value of the filter will be ignored.\n"
455 <<
"To suppress this warning, either remove the filter from the endpath,\n"
456 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
459 bool runConcurrently = not doNotRunConcurrently;
// NOTE(review): the condition guarding this override is not visible here.
461 runConcurrently =
false;
463 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
// Publish the completed list to the caller.
467 out.swap(tmpworkers);
// Fragment: builds the worker list for `name` via fillWorkers, passing
// `false` as the boolean argument, then acts only if the list is non-empty.
473 std::shared_ptr<ProcessConfiguration const> processConfiguration,
477 std::vector<std::string>
const& endPathNames) {
479 fillWorkers(proc_pset, preg, prealloc, processConfiguration,
name,
false, tmpworkers, endPathNames);
482 if (!tmpworkers.empty()) {
// Fragment: builds the worker list for `name` via fillWorkers, passing
// `true` as the boolean argument, then acts only if the list is non-empty.
503 std::shared_ptr<ProcessConfiguration const> processConfiguration,
506 std::vector<std::string>
const& endPathNames) {
508 fillWorkers(proc_pset, preg, prealloc, processConfiguration,
name,
true, tmpworkers, endPathNames);
510 if (!tmpworkers.empty()) {
// Select the worker whose module label equals `iLabel`.
536 if (worker->description().moduleLabel() == iLabel) {
// Branch taken when `found` is null.
// NOTE(review): `found` is assigned on lines not visible here.
541 if (
nullptr ==
found) {
// Vector of pointers to const ModuleDescription, default-constructed (empty).
550 std::vector<ModuleDescription const*>
result;
// Fragment: records the status of an empty trigger path, then creates two
// root tasks whose lambdas pass an error between them through a
// heap-allocated std::exception_ptr held in an atomic pointer.
// NOTE(review): many lines of this definition are not visible here.
// Store `hltPathStatus` for the path index `empty_trig_path`, both in the
// results and in the matching path-status inserter.
579 results_->at(empty_trig_path) = hltPathStatus;
580 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
// Atomic pointer, initially null, owned by a unique_ptr; `pathErrorPtr` is
// a non-owning raw pointer to the same atomic.
605 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
606 auto pathErrorPtr = pathErrorHolder.get();
// This lambda takes ownership of the atomic (moved into `pathError`).
608 tbb::task::allocate_root(),
609 [iTask,
this, serviceToken, pathError =
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable {
// If an error was stored, copy it out and free the heap object.
612 std::exception_ptr ptr;
613 if (pathError->load()) {
614 ptr = *pathError->load();
615 delete pathError->load();
617 if ((not ptr) and iPtr) {
// This lambda captures the raw `pathErrorPtr`.
// NOTE(review): it relies on the atomic owned by the other lambda still
// being alive when this one runs — confirm the task ordering.
628 tbb::task::allocate_root(),
629 [allPathsHolder, pathErrorPtr, &
ep, &es,
this, serviceToken](std::exception_ptr
const* iPtr)
mutable {
// Store a heap-allocated copy of the incoming exception; the matching
// `delete` is in the lambda above.
635 pathErrorPtr->store(
new std::exception_ptr(*iPtr));
// Fragment: `iExcept` is used here as an atomic pointer to a
// std::exception_ptr; it is written through, replaced with newly allocated
// values, and finally read into a local `ptr`.
// NOTE(review): the surrounding try/catch structure is not visible here.
// Overwrite the pointed-to exception: first reset to empty, then set to the
// exception currently being handled.
674 *(iExcept.load()) = std::exception_ptr();
676 *(iExcept.load()) = std::current_exception();
679 *(iExcept.load()) = std::current_exception();
// Proceed only when no exception pointer is stored and the results accept.
683 if ((not iExcept) and
results_->accept()) {
699 std::ostringstream ost;
700 ost <<
"Processing Event " <<
ep.id();
// Store a newly allocated copy of the exception currently being handled.
703 iExcept.store(
new std::exception_ptr(std::current_exception()));
707 iExcept.store(
new std::exception_ptr(std::current_exception()));
// Copy the stored exception into a local.
711 std::exception_ptr ptr;
713 ptr = *iExcept.load();
// Fragment: here `iExcept` is assigned directly from
// std::current_exception() at two separate points.
726 bool const cleaningUpAfterException =
false;
732 iExcept = std::current_exception();
740 iExcept = std::current_exception();
// Trailing arguments of an algorithm call: results are appended to
// `oLabelsToFill`, and each element is mapped through Path::name.
754 std::back_inserter(oLabelsToFill),
755 std::bind(&
Path::name, std::placeholders::_1));
// Fragment: locates the path whose name equals `iPathLabel` and appends the
// module label of each of its workers to `oLabelsToFill`.
759 TrigPaths::const_iterator itFound = std::find_if(
// Predicate: iPathLabel == element.name().
762 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
// NOTE(review): any check of `itFound` against the end iterator is not
// visible here — confirm it exists before the dereference.
764 oLabelsToFill.reserve(itFound->size());
765 for (
size_t i = 0;
i < itFound->size(); ++
i) {
766 oLabelsToFill.push_back(itFound->getWorker(
i)->description().moduleLabel());
// Fragment of a const member function: clears `descriptions`, locates the
// path named `iPathLabel`, and appends the description pointer of each of
// its workers. It also receives an unsigned `hint`.
772 std::vector<ModuleDescription const*>& descriptions,
773 unsigned int hint)
const {
774 descriptions.clear();
776 TrigPaths::const_iterator itFound;
// Name comparison on the element `itFound` currently refers to.
// NOTE(review): presumably the `hint` fast path — confirm.
780 if (itFound->name() == iPathLabel)
// Search by name: iPathLabel == element.name().
785 itFound = std::find_if(
788 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
793 descriptions.reserve(itFound->size());
794 for (
size_t i = 0;
i < itFound->size(); ++
i) {
795 descriptions.push_back(itFound->getWorker(
i)->descPtr());
// Fragment of a const member function: clears `descriptions`, locates the
// path named `iEndPathLabel`, and appends the description pointer of each
// of its workers. It also receives an unsigned `hint`.
801 std::vector<ModuleDescription const*>& descriptions,
802 unsigned int hint)
const {
803 descriptions.clear();
805 TrigPaths::const_iterator itFound;
// Name comparison on the element `itFound` currently refers to.
// NOTE(review): presumably the `hint` fast path — confirm.
809 if (itFound->name() == iEndPathLabel)
// Search by name: iEndPathLabel == element.name().
814 itFound = std::find_if(
817 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
822 descriptions.reserve(itFound->size());
823 for (
size_t i = 0;
i < itFound->size(); ++
i) {
824 descriptions.push_back(itFound->getWorker(
i)->descPtr());
// Fragment: a vector of `sz` ModuleInPathSummary entries, followed by two
// index loops over 0..sz-1.
// NOTE(review): the loop bodies are not visible here.
851 std::vector<ModuleInPathSummary>
temp(sz);
852 for (
size_t i = 0;
i != sz; ++
i) {
858 for (
size_t i = 0;
i != sz; ++
i) {
// Bring the std::placeholders::_1 placeholder into scope as `_1`.
886 using std::placeholders::_1;
// Fragment: for each path-status inserter and each end-path-status
// inserter, gives the matching worker the activity registry and attaches
// the inserter/worker to the path at index `indexOfPath`.
// NOTE(review): creation of `workerPtr` and the index updates are not
// visible here.
916 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
919 unsigned int indexEmpty = 0;
920 unsigned int indexOfPath = 0;
921 for (
auto& pathStatusInserter : pathStatusInserters) {
922 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
926 workerPtr->setActivityRegistry(
actReg_);
// Trigger paths receive both the inserter and its worker; at() is the
// bounds-checked accessor.
935 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
944 for (
auto& endPathStatusInserter : endPathStatusInserters) {
945 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
949 workerPtr->setActivityRegistry(
actReg_);
// End paths receive a null inserter pointer and only the worker.
958 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr, workerPtr.get());