44 #include <fmt/format.h> 54 template <
typename InputIterator,
typename ForwardIterator,
typename Func>
55 void transform_into(InputIterator begin, InputIterator end, ForwardIterator
out, Func
func) {
65 template <
typename FROM,
typename TO,
typename FUNC>
66 void fill_summary(FROM
const& from, TO&
to, FUNC
func) {
67 if (
to.size() != from.size()) {
69 transform_into(from.begin(), from.end(),
temp.begin(),
func);
72 transform_into(from.begin(), from.end(),
to.begin(),
func);
76 class BeginStreamTraits {
78 static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext
const* streamContext) {
79 activityRegistry->preBeginStreamSignal_(*streamContext);
81 static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext
const* streamContext) {
82 activityRegistry->postBeginStreamSignal_(*streamContext);
86 class EndStreamTraits {
88 static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext
const* streamContext) {
89 activityRegistry->preEndStreamSignal_(*streamContext);
91 static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext
const* streamContext) {
92 activityRegistry->postEndStreamSignal_(*streamContext);
102 std::shared_ptr<ActivityRegistry> areg,
103 std::shared_ptr<TriggerResultInserter> inserter) {
106 ptr->setActivityRegistry(areg);
110 void initializeBranchToReadingWorker(std::vector<std::string>
const& branchesToDeleteEarly,
111 ProductRegistry
const& preg,
112 std::multimap<std::string, Worker*>& branchToReadingWorker) {
113 auto vBranchesToDeleteEarly = branchesToDeleteEarly;
115 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
116 vBranchesToDeleteEarly.erase(
std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
117 vBranchesToDeleteEarly.end());
120 auto allBranchNames = preg.allBranchNames();
122 for (
auto&
b : allBranchNames) {
123 b.resize(
b.size() - 1);
125 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
126 std::vector<std::string>
temp;
127 temp.reserve(vBranchesToDeleteEarly.size());
130 vBranchesToDeleteEarly.end(),
131 allBranchNames.begin(),
132 allBranchNames.end(),
133 std::back_inserter(
temp));
134 vBranchesToDeleteEarly.swap(
temp);
135 if (
temp.size() != vBranchesToDeleteEarly.size()) {
136 std::vector<std::string> missingProducts;
139 vBranchesToDeleteEarly.begin(),
140 vBranchesToDeleteEarly.end(),
141 std::back_inserter(missingProducts));
142 LogInfo l(
"MissingProductsForCanDeleteEarly");
143 l <<
"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
144 for (
auto const&
n : missingProducts) {
150 for (
auto const&
branch : vBranchesToDeleteEarly) {
151 branchToReadingWorker.insert(std::make_pair(
branch, static_cast<Worker*>(
nullptr)));
157 WorkerManager& workerManager,
158 ProductRegistry& preg,
159 PreallocationConfiguration
const* prealloc,
160 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
163 if (modpset ==
nullptr) {
168 return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration,
moduleLabel);
177 template <
typename T>
178 auto findConditionalTaskModulesRange(
T& modnames) {
179 auto beg =
std::find(modnames.begin(), modnames.end(),
"#");
180 if (beg == modnames.end()) {
181 return std::pair(modnames.end(), modnames.end());
183 return std::pair(beg + 1, std::prev(modnames.end()));
186 std::optional<std::string> findBestMatchingAlias(
187 std::unordered_multimap<std::string, edm::BranchDescription const*>
const& conditionalModuleBranches,
188 std::unordered_multimap<std::string, StreamSchedule::AliasInfo>
const& aliasMap,
190 ConsumesInfo
const& consumesInfo) {
191 std::optional<std::string>
best;
193 bool bestIsAmbiguous =
false;
195 auto updateBest = [&
best, &wildcardsInBest, &bestIsAmbiguous](
197 int const wildcards =
static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
198 if (wildcards == 0) {
199 bestIsAmbiguous =
false;
202 if (not best
or wildcards < wildcardsInBest) {
204 wildcardsInBest = wildcards;
205 bestIsAmbiguous =
false;
206 }
else if (best and *best !=
label and wildcardsInBest == wildcards) {
207 bestIsAmbiguous =
true;
212 auto findAlias = aliasMap.equal_range(productModuleLabel);
213 for (
auto it = findAlias.first;
it != findAlias.second; ++
it) {
215 it->second.instanceLabel !=
"*" ?
it->second.instanceLabel :
it->second.originalInstanceLabel;
216 bool const instanceIsWildcard = (aliasInstanceLabel ==
"*");
217 if (instanceIsWildcard
or consumesInfo.instance() == aliasInstanceLabel) {
218 bool const typeIsWildcard =
it->second.friendlyClassName ==
"*";
219 if (typeIsWildcard
or (consumesInfo.type().friendlyClassName() ==
it->second.friendlyClassName)) {
220 if (updateBest(
it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
221 return it->second.originalModuleLabel;
226 auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
228 if (typeIsWildcard
or itBranch->second->productInstanceName() ==
it->second.originalInstanceLabel) {
230 TypeID(itBranch->second->wrappedType().typeInfo()),
231 itBranch->second->className())) {
232 if (updateBest(
it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
233 return it->second.originalModuleLabel;
241 if (bestIsAmbiguous) {
243 <<
"Encountered ambiguity when trying to find a best-matching alias for\n" 244 <<
" friendly class name " << consumesInfo.type().friendlyClassName() <<
"\n" 245 <<
" module label " << productModuleLabel <<
"\n" 246 <<
" product instance name " << consumesInfo.instance() <<
"\n" 247 <<
"when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of " 249 << wildcardsInBest <<
")";
257 typedef std::vector<std::string>
vstring;
268 std::shared_ptr<ProcessConfiguration const> processConfiguration,
270 std::vector<std::string>
const& trigPathNames) {
271 std::unordered_set<std::string> allConditionalMods;
272 for (
auto const&
pathName : trigPathNames) {
276 auto condRange = findConditionalTaskModulesRange(modnames);
277 if (condRange.first == condRange.second)
281 allConditionalMods.insert(condRange.first, condRange.second);
284 for (
auto const&
cond : allConditionalMods) {
286 (
void)getWorker(
cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
294 if (allConditionalMods.find(
prod.first.moduleLabel()) != allConditionalMods.end()) {
303 std::unordered_set<std::string>
const& conditionalmods)
const {
304 std::unordered_multimap<std::string, edm::BranchDescription const*>
ret;
305 for (
auto const&
mod : conditionalmods) {
318 auto aliasedToModuleLabels =
info.getParameterNames();
319 for (
auto const&
mod : aliasedToModuleLabels) {
320 if (not
mod.empty() and
mod[0] !=
'@' and allConditionalMods.find(
mod) != allConditionalMods.end()) {
321 auto aliasVPSet =
info.getParameter<std::vector<edm::ParameterSet>>(
mod);
322 for (
auto const& aliasPSet : aliasVPSet) {
326 if (aliasPSet.exists(
"type")) {
329 if (aliasPSet.exists(
"toProductInstance")) {
332 if (aliasPSet.exists(
"fromProductInstance")) {
333 originalInstance = aliasPSet.getParameter<
std::string>(
"fromProductInstance");
346 std::unordered_set<std::string>
const& allConditionalMods) {
347 auto const& all_modules = proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules");
348 std::vector<std::string> switchEDAliases;
349 for (
auto const&
module : all_modules) {
351 if (mod_pset.getParameter<
std::string>(
"@module_type") ==
"SwitchProducer") {
352 auto const& all_cases = mod_pset.
getParameter<std::vector<std::string>>(
"@all_cases");
353 for (
auto const& case_label : all_cases) {
356 switchEDAliases.push_back(case_label);
362 switchEDAliases, allConditionalMods, proc_pset, processConfiguration.
processName(), preg);
365 std::unordered_multimap<std::string, AliasInfo>
aliasMap_;
372 std::shared_ptr<TriggerResultInserter> inserter,
375 std::shared_ptr<ModuleRegistry> modReg,
381 std::shared_ptr<ActivityRegistry> areg,
382 std::shared_ptr<ProcessConfiguration const> processConfiguration,
385 : workerManagerBeginEnd_(modReg, areg,
actions),
386 workerManagerRuns_(modReg, areg,
actions),
387 workerManagerLumisAndEvents_(modReg, areg,
actions),
395 number_of_unscheduled_modules_(0),
397 streamContext_(streamID_, processContext) {
398 bool hasPath =
false;
399 std::vector<std::string>
const& pathNames = tns.
getTrigPaths();
400 std::vector<std::string>
const& endPathNames = tns.
getEndPaths();
404 std::unordered_set<std::string> conditionalModules;
408 for (
auto const& trig_name : pathNames) {
412 processConfiguration,
417 conditionalTaskHelper,
434 for (
auto const& end_path_name : endPathNames) {
438 processConfiguration,
442 conditionalTaskHelper,
450 std::set<std::string> usedWorkerLabels;
452 usedWorkerLabels.insert(worker->description()->moduleLabel());
454 std::vector<std::string> modulesInConfig(proc_pset.
getParameter<std::vector<std::string>>(
"@all_modules"));
455 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
456 std::vector<std::string> unusedLabels;
458 modulesInConfigSet.end(),
459 usedWorkerLabels.begin(),
460 usedWorkerLabels.end(),
461 back_inserter(unusedLabels));
462 std::set<std::string> unscheduledLabels;
463 std::vector<std::string> shouldBeUsedLabels;
464 if (!unusedLabels.empty()) {
469 for (
auto const&
label : unusedLabels) {
473 assert(modulePSet !=
nullptr);
475 *modulePSet, preg, &prealloc, processConfiguration,
label, unscheduledLabels, shouldBeUsedLabels);
477 if (!shouldBeUsedLabels.empty()) {
478 std::ostringstream unusedStream;
479 unusedStream <<
"'" << shouldBeUsedLabels.front() <<
"'";
480 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
481 itLabelEnd = shouldBeUsedLabels.end();
482 itLabel != itLabelEnd;
484 unusedStream <<
",'" << *itLabel <<
"'";
486 LogInfo(
"path") <<
"The following module labels are not assigned to any path:\n" << unusedStream.str() <<
"\n";
492 if (
streamID.
value() == 0 and not conditionalModules.empty()) {
497 std::vector<std::string_view> labelsToPrint;
499 unscheduledLabels.begin(),
500 unscheduledLabels.end(),
501 std::back_inserter(labelsToPrint),
502 [&conditionalModules](
auto const& lab) {
return conditionalModules.find(lab) != conditionalModules.end(); });
504 if (not labelsToPrint.empty()) {
506 log <<
"The following modules were part of some ConditionalTask, but were not\n" 507 <<
"consumed by any other module in any of the Paths to which the ConditionalTask\n" 508 <<
"was associated. Perhaps they should be either removed from the\n" 509 <<
"job, or moved to a Task to make it explicit they are unscheduled.\n";
510 for (
auto const& modLabel : labelsToPrint) {
511 log.format(
"\n {}", modLabel);
526 if (workerBeginEnd) {
539 std::vector<std::string>
const& branchesToDeleteEarly,
540 std::multimap<std::string, std::string>
const& referencesToBranches,
541 std::vector<std::string>
const& modulesToSkip,
544 std::multimap<std::string, Worker*> branchToReadingWorker;
545 initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
547 const std::vector<std::string>
kEmpty;
548 std::map<Worker*, unsigned int> reserveSizeForWorker;
549 unsigned int upperLimitOnReadingWorker = 0;
550 unsigned int upperLimitOnIndicies = 0;
551 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
557 if (!branchToReadingWorker.empty()) {
563 auto found = branchToReadingWorker.equal_range(
desc.branchName());
565 --nUniqueBranchesToDelete;
566 branchToReadingWorker.erase(
found.first,
found.second);
573 if (branchToReadingWorker.empty()) {
577 std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
579 if (modulesToExclude.end() != modulesToExclude.find(
w->description()->moduleLabel())) {
583 auto consumes =
w->consumesInfo();
584 if (not consumes.empty()) {
585 bool foundAtLeastOneMatchingBranch =
false;
586 for (
auto const& product : consumes) {
588 product.type().friendlyClassName(),
589 product.label().data(),
590 product.instance().data(),
591 product.process().data());
594 auto found = branchToReadingWorker.end();
595 if (product.process().empty()) {
596 auto startFound = branchToReadingWorker.lower_bound(
branch);
597 if (startFound != branchToReadingWorker.end()) {
598 if (startFound->first.substr(0,
branch.size()) ==
branch) {
604 auto exactFound = branchToReadingWorker.equal_range(
branch);
605 if (exactFound.first != exactFound.second) {
606 found = exactFound.first;
609 if (
found != branchToReadingWorker.end()) {
610 if (not foundAtLeastOneMatchingBranch) {
611 ++upperLimitOnReadingWorker;
612 foundAtLeastOneMatchingBranch =
true;
614 ++upperLimitOnIndicies;
615 ++reserveSizeForWorker[
w];
616 if (
nullptr ==
found->second) {
619 branchToReadingWorker.insert(make_pair(
found->first,
w));
625 auto found = referencesToBranches.end();
626 if (product.process().empty()) {
627 auto startFound = referencesToBranches.lower_bound(
branch);
628 if (startFound != referencesToBranches.end()) {
629 if (startFound->first.substr(0,
branch.size()) ==
branch) {
636 auto exactFound = referencesToBranches.equal_range(
branch);
637 if (exactFound.first != exactFound.second) {
638 found = exactFound.first;
641 if (
found != referencesToBranches.end()) {
642 for (
auto itr =
found; (itr != referencesToBranches.end()) and (itr->first ==
found->first); ++itr) {
643 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
644 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
647 if (not foundAtLeastOneMatchingBranch) {
648 ++upperLimitOnReadingWorker;
649 foundAtLeastOneMatchingBranch =
true;
651 ++upperLimitOnIndicies;
652 ++reserveSizeForWorker[
w];
653 if (
nullptr == foundInBranchToReadingWorker->second) {
654 foundInBranchToReadingWorker->second =
w;
656 branchToReadingWorker.insert(make_pair(itr->second,
w));
665 auto it = branchToReadingWorker.begin();
666 std::vector<std::string> unusedBranches;
667 while (
it != branchToReadingWorker.end()) {
668 if (
it->second ==
nullptr) {
669 unusedBranches.push_back(
it->first);
673 branchToReadingWorker.erase(
temp);
678 if (not unusedBranches.empty()) {
680 l <<
"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n" 681 " If possible, remove the producer from the job.";
682 for (
auto const&
n : unusedBranches) {
687 if (!branchToReadingWorker.empty()) {
691 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
693 size_t nextOpenIndex = 0;
695 for (
auto& branchAndWorker : branchToReadingWorker) {
696 if (lastBranchName != branchAndWorker.first) {
698 BranchID bid(branchAndWorker.first +
".");
700 lastBranchName = branchAndWorker.first;
702 auto found = alreadySeenWorkers.find(branchAndWorker.second);
703 if (alreadySeenWorkers.end() ==
found) {
708 size_t index = nextOpenIndex;
709 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
714 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(
earlyDeleteHelpers_.back())));
715 nextOpenIndex += nIndices;
725 if (itLast->end() !=
it->begin()) {
727 unsigned int delta =
it->begin() - itLast->end();
742 p.setEarlyDeleteHelpers(alreadySeenWorkers);
745 p.setEarlyDeleteHelpers(alreadySeenWorkers);
753 std::unordered_set<std::string>& conditionalModules,
754 std::unordered_multimap<std::string, edm::BranchDescription const*>
const& conditionalModuleBranches,
755 std::unordered_multimap<std::string, AliasInfo>
const& aliasMap,
759 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
760 std::vector<Worker*> returnValue;
763 using namespace productholderindexhelper;
764 for (
auto const& ci : consumesInfo) {
765 if (not ci.skipCurrentProcess() and
766 (ci.process().empty()
or ci.process() == processConfiguration->processName())) {
768 bool productFromConditionalModule =
false;
769 auto itFound = conditionalModules.find(productModuleLabel);
770 if (itFound == conditionalModules.end()) {
773 auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
775 productModuleLabel = *foundAlias;
776 productFromConditionalModule =
true;
777 itFound = conditionalModules.find(productModuleLabel);
779 if (itFound == conditionalModules.end()) {
785 auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
786 for (
auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
787 if (itBranch->second->productInstanceName() == ci.instance()) {
789 if (ci.type() == itBranch->second->unwrappedTypeID()) {
790 productFromConditionalModule =
true;
796 ci.type(),
TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->
className())) {
797 productFromConditionalModule =
true;
804 if (productFromConditionalModule) {
805 auto condWorker = getWorker(
809 conditionalModules.erase(itFound);
813 conditionalModuleBranches,
818 processConfiguration);
819 returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
820 returnValue.push_back(condWorker);
830 std::shared_ptr<ProcessConfiguration const> processConfiguration,
834 std::vector<std::string>
const& endPathNames,
836 std::unordered_set<std::string>& allConditionalModules) {
841 auto condRange = findConditionalTaskModulesRange(modnames);
843 std::unordered_set<std::string> conditionalmods;
845 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
846 std::unordered_map<std::string, unsigned int> conditionalModOrder;
847 if (condRange.first != condRange.second) {
848 for (
auto it = condRange.first;
it != condRange.second; ++
it) {
850 conditionalModOrder.emplace(*
it,
it - modnames.begin() - 1);
853 conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
854 std::make_move_iterator(condRange.second));
857 modnames.erase(std::prev(condRange.first), modnames.end());
860 allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
863 unsigned int placeInPath = 0;
864 for (
auto const&
name : modnames) {
866 bool doNotRunConcurrently =
false;
868 if (
name[0] ==
'!') {
870 }
else if (
name[0] ==
'-' or name[0] ==
'+') {
875 doNotRunConcurrently =
true;
885 if (worker ==
nullptr) {
891 <<
"The unknown module label \"" <<
moduleLabel <<
"\" appears in " << pathType <<
" \"" <<
pathName 892 <<
"\"\n please check spelling or remove that label from the path.";
903 <<
"' with module label '" <<
moduleLabel <<
"' appears on EndPath '" 905 <<
"The return value of the filter will be ignored.\n" 906 <<
"To suppress this warning, either remove the filter from the endpath,\n" 907 <<
"or explicitly ignore it in the configuration by using cms.ignore().\n";
910 bool runConcurrently = not doNotRunConcurrently;
912 runConcurrently =
false;
917 conditionalModsBranches,
922 processConfiguration);
923 for (
auto condMod : condModules) {
924 tmpworkers.emplace_back(
925 condMod,
WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()],
true);
928 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
932 out.swap(tmpworkers);
938 std::shared_ptr<ProcessConfiguration const> processConfiguration,
942 std::vector<std::string>
const& endPathNames,
944 std::unordered_set<std::string>& allConditionalModules) {
949 processConfiguration,
954 conditionalTaskHelper,
955 allConditionalModules);
958 if (!tmpworkers.empty()) {
972 std::shared_ptr<ProcessConfiguration const> processConfiguration,
975 std::vector<std::string>
const& endPathNames,
977 std::unordered_set<std::string>& allConditionalModules) {
982 processConfiguration,
987 conditionalTaskHelper,
988 allConditionalModules);
990 if (!tmpworkers.empty()) {
1014 std::exception_ptr exceptionInStream;
1019 exceptionInStream = std::current_exception();
1022 postScheduleSignal<BeginStreamTraits>(&
streamContext_, exceptionInStream);
1024 if (exceptionInStream) {
1025 bool cleaningUpAfterException =
false;
1030 if (exceptionInStream) {
1031 std::rethrow_exception(exceptionInStream);
1042 std::exception_ptr exceptionInStream;
1047 exceptionInStream = std::current_exception();
1050 postScheduleSignal<EndStreamTraits>(&
streamContext_, exceptionInStream);
1052 if (exceptionInStream) {
1053 std::lock_guard<std::mutex> collectorLock(collectorMutex);
1054 collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1061 if (worker->description()->moduleLabel() == iLabel) {
1073 ex.
addContext(
"Executing StreamSchedule::replaceModule");
1082 if (worker->description()->moduleLabel() == iLabel) {
1089 if (worker->description()->moduleLabel() == iLabel) {
1103 std::vector<ModuleDescription const*>
result;
1139 results_->at(empty_trig_path) = hltPathStatus;
1140 pathStatusInserters[empty_trig_path]->setPathStatus(
streamID_, hltPathStatus);
1151 std::exception_ptr except =
1165 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(
nullptr);
1166 auto pathErrorPtr = pathErrorHolder.get();
1169 [iTask,
this, weakToken, pathError =
std::move(pathErrorHolder)](std::exception_ptr
const* iPtr)
mutable {
1172 std::exception_ptr ptr;
1173 if (pathError->load()) {
1174 ptr = *pathError->load();
1175 delete pathError->load();
1177 if ((not ptr) and iPtr) {
1187 auto pathsDone =
make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo =
info,
this, weakToken](
1188 std::exception_ptr
const* iPtr)
mutable {
1194 auto currentPtr = pathErrorPtr->exchange(
new std::exception_ptr(*iPtr));
1195 assert(currentPtr ==
nullptr);
1233 *(iExcept.load()) = std::exception_ptr();
1235 *(iExcept.load()) = std::current_exception();
1238 *(iExcept.load()) = std::current_exception();
1242 if ((not iExcept) and
results_->accept()) {
1256 std::rethrow_exception(expt);
1261 std::ostringstream ost;
1262 ost <<
"Processing Event " <<
info.principal().id();
1265 iExcept.store(
new std::exception_ptr(std::current_exception()));
1269 iExcept.store(
new std::exception_ptr(std::current_exception()));
1273 std::exception_ptr ptr;
1275 ptr = *iExcept.load();
1288 bool const cleaningUpAfterException =
false;
1294 iExcept = std::current_exception();
1302 iExcept = std::current_exception();
1316 std::back_inserter(oLabelsToFill),
1317 std::bind(&
Path::name, std::placeholders::_1));
1321 TrigPaths::const_iterator itFound = std::find_if(
1324 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
1326 oLabelsToFill.reserve(itFound->size());
1327 for (
size_t i = 0;
i < itFound->size(); ++
i) {
1328 oLabelsToFill.push_back(itFound->getWorker(
i)->description()->moduleLabel());
1334 std::vector<ModuleDescription const*>& descriptions,
1335 unsigned int hint)
const {
1336 descriptions.clear();
1338 TrigPaths::const_iterator itFound;
1342 if (itFound->name() == iPathLabel)
1347 itFound = std::find_if(
1350 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
1355 descriptions.reserve(itFound->size());
1356 for (
size_t i = 0;
i < itFound->size(); ++
i) {
1357 descriptions.push_back(itFound->getWorker(
i)->description());
1363 std::vector<ModuleDescription const*>& descriptions,
1364 unsigned int hint)
const {
1365 descriptions.clear();
1367 TrigPaths::const_iterator itFound;
1371 if (itFound->name() == iEndPathLabel)
1376 itFound = std::find_if(
1379 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&
Path::name, std::placeholders::_1)));
1384 descriptions.reserve(itFound->size());
1385 for (
size_t i = 0;
i < itFound->size(); ++
i) {
1386 descriptions.push_back(itFound->getWorker(
i)->description());
1410 std::vector<ModuleInPathSummary>
temp(sz);
1411 for (
size_t i = 0;
i != sz; ++
i) {
1417 for (
size_t i = 0;
i != sz; ++
i) {
1445 using std::placeholders::_1;
1475 unsigned int indexEmpty = 0;
1476 unsigned int indexOfPath = 0;
1477 for (
auto& pathStatusInserter : pathStatusInserters) {
1478 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
1482 workerPtr->setActivityRegistry(
actReg_);
1491 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1500 for (
auto& endPathStatusInserter : endPathStatusInserters) {
1501 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
1505 workerPtr->setActivityRegistry(
actReg_);
1514 end_paths_.at(indexOfPath).setPathStatusInserter(
nullptr, workerPtr.get());
1522 bool cleaningUpAfterException,
1523 std::exception_ptr& excpt)
const noexcept {
1528 std::ostringstream ost;
1535 excpt = std::current_exception();
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
void setTimestamp(Timestamp const &v)
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void endStream(ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
void clearCounters() noexcept
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManagerLumisAndEvents, std::vector< std::string > const &trigPathNames)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
std::shared_ptr< ActivityRegistry > actReg_
void setTransition(Transition v)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
static RunIndex invalidRunIndex()
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
WorkerManager workerManagerRuns_
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
def unique(seq, keepstr=True)
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
ModuleDescription const * description() const noexcept
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void doneWaiting(std::exception_ptr iExcept) noexcept
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
std::string const & className() const
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
void handleException(StreamContext const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
bool search_all(ForwardSequence const &s, Datum const &d)
AllWorkers const & allWorkersLumisAndEvents() const
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
void setEventID(EventID const &v)
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
auto wrap(F iFunc) -> decltype(iFunc())
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::list< std::string > const & context() const
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
std::string const & name() const
WorkerManager workerManagerLumisAndEvents_
void setRunIndex(RunIndex const &v)
void beginStream(StreamID, StreamContext const &)
std::vector< std::string > vstring
AllWorkers const & allWorkersRuns() const
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)