CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
9 
34 
36 
44 
49 
59 
60 #include "MessageForSource.h"
61 #include "MessageForParent.h"
62 
63 #include "boost/thread/xtime.hpp"
64 
65 #include <exception>
66 #include <iomanip>
67 #include <iostream>
68 #include <utility>
69 #include <sstream>
70 
71 #include <sys/ipc.h>
72 #include <sys/msg.h>
73 
74 #include "tbb/task.h"
75 
76 //Used for forking
77 #include <sys/types.h>
78 #include <sys/wait.h>
79 #include <sys/socket.h>
80 #include <sys/select.h>
81 #include <sys/fcntl.h>
82 #include <unistd.h>
83 
84 //Used for CPU affinity
85 #ifndef __APPLE__
86 #include <sched.h>
87 #endif
88 
89 //Needed for introspection
90 #include "Cintex/Cintex.h"
91 
92 
93 namespace {
94  //Sentry class to only send a signal if an
95  // exception occurs. An exception is identified
96  // by the destructor being called without first
97  // calling completedSuccessfully().
98  class SendSourceTerminationSignalIfException {
99  public:
100  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
101  reg_(iReg) {}
102  ~SendSourceTerminationSignalIfException() {
103  if(reg_) {
104  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
105  }
106  }
107  void completedSuccessfully() {
108  reg_ = nullptr;
109  }
110  private:
111  edm::ActivityRegistry* reg_;
112 
113  };
114 }
115 
116 namespace edm {
117 
118  // ---------------------------------------------------------------
// Creates the job's input source from the "@main_input" parameter set:
// validates the configuration, registers the parameter set, builds the
// module/input-source descriptions and asks InputSourceFactory for the source.
// cms::Exception thrown during validation or construction is rethrown with
// added context naming the source type.
// NOTE(review): this listing skips source lines 120, 122, 129, 137 and 165, so
// the function name, some parameters and parts of a few statements are not
// visible here.
119  std::unique_ptr<InputSource>
121  CommonParams const& common,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ActivityRegistry> areg,
125  std::shared_ptr<ProcessConfiguration const> processConfiguration,
126  PreallocationConfiguration const& allocations) {
127  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
// A null result means the configuration has no recognisable source.
128  if(main_input == 0) {
130  << "There must be exactly one source in the configuration.\n"
131  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
132  }
133 
134  std::string modtype(main_input->getParameter<std::string>("@module_type"));
135 
// The filler supplies the parameter descriptions used for validation below.
136  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
138  ConfigurationDescriptions descriptions(filler->baseType());
139  filler->fill(descriptions);
140 
141  try {
142  convertException::wrap([&]() {
143  descriptions.validate(*main_input, std::string("source"));
144  });
145  }
146  catch (cms::Exception & iException) {
147  std::ostringstream ost;
148  ost << "Validating configuration of input source of type " << modtype;
149  iException.addContext(ost.str());
150  throw;
151  }
152 
153  main_input->registerIt();
154 
155  // Fill in "ModuleDescription", in case the input source produces
156  // any EDProducts, which would be registered in the ProductRegistry.
157  // Also fill in the process history item for this process.
158  // There is no module label for the unnamed input source, so
159  // just use "source".
160  // Only the tracked parameters belong in the process configuration.
161  ModuleDescription md(main_input->id(),
162  main_input->getParameter<std::string>("@module_type"),
163  "source",
164  processConfiguration.get(),
166 
167  InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_, common.maxSecondsUntilRampdown_, allocations);
168  areg->preSourceConstructionSignal_(md);
169  std::unique_ptr<InputSource> input;
170  try {
171  //even if we have an exception, send the signal
// The shared_ptr holds no object; it exists only so that its deleter emits
// postSourceConstructionSignal_ when this scope is left.
172  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
173  convertException::wrap([&]() {
174  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
175  });
176  }
177  catch (cms::Exception& iException) {
178  std::ostringstream ost;
179  ost << "Constructing input source of type " << modtype;
180  iException.addContext(ost.str());
181  throw;
182  }
183  return input;
184  }
185 
186  // ---------------------------------------------------------------
// Creates the EDLooper named in the "@all_loopers" parameter, if any.
// Returns an empty pointer when no looper is configured; asserts that at most
// one is configured.
// NOTE(review): this listing skips source lines 188-189, so the function name
// and the declarations of esController and cp are not visible here.
187  boost::shared_ptr<EDLooperBase>
190  ParameterSet& params) {
191  boost::shared_ptr<EDLooperBase> vLooper;
192 
193  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
194 
195  if(loopers.size() == 0) {
196  return vLooper;
197  }
198 
199  assert(1 == loopers.size());
200 
// Given the assert above, this loop body runs exactly once.
201  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
202  itName != itNameEnd;
203  ++itName) {
204 
205  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
206  providerPSet->registerIt();
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
208  cp,
209  *providerPSet);
210  }
211  return vLooper;
212 
213  }
214 
215  // ---------------------------------------------------------------
// Constructor taking a Python configuration plus default and forced service
// lists: value-initializes the members, builds a ProcessDesc from the
// configuration, adds the services and delegates to init().
// NOTE(review): this listing skips source lines 216 and 218, so the constructor
// name and the declarations of config and iLegacy are not visible here.
// NOTE(review): unlike the other three EventProcessor constructors in this
// file, this initializer list does not set
// asyncStopRequestedWhileProcessingEvents_ or nextItemTypeFromProcessingEvents_;
// confirm they are not left uninitialized.
217  ServiceToken const& iToken,
219  std::vector<std::string> const& defaultServices,
220  std::vector<std::string> const& forcedServices) :
221  actReg_(),
222  preg_(),
223  branchIDListHelper_(),
224  serviceToken_(),
225  input_(),
226  espController_(new eventsetup::EventSetupsController),
227  esp_(),
228  act_table_(),
229  processConfiguration_(),
230  schedule_(),
231  subProcess_(),
232  historyAppender_(new HistoryAppender),
233  fb_(),
234  looper_(),
235  deferredExceptionPtrIsSet_(false),
236  principalCache_(),
237  beginJobCalled_(false),
238  shouldWeStop_(false),
239  stateMachineWasInErrorState_(false),
240  fileMode_(),
241  emptyRunLumiMode_(),
242  exceptionMessageFiles_(),
243  exceptionMessageRuns_(),
244  exceptionMessageLumis_(),
245  alreadyHandlingException_(false),
246  forceLooperToEnd_(false),
247  looperBeginJobRun_(false),
248  forceESCacheClearOnNewRun_(false),
249  numberOfForkedChildren_(0),
250  numberOfSequentialEventsPerChild_(1),
251  setCpuAffinity_(false),
252  eventSetupDataToExcludeFromPrefetching_() {
253  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
254  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
255  processDesc->addServices(defaultServices, forcedServices);
256  init(processDesc, iToken, iLegacy);
257  }
258 
// Constructor taking a Python configuration plus default and forced service
// lists: value-initializes the members, builds a ProcessDesc from the
// configuration and adds the services.
// NOTE(review): this listing skips source lines 259 and 300, so the constructor
// name, the declaration of config and the statement after addServices() are
// not visible here.
260  std::vector<std::string> const& defaultServices,
261  std::vector<std::string> const& forcedServices) :
262  actReg_(),
263  preg_(),
264  branchIDListHelper_(),
265  serviceToken_(),
266  input_(),
267  espController_(new eventsetup::EventSetupsController),
268  esp_(),
269  act_table_(),
270  processConfiguration_(),
271  schedule_(),
272  subProcess_(),
273  historyAppender_(new HistoryAppender),
274  fb_(),
275  looper_(),
276  deferredExceptionPtrIsSet_(false),
277  principalCache_(),
278  beginJobCalled_(false),
279  shouldWeStop_(false),
280  stateMachineWasInErrorState_(false),
281  fileMode_(),
282  emptyRunLumiMode_(),
283  exceptionMessageFiles_(),
284  exceptionMessageRuns_(),
285  exceptionMessageLumis_(),
286  alreadyHandlingException_(false),
287  forceLooperToEnd_(false),
288  looperBeginJobRun_(false),
289  forceESCacheClearOnNewRun_(false),
290  numberOfForkedChildren_(0),
291  numberOfSequentialEventsPerChild_(1),
292  setCpuAffinity_(false),
293  asyncStopRequestedWhileProcessingEvents_(false),
294  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
295  eventSetupDataToExcludeFromPrefetching_()
296  {
297  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
298  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
299  processDesc->addServices(defaultServices, forcedServices);
301  }
302 
// Constructor taking an already-built ProcessDesc: value-initializes the
// members and delegates all remaining setup to init().
// NOTE(review): this listing skips source line 305, so the declaration of the
// `legacy` argument passed to init() is not visible here.
303  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
304  ServiceToken const& token,
306  actReg_(),
307  preg_(),
308  branchIDListHelper_(),
309  serviceToken_(),
310  input_(),
311  espController_(new eventsetup::EventSetupsController),
312  esp_(),
313  act_table_(),
314  processConfiguration_(),
315  schedule_(),
316  subProcess_(),
317  historyAppender_(new HistoryAppender),
318  fb_(),
319  looper_(),
320  deferredExceptionPtrIsSet_(false),
321  principalCache_(),
322  beginJobCalled_(false),
323  shouldWeStop_(false),
324  stateMachineWasInErrorState_(false),
325  fileMode_(),
326  emptyRunLumiMode_(),
327  exceptionMessageFiles_(),
328  exceptionMessageRuns_(),
329  exceptionMessageLumis_(),
330  alreadyHandlingException_(false),
331  forceLooperToEnd_(false),
332  looperBeginJobRun_(false),
333  forceESCacheClearOnNewRun_(false),
334  numberOfForkedChildren_(0),
335  numberOfSequentialEventsPerChild_(1),
336  setCpuAffinity_(false),
337  asyncStopRequestedWhileProcessingEvents_(false),
338  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
339  eventSetupDataToExcludeFromPrefetching_()
340  {
341  init(processDesc, token, legacy);
342  }
343 
344 
// Constructor that interprets `config` according to `isPython`: when true a
// ParameterSet is obtained through PythonProcessDesc, otherwise `config` is
// handed to ProcessDesc directly.
// NOTE(review): this listing skips source lines 345, 384 and 388, so the
// constructor signature and the last statement of each branch are not visible
// here.
346  actReg_(),
347  preg_(),
348  branchIDListHelper_(),
349  serviceToken_(),
350  input_(),
351  espController_(new eventsetup::EventSetupsController),
352  esp_(),
353  act_table_(),
354  processConfiguration_(),
355  schedule_(),
356  subProcess_(),
357  historyAppender_(new HistoryAppender),
358  fb_(),
359  looper_(),
360  deferredExceptionPtrIsSet_(false),
361  principalCache_(),
362  beginJobCalled_(false),
363  shouldWeStop_(false),
364  stateMachineWasInErrorState_(false),
365  fileMode_(),
366  emptyRunLumiMode_(),
367  exceptionMessageFiles_(),
368  exceptionMessageRuns_(),
369  exceptionMessageLumis_(),
370  alreadyHandlingException_(false),
371  forceLooperToEnd_(false),
372  looperBeginJobRun_(false),
373  forceESCacheClearOnNewRun_(false),
374  numberOfForkedChildren_(0),
375  numberOfSequentialEventsPerChild_(1),
376  setCpuAffinity_(false),
377  asyncStopRequestedWhileProcessingEvents_(false),
378  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
379  eventSetupDataToExcludeFromPrefetching_()
380 {
381  if(isPython) {
382  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
383  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
385  }
386  else {
387  auto processDesc = std::make_shared<ProcessDesc>(config);
389  }
390  }
391 
// Shared initialization used by the constructors. Reads the untracked
// "options" parameter set (file mode, threading and forking settings),
// initializes the services, event setup provider, optional looper, input
// source and schedule, takes over the objects built in ScheduleItems, and
// creates the optional SubProcess.
// NOTE(review): this listing skips a number of source lines (e.g. 395, 402,
// 405, 490, 493, 527-529, 534, 540, 547-548, 551-552), so the end of the
// signature and several statements are not visible here.
392  void
393  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
394  ServiceToken const& iToken,
396 
397  //std::cerr << processDesc->dump() << std::endl;
398 
399  ROOT::Cintex::Cintex::Enable();
400 
401  // register the empty parentage vector , once and for all
403 
404  // register the empty parameter set, once and for all.
406 
407  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
408  //std::cerr << parameterSet->dump() << std::endl;
409 
410  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
411  std::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
412 
413  // Now set some parameters specific to the main process.
414  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
415  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
416  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
417  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
418  //threading
// numberOfThreads defaults to 1; an explicit 0 is also treated as 1.
419  unsigned int nThreads=1;
420  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
421  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
422  if(nThreads == 0) {
423  nThreads = 1;
424  }
425  }
426  /* TODO: when we support having each stream run in a different thread use this default
427  unsigned int nStreams =nThreads;
428  */
// numberOfStreams defaults to 1; an explicit 0 means "same as nThreads".
429  unsigned int nStreams =1;
430  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
431  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
432  if(nStreams==0) {
433  nStreams = nThreads;
434  }
435  }
436  /*
437  bool nRunsSet = false;
438  */
// Concurrent runs and lumis are fixed at 1; the code reading them from the
// configuration is commented out below.
439  unsigned int nConcurrentRuns =1;
440  /*
441  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
442  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
443  }
444  */
445  unsigned int nConcurrentLumis =1;
446  /*
447  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
448  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
449  } else {
450  nConcurrentLumis = nConcurrentRuns;
451  }
452  */
453  //Check that relationships between threading parameters makes sense
454  /*
455  if(nThreads<nStreams) {
456  //bad
457  }
458  if(nConcurrentRuns>nStreams) {
459  //bad
460  }
461  if(nConcurrentRuns>nConcurrentLumis) {
462  //bad
463  }
464  */
465  //forking
466  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
467  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
468  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
469  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
470  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
// Build the record -> {(type, label)} exclusion map; "type" defaults to "*"
// and "label" to the empty string.
471  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
472  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
473  itPS != itPSEnd;
474  ++itPS) {
475  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
476  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
477  itPS->getUntrackedParameter<std::string>("label", "")));
478  }
479  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
480 
481  // Now do general initialization
482  ScheduleItems items;
483 
484  //initialize the services
485  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
486  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
487  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
488 
489  //make the services available
491 
492  if(nStreams>1) {
494  handler->willBeUsingThreads();
495  }
496 
497  // intialize miscellaneous items
498  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
499 
500  // intialize the event setup provider
501  esp_ = espController_->makeProvider(*parameterSet);
502 
503  // initialize the looper, if any
504  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
505  if(looper_) {
506  looper_->setActionTable(items.act_table_.get());
507  looper_->attachTo(*items.actReg_);
508 
509  //For now loopers make us run only 1 transition at a time
510  nStreams=1;
511  nConcurrentLumis=1;
512  nConcurrentRuns=1;
513  }
514 
515  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
516 
517  // initialize the input source
518  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_, preallocations_);
519 
520  // intialize the Schedule
521  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
522 
523  // set the data members
524  act_table_ = std::move(items.act_table_);
525  actReg_ = items.actReg_;
526  preg_.reset(items.preg_.release());
530  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
531 
532  FDEBUG(2) << parameterSet << std::endl;
533 
// One reusable EventPrincipal is created per stream.
535  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
536  // Reusable event principal
537  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get(), index);
538  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
539  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
541  }
542  // initialize the subprocess, if there is one
543  if(subProcessParameterSet) {
544  subProcess_.reset(new SubProcess(*subProcessParameterSet,
545  *parameterSet,
546  preg_,
549  *actReg_,
550  token,
553  &processContext_));
554  }
555  }
556 
// Teardown body: the members below are reset explicitly, in this order, while
// a ServiceRegistry::Operate for the processor's token is still in scope.
// NOTE(review): this listing skips source lines 557 and 571-572, so the
// function header and the final statements are not visible here; presumably
// this is the EventProcessor destructor — confirm against the full source.
558  // Make the services available while everything is being deleted.
559  ServiceToken token = getToken();
560  ServiceRegistry::Operate op(token);
561 
562  // manually destroy all these thing that may need the services around
563  espController_.reset();
564  subProcess_.reset();
565  esp_.reset();
566  schedule_.reset();
567  input_.reset();
568  looper_.reset();
569  actReg_.reset();
570 
573  }
574 
// Runs the begin-of-job sequence once: guarded by beginJobCalled_, it emits the
// preallocate signal, calls beginJob on the source, the schedule and the
// optional SubProcess, emits postBeginJobSignal_, and then begins every stream.
// NOTE(review): this listing skips source lines 576, 583 and 585-588, so the
// function name and the definition of `bounds` are not visible here.
575  void
// Idempotent: a second call returns immediately.
577  if(beginJobCalled_) return;
578  beginJobCalled_=true;
579  bk::beginJob();
580 
581  // StateSentry toerror(this); // should we add this ?
582  //make the services available
584 
589  actReg_->preallocateSignal_(bounds);
590  //NOTE: This implementation assumes 'Job' means one call
591  // the EventProcessor::run
592  // If it really means once per 'application' then this code will
593  // have to be changed.
594  // Also have to deal with case where have 'run' then new Module
595  // added and do 'run'
596  // again. In that case the newly added Module needs its 'beginJob'
597  // to be called.
598 
599  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
600  // For now we delay calling beginOfJob until first beginOfRun
601  //if(looper_) {
602  // looper_->beginOfJob(es);
603  //}
// A cms::Exception from the source gets context added before being rethrown.
604  try {
605  convertException::wrap([&]() {
606  input_->doBeginJob();
607  });
608  }
609  catch(cms::Exception& ex) {
610  ex.addContext("Calling beginJob for the source");
611  throw;
612  }
613  schedule_->beginJob(*preg_);
614  // toerror.succeeded(); // should we add this?
615  if(hasSubProcess()) subProcess_->doBeginJob();
616  actReg_->postBeginJobSignal_();
617 
618  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
619  schedule_->beginStream(i);
620  if(hasSubProcess()) subProcess_->doBeginStream(i);
621  }
622  }
623 
// Runs the end-of-job sequence: ends every stream, then calls endJob on the
// schedule, the optional SubProcess, the source and the optional looper, and
// finally emits postEndJobSignal_. Each step goes through the
// ExceptionCollector, and rethrow() is called at the end if anything threw.
// NOTE(review): this listing skips source lines 625 and 630, so the function
// name and the statement after "make the services available" are not visible.
624  void
626  // Collects exceptions, so we don't throw before all operations are performed.
627  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
628 
629  //make the services available
631 
632  //NOTE: this really should go elsewhere in the future
633  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
634  c.call([this,i](){this->schedule_->endStream(i);});
635  if(hasSubProcess()) {
636  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
637  }
638  }
639  schedule_->endJob(c);
640  if(hasSubProcess()) {
641  c.call(std::bind(&SubProcess::doEndJob, subProcess_.get()));
642  }
643  c.call(std::bind(&InputSource::doEndJob, input_.get()));
644  if(looper_) {
645  c.call(std::bind(&EDLooperBase::endOfJob, looper_));
646  }
// Capture the raw pointer so the lambda does not need `this`.
647  auto actReg = actReg_.get();
648  c.call([actReg](){actReg->postEndJobSignal_();});
649  if(c.hasThrown()) {
650  c.rethrow();
651  }
652  }
653 
656  return serviceToken_;
657  }
658 
//Setup signal handler to listen for when forked children stop
namespace {
  //These are volatile since the compiler can not be allowed to optimize them
  // since they can be modified in the signal handler
  volatile bool child_failed = false;             // set once any child exits non-zero or dies from a signal
  volatile unsigned int num_children_done = 0;    // number of children reaped so far
  volatile int child_fail_exit_status = 0;        // exit status of the most recent child that exited non-zero
  volatile int child_fail_signal = 0;             // signal that killed the most recent signalled child

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    /// SIGCHLD handler: reaps every child that has terminated so far and
    /// records the outcome in the variables above.
    ///
    /// waitpid() is called with WNOHANG until it stops returning a pid, so
    /// several children that ended before the handler ran are all collected
    /// in one invocation.
    void ep_sigchld(int, siginfo_t*, void*) {
      // waitpid() overwrites errno (e.g. ECHILD once nothing is left to reap).
      // The interrupted code may be about to test errno, so preserve it.
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0<p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
697 
698  enum {
703  };
704 
705  namespace {
// Returns how many decimal digits are needed to print numberOfChildren.
// A count of zero has no digits to measure, so a width of 3 is used instead.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return (digits == 0) ? 3 : digits;
}
717 
718  /* Callable object embodying the thread which listens to the forked children and
719  then tells them which events they should process. */
720  class MessageSenderToSource {
721  public:
722  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
723  void operator()();
724 
725  private:
726  const std::vector<int>& m_childrenPipes; // reference to the vector passed to the constructor; it is not copied
727  long const m_nEventsToProcess; // number of indices handed out per request
728  fd_set m_socketSet; // every child socket and pipe fd still being watched
729  unsigned int m_aliveChildren; // starts at the number of child sockets; operator() loops until it reaches 0
730  int m_maxFd; // highest watched fd plus one, as passed to select()
731  };
732 
733  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
734  std::vector<int> const& childrenPipes,
735  long iNEventsToProcess):
736  m_childrenPipes(childrenPipes),
737  m_nEventsToProcess(iNEventsToProcess),
738  m_aliveChildren(childrenSockets.size()),
739  m_maxFd(0)
740  {
741  FD_ZERO(&m_socketSet);
742  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
743  it != itEnd; it++) {
744  FD_SET(*it, &m_socketSet);
745  if (*it > m_maxFd) {
746  m_maxFd = *it;
747  }
748  }
749  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
750  it != itEnd; ++it) {
751  FD_SET(*it, &m_socketSet);
752  if (*it > m_maxFd) {
753  m_maxFd = *it;
754  }
755  }
756  m_maxFd++; // select reads [0,m_maxFd).
757  }
758 
759  /* This function is the heart of the communication between parent and child.
760  * When ready for more data, the child (see MessageReceiverForSource) requests
761  * data through a AF_UNIX socket message. The parent will then assign the next
762  * chunk of data by sending a message back.
763  *
764  * Additionally, this function also monitors the read-side of the pipe fd from the child.
765  * If the child dies unexpectedly, the pipe will be selected as ready for read and
766  * will return EPIPE when read from. Further, if the child thinks the parent has died
767  * (defined as waiting more than 1s for a response), it will write a single byte to
768  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
769  * If still alive, the parent will read the byte and ignore it.
770  *
771  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
772  * handler can distinguish between success and failure cases.
773  */
774 
775  void
776  MessageSenderToSource::operator()() {
777  multicore::MessageForParent childMsg;
778  LogInfo("ForkingController") << "I am controller";
779  //this is the master and therefore the controller
780 
781  multicore::MessageForSource sndmsg;
782  sndmsg.startIndex = 0;
783  sndmsg.nIndices = m_nEventsToProcess;
784  do {
785 
786  fd_set readSockets, errorSockets;
787  // Wait for a request from a child for events.
788  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
789  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
790  // Note that we don't timeout; may be reconsidered in the future.
791  ssize_t rc;
792  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
793  if (rc < 0) {
794  std::cerr << "select failed; should be impossible due to preconditions.\n";
795  abort();
796  break;
797  }
798 
799  // Read the message from the child.
800  for (int idx=0; idx<m_maxFd; idx++) {
801 
802  // Handle errors
803  if (FD_ISSET(idx, &errorSockets)) {
804  LogInfo("ForkingController") << "Error on socket " << idx;
805  FD_CLR(idx, &m_socketSet);
806  close(idx);
807  // See if it was the watchdog pipe that died.
808  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
809  if (*it == idx) {
810  m_aliveChildren--;
811  }
812  }
813  continue;
814  }
815 
816  if (!FD_ISSET(idx, &readSockets)) {
817  continue;
818  }
819 
820  // See if this FD is a child watchdog pipe. If so, read from it to prevent
821  // writes from blocking.
822  bool is_pipe = false;
823  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
824  if (*it == idx) {
825  is_pipe = true;
826  char buf;
827  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
828  if (rc <= 0) {
829  m_aliveChildren--;
830  FD_CLR(idx, &m_socketSet);
831  close(idx);
832  }
833  }
834  }
835 
836  // Only execute this block if the FD is a socket for sending the child work.
837  if (!is_pipe) {
838  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
839  if (rc < 0) {
840  FD_CLR(idx, &m_socketSet);
841  close(idx);
842  continue;
843  }
844 
845  // Tell the child what events to process.
846  // If 'send' fails, then the child process has failed (any other possibilities are
847  // eliminated because we are using fixed-size messages with Unix datagram sockets).
848  // Thus, the SIGCHLD handler will fire and set child_fail = true.
849  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
850  if (rc < 0) {
851  FD_CLR(idx, &m_socketSet);
852  close(idx);
853  continue;
854  }
855  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
856  sndmsg.startIndex += sndmsg.nIndices;
857  }
858  }
859 
860  } while (m_aliveChildren > 0);
861 
862  return;
863  }
864 
865  }
866 
867 
868  void EventProcessor::possiblyContinueAfterForkChildFailure() {
869  if(child_failed && continueAfterChildFailure_) {
870  if (child_fail_signal) {
871  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
872  child_fail_signal=0;
873  } else if (child_fail_exit_status) {
874  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
875  child_fail_exit_status=0;
876  } else {
877  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
878  }
879  child_failed =false;
880  }
881  }
882 
883  bool
884  EventProcessor::forkProcess(std::string const& jobReportFile) {
885 
886  if(0 == numberOfForkedChildren_) {return true;}
887  assert(0<numberOfForkedChildren_);
888  //do what we want done in common
889  {
890  beginJob(); //make sure this was run
891  // make the services available
892  ServiceRegistry::Operate operate(serviceToken_);
893 
894  InputSource::ItemType itemType;
895  itemType = input_->nextItemType();
896 
897  assert(itemType == InputSource::IsFile);
898  {
899  readFile();
900  }
901  itemType = input_->nextItemType();
902  assert(itemType == InputSource::IsRun);
903 
904  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
905  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
906  input_->runAuxiliary()->beginTime());
907  espController_->eventSetupForInstance(ts);
908  EventSetup const& es = esp_->eventSetup();
909 
910  //now get all the data available in the EventSetup
911  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
912  es.fillAvailableRecordKeys(recordKeys);
913  std::vector<eventsetup::DataKey> dataKeys;
914  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
915  itKey != itEnd;
916  ++itKey) {
917  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
918  //see if this is on our exclusion list
919  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
920  ExcludedData const* excludedData(nullptr);
921  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
922  excludedData = &(itExcludeRec->second);
923  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
924  //skip all items in this record
925  continue;
926  }
927  }
928  if(0 != recordPtr) {
929  dataKeys.clear();
930  recordPtr->fillRegisteredDataKeys(dataKeys);
931  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
932  itDataKey != itDataKeyEnd;
933  ++itDataKey) {
934  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
935  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
936  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
937  continue;
938  }
939  try {
940  recordPtr->doGet(*itDataKey);
941  } catch(cms::Exception& e) {
942  LogWarning("ForkingEventSetupPreFetching") << e.what();
943  }
944  }
945  }
946  }
947  }
948  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
949  {
950  // make the services available
951  ServiceRegistry::Operate operate(serviceToken_);
952  Service<JobReport> jobReport;
953  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
954 
955  //Now actually do the forking
956  actReg_->preForkReleaseResourcesSignal_();
957  input_->doPreForkReleaseResources();
958  schedule_->preForkReleaseResources();
959  }
960  installCustomHandler(SIGCHLD, ep_sigchld);
961 
962 
963  unsigned int childIndex = 0;
964  unsigned int const kMaxChildren = numberOfForkedChildren_;
965  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
966  std::vector<pid_t> childrenIds;
967  childrenIds.reserve(kMaxChildren);
968  std::vector<int> childrenSockets;
969  childrenSockets.reserve(kMaxChildren);
970  std::vector<int> childrenPipes;
971  childrenPipes.reserve(kMaxChildren);
972  std::vector<int> childrenSocketsCopy;
973  childrenSocketsCopy.reserve(kMaxChildren);
974  std::vector<int> childrenPipesCopy;
975  childrenPipesCopy.reserve(kMaxChildren);
976  int pipes[] {0, 0};
977 
978  {
979  // make the services available
980  ServiceRegistry::Operate operate(serviceToken_);
981  Service<JobReport> jobReport;
982  int sockets[2], fd_flags;
983  for(; childIndex < kMaxChildren; ++childIndex) {
984  // Create a UNIX_DGRAM socket pair
985  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
986  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
987  exit(EXIT_FAILURE);
988  }
989  if (pipe(pipes)) {
990  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
991  exit(EXIT_FAILURE);
992  }
993  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
994  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
995  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
996  exit(EXIT_FAILURE);
997  }
998  // Mark socket as non-block. Child must be careful to do select prior
999  // to reading from socket.
1000  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1001  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1002  exit(EXIT_FAILURE);
1003  }
1004  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1005  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1006  exit(EXIT_FAILURE);
1007  }
1008  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1009  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1010  exit(EXIT_FAILURE);
1011  }
1012  // Linux man page notes there are some edge cases where reading from a
1013  // fd can block, even after a select.
1014  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1015  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1016  exit(EXIT_FAILURE);
1017  }
1018  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1019  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022 
1023  childrenPipesCopy = childrenPipes;
1024  childrenSocketsCopy = childrenSockets;
1025 
1026  pid_t value = fork();
1027  if(value == 0) {
1028  // Close the parent's side of the socket and pipe which will talk to us.
1029  close(pipes[0]);
1030  close(sockets[0]);
1031  // Close our copies of the parent's other communication pipes.
1032  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1033  close(*it);
1034  }
1035  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1036  close(*it);
1037  }
1038 
1039  // this is the child process, redirect stdout and stderr to a log file
1040  fflush(stdout);
1041  fflush(stderr);
1042  std::stringstream stout;
1043  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1044  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1045  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1046  }
1047  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1048  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1049  }
1050 
1051  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1052  if(setCpuAffinity_) {
1053  // CPU affinity is handled differently on macosx.
1054  // We disable it and print a message until someone reads:
1055  //
1056  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1057  //
1058  // and implements it.
1059 #ifdef __APPLE__
1060  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1061 #else
1062  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1063  cpu_set_t mask;
1064  CPU_ZERO(&mask);
1065  CPU_SET(childIndex, &mask);
1066  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1067  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1068  exit(-1);
1069  }
1070 #endif
1071  }
1072  break;
1073  } else {
1074  //this is the parent
1075  close(pipes[1]);
1076  close(sockets[1]);
1077  }
1078  if(value < 0) {
1079  LogError("ForkingChild") << "failed to create a child";
1080  exit(-1);
1081  }
1082  childrenIds.push_back(value);
1083  childrenSockets.push_back(sockets[0]);
1084  childrenPipes.push_back(pipes[0]);
1085  }
1086 
1087  if(childIndex < kMaxChildren) {
1088  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1089  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1090 
1091  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1092  input_->doPostForkReacquireResources(receiver);
1093  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1094  //NOTE: sources have to reset themselves by listening to the post fork message
1095  //rewindInput();
1096  return true;
1097  }
1098  jobReport->parentAfterFork(jobReportFile);
1099  }
1100 
1101  //this is the original, which is now the master for all the children
1102 
1103  //Need to wait for signals from the children or externally
1104  // To wait we must
1105  // 1) block the signals we want to wait on so we do not have a race condition
1106  // 2) check that we haven't already meet our ending criteria
1107  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1108  sigset_t blockingSigSet;
1109  sigset_t unblockingSigSet;
1110  sigset_t oldSigSet;
1111  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1112  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1113  sigaddset(&blockingSigSet, SIGCHLD);
1114  sigaddset(&blockingSigSet, SIGUSR2);
1115  sigaddset(&blockingSigSet, SIGINT);
1116  sigdelset(&unblockingSigSet, SIGCHLD);
1117  sigdelset(&unblockingSigSet, SIGUSR2);
1118  sigdelset(&unblockingSigSet, SIGINT);
1119  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1120 
1121  // If there are too many fd's (unlikely, but possible) for select, denote this
1122  // because the sender will fail.
1123  bool too_many_fds = false;
1124  if (pipes[1]+1 > FD_SETSIZE) {
1125  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1126  too_many_fds = true;
1127  }
1128 
1129  //create a thread that sends the units of work to workers
1130  // we create it after all signals were blocked so that this
1131  // thread is never interupted by a signal
1132  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1133  boost::thread senderThread(sender);
1134 
1135  if(not too_many_fds) {
1136  //NOTE: a child could have failed before we got here and even after this call
1137  // which is why the 'if' is conditional on continueAfterChildFailure_
1138  possiblyContinueAfterForkChildFailure();
1139  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1140  sigsuspend(&unblockingSigSet);
1141  possiblyContinueAfterForkChildFailure();
1142  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1143  }
1144  }
1145  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1146 
1147  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1148  if(child_failed) {
1149  LogError("ForkingStopping") << "child failed";
1150  }
1151  if(shutdown_flag) {
1152  LogSystem("ForkingStopping") << "asked to shutdown";
1153  }
1154 
1155  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1156  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1157  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1158  it != itEnd; ++it) {
1159  /* int result = */ kill(*it, SIGUSR2);
1160  }
1161  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1162  while(num_children_done != kMaxChildren) {
1163  sigsuspend(&unblockingSigSet);
1164  }
1165  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1166  }
1167  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1168  senderThread.join();
1169  if(child_failed && !continueAfterChildFailure_) {
1170  if (child_fail_signal) {
1171  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1172  } else if (child_fail_exit_status) {
1173  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1174  } else {
1175  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1176  }
1177  }
1178  if(too_many_fds) {
1179  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1180  }
1181  return false;
1182  }
1183 
1184  std::vector<ModuleDescription const*>
1185  EventProcessor::getAllModuleDescriptions() const {
1186  return schedule_->getAllModuleDescriptions();
1187  }
1188 
1189  int
1190  EventProcessor::totalEvents() const {
1191  return schedule_->totalEvents();
1192  }
1193 
1194  int
1195  EventProcessor::totalEventsPassed() const {
1196  return schedule_->totalEventsPassed();
1197  }
1198 
1199  int
1200  EventProcessor::totalEventsFailed() const {
1201  return schedule_->totalEventsFailed();
1202  }
1203 
1204  void
1205  EventProcessor::enableEndPaths(bool active) {
1206  schedule_->enableEndPaths(active);
1207  }
1208 
1209  bool
1210  EventProcessor::endPathsEnabled() const {
1211  return schedule_->endPathsEnabled();
1212  }
1213 
1214  void
1215  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1216  schedule_->getTriggerReport(rep);
1217  }
1218 
1219  void
1220  EventProcessor::clearCounters() {
1221  schedule_->clearCounters();
1222  }
1223 
1224 
// Builds and starts the state machine used by this EventProcessor.
//
// The configuration strings fileMode_ and emptyRunLumiMode_ are translated
// into the corresponding statemachine values. An empty string selects
// FULLMERGE and handleEmptyRunsAndLumis respectively; any other unrecognized
// value raises an Exception with errors::Configuration. The Machine is then
// constructed with 'this' and the two modes, initiate() is called on it, and
// ownership is returned to the caller.
//
// NOTE(review): listing line 1227 is absent from this copy of the file. It
// presumably declares the local 'fileMode' that is assigned below -- confirm
// against the original source.
1225 std::auto_ptr<statemachine::Machine>
1226 EventProcessor::createStateMachine() {
// Map the fileMode_ string onto a statemachine file mode.
1228 if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1229 else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1230 else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1231 else {
1232 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1233 << fileMode_ << ".\n"
1234 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1235 }
1236 
// Map the emptyRunLumiMode_ string onto a statemachine empty run/lumi mode.
1237 statemachine::EmptyRunLumiMode emptyRunLumiMode;
1238 if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1239 else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1240 else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1241 else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1242 else {
1243 throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1244 << emptyRunLumiMode_ << ".\n"
1245 << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1246 }
1247 
1248 std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1249 fileMode,
1250 emptyRunLumiMode));
1251 
// The machine is started here, before it is handed to the caller.
1252 machine->initiate();
1253 return machine;
1254 }
1255 
1256  bool
1257  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1258  bool returnValue = false;
1259 
1260  // Look for a shutdown signal
1261  if(shutdown_flag.load(std::memory_order_acquire)) {
1262  returnValue = true;
1263  returnCode = epSignal;
1264  }
1265  return returnValue;
1266  }
1267 
1268 
// Drives the state machine over everything the input source delivers and
// returns the resulting StatusCode.
//
// Outline:
//   1. beginJob() is called, services are made available and a fresh state
//      machine is created.
//   2. Inside convertException::wrap, a loop asks the source for the next item
//      type and feeds the matching event (Event/Stop/File/Run/Lumi) to the
//      machine until the machine reports terminated() or a stop is requested.
//   3. A cms::Exception terminates the machine, appends any messages collected
//      in exceptionMessageLumis_/Runs_/Files_ and is rethrown.
//   4. Afterwards a terminated machine is destroyed; a "BadState" exception is
//      thrown if the machine went through its error state.
//
// NOTE(review): listing lines 1269, 1356 and 1450 are absent from this copy.
// 1269 presumably holds the return type; 1356 and 1450 presumably start the
// statements whose streamed messages remain below -- confirm against the
// original source.
1270 EventProcessor::runToCompletion() {
1271 
1272 StatusCode returnCode=epSuccess;
1273 asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1274 std::auto_ptr<statemachine::Machine> machine;
1275 {
1276 beginJob(); //make sure this was called
1277 
1278 //StatusCode returnCode = epSuccess;
1279 stateMachineWasInErrorState_ = false;
1280 
1281 // make the services available
1282 ServiceRegistry::Operate operate(serviceToken_);
1283 
1284 machine = createStateMachine();
1285 nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1286 asyncStopRequestedWhileProcessingEvents_=false;
1287 try {
1288 convertException::wrap([&]() {
1289 
1290 InputSource::ItemType itemType;
1291 
1292 while(true) {
1293 
1294 bool more = true;
// When forking is configured, let the source skip ahead first; the principal
// cache is adjusted because the product registry may have grown meanwhile.
1295 if(numberOfForkedChildren_ > 0) {
1296 size_t size = preg_->size();
1297 {
1298 SendSourceTerminationSignalIfException sentry(actReg_.get());
1299 more = input_->skipForForking();
1300 sentry.completedSuccessfully();
1301 }
1302 if(more) {
1303 if(size < preg_->size()) {
1304 principalCache_.adjustIndexesAfterProductRegistryAddition();
1305 }
1306 principalCache_.adjustEventsToNewProductRegistry(preg_);
1307 }
1308 }
// Ask the source what comes next; if skipForForking() returned false the
// item is treated as IsStop without consulting the source.
1309 {
1310 SendSourceTerminationSignalIfException sentry(actReg_.get());
1311 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1312 sentry.completedSuccessfully();
1313 }
1314 
1315 FDEBUG(1) << "itemType = " << itemType << "\n";
1316 
// An external shutdown request ends the loop: the machine is sent Stop with
// forceLooperToEnd_ set for the duration of that call.
1317 if(checkForAsyncStopRequest(returnCode)) {
1318 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1319 forceLooperToEnd_ = true;
1320 machine->process_event(statemachine::Stop());
1321 forceLooperToEnd_ = false;
1322 break;
1323 }
1324 
// After an Event has been processed, the item type to dispatch next is taken
// from nextItemTypeFromProcessingEvents_ rather than from the source.
1325 if(itemType == InputSource::IsEvent) {
1326 machine->process_event(statemachine::Event());
1327 if(asyncStopRequestedWhileProcessingEvents_) {
1328 forceLooperToEnd_ = true;
1329 machine->process_event(statemachine::Stop());
1330 forceLooperToEnd_ = false;
1331 returnCode = asyncStopStatusCodeFromProcessingEvents_;
1332 break;
1333 }
1334 itemType = nextItemTypeFromProcessingEvents_;
1335 }
1336 
// Intentionally empty branch: nothing more is dispatched in this iteration
// when the item type is (still) IsEvent.
1337 if(itemType == InputSource::IsEvent) {
1338 }
1339 else if(itemType == InputSource::IsStop) {
1340 machine->process_event(statemachine::Stop());
1341 }
1342 else if(itemType == InputSource::IsFile) {
1343 machine->process_event(statemachine::File());
1344 }
1345 else if(itemType == InputSource::IsRun) {
1346 machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1347 }
1348 else if(itemType == InputSource::IsLumi) {
1349 machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1350 }
1351 else if(itemType == InputSource::IsSynchronize) {
1352 //For now, we don't have to do anything
1353 }
1354 // This should be impossible
1355 else {
// NOTE(review): listing line 1356 is missing here; the streamed text below is
// presumably the message of an exception being thrown -- confirm.
1357 << "Unknown next item type passed to EventProcessor\n"
1358 << "Please report this error to the Framework group\n";
1359 }
1360 if(machine->terminated()) {
1361 break;
1362 }
1363 } // End of loop over state machine events
1364 }); // convertException::wrap
1365 } // Try block
1366 // Some comments on exception handling related to the boost state machine:
1367 //
1368 // Some states used in the machine are special because they
1369 // perform actions while the machine is being terminated, actions
1370 // such as close files, call endRun, call endLumi etc ... Each of these
1371 // states has two functions that perform these actions. The functions
1372 // are almost identical. The major difference is that one version
1373 // catches all exceptions and the other lets exceptions pass through.
1374 // The destructor catches them and the other function named "exit" lets
1375 // them pass through. On a normal termination, boost will always call
1376 // "exit" and then the state destructor. In our state classes, the
1377 // destructors do nothing if the exit function already took
1378 // care of things. Here's the interesting part. When boost is
1379 // handling an exception the "exit" function is not called (a boost
1380 // feature).
1381 //
1382 // If an exception occurs while the boost machine is in control
1383 // (which usually means inside a process_event call), then
1384 // the boost state machine destroys its states and "terminates" itself.
1385 // This is already done before we hit the catch blocks below. In this case
1386 // the call to terminateMachine below only destroys an already
1387 // terminated state machine. Because exit is not called, the state destructors
1388 // handle cleaning up lumis, runs, and files. The destructors swallow
1389 // all exceptions and only pass through the exceptions messages, which
1390 // are tacked onto the original exception below.
1391 //
1392 // If an exception occurs when the boost state machine is not
1393 // in control (outside the process_event functions), then boost
1394 // cannot destroy its own states. The terminateMachine function
1395 // below takes care of that. The flag "alreadyHandlingException"
1396 // is set true so that the state exit functions do nothing (and
1397 // cannot throw more exceptions while handling the first). Then the
1398 // state destructors take care of this because exit did nothing.
1399 //
1400 // In both cases above, the EventProcessor::endOfLoop function is
1401 // not called because it can throw exceptions.
1402 //
1403 // One tricky aspect of the state machine is that things that can
1404 // throw should not be invoked by the state machine while another
1405 // exception is being handled.
1406 // Another tricky aspect is that it appears to be important to
1407 // terminate the state machine before invoking its destructor.
1408 // We've seen crashes that are not understood when that is not
1409 // done. Maintainers of this code should be careful about this.
1410 
1411 catch (cms::Exception & e) {
// alreadyHandlingException_ is raised only around terminateMachine().
1412 alreadyHandlingException_ = true;
1413 terminateMachine(machine);
1414 alreadyHandlingException_ = false;
// Attach any messages collected for lumis, runs and files to the exception;
// if it was already printed, log them separately as well.
1415 if (!exceptionMessageLumis_.empty()) {
1416 e.addAdditionalInfo(exceptionMessageLumis_);
1417 if (e.alreadyPrinted()) {
1418 LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1419 }
1420 }
1421 if (!exceptionMessageRuns_.empty()) {
1422 e.addAdditionalInfo(exceptionMessageRuns_);
1423 if (e.alreadyPrinted()) {
1424 LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1425 }
1426 }
1427 if (!exceptionMessageFiles_.empty()) {
1428 e.addAdditionalInfo(exceptionMessageFiles_);
1429 if (e.alreadyPrinted()) {
1430 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1431 }
1432 }
1433 throw;
1434 }
1435 
// Normal exit path: a machine that reports terminated() is destroyed here.
1436 if(machine->terminated()) {
1437 FDEBUG(1) << "The state machine reports it has been terminated\n";
1438 machine.reset();
1439 }
1440 
1441 if(stateMachineWasInErrorState_) {
1442 throw cms::Exception("BadState")
1443 << "The boost state machine in the EventProcessor exited after\n"
1444 << "entering the Error state.\n";
1445 }
1446 
1447 }
// Reaching this point with a live machine is treated as an error: the machine
// is terminated first.
1448 if(machine.get() != 0) {
1449 terminateMachine(machine);
// NOTE(review): listing line 1450 is missing here; the streamed text below is
// presumably the message of an exception being thrown -- confirm.
1451 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1452 << "Please report this error to the Framework group\n";
1453 }
1454 
1455 return returnCode;
1456 }
1457 
1458  void EventProcessor::readFile() {
1459  FDEBUG(1) << " \treadFile\n";
1460  size_t size = preg_->size();
1461  SendSourceTerminationSignalIfException sentry(actReg_.get());
1462 
1463  fb_ = input_->readFile();
1464  if(size < preg_->size()) {
1465  principalCache_.adjustIndexesAfterProductRegistryAddition();
1466  }
1467  principalCache_.adjustEventsToNewProductRegistry(preg_);
1468  if((numberOfForkedChildren_ > 0) or
1469  (preallocations_.numberOfStreams()>1 and
1470  preallocations_.numberOfThreads()>1)) {
1471  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1472  }
1473  sentry.completedSuccessfully();
1474  }
1475 
1476  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1477  if (fb_.get() != nullptr) {
1478  SendSourceTerminationSignalIfException sentry(actReg_.get());
1479  input_->closeFile(fb_.get(), cleaningUpAfterException);
1480  sentry.completedSuccessfully();
1481  }
1482  FDEBUG(1) << "\tcloseInputFile\n";
1483  }
1484 
1485  void EventProcessor::openOutputFiles() {
1486  if (fb_.get() != nullptr) {
1487  schedule_->openOutputFiles(*fb_);
1488  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1489  }
1490  FDEBUG(1) << "\topenOutputFiles\n";
1491  }
1492 
1493  void EventProcessor::closeOutputFiles() {
1494  if (fb_.get() != nullptr) {
1495  schedule_->closeOutputFiles();
1496  if(hasSubProcess()) subProcess_->closeOutputFiles();
1497  }
1498  FDEBUG(1) << "\tcloseOutputFiles\n";
1499  }
1500 
1501  void EventProcessor::respondToOpenInputFile() {
1502  if(hasSubProcess()) {
1503  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1504  }
1505  if (fb_.get() != nullptr) {
1506  schedule_->respondToOpenInputFile(*fb_);
1507  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1508  }
1509  FDEBUG(1) << "\trespondToOpenInputFile\n";
1510  }
1511 
1512  void EventProcessor::respondToCloseInputFile() {
1513  if (fb_.get() != nullptr) {
1514  schedule_->respondToCloseInputFile(*fb_);
1515  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1516  }
1517  FDEBUG(1) << "\trespondToCloseInputFile\n";
1518  }
1519 
1520  void EventProcessor::startingNewLoop() {
1521  shouldWeStop_ = false;
1522  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1523  // until after we've called beginOfJob
1524  if(looper_ && looperBeginJobRun_) {
1525  looper_->doStartingNewLoop();
1526  }
1527  FDEBUG(1) << "\tstartingNewLoop\n";
1528  }
1529 
1530  bool EventProcessor::endOfLoop() {
1531  if(looper_) {
1532  ModuleChanger changer(schedule_.get(),preg_.get());
1533  looper_->setModuleChanger(&changer);
1534  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1535  looper_->setModuleChanger(nullptr);
1536  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1537  else return false;
1538  }
1539  FDEBUG(1) << "\tendOfLoop\n";
1540  return true;
1541  }
1542 
1543  void EventProcessor::rewindInput() {
1544  input_->repeat();
1545  input_->rewind();
1546  FDEBUG(1) << "\trewind\n";
1547  }
1548 
1549  void EventProcessor::prepareForNextLoop() {
1550  looper_->prepareForNextLoop(esp_.get());
1551  FDEBUG(1) << "\tprepareForNextLoop\n";
1552  }
1553 
1554  bool EventProcessor::shouldWeCloseOutput() const {
1555  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1556  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1557  }
1558 
1559  void EventProcessor::doErrorStuff() {
1560  FDEBUG(1) << "\tdoErrorStuff\n";
1561  LogError("StateMachine")
1562  << "The EventProcessor state machine encountered an unexpected event\n"
1563  << "and went to the error state\n"
1564  << "Will attempt to terminate processing normally\n"
1565  << "(IF using the looper the next loop will be attempted)\n"
1566  << "This likely indicates a bug in an input module or corrupted input or both\n";
1567  stateMachineWasInErrorState_ = true;
1568  }
1569 
// Performs the begin-of-run transition for the run described by 'run'.
//
// Sequence: the cached RunPrincipal is looked up; the source's doBeginRun is
// called; the EventSetup is synchronized to (run, 0, 0) at the run's begin
// time (after an optional forced cache clear); the looper's beginOfJob and
// doStartingNewLoop are issued if they have not been run yet; the global
// begin-run transition runs on the schedule and sub-process; the looper gets
// doBeginRun; finally the per-stream begin-run transition runs for every
// stream.
//
// NOTE(review): listing lines 1597 and 1608 are absent from this copy; they
// presumably define the 'Traits' type used by processOneGlobal/processOneStream
// -- confirm against the original source.
1570 void EventProcessor::beginRun(statemachine::Run const& run) {
1571 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1572 {
// The sentry emits the source termination signal if doBeginRun throws.
1573 SendSourceTerminationSignalIfException sentry(actReg_.get());
1574 
1575 input_->doBeginRun(runPrincipal, &processContext_);
1576 sentry.completedSuccessfully();
1577 }
1578 
// Synchronization value for the start of the run: lumi 0, event 0.
1579 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1580 runPrincipal.beginTime());
1581 if(forceESCacheClearOnNewRun_){
1582 espController_->forceCacheClear();
1583 }
1584 {
1585 SendSourceTerminationSignalIfException sentry(actReg_.get());
1586 espController_->eventSetupForInstance(ts);
1587 sentry.completedSuccessfully();
1588 }
1589 EventSetup const& es = esp_->eventSetup();
// One-time looper start-up, deferred until an EventSetup is available here.
1590 if(looper_ && looperBeginJobRun_== false) {
1591 looper_->copyInfo(ScheduleInfo(schedule_.get()));
1592 looper_->beginOfJob(es);
1593 looperBeginJobRun_ = true;
1594 looper_->doStartingNewLoop();
1595 }
// Global begin-run transition.
1596 {
1598 schedule_->processOneGlobal<Traits>(runPrincipal, es);
1599 if(hasSubProcess()) {
1600 subProcess_->doBeginRun(runPrincipal, ts);
1601 }
1602 }
1603 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1604 if(looper_) {
1605 looper_->doBeginRun(runPrincipal, es, &processContext_);
1606 }
// Per-stream begin-run transition, one stream index at a time.
1607 {
1609 for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1610 schedule_->processOneStream<Traits>(i,runPrincipal, es);
1611 if(hasSubProcess()) {
1612 subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1613 }
1614 }
1615 }
1616 FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
// The looper currently receives no stream-level notification (call disabled).
1617 if(looper_) {
1618 //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1619 }
1620 }
1621 
// Performs the end-of-run transition for the run described by 'run'.
//
// Sequence: the cached RunPrincipal is looked up; the source's doEndRun is
// called; the EventSetup is synchronized to the run's end time using the
// maximum lumi and event numbers; the per-stream end-run transition runs for
// every stream; then the global end-run transition runs on the schedule and
// sub-process; finally the looper gets doEndRun.
// 'cleaningUpAfterException' is forwarded unchanged to the source, schedule
// and sub-process calls.
//
// NOTE(review): listing lines 1641 and 1653 are absent from this copy; they
// presumably define the 'Traits' type used by processOneStream/processOneGlobal
// -- confirm against the original source.
1622 void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1623 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1624 {
// The sentry emits the source termination signal if doEndRun throws.
1625 SendSourceTerminationSignalIfException sentry(actReg_.get());
1626 
1627 input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1628 sentry.completedSuccessfully();
1629 }
1630 
// Synchronization value for the end of the run: maximum lumi and event numbers.
1631 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1632 runPrincipal.endTime());
1633 {
1634 SendSourceTerminationSignalIfException sentry(actReg_.get());
1635 espController_->eventSetupForInstance(ts);
1636 sentry.completedSuccessfully();
1637 }
1638 EventSetup const& es = esp_->eventSetup();
// Per-stream end-run transition comes first, before the global one.
1639 {
1640 for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1642 schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1643 if(hasSubProcess()) {
1644 subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1645 }
1646 }
1647 }
1648 FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
// The looper currently receives no stream-level notification (call disabled).
1649 if(looper_) {
1650 //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1651 }
// Global end-run transition.
1652 {
1654 schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1655 if(hasSubProcess()) {
1656 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1657 }
1658 }
1659 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1660 if(looper_) {
1661 looper_->doEndRun(runPrincipal, es, &processContext_);
1662 }
1663 }
1664 
// Performs the begin-of-luminosity-block transition for (phid, run, lumi).
//
// Sequence: the cached LuminosityBlockPrincipal is looked up; the source's
// doBeginLumi is called; if the 'rng' service is available it is given a
// LuminosityBlock via preBeginLumi; the EventSetup is synchronized to
// (run, lumi, event 0) at the lumi's begin time; the global begin-lumi
// transition runs on the schedule and sub-process; the looper gets
// doBeginLuminosityBlock; finally the per-stream begin-lumi transition runs
// for every stream.
//
// NOTE(review): listing lines 1674, 1690 and 1702 are absent from this copy.
// 1674 presumably declares 'rng'; 1690 and 1702 presumably define the
// 'Traits' type -- confirm against the original source.
1665 void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1666 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1667 {
// The sentry emits the source termination signal if doBeginLumi throws.
1668 SendSourceTerminationSignalIfException sentry(actReg_.get());
1669 
1670 input_->doBeginLumi(lumiPrincipal, &processContext_);
1671 sentry.completedSuccessfully();
1672 }
1673 
1675 if(rng.isAvailable()) {
1676 LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1677 rng->preBeginLumi(lb);
1678 }
1679 
1680 // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1681 // lumi blocks know their start and end times why not also start and end events?
1682 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1683 {
1684 SendSourceTerminationSignalIfException sentry(actReg_.get());
1685 espController_->eventSetupForInstance(ts);
1686 sentry.completedSuccessfully();
1687 }
1688 EventSetup const& es = esp_->eventSetup();
// Global begin-lumi transition.
1689 {
1691 schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1692 if(hasSubProcess()) {
1693 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1694 }
1695 }
1696 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1697 if(looper_) {
1698 looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1699 }
// Per-stream begin-lumi transition, one stream index at a time.
1700 {
1701 for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1703 schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1704 if(hasSubProcess()) {
1705 subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1706 }
1707 }
1708 }
1709 FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
// The looper currently receives no stream-level notification (call disabled).
1710 if(looper_) {
1711 //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1712 }
1713 }
1714 
// Performs the end-of-luminosity-block transition for (phid, run, lumi).
//
// Sequence: the cached LuminosityBlockPrincipal is looked up; the source's
// doEndLumi is called; the EventSetup is synchronized to the lumi's end time
// using the maximum event number; the per-stream end-lumi transition runs for
// every stream; then the global end-lumi transition runs on the schedule and
// sub-process; finally the looper gets doEndLuminosityBlock.
// 'cleaningUpAfterException' is forwarded unchanged to the source, schedule
// and sub-process calls.
//
// NOTE(review): listing lines 1735 and 1747 are absent from this copy; they
// presumably define the 'Traits' type used by processOneStream/processOneGlobal
// -- confirm against the original source.
1715 void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1716 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1717 {
// The sentry emits the source termination signal if doEndLumi throws.
1718 SendSourceTerminationSignalIfException sentry(actReg_.get());
1719 
1720 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1721 sentry.completedSuccessfully();
1722 }
1723 //NOTE: Using the max event number for the end of a lumi block is a bad idea
1724 // lumi blocks know their start and end times why not also start and end events?
1725 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1726 lumiPrincipal.endTime());
1727 {
1728 SendSourceTerminationSignalIfException sentry(actReg_.get());
1729 espController_->eventSetupForInstance(ts);
1730 sentry.completedSuccessfully();
1731 }
1732 EventSetup const& es = esp_->eventSetup();
// Per-stream end-lumi transition comes first, before the global one.
1733 {
1734 for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1736 schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1737 if(hasSubProcess()) {
1738 subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1739 }
1740 }
1741 }
// NOTE(review): this trace follows the stream loop but prints "endLumi", the
// same text as the trace after the global transition below; the matching
// spots in endRun/beginLumi print "streamEndRun"/"streamBeginLumi". Possibly
// meant to be "streamEndLumi" -- confirm.
1742 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
// The looper currently receives no stream-level notification (call disabled).
1743 if(looper_) {
1744 //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1745 }
// Global end-lumi transition.
1746 {
1748 schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1749 if(hasSubProcess()) {
1750 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1751 }
1752 }
1753 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1754 if(looper_) {
1755 looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1756 }
1757 }
1758 
// Creates a RunPrincipal from the source's run auxiliary, lets the source
// fill it, inserts it into the principal cache and returns a statemachine::Run
// built from the principal's reduced process history ID and the source's run
// number.
//
// Having a run principal already in the cache on entry is treated as an error.
//
// NOTE(review): listing line 1761 is absent from this copy; the streamed text
// in the first 'if' is presumably the message of an exception being thrown
// -- confirm against the original source.
1759 statemachine::Run EventProcessor::readRun() {
1760 if (principalCache_.hasRunPrincipal()) {
1762 << "EventProcessor::readRun\n"
1763 << "Illegal attempt to insert run into cache\n"
1764 << "Contact a Framework Developer\n";
1765 }
1766 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1767 {
// The sentry emits the source termination signal if the source's readRun throws.
1768 SendSourceTerminationSignalIfException sentry(actReg_.get());
1769 input_->readRun(*rp, *historyAppender_);
1770 sentry.completedSuccessfully();
1771 }
// The source and the freshly read principal must agree on the reduced history ID.
1772 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1773 principalCache_.insert(rp);
1774 return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1775 }
1776 
1777  statemachine::Run EventProcessor::readAndMergeRun() {
1778  principalCache_.merge(input_->runAuxiliary(), preg_);
1779  auto runPrincipal =principalCache_.runPrincipalPtr();
1780  {
1781  SendSourceTerminationSignalIfException sentry(actReg_.get());
1782  input_->readAndMergeRun(*runPrincipal);
1783  sentry.completedSuccessfully();
1784  }
1785  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1786  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1787  }
1788 
// Creates a LuminosityBlockPrincipal from the source's lumi auxiliary, lets
// the source fill it, attaches the cached run principal, inserts it into the
// principal cache and returns the source's luminosity block number.
//
// Two preconditions are checked on entry: the cache must not already hold a
// lumi principal, and it must hold a run principal.
//
// NOTE(review): listing lines 1791 and 1797 are absent from this copy; the
// streamed text in each 'if' is presumably the message of an exception being
// thrown -- confirm against the original source.
// NOTE(review): both messages name "EventProcessor::readRun" although this is
// readLuminosityBlock -- looks like a copy-paste leftover; confirm.
1789 int EventProcessor::readLuminosityBlock() {
1790 if (principalCache_.hasLumiPrincipal()) {
1792 << "EventProcessor::readRun\n"
1793 << "Illegal attempt to insert lumi into cache\n"
1794 << "Contact a Framework Developer\n";
1795 }
1796 if (!principalCache_.hasRunPrincipal()) {
1798 << "EventProcessor::readRun\n"
1799 << "Illegal attempt to insert lumi into cache\n"
1800 << "Run is invalid\n"
1801 << "Contact a Framework Developer\n";
1802 }
1803 auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1804 {
// The sentry emits the source termination signal if the source's read throws.
1805 SendSourceTerminationSignalIfException sentry(actReg_.get());
1806 input_->readLuminosityBlock(*lbp, *historyAppender_);
1807 sentry.completedSuccessfully();
1808 }
// Link the new lumi principal to the run principal currently in the cache.
1809 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1810 principalCache_.insert(lbp);
1811 return input_->luminosityBlock();
1812 }
1813 
1814  int EventProcessor::readAndMergeLumi() {
1815  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1816  {
1817  SendSourceTerminationSignalIfException sentry(actReg_.get());
1818  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1819  sentry.completedSuccessfully();
1820  }
1821  return input_->luminosityBlock();
1822  }
1823 
1824  void EventProcessor::writeRun(statemachine::Run const& run) {
1825  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1826  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1827  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1828  }
1829 
1830  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1831  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1832  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1833  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1834  }
1835 
1836  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1837  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1838  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1839  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1840  }
1841 
1842  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1843  principalCache_.deleteLumi(phid, run, lumi);
1844  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1845  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1846  }
1847 
// TBB task that drives the event loop of one stream: execute() calls
// processEventsForStreamAsync() on m_proc for m_streamID and then decrements
// the reference count of m_waitTask.
// NOTE(review): this listing skips embedded line numbers 1850 and 1865, so the
// first line of the constructor (its name and first parameter, which
// initializes m_proc) and the declaration of m_proc are not visible here -
// restore them from the original file.
1848  class StreamProcessingTask : public tbb::task {
1849  public:
1851  unsigned int iStreamIndex,
1852  std::atomic<bool>* iFinishedProcessingEvents,
1853  tbb::task* iWaitTask):
1854  m_proc(iProc),
1855  m_streamID(iStreamIndex),
1856  m_finishedProcessingEvents(iFinishedProcessingEvents),
1857  m_waitTask(iWaitTask){}
1858 
// Runs this stream's event loop, then decrements m_waitTask's reference count.
// Always returns nullptr.
1859  tbb::task* execute() {
1860  m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1861  m_waitTask->decrement_ref_count();
1862  return nullptr;
1863  }
1864  private:
// Index of the stream this task processes.
1866  unsigned int m_streamID;
// Flag handed to processEventsForStreamAsync(); not dereferenced in this class.
1867  std::atomic<bool>* m_finishedProcessingEvents;
// Task whose reference count is decremented once execute() has finished.
1868  tbb::task* m_waitTask;
1869  };
1870 
// Event loop run for one stream. With the services of serviceToken_ made
// available, it repeatedly checks the stop conditions, then (while holding
// nextTransitionMutex_) asks the input source for the next item type and reads
// an event into this stream's principal, and finally processes that event.
// Exceptions thrown inside are caught here; the first one recorded across
// threads is stored in deferredExceptionPtr_.
//   iStreamIndex             index of the stream this call reads/processes
//   finishedProcessingEvents flag checked under the mutex; set to true here
//                            when the source's next item is not an event
// NOTE(review): this listing skips embedded line number 1877, so the
// declaration of `handler` used below is not visible here.
1871  void EventProcessor::processEventsForStreamAsync(unsigned int iStreamIndex,
1872  std::atomic<bool>* finishedProcessingEvents) {
1873  try {
1874  // make the services available
1875  ServiceRegistry::Operate operate(serviceToken_);
1876  if(preallocations_.numberOfStreams()>1) {
1878  handler->initializeThisThreadForUse();
1879  }
1880 
// Stream 0 processes an event before reading a new one; readAndProcessEvent()
// calls readEvent(0) before it starts the stream tasks.
1881  if(iStreamIndex==0) {
1882  processEvent(0);
1883  }
1884  do {
1885  if(shouldWeStop()) {
1886  break;
1887  }
1888  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1889  //another thread hit an exception
1890  //std::cerr<<"another thread saw an exception\n";
1891  break;
1892  }
1893  {
1894 
1895 
1896  {
1897  //nextItemType and readEvent need to be in same critical section
1898  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1899 
// Another stream already saw a non-event item from the source.
1900  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1901  //std::cerr<<"finishedProcessingEvents\n";
1902  break;
1903  }
1904 
1905  //If source and DelayedReader share a resource we must serialize them
1906  auto sr = input_->resourceSharedWithDelayedReader();
1907  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1908  if(sr) {
1909  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1910  }
1911  InputSource::ItemType itemType = input_->nextItemType();
// Anything other than an event ends event processing: remember the item type
// and publish the "finished" flag for the other streams.
1912  if (InputSource::IsEvent !=itemType) {
1913  nextItemTypeFromProcessingEvents_ = itemType;
1914  finishedProcessingEvents->store(true,std::memory_order_release);
1915  //std::cerr<<"next item type "<<itemType<<"\n";
1916  break;
1917  }
// Record whether an asynchronous stop was requested; if so, signal early
// source termination and leave the loop without reading.
1918  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1919  //std::cerr<<"task told to async stop\n";
1920  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1921  break;
1922  }
1923  readEvent(iStreamIndex);
1924  }
1925  }
// Re-check after the read: if another thread failed meanwhile, signal early
// termination and skip processing the event that was just read.
1926  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1927  //another thread hit an exception
1928  //std::cerr<<"another thread saw an exception\n";
1929  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1930 
1931  break;
1932  }
1933  processEvent(iStreamIndex);
1934  }while(true);
1935  } catch (...) {
// Only the thread that flips the flag from false to true stores its exception.
1936  bool expected =false;
1937  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1938  deferredExceptionPtr_ = std::current_exception();
1939  }
1940  //std::cerr<<"task caught exception\n";
1941  }
1942  }
1943 
1944  void EventProcessor::readAndProcessEvent() {
1945  if(numberOfForkedChildren_>0) {
1946  readEvent(0);
1947  processEvent(0);
1948  return;
1949  }
1950  nextItemTypeFromProcessingEvents_ = InputSource::IsEvent; //needed for looper
1951  asyncStopRequestedWhileProcessingEvents_ = false;
1952 
1953  std::atomic<bool> finishedProcessingEvents{false};
1954 
1955  //Task assumes Stream 0 has already read the event that caused us to go here
1956  readEvent(0);
1957 
1958  //To wait, the ref count has to b 1+#streams
1959  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1960  eventLoopWaitTask->increment_ref_count();
1961 
1962  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1963  unsigned int iStreamIndex = 0;
1964  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1965  eventLoopWaitTask->increment_ref_count();
1966  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1967 
1968  }
1969  eventLoopWaitTask->increment_ref_count();
1970  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1971  tbb::task::destroy(*eventLoopWaitTask);
1972 
1973  //One of the processing threads saw an exception
1974  if(deferredExceptionPtrIsSet_) {
1975  std::rethrow_exception(deferredExceptionPtr_);
1976  }
1977  }
1978  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1979  //TODO this will have to become per stream
1980  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1981  StreamContext streamContext(event.streamID(), &processContext_);
1982 
1983  SendSourceTerminationSignalIfException sentry(actReg_.get());
1984  input_->readEvent(event, streamContext);
1985  sentry.completedSuccessfully();
1986 
1987  FDEBUG(1) << "\treadEvent\n";
1988  }
// Processes the event held in the EventPrincipal of stream iStreamIndex:
// attaches the cached lumi principal, runs the schedule (and the SubProcess
// when one exists), drives the looper when one is set, and finally clears the
// event principal.
// NOTE(review): this listing skips embedded line numbers 1992 and 2006, so the
// declarations of `rng` and `Traits` used below are not visible here.
1989  void EventProcessor::processEvent(unsigned int iStreamIndex) {
1990  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1991  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
// When the service behind `rng` is available, tell it an event has been read.
1993  if(rng.isAvailable()) {
1994  Event ev(*pep, ModuleDescription(), nullptr);
1995  rng->postEventRead(ev);
1996  }
// Sanity checks: the event's run and lumi must match the cached lumi principal.
1997  assert(pep->luminosityBlockPrincipalPtrValid());
1998  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1999  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2000 
2001  //We can only update IOVs on Lumi boundaries
2002  //IOVSyncValue ts(pep->id(), pep->time());
2003  //espController_->eventSetupForInstance(ts);
2004  EventSetup const& es = esp_->eventSetup();
2005  {
2007  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2008  if(hasSubProcess()) {
2009  subProcess_->doEvent(*pep);
2010  }
2011  }
2012 
2013  //NOTE: If we have a looper we only have one Stream
2014  if(looper_) {
2015  bool randomAccess = input_->randomAccess();
2016  ProcessingController::ForwardState forwardState = input_->forwardState();
2017  ProcessingController::ReverseState reverseState = input_->reverseState();
2018  ProcessingController pc(forwardState, reverseState, randomAccess);
2019 
2020  EDLooperBase::Status status = EDLooperBase::kContinue;
// Call the looper again until the transition it requested has succeeded.
2021  do {
2022 
2023  StreamContext streamContext(pep->streamID(), &processContext_);
2024  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2025 
2026  bool succeeded = true;
2027  if(randomAccess) {
2028  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
// NOTE(review): presumably -2 because the source already sits one event past
// the current one - confirm against InputSource::skipEvents.
2029  input_->skipEvents(-2);
2030  }
2031  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2032  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2033  }
2034  }
2035  pc.setLastOperationSucceeded(succeeded);
2036  } while(!pc.lastOperationSucceeded());
// Any looper status other than kContinue requests that processing stop.
2037  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2038 
2039  }
2040 
2041  FDEBUG(1) << "\tprocessEvent\n";
2042  pep->clearEventPrincipal();
2043  }
2044 
2045  bool EventProcessor::shouldWeStop() const {
2046  FDEBUG(1) << "\tshouldWeStop\n";
2047  if(shouldWeStop_) return true;
2048  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2049  }
2050 
2051  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2052  exceptionMessageFiles_ = message;
2053  }
2054 
2055  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2056  exceptionMessageRuns_ = message;
2057  }
2058 
2059  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2060  exceptionMessageLumis_ = message;
2061  }
2062 
2063  bool EventProcessor::alreadyHandlingException() const {
2064  return alreadyHandlingException_;
2065  }
2066 
2067  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2068  if(iMachine.get() != 0) {
2069  if(!iMachine->terminated()) {
2070  forceLooperToEnd_ = true;
2071  iMachine->process_event(statemachine::Stop());
2072  forceLooperToEnd_ = false;
2073  }
2074  else {
2075  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2076  }
2077  if(iMachine->terminated()) {
2078  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2079  }
2080  iMachine.reset();
2081  }
2082  }
2083 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:55
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:58
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:253
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:546
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
Timestamp const & endTime() const
processConfiguration
Definition: Schedule.cc:368
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
std::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:59
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
static std::string const input
Definition: EdmProvDump.cc:44
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
static InputSourceFactory const * get()
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::shared_ptr< edm::ParameterSet > parameterSet()
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:42
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:56
virtual void endOfJob()
Definition: EDLooperBase.cc:90
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
areg
Definition: Schedule.cc:368
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void addContext(std::string const &context)
Definition: Exception.cc:227
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:57
ServiceToken getToken()
edm::RunNumber_t runNumber() const
Definition: EPStates.h:49
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:48
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
Definition: EventRange.h:32
#define O_NONBLOCK
Definition: SysFile.h:21
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
tuple status
Definition: ntuplemaker.py:245
preg
Definition: Schedule.cc:368
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:14
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31