CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
9 
34 
36 
44 
49 
59 
60 #include "MessageForSource.h"
61 #include "MessageForParent.h"
62 
63 #include "boost/bind.hpp"
64 #include "boost/thread/xtime.hpp"
65 
66 #include <exception>
67 #include <iomanip>
68 #include <iostream>
69 #include <utility>
70 #include <sstream>
71 
72 #include <sys/ipc.h>
73 #include <sys/msg.h>
74 
75 #include "tbb/task.h"
76 
77 //Used for forking
78 #include <sys/types.h>
79 #include <sys/wait.h>
80 #include <sys/socket.h>
81 #include <sys/select.h>
82 #include <sys/fcntl.h>
83 #include <unistd.h>
84 
85 //Used for CPU affinity
86 #ifndef __APPLE__
87 #include <sched.h>
88 #endif
89 
90 //Needed for introspection
91 #include "Cintex/Cintex.h"
92 
93 namespace edm {
94 
95  // ---------------------------------------------------------------
96  std::unique_ptr<InputSource>
98  CommonParams const& common,
100  boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
101  boost::shared_ptr<ActivityRegistry> areg,
102  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
103  PreallocationConfiguration const& allocations) {
104  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
105  if(main_input == 0) {
107  << "There must be exactly one source in the configuration.\n"
108  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
109  }
110 
111  std::string modtype(main_input->getParameter<std::string>("@module_type"));
112 
113  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
115  ConfigurationDescriptions descriptions(filler->baseType());
116  filler->fill(descriptions);
117 
118  try {
119  convertException::wrap([&]() {
120  descriptions.validate(*main_input, std::string("source"));
121  });
122  }
123  catch (cms::Exception & iException) {
124  std::ostringstream ost;
125  ost << "Validating configuration of input source of type " << modtype;
126  iException.addContext(ost.str());
127  throw;
128  }
129 
130  main_input->registerIt();
131 
132  // Fill in "ModuleDescription", in case the input source produces
133  // any EDproducts, which would be registered in the ProductRegistry.
134  // Also fill in the process history item for this process.
135  // There is no module label for the unnamed input source, so
136  // just use "source".
137  // Only the tracked parameters belong in the process configuration.
138  ModuleDescription md(main_input->id(),
139  main_input->getParameter<std::string>("@module_type"),
140  "source",
141  processConfiguration.get(),
143 
144  InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_, allocations);
145  areg->preSourceConstructionSignal_(md);
146  std::unique_ptr<InputSource> input;
147  try {
148  //even if we have an exception, send the signal
149  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
150  convertException::wrap([&]() {
151  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
152  });
153  }
154  catch (cms::Exception& iException) {
155  std::ostringstream ost;
156  ost << "Constructing input source of type " << modtype;
157  iException.addContext(ost.str());
158  throw;
159  }
160  return input;
161  }
162 
163  // ---------------------------------------------------------------
164  boost::shared_ptr<EDLooperBase>
167  ParameterSet& params) {
168  boost::shared_ptr<EDLooperBase> vLooper;
169 
170  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
171 
172  if(loopers.size() == 0) {
173  return vLooper;
174  }
175 
176  assert(1 == loopers.size());
177 
178  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
179  itName != itNameEnd;
180  ++itName) {
181 
182  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
183  providerPSet->registerIt();
184  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
185  cp,
186  *providerPSet);
187  }
188  return vLooper;
189 
190  }
191 
192  // ---------------------------------------------------------------
194  ServiceToken const& iToken,
196  std::vector<std::string> const& defaultServices,
197  std::vector<std::string> const& forcedServices) :
198  actReg_(),
199  preg_(),
200  branchIDListHelper_(),
201  serviceToken_(),
202  input_(),
203  espController_(new eventsetup::EventSetupsController),
204  esp_(),
205  act_table_(),
206  processConfiguration_(),
207  schedule_(),
208  subProcess_(),
209  historyAppender_(new HistoryAppender),
210  fb_(),
211  looper_(),
212  deferredExceptionPtrIsSet_(false),
213  principalCache_(),
214  beginJobCalled_(false),
215  shouldWeStop_(false),
216  stateMachineWasInErrorState_(false),
217  fileMode_(),
218  emptyRunLumiMode_(),
219  exceptionMessageFiles_(),
220  exceptionMessageRuns_(),
221  exceptionMessageLumis_(),
222  alreadyHandlingException_(false),
223  forceLooperToEnd_(false),
224  looperBeginJobRun_(false),
225  forceESCacheClearOnNewRun_(false),
226  numberOfForkedChildren_(0),
227  numberOfSequentialEventsPerChild_(1),
228  setCpuAffinity_(false),
229  eventSetupDataToExcludeFromPrefetching_() {
230  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
231  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
232  processDesc->addServices(defaultServices, forcedServices);
233  init(processDesc, iToken, iLegacy);
234  }
235 
237  std::vector<std::string> const& defaultServices,
238  std::vector<std::string> const& forcedServices) :
239  actReg_(),
240  preg_(),
241  branchIDListHelper_(),
242  serviceToken_(),
243  input_(),
244  espController_(new eventsetup::EventSetupsController),
245  esp_(),
246  act_table_(),
247  processConfiguration_(),
248  schedule_(),
249  subProcess_(),
250  historyAppender_(new HistoryAppender),
251  fb_(),
252  looper_(),
253  deferredExceptionPtrIsSet_(false),
254  principalCache_(),
255  beginJobCalled_(false),
256  shouldWeStop_(false),
257  stateMachineWasInErrorState_(false),
258  fileMode_(),
259  emptyRunLumiMode_(),
260  exceptionMessageFiles_(),
261  exceptionMessageRuns_(),
262  exceptionMessageLumis_(),
263  alreadyHandlingException_(false),
264  forceLooperToEnd_(false),
265  looperBeginJobRun_(false),
266  forceESCacheClearOnNewRun_(false),
267  numberOfForkedChildren_(0),
268  numberOfSequentialEventsPerChild_(1),
269  setCpuAffinity_(false),
270  asyncStopRequestedWhileProcessingEvents_(false),
271  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
272  eventSetupDataToExcludeFromPrefetching_()
273  {
274  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
275  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
276  processDesc->addServices(defaultServices, forcedServices);
278  }
279 
280  EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
281  ServiceToken const& token,
283  actReg_(),
284  preg_(),
285  branchIDListHelper_(),
286  serviceToken_(),
287  input_(),
288  espController_(new eventsetup::EventSetupsController),
289  esp_(),
290  act_table_(),
291  processConfiguration_(),
292  schedule_(),
293  subProcess_(),
294  historyAppender_(new HistoryAppender),
295  fb_(),
296  looper_(),
297  deferredExceptionPtrIsSet_(false),
298  principalCache_(),
299  beginJobCalled_(false),
300  shouldWeStop_(false),
301  stateMachineWasInErrorState_(false),
302  fileMode_(),
303  emptyRunLumiMode_(),
304  exceptionMessageFiles_(),
305  exceptionMessageRuns_(),
306  exceptionMessageLumis_(),
307  alreadyHandlingException_(false),
308  forceLooperToEnd_(false),
309  looperBeginJobRun_(false),
310  forceESCacheClearOnNewRun_(false),
311  numberOfForkedChildren_(0),
312  numberOfSequentialEventsPerChild_(1),
313  setCpuAffinity_(false),
314  asyncStopRequestedWhileProcessingEvents_(false),
315  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
316  eventSetupDataToExcludeFromPrefetching_()
317  {
318  init(processDesc, token, legacy);
319  }
320 
321 
323  actReg_(),
324  preg_(),
325  branchIDListHelper_(),
326  serviceToken_(),
327  input_(),
328  espController_(new eventsetup::EventSetupsController),
329  esp_(),
330  act_table_(),
331  processConfiguration_(),
332  schedule_(),
333  subProcess_(),
334  historyAppender_(new HistoryAppender),
335  fb_(),
336  looper_(),
337  deferredExceptionPtrIsSet_(false),
338  principalCache_(),
339  beginJobCalled_(false),
340  shouldWeStop_(false),
341  stateMachineWasInErrorState_(false),
342  fileMode_(),
343  emptyRunLumiMode_(),
344  exceptionMessageFiles_(),
345  exceptionMessageRuns_(),
346  exceptionMessageLumis_(),
347  alreadyHandlingException_(false),
348  forceLooperToEnd_(false),
349  looperBeginJobRun_(false),
350  forceESCacheClearOnNewRun_(false),
351  numberOfForkedChildren_(0),
352  numberOfSequentialEventsPerChild_(1),
353  setCpuAffinity_(false),
354  asyncStopRequestedWhileProcessingEvents_(false),
355  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
356  eventSetupDataToExcludeFromPrefetching_()
357 {
358  if(isPython) {
359  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
360  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
362  }
363  else {
364  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
366  }
367  }
368 
369  void
370  EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
371  ServiceToken const& iToken,
373 
374  //std::cerr << processDesc->dump() << std::endl;
375 
376  ROOT::Cintex::Cintex::Enable();
377 
378  // register the empty parentage vector , once and for all
380 
381  // register the empty parameter set, once and for all.
383 
384  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
385  //std::cerr << parameterSet->dump() << std::endl;
386 
387  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
388  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
389 
390  // Now set some parameters specific to the main process.
391  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
392  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
393  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
394  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
395  //threading
396  unsigned int nThreads=1;
397  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
398  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
399  if(nThreads == 0) {
400  nThreads = 1;
401  }
402  }
403  /* TODO: when we support having each stream run in a different thread use this default
404  unsigned int nStreams =nThreads;
405  */
406  unsigned int nStreams =1;
407  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
408  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
409  if(nStreams==0) {
410  nStreams = nThreads;
411  }
412  }
413  /*
414  bool nRunsSet = false;
415  */
416  unsigned int nConcurrentRuns =1;
417  /*
418  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
419  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
420  }
421  */
422  unsigned int nConcurrentLumis =1;
423  /*
424  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
425  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
426  } else {
427  nConcurrentLumis = nConcurrentRuns;
428  }
429  */
430  //Check that relationships between threading parameters makes sense
431  /*
432  if(nThreads<nStreams) {
433  //bad
434  }
435  if(nConcurrentRuns>nStreams) {
436  //bad
437  }
438  if(nConcurrentRuns>nConcurrentLumis) {
439  //bad
440  }
441  */
442  //forking
443  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
444  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
445  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
446  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
447  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
448  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
449  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
450  itPS != itPSEnd;
451  ++itPS) {
452  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
453  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
454  itPS->getUntrackedParameter<std::string>("label", "")));
455  }
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  // Now do general initialization
459  ScheduleItems items;
460 
461  //initialize the services
462  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
463  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
464  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
465 
466  //make the services available
468 
469  if(nStreams>1) {
471  handler->willBeUsingThreads();
472  }
473 
474  // intialize miscellaneous items
475  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
476 
477  // intialize the event setup provider
478  esp_ = espController_->makeProvider(*parameterSet);
479 
480  // initialize the looper, if any
481  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
482  if(looper_) {
483  looper_->setActionTable(items.act_table_.get());
484  looper_->attachTo(*items.actReg_);
485 
486  //For now loopers make us run only 1 transition at a time
487  nStreams=1;
488  nConcurrentLumis=1;
489  nConcurrentRuns=1;
490  }
491 
492  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
493 
494  // initialize the input source
495  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_, preallocations_);
496 
497  // intialize the Schedule
498  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_.reset(items.preg_.release());
507  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
508 
509  FDEBUG(2) << parameterSet << std::endl;
510 
512  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
513  // Reusable event principal
514  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_,
517  historyAppender_.get(),
518  index));
519  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
520  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
522  }
523  // initialize the subprocess, if there is one
524  if(subProcessParameterSet) {
525  subProcess_.reset(new SubProcess(*subProcessParameterSet,
526  *parameterSet,
527  preg_,
530  *actReg_,
531  token,
534  &processContext_));
535  }
536  }
537 
539  // Make the services available while everything is being deleted.
540  ServiceToken token = getToken();
541  ServiceRegistry::Operate op(token);
542 
543  // manually destroy all these thing that may need the services around
544  espController_.reset();
545  subProcess_.reset();
546  esp_.reset();
547  schedule_.reset();
548  input_.reset();
549  looper_.reset();
550  actReg_.reset();
551 
554  }
555 
556  void
558  if(beginJobCalled_) return;
559  beginJobCalled_=true;
560  bk::beginJob();
561 
562  // StateSentry toerror(this); // should we add this ?
563  //make the services available
565 
570  actReg_->preallocateSignal_(bounds);
571  //NOTE: This implementation assumes 'Job' means one call
572  // the EventProcessor::run
573  // If it really means once per 'application' then this code will
574  // have to be changed.
575  // Also have to deal with case where have 'run' then new Module
576  // added and do 'run'
577  // again. In that case the newly added Module needs its 'beginJob'
578  // to be called.
579 
580  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
581  // For now we delay calling beginOfJob until first beginOfRun
582  //if(looper_) {
583  // looper_->beginOfJob(es);
584  //}
585  try {
586  convertException::wrap([&]() {
587  input_->doBeginJob();
588  });
589  }
590  catch(cms::Exception& ex) {
591  ex.addContext("Calling beginJob for the source");
592  throw;
593  }
594  schedule_->beginJob(*preg_);
595  // toerror.succeeded(); // should we add this?
596  if(hasSubProcess()) subProcess_->doBeginJob();
597  actReg_->postBeginJobSignal_();
598 
599  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
600  schedule_->beginStream(i);
601  if(hasSubProcess()) subProcess_->doBeginStream(i);
602  }
603  }
604 
605  void
607  // Collects exceptions, so we don't throw before all operations are performed.
608  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
609 
610  //make the services available
612 
613  //NOTE: this really should go elsewhere in the future
614  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
615  c.call([this,i](){this->schedule_->endStream(i);});
616  if(hasSubProcess()) {
617  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
618  }
619  }
620  schedule_->endJob(c);
621  if(hasSubProcess()) {
622  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
623  }
624  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
625  if(looper_) {
626  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
627  }
628  auto actReg = actReg_.get();
629  c.call([actReg](){actReg->postEndJobSignal_();});
630  if(c.hasThrown()) {
631  c.rethrow();
632  }
633  }
634 
637  return serviceToken_;
638  }
639 
//Setup signal handler to listen for when forked children stop
namespace {
  // State shared between the SIGCHLD handler (ep_sigchld) and the main thread.
  // These are volatile since the compiler can not be allowed to optimize them
  // since they can be modified in the signal handler.
  // NOTE(review): strictly, only `volatile sig_atomic_t` is guaranteed safe to
  // write from a signal handler; the original types are kept so the rest of the
  // file (which reads/compares these) is unaffected.
  volatile bool child_failed = false;
  volatile unsigned int num_children_done = 0;
  volatile int child_fail_exit_status = 0;
  volatile int child_fail_signal = 0;

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    /// SIGCHLD handler: reaps, without blocking, every child that has already
    /// terminated and records how it ended.
    ///  - a child that exited normally increments num_children_done; a non-zero
    ///    exit status is stored in child_fail_exit_status and sets child_failed.
    ///  - a child killed by a signal increments num_children_done, stores the
    ///    signal number in child_fail_signal and sets child_failed.
    ///
    /// errno is saved on entry and restored on exit. The reaping loop always ends
    /// with a failing waitpid() once no children remain (errno = ECHILD). Without
    /// the restore, that value would leak into the interrupted code, e.g. the
    /// `while(select(...) < 0 && errno == EINTR)` retry loops, which would then
    /// treat the interruption as a hard failure.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;
      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0 < p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }
      errno = savedErrno;
    }
  }

}
678 
679  enum {
684  };
685 
686  namespace {
687  unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
688  unsigned int n = 0;
689  while(numberOfChildren != 0) {
690  ++n;
691  numberOfChildren /= 10;
692  }
693  if(n == 0) {
694  n = 3; // Protect against zero numberOfChildren
695  }
696  return n;
697  }
698 
699  /*This class embodied the thread which is used to listen to the forked children and
700  then tell them which events they should process */
701  class MessageSenderToSource {
702  public:
703  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
704  void operator()();
705 
706  private:
707  const std::vector<int>& m_childrenPipes;
708  long const m_nEventsToProcess;
709  fd_set m_socketSet;
710  unsigned int m_aliveChildren;
711  int m_maxFd;
712  };
713 
714  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
715  std::vector<int> const& childrenPipes,
716  long iNEventsToProcess):
717  m_childrenPipes(childrenPipes),
718  m_nEventsToProcess(iNEventsToProcess),
719  m_aliveChildren(childrenSockets.size()),
720  m_maxFd(0)
721  {
722  FD_ZERO(&m_socketSet);
723  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
724  it != itEnd; it++) {
725  FD_SET(*it, &m_socketSet);
726  if (*it > m_maxFd) {
727  m_maxFd = *it;
728  }
729  }
730  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
731  it != itEnd; ++it) {
732  FD_SET(*it, &m_socketSet);
733  if (*it > m_maxFd) {
734  m_maxFd = *it;
735  }
736  }
737  m_maxFd++; // select reads [0,m_maxFd).
738  }
739 
740  /* This function is the heart of the communication between parent and child.
741  * When ready for more data, the child (see MessageReceiverForSource) requests
742  * data through a AF_UNIX socket message. The parent will then assign the next
743  * chunk of data by sending a message back.
744  *
745  * Additionally, this function also monitors the read-side of the pipe fd from the child.
746  * If the child dies unexpectedly, the pipe will be selected as ready for read and
747  * will return EPIPE when read from. Further, if the child thinks the parent has died
748  * (defined as waiting more than 1s for a response), it will write a single byte to
749  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
750  * If still alive, the parent will read the byte and ignore it.
751  *
752  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
753  * handler can distinguish between success and failure cases.
754  */
755 
756  void
757  MessageSenderToSource::operator()() {
758  multicore::MessageForParent childMsg;
759  LogInfo("ForkingController") << "I am controller";
760  //this is the master and therefore the controller
761 
762  multicore::MessageForSource sndmsg;
763  sndmsg.startIndex = 0;
764  sndmsg.nIndices = m_nEventsToProcess;
765  do {
766 
767  fd_set readSockets, errorSockets;
768  // Wait for a request from a child for events.
769  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
770  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
771  // Note that we don't timeout; may be reconsidered in the future.
772  ssize_t rc;
773  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
774  if (rc < 0) {
775  std::cerr << "select failed; should be impossible due to preconditions.\n";
776  abort();
777  break;
778  }
779 
780  // Read the message from the child.
781  for (int idx=0; idx<m_maxFd; idx++) {
782 
783  // Handle errors
784  if (FD_ISSET(idx, &errorSockets)) {
785  LogInfo("ForkingController") << "Error on socket " << idx;
786  FD_CLR(idx, &m_socketSet);
787  close(idx);
788  // See if it was the watchdog pipe that died.
789  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
790  if (*it == idx) {
791  m_aliveChildren--;
792  }
793  }
794  continue;
795  }
796 
797  if (!FD_ISSET(idx, &readSockets)) {
798  continue;
799  }
800 
801  // See if this FD is a child watchdog pipe. If so, read from it to prevent
802  // writes from blocking.
803  bool is_pipe = false;
804  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
805  if (*it == idx) {
806  is_pipe = true;
807  char buf;
808  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
809  if (rc <= 0) {
810  m_aliveChildren--;
811  FD_CLR(idx, &m_socketSet);
812  close(idx);
813  }
814  }
815  }
816 
817  // Only execute this block if the FD is a socket for sending the child work.
818  if (!is_pipe) {
819  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
820  if (rc < 0) {
821  FD_CLR(idx, &m_socketSet);
822  close(idx);
823  continue;
824  }
825 
826  // Tell the child what events to process.
827  // If 'send' fails, then the child process has failed (any other possibilities are
828  // eliminated because we are using fixed-size messages with Unix datagram sockets).
829  // Thus, the SIGCHLD handler will fire and set child_fail = true.
830  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
831  if (rc < 0) {
832  FD_CLR(idx, &m_socketSet);
833  close(idx);
834  continue;
835  }
836  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
837  sndmsg.startIndex += sndmsg.nIndices;
838  }
839  }
840 
841  } while (m_aliveChildren > 0);
842 
843  return;
844  }
845 
846  }
847 
848 
849  void EventProcessor::possiblyContinueAfterForkChildFailure() {
850  if(child_failed && continueAfterChildFailure_) {
851  if (child_fail_signal) {
852  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
853  child_fail_signal=0;
854  } else if (child_fail_exit_status) {
855  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
856  child_fail_exit_status=0;
857  } else {
858  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
859  }
860  child_failed =false;
861  }
862  }
863 
864  bool
865  EventProcessor::forkProcess(std::string const& jobReportFile) {
866 
867  if(0 == numberOfForkedChildren_) {return true;}
868  assert(0<numberOfForkedChildren_);
869  //do what we want done in common
870  {
871  beginJob(); //make sure this was run
872  // make the services available
873  ServiceRegistry::Operate operate(serviceToken_);
874 
875  InputSource::ItemType itemType;
876  itemType = input_->nextItemType();
877 
878  assert(itemType == InputSource::IsFile);
879  {
880  readFile();
881  }
882  itemType = input_->nextItemType();
883  assert(itemType == InputSource::IsRun);
884 
885  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
886  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
887  input_->runAuxiliary()->beginTime());
888  espController_->eventSetupForInstance(ts);
889  EventSetup const& es = esp_->eventSetup();
890 
891  //now get all the data available in the EventSetup
892  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
893  es.fillAvailableRecordKeys(recordKeys);
894  std::vector<eventsetup::DataKey> dataKeys;
895  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
896  itKey != itEnd;
897  ++itKey) {
898  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
899  //see if this is on our exclusion list
900  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
901  ExcludedData const* excludedData(nullptr);
902  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
903  excludedData = &(itExcludeRec->second);
904  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
905  //skip all items in this record
906  continue;
907  }
908  }
909  if(0 != recordPtr) {
910  dataKeys.clear();
911  recordPtr->fillRegisteredDataKeys(dataKeys);
912  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
913  itDataKey != itDataKeyEnd;
914  ++itDataKey) {
915  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
916  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
917  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
918  continue;
919  }
920  try {
921  recordPtr->doGet(*itDataKey);
922  } catch(cms::Exception& e) {
923  LogWarning("ForkingEventSetupPreFetching") << e.what();
924  }
925  }
926  }
927  }
928  }
929  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
930  {
931  // make the services available
932  ServiceRegistry::Operate operate(serviceToken_);
933  Service<JobReport> jobReport;
934  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
935 
936  //Now actually do the forking
937  actReg_->preForkReleaseResourcesSignal_();
938  input_->doPreForkReleaseResources();
939  schedule_->preForkReleaseResources();
940  }
941  installCustomHandler(SIGCHLD, ep_sigchld);
942 
943 
944  unsigned int childIndex = 0;
945  unsigned int const kMaxChildren = numberOfForkedChildren_;
946  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
947  std::vector<pid_t> childrenIds;
948  childrenIds.reserve(kMaxChildren);
949  std::vector<int> childrenSockets;
950  childrenSockets.reserve(kMaxChildren);
951  std::vector<int> childrenPipes;
952  childrenPipes.reserve(kMaxChildren);
953  std::vector<int> childrenSocketsCopy;
954  childrenSocketsCopy.reserve(kMaxChildren);
955  std::vector<int> childrenPipesCopy;
956  childrenPipesCopy.reserve(kMaxChildren);
957  int pipes[] {0, 0};
958 
959  {
960  // make the services available
961  ServiceRegistry::Operate operate(serviceToken_);
962  Service<JobReport> jobReport;
963  int sockets[2], fd_flags;
964  for(; childIndex < kMaxChildren; ++childIndex) {
965  // Create a UNIX_DGRAM socket pair
966  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
967  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
968  exit(EXIT_FAILURE);
969  }
970  if (pipe(pipes)) {
971  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
972  exit(EXIT_FAILURE);
973  }
974  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
975  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
976  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
977  exit(EXIT_FAILURE);
978  }
979  // Mark socket as non-block. Child must be careful to do select prior
980  // to reading from socket.
981  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
982  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
983  exit(EXIT_FAILURE);
984  }
985  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
986  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
987  exit(EXIT_FAILURE);
988  }
989  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
990  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
991  exit(EXIT_FAILURE);
992  }
993  // Linux man page notes there are some edge cases where reading from a
994  // fd can block, even after a select.
995  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
996  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
997  exit(EXIT_FAILURE);
998  }
999  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1000  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1001  exit(EXIT_FAILURE);
1002  }
1003 
1004  childrenPipesCopy = childrenPipes;
1005  childrenSocketsCopy = childrenSockets;
1006 
1007  pid_t value = fork();
1008  if(value == 0) {
1009  // Close the parent's side of the socket and pipe which will talk to us.
1010  close(pipes[0]);
1011  close(sockets[0]);
1012  // Close our copies of the parent's other communication pipes.
1013  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1014  close(*it);
1015  }
1016  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1017  close(*it);
1018  }
1019 
1020  // this is the child process, redirect stdout and stderr to a log file
1021  fflush(stdout);
1022  fflush(stderr);
1023  std::stringstream stout;
1024  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1025  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1026  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1027  }
1028  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1029  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1030  }
1031 
1032  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1033  if(setCpuAffinity_) {
1034  // CPU affinity is handled differently on macosx.
1035  // We disable it and print a message until someone reads:
1036  //
1037  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1038  //
1039  // and implements it.
1040 #ifdef __APPLE__
1041  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1042 #else
1043  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1044  cpu_set_t mask;
1045  CPU_ZERO(&mask);
1046  CPU_SET(childIndex, &mask);
1047  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1048  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1049  exit(-1);
1050  }
1051 #endif
1052  }
1053  break;
1054  } else {
1055  //this is the parent
1056  close(pipes[1]);
1057  close(sockets[1]);
1058  }
1059  if(value < 0) {
1060  LogError("ForkingChild") << "failed to create a child";
1061  exit(-1);
1062  }
1063  childrenIds.push_back(value);
1064  childrenSockets.push_back(sockets[0]);
1065  childrenPipes.push_back(pipes[0]);
1066  }
1067 
1068  if(childIndex < kMaxChildren) {
1069  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1070  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1071 
1072  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1073  input_->doPostForkReacquireResources(receiver);
1074  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1075  //NOTE: sources have to reset themselves by listening to the post fork message
1076  //rewindInput();
1077  return true;
1078  }
1079  jobReport->parentAfterFork(jobReportFile);
1080  }
1081 
1082  //this is the original, which is now the master for all the children
1083 
1084  //Need to wait for signals from the children or externally
1085  // To wait we must
1086  // 1) block the signals we want to wait on so we do not have a race condition
1087  // 2) check that we haven't already meet our ending criteria
1088  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1089  sigset_t blockingSigSet;
1090  sigset_t unblockingSigSet;
1091  sigset_t oldSigSet;
1092  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1093  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1094  sigaddset(&blockingSigSet, SIGCHLD);
1095  sigaddset(&blockingSigSet, SIGUSR2);
1096  sigaddset(&blockingSigSet, SIGINT);
1097  sigdelset(&unblockingSigSet, SIGCHLD);
1098  sigdelset(&unblockingSigSet, SIGUSR2);
1099  sigdelset(&unblockingSigSet, SIGINT);
1100  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1101 
1102  // If there are too many fd's (unlikely, but possible) for select, denote this
1103  // because the sender will fail.
1104  bool too_many_fds = false;
1105  if (pipes[1]+1 > FD_SETSIZE) {
1106  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1107  too_many_fds = true;
1108  }
1109 
1110  //create a thread that sends the units of work to workers
1111  // we create it after all signals were blocked so that this
1112  // thread is never interupted by a signal
1113  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1114  boost::thread senderThread(sender);
1115 
1116  if(not too_many_fds) {
1117  //NOTE: a child could have failed before we got here and even after this call
1118  // which is why the 'if' is conditional on continueAfterChildFailure_
1119  possiblyContinueAfterForkChildFailure();
1120  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1121  sigsuspend(&unblockingSigSet);
1122  possiblyContinueAfterForkChildFailure();
1123  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1124  }
1125  }
1126  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1127 
1128  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1129  if(child_failed) {
1130  LogError("ForkingStopping") << "child failed";
1131  }
1132  if(shutdown_flag) {
1133  LogSystem("ForkingStopping") << "asked to shutdown";
1134  }
1135 
1136  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1137  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1138  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1139  it != itEnd; ++it) {
1140  /* int result = */ kill(*it, SIGUSR2);
1141  }
1142  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1143  while(num_children_done != kMaxChildren) {
1144  sigsuspend(&unblockingSigSet);
1145  }
1146  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1147  }
1148  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1149  senderThread.join();
1150  if(child_failed && !continueAfterChildFailure_) {
1151  if (child_fail_signal) {
1152  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1153  } else if (child_fail_exit_status) {
1154  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1155  } else {
1156  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1157  }
1158  }
1159  if(too_many_fds) {
1160  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1161  }
1162  return false;
1163  }
1164 
1165  std::vector<ModuleDescription const*>
1166  EventProcessor::getAllModuleDescriptions() const {
1167  return schedule_->getAllModuleDescriptions();
1168  }
1169 
1170  int
1171  EventProcessor::totalEvents() const {
1172  return schedule_->totalEvents();
1173  }
1174 
1175  int
1176  EventProcessor::totalEventsPassed() const {
1177  return schedule_->totalEventsPassed();
1178  }
1179 
1180  int
1181  EventProcessor::totalEventsFailed() const {
1182  return schedule_->totalEventsFailed();
1183  }
1184 
1185  void
1186  EventProcessor::enableEndPaths(bool active) {
1187  schedule_->enableEndPaths(active);
1188  }
1189 
1190  bool
1191  EventProcessor::endPathsEnabled() const {
1192  return schedule_->endPathsEnabled();
1193  }
1194 
1195  void
1196  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1197  schedule_->getTriggerReport(rep);
1198  }
1199 
1200  void
1201  EventProcessor::clearCounters() {
1202  schedule_->clearCounters();
1203  }
1204 
1205 
// Builds and starts the statemachine::Machine used by runToCompletion.
//
// fileMode_ selects the file-merging mode:
//   ""            -> statemachine::FULLMERGE
//   "NOMERGE"     -> statemachine::NOMERGE
//   "FULLMERGE"   -> statemachine::FULLMERGE
// emptyRunLumiMode_ selects the empty run/lumi handling mode:
//   "" or "handleEmptyRunsAndLumis" -> statemachine::handleEmptyRunsAndLumis
//   "handleEmptyRuns"               -> statemachine::handleEmptyRuns
//   "doNotHandleEmptyRunsAndLumis"  -> statemachine::doNotHandleEmptyRunsAndLumis
// Any other value of either string throws an errors::Configuration Exception
// that lists the legal values.
// The machine is constructed with `this` and the two modes, and initiate() is
// called on it before ownership is returned to the caller.
// NOTE(review): listing line 1208 (presumably the declaration of the local
// `fileMode`) is absent from this rendering.
1206  std::auto_ptr<statemachine::Machine>
1207  EventProcessor::createStateMachine() {
1209  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1210  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1211  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1212  else {
1213  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1214  << fileMode_ << ".\n"
1215  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1216  }
1217 
1218  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1219  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1220  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1221  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1222  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1223  else {
1224  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1225  << emptyRunLumiMode_ << ".\n"
1226  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1227  }
1228 
1229  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1230  fileMode,
1231  emptyRunLumiMode));
1232 
// Start the machine (enter its initial state) before handing it out.
1233  machine->initiate();
1234  return machine;
1235  }
1236 
1237  bool
1238  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1239  bool returnValue = false;
1240 
1241  // Look for a shutdown signal
1242  if(shutdown_flag.load(std::memory_order_acquire)) {
1243  returnValue = true;
1244  returnCode = epSignal;
1245  }
1246  return returnValue;
1247  }
1248 
1249 
// Runs the job to completion.
// Sequence:
//   1. Calls beginJob().
//   2. Builds the state machine via createStateMachine().
//   3. Loops: asks the input source for the next item type and feeds the
//      matching statemachine event (Event/Stop/File/Run/Lumi) to the machine.
//      The loop ends when the machine reports terminated() or a stop is
//      requested.
//
// Return value:
//   - epSuccess normally.
//   - The code set by checkForAsyncStopRequest() (epSignal) on a shutdown
//     request.
//   - asyncStopStatusCodeFromProcessingEvents_ when the stop was requested
//     while events were being processed.
//
// Exceptions:
//   - A cms::Exception is rethrown after the machine has been terminated
//     (with alreadyHandlingException_ set) and any messages collected in
//     exceptionMessageLumis_/Runs_/Files_ have been attached to it.
//   - Throws cms::Exception("BadState") if the machine passed through its
//     Error state (stateMachineWasInErrorState_).
//
// NOTE(review): listing lines 1250 (the return type) and 1328/1422 (the first
// line of the two "report this error" statements) are absent from this
// rendering.
1251  EventProcessor::runToCompletion() {
1252 
1253  StatusCode returnCode=epSuccess;
1254  asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1255  std::auto_ptr<statemachine::Machine> machine;
1256  {
1257  beginJob(); //make sure this was called
1258 
1259  //StatusCode returnCode = epSuccess;
1260  stateMachineWasInErrorState_ = false;
1261 
1262  // make the services available
1263  ServiceRegistry::Operate operate(serviceToken_);
1264 
1265  machine = createStateMachine();
1266  nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1267  asyncStopRequestedWhileProcessingEvents_=false;
1268  try {
1269  convertException::wrap([&]() {
1270 
1271  InputSource::ItemType itemType;
1272 
1273  while(true) {
1274 
1275  bool more = true;
// With forked children the source may skip input first.
// skipForForking() returning false is treated as IsStop.
// If skipping grew the product registry, the principal cache is
// resynchronised before continuing.
1276  if(numberOfForkedChildren_ > 0) {
1277  size_t size = preg_->size();
1278  more = input_->skipForForking();
1279  if(more) {
1280  if(size < preg_->size()) {
1281  principalCache_.adjustIndexesAfterProductRegistryAddition();
1282  }
1283  principalCache_.adjustEventsToNewProductRegistry(preg_);
1284  }
1285  }
1286  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1287 
1288  FDEBUG(1) << "itemType = " << itemType << "\n";
1289 
// A pending shutdown request wins over whatever the source returned.
// forceLooperToEnd_ is held true only while the Stop event is processed.
1290  if(checkForAsyncStopRequest(returnCode)) {
1291  forceLooperToEnd_ = true;
1292  machine->process_event(statemachine::Stop());
1293  forceLooperToEnd_ = false;
1294  break;
1295  }
1296 
// Event processing may consume several events.
// The item type that ended it is left in nextItemTypeFromProcessingEvents_
// and becomes the item dispatched below.
1297  if(itemType == InputSource::IsEvent) {
1298  machine->process_event(statemachine::Event());
1299  if(asyncStopRequestedWhileProcessingEvents_) {
1300  forceLooperToEnd_ = true;
1301  machine->process_event(statemachine::Stop());
1302  forceLooperToEnd_ = false;
1303  returnCode = asyncStopStatusCodeFromProcessingEvents_;
1304  break;
1305  }
1306  itemType = nextItemTypeFromProcessingEvents_;
1307  }
1308 
// Events were already handed to the machine above.
// If the follow-up type is still IsEvent there is nothing more to dispatch
// in this iteration.
1309  if(itemType == InputSource::IsEvent) {
1310  }
1311  else if(itemType == InputSource::IsStop) {
1312  machine->process_event(statemachine::Stop());
1313  }
1314  else if(itemType == InputSource::IsFile) {
1315  machine->process_event(statemachine::File());
1316  }
1317  else if(itemType == InputSource::IsRun) {
1318  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1319  }
1320  else if(itemType == InputSource::IsLumi) {
1321  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1322  }
1323  else if(itemType == InputSource::IsSynchronize) {
1324  //For now, we don't have to do anything
1325  }
1326  // This should be impossible
1327  else {
1329  << "Unknown next item type passed to EventProcessor\n"
1330  << "Please report this error to the Framework group\n";
1331  }
1332  if(machine->terminated()) {
1333  break;
1334  }
1335  } // End of loop over state machine events
1336  }); // convertException::wrap
1337  } // Try block
1338  // Some comments on exception handling related to the boost state machine:
1339  //
1340  // Some states used in the machine are special because they
1341  // perform actions while the machine is being terminated, actions
1342  // such as close files, call endRun, call endLumi etc ... Each of these
1343  // states has two functions that perform these actions. The functions
1344  // are almost identical. The major difference is that one version
1345  // catches all exceptions and the other lets exceptions pass through.
1346  // The destructor catches them and the other function named "exit" lets
1347  // them pass through. On a normal termination, boost will always call
1348  // "exit" and then the state destructor. In our state classes, the
1349  // the destructors do nothing if the exit function already took
1350  // care of things. Here's the interesting part. When boost is
1351  // handling an exception the "exit" function is not called (a boost
1352  // feature).
1353  //
1354  // If an exception occurs while the boost machine is in control
1355  // (which usually means inside a process_event call), then
1356  // the boost state machine destroys its states and "terminates" itself.
1357  // This already done before we hit the catch blocks below. In this case
1358  // the call to terminateMachine below only destroys an already
1359  // terminated state machine. Because exit is not called, the state destructors
1360  // handle cleaning up lumis, runs, and files. The destructors swallow
1361  // all exceptions and only pass through the exceptions messages, which
1362  // are tacked onto the original exception below.
1363  //
1364  // If an exception occurs when the boost state machine is not
1365  // in control (outside the process_event functions), then boost
1366  // cannot destroy its own states. The terminateMachine function
1367  // below takes care of that. The flag "alreadyHandlingException"
1368  // is set true so that the state exit functions do nothing (and
1369  // cannot throw more exceptions while handling the first). Then the
1370  // state destructors take care of this because exit did nothing.
1371  //
1372  // In both cases above, the EventProcessor::endOfLoop function is
1373  // not called because it can throw exceptions.
1374  //
1375  // One tricky aspect of the state machine is that things that can
1376  // throw should not be invoked by the state machine while another
1377  // exception is being handled.
1378  // Another tricky aspect is that it appears to be important to
1379  // terminate the state machine before invoking its destructor.
1380  // We've seen crashes that are not understood when that is not
1381  // done. Maintainers of this code should be careful about this.
1382 
1383  catch (cms::Exception & e) {
1384  alreadyHandlingException_ = true;
1385  terminateMachine(machine);
1386  alreadyHandlingException_ = false;
// Attach the messages swallowed by the state destructors.
// They are also logged if the original exception was already printed.
1387  if (!exceptionMessageLumis_.empty()) {
1388  e.addAdditionalInfo(exceptionMessageLumis_);
1389  if (e.alreadyPrinted()) {
1390  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1391  }
1392  }
1393  if (!exceptionMessageRuns_.empty()) {
1394  e.addAdditionalInfo(exceptionMessageRuns_);
1395  if (e.alreadyPrinted()) {
1396  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1397  }
1398  }
1399  if (!exceptionMessageFiles_.empty()) {
1400  e.addAdditionalInfo(exceptionMessageFiles_);
1401  if (e.alreadyPrinted()) {
1402  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1403  }
1404  }
1405  throw;
1406  }
1407 
1408  if(machine->terminated()) {
1409  FDEBUG(1) << "The state machine reports it has been terminated\n";
1410  machine.reset();
1411  }
1412 
1413  if(stateMachineWasInErrorState_) {
1414  throw cms::Exception("BadState")
1415  << "The boost state machine in the EventProcessor exited after\n"
1416  << "entering the Error state.\n";
1417  }
1418 
1419  }
// Reaching here with a live machine means the loop exited without the
// machine terminating: clean it up, then report the inconsistency.
1420  if(machine.get() != 0) {
1421  terminateMachine(machine);
1423  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1424  << "Please report this error to the Framework group\n";
1425  }
1426 
1427  return returnCode;
1428  }
1429 
1430  void EventProcessor::readFile() {
1431  FDEBUG(1) << " \treadFile\n";
1432  size_t size = preg_->size();
1433  fb_ = input_->readFile();
1434  if(size < preg_->size()) {
1435  principalCache_.adjustIndexesAfterProductRegistryAddition();
1436  }
1437  principalCache_.adjustEventsToNewProductRegistry(preg_);
1438  if((numberOfForkedChildren_ > 0) or
1439  (preallocations_.numberOfStreams()>1 and
1440  preallocations_.numberOfThreads()>1)) {
1441  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1442  }
1443  }
1444 
1445  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1446  if (fb_.get() != nullptr) {
1447  input_->closeFile(fb_.get(), cleaningUpAfterException);
1448  }
1449  FDEBUG(1) << "\tcloseInputFile\n";
1450  }
1451 
1452  void EventProcessor::openOutputFiles() {
1453  if (fb_.get() != nullptr) {
1454  schedule_->openOutputFiles(*fb_);
1455  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1456  }
1457  FDEBUG(1) << "\topenOutputFiles\n";
1458  }
1459 
1460  void EventProcessor::closeOutputFiles() {
1461  if (fb_.get() != nullptr) {
1462  schedule_->closeOutputFiles();
1463  if(hasSubProcess()) subProcess_->closeOutputFiles();
1464  }
1465  FDEBUG(1) << "\tcloseOutputFiles\n";
1466  }
1467 
1468  void EventProcessor::respondToOpenInputFile() {
1469  if(hasSubProcess()) {
1470  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1471  }
1472  if (fb_.get() != nullptr) {
1473  schedule_->respondToOpenInputFile(*fb_);
1474  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1475  }
1476  FDEBUG(1) << "\trespondToOpenInputFile\n";
1477  }
1478 
1479  void EventProcessor::respondToCloseInputFile() {
1480  if (fb_.get() != nullptr) {
1481  schedule_->respondToCloseInputFile(*fb_);
1482  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1483  }
1484  FDEBUG(1) << "\trespondToCloseInputFile\n";
1485  }
1486 
1487  void EventProcessor::startingNewLoop() {
1488  shouldWeStop_ = false;
1489  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1490  // until after we've called beginOfJob
1491  if(looper_ && looperBeginJobRun_) {
1492  looper_->doStartingNewLoop();
1493  }
1494  FDEBUG(1) << "\tstartingNewLoop\n";
1495  }
1496 
1497  bool EventProcessor::endOfLoop() {
1498  if(looper_) {
1499  ModuleChanger changer(schedule_.get(),preg_.get());
1500  looper_->setModuleChanger(&changer);
1501  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1502  looper_->setModuleChanger(nullptr);
1503  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1504  else return false;
1505  }
1506  FDEBUG(1) << "\tendOfLoop\n";
1507  return true;
1508  }
1509 
1510  void EventProcessor::rewindInput() {
1511  input_->repeat();
1512  input_->rewind();
1513  FDEBUG(1) << "\trewind\n";
1514  }
1515 
1516  void EventProcessor::prepareForNextLoop() {
1517  looper_->prepareForNextLoop(esp_.get());
1518  FDEBUG(1) << "\tprepareForNextLoop\n";
1519  }
1520 
1521  bool EventProcessor::shouldWeCloseOutput() const {
1522  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1523  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1524  }
1525 
1526  void EventProcessor::doErrorStuff() {
1527  FDEBUG(1) << "\tdoErrorStuff\n";
1528  LogError("StateMachine")
1529  << "The EventProcessor state machine encountered an unexpected event\n"
1530  << "and went to the error state\n"
1531  << "Will attempt to terminate processing normally\n"
1532  << "(IF using the looper the next loop will be attempted)\n"
1533  << "This likely indicates a bug in an input module or corrupted input or both\n";
1534  stateMachineWasInErrorState_ = true;
1535  }
1536 
// Begin-run transition for the run identified by `run`.
// Order of operations:
//   1. The source's doBeginRun on the cached RunPrincipal.
//   2. The EventSetup is synchronised to (run, lumi 0, event 0) at the run's
//      begin time. The ES cache is first cleared if forceESCacheClearOnNewRun_
//      is set.
//   3. On the first run only, the looper gets copyInfo + beginOfJob +
//      doStartingNewLoop (see looperBeginJobRun_).
//   4. Global begin-run in the schedule, then the subprocess.
//   5. The looper's doBeginRun.
//   6. Per-stream begin-run for every preallocated stream (schedule, then
//      subprocess).
// NOTE(review): listing lines 1554 and 1565 (presumably the `Traits` typedefs
// used by processOneGlobal/processOneStream) are absent from this rendering.
1537  void EventProcessor::beginRun(statemachine::Run const& run) {
1538  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1539  input_->doBeginRun(runPrincipal, &processContext_);
1540  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1541  runPrincipal.beginTime());
1542  if(forceESCacheClearOnNewRun_){
1543  espController_->forceCacheClear();
1544  }
1545  espController_->eventSetupForInstance(ts);
1546  EventSetup const& es = esp_->eventSetup();
// The looper's beginOfJob is deferred until the first run, because it needs
// a valid EventSetup.
1547  if(looper_ && looperBeginJobRun_== false) {
1548  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1549  looper_->beginOfJob(es);
1550  looperBeginJobRun_ = true;
1551  looper_->doStartingNewLoop();
1552  }
1553  {
1555  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1556  if(hasSubProcess()) {
1557  subProcess_->doBeginRun(runPrincipal, ts);
1558  }
1559  }
1560  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1561  if(looper_) {
1562  looper_->doBeginRun(runPrincipal, es, &processContext_);
1563  }
1564  {
1566  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1567  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1568  if(hasSubProcess()) {
1569  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1570  }
1571  }
1572  }
1573  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1574  if(looper_) {
1575  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1576  }
1577  }
1578 
// End-run transition; the mirror image of beginRun.
// Order of operations:
//   1. The source's doEndRun.
//   2. The EventSetup is synchronised to (run, max lumi, max event) at the
//      run's end time.
//   3. Per-stream end-run for every stream.
//   4. Global end-run in the schedule, then the subprocess.
//   5. The looper's doEndRun.
// cleaningUpAfterException is forwarded to every stage.
// NOTE(review): listing lines 1588 and 1600 (presumably the `Traits` typedefs)
// are absent from this rendering.
1579  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1580  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1581  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1582  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1583  runPrincipal.endTime());
1584  espController_->eventSetupForInstance(ts);
1585  EventSetup const& es = esp_->eventSetup();
// Streams finish the run before the global end-run is processed.
1586  {
1587  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1589  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1590  if(hasSubProcess()) {
1591  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1592  }
1593  }
1594  }
1595  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1596  if(looper_) {
1597  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1598  }
1599  {
1601  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1602  if(hasSubProcess()) {
1603  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1604  }
1605  }
1606  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1607  if(looper_) {
1608  looper_->doEndRun(runPrincipal, es, &processContext_);
1609  }
1610  }
1611 
// Begin-luminosity-block transition for (phid, run, lumi).
// Order of operations:
//   1. The source's doBeginLumi.
//   2. If an `rng` service is available, its preBeginLumi hook is called
//      with a LuminosityBlock view.
//   3. The EventSetup is synchronised to (run, lumi, event 0) at the lumi's
//      begin time.
//   4. Global begin-lumi in the schedule, then the subprocess.
//   5. The looper's doBeginLuminosityBlock.
//   6. Per-stream begin-lumi for every stream.
// NOTE(review): listing line 1616 (presumably the declaration of `rng`) and
// lines 1628/1640 (presumably the `Traits` typedefs) are absent from this
// rendering.
1612  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1613  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1614  input_->doBeginLumi(lumiPrincipal, &processContext_);
1615 
1617  if(rng.isAvailable()) {
1618  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1619  rng->preBeginLumi(lb);
1620  }
1621 
1622  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1623  // lumi blocks know their start and end times why not also start and end events?
1624  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1625  espController_->eventSetupForInstance(ts);
1626  EventSetup const& es = esp_->eventSetup();
1627  {
1629  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1630  if(hasSubProcess()) {
1631  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1632  }
1633  }
1634  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1635  if(looper_) {
1636  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1637  }
1638  {
1639  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1641  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1642  if(hasSubProcess()) {
1643  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1644  }
1645  }
1646  }
1647  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1648  if(looper_) {
1649  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1650  }
1651  }
1652 
// End-luminosity-block transition; the mirror image of beginLumi.
// Order of operations:
//   1. The source's doEndLumi.
//   2. The EventSetup is synchronised to (run, lumi, max event) at the lumi's
//      end time.
//   3. Per-stream end-lumi for every stream.
//   4. Global end-lumi in the schedule, then the subprocess.
//   5. The looper's doEndLuminosityBlock.
// cleaningUpAfterException is forwarded to every stage.
// NOTE(review): listing lines 1664 and 1676 (presumably the `Traits` typedefs)
// are absent from this rendering.
1653  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1654  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1655  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1656  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1657  // lumi blocks know their start and end times why not also start and end events?
1658  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1659  lumiPrincipal.endTime());
1660  espController_->eventSetupForInstance(ts);
1661  EventSetup const& es = esp_->eventSetup();
1662  {
1663  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1665  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1666  if(hasSubProcess()) {
1667  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1668  }
1669  }
1670  }
// NOTE(review): this stream-level trace prints "endLumi", the same text as the
// global trace below. endRun uses "streamEndRun" at the matching point, so
// "streamEndLumi" was probably intended.
1671  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1672  if(looper_) {
1673  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1674  }
1675  {
1677  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1678  if(hasSubProcess()) {
1679  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1680  }
1681  }
1682  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1683  if(looper_) {
1684  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1685  }
1686  }
1687 
// Reads the source's current run into a new RunPrincipal and inserts it into
// the principal cache.
// If the cache already holds a run principal, an "Illegal attempt to insert
// run into cache" error is reported. The statement start is not visible here;
// presumably it is a throw.
// Returns the statemachine::Run key built from the new principal's reduced
// process-history ID and the source's run number. In debug builds it asserts
// that the source and the principal agree on that history ID.
// NOTE(review): listing line 1690 (start of the error statement) is absent
// from this rendering.
1688  statemachine::Run EventProcessor::readRun() {
1689  if (principalCache_.hasRunPrincipal()) {
1691  << "EventProcessor::readRun\n"
1692  << "Illegal attempt to insert run into cache\n"
1693  << "Contact a Framework Developer\n";
1694  }
1695  boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(input_->runAuxiliary(),
1696  preg_,
1697  *processConfiguration_,
1698  historyAppender_.get(),
1699  0));
1700  input_->readRun(*rp, *historyAppender_);
1701  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1702  principalCache_.insert(rp);
1703  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1704  }
1705 
1706  statemachine::Run EventProcessor::readAndMergeRun() {
1707  principalCache_.merge(input_->runAuxiliary(), preg_);
1708  auto runPrincipal =principalCache_.runPrincipalPtr();
1709  input_->readAndMergeRun(*runPrincipal);
1710  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1711  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1712  }
1713 
// Reads the source's current luminosity block into a new
// LuminosityBlockPrincipal.
// The new principal is linked to the cached run principal and inserted into
// the principal cache.
// Reports an error (presumably a throw; the statement start is not visible)
// in two cases:
//   - a lumi principal is already cached;
//   - no run principal is cached.
// Returns the source's luminosity block number.
// NOTE(review): listing lines 1716 and 1722 (start of the error statements)
// are absent from this rendering.
// NOTE(review): both error messages name "EventProcessor::readRun" although
// this is readLuminosityBlock; looks like a copy-paste slip.
1714  int EventProcessor::readLuminosityBlock() {
1715  if (principalCache_.hasLumiPrincipal()) {
1717  << "EventProcessor::readRun\n"
1718  << "Illegal attempt to insert lumi into cache\n"
1719  << "Contact a Framework Developer\n";
1720  }
1721  if (!principalCache_.hasRunPrincipal()) {
1723  << "EventProcessor::readRun\n"
1724  << "Illegal attempt to insert lumi into cache\n"
1725  << "Run is invalid\n"
1726  << "Contact a Framework Developer\n";
1727  }
1728  boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(input_->luminosityBlockAuxiliary(),
1729  preg_,
1730  *processConfiguration_,
1731  historyAppender_.get(),
1732  0));
1733  input_->readLuminosityBlock(*lbp, *historyAppender_);
1734  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1735  principalCache_.insert(lbp);
1736  return input_->luminosityBlock();
1737  }
1738 
1739  int EventProcessor::readAndMergeLumi() {
1740  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1741  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1742  return input_->luminosityBlock();
1743  }
1744 
1745  void EventProcessor::writeRun(statemachine::Run const& run) {
1746  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1747  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1748  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1749  }
1750 
1751  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1752  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1753  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1754  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1755  }
1756 
1757  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1758  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1759  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1760  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1761  }
1762 
1763  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1764  principalCache_.deleteLumi(phid, run, lumi);
1765  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1766  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1767  }
1768 
// TBB task that drives event processing for a single stream.
// execute() runs EventProcessor::processEventsForStreamAsync for m_streamID,
// sharing the m_finishedProcessingEvents flag with the other streams.
// It then decrements m_waitTask's reference count, so a waiting task whose
// ref count was set to 1+#streams can tell when all streams are done.
// NOTE(review): listing lines 1771 (the constructor's first line, presumably
// taking the EventProcessor* `iProc`) and 1786 (presumably the `m_proc` member
// declaration) are absent from this rendering.
1769  class StreamProcessingTask : public tbb::task {
1770  public:
1772  unsigned int iStreamIndex,
1773  std::atomic<bool>* iFinishedProcessingEvents,
1774  tbb::task* iWaitTask):
1775  m_proc(iProc),
1776  m_streamID(iStreamIndex),
1777  m_finishedProcessingEvents(iFinishedProcessingEvents),
1778  m_waitTask(iWaitTask){}
1779 
1780  tbb::task* execute() {
1781  m_proc->processEventsForStreamAsync(m_streamID,m_finishedProcessingEvents);
// processEventsForStreamAsync catches every exception itself, so this
// decrement is always reached.
1782  m_waitTask->decrement_ref_count();
1783  return nullptr;
1784  }
1785  private:
1787  unsigned int m_streamID;
1788  std::atomic<bool>* m_finishedProcessingEvents;
1789  tbb::task* m_waitTask;
1790  };
1791 
// Event loop for one stream, run from a StreamProcessingTask.
// Stream 0 first processes the event already read into it (processEvent(0)).
// Each stream then repeatedly, under nextTransitionMutex_:
//   - reads the next item type from the source;
//   - calls readEvent into its own slot, then processes that event.
// The loop ends when any of these happens:
//   - shouldWeStop() is true;
//   - another stream recorded an exception;
//   - finishedProcessingEvents is already set;
//   - the source returns a non-event item. The item type is stored in
//     nextItemTypeFromProcessingEvents_ and finishedProcessingEvents is set
//     so the other streams stop too;
//   - an async stop is requested. The result is stored in
//     asyncStopRequestedWhileProcessingEvents_ and its code in
//     asyncStopStatusCodeFromProcessingEvents_.
// Exceptions never escape: the first one is captured into
// deferredExceptionPtr_, guarded by a compare-exchange on
// deferredExceptionPtrIsSet_. Later ones are deliberately dropped.
// NOTE(review): listing line 1798 (presumably the declaration of `handler`) is
// absent from this rendering.
1792  void EventProcessor::processEventsForStreamAsync(unsigned int iStreamIndex,
1793  std::atomic<bool>* finishedProcessingEvents) {
1794  try {
1795  // make the services available
1796  ServiceRegistry::Operate operate(serviceToken_);
1797  if(preallocations_.numberOfStreams()>1) {
1799  handler->initializeThisThreadForUse();
1800  }
1801 
1802  if(iStreamIndex==0) {
1803  processEvent(0);
1804  }
1805  do {
1806  if(shouldWeStop()) {
1807  break;
1808  }
1809  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1810  //another thread hit an exception
1811  //std::cerr<<"another thread saw an exception\n";
1812  break;
1813  }
1814  {
1815 
1816 
// The braces below are plain scopes, not loops.
// Each `break` inside them leaves the do-while and releases sourceGuard and
// delayedReaderGuard on the way out.
1817  {
1818  //nextItemType and readEvent need to be in same critical section
1819  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1820 
1821  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1822  //std::cerr<<"finishedProcessingEvents\n";
1823  break;
1824  }
1825 
1826  //If source and DelayedReader share a resource we must serialize them
1827  auto sr = input_->resourceSharedWithDelayedReader();
1828  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1829  if(sr) {
1830  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1831  }
1832 
1833  InputSource::ItemType itemType = input_->nextItemType();
1834  if (InputSource::IsEvent !=itemType) {
1835  nextItemTypeFromProcessingEvents_ = itemType;
1836  finishedProcessingEvents->store(true,std::memory_order_release);
1837  //std::cerr<<"next item type "<<itemType<<"\n";
1838  break;
1839  }
// Intentional assignment inside the condition: the result is recorded for
// runToCompletion as well as tested here.
1840  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1841  //std::cerr<<"task told to async stop\n";
1842  break;
1843  }
1844  readEvent(iStreamIndex);
1845  }
1846  }
1847  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1848  //another thread hit an exception
1849  //std::cerr<<"another thread saw an exception\n";
1850  break;
1851  }
// Processing happens outside the source lock, so streams can run in parallel.
1852  processEvent(iStreamIndex);
1853  }while(true);
1854  } catch (...) {
1855  bool expected =false;
1856  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1857  deferredExceptionPtr_ = std::current_exception();
1858  }
1859  //std::cerr<<"task caught exception\n";
1860  }
1861  }
1862 
// Reads the current event into stream 0, then processes events concurrently on
// all streams. Each stream runs its own StreamProcessingTask, and this thread
// blocks until every task has finished. The first exception recorded by any
// stream is rethrown here.
// With forked children, a single event is read and processed serially on stream 0.
1863  void EventProcessor::readAndProcessEvent() {
1864  if(numberOfForkedChildren_>0) {
1865  readEvent(0);
1866  processEvent(0);
1867  return;
1868  }
1869  nextItemTypeFromProcessingEvents_ = InputSource::IsEvent; //needed for looper
1870  asyncStopRequestedWhileProcessingEvents_ = false;
1871 
// Shared by all stream tasks; set by whichever stream first sees a non-event item.
1872  std::atomic<bool> finishedProcessingEvents{false};
1873 
1874  //Task assumes Stream 0 has already read the event that caused us to go here
1875  readEvent(0);
1876 
1877  //To wait, the ref count has to be 1+#streams
// That is one reference for the waiting thread plus one per StreamProcessingTask.
// Each task decrements its reference when it finishes, and spawn_and_wait_for_all
// returns once only the waiting thread's reference remains.
1878  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1879  eventLoopWaitTask->increment_ref_count();
1880 
// NOTE(review): kNumStreams-1 assumes at least one stream; with zero streams the
// unsigned subtraction would wrap around.
1881  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1882  unsigned int iStreamIndex = 0;
// Streams 0..kNumStreams-2 are enqueued for other TBB threads.
1883  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1884  eventLoopWaitTask->increment_ref_count();
1885  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1886 
1887  }
// The last stream (iStreamIndex == kNumStreams-1) is spawned from this thread,
// which then waits for all streams to finish.
1888  eventLoopWaitTask->increment_ref_count();
1889  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1890  tbb::task::destroy(*eventLoopWaitTask);
1891 
1892  //One of the processing threads saw an exception
1893  if(deferredExceptionPtrIsSet_) {
1894  std::rethrow_exception(deferredExceptionPtr_);
1895  }
1896  }
1897  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1898  //TODO this will have to become per stream
1899  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1900  StreamContext streamContext(event.streamID(), &processContext_);
1901  input_->readEvent(event, streamContext);
1902  FDEBUG(1) << "\treadEvent\n";
1903  }
// Processes the event currently held in stream iStreamIndex's EventPrincipal:
//  - attaches the current luminosity block,
//  - passes the event to the random-number service when it is available,
//  - runs the schedule and, if present, the subprocess,
//  - gives the looper (if any) a chance to act, possibly repositioning the input source,
//  - finally clears the EventPrincipal so it can be reused.
// NOTE(review): source lines 1907 (declaration of `rng`) and 1921 (definition of
// `Traits`) are missing from this listing.
1904  void EventProcessor::processEvent(unsigned int iStreamIndex) {
1905  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1906  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1908  if(rng.isAvailable()) {
1909  Event ev(*pep, ModuleDescription(), nullptr);
1910  rng->postEventRead(ev);
1911  }
// The event must belong to the run/lumi currently held by the principal cache.
1912  assert(pep->luminosityBlockPrincipalPtrValid());
1913  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1914  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1915 
1916  //We can only update IOVs on Lumi boundaries
1917  //IOVSyncValue ts(pep->id(), pep->time());
1918  //espController_->eventSetupForInstance(ts);
1919  EventSetup const& es = esp_->eventSetup();
1920  {
1922  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
1923  if(hasSubProcess()) {
1924  subProcess_->doEvent(*pep);
1925  }
1926  }
1927 
1928  //NOTE: If we have a looper we only have one Stream
1929  if(looper_) {
1930  bool randomAccess = input_->randomAccess();
1931  ProcessingController::ForwardState forwardState = input_->forwardState();
1932  ProcessingController::ReverseState reverseState = input_->reverseState();
1933  ProcessingController pc(forwardState, reverseState, randomAccess);
1934 
1935  EDLooperBase::Status status = EDLooperBase::kContinue;
// The looper is called again on the same event whenever the transition it
// requested (moving to a specified event) fails.
1936  do {
1937 
1938  StreamContext streamContext(pep->streamID(), &processContext_);
1939  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
1940 
1941  bool succeeded = true;
1942  if(randomAccess) {
1943  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
// NOTE(review): presumably -2 because the current event has already been
// consumed from the source; confirm against InputSource::skipEvents.
1944  input_->skipEvents(-2);
1945  }
1946  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1947  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1948  }
1949  }
1950  pc.setLastOperationSucceeded(succeeded);
1951  } while(!pc.lastOperationSucceeded());
// Any status other than kContinue makes shouldWeStop() return true from now on.
1952  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1953 
1954  }
1955 
1956  FDEBUG(1) << "\tprocessEvent\n";
1957  pep->clearEventPrincipal();
1958  }
1959 
1960  bool EventProcessor::shouldWeStop() const {
1961  FDEBUG(1) << "\tshouldWeStop\n";
1962  if(shouldWeStop_) return true;
1963  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
1964  }
1965 
1966  void EventProcessor::setExceptionMessageFiles(std::string& message) {
1967  exceptionMessageFiles_ = message;
1968  }
1969 
1970  void EventProcessor::setExceptionMessageRuns(std::string& message) {
1971  exceptionMessageRuns_ = message;
1972  }
1973 
1974  void EventProcessor::setExceptionMessageLumis(std::string& message) {
1975  exceptionMessageLumis_ = message;
1976  }
1977 
1978  bool EventProcessor::alreadyHandlingException() const {
1979  return alreadyHandlingException_;
1980  }
1981 
1982  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
1983  if(iMachine.get() != 0) {
1984  if(!iMachine->terminated()) {
1985  forceLooperToEnd_ = true;
1986  iMachine->process_event(statemachine::Stop());
1987  forceLooperToEnd_ = false;
1988  }
1989  else {
1990  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
1991  }
1992  if(iMachine->terminated()) {
1993  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
1994  }
1995  iMachine.reset();
1996  }
1997  }
1998 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
std::atomic< bool > * m_finishedProcessingEvents
edm::EventID specifiedEventTransition() const
StreamProcessingTask(EventProcessor *iProc, unsigned int iStreamIndex, std::atomic< bool > *iFinishedProcessingEvents, tbb::task *iWaitTask)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:60
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:550
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
Timestamp const & endTime() const
void call(boost::function< void(void)>)
processConfiguration
Definition: Schedule.cc:369
def pipe
Definition: pipe.py:5
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:61
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
Definition: EventID.h:31
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
static std::string const input
Definition: EdmProvDump.cc:44
boost::shared_ptr< edm::ParameterSet > parameterSet()
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
static InputSourceFactory const * get()
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:42
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:58
virtual void endOfJob()
Definition: EDLooperBase.cc:93
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
std::auto_ptr< Schedule > schedule_
areg
Definition: Schedule.cc:369
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
boost::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::RunNumber_t runNumber() const
Definition: EPStates.h:49
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:48
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
Definition: EventRange.h:32
#define O_NONBLOCK
Definition: SysFile.h:21
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:57
tuple status
Definition: ntuplemaker.py:245
preg
Definition: Schedule.cc:369
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:14
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:59
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)