test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
9 
34 
36 
44 
49 
60 
61 #include "MessageForSource.h"
62 #include "MessageForParent.h"
63 
64 #include "boost/thread/xtime.hpp"
65 
66 #include <exception>
67 #include <iomanip>
68 #include <iostream>
69 #include <utility>
70 #include <sstream>
71 
72 #include <sys/ipc.h>
73 #include <sys/msg.h>
74 
75 #include "tbb/task.h"
76 
77 //Used for forking
78 #include <sys/types.h>
79 #include <sys/wait.h>
80 #include <sys/socket.h>
81 #include <sys/select.h>
82 #include <sys/fcntl.h>
83 #include <unistd.h>
84 
85 //Used for CPU affinity
86 #ifndef __APPLE__
87 #include <sched.h>
88 #endif
89 
90 namespace {
91  //Sentry class to only send a signal if an
92  // exception occurs. An exception is identified
93  // by the destructor being called without first
94  // calling completedSuccessfully().
95  class SendSourceTerminationSignalIfException {
96  public:
97  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
98  reg_(iReg) {}
99  ~SendSourceTerminationSignalIfException() {
100  if(reg_) {
101  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
102  }
103  }
104  void completedSuccessfully() {
105  reg_ = nullptr;
106  }
107  private:
108  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
109 
110 
111  };
112 }
113 
114 namespace edm {
115 
116  // ---------------------------------------------------------------
117  std::unique_ptr<InputSource>
119  CommonParams const& common,
120  std::shared_ptr<ProductRegistry> preg,
121  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
122  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
123  std::shared_ptr<ActivityRegistry> areg,
124  std::shared_ptr<ProcessConfiguration const> processConfiguration,
125  PreallocationConfiguration const& allocations) {
126  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
127  if(main_input == 0) {
129  << "There must be exactly one source in the configuration.\n"
130  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
131  }
132 
133  std::string modtype(main_input->getParameter<std::string>("@module_type"));
134 
135  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
137  ConfigurationDescriptions descriptions(filler->baseType());
138  filler->fill(descriptions);
139 
140  try {
141  convertException::wrap([&]() {
142  descriptions.validate(*main_input, std::string("source"));
143  });
144  }
145  catch (cms::Exception & iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
165 
166  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
167  common.maxEventsInput_, common.maxLumisInput_,
168  common.maxSecondsUntilRampdown_, allocations);
169 
170  areg->preSourceConstructionSignal_(md);
171  std::unique_ptr<InputSource> input;
172  try {
173  //even if we have an exception, send the signal
174  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
175  convertException::wrap([&]() {
176  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
177  });
178  }
179  catch (cms::Exception& iException) {
180  std::ostringstream ost;
181  ost << "Constructing input source of type " << modtype;
182  iException.addContext(ost.str());
183  throw;
184  }
185  return input;
186  }
187 
188  // ---------------------------------------------------------------
189  std::shared_ptr<EDLooperBase>
192  ParameterSet& params) {
193  std::shared_ptr<EDLooperBase> vLooper;
194 
195  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
196 
197  if(loopers.size() == 0) {
198  return vLooper;
199  }
200 
201  assert(1 == loopers.size());
202 
203  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
204  itName != itNameEnd;
205  ++itName) {
206 
207  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
208  providerPSet->registerIt();
209  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
210  cp,
211  *providerPSet);
212  }
213  return vLooper;
214 
215  }
216 
217  // ---------------------------------------------------------------
219  ServiceToken const& iToken,
221  std::vector<std::string> const& defaultServices,
222  std::vector<std::string> const& forcedServices) :
223  actReg_(),
224  preg_(),
225  branchIDListHelper_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
231  processConfiguration_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
237  deferredExceptionPtrIsSet_(false),
238  principalCache_(),
239  beginJobCalled_(false),
240  shouldWeStop_(false),
241  stateMachineWasInErrorState_(false),
242  fileMode_(),
243  emptyRunLumiMode_(),
244  exceptionMessageFiles_(),
245  exceptionMessageRuns_(),
246  exceptionMessageLumis_(),
247  alreadyHandlingException_(false),
248  forceLooperToEnd_(false),
249  looperBeginJobRun_(false),
250  forceESCacheClearOnNewRun_(false),
251  numberOfForkedChildren_(0),
252  numberOfSequentialEventsPerChild_(1),
253  setCpuAffinity_(false),
254  eventSetupDataToExcludeFromPrefetching_() {
255  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
256  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
257  processDesc->addServices(defaultServices, forcedServices);
258  init(processDesc, iToken, iLegacy);
259  }
260 
262  std::vector<std::string> const& defaultServices,
263  std::vector<std::string> const& forcedServices) :
264  actReg_(),
265  preg_(),
266  branchIDListHelper_(),
267  serviceToken_(),
268  input_(),
269  espController_(new eventsetup::EventSetupsController),
270  esp_(),
271  act_table_(),
272  processConfiguration_(),
273  schedule_(),
274  subProcesses_(),
275  historyAppender_(new HistoryAppender),
276  fb_(),
277  looper_(),
278  deferredExceptionPtrIsSet_(false),
279  principalCache_(),
280  beginJobCalled_(false),
281  shouldWeStop_(false),
282  stateMachineWasInErrorState_(false),
283  fileMode_(),
284  emptyRunLumiMode_(),
285  exceptionMessageFiles_(),
286  exceptionMessageRuns_(),
287  exceptionMessageLumis_(),
288  alreadyHandlingException_(false),
289  forceLooperToEnd_(false),
290  looperBeginJobRun_(false),
291  forceESCacheClearOnNewRun_(false),
292  numberOfForkedChildren_(0),
293  numberOfSequentialEventsPerChild_(1),
294  setCpuAffinity_(false),
295  asyncStopRequestedWhileProcessingEvents_(false),
296  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
297  eventSetupDataToExcludeFromPrefetching_()
298  {
299  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
300  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
301  processDesc->addServices(defaultServices, forcedServices);
303  }
304 
305  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
306  ServiceToken const& token,
308  actReg_(),
309  preg_(),
310  branchIDListHelper_(),
311  serviceToken_(),
312  input_(),
313  espController_(new eventsetup::EventSetupsController),
314  esp_(),
315  act_table_(),
316  processConfiguration_(),
317  schedule_(),
318  subProcesses_(),
319  historyAppender_(new HistoryAppender),
320  fb_(),
321  looper_(),
322  deferredExceptionPtrIsSet_(false),
323  principalCache_(),
324  beginJobCalled_(false),
325  shouldWeStop_(false),
326  stateMachineWasInErrorState_(false),
327  fileMode_(),
328  emptyRunLumiMode_(),
329  exceptionMessageFiles_(),
330  exceptionMessageRuns_(),
331  exceptionMessageLumis_(),
332  alreadyHandlingException_(false),
333  forceLooperToEnd_(false),
334  looperBeginJobRun_(false),
335  forceESCacheClearOnNewRun_(false),
336  numberOfForkedChildren_(0),
337  numberOfSequentialEventsPerChild_(1),
338  setCpuAffinity_(false),
339  asyncStopRequestedWhileProcessingEvents_(false),
340  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
341  eventSetupDataToExcludeFromPrefetching_()
342  {
343  init(processDesc, token, legacy);
344  }
345 
346 
348  actReg_(),
349  preg_(),
350  branchIDListHelper_(),
351  serviceToken_(),
352  input_(),
353  espController_(new eventsetup::EventSetupsController),
354  esp_(),
355  act_table_(),
356  processConfiguration_(),
357  schedule_(),
358  subProcesses_(),
359  historyAppender_(new HistoryAppender),
360  fb_(),
361  looper_(),
362  deferredExceptionPtrIsSet_(false),
363  principalCache_(),
364  beginJobCalled_(false),
365  shouldWeStop_(false),
366  stateMachineWasInErrorState_(false),
367  fileMode_(),
368  emptyRunLumiMode_(),
369  exceptionMessageFiles_(),
370  exceptionMessageRuns_(),
371  exceptionMessageLumis_(),
372  alreadyHandlingException_(false),
373  forceLooperToEnd_(false),
374  looperBeginJobRun_(false),
375  forceESCacheClearOnNewRun_(false),
376  numberOfForkedChildren_(0),
377  numberOfSequentialEventsPerChild_(1),
378  setCpuAffinity_(false),
379  asyncStopRequestedWhileProcessingEvents_(false),
380  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
381  eventSetupDataToExcludeFromPrefetching_()
382 {
383  if(isPython) {
384  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
385  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
387  }
388  else {
389  auto processDesc = std::make_shared<ProcessDesc>(config);
391  }
392  }
393 
394  void
395  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
396  ServiceToken const& iToken,
398 
399  //std::cerr << processDesc->dump() << std::endl;
400 
401  // register the empty parentage vector , once and for all
403 
404  // register the empty parameter set, once and for all.
406 
407  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
408 
409  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
410  std::unique_ptr<std::vector<ParameterSet> > subProcessVParameterSet(popSubProcessVParameterSet(*parameterSet));
411  bool hasSubProcesses = subProcessVParameterSet.get() != nullptr;
412 
413  // Now set some parameters specific to the main process.
414  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
415  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
416  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
417  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
418  //threading
419  unsigned int nThreads=1;
420  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
421  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
422  if(nThreads == 0) {
423  nThreads = 1;
424  }
425  }
426  /* TODO: when we support having each stream run in a different thread use this default
427  unsigned int nStreams =nThreads;
428  */
429  unsigned int nStreams =1;
430  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
431  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
432  if(nStreams==0) {
433  nStreams = nThreads;
434  }
435  }
436  /*
437  bool nRunsSet = false;
438  */
439  unsigned int nConcurrentRuns =1;
440  /*
441  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
442  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
443  }
444  */
445  unsigned int nConcurrentLumis =1;
446  /*
447  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
448  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
449  } else {
450  nConcurrentLumis = nConcurrentRuns;
451  }
452  */
453  //Check that relationships between threading parameters makes sense
454  /*
455  if(nThreads<nStreams) {
456  //bad
457  }
458  if(nConcurrentRuns>nStreams) {
459  //bad
460  }
461  if(nConcurrentRuns>nConcurrentLumis) {
462  //bad
463  }
464  */
465  //forking
466  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
467  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
468  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
469  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
470  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
471  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
472  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
473  itPS != itPSEnd;
474  ++itPS) {
475  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
476  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
477  itPS->getUntrackedParameter<std::string>("label", "")));
478  }
479  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
480 
481  // Now do general initialization
483 
484  //initialize the services
485  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
486  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
487  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
488 
489  //make the services available
491 
492  if(nStreams>1) {
494  handler->willBeUsingThreads();
495  }
496 
497  // initialize miscellaneous items
498  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
499 
500  // initialize the event setup provider
501  esp_ = espController_->makeProvider(*parameterSet);
502 
503  // initialize the looper, if any
504  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
505  if(looper_) {
506  looper_->setActionTable(items.act_table_.get());
507  looper_->attachTo(*items.actReg_);
508 
509  //For now loopers make us run only 1 transition at a time
510  nStreams=1;
511  nConcurrentLumis=1;
512  nConcurrentRuns=1;
513  }
514 
515  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
516 
517  // initialize the input source
518  input_ = makeInput(*parameterSet,
519  *common,
520  items.preg(),
521  items.branchIDListHelper(),
523  items.actReg_,
524  items.processConfiguration(),
526 
527  // initialize the Schedule
528  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
529 
530  // set the data members
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
538  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
547  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
548  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
550  }
551  // initialize the subprocess, if there is one
552  if(hasSubProcesses) {
553  if(subProcesses_ == nullptr) {
554  subProcesses_ = std::make_unique<std::vector<SubProcess> >();
555  }
556  subProcesses_->reserve(subProcessVParameterSet->size());
557  for(auto& subProcessPSet : *subProcessVParameterSet) {
558  subProcesses_->emplace_back(subProcessPSet,
559  *parameterSet,
560  preg(),
564  *actReg_,
565  token,
568  &processContext_);
569  }
570  }
571  }
572 
574  // Make the services available while everything is being deleted.
576  ServiceRegistry::Operate op(token);
577 
578  // manually destroy all these things that may need the services around
579  // propagate_const<T> has no reset() function
580  espController_ = nullptr;
581  subProcesses_ = nullptr;
582  esp_ = nullptr;
583  schedule_ = nullptr;
584  input_ = nullptr;
585  looper_ = nullptr;
586  actReg_ = nullptr;
587 
590  }
591 
592  void
594  if(beginJobCalled_) return;
595  beginJobCalled_=true;
596  bk::beginJob();
597 
598  // StateSentry toerror(this); // should we add this ?
599  //make the services available
601 
606  actReg_->preallocateSignal_(bounds);
608  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
609 
610  //NOTE: This implementation assumes 'Job' means one call
611  // the EventProcessor::run
612  // If it really means once per 'application' then this code will
613  // have to be changed.
614  // Also have to deal with case where have 'run' then new Module
615  // added and do 'run'
616  // again. In that case the newly added Module needs its 'beginJob'
617  // to be called.
618 
619  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
620  // For now we delay calling beginOfJob until first beginOfRun
621  //if(looper_) {
622  // looper_->beginOfJob(es);
623  //}
624  try {
625  convertException::wrap([&]() {
626  input_->doBeginJob();
627  });
628  }
629  catch(cms::Exception& ex) {
630  ex.addContext("Calling beginJob for the source");
631  throw;
632  }
633  schedule_->beginJob(*preg_);
634  // toerror.succeeded(); // should we add this?
635  if(hasSubProcesses()) {
636  for(auto& subProcess : *subProcesses_) {
637  subProcess.doBeginJob();
638  }
639  }
640  actReg_->postBeginJobSignal_();
641 
642  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
643  schedule_->beginStream(i);
644  if(hasSubProcesses()) {
645  for(auto& subProcess : *subProcesses_) {
646  subProcess.doBeginStream(i);
647  }
648  }
649  }
650  }
651 
652  void
654  // Collects exceptions, so we don't throw before all operations are performed.
655  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
656 
657  //make the services available
659 
660  //NOTE: this really should go elsewhere in the future
661  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
662  c.call([this,i](){this->schedule_->endStream(i);});
663  if(hasSubProcesses()) {
664  for(auto& subProcess : *subProcesses_) {
665  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
666  }
667  }
668  }
669  auto actReg = actReg_.get();
670  c.call([actReg](){actReg->preEndJobSignal_();});
671  schedule_->endJob(c);
672  if(hasSubProcesses()) {
673  for(auto& subProcess : *subProcesses_) {
674  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
675  }
676  }
677  c.call(std::bind(&InputSource::doEndJob, input_.get()));
678  if(looper_) {
679  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
680  }
681  c.call([actReg](){actReg->postEndJobSignal_();});
682  if(c.hasThrown()) {
683  c.rethrow();
684  }
685  }
686 
689  return serviceToken_;
690  }
691 
692  //Setup signal handler to listen for when forked children stop
namespace {
  // State shared between the SIGCHLD handler below and the code that polls it.
  // These are volatile since the compiler can not be allowed to optimize away
  // accesses to values that are modified from inside the signal handler.
  volatile bool child_failed = false;            // set once any child exits non-zero or is killed by a signal
  volatile unsigned int num_children_done = 0;   // number of children reaped so far
  volatile int child_fail_exit_status = 0;       // exit status of the most recently reaped failing child
  volatile int child_fail_signal = 0;            // signal that killed the most recently reaped child

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    /// SIGCHLD handler: reaps every child that has already terminated and
    /// records how it ended in the variables above.
    ///
    /// waitpid() is polled with WNOHANG until it stops returning a pid. The
    /// last such call normally fails (e.g. with ECHILD) and overwrites errno,
    /// so the value seen on entry is saved and restored; otherwise the
    /// interrupted code could observe a clobbered errno in its
    /// `errno == EINTR` retry loops.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;
      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0<p) {
        if(WIFEXITED(stat_loc)) {
          // Normal exit: count it, and remember a non-zero status as a failure.
          num_children_done = num_children_done + 1;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          // Killed by a signal: always treated as a failure.
          num_children_done = num_children_done + 1;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }
      errno = savedErrno;
    }
  }

}
730 
731  enum {
736  };
737 
738  namespace {
/// Returns how many decimal digits are needed to write numberOfChildren.
/// A value of zero yields 3 rather than 0.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3; // Protect against zero numberOfChildren
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
750 
/* This class embodies the thread which is used to listen to the forked
   children and then tell them which events they should process. */
class MessageSenderToSource {
public:
  MessageSenderToSource(std::vector<int> const& childrenSockets,
                        std::vector<int> const& childrenPipes,
                        long iNEventsToProcess);

  // Body of the listening loop; see the definition for the protocol.
  void operator()();

private:
  // Bound to the vector passed to the constructor (not copied), so that
  // vector has to stay alive for as long as this object is used.
  std::vector<int> const& m_childrenPipes;
  // Stored as the nIndices of every work message sent to a child.
  long const m_nEventsToProcess;
  // Descriptors (sockets and pipes) handed to select().
  fd_set m_socketSet;
  // Starts at the number of child sockets; the loop ends when it reaches zero.
  unsigned int m_aliveChildren;
  // One more than the largest descriptor in m_socketSet.
  int m_maxFd;
};
765 
766  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
767  std::vector<int> const& childrenPipes,
768  long iNEventsToProcess):
769  m_childrenPipes(childrenPipes),
770  m_nEventsToProcess(iNEventsToProcess),
771  m_aliveChildren(childrenSockets.size()),
772  m_maxFd(0)
773  {
774  FD_ZERO(&m_socketSet);
775  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
776  it != itEnd; it++) {
777  FD_SET(*it, &m_socketSet);
778  if (*it > m_maxFd) {
779  m_maxFd = *it;
780  }
781  }
782  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
783  it != itEnd; ++it) {
784  FD_SET(*it, &m_socketSet);
785  if (*it > m_maxFd) {
786  m_maxFd = *it;
787  }
788  }
789  m_maxFd++; // select reads [0,m_maxFd).
790  }
791 
792  /* This function is the heart of the communication between parent and child.
793  * When ready for more data, the child (see MessageReceiverForSource) requests
794  * data through a AF_UNIX socket message. The parent will then assign the next
795  * chunk of data by sending a message back.
796  *
797  * Additionally, this function also monitors the read-side of the pipe fd from the child.
798  * If the child dies unexpectedly, the pipe will be selected as ready for read and
799  * will return EPIPE when read from. Further, if the child thinks the parent has died
800  * (defined as waiting more than 1s for a response), it will write a single byte to
801  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
802  * If still alive, the parent will read the byte and ignore it.
803  *
804  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
805  * handler can distinguish between success and failure cases.
806  */
807 
808  void
809  MessageSenderToSource::operator()() {
810  multicore::MessageForParent childMsg;
811  LogInfo("ForkingController") << "I am controller";
812  //this is the master and therefore the controller
813 
814  multicore::MessageForSource sndmsg;
815  sndmsg.startIndex = 0;
816  sndmsg.nIndices = m_nEventsToProcess;
817  do {
818 
819  fd_set readSockets, errorSockets;
820  // Wait for a request from a child for events.
821  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
822  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
823  // Note that we don't timeout; may be reconsidered in the future.
824  ssize_t rc;
825  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
826  if (rc < 0) {
827  std::cerr << "select failed; should be impossible due to preconditions.\n";
828  abort();
829  break;
830  }
831 
832  // Read the message from the child.
833  for (int idx=0; idx<m_maxFd; idx++) {
834 
835  // Handle errors
836  if (FD_ISSET(idx, &errorSockets)) {
837  LogInfo("ForkingController") << "Error on socket " << idx;
838  FD_CLR(idx, &m_socketSet);
839  close(idx);
840  // See if it was the watchdog pipe that died.
841  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
842  if (*it == idx) {
843  m_aliveChildren--;
844  }
845  }
846  continue;
847  }
848 
849  if (!FD_ISSET(idx, &readSockets)) {
850  continue;
851  }
852 
853  // See if this FD is a child watchdog pipe. If so, read from it to prevent
854  // writes from blocking.
855  bool is_pipe = false;
856  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
857  if (*it == idx) {
858  is_pipe = true;
859  char buf;
860  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
861  if (rc <= 0) {
862  m_aliveChildren--;
863  FD_CLR(idx, &m_socketSet);
864  close(idx);
865  }
866  }
867  }
868 
869  // Only execute this block if the FD is a socket for sending the child work.
870  if (!is_pipe) {
871  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
872  if (rc < 0) {
873  FD_CLR(idx, &m_socketSet);
874  close(idx);
875  continue;
876  }
877 
878  // Tell the child what events to process.
879  // If 'send' fails, then the child process has failed (any other possibilities are
880  // eliminated because we are using fixed-size messages with Unix datagram sockets).
881  // Thus, the SIGCHLD handler will fire and set child_fail = true.
882  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
883  if (rc < 0) {
884  FD_CLR(idx, &m_socketSet);
885  close(idx);
886  continue;
887  }
888  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
889  sndmsg.startIndex += sndmsg.nIndices;
890  }
891  }
892 
893  } while (m_aliveChildren > 0);
894 
895  return;
896  }
897 
898  }
899 
900 
901  void EventProcessor::possiblyContinueAfterForkChildFailure() {
902  if(child_failed && continueAfterChildFailure_) {
903  if (child_fail_signal) {
904  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
905  child_fail_signal=0;
906  } else if (child_fail_exit_status) {
907  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
908  child_fail_exit_status=0;
909  } else {
910  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
911  }
912  child_failed =false;
913  }
914  }
915 
916  bool
917  EventProcessor::forkProcess(std::string const& jobReportFile) {
918 
919  if(0 == numberOfForkedChildren_) {return true;}
920  assert(0<numberOfForkedChildren_);
921  //do what we want done in common
922  {
923  beginJob(); //make sure this was run
924  // make the services available
925  ServiceRegistry::Operate operate(serviceToken_);
926 
927  InputSource::ItemType itemType;
928  itemType = input_->nextItemType();
929 
930  assert(itemType == InputSource::IsFile);
931  {
932  readFile();
933  }
934  itemType = input_->nextItemType();
935  assert(itemType == InputSource::IsRun);
936 
937  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
938  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
939  input_->runAuxiliary()->beginTime());
940  espController_->eventSetupForInstance(ts);
941  EventSetup const& es = esp_->eventSetup();
942 
943  //now get all the data available in the EventSetup
944  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
945  es.fillAvailableRecordKeys(recordKeys);
946  std::vector<eventsetup::DataKey> dataKeys;
947  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
948  itKey != itEnd;
949  ++itKey) {
950  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
951  //see if this is on our exclusion list
952  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
953  ExcludedData const* excludedData(nullptr);
954  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
955  excludedData = &(itExcludeRec->second);
956  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
957  //skip all items in this record
958  continue;
959  }
960  }
961  if(0 != recordPtr) {
962  dataKeys.clear();
963  recordPtr->fillRegisteredDataKeys(dataKeys);
964  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
965  itDataKey != itDataKeyEnd;
966  ++itDataKey) {
967  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
968  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
969  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
970  continue;
971  }
972  try {
973  recordPtr->doGet(*itDataKey);
974  } catch(cms::Exception& e) {
975  LogWarning("ForkingEventSetupPreFetching") << e.what();
976  }
977  }
978  }
979  }
980  }
981  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
982  {
983  // make the services available
984  ServiceRegistry::Operate operate(serviceToken_);
985  Service<JobReport> jobReport;
986  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
987 
988  //Now actually do the forking
989  actReg_->preForkReleaseResourcesSignal_();
990  input_->doPreForkReleaseResources();
991  schedule_->preForkReleaseResources();
992  }
993  installCustomHandler(SIGCHLD, ep_sigchld);
994 
995 
996  unsigned int childIndex = 0;
997  unsigned int const kMaxChildren = numberOfForkedChildren_;
998  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
999  std::vector<pid_t> childrenIds;
1000  childrenIds.reserve(kMaxChildren);
1001  std::vector<int> childrenSockets;
1002  childrenSockets.reserve(kMaxChildren);
1003  std::vector<int> childrenPipes;
1004  childrenPipes.reserve(kMaxChildren);
1005  std::vector<int> childrenSocketsCopy;
1006  childrenSocketsCopy.reserve(kMaxChildren);
1007  std::vector<int> childrenPipesCopy;
1008  childrenPipesCopy.reserve(kMaxChildren);
1009  int pipes[] {0, 0};
1010 
1011  {
1012  // make the services available
1013  ServiceRegistry::Operate operate(serviceToken_);
1014  Service<JobReport> jobReport;
1015  int sockets[2], fd_flags;
1016  for(; childIndex < kMaxChildren; ++childIndex) {
1017  // Create a UNIX_DGRAM socket pair
1018  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1019  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  if (pipe(pipes)) {
1023  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1027  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1028  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1029  exit(EXIT_FAILURE);
1030  }
1031  // Mark socket as non-block. Child must be careful to do select prior
1032  // to reading from socket.
1033  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1034  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1035  exit(EXIT_FAILURE);
1036  }
1037  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1038  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1042  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1043  exit(EXIT_FAILURE);
1044  }
1045  // Linux man page notes there are some edge cases where reading from a
1046  // fd can block, even after a select.
1047  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1048  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1049  exit(EXIT_FAILURE);
1050  }
1051  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1052  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1053  exit(EXIT_FAILURE);
1054  }
1055 
1056  childrenPipesCopy = childrenPipes;
1057  childrenSocketsCopy = childrenSockets;
1058 
1059  pid_t value = fork();
1060  if(value == 0) {
1061  // Close the parent's side of the socket and pipe which will talk to us.
1062  close(pipes[0]);
1063  close(sockets[0]);
1064  // Close our copies of the parent's other communication pipes.
1065  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1066  close(*it);
1067  }
1068  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1069  close(*it);
1070  }
1071 
1072  // this is the child process, redirect stdout and stderr to a log file
1073  fflush(stdout);
1074  fflush(stderr);
1075  std::stringstream stout;
1076  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1077  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1078  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1079  }
1080  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1081  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1082  }
1083 
1084  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1085  if(setCpuAffinity_) {
1086  // CPU affinity is handled differently on macosx.
1087  // We disable it and print a message until someone reads:
1088  //
1089  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1090  //
1091  // and implements it.
1092 #ifdef __APPLE__
1093  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1094 #else
1095  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1096  cpu_set_t mask;
1097  CPU_ZERO(&mask);
1098  CPU_SET(childIndex, &mask);
1099  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1100  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1101  exit(-1);
1102  }
1103 #endif
1104  }
1105  break;
1106  } else {
1107  //this is the parent
1108  close(pipes[1]);
1109  close(sockets[1]);
1110  }
1111  if(value < 0) {
1112  LogError("ForkingChild") << "failed to create a child";
1113  exit(-1);
1114  }
1115  childrenIds.push_back(value);
1116  childrenSockets.push_back(sockets[0]);
1117  childrenPipes.push_back(pipes[0]);
1118  }
1119 
1120  if(childIndex < kMaxChildren) {
1121  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1122  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1123 
1124  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1125  input_->doPostForkReacquireResources(receiver);
1126  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1127  //NOTE: sources have to reset themselves by listening to the post fork message
1128  //rewindInput();
1129  return true;
1130  }
1131  jobReport->parentAfterFork(jobReportFile);
1132  }
1133 
1134  //this is the original, which is now the master for all the children
1135 
1136  //Need to wait for signals from the children or externally
1137  // To wait we must
1138  // 1) block the signals we want to wait on so we do not have a race condition
1139  // 2) check that we haven't already meet our ending criteria
1140  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1141  sigset_t blockingSigSet;
1142  sigset_t unblockingSigSet;
1143  sigset_t oldSigSet;
1144  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1145  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1146  sigaddset(&blockingSigSet, SIGCHLD);
1147  sigaddset(&blockingSigSet, SIGUSR2);
1148  sigaddset(&blockingSigSet, SIGINT);
1149  sigdelset(&unblockingSigSet, SIGCHLD);
1150  sigdelset(&unblockingSigSet, SIGUSR2);
1151  sigdelset(&unblockingSigSet, SIGINT);
1152  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1153 
1154  // If there are too many fd's (unlikely, but possible) for select, denote this
1155  // because the sender will fail.
1156  bool too_many_fds = false;
1157  if (pipes[1]+1 > FD_SETSIZE) {
1158  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1159  too_many_fds = true;
1160  }
1161 
1162  //create a thread that sends the units of work to workers
1163  // we create it after all signals were blocked so that this
1164  // thread is never interupted by a signal
1165  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1166  boost::thread senderThread(sender);
1167 
1168  if(not too_many_fds) {
1169  //NOTE: a child could have failed before we got here and even after this call
1170  // which is why the 'if' is conditional on continueAfterChildFailure_
1171  possiblyContinueAfterForkChildFailure();
1172  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1173  sigsuspend(&unblockingSigSet);
1174  possiblyContinueAfterForkChildFailure();
1175  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1176  }
1177  }
1178  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1179 
1180  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1181  if(child_failed) {
1182  LogError("ForkingStopping") << "child failed";
1183  }
1184  if(shutdown_flag) {
1185  LogSystem("ForkingStopping") << "asked to shutdown";
1186  }
1187 
1188  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1189  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1190  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1191  it != itEnd; ++it) {
1192  /* int result = */ kill(*it, SIGUSR2);
1193  }
1194  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1195  while(num_children_done != kMaxChildren) {
1196  sigsuspend(&unblockingSigSet);
1197  }
1198  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1199  }
1200  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1201  senderThread.join();
1202  if(child_failed && !continueAfterChildFailure_) {
1203  if (child_fail_signal) {
1204  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1205  } else if (child_fail_exit_status) {
1206  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1207  } else {
1208  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1209  }
1210  }
1211  if(too_many_fds) {
1212  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1213  }
1214  return false;
1215  }
1216 
1217  std::vector<ModuleDescription const*>
1218  EventProcessor::getAllModuleDescriptions() const {
1219  return schedule_->getAllModuleDescriptions();
1220  }
1221 
1222  int
1223  EventProcessor::totalEvents() const {
1224  return schedule_->totalEvents();
1225  }
1226 
1227  int
1228  EventProcessor::totalEventsPassed() const {
1229  return schedule_->totalEventsPassed();
1230  }
1231 
1232  int
1233  EventProcessor::totalEventsFailed() const {
1234  return schedule_->totalEventsFailed();
1235  }
1236 
1237  void
1238  EventProcessor::enableEndPaths(bool active) {
1239  schedule_->enableEndPaths(active);
1240  }
1241 
1242  bool
1243  EventProcessor::endPathsEnabled() const {
1244  return schedule_->endPathsEnabled();
1245  }
1246 
1247  void
1248  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1249  schedule_->getTriggerReport(rep);
1250  }
1251 
1252  void
1253  EventProcessor::clearCounters() {
1254  schedule_->clearCounters();
1255  }
1256 
1257 
1258  std::auto_ptr<statemachine::Machine>
1259  EventProcessor::createStateMachine() {
1261  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1262  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1263  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1264  else {
1265  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1266  << fileMode_ << ".\n"
1267  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1268  }
1269 
1270  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1271  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1272  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1273  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1274  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1275  else {
1276  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1277  << emptyRunLumiMode_ << ".\n"
1278  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1279  }
1280 
1281  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1282  fileMode,
1283  emptyRunLumiMode));
1284 
1285  machine->initiate();
1286  return machine;
1287  }
1288 
1289  bool
1290  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1291  bool returnValue = false;
1292 
1293  // Look for a shutdown signal
1294  if(shutdown_flag.load(std::memory_order_acquire)) {
1295  returnValue = true;
1296  returnCode = epSignal;
1297  }
1298  return returnValue;
1299  }
1300 
1301 
// Runs the job until the state machine terminates or a stop is requested.
//
// Sequence, as written below:
//   * beginJob() is called and a fresh state machine is obtained from createStateMachine();
//   * in a loop, the next item type is obtained from input_ and the matching
//     statemachine event (Event/Stop/File/Run/Lumi) is fed to the machine;
//   * the loop ends when the machine reports terminated(), when
//     checkForAsyncStopRequest() fires, or when event processing recorded an
//     asynchronous stop in asyncStopRequestedWhileProcessingEvents_;
//   * a cms::Exception escaping the loop causes terminateMachine() to be called and
//     any text stored in exceptionMessageLumis_/Runs_/Files_ to be attached before rethrowing;
//   * cms::Exception("BadState") is thrown if stateMachineWasInErrorState_ was set.
// Returns the StatusCode accumulated in returnCode (epSuccess unless a stop was requested).
//
// NOTE(review): the numbered listing this was taken from is missing lines 1302, 1389 and 1483.
// 1302 presumably held the return type (the function returns a StatusCode); 1389 and 1483
// presumably held the head of a throw expression, since the statements below start with `<<`.
// Restore them from the upstream source — confirm the exact exception type there.
1303  EventProcessor::runToCompletion() {
1304 
1305  StatusCode returnCode=epSuccess;
1306  asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1307  std::auto_ptr<statemachine::Machine> machine;
1308  {
1309  beginJob(); //make sure this was called
1310 
1311  //StatusCode returnCode = epSuccess;
1312  stateMachineWasInErrorState_ = false;
1313 
1314  // make the services available
1315  ServiceRegistry::Operate operate(serviceToken_);
1316 
1317  machine = createStateMachine();
1318  nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1319  asyncStopRequestedWhileProcessingEvents_=false;
1320  try {
1321  convertException::wrap([&]() {
1322 
1323  InputSource::ItemType itemType;
1324 
1325  while(true) {
1326 
1327  bool more = true;
// When children were forked, the source is first asked to skipForForking();
// a false result is treated as end of input (IsStop) further down.
1328  if(numberOfForkedChildren_ > 0) {
1329  size_t size = preg_->size();
1330  {
1331  SendSourceTerminationSignalIfException sentry(actReg_.get());
1332  more = input_->skipForForking();
1333  sentry.completedSuccessfully();
1334  }
1335  if(more) {
// The product registry may have grown during the skip; resync the principal cache.
1336  if(size < preg_->size()) {
1337  principalCache_.adjustIndexesAfterProductRegistryAddition();
1338  }
1339  principalCache_.adjustEventsToNewProductRegistry(preg());
1340  }
1341  }
1342  {
1343  SendSourceTerminationSignalIfException sentry(actReg_.get());
1344  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1345  sentry.completedSuccessfully();
1346  }
1347 
1348  FDEBUG(1) << "itemType = " << itemType << "\n";
1349 
// An external shutdown request wins over whatever the source reported:
// stop the machine (forcing any looper to end) and leave the loop.
1350  if(checkForAsyncStopRequest(returnCode)) {
1351  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1352  forceLooperToEnd_ = true;
1353  machine->process_event(statemachine::Stop());
1354  forceLooperToEnd_ = false;
1355  break;
1356  }
1357 
1358  if(itemType == InputSource::IsEvent) {
1359  machine->process_event(statemachine::Event());
1360  if(asyncStopRequestedWhileProcessingEvents_) {
1361  forceLooperToEnd_ = true;
1362  machine->process_event(statemachine::Stop());
1363  forceLooperToEnd_ = false;
1364  returnCode = asyncStopStatusCodeFromProcessingEvents_;
1365  break;
1366  }
// Continue with the item type stored in nextItemTypeFromProcessingEvents_
// (presumably recorded while the Event was being processed — confirm).
1367  itemType = nextItemTypeFromProcessingEvents_;
1368  }
1369 
// Dispatch on the (possibly updated) item type. The IsEvent branch is
// intentionally empty: this chain posts no state-machine event for it.
1370  if(itemType == InputSource::IsEvent) {
1371  }
1372  else if(itemType == InputSource::IsStop) {
1373  machine->process_event(statemachine::Stop());
1374  }
1375  else if(itemType == InputSource::IsFile) {
1376  machine->process_event(statemachine::File());
1377  }
1378  else if(itemType == InputSource::IsRun) {
1379  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1380  }
1381  else if(itemType == InputSource::IsLumi) {
1382  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1383  }
1384  else if(itemType == InputSource::IsSynchronize) {
1385  //For now, we don't have to do anything
1386  }
1387  // This should be impossible
1388  else {
// NOTE(review): listing line 1389 (head of this statement, presumably a throw) is missing.
1390  << "Unknown next item type passed to EventProcessor\n"
1391  << "Please report this error to the Framework group\n";
1392  }
1393  if(machine->terminated()) {
1394  break;
1395  }
1396  } // End of loop over state machine events
1397  }); // convertException::wrap
1398  } // Try block
1399  // Some comments on exception handling related to the boost state machine:
1400  //
1401  // Some states used in the machine are special because they
1402  // perform actions while the machine is being terminated, actions
1403  // such as close files, call endRun, call endLumi etc ... Each of these
1404  // states has two functions that perform these actions. The functions
1405  // are almost identical. The major difference is that one version
1406  // catches all exceptions and the other lets exceptions pass through.
1407  // The destructor catches them and the other function named "exit" lets
1408  // them pass through. On a normal termination, boost will always call
1409  // "exit" and then the state destructor. In our state classes, the
1410  // destructors do nothing if the exit function already took
1411  // care of things. Here's the interesting part. When boost is
1412  // handling an exception the "exit" function is not called (a boost
1413  // feature).
1414  //
1415  // If an exception occurs while the boost machine is in control
1416  // (which usually means inside a process_event call), then
1417  // the boost state machine destroys its states and "terminates" itself.
1418  // This is already done before we hit the catch blocks below. In this case
1419  // the call to terminateMachine below only destroys an already
1420  // terminated state machine. Because exit is not called, the state destructors
1421  // handle cleaning up lumis, runs, and files. The destructors swallow
1422  // all exceptions and only pass through the exceptions messages, which
1423  // are tacked onto the original exception below.
1424  //
1425  // If an exception occurs when the boost state machine is not
1426  // in control (outside the process_event functions), then boost
1427  // cannot destroy its own states. The terminateMachine function
1428  // below takes care of that. The flag "alreadyHandlingException"
1429  // is set true so that the state exit functions do nothing (and
1430  // cannot throw more exceptions while handling the first). Then the
1431  // state destructors take care of this because exit did nothing.
1432  //
1433  // In both cases above, the EventProcessor::endOfLoop function is
1434  // not called because it can throw exceptions.
1435  //
1436  // One tricky aspect of the state machine is that things that can
1437  // throw should not be invoked by the state machine while another
1438  // exception is being handled.
1439  // Another tricky aspect is that it appears to be important to
1440  // terminate the state machine before invoking its destructor.
1441  // We've seen crashes that are not understood when that is not
1442  // done. Maintainers of this code should be careful about this.
1443 
1444  catch (cms::Exception & e) {
1445  alreadyHandlingException_ = true;
1446  terminateMachine(machine);
1447  alreadyHandlingException_ = false;
// Attach any messages stored during cleanup to the exception; if the exception
// reports alreadyPrinted(), also log them so they are not lost.
1448  if (!exceptionMessageLumis_.empty()) {
1449  e.addAdditionalInfo(exceptionMessageLumis_);
1450  if (e.alreadyPrinted()) {
1451  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1452  }
1453  }
1454  if (!exceptionMessageRuns_.empty()) {
1455  e.addAdditionalInfo(exceptionMessageRuns_);
1456  if (e.alreadyPrinted()) {
1457  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1458  }
1459  }
1460  if (!exceptionMessageFiles_.empty()) {
1461  e.addAdditionalInfo(exceptionMessageFiles_);
1462  if (e.alreadyPrinted()) {
1463  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1464  }
1465  }
1466  throw;
1467  }
1468 
1469  if(machine->terminated()) {
1470  FDEBUG(1) << "The state machine reports it has been terminated\n";
1471  machine.reset();
1472  }
1473 
1474  if(stateMachineWasInErrorState_) {
1475  throw cms::Exception("BadState")
1476  << "The boost state machine in the EventProcessor exited after\n"
1477  << "entering the Error state.\n";
1478  }
1479 
1480  }
// A machine still alive here was not terminated by the loop above: terminate it
// explicitly before reporting the problem.
1481  if(machine.get() != 0) {
1482  terminateMachine(machine);
// NOTE(review): listing line 1483 (head of this statement, presumably a throw) is missing.
1484  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1485  << "Please report this error to the Framework group\n";
1486  }
1487 
1488  return returnCode;
1489  }
1490 
1491  void EventProcessor::readFile() {
1492  FDEBUG(1) << " \treadFile\n";
1493  size_t size = preg_->size();
1494  SendSourceTerminationSignalIfException sentry(actReg_.get());
1495 
1496  fb_ = input_->readFile();
1497  if(size < preg_->size()) {
1498  principalCache_.adjustIndexesAfterProductRegistryAddition();
1499  }
1500  principalCache_.adjustEventsToNewProductRegistry(preg());
1501  if((numberOfForkedChildren_ > 0) or
1502  (preallocations_.numberOfStreams()>1 and
1503  preallocations_.numberOfThreads()>1)) {
1504  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1505  }
1506  sentry.completedSuccessfully();
1507  }
1508 
1509  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1510  if (fb_.get() != nullptr) {
1511  SendSourceTerminationSignalIfException sentry(actReg_.get());
1512  input_->closeFile(fb_.get(), cleaningUpAfterException);
1513  sentry.completedSuccessfully();
1514  }
1515  FDEBUG(1) << "\tcloseInputFile\n";
1516  }
1517 
1518  void EventProcessor::openOutputFiles() {
1519  if (fb_.get() != nullptr) {
1520  schedule_->openOutputFiles(*fb_);
1521  if(hasSubProcesses()) {
1522  for(auto& subProcess : *subProcesses_) {
1523  subProcess.openOutputFiles(*fb_);
1524  }
1525  }
1526  }
1527  FDEBUG(1) << "\topenOutputFiles\n";
1528  }
1529 
1530  void EventProcessor::closeOutputFiles() {
1531  if (fb_.get() != nullptr) {
1532  schedule_->closeOutputFiles();
1533  if(hasSubProcesses()) {
1534  for(auto& subProcess : *subProcesses_) {
1535  subProcess.closeOutputFiles();
1536  }
1537  }
1538  }
1539  FDEBUG(1) << "\tcloseOutputFiles\n";
1540  }
1541 
1542  void EventProcessor::respondToOpenInputFile() {
1543  if(hasSubProcesses()) {
1544  for(auto& subProcess : *subProcesses_) {
1545  subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1546  }
1547  }
1548  if (fb_.get() != nullptr) {
1549  schedule_->respondToOpenInputFile(*fb_);
1550  if(hasSubProcesses()) {
1551  for(auto& subProcess : *subProcesses_) {
1552  subProcess.respondToOpenInputFile(*fb_);
1553  }
1554  }
1555  }
1556  FDEBUG(1) << "\trespondToOpenInputFile\n";
1557  }
1558 
1559  void EventProcessor::respondToCloseInputFile() {
1560  if (fb_.get() != nullptr) {
1561  schedule_->respondToCloseInputFile(*fb_);
1562  if(hasSubProcesses()) {
1563  for(auto& subProcess : *subProcesses_) {
1564  subProcess.respondToCloseInputFile(*fb_);
1565  }
1566  }
1567  }
1568  FDEBUG(1) << "\trespondToCloseInputFile\n";
1569  }
1570 
1571  void EventProcessor::startingNewLoop() {
1572  shouldWeStop_ = false;
1573  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1574  // until after we've called beginOfJob
1575  if(looper_ && looperBeginJobRun_) {
1576  looper_->doStartingNewLoop();
1577  }
1578  FDEBUG(1) << "\tstartingNewLoop\n";
1579  }
1580 
1581  bool EventProcessor::endOfLoop() {
1582  if(looper_) {
1583  ModuleChanger changer(schedule_.get(),preg_.get());
1584  looper_->setModuleChanger(&changer);
1585  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1586  looper_->setModuleChanger(nullptr);
1587  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1588  else return false;
1589  }
1590  FDEBUG(1) << "\tendOfLoop\n";
1591  return true;
1592  }
1593 
1594  void EventProcessor::rewindInput() {
1595  input_->repeat();
1596  input_->rewind();
1597  FDEBUG(1) << "\trewind\n";
1598  }
1599 
1600  void EventProcessor::prepareForNextLoop() {
1601  looper_->prepareForNextLoop(esp_.get());
1602  FDEBUG(1) << "\tprepareForNextLoop\n";
1603  }
1604 
1605  bool EventProcessor::shouldWeCloseOutput() const {
1606  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1607  if(hasSubProcesses()) {
1608  for(auto const& subProcess : *subProcesses_) {
1609  if(subProcess.shouldWeCloseOutput()) {
1610  return true;
1611  }
1612  }
1613  return false;
1614  }
1615  return schedule_->shouldWeCloseOutput();
1616  }
1617 
1618  void EventProcessor::doErrorStuff() {
1619  FDEBUG(1) << "\tdoErrorStuff\n";
1620  LogError("StateMachine")
1621  << "The EventProcessor state machine encountered an unexpected event\n"
1622  << "and went to the error state\n"
1623  << "Will attempt to terminate processing normally\n"
1624  << "(IF using the looper the next loop will be attempted)\n"
1625  << "This likely indicates a bug in an input module or corrupted input or both\n";
1626  stateMachineWasInErrorState_ = true;
1627  }
1628 
// Performs begin-of-run processing for the run identified by `run`.
// Order of operations, as written below:
//   1. fetch the RunPrincipal from principalCache_ and let the input source do its doBeginRun;
//   2. build an IOVSyncValue for event (run, 0, 0) at the run's begin time and synchronise the
//      EventSetup to it (clearing the ES cache first when forceESCacheClearOnNewRun_ is set);
//   3. if the looper's beginOfJob has not run yet, run it now followed by doStartingNewLoop;
//   4. run the global begin-run transition on the schedule and every sub-process, then the looper;
//   5. run the stream begin-run transition for every preallocated stream.
// Each SendSourceTerminationSignalIfException sentry signals the ActivityRegistry if its scope is
// left before completedSuccessfully() is called.
// NOTE(review): listing lines 1656 and 1669 are absent from this copy; the `Traits` type used by
// processOneGlobal/processOneStream is presumably declared on them — restore from upstream.
1629  void EventProcessor::beginRun(statemachine::Run const& run) {
1630  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1631  {
1632  SendSourceTerminationSignalIfException sentry(actReg_.get());
1633 
1634  input_->doBeginRun(runPrincipal, &processContext_);
1635  sentry.completedSuccessfully();
1636  }
1637 
1638  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1639  runPrincipal.beginTime());
1640  if(forceESCacheClearOnNewRun_){
1641  espController_->forceCacheClear();
1642  }
1643  {
1644  SendSourceTerminationSignalIfException sentry(actReg_.get());
1645  espController_->eventSetupForInstance(ts);
1646  sentry.completedSuccessfully();
1647  }
1648  EventSetup const& es = esp_->eventSetup();
// First run only: the looper's beginOfJob was deferred until an EventSetup is available.
1649  if(looper_ && looperBeginJobRun_== false) {
1650  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1651  looper_->beginOfJob(es);
1652  looperBeginJobRun_ = true;
1653  looper_->doStartingNewLoop();
1654  }
// Global begin-run transition.
1655  {
1657  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1658  if(hasSubProcesses()) {
1659  for(auto& subProcess : *subProcesses_) {
1660  subProcess.doBeginRun(runPrincipal, ts);
1661  }
1662  }
1663  }
1664  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1665  if(looper_) {
1666  looper_->doBeginRun(runPrincipal, es, &processContext_);
1667  }
// Per-stream begin-run transition, one pass per preallocated stream.
1668  {
1670  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1671  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1672  if(hasSubProcesses()) {
1673  for(auto& subProcess : *subProcesses_) {
1674  subProcess.doStreamBeginRun(i, runPrincipal, ts);
1675  }
1676  }
1677  }
1678  }
1679  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
// The looper has no stream-level hook yet; the call is kept commented out.
1680  if(looper_) {
1681  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1682  }
1683  }
1684 
// Performs end-of-run processing for the run identified by `run`.
//   cleaningUpAfterException - forwarded to the source, the schedule and the sub-processes.
// Order of operations, as written below:
//   1. fetch the RunPrincipal from principalCache_ and let the input source do its doEndRun;
//   2. synchronise the EventSetup to an IOVSyncValue built from the run number, the maximum
//      luminosity-block and event numbers, and the run's end time;
//   3. run the stream end-run transition for every preallocated stream;
//   4. run the global end-run transition on the schedule and every sub-process, then the looper.
// Each SendSourceTerminationSignalIfException sentry signals the ActivityRegistry if its scope is
// left before completedSuccessfully() is called.
// NOTE(review): listing lines 1704 and 1718 are absent from this copy; the `Traits` type used by
// processOneStream/processOneGlobal is presumably declared on them — restore from upstream.
1685  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1686  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689 
1690  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1691  sentry.completedSuccessfully();
1692  }
1693 
1694  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1695  runPrincipal.endTime());
1696  {
1697  SendSourceTerminationSignalIfException sentry(actReg_.get());
1698  espController_->eventSetupForInstance(ts);
1699  sentry.completedSuccessfully();
1700  }
1701  EventSetup const& es = esp_->eventSetup();
// Per-stream end-run transition, one pass per preallocated stream.
1702  {
1703  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1705  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1706  if(hasSubProcesses()) {
1707  for(auto& subProcess : *subProcesses_) {
1708  subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1709  }
1710  }
1711  }
1712  }
1713  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
// The looper has no stream-level hook yet; the call is kept commented out.
1714  if(looper_) {
1715  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1716  }
// Global end-run transition.
1717  {
1719  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1720  if(hasSubProcesses()) {
1721  for(auto& subProcess : *subProcesses_) {
1722  subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException);
1723  }
1724  }
1725  }
1726  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1727  if(looper_) {
1728  looper_->doEndRun(runPrincipal, es, &processContext_);
1729  }
1730  }
1731 
// Performs begin-of-luminosity-block processing for (phid, run, lumi).
// Order of operations, as written below:
//   1. fetch the LuminosityBlockPrincipal from principalCache_ and let the input source do
//      its doBeginLumi;
//   2. if the `rng` service is available, call its preBeginLumi with a LuminosityBlock view
//      of the principal;
//   3. synchronise the EventSetup to an IOVSyncValue for event 0 of this lumi at its begin time;
//   4. run the global begin-lumi transition on the schedule and every sub-process, then the looper;
//   5. run the stream begin-lumi transition for every preallocated stream.
// Each SendSourceTerminationSignalIfException sentry signals the ActivityRegistry if its scope is
// left before completedSuccessfully() is called.
// NOTE(review): listing lines 1741, 1757 and 1771 are absent from this copy; 1741 presumably
// declares the `rng` service handle and 1757/1771 the `Traits` types — restore from upstream.
1732  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1733  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1734  {
1735  SendSourceTerminationSignalIfException sentry(actReg_.get());
1736 
1737  input_->doBeginLumi(lumiPrincipal, &processContext_);
1738  sentry.completedSuccessfully();
1739  }
1740 
1742  if(rng.isAvailable()) {
1743  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1744  rng->preBeginLumi(lb);
1745  }
1746 
1747  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1748  // lumi blocks know their start and end times why not also start and end events?
1749  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1750  {
1751  SendSourceTerminationSignalIfException sentry(actReg_.get());
1752  espController_->eventSetupForInstance(ts);
1753  sentry.completedSuccessfully();
1754  }
1755  EventSetup const& es = esp_->eventSetup();
// Global begin-lumi transition.
1756  {
1758  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1759  if(hasSubProcesses()) {
1760  for(auto& subProcess : *subProcesses_) {
1761  subProcess.doBeginLuminosityBlock(lumiPrincipal, ts);
1762  }
1763  }
1764  }
1765  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1766  if(looper_) {
1767  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1768  }
// Per-stream begin-lumi transition, one pass per preallocated stream.
1769  {
1770  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1772  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1773  if(hasSubProcesses()) {
1774  for(auto& subProcess : *subProcesses_) {
1775  subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1776  }
1777  }
1778  }
1779  }
1780  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
// The looper has no stream-level hook yet; the call is kept commented out.
1781  if(looper_) {
1782  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1783  }
1784  }
1785 
// Performs end-of-luminosity-block processing for (phid, run, lumi).
//   cleaningUpAfterException - forwarded to the source, the schedule and the sub-processes.
// Order of operations, as written below:
//   1. fetch the LuminosityBlockPrincipal from principalCache_ and let the input source do
//      its doEndLumi;
//   2. synchronise the EventSetup to an IOVSyncValue for the maximum event number of this lumi
//      at its end time;
//   3. run the stream end-lumi transition for every preallocated stream;
//   4. run the global end-lumi transition on the schedule and every sub-process, then the looper.
// Each SendSourceTerminationSignalIfException sentry signals the ActivityRegistry if its scope is
// left before completedSuccessfully() is called.
// NOTE(review): listing lines 1806 and 1820 are absent from this copy; the `Traits` type used by
// processOneStream/processOneGlobal is presumably declared on them — restore from upstream.
1786  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1787  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1788  {
1789  SendSourceTerminationSignalIfException sentry(actReg_.get());
1790 
1791  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1792  sentry.completedSuccessfully();
1793  }
1794  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1795  // lumi blocks know their start and end times why not also start and end events?
1796  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1797  lumiPrincipal.endTime());
1798  {
1799  SendSourceTerminationSignalIfException sentry(actReg_.get());
1800  espController_->eventSetupForInstance(ts);
1801  sentry.completedSuccessfully();
1802  }
1803  EventSetup const& es = esp_->eventSetup();
// Per-stream end-lumi transition, one pass per preallocated stream.
1804  {
1805  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1807  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1808  if(hasSubProcesses()) {
1809  for(auto& subProcess : *subProcesses_) {
1810  subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1811  }
1812  }
1813  }
1814  }
// NOTE(review): this trace follows the stream transition but prints "endLumi"; endRun prints
// "streamEndRun" at the same point, so "streamEndLumi" was presumably intended — confirm.
1815  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
// The looper has no stream-level hook yet; the call is kept commented out.
1816  if(looper_) {
1817  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1818  }
// Global end-lumi transition.
1819  {
1821  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1822  if(hasSubProcesses()) {
1823  for(auto& subProcess : *subProcesses_) {
1824  subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1825  }
1826  }
1827  }
1828  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1829  if(looper_) {
1830  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1831  }
1832  }
1833 
// Reads the next run from the input source into a new RunPrincipal and inserts it
// into principalCache_.
// Returns a statemachine::Run built from the principal's reduced process history ID
// and input_->run().
// The cache is expected to hold no run principal on entry (see the check below).
// NOTE(review): listing line 1836 is absent from this copy; it presumably held the head of a
// throw expression for the message chain below — restore from upstream.
1834  statemachine::Run EventProcessor::readRun() {
1835  if (principalCache_.hasRunPrincipal()) {
1837  << "EventProcessor::readRun\n"
1838  << "Illegal attempt to insert run into cache\n"
1839  << "Contact a Framework Developer\n";
1840  }
1841  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1842  {
// Signals the ActivityRegistry if readRun() throws.
1843  SendSourceTerminationSignalIfException sentry(actReg_.get());
1844  input_->readRun(*rp, *historyAppender_);
1845  sentry.completedSuccessfully();
1846  }
// Sanity check: source and principal must agree on the reduced process history.
1847  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1848  principalCache_.insert(rp);
1849  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1850  }
1851 
1852  statemachine::Run EventProcessor::readAndMergeRun() {
1853  principalCache_.merge(input_->runAuxiliary(), preg());
1854  auto runPrincipal =principalCache_.runPrincipalPtr();
1855  {
1856  SendSourceTerminationSignalIfException sentry(actReg_.get());
1857  input_->readAndMergeRun(*runPrincipal);
1858  sentry.completedSuccessfully();
1859  }
1860  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1861  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1862  }
1863 
// Reads the next luminosity block from the input source into a new
// LuminosityBlockPrincipal, attaches the cached run principal to it and inserts it
// into principalCache_.
// Returns input_->luminosityBlock().
// On entry the cache is expected to hold a run principal but no lumi principal.
// NOTE(review): listing lines 1866 and 1872 are absent from this copy; they presumably held the
// heads of throw expressions for the two message chains below — restore from upstream.
// NOTE(review): both messages name "EventProcessor::readRun" although this is
// readLuminosityBlock — looks like a copy/paste slip in the message text; confirm before changing.
1864  int EventProcessor::readLuminosityBlock() {
1865  if (principalCache_.hasLumiPrincipal()) {
1867  << "EventProcessor::readRun\n"
1868  << "Illegal attempt to insert lumi into cache\n"
1869  << "Contact a Framework Developer\n";
1870  }
1871  if (!principalCache_.hasRunPrincipal()) {
1873  << "EventProcessor::readRun\n"
1874  << "Illegal attempt to insert lumi into cache\n"
1875  << "Run is invalid\n"
1876  << "Contact a Framework Developer\n";
1877  }
1878  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1879  {
// Signals the ActivityRegistry if readLuminosityBlock() throws.
1880  SendSourceTerminationSignalIfException sentry(actReg_.get());
1881  input_->readLuminosityBlock(*lbp, *historyAppender_);
1882  sentry.completedSuccessfully();
1883  }
1884  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1885  principalCache_.insert(lbp);
1886  return input_->luminosityBlock();
1887  }
1888 
1889  int EventProcessor::readAndMergeLumi() {
1890  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1891  {
1892  SendSourceTerminationSignalIfException sentry(actReg_.get());
1893  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1894  sentry.completedSuccessfully();
1895  }
1896  return input_->luminosityBlock();
1897  }
1898 
1899  void EventProcessor::writeRun(statemachine::Run const& run) {
1900  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1901  if(hasSubProcesses()) {
1902  for(auto& subProcess : *subProcesses_) {
1903  subProcess.writeRun(run.processHistoryID(), run.runNumber());
1904  }
1905  }
1906  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1907  }
1908 
1909  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1910  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1911  if(hasSubProcesses()) {
1912  for(auto& subProcess : *subProcesses_) {
1913  subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber());
1914  }
1915  }
1916  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1917  }
1918 
1919  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1920  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1921  if(hasSubProcesses()) {
1922  for(auto& subProcess : *subProcesses_) {
1923  subProcess.writeLumi(phid, run, lumi);
1924  }
1925  }
1926  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1927  }
1928 
1929  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1930  principalCache_.deleteLumi(phid, run, lumi);
1931  if(hasSubProcesses()) {
1932  for(auto& subProcess : *subProcesses_) {
1933  subProcess.deleteLumiFromCache(phid, run, lumi);
1934  }
1935  }
1936  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1937  }
1938 
1939  class StreamProcessingTask : public tbb::task {
1940  public:
1942  unsigned int iStreamIndex,
1943  std::atomic<bool>* iFinishedProcessingEvents,
1944  tbb::task* iWaitTask):
1945  m_proc(iProc),
1946  m_streamID(iStreamIndex),
1947  m_finishedProcessingEvents(iFinishedProcessingEvents),
1948  m_waitTask(iWaitTask){}
1949 
1950  tbb::task* execute() {
1951  m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1952  m_waitTask->decrement_ref_count();
1953  return nullptr;
1954  }
1955  private:
1957  unsigned int m_streamID;
1960  };
1961 
// Event loop of one stream: repeatedly reads the next event from the input
// source (under nextTransitionMutex_) and processes it on stream iStreamIndex.
// The loop ends when shouldWeStop() is true, when another thread has recorded
// an exception, when *finishedProcessingEvents is set, when the next item from
// the source is not an event, or when an asynchronous stop is requested.
// Exceptions never leave this function: the first one caught by any stream is
// stored in deferredExceptionPtr_.
1962  void EventProcessor::processEventsForStreamAsync(unsigned int iStreamIndex,
1963  std::atomic<bool>* finishedProcessingEvents) {
1964  try {
1965  // make the services available
1966  ServiceRegistry::Operate operate(serviceToken_);
1967  if(preallocations_.numberOfStreams()>1) {
// NOTE(review): listing line 1968 is missing from this extract; it presumably
// declares `handler` -- confirm against the original file.
1969  handler->initializeThisThreadForUse();
1970  }
1971 
// Stream 0 starts by processing without reading: readAndProcessEvent() calls
// readEvent(0) before it starts the stream tasks.
1972  if(iStreamIndex==0) {
1973  processEvent(0);
1974  }
1975  do {
1976  if(shouldWeStop()) {
1977  break;
1978  }
1979  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1980  //another thread hit an exception
1981  //std::cerr<<"another thread saw an exception\n";
1982  break;
1983  }
1984  {
1985 
1986 
1987  {
1988  //nextItemType and readEvent need to be in same critical section
1989  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1990 
// Another stream already found that the source has no further event to hand out.
1991  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1992  //std::cerr<<"finishedProcessingEvents\n";
1993  break;
1994  }
1995 
1996  //If source and DelayedReader share a resource we must serialize them
1997  auto sr = input_->resourceSharedWithDelayedReader();
1998  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1999  if(sr) {
2000  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
2001  }
2002  InputSource::ItemType itemType = input_->nextItemType();
// A non-event item ends event processing: record its type and raise the
// shared flag so the other streams stop as well.
2003  if (InputSource::IsEvent !=itemType) {
2004  nextItemTypeFromProcessingEvents_ = itemType;
2005  finishedProcessingEvents->store(true,std::memory_order_release);
2006  //std::cerr<<"next item type "<<itemType<<"\n";
2007  break;
2008  }
// The assignment inside the condition is intentional: the result of the check
// is stored in asyncStopRequestedWhileProcessingEvents_ and then tested.
2009  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
2010  //std::cerr<<"task told to async stop\n";
2011  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
2012  break;
2013  }
2014  readEvent(iStreamIndex);
2015  }
2016  }
// Checked again after the read: the event just read is not processed if
// another thread has recorded an exception in the meantime.
2017  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
2018  //another thread hit an exception
2019  //std::cerr<<"another thread saw an exception\n";
2020  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
2021 
2022  break;
2023  }
2024  processEvent(iStreamIndex);
2025  }while(true);
2026  } catch (...) {
// Only the thread that flips the flag from false to true stores its exception;
// exceptions caught by later threads are dropped.
2027  bool expected =false;
2028  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2029  deferredExceptionPtr_ = std::current_exception();
2030  }
2031  //std::cerr<<"task caught exception\n";
2032  }
2033  }
2034 
2035  void EventProcessor::readAndProcessEvent() {
2036  if(numberOfForkedChildren_>0) {
2037  readEvent(0);
2038  processEvent(0);
2039  return;
2040  }
2041  nextItemTypeFromProcessingEvents_ = InputSource::IsEvent; //needed for looper
2042  asyncStopRequestedWhileProcessingEvents_ = false;
2043 
2044  std::atomic<bool> finishedProcessingEvents{false};
2045 
2046  //Task assumes Stream 0 has already read the event that caused us to go here
2047  readEvent(0);
2048 
2049  //To wait, the ref count has to b 1+#streams
2050  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
2051  eventLoopWaitTask->increment_ref_count();
2052 
2053  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2054  unsigned int iStreamIndex = 0;
2055  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2056  eventLoopWaitTask->increment_ref_count();
2057  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
2058 
2059  }
2060  eventLoopWaitTask->increment_ref_count();
2061  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
2062  tbb::task::destroy(*eventLoopWaitTask);
2063 
2064  //One of the processing threads saw an exception
2065  if(deferredExceptionPtrIsSet_) {
2066  std::rethrow_exception(deferredExceptionPtr_);
2067  }
2068  }
2069  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2070  //TODO this will have to become per stream
2071  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2072  StreamContext streamContext(event.streamID(), &processContext_);
2073 
2074  SendSourceTerminationSignalIfException sentry(actReg_.get());
2075  input_->readEvent(event, streamContext);
2076  sentry.completedSuccessfully();
2077 
2078  FDEBUG(1) << "\treadEvent\n";
2079  }
// Processes the event held by the event principal of stream iStreamIndex:
// attaches the current lumi principal, runs the schedule and every sub-process
// on the event, lets the looper (if any) act on it, and finally clears the
// event principal.
2080  void EventProcessor::processEvent(unsigned int iStreamIndex) {
2081  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2082  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
// NOTE(review): listing line 2083 is missing from this extract; it presumably
// declares `rng` -- confirm against the original file.
2084  if(rng.isAvailable()) {
2085  Event ev(*pep, ModuleDescription(), nullptr);
2086  rng->postEventRead(ev);
2087  }
// The event must belong to the run and lumi of the cached lumi principal.
2088  assert(pep->luminosityBlockPrincipalPtrValid());
2089  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2090  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2091 
2092  //We can only update IOVs on Lumi boundaries
2093  //IOVSyncValue ts(pep->id(), pep->time());
2094  //espController_->eventSetupForInstance(ts);
2095  EventSetup const& es = esp_->eventSetup();
2096  {
// NOTE(review): listing line 2097 is missing from this extract; it presumably
// defines `Traits` -- confirm against the original file.
2098  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2099  if(hasSubProcesses()) {
2100  for(auto& subProcess : *subProcesses_) {
2101  subProcess.doEvent(*pep);
2102  }
2103  }
2104  }
2105 
2106  //NOTE: If we have a looper we only have one Stream
2107  if(looper_) {
2108  bool randomAccess = input_->randomAccess();
2109  ProcessingController::ForwardState forwardState = input_->forwardState();
2110  ProcessingController::ReverseState reverseState = input_->reverseState();
2111  ProcessingController pc(forwardState, reverseState, randomAccess);
2112 
2113  EDLooperBase::Status status = EDLooperBase::kContinue;
// Call the looper again for as long as the transition it asked for failed.
2114  do {
2115 
2116  StreamContext streamContext(pep->streamID(), &processContext_);
2117  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2118 
2119  bool succeeded = true;
// Transitions requested by the looper are only acted on for random-access sources.
2120  if(randomAccess) {
2121  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2122  input_->skipEvents(-2);
2123  }
2124  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2125  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2126  }
2127  }
2128  pc.setLastOperationSucceeded(succeeded);
2129  } while(!pc.lastOperationSucceeded());
// Any looper status other than kContinue makes shouldWeStop() return true.
2130  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2131 
2132  }
2133 
2134  FDEBUG(1) << "\tprocessEvent\n";
2135  pep->clearEventPrincipal();
2136  }
2137 
2138  bool EventProcessor::shouldWeStop() const {
2139  FDEBUG(1) << "\tshouldWeStop\n";
2140  if(shouldWeStop_) return true;
2141  if(hasSubProcesses()) {
2142  for(auto const& subProcess : *subProcesses_) {
2143  if(subProcess.terminate()) {
2144  return true;
2145  }
2146  }
2147  return false;
2148  }
2149  return schedule_->terminate();
2150  }
2151 
2152  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2153  exceptionMessageFiles_ = message;
2154  }
2155 
2156  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2157  exceptionMessageRuns_ = message;
2158  }
2159 
2160  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2161  exceptionMessageLumis_ = message;
2162  }
2163 
2164  bool EventProcessor::alreadyHandlingException() const {
2165  return alreadyHandlingException_;
2166  }
2167 
2168  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2169  if(iMachine.get() != 0) {
2170  if(!iMachine->terminated()) {
2171  forceLooperToEnd_ = true;
2172  iMachine->process_event(statemachine::Stop());
2173  forceLooperToEnd_ = false;
2174  }
2175  else {
2176  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2177  }
2178  if(iMachine->terminated()) {
2179  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2180  }
2181  iMachine.reset();
2182  }
2183  }
2184 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:66
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< tbb::task * > m_waitTask
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< EventProcessor * > m_proc
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:70
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:61
ParameterSetID id() const
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(m_qm.get())
Timestamp const & endTime() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
processConfiguration
Definition: Schedule.cc:374
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
std::unique_ptr< std::vector< ParameterSet > > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:640
bool ev
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< std::atomic< bool > * > m_finishedProcessingEvents
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:91
static std::string const input
Definition: EdmProvDump.cc:44
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
static InputSourceFactory const * get()
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::shared_ptr< edm::ParameterSet > parameterSet()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:510
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:102
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:40
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasSubProcesses() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
areg
Definition: Schedule.cc:374
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:63
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:57
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
PathsAndConsumesOfModules pathsAndConsumesOfModules_
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:59
preg
Definition: Schedule.cc:374
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
tuple status
Definition: mps_update.py:57
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31