CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
11 
36 
38 
46 
49 
57 
58 #include "MessageForSource.h"
59 #include "MessageForParent.h"
60 
61 #include "boost/bind.hpp"
62 #include "boost/thread/xtime.hpp"
63 
64 #include <exception>
65 #include <iomanip>
66 #include <iostream>
67 #include <utility>
68 #include <sstream>
69 
70 #include <sys/ipc.h>
71 #include <sys/msg.h>
72 
73 //Used for forking
74 #include <sys/types.h>
75 #include <sys/wait.h>
76 #include <sys/socket.h>
77 #include <sys/select.h>
78 #include <sys/fcntl.h>
79 #include <unistd.h>
80 
81 //Used for CPU affinity
82 #ifndef __APPLE__
83 #include <sched.h>
84 #endif
85 
86 //Needed for introspection
87 #include "Cintex/Cintex.h"
88 
89 namespace edm {
90 
// Holds StateSentry; its names are brought into the edm namespace by the
// using-directive that follows this namespace.
91  namespace event_processor {
92 
// StateSentry: succeeded() records success by setting success_ to true.
// NOTE(review): the constructor/destructor (source lines 95-96) and a private
// member line (source line 100) are missing from this listing, so what the sentry
// does when succeeded() is never called cannot be confirmed here; presumably its
// destructor moves the processor to an error state (cf. the commented-out
// "StateSentry toerror(this)" later in this file) — TODO confirm.
93  class StateSentry {
94  public:
97  void succeeded() {success_ = true;}
98 
99  private:
101  bool success_;
102  };
103  }
104 
105  using namespace event_processor;
106 
107  namespace {
108  template <typename T>
109  class ScheduleSignalSentry {
110  public:
111  ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
112  a_(a), principal_(principal), es_(es), allowThrow_(false) {
113  if (a_) T::preScheduleSignal(a_, principal_);
114  }
115  ~ScheduleSignalSentry() noexcept(false) {
116  try {
117  if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
118  } catch(...) {
119  if( allowThrow_) {throw;}
120  }
121  }
122 
123  void allowThrow() {
124  allowThrow_ = true;
125  }
126  private:
127  // We own none of these resources.
128  ActivityRegistry* a_;
129  typename T::MyPrincipal* principal_;
130  EventSetup const* es_;
131  bool allowThrow_ =true;
132  };
133  }
134 
namespace {

  // The next two tables must be kept in sync with the state and
  // message enums from the header (presumably one entry per enumerator,
  // in enumerator order, so an enum value can index its printable name).
  // Both tables are fully const: they are read-only lookup data, and neither
  // the pointers nor the characters may be modified at runtime.

  char const* const stateNames[] = {
    "Init",
    "JobReady",
    "RunGiven",
    "Running",
    "Stopping",
    "ShuttingDown",
    "Done",
    "JobEnded",
    "Error",
    "ErrorEnded",
    "End",
    "Invalid"
  };

  char const* const msgNames[] = {
    "SetRun",
    "Skip",
    "RunAsync",
    "Run(ID)",
    "Run(count)",
    "BeginJob",
    "StopAsync",
    "ShutdownAsync",
    "EndJob",
    "CountComplete",
    "InputExhausted",
    "StopSignal",
    "ShutdownSignal",
    "Finished",
    "Any",
    "dtor",
    "Exception",
    "Rewind"
  };
}
176  // IMPORTANT NOTE:
177  // the mAny messages are special, they must appear last in the
178  // table if multiple entries for a CurrentState are present.
179  // the changeState function does not use the mAny yet!!!
180 
// One row of the state-transition table below, whose column header reads
// CurrentState / Message / FinalState.
// NOTE(review): the first two members (source lines 182-183) are missing from this
// listing; presumably they hold the current state and the message — TODO confirm.
181  struct TransEntry {
184  State final;
185  };
186 
187  // we should use this information to initialize a two dimensional
188  // table of t[CurrentState][Message] = FinalState
189 
190  /*
191  the way this is currently written, the async run thread function
192  can return in the "JobReady" state - but not yet cleaned up. The
193  problem is that only when stop/shutdown async is called is the
194  thread cleaned up. But the stop/shutdown async functions attempt
195  first to change the state using messages that are not valid in
196  "JobReady" state.
197 
198  I think most of the problems can be solved by using two states
199  for "running": RunningS and RunningA (sync/async). The problem
200  seems to be all the transitions out of running for both
201  modes of operation. The other solution might be to only go to
202  "Stopping" from Running, and use the return code from "run_p" to
203  set the final state. If this is used, then in the sync mode the
204  "Stopping" state would be momentary.
205 
206  */
207 
// Body of the state-transition table. Its declaration (source line 208) and a number
// of rows are missing from this listing. Each row reads
// { CurrentState, Message, FinalState }; see the IMPORTANT NOTE above about mAny rows.
// The last row, { sInvalid, mAny, sInvalid }, looks like an end-of-table sentinel
// — TODO confirm against changeState.
209  // CurrentState Message FinalState
210  // -----------------------------------------
211  { sInit, mException, sError },
212  { sInit, mBeginJob, sJobReady },
216  { sJobReady, mSkip, sRunning },
217  { sJobReady, mRunID, sRunning },
221  { sJobReady, mDtor, sEnd }, // should this be allowed?
222 
226 
232  { sRunning, mException, sError },
236  { sRunning, mCountComplete, sStopping }, // sJobReady
237  { sRunning, mInputExhausted, sStopping }, // sJobReady
238 
239  { sStopping, mInputRewind, sRunning }, // The looper needs this
244  { sStopping, mStopAsync, sStopping }, // stay
245  { sStopping, mInputExhausted, sStopping }, // stay
246  //{ sStopping, mAny, sJobReady }, // <- ??????
249  { sShuttingDown, mCountComplete, sDone }, // needed?
250  { sShuttingDown, mInputExhausted, sDone }, // needed?
252  //{ sShuttingDown, mShutdownAsync, sShuttingDown }, // only one at
253  //{ sShuttingDown, mStopAsync, sShuttingDown }, // a time
254  //{ sShuttingDown, mAny, sDone }, // <- ??????
255  { sDone, mEndJob, sJobEnded },
256  { sDone, mException, sError },
257  { sJobEnded, mDtor, sEnd },
259  { sError, mEndJob, sError }, // funny one here
260  { sError, mDtor, sError }, // funny one here
261  { sInit, mDtor, sEnd }, // for StorM dummy EP
262  { sStopping, mShutdownAsync, sShuttingDown }, // For FUEP tests
263  { sInvalid, mAny, sInvalid }
264  };
265 
266 
267  // Note: many of the messages generate the mBeginJob message first
268  // mRunID, mRunCount, mSetRun
269 
270  // ---------------------------------------------------------------
// Builds the job's input source from the "@main_input" parameter set
// (init() calls this as makeInput(*parameterSet, *common, ...)).
// NOTE(review): source line 272 (function name and first parameter) is missing from
// this listing; the body uses a ParameterSet named `params`.
// Steps:
//  1. validate the source's configuration against its ParameterSetDescription;
//  2. register the PSet;
//  3. construct the source via InputSourceFactory, emitting pre/postSourceConstructionSignal_
//     on the ActivityRegistry around construction.
// Validation and construction failures are converted to cms::Exception and given context
// naming the module type before being rethrown.
// @return owning pointer to the constructed InputSource.
271  std::unique_ptr<InputSource>
273  CommonParams const& common,
274  ProductRegistry& preg,
275  boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
276  boost::shared_ptr<ActivityRegistry> areg,
277  boost::shared_ptr<ProcessConfiguration const> processConfiguration) {
278  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
// NOTE(review): the statement at source line 280 is missing; the streamed message
// below suggests it throws a configuration exception — TODO confirm.
279  if(main_input == 0) {
281  << "There must be exactly one source in the configuration.\n"
282  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
283  }
284 
285  std::string modtype(main_input->getParameter<std::string>("@module_type"));
286 
// NOTE(review): the filler's creation expression (source line 288) is missing from this listing.
287  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
289  ConfigurationDescriptions descriptions(filler->baseType());
290  filler->fill(descriptions);
291 
// Inner try: let cms::Exception pass through and convert other exception types to
// cms::Exception (one catch clause, source line 299, is missing from this listing).
// Outer catch: add context naming the module type and rethrow.
292  try {
293  try {
294  descriptions.validate(*main_input, std::string("source"));
295  }
296  catch (cms::Exception& e) { throw; }
297  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
298  catch (std::exception& e) { convertException::stdToEDM(e); }
300  catch (char const* c) { convertException::charPtrToEDM(c); }
301  catch (...) { convertException::unknownToEDM(); }
302  }
303  catch (cms::Exception & iException) {
304  std::ostringstream ost;
305  ost << "Validating configuration of input source of type " << modtype;
306  iException.addContext(ost.str());
307  throw;
308  }
309 
310  main_input->registerIt();
311 
312  // Fill in "ModuleDescription", in case the input source produces
313  // any EDproducts, which would be registered in the ProductRegistry.
314  // Also fill in the process history item for this process.
315  // There is no module label for the unnamed input source, so
316  // just use "source".
317  // Only the tracked parameters belong in the process configuration.
318  ModuleDescription md(main_input->id(),
319  main_input->getParameter<std::string>("@module_type"),
320  "source",
321  processConfiguration.get());
322 
323  InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_);
324  areg->preSourceConstructionSignal_(md);
325  std::unique_ptr<InputSource> input;
// Same two-level exception handling as for validation above (source line 333, another
// catch clause, is missing from this listing).
326  try {
327  try {
328  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
329  }
330  catch (cms::Exception& e) { throw; }
331  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
332  catch (std::exception& e) { convertException::stdToEDM(e); }
334  catch (char const* c) { convertException::charPtrToEDM(c); }
335  catch (...) { convertException::unknownToEDM(); }
336  }
// The post-construction signal is emitted on failure as well, so every
// preSourceConstructionSignal_ is matched by a postSourceConstructionSignal_.
337  catch (cms::Exception& iException) {
338  areg->postSourceConstructionSignal_(md);
339  std::ostringstream ost;
340  ost << "Constructing input source of type " << modtype;
341  iException.addContext(ost.str());
342  throw;
343  }
344  areg->postSourceConstructionSignal_(md);
345  return input;
346  }
347 
348  // ---------------------------------------------------------------
// Creates the looper, if any, named in the "@all_loopers" parameter.
// NOTE(review): source lines 350-351 (function name and its first two parameters) are
// missing from this listing. The body uses `esController` and `cp`, and init() calls
// this as fillLooper(*espController_, *esp_, *parameterSet).
// Returns an empty shared_ptr when no looper is configured; asserts that at most one is.
349  boost::shared_ptr<EDLooperBase>
352  ParameterSet& params) {
353  boost::shared_ptr<EDLooperBase> vLooper;
354 
355  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
356 
357  if(loopers.size() == 0) {
358  return vLooper;
359  }
360 
361  assert(1 == loopers.size());
362 
// Given the assert above, this loop handles the single configured looper:
// register its ParameterSet, then build the looper through LooperFactory.
363  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
364  itName != itNameEnd;
365  ++itName) {
366 
367  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
368  providerPSet->registerIt();
369  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
370  cp,
371  *providerPSet);
372  }
373  return vLooper;
374 
375  }
376 
377  // ---------------------------------------------------------------
// EventProcessor constructor that parses a configuration with PythonProcessDesc.
// NOTE(review): source lines 378 (start of the signature) and 380 are missing from this
// listing; the body uses `config` and `iLegacy`, which presumably come from them.
// Parses `config`, wraps the resulting ParameterSet in a ProcessDesc, adds the default
// and forced services, then hands everything to init().
// The member-initializer list is the same as in the other constructors in this file.
// continueAfterChildFailure_ is not in the list; init() assigns it.
379  ServiceToken const& iToken,
381  std::vector<std::string> const& defaultServices,
382  std::vector<std::string> const& forcedServices) :
383  preProcessEventSignal_(),
384  postProcessEventSignal_(),
385  actReg_(),
386  preg_(),
387  branchIDListHelper_(),
388  serviceToken_(),
389  input_(),
390  espController_(new eventsetup::EventSetupsController),
391  esp_(),
392  act_table_(),
393  processConfiguration_(),
394  schedule_(),
395  subProcess_(),
396  historyAppender_(new HistoryAppender),
397  state_(sInit),
398  event_loop_(),
399  state_lock_(),
400  stop_lock_(),
401  stopper_(),
402  starter_(),
403  stop_count_(-1),
404  last_rc_(epSuccess),
405  last_error_text_(),
406  id_set_(false),
407  event_loop_id_(),
408  my_sig_num_(getSigNum()),
409  fb_(),
410  looper_(),
411  principalCache_(),
412  shouldWeStop_(false),
413  stateMachineWasInErrorState_(false),
414  fileMode_(),
415  emptyRunLumiMode_(),
416  exceptionMessageFiles_(),
417  exceptionMessageRuns_(),
418  exceptionMessageLumis_(),
419  alreadyHandlingException_(false),
420  forceLooperToEnd_(false),
421  looperBeginJobRun_(false),
422  forceESCacheClearOnNewRun_(false),
423  numberOfForkedChildren_(0),
424  numberOfSequentialEventsPerChild_(1),
425  setCpuAffinity_(false),
426  eventSetupDataToExcludeFromPrefetching_() {
427  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
428  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
429  processDesc->addServices(defaultServices, forcedServices);
430  init(processDesc, iToken, iLegacy);
431  }
432 
// EventProcessor constructor taking a configuration plus default/forced service lists.
// NOTE(review): its first signature line (source line 433) is missing from this listing.
// The body parses `config` with PythonProcessDesc and adds the default and forced services.
434  std::vector<std::string> const& defaultServices,
435  std::vector<std::string> const& forcedServices) :
436  preProcessEventSignal_(),
437  postProcessEventSignal_(),
438  actReg_(),
439  preg_(),
440  branchIDListHelper_(),
441  serviceToken_(),
442  input_(),
443  espController_(new eventsetup::EventSetupsController),
444  esp_(),
445  act_table_(),
446  processConfiguration_(),
447  schedule_(),
448  subProcess_(),
449  historyAppender_(new HistoryAppender),
450  state_(sInit),
451  event_loop_(),
452  state_lock_(),
453  stop_lock_(),
454  stopper_(),
455  starter_(),
456  stop_count_(-1),
457  last_rc_(epSuccess),
458  last_error_text_(),
459  id_set_(false),
460  event_loop_id_(),
461  my_sig_num_(getSigNum()),
462  fb_(),
463  looper_(),
464  principalCache_(),
465  shouldWeStop_(false),
466  stateMachineWasInErrorState_(false),
467  fileMode_(),
468  emptyRunLumiMode_(),
469  exceptionMessageFiles_(),
470  exceptionMessageRuns_(),
471  exceptionMessageLumis_(),
472  alreadyHandlingException_(false),
473  forceLooperToEnd_(false),
474  looperBeginJobRun_(false),
475  forceESCacheClearOnNewRun_(false),
476  numberOfForkedChildren_(0),
477  numberOfSequentialEventsPerChild_(1),
478  setCpuAffinity_(false),
479  eventSetupDataToExcludeFromPrefetching_() {
480  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
481  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
482  processDesc->addServices(defaultServices, forcedServices);
// NOTE(review): source line 483 is missing from this listing. Presumably it calls init()
// as the other constructors do; without it, the processDesc built above would go unused — TODO confirm.
484  }
485 
// EventProcessor constructor taking an already-built ProcessDesc; it simply forwards
// to init().
// NOTE(review): source line 488 (the remaining parameter, presumably `legacy`, which the
// body passes to init()) is missing from this listing.
486  EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
487  ServiceToken const& token,
489  preProcessEventSignal_(),
490  postProcessEventSignal_(),
491  actReg_(),
492  preg_(),
493  branchIDListHelper_(),
494  serviceToken_(),
495  input_(),
496  espController_(new eventsetup::EventSetupsController),
497  esp_(),
498  act_table_(),
499  processConfiguration_(),
500  schedule_(),
501  subProcess_(),
502  historyAppender_(new HistoryAppender),
503  state_(sInit),
504  event_loop_(),
505  state_lock_(),
506  stop_lock_(),
507  stopper_(),
508  starter_(),
509  stop_count_(-1),
510  last_rc_(epSuccess),
511  last_error_text_(),
512  id_set_(false),
513  event_loop_id_(),
514  my_sig_num_(getSigNum()),
515  fb_(),
516  looper_(),
517  principalCache_(),
518  shouldWeStop_(false),
519  stateMachineWasInErrorState_(false),
520  fileMode_(),
521  emptyRunLumiMode_(),
522  exceptionMessageFiles_(),
523  exceptionMessageRuns_(),
524  exceptionMessageLumis_(),
525  alreadyHandlingException_(false),
526  forceLooperToEnd_(false),
527  looperBeginJobRun_(false),
528  forceESCacheClearOnNewRun_(false),
529  numberOfForkedChildren_(0),
530  numberOfSequentialEventsPerChild_(1),
531  setCpuAffinity_(false),
532  eventSetupDataToExcludeFromPrefetching_() {
533  init(processDesc, token, legacy);
534  }
535 
536 
// Member-initializer list and body of an EventProcessor constructor whose signature
// (source line 537) is missing from this listing. The body builds a ProcessDesc from
// `config`: through PythonProcessDesc when `isPython` is true, directly otherwise.
// NOTE(review): source lines 585 and 589 are also missing; presumably each calls
// init() with the ProcessDesc built just before it — TODO confirm.
538  preProcessEventSignal_(),
539  postProcessEventSignal_(),
540  actReg_(),
541  preg_(),
542  branchIDListHelper_(),
543  serviceToken_(),
544  input_(),
545  espController_(new eventsetup::EventSetupsController),
546  esp_(),
547  act_table_(),
548  processConfiguration_(),
549  schedule_(),
550  subProcess_(),
551  historyAppender_(new HistoryAppender),
552  state_(sInit),
553  event_loop_(),
554  state_lock_(),
555  stop_lock_(),
556  stopper_(),
557  starter_(),
558  stop_count_(-1),
559  last_rc_(epSuccess),
560  last_error_text_(),
561  id_set_(false),
562  event_loop_id_(),
563  my_sig_num_(getSigNum()),
564  fb_(),
565  looper_(),
566  principalCache_(),
567  shouldWeStop_(false),
568  stateMachineWasInErrorState_(false),
569  fileMode_(),
570  emptyRunLumiMode_(),
571  exceptionMessageFiles_(),
572  exceptionMessageRuns_(),
573  exceptionMessageLumis_(),
574  alreadyHandlingException_(false),
575  forceLooperToEnd_(false),
576  looperBeginJobRun_(false),
577  forceESCacheClearOnNewRun_(false),
578  numberOfForkedChildren_(0),
579  numberOfSequentialEventsPerChild_(1),
580  setCpuAffinity_(false),
581  eventSetupDataToExcludeFromPrefetching_() {
582  if(isPython) {
583  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
584  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
586  }
587  else {
588  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
590  }
591  }
592 
// Shared initialization used by the constructors. It:
//  - reads the "options" PSet, including the "multiProcesses" forking settings;
//  - initializes the services, the EventSetup provider and the optional looper;
//  - initializes the input source, the schedule and, if configured, a SubProcess.
// NOTE(review): source lines 596 (last parameter, used below as `iLegacy`), 637,
// 662-663 and 670 are missing from this listing.
593  void
594  EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
595  ServiceToken const& iToken,
597 
598  //std::cerr << processDesc->dump() << std::endl;
599 
// Enable Cintex; the include of Cintex.h at the top of the file is commented
// "Needed for introspection".
600  ROOT::Cintex::Cintex::Enable();
601 
602  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
603  //std::cerr << parameterSet->dump() << std::endl;
604 
605  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
606  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
607 
608  // Now set some parameters specific to the main process.
609  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
610  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
611  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
612  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
613  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
614  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
615  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
616  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
617  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
// Build the record -> {(type, label)} exclusion map used when prefetching EventSetup
// data before forking. A "*" type (the default) makes forkProcess skip prefetching
// that whole record.
618  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
619  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
620  itPS != itPSEnd;
621  ++itPS) {
622  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
623  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
624  itPS->getUntrackedParameter<std::string>("label", "")));
625  }
626  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
627 
628  // Now do general initialization
629  ScheduleItems items;
630 
631  //initialize the services
632  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
633  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
634  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
635 
636  //make the services available
// NOTE(review): source line 637 is missing; presumably a ServiceRegistry::Operate on serviceToken_.
638 
639  // intialize miscellaneous items
640  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
641 
642  // intialize the event setup provider
643  esp_ = espController_->makeProvider(*parameterSet);
644 
645  // initialize the looper, if any
646  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
647  if(looper_) {
648  looper_->setActionTable(items.act_table_.get());
649  looper_->attachTo(*items.actReg_);
650  }
651 
652  // initialize the input source
653  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
654 
655  // intialize the Schedule
656  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
657 
658  // set the data members
// NOTE(review): source lines 662-663 are missing. Presumably they set the remaining
// members taken from `items` (processConfiguration_ and branchIDListHelper_ are used
// below) — TODO confirm.
659  act_table_ = std::move(items.act_table_);
660  actReg_ = items.actReg_;
661  preg_.reset(items.preg_.release());
664 
665  FDEBUG(2) << parameterSet << std::endl;
666  connectSigs(this);
667 
668  // Reusable event principal
669  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
// NOTE(review): source line 670 is missing; presumably it stores `ep` (e.g. in principalCache_) — TODO confirm.
671 
672  // initialize the subprocess, if there is one
673  if(subProcessParameterSet) {
674  subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
675  }
676  }
677 
// EventProcessor destructor body (its signature, source line 678, is missing from this listing).
679  // Make the services available while everything is being deleted.
680  ServiceToken token = getToken();
681  ServiceRegistry::Operate op(token);
// A cms::Exception from the call at source line 683 (missing from this listing) is
// logged, not rethrown. No other exception type is caught here.
682  try {
684  }
685  catch(cms::Exception& e) {
686  LogError("System")
687  << e.explainSelf() << "\n";
688  }
689 
690  // manually destroy all these thing that may need the services around
// Resetting them here, while `op` is still alive, destroys them with the services
// available. Implicit member destruction would only run after the body, i.e. after
// `op` is gone.
691  espController_.reset();
692  subProcess_.reset();
693  esp_.reset();
694  schedule_.reset();
695  input_.reset();
696  looper_.reset();
697  actReg_.reset();
698 
// Clear the global ParameterSet registry and reset its extra ID, then clear the
// Parentage registry.
699  pset::Registry* psetRegistry = pset::Registry::instance();
700  psetRegistry->data().clear();
701  psetRegistry->extra().setID(ParameterSetID());
702 
// NOTE(review): source lines 703 and 705-706 are missing from this listing.
704  ParentageRegistry::instance()->data().clear();
707  }
708 
// Begin-job transition. Its signature (source line 710) is missing from this listing;
// presumably this is EventProcessor::beginJob, which forkProcess calls
// "to make sure this was run".
// Does nothing unless state_ is sInit. Otherwise it calls, in order:
//  - bk::beginJob();
//  - the input source's doBeginJob (non-cms exceptions are converted to cms::Exception,
//    then context is added);
//  - the schedule's beginJob;
//  - the subprocess's doBeginJob, if there is one;
//  - postBeginJobSignal_.
// NOTE(review): source lines 714, 718 and 741 are also missing. 714 presumably performs
// the state change and 718 presumably makes the services available — TODO confirm.
709  void
711  if(state_ != sInit) return;
712  bk::beginJob();
713  // can only be run if in the initial state
715 
716  // StateSentry toerror(this); // should we add this ?
717  //make the services available
719 
720  //NOTE: This implementation assumes 'Job' means one call
721  // the EventProcessor::run
722  // If it really means once per 'application' then this code will
723  // have to be changed.
724  // Also have to deal with case where have 'run' then new Module
725  // added and do 'run'
726  // again. In that case the newly added Module needs its 'beginJob'
727  // to be called.
728 
729  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
730  // For now we delay calling beginOfJob until first beginOfRun
731  //if(looper_) {
732  // looper_->beginOfJob(es);
733  //}
734  try {
735  try {
736  input_->doBeginJob();
737  }
738  catch (cms::Exception& e) { throw; }
739  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
740  catch (std::exception& e) { convertException::stdToEDM(e); }
742  catch(char const* c) { convertException::charPtrToEDM(c); }
743  catch (...) { convertException::unknownToEDM(); }
744  }
745  catch(cms::Exception& ex) {
746  ex.addContext("Calling beginJob for the source");
747  throw;
748  }
749  schedule_->beginJob(*preg_);
750  // toerror.succeeded(); // should we add this?
751  if(hasSubProcess()) subProcess_->doBeginJob();
752  actReg_->postBeginJobSignal_();
753  }
754 
// End-job transition (its signature, source line 756, is missing from this listing).
// Every step runs through an ExceptionCollector, so one failing step does not prevent
// the remaining ones. If any step threw, the collected exception is rethrown at the end.
755  void
757  // Collects exceptions, so we don't throw before all operations are performed.
758  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
759 
760  // only allowed to run if state is sIdle, sJobReady, sRunGiven
// NOTE(review): "sIdle" does not appear in the stateNames table of this file; the comment
// above may be stale — confirm against the State enum and the transition table.
761  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
762 
763  //make the services available
// NOTE(review): source line 764 is missing; presumably a ServiceRegistry::Operate on the service token.
765 
// The schedule is handed the collector itself instead of being wrapped in c.call().
766  schedule_->endJob(c);
767  if(hasSubProcess()) {
768  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
769  }
770  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
771  if(looper_) {
772  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
773  }
// The lambda captures the raw ActivityRegistry pointer by value.
774  auto actReg = actReg_.get();
775  c.call([actReg](){actReg->postEndJobSignal_();});
776  if(c.hasThrown()) {
777  c.rethrow();
778  }
779  }
780 
// Accessor returning serviceToken_. Its signature (source lines 781-782) is missing from
// this listing; presumably this is EventProcessor::getToken(), which the destructor calls.
783  return serviceToken_;
784  }
785 
// Set up a signal handler to listen for when forked children stop.
namespace {
  // These are volatile so the compiler cannot optimize accesses to them away:
  // they are modified in the signal handler below.
  volatile bool child_failed = false;
  volatile unsigned int num_children_done = 0;
  volatile int child_fail_exit_status = 0;
  volatile int child_fail_signal = 0;

