61 #include "boost/bind.hpp"
62 #include "boost/thread/xtime.hpp"
74 #include <sys/types.h>
76 #include <sys/socket.h>
77 #include <sys/select.h>
78 #include <sys/fcntl.h>
88 namespace event_processor {
102 using namespace event_processor;
105 template <
typename T>
106 class ScheduleSignalSentry {
109 a_(a), principal_(principal), es_(es) {
110 if (a_) T::preScheduleSignal(a_, principal_);
112 ~ScheduleSignalSentry() {
113 if (a_)
if (principal_) T::postScheduleSignal(a_, principal_, es_);
119 typename T::MyPrincipal* principal_;
129 char const* stateNames[] = {
144 char const* msgNames[] = {
260 boost::shared_ptr<InputSource>
265 boost::shared_ptr<ActivityRegistry> areg,
266 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
268 if(main_input == 0) {
270 <<
"There must be exactly one source in the configuration.\n"
271 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
274 std::string modtype(main_input->
getParameter<std::string>(
"@module_type"));
276 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
279 filler->fill(descriptions);
283 descriptions.
validate(*main_input, std::string(
"source"));
293 std::ostringstream ost;
294 ost <<
"Validating configuration of input source of type " << modtype;
310 processConfiguration.get());
313 areg->preSourceConstructionSignal_(md);
314 boost::shared_ptr<InputSource>
input;
327 areg->postSourceConstructionSignal_(md);
328 std::ostringstream ost;
329 ost <<
"Constructing input source of type " << modtype;
333 areg->postSourceConstructionSignal_(md);
338 boost::shared_ptr<EDLooperBase>
342 boost::shared_ptr<EDLooperBase> vLooper;
344 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
346 if(loopers.size() == 0) {
350 assert(1 == loopers.size());
352 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
370 std::vector<std::string>
const& defaultServices,
371 std::vector<std::string>
const& forcedServices) :
372 preProcessEventSignal_(),
373 postProcessEventSignal_(),
381 processConfiguration_(),
401 shouldWeStop_(
false),
402 stateMachineWasInErrorState_(
false),
405 exceptionMessageFiles_(),
406 exceptionMessageRuns_(),
407 exceptionMessageLumis_(),
408 alreadyHandlingException_(
false),
409 forceLooperToEnd_(
false),
410 looperBeginJobRun_(
false),
411 forceESCacheClearOnNewRun_(
false),
412 numberOfForkedChildren_(0),
413 numberOfSequentialEventsPerChild_(1),
414 setCpuAffinity_(
false),
415 eventSetupDataToExcludeFromPrefetching_() {
417 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
418 processDesc->addServices(defaultServices, forcedServices);
419 init(processDesc, iToken, iLegacy);
423 std::vector<std::string>
const& defaultServices,
424 std::vector<std::string>
const& forcedServices) :
425 preProcessEventSignal_(),
426 postProcessEventSignal_(),
434 processConfiguration_(),
454 shouldWeStop_(
false),
455 stateMachineWasInErrorState_(
false),
458 exceptionMessageFiles_(),
459 exceptionMessageRuns_(),
460 exceptionMessageLumis_(),
461 alreadyHandlingException_(
false),
462 forceLooperToEnd_(
false),
463 looperBeginJobRun_(
false),
464 forceESCacheClearOnNewRun_(
false),
465 numberOfForkedChildren_(0),
466 numberOfSequentialEventsPerChild_(1),
467 setCpuAffinity_(
false),
468 eventSetupDataToExcludeFromPrefetching_() {
470 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
471 processDesc->addServices(defaultServices, forcedServices);
478 preProcessEventSignal_(),
479 postProcessEventSignal_(),
487 processConfiguration_(),
507 shouldWeStop_(
false),
508 stateMachineWasInErrorState_(
false),
511 exceptionMessageFiles_(),
512 exceptionMessageRuns_(),
513 exceptionMessageLumis_(),
514 alreadyHandlingException_(
false),
515 forceLooperToEnd_(
false),
516 looperBeginJobRun_(
false),
517 forceESCacheClearOnNewRun_(
false),
518 numberOfForkedChildren_(0),
519 numberOfSequentialEventsPerChild_(1),
520 setCpuAffinity_(
false),
521 eventSetupDataToExcludeFromPrefetching_() {
522 init(processDesc, token, legacy);
527 preProcessEventSignal_(),
528 postProcessEventSignal_(),
536 processConfiguration_(),
556 shouldWeStop_(
false),
557 stateMachineWasInErrorState_(
false),
560 exceptionMessageFiles_(),
561 exceptionMessageRuns_(),
562 exceptionMessageLumis_(),
563 alreadyHandlingException_(
false),
564 forceLooperToEnd_(
false),
565 looperBeginJobRun_(
false),
566 forceESCacheClearOnNewRun_(
false),
567 numberOfForkedChildren_(0),
568 numberOfSequentialEventsPerChild_(1),
569 setCpuAffinity_(
false),
570 eventSetupDataToExcludeFromPrefetching_() {
573 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(parameterSet));
577 boost::shared_ptr<ProcessDesc> processDesc(
new ProcessDesc(config));
592 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
600 fileMode_ = optionsPset.getUntrackedParameter<std::string>(
"fileMode",
"");
601 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>(
"emptyRunLumiMode",
"");
606 setCpuAffinity_ = forking.getUntrackedParameter<
bool>(
"setCpuAffinity",
false);
607 std::vector<ParameterSet>
const& excluded = forking.getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
608 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
612 std::make_pair(itPS->getUntrackedParameter<std::string>(
"type",
"*"),
613 itPS->getUntrackedParameter<std::string>(
"label",
"")));
621 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
629 boost::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
653 FDEBUG(2) << parameterSet << std::endl;
657 if(subProcessParameterSet) {
697 psetRegistry->
data().clear();
744 input_->skipEvents(numberToSkip);
790 ex.
addContext(
"Calling beginJob for the source");
796 actReg_->postBeginJobSignal_();
802 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
834 volatile bool child_failed =
false;
835 volatile unsigned int num_children_done = 0;
836 volatile int child_fail_exit_status = 0;
837 volatile int child_fail_signal = 0;
840 void ep_sigchld(
int, siginfo_t*,
void*) {
844 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
847 if(WIFEXITED(stat_loc)) {
849 if(0 != WEXITSTATUS(stat_loc)) {
850 child_fail_exit_status = WEXITSTATUS(stat_loc);
854 if(WIFSIGNALED(stat_loc)) {
856 child_fail_signal = WTERMSIG(stat_loc);
859 p = waitpid(-1, &stat_loc, WNOHANG);
874 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
876 while(numberOfChildren != 0) {
878 numberOfChildren /= 10;
888 class MessageSenderToSource {
890 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
894 const std::vector<int>& m_childrenPipes;
895 long const m_nEventsToProcess;
897 unsigned int m_aliveChildren;
901 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
902 std::vector<int>
const& childrenPipes,
903 long iNEventsToProcess):
904 m_childrenPipes(childrenPipes),
905 m_nEventsToProcess(iNEventsToProcess),
906 m_aliveChildren(childrenSockets.
size()),
909 FD_ZERO(&m_socketSet);
910 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
912 FD_SET(*it, &m_socketSet);
917 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
919 FD_SET(*it, &m_socketSet);
944 MessageSenderToSource::operator()() {
945 multicore::MessageForParent childMsg;
946 LogInfo(
"ForkingController") <<
"I am controller";
949 multicore::MessageForSource sndmsg;
950 sndmsg.startIndex = 0;
951 sndmsg.nIndices = m_nEventsToProcess;
954 fd_set readSockets, errorSockets;
956 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
957 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
960 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
962 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
968 for (
int idx=0; idx<m_maxFd; idx++) {
971 if (FD_ISSET(idx, &errorSockets)) {
972 LogInfo(
"ForkingController") <<
"Error on socket " << idx;
973 FD_CLR(idx, &m_socketSet);
976 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
984 if (!FD_ISSET(idx, &readSockets)) {
990 bool is_pipe =
false;
991 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
995 while (((rc =
read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
998 FD_CLR(idx, &m_socketSet);
1006 while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
1008 FD_CLR(idx, &m_socketSet);
1017 while (((rc = send(idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
1019 FD_CLR(idx, &m_socketSet);
1024 sndmsg.startIndex += sndmsg.nIndices;
1028 }
while (m_aliveChildren > 0);
1036 EventProcessor::forkProcess(std::string
const& jobReportFile) {
1038 if(0 == numberOfForkedChildren_) {
return true;}
1039 assert(0<numberOfForkedChildren_);
1047 itemType = input_->nextItemType();
1049 assert(itemType == InputSource::IsFile);
1053 itemType = input_->nextItemType();
1054 assert(itemType == InputSource::IsRun);
1056 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
1058 input_->runAuxiliary()->beginTime());
1059 espController_->eventSetupForInstance(ts);
1063 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1065 std::vector<eventsetup::DataKey> dataKeys;
1066 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1071 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
1073 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1074 excludedData = &(itExcludeRec->second);
1075 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
1080 if(0 != recordPtr) {
1083 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1084 itDataKey != itDataKeyEnd;
1087 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1088 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
1092 recordPtr->
doGet(*itDataKey);
1100 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
1105 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1108 actReg_->preForkReleaseResourcesSignal_();
1109 input_->doPreForkReleaseResources();
1110 schedule_->preForkReleaseResources();
1115 unsigned int childIndex = 0;
1116 unsigned int const kMaxChildren = numberOfForkedChildren_;
1117 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1118 std::vector<pid_t> childrenIds;
1119 childrenIds.reserve(kMaxChildren);
1120 std::vector<int> childrenSockets;
1121 childrenSockets.reserve(kMaxChildren);
1122 std::vector<int> childrenPipes;
1123 childrenPipes.reserve(kMaxChildren);
1124 std::vector<int> childrenSocketsCopy;
1125 childrenSocketsCopy.reserve(kMaxChildren);
1126 std::vector<int> childrenPipesCopy;
1127 childrenPipesCopy.reserve(kMaxChildren);
1134 int sockets[2], fd_flags;
1135 for(; childIndex < kMaxChildren; ++childIndex) {
1137 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1138 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1142 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1146 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1147 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1152 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1153 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1156 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1157 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1160 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1161 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1166 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1167 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1170 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1171 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1175 childrenPipesCopy = childrenPipes;
1176 childrenSocketsCopy = childrenSockets;
1178 pid_t
value = fork();
1184 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1187 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1194 std::stringstream stout;
1195 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1196 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1197 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1199 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1200 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1203 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1204 if(setCpuAffinity_) {
1212 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1214 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1217 CPU_SET(childIndex, &mask);
1218 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1219 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1231 LogError(
"ForkingChild") <<
"failed to create a child";
1234 childrenIds.push_back(value);
1235 childrenSockets.push_back(sockets[0]);
1236 childrenPipes.push_back(pipes[0]);
1239 if(childIndex < kMaxChildren) {
1240 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1241 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1244 input_->doPostForkReacquireResources(receiver);
1245 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1250 jobReport->parentAfterFork(jobReportFile);
1260 sigset_t blockingSigSet;
1261 sigset_t unblockingSigSet;
1263 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1264 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1265 sigaddset(&blockingSigSet, SIGCHLD);
1266 sigaddset(&blockingSigSet, SIGUSR2);
1267 sigaddset(&blockingSigSet, SIGINT);
1268 sigdelset(&unblockingSigSet, SIGCHLD);
1269 sigdelset(&unblockingSigSet, SIGUSR2);
1270 sigdelset(&unblockingSigSet, SIGINT);
1271 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1275 bool too_many_fds =
false;
1276 if (pipes[1]+1 > FD_SETSIZE) {
1277 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1278 too_many_fds =
true;
1284 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1285 boost::thread senderThread(sender);
1287 while(!too_many_fds && !
shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
1288 sigsuspend(&unblockingSigSet);
1289 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1291 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1293 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1295 LogError(
"ForkingStopping") <<
"child failed";
1298 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1301 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1302 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1303 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1304 it != itEnd; ++it) {
1307 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1308 while(num_children_done != kMaxChildren) {
1309 sigsuspend(&unblockingSigSet);
1311 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1314 senderThread.join();
1316 if (child_fail_signal) {
1317 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1318 }
else if (child_fail_exit_status) {
1319 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1321 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1325 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
// Returns the result of schedule_->getAllModuleDescriptions() unchanged;
// this method is a pure forwarder and adds no logic of its own.
1339 std::vector<ModuleDescription const*>
1340 EventProcessor::getAllModuleDescriptions()
const {
1341 return schedule_->getAllModuleDescriptions();
1345 EventProcessor::totalEvents()
const {
1346 return schedule_->totalEvents();
1350 EventProcessor::totalEventsPassed()
const {
1351 return schedule_->totalEventsPassed();
1355 EventProcessor::totalEventsFailed()
const {
1356 return schedule_->totalEventsFailed();
1360 EventProcessor::enableEndPaths(
bool active) {
1361 schedule_->enableEndPaths(active);
1365 EventProcessor::endPathsEnabled()
const {
1366 return schedule_->endPathsEnabled();
1371 schedule_->getTriggerReport(rep);
1375 EventProcessor::clearCounters() {
1376 schedule_->clearCounters();
// Returns the printable name of the processor's current state by passing
// getState() to stateName().
1379 char const* EventProcessor::currentStateName()
const {
1380 return stateName(getState());
// Returns the printable name for state `s` by using `s` directly as an index
// into the file-level stateNames array. No bounds check is performed here.
1383 char const* EventProcessor::stateName(
State s)
const {
1384 return stateNames[
s];
1387 char const* EventProcessor::msgName(
Msg m)
const {
1391 State EventProcessor::getState()
const {
1403 if(runNumber == 0) {
1406 <<
"EventProcessor::setRunNumber was called with an invalid run number (0)\n"
1407 <<
"Run number was set to 1 instead\n";
1415 input_->setRunNumber(runNumber);
1429 EventProcessor::waitForAsyncCompletion(
unsigned int timeout_seconds) {
1431 boost::xtime timeout;
1432 boost::xtime_get(&timeout, boost::TIME_UTC);
1433 timeout.sec += timeout_seconds;
1439 boost::mutex::scoped_lock sl(stop_lock_);
1442 if(stop_count_ < 0)
return last_rc_;
1444 if(timeout_seconds == 0) {
1445 while(stop_count_ == 0) stopper_.wait(sl);
1447 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) ==
true);
1457 if(id_set_) pthread_cancel(event_loop_id_);
1461 <<
"An asynchronous request was made to shut down "
1462 <<
"the event loop "
1463 <<
"and the event loop did not shutdown after "
1464 << timeout_seconds <<
" seconds\n";
1466 event_loop_->join();
1467 event_loop_.reset();
1472 return rc ==
false ? epTimedOut : last_rc_;
1476 EventProcessor::waitTillDoneAsync(
unsigned int timeout_value_secs) {
1477 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1486 StatusCode rc = waitForAsyncCompletion(secs);
1487 if(rc != epTimedOut) changeState(
mFinished);
1494 StatusCode rc = waitForAsyncCompletion(secs);
1495 if(rc != epTimedOut) changeState(
mFinished);
1500 void EventProcessor::errorState() {
1510 return waitForAsyncCompletion(60*2);
1516 boost::mutex::scoped_lock sl(state_lock_);
1517 State curr = state_;
1523 (curr !=
table[rc].current ||
1524 (curr ==
table[rc].current &&
1530 <<
"A member function of EventProcessor has been called in an"
1531 <<
" inappropriate order.\n"
1532 <<
"Bad transition from " << stateName(curr) <<
" "
1533 <<
"using message " << msgName(msg) <<
"\n"
1534 <<
"No where to go from here.\n";
1536 FDEBUG(1) <<
"changeState: current=" << stateName(curr)
1537 <<
", message=" << msgName(msg)
1538 <<
" -> new=" << stateName(
table[rc].
final) <<
"\n";
1540 state_ =
table[rc].final;
1543 void EventProcessor::runAsync() {
1546 boost::mutex::scoped_lock sl(stop_lock_);
1547 if(id_set_ ==
true) {
1548 std::string err(
"runAsync called while async event loop already running\n");
1556 last_rc_ = epSuccess;
1557 event_loop_.reset(
new boost::thread(boost::bind(EventProcessor::asyncRun,
this)));
1558 boost::xtime timeout;
1559 boost::xtime_get(&timeout, boost::TIME_UTC);
1561 if(starter_.timed_wait(sl, timeout) ==
false) {
1564 <<
"Async run thread did not start in 60 seconds\n";
1579 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1581 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1582 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1592 FDEBUG(2) <<
"asyncRun starting ......................\n";
1595 bool onlineStateTransitions =
true;
1599 LogError(
"FwkJob") <<
"cms::Exception caught in "
1600 <<
"EventProcessor::asyncRun"
1606 LogError(
"FwkJob") <<
"Standard library exception caught in "
1607 <<
"EventProcessor::asyncRun"
1613 LogError(
"FwkJob") <<
"Unknown exception caught in "
1614 <<
"EventProcessor::asyncRun"
1624 boost::mutex::scoped_lock sl(me->
stop_lock_);
1628 FDEBUG(2) <<
"asyncRun ending ......................\n";
1633 EventProcessor::runToCompletion(
bool onlineStateTransitions) {
1637 int numberOfEventsToProcess = -1;
1640 if(machine_.get() != 0) {
1642 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1643 <<
"Please report this error to the Framework group\n";
1652 EventProcessor::runEventCount(
int numberOfEventsToProcess) {
1656 bool onlineStateTransitions =
false;
1665 EventProcessor::runCommon(
bool onlineStateTransitions,
int numberOfEventsToProcess) {
1668 boost::shared_ptr<EventPrincipal> ep(
new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
1669 principalCache_.insert(ep);
1673 if(!onlineStateTransitions) changeState(
mRunCount);
1676 stateMachineWasInErrorState_ =
false;
1681 if(machine_.get() == 0) {
1689 << fileMode_ <<
".\n"
1690 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1700 << emptyRunLumiMode_ <<
".\n"
1701 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1708 machine_->initiate();
1720 itemType = input_->nextItemType();
1722 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1732 FDEBUG(1) <<
"In main processing loop, encountered sStopping state\n";
1733 forceLooperToEnd_ =
true;
1735 forceLooperToEnd_ =
false;
1739 FDEBUG(1) <<
"In main processing loop, encountered sShuttingDown state\n";
1740 forceLooperToEnd_ =
true;
1742 forceLooperToEnd_ =
false;
1748 boost::mutex::scoped_lock sl(
usr2_lock);
1751 returnCode = epSignal;
1752 forceLooperToEnd_ =
true;
1754 forceLooperToEnd_ =
false;
1759 if(itemType == InputSource::IsStop) {
1762 else if(itemType == InputSource::IsFile) {
1765 else if(itemType == InputSource::IsRun) {
1766 machine_->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1768 else if(itemType == InputSource::IsLumi) {
1771 else if(itemType == InputSource::IsEvent) {
1774 if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
1775 returnCode = epCountComplete;
1777 FDEBUG(1) <<
"Event count complete, pausing event loop\n";
1784 <<
"Unknown next item type passed to EventProcessor\n"
1785 <<
"Please report this error to the Framework group\n";
1788 if(machine_->terminated()) {
1847 alreadyHandlingException_ =
true;
1849 alreadyHandlingException_ =
false;
1850 if (!exceptionMessageLumis_.empty()) {
1853 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1856 if (!exceptionMessageRuns_.empty()) {
1859 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1862 if (!exceptionMessageFiles_.empty()) {
1865 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1871 if(machine_->terminated()) {
1872 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1876 if(!onlineStateTransitions) changeState(
mFinished);
1878 if(stateMachineWasInErrorState_) {
1880 <<
"The boost state machine in the EventProcessor exited after\n"
1881 <<
"entering the Error state.\n";
1888 FDEBUG(1) <<
" \treadFile\n";
1889 fb_ = input_->readFile();
1890 if(numberOfForkedChildren_ > 0) {
1891 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
// Asks the input source to close the file described by fb_.
// cleaningUpAfterException is passed through to input_->closeFile unchanged.
1895 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
// Nothing is closed when no FileBlock is currently held.
1896 if (fb_.get() != 0) {
1897 input_->closeFile(fb_, cleaningUpAfterException);
// Level-1 debug trace.
1899 FDEBUG(1) <<
"\tcloseInputFile\n";
// Forwards openOutputFiles(*fb_) to schedule_ and, when hasSubProcess() is
// true, to subProcess_.
1902 void EventProcessor::openOutputFiles() {
// Guard: *fb_ is dereferenced below, so a FileBlock must be held.
// NOTE(review): presumably both calls sit inside this check; the line that
// would close it is not present in this listing.
1903 if (fb_.get() != 0) {
1904 schedule_->openOutputFiles(*fb_);
1905 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
// Level-1 debug trace.
1907 FDEBUG(1) <<
"\topenOutputFiles\n";
// Forwards closeOutputFiles() to schedule_ and, when hasSubProcess() is true,
// to subProcess_.
1910 void EventProcessor::closeOutputFiles() {
// The calls below take no FileBlock argument, but are still conditioned on
// one being held.
1911 if (fb_.get() != 0) {
1912 schedule_->closeOutputFiles();
1913 if(hasSubProcess()) subProcess_->closeOutputFiles();
// Level-1 debug trace.
1915 FDEBUG(1) <<
"\tcloseOutputFiles\n";
// Forwards respondToOpenInputFile(*fb_) to schedule_ and, when
// hasSubProcess() is true, to subProcess_.
1918 void EventProcessor::respondToOpenInputFile() {
// Guard: *fb_ is dereferenced below, so a FileBlock must be held.
1919 if (fb_.get() != 0) {
1920 schedule_->respondToOpenInputFile(*fb_);
1921 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
// Level-1 debug trace.
1923 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
// Forwards respondToCloseInputFile(*fb_) to schedule_ and, when
// hasSubProcess() is true, to subProcess_.
1926 void EventProcessor::respondToCloseInputFile() {
// Guard: *fb_ is dereferenced below, so a FileBlock must be held.
1927 if (fb_.get() != 0) {
1928 schedule_->respondToCloseInputFile(*fb_);
1929 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
// Level-1 debug trace.
1931 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
// Forwards respondToOpenOutputFiles(*fb_) to schedule_ and, when
// hasSubProcess() is true, to subProcess_.
1934 void EventProcessor::respondToOpenOutputFiles() {
// Guard: *fb_ is dereferenced below, so a FileBlock must be held.
1935 if (fb_.get() != 0) {
1936 schedule_->respondToOpenOutputFiles(*fb_);
1937 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
// Level-1 debug trace.
1939 FDEBUG(1) <<
"\trespondToOpenOutputFiles\n";
// Forwards respondToCloseOutputFiles(*fb_) to schedule_ and, when
// hasSubProcess() is true, to subProcess_.
1942 void EventProcessor::respondToCloseOutputFiles() {
// Guard: *fb_ is dereferenced below, so a FileBlock must be held.
1943 if (fb_.get() != 0) {
1944 schedule_->respondToCloseOutputFiles(*fb_);
1945 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
// Level-1 debug trace.
1947 FDEBUG(1) <<
"\trespondToCloseOutputFiles\n";
// Resets the stop request flag and, when appropriate, tells the looper that a
// new loop is starting.
1950 void EventProcessor::startingNewLoop() {
1951 shouldWeStop_ =
false;
// doStartingNewLoop() is called here only when a looper exists AND
// looperBeginJobRun_ is already true. Elsewhere in this file the flag is set
// to true right after looper_->beginOfJob(es), and doStartingNewLoop() is
// called there as well, so this branch is skipped until that has happened.
// NOTE(review): two listing lines just before this `if` are not present here.
1954 if(looper_ && looperBeginJobRun_) {
1955 looper_->doStartingNewLoop();
// Level-1 debug trace.
1957 FDEBUG(1) <<
"\tstartingNewLoop\n";
1960 bool EventProcessor::endOfLoop() {
1963 looper_->setModuleChanger(&changer);
1965 looper_->setModuleChanger(0);
1966 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1969 FDEBUG(1) <<
"\tendOfLoop\n";
1973 void EventProcessor::rewindInput() {
1976 FDEBUG(1) <<
"\trewind\n";
// Forwards to looper_->prepareForNextLoop, passing the raw pointer obtained
// from esp_.get().
// NOTE(review): looper_ is dereferenced without the null check that
// startingNewLoop() applies; presumably this is only reached when a looper is
// configured -- confirm against the callers.
1979 void EventProcessor::prepareForNextLoop() {
1980 looper_->prepareForNextLoop(esp_.get());
// Level-1 debug trace.
1981 FDEBUG(1) <<
"\tprepareForNextLoop\n";
// Reports whether output should be closed.
// When hasSubProcess() is true only subProcess_ is asked and schedule_ is not
// consulted; otherwise the answer comes from schedule_.
1984 bool EventProcessor::shouldWeCloseOutput()
const {
// Level-1 debug trace, emitted before the decision is made.
1985 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1986 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
// Emits a multi-line diagnostic about the state machine having entered the
// error state, then records that fact in stateMachineWasInErrorState_.
1989 void EventProcessor::doErrorStuff() {
// Level-1 debug trace.
1990 FDEBUG(1) <<
"\tdoErrorStuff\n";
// NOTE(review): the line naming the stream/logger that the following text is
// inserted into (listing line 1991) is not present in this listing.
1992 <<
"The EventProcessor state machine encountered an unexpected event\n"
1993 <<
"and went to the error state\n"
1994 <<
"Will attempt to terminate processing normally\n"
1995 <<
"(IF using the looper the next loop will be attempted)\n"
1996 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
// Elsewhere in this file this flag is reset to false in runCommon and checked
// after the state machine terminates.
1997 stateMachineWasInErrorState_ =
true;
2002 input_->doBeginRun(runPrincipal);
2005 if(forceESCacheClearOnNewRun_){
2006 espController_->forceCacheClear();
2008 espController_->eventSetupForInstance(ts);
2010 if(looper_ && looperBeginJobRun_==
false) {
2012 looper_->beginOfJob(es);
2013 looperBeginJobRun_ =
true;
2014 looper_->doStartingNewLoop();
2018 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2019 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
2020 if(hasSubProcess()) {
2021 subProcess_->doBeginRun(runPrincipal, ts);
2026 looper_->doBeginRun(runPrincipal, es);
2032 input_->doEndRun(runPrincipal, cleaningUpAfterException);
2033 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
2035 espController_->eventSetupForInstance(ts);
2039 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2040 schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2041 if(hasSubProcess()) {
2042 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2047 looper_->doEndRun(runPrincipal, es);
2053 input_->doBeginLumi(lumiPrincipal);
2058 rng->preBeginLumi(lb);
2064 espController_->eventSetupForInstance(ts);
2068 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2069 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2070 if(hasSubProcess()) {
2071 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2074 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
2076 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2082 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2087 espController_->eventSetupForInstance(ts);
2091 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2092 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2093 if(hasSubProcess()) {
2094 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2097 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
2099 looper_->doEndLuminosityBlock(lumiPrincipal, es);
2104 input_->readAndCacheRun(merge, *historyAppender_);
2109 int EventProcessor::readAndCacheLumi(
bool merge) {
2110 input_->readAndCacheLumi(merge, *historyAppender_);
2112 return input_->luminosityBlock();
2128 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2129 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2130 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
2134 principalCache_.deleteLumi(phid, run, lumi);
2135 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2136 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
2139 void EventProcessor::readAndProcessEvent() {
2140 EventPrincipal *pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
2141 FDEBUG(1) <<
"\treadEvent\n";
2145 espController_->eventSetupForInstance(ts);
2149 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2150 schedule_->processOneOccurrence<Traits>(*pep, es);
2151 if(hasSubProcess()) {
2152 subProcess_->doEvent(*pep, ts);
2157 bool randomAccess = input_->randomAccess();
2164 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2169 input_->skipEvents(-2);
2177 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2181 FDEBUG(1) <<
"\tprocessEvent\n";
// Reports whether event processing should stop.
// Returns true if shouldWeStop_ is set; otherwise returns true when
// schedule_->terminate() is true, or when a subprocess exists and its
// terminate() is true.
2185 bool EventProcessor::shouldWeStop()
const {
// Level-1 debug trace.
2186 FDEBUG(1) <<
"\tshouldWeStop\n";
// The member flag short-circuits: schedule_ and subProcess_ are not queried
// when it is already set.
2187 if(shouldWeStop_)
return true;
2188 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
// Stores a copy of `message` in exceptionMessageFiles_.
// `message` is taken by non-const reference but is only read in the code shown.
// Elsewhere in this file the stored text, when non-empty, is written with
// LogAbsolute("Additional Exceptions").
2191 void EventProcessor::setExceptionMessageFiles(std::string&
message) {
2192 exceptionMessageFiles_ =
message;
// Stores a copy of `message` in exceptionMessageRuns_.
// `message` is taken by non-const reference but is only read in the code shown.
// Elsewhere in this file the stored text, when non-empty, is written with
// LogAbsolute("Additional Exceptions").
2195 void EventProcessor::setExceptionMessageRuns(std::string&
message) {
2196 exceptionMessageRuns_ =
message;
// Stores a copy of `message` in exceptionMessageLumis_.
// `message` is taken by non-const reference but is only read in the code shown.
// Elsewhere in this file the stored text, when non-empty, is written with
// LogAbsolute("Additional Exceptions").
2199 void EventProcessor::setExceptionMessageLumis(std::string&
message) {
2200 exceptionMessageLumis_ =
message;
// Returns the current value of alreadyHandlingException_. Elsewhere in this
// file that flag is assigned true and then false again in runCommon.
2203 bool EventProcessor::alreadyHandlingException()
const {
2204 return alreadyHandlingException_;
2207 void EventProcessor::terminateMachine() {
2208 if(machine_.get() != 0) {
2209 if(!machine_->terminated()) {
2210 forceLooperToEnd_ =
true;
2212 forceLooperToEnd_ =
false;
2215 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2217 if(machine_->terminated()) {
2218 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
T getParameter(std::string const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
StatusCode skip(int numberToSkip)
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
ActivityRegistry::PostProcessEvent postProcessEventSignal_
virtual std::string explainSelf() const
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
static void clearRegistries()
Timestamp const & endTime() const
int numberOfForkedChildren_
EventID const & id() const
bool lastOperationSucceeded() const
void call(boost::function< void(void)>)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
static ThreadSafeRegistry * instance()
boost::shared_ptr< ActivityRegistry > actReg_
virtual StatusCode runEventCount(int numberOfEventsToProcess)
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< ProcessConfiguration > processConfiguration_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet)
Timestamp const & time() const
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::shared_ptr< ActionTable const > act_table_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
StateSentry(EventProcessor *ep)
ParameterSet const & getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
volatile event_processor::State state_
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
boost::shared_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, PrincipalCache &pCache, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration)
boost::shared_ptr< ProcessConfiguration > processConfiguration_
void connectSigs(EventProcessor *ep)
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
volatile pthread_t event_loop_id_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
void clearEventPrincipal()
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
bool forceESCacheClearOnNewRun_
boost::condition starter_
author Stefano ARGIRO author Bill Tanenbaum
boost::shared_ptr< ActionTable const > act_table_
edm::ProcessHistoryID const & processHistoryID() const
volatile bool shutdown_flag
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
ActivityRegistry::PreProcessEvent preProcessEventSignal_
collection_type & data()
Provide access to the contained collection.
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
void validate(ParameterSet &pset, std::string const &moduleLabel) const
ParameterSet const & registerIt()
bool insert(Storage &, ItemType *, const IdTag &)
static ComponentFactory< T > * get()
tuple size
Write out results.
Transition requestedTransition() const
T get(const Candidate &c)
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)