CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
2 
9 
37 
39 
47 
52 
54 
66 
67 #include "MessageForSource.h"
68 #include "MessageForParent.h"
69 
70 #include "boost/thread/xtime.hpp"
71 #include "boost/range/adaptor/reversed.hpp"
72 
73 #include <exception>
74 #include <iomanip>
75 #include <iostream>
76 #include <utility>
77 #include <sstream>
78 
79 #include <sys/ipc.h>
80 #include <sys/msg.h>
81 
82 #include "tbb/task.h"
83 
84 //Used for forking
85 #include <sys/types.h>
86 #include <sys/wait.h>
87 #include <sys/socket.h>
88 #include <sys/select.h>
89 #include <sys/fcntl.h>
90 #include <unistd.h>
91 
92 
93 //Used for CPU affinity
94 #ifndef __APPLE__
95 #include <sched.h>
96 #endif
97 
98 namespace {
99  //Sentry class to only send a signal if an
100  // exception occurs. An exception is identified
101  // by the destructor being called without first
102  // calling completedSuccessfully().
103  class SendSourceTerminationSignalIfException {
104  public:
105  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
106  reg_(iReg) {}
107  ~SendSourceTerminationSignalIfException() {
108  if(reg_) {
109  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
110  }
111  }
112  void completedSuccessfully() {
113  reg_ = nullptr;
114  }
115  private:
116  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
117  };
118 
119 }
120 
121 namespace edm {
122 
123  // ---------------------------------------------------------------
124  std::unique_ptr<InputSource>
126  CommonParams const& common,
127  std::shared_ptr<ProductRegistry> preg,
128  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
129  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
130  std::shared_ptr<ActivityRegistry> areg,
131  std::shared_ptr<ProcessConfiguration const> processConfiguration,
132  PreallocationConfiguration const& allocations) {
133  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
134  if(main_input == 0) {
136  << "There must be exactly one source in the configuration.\n"
137  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138  }
139 
140  std::string modtype(main_input->getParameter<std::string>("@module_type"));
141 
142  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
144  ConfigurationDescriptions descriptions(filler->baseType());
145  filler->fill(descriptions);
146 
147  try {
148  convertException::wrap([&]() {
149  descriptions.validate(*main_input, std::string("source"));
150  });
151  }
152  catch (cms::Exception & iException) {
153  std::ostringstream ost;
154  ost << "Validating configuration of input source of type " << modtype;
155  iException.addContext(ost.str());
156  throw;
157  }
158 
159  main_input->registerIt();
160 
161  // Fill in "ModuleDescription", in case the input source produces
162  // any EDProducts, which would be registered in the ProductRegistry.
163  // Also fill in the process history item for this process.
164  // There is no module label for the unnamed input source, so
165  // just use "source".
166  // Only the tracked parameters belong in the process configuration.
167  ModuleDescription md(main_input->id(),
168  main_input->getParameter<std::string>("@module_type"),
169  "source",
170  processConfiguration.get(),
171  ModuleDescription::getUniqueID());
172 
173  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
174  common.maxEventsInput_, common.maxLumisInput_,
175  common.maxSecondsUntilRampdown_, allocations);
176 
177  areg->preSourceConstructionSignal_(md);
178  std::unique_ptr<InputSource> input;
179  try {
180  //even if we have an exception, send the signal
181  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
182  convertException::wrap([&]() {
183  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186  });
187  }
188  catch (cms::Exception& iException) {
189  std::ostringstream ost;
190  ost << "Constructing input source of type " << modtype;
191  iException.addContext(ost.str());
192  throw;
193  }
194  return input;
195  }
196 
197  // ---------------------------------------------------------------
198  std::shared_ptr<EDLooperBase>
201  ParameterSet& params) {
202  std::shared_ptr<EDLooperBase> vLooper;
203 
204  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
205 
206  if(loopers.size() == 0) {
207  return vLooper;
208  }
209 
210  assert(1 == loopers.size());
211 
212  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
213  itName != itNameEnd;
214  ++itName) {
215 
216  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
217  providerPSet->registerIt();
218  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
219  cp,
220  *providerPSet);
221  }
222  return vLooper;
223  }
224 
225  // ---------------------------------------------------------------
226  EventProcessor::EventProcessor(std::string const& config,
227  ServiceToken const& iToken,
229  std::vector<std::string> const& defaultServices,
230  std::vector<std::string> const& forcedServices) :
231  actReg_(),
232  preg_(),
233  branchIDListHelper_(),
234  serviceToken_(),
235  input_(),
236  espController_(new eventsetup::EventSetupsController),
237  esp_(),
238  act_table_(),
239  processConfiguration_(),
240  schedule_(),
241  subProcesses_(),
242  historyAppender_(new HistoryAppender),
243  fb_(),
244  looper_(),
245  deferredExceptionPtrIsSet_(false),
246  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
247  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
248  principalCache_(),
249  beginJobCalled_(false),
250  shouldWeStop_(false),
251  stateMachineWasInErrorState_(false),
252  fileMode_(),
253  emptyRunLumiMode_(),
254  exceptionMessageFiles_(),
255  exceptionMessageRuns_(),
256  exceptionMessageLumis_(),
257  alreadyHandlingException_(false),
258  forceLooperToEnd_(false),
259  looperBeginJobRun_(false),
260  forceESCacheClearOnNewRun_(false),
261  numberOfForkedChildren_(0),
262  numberOfSequentialEventsPerChild_(1),
263  setCpuAffinity_(false),
264  eventSetupDataToExcludeFromPrefetching_() {
265  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
266  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
267  processDesc->addServices(defaultServices, forcedServices);
268  init(processDesc, iToken, iLegacy);
269  }
270 
272  std::vector<std::string> const& defaultServices,
273  std::vector<std::string> const& forcedServices) :
274  actReg_(),
275  preg_(),
277  serviceToken_(),
278  input_(),
279  espController_(new eventsetup::EventSetupsController),
280  esp_(),
281  act_table_(),
283  schedule_(),
284  subProcesses_(),
286  fb_(),
287  looper_(),
289  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
290  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
291  principalCache_(),
295  fileMode_(),
310  {
311  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
312  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
313  processDesc->addServices(defaultServices, forcedServices);
315  }
316 
317  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
318  ServiceToken const& token,
320  actReg_(),
321  preg_(),
323  serviceToken_(),
324  input_(),
325  espController_(new eventsetup::EventSetupsController),
326  esp_(),
327  act_table_(),
329  schedule_(),
330  subProcesses_(),
332  fb_(),
333  looper_(),
335  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
336  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
337  principalCache_(),
341  fileMode_(),
356  {
357  init(processDesc, token, legacy);
358  }
359 
360 
362  actReg_(),
363  preg_(),
365  serviceToken_(),
366  input_(),
367  espController_(new eventsetup::EventSetupsController),
368  esp_(),
369  act_table_(),
371  schedule_(),
372  subProcesses_(),
374  fb_(),
375  looper_(),
377  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
378  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
379  principalCache_(),
383  fileMode_(),
398  {
399  if(isPython) {
400  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
401  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
403  }
404  else {
405  auto processDesc = std::make_shared<ProcessDesc>(config);
407  }
408  }
409 
410  void
411  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
412  ServiceToken const& iToken,
414 
415  //std::cerr << processDesc->dump() << std::endl;
416 
417  // register the empty parentage vector , once and for all
419 
420  // register the empty parameter set, once and for all.
422 
423  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
424 
425  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
426  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
427  bool const hasSubProcesses = !subProcessVParameterSet.empty();
428 
429  // Now set some parameters specific to the main process.
430  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
431  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
432  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
433  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
434  //threading
435  unsigned int nThreads=1;
436  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
437  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
438  if(nThreads == 0) {
439  nThreads = 1;
440  }
441  }
442  /* TODO: when we support having each stream run in a different thread use this default
443  unsigned int nStreams =nThreads;
444  */
445  unsigned int nStreams =1;
446  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
447  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
448  if(nStreams==0) {
449  nStreams = nThreads;
450  }
451  // PG: Log the number of streams
452  edm::LogInfo("StreamSetup") <<"setting # streams "<<nStreams;
453  }
454  /*
455  bool nRunsSet = false;
456  */
457  unsigned int nConcurrentRuns =1;
458  /*
459  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
460  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
461  }
462  */
463  unsigned int nConcurrentLumis =1;
464  /*
465  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
466  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
467  } else {
468  nConcurrentLumis = nConcurrentRuns;
469  }
470  */
471  //Check that relationships between threading parameters make sense
472  /*
473  if(nThreads<nStreams) {
474  //bad
475  }
476  if(nConcurrentRuns>nStreams) {
477  //bad
478  }
479  if(nConcurrentRuns>nConcurrentLumis) {
480  //bad
481  }
482  */
483  //forking
484  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
485  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
486  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
487  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
488  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
489  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
490  for(auto const& ps : excluded) {
491  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
492  ps.getUntrackedParameter<std::string>("label", ""));
493  }
494  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
495 
496  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
497 
498  // Now do general initialization
500 
501  //initialize the services
502  auto& serviceSets = processDesc->getServicesPSets();
503  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
504  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
505 
506  //make the services available
508 
509  if(nStreams>1) {
511  handler->willBeUsingThreads();
512  }
513 
514  // initialize miscellaneous items
515  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
516 
517  // initialize the event setup provider
518  esp_ = espController_->makeProvider(*parameterSet);
519 
520  // initialize the looper, if any
521  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
522  if(looper_) {
523  looper_->setActionTable(items.act_table_.get());
524  looper_->attachTo(*items.actReg_);
525 
526  //For now loopers make us run only 1 transition at a time
527  nStreams=1;
528  nConcurrentLumis=1;
529  nConcurrentRuns=1;
530  }
531 
532  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
533 
534  // initialize the input source
535  input_ = makeInput(*parameterSet,
536  *common,
537  items.preg(),
538  items.branchIDListHelper(),
540  items.actReg_,
541  items.processConfiguration(),
543 
544  // initialize the Schedule
545  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
546 
547  // set the data members
549  actReg_ = items.actReg_;
550  preg_ = items.preg();
555  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
556 
557  FDEBUG(2) << parameterSet << std::endl;
558 
560  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
561  // Reusable event principal
562  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
565  }
566 
567  // fill the subprocesses, if there are any
568  subProcesses_.reserve(subProcessVParameterSet.size());
569  for(auto& subProcessPSet : subProcessVParameterSet) {
570  subProcesses_.emplace_back(subProcessPSet,
571  *parameterSet,
572  preg(),
577  *actReg_,
578  token,
581  &processContext_);
582  }
583  }
584 
586  // Make the services available while everything is being deleted.
587  ServiceToken token = getToken();
588  ServiceRegistry::Operate op(token);
589 
590  // manually destroy all these things that may need the services around
591  // propagate_const<T> has no reset() function
592  espController_ = nullptr;
593  esp_ = nullptr;
594  schedule_ = nullptr;
595  input_ = nullptr;
596  looper_ = nullptr;
597  actReg_ = nullptr;
598 
601  }
602 
603  void
605  if(beginJobCalled_) return;
606  beginJobCalled_=true;
607  bk::beginJob();
608 
609  // StateSentry toerror(this); // should we add this ?
610  //make the services available
612 
617  actReg_->preallocateSignal_(bounds);
619 
620  //NOTE: this may throw
622  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
623 
624  //NOTE: This implementation assumes 'Job' means one call
625  // the EventProcessor::run
626  // If it really means once per 'application' then this code will
627  // have to be changed.
628  // Also have to deal with case where have 'run' then new Module
629  // added and do 'run'
630  // again. In that case the newly added Module needs its 'beginJob'
631  // to be called.
632 
633  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
634  // For now we delay calling beginOfJob until first beginOfRun
635  //if(looper_) {
636  // looper_->beginOfJob(es);
637  //}
638  try {
639  convertException::wrap([&]() {
640  input_->doBeginJob();
641  });
642  }
643  catch(cms::Exception& ex) {
644  ex.addContext("Calling beginJob for the source");
645  throw;
646  }
647  schedule_->beginJob(*preg_);
648  // toerror.succeeded(); // should we add this?
649  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
650  actReg_->postBeginJobSignal_();
651 
652  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
653  schedule_->beginStream(i);
654  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
655  }
656  }
657 
658  void
660  // Collects exceptions, so we don't throw before all operations are performed.
661  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
662 
663  //make the services available
665 
666  //NOTE: this really should go elsewhere in the future
667  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
668  c.call([this,i](){this->schedule_->endStream(i);});
669  for(auto& subProcess : subProcesses_) {
670  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
671  }
672  }
673  auto actReg = actReg_.get();
674  c.call([actReg](){actReg->preEndJobSignal_();});
675  schedule_->endJob(c);
676  for(auto& subProcess : subProcesses_) {
677  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
678  }
679  c.call(std::bind(&InputSource::doEndJob, input_.get()));
680  if(looper_) {
681  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
682  }
683  c.call([actReg](){actReg->postEndJobSignal_();});
684  if(c.hasThrown()) {
685  c.rethrow();
686  }
687  }
688 
691  return serviceToken_;
692  }
693 
694  //Setup signal handler to listen for when forked children stop
namespace {
  // Bookkeeping written by the SIGCHLD handler below and read by the main thread.
  // These are volatile since the compiler can not be allowed to optimize them
  // since they can be modified in the signal handler.
  volatile bool child_failed = false;            // set once any child exits non-zero or is killed by a signal
  volatile unsigned int num_children_done = 0;   // number of children reaped so far
  volatile int child_fail_exit_status = 0;       // non-zero exit status of the most recently reaped failed child
  volatile int child_fail_signal = 0;            // terminating signal of the most recently reaped killed child

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    // SIGCHLD handler: reaps every child that has already terminated, without
    // blocking (WNOHANG), and records how each one ended in the variables above.
    void ep_sigchld(int, siginfo_t*, void*) {
      // waitpid() overwrites errno (the final call below normally fails with
      // ECHILD or returns 0). Save it so the code that was interrupted by the
      // signal still sees its own errno, e.g. EINTR from an interrupted call.
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      // Several children may have exited before the handler ran, so keep
      // reaping until waitpid reports nothing more to collect.
      while(0<p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
732 
733  enum {
738  };
739 
740  namespace {
// Returns the number of decimal digits in numberOfChildren
// (e.g. 7 -> 1, 10 -> 2, 999 -> 3).
// A value of zero has no digits to count; 3 is returned in that case.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3; // Protect against zero numberOfChildren
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
752 
/* This class embodies the thread which is used to listen to the forked children and
 then tell them which events they should process.

 The object is a callable (operator()) and keeps its own copy of the list of
 watchdog pipes, so it stays valid even if it is copied or outlives the
 vector it was constructed from. */
class MessageSenderToSource {
public:
  // childrenSockets:   parent-side descriptors on which children request work
  // childrenPipes:     parent-side descriptors of the per-child watchdog pipes
  // iNEventsToProcess: number of event indices handed out per request
  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
  void operator()();

private:
  std::vector<int> const m_childrenPipes; // held by value: no dangling reference when the functor is copied
  long const m_nEventsToProcess;
  fd_set m_socketSet;                     // every socket and pipe still being watched
  unsigned int m_aliveChildren;           // children whose watchdog pipe has not yet closed
  int m_maxFd;                            // highest watched descriptor + 1, as required by select()
};
767 
768  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
769  std::vector<int> const& childrenPipes,
770  long iNEventsToProcess):
771  m_childrenPipes(childrenPipes),
772  m_nEventsToProcess(iNEventsToProcess),
773  m_aliveChildren(childrenSockets.size()),
774  m_maxFd(0)
775  {
776  FD_ZERO(&m_socketSet);
777  for (auto const socket : childrenSockets) {
778  FD_SET(socket, &m_socketSet);
779  if (socket > m_maxFd) {
780  m_maxFd = socket;
781  }
782  }
783  for (auto const pipe : childrenPipes) {
784  FD_SET(pipe, &m_socketSet);
785  if (pipe > m_maxFd) {
786  m_maxFd = pipe;
787  }
788  }
789  m_maxFd++; // select reads [0,m_maxFd).
790  }
791 
792  /* This function is the heart of the communication between parent and child.
793  * When ready for more data, the child (see MessageReceiverForSource) requests
794  * data through a AF_UNIX socket message. The parent will then assign the next
795  * chunk of data by sending a message back.
796  *
797  * Additionally, this function also monitors the read-side of the pipe fd from the child.
798  * If the child dies unexpectedly, the pipe will be selected as ready for read and
799  * will return EPIPE when read from. Further, if the child thinks the parent has died
800  * (defined as waiting more than 1s for a response), it will write a single byte to
801  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
802  * If still alive, the parent will read the byte and ignore it.
803  *
804  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
805  * handler can distinguish between success and failure cases.
806  */
807 
808  void
809  MessageSenderToSource::operator()() {
811  LogInfo("ForkingController") << "I am controller";
812  //this is the master and therefore the controller
813 
815  sndmsg.startIndex = 0;
816  sndmsg.nIndices = m_nEventsToProcess;
817  do {
818 
819  fd_set readSockets, errorSockets;
820  // Wait for a request from a child for events.
821  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
822  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
823  // Note that we don't timeout; may be reconsidered in the future.
824  ssize_t rc;
825  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
826  if (rc < 0) {
827  std::cerr << "select failed; should be impossible due to preconditions.\n";
828  abort();
829  break;
830  }
831 
832  // Read the message from the child.
833  for (int idx=0; idx<m_maxFd; idx++) {
834 
835  // Handle errors
836  if (FD_ISSET(idx, &errorSockets)) {
837  LogInfo("ForkingController") << "Error on socket " << idx;
838  FD_CLR(idx, &m_socketSet);
839  close(idx);
840  // See if it was the watchdog pipe that died.
841  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
842  if (*it == idx) {
843  m_aliveChildren--;
844  }
845  }
846  continue;
847  }
848 
849  if (!FD_ISSET(idx, &readSockets)) {
850  continue;
851  }
852 
853  // See if this FD is a child watchdog pipe. If so, read from it to prevent
854  // writes from blocking.
855  bool is_pipe = false;
856  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
857  if (*it == idx) {
858  is_pipe = true;
859  char buf;
860  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
861  if (rc <= 0) {
862  m_aliveChildren--;
863  FD_CLR(idx, &m_socketSet);
864  close(idx);
865  }
866  }
867  }
868 
869  // Only execute this block if the FD is a socket for sending the child work.
870  if (!is_pipe) {
871  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
872  if (rc < 0) {
873  FD_CLR(idx, &m_socketSet);
874  close(idx);
875  continue;
876  }
877 
878  // Tell the child what events to process.
879  // If 'send' fails, then the child process has failed (any other possibilities are
880  // eliminated because we are using fixed-size messages with Unix datagram sockets).
881  // Thus, the SIGCHLD handler will fire and set child_failed = true.
882  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
883  if (rc < 0) {
884  FD_CLR(idx, &m_socketSet);
885  close(idx);
886  continue;
887  }
888  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
889  sndmsg.startIndex += sndmsg.nIndices;
890  }
891  }
892 
893  } while (m_aliveChildren > 0);
894 
895  return;
896  }
897 
898  }
899 
900 
902  if(child_failed && continueAfterChildFailure_) {
903  if (child_fail_signal) {
904  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
905  child_fail_signal=0;
906  } else if (child_fail_exit_status) {
907  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
908  child_fail_exit_status=0;
909  } else {
910  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
911  }
912  child_failed =false;
913  }
914  }
915 
916  bool
917  EventProcessor::forkProcess(std::string const& jobReportFile) {
918 
919  if(0 == numberOfForkedChildren_) {return true;}
920  assert(0<numberOfForkedChildren_);
921  //do what we want done in common
922  {
923  beginJob(); //make sure this was run
924  // make the services available
926 
927  InputSource::ItemType itemType;
928  itemType = input_->nextItemType();
929 
930  assert(itemType == InputSource::IsFile);
931  {
932  readFile();
933  }
934  itemType = input_->nextItemType();
935  assert(itemType == InputSource::IsRun);
936 
937  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
938  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
939  input_->runAuxiliary()->beginTime());
940  espController_->eventSetupForInstance(ts);
941  EventSetup const& es = esp_->eventSetup();
942 
943  //now get all the data available in the EventSetup
944  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
945  es.fillAvailableRecordKeys(recordKeys);
946  std::vector<eventsetup::DataKey> dataKeys;
947  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
948  itKey != itEnd;
949  ++itKey) {
950  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
951  //see if this is on our exclusion list
952  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
953  ExcludedData const* excludedData(nullptr);
954  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
955  excludedData = &(itExcludeRec->second);
956  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
957  //skip all items in this record
958  continue;
959  }
960  }
961  if(0 != recordPtr) {
962  dataKeys.clear();
963  recordPtr->fillRegisteredDataKeys(dataKeys);
964  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
965  itDataKey != itDataKeyEnd;
966  ++itDataKey) {
967  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
968  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
969  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
970  continue;
971  }
972  try {
973  recordPtr->doGet(*itDataKey);
974  } catch(cms::Exception& e) {
975  LogWarning("ForkingEventSetupPreFetching") << e.what();
976  }
977  }
978  }
979  }
980  }
981  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
982  {
983  // make the services available
985  Service<JobReport> jobReport;
986  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
987 
988  //Now actually do the forking
989  actReg_->preForkReleaseResourcesSignal_();
990  input_->doPreForkReleaseResources();
991  schedule_->preForkReleaseResources();
992  }
993  installCustomHandler(SIGCHLD, ep_sigchld);
994 
995 
996  unsigned int childIndex = 0;
997  unsigned int const kMaxChildren = numberOfForkedChildren_;
998  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
999  std::vector<pid_t> childrenIds;
1000  childrenIds.reserve(kMaxChildren);
1001  std::vector<int> childrenSockets;
1002  childrenSockets.reserve(kMaxChildren);
1003  std::vector<int> childrenPipes;
1004  childrenPipes.reserve(kMaxChildren);
1005  std::vector<int> childrenSocketsCopy;
1006  childrenSocketsCopy.reserve(kMaxChildren);
1007  std::vector<int> childrenPipesCopy;
1008  childrenPipesCopy.reserve(kMaxChildren);
1009  int pipes[] {0, 0};
1010 
1011  {
1012  // make the services available
1014  Service<JobReport> jobReport;
1015  int sockets[2], fd_flags;
1016  for(; childIndex < kMaxChildren; ++childIndex) {
1017  // Create a UNIX_DGRAM socket pair
1018  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1019  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  if (pipe(pipes)) {
1023  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1027  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1028  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1029  exit(EXIT_FAILURE);
1030  }
1031  // Mark socket as non-block. Child must be careful to do select prior
1032  // to reading from socket.
1033  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1034  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1035  exit(EXIT_FAILURE);
1036  }
1037  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1038  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1042  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1043  exit(EXIT_FAILURE);
1044  }
1045  // Linux man page notes there are some edge cases where reading from a
1046  // fd can block, even after a select.
1047  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1048  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1049  exit(EXIT_FAILURE);
1050  }
1051  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1052  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1053  exit(EXIT_FAILURE);
1054  }
1055 
1056  childrenPipesCopy = childrenPipes;
1057  childrenSocketsCopy = childrenSockets;
1058 
1059  pid_t value = fork();
1060  if(value == 0) {
1061  // Close the parent's side of the socket and pipe which will talk to us.
1062  close(pipes[0]);
1063  close(sockets[0]);
1064  // Close our copies of the parent's other communication pipes.
1065  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1066  close(*it);
1067  }
1068  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1069  close(*it);
1070  }
1071 
1072  // this is the child process, redirect stdout and stderr to a log file
1073  fflush(stdout);
1074  fflush(stderr);
1075  std::stringstream stout;
1076  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1077  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1078  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1079  }
1080  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1081  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1082  }
1083 
1084  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1085  if(setCpuAffinity_) {
1086  // CPU affinity is handled differently on macosx.
1087  // We disable it and print a message until someone reads:
1088  //
1089  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1090  //
1091  // and implements it.
1092 #ifdef __APPLE__
1093  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1094 #else
1095  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1096  cpu_set_t mask;
1097  CPU_ZERO(&mask);
1098  CPU_SET(childIndex, &mask);
1099  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1100  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1101  exit(-1);
1102  }
1103 #endif
1104  }
1105  break;
1106  } else {
1107  //this is the parent
1108  close(pipes[1]);
1109  close(sockets[1]);
1110  }
1111  if(value < 0) {
1112  LogError("ForkingChild") << "failed to create a child";
1113  exit(-1);
1114  }
1115  childrenIds.push_back(value);
1116  childrenSockets.push_back(sockets[0]);
1117  childrenPipes.push_back(pipes[0]);
1118  }
1119 
1120  if(childIndex < kMaxChildren) {
1121  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1122  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1123 
1124  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1125  input_->doPostForkReacquireResources(receiver);
1126  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1127  //NOTE: sources have to reset themselves by listening to the post fork message
1128  //rewindInput();
1129  return true;
1130  }
1131  jobReport->parentAfterFork(jobReportFile);
1132  }
1133 
1134  //this is the original, which is now the master for all the children
1135 
1136  //Need to wait for signals from the children or externally
1137  // To wait we must
1138  // 1) block the signals we want to wait on so we do not have a race condition
1139  // 2) check that we haven't already met our ending criteria
1140  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1141  sigset_t blockingSigSet;
1142  sigset_t unblockingSigSet;
1143  sigset_t oldSigSet;
1144  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1145  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1146  sigaddset(&blockingSigSet, SIGCHLD);
1147  sigaddset(&blockingSigSet, SIGUSR2);
1148  sigaddset(&blockingSigSet, SIGINT);
1149  sigdelset(&unblockingSigSet, SIGCHLD);
1150  sigdelset(&unblockingSigSet, SIGUSR2);
1151  sigdelset(&unblockingSigSet, SIGINT);
1152  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1153 
1154  // If there are too many fd's (unlikely, but possible) for select, denote this
1155  // because the sender will fail.
1156  bool too_many_fds = false;
1157  if (pipes[1]+1 > FD_SETSIZE) {
1158  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1159  too_many_fds = true;
1160  }
1161 
1162  //create a thread that sends the units of work to workers
1163  // we create it after all signals were blocked so that this
1164  // thread is never interrupted by a signal
1165  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1166  boost::thread senderThread(sender);
1167 
1168  if(not too_many_fds) {
1169  //NOTE: a child could have failed before we got here and even after this call
1170  // which is why the 'if' is conditional on continueAfterChildFailure_
1172  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1173  sigsuspend(&unblockingSigSet);
1175  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1176  }
1177  }
1178  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1179 
1180  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1181  if(child_failed) {
1182  LogError("ForkingStopping") << "child failed";
1183  }
1184  if(shutdown_flag) {
1185  LogSystem("ForkingStopping") << "asked to shutdown";
1186  }
1187 
1188  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1189  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1190  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1191  it != itEnd; ++it) {
1192  /* int result = */ kill(*it, SIGUSR2);
1193  }
1194  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1195  while(num_children_done != kMaxChildren) {
1196  sigsuspend(&unblockingSigSet);
1197  }
1198  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1199  }
1200  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1201  senderThread.join();
1202  if(child_failed && !continueAfterChildFailure_) {
1203  if (child_fail_signal) {
1204  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1205  } else if (child_fail_exit_status) {
1206  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1207  } else {
1208  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1209  }
1210  }
1211  if(too_many_fds) {
1212  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1213  }
1214  return false;
1215  }
1216 
1217  std::vector<ModuleDescription const*>
1219  return schedule_->getAllModuleDescriptions();
1220  }
1221 
1222  int
1224  return schedule_->totalEvents();
1225  }
1226 
1227  int
1229  return schedule_->totalEventsPassed();
1230  }
1231 
1232  int
1234  return schedule_->totalEventsFailed();
1235  }
1236 
1237  void
1239  schedule_->enableEndPaths(active);
1240  }
1241 
1242  bool
1244  return schedule_->endPathsEnabled();
1245  }
1246 
1247  void
1249  schedule_->getTriggerReport(rep);
1250  }
1251 
1252  void
1254  schedule_->clearCounters();
1255  }
1256 
1257 
1258  std::unique_ptr<statemachine::Machine>
1260  statemachine::FileMode fileMode;
1261  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1262  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1263  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1264  else {
1265  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1266  << fileMode_ << ".\n"
1267  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1268  }
1269 
1270  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1271  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1272  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1273  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1274  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1275  else {
1276  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1277  << emptyRunLumiMode_ << ".\n"
1278  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1279  }
1280 
1281  auto machine = std::make_unique<statemachine::Machine>(
1282  this,
1283  fileMode,
1284  emptyRunLumiMode);
1285 
1286  machine->initiate();
1287  return machine;
1288  }
1289 
1290  bool
1292  bool returnValue = false;
1293 
1294  // Look for a shutdown signal
1295  if(shutdown_flag.load(std::memory_order_acquire)) {
1296  returnValue = true;
1297  returnCode = epSignal;
1298  }
1299  return returnValue;
1300  }
1301 
1302 
1305 
1308  std::unique_ptr<statemachine::Machine> machine;
1309  {
1310  beginJob(); //make sure this was called
1311 
1312  //StatusCode returnCode = epSuccess;
1314 
1315  // make the services available
1317 
1318  machine = createStateMachine();
1321  try {
1322  convertException::wrap([&]() {
1323 
1324  InputSource::ItemType itemType;
1325 
1326  while(true) {
1327 
1328  bool more = true;
1329  if(numberOfForkedChildren_ > 0) {
1330  size_t size = preg_->size();
1331  {
1332  SendSourceTerminationSignalIfException sentry(actReg_.get());
1333  more = input_->skipForForking();
1334  sentry.completedSuccessfully();
1335  }
1336  if(more) {
1337  if(size < preg_->size()) {
1339  }
1341  }
1342  }
1343  {
1344  SendSourceTerminationSignalIfException sentry(actReg_.get());
1345  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1346  sentry.completedSuccessfully();
1347  }
1348 
1349  FDEBUG(1) << "itemType = " << itemType << "\n";
1350 
1351  if(checkForAsyncStopRequest(returnCode)) {
1352  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1353  forceLooperToEnd_ = true;
1354  machine->process_event(statemachine::Stop());
1355  forceLooperToEnd_ = false;
1356  break;
1357  }
1358 
1359  if(itemType == InputSource::IsEvent) {
1360  machine->process_event(statemachine::Event());
1362  forceLooperToEnd_ = true;
1363  machine->process_event(statemachine::Stop());
1364  forceLooperToEnd_ = false;
1366  break;
1367  }
1369  }
1370 
1371  if(itemType == InputSource::IsEvent) {
1372  }
1373  else if(itemType == InputSource::IsStop) {
1374  machine->process_event(statemachine::Stop());
1375  }
1376  else if(itemType == InputSource::IsFile) {
1377  machine->process_event(statemachine::File());
1378  }
1379  else if(itemType == InputSource::IsRun) {
1380  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1381  }
1382  else if(itemType == InputSource::IsLumi) {
1383  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1384  }
1385  else if(itemType == InputSource::IsSynchronize) {
1386  //For now, we don't have to do anything
1387  }
1388  // This should be impossible
1389  else {
1391  << "Unknown next item type passed to EventProcessor\n"
1392  << "Please report this error to the Framework group\n";
1393  }
1394  if(machine->terminated()) {
1395  break;
1396  }
1397  } // End of loop over state machine events
1398  }); // convertException::wrap
1399  } // Try block
1400  // Some comments on exception handling related to the boost state machine:
1401  //
1402  // Some states used in the machine are special because they
1403  // perform actions while the machine is being terminated, actions
1404  // such as close files, call endRun, call endLumi etc ... Each of these
1405  // states has two functions that perform these actions. The functions
1406  // are almost identical. The major difference is that one version
1407  // catches all exceptions and the other lets exceptions pass through.
1408  // The destructor catches them and the other function named "exit" lets
1409  // them pass through. On a normal termination, boost will always call
1410  // "exit" and then the state destructor. In our state classes, the
1411  // destructors do nothing if the exit function already took
1412  // care of things. Here's the interesting part. When boost is
1413  // handling an exception the "exit" function is not called (a boost
1414  // feature).
1415  //
1416  // If an exception occurs while the boost machine is in control
1417  // (which usually means inside a process_event call), then
1418  // the boost state machine destroys its states and "terminates" itself.
1419  // This is already done before we hit the catch blocks below. In this case
1420  // the call to terminateMachine below only destroys an already
1421  // terminated state machine. Because exit is not called, the state destructors
1422  // handle cleaning up lumis, runs, and files. The destructors swallow
1423  // all exceptions and only pass through the exception messages, which
1424  // are tacked onto the original exception below.
1425  //
1426  // If an exception occurs when the boost state machine is not
1427  // in control (outside the process_event functions), then boost
1428  // cannot destroy its own states. The terminateMachine function
1429  // below takes care of that. The flag "alreadyHandlingException"
1430  // is set true so that the state exit functions do nothing (and
1431  // cannot throw more exceptions while handling the first). Then the
1432  // state destructors take care of this because exit did nothing.
1433  //
1434  // In both cases above, the EventProcessor::endOfLoop function is
1435  // not called because it can throw exceptions.
1436  //
1437  // One tricky aspect of the state machine is that things that can
1438  // throw should not be invoked by the state machine while another
1439  // exception is being handled.
1440  // Another tricky aspect is that it appears to be important to
1441  // terminate the state machine before invoking its destructor.
1442  // We've seen crashes that are not understood when that is not
1443  // done. Maintainers of this code should be careful about this.
1444 
1445  catch (cms::Exception & e) {
1447  terminateMachine(std::move(machine));
1448  alreadyHandlingException_ = false;
1449  if (!exceptionMessageLumis_.empty()) {
1451  if (e.alreadyPrinted()) {
1452  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1453  }
1454  }
1455  if (!exceptionMessageRuns_.empty()) {
1457  if (e.alreadyPrinted()) {
1458  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1459  }
1460  }
1461  if (!exceptionMessageFiles_.empty()) {
1463  if (e.alreadyPrinted()) {
1464  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1465  }
1466  }
1467  throw;
1468  }
1469 
1470  if(machine->terminated()) {
1471  FDEBUG(1) << "The state machine reports it has been terminated\n";
1472  machine.reset();
1473  }
1474 
1476  throw cms::Exception("BadState")
1477  << "The boost state machine in the EventProcessor exited after\n"
1478  << "entering the Error state.\n";
1479  }
1480 
1481  }
1482  if(machine.get() != nullptr) {
1483  terminateMachine(std::move(machine));
1485  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1486  << "Please report this error to the Framework group\n";
1487  }
1488 
1489  return returnCode;
1490  }
1491 
1493  FDEBUG(1) << " \treadFile\n";
1494  size_t size = preg_->size();
1495  SendSourceTerminationSignalIfException sentry(actReg_.get());
1496 
1497  fb_ = input_->readFile();
1498  if(size < preg_->size()) {
1500  }
1502  if((numberOfForkedChildren_ > 0) or
1505  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1506  }
1507  sentry.completedSuccessfully();
1508  }
1509 
1510  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1511  if (fb_.get() != nullptr) {
1512  SendSourceTerminationSignalIfException sentry(actReg_.get());
1513  input_->closeFile(fb_.get(), cleaningUpAfterException);
1514  sentry.completedSuccessfully();
1515  }
1516  FDEBUG(1) << "\tcloseInputFile\n";
1517  }
1518 
1520  if (fb_.get() != nullptr) {
1521  schedule_->openOutputFiles(*fb_);
1522  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1523  }
1524  FDEBUG(1) << "\topenOutputFiles\n";
1525  }
1526 
1528  if (fb_.get() != nullptr) {
1529  schedule_->closeOutputFiles();
1530  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1531  }
1532  FDEBUG(1) << "\tcloseOutputFiles\n";
1533  }
1534 
1536  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1537  if (fb_.get() != nullptr) {
1538  schedule_->respondToOpenInputFile(*fb_);
1539  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1540  }
1541  FDEBUG(1) << "\trespondToOpenInputFile\n";
1542  }
1543 
1545  if (fb_.get() != nullptr) {
1546  schedule_->respondToCloseInputFile(*fb_);
1547  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1548  }
1549  FDEBUG(1) << "\trespondToCloseInputFile\n";
1550  }
1551 
1553  shouldWeStop_ = false;
1554  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1555  // until after we've called beginOfJob
1556  if(looper_ && looperBeginJobRun_) {
1557  looper_->doStartingNewLoop();
1558  }
1559  FDEBUG(1) << "\tstartingNewLoop\n";
1560  }
1561 
1563  if(looper_) {
1564  ModuleChanger changer(schedule_.get(),preg_.get());
1565  looper_->setModuleChanger(&changer);
1566  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1567  looper_->setModuleChanger(nullptr);
1568  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1569  else return false;
1570  }
1571  FDEBUG(1) << "\tendOfLoop\n";
1572  return true;
1573  }
1574 
1576  input_->repeat();
1577  input_->rewind();
1578  FDEBUG(1) << "\trewind\n";
1579  }
1580 
1582  looper_->prepareForNextLoop(esp_.get());
1583  FDEBUG(1) << "\tprepareForNextLoop\n";
1584  }
1585 
1587  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1588  if(!subProcesses_.empty()) {
1589  for(auto const& subProcess : subProcesses_) {
1590  if(subProcess.shouldWeCloseOutput()) {
1591  return true;
1592  }
1593  }
1594  return false;
1595  }
1596  return schedule_->shouldWeCloseOutput();
1597  }
1598 
1600  FDEBUG(1) << "\tdoErrorStuff\n";
1601  LogError("StateMachine")
1602  << "The EventProcessor state machine encountered an unexpected event\n"
1603  << "and went to the error state\n"
1604  << "Will attempt to terminate processing normally\n"
1605  << "(IF using the looper the next loop will be attempted)\n"
1606  << "This likely indicates a bug in an input module or corrupted input or both\n";
1608  }
1609 
1611  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1612  {
1613  SendSourceTerminationSignalIfException sentry(actReg_.get());
1614 
1615  input_->doBeginRun(runPrincipal, &processContext_);
1616  sentry.completedSuccessfully();
1617  }
1618 
1619  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1620  runPrincipal.beginTime());
1622  espController_->forceCacheClear();
1623  }
1624  {
1625  SendSourceTerminationSignalIfException sentry(actReg_.get());
1626  espController_->eventSetupForInstance(ts);
1627  sentry.completedSuccessfully();
1628  }
1629  EventSetup const& es = esp_->eventSetup();
1630  if(looper_ && looperBeginJobRun_== false) {
1631  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1632  looper_->beginOfJob(es);
1633  looperBeginJobRun_ = true;
1634  looper_->doStartingNewLoop();
1635  }
1636  {
1638  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1639  for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1640  }
1641  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1642  if(looper_) {
1643  looper_->doBeginRun(runPrincipal, es, &processContext_);
1644  }
1645  {
1646  //To wait, the ref count has to be 1+#streams
1647  auto streamLoopWaitTask = make_empty_waiting_task();
1648  streamLoopWaitTask->increment_ref_count();
1649 
1651 
1652  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1653  *schedule_,
1655  runPrincipal,
1656  ts,
1657  es,
1658  subProcesses_);
1659 
1660  streamLoopWaitTask->wait_for_all();
1661  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1662  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1663  }
1664  }
1665  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1666  if(looper_) {
1667  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1668  }
1669  }
1670 
1671  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1672  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1673  {
1674  SendSourceTerminationSignalIfException sentry(actReg_.get());
1675 
1676  runPrincipal.setEndTime(input_->timestamp());
1677  runPrincipal.setComplete();
1678  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1679  sentry.completedSuccessfully();
1680  }
1681 
1683  runPrincipal.endTime());
1684  {
1685  SendSourceTerminationSignalIfException sentry(actReg_.get());
1686  espController_->eventSetupForInstance(ts);
1687  sentry.completedSuccessfully();
1688  }
1689  EventSetup const& es = esp_->eventSetup();
1690  {
1691  //To wait, the ref count has to be 1+#streams
1692  auto streamLoopWaitTask = make_empty_waiting_task();
1693  streamLoopWaitTask->increment_ref_count();
1694 
1696 
1697  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1698  *schedule_,
1700  runPrincipal,
1701  ts,
1702  es,
1703  subProcesses_,
1704  cleaningUpAfterException);
1705 
1706  streamLoopWaitTask->wait_for_all();
1707  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1708  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1709  }
1710  }
1711  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1712  if(looper_) {
1713  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1714  }
1715  {
1717  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1718  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1719  }
1720  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1721  if(looper_) {
1722  looper_->doEndRun(runPrincipal, es, &processContext_);
1723  }
1724  }
1725 
1727  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1728  {
1729  SendSourceTerminationSignalIfException sentry(actReg_.get());
1730 
1731  input_->doBeginLumi(lumiPrincipal, &processContext_);
1732  sentry.completedSuccessfully();
1733  }
1734 
1736  if(rng.isAvailable()) {
1737  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1738  rng->preBeginLumi(lb);
1739  }
1740 
1741  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1742  // lumi blocks know their start and end times why not also start and end events?
1743  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1744  {
1745  SendSourceTerminationSignalIfException sentry(actReg_.get());
1746  espController_->eventSetupForInstance(ts);
1747  sentry.completedSuccessfully();
1748  }
1749  EventSetup const& es = esp_->eventSetup();
1750  {
1752  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1753  for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1754  }
1755  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1756  if(looper_) {
1757  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1758  }
1759  {
1760  //To wait, the ref count has to be 1+#streams
1761  auto streamLoopWaitTask = make_empty_waiting_task();
1762  streamLoopWaitTask->increment_ref_count();
1763 
1765 
1766  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1767  *schedule_,
1769  lumiPrincipal,
1770  ts,
1771  es,
1772  subProcesses_);
1773  streamLoopWaitTask->wait_for_all();
1774  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1775  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1776  }
1777  }
1778 
1779  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1780  if(looper_) {
1781  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1782  }
1783  }
1784 
1785  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1786  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1787  {
1788  SendSourceTerminationSignalIfException sentry(actReg_.get());
1789 
1790  lumiPrincipal.setEndTime(input_->timestamp());
1791  lumiPrincipal.setComplete();
1792  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1793  sentry.completedSuccessfully();
1794  }
1795  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1796  // lumi blocks know their start and end times why not also start and end events?
1797  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1798  lumiPrincipal.endTime());
1799  {
1800  SendSourceTerminationSignalIfException sentry(actReg_.get());
1801  espController_->eventSetupForInstance(ts);
1802  sentry.completedSuccessfully();
1803  }
1804  EventSetup const& es = esp_->eventSetup();
1805  {
1806  //To wait, the ref count has to be 1+#streams
1807  auto streamLoopWaitTask = make_empty_waiting_task();
1808  streamLoopWaitTask->increment_ref_count();
1809 
1811 
1812  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1813  *schedule_,
1815  lumiPrincipal,
1816  ts,
1817  es,
1818  subProcesses_,
1819  cleaningUpAfterException);
1820  streamLoopWaitTask->wait_for_all();
1821  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1822  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1823  }
1824  }
1825  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1826  if(looper_) {
1827  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1828  }
1829  {
1831  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1832  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1833  }
1834  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1835  if(looper_) {
1836  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1837  }
1838  }
1839 
1843  << "EventProcessor::readRun\n"
1844  << "Illegal attempt to insert run into cache\n"
1845  << "Contact a Framework Developer\n";
1846  }
1847  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1848  {
1849  SendSourceTerminationSignalIfException sentry(actReg_.get());
1850  input_->readRun(*rp, *historyAppender_);
1851  sentry.completedSuccessfully();
1852  }
1853  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1854  principalCache_.insert(rp);
1855  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1856  }
1857 
1859  principalCache_.merge(input_->runAuxiliary(), preg());
1860  auto runPrincipal =principalCache_.runPrincipalPtr();
1861  {
1862  SendSourceTerminationSignalIfException sentry(actReg_.get());
1863  input_->readAndMergeRun(*runPrincipal);
1864  sentry.completedSuccessfully();
1865  }
1866  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1867  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1868  }
1869 
1873  << "EventProcessor::readRun\n"
1874  << "Illegal attempt to insert lumi into cache\n"
1875  << "Contact a Framework Developer\n";
1876  }
1879  << "EventProcessor::readRun\n"
1880  << "Illegal attempt to insert lumi into cache\n"
1881  << "Run is invalid\n"
1882  << "Contact a Framework Developer\n";
1883  }
1884  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1885  {
1886  SendSourceTerminationSignalIfException sentry(actReg_.get());
1887  input_->readLuminosityBlock(*lbp, *historyAppender_);
1888  sentry.completedSuccessfully();
1889  }
1890  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1891  principalCache_.insert(lbp);
1892  return input_->luminosityBlock();
1893  }
1894 
1896  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1897  {
1898  SendSourceTerminationSignalIfException sentry(actReg_.get());
1899  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1900  sentry.completedSuccessfully();
1901  }
1902  return input_->luminosityBlock();
1903  }
1904 
1907  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1908  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1909  }
1910 
1913  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1914  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1915  }
1916 
1918  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1919  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1920  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1921  }
1922 
1924  principalCache_.deleteLumi(phid, run, lumi);
1925  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1926  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1927  }
1928 
1929  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1930  std::atomic<bool>* finishedProcessingEvents) {
1931  if(shouldWeStop()) {
1932  return false;
1933  }
1934 
1935  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1936  return false;
1937  }
1938 
1939  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1940  return false;
1941  }
1942 
1946  handler->initializeThisThreadForUse();
1947  }
1948 
1949  try {
1950  //need to use lock in addition to the serial task queue because
1951  // of delayed provenance reading and reading data in response to
1952  // edm::Refs etc
1953  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1954  if(not firstEventInBlock_) {
1955  //The state machine already called input_->nextItemType
1956  // and found an event. We can't call input_->nextItemType
1957  // again since it would move to the next transition
1958  InputSource::ItemType itemType = input_->nextItemType();
1959  if (InputSource::IsEvent !=itemType) {
1961  finishedProcessingEvents->store(true,std::memory_order_release);
1962  //std::cerr<<"next item type "<<itemType<<"\n";
1963  return false;
1964  }
1966  //std::cerr<<"task told to async stop\n";
1967  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1968  return false;
1969  }
1970  } else {
1971  firstEventInBlock_ = false;
1972  }
1973  readEvent(iStreamIndex);
1974  } catch (...) {
1975  bool expected =false;
1976  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1977  deferredExceptionPtr_ = std::current_exception();
1978 
1979  }
1980  return false;
1981  }
1982  return true;
1983  }
1984 
// Parameter list and body of handleNextEventForStreamAsync (name taken from
// the recursive call inside recursionTask below).
// NOTE(review): listing line 1985, which should carry the return type, the
// function name and the first parameter (used below as iTask), is missing
// from this listing, as is listing line 2008.
//
// Pushes onto sourceResourcesAcquirer_.serialQueueChain() a functor that calls
// readNextEventForStream(). When that returns true the event is passed to
// processEventAsync() together with recursionTask, and recursionTask calls
// this function again, so the stream keeps going. When it returns false, or
// when recursionTask is given an exception, the stream stops and iTask's
// reference count is decremented.
1986  unsigned int iStreamIndex,
1987  std::atomic<bool>* finishedProcessingEvents)
1988  {
      // Task that either reports an exception to iTask or starts the next
      // round for this stream.
1989  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
1990  if(iPtr) {
      // Only the caller that flips the flag stores the exception and
      // forwards it to iTask.
1991  bool expected = false;
1992  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1993  deferredExceptionPtr_ = *iPtr;
1994  {
1995  WaitingTaskHolder h(iTask);
1996  h.doneWaiting(*iPtr);
1997  }
1998  }
1999  //the stream will stop now
2000  iTask->decrement_ref_count();
2001  return;
2002  }
2003 
      // No exception: look for the next event on the same stream.
