CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
1 
3 
11 
36 
38 
46 
49 
57 
58 #include "MessageForSource.h"
59 #include "MessageForParent.h"
60 
61 #include "boost/bind.hpp"
62 #include "boost/thread/xtime.hpp"
63 
64 #include <exception>
65 #include <iomanip>
66 #include <iostream>
67 #include <utility>
68 #include <sstream>
69 
70 #include <sys/ipc.h>
71 #include <sys/msg.h>
72 
73 //Used for forking
74 #include <sys/types.h>
75 #include <sys/wait.h>
76 #include <sys/socket.h>
77 #include <sys/select.h>
78 #include <sys/fcntl.h>
79 #include <unistd.h>
80 
81 //Used for CPU affinity
82 #ifndef __APPLE__
83 #include <sched.h>
84 #endif
85 
86 namespace edm {
87 
88  namespace event_processor {
89 
90  class StateSentry {
91  public:
94  void succeeded() {success_ = true;}
95 
96  private:
98  bool success_;
99  };
100  }
101 
102  using namespace event_processor;
103 
104  namespace {
105  template <typename T>
106  class ScheduleSignalSentry {
107  public:
108  ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
109  a_(a), principal_(principal), es_(es) {
110  if (a_) T::preScheduleSignal(a_, principal_);
111  }
112  ~ScheduleSignalSentry() {
113  if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
114  }
115 
116  private:
117  // We own none of these resources.
118  ActivityRegistry* a_;
119  typename T::MyPrincipal* principal_;
120  EventSetup const* es_;
121  };
122  }
123 
124  namespace {
125 
// Printable names for the state machine's states and messages.
// Both tables are indexed by enum value and therefore must be kept in
// sync with the state and message enums declared in the header.

char const* stateNames[] = {
  "Init",          //  0
  "JobReady",      //  1
  "RunGiven",      //  2
  "Running",       //  3
  "Stopping",      //  4
  "ShuttingDown",  //  5
  "Done",          //  6
  "JobEnded",      //  7
  "Error",         //  8
  "ErrorEnded",    //  9
  "End",           // 10
  "Invalid"        // 11
};

char const* msgNames[] = {
  "SetRun",          //  0
  "Skip",            //  1
  "RunAsync",        //  2
  "Run(ID)",         //  3
  "Run(count)",      //  4
  "BeginJob",        //  5
  "StopAsync",       //  6
  "ShutdownAsync",   //  7
  "EndJob",          //  8
  "CountComplete",   //  9
  "InputExhausted",  // 10
  "StopSignal",      // 11
  "ShutdownSignal",  // 12
  "Finished",        // 13
  "Any",             // 14
  "dtor",            // 15
  "Exception",       // 16
  "Rewind"           // 17
};
164  }
165  // IMPORTANT NOTE:
166  // the mAny messages are special, they must appear last in the
167  // table if multiple entries for a CurrentState are present.
168  // the changeState function does not use the mAny yet!!!
169 
170  struct TransEntry {
173  State final;
174  };
175 
176  // we should use this information to initialize a two dimensional
177  // table of t[CurrentState][Message] = FinalState
178 
/*
  The way this is currently written, the async run thread function
  can return in the "JobReady" state without having been cleaned up yet.
  The problem is that the thread is only cleaned up when stop/shutdown
  async is called, but the stop/shutdown async functions first attempt
  to change the state using messages that are not valid in the
  "JobReady" state.

  I think most of the problems can be solved by using two states
  for "running": RunningS and RunningA (sync/async). The problem
  seems to be all of the transitions out of running for both
  modes of operation. The other solution might be to only go to
  "Stopping" from Running, and use the return code from "run_p" to
  set the final state. If this is used, then in the sync mode the
  "Stopping" state would be momentary.

*/
196 
198  // CurrentState Message FinalState
199  // -----------------------------------------
200  { sInit, mException, sError },
201  { sInit, mBeginJob, sJobReady },
205  { sJobReady, mSkip, sRunning },
206  { sJobReady, mRunID, sRunning },
210  { sJobReady, mDtor, sEnd }, // should this be allowed?
211 
215 
221  { sRunning, mException, sError },
225  { sRunning, mCountComplete, sStopping }, // sJobReady
226  { sRunning, mInputExhausted, sStopping }, // sJobReady
227 
228  { sStopping, mInputRewind, sRunning }, // The looper needs this
233  { sStopping, mStopAsync, sStopping }, // stay
234  { sStopping, mInputExhausted, sStopping }, // stay
235  //{ sStopping, mAny, sJobReady }, // <- ??????
238  { sShuttingDown, mCountComplete, sDone }, // needed?
239  { sShuttingDown, mInputExhausted, sDone }, // needed?
241  //{ sShuttingDown, mShutdownAsync, sShuttingDown }, // only one at
242  //{ sShuttingDown, mStopAsync, sShuttingDown }, // a time
243  //{ sShuttingDown, mAny, sDone }, // <- ??????
244  { sDone, mEndJob, sJobEnded },
245  { sDone, mException, sError },
246  { sJobEnded, mDtor, sEnd },
248  { sError, mEndJob, sError }, // funny one here
249  { sError, mDtor, sError }, // funny one here
250  { sInit, mDtor, sEnd }, // for StorM dummy EP
251  { sStopping, mShutdownAsync, sShuttingDown }, // For FUEP tests
252  { sInvalid, mAny, sInvalid }
253  };
254 
255 
256  // Note: many of the messages generate the mBeginJob message first
257  // mRunID, mRunCount, mSetRun
258 
259  // ---------------------------------------------------------------
260  boost::shared_ptr<InputSource>
262  CommonParams const& common,
263  ProductRegistry& preg,
264  PrincipalCache& pCache,
265  boost::shared_ptr<ActivityRegistry> areg,
266  boost::shared_ptr<ProcessConfiguration> processConfiguration) {
267  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
268  if(main_input == 0) {
270  << "There must be exactly one source in the configuration.\n"
271  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
272  }
273 
274  std::string modtype(main_input->getParameter<std::string>("@module_type"));
275 
276  std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
278  ConfigurationDescriptions descriptions(filler->baseType());
279  filler->fill(descriptions);
280 
281  try {
282  try {
283  descriptions.validate(*main_input, std::string("source"));
284  }
285  catch (cms::Exception& e) { throw; }
286  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
287  catch (std::exception& e) { convertException::stdToEDM(e); }
288  catch (std::string& s) { convertException::stringToEDM(s); }
289  catch (char const* c) { convertException::charPtrToEDM(c); }
290  catch (...) { convertException::unknownToEDM(); }
291  }
292  catch (cms::Exception & iException) {
293  std::ostringstream ost;
294  ost << "Validating configuration of input source of type " << modtype;
295  iException.addContext(ost.str());
296  throw;
297  }
298 
299  main_input->registerIt();
300 
301  // Fill in "ModuleDescription", in case the input source produces
302  // any EDproducts, which would be registered in the ProductRegistry.
303  // Also fill in the process history item for this process.
304  // There is no module label for the unnamed input source, so
305  // just use "source".
306  // Only the tracked parameters belong in the process configuration.
307  ModuleDescription md(main_input->id(),
308  main_input->getParameter<std::string>("@module_type"),
309  "source",
310  processConfiguration.get());
311 
312  InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
313  areg->preSourceConstructionSignal_(md);
314  boost::shared_ptr<InputSource> input;
315  try {
316  try {
317  input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
318  }
319  catch (cms::Exception& e) { throw; }
320  catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
321  catch (std::exception& e) { convertException::stdToEDM(e); }
322  catch (std::string& s) { convertException::stringToEDM(s); }
323  catch (char const* c) { convertException::charPtrToEDM(c); }
324  catch (...) { convertException::unknownToEDM(); }
325  }
326  catch (cms::Exception& iException) {
327  areg->postSourceConstructionSignal_(md);
328  std::ostringstream ost;
329  ost << "Constructing input source of type " << modtype;
330  iException.addContext(ost.str());
331  throw;
332  }
333  areg->postSourceConstructionSignal_(md);
334  return input;
335  }
336 
337  // ---------------------------------------------------------------
338  boost::shared_ptr<EDLooperBase>
341  ParameterSet& params) {
342  boost::shared_ptr<EDLooperBase> vLooper;
343 
344  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
345 
346  if(loopers.size() == 0) {
347  return vLooper;
348  }
349 
350  assert(1 == loopers.size());
351 
352  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
353  itName != itNameEnd;
354  ++itName) {
355 
356  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
357  providerPSet->registerIt();
358  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
359  cp,
360  *providerPSet);
361  }
362  return vLooper;
363 
364  }
365 
366  // ---------------------------------------------------------------
368  ServiceToken const& iToken,
370  std::vector<std::string> const& defaultServices,
371  std::vector<std::string> const& forcedServices) :
372  preProcessEventSignal_(),
373  postProcessEventSignal_(),
374  actReg_(),
375  preg_(),
376  serviceToken_(),
377  input_(),
378  espController_(new eventsetup::EventSetupsController),
379  esp_(),
380  act_table_(),
381  processConfiguration_(),
382  schedule_(),
383  subProcess_(),
384  historyAppender_(new HistoryAppender),
385  state_(sInit),
386  event_loop_(),
387  state_lock_(),
388  stop_lock_(),
389  stopper_(),
390  starter_(),
391  stop_count_(-1),
392  last_rc_(epSuccess),
393  last_error_text_(),
394  id_set_(false),
395  event_loop_id_(),
396  my_sig_num_(getSigNum()),
397  fb_(),
398  looper_(),
399  machine_(),
400  principalCache_(),
401  shouldWeStop_(false),
402  stateMachineWasInErrorState_(false),
403  fileMode_(),
404  emptyRunLumiMode_(),
405  exceptionMessageFiles_(),
406  exceptionMessageRuns_(),
407  exceptionMessageLumis_(),
408  alreadyHandlingException_(false),
409  forceLooperToEnd_(false),
410  looperBeginJobRun_(false),
411  forceESCacheClearOnNewRun_(false),
412  numberOfForkedChildren_(0),
413  numberOfSequentialEventsPerChild_(1),
414  setCpuAffinity_(false),
415  eventSetupDataToExcludeFromPrefetching_() {
416  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
417  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
418  processDesc->addServices(defaultServices, forcedServices);
419  init(processDesc, iToken, iLegacy);
420  }
421 
423  std::vector<std::string> const& defaultServices,
424  std::vector<std::string> const& forcedServices) :
425  preProcessEventSignal_(),
426  postProcessEventSignal_(),
427  actReg_(),
428  preg_(),
429  serviceToken_(),
430  input_(),
431  espController_(new eventsetup::EventSetupsController),
432  esp_(),
433  act_table_(),
434  processConfiguration_(),
435  schedule_(),
436  subProcess_(),
437  historyAppender_(new HistoryAppender),
438  state_(sInit),
439  event_loop_(),
440  state_lock_(),
441  stop_lock_(),
442  stopper_(),
443  starter_(),
444  stop_count_(-1),
445  last_rc_(epSuccess),
446  last_error_text_(),
447  id_set_(false),
448  event_loop_id_(),
449  my_sig_num_(getSigNum()),
450  fb_(),
451  looper_(),
452  machine_(),
453  principalCache_(),
454  shouldWeStop_(false),
455  stateMachineWasInErrorState_(false),
456  fileMode_(),
457  emptyRunLumiMode_(),
458  exceptionMessageFiles_(),
459  exceptionMessageRuns_(),
460  exceptionMessageLumis_(),
461  alreadyHandlingException_(false),
462  forceLooperToEnd_(false),
463  looperBeginJobRun_(false),
464  forceESCacheClearOnNewRun_(false),
465  numberOfForkedChildren_(0),
466  numberOfSequentialEventsPerChild_(1),
467  setCpuAffinity_(false),
468  eventSetupDataToExcludeFromPrefetching_() {
469  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
470  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
471  processDesc->addServices(defaultServices, forcedServices);
473  }
474 
475  EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
476  ServiceToken const& token,
478  preProcessEventSignal_(),
479  postProcessEventSignal_(),
480  actReg_(),
481  preg_(),
482  serviceToken_(),
483  input_(),
484  espController_(new eventsetup::EventSetupsController),
485  esp_(),
486  act_table_(),
487  processConfiguration_(),
488  schedule_(),
489  subProcess_(),
490  historyAppender_(new HistoryAppender),
491  state_(sInit),
492  event_loop_(),
493  state_lock_(),
494  stop_lock_(),
495  stopper_(),
496  starter_(),
497  stop_count_(-1),
498  last_rc_(epSuccess),
499  last_error_text_(),
500  id_set_(false),
501  event_loop_id_(),
502  my_sig_num_(getSigNum()),
503  fb_(),
504  looper_(),
505  machine_(),
506  principalCache_(),
507  shouldWeStop_(false),
508  stateMachineWasInErrorState_(false),
509  fileMode_(),
510  emptyRunLumiMode_(),
511  exceptionMessageFiles_(),
512  exceptionMessageRuns_(),
513  exceptionMessageLumis_(),
514  alreadyHandlingException_(false),
515  forceLooperToEnd_(false),
516  looperBeginJobRun_(false),
517  forceESCacheClearOnNewRun_(false),
518  numberOfForkedChildren_(0),
519  numberOfSequentialEventsPerChild_(1),
520  setCpuAffinity_(false),
521  eventSetupDataToExcludeFromPrefetching_() {
522  init(processDesc, token, legacy);
523  }
524 
525 
526  EventProcessor::EventProcessor(std::string const& config, bool isPython):
527  preProcessEventSignal_(),
528  postProcessEventSignal_(),
529  actReg_(),
530  preg_(),
531  serviceToken_(),
532  input_(),
533  espController_(new eventsetup::EventSetupsController),
534  esp_(),
535  act_table_(),
536  processConfiguration_(),
537  schedule_(),
538  subProcess_(),
539  historyAppender_(new HistoryAppender),
540  state_(sInit),
541  event_loop_(),
542  state_lock_(),
543  stop_lock_(),
544  stopper_(),
545  starter_(),
546  stop_count_(-1),
547  last_rc_(epSuccess),
548  last_error_text_(),
549  id_set_(false),
550  event_loop_id_(),
551  my_sig_num_(getSigNum()),
552  fb_(),
553  looper_(),
554  machine_(),
555  principalCache_(),
556  shouldWeStop_(false),
557  stateMachineWasInErrorState_(false),
558  fileMode_(),
559  emptyRunLumiMode_(),
560  exceptionMessageFiles_(),
561  exceptionMessageRuns_(),
562  exceptionMessageLumis_(),
563  alreadyHandlingException_(false),
564  forceLooperToEnd_(false),
565  looperBeginJobRun_(false),
566  forceESCacheClearOnNewRun_(false),
567  numberOfForkedChildren_(0),
568  numberOfSequentialEventsPerChild_(1),
569  setCpuAffinity_(false),
570  eventSetupDataToExcludeFromPrefetching_() {
571  if(isPython) {
572  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
573  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
575  }
576  else {
577  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
579  }
580  }
581 
582  void
583  EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
584  ServiceToken const& iToken,
586 
587  //std::cerr << processDesc->dump() << std::endl;
588  // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
589  // They must be cleared here because some processes run multiple EventProcessors in succession.
591 
592  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
593  //std::cerr << parameterSet->dump() << std::endl;
594 
595  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
596  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
597 
598  // Now set some parameters specific to the main process.
599  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
600  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
601  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
602  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
603  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
604  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
605  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
606  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
607  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
608  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
609  itPS != itPSEnd;
610  ++itPS) {
611  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
612  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
613  itPS->getUntrackedParameter<std::string>("label", "")));
614  }
615  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
616 
617  // Now do general initialization
618  ScheduleItems items;
619 
620  //initialize the services
621  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
622  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
623  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
624 
625  //make the services available
627 
628  // intialize miscellaneous items
629  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
630 
631  // intialize the event setup provider
632  esp_ = espController_->makeProvider(*parameterSet);
633 
634  // initialize the looper, if any
635  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
636  if(looper_) {
637  looper_->setActionTable(items.act_table_.get());
638  looper_->attachTo(*items.actReg_);
639  }
640 
641  // initialize the input source
642  input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
643 
644  // intialize the Schedule
645  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
646 
647  // set the data members
648  act_table_ = items.act_table_;
649  actReg_ = items.actReg_;
650  preg_ = items.preg_;
652 
653  FDEBUG(2) << parameterSet << std::endl;
654  connectSigs(this);
655 
656  // initialize the subprocess, if there is one
657  if(subProcessParameterSet) {
658  subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
659  }
660  espController_->clearComponents();
661  }
662 
664  // Make the services available while everything is being deleted.
665  ServiceToken token = getToken();
666  ServiceRegistry::Operate op(token);
667 
668  // The state machine should have already been cleaned up
669  // and destroyed at this point by a call to EndJob or
670  // earlier when it completed processing events, but if it
671  // has not been we'll take care of it here at the last moment.
672  // This could cause problems if we are already handling an
673  // exception and another one is thrown here ... For a critical
674  // executable the solution to this problem is for the code using
675  // the EventProcessor to explicitly call EndJob or use runToCompletion,
676  // then the next line of code is never executed.
678 
679  try {
681  }
682  catch(cms::Exception& e) {
683  LogError("System")
684  << e.explainSelf() << "\n";
685  }
686 
687  // manually destroy all these thing that may need the services around
688  espController_.reset();
689  subProcess_.reset();
690  esp_.reset();
691  schedule_.reset();
692  input_.reset();
693  looper_.reset();
694  actReg_.reset();
695 
696  pset::Registry* psetRegistry = pset::Registry::instance();
697  psetRegistry->data().clear();
698  psetRegistry->extra().setID(ParameterSetID());
699 
701  ParentageRegistry::instance()->data().clear();
705  }
706 
707  void
709  beginJob(); //make sure this was called
712  {
713  StateSentry toerror(this);
714 
715  //make the services available
717 
718  {
719  input_->repeat();
720  input_->rewind();
721  }
723  toerror.succeeded();
724  }
726  }
727 
729  EventProcessor::run(int numberEventsToProcess, bool) {
730  return runEventCount(numberEventsToProcess);
731  }
732 
734  EventProcessor::skip(int numberToSkip) {
735  beginJob(); //make sure this was called
737  {
738  StateSentry toerror(this);
739 
740  //make the services available
742 
743  {
744  input_->skipEvents(numberToSkip);
745  }
747  toerror.succeeded();
748  }
750  return epSuccess;
751  }
752 
753  void
755  if(state_ != sInit) return;
756  bk::beginJob();
757  // can only be run if in the initial state
759 
760  // StateSentry toerror(this); // should we add this ?
761  //make the services available
763 
764  //NOTE: This implementation assumes 'Job' means one call
765  // the EventProcessor::run
766  // If it really means once per 'application' then this code will
767  // have to be changed.
768  // Also have to deal with case where have 'run' then new Module
769  // added and do 'run'
770  // again. In that case the newly added Module needs its 'beginJob'
771  // to be called.
772 
773  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
774  // For now we delay calling beginOfJob until first beginOfRun
775  //if(looper_) {
776  // looper_->beginOfJob(es);
777  //}
778  try {
779  try {
780  input_->doBeginJob();
781  }
782  catch (cms::Exception& e) { throw; }
783  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
784  catch (std::exception& e) { convertException::stdToEDM(e); }
785  catch(std::string& s) { convertException::stringToEDM(s); }
786  catch(char const* c) { convertException::charPtrToEDM(c); }
787  catch (...) { convertException::unknownToEDM(); }
788  }
789  catch(cms::Exception& ex) {
790  ex.addContext("Calling beginJob for the source");
791  throw;
792  }
793  schedule_->beginJob();
794  // toerror.succeeded(); // should we add this?
795  if(hasSubProcess()) subProcess_->doBeginJob();
796  actReg_->postBeginJobSignal_();
797  }
798 
799  void
801  // Collects exceptions, so we don't throw before all operations are performed.
802  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
803 
804  // only allowed to run if state is sIdle, sJobReady, sRunGiven
805  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
806 
807  //make the services available
809 
810  c.call(boost::bind(&EventProcessor::terminateMachine, this));
811  schedule_->endJob(c);
812  if(hasSubProcess()) {
813  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
814  }
815  c.call(boost::bind(&InputSource::doEndJob, input_));
816  if(looper_) {
817  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
818  }
819  c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
820  if(c.hasThrown()) {
821  c.rethrow();
822  }
823  }
824 
827  return serviceToken_;
828  }
829 
830  //Setup signal handler to listen for when forked children stop
831  namespace {
// Bookkeeping written by the SIGCHLD handler below.
// These are volatile so that the compiler re-reads them on every access
// instead of caching a value the signal handler may have changed.
volatile bool child_failed = false;           // true once a child exited non-zero or was killed by a signal
volatile unsigned int num_children_done = 0;  // number of children reaped so far
volatile int child_fail_exit_status = 0;      // last non-zero exit status seen
volatile int child_fail_signal = 0;           // last terminating signal seen

extern "C" {
  // SIGCHLD handler: reaps every child that has already terminated (without
  // blocking) and records whether any of them failed.
  //
  // A child counts as done when it either exited or was terminated by a
  // signal. It counts as failed when its exit status is non-zero or when it
  // was terminated by a signal.
  //
  // The arguments are unused; the signature is the three-argument form
  // expected for handlers installed with SA_SIGINFO.
  void ep_sigchld(int, siginfo_t*, void*) {
    // waitpid() sets errno (e.g. to ECHILD once nothing is left to reap).
    // The handler can interrupt code that is about to inspect errno, so the
    // value seen on entry is restored before returning.
    int const savedErrno = errno;

    int stat_loc;
    pid_t p = waitpid(-1, &stat_loc, WNOHANG);
    while(0 < p) {
      if(WIFEXITED(stat_loc)) {
        ++num_children_done;
        if(0 != WEXITSTATUS(stat_loc)) {
          child_fail_exit_status = WEXITSTATUS(stat_loc);
          child_failed = true;
        }
      }
      if(WIFSIGNALED(stat_loc)) {
        ++num_children_done;
        child_fail_signal = WTERMSIG(stat_loc);
        child_failed = true;
      }
      // Several children may have terminated before this handler ran once.
      p = waitpid(-1, &stat_loc, WNOHANG);
    }

    errno = savedErrno;
  }
}
863 
864  }
865 
866  enum {
871  };
872 
873  namespace {
// Returns the number of decimal digits needed to write numberOfChildren.
// A value of zero has no digits to count, so a width of 3 is returned
// instead as a safe fallback.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3; // Protect against zero numberOfChildren
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
885 
/* Callable object embodying the thread that listens to the forked children
   and then tells them which events they should process. */
class MessageSenderToSource {
public:
  // childrenSockets:   descriptors on which children request work.
  // childrenPipes:     watchdog pipe descriptors, one per child; only a
  //                    reference is kept, so the vector must outlive this object.
  // iNEventsToProcess: number of events handed out per request.
  MessageSenderToSource(std::vector<int> const& childrenSockets,
                        std::vector<int> const& childrenPipes,
                        long iNEventsToProcess);

  // Serves the children until no child is alive any more.
  void operator()();

private:
  std::vector<int> const& m_childrenPipes;  // not owned
  long const m_nEventsToProcess;            // chunk size sent with every assignment
  fd_set m_socketSet;                       // every descriptor still being watched
  unsigned int m_aliveChildren;             // children not yet known to be gone
  int m_maxFd;                              // highest watched descriptor + 1
};
900 
901  MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
902  std::vector<int> const& childrenPipes,
903  long iNEventsToProcess):
904  m_childrenPipes(childrenPipes),
905  m_nEventsToProcess(iNEventsToProcess),
906  m_aliveChildren(childrenSockets.size()),
907  m_maxFd(0)
908  {
909  FD_ZERO(&m_socketSet);
910  for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
911  it != itEnd; it++) {
912  FD_SET(*it, &m_socketSet);
913  if (*it > m_maxFd) {
914  m_maxFd = *it;
915  }
916  }
917  for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
918  it != itEnd; ++it) {
919  FD_SET(*it, &m_socketSet);
920  if (*it > m_maxFd) {
921  m_maxFd = *it;
922  }
923  }
924  m_maxFd++; // select reads [0,m_maxFd).
925  }
926 
927  /* This function is the heart of the communication between parent and child.
928  * When ready for more data, the child (see MessageReceiverForSource) requests
929  * data through a AF_UNIX socket message. The parent will then assign the next
930  * chunk of data by sending a message back.
931  *
932  * Additionally, this function also monitors the read-side of the pipe fd from the child.
933  * If the child dies unexpectedly, the pipe will be selected as ready for read and
934  * will return EPIPE when read from. Further, if the child thinks the parent has died
935  * (defined as waiting more than 1s for a response), it will write a single byte to
936  * the pipe. If the parent has died, the child will get a EPIPE and throw an exception.
937  * If still alive, the parent will read the byte and ignore it.
938  *
939  * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
940  * handler can distinguish between success and failure cases.
941  */
942 
943  void
944  MessageSenderToSource::operator()() {
945  multicore::MessageForParent childMsg;
946  LogInfo("ForkingController") << "I am controller";
947  //this is the master and therefore the controller
948 
949  multicore::MessageForSource sndmsg;
950  sndmsg.startIndex = 0;
951  sndmsg.nIndices = m_nEventsToProcess;
952  do {
953 
954  fd_set readSockets, errorSockets;
955  // Wait for a request from a child for events.
956  memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
957  memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
958  // Note that we don't timeout; may be reconsidered in the future.
959  ssize_t rc;
960  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
961  if (rc < 0) {
962  std::cerr << "select failed; should be impossible due to preconditions.\n";
963  abort();
964  break;
965  }
966 
967  // Read the message from the child.
968  for (int idx=0; idx<m_maxFd; idx++) {
969 
970  // Handle errors
971  if (FD_ISSET(idx, &errorSockets)) {
972  LogInfo("ForkingController") << "Error on socket " << idx;
973  FD_CLR(idx, &m_socketSet);
974  close(idx);
975  // See if it was the watchdog pipe that died.
976  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
977  if (*it == idx) {
978  m_aliveChildren--;
979  }
980  }
981  continue;
982  }
983 
984  if (!FD_ISSET(idx, &readSockets)) {
985  continue;
986  }
987 
988  // See if this FD is a child watchdog pipe. If so, read from it to prevent
989  // writes from blocking.
990  bool is_pipe = false;
991  for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
992  if (*it == idx) {
993  is_pipe = true;
994  char buf;
995  while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
996  if (rc <= 0) {
997  m_aliveChildren--;
998  FD_CLR(idx, &m_socketSet);
999  close(idx);
1000  }
1001  }
1002  }
1003 
1004  // Only execute this block if the FD is a socket for sending the child work.
1005  if (!is_pipe) {
1006  while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
1007  if (rc < 0) {
1008  FD_CLR(idx, &m_socketSet);
1009  close(idx);
1010  continue;
1011  }
1012 
1013  // Tell the child what events to process.
1014  // If 'send' fails, then the child process has failed (any other possibilities are
1015  // eliminated because we are using fixed-size messages with Unix datagram sockets).
1016  // Thus, the SIGCHLD handler will fire and set child_fail = true.
1017  while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
1018  if (rc < 0) {
1019  FD_CLR(idx, &m_socketSet);
1020  close(idx);
1021  continue;
1022  }
1023  //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
1024  sndmsg.startIndex += sndmsg.nIndices;
1025  }
1026  }
1027 
1028  } while (m_aliveChildren > 0);
1029 
1030  return;
1031  }
1032 
1033  }
1034 
// Fork numberOfForkedChildren_ worker processes after performing the setup
// they all share.
//
// Parent path: runs beginJob(), reads the first file, prefetches the
// EventSetup data for the first run, releases pre-fork resources, forks the
// children, starts a MessageSenderToSource thread, then sleeps in sigsuspend()
// until every child is done or a shutdown/failure flag is raised. Returns
// false on normal completion; throws cms::Exception("ForkedChildFailed") or
// cms::Exception("ForkedParentFailed") otherwise.
//
// Child path: redirects stdout/stderr to a per-child log file, optionally
// pins itself to the CPU whose number equals its child index, reacquires
// post-fork resources and returns true.
//
// Returns true immediately when numberOfForkedChildren_ is 0.
//
// jobReportFile is only forwarded to the JobReport service calls.
1035  bool
1036  EventProcessor::forkProcess(std::string const& jobReportFile) {
1037 
1038  if(0 == numberOfForkedChildren_) {return true;}
1039  assert(0<numberOfForkedChildren_);
1040  //do what we want done in common
1041  {
1042  beginJob(); //make sure this was run
1043  // make the services available
1044  ServiceRegistry::Operate operate(serviceToken_);
1045 
1046  InputSource::ItemType itemType;
1047  itemType = input_->nextItemType();
1048 
  // The input is expected to start with a file followed by a run; anything
  // else trips the asserts below.
1049  assert(itemType == InputSource::IsFile);
1050  {
1051  readFile();
1052  }
1053  itemType = input_->nextItemType();
1054  assert(itemType == InputSource::IsRun);
1055 
1056  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
1057  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
1058  input_->runAuxiliary()->beginTime());
1059  espController_->eventSetupForInstance(ts);
1060  EventSetup const& es = esp_->eventSetup();
1061 
1062  //now get all the data available in the EventSetup
1063  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1064  es.fillAvailableRecordKeys(recordKeys);
1065  std::vector<eventsetup::DataKey> dataKeys;
1066  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1067  itKey != itEnd;
1068  ++itKey) {
1069  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
1070  //see if this is on our exclusion list
1071  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
1072  ExcludedData const* excludedData(0);
1073  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1074  excludedData = &(itExcludeRec->second);
  // An empty exclusion set, or one whose first entry has "*" as its first
  // element, excludes the whole record.
1075  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
1076  //skip all items in this record
1077  continue;
1078  }
1079  }
1080  if(0 != recordPtr) {
1081  dataKeys.clear();
1082  recordPtr->fillRegisteredDataKeys(dataKeys);
1083  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1084  itDataKey != itDataKeyEnd;
1085  ++itDataKey) {
1086  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1087  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1088  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1089  continue;
1090  }
  // Prefetching is best effort: a cms::Exception from one product is only
  // logged as a warning and the loop moves on to the next key.
