CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
38 
40 
48 
53 
55 
67 
68 #include "MessageForSource.h"
69 #include "MessageForParent.h"
70 
71 #include "boost/thread/xtime.hpp"
72 #include "boost/range/adaptor/reversed.hpp"
73 
74 #include <exception>
75 #include <iomanip>
76 #include <iostream>
77 #include <utility>
78 #include <sstream>
79 
80 #include <sys/ipc.h>
81 #include <sys/msg.h>
82 
83 #include "tbb/task.h"
84 
85 //Used for forking
86 #include <sys/types.h>
87 #include <sys/wait.h>
88 #include <sys/socket.h>
89 #include <sys/select.h>
90 #include <sys/fcntl.h>
91 #include <unistd.h>
92 
93 
94 //Used for CPU affinity
95 #ifndef __APPLE__
96 #include <sched.h>
97 #endif
98 
99 namespace {
100  //Sentry class to only send a signal if an
101  // exception occurs. An exception is identified
102  // by the destructor being called without first
103  // calling completedSuccessfully().
104  class SendSourceTerminationSignalIfException {
105  public:
106  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
107  reg_(iReg) {}
108  ~SendSourceTerminationSignalIfException() {
109  if(reg_) {
110  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
111  }
112  }
113  void completedSuccessfully() {
114  reg_ = nullptr;
115  }
116  private:
117  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
118  };
119 
120 }
121 
122 namespace edm {
123 
124  // ---------------------------------------------------------------
125  std::unique_ptr<InputSource>
127  CommonParams const& common,
128  std::shared_ptr<ProductRegistry> preg,
129  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
130  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
131  std::shared_ptr<ActivityRegistry> areg,
132  std::shared_ptr<ProcessConfiguration const> processConfiguration,
133  PreallocationConfiguration const& allocations) {
134  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
135  if(main_input == 0) {
137  << "There must be exactly one source in the configuration.\n"
138  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
139  }
140 
141  std::string modtype(main_input->getParameter<std::string>("@module_type"));
142 
143  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
145  ConfigurationDescriptions descriptions(filler->baseType());
146  filler->fill(descriptions);
147 
148  try {
149  convertException::wrap([&]() {
150  descriptions.validate(*main_input, std::string("source"));
151  });
152  }
153  catch (cms::Exception & iException) {
154  std::ostringstream ost;
155  ost << "Validating configuration of input source of type " << modtype;
156  iException.addContext(ost.str());
157  throw;
158  }
159 
160  main_input->registerIt();
161 
162  // Fill in "ModuleDescription", in case the input source produces
163  // any EDProducts, which would be registered in the ProductRegistry.
164  // Also fill in the process history item for this process.
165  // There is no module label for the unnamed input source, so
166  // just use "source".
167  // Only the tracked parameters belong in the process configuration.
168  ModuleDescription md(main_input->id(),
169  main_input->getParameter<std::string>("@module_type"),
170  "source",
171  processConfiguration.get(),
172  ModuleDescription::getUniqueID());
173 
174  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
175  common.maxEventsInput_, common.maxLumisInput_,
176  common.maxSecondsUntilRampdown_, allocations);
177 
178  areg->preSourceConstructionSignal_(md);
179  std::unique_ptr<InputSource> input;
180  try {
181  //even if we have an exception, send the signal
182  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
183  convertException::wrap([&]() {
184  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
185  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
186  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
187  });
188  }
189  catch (cms::Exception& iException) {
190  std::ostringstream ost;
191  ost << "Constructing input source of type " << modtype;
192  iException.addContext(ost.str());
193  throw;
194  }
195  return input;
196  }
197 
198  // ---------------------------------------------------------------
199  std::shared_ptr<EDLooperBase>
202  ParameterSet& params) {
203  std::shared_ptr<EDLooperBase> vLooper;
204 
205  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
206 
207  if(loopers.size() == 0) {
208  return vLooper;
209  }
210 
211  assert(1 == loopers.size());
212 
213  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
214  itName != itNameEnd;
215  ++itName) {
216 
217  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
218  providerPSet->registerIt();
219  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
220  cp,
221  *providerPSet);
222  }
223  return vLooper;
224  }
225 
226  // ---------------------------------------------------------------
227  EventProcessor::EventProcessor(std::string const& config,
228  ServiceToken const& iToken,
230  std::vector<std::string> const& defaultServices,
231  std::vector<std::string> const& forcedServices) :
232  actReg_(),
233  preg_(),
234  branchIDListHelper_(),
235  serviceToken_(),
236  input_(),
237  espController_(new eventsetup::EventSetupsController),
238  esp_(),
239  act_table_(),
240  processConfiguration_(),
241  schedule_(),
242  subProcesses_(),
243  historyAppender_(new HistoryAppender),
244  fb_(),
245  looper_(),
246  deferredExceptionPtrIsSet_(false),
247  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
248  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
249  principalCache_(),
250  beginJobCalled_(false),
251  shouldWeStop_(false),
252  stateMachineWasInErrorState_(false),
253  fileMode_(),
254  emptyRunLumiMode_(),
255  exceptionMessageFiles_(),
256  exceptionMessageRuns_(),
257  exceptionMessageLumis_(),
258  alreadyHandlingException_(false),
259  forceLooperToEnd_(false),
260  looperBeginJobRun_(false),
261  forceESCacheClearOnNewRun_(false),
262  numberOfForkedChildren_(0),
263  numberOfSequentialEventsPerChild_(1),
264  setCpuAffinity_(false),
265  eventSetupDataToExcludeFromPrefetching_() {
266  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
267  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }
271 
273  std::vector<std::string> const& defaultServices,
274  std::vector<std::string> const& forcedServices) :
275  actReg_(),
276  preg_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
284  schedule_(),
285  subProcesses_(),
287  fb_(),
288  looper_(),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
296  fileMode_(),
311  {
312  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
313  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
314  processDesc->addServices(defaultServices, forcedServices);
316  }
317 
318  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
319  ServiceToken const& token,
321  actReg_(),
322  preg_(),
324  serviceToken_(),
325  input_(),
326  espController_(new eventsetup::EventSetupsController),
327  esp_(),
328  act_table_(),
330  schedule_(),
331  subProcesses_(),
333  fb_(),
334  looper_(),
336  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
337  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
338  principalCache_(),
342  fileMode_(),
357  {
358  init(processDesc, token, legacy);
359  }
360 
361 
363  actReg_(),
364  preg_(),
366  serviceToken_(),
367  input_(),
368  espController_(new eventsetup::EventSetupsController),
369  esp_(),
370  act_table_(),
372  schedule_(),
373  subProcesses_(),
375  fb_(),
376  looper_(),
378  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
379  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
380  principalCache_(),
384  fileMode_(),
399  {
400  if(isPython) {
401  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
402  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
404  }
405  else {
406  auto processDesc = std::make_shared<ProcessDesc>(config);
408  }
409  }
410 
411  void
412  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
413  ServiceToken const& iToken,
415 
416  //std::cerr << processDesc->dump() << std::endl;
417 
418  // register the empty parentage vector , once and for all
420 
421  // register the empty parameter set, once and for all.
423 
424  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
425 
426  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
427  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
428  bool const hasSubProcesses = !subProcessVParameterSet.empty();
429 
430  // Now set some parameters specific to the main process.
431  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
432  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
433  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
434  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
435  //threading
436  unsigned int nThreads=1;
437  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
438  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
439  if(nThreads == 0) {
440  nThreads = 1;
441  }
442  }
443  /* TODO: when we support having each stream run in a different thread use this default
444  unsigned int nStreams =nThreads;
445  */
446  unsigned int nStreams =1;
447  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
448  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
449  if(nStreams==0) {
450  nStreams = nThreads;
451  }
452  }
453  if(nThreads >1) {
454  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
455  }
456 
457  /*
458  bool nRunsSet = false;
459  */
460  unsigned int nConcurrentRuns =1;
461  /*
462  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
463  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
464  }
465  */
466  unsigned int nConcurrentLumis =1;
467  /*
468  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
469  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
470  } else {
471  nConcurrentLumis = nConcurrentRuns;
472  }
473  */
474  //Check that relationships between threading parameters makes sense
475  /*
476  if(nThreads<nStreams) {
477  //bad
478  }
479  if(nConcurrentRuns>nStreams) {
480  //bad
481  }
482  if(nConcurrentRuns>nConcurrentLumis) {
483  //bad
484  }
485  */
486  //forking
487  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
488  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
489  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
490  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
491  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
492  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
493  for(auto const& ps : excluded) {
494  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
495  ps.getUntrackedParameter<std::string>("label", ""));
496  }
497  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
498 
499  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
500 
501  // Now do general initialization
503 
504  //initialize the services
505  auto& serviceSets = processDesc->getServicesPSets();
506  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
507  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
508 
509  //make the services available
511 
512  if(nStreams>1) {
514  handler->willBeUsingThreads();
515  }
516 
517  // intialize miscellaneous items
518  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
519 
520  // intialize the event setup provider
521  esp_ = espController_->makeProvider(*parameterSet);
522 
523  // initialize the looper, if any
524  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
525  if(looper_) {
526  looper_->setActionTable(items.act_table_.get());
527  looper_->attachTo(*items.actReg_);
528 
529  //For now loopers make us run only 1 transition at a time
530  nStreams=1;
531  nConcurrentLumis=1;
532  nConcurrentRuns=1;
533  }
534 
535  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
536 
537  // initialize the input source
538  input_ = makeInput(*parameterSet,
539  *common,
540  items.preg(),
541  items.branchIDListHelper(),
543  items.actReg_,
544  items.processConfiguration(),
546 
547  // intialize the Schedule
548  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
549 
550  // set the data members
552  actReg_ = items.actReg_;
553  preg_ = items.preg();
558  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
559 
560  FDEBUG(2) << parameterSet << std::endl;
561 
563  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
564  // Reusable event principal
565  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
568  }
569 
570  // fill the subprocesses, if there are any
571  subProcesses_.reserve(subProcessVParameterSet.size());
572  for(auto& subProcessPSet : subProcessVParameterSet) {
573  subProcesses_.emplace_back(subProcessPSet,
574  *parameterSet,
575  preg(),
580  *actReg_,
581  token,
584  &processContext_);
585  }
586  }
587 
589  // Make the services available while everything is being deleted.
590  ServiceToken token = getToken();
591  ServiceRegistry::Operate op(token);
592 
593  // manually destroy all these thing that may need the services around
594  // propagate_const<T> has no reset() function
595  espController_ = nullptr;
596  esp_ = nullptr;
597  schedule_ = nullptr;
598  input_ = nullptr;
599  looper_ = nullptr;
600  actReg_ = nullptr;
601 
604  }
605 
606  void
608  if(beginJobCalled_) return;
609  beginJobCalled_=true;
610  bk::beginJob();
611 
612  // StateSentry toerror(this); // should we add this ?
613  //make the services available
615 
620  actReg_->preallocateSignal_(bounds);
621  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
623 
624  //NOTE: this may throw
626  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
627 
628  //NOTE: This implementation assumes 'Job' means one call
629  // the EventProcessor::run
630  // If it really means once per 'application' then this code will
631  // have to be changed.
632  // Also have to deal with case where have 'run' then new Module
633  // added and do 'run'
634  // again. In that case the newly added Module needs its 'beginJob'
635  // to be called.
636 
637  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
638  // For now we delay calling beginOfJob until first beginOfRun
639  //if(looper_) {
640  // looper_->beginOfJob(es);
641  //}
642  try {
643  convertException::wrap([&]() {
644  input_->doBeginJob();
645  });
646  }
647  catch(cms::Exception& ex) {
648  ex.addContext("Calling beginJob for the source");
649  throw;
650  }
651  schedule_->beginJob(*preg_);
652  // toerror.succeeded(); // should we add this?
653  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
654  actReg_->postBeginJobSignal_();
655 
656  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
657  schedule_->beginStream(i);
658  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
659  }
660  }
661 
662  void
664  // Collects exceptions, so we don't throw before all operations are performed.
665  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
666 
667  //make the services available
669 
670  //NOTE: this really should go elsewhere in the future
671  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
672  c.call([this,i](){this->schedule_->endStream(i);});
673  for(auto& subProcess : subProcesses_) {
674  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
675  }
676  }
677  auto actReg = actReg_.get();
678  c.call([actReg](){actReg->preEndJobSignal_();});
679  schedule_->endJob(c);
680  for(auto& subProcess : subProcesses_) {
681  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
682  }
683  c.call(std::bind(&InputSource::doEndJob, input_.get()));
684  if(looper_) {
685  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
686  }
687  c.call([actReg](){actReg->postEndJobSignal_();});
688  if(c.hasThrown()) {
689  c.rethrow();
690  }
691  }
692 
695  return serviceToken_;
696  }
697 
//Setup signal handler to listen for when forked children stop
namespace {
  //These are volatile since the compiler can not be allowed to optimize
  // accesses to them away: they can be modified in the signal handler.
  volatile bool child_failed = false;            // set when any child exits non-zero or is killed by a signal
  volatile unsigned int num_children_done = 0;   // number of children reaped so far
  volatile int child_fail_exit_status = 0;       // exit status of the most recently reaped failing child
  volatile int child_fail_signal = 0;            // terminating signal of the most recently reaped killed child

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    /// SIGCHLD handler: reaps every child that has already terminated
    /// (non-blocking) and records the outcome in the variables above.
    ///
    /// The handler preserves errno. waitpid() overwrites it (the final call
    /// that finds nothing left to reap fails with ECHILD), and the code that
    /// was interrupted may be about to test errno, e.g. in an
    /// "errno == EINTR" retry loop.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;
      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0<p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          // Only a non-zero exit status counts as a failure.
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          // Death by signal is always a failure.
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }
      errno = savedErrno;
    }
  }

}
736 
737  enum {
742  };
743 
744  namespace {
/// Number of decimal digits needed to print numberOfChildren.
/// A value of zero yields 3 rather than 0, protecting callers against a
/// zero-width field.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3; // Protect against zero numberOfChildren
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
756 
/* This class embodies the thread which is used to listen to the forked
   children and then tell them which events they should process. */
class MessageSenderToSource {
public:
  /// @param childrenSockets   parent-side socket fds, one per child; their count
  ///                          initializes the number of children considered alive
  /// @param childrenPipes     parent-side pipe fds; kept by reference, so the
  ///                          vector must outlive this object
  /// @param iNEventsToProcess value stored in m_nEventsToProcess
  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
  void operator()();

private:
  const std::vector<int>& m_childrenPipes; // not owned
  long const m_nEventsToProcess;
  fd_set m_socketSet;                      // every socket and pipe fd being watched
  unsigned int m_aliveChildren;
  int m_maxFd;                             // one past the highest watched fd
};

MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
                                             std::vector<int> const& childrenPipes,
                                             long iNEventsToProcess):