2004  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2005  });
2006 
2007  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
      // NOTE(review): listing line 2008 is missing here.
2009 
2010  try {
2011  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2012  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2013  } else {
2014  //the stream will stop now
      // recursionTask is not handed to processEventAsync on this path, so it
      // is destroyed here.
2015  tbb::task::destroy(*recursionTask);
2016  iTask->decrement_ref_count();
2017  }
2018  } catch(...) {
      // Hand the exception to recursionTask; presumably this runs its lambda
      // with a non-null iPtr -- confirm against WaitingTaskHolder.
2019  WaitingTaskHolder h(recursionTask);
2020  h.doneWaiting(std::current_exception());
2021  }
2022  });
2023  }
2024 
// NOTE(review): the signature line (listing line 2025) is missing from this
// listing; presumably this is the body of EventProcessor::readAndProcessEvent()
// -- confirm against the real source. Listing lines 2037-2038 and 2066 are
// missing as well.
//
// With forked children (numberOfForkedChildren_ > 0) exactly one event is read
// and processed on stream 0 and the function returns. Otherwise one task per
// stream is started, each calling handleNextEventForStreamAsync(), and the
// function blocks in spawn_and_wait_for_all(); afterwards
// deferredExceptionPtr_ is rethrown on the branch at the end.
2026  if(numberOfForkedChildren_>0) {
2027  //Have to do something special for forking since
2028  // after each event the system may have to skip
2029  // some transitions. This is handled in runToCompletion
2030  readEvent(0);
2031  auto eventLoopWaitTask = make_empty_waiting_task();
2032  eventLoopWaitTask->increment_ref_count();
2033  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2034  eventLoopWaitTask->wait_for_all();
2035  return;
2036  }
      // NOTE(review): listing lines 2037-2038 are missing here.
