CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
11 
36 
38 
46 
49 
57 
58 #include "MessageForSource.h"
59 #include "MessageForParent.h"
60 
61 #include "boost/bind.hpp"
62 #include "boost/thread/xtime.hpp"
63 
#include <cerrno>
64 #include <exception>
65 #include <iomanip>
66 #include <iostream>
67 #include <utility>
68 #include <sstream>
69 
70 #include <sys/ipc.h>
71 #include <sys/msg.h>
72 
73 //Used for forking
74 #include <sys/types.h>
75 #include <sys/wait.h>
76 #include <sys/socket.h>
77 #include <sys/select.h>
78 #include <sys/fcntl.h>
79 #include <unistd.h>
80 
81 //Used for CPU affinity
82 #ifndef __APPLE__
83 #include <sched.h>
84 #endif
85 
86 //Needed for introspection
87 #include "Cintex/Cintex.h"
88 
89 namespace edm {
90 
91  namespace event_processor {
92 
// StateSentry: records whether a guarded operation completed, via succeeded().
// NOTE(review): the constructor/destructor (listing lines 95-96) and one more
// member declaration (line 100) are missing from this view, so what the sentry
// does with success_ when it is destroyed cannot be confirmed here.
93  class StateSentry {
94  public:
// Marks the guarded operation as having completed successfully.
97  void succeeded() {success_ = true;}
98 
99  private:
// Set to true by succeeded().
101  bool success_;
102  };
103  }
104 
// Make StateSentry (and the rest of event_processor) visible in edm.
105  using namespace event_processor;
106 
107  namespace {
108  template <typename T>
109  class ScheduleSignalSentry {
110  public:
111  ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
112  a_(a), principal_(principal), es_(es) {
113  if (a_) T::preScheduleSignal(a_, principal_);
114  }
115  ~ScheduleSignalSentry() {
116  if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
117  }
118 
119  private:
120  // We own none of these resources.
121  ActivityRegistry* a_;
122  typename T::MyPrincipal* principal_;
123  EventSetup const* es_;
124  };
125  }
126 
namespace {

  // The next two tables must be kept in sync with the state and
  // message enums from the header (presumably each table is indexed by
  // the enumerator's value).
  //
  // Both the pointers and the strings are const, so these lookup tables
  // cannot be modified at run time.

  char const* const stateNames[] = {
    "Init",
    "JobReady",
    "RunGiven",
    "Running",
    "Stopping",
    "ShuttingDown",
    "Done",
    "JobEnded",
    "Error",
    "ErrorEnded",
    "End",
    "Invalid"
  };

  char const* const msgNames[] = {
    "SetRun",
    "Skip",
    "RunAsync",
    "Run(ID)",
    "Run(count)",
    "BeginJob",
    "StopAsync",
    "ShutdownAsync",
    "EndJob",
    "CountComplete",
    "InputExhausted",
    "StopSignal",
    "ShutdownSignal",
    "Finished",
    "Any",
    "dtor",
    "Exception",
    "Rewind"
  };
}
168  // IMPORTANT NOTE:
169  // The mAny messages are special: they must appear last in the
170  // table if multiple entries for a CurrentState are present.
171  // The changeState function does not use mAny yet!!!
172 
// One row of the state-transition table: (CurrentState, Message) -> FinalState.
// NOTE(review): the fields holding the current state and the message (listing
// lines 174-175) are missing from this view; only `final` is visible.
173  struct TransEntry {
// State the machine moves to when the row matches.
176  State final;
177  };
178 
179  // we should use this information to initialize a two dimensional
180  // table of t[CurrentState][Message] = FinalState
181 
182  /*
183  The way this is currently written, the async run thread function
184  can return in the "JobReady" state without having been cleaned up. The
185  problem is that the thread is cleaned up only when stop/shutdown async
186  is called. But the stop/shutdown async functions first attempt
187  to change the state using messages that are not valid in the
188  "JobReady" state.
189 
190  I think most of the problems can be solved by using two states
191  for "running": RunningS and RunningA (sync/async). The problem
192  seems to be all the transitions out of running for both
193  modes of operation. The other solution might be to only go to
194  "Stopping" from Running, and use the return code from "run_p" to
195  set the final state. If this is used, then in the sync mode the
196  "Stopping" state would be momentary.
197 
198  */
199 
// State-transition table rows: { CurrentState, Message, FinalState }.
// NOTE(review): the declaration that opens this table (listing line 200) and
// several rows (lines 205-207, 210-212, 215-217, 219-223, 225-227, 232-235,
// 239-240, 243 and 250) are missing from this view.
201  // CurrentState Message FinalState
202  // -----------------------------------------
203  { sInit, mException, sError },
204  { sInit, mBeginJob, sJobReady },
208  { sJobReady, mSkip, sRunning },
209  { sJobReady, mRunID, sRunning },
213  { sJobReady, mDtor, sEnd }, // should this be allowed?
214 
218 
224  { sRunning, mException, sError },
228  { sRunning, mCountComplete, sStopping }, // sJobReady
229  { sRunning, mInputExhausted, sStopping }, // sJobReady
230 
231  { sStopping, mInputRewind, sRunning }, // The looper needs this
236  { sStopping, mStopAsync, sStopping }, // stay
237  { sStopping, mInputExhausted, sStopping }, // stay
238  //{ sStopping, mAny, sJobReady }, // <- ??????
241  { sShuttingDown, mCountComplete, sDone }, // needed?
242  { sShuttingDown, mInputExhausted, sDone }, // needed?
244  //{ sShuttingDown, mShutdownAsync, sShuttingDown }, // only one at
245  //{ sShuttingDown, mStopAsync, sShuttingDown }, // a time
246  //{ sShuttingDown, mAny, sDone }, // <- ??????
247  { sDone, mEndJob, sJobEnded },
248  { sDone, mException, sError },
249  { sJobEnded, mDtor, sEnd },
251  { sError, mEndJob, sError }, // funny one here
252  { sError, mDtor, sError }, // funny one here
253  { sInit, mDtor, sEnd }, // for StorM dummy EP
254  { sStopping, mShutdownAsync, sShuttingDown }, // For FUEP tests
// Last row; presumably an end-of-table sentinel for the lookup in
// changeState() — TODO confirm against the full source.
255  { sInvalid, mAny, sInvalid }
256  };
257 
258 
259  // Note: many of the messages generate the mBeginJob message first
260  // mRunID, mRunCount, mSetRun
261 
262  // ---------------------------------------------------------------
// makeInput: creates the job's input source from the "@main_input" PSet of
// `params` and returns ownership of it.
// Steps: validate the PSet against the ConfigurationDescriptions filled by
// `filler`, register the PSet, build a ModuleDescription labelled "source",
// then construct the source through InputSourceFactory between
// preSourceConstructionSignal_ and postSourceConstructionSignal_.
// NOTE(review): listing lines 264 (function name and the leading `params`
// parameter), 272, 280, 291 and 325 are missing from this view. Line 272
// presumably begins the exception thrown when "@main_input" is absent — confirm.
263  std::unique_ptr<InputSource>
265  CommonParams const& common,
266  ProductRegistry& preg,
267  boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
268  boost::shared_ptr<ActivityRegistry> areg,
269  boost::shared_ptr<ProcessConfiguration const> processConfiguration) {
270  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
271  if(main_input == 0) {
273  << "There must be exactly one source in the configuration.\n"
274  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
275  }
276 
277  std::string modtype(main_input->getParameter<std::string>("@module_type"));
278 
279  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
281  ConfigurationDescriptions descriptions(filler->baseType());
282  filler->fill(descriptions);
283 
// The inner handlers hand non-cms exceptions to convertException (presumably
// rethrowing them as cms::Exception), so the outer handler can add context.
284  try {
285  try {
286  descriptions.validate(*main_input, std::string("source"));
287  }
288  catch (cms::Exception& e) { throw; }
289  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
290  catch (std::exception& e) { convertException::stdToEDM(e); }
292  catch (char const* c) { convertException::charPtrToEDM(c); }
293  catch (...) { convertException::unknownToEDM(); }
294  }
295  catch (cms::Exception & iException) {
296  std::ostringstream ost;
297  ost << "Validating configuration of input source of type " << modtype;
298  iException.addContext(ost.str());
299  throw;
300  }
301 
302  main_input->registerIt();
303 
304  // Fill in "ModuleDescription", in case the input source produces
305  // any EDproducts, which would be registered in the ProductRegistry.
306  // Also fill in the process history item for this process.
307  // There is no module label for the unnamed input source, so
308  // just use "source".
309  // Only the tracked parameters belong in the process configuration.
310  ModuleDescription md(main_input->id(),
311  main_input->getParameter<std::string>("@module_type"),
312  "source",
313  processConfiguration.get());
314 
315  InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_);
316  areg->preSourceConstructionSignal_(md);
317  std::unique_ptr<InputSource> input;
318  try {
319  try {
320  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
321  }
322  catch (cms::Exception& e) { throw; }
323  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
324  catch (std::exception& e) { convertException::stdToEDM(e); }
326  catch (char const* c) { convertException::charPtrToEDM(c); }
327  catch (...) { convertException::unknownToEDM(); }
328  }
329  catch (cms::Exception& iException) {
// Emit the post signal on the failure path too, so it always pairs with the pre signal.
330  areg->postSourceConstructionSignal_(md);
331  std::ostringstream ost;
332  ost << "Constructing input source of type " << modtype;
333  iException.addContext(ost.str());
334  throw;
335  }
336  areg->postSourceConstructionSignal_(md);
337  return input;
338  }
339 
340  // ---------------------------------------------------------------
// fillLooper: creates the looper listed in "@all_loopers", if any.
// Returns an empty pointer when no looper is configured. More than one looper
// trips the assert; with NDEBUG every listed looper is created and the last one
// is returned. Each looper's PSet is registered before being handed to
// eventsetup::LooperFactory.
// NOTE(review): listing lines 342-343 (function name and the esController/cp
// parameters) are missing from this view.
341  boost::shared_ptr<EDLooperBase>
344  ParameterSet& params) {
345  boost::shared_ptr<EDLooperBase> vLooper;
346 
347  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
348 
349  if(loopers.size() == 0) {
350  return vLooper;
351  }
352 
353  assert(1 == loopers.size());
354 
355  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
356  itName != itNameEnd;
357  ++itName) {
358 
359  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
360  providerPSet->registerIt();
361  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
362  cp,
363  *providerPSet);
364  }
365  return vLooper;
366 
367  }
368 
369  // ---------------------------------------------------------------
// EventProcessor constructor from a Python configuration.
// Every member starts default/empty (state_ = sInit). The body parses `config`
// with PythonProcessDesc, wraps the resulting ParameterSet in a ProcessDesc,
// appends defaultServices/forcedServices, and delegates the real work to init().
// NOTE(review): listing lines 370 and 372 (start of the signature and the
// parameters supplying `config` and `iLegacy`) are missing from this view.
371  ServiceToken const& iToken,
373  std::vector<std::string> const& defaultServices,
374  std::vector<std::string> const& forcedServices) :
375  preProcessEventSignal_(),
376  postProcessEventSignal_(),
377  actReg_(),
378  preg_(),
379  branchIDListHelper_(),
380  serviceToken_(),
381  input_(),
382  espController_(new eventsetup::EventSetupsController),
383  esp_(),
384  act_table_(),
385  processConfiguration_(),
386  schedule_(),
387  subProcess_(),
388  historyAppender_(new HistoryAppender),
389  state_(sInit),
390  event_loop_(),
391  state_lock_(),
392  stop_lock_(),
393  stopper_(),
394  starter_(),
395  stop_count_(-1),
396  last_rc_(epSuccess),
397  last_error_text_(),
398  id_set_(false),
399  event_loop_id_(),
400  my_sig_num_(getSigNum()),
401  fb_(),
402  looper_(),
403  principalCache_(),
404  shouldWeStop_(false),
405  stateMachineWasInErrorState_(false),
406  fileMode_(),
407  emptyRunLumiMode_(),
408  exceptionMessageFiles_(),
409  exceptionMessageRuns_(),
410  exceptionMessageLumis_(),
411  alreadyHandlingException_(false),
412  forceLooperToEnd_(false),
413  looperBeginJobRun_(false),
414  forceESCacheClearOnNewRun_(false),
415  numberOfForkedChildren_(0),
416  numberOfSequentialEventsPerChild_(1),
417  setCpuAffinity_(false),
418  eventSetupDataToExcludeFromPrefetching_() {
// Build the process description from the Python configuration, then initialize.
419  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
420  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
421  processDesc->addServices(defaultServices, forcedServices);
422  init(processDesc, iToken, iLegacy);
423  }
424 
// EventProcessor constructor from a Python configuration plus service lists.
// Every member starts default/empty (state_ = sInit). The body parses `config`
// with PythonProcessDesc, wraps it in a ProcessDesc, and appends
// defaultServices/forcedServices.
// NOTE(review): listing lines 425 (start of the signature) and 475 are missing
// from this view. Unlike its sibling constructors, the visible body never calls
// init(); presumably line 475 is that call — confirm against the full source.
426  std::vector<std::string> const& defaultServices,
427  std::vector<std::string> const& forcedServices) :
428  preProcessEventSignal_(),
429  postProcessEventSignal_(),
430  actReg_(),
431  preg_(),
432  branchIDListHelper_(),
433  serviceToken_(),
434  input_(),
435  espController_(new eventsetup::EventSetupsController),
436  esp_(),
437  act_table_(),
438  processConfiguration_(),
439  schedule_(),
440  subProcess_(),
441  historyAppender_(new HistoryAppender),
442  state_(sInit),
443  event_loop_(),
444  state_lock_(),
445  stop_lock_(),
446  stopper_(),
447  starter_(),
448  stop_count_(-1),
449  last_rc_(epSuccess),
450  last_error_text_(),
451  id_set_(false),
452  event_loop_id_(),
453  my_sig_num_(getSigNum()),
454  fb_(),
455  looper_(),
456  principalCache_(),
457  shouldWeStop_(false),
458  stateMachineWasInErrorState_(false),
459  fileMode_(),
460  emptyRunLumiMode_(),
461  exceptionMessageFiles_(),
462  exceptionMessageRuns_(),
463  exceptionMessageLumis_(),
464  alreadyHandlingException_(false),
465  forceLooperToEnd_(false),
466  looperBeginJobRun_(false),
467  forceESCacheClearOnNewRun_(false),
468  numberOfForkedChildren_(0),
469  numberOfSequentialEventsPerChild_(1),
470  setCpuAffinity_(false),
471  eventSetupDataToExcludeFromPrefetching_() {
472  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
473  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
474  processDesc->addServices(defaultServices, forcedServices);
476  }
477 
// EventProcessor constructor from an already-built ProcessDesc.
// Every member starts default/empty (state_ = sInit); init() does the real work.
// NOTE(review): listing line 480 (the parameter supplying `legacy`) is missing
// from this view.
478  EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
479  ServiceToken const& token,
481  preProcessEventSignal_(),
482  postProcessEventSignal_(),
483  actReg_(),
484  preg_(),
485  branchIDListHelper_(),
486  serviceToken_(),
487  input_(),
488  espController_(new eventsetup::EventSetupsController),
489  esp_(),
490  act_table_(),
491  processConfiguration_(),
492  schedule_(),
493  subProcess_(),
494  historyAppender_(new HistoryAppender),
495  state_(sInit),
496  event_loop_(),
497  state_lock_(),
498  stop_lock_(),
499  stopper_(),
500  starter_(),
501  stop_count_(-1),
502  last_rc_(epSuccess),
503  last_error_text_(),
504  id_set_(false),
505  event_loop_id_(),
506  my_sig_num_(getSigNum()),
507  fb_(),
508  looper_(),
509  principalCache_(),
510  shouldWeStop_(false),
511  stateMachineWasInErrorState_(false),
512  fileMode_(),
513  emptyRunLumiMode_(),
514  exceptionMessageFiles_(),
515  exceptionMessageRuns_(),
516  exceptionMessageLumis_(),
517  alreadyHandlingException_(false),
518  forceLooperToEnd_(false),
519  looperBeginJobRun_(false),
520  forceESCacheClearOnNewRun_(false),
521  numberOfForkedChildren_(0),
522  numberOfSequentialEventsPerChild_(1),
523  setCpuAffinity_(false),
524  eventSetupDataToExcludeFromPrefetching_() {
525  init(processDesc, token, legacy);
526  }
527 
528 
// EventProcessor constructor taking a configuration that is either Python
// (isPython == true, parsed with PythonProcessDesc) or handed directly to the
// ProcessDesc constructor.
// Every member starts default/empty (state_ = sInit).
// NOTE(review): listing line 529 (the signature) and lines 577/581 (presumably
// the init() call in each branch) are missing from this view.
530  preProcessEventSignal_(),
531  postProcessEventSignal_(),
532  actReg_(),
533  preg_(),
534  branchIDListHelper_(),
535  serviceToken_(),
536  input_(),
537  espController_(new eventsetup::EventSetupsController),
538  esp_(),
539  act_table_(),
540  processConfiguration_(),
541  schedule_(),
542  subProcess_(),
543  historyAppender_(new HistoryAppender),
544  state_(sInit),
545  event_loop_(),
546  state_lock_(),
547  stop_lock_(),
548  stopper_(),
549  starter_(),
550  stop_count_(-1),
551  last_rc_(epSuccess),
552  last_error_text_(),
553  id_set_(false),
554  event_loop_id_(),
555  my_sig_num_(getSigNum()),
556  fb_(),
557  looper_(),
558  principalCache_(),
559  shouldWeStop_(false),
560  stateMachineWasInErrorState_(false),
561  fileMode_(),
562  emptyRunLumiMode_(),
563  exceptionMessageFiles_(),
564  exceptionMessageRuns_(),
565  exceptionMessageLumis_(),
566  alreadyHandlingException_(false),
567  forceLooperToEnd_(false),
568  looperBeginJobRun_(false),
569  forceESCacheClearOnNewRun_(false),
570  numberOfForkedChildren_(0),
571  numberOfSequentialEventsPerChild_(1),
572  setCpuAffinity_(false),
573  eventSetupDataToExcludeFromPrefetching_() {
574  if(isPython) {
575  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
576  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
578  }
579  else {
580  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
582  }
583  }
584 
// init: second construction phase shared by the constructors.
// 1. Enables Cintex and takes the process PSet, popping out any SubProcess PSet.
// 2. Reads the untracked "options" PSet: fileMode, emptyRunLumiMode,
//    forceEventSetupCacheClearOnNewRun, throwIfIllegalParameter and the
//    "multiProcesses" forking settings (including EventSetup prefetch exclusions).
// 3. Creates services, EventSetup provider, optional looper, input source,
//    schedule and optional SubProcess, storing them in data members.
// NOTE(review): listing lines 588, 629, 654-655 and 662 are missing from this
// view. Line 661 reads branchIDListHelper_ and processConfiguration_, which are
// presumably assigned on the missing lines 654-655 — confirm.
585  void
586  EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
587  ServiceToken const& iToken,
589 
590  //std::cerr << processDesc->dump() << std::endl;
591 
592  ROOT::Cintex::Cintex::Enable();
593 
594  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
595  //std::cerr << parameterSet->dump() << std::endl;
596 
597  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
598  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
599 
600  // Now set some parameters specific to the main process.
601  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
602  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
603  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
604  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
605  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
606  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
607  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
608  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
// NOTE(review): continueAfterChildFailure_ is not in the constructors'
// initializer lists; this is the only visible place it is set.
609  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
610  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
// Exclusions are keyed by record name; each entry adds a (type, label) pair,
// with type defaulting to "*" and label to "".
611  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
612  itPS != itPSEnd;
613  ++itPS) {
614  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
615  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
616  itPS->getUntrackedParameter<std::string>("label", "")));
617  }
// Illegal parameters throw unless options.throwIfIllegalParameter is set to false.
618  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
619 
620  // Now do general initialization
621  ScheduleItems items;
622 
623  //initialize the services
624  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
625  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
626  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
627 
628  //make the services available
630 
631  // initialize miscellaneous items
632  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
633 
634  // initialize the event setup provider
635  esp_ = espController_->makeProvider(*parameterSet);
636 
637  // initialize the looper, if any
638  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
639  if(looper_) {
640  looper_->setActionTable(items.act_table_.get());
641  looper_->attachTo(*items.actReg_);
642  }
643 
644  // initialize the input source
645  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
646 
647  // initialize the Schedule
648  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
649 
650  // set the data members
651  act_table_ = std::move(items.act_table_);
652  actReg_ = items.actReg_;
653  preg_.reset(items.preg_.release());
656 
657  FDEBUG(2) << parameterSet << std::endl;
658  connectSigs(this);
659 
660  // Reusable event principal
661  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
663 
664  // initialize the subprocess, if there is one
665  if(subProcessParameterSet) {
666  subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
667  }
668  }
669 
// Destructor: keeps the services available (ServiceRegistry::Operate) while
// members are torn down. A cms::Exception from the try block is logged via
// LogError rather than propagated. Members that may need services are reset in
// an explicit order, and the ParameterSet and Parentage registries are cleared.
// NOTE(review): listing line 670 (presumably `EventProcessor::~EventProcessor() {`),
// line 675 (the try-block body) and lines 695, 697-698 are missing from this view.
671  // Make the services available while everything is being deleted.
672  ServiceToken token = getToken();
673  ServiceRegistry::Operate op(token);
674  try {
676  }
677  catch(cms::Exception& e) {
678  LogError("System")
679  << e.explainSelf() << "\n";
680  }
681 
682  // manually destroy all these things that may need the services around
683  espController_.reset();
684  subProcess_.reset();
685  esp_.reset();
686  schedule_.reset();
687  input_.reset();
688  looper_.reset();
689  actReg_.reset();
690 
// Reset the global ParameterSet registry so no state leaks to a later processor.
691  pset::Registry* psetRegistry = pset::Registry::instance();
692  psetRegistry->data().clear();
693  psetRegistry->extra().setID(ParameterSetID());
694 
696  ParentageRegistry::instance()->data().clear();
699  }
700 
// beginJob: does nothing unless state_ is sInit. Otherwise runs, in order:
// the input source's doBeginJob (any cms::Exception gets the context
// "Calling beginJob for the source"), schedule_->beginJob, the SubProcess's
// doBeginJob when one exists, and finally postBeginJobSignal_.
// NOTE(review): listing lines 702 (signature), 706, 710 and 733 are missing
// from this view.
701  void
703  if(state_ != sInit) return;
704  bk::beginJob();
705  // can only be run if in the initial state
707 
708  // StateSentry toerror(this); // should we add this ?
709  //make the services available
711 
712  //NOTE: This implementation assumes 'Job' means one call
713  // to EventProcessor::run
714  // If it really means once per 'application' then this code will
715  // have to be changed.
716  // Also have to deal with case where have 'run' then new Module
717  // added and do 'run'
718  // again. In that case the newly added Module needs its 'beginJob'
719  // to be called.
720 
721  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
722  // For now we delay calling beginOfJob until first beginOfRun
723  //if(looper_) {
724  // looper_->beginOfJob(es);
725  //}
726  try {
727  try {
728  input_->doBeginJob();
729  }
730  catch (cms::Exception& e) { throw; }
731  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
732  catch (std::exception& e) { convertException::stdToEDM(e); }
734  catch(char const* c) { convertException::charPtrToEDM(c); }
735  catch (...) { convertException::unknownToEDM(); }
736  }
737  catch(cms::Exception& ex) {
738  ex.addContext("Calling beginJob for the source");
739  throw;
740  }
741  schedule_->beginJob(*preg_);
742  // toerror.succeeded(); // should we add this?
743  if(hasSubProcess()) subProcess_->doBeginJob();
744  actReg_->postBeginJobSignal_();
745  }
746 
// endJob: runs every end-of-job step even if earlier ones throw. Exceptions are
// gathered in an ExceptionCollector and rethrown once at the end.
// Order: changeState(mEndJob), schedule, SubProcess (if any), input source,
// looper (if any), then postEndJobSignal_.
// NOTE(review): listing lines 748 (signature) and 756 are missing from this view.
747  void
749  // Collects exceptions, so we don't throw before all operations are performed.
750  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
751 
752  // only allowed to run if state is sIdle, sJobReady, sRunGiven
// NOTE(review): the comment above looks stale — there is no sIdle in stateNames,
// and the visible transition rows accept mEndJob only from sDone and sError.
753  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
754 
755  //make the services available
757 
758  schedule_->endJob(c);
759  if(hasSubProcess()) {
760  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
761  }
762  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
763  if(looper_) {
764  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
765  }
// Capture the raw pointer so the lambda does not need `this`.
766  auto actReg = actReg_.get();
767  c.call([actReg](){actReg->postEndJobSignal_();});
768  if(c.hasThrown()) {
769  c.rethrow();
770  }
771  }
772 
// Returns the service token assigned in init() (serviceToken_).
// NOTE(review): listing lines 773-774 (the signature, presumably
// EventProcessor::getToken()) are missing from this view.
775  return serviceToken_;
776  }
777 
//Setup signal handler to listen for when forked children stop
namespace {
  //These are volatile since the compiler can not be allowed to optimize them
  // since they can be modified in the signal handler.
  // NOTE(review): strictly, only `volatile std::sig_atomic_t` (or lock-free
  // atomics) are guaranteed safe to write from a signal handler. The types are
  // left unchanged because other code in this file reads and writes them.
  volatile bool child_failed = false;
  volatile unsigned int num_children_done = 0;
  volatile int child_fail_exit_status = 0;
  volatile int child_fail_signal = 0;

