test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
2 
8 
33 
35 
43 
48 
60 
61 #include "MessageForSource.h"
62 #include "MessageForParent.h"
63 
64 #include "boost/thread/xtime.hpp"
65 
66 #include <exception>
67 #include <iomanip>
68 #include <iostream>
69 #include <utility>
70 #include <sstream>
71 
72 #include <sys/ipc.h>
73 #include <sys/msg.h>
74 
75 #include "tbb/task.h"
76 
77 //Used for forking
78 #include <sys/types.h>
79 #include <sys/wait.h>
80 #include <sys/socket.h>
81 #include <sys/select.h>
82 #include <sys/fcntl.h>
83 #include <unistd.h>
84 
85 
86 //Used for CPU affinity
87 #ifndef __APPLE__
88 #include <sched.h>
89 #endif
90 
91 namespace {
92  //Sentry class to only send a signal if an
93  // exception occurs. An exception is identified
94  // by the destructor being called without first
95  // calling completedSuccessfully().
96  class SendSourceTerminationSignalIfException {
97  public:
98  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
99  reg_(iReg) {}
100  ~SendSourceTerminationSignalIfException() {
101  if(reg_) {
102  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
103  }
104  }
105  void completedSuccessfully() {
106  reg_ = nullptr;
107  }
108  private:
109  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
110  };
111 
112 }
113 
114 namespace edm {
115 
116  // ---------------------------------------------------------------
117  std::unique_ptr<InputSource>
119  CommonParams const& common,
120  std::shared_ptr<ProductRegistry> preg,
121  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
122  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
123  std::shared_ptr<ActivityRegistry> areg,
124  std::shared_ptr<ProcessConfiguration const> processConfiguration,
125  PreallocationConfiguration const& allocations) {
126  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
127  if(main_input == 0) {
129  << "There must be exactly one source in the configuration.\n"
130  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
131  }
132 
133  std::string modtype(main_input->getParameter<std::string>("@module_type"));
134 
135  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
137  ConfigurationDescriptions descriptions(filler->baseType());
138  filler->fill(descriptions);
139 
140  try {
141  convertException::wrap([&]() {
142  descriptions.validate(*main_input, std::string("source"));
143  });
144  }
145  catch (cms::Exception & iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
165 
166  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
167  common.maxEventsInput_, common.maxLumisInput_,
168  common.maxSecondsUntilRampdown_, allocations);
169 
170  areg->preSourceConstructionSignal_(md);
171  std::unique_ptr<InputSource> input;
172  try {
173  //even if we have an exception, send the signal
174  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
175  convertException::wrap([&]() {
176  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
177  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
178  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
179  });
180  }
181  catch (cms::Exception& iException) {
182  std::ostringstream ost;
183  ost << "Constructing input source of type " << modtype;
184  iException.addContext(ost.str());
185  throw;
186  }
187  return input;
188  }
189 
190  // ---------------------------------------------------------------
191  std::shared_ptr<EDLooperBase>
194  ParameterSet& params) {
195  std::shared_ptr<EDLooperBase> vLooper;
196 
197  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
198 
199  if(loopers.size() == 0) {
200  return vLooper;
201  }
202 
203  assert(1 == loopers.size());
204 
205  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
206  itName != itNameEnd;
207  ++itName) {
208 
209  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
210  providerPSet->registerIt();
211  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
212  cp,
213  *providerPSet);
214  }
215  return vLooper;
216  }
217 
218  // ---------------------------------------------------------------
220  ServiceToken const& iToken,
222  std::vector<std::string> const& defaultServices,
223  std::vector<std::string> const& forcedServices) :
224  actReg_(),
225  preg_(),
226  branchIDListHelper_(),
227  serviceToken_(),
228  input_(),
229  espController_(new eventsetup::EventSetupsController),
230  esp_(),
231  act_table_(),
232  processConfiguration_(),
233  schedule_(),
234  subProcesses_(),
235  historyAppender_(new HistoryAppender),
236  fb_(),
237  looper_(),
238  deferredExceptionPtrIsSet_(false),
239  principalCache_(),
240  beginJobCalled_(false),
241  shouldWeStop_(false),
242  stateMachineWasInErrorState_(false),
243  fileMode_(),
244  emptyRunLumiMode_(),
245  exceptionMessageFiles_(),
246  exceptionMessageRuns_(),
247  exceptionMessageLumis_(),
248  alreadyHandlingException_(false),
249  forceLooperToEnd_(false),
250  looperBeginJobRun_(false),
251  forceESCacheClearOnNewRun_(false),
252  numberOfForkedChildren_(0),
253  numberOfSequentialEventsPerChild_(1),
254  setCpuAffinity_(false),
255  eventSetupDataToExcludeFromPrefetching_() {
256  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
257  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
258  processDesc->addServices(defaultServices, forcedServices);
259  init(processDesc, iToken, iLegacy);
260  }
261 
263  std::vector<std::string> const& defaultServices,
264  std::vector<std::string> const& forcedServices) :
265  actReg_(),
266  preg_(),
267  branchIDListHelper_(),
268  serviceToken_(),
269  input_(),
270  espController_(new eventsetup::EventSetupsController),
271  esp_(),
272  act_table_(),
273  processConfiguration_(),
274  schedule_(),
275  subProcesses_(),
276  historyAppender_(new HistoryAppender),
277  fb_(),
278  looper_(),
279  deferredExceptionPtrIsSet_(false),
280  principalCache_(),
281  beginJobCalled_(false),
282  shouldWeStop_(false),
283  stateMachineWasInErrorState_(false),
284  fileMode_(),
285  emptyRunLumiMode_(),
286  exceptionMessageFiles_(),
287  exceptionMessageRuns_(),
288  exceptionMessageLumis_(),
289  alreadyHandlingException_(false),
290  forceLooperToEnd_(false),
291  looperBeginJobRun_(false),
292  forceESCacheClearOnNewRun_(false),
293  numberOfForkedChildren_(0),
294  numberOfSequentialEventsPerChild_(1),
295  setCpuAffinity_(false),
296  asyncStopRequestedWhileProcessingEvents_(false),
297  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
298  eventSetupDataToExcludeFromPrefetching_()
299  {
300  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
301  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
302  processDesc->addServices(defaultServices, forcedServices);
304  }
305 
306  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
307  ServiceToken const& token,
309  actReg_(),
310  preg_(),
311  branchIDListHelper_(),
312  serviceToken_(),
313  input_(),
314  espController_(new eventsetup::EventSetupsController),
315  esp_(),
316  act_table_(),
317  processConfiguration_(),
318  schedule_(),
319  subProcesses_(),
320  historyAppender_(new HistoryAppender),
321  fb_(),
322  looper_(),
323  deferredExceptionPtrIsSet_(false),
324  principalCache_(),
325  beginJobCalled_(false),
326  shouldWeStop_(false),
327  stateMachineWasInErrorState_(false),
328  fileMode_(),
329  emptyRunLumiMode_(),
330  exceptionMessageFiles_(),
331  exceptionMessageRuns_(),
332  exceptionMessageLumis_(),
333  alreadyHandlingException_(false),
334  forceLooperToEnd_(false),
335  looperBeginJobRun_(false),
336  forceESCacheClearOnNewRun_(false),
337  numberOfForkedChildren_(0),
338  numberOfSequentialEventsPerChild_(1),
339  setCpuAffinity_(false),
340  asyncStopRequestedWhileProcessingEvents_(false),
341  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
342  eventSetupDataToExcludeFromPrefetching_()
343  {
344  init(processDesc, token, legacy);
345  }
346 
347 
349  actReg_(),
350  preg_(),
351  branchIDListHelper_(),
352  serviceToken_(),
353  input_(),
354  espController_(new eventsetup::EventSetupsController),
355  esp_(),
356  act_table_(),
357  processConfiguration_(),
358  schedule_(),
359  subProcesses_(),
360  historyAppender_(new HistoryAppender),
361  fb_(),
362  looper_(),
363  deferredExceptionPtrIsSet_(false),
364  principalCache_(),
365  beginJobCalled_(false),
366  shouldWeStop_(false),
367  stateMachineWasInErrorState_(false),
368  fileMode_(),
369  emptyRunLumiMode_(),
370  exceptionMessageFiles_(),
371  exceptionMessageRuns_(),
372  exceptionMessageLumis_(),
373  alreadyHandlingException_(false),
374  forceLooperToEnd_(false),
375  looperBeginJobRun_(false),
376  forceESCacheClearOnNewRun_(false),
377  numberOfForkedChildren_(0),
378  numberOfSequentialEventsPerChild_(1),
379  setCpuAffinity_(false),
380  asyncStopRequestedWhileProcessingEvents_(false),
381  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
382  eventSetupDataToExcludeFromPrefetching_()
383  {
384  if(isPython) {
385  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
386  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
388  }
389  else {
390  auto processDesc = std::make_shared<ProcessDesc>(config);
392  }
393  }
394 
395  void
396  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
397  ServiceToken const& iToken,
399 
400  //std::cerr << processDesc->dump() << std::endl;
401 
402  // register the empty parentage vector , once and for all
404 
405  // register the empty parameter set, once and for all.
407 
408  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
409 
410  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
411  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
412  bool const hasSubProcesses = !subProcessVParameterSet.empty();
413 
414  // Now set some parameters specific to the main process.
415  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
416  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
417  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
418  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
419  //threading
420  unsigned int nThreads=1;
421  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
422  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
423  if(nThreads == 0) {
424  nThreads = 1;
425  }
426  }
427  /* TODO: when we support having each stream run in a different thread use this default
428  unsigned int nStreams =nThreads;
429  */
430  unsigned int nStreams =1;
431  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
432  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
433  if(nStreams==0) {
434  nStreams = nThreads;
435  }
436  // PG: Log the number of streams
437  edm::LogInfo("StreamSetup") <<"setting # streams "<<nStreams;
438  }
439  /*
440  bool nRunsSet = false;
441  */
442  unsigned int nConcurrentRuns =1;
443  /*
444  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
445  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
446  }
447  */
448  unsigned int nConcurrentLumis =1;
449  /*
450  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
451  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
452  } else {
453  nConcurrentLumis = nConcurrentRuns;
454  }
455  */
456  //Check that the relationships between threading parameters make sense
457  /*
458  if(nThreads<nStreams) {
459  //bad
460  }
461  if(nConcurrentRuns>nStreams) {
462  //bad
463  }
464  if(nConcurrentRuns>nConcurrentLumis) {
465  //bad
466  }
467  */
468  //forking
469  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
470  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
471  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
472  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
473  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
474  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
475  for(auto const& ps : excluded) {
476  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
477  ps.getUntrackedParameter<std::string>("label", ""));
478  }
479  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
480 
481  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
482 
483  // Now do general initialization
485 
486  //initialize the services
487  auto& serviceSets = processDesc->getServicesPSets();
488  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
489  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
490 
491  //make the services available
493 
494  if(nStreams>1) {
496  handler->willBeUsingThreads();
497  }
498 
499  // initialize miscellaneous items
500  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
501 
502  // initialize the event setup provider
503  esp_ = espController_->makeProvider(*parameterSet);
504 
505  // initialize the looper, if any
506  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
507  if(looper_) {
508  looper_->setActionTable(items.act_table_.get());
509  looper_->attachTo(*items.actReg_);
510 
511  //For now loopers make us run only 1 transition at a time
512  nStreams=1;
513  nConcurrentLumis=1;
514  nConcurrentRuns=1;
515  }
516 
517  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
518 
519  // initialize the input source
520  input_ = makeInput(*parameterSet,
521  *common,
522  items.preg(),
523  items.branchIDListHelper(),
525  items.actReg_,
526  items.processConfiguration(),
528 
529  // initialize the Schedule
530  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
531 
532  // set the data members
534  actReg_ = items.actReg_;
535  preg_ = items.preg();
540  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
541 
542  FDEBUG(2) << parameterSet << std::endl;
543 
545  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
546  // Reusable event principal
547  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
550  }
551 
552  // fill the subprocesses, if there are any
553  subProcesses_.reserve(subProcessVParameterSet.size());
554  for(auto& subProcessPSet : subProcessVParameterSet) {
555  subProcesses_.emplace_back(subProcessPSet,
556  *parameterSet,
557  preg(),
561  *actReg_,
562  token,
565  &processContext_);
566  }
567  }
568 
570  // Make the services available while everything is being deleted.
572  ServiceRegistry::Operate op(token);
573 
574  // manually destroy all these things that may need the services around
575  // propagate_const<T> has no reset() function
576  espController_ = nullptr;
577  esp_ = nullptr;
578  schedule_ = nullptr;
579  input_ = nullptr;
580  looper_ = nullptr;
581  actReg_ = nullptr;
582 
585  }
586 
587  void
589  if(beginJobCalled_) return;
590  beginJobCalled_=true;
591  bk::beginJob();
592 
593  // StateSentry toerror(this); // should we add this ?
594  //make the services available
596 
601  actReg_->preallocateSignal_(bounds);
603 
604  //NOTE: this may throw
606  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
607 
608  //NOTE: This implementation assumes 'Job' means one call
609  // the EventProcessor::run
610  // If it really means once per 'application' then this code will
611  // have to be changed.
612  // Also have to deal with case where have 'run' then new Module
613  // added and do 'run'
614  // again. In that case the newly added Module needs its 'beginJob'
615  // to be called.
616 
617  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
618  // For now we delay calling beginOfJob until first beginOfRun
619  //if(looper_) {
620  // looper_->beginOfJob(es);
621  //}
622  try {
623  convertException::wrap([&]() {
624  input_->doBeginJob();
625  });
626  }
627  catch(cms::Exception& ex) {
628  ex.addContext("Calling beginJob for the source");
629  throw;
630  }
631  schedule_->beginJob(*preg_);
632  // toerror.succeeded(); // should we add this?
633  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
634  actReg_->postBeginJobSignal_();
635 
636  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
637  schedule_->beginStream(i);
638  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
639  }
640  }
641 
642  void
644  // Collects exceptions, so we don't throw before all operations are performed.
645  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
646 
647  //make the services available
649 
650  //NOTE: this really should go elsewhere in the future
651  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
652  c.call([this,i](){this->schedule_->endStream(i);});
653  for(auto& subProcess : subProcesses_) {
654  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
655  }
656  }
657  auto actReg = actReg_.get();
658  c.call([actReg](){actReg->preEndJobSignal_();});
659  schedule_->endJob(c);
660  for(auto& subProcess : subProcesses_) {
661  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
662  }
663  c.call(std::bind(&InputSource::doEndJob, input_.get()));
664  if(looper_) {
665  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
666  }
667  c.call([actReg](){actReg->postEndJobSignal_();});
668  if(c.hasThrown()) {
669  c.rethrow();
670  }
671  }
672 
675  return serviceToken_;
676  }
677 
//Setup signal handler to listen for when forked children stop
namespace {
  //These are volatile since the compiler can not be allowed to optimize them
  // since they can be modified in the signal handler
  volatile bool child_failed = false;            // set once any child exits non-zero or is killed by a signal
  volatile unsigned int num_children_done = 0;   // number of children reaped so far
  volatile int child_fail_exit_status = 0;       // last non-zero exit status seen
  volatile int child_fail_signal = 0;            // last terminating signal seen

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    // SIGCHLD handler: reaps every child that has already terminated
    // (non-blocking) and records how each one ended in the variables above.
    void ep_sigchld(int, siginfo_t*, void*) {
      // waitpid() may overwrite errno (e.g. ECHILD once nothing is left to
      // reap); preserve the value seen by the code this handler interrupted.
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while (0 < p) {
        if (WIFEXITED(stat_loc)) {
          ++num_children_done;
          if (0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if (WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
716 
717  enum {
722  };
723 
724  namespace {
// Returns the number of decimal digits in numberOfChildren. The result is
// used as the zero-padded field width for the child index in per-child log
// file names. A count of zero has no digits, so it falls back to a width of 3.
constexpr unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  unsigned int n = 0;
  while (numberOfChildren != 0) {
    ++n;
    numberOfChildren /= 10;
  }
  if (n == 0) {
    n = 3;  // Protect against zero numberOfChildren
  }
  return n;
}
736 
/* This class embodies the thread which is used to listen to the forked children and
   then tell them which events they should process. */
class MessageSenderToSource {
public:
  // childrenSockets / childrenPipes: parent-side descriptors to monitor.
  // iNEventsToProcess: number of event indices handed out per request.
  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
  void operator()();

private:
  const std::vector<int>& m_childrenPipes;  // held by reference: the caller's vector must outlive this object
  long const m_nEventsToProcess;
  fd_set m_socketSet;            // every descriptor (sockets and pipes) still being monitored
  unsigned int m_aliveChildren;  // starts at the number of child sockets
  int m_maxFd;                   // one past the highest monitored descriptor
};

MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
                                             std::vector<int> const& childrenPipes,
                                             long iNEventsToProcess):
  m_childrenPipes(childrenPipes),
  m_nEventsToProcess(iNEventsToProcess),
  m_aliveChildren(childrenSockets.size()),
  m_maxFd(0)
{
  FD_ZERO(&m_socketSet);
  for (int const fd : childrenSockets) {
    // FD_SET on a descriptor outside [0, FD_SETSIZE) is undefined behaviour.
    assert(fd >= 0 && fd < FD_SETSIZE);
    FD_SET(fd, &m_socketSet);
    if (fd > m_maxFd) {
      m_maxFd = fd;
    }
  }
  for (int const fd : childrenPipes) {
    assert(fd >= 0 && fd < FD_SETSIZE);
    FD_SET(fd, &m_socketSet);
    if (fd > m_maxFd) {
      m_maxFd = fd;
    }
  }
  ++m_maxFd; // select reads [0,m_maxFd).
}
775 
776  /* This function is the heart of the communication between parent and child.
777  * When ready for more data, the child (see MessageReceiverForSource) requests
778  * data through a AF_UNIX socket message. The parent will then assign the next
779  * chunk of data by sending a message back.
780  *
781  * Additionally, this function also monitors the read-side of the pipe fd from the child.
782  * If the child dies unexpectedly, the pipe will be selected as ready for read and
783  * will return EPIPE when read from. Further, if the child thinks the parent has died
784  * (defined as waiting more than 1s for a response), it will write a single byte to
785  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
786  * If still alive, the parent will read the byte and ignore it.
787  *
788  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
789  * handler can distinguish between success and failure cases.
790  */
791 
792  void
793  MessageSenderToSource::operator()() {
794  multicore::MessageForParent childMsg;
795  LogInfo("ForkingController") << "I am controller";
796  //this is the master and therefore the controller
797 
798  multicore::MessageForSource sndmsg;
799  sndmsg.startIndex = 0;
800  sndmsg.nIndices = m_nEventsToProcess;
801  do {
802 
803  fd_set readSockets, errorSockets;
804  // Wait for a request from a child for events.
805  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
806  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
807  // Note that we don't timeout; may be reconsidered in the future.
808  ssize_t rc;
809  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
810  if (rc < 0) {
811  std::cerr << "select failed; should be impossible due to preconditions.\n";
812  abort();
813  break;
814  }
815 
816  // Read the message from the child.
817  for (int idx=0; idx<m_maxFd; idx++) {
818 
819  // Handle errors
820  if (FD_ISSET(idx, &errorSockets)) {
821  LogInfo("ForkingController") << "Error on socket " << idx;
822  FD_CLR(idx, &m_socketSet);
823  close(idx);
824  // See if it was the watchdog pipe that died.
825  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
826  if (*it == idx) {
827  m_aliveChildren--;
828  }
829  }
830  continue;
831  }
832 
833  if (!FD_ISSET(idx, &readSockets)) {
834  continue;
835  }
836 
837  // See if this FD is a child watchdog pipe. If so, read from it to prevent
838  // writes from blocking.
839  bool is_pipe = false;
840  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
841  if (*it == idx) {
842  is_pipe = true;
843  char buf;
844  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
845  if (rc <= 0) {
846  m_aliveChildren--;
847  FD_CLR(idx, &m_socketSet);
848  close(idx);
849  }
850  }
851  }
852 
853  // Only execute this block if the FD is a socket for sending the child work.
854  if (!is_pipe) {
855  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
856  if (rc < 0) {
857  FD_CLR(idx, &m_socketSet);
858  close(idx);
859  continue;
860  }
861 
862  // Tell the child what events to process.
863  // If 'send' fails, then the child process has failed (any other possibilities are
864  // eliminated because we are using fixed-size messages with Unix datagram sockets).
865  // Thus, the SIGCHLD handler will fire and set child_fail = true.
866  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
867  if (rc < 0) {
868  FD_CLR(idx, &m_socketSet);
869  close(idx);
870  continue;
871  }
872  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
873  sndmsg.startIndex += sndmsg.nIndices;
874  }
875  }
876 
877  } while (m_aliveChildren > 0);
878 
879  return;
880  }
881 
882  }
883 
884 
885  void EventProcessor::possiblyContinueAfterForkChildFailure() {
886  if(child_failed && continueAfterChildFailure_) {
887  if (child_fail_signal) {
888  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
889  child_fail_signal=0;
890  } else if (child_fail_exit_status) {
891  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
892  child_fail_exit_status=0;
893  } else {
894  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
895  }
896  child_failed =false;
897  }
898  }
899 
900  bool
901  EventProcessor::forkProcess(std::string const& jobReportFile) {
902 
903  if(0 == numberOfForkedChildren_) {return true;}
904  assert(0<numberOfForkedChildren_);
905  //do what we want done in common
906  {
907  beginJob(); //make sure this was run
908  // make the services available
909  ServiceRegistry::Operate operate(serviceToken_);
910 
911  InputSource::ItemType itemType;
912  itemType = input_->nextItemType();
913 
914  assert(itemType == InputSource::IsFile);
915  {
916  readFile();
917  }
918  itemType = input_->nextItemType();
919  assert(itemType == InputSource::IsRun);
920 
921  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
922  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
923  input_->runAuxiliary()->beginTime());
924  espController_->eventSetupForInstance(ts);
925  EventSetup const& es = esp_->eventSetup();
926 
927  //now get all the data available in the EventSetup
928  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
929  es.fillAvailableRecordKeys(recordKeys);
930  std::vector<eventsetup::DataKey> dataKeys;
931  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
932  itKey != itEnd;
933  ++itKey) {
934  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
935  //see if this is on our exclusion list
936  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
937  ExcludedData const* excludedData(nullptr);
938  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
939  excludedData = &(itExcludeRec->second);
940  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
941  //skip all items in this record
942  continue;
943  }
944  }
945  if(0 != recordPtr) {
946  dataKeys.clear();
947  recordPtr->fillRegisteredDataKeys(dataKeys);
948  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
949  itDataKey != itDataKeyEnd;
950  ++itDataKey) {
951  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
952  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
953  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
954  continue;
955  }
956  try {
957  recordPtr->doGet(*itDataKey);
958  } catch(cms::Exception& e) {
959  LogWarning("ForkingEventSetupPreFetching") << e.what();
960  }
961  }
962  }
963  }
964  }
965  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
966  {
967  // make the services available
968  ServiceRegistry::Operate operate(serviceToken_);
969  Service<JobReport> jobReport;
970  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
971 
972  //Now actually do the forking
973  actReg_->preForkReleaseResourcesSignal_();
974  input_->doPreForkReleaseResources();
975  schedule_->preForkReleaseResources();
976  }
977  installCustomHandler(SIGCHLD, ep_sigchld);
978 
979 
980  unsigned int childIndex = 0;
981  unsigned int const kMaxChildren = numberOfForkedChildren_;
982  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
983  std::vector<pid_t> childrenIds;
984  childrenIds.reserve(kMaxChildren);
985  std::vector<int> childrenSockets;
986  childrenSockets.reserve(kMaxChildren);
987  std::vector<int> childrenPipes;
988  childrenPipes.reserve(kMaxChildren);
989  std::vector<int> childrenSocketsCopy;
990  childrenSocketsCopy.reserve(kMaxChildren);
991  std::vector<int> childrenPipesCopy;
992  childrenPipesCopy.reserve(kMaxChildren);
993  int pipes[] {0, 0};
994 
995  {
996  // make the services available
997  ServiceRegistry::Operate operate(serviceToken_);
998  Service<JobReport> jobReport;
999  int sockets[2], fd_flags;
1000  for(; childIndex < kMaxChildren; ++childIndex) {
1001  // Create a UNIX_DGRAM socket pair
1002  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1003  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1004  exit(EXIT_FAILURE);
1005  }
1006  if (pipe(pipes)) {
1007  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1008  exit(EXIT_FAILURE);
1009  }
1010  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1011  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1012  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1013  exit(EXIT_FAILURE);
1014  }
1015  // Mark socket as non-block. Child must be careful to do select prior
1016  // to reading from socket.
1017  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1018  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1019  exit(EXIT_FAILURE);
1020  }
1021  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1022  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1023  exit(EXIT_FAILURE);
1024  }
1025  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1026  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1027  exit(EXIT_FAILURE);
1028  }
1029  // Linux man page notes there are some edge cases where reading from a
1030  // fd can block, even after a select.
1031  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1032  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1033  exit(EXIT_FAILURE);
1034  }
1035  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1036  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1037  exit(EXIT_FAILURE);
1038  }
1039 
1040  childrenPipesCopy = childrenPipes;
1041  childrenSocketsCopy = childrenSockets;
1042 
1043  pid_t value = fork();
1044  if(value == 0) {
1045  // Close the parent's side of the socket and pipe which will talk to us.
1046  close(pipes[0]);
1047  close(sockets[0]);
1048  // Close our copies of the parent's other communication pipes.
1049  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1050  close(*it);
1051  }
1052  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1053  close(*it);
1054  }
1055 
1056  // this is the child process, redirect stdout and stderr to a log file
1057  fflush(stdout);
1058  fflush(stderr);
1059  std::stringstream stout;
1060  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1061  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1062  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1063  }
1064  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1065  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1066  }
1067 
1068  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1069  if(setCpuAffinity_) {
1070  // CPU affinity is handled differently on macosx.
1071  // We disable it and print a message until someone reads:
1072  //
1073  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1074  //
1075  // and implements it.
1076 #ifdef __APPLE__
1077  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1078 #else
1079  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1080  cpu_set_t mask;
1081  CPU_ZERO(&mask);
1082  CPU_SET(childIndex, &mask);
1083  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1084  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1085  exit(-1);
1086  }
1087 #endif
1088  }
1089  break;
1090  } else {
1091  //this is the parent
1092  close(pipes[1]);
1093  close(sockets[1]);
1094  }
1095  if(value < 0) {
1096  LogError("ForkingChild") << "failed to create a child";
1097  exit(-1);
1098  }
1099  childrenIds.push_back(value);
1100  childrenSockets.push_back(sockets[0]);
1101  childrenPipes.push_back(pipes[0]);
1102  }
1103 
1104  if(childIndex < kMaxChildren) {
1105  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1106  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1107 
1108  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1109  input_->doPostForkReacquireResources(receiver);
1110  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1111  //NOTE: sources have to reset themselves by listening to the post fork message
1112  //rewindInput();
1113  return true;
1114  }
1115  jobReport->parentAfterFork(jobReportFile);
1116  }
1117 
1118  //this is the original, which is now the master for all the children
1119 
1120  //Need to wait for signals from the children or externally
1121  // To wait we must
1122  // 1) block the signals we want to wait on so we do not have a race condition
1123  // 2) check that we haven't already met our ending criteria
1124  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1125  sigset_t blockingSigSet;
1126  sigset_t unblockingSigSet;
1127  sigset_t oldSigSet;
1128  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1129  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1130  sigaddset(&blockingSigSet, SIGCHLD);
1131  sigaddset(&blockingSigSet, SIGUSR2);
1132  sigaddset(&blockingSigSet, SIGINT);
1133  sigdelset(&unblockingSigSet, SIGCHLD);
1134  sigdelset(&unblockingSigSet, SIGUSR2);
1135  sigdelset(&unblockingSigSet, SIGINT);
1136  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1137 
1138  // If there are too many fd's (unlikely, but possible) for select, denote this
1139  // because the sender will fail.
1140  bool too_many_fds = false;
1141  if (pipes[1]+1 > FD_SETSIZE) {
1142  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1143  too_many_fds = true;
1144  }
1145 
1146  //create a thread that sends the units of work to workers
1147  // we create it after all signals were blocked so that this
1148  // thread is never interrupted by a signal
1149  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1150  boost::thread senderThread(sender);
1151 
1152  if(not too_many_fds) {
1153  //NOTE: a child could have failed before we got here and even after this call
1154  // which is why the 'if' is conditional on continueAfterChildFailure_
1155  possiblyContinueAfterForkChildFailure();
1156  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1157  sigsuspend(&unblockingSigSet);
1158  possiblyContinueAfterForkChildFailure();
1159  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1160  }
1161  }
1162  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1163 
1164  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1165  if(child_failed) {
1166  LogError("ForkingStopping") << "child failed";
1167  }
1168  if(shutdown_flag) {
1169  LogSystem("ForkingStopping") << "asked to shutdown";
1170  }
1171 
1172  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1173  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1174  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1175  it != itEnd; ++it) {
1176  /* int result = */ kill(*it, SIGUSR2);
1177  }
1178  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1179  while(num_children_done != kMaxChildren) {
1180  sigsuspend(&unblockingSigSet);
1181  }
1182  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1183  }
1184  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1185  senderThread.join();
1186  if(child_failed && !continueAfterChildFailure_) {
1187  if (child_fail_signal) {
1188  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1189  } else if (child_fail_exit_status) {
1190  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1191  } else {
1192  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1193  }
1194  }
1195  if(too_many_fds) {
1196  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1197  }
1198  return false;
1199  }
1200 
1201  std::vector<ModuleDescription const*>
1202  EventProcessor::getAllModuleDescriptions() const {
1203  return schedule_->getAllModuleDescriptions();
1204  }
1205 
1206  int
1207  EventProcessor::totalEvents() const {
1208  return schedule_->totalEvents();
1209  }
1210 
1211  int
1212  EventProcessor::totalEventsPassed() const {
1213  return schedule_->totalEventsPassed();
1214  }
1215 
1216  int
1217  EventProcessor::totalEventsFailed() const {
1218  return schedule_->totalEventsFailed();
1219  }
1220 
1221  void
1222  EventProcessor::enableEndPaths(bool active) {
1223  schedule_->enableEndPaths(active);
1224  }
1225 
1226  bool
1227  EventProcessor::endPathsEnabled() const {
1228  return schedule_->endPathsEnabled();
1229  }
1230 
1231  void
1232  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1233  schedule_->getTriggerReport(rep);
1234  }
1235 
1236  void
1237  EventProcessor::clearCounters() {
1238  schedule_->clearCounters();
1239  }
1240 
1241 
1242  std::unique_ptr<statemachine::Machine>
1243  EventProcessor::createStateMachine() {
1245  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1246  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1247  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1248  else {
1249  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1250  << fileMode_ << ".\n"
1251  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1252  }
1253 
1254  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1255  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1256  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1257  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1258  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1259  else {
1260  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1261  << emptyRunLumiMode_ << ".\n"
1262  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1263  }
1264 
1265  auto machine = std::make_unique<statemachine::Machine>(
1266  this,
1267  fileMode,
1268  emptyRunLumiMode);
1269 
1270  machine->initiate();
1271  return machine;
1272  }
1273 
1274  bool
1275  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1276  bool returnValue = false;
1277 
1278  // Look for a shutdown signal
1279  if(shutdown_flag.load(std::memory_order_acquire)) {
1280  returnValue = true;
1281  returnCode = epSignal;
1282  }
1283  return returnValue;
1284  }
1285 
1286 
1288  EventProcessor::runToCompletion() {
1289 
1290  StatusCode returnCode=epSuccess;
1291  asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1292  std::unique_ptr<statemachine::Machine> machine;
1293  {
1294  beginJob(); //make sure this was called
1295 
1296  //StatusCode returnCode = epSuccess;
1297  stateMachineWasInErrorState_ = false;
1298 
1299  // make the services available
1300  ServiceRegistry::Operate operate(serviceToken_);
1301 
1302  machine = createStateMachine();
1303  nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1304  asyncStopRequestedWhileProcessingEvents_=false;
1305  try {
1306  convertException::wrap([&]() {
1307 
1308  InputSource::ItemType itemType;
1309 
1310  while(true) {
1311 
1312  bool more = true;
1313  if(numberOfForkedChildren_ > 0) {
1314  size_t size = preg_->size();
1315  {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317  more = input_->skipForForking();
1318  sentry.completedSuccessfully();
1319  }
1320  if(more) {
1321  if(size < preg_->size()) {
1322  principalCache_.adjustIndexesAfterProductRegistryAddition();
1323  }
1324  principalCache_.adjustEventsToNewProductRegistry(preg());
1325  }
1326  }
1327  {
1328  SendSourceTerminationSignalIfException sentry(actReg_.get());
1329  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1330  sentry.completedSuccessfully();
1331  }
1332 
1333  FDEBUG(1) << "itemType = " << itemType << "\n";
1334 
1335  if(checkForAsyncStopRequest(returnCode)) {
1336  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1337  forceLooperToEnd_ = true;
1338  machine->process_event(statemachine::Stop());
1339  forceLooperToEnd_ = false;
1340  break;
1341  }
1342 
1343  if(itemType == InputSource::IsEvent) {
1344  machine->process_event(statemachine::Event());
1345  if(asyncStopRequestedWhileProcessingEvents_) {
1346  forceLooperToEnd_ = true;
1347  machine->process_event(statemachine::Stop());
1348  forceLooperToEnd_ = false;
1349  returnCode = asyncStopStatusCodeFromProcessingEvents_;
1350  break;
1351  }
1352  itemType = nextItemTypeFromProcessingEvents_;
1353  }
1354 
1355  if(itemType == InputSource::IsEvent) {
1356  }
1357  else if(itemType == InputSource::IsStop) {
1358  machine->process_event(statemachine::Stop());
1359  }
1360  else if(itemType == InputSource::IsFile) {
1361  machine->process_event(statemachine::File());
1362  }
1363  else if(itemType == InputSource::IsRun) {
1364  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1365  }
1366  else if(itemType == InputSource::IsLumi) {
1367  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1368  }
1369  else if(itemType == InputSource::IsSynchronize) {
1370  //For now, we don't have to do anything
1371  }
1372  // This should be impossible
1373  else {
1375  << "Unknown next item type passed to EventProcessor\n"
1376  << "Please report this error to the Framework group\n";
1377  }
1378  if(machine->terminated()) {
1379  break;
1380  }
1381  } // End of loop over state machine events
1382  }); // convertException::wrap
1383  } // Try block
1384  // Some comments on exception handling related to the boost state machine:
1385  //
1386  // Some states used in the machine are special because they
1387  // perform actions while the machine is being terminated, actions
1388  // such as close files, call endRun, call endLumi etc ... Each of these
1389  // states has two functions that perform these actions. The functions
1390  // are almost identical. The major difference is that one version
1391  // catches all exceptions and the other lets exceptions pass through.
1392  // The destructor catches them and the other function named "exit" lets
1393  // them pass through. On a normal termination, boost will always call
1394  // "exit" and then the state destructor. In our state classes, the
1395  // the destructors do nothing if the exit function already took
1396  // care of things. Here's the interesting part. When boost is
1397  // handling an exception the "exit" function is not called (a boost
1398  // feature).
1399  //
1400  // If an exception occurs while the boost machine is in control
1401  // (which usually means inside a process_event call), then
1402  // the boost state machine destroys its states and "terminates" itself.
1403  // This already done before we hit the catch blocks below. In this case
1404  // the call to terminateMachine below only destroys an already
1405  // terminated state machine. Because exit is not called, the state destructors
1406  // handle cleaning up lumis, runs, and files. The destructors swallow
1407  // all exceptions and only pass through the exceptions messages, which
1408  // are tacked onto the original exception below.
1409  //
1410  // If an exception occurs when the boost state machine is not
1411  // in control (outside the process_event functions), then boost
1412  // cannot destroy its own states. The terminateMachine function
1413  // below takes care of that. The flag "alreadyHandlingException"
1414  // is set true so that the state exit functions do nothing (and
1415  // cannot throw more exceptions while handling the first). Then the
1416  // state destructors take care of this because exit did nothing.
1417  //
1418  // In both cases above, the EventProcessor::endOfLoop function is
1419  // not called because it can throw exceptions.
1420  //
1421  // One tricky aspect of the state machine is that things that can
1422  // throw should not be invoked by the state machine while another
1423  // exception is being handled.
1424  // Another tricky aspect is that it appears to be important to
1425  // terminate the state machine before invoking its destructor.
1426  // We've seen crashes that are not understood when that is not
1427  // done. Maintainers of this code should be careful about this.
1428 
1429  catch (cms::Exception & e) {
1430  alreadyHandlingException_ = true;
1431  terminateMachine(std::move(machine));
1432  alreadyHandlingException_ = false;
1433  if (!exceptionMessageLumis_.empty()) {
1434  e.addAdditionalInfo(exceptionMessageLumis_);
1435  if (e.alreadyPrinted()) {
1436  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1437  }
1438  }
1439  if (!exceptionMessageRuns_.empty()) {
1440  e.addAdditionalInfo(exceptionMessageRuns_);
1441  if (e.alreadyPrinted()) {
1442  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1443  }
1444  }
1445  if (!exceptionMessageFiles_.empty()) {
1446  e.addAdditionalInfo(exceptionMessageFiles_);
1447  if (e.alreadyPrinted()) {
1448  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1449  }
1450  }
1451  throw;
1452  }
1453 
1454  if(machine->terminated()) {
1455  FDEBUG(1) << "The state machine reports it has been terminated\n";
1456  machine.reset();
1457  }
1458 
1459  if(stateMachineWasInErrorState_) {
1460  throw cms::Exception("BadState")
1461  << "The boost state machine in the EventProcessor exited after\n"
1462  << "entering the Error state.\n";
1463  }
1464 
1465  }
1466  if(machine.get() != nullptr) {
1467  terminateMachine(std::move(machine));
1469  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1470  << "Please report this error to the Framework group\n";
1471  }
1472 
1473  return returnCode;
1474  }
1475 
1476  void EventProcessor::readFile() {
1477  FDEBUG(1) << " \treadFile\n";
1478  size_t size = preg_->size();
1479  SendSourceTerminationSignalIfException sentry(actReg_.get());
1480 
1481  fb_ = input_->readFile();
1482  if(size < preg_->size()) {
1483  principalCache_.adjustIndexesAfterProductRegistryAddition();
1484  }
1485  principalCache_.adjustEventsToNewProductRegistry(preg());
1486  if((numberOfForkedChildren_ > 0) or
1487  (preallocations_.numberOfStreams()>1 and
1488  preallocations_.numberOfThreads()>1)) {
1489  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1490  }
1491  sentry.completedSuccessfully();
1492  }
1493 
1494  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1495  if (fb_.get() != nullptr) {
1496  SendSourceTerminationSignalIfException sentry(actReg_.get());
1497  input_->closeFile(fb_.get(), cleaningUpAfterException);
1498  sentry.completedSuccessfully();
1499  }
1500  FDEBUG(1) << "\tcloseInputFile\n";
1501  }
1502 
1503  void EventProcessor::openOutputFiles() {
1504  if (fb_.get() != nullptr) {
1505  schedule_->openOutputFiles(*fb_);
1506  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1507  }
1508  FDEBUG(1) << "\topenOutputFiles\n";
1509  }
1510 
1511  void EventProcessor::closeOutputFiles() {
1512  if (fb_.get() != nullptr) {
1513  schedule_->closeOutputFiles();
1514  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1515  }
1516  FDEBUG(1) << "\tcloseOutputFiles\n";
1517  }
1518 
1519  void EventProcessor::respondToOpenInputFile() {
1520  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1521  if (fb_.get() != nullptr) {
1522  schedule_->respondToOpenInputFile(*fb_);
1523  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1524  }
1525  FDEBUG(1) << "\trespondToOpenInputFile\n";
1526  }
1527 
1528  void EventProcessor::respondToCloseInputFile() {
1529  if (fb_.get() != nullptr) {
1530  schedule_->respondToCloseInputFile(*fb_);
1531  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1532  }
1533  FDEBUG(1) << "\trespondToCloseInputFile\n";
1534  }
1535 
1536  void EventProcessor::startingNewLoop() {
1537  shouldWeStop_ = false;
1538  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1539  // until after we've called beginOfJob
1540  if(looper_ && looperBeginJobRun_) {
1541  looper_->doStartingNewLoop();
1542  }
1543  FDEBUG(1) << "\tstartingNewLoop\n";
1544  }
1545 
1546  bool EventProcessor::endOfLoop() {
1547  if(looper_) {
1548  ModuleChanger changer(schedule_.get(),preg_.get());
1549  looper_->setModuleChanger(&changer);
1550  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1551  looper_->setModuleChanger(nullptr);
1552  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1553  else return false;
1554  }
1555  FDEBUG(1) << "\tendOfLoop\n";
1556  return true;
1557  }
1558 
1559  void EventProcessor::rewindInput() {
1560  input_->repeat();
1561  input_->rewind();
1562  FDEBUG(1) << "\trewind\n";
1563  }
1564 
1565  void EventProcessor::prepareForNextLoop() {
1566  looper_->prepareForNextLoop(esp_.get());
1567  FDEBUG(1) << "\tprepareForNextLoop\n";
1568  }
1569 
1570  bool EventProcessor::shouldWeCloseOutput() const {
1571  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1572  if(!subProcesses_.empty()) {
1573  for(auto const& subProcess : subProcesses_) {
1574  if(subProcess.shouldWeCloseOutput()) {
1575  return true;
1576  }
1577  }
1578  return false;
1579  }
1580  return schedule_->shouldWeCloseOutput();
1581  }
1582 
1583  void EventProcessor::doErrorStuff() {
1584  FDEBUG(1) << "\tdoErrorStuff\n";
1585  LogError("StateMachine")
1586  << "The EventProcessor state machine encountered an unexpected event\n"
1587  << "and went to the error state\n"
1588  << "Will attempt to terminate processing normally\n"
1589  << "(IF using the looper the next loop will be attempted)\n"
1590  << "This likely indicates a bug in an input module or corrupted input or both\n";
1591  stateMachineWasInErrorState_ = true;
1592  }
1593 
1594  void EventProcessor::beginRun(statemachine::Run const& run) {
1595  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1596  {
1597  SendSourceTerminationSignalIfException sentry(actReg_.get());
1598 
1599  input_->doBeginRun(runPrincipal, &processContext_);
1600  sentry.completedSuccessfully();
1601  }
1602 
1603  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1604  runPrincipal.beginTime());
1605  if(forceESCacheClearOnNewRun_){
1606  espController_->forceCacheClear();
1607  }
1608  {
1609  SendSourceTerminationSignalIfException sentry(actReg_.get());
1610  espController_->eventSetupForInstance(ts);
1611  sentry.completedSuccessfully();
1612  }
1613  EventSetup const& es = esp_->eventSetup();
1614  if(looper_ && looperBeginJobRun_== false) {
1615  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1616  looper_->beginOfJob(es);
1617  looperBeginJobRun_ = true;
1618  looper_->doStartingNewLoop();
1619  }
1620  {
1622  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1623  for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1624  }
1625  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1626  if(looper_) {
1627  looper_->doBeginRun(runPrincipal, es, &processContext_);
1628  }
1629  {
1631  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1632  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1633  for_all(subProcesses_, [i, &runPrincipal, &ts](auto& subProcess){ subProcess.doStreamBeginRun(i, runPrincipal, ts); });
1634  }
1635  }
1636  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1637  if(looper_) {
1638  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1639  }
1640  }
1641 
1642  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1643  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1644  {
1645  SendSourceTerminationSignalIfException sentry(actReg_.get());
1646 
1647  runPrincipal.setEndTime(input_->timestamp());
1648  runPrincipal.setComplete();
1649  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1650  sentry.completedSuccessfully();
1651  }
1652 
1653  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1654  runPrincipal.endTime());
1655  {
1656  SendSourceTerminationSignalIfException sentry(actReg_.get());
1657  espController_->eventSetupForInstance(ts);
1658  sentry.completedSuccessfully();
1659  }
1660  EventSetup const& es = esp_->eventSetup();
1661  {
1662  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1664  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1665  for_all(subProcesses_, [i, &runPrincipal, &ts, cleaningUpAfterException](auto& subProcess) { subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1666  });
1667  }
1668  }
1669  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1670  if(looper_) {
1671  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1672  }
1673  {
1675  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1676  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1677  }
1678  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1679  if(looper_) {
1680  looper_->doEndRun(runPrincipal, es, &processContext_);
1681  }
1682  }
1683 
1684  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1685  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1686  {
1687  SendSourceTerminationSignalIfException sentry(actReg_.get());
1688 
1689  input_->doBeginLumi(lumiPrincipal, &processContext_);
1690  sentry.completedSuccessfully();
1691  }
1692 
1694  if(rng.isAvailable()) {
1695  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1696  rng->preBeginLumi(lb);
1697  }
1698 
1699  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1700  // lumi blocks know their start and end times why not also start and end events?
1701  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1702  {
1703  SendSourceTerminationSignalIfException sentry(actReg_.get());
1704  espController_->eventSetupForInstance(ts);
1705  sentry.completedSuccessfully();
1706  }
1707  EventSetup const& es = esp_->eventSetup();
1708  {
1710  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1711  for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1712  }
1713  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1714  if(looper_) {
1715  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1716  }
1717  {
1718  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1720  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1721  for_all(subProcesses_, [i, &lumiPrincipal, &ts](auto& subProcess){ subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts); });
1722  }
1723  }
1724  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1725  if(looper_) {
1726  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1727  }
1728  }
1729 
1730  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1731  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1732  {
1733  SendSourceTerminationSignalIfException sentry(actReg_.get());
1734 
1735  lumiPrincipal.setEndTime(input_->timestamp());
1736  lumiPrincipal.setComplete();
1737  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1738  sentry.completedSuccessfully();
1739  }
1740  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1741  // lumi blocks know their start and end times why not also start and end events?
1742  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1743  lumiPrincipal.endTime());
1744  {
1745  SendSourceTerminationSignalIfException sentry(actReg_.get());
1746  espController_->eventSetupForInstance(ts);
1747  sentry.completedSuccessfully();
1748  }
1749  EventSetup const& es = esp_->eventSetup();
1750  {
1751  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1753  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1754  for_all(subProcesses_, [i, &lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException); });
1755  }
1756  }
1757  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1758  if(looper_) {
1759  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1760  }
1761  {
1763  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1764  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1765  }
1766  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1767  if(looper_) {
1768  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1769  }
1770  }
1771 
1772  statemachine::Run EventProcessor::readRun() {
1773  if (principalCache_.hasRunPrincipal()) {
1775  << "EventProcessor::readRun\n"
1776  << "Illegal attempt to insert run into cache\n"
1777  << "Contact a Framework Developer\n";
1778  }
1779  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1780  {
1781  SendSourceTerminationSignalIfException sentry(actReg_.get());
1782  input_->readRun(*rp, *historyAppender_);
1783  sentry.completedSuccessfully();
1784  }
1785  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1786  principalCache_.insert(rp);
1787  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1788  }
1789 
1790  statemachine::Run EventProcessor::readAndMergeRun() {
1791  principalCache_.merge(input_->runAuxiliary(), preg());
1792  auto runPrincipal =principalCache_.runPrincipalPtr();
1793  {
1794  SendSourceTerminationSignalIfException sentry(actReg_.get());
1795  input_->readAndMergeRun(*runPrincipal);
1796  sentry.completedSuccessfully();
1797  }
1798  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1799  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1800  }
1801 
1802  int EventProcessor::readLuminosityBlock() {
1803  if (principalCache_.hasLumiPrincipal()) {
1805  << "EventProcessor::readRun\n"
1806  << "Illegal attempt to insert lumi into cache\n"
1807  << "Contact a Framework Developer\n";
1808  }
1809  if (!principalCache_.hasRunPrincipal()) {
1811  << "EventProcessor::readRun\n"
1812  << "Illegal attempt to insert lumi into cache\n"
1813  << "Run is invalid\n"
1814  << "Contact a Framework Developer\n";
1815  }
1816  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1817  {
1818  SendSourceTerminationSignalIfException sentry(actReg_.get());
1819  input_->readLuminosityBlock(*lbp, *historyAppender_);
1820  sentry.completedSuccessfully();
1821  }
1822  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1823  principalCache_.insert(lbp);
1824  return input_->luminosityBlock();
1825  }
1826 
1827  int EventProcessor::readAndMergeLumi() {
1828  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1829  {
1830  SendSourceTerminationSignalIfException sentry(actReg_.get());
1831  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1832  sentry.completedSuccessfully();
1833  }
1834  return input_->luminosityBlock();
1835  }
1836 
1837  void EventProcessor::writeRun(statemachine::Run const& run) {
1838  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1839  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1840  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1841  }
1842 
1843  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1844  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1845  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1846  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1847  }
1848 
1849  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1850  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1851  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1852  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1853  }
1854 
1855  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1856  principalCache_.deleteLumi(phid, run, lumi);
1857  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1858  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1859  }
1860 
1861  class StreamProcessingTask : public tbb::task {
1862  public:
1864  unsigned int iStreamIndex,
1865  std::atomic<bool>* iFinishedProcessingEvents,
1866  tbb::task* iWaitTask):
1867  m_proc(iProc),
1868  m_streamID(iStreamIndex),
1869  m_finishedProcessingEvents(iFinishedProcessingEvents),
1870  m_waitTask(iWaitTask){}
1871 
1872  tbb::task* execute() {
1873  m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1874  m_waitTask->decrement_ref_count();
1875  return nullptr;
1876  }
1877  private:
1879  unsigned int m_streamID;
1882  };
1883 
1884  void EventProcessor::processEventsForStreamAsync(unsigned int iStreamIndex,
1885  std::atomic<bool>* finishedProcessingEvents) {
1886  try {
1887  // make the services available
1888  ServiceRegistry::Operate operate(serviceToken_);
1889  if(preallocations_.numberOfStreams()>1) {
1891  handler->initializeThisThreadForUse();
1892  }
1893 
1894  if(iStreamIndex==0) {
1895  processEvent(0);
1896  }
1897  do {
1898  if(shouldWeStop()) {
1899  break;
1900  }
1901  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1902  //another thread hit an exception
1903  //std::cerr<<"another thread saw an exception\n";
1904  break;
1905  }
1906  {
1907 
1908 
1909  {
1910  //nextItemType and readEvent need to be in same critical section
1911  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1912 
1913  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1914  //std::cerr<<"finishedProcessingEvents\n";
1915  break;
1916  }
1917 
1918  //If source and DelayedReader share a resource we must serialize them
1919  auto sr = input_->resourceSharedWithDelayedReader().second;
1920  std::unique_lock<std::recursive_mutex> delayedReaderGuard;
1921  if(sr) {
1922  delayedReaderGuard = std::unique_lock<std::recursive_mutex>(*sr);
1923  }
1924  InputSource::ItemType itemType = input_->nextItemType();
1925  if (InputSource::IsEvent !=itemType) {
1926  nextItemTypeFromProcessingEvents_ = itemType;
1927  finishedProcessingEvents->store(true,std::memory_order_release);
1928  //std::cerr<<"next item type "<<itemType<<"\n";
1929  break;
1930  }
1931  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1932  //std::cerr<<"task told to async stop\n";
1933  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1934  break;
1935  }
1936  readEvent(iStreamIndex);
1937  }
1938  }
1939  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1940  //another thread hit an exception
1941  //std::cerr<<"another thread saw an exception\n";
1942  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1943 
1944  break;
1945  }
1946  processEvent(iStreamIndex);
1947  }while(true);
1948  } catch (...) {
1949  bool expected =false;
1950  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1951  deferredExceptionPtr_ = std::current_exception();
1952  }
1953  //std::cerr<<"task caught exception\n";
1954  }
1955  }
1956 
1957  void EventProcessor::readAndProcessEvent() {
1958  if(numberOfForkedChildren_>0) {
1959  readEvent(0);
1960  processEvent(0);
1961  return;
1962  }
1963  nextItemTypeFromProcessingEvents_ = InputSource::IsEvent; //needed for looper
1964  asyncStopRequestedWhileProcessingEvents_ = false;
1965 
1966  std::atomic<bool> finishedProcessingEvents{false};
1967 
1968  //Task assumes Stream 0 has already read the event that caused us to go here
1969  readEvent(0);
1970 
1971  //To wait, the ref count has to b 1+#streams
1972  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1973  eventLoopWaitTask->increment_ref_count();
1974 
1975  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1976  unsigned int iStreamIndex = 0;
1977  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1978  eventLoopWaitTask->increment_ref_count();
1979  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1980 
1981  }
1982  eventLoopWaitTask->increment_ref_count();
1983  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1984  tbb::task::destroy(*eventLoopWaitTask);
1985 
1986  //One of the processing threads saw an exception
1987  if(deferredExceptionPtrIsSet_) {
1988  std::rethrow_exception(deferredExceptionPtr_);
1989  }
1990  }
1991  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1992  //TODO this will have to become per stream
1993  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1994  StreamContext streamContext(event.streamID(), &processContext_);
1995 
1996  SendSourceTerminationSignalIfException sentry(actReg_.get());
1997  input_->readEvent(event, streamContext);
1998  sentry.completedSuccessfully();
1999 
2000  FDEBUG(1) << "\treadEvent\n";
2001  }
2002  void EventProcessor::processEvent(unsigned int iStreamIndex) {
2003  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2004  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2006  if(rng.isAvailable()) {
2007  Event ev(*pep, ModuleDescription(), nullptr);
2008  rng->postEventRead(ev);
2009  }
2010  assert(pep->luminosityBlockPrincipalPtrValid());
2011  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2012  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2013 
2014  //We can only update IOVs on Lumi boundaries
2015  //IOVSyncValue ts(pep->id(), pep->time());
2016  //espController_->eventSetupForInstance(ts);
2017  EventSetup const& es = esp_->eventSetup();
2018  {
2019  schedule_->processOneEvent(iStreamIndex,*pep, es);
2020  for_all(subProcesses_, [pep](auto& subProcess) { subProcess.doEvent(*pep); });
2021  }
2022 
2023  //NOTE: If we have a looper we only have one Stream
2024  if(looper_) {
2025  bool randomAccess = input_->randomAccess();
2026  ProcessingController::ForwardState forwardState = input_->forwardState();
2027  ProcessingController::ReverseState reverseState = input_->reverseState();
2028  ProcessingController pc(forwardState, reverseState, randomAccess);
2029 
2030  EDLooperBase::Status status = EDLooperBase::kContinue;
2031  do {
2032 
2033  StreamContext streamContext(pep->streamID(), &processContext_);
2034  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2035 
2036  bool succeeded = true;
2037  if(randomAccess) {
2038  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2039  input_->skipEvents(-2);
2040  }
2041  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2042  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2043  }
2044  }
2045  pc.setLastOperationSucceeded(succeeded);
2046  } while(!pc.lastOperationSucceeded());
2047  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2048 
2049  }
2050 
2051  FDEBUG(1) << "\tprocessEvent\n";
2052  pep->clearEventPrincipal();
2053  }
2054 
2055  bool EventProcessor::shouldWeStop() const {
2056  FDEBUG(1) << "\tshouldWeStop\n";
2057  if(shouldWeStop_) return true;
2058  if(!subProcesses_.empty()) {
2059  for(auto const& subProcess : subProcesses_) {
2060  if(subProcess.terminate()) {
2061  return true;
2062  }
2063  }
2064  return false;
2065  }
2066  return schedule_->terminate();
2067  }
2068 
2069  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2070  exceptionMessageFiles_ = message;
2071  }
2072 
2073  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2074  exceptionMessageRuns_ = message;
2075  }
2076 
2077  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2078  exceptionMessageLumis_ = message;
2079  }
2080 
2081  bool EventProcessor::alreadyHandlingException() const {
2082  return alreadyHandlingException_;
2083  }
2084 
2085  void EventProcessor::terminateMachine(std::unique_ptr<statemachine::Machine> iMachine) {
2086  if(iMachine.get() != nullptr) {
2087  if(!iMachine->terminated()) {
2088  forceLooperToEnd_ = true;
2089  iMachine->process_event(statemachine::Stop());
2090  forceLooperToEnd_ = false;
2091  }
2092  else {
2093  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2094  }
2095  if(iMachine->terminated()) {
2096  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2097  }
2098  }
2099  }
2100 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:66
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< tbb::task * > m_waitTask
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
edm::propagate_const< EventProcessor * > m_proc
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::Service< edm::RandomNumberGenerator > rng
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:70
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:61
ParameterSetID id() const
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(m_qm.get())
Timestamp const & endTime() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
processConfiguration
Definition: Schedule.cc:383
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
bool ev
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< std::atomic< bool > * > m_finishedProcessingEvents
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:91
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
static InputSourceFactory const * get()
std::unique_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::shared_ptr< edm::ParameterSet > parameterSet()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:510
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:102
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:40
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
areg
Definition: Schedule.cc:383
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:574
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:63
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:57
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
PathsAndConsumesOfModules pathsAndConsumesOfModules_
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:59
preg
Definition: Schedule.cc:383
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
tuple status
Definition: mps_update.py:57
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31