1091  try {
1092  recordPtr->doGet(*itDataKey);
1093  } catch(cms::Exception& e) {
1094  LogWarning("ForkingEventSetupPreFetching") << e.what();
1095  }
1096  }
1097  }
1098  }
1099  }
1100  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
1101  {
1102  // make the services available
1103  ServiceRegistry::Operate operate(serviceToken_);
1104  Service<JobReport> jobReport;
1105  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1106 
1107  //Now actually do the forking
1108  actReg_->preForkReleaseResourcesSignal_();
1109  input_->doPreForkReleaseResources();
1110  schedule_->preForkReleaseResources();
1111  }
  // Install the SIGCHLD handler before any child exists.
  // NOTE(review): ep_sigchld presumably maintains num_children_done,
  // child_failed, child_fail_signal and child_fail_exit_status, which are read
  // further down; it is defined outside this view, so confirm.
1112  installCustomHandler(SIGCHLD, ep_sigchld);
1113 
1114 
1115  unsigned int childIndex = 0;
1116  unsigned int const kMaxChildren = numberOfForkedChildren_;
1117  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1118  std::vector<pid_t> childrenIds;
1119  childrenIds.reserve(kMaxChildren);
1120  std::vector<int> childrenSockets;
1121  childrenSockets.reserve(kMaxChildren);
1122  std::vector<int> childrenPipes;
1123  childrenPipes.reserve(kMaxChildren);
1124  std::vector<int> childrenSocketsCopy;
1125  childrenSocketsCopy.reserve(kMaxChildren);
1126  std::vector<int> childrenPipesCopy;
1127  childrenPipesCopy.reserve(kMaxChildren);
1128  int pipes[2];
1129 
1130  {
1131  // make the services available
1132  ServiceRegistry::Operate operate(serviceToken_);
1133  Service<JobReport> jobReport;
1134  int sockets[2], fd_flags;
  // One iteration per child. sockets[0]/pipes[0] stay with the parent,
  // sockets[1]/pipes[1] go to the child. The child leaves the loop via
  // 'break' with childIndex still below kMaxChildren.
1135  for(; childIndex < kMaxChildren; ++childIndex) {
1136  // Create a UNIX_DGRAM socket pair
1137  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1138  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1139  exit(EXIT_FAILURE);
1140  }
1141  if (pipe(pipes)) {
1142  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1143  exit(EXIT_FAILURE);
1144  }
1145  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1146  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1147  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1148  exit(EXIT_FAILURE);
1149  }
1150  // Mark socket as non-block. Child must be careful to do select prior
1151  // to reading from socket.
  // NOTE(review): O_NONBLOCK is a file *status* flag (F_GETFL/F_SETFL), while
  // F_SETFD only carries descriptor flags such as FD_CLOEXEC. As written the
  // non-blocking request here, and on pipes[0] below, likely has no effect.
  // Switching to F_SETFL changes read/recv behaviour in code outside this
  // view, so it needs to be validated before being changed.
