63 #include "boost/thread/xtime.hpp"
77 #include <sys/types.h>
79 #include <sys/socket.h>
80 #include <sys/select.h>
81 #include <sys/fcntl.h>
94 class SendSourceTerminationSignalIfException {
98 ~SendSourceTerminationSignalIfException() {
103 void completedSuccessfully() {
115 std::unique_ptr<InputSource>
119 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
120 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
121 std::shared_ptr<ActivityRegistry>
areg,
125 if(main_input == 0) {
127 <<
"There must be exactly one source in the configuration.\n"
128 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
133 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
136 filler->fill(descriptions);
140 descriptions.validate(*main_input,
std::string(
"source"));
144 std::ostringstream ost;
145 ost <<
"Validating configuration of input source of type " << modtype;
161 processConfiguration.get(),
168 areg->preSourceConstructionSignal_(
md);
169 std::unique_ptr<InputSource>
input;
172 std::shared_ptr<int> sentry(
nullptr,[areg,&
md](
void*){areg->postSourceConstructionSignal_(
md);});
178 std::ostringstream ost;
179 ost <<
"Constructing input source of type " << modtype;
187 boost::shared_ptr<EDLooperBase>
191 boost::shared_ptr<EDLooperBase> vLooper;
193 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
195 if(loopers.size() == 0) {
199 assert(1 == loopers.size());
201 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
219 std::vector<std::string>
const& defaultServices,
220 std::vector<std::string>
const& forcedServices) :
223 branchIDListHelper_(),
226 espController_(new eventsetup::EventSetupsController),
229 processConfiguration_(),
235 deferredExceptionPtrIsSet_(
false),
237 beginJobCalled_(
false),
238 shouldWeStop_(
false),
239 stateMachineWasInErrorState_(
false),
242 exceptionMessageFiles_(),
243 exceptionMessageRuns_(),
244 exceptionMessageLumis_(),
245 alreadyHandlingException_(
false),
246 forceLooperToEnd_(
false),
247 looperBeginJobRun_(
false),
248 forceESCacheClearOnNewRun_(
false),
249 numberOfForkedChildren_(0),
250 numberOfSequentialEventsPerChild_(1),
251 setCpuAffinity_(
false),
252 eventSetupDataToExcludeFromPrefetching_() {
254 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
255 processDesc->addServices(defaultServices, forcedServices);
256 init(processDesc, iToken, iLegacy);
260 std::vector<std::string>
const& defaultServices,
261 std::vector<std::string>
const& forcedServices) :
264 branchIDListHelper_(),
267 espController_(new eventsetup::EventSetupsController),
270 processConfiguration_(),
276 deferredExceptionPtrIsSet_(
false),
278 beginJobCalled_(
false),
279 shouldWeStop_(
false),
280 stateMachineWasInErrorState_(
false),
283 exceptionMessageFiles_(),
284 exceptionMessageRuns_(),
285 exceptionMessageLumis_(),
286 alreadyHandlingException_(
false),
287 forceLooperToEnd_(
false),
288 looperBeginJobRun_(
false),
289 forceESCacheClearOnNewRun_(
false),
290 numberOfForkedChildren_(0),
291 numberOfSequentialEventsPerChild_(1),
292 setCpuAffinity_(
false),
293 asyncStopRequestedWhileProcessingEvents_(
false),
294 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
295 eventSetupDataToExcludeFromPrefetching_()
298 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
299 processDesc->addServices(defaultServices, forcedServices);
308 branchIDListHelper_(),
311 espController_(new eventsetup::EventSetupsController),
314 processConfiguration_(),
320 deferredExceptionPtrIsSet_(
false),
322 beginJobCalled_(
false),
323 shouldWeStop_(
false),
324 stateMachineWasInErrorState_(
false),
327 exceptionMessageFiles_(),
328 exceptionMessageRuns_(),
329 exceptionMessageLumis_(),
330 alreadyHandlingException_(
false),
331 forceLooperToEnd_(
false),
332 looperBeginJobRun_(
false),
333 forceESCacheClearOnNewRun_(
false),
334 numberOfForkedChildren_(0),
335 numberOfSequentialEventsPerChild_(1),
336 setCpuAffinity_(
false),
337 asyncStopRequestedWhileProcessingEvents_(
false),
338 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
339 eventSetupDataToExcludeFromPrefetching_()
341 init(processDesc, token, legacy);
348 branchIDListHelper_(),
351 espController_(new eventsetup::EventSetupsController),
354 processConfiguration_(),
360 deferredExceptionPtrIsSet_(
false),
362 beginJobCalled_(
false),
363 shouldWeStop_(
false),
364 stateMachineWasInErrorState_(
false),
367 exceptionMessageFiles_(),
368 exceptionMessageRuns_(),
369 exceptionMessageLumis_(),
370 alreadyHandlingException_(
false),
371 forceLooperToEnd_(
false),
372 looperBeginJobRun_(
false),
373 forceESCacheClearOnNewRun_(
false),
374 numberOfForkedChildren_(0),
375 numberOfSequentialEventsPerChild_(1),
376 setCpuAffinity_(
false),
377 asyncStopRequestedWhileProcessingEvents_(
false),
378 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
379 eventSetupDataToExcludeFromPrefetching_()
383 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
387 auto processDesc = std::make_shared<ProcessDesc>(
config);
405 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
417 unsigned int nThreads=1;
418 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
419 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
427 unsigned int nStreams =1;
428 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
429 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
437 unsigned int nConcurrentRuns =1;
443 unsigned int nConcurrentLumis =1;
469 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
470 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
474 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
475 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
483 std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
492 handler->willBeUsingThreads();
496 std::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
531 FDEBUG(2) << parameterSet << std::endl;
537 ep->preModuleDelayedGetSignal_.connect(std::cref(
actReg_->preModuleEventDelayedGetSignal_));
538 ep->postModuleDelayedGetSignal_.connect(std::cref(
actReg_->postModuleEventDelayedGetSignal_));
542 if(subProcessParameterSet) {
589 actReg_->preallocateSignal_(bounds);
613 ex.
addContext(
"Calling beginJob for the source");
619 actReg_->postBeginJobSignal_();
630 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
643 c.
call([actReg](){actReg->preEndJobSignal_();});
652 c.
call([actReg](){actReg->postEndJobSignal_();});
667 volatile bool child_failed =
false;
668 volatile unsigned int num_children_done = 0;
669 volatile int child_fail_exit_status = 0;
670 volatile int child_fail_signal = 0;
676 void ep_sigchld(
int, siginfo_t*,
void*) {
680 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
683 if(WIFEXITED(stat_loc)) {
685 if(0 != WEXITSTATUS(stat_loc)) {
686 child_fail_exit_status = WEXITSTATUS(stat_loc);
690 if(WIFSIGNALED(stat_loc)) {
692 child_fail_signal = WTERMSIG(stat_loc);
695 p = waitpid(-1, &stat_loc, WNOHANG);
710 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
712 while(numberOfChildren != 0) {
714 numberOfChildren /= 10;
724 class MessageSenderToSource {
726 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
730 const std::vector<int>& m_childrenPipes;
731 long const m_nEventsToProcess;
733 unsigned int m_aliveChildren;
737 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
738 std::vector<int>
const& childrenPipes,
739 long iNEventsToProcess):
740 m_childrenPipes(childrenPipes),
741 m_nEventsToProcess(iNEventsToProcess),
742 m_aliveChildren(childrenSockets.
size()),
745 FD_ZERO(&m_socketSet);
746 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
748 FD_SET(*it, &m_socketSet);
753 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
755 FD_SET(*it, &m_socketSet);
780 MessageSenderToSource::operator()() {
781 multicore::MessageForParent childMsg;
782 LogInfo(
"ForkingController") <<
"I am controller";
785 multicore::MessageForSource sndmsg;
786 sndmsg.startIndex = 0;
787 sndmsg.nIndices = m_nEventsToProcess;
790 fd_set readSockets, errorSockets;
792 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
793 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
796 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
798 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
807 if (FD_ISSET(
idx, &errorSockets)) {
808 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
809 FD_CLR(
idx, &m_socketSet);
812 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
820 if (!FD_ISSET(
idx, &readSockets)) {
826 bool is_pipe =
false;
827 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
831 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
834 FD_CLR(
idx, &m_socketSet);
842 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
844 FD_CLR(
idx, &m_socketSet);
853 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
855 FD_CLR(
idx, &m_socketSet);
860 sndmsg.startIndex += sndmsg.nIndices;
864 }
while (m_aliveChildren > 0);
872 void EventProcessor::possiblyContinueAfterForkChildFailure() {
873 if(child_failed && continueAfterChildFailure_) {
874 if (child_fail_signal) {
875 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
877 }
else if (child_fail_exit_status) {
878 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
879 child_fail_exit_status=0;
881 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
890 if(0 == numberOfForkedChildren_) {
return true;}
891 assert(0<numberOfForkedChildren_);
899 itemType = input_->nextItemType();
901 assert(itemType == InputSource::IsFile);
905 itemType = input_->nextItemType();
906 assert(itemType == InputSource::IsRun);
908 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
910 input_->runAuxiliary()->beginTime());
911 espController_->eventSetupForInstance(ts);
915 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
917 std::vector<eventsetup::DataKey> dataKeys;
918 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
923 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
925 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
926 excludedData = &(itExcludeRec->second);
927 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
935 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
936 itDataKey != itDataKeyEnd;
939 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
940 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
944 recordPtr->
doGet(*itDataKey);
952 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
957 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
960 actReg_->preForkReleaseResourcesSignal_();
961 input_->doPreForkReleaseResources();
962 schedule_->preForkReleaseResources();
967 unsigned int childIndex = 0;
968 unsigned int const kMaxChildren = numberOfForkedChildren_;
969 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
970 std::vector<pid_t> childrenIds;
971 childrenIds.reserve(kMaxChildren);
972 std::vector<int> childrenSockets;
973 childrenSockets.reserve(kMaxChildren);
974 std::vector<int> childrenPipes;
975 childrenPipes.reserve(kMaxChildren);
976 std::vector<int> childrenSocketsCopy;
977 childrenSocketsCopy.reserve(kMaxChildren);
978 std::vector<int> childrenPipesCopy;
979 childrenPipesCopy.reserve(kMaxChildren);
986 int sockets[2], fd_flags;
987 for(; childIndex < kMaxChildren; ++childIndex) {
989 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
990 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
994 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
998 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
999 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1004 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1005 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1008 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1009 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1012 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1013 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1018 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1019 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1022 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1023 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1027 childrenPipesCopy = childrenPipes;
1028 childrenSocketsCopy = childrenSockets;
1030 pid_t
value = fork();
1036 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1039 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1046 std::stringstream stout;
1047 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1048 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1049 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1051 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1052 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1055 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1056 if(setCpuAffinity_) {
1064 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1066 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1069 CPU_SET(childIndex, &mask);
1070 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1071 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1083 LogError(
"ForkingChild") <<
"failed to create a child";
1086 childrenIds.push_back(value);
1087 childrenSockets.push_back(sockets[0]);
1088 childrenPipes.push_back(pipes[0]);
1091 if(childIndex < kMaxChildren) {
1092 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1093 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1095 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1096 input_->doPostForkReacquireResources(receiver);
1097 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1102 jobReport->parentAfterFork(jobReportFile);
1112 sigset_t blockingSigSet;
1113 sigset_t unblockingSigSet;
1115 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1116 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1117 sigaddset(&blockingSigSet, SIGCHLD);
1118 sigaddset(&blockingSigSet, SIGUSR2);
1119 sigaddset(&blockingSigSet, SIGINT);
1120 sigdelset(&unblockingSigSet, SIGCHLD);
1121 sigdelset(&unblockingSigSet, SIGUSR2);
1122 sigdelset(&unblockingSigSet, SIGINT);
1123 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1127 bool too_many_fds =
false;
1128 if (pipes[1]+1 > FD_SETSIZE) {
1129 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1130 too_many_fds =
true;
1136 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1137 boost::thread senderThread(sender);
1139 if(not too_many_fds) {
1142 possiblyContinueAfterForkChildFailure();
1143 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1144 sigsuspend(&unblockingSigSet);
1145 possiblyContinueAfterForkChildFailure();
1146 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1149 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1151 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1153 LogError(
"ForkingStopping") <<
"child failed";
1156 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1159 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1160 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1161 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1162 it != itEnd; ++it) {
1165 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1166 while(num_children_done != kMaxChildren) {
1167 sigsuspend(&unblockingSigSet);
1169 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1172 senderThread.join();
1173 if(child_failed && !continueAfterChildFailure_) {
1174 if (child_fail_signal) {
1175 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1176 }
else if (child_fail_exit_status) {
1177 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1179 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1183 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1188 std::vector<ModuleDescription const*>
1189 EventProcessor::getAllModuleDescriptions()
const {
1190 return schedule_->getAllModuleDescriptions();
1194 EventProcessor::totalEvents()
const {
1195 return schedule_->totalEvents();
1199 EventProcessor::totalEventsPassed()
const {
1200 return schedule_->totalEventsPassed();
1204 EventProcessor::totalEventsFailed()
const {
1205 return schedule_->totalEventsFailed();
1209 EventProcessor::enableEndPaths(
bool active) {
1210 schedule_->enableEndPaths(active);
1214 EventProcessor::endPathsEnabled()
const {
1215 return schedule_->endPathsEnabled();
1220 schedule_->getTriggerReport(rep);
1224 EventProcessor::clearCounters() {
1225 schedule_->clearCounters();
1229 std::auto_ptr<statemachine::Machine>
1230 EventProcessor::createStateMachine() {
1237 << fileMode_ <<
".\n"
1238 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1248 << emptyRunLumiMode_ <<
".\n"
1249 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1256 machine->initiate();
1262 bool returnValue =
false;
1267 returnCode = epSignal;
1274 EventProcessor::runToCompletion() {
1277 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1278 std::auto_ptr<statemachine::Machine> machine;
1283 stateMachineWasInErrorState_ =
false;
1288 machine = createStateMachine();
1289 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1290 asyncStopRequestedWhileProcessingEvents_=
false;
1299 if(numberOfForkedChildren_ > 0) {
1300 size_t size = preg_->size();
1302 SendSourceTerminationSignalIfException sentry(actReg_.get());
1303 more = input_->skipForForking();
1304 sentry.completedSuccessfully();
1307 if(size < preg_->
size()) {
1308 principalCache_.adjustIndexesAfterProductRegistryAddition();
1310 principalCache_.adjustEventsToNewProductRegistry(preg_);
1314 SendSourceTerminationSignalIfException sentry(actReg_.get());
1315 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1316 sentry.completedSuccessfully();
1319 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1321 if(checkForAsyncStopRequest(returnCode)) {
1322 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1323 forceLooperToEnd_ =
true;
1325 forceLooperToEnd_ =
false;
1329 if(itemType == InputSource::IsEvent) {
1331 if(asyncStopRequestedWhileProcessingEvents_) {
1332 forceLooperToEnd_ =
true;
1334 forceLooperToEnd_ =
false;
1335 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1338 itemType = nextItemTypeFromProcessingEvents_;
1341 if(itemType == InputSource::IsEvent) {
1343 else if(itemType == InputSource::IsStop) {
1346 else if(itemType == InputSource::IsFile) {
1349 else if(itemType == InputSource::IsRun) {
1350 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1352 else if(itemType == InputSource::IsLumi) {
1355 else if(itemType == InputSource::IsSynchronize) {
1361 <<
"Unknown next item type passed to EventProcessor\n"
1362 <<
"Please report this error to the Framework group\n";
1364 if(machine->terminated()) {
1416 alreadyHandlingException_ =
true;
1417 terminateMachine(machine);
1418 alreadyHandlingException_ =
false;
1419 if (!exceptionMessageLumis_.empty()) {
1422 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1425 if (!exceptionMessageRuns_.empty()) {
1428 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1431 if (!exceptionMessageFiles_.empty()) {
1434 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1440 if(machine->terminated()) {
1441 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1445 if(stateMachineWasInErrorState_) {
1447 <<
"The boost state machine in the EventProcessor exited after\n"
1448 <<
"entering the Error state.\n";
1452 if(machine.get() != 0) {
1453 terminateMachine(machine);
1455 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1456 <<
"Please report this error to the Framework group\n";
1462 void EventProcessor::readFile() {
1463 FDEBUG(1) <<
" \treadFile\n";
1464 size_t size = preg_->size();
1465 SendSourceTerminationSignalIfException sentry(actReg_.get());
1467 fb_ = input_->readFile();
1468 if(size < preg_->
size()) {
1469 principalCache_.adjustIndexesAfterProductRegistryAddition();
1471 principalCache_.adjustEventsToNewProductRegistry(preg_);
1472 if((numberOfForkedChildren_ > 0)
or
1473 (preallocations_.numberOfStreams()>1 and
1474 preallocations_.numberOfThreads()>1)) {
1475 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1477 sentry.completedSuccessfully();
1480 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1481 if (fb_.get() !=
nullptr) {
1482 SendSourceTerminationSignalIfException sentry(actReg_.get());
1483 input_->closeFile(fb_.get(), cleaningUpAfterException);
1484 sentry.completedSuccessfully();
1486 FDEBUG(1) <<
"\tcloseInputFile\n";
1489 void EventProcessor::openOutputFiles() {
1490 if (fb_.get() !=
nullptr) {
1491 schedule_->openOutputFiles(*fb_);
1492 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1494 FDEBUG(1) <<
"\topenOutputFiles\n";
1497 void EventProcessor::closeOutputFiles() {
1498 if (fb_.get() !=
nullptr) {
1499 schedule_->closeOutputFiles();
1500 if(hasSubProcess()) subProcess_->closeOutputFiles();
1502 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1505 void EventProcessor::respondToOpenInputFile() {
1506 if(hasSubProcess()) {
1507 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1509 if (fb_.get() !=
nullptr) {
1510 schedule_->respondToOpenInputFile(*fb_);
1511 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1513 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1516 void EventProcessor::respondToCloseInputFile() {
1517 if (fb_.get() !=
nullptr) {
1518 schedule_->respondToCloseInputFile(*fb_);
1519 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1521 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1524 void EventProcessor::startingNewLoop() {
1525 shouldWeStop_ =
false;
1528 if(looper_ && looperBeginJobRun_) {
1529 looper_->doStartingNewLoop();
1531 FDEBUG(1) <<
"\tstartingNewLoop\n";
1534 bool EventProcessor::endOfLoop() {
1537 looper_->setModuleChanger(&changer);
1539 looper_->setModuleChanger(
nullptr);
1540 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1543 FDEBUG(1) <<
"\tendOfLoop\n";
1547 void EventProcessor::rewindInput() {
1550 FDEBUG(1) <<
"\trewind\n";
1553 void EventProcessor::prepareForNextLoop() {
1554 looper_->prepareForNextLoop(esp_.get());
1555 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1558 bool EventProcessor::shouldWeCloseOutput()
const {
1559 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1560 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1563 void EventProcessor::doErrorStuff() {
1564 FDEBUG(1) <<
"\tdoErrorStuff\n";
1566 <<
"The EventProcessor state machine encountered an unexpected event\n"
1567 <<
"and went to the error state\n"
1568 <<
"Will attempt to terminate processing normally\n"
1569 <<
"(IF using the looper the next loop will be attempted)\n"
1570 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1571 stateMachineWasInErrorState_ =
true;
1577 SendSourceTerminationSignalIfException sentry(actReg_.get());
1579 input_->doBeginRun(runPrincipal, &processContext_);
1580 sentry.completedSuccessfully();
1585 if(forceESCacheClearOnNewRun_){
1586 espController_->forceCacheClear();
1589 SendSourceTerminationSignalIfException sentry(actReg_.get());
1590 espController_->eventSetupForInstance(ts);
1591 sentry.completedSuccessfully();
1594 if(looper_ && looperBeginJobRun_==
false) {
1596 looper_->beginOfJob(es);
1597 looperBeginJobRun_ =
true;
1598 looper_->doStartingNewLoop();
1602 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1603 if(hasSubProcess()) {
1604 subProcess_->doBeginRun(runPrincipal, ts);
1609 looper_->doBeginRun(runPrincipal, es, &processContext_);
1613 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1614 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1615 if(hasSubProcess()) {
1616 subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1629 SendSourceTerminationSignalIfException sentry(actReg_.get());
1631 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1632 sentry.completedSuccessfully();
1635 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1638 SendSourceTerminationSignalIfException sentry(actReg_.get());
1639 espController_->eventSetupForInstance(ts);
1640 sentry.completedSuccessfully();
1644 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1646 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1647 if(hasSubProcess()) {
1648 subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1658 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1659 if(hasSubProcess()) {
1660 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1665 looper_->doEndRun(runPrincipal, es, &processContext_);
1672 SendSourceTerminationSignalIfException sentry(actReg_.get());
1674 input_->doBeginLumi(lumiPrincipal, &processContext_);
1675 sentry.completedSuccessfully();
1681 rng->preBeginLumi(lb);
1688 SendSourceTerminationSignalIfException sentry(actReg_.get());
1689 espController_->eventSetupForInstance(ts);
1690 sentry.completedSuccessfully();
1695 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1696 if(hasSubProcess()) {
1697 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1700 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1702 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1705 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1707 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1708 if(hasSubProcess()) {
1709 subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1713 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1722 SendSourceTerminationSignalIfException sentry(actReg_.get());
1724 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1725 sentry.completedSuccessfully();
1732 SendSourceTerminationSignalIfException sentry(actReg_.get());
1733 espController_->eventSetupForInstance(ts);
1734 sentry.completedSuccessfully();
1738 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1740 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1741 if(hasSubProcess()) {
1742 subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1746 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1752 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1753 if(hasSubProcess()) {
1754 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1757 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1759 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1764 if (principalCache_.hasRunPrincipal()) {
1766 <<
"EventProcessor::readRun\n"
1767 <<
"Illegal attempt to insert run into cache\n"
1768 <<
"Contact a Framework Developer\n";
1770 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1772 SendSourceTerminationSignalIfException sentry(actReg_.get());
1773 input_->readRun(*rp, *historyAppender_);
1774 sentry.completedSuccessfully();
1776 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1777 principalCache_.insert(rp);
1782 principalCache_.merge(input_->runAuxiliary(), preg_);
1783 auto runPrincipal =principalCache_.runPrincipalPtr();
1785 SendSourceTerminationSignalIfException sentry(actReg_.get());
1786 input_->readAndMergeRun(*runPrincipal);
1787 sentry.completedSuccessfully();
1789 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1790 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1793 int EventProcessor::readLuminosityBlock() {
1794 if (principalCache_.hasLumiPrincipal()) {
1796 <<
"EventProcessor::readRun\n"
1797 <<
"Illegal attempt to insert lumi into cache\n"
1798 <<
"Contact a Framework Developer\n";
1800 if (!principalCache_.hasRunPrincipal()) {
1802 <<
"EventProcessor::readRun\n"
1803 <<
"Illegal attempt to insert lumi into cache\n"
1804 <<
"Run is invalid\n"
1805 <<
"Contact a Framework Developer\n";
1807 auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1809 SendSourceTerminationSignalIfException sentry(actReg_.get());
1810 input_->readLuminosityBlock(*lbp, *historyAppender_);
1811 sentry.completedSuccessfully();
1813 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1814 principalCache_.insert(lbp);
1815 return input_->luminosityBlock();
1818 int EventProcessor::readAndMergeLumi() {
1819 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1821 SendSourceTerminationSignalIfException sentry(actReg_.get());
1822 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1823 sentry.completedSuccessfully();
1825 return input_->luminosityBlock();
1841 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1842 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1843 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1847 principalCache_.deleteLumi(phid, run, lumi);
1848 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1849 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1855 unsigned int iStreamIndex,
1856 std::atomic<bool>* iFinishedProcessingEvents,
1857 tbb::task* iWaitTask):
1859 m_streamID(iStreamIndex),
1860 m_finishedProcessingEvents(iFinishedProcessingEvents),
1861 m_waitTask(iWaitTask){}
1864 m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1865 m_waitTask->decrement_ref_count();
// Per-stream event processing routine invoked from StreamProcessingTask.
//
// Parameters:
//   iStreamIndex             - index of the stream this call serves.
//   finishedProcessingEvents - flag shared by all streams; set to true here
//                              once the source reports a non-event item.
//
// NOTE(review): this listing omits many original lines (loop and try/catch
// scaffolding, early exits, closing braces). The comments below describe only
// the statements that are visible; confirm the control flow against the full
// file.
1875 void EventProcessor::processEventsForStreamAsync(
unsigned int iStreamIndex,
1876 std::atomic<bool>* finishedProcessingEvents) {
// Extra per-thread setup is done only when more than one stream is configured.
1880 if(preallocations_.numberOfStreams()>1) {
1882 handler->initializeThisThreadForUse();
// Stream 0 gets special handling (body not visible in this listing).
1885 if(iStreamIndex==0) {
// Stop conditions: an explicit stop request, or an exception already recorded
// by some stream.
1889 if(shouldWeStop()) {
1892 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
// Access to the source is guarded by nextTransitionMutex_.
1902 std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
// Checked under the lock: another stream may already have seen the end of the
// event sequence.
1904 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
// Acquire the resource the source shares with its delayed reader, when one is
// reported (the guarding condition is not visible in this listing).
1910 auto sr = input_->resourceSharedWithDelayedReader();
1911 std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1913 delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
// A non-event item ends event processing: remember the item type and publish
// the finished flag to the other streams.
1916 if (InputSource::IsEvent !=itemType) {
1917 nextItemTypeFromProcessingEvents_ = itemType;
1918 finishedProcessingEvents->store(
true,std::memory_order_release);
// Record an asynchronous stop request (and its status code) and notify the
// activity registry that the source is terminating early.
1922 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1924 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1927 readEvent(iStreamIndex);
// An exception recorded elsewhere also triggers the early-termination signal.
1930 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1933 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
// Only the first caller to flip the flag from false to true stores the
// in-flight exception in deferredExceptionPtr_.
1940 bool expected =
false;
1941 if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,
true)) {
1942 deferredExceptionPtr_ = std::current_exception();
// Drives event processing across all configured streams and rethrows the
// first exception recorded by any of them.
//
// Visible flow: reset the per-call state, create a root empty_task used as the
// wait task, enqueue one StreamProcessingTask for every stream except the
// last, then run the last stream's task through spawn_and_wait_for_all on the
// wait task. Afterwards, if deferredExceptionPtrIsSet_ is set, the stored
// exception is rethrown.
//
// NOTE(review): this listing omits some original lines (including the body of
// the forked-children branch and closing braces); confirm against the full
// file.
1948 void EventProcessor::readAndProcessEvent() {
1949 if(numberOfForkedChildren_>0) {
// Reset the state that processEventsForStreamAsync reports back through.
1954 nextItemTypeFromProcessingEvents_ = InputSource::IsEvent;
1955 asyncStopRequestedWhileProcessingEvents_ =
false;
1957 std::atomic<bool> finishedProcessingEvents{
false};
// Root task the stream tasks report completion to via decrement_ref_count().
1963 tbb::task* eventLoopWaitTask{
new (tbb::task::allocate_root()) tbb::empty_task{}};
1964 eventLoopWaitTask->increment_ref_count();
1966 const unsigned int kNumStreams = preallocations_.numberOfStreams();
1967 unsigned int iStreamIndex = 0;
// NOTE(review): kNumStreams is unsigned, so kNumStreams-1 would wrap around if
// numberOfStreams() ever returned 0 -- confirm that at least one stream is
// guaranteed.
1968 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
// One reference per enqueued stream task.
1969 eventLoopWaitTask->increment_ref_count();
1970 tbb::task::enqueue( *(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
// The remaining stream index is handled by the task spawned here, and this
// call waits on the wait task.
1973 eventLoopWaitTask->increment_ref_count();
1974 eventLoopWaitTask->spawn_and_wait_for_all(*(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
// Surface the exception captured by processEventsForStreamAsync, if any.
1978 if(deferredExceptionPtrIsSet_) {
1979 std::rethrow_exception(deferredExceptionPtr_);
// Reads the next event from the input source into the event principal that
// principalCache_ holds for stream iStreamIndex.
//
// NOTE(review): the line that declares `streamContext` and the closing braces
// are not part of this listing; confirm against the full file.
1982 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
1984 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
// The sentry is marked successful right after the read returns; presumably it
// signals source termination from its destructor otherwise -- confirm.
1987 SendSourceTerminationSignalIfException sentry(actReg_.get());
1988 input_->readEvent(
event, streamContext);
1989 sentry.completedSuccessfully();
1991 FDEBUG(1) <<
"\treadEvent\n";
// Fragment of the per-stream event-processing routine; its signature line and
// several interior lines are not part of this listing (presumably
// EventProcessor::processEvent(unsigned int iStreamIndex) -- confirm against
// the full file).
//
// Visible flow: attach the cached lumi principal to the stream's event
// principal, notify the random-number service, run the schedule (and the
// subprocess, if any) on the event, give the looper its turn, and finally
// clear the event principal.
1994 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1995 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1999 rng->postEventRead(ev);
// Sanity checks: the event must belong to the currently cached run and lumi.
2001 assert(pep->luminosityBlockPrincipalPtrValid());
2002 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2003 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2011 schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2012 if(hasSubProcess()) {
2013 subProcess_->doEvent(*pep);
// Looper handling; the conditions guarding these statements are not visible
// in this listing.
2019 bool randomAccess = input_->randomAccess();
2027 StreamContext streamContext(pep->streamID(), &processContext_);
2028 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2033 input_->skipEvents(-2);
// Any looper status other than kContinue requests a stop of event processing.
2041 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2045 FDEBUG(1) <<
"\tprocessEvent\n";
2046 pep->clearEventPrincipal();
// Reports whether event processing should stop: true when shouldWeStop_ has
// been set, or when the schedule -- or the subprocess, if one exists --
// reports terminate().
2049 bool EventProcessor::shouldWeStop()
const {
2050 FDEBUG(1) <<
"\tshouldWeStop\n";
// The locally recorded stop request takes precedence over asking the schedule.
2051 if(shouldWeStop_)
return true;
2052 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
// Bodies of three setter-like routines whose signature lines are not part of
// this listing. Each one stores `message` in the corresponding member:
// exceptionMessageFiles_, exceptionMessageRuns_ and exceptionMessageLumis_.
2056 exceptionMessageFiles_ =
message;
2060 exceptionMessageRuns_ =
message;
2064 exceptionMessageLumis_ =
message;
// Accessor: returns the current value of alreadyHandlingException_.
2067 bool EventProcessor::alreadyHandlingException()
const {
2068 return alreadyHandlingException_;
// Terminates the given state machine if it exists and has not terminated yet.
//
// Parameter:
//   iMachine - owning pointer to the state machine; a null pointer is
//              tolerated (the outer check skips all work).
//
// NOTE(review): this listing omits some original lines, including whatever
// runs between setting and clearing forceLooperToEnd_ and the else/closing
// braces; confirm the exact branch structure against the full file.
2071 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2072 if(iMachine.get() != 0) {
2073 if(!iMachine->terminated()) {
// forceLooperToEnd_ is raised only for the duration of the (not visible)
// termination step and lowered again afterwards.
2074 forceLooperToEnd_ =
true;
2076 forceLooperToEnd_ =
false;
2079 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2081 if(iMachine->terminated()) {
2082 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
std::shared_ptr< ProcessConfiguration > processConfiguration_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
bool continueAfterChildFailure_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::shared_ptr< edm::ParameterSet > parameterSet()
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
int maxSecondsUntilRampdown_