CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
9 
34 
36 
44 
49 
59 
60 #include "MessageForSource.h"
61 #include "MessageForParent.h"
62 
63 #include "boost/thread/xtime.hpp"
64 
65 #include <exception>
66 #include <iomanip>
67 #include <iostream>
68 #include <utility>
69 #include <sstream>
70 
71 #include <sys/ipc.h>
72 #include <sys/msg.h>
73 
74 #include "tbb/task.h"
75 
76 //Used for forking
77 #include <sys/types.h>
78 #include <sys/wait.h>
79 #include <sys/socket.h>
80 #include <sys/select.h>
81 #include <sys/fcntl.h>
82 #include <unistd.h>
83 
84 //Used for CPU affinity
85 #ifndef __APPLE__
86 #include <sched.h>
87 #endif
88 
89 //Needed for introspection
90 #include "Cintex/Cintex.h"
91 
92 
93 namespace {
94  //Sentry class to only send a signal if an
95  // exception occurs. An exception is identified
96  // by the destructor being called without first
97  // calling completedSuccessfully().
98  class SendSourceTerminationSignalIfException {
99  public:
100  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
101  reg_(iReg) {}
102  ~SendSourceTerminationSignalIfException() {
103  if(reg_) {
104  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
105  }
106  }
107  void completedSuccessfully() {
108  reg_ = nullptr;
109  }
110  private:
111  edm::ActivityRegistry* reg_;
112 
113  };
114 }
115 
116 namespace edm {
117 
118  // ---------------------------------------------------------------
119  std::unique_ptr<InputSource>
121  CommonParams const& common,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125  std::shared_ptr<ActivityRegistry> areg,
126  std::shared_ptr<ProcessConfiguration const> processConfiguration,
127  PreallocationConfiguration const& allocations) {
128  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
129  if(main_input == 0) {
131  << "There must be exactly one source in the configuration.\n"
132  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
133  }
134 
135  std::string modtype(main_input->getParameter<std::string>("@module_type"));
136 
137  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
139  ConfigurationDescriptions descriptions(filler->baseType());
140  filler->fill(descriptions);
141 
142  try {
143  convertException::wrap([&]() {
144  descriptions.validate(*main_input, std::string("source"));
145  });
146  }
147  catch (cms::Exception & iException) {
148  std::ostringstream ost;
149  ost << "Validating configuration of input source of type " << modtype;
150  iException.addContext(ost.str());
151  throw;
152  }
153 
154  main_input->registerIt();
155 
156  // Fill in "ModuleDescription", in case the input source produces
157  // any EDProducts, which would be registered in the ProductRegistry.
158  // Also fill in the process history item for this process.
159  // There is no module label for the unnamed input source, so
160  // just use "source".
161  // Only the tracked parameters belong in the process configuration.
162  ModuleDescription md(main_input->id(),
163  main_input->getParameter<std::string>("@module_type"),
164  "source",
165  processConfiguration.get(),
167 
168  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
169  common.maxEventsInput_, common.maxLumisInput_,
170  common.maxSecondsUntilRampdown_, allocations);
171 
172  areg->preSourceConstructionSignal_(md);
173  std::unique_ptr<InputSource> input;
174  try {
175  //even if we have an exception, send the signal
176  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
177  convertException::wrap([&]() {
178  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
179  });
180  }
181  catch (cms::Exception& iException) {
182  std::ostringstream ost;
183  ost << "Constructing input source of type " << modtype;
184  iException.addContext(ost.str());
185  throw;
186  }
187  return input;
188  }
189 
190  // ---------------------------------------------------------------
191  boost::shared_ptr<EDLooperBase>
194  ParameterSet& params) {
195  boost::shared_ptr<EDLooperBase> vLooper;
196 
197  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
198 
199  if(loopers.size() == 0) {
200  return vLooper;
201  }
202 
203  assert(1 == loopers.size());
204 
205  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
206  itName != itNameEnd;
207  ++itName) {
208 
209  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
210  providerPSet->registerIt();
211  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
212  cp,
213  *providerPSet);
214  }
215  return vLooper;
216 
217  }
218 
219  // ---------------------------------------------------------------
221  ServiceToken const& iToken,
223  std::vector<std::string> const& defaultServices,
224  std::vector<std::string> const& forcedServices) :
225  actReg_(),
226  preg_(),
227  branchIDListHelper_(),
228  serviceToken_(),
229  input_(),
230  espController_(new eventsetup::EventSetupsController),
231  esp_(),
232  act_table_(),
233  processConfiguration_(),
234  schedule_(),
235  subProcess_(),
236  historyAppender_(new HistoryAppender),
237  fb_(),
238  looper_(),
239  deferredExceptionPtrIsSet_(false),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  stateMachineWasInErrorState_(false),
244  fileMode_(),
245  emptyRunLumiMode_(),
246  exceptionMessageFiles_(),
247  exceptionMessageRuns_(),
248  exceptionMessageLumis_(),
249  alreadyHandlingException_(false),
250  forceLooperToEnd_(false),
251  looperBeginJobRun_(false),
252  forceESCacheClearOnNewRun_(false),
253  numberOfForkedChildren_(0),
254  numberOfSequentialEventsPerChild_(1),
255  setCpuAffinity_(false),
256  eventSetupDataToExcludeFromPrefetching_() {
257  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
258  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
259  processDesc->addServices(defaultServices, forcedServices);
260  init(processDesc, iToken, iLegacy);
261  }
262 
264  std::vector<std::string> const& defaultServices,
265  std::vector<std::string> const& forcedServices) :
266  actReg_(),
267  preg_(),
268  branchIDListHelper_(),
269  serviceToken_(),
270  input_(),
271  espController_(new eventsetup::EventSetupsController),
272  esp_(),
273  act_table_(),
274  processConfiguration_(),
275  schedule_(),
276  subProcess_(),
277  historyAppender_(new HistoryAppender),
278  fb_(),
279  looper_(),
280  deferredExceptionPtrIsSet_(false),
281  principalCache_(),
282  beginJobCalled_(false),
283  shouldWeStop_(false),
284  stateMachineWasInErrorState_(false),
285  fileMode_(),
286  emptyRunLumiMode_(),
287  exceptionMessageFiles_(),
288  exceptionMessageRuns_(),
289  exceptionMessageLumis_(),
290  alreadyHandlingException_(false),
291  forceLooperToEnd_(false),
292  looperBeginJobRun_(false),
293  forceESCacheClearOnNewRun_(false),
294  numberOfForkedChildren_(0),
295  numberOfSequentialEventsPerChild_(1),
296  setCpuAffinity_(false),
297  asyncStopRequestedWhileProcessingEvents_(false),
298  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
299  eventSetupDataToExcludeFromPrefetching_()
300  {
301  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
302  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
303  processDesc->addServices(defaultServices, forcedServices);
305  }
306 
307  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
308  ServiceToken const& token,
310  actReg_(),
311  preg_(),
312  branchIDListHelper_(),
313  serviceToken_(),
314  input_(),
315  espController_(new eventsetup::EventSetupsController),
316  esp_(),
317  act_table_(),
318  processConfiguration_(),
319  schedule_(),
320  subProcess_(),
321  historyAppender_(new HistoryAppender),
322  fb_(),
323  looper_(),
324  deferredExceptionPtrIsSet_(false),
325  principalCache_(),
326  beginJobCalled_(false),
327  shouldWeStop_(false),
328  stateMachineWasInErrorState_(false),
329  fileMode_(),
330  emptyRunLumiMode_(),
331  exceptionMessageFiles_(),
332  exceptionMessageRuns_(),
333  exceptionMessageLumis_(),
334  alreadyHandlingException_(false),
335  forceLooperToEnd_(false),
336  looperBeginJobRun_(false),
337  forceESCacheClearOnNewRun_(false),
338  numberOfForkedChildren_(0),
339  numberOfSequentialEventsPerChild_(1),
340  setCpuAffinity_(false),
341  asyncStopRequestedWhileProcessingEvents_(false),
342  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
343  eventSetupDataToExcludeFromPrefetching_()
344  {
345  init(processDesc, token, legacy);
346  }
347 
348 
350  actReg_(),
351  preg_(),
352  branchIDListHelper_(),
353  serviceToken_(),
354  input_(),
355  espController_(new eventsetup::EventSetupsController),
356  esp_(),
357  act_table_(),
358  processConfiguration_(),
359  schedule_(),
360  subProcess_(),
361  historyAppender_(new HistoryAppender),
362  fb_(),
363  looper_(),
364  deferredExceptionPtrIsSet_(false),
365  principalCache_(),
366  beginJobCalled_(false),
367  shouldWeStop_(false),
368  stateMachineWasInErrorState_(false),
369  fileMode_(),
370  emptyRunLumiMode_(),
371  exceptionMessageFiles_(),
372  exceptionMessageRuns_(),
373  exceptionMessageLumis_(),
374  alreadyHandlingException_(false),
375  forceLooperToEnd_(false),
376  looperBeginJobRun_(false),
377  forceESCacheClearOnNewRun_(false),
378  numberOfForkedChildren_(0),
379  numberOfSequentialEventsPerChild_(1),
380  setCpuAffinity_(false),
381  asyncStopRequestedWhileProcessingEvents_(false),
382  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
383  eventSetupDataToExcludeFromPrefetching_()
384 {
385  if(isPython) {
386  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
387  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
389  }
390  else {
391  auto processDesc = std::make_shared<ProcessDesc>(config);
393  }
394  }
395 
396  void
397  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
398  ServiceToken const& iToken,
400 
401  //std::cerr << processDesc->dump() << std::endl;
402 
403  ROOT::Cintex::Cintex::Enable();
404 
405  // register the empty parentage vector , once and for all
407 
408  // register the empty parameter set, once and for all.
410 
411  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
412  //std::cerr << parameterSet->dump() << std::endl;
413 
414  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
415  std::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
416 
417  // Now set some parameters specific to the main process.
418  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
419  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
420  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
421  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
422  //threading
423  unsigned int nThreads=1;
424  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
425  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
426  if(nThreads == 0) {
427  nThreads = 1;
428  }
429  }
430  /* TODO: when we support having each stream run in a different thread use this default
431  unsigned int nStreams =nThreads;
432  */
433  unsigned int nStreams =1;
434  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
435  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
436  if(nStreams==0) {
437  nStreams = nThreads;
438  }
439  }
440  /*
441  bool nRunsSet = false;
442  */
443  unsigned int nConcurrentRuns =1;
444  /*
445  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
446  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
447  }
448  */
449  unsigned int nConcurrentLumis =1;
450  /*
451  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
452  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
453  } else {
454  nConcurrentLumis = nConcurrentRuns;
455  }
456  */
457  //Check that relationships between threading parameters makes sense
458  /*
459  if(nThreads<nStreams) {
460  //bad
461  }
462  if(nConcurrentRuns>nStreams) {
463  //bad
464  }
465  if(nConcurrentRuns>nConcurrentLumis) {
466  //bad
467  }
468  */
469  //forking
470  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
471  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
472  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
473  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
474  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
475  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
476  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
477  itPS != itPSEnd;
478  ++itPS) {
479  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
480  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
481  itPS->getUntrackedParameter<std::string>("label", "")));
482  }
483  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
484 
485  // Now do general initialization
486  ScheduleItems items;
487 
488  //initialize the services
489  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
490  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
491  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
492 
493  //make the services available
495 
496  if(nStreams>1) {
498  handler->willBeUsingThreads();
499  }
500 
501  // intialize miscellaneous items
502  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
503 
504  // intialize the event setup provider
505  esp_ = espController_->makeProvider(*parameterSet);
506 
507  // initialize the looper, if any
508  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
509  if(looper_) {
510  looper_->setActionTable(items.act_table_.get());
511  looper_->attachTo(*items.actReg_);
512 
513  //For now loopers make us run only 1 transition at a time
514  nStreams=1;
515  nConcurrentLumis=1;
516  nConcurrentRuns=1;
517  }
518 
519  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
520 
521  // initialize the input source
522  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.thinnedAssociationsHelper_, items.actReg_, items.processConfiguration_, preallocations_);
523 
524  // intialize the Schedule
525  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
526 
527  // set the data members
528  act_table_ = std::move(items.act_table_);
529  actReg_ = items.actReg_;
530  preg_.reset(items.preg_.release());
535  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
536 
537  FDEBUG(2) << parameterSet << std::endl;
538 
540  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
541  // Reusable event principal
542  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index);
543  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
544  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
546  }
547  // initialize the subprocess, if there is one
548  if(subProcessParameterSet) {
549  subProcess_.reset(new SubProcess(*subProcessParameterSet,
550  *parameterSet,
551  preg_,
555  *actReg_,
556  token,
559  &processContext_));
560  }
561  }
562 
564  // Make the services available while everything is being deleted.
565  ServiceToken token = getToken();
566  ServiceRegistry::Operate op(token);
567 
568  // manually destroy all these thing that may need the services around
569  espController_.reset();
570  subProcess_.reset();
571  esp_.reset();
572  schedule_.reset();
573  input_.reset();
574  looper_.reset();
575  actReg_.reset();
576 
579  }
580 
581  void
583  if(beginJobCalled_) return;
584  beginJobCalled_=true;
585  bk::beginJob();
586 
587  // StateSentry toerror(this); // should we add this ?
588  //make the services available
590 
595  actReg_->preallocateSignal_(bounds);
596  //NOTE: This implementation assumes 'Job' means one call
597  // the EventProcessor::run
598  // If it really means once per 'application' then this code will
599  // have to be changed.
600  // Also have to deal with case where have 'run' then new Module
601  // added and do 'run'
602  // again. In that case the newly added Module needs its 'beginJob'
603  // to be called.
604 
605  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
606  // For now we delay calling beginOfJob until first beginOfRun
607  //if(looper_) {
608  // looper_->beginOfJob(es);
609  //}
610  try {
611  convertException::wrap([&]() {
612  input_->doBeginJob();
613  });
614  }
615  catch(cms::Exception& ex) {
616  ex.addContext("Calling beginJob for the source");
617  throw;
618  }
619  schedule_->beginJob(*preg_);
620  // toerror.succeeded(); // should we add this?
621  if(hasSubProcess()) subProcess_->doBeginJob();
622  actReg_->postBeginJobSignal_();
623 
624  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
625  schedule_->beginStream(i);
626  if(hasSubProcess()) subProcess_->doBeginStream(i);
627  }
628  }
629 
630  void
632  // Collects exceptions, so we don't throw before all operations are performed.
633  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
634 
635  //make the services available
637 
638  //NOTE: this really should go elsewhere in the future
639  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
640  c.call([this,i](){this->schedule_->endStream(i);});
641  if(hasSubProcess()) {
642  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
643  }
644  }
645  schedule_->endJob(c);
646  if(hasSubProcess()) {
647  c.call(std::bind(&SubProcess::doEndJob, subProcess_.get()));
648  }
649  c.call(std::bind(&InputSource::doEndJob, input_.get()));
650  if(looper_) {
651  c.call(std::bind(&EDLooperBase::endOfJob, looper_));
652  }
653  auto actReg = actReg_.get();
654  c.call([actReg](){actReg->postEndJobSignal_();});
655  if(c.hasThrown()) {
656  c.rethrow();
657  }
658  }
659 
662  return serviceToken_;
663  }
664 
665  //Setup signal handler to listen for when forked children stop
666  namespace {
667  //These are volatile since the compiler can not be allowed to optimize them
668  // since they can be modified in the signaller handler
669  volatile bool child_failed = false;
670  volatile unsigned int num_children_done = 0;
671  volatile int child_fail_exit_status = 0;
672  volatile int child_fail_signal = 0;
673 
674  //NOTE: We setup the signal handler to run in the main thread which
675  // is also the same thread that then reads the above values
676 
677  extern "C" {
678  void ep_sigchld(int, siginfo_t*, void*) {
679  //printf("in sigchld\n");
680  //FDEBUG(1) << "in sigchld handler\n";
681  int stat_loc;
682  pid_t p = waitpid(-1, &stat_loc, WNOHANG);
683  while(0<p) {
684  //printf(" looping\n");
685  if(WIFEXITED(stat_loc)) {
686  ++num_children_done;
687  if(0 != WEXITSTATUS(stat_loc)) {
688  child_fail_exit_status = WEXITSTATUS(stat_loc);
689  child_failed = true;
690  }
691  }
692  if(WIFSIGNALED(stat_loc)) {
693  ++num_children_done;
694  child_fail_signal = WTERMSIG(stat_loc);
695  child_failed = true;
696  }
697  p = waitpid(-1, &stat_loc, WNOHANG);
698  }
699  }
700  }
701 
702  }
703 
704  enum {
709  };
710 
711  namespace {
712  unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
713  unsigned int n = 0;
714  while(numberOfChildren != 0) {
715  ++n;
716  numberOfChildren /= 10;
717  }
718  if(n == 0) {
719  n = 3; // Protect against zero numberOfChildren
720  }
721  return n;
722  }
723 
724  /*This class embodied the thread which is used to listen to the forked children and
725  then tell them which events they should process */
726  class MessageSenderToSource {
727  public:
728  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
729  void operator()();
730 
731  private:
732  const std::vector<int>& m_childrenPipes;
733  long const m_nEventsToProcess;
734  fd_set m_socketSet;
735  unsigned int m_aliveChildren;
736  int m_maxFd;
737  };
738 
739  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
740  std::vector<int> const& childrenPipes,
741  long iNEventsToProcess):
742  m_childrenPipes(childrenPipes),
743  m_nEventsToProcess(iNEventsToProcess),
744  m_aliveChildren(childrenSockets.size()),
745  m_maxFd(0)
746  {
747  FD_ZERO(&m_socketSet);
748  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
749  it != itEnd; it++) {
750  FD_SET(*it, &m_socketSet);
751  if (*it > m_maxFd) {
752  m_maxFd = *it;
753  }
754  }
755  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
756  it != itEnd; ++it) {
757  FD_SET(*it, &m_socketSet);
758  if (*it > m_maxFd) {
759  m_maxFd = *it;
760  }
761  }
762  m_maxFd++; // select reads [0,m_maxFd).
763  }
764 
765  /* This function is the heart of the communication between parent and child.
766  * When ready for more data, the child (see MessageReceiverForSource) requests
767  * data through a AF_UNIX socket message. The parent will then assign the next
768  * chunk of data by sending a message back.
769  *
770  * Additionally, this function also monitors the read-side of the pipe fd from the child.
771  * If the child dies unexpectedly, the pipe will be selected as ready for read and
772  * will return EPIPE when read from. Further, if the child thinks the parent has died
773  * (defined as waiting more than 1s for a response), it will write a single byte to
774  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
775  * If still alive, the parent will read the byte and ignore it.
776  *
777  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
778  * handler can distinguish between success and failure cases.
779  */
780 
781  void
782  MessageSenderToSource::operator()() {
783  multicore::MessageForParent childMsg;
784  LogInfo("ForkingController") << "I am controller";
785  //this is the master and therefore the controller
786 
787  multicore::MessageForSource sndmsg;
788  sndmsg.startIndex = 0;
789  sndmsg.nIndices = m_nEventsToProcess;
790  do {
791 
792  fd_set readSockets, errorSockets;
793  // Wait for a request from a child for events.
794  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
795  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
796  // Note that we don't timeout; may be reconsidered in the future.
797  ssize_t rc;
798  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
799  if (rc < 0) {
800  std::cerr << "select failed; should be impossible due to preconditions.\n";
801  abort();
802  break;
803  }
804 
805  // Read the message from the child.
806  for (int idx=0; idx<m_maxFd; idx++) {
807 
808  // Handle errors
809  if (FD_ISSET(idx, &errorSockets)) {
810  LogInfo("ForkingController") << "Error on socket " << idx;
811  FD_CLR(idx, &m_socketSet);
812  close(idx);
813  // See if it was the watchdog pipe that died.
814  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
815  if (*it == idx) {
816  m_aliveChildren--;
817  }
818  }
819  continue;
820  }
821 
822  if (!FD_ISSET(idx, &readSockets)) {
823  continue;
824  }
825 
826  // See if this FD is a child watchdog pipe. If so, read from it to prevent
827  // writes from blocking.
828  bool is_pipe = false;
829  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
830  if (*it == idx) {
831  is_pipe = true;
832  char buf;
833  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
834  if (rc <= 0) {
835  m_aliveChildren--;
836  FD_CLR(idx, &m_socketSet);
837  close(idx);
838  }
839  }
840  }
841 
842  // Only execute this block if the FD is a socket for sending the child work.
843  if (!is_pipe) {
844  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
845  if (rc < 0) {
846  FD_CLR(idx, &m_socketSet);
847  close(idx);
848  continue;
849  }
850 
851  // Tell the child what events to process.
852  // If 'send' fails, then the child process has failed (any other possibilities are
853  // eliminated because we are using fixed-size messages with Unix datagram sockets).
854  // Thus, the SIGCHLD handler will fire and set child_fail = true.
855  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
856  if (rc < 0) {
857  FD_CLR(idx, &m_socketSet);
858  close(idx);
859  continue;
860  }
861  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
862  sndmsg.startIndex += sndmsg.nIndices;
863  }
864  }
865 
866  } while (m_aliveChildren > 0);
867 
868  return;
869  }
870 
871  }
872 
873 
874  void EventProcessor::possiblyContinueAfterForkChildFailure() {
875  if(child_failed && continueAfterChildFailure_) {
876  if (child_fail_signal) {
877  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
878  child_fail_signal=0;
879  } else if (child_fail_exit_status) {
880  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
881  child_fail_exit_status=0;
882  } else {
883  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
884  }
885  child_failed =false;
886  }
887  }
888 
889  bool
890  EventProcessor::forkProcess(std::string const& jobReportFile) {
891 
892  if(0 == numberOfForkedChildren_) {return true;}
893  assert(0<numberOfForkedChildren_);
894  //do what we want done in common
895  {
896  beginJob(); //make sure this was run
897  // make the services available
898  ServiceRegistry::Operate operate(serviceToken_);
899 
900  InputSource::ItemType itemType;
901  itemType = input_->nextItemType();
902 
903  assert(itemType == InputSource::IsFile);
904  {
905  readFile();
906  }
907  itemType = input_->nextItemType();
908  assert(itemType == InputSource::IsRun);
909 
910  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
911  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
912  input_->runAuxiliary()->beginTime());
913  espController_->eventSetupForInstance(ts);
914  EventSetup const& es = esp_->eventSetup();
915 
916  //now get all the data available in the EventSetup
917  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
918  es.fillAvailableRecordKeys(recordKeys);
919  std::vector<eventsetup::DataKey> dataKeys;
920  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
921  itKey != itEnd;
922  ++itKey) {
923  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
924  //see if this is on our exclusion list
925  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
926  ExcludedData const* excludedData(nullptr);
927  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
928  excludedData = &(itExcludeRec->second);
929  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
930  //skip all items in this record
931  continue;
932  }
933  }
934  if(0 != recordPtr) {
935  dataKeys.clear();
936  recordPtr->fillRegisteredDataKeys(dataKeys);
937  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
938  itDataKey != itDataKeyEnd;
939  ++itDataKey) {
940  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
941  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
942  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
943  continue;
944  }
945  try {
946  recordPtr->doGet(*itDataKey);
947  } catch(cms::Exception& e) {
948  LogWarning("ForkingEventSetupPreFetching") << e.what();
949  }
950  }
951  }
952  }
953  }
954  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
955  {
956  // make the services available
957  ServiceRegistry::Operate operate(serviceToken_);
958  Service<JobReport> jobReport;
959  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
960 
961  //Now actually do the forking
962  actReg_->preForkReleaseResourcesSignal_();
963  input_->doPreForkReleaseResources();
964  schedule_->preForkReleaseResources();
965  }
966  installCustomHandler(SIGCHLD, ep_sigchld);
967 
968 
969  unsigned int childIndex = 0;
970  unsigned int const kMaxChildren = numberOfForkedChildren_;
971  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
972  std::vector<pid_t> childrenIds;
973  childrenIds.reserve(kMaxChildren);
974  std::vector<int> childrenSockets;
975  childrenSockets.reserve(kMaxChildren);
976  std::vector<int> childrenPipes;
977  childrenPipes.reserve(kMaxChildren);
978  std::vector<int> childrenSocketsCopy;
979  childrenSocketsCopy.reserve(kMaxChildren);
980  std::vector<int> childrenPipesCopy;
981  childrenPipesCopy.reserve(kMaxChildren);
982  int pipes[] {0, 0};
983 
984  {
985  // make the services available
986  ServiceRegistry::Operate operate(serviceToken_);
987  Service<JobReport> jobReport;
988  int sockets[2], fd_flags;
989  for(; childIndex < kMaxChildren; ++childIndex) {
990  // Create a UNIX_DGRAM socket pair
991  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
992  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
993  exit(EXIT_FAILURE);
994  }
995  if (pipe(pipes)) {
996  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
997  exit(EXIT_FAILURE);
998  }
999  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1000  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1001  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1002  exit(EXIT_FAILURE);
1003  }
1004  // Mark socket as non-block. Child must be careful to do select prior
1005  // to reading from socket.
1006  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1007  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1008  exit(EXIT_FAILURE);
1009  }
1010  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1011  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1012  exit(EXIT_FAILURE);
1013  }
1014  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1015  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1016  exit(EXIT_FAILURE);
1017  }
1018  // Linux man page notes there are some edge cases where reading from a
1019  // fd can block, even after a select.
1020  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1021  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1022  exit(EXIT_FAILURE);
1023  }
1024  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1025  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1026  exit(EXIT_FAILURE);
1027  }
1028 
1029  childrenPipesCopy = childrenPipes;
1030  childrenSocketsCopy = childrenSockets;
1031 
1032  pid_t value = fork();
1033  if(value == 0) {
1034  // Close the parent's side of the socket and pipe which will talk to us.
1035  close(pipes[0]);
1036  close(sockets[0]);
1037  // Close our copies of the parent's other communication pipes.
1038  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1039  close(*it);
1040  }
1041  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1042  close(*it);
1043  }
1044 
1045  // this is the child process, redirect stdout and stderr to a log file
1046  fflush(stdout);
1047  fflush(stderr);
1048  std::stringstream stout;
1049  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1050  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1051  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1052  }
1053  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1054  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1055  }
1056 
1057  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1058  if(setCpuAffinity_) {
1059  // CPU affinity is handled differently on macosx.
1060  // We disable it and print a message until someone reads:
1061  //
1062  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1063  //
1064  // and implements it.
1065 #ifdef __APPLE__
1066  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1067 #else
1068  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1069  cpu_set_t mask;
1070  CPU_ZERO(&mask);
1071  CPU_SET(childIndex, &mask);
1072  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1073  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1074  exit(-1);
1075  }
1076 #endif
1077  }
1078  break;
1079  } else {
1080  //this is the parent
1081  close(pipes[1]);
1082  close(sockets[1]);
1083  }
1084  if(value < 0) {
1085  LogError("ForkingChild") << "failed to create a child";
1086  exit(-1);
1087  }
1088  childrenIds.push_back(value);
1089  childrenSockets.push_back(sockets[0]);
1090  childrenPipes.push_back(pipes[0]);
1091  }
1092 
1093  if(childIndex < kMaxChildren) {
1094  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1095  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1096 
1097  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1098  input_->doPostForkReacquireResources(receiver);
1099  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1100  //NOTE: sources have to reset themselves by listening to the post fork message
1101  //rewindInput();
1102  return true;
1103  }
1104  jobReport->parentAfterFork(jobReportFile);
1105  }
1106 
1107  //this is the original, which is now the master for all the children
1108 
1109  //Need to wait for signals from the children or externally
1110  // To wait we must
1111  // 1) block the signals we want to wait on so we do not have a race condition
1112  // 2) check that we haven't already meet our ending criteria
1113  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1114  sigset_t blockingSigSet;
1115  sigset_t unblockingSigSet;
1116  sigset_t oldSigSet;
1117  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1118  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1119  sigaddset(&blockingSigSet, SIGCHLD);
1120  sigaddset(&blockingSigSet, SIGUSR2);
1121  sigaddset(&blockingSigSet, SIGINT);
1122  sigdelset(&unblockingSigSet, SIGCHLD);
1123  sigdelset(&unblockingSigSet, SIGUSR2);
1124  sigdelset(&unblockingSigSet, SIGINT);
1125  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1126 
1127  // If there are too many fd's (unlikely, but possible) for select, denote this
1128  // because the sender will fail.
1129  bool too_many_fds = false;
1130  if (pipes[1]+1 > FD_SETSIZE) {
1131  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1132  too_many_fds = true;
1133  }
1134 
1135  //create a thread that sends the units of work to workers
1136  // we create it after all signals were blocked so that this
1137  // thread is never interupted by a signal
1138  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1139  boost::thread senderThread(sender);
1140 
1141  if(not too_many_fds) {
1142  //NOTE: a child could have failed before we got here and even after this call
1143  // which is why the 'if' is conditional on continueAfterChildFailure_
1144  possiblyContinueAfterForkChildFailure();
1145  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1146  sigsuspend(&unblockingSigSet);
1147  possiblyContinueAfterForkChildFailure();
1148  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1149  }
1150  }
1151  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1152 
1153  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1154  if(child_failed) {
1155  LogError("ForkingStopping") << "child failed";
1156  }
1157  if(shutdown_flag) {
1158  LogSystem("ForkingStopping") << "asked to shutdown";
1159  }
1160 
1161  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1162  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1163  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1164  it != itEnd; ++it) {
1165  /* int result = */ kill(*it, SIGUSR2);
1166  }
1167  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1168  while(num_children_done != kMaxChildren) {
1169  sigsuspend(&unblockingSigSet);
1170  }
1171  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1172  }
1173  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1174  senderThread.join();
1175  if(child_failed && !continueAfterChildFailure_) {
1176  if (child_fail_signal) {
1177  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1178  } else if (child_fail_exit_status) {
1179  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1180  } else {
1181  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1182  }
1183  }
1184  if(too_many_fds) {
1185  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1186  }
1187  return false;
1188  }
1189 
1190  std::vector<ModuleDescription const*>
1191  EventProcessor::getAllModuleDescriptions() const {
1192  return schedule_->getAllModuleDescriptions();
1193  }
1194 
1195  int
1196  EventProcessor::totalEvents() const {
1197  return schedule_->totalEvents();
1198  }
1199 
1200  int
1201  EventProcessor::totalEventsPassed() const {
1202  return schedule_->totalEventsPassed();
1203  }
1204 
1205  int
1206  EventProcessor::totalEventsFailed() const {
1207  return schedule_->totalEventsFailed();
1208  }
1209 
1210  void
1211  EventProcessor::enableEndPaths(bool active) {
1212  schedule_->enableEndPaths(active);
1213  }
1214 
1215  bool
1216  EventProcessor::endPathsEnabled() const {
1217  return schedule_->endPathsEnabled();
1218  }
1219 
1220  void
1221  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1222  schedule_->getTriggerReport(rep);
1223  }
1224 
1225  void
1226  EventProcessor::clearCounters() {
1227  schedule_->clearCounters();
1228  }
1229 
1230 
1231  std::auto_ptr<statemachine::Machine>
1232  EventProcessor::createStateMachine() {
1234  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1235  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1236  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1237  else {
1238  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1239  << fileMode_ << ".\n"
1240  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1241  }
1242 
1243  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1244  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1245  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1246  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1247  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1248  else {
1249  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1250  << emptyRunLumiMode_ << ".\n"
1251  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1252  }
1253 
1254  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1255  fileMode,
1256  emptyRunLumiMode));
1257 
1258  machine->initiate();
1259  return machine;
1260  }
1261 
1262  bool
1263  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1264  bool returnValue = false;
1265 
1266  // Look for a shutdown signal
1267  if(shutdown_flag.load(std::memory_order_acquire)) {
1268  returnValue = true;
1269  returnCode = epSignal;
1270  }
1271  return returnValue;
1272  }
1273 
1274 
1276  EventProcessor::runToCompletion() {
1277 
1278  StatusCode returnCode=epSuccess;
1279  asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1280  std::auto_ptr<statemachine::Machine> machine;
1281  {
1282  beginJob(); //make sure this was called
1283 
1284  //StatusCode returnCode = epSuccess;
1285  stateMachineWasInErrorState_ = false;
1286 
1287  // make the services available
1288  ServiceRegistry::Operate operate(serviceToken_);
1289 
1290  machine = createStateMachine();
1291  nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1292  asyncStopRequestedWhileProcessingEvents_=false;
1293  try {
1294  convertException::wrap([&]() {
1295 
1296  InputSource::ItemType itemType;
1297 
1298  while(true) {
1299 
1300  bool more = true;
1301  if(numberOfForkedChildren_ > 0) {
1302  size_t size = preg_->size();
1303  {
1304  SendSourceTerminationSignalIfException sentry(actReg_.get());
1305  more = input_->skipForForking();
1306  sentry.completedSuccessfully();
1307  }
1308  if(more) {
1309  if(size < preg_->size()) {
1310  principalCache_.adjustIndexesAfterProductRegistryAddition();
1311  }
1312  principalCache_.adjustEventsToNewProductRegistry(preg_);
1313  }
1314  }
1315  {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1318  sentry.completedSuccessfully();
1319  }
1320 
1321  FDEBUG(1) << "itemType = " << itemType << "\n";
1322 
1323  if(checkForAsyncStopRequest(returnCode)) {
1324  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1325  forceLooperToEnd_ = true;
1326  machine->process_event(statemachine::Stop());
1327  forceLooperToEnd_ = false;
1328  break;
1329  }
1330 
1331  if(itemType == InputSource::IsEvent) {
1332  machine->process_event(statemachine::Event());
1333  if(asyncStopRequestedWhileProcessingEvents_) {
1334  forceLooperToEnd_ = true;
1335  machine->process_event(statemachine::Stop());
1336  forceLooperToEnd_ = false;
1337  returnCode = asyncStopStatusCodeFromProcessingEvents_;
1338  break;
1339  }
1340  itemType = nextItemTypeFromProcessingEvents_;
1341  }
1342 
1343  if(itemType == InputSource::IsEvent) {
1344  }
1345  else if(itemType == InputSource::IsStop) {
1346  machine->process_event(statemachine::Stop());
1347  }
1348  else if(itemType == InputSource::IsFile) {
1349  machine->process_event(statemachine::File());
1350  }
1351  else if(itemType == InputSource::IsRun) {
1352  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1353  }
1354  else if(itemType == InputSource::IsLumi) {
1355  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1356  }
1357  else if(itemType == InputSource::IsSynchronize) {
1358  //For now, we don't have to do anything
1359  }
1360  // This should be impossible
1361  else {
1363  << "Unknown next item type passed to EventProcessor\n"
1364  << "Please report this error to the Framework group\n";
1365  }
1366  if(machine->terminated()) {
1367  break;
1368  }
1369  } // End of loop over state machine events
1370  }); // convertException::wrap
1371  } // Try block
1372  // Some comments on exception handling related to the boost state machine:
1373  //
1374  // Some states used in the machine are special because they
1375  // perform actions while the machine is being terminated, actions
1376  // such as close files, call endRun, call endLumi etc ... Each of these
1377  // states has two functions that perform these actions. The functions
1378  // are almost identical. The major difference is that one version
1379  // catches all exceptions and the other lets exceptions pass through.
1380  // The destructor catches them and the other function named "exit" lets
1381  // them pass through. On a normal termination, boost will always call
1382  // "exit" and then the state destructor. In our state classes, the
1383  // the destructors do nothing if the exit function already took
1384  // care of things. Here's the interesting part. When boost is
1385  // handling an exception the "exit" function is not called (a boost
1386  // feature).
1387  //
1388  // If an exception occurs while the boost machine is in control
1389  // (which usually means inside a process_event call), then
1390  // the boost state machine destroys its states and "terminates" itself.
1391  // This already done before we hit the catch blocks below. In this case
1392  // the call to terminateMachine below only destroys an already
1393  // terminated state machine. Because exit is not called, the state destructors
1394  // handle cleaning up lumis, runs, and files. The destructors swallow
1395  // all exceptions and only pass through the exceptions messages, which
1396  // are tacked onto the original exception below.
1397  //
1398  // If an exception occurs when the boost state machine is not
1399  // in control (outside the process_event functions), then boost
1400  // cannot destroy its own states. The terminateMachine function
1401  // below takes care of that. The flag "alreadyHandlingException"
1402  // is set true so that the state exit functions do nothing (and
1403  // cannot throw more exceptions while handling the first). Then the
1404  // state destructors take care of this because exit did nothing.
1405  //
1406  // In both cases above, the EventProcessor::endOfLoop function is
1407  // not called because it can throw exceptions.
1408  //
1409  // One tricky aspect of the state machine is that things that can
1410  // throw should not be invoked by the state machine while another
1411  // exception is being handled.
1412  // Another tricky aspect is that it appears to be important to
1413  // terminate the state machine before invoking its destructor.
1414  // We've seen crashes that are not understood when that is not
1415  // done. Maintainers of this code should be careful about this.
1416 
1417  catch (cms::Exception & e) {
1418  alreadyHandlingException_ = true;
1419  terminateMachine(machine);
1420  alreadyHandlingException_ = false;
1421  if (!exceptionMessageLumis_.empty()) {
1422  e.addAdditionalInfo(exceptionMessageLumis_);
1423  if (e.alreadyPrinted()) {
1424  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1425  }
1426  }
1427  if (!exceptionMessageRuns_.empty()) {
1428  e.addAdditionalInfo(exceptionMessageRuns_);
1429  if (e.alreadyPrinted()) {
1430  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1431  }
1432  }
1433  if (!exceptionMessageFiles_.empty()) {
1434  e.addAdditionalInfo(exceptionMessageFiles_);
1435  if (e.alreadyPrinted()) {
1436  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1437  }
1438  }
1439  throw;
1440  }
1441 
1442  if(machine->terminated()) {
1443  FDEBUG(1) << "The state machine reports it has been terminated\n";
1444  machine.reset();
1445  }
1446 
1447  if(stateMachineWasInErrorState_) {
1448  throw cms::Exception("BadState")
1449  << "The boost state machine in the EventProcessor exited after\n"
1450  << "entering the Error state.\n";
1451  }
1452 
1453  }
1454  if(machine.get() != 0) {
1455  terminateMachine(machine);
1457  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1458  << "Please report this error to the Framework group\n";
1459  }
1460 
1461  return returnCode;
1462  }
1463 
1464  void EventProcessor::readFile() {
1465  FDEBUG(1) << " \treadFile\n";
1466  size_t size = preg_->size();
1467  SendSourceTerminationSignalIfException sentry(actReg_.get());
1468 
1469  fb_ = input_->readFile();
1470  if(size < preg_->size()) {
1471  principalCache_.adjustIndexesAfterProductRegistryAddition();
1472  }
1473  principalCache_.adjustEventsToNewProductRegistry(preg_);
1474  if((numberOfForkedChildren_ > 0) or
1475  (preallocations_.numberOfStreams()>1 and
1476  preallocations_.numberOfThreads()>1)) {
1477  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1478  }
1479  sentry.completedSuccessfully();
1480  }
1481 
1482  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1483  if (fb_.get() != nullptr) {
1484  SendSourceTerminationSignalIfException sentry(actReg_.get());
1485  input_->closeFile(fb_.get(), cleaningUpAfterException);
1486  sentry.completedSuccessfully();
1487  }
1488  FDEBUG(1) << "\tcloseInputFile\n";
1489  }
1490 
1491  void EventProcessor::openOutputFiles() {
1492  if (fb_.get() != nullptr) {
1493  schedule_->openOutputFiles(*fb_);
1494  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1495  }
1496  FDEBUG(1) << "\topenOutputFiles\n";
1497  }
1498 
1499  void EventProcessor::closeOutputFiles() {
1500  if (fb_.get() != nullptr) {
1501  schedule_->closeOutputFiles();
1502  if(hasSubProcess()) subProcess_->closeOutputFiles();
1503  }
1504  FDEBUG(1) << "\tcloseOutputFiles\n";
1505  }
1506 
1507  void EventProcessor::respondToOpenInputFile() {
1508  if(hasSubProcess()) {
1509  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1510  }
1511  if (fb_.get() != nullptr) {
1512  schedule_->respondToOpenInputFile(*fb_);
1513  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1514  }
1515  FDEBUG(1) << "\trespondToOpenInputFile\n";
1516  }
1517 
1518  void EventProcessor::respondToCloseInputFile() {
1519  if (fb_.get() != nullptr) {
1520  schedule_->respondToCloseInputFile(*fb_);
1521  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1522  }
1523  FDEBUG(1) << "\trespondToCloseInputFile\n";
1524  }
1525 
1526  void EventProcessor::startingNewLoop() {
1527  shouldWeStop_ = false;
1528  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1529  // until after we've called beginOfJob
1530  if(looper_ && looperBeginJobRun_) {
1531  looper_->doStartingNewLoop();
1532  }
1533  FDEBUG(1) << "\tstartingNewLoop\n";
1534  }
1535 
1536  bool EventProcessor::endOfLoop() {
1537  if(looper_) {
1538  ModuleChanger changer(schedule_.get(),preg_.get());
1539  looper_->setModuleChanger(&changer);
1540  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1541  looper_->setModuleChanger(nullptr);
1542  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1543  else return false;
1544  }
1545  FDEBUG(1) << "\tendOfLoop\n";
1546  return true;
1547  }
1548 
1549  void EventProcessor::rewindInput() {
1550  input_->repeat();
1551  input_->rewind();
1552  FDEBUG(1) << "\trewind\n";
1553  }
1554 
1555  void EventProcessor::prepareForNextLoop() {
1556  looper_->prepareForNextLoop(esp_.get());
1557  FDEBUG(1) << "\tprepareForNextLoop\n";
1558  }
1559 
1560  bool EventProcessor::shouldWeCloseOutput() const {
1561  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1562  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1563  }
1564 
1565  void EventProcessor::doErrorStuff() {
1566  FDEBUG(1) << "\tdoErrorStuff\n";
1567  LogError("StateMachine")
1568  << "The EventProcessor state machine encountered an unexpected event\n"
1569  << "and went to the error state\n"
1570  << "Will attempt to terminate processing normally\n"
1571  << "(IF using the looper the next loop will be attempted)\n"
1572  << "This likely indicates a bug in an input module or corrupted input or both\n";
1573  stateMachineWasInErrorState_ = true;
1574  }
1575 
1576  void EventProcessor::beginRun(statemachine::Run const& run) {
1577  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1578  {
1579  SendSourceTerminationSignalIfException sentry(actReg_.get());
1580 
1581  input_->doBeginRun(runPrincipal, &processContext_);
1582  sentry.completedSuccessfully();
1583  }
1584 
1585  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1586  runPrincipal.beginTime());
1587  if(forceESCacheClearOnNewRun_){
1588  espController_->forceCacheClear();
1589  }
1590  {
1591  SendSourceTerminationSignalIfException sentry(actReg_.get());
1592  espController_->eventSetupForInstance(ts);
1593  sentry.completedSuccessfully();
1594  }
1595  EventSetup const& es = esp_->eventSetup();
1596  if(looper_ && looperBeginJobRun_== false) {
1597  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1598  looper_->beginOfJob(es);
1599  looperBeginJobRun_ = true;
1600  looper_->doStartingNewLoop();
1601  }
1602  {
1604  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1605  if(hasSubProcess()) {
1606  subProcess_->doBeginRun(runPrincipal, ts);
1607  }
1608  }
1609  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1610  if(looper_) {
1611  looper_->doBeginRun(runPrincipal, es, &processContext_);
1612  }
1613  {
1615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1616  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1617  if(hasSubProcess()) {
1618  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1619  }
1620  }
1621  }
1622  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1623  if(looper_) {
1624  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1625  }
1626  }
1627 
1628  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1629  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1630  {
1631  SendSourceTerminationSignalIfException sentry(actReg_.get());
1632 
1633  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1634  sentry.completedSuccessfully();
1635  }
1636 
1637  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1638  runPrincipal.endTime());
1639  {
1640  SendSourceTerminationSignalIfException sentry(actReg_.get());
1641  espController_->eventSetupForInstance(ts);
1642  sentry.completedSuccessfully();
1643  }
1644  EventSetup const& es = esp_->eventSetup();
1645  {
1646  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1648  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1649  if(hasSubProcess()) {
1650  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1651  }
1652  }
1653  }
1654  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1655  if(looper_) {
1656  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1657  }
1658  {
1660  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1661  if(hasSubProcess()) {
1662  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1663  }
1664  }
1665  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1666  if(looper_) {
1667  looper_->doEndRun(runPrincipal, es, &processContext_);
1668  }
1669  }
1670 
1671  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1672  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1673  {
1674  SendSourceTerminationSignalIfException sentry(actReg_.get());
1675 
1676  input_->doBeginLumi(lumiPrincipal, &processContext_);
1677  sentry.completedSuccessfully();
1678  }
1679 
1681  if(rng.isAvailable()) {
1682  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1683  rng->preBeginLumi(lb);
1684  }
1685 
1686  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1687  // lumi blocks know their start and end times why not also start and end events?
1688  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1689  {
1690  SendSourceTerminationSignalIfException sentry(actReg_.get());
1691  espController_->eventSetupForInstance(ts);
1692  sentry.completedSuccessfully();
1693  }
1694  EventSetup const& es = esp_->eventSetup();
1695  {
1697  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1698  if(hasSubProcess()) {
1699  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1700  }
1701  }
1702  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1703  if(looper_) {
1704  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1705  }
1706  {
1707  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1709  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1710  if(hasSubProcess()) {
1711  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1712  }
1713  }
1714  }
1715  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1716  if(looper_) {
1717  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1718  }
1719  }
1720 
1721  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1722  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1723  {
1724  SendSourceTerminationSignalIfException sentry(actReg_.get());
1725 
1726  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1727  sentry.completedSuccessfully();
1728  }
1729  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1730  // lumi blocks know their start and end times why not also start and end events?
1731  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1732  lumiPrincipal.endTime());
1733  {
1734  SendSourceTerminationSignalIfException sentry(actReg_.get());
1735  espController_->eventSetupForInstance(ts);
1736  sentry.completedSuccessfully();
1737  }
1738  EventSetup const& es = esp_->eventSetup();
1739  {
1740  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1742  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1743  if(hasSubProcess()) {
1744  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1745  }
1746  }
1747  }
1748  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1749  if(looper_) {
1750  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1751  }
1752  {
1754  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1755  if(hasSubProcess()) {
1756  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1757  }
1758  }
1759  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1760  if(looper_) {
1761  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1762  }
1763  }
1764 
1765  statemachine::Run EventProcessor::readRun() {
1766  if (principalCache_.hasRunPrincipal()) {
1768  << "EventProcessor::readRun\n"
1769  << "Illegal attempt to insert run into cache\n"
1770  << "Contact a Framework Developer\n";
1771  }
1772  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1773  {
1774  SendSourceTerminationSignalIfException sentry(actReg_.get());
1775  input_->readRun(*rp, *historyAppender_);
1776  sentry.completedSuccessfully();
1777  }
1778  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1779  principalCache_.insert(rp);
1780  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1781  }
1782 
1783  statemachine::Run EventProcessor::readAndMergeRun() {
1784  principalCache_.merge(input_->runAuxiliary(), preg_);
1785  auto runPrincipal =principalCache_.runPrincipalPtr();
1786  {
1787  SendSourceTerminationSignalIfException sentry(actReg_.get());
1788  input_->readAndMergeRun(*runPrincipal);
1789  sentry.completedSuccessfully();
1790  }
1791  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1792  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1793  }
1794 
1795  int EventProcessor::readLuminosityBlock() {
1796  if (principalCache_.hasLumiPrincipal()) {
1798  << "EventProcessor::readRun\n"
1799  << "Illegal attempt to insert lumi into cache\n"
1800  << "Contact a Framework Developer\n";
1801  }
1802  if (!principalCache_.hasRunPrincipal()) {
1804  << "EventProcessor::readRun\n"
1805  << "Illegal attempt to insert lumi into cache\n"
1806  << "Run is invalid\n"
1807  << "Contact a Framework Developer\n";
1808  }
1809  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1810  {
1811  SendSourceTerminationSignalIfException sentry(actReg_.get());
1812  input_->readLuminosityBlock(*lbp, *historyAppender_);
1813  sentry.completedSuccessfully();
1814  }
1815  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1816  principalCache_.insert(lbp);
1817  return input_->luminosityBlock();
1818  }
1819 
1820  int EventProcessor::readAndMergeLumi() {
1821  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1822  {
1823  SendSourceTerminationSignalIfException sentry(actReg_.get());
1824  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1825  sentry.completedSuccessfully();
1826  }
1827  return input_->luminosityBlock();
1828  }
1829 
1830  void EventProcessor::writeRun(statemachine::Run const& run) {
1831  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1832  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1833  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1834  }
1835 
1836  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1837  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1838  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1839  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1840  }
1841 
1842  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1843  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1844  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1845  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1846  }
1847 
1848  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1849  principalCache_.deleteLumi(phid, run, lumi);
1850  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1851  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1852  }
1853 
// tbb::task that runs the event loop for one stream and then decrements the
// ref count of a shared wait task, so a thread waiting on that task can tell
// when this stream has finished.
// Constructor: StreamProcessingTask(EventProcessor* iProc, unsigned int iStreamIndex,
//   std::atomic<bool>* iFinishedProcessingEvents, tbb::task* iWaitTask).
// NOTE(review): the constructor's first line (source line 1856) and the m_proc
// member declaration (source line 1871, presumably EventProcessor*) are missing
// from this listing.
1854  class StreamProcessingTask : public tbb::task {
1855  public:
1857  unsigned int iStreamIndex,
1858  std::atomic<bool>* iFinishedProcessingEvents,
1859  tbb::task* iWaitTask):
1860  m_proc(iProc),
1861  m_streamID(iStreamIndex),
1862  m_finishedProcessingEvents(iFinishedProcessingEvents),
1863  m_waitTask(iWaitTask){}
1864 
// Runs this stream's loop, then releases one reference on the wait task.
// processEventsForStreamAsync wraps its body in try/catch(...), so the
// decrement below is expected to be reached even when event processing fails.
// NOTE(review): this overrides tbb::task::execute(); consider marking `override`.
1865  tbb::task* execute() {
1866  m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1867  m_waitTask->decrement_ref_count();
1868  return nullptr;
1869  }
1870  private:
1872  unsigned int m_streamID;
1873  std::atomic<bool>* m_finishedProcessingEvents;
1874  tbb::task* m_waitTask;
1875  };
1876 
// Event loop for one stream. Each iteration does two things:
//   1. Under nextTransitionMutex_, it asks the source for the next item and
//      reads an event into this stream's EventPrincipal.
//   2. Outside that lock, it processes the event.
// The loop ends when any of the following happens:
//   - shouldWeStop() is true;
//   - another stream has recorded an exception;
//   - another stream has already seen a non-event item (finishedProcessingEvents);
//   - the source's next item is not an event;
//   - an asynchronous stop request is detected.
// Exceptions are not propagated. The first one caught across all streams is
// stored in deferredExceptionPtr_ (claimed via deferredExceptionPtrIsSet_) so
// the caller can rethrow it.
// iStreamIndex             index of the stream whose EventPrincipal is used.
// finishedProcessingEvents flag shared by all streams; set here when the source
//                          returns a non-event item so every stream stops reading.
// NOTE(review): the declaration of `handler` (source line 1883) is missing from
// this listing.
1877  void EventProcessor::processEventsForStreamAsync(unsigned int iStreamIndex,
1878  std::atomic<bool>* finishedProcessingEvents) {
1879  try {
1880  // make the services available
1881  ServiceRegistry::Operate operate(serviceToken_);
1882  if(preallocations_.numberOfStreams()>1) {
1884  handler->initializeThisThreadForUse();
1885  }
1886 
// Stream 0 is assumed to already hold a read event; process it before looping.
1887  if(iStreamIndex==0) {
1888  processEvent(0);
1889  }
1890  do {
1891  if(shouldWeStop()) {
1892  break;
1893  }
1894  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1895  //another thread hit an exception
1896  //std::cerr<<"another thread saw an exception\n";
1897  break;
1898  }
// The braces below are plain scopes: each `break` inside them leaves the
// do/while loop, and the lock guards are released on the way out.
1899  {
1900 
1901 
1902  {
1903  //nextItemType and readEvent need to be in same critical section
1904  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1905 
// Another stream already saw a non-event item; stop reading.
1906  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1907  //std::cerr<<"finishedProcessingEvents\n";
1908  break;
1909  }
1910 
1911  //If source and DelayedReader share a resource we must serialize them
1912  auto sr = input_->resourceSharedWithDelayedReader();
1913  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1914  if(sr) {
1915  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1916  }
1917  InputSource::ItemType itemType = input_->nextItemType();
// A non-event item (presumably a run/lumi transition or end of input) is stored
// in nextItemTypeFromProcessingEvents_, and the shared flag is set so that all
// streams stop reading events.
1918  if (InputSource::IsEvent !=itemType) {
1919  nextItemTypeFromProcessingEvents_ = itemType;
1920  finishedProcessingEvents->store(true,std::memory_order_release);
1921  //std::cerr<<"next item type "<<itemType<<"\n";
1922  break;
1923  }
// The assignment inside the condition is intentional: the result is both
// stored in asyncStopRequestedWhileProcessingEvents_ and tested here.
1924  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1925  //std::cerr<<"task told to async stop\n";
1926  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1927  break;
1928  }
1929  readEvent(iStreamIndex);
1930  }
1931  }
// Re-check after reading. If another stream failed in the meantime, signal the
// source and leave the just-read event unprocessed.
1932  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1933  //another thread hit an exception
1934  //std::cerr<<"another thread saw an exception\n";
1935  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1936 
1937  break;
1938  }
1939  processEvent(iStreamIndex);
1940  }while(true);
1941  } catch (...) {
// Only the first exception across all streams is recorded: whichever stream
// flips deferredExceptionPtrIsSet_ from false to true stores its exception.
1942  bool expected =false;
1943  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1944  deferredExceptionPtr_ = std::current_exception();
1945  }
1946  //std::cerr<<"task caught exception\n";
1947  }
1948  }
1949 
// Reads and processes events.
// - With forked children (numberOfForkedChildren_ > 0): reads and processes a
//   single event on stream 0.
// - Otherwise: reads the first event into stream 0, runs one
//   StreamProcessingTask per stream, and waits for all of them. Any exception
//   recorded by a stream task is rethrown here once every task has finished.
1950  void EventProcessor::readAndProcessEvent() {
1951  if(numberOfForkedChildren_>0) {
1952  readEvent(0);
1953  processEvent(0);
1954  return;
1955  }
1956  nextItemTypeFromProcessingEvents_ = InputSource::IsEvent; //needed for looper
1957  asyncStopRequestedWhileProcessingEvents_ = false;
1958 
// Shared by all stream tasks; set by whichever stream first sees a non-event item.
1959  std::atomic<bool> finishedProcessingEvents{false};
1960 
1961  //Task assumes Stream 0 has already read the event that caused us to go here
1962  readEvent(0);
1963 
// The wait task starts with one reference, and each stream task adds one.
// Each StreamProcessingTask decrements the count when it finishes.
1964  //To wait, the ref count has to b 1+#streams
1965  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1966  eventLoopWaitTask->increment_ref_count();
1967 
// NOTE(review): kNumStreams-1 underflows (unsigned) if numberOfStreams() is 0;
// presumably at least one stream is always configured -- confirm.
1968  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1969  unsigned int iStreamIndex = 0;
// Streams 0 .. kNumStreams-2 are enqueued. The last stream's task is handed to
// spawn_and_wait_for_all below, which also waits for all stream tasks.
1970  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1971  eventLoopWaitTask->increment_ref_count();
1972  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1973 
1974  }
1975  eventLoopWaitTask->increment_ref_count();
1976  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1977  tbb::task::destroy(*eventLoopWaitTask);
1978 
1979  //One of the processing threads saw an exception
1980  if(deferredExceptionPtrIsSet_) {
1981  std::rethrow_exception(deferredExceptionPtr_);
1982  }
1983  }
1984  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1985  //TODO this will have to become per stream
1986  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1987  StreamContext streamContext(event.streamID(), &processContext_);
1988 
1989  SendSourceTerminationSignalIfException sentry(actReg_.get());
1990  input_->readEvent(event, streamContext);
1991  sentry.completedSuccessfully();
1992 
1993  FDEBUG(1) << "\treadEvent\n";
1994  }
// Processes the event already read into stream iStreamIndex's EventPrincipal:
// 1. Attaches the cached lumi principal.
// 2. Calls rng->postEventRead when rng is available.
// 3. Runs the schedule, and the sub-process if present.
// 4. Lets the looper (if any) act on the event.
// 5. Clears the EventPrincipal.
// NOTE(review): the declarations of `rng` (source line 1998, presumably the
// random-number-generator service handle) and of `Traits` (source line 2012)
// are missing from this listing.
1995  void EventProcessor::processEvent(unsigned int iStreamIndex) {
1996  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1997  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1999  if(rng.isAvailable()) {
2000  Event ev(*pep, ModuleDescription(), nullptr);
2001  rng->postEventRead(ev);
2002  }
// The event must belong to the run/lumi currently held in the cache.
2003  assert(pep->luminosityBlockPrincipalPtrValid());
2004  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2005  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2006 
2007  //We can only update IOVs on Lumi boundaries
2008  //IOVSyncValue ts(pep->id(), pep->time());
2009  //espController_->eventSetupForInstance(ts);
2010  EventSetup const& es = esp_->eventSetup();
2011  {
2013  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2014  if(hasSubProcess()) {
2015  subProcess_->doEvent(*pep);
2016  }
2017  }
2018 
2019  //NOTE: If we have a looper we only have one Stream
2020  if(looper_) {
2021  bool randomAccess = input_->randomAccess();
2022  ProcessingController::ForwardState forwardState = input_->forwardState();
2023  ProcessingController::ReverseState reverseState = input_->reverseState();
2024  ProcessingController pc(forwardState, reverseState, randomAccess);
2025 
// doDuringLoop is re-run on this event until the navigation it requested
// succeeds. Only a failed goToEvent leaves succeeded == false.
2026  EDLooperBase::Status status = EDLooperBase::kContinue;
2027  do {
2028 
2029  StreamContext streamContext(pep->streamID(), &processContext_);
2030  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2031 
2032  bool succeeded = true;
2033  if(randomAccess) {
// NOTE(review): -2 presumably compensates for the source advancing by one on
// the next read, so the net effect is "previous event" -- confirm.
2034  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2035  input_->skipEvents(-2);
2036  }
2037  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2038  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2039  }
2040  }
2041  pc.setLastOperationSucceeded(succeeded);
2042  } while(!pc.lastOperationSucceeded());
// Any looper status other than kContinue requests that event processing stop.
2043  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2044 
2045  }
2046 
2047  FDEBUG(1) << "\tprocessEvent\n";
2048  pep->clearEventPrincipal();
2049  }
2050 
2051  bool EventProcessor::shouldWeStop() const {
2052  FDEBUG(1) << "\tshouldWeStop\n";
2053  if(shouldWeStop_) return true;
2054  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2055  }
2056 
2057  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2058  exceptionMessageFiles_ = message;
2059  }
2060 
2061  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2062  exceptionMessageRuns_ = message;
2063  }
2064 
2065  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2066  exceptionMessageLumis_ = message;
2067  }
2068 
2069  bool EventProcessor::alreadyHandlingException() const {
2070  return alreadyHandlingException_;
2071  }
2072 
2073  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2074  if(iMachine.get() != 0) {
2075  if(!iMachine->terminated()) {
2076  forceLooperToEnd_ = true;
2077  iMachine->process_event(statemachine::Stop());
2078  forceLooperToEnd_ = false;
2079  }
2080  else {
2081  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2082  }
2083  if(iMachine->terminated()) {
2084  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2085  }
2086  iMachine.reset();
2087  }
2088  }
2089 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:56
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:60
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:254
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:560
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
Timestamp const & endTime() const
processConfiguration
Definition: Schedule.cc:370
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
bool ev
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
std::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:61
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
static std::string const input
Definition: EdmProvDump.cc:44
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
static InputSourceFactory const * get()
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::shared_ptr< edm::ParameterSet > parameterSet()
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:57
virtual void endOfJob()
Definition: EDLooperBase.cc:90
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
areg
Definition: Schedule.cc:370
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
Definition: ScheduleItems.h:59
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:58
ServiceToken getToken()
edm::RunNumber_t runNumber() const
Definition: EPStates.h:49
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:48
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
tuple status
Definition: ntuplemaker.py:245
preg
Definition: Schedule.cc:370
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:16
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31