CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
9 
34 
36 
44 
49 
58 
59 #include "MessageForSource.h"
60 #include "MessageForParent.h"
61 
62 #include "boost/bind.hpp"
63 #include "boost/thread/xtime.hpp"
64 
65 #include <exception>
66 #include <iomanip>
67 #include <iostream>
68 #include <utility>
69 #include <sstream>
70 
71 #include <sys/ipc.h>
72 #include <sys/msg.h>
73 
74 //Used for forking
75 #include <sys/types.h>
76 #include <sys/wait.h>
77 #include <sys/socket.h>
78 #include <sys/select.h>
79 #include <sys/fcntl.h>
80 #include <unistd.h>
81 
82 //Used for CPU affinity
83 #ifndef __APPLE__
84 #include <sched.h>
85 #endif
86 
87 //Needed for introspection
88 #include "Cintex/Cintex.h"
89 
90 namespace edm {
91 
92  // ---------------------------------------------------------------
93  std::unique_ptr<InputSource>
95  CommonParams const& common,
97  boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
98  boost::shared_ptr<ActivityRegistry> areg,
99  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
100  PreallocationConfiguration const& allocations) {
101  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
102  if(main_input == 0) {
104  << "There must be exactly one source in the configuration.\n"
105  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
106  }
107 
108  std::string modtype(main_input->getParameter<std::string>("@module_type"));
109 
110  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
112  ConfigurationDescriptions descriptions(filler->baseType());
113  filler->fill(descriptions);
114 
115  try {
116  try {
117  descriptions.validate(*main_input, std::string("source"));
118  }
119  catch (cms::Exception& e) { throw; }
120  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
121  catch (std::exception& e) { convertException::stdToEDM(e); }
123  catch (char const* c) { convertException::charPtrToEDM(c); }
124  catch (...) { convertException::unknownToEDM(); }
125  }
126  catch (cms::Exception & iException) {
127  std::ostringstream ost;
128  ost << "Validating configuration of input source of type " << modtype;
129  iException.addContext(ost.str());
130  throw;
131  }
132 
133  main_input->registerIt();
134 
135  // Fill in "ModuleDescription", in case the input source produces
136  // any EDproducts, which would be registered in the ProductRegistry.
137  // Also fill in the process history item for this process.
138  // There is no module label for the unnamed input source, so
139  // just use "source".
140  // Only the tracked parameters belong in the process configuration.
141  ModuleDescription md(main_input->id(),
142  main_input->getParameter<std::string>("@module_type"),
143  "source",
144  processConfiguration.get(),
146 
147  InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_, allocations);
148  areg->preSourceConstructionSignal_(md);
149  std::unique_ptr<InputSource> input;
150  try {
151  //even if we have an exception, send the signal
152  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
153  try {
154  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
155  }
156  catch (cms::Exception& e) { throw; }
157  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
158  catch (std::exception& e) { convertException::stdToEDM(e); }
160  catch (char const* c) { convertException::charPtrToEDM(c); }
161  catch (...) { convertException::unknownToEDM(); }
162  }
163  catch (cms::Exception& iException) {
164  std::ostringstream ost;
165  ost << "Constructing input source of type " << modtype;
166  iException.addContext(ost.str());
167  throw;
168  }
169  return input;
170  }
171 
172  // ---------------------------------------------------------------
173  boost::shared_ptr<EDLooperBase>
176  ParameterSet& params) {
177  boost::shared_ptr<EDLooperBase> vLooper;
178 
179  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
180 
181  if(loopers.size() == 0) {
182  return vLooper;
183  }
184 
185  assert(1 == loopers.size());
186 
187  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
188  itName != itNameEnd;
189  ++itName) {
190 
191  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
192  providerPSet->registerIt();
193  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
194  cp,
195  *providerPSet);
196  }
197  return vLooper;
198 
199  }
200 
201  // ---------------------------------------------------------------
203  ServiceToken const& iToken,
205  std::vector<std::string> const& defaultServices,
206  std::vector<std::string> const& forcedServices) :
207  actReg_(),
208  preg_(),
209  branchIDListHelper_(),
210  serviceToken_(),
211  input_(),
212  espController_(new eventsetup::EventSetupsController),
213  esp_(),
214  act_table_(),
215  processConfiguration_(),
216  schedule_(),
217  subProcess_(),
218  historyAppender_(new HistoryAppender),
219  fb_(),
220  looper_(),
221  principalCache_(),
222  beginJobCalled_(false),
223  shouldWeStop_(false),
224  stateMachineWasInErrorState_(false),
225  fileMode_(),
226  emptyRunLumiMode_(),
227  exceptionMessageFiles_(),
228  exceptionMessageRuns_(),
229  exceptionMessageLumis_(),
230  alreadyHandlingException_(false),
231  forceLooperToEnd_(false),
232  looperBeginJobRun_(false),
233  forceESCacheClearOnNewRun_(false),
234  numberOfForkedChildren_(0),
235  numberOfSequentialEventsPerChild_(1),
236  setCpuAffinity_(false),
237  eventSetupDataToExcludeFromPrefetching_() {
238  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
239  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
240  processDesc->addServices(defaultServices, forcedServices);
241  init(processDesc, iToken, iLegacy);
242  }
243 
245  std::vector<std::string> const& defaultServices,
246  std::vector<std::string> const& forcedServices) :
247  actReg_(),
248  preg_(),
249  branchIDListHelper_(),
250  serviceToken_(),
251  input_(),
252  espController_(new eventsetup::EventSetupsController),
253  esp_(),
254  act_table_(),
255  processConfiguration_(),
256  schedule_(),
257  subProcess_(),
258  historyAppender_(new HistoryAppender),
259  fb_(),
260  looper_(),
261  principalCache_(),
262  beginJobCalled_(false),
263  shouldWeStop_(false),
264  stateMachineWasInErrorState_(false),
265  fileMode_(),
266  emptyRunLumiMode_(),
267  exceptionMessageFiles_(),
268  exceptionMessageRuns_(),
269  exceptionMessageLumis_(),
270  alreadyHandlingException_(false),
271  forceLooperToEnd_(false),
272  looperBeginJobRun_(false),
273  forceESCacheClearOnNewRun_(false),
274  numberOfForkedChildren_(0),
275  numberOfSequentialEventsPerChild_(1),
276  setCpuAffinity_(false),
277  asyncStopRequestedWhileProcessingEvents_(false),
278  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
279  eventSetupDataToExcludeFromPrefetching_()
280  {
281  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
282  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
283  processDesc->addServices(defaultServices, forcedServices);
285  }
286 
287  EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
288  ServiceToken const& token,
290  actReg_(),
291  preg_(),
292  branchIDListHelper_(),
293  serviceToken_(),
294  input_(),
295  espController_(new eventsetup::EventSetupsController),
296  esp_(),
297  act_table_(),
298  processConfiguration_(),
299  schedule_(),
300  subProcess_(),
301  historyAppender_(new HistoryAppender),
302  fb_(),
303  looper_(),
304  principalCache_(),
305  beginJobCalled_(false),
306  shouldWeStop_(false),
307  stateMachineWasInErrorState_(false),
308  fileMode_(),
309  emptyRunLumiMode_(),
310  exceptionMessageFiles_(),
311  exceptionMessageRuns_(),
312  exceptionMessageLumis_(),
313  alreadyHandlingException_(false),
314  forceLooperToEnd_(false),
315  looperBeginJobRun_(false),
316  forceESCacheClearOnNewRun_(false),
317  numberOfForkedChildren_(0),
318  numberOfSequentialEventsPerChild_(1),
319  setCpuAffinity_(false),
320  asyncStopRequestedWhileProcessingEvents_(false),
321  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
322  eventSetupDataToExcludeFromPrefetching_()
323  {
324  init(processDesc, token, legacy);
325  }
326 
327 
329  actReg_(),
330  preg_(),
331  branchIDListHelper_(),
332  serviceToken_(),
333  input_(),
334  espController_(new eventsetup::EventSetupsController),
335  esp_(),
336  act_table_(),
337  processConfiguration_(),
338  schedule_(),
339  subProcess_(),
340  historyAppender_(new HistoryAppender),
341  fb_(),
342  looper_(),
343  principalCache_(),
344  beginJobCalled_(false),
345  shouldWeStop_(false),
346  stateMachineWasInErrorState_(false),
347  fileMode_(),
348  emptyRunLumiMode_(),
349  exceptionMessageFiles_(),
350  exceptionMessageRuns_(),
351  exceptionMessageLumis_(),
352  alreadyHandlingException_(false),
353  forceLooperToEnd_(false),
354  looperBeginJobRun_(false),
355  forceESCacheClearOnNewRun_(false),
356  numberOfForkedChildren_(0),
357  numberOfSequentialEventsPerChild_(1),
358  setCpuAffinity_(false),
359  asyncStopRequestedWhileProcessingEvents_(false),
360  nextItemTypeFromProcessingEvents_(InputSource::IsEvent),
361  eventSetupDataToExcludeFromPrefetching_()
362 {
363  if(isPython) {
364  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
365  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
367  }
368  else {
369  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
371  }
372  }
373 
374  void
375  EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
376  ServiceToken const& iToken,
378 
379  //std::cerr << processDesc->dump() << std::endl;
380 
381  ROOT::Cintex::Cintex::Enable();
382 
383  // register the empty parentage vector , once and for all
385 
386  // register the empty parameter set, once and for all.
388 
389  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
390  //std::cerr << parameterSet->dump() << std::endl;
391 
392  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
393  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
394 
395  // Now set some parameters specific to the main process.
396  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
397  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
398  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
399  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
400  //threading
401  unsigned int nThreads=1;
402  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
403  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
404  if(nThreads == 0) {
405  nThreads = 1;
406  }
407  }
408  /* TODO: when we support having each stream run in a different thread use this default
409  unsigned int nStreams =nThreads;
410  */
411  unsigned int nStreams =1;
412  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
413  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
414  if(nStreams==0) {
415  nStreams = nThreads;
416  }
417  }
418  /*
419  bool nRunsSet = false;
420  */
421  unsigned int nConcurrentRuns =1;
422  /*
423  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
424  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
425  }
426  */
427  unsigned int nConcurrentLumis =1;
428  /*
429  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
430  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
431  } else {
432  nConcurrentLumis = nConcurrentRuns;
433  }
434  */
435  //Check that relationships between threading parameters makes sense
436  /*
437  if(nThreads<nStreams) {
438  //bad
439  }
440  if(nConcurrentRuns>nStreams) {
441  //bad
442  }
443  if(nConcurrentRuns>nConcurrentLumis) {
444  //bad
445  }
446  */
447  //forking
448  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
449  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
450  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
451  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
452  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
453  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
454  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
455  itPS != itPSEnd;
456  ++itPS) {
457  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
458  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
459  itPS->getUntrackedParameter<std::string>("label", "")));
460  }
461  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
462 
463  // Now do general initialization
464  ScheduleItems items;
465 
466  //initialize the services
467  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
468  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
469  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
470 
471  //make the services available
473 
474  // intialize miscellaneous items
475  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
476 
477  // intialize the event setup provider
478  esp_ = espController_->makeProvider(*parameterSet);
479 
480  // initialize the looper, if any
481  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
482  if(looper_) {
483  looper_->setActionTable(items.act_table_.get());
484  looper_->attachTo(*items.actReg_);
485 
486  //For now loopers make us run only 1 transition at a time
487  nStreams=1;
488  nConcurrentLumis=1;
489  nConcurrentRuns=1;
490  }
491 
492  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
493 
494  // initialize the input source
495  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_, preallocations_);
496 
497  // intialize the Schedule
498  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_.reset(items.preg_.release());
507  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
508 
509  FDEBUG(2) << parameterSet << std::endl;
510 
512  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
513  // Reusable event principal
514  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_,
517  historyAppender_.get(),
518  index));
519  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
520  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
522  }
523  // initialize the subprocess, if there is one
524  if(subProcessParameterSet) {
525  subProcess_.reset(new SubProcess(*subProcessParameterSet,
526  *parameterSet,
527  preg_,
530  *actReg_,
531  token,
534  &processContext_));
535  }
536  }
537 
539  // Make the services available while everything is being deleted.
540  ServiceToken token = getToken();
541  ServiceRegistry::Operate op(token);
542 
543  // manually destroy all these thing that may need the services around
544  espController_.reset();
545  subProcess_.reset();
546  esp_.reset();
547  schedule_.reset();
548  input_.reset();
549  looper_.reset();
550  actReg_.reset();
551 
554  }
555 
556  void
558  if(beginJobCalled_) return;
559  beginJobCalled_=true;
560  bk::beginJob();
561 
562  // StateSentry toerror(this); // should we add this ?
563  //make the services available
565 
570  actReg_->preallocateSignal_(bounds);
571  //NOTE: This implementation assumes 'Job' means one call
572  // the EventProcessor::run
573  // If it really means once per 'application' then this code will
574  // have to be changed.
575  // Also have to deal with case where have 'run' then new Module
576  // added and do 'run'
577  // again. In that case the newly added Module needs its 'beginJob'
578  // to be called.
579 
580  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
581  // For now we delay calling beginOfJob until first beginOfRun
582  //if(looper_) {
583  // looper_->beginOfJob(es);
584  //}
585  try {
586  try {
587  input_->doBeginJob();
588  }
589  catch (cms::Exception& e) { throw; }
590  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
591  catch (std::exception& e) { convertException::stdToEDM(e); }
593  catch(char const* c) { convertException::charPtrToEDM(c); }
594  catch (...) { convertException::unknownToEDM(); }
595  }
596  catch(cms::Exception& ex) {
597  ex.addContext("Calling beginJob for the source");
598  throw;
599  }
600  schedule_->beginJob(*preg_);
601  // toerror.succeeded(); // should we add this?
602  if(hasSubProcess()) subProcess_->doBeginJob();
603  actReg_->postBeginJobSignal_();
604 
605  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
606  schedule_->beginStream(i);
607  if(hasSubProcess()) subProcess_->doBeginStream(i);
608  }
609  }
610 
611  void
613  // Collects exceptions, so we don't throw before all operations are performed.
614  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
615 
616  //make the services available
618 
619  //NOTE: this really should go elsewhere in the future
620  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
621  c.call([this,i](){this->schedule_->endStream(i);});
622  if(hasSubProcess()) {
623  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
624  }
625  }
626  schedule_->endJob(c);
627  if(hasSubProcess()) {
628  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
629  }
630  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
631  if(looper_) {
632  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
633  }
634  auto actReg = actReg_.get();
635  c.call([actReg](){actReg->postEndJobSignal_();});
636  if(c.hasThrown()) {
637  c.rethrow();
638  }
639  }
640 
643  return serviceToken_;
644  }
645 
646  //Setup signal handler to listen for when forked children stop
647  namespace {
648  //These are volatile since the compiler can not be allowed to optimize them
649  // since they can be modified in the signaller handler
650  volatile bool child_failed = false;
651  volatile unsigned int num_children_done = 0;
652  volatile int child_fail_exit_status = 0;
653  volatile int child_fail_signal = 0;
654 
655  //NOTE: We setup the signal handler to run in the main thread which
656  // is also the same thread that then reads the above values
657 
658  extern "C" {
659  void ep_sigchld(int, siginfo_t*, void*) {
660  //printf("in sigchld\n");
661  //FDEBUG(1) << "in sigchld handler\n";
662  int stat_loc;
663  pid_t p = waitpid(-1, &stat_loc, WNOHANG);
664  while(0<p) {
665  //printf(" looping\n");
666  if(WIFEXITED(stat_loc)) {
667  ++num_children_done;
668  if(0 != WEXITSTATUS(stat_loc)) {
669  child_fail_exit_status = WEXITSTATUS(stat_loc);
670  child_failed = true;
671  }
672  }
673  if(WIFSIGNALED(stat_loc)) {
674  ++num_children_done;
675  child_fail_signal = WTERMSIG(stat_loc);
676  child_failed = true;
677  }
678  p = waitpid(-1, &stat_loc, WNOHANG);
679  }
680  }
681  }
682 
683  }
684 
685  enum {
690  };
691 
692  namespace {
693  unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
694  unsigned int n = 0;
695  while(numberOfChildren != 0) {
696  ++n;
697  numberOfChildren /= 10;
698  }
699  if(n == 0) {
700  n = 3; // Protect against zero numberOfChildren
701  }
702  return n;
703  }
704 
705  /*This class embodied the thread which is used to listen to the forked children and
706  then tell them which events they should process */
707  class MessageSenderToSource {
708  public:
709  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
710  void operator()();
711 
712  private:
713  const std::vector<int>& m_childrenPipes;
714  long const m_nEventsToProcess;
715  fd_set m_socketSet;
716  unsigned int m_aliveChildren;
717  int m_maxFd;
718  };
719 
720  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
721  std::vector<int> const& childrenPipes,
722  long iNEventsToProcess):
723  m_childrenPipes(childrenPipes),
724  m_nEventsToProcess(iNEventsToProcess),
725  m_aliveChildren(childrenSockets.size()),
726  m_maxFd(0)
727  {
728  FD_ZERO(&m_socketSet);
729  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
730  it != itEnd; it++) {
731  FD_SET(*it, &m_socketSet);
732  if (*it > m_maxFd) {
733  m_maxFd = *it;
734  }
735  }
736  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
737  it != itEnd; ++it) {
738  FD_SET(*it, &m_socketSet);
739  if (*it > m_maxFd) {
740  m_maxFd = *it;
741  }
742  }
743  m_maxFd++; // select reads [0,m_maxFd).
744  }
745 
746  /* This function is the heart of the communication between parent and child.
747  * When ready for more data, the child (see MessageReceiverForSource) requests
748  * data through a AF_UNIX socket message. The parent will then assign the next
749  * chunk of data by sending a message back.
750  *
751  * Additionally, this function also monitors the read-side of the pipe fd from the child.
752  * If the child dies unexpectedly, the pipe will be selected as ready for read and
753  * will return EPIPE when read from. Further, if the child thinks the parent has died
754  * (defined as waiting more than 1s for a response), it will write a single byte to
755  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
756  * If still alive, the parent will read the byte and ignore it.
757  *
758  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
759  * handler can distinguish between success and failure cases.
760  */
761 
762  void
763  MessageSenderToSource::operator()() {
764  multicore::MessageForParent childMsg;
765  LogInfo("ForkingController") << "I am controller";
766  //this is the master and therefore the controller
767 
768  multicore::MessageForSource sndmsg;
769  sndmsg.startIndex = 0;
770  sndmsg.nIndices = m_nEventsToProcess;
771  do {
772 
773  fd_set readSockets, errorSockets;
774  // Wait for a request from a child for events.
775  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
776  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
777  // Note that we don't timeout; may be reconsidered in the future.
778  ssize_t rc;
779  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
780  if (rc < 0) {
781  std::cerr << "select failed; should be impossible due to preconditions.\n";
782  abort();
783  break;
784  }
785 
786  // Read the message from the child.
787  for (int idx=0; idx<m_maxFd; idx++) {
788 
789  // Handle errors
790  if (FD_ISSET(idx, &errorSockets)) {
791  LogInfo("ForkingController") << "Error on socket " << idx;
792  FD_CLR(idx, &m_socketSet);
793  close(idx);
794  // See if it was the watchdog pipe that died.
795  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
796  if (*it == idx) {
797  m_aliveChildren--;
798  }
799  }
800  continue;
801  }
802 
803  if (!FD_ISSET(idx, &readSockets)) {
804  continue;
805  }
806 
807  // See if this FD is a child watchdog pipe. If so, read from it to prevent
808  // writes from blocking.
809  bool is_pipe = false;
810  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
811  if (*it == idx) {
812  is_pipe = true;
813  char buf;
814  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
815  if (rc <= 0) {
816  m_aliveChildren--;
817  FD_CLR(idx, &m_socketSet);
818  close(idx);
819  }
820  }
821  }
822 
823  // Only execute this block if the FD is a socket for sending the child work.
824  if (!is_pipe) {
825  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
826  if (rc < 0) {
827  FD_CLR(idx, &m_socketSet);
828  close(idx);
829  continue;
830  }
831 
832  // Tell the child what events to process.
833  // If 'send' fails, then the child process has failed (any other possibilities are
834  // eliminated because we are using fixed-size messages with Unix datagram sockets).
835  // Thus, the SIGCHLD handler will fire and set child_fail = true.
836  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
837  if (rc < 0) {
838  FD_CLR(idx, &m_socketSet);
839  close(idx);
840  continue;
841  }
842  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
843  sndmsg.startIndex += sndmsg.nIndices;
844  }
845  }
846 
847  } while (m_aliveChildren > 0);
848 
849  return;
850  }
851 
852  }
853 
854 
855  void EventProcessor::possiblyContinueAfterForkChildFailure() {
856  if(child_failed && continueAfterChildFailure_) {
857  if (child_fail_signal) {
858  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
859  child_fail_signal=0;
860  } else if (child_fail_exit_status) {
861  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
862  child_fail_exit_status=0;
863  } else {
864  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
865  }
866  child_failed =false;
867  }
868  }
869 
870  bool
871  EventProcessor::forkProcess(std::string const& jobReportFile) {
872 
873  if(0 == numberOfForkedChildren_) {return true;}
874  assert(0<numberOfForkedChildren_);
875  //do what we want done in common
876  {
877  beginJob(); //make sure this was run
878  // make the services available
879  ServiceRegistry::Operate operate(serviceToken_);
880 
881  InputSource::ItemType itemType;
882  itemType = input_->nextItemType();
883 
884  assert(itemType == InputSource::IsFile);
885  {
886  readFile();
887  }
888  itemType = input_->nextItemType();
889  assert(itemType == InputSource::IsRun);
890 
891  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
892  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
893  input_->runAuxiliary()->beginTime());
894  espController_->eventSetupForInstance(ts);
895  EventSetup const& es = esp_->eventSetup();
896 
897  //now get all the data available in the EventSetup
898  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
899  es.fillAvailableRecordKeys(recordKeys);
900  std::vector<eventsetup::DataKey> dataKeys;
901  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
902  itKey != itEnd;
903  ++itKey) {
904  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
905  //see if this is on our exclusion list
906  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
907  ExcludedData const* excludedData(nullptr);
908  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
909  excludedData = &(itExcludeRec->second);
910  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
911  //skip all items in this record
912  continue;
913  }
914  }
915  if(0 != recordPtr) {
916  dataKeys.clear();
917  recordPtr->fillRegisteredDataKeys(dataKeys);
918  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
919  itDataKey != itDataKeyEnd;
920  ++itDataKey) {
921  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
922  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
923  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
924  continue;
925  }
926  try {
927  recordPtr->doGet(*itDataKey);
928  } catch(cms::Exception& e) {
929  LogWarning("ForkingEventSetupPreFetching") << e.what();
930  }
931  }
932  }
933  }
934  }
935  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
936  {
937  // make the services available
938  ServiceRegistry::Operate operate(serviceToken_);
939  Service<JobReport> jobReport;
940  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
941 
942  //Now actually do the forking
943  actReg_->preForkReleaseResourcesSignal_();
944  input_->doPreForkReleaseResources();
945  schedule_->preForkReleaseResources();
946  }
947  installCustomHandler(SIGCHLD, ep_sigchld);
948 
949 
950  unsigned int childIndex = 0;
951  unsigned int const kMaxChildren = numberOfForkedChildren_;
952  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
953  std::vector<pid_t> childrenIds;
954  childrenIds.reserve(kMaxChildren);
955  std::vector<int> childrenSockets;
956  childrenSockets.reserve(kMaxChildren);
957  std::vector<int> childrenPipes;
958  childrenPipes.reserve(kMaxChildren);
959  std::vector<int> childrenSocketsCopy;
960  childrenSocketsCopy.reserve(kMaxChildren);
961  std::vector<int> childrenPipesCopy;
962  childrenPipesCopy.reserve(kMaxChildren);
963  int pipes[] {0, 0};
964 
965  {
966  // make the services available
967  ServiceRegistry::Operate operate(serviceToken_);
968  Service<JobReport> jobReport;
969  int sockets[2], fd_flags;
970  for(; childIndex < kMaxChildren; ++childIndex) {
971  // Create a UNIX_DGRAM socket pair
972  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
973  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
974  exit(EXIT_FAILURE);
975  }
976  if (pipe(pipes)) {
977  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
978  exit(EXIT_FAILURE);
979  }
980  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
981  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
982  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
983  exit(EXIT_FAILURE);
984  }
985  // Mark socket as non-block. Child must be careful to do select prior
986  // to reading from socket.
987  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
988  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
989  exit(EXIT_FAILURE);
990  }
991  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
992  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
993  exit(EXIT_FAILURE);
994  }
995  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
996  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
997  exit(EXIT_FAILURE);
998  }
999  // Linux man page notes there are some edge cases where reading from a
1000  // fd can block, even after a select.
1001  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1002  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1003  exit(EXIT_FAILURE);
1004  }
1005  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1006  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1007  exit(EXIT_FAILURE);
1008  }
1009 
1010  childrenPipesCopy = childrenPipes;
1011  childrenSocketsCopy = childrenSockets;
1012 
1013  pid_t value = fork();
1014  if(value == 0) {
1015  // Close the parent's side of the socket and pipe which will talk to us.
1016  close(pipes[0]);
1017  close(sockets[0]);
1018  // Close our copies of the parent's other communication pipes.
1019  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1020  close(*it);
1021  }
1022  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1023  close(*it);
1024  }
1025 
1026  // this is the child process, redirect stdout and stderr to a log file
1027  fflush(stdout);
1028  fflush(stderr);
1029  std::stringstream stout;
1030  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1031  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1032  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1033  }
1034  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1035  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1036  }
1037 
1038  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1039  if(setCpuAffinity_) {
1040  // CPU affinity is handled differently on macosx.
1041  // We disable it and print a message until someone reads:
1042  //
1043  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1044  //
1045  // and implements it.
1046 #ifdef __APPLE__
1047  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1048 #else
1049  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1050  cpu_set_t mask;
1051  CPU_ZERO(&mask);
1052  CPU_SET(childIndex, &mask);
1053  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1054  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1055  exit(-1);
1056  }
1057 #endif
1058  }
1059  break;
1060  } else {
1061  //this is the parent
1062  close(pipes[1]);
1063  close(sockets[1]);
1064  }
1065  if(value < 0) {
1066  LogError("ForkingChild") << "failed to create a child";
1067  exit(-1);
1068  }
1069  childrenIds.push_back(value);
1070  childrenSockets.push_back(sockets[0]);
1071  childrenPipes.push_back(pipes[0]);
1072  }
1073 
1074  if(childIndex < kMaxChildren) {
1075  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1076  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1077 
1078  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1079  input_->doPostForkReacquireResources(receiver);
1080  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1081  //NOTE: sources have to reset themselves by listening to the post fork message
1082  //rewindInput();
1083  return true;
1084  }
1085  jobReport->parentAfterFork(jobReportFile);
1086  }
1087 
1088  //this is the original, which is now the master for all the children
1089 
1090  //Need to wait for signals from the children or externally
1091  // To wait we must
1092  // 1) block the signals we want to wait on so we do not have a race condition
1093  // 2) check that we haven't already meet our ending criteria
1094  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1095  sigset_t blockingSigSet;
1096  sigset_t unblockingSigSet;
1097  sigset_t oldSigSet;
1098  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1099  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1100  sigaddset(&blockingSigSet, SIGCHLD);
1101  sigaddset(&blockingSigSet, SIGUSR2);
1102  sigaddset(&blockingSigSet, SIGINT);
1103  sigdelset(&unblockingSigSet, SIGCHLD);
1104  sigdelset(&unblockingSigSet, SIGUSR2);
1105  sigdelset(&unblockingSigSet, SIGINT);
1106  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1107 
1108  // If there are too many fd's (unlikely, but possible) for select, denote this
1109  // because the sender will fail.
1110  bool too_many_fds = false;
1111  if (pipes[1]+1 > FD_SETSIZE) {
1112  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1113  too_many_fds = true;
1114  }
1115 
1116  //create a thread that sends the units of work to workers
1117  // we create it after all signals were blocked so that this
1118  // thread is never interupted by a signal
1119  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1120  boost::thread senderThread(sender);
1121 
1122  if(not too_many_fds) {
1123  //NOTE: a child could have failed before we got here and even after this call
1124  // which is why the 'if' is conditional on continueAfterChildFailure_
1125  possiblyContinueAfterForkChildFailure();
1126  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1127  sigsuspend(&unblockingSigSet);
1128  possiblyContinueAfterForkChildFailure();
1129  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1130  }
1131  }
1132  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1133 
1134  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1135  if(child_failed) {
1136  LogError("ForkingStopping") << "child failed";
1137  }
1138  if(shutdown_flag) {
1139  LogSystem("ForkingStopping") << "asked to shutdown";
1140  }
1141 
1142  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1143  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1144  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1145  it != itEnd; ++it) {
1146  /* int result = */ kill(*it, SIGUSR2);
1147  }
1148  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1149  while(num_children_done != kMaxChildren) {
1150  sigsuspend(&unblockingSigSet);
1151  }
1152  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1153  }
1154  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1155  senderThread.join();
1156  if(child_failed && !continueAfterChildFailure_) {
1157  if (child_fail_signal) {
1158  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1159  } else if (child_fail_exit_status) {
1160  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1161  } else {
1162  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1163  }
1164  }
1165  if(too_many_fds) {
1166  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1167  }
1168  return false;
1169  }
1170 
1171  std::vector<ModuleDescription const*>
1172  EventProcessor::getAllModuleDescriptions() const {
1173  return schedule_->getAllModuleDescriptions();
1174  }
1175 
1176  int
1177  EventProcessor::totalEvents() const {
1178  return schedule_->totalEvents();
1179  }
1180 
1181  int
1182  EventProcessor::totalEventsPassed() const {
1183  return schedule_->totalEventsPassed();
1184  }
1185 
1186  int
1187  EventProcessor::totalEventsFailed() const {
1188  return schedule_->totalEventsFailed();
1189  }
1190 
1191  void
1192  EventProcessor::enableEndPaths(bool active) {
1193  schedule_->enableEndPaths(active);
1194  }
1195 
1196  bool
1197  EventProcessor::endPathsEnabled() const {
1198  return schedule_->endPathsEnabled();
1199  }
1200 
1201  void
1202  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1203  schedule_->getTriggerReport(rep);
1204  }
1205 
1206  void
1207  EventProcessor::clearCounters() {
1208  schedule_->clearCounters();
1209  }
1210 
1211 
1212  std::auto_ptr<statemachine::Machine>
1213  EventProcessor::createStateMachine() {
1215  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1216  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1217  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1218  else {
1219  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1220  << fileMode_ << ".\n"
1221  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1222  }
1223 
1224  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1225  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1226  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1227  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1228  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1229  else {
1230  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1231  << emptyRunLumiMode_ << ".\n"
1232  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1233  }
1234 
1235  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1236  fileMode,
1237  emptyRunLumiMode));
1238 
1239  machine->initiate();
1240  return machine;
1241  }
1242 
1243  bool
1244  EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
1245  bool returnValue = false;
1246 
1247  // Look for a shutdown signal
1248  if(shutdown_flag.load(std::memory_order_relaxed)) {
1249  returnValue = true;
1250  returnCode = epSignal;
1251  }
1252  return returnValue;
1253  }
1254 
1255 
1257  EventProcessor::runToCompletion() {
1258 
1259  StatusCode returnCode=epSuccess;
1260  asyncStopStatusCodeFromProcessingEvents_=epSuccess;
1261  std::auto_ptr<statemachine::Machine> machine;
1262  {
1263  beginJob(); //make sure this was called
1264 
1265  //StatusCode returnCode = epSuccess;
1266  stateMachineWasInErrorState_ = false;
1267 
1268  // make the services available
1269  ServiceRegistry::Operate operate(serviceToken_);
1270 
1271  machine = createStateMachine();
1272  nextItemTypeFromProcessingEvents_=InputSource::IsEvent;
1273  asyncStopRequestedWhileProcessingEvents_=false;
1274  try {
1275  try {
1276 
1277  InputSource::ItemType itemType;
1278 
1279  while(true) {
1280 
1281  bool more = true;
1282  if(numberOfForkedChildren_ > 0) {
1283  size_t size = preg_->size();
1284  more = input_->skipForForking();
1285  if(more) {
1286  if(size < preg_->size()) {
1287  principalCache_.adjustIndexesAfterProductRegistryAddition();
1288  }
1289  principalCache_.adjustEventsToNewProductRegistry(preg_);
1290  }
1291  }
1292  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1293 
1294  FDEBUG(1) << "itemType = " << itemType << "\n";
1295 
1296  if(checkForAsyncStopRequest(returnCode)) {
1297  forceLooperToEnd_ = true;
1298  machine->process_event(statemachine::Stop());
1299  forceLooperToEnd_ = false;
1300  break;
1301  }
1302 
1303  if(itemType == InputSource::IsEvent) {
1304  machine->process_event(statemachine::Event());
1305  if(asyncStopRequestedWhileProcessingEvents_) {
1306  forceLooperToEnd_ = true;
1307  machine->process_event(statemachine::Stop());
1308  forceLooperToEnd_ = false;
1309  returnCode = asyncStopStatusCodeFromProcessingEvents_;
1310  break;
1311  }
1312  itemType = nextItemTypeFromProcessingEvents_;
1313  }
1314 
1315  if(itemType == InputSource::IsEvent) {
1316  }
1317  else if(itemType == InputSource::IsStop) {
1318  machine->process_event(statemachine::Stop());
1319  }
1320  else if(itemType == InputSource::IsFile) {
1321  machine->process_event(statemachine::File());
1322  }
1323  else if(itemType == InputSource::IsRun) {
1324  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1325  }
1326  else if(itemType == InputSource::IsLumi) {
1327  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1328  }
1329  else if(itemType == InputSource::IsSynchronize) {
1330  //For now, we don't have to do anything
1331  }
1332  // This should be impossible
1333  else {
1335  << "Unknown next item type passed to EventProcessor\n"
1336  << "Please report this error to the Framework group\n";
1337  }
1338  if(machine->terminated()) {
1339  break;
1340  }
1341  } // End of loop over state machine events
1342  } // Try block
1343  catch (cms::Exception& e) { throw; }
1344  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1345  catch (std::exception& e) { convertException::stdToEDM(e); }
1347  catch(char const* c) { convertException::charPtrToEDM(c); }
1348  catch (...) { convertException::unknownToEDM(); }
1349  } // Try block
1350  // Some comments on exception handling related to the boost state machine:
1351  //
1352  // Some states used in the machine are special because they
1353  // perform actions while the machine is being terminated, actions
1354  // such as close files, call endRun, call endLumi etc ... Each of these
1355  // states has two functions that perform these actions. The functions
1356  // are almost identical. The major difference is that one version
1357  // catches all exceptions and the other lets exceptions pass through.
1358  // The destructor catches them and the other function named "exit" lets
1359  // them pass through. On a normal termination, boost will always call
1360  // "exit" and then the state destructor. In our state classes, the
1361  // the destructors do nothing if the exit function already took
1362  // care of things. Here's the interesting part. When boost is
1363  // handling an exception the "exit" function is not called (a boost
1364  // feature).
1365  //
1366  // If an exception occurs while the boost machine is in control
1367  // (which usually means inside a process_event call), then
1368  // the boost state machine destroys its states and "terminates" itself.
1369  // This already done before we hit the catch blocks below. In this case
1370  // the call to terminateMachine below only destroys an already
1371  // terminated state machine. Because exit is not called, the state destructors
1372  // handle cleaning up lumis, runs, and files. The destructors swallow
1373  // all exceptions and only pass through the exceptions messages, which
1374  // are tacked onto the original exception below.
1375  //
1376  // If an exception occurs when the boost state machine is not
1377  // in control (outside the process_event functions), then boost
1378  // cannot destroy its own states. The terminateMachine function
1379  // below takes care of that. The flag "alreadyHandlingException"
1380  // is set true so that the state exit functions do nothing (and
1381  // cannot throw more exceptions while handling the first). Then the
1382  // state destructors take care of this because exit did nothing.
1383  //
1384  // In both cases above, the EventProcessor::endOfLoop function is
1385  // not called because it can throw exceptions.
1386  //
1387  // One tricky aspect of the state machine is that things that can
1388  // throw should not be invoked by the state machine while another
1389  // exception is being handled.
1390  // Another tricky aspect is that it appears to be important to
1391  // terminate the state machine before invoking its destructor.
1392  // We've seen crashes that are not understood when that is not
1393  // done. Maintainers of this code should be careful about this.
1394 
1395  catch (cms::Exception & e) {
1396  alreadyHandlingException_ = true;
1397  terminateMachine(machine);
1398  alreadyHandlingException_ = false;
1399  if (!exceptionMessageLumis_.empty()) {
1400  e.addAdditionalInfo(exceptionMessageLumis_);
1401  if (e.alreadyPrinted()) {
1402  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1403  }
1404  }
1405  if (!exceptionMessageRuns_.empty()) {
1406  e.addAdditionalInfo(exceptionMessageRuns_);
1407  if (e.alreadyPrinted()) {
1408  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1409  }
1410  }
1411  if (!exceptionMessageFiles_.empty()) {
1412  e.addAdditionalInfo(exceptionMessageFiles_);
1413  if (e.alreadyPrinted()) {
1414  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1415  }
1416  }
1417  throw;
1418  }
1419 
1420  if(machine->terminated()) {
1421  FDEBUG(1) << "The state machine reports it has been terminated\n";
1422  machine.reset();
1423  }
1424 
1425  if(stateMachineWasInErrorState_) {
1426  throw cms::Exception("BadState")
1427  << "The boost state machine in the EventProcessor exited after\n"
1428  << "entering the Error state.\n";
1429  }
1430 
1431  }
1432  if(machine.get() != 0) {
1433  terminateMachine(machine);
1435  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1436  << "Please report this error to the Framework group\n";
1437  }
1438 
1439  return returnCode;
1440  }
1441 
1442  void EventProcessor::readFile() {
1443  FDEBUG(1) << " \treadFile\n";
1444  size_t size = preg_->size();
1445  fb_ = input_->readFile();
1446  if(size < preg_->size()) {
1447  principalCache_.adjustIndexesAfterProductRegistryAddition();
1448  }
1449  principalCache_.adjustEventsToNewProductRegistry(preg_);
1450  if(numberOfForkedChildren_ > 0) {
1451  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1452  }
1453  }
1454 
1455  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1456  if (fb_.get() != nullptr) {
1457  input_->closeFile(fb_.get(), cleaningUpAfterException);
1458  }
1459  FDEBUG(1) << "\tcloseInputFile\n";
1460  }
1461 
1462  void EventProcessor::openOutputFiles() {
1463  if (fb_.get() != nullptr) {
1464  schedule_->openOutputFiles(*fb_);
1465  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1466  }
1467  FDEBUG(1) << "\topenOutputFiles\n";
1468  }
1469 
1470  void EventProcessor::closeOutputFiles() {
1471  if (fb_.get() != nullptr) {
1472  schedule_->closeOutputFiles();
1473  if(hasSubProcess()) subProcess_->closeOutputFiles();
1474  }
1475  FDEBUG(1) << "\tcloseOutputFiles\n";
1476  }
1477 
1478  void EventProcessor::respondToOpenInputFile() {
1479  if (fb_.get() != nullptr) {
1480  schedule_->respondToOpenInputFile(*fb_);
1481  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1482  }
1483  FDEBUG(1) << "\trespondToOpenInputFile\n";
1484  }
1485 
1486  void EventProcessor::respondToCloseInputFile() {
1487  if (fb_.get() != nullptr) {
1488  schedule_->respondToCloseInputFile(*fb_);
1489  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1490  }
1491  FDEBUG(1) << "\trespondToCloseInputFile\n";
1492  }
1493 
1494  void EventProcessor::startingNewLoop() {
1495  shouldWeStop_ = false;
1496  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1497  // until after we've called beginOfJob
1498  if(looper_ && looperBeginJobRun_) {
1499  looper_->doStartingNewLoop();
1500  }
1501  FDEBUG(1) << "\tstartingNewLoop\n";
1502  }
1503 
1504  bool EventProcessor::endOfLoop() {
1505  if(looper_) {
1506  ModuleChanger changer(schedule_.get());
1507  looper_->setModuleChanger(&changer);
1508  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1509  looper_->setModuleChanger(nullptr);
1510  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1511  else return false;
1512  }
1513  FDEBUG(1) << "\tendOfLoop\n";
1514  return true;
1515  }
1516 
1517  void EventProcessor::rewindInput() {
1518  input_->repeat();
1519  input_->rewind();
1520  FDEBUG(1) << "\trewind\n";
1521  }
1522 
1523  void EventProcessor::prepareForNextLoop() {
1524  looper_->prepareForNextLoop(esp_.get());
1525  FDEBUG(1) << "\tprepareForNextLoop\n";
1526  }
1527 
1528  bool EventProcessor::shouldWeCloseOutput() const {
1529  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1530  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1531  }
1532 
1533  void EventProcessor::doErrorStuff() {
1534  FDEBUG(1) << "\tdoErrorStuff\n";
1535  LogError("StateMachine")
1536  << "The EventProcessor state machine encountered an unexpected event\n"
1537  << "and went to the error state\n"
1538  << "Will attempt to terminate processing normally\n"
1539  << "(IF using the looper the next loop will be attempted)\n"
1540  << "This likely indicates a bug in an input module or corrupted input or both\n";
1541  stateMachineWasInErrorState_ = true;
1542  }
1543 
1544  void EventProcessor::beginRun(statemachine::Run const& run) {
1545  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1546  input_->doBeginRun(runPrincipal, &processContext_);
1547  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1548  runPrincipal.beginTime());
1549  if(forceESCacheClearOnNewRun_){
1550  espController_->forceCacheClear();
1551  }
1552  espController_->eventSetupForInstance(ts);
1553  EventSetup const& es = esp_->eventSetup();
1554  if(looper_ && looperBeginJobRun_== false) {
1555  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1556  looper_->beginOfJob(es);
1557  looperBeginJobRun_ = true;
1558  looper_->doStartingNewLoop();
1559  }
1560  {
1562  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1563  if(hasSubProcess()) {
1564  subProcess_->doBeginRun(runPrincipal, ts);
1565  }
1566  }
1567  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1568  if(looper_) {
1569  looper_->doBeginRun(runPrincipal, es, &processContext_);
1570  }
1571  {
1573  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1574  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1575  if(hasSubProcess()) {
1576  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1577  }
1578  }
1579  }
1580  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1581  if(looper_) {
1582  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1583  }
1584  }
1585 
1586  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
1587  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1588  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1589  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1590  runPrincipal.endTime());
1591  espController_->eventSetupForInstance(ts);
1592  EventSetup const& es = esp_->eventSetup();
1593  {
1594  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1596  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1597  if(hasSubProcess()) {
1598  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1599  }
1600  }
1601  }
1602  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1603  if(looper_) {
1604  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1605  }
1606  {
1608  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1609  if(hasSubProcess()) {
1610  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1611  }
1612  }
1613  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1614  if(looper_) {
1615  looper_->doEndRun(runPrincipal, es, &processContext_);
1616  }
1617  }
1618 
1619  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1620  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1621  input_->doBeginLumi(lumiPrincipal, &processContext_);
1622 
1624  if(rng.isAvailable()) {
1625  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1626  rng->preBeginLumi(lb);
1627  }
1628 
1629  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1630  // lumi blocks know their start and end times why not also start and end events?
1631  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1632  espController_->eventSetupForInstance(ts);
1633  EventSetup const& es = esp_->eventSetup();
1634  {
1636  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1637  if(hasSubProcess()) {
1638  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1639  }
1640  }
1641  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1642  if(looper_) {
1643  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1644  }
1645  {
1646  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1648  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1649  if(hasSubProcess()) {
1650  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1651  }
1652  }
1653  }
1654  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1655  if(looper_) {
1656  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1657  }
1658  }
1659 
1660  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1661  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1662  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1663  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1664  // lumi blocks know their start and end times why not also start and end events?
1665  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1666  lumiPrincipal.endTime());
1667  espController_->eventSetupForInstance(ts);
1668  EventSetup const& es = esp_->eventSetup();
1669  {
1670  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1672  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1673  if(hasSubProcess()) {
1674  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1675  }
1676  }
1677  }
1678  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1679  if(looper_) {
1680  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1681  }
1682  {
1684  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1685  if(hasSubProcess()) {
1686  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1687  }
1688  }
1689  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1690  if(looper_) {
1691  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1692  }
1693  }
1694 
1695  statemachine::Run EventProcessor::readRun() {
1696  if (principalCache_.hasRunPrincipal()) {
1698  << "EventProcessor::readRun\n"
1699  << "Illegal attempt to insert run into cache\n"
1700  << "Contact a Framework Developer\n";
1701  }
1702  boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(input_->runAuxiliary(),
1703  preg_,
1704  *processConfiguration_,
1705  historyAppender_.get(),
1706  0));
1707  input_->readRun(*rp, *historyAppender_);
1708  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1709  principalCache_.insert(rp);
1710  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1711  }
1712 
1713  statemachine::Run EventProcessor::readAndMergeRun() {
1714  principalCache_.merge(input_->runAuxiliary(), preg_);
1715  auto runPrincipal =principalCache_.runPrincipalPtr();
1716  input_->readAndMergeRun(*runPrincipal);
1717  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1718  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1719  }
1720 
1721  int EventProcessor::readLuminosityBlock() {
1722  if (principalCache_.hasLumiPrincipal()) {
1724  << "EventProcessor::readRun\n"
1725  << "Illegal attempt to insert lumi into cache\n"
1726  << "Contact a Framework Developer\n";
1727  }
1728  if (!principalCache_.hasRunPrincipal()) {
1730  << "EventProcessor::readRun\n"
1731  << "Illegal attempt to insert lumi into cache\n"
1732  << "Run is invalid\n"
1733  << "Contact a Framework Developer\n";
1734  }
1735  boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(input_->luminosityBlockAuxiliary(),
1736  preg_,
1737  *processConfiguration_,
1738  historyAppender_.get(),
1739  0));
1740  input_->readLuminosityBlock(*lbp, *historyAppender_);
1741  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1742  principalCache_.insert(lbp);
1743  return input_->luminosityBlock();
1744  }
1745 
1746  int EventProcessor::readAndMergeLumi() {
1747  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1748  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1749  return input_->luminosityBlock();
1750  }
1751 
1752  void EventProcessor::writeRun(statemachine::Run const& run) {
1753  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1754  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1755  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1756  }
1757 
1758  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
1759  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1760  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1761  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1762  }
1763 
1764  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1765  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1766  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1767  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1768  }
1769 
1770  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
1771  principalCache_.deleteLumi(phid, run, lumi);
1772  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1773  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1774  }
1775 
1776  void EventProcessor::readAndProcessEvent() {
1777  if(numberOfForkedChildren_>0) {
1778  readEvent(0);
1779  processEvent(0);
1780  return;
1781  }
1782  InputSource::ItemType itemType = InputSource::IsEvent;
1783 
1784  //While all the following item types are isEvent, process them right here
1785  asyncStopRequestedWhileProcessingEvents_ = false;
1786 
1787  //We will round-robin which stream to use
1788  unsigned int nextStreamIndex=0;
1789  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1790  do {
1791  readEvent(nextStreamIndex);
1792  processEvent(nextStreamIndex);
1793  nextStreamIndex = (nextStreamIndex+1) % kNumStreams;
1794 
1795  if(shouldWeStop()) {
1796  break;
1797  }
1798  itemType = input_->nextItemType();
1799  if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) {
1800  break;
1801  }
1802  } while (itemType == InputSource::IsEvent);
1803  nextItemTypeFromProcessingEvents_ = itemType;
1804  }
1805  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1806  //TODO this will have to become per stream
1807  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1808  StreamContext streamContext(event.streamID(), &processContext_);
1809  input_->readEvent(event, streamContext);
1810  FDEBUG(1) << "\treadEvent\n";
1811  }
1812  void EventProcessor::processEvent(unsigned int iStreamIndex) {
1813  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1814  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1815  assert(pep->luminosityBlockPrincipalPtrValid());
1816  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1817  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1818 
1819  //We can only update IOVs on Lumi boundaries
1820  //IOVSyncValue ts(pep->id(), pep->time());
1821  //espController_->eventSetupForInstance(ts);
1822  EventSetup const& es = esp_->eventSetup();
1823  {
1825  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
1826  if(hasSubProcess()) {
1827  subProcess_->doEvent(*pep);
1828  }
1829  }
1830 
1831  //NOTE: If we have a looper we only have one Stream
1832  if(looper_) {
1833  bool randomAccess = input_->randomAccess();
1834  ProcessingController::ForwardState forwardState = input_->forwardState();
1835  ProcessingController::ReverseState reverseState = input_->reverseState();
1836  ProcessingController pc(forwardState, reverseState, randomAccess);
1837 
1838  EDLooperBase::Status status = EDLooperBase::kContinue;
1839  do {
1840 
1841  StreamContext streamContext(pep->streamID(), &processContext_);
1842  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
1843 
1844  bool succeeded = true;
1845  if(randomAccess) {
1846  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1847  input_->skipEvents(-2);
1848  }
1849  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1850  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1851  }
1852  }
1853  pc.setLastOperationSucceeded(succeeded);
1854  } while(!pc.lastOperationSucceeded());
1855  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1856 
1857  }
1858 
1859  FDEBUG(1) << "\tprocessEvent\n";
1860  pep->clearEventPrincipal();
1861  }
1862 
1863  bool EventProcessor::shouldWeStop() const {
1864  FDEBUG(1) << "\tshouldWeStop\n";
1865  if(shouldWeStop_) return true;
1866  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
1867  }
1868 
1869  void EventProcessor::setExceptionMessageFiles(std::string& message) {
1870  exceptionMessageFiles_ = message;
1871  }
1872 
1873  void EventProcessor::setExceptionMessageRuns(std::string& message) {
1874  exceptionMessageRuns_ = message;
1875  }
1876 
1877  void EventProcessor::setExceptionMessageLumis(std::string& message) {
1878  exceptionMessageLumis_ = message;
1879  }
1880 
1881  bool EventProcessor::alreadyHandlingException() const {
1882  return alreadyHandlingException_;
1883  }
1884 
1885  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
1886  if(iMachine.get() != 0) {
1887  if(!iMachine->terminated()) {
1888  forceLooperToEnd_ = true;
1889  iMachine->process_event(statemachine::Stop());
1890  forceLooperToEnd_ = false;
1891  }
1892  else {
1893  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
1894  }
1895  if(iMachine->terminated()) {
1896  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
1897  }
1898  iMachine.reset();
1899  }
1900  }
1901 }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void clear()
Not thread safe.
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:60
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:252
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:540
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
dispatcher processEvent(e, inputTag, standby)
tuple lumi
Definition: fjr2json.py:35
Timestamp const & endTime() const
void call(boost::function< void(void)>)
processConfiguration
Definition: Schedule.cc:362
def pipe
Definition: pipe.py:5
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:61
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:61
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
Definition: EventID.h:31
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
static std::string const input
Definition: EdmProvDump.cc:44
boost::shared_ptr< edm::ParameterSet > parameterSet()
ServiceToken serviceToken_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
static InputSourceFactory const * get()
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:42
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:58
virtual void endOfJob()
Definition: EDLooperBase.cc:93
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::auto_ptr< Schedule > schedule_
areg
Definition: Schedule.cc:362
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
boost::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void charPtrToEDM(char const *c)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::RunNumber_t runNumber() const
Definition: EPStates.h:49
static ComponentFactory< T > const * get()
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:48
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
Definition: EventRange.h:32
#define O_NONBLOCK
Definition: SysFile.h:21
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
volatile std::atomic< bool > shutdown_flag false
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:57
tuple status
Definition: ntuplemaker.py:245
void validate(ParameterSet &pset, std::string const &moduleLabel) const
preg
Definition: Schedule.cc:362
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
ParameterSet const & registerIt()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:14
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:59
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)