m_childrenPipes(childrenPipes),
m_nEventsToProcess(iNEventsToProcess),
m_aliveChildren(childrenSockets.size()),
m_maxFd(0)
{
  FD_ZERO(&m_socketSet);

  // Add one descriptor to the watched set and track the largest one seen.
  auto watch = [this](int fd) {
    FD_SET(fd, &m_socketSet);
    if(m_maxFd < fd) {
      m_maxFd = fd;
    }
  };

  for(int const fd : childrenSockets) {
    watch(fd);
  }
  for(int const fd : childrenPipes) {
    watch(fd);
  }
  ++m_maxFd; // select reads [0,m_maxFd).
}
795 
796 /* This function is the heart of the communication between parent and child.
797 * When ready for more data, the child (see MessageReceiverForSource) requests
798 * data through an AF_UNIX socket message. The parent will then assign the next
799 * chunk of data by sending a message back.
800 *
801 * Additionally, this function also monitors the read-side of the pipe fd from the child.
802 * If the child dies unexpectedly, the pipe will be selected as ready for read and
803 * will return EPIPE when read from. Further, if the child thinks the parent has died
804 * (defined as waiting more than 1s for a response), it will write a single byte to
805 * the pipe. If the parent has died, the child will get an EPIPE and throw an exception.
806 * If still alive, the parent will read the byte and ignore it.
807 *
808 * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
809 * handler can distinguish between success and failure cases.
810 */
811 
812  void
813  MessageSenderToSource::operator()() {
815  LogInfo("ForkingController") << "I am controller";
816  //this is the master and therefore the controller
817 
819  sndmsg.startIndex = 0;
820  sndmsg.nIndices = m_nEventsToProcess;
821  do {
822 
823  fd_set readSockets, errorSockets;
824  // Wait for a request from a child for events.
825  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
826  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
827  // Note that we don't timeout; may be reconsidered in the future.
828  ssize_t rc;
829  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
830  if (rc < 0) {
831  std::cerr << "select failed; should be impossible due to preconditions.\n";
832  abort();
833  break;
834  }
835 
836  // Read the message from the child.
837  for (int idx=0; idx<m_maxFd; idx++) {
838 
839  // Handle errors
840  if (FD_ISSET(idx, &errorSockets)) {
841  LogInfo("ForkingController") << "Error on socket " << idx;
842  FD_CLR(idx, &m_socketSet);
843  close(idx);
844  // See if it was the watchdog pipe that died.
845  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
846  if (*it == idx) {
847  m_aliveChildren--;
848  }
849  }
850  continue;
851  }
852 
853  if (!FD_ISSET(idx, &readSockets)) {
854  continue;
855  }
856 
857  // See if this FD is a child watchdog pipe. If so, read from it to prevent
858  // writes from blocking.
859  bool is_pipe = false;
860  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
861  if (*it == idx) {
862  is_pipe = true;
863  char buf;
864  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
865  if (rc <= 0) {
866  m_aliveChildren--;
867  FD_CLR(idx, &m_socketSet);
868  close(idx);
869  }
870  }
871  }
872 
873  // Only execute this block if the FD is a socket for sending the child work.
874  if (!is_pipe) {
875  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
876  if (rc < 0) {
877  FD_CLR(idx, &m_socketSet);
878  close(idx);
879  continue;
880  }
881 
882  // Tell the child what events to process.
883  // If 'send' fails, then the child process has failed (any other possibilities are
884  // eliminated because we are using fixed-size messages with Unix datagram sockets).
885  // Thus, the SIGCHLD handler will fire and set child_fail = true.
886  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
887  if (rc < 0) {
888  FD_CLR(idx, &m_socketSet);
889  close(idx);
890  continue;
891  }
892  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
893  sndmsg.startIndex += sndmsg.nIndices;
894  }
895  }
896 
897  } while (m_aliveChildren > 0);
898 
899  return;
900  }
901 
902  }
903 
904 
906  if(child_failed && continueAfterChildFailure_) {
907  if (child_fail_signal) {
908  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
909  child_fail_signal=0;
910  } else if (child_fail_exit_status) {
911  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
912  child_fail_exit_status=0;
913  } else {
914  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
915  }
916  child_failed =false;
917  }
918  }
919 
920  bool
921  EventProcessor::forkProcess(std::string const& jobReportFile) {
922 
923  if(0 == numberOfForkedChildren_) {return true;}
924  assert(0<numberOfForkedChildren_);
925  //do what we want done in common
926  {
927  beginJob(); //make sure this was run
928  // make the services available
930 
931  InputSource::ItemType itemType;
932  itemType = input_->nextItemType();
933 
934  assert(itemType == InputSource::IsFile);
935  {
936  readFile();
937  }
938  itemType = input_->nextItemType();
939  assert(itemType == InputSource::IsRun);
940 
941  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
942  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
943  input_->runAuxiliary()->beginTime());
944  espController_->eventSetupForInstance(ts);
945  EventSetup const& es = esp_->eventSetup();
946 
947  //now get all the data available in the EventSetup
948  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
949  es.fillAvailableRecordKeys(recordKeys);
950  std::vector<eventsetup::DataKey> dataKeys;
951  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
952  itKey != itEnd;
953  ++itKey) {
954  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
955  //see if this is on our exclusion list
956  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
957  ExcludedData const* excludedData(nullptr);
958  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
959  excludedData = &(itExcludeRec->second);
960  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
961  //skip all items in this record
962  continue;
963  }
964  }
965  if(0 != recordPtr) {
966  dataKeys.clear();
967  recordPtr->fillRegisteredDataKeys(dataKeys);
968  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
969  itDataKey != itDataKeyEnd;
970  ++itDataKey) {
971  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
972  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
973  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
974  continue;
975  }
976  try {
977  recordPtr->doGet(*itDataKey);
978  } catch(cms::Exception& e) {
979  LogWarning("ForkingEventSetupPreFetching") << e.what();
980  }
981  }
982  }
983  }
984  }
985  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
986  {
987  // make the services available
989  Service<JobReport> jobReport;
990  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
991 
992  //Now actually do the forking
993  actReg_->preForkReleaseResourcesSignal_();
994  input_->doPreForkReleaseResources();
995  schedule_->preForkReleaseResources();
996  }
997  installCustomHandler(SIGCHLD, ep_sigchld);
998 
999 
1000  unsigned int childIndex = 0;
1001  unsigned int const kMaxChildren = numberOfForkedChildren_;
1002  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1003  std::vector<pid_t> childrenIds;
1004  childrenIds.reserve(kMaxChildren);
1005  std::vector<int> childrenSockets;
1006  childrenSockets.reserve(kMaxChildren);
1007  std::vector<int> childrenPipes;
1008  childrenPipes.reserve(kMaxChildren);
1009  std::vector<int> childrenSocketsCopy;
1010  childrenSocketsCopy.reserve(kMaxChildren);
1011  std::vector<int> childrenPipesCopy;
1012  childrenPipesCopy.reserve(kMaxChildren);
1013  int pipes[] {0, 0};
1014 
1015  {
1016  // make the services available
1018  Service<JobReport> jobReport;
1019  int sockets[2], fd_flags;
1020  for(; childIndex < kMaxChildren; ++childIndex) {
1021  // Create a UNIX_DGRAM socket pair
1022  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1023  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026  if (pipe(pipes)) {
1027  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1028  exit(EXIT_FAILURE);
1029  }
1030  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1031  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1032  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1033  exit(EXIT_FAILURE);
1034  }
1035  // Mark socket as non-block. Child must be careful to do select prior
1036  // to reading from socket.
1037  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1038  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1042  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1043  exit(EXIT_FAILURE);
1044  }
1045  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1046  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1047  exit(EXIT_FAILURE);
1048  }
1049  // Linux man page notes there are some edge cases where reading from a
1050  // fd can block, even after a select.
1051  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1052  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1053  exit(EXIT_FAILURE);
1054  }
1055  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1056  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1057  exit(EXIT_FAILURE);
1058  }
1059 
1060  childrenPipesCopy = childrenPipes;
1061  childrenSocketsCopy = childrenSockets;
1062 
1063  pid_t value = fork();
1064  if(value == 0) {
1065  // Close the parent's side of the socket and pipe which will talk to us.
1066  close(pipes[0]);
1067  close(sockets[0]);
1068  // Close our copies of the parent's other communication pipes.
1069  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1070  close(*it);
1071  }
1072  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1073  close(*it);
1074  }
1075 
1076  // this is the child process, redirect stdout and stderr to a log file
1077  fflush(stdout);
1078  fflush(stderr);
1079  std::stringstream stout;
1080  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1081  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1082  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1083  }
1084  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1085  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1086  }
1087 
1088  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1089  if(setCpuAffinity_) {
1090  // CPU affinity is handled differently on macosx.
1091  // We disable it and print a message until someone reads:
1092  //
1093  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1094  //
1095  // and implements it.
1096 #ifdef __APPLE__
1097  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1098 #else
1099  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1100  cpu_set_t mask;
1101  CPU_ZERO(&mask);
1102  CPU_SET(childIndex, &mask);
1103  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1104  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1105  exit(-1);
1106  }
1107 #endif
1108  }
1109  break;
1110  } else {
1111  //this is the parent
1112  close(pipes[1]);
1113  close(sockets[1]);
1114  }
1115  if(value < 0) {
1116  LogError("ForkingChild") << "failed to create a child";
1117  exit(-1);
1118  }
1119  childrenIds.push_back(value);
1120  childrenSockets.push_back(sockets[0]);
1121  childrenPipes.push_back(pipes[0]);
1122  }
1123 
1124  if(childIndex < kMaxChildren) {
1125  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1126  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1127 
1128  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1129  input_->doPostForkReacquireResources(receiver);
1130  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1131  //NOTE: sources have to reset themselves by listening to the post fork message
1132  //rewindInput();
1133  return true;
1134  }
1135  jobReport->parentAfterFork(jobReportFile);
1136  }
1137 
1138  //this is the original, which is now the master for all the children
1139 
1140  //Need to wait for signals from the children or externally
1141  // To wait we must
1142  // 1) block the signals we want to wait on so we do not have a race condition
1143  // 2) check that we haven't already met our ending criteria
1144  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1145  sigset_t blockingSigSet;
1146  sigset_t unblockingSigSet;
1147  sigset_t oldSigSet;
1148  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1149  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1150  sigaddset(&blockingSigSet, SIGCHLD);
1151  sigaddset(&blockingSigSet, SIGUSR2);
1152  sigaddset(&blockingSigSet, SIGINT);
1153  sigdelset(&unblockingSigSet, SIGCHLD);
1154  sigdelset(&unblockingSigSet, SIGUSR2);
1155  sigdelset(&unblockingSigSet, SIGINT);
1156  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1157 
1158  // If there are too many fd's (unlikely, but possible) for select, denote this
1159  // because the sender will fail.
1160  bool too_many_fds = false;
1161  if (pipes[1]+1 > FD_SETSIZE) {
1162  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1163  too_many_fds = true;
1164  }
1165 
1166  //create a thread that sends the units of work to workers
1167  // we create it after all signals were blocked so that this
1168  // thread is never interrupted by a signal
1169  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1170  boost::thread senderThread(sender);
1171 
1172  if(not too_many_fds) {
1173  //NOTE: a child could have failed before we got here and even after this call
1174  // which is why the 'if' is conditional on continueAfterChildFailure_
1176  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1177  sigsuspend(&unblockingSigSet);
1179  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1180  }
1181  }
1182  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1183 
1184  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1185  if(child_failed) {
1186  LogError("ForkingStopping") << "child failed";
1187  }
1188  if(shutdown_flag) {
1189  LogSystem("ForkingStopping") << "asked to shutdown";
1190  }
1191 
1192  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1193  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1194  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1195  it != itEnd; ++it) {
1196  /* int result = */ kill(*it, SIGUSR2);
1197  }
1198  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1199  while(num_children_done != kMaxChildren) {
1200  sigsuspend(&unblockingSigSet);
1201  }
1202  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1203  }
1204  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1205  senderThread.join();
1206  if(child_failed && !continueAfterChildFailure_) {
1207  if (child_fail_signal) {
1208  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1209  } else if (child_fail_exit_status) {
1210  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1211  } else {
1212  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1213  }
1214  }
1215  if(too_many_fds) {
1216  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1217  }
1218  return false;
1219  }
1220 
1221  std::vector<ModuleDescription const*>
1223  return schedule_->getAllModuleDescriptions();
1224  }
1225 
1226  int
1228  return schedule_->totalEvents();
1229  }
1230 
1231  int
1233  return schedule_->totalEventsPassed();
1234  }
1235 
1236  int
1238  return schedule_->totalEventsFailed();
1239  }
1240 
1241  void
1243  schedule_->enableEndPaths(active);
1244  }
1245 
1246  bool
1248  return schedule_->endPathsEnabled();
1249  }
1250 
1251  void
1253  schedule_->getTriggerReport(rep);
1254  }
1255 
1256  void
1258  schedule_->clearCounters();
1259  }
1260 
1261 
// Builds and starts the state machine that drives the event loop.
//
// The configuration strings fileMode_ and emptyRunLumiMode_ are translated
// into their statemachine enum values (an empty string selects the default),
// a statemachine::Machine is constructed with `this` plus the two modes,
// initiate() is called on it, and ownership is returned to the caller.
//
// Throws Exception(errors::Configuration) when either string holds a value
// that is not one of the legal names listed in the messages below.
//
// NOTE(review): the declarator line (function name and parameter list) is
// not present in this listing; the name is taken from the call
// `machine = createStateMachine();` elsewhere in this file -- confirm.
1262  std::unique_ptr<statemachine::Machine>
1264  statemachine::FileMode fileMode;
      // An unset fileMode_ behaves like "FULLMERGE".
