71 #include "boost/thread/xtime.hpp" 72 #include "boost/range/adaptor/reversed.hpp" 86 #include <sys/types.h> 88 #include <sys/socket.h> 89 #include <sys/select.h> 90 #include <sys/fcntl.h> 104 class SendSourceTerminationSignalIfException {
108 ~SendSourceTerminationSignalIfException() {
113 void completedSuccessfully() {
125 std::unique_ptr<InputSource>
128 std::shared_ptr<ProductRegistry> preg,
129 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
130 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
131 std::shared_ptr<ActivityRegistry> areg,
132 std::shared_ptr<ProcessConfiguration const> processConfiguration,
135 if(main_input == 0) {
137 <<
"There must be exactly one source in the configuration.\n" 138 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
143 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
146 filler->fill(descriptions);
150 descriptions.validate(*main_input,
std::string(
"source"));
154 std::ostringstream ost;
155 ost <<
"Validating configuration of input source of type " << modtype;
171 processConfiguration.get(),
172 ModuleDescription::getUniqueID());
178 areg->preSourceConstructionSignal_(md);
179 std::unique_ptr<InputSource>
input;
182 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
184 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
185 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
186 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
190 std::ostringstream ost;
191 ost <<
"Constructing input source of type " << modtype;
199 std::shared_ptr<EDLooperBase>
203 std::shared_ptr<EDLooperBase> vLooper;
205 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
207 if(loopers.size() == 0) {
211 assert(1 == loopers.size());
213 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
230 std::vector<std::string>
const& defaultServices,
231 std::vector<std::string>
const& forcedServices) :
234 branchIDListHelper_(),
237 espController_(new eventsetup::EventSetupsController),
240 processConfiguration_(),
246 deferredExceptionPtrIsSet_(
false),
250 beginJobCalled_(
false),
251 shouldWeStop_(
false),
252 stateMachineWasInErrorState_(
false),
255 exceptionMessageFiles_(),
256 exceptionMessageRuns_(),
257 exceptionMessageLumis_(),
258 alreadyHandlingException_(
false),
259 forceLooperToEnd_(
false),
260 looperBeginJobRun_(
false),
261 forceESCacheClearOnNewRun_(
false),
262 numberOfForkedChildren_(0),
263 numberOfSequentialEventsPerChild_(1),
264 setCpuAffinity_(
false),
265 eventSetupDataToExcludeFromPrefetching_() {
267 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
268 processDesc->addServices(defaultServices, forcedServices);
269 init(processDesc, iToken, iLegacy);
273 std::vector<std::string>
const& defaultServices,
274 std::vector<std::string>
const& forcedServices) :
313 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
314 processDesc->addServices(defaultServices, forcedServices);
358 init(processDesc, token, legacy);
402 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
406 auto processDesc = std::make_shared<ProcessDesc>(
config);
424 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
428 bool const hasSubProcesses = !subProcessVParameterSet.empty();
436 unsigned int nThreads=1;
437 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
438 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
446 unsigned int nStreams =1;
447 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
448 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
454 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
460 unsigned int nConcurrentRuns =1;
466 unsigned int nConcurrentLumis =1;
492 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
493 for(
auto const& ps : excluded) {
495 ps.getUntrackedParameter<
std::string>(
"label",
""));
505 auto& serviceSets = processDesc->getServicesPSets();
514 handler->willBeUsingThreads();
518 std::shared_ptr<CommonParams>
common(items.
initMisc(*parameterSet));
560 FDEBUG(2) << parameterSet << std::endl;
572 for(
auto& subProcessPSet : subProcessVParameterSet) {
620 actReg_->preallocateSignal_(bounds);
648 ex.
addContext(
"Calling beginJob for the source");
654 actReg_->postBeginJobSignal_();
665 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
674 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
678 c.
call([actReg](){actReg->preEndJobSignal_();});
687 c.
call([actReg](){actReg->postEndJobSignal_();});
702 volatile bool child_failed =
false;
703 volatile unsigned int num_children_done = 0;
704 volatile int child_fail_exit_status = 0;
705 volatile int child_fail_signal = 0;
711 void ep_sigchld(
int, siginfo_t*,
void*) {
715 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
718 if(WIFEXITED(stat_loc)) {
720 if(0 != WEXITSTATUS(stat_loc)) {
721 child_fail_exit_status = WEXITSTATUS(stat_loc);
725 if(WIFSIGNALED(stat_loc)) {
727 child_fail_signal = WTERMSIG(stat_loc);
730 p = waitpid(-1, &stat_loc, WNOHANG);
745 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
747 while(numberOfChildren != 0) {
749 numberOfChildren /= 10;
759 class MessageSenderToSource {
761 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
765 const std::vector<int>& m_childrenPipes;
766 long const m_nEventsToProcess;
768 unsigned int m_aliveChildren;
772 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
773 std::vector<int>
const& childrenPipes,
774 long iNEventsToProcess):
775 m_childrenPipes(childrenPipes),
776 m_nEventsToProcess(iNEventsToProcess),
777 m_aliveChildren(childrenSockets.size()),
780 FD_ZERO(&m_socketSet);
781 for (
auto const socket : childrenSockets) {
782 FD_SET(socket, &m_socketSet);
783 if (socket > m_maxFd) {
787 for (
auto const pipe : childrenPipes) {
788 FD_SET(
pipe, &m_socketSet);
789 if (
pipe > m_maxFd) {
813 MessageSenderToSource::operator()() {
815 LogInfo(
"ForkingController") <<
"I am controller";
820 sndmsg.
nIndices = m_nEventsToProcess;
823 fd_set readSockets, errorSockets;
825 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
826 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
829 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
831 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
840 if (FD_ISSET(
idx, &errorSockets)) {
841 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
842 FD_CLR(
idx, &m_socketSet);
845 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
853 if (!FD_ISSET(
idx, &readSockets)) {
859 bool is_pipe =
false;
860 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
864 while (((rc = read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
867 FD_CLR(
idx, &m_socketSet);
875 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.
sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
877 FD_CLR(
idx, &m_socketSet);
888 FD_CLR(
idx, &m_socketSet);
897 }
while (m_aliveChildren > 0);
907 if (child_fail_signal) {
908 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
910 }
else if (child_fail_exit_status) {
911 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
912 child_fail_exit_status=0;
914 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
932 itemType =
input_->nextItemType();
938 itemType =
input_->nextItemType();
941 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " <<
input_->runAuxiliary()->run();
943 input_->runAuxiliary()->beginTime());
948 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
950 std::vector<eventsetup::DataKey> dataKeys;
951 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
959 excludedData = &(itExcludeRec->second);
960 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
968 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
969 itDataKey != itDataKeyEnd;
972 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
973 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
977 recordPtr->
doGet(*itDataKey);
985 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
993 actReg_->preForkReleaseResourcesSignal_();
994 input_->doPreForkReleaseResources();
1000 unsigned int childIndex = 0;
1002 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1003 std::vector<pid_t> childrenIds;
1004 childrenIds.reserve(kMaxChildren);
1005 std::vector<int> childrenSockets;
1006 childrenSockets.reserve(kMaxChildren);
1007 std::vector<int> childrenPipes;
1008 childrenPipes.reserve(kMaxChildren);
1009 std::vector<int> childrenSocketsCopy;
1010 childrenSocketsCopy.reserve(kMaxChildren);
1011 std::vector<int> childrenPipesCopy;
1012 childrenPipesCopy.reserve(kMaxChildren);
1019 int sockets[2], fd_flags;
1020 for(; childIndex < kMaxChildren; ++childIndex) {
1022 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1023 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1027 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1031 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1032 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1037 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1038 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1041 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1042 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1045 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1046 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1051 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1052 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1055 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1056 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1060 childrenPipesCopy = childrenPipes;
1061 childrenSocketsCopy = childrenSockets;
1063 pid_t
value = fork();
1069 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1072 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1079 std::stringstream stout;
1080 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1081 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1082 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1084 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1085 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1088 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1097 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1099 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1102 CPU_SET(childIndex, &mask);
1103 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1104 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1116 LogError(
"ForkingChild") <<
"failed to create a child";
1119 childrenIds.push_back(value);
1120 childrenSockets.push_back(sockets[0]);
1121 childrenPipes.push_back(pipes[0]);
1124 if(childIndex < kMaxChildren) {
1125 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1126 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1128 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1129 input_->doPostForkReacquireResources(receiver);
1130 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1135 jobReport->parentAfterFork(jobReportFile);
1145 sigset_t blockingSigSet;
1146 sigset_t unblockingSigSet;
1148 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1149 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1150 sigaddset(&blockingSigSet, SIGCHLD);
1151 sigaddset(&blockingSigSet, SIGUSR2);
1152 sigaddset(&blockingSigSet, SIGINT);
1153 sigdelset(&unblockingSigSet, SIGCHLD);
1154 sigdelset(&unblockingSigSet, SIGUSR2);
1155 sigdelset(&unblockingSigSet, SIGINT);
1156 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1160 bool too_many_fds =
false;
1161 if (pipes[1]+1 > FD_SETSIZE) {
1162 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1163 too_many_fds =
true;
1170 boost::thread senderThread(sender);
1172 if(not too_many_fds) {
1177 sigsuspend(&unblockingSigSet);
1179 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1182 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1184 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1186 LogError(
"ForkingStopping") <<
"child failed";
1189 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1192 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1193 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1194 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1195 it != itEnd; ++it) {
1198 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1199 while(num_children_done != kMaxChildren) {
1200 sigsuspend(&unblockingSigSet);
1202 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1205 senderThread.join();
1207 if (child_fail_signal) {
1208 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1209 }
else if (child_fail_exit_status) {
1210 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1212 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1216 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1221 std::vector<ModuleDescription const*>
1223 return schedule_->getAllModuleDescriptions();
1262 std::unique_ptr<statemachine::Machine>
1271 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1282 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1285 auto machine = std::make_unique<statemachine::Machine>(
1290 machine->initiate();
1296 bool returnValue =
false;
1312 std::unique_ptr<statemachine::Machine> machine;
1336 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1337 more =
input_->skipForForking();
1338 sentry.completedSuccessfully();
1341 if(size < preg_->
size()) {
1348 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1350 sentry.completedSuccessfully();
1353 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1395 <<
"Unknown next item type passed to EventProcessor\n" 1396 <<
"Please report this error to the Framework group\n";
1398 if(machine->terminated()) {
1474 if(machine->terminated()) {
1475 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1481 <<
"The boost state machine in the EventProcessor exited after\n" 1482 <<
"entering the Error state.\n";
1486 if(machine.get() !=
nullptr) {
1489 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n" 1490 <<
"Please report this error to the Framework group\n";
1497 FDEBUG(1) <<
" \treadFile\n";
1499 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1502 if(size < preg_->
size()) {
1511 sentry.completedSuccessfully();
1515 if (
fb_.get() !=
nullptr) {
1516 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1517 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1518 sentry.completedSuccessfully();
1520 FDEBUG(1) <<
"\tcloseInputFile\n";
1524 if (
fb_.get() !=
nullptr) {
1528 FDEBUG(1) <<
"\topenOutputFiles\n";
1532 if (
fb_.get() !=
nullptr) {
1536 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1541 if (
fb_.get() !=
nullptr) {
1545 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1549 if (
fb_.get() !=
nullptr) {
1553 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1563 FDEBUG(1) <<
"\tstartingNewLoop\n";
1569 looper_->setModuleChanger(&changer);
1571 looper_->setModuleChanger(
nullptr);
1575 FDEBUG(1) <<
"\tendOfLoop\n";
1582 FDEBUG(1) <<
"\trewind\n";
1587 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1591 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1594 if(subProcess.shouldWeCloseOutput()) {
1600 return schedule_->shouldWeCloseOutput();
1604 FDEBUG(1) <<
"\tdoErrorStuff\n";
1606 <<
"The EventProcessor state machine encountered an unexpected event\n" 1607 <<
"and went to the error state\n" 1608 <<
"Will attempt to terminate processing normally\n" 1609 <<
"(IF using the looper the next loop will be attempted)\n" 1610 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1617 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1620 sentry.completedSuccessfully();
1629 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1631 sentry.completedSuccessfully();
1643 globalWaitTask->increment_ref_count();
1650 globalWaitTask->wait_for_all();
1651 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1652 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1662 streamLoopWaitTask->increment_ref_count();
1666 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1674 streamLoopWaitTask->wait_for_all();
1675 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1676 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1688 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1693 sentry.completedSuccessfully();
1699 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1701 sentry.completedSuccessfully();
1707 streamLoopWaitTask->increment_ref_count();
1711 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1718 cleaningUpAfterException);
1720 streamLoopWaitTask->wait_for_all();
1721 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1722 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1732 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1733 for_all(
subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1744 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1747 sentry.completedSuccessfully();
1753 rng->preBeginLumi(lb);
1760 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1762 sentry.completedSuccessfully();
1768 globalWaitTask->increment_ref_count();
1775 globalWaitTask->wait_for_all();
1776 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1777 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1780 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1787 streamLoopWaitTask->increment_ref_count();
1791 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1798 streamLoopWaitTask->wait_for_all();
1799 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1800 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1804 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1813 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1818 sentry.completedSuccessfully();
1825 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1827 sentry.completedSuccessfully();
1833 streamLoopWaitTask->increment_ref_count();
1837 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1844 cleaningUpAfterException);
1845 streamLoopWaitTask->wait_for_all();
1846 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1847 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1850 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1857 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1858 for_all(
subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1860 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1869 <<
"EventProcessor::readRun\n" 1870 <<
"Illegal attempt to insert run into cache\n" 1871 <<
"Contact a Framework Developer\n";
1875 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1877 sentry.completedSuccessfully();
1879 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1888 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1889 input_->readAndMergeRun(*runPrincipal);
1890 sentry.completedSuccessfully();
1892 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1899 <<
"EventProcessor::readRun\n" 1900 <<
"Illegal attempt to insert lumi into cache\n" 1901 <<
"Contact a Framework Developer\n";
1905 <<
"EventProcessor::readRun\n" 1906 <<
"Illegal attempt to insert lumi into cache\n" 1907 <<
"Run is invalid\n" 1908 <<
"Contact a Framework Developer\n";
1912 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1914 sentry.completedSuccessfully();
1918 return input_->luminosityBlock();
1924 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1926 sentry.completedSuccessfully();
1928 return input_->luminosityBlock();
1945 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1946 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1951 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1952 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1956 std::atomic<bool>* finishedProcessingEvents) {
1965 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1974 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1982 finishedProcessingEvents->store(
true,std::memory_order_release);
1992 firstEventInBlock_ =
false;
1996 bool expected =
false;
2007 unsigned int iStreamIndex,
2008 std::atomic<bool>* finishedProcessingEvents)
2010 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr
const* iPtr) {
2012 bool expected =
false;
2021 iTask->decrement_ref_count();
2037 iTask->decrement_ref_count();
2053 eventLoopWaitTask->increment_ref_count();
2055 eventLoopWaitTask->wait_for_all();
2061 std::atomic<bool> finishedProcessingEvents{
false};
2062 auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2070 auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2071 eventLoopWaitTask->increment_ref_count();
2074 unsigned int iStreamIndex = 0;
2075 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2076 eventLoopWaitTask->increment_ref_count();
2077 tbb::task::enqueue( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
2081 eventLoopWaitTask->increment_ref_count();
2082 eventLoopWaitTask->spawn_and_wait_for_all( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
2096 SendSourceTerminationSignalIfException sentry(
actReg_.get());
2098 sentry.completedSuccessfully();
2100 FDEBUG(1) <<
"\treadEvent\n";
2104 unsigned int iStreamIndex) {
2110 rng->postEventRead(ev);
2112 assert(pep->luminosityBlockPrincipalPtrValid());
2117 tbb::task::allocate_root(),
2118 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 2127 FDEBUG(1) <<
"\tprocessEvent\n";
2128 pep->clearEventPrincipal();
2139 afterProcessTask =
std::move(finalizeEventTask);
2145 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 2154 subProcess.doEventAsync(finalizeEventTask,*pep);
2157 finalizeEventTask.doneWaiting(*iPtr);
2164 iStreamIndex,*pep,
esp_->eventSetup());
2169 bool randomAccess =
input_->randomAccess();
2178 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
2195 FDEBUG(1) <<
"\tshouldWeStop\n";
2199 if(subProcess.terminate()) {
2225 if(iMachine.get() !=
nullptr) {
2226 if(!iMachine->terminated()) {
2232 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2234 if(iMachine->terminated()) {
2235 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void openOutputFiles() override
virtual void setExceptionMessageLumis(std::string &message) override
SharedResourcesAcquirer sourceResourcesAcquirer_
virtual void doErrorStuff() override
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
virtual statemachine::Run readAndMergeRun() override
edm::propagate_const< std::unique_ptr< InputSource > > input_
virtual bool endOfLoop() override
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
static PFTauRenderPlugin instance
bool stateMachineWasInErrorState_
bool alreadyHandlingException_
ParameterSetID id() const
virtual bool shouldWeCloseOutput() const override
void possiblyContinueAfterForkChildFailure()
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual int readLuminosityBlock() override
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
int numberOfForkedChildren_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
std::unique_ptr< statemachine::Machine > createStateMachine()
unsigned int numberOfThreads() const
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException) override
void setAtEndTransition(bool iAtEnd)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool hasRunPrincipal() const
void installCustomHandler(int signum, CFUNC func)
virtual void setExceptionMessageRuns(std::string &message) override
std::set< std::pair< std::string, std::string > > ExcludedData
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static size_t sizeForBuffer()
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
virtual void startingNewLoop() override
virtual void rewindInput() override
bool alreadyPrinted() const
bool forkProcess(std::string const &jobReportFile)
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
static size_t sizeForBuffer()
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void setEndTime(Timestamp const &time)
virtual bool shouldWeStop() const override
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
bool continueAfterChildFailure_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual statemachine::Run readRun() override
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
virtual void prepareForNextLoop() override
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual bool alreadyHandlingException() const override
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
def pipe(cmdline, input=None)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
virtual StatusCode runToCompletion() override
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
virtual void readFile() override
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
edm::RunNumber_t runNumber() const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void closeOutputFiles() override
virtual void writeRun(statemachine::Run const &run) override
virtual void closeInputFile(bool cleaningUpAfterException) override
virtual void respondToOpenInputFile() override
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
edm::ProcessHistoryID const & processHistoryID() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual int readAndMergeLumi() override
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
virtual void respondToCloseInputFile() override
virtual void readAndProcessEvent() override
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
virtual void deleteRunFromCache(statemachine::Run const &run) override
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
virtual void setExceptionMessageFiles(std::string &message) override
T get(const Candidate &c)
virtual void beginRun(statemachine::Run const &run) override
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_