64 #include "boost/thread/xtime.hpp"
78 #include <sys/types.h>
80 #include <sys/socket.h>
81 #include <sys/select.h>
82 #include <sys/fcntl.h>
95 class SendSourceTerminationSignalIfException {
99 ~SendSourceTerminationSignalIfException() {
104 void completedSuccessfully() {
117 std::unique_ptr<InputSource>
120 std::shared_ptr<ProductRegistry>
preg,
121 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
122 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
123 std::shared_ptr<ActivityRegistry>
areg,
127 if(main_input == 0) {
129 <<
"There must be exactly one source in the configuration.\n"
130 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
135 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
138 filler->fill(descriptions);
142 descriptions.validate(*main_input,
std::string(
"source"));
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 areg->preSourceConstructionSignal_(
md);
171 std::unique_ptr<InputSource>
input;
174 std::shared_ptr<int> sentry(
nullptr,[areg,&
md](
void*){areg->postSourceConstructionSignal_(
md);});
180 std::ostringstream ost;
181 ost <<
"Constructing input source of type " << modtype;
189 std::shared_ptr<EDLooperBase>
193 std::shared_ptr<EDLooperBase> vLooper;
195 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
197 if(loopers.size() == 0) {
201 assert(1 == loopers.size());
203 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
221 std::vector<std::string>
const& defaultServices,
222 std::vector<std::string>
const& forcedServices) :
225 branchIDListHelper_(),
228 espController_(new eventsetup::EventSetupsController),
231 processConfiguration_(),
237 deferredExceptionPtrIsSet_(
false),
239 beginJobCalled_(
false),
240 shouldWeStop_(
false),
241 stateMachineWasInErrorState_(
false),
244 exceptionMessageFiles_(),
245 exceptionMessageRuns_(),
246 exceptionMessageLumis_(),
247 alreadyHandlingException_(
false),
248 forceLooperToEnd_(
false),
249 looperBeginJobRun_(
false),
250 forceESCacheClearOnNewRun_(
false),
251 numberOfForkedChildren_(0),
252 numberOfSequentialEventsPerChild_(1),
253 setCpuAffinity_(
false),
254 eventSetupDataToExcludeFromPrefetching_() {
256 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
257 processDesc->addServices(defaultServices, forcedServices);
258 init(processDesc, iToken, iLegacy);
262 std::vector<std::string>
const& defaultServices,
263 std::vector<std::string>
const& forcedServices) :
266 branchIDListHelper_(),
269 espController_(new eventsetup::EventSetupsController),
272 processConfiguration_(),
278 deferredExceptionPtrIsSet_(
false),
280 beginJobCalled_(
false),
281 shouldWeStop_(
false),
282 stateMachineWasInErrorState_(
false),
285 exceptionMessageFiles_(),
286 exceptionMessageRuns_(),
287 exceptionMessageLumis_(),
288 alreadyHandlingException_(
false),
289 forceLooperToEnd_(
false),
290 looperBeginJobRun_(
false),
291 forceESCacheClearOnNewRun_(
false),
292 numberOfForkedChildren_(0),
293 numberOfSequentialEventsPerChild_(1),
294 setCpuAffinity_(
false),
295 asyncStopRequestedWhileProcessingEvents_(
false),
296 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
297 eventSetupDataToExcludeFromPrefetching_()
300 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
301 processDesc->addServices(defaultServices, forcedServices);
310 branchIDListHelper_(),
313 espController_(new eventsetup::EventSetupsController),
316 processConfiguration_(),
322 deferredExceptionPtrIsSet_(
false),
324 beginJobCalled_(
false),
325 shouldWeStop_(
false),
326 stateMachineWasInErrorState_(
false),
329 exceptionMessageFiles_(),
330 exceptionMessageRuns_(),
331 exceptionMessageLumis_(),
332 alreadyHandlingException_(
false),
333 forceLooperToEnd_(
false),
334 looperBeginJobRun_(
false),
335 forceESCacheClearOnNewRun_(
false),
336 numberOfForkedChildren_(0),
337 numberOfSequentialEventsPerChild_(1),
338 setCpuAffinity_(
false),
339 asyncStopRequestedWhileProcessingEvents_(
false),
340 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
341 eventSetupDataToExcludeFromPrefetching_()
343 init(processDesc, token, legacy);
350 branchIDListHelper_(),
353 espController_(new eventsetup::EventSetupsController),
356 processConfiguration_(),
362 deferredExceptionPtrIsSet_(
false),
364 beginJobCalled_(
false),
365 shouldWeStop_(
false),
366 stateMachineWasInErrorState_(
false),
369 exceptionMessageFiles_(),
370 exceptionMessageRuns_(),
371 exceptionMessageLumis_(),
372 alreadyHandlingException_(
false),
373 forceLooperToEnd_(
false),
374 looperBeginJobRun_(
false),
375 forceESCacheClearOnNewRun_(
false),
376 numberOfForkedChildren_(0),
377 numberOfSequentialEventsPerChild_(1),
378 setCpuAffinity_(
false),
379 asyncStopRequestedWhileProcessingEvents_(
false),
380 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
381 eventSetupDataToExcludeFromPrefetching_()
385 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
389 auto processDesc = std::make_shared<ProcessDesc>(
config);
407 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
419 unsigned int nThreads=1;
420 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
421 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
429 unsigned int nStreams =1;
430 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
431 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
439 unsigned int nConcurrentRuns =1;
445 unsigned int nConcurrentLumis =1;
471 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
472 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
476 std::make_pair(itPS->getUntrackedParameter<
std::string>(
"type",
"*"),
477 itPS->getUntrackedParameter<
std::string>(
"label",
"")));
485 std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
494 handler->willBeUsingThreads();
498 std::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
540 FDEBUG(2) << parameterSet << std::endl;
547 ep->preModuleDelayedGetSignal_.connect(std::cref(
actReg_->preModuleEventDelayedGetSignal_));
548 ep->postModuleDelayedGetSignal_.connect(std::cref(
actReg_->postModuleEventDelayedGetSignal_));
552 if(hasSubProcesses) {
554 subProcesses_ = std::make_unique<std::vector<SubProcess> >();
557 for(
auto& subProcessPSet : *subProcessVParameterSet) {
606 actReg_->preallocateSignal_(bounds);
630 ex.
addContext(
"Calling beginJob for the source");
637 subProcess.doBeginJob();
640 actReg_->postBeginJobSignal_();
646 subProcess.doBeginStream(
i);
655 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
665 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
670 c.
call([actReg](){actReg->preEndJobSignal_();});
681 c.
call([actReg](){actReg->postEndJobSignal_();});
696 volatile bool child_failed =
false;
697 volatile unsigned int num_children_done = 0;
698 volatile int child_fail_exit_status = 0;
699 volatile int child_fail_signal = 0;
705 void ep_sigchld(
int, siginfo_t*,
void*) {
709 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
712 if(WIFEXITED(stat_loc)) {
714 if(0 != WEXITSTATUS(stat_loc)) {
715 child_fail_exit_status = WEXITSTATUS(stat_loc);
719 if(WIFSIGNALED(stat_loc)) {
721 child_fail_signal = WTERMSIG(stat_loc);
724 p = waitpid(-1, &stat_loc, WNOHANG);
739 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
741 while(numberOfChildren != 0) {
743 numberOfChildren /= 10;
753 class MessageSenderToSource {
755 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
759 const std::vector<int>& m_childrenPipes;
760 long const m_nEventsToProcess;
762 unsigned int m_aliveChildren;
766 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
767 std::vector<int>
const& childrenPipes,
768 long iNEventsToProcess):
769 m_childrenPipes(childrenPipes),
770 m_nEventsToProcess(iNEventsToProcess),
771 m_aliveChildren(childrenSockets.
size()),
774 FD_ZERO(&m_socketSet);
775 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
777 FD_SET(*it, &m_socketSet);
782 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
784 FD_SET(*it, &m_socketSet);
809 MessageSenderToSource::operator()() {
810 multicore::MessageForParent childMsg;
811 LogInfo(
"ForkingController") <<
"I am controller";
814 multicore::MessageForSource sndmsg;
815 sndmsg.startIndex = 0;
816 sndmsg.nIndices = m_nEventsToProcess;
819 fd_set readSockets, errorSockets;
821 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
822 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
825 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
827 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
836 if (FD_ISSET(
idx, &errorSockets)) {
837 LogInfo(
"ForkingController") <<
"Error on socket " <<
idx;
838 FD_CLR(
idx, &m_socketSet);
841 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
849 if (!FD_ISSET(
idx, &readSockets)) {
855 bool is_pipe =
false;
856 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
860 while (((rc =
read(
idx, &buf, 1)) < 0) && (errno == EINTR)) {}
863 FD_CLR(
idx, &m_socketSet);
871 while (((rc = recv(
idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
873 FD_CLR(
idx, &m_socketSet);
882 while (((rc = send(
idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
884 FD_CLR(
idx, &m_socketSet);
889 sndmsg.startIndex += sndmsg.nIndices;
893 }
while (m_aliveChildren > 0);
901 void EventProcessor::possiblyContinueAfterForkChildFailure() {
902 if(child_failed && continueAfterChildFailure_) {
903 if (child_fail_signal) {
904 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
906 }
else if (child_fail_exit_status) {
907 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
908 child_fail_exit_status=0;
910 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
919 if(0 == numberOfForkedChildren_) {
return true;}
920 assert(0<numberOfForkedChildren_);
928 itemType = input_->nextItemType();
930 assert(itemType == InputSource::IsFile);
934 itemType = input_->nextItemType();
935 assert(itemType == InputSource::IsRun);
937 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
939 input_->runAuxiliary()->beginTime());
940 espController_->eventSetupForInstance(ts);
944 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
946 std::vector<eventsetup::DataKey> dataKeys;
947 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
952 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
954 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
955 excludedData = &(itExcludeRec->second);
956 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
964 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
965 itDataKey != itDataKeyEnd;
968 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
969 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
973 recordPtr->
doGet(*itDataKey);
981 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
986 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
989 actReg_->preForkReleaseResourcesSignal_();
990 input_->doPreForkReleaseResources();
991 schedule_->preForkReleaseResources();
996 unsigned int childIndex = 0;
997 unsigned int const kMaxChildren = numberOfForkedChildren_;
998 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
999 std::vector<pid_t> childrenIds;
1000 childrenIds.reserve(kMaxChildren);
1001 std::vector<int> childrenSockets;
1002 childrenSockets.reserve(kMaxChildren);
1003 std::vector<int> childrenPipes;
1004 childrenPipes.reserve(kMaxChildren);
1005 std::vector<int> childrenSocketsCopy;
1006 childrenSocketsCopy.reserve(kMaxChildren);
1007 std::vector<int> childrenPipesCopy;
1008 childrenPipesCopy.reserve(kMaxChildren);
1015 int sockets[2], fd_flags;
1016 for(; childIndex < kMaxChildren; ++childIndex) {
1018 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1019 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1023 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1027 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1028 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1033 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1034 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1037 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1038 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1041 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1042 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1047 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1048 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1051 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1052 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1056 childrenPipesCopy = childrenPipes;
1057 childrenSocketsCopy = childrenSockets;
1059 pid_t
value = fork();
1065 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1068 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1075 std::stringstream stout;
1076 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1077 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1078 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1080 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1081 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1084 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1085 if(setCpuAffinity_) {
1093 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1095 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1098 CPU_SET(childIndex, &mask);
1099 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1100 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1112 LogError(
"ForkingChild") <<
"failed to create a child";
1115 childrenIds.push_back(value);
1116 childrenSockets.push_back(sockets[0]);
1117 childrenPipes.push_back(pipes[0]);
1120 if(childIndex < kMaxChildren) {
1121 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1122 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1124 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1125 input_->doPostForkReacquireResources(receiver);
1126 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1131 jobReport->parentAfterFork(jobReportFile);
1141 sigset_t blockingSigSet;
1142 sigset_t unblockingSigSet;
1144 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1145 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1146 sigaddset(&blockingSigSet, SIGCHLD);
1147 sigaddset(&blockingSigSet, SIGUSR2);
1148 sigaddset(&blockingSigSet, SIGINT);
1149 sigdelset(&unblockingSigSet, SIGCHLD);
1150 sigdelset(&unblockingSigSet, SIGUSR2);
1151 sigdelset(&unblockingSigSet, SIGINT);
1152 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1156 bool too_many_fds =
false;
1157 if (pipes[1]+1 > FD_SETSIZE) {
1158 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1159 too_many_fds =
true;
1165 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1166 boost::thread senderThread(sender);
1168 if(not too_many_fds) {
1171 possiblyContinueAfterForkChildFailure();
1172 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1173 sigsuspend(&unblockingSigSet);
1174 possiblyContinueAfterForkChildFailure();
1175 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1178 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1180 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1182 LogError(
"ForkingStopping") <<
"child failed";
1185 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1188 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1189 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1190 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1191 it != itEnd; ++it) {
1194 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1195 while(num_children_done != kMaxChildren) {
1196 sigsuspend(&unblockingSigSet);
1198 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1201 senderThread.join();
1202 if(child_failed && !continueAfterChildFailure_) {
1203 if (child_fail_signal) {
1204 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1205 }
else if (child_fail_exit_status) {
1206 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1208 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1212 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1217 std::vector<ModuleDescription const*>
1218 EventProcessor::getAllModuleDescriptions()
const {
1219 return schedule_->getAllModuleDescriptions();
1223 EventProcessor::totalEvents()
const {
1224 return schedule_->totalEvents();
1228 EventProcessor::totalEventsPassed()
const {
1229 return schedule_->totalEventsPassed();
1233 EventProcessor::totalEventsFailed()
const {
1234 return schedule_->totalEventsFailed();
1238 EventProcessor::enableEndPaths(
bool active) {
1239 schedule_->enableEndPaths(active);
1243 EventProcessor::endPathsEnabled()
const {
1244 return schedule_->endPathsEnabled();
1249 schedule_->getTriggerReport(rep);
1253 EventProcessor::clearCounters() {
1254 schedule_->clearCounters();
1258 std::auto_ptr<statemachine::Machine>
1259 EventProcessor::createStateMachine() {
1266 << fileMode_ <<
".\n"
1267 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1277 << emptyRunLumiMode_ <<
".\n"
1278 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1285 machine->initiate();
1291 bool returnValue =
false;
1296 returnCode = epSignal;
1303 EventProcessor::runToCompletion() {
1306 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1307 std::auto_ptr<statemachine::Machine> machine;
1312 stateMachineWasInErrorState_ =
false;
1317 machine = createStateMachine();
1318 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1319 asyncStopRequestedWhileProcessingEvents_=
false;
1328 if(numberOfForkedChildren_ > 0) {
1329 size_t size = preg_->size();
1331 SendSourceTerminationSignalIfException sentry(actReg_.get());
1332 more = input_->skipForForking();
1333 sentry.completedSuccessfully();
1336 if(size < preg_->
size()) {
1337 principalCache_.adjustIndexesAfterProductRegistryAddition();
1339 principalCache_.adjustEventsToNewProductRegistry(
preg());
1343 SendSourceTerminationSignalIfException sentry(actReg_.get());
1344 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1345 sentry.completedSuccessfully();
1348 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1350 if(checkForAsyncStopRequest(returnCode)) {
1351 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1352 forceLooperToEnd_ =
true;
1354 forceLooperToEnd_ =
false;
1358 if(itemType == InputSource::IsEvent) {
1360 if(asyncStopRequestedWhileProcessingEvents_) {
1361 forceLooperToEnd_ =
true;
1363 forceLooperToEnd_ =
false;
1364 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1367 itemType = nextItemTypeFromProcessingEvents_;
1370 if(itemType == InputSource::IsEvent) {
1372 else if(itemType == InputSource::IsStop) {
1375 else if(itemType == InputSource::IsFile) {
1378 else if(itemType == InputSource::IsRun) {
1379 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1381 else if(itemType == InputSource::IsLumi) {
1384 else if(itemType == InputSource::IsSynchronize) {
1390 <<
"Unknown next item type passed to EventProcessor\n"
1391 <<
"Please report this error to the Framework group\n";
1393 if(machine->terminated()) {
1445 alreadyHandlingException_ =
true;
1446 terminateMachine(machine);
1447 alreadyHandlingException_ =
false;
1448 if (!exceptionMessageLumis_.empty()) {
1451 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1454 if (!exceptionMessageRuns_.empty()) {
1457 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1460 if (!exceptionMessageFiles_.empty()) {
1463 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1469 if(machine->terminated()) {
1470 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1474 if(stateMachineWasInErrorState_) {
1476 <<
"The boost state machine in the EventProcessor exited after\n"
1477 <<
"entering the Error state.\n";
1481 if(machine.get() != 0) {
1482 terminateMachine(machine);
1484 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1485 <<
"Please report this error to the Framework group\n";
1491 void EventProcessor::readFile() {
1492 FDEBUG(1) <<
" \treadFile\n";
1493 size_t size = preg_->size();
1494 SendSourceTerminationSignalIfException sentry(actReg_.get());
1496 fb_ = input_->readFile();
1497 if(size < preg_->
size()) {
1498 principalCache_.adjustIndexesAfterProductRegistryAddition();
1500 principalCache_.adjustEventsToNewProductRegistry(
preg());
1501 if((numberOfForkedChildren_ > 0)
or
1502 (preallocations_.numberOfStreams()>1 and
1503 preallocations_.numberOfThreads()>1)) {
1504 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1506 sentry.completedSuccessfully();
1509 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1510 if (fb_.get() !=
nullptr) {
1511 SendSourceTerminationSignalIfException sentry(actReg_.get());
1512 input_->closeFile(fb_.get(), cleaningUpAfterException);
1513 sentry.completedSuccessfully();
1515 FDEBUG(1) <<
"\tcloseInputFile\n";
1518 void EventProcessor::openOutputFiles() {
1519 if (fb_.get() !=
nullptr) {
1520 schedule_->openOutputFiles(*fb_);
1521 if(hasSubProcesses()) {
1522 for(
auto& subProcess : *subProcesses_) {
1523 subProcess.openOutputFiles(*fb_);
1527 FDEBUG(1) <<
"\topenOutputFiles\n";
1530 void EventProcessor::closeOutputFiles() {
1531 if (fb_.get() !=
nullptr) {
1532 schedule_->closeOutputFiles();
1533 if(hasSubProcesses()) {
1534 for(
auto& subProcess : *subProcesses_) {
1535 subProcess.closeOutputFiles();
1539 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1542 void EventProcessor::respondToOpenInputFile() {
1543 if(hasSubProcesses()) {
1544 for(
auto& subProcess : *subProcesses_) {
1545 subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1548 if (fb_.get() !=
nullptr) {
1549 schedule_->respondToOpenInputFile(*fb_);
1550 if(hasSubProcesses()) {
1551 for(
auto& subProcess : *subProcesses_) {
1552 subProcess.respondToOpenInputFile(*fb_);
1556 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1559 void EventProcessor::respondToCloseInputFile() {
1560 if (fb_.get() !=
nullptr) {
1561 schedule_->respondToCloseInputFile(*fb_);
1562 if(hasSubProcesses()) {
1563 for(
auto& subProcess : *subProcesses_) {
1564 subProcess.respondToCloseInputFile(*fb_);
1568 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1571 void EventProcessor::startingNewLoop() {
1572 shouldWeStop_ =
false;
1575 if(looper_ && looperBeginJobRun_) {
1576 looper_->doStartingNewLoop();
1578 FDEBUG(1) <<
"\tstartingNewLoop\n";
1581 bool EventProcessor::endOfLoop() {
1584 looper_->setModuleChanger(&changer);
1586 looper_->setModuleChanger(
nullptr);
1587 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1590 FDEBUG(1) <<
"\tendOfLoop\n";
1594 void EventProcessor::rewindInput() {
1597 FDEBUG(1) <<
"\trewind\n";
1600 void EventProcessor::prepareForNextLoop() {
1601 looper_->prepareForNextLoop(esp_.get());
1602 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1605 bool EventProcessor::shouldWeCloseOutput()
const {
1606 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1607 if(hasSubProcesses()) {
1608 for(
auto const& subProcess : *subProcesses_) {
1609 if(subProcess.shouldWeCloseOutput()) {
1615 return schedule_->shouldWeCloseOutput();
1618 void EventProcessor::doErrorStuff() {
1619 FDEBUG(1) <<
"\tdoErrorStuff\n";
1621 <<
"The EventProcessor state machine encountered an unexpected event\n"
1622 <<
"and went to the error state\n"
1623 <<
"Will attempt to terminate processing normally\n"
1624 <<
"(IF using the looper the next loop will be attempted)\n"
1625 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1626 stateMachineWasInErrorState_ =
true;
1632 SendSourceTerminationSignalIfException sentry(actReg_.get());
1634 input_->doBeginRun(runPrincipal, &processContext_);
1635 sentry.completedSuccessfully();
1640 if(forceESCacheClearOnNewRun_){
1641 espController_->forceCacheClear();
1644 SendSourceTerminationSignalIfException sentry(actReg_.get());
1645 espController_->eventSetupForInstance(ts);
1646 sentry.completedSuccessfully();
1649 if(looper_ && looperBeginJobRun_==
false) {
1651 looper_->beginOfJob(es);
1652 looperBeginJobRun_ =
true;
1653 looper_->doStartingNewLoop();
1657 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1658 if(hasSubProcesses()) {
1659 for(
auto& subProcess : *subProcesses_) {
1660 subProcess.doBeginRun(runPrincipal, ts);
1666 looper_->doBeginRun(runPrincipal, es, &processContext_);
1670 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1671 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1672 if(hasSubProcesses()) {
1673 for(
auto& subProcess : *subProcesses_) {
1674 subProcess.doStreamBeginRun(i, runPrincipal, ts);
1688 SendSourceTerminationSignalIfException sentry(actReg_.get());
1690 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1691 sentry.completedSuccessfully();
1694 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1697 SendSourceTerminationSignalIfException sentry(actReg_.get());
1698 espController_->eventSetupForInstance(ts);
1699 sentry.completedSuccessfully();
1703 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1705 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1706 if(hasSubProcesses()) {
1707 for(
auto& subProcess : *subProcesses_) {
1708 subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1719 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1720 if(hasSubProcesses()) {
1721 for(
auto& subProcess : *subProcesses_) {
1722 subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException);
1728 looper_->doEndRun(runPrincipal, es, &processContext_);
1735 SendSourceTerminationSignalIfException sentry(actReg_.get());
1737 input_->doBeginLumi(lumiPrincipal, &processContext_);
1738 sentry.completedSuccessfully();
1744 rng->preBeginLumi(lb);
1751 SendSourceTerminationSignalIfException sentry(actReg_.get());
1752 espController_->eventSetupForInstance(ts);
1753 sentry.completedSuccessfully();
1758 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1759 if(hasSubProcesses()) {
1760 for(
auto& subProcess : *subProcesses_) {
1761 subProcess.doBeginLuminosityBlock(lumiPrincipal, ts);
1765 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1767 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1770 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1772 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1773 if(hasSubProcesses()) {
1774 for(
auto& subProcess : *subProcesses_) {
1775 subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1780 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1789 SendSourceTerminationSignalIfException sentry(actReg_.get());
1791 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1792 sentry.completedSuccessfully();
1799 SendSourceTerminationSignalIfException sentry(actReg_.get());
1800 espController_->eventSetupForInstance(ts);
1801 sentry.completedSuccessfully();
1805 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1807 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1808 if(hasSubProcesses()) {
1809 for(
auto& subProcess : *subProcesses_) {
1810 subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1815 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1821 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1822 if(hasSubProcesses()) {
1823 for(
auto& subProcess : *subProcesses_) {
1824 subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1828 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1830 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1835 if (principalCache_.hasRunPrincipal()) {
1837 <<
"EventProcessor::readRun\n"
1838 <<
"Illegal attempt to insert run into cache\n"
1839 <<
"Contact a Framework Developer\n";
1841 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
preg(), *processConfiguration_, historyAppender_.get(), 0);
1843 SendSourceTerminationSignalIfException sentry(actReg_.get());
1844 input_->readRun(*rp, *historyAppender_);
1845 sentry.completedSuccessfully();
1847 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1848 principalCache_.insert(rp);
1853 principalCache_.merge(input_->runAuxiliary(),
preg());
1854 auto runPrincipal =principalCache_.runPrincipalPtr();
1856 SendSourceTerminationSignalIfException sentry(actReg_.get());
1857 input_->readAndMergeRun(*runPrincipal);
1858 sentry.completedSuccessfully();
1860 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1861 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1864 int EventProcessor::readLuminosityBlock() {
1865 if (principalCache_.hasLumiPrincipal()) {
1867 <<
"EventProcessor::readRun\n"
1868 <<
"Illegal attempt to insert lumi into cache\n"
1869 <<
"Contact a Framework Developer\n";
1871 if (!principalCache_.hasRunPrincipal()) {
1873 <<
"EventProcessor::readRun\n"
1874 <<
"Illegal attempt to insert lumi into cache\n"
1875 <<
"Run is invalid\n"
1876 <<
"Contact a Framework Developer\n";
1878 auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(),
preg(), *processConfiguration_, historyAppender_.get(), 0);
1880 SendSourceTerminationSignalIfException sentry(actReg_.get());
1881 input_->readLuminosityBlock(*lbp, *historyAppender_);
1882 sentry.completedSuccessfully();
1884 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1885 principalCache_.insert(lbp);
1886 return input_->luminosityBlock();
1889 int EventProcessor::readAndMergeLumi() {
1890 principalCache_.merge(input_->luminosityBlockAuxiliary(),
preg());
1892 SendSourceTerminationSignalIfException sentry(actReg_.get());
1893 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1894 sentry.completedSuccessfully();
1896 return input_->luminosityBlock();
1901 if(hasSubProcesses()) {
1902 for(
auto& subProcess : *subProcesses_) {
1911 if(hasSubProcesses()) {
1912 for(
auto& subProcess : *subProcesses_) {
1920 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1921 if(hasSubProcesses()) {
1922 for(
auto& subProcess : *subProcesses_) {
1923 subProcess.writeLumi(phid, run, lumi);
1926 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1930 principalCache_.deleteLumi(phid, run, lumi);
1931 if(hasSubProcesses()) {
1932 for(
auto& subProcess : *subProcesses_) {
1933 subProcess.deleteLumiFromCache(phid, run, lumi);
1936 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1942 unsigned int iStreamIndex,
1943 std::atomic<bool>* iFinishedProcessingEvents,
1944 tbb::task* iWaitTask):
1946 m_streamID(iStreamIndex),
1947 m_finishedProcessingEvents(iFinishedProcessingEvents),
1948 m_waitTask(iWaitTask){}
1951 m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1952 m_waitTask->decrement_ref_count();
1962 void EventProcessor::processEventsForStreamAsync(
unsigned int iStreamIndex,
1963 std::atomic<bool>* finishedProcessingEvents) {
1967 if(preallocations_.numberOfStreams()>1) {
1969 handler->initializeThisThreadForUse();
1972 if(iStreamIndex==0) {
1976 if(shouldWeStop()) {
1979 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1989 std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1991 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1997 auto sr = input_->resourceSharedWithDelayedReader();
1998 std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
2000 delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
2003 if (InputSource::IsEvent !=itemType) {
2004 nextItemTypeFromProcessingEvents_ = itemType;
2005 finishedProcessingEvents->store(
true,std::memory_order_release);
2009 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
2011 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
2014 readEvent(iStreamIndex);
2017 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
2020 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
2027 bool expected =
false;
2028 if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,
true)) {
2029 deferredExceptionPtr_ = std::current_exception();
2035 void EventProcessor::readAndProcessEvent() {
2036 if(numberOfForkedChildren_>0) {
2041 nextItemTypeFromProcessingEvents_ = InputSource::IsEvent;
2042 asyncStopRequestedWhileProcessingEvents_ =
false;
2044 std::atomic<bool> finishedProcessingEvents{
false};
2050 tbb::task* eventLoopWaitTask{
new (tbb::task::allocate_root()) tbb::empty_task{}};
2051 eventLoopWaitTask->increment_ref_count();
2053 const unsigned int kNumStreams = preallocations_.numberOfStreams();
2054 unsigned int iStreamIndex = 0;
2055 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2056 eventLoopWaitTask->increment_ref_count();
2057 tbb::task::enqueue( *(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
2060 eventLoopWaitTask->increment_ref_count();
2061 eventLoopWaitTask->spawn_and_wait_for_all(*(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
2065 if(deferredExceptionPtrIsSet_) {
2066 std::rethrow_exception(deferredExceptionPtr_);
2069 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
2071 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
2074 SendSourceTerminationSignalIfException sentry(actReg_.get());
2075 input_->readEvent(
event, streamContext);
2076 sentry.completedSuccessfully();
2078 FDEBUG(1) <<
"\treadEvent\n";
2081 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2082 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2086 rng->postEventRead(ev);
2088 assert(pep->luminosityBlockPrincipalPtrValid());
2089 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2090 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2098 schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2099 if(hasSubProcesses()) {
2100 for(
auto& subProcess : *subProcesses_) {
2101 subProcess.doEvent(*pep);
2108 bool randomAccess = input_->randomAccess();
2116 StreamContext streamContext(pep->streamID(), &processContext_);
2117 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2122 input_->skipEvents(-2);
2130 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2134 FDEBUG(1) <<
"\tprocessEvent\n";
2135 pep->clearEventPrincipal();
2138 bool EventProcessor::shouldWeStop()
const {
2139 FDEBUG(1) <<
"\tshouldWeStop\n";
2140 if(shouldWeStop_)
return true;
2141 if(hasSubProcesses()) {
2142 for(
auto const& subProcess : *subProcesses_) {
2143 if(subProcess.terminate()) {
2149 return schedule_->terminate();
2153 exceptionMessageFiles_ =
message;
2157 exceptionMessageRuns_ =
message;
2161 exceptionMessageLumis_ =
message;
2164 bool EventProcessor::alreadyHandlingException()
const {
2165 return alreadyHandlingException_;
2168 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2169 if(iMachine.get() != 0) {
2170 if(!iMachine->terminated()) {
2171 forceLooperToEnd_ =
true;
2173 forceLooperToEnd_ =
false;
2176 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2178 if(iMachine->terminated()) {
2179 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< tbb::task * > m_waitTask
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< EventProcessor * > m_proc
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ParameterSetID id() const
dispatcher processEvent(e, inputTag, standby)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
std::unique_ptr< std::vector< ParameterSet > > popSubProcessVParameterSet(ParameterSet ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< std::atomic< bool > * > m_finishedProcessingEvents
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
std::auto_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
bool continueAfterChildFailure_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::shared_ptr< edm::ParameterSet > parameterSet()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasSubProcesses() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
edm::ProcessHistoryID const & processHistoryID() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_