  // NOTE: We set up the signal handler to run in the main thread, which
  // is also the thread that then reads the above values.

  extern "C" {
    /// SIGCHLD handler: reaps every child that has already terminated, using a
    /// non-blocking (WNOHANG) waitpid loop, and records the outcome in the flags above.
    /// A child that exited or was killed by a signal counts as done. A nonzero exit
    /// status or a terminating signal marks the job as failed.
    ///
    /// errno is saved and restored: waitpid overwrites it (e.g. ECHILD once no
    /// children remain), and a signal handler must not clobber the errno value of
    /// the code it interrupted.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;
      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0 < p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }
      errno = savedErrno;
    }
  }

}
824 
// Anonymous enum; its enumerators (source lines 826-829) are missing from this listing.
825  enum {
830  };
831 
832  namespace {
/// Number of decimal digits needed to print a child index for `numberOfChildren`
/// children (the decimal digit count of the value itself).
/// A count of zero would give zero digits, so 3 is returned in that case instead.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3; // protect against a zero number of children
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining > 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
844 
/* This class embodies the thread which is used to listen to the forked children and
   then tell them which events they should process (see operator()).

   The children's watchdog pipe descriptors are stored by value. A reference to the
   caller's vector would dangle if that vector went away while this object (or a copy
   of it, e.g. one handed to a thread) is still running. */
class MessageSenderToSource {
public:
  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
  void operator()();

private:
  std::vector<int> const m_childrenPipes; // read ends of the children's watchdog pipes
  long const m_nEventsToProcess;          // events assigned to a child per request
  fd_set m_socketSet;                     // sockets and pipes still being monitored
  unsigned int m_aliveChildren;           // decremented as watchdog pipes close; loop ends at 0
  int m_maxFd;                            // highest monitored fd + 1 (select's nfds)
};
859 
860  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
861  std::vector<int> const& childrenPipes,
862  long iNEventsToProcess):
863  m_childrenPipes(childrenPipes),
864  m_nEventsToProcess(iNEventsToProcess),
865  m_aliveChildren(childrenSockets.size()),
866  m_maxFd(0)
867  {
868  FD_ZERO(&m_socketSet);
869  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
870  it != itEnd; it++) {
871  FD_SET(*it, &m_socketSet);
872  if (*it > m_maxFd) {
873  m_maxFd = *it;
874  }
875  }
876  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
877  it != itEnd; ++it) {
878  FD_SET(*it, &m_socketSet);
879  if (*it > m_maxFd) {
880  m_maxFd = *it;
881  }
882  }
883  m_maxFd++; // select reads [0,m_maxFd).
884  }
885 
886  /* This function is the heart of the communication between parent and child.
887  * When ready for more data, the child (see MessageReceiverForSource) requests
888  * data through a AF_UNIX socket message. The parent will then assign the next
889  * chunk of data by sending a message back.
890  *
891  * Additionally, this function also monitors the read-side of the pipe fd from the child.
892  * If the child dies unexpectedly, the pipe will be selected as ready for read and
893  * will return EPIPE when read from. Further, if the child thinks the parent has died
894  * (defined as waiting more than 1s for a response), it will write a single byte to
895  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
896  * If still alive, the parent will read the byte and ignore it.
897  *
898  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
899  * handler can distinguish between success and failure cases.
900  */
901 
902  void
903  MessageSenderToSource::operator()() {
904  multicore::MessageForParent childMsg;
905  LogInfo("ForkingController") << "I am controller";
906  //this is the master and therefore the controller
907 
908  multicore::MessageForSource sndmsg;
909  sndmsg.startIndex = 0;
910  sndmsg.nIndices = m_nEventsToProcess;
911  do {
912 
913  fd_set readSockets, errorSockets;
914  // Wait for a request from a child for events.
915  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
916  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
917  // Note that we don't timeout; may be reconsidered in the future.
918  ssize_t rc;
919  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
920  if (rc < 0) {
921  std::cerr << "select failed; should be impossible due to preconditions.\n";
922  abort();
923  break;
924  }
925 
926  // Read the message from the child.
927  for (int idx=0; idx<m_maxFd; idx++) {
928 
929  // Handle errors
930  if (FD_ISSET(idx, &errorSockets)) {
931  LogInfo("ForkingController") << "Error on socket " << idx;
932  FD_CLR(idx, &m_socketSet);
933  close(idx);
934  // See if it was the watchdog pipe that died.
935  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
936  if (*it == idx) {
937  m_aliveChildren--;
938  }
939  }
940  continue;
941  }
942 
943  if (!FD_ISSET(idx, &readSockets)) {
944  continue;
945  }
946 
947  // See if this FD is a child watchdog pipe. If so, read from it to prevent
948  // writes from blocking.
949  bool is_pipe = false;
950  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
951  if (*it == idx) {
952  is_pipe = true;
953  char buf;
954  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
955  if (rc <= 0) {
956  m_aliveChildren--;
957  FD_CLR(idx, &m_socketSet);
958  close(idx);
959  }
960  }
961  }
962 
963  // Only execute this block if the FD is a socket for sending the child work.
964  if (!is_pipe) {
965  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
966  if (rc < 0) {
967  FD_CLR(idx, &m_socketSet);
968  close(idx);
969  continue;
970  }
971 
972  // Tell the child what events to process.
973  // If 'send' fails, then the child process has failed (any other possibilities are
974  // eliminated because we are using fixed-size messages with Unix datagram sockets).
975  // Thus, the SIGCHLD handler will fire and set child_fail = true.
976  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
977  if (rc < 0) {
978  FD_CLR(idx, &m_socketSet);
979  close(idx);
980  continue;
981  }
982  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
983  sndmsg.startIndex += sndmsg.nIndices;
984  }
985  }
986 
987  } while (m_aliveChildren > 0);
988 
989  return;
990  }
991 
992  }
993 
994 
995  void EventProcessor::possiblyContinueAfterForkChildFailure() {
996  if(child_failed && continueAfterChildFailure_) {
997  if (child_fail_signal) {
998  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
999  child_fail_signal=0;
1000  } else if (child_fail_exit_status) {
1001  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1002  child_fail_exit_status=0;
1003  } else {
1004  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1005  }
1006  child_failed =false;
1007  }
1008  }
1009 
1010  bool
1011  EventProcessor::forkProcess(std::string const& jobReportFile) {
1012 
1013  if(0 == numberOfForkedChildren_) {return true;}
1014  assert(0<numberOfForkedChildren_);
1015  //do what we want done in common
1016  {
1017  beginJob(); //make sure this was run
1018  // make the services available
1019  ServiceRegistry::Operate operate(serviceToken_);
1020 
1021  InputSource::ItemType itemType;
1022  itemType = input_->nextItemType();
1023 
1024  assert(itemType == InputSource::IsFile);
1025  {
1026  readFile();
1027  }
1028  itemType = input_->nextItemType();
1029  assert(itemType == InputSource::IsRun);
1030 
1031  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
1032  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
1033  input_->runAuxiliary()->beginTime());
1034  espController_->eventSetupForInstance(ts);
1035  EventSetup const& es = esp_->eventSetup();
1036 
1037  //now get all the data available in the EventSetup
1038  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1039  es.fillAvailableRecordKeys(recordKeys);
1040  std::vector<eventsetup::DataKey> dataKeys;
1041  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1042  itKey != itEnd;
1043  ++itKey) {
1044  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
1045  //see if this is on our exclusion list
1046  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
1047  ExcludedData const* excludedData(nullptr);
1048  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1049  excludedData = &(itExcludeRec->second);
1050  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
1051  //skip all items in this record
1052  continue;
1053  }
1054  }
1055  if(0 != recordPtr) {
1056  dataKeys.clear();
1057  recordPtr->fillRegisteredDataKeys(dataKeys);
1058  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1059  itDataKey != itDataKeyEnd;
1060  ++itDataKey) {
1061  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1062  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1063  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1064  continue;
1065  }
1066  try {
1067  recordPtr->doGet(*itDataKey);
1068  } catch(cms::Exception& e) {
1069  LogWarning("ForkingEventSetupPreFetching") << e.what();
1070  }
1071  }
1072  }
1073  }
1074  }
1075  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
1076  {
1077  // make the services available
1078  ServiceRegistry::Operate operate(serviceToken_);
1079  Service<JobReport> jobReport;
1080  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1081 
1082  //Now actually do the forking
1083  actReg_->preForkReleaseResourcesSignal_();
1084  input_->doPreForkReleaseResources();
1085  schedule_->preForkReleaseResources();
1086  }
1087  installCustomHandler(SIGCHLD, ep_sigchld);
1088 
1089 
1090  unsigned int childIndex = 0;
1091  unsigned int const kMaxChildren = numberOfForkedChildren_;
1092  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1093  std::vector<pid_t> childrenIds;
1094  childrenIds.reserve(kMaxChildren);
1095  std::vector<int> childrenSockets;
1096  childrenSockets.reserve(kMaxChildren);
1097  std::vector<int> childrenPipes;
1098  childrenPipes.reserve(kMaxChildren);
1099  std::vector<int> childrenSocketsCopy;
1100  childrenSocketsCopy.reserve(kMaxChildren);
1101  std::vector<int> childrenPipesCopy;
1102  childrenPipesCopy.reserve(kMaxChildren);
1103  int pipes[] {0, 0};
1104 
1105  {
1106  // make the services available
1107  ServiceRegistry::Operate operate(serviceToken_);
1108  Service<JobReport> jobReport;
1109  int sockets[2], fd_flags;
1110  for(; childIndex < kMaxChildren; ++childIndex) {
1111  // Create a UNIX_DGRAM socket pair
1112  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1113  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1114  exit(EXIT_FAILURE);
1115  }
1116  if (pipe(pipes)) {
1117  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1118  exit(EXIT_FAILURE);
1119  }
1120  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1121  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1122  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1123  exit(EXIT_FAILURE);
1124  }
1125  // Mark socket as non-block. Child must be careful to do select prior
1126  // to reading from socket.
1127  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1128  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1129  exit(EXIT_FAILURE);
1130  }
1131  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1132  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1133  exit(EXIT_FAILURE);
1134  }
1135  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1136  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1137  exit(EXIT_FAILURE);
1138  }
1139  // Linux man page notes there are some edge cases where reading from a
1140  // fd can block, even after a select.
1141  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1142  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1143  exit(EXIT_FAILURE);
1144  }
1145  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1146  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1147  exit(EXIT_FAILURE);
1148  }
1149 
1150  childrenPipesCopy = childrenPipes;
1151  childrenSocketsCopy = childrenSockets;
1152 
1153  pid_t value = fork();
1154  if(value == 0) {
1155  // Close the parent's side of the socket and pipe which will talk to us.
1156  close(pipes[0]);
1157  close(sockets[0]);
1158  // Close our copies of the parent's other communication pipes.
1159  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1160  close(*it);
1161  }
1162  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1163  close(*it);
1164  }
1165 
1166  // this is the child process, redirect stdout and stderr to a log file
1167  fflush(stdout);
1168  fflush(stderr);
1169  std::stringstream stout;
1170  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1171  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1172  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1173  }
1174  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1175  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1176  }
1177 
1178  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1179  if(setCpuAffinity_) {
1180  // CPU affinity is handled differently on macosx.
1181  // We disable it and print a message until someone reads:
1182  //
1183  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1184  //
1185  // and implements it.
1186 #ifdef __APPLE__
1187  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1188 #else
1189  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1190  cpu_set_t mask;
1191  CPU_ZERO(&mask);
1192  CPU_SET(childIndex, &mask);
1193  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1194  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1195  exit(-1);
1196  }
1197 #endif
1198  }
1199  break;
1200  } else {
1201  //this is the parent
1202  close(pipes[1]);
1203  close(sockets[1]);
1204  }
1205  if(value < 0) {
1206  LogError("ForkingChild") << "failed to create a child";
1207  exit(-1);
1208  }
1209  childrenIds.push_back(value);
1210  childrenSockets.push_back(sockets[0]);
1211  childrenPipes.push_back(pipes[0]);
1212  }
1213 
1214  if(childIndex < kMaxChildren) {
1215  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1216  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1217 
1218  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1219  input_->doPostForkReacquireResources(receiver);
1220  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1221  //NOTE: sources have to reset themselves by listening to the post fork message
1222  //rewindInput();
1223  return true;
1224  }
1225  jobReport->parentAfterFork(jobReportFile);
1226  }
1227 
1228  //this is the original, which is now the master for all the children
1229 
1230  //Need to wait for signals from the children or externally
1231  // To wait we must
1232  // 1) block the signals we want to wait on so we do not have a race condition
1233  // 2) check that we haven't already meet our ending criteria
1234  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1235  sigset_t blockingSigSet;
1236  sigset_t unblockingSigSet;
1237  sigset_t oldSigSet;
1238  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1239  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1240  sigaddset(&blockingSigSet, SIGCHLD);
1241  sigaddset(&blockingSigSet, SIGUSR2);
1242  sigaddset(&blockingSigSet, SIGINT);
1243  sigdelset(&unblockingSigSet, SIGCHLD);
1244  sigdelset(&unblockingSigSet, SIGUSR2);
1245  sigdelset(&unblockingSigSet, SIGINT);
1246  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1247 
1248  // If there are too many fd's (unlikely, but possible) for select, denote this
1249  // because the sender will fail.
1250  bool too_many_fds = false;
1251  if (pipes[1]+1 > FD_SETSIZE) {
1252  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1253  too_many_fds = true;
1254  }
1255 
1256  //create a thread that sends the units of work to workers
1257  // we create it after all signals were blocked so that this
1258  // thread is never interupted by a signal
1259  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1260  boost::thread senderThread(sender);
1261 
1262  if(not too_many_fds) {
1263  //NOTE: a child could have failed before we got here and even after this call
1264  // which is why the 'if' is conditional on continueAfterChildFailure_
1265  possiblyContinueAfterForkChildFailure();
1266  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1267  sigsuspend(&unblockingSigSet);
1268  possiblyContinueAfterForkChildFailure();
1269  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1270  }
1271  }
1272  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1273 
1274  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1275  if(child_failed) {
1276  LogError("ForkingStopping") << "child failed";
1277  }
1278  if(shutdown_flag) {
1279  LogSystem("ForkingStopping") << "asked to shutdown";
1280  }
1281 
1282  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1283  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1284  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1285  it != itEnd; ++it) {
1286  /* int result = */ kill(*it, SIGUSR2);
1287  }
1288  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1289  while(num_children_done != kMaxChildren) {
1290  sigsuspend(&unblockingSigSet);
1291  }
1292  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1293  }
1294  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1295  senderThread.join();
1296  if(child_failed && !continueAfterChildFailure_) {
1297  if (child_fail_signal) {
1298  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1299  } else if (child_fail_exit_status) {
1300  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1301  } else {
1302  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1303  }
1304  }
1305  if(too_many_fds) {
1306  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1307  }
1308  return false;
1309  }
1310 
1311  void
1312  EventProcessor::connectSigs(EventProcessor* ep) {
1313  // When the FwkImpl signals are given, pass them to the
1314  // appropriate EventProcessor signals so that the outside world
1315  // can see the signal.
1316  actReg_->preProcessEventSignal_.connect(std::cref(ep->preProcessEventSignal_));
1317  actReg_->postProcessEventSignal_.connect(std::cref(ep->postProcessEventSignal_));
1318  }
1319 
1320  std::vector<ModuleDescription const*>
1321  EventProcessor::getAllModuleDescriptions() const {
1322  return schedule_->getAllModuleDescriptions();
1323  }
1324 
1325  int
1326  EventProcessor::totalEvents() const {
1327  return schedule_->totalEvents();
1328  }
1329 
1330  int
1331  EventProcessor::totalEventsPassed() const {
1332  return schedule_->totalEventsPassed();
1333  }
1334 
1335  int
1336  EventProcessor::totalEventsFailed() const {
1337  return schedule_->totalEventsFailed();
1338  }
1339 
1340  void
1341  EventProcessor::enableEndPaths(bool active) {
1342  schedule_->enableEndPaths(active);
1343  }
1344 
1345  bool
1346  EventProcessor::endPathsEnabled() const {
1347  return schedule_->endPathsEnabled();
1348  }
1349 
1350  void
1351  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1352  schedule_->getTriggerReport(rep);
1353  }
1354 
1355  void
1356  EventProcessor::clearCounters() {
1357  schedule_->clearCounters();
1358  }
1359 
1360  char const* EventProcessor::currentStateName() const {
1361  return stateName(getState());
1362  }
1363 
1364  char const* EventProcessor::stateName(State s) const {
1365  return stateNames[s];
1366  }
1367 
1368  char const* EventProcessor::msgName(Msg m) const {
1369  return msgNames[m];
1370  }
1371 
1372  State EventProcessor::getState() const {
1373  return state_;
1374  }
1375 
1376  EventProcessor::StatusCode EventProcessor::statusAsync() const {
1377  // the thread will record exception/error status in the event processor
1378  // for us to look at and report here
1379  return last_rc_;
1380  }
1381 
1382  void
1383  EventProcessor::setRunNumber(RunNumber_t runNumber) {
1384  if(runNumber == 0) {
1385  runNumber = 1;
1386  LogWarning("Invalid Run")
1387  << "EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
1388  << "Run number was set to 1 instead\n";
1389  }
1390 
1391  // inside of beginJob there is a check to see if it has been called before
1392  beginJob();
1393  changeState(mSetRun);
1394 
1395  // interface not correct yet
1396  input_->setRunNumber(runNumber);
1397  }
1398 
1399  void
1400  EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
1401  // inside of beginJob there is a check to see if it has been called before
1402  beginJob();
1403  changeState(mSetRun);
1404 
1405  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1406  //input_->declareRunNumber(runNumber);
1407  }
1408 
1410  EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
1411  bool rc = true;
1412  boost::xtime timeout;
1413 
1414 #if BOOST_VERSION >= 105000
1415  boost::xtime_get(&timeout, boost::TIME_UTC_);
1416 #else
1417  boost::xtime_get(&timeout, boost::TIME_UTC);
1418 #endif
1419  timeout.sec += timeout_seconds;
1420 
1421  // make sure to include a timeout here so we don't wait forever
1422  // I suspect there are still timing issues with thread startup
1423  // and the setting of the various control variables (stop_count, id_set)
1424  {
1425  boost::mutex::scoped_lock sl(stop_lock_);
1426 
1427  // look here - if runAsync not active, just return the last return code
1428  if(stop_count_ < 0) return last_rc_;
1429 
1430  if(timeout_seconds == 0) {
1431  while(stop_count_ == 0) stopper_.wait(sl);
1432  } else {
1433  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1434  }
1435 
1436  if(rc == false) {
1437  // timeout occurred
1438  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1439  // this is a temporary hack until we get the input source
1440  // upgraded to allow blocking input sources to be unblocked
1441 
1442  // the next line is dangerous and causes all sorts of trouble
1443  if(id_set_) pthread_cancel(event_loop_id_);
1444 
1445  // we will not do anything yet
1446  LogWarning("timeout")
1447  << "An asynchronous request was made to shut down "
1448  << "the event loop "
1449  << "and the event loop did not shutdown after "
1450  << timeout_seconds << " seconds\n";
1451  } else {
1452  event_loop_->join();
1453  event_loop_.reset();
1454  id_set_ = false;
1455  stop_count_ = -1;
1456  }
1457  }
1458  return rc == false ? epTimedOut : last_rc_;
1459  }
1460 
1462  EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
1463  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1464  if(rc != epTimedOut) changeState(mCountComplete);
1465  else errorState();
1466  return rc;
1467  }
1468 
1469 
1470  EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
1471  changeState(mStopAsync);
1472  StatusCode rc = waitForAsyncCompletion(secs);
1473  if(rc != epTimedOut) changeState(mFinished);
1474  else errorState();
1475  return rc;
1476  }
1477 
1478  EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
1479  changeState(mShutdownAsync);
1480  StatusCode rc = waitForAsyncCompletion(secs);
1481  if(rc != epTimedOut) changeState(mFinished);
1482  else errorState();
1483  return rc;
1484  }
1485 
1486  void EventProcessor::errorState() {
1487  state_ = sError;
1488  }
1489 
1490  // next function irrelevant now
1491  EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
1492  // make sure to include a timeout here so we don't wait forever
1493  // I suspect there are still timing issues with thread startup
1494  // and the setting of the various control variables (stop_count, id_set)
1495  changeState(m);
1496  return waitForAsyncCompletion(60*2);
1497  }
1498 
1499  void EventProcessor::changeState(Msg msg) {
1500  // most likely need to serialize access to this routine
1501 
1502  boost::mutex::scoped_lock sl(state_lock_);
1503  State curr = state_;
1504  int rc;
1505  // found if(not end of table) and
1506  // (state == table.state && (msg == table.message || msg == any))
1507  for(rc = 0;
1508  table[rc].current != sInvalid &&
1509  (curr != table[rc].current ||
1510  (curr == table[rc].current &&
1511  msg != table[rc].message && table[rc].message != mAny));
1512  ++rc);
1513 
1514  if(table[rc].current == sInvalid)
1515  throw cms::Exception("BadState")
1516  << "A member function of EventProcessor has been called in an"
1517  << " inappropriate order.\n"
1518  << "Bad transition from " << stateName(curr) << " "
1519  << "using message " << msgName(msg) << "\n"
1520  << "No where to go from here.\n";
1521 
1522  FDEBUG(1) << "changeState: current=" << stateName(curr)
1523  << ", message=" << msgName(msg)
1524  << " -> new=" << stateName(table[rc].final) << "\n";
1525 
1526  state_ = table[rc].final;
1527  }
1528 
1529  void EventProcessor::runAsync() {
1530  beginJob();
1531  {
1532  boost::mutex::scoped_lock sl(stop_lock_);
1533  if(id_set_ == true) {
1534  std::string err("runAsync called while async event loop already running\n");
1535  LogError("FwkJob") << err;
1536  throw cms::Exception("BadState") << err;
1537  }
1538 
1539  changeState(mRunAsync);
1540 
1541  stop_count_ = 0;
1542  last_rc_ = epSuccess; // forget the last value!
1543  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1544  boost::xtime timeout;
1545 #if BOOST_VERSION >= 105000
1546  boost::xtime_get(&timeout, boost::TIME_UTC_);
1547 #else
1548  boost::xtime_get(&timeout, boost::TIME_UTC);
1549 #endif
1550  timeout.sec += 60; // 60 seconds to start!!!!
1551  if(starter_.timed_wait(sl, timeout) == false) {
1552  // yikes - the thread did not start
1553  throw cms::Exception("BadState")
1554  << "Async run thread did not start in 60 seconds\n";
1555  }
1556  }
1557  }
1558 
1559  void EventProcessor::asyncRun(EventProcessor* me) {
1560  // set up signals to allow for interruptions
1561  // ignore all other signals
1562  // make sure no exceptions escape out
1563 
1564  // temporary hack until we modify the input source to allow
1565  // wakeup calls from other threads. This mimics the solution
1566  // in EventFilter/Processor, which I do not like.
1567  // allowing cancels means that the thread just disappears at
1568  // certain points. This is bad for C++ stack variables.
1569  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1570  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1571  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1572  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1573 
1574  {
1575  boost::mutex::scoped_lock sl(me->stop_lock_);
1576  me->event_loop_id_ = pthread_self();
1577  me->id_set_ = true;
1578  me->starter_.notify_all();
1579  }
1580 
1581  Status rc = epException;
1582  FDEBUG(2) << "asyncRun starting ......................\n";
1583 
1584  try {
1585  bool onlineStateTransitions = true;
1586  rc = me->runToCompletion(onlineStateTransitions);
1587  }
1588  catch (cms::Exception& e) {
1589  LogError("FwkJob") << "cms::Exception caught in "
1590  << "EventProcessor::asyncRun"
1591  << "\n"
1592  << e.explainSelf();
1593  me->last_error_text_ = e.explainSelf();
1594  }
1595  catch (std::exception& e) {
1596  LogError("FwkJob") << "Standard library exception caught in "
1597  << "EventProcessor::asyncRun"
1598  << "\n"
1599  << e.what();
1600  me->last_error_text_ = e.what();
1601  }
1602  catch (...) {
1603  LogError("FwkJob") << "Unknown exception caught in "
1604  << "EventProcessor::asyncRun"
1605  << "\n";
1606  me->last_error_text_ = "Unknown exception caught";
1607  rc = epOther;
1608  }
1609 
1610  me->last_rc_ = rc;
1611 
1612  {
1613  // notify anyone waiting for exit that we are doing so now
1614  boost::mutex::scoped_lock sl(me->stop_lock_);
1615  ++me->stop_count_;
1616  me->stopper_.notify_all();
1617  }
1618  FDEBUG(2) << "asyncRun ending ......................\n";
1619  }
1620 
1621  std::auto_ptr<statemachine::Machine>
1622  EventProcessor::createStateMachine() {
1624  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1625  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1626  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1627  else {
1628  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1629  << fileMode_ << ".\n"
1630  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1631  }
1632 
1633  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1634  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1635  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1636  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1637  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1638  else {
1639  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1640  << emptyRunLumiMode_ << ".\n"
1641  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1642  }
1643 
1644  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1645  fileMode,
1646  emptyRunLumiMode));
1647 
1648  machine->initiate();
1649  return machine;
1650  }
1651 
1652 
1654  EventProcessor::runToCompletion(bool onlineStateTransitions) {
1655 
1656  StateSentry toerror(this);
1657 
1658  StatusCode returnCode=epSuccess;
1659  std::auto_ptr<statemachine::Machine> machine;
1660  {
1661  beginJob(); //make sure this was called
1662 
1663  if(!onlineStateTransitions) changeState(mRunCount);
1664 
1665  //StatusCode returnCode = epSuccess;
1666  stateMachineWasInErrorState_ = false;
1667 
1668  // make the services available
1669  ServiceRegistry::Operate operate(serviceToken_);
1670 
1671  machine = createStateMachine();
1672  try {
1673  try {
1674 
1675  InputSource::ItemType itemType;
1676 
1677  while(true) {
1678 
1679  bool more = true;
1680  if(numberOfForkedChildren_ > 0) {
1681  size_t size = preg_->size();
1682  more = input_->skipForForking();
1683  if(more) {
1684  if(size < preg_->size()) {
1685  principalCache_.adjustIndexesAfterProductRegistryAddition();
1686  }
1687  principalCache_.adjustEventToNewProductRegistry(preg_);
1688  }
1689  }
1690  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1691 
1692  FDEBUG(1) << "itemType = " << itemType << "\n";
1693 
1694  // These are used for asynchronous running only and
1695  // and are checking to see if stopAsync or shutdownAsync
1696  // were called from another thread. In the future, we
1697  // may need to do something better than polling the state.
1698  // With the current code this is the simplest thing and
1699  // it should always work. If the interaction between
1700  // threads becomes more complex this may cause problems.
1701  if(state_ == sStopping) {
1702  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1703  forceLooperToEnd_ = true;
1704  machine->process_event(statemachine::Stop());
1705  forceLooperToEnd_ = false;
1706  break;
1707  }
1708  else if(state_ == sShuttingDown) {
1709  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1710  forceLooperToEnd_ = true;
1711  machine->process_event(statemachine::Stop());
1712  forceLooperToEnd_ = false;
1713  break;
1714  }
1715 
1716  // Look for a shutdown signal
1717  {
1718  boost::mutex::scoped_lock sl(usr2_lock);
1719  if(shutdown_flag) {
1720  changeState(mShutdownSignal);
1721  returnCode = epSignal;
1722  forceLooperToEnd_ = true;
1723  machine->process_event(statemachine::Stop());
1724  forceLooperToEnd_ = false;
1725  break;
1726  }
1727  }
1728 
1729  if(itemType == InputSource::IsStop) {
1730  machine->process_event(statemachine::Stop());
1731  }
1732  else if(itemType == InputSource::IsFile) {
1733  machine->process_event(statemachine::File());
1734  }
1735  else if(itemType == InputSource::IsRun) {
1736  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1737  }
1738  else if(itemType == InputSource::IsLumi) {
1739  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1740  }
1741  else if(itemType == InputSource::IsEvent) {
1742  machine->process_event(statemachine::Event());
1743  }
1744  // This should be impossible
1745  else {
1747  << "Unknown next item type passed to EventProcessor\n"
1748  << "Please report this error to the Framework group\n";
1749  }
1750 
1751  if(machine->terminated()) {
1752  changeState(mInputExhausted);
1753  break;
1754  }
1755  } // End of loop over state machine events
1756  } // Try block
1757  catch (cms::Exception& e) { throw; }
1758  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1759  catch (std::exception& e) { convertException::stdToEDM(e); }
1761  catch(char const* c) { convertException::charPtrToEDM(c); }
1762  catch (...) { convertException::unknownToEDM(); }
1763  } // Try block
1764  // Some comments on exception handling related to the boost state machine:
1765  //
1766  // Some states used in the machine are special because they
1767  // perform actions while the machine is being terminated, actions
1768  // such as close files, call endRun, call endLumi etc ... Each of these
1769  // states has two functions that perform these actions. The functions
1770  // are almost identical. The major difference is that one version
1771  // catches all exceptions and the other lets exceptions pass through.
1772  // The destructor catches them and the other function named "exit" lets
1773  // them pass through. On a normal termination, boost will always call
1774  // "exit" and then the state destructor. In our state classes, the
1775  // the destructors do nothing if the exit function already took
1776  // care of things. Here's the interesting part. When boost is
1777  // handling an exception the "exit" function is not called (a boost
1778  // feature).
1779  //
1780  // If an exception occurs while the boost machine is in control
1781  // (which usually means inside a process_event call), then
1782  // the boost state machine destroys its states and "terminates" itself.
1783  // This already done before we hit the catch blocks below. In this case
1784  // the call to terminateMachine below only destroys an already
1785  // terminated state machine. Because exit is not called, the state destructors
1786  // handle cleaning up lumis, runs, and files. The destructors swallow
1787  // all exceptions and only pass through the exceptions messages, which
1788  // are tacked onto the original exception below.
1789  //
1790  // If an exception occurs when the boost state machine is not
1791  // in control (outside the process_event functions), then boost
1792  // cannot destroy its own states. The terminateMachine function
1793  // below takes care of that. The flag "alreadyHandlingException"
1794  // is set true so that the state exit functions do nothing (and
1795  // cannot throw more exceptions while handling the first). Then the
1796  // state destructors take care of this because exit did nothing.
1797  //
1798  // In both cases above, the EventProcessor::endOfLoop function is
1799  // not called because it can throw exceptions.
1800  //
1801  // One tricky aspect of the state machine is that things that can
1802  // throw should not be invoked by the state machine while another
1803  // exception is being handled.
1804  // Another tricky aspect is that it appears to be important to
1805  // terminate the state machine before invoking its destructor.
1806  // We've seen crashes that are not understood when that is not
1807  // done. Maintainers of this code should be careful about this.
1808 
1809  catch (cms::Exception & e) {
1810  alreadyHandlingException_ = true;
1811  terminateMachine(machine);
1812  alreadyHandlingException_ = false;
1813  if (!exceptionMessageLumis_.empty()) {
1814  e.addAdditionalInfo(exceptionMessageLumis_);
1815  if (e.alreadyPrinted()) {
1816  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1817  }
1818  }
1819  if (!exceptionMessageRuns_.empty()) {
1820  e.addAdditionalInfo(exceptionMessageRuns_);
1821  if (e.alreadyPrinted()) {
1822  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1823  }
1824  }
1825  if (!exceptionMessageFiles_.empty()) {
1826  e.addAdditionalInfo(exceptionMessageFiles_);
1827  if (e.alreadyPrinted()) {
1828  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1829  }
1830  }
1831  throw;
1832  }
1833 
1834  if(machine->terminated()) {
1835  FDEBUG(1) << "The state machine reports it has been terminated\n";
1836  machine.reset();
1837  }
1838 
1839  if(!onlineStateTransitions) changeState(mFinished);
1840 
1841  if(stateMachineWasInErrorState_) {
1842  throw cms::Exception("BadState")
1843  << "The boost state machine in the EventProcessor exited after\n"
1844  << "entering the Error state.\n";
1845  }
1846 
1847  }
1848  if(machine.get() != 0) {
1849  terminateMachine(machine);
1851  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1852  << "Please report this error to the Framework group\n";
1853  }
1854 
1855  toerror.succeeded();
1856 
1857  return returnCode;
1858  }
1859 
1860  void EventProcessor::readFile() {
1861  FDEBUG(1) << " \treadFile\n";
1862  size_t size = preg_->size();
1863  fb_ = input_->readFile();
1864  if(size < preg_->size()) {
1865  principalCache_.adjustIndexesAfterProductRegistryAddition();
1866  }
1867  principalCache_.adjustEventToNewProductRegistry(preg_);
1868  if(numberOfForkedChildren_ > 0) {
1869  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1870  }
1871  }
1872 
1873  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1874  if (fb_.get() != nullptr) {
1875  input_->closeFile(fb_.get(), cleaningUpAfterException);
1876  }
1877  FDEBUG(1) << "\tcloseInputFile\n";
1878  }
1879 
1880  void EventProcessor::openOutputFiles() {
1881  if (fb_.get() != nullptr) {
1882  schedule_->openOutputFiles(*fb_);
1883  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1884  }
1885  FDEBUG(1) << "\topenOutputFiles\n";
1886  }
1887 
1888  void EventProcessor::closeOutputFiles() {
1889  if (fb_.get() != nullptr) {
1890  schedule_->closeOutputFiles();
1891  if(hasSubProcess()) subProcess_->closeOutputFiles();
1892  }
1893  FDEBUG(1) << "\tcloseOutputFiles\n";
1894  }
1895 
1896  void EventProcessor::respondToOpenInputFile() {
1897  if (fb_.get() != nullptr) {
1898  schedule_->respondToOpenInputFile(*fb_);
1899  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1900  }
1901  FDEBUG(1) << "\trespondToOpenInputFile\n";
1902  }
1903 
1904  void EventProcessor::respondToCloseInputFile() {
1905  if (fb_.get() != nullptr) {
1906  schedule_->respondToCloseInputFile(*fb_);
1907  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1908  }
1909  FDEBUG(1) << "\trespondToCloseInputFile\n";
1910  }
1911 
1912  void EventProcessor::respondToOpenOutputFiles() {
1913  if (fb_.get() != nullptr) {
1914  schedule_->respondToOpenOutputFiles(*fb_);
1915  if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1916  }
1917  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1918  }
1919 
1920  void EventProcessor::respondToCloseOutputFiles() {
1921  if (fb_.get() != nullptr) {
1922  schedule_->respondToCloseOutputFiles(*fb_);
1923  if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1924  }
1925  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1926  }
1927 
1928  void EventProcessor::startingNewLoop() {
1929  shouldWeStop_ = false;
1930  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1931  // until after we've called beginOfJob
1932  if(looper_ && looperBeginJobRun_) {
1933  looper_->doStartingNewLoop();
1934  }
1935  FDEBUG(1) << "\tstartingNewLoop\n";
1936  }
1937 
1938  bool EventProcessor::endOfLoop() {
1939  if(looper_) {
1940  ModuleChanger changer(schedule_.get());
1941  looper_->setModuleChanger(&changer);
1942  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1943  looper_->setModuleChanger(nullptr);
1944  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1945  else return false;
1946  }
1947  FDEBUG(1) << "\tendOfLoop\n";
1948  return true;
1949  }
1950 
1951  void EventProcessor::rewindInput() {
1952  input_->repeat();
1953  input_->rewind();
1954  FDEBUG(1) << "\trewind\n";
1955  }
1956 
1957  void EventProcessor::prepareForNextLoop() {
1958  looper_->prepareForNextLoop(esp_.get());
1959  FDEBUG(1) << "\tprepareForNextLoop\n";
1960  }
1961 
1962  bool EventProcessor::shouldWeCloseOutput() const {
1963  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1964  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1965  }
1966 
1967  void EventProcessor::doErrorStuff() {
1968  FDEBUG(1) << "\tdoErrorStuff\n";
1969  LogError("StateMachine")
1970  << "The EventProcessor state machine encountered an unexpected event\n"
1971  << "and went to the error state\n"
1972  << "Will attempt to terminate processing normally\n"
1973  << "(IF using the looper the next loop will be attempted)\n"
1974  << "This likely indicates a bug in an input module or corrupted input or both\n";
1975  stateMachineWasInErrorState_ = true;
1976  }
1977 
// Begin-run transition for the run identified by `run` (process history ID +
// run number).
// Steps:
//  1. Fetch the cached RunPrincipal and let the input source run doBeginRun.
//  2. Synchronize the EventSetup to (run, 0, 0) at the run's begin time.
//  3. On the first run only, give the looper its ScheduleInfo and call its
//     beginOfJob and doStartingNewLoop.
//  4. Let the schedule, and then any sub-process, process the transition.
//  5. Notify the looper, if any.
1978  void EventProcessor::beginRun(statemachine::Run const& run) {
1979  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1980  input_->doBeginRun(runPrincipal);
// IOV key for the start of the run: lumi 0, event 0, stamped with the run's begin time.
1981  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1982  runPrincipal.beginTime());
// Optionally force the EventSetup controller to clear its caches before the new IOV is applied.
1983  if(forceESCacheClearOnNewRun_){
1984  espController_->forceCacheClear();
1985  }
1986  espController_->eventSetupForInstance(ts);
1987  EventSetup const& es = esp_->eventSetup();
// First run only: the looper's beginOfJob is called here with this EventSetup,
// followed by doStartingNewLoop. startingNewLoop() skips its own
// doStartingNewLoop call while looperBeginJobRun_ is still false.
1988  if(looper_ && looperBeginJobRun_== false) {
1989  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1990  looper_->beginOfJob(es);
1991  looperBeginJobRun_ = true;
1992  looper_->doStartingNewLoop();
1993  }
1994  {
// NOTE(review): the declaration of `Traits` is missing from this listing.
// The sentry emits the pre-schedule signal on construction and the post-schedule
// signal on scope exit. Exceptions from the post signal propagate only after
// allowThrow(), i.e. only when the body below completed without throwing.
1996  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1997  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1998  if(hasSubProcess()) {
1999  subProcess_->doBeginRun(runPrincipal, ts);
2000  }
2001  sentry.allowThrow();
2002  }
2003  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
2004  if(looper_) {
2005  looper_->doBeginRun(runPrincipal, es);
2006  }
2007  }
2008 
// End-run transition for the run identified by `run`.
// Steps:
//  1. Let the input source run doEndRun.
//  2. Synchronize the EventSetup to (run, max lumi, max event) at the run's end time.
//  3. Let the schedule, and then any sub-process, process the transition.
//  4. Notify the looper, if any.
// cleaningUpAfterException is forwarded to the source, schedule and sub-process.
2009  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
2010  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2011  input_->doEndRun(runPrincipal, cleaningUpAfterException);
// IOV key for the end of the run: the largest lumi and event numbers, stamped with the run's end time.
2012  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
2013  runPrincipal.endTime());
2014  espController_->eventSetupForInstance(ts);
2015  EventSetup const& es = esp_->eventSetup();
2016  {
// NOTE(review): the declaration of `Traits` is missing from this listing.
// Post-schedule-signal exceptions propagate only if the body below completed (allowThrow()).
2018  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2019  schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2020  if(hasSubProcess()) {
2021  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2022  }
2023  sentry.allowThrow();
2024  }
2025  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
2026  if(looper_) {
2027  looper_->doEndRun(runPrincipal, es);
2028  }
2029  }
2030 
// Begin-luminosity-block transition for (phid, run, lumi).
// Steps:
//  1. Let the input source run doBeginLumi.
//  2. Give the `rng` service (when available) a preBeginLumi callback.
//  3. Synchronize the EventSetup to (run, lumi, 0) at the lumi's begin time.
//  4. Let the schedule and any sub-process process the transition.
//  5. Notify the looper, if any.
2031  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
2032  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2033  input_->doBeginLumi(lumiPrincipal);
2034 
// NOTE(review): the declaration of `rng` is missing from this listing; it is
// used as a service-like handle (isAvailable(), operator->).
2036  if(rng.isAvailable()) {
2037  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
2038  rng->preBeginLumi(lb);
2039  }
2040 
2041  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
2042  // lumi blocks know their start and end times why not also start and end events?
2043  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
2044  espController_->eventSetupForInstance(ts);
2045  EventSetup const& es = esp_->eventSetup();
2046  {
// NOTE(review): the declaration of `Traits` is missing from this listing.
// Post-schedule-signal exceptions propagate only if the body below completed (allowThrow()).
2048  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2049  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2050  if(hasSubProcess()) {
2051  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2052  }
2053  sentry.allowThrow();
2054  }
2055  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
2056  if(looper_) {
2057  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2058  }
2059  }
2060 
// End-luminosity-block transition for (phid, run, lumi).
// Steps:
//  1. Let the input source run doEndLumi.
//  2. Synchronize the EventSetup to (run, lumi, max event) at the lumi's end time.
//  3. Let the schedule and any sub-process process the transition.
//  4. Notify the looper, if any.
// cleaningUpAfterException is forwarded to the source, schedule and sub-process.
2061  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
2062  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2063  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2064  //NOTE: Using the max event number for the end of a lumi block is a bad idea
2065  // lumi blocks know their start and end times why not also start and end events?
2066  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
2067  lumiPrincipal.endTime());
2068  espController_->eventSetupForInstance(ts);
2069  EventSetup const& es = esp_->eventSetup();
2070  {
// NOTE(review): the declaration of `Traits` is missing from this listing.
// Post-schedule-signal exceptions propagate only if the body below completed (allowThrow()).
2072  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2073  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2074  if(hasSubProcess()) {
2075  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2076  }
2077  sentry.allowThrow();
2078  }
2079  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
2080  if(looper_) {
2081  looper_->doEndLuminosityBlock(lumiPrincipal, es);
2082  }
2083  }
2084 
// Reads a new run from the input source and inserts it into principalCache_.
// Returns the (reduced process history ID, run number) pair identifying it.
// Inserting while a run principal is already cached is reported as an error.
// NOTE(review): the line that starts that error expression is missing from this
// listing (presumably a throw); only the streamed message text is visible.
2085  statemachine::Run EventProcessor::readAndCacheRun() {
2086  if (principalCache_.hasRunPrincipal()) {
2088  << "EventProcessor::readAndCacheRun\n"
2089  << "Illegal attempt to insert run into cache\n"
2090  << "Contact a Framework Developer\n";
2091  }
2092  principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
2093  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2094  }
2095 
2096  statemachine::Run EventProcessor::readAndMergeRun() {
2097  principalCache_.merge(input_->runAuxiliary(), preg_);
2098  input_->readAndMergeRun(principalCache_.runPrincipalPtr());
2099  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2100  }
2101 
// Reads a new luminosity block from the input source, caches it, and links it
// to the cached RunPrincipal. Returns the luminosity block number.
// Two precondition violations are reported as errors: a lumi is already
// cached, or no run is cached.
// NOTE(review): the lines that start both error expressions are missing from
// this listing (presumably throws).
// NOTE(review): both error messages name "EventProcessor::readAndCacheRun",
// although this function is readAndCacheLumi; confirm and fix the text.
2102  int EventProcessor::readAndCacheLumi() {
2103  if (principalCache_.hasLumiPrincipal()) {
2105  << "EventProcessor::readAndCacheRun\n"
2106  << "Illegal attempt to insert lumi into cache\n"
2107  << "Contact a Framework Developer\n";
2108  }
2109  if (!principalCache_.hasRunPrincipal()) {
2111  << "EventProcessor::readAndCacheRun\n"
2112  << "Illegal attempt to insert lumi into cache\n"
2113  << "Run is invalid\n"
2114  << "Contact a Framework Developer\n";
2115  }
2116  principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
// Link the newly cached lumi to its parent run principal.
2117  principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
2118  return input_->luminosityBlock();
2119  }
2120 
2121  int EventProcessor::readAndMergeLumi() {
2122  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
2123  input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
2124  return input_->luminosityBlock();
2125  }
2126 
2127  void EventProcessor::writeRun(statemachine::Run const& run) {
2128  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
2129  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
2130  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
2131  }
2132 
2133  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
2134  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
2135  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
2136  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
2137  }
2138 
2139  void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
2140  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2141  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2142  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
2143  }
2144 
2145  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
2146  principalCache_.deleteLumi(phid, run, lumi);
2147  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2148  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
2149  }
2150 
// Reads the next event into the cached EventPrincipal and processes it.
// Steps:
//  1. Attach the cached lumi principal to the event and assert that the event's
//     run and lumi match it.
//  2. Synchronize the EventSetup to the event's ID and time.
//  3. Let the schedule and any sub-process process the event.
//  4. If a looper is configured, run its doDuringLoop, servicing any
//     random-access transition it requests.
//  5. Clear the event principal for reuse.
2151  void EventProcessor::readAndProcessEvent() {
2152  EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
2153  FDEBUG(1) << "\treadEvent\n";
2154  assert(pep != 0);
2155  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2156  assert(pep->luminosityBlockPrincipalPtrValid());
2157  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2158  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2159 
2160  IOVSyncValue ts(pep->id(), pep->time());
2161  espController_->eventSetupForInstance(ts);
2162  EventSetup const& es = esp_->eventSetup();
2163  {
// NOTE(review): the declaration of `Traits` is missing from this listing.
// Post-schedule-signal exceptions propagate only if the body below completed (allowThrow()).
2165  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2166  schedule_->processOneOccurrence<Traits>(*pep, es);
2167  if(hasSubProcess()) {
2168  subProcess_->doEvent(*pep, ts);
2169  }
2170  sentry.allowThrow();
2171  }
2172 
2173  if(looper_) {
// The ProcessingController tells the looper which navigation the source supports.
2174  bool randomAccess = input_->randomAccess();
2175  ProcessingController::ForwardState forwardState = input_->forwardState();
2176  ProcessingController::ReverseState reverseState = input_->reverseState();
2177  ProcessingController pc(forwardState, reverseState, randomAccess);
2178 
2179  EDLooperBase::Status status = EDLooperBase::kContinue;
// doDuringLoop is called again whenever the requested navigation reports
// failure. Only the status from the last call is kept.
2180  do {
2181  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2182 
2183  bool succeeded = true;
2184  if(randomAccess) {
2185  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
// NOTE(review): presumably -2 because the source is already positioned after
// the current event; confirm against InputSource::skipEvents.
2186  input_->skipEvents(-2);
2187  }
2188  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2189  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2190  }
2191  }
2192  pc.setLastOperationSucceeded(succeeded);
2193  } while(!pc.lastOperationSucceeded());
// Any status other than kContinue requests that event processing stop.
2194  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2195 
2196  }
2197 
2198  FDEBUG(1) << "\tprocessEvent\n";
2199  pep->clearEventPrincipal();
2200  }
2201 
2202  bool EventProcessor::shouldWeStop() const {
2203  FDEBUG(1) << "\tshouldWeStop\n";
2204  if(shouldWeStop_) return true;
2205  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2206  }
2207 
2208  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2209  exceptionMessageFiles_ = message;
2210  }
2211 
2212  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2213  exceptionMessageRuns_ = message;
2214  }
2215 
2216  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2217  exceptionMessageLumis_ = message;
2218  }
2219 
2220  bool EventProcessor::alreadyHandlingException() const {
2221  return alreadyHandlingException_;
2222  }
2223 
2224  void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
2225  if(iMachine.get() != 0) {
2226  if(!iMachine->terminated()) {
2227  forceLooperToEnd_ = true;
2228  iMachine->process_event(statemachine::Stop());
2229  forceLooperToEnd_ = false;
2230  }
2231  else {
2232  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2233  }
2234  if(iMachine->terminated()) {
2235  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2236  }
2237  iMachine.reset();
2238  }
2239  }
2240 }
list table
Definition: asciidump.py:386
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
T getParameter(std::string const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
string rep
Definition: cuy.py:1188
volatile int stop_count_
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
volatile bool id_set_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
RunNumber_t run() const
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:378
ActivityRegistry::PostProcessEvent postProcessEventSignal_
virtual std::string explainSelf() const
Definition: Exception.cc:146
ParameterSetID id() const
static ThreadSafeRegistry * instance()
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
std::unique_ptr< ActionTable const > act_table_
Definition: ScheduleItems.h:55
Timestamp const & endTime() const
int getSigNum()
EventID const & id() const
void call(boost::function< void(void)>)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
def pipe
Definition: pipe.py:5
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
LuminosityBlockNumber_t luminosityBlock() const
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:46
void insert(boost::shared_ptr< RunPrincipal > rp)
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:56
unsigned int LuminosityBlockNumber_t
Definition: EventID.h:31
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet)
Timestamp const & time() const
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
TransEntry table[]
ServiceToken serviceToken_
boost::mutex stop_lock_
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ParameterSet const & getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:54
void stdToEDM(std::exception const &e)
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:47
static InputSourceFactory * get()
volatile event_processor::State state_
Timestamp const & endTime() const
Definition: RunPrincipal.h:58
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::unique_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:53
virtual void endOfJob()
Definition: EDLooperBase.cc:74
void connectSigs(EventProcessor *ep)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
#define noexcept
volatile Status last_rc_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::unique_ptr< InputSource > input_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::RunNumber_t runNumber() const
Definition: EPStates.h:50
boost::condition starter_
double a
Definition: hdecay.h:121
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:49
volatile bool shutdown_flag
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
Definition: EventRange.h:32
#define O_NONBLOCK
Definition: SysFile.h:21
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
ActivityRegistry::PreProcessEvent preProcessEventSignal_
collection_type & data()
Provide access to the contained collection.
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:52
tuple status
Definition: ntuplemaker.py:245
void validate(ParameterSet &pset, std::string const &moduleLabel) const
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
bool luminosityBlockPrincipalPtrValid()
ParameterSet const & registerIt()
static ComponentFactory< T > * get()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:56
boost::mutex usr2_lock
PrincipalCache principalCache_
bool hasSubProcess() const
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
Definition: ScheduleItems.h:54
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)