2039 
      // Local flag; its address is handed to every stream task started below.
2040  std::atomic<bool> finishedProcessingEvents{false};
2041  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2042 
2043  //The state machine already found the event so
2044  // we have to avoid looking again
2045  firstEventInBlock_ = true;
2046 
2047  //To wait, the ref count has to be 1+#streams
2048  auto eventLoopWaitTask = make_empty_waiting_task();
2049  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2050  eventLoopWaitTask->increment_ref_count();
2051 
2052  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2053  unsigned int iStreamIndex = 0;
      // Enqueue a task for every stream except the last one.
      // NOTE(review): kNumStreams-1 is unsigned arithmetic, so this assumes
      // numberOfStreams() is at least 1.
2054  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2055  eventLoopWaitTask->increment_ref_count();
2056  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2057  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2058  }) );
2059  }
      // iStreamIndex now names the last stream; its task is spawned through
      // spawn_and_wait_for_all(), which is where this function waits.
2060  eventLoopWaitTask->increment_ref_count();
2061  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2062  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2063  }));
2064 
2065  //One of the processing threads saw an exception
      // NOTE(review): listing line 2066, which opens this branch, is missing;
      // its condition is not visible here.
2067  std::rethrow_exception(deferredExceptionPtr_);
2068  }
2069  }
// Reads the next event from input_ into the event principal that
// principalCache_ holds for stream iStreamIndex.
//
// If input_->readEvent() throws, the sentry's destructor emits
// preSourceEarlyTerminationSignal_(ExceptionFromThisContext) on actReg_ while
// the exception propagates to the caller.
2070  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2071  //TODO this will have to become per stream
2072  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2073  StreamContext streamContext(event.streamID(), &processContext_);
2074 
      // The sentry signals early termination unless completedSuccessfully()
      // is reached, i.e. unless readEvent() returns normally.
