71 #include "boost/thread/xtime.hpp" 72 #include "boost/range/adaptor/reversed.hpp" 86 #include <sys/types.h> 88 #include <sys/socket.h> 89 #include <sys/select.h> 90 #include <sys/fcntl.h> 104 class SendSourceTerminationSignalIfException {
108 ~SendSourceTerminationSignalIfException() {
113 void completedSuccessfully() {
125 std::unique_ptr<InputSource>
128 std::shared_ptr<ProductRegistry> preg,
129 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
130 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
131 std::shared_ptr<ActivityRegistry> areg,
132 std::shared_ptr<ProcessConfiguration const> processConfiguration,
135 if(main_input == 0) {
137 <<
"There must be exactly one source in the configuration.\n" 138 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
143 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
146 filler->fill(descriptions);
150 descriptions.validate(*main_input,
std::string(
"source"));
154 std::ostringstream ost;
155 ost <<
"Validating configuration of input source of type " << modtype;
171 processConfiguration.get(),
172 ModuleDescription::getUniqueID());
178 areg->preSourceConstructionSignal_(md);
179 std::unique_ptr<InputSource>
input;
182 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
184 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
185 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
186 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
190 std::ostringstream ost;
191 ost <<
"Constructing input source of type " << modtype;
199 std::shared_ptr<EDLooperBase>
203 std::shared_ptr<EDLooperBase> vLooper;
205 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
207 if(loopers.size() == 0) {
211 assert(1 == loopers.size());
213 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
230 std::vector<std::string>
const& defaultServices,
231 std::vector<std::string>
const& forcedServices) :
234 branchIDListHelper_(),
237 espController_(new eventsetup::EventSetupsController),
240 processConfiguration_(),
246 deferredExceptionPtrIsSet_(
false),
250 beginJobCalled_(
false),
251 shouldWeStop_(
false),
252 stateMachineWasInErrorState_(
false),
255 exceptionMessageFiles_(),
256 exceptionMessageRuns_(),
257 exceptionMessageLumis_(),
258 alreadyHandlingException_(
false),
259 forceLooperToEnd_(
false),
260 looperBeginJobRun_(
false),
261 forceESCacheClearOnNewRun_(
false),
262 numberOfForkedChildren_(0),
263 numberOfSequentialEventsPerChild_(1),
264 setCpuAffinity_(
false),
265 eventSetupDataToExcludeFromPrefetching_() {
267 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
268 processDesc->addServices(defaultServices, forcedServices);
269 init(processDesc, iToken, iLegacy);
273 std::vector<std::string>
const& defaultServices,
274 std::vector<std::string>
const& forcedServices) :
313 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
314 processDesc->addServices(defaultServices, forcedServices);
358 init(processDesc, token, legacy);
402 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
406 auto processDesc = std::make_shared<ProcessDesc>(
config);
424 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
428 bool const hasSubProcesses = !subProcessVParameterSet.empty();
436 unsigned int nThreads=1;
437 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
438 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
446 unsigned int nStreams =1;
447 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
448 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
454 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
460 unsigned int nConcurrentRuns =1;
466 unsigned int nConcurrentLumis =1;
492 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
493 for(
auto const& ps : excluded) {
495 ps.getUntrackedParameter<
std::string>(
"label",
""));
505 auto& serviceSets = processDesc->getServicesPSets();
514 handler->willBeUsingThreads();
518 std::shared_ptr<CommonParams>
common(items.
initMisc(*parameterSet));
560 FDEBUG(2) << parameterSet << std::endl;
572 for(
auto& subProcessPSet : subProcessVParameterSet) {
620 actReg_->preallocateSignal_(bounds);
647 ex.
addContext(
"Calling beginJob for the source");
653 actReg_->postBeginJobSignal_();
664 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
673 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
677 c.
call([actReg](){actReg->preEndJobSignal_();});
686 c.
call([actReg](){actReg->postEndJobSignal_();});
701 volatile bool child_failed =
false;
702 volatile unsigned int num_children_done = 0;
703 volatile int child_fail_exit_status = 0;
704 volatile int child_fail_signal = 0;
710 void ep_sigchld(
int, siginfo_t*,
void*) {
714 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
717 if(WIFEXITED(stat_loc)) {
719 if(0 != WEXITSTATUS(stat_loc)) {
720 child_fail_exit_status = WEXITSTATUS(stat_loc);
724 if(WIFSIGNALED(stat_loc)) {
726 child_fail_signal = WTERMSIG(stat_loc);
729 p = waitpid(-1, &stat_loc, WNOHANG);
744 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
746 while(numberOfChildren != 0) {
748 numberOfChildren /= 10;
758 class MessageSenderToSource {
760 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
764 const std::vector<int>& m_childrenPipes;
765 long const m_nEventsToProcess;
767 unsigned int m_aliveChildren;
771 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
772 std::vector<int>
const& childrenPipes,
773 long iNEventsToProcess):
774 m_childrenPipes(childrenPipes),
775 m_nEventsToProcess(iNEventsToProcess),
776 m_aliveChildren(childrenSockets.size()),
779 FD_ZERO(&m_socketSet);
780 for (
auto const socket : childrenSockets) {
781 FD_SET(socket, &m_socketSet);
782 if (socket > m_maxFd) {
786 for (
auto const pipe : childrenPipes) {
787 FD_SET(
pipe, &m_socketSet);
788 if (
pipe > m_maxFd) {
812 MessageSenderToSource::operator()() {
814 LogInfo(
"ForkingController") <<
"I am controller";
819 sndmsg.
nIndices = m_nEventsToProcess;
822 fd_set readSockets, errorSockets;
824 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
825 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
828 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
830 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
839 if (FD_ISSET(
idx, &errorSockets)) {
840 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
841 FD_CLR(
idx, &m_socketSet);
844 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
852 if (!FD_ISSET(
idx, &readSockets)) {
858 bool is_pipe =
false;
859 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
863 while (((rc = read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
866 FD_CLR(
idx, &m_socketSet);
874 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.
sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
876 FD_CLR(
idx, &m_socketSet);
887 FD_CLR(
idx, &m_socketSet);
896 }
while (m_aliveChildren > 0);
906 if (child_fail_signal) {
907 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
909 }
else if (child_fail_exit_status) {
910 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
911 child_fail_exit_status=0;
913 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
931 itemType =
input_->nextItemType();
937 itemType =
input_->nextItemType();
940 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " <<
input_->runAuxiliary()->run();
942 input_->runAuxiliary()->beginTime());
947 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
949 std::vector<eventsetup::DataKey> dataKeys;
950 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
958 excludedData = &(itExcludeRec->second);
959 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
967 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
968 itDataKey != itDataKeyEnd;
971 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
972 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
976 recordPtr->
doGet(*itDataKey);
984 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
992 actReg_->preForkReleaseResourcesSignal_();
993 input_->doPreForkReleaseResources();
999 unsigned int childIndex = 0;
1001 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1002 std::vector<pid_t> childrenIds;
1003 childrenIds.reserve(kMaxChildren);
1004 std::vector<int> childrenSockets;
1005 childrenSockets.reserve(kMaxChildren);
1006 std::vector<int> childrenPipes;
1007 childrenPipes.reserve(kMaxChildren);
1008 std::vector<int> childrenSocketsCopy;
1009 childrenSocketsCopy.reserve(kMaxChildren);
1010 std::vector<int> childrenPipesCopy;
1011 childrenPipesCopy.reserve(kMaxChildren);
1018 int sockets[2], fd_flags;
1019 for(; childIndex < kMaxChildren; ++childIndex) {
1021 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1022 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1026 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1030 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1031 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1036 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1037 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1040 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1041 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1044 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1045 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1050 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1051 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1054 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1055 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1059 childrenPipesCopy = childrenPipes;
1060 childrenSocketsCopy = childrenSockets;
1062 pid_t
value = fork();
1068 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1071 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1078 std::stringstream stout;
1079 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1080 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1081 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1083 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1084 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1087 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1096 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1098 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1101 CPU_SET(childIndex, &mask);
1102 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1103 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1115 LogError(
"ForkingChild") <<
"failed to create a child";
1118 childrenIds.push_back(value);
1119 childrenSockets.push_back(sockets[0]);
1120 childrenPipes.push_back(pipes[0]);
1123 if(childIndex < kMaxChildren) {
1124 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1125 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1127 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1128 input_->doPostForkReacquireResources(receiver);
1129 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1134 jobReport->parentAfterFork(jobReportFile);
1144 sigset_t blockingSigSet;
1145 sigset_t unblockingSigSet;
1147 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1148 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1149 sigaddset(&blockingSigSet, SIGCHLD);
1150 sigaddset(&blockingSigSet, SIGUSR2);
1151 sigaddset(&blockingSigSet, SIGINT);
1152 sigdelset(&unblockingSigSet, SIGCHLD);
1153 sigdelset(&unblockingSigSet, SIGUSR2);
1154 sigdelset(&unblockingSigSet, SIGINT);
1155 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1159 bool too_many_fds =
false;
1160 if (pipes[1]+1 > FD_SETSIZE) {
1161 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1162 too_many_fds =
true;
1169 boost::thread senderThread(sender);
1171 if(not too_many_fds) {
1176 sigsuspend(&unblockingSigSet);
1178 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1181 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1183 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1185 LogError(
"ForkingStopping") <<
"child failed";
1188 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1191 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1192 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1193 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1194 it != itEnd; ++it) {
1197 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1198 while(num_children_done != kMaxChildren) {
1199 sigsuspend(&unblockingSigSet);
1201 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1204 senderThread.join();
1206 if (child_fail_signal) {
1207 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1208 }
else if (child_fail_exit_status) {
1209 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1211 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1215 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1220 std::vector<ModuleDescription const*>
1222 return schedule_->getAllModuleDescriptions();
1261 std::unique_ptr<statemachine::Machine>
1270 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1281 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1284 auto machine = std::make_unique<statemachine::Machine>(
1289 machine->initiate();
1295 bool returnValue =
false;
1311 std::unique_ptr<statemachine::Machine> machine;
1335 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1336 more =
input_->skipForForking();
1337 sentry.completedSuccessfully();
1340 if(size < preg_->
size()) {
1347 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1349 sentry.completedSuccessfully();
1352 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1394 <<
"Unknown next item type passed to EventProcessor\n" 1395 <<
"Please report this error to the Framework group\n";
1397 if(machine->terminated()) {
1473 if(machine->terminated()) {
1474 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1480 <<
"The boost state machine in the EventProcessor exited after\n" 1481 <<
"entering the Error state.\n";
1485 if(machine.get() !=
nullptr) {
1488 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n" 1489 <<
"Please report this error to the Framework group\n";
1496 FDEBUG(1) <<
" \treadFile\n";
1498 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1501 if(size < preg_->
size()) {
1510 sentry.completedSuccessfully();
1514 if (
fb_.get() !=
nullptr) {
1515 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1516 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1517 sentry.completedSuccessfully();
1519 FDEBUG(1) <<
"\tcloseInputFile\n";
1523 if (
fb_.get() !=
nullptr) {
1527 FDEBUG(1) <<
"\topenOutputFiles\n";
1531 if (
fb_.get() !=
nullptr) {
1535 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1540 if (
fb_.get() !=
nullptr) {
1544 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1548 if (
fb_.get() !=
nullptr) {
1552 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1562 FDEBUG(1) <<
"\tstartingNewLoop\n";
1568 looper_->setModuleChanger(&changer);
1570 looper_->setModuleChanger(
nullptr);
1574 FDEBUG(1) <<
"\tendOfLoop\n";
1581 FDEBUG(1) <<
"\trewind\n";
1586 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1590 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1593 if(subProcess.shouldWeCloseOutput()) {
1599 return schedule_->shouldWeCloseOutput();
1603 FDEBUG(1) <<
"\tdoErrorStuff\n";
1605 <<
"The EventProcessor state machine encountered an unexpected event\n" 1606 <<
"and went to the error state\n" 1607 <<
"Will attempt to terminate processing normally\n" 1608 <<
"(IF using the looper the next loop will be attempted)\n" 1609 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1616 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1619 sentry.completedSuccessfully();
1628 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1630 sentry.completedSuccessfully();
1642 globalWaitTask->increment_ref_count();
1649 globalWaitTask->wait_for_all();
1650 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1651 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1661 streamLoopWaitTask->increment_ref_count();
1665 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1673 streamLoopWaitTask->wait_for_all();
1674 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1675 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1687 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1692 sentry.completedSuccessfully();
1698 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1700 sentry.completedSuccessfully();
1706 streamLoopWaitTask->increment_ref_count();
1710 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1717 cleaningUpAfterException);
1719 streamLoopWaitTask->wait_for_all();
1720 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1721 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1731 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1732 for_all(
subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1743 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1746 sentry.completedSuccessfully();
1752 rng->preBeginLumi(lb);
1759 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1761 sentry.completedSuccessfully();
1767 globalWaitTask->increment_ref_count();
1774 globalWaitTask->wait_for_all();
1775 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1776 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1779 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1786 streamLoopWaitTask->increment_ref_count();
1790 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1797 streamLoopWaitTask->wait_for_all();
1798 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1799 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1803 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1812 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1817 sentry.completedSuccessfully();
1824 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1826 sentry.completedSuccessfully();
1832 streamLoopWaitTask->increment_ref_count();
1836 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1843 cleaningUpAfterException);
1844 streamLoopWaitTask->wait_for_all();
1845 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1846 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1849 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1856 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1857 for_all(
subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1859 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1868 <<
"EventProcessor::readRun\n" 1869 <<
"Illegal attempt to insert run into cache\n" 1870 <<
"Contact a Framework Developer\n";
1874 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1876 sentry.completedSuccessfully();
1878 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1887 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1888 input_->readAndMergeRun(*runPrincipal);
1889 sentry.completedSuccessfully();
1891 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1898 <<
"EventProcessor::readRun\n" 1899 <<
"Illegal attempt to insert lumi into cache\n" 1900 <<
"Contact a Framework Developer\n";
1904 <<
"EventProcessor::readRun\n" 1905 <<
"Illegal attempt to insert lumi into cache\n" 1906 <<
"Run is invalid\n" 1907 <<
"Contact a Framework Developer\n";
1911 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1913 sentry.completedSuccessfully();
1917 return input_->luminosityBlock();
1923 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1925 sentry.completedSuccessfully();
1927 return input_->luminosityBlock();
1944 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1945 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1950 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1951 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1955 std::atomic<bool>* finishedProcessingEvents) {
1964 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1971 handler->initializeThisThreadForUse();
1978 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1986 finishedProcessingEvents->store(
true,std::memory_order_release);
1996 firstEventInBlock_ =
false;
2000 bool expected =
false;
2011 unsigned int iStreamIndex,
2012 std::atomic<bool>* finishedProcessingEvents)
2014 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr
const* iPtr) {
2016 bool expected =
false;
2025 iTask->decrement_ref_count();
2041 iTask->decrement_ref_count();
2057 eventLoopWaitTask->increment_ref_count();
2059 eventLoopWaitTask->wait_for_all();
2065 std::atomic<bool> finishedProcessingEvents{
false};
2066 auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2074 auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2075 eventLoopWaitTask->increment_ref_count();
2078 unsigned int iStreamIndex = 0;
2079 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2080 eventLoopWaitTask->increment_ref_count();
2081 tbb::task::enqueue( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
2085 eventLoopWaitTask->increment_ref_count();
2086 eventLoopWaitTask->spawn_and_wait_for_all( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
2100 SendSourceTerminationSignalIfException sentry(
actReg_.get());
2102 sentry.completedSuccessfully();
2104 FDEBUG(1) <<
"\treadEvent\n";
2108 unsigned int iStreamIndex) {
2114 rng->postEventRead(ev);
2116 assert(pep->luminosityBlockPrincipalPtrValid());
2121 tbb::task::allocate_root(),
2122 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 2131 FDEBUG(1) <<
"\tprocessEvent\n";
2132 pep->clearEventPrincipal();
2143 afterProcessTask =
std::move(finalizeEventTask);
2149 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 2158 subProcess.doEventAsync(finalizeEventTask,*pep);
2161 finalizeEventTask.doneWaiting(*iPtr);
2168 iStreamIndex,*pep,
esp_->eventSetup());
2173 bool randomAccess =
input_->randomAccess();
2182 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
2199 FDEBUG(1) <<
"\tshouldWeStop\n";
2203 if(subProcess.terminate()) {
2229 if(iMachine.get() !=
nullptr) {
2230 if(!iMachine->terminated()) {
2236 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2238 if(iMachine->terminated()) {
2239 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void openOutputFiles() override
virtual void setExceptionMessageLumis(std::string &message) override
SharedResourcesAcquirer sourceResourcesAcquirer_
virtual void doErrorStuff() override
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
virtual statemachine::Run readAndMergeRun() override
edm::propagate_const< std::unique_ptr< InputSource > > input_
virtual bool endOfLoop() override
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
static PFTauRenderPlugin instance
bool stateMachineWasInErrorState_
bool alreadyHandlingException_
ParameterSetID id() const
virtual bool shouldWeCloseOutput() const override
void possiblyContinueAfterForkChildFailure()
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual int readLuminosityBlock() override
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
int numberOfForkedChildren_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
std::unique_ptr< statemachine::Machine > createStateMachine()
unsigned int numberOfThreads() const
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException) override
void setAtEndTransition(bool iAtEnd)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool hasRunPrincipal() const
void installCustomHandler(int signum, CFUNC func)
virtual void setExceptionMessageRuns(std::string &message) override
std::set< std::pair< std::string, std::string > > ExcludedData
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static size_t sizeForBuffer()
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
virtual void startingNewLoop() override
virtual void rewindInput() override
bool alreadyPrinted() const
bool forkProcess(std::string const &jobReportFile)
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
static size_t sizeForBuffer()
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void setEndTime(Timestamp const &time)
virtual bool shouldWeStop() const override
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
bool continueAfterChildFailure_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual statemachine::Run readRun() override
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
virtual void prepareForNextLoop() override
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual bool alreadyHandlingException() const override
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
def pipe(cmdline, input=None)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
virtual StatusCode runToCompletion() override
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
virtual void readFile() override
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
edm::RunNumber_t runNumber() const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void closeOutputFiles() override
virtual void writeRun(statemachine::Run const &run) override
virtual void closeInputFile(bool cleaningUpAfterException) override
virtual void respondToOpenInputFile() override
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
edm::ProcessHistoryID const & processHistoryID() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual int readAndMergeLumi() override
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
virtual void respondToCloseInputFile() override
virtual void readAndProcessEvent() override
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
virtual void deleteRunFromCache(statemachine::Run const &run) override
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
virtual void setExceptionMessageFiles(std::string &message) override
T get(const Candidate &c)
virtual void beginRun(statemachine::Run const &run) override
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_