63 #include "boost/bind.hpp"
64 #include "boost/thread/xtime.hpp"
78 #include <sys/types.h>
80 #include <sys/socket.h>
81 #include <sys/select.h>
82 #include <sys/fcntl.h>
91 #include "Cintex/Cintex.h"
96 std::unique_ptr<InputSource>
100 boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
101 boost::shared_ptr<ActivityRegistry>
areg,
105 if(main_input == 0) {
107 <<
"There must be exactly one source in the configuration.\n"
108 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
113 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
116 filler->fill(descriptions);
120 descriptions.validate(*main_input,
std::string(
"source"));
124 std::ostringstream ost;
125 ost <<
"Validating configuration of input source of type " << modtype;
141 processConfiguration.get(),
145 areg->preSourceConstructionSignal_(md);
146 std::unique_ptr<InputSource>
input;
149 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
155 std::ostringstream ost;
156 ost <<
"Constructing input source of type " << modtype;
164 boost::shared_ptr<EDLooperBase>
168 boost::shared_ptr<EDLooperBase> vLooper;
170 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
172 if(loopers.size() == 0) {
176 assert(1 == loopers.size());
178 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
196 std::vector<std::string>
const& defaultServices,
197 std::vector<std::string>
const& forcedServices) :
200 branchIDListHelper_(),
203 espController_(new eventsetup::EventSetupsController),
206 processConfiguration_(),
212 deferredExceptionPtrIsSet_(
false),
214 beginJobCalled_(
false),
215 shouldWeStop_(
false),
216 stateMachineWasInErrorState_(
false),
219 exceptionMessageFiles_(),
220 exceptionMessageRuns_(),
221 exceptionMessageLumis_(),
222 alreadyHandlingException_(
false),
223 forceLooperToEnd_(
false),
224 looperBeginJobRun_(
false),
225 forceESCacheClearOnNewRun_(
false),
226 numberOfForkedChildren_(0),
227 numberOfSequentialEventsPerChild_(1),
228 setCpuAffinity_(
false),
229 eventSetupDataToExcludeFromPrefetching_() {
231 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
232 processDesc->addServices(defaultServices, forcedServices);
233 init(processDesc, iToken, iLegacy);
237 std::vector<std::string>
const& defaultServices,
238 std::vector<std::string>
const& forcedServices) :
241 branchIDListHelper_(),
244 espController_(new eventsetup::EventSetupsController),
247 processConfiguration_(),
253 deferredExceptionPtrIsSet_(
false),
255 beginJobCalled_(
false),
256 shouldWeStop_(
false),
257 stateMachineWasInErrorState_(
false),
260 exceptionMessageFiles_(),
261 exceptionMessageRuns_(),
262 exceptionMessageLumis_(),
263 alreadyHandlingException_(
false),
264 forceLooperToEnd_(
false),
265 looperBeginJobRun_(
false),
266 forceESCacheClearOnNewRun_(
false),
267 numberOfForkedChildren_(0),
268 numberOfSequentialEventsPerChild_(1),
269 setCpuAffinity_(
false),
270 asyncStopRequestedWhileProcessingEvents_(
false),
271 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
272 eventSetupDataToExcludeFromPrefetching_()
275 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
276 processDesc->addServices(defaultServices, forcedServices);
285 branchIDListHelper_(),
288 espController_(new eventsetup::EventSetupsController),
291 processConfiguration_(),
297 deferredExceptionPtrIsSet_(
false),
299 beginJobCalled_(
false),
300 shouldWeStop_(
false),
301 stateMachineWasInErrorState_(
false),
304 exceptionMessageFiles_(),
305 exceptionMessageRuns_(),
306 exceptionMessageLumis_(),
307 alreadyHandlingException_(
false),
308 forceLooperToEnd_(
false),
309 looperBeginJobRun_(
false),
310 forceESCacheClearOnNewRun_(
false),
311 numberOfForkedChildren_(0),
312 numberOfSequentialEventsPerChild_(1),
313 setCpuAffinity_(
false),
314 asyncStopRequestedWhileProcessingEvents_(
false),
315 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
316 eventSetupDataToExcludeFromPrefetching_()
318 init(processDesc, token, legacy);
325 branchIDListHelper_(),
328 espController_(new eventsetup::EventSetupsController),
331 processConfiguration_(),
337 deferredExceptionPtrIsSet_(
false),
339 beginJobCalled_(
false),
340 shouldWeStop_(
false),
341 stateMachineWasInErrorState_(
false),
344 exceptionMessageFiles_(),
345 exceptionMessageRuns_(),
346 exceptionMessageLumis_(),
347 alreadyHandlingException_(
false),
348 forceLooperToEnd_(
false),
349 looperBeginJobRun_(
false),
350 forceESCacheClearOnNewRun_(
false),
351 numberOfForkedChildren_(0),
352 numberOfSequentialEventsPerChild_(1),
353 setCpuAffinity_(
false),
354 asyncStopRequestedWhileProcessingEvents_(
false),
355 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
356 eventSetupDataToExcludeFromPrefetching_()
360 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
364 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(config));
376 ROOT::Cintex::Cintex::Enable();
384 boost::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
396 unsigned int nThreads=1;
397 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
398 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
406 unsigned int nStreams =1;
407 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
408 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
416 unsigned int nConcurrentRuns =1;
422 unsigned int nConcurrentLumis =1;
448 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
449 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
453 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
454 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
462 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
471 handler->willBeUsingThreads();
475 boost::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
509 FDEBUG(2) << parameterSet << std::endl;
519 ep->preModuleDelayedGetSignal_.connect(std::cref(
actReg_->preModuleEventDelayedGetSignal_));
520 ep->postModuleDelayedGetSignal_.connect(std::cref(
actReg_->postModuleEventDelayedGetSignal_));
524 if(subProcessParameterSet) {
570 actReg_->preallocateSignal_(bounds);
591 ex.
addContext(
"Calling beginJob for the source");
597 actReg_->postBeginJobSignal_();
608 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
629 c.
call([actReg](){actReg->postEndJobSignal_();});
644 volatile bool child_failed =
false;
645 volatile unsigned int num_children_done = 0;
646 volatile int child_fail_exit_status = 0;
647 volatile int child_fail_signal = 0;
653 void ep_sigchld(
int, siginfo_t*,
void*) {
657 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
660 if(WIFEXITED(stat_loc)) {
662 if(0 != WEXITSTATUS(stat_loc)) {
663 child_fail_exit_status = WEXITSTATUS(stat_loc);
667 if(WIFSIGNALED(stat_loc)) {
669 child_fail_signal = WTERMSIG(stat_loc);
672 p = waitpid(-1, &stat_loc, WNOHANG);
687 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
689 while(numberOfChildren != 0) {
691 numberOfChildren /= 10;
701 class MessageSenderToSource {
703 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
707 const std::vector<int>& m_childrenPipes;
708 long const m_nEventsToProcess;
710 unsigned int m_aliveChildren;
714 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
715 std::vector<int>
const& childrenPipes,
716 long iNEventsToProcess):
717 m_childrenPipes(childrenPipes),
718 m_nEventsToProcess(iNEventsToProcess),
719 m_aliveChildren(childrenSockets.
size()),
722 FD_ZERO(&m_socketSet);
723 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
725 FD_SET(*it, &m_socketSet);
730 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
732 FD_SET(*it, &m_socketSet);
757 MessageSenderToSource::operator()() {
758 multicore::MessageForParent childMsg;
759 LogInfo(
"ForkingController") <<
"I am controller";
762 multicore::MessageForSource sndmsg;
763 sndmsg.startIndex = 0;
764 sndmsg.nIndices = m_nEventsToProcess;
767 fd_set readSockets, errorSockets;
769 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
770 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
773 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
775 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
784 if (FD_ISSET(
idx, &errorSockets)) {
785 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
786 FD_CLR(
idx, &m_socketSet);
789 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
797 if (!FD_ISSET(
idx, &readSockets)) {
803 bool is_pipe =
false;
804 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
808 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
811 FD_CLR(
idx, &m_socketSet);
819 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
821 FD_CLR(
idx, &m_socketSet);
830 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
832 FD_CLR(
idx, &m_socketSet);
837 sndmsg.startIndex += sndmsg.nIndices;
841 }
while (m_aliveChildren > 0);
849 void EventProcessor::possiblyContinueAfterForkChildFailure() {
850 if(child_failed && continueAfterChildFailure_) {
851 if (child_fail_signal) {
852 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
854 }
else if (child_fail_exit_status) {
855 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
856 child_fail_exit_status=0;
858 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
867 if(0 == numberOfForkedChildren_) {
return true;}
868 assert(0<numberOfForkedChildren_);
876 itemType = input_->nextItemType();
878 assert(itemType == InputSource::IsFile);
882 itemType = input_->nextItemType();
883 assert(itemType == InputSource::IsRun);
885 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
887 input_->runAuxiliary()->beginTime());
888 espController_->eventSetupForInstance(ts);
892 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
894 std::vector<eventsetup::DataKey> dataKeys;
895 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
900 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
902 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
903 excludedData = &(itExcludeRec->second);
904 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
912 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
913 itDataKey != itDataKeyEnd;
916 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
917 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
921 recordPtr->
doGet(*itDataKey);
929 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
934 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
937 actReg_->preForkReleaseResourcesSignal_();
938 input_->doPreForkReleaseResources();
939 schedule_->preForkReleaseResources();
944 unsigned int childIndex = 0;
945 unsigned int const kMaxChildren = numberOfForkedChildren_;
946 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
947 std::vector<pid_t> childrenIds;
948 childrenIds.reserve(kMaxChildren);
949 std::vector<int> childrenSockets;
950 childrenSockets.reserve(kMaxChildren);
951 std::vector<int> childrenPipes;
952 childrenPipes.reserve(kMaxChildren);
953 std::vector<int> childrenSocketsCopy;
954 childrenSocketsCopy.reserve(kMaxChildren);
955 std::vector<int> childrenPipesCopy;
956 childrenPipesCopy.reserve(kMaxChildren);
963 int sockets[2], fd_flags;
964 for(; childIndex < kMaxChildren; ++childIndex) {
966 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
967 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
971 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
975 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
976 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
981 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
982 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
985 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
986 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
989 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
990 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
995 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
996 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
999 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1000 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1004 childrenPipesCopy = childrenPipes;
1005 childrenSocketsCopy = childrenSockets;
1007 pid_t
value = fork();
1013 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1016 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1023 std::stringstream stout;
1024 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1025 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1026 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1028 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1029 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1032 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1033 if(setCpuAffinity_) {
1041 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1043 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1046 CPU_SET(childIndex, &mask);
1047 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1048 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1060 LogError(
"ForkingChild") <<
"failed to create a child";
1063 childrenIds.push_back(value);
1064 childrenSockets.push_back(sockets[0]);
1065 childrenPipes.push_back(pipes[0]);
1068 if(childIndex < kMaxChildren) {
1069 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1070 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1073 input_->doPostForkReacquireResources(receiver);
1074 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1079 jobReport->parentAfterFork(jobReportFile);
1089 sigset_t blockingSigSet;
1090 sigset_t unblockingSigSet;
1092 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1093 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1094 sigaddset(&blockingSigSet, SIGCHLD);
1095 sigaddset(&blockingSigSet, SIGUSR2);
1096 sigaddset(&blockingSigSet, SIGINT);
1097 sigdelset(&unblockingSigSet, SIGCHLD);
1098 sigdelset(&unblockingSigSet, SIGUSR2);
1099 sigdelset(&unblockingSigSet, SIGINT);
1100 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1104 bool too_many_fds =
false;
1105 if (pipes[1]+1 > FD_SETSIZE) {
1106 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1107 too_many_fds =
true;
1113 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1114 boost::thread senderThread(sender);
1116 if(not too_many_fds) {
1119 possiblyContinueAfterForkChildFailure();
1120 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1121 sigsuspend(&unblockingSigSet);
1122 possiblyContinueAfterForkChildFailure();
1123 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1126 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1128 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1130 LogError(
"ForkingStopping") <<
"child failed";
1133 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1136 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1137 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1138 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1139 it != itEnd; ++it) {
1142 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1143 while(num_children_done != kMaxChildren) {
1144 sigsuspend(&unblockingSigSet);
1146 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1149 senderThread.join();
1150 if(child_failed && !continueAfterChildFailure_) {
1151 if (child_fail_signal) {
1152 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1153 }
else if (child_fail_exit_status) {
1154 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1156 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1160 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1165 std::vector<ModuleDescription const*>
1166 EventProcessor::getAllModuleDescriptions()
const {
1167 return schedule_->getAllModuleDescriptions();
1171 EventProcessor::totalEvents()
const {
1172 return schedule_->totalEvents();
1176 EventProcessor::totalEventsPassed()
const {
1177 return schedule_->totalEventsPassed();
1181 EventProcessor::totalEventsFailed()
const {
1182 return schedule_->totalEventsFailed();
1186 EventProcessor::enableEndPaths(
bool active) {
1187 schedule_->enableEndPaths(active);
1191 EventProcessor::endPathsEnabled()
const {
1192 return schedule_->endPathsEnabled();
1197 schedule_->getTriggerReport(rep);
1201 EventProcessor::clearCounters() {
1202 schedule_->clearCounters();
1206 std::auto_ptr<statemachine::Machine>
1207 EventProcessor::createStateMachine() {
1214 << fileMode_ <<
".\n"
1215 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1225 << emptyRunLumiMode_ <<
".\n"
1226 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1233 machine->initiate();
1239 bool returnValue =
false;
1244 returnCode = epSignal;
1251 EventProcessor::runToCompletion() {
1254 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1255 std::auto_ptr<statemachine::Machine> machine;
1260 stateMachineWasInErrorState_ =
false;
1265 machine = createStateMachine();
1266 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1267 asyncStopRequestedWhileProcessingEvents_=
false;
1276 if(numberOfForkedChildren_ > 0) {
1277 size_t size = preg_->size();
1278 more = input_->skipForForking();
1280 if(size < preg_->
size()) {
1281 principalCache_.adjustIndexesAfterProductRegistryAddition();
1283 principalCache_.adjustEventsToNewProductRegistry(preg_);
1286 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1288 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1290 if(checkForAsyncStopRequest(returnCode)) {
1291 forceLooperToEnd_ =
true;
1293 forceLooperToEnd_ =
false;
1297 if(itemType == InputSource::IsEvent) {
1299 if(asyncStopRequestedWhileProcessingEvents_) {
1300 forceLooperToEnd_ =
true;
1302 forceLooperToEnd_ =
false;
1303 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1306 itemType = nextItemTypeFromProcessingEvents_;
1309 if(itemType == InputSource::IsEvent) {
1311 else if(itemType == InputSource::IsStop) {
1314 else if(itemType == InputSource::IsFile) {
1317 else if(itemType == InputSource::IsRun) {
1318 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1320 else if(itemType == InputSource::IsLumi) {
1323 else if(itemType == InputSource::IsSynchronize) {
1329 <<
"Unknown next item type passed to EventProcessor\n"
1330 <<
"Please report this error to the Framework group\n";
1332 if(machine->terminated()) {
1384 alreadyHandlingException_ =
true;
1385 terminateMachine(machine);
1386 alreadyHandlingException_ =
false;
1387 if (!exceptionMessageLumis_.empty()) {
1390 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1393 if (!exceptionMessageRuns_.empty()) {
1396 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1399 if (!exceptionMessageFiles_.empty()) {
1402 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1408 if(machine->terminated()) {
1409 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1413 if(stateMachineWasInErrorState_) {
1415 <<
"The boost state machine in the EventProcessor exited after\n"
1416 <<
"entering the Error state.\n";
1420 if(machine.get() != 0) {
1421 terminateMachine(machine);
1423 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1424 <<
"Please report this error to the Framework group\n";
1430 void EventProcessor::readFile() {
1431 FDEBUG(1) <<
" \treadFile\n";
1432 size_t size = preg_->size();
1433 fb_ = input_->readFile();
1434 if(size < preg_->
size()) {
1435 principalCache_.adjustIndexesAfterProductRegistryAddition();
1437 principalCache_.adjustEventsToNewProductRegistry(preg_);
1438 if((numberOfForkedChildren_ > 0)
or
1439 (preallocations_.numberOfStreams()>1 and
1440 preallocations_.numberOfThreads()>1)) {
1441 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1445 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1446 if (fb_.get() !=
nullptr) {
1447 input_->closeFile(fb_.get(), cleaningUpAfterException);
1449 FDEBUG(1) <<
"\tcloseInputFile\n";
1452 void EventProcessor::openOutputFiles() {
1453 if (fb_.get() !=
nullptr) {
1454 schedule_->openOutputFiles(*fb_);
1455 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1457 FDEBUG(1) <<
"\topenOutputFiles\n";
1460 void EventProcessor::closeOutputFiles() {
1461 if (fb_.get() !=
nullptr) {
1462 schedule_->closeOutputFiles();
1463 if(hasSubProcess()) subProcess_->closeOutputFiles();
1465 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1468 void EventProcessor::respondToOpenInputFile() {
1469 if(hasSubProcess()) {
1470 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1472 if (fb_.get() !=
nullptr) {
1473 schedule_->respondToOpenInputFile(*fb_);
1474 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1476 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1479 void EventProcessor::respondToCloseInputFile() {
1480 if (fb_.get() !=
nullptr) {
1481 schedule_->respondToCloseInputFile(*fb_);
1482 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1484 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1487 void EventProcessor::startingNewLoop() {
1488 shouldWeStop_ =
false;
1491 if(looper_ && looperBeginJobRun_) {
1492 looper_->doStartingNewLoop();
1494 FDEBUG(1) <<
"\tstartingNewLoop\n";
1497 bool EventProcessor::endOfLoop() {
1500 looper_->setModuleChanger(&changer);
1502 looper_->setModuleChanger(
nullptr);
1503 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1506 FDEBUG(1) <<
"\tendOfLoop\n";
1510 void EventProcessor::rewindInput() {
1513 FDEBUG(1) <<
"\trewind\n";
1516 void EventProcessor::prepareForNextLoop() {
1517 looper_->prepareForNextLoop(esp_.get());
1518 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1521 bool EventProcessor::shouldWeCloseOutput()
const {
1522 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1523 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1526 void EventProcessor::doErrorStuff() {
1527 FDEBUG(1) <<
"\tdoErrorStuff\n";
1529 <<
"The EventProcessor state machine encountered an unexpected event\n"
1530 <<
"and went to the error state\n"
1531 <<
"Will attempt to terminate processing normally\n"
1532 <<
"(IF using the looper the next loop will be attempted)\n"
1533 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1534 stateMachineWasInErrorState_ =
true;
1539 input_->doBeginRun(runPrincipal, &processContext_);
1542 if(forceESCacheClearOnNewRun_){
1543 espController_->forceCacheClear();
1545 espController_->eventSetupForInstance(ts);
1547 if(looper_ && looperBeginJobRun_==
false) {
1549 looper_->beginOfJob(es);
1550 looperBeginJobRun_ =
true;
1551 looper_->doStartingNewLoop();
1555 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1556 if(hasSubProcess()) {
1557 subProcess_->doBeginRun(runPrincipal, ts);
1562 looper_->doBeginRun(runPrincipal, es, &processContext_);
1566 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1567 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1568 if(hasSubProcess()) {
1569 subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1581 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1582 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1584 espController_->eventSetupForInstance(ts);
1587 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1589 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1590 if(hasSubProcess()) {
1591 subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1601 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1602 if(hasSubProcess()) {
1603 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1608 looper_->doEndRun(runPrincipal, es, &processContext_);
1614 input_->doBeginLumi(lumiPrincipal, &processContext_);
1619 rng->preBeginLumi(lb);
1625 espController_->eventSetupForInstance(ts);
1629 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1630 if(hasSubProcess()) {
1631 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1634 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1636 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1639 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1641 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1642 if(hasSubProcess()) {
1643 subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1647 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1655 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1660 espController_->eventSetupForInstance(ts);
1663 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1665 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1666 if(hasSubProcess()) {
1667 subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1671 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1677 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1678 if(hasSubProcess()) {
1679 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1682 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1684 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1689 if (principalCache_.hasRunPrincipal()) {
1691 <<
"EventProcessor::readRun\n"
1692 <<
"Illegal attempt to insert run into cache\n"
1693 <<
"Contact a Framework Developer\n";
1695 boost::shared_ptr<RunPrincipal> rp(
new RunPrincipal(input_->runAuxiliary(),
1697 *processConfiguration_,
1698 historyAppender_.get(),
1700 input_->readRun(*rp, *historyAppender_);
1701 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1702 principalCache_.insert(rp);
1707 principalCache_.merge(input_->runAuxiliary(), preg_);
1708 auto runPrincipal =principalCache_.runPrincipalPtr();
1709 input_->readAndMergeRun(*runPrincipal);
1710 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1711 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1714 int EventProcessor::readLuminosityBlock() {
1715 if (principalCache_.hasLumiPrincipal()) {
1717 <<
"EventProcessor::readRun\n"
1718 <<
"Illegal attempt to insert lumi into cache\n"
1719 <<
"Contact a Framework Developer\n";
1721 if (!principalCache_.hasRunPrincipal()) {
1723 <<
"EventProcessor::readRun\n"
1724 <<
"Illegal attempt to insert lumi into cache\n"
1725 <<
"Run is invalid\n"
1726 <<
"Contact a Framework Developer\n";
1730 *processConfiguration_,
1731 historyAppender_.get(),
1733 input_->readLuminosityBlock(*lbp, *historyAppender_);
1734 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1735 principalCache_.insert(lbp);
1736 return input_->luminosityBlock();
1739 int EventProcessor::readAndMergeLumi() {
1740 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1741 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1742 return input_->luminosityBlock();
1758 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1759 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1760 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1764 principalCache_.deleteLumi(phid, run, lumi);
1765 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1766 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1772 unsigned int iStreamIndex,
1773 std::atomic<bool>* iFinishedProcessingEvents,
1774 tbb::task* iWaitTask):
1776 m_streamID(iStreamIndex),
1777 m_finishedProcessingEvents(iFinishedProcessingEvents),
1778 m_waitTask(iWaitTask){}
1781 m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1782 m_waitTask->decrement_ref_count();
1792 void EventProcessor::processEventsForStreamAsync(
unsigned int iStreamIndex,
1793 std::atomic<bool>* finishedProcessingEvents) {
1797 if(preallocations_.numberOfStreams()>1) {
1799 handler->initializeThisThreadForUse();
1802 if(iStreamIndex==0) {
1806 if(shouldWeStop()) {
1809 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1819 std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1821 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1827 auto sr = input_->resourceSharedWithDelayedReader();
1828 std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1830 delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1834 if (InputSource::IsEvent !=itemType) {
1835 nextItemTypeFromProcessingEvents_ = itemType;
1836 finishedProcessingEvents->store(
true,std::memory_order_release);
1840 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1844 readEvent(iStreamIndex);
1847 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1855 bool expected =
false;
1856 if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,
true)) {
1857 deferredExceptionPtr_ = std::current_exception();
1863 void EventProcessor::readAndProcessEvent() {
1864 if(numberOfForkedChildren_>0) {
1869 nextItemTypeFromProcessingEvents_ = InputSource::IsEvent;
1870 asyncStopRequestedWhileProcessingEvents_ =
false;
1872 std::atomic<bool> finishedProcessingEvents{
false};
1878 tbb::task* eventLoopWaitTask{
new (tbb::task::allocate_root()) tbb::empty_task{}};
1879 eventLoopWaitTask->increment_ref_count();
1881 const unsigned int kNumStreams = preallocations_.numberOfStreams();
1882 unsigned int iStreamIndex = 0;
1883 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1884 eventLoopWaitTask->increment_ref_count();
1885 tbb::task::enqueue( *(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1888 eventLoopWaitTask->increment_ref_count();
1889 eventLoopWaitTask->spawn_and_wait_for_all(*(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1893 if(deferredExceptionPtrIsSet_) {
1894 std::rethrow_exception(deferredExceptionPtr_);
// Reads one event from the input source into the event principal that the
// principal cache holds for stream iStreamIndex.
// NOTE: the line that creates streamContext is not shown in this listing.
1897 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
// Event principal associated with this stream index.
1899 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
// Let the input source fill the principal.
1901 input_->readEvent(
event, streamContext);
1902 FDEBUG(1) <<
"\treadEvent\n";
// NOTE(review): the function header is not shown in this listing; judging by
// the debug string below this is presumably EventProcessor::processEvent for
// stream iStreamIndex -- confirm against the full source.
// Attach the cached luminosity-block principal to this stream's event principal.
1905 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1906 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
// Notify rng that an event has been read (the setup of rng and ev is not shown).
1910 rng->postEventRead(ev);
// Sanity checks: the event must belong to the run and luminosity block of the
// cached luminosity-block principal.
1912 assert(pep->luminosityBlockPrincipalPtrValid());
1913 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1914 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
// Run the schedule on the event, then hand the event to the sub-process if one exists.
1922 schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
1923 if(hasSubProcess()) {
1924 subProcess_->doEvent(*pep);
// Looper handling; the surrounding conditions are not shown in this listing.
1930 bool randomAccess = input_->randomAccess();
1938 StreamContext streamContext(pep->streamID(), &processContext_);
1939 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
// NOTE(review): presumably steps the input back in response to a looper
// request; the condition guarding this call is not visible here.
1944 input_->skipEvents(-2);
// Any looper status other than kContinue requests that processing stop.
1952 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
1956 FDEBUG(1) <<
"\tprocessEvent\n";
// Clear the event principal now that this event has been handled.
1957 pep->clearEventPrincipal();
// Reports whether event processing should stop.
// Returns true when the shouldWeStop_ flag is set, when the schedule reports
// terminate(), or when a sub-process exists and reports terminate().
1960 bool EventProcessor::shouldWeStop()
const {
1961 FDEBUG(1) <<
"\tshouldWeStop\n";
// The internally recorded stop request is checked first.
1962 if(shouldWeStop_)
return true;
// The sub-process is only consulted when one exists.
1963 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
// NOTE(review): the headers of the three enclosing functions are not shown in
// this listing; each visible statement stores `message` into one of the
// exception-message members. Presumably these are the setters for the files,
// runs and lumis messages -- confirm against the full source.
// Stores the message in exceptionMessageFiles_.
1967 exceptionMessageFiles_ =
message;
// Stores the message in exceptionMessageRuns_.
1971 exceptionMessageRuns_ =
message;
// Stores the message in exceptionMessageLumis_.
1975 exceptionMessageLumis_ =
message;
// Returns the current value of the alreadyHandlingException_ flag.
1978 bool EventProcessor::alreadyHandlingException()
const {
1979 return alreadyHandlingException_;
// Handles termination of the state machine held by iMachine.
// iMachine: owning pointer to the state machine; nothing inside the first
// branch runs when the pointer is null.
// NOTE: some original lines are not shown in this listing (the embedded
// numbers skip), including the step performed between the two assignments to
// forceLooperToEnd_.
1982 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
1983 if(iMachine.get() != 0) {
// Machine has not terminated yet: forceLooperToEnd_ is raised around a step
// that is not shown here, then lowered again.
1984 if(!iMachine->terminated()) {
1985 forceLooperToEnd_ =
true;
1987 forceLooperToEnd_ =
false;
// Debug trace for the case where the machine had already terminated.
1990 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
// Debug trace emitted when the machine reports that it has terminated.
1992 if(iMachine->terminated()) {
1993 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
ProcessContext processContext_
void clear()
Not thread safe.
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
bool lastOperationSucceeded() const
void call(boost::function< void(void)>)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< ActivityRegistry > actReg_
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< ProcessConfiguration > processConfiguration_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
boost::shared_ptr< edm::ParameterSet > parameterSet()
bool continueAfterChildFailure_
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
unsigned int numberOfLuminosityBlocks() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
boost::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)