1152  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1153  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1154  exit(EXIT_FAILURE);
1155  }
1156  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1157  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1158  exit(EXIT_FAILURE);
1159  }
1160  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1161  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1162  exit(EXIT_FAILURE);
1163  }
1164  // Linux man page notes there are some edge cases where reading from a
1165  // fd can block, even after a select.
1166  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1167  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1168  exit(EXIT_FAILURE);
1169  }
1170  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1171  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1172  exit(EXIT_FAILURE);
1173  }
1174 
  // Snapshot the parent-side descriptors of the children forked so far; the
  // new child closes its inherited copies of them right after fork().
1175  childrenPipesCopy = childrenPipes;
1176  childrenSocketsCopy = childrenSockets;
1177 
1178  pid_t value = fork();
1179  if(value == 0) {
1180  // Close the parent's side of the socket and pipe which will talk to us.
1181  close(pipes[0]);
1182  close(sockets[0]);
1183  // Close our copies of the parent's other communication pipes.
1184  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1185  close(*it);
1186  }
1187  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1188  close(*it);
1189  }
1190 
1191  // this is the child process, redirect stdout and stderr to a log file
1192  fflush(stdout);
1193  fflush(stderr);
  // Log file name: redirectout_<process group id>_<zero-padded child index>.log
1194  std::stringstream stout;
1195  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1196  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1197  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1198  }
1199  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1200  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1201  }
1202 
1203  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1204  if(setCpuAffinity_) {
1205  // CPU affinity is handled differently on macosx.
1206  // We disable it and print a message until someone reads:
1207  //
1208  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1209  //
1210  // and implements it.
1211 #ifdef __APPLE__
1212  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1213 #else
1214  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1215  cpu_set_t mask;
1216  CPU_ZERO(&mask);
1217  CPU_SET(childIndex, &mask);
1218  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1219  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1220  exit(-1);
1221  }
1222 #endif
1223  }
1224  break;
1225  } else {
1226  //this is the parent
  // This branch also runs when fork() failed (value < 0); the failure itself
  // is handled just below.
1227  close(pipes[1]);
1228  close(sockets[1]);
1229  }
1230  if(value < 0) {
1231  LogError("ForkingChild") << "failed to create a child";
1232  exit(-1);
1233  }
1234  childrenIds.push_back(value);
1235  childrenSockets.push_back(sockets[0]);
1236  childrenPipes.push_back(pipes[0]);
1237  }
1238 
  // Only a child can get here with childIndex < kMaxChildren (it broke out of
  // the loop); the parent always runs the loop to completion.
