61 #include "boost/bind.hpp"
62 #include "boost/thread/xtime.hpp"
74 #include <sys/types.h>
76 #include <sys/socket.h>
77 #include <sys/select.h>
78 #include <sys/fcntl.h>
87 #include "Cintex/Cintex.h"
91 namespace event_processor {
105 using namespace event_processor;
108 template <
typename T>
109 class ScheduleSignalSentry {
112 a_(a), principal_(principal), es_(es), allowThrow_(
false) {
113 if (a_) T::preScheduleSignal(a_, principal_);
117 if (a_)
if (principal_) T::postScheduleSignal(a_, principal_, es_);
119 if( allowThrow_) {
throw;}
129 typename T::MyPrincipal* principal_;
131 bool allowThrow_ =
true;
140 char const* stateNames[] = {
155 char const* msgNames[] = {
271 std::unique_ptr<InputSource>
275 boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
276 boost::shared_ptr<ActivityRegistry> areg,
277 boost::shared_ptr<ProcessConfiguration const> processConfiguration) {
279 if(main_input == 0) {
281 <<
"There must be exactly one source in the configuration.\n"
282 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
287 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
290 filler->fill(descriptions);
304 std::ostringstream ost;
305 ost <<
"Validating configuration of input source of type " << modtype;
321 processConfiguration.get());
324 areg->preSourceConstructionSignal_(md);
325 std::unique_ptr<InputSource>
input;
338 areg->postSourceConstructionSignal_(md);
339 std::ostringstream ost;
340 ost <<
"Constructing input source of type " << modtype;
344 areg->postSourceConstructionSignal_(md);
349 boost::shared_ptr<EDLooperBase>
353 boost::shared_ptr<EDLooperBase> vLooper;
355 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
357 if(loopers.size() == 0) {
361 assert(1 == loopers.size());
363 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
381 std::vector<std::string>
const& defaultServices,
382 std::vector<std::string>
const& forcedServices) :
383 preProcessEventSignal_(),
384 postProcessEventSignal_(),
387 branchIDListHelper_(),
390 espController_(new eventsetup::EventSetupsController),
393 processConfiguration_(),
412 shouldWeStop_(false),
413 stateMachineWasInErrorState_(false),
416 exceptionMessageFiles_(),
417 exceptionMessageRuns_(),
418 exceptionMessageLumis_(),
419 alreadyHandlingException_(false),
420 forceLooperToEnd_(false),
421 looperBeginJobRun_(false),
422 forceESCacheClearOnNewRun_(false),
423 numberOfForkedChildren_(0),
424 numberOfSequentialEventsPerChild_(1),
425 setCpuAffinity_(false),
426 eventSetupDataToExcludeFromPrefetching_() {
428 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
429 processDesc->addServices(defaultServices, forcedServices);
430 init(processDesc, iToken, iLegacy);
434 std::vector<std::string>
const& defaultServices,
435 std::vector<std::string>
const& forcedServices) :
436 preProcessEventSignal_(),
437 postProcessEventSignal_(),
440 branchIDListHelper_(),
443 espController_(new eventsetup::EventSetupsController),
446 processConfiguration_(),
465 shouldWeStop_(false),
466 stateMachineWasInErrorState_(false),
469 exceptionMessageFiles_(),
470 exceptionMessageRuns_(),
471 exceptionMessageLumis_(),
472 alreadyHandlingException_(false),
473 forceLooperToEnd_(false),
474 looperBeginJobRun_(false),
475 forceESCacheClearOnNewRun_(false),
476 numberOfForkedChildren_(0),
477 numberOfSequentialEventsPerChild_(1),
478 setCpuAffinity_(false),
479 eventSetupDataToExcludeFromPrefetching_() {
481 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
482 processDesc->addServices(defaultServices, forcedServices);
489 preProcessEventSignal_(),
490 postProcessEventSignal_(),
493 branchIDListHelper_(),
496 espController_(new eventsetup::EventSetupsController),
499 processConfiguration_(),
518 shouldWeStop_(false),
519 stateMachineWasInErrorState_(false),
522 exceptionMessageFiles_(),
523 exceptionMessageRuns_(),
524 exceptionMessageLumis_(),
525 alreadyHandlingException_(false),
526 forceLooperToEnd_(false),
527 looperBeginJobRun_(false),
528 forceESCacheClearOnNewRun_(false),
529 numberOfForkedChildren_(0),
530 numberOfSequentialEventsPerChild_(1),
531 setCpuAffinity_(false),
532 eventSetupDataToExcludeFromPrefetching_() {
533 init(processDesc, token, legacy);
538 preProcessEventSignal_(),
539 postProcessEventSignal_(),
542 branchIDListHelper_(),
545 espController_(new eventsetup::EventSetupsController),
548 processConfiguration_(),
567 shouldWeStop_(false),
568 stateMachineWasInErrorState_(false),
571 exceptionMessageFiles_(),
572 exceptionMessageRuns_(),
573 exceptionMessageLumis_(),
574 alreadyHandlingException_(false),
575 forceLooperToEnd_(false),
576 looperBeginJobRun_(false),
577 forceESCacheClearOnNewRun_(false),
578 numberOfForkedChildren_(0),
579 numberOfSequentialEventsPerChild_(1),
580 setCpuAffinity_(false),
581 eventSetupDataToExcludeFromPrefetching_() {
584 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
588 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(config));
600 ROOT::Cintex::Cintex::Enable();
602 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
616 setCpuAffinity_ = forking.getUntrackedParameter<
bool>(
"setCpuAffinity",
false);
618 std::vector<ParameterSet>
const& excluded = forking.getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
619 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
623 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
624 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
632 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
640 boost::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
665 FDEBUG(2) << parameterSet << std::endl;
673 if(subProcessParameterSet) {
700 psetRegistry->
data().clear();
746 ex.
addContext(
"Calling beginJob for the source");
752 actReg_->postBeginJobSignal_();
758 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
775 c.
call([actReg](){actReg->postEndJobSignal_();});
790 volatile bool child_failed =
false;
791 volatile unsigned int num_children_done = 0;
792 volatile int child_fail_exit_status = 0;
793 volatile int child_fail_signal = 0;
799 void ep_sigchld(
int, siginfo_t*,
void*) {
803 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
806 if(WIFEXITED(stat_loc)) {
808 if(0 != WEXITSTATUS(stat_loc)) {
809 child_fail_exit_status = WEXITSTATUS(stat_loc);
813 if(WIFSIGNALED(stat_loc)) {
815 child_fail_signal = WTERMSIG(stat_loc);
818 p = waitpid(-1, &stat_loc, WNOHANG);
833 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
835 while(numberOfChildren != 0) {
837 numberOfChildren /= 10;
847 class MessageSenderToSource {
849 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
853 const std::vector<int>& m_childrenPipes;
854 long const m_nEventsToProcess;
856 unsigned int m_aliveChildren;
860 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
861 std::vector<int>
const& childrenPipes,
862 long iNEventsToProcess):
863 m_childrenPipes(childrenPipes),
864 m_nEventsToProcess(iNEventsToProcess),
865 m_aliveChildren(childrenSockets.
size()),
868 FD_ZERO(&m_socketSet);
869 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
871 FD_SET(*it, &m_socketSet);
876 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
878 FD_SET(*it, &m_socketSet);
903 MessageSenderToSource::operator()() {
904 multicore::MessageForParent childMsg;
905 LogInfo(
"ForkingController") <<
"I am controller";
908 multicore::MessageForSource sndmsg;
909 sndmsg.startIndex = 0;
910 sndmsg.nIndices = m_nEventsToProcess;
913 fd_set readSockets, errorSockets;
915 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
916 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
919 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
921 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
930 if (FD_ISSET(
idx, &errorSockets)) {
931 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
932 FD_CLR(
idx, &m_socketSet);
935 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
943 if (!FD_ISSET(
idx, &readSockets)) {
949 bool is_pipe =
false;
950 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
954 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
957 FD_CLR(
idx, &m_socketSet);
965 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
967 FD_CLR(
idx, &m_socketSet);
976 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
978 FD_CLR(
idx, &m_socketSet);
983 sndmsg.startIndex += sndmsg.nIndices;
987 }
while (m_aliveChildren > 0);
995 void EventProcessor::possiblyContinueAfterForkChildFailure() {
996 if(child_failed && continueAfterChildFailure_) {
997 if (child_fail_signal) {
998 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1000 }
else if (child_fail_exit_status) {
1001 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1002 child_fail_exit_status=0;
1004 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1006 child_failed =
false;
1013 if(0 == numberOfForkedChildren_) {
return true;}
1014 assert(0<numberOfForkedChildren_);
1022 itemType = input_->nextItemType();
1024 assert(itemType == InputSource::IsFile);
1028 itemType = input_->nextItemType();
1029 assert(itemType == InputSource::IsRun);
1031 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
1033 input_->runAuxiliary()->beginTime());
1034 espController_->eventSetupForInstance(ts);
1038 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1040 std::vector<eventsetup::DataKey> dataKeys;
1041 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1046 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
1048 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1049 excludedData = &(itExcludeRec->second);
1050 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
1055 if(0 != recordPtr) {
1058 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1059 itDataKey != itDataKeyEnd;
1062 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1063 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
1067 recordPtr->
doGet(*itDataKey);
1075 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
1080 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1083 actReg_->preForkReleaseResourcesSignal_();
1084 input_->doPreForkReleaseResources();
1085 schedule_->preForkReleaseResources();
1090 unsigned int childIndex = 0;
1091 unsigned int const kMaxChildren = numberOfForkedChildren_;
1092 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1093 std::vector<pid_t> childrenIds;
1094 childrenIds.reserve(kMaxChildren);
1095 std::vector<int> childrenSockets;
1096 childrenSockets.reserve(kMaxChildren);
1097 std::vector<int> childrenPipes;
1098 childrenPipes.reserve(kMaxChildren);
1099 std::vector<int> childrenSocketsCopy;
1100 childrenSocketsCopy.reserve(kMaxChildren);
1101 std::vector<int> childrenPipesCopy;
1102 childrenPipesCopy.reserve(kMaxChildren);
1109 int sockets[2], fd_flags;
1110 for(; childIndex < kMaxChildren; ++childIndex) {
1112 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1113 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1117 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1121 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1122 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1127 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1128 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1131 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1132 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1135 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1136 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1141 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1142 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1145 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1146 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1150 childrenPipesCopy = childrenPipes;
1151 childrenSocketsCopy = childrenSockets;
1153 pid_t
value = fork();
1159 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1162 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1169 std::stringstream stout;
1170 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1171 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1172 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1174 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1175 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1178 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1179 if(setCpuAffinity_) {
1187 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1189 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1192 CPU_SET(childIndex, &mask);
1193 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1194 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1206 LogError(
"ForkingChild") <<
"failed to create a child";
1209 childrenIds.push_back(value);
1210 childrenSockets.push_back(sockets[0]);
1211 childrenPipes.push_back(pipes[0]);
1214 if(childIndex < kMaxChildren) {
1215 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1216 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1219 input_->doPostForkReacquireResources(receiver);
1220 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1225 jobReport->parentAfterFork(jobReportFile);
1235 sigset_t blockingSigSet;
1236 sigset_t unblockingSigSet;
1238 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1239 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1240 sigaddset(&blockingSigSet, SIGCHLD);
1241 sigaddset(&blockingSigSet, SIGUSR2);
1242 sigaddset(&blockingSigSet, SIGINT);
1243 sigdelset(&unblockingSigSet, SIGCHLD);
1244 sigdelset(&unblockingSigSet, SIGUSR2);
1245 sigdelset(&unblockingSigSet, SIGINT);
1246 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1250 bool too_many_fds =
false;
1251 if (pipes[1]+1 > FD_SETSIZE) {
1252 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1253 too_many_fds =
true;
1259 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1260 boost::thread senderThread(sender);
1262 if(not too_many_fds) {
1265 possiblyContinueAfterForkChildFailure();
1266 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1267 sigsuspend(&unblockingSigSet);
1268 possiblyContinueAfterForkChildFailure();
1269 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1272 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1274 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1276 LogError(
"ForkingStopping") <<
"child failed";
1279 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1282 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1283 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1284 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1285 it != itEnd; ++it) {
1288 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1289 while(num_children_done != kMaxChildren) {
1290 sigsuspend(&unblockingSigSet);
1292 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1295 senderThread.join();
1296 if(child_failed && !continueAfterChildFailure_) {
1297 if (child_fail_signal) {
1298 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1299 }
else if (child_fail_exit_status) {
1300 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1302 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1306 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1320 std::vector<ModuleDescription const*>
1321 EventProcessor::getAllModuleDescriptions()
const {
1322 return schedule_->getAllModuleDescriptions();
1326 EventProcessor::totalEvents()
const {
1327 return schedule_->totalEvents();
1331 EventProcessor::totalEventsPassed()
const {
1332 return schedule_->totalEventsPassed();
1336 EventProcessor::totalEventsFailed()
const {
1337 return schedule_->totalEventsFailed();
1341 EventProcessor::enableEndPaths(
bool active) {
1342 schedule_->enableEndPaths(active);
1346 EventProcessor::endPathsEnabled()
const {
1347 return schedule_->endPathsEnabled();
1352 schedule_->getTriggerReport(rep);
1356 EventProcessor::clearCounters() {
1357 schedule_->clearCounters();
1360 char const* EventProcessor::currentStateName()
const {
1361 return stateName(getState());
1364 char const* EventProcessor::stateName(
State s)
const {
1365 return stateNames[
s];
1368 char const* EventProcessor::msgName(
Msg m)
const {
1372 State EventProcessor::getState()
const {
1384 if(runNumber == 0) {
1387 <<
"EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
1388 <<
"Run number was set to 1 instead\n";
1396 input_->setRunNumber(runNumber);
1410 EventProcessor::waitForAsyncCompletion(
unsigned int timeout_seconds) {
1412 boost::xtime timeout;
1414 #if BOOST_VERSION >= 105000
1415 boost::xtime_get(&timeout, boost::TIME_UTC_);
1417 boost::xtime_get(&timeout, boost::TIME_UTC);
1419 timeout.sec += timeout_seconds;
1425 boost::mutex::scoped_lock sl(stop_lock_);
1428 if(stop_count_ < 0)
return last_rc_;
1430 if(timeout_seconds == 0) {
1431 while(stop_count_ == 0) stopper_.wait(sl);
1433 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) ==
true);
1443 if(id_set_) pthread_cancel(event_loop_id_);
1447 <<
"An asynchronous request was made to shut down "
1448 <<
"the event loop "
1449 <<
"and the event loop did not shutdown after "
1450 << timeout_seconds <<
" seconds\n";
1452 event_loop_->join();
1453 event_loop_.reset();
1458 return rc ==
false ? epTimedOut : last_rc_;
1462 EventProcessor::waitTillDoneAsync(
unsigned int timeout_value_secs) {
1463 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1472 StatusCode rc = waitForAsyncCompletion(secs);
1473 if(rc != epTimedOut) changeState(
mFinished);
1480 StatusCode rc = waitForAsyncCompletion(secs);
1481 if(rc != epTimedOut) changeState(
mFinished);
1486 void EventProcessor::errorState() {
1496 return waitForAsyncCompletion(60*2);
1502 boost::mutex::scoped_lock sl(state_lock_);
1503 State curr = state_;
1509 (curr !=
table[rc].current ||
1510 (curr ==
table[rc].current &&
1516 <<
"A member function of EventProcessor has been called in an"
1517 <<
" inappropriate order.\n"
1518 <<
"Bad transition from " << stateName(curr) <<
" "
1519 <<
"using message " << msgName(msg) <<
"\n"
1520 <<
"No where to go from here.\n";
1522 FDEBUG(1) <<
"changeState: current=" << stateName(curr)
1523 <<
", message=" << msgName(msg)
1524 <<
" -> new=" << stateName(
table[rc].
final) <<
"\n";
1526 state_ =
table[rc].final;
1529 void EventProcessor::runAsync() {
1532 boost::mutex::scoped_lock sl(stop_lock_);
1533 if(id_set_ ==
true) {
1534 std::string err(
"runAsync called while async event loop already running\n");
1542 last_rc_ = epSuccess;
1543 event_loop_.reset(
new boost::thread(boost::bind(EventProcessor::asyncRun,
this)));
1544 boost::xtime timeout;
1545 #if BOOST_VERSION >= 105000
1546 boost::xtime_get(&timeout, boost::TIME_UTC_);
1548 boost::xtime_get(&timeout, boost::TIME_UTC);
1551 if(starter_.timed_wait(sl, timeout) ==
false) {
1554 <<
"Async run thread did not start in 60 seconds\n";
1569 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1571 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1572 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1575 boost::mutex::scoped_lock sl(me->
stop_lock_);
1582 FDEBUG(2) <<
"asyncRun starting ......................\n";
1585 bool onlineStateTransitions =
true;
1589 LogError(
"FwkJob") <<
"cms::Exception caught in "
1590 <<
"EventProcessor::asyncRun"
1596 LogError(
"FwkJob") <<
"Standard library exception caught in "
1597 <<
"EventProcessor::asyncRun"
1603 LogError(
"FwkJob") <<
"Unknown exception caught in "
1604 <<
"EventProcessor::asyncRun"
1614 boost::mutex::scoped_lock sl(me->
stop_lock_);
1618 FDEBUG(2) <<
"asyncRun ending ......................\n";
1621 std::auto_ptr<statemachine::Machine>
1622 EventProcessor::createStateMachine() {
1629 << fileMode_ <<
".\n"
1630 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1640 << emptyRunLumiMode_ <<
".\n"
1641 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1648 machine->initiate();
1654 EventProcessor::runToCompletion(
bool onlineStateTransitions) {
1659 std::auto_ptr<statemachine::Machine> machine;
1663 if(!onlineStateTransitions) changeState(
mRunCount);
1666 stateMachineWasInErrorState_ =
false;
1671 machine = createStateMachine();
1680 if(numberOfForkedChildren_ > 0) {
1681 size_t size = preg_->size();
1682 more = input_->skipForForking();
1684 if(size < preg_->
size()) {
1685 principalCache_.adjustIndexesAfterProductRegistryAddition();
1687 principalCache_.adjustEventToNewProductRegistry(preg_);
1690 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1692 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1702 FDEBUG(1) <<
"In main processing loop, encountered sStopping state\n";
1703 forceLooperToEnd_ =
true;
1705 forceLooperToEnd_ =
false;
1709 FDEBUG(1) <<
"In main processing loop, encountered sShuttingDown state\n";
1710 forceLooperToEnd_ =
true;
1712 forceLooperToEnd_ =
false;
1718 boost::mutex::scoped_lock sl(
usr2_lock);
1721 returnCode = epSignal;
1722 forceLooperToEnd_ =
true;
1724 forceLooperToEnd_ =
false;
1729 if(itemType == InputSource::IsStop) {
1732 else if(itemType == InputSource::IsFile) {
1735 else if(itemType == InputSource::IsRun) {
1736 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1738 else if(itemType == InputSource::IsLumi) {
1741 else if(itemType == InputSource::IsEvent) {
1747 <<
"Unknown next item type passed to EventProcessor\n"
1748 <<
"Please report this error to the Framework group\n";
1751 if(machine->terminated()) {
1810 alreadyHandlingException_ =
true;
1811 terminateMachine(machine);
1812 alreadyHandlingException_ =
false;
1813 if (!exceptionMessageLumis_.empty()) {
1816 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1819 if (!exceptionMessageRuns_.empty()) {
1822 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1825 if (!exceptionMessageFiles_.empty()) {
1828 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1834 if(machine->terminated()) {
1835 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1839 if(!onlineStateTransitions) changeState(
mFinished);
1841 if(stateMachineWasInErrorState_) {
1843 <<
"The boost state machine in the EventProcessor exited after\n"
1844 <<
"entering the Error state.\n";
1848 if(machine.get() != 0) {
1849 terminateMachine(machine);
1851 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1852 <<
"Please report this error to the Framework group\n";
1860 void EventProcessor::readFile() {
1861 FDEBUG(1) <<
" \treadFile\n";
1862 size_t size = preg_->size();
1863 fb_ = input_->readFile();
1864 if(size < preg_->
size()) {
1865 principalCache_.adjustIndexesAfterProductRegistryAddition();
1867 principalCache_.adjustEventToNewProductRegistry(preg_);
1868 if(numberOfForkedChildren_ > 0) {
1869 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1873 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1874 if (fb_.get() !=
nullptr) {
1875 input_->closeFile(fb_.get(), cleaningUpAfterException);
1877 FDEBUG(1) <<
"\tcloseInputFile\n";
1880 void EventProcessor::openOutputFiles() {
1881 if (fb_.get() !=
nullptr) {
1882 schedule_->openOutputFiles(*fb_);
1883 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1885 FDEBUG(1) <<
"\topenOutputFiles\n";
1888 void EventProcessor::closeOutputFiles() {
1889 if (fb_.get() !=
nullptr) {
1890 schedule_->closeOutputFiles();
1891 if(hasSubProcess()) subProcess_->closeOutputFiles();
1893 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1896 void EventProcessor::respondToOpenInputFile() {
1897 if (fb_.get() !=
nullptr) {
1898 schedule_->respondToOpenInputFile(*fb_);
1899 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1901 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1904 void EventProcessor::respondToCloseInputFile() {
1905 if (fb_.get() !=
nullptr) {
1906 schedule_->respondToCloseInputFile(*fb_);
1907 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1909 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1912 void EventProcessor::respondToOpenOutputFiles() {
1913 if (fb_.get() !=
nullptr) {
1914 schedule_->respondToOpenOutputFiles(*fb_);
1915 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1917 FDEBUG(1) <<
"\trespondToOpenOutputFiles\n";
1920 void EventProcessor::respondToCloseOutputFiles() {
1921 if (fb_.get() !=
nullptr) {
1922 schedule_->respondToCloseOutputFiles(*fb_);
1923 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1925 FDEBUG(1) <<
"\trespondToCloseOutputFiles\n";
1928 void EventProcessor::startingNewLoop() {
1929 shouldWeStop_ =
false;
1932 if(looper_ && looperBeginJobRun_) {
1933 looper_->doStartingNewLoop();
1935 FDEBUG(1) <<
"\tstartingNewLoop\n";
1938 bool EventProcessor::endOfLoop() {
1941 looper_->setModuleChanger(&changer);
1943 looper_->setModuleChanger(
nullptr);
1944 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1947 FDEBUG(1) <<
"\tendOfLoop\n";
1951 void EventProcessor::rewindInput() {
1954 FDEBUG(1) <<
"\trewind\n";
1957 void EventProcessor::prepareForNextLoop() {
1958 looper_->prepareForNextLoop(esp_.get());
1959 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1962 bool EventProcessor::shouldWeCloseOutput()
const {
1963 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1964 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1967 void EventProcessor::doErrorStuff() {
1968 FDEBUG(1) <<
"\tdoErrorStuff\n";
1970 <<
"The EventProcessor state machine encountered an unexpected event\n"
1971 <<
"and went to the error state\n"
1972 <<
"Will attempt to terminate processing normally\n"
1973 <<
"(IF using the looper the next loop will be attempted)\n"
1974 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1975 stateMachineWasInErrorState_ =
true;
1980 input_->doBeginRun(runPrincipal);
1983 if(forceESCacheClearOnNewRun_){
1984 espController_->forceCacheClear();
1986 espController_->eventSetupForInstance(ts);
1988 if(looper_ && looperBeginJobRun_==
false) {
1990 looper_->beginOfJob(es);
1991 looperBeginJobRun_ =
true;
1992 looper_->doStartingNewLoop();
1996 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1997 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1998 if(hasSubProcess()) {
1999 subProcess_->doBeginRun(runPrincipal, ts);
2001 sentry.allowThrow();
2005 looper_->doBeginRun(runPrincipal, es);
2011 input_->doEndRun(runPrincipal, cleaningUpAfterException);
2012 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
2014 espController_->eventSetupForInstance(ts);
2018 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2019 schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2020 if(hasSubProcess()) {
2021 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2023 sentry.allowThrow();
2027 looper_->doEndRun(runPrincipal, es);
2033 input_->doBeginLumi(lumiPrincipal);
2038 rng->preBeginLumi(lb);
2044 espController_->eventSetupForInstance(ts);
2048 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2049 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2050 if(hasSubProcess()) {
2051 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2053 sentry.allowThrow();
2055 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
2057 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2063 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2068 espController_->eventSetupForInstance(ts);
2072 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2073 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2074 if(hasSubProcess()) {
2075 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2077 sentry.allowThrow();
2079 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
2081 looper_->doEndLuminosityBlock(lumiPrincipal, es);
2086 if (principalCache_.hasRunPrincipal()) {
2088 <<
"EventProcessor::readAndCacheRun\n"
2089 <<
"Illegal attempt to insert run into cache\n"
2090 <<
"Contact a Framework Developer\n";
2092 principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
2097 principalCache_.merge(input_->runAuxiliary(), preg_);
2098 input_->readAndMergeRun(principalCache_.runPrincipalPtr());
2102 int EventProcessor::readAndCacheLumi() {
2103 if (principalCache_.hasLumiPrincipal()) {
2105 <<
"EventProcessor::readAndCacheRun\n"
2106 <<
"Illegal attempt to insert lumi into cache\n"
2107 <<
"Contact a Framework Developer\n";
2109 if (!principalCache_.hasRunPrincipal()) {
2111 <<
"EventProcessor::readAndCacheRun\n"
2112 <<
"Illegal attempt to insert lumi into cache\n"
2113 <<
"Run is invalid\n"
2114 <<
"Contact a Framework Developer\n";
2116 principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
2117 principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
2118 return input_->luminosityBlock();
2121 int EventProcessor::readAndMergeLumi() {
2122 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
2123 input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
2124 return input_->luminosityBlock();
2140 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2141 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2142 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
2146 principalCache_.deleteLumi(phid, run, lumi);
2147 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2148 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
2151 void EventProcessor::readAndProcessEvent() {
2152 EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
2153 FDEBUG(1) <<
"\treadEvent\n";
2157 assert(principalCache_.lumiPrincipalPtr()->run() == pep->
run());
2158 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->
luminosityBlock());
2161 espController_->eventSetupForInstance(ts);
2165 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2166 schedule_->processOneOccurrence<Traits>(*pep, es);
2167 if(hasSubProcess()) {
2168 subProcess_->doEvent(*pep, ts);
2170 sentry.allowThrow();
2174 bool randomAccess = input_->randomAccess();
2181 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2186 input_->skipEvents(-2);
2194 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2198 FDEBUG(1) <<
"\tprocessEvent\n";
2202 bool EventProcessor::shouldWeStop()
const {
2203 FDEBUG(1) <<
"\tshouldWeStop\n";
2204 if(shouldWeStop_)
return true;
2205 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
// NOTE(review): the three assignments below are the visible bodies of three separate
// functions whose signature lines are not present in this chunk; `message` is
// presumably the parameter of each.
// Stores `message` as the exception text kept for files.
2209 exceptionMessageFiles_ =
message;
// Stores `message` as the exception text kept for runs.
2213 exceptionMessageRuns_ =
message;
// Stores `message` as the exception text kept for lumis.
2217 exceptionMessageLumis_ =
message;
2220 bool EventProcessor::alreadyHandlingException()
const {
2221 return alreadyHandlingException_;
// Brings the given state machine to its terminated state, if one is held.
// iMachine: owning pointer to the state machine, taken by reference.
// NOTE(review): some statements of this function are absent from this chunk (at least
// one between raising and lowering forceLooperToEnd_, the branch structure around the
// "already terminated" trace, and whatever follows the last visible line).
2224 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
// Nothing is done when the pointer holds no machine.
2225 if(iMachine.get() != 0) {
2226 if(!iMachine->terminated()) {
// forceLooperToEnd_ is raised here and lowered again right after; the statement that
// runs while it is set is not visible in this chunk.
2227 forceLooperToEnd_ =
true;
2229 forceLooperToEnd_ =
false;
// Debug trace for the case where the machine had already terminated.
2232 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
// Re-check the machine's state and trace when it reports termination.
2234 if(iMachine->terminated()) {
2235 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
T getParameter(std::string const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
ActivityRegistry::PostProcessEvent postProcessEventSignal_
virtual std::string explainSelf() const
ParameterSetID id() const
static ThreadSafeRegistry * instance()
boost::shared_ptr< EDLooperBase > looper_
std::unique_ptr< ActionTable const > act_table_
Timestamp const & endTime() const
int numberOfForkedChildren_
EventID const & id() const
bool lastOperationSucceeded() const
void call(boost::function< void(void)>)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
boost::shared_ptr< ActivityRegistry > actReg_
LuminosityBlockNumber_t luminosityBlock() const
void installCustomHandler(int signum, CFUNC func)
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< ProcessConfiguration > processConfiguration_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet)
Timestamp const & time() const
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
bool continueAfterChildFailure_
std::string last_error_text_
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
StateSentry(EventProcessor *ep)
std::unique_ptr< HistoryAppender > historyAppender_
ParameterSet const & getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
volatile event_processor::State state_
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
void connectSigs(EventProcessor *ep)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void clearEventPrincipal()
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
boost::condition starter_
edm::ProcessHistoryID const & processHistoryID() const
volatile bool shutdown_flag
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
ActivityRegistry::PreProcessEvent preProcessEventSignal_
collection_type & data()
Provide access to the contained collection.
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
void validate(ParameterSet &pset, std::string const &moduleLabel) const
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
bool luminosityBlockPrincipalPtrValid()
ParameterSet const & registerIt()
static ComponentFactory< T > * get()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
tuple size
Write out results.
Transition requestedTransition() const
T get(const Candidate &c)
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)