2075  SendSourceTerminationSignalIfException sentry(actReg_.get());
2076  input_->readEvent(event, streamContext);
2077  sentry.completedSuccessfully();
2078 
2079  FDEBUG(1) << "\treadEvent\n";
2080  }
2081 
// Last parameter and body of processEventAsync (name taken from the call sites
// elsewhere in this file).
// NOTE(review): listing line 2082, which should carry the function name and
// the first parameter (used below as iHolder), is missing from this listing,
// as are listing lines 2086, 2099 and 2127.
//
// Attaches the current luminosity block principal to the stream's event
// principal, then starts schedule_->processOneEventAsync() with a task chain:
// an optional step that runs the SubProcesses, followed by finalizeEventTask,
// which runs the looper (if any), clears the event principal and signals
// iHolder.
2083  unsigned int iStreamIndex) {
2084  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2085  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
      // NOTE(review): listing line 2086 is missing; the declaration of `rng`
      // is not visible here.
2087  if(rng.isAvailable()) {
2088  Event ev(*pep, ModuleDescription(), nullptr);
2089  rng->postEventRead(ev);
2090  }
      // The event must belong to the run and luminosity block currently cached.
2091  assert(pep->luminosityBlockPrincipalPtrValid());
2092  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2093  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2094 
      // Final step for the event. The looper call and clearEventPrincipal()
      // happen before iPtr is inspected, so they run on the failure path too.
2095  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2096  tbb::task::allocate_root(),
2097  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2098  {
      // NOTE(review): listing line 2099 is missing here.
2100 
2101  //NOTE: If we have a looper we only have one Stream
2102  if(looper_) {
2103  processEventWithLooper(*pep);
2104  }
2105 
2106  FDEBUG(1) << "\tprocessEvent\n";
2107  pep->clearEventPrincipal();
      // Pass the exception on to iHolder, or an empty exception_ptr when
      // there is none.
2108  if(iPtr) {
2109  iHolder.doneWaiting(*iPtr);
2110  } else {
2111  iHolder.doneWaiting(std::exception_ptr());
2112  }
2113  }
2114  )
2115  );
2116  WaitingTaskHolder afterProcessTask;
2117  if(subProcesses_.empty()) {
      // No SubProcesses: the schedule hands over directly to finalizeEventTask.
2118  afterProcessTask = std::move(finalizeEventTask);
2119  } else {
2120  //Need to run SubProcesses after schedule has finished
2121  // with the event
2122  afterProcessTask = WaitingTaskHolder(
2123  make_waiting_task(tbb::task::allocate_root(),
2124  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2125  {
2126  if(not iPtr) {
      // NOTE(review): listing line 2127 is missing here.
2128 
2129  //when run with 1 thread, we want the order to be what
2130  // it was before. This requires reversing the order since
2131  // tasks are run last one in first one out
2132  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2133  subProcess.doEventAsync(finalizeEventTask,*pep);
2134  }
2135  } else {
      // The schedule reported an exception: skip the SubProcesses and
      // forward it to finalizeEventTask.
2136  finalizeEventTask.doneWaiting(*iPtr);
2137  }
2138  })
2139  );
2140  }
2141 
2142  schedule_->processOneEventAsync(std::move(afterProcessTask),
2143  iStreamIndex,*pep, esp_->eventSetup());
2144 
2145  }
2146 
// Body of processEventWithLooper (name taken from the call in the event
// finalization task elsewhere in this file).
// NOTE(review): the signature line (listing line 2147) is missing from this
// listing, so the declaration of the parameter used below as iPrincipal is not
// visible; listing lines 2153, 2161 and 2164 are missing as well.
//
// Calls looper_->doDuringLoop() for the event, repeating until
// pc.lastOperationSucceeded() is true, and sets shouldWeStop_ when the looper
// returns anything other than EDLooperBase::kContinue.
2148  bool randomAccess = input_->randomAccess();
2149  ProcessingController::ForwardState forwardState = input_->forwardState();
2150  ProcessingController::ReverseState reverseState = input_->reverseState();
2151  ProcessingController pc(forwardState, reverseState, randomAccess);
2152 
      // NOTE(review): listing line 2153 is missing; the declaration of
      // `status` is not visible here.