1239  if(childIndex < kMaxChildren) {
1240  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1241  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1242 
1243  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1244  input_->doPostForkReacquireResources(receiver);
1245  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1246  //NOTE: sources have to reset themselves by listening to the post fork message
1247  //rewindInput();
1248  return true;
1249  }
1250  jobReport->parentAfterFork(jobReportFile);
1251  }
1252 
1253  //this is the original, which is now the master for all the children
1254 
1255  //Need to wait for signals from the children or externally
1256  // To wait we must
1257  // 1) block the signals we want to wait on so we do not have a race condition
1258  // 2) check that we haven't already met our ending criteria
1259  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1260  sigset_t blockingSigSet;
1261  sigset_t unblockingSigSet;
1262  sigset_t oldSigSet;
  // Passing NULL as the new set only queries the current mask, so both sets
  // start out as copies of the mask currently in force.
1263  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1264  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1265  sigaddset(&blockingSigSet, SIGCHLD);
1266  sigaddset(&blockingSigSet, SIGUSR2);
1267  sigaddset(&blockingSigSet, SIGINT);
1268  sigdelset(&unblockingSigSet, SIGCHLD);
1269  sigdelset(&unblockingSigSet, SIGUSR2);
1270  sigdelset(&unblockingSigSet, SIGINT);
1271  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1272 
1273  // If there are too many fd's (unlikely, but possible) for select, denote this
1274  // because the sender will fail.
  // pipes[1] still holds the number of the write end of the last pipe created
  // (already closed in the parent); it is used here as an estimate of the
  // highest descriptor number handed out.
1275  bool too_many_fds = false;
1276  if (pipes[1]+1 > FD_SETSIZE) {
1277  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1278  too_many_fds = true;
1279  }
1280 
1281  //create a thread that sends the units of work to workers
1282  // we create it after all signals were blocked so that this
1283  // thread is never interrupted by a signal
  // NOTE(review): the sender is constructed and started even when
  // too_many_fds is set; whether it then touches an fd_set with out-of-range
  // descriptors depends on MessageSenderToSource, which is only partly
  // visible here - confirm.
1284  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1285  boost::thread senderThread(sender);
1286 
1287  while(!too_many_fds && !shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
1288  sigsuspend(&unblockingSigSet);
1289  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1290  }
1291  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1292 
1293  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1294  if(child_failed) {
1295  LogError("ForkingStopping") << "child failed";
1296  }
1297  if(shutdown_flag) {
1298  LogSystem("ForkingStopping") << "asked to shutdown";
1299  }
1300 
  // On any abnormal end, ask every child to stop with SIGUSR2 and wait until
  // all of them have been counted as done.
1301  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1302  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1303  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1304  it != itEnd; ++it) {
1305  /* int result = */ kill(*it, SIGUSR2);
1306  }
1307  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1308  while(num_children_done != kMaxChildren) {
1309  sigsuspend(&unblockingSigSet);
1310  }
1311  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1312  }
1313  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1314  senderThread.join();
  // Report the failure cause in order of preference: terminating signal,
  // non-zero exit status, otherwise unknown.