1265  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1266  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1267  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1268  else {
1269  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1270  << fileMode_ << ".\n"
1271  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1272  }
1273 
1274  statemachine::EmptyRunLumiMode emptyRunLumiMode;
      // An unset emptyRunLumiMode_ behaves like "handleEmptyRunsAndLumis".
1275  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1276  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1277  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1278  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1279  else {
1280  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1281  << emptyRunLumiMode_ << ".\n"
1282  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1283  }
1284 
1285  auto machine = std::make_unique<statemachine::Machine>(
1286  this,
1287  fileMode,
1288  emptyRunLumiMode);
1289 
      // The machine is started before it is handed back to the caller.
1290  machine->initiate();
1291  return machine;
1292  }
1293 
// Reports whether an asynchronous shutdown has been requested.
//
// Reads shutdown_flag with acquire ordering. When it is set, `returnCode` is
// overwritten with epSignal and true is returned; otherwise `returnCode` is
// left untouched and false is returned.
//
// NOTE(review): the declarator line is not present in this listing; the name
// and the `returnCode` parameter are taken from the call
// `checkForAsyncStopRequest(returnCode)` elsewhere in this file -- confirm.
1294  bool
1296  bool returnValue = false;
1297 
1298  // Look for a shutdown signal
1299  if(shutdown_flag.load(std::memory_order_acquire)) {
1300  returnValue = true;
1301  returnCode = epSignal;
1302  }
1303  return returnValue;
1304  }
1305 
1306 
1309 
1312  std::unique_ptr<statemachine::Machine> machine;
1313  {
1314  beginJob(); //make sure this was called
1315 
1316  //StatusCode returnCode = epSuccess;
1318 
1319  // make the services available
1321 
1322  machine = createStateMachine();
1325  try {
1326  convertException::wrap([&]() {
1327 
1328  InputSource::ItemType itemType;
1329 
1330  while(true) {
1331 
1332  bool more = true;
1333  if(numberOfForkedChildren_ > 0) {
1334  size_t size = preg_->size();
1335  {
1336  SendSourceTerminationSignalIfException sentry(actReg_.get());
1337  more = input_->skipForForking();
1338  sentry.completedSuccessfully();
1339  }
1340  if(more) {
1341  if(size < preg_->size()) {
1343  }
1345  }
1346  }
1347  {
1348  SendSourceTerminationSignalIfException sentry(actReg_.get());
1349  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1350  sentry.completedSuccessfully();
1351  }
1352 
1353  FDEBUG(1) << "itemType = " << itemType << "\n";
1354 
1355  if(checkForAsyncStopRequest(returnCode)) {
1356  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1357  forceLooperToEnd_ = true;
1358  machine->process_event(statemachine::Stop());
1359  forceLooperToEnd_ = false;
1360  break;
1361  }
1362 
1363  if(itemType == InputSource::IsEvent) {
1364  machine->process_event(statemachine::Event());
1366  forceLooperToEnd_ = true;
1367  machine->process_event(statemachine::Stop());
1368  forceLooperToEnd_ = false;
1370  break;
1371  }
1373  }
1374 
1375  if(itemType == InputSource::IsEvent) {
1376  }
1377  else if(itemType == InputSource::IsStop) {
1378  machine->process_event(statemachine::Stop());
1379  }
1380  else if(itemType == InputSource::IsFile) {
1381  machine->process_event(statemachine::File());
1382  }
1383  else if(itemType == InputSource::IsRun) {
1384  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1385  }
1386  else if(itemType == InputSource::IsLumi) {
1387  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1388  }
1389  else if(itemType == InputSource::IsSynchronize) {
1390  //For now, we don't have to do anything
1391  }
1392  // This should be impossible
1393  else {
1395  << "Unknown next item type passed to EventProcessor\n"
1396  << "Please report this error to the Framework group\n";
1397  }
1398  if(machine->terminated()) {
1399  break;
1400  }
1401  } // End of loop over state machine events
1402  }); // convertException::wrap
1403  } // Try block
1404  // Some comments on exception handling related to the boost state machine:
1405  //
1406  // Some states used in the machine are special because they
1407  // perform actions while the machine is being terminated, actions
1408  // such as close files, call endRun, call endLumi etc ... Each of these
1409  // states has two functions that perform these actions. The functions
1410  // are almost identical. The major difference is that one version
1411  // catches all exceptions and the other lets exceptions pass through.
1412  // The destructor catches them and the other function named "exit" lets
1413  // them pass through. On a normal termination, boost will always call
1414  // "exit" and then the state destructor. In our state classes, the
1415  // destructors do nothing if the exit function already took
1416  // care of things. Here's the interesting part. When boost is
1417  // handling an exception the "exit" function is not called (a boost
1418  // feature).
1419  //
1420  // If an exception occurs while the boost machine is in control
1421  // (which usually means inside a process_event call), then
1422  // the boost state machine destroys its states and "terminates" itself.
1423  // This is already done before we hit the catch blocks below. In this case
1424  // the call to terminateMachine below only destroys an already
1425  // terminated state machine. Because exit is not called, the state destructors
1426  // handle cleaning up lumis, runs, and files. The destructors swallow
1427  // all exceptions and only pass through the exception messages, which
1428  // are tacked onto the original exception below.
1429  //
1430  // If an exception occurs when the boost state machine is not
1431  // in control (outside the process_event functions), then boost
1432  // cannot destroy its own states. The terminateMachine function
1433  // below takes care of that. The flag "alreadyHandlingException"
1434  // is set true so that the state exit functions do nothing (and
1435  // cannot throw more exceptions while handling the first). Then the
1436  // state destructors take care of this because exit did nothing.
1437  //
1438  // In both cases above, the EventProcessor::endOfLoop function is
1439  // not called because it can throw exceptions.
1440  //
1441  // One tricky aspect of the state machine is that things that can
1442  // throw should not be invoked by the state machine while another
1443  // exception is being handled.
1444  // Another tricky aspect is that it appears to be important to
1445  // terminate the state machine before invoking its destructor.
1446  // We've seen crashes that are not understood when that is not
1447  // done. Maintainers of this code should be careful about this.
1448 
1449  catch (cms::Exception & e) {
1451  terminateMachine(std::move(machine));
1452  alreadyHandlingException_ = false;
1453  if (!exceptionMessageLumis_.empty()) {
1455  if (e.alreadyPrinted()) {
1456  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1457  }
1458  }
1459  if (!exceptionMessageRuns_.empty()) {
1461  if (e.alreadyPrinted()) {
1462  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1463  }
1464  }
1465  if (!exceptionMessageFiles_.empty()) {
1467  if (e.alreadyPrinted()) {
1468  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1469  }
1470  }
1471  throw;
1472  }
1473 
1474  if(machine->terminated()) {
1475  FDEBUG(1) << "The state machine reports it has been terminated\n";
1476  machine.reset();
1477  }
1478 
1480  throw cms::Exception("BadState")
1481  << "The boost state machine in the EventProcessor exited after\n"
1482  << "entering the Error state.\n";
1483  }
1484 
1485  }
1486  if(machine.get() != nullptr) {
1487  terminateMachine(std::move(machine));
1489  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1490  << "Please report this error to the Framework group\n";
1491  }
1492 
1493  return returnCode;
1494  }
1495 
1497  FDEBUG(1) << " \treadFile\n";
1498  size_t size = preg_->size();
1499  SendSourceTerminationSignalIfException sentry(actReg_.get());
1500 
1501  fb_ = input_->readFile();
1502  if(size < preg_->size()) {
1504  }
1506  if((numberOfForkedChildren_ > 0) or
1509  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1510  }
1511  sentry.completedSuccessfully();
1512  }
1513 
1514  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1515  if (fb_.get() != nullptr) {
1516  SendSourceTerminationSignalIfException sentry(actReg_.get());
1517  input_->closeFile(fb_.get(), cleaningUpAfterException);
1518  sentry.completedSuccessfully();
1519  }
1520  FDEBUG(1) << "\tcloseInputFile\n";
1521  }
1522 
1524  if (fb_.get() != nullptr) {
1525  schedule_->openOutputFiles(*fb_);
1526  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1527  }
1528  FDEBUG(1) << "\topenOutputFiles\n";
1529  }
1530 
1532  if (fb_.get() != nullptr) {
1533  schedule_->closeOutputFiles();
1534  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1535  }
1536  FDEBUG(1) << "\tcloseOutputFiles\n";
1537  }
1538 
1540  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1541  if (fb_.get() != nullptr) {
1542  schedule_->respondToOpenInputFile(*fb_);
1543  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1544  }
1545  FDEBUG(1) << "\trespondToOpenInputFile\n";
1546  }
1547 
1549  if (fb_.get() != nullptr) {
1550  schedule_->respondToCloseInputFile(*fb_);
1551  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1552  }
1553  FDEBUG(1) << "\trespondToCloseInputFile\n";
1554  }
1555 
1557  shouldWeStop_ = false;
1558  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1559  // until after we've called beginOfJob
1560  if(looper_ && looperBeginJobRun_) {
1561  looper_->doStartingNewLoop();
1562  }
1563  FDEBUG(1) << "\tstartingNewLoop\n";
1564  }
1565 
1567  if(looper_) {
1568  ModuleChanger changer(schedule_.get(),preg_.get());
1569  looper_->setModuleChanger(&changer);
1570  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1571  looper_->setModuleChanger(nullptr);
1572  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1573  else return false;
1574  }
1575  FDEBUG(1) << "\tendOfLoop\n";
1576  return true;
1577  }
1578 
1580  input_->repeat();
1581  input_->rewind();
1582  FDEBUG(1) << "\trewind\n";
1583  }
1584 
1586  looper_->prepareForNextLoop(esp_.get());
1587  FDEBUG(1) << "\tprepareForNextLoop\n";
1588  }
1589 
1591  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1592  if(!subProcesses_.empty()) {
1593  for(auto const& subProcess : subProcesses_) {
1594  if(subProcess.shouldWeCloseOutput()) {
1595  return true;
1596  }
1597  }
1598  return false;
1599  }
1600  return schedule_->shouldWeCloseOutput();
1601  }
1602 
1604  FDEBUG(1) << "\tdoErrorStuff\n";
1605  LogError("StateMachine")
1606  << "The EventProcessor state machine encountered an unexpected event\n"
1607  << "and went to the error state\n"
1608  << "Will attempt to terminate processing normally\n"
1609  << "(IF using the looper the next loop will be attempted)\n"
1610  << "This likely indicates a bug in an input module or corrupted input or both\n";
1612  }
1613 
1615  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1616  {
1617  SendSourceTerminationSignalIfException sentry(actReg_.get());
1618 
1619  input_->doBeginRun(runPrincipal, &processContext_);
1620  sentry.completedSuccessfully();
1621  }
1622 
1623  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1624  runPrincipal.beginTime());
1626  espController_->forceCacheClear();
1627  }
1628  {
1629  SendSourceTerminationSignalIfException sentry(actReg_.get());
1630  espController_->eventSetupForInstance(ts);
1631  sentry.completedSuccessfully();
1632  }
1633  EventSetup const& es = esp_->eventSetup();
1634  if(looper_ && looperBeginJobRun_== false) {
1635  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1636  looper_->beginOfJob(es);
1637  looperBeginJobRun_ = true;
1638  looper_->doStartingNewLoop();
1639  }
1640  {
1642  auto globalWaitTask = make_empty_waiting_task();
1643  globalWaitTask->increment_ref_count();
1644  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1645  *schedule_,
1646  runPrincipal,
1647  ts,
1648  es,
1649  subProcesses_);
1650  globalWaitTask->wait_for_all();
1651  if(globalWaitTask->exceptionPtr() != nullptr) {
1652  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1653  }
1654  }
1655  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1656  if(looper_) {
1657  looper_->doBeginRun(runPrincipal, es, &processContext_);
1658  }
1659  {
1660  //To wait, the ref count has to be 1+#streams
1661  auto streamLoopWaitTask = make_empty_waiting_task();
1662  streamLoopWaitTask->increment_ref_count();
1663 
1665 
1666  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1667  *schedule_,
1669  runPrincipal,
1670  ts,
1671  es,
1672  subProcesses_);
1673 
1674  streamLoopWaitTask->wait_for_all();
1675  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1676  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1677  }
1678  }
1679  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1680  if(looper_) {
1681  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1682  }
1683  }
1684 
// Performs the end-of-run transition for the run identified by `run`.
//
// Steps, in order:
//   1. look up the cached RunPrincipal, stamp its end time, mark it complete
//      and let the input source finish the run;
//   2. move the EventSetup to the IOV described by `ts`;
//   3. run the per-stream end-run transition and wait for it;
//   4. run the global end-run transition on the schedule, then on every
//      SubProcess;
//   5. notify the looper, if there is one.
//
// cleaningUpAfterException is passed through to the source, the schedule and
// the SubProcesses. Each SendSourceTerminationSignalIfException sentry emits
// the early-termination signal from its destructor unless
// completedSuccessfully() was reached, i.e. when the guarded call threw.
//
// NOTE(review): several lines of this function are missing from this listing
// (the opening of the `ts` declaration, the `Traits` aliases and one argument
// of endStreamsTransitionAsync); the comments below describe only what is
// visible.
1685  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1686  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689 
      // The end time comes from the input source's current timestamp.
