63 #include "boost/thread/xtime.hpp"
77 #include <sys/types.h>
79 #include <sys/socket.h>
80 #include <sys/select.h>
81 #include <sys/fcntl.h>
90 #include "Cintex/Cintex.h"
98 class SendSourceTerminationSignalIfException {
102 ~SendSourceTerminationSignalIfException() {
107 void completedSuccessfully() {
119 std::unique_ptr<InputSource>
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ActivityRegistry>
areg,
128 if(main_input == 0) {
130 <<
"There must be exactly one source in the configuration.\n"
131 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
136 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
139 filler->fill(descriptions);
143 descriptions.validate(*main_input,
std::string(
"source"));
147 std::ostringstream ost;
148 ost <<
"Validating configuration of input source of type " << modtype;
164 processConfiguration.get(),
168 areg->preSourceConstructionSignal_(
md);
169 std::unique_ptr<InputSource>
input;
172 std::shared_ptr<int> sentry(
nullptr,[areg,&
md](
void*){areg->postSourceConstructionSignal_(
md);});
178 std::ostringstream ost;
179 ost <<
"Constructing input source of type " << modtype;
187 boost::shared_ptr<EDLooperBase>
191 boost::shared_ptr<EDLooperBase> vLooper;
193 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
195 if(loopers.size() == 0) {
199 assert(1 == loopers.size());
201 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
219 std::vector<std::string>
const& defaultServices,
220 std::vector<std::string>
const& forcedServices) :
223 branchIDListHelper_(),
226 espController_(new eventsetup::EventSetupsController),
229 processConfiguration_(),
235 deferredExceptionPtrIsSet_(
false),
237 beginJobCalled_(
false),
238 shouldWeStop_(
false),
239 stateMachineWasInErrorState_(
false),
242 exceptionMessageFiles_(),
243 exceptionMessageRuns_(),
244 exceptionMessageLumis_(),
245 alreadyHandlingException_(
false),
246 forceLooperToEnd_(
false),
247 looperBeginJobRun_(
false),
248 forceESCacheClearOnNewRun_(
false),
249 numberOfForkedChildren_(0),
250 numberOfSequentialEventsPerChild_(1),
251 setCpuAffinity_(
false),
252 eventSetupDataToExcludeFromPrefetching_() {
254 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
255 processDesc->addServices(defaultServices, forcedServices);
256 init(processDesc, iToken, iLegacy);
260 std::vector<std::string>
const& defaultServices,
261 std::vector<std::string>
const& forcedServices) :
264 branchIDListHelper_(),
267 espController_(new eventsetup::EventSetupsController),
270 processConfiguration_(),
276 deferredExceptionPtrIsSet_(
false),
278 beginJobCalled_(
false),
279 shouldWeStop_(
false),
280 stateMachineWasInErrorState_(
false),
283 exceptionMessageFiles_(),
284 exceptionMessageRuns_(),
285 exceptionMessageLumis_(),
286 alreadyHandlingException_(
false),
287 forceLooperToEnd_(
false),
288 looperBeginJobRun_(
false),
289 forceESCacheClearOnNewRun_(
false),
290 numberOfForkedChildren_(0),
291 numberOfSequentialEventsPerChild_(1),
292 setCpuAffinity_(
false),
293 asyncStopRequestedWhileProcessingEvents_(
false),
294 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
295 eventSetupDataToExcludeFromPrefetching_()
298 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
299 processDesc->addServices(defaultServices, forcedServices);
308 branchIDListHelper_(),
311 espController_(new eventsetup::EventSetupsController),
314 processConfiguration_(),
320 deferredExceptionPtrIsSet_(
false),
322 beginJobCalled_(
false),
323 shouldWeStop_(
false),
324 stateMachineWasInErrorState_(
false),
327 exceptionMessageFiles_(),
328 exceptionMessageRuns_(),
329 exceptionMessageLumis_(),
330 alreadyHandlingException_(
false),
331 forceLooperToEnd_(
false),
332 looperBeginJobRun_(
false),
333 forceESCacheClearOnNewRun_(
false),
334 numberOfForkedChildren_(0),
335 numberOfSequentialEventsPerChild_(1),
336 setCpuAffinity_(
false),
337 asyncStopRequestedWhileProcessingEvents_(
false),
338 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
339 eventSetupDataToExcludeFromPrefetching_()
341 init(processDesc, token, legacy);
348 branchIDListHelper_(),
351 espController_(new eventsetup::EventSetupsController),
354 processConfiguration_(),
360 deferredExceptionPtrIsSet_(
false),
362 beginJobCalled_(
false),
363 shouldWeStop_(
false),
364 stateMachineWasInErrorState_(
false),
367 exceptionMessageFiles_(),
368 exceptionMessageRuns_(),
369 exceptionMessageLumis_(),
370 alreadyHandlingException_(
false),
371 forceLooperToEnd_(
false),
372 looperBeginJobRun_(
false),
373 forceESCacheClearOnNewRun_(
false),
374 numberOfForkedChildren_(0),
375 numberOfSequentialEventsPerChild_(1),
376 setCpuAffinity_(
false),
377 asyncStopRequestedWhileProcessingEvents_(
false),
378 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
379 eventSetupDataToExcludeFromPrefetching_()
383 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
387 auto processDesc = std::make_shared<ProcessDesc>(
config);
399 ROOT::Cintex::Cintex::Enable();
407 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
419 unsigned int nThreads=1;
420 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
421 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
429 unsigned int nStreams =1;
430 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
431 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
439 unsigned int nConcurrentRuns =1;
445 unsigned int nConcurrentLumis =1;
471 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
472 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
476 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
477 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
485 std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
494 handler->willBeUsingThreads();
498 std::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
532 FDEBUG(2) << parameterSet << std::endl;
538 ep->preModuleDelayedGetSignal_.connect(std::cref(
actReg_->preModuleEventDelayedGetSignal_));
539 ep->postModuleDelayedGetSignal_.connect(std::cref(
actReg_->postModuleEventDelayedGetSignal_));
543 if(subProcessParameterSet) {
589 actReg_->preallocateSignal_(bounds);
610 ex.
addContext(
"Calling beginJob for the source");
616 actReg_->postBeginJobSignal_();
627 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
648 c.
call([actReg](){actReg->postEndJobSignal_();});
663 volatile bool child_failed =
false;
664 volatile unsigned int num_children_done = 0;
665 volatile int child_fail_exit_status = 0;
666 volatile int child_fail_signal = 0;
672 void ep_sigchld(
int, siginfo_t*,
void*) {
676 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
679 if(WIFEXITED(stat_loc)) {
681 if(0 != WEXITSTATUS(stat_loc)) {
682 child_fail_exit_status = WEXITSTATUS(stat_loc);
686 if(WIFSIGNALED(stat_loc)) {
688 child_fail_signal = WTERMSIG(stat_loc);
691 p = waitpid(-1, &stat_loc, WNOHANG);
706 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
708 while(numberOfChildren != 0) {
710 numberOfChildren /= 10;
720 class MessageSenderToSource {
722 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
726 const std::vector<int>& m_childrenPipes;
727 long const m_nEventsToProcess;
729 unsigned int m_aliveChildren;
733 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
734 std::vector<int>
const& childrenPipes,
735 long iNEventsToProcess):
736 m_childrenPipes(childrenPipes),
737 m_nEventsToProcess(iNEventsToProcess),
738 m_aliveChildren(childrenSockets.
size()),
741 FD_ZERO(&m_socketSet);
742 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
744 FD_SET(*it, &m_socketSet);
749 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
751 FD_SET(*it, &m_socketSet);
776 MessageSenderToSource::operator()() {
777 multicore::MessageForParent childMsg;
778 LogInfo(
"ForkingController") <<
"I am controller";
781 multicore::MessageForSource sndmsg;
782 sndmsg.startIndex = 0;
783 sndmsg.nIndices = m_nEventsToProcess;
786 fd_set readSockets, errorSockets;
788 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
789 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
792 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
794 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
803 if (FD_ISSET(
idx, &errorSockets)) {
804 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
805 FD_CLR(
idx, &m_socketSet);
808 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
816 if (!FD_ISSET(
idx, &readSockets)) {
822 bool is_pipe =
false;
823 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
827 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
830 FD_CLR(
idx, &m_socketSet);
838 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
840 FD_CLR(
idx, &m_socketSet);
849 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
851 FD_CLR(
idx, &m_socketSet);
856 sndmsg.startIndex += sndmsg.nIndices;
860 }
while (m_aliveChildren > 0);
868 void EventProcessor::possiblyContinueAfterForkChildFailure() {
869 if(child_failed && continueAfterChildFailure_) {
870 if (child_fail_signal) {
871 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
873 }
else if (child_fail_exit_status) {
874 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
875 child_fail_exit_status=0;
877 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
886 if(0 == numberOfForkedChildren_) {
return true;}
887 assert(0<numberOfForkedChildren_);
895 itemType = input_->nextItemType();
897 assert(itemType == InputSource::IsFile);
901 itemType = input_->nextItemType();
902 assert(itemType == InputSource::IsRun);
904 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
906 input_->runAuxiliary()->beginTime());
907 espController_->eventSetupForInstance(ts);
911 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
913 std::vector<eventsetup::DataKey> dataKeys;
914 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
919 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
921 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
922 excludedData = &(itExcludeRec->second);
923 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
931 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
932 itDataKey != itDataKeyEnd;
935 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
936 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
940 recordPtr->
doGet(*itDataKey);
948 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
953 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
956 actReg_->preForkReleaseResourcesSignal_();
957 input_->doPreForkReleaseResources();
958 schedule_->preForkReleaseResources();
963 unsigned int childIndex = 0;
964 unsigned int const kMaxChildren = numberOfForkedChildren_;
965 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
966 std::vector<pid_t> childrenIds;
967 childrenIds.reserve(kMaxChildren);
968 std::vector<int> childrenSockets;
969 childrenSockets.reserve(kMaxChildren);
970 std::vector<int> childrenPipes;
971 childrenPipes.reserve(kMaxChildren);
972 std::vector<int> childrenSocketsCopy;
973 childrenSocketsCopy.reserve(kMaxChildren);
974 std::vector<int> childrenPipesCopy;
975 childrenPipesCopy.reserve(kMaxChildren);
982 int sockets[2], fd_flags;
983 for(; childIndex < kMaxChildren; ++childIndex) {
985 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
986 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
990 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
994 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
995 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1000 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1001 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1004 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1005 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1008 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1009 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1014 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1015 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1018 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1019 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1023 childrenPipesCopy = childrenPipes;
1024 childrenSocketsCopy = childrenSockets;
1026 pid_t
value = fork();
1032 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1035 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1042 std::stringstream stout;
1043 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1044 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1045 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1047 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1048 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1051 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1052 if(setCpuAffinity_) {
1060 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1062 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1065 CPU_SET(childIndex, &mask);
1066 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1067 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1079 LogError(
"ForkingChild") <<
"failed to create a child";
1082 childrenIds.push_back(value);
1083 childrenSockets.push_back(sockets[0]);
1084 childrenPipes.push_back(pipes[0]);
1087 if(childIndex < kMaxChildren) {
1088 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1089 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1091 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1092 input_->doPostForkReacquireResources(receiver);
1093 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1098 jobReport->parentAfterFork(jobReportFile);
1108 sigset_t blockingSigSet;
1109 sigset_t unblockingSigSet;
1111 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1112 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1113 sigaddset(&blockingSigSet, SIGCHLD);
1114 sigaddset(&blockingSigSet, SIGUSR2);
1115 sigaddset(&blockingSigSet, SIGINT);
1116 sigdelset(&unblockingSigSet, SIGCHLD);
1117 sigdelset(&unblockingSigSet, SIGUSR2);
1118 sigdelset(&unblockingSigSet, SIGINT);
1119 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1123 bool too_many_fds =
false;
1124 if (pipes[1]+1 > FD_SETSIZE) {
1125 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1126 too_many_fds =
true;
1132 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1133 boost::thread senderThread(sender);
1135 if(not too_many_fds) {
1138 possiblyContinueAfterForkChildFailure();
1139 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1140 sigsuspend(&unblockingSigSet);
1141 possiblyContinueAfterForkChildFailure();
1142 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1145 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1147 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1149 LogError(
"ForkingStopping") <<
"child failed";
1152 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1155 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1156 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1157 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1158 it != itEnd; ++it) {
1161 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1162 while(num_children_done != kMaxChildren) {
1163 sigsuspend(&unblockingSigSet);
1165 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1168 senderThread.join();
1169 if(child_failed && !continueAfterChildFailure_) {
1170 if (child_fail_signal) {
1171 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1172 }
else if (child_fail_exit_status) {
1173 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1175 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1179 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1184 std::vector<ModuleDescription const*>
1185 EventProcessor::getAllModuleDescriptions()
const {
1186 return schedule_->getAllModuleDescriptions();
1190 EventProcessor::totalEvents()
const {
1191 return schedule_->totalEvents();
1195 EventProcessor::totalEventsPassed()
const {
1196 return schedule_->totalEventsPassed();
1200 EventProcessor::totalEventsFailed()
const {
1201 return schedule_->totalEventsFailed();
1205 EventProcessor::enableEndPaths(
bool active) {
1206 schedule_->enableEndPaths(active);
1210 EventProcessor::endPathsEnabled()
const {
1211 return schedule_->endPathsEnabled();
1216 schedule_->getTriggerReport(rep);
1220 EventProcessor::clearCounters() {
1221 schedule_->clearCounters();
1225 std::auto_ptr<statemachine::Machine>
1226 EventProcessor::createStateMachine() {
1233 << fileMode_ <<
".\n"
1234 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1244 << emptyRunLumiMode_ <<
".\n"
1245 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1252 machine->initiate();
1258 bool returnValue =
false;
1263 returnCode = epSignal;
1270 EventProcessor::runToCompletion() {
1273 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1274 std::auto_ptr<statemachine::Machine> machine;
1279 stateMachineWasInErrorState_ =
false;
1284 machine = createStateMachine();
1285 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1286 asyncStopRequestedWhileProcessingEvents_=
false;
1295 if(numberOfForkedChildren_ > 0) {
1296 size_t size = preg_->size();
1298 SendSourceTerminationSignalIfException sentry(actReg_.get());
1299 more = input_->skipForForking();
1300 sentry.completedSuccessfully();
1303 if(size < preg_->
size()) {
1304 principalCache_.adjustIndexesAfterProductRegistryAddition();
1306 principalCache_.adjustEventsToNewProductRegistry(preg_);
1310 SendSourceTerminationSignalIfException sentry(actReg_.get());
1311 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1312 sentry.completedSuccessfully();
1315 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1317 if(checkForAsyncStopRequest(returnCode)) {
1318 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1319 forceLooperToEnd_ =
true;
1321 forceLooperToEnd_ =
false;
1325 if(itemType == InputSource::IsEvent) {
1327 if(asyncStopRequestedWhileProcessingEvents_) {
1328 forceLooperToEnd_ =
true;
1330 forceLooperToEnd_ =
false;
1331 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1334 itemType = nextItemTypeFromProcessingEvents_;
1337 if(itemType == InputSource::IsEvent) {
1339 else if(itemType == InputSource::IsStop) {
1342 else if(itemType == InputSource::IsFile) {
1345 else if(itemType == InputSource::IsRun) {
1346 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1348 else if(itemType == InputSource::IsLumi) {
1351 else if(itemType == InputSource::IsSynchronize) {
1357 <<
"Unknown next item type passed to EventProcessor\n"
1358 <<
"Please report this error to the Framework group\n";
1360 if(machine->terminated()) {
1412 alreadyHandlingException_ =
true;
1413 terminateMachine(machine);
1414 alreadyHandlingException_ =
false;
1415 if (!exceptionMessageLumis_.empty()) {
1418 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1421 if (!exceptionMessageRuns_.empty()) {
1424 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1427 if (!exceptionMessageFiles_.empty()) {
1430 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1436 if(machine->terminated()) {
1437 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1441 if(stateMachineWasInErrorState_) {
1443 <<
"The boost state machine in the EventProcessor exited after\n"
1444 <<
"entering the Error state.\n";
1448 if(machine.get() != 0) {
1449 terminateMachine(machine);
1451 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1452 <<
"Please report this error to the Framework group\n";
1458 void EventProcessor::readFile() {
1459 FDEBUG(1) <<
" \treadFile\n";
1460 size_t size = preg_->size();
1461 SendSourceTerminationSignalIfException sentry(actReg_.get());
1463 fb_ = input_->readFile();
1464 if(size < preg_->
size()) {
1465 principalCache_.adjustIndexesAfterProductRegistryAddition();
1467 principalCache_.adjustEventsToNewProductRegistry(preg_);
1468 if((numberOfForkedChildren_ > 0)
or
1469 (preallocations_.numberOfStreams()>1 and
1470 preallocations_.numberOfThreads()>1)) {
1471 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1473 sentry.completedSuccessfully();
1476 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1477 if (fb_.get() !=
nullptr) {
1478 SendSourceTerminationSignalIfException sentry(actReg_.get());
1479 input_->closeFile(fb_.get(), cleaningUpAfterException);
1480 sentry.completedSuccessfully();
1482 FDEBUG(1) <<
"\tcloseInputFile\n";
1485 void EventProcessor::openOutputFiles() {
1486 if (fb_.get() !=
nullptr) {
1487 schedule_->openOutputFiles(*fb_);
1488 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1490 FDEBUG(1) <<
"\topenOutputFiles\n";
1493 void EventProcessor::closeOutputFiles() {
1494 if (fb_.get() !=
nullptr) {
1495 schedule_->closeOutputFiles();
1496 if(hasSubProcess()) subProcess_->closeOutputFiles();
1498 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1501 void EventProcessor::respondToOpenInputFile() {
1502 if(hasSubProcess()) {
1503 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1505 if (fb_.get() !=
nullptr) {
1506 schedule_->respondToOpenInputFile(*fb_);
1507 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1509 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1512 void EventProcessor::respondToCloseInputFile() {
1513 if (fb_.get() !=
nullptr) {
1514 schedule_->respondToCloseInputFile(*fb_);
1515 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1517 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1520 void EventProcessor::startingNewLoop() {
1521 shouldWeStop_ =
false;
1524 if(looper_ && looperBeginJobRun_) {
1525 looper_->doStartingNewLoop();
1527 FDEBUG(1) <<
"\tstartingNewLoop\n";
1530 bool EventProcessor::endOfLoop() {
1533 looper_->setModuleChanger(&changer);
1535 looper_->setModuleChanger(
nullptr);
1536 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1539 FDEBUG(1) <<
"\tendOfLoop\n";
1543 void EventProcessor::rewindInput() {
1546 FDEBUG(1) <<
"\trewind\n";
1549 void EventProcessor::prepareForNextLoop() {
1550 looper_->prepareForNextLoop(esp_.get());
1551 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1554 bool EventProcessor::shouldWeCloseOutput()
const {
1555 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1556 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1559 void EventProcessor::doErrorStuff() {
1560 FDEBUG(1) <<
"\tdoErrorStuff\n";
1562 <<
"The EventProcessor state machine encountered an unexpected event\n"
1563 <<
"and went to the error state\n"
1564 <<
"Will attempt to terminate processing normally\n"
1565 <<
"(IF using the looper the next loop will be attempted)\n"
1566 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1567 stateMachineWasInErrorState_ =
true;
1573 SendSourceTerminationSignalIfException sentry(actReg_.get());
1575 input_->doBeginRun(runPrincipal, &processContext_);
1576 sentry.completedSuccessfully();
1581 if(forceESCacheClearOnNewRun_){
1582 espController_->forceCacheClear();
1585 SendSourceTerminationSignalIfException sentry(actReg_.get());
1586 espController_->eventSetupForInstance(ts);
1587 sentry.completedSuccessfully();
1590 if(looper_ && looperBeginJobRun_==
false) {
1592 looper_->beginOfJob(es);
1593 looperBeginJobRun_ =
true;
1594 looper_->doStartingNewLoop();
1598 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1599 if(hasSubProcess()) {
1600 subProcess_->doBeginRun(runPrincipal, ts);
1605 looper_->doBeginRun(runPrincipal, es, &processContext_);
1609 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1610 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1611 if(hasSubProcess()) {
1612 subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1625 SendSourceTerminationSignalIfException sentry(actReg_.get());
1627 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1628 sentry.completedSuccessfully();
1631 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1634 SendSourceTerminationSignalIfException sentry(actReg_.get());
1635 espController_->eventSetupForInstance(ts);
1636 sentry.completedSuccessfully();
1640 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1642 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1643 if(hasSubProcess()) {
1644 subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1654 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1655 if(hasSubProcess()) {
1656 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1661 looper_->doEndRun(runPrincipal, es, &processContext_);
1668 SendSourceTerminationSignalIfException sentry(actReg_.get());
1670 input_->doBeginLumi(lumiPrincipal, &processContext_);
1671 sentry.completedSuccessfully();
1677 rng->preBeginLumi(lb);
1684 SendSourceTerminationSignalIfException sentry(actReg_.get());
1685 espController_->eventSetupForInstance(ts);
1686 sentry.completedSuccessfully();
1691 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1692 if(hasSubProcess()) {
1693 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1696 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1698 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1701 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1703 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1704 if(hasSubProcess()) {
1705 subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1709 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1718 SendSourceTerminationSignalIfException sentry(actReg_.get());
1720 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1721 sentry.completedSuccessfully();
1728 SendSourceTerminationSignalIfException sentry(actReg_.get());
1729 espController_->eventSetupForInstance(ts);
1730 sentry.completedSuccessfully();
1734 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1736 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1737 if(hasSubProcess()) {
1738 subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1742 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1748 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1749 if(hasSubProcess()) {
1750 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1753 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1755 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1760 if (principalCache_.hasRunPrincipal()) {
1762 <<
"EventProcessor::readRun\n"
1763 <<
"Illegal attempt to insert run into cache\n"
1764 <<
"Contact a Framework Developer\n";
1766 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1768 SendSourceTerminationSignalIfException sentry(actReg_.get());
1769 input_->readRun(*rp, *historyAppender_);
1770 sentry.completedSuccessfully();
1772 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1773 principalCache_.insert(rp);
1778 principalCache_.merge(input_->runAuxiliary(), preg_);
1779 auto runPrincipal =principalCache_.runPrincipalPtr();
1781 SendSourceTerminationSignalIfException sentry(actReg_.get());
1782 input_->readAndMergeRun(*runPrincipal);
1783 sentry.completedSuccessfully();
1785 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1786 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1789 int EventProcessor::readLuminosityBlock() {
1790 if (principalCache_.hasLumiPrincipal()) {
1792 <<
"EventProcessor::readRun\n"
1793 <<
"Illegal attempt to insert lumi into cache\n"
1794 <<
"Contact a Framework Developer\n";
1796 if (!principalCache_.hasRunPrincipal()) {
1798 <<
"EventProcessor::readRun\n"
1799 <<
"Illegal attempt to insert lumi into cache\n"
1800 <<
"Run is invalid\n"
1801 <<
"Contact a Framework Developer\n";
1803 auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1805 SendSourceTerminationSignalIfException sentry(actReg_.get());
1806 input_->readLuminosityBlock(*lbp, *historyAppender_);
1807 sentry.completedSuccessfully();
1809 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1810 principalCache_.insert(lbp);
1811 return input_->luminosityBlock();
1814 int EventProcessor::readAndMergeLumi() {
1815 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1817 SendSourceTerminationSignalIfException sentry(actReg_.get());
1818 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1819 sentry.completedSuccessfully();
1821 return input_->luminosityBlock();
1837 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1838 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1839 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1843 principalCache_.deleteLumi(phid, run, lumi);
1844 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1845 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1851 unsigned int iStreamIndex,
1852 std::atomic<bool>* iFinishedProcessingEvents,
1853 tbb::task* iWaitTask):
1855 m_streamID(iStreamIndex),
1856 m_finishedProcessingEvents(iFinishedProcessingEvents),
1857 m_waitTask(iWaitTask){}
1860 m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1861 m_waitTask->decrement_ref_count();
1871 void EventProcessor::processEventsForStreamAsync(
unsigned int iStreamIndex,
1872 std::atomic<bool>* finishedProcessingEvents) {
1876 if(preallocations_.numberOfStreams()>1) {
1878 handler->initializeThisThreadForUse();
1881 if(iStreamIndex==0) {
1885 if(shouldWeStop()) {
1888 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1898 std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1900 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1906 auto sr = input_->resourceSharedWithDelayedReader();
1907 std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1909 delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1912 if (InputSource::IsEvent !=itemType) {
1913 nextItemTypeFromProcessingEvents_ = itemType;
1914 finishedProcessingEvents->store(
true,std::memory_order_release);
1918 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1920 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1923 readEvent(iStreamIndex);
1926 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1929 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1936 bool expected =
false;
1937 if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,
true)) {
1938 deferredExceptionPtr_ = std::current_exception();
1944 void EventProcessor::readAndProcessEvent() {
1945 if(numberOfForkedChildren_>0) {
1950 nextItemTypeFromProcessingEvents_ = InputSource::IsEvent;
1951 asyncStopRequestedWhileProcessingEvents_ =
false;
1953 std::atomic<bool> finishedProcessingEvents{
false};
1959 tbb::task* eventLoopWaitTask{
new (tbb::task::allocate_root()) tbb::empty_task{}};
1960 eventLoopWaitTask->increment_ref_count();
1962 const unsigned int kNumStreams = preallocations_.numberOfStreams();
1963 unsigned int iStreamIndex = 0;
1964 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1965 eventLoopWaitTask->increment_ref_count();
1966 tbb::task::enqueue( *(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1969 eventLoopWaitTask->increment_ref_count();
1970 eventLoopWaitTask->spawn_and_wait_for_all(*(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1974 if(deferredExceptionPtrIsSet_) {
1975 std::rethrow_exception(deferredExceptionPtr_);
1978 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
1980 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
1983 SendSourceTerminationSignalIfException sentry(actReg_.get());
1984 input_->readEvent(
event, streamContext);
1985 sentry.completedSuccessfully();
1987 FDEBUG(1) <<
"\treadEvent\n";
1990 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1991 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1995 rng->postEventRead(ev);
1997 assert(pep->luminosityBlockPrincipalPtrValid());
1998 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1999 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2007 schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2008 if(hasSubProcess()) {
2009 subProcess_->doEvent(*pep);
2015 bool randomAccess = input_->randomAccess();
2023 StreamContext streamContext(pep->streamID(), &processContext_);
2024 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2029 input_->skipEvents(-2);
2037 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2041 FDEBUG(1) <<
"\tprocessEvent\n";
2042 pep->clearEventPrincipal();
2045 bool EventProcessor::shouldWeStop()
const {
2046 FDEBUG(1) <<
"\tshouldWeStop\n";
2047 if(shouldWeStop_)
return true;
2048 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2052 exceptionMessageFiles_ =
message;
2056 exceptionMessageRuns_ =
message;
2060 exceptionMessageLumis_ =
message;
2063 bool EventProcessor::alreadyHandlingException()
const {
2064 return alreadyHandlingException_;
2067 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2068 if(iMachine.get() != 0) {
2069 if(!iMachine->terminated()) {
2070 forceLooperToEnd_ =
true;
2072 forceLooperToEnd_ =
false;
2075 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2077 if(iMachine->terminated()) {
2078 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet ¶meterSet)
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
std::shared_ptr< ProcessConfiguration > processConfiguration_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
bool continueAfterChildFailure_
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::shared_ptr< edm::ParameterSet > parameterSet()
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void addContext(std::string const &context)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::auto_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_