1315  if(child_failed) {
1316  if (child_fail_signal) {
1317  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1318  } else if (child_fail_exit_status) {
1319  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1320  } else {
1321  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1322  }
1323  }
1324  if(too_many_fds) {
1325  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1326  }
1327  return false;
1328  }
1329 
1330  void
1331  EventProcessor::connectSigs(EventProcessor* ep) {
1332  // When the FwkImpl signals are given, pass them to the
1333  // appropriate EventProcessor signals so that the outside world
1334  // can see the signal.
1335  actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
1336  actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
1337  }
1338 
1339  std::vector<ModuleDescription const*>
1340  EventProcessor::getAllModuleDescriptions() const {
1341  return schedule_->getAllModuleDescriptions();
1342  }
1343 
1344  int
1345  EventProcessor::totalEvents() const {
1346  return schedule_->totalEvents();
1347  }
1348 
1349  int
1350  EventProcessor::totalEventsPassed() const {
1351  return schedule_->totalEventsPassed();
1352  }
1353 
1354  int
1355  EventProcessor::totalEventsFailed() const {
1356  return schedule_->totalEventsFailed();
1357  }
1358 
1359  void
1360  EventProcessor::enableEndPaths(bool active) {
1361  schedule_->enableEndPaths(active);
1362  }
1363 
1364  bool
1365  EventProcessor::endPathsEnabled() const {
1366  return schedule_->endPathsEnabled();
1367  }
1368 
1369  void
1370  EventProcessor::getTriggerReport(TriggerReport& rep) const {
1371  schedule_->getTriggerReport(rep);
1372  }
1373 
1374  void
1375  EventProcessor::clearCounters() {
1376  schedule_->clearCounters();
1377  }
1378 
1379  char const* EventProcessor::currentStateName() const {
1380  return stateName(getState());
1381  }
1382 
1383  char const* EventProcessor::stateName(State s) const {
1384  return stateNames[s];
1385  }
1386 
1387  char const* EventProcessor::msgName(Msg m) const {
1388  return msgNames[m];
1389  }
1390 
1391  State EventProcessor::getState() const {
1392  return state_;
1393  }
1394 
1395  EventProcessor::StatusCode EventProcessor::statusAsync() const {
1396  // the thread will record exception/error status in the event processor
1397  // for us to look at and report here
1398  return last_rc_;
1399  }
1400 
1401  void
1402  EventProcessor::setRunNumber(RunNumber_t runNumber) {
1403  if(runNumber == 0) {
1404  runNumber = 1;
1405  LogWarning("Invalid Run")
1406  << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
1407  << "Run number was set to 1 instead\n";
1408  }
1409 
1410  // inside of beginJob there is a check to see if it has been called before
1411  beginJob();
1412  changeState(mSetRun);
1413 
1414  // interface not correct yet
1415  input_->setRunNumber(runNumber);
1416  }
1417 
1418  void
1419  EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
1420  // inside of beginJob there is a check to see if it has been called before
1421  beginJob();
1422  changeState(mSetRun);
1423 
1424  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1425  //input_->declareRunNumber(runNumber);
1426  }
1427 
1429  EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
1430  bool rc = true;
1431  boost::xtime timeout;
1432  boost::xtime_get(&timeout, boost::TIME_UTC);
1433  timeout.sec += timeout_seconds;
1434 
1435  // make sure to include a timeout here so we don't wait forever
1436  // I suspect there are still timing issues with thread startup
1437  // and the setting of the various control variables (stop_count, id_set)
1438  {
1439  boost::mutex::scoped_lock sl(stop_lock_);
1440 
1441  // look here - if runAsync not active, just return the last return code
1442  if(stop_count_ < 0) return last_rc_;
1443 
1444  if(timeout_seconds == 0) {
1445  while(stop_count_ == 0) stopper_.wait(sl);
1446  } else {
1447  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1448  }
1449 
1450  if(rc == false) {
1451  // timeout occurred
1452  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1453  // this is a temporary hack until we get the input source
1454  // upgraded to allow blocking input sources to be unblocked
1455 
1456  // the next line is dangerous and causes all sorts of trouble
1457  if(id_set_) pthread_cancel(event_loop_id_);
1458 
1459  // we will not do anything yet
1460  LogWarning("timeout")
1461  << "An asynchronous request was made to shut down "
1462  << "the event loop "
1463  << "and the event loop did not shutdown after "
1464  << timeout_seconds << " seconds\n";
1465  } else {
1466  event_loop_->join();
1467  event_loop_.reset();
1468  id_set_ = false;
1469  stop_count_ = -1;
1470  }
1471  }
1472  return rc == false ? epTimedOut : last_rc_;
1473  }
1474 
1476  EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
1477  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1478  if(rc != epTimedOut) changeState(mCountComplete);
1479  else errorState();
1480  return rc;
1481  }
1482 
1483 
1484  EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
1485  changeState(mStopAsync);
1486  StatusCode rc = waitForAsyncCompletion(secs);
1487  if(rc != epTimedOut) changeState(mFinished);
1488  else errorState();
1489  return rc;
1490  }
1491 
1492  EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
1493  changeState(mShutdownAsync);
1494  StatusCode rc = waitForAsyncCompletion(secs);
1495  if(rc != epTimedOut) changeState(mFinished);
1496  else errorState();
1497  return rc;
1498  }
1499 
1500  void EventProcessor::errorState() {
1501  state_ = sError;
1502  }
1503 
1504  // next function irrelevant now
1505  EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
1506  // make sure to include a timeout here so we don't wait forever
1507  // I suspect there are still timing issues with thread startup
1508  // and the setting of the various control variables (stop_count, id_set)
1509  changeState(m);
1510  return waitForAsyncCompletion(60*2);
1511  }
1512 
1513  void EventProcessor::changeState(Msg msg) {
1514  // most likely need to serialize access to this routine
1515 
1516  boost::mutex::scoped_lock sl(state_lock_);
1517  State curr = state_;
1518  int rc;
1519  // found if(not end of table) and
1520  // (state == table.state && (msg == table.message || msg == any))
1521  for(rc = 0;
1522  table[rc].current != sInvalid &&
1523  (curr != table[rc].current ||
1524  (curr == table[rc].current &&
1525  msg != table[rc].message && table[rc].message != mAny));
1526  ++rc);
1527 
1528  if(table[rc].current == sInvalid)
1529  throw cms::Exception("BadState")
1530  << "A member function of EventProcessor has been called in an"
1531  << " inappropriate order.\n"
1532  << "Bad transition from " << stateName(curr) << " "
1533  << "using message " << msgName(msg) << "\n"
1534  << "No where to go from here.\n";
1535 
1536  FDEBUG(1) << "changeState: current=" << stateName(curr)
1537  << ", message=" << msgName(msg)
1538  << " -> new=" << stateName(table[rc].final) << "\n";
1539 
1540  state_ = table[rc].final;
1541  }
1542 
1543  void EventProcessor::runAsync() {
1544  beginJob();
1545  {
1546  boost::mutex::scoped_lock sl(stop_lock_);
1547  if(id_set_ == true) {
1548  std::string err("runAsync called while async event loop already running\n");
1549  LogError("FwkJob") << err;
1550  throw cms::Exception("BadState") << err;
1551  }
1552 
1553  changeState(mRunAsync);
1554 
1555  stop_count_ = 0;
1556  last_rc_ = epSuccess; // forget the last value!
1557  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1558  boost::xtime timeout;
1559  boost::xtime_get(&timeout, boost::TIME_UTC);
1560  timeout.sec += 60; // 60 seconds to start!!!!
1561  if(starter_.timed_wait(sl, timeout) == false) {
1562  // yikes - the thread did not start
1563  throw cms::Exception("BadState")
1564  << "Async run thread did not start in 60 seconds\n";
1565  }
1566  }
1567  }
1568 
1569  void EventProcessor::asyncRun(EventProcessor* me) {
1570  // set up signals to allow for interruptions
1571  // ignore all other signals
1572  // make sure no exceptions escape out
1573 
1574  // temporary hack until we modify the input source to allow
1575  // wakeup calls from other threads. This mimics the solution
1576  // in EventFilter/Processor, which I do not like.
1577  // allowing cancels means that the thread just disappears at
1578  // certain points. This is bad for C++ stack variables.
1579  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1580  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1581  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1582  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1583 
1584  {
1585  boost::mutex::scoped_lock(me->stop_lock_);
1586  me->event_loop_id_ = pthread_self();
1587  me->id_set_ = true;
1588  me->starter_.notify_all();
1589  }
1590 
1591  Status rc = epException;
1592  FDEBUG(2) << "asyncRun starting ......................\n";
1593 
1594  try {
1595  bool onlineStateTransitions = true;
1596  rc = me->runToCompletion(onlineStateTransitions);
1597  }
1598  catch (cms::Exception& e) {
1599  LogError("FwkJob") << "cms::Exception caught in "
1600  << "EventProcessor::asyncRun"
1601  << "\n"
1602  << e.explainSelf();
1603  me->last_error_text_ = e.explainSelf();
1604  }
1605  catch (std::exception& e) {
1606  LogError("FwkJob") << "Standard library exception caught in "
1607  << "EventProcessor::asyncRun"
1608  << "\n"
1609  << e.what();
1610  me->last_error_text_ = e.what();
1611  }
1612  catch (...) {
1613  LogError("FwkJob") << "Unknown exception caught in "
1614  << "EventProcessor::asyncRun"
1615  << "\n";
1616  me->last_error_text_ = "Unknown exception caught";
1617  rc = epOther;
1618  }
1619 
1620  me->last_rc_ = rc;
1621 
1622  {
1623  // notify anyone waiting for exit that we are doing so now
1624  boost::mutex::scoped_lock sl(me->stop_lock_);
1625  ++me->stop_count_;
1626  me->stopper_.notify_all();
1627  }
1628  FDEBUG(2) << "asyncRun ending ......................\n";
1629  }
1630 
1631 
1633  EventProcessor::runToCompletion(bool onlineStateTransitions) {
1634 
1635  StateSentry toerror(this);
1636 
1637  int numberOfEventsToProcess = -1;
1638  StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
1639 
1640  if(machine_.get() != 0) {
1642  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1643  << "Please report this error to the Framework group\n";
1644  }
1645 
1646  toerror.succeeded();
1647 
1648  return returnCode;
1649  }
1650 
1652  EventProcessor::runEventCount(int numberOfEventsToProcess) {
1653 
1654  StateSentry toerror(this);
1655 
1656  bool onlineStateTransitions = false;
1657  StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
1658 
1659  toerror.succeeded();
1660 
1661  return returnCode;
1662  }
1663 
1665  EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
1666 
1667  // Reusable event principal
1668  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
1669  principalCache_.insert(ep);
1670 
1671  beginJob(); //make sure this was called
1672 
1673  if(!onlineStateTransitions) changeState(mRunCount);
1674 
1675  StatusCode returnCode = epSuccess;
1676  stateMachineWasInErrorState_ = false;
1677 
1678  // make the services available
1679  ServiceRegistry::Operate operate(serviceToken_);
1680 
1681  if(machine_.get() == 0) {
1682 
1684  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1685  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1686  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1687  else {
1688  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1689  << fileMode_ << ".\n"
1690  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1691  }
1692 
1693  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1694  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1695  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1696  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1697  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1698  else {
1699  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1700  << emptyRunLumiMode_ << ".\n"
1701  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1702  }
1703 
1704  machine_.reset(new statemachine::Machine(this,
1705  fileMode,
1706  emptyRunLumiMode));
1707 
1708  machine_->initiate();
1709  }
1710 
1711  try {
1712  try {
1713 
1714  InputSource::ItemType itemType;
1715 
1716  int iEvents = 0;
1717 
1718  while(true) {
1719 
1720  itemType = input_->nextItemType();
1721 
1722  FDEBUG(1) << "itemType = " << itemType << "\n";
1723 
1724  // These are used for asynchronous running only and
1725  // and are checking to see if stopAsync or shutdownAsync
1726  // were called from another thread. In the future, we
1727  // may need to do something better than polling the state.
1728  // With the current code this is the simplest thing and
1729  // it should always work. If the interaction between
1730  // threads becomes more complex this may cause problems.
1731  if(state_ == sStopping) {
1732  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1733  forceLooperToEnd_ = true;
1734  machine_->process_event(statemachine::Stop());
1735  forceLooperToEnd_ = false;
1736  break;
1737  }
1738  else if(state_ == sShuttingDown) {
1739  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1740  forceLooperToEnd_ = true;
1741  machine_->process_event(statemachine::Stop());
1742  forceLooperToEnd_ = false;
1743  break;
1744  }
1745 
1746  // Look for a shutdown signal
1747  {
1748  boost::mutex::scoped_lock sl(usr2_lock);
1749  if(shutdown_flag) {
1750  changeState(mShutdownSignal);
1751  returnCode = epSignal;
1752  forceLooperToEnd_ = true;
1753  machine_->process_event(statemachine::Stop());
1754  forceLooperToEnd_ = false;
1755  break;
1756  }
1757  }
1758 
1759  if(itemType == InputSource::IsStop) {
1760  machine_->process_event(statemachine::Stop());
1761  }
1762  else if(itemType == InputSource::IsFile) {
1763  machine_->process_event(statemachine::File());
1764  }
1765  else if(itemType == InputSource::IsRun) {
1766  machine_->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1767  }
1768  else if(itemType == InputSource::IsLumi) {
1769  machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
1770  }
1771  else if(itemType == InputSource::IsEvent) {
1772  machine_->process_event(statemachine::Event());
1773  ++iEvents;
1774  if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
1775  returnCode = epCountComplete;
1776  changeState(mInputExhausted);
1777  FDEBUG(1) << "Event count complete, pausing event loop\n";
1778  break;
1779  }
1780  }
1781  // This should be impossible
1782  else {
1784  << "Unknown next item type passed to EventProcessor\n"
1785  << "Please report this error to the Framework group\n";
1786  }
1787 
1788  if(machine_->terminated()) {
1789  changeState(mInputExhausted);
1790  break;
1791  }
1792  } // End of loop over state machine events
1793  } // Try block
1794  catch (cms::Exception& e) { throw; }
1795  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1796  catch (std::exception& e) { convertException::stdToEDM(e); }
1797  catch(std::string& s) { convertException::stringToEDM(s); }
1798  catch(char const* c) { convertException::charPtrToEDM(c); }
1799  catch (...) { convertException::unknownToEDM(); }
1800  } // Try block
1801  // Some comments on exception handling related to the boost state machine:
1802  //
1803  // Some states used in the machine are special because they
1804  // perform actions while the machine is being terminated, actions
1805  // such as close files, call endRun, call endLumi etc ... Each of these
1806  // states has two functions that perform these actions. The functions
1807  // are almost identical. The major difference is that one version
1808  // catches all exceptions and the other lets exceptions pass through.
1809  // The destructor catches them and the other function named "exit" lets
1810  // them pass through. On a normal termination, boost will always call
1811  // "exit" and then the state destructor. In our state classes, the
1812  // the destructors do nothing if the exit function already took
1813  // care of things. Here's the interesting part. When boost is
1814  // handling an exception the "exit" function is not called (a boost
1815  // feature).
1816  //
1817  // If an exception occurs while the boost machine is in control
1818  // (which usually means inside a process_event call), then
1819  // the boost state machine destroys its states and "terminates" itself.
1820  // This already done before we hit the catch blocks below. In this case
1821  // the call to terminateMachine below only destroys an already
1822  // terminated state machine. Because exit is not called, the state destructors
1823  // handle cleaning up lumis, runs, and files. The destructors swallow
1824  // all exceptions and only pass through the exceptions messages, which
1825  // are tacked onto the original exception below.
1826  //
1827  // If an exception occurs when the boost state machine is not
1828  // in control (outside the process_event functions), then boost
1829  // cannot destroy its own states. The terminateMachine function
1830  // below takes care of that. The flag "alreadyHandlingException"
1831  // is set true so that the state exit functions do nothing (and
1832  // cannot throw more exceptions while handling the first). Then the
1833  // state destructors take care of this because exit did nothing.
1834  //
1835  // In both cases above, the EventProcessor::endOfLoop function is
1836  // not called because it can throw exceptions.
1837  //
1838  // One tricky aspect of the state machine is that things that can
1839  // throw should not be invoked by the state machine while another
1840  // exception is being handled.
1841  // Another tricky aspect is that it appears to be important to
1842  // terminate the state machine before invoking its destructor.
1843  // We've seen crashes that are not understood when that is not
1844  // done. Maintainers of this code should be careful about this.
1845 
1846  catch (cms::Exception & e) {
1847  alreadyHandlingException_ = true;
1848  terminateMachine();
1849  alreadyHandlingException_ = false;
1850  if (!exceptionMessageLumis_.empty()) {
1851  e.addAdditionalInfo(exceptionMessageLumis_);
1852  if (e.alreadyPrinted()) {
1853  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1854  }
1855  }
1856  if (!exceptionMessageRuns_.empty()) {
1857  e.addAdditionalInfo(exceptionMessageRuns_);
1858  if (e.alreadyPrinted()) {
1859  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1860  }
1861  }
1862  if (!exceptionMessageFiles_.empty()) {
1863  e.addAdditionalInfo(exceptionMessageFiles_);
1864  if (e.alreadyPrinted()) {
1865  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1866  }
1867  }
1868  throw;
1869  }
1870 
1871  if(machine_->terminated()) {
1872  FDEBUG(1) << "The state machine reports it has been terminated\n";
1873  machine_.reset();
1874  }
1875 
1876  if(!onlineStateTransitions) changeState(mFinished);
1877 
1878  if(stateMachineWasInErrorState_) {
1879  throw cms::Exception("BadState")
1880  << "The boost state machine in the EventProcessor exited after\n"
1881  << "entering the Error state.\n";
1882  }
1883 
1884  return returnCode;
1885  }
1886 
1888  FDEBUG(1) << " \treadFile\n";
1889  fb_ = input_->readFile();
1890  if(numberOfForkedChildren_ > 0) {
1891  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1892  }
1893  }
1894 
1895  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1896  if (fb_.get() != 0) {
1897  input_->closeFile(fb_, cleaningUpAfterException);
1898  }
1899  FDEBUG(1) << "\tcloseInputFile\n";
1900  }
1901 
1902  void EventProcessor::openOutputFiles() {
1903  if (fb_.get() != 0) {
1904  schedule_->openOutputFiles(*fb_);
1905  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1906  }
1907  FDEBUG(1) << "\topenOutputFiles\n";
1908  }
1909 
1910  void EventProcessor::closeOutputFiles() {
1911  if (fb_.get() != 0) {
1912  schedule_->closeOutputFiles();
1913  if(hasSubProcess()) subProcess_->closeOutputFiles();
1914  }
1915  FDEBUG(1) << "\tcloseOutputFiles\n";
1916  }
1917 
1918  void EventProcessor::respondToOpenInputFile() {
1919  if (fb_.get() != 0) {
1920  schedule_->respondToOpenInputFile(*fb_);
1921  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1922  }
1923  FDEBUG(1) << "\trespondToOpenInputFile\n";
1924  }
1925 
// Forwards the "input file closed" notification, with the current file block,
// to the schedule and then to the sub-process when there is one.
//
// Does nothing except trace when fb_ is null.
1926  void EventProcessor::respondToCloseInputFile() {
1927  if (fb_.get() != 0) {
1928  schedule_->respondToCloseInputFile(*fb_);
1929  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1930  }
1931  FDEBUG(1) << "\trespondToCloseInputFile\n";
1932  }
1933 
// Forwards the "output files opened" notification, with the current file
// block, to the schedule and then to the sub-process when there is one.
//
// Does nothing except trace when fb_ is null.
1934  void EventProcessor::respondToOpenOutputFiles() {
1935  if (fb_.get() != 0) {
1936  schedule_->respondToOpenOutputFiles(*fb_);
1937  if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1938  }
1939  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1940  }
1941 
// Forwards the "output files closed" notification, with the current file
// block, to the schedule and then to the sub-process when there is one.
//
// Does nothing except trace when fb_ is null.
1942  void EventProcessor::respondToCloseOutputFiles() {
1943  if (fb_.get() != 0) {
1944  schedule_->respondToCloseOutputFiles(*fb_);
1945  if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1946  }
1947  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1948  }
1949 
// Prepares for a new pass over the input: clears the shouldWeStop_ flag and,
// when a looper is present and has already had its begin-of-job run,
// notifies it that a new loop is starting.
//
// On the very first loop looperBeginJobRun_ is still false, so the
// notification is skipped here; beginRun() issues it right after calling the
// looper's beginOfJob.
1950  void EventProcessor::startingNewLoop() {
1951  shouldWeStop_ = false;
1952  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1953  // until after we've called beginOfJob
1954  if(looper_ && looperBeginJobRun_) {
1955  looper_->doStartingNewLoop();
1956  }
1957  FDEBUG(1) << "\tstartingNewLoop\n";
1958  }
1959 
// Runs end-of-loop handling and reports whether looping should end.
//
// Returns true when there is no looper, when the looper's doEndOfLoop returns
// anything other than EDLooperBase::kContinue, or when forceLooperToEnd_ is
// set (terminateMachine() sets it while stopping the state machine).
// Returns false only when a looper asks to continue and no stop is forced.
//
// Note that the debug trace below is only reached on the no-looper path;
// both looper branches return before it.
1960  bool EventProcessor::endOfLoop() {
1961  if(looper_) {
      // The looper is handed a ModuleChanger bound to this schedule only for
      // the duration of doEndOfLoop; the pointer is cleared again right after
      // because 'changer' is a local object.
      // NOTE(review): if doEndOfLoop throws, setModuleChanger(0) is skipped.
      // Presumably the looper would then keep a pointer to the destroyed
      // local -- confirm, and consider a scope guard.
1962  ModuleChanger changer(schedule_.get());
1963  looper_->setModuleChanger(&changer);
1964  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1965  looper_->setModuleChanger(0);
1966  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1967  else return false;
1968  }
1969  FDEBUG(1) << "\tendOfLoop\n";
1970  return true;
1971  }
1972 
// Resets the input source for another pass by calling repeat() and then
// rewind() on it, in that order.
1973  void EventProcessor::rewindInput() {
1974  input_->repeat();
1975  input_->rewind();
1976  FDEBUG(1) << "\trewind\n";
1977  }
1978 
// Lets the looper prepare for its next loop, passing it the EventSetup
// provider (esp_).
//
// NOTE(review): looper_ is dereferenced without the null check that the other
// looper-related methods in this file use. Presumably this is only invoked
// when a looper is configured -- confirm against the callers.
1979  void EventProcessor::prepareForNextLoop() {
1980  looper_->prepareForNextLoop(esp_.get());
1981  FDEBUG(1) << "\tprepareForNextLoop\n";
1982  }
1983 
// Reports whether output files should be closed.
//
// When a sub-process exists its answer is returned and the schedule is not
// consulted; otherwise the schedule's answer is returned.
1984  bool EventProcessor::shouldWeCloseOutput() const {
1985  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1986  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1987  }
1988 
// Records that the state machine entered its error state.
//
// Logs an error under the "StateMachine" category and sets
// stateMachineWasInErrorState_. That flag is tested by the run-loop code
// earlier in this file, which throws cms::Exception("BadState") when it is
// set.
1989  void EventProcessor::doErrorStuff() {
1990  FDEBUG(1) << "\tdoErrorStuff\n";
1991  LogError("StateMachine")
1992  << "The EventProcessor state machine encountered an unexpected event\n"
1993  << "and went to the error state\n"
1994  << "Will attempt to terminate processing normally\n"
1995  << "(IF using the looper the next loop will be attempted)\n"
1996  << "This likely indicates a bug in an input module or corrupted input or both\n";
1997  stateMachineWasInErrorState_ = true;
1998  }
1999 
// Performs begin-of-run processing for the given run.
//
// Order of operations: look up the cached RunPrincipal, notify the input
// source, point the EventSetup at the start of the run, run the looper's
// one-time begin-of-job if it has not happened yet, run the schedule (and
// sub-process) for the run, and finally notify the looper.
//
// run: supplies the process history ID and run number used for the lookup.
2000  void EventProcessor::beginRun(statemachine::Run const& run) {
2001  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2002  input_->doBeginRun(runPrincipal);
      // The sync value uses lumi 0 / event 0 of this run together with the
      // run's begin time.
2003  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
2004  runPrincipal.beginTime());
2005  if(forceESCacheClearOnNewRun_){
2006  espController_->forceCacheClear();
2007  }
2008  espController_->eventSetupForInstance(ts);
2009  EventSetup const& es = esp_->eventSetup();
      // First run of the job with a looper: do the looper's begin-of-job now
      // that an EventSetup is available, then issue the doStartingNewLoop
      // call that startingNewLoop() skipped while looperBeginJobRun_ was
      // still false.
