70 #include "boost/thread/xtime.hpp" 71 #include "boost/range/adaptor/reversed.hpp" 85 #include <sys/types.h> 87 #include <sys/socket.h> 88 #include <sys/select.h> 89 #include <sys/fcntl.h> 103 class SendSourceTerminationSignalIfException {
107 ~SendSourceTerminationSignalIfException() {
112 void completedSuccessfully() {
124 std::unique_ptr<InputSource>
127 std::shared_ptr<ProductRegistry> preg,
128 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
129 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
130 std::shared_ptr<ActivityRegistry> areg,
131 std::shared_ptr<ProcessConfiguration const> processConfiguration,
134 if(main_input == 0) {
136 <<
"There must be exactly one source in the configuration.\n" 137 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
142 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
145 filler->fill(descriptions);
149 descriptions.validate(*main_input,
std::string(
"source"));
153 std::ostringstream ost;
154 ost <<
"Validating configuration of input source of type " << modtype;
170 processConfiguration.get(),
171 ModuleDescription::getUniqueID());
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
183 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
189 std::ostringstream ost;
190 ost <<
"Constructing input source of type " << modtype;
198 std::shared_ptr<EDLooperBase>
202 std::shared_ptr<EDLooperBase> vLooper;
204 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
206 if(loopers.size() == 0) {
210 assert(1 == loopers.size());
212 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
229 std::vector<std::string>
const& defaultServices,
230 std::vector<std::string>
const& forcedServices) :
233 branchIDListHelper_(),
236 espController_(new eventsetup::EventSetupsController),
239 processConfiguration_(),
245 deferredExceptionPtrIsSet_(
false),
249 beginJobCalled_(
false),
250 shouldWeStop_(
false),
251 stateMachineWasInErrorState_(
false),
254 exceptionMessageFiles_(),
255 exceptionMessageRuns_(),
256 exceptionMessageLumis_(),
257 alreadyHandlingException_(
false),
258 forceLooperToEnd_(
false),
259 looperBeginJobRun_(
false),
260 forceESCacheClearOnNewRun_(
false),
261 numberOfForkedChildren_(0),
262 numberOfSequentialEventsPerChild_(1),
263 setCpuAffinity_(
false),
264 eventSetupDataToExcludeFromPrefetching_() {
266 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
267 processDesc->addServices(defaultServices, forcedServices);
268 init(processDesc, iToken, iLegacy);
272 std::vector<std::string>
const& defaultServices,
273 std::vector<std::string>
const& forcedServices) :
312 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
313 processDesc->addServices(defaultServices, forcedServices);
357 init(processDesc, token, legacy);
401 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
405 auto processDesc = std::make_shared<ProcessDesc>(
config);
423 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
427 bool const hasSubProcesses = !subProcessVParameterSet.empty();
435 unsigned int nThreads=1;
436 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
437 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
445 unsigned int nStreams =1;
446 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
447 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
452 edm::LogInfo(
"StreamSetup") <<
"setting # streams "<<nStreams;
457 unsigned int nConcurrentRuns =1;
463 unsigned int nConcurrentLumis =1;
489 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
490 for(
auto const& ps : excluded) {
492 ps.getUntrackedParameter<
std::string>(
"label",
""));
502 auto& serviceSets = processDesc->getServicesPSets();
511 handler->willBeUsingThreads();
515 std::shared_ptr<CommonParams>
common(items.
initMisc(*parameterSet));
557 FDEBUG(2) << parameterSet << std::endl;
569 for(
auto& subProcessPSet : subProcessVParameterSet) {
617 actReg_->preallocateSignal_(bounds);
644 ex.
addContext(
"Calling beginJob for the source");
650 actReg_->postBeginJobSignal_();
661 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
670 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
674 c.
call([actReg](){actReg->preEndJobSignal_();});
683 c.
call([actReg](){actReg->postEndJobSignal_();});
698 volatile bool child_failed =
false;
699 volatile unsigned int num_children_done = 0;
700 volatile int child_fail_exit_status = 0;
701 volatile int child_fail_signal = 0;
707 void ep_sigchld(
int, siginfo_t*,
void*) {
711 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
714 if(WIFEXITED(stat_loc)) {
716 if(0 != WEXITSTATUS(stat_loc)) {
717 child_fail_exit_status = WEXITSTATUS(stat_loc);
721 if(WIFSIGNALED(stat_loc)) {
723 child_fail_signal = WTERMSIG(stat_loc);
726 p = waitpid(-1, &stat_loc, WNOHANG);
741 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
743 while(numberOfChildren != 0) {
745 numberOfChildren /= 10;
755 class MessageSenderToSource {
757 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
761 const std::vector<int>& m_childrenPipes;
762 long const m_nEventsToProcess;
764 unsigned int m_aliveChildren;
768 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
769 std::vector<int>
const& childrenPipes,
770 long iNEventsToProcess):
771 m_childrenPipes(childrenPipes),
772 m_nEventsToProcess(iNEventsToProcess),
773 m_aliveChildren(childrenSockets.size()),
776 FD_ZERO(&m_socketSet);
777 for (
auto const socket : childrenSockets) {
778 FD_SET(socket, &m_socketSet);
779 if (socket > m_maxFd) {
783 for (
auto const pipe : childrenPipes) {
784 FD_SET(
pipe, &m_socketSet);
785 if (
pipe > m_maxFd) {
809 MessageSenderToSource::operator()() {
811 LogInfo(
"ForkingController") <<
"I am controller";
816 sndmsg.
nIndices = m_nEventsToProcess;
819 fd_set readSockets, errorSockets;
821 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
822 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
825 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
827 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
836 if (FD_ISSET(
idx, &errorSockets)) {
837 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
838 FD_CLR(
idx, &m_socketSet);
841 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
849 if (!FD_ISSET(
idx, &readSockets)) {
855 bool is_pipe =
false;
856 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
860 while (((rc = read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
863 FD_CLR(
idx, &m_socketSet);
871 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.
sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
873 FD_CLR(
idx, &m_socketSet);
884 FD_CLR(
idx, &m_socketSet);
893 }
while (m_aliveChildren > 0);
903 if (child_fail_signal) {
904 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
906 }
else if (child_fail_exit_status) {
907 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
908 child_fail_exit_status=0;
910 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
928 itemType =
input_->nextItemType();
934 itemType =
input_->nextItemType();
937 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " <<
input_->runAuxiliary()->run();
939 input_->runAuxiliary()->beginTime());
944 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
946 std::vector<eventsetup::DataKey> dataKeys;
947 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
955 excludedData = &(itExcludeRec->second);
956 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
964 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
965 itDataKey != itDataKeyEnd;
968 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
969 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
973 recordPtr->
doGet(*itDataKey);
981 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
989 actReg_->preForkReleaseResourcesSignal_();
990 input_->doPreForkReleaseResources();
996 unsigned int childIndex = 0;
998 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
999 std::vector<pid_t> childrenIds;
1000 childrenIds.reserve(kMaxChildren);
1001 std::vector<int> childrenSockets;
1002 childrenSockets.reserve(kMaxChildren);
1003 std::vector<int> childrenPipes;
1004 childrenPipes.reserve(kMaxChildren);
1005 std::vector<int> childrenSocketsCopy;
1006 childrenSocketsCopy.reserve(kMaxChildren);
1007 std::vector<int> childrenPipesCopy;
1008 childrenPipesCopy.reserve(kMaxChildren);
1015 int sockets[2], fd_flags;
1016 for(; childIndex < kMaxChildren; ++childIndex) {
1018 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1019 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1023 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1027 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1028 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1033 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1034 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1037 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1038 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1041 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1042 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1047 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1048 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1051 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1052 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1056 childrenPipesCopy = childrenPipes;
1057 childrenSocketsCopy = childrenSockets;
1059 pid_t
value = fork();
1065 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1068 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1075 std::stringstream stout;
1076 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1077 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1078 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1080 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1081 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1084 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1093 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1095 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1098 CPU_SET(childIndex, &mask);
1099 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1100 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1112 LogError(
"ForkingChild") <<
"failed to create a child";
1115 childrenIds.push_back(value);
1116 childrenSockets.push_back(sockets[0]);
1117 childrenPipes.push_back(pipes[0]);
1120 if(childIndex < kMaxChildren) {
1121 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1122 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1124 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1125 input_->doPostForkReacquireResources(receiver);
1126 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1131 jobReport->parentAfterFork(jobReportFile);
1141 sigset_t blockingSigSet;
1142 sigset_t unblockingSigSet;
1144 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1145 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1146 sigaddset(&blockingSigSet, SIGCHLD);
1147 sigaddset(&blockingSigSet, SIGUSR2);
1148 sigaddset(&blockingSigSet, SIGINT);
1149 sigdelset(&unblockingSigSet, SIGCHLD);
1150 sigdelset(&unblockingSigSet, SIGUSR2);
1151 sigdelset(&unblockingSigSet, SIGINT);
1152 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1156 bool too_many_fds =
false;
1157 if (pipes[1]+1 > FD_SETSIZE) {
1158 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1159 too_many_fds =
true;
1166 boost::thread senderThread(sender);
1168 if(not too_many_fds) {
1173 sigsuspend(&unblockingSigSet);
1175 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1178 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1180 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1182 LogError(
"ForkingStopping") <<
"child failed";
1185 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1188 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1189 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1190 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1191 it != itEnd; ++it) {
1194 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1195 while(num_children_done != kMaxChildren) {
1196 sigsuspend(&unblockingSigSet);
1198 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1201 senderThread.join();
1203 if (child_fail_signal) {
1204 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1205 }
else if (child_fail_exit_status) {
1206 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1208 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1212 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1217 std::vector<ModuleDescription const*>
1219 return schedule_->getAllModuleDescriptions();
1258 std::unique_ptr<statemachine::Machine>
1267 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1278 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1281 auto machine = std::make_unique<statemachine::Machine>(
1286 machine->initiate();
1292 bool returnValue =
false;
1308 std::unique_ptr<statemachine::Machine> machine;
1332 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1333 more =
input_->skipForForking();
1334 sentry.completedSuccessfully();
1337 if(size < preg_->
size()) {
1344 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1346 sentry.completedSuccessfully();
1349 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1391 <<
"Unknown next item type passed to EventProcessor\n" 1392 <<
"Please report this error to the Framework group\n";
1394 if(machine->terminated()) {
1470 if(machine->terminated()) {
1471 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1477 <<
"The boost state machine in the EventProcessor exited after\n" 1478 <<
"entering the Error state.\n";
1482 if(machine.get() !=
nullptr) {
1485 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n" 1486 <<
"Please report this error to the Framework group\n";
1493 FDEBUG(1) <<
" \treadFile\n";
1495 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1498 if(size < preg_->
size()) {
1507 sentry.completedSuccessfully();
1511 if (
fb_.get() !=
nullptr) {
1512 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1513 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1514 sentry.completedSuccessfully();
1516 FDEBUG(1) <<
"\tcloseInputFile\n";
1520 if (
fb_.get() !=
nullptr) {
1524 FDEBUG(1) <<
"\topenOutputFiles\n";
1528 if (
fb_.get() !=
nullptr) {
1532 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1537 if (
fb_.get() !=
nullptr) {
1541 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1545 if (
fb_.get() !=
nullptr) {
1549 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1559 FDEBUG(1) <<
"\tstartingNewLoop\n";
1565 looper_->setModuleChanger(&changer);
1567 looper_->setModuleChanger(
nullptr);
1571 FDEBUG(1) <<
"\tendOfLoop\n";
1578 FDEBUG(1) <<
"\trewind\n";
1583 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1587 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1590 if(subProcess.shouldWeCloseOutput()) {
1596 return schedule_->shouldWeCloseOutput();
1600 FDEBUG(1) <<
"\tdoErrorStuff\n";
1602 <<
"The EventProcessor state machine encountered an unexpected event\n" 1603 <<
"and went to the error state\n" 1604 <<
"Will attempt to terminate processing normally\n" 1605 <<
"(IF using the looper the next loop will be attempted)\n" 1606 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1613 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1616 sentry.completedSuccessfully();
1625 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1627 sentry.completedSuccessfully();
1638 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1639 for_all(
subProcesses_, [&runPrincipal, &ts](
auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1648 streamLoopWaitTask->increment_ref_count();
1652 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1660 streamLoopWaitTask->wait_for_all();
1661 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1662 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1674 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1679 sentry.completedSuccessfully();
1685 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1687 sentry.completedSuccessfully();
1693 streamLoopWaitTask->increment_ref_count();
1697 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1704 cleaningUpAfterException);
1706 streamLoopWaitTask->wait_for_all();
1707 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1708 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1717 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1718 for_all(
subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1729 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1732 sentry.completedSuccessfully();
1738 rng->preBeginLumi(lb);
1745 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1747 sentry.completedSuccessfully();
1752 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1753 for_all(
subProcesses_, [&lumiPrincipal, &ts](
auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1755 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1762 streamLoopWaitTask->increment_ref_count();
1766 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1773 streamLoopWaitTask->wait_for_all();
1774 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1775 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1779 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1788 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1793 sentry.completedSuccessfully();
1800 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1802 sentry.completedSuccessfully();
1808 streamLoopWaitTask->increment_ref_count();
1812 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1819 cleaningUpAfterException);
1820 streamLoopWaitTask->wait_for_all();
1821 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1822 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1825 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1831 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1832 for_all(
subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1834 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1843 <<
"EventProcessor::readRun\n" 1844 <<
"Illegal attempt to insert run into cache\n" 1845 <<
"Contact a Framework Developer\n";
1849 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1851 sentry.completedSuccessfully();
1853 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1862 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1863 input_->readAndMergeRun(*runPrincipal);
1864 sentry.completedSuccessfully();
1866 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1873 <<
"EventProcessor::readRun\n" 1874 <<
"Illegal attempt to insert lumi into cache\n" 1875 <<
"Contact a Framework Developer\n";
1879 <<
"EventProcessor::readRun\n" 1880 <<
"Illegal attempt to insert lumi into cache\n" 1881 <<
"Run is invalid\n" 1882 <<
"Contact a Framework Developer\n";
1886 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1888 sentry.completedSuccessfully();
1892 return input_->luminosityBlock();
1898 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1900 sentry.completedSuccessfully();
1902 return input_->luminosityBlock();
1919 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1920 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1925 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1926 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1930 std::atomic<bool>* finishedProcessingEvents) {
1939 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1946 handler->initializeThisThreadForUse();
1953 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1961 finishedProcessingEvents->store(
true,std::memory_order_release);
1971 firstEventInBlock_ =
false;
1975 bool expected =
false;
1986 unsigned int iStreamIndex,
1987 std::atomic<bool>* finishedProcessingEvents)
1989 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr
const* iPtr) {
1991 bool expected =
false;
2000 iTask->decrement_ref_count();
2016 iTask->decrement_ref_count();
2032 eventLoopWaitTask->increment_ref_count();
2034 eventLoopWaitTask->wait_for_all();
2040 std::atomic<bool> finishedProcessingEvents{
false};
2041 auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2049 auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2050 eventLoopWaitTask->increment_ref_count();
2053 unsigned int iStreamIndex = 0;
2054 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2055 eventLoopWaitTask->increment_ref_count();
2056 tbb::task::enqueue( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
2060 eventLoopWaitTask->increment_ref_count();
2061 eventLoopWaitTask->spawn_and_wait_for_all( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
2075 SendSourceTerminationSignalIfException sentry(
actReg_.get());
2077 sentry.completedSuccessfully();
2079 FDEBUG(1) <<
"\treadEvent\n";
2083 unsigned int iStreamIndex) {
2089 rng->postEventRead(ev);
2091 assert(pep->luminosityBlockPrincipalPtrValid());
2096 tbb::task::allocate_root(),
2097 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 2106 FDEBUG(1) <<
"\tprocessEvent\n";
2107 pep->clearEventPrincipal();
2118 afterProcessTask =
std::move(finalizeEventTask);
2124 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 2133 subProcess.doEventAsync(finalizeEventTask,*pep);
2136 finalizeEventTask.doneWaiting(*iPtr);
2143 iStreamIndex,*pep,
esp_->eventSetup());
2148 bool randomAccess =
input_->randomAccess();
2157 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
2174 FDEBUG(1) <<
"\tshouldWeStop\n";
2178 if(subProcess.terminate()) {
2204 if(iMachine.get() !=
nullptr) {
2205 if(!iMachine->terminated()) {
2211 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2213 if(iMachine->terminated()) {
2214 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void openOutputFiles() override
virtual void setExceptionMessageLumis(std::string &message) override
SharedResourcesAcquirer sourceResourcesAcquirer_
virtual void doErrorStuff() override
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
virtual statemachine::Run readAndMergeRun() override
edm::propagate_const< std::unique_ptr< InputSource > > input_
virtual bool endOfLoop() override
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
static PFTauRenderPlugin instance
bool stateMachineWasInErrorState_
bool alreadyHandlingException_
ParameterSetID id() const
virtual bool shouldWeCloseOutput() const override
void possiblyContinueAfterForkChildFailure()
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual int readLuminosityBlock() override
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
int numberOfForkedChildren_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
std::unique_ptr< statemachine::Machine > createStateMachine()
unsigned int numberOfThreads() const
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException) override
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool hasRunPrincipal() const
void installCustomHandler(int signum, CFUNC func)
virtual void setExceptionMessageRuns(std::string &message) override
std::set< std::pair< std::string, std::string > > ExcludedData
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static size_t sizeForBuffer()
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
virtual void startingNewLoop() override
virtual void rewindInput() override
bool alreadyPrinted() const
bool forkProcess(std::string const &jobReportFile)
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
static size_t sizeForBuffer()
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void setEndTime(Timestamp const &time)
virtual bool shouldWeStop() const override
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
bool continueAfterChildFailure_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual statemachine::Run readRun() override
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
virtual void prepareForNextLoop() override
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual bool alreadyHandlingException() const override
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
def pipe(cmdline, input=None)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
virtual StatusCode runToCompletion() override
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
virtual void readFile() override
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
edm::RunNumber_t runNumber() const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void closeOutputFiles() override
virtual void writeRun(statemachine::Run const &run) override
virtual void closeInputFile(bool cleaningUpAfterException) override
virtual void respondToOpenInputFile() override
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
edm::ProcessHistoryID const & processHistoryID() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual int readAndMergeLumi() override
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
virtual void respondToCloseInputFile() override
virtual void readAndProcessEvent() override
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
virtual void deleteRunFromCache(statemachine::Run const &run) override
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
virtual void setExceptionMessageFiles(std::string &message) override
T get(const Candidate &c)
virtual void beginRun(statemachine::Run const &run) override
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_