64 #include "boost/thread/xtime.hpp"
78 #include <sys/types.h>
80 #include <sys/socket.h>
81 #include <sys/select.h>
82 #include <sys/fcntl.h>
96 class SendSourceTerminationSignalIfException {
100 ~SendSourceTerminationSignalIfException() {
105 void completedSuccessfully() {
117 std::unique_ptr<InputSource>
120 std::shared_ptr<ProductRegistry>
preg,
121 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
122 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
123 std::shared_ptr<ActivityRegistry>
areg,
127 if(main_input == 0) {
129 <<
"There must be exactly one source in the configuration.\n"
130 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
135 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
138 filler->fill(descriptions);
142 descriptions.validate(*main_input,
std::string(
"source"));
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 areg->preSourceConstructionSignal_(md);
171 std::unique_ptr<InputSource>
input;
174 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
177 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
178 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
182 std::ostringstream ost;
183 ost <<
"Constructing input source of type " << modtype;
191 std::shared_ptr<EDLooperBase>
195 std::shared_ptr<EDLooperBase> vLooper;
197 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
199 if(loopers.size() == 0) {
203 assert(1 == loopers.size());
205 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
222 std::vector<std::string>
const& defaultServices,
223 std::vector<std::string>
const& forcedServices) :
226 branchIDListHelper_(),
229 espController_(new eventsetup::EventSetupsController),
232 processConfiguration_(),
238 deferredExceptionPtrIsSet_(
false),
240 beginJobCalled_(
false),
241 shouldWeStop_(
false),
242 stateMachineWasInErrorState_(
false),
245 exceptionMessageFiles_(),
246 exceptionMessageRuns_(),
247 exceptionMessageLumis_(),
248 alreadyHandlingException_(
false),
249 forceLooperToEnd_(
false),
250 looperBeginJobRun_(
false),
251 forceESCacheClearOnNewRun_(
false),
252 numberOfForkedChildren_(0),
253 numberOfSequentialEventsPerChild_(1),
254 setCpuAffinity_(
false),
255 eventSetupDataToExcludeFromPrefetching_() {
257 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
258 processDesc->addServices(defaultServices, forcedServices);
259 init(processDesc, iToken, iLegacy);
263 std::vector<std::string>
const& defaultServices,
264 std::vector<std::string>
const& forcedServices) :
267 branchIDListHelper_(),
270 espController_(new eventsetup::EventSetupsController),
273 processConfiguration_(),
279 deferredExceptionPtrIsSet_(
false),
281 beginJobCalled_(
false),
282 shouldWeStop_(
false),
283 stateMachineWasInErrorState_(
false),
286 exceptionMessageFiles_(),
287 exceptionMessageRuns_(),
288 exceptionMessageLumis_(),
289 alreadyHandlingException_(
false),
290 forceLooperToEnd_(
false),
291 looperBeginJobRun_(
false),
292 forceESCacheClearOnNewRun_(
false),
293 numberOfForkedChildren_(0),
294 numberOfSequentialEventsPerChild_(1),
295 setCpuAffinity_(
false),
296 asyncStopRequestedWhileProcessingEvents_(
false),
297 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
298 eventSetupDataToExcludeFromPrefetching_()
301 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
302 processDesc->addServices(defaultServices, forcedServices);
311 branchIDListHelper_(),
314 espController_(new eventsetup::EventSetupsController),
317 processConfiguration_(),
323 deferredExceptionPtrIsSet_(
false),
325 beginJobCalled_(
false),
326 shouldWeStop_(
false),
327 stateMachineWasInErrorState_(
false),
330 exceptionMessageFiles_(),
331 exceptionMessageRuns_(),
332 exceptionMessageLumis_(),
333 alreadyHandlingException_(
false),
334 forceLooperToEnd_(
false),
335 looperBeginJobRun_(
false),
336 forceESCacheClearOnNewRun_(
false),
337 numberOfForkedChildren_(0),
338 numberOfSequentialEventsPerChild_(1),
339 setCpuAffinity_(
false),
340 asyncStopRequestedWhileProcessingEvents_(
false),
341 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
342 eventSetupDataToExcludeFromPrefetching_()
344 init(processDesc, token, legacy);
351 branchIDListHelper_(),
354 espController_(new eventsetup::EventSetupsController),
357 processConfiguration_(),
363 deferredExceptionPtrIsSet_(
false),
365 beginJobCalled_(
false),
366 shouldWeStop_(
false),
367 stateMachineWasInErrorState_(
false),
370 exceptionMessageFiles_(),
371 exceptionMessageRuns_(),
372 exceptionMessageLumis_(),
373 alreadyHandlingException_(
false),
374 forceLooperToEnd_(
false),
375 looperBeginJobRun_(
false),
376 forceESCacheClearOnNewRun_(
false),
377 numberOfForkedChildren_(0),
378 numberOfSequentialEventsPerChild_(1),
379 setCpuAffinity_(
false),
380 asyncStopRequestedWhileProcessingEvents_(
false),
381 nextItemTypeFromProcessingEvents_(
InputSource::IsEvent),
382 eventSetupDataToExcludeFromPrefetching_()
386 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
390 auto processDesc = std::make_shared<ProcessDesc>(
config);
408 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
412 bool const hasSubProcesses = !subProcessVParameterSet.empty();
420 unsigned int nThreads=1;
421 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
422 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
430 unsigned int nStreams =1;
431 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
432 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
437 edm::LogInfo(
"StreamSetup") <<
"setting # streams "<<nStreams;
442 unsigned int nConcurrentRuns =1;
448 unsigned int nConcurrentLumis =1;
474 std::vector<ParameterSet>
const& excluded = forking.
getUntrackedParameterSetVector(
"eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
475 for(
auto const& ps : excluded) {
477 ps.getUntrackedParameter<
std::string>(
"label",
""));
487 auto& serviceSets = processDesc->getServicesPSets();
496 handler->willBeUsingThreads();
500 std::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
542 FDEBUG(2) << parameterSet << std::endl;
554 for(
auto& subProcessPSet : subProcessVParameterSet) {
601 actReg_->preallocateSignal_(bounds);
628 ex.
addContext(
"Calling beginJob for the source");
634 actReg_->postBeginJobSignal_();
645 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
654 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
658 c.
call([actReg](){actReg->preEndJobSignal_();});
667 c.
call([actReg](){actReg->postEndJobSignal_();});
682 volatile bool child_failed =
false;
683 volatile unsigned int num_children_done = 0;
684 volatile int child_fail_exit_status = 0;
685 volatile int child_fail_signal = 0;
691 void ep_sigchld(
int, siginfo_t*,
void*) {
695 pid_t
p = waitpid(-1, &stat_loc, WNOHANG);
698 if(WIFEXITED(stat_loc)) {
700 if(0 != WEXITSTATUS(stat_loc)) {
701 child_fail_exit_status = WEXITSTATUS(stat_loc);
705 if(WIFSIGNALED(stat_loc)) {
707 child_fail_signal = WTERMSIG(stat_loc);
710 p = waitpid(-1, &stat_loc, WNOHANG);
725 unsigned int numberOfDigitsInChildIndex(
unsigned int numberOfChildren) {
727 while(numberOfChildren != 0) {
729 numberOfChildren /= 10;
739 class MessageSenderToSource {
741 MessageSenderToSource(std::vector<int>
const& childrenSockets, std::vector<int>
const& childrenPipes,
long iNEventsToProcess);
745 const std::vector<int>& m_childrenPipes;
746 long const m_nEventsToProcess;
748 unsigned int m_aliveChildren;
752 MessageSenderToSource::MessageSenderToSource(std::vector<int>
const& childrenSockets,
753 std::vector<int>
const& childrenPipes,
754 long iNEventsToProcess):
755 m_childrenPipes(childrenPipes),
756 m_nEventsToProcess(iNEventsToProcess),
757 m_aliveChildren(childrenSockets.
size()),
760 FD_ZERO(&m_socketSet);
761 for (
auto const socket : childrenSockets) {
762 FD_SET(socket, &m_socketSet);
763 if (socket > m_maxFd) {
767 for (
auto const pipe : childrenPipes) {
768 FD_SET(
pipe, &m_socketSet);
769 if (
pipe > m_maxFd) {
793 MessageSenderToSource::operator()() {
794 multicore::MessageForParent childMsg;
795 LogInfo(
"ForkingController") <<
"I am controller";
798 multicore::MessageForSource sndmsg;
799 sndmsg.startIndex = 0;
800 sndmsg.nIndices = m_nEventsToProcess;
803 fd_set readSockets, errorSockets;
805 memcpy(&readSockets, &m_socketSet,
sizeof(m_socketSet));
806 memcpy(&errorSockets, &m_socketSet,
sizeof(m_socketSet));
809 while (((rc =
select(m_maxFd, &readSockets,
NULL, &errorSockets,
NULL)) < 0) && (errno == EINTR)) {}
811 std::cerr <<
"select failed; should be impossible due to preconditions.\n";
817 for (
int idx=0; idx<m_maxFd; idx++) {
820 if (FD_ISSET(idx, &errorSockets)) {
821 LogInfo(
"ForkingController") <<
"Error on socket " << idx;
822 FD_CLR(idx, &m_socketSet);
825 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
833 if (!FD_ISSET(idx, &readSockets)) {
839 bool is_pipe =
false;
840 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
844 while (((rc =
read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
847 FD_CLR(idx, &m_socketSet);
855 while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
857 FD_CLR(idx, &m_socketSet);
866 while (((rc = send(idx, (
char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
868 FD_CLR(idx, &m_socketSet);
873 sndmsg.startIndex += sndmsg.nIndices;
877 }
while (m_aliveChildren > 0);
885 void EventProcessor::possiblyContinueAfterForkChildFailure() {
886 if(child_failed && continueAfterChildFailure_) {
887 if (child_fail_signal) {
888 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
890 }
else if (child_fail_exit_status) {
891 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
892 child_fail_exit_status=0;
894 LogSystem(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
903 if(0 == numberOfForkedChildren_) {
return true;}
904 assert(0<numberOfForkedChildren_);
912 itemType = input_->nextItemType();
914 assert(itemType == InputSource::IsFile);
918 itemType = input_->nextItemType();
919 assert(itemType == InputSource::IsRun);
921 LogSystem(
"ForkingEventSetupPreFetching") <<
" prefetching for run " << input_->runAuxiliary()->run();
923 input_->runAuxiliary()->beginTime());
924 espController_->eventSetupForInstance(ts);
928 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
930 std::vector<eventsetup::DataKey> dataKeys;
931 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
936 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.
find(itKey->type().name());
938 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
939 excludedData = &(itExcludeRec->second);
940 if(excludedData->size() == 0 || excludedData->begin()->first ==
"*") {
948 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
949 itDataKey != itDataKeyEnd;
952 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
953 LogInfo(
"ForkingEventSetupPreFetching") <<
" excluding:" << itDataKey->type().name() <<
" " << itDataKey->name().value() << std::endl;
957 recordPtr->
doGet(*itDataKey);
965 LogSystem(
"ForkingEventSetupPreFetching") <<
" done prefetching";
970 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
973 actReg_->preForkReleaseResourcesSignal_();
974 input_->doPreForkReleaseResources();
975 schedule_->preForkReleaseResources();
980 unsigned int childIndex = 0;
981 unsigned int const kMaxChildren = numberOfForkedChildren_;
982 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
983 std::vector<pid_t> childrenIds;
984 childrenIds.reserve(kMaxChildren);
985 std::vector<int> childrenSockets;
986 childrenSockets.reserve(kMaxChildren);
987 std::vector<int> childrenPipes;
988 childrenPipes.reserve(kMaxChildren);
989 std::vector<int> childrenSocketsCopy;
990 childrenSocketsCopy.reserve(kMaxChildren);
991 std::vector<int> childrenPipesCopy;
992 childrenPipesCopy.reserve(kMaxChildren);
999 int sockets[2], fd_flags;
1000 for(; childIndex < kMaxChildren; ++childIndex) {
1002 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1003 printf(
"Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1007 printf(
"Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1011 if ((fd_flags = fcntl(sockets[1], F_GETFD,
NULL)) == -1) {
1012 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1017 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC |
O_NONBLOCK) == -1) {
1018 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1021 if ((fd_flags = fcntl(pipes[1], F_GETFD,
NULL)) == -1) {
1022 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1025 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1026 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1031 if ((fd_flags = fcntl(pipes[0], F_GETFD,
NULL)) == -1) {
1032 printf(
"Failed to get fd flags: %d %s\n", errno, strerror(errno));
1035 if (fcntl(pipes[0], F_SETFD, fd_flags |
O_NONBLOCK) == -1) {
1036 printf(
"Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1040 childrenPipesCopy = childrenPipes;
1041 childrenSocketsCopy = childrenSockets;
1043 pid_t
value = fork();
1049 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1052 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1059 std::stringstream stout;
1060 stout <<
"redirectout_" << getpgrp() <<
"_" << std::setw(numberOfDigitsInIndex) << std::setfill(
'0') << childIndex <<
".log";
1061 if(0 == freopen(stout.str().c_str(),
"w", stdout)) {
1062 LogError(
"ForkingStdOutRedirect") <<
"Error during freopen of child process "<< childIndex;
1064 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1065 LogError(
"ForkingStdOutRedirect") <<
"Error during dup2 of child process"<< childIndex;
1068 LogInfo(
"ForkingChild") <<
"I am child " << childIndex <<
" with pgid " << getpgrp();
1069 if(setCpuAffinity_) {
1077 LogInfo(
"ForkingChildAffinity") <<
"Architecture support for CPU affinity not implemented.";
1079 LogInfo(
"ForkingChildAffinity") <<
"Setting CPU affinity, setting this child to cpu " << childIndex;
1082 CPU_SET(childIndex, &mask);
1083 if(sched_setaffinity(0,
sizeof(mask), &mask) != 0) {
1084 LogError(
"ForkingChildAffinity") <<
"Failed to set the cpu affinity, errno " << errno;
1096 LogError(
"ForkingChild") <<
"failed to create a child";
1099 childrenIds.push_back(value);
1100 childrenSockets.push_back(sockets[0]);
1101 childrenPipes.push_back(pipes[0]);
1104 if(childIndex < kMaxChildren) {
1105 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1106 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1108 auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1109 input_->doPostForkReacquireResources(receiver);
1110 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1115 jobReport->parentAfterFork(jobReportFile);
1125 sigset_t blockingSigSet;
1126 sigset_t unblockingSigSet;
1128 pthread_sigmask(SIG_SETMASK,
NULL, &unblockingSigSet);
1129 pthread_sigmask(SIG_SETMASK,
NULL, &blockingSigSet);
1130 sigaddset(&blockingSigSet, SIGCHLD);
1131 sigaddset(&blockingSigSet, SIGUSR2);
1132 sigaddset(&blockingSigSet, SIGINT);
1133 sigdelset(&unblockingSigSet, SIGCHLD);
1134 sigdelset(&unblockingSigSet, SIGUSR2);
1135 sigdelset(&unblockingSigSet, SIGINT);
1136 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1140 bool too_many_fds =
false;
1141 if (pipes[1]+1 > FD_SETSIZE) {
1142 LogError(
"ForkingFileDescriptors") <<
"too many file descriptors for multicore job";
1143 too_many_fds =
true;
1149 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1150 boost::thread senderThread(sender);
1152 if(not too_many_fds) {
1155 possiblyContinueAfterForkChildFailure();
1156 while(!
shutdown_flag && (!child_failed
or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1157 sigsuspend(&unblockingSigSet);
1158 possiblyContinueAfterForkChildFailure();
1159 LogInfo(
"ForkingAwake") <<
"woke from sigwait" << std::endl;
1162 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1164 LogInfo(
"ForkingStopping") <<
"num children who have already stopped " << num_children_done;
1166 LogError(
"ForkingStopping") <<
"child failed";
1169 LogSystem(
"ForkingStopping") <<
"asked to shutdown";
1172 if(too_many_fds ||
shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1173 LogInfo(
"ForkingStopping") <<
"must stop children" << std::endl;
1174 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1175 it != itEnd; ++it) {
1178 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1179 while(num_children_done != kMaxChildren) {
1180 sigsuspend(&unblockingSigSet);
1182 pthread_sigmask(SIG_SETMASK, &oldSigSet,
NULL);
1185 senderThread.join();
1186 if(child_failed && !continueAfterChildFailure_) {
1187 if (child_fail_signal) {
1188 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with signal " << child_fail_signal;
1189 }
else if (child_fail_exit_status) {
1190 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally with exit code " << child_fail_exit_status;
1192 throw cms::Exception(
"ForkedChildFailed") <<
"child process ended abnormally for unknown reason";
1196 throw cms::Exception(
"ForkedParentFailed") <<
"hit select limit for number of fds";
1201 std::vector<ModuleDescription const*>
1202 EventProcessor::getAllModuleDescriptions()
const {
1203 return schedule_->getAllModuleDescriptions();
1207 EventProcessor::totalEvents()
const {
1208 return schedule_->totalEvents();
1212 EventProcessor::totalEventsPassed()
const {
1213 return schedule_->totalEventsPassed();
1217 EventProcessor::totalEventsFailed()
const {
1218 return schedule_->totalEventsFailed();
1222 EventProcessor::enableEndPaths(
bool active) {
1223 schedule_->enableEndPaths(active);
1227 EventProcessor::endPathsEnabled()
const {
1228 return schedule_->endPathsEnabled();
1233 schedule_->getTriggerReport(rep);
1237 EventProcessor::clearCounters() {
1238 schedule_->clearCounters();
1242 std::unique_ptr<statemachine::Machine>
1243 EventProcessor::createStateMachine() {
1250 << fileMode_ <<
".\n"
1251 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1261 << emptyRunLumiMode_ <<
".\n"
1262 <<
"Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1265 auto machine = std::make_unique<statemachine::Machine>(
1270 machine->initiate();
1276 bool returnValue =
false;
1281 returnCode = epSignal;
1288 EventProcessor::runToCompletion() {
1291 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1292 std::unique_ptr<statemachine::Machine> machine;
1297 stateMachineWasInErrorState_ =
false;
1302 machine = createStateMachine();
1303 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1304 asyncStopRequestedWhileProcessingEvents_=
false;
1313 if(numberOfForkedChildren_ > 0) {
1314 size_t size = preg_->size();
1316 SendSourceTerminationSignalIfException sentry(actReg_.get());
1317 more = input_->skipForForking();
1318 sentry.completedSuccessfully();
1321 if(size < preg_->
size()) {
1322 principalCache_.adjustIndexesAfterProductRegistryAddition();
1324 principalCache_.adjustEventsToNewProductRegistry(
preg());
1328 SendSourceTerminationSignalIfException sentry(actReg_.get());
1329 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1330 sentry.completedSuccessfully();
1333 FDEBUG(1) <<
"itemType = " << itemType <<
"\n";
1335 if(checkForAsyncStopRequest(returnCode)) {
1336 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1337 forceLooperToEnd_ =
true;
1339 forceLooperToEnd_ =
false;
1343 if(itemType == InputSource::IsEvent) {
1345 if(asyncStopRequestedWhileProcessingEvents_) {
1346 forceLooperToEnd_ =
true;
1348 forceLooperToEnd_ =
false;
1349 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1352 itemType = nextItemTypeFromProcessingEvents_;
1355 if(itemType == InputSource::IsEvent) {
1357 else if(itemType == InputSource::IsStop) {
1360 else if(itemType == InputSource::IsFile) {
1363 else if(itemType == InputSource::IsRun) {
1364 machine->process_event(
statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1366 else if(itemType == InputSource::IsLumi) {
1369 else if(itemType == InputSource::IsSynchronize) {
1375 <<
"Unknown next item type passed to EventProcessor\n"
1376 <<
"Please report this error to the Framework group\n";
1378 if(machine->terminated()) {
1430 alreadyHandlingException_ =
true;
1432 alreadyHandlingException_ =
false;
1433 if (!exceptionMessageLumis_.empty()) {
1436 LogAbsolute(
"Additional Exceptions") << exceptionMessageLumis_;
1439 if (!exceptionMessageRuns_.empty()) {
1442 LogAbsolute(
"Additional Exceptions") << exceptionMessageRuns_;
1445 if (!exceptionMessageFiles_.empty()) {
1448 LogAbsolute(
"Additional Exceptions") << exceptionMessageFiles_;
1454 if(machine->terminated()) {
1455 FDEBUG(1) <<
"The state machine reports it has been terminated\n";
1459 if(stateMachineWasInErrorState_) {
1461 <<
"The boost state machine in the EventProcessor exited after\n"
1462 <<
"entering the Error state.\n";
1466 if(machine.get() !=
nullptr) {
1469 <<
"State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1470 <<
"Please report this error to the Framework group\n";
1476 void EventProcessor::readFile() {
1477 FDEBUG(1) <<
" \treadFile\n";
1478 size_t size = preg_->size();
1479 SendSourceTerminationSignalIfException sentry(actReg_.get());
1481 fb_ = input_->readFile();
1482 if(size < preg_->
size()) {
1483 principalCache_.adjustIndexesAfterProductRegistryAddition();
1485 principalCache_.adjustEventsToNewProductRegistry(
preg());
1486 if((numberOfForkedChildren_ > 0)
or
1487 (preallocations_.numberOfStreams()>1 and
1488 preallocations_.numberOfThreads()>1)) {
1489 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1491 sentry.completedSuccessfully();
1494 void EventProcessor::closeInputFile(
bool cleaningUpAfterException) {
1495 if (fb_.get() !=
nullptr) {
1496 SendSourceTerminationSignalIfException sentry(actReg_.get());
1497 input_->closeFile(fb_.get(), cleaningUpAfterException);
1498 sentry.completedSuccessfully();
1500 FDEBUG(1) <<
"\tcloseInputFile\n";
1503 void EventProcessor::openOutputFiles() {
1504 if (fb_.get() !=
nullptr) {
1505 schedule_->openOutputFiles(*fb_);
1506 for_all(subProcesses_, [
this](
auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1508 FDEBUG(1) <<
"\topenOutputFiles\n";
1511 void EventProcessor::closeOutputFiles() {
1512 if (fb_.get() !=
nullptr) {
1513 schedule_->closeOutputFiles();
1514 for_all(subProcesses_, [](
auto& subProcess){ subProcess.closeOutputFiles(); });
1516 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1519 void EventProcessor::respondToOpenInputFile() {
1520 for_all(subProcesses_, [
this](
auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1521 if (fb_.get() !=
nullptr) {
1522 schedule_->respondToOpenInputFile(*fb_);
1523 for_all(subProcesses_, [
this](
auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1525 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1528 void EventProcessor::respondToCloseInputFile() {
1529 if (fb_.get() !=
nullptr) {
1530 schedule_->respondToCloseInputFile(*fb_);
1531 for_all(subProcesses_, [
this](
auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1533 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1536 void EventProcessor::startingNewLoop() {
1537 shouldWeStop_ =
false;
1540 if(looper_ && looperBeginJobRun_) {
1541 looper_->doStartingNewLoop();
1543 FDEBUG(1) <<
"\tstartingNewLoop\n";
1546 bool EventProcessor::endOfLoop() {
1549 looper_->setModuleChanger(&changer);
1551 looper_->setModuleChanger(
nullptr);
1552 if(status != EDLooperBase::kContinue || forceLooperToEnd_)
return true;
1555 FDEBUG(1) <<
"\tendOfLoop\n";
1559 void EventProcessor::rewindInput() {
1562 FDEBUG(1) <<
"\trewind\n";
1565 void EventProcessor::prepareForNextLoop() {
1566 looper_->prepareForNextLoop(esp_.get());
1567 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1570 bool EventProcessor::shouldWeCloseOutput()
const {
1571 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1572 if(!subProcesses_.empty()) {
1573 for(
auto const& subProcess : subProcesses_) {
1574 if(subProcess.shouldWeCloseOutput()) {
1580 return schedule_->shouldWeCloseOutput();
1583 void EventProcessor::doErrorStuff() {
1584 FDEBUG(1) <<
"\tdoErrorStuff\n";
1586 <<
"The EventProcessor state machine encountered an unexpected event\n"
1587 <<
"and went to the error state\n"
1588 <<
"Will attempt to terminate processing normally\n"
1589 <<
"(IF using the looper the next loop will be attempted)\n"
1590 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1591 stateMachineWasInErrorState_ =
true;
1597 SendSourceTerminationSignalIfException sentry(actReg_.get());
1599 input_->doBeginRun(runPrincipal, &processContext_);
1600 sentry.completedSuccessfully();
1605 if(forceESCacheClearOnNewRun_){
1606 espController_->forceCacheClear();
1609 SendSourceTerminationSignalIfException sentry(actReg_.get());
1610 espController_->eventSetupForInstance(ts);
1611 sentry.completedSuccessfully();
1614 if(looper_ && looperBeginJobRun_==
false) {
1616 looper_->beginOfJob(es);
1617 looperBeginJobRun_ =
true;
1618 looper_->doStartingNewLoop();
1622 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1623 for_all(subProcesses_, [&runPrincipal, &ts](
auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1627 looper_->doBeginRun(runPrincipal, es, &processContext_);
1631 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1632 schedule_->processOneStream<Traits>(
i,runPrincipal, es);
1633 for_all(subProcesses_, [i, &runPrincipal, &ts](
auto& subProcess){ subProcess.doStreamBeginRun(i, runPrincipal, ts); });
1645 SendSourceTerminationSignalIfException sentry(actReg_.get());
1647 runPrincipal.
setEndTime(input_->timestamp());
1649 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1650 sentry.completedSuccessfully();
1653 IOVSyncValue ts(
EventID(runPrincipal.
run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1656 SendSourceTerminationSignalIfException sentry(actReg_.get());
1657 espController_->eventSetupForInstance(ts);
1658 sentry.completedSuccessfully();
1662 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1664 schedule_->processOneStream<Traits>(
i,runPrincipal, es, cleaningUpAfterException);
1665 for_all(subProcesses_, [i, &runPrincipal, &ts, cleaningUpAfterException](
auto& subProcess) { subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1675 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1676 for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1680 looper_->doEndRun(runPrincipal, es, &processContext_);
1687 SendSourceTerminationSignalIfException sentry(actReg_.get());
1689 input_->doBeginLumi(lumiPrincipal, &processContext_);
1690 sentry.completedSuccessfully();
1696 rng->preBeginLumi(lb);
1703 SendSourceTerminationSignalIfException sentry(actReg_.get());
1704 espController_->eventSetupForInstance(ts);
1705 sentry.completedSuccessfully();
1710 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1711 for_all(subProcesses_, [&lumiPrincipal, &ts](
auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1713 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1715 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1718 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1720 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es);
1721 for_all(subProcesses_, [i, &lumiPrincipal, &ts](
auto& subProcess){ subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts); });
1724 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1733 SendSourceTerminationSignalIfException sentry(actReg_.get());
1735 lumiPrincipal.
setEndTime(input_->timestamp());
1737 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1738 sentry.completedSuccessfully();
1745 SendSourceTerminationSignalIfException sentry(actReg_.get());
1746 espController_->eventSetupForInstance(ts);
1747 sentry.completedSuccessfully();
1751 for(
unsigned int i=0;
i<preallocations_.numberOfStreams();++
i) {
1753 schedule_->processOneStream<Traits>(
i,lumiPrincipal, es, cleaningUpAfterException);
1754 for_all(subProcesses_, [i, &lumiPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){ subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException); });
1757 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1763 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1764 for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](
auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1766 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1768 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1773 if (principalCache_.hasRunPrincipal()) {
1775 <<
"EventProcessor::readRun\n"
1776 <<
"Illegal attempt to insert run into cache\n"
1777 <<
"Contact a Framework Developer\n";
1779 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
preg(), *processConfiguration_, historyAppender_.get(), 0);
1781 SendSourceTerminationSignalIfException sentry(actReg_.get());
1782 input_->readRun(*rp, *historyAppender_);
1783 sentry.completedSuccessfully();
1785 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1786 principalCache_.insert(rp);
1791 principalCache_.merge(input_->runAuxiliary(),
preg());
1792 auto runPrincipal =principalCache_.runPrincipalPtr();
1794 SendSourceTerminationSignalIfException sentry(actReg_.get());
1795 input_->readAndMergeRun(*runPrincipal);
1796 sentry.completedSuccessfully();
1798 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1799 return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1802 int EventProcessor::readLuminosityBlock() {
1803 if (principalCache_.hasLumiPrincipal()) {
1805 <<
"EventProcessor::readRun\n"
1806 <<
"Illegal attempt to insert lumi into cache\n"
1807 <<
"Contact a Framework Developer\n";
1809 if (!principalCache_.hasRunPrincipal()) {
1811 <<
"EventProcessor::readRun\n"
1812 <<
"Illegal attempt to insert lumi into cache\n"
1813 <<
"Run is invalid\n"
1814 <<
"Contact a Framework Developer\n";
1816 auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(),
preg(), *processConfiguration_, historyAppender_.get(), 0);
1818 SendSourceTerminationSignalIfException sentry(actReg_.get());
1819 input_->readLuminosityBlock(*lbp, *historyAppender_);
1820 sentry.completedSuccessfully();
1822 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1823 principalCache_.insert(lbp);
1824 return input_->luminosityBlock();
1827 int EventProcessor::readAndMergeLumi() {
1828 principalCache_.merge(input_->luminosityBlockAuxiliary(),
preg());
1830 SendSourceTerminationSignalIfException sentry(actReg_.get());
1831 input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1832 sentry.completedSuccessfully();
1834 return input_->luminosityBlock();
1850 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1851 for_all(subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1852 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1856 principalCache_.deleteLumi(phid, run, lumi);
1857 for_all(subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1858 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1864 unsigned int iStreamIndex,
1865 std::atomic<bool>* iFinishedProcessingEvents,
1866 tbb::task* iWaitTask):
1868 m_streamID(iStreamIndex),
1869 m_finishedProcessingEvents(iFinishedProcessingEvents),
1870 m_waitTask(iWaitTask){}
1873 m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1874 m_waitTask->decrement_ref_count();
1884 void EventProcessor::processEventsForStreamAsync(
unsigned int iStreamIndex,
1885 std::atomic<bool>* finishedProcessingEvents) {
1889 if(preallocations_.numberOfStreams()>1) {
1891 handler->initializeThisThreadForUse();
1894 if(iStreamIndex==0) {
1898 if(shouldWeStop()) {
1901 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1911 std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1913 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1919 auto sr = input_->resourceSharedWithDelayedReader().second;
1920 std::unique_lock<std::recursive_mutex> delayedReaderGuard;
1922 delayedReaderGuard = std::unique_lock<std::recursive_mutex>(*sr);
1925 if (InputSource::IsEvent !=itemType) {
1926 nextItemTypeFromProcessingEvents_ = itemType;
1927 finishedProcessingEvents->store(
true,std::memory_order_release);
1931 if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1933 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1936 readEvent(iStreamIndex);
1939 if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1942 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1949 bool expected =
false;
1950 if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,
true)) {
1951 deferredExceptionPtr_ = std::current_exception();
1957 void EventProcessor::readAndProcessEvent() {
1958 if(numberOfForkedChildren_>0) {
1963 nextItemTypeFromProcessingEvents_ = InputSource::IsEvent;
1964 asyncStopRequestedWhileProcessingEvents_ =
false;
1966 std::atomic<bool> finishedProcessingEvents{
false};
1972 tbb::task* eventLoopWaitTask{
new (tbb::task::allocate_root()) tbb::empty_task{}};
1973 eventLoopWaitTask->increment_ref_count();
1975 const unsigned int kNumStreams = preallocations_.numberOfStreams();
1976 unsigned int iStreamIndex = 0;
1977 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1978 eventLoopWaitTask->increment_ref_count();
1979 tbb::task::enqueue( *(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1982 eventLoopWaitTask->increment_ref_count();
1983 eventLoopWaitTask->spawn_and_wait_for_all(*(
new (tbb::task::allocate_root())
StreamProcessingTask{
this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1987 if(deferredExceptionPtrIsSet_) {
1988 std::rethrow_exception(deferredExceptionPtr_);
1991 void EventProcessor::readEvent(
unsigned int iStreamIndex) {
1993 auto&
event = principalCache_.eventPrincipal(iStreamIndex);
1996 SendSourceTerminationSignalIfException sentry(actReg_.get());
1997 input_->readEvent(
event, streamContext);
1998 sentry.completedSuccessfully();
2000 FDEBUG(1) <<
"\treadEvent\n";
2003 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2004 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2008 rng->postEventRead(ev);
2010 assert(pep->luminosityBlockPrincipalPtrValid());
2011 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2012 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2019 schedule_->processOneEvent(iStreamIndex,*pep, es);
2020 for_all(subProcesses_, [pep](
auto& subProcess) { subProcess.doEvent(*pep); });
2025 bool randomAccess = input_->randomAccess();
2033 StreamContext streamContext(pep->streamID(), &processContext_);
2034 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2039 input_->skipEvents(-2);
2047 if(status != EDLooperBase::kContinue) shouldWeStop_ =
true;
2051 FDEBUG(1) <<
"\tprocessEvent\n";
2052 pep->clearEventPrincipal();
2055 bool EventProcessor::shouldWeStop()
const {
2056 FDEBUG(1) <<
"\tshouldWeStop\n";
2057 if(shouldWeStop_)
return true;
2058 if(!subProcesses_.empty()) {
2059 for(
auto const& subProcess : subProcesses_) {
2060 if(subProcess.terminate()) {
2066 return schedule_->terminate();
2070 exceptionMessageFiles_ =
message;
2074 exceptionMessageRuns_ =
message;
2078 exceptionMessageLumis_ =
message;
2081 bool EventProcessor::alreadyHandlingException()
const {
2082 return alreadyHandlingException_;
2085 void EventProcessor::terminateMachine(std::unique_ptr<statemachine::Machine> iMachine) {
2086 if(iMachine.get() !=
nullptr) {
2087 if(!iMachine->terminated()) {
2088 forceLooperToEnd_ =
true;
2090 forceLooperToEnd_ =
false;
2093 FDEBUG(1) <<
"EventProcess::terminateMachine The state machine was already terminated \n";
2095 if(iMachine->terminated()) {
2096 FDEBUG(1) <<
"The state machine reports it has been terminated (3)\n";
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
virtual char const * what() const
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< tbb::task * > m_waitTask
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
edm::propagate_const< EventProcessor * > m_proc
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::Service< edm::RandomNumberGenerator > rng
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ParameterSetID id() const
dispatcher processEvent(e, inputTag, standby)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Timestamp const & endTime() const
unsigned int numberOfRuns() const
int numberOfForkedChildren_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< std::atomic< bool > * > m_finishedProcessingEvents
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
bool continueAfterChildFailure_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::shared_ptr< edm::ParameterSet > parameterSet()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
void clear()
Not thread safe.
Timestamp const & endTime() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
bool forceESCacheClearOnNewRun_
edm::RunNumber_t runNumber() const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
edm::ProcessHistoryID const & processHistoryID() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_