2010  if(looper_ && looperBeginJobRun_== false) {
2011  looper_->copyInfo(ScheduleInfo(schedule_.get()));
2012  looper_->beginOfJob(es);
2013  looperBeginJobRun_ = true;
2014  looper_->doStartingNewLoop();
2015  }
2016  {
      // NOTE(review): listing line 2017 is absent from this copy; it
      // presumably declares the 'Traits' type used below -- confirm.
      // The sentry issues the pre-schedule signal on construction and the
      // post-schedule signal when this scope exits.
2018  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2019  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
2020  if(hasSubProcess()) {
2021  subProcess_->doBeginRun(runPrincipal, ts);
2022  }
2023  }
2024  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
2025  if(looper_) {
2026  looper_->doBeginRun(runPrincipal, es);
2027  }
2028  }
2029 
// Performs end-of-run processing for the given run.
//
// Order of operations: look up the cached RunPrincipal, notify the input
// source, point the EventSetup at the end of the run, run the schedule (and
// sub-process) for the run end, and finally notify the looper.
//
// run: supplies the process history ID and run number used for the lookup.
// cleaningUpAfterException: forwarded unchanged to the input source, the
//   schedule and the sub-process.
2030  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
2031  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2032  input_->doEndRun(runPrincipal, cleaningUpAfterException);
      // The sync value uses the maximum lumi and event numbers of this run
      // together with the run's end time.
