CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
2 
9 
38 
40 
48 
53 
55 
67 
68 #include "MessageForSource.h"
69 #include "MessageForParent.h"
70 
71 #include "boost/thread/xtime.hpp"
72 #include "boost/range/adaptor/reversed.hpp"
73 
74 #include <exception>
75 #include <iomanip>
76 #include <iostream>
77 #include <utility>
78 #include <sstream>
79 
80 #include <sys/ipc.h>
81 #include <sys/msg.h>
82 
83 #include "tbb/task.h"
84 
85 //Used for forking
86 #include <sys/types.h>
87 #include <sys/wait.h>
88 #include <sys/socket.h>
89 #include <sys/select.h>
90 #include <sys/fcntl.h>
91 #include <unistd.h>
92 
93 
94 //Used for CPU affinity
95 #ifndef __APPLE__
96 #include <sched.h>
97 #endif
98 
99 namespace {
100  //Sentry class to only send a signal if an
101  // exception occurs. An exception is identified
102  // by the destructor being called without first
103  // calling completedSuccessfully().
104  class SendSourceTerminationSignalIfException {
105  public:
106  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
107  reg_(iReg) {}
108  ~SendSourceTerminationSignalIfException() {
109  if(reg_) {
110  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
111  }
112  }
113  void completedSuccessfully() {
114  reg_ = nullptr;
115  }
116  private:
117  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
118  };
119 
120 }
121 
122 namespace edm {
123 
124  // ---------------------------------------------------------------
125  std::unique_ptr<InputSource>
127  CommonParams const& common,
128  std::shared_ptr<ProductRegistry> preg,
129  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
130  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
131  std::shared_ptr<ActivityRegistry> areg,
132  std::shared_ptr<ProcessConfiguration const> processConfiguration,
133  PreallocationConfiguration const& allocations) {
134  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
135  if(main_input == 0) {
137  << "There must be exactly one source in the configuration.\n"
138  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
139  }
140 
141  std::string modtype(main_input->getParameter<std::string>("@module_type"));
142 
143  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
145  ConfigurationDescriptions descriptions(filler->baseType());
146  filler->fill(descriptions);
147 
148  try {
149  convertException::wrap([&]() {
150  descriptions.validate(*main_input, std::string("source"));
151  });
152  }
153  catch (cms::Exception & iException) {
154  std::ostringstream ost;
155  ost << "Validating configuration of input source of type " << modtype;
156  iException.addContext(ost.str());
157  throw;
158  }
159 
160  main_input->registerIt();
161 
162  // Fill in "ModuleDescription", in case the input source produces
163  // any EDProducts, which would be registered in the ProductRegistry.
164  // Also fill in the process history item for this process.
165  // There is no module label for the unnamed input source, so
166  // just use "source".
167  // Only the tracked parameters belong in the process configuration.
168  ModuleDescription md(main_input->id(),
169  main_input->getParameter<std::string>("@module_type"),
170  "source",
171  processConfiguration.get(),
172  ModuleDescription::getUniqueID());
173 
174  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
175  common.maxEventsInput_, common.maxLumisInput_,
176  common.maxSecondsUntilRampdown_, allocations);
177 
178  areg->preSourceConstructionSignal_(md);
179  std::unique_ptr<InputSource> input;
180  try {
181  //even if we have an exception, send the signal
182  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
183  convertException::wrap([&]() {
184  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
185  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
186  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
187  });
188  }
189  catch (cms::Exception& iException) {
190  std::ostringstream ost;
191  ost << "Constructing input source of type " << modtype;
192  iException.addContext(ost.str());
193  throw;
194  }
195  return input;
196  }
197 
198  // ---------------------------------------------------------------
199  std::shared_ptr<EDLooperBase>
202  ParameterSet& params) {
203  std::shared_ptr<EDLooperBase> vLooper;
204 
205  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
206 
207  if(loopers.size() == 0) {
208  return vLooper;
209  }
210 
211  assert(1 == loopers.size());
212 
213  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
214  itName != itNameEnd;
215  ++itName) {
216 
217  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
218  providerPSet->registerIt();
219  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
220  cp,
221  *providerPSet);
222  }
223  return vLooper;
224  }
225 
226  // ---------------------------------------------------------------
227  EventProcessor::EventProcessor(std::string const& config,
228  ServiceToken const& iToken,
230  std::vector<std::string> const& defaultServices,
231  std::vector<std::string> const& forcedServices) :
232  actReg_(),
233  preg_(),
234  branchIDListHelper_(),
235  serviceToken_(),
236  input_(),
237  espController_(new eventsetup::EventSetupsController),
238  esp_(),
239  act_table_(),
240  processConfiguration_(),
241  schedule_(),
242  subProcesses_(),
243  historyAppender_(new HistoryAppender),
244  fb_(),
245  looper_(),
246  deferredExceptionPtrIsSet_(false),
247  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
248  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
249  principalCache_(),
250  beginJobCalled_(false),
251  shouldWeStop_(false),
252  stateMachineWasInErrorState_(false),
253  fileMode_(),
254  emptyRunLumiMode_(),
255  exceptionMessageFiles_(),
256  exceptionMessageRuns_(),
257  exceptionMessageLumis_(),
258  alreadyHandlingException_(false),
259  forceLooperToEnd_(false),
260  looperBeginJobRun_(false),
261  forceESCacheClearOnNewRun_(false),
262  numberOfForkedChildren_(0),
263  numberOfSequentialEventsPerChild_(1),
264  setCpuAffinity_(false),
265  eventSetupDataToExcludeFromPrefetching_() {
266  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
267  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }
271 
273  std::vector<std::string> const& defaultServices,
274  std::vector<std::string> const& forcedServices) :
275  actReg_(),
276  preg_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
284  schedule_(),
285  subProcesses_(),
287  fb_(),
288  looper_(),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
296  fileMode_(),
311  {
312  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
313  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
314  processDesc->addServices(defaultServices, forcedServices);
316  }
317 
318  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
319  ServiceToken const& token,
321  actReg_(),
322  preg_(),
324  serviceToken_(),
325  input_(),
326  espController_(new eventsetup::EventSetupsController),
327  esp_(),
328  act_table_(),
330  schedule_(),
331  subProcesses_(),
333  fb_(),
334  looper_(),
336  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
337  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
338  principalCache_(),
342  fileMode_(),
357  {
358  init(processDesc, token, legacy);
359  }
360 
361 
363  actReg_(),
364  preg_(),
366  serviceToken_(),
367  input_(),
368  espController_(new eventsetup::EventSetupsController),
369  esp_(),
370  act_table_(),
372  schedule_(),
373  subProcesses_(),
375  fb_(),
376  looper_(),
378  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
379  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
380  principalCache_(),
384  fileMode_(),
399  {
400  if(isPython) {
401  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
402  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
404  }
405  else {
406  auto processDesc = std::make_shared<ProcessDesc>(config);
408  }
409  }
410 
411  void
412  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
413  ServiceToken const& iToken,
415 
416  //std::cerr << processDesc->dump() << std::endl;
417 
418  // register the empty parentage vector , once and for all
420 
421  // register the empty parameter set, once and for all.
423 
424  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
425 
426  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
427  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
428  bool const hasSubProcesses = !subProcessVParameterSet.empty();
429 
430  // Now set some parameters specific to the main process.
431  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
432  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
433  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
434  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
435  //threading
436  unsigned int nThreads=1;
437  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
438  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
439  if(nThreads == 0) {
440  nThreads = 1;
441  }
442  }
443  /* TODO: when we support having each stream run in a different thread use this default
444  unsigned int nStreams =nThreads;
445  */
446  unsigned int nStreams =1;
447  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
448  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
449  if(nStreams==0) {
450  nStreams = nThreads;
451  }
452  }
453  if(nThreads >1) {
454  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
455  }
456 
457  /*
458  bool nRunsSet = false;
459  */
460  unsigned int nConcurrentRuns =1;
461  /*
462  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
463  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
464  }
465  */
466  unsigned int nConcurrentLumis =1;
467  /*
468  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
469  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
470  } else {
471  nConcurrentLumis = nConcurrentRuns;
472  }
473  */
474  //Check that relationships between threading parameters makes sense
475  /*
476  if(nThreads<nStreams) {
477  //bad
478  }
479  if(nConcurrentRuns>nStreams) {
480  //bad
481  }
482  if(nConcurrentRuns>nConcurrentLumis) {
483  //bad
484  }
485  */
486  //forking
487  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
488  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
489  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
490  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
491  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
492  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
493  for(auto const& ps : excluded) {
494  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
495  ps.getUntrackedParameter<std::string>("label", ""));
496  }
497  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
498 
499  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
500 
501  // Now do general initialization
503 
504  //initialize the services
505  auto& serviceSets = processDesc->getServicesPSets();
506  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
507  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
508 
509  //make the services available
511 
512  if(nStreams>1) {
514  handler->willBeUsingThreads();
515  }
516 
517  // intialize miscellaneous items
518  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
519 
520  // intialize the event setup provider
521  esp_ = espController_->makeProvider(*parameterSet);
522 
523  // initialize the looper, if any
524  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
525  if(looper_) {
526  looper_->setActionTable(items.act_table_.get());
527  looper_->attachTo(*items.actReg_);
528 
529  //For now loopers make us run only 1 transition at a time
530  nStreams=1;
531  nConcurrentLumis=1;
532  nConcurrentRuns=1;
533  }
534 
535  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
536 
537  // initialize the input source
538  input_ = makeInput(*parameterSet,
539  *common,
540  items.preg(),
541  items.branchIDListHelper(),
543  items.actReg_,
544  items.processConfiguration(),
546 
547  // intialize the Schedule
548  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
549 
550  // set the data members
552  actReg_ = items.actReg_;
553  preg_ = items.preg();
558  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
559 
560  FDEBUG(2) << parameterSet << std::endl;
561 
563  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
564  // Reusable event principal
565  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
568  }
569 
570  // fill the subprocesses, if there are any
571  subProcesses_.reserve(subProcessVParameterSet.size());
572  for(auto& subProcessPSet : subProcessVParameterSet) {
573  subProcesses_.emplace_back(subProcessPSet,
574  *parameterSet,
575  preg(),
580  *actReg_,
581  token,
584  &processContext_);
585  }
586  }
587 
589  // Make the services available while everything is being deleted.
590  ServiceToken token = getToken();
591  ServiceRegistry::Operate op(token);
592 
593  // manually destroy all these thing that may need the services around
594  // propagate_const<T> has no reset() function
595  espController_ = nullptr;
596  esp_ = nullptr;
597  schedule_ = nullptr;
598  input_ = nullptr;
599  looper_ = nullptr;
600  actReg_ = nullptr;
601 
604  }
605 
606  void
608  if(beginJobCalled_) return;
609  beginJobCalled_=true;
610  bk::beginJob();
611 
612  // StateSentry toerror(this); // should we add this ?
613  //make the services available
615 
620  actReg_->preallocateSignal_(bounds);
622 
623  //NOTE: this may throw
625  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
626 
627  //NOTE: This implementation assumes 'Job' means one call
628  // the EventProcessor::run
629  // If it really means once per 'application' then this code will
630  // have to be changed.
631  // Also have to deal with case where have 'run' then new Module
632  // added and do 'run'
633  // again. In that case the newly added Module needs its 'beginJob'
634  // to be called.
635 
636  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
637  // For now we delay calling beginOfJob until first beginOfRun
638  //if(looper_) {
639  // looper_->beginOfJob(es);
640  //}
641  try {
642  convertException::wrap([&]() {
643  input_->doBeginJob();
644  });
645  }
646  catch(cms::Exception& ex) {
647  ex.addContext("Calling beginJob for the source");
648  throw;
649  }
650  schedule_->beginJob(*preg_);
651  // toerror.succeeded(); // should we add this?
652  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
653  actReg_->postBeginJobSignal_();
654 
655  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
656  schedule_->beginStream(i);
657  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
658  }
659  }
660 
661  void
663  // Collects exceptions, so we don't throw before all operations are performed.
664  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
665 
666  //make the services available
668 
669  //NOTE: this really should go elsewhere in the future
670  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
671  c.call([this,i](){this->schedule_->endStream(i);});
672  for(auto& subProcess : subProcesses_) {
673  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
674  }
675  }
676  auto actReg = actReg_.get();
677  c.call([actReg](){actReg->preEndJobSignal_();});
678  schedule_->endJob(c);
679  for(auto& subProcess : subProcesses_) {
680  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
681  }
682  c.call(std::bind(&InputSource::doEndJob, input_.get()));
683  if(looper_) {
684  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
685  }
686  c.call([actReg](){actReg->postEndJobSignal_();});
687  if(c.hasThrown()) {
688  c.rethrow();
689  }
690  }
691 
694  return serviceToken_;
695  }
696 
697  //Setup signal handler to listen for when forked children stop
namespace {
  //These are volatile since the compiler can not be allowed to optimize them
  // since they can be modified in the signal handler
  volatile bool child_failed = false;
  volatile unsigned int num_children_done = 0;
  volatile int child_fail_exit_status = 0;
  volatile int child_fail_signal = 0;

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    // SIGCHLD handler. Reaps every child that has already terminated, without
    // blocking, and records the outcome in the variables above:
    //  - num_children_done is incremented for each child that exited or was
    //    killed by a signal;
    //  - a non-zero exit status sets child_fail_exit_status and child_failed;
    //  - termination by signal sets child_fail_signal and child_failed.
    void ep_sigchld(int, siginfo_t*, void*) {
      // waitpid() may overwrite errno (the call that ends the loop usually
      // fails with ECHILD). The interrupted code may be about to test errno
      // (e.g. an `errno == EINTR` retry loop), so put the old value back
      // before returning.
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0 < p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
735 
736  enum {
741  };
742 
743  namespace {
// Returns how many decimal digits are needed to print numberOfChildren.
// A value of zero has no digits to count, so a fixed width of 3 is returned
// instead.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3; // Protect against zero numberOfChildren
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
755 
/* Callable that forms the body of the thread which listens to the forked
   children and then tells them which events they should process. */
class MessageSenderToSource {
public:
  MessageSenderToSource(std::vector<int> const& childrenSockets,
                        std::vector<int> const& childrenPipes,
                        long iNEventsToProcess);

  void operator()();

private:
  // Stored by reference: the vector handed to the constructor must stay
  // alive for as long as this object is used.
  std::vector<int> const& m_childrenPipes;
  long const m_nEventsToProcess;
  fd_set m_socketSet;
  unsigned int m_aliveChildren;
  int m_maxFd;
};
770 
771  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
772  std::vector<int> const& childrenPipes,
773  long iNEventsToProcess):
774  m_childrenPipes(childrenPipes),
775  m_nEventsToProcess(iNEventsToProcess),
776  m_aliveChildren(childrenSockets.size()),
777  m_maxFd(0)
778  {
779  FD_ZERO(&m_socketSet);
780  for (auto const socket : childrenSockets) {
781  FD_SET(socket, &m_socketSet);
782  if (socket > m_maxFd) {
783  m_maxFd = socket;
784  }
785  }
786  for (auto const pipe : childrenPipes) {
787  FD_SET(pipe, &m_socketSet);
788  if (pipe > m_maxFd) {
789  m_maxFd = pipe;
790  }
791  }
792  m_maxFd++; // select reads [0,m_maxFd).
793  }
794 
795  /* This function is the heart of the communication between parent and child.
796  * When ready for more data, the child (see MessageReceiverForSource) requests
797  * data through a AF_UNIX socket message. The parent will then assign the next
798  * chunk of data by sending a message back.
799  *
800  * Additionally, this function also monitors the read-side of the pipe fd from the child.
801  * If the child dies unexpectedly, the pipe will be selected as ready for read and
802  * will return EPIPE when read from. Further, if the child thinks the parent has died
803  * (defined as waiting more than 1s for a response), it will write a single byte to
804  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
805  * If still alive, the parent will read the byte and ignore it.
806  *
807  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
808  * handler can distinguish between success and failure cases.
809  */
810 
// Controller loop. Each pass waits in select() on every descriptor still in
// m_socketSet, then walks descriptors [0, m_maxFd):
//  - a descriptor flagged in the error set is removed from m_socketSet and
//    closed; if it is one of m_childrenPipes, m_aliveChildren is decremented;
//  - a readable watchdog pipe has one byte read from it; a read result <= 0
//    retires the pipe and decrements m_aliveChildren;
//  - a readable socket has a request received into childMsg and is answered
//    with sndmsg, after which sndmsg.startIndex advances by sndmsg.nIndices.
// The loop ends once m_aliveChildren reaches zero.
// NOTE(review): childMsg and sndmsg are used below but their declarations are
// not visible in this listing -- confirm their types against the full file.
811  void
812  MessageSenderToSource::operator()() {
814  LogInfo("ForkingController") << "I am controller";
815  //this is the master and therefore the controller
816 
// The first chunk handed out starts at index 0; every chunk spans
// m_nEventsToProcess indices.
818  sndmsg.startIndex = 0;
819  sndmsg.nIndices = m_nEventsToProcess;
820  do {
821 
822  fd_set readSockets, errorSockets;
823  // Wait for a request from a child for events.
// Fresh copies are taken on every pass because select() modifies the sets it
// is given; m_socketSet remains the list of descriptors still being watched.
824  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
825  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
826  // Note that we don't timeout; may be reconsidered in the future.
827  ssize_t rc;
// Retry while select() is merely interrupted by a signal (EINTR).
828  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
829  if (rc < 0) {
830  std::cerr << "select failed; should be impossible due to preconditions.\n";
// abort() does not return, so the break below is never reached.
831  abort();
832  break;
833  }
834 
835  // Read the message from the child.
836  for (int idx=0; idx<m_maxFd; idx++) {
837 
838  // Handle errors
839  if (FD_ISSET(idx, &errorSockets)) {
840  LogInfo("ForkingController") << "Error on socket " << idx;
841  FD_CLR(idx, &m_socketSet);
842  close(idx);
843  // See if it was the watchdog pipe that died.
844  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
845  if (*it == idx) {
846  m_aliveChildren--;
847  }
848  }
849  continue;
850  }
851 
// Nothing to read on this descriptor during this pass.
852  if (!FD_ISSET(idx, &readSockets)) {
853  continue;
854  }
855 
856  // See if this FD is a child watchdog pipe. If so, read from it to prevent
857  // writes from blocking.
858  bool is_pipe = false;
859  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
860  if (*it == idx) {
861  is_pipe = true;
862  char buf;
863  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
// A read result of zero or below is treated as the child being gone: the
// pipe is retired and the live-child count is reduced.
864  if (rc <= 0) {
865  m_aliveChildren--;
866  FD_CLR(idx, &m_socketSet);
867  close(idx);
868  }
869  }
870  }
871 
872  // Only execute this block if the FD is a socket for sending the child work.
873  if (!is_pipe) {
874  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
// A failed receive retires the socket; m_aliveChildren is not touched here.
875  if (rc < 0) {
876  FD_CLR(idx, &m_socketSet);
877  close(idx);
878  continue;
879  }
880 
881  // Tell the child what events to process.
882  // If 'send' fails, then the child process has failed (any other possibilities are
883  // eliminated because we are using fixed-size messages with Unix datagram sockets).
884  // Thus, the SIGCHLD handler will fire and set child_failed = true.
885  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
886  if (rc < 0) {
887  FD_CLR(idx, &m_socketSet);
888  close(idx);
889  continue;
890  }
891  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
// Move on to the next chunk only after the current one was sent successfully.
892  sndmsg.startIndex += sndmsg.nIndices;
893  }
894  }
895 
896  } while (m_aliveChildren > 0);
897 
898  return;
899  }
900 
901  }
902 
903 
905  if(child_failed && continueAfterChildFailure_) {
906  if (child_fail_signal) {
907  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
908  child_fail_signal=0;
909  } else if (child_fail_exit_status) {
910  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
911  child_fail_exit_status=0;
912  } else {
913  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
914  }
915  child_failed =false;
916  }
917  }
918 
919  bool
920  EventProcessor::forkProcess(std::string const& jobReportFile) {
921 
922  if(0 == numberOfForkedChildren_) {return true;}
923  assert(0<numberOfForkedChildren_);
924  //do what we want done in common
925  {
926  beginJob(); //make sure this was run
927  // make the services available
929 
930  InputSource::ItemType itemType;
931  itemType = input_->nextItemType();
932 
933  assert(itemType == InputSource::IsFile);
934  {
935  readFile();
936  }
937  itemType = input_->nextItemType();
938  assert(itemType == InputSource::IsRun);
939 
940  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
941  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
942  input_->runAuxiliary()->beginTime());
943  espController_->eventSetupForInstance(ts);
944  EventSetup const& es = esp_->eventSetup();
945 
946  //now get all the data available in the EventSetup
947  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
948  es.fillAvailableRecordKeys(recordKeys);
949  std::vector<eventsetup::DataKey> dataKeys;
950  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
951  itKey != itEnd;
952  ++itKey) {
953  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
954  //see if this is on our exclusion list
955  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
956  ExcludedData const* excludedData(nullptr);
957  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
958  excludedData = &(itExcludeRec->second);
959  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
960  //skip all items in this record
961  continue;
962  }
963  }
964  if(0 != recordPtr) {
965  dataKeys.clear();
966  recordPtr->fillRegisteredDataKeys(dataKeys);
967  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
968  itDataKey != itDataKeyEnd;
969  ++itDataKey) {
970  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
971  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
972  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
973  continue;
974  }
975  try {
976  recordPtr->doGet(*itDataKey);
977  } catch(cms::Exception& e) {
978  LogWarning("ForkingEventSetupPreFetching") << e.what();
979  }
980  }
981  }
982  }
983  }
984  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
985  {
986  // make the services available
988  Service<JobReport> jobReport;
989  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
990 
991  //Now actually do the forking
992  actReg_->preForkReleaseResourcesSignal_();
993  input_->doPreForkReleaseResources();
994  schedule_->preForkReleaseResources();
995  }
996  installCustomHandler(SIGCHLD, ep_sigchld);
997 
998 
999  unsigned int childIndex = 0;
1000  unsigned int const kMaxChildren = numberOfForkedChildren_;
1001  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1002  std::vector<pid_t> childrenIds;
1003  childrenIds.reserve(kMaxChildren);
1004  std::vector<int> childrenSockets;
1005  childrenSockets.reserve(kMaxChildren);
1006  std::vector<int> childrenPipes;
1007  childrenPipes.reserve(kMaxChildren);
1008  std::vector<int> childrenSocketsCopy;
1009  childrenSocketsCopy.reserve(kMaxChildren);
1010  std::vector<int> childrenPipesCopy;
1011  childrenPipesCopy.reserve(kMaxChildren);
1012  int pipes[] {0, 0};
1013 
1014  {
1015  // make the services available
1017  Service<JobReport> jobReport;
1018  int sockets[2], fd_flags;
1019  for(; childIndex < kMaxChildren; ++childIndex) {
1020  // Create a UNIX_DGRAM socket pair
1021  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1022  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1023  exit(EXIT_FAILURE);
1024  }
1025  if (pipe(pipes)) {
1026  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1027  exit(EXIT_FAILURE);
1028  }
1029  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1030  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1031  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1032  exit(EXIT_FAILURE);
1033  }
1034  // Mark socket as non-block. Child must be careful to do select prior
1035  // to reading from socket.
1036  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1037  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1038  exit(EXIT_FAILURE);
1039  }
1040  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1041  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1042  exit(EXIT_FAILURE);
1043  }
1044  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1045  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1046  exit(EXIT_FAILURE);
1047  }
1048  // Linux man page notes there are some edge cases where reading from a
1049  // fd can block, even after a select.
1050  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1051  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1052  exit(EXIT_FAILURE);
1053  }
1054  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1055  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1056  exit(EXIT_FAILURE);
1057  }
1058 
1059  childrenPipesCopy = childrenPipes;
1060  childrenSocketsCopy = childrenSockets;
1061 
1062  pid_t value = fork();
1063  if(value == 0) {
1064  // Close the parent's side of the socket and pipe which will talk to us.
1065  close(pipes[0]);
1066  close(sockets[0]);
1067  // Close our copies of the parent's other communication pipes.
1068  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1069  close(*it);
1070  }
1071  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1072  close(*it);
1073  }
1074 
1075  // this is the child process, redirect stdout and stderr to a log file
1076  fflush(stdout);
1077  fflush(stderr);
1078  std::stringstream stout;
1079  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1080  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1081  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1082  }
1083  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1084  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1085  }
1086 
1087  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1088  if(setCpuAffinity_) {
1089  // CPU affinity is handled differently on macosx.
1090  // We disable it and print a message until someone reads:
1091  //
1092  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1093  //
1094  // and implements it.
1095 #ifdef __APPLE__
1096  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1097 #else
1098  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1099  cpu_set_t mask;
1100  CPU_ZERO(&mask);
1101  CPU_SET(childIndex, &mask);
1102  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1103  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1104  exit(-1);
1105  }
1106 #endif
1107  }
1108  break;
1109  } else {
1110  //this is the parent
1111  close(pipes[1]);
1112  close(sockets[1]);
1113  }
1114  if(value < 0) {
1115  LogError("ForkingChild") << "failed to create a child";
1116  exit(-1);
1117  }
1118  childrenIds.push_back(value);
1119  childrenSockets.push_back(sockets[0]);
1120  childrenPipes.push_back(pipes[0]);
1121  }
1122 
1123  if(childIndex < kMaxChildren) {
1124  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1125  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1126 
1127  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1128  input_->doPostForkReacquireResources(receiver);
1129  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1130  //NOTE: sources have to reset themselves by listening to the post fork message
1131  //rewindInput();
1132  return true;
1133  }
1134  jobReport->parentAfterFork(jobReportFile);
1135  }
1136 
1137  //this is the original, which is now the master for all the children
1138 
1139  //Need to wait for signals from the children or externally
1140  // To wait we must
1141  // 1) block the signals we want to wait on so we do not have a race condition
1142  // 2) check that we haven't already met our ending criteria
1143  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1144  sigset_t blockingSigSet;
1145  sigset_t unblockingSigSet;
1146  sigset_t oldSigSet;
1147  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1148  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1149  sigaddset(&blockingSigSet, SIGCHLD);
1150  sigaddset(&blockingSigSet, SIGUSR2);
1151  sigaddset(&blockingSigSet, SIGINT);
1152  sigdelset(&unblockingSigSet, SIGCHLD);
1153  sigdelset(&unblockingSigSet, SIGUSR2);
1154  sigdelset(&unblockingSigSet, SIGINT);
1155  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1156 
1157  // If there are too many fd's (unlikely, but possible) for select, denote this
1158  // because the sender will fail.
1159  bool too_many_fds = false;
1160  if (pipes[1]+1 > FD_SETSIZE) {
1161  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1162  too_many_fds = true;
1163  }
1164 
1165  //create a thread that sends the units of work to workers
1166  // we create it after all signals were blocked so that this
1167  // thread is never interrupted by a signal
1168  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1169  boost::thread senderThread(sender);
1170 
1171  if(not too_many_fds) {
1172  //NOTE: a child could have failed before we got here and even after this call
1173  // which is why the 'if' is conditional on continueAfterChildFailure_
1175  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1176  sigsuspend(&unblockingSigSet);
1178  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1179  }
1180  }
1181  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1182 
1183  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1184  if(child_failed) {
1185  LogError("ForkingStopping") << "child failed";
1186  }
1187  if(shutdown_flag) {
1188  LogSystem("ForkingStopping") << "asked to shutdown";
1189  }
1190 
1191  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1192  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1193  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1194  it != itEnd; ++it) {
1195  /* int result = */ kill(*it, SIGUSR2);
1196  }
1197  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1198  while(num_children_done != kMaxChildren) {
1199  sigsuspend(&unblockingSigSet);
1200  }
1201  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1202  }
1203  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1204  senderThread.join();
1205  if(child_failed && !continueAfterChildFailure_) {
1206  if (child_fail_signal) {
1207  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1208  } else if (child_fail_exit_status) {
1209  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1210  } else {
1211  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1212  }
1213  }
1214  if(too_many_fds) {
1215  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1216  }
1217  return false;
1218  }
1219 
1220  std::vector<ModuleDescription const*>
1222  return schedule_->getAllModuleDescriptions();
1223  }
1224 
1225  int
1227  return schedule_->totalEvents();
1228  }
1229 
1230  int
1232  return schedule_->totalEventsPassed();
1233  }
1234 
1235  int
1237  return schedule_->totalEventsFailed();
1238  }
1239 
1240  void
1242  schedule_->enableEndPaths(active);
1243  }
1244 
1245  bool
1247  return schedule_->endPathsEnabled();
1248  }
1249 
1250  void
1252  schedule_->getTriggerReport(rep);
1253  }
1254 
1255  void
1257  schedule_->clearCounters();
1258  }
1259 
1260 
// Builds, configures and starts the state machine used to drive processing.
//
// NOTE(review): the signature line is not visible in this listing; this is
// presumably EventProcessor::createStateMachine() (a call to
// createStateMachine() whose result is stored in a
// std::unique_ptr<statemachine::Machine> appears later in this file) — confirm.
//
// The two mode strings held in fileMode_ and emptyRunLumiMode_ are translated
// into their statemachine enum values. An empty string selects the default
// (FULLMERGE / handleEmptyRunsAndLumis); any unrecognized value throws an
// Exception with category errors::Configuration.
//
// Returns the machine after initiate() has been called on it.
1261  std::unique_ptr<statemachine::Machine>
1263  statemachine::FileMode fileMode;
// Empty fileMode_ means "use the default", which is FULLMERGE.
1264  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1265  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1266  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1267  else {
1268  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1269  << fileMode_ << ".\n"
1270  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1271  }
1272 
1273  statemachine::EmptyRunLumiMode emptyRunLumiMode;
// Empty emptyRunLumiMode_ means "use the default", which is handleEmptyRunsAndLumis.
1274  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1275  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1276  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1277  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1278  else {
1279  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1280  << emptyRunLumiMode_ << ".\n"
1281  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1282  }
1283 
// The machine receives this object together with the two resolved modes.
1284  auto machine = std::make_unique<statemachine::Machine>(
1285  this,
1286  fileMode,
1287  emptyRunLumiMode);
1288 
// Start the machine before handing ownership to the caller.
1289  machine->initiate();
1290  return machine;
1291  }
1292 
// Reports whether an asynchronous shutdown has been requested.
//
// NOTE(review): the signature line is not visible in this listing; this is
// presumably EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode),
// judging by the checkForAsyncStopRequest(returnCode) call later in this
// file — confirm.
//
// Returns true, and sets returnCode to epSignal, when shutdown_flag is set.
// Otherwise returns false and leaves returnCode untouched.
1293  bool
1295  bool returnValue = false;
1296 
1297  // Look for a shutdown signal
1298  if(shutdown_flag.load(std::memory_order_acquire)) {
1299  returnValue = true;
1300  returnCode = epSignal;
1301  }
1302  return returnValue;
1303  }
1304 
1305 
1308 
1311  std::unique_ptr<statemachine::Machine> machine;
1312  {
1313  beginJob(); //make sure this was called
1314 
1315  //StatusCode returnCode = epSuccess;
1317 
1318  // make the services available
1320 
1321  machine = createStateMachine();
1324  try {
1325  convertException::wrap([&]() {
1326 
1327  InputSource::ItemType itemType;
1328 
1329  while(true) {
1330 
1331  bool more = true;
1332  if(numberOfForkedChildren_ > 0) {
1333  size_t size = preg_->size();
1334  {
1335  SendSourceTerminationSignalIfException sentry(actReg_.get());
1336  more = input_->skipForForking();
1337  sentry.completedSuccessfully();
1338  }
1339  if(more) {
1340  if(size < preg_->size()) {
1342  }
1344  }
1345  }
1346  {
1347  SendSourceTerminationSignalIfException sentry(actReg_.get());
1348  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1349  sentry.completedSuccessfully();
1350  }
1351 
1352  FDEBUG(1) << "itemType = " << itemType << "\n";
1353 
1354  if(checkForAsyncStopRequest(returnCode)) {
1355  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1356  forceLooperToEnd_ = true;
1357  machine->process_event(statemachine::Stop());
1358  forceLooperToEnd_ = false;
1359  break;
1360  }
1361 
1362  if(itemType == InputSource::IsEvent) {
1363  machine->process_event(statemachine::Event());
1365  forceLooperToEnd_ = true;
1366  machine->process_event(statemachine::Stop());
1367  forceLooperToEnd_ = false;
1369  break;
1370  }
1372  }
1373 
1374  if(itemType == InputSource::IsEvent) {
1375  }
1376  else if(itemType == InputSource::IsStop) {
1377  machine->process_event(statemachine::Stop());
1378  }
1379  else if(itemType == InputSource::IsFile) {
1380  machine->process_event(statemachine::File());
1381  }
1382  else if(itemType == InputSource::IsRun) {
1383  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1384  }
1385  else if(itemType == InputSource::IsLumi) {
1386  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1387  }
1388  else if(itemType == InputSource::IsSynchronize) {
1389  //For now, we don't have to do anything
1390  }
1391  // This should be impossible
1392  else {
1394  << "Unknown next item type passed to EventProcessor\n"
1395  << "Please report this error to the Framework group\n";
1396  }
1397  if(machine->terminated()) {
1398  break;
1399  }
1400  } // End of loop over state machine events
1401  }); // convertException::wrap
1402  } // Try block
1403  // Some comments on exception handling related to the boost state machine:
1404  //
1405  // Some states used in the machine are special because they
1406  // perform actions while the machine is being terminated, actions
1407  // such as close files, call endRun, call endLumi etc ... Each of these
1408  // states has two functions that perform these actions. The functions
1409  // are almost identical. The major difference is that one version
1410  // catches all exceptions and the other lets exceptions pass through.
1411  // The destructor catches them and the other function named "exit" lets
1412  // them pass through. On a normal termination, boost will always call
1413  // "exit" and then the state destructor. In our state classes, the
1414  // destructors do nothing if the exit function already took
1415  // care of things. Here's the interesting part. When boost is
1416  // handling an exception the "exit" function is not called (a boost
1417  // feature).
1418  //
1419  // If an exception occurs while the boost machine is in control
1420  // (which usually means inside a process_event call), then
1421  // the boost state machine destroys its states and "terminates" itself.
1422  // This is already done before we hit the catch blocks below. In this case
1423  // the call to terminateMachine below only destroys an already
1424  // terminated state machine. Because exit is not called, the state destructors
1425  // handle cleaning up lumis, runs, and files. The destructors swallow
1426  // all exceptions and only pass through the exception messages, which
1427  // are tacked onto the original exception below.
1428  //
1429  // If an exception occurs when the boost state machine is not
1430  // in control (outside the process_event functions), then boost
1431  // cannot destroy its own states. The terminateMachine function
1432  // below takes care of that. The flag "alreadyHandlingException"
1433  // is set true so that the state exit functions do nothing (and
1434  // cannot throw more exceptions while handling the first). Then the
1435  // state destructors take care of this because exit did nothing.
1436  //
1437  // In both cases above, the EventProcessor::endOfLoop function is
1438  // not called because it can throw exceptions.
1439  //
1440  // One tricky aspect of the state machine is that things that can
1441  // throw should not be invoked by the state machine while another
1442  // exception is being handled.
1443  // Another tricky aspect is that it appears to be important to
1444  // terminate the state machine before invoking its destructor.
1445  // We've seen crashes that are not understood when that is not
1446  // done. Maintainers of this code should be careful about this.
1447 
1448  catch (cms::Exception & e) {
1450  terminateMachine(std::move(machine));
1451  alreadyHandlingException_ = false;
1452  if (!exceptionMessageLumis_.empty()) {
1454  if (e.alreadyPrinted()) {
1455  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1456  }
1457  }
1458  if (!exceptionMessageRuns_.empty()) {
1460  if (e.alreadyPrinted()) {
1461  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1462  }
1463  }
1464  if (!exceptionMessageFiles_.empty()) {
1466  if (e.alreadyPrinted()) {
1467  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1468  }
1469  }
1470  throw;
1471  }
1472 
1473  if(machine->terminated()) {
1474  FDEBUG(1) << "The state machine reports it has been terminated\n";
1475  machine.reset();
1476  }
1477 
1479  throw cms::Exception("BadState")
1480  << "The boost state machine in the EventProcessor exited after\n"
1481  << "entering the Error state.\n";
1482  }
1483 
1484  }
1485  if(machine.get() != nullptr) {
1486  terminateMachine(std::move(machine));
1488  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1489  << "Please report this error to the Framework group\n";
1490  }
1491 
1492  return returnCode;
1493  }
1494 
1496  FDEBUG(1) << " \treadFile\n";
1497  size_t size = preg_->size();
1498  SendSourceTerminationSignalIfException sentry(actReg_.get());
1499 
1500  fb_ = input_->readFile();
1501  if(size < preg_->size()) {
1503  }
1505  if((numberOfForkedChildren_ > 0) or
1508  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1509  }
1510  sentry.completedSuccessfully();
1511  }
1512 
1513  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1514  if (fb_.get() != nullptr) {
1515  SendSourceTerminationSignalIfException sentry(actReg_.get());
1516  input_->closeFile(fb_.get(), cleaningUpAfterException);
1517  sentry.completedSuccessfully();
1518  }
1519  FDEBUG(1) << "\tcloseInputFile\n";
1520  }
1521 
1523  if (fb_.get() != nullptr) {
1524  schedule_->openOutputFiles(*fb_);
1525  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1526  }
1527  FDEBUG(1) << "\topenOutputFiles\n";
1528  }
1529 
1531  if (fb_.get() != nullptr) {
1532  schedule_->closeOutputFiles();
1533  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1534  }
1535  FDEBUG(1) << "\tcloseOutputFiles\n";
1536  }
1537 
1539  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1540  if (fb_.get() != nullptr) {
1541  schedule_->respondToOpenInputFile(*fb_);
1542  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1543  }
1544  FDEBUG(1) << "\trespondToOpenInputFile\n";
1545  }
1546 
1548  if (fb_.get() != nullptr) {
1549  schedule_->respondToCloseInputFile(*fb_);
1550  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1551  }
1552  FDEBUG(1) << "\trespondToCloseInputFile\n";
1553  }
1554 
1556  shouldWeStop_ = false;
1557  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1558  // until after we've called beginOfJob
1559  if(looper_ && looperBeginJobRun_) {
1560  looper_->doStartingNewLoop();
1561  }
1562  FDEBUG(1) << "\tstartingNewLoop\n";
1563  }
1564 
1566  if(looper_) {
1567  ModuleChanger changer(schedule_.get(),preg_.get());
1568  looper_->setModuleChanger(&changer);
1569  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1570  looper_->setModuleChanger(nullptr);
1571  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1572  else return false;
1573  }
1574  FDEBUG(1) << "\tendOfLoop\n";
1575  return true;
1576  }
1577 
1579  input_->repeat();
1580  input_->rewind();
1581  FDEBUG(1) << "\trewind\n";
1582  }
1583 
1585  looper_->prepareForNextLoop(esp_.get());
1586  FDEBUG(1) << "\tprepareForNextLoop\n";
1587  }
1588 
1590  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1591  if(!subProcesses_.empty()) {
1592  for(auto const& subProcess : subProcesses_) {
1593  if(subProcess.shouldWeCloseOutput()) {
1594  return true;
1595  }
1596  }
1597  return false;
1598  }
1599  return schedule_->shouldWeCloseOutput();
1600  }
1601 
1603  FDEBUG(1) << "\tdoErrorStuff\n";
1604  LogError("StateMachine")
1605  << "The EventProcessor state machine encountered an unexpected event\n"
1606  << "and went to the error state\n"
1607  << "Will attempt to terminate processing normally\n"
1608  << "(IF using the looper the next loop will be attempted)\n"
1609  << "This likely indicates a bug in an input module or corrupted input or both\n";
1611  }
1612 
1614  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1615  {
1616  SendSourceTerminationSignalIfException sentry(actReg_.get());
1617 
1618  input_->doBeginRun(runPrincipal, &processContext_);
1619  sentry.completedSuccessfully();
1620  }
1621 
1622  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1623  runPrincipal.beginTime());
1625  espController_->forceCacheClear();
1626  }
1627  {
1628  SendSourceTerminationSignalIfException sentry(actReg_.get());
1629  espController_->eventSetupForInstance(ts);
1630  sentry.completedSuccessfully();
1631  }
1632  EventSetup const& es = esp_->eventSetup();
1633  if(looper_ && looperBeginJobRun_== false) {
1634  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1635  looper_->beginOfJob(es);
1636  looperBeginJobRun_ = true;
1637  looper_->doStartingNewLoop();
1638  }
1639  {
1641  auto globalWaitTask = make_empty_waiting_task();
1642  globalWaitTask->increment_ref_count();
1643  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1644  *schedule_,
1645  runPrincipal,
1646  ts,
1647  es,
1648  subProcesses_);
1649  globalWaitTask->wait_for_all();
1650  if(globalWaitTask->exceptionPtr() != nullptr) {
1651  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1652  }
1653  }
1654  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1655  if(looper_) {
1656  looper_->doBeginRun(runPrincipal, es, &processContext_);
1657  }
1658  {
1659  //To wait, the ref count has to be 1+#streams
1660  auto streamLoopWaitTask = make_empty_waiting_task();
1661  streamLoopWaitTask->increment_ref_count();
1662 
1664 
1665  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1666  *schedule_,
1668  runPrincipal,
1669  ts,
1670  es,
1671  subProcesses_);
1672 
1673  streamLoopWaitTask->wait_for_all();
1674  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1675  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1676  }
1677  }
1678  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1679  if(looper_) {
1680  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1681  }
1682  }
1683 
// Performs the end-of-run transition for the run identified by `run`.
//
// Order of operations, as visible below:
//   1. the input source finalizes the run principal (end time, complete flag,
//      doEndRun),
//   2. the EventSetup is refreshed for the sync value `ts`,
//   3. the stream end transitions run asynchronously and are waited for,
//   4. the global end transition runs on the schedule, then on each SubProcess,
//   5. the looper, if any, is notified.
//
// cleaningUpAfterException is forwarded to the source, the stream transitions,
// the global transition and the sub-processes.
//
// NOTE(review): this listing appears to have dropped a few lines (the start of
// the `ts` declaration and the `Traits` typedefs used below) — confirm against
// the original file before editing.
1684  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1685  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1686  {
// The sentry emits the source early-termination signal if anything in this
// scope throws before completedSuccessfully() is reached.
1687  SendSourceTerminationSignalIfException sentry(actReg_.get());
1688 
1689  runPrincipal.setEndTime(input_->timestamp());
1690  runPrincipal.setComplete();
1691  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1692  sentry.completedSuccessfully();
1693  }
1694 
// NOTE(review): the next line looks like the tail of the `ts` declaration whose
// first line is missing from this listing.
1696  runPrincipal.endTime());
1697  {
1698  SendSourceTerminationSignalIfException sentry(actReg_.get());
1699  espController_->eventSetupForInstance(ts);
1700  sentry.completedSuccessfully();
1701  }
1702  EventSetup const& es = esp_->eventSetup();
1703  {
1704  //To wait, the ref count has to be 1+#streams
1705  auto streamLoopWaitTask = make_empty_waiting_task();
1706  streamLoopWaitTask->increment_ref_count();
1707 
1709 
1710  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1711  *schedule_,
1713  runPrincipal,
1714  ts,
1715  es,
1716  subProcesses_,
1717  cleaningUpAfterException);
1718 
// Block until the stream transitions finish, then rethrow any exception the
// wait task captured.
1719  streamLoopWaitTask->wait_for_all();
1720  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1721  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1722  }
1723  }
1724  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
// Intentionally empty: the looper stream hook is commented out.
1725  if(looper_) {
1726  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1727  }
1728  {
// Flag the principal as being at its end transition before the global
// end-of-run processing below.
1729  runPrincipal.setAtEndTransition(true);
1731  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1732  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1733  }
1734  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1735  if(looper_) {
1736  looper_->doEndRun(runPrincipal, es, &processContext_);
1737  }
1738  }
1739 
1741  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1742  {
1743  SendSourceTerminationSignalIfException sentry(actReg_.get());
1744 
1745  input_->doBeginLumi(lumiPrincipal, &processContext_);
1746  sentry.completedSuccessfully();
1747  }
1748 
1750  if(rng.isAvailable()) {
1751  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1752  rng->preBeginLumi(lb);
1753  }
1754 
1755  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1756  // lumi blocks know their start and end times why not also start and end events?
1757  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1758  {
1759  SendSourceTerminationSignalIfException sentry(actReg_.get());
1760  espController_->eventSetupForInstance(ts);
1761  sentry.completedSuccessfully();
1762  }
1763  EventSetup const& es = esp_->eventSetup();
1764  {
1766  auto globalWaitTask = make_empty_waiting_task();
1767  globalWaitTask->increment_ref_count();
1768  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1769  *schedule_,
1770  lumiPrincipal,
1771  ts,
1772  es,
1773  subProcesses_);
1774  globalWaitTask->wait_for_all();
1775  if(globalWaitTask->exceptionPtr() != nullptr) {
1776  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1777  }
1778  }
1779  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1780  if(looper_) {
1781  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1782  }
1783  {
1784  //To wait, the ref count has to be 1+#streams
1785  auto streamLoopWaitTask = make_empty_waiting_task();
1786  streamLoopWaitTask->increment_ref_count();
1787 
1789 
1790  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1791  *schedule_,
1793  lumiPrincipal,
1794  ts,
1795  es,
1796  subProcesses_);
1797  streamLoopWaitTask->wait_for_all();
1798  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1799  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1800  }
1801  }
1802 
1803  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1804  if(looper_) {
1805  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1806  }
1807  }
1808 
// Performs the end-of-luminosity-block transition for (phid, run, lumi).
//
// Order of operations, as visible below:
//   1. the input source finalizes the lumi principal (end time, complete flag,
//      doEndLumi),
//   2. the EventSetup is refreshed for the end-of-lumi sync value,
//   3. the stream end transitions run asynchronously and are waited for,
//   4. the global end transition runs on the schedule, then on each SubProcess,
//   5. the looper, if any, is notified.
//
// cleaningUpAfterException is forwarded to the source, the stream transitions,
// the global transition and the sub-processes.
//
// NOTE(review): `Traits` is used below but not declared in the visible lines;
// presumably its typedefs were dropped from this listing — confirm.
1809  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1810  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1811  {
// The sentry emits the source early-termination signal if anything in this
// scope throws before completedSuccessfully() is reached.
1812  SendSourceTerminationSignalIfException sentry(actReg_.get());
1813 
1814  lumiPrincipal.setEndTime(input_->timestamp());
1815  lumiPrincipal.setComplete();
1816  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1817  sentry.completedSuccessfully();
1818  }
1819  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1820  // lumi blocks know their start and end times why not also start and end events?
1821  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1822  lumiPrincipal.endTime());
1823  {
1824  SendSourceTerminationSignalIfException sentry(actReg_.get());
1825  espController_->eventSetupForInstance(ts);
1826  sentry.completedSuccessfully();
1827  }
1828  EventSetup const& es = esp_->eventSetup();
1829  {
1830  //To wait, the ref count has to be 1+#streams
1831  auto streamLoopWaitTask = make_empty_waiting_task();
1832  streamLoopWaitTask->increment_ref_count();
1833 
1835 
1836  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1837  *schedule_,
1839  lumiPrincipal,
1840  ts,
1841  es,
1842  subProcesses_,
1843  cleaningUpAfterException);
// Block until the stream transitions finish, then rethrow any exception the
// wait task captured.
1844  streamLoopWaitTask->wait_for_all();
1845  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1846  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1847  }
1848  }
// NOTE(review): this trace prints "endLumi" both here (after the stream
// transitions) and again after the global transition below; endRun prints
// "streamEndRun" at the analogous point — possibly meant to be "streamEndLumi".
1849  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
// Intentionally empty: the looper stream hook is commented out.
1850  if(looper_) {
1851  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1852  }
1853  {
// Flag the principal as being at its end transition before the global
// end-of-lumi processing below.
1854  lumiPrincipal.setAtEndTransition(true);
1856  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1857  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1858  }
1859  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1860  if(looper_) {
1861  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1862  }
1863  }
1864 
1868  << "EventProcessor::readRun\n"
1869  << "Illegal attempt to insert run into cache\n"
1870  << "Contact a Framework Developer\n";
1871  }
1872  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1873  {
1874  SendSourceTerminationSignalIfException sentry(actReg_.get());
1875  input_->readRun(*rp, *historyAppender_);
1876  sentry.completedSuccessfully();
1877  }
1878  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1879  principalCache_.insert(rp);
1880  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1881  }
1882 
1884  principalCache_.merge(input_->runAuxiliary(), preg());
1885  auto runPrincipal =principalCache_.runPrincipalPtr();
1886  {
1887  SendSourceTerminationSignalIfException sentry(actReg_.get());
1888  input_->readAndMergeRun(*runPrincipal);
1889  sentry.completedSuccessfully();
1890  }
1891  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1892  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1893  }
1894 
1898  << "EventProcessor::readRun\n"
1899  << "Illegal attempt to insert lumi into cache\n"
1900  << "Contact a Framework Developer\n";
1901  }
1904  << "EventProcessor::readRun\n"
1905  << "Illegal attempt to insert lumi into cache\n"
1906  << "Run is invalid\n"
1907  << "Contact a Framework Developer\n";
1908  }
1909  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1910  {
1911  SendSourceTerminationSignalIfException sentry(actReg_.get());
1912  input_->readLuminosityBlock(*lbp, *historyAppender_);
1913  sentry.completedSuccessfully();
1914  }
1915  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1916  principalCache_.insert(lbp);
1917  return input_->luminosityBlock();
1918  }
1919 
1921  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1922  {
1923  SendSourceTerminationSignalIfException sentry(actReg_.get());
1924  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1925  sentry.completedSuccessfully();
1926  }
1927  return input_->luminosityBlock();
1928  }
1929 
1932  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1933  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1934  }
1935 
1938  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1939  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1940  }
1941 
1943  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1944  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1945  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1946  }
1947 
1949  principalCache_.deleteLumi(phid, run, lumi);
1950  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1951  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1952  }
1953 
// Tries to read the next event from the input source for stream iStreamIndex.
//
// Returns true when readEvent(iStreamIndex) was called and completed.
// Returns false when:
//   - shouldWeStop() is true,
//   - a deferred exception has already been recorded,
//   - *finishedProcessingEvents is already set,
//   - the source's next item is not an event (in which case
//     *finishedProcessingEvents is set to true),
//   - the branch that emits the ExternalSignal early-termination signal is
//     taken,
//   - or any exception is thrown while reading; the first such exception is
//     stored in deferredExceptionPtr_ instead of propagating.
//
// NOTE(review): several lines are missing from this listing (before the
// `handler->initializeThisThreadForUse()` call and inside the item-type
// check) — confirm against the original file before editing.
1954  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1955  std::atomic<bool>* finishedProcessingEvents) {
1956  if(shouldWeStop()) {
1957  return false;
1958  }
1959 
// Another caller already recorded a failure; do not read further.
1960  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1961  return false;
1962  }
1963 
1964  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1965  return false;
1966  }
1967 
1971  handler->initializeThisThreadForUse();
1972  }
1973 
1974  try {
1975  //need to use lock in addition to the serial task queue because
1976  // of delayed provenance reading and reading data in response to
1977  // edm::Refs etc
1978  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
// When firstEventInBlock_ is set (else branch below) the nextItemType() call is
// skipped and the flag is cleared; the existing comment below explains why the
// call must not be repeated in that case. For every other event the source is
// asked here.
1979  if(not firstEventInBlock_) {
1980  //The state machine already called input_->nextItemType
1981  // and found an event. We can't call input_->nextItemType
1982  // again since it would move to the next transition
1983  InputSource::ItemType itemType = input_->nextItemType();
1984  if (InputSource::IsEvent !=itemType) {
// No more events in this block: publish that through the caller-provided flag.
1986  finishedProcessingEvents->store(true,std::memory_order_release);
1987  //std::cerr<<"next item type "<<itemType<<"\n";
1988  return false;
1989  }
// NOTE(review): the condition guarding the following lines is not visible in
// this listing; presumably an asynchronous-stop check — confirm.
1991  //std::cerr<<"task told to async stop\n";
1992  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1993  return false;
1994  }
1995  } else {
1996  firstEventInBlock_ = false;
1997  }
1998  readEvent(iStreamIndex);
1999  } catch (...) {
// Only the first exception wins the compare-exchange and is stored; later
// ones are dropped.
2000  bool expected =false;
2001  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2002  deferredExceptionPtr_ = std::current_exception();
2003 
2004  }
2005  return false;
2006  }
2007  return true;
2008  }
2009 
2011  unsigned int iStreamIndex,
2012  std::atomic<bool>* finishedProcessingEvents)
2013  {
2014  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
2015  if(iPtr) {
2016  bool expected = false;
2017  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2018  deferredExceptionPtr_ = *iPtr;
2019  {
2020  WaitingTaskHolder h(iTask);
2021  h.doneWaiting(*iPtr);
2022  }
2023  }
2024  //the stream will stop now
2025  iTask->decrement_ref_count();
2026  return;
2027  }
2028 
2029  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2030  });
2031 
2032  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
2034 
2035  try {
2036  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2037  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2038  } else {
2039  //the stream will stop now
2040  tbb::task::destroy(*recursionTask);
2041  iTask->decrement_ref_count();
2042  }
2043  } catch(...) {
2044  WaitingTaskHolder h(recursionTask);
2045  h.doneWaiting(std::current_exception());
2046  }
2047  });
2048  }
2049 
2051  if(numberOfForkedChildren_>0) {
2052  //Have to do something special for forking since
2053  // after each event the system may have to skip
2054  // some transitions. This is handled in runToCompletion
2055  readEvent(0);
2056  auto eventLoopWaitTask = make_empty_waiting_task();
2057  eventLoopWaitTask->increment_ref_count();
2058  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2059  eventLoopWaitTask->wait_for_all();
2060  return;
2061  }
2064 
2065  std::atomic<bool> finishedProcessingEvents{false};
2066  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2067 
2068  //The state machine already found the event so
2069  // we have to avoid looking again
2070  firstEventInBlock_ = true;
2071 
2072  //To wait, the ref count has to be 1+#streams
2073  auto eventLoopWaitTask = make_empty_waiting_task();
2074  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2075  eventLoopWaitTask->increment_ref_count();
2076 
2077  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2078  unsigned int iStreamIndex = 0;
2079  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2080  eventLoopWaitTask->increment_ref_count();
2081  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2082  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2083  }) );
2084  }
2085  eventLoopWaitTask->increment_ref_count();
2086  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2087  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2088  }));
2089 
2090  //One of the processing threads saw an exception
2092  std::rethrow_exception(deferredExceptionPtr_);
2093  }
2094  }
// Reads one event from input_ into the event principal that principalCache_
// holds for stream iStreamIndex.
//
// iStreamIndex: index used to look up the event principal in principalCache_.
//
// Any exception thrown by input_->readEvent() propagates to the caller. In
// that case the sentry's destructor first emits
// preSourceEarlyTerminationSignal_(ExceptionFromThisContext) on actReg_.
2095  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2096  //TODO this will have to become per stream
2097  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2098  StreamContext streamContext(event.streamID(), &processContext_);
2099 
// The sentry stays armed until completedSuccessfully() is called.
2100  SendSourceTerminationSignalIfException sentry(actReg_.get());
2101  input_->readEvent(event, streamContext);
// The read returned normally: disarm the sentry so no termination signal is sent.
2102  sentry.completedSuccessfully();
2103 
2104  FDEBUG(1) << "\treadEvent\n";
2105  }
2106 
2108  unsigned int iStreamIndex) {
2109  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2110  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2112  if(rng.isAvailable()) {
2113  Event ev(*pep, ModuleDescription(), nullptr);
2114  rng->postEventRead(ev);
2115  }
2116  assert(pep->luminosityBlockPrincipalPtrValid());
2117  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2118  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2119 
2120  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2121  tbb::task::allocate_root(),
2122  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2123  {
2125 
2126  //NOTE: If we have a looper we only have one Stream
2127  if(looper_) {
2128  processEventWithLooper(*pep);
2129  }
2130 
2131  FDEBUG(1) << "\tprocessEvent\n";
2132  pep->clearEventPrincipal();
2133  if(iPtr) {
2134  iHolder.doneWaiting(*iPtr);
2135  } else {
2136  iHolder.doneWaiting(std::exception_ptr());
2137  }
2138  }
2139  )
2140  );
2141  WaitingTaskHolder afterProcessTask;
2142  if(subProcesses_.empty()) {
2143  afterProcessTask = std::move(finalizeEventTask);
2144  } else {
2145  //Need to run SubProcesses after schedule has finished
2146  // with the event
2147  afterProcessTask = WaitingTaskHolder(
2148  make_waiting_task(tbb::task::allocate_root(),
2149  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2150  {
2151  if(not iPtr) {
2153 
2154  //when run with 1 thread, we want the order to be what
2155  // it was before. This requires reversing the order since
2156  // tasks are run last one in first one out
2157  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2158  subProcess.doEventAsync(finalizeEventTask,*pep);
2159  }
2160  } else {
2161  finalizeEventTask.doneWaiting(*iPtr);
2162  }
2163  })
2164  );
2165  }
2166 
2167  schedule_->processOneEventAsync(std::move(afterProcessTask),
2168  iStreamIndex,*pep, esp_->eventSetup());
2169 
2170  }
2171 
2173  bool randomAccess = input_->randomAccess();
2174  ProcessingController::ForwardState forwardState = input_->forwardState();
2175  ProcessingController::ReverseState reverseState = input_->reverseState();
2176  ProcessingController pc(forwardState, reverseState, randomAccess);
2177 
2179  do {
2180 
2181  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2182  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2183 
2184  bool succeeded = true;
2185  if(randomAccess) {
2187  input_->skipEvents(-2);
2188  }
2190  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2191  }
2192  }
2193  pc.setLastOperationSucceeded(succeeded);
2194  } while(!pc.lastOperationSucceeded());
2195  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2196  }
2197 
2199  FDEBUG(1) << "\tshouldWeStop\n";
2200  if(shouldWeStop_) return true;
2201  if(!subProcesses_.empty()) {
2202  for(auto const& subProcess : subProcesses_) {
2203  if(subProcess.terminate()) {
2204  return true;
2205  }
2206  }
2207  return false;
2208  }
2209  return schedule_->terminate();
2210  }
2211 
2214  }
2215 
2218  }
2219 
2222  }
2223 
2226  }
2227 
// Sends a Stop event to the given state machine unless it has already
// terminated.
//
// iMachine: the machine to stop. A null pointer is accepted and nothing is
//   done. The unique_ptr is taken by value and is not moved from here, so the
//   machine is destroyed when this function returns.
2228  void EventProcessor::terminateMachine(std::unique_ptr<statemachine::Machine> iMachine) {
2229  if(iMachine.get() != nullptr) {
2230  if(!iMachine->terminated()) {
// forceLooperToEnd_ is true only while the Stop event is being processed.
// NOTE(review): the flag is not reset if process_event() throws - confirm
// whether that matters to callers.
2231  forceLooperToEnd_ = true;
2232  iMachine->process_event(statemachine::Stop());
2233  forceLooperToEnd_ = false;
2234  }
2235  else {
2236  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2237  }
// Checked again after the Stop attempt; the result is used only for the debug message.
2238  if(iMachine->terminated()) {
2239  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2240  }
2241  }
2242  }
2243 }
std::string emptyRunLumiMode_
size
Write out results.
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void openOutputFiles() override
virtual void setExceptionMessageLumis(std::string &message) override
SharedResourcesAcquirer sourceResourcesAcquirer_
virtual void doErrorStuff() override
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
virtual statemachine::Run readAndMergeRun() override
edm::propagate_const< std::unique_ptr< InputSource > > input_
virtual bool endOfLoop() override
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
static PFTauRenderPlugin instance
def destroy(e)
Definition: pyrootRender.py:13
ParameterSetID id() const
virtual bool shouldWeCloseOutput() const override
void possiblyContinueAfterForkChildFailure()
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual int readLuminosityBlock() override
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
#define NULL
Definition: scimark2.h:8
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException) override
bool ev
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:310
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool hasRunPrincipal() const
Definition: config.py:1
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
virtual void setExceptionMessageRuns(std::string &message) override
std::set< std::pair< std::string, std::string > > ExcludedData
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
virtual void startingNewLoop() override
virtual void rewindInput() override
bool alreadyPrinted() const
Definition: Exception.cc:251
bool forkProcess(std::string const &jobReportFile)
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:91
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
virtual bool shouldWeStop() const override
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual statemachine::Run readRun() override
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
virtual void prepareForNextLoop() override
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual bool alreadyHandlingException() const override
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:102
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
std::shared_ptr< edm::ParameterSet > parameterSet() const
virtual StatusCode runToCompletion() override
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:747
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void readFile() override
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void closeOutputFiles() override
HLT enums.
virtual void writeRun(statemachine::Run const &run) override
virtual void closeInputFile(bool cleaningUpAfterException) override
virtual void respondToOpenInputFile() override
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual int readAndMergeLumi() override
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
virtual void respondToCloseInputFile() override
virtual void readAndProcessEvent() override
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
virtual void deleteRunFromCache(statemachine::Run const &run) override
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
Definition: pipe.py:1
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
virtual void setExceptionMessageFiles(std::string &message) override
T get(const Candidate &c)
Definition: component.h:55
virtual void beginRun(statemachine::Run const &run) override
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31