2154  do {
2155 
2156  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2157  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2158 
2159  bool succeeded = true;
      // Repositioning of the input is only attempted when the source
      // reports random access.
2160  if(randomAccess) {
      // NOTE(review): listing lines 2161 and 2164 are missing; the
      // conditions selecting between skipEvents(-2) and goToEvent(...) are
      // not visible here.
2162  input_->skipEvents(-2);
2163  }
2165  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2166  }
2167  }
      // Report the outcome back to the controller; a failure repeats the loop.
2168  pc.setLastOperationSucceeded(succeeded);
2169  } while(!pc.lastOperationSucceeded());
2170  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2171  }
2172 
// NOTE(review): the signature line (listing line 2173) is missing from this
// listing; going by the debug trace below this is the body of shouldWeStop().
//
// Returns true when shouldWeStop_ is set. Otherwise, when there are
// SubProcesses, returns true only if one of them reports terminate();
// schedule_->terminate() is consulted only when there are no SubProcesses.
2174  FDEBUG(1) << "\tshouldWeStop\n";
2175  if(shouldWeStop_) return true;
2176  if(!subProcesses_.empty()) {
2177  for(auto const& subProcess : subProcesses_) {
2178  if(subProcess.terminate()) {
2179  return true;
2180  }
2181  }
      // No SubProcess asked to terminate; schedule_ is not asked on this path.
2182  return false;
2183  }
2184  return schedule_->terminate();
2185  }
2186 
2189  }
2190 
2193  }
2194 
2197  }
2198 
2201  }
2202 
// Sends statemachine::Stop to iMachine unless it is null or already reports
// terminated(). The machine is taken by value as a unique_ptr, so it is
// destroyed when this function returns.
//
// forceLooperToEnd_ is true only for the duration of the process_event() call.
// NOTE(review): nothing restores forceLooperToEnd_ should process_event()
// throw; the flag would then stay true.
2203  void EventProcessor::terminateMachine(std::unique_ptr<statemachine::Machine> iMachine) {
2204  if(iMachine.get() != nullptr) {
2205  if(!iMachine->terminated()) {
2206  forceLooperToEnd_ = true;
2207  iMachine->process_event(statemachine::Stop());
2208  forceLooperToEnd_ = false;
2209  }
2210  else {
      // NOTE(review): the message below says "EventProcess"; it likely means
      // EventProcessor. Left as is because it is a runtime string.
2211  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2212  }
      // Checked again after the Stop event so the trace shows the final state.
2213  if(iMachine->terminated()) {
2214  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2215  }
2216  }
2217  }
2218 }
std::string emptyRunLumiMode_
size
Write out results.
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void openOutputFiles() override
virtual void setExceptionMessageLumis(std::string &message) override
SharedResourcesAcquirer sourceResourcesAcquirer_
virtual void doErrorStuff() override
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
virtual statemachine::Run readAndMergeRun() override
edm::propagate_const< std::unique_ptr< InputSource > > input_
virtual bool endOfLoop() override
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
static PFTauRenderPlugin instance
def destroy(e)
Definition: pyrootRender.py:13
ParameterSetID id() const
virtual bool shouldWeCloseOutput() const override
void possiblyContinueAfterForkChildFailure()
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual int readLuminosityBlock() override
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
#define NULL
Definition: scimark2.h:8
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException) override
bool ev
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool hasRunPrincipal() const
Definition: config.py:1
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
virtual void setExceptionMessageRuns(std::string &message) override
std::set< std::pair< std::string, std::string > > ExcludedData
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
virtual void startingNewLoop() override
virtual void rewindInput() override
bool alreadyPrinted() const
Definition: Exception.cc:251
bool forkProcess(std::string const &jobReportFile)
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:91
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
virtual bool shouldWeStop() const override
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual statemachine::Run readRun() override
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
virtual void prepareForNextLoop() override
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual bool alreadyHandlingException() const override
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:102
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
std::shared_ptr< edm::ParameterSet > parameterSet() const
virtual StatusCode runToCompletion() override
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:692
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void readFile() override
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void closeOutputFiles() override
HLT enums.
virtual void writeRun(statemachine::Run const &run) override
virtual void closeInputFile(bool cleaningUpAfterException) override
virtual void respondToOpenInputFile() override
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual int readAndMergeLumi() override
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
virtual void respondToCloseInputFile() override
virtual void readAndProcessEvent() override
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
virtual void deleteRunFromCache(statemachine::Run const &run) override
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
Definition: pipe.py:1
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
virtual void setExceptionMessageFiles(std::string &message) override
T get(const Candidate &c)
Definition: component.h:55
virtual void beginRun(statemachine::Run const &run) override
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31