2033  IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
2034  runPrincipal.endTime());
2035  espController_->eventSetupForInstance(ts);
2036  EventSetup const& es = esp_->eventSetup();
2037  {
      // NOTE(review): listing line 2038 is absent from this copy; it
      // presumably declares the 'Traits' type used below -- confirm.
      // The sentry issues the pre-schedule signal on construction and the
      // post-schedule signal when this scope exits.
2039  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2040  schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2041  if(hasSubProcess()) {
2042  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2043  }
2044  }
2045  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
2046  if(looper_) {
2047  looper_->doEndRun(runPrincipal, es);
2048  }
2049  }
2050 
// Performs begin-of-luminosity-block processing.
//
// Order of operations: look up the cached LuminosityBlockPrincipal, notify
// the input source, give the 'rng' service (when available) a chance to act
// before the lumi begins, point the EventSetup at the start of the lumi, run
// the schedule (and sub-process), and finally notify the looper.
//
// phid, run, lumi: key used to find the lumi principal in the cache.
2051  void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
2052  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2053  input_->doBeginLumi(lumiPrincipal);
2054 
      // NOTE(review): listing line 2055 is absent from this copy; it
      // presumably declares the 'rng' service handle used below -- confirm.
2056  if(rng.isAvailable()) {
2057  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
2058  rng->preBeginLumi(lb);
2059  }
2060 
2061  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
2062  // lumi blocks know their start and end times why not also start and end events?
2063  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
2064  espController_->eventSetupForInstance(ts);
2065  EventSetup const& es = esp_->eventSetup();
2066  {
      // NOTE(review): listing line 2067 is absent from this copy; it
      // presumably declares the 'Traits' type used below -- confirm.
      // The sentry issues the pre-schedule signal on construction and the
      // post-schedule signal when this scope exits.
2068  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2069  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2070  if(hasSubProcess()) {
2071  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2072  }
2073  }
2074  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
2075  if(looper_) {
2076  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2077  }
2078  }
2079 
// Performs end-of-luminosity-block processing.
//
// Order of operations: look up the cached LuminosityBlockPrincipal, notify
// the input source, point the EventSetup at the end of the lumi, run the
// schedule (and sub-process), and finally notify the looper.
//
// phid, run, lumi: key used to find the lumi principal in the cache.
// cleaningUpAfterException: forwarded unchanged to the input source, the
//   schedule and the sub-process.
2080  void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi, bool cleaningUpAfterException) {
2081  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2082  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2083  //NOTE: Using the max event number for the end of a lumi block is a bad idea
2084  // lumi blocks know their start and end times why not also start and end events?
2085  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
2086  lumiPrincipal.endTime());
2087  espController_->eventSetupForInstance(ts);
2088  EventSetup const& es = esp_->eventSetup();
2089  {
      // NOTE(review): listing line 2090 is absent from this copy; it
      // presumably declares the 'Traits' type used below -- confirm.
      // The sentry issues the pre-schedule signal on construction and the
      // post-schedule signal when this scope exits.
2091  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2092  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2093  if(hasSubProcess()) {
2094  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2095  }
2096  }
2097  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
2098  if(looper_) {
2099  looper_->doEndLuminosityBlock(lumiPrincipal, es);
2100  }
2101  }
2102 
// Has the input source read and cache the next run, then marks the run on
// the source.
//
// merge: forwarded unchanged to the input source.
// Returns a statemachine::Run built from the source's reduced process
// history ID and its current run number.
2103  statemachine::Run EventProcessor::readAndCacheRun(bool merge) {
2104  input_->readAndCacheRun(merge, *historyAppender_);
2105  input_->markRun();
2106  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2107  }
2108 
// Has the input source read and cache the next luminosity block, then marks
// the lumi on the source.
//
// merge: forwarded unchanged to the input source.
// Returns the source's current luminosity block number.
2109  int EventProcessor::readAndCacheLumi(bool merge) {
2110  input_->readAndCacheLumi(merge, *historyAppender_);
2111  input_->markLumi();
2112  return input_->luminosityBlock();
2113  }
2114 
// Writes out the given run: the schedule receives the cached RunPrincipal,
// and the sub-process (when there is one) is asked to write the run
// identified by the same process history ID and run number.
2115  void EventProcessor::writeRun(statemachine::Run const& run) {
2116  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
2117  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
2118  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
2119  }
2120 
// Removes the given run from this processor's principal cache and, when
// there is a sub-process, asks it to drop the same run as well.
2121  void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
2122  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
2123  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
2124  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
2125  }
2126 
// Writes out the given luminosity block: the schedule receives the cached
// LuminosityBlockPrincipal, and the sub-process (when there is one) is asked
// to write the lumi identified by the same (phid, run, lumi) key.
2127  void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
2128  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2129  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2130  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
2131  }
2132 
// Removes the given luminosity block from this processor's principal cache
// and, when there is a sub-process, asks it to drop the same lumi as well.
2133  void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
2134  principalCache_.deleteLumi(phid, run, lumi);
2135  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2136  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
2137  }
2138 
// Reads one event from the input source and processes it.
//
// Order of operations: read the event, point the EventSetup at the event's
// ID and time, run the schedule (and sub-process) on it, let the looper (if
// any) act on the event and possibly reposition the input, then clear the
// event principal.
//
// Sets shouldWeStop_ when the looper returns a status other than
// EDLooperBase::kContinue.
2139  void EventProcessor::readAndProcessEvent() {
2140  EventPrincipal *pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
2141  FDEBUG(1) << "\treadEvent\n";
2142  assert(pep != 0);
2143 
2144  IOVSyncValue ts(pep->id(), pep->time());
2145  espController_->eventSetupForInstance(ts);
2146  EventSetup const& es = esp_->eventSetup();
2147  {
      // NOTE(review): listing line 2148 is absent from this copy; it
      // presumably declares the 'Traits' type used below -- confirm.
      // The sentry issues the pre-schedule signal on construction and the
      // post-schedule signal when this scope exits.
2149  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2150  schedule_->processOneOccurrence<Traits>(*pep, es);
2151  if(hasSubProcess()) {
2152  subProcess_->doEvent(*pep, ts);
2153  }
2154  }
2155 
2156  if(looper_) {
      // Describe to the looper what navigation the input source supports.
2157  bool randomAccess = input_->randomAccess();
2158  ProcessingController::ForwardState forwardState = input_->forwardState();
2159  ProcessingController::ReverseState reverseState = input_->reverseState();
2160  ProcessingController pc(forwardState, reverseState, randomAccess);
2161 
      // Call the looper repeatedly on the same event until the transition it
      // requested has been carried out. Only goToEvent can report failure;
      // every other path leaves 'succeeded' true, so the loop runs once.
2162  EDLooperBase::Status status = EDLooperBase::kContinue;
2163  do {
2164  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2165 
2166  bool succeeded = true;
2167  if(randomAccess) {
2168  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
      // NOTE(review): presumably -2 because the source is already positioned
      // past the current event -- confirm against InputSource::skipEvents.
2169  input_->skipEvents(-2);
2170  }
2171  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2172  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2173  }
2174  }
2175  pc.setLastOperationSucceeded(succeeded);
2176  } while(!pc.lastOperationSucceeded());
2177  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2178 
2179  }
2180 
2181  FDEBUG(1) << "\tprocessEvent\n";
2182  pep->clearEventPrincipal();
2183  }
2184 
// Reports whether event processing should stop.
//
// Returns true when shouldWeStop_ has been set (readAndProcessEvent() sets it
// on a non-kContinue looper status), when the schedule reports terminate(),
// or when a sub-process exists and reports terminate().
2185  bool EventProcessor::shouldWeStop() const {
2186  FDEBUG(1) << "\tshouldWeStop\n";
2187  if(shouldWeStop_) return true;
2188  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2189  }
2190 
// Stores a copy of 'message' in exceptionMessageFiles_.
//
// The exception handler earlier in this file appends that text to a
// propagating exception as additional info when it is non-empty.
// The parameter is a non-const reference but is only read here.
2191  void EventProcessor::setExceptionMessageFiles(std::string& message) {
2192  exceptionMessageFiles_ = message;
2193  }
2194 
// Stores a copy of 'message' in exceptionMessageRuns_.
//
// The exception handler earlier in this file prints that text through
// LogAbsolute("Additional Exceptions") when the exception was already printed.
// The parameter is a non-const reference but is only read here.
2195  void EventProcessor::setExceptionMessageRuns(std::string& message) {
2196  exceptionMessageRuns_ = message;
2197  }
2198 
// Stores a copy of 'message' in exceptionMessageLumis_.
//
// The parameter is a non-const reference but is only read here.
2199  void EventProcessor::setExceptionMessageLumis(std::string& message) {
2200  exceptionMessageLumis_ = message;
2201  }
2202 
// Returns the current value of the alreadyHandlingException_ flag.
2203  bool EventProcessor::alreadyHandlingException() const {
2204  return alreadyHandlingException_;
2205  }
2206 
// Stops and destroys the state machine, if one exists.
//
// A machine that has not yet terminated is sent a statemachine::Stop event
// first. In every case the machine_ pointer is reset afterwards. Does nothing
// when machine_ is already null.
2207  void EventProcessor::terminateMachine() {
2208  if(machine_.get() != 0) {
2209  if(!machine_->terminated()) {
      // forceLooperToEnd_ makes endOfLoop() return true while the Stop event
      // is being processed, so a looper cannot start another loop.
      // NOTE(review): the flag is not restored if process_event throws.
2210  forceLooperToEnd_ = true;
2211  machine_->process_event(statemachine::Stop());
2212  forceLooperToEnd_ = false;
2213  }
2214  else {
2215  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2216  }
2217  if(machine_->terminated()) {
2218  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2219  }
2220  machine_.reset();
2221  }
2222  }
2223 }
list table
Definition: asciidump.py:386
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
T getParameter(std::string const &) const
boost::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
void fillRegisteredDataKeys(std::vector< DataKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all registered data keys ...
volatile bool id_set_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:235
StatusCode skip(int numberToSkip)
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:361
ActivityRegistry::PostProcessEvent postProcessEventSignal_
virtual std::string explainSelf() const
Definition: Exception.cc:146
ParameterSetID id() const
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
Timestamp const & endTime() const
int getSigNum()
EventID const & id() const
void call(boost::function< void(void)>)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
def pipe
Definition: pipe.py:5
static ThreadSafeRegistry * instance()
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
virtual StatusCode runEventCount(int numberOfEventsToProcess)
void readFile()
Definition: HZZ4LRooPdfs.cc:39
void installCustomHandler(int signum, CFUNC func)
RunNumber_t run() const
Definition: RunPrincipal.h:46
std::set< std::pair< std::string, std::string > > ExcludedData
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< ProcessConfiguration > processConfiguration_
Definition: ScheduleItems.h:51
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void beginJob()
Definition: Breakpoints.cc:15
const eventsetup::EventSetupRecord * find(const eventsetup::EventSetupRecordKey &) const
Definition: EventSetup.cc:90
std::auto_ptr< Schedule > initSchedule(ParameterSet &parameterSet, ParameterSet const *subProcessPSet)
Timestamp const & time() const
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
TransEntry table[]
ServiceToken serviceToken_
boost::mutex stop_lock_
boost::shared_ptr< ActionTable const > act_table_
Definition: ScheduleItems.h:50
DataProxy const * find(DataKey const &aKey) const
LuminosityBlockNumber_t luminosityBlock() const
ParameterSet const & getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:54
void stdToEDM(std::exception const &e)
int runNumber() const
Definition: EPStates.h:48
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
void fillAvailableRecordKeys(std::vector< eventsetup::EventSetupRecordKey > &oToFill) const
clears the oToFill vector and then fills it with the keys for all available records ...
Definition: EventSetup.cc:101
bool isAvailable() const
Definition: Service.h:47
static InputSourceFactory * get()
volatile event_processor::State state_
Timestamp const & endTime() const
Definition: RunPrincipal.h:58
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
boost::shared_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, PrincipalCache &pCache, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration)
boost::shared_ptr< ProcessConfiguration > processConfiguration_
virtual void endOfJob()
Definition: EDLooperBase.cc:74
void connectSigs(EventProcessor *ep)
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
author Stefano ARGIRO author Bill Tanenbaum
double a
Definition: hdecay.h:121
boost::shared_ptr< ActionTable const > act_table_
edm::ProcessHistoryID const & processHistoryID() const
Definition: EPStates.h:47
volatile bool shutdown_flag
std::auto_ptr< SubProcess > subProcess_
unsigned int RunNumber_t
Definition: EventRange.h:32
#define O_NONBLOCK
Definition: SysFile.h:21
boost::shared_ptr< SignallingProductRegistry > preg_
Definition: ScheduleItems.h:49
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::auto_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
ActivityRegistry::PreProcessEvent preProcessEventSignal_
collection_type & data()
Provide access to the contained collection.
bool doGet(DataKey const &aKey, bool aGetTransiently=false) const
returns false if no data available for key
boost::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:48
tuple status
Definition: ntuplemaker.py:245
void validate(ParameterSet &pset, std::string const &moduleLabel) const
ParameterSet const & registerIt()
bool insert(Storage &, ItemType *, const IdTag &)
static ComponentFactory< T > * get()
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:56
boost::mutex usr2_lock
PrincipalCache principalCache_
bool hasSubProcess() const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)