1690  runPrincipal.setEndTime(input_->timestamp());
1691  runPrincipal.setComplete();
1692  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1693  sentry.completedSuccessfully();
1694  }
1695 
      // Tail of the `ts` declaration; its first line is not visible here.
1697  runPrincipal.endTime());
1698  {
1699  SendSourceTerminationSignalIfException sentry(actReg_.get());
1700  espController_->eventSetupForInstance(ts);
1701  sentry.completedSuccessfully();
1702  }
1703  EventSetup const& es = esp_->eventSetup();
1704  {
1705  //To wait, the ref count has to be 1+#streams
1706  auto streamLoopWaitTask = make_empty_waiting_task();
1707  streamLoopWaitTask->increment_ref_count();
1708 
1710 
1711  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1712  *schedule_,
1714  runPrincipal,
1715  ts,
1716  es,
1717  subProcesses_,
1718  cleaningUpAfterException);
1719 
      // After waiting, surface any exception the wait task recorded.
1720  streamLoopWaitTask->wait_for_all();
1721  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1722  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1723  }
1724  }
1725  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1726  if(looper_) {
1727  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1728  }
1729  {
      // Global end-run: the schedule first, then each SubProcess.
1730  runPrincipal.setAtEndTransition(true);
1732  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1733  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1734  }
1735  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1736  if(looper_) {
1737  looper_->doEndRun(runPrincipal, es, &processContext_);
1738  }
1739  }
1740 
1742  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1743  {
1744  SendSourceTerminationSignalIfException sentry(actReg_.get());
1745 
1746  input_->doBeginLumi(lumiPrincipal, &processContext_);
1747  sentry.completedSuccessfully();
1748  }
1749 
1751  if(rng.isAvailable()) {
1752  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1753  rng->preBeginLumi(lb);
1754  }
1755 
1756  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1757  // lumi blocks know their start and end times why not also start and end events?
1758  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1759  {
1760  SendSourceTerminationSignalIfException sentry(actReg_.get());
1761  espController_->eventSetupForInstance(ts);
1762  sentry.completedSuccessfully();
1763  }
1764  EventSetup const& es = esp_->eventSetup();
1765  {
1767  auto globalWaitTask = make_empty_waiting_task();
1768  globalWaitTask->increment_ref_count();
1769  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1770  *schedule_,
1771  lumiPrincipal,
1772  ts,
1773  es,
1774  subProcesses_);
1775  globalWaitTask->wait_for_all();
1776  if(globalWaitTask->exceptionPtr() != nullptr) {
1777  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1778  }
1779  }
1780  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1781  if(looper_) {
1782  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1783  }
1784  {
1785  //To wait, the ref count has to be 1+#streams
1786  auto streamLoopWaitTask = make_empty_waiting_task();
1787  streamLoopWaitTask->increment_ref_count();
1788 
1790 
1791  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1792  *schedule_,
1794  lumiPrincipal,
1795  ts,
1796  es,
1797  subProcesses_);
1798  streamLoopWaitTask->wait_for_all();
1799  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1800  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1801  }
1802  }
1803 
1804  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1805  if(looper_) {
1806  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1807  }
1808  }
1809 
// Performs the end-of-luminosity-block transition for (phid, run, lumi).
//
// Steps, in order:
//   1. look up the cached LuminosityBlockPrincipal, stamp its end time, mark
//      it complete and let the input source finish the lumi;
//   2. move the EventSetup to the IOV `ts` (run, lumi, max event number, lumi
//      end time);
//   3. run the per-stream end-lumi transition and wait for it;
//   4. run the global end-lumi transition on the schedule, then on every
//      SubProcess;
//   5. notify the looper, if there is one.
//
// cleaningUpAfterException is passed through to the source, the schedule and
// the SubProcesses. Each SendSourceTerminationSignalIfException sentry emits
// the early-termination signal from its destructor unless
// completedSuccessfully() was reached, i.e. when the guarded call threw.
//
// NOTE(review): the `Traits` aliases and one argument of
// endStreamsTransitionAsync are missing from this listing; the comments below
// describe only what is visible.
1810  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1811  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1812  {
1813  SendSourceTerminationSignalIfException sentry(actReg_.get());
1814 
      // The end time comes from the input source's current timestamp.
1815  lumiPrincipal.setEndTime(input_->timestamp());
1816  lumiPrincipal.setComplete();
1817  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1818  sentry.completedSuccessfully();
1819  }
1820  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1821  // lumi blocks know their start and end times why not also start and end events?
1822  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1823  lumiPrincipal.endTime());
1824  {
1825  SendSourceTerminationSignalIfException sentry(actReg_.get());
1826  espController_->eventSetupForInstance(ts);
1827  sentry.completedSuccessfully();
1828  }
1829  EventSetup const& es = esp_->eventSetup();
1830  {
1831  //To wait, the ref count has to be 1+#streams
1832  auto streamLoopWaitTask = make_empty_waiting_task();
1833  streamLoopWaitTask->increment_ref_count();
1834 
1836 
1837  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1838  *schedule_,
1840  lumiPrincipal,
1841  ts,
1842  es,
1843  subProcesses_,
1844  cleaningUpAfterException);
      // After waiting, surface any exception the wait task recorded.
