62 #include "boost/bind.hpp"
63 #include "boost/thread/xtime.hpp"
75 #include <sys/types.h>
77 #include <sys/socket.h>
78 #include <sys/select.h>
79 #include <sys/fcntl.h>
88 #include "Cintex/Cintex.h"
93 std::unique_ptr<InputSource>
97 boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
98 boost::shared_ptr<ActivityRegistry>
areg,
102 if(main_input == 0) {
104 <<
"There must be exactly one source in the configuration.\n"
105 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
110 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
113 filler->fill(descriptions);
127 std::ostringstream ost;
128 ost <<
"Validating configuration of input source of type " << modtype;
144 processConfiguration.get(),
148 areg->preSourceConstructionSignal_(md);
149 std::unique_ptr<InputSource>
input;
152 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
164 std::ostringstream ost;
165 ost <<
"Constructing input source of type " << modtype;
173 boost::shared_ptr<EDLooperBase>
177 boost::shared_ptr<EDLooperBase> vLooper;
179 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
181 if(loopers.size() == 0) {
185 assert(1 == loopers.size());
187 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
205 std::vector<std::string>
const& defaultServices,
206 std::vector<std::string>
const& forcedServices) :
209 branchIDListHelper_(),
212 espController_(new eventsetup::EventSetupsController),
215 processConfiguration_(),
222 beginJobCalled_(
false),
223 shouldWeStop_(
false),
224 stateMachineWasInErrorState_(
false),
227 exceptionMessageFiles_(),
228 exceptionMessageRuns_(),
229 exceptionMessageLumis_(),
230 alreadyHandlingException_(
false),
231 forceLooperToEnd_(
false),
232 looperBeginJobRun_(
false),
233 forceESCacheClearOnNewRun_(
false),
234 numberOfForkedChildren_(0),
235 numberOfSequentialEventsPerChild_(1),
236 setCpuAffinity_(
false),
237 eventSetupDataToExcludeFromPrefetching_() {
239 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
240 processDesc->addServices(defaultServices, forcedServices);
241 init(processDesc, iToken, iLegacy);
245 std::vector<std::string>
const& defaultServices,
246 std::vector<std::string>
const& forcedServices) :
249 branchIDListHelper_(),
252 espController_(new eventsetup::EventSetupsController),
255 processConfiguration_(),
262 beginJobCalled_(
false),
263 shouldWeStop_(
false),
264 stateMachineWasInErrorState_(
false),
267 exceptionMessageFiles_(),
268 exceptionMessageRuns_(),
269 exceptionMessageLumis_(),
270 alreadyHandlingException_(
false),
271 forceLooperToEnd_(
false),
272 looperBeginJobRun_(
false),
273 forceESCacheClearOnNewRun_(
false),
274 numberOfForkedChildren_(0),
275 numberOfSequentialEventsPerChild_(1),
276 setCpuAffinity_(
false),
277 asyncStopRequestedWhileProcessingEvents_(
false),
278 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
279 eventSetupDataToExcludeFromPrefetching_()
282 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
283 processDesc->addServices(defaultServices, forcedServices);
292 branchIDListHelper_(),
295 espController_(new eventsetup::EventSetupsController),
298 processConfiguration_(),
305 beginJobCalled_(
false),
306 shouldWeStop_(
false),
307 stateMachineWasInErrorState_(
false),
310 exceptionMessageFiles_(),
311 exceptionMessageRuns_(),
312 exceptionMessageLumis_(),
313 alreadyHandlingException_(
false),
314 forceLooperToEnd_(
false),
315 looperBeginJobRun_(
false),
316 forceESCacheClearOnNewRun_(
false),
317 numberOfForkedChildren_(0),
318 numberOfSequentialEventsPerChild_(1),
319 setCpuAffinity_(
false),
320 asyncStopRequestedWhileProcessingEvents_(
false),
321 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
322 eventSetupDataToExcludeFromPrefetching_()
324 init(processDesc, token, legacy);
331 branchIDListHelper_(),
334 espController_(new eventsetup::EventSetupsController),
337 processConfiguration_(),
344 beginJobCalled_(
false),
345 shouldWeStop_(
false),
346 stateMachineWasInErrorState_(
false),
349 exceptionMessageFiles_(),
350 exceptionMessageRuns_(),
351 exceptionMessageLumis_(),
352 alreadyHandlingException_(
false),
353 forceLooperToEnd_(
false),
354 looperBeginJobRun_(
false),
355 forceESCacheClearOnNewRun_(
false),
356 numberOfForkedChildren_(0),
357 numberOfSequentialEventsPerChild_(1),
358 setCpuAffinity_(
false),
359 asyncStopRequestedWhileProcessingEvents_(
false),
360 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
361 eventSetupDataToExcludeFromPrefetching_()
365 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
369 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(config));
381 ROOT::Cintex::Cintex::Enable();
389 boost::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
401 unsigned int nThreads=1;
402 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
403 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
411 unsigned int nStreams =1;
412 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
413 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
421 unsigned int nConcurrentRuns =1;
427 unsigned int nConcurrentLumis =1;
453 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
454 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
458 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
459 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
467 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
475 boost::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
509 FDEBUG(2) << parameterSet << std::endl;
519 ep->preModuleDelayedGetSignal_.connect(std::cref(
actReg_->preModuleEventDelayedGetSignal_));
520 ep->postModuleDelayedGetSignal_.connect(std::cref(
actReg_->postModuleEventDelayedGetSignal_));
524 if(subProcessParameterSet) {
570 actReg_->preallocateSignal_(bounds);
597 ex.
addContext(
"Calling beginJob for the source");
603 actReg_->postBeginJobSignal_();
614 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
635 c.
call([actReg](){actReg->postEndJobSignal_();});
650 volatile bool child_failed =
false;
651 volatile unsigned int num_children_done = 0;
652 volatile int child_fail_exit_status = 0;
653 volatile int child_fail_signal = 0;
659 void ep_sigchld(
int, siginfo_t*,
void*) {
663 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
666 if(WIFEXITED(stat_loc)) {
668 if(0 != WEXITSTATUS(stat_loc)) {
669 child_fail_exit_status = WEXITSTATUS(stat_loc);
673 if(WIFSIGNALED(stat_loc)) {
675 child_fail_signal = WTERMSIG(stat_loc);
678 p = waitpid(-1, &stat_loc, WNOHANG);
693 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
695 while(numberOfChildren != 0) {
697 numberOfChildren /= 10;
707 class MessageSenderToSource {
709 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
713 const std::vector<int>& m_childrenPipes;
714 long const m_nEventsToProcess;
716 unsigned int m_aliveChildren;
720 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
721 std::vector<int>
const& childrenPipes,
722 long iNEventsToProcess):
723 m_childrenPipes(childrenPipes),
724 m_nEventsToProcess(iNEventsToProcess),
725 m_aliveChildren(childrenSockets.
size()),
728 FD_ZERO(&m_socketSet);
729 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
731 FD_SET(*it, &m_socketSet);
736 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
738 FD_SET(*it, &m_socketSet);
763 MessageSenderToSource::operator()() {
764 multicore::MessageForParent childMsg;
765 LogInfo(
"ForkingController") <<
"I am controller";
768 multicore::MessageForSource sndmsg;
769 sndmsg.startIndex = 0;
770 sndmsg.nIndices = m_nEventsToProcess;
773 fd_set readSockets, errorSockets;
775 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
776 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
779 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
781 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
790 if (FD_ISSET(
idx, &errorSockets)) {
791 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
792 FD_CLR(
idx, &m_socketSet);
795 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
803 if (!FD_ISSET(
idx, &readSockets)) {
809 bool is_pipe =
false;
810 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
814 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
817 FD_CLR(
idx, &m_socketSet);
825 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
827 FD_CLR(
idx, &m_socketSet);
836 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
838 FD_CLR(
idx, &m_socketSet);
843 sndmsg.startIndex += sndmsg.nIndices;
847 }
while (m_aliveChildren > 0);
855 void EventProcessor::possiblyContinueAfterForkChildFailure() {
856 if(child_failed && continueAfterChildFailure_) {
857 if (child_fail_signal) {
858 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
860 }
else if (child_fail_exit_status) {
861 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
862 child_fail_exit_status=0;
864 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
873 if(0 == numberOfForkedChildren_) {
return true;}
874 assert(0<numberOfForkedChildren_);
882 itemType = input_->nextItemType();
884 assert(itemType == InputSource::IsFile);
888 itemType = input_->nextItemType();
889 assert(itemType == InputSource::IsRun);
891 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
893 input_->runAuxiliary()->beginTime());
894 espController_->eventSetupForInstance(ts);
898 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
900 std::vector<eventsetup::DataKey> dataKeys;
901 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
906 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
908 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
909 excludedData = &(itExcludeRec->second);
910 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
918 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
919 itDataKey != itDataKeyEnd;
922 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
923 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
927 recordPtr->
doGet(*itDataKey);
935 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
940 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
943 actReg_->preForkReleaseResourcesSignal_();
944 input_->doPreForkReleaseResources();
945 schedule_->preForkReleaseResources();
950 unsigned int childIndex = 0;
951 unsigned int const kMaxChildren = numberOfForkedChildren_;
952 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
953 std::vector<pid_t> childrenIds;
954 childrenIds.reserve(kMaxChildren);
955 std::vector<int> childrenSockets;
956 childrenSockets.reserve(kMaxChildren);
957 std::vector<int> childrenPipes;
958 childrenPipes.reserve(kMaxChildren);
959 std::vector<int> childrenSocketsCopy;
960 childrenSocketsCopy.reserve(kMaxChildren);
961 std::vector<int> childrenPipesCopy;
962 childrenPipesCopy.reserve(kMaxChildren);
969 int sockets[2], fd_flags;
970 for(; childIndex < kMaxChildren; ++childIndex) {
972 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
973 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
977 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
981 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
982 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
987 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
988 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
991 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
992 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
995 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
996 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1001 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1002 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1005 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1006 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1010 childrenPipesCopy = childrenPipes;
1011 childrenSocketsCopy = childrenSockets;
1013 pid_t
value = fork();
1019 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1022 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1029 std::stringstream stout;
1030 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1031 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1032 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1034 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1035 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1038 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1039 if(setCpuAffinity_) {
1047 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1049 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1052 CPU_SET(childIndex, &mask);
1053 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1054 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1066 LogError(
"ForkingChild") <<
"failed to create a child";
1069 childrenIds.push_back(value);
1070 childrenSockets.push_back(sockets[0]);
1071 childrenPipes.push_back(pipes[0]);
1074 if(childIndex < kMaxChildren) {
1075 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1076 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1079 input_->doPostForkReacquireResources(receiver);
1080 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1085 jobReport->parentAfterFork(jobReportFile);
1095 sigset_t blockingSigSet;
1096 sigset_t unblockingSigSet;
1098 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1099 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1100 sigaddset(&blockingSigSet, SIGCHLD);
1101 sigaddset(&blockingSigSet, SIGUSR2);
1102 sigaddset(&blockingSigSet, SIGINT);
1103 sigdelset(&unblockingSigSet, SIGCHLD);
1104 sigdelset(&unblockingSigSet, SIGUSR2);
1105 sigdelset(&unblockingSigSet, SIGINT);
1106 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1110 bool too_many_fds =
false;
1111 if (pipes[1]+1 > FD_SETSIZE) {
1112 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1113 too_many_fds =
true;
1119 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1120 boost::thread senderThread(sender);
1122 if(not too_many_fds) {
1125 possiblyContinueAfterForkChildFailure();
1126 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1127 sigsuspend(&unblockingSigSet);
1128 possiblyContinueAfterForkChildFailure();
1129 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1132 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1134 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1136 LogError(
"ForkingStopping") <<
"child failed";
1139 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1142 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1143 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1144 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1145 it != itEnd; ++it) {
1148 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1149 while(num_children_done != kMaxChildren) {
1150 sigsuspend(&unblockingSigSet);
1152 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1155 senderThread.join();
1156 if(child_failed && !continueAfterChildFailure_) {
1157 if (child_fail_signal) {
1158 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1159 }
else if (child_fail_exit_status) {
1160 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1162 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1166 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1171 std::vector<ModuleDescription const*>
1172 EventProcessor::getAllModuleDescriptions()
const {
1173 return schedule_->getAllModuleDescriptions();
1177 EventProcessor::totalEvents()
const {
1178 return schedule_->totalEvents();
1182 EventProcessor::totalEventsPassed()
const {
1183 return schedule_->totalEventsPassed();
1187 EventProcessor::totalEventsFailed()
const {
1188 return schedule_->totalEventsFailed();
1192 EventProcessor::enableEndPaths(
bool active) {
1193 schedule_->enableEndPaths(active);
1197 EventProcessor::endPathsEnabled()
const {
1198 return schedule_->endPathsEnabled();
1203 schedule_->getTriggerReport(rep);
1207 EventProcessor::clearCounters() {
1208 schedule_->clearCounters();
1212 std::auto_ptr<statemachine::Machine>
1213 EventProcessor::createStateMachine() {
1220 << fileMode_ <<
".\n"
1221 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1231 << emptyRunLumiMode_ <<
".\n"
1232 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1239 machine->initiate();
1245 bool returnValue =
false;
1250 returnCode = epSignal;
1257 EventProcessor::runToCompletion() {
1260 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1261 std::auto_ptr<statemachine::Machine> machine;
1266 stateMachineWasInErrorState_ =
false;
1271 machine = createStateMachine();
1272 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1273 asyncStopRequestedWhileProcessingEvents_=
false;
1282 if(numberOfForkedChildren_ > 0) {
1283 size_t size = preg_->size();
1284 more = input_->skipForForking();
1286 if(size < preg_->
size()) {
1287 principalCache_.adjustIndexesAfterProductRegistryAddition();
1289 principalCache_.adjustEventsToNewProductRegistry(preg_);
1292 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1294 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1296 if(checkForAsyncStopRequest(returnCode)) {
1297 forceLooperToEnd_ =
true;
1299 forceLooperToEnd_ =
false;
1303 if(itemType == InputSource::IsEvent) {
1305 if(asyncStopRequestedWhileProcessingEvents_) {
1306 forceLooperToEnd_ =
true;
1308 forceLooperToEnd_ =
false;
1309 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1312 itemType = nextItemTypeFromProcessingEvents_;
1315 if(itemType == InputSource::IsEvent) {
1317 else if(itemType == InputSource::IsStop) {
1320 else if(itemType == InputSource::IsFile) {
1323 else if(itemType == InputSource::IsRun) {
1324 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1326 else if(itemType == InputSource::IsLumi) {
1329 else if(itemType == InputSource::IsSynchronize) {
1335 <<
"Unknown next item type passed to EventProcessor\n"
1336 <<
"Please report this error to the Framework group\n";
1338 if(machine->terminated()) {
1396 alreadyHandlingException_ =
true;
1397 terminateMachine(machine);
1398 alreadyHandlingException_ =
false;
1399 if (!exceptionMessageLumis_.empty()) {
1402 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1405 if (!exceptionMessageRuns_.empty()) {
1408 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1411 if (!exceptionMessageFiles_.empty()) {
1414 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1420 if(machine->terminated()) {
1421 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1425 if(stateMachineWasInErrorState_) {
1427 <<
"The boost state machine in the EventProcessor exited after\n"
1428 <<
"entering the Error state.\n";
1432 if(machine.get() != 0) {
1433 terminateMachine(machine);
1435 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1436 <<
"Please report this error to the Framework group\n";
1442 void EventProcessor::readFile() {
1443 FDEBUG(1) <<
" \treadFile\n";
1444 size_t size = preg_->size();
1445 fb_ = input_->readFile();
1446 if(size < preg_->
size()) {
1447 principalCache_.adjustIndexesAfterProductRegistryAddition();
1449 principalCache_.adjustEventsToNewProductRegistry(preg_);
1450 if(numberOfForkedChildren_ > 0) {
1451 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1455 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1456 if (fb_.get() !=
nullptr) {
1457 input_->closeFile(fb_.get(), cleaningUpAfterException);
1459 FDEBUG(1) <<
"\tcloseInputFile\n";
1462 void EventProcessor::openOutputFiles() {
1463 if (fb_.get() !=
nullptr) {
1464 schedule_->openOutputFiles(*fb_);
1465 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1467 FDEBUG(1) <<
"\topenOutputFiles\n";
1470 void EventProcessor::closeOutputFiles() {
1471 if (fb_.get() !=
nullptr) {
1472 schedule_->closeOutputFiles();
1473 if(hasSubProcess()) subProcess_->closeOutputFiles();
1475 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1478 void EventProcessor::respondToOpenInputFile() {
1479 if (fb_.get() !=
nullptr) {
1480 schedule_->respondToOpenInputFile(*fb_);
1481 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1483 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1486 void EventProcessor::respondToCloseInputFile() {
1487 if (fb_.get() !=
nullptr) {
1488 schedule_->respondToCloseInputFile(*fb_);
1489 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1491 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1494 void EventProcessor::startingNewLoop() {
1495 shouldWeStop_ =
false;
1498 if(looper_ && looperBeginJobRun_) {
1499 looper_->doStartingNewLoop();
1501 FDEBUG(1) <<
"\tstartingNewLoop\n";
1504 bool EventProcessor::endOfLoop() {
1507 looper_->setModuleChanger(&changer);
1509 looper_->setModuleChanger(
nullptr);
1510 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1513 FDEBUG(1) <<
"\tendOfLoop\n";
1517 void EventProcessor::rewindInput() {
1520 FDEBUG(1) <<
"\trewind\n";
1523 void EventProcessor::prepareForNextLoop() {
1524 looper_->prepareForNextLoop(esp_.get());
1525 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1528 bool EventProcessor::shouldWeCloseOutput()
const {
1529 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1530 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1533 void EventProcessor::doErrorStuff() {
1534 FDEBUG(1) <<
"\tdoErrorStuff\n";
1536 <<
"The EventProcessor state machine encountered an unexpected event\n"
1537 <<
"and went to the error state\n"
1538 <<
"Will attempt to terminate processing normally\n"
1539 <<
"(IF using the looper the next loop will be attempted)\n"
1540 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1541 stateMachineWasInErrorState_ =
true;
1546 input_->doBeginRun(runPrincipal, &processContext_);
1549 if(forceESCacheClearOnNewRun_){
1550 espController_->forceCacheClear();
1552 espController_->eventSetupForInstance(ts);
1554 if(looper_ && looperBeginJobRun_==
false) {
1556 looper_->beginOfJob(es);
1557 looperBeginJobRun_ =
true;
1558 looper_->doStartingNewLoop();
1562 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1563 if(hasSubProcess()) {
1564 subProcess_->doBeginRun(runPrincipal, ts);
1569 looper_->doBeginRun(runPrincipal, es, &processContext_);
1573 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1574 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1575 if(hasSubProcess()) {
1576 subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1588 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1589 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1591 espController_->eventSetupForInstance(ts);
1594 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1596 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1597 if(hasSubProcess()) {
1598 subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1608 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1609 if(hasSubProcess()) {
1610 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1615 looper_->doEndRun(runPrincipal, es, &processContext_);
1621 input_->doBeginLumi(lumiPrincipal, &processContext_);
1626 rng->preBeginLumi(lb);
1632 espController_->eventSetupForInstance(ts);
1636 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1637 if(hasSubProcess()) {
1638 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1641 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1643 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1646 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1648 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1649 if(hasSubProcess()) {
1650 subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1654 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1662 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1667 espController_->eventSetupForInstance(ts);
1670 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1672 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1673 if(hasSubProcess()) {
1674 subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1678 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1684 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1685 if(hasSubProcess()) {
1686 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1689 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1691 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1696 if (principalCache_.hasRunPrincipal()) {
1698 <<
"EventProcessor::readRun\n"
1699 <<
"Illegal attempt to insert run into cache\n"
1700 <<
"Contact a Framework Developer\n";
1702 boost::shared_ptr<RunPrincipal> rp(
new RunPrincipal(input_->runAuxiliary(),
1704 *processConfiguration_,
1705 historyAppender_.get(),
1707 input_->readRun(*rp, *historyAppender_);
1708 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1709 principalCache_.insert(rp);
1714 principalCache_.merge(input_->runAuxiliary(), preg_);
1715 auto runPrincipal =principalCache_.runPrincipalPtr();
1716 input_->readAndMergeRun(*runPrincipal);
1717 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1718 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1721 int EventProcessor::readLuminosityBlock() {
1722 if (principalCache_.hasLumiPrincipal()) {
1724 <<
"EventProcessor::readRun\n"
1725 <<
"Illegal attempt to insert lumi into cache\n"
1726 <<
"Contact a Framework Developer\n";
1728 if (!principalCache_.hasRunPrincipal()) {
1730 <<
"EventProcessor::readRun\n"
1731 <<
"Illegal attempt to insert lumi into cache\n"
1732 <<
"Run is invalid\n"
1733 <<
"Contact a Framework Developer\n";
1737 *processConfiguration_,
1738 historyAppender_.get(),
1740 input_->readLuminosityBlock(*lbp, *historyAppender_);
1741 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1742 principalCache_.insert(lbp);
1743 return input_->luminosityBlock();
1746 int EventProcessor::readAndMergeLumi() {
1747 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1748 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1749 return input_->luminosityBlock();
1765 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1766 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1767 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1771 principalCache_.deleteLumi(phid, run, lumi);
1772 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1773 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1776 void EventProcessor::readAndProcessEvent() {
1777 if(numberOfForkedChildren_>0) {
1785 asyncStopRequestedWhileProcessingEvents_ =
false;
1788 unsigned int nextStreamIndex=0;
1789 const unsigned int kNumStreams = preallocations_.numberOfStreams();
1791 readEvent(nextStreamIndex);
1793 nextStreamIndex = (nextStreamIndex+1) % kNumStreams;
1795 if(shouldWeStop()) {
1798 itemType = input_->nextItemType();
1799 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1802 }
while (itemType == InputSource::IsEvent);
1803 nextItemTypeFromProcessingEvents_ = itemType;
1805 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
1807 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
1809 input_->readEvent(
event, streamContext);
1810 FDEBUG(1) <<
"\treadEvent\n";
1813 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1814 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1815 assert(pep->luminosityBlockPrincipalPtrValid());
1816 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1817 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1825 schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
1826 if(hasSubProcess()) {
1827 subProcess_->doEvent(*pep);
1833 bool randomAccess = input_->randomAccess();
1841 StreamContext streamContext(pep->streamID(), &processContext_);
1842 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
1847 input_->skipEvents(-2);
1855 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
1859 FDEBUG(1) <<
"\tprocessEvent\n";
1860 pep->clearEventPrincipal();
1863 bool EventProcessor::shouldWeStop()
const {
1864 FDEBUG(1) <<
"\tshouldWeStop\n";
1865 if(shouldWeStop_)
return true;
1866 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
1870 exceptionMessageFiles_ =
message;
1874 exceptionMessageRuns_ =
message;
1878 exceptionMessageLumis_ =
message;
1881 bool EventProcessor::alreadyHandlingException()
const {
1882 return alreadyHandlingException_;
1885 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
1886 if(iMachine.get() != 0) {
1887 if(!iMachine->terminated()) {
1888 forceLooperToEnd_ =
true;
1890 forceLooperToEnd_ =
false;
1893 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
1895 if(iMachine->terminated()) {
1896 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
ProcessContext processContext_
void clear()
Not thread safe.
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet ¶meterSet)
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
bool lastOperationSucceeded() const
void call(boost::function< void(void)>)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< ActivityRegistry > actReg_
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< ProcessConfiguration > processConfiguration_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
boost::shared_ptr< edm::ParameterSet > parameterSet()
bool continueAfterChildFailure_
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
unsigned int numberOfLuminosityBlocks() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
boost::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void charPtrToEDM(char const *c)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
volatile std::atomic< bool > shutdown_flag false
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
void validate(ParameterSet &pset, std::string const &moduleLabel) const
std::auto_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)