CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
9 
34 
36 
44 
49 
59 
60 #include "MessageForSource.h"
61 #include "MessageForParent.h"
62 
63 #include "boost/thread/xtime.hpp"
64 
65 #include <exception>
66 #include <iomanip>
67 #include <iostream>
68 #include <utility>
69 #include <sstream>
70 
71 #include <sys/ipc.h>
72 #include <sys/msg.h>
73 
74 #include "tbb/task.h"
75 
76 //Used for forking
77 #include <sys/types.h>
78 #include <sys/wait.h>
79 #include <sys/socket.h>
80 #include <sys/select.h>
81 #include <sys/fcntl.h>
82 #include <unistd.h>
83 
84 //Used for CPU affinity
85 #ifndef __APPLE__
86 #include <sched.h>
87 #endif
88 
89 namespace {
90  //Sentry class to only send a signal if an
91  // exception occurs. An exception is identified
92  // by the destructor being called without first
93  // calling completedSuccessfully().
94  class SendSourceTerminationSignalIfException {
95  public:
96  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
97  reg_(iReg) {}
98  ~SendSourceTerminationSignalIfException() {
99  if(reg_) {
100  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
101  }
102  }
103  void completedSuccessfully() {
104  reg_ = nullptr;
105  }
106  private:
107  edm::ActivityRegistry* reg_;
108 
109  };
110 }
111 
112 namespace edm {
113 
114  // ---------------------------------------------------------------
115  std::unique_ptr<InputSource>
117  CommonParams const& common,
119  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
120  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
121  std::shared_ptr<ActivityRegistry> areg,
122  std::shared_ptr<ProcessConfiguration const> processConfiguration,
123  PreallocationConfiguration const& allocations) {
124  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
125  if(main_input == 0) {
127  << "There must be exactly one source in the configuration.\n"
128  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
129  }
130 
131  std::string modtype(main_input->getParameter<std::string>("@module_type"));
132 
133  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
135  ConfigurationDescriptions descriptions(filler->baseType());
136  filler->fill(descriptions);
137 
138  try {
139  convertException::wrap([&]() {
140  descriptions.validate(*main_input, std::string("source"));
141  });
142  }
143  catch (cms::Exception & iException) {
144  std::ostringstream ost;
145  ost << "Validating configuration of input source of type " << modtype;
146  iException.addContext(ost.str());
147  throw;
148  }
149 
150  main_input->registerIt();
151 
152  // Fill in "ModuleDescription", in case the input source produces
153  // any EDProducts, which would be registered in the ProductRegistry.
154  // Also fill in the process history item for this process.
155  // There is no module label for the unnamed input source, so
156  // just use "source".
157  // Only the tracked parameters belong in the process configuration.
158  ModuleDescription md(main_input->id(),
159  main_input->getParameter<std::string>("@module_type"),
160  "source",
161  processConfiguration.get(),
163 
164  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
165  common.maxEventsInput_, common.maxLumisInput_,
166  common.maxSecondsUntilRampdown_, allocations);
167 
168  areg->preSourceConstructionSignal_(md);
169  std::unique_ptr<InputSource> input;
170  try {
171  //even if we have an exception, send the signal
172  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
173  convertException::wrap([&]() {
174  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
175  });
176  }
177  catch (cms::Exception& iException) {
178  std::ostringstream ost;
179  ost << "Constructing input source of type " << modtype;
180  iException.addContext(ost.str());
181  throw;
182  }
183  return input;
184  }
185 
186  // ---------------------------------------------------------------
187  boost::shared_ptr<EDLooperBase>
190  ParameterSet& params) {
191  boost::shared_ptr<EDLooperBase> vLooper;
192 
193  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
194 
195  if(loopers.size() == 0) {
196  return vLooper;
197  }
198 
199  assert(1 == loopers.size());
200 
201  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
202  itName != itNameEnd;
203  ++itName) {
204 
205  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
206  providerPSet->registerIt();
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
208  cp,
209  *providerPSet);
210  }
211  return vLooper;
212 
213  }
214 
215  // ---------------------------------------------------------------
217  ServiceToken const& iToken,
219  std::vector<std::string> const& defaultServices,
220  std::vector<std::string> const& forcedServices) :
221  actReg_(),
222  preg_(),
223  branchIDListHelper_(),
224  serviceToken_(),
225  input_(),
226  espController_(new eventsetup::EventSetupsController),
227  esp_(),
228  act_table_(),
229  processConfiguration_(),
230  schedule_(),
231  subProcess_(),
232  historyAppender_(new HistoryAppender),
233  fb_(),
234  looper_(),
235  deferredExceptionPtrIsSet_(false),
236  principalCache_(),
237  beginJobCalled_(false),
238  shouldWeStop_(false),
239  stateMachineWasInErrorState_(false),
240  fileMode_(),
241  emptyRunLumiMode_(),
242  exceptionMessageFiles_(),
243  exceptionMessageRuns_(),
244  exceptionMessageLumis_(),
245  alreadyHandlingException_(false),
246  forceLooperToEnd_(false),
247  looperBeginJobRun_(false),
248  forceESCacheClearOnNewRun_(false),
249  numberOfForkedChildren_(0),
250  numberOfSequentialEventsPerChild_(1),
251  setCpuAffinity_(false),
252  eventSetupDataToExcludeFromPrefetching_() {
253  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
254  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
255  processDesc->addServices(defaultServices, forcedServices);
256  init(processDesc, iToken, iLegacy);
257  }
258 
260  std::vector<std::string> const& defaultServices,
261  std::vector<std::string> const& forcedServices) :
262  actReg_(),
263  preg_(),
264  branchIDListHelper_(),
265  serviceToken_(),
266  input_(),
267  espController_(new eventsetup::EventSetupsController),
268  esp_(),
269  act_table_(),
270  processConfiguration_(),
271  schedule_(),
272  subProcess_(),
273  historyAppender_(new HistoryAppender),
274  fb_(),
275  looper_(),
276  deferredExceptionPtrIsSet_(false),
277  principalCache_(),
278  beginJobCalled_(false),
279  shouldWeStop_(false),
280  stateMachineWasInErrorState_(false),
281  fileMode_(),
282  emptyRunLumiMode_(),
283  exceptionMessageFiles_(),
284  exceptionMessageRuns_(),
285  exceptionMessageLumis_(),
286  alreadyHandlingException_(false),
287  forceLooperToEnd_(false),
288  looperBeginJobRun_(false),
289  forceESCacheClearOnNewRun_(false),
290  numberOfForkedChildren_(0),
291  numberOfSequentialEventsPerChild_(1),
292  setCpuAffinity_(false),
293  asyncStopRequestedWhileProcessingEvents_(false),
294  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
295  eventSetupDataToExcludeFromPrefetching_()
296  {
297  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
298  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
299  processDesc->addServices(defaultServices, forcedServices);
301  }
302 
303  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
304  ServiceToken const& token,
306  actReg_(),
307  preg_(),
308  branchIDListHelper_(),
309  serviceToken_(),
310  input_(),
311  espController_(new eventsetup::EventSetupsController),
312  esp_(),
313  act_table_(),
314  processConfiguration_(),
315  schedule_(),
316  subProcess_(),
317  historyAppender_(new HistoryAppender),
318  fb_(),
319  looper_(),
320  deferredExceptionPtrIsSet_(false),
321  principalCache_(),
322  beginJobCalled_(false),
323  shouldWeStop_(false),
324  stateMachineWasInErrorState_(false),
325  fileMode_(),
326  emptyRunLumiMode_(),
327  exceptionMessageFiles_(),
328  exceptionMessageRuns_(),
329  exceptionMessageLumis_(),
330  alreadyHandlingException_(false),
331  forceLooperToEnd_(false),
332  looperBeginJobRun_(false),
333  forceESCacheClearOnNewRun_(false),
334  numberOfForkedChildren_(0),
335  numberOfSequentialEventsPerChild_(1),
336  setCpuAffinity_(false),
337  asyncStopRequestedWhileProcessingEvents_(false),
338  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
339  eventSetupDataToExcludeFromPrefetching_()
340  {
341  init(processDesc, token, legacy);
342  }
343 
344 
346  actReg_(),
347  preg_(),
348  branchIDListHelper_(),
349  serviceToken_(),
350  input_(),
351  espController_(new eventsetup::EventSetupsController),
352  esp_(),
353  act_table_(),
354  processConfiguration_(),
355  schedule_(),
356  subProcess_(),
357  historyAppender_(new HistoryAppender),
358  fb_(),
359  looper_(),
360  deferredExceptionPtrIsSet_(false),
361  principalCache_(),
362  beginJobCalled_(false),
363  shouldWeStop_(false),
364  stateMachineWasInErrorState_(false),
365  fileMode_(),
366  emptyRunLumiMode_(),
367  exceptionMessageFiles_(),
368  exceptionMessageRuns_(),
369  exceptionMessageLumis_(),
370  alreadyHandlingException_(false),
371  forceLooperToEnd_(false),
372  looperBeginJobRun_(false),
373  forceESCacheClearOnNewRun_(false),
374  numberOfForkedChildren_(0),
375  numberOfSequentialEventsPerChild_(1),
376  setCpuAffinity_(false),
377  asyncStopRequestedWhileProcessingEvents_(false),
378  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
379  eventSetupDataToExcludeFromPrefetching_()
380 {
381  if(isPython) {
382  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
383  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
385  }
386  else {
387  auto processDesc = std::make_shared<ProcessDesc>(config);
389  }
390  }
391 
392  void
393  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
394  ServiceToken const& iToken,
396 
397  //std::cerr << processDesc->dump() << std::endl;
398 
399  // register the empty parentage vector , once and for all
401 
402  // register the empty parameter set, once and for all.
404 
405  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
406  //std::cerr << parameterSet->dump() << std::endl;
407 
408  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
409  std::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
410 
411  // Now set some parameters specific to the main process.
412  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
413  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
414  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
415  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
416  //threading
417  unsigned int nThreads=1;
418  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
419  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
420  if(nThreads == 0) {
421  nThreads = 1;
422  }
423  }
424  /* TODO: when we support having each stream run in a different thread use this default
425  unsigned int nStreams =nThreads;
426  */
427  unsigned int nStreams =1;
428  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
429  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
430  if(nStreams==0) {
431  nStreams = nThreads;
432  }
433  }
434  /*
435  bool nRunsSet = false;
436  */
437  unsigned int nConcurrentRuns =1;
438  /*
439  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
440  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
441  }
442  */
443  unsigned int nConcurrentLumis =1;
444  /*
445  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
446  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
447  } else {
448  nConcurrentLumis = nConcurrentRuns;
449  }
450  */
451  //Check that relationships between threading parameters makes sense
452  /*
453  if(nThreads<nStreams) {
454  //bad
455  }
456  if(nConcurrentRuns>nStreams) {
457  //bad
458  }
459  if(nConcurrentRuns>nConcurrentLumis) {
460  //bad
461  }
462  */
463  //forking
464  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
465  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
466  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
467  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
468  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
469  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
470  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
471  itPS != itPSEnd;
472  ++itPS) {
473  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
474  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
475  itPS->getUntrackedParameter<std::string>("label", "")));
476  }
477  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
478 
479  // Now do general initialization
480  ScheduleItems items;
481 
482  //initialize the services
483  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
484  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
485  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
486 
487  //make the services available
489 
490  if(nStreams>1) {
492  handler->willBeUsingThreads();
493  }
494 
495  // intialize miscellaneous items
496  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
497 
498  // intialize the event setup provider
499  esp_ = espController_->makeProvider(*parameterSet);
500 
501  // initialize the looper, if any
502  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
503  if(looper_) {
504  looper_->setActionTable(items.act_table_.get());
505  looper_->attachTo(*items.actReg_);
506 
507  //For now loopers make us run only 1 transition at a time
508  nStreams=1;
509  nConcurrentLumis=1;
510  nConcurrentRuns=1;
511  }
512 
513  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
514 
515  // initialize the input source
516  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.thinnedAssociationsHelper_, items.actReg_, items.processConfiguration_, preallocations_);
517 
518  // intialize the Schedule
519  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
520 
521  // set the data members
522  act_table_ = std::move(items.act_table_);
523  actReg_ = items.actReg_;
524  preg_.reset(items.preg_.release());
529  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
530 
531  FDEBUG(2) << parameterSet << std::endl;
532 
534  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
535  // Reusable event principal
536  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index);
537  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
538  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
540  }
541  // initialize the subprocess, if there is one
542  if(subProcessParameterSet) {
543  subProcess_.reset(new SubProcess(*subProcessParameterSet,
544  *parameterSet,
545  preg_,
549  *actReg_,
550  token,
553  &processContext_));
554  }
555  }
556 
558  // Make the services available while everything is being deleted.
559  ServiceToken token = getToken();
560  ServiceRegistry::Operate op(token);
561 
562  // manually destroy all these thing that may need the services around
563  espController_.reset();
564  subProcess_.reset();
565  esp_.reset();
566  schedule_.reset();
567  input_.reset();
568  looper_.reset();
569  actReg_.reset();
570 
573  }
574 
575  void
577  if(beginJobCalled_) return;
578  beginJobCalled_=true;
579  bk::beginJob();
580 
581  // StateSentry toerror(this); // should we add this ?
582  //make the services available
584 
589  actReg_->preallocateSignal_(bounds);
591  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
592 
593  //NOTE: This implementation assumes 'Job' means one call
594  // the EventProcessor::run
595  // If it really means once per 'application' then this code will
596  // have to be changed.
597  // Also have to deal with case where have 'run' then new Module
598  // added and do 'run'
599  // again. In that case the newly added Module needs its 'beginJob'
600  // to be called.
601 
602  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
603  // For now we delay calling beginOfJob until first beginOfRun
604  //if(looper_) {
605  // looper_->beginOfJob(es);
606  //}
607  try {
608  convertException::wrap([&]() {
609  input_->doBeginJob();
610  });
611  }
612  catch(cms::Exception& ex) {
613  ex.addContext("Calling beginJob for the source");
614  throw;
615  }
616  schedule_->beginJob(*preg_);
617  // toerror.succeeded(); // should we add this?
618  if(hasSubProcess()) subProcess_->doBeginJob();
619  actReg_->postBeginJobSignal_();
620 
621  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
622  schedule_->beginStream(i);
623  if(hasSubProcess()) subProcess_->doBeginStream(i);
624  }
625  }
626 
627  void
629  // Collects exceptions, so we don't throw before all operations are performed.
630  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
631 
632  //make the services available
634 
635  //NOTE: this really should go elsewhere in the future
636  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
637  c.call([this,i](){this->schedule_->endStream(i);});
638  if(hasSubProcess()) {
639  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
640  }
641  }
642  auto actReg = actReg_.get();
643  c.call([actReg](){actReg->preEndJobSignal_();});
644  schedule_->endJob(c);
645  if(hasSubProcess()) {
646  c.call(std::bind(&SubProcess::doEndJob, subProcess_.get()));
647  }
648  c.call(std::bind(&InputSource::doEndJob, input_.get()));
649  if(looper_) {
650  c.call(std::bind(&EDLooperBase::endOfJob, looper_));
651  }
652  c.call([actReg](){actReg->postEndJobSignal_();});
653  if(c.hasThrown()) {
654  c.rethrow();
655  }
656  }
657 
660  return serviceToken_;
661  }
662 
663  //Setup signal handler to listen for when forked children stop
664  namespace {
665  //These are volatile since the compiler can not be allowed to optimize them
666  // since they can be modified in the signaller handler
667  volatile bool child_failed = false;
668  volatile unsigned int num_children_done = 0;
669  volatile int child_fail_exit_status = 0;
670  volatile int child_fail_signal = 0;
671 
672  //NOTE: We setup the signal handler to run in the main thread which
673  // is also the same thread that then reads the above values
674 
675  extern "C" {
676  void ep_sigchld(int, siginfo_t*, void*) {
677  //printf("in sigchld\n");
678  //FDEBUG(1) << "in sigchld handler\n";
679  int stat_loc;
680  pid_t p = waitpid(-1, &stat_loc, WNOHANG);
681  while(0<p) {
682  //printf(" looping\n");
683  if(WIFEXITED(stat_loc)) {
684  ++num_children_done;
685  if(0 != WEXITSTATUS(stat_loc)) {
686  child_fail_exit_status = WEXITSTATUS(stat_loc);
687  child_failed = true;
688  }
689  }
690  if(WIFSIGNALED(stat_loc)) {
691  ++num_children_done;
692  child_fail_signal = WTERMSIG(stat_loc);
693  child_failed = true;
694  }
695  p = waitpid(-1, &stat_loc, WNOHANG);
696  }
697  }
698  }
699 
700  }
701 
702  enum {
707  };
708 
709  namespace {
710  unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
711  unsigned int n = 0;
712  while(numberOfChildren != 0) {
713  ++n;
714  numberOfChildren /= 10;
715  }
716  if(n == 0) {
717  n = 3; // Protect against zero numberOfChildren
718  }
719  return n;
720  }
721 
722  /*This class embodied the thread which is used to listen to the forked children and
723  then tell them which events they should process */
724  class MessageSenderToSource {
725  public:
726  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
727  void operator()();
728 
729  private:
730  const std::vector<int>& m_childrenPipes;
731  long const m_nEventsToProcess;
732  fd_set m_socketSet;
733  unsigned int m_aliveChildren;
734  int m_maxFd;
735  };
736 
737  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
738  std::vector<int> const& childrenPipes,
739  long iNEventsToProcess):
740  m_childrenPipes(childrenPipes),
741  m_nEventsToProcess(iNEventsToProcess),
742  m_aliveChildren(childrenSockets.size()),
743  m_maxFd(0)
744  {
745  FD_ZERO(&m_socketSet);
746  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
747  it != itEnd; it++) {
748  FD_SET(*it, &m_socketSet);
749  if (*it > m_maxFd) {
750  m_maxFd = *it;
751  }
752  }
753  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
754  it != itEnd; ++it) {
755  FD_SET(*it, &m_socketSet);
756  if (*it > m_maxFd) {
757  m_maxFd = *it;
758  }
759  }
760  m_maxFd++; // select reads [0,m_maxFd).
761  }
762 
763  /* This function is the heart of the communication between parent and child.
764  * When ready for more data, the child (see MessageReceiverForSource) requests
765  * data through a AF_UNIX socket message. The parent will then assign the next
766  * chunk of data by sending a message back.
767  *
768  * Additionally, this function also monitors the read-side of the pipe fd from the child.
769  * If the child dies unexpectedly, the pipe will be selected as ready for read and
770  * will return EPIPE when read from. Further, if the child thinks the parent has died
771  * (defined as waiting more than 1s for a response), it will write a single byte to
772  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
773  * If still alive, the parent will read the byte and ignore it.
774  *
775  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
776  * handler can distinguish between success and failure cases.
777  */
778 
779  void
780  MessageSenderToSource::operator()() {
781  multicore::MessageForParent childMsg;
782  LogInfo("ForkingController") << "I am controller";
783  //this is the master and therefore the controller
784 
785  multicore::MessageForSource sndmsg;
786  sndmsg.startIndex = 0;
787  sndmsg.nIndices = m_nEventsToProcess;
788  do {
789 
790  fd_set readSockets, errorSockets;
791  // Wait for a request from a child for events.
792  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
793  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
794  // Note that we don't timeout; may be reconsidered in the future.
795  ssize_t rc;
796  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
797  if (rc < 0) {
798  std::cerr << "select failed; should be impossible due to preconditions.\n";
799  abort();
800  break;
801  }
802 
803  // Read the message from the child.
804  for (int idx=0; idx<m_maxFd; idx++) {
805 
806  // Handle errors
807  if (FD_ISSET(idx, &errorSockets)) {
808  LogInfo("ForkingController") << "Error on socket " << idx;
809  FD_CLR(idx, &m_socketSet);
810  close(idx);
811  // See if it was the watchdog pipe that died.
812  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
813  if (*it == idx) {
814  m_aliveChildren--;
815  }
816  }
817  continue;
818  }
819 
820  if (!FD_ISSET(idx, &readSockets)) {
821  continue;
822  }
823 
824  // See if this FD is a child watchdog pipe. If so, read from it to prevent
825  // writes from blocking.
826  bool is_pipe = false;
827  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
828  if (*it == idx) {
829  is_pipe = true;
830  char buf;
831  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
832  if (rc <= 0) {
833  m_aliveChildren--;
834  FD_CLR(idx, &m_socketSet);
835  close(idx);
836  }
837  }
838  }
839 
840  // Only execute this block if the FD is a socket for sending the child work.
841  if (!is_pipe) {
842  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
843  if (rc < 0) {
844  FD_CLR(idx, &m_socketSet);
845  close(idx);
846  continue;
847  }
848 
849  // Tell the child what events to process.
850  // If 'send' fails, then the child process has failed (any other possibilities are
851  // eliminated because we are using fixed-size messages with Unix datagram sockets).
852  // Thus, the SIGCHLD handler will fire and set child_fail = true.
853  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
854  if (rc < 0) {
855  FD_CLR(idx, &m_socketSet);
856  close(idx);
857  continue;
858  }
859  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
860  sndmsg.startIndex += sndmsg.nIndices;
861  }
862  }
863 
864  } while (m_aliveChildren > 0);
865 
866  return;
867  }
868 
869  }
870 
871 
872  void EventProcessor::possiblyContinueAfterForkChildFailure() {
873  if(child_failed && continueAfterChildFailure_) {
874  if (child_fail_signal) {
875  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
876  child_fail_signal=0;
877  } else if (child_fail_exit_status) {
878  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
879  child_fail_exit_status=0;
880  } else {
881  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
882  }
883  child_failed =false;
884  }
885  }
886 
887  bool
888  EventProcessor::forkProcess(std::string const& jobReportFile) {
889 
890  if(0 == numberOfForkedChildren_) {return true;}
891  assert(0<numberOfForkedChildren_);
892  //do what we want done in common
893  {
894  beginJob(); //make sure this was run
895  // make the services available
896  ServiceRegistry::Operate operate(serviceToken_);
897 
898  InputSource::ItemType itemType;
899  itemType = input_->nextItemType();
900 
901  assert(itemType == InputSource::IsFile);
902  {
903  readFile();
904  }
905  itemType = input_->nextItemType();
906  assert(itemType == InputSource::IsRun);
907 
908  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
909  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
910  input_->runAuxiliary()->beginTime());
911  espController_->eventSetupForInstance(ts);
912  EventSetup const& es = esp_->eventSetup();
913 
914  //now get all the data available in the EventSetup
915  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
916  es.fillAvailableRecordKeys(recordKeys);
917  std::vector<eventsetup::DataKey> dataKeys;
918  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
919  itKey != itEnd;
920  ++itKey) {
921  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
922  //see if this is on our exclusion list
923  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
924  ExcludedData const* excludedData(nullptr);
925  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
926  excludedData = &(itExcludeRec->second);
927  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
928  //skip all items in this record
929  continue;
930  }
931  }
932  if(0 != recordPtr) {
933  dataKeys.clear();
934  recordPtr->fillRegisteredDataKeys(dataKeys);
935  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
936  itDataKey != itDataKeyEnd;
937  ++itDataKey) {
938  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
939  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
940  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
941  continue;
942  }
943  try {
944  recordPtr->doGet(*itDataKey);
945  } catch(cms::Exception& e) {
946  LogWarning("ForkingEventSetupPreFetching") << e.what();
947  }
948  }
949  }
950  }
951  }
952  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
953  {
954  // make the services available
955  ServiceRegistry::Operate operate(serviceToken_);
956  Service<JobReport> jobReport;
957  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
958 
959  //Now actually do the forking
960  actReg_->preForkReleaseResourcesSignal_();
961  input_->doPreForkReleaseResources();
962  schedule_->preForkReleaseResources();
963  }
964  installCustomHandler(SIGCHLD, ep_sigchld);
965 
966 
967  unsigned int childIndex = 0;
968  unsigned int const kMaxChildren = numberOfForkedChildren_;
969  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
970  std::vector<pid_t> childrenIds;
971  childrenIds.reserve(kMaxChildren);
972  std::vector<int> childrenSockets;
973  childrenSockets.reserve(kMaxChildren);
974  std::vector<int> childrenPipes;
975  childrenPipes.reserve(kMaxChildren);
976  std::vector<int> childrenSocketsCopy;
977  childrenSocketsCopy.reserve(kMaxChildren);
978  std::vector<int> childrenPipesCopy;
979  childrenPipesCopy.reserve(kMaxChildren);
980  int pipes[] {0, 0};
981 
982  {
983  // make the services available
984  ServiceRegistry::Operate operate(serviceToken_);
985  Service<JobReport> jobReport;
986  int sockets[2], fd_flags;
987  for(; childIndex < kMaxChildren; ++childIndex) {
988  // Create a UNIX_DGRAM socket pair
989  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
990  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
991  exit(EXIT_FAILURE);
992  }
993  if (pipe(pipes)) {
994  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
995  exit(EXIT_FAILURE);
996  }
997  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
998  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
999  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1000  exit(EXIT_FAILURE);
1001  }
1002  // Mark socket as non-block. Child must be careful to do select prior
1003  // to reading from socket.
1004  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1005  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1006  exit(EXIT_FAILURE);
1007  }
1008  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1009  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1010  exit(EXIT_FAILURE);
1011  }
1012  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1013  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1014  exit(EXIT_FAILURE);
1015  }
1016  // Linux man page notes there are some edge cases where reading from a
1017  // fd can block, even after a select.
1018  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1019  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1023  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026 
1027  childrenPipesCopy = childrenPipes;
1028  childrenSocketsCopy = childrenSockets;
1029 
1030  pid_t value = fork();
1031  if(value == 0) {
1032  // Close the parent's side of the socket and pipe which will talk to us.
1033  close(pipes[0]);
1034  close(sockets[0]);
1035  // Close our copies of the parent's other communication pipes.
1036  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1037  close(*it);
1038  }
1039  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1040  close(*it);
1041  }
1042 
1043  // this is the child process, redirect stdout and stderr to a log file
1044  fflush(stdout);
1045  fflush(stderr);
1046  std::stringstream stout;
1047  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1048  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1049  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1050  }
1051  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1052  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1053  }
1054 
1055  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1056  if(setCpuAffinity_) {
1057  // CPU affinity is handled differently on macosx.
1058  // We disable it and print a message until someone reads:
1059  //
1060  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1061  //
1062  // and implements it.
1063 #ifdef __APPLE__
1064  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1065 #else
1066  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1067  cpu_set_t mask;
1068  CPU_ZERO(&mask);
1069  CPU_SET(childIndex, &mask);
1070  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1071  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1072  exit(-1);
1073  }
1074 #endif
1075  }
1076  break;
1077  } else {
1078  //this is the parent
1079  close(pipes[1]);
1080  close(sockets[1]);
1081  }
1082  if(value < 0) {
1083  LogError("ForkingChild") << "failed to create a child";
1084  exit(-1);
1085  }
1086  childrenIds.push_back(value);
1087  childrenSockets.push_back(sockets[0]);
1088  childrenPipes.push_back(pipes[0]);
1089  }
1090 
1091  if(childIndex < kMaxChildren) {
1092  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1093  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1094 
1095  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1096  input_->doPostForkReacquireResources(receiver);
1097  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1098  //NOTE: sources have to reset themselves by listening to the post fork message
1099  //rewindInput();
1100  return true;
1101  }
1102  jobReport->parentAfterFork(jobReportFile);
1103  }
1104 
1105  //this is the original, which is now the master for all the children
1106 
1107  //Need to wait for signals from the children or externally
1108  // To wait we must
1109  // 1) block the signals we want to wait on so we do not have a race condition
1110  // 2) check that we haven't already meet our ending criteria
1111  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1112  sigset_t blockingSigSet;
1113  sigset_t unblockingSigSet;
1114  sigset_t oldSigSet;
1115  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1116  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1117  sigaddset(&blockingSigSet, SIGCHLD);
1118  sigaddset(&blockingSigSet, SIGUSR2);
1119  sigaddset(&blockingSigSet, SIGINT);
1120  sigdelset(&unblockingSigSet, SIGCHLD);
1121  sigdelset(&unblockingSigSet, SIGUSR2);
1122  sigdelset(&unblockingSigSet, SIGINT);
1123  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1124 
1125  // If there are too many fd's (unlikely, but possible) for select, denote this
1126  // because the sender will fail.
1127  bool too_many_fds = false;
1128  if (pipes[1]+1 > FD_SETSIZE) {
1129  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1130  too_many_fds = true;
1131  }
1132 
1133  //create a thread that sends the units of work to workers
1134  // we create it after all signals were blocked so that this
1135  // thread is never interupted by a signal
1136  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1137  boost::thread senderThread(sender);
1138 
1139  if(not too_many_fds) {
1140  //NOTE: a child could have failed before we got here and even after this call
1141  // which is why the 'if' is conditional on continueAfterChildFailure_
1142  possiblyContinueAfterForkChildFailure();
1143  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1144  sigsuspend(&unblockingSigSet);
1145  possiblyContinueAfterForkChildFailure();
1146  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1147  }
1148  }
1149  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1150 
1151  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1152  if(child_failed) {
1153  LogError("ForkingStopping") << "child failed";
1154  }
1155  if(shutdown_flag) {
1156  LogSystem("ForkingStopping") << "asked to shutdown";
1157  }
1158 
1159  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1160  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1161  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1162  it != itEnd; ++it) {
1163  /* int result = */ kill(*it, SIGUSR2);
1164  }
1165  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1166  while(num_children_done != kMaxChildren) {
1167  sigsuspend(&unblockingSigSet);
1168  }
1169  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1170  }
1171  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1172  senderThread.join();
1173  if(child_failed && !continueAfterChildFailure_) {
1174  if (child_fail_signal) {
1175  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1176  } else if (child_fail_exit_status) {
1177  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1178  } else {
1179  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1180  }
1181  }
1182  if(too_many_fds) {
1183  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1184  }
1185  return false;
1186  }
1187 
1188  std::vector<ModuleDescription const*>
1189  EventProcessor::getAllModuleDescriptions() const {
1190  return schedule_->getAllModuleDescriptions();
1191  }
1192 
1193  int
1194  EventProcessor::totalEvents() const {
1195  return schedule_->totalEvents();
1196  }
1197 
1198  int
1199  EventProcessor::totalEventsPassed() const {
1200  return schedule_->totalEventsPassed();
1201  }
1202 
1203  int
1204  EventProcessor::totalEventsFailed() const {
1205  return schedule_->totalEventsFailed();
1206  }
1207 
1208  void
1209  EventProcessor::enableEndPaths(bool active) {
1210  schedule_->enableEndPaths(active);
1211  }
1212 
1213  bool
1214  EventProcessor::endPathsEnabled() const {
1215  return schedule_->endPathsEnabled();
1216  }
1217 
1218  void
1219  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1220  schedule_->getTriggerReport(rep);
1221  }
1222 
1223  void
1224  EventProcessor::clearCounters() {
1225  schedule_->clearCounters();
1226  }
1227 
1228 
1229  std::auto_ptr<statemachine::Machine>
1230  EventProcessor::createStateMachine() {
1232  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1233  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1234  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1235  else {
1236  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1237  << fileMode_ << ".\n"
1238  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1239  }
1240 
1241  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1242  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1243  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1244  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1245  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1246  else {
1247  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1248  << emptyRunLumiMode_ << ".\n"
1249  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1250  }
1251 
1252  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1253  fileMode,
1254  emptyRunLumiMode));
1255 
1256  machine->initiate();
1257  return machine;
1258  }
1259 
1260  bool
1261  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1262  bool returnValue = false;
1263 
1264  // Look for a shutdown signal
1265  if(shutdown_flag.load(std::memory_order_acquire)) {
1266  returnValue = true;
1267  returnCode = epSignal;
1268  }
1269  return returnValue;
1270  }
1271 
1272 
1274  EventProcessor::runToCompletion() {
1275 
1276  StatusCode returnCode=epSuccess;
1277  asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1278  std::auto_ptr<statemachine::Machine> machine;
1279  {
1280  beginJob(); //make sure this was called
1281 
1282  //StatusCode returnCode = epSuccess;
1283  stateMachineWasInErrorState_ = false;
1284 
1285  // make the services available
1286  ServiceRegistry::Operate operate(serviceToken_);
1287 
1288  machine = createStateMachine();
1289  nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1290  asyncStopRequestedWhileProcessingEvents_=false;
1291  try {
1292  convertException::wrap([&]() {
1293 
1294  InputSource::ItemType itemType;
1295 
1296  while(true) {
1297 
1298  bool more = true;
1299  if(numberOfForkedChildren_ > 0) {
1300  size_t size = preg_->size();
1301  {
1302  SendSourceTerminationSignalIfException sentry(actReg_.get());
1303  more = input_->skipForForking();
1304  sentry.completedSuccessfully();
1305  }
1306  if(more) {
1307  if(size < preg_->size()) {
1308  principalCache_.adjustIndexesAfterProductRegistryAddition();
1309  }
1310  principalCache_.adjustEventsToNewProductRegistry(preg_);
1311  }
1312  }
1313  {
1314  SendSourceTerminationSignalIfException sentry(actReg_.get());
1315  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1316  sentry.completedSuccessfully();
1317  }
1318 
1319  FDEBUG(1) << "itemType = " << itemType << "\n";
1320 
1321  if(checkForAsyncStopRequest(returnCode)) {
1322  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1323  forceLooperToEnd_ = true;
1324  machine->process_event(statemachine::Stop());
1325  forceLooperToEnd_ = false;
1326  break;
1327  }
1328 
1329  if(itemType == InputSource::IsEvent) {
1330  machine->process_event(statemachine::Event());
1331  if(asyncStopRequestedWhileProcessingEvents_) {
1332  forceLooperToEnd_ = true;
1333  machine->process_event(statemachine::Stop());
1334  forceLooperToEnd_ = false;
1335  returnCode = asyncStopStatusCodeFromProcessingEvents_;
1336  break;
1337  }
1338  itemType = nextItemTypeFromProcessingEvents_;
1339  }
1340 
1341  if(itemType == InputSource::IsEvent) {
1342  }
1343  else if(itemType == InputSource::IsStop) {
1344  machine->process_event(statemachine::Stop());
1345  }
1346  else if(itemType == InputSource::IsFile) {
1347  machine->process_event(statemachine::File());
1348  }
1349  else if(itemType == InputSource::IsRun) {
1350  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1351  }
1352  else if(itemType == InputSource::IsLumi) {
1353  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1354  }
1355  else if(itemType == InputSource::IsSynchronize) {
1356  //For now, we don't have to do anything
1357  }
1358  // This should be impossible
1359  else {
1361  << "Unknown next item type passed to EventProcessor\n"
1362  << "Please report this error to the Framework group\n";
1363  }
1364  if(machine->terminated()) {
1365  break;
1366  }
1367  } // End of loop over state machine events
1368  }); // convertException::wrap
1369  } // Try block
1370  // Some comments on exception handling related to the boost state machine:
1371  //
1372  // Some states used in the machine are special because they
1373  // perform actions while the machine is being terminated, actions
1374  // such as close files, call endRun, call endLumi etc ... Each of these
1375  // states has two functions that perform these actions. The functions
1376  // are almost identical. The major difference is that one version
1377  // catches all exceptions and the other lets exceptions pass through.
1378  // The destructor catches them and the other function named "exit" lets
1379  // them pass through. On a normal termination, boost will always call
1380  // "exit" and then the state destructor. In our state classes, the
1381  // the destructors do nothing if the exit function already took
1382  // care of things. Here's the interesting part. When boost is
1383  // handling an exception the "exit" function is not called (a boost
1384  // feature).
1385  //
1386  // If an exception occurs while the boost machine is in control
1387  // (which usually means inside a process_event call), then
1388  // the boost state machine destroys its states and "terminates" itself.
1389  // This already done before we hit the catch blocks below. In this case
1390  // the call to terminateMachine below only destroys an already
1391  // terminated state machine. Because exit is not called, the state destructors
1392  // handle cleaning up lumis, runs, and files. The destructors swallow
1393  // all exceptions and only pass through the exceptions messages, which
1394  // are tacked onto the original exception below.
1395  //
1396  // If an exception occurs when the boost state machine is not
1397  // in control (outside the process_event functions), then boost
1398  // cannot destroy its own states. The terminateMachine function
1399  // below takes care of that. The flag "alreadyHandlingException"
1400  // is set true so that the state exit functions do nothing (and
1401  // cannot throw more exceptions while handling the first). Then the
1402  // state destructors take care of this because exit did nothing.
1403  //
1404  // In both cases above, the EventProcessor::endOfLoop function is
1405  // not called because it can throw exceptions.
1406  //
1407  // One tricky aspect of the state machine is that things that can
1408  // throw should not be invoked by the state machine while another
1409  // exception is being handled.
1410  // Another tricky aspect is that it appears to be important to
1411  // terminate the state machine before invoking its destructor.
1412  // We've seen crashes that are not understood when that is not
1413  // done. Maintainers of this code should be careful about this.
1414 
1415  catch (cms::Exception & e) {
1416  alreadyHandlingException_ = true;
1417  terminateMachine(machine);
1418  alreadyHandlingException_ = false;
1419  if (!exceptionMessageLumis_.empty()) {
1420  e.addAdditionalInfo(exceptionMessageLumis_);
1421  if (e.alreadyPrinted()) {
1422  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1423  }
1424  }
1425  if (!exceptionMessageRuns_.empty()) {
1426  e.addAdditionalInfo(exceptionMessageRuns_);
1427  if (e.alreadyPrinted()) {
1428  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1429  }
1430  }
1431  if (!exceptionMessageFiles_.empty()) {
1432  e.addAdditionalInfo(exceptionMessageFiles_);
1433  if (e.alreadyPrinted()) {
1434  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1435  }
1436  }
1437  throw;
1438  }
1439 
1440  if(machine->terminated()) {
1441  FDEBUG(1) << "The state machine reports it has been terminated\n";
1442  machine.reset();
1443  }
1444 
1445  if(stateMachineWasInErrorState_) {
1446  throw cms::Exception("BadState")
1447  << "The boost state machine in the EventProcessor exited after\n"
1448  << "entering the Error state.\n";
1449  }
1450 
1451  }
1452  if(machine.get() != 0) {
1453  terminateMachine(machine);
1455  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1456  << "Please report this error to the Framework group\n";
1457  }
1458 
1459  return returnCode;
1460  }
1461 
1462  void EventProcessor::readFile() {
1463  FDEBUG(1) << " \treadFile\n";
1464  size_t size = preg_->size();
1465  SendSourceTerminationSignalIfException sentry(actReg_.get());
1466 
1467  fb_ = input_->readFile();
1468  if(size < preg_->size()) {
1469  principalCache_.adjustIndexesAfterProductRegistryAddition();
1470  }
1471  principalCache_.adjustEventsToNewProductRegistry(preg_);
1472  if((numberOfForkedChildren_ > 0) or
1473  (preallocations_.numberOfStreams()>1 and
1474  preallocations_.numberOfThreads()>1)) {
1475  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1476  }
1477  sentry.completedSuccessfully();
1478  }
1479 
1480  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1481  if (fb_.get() != nullptr) {
1482  SendSourceTerminationSignalIfException sentry(actReg_.get());
1483  input_->closeFile(fb_.get(), cleaningUpAfterException);
1484  sentry.completedSuccessfully();
1485  }
1486  FDEBUG(1) << "\tcloseInputFile\n";
1487  }
1488 
1489  void EventProcessor::openOutputFiles() {
1490  if (fb_.get() != nullptr) {
1491  schedule_->openOutputFiles(*fb_);
1492  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1493  }
1494  FDEBUG(1) << "\topenOutputFiles\n";
1495  }
1496 
1497  void EventProcessor::closeOutputFiles() {
1498  if (fb_.get() != nullptr) {
1499  schedule_->closeOutputFiles();
1500  if(hasSubProcess()) subProcess_->closeOutputFiles();
1501  }
1502  FDEBUG(1) << "\tcloseOutputFiles\n";
1503  }
1504 
1505  void EventProcessor::respondToOpenInputFile() {
1506  if(hasSubProcess()) {
1507  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1508  }
1509  if (fb_.get() != nullptr) {
1510  schedule_->respondToOpenInputFile(*fb_);
1511  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1512  }
1513  FDEBUG(1) << "\trespondToOpenInputFile\n";
1514  }
1515 
1516  void EventProcessor::respondToCloseInputFile() {
1517  if (fb_.get() != nullptr) {
1518  schedule_->respondToCloseInputFile(*fb_);
1519  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1520  }
1521  FDEBUG(1) << "\trespondToCloseInputFile\n";
1522  }
1523 
1524  void EventProcessor::startingNewLoop() {
1525  shouldWeStop_ = false;
1526  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1527  // until after we've called beginOfJob
1528  if(looper_ && looperBeginJobRun_) {
1529  looper_->doStartingNewLoop();
1530  }
1531  FDEBUG(1) << "\tstartingNewLoop\n";
1532  }
1533 
1534  bool EventProcessor::endOfLoop() {
1535  if(looper_) {
1536  ModuleChanger changer(schedule_.get(),preg_.get());
1537  looper_->setModuleChanger(&changer);
1538  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1539  looper_->setModuleChanger(nullptr);
1540  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1541  else return false;
1542  }
1543  FDEBUG(1) << "\tendOfLoop\n";
1544  return true;
1545  }
1546 
1547  void EventProcessor::rewindInput() {
1548  input_->repeat();
1549  input_->rewind();
1550  FDEBUG(1) << "\trewind\n";
1551  }
1552 
1553  void EventProcessor::prepareForNextLoop() {
1554  looper_->prepareForNextLoop(esp_.get());
1555  FDEBUG(1) << "\tprepareForNextLoop\n";
1556  }
1557 
1558  bool EventProcessor::shouldWeCloseOutput() const {
1559  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1560  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1561  }
1562 
1563  void EventProcessor::doErrorStuff() {
1564  FDEBUG(1) << "\tdoErrorStuff\n";
1565  LogError("StateMachine")
1566  << "The EventProcessor state machine encountered an unexpected event\n"
1567  << "and went to the error state\n"
1568  << "Will attempt to terminate processing normally\n"
1569  << "(IF using the looper the next loop will be attempted)\n"
1570  << "This likely indicates a bug in an input module or corrupted input or both\n";
1571  stateMachineWasInErrorState_ = true;
1572  }
1573 
1574  void EventProcessor::beginRun(statemachine::Run const& run) {
1575  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1576  {
1577  SendSourceTerminationSignalIfException sentry(actReg_.get());
1578 
1579  input_->doBeginRun(runPrincipal, &processContext_);
1580  sentry.completedSuccessfully();
1581  }
1582 
1583  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1584  runPrincipal.beginTime());
1585  if(forceESCacheClearOnNewRun_){
1586  espController_->forceCacheClear();
1587  }
1588  {
1589  SendSourceTerminationSignalIfException sentry(actReg_.get());
1590  espController_->eventSetupForInstance(ts);
1591  sentry.completedSuccessfully();
1592  }
1593  EventSetup const& es = esp_->eventSetup();
1594  if(looper_ && looperBeginJobRun_== false) {
1595  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1596  looper_->beginOfJob(es);
1597  looperBeginJobRun_ = true;
1598  looper_->doStartingNewLoop();
1599  }
1600  {
1602  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1603  if(hasSubProcess()) {
1604  subProcess_->doBeginRun(runPrincipal, ts);
1605  }
1606  }
1607  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1608  if(looper_) {
1609  looper_->doBeginRun(runPrincipal, es, &processContext_);
1610  }
1611  {
1613  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1614  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1615  if(hasSubProcess()) {
1616  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1617  }
1618  }
1619  }
1620  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1621  if(looper_) {
1622  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1623  }
1624  }
1625 
1626  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1627  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1628  {
1629  SendSourceTerminationSignalIfException sentry(actReg_.get());
1630 
1631  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1632  sentry.completedSuccessfully();
1633  }
1634 
1635  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1636  runPrincipal.endTime());
1637  {
1638  SendSourceTerminationSignalIfException sentry(actReg_.get());
1639  espController_->eventSetupForInstance(ts);
1640  sentry.completedSuccessfully();
1641  }
1642  EventSetup const& es = esp_->eventSetup();
1643  {
1644  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1646  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1647  if(hasSubProcess()) {
1648  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1649  }
1650  }
1651  }
1652  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1653  if(looper_) {
1654  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1655  }
1656  {
1658  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1659  if(hasSubProcess()) {
1660  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1661  }
1662  }
1663  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1664  if(looper_) {
1665  looper_->doEndRun(runPrincipal, es, &processContext_);
1666  }
1667  }
1668 
1669  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1670  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1671  {
1672  SendSourceTerminationSignalIfException sentry(actReg_.get());
1673 
1674  input_->doBeginLumi(lumiPrincipal, &processContext_);
1675  sentry.completedSuccessfully();
1676  }
1677 
1679  if(rng.isAvailable()) {
1680  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1681  rng->preBeginLumi(lb);
1682  }
1683 
1684  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1685  // lumi blocks know their start and end times why not also start and end events?
1686  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689  espController_->eventSetupForInstance(ts);
1690  sentry.completedSuccessfully();
1691  }
1692  EventSetup const& es = esp_->eventSetup();
1693  {
1695  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1696  if(hasSubProcess()) {
1697  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1698  }
1699  }
1700  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1701  if(looper_) {
1702  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1703  }
1704  {
1705  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1707  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1708  if(hasSubProcess()) {
1709  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1710  }
1711  }
1712  }
1713  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1714  if(looper_) {
1715  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1716  }
1717  }
1718 
1719  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1720  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1721  {
1722  SendSourceTerminationSignalIfException sentry(actReg_.get());
1723 
1724  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1725  sentry.completedSuccessfully();
1726  }
1727  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1728  // lumi blocks know their start and end times why not also start and end events?
1729  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1730  lumiPrincipal.endTime());
1731  {
1732  SendSourceTerminationSignalIfException sentry(actReg_.get());
1733  espController_->eventSetupForInstance(ts);
1734  sentry.completedSuccessfully();
1735  }
1736  EventSetup const& es = esp_->eventSetup();
1737  {
1738  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1740  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1741  if(hasSubProcess()) {
1742  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1743  }
1744  }
1745  }
1746  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1747  if(looper_) {
1748  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1749  }
1750  {
1752  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1753  if(hasSubProcess()) {
1754  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1755  }
1756  }
1757  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1758  if(looper_) {
1759  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1760  }
1761  }
1762 
1763  statemachine::Run EventProcessor::readRun() {
1764  if (principalCache_.hasRunPrincipal()) {
1766  << "EventProcessor::readRun\n"
1767  << "Illegal attempt to insert run into cache\n"
1768  << "Contact a Framework Developer\n";
1769  }
1770  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1771  {
1772  SendSourceTerminationSignalIfException sentry(actReg_.get());
1773  input_->readRun(*rp, *historyAppender_);
1774  sentry.completedSuccessfully();
1775  }
1776  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1777  principalCache_.insert(rp);
1778  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1779  }
1780 
1781  statemachine::Run EventProcessor::readAndMergeRun() {
1782  principalCache_.merge(input_->runAuxiliary(), preg_);
1783  auto runPrincipal =principalCache_.runPrincipalPtr();
1784  {
1785  SendSourceTerminationSignalIfException sentry(actReg_.get());
1786  input_->readAndMergeRun(*runPrincipal);
1787  sentry.completedSuccessfully();
1788  }
1789  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1790  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1791  }
1792 
1793  int EventProcessor::readLuminosityBlock() {
1794  if (principalCache_.hasLumiPrincipal()) {
1796  << "EventProcessor::readRun\n"
1797  << "Illegal attempt to insert lumi into cache\n"
1798  << "Contact a Framework Developer\n";
1799  }
1800  if (!principalCache_.hasRunPrincipal()) {
1802  << "EventProcessor::readRun\n"
1803  << "Illegal attempt to insert lumi into cache\n"
1804  << "Run is invalid\n"
1805  << "Contact a Framework Developer\n";
1806  }
1807  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1808  {
1809  SendSourceTerminationSignalIfException sentry(actReg_.get());
1810  input_->readLuminosityBlock(*lbp, *historyAppender_);
1811  sentry.completedSuccessfully();
1812  }
1813  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1814  principalCache_.insert(lbp);
1815  return input_->luminosityBlock();
1816  }
1817 
1818  int EventProcessor::readAndMergeLumi() {
1819  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1820  {
1821  SendSourceTerminationSignalIfException sentry(actReg_.get());
1822  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1823  sentry.completedSuccessfully();
1824  }
1825  return input_->luminosityBlock();
1826  }
1827 
1828  void EventProcessor::writeRun(statemachine::Run const& run) {
1829  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1830  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1831  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1832  }
1833 
1834  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1835  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1836  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1837  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1838  }
1839 
1840  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1841  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1842  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1843  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1844  }
1845 
1846  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1847  principalCache_.deleteLumi(phid, run, lumi);
1848  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1849  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1850  }
1851 
1852  class StreamProcessingTask : public tbb::task {
1853  public:
1855  unsigned int iStreamIndex,
1856  std::atomic<bool>* iFinishedProcessingEvents,
1857  tbb::task* iWaitTask):
1858  m_proc(iProc),
1859  m_streamID(iStreamIndex),
1860  m_finishedProcessingEvents(iFinishedProcessingEvents),
1861  m_waitTask(iWaitTask){}
1862 
1863  tbb::task* execute() {
1864  m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
1865  m_waitTask->decrement_ref_count();
1866  return nullptr;
1867  }
1868  private:
1870  unsigned int m_streamID;
1871  std::atomic<bool>* m_finishedProcessingEvents;
1872  tbb::task* m_waitTask;
1873  };
1874 
1875  void EventProcessor::processEventsForStreamAsync(unsigned int iStreamIndex,
1876  std::atomic<bool>* finishedProcessingEvents) {
1877  try {
1878  // make the services available
1879  ServiceRegistry::Operate operate(serviceToken_);
1880  if(preallocations_.numberOfStreams()>1) {
1882  handler->initializeThisThreadForUse();
1883  }
1884 
1885  if(iStreamIndex==0) {
1886  processEvent(0);
1887  }
1888  do {
1889  if(shouldWeStop()) {
1890  break;
1891  }
1892  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1893  //another thread hit an exception
1894  //std::cerr<<"another thread saw an exception\n";
1895  break;
1896  }
1897  {
1898 
1899 
1900  {
1901  //nextItemType and readEvent need to be in same critical section
1902  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1903 
1904  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1905  //std::cerr<<"finishedProcessingEvents\n";
1906  break;
1907  }
1908 
1909  //If source and DelayedReader share a resource we must serialize them
1910  auto sr = input_->resourceSharedWithDelayedReader();
1911  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1912  if(sr) {
1913  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1914  }
1915  InputSource::ItemType itemType = input_->nextItemType();
1916  if (InputSource::IsEvent !=itemType) {
1917  nextItemTypeFromProcessingEvents_ = itemType;
1918  finishedProcessingEvents->store(true,std::memory_order_release);
1919  //std::cerr<<"next item type "<<itemType<<"\n";
1920  break;
1921  }
1922  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1923  //std::cerr<<"task told to async stop\n";
1924  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1925  break;
1926  }
1927  readEvent(iStreamIndex);
1928  }
1929  }
1930  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1931  //another thread hit an exception
1932  //std::cerr<<"another thread saw an exception\n";
1933  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1934 
1935  break;
1936  }
1937  processEvent(iStreamIndex);
1938  }while(true);
1939  } catch (...) {
1940  bool expected =false;
1941  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1942  deferredExceptionPtr_ = std::current_exception();
1943  }
1944  //std::cerr<<"task caught exception\n";
1945  }
1946  }
1947 
1948  void EventProcessor::readAndProcessEvent() {
1949  if(numberOfForkedChildren_>0) {
1950  readEvent(0);
1951  processEvent(0);
1952  return;
1953  }
1954  nextItemTypeFromProcessingEvents_ = InputSource::IsEvent; //needed for looper
1955  asyncStopRequestedWhileProcessingEvents_ = false;
1956 
1957  std::atomic<bool> finishedProcessingEvents{false};
1958 
1959  //Task assumes Stream 0 has already read the event that caused us to go here
1960  readEvent(0);
1961 
1962  //To wait, the ref count has to b 1+#streams
1963  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1964  eventLoopWaitTask->increment_ref_count();
1965 
1966  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1967  unsigned int iStreamIndex = 0;
1968  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1969  eventLoopWaitTask->increment_ref_count();
1970  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1971 
1972  }
1973  eventLoopWaitTask->increment_ref_count();
1974  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1975  tbb::task::destroy(*eventLoopWaitTask);
1976 
1977  //One of the processing threads saw an exception
1978  if(deferredExceptionPtrIsSet_) {
1979  std::rethrow_exception(deferredExceptionPtr_);
1980  }
1981  }
1982  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1983  //TODO this will have to become per stream
1984  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1985  StreamContext streamContext(event.streamID(), &processContext_);
1986 
1987  SendSourceTerminationSignalIfException sentry(actReg_.get());
1988  input_->readEvent(event, streamContext);
1989  sentry.completedSuccessfully();
1990 
1991  FDEBUG(1) << "\treadEvent\n";
1992  }
1993  void EventProcessor::processEvent(unsigned int iStreamIndex) {
1994  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1995  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1997  if(rng.isAvailable()) {
1998  Event ev(*pep, ModuleDescription(), nullptr);
1999  rng->postEventRead(ev);
2000  }
2001  assert(pep->luminosityBlockPrincipalPtrValid());
2002  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2003  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2004 
2005  //We can only update IOVs on Lumi boundaries
2006  //IOVSyncValue ts(pep->id(), pep->time());
2007  //espController_->eventSetupForInstance(ts);
2008  EventSetup const& es = esp_->eventSetup();
2009  {
2011  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2012  if(hasSubProcess()) {
2013  subProcess_->doEvent(*pep);
2014  }
2015  }
2016 
2017  //NOTE: If we have a looper we only have one Stream
2018  if(looper_) {
2019  bool randomAccess = input_->randomAccess();
2020  ProcessingController::ForwardState forwardState = input_->forwardState();
2021  ProcessingController::ReverseState reverseState = input_->reverseState();
2022  ProcessingController pc(forwardState, reverseState, randomAccess);
2023 
2024  EDLooperBase::Status status = EDLooperBase::kContinue;
2025  do {
2026 
2027  StreamContext streamContext(pep->streamID(), &processContext_);
2028  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2029 
2030  bool succeeded = true;
2031  if(randomAccess) {
2032  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2033  input_->skipEvents(-2);
2034  }
2035  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2036  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2037  }
2038  }
2039  pc.setLastOperationSucceeded(succeeded);
2040  } while(!pc.lastOperationSucceeded());
2041  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2042 
2043  }
2044 
2045  FDEBUG(1) << "\tprocessEvent\n";
2046  pep->clearEventPrincipal();
2047  }
2048 
2049  bool EventProcessor::shouldWeStop() const {
2050  FDEBUG(1) << "\tshouldWeStop\n";
2051  if(shouldWeStop_) return true;
2052  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2053  }
2054 
2055  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2056  exceptionMessageFiles_ = message;
2057  }
2058 
2059  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2060  exceptionMessageRuns_ = message;
2061  }
2062 
2063  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2064  exceptionMessageLumis_ = message;
2065  }
2066 
2067  bool EventProcessor::alreadyHandlingException() const {
2068  return alreadyHandlingException_;
2069  }
2070 
2071  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2072  if(iMachine.get() != 0) {
2073  if(!iMachine->terminated()) {
2074  forceLooperToEnd_ = true;
2075  iMachine->process_event(statemachine::Stop());
2076  forceLooperToEnd_ = false;
2077  }
2078  else {
2079  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2080  }
2081  if(iMachine->terminated()) {
2082  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2083  }
2084  iMachine.reset();
2085  }
2086  }
2087 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:56
virtual char const * what() const
Definition: Exception.cc:141
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:60
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:254
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:564
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
assert(m_qm.get())
Timestamp const & endTime() const
processConfiguration
Definition: Schedule.cc:370
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
bool ev
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
std::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:61
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
static std::string const input
Definition: EdmProvDump.cc:43
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
static InputSourceFactory const * get()
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::shared_ptr< edm::ParameterSet > parameterSet()
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:57
virtual void endOfJob()
Definition: EDLooperBase.cc:90
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
areg
Definition: Schedule.cc:370
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
Definition: ScheduleItems.h:59
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:58
ServiceToken getToken()
edm::RunNumber_t runNumber() const
Definition: EPStates.h:49
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:48
PathsAndConsumesOfModules pathsAndConsumesOfModules_
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
#define O_NONBLOCK
Definition: SysFile.h:21
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
volatile std::atomic< bool > shutdown_flag false
void call(std::function< void(void)>)
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
tuple status
Definition: ntuplemaker.py:245
preg
Definition: Schedule.cc:370
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:16
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31