1845  streamLoopWaitTask->wait_for_all();
1846  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1847  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1848  }
1849  }
      // NOTE(review): this trace text is identical to the one emitted after
      // the global transition further down, while the corresponding spot in
      // endRun prints "streamEndRun"; it was probably meant to read
      // "streamEndLumi" -- confirm before changing the string.
1850  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1851  if(looper_) {
1852  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1853  }
1854  {
      // Global end-lumi: the schedule first, then each SubProcess.
1855  lumiPrincipal.setAtEndTransition(true);
1857  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1858  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1859  }
1860  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1861  if(looper_) {
1862  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1863  }
1864  }
1865 
1869  << "EventProcessor::readRun\n"
1870  << "Illegal attempt to insert run into cache\n"
1871  << "Contact a Framework Developer\n";
1872  }
1873  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1874  {
1875  SendSourceTerminationSignalIfException sentry(actReg_.get());
1876  input_->readRun(*rp, *historyAppender_);
1877  sentry.completedSuccessfully();
1878  }
1879  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1880  principalCache_.insert(rp);
1881  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1882  }
1883 
1885  principalCache_.merge(input_->runAuxiliary(), preg());
1886  auto runPrincipal =principalCache_.runPrincipalPtr();
1887  {
1888  SendSourceTerminationSignalIfException sentry(actReg_.get());
1889  input_->readAndMergeRun(*runPrincipal);
1890  sentry.completedSuccessfully();
1891  }
1892  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1893  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1894  }
1895 
1899  << "EventProcessor::readRun\n"
1900  << "Illegal attempt to insert lumi into cache\n"
1901  << "Contact a Framework Developer\n";
1902  }
1905  << "EventProcessor::readRun\n"
1906  << "Illegal attempt to insert lumi into cache\n"
1907  << "Run is invalid\n"
1908  << "Contact a Framework Developer\n";
1909  }
1910  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1911  {
1912  SendSourceTerminationSignalIfException sentry(actReg_.get());
1913  input_->readLuminosityBlock(*lbp, *historyAppender_);
1914  sentry.completedSuccessfully();
1915  }
1916  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1917  principalCache_.insert(lbp);
1918  return input_->luminosityBlock();
1919  }
1920 
1922  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1923  {
1924  SendSourceTerminationSignalIfException sentry(actReg_.get());
1925  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1926  sentry.completedSuccessfully();
1927  }
1928  return input_->luminosityBlock();
1929  }
1930 
1933  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1934  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1935  }
1936 
1939  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1940  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1941  }
1942 
1944  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1945  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1946  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1947  }
1948 
1950  principalCache_.deleteLumi(phid, run, lumi);
1951  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1952  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1953  }
1954 
// Tries to read the next event from the source for stream `iStreamIndex`.
//
// Returns true when readEvent(iStreamIndex) completed, meaning the caller has
// an event to process. Returns false, without reading, when
//   - shouldWeStop() is true,
//   - another caller already recorded a deferred exception,
//   - *finishedProcessingEvents is already set,
//   - the source's next item is not an event (the flag is then set so other
//     streams stop as well),
//   - the guarded block that emits the ExternalSignal termination signal is
//     entered, or
//   - anything in the try block throws; the first such exception is stored in
//     deferredExceptionPtr_ and later ones are dropped.
//
// finishedProcessingEvents must not be null; it is dereferenced
// unconditionally.
//
// NOTE(review): three lines are missing from this listing (one before the
// `try`, and two inside the non-event / stop handling); the condition guarding
// the ExternalSignal branch is therefore not visible here.
1955  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1956  std::atomic<bool>* finishedProcessingEvents) {
1957  if(shouldWeStop()) {
1958  return false;
1959  }
1960 
1961  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1962  return false;
1963  }
1964 
1965  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1966  return false;
1967  }
1968 
1970  try {
1971  //need to use lock in addition to the serial task queue because
1972  // of delayed provenance reading and reading data in response to
1973  // edm::Refs etc
1974  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1975  if(not firstEventInBlock_) {
1976  //Not the first event of the block, so the source has to be asked what
1977  // comes next. (For the first event the state machine already called
1978  // input_->nextItemType and found an event; see the else branch.)
1979  InputSource::ItemType itemType = input_->nextItemType();
1980  if (InputSource::IsEvent !=itemType) {
1982  finishedProcessingEvents->store(true,std::memory_order_release);
1983  //std::cerr<<"next item type "<<itemType<<"\n";
1984  return false;
1985  }
1987  //std::cerr<<"task told to async stop\n";
1988  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1989  return false;
1990  }
1991  } else {
      // Calling input_->nextItemType again here would move the source on to
      // the next transition, so only the flag is cleared.
1992  firstEventInBlock_ = false;
1993  }
1994  readEvent(iStreamIndex);
1995  } catch (...) {
      // Only the first exception wins the compare-exchange and is kept.
1996  bool expected =false;
1997  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1998  deferredExceptionPtr_ = std::current_exception();
1999 
2000  }
2001  return false;
2002  }
2003  return true;
2004  }
2005 
// NOTE(review): the first line of this definition (doxygen line 2006) is absent
// from this listing; the recursive call below suggests this is
// EventProcessor::handleNextEventForStreamAsync(iTask, iStreamIndex,
// finishedProcessingEvents) -- confirm against the real source.
//
// Queues work on the source's serial queue chain that reads the next event for
// stream iStreamIndex and, if one was read, starts processing it. When that
// processing completes without an exception this function is called again for
// the same stream; otherwise iTask's ref count is decremented and the stream stops.
2007  unsigned int iStreamIndex,
2008  std::atomic<bool>* finishedProcessingEvents)
2009  {
// Continuation handed to processEventAsync below. iPtr is non-null when an
// exception is being reported to this task.
2010  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
2011  if(iPtr) {
// Only the first exception is recorded and forwarded to iTask.
2012  bool expected = false;
2013  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2014  deferredExceptionPtr_ = *iPtr;
2015  {
2016  WaitingTaskHolder h(iTask);
2017  h.doneWaiting(*iPtr);
2018  }
2019  }
2020  //the stream will stop now
2021  iTask->decrement_ref_count();
2022  return;
2023  }
2024 
// No exception: go around again for the next event of this stream.
2025  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2026  });
2027 
2028  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
// NOTE(review): doxygen line 2029 is missing here; a statement may have been dropped.
2030 
2031  try {
2032  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2033  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2034  } else {
2035  //the stream will stop now
// recursionTask was never handed to a holder on this path, so it is destroyed explicitly.
2036  tbb::task::destroy(*recursionTask);
2037  iTask->decrement_ref_count();
2038  }
2039  } catch(...) {
// Route the exception through recursionTask so the iPtr branch above handles it.
2040  WaitingTaskHolder h(recursionTask);
2041  h.doneWaiting(std::current_exception());
2042  }
2043  });
2044  }
2045 
// NOTE(review): the signature line (doxygen line 2046) is absent from this
// listing; presumably this is the body of EventProcessor::readAndProcessEvent()
// -- confirm against the real source. Doxygen lines 2058-2059 and 2087 are
// also missing; see the markers below.
//
// Forked-children case: reads and processes exactly one event on stream 0 and returns.
// Otherwise: starts one handleNextEventForStreamAsync chain per stream, waits on
// eventLoopWaitTask, and finally rethrows deferredExceptionPtr_ (the guarding
// condition is on the missing line 2087).
2047  if(numberOfForkedChildren_>0) {
2048  //Have to do something special for forking since
2049  // after each event the system may have to skip
2050  // some transitions. This is handled in runToCompletion
2051  readEvent(0);
2052  auto eventLoopWaitTask = make_empty_waiting_task();
2053  eventLoopWaitTask->increment_ref_count();
2054  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2055  eventLoopWaitTask->wait_for_all();
2056  return;
2057  }
// NOTE(review): doxygen lines 2058-2059 are missing here.
2060 
// Stack-local flag; the tasks below receive a raw pointer to it.
2061  std::atomic<bool> finishedProcessingEvents{false};
2062  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2063 
2064  //The state machine already found the event so
2065  // we have to avoid looking again
2066  firstEventInBlock_ = true;
2067 
2068  //To wait, the ref count has to be 1+#streams
2069  auto eventLoopWaitTask = make_empty_waiting_task();
2070  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2071  eventLoopWaitTask->increment_ref_count();
2072 
2073  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2074  unsigned int iStreamIndex = 0;
// Every stream except the last is started via tbb::task::enqueue; each adds one
// to the wait task's ref count.
// NOTE(review): kNumStreams-1 is unsigned arithmetic; assumes numberOfStreams() >= 1 -- confirm.
2075  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2076  eventLoopWaitTask->increment_ref_count();
2077  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2078  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2079  }) );
2080  }
// iStreamIndex now holds the one index the loop did not start; its task is
// spawned here and this call waits on eventLoopWaitTask.
2081  eventLoopWaitTask->increment_ref_count();
2082  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2083  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2084  }));
2085 
2086  //One of the processing threads saw an exception
// NOTE(review): doxygen line 2087 is missing here; it must open the conditional
// closed at 2089 (presumably a test of deferredExceptionPtrIsSet_ -- confirm).
2088  std::rethrow_exception(deferredExceptionPtr_);
2089  }
2090  }
2091  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2092  //TODO this will have to become per stream
2093  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2094  StreamContext streamContext(event.streamID(), &processContext_);
2095 
2096  SendSourceTerminationSignalIfException sentry(actReg_.get());
2097  input_->readEvent(event, streamContext);
2098  sentry.completedSuccessfully();
2099 
2100  FDEBUG(1) << "\treadEvent\n";
2101  }
2102 
// NOTE(review): the first line of this definition (doxygen line 2103) is absent
// from this listing; the use of iHolder below suggests this is
// EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
// -- confirm against the real source. Doxygen lines 2107, 2120 and 2148 are
// also missing; see the markers below.
//
// Starts asynchronous processing of the event held by the event principal of
// stream iStreamIndex. The chain of tasks built here is:
//   schedule_->processOneEventAsync -> (SubProcesses, if any) -> finalizeEventTask -> iHolder
2104  unsigned int iStreamIndex) {
2105  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2106  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
// NOTE(review): doxygen line 2107 is missing here; presumably it declares `rng` -- confirm.
2108  if(rng.isAvailable()) {
2109  Event ev(*pep, ModuleDescription(), nullptr);
2110  rng->postEventRead(ev);
2111  }
// Sanity checks: the event must belong to the lumi principal just attached.
2112  assert(pep->luminosityBlockPrincipalPtrValid());
2113  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2114  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2115 
// Last step of the chain: run the looper (if any), clear the event principal,
// then notify iHolder, passing along the exception if one was reported.
2116  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2117  tbb::task::allocate_root(),
2118  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2119  {
// NOTE(review): doxygen line 2120 is missing here; a statement may have been dropped.
2121 
2122  //NOTE: If we have a looper we only have one Stream
2123  if(looper_) {
2124  processEventWithLooper(*pep);
2125  }
2126 
2127  FDEBUG(1) << "\tprocessEvent\n";
2128  pep->clearEventPrincipal();
2129  if(iPtr) {
2130  iHolder.doneWaiting(*iPtr);
2131  } else {
2132  iHolder.doneWaiting(std::exception_ptr());
2133  }
2134  }
2135  )
2136  );
// Task to run once the schedule is done with the event: the finalize task
// itself when there are no SubProcesses, otherwise a task that fans out to them.
2137  WaitingTaskHolder afterProcessTask;
2138  if(subProcesses_.empty()) {
2139  afterProcessTask = std::move(finalizeEventTask);
2140  } else {
2141  //Need to run SubProcesses after schedule has finished
2142  // with the event
2143  afterProcessTask = WaitingTaskHolder(
2144  make_waiting_task(tbb::task::allocate_root(),
2145  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2146  {
2147  if(not iPtr) {
// NOTE(review): doxygen line 2148 is missing here; a statement may have been dropped.
2149 
2150  //when run with 1 thread, we want the order to be what
2151  // it was before. This requires reversing the order since
2152  // tasks are run last one in first one out
2153  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2154  subProcess.doEventAsync(finalizeEventTask,*pep);
2155  }
2156  } else {
// The schedule reported an exception: skip the SubProcesses and finalize with it.
2157  finalizeEventTask.doneWaiting(*iPtr);
2158  }
2159  })
2160  );
2161  }
2162 
2163  schedule_->processOneEventAsync(std::move(afterProcessTask),
2164  iStreamIndex,*pep, esp_->eventSetup());
2165 
2166  }
2167 
// NOTE(review): the signature line (doxygen line 2168) is absent from this
// listing; the use of iPrincipal below suggests this is
// EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal) -- confirm.
// Doxygen lines 2174, 2182 and 2185 are also missing, so the visible braces do
// not balance; see the markers below.
//
// Passes the event to looper_->doDuringLoop together with a ProcessingController
// built from the input source's capabilities, repeating until the controller
// reports that the last operation succeeded. Sets shouldWeStop_ when the looper
// returns anything other than EDLooperBase::kContinue.
2169  bool randomAccess = input_->randomAccess();
2170  ProcessingController::ForwardState forwardState = input_->forwardState();
2171  ProcessingController::ReverseState reverseState = input_->reverseState();
2172  ProcessingController pc(forwardState, reverseState, randomAccess);
2173 
// NOTE(review): doxygen line 2174 is missing here; presumably it declares `status` -- confirm.
2175  do {
2176 
2177  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2178  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2179 
2180  bool succeeded = true;
// Repositioning of the source is only attempted when it supports random access.
2181  if(randomAccess) {
// NOTE(review): doxygen line 2182 is missing here; it must open the conditional
// guarding the skipEvents(-2) call.
2183  input_->skipEvents(-2);
2184  }
// NOTE(review): doxygen line 2185 is missing here; it must open the conditional
// guarding the goToEvent call.
2186  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2187  }
2188  }
2189  pc.setLastOperationSucceeded(succeeded);
2190  } while(!pc.lastOperationSucceeded());
2191  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2192  }
2193 
// NOTE(review): the signature line (doxygen line 2194) is absent from this
// listing; the debug text below suggests this is the body of
// EventProcessor::shouldWeStop() -- confirm against the real source.
//
// Returns true when shouldWeStop_ is set. Otherwise, if there are SubProcesses,
// returns true iff any of them reports terminate(); schedule_->terminate() is
// not consulted in that case. With no SubProcesses the result is
// schedule_->terminate().
2195  FDEBUG(1) << "\tshouldWeStop\n";
2196  if(shouldWeStop_) return true;
2197  if(!subProcesses_.empty()) {
2198  for(auto const& subProcess : subProcesses_) {
2199  if(subProcess.terminate()) {
2200  return true;
2201  }
2202  }
2203  return false;
2204  }
2205  return schedule_->terminate();
2206  }
2207 
2210  }
2211 
2214  }
2215 
2218  }
2219 
2222  }
2223 
2224  void EventProcessor::terminateMachine(std::unique_ptr<statemachine::Machine> iMachine) {
2225  if(iMachine.get() != nullptr) {
2226  if(!iMachine->terminated()) {
2227  forceLooperToEnd_ = true;
2228  iMachine->process_event(statemachine::Stop());
2229  forceLooperToEnd_ = false;
2230  }
2231  else {
2232  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2233  }
2234  if(iMachine->terminated()) {
2235  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2236  }
2237  }
2238  }
2239 }
std::string emptyRunLumiMode_
size
Write out results.
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void openOutputFiles() override
virtual void setExceptionMessageLumis(std::string &message) override
SharedResourcesAcquirer sourceResourcesAcquirer_
virtual void doErrorStuff() override
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
virtual statemachine::Run readAndMergeRun() override
edm::propagate_const< std::unique_ptr< InputSource > > input_
virtual bool endOfLoop() override
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
static PFTauRenderPlugin instance
def destroy(e)
Definition: pyrootRender.py:13
ParameterSetID id() const
virtual bool shouldWeCloseOutput() const override
void possiblyContinueAfterForkChildFailure()
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual int readLuminosityBlock() override
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
#define NULL
Definition: scimark2.h:8
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException) override
bool ev
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:322
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool hasRunPrincipal() const
Definition: config.py:1
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
virtual void setExceptionMessageRuns(std::string &message) override
std::set< std::pair< std::string, std::string > > ExcludedData
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
virtual void startingNewLoop() override
virtual void rewindInput() override
bool alreadyPrinted() const
Definition: Exception.cc:251
bool forkProcess(std::string const &jobReportFile)
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:91
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
virtual bool shouldWeStop() const override
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual statemachine::Run readRun() override
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
virtual void prepareForNextLoop() override
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual bool alreadyHandlingException() const override
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:102
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
std::shared_ptr< edm::ParameterSet > parameterSet() const
virtual StatusCode runToCompletion() override
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:748
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void readFile() override
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void closeOutputFiles() override
HLT enums.
virtual void writeRun(statemachine::Run const &run) override
virtual void closeInputFile(bool cleaningUpAfterException) override
virtual void respondToOpenInputFile() override
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual int readAndMergeLumi() override
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
virtual void respondToCloseInputFile() override
virtual void readAndProcessEvent() override
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
virtual void deleteRunFromCache(statemachine::Run const &run) override
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
Definition: pipe.py:1
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
virtual void setExceptionMessageFiles(std::string &message) override
T get(const Candidate &c)
Definition: component.h:55
virtual void beginRun(statemachine::Run const &run) override
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31