  //NOTE: We setup the signal handler to run in the main thread which
  // is also the same thread that then reads the above values

  extern "C" {
    // SIGCHLD handler: reaps every child that has already terminated, using a
    // non-blocking waitpid loop, and records the outcome.
    //  - A child that exited normally or was killed by a signal increments
    //    num_children_done.
    //  - A non-zero exit code sets child_fail_exit_status and child_failed.
    //  - Termination by a signal sets child_fail_signal and child_failed.
    // errno is saved and restored because waitpid() overwrites it (ECHILD once
    // no children remain). Otherwise the interrupted code could observe a
    // spurious errno, e.g. between a failing call and its errno check.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;
      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0<p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }
      errno = savedErrno;
    }
  }

}
816 
// NOTE(review): the enumerators of this anonymous enum (listing lines 818-821)
// are missing from this view, so their meaning cannot be documented here.
817  enum {
822  };
823 
824  namespace {
825  unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
826  unsigned int n = 0;
827  while(numberOfChildren != 0) {
828  ++n;
829  numberOfChildren /= 10;
830  }
831  if(n == 0) {
832  n = 3; // Protect against zero numberOfChildren
833  }
834  return n;
835  }
836 
837  /*This class embodied the thread which is used to listen to the forked children and
838  then tell them which events they should process */
839  class MessageSenderToSource {
840  public:
841  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
842  void operator()();
843 
844  private:
845  const std::vector<int>& m_childrenPipes;
846  long const m_nEventsToProcess;
847  fd_set m_socketSet;
848  unsigned int m_aliveChildren;
849  int m_maxFd;
850  };
851 
852  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
853  std::vector<int> const& childrenPipes,
854  long iNEventsToProcess):
855  m_childrenPipes(childrenPipes),
856  m_nEventsToProcess(iNEventsToProcess),
857  m_aliveChildren(childrenSockets.size()),
858  m_maxFd(0)
859  {
860  FD_ZERO(&m_socketSet);
861  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
862  it != itEnd; it++) {
863  FD_SET(*it, &m_socketSet);
864  if (*it > m_maxFd) {
865  m_maxFd = *it;
866  }
867  }
868  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
869  it != itEnd; ++it) {
870  FD_SET(*it, &m_socketSet);
871  if (*it > m_maxFd) {
872  m_maxFd = *it;
873  }
874  }
875  m_maxFd++; // select reads [0,m_maxFd).
876  }
877 
878  /* This function is the heart of the communication between parent and child.
879  * When ready for more data, the child (see MessageReceiverForSource) requests
880  * data through a AF_UNIX socket message. The parent will then assign the next
881  * chunk of data by sending a message back.
882  *
883  * Additionally, this function also monitors the read-side of the pipe fd from the child.
884  * If the child dies unexpectedly, the pipe will be selected as ready for read and
885  * will return EPIPE when read from. Further, if the child thinks the parent has died
886  * (defined as waiting more than 1s for a response), it will write a single byte to
887  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
888  * If still alive, the parent will read the byte and ignore it.
889  *
890  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
891  * handler can distinguish between success and failure cases.
892  */
893 
894  void
895  MessageSenderToSource::operator()() {
896  multicore::MessageForParent childMsg;
897  LogInfo("ForkingController") << "I am controller";
898  //this is the master and therefore the controller
899 
900  multicore::MessageForSource sndmsg;
901  sndmsg.startIndex = 0;
902  sndmsg.nIndices = m_nEventsToProcess;
903  do {
904 
905  fd_set readSockets, errorSockets;
906  // Wait for a request from a child for events.
907  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
908  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
909  // Note that we don't timeout; may be reconsidered in the future.
910  ssize_t rc;
911  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
912  if (rc < 0) {
913  std::cerr << "select failed; should be impossible due to preconditions.\n";
914  abort();
915  break;
916  }
917 
918  // Read the message from the child.
919  for (int idx=0; idx<m_maxFd; idx++) {
920 
921  // Handle errors
922  if (FD_ISSET(idx, &errorSockets)) {
923  LogInfo("ForkingController") << "Error on socket " << idx;
924  FD_CLR(idx, &m_socketSet);
925  close(idx);
926  // See if it was the watchdog pipe that died.
927  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
928  if (*it == idx) {
929  m_aliveChildren--;
930  }
931  }
932  continue;
933  }
934 
935  if (!FD_ISSET(idx, &readSockets)) {
936  continue;
937  }
938 
939  // See if this FD is a child watchdog pipe. If so, read from it to prevent
940  // writes from blocking.
941  bool is_pipe = false;
942  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
943  if (*it == idx) {
944  is_pipe = true;
945  char buf;
946  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
947  if (rc <= 0) {
948  m_aliveChildren--;
949  FD_CLR(idx, &m_socketSet);
950  close(idx);
951  }
952  }
953  }
954 
955  // Only execute this block if the FD is a socket for sending the child work.
956  if (!is_pipe) {
957  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
958  if (rc < 0) {
959  FD_CLR(idx, &m_socketSet);
960  close(idx);
961  continue;
962  }
963 
964  // Tell the child what events to process.
965  // If 'send' fails, then the child process has failed (any other possibilities are
966  // eliminated because we are using fixed-size messages with Unix datagram sockets).
967  // Thus, the SIGCHLD handler will fire and set child_fail = true.
968  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
969  if (rc < 0) {
970  FD_CLR(idx, &m_socketSet);
971  close(idx);
972  continue;
973  }
974  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
975  sndmsg.startIndex += sndmsg.nIndices;
976  }
977  }
978 
979  } while (m_aliveChildren > 0);
980 
981  return;
982  }
983 
984  }
985 
986 
987  void EventProcessor::possiblyContinueAfterForkChildFailure() {
988  if(child_failed && continueAfterChildFailure_) {
989  if (child_fail_signal) {
990  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
991  child_fail_signal=0;
992  } else if (child_fail_exit_status) {
993  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
994  child_fail_exit_status=0;
995  } else {
996  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
997  }
998  child_failed =false;
999  }
1000  }
1001 
1002  bool
1003  EventProcessor::forkProcess(std::string const& jobReportFile) {
1004 
1005  if(0 == numberOfForkedChildren_) {return true;}
1006  assert(0<numberOfForkedChildren_);
1007  //do what we want done in common
1008  {
1009  beginJob(); //make sure this was run
1010  // make the services available
1011  ServiceRegistry::Operate operate(serviceToken_);
1012 
1013  InputSource::ItemType itemType;
1014  itemType = input_->nextItemType();
1015 
1016  assert(itemType == InputSource::IsFile);
1017  {
1018  readFile();
1019  }
1020  itemType = input_->nextItemType();
1021  assert(itemType == InputSource::IsRun);
1022 
1023  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
1024  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
1025  input_->runAuxiliary()->beginTime());
1026  espController_->eventSetupForInstance(ts);
1027  EventSetup const& es = esp_->eventSetup();
1028 
1029  //now get all the data available in the EventSetup
1030  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1031  es.fillAvailableRecordKeys(recordKeys);
1032  std::vector<eventsetup::DataKey> dataKeys;
1033  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1034  itKey != itEnd;
1035  ++itKey) {
1036  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
1037  //see if this is on our exclusion list
1038  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
1039  ExcludedData const* excludedData(nullptr);
1040  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1041  excludedData = &(itExcludeRec->second);
1042  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
1043  //skip all items in this record
1044  continue;
1045  }
1046  }
1047  if(0 != recordPtr) {
1048  dataKeys.clear();
1049  recordPtr->fillRegisteredDataKeys(dataKeys);
1050  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1051  itDataKey != itDataKeyEnd;
1052  ++itDataKey) {
1053  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1054  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1055  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1056  continue;
1057  }
1058  try {
1059  recordPtr->doGet(*itDataKey);
1060  } catch(cms::Exception& e) {
1061  LogWarning("ForkingEventSetupPreFetching") << e.what();
1062  }
1063  }
1064  }
1065  }
1066  }
1067  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
1068  {
1069  // make the services available
1070  ServiceRegistry::Operate operate(serviceToken_);
1071  Service<JobReport> jobReport;
1072  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1073 
1074  //Now actually do the forking
1075  actReg_->preForkReleaseResourcesSignal_();
1076  input_->doPreForkReleaseResources();
1077  schedule_->preForkReleaseResources();
1078  }
1079  installCustomHandler(SIGCHLD, ep_sigchld);
1080 
1081 
1082  unsigned int childIndex = 0;
1083  unsigned int const kMaxChildren = numberOfForkedChildren_;
1084  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1085  std::vector<pid_t> childrenIds;
1086  childrenIds.reserve(kMaxChildren);
1087  std::vector<int> childrenSockets;
1088  childrenSockets.reserve(kMaxChildren);
1089  std::vector<int> childrenPipes;
1090  childrenPipes.reserve(kMaxChildren);
1091  std::vector<int> childrenSocketsCopy;
1092  childrenSocketsCopy.reserve(kMaxChildren);
1093  std::vector<int> childrenPipesCopy;
1094  childrenPipesCopy.reserve(kMaxChildren);
1095  int pipes[] {0, 0};
1096 
1097  {
1098  // make the services available
1099  ServiceRegistry::Operate operate(serviceToken_);
1100  Service<JobReport> jobReport;
1101  int sockets[2], fd_flags;
1102  for(; childIndex < kMaxChildren; ++childIndex) {
1103  // Create a UNIX_DGRAM socket pair
1104  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1105  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1106  exit(EXIT_FAILURE);
1107  }
1108  if (pipe(pipes)) {
1109  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1110  exit(EXIT_FAILURE);
1111  }
1112  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1113  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1114  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1115  exit(EXIT_FAILURE);
1116  }
1117  // Mark socket as non-block. Child must be careful to do select prior
1118  // to reading from socket.
1119  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1120  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1121  exit(EXIT_FAILURE);
1122  }
1123  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1124  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1125  exit(EXIT_FAILURE);
1126  }
1127  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1128  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1129  exit(EXIT_FAILURE);
1130  }
1131  // Linux man page notes there are some edge cases where reading from a
1132  // fd can block, even after a select.
1133  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1134  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1135  exit(EXIT_FAILURE);
1136  }
1137  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1138  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1139  exit(EXIT_FAILURE);
1140  }
1141 
1142  childrenPipesCopy = childrenPipes;
1143  childrenSocketsCopy = childrenSockets;
1144 
1145  pid_t value = fork();
1146  if(value == 0) {
1147  // Close the parent's side of the socket and pipe which will talk to us.
1148  close(pipes[0]);
1149  close(sockets[0]);
1150  // Close our copies of the parent's other communication pipes.
1151  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1152  close(*it);
1153  }
1154  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1155  close(*it);
1156  }
1157 
1158  // this is the child process, redirect stdout and stderr to a log file
1159  fflush(stdout);
1160  fflush(stderr);
1161  std::stringstream stout;
1162  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1163  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1164  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1165  }
1166  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1167  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1168  }
1169 
1170  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1171  if(setCpuAffinity_) {
1172  // CPU affinity is handled differently on macosx.
1173  // We disable it and print a message until someone reads:
1174  //
1175  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1176  //
1177  // and implements it.
1178 #ifdef __APPLE__
1179  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1180 #else
1181  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1182  cpu_set_t mask;
1183  CPU_ZERO(&mask);
1184  CPU_SET(childIndex, &mask);
1185  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1186  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1187  exit(-1);
1188  }
1189 #endif
1190  }
1191  break;
1192  } else {
1193  //this is the parent
1194  close(pipes[1]);
1195  close(sockets[1]);
1196  }
1197  if(value < 0) {
1198  LogError("ForkingChild") << "failed to create a child";
1199  exit(-1);
1200  }
1201  childrenIds.push_back(value);
1202  childrenSockets.push_back(sockets[0]);
1203  childrenPipes.push_back(pipes[0]);
1204  }
1205 
1206  if(childIndex < kMaxChildren) {
1207  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1208  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1209 
1210  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1211  input_->doPostForkReacquireResources(receiver);
1212  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1213  //NOTE: sources have to reset themselves by listening to the post fork message
1214  //rewindInput();
1215  return true;
1216  }
1217  jobReport->parentAfterFork(jobReportFile);
1218  }
1219 
1220  //this is the original, which is now the master for all the children
1221 
1222  //Need to wait for signals from the children or externally
1223  // To wait we must
1224  // 1) block the signals we want to wait on so we do not have a race condition
1225  // 2) check that we haven't already meet our ending criteria
1226  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1227  sigset_t blockingSigSet;
1228  sigset_t unblockingSigSet;
1229  sigset_t oldSigSet;
1230  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1231  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1232  sigaddset(&blockingSigSet, SIGCHLD);
1233  sigaddset(&blockingSigSet, SIGUSR2);
1234  sigaddset(&blockingSigSet, SIGINT);
1235  sigdelset(&unblockingSigSet, SIGCHLD);
1236  sigdelset(&unblockingSigSet, SIGUSR2);
1237  sigdelset(&unblockingSigSet, SIGINT);
1238  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1239 
1240  // If there are too many fd's (unlikely, but possible) for select, denote this
1241  // because the sender will fail.
1242  bool too_many_fds = false;
1243  if (pipes[1]+1 > FD_SETSIZE) {
1244  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1245  too_many_fds = true;
1246  }
1247 
1248  //create a thread that sends the units of work to workers
1249  // we create it after all signals were blocked so that this
1250  // thread is never interupted by a signal
1251  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1252  boost::thread senderThread(sender);
1253 
1254  if(not too_many_fds) {
1255  //NOTE: a child could have failed before we got here and even after this call
1256  // which is why the 'if' is conditional on continueAfterChildFailure_
1257  possiblyContinueAfterForkChildFailure();
1258  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1259  sigsuspend(&unblockingSigSet);
1260  possiblyContinueAfterForkChildFailure();
1261  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1262  }
1263  }
1264  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1265 
1266  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1267  if(child_failed) {
1268  LogError("ForkingStopping") << "child failed";
1269  }
1270  if(shutdown_flag) {
1271  LogSystem("ForkingStopping") << "asked to shutdown";
1272  }
1273 
1274  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1275  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1276  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1277  it != itEnd; ++it) {
1278  /* int result = */ kill(*it, SIGUSR2);
1279  }
1280  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1281  while(num_children_done != kMaxChildren) {
1282  sigsuspend(&unblockingSigSet);
1283  }
1284  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1285  }
1286  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1287  senderThread.join();
1288  if(child_failed && !continueAfterChildFailure_) {
1289  if (child_fail_signal) {
1290  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1291  } else if (child_fail_exit_status) {
1292  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1293  } else {
1294  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1295  }
1296  }
1297  if(too_many_fds) {
1298  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1299  }
1300  return false;
1301  }
1302 
1303  void
1304  EventProcessor::connectSigs(EventProcessor* ep) {
1305  // When the FwkImpl signals are given, pass them to the
1306  // appropriate EventProcessor signals so that the outside world
1307  // can see the signal.
1308  actReg_->preProcessEventSignal_.connect(std::cref(ep->preProcessEventSignal_));
1309  actReg_->postProcessEventSignal_.connect(std::cref(ep->postProcessEventSignal_));
1310  }
1311 
1312  std::vector<ModuleDescription const*>
1313  EventProcessor::getAllModuleDescriptions() const {
1314  return schedule_->getAllModuleDescriptions();
1315  }
1316 
1317  int
1318  EventProcessor::totalEvents() const {
1319  return schedule_->totalEvents();
1320  }
1321 
1322  int
1323  EventProcessor::totalEventsPassed() const {
1324  return schedule_->totalEventsPassed();
1325  }
1326 
1327  int
1328  EventProcessor::totalEventsFailed() const {
1329  return schedule_->totalEventsFailed();
1330  }
1331 
1332  void
1333  EventProcessor::enableEndPaths(bool active) {
1334  schedule_->enableEndPaths(active);
1335  }
1336 
1337  bool
1338  EventProcessor::endPathsEnabled() const {
1339  return schedule_->endPathsEnabled();
1340  }
1341 
1342  void
1343  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1344  schedule_->getTriggerReport(rep);
1345  }
1346 
1347  void
1348  EventProcessor::clearCounters() {
1349  schedule_->clearCounters();
1350  }
1351 
1352  char const* EventProcessor::currentStateName() const {
1353  return stateName(getState());
1354  }
1355 
1356  char const* EventProcessor::stateName(State s) const {
1357  return stateNames[s];
1358  }
1359 
1360  char const* EventProcessor::msgName(Msg m) const {
1361  return msgNames[m];
1362  }
1363 
1364  State EventProcessor::getState() const {
1365  return state_;
1366  }
1367 
1368  EventProcessor::StatusCode EventProcessor::statusAsync() const {
1369  // the thread will record exception/error status in the event processor
1370  // for us to look at and report here
1371  return last_rc_;
1372  }
1373 
1374  void
1375  EventProcessor::setRunNumber(RunNumber_t runNumber) {
1376  if(runNumber == 0) {
1377  runNumber = 1;
1378  LogWarning("Invalid Run")
1379  << "EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
1380  << "Run number was set to 1 instead\n";
1381  }
1382 
1383  // inside of beginJob there is a check to see if it has been called before
1384  beginJob();
1385  changeState(mSetRun);
1386 
1387  // interface not correct yet
1388  input_->setRunNumber(runNumber);
1389  }
1390 
1391  void
1392  EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
1393  // inside of beginJob there is a check to see if it has been called before
1394  beginJob();
1395  changeState(mSetRun);
1396 
1397  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1398  //input_->declareRunNumber(runNumber);
1399  }
1400 
1402  EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
1403  bool rc = true;
1404  boost::xtime timeout;
1405 
1406 #if BOOST_VERSION >= 105000
1407  boost::xtime_get(&timeout, boost::TIME_UTC_);
1408 #else
1409  boost::xtime_get(&timeout, boost::TIME_UTC);
1410 #endif
1411  timeout.sec += timeout_seconds;
1412 
1413  // make sure to include a timeout here so we don't wait forever
1414  // I suspect there are still timing issues with thread startup
1415  // and the setting of the various control variables (stop_count, id_set)
1416  {
1417  boost::mutex::scoped_lock sl(stop_lock_);
1418 
1419  // look here - if runAsync not active, just return the last return code
1420  if(stop_count_ < 0) return last_rc_;
1421 
1422  if(timeout_seconds == 0) {
1423  while(stop_count_ == 0) stopper_.wait(sl);
1424  } else {
1425  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1426  }
1427 
1428  if(rc == false) {
1429  // timeout occurred
1430  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1431  // this is a temporary hack until we get the input source
1432  // upgraded to allow blocking input sources to be unblocked
1433 
1434  // the next line is dangerous and causes all sorts of trouble
1435  if(id_set_) pthread_cancel(event_loop_id_);
1436 
1437  // we will not do anything yet
1438  LogWarning("timeout")
1439  << "An asynchronous request was made to shut down "
1440  << "the event loop "
1441  << "and the event loop did not shutdown after "
1442  << timeout_seconds << " seconds\n";
1443  } else {
1444  event_loop_->join();
1445  event_loop_.reset();
1446  id_set_ = false;
1447  stop_count_ = -1;
1448  }
1449  }
1450  return rc == false ? epTimedOut : last_rc_;
1451  }
1452 
1454  EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
1455  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1456  if(rc != epTimedOut) changeState(mCountComplete);
1457  else errorState();
1458  return rc;
1459  }
1460 
1461 
1462  EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
1463  changeState(mStopAsync);
1464  StatusCode rc = waitForAsyncCompletion(secs);
1465  if(rc != epTimedOut) changeState(mFinished);
1466  else errorState();
1467  return rc;
1468  }
1469 
1470  EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
1471  changeState(mShutdownAsync);
1472  StatusCode rc = waitForAsyncCompletion(secs);
1473  if(rc != epTimedOut) changeState(mFinished);
1474  else errorState();
1475  return rc;
1476  }
1477 
1478  void EventProcessor::errorState() {
1479  state_ = sError;
1480  }
1481 
1482  // next function irrelevant now
1483  EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
1484  // make sure to include a timeout here so we don't wait forever
1485  // I suspect there are still timing issues with thread startup
1486  // and the setting of the various control variables (stop_count, id_set)
1487  changeState(m);
1488  return waitForAsyncCompletion(60*2);
1489  }
1490 
1491  void EventProcessor::changeState(Msg msg) {
1492  // most likely need to serialize access to this routine
1493 
1494  boost::mutex::scoped_lock sl(state_lock_);
1495  State curr = state_;
1496  int rc;
1497  // found if(not end of table) and
1498  // (state == table.state && (msg == table.message || msg == any))
1499  for(rc = 0;
1500  table[rc].current != sInvalid &&
1501  (curr != table[rc].current ||
1502  (curr == table[rc].current &&
1503  msg != table[rc].message && table[rc].message != mAny));
1504  ++rc);
1505 
1506  if(table[rc].current == sInvalid)
1507  throw cms::Exception("BadState")
1508  << "A member function of EventProcessor has been called in an"
1509  << " inappropriate order.\n"
1510  << "Bad transition from " << stateName(curr) << " "
1511  << "using message " << msgName(msg) << "\n"
1512  << "No where to go from here.\n";
1513 
1514  FDEBUG(1) << "changeState: current=" << stateName(curr)
1515  << ", message=" << msgName(msg)
1516  << " -> new=" << stateName(table[rc].final) << "\n";
1517 
1518  state_ = table[rc].final;
1519  }
1520 
1521  void EventProcessor::runAsync() {
1522  beginJob();
1523  {
1524  boost::mutex::scoped_lock sl(stop_lock_);
1525  if(id_set_ == true) {
1526  std::string err("runAsync called while async event loop already running\n");
1527  LogError("FwkJob") << err;
1528  throw cms::Exception("BadState") << err;
1529  }
1530 
1531  changeState(mRunAsync);
1532 
1533  stop_count_ = 0;
1534  last_rc_ = epSuccess; // forget the last value!
1535  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1536  boost::xtime timeout;
1537 #if BOOST_VERSION >= 105000
1538  boost::xtime_get(&timeout, boost::TIME_UTC_);
1539 #else
1540  boost::xtime_get(&timeout, boost::TIME_UTC);
1541 #endif
1542  timeout.sec += 60; // 60 seconds to start!!!!
1543  if(starter_.timed_wait(sl, timeout) == false) {
1544  // yikes - the thread did not start
1545  throw cms::Exception("BadState")
1546  << "Async run thread did not start in 60 seconds\n";
1547  }
1548  }
1549  }
1550 
1551  void EventProcessor::asyncRun(EventProcessor* me) {
1552  // set up signals to allow for interruptions
1553  // ignore all other signals
1554  // make sure no exceptions escape out
1555 
1556  // temporary hack until we modify the input source to allow
1557  // wakeup calls from other threads. This mimics the solution
1558  // in EventFilter/Processor, which I do not like.
1559  // allowing cancels means that the thread just disappears at
1560  // certain points. This is bad for C++ stack variables.
1561  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1562  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1563  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1564  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1565 
1566  {
1567  boost::mutex::scoped_lock sl(me->stop_lock_);
1568  me->event_loop_id_ = pthread_self();
1569  me->id_set_ = true;
1570  me->starter_.notify_all();
1571  }
1572 
1573  Status rc = epException;
1574  FDEBUG(2) << "asyncRun starting ......................\n";
1575 
1576  try {
1577  bool onlineStateTransitions = true;
1578  rc = me->runToCompletion(onlineStateTransitions);
1579  }
1580  catch (cms::Exception& e) {
1581  LogError("FwkJob") << "cms::Exception caught in "
1582  << "EventProcessor::asyncRun"
1583  << "\n"
1584  << e.explainSelf();
1585  me->last_error_text_ = e.explainSelf();
1586  }
1587  catch (std::exception& e) {
1588  LogError("FwkJob") << "Standard library exception caught in "
1589  << "EventProcessor::asyncRun"
1590  << "\n"
1591  << e.what();
1592  me->last_error_text_ = e.what();
1593  }
1594  catch (...) {
1595  LogError("FwkJob") << "Unknown exception caught in "
1596  << "EventProcessor::asyncRun"
1597  << "\n";
1598  me->last_error_text_ = "Unknown exception caught";
1599  rc = epOther;
1600  }
1601 
1602  me->last_rc_ = rc;
1603 
1604  {
1605  // notify anyone waiting for exit that we are doing so now
1606  boost::mutex::scoped_lock sl(me->stop_lock_);
1607  ++me->stop_count_;
1608  me->stopper_.notify_all();
1609  }
1610  FDEBUG(2) << "asyncRun ending ......................\n";
1611  }
1612 
1613  std::auto_ptr<statemachine::Machine>
1614  EventProcessor::createStateMachine() {
1616  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1617  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1618  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1619  else {
1620  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1621  << fileMode_ << ".\n"
1622  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1623  }
1624 
1625  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1626  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1627  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1628  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1629  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1630  else {
1631  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1632  << emptyRunLumiMode_ << ".\n"
1633  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1634  }
1635 
1636  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1637  fileMode,
1638  emptyRunLumiMode));
1639 
1640  machine->initiate();
1641  return machine;
1642  }
1643 
1644 
1646  EventProcessor::runToCompletion(bool onlineStateTransitions) {
1647 
1648  StateSentry toerror(this);
1649 
1650  StatusCode returnCode=epSuccess;
1651  std::auto_ptr<statemachine::Machine> machine;
1652  {
1653  beginJob(); //make sure this was called
1654 
1655  if(!onlineStateTransitions) changeState(mRunCount);
1656 
1657  //StatusCode returnCode = epSuccess;
1658  stateMachineWasInErrorState_ = false;
1659 
1660  // make the services available
1661  ServiceRegistry::Operate operate(serviceToken_);
1662 
1663  machine = createStateMachine();
1664  try {
1665  try {
1666 
1667  InputSource::ItemType itemType;
1668 
1669  while(true) {
1670 
1671  bool more = true;
1672  if(numberOfForkedChildren_ > 0) {
1673  size_t size = preg_->size();
1674  more = input_->skipForForking();
1675  if(more) {
1676  if(size < preg_->size()) {
1677  principalCache_.adjustIndexesAfterProductRegistryAddition();
1678  }
1679  principalCache_.adjustEventToNewProductRegistry(preg_);
1680  }
1681  }
1682  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1683 
1684  FDEBUG(1) << "itemType = " << itemType << "\n";
1685 
1686  // These are used for asynchronous running only and
1687  // and are checking to see if stopAsync or shutdownAsync
1688  // were called from another thread. In the future, we
1689  // may need to do something better than polling the state.
1690  // With the current code this is the simplest thing and
1691  // it should always work. If the interaction between
1692  // threads becomes more complex this may cause problems.
1693  if(state_ == sStopping) {
1694  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1695  forceLooperToEnd_ = true;
1696  machine->process_event(statemachine::Stop());
1697  forceLooperToEnd_ = false;
1698  break;
1699  }
1700  else if(state_ == sShuttingDown) {
1701  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1702  forceLooperToEnd_ = true;
1703  machine->process_event(statemachine::Stop());
1704  forceLooperToEnd_ = false;
1705  break;
1706  }
1707 
1708  // Look for a shutdown signal
1709  {
1710  boost::mutex::scoped_lock sl(usr2_lock);
1711  if(shutdown_flag) {
1712  changeState(mShutdownSignal);
1713  returnCode = epSignal;
1714  forceLooperToEnd_ = true;
1715  machine->process_event(statemachine::Stop());
1716  forceLooperToEnd_ = false;
1717  break;
1718  }
1719  }
1720 
1721  if(itemType == InputSource::IsStop) {
1722  machine->process_event(statemachine::Stop());
1723  }
1724  else if(itemType == InputSource::IsFile) {
1725  machine->process_event(statemachine::File());
1726  }
1727  else if(itemType == InputSource::IsRun) {
1728  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1729  }
1730  else if(itemType == InputSource::IsLumi) {
1731  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1732  }
1733  else if(itemType == InputSource::IsEvent) {
1734  machine->process_event(statemachine::Event());
1735  }
1736  // This should be impossible
1737  else {
1739  << "Unknown next item type passed to EventProcessor\n"
1740  << "Please report this error to the Framework group\n";
1741  }
1742 
1743  if(machine->terminated()) {
1744  changeState(mInputExhausted);
1745  break;
1746  }
1747  } // End of loop over state machine events
1748  } // Try block
1749  catch (cms::Exception& e) { throw; }
1750  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1751  catch (std::exception& e) { convertException::stdToEDM(e); }
1753  catch(char const* c) { convertException::charPtrToEDM(c); }
1754  catch (...) { convertException::unknownToEDM(); }
1755  } // Try block
1756  // Some comments on exception handling related to the boost state machine:
1757  //
1758  // Some states used in the machine are special because they
1759  // perform actions while the machine is being terminated, actions
1760  // such as close files, call endRun, call endLumi etc ... Each of these
1761  // states has two functions that perform these actions. The functions
1762  // are almost identical. The major difference is that one version
1763  // catches all exceptions and the other lets exceptions pass through.
1764  // The destructor catches them and the other function named "exit" lets
1765  // them pass through. On a normal termination, boost will always call
1766  // "exit" and then the state destructor. In our state classes, the
1767  // the destructors do nothing if the exit function already took
1768  // care of things. Here's the interesting part. When boost is
1769  // handling an exception the "exit" function is not called (a boost
1770  // feature).
1771  //
1772  // If an exception occurs while the boost machine is in control
1773  // (which usually means inside a process_event call), then
1774  // the boost state machine destroys its states and "terminates" itself.
1775  // This already done before we hit the catch blocks below. In this case
1776  // the call to terminateMachine below only destroys an already
1777  // terminated state machine. Because exit is not called, the state destructors
1778  // handle cleaning up lumis, runs, and files. The destructors swallow
1779  // all exceptions and only pass through the exceptions messages, which
1780  // are tacked onto the original exception below.
1781  //
1782  // If an exception occurs when the boost state machine is not
1783  // in control (outside the process_event functions), then boost
1784  // cannot destroy its own states. The terminateMachine function
1785  // below takes care of that. The flag "alreadyHandlingException"
1786  // is set true so that the state exit functions do nothing (and
1787  // cannot throw more exceptions while handling the first). Then the
1788  // state destructors take care of this because exit did nothing.
1789  //
1790  // In both cases above, the EventProcessor::endOfLoop function is
1791  // not called because it can throw exceptions.
1792  //
1793  // One tricky aspect of the state machine is that things that can
1794  // throw should not be invoked by the state machine while another
1795  // exception is being handled.
1796  // Another tricky aspect is that it appears to be important to
1797  // terminate the state machine before invoking its destructor.
1798  // We've seen crashes that are not understood when that is not
1799  // done. Maintainers of this code should be careful about this.
1800 
1801  catch (cms::Exception & e) {
1802  alreadyHandlingException_ = true;
1803  terminateMachine(machine);
1804  alreadyHandlingException_ = false;
1805  if (!exceptionMessageLumis_.empty()) {
1806  e.addAdditionalInfo(exceptionMessageLumis_);
1807  if (e.alreadyPrinted()) {
1808  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1809  }
1810  }
1811  if (!exceptionMessageRuns_.empty()) {
1812  e.addAdditionalInfo(exceptionMessageRuns_);
1813  if (e.alreadyPrinted()) {
1814  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1815  }
1816  }
1817  if (!exceptionMessageFiles_.empty()) {
1818  e.addAdditionalInfo(exceptionMessageFiles_);
1819  if (e.alreadyPrinted()) {
1820  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1821  }
1822  }
1823  throw;
1824  }
1825 
1826  if(machine->terminated()) {
1827  FDEBUG(1) << "The state machine reports it has been terminated\n";
1828  machine.reset();
1829  }
1830 
1831  if(!onlineStateTransitions) changeState(mFinished);
1832 
1833  if(stateMachineWasInErrorState_) {
1834  throw cms::Exception("BadState")
1835  << "The boost state machine in the EventProcessor exited after\n"
1836  << "entering the Error state.\n";
1837  }
1838 
1839  }
1840  if(machine.get() != 0) {
1841  terminateMachine(machine);
1843  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1844  << "Please report this error to the Framework group\n";
1845  }
1846 
1847  toerror.succeeded();
1848 
1849  return returnCode;
1850  }
1851 
1852  void EventProcessor::readFile() {
1853  FDEBUG(1) << " \treadFile\n";
1854  size_t size = preg_->size();
1855  fb_ = input_->readFile();
1856  if(size < preg_->size()) {
1857  principalCache_.adjustIndexesAfterProductRegistryAddition();
1858  }
1859  principalCache_.adjustEventToNewProductRegistry(preg_);
1860  if(numberOfForkedChildren_ > 0) {
1861  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1862  }
1863  }
1864 
1865  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1866  if (fb_.get() != nullptr) {
1867  input_->closeFile(fb_.get(), cleaningUpAfterException);
1868  }
1869  FDEBUG(1) << "\tcloseInputFile\n";
1870  }
1871 
1872  void EventProcessor::openOutputFiles() {
1873  if (fb_.get() != nullptr) {
1874  schedule_->openOutputFiles(*fb_);
1875  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1876  }
1877  FDEBUG(1) << "\topenOutputFiles\n";
1878  }
1879 
1880  void EventProcessor::closeOutputFiles() {
1881  if (fb_.get() != nullptr) {
1882  schedule_->closeOutputFiles();
1883  if(hasSubProcess()) subProcess_->closeOutputFiles();
1884  }
1885  FDEBUG(1) << "\tcloseOutputFiles\n";
1886  }
1887 
1888  void EventProcessor::respondToOpenInputFile() {
1889  if (fb_.get() != nullptr) {
1890  schedule_->respondToOpenInputFile(*fb_);
1891  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1892  }
1893  FDEBUG(1) << "\trespondToOpenInputFile\n";
1894  }
1895 
1896  void EventProcessor::respondToCloseInputFile() {
1897  if (fb_.get() != nullptr) {
1898  schedule_->respondToCloseInputFile(*fb_);
1899  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1900  }
1901  FDEBUG(1) << "\trespondToCloseInputFile\n";
1902  }
1903 
1904  void EventProcessor::respondToOpenOutputFiles() {
1905  if (fb_.get() != nullptr) {
1906  schedule_->respondToOpenOutputFiles(*fb_);
1907  if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1908  }
1909  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1910  }
1911 
1912  void EventProcessor::respondToCloseOutputFiles() {
1913  if (fb_.get() != nullptr) {
1914  schedule_->respondToCloseOutputFiles(*fb_);
1915  if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1916  }
1917  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1918  }
1919 
1920  void EventProcessor::startingNewLoop() {
1921  shouldWeStop_ = false;
1922  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1923  // until after we've called beginOfJob
1924  if(looper_ && looperBeginJobRun_) {
1925  looper_->doStartingNewLoop();
1926  }
1927  FDEBUG(1) << "\tstartingNewLoop\n";
1928  }
1929 
1930  bool EventProcessor::endOfLoop() {
1931  if(looper_) {
1932  ModuleChanger changer(schedule_.get());
1933  looper_->setModuleChanger(&changer);
1934  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1935  looper_->setModuleChanger(nullptr);
1936  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1937  else return false;
1938  }
1939  FDEBUG(1) << "\tendOfLoop\n";
1940  return true;
1941  }
1942 
1943  void EventProcessor::rewindInput() {
1944  input_->repeat();
1945  input_->rewind();
1946  FDEBUG(1) << "\trewind\n";
1947  }
1948 
1949  void EventProcessor::prepareForNextLoop() {
1950  looper_->prepareForNextLoop(esp_.get());
1951  FDEBUG(1) << "\tprepareForNextLoop\n";
1952  }
1953 
1954  bool EventProcessor::shouldWeCloseOutput() const {
1955  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1956  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1957  }
1958 
1959  void EventProcessor::doErrorStuff() {
1960  FDEBUG(1) << "\tdoErrorStuff\n";
1961  LogError("StateMachine")
1962  << "The EventProcessor state machine encountered an unexpected event\n"
1963  << "and went to the error state\n"
1964  << "Will attempt to terminate processing normally\n"
1965  << "(IF using the looper the next loop will be attempted)\n"
1966  << "This likely indicates a bug in an input module or corrupted input or both\n";
1967  stateMachineWasInErrorState_ = true;
1968  }
1969 
// Begin-of-run transition for the run identified by `run` (process history ID + run number).
// Steps, in order:
//   1. let the input source see the run (doBeginRun);
//   2. synchronize the EventSetup to (run, lumi 0, event 0) at the run's begin time,
//      first forcing an EventSetup cache clear when forceESCacheClearOnNewRun_ is set;
//   3. on the first run with a looper, perform the looper's deferred beginOfJob and
//      doStartingNewLoop;
//   4. run the schedule and any SubProcess for the run;
//   5. notify the looper via doBeginRun.
1970  void EventProcessor::beginRun(statemachine::Run const& run) {
1971  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1972  input_->doBeginRun(runPrincipal);
// EventSetup synchronization point for the start of the run: lumi 0, event 0, begin time.
1973  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1974  runPrincipal.beginTime());
1975  if(forceESCacheClearOnNewRun_){
1976  espController_->forceCacheClear();
1977  }
1978  espController_->eventSetupForInstance(ts);
1979  EventSetup const& es = esp_->eventSetup();
// The looper's beginOfJob is deferred to the first beginRun (presumably so it can be given
// the EventSetup `es`). startingNewLoop() skips doStartingNewLoop for that first loop
// because beginOfJob must come first, so it is called here right after beginOfJob.
1980  if(looper_ && looperBeginJobRun_== false) {
1981  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1982  looper_->beginOfJob(es);
1983  looperBeginJobRun_ = true;
1984  looper_->doStartingNewLoop();
1985  }
// The inner scope bounds the ScheduleSignalSentry: its constructor emits the pre-schedule
// signal and its destructor the post-schedule signal, so the post signal fires after both
// the schedule and the SubProcess have handled the run (also when either throws).
// NOTE(review): the declaration of `Traits` (original line 1987) is missing from this
// rendered listing; the statements below depend on it.
1986  {
1988  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1989  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1990  if(hasSubProcess()) {
1991  subProcess_->doBeginRun(runPrincipal, ts);
1992  }
1993  }
1994  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
// The looper sees the run only after the schedule and SubProcess have processed it.
1995  if(looper_) {
1996  looper_->doBeginRun(runPrincipal, es);
1997  }
1998  }
1999 
// End-of-run transition for the run identified by `run`.
// Lets the input source finish the run, synchronizes the EventSetup to the last possible
// (lumi, event) of the run at its end time, runs the schedule and any SubProcess, and
// finally notifies the looper (if any) via doEndRun.
// cleaningUpAfterException is forwarded to the input source, schedule and SubProcess.
2000  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
2001  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2002  input_->doEndRun(runPrincipal, cleaningUpAfterException);
// Synchronization point for the end of the run: maximum lumi and event numbers, end time.
2003  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
2004  runPrincipal.endTime());
2005  espController_->eventSetupForInstance(ts);
2006  EventSetup const& es = esp_->eventSetup();
// The inner scope bounds the ScheduleSignalSentry so the post-schedule signal is emitted
// (by its destructor) once the schedule and SubProcess are done with the run.
// NOTE(review): the declaration of `Traits` (original line 2008) is missing from this
// rendered listing; the statements below depend on it.
2007  {
2009  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2010  schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2011  if(hasSubProcess()) {
2012  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2013  }
2014  }
2015  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
2016  if(looper_) {
2017  looper_->doEndRun(runPrincipal, es);
2018  }
2019  }
2020 
// Begin-of-luminosity-block transition for the lumi identified by (phid, run, lumi).
// Steps, in order:
//   1. let the input source see the lumi;
//   2. call preBeginLumi on `rng` when that service is available;
//   3. synchronize the EventSetup to (run, lumi, event 0) at the lumi's begin time;
//   4. run the schedule and any SubProcess;
//   5. notify the looper (if any) via doBeginLuminosityBlock.
2021  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
2022  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2023  input_->doBeginLumi(lumiPrincipal);
2024 
// NOTE(review): the declaration of `rng` (original line 2025) is missing from this rendered
// listing; presumably a service handle to the random-number-generator service — confirm.
2026  if(rng.isAvailable()) {
2027  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
2028  rng->preBeginLumi(lb);
2029  }
2030 
2031  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
2032  // lumi blocks know their start and end times why not also start and end events?
2033  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
2034  espController_->eventSetupForInstance(ts);
2035  EventSetup const& es = esp_->eventSetup();
// The inner scope bounds the ScheduleSignalSentry so the post-schedule signal is emitted
// (by its destructor) once the schedule and SubProcess are done with the lumi.
// NOTE(review): the declaration of `Traits` (original line 2037) is missing from this
// rendered listing; the statements below depend on it.
2036  {
2038  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2039  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2040  if(hasSubProcess()) {
2041  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2042  }
2043  }
2044  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
2045  if(looper_) {
2046  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2047  }
2048  }
2049 
// End-of-luminosity-block transition for the lumi identified by (phid, run, lumi).
// Steps, in order:
//   1. let the input source finish the lumi;
//   2. synchronize the EventSetup to (run, lumi, max event) at the lumi's end time;
//   3. run the schedule and any SubProcess;
//   4. notify the looper (if any) via doEndLuminosityBlock.
// cleaningUpAfterException is forwarded to the input source, schedule and SubProcess.
2050  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
2051  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2052  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2053  //NOTE: Using the max event number for the end of a lumi block is a bad idea
2054  // lumi blocks know their start and end times why not also start and end events?
2055  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
2056  lumiPrincipal.endTime());
2057  espController_->eventSetupForInstance(ts);
2058  EventSetup const& es = esp_->eventSetup();
// The inner scope bounds the ScheduleSignalSentry so the post-schedule signal is emitted
// (by its destructor) once the schedule and SubProcess are done with the lumi.
// NOTE(review): the declaration of `Traits` (original line 2060) is missing from this
// rendered listing; the statements below depend on it.
2059  {
2061  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2062  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2063  if(hasSubProcess()) {
2064  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2065  }
2066  }
2067  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
2068  if(looper_) {
2069  looper_->doEndLuminosityBlock(lumiPrincipal, es);
2070  }
2071  }
2072 
// Reads the run the input source is positioned on and inserts its RunPrincipal into the
// principal cache, then returns the (reduced process history ID, run number) key for it.
// Only one run principal may be cached at a time. If one is already present, an error is
// raised; the opening statement of that error (original line 2075, presumably a throw) is
// missing from this rendered listing — confirm.
2073  statemachine::Run EventProcessor::readAndCacheRun() {
2074  if (principalCache_.hasRunPrincipal()) {
2076  << "EventProcessor::readAndCacheRun\n"
2077  << "Illegal attempt to insert run into cache\n"
2078  << "Contact a Framework Developer\n";
2079  }
2080  principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
2081  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2082  }
2083 
2084  statemachine::Run EventProcessor::readAndMergeRun() {
2085  principalCache_.merge(input_->runAuxiliary(), preg_);
2086  input_->readAndMergeRun(principalCache_.runPrincipalPtr());
2087  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2088  }
2089 
// Reads the luminosity block the input source is positioned on and inserts its principal
// into the cache. The new lumi principal is linked to the cached run principal.
// Returns the luminosity block number reported by the input source.
// Two errors are raised before reading: one if a lumi principal is already cached, and one
// if no run principal is cached. The opening statements of both errors (original lines
// 2092 and 2098, presumably throws) are missing from this rendered listing — confirm.
// NOTE(review): both error messages name "EventProcessor::readAndCacheRun" although they
// are raised from readAndCacheLumi — looks like a copy-paste slip in the message text.
2090  int EventProcessor::readAndCacheLumi() {
2091  if (principalCache_.hasLumiPrincipal()) {
2093  << "EventProcessor::readAndCacheRun\n"
2094  << "Illegal attempt to insert lumi into cache\n"
2095  << "Contact a Framework Developer\n";
2096  }
2097  if (!principalCache_.hasRunPrincipal()) {
2099  << "EventProcessor::readAndCacheRun\n"
2100  << "Illegal attempt to insert lumi into cache\n"
2101  << "Run is invalid\n"
2102  << "Contact a Framework Developer\n";
2103  }
2104  principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
2105  principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
2106  return input_->luminosityBlock();
2107  }
2108 
2109  int EventProcessor::readAndMergeLumi() {
2110  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
2111  input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
2112  return input_->luminosityBlock();
2113  }
2114 
2115  void EventProcessor::writeRun(statemachine::Run const& run) {
2116  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
2117  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
2118  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
2119  }
2120 
2121  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
2122  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
2123  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
2124  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
2125  }
2126 
2127  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
2128  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2129  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2130  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
2131  }
2132 
2133  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
2134  principalCache_.deleteLumi(phid, run, lumi);
2135  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2136  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
2137  }
2138 
// Reads the next event from the input source into the cached EventPrincipal and attaches
// it to the cached lumi principal. Then it:
//   - synchronizes the EventSetup to the event's ID and time;
//   - runs the schedule and any SubProcess;
//   - lets the looper (if any) inspect the event and request navigation.
// The EventPrincipal is cleared at the end.
2139  void EventProcessor::readAndProcessEvent() {
2140  EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
2141  FDEBUG(1) << "\treadEvent\n";
// Sanity checks: an event was read and it belongs to the currently cached run and lumi.
2142  assert(pep != 0);
2143  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2144  assert(pep->luminosityBlockPrincipalPtrValid());
2145  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2146  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2147 
// The EventSetup is synchronized to this event's ID and timestamp.
2148  IOVSyncValue ts(pep->id(), pep->time());
2149  espController_->eventSetupForInstance(ts);
2150  EventSetup const& es = esp_->eventSetup();
// The inner scope bounds the ScheduleSignalSentry so the post-schedule signal is emitted
// (by its destructor) once the schedule and SubProcess are done with the event.
// NOTE(review): the declaration of `Traits` (original line 2152) is missing from this
// rendered listing; the statements below depend on it.
2151  {
2153  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2154  schedule_->processOneOccurrence<Traits>(*pep, es);
2155  if(hasSubProcess()) {
2156  subProcess_->doEvent(*pep, ts);
2157  }
2158  }
2159 
2160  if(looper_) {
2161  bool randomAccess = input_->randomAccess();
2162  ProcessingController::ForwardState forwardState = input_->forwardState();
2163  ProcessingController::ReverseState reverseState = input_->reverseState();
2164  ProcessingController pc(forwardState, reverseState, randomAccess);
2165 
2166  EDLooperBase::Status status = EDLooperBase::kContinue;
// doDuringLoop may request a transition through `pc`. Navigation is only attempted for
// random-access input. If a jump to a specified event fails, the failure is recorded in
// `pc` and doDuringLoop is called again for the same event.
2167  do {
2168  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2169 
2170  bool succeeded = true;
2171  if(randomAccess) {
2172  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
// NOTE(review): -2 presumably because the source is already positioned past the
// current event, so stepping back two yields the previous one — confirm.
2173  input_->skipEvents(-2);
2174  }
2175  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2176  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2177  }
2178  }
2179  pc.setLastOperationSucceeded(succeeded);
2180  } while(!pc.lastOperationSucceeded());
// Only the status from the final doDuringLoop call decides whether to stop.
2181  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2182 
2183  }
2184 
2185  FDEBUG(1) << "\tprocessEvent\n";
// NOTE(review): not reached if anything above throws; there is no try/catch here.
2186  pep->clearEventPrincipal();
2187  }
2188 
2189  bool EventProcessor::shouldWeStop() const {
2190  FDEBUG(1) << "\tshouldWeStop\n";
2191  if(shouldWeStop_) return true;
2192  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2193  }
2194 
2195  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2196  exceptionMessageFiles_ = message;
2197  }
2198 
2199  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2200  exceptionMessageRuns_ = message;
2201  }
2202 
2203  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2204  exceptionMessageLumis_ = message;
2205  }
2206 
2207  bool EventProcessor::alreadyHandlingException() const {
2208  return alreadyHandlingException_;
2209  }
2210 
2211  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2212  if(iMachine.get() != 0) {
2213  if(!iMachine->terminated()) {
2214  forceLooperToEnd_ = true;
2215  iMachine->process_event(statemachine::Stop());
2216  forceLooperToEnd_ = false;
2217  }
2218  else {
2219  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2220  }
2221  if(iMachine->terminated()) {
2222  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2223  }
2224  iMachine.reset();
2225  }
2226  }
2227 }
list table
Definition: asciidump.py:386
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
T getParameter(std::string const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
string rep
Definition: cuy.py:1188
volatile int stop_count_
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
volatile bool id_set_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
RunNumber_t run() const
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:378
ActivityRegistry::PostProcessEvent postProcessEventSignal_
virtual std::string explainSelf() const
Definition: Exception.cc:146
ParameterSetID id() const
static ThreadSafeRegistry * instance()
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
std::unique_ptr< ActionTable const > act_table_
Definition: ScheduleItems.h:55
Timestamp const & endTime() const
int getSigNum()
EventID const & id() const
void call(boost::function< void(void)>)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
def pipe
Definition: pipe.py:5
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
LuminosityBlockNumber_t luminosityBlock() const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:46
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:56
unsigned int LuminosityBlockNumber_t
Definition: EventID.h:31
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet)
Timestamp const & time() const
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
TransEntry table[]
ServiceToken serviceToken_
boost::mutex stop_lock_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ParameterSet const & getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:54
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:47
static InputSourceFactory * get()
volatile event_processor::State state_
Timestamp const & endTime() const
Definition: RunPrincipal.h:58
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:53
virtual void endOfJob()
Definition: EDLooperBase.cc:74
void connectSigs(EventProcessor *ep)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
boost::condition starter_
double a
Definition: hdecay.h:121
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
volatile bool shutdown_flag
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
Definition: EventRange.h:32
#define O_NONBLOCK
Definition: SysFile.h:21
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
ActivityRegistry::PreProcessEvent preProcessEventSignal_
collection_type & data()
Provide access to the contained collection.
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:52
tuple status
Definition: ntuplemaker.py:245
void validate(ParameterSet &pset, std::string const &moduleLabel) const
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
bool luminosityBlockPrincipalPtrValid()
ParameterSet const & registerIt()
static ComponentFactory< T > * get()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:56
boost::mutex usr2_lock
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:54
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)