63 #include "boost/thread/xtime.hpp"
77 #include <sys/types.h>
79 #include <sys/socket.h>
80 #include <sys/select.h>
81 #include <sys/fcntl.h>
90 #include "Cintex/Cintex.h"
98 class SendSourceTerminationSignalIfException {
102 ~SendSourceTerminationSignalIfException() {
107 void completedSuccessfully() {
119 std::unique_ptr<InputSource>
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125 std::shared_ptr<ActivityRegistry>
areg,
129 if(main_input == 0) {
131 <<
"There must be exactly one source in the configuration.\n"
132 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
137 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
140 filler->fill(descriptions);
144 descriptions.validate(*main_input,
std::string(
"source"));
148 std::ostringstream ost;
149 ost <<
"Validating configuration of input source of type " << modtype;
165 processConfiguration.get(),
172 areg->preSourceConstructionSignal_(
md);
173 std::unique_ptr<InputSource>
input;
176 std::shared_ptr<int> sentry(
nullptr,[areg,&
md](
void*){areg->postSourceConstructionSignal_(
md);});
182 std::ostringstream ost;
183 ost <<
"Constructing input source of type " << modtype;
191 boost::shared_ptr<EDLooperBase>
195 boost::shared_ptr<EDLooperBase> vLooper;
197 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
199 if(loopers.size() == 0) {
203 assert(1 == loopers.size());
205 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
223 std::vector<std::string>
const& defaultServices,
224 std::vector<std::string>
const& forcedServices) :
227 branchIDListHelper_(),
230 espController_(new eventsetup::EventSetupsController),
233 processConfiguration_(),
239 deferredExceptionPtrIsSet_(
false),
241 beginJobCalled_(
false),
242 shouldWeStop_(
false),
243 stateMachineWasInErrorState_(
false),
246 exceptionMessageFiles_(),
247 exceptionMessageRuns_(),
248 exceptionMessageLumis_(),
249 alreadyHandlingException_(
false),
250 forceLooperToEnd_(
false),
251 looperBeginJobRun_(
false),
252 forceESCacheClearOnNewRun_(
false),
253 numberOfForkedChildren_(0),
254 numberOfSequentialEventsPerChild_(1),
255 setCpuAffinity_(
false),
256 eventSetupDataToExcludeFromPrefetching_() {
258 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
259 processDesc->addServices(defaultServices, forcedServices);
260 init(processDesc, iToken, iLegacy);
264 std::vector<std::string>
const& defaultServices,
265 std::vector<std::string>
const& forcedServices) :
268 branchIDListHelper_(),
271 espController_(new eventsetup::EventSetupsController),
274 processConfiguration_(),
280 deferredExceptionPtrIsSet_(
false),
282 beginJobCalled_(
false),
283 shouldWeStop_(
false),
284 stateMachineWasInErrorState_(
false),
287 exceptionMessageFiles_(),
288 exceptionMessageRuns_(),
289 exceptionMessageLumis_(),
290 alreadyHandlingException_(
false),
291 forceLooperToEnd_(
false),
292 looperBeginJobRun_(
false),
293 forceESCacheClearOnNewRun_(
false),
294 numberOfForkedChildren_(0),
295 numberOfSequentialEventsPerChild_(1),
296 setCpuAffinity_(
false),
297 asyncStopRequestedWhileProcessingEvents_(
false),
298 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
299 eventSetupDataToExcludeFromPrefetching_()
302 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
303 processDesc->addServices(defaultServices, forcedServices);
312 branchIDListHelper_(),
315 espController_(new eventsetup::EventSetupsController),
318 processConfiguration_(),
324 deferredExceptionPtrIsSet_(
false),
326 beginJobCalled_(
false),
327 shouldWeStop_(
false),
328 stateMachineWasInErrorState_(
false),
331 exceptionMessageFiles_(),
332 exceptionMessageRuns_(),
333 exceptionMessageLumis_(),
334 alreadyHandlingException_(
false),
335 forceLooperToEnd_(
false),
336 looperBeginJobRun_(
false),
337 forceESCacheClearOnNewRun_(
false),
338 numberOfForkedChildren_(0),
339 numberOfSequentialEventsPerChild_(1),
340 setCpuAffinity_(
false),
341 asyncStopRequestedWhileProcessingEvents_(
false),
342 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
343 eventSetupDataToExcludeFromPrefetching_()
345 init(processDesc, token, legacy);
352 branchIDListHelper_(),
355 espController_(new eventsetup::EventSetupsController),
358 processConfiguration_(),
364 deferredExceptionPtrIsSet_(
false),
366 beginJobCalled_(
false),
367 shouldWeStop_(
false),
368 stateMachineWasInErrorState_(
false),
371 exceptionMessageFiles_(),
372 exceptionMessageRuns_(),
373 exceptionMessageLumis_(),
374 alreadyHandlingException_(
false),
375 forceLooperToEnd_(
false),
376 looperBeginJobRun_(
false),
377 forceESCacheClearOnNewRun_(
false),
378 numberOfForkedChildren_(0),
379 numberOfSequentialEventsPerChild_(1),
380 setCpuAffinity_(
false),
381 asyncStopRequestedWhileProcessingEvents_(
false),
382 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
383 eventSetupDataToExcludeFromPrefetching_()
387 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
391 auto processDesc = std::make_shared<ProcessDesc>(
config);
403 ROOT::Cintex::Cintex::Enable();
411 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
423 unsigned int nThreads=1;
424 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
425 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
433 unsigned int nStreams =1;
434 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
435 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
443 unsigned int nConcurrentRuns =1;
449 unsigned int nConcurrentLumis =1;
475 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
476 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
480 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
481 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
489 std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
498 handler->willBeUsingThreads();
502 std::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
537 FDEBUG(2) << parameterSet << std::endl;
543 ep->preModuleDelayedGetSignal_.connect(std::cref(
actReg_->preModuleEventDelayedGetSignal_));
544 ep->postModuleDelayedGetSignal_.connect(std::cref(
actReg_->postModuleEventDelayedGetSignal_));
548 if(subProcessParameterSet) {
595 actReg_->preallocateSignal_(bounds);
616 ex.
addContext(
"Calling beginJob for the source");
622 actReg_->postBeginJobSignal_();
633 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
654 c.
call([actReg](){actReg->postEndJobSignal_();});
669 volatile bool child_failed =
false;
670 volatile unsigned int num_children_done = 0;
671 volatile int child_fail_exit_status = 0;
672 volatile int child_fail_signal = 0;
678 void ep_sigchld(
int, siginfo_t*,
void*) {
682 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
685 if(WIFEXITED(stat_loc)) {
687 if(0 != WEXITSTATUS(stat_loc)) {
688 child_fail_exit_status = WEXITSTATUS(stat_loc);
692 if(WIFSIGNALED(stat_loc)) {
694 child_fail_signal = WTERMSIG(stat_loc);
697 p = waitpid(-1, &stat_loc, WNOHANG);
712 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
714 while(numberOfChildren != 0) {
716 numberOfChildren /= 10;
726 class MessageSenderToSource {
728 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
732 const std::vector<int>& m_childrenPipes;
733 long const m_nEventsToProcess;
735 unsigned int m_aliveChildren;
739 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
740 std::vector<int>
const& childrenPipes,
741 long iNEventsToProcess):
742 m_childrenPipes(childrenPipes),
743 m_nEventsToProcess(iNEventsToProcess),
744 m_aliveChildren(childrenSockets.
size()),
747 FD_ZERO(&m_socketSet);
748 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
750 FD_SET(*it, &m_socketSet);
755 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
757 FD_SET(*it, &m_socketSet);
782 MessageSenderToSource::operator()() {
783 multicore::MessageForParent childMsg;
784 LogInfo(
"ForkingController") <<
"I am controller";
787 multicore::MessageForSource sndmsg;
788 sndmsg.startIndex = 0;
789 sndmsg.nIndices = m_nEventsToProcess;
792 fd_set readSockets, errorSockets;
794 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
795 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
798 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
800 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
809 if (FD_ISSET(
idx, &errorSockets)) {
810 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
811 FD_CLR(
idx, &m_socketSet);
814 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
822 if (!FD_ISSET(
idx, &readSockets)) {
828 bool is_pipe =
false;
829 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
833 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
836 FD_CLR(
idx, &m_socketSet);
844 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
846 FD_CLR(
idx, &m_socketSet);
855 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
857 FD_CLR(
idx, &m_socketSet);
862 sndmsg.startIndex += sndmsg.nIndices;
866 }
while (m_aliveChildren > 0);
874 void EventProcessor::possiblyContinueAfterForkChildFailure() {
875 if(child_failed && continueAfterChildFailure_) {
876 if (child_fail_signal) {
877 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
879 }
else if (child_fail_exit_status) {
880 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
881 child_fail_exit_status=0;
883 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
892 if(0 == numberOfForkedChildren_) {
return true;}
893 assert(0<numberOfForkedChildren_);
901 itemType = input_->nextItemType();
903 assert(itemType == InputSource::IsFile);
907 itemType = input_->nextItemType();
908 assert(itemType == InputSource::IsRun);
910 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
912 input_->runAuxiliary()->beginTime());
913 espController_->eventSetupForInstance(ts);
917 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
919 std::vector<eventsetup::DataKey> dataKeys;
920 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
925 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
927 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
928 excludedData = &(itExcludeRec->second);
929 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
937 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
938 itDataKey != itDataKeyEnd;
941 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
942 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
946 recordPtr->
doGet(*itDataKey);
954 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
959 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
962 actReg_->preForkReleaseResourcesSignal_();
963 input_->doPreForkReleaseResources();
964 schedule_->preForkReleaseResources();
969 unsigned int childIndex = 0;
970 unsigned int const kMaxChildren = numberOfForkedChildren_;
971 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
972 std::vector<pid_t> childrenIds;
973 childrenIds.reserve(kMaxChildren);
974 std::vector<int> childrenSockets;
975 childrenSockets.reserve(kMaxChildren);
976 std::vector<int> childrenPipes;
977 childrenPipes.reserve(kMaxChildren);
978 std::vector<int> childrenSocketsCopy;
979 childrenSocketsCopy.reserve(kMaxChildren);
980 std::vector<int> childrenPipesCopy;
981 childrenPipesCopy.reserve(kMaxChildren);
988 int sockets[2], fd_flags;
989 for(; childIndex < kMaxChildren; ++childIndex) {
991 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
992 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
996 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1000 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1001 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1006 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1007 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1010 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1011 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1014 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1015 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1020 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1021 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1024 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1025 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1029 childrenPipesCopy = childrenPipes;
1030 childrenSocketsCopy = childrenSockets;
1032 pid_t
value = fork();
1038 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1041 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1048 std::stringstream stout;
1049 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1050 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1051 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1053 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1054 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1057 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1058 if(setCpuAffinity_) {
1066 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1068 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1071 CPU_SET(childIndex, &mask);
1072 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1073 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1085 LogError(
"ForkingChild") <<
"failed to create a child";
1088 childrenIds.push_back(value);
1089 childrenSockets.push_back(sockets[0]);
1090 childrenPipes.push_back(pipes[0]);
1093 if(childIndex < kMaxChildren) {
1094 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1095 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1097 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1098 input_->doPostForkReacquireResources(receiver);
1099 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1104 jobReport->parentAfterFork(jobReportFile);
1114 sigset_t blockingSigSet;
1115 sigset_t unblockingSigSet;
1117 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1118 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1119 sigaddset(&blockingSigSet, SIGCHLD);
1120 sigaddset(&blockingSigSet, SIGUSR2);
1121 sigaddset(&blockingSigSet, SIGINT);
1122 sigdelset(&unblockingSigSet, SIGCHLD);
1123 sigdelset(&unblockingSigSet, SIGUSR2);
1124 sigdelset(&unblockingSigSet, SIGINT);
1125 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1129 bool too_many_fds =
false;
1130 if (pipes[1]+1 > FD_SETSIZE) {
1131 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1132 too_many_fds =
true;
1138 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1139 boost::thread senderThread(sender);
1141 if(not too_many_fds) {
1144 possiblyContinueAfterForkChildFailure();
1145 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1146 sigsuspend(&unblockingSigSet);
1147 possiblyContinueAfterForkChildFailure();
1148 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1151 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1153 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1155 LogError(
"ForkingStopping") <<
"child failed";
1158 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1161 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1162 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1163 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1164 it != itEnd; ++it) {
1167 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1168 while(num_children_done != kMaxChildren) {
1169 sigsuspend(&unblockingSigSet);
1171 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1174 senderThread.join();
1175 if(child_failed && !continueAfterChildFailure_) {
1176 if (child_fail_signal) {
1177 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1178 }
else if (child_fail_exit_status) {
1179 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1181 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1185 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1190 std::vector<ModuleDescription const*>
1191 EventProcessor::getAllModuleDescriptions()
const {
1192 return schedule_->getAllModuleDescriptions();
1196 EventProcessor::totalEvents()
const {
1197 return schedule_->totalEvents();
1201 EventProcessor::totalEventsPassed()
const {
1202 return schedule_->totalEventsPassed();
1206 EventProcessor::totalEventsFailed()
const {
1207 return schedule_->totalEventsFailed();
1211 EventProcessor::enableEndPaths(
bool active) {
1212 schedule_->enableEndPaths(active);
1216 EventProcessor::endPathsEnabled()
const {
1217 return schedule_->endPathsEnabled();
1222 schedule_->getTriggerReport(rep);
1226 EventProcessor::clearCounters() {
1227 schedule_->clearCounters();
1231 std::auto_ptr<statemachine::Machine>
1232 EventProcessor::createStateMachine() {
1239 << fileMode_ <<
".\n"
1240 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1250 << emptyRunLumiMode_ <<
".\n"
1251 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1258 machine->initiate();
1264 bool returnValue =
false;
1269 returnCode = epSignal;
1276 EventProcessor::runToCompletion() {
1279 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1280 std::auto_ptr<statemachine::Machine> machine;
1285 stateMachineWasInErrorState_ =
false;
1290 machine = createStateMachine();
1291 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1292 asyncStopRequestedWhileProcessingEvents_=
false;
1301 if(numberOfForkedChildren_ > 0) {
1302 size_t size = preg_->size();
1304 SendSourceTerminationSignalIfException sentry(actReg_.get());
1305 more = input_->skipForForking();
1306 sentry.completedSuccessfully();
1309 if(size < preg_->
size()) {
1310 principalCache_.adjustIndexesAfterProductRegistryAddition();
1312 principalCache_.adjustEventsToNewProductRegistry(preg_);
1316 SendSourceTerminationSignalIfException sentry(actReg_.get());
1317 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1318 sentry.completedSuccessfully();
1321 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1323 if(checkForAsyncStopRequest(returnCode)) {
1324 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1325 forceLooperToEnd_ =
true;
1327 forceLooperToEnd_ =
false;
1331 if(itemType == InputSource::IsEvent) {
1333 if(asyncStopRequestedWhileProcessingEvents_) {
1334 forceLooperToEnd_ =
true;
1336 forceLooperToEnd_ =
false;
1337 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1340 itemType = nextItemTypeFromProcessingEvents_;
1343 if(itemType == InputSource::IsEvent) {
1345 else if(itemType == InputSource::IsStop) {
1348 else if(itemType == InputSource::IsFile) {
1351 else if(itemType == InputSource::IsRun) {
1352 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1354 else if(itemType == InputSource::IsLumi) {
1357 else if(itemType == InputSource::IsSynchronize) {
1363 <<
"Unknown next item type passed to EventProcessor\n"
1364 <<
"Please report this error to the Framework group\n";
1366 if(machine->terminated()) {
1418 alreadyHandlingException_ =
true;
1419 terminateMachine(machine);
1420 alreadyHandlingException_ =
false;
1421 if (!exceptionMessageLumis_.empty()) {
1424 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1427 if (!exceptionMessageRuns_.empty()) {
1430 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1433 if (!exceptionMessageFiles_.empty()) {
1436 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1442 if(machine->terminated()) {
1443 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1447 if(stateMachineWasInErrorState_) {
1449 <<
"The boost state machine in the EventProcessor exited after\n"
1450 <<
"entering the Error state.\n";
1454 if(machine.get() != 0) {
1455 terminateMachine(machine);
1457 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1458 <<
"Please report this error to the Framework group\n";
1464 void EventProcessor::readFile() {
1465 FDEBUG(1) <<
" \treadFile\n";
1466 size_t size = preg_->size();
1467 SendSourceTerminationSignalIfException sentry(actReg_.get());
1469 fb_ = input_->readFile();
1470 if(size < preg_->
size()) {
1471 principalCache_.adjustIndexesAfterProductRegistryAddition();
1473 principalCache_.adjustEventsToNewProductRegistry(preg_);
1474 if((numberOfForkedChildren_ > 0)
or
1475 (preallocations_.numberOfStreams()>1 and
1476 preallocations_.numberOfThreads()>1)) {
1477 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1479 sentry.completedSuccessfully();
1482 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1483 if (fb_.get() !=
nullptr) {
1484 SendSourceTerminationSignalIfException sentry(actReg_.get());
1485 input_->closeFile(fb_.get(), cleaningUpAfterException);
1486 sentry.completedSuccessfully();
1488 FDEBUG(1) <<
"\tcloseInputFile\n";
1491 void EventProcessor::openOutputFiles() {
1492 if (fb_.get() !=
nullptr) {
1493 schedule_->openOutputFiles(*fb_);
1494 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1496 FDEBUG(1) <<
"\topenOutputFiles\n";
1499 void EventProcessor::closeOutputFiles() {
1500 if (fb_.get() !=
nullptr) {
1501 schedule_->closeOutputFiles();
1502 if(hasSubProcess()) subProcess_->closeOutputFiles();
1504 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1507 void EventProcessor::respondToOpenInputFile() {
1508 if(hasSubProcess()) {
1509 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1511 if (fb_.get() !=
nullptr) {
1512 schedule_->respondToOpenInputFile(*fb_);
1513 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1515 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1518 void EventProcessor::respondToCloseInputFile() {
1519 if (fb_.get() !=
nullptr) {
1520 schedule_->respondToCloseInputFile(*fb_);
1521 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1523 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1526 void EventProcessor::startingNewLoop() {
1527 shouldWeStop_ =
false;
1530 if(looper_ && looperBeginJobRun_) {
1531 looper_->doStartingNewLoop();
1533 FDEBUG(1) <<
"\tstartingNewLoop\n";
1536 bool EventProcessor::endOfLoop() {
1539 looper_->setModuleChanger(&changer);
1541 looper_->setModuleChanger(
nullptr);
1542 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1545 FDEBUG(1) <<
"\tendOfLoop\n";
1549 void EventProcessor::rewindInput() {
1552 FDEBUG(1) <<
"\trewind\n";
1555 void EventProcessor::prepareForNextLoop() {
1556 looper_->prepareForNextLoop(esp_.get());
1557 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1560 bool EventProcessor::shouldWeCloseOutput()
const {
1561 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1562 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1565 void EventProcessor::doErrorStuff() {
1566 FDEBUG(1) <<
"\tdoErrorStuff\n";
1568 <<
"The EventProcessor state machine encountered an unexpected event\n"
1569 <<
"and went to the error state\n"
1570 <<
"Will attempt to terminate processing normally\n"
1571 <<
"(IF using the looper the next loop will be attempted)\n"
1572 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1573 stateMachineWasInErrorState_ =
true;
1579 SendSourceTerminationSignalIfException sentry(actReg_.get());
1581 input_->doBeginRun(runPrincipal, &processContext_);
1582 sentry.completedSuccessfully();
1587 if(forceESCacheClearOnNewRun_){
1588 espController_->forceCacheClear();
1591 SendSourceTerminationSignalIfException sentry(actReg_.get());
1592 espController_->eventSetupForInstance(ts);
1593 sentry.completedSuccessfully();
1596 if(looper_ && looperBeginJobRun_==
false) {
1598 looper_->beginOfJob(es);
1599 looperBeginJobRun_ =
true;
1600 looper_->doStartingNewLoop();
1604 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1605 if(hasSubProcess()) {
1606 subProcess_->doBeginRun(runPrincipal, ts);
1611 looper_->doBeginRun(runPrincipal, es, &processContext_);
1615 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1616 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1617 if(hasSubProcess()) {
1618 subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1631 SendSourceTerminationSignalIfException sentry(actReg_.get());
1633 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1634 sentry.completedSuccessfully();
1637 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1640 SendSourceTerminationSignalIfException sentry(actReg_.get());
1641 espController_->eventSetupForInstance(ts);
1642 sentry.completedSuccessfully();
1646 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1648 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1649 if(hasSubProcess()) {
1650 subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1660 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1661 if(hasSubProcess()) {
1662 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1667 looper_->doEndRun(runPrincipal, es, &processContext_);
1674 SendSourceTerminationSignalIfException sentry(actReg_.get());
1676 input_->doBeginLumi(lumiPrincipal, &processContext_);
1677 sentry.completedSuccessfully();
1683 rng->preBeginLumi(lb);
1690 SendSourceTerminationSignalIfException sentry(actReg_.get());
1691 espController_->eventSetupForInstance(ts);
1692 sentry.completedSuccessfully();
1697 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1698 if(hasSubProcess()) {
1699 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1702 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1704 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1707 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1709 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1710 if(hasSubProcess()) {
1711 subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1715 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1724 SendSourceTerminationSignalIfException sentry(actReg_.get());
1726 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1727 sentry.completedSuccessfully();
1734 SendSourceTerminationSignalIfException sentry(actReg_.get());
1735 espController_->eventSetupForInstance(ts);
1736 sentry.completedSuccessfully();
1740 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1742 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1743 if(hasSubProcess()) {
1744 subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1748 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1754 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1755 if(hasSubProcess()) {
1756 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1759 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1761 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1766 if (principalCache_.hasRunPrincipal()) {
1768 <<
"EventProcessor::readRun\n"
1769 <<
"Illegal attempt to insert run into cache\n"
1770 <<
"Contact a Framework Developer\n";
1772 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1774 SendSourceTerminationSignalIfException sentry(actReg_.get());
1775 input_->readRun(*rp, *historyAppender_);
1776 sentry.completedSuccessfully();
1778 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1779 principalCache_.insert(rp);
1784 principalCache_.merge(input_->runAuxiliary(), preg_);
1785 auto runPrincipal =principalCache_.runPrincipalPtr();
1787 SendSourceTerminationSignalIfException sentry(actReg_.get());
1788 input_->readAndMergeRun(*runPrincipal);
1789 sentry.completedSuccessfully();
1791 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1792 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1795 int EventProcessor::readLuminosityBlock() {
1796 if (principalCache_.hasLumiPrincipal()) {
1798 <<
"EventProcessor::readRun\n"
1799 <<
"Illegal attempt to insert lumi into cache\n"
1800 <<
"Contact a Framework Developer\n";
1802 if (!principalCache_.hasRunPrincipal()) {
1804 <<
"EventProcessor::readRun\n"
1805 <<
"Illegal attempt to insert lumi into cache\n"
1806 <<
"Run is invalid\n"
1807 <<
"Contact a Framework Developer\n";
1809 auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1811 SendSourceTerminationSignalIfException sentry(actReg_.get());
1812 input_->readLuminosityBlock(*lbp, *historyAppender_);
1813 sentry.completedSuccessfully();
1815 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1816 principalCache_.insert(lbp);
1817 return input_->luminosityBlock();
1820 int EventProcessor::readAndMergeLumi() {
1821 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1823 SendSourceTerminationSignalIfException sentry(actReg_.get());
1824 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1825 sentry.completedSuccessfully();
1827 return input_->luminosityBlock();
1843 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1844 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1845 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1849 principalCache_.deleteLumi(phid, run, lumi);
1850 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1851 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1857 unsigned int iStreamIndex,
1858 std::atomic<bool>* iFinishedProcessingEvents,
1859 tbb::task* iWaitTask):
1861 m_streamID(iStreamIndex),
1862 m_finishedProcessingEvents(iFinishedProcessingEvents),
1863 m_waitTask(iWaitTask){}
1866 m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1867 m_waitTask->decrement_ref_count();
1877 void EventProcessor::processEventsForStreamAsync(
unsigned int iStreamIndex,
1878 std::atomic<bool>* finishedProcessingEvents) {
1882 if(preallocations_.numberOfStreams()>1) {
1884 handler->initializeThisThreadForUse();
1887 if(iStreamIndex==0) {
1891 if(shouldWeStop()) {
1894 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1904 std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1906 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1912 auto sr = input_->resourceSharedWithDelayedReader();
1913 std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1915 delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1918 if (InputSource::IsEvent !=itemType) {
1919 nextItemTypeFromProcessingEvents_ = itemType;
1920 finishedProcessingEvents->store(
true,std::memory_order_release);
1924 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1926 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1929 readEvent(iStreamIndex);
1932 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1935 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1942 bool expected =
false;
1943 if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,
true)) {
1944 deferredExceptionPtr_ = std::current_exception();
1950 void EventProcessor::readAndProcessEvent() {
1951 if(numberOfForkedChildren_>0) {
1956 nextItemTypeFromProcessingEvents_ = InputSource::IsEvent;
1957 asyncStopRequestedWhileProcessingEvents_ =
false;
1959 std::atomic<bool> finishedProcessingEvents{
false};
1965 tbb::task* eventLoopWaitTask{
new (tbb::task::allocate_root()) tbb::empty_task{}};
1966 eventLoopWaitTask->increment_ref_count();
1968 const unsigned int kNumStreams = preallocations_.numberOfStreams();
1969 unsigned int iStreamIndex = 0;
1970 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1971 eventLoopWaitTask->increment_ref_count();
1972 tbb::task::enqueue( *(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1975 eventLoopWaitTask->increment_ref_count();
1976 eventLoopWaitTask->spawn_and_wait_for_all(*(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1980 if(deferredExceptionPtrIsSet_) {
1981 std::rethrow_exception(deferredExceptionPtr_);
1984 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
1986 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
1989 SendSourceTerminationSignalIfException sentry(actReg_.get());
1990 input_->readEvent(
event, streamContext);
1991 sentry.completedSuccessfully();
1993 FDEBUG(1) <<
"\treadEvent\n";
1996 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1997 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2001 rng->postEventRead(ev);
2003 assert(pep->luminosityBlockPrincipalPtrValid());
2004 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2005 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2013 schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2014 if(hasSubProcess()) {
2015 subProcess_->doEvent(*pep);
2021 bool randomAccess = input_->randomAccess();
2029 StreamContext streamContext(pep->streamID(), &processContext_);
2030 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2035 input_->skipEvents(-2);
2043 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2047 FDEBUG(1) <<
"\tprocessEvent\n";
2048 pep->clearEventPrincipal();
2051 bool EventProcessor::shouldWeStop()
const {
2052 FDEBUG(1) <<
"\tshouldWeStop\n";
2053 if(shouldWeStop_)
return true;
2054 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2058 exceptionMessageFiles_ =
message;
2062 exceptionMessageRuns_ =
message;
2066 exceptionMessageLumis_ =
message;
2069 bool EventProcessor::alreadyHandlingException()
const {
2070 return alreadyHandlingException_;
2073 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2074 if(iMachine.get() != 0) {
2075 if(!iMachine->terminated()) {
2076 forceLooperToEnd_ =
true;
2078 forceLooperToEnd_ =
false;
2081 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2083 if(iMachine->terminated()) {
2084 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet ¶meterSet)
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
std::shared_ptr< ProcessConfiguration > processConfiguration_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
bool continueAfterChildFailure_
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::shared_ptr< edm::ParameterSet > parameterSet()
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
std::unique_ptr< SignallingProductRegistry > preg_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::auto_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
int maxSecondsUntilRampdown_