61 #include "boost/bind.hpp"
62 #include "boost/thread/xtime.hpp"
74 #include <sys/types.h>
76 #include <sys/socket.h>
77 #include <sys/select.h>
78 #include <sys/fcntl.h>
87 #include "Cintex/Cintex.h"
91 namespace event_processor {
105 using namespace event_processor;
// Scope guard around a schedule transition: the constructor emits
// T::preScheduleSignal and the destructor emits T::postScheduleSignal, so the
// "post" signal fires when the sentry object is destroyed.
// Callers in this file construct it as sentry(actReg_.get(), &principal, &es).
// (Several lines of the class are not visible in this excerpt.)
108 template <
typename T>
109 class ScheduleSignalSentry {
112 a_(a), principal_(principal), es_(es) {
// The "pre" signal is only emitted when a_ is non-null.
113 if (a_) T::preScheduleSignal(a_, principal_);
115 ~ScheduleSignalSentry() {
// The "post" signal requires both a_ and principal_ to be non-null.
116 if (a_)
if (principal_) T::postScheduleSignal(a_, principal_, es_);
122 typename T::MyPrincipal* principal_;
132 char const* stateNames[] = {
147 char const* msgNames[] = {
263 std::unique_ptr<InputSource>
267 boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
268 boost::shared_ptr<ActivityRegistry> areg,
269 boost::shared_ptr<ProcessConfiguration const> processConfiguration) {
271 if(main_input == 0) {
273 <<
"There must be exactly one source in the configuration.\n"
274 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
279 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
282 filler->fill(descriptions);
296 std::ostringstream ost;
297 ost <<
"Validating configuration of input source of type " << modtype;
313 processConfiguration.get());
316 areg->preSourceConstructionSignal_(md);
317 std::unique_ptr<InputSource>
input;
330 areg->postSourceConstructionSignal_(md);
331 std::ostringstream ost;
332 ost <<
"Constructing input source of type " << modtype;
336 areg->postSourceConstructionSignal_(md);
341 boost::shared_ptr<EDLooperBase>
345 boost::shared_ptr<EDLooperBase> vLooper;
347 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
349 if(loopers.size() == 0) {
353 assert(1 == loopers.size());
355 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
373 std::vector<std::string>
const& defaultServices,
374 std::vector<std::string>
const& forcedServices) :
375 preProcessEventSignal_(),
376 postProcessEventSignal_(),
379 branchIDListHelper_(),
382 espController_(new eventsetup::EventSetupsController),
385 processConfiguration_(),
404 shouldWeStop_(
false),
405 stateMachineWasInErrorState_(
false),
408 exceptionMessageFiles_(),
409 exceptionMessageRuns_(),
410 exceptionMessageLumis_(),
411 alreadyHandlingException_(
false),
412 forceLooperToEnd_(
false),
413 looperBeginJobRun_(
false),
414 forceESCacheClearOnNewRun_(
false),
415 numberOfForkedChildren_(0),
416 numberOfSequentialEventsPerChild_(1),
417 setCpuAffinity_(
false),
418 eventSetupDataToExcludeFromPrefetching_() {
420 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
421 processDesc->addServices(defaultServices, forcedServices);
422 init(processDesc, iToken, iLegacy);
426 std::vector<std::string>
const& defaultServices,
427 std::vector<std::string>
const& forcedServices) :
428 preProcessEventSignal_(),
429 postProcessEventSignal_(),
432 branchIDListHelper_(),
435 espController_(new eventsetup::EventSetupsController),
438 processConfiguration_(),
457 shouldWeStop_(
false),
458 stateMachineWasInErrorState_(
false),
461 exceptionMessageFiles_(),
462 exceptionMessageRuns_(),
463 exceptionMessageLumis_(),
464 alreadyHandlingException_(
false),
465 forceLooperToEnd_(
false),
466 looperBeginJobRun_(
false),
467 forceESCacheClearOnNewRun_(
false),
468 numberOfForkedChildren_(0),
469 numberOfSequentialEventsPerChild_(1),
470 setCpuAffinity_(
false),
471 eventSetupDataToExcludeFromPrefetching_() {
473 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
474 processDesc->addServices(defaultServices, forcedServices);
481 preProcessEventSignal_(),
482 postProcessEventSignal_(),
485 branchIDListHelper_(),
488 espController_(new eventsetup::EventSetupsController),
491 processConfiguration_(),
510 shouldWeStop_(
false),
511 stateMachineWasInErrorState_(
false),
514 exceptionMessageFiles_(),
515 exceptionMessageRuns_(),
516 exceptionMessageLumis_(),
517 alreadyHandlingException_(
false),
518 forceLooperToEnd_(
false),
519 looperBeginJobRun_(
false),
520 forceESCacheClearOnNewRun_(
false),
521 numberOfForkedChildren_(0),
522 numberOfSequentialEventsPerChild_(1),
523 setCpuAffinity_(
false),
524 eventSetupDataToExcludeFromPrefetching_() {
525 init(processDesc, token, legacy);
530 preProcessEventSignal_(),
531 postProcessEventSignal_(),
534 branchIDListHelper_(),
537 espController_(new eventsetup::EventSetupsController),
540 processConfiguration_(),
559 shouldWeStop_(
false),
560 stateMachineWasInErrorState_(
false),
563 exceptionMessageFiles_(),
564 exceptionMessageRuns_(),
565 exceptionMessageLumis_(),
566 alreadyHandlingException_(
false),
567 forceLooperToEnd_(
false),
568 looperBeginJobRun_(
false),
569 forceESCacheClearOnNewRun_(
false),
570 numberOfForkedChildren_(0),
571 numberOfSequentialEventsPerChild_(1),
572 setCpuAffinity_(
false),
573 eventSetupDataToExcludeFromPrefetching_() {
576 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
580 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(config));
592 ROOT::Cintex::Cintex::Enable();
594 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
608 setCpuAffinity_ = forking.getUntrackedParameter<
bool>(
"setCpuAffinity",
false);
610 std::vector<ParameterSet>
const& excluded = forking.getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
611 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
615 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
616 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
624 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
632 boost::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
657 FDEBUG(2) << parameterSet << std::endl;
665 if(subProcessParameterSet) {
692 psetRegistry->
data().clear();
738 ex.
addContext(
"Calling beginJob for the source");
744 actReg_->postBeginJobSignal_();
750 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 c.
call([actReg](){actReg->postEndJobSignal_();});
// Bookkeeping for forked children. These are written by ep_sigchld below and
// read by the forking code elsewhere in this file.
// NOTE(review): volatile alone gives no atomicity guarantee for objects shared
// with a signal handler; volatile sig_atomic_t (or std::atomic) is the usual
// choice — confirm whether that matters here.
782 volatile bool child_failed =
false;
783 volatile unsigned int num_children_done = 0;
784 volatile int child_fail_exit_status = 0;
785 volatile int child_fail_signal = 0;
// Handler with the three-argument (sa_sigaction style) signature; none of the
// arguments are named or used. Presumably installed for SIGCHLD — the
// installation is not visible in this excerpt.
791 void ep_sigchld(
int, siginfo_t*,
void*) {
// WNOHANG: poll for any finished child without blocking inside the handler.
795 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
798 if(WIFEXITED(stat_loc)) {
// Only a non-zero exit code is recorded as a failure status.
800 if(0 != WEXITSTATUS(stat_loc)) {
801 child_fail_exit_status = WEXITSTATUS(stat_loc);
805 if(WIFSIGNALED(stat_loc)) {
// Child was terminated by a signal: remember which one.
807 child_fail_signal = WTERMSIG(stat_loc);
// Poll again so that several children ending together are all collected.
810 p = waitpid(-1, &stat_loc, WNOHANG);
// Walks numberOfChildren one decimal digit at a time by dividing by 10 until
// it reaches zero. The counter and the return statement are not visible in
// this excerpt. Elsewhere in this file the result is passed to std::setw to
// zero-pad the child index in the "redirectout_..." log file name.
825 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
827 while(numberOfChildren != 0) {
829 numberOfChildren /= 10;
839 class MessageSenderToSource {
841 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
845 const std::vector<int>& m_childrenPipes;
846 long const m_nEventsToProcess;
848 unsigned int m_aliveChildren;
852 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
853 std::vector<int>
const& childrenPipes,
854 long iNEventsToProcess):
855 m_childrenPipes(childrenPipes),
856 m_nEventsToProcess(iNEventsToProcess),
857 m_aliveChildren(childrenSockets.
size()),
860 FD_ZERO(&m_socketSet);
861 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
863 FD_SET(*it, &m_socketSet);
868 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
870 FD_SET(*it, &m_socketSet);
// Controller loop (this functor is handed to a boost::thread elsewhere in
// this file). It waits on the children's sockets and pipes with select(),
// receives a message from a child, and answers with the next range of event
// indices, repeating while m_aliveChildren > 0.
// Many lines of the body are not visible in this excerpt.
895 MessageSenderToSource::operator()() {
896 multicore::MessageForParent childMsg;
897 LogInfo(
"ForkingController") <<
"I am controller";
// The first range handed out starts at index 0 and is m_nEventsToProcess long.
900 multicore::MessageForSource sndmsg;
901 sndmsg.startIndex = 0;
902 sndmsg.nIndices = m_nEventsToProcess;
905 fd_set readSockets, errorSockets;
// select() overwrites the sets it is given, so work on copies of the master set.
907 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
908 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
// Retry when the call is interrupted by a signal (EINTR).
// NOTE(review): select's first argument must be the highest descriptor plus
// one; the computation of m_maxFd is not visible here — confirm.
911 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
913 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
// A descriptor flagged in the error set is logged and removed from the master
// set so it is not polled again.
922 if (FD_ISSET(
idx, &errorSockets)) {
923 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
924 FD_CLR(
idx, &m_socketSet);
927 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
935 if (!FD_ISSET(
idx, &readSockets)) {
// Readable descriptors are handled differently depending on whether they are
// one of the watch pipes or a message socket.
941 bool is_pipe =
false;
942 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
// Pipe: read a single byte, retrying on EINTR.
946 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
949 FD_CLR(
idx, &m_socketSet);
// Socket: receive the child's message, retrying on EINTR.
957 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
959 FD_CLR(
idx, &m_socketSet);
// Reply with the current index range, retrying on EINTR.
968 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
970 FD_CLR(
idx, &m_socketSet);
// Advance so the next reply hands out the following block of indices.
975 sndmsg.startIndex += sndmsg.nIndices;
979 }
while (m_aliveChildren > 0);
987 void EventProcessor::possiblyContinueAfterForkChildFailure() {
988 if(child_failed && continueAfterChildFailure_) {
989 if (child_fail_signal) {
990 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
992 }
else if (child_fail_exit_status) {
993 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
994 child_fail_exit_status=0;
996 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1005 if(0 == numberOfForkedChildren_) {
return true;}
1006 assert(0<numberOfForkedChildren_);
1014 itemType = input_->nextItemType();
1016 assert(itemType == InputSource::IsFile);
1020 itemType = input_->nextItemType();
1021 assert(itemType == InputSource::IsRun);
1023 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
1025 input_->runAuxiliary()->beginTime());
1026 espController_->eventSetupForInstance(ts);
1030 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1032 std::vector<eventsetup::DataKey> dataKeys;
1033 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1038 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
1040 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1041 excludedData = &(itExcludeRec->second);
1042 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
1047 if(0 != recordPtr) {
1050 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1051 itDataKey != itDataKeyEnd;
1054 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1055 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
1059 recordPtr->
doGet(*itDataKey);
1067 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
1072 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1075 actReg_->preForkReleaseResourcesSignal_();
1076 input_->doPreForkReleaseResources();
1077 schedule_->preForkReleaseResources();
1082 unsigned int childIndex = 0;
1083 unsigned int const kMaxChildren = numberOfForkedChildren_;
1084 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1085 std::vector<pid_t> childrenIds;
1086 childrenIds.reserve(kMaxChildren);
1087 std::vector<int> childrenSockets;
1088 childrenSockets.reserve(kMaxChildren);
1089 std::vector<int> childrenPipes;
1090 childrenPipes.reserve(kMaxChildren);
1091 std::vector<int> childrenSocketsCopy;
1092 childrenSocketsCopy.reserve(kMaxChildren);
1093 std::vector<int> childrenPipesCopy;
1094 childrenPipesCopy.reserve(kMaxChildren);
// Per-child setup loop: for every child index a datagram UNIX socket pair is
// created and descriptor flags are adjusted on sockets[1], pipes[1] and
// pipes[0]. Each failure is reported with printf; what happens after the
// message is not visible in this excerpt.
1101 int sockets[2], fd_flags;
1102 for(; childIndex < kMaxChildren; ++childIndex) {
1104 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1105 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1109 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
// Read the current descriptor flags of sockets[1] before modifying them.
1113 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1114 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
// NOTE(review): O_NONBLOCK is a file *status* flag, which POSIX sets through
// F_SETFL; F_SETFD only defines FD_CLOEXEC. OR-ing O_NONBLOCK in here most
// likely does not make the socket non-blocking — confirm, and use
// F_GETFL/F_SETFL if non-blocking I/O is intended.
1119 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1120 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
// pipes[1] is marked close-on-exec.
1123 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1124 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1127 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1128 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1133 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1134 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
// NOTE(review): same concern as for sockets[1] — O_NONBLOCK passed to F_SETFD
// rather than F_SETFL; confirm the intended effect on pipes[0].
1137 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1138 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1142 childrenPipesCopy = childrenPipes;
1143 childrenSocketsCopy = childrenSockets;
1145 pid_t
value = fork();
1151 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1154 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1161 std::stringstream stout;
1162 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1163 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1164 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1166 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1167 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1170 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1171 if(setCpuAffinity_) {
1179 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1181 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1184 CPU_SET(childIndex, &mask);
1185 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1186 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1198 LogError(
"ForkingChild") <<
"failed to create a child";
1201 childrenIds.push_back(value);
1202 childrenSockets.push_back(sockets[0]);
1203 childrenPipes.push_back(pipes[0]);
1206 if(childIndex < kMaxChildren) {
1207 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1208 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1211 input_->doPostForkReacquireResources(receiver);
1212 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1217 jobReport->parentAfterFork(jobReportFile);
// Build two signal masks derived from the thread's current mask:
//   blockingSigSet   = current mask plus  SIGCHLD, SIGUSR2, SIGINT
//   unblockingSigSet = current mask minus SIGCHLD, SIGUSR2, SIGINT
// unblockingSigSet is later handed to sigsuspend() elsewhere in this file.
1227 sigset_t blockingSigSet;
1228 sigset_t unblockingSigSet;
// With a NULL new-mask argument pthread_sigmask changes nothing and only
// stores the current mask in the third argument, so both sets start as copies
// of the current mask.
1230 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1231 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1232 sigaddset(&blockingSigSet, SIGCHLD);
1233 sigaddset(&blockingSigSet, SIGUSR2);
1234 sigaddset(&blockingSigSet, SIGINT);
1235 sigdelset(&unblockingSigSet, SIGCHLD);
1236 sigdelset(&unblockingSigSet, SIGUSR2);
1237 sigdelset(&unblockingSigSet, SIGINT);
// Block the three signals now; the previous mask is saved in oldSigSet.
1238 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1242 bool too_many_fds =
false;
1243 if (pipes[1]+1 > FD_SETSIZE) {
1244 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1245 too_many_fds =
true;
1251 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1252 boost::thread senderThread(sender);
1254 if(not too_many_fds) {
1257 possiblyContinueAfterForkChildFailure();
1258 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1259 sigsuspend(&unblockingSigSet);
1260 possiblyContinueAfterForkChildFailure();
1261 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1264 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1266 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1268 LogError(
"ForkingStopping") <<
"child failed";
1271 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1274 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1275 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1276 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1277 it != itEnd; ++it) {
1280 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1281 while(num_children_done != kMaxChildren) {
1282 sigsuspend(&unblockingSigSet);
1284 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1287 senderThread.join();
1288 if(child_failed && !continueAfterChildFailure_) {
1289 if (child_fail_signal) {
1290 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1291 }
else if (child_fail_exit_status) {
1292 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1294 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1298 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1312 std::vector<ModuleDescription const*>
1313 EventProcessor::getAllModuleDescriptions()
const {
1314 return schedule_->getAllModuleDescriptions();
1318 EventProcessor::totalEvents()
const {
1319 return schedule_->totalEvents();
1323 EventProcessor::totalEventsPassed()
const {
1324 return schedule_->totalEventsPassed();
1328 EventProcessor::totalEventsFailed()
const {
1329 return schedule_->totalEventsFailed();
1333 EventProcessor::enableEndPaths(
bool active) {
1334 schedule_->enableEndPaths(active);
1338 EventProcessor::endPathsEnabled()
const {
1339 return schedule_->endPathsEnabled();
1344 schedule_->getTriggerReport(rep);
1348 EventProcessor::clearCounters() {
1349 schedule_->clearCounters();
1352 char const* EventProcessor::currentStateName()
const {
1353 return stateName(getState());
1356 char const* EventProcessor::stateName(
State s)
const {
1357 return stateNames[
s];
1360 char const* EventProcessor::msgName(
Msg m)
const {
1364 State EventProcessor::getState()
const {
1376 if(runNumber == 0) {
1379 <<
"EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
1380 <<
"Run number was set to 1 instead\n";
1388 input_->setRunNumber(runNumber);
// Waits for the asynchronous event loop to signal that it has stopped.
//   timeout_seconds == 0 : wait on stopper_ with no deadline.
//   timeout_seconds  > 0 : wait until (now + timeout_seconds).
// Returns last_rc_, or epTimedOut when the timed wait reported false.
// Several lines of the body are not visible in this excerpt.
1402 EventProcessor::waitForAsyncCompletion(
unsigned int timeout_seconds) {
1404 boost::xtime timeout;
// The UTC clock constant is spelled TIME_UTC_ from Boost 1.50 on, per this
// version check.
1406 #if BOOST_VERSION >= 105000
1407 boost::xtime_get(&timeout, boost::TIME_UTC_);
1409 boost::xtime_get(&timeout, boost::TIME_UTC);
1411 timeout.sec += timeout_seconds;
1417 boost::mutex::scoped_lock sl(stop_lock_);
// A negative stop_count_ short-circuits and returns the stored result.
1420 if(stop_count_ < 0)
return last_rc_;
1422 if(timeout_seconds == 0) {
1423 while(stop_count_ == 0) stopper_.wait(sl);
// Timed variant: the loop ends when stop_count_ changes or timed_wait
// returns false.
1425 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) ==
true);
// Cancels the event-loop thread when id_set_ is true. The enclosing condition
// is not visible here; the message below suggests this is the timeout path.
1435 if(id_set_) pthread_cancel(event_loop_id_);
1439 <<
"An asynchronous request was made to shut down "
1440 <<
"the event loop "
1441 <<
"and the event loop did not shutdown after "
1442 << timeout_seconds <<
" seconds\n";
// Join the loop thread and release the thread object.
1444 event_loop_->join();
1445 event_loop_.reset();
1450 return rc ==
false ? epTimedOut : last_rc_;
1454 EventProcessor::waitTillDoneAsync(
unsigned int timeout_value_secs) {
1455 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1464 StatusCode rc = waitForAsyncCompletion(secs);
1465 if(rc != epTimedOut) changeState(
mFinished);
1472 StatusCode rc = waitForAsyncCompletion(secs);
1473 if(rc != epTimedOut) changeState(
mFinished);
1478 void EventProcessor::errorState() {
1488 return waitForAsyncCompletion(60*2);
1494 boost::mutex::scoped_lock sl(state_lock_);
1495 State curr = state_;
1501 (curr !=
table[rc].current ||
1502 (curr ==
table[rc].current &&
1508 <<
"A member function of EventProcessor has been called in an"
1509 <<
" inappropriate order.\n"
1510 <<
"Bad transition from " << stateName(curr) <<
" "
1511 <<
"using message " << msgName(msg) <<
"\n"
1512 <<
"No where to go from here.\n";
1514 FDEBUG(1) <<
"changeState: current=" << stateName(curr)
1515 <<
", message=" << msgName(msg)
1516 <<
" -> new=" << stateName(
table[rc].
final) <<
"\n";
1518 state_ =
table[rc].final;
1521 void EventProcessor::runAsync() {
1524 boost::mutex::scoped_lock sl(stop_lock_);
1525 if(id_set_ ==
true) {
1526 std::string err(
"runAsync called while async event loop already running\n");
1534 last_rc_ = epSuccess;
1535 event_loop_.reset(
new boost::thread(boost::bind(EventProcessor::asyncRun,
this)));
1536 boost::xtime timeout;
1537 #if BOOST_VERSION >= 105000
1538 boost::xtime_get(&timeout, boost::TIME_UTC_);
1540 boost::xtime_get(&timeout, boost::TIME_UTC);
1543 if(starter_.timed_wait(sl, timeout) ==
false) {
1546 <<
"Async run thread did not start in 60 seconds\n";
1561 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1563 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1564 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1567 boost::mutex::scoped_lock sl(me->
stop_lock_);
1574 FDEBUG(2) <<
"asyncRun starting ......................\n";
1577 bool onlineStateTransitions =
true;
1581 LogError(
"FwkJob") <<
"cms::Exception caught in "
1582 <<
"EventProcessor::asyncRun"
1588 LogError(
"FwkJob") <<
"Standard library exception caught in "
1589 <<
"EventProcessor::asyncRun"
1595 LogError(
"FwkJob") <<
"Unknown exception caught in "
1596 <<
"EventProcessor::asyncRun"
1606 boost::mutex::scoped_lock sl(me->
stop_lock_);
1610 FDEBUG(2) <<
"asyncRun ending ......................\n";
1613 std::auto_ptr<statemachine::Machine>
1614 EventProcessor::createStateMachine() {
1621 << fileMode_ <<
".\n"
1622 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1632 << emptyRunLumiMode_ <<
".\n"
1633 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1640 machine->initiate();
1646 EventProcessor::runToCompletion(
bool onlineStateTransitions) {
1651 std::auto_ptr<statemachine::Machine> machine;
1655 if(!onlineStateTransitions) changeState(
mRunCount);
1658 stateMachineWasInErrorState_ =
false;
1663 machine = createStateMachine();
1672 if(numberOfForkedChildren_ > 0) {
1673 size_t size = preg_->size();
1674 more = input_->skipForForking();
1676 if(size < preg_->
size()) {
1677 principalCache_.adjustIndexesAfterProductRegistryAddition();
1679 principalCache_.adjustEventToNewProductRegistry(preg_);
1682 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1684 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1694 FDEBUG(1) <<
"In main processing loop, encountered sStopping state\n";
1695 forceLooperToEnd_ =
true;
1697 forceLooperToEnd_ =
false;
1701 FDEBUG(1) <<
"In main processing loop, encountered sShuttingDown state\n";
1702 forceLooperToEnd_ =
true;
1704 forceLooperToEnd_ =
false;
1710 boost::mutex::scoped_lock sl(
usr2_lock);
1713 returnCode = epSignal;
1714 forceLooperToEnd_ =
true;
1716 forceLooperToEnd_ =
false;
1721 if(itemType == InputSource::IsStop) {
1724 else if(itemType == InputSource::IsFile) {
1727 else if(itemType == InputSource::IsRun) {
1728 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1730 else if(itemType == InputSource::IsLumi) {
1733 else if(itemType == InputSource::IsEvent) {
1739 <<
"Unknown next item type passed to EventProcessor\n"
1740 <<
"Please report this error to the Framework group\n";
1743 if(machine->terminated()) {
1802 alreadyHandlingException_ =
true;
1803 terminateMachine(machine);
1804 alreadyHandlingException_ =
false;
1805 if (!exceptionMessageLumis_.empty()) {
1808 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1811 if (!exceptionMessageRuns_.empty()) {
1814 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1817 if (!exceptionMessageFiles_.empty()) {
1820 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1826 if(machine->terminated()) {
1827 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1831 if(!onlineStateTransitions) changeState(
mFinished);
1833 if(stateMachineWasInErrorState_) {
1835 <<
"The boost state machine in the EventProcessor exited after\n"
1836 <<
"entering the Error state.\n";
1840 if(machine.get() != 0) {
1841 terminateMachine(machine);
1843 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1844 <<
"Please report this error to the Framework group\n";
1852 void EventProcessor::readFile() {
1853 FDEBUG(1) <<
" \treadFile\n";
1854 size_t size = preg_->size();
1855 fb_ = input_->readFile();
1856 if(size < preg_->
size()) {
1857 principalCache_.adjustIndexesAfterProductRegistryAddition();
1859 principalCache_.adjustEventToNewProductRegistry(preg_);
1860 if(numberOfForkedChildren_ > 0) {
1861 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1865 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1866 if (fb_.get() !=
nullptr) {
1867 input_->closeFile(fb_.get(), cleaningUpAfterException);
1869 FDEBUG(1) <<
"\tcloseInputFile\n";
1872 void EventProcessor::openOutputFiles() {
1873 if (fb_.get() !=
nullptr) {
1874 schedule_->openOutputFiles(*fb_);
1875 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1877 FDEBUG(1) <<
"\topenOutputFiles\n";
1880 void EventProcessor::closeOutputFiles() {
1881 if (fb_.get() !=
nullptr) {
1882 schedule_->closeOutputFiles();
1883 if(hasSubProcess()) subProcess_->closeOutputFiles();
1885 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1888 void EventProcessor::respondToOpenInputFile() {
1889 if (fb_.get() !=
nullptr) {
1890 schedule_->respondToOpenInputFile(*fb_);
1891 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1893 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1896 void EventProcessor::respondToCloseInputFile() {
1897 if (fb_.get() !=
nullptr) {
1898 schedule_->respondToCloseInputFile(*fb_);
1899 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1901 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1904 void EventProcessor::respondToOpenOutputFiles() {
1905 if (fb_.get() !=
nullptr) {
1906 schedule_->respondToOpenOutputFiles(*fb_);
1907 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1909 FDEBUG(1) <<
"\trespondToOpenOutputFiles\n";
1912 void EventProcessor::respondToCloseOutputFiles() {
1913 if (fb_.get() !=
nullptr) {
1914 schedule_->respondToCloseOutputFiles(*fb_);
1915 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1917 FDEBUG(1) <<
"\trespondToCloseOutputFiles\n";
1920 void EventProcessor::startingNewLoop() {
1921 shouldWeStop_ =
false;
1924 if(looper_ && looperBeginJobRun_) {
1925 looper_->doStartingNewLoop();
1927 FDEBUG(1) <<
"\tstartingNewLoop\n";
1930 bool EventProcessor::endOfLoop() {
1933 looper_->setModuleChanger(&changer);
1935 looper_->setModuleChanger(
nullptr);
1936 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1939 FDEBUG(1) <<
"\tendOfLoop\n";
1943 void EventProcessor::rewindInput() {
1946 FDEBUG(1) <<
"\trewind\n";
1949 void EventProcessor::prepareForNextLoop() {
1950 looper_->prepareForNextLoop(esp_.get());
1951 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1954 bool EventProcessor::shouldWeCloseOutput()
const {
1955 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1956 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1959 void EventProcessor::doErrorStuff() {
1960 FDEBUG(1) <<
"\tdoErrorStuff\n";
1962 <<
"The EventProcessor state machine encountered an unexpected event\n"
1963 <<
"and went to the error state\n"
1964 <<
"Will attempt to terminate processing normally\n"
1965 <<
"(IF using the looper the next loop will be attempted)\n"
1966 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1967 stateMachineWasInErrorState_ =
true;
1972 input_->doBeginRun(runPrincipal);
1975 if(forceESCacheClearOnNewRun_){
1976 espController_->forceCacheClear();
1978 espController_->eventSetupForInstance(ts);
1980 if(looper_ && looperBeginJobRun_==
false) {
1982 looper_->beginOfJob(es);
1983 looperBeginJobRun_ =
true;
1984 looper_->doStartingNewLoop();
1988 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1989 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1990 if(hasSubProcess()) {
1991 subProcess_->doBeginRun(runPrincipal, ts);
1996 looper_->doBeginRun(runPrincipal, es);
2002 input_->doEndRun(runPrincipal, cleaningUpAfterException);
2003 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
2005 espController_->eventSetupForInstance(ts);
2009 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2010 schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2011 if(hasSubProcess()) {
2012 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2017 looper_->doEndRun(runPrincipal, es);
2023 input_->doBeginLumi(lumiPrincipal);
2028 rng->preBeginLumi(lb);
2034 espController_->eventSetupForInstance(ts);
2038 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2039 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2040 if(hasSubProcess()) {
2041 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2044 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
2046 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2052 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2057 espController_->eventSetupForInstance(ts);
2061 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2062 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2063 if(hasSubProcess()) {
2064 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2067 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
2069 looper_->doEndLuminosityBlock(lumiPrincipal, es);
2074 if (principalCache_.hasRunPrincipal()) {
2076 <<
"EventProcessor::readAndCacheRun\n"
2077 <<
"Illegal attempt to insert run into cache\n"
2078 <<
"Contact a Framework Developer\n";
2080 principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
2085 principalCache_.merge(input_->runAuxiliary(), preg_);
2086 input_->readAndMergeRun(principalCache_.runPrincipalPtr());
// Reads a luminosity block from the input source into principalCache_,
// attaches the cached run principal to it, and returns
// input_->luminosityBlock().
// The statements the messages below are streamed into are not visible in
// this excerpt.
2090 int EventProcessor::readAndCacheLumi() {
// Guard: a lumi principal must not already be cached.
// NOTE(review): the message text names "readAndCacheRun" although this is
// readAndCacheLumi — looks like a copy/paste leftover; confirm before changing
// the runtime string.
2091 if (principalCache_.hasLumiPrincipal()) {
2093 <<
"EventProcessor::readAndCacheRun\n"
2094 <<
"Illegal attempt to insert lumi into cache\n"
2095 <<
"Contact a Framework Developer\n";
// Guard: a run principal must already be cached (same naming remark applies
// to this message).
2097 if (!principalCache_.hasRunPrincipal()) {
2099 <<
"EventProcessor::readAndCacheRun\n"
2100 <<
"Illegal attempt to insert lumi into cache\n"
2101 <<
"Run is invalid\n"
2102 <<
"Contact a Framework Developer\n";
2104 principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
// Link the newly cached lumi principal to the cached run principal.
2105 principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
2106 return input_->luminosityBlock();
2109 int EventProcessor::readAndMergeLumi() {
2110 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
2111 input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
2112 return input_->luminosityBlock();
2128 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2129 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2130 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
2134 principalCache_.deleteLumi(phid, run, lumi);
2135 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2136 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
2139 void EventProcessor::readAndProcessEvent() {
2140 EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
2141 FDEBUG(1) <<
"\treadEvent\n";
2145 assert(principalCache_.lumiPrincipalPtr()->run() == pep->
run());
2146 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->
luminosityBlock());
2149 espController_->eventSetupForInstance(ts);
2153 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2154 schedule_->processOneOccurrence<Traits>(*pep, es);
2155 if(hasSubProcess()) {
2156 subProcess_->doEvent(*pep, ts);
2161 bool randomAccess = input_->randomAccess();
2168 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2173 input_->skipEvents(-2);
2181 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2185 FDEBUG(1) <<
"\tprocessEvent\n";
// Reports whether event processing should stop.
// Returns true when the shouldWeStop_ flag is set; otherwise returns true if
// schedule_->terminate() is true, or if a sub-process exists and its
// terminate() is true.
2189 bool EventProcessor::shouldWeStop()
const {
2190 FDEBUG(1) <<
"\tshouldWeStop\n";
// The explicit stop flag takes precedence over the terminate() queries.
2191 if(shouldWeStop_)
return true;
2192 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2196 exceptionMessageFiles_ =
message;
2200 exceptionMessageRuns_ =
message;
2204 exceptionMessageLumis_ =
message;
// Accessor: returns the current value of the alreadyHandlingException_ member.
2207 bool EventProcessor::alreadyHandlingException()
const {
2208 return alreadyHandlingException_;
// Handles shutdown of the state machine passed in by reference.
// Does nothing visible here when iMachine holds a null pointer.
// NOTE(review): some listing lines of this function are absent from this
// extract (2215, 2217-2218, 2220 and everything after 2222) — confirm the
// full control flow against the original source.
2211 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2212 if(iMachine.get() != 0) {
2213 if(!iMachine->terminated()) {
// forceLooperToEnd_ is raised and then lowered again; the statement between
// the two assignments (listing line 2215) is not visible here — presumably
// the call that drives the machine to termination; confirm.
2214 forceLooperToEnd_ =
true;
2216 forceLooperToEnd_ =
false;
// Debug trace for a machine that had already terminated.
2219 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
// Debug trace once the machine reports itself terminated.
2221 if(iMachine->terminated()) {
2222 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
T getParameter(std::string const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
ActivityRegistry::PostProcessEvent postProcessEventSignal_
virtual std::string explainSelf() const
ParameterSetID id() const
static ThreadSafeRegistry * instance()
boost::shared_ptr< EDLooperBase > looper_
std::unique_ptr< ActionTable const > act_table_
Timestamp const & endTime() const
int numberOfForkedChildren_
EventID const & id() const
bool lastOperationSucceeded() const
void call(boost::function< void(void)>)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
boost::shared_ptr< ActivityRegistry > actReg_
LuminosityBlockNumber_t luminosityBlock() const
void installCustomHandler(int signum, CFUNC func)
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< ProcessConfiguration > processConfiguration_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet)
Timestamp const & time() const
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
bool continueAfterChildFailure_
std::string last_error_text_
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
StateSentry(EventProcessor *ep)
std::unique_ptr< HistoryAppender > historyAppender_
ParameterSet const & getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
volatile event_processor::State state_
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
void connectSigs(EventProcessor *ep)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void clearEventPrincipal()
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
boost::condition starter_
edm::ProcessHistoryID const & processHistoryID() const
volatile bool shutdown_flag
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
ActivityRegistry::PreProcessEvent preProcessEventSignal_
collection_type & data()
Provide access to the contained collection.
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
void validate(ParameterSet &pset, std::string const &moduleLabel) const
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
bool luminosityBlockPrincipalPtrValid()
ParameterSet const & registerIt()
static ComponentFactory< T > * get()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
tuple size
Write out results.
Transition requestedTransition() const
T get(const Candidate &c)
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)