// transform_into (fragment): function template parameterised on an input
// iterator type, an output iterator type and a callable type.
// The visible loop header advances `begin` and `out` together until `begin`
// reaches `end`.
// NOTE(review): the loop body and closing brace are not visible in this
// excerpt; how `func` is applied to each element must be confirmed in the
// full file.
48 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
49 void transform_into(InputIterator begin, InputIterator
end, ForwardIterator
out, Func
func) {
50 for (; begin !=
end; ++begin, ++
out)
// fill_summary (fragment): takes a read-only source container `from`, a
// mutable destination container `to`, and a callable `func`.
// It compares the two sizes and, in the visible lines, calls transform_into
// over the whole of `from` twice: once writing into `temp`, once directly
// into `to`.
// NOTE(review): the declaration of `temp`, the else branch and whatever moves
// `temp` into `to` are not visible here - presumably the size-mismatch case
// builds a correctly sized temporary first; confirm against the full file.
59 template <
typename FROM,
typename TO,
typename FUNC>
60 void fill_summary(FROM
const& from, TO&
to, FUNC
func) {
61 if (
to.size() != from.size()) {
63 transform_into(from.begin(), from.end(),
temp.begin(),
func);
66 transform_into(from.begin(), from.end(),
to.begin(),
func);
// Fragment: the tail of a parameter list (shared activity registry and shared
// trigger-result inserter) followed by a call that hands `areg` to
// setActivityRegistry on `ptr`.
// NOTE(review): the function name, any earlier parameters and the declaration
// of `ptr` are not visible in this excerpt.
76 std::shared_ptr<ActivityRegistry> areg,
77 std::shared_ptr<TriggerResultInserter> inserter) {
80 ptr->setActivityRegistry(areg);
// Fragment that fills `branchToReadingWorker` from the "canDeleteEarly"
// configuration list (presumably initializeBranchToReadingWorker, which is
// called elsewhere in this file with matching arguments - confirm).
// NOTE(review): the function header before `preg` and several interior lines
// are not visible in this excerpt.
85 ProductRegistry
const& preg,
86 std::multimap<std::string, Worker*>& branchToReadingWorker) {
// Requested branch names, read as an untracked vector<string> parameter.
88 auto vBranchesToDeleteEarly =
opts.getUntrackedParameter<std::vector<std::string>>(
"canDeleteEarly");
89 if (not vBranchesToDeleteEarly.empty()) {
// Sort, then drop duplicates (std::unique only removes adjacent equals).
90 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
91 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
92 vBranchesToDeleteEarly.end());
// Work on a by-value copy of the registry's branch names so they can be edited.
95 auto allBranchNames = preg.allBranchNames();
// Drop the last character of every name.
// NOTE(review): presumably this strips a trailing '.', since a "." is appended
// again when a BranchID is built from these names elsewhere in this file;
// an empty name would make size() - 1 wrap around - confirm names are never empty.
97 for (
auto&
b : allBranchNames) {
98 b.resize(
b.size() - 1);
// Both ranges are sorted with the same ordering before being intersected.
100 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
101 std::vector<std::string>
temp;
102 temp.reserve(vBranchesToDeleteEarly.size());
// temp = requested names that also exist in the product registry.
104 std::set_intersection(vBranchesToDeleteEarly.begin(),
105 vBranchesToDeleteEarly.end(),
106 allBranchNames.begin(),
107 allBranchNames.end(),
108 std::back_inserter(
temp));
// After the swap vBranchesToDeleteEarly holds the intersection and temp holds
// the original (sorted, de-duplicated) request list.
109 vBranchesToDeleteEarly.swap(
temp);
// A size difference means some requested names matched no known branch.
110 if (
temp.size() != vBranchesToDeleteEarly.size()) {
111 std::vector<std::string> missingProducts;
// missingProducts = requested names minus the ones that were found.
// NOTE(review): the end iterator of the first range is not visible in this excerpt.
112 std::set_difference(
temp.begin(),
114 vBranchesToDeleteEarly.begin(),
115 vBranchesToDeleteEarly.end(),
116 std::back_inserter(missingProducts));
// Report the ignored names through a LogInfo message.
117 LogInfo l(
"MissingProductsForCanDeleteEarly");
118 l <<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
119 for (
auto const&
n : missingProducts) {
// Register every surviving branch with a null Worker* as a placeholder.
125 for (
auto const&
branch : vBranchesToDeleteEarly) {
126 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
// Shorthand alias for a vector of strings.
134 typedef std::vector<std::string>
vstring;
// Constructor fragment: part of the parameter list, part of the member
// initialiser list, and pieces of the body that fill trigger paths and end
// paths and then report configured modules that no path uses.
// NOTE(review): the class name, several parameters, several initialisers and
// many body lines are not visible in this excerpt.
139 std::shared_ptr<TriggerResultInserter> inserter,
142 std::shared_ptr<ModuleRegistry> modReg,
149 std::shared_ptr<ActivityRegistry> areg,
150 std::shared_ptr<ProcessConfiguration> processConfiguration,
151 bool allowEarlyDelete,
154 : workerManager_(modReg, areg,
actions),
162 number_of_unscheduled_modules_(0),
164 streamContext_(streamID_, processContext),
165 endpathsAreActive_(
true),
166 skippingEvent_(
false) {
168 bool hasPath =
false;
// Path names are held by const reference to what `tns` returns.
169 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
170 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
// One fillTrigPath call per trigger path name.
174 for (
auto const& trig_name : pathNames) {
175 fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name,
results(), endPathNames);
// One fillEndPath call per end path name.
191 for (
auto const& end_path_name : endPathNames) {
192 fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
// Collect the labels of workers that are in use.
// NOTE(review): the loop that supplies `worker` is not visible in this excerpt.
199 std::set<std::string> usedWorkerLabels;
201 usedWorkerLabels.insert(worker->description()->moduleLabel());
// All module labels listed in the configuration under "@all_modules".
203 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules"));
204 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
// unusedLabels = configured labels minus labels already used by a worker.
// Both inputs are std::set, so they are already sorted for set_difference.
205 std::vector<std::string> unusedLabels;
206 set_difference(modulesInConfigSet.begin(),
207 modulesInConfigSet.end(),
208 usedWorkerLabels.begin(),
209 usedWorkerLabels.end(),
210 back_inserter(unusedLabels));
211 std::set<std::string> unscheduledLabels;
212 std::vector<std::string> shouldBeUsedLabels;
213 if (!unusedLabels.empty()) {
// Each unused label is handed, with its parameter set, to a call that can add
// to unscheduledLabels and shouldBeUsedLabels.
// NOTE(review): the callee name and the lookup producing `modulePSet` are not
// visible in this excerpt.
218 for (
auto const&
label : unusedLabels) {
222 assert(modulePSet !=
nullptr);
224 *modulePSet, preg, &prealloc, processConfiguration,
label, unscheduledLabels, shouldBeUsedLabels);
// Build a comma-separated list of quoted labels and log it.
226 if (!shouldBeUsedLabels.empty()) {
227 std::ostringstream unusedStream;
228 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
// begin() + 1 is valid here because the vector was just checked to be non-empty.
229 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
230 itLabelEnd = shouldBeUsedLabels.end();
231 itLabel != itLabelEnd;
233 unusedStream <<
",'" << *itLabel <<
"'";
235 LogInfo(
"path") <<
"The following module labels are not assigned to any path:\n" << unusedStream.str() <<
"\n";
// Early-delete set-up fragment: builds a branch-name -> reading-worker map,
// prunes entries, reports unused branches, then assigns index ranges to
// per-worker helper objects and hands the worker -> helper map to paths.
// NOTE(review): the function name, the other parameters and many interior
// lines (loop headers, conditions, else branches) are not visible in this
// excerpt, so the comments below describe only the visible statements.
247 bool allowEarlyDelete) {
// Guard on the allowEarlyDelete flag; the guarded statement is not visible.
250 if (not allowEarlyDelete)
// Seed the map from the configuration via initializeBranchToReadingWorker.
255 std::multimap<std::string, Worker*> branchToReadingWorker;
256 initializeBranchToReadingWorker(
opts, preg, branchToReadingWorker);
259 if (branchToReadingWorker.empty()) {
// kEmpty is the default value passed when reading "mightGet" below.
262 const std::vector<std::string> kEmpty;
263 std::map<Worker*, unsigned int> reserveSizeForWorker;
264 unsigned int upperLimitOnReadingWorker = 0;
265 unsigned int upperLimitOnIndicies = 0;
266 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
272 if (!branchToReadingWorker.empty()) {
// All entries for desc.branchName() are erased and the unique-branch count is
// decremented.
// NOTE(review): the condition selecting which `desc` entries are dropped is
// not visible in this excerpt.
278 auto found = branchToReadingWorker.equal_range(
desc.branchName());
280 --nUniqueBranchesToDelete;
281 branchToReadingWorker.erase(
found.first,
found.second);
288 if (branchToReadingWorker.empty()) {
// When a parameter set is available, read its untracked "mightGet" list
// (empty by default).
295 if (
nullptr !=
pset) {
296 auto branches =
pset->getUntrackedParameter<std::vector<std::string>>(
"mightGet", kEmpty);
297 if (not branches.empty()) {
298 ++upperLimitOnReadingWorker;
// Look each listed branch up in the map and bump the counters for worker `w`.
// NOTE(review): the check that the lookup found anything is not visible here.
300 for (
auto const&
branch : branches) {
301 auto found = branchToReadingWorker.equal_range(
branch);
303 ++upperLimitOnIndicies;
304 ++reserveSizeForWorker[
w];
// A null worker in the first matching entry marks the placeholder slot.
// NOTE(review): presumably the placeholder is filled with `w` and only later
// readers get an additional entry via insert - the assignment and the else
// are not visible; confirm.
305 if (
nullptr ==
found.first->second) {
308 branchToReadingWorker.insert(make_pair(
found.first->first,
w));
// Walk the map; entries that still have a null worker are recorded by name.
315 auto it = branchToReadingWorker.begin();
316 std::vector<std::string> unusedBranches;
317 while (it != branchToReadingWorker.end()) {
318 if (it->second ==
nullptr) {
319 unusedBranches.push_back(it->first);
// NOTE(review): `temp` is presumably a saved copy of the iterator taken before
// advancing `it`; its declaration is not visible.
323 branchToReadingWorker.erase(
temp);
328 if (not unusedBranches.empty()) {
330 l <<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331 " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332 for (
auto const&
n : unusedBranches) {
337 if (!branchToReadingWorker.empty()) {
// Maps each worker to the helper created for it (helpers are referenced by address).
341 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
343 size_t nextOpenIndex = 0;
345 for (
auto& branchAndWorker : branchToReadingWorker) {
// Entries for the same branch are adjacent in the multimap, so a change of
// name marks the start of a new branch; a BranchID is built from name + ".".
346 if (lastBranchName != branchAndWorker.first) {
348 BranchID bid(branchAndWorker.first +
".");
350 lastBranchName = branchAndWorker.first;
// First time this worker is seen: give it the next nIndices index slots and
// remember the address of the most recently added helper.
352 auto found = alreadySeenWorkers.find(branchAndWorker.second);
353 if (alreadySeenWorkers.end() ==
found) {
358 size_t index = nextOpenIndex;
359 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
363 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(
earlyDeleteHelpers_.back())));
364 nextOpenIndex += nIndices;
// Where one element's begin() does not coincide with the previous element's
// end(), shift its index pointers by the difference.
// NOTE(review): the loop defining `it` / `itLast` is not visible in this excerpt.
374 if (itLast->end() != it->begin()) {
376 unsigned int delta = it->begin() - itLast->end();
377 it->shiftIndexPointers(
delta);
// The worker -> helper map is passed to `p` in two places.
// NOTE(review): presumably once per trigger path and once per end path; the
// enclosing loops are not visible.
391 p.setEarlyDeleteHelpers(alreadySeenWorkers);
394 p.setEarlyDeleteHelpers(alreadySeenWorkers);
// Worker-list building fragment (presumably fillWorkers, which the path
// filling code in this file calls with matching trailing arguments - confirm).
// For each entry of `modnames` it inspects a leading marker character, emits
// diagnostics, and appends an entry to `tmpworkers`, which is finally swapped
// into `out`.
// NOTE(review): the function name, earlier parameters and many body lines are
// not visible in this excerpt.
403 std::shared_ptr<ProcessConfiguration const> processConfiguration,
407 std::vector<std::string>
const& endPathNames) {
// Position counter; its increment is not visible in this excerpt.
411 unsigned int placeInPath = 0;
412 for (
auto const&
name : modnames) {
414 bool doNotRunConcurrently =
false;
// The first character is tested for the markers '!', '-' and '+'.
// NOTE(review): name[0] is read without a visible emptiness check; the
// meaning of each marker is set on lines not shown here.
416 if (
name[0] ==
'!') {
418 }
else if (
name[0] ==
'-' or name[0] ==
'+') {
423 doNotRunConcurrently =
true;
// A null parameter set means the label is unknown; the visible pieces below
// are the text of that error message.
433 if (modpset ==
nullptr) {
439 <<
"The unknown module label \"" <<
moduleLabel <<
"\" appears in " << pathType <<
" \"" <<
pathName
440 <<
"\"\n please check spelling or remove that label from the path.";
// Pieces of a warning about a filter placed on an EndPath.
453 <<
"' with module label '" <<
moduleLabel <<
"' appears on EndPath '"
455 <<
"The return value of the filter will be ignored.\n"
456 <<
"To suppress this warning, either remove the filter from the endpath,\n"
457 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
// Concurrency defaults to the inverse of the marker-derived flag and can be
// forced to false (the condition for that is not visible).
460 bool runConcurrently = not doNotRunConcurrently;
462 runConcurrently =
false;
464 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
// Hand the finished list to the caller by swapping it into `out`.
468 out.swap(tmpworkers);
// Fragment that calls fillWorkers with `false` as the boolean argument after
// `name`, then acts only if `tmpworkers` is non-empty.
// NOTE(review): presumably the trigger-path variant (fillTrigPath); the
// function name and the code inside the `if` are not visible in this excerpt.
474 std::shared_ptr<ProcessConfiguration const> processConfiguration,
478 std::vector<std::string>
const& endPathNames) {
480 fillWorkers(proc_pset, preg, prealloc, processConfiguration,
name,
false, tmpworkers, endPathNames);
483 if (!tmpworkers.empty()) {
// Fragment that calls fillWorkers with `true` as the boolean argument after
// `name`, then acts only if `tmpworkers` is non-empty.
// NOTE(review): presumably the end-path variant (fillEndPath); the function
// name and the code inside the `if` are not visible in this excerpt.
504 std::shared_ptr<ProcessConfiguration const> processConfiguration,
507 std::vector<std::string>
const& endPathNames) {
509 fillWorkers(proc_pset, preg, prealloc, processConfiguration,
name,
true, tmpworkers, endPathNames);
511 if (!tmpworkers.empty()) {
// Fragment: compares a worker's module label with `iLabel`, and separately
// tests whether `found` is null.
// NOTE(review): presumably a search for the worker carrying that label; the
// enclosing function and loop are not visible in this excerpt.
537 if (worker->description()->moduleLabel() == iLabel) {
542 if (
nullptr ==
found) {
// Local vector of non-owning pointers to const ModuleDescription objects.
// NOTE(review): how it is filled and returned is not visible in this excerpt.
553 std::vector<ModuleDescription const*>
result;
// For the path index `empty_trig_path`, store `hltPathStatus` in the results
// object (bounds-checked via at()) and pass the same status to that path's
// status inserter for this stream.
// NOTE(review): the loop supplying `empty_trig_path` and the definition of
// `hltPathStatus` are not visible in this excerpt.
583 results_->at(empty_trig_path) = hltPathStatus;
584 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
// Fragment setting up how an exception raised while running paths is passed
// between two task lambdas.
// pathErrorHolder owns an atomic pointer-to-exception_ptr, initially null.
609 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
// The raw pointer is taken before the holder is moved into the lambda below,
// so the second lambda can still reach the same atomic.
610 auto pathErrorPtr = pathErrorHolder.get();
// First lambda: takes ownership of the holder through an init-capture.
613 [iTask,
this, weakToken, pathError =
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable {
// If a heap-allocated exception_ptr was stored, copy it out and free it.
616 std::exception_ptr ptr;
617 if (pathError->load()) {
618 ptr = *pathError->load();
619 delete pathError->load();
// Otherwise consider the exception passed in as `iPtr` (the action taken is
// not visible in this excerpt).
621 if ((not ptr) and iPtr) {
// Second lambda: captures the raw pointer to the atomic and stores a newly
// allocated copy of the incoming exception there.
// NOTE(review): the condition guarding the store is not visible; the matching
// delete is the one in the first lambda above.
631 auto pathsDone =
make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo =
info,
this, weakToken](
632 std::exception_ptr
const* iPtr)
mutable {
638 pathErrorPtr->store(
new std::exception_ptr(*iPtr));
// Fragments of exception bookkeeping around `iExcept`.
// NOTE(review): the try/catch structure, the enclosing function(s) and the
// declaration of `iExcept` are not visible in this excerpt, so only the
// individual statements are described.
// Overwrite the exception_ptr that `iExcept` points to: once with an empty
// exception_ptr, twice with the currently handled exception.
677 *(iExcept.load()) = std::exception_ptr();
679 *(iExcept.load()) = std::current_exception();
682 *(iExcept.load()) = std::current_exception();
// Proceed only when no exception is recorded and the results report accept().
686 if ((not iExcept) and
results_->accept()) {
700 std::rethrow_exception(expt);
// Context text naming the event being processed.
705 std::ostringstream ost;
706 ost <<
"Processing Event " <<
info.principal().id();
// Store a heap-allocated copy of the current exception in `iExcept`.
709 iExcept.store(
new std::exception_ptr(std::current_exception()));
713 iExcept.store(
new std::exception_ptr(std::current_exception()));
// Copy the stored exception into a local exception_ptr.
717 std::exception_ptr ptr;
719 ptr = *iExcept.load();
// Fragment: a constant flag fixed to false, and two places where the
// currently handled exception is assigned to `iExcept`.
// NOTE(review): the surrounding try/catch blocks and the use of the flag are
// not visible in this excerpt.
732 bool const cleaningUpAfterException =
false;
738 iExcept = std::current_exception();
746 iExcept = std::current_exception();
// Trailing arguments of an algorithm call: results are appended to
// `oLabelsToFill`, and the bound callable invokes Path::name on each element.
// NOTE(review): the algorithm name and its input range are not visible in
// this excerpt.
760 std::back_inserter(oLabelsToFill),
761 std::bind(&
Path::name, std::placeholders::_1));
// Fragment: find the path whose name() equals `iPathLabel`, then append the
// module label of each of its workers to `oLabelsToFill`.
// NOTE(review): the searched range and the check that `itFound` is valid are
// not visible in this excerpt.
765 TrigPaths::const_iterator itFound = std::find_if(
// Predicate: std::equal_to applied to iPathLabel and the element's Path::name.
768 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
// Reserve once, then copy one label per worker index.
770 oLabelsToFill.reserve(itFound->size());
771 for (
size_t i = 0;
i < itFound->size(); ++
i) {
772 oLabelsToFill.push_back(itFound->getWorker(
i)->description()->moduleLabel());
// Const member-function fragment: fills `descriptions` with the module
// description of every worker on the path named `iPathLabel`.
// NOTE(review): the function name, its first parameter and the branch
// structure around the two lookups are not visible in this excerpt.
778 std::vector<ModuleDescription const*>& descriptions,
779 unsigned int hint)
const {
// The output vector is emptied first.
780 descriptions.clear();
782 TrigPaths::const_iterator itFound;
// Name check on the current candidate.
// NOTE(review): presumably the candidate is the element at index `hint`; the
// code that sets itFound from `hint` is not visible.
786 if (itFound->name() == iPathLabel)
// Search by name using find_if.
791 itFound = std::find_if(
794 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
// Reserve once, then copy one description pointer per worker index.
799 descriptions.reserve(itFound->size());
800 for (
size_t i = 0;
i < itFound->size(); ++
i) {
801 descriptions.push_back(itFound->getWorker(
i)->description());
// Const member-function fragment: fills `descriptions` with the module
// description of every worker on the path named `iEndPathLabel`.
// NOTE(review): the function name, its first parameter and the branch
// structure around the two lookups are not visible in this excerpt.
807 std::vector<ModuleDescription const*>& descriptions,
808 unsigned int hint)
const {
// The output vector is emptied first.
809 descriptions.clear();
811 TrigPaths::const_iterator itFound;
// Name check on the current candidate.
// NOTE(review): presumably the candidate is the element at index `hint`; the
// code that sets itFound from `hint` is not visible.
815 if (itFound->name() == iEndPathLabel)
// Search by name using find_if.
820 itFound = std::find_if(
823 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
// Reserve once, then copy one description pointer per worker index.
828 descriptions.reserve(itFound->size());
829 for (
size_t i = 0;
i < itFound->size(); ++
i) {
830 descriptions.push_back(itFound->getWorker(
i)->description());
// Summary fragment: a temporary vector of `sz` ModuleInPathSummary elements
// and two index loops over [0, sz).
// NOTE(review): the loop bodies and the enclosing function(s) are not visible
// in this excerpt.
857 std::vector<ModuleInPathSummary>
temp(sz);
858 for (
size_t i = 0;
i != sz; ++
i) {
864 for (
size_t i = 0;
i != sz; ++
i) {
// Brings the bind placeholder _1 into scope for the code that follows.
892 using std::placeholders::_1;
// Fragment wiring status-inserter modules to trigger paths and end paths.
// NOTE(review): the creation of `workerPtr`, the index updates and the
// enclosing function are not visible in this excerpt.
925 unsigned int indexEmpty = 0;
926 unsigned int indexOfPath = 0;
// Trigger paths: obtain each inserter as a shared_ptr, give its worker the
// activity registry, and register both raw pointers with the path at
// indexOfPath (at() is bounds-checked).
927 for (
auto& pathStatusInserter : pathStatusInserters) {
928 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
932 workerPtr->setActivityRegistry(
actReg_);
941 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
// End paths: same pattern, but the path receives nullptr for the inserter
// argument and only the worker pointer.
950 for (
auto& endPathStatusInserter : endPathStatusInserters) {
951 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
955 workerPtr->setActivityRegistry(
actReg_);
964 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr, workerPtr.get());