CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00010 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00011 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00012 
00013 #include "FWCore/Framework/interface/CommonParams.h"
00014 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00015 #include "FWCore/Framework/interface/EDLooperBase.h"
00016 #include "FWCore/Framework/interface/EventPrincipal.h"
00017 #include "FWCore/Framework/interface/EventSetupProvider.h"
00018 #include "FWCore/Framework/interface/EventSetupRecord.h"
00019 #include "FWCore/Framework/src/EventSetupsController.h"
00020 #include "FWCore/Framework/interface/FileBlock.h"
00021 #include "FWCore/Framework/interface/InputSourceDescription.h"
00022 #include "FWCore/Framework/interface/IOVSyncValue.h"
00023 #include "FWCore/Framework/interface/LooperFactory.h"
00024 #include "FWCore/Framework/interface/LuminosityBlock.h"
00025 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00026 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00027 #include "FWCore/Framework/interface/ModuleChanger.h"
00028 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00029 #include "FWCore/Framework/interface/ProcessingController.h"
00030 #include "FWCore/Framework/interface/RunPrincipal.h"
00031 #include "FWCore/Framework/interface/Schedule.h"
00032 #include "FWCore/Framework/interface/ScheduleInfo.h"
00033 #include "FWCore/Framework/interface/TriggerNamesService.h"
00034 #include "FWCore/Framework/src/Breakpoints.h"
00035 #include "FWCore/Framework/src/EPStates.h"
00036 #include "FWCore/Framework/src/InputSourceFactory.h"
00037 
00038 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00039 
00040 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046 
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049 
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/GetPassID.h"
00053 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00054 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00055 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00056 #include "FWCore/Version/interface/GetReleaseVersion.h"
00057 
00058 #include "boost/bind.hpp"
00059 #include "boost/thread/xtime.hpp"
00060 
00061 #include <exception>
00062 #include <iomanip>
00063 #include <iostream>
00064 #include <utility>
00065 
00066 #include <sys/ipc.h>
00067 #include <sys/msg.h>
00068 
00069 //Used for forking
00070 #include <sys/types.h>
00071 #include <sys/wait.h>
00072 #include <unistd.h>
00073 
00074 //Used for CPU affinity
00075 #ifndef __APPLE__
00076 #include <sched.h>
00077 #endif
00078 
00079 namespace edm {
00080 
00081   namespace event_processor {
00082 
00083     class StateSentry {
00084     public:
00085       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00086       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00087       void succeeded() {success_ = true;}
00088 
00089     private:
00090       EventProcessor* ep_;
00091       bool success_;
00092     };
00093   }
00094 
00095   using namespace event_processor;
00096 
00097   namespace {
00098     template <typename T>
00099     class ScheduleSignalSentry {
00100     public:
00101       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00102            a_(a), principal_(principal), es_(es) {
00103         if (a_) T::preScheduleSignal(a_, principal_);
00104       }
00105       ~ScheduleSignalSentry() {
00106         if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00107       }
00108 
00109     private:
00110       // We own none of these resources.
00111       ActivityRegistry* a_;
00112       typename T::MyPrincipal* principal_;
00113       EventSetup const* es_;
00114     };
00115   }
00116 
namespace {

  // Printable names for the State and Msg enums declared in the
  // EventProcessor header (presumably indexed by enumerator value).
  // These two tables must be kept in sync (same order and count) with the
  // state and message enums from the header. The enum sizes are not visible
  // here, so this cannot be checked at compile time in this file.
  // Both the strings and the pointers to them are read-only.

  char const* const stateNames[] = {
    "Init",
    "JobReady",
    "RunGiven",
    "Running",
    "Stopping",
    "ShuttingDown",
    "Done",
    "JobEnded",
    "Error",
    "ErrorEnded",
    "End",
    "Invalid"
  };

  char const* const msgNames[] = {
    "SetRun",
    "Skip",
    "RunAsync",
    "Run(ID)",
    "Run(count)",
    "BeginJob",
    "StopAsync",
    "ShutdownAsync",
    "EndJob",
    "CountComplete",
    "InputExhausted",
    "StopSignal",
    "ShutdownSignal",
    "Finished",
    "Any",
    "dtor",
    "Exception",
    "Rewind"
  };
}
// IMPORTANT NOTE:
// the mAny messages are special, they must appear last in the
// table if multiple entries for a CurrentState are present.
// the changeState function does not use the mAny yet!!!

// One row of the EventProcessor state-transition table: receiving `message`
// while in state `current` moves the processor to state `final`.
struct TransEntry {
  State current;
  Msg   message;
  State final;
};

// we should use this information to initialize a two dimensional
// table of t[CurrentState][Message] = FinalState

/*
  As currently written, the async run thread function can return in the
  "JobReady" state without having been cleaned up. The thread is only
  cleaned up when stop/shutdown async is called, but the stop/shutdown
  async functions first try to change the state using messages that are
  not valid in the "JobReady" state.

  Most of these problems could probably be solved by using two states
  for "running": RunningS and RunningA (sync/async). The problem seems to
  be all the transitions out of Running for both modes of operation.
  Another solution might be to only go to "Stopping" from Running, and
  use the return code from "run_p" to set the final state. In that case
  the "Stopping" state would be momentary in sync mode.
 */

// Transition table consulted by changeState() (defined elsewhere in this
// file). Row order matters: see the IMPORTANT NOTE above about mAny rows.
// NOTE(review): unlike stateNames/msgNames, `table` is not inside an
// anonymous namespace, so it has external linkage as edm::table — confirm
// nothing else relies on that before changing it.
TransEntry table[] = {
// CurrentState   Message         FinalState
// -----------------------------------------
  { sInit,          mException,      sError },
  { sInit,          mBeginJob,       sJobReady },
  { sJobReady,      mException,      sError },
  { sJobReady,      mSetRun,         sRunGiven },
  { sJobReady,      mInputRewind,    sRunning },
  { sJobReady,      mSkip,           sRunning },
  { sJobReady,      mRunID,          sRunning },
  { sJobReady,      mRunCount,       sRunning },
  { sJobReady,      mEndJob,         sJobEnded },
  { sJobReady,      mBeginJob,       sJobReady },
  { sJobReady,      mDtor,           sEnd },    // should this be allowed?

  { sJobReady,      mStopAsync,      sJobReady },
  { sJobReady,      mCountComplete,  sJobReady },
  { sJobReady,      mFinished,       sJobReady },

  { sRunGiven,      mException,      sError },
  { sRunGiven,      mRunAsync,       sRunning },
  { sRunGiven,      mBeginJob,       sRunGiven },
  { sRunGiven,      mShutdownAsync,  sShuttingDown },
  { sRunGiven,      mStopAsync,      sStopping },
  { sRunning,       mException,      sError },
  { sRunning,       mStopAsync,      sStopping },
  { sRunning,       mShutdownAsync,  sShuttingDown },
  { sRunning,       mShutdownSignal, sShuttingDown },
  { sRunning,       mCountComplete,  sStopping }, // sJobReady (presumably an alternative target considered earlier)
  { sRunning,       mInputExhausted, sStopping }, // sJobReady (presumably an alternative target considered earlier)

  { sStopping,      mInputRewind,    sRunning }, // The looper needs this
  { sStopping,      mException,      sError },
  { sStopping,      mFinished,       sJobReady },
  { sStopping,      mCountComplete,  sJobReady },
  { sStopping,      mShutdownSignal, sShuttingDown },
  { sStopping,      mStopAsync,      sStopping },     // stay
  { sStopping,      mInputExhausted, sStopping },     // stay
  //{ sStopping,      mAny,            sJobReady },     // <- ??????
  { sShuttingDown,  mException,      sError },
  { sShuttingDown,  mShutdownSignal, sShuttingDown },
  { sShuttingDown,  mCountComplete,  sDone }, // needed?
  { sShuttingDown,  mInputExhausted, sDone }, // needed?
  { sShuttingDown,  mFinished,       sDone },
  //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
  //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
  //{ sShuttingDown,  mAny,            sDone },         // <- ??????
  { sDone,          mEndJob,         sJobEnded },
  { sDone,          mException,      sError },
  { sJobEnded,      mDtor,           sEnd },
  { sJobEnded,      mException,      sError },
  { sError,         mEndJob,         sError },   // funny one here
  { sError,         mDtor,           sError },   // funny one here
  // The next two rows were appended out of grouping for special clients.
  { sInit,          mDtor,           sEnd },     // for StorM dummy EP
  { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
  // Final row: presumably the end-of-table sentinel for the lookup in
  // changeState() (not visible here) — confirm before reordering.
  { sInvalid,       mAny,            sInvalid }
};


// Note: several of the public entry points send an mBeginJob message
// (via beginJob()) before their own message, e.g. for
//  mRunID, mRunCount, mSetRun
00251 
00252   // ---------------------------------------------------------------
00253   boost::shared_ptr<InputSource>
00254   makeInput(ParameterSet& params,
00255             CommonParams const& common,
00256             ProductRegistry& preg,
00257             PrincipalCache& pCache,
00258             boost::shared_ptr<ActivityRegistry> areg,
00259             boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00260     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00261     if(main_input == 0) {
00262       throw Exception(errors::Configuration, "FailedInputSource")
00263         << "Configuration of primary input source has failed\n";
00264     }
00265 
00266     std::string modtype;
00267     try {
00268       modtype = main_input->getParameter<std::string>("@module_type");
00269       std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00270         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00271       ConfigurationDescriptions descriptions(filler->baseType());
00272       filler->fill(descriptions);
00273       descriptions.validate(*main_input, std::string("source"));
00274     }
00275     catch (cms::Exception& iException) {
00276       Exception toThrow(errors::Configuration, "Failed validating primary input source configuration.");
00277       toThrow << "\nSource plugin name is \"" << modtype << "\"\n";
00278       toThrow.append(iException);
00279       throw toThrow;
00280     }
00281 
00282     main_input->registerIt();
00283 
00284     // Fill in "ModuleDescription", in case the input source produces
00285     // any EDproducts, which would be registered in the ProductRegistry.
00286     // Also fill in the process history item for this process.
00287     // There is no module label for the unnamed input source, so
00288     // just use "source".
00289     // Only the tracked parameters belong in the process configuration.
00290     ModuleDescription md(main_input->id(),
00291                          main_input->getParameter<std::string>("@module_type"),
00292                          "source",
00293                          processConfiguration);
00294 
00295     InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00296     areg->preSourceConstructionSignal_(md);
00297     boost::shared_ptr<InputSource> input;
00298     try {
00299       input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00300       areg->postSourceConstructionSignal_(md);
00301     }
00302     catch (Exception& iException) {
00303       areg->postSourceConstructionSignal_(md);
00304       //we want to keep the same category code so that cmsRun will return the proper return value
00305       Exception toThrow(iException.categoryCode(), "Error occured while constructing primary input source.");
00306       toThrow << "\nSource is of type \"" << modtype << "\"\n";
00307       toThrow.append(iException);
00308       throw toThrow;
00309     }
00310     catch (cms::Exception& iException) {
00311       areg->postSourceConstructionSignal_(md);
00312       Exception toThrow(errors::Configuration, "Error occured while constructing primary input source.");
00313       toThrow << "\nSource is of type \"" << modtype << "\"\n";
00314       toThrow.append(iException);
00315       throw toThrow;
00316     }
00317 
00318     return input;
00319   }
00320 
00321   // ---------------------------------------------------------------
00322   boost::shared_ptr<EDLooperBase>
00323   fillLooper(eventsetup::EventSetupProvider& cp,
00324                          ParameterSet& params,
00325                          CommonParams const& common) {
00326     boost::shared_ptr<EDLooperBase> vLooper;
00327 
00328     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00329 
00330     if(loopers.size() == 0) {
00331        return vLooper;
00332     }
00333 
00334     assert(1 == loopers.size());
00335 
00336     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00337         itName != itNameEnd;
00338         ++itName) {
00339 
00340       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00341       providerPSet->registerIt();
00342       vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00343                                                         *providerPSet,
00344                                                         common.processName_,
00345                                                         common.releaseVersion_,
00346                                                         common.passID_);
00347       }
00348       return vLooper;
00349 
00350   }
00351 
// ---------------------------------------------------------------
// Construct from a Python configuration.
//
// config          - Python configuration, as accepted by PythonProcessDesc.
// iToken, iLegacy - an existing service set and the policy for overlaps with
//                   it; forwarded unchanged to init().
// defaultServices,
// forcedServices  - service names forwarded to ProcessDesc::addServices()
//                   (NOTE(review): presumably "add if absent" vs. "always
//                   add" — confirm in ProcessDesc).
//
// Every member gets an explicit starting value here. init() then reads the
// "options" PSet and overwrites several of them (fileMode_,
// emptyRunLumiMode_, forceESCacheClearOnNewRun_ and the multi-process settings).
EventProcessor::EventProcessor(std::string const& config,
                              ServiceToken const& iToken,
                              serviceregistry::ServiceLegacy iLegacy,
                              std::vector<std::string> const& defaultServices,
                              std::vector<std::string> const& forcedServices) :
  preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(new ActivityRegistry),
  preg_(new SignallingProductRegistry),
  serviceToken_(),
  input_(),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false) {
  // Parse the Python configuration and add the requested services before
  // building everything else in init().
  boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
  processDesc->addServices(defaultServices, forcedServices);
  init(processDesc, iToken, iLegacy);
}
00394 
// Construct from a Python configuration with a fresh service set.
//
// Same as the constructor above, but with no pre-existing ServiceToken:
// init() is called with ServiceToken() and serviceregistry::kOverlapIsError.
// defaultServices / forcedServices are forwarded to ProcessDesc::addServices().
EventProcessor::EventProcessor(std::string const& config,
                              std::vector<std::string> const& defaultServices,
                              std::vector<std::string> const& forcedServices) :
  preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(new ActivityRegistry),
  preg_(new SignallingProductRegistry),
  serviceToken_(),
  input_(),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false) {
  // Parse the Python configuration, add the requested services, then build
  // the processor with an empty service token.
  boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
  processDesc->addServices(defaultServices, forcedServices);
  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
}
00434 
00435   EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00436                  ServiceToken const& token,
00437                  serviceregistry::ServiceLegacy legacy) :
00438     preProcessEventSignal_(),
00439     postProcessEventSignal_(),
00440     actReg_(new ActivityRegistry),
00441     preg_(new SignallingProductRegistry),
00442     serviceToken_(),
00443     input_(),
00444     esp_(),
00445     act_table_(),
00446     processConfiguration_(),
00447     schedule_(),
00448     state_(sInit),
00449     event_loop_(),
00450     state_lock_(),
00451     stop_lock_(),
00452     stopper_(),
00453     stop_count_(-1),
00454     last_rc_(epSuccess),
00455     last_error_text_(),
00456     id_set_(false),
00457     event_loop_id_(),
00458     my_sig_num_(getSigNum()),
00459     fb_(),
00460     looper_(),
00461     shouldWeStop_(false),
00462     stateMachineWasInErrorState_(false),
00463     alreadyHandlingException_(false),
00464     forceLooperToEnd_(false),
00465     looperBeginJobRun_(false),
00466     forceESCacheClearOnNewRun_(false)
00467   {
00468     init(processDesc, token, legacy);
00469   }
00470 
00471 
00472   EventProcessor::EventProcessor(std::string const& config, bool isPython):
00473     preProcessEventSignal_(),
00474     postProcessEventSignal_(),
00475     actReg_(new ActivityRegistry),
00476     preg_(new SignallingProductRegistry),
00477     serviceToken_(),
00478     input_(),
00479     esp_(),
00480     act_table_(),
00481     processConfiguration_(),
00482     schedule_(),
00483     state_(sInit),
00484     event_loop_(),
00485     state_lock_(),
00486     stop_lock_(),
00487     stopper_(),
00488     stop_count_(-1),
00489     last_rc_(epSuccess),
00490     last_error_text_(),
00491     id_set_(false),
00492     event_loop_id_(),
00493     my_sig_num_(getSigNum()),
00494     fb_(),
00495     looper_(),
00496     shouldWeStop_(false),
00497     stateMachineWasInErrorState_(false),
00498     alreadyHandlingException_(false),
00499     forceLooperToEnd_(false),
00500     looperBeginJobRun_(false),
00501     forceESCacheClearOnNewRun_(false)
00502   {
00503     if(isPython) {
00504       boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
00505       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00506     }
00507     else {
00508       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00509       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00510     }
00511   }
00512 
// Common construction step shared by all constructors.
//
// In order, it:
//  1. clears the indexed BranchID/ProductID list registries;
//  2. reads the untracked "options" PSet (fileMode, emptyRunLumiMode,
//     forceEventSetupCacheClearOnNewRun, multiProcesses.*) into members;
//  3. creates the configured services, stores the PSets of services marked
//     "@save_config" in the process PSet, and copies their signal slots into
//     actReg_;
//  4. chains ConstProductRegistry and TriggerNamesService services onto the
//     service set to form serviceToken_, then makes it current;
//  5. builds the action table, EventSetup provider, process configuration,
//     optional looper, primary input source and Schedule.
//
// processDesc     - process description to build from.
// iToken, iLegacy - existing service set and overlap policy passed to
//                   ServiceRegistry::createSet().
// Exceptions from any of these steps propagate to the caller (a constructor).
void
EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
                      ServiceToken const& iToken,
                      serviceregistry::ServiceLegacy iLegacy) {

  // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
  //  They must be cleared here because some processes run multiple EventProcessors in succession.
  BranchIDListHelper::clearRegistries();

  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();

  // Untracked process options; every lookup supplies a default, so a missing
  // "options" or "multiProcesses" PSet leaves the defaults in place.
  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
  // Each entry names a record (required) and optionally a data type
  // (default "*") and label (default ""); the (type, label) pair is filed
  // under its record.
  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
      itPS != itPSEnd;
      ++itPS) {
    eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
                                              std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
                                                             itPS->getUntrackedParameter<std::string>("label", "")));
  }

  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
  //makeParameterSets(config, parameterSet, pServiceSets);

  //create the services
  ServiceToken tempToken(ServiceRegistry::createSet(*pServiceSets, iToken, iLegacy));

  //see if any of the Services have to have their PSets stored
  // (services carrying "@save_config" are added to the process PSet under
  // their "@service_type" name)
  for(std::vector<ParameterSet>::const_iterator it = pServiceSets->begin(), itEnd = pServiceSets->end();
      it != itEnd;
      ++it) {
    if(it->exists("@save_config")) {
      parameterSet->addParameter(it->getParameter<std::string>("@service_type"), *it);
    }
  }
  // Copy slots that hold all the registered callback functions like
  // PostBeginJob into an ActivityRegistry that is owned by EventProcessor
  tempToken.copySlotsTo(*actReg_);

  //add the ProductRegistry as a service ONLY for the construction phase
  // NOTE(review): tempToken2 is chained into serviceToken_ below, so this
  // service appears to stay reachable after construction as well — confirm.
  typedef serviceregistry::ServiceWrapper<ConstProductRegistry> w_CPR;
  boost::shared_ptr<w_CPR>
    reg(new w_CPR(std::auto_ptr<ConstProductRegistry>(new ConstProductRegistry(*preg_))));
  ServiceToken tempToken2(ServiceRegistry::createContaining(reg,
                                                            tempToken,
                                                            serviceregistry::kOverlapIsError));

  // the next thing is ugly: pull out the trigger path pset and
  // create a service and extra token for it
  std::string processName = parameterSet->getParameter<std::string>("@process_name");

  typedef service::TriggerNamesService TNS;
  typedef serviceregistry::ServiceWrapper<TNS> w_TNS;

  boost::shared_ptr<w_TNS> tnsptr
    (new w_TNS(std::auto_ptr<TNS>(new TNS(*parameterSet))));

  serviceToken_ = ServiceRegistry::createContaining(tnsptr,
                                                    tempToken2,
                                                    serviceregistry::kOverlapIsError);

  //make the services available
  // (for the rest of init(); `operate` is released when init() returns)
  ServiceRegistry::Operate operate(serviceToken_);

  act_table_.reset(new ActionTable(*parameterSet));
  // maxEvents.input / maxLuminosityBlocks.input default to -1 when absent.
  CommonParams common = CommonParams(processName,
                         getReleaseVersion(),
                         getPassID(),
                         parameterSet->getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1),
                         parameterSet->getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1));

  // NOTE(review): espController is a local and is destroyed when init()
  // returns; esp_ presumably does not depend on it afterwards — confirm.
  std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
  esp_ = espController->makeProvider(*parameterSet, common);
  processConfiguration_.reset(new ProcessConfiguration(processName, getReleaseVersion(), getPassID()));

  // The looper (if configured) must exist before the input source and
  // Schedule are built; it is hooked to the action table and activity registry.
  looper_ = fillLooper(*esp_, *parameterSet, common);
  if(looper_) {
    looper_->setActionTable(act_table_.get());
    looper_->attachTo(*actReg_);
  }

  input_ = makeInput(*parameterSet, common, *preg_, principalCache_, actReg_, processConfiguration_);

  schedule_ = std::auto_ptr<Schedule>
    (new Schedule(*parameterSet,
                  ServiceRegistry::instance().get<TNS>(),
                  *preg_,
                  *act_table_,
                  actReg_,
                  processConfiguration_));

  //   initialize(iToken, iLegacy);
  // NOTE(review): this streams the shared_ptr itself (the pointer value),
  // not the contents of the parameter set.
  FDEBUG(2) << parameterSet << std::endl;
  connectSigs(this);
}
00615 
00616   EventProcessor::~EventProcessor() {
00617     // Make the services available while everything is being deleted.
00618     ServiceToken token = getToken();
00619     ServiceRegistry::Operate op(token);
00620 
00621     // The state machine should have already been cleaned up
00622     // and destroyed at this point by a call to EndJob or
00623     // earlier when it completed processing events, but if it
00624     // has not been we'll take care of it here at the last moment.
00625     // This could cause problems if we are already handling an
00626     // exception and another one is thrown here ...  For a critical
00627     // executable the solution to this problem is for the code using
00628     // the EventProcessor to explicitly call EndJob or use runToCompletion,
00629     // then the next line of code is never executed.
00630     terminateMachine();
00631 
00632     try {
00633       changeState(mDtor);
00634     }
00635     catch(cms::Exception& e) {
00636       LogError("System")
00637         << e.explainSelf() << "\n";
00638     }
00639 
00640     // manually destroy all these thing that may need the services around
00641     esp_.reset();
00642     schedule_.reset();
00643     input_.reset();
00644     looper_.reset();
00645     actReg_.reset();
00646 
00647     pset::Registry* psetRegistry = pset::Registry::instance();
00648     psetRegistry->data().clear();
00649     psetRegistry->extra().setID(ParameterSetID());
00650 
00651     EntryDescriptionRegistry::instance()->data().clear();
00652     ParentageRegistry::instance()->data().clear();
00653     ProcessConfigurationRegistry::instance()->data().clear();
00654     ProcessHistoryRegistry::instance()->data().clear();
00655     BranchIDListHelper::clearRegistries();
00656   }
00657 
00658   void
00659   EventProcessor::rewind() {
00660     beginJob(); //make sure this was called
00661     changeState(mStopAsync);
00662     changeState(mInputRewind);
00663     {
00664       StateSentry toerror(this);
00665 
00666       //make the services available
00667       ServiceRegistry::Operate operate(serviceToken_);
00668 
00669       {
00670         input_->repeat();
00671         input_->rewind();
00672       }
00673       changeState(mCountComplete);
00674       toerror.succeeded();
00675     }
00676     changeState(mFinished);
00677   }
00678 
00679   EventProcessor::StatusCode
00680   EventProcessor::run(int numberEventsToProcess, bool) {
00681     return runEventCount(numberEventsToProcess);
00682   }
00683 
00684   EventProcessor::StatusCode
00685   EventProcessor::skip(int numberToSkip) {
00686     beginJob(); //make sure this was called
00687     changeState(mSkip);
00688     {
00689       StateSentry toerror(this);
00690 
00691       //make the services available
00692       ServiceRegistry::Operate operate(serviceToken_);
00693 
00694       {
00695         input_->skipEvents(numberToSkip);
00696       }
00697       changeState(mCountComplete);
00698       toerror.succeeded();
00699     }
00700     changeState(mFinished);
00701     return epSuccess;
00702   }
00703 
00704   void
00705   EventProcessor::beginJob() {
00706     if(state_ != sInit) return;
00707     bk::beginJob();
00708     // can only be run if in the initial state
00709     changeState(mBeginJob);
00710 
00711     // StateSentry toerror(this); // should we add this ?
00712     //make the services available
00713     ServiceRegistry::Operate operate(serviceToken_);
00714 
00715     //NOTE:  This implementation assumes 'Job' means one call
00716     // the EventProcessor::run
00717     // If it really means once per 'application' then this code will
00718     // have to be changed.
00719     // Also have to deal with case where have 'run' then new Module
00720     // added and do 'run'
00721     // again.  In that case the newly added Module needs its 'beginJob'
00722     // to be called.
00723 
00724     //NOTE: in future we should have a beginOfJob for looper which takes no arguments
00725     //  For now we delay calling beginOfJob until first beginOfRun
00726     //if(looper_) {
00727     //   looper_->beginOfJob(es);
00728     //}
00729     try {
00730       input_->doBeginJob();
00731     } catch(cms::Exception& e) {
00732       LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00733       e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00734       throw;
00735     } catch(std::exception& e) {
00736       LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
00737       throw;
00738     } catch(...) {
00739       LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
00740       throw;
00741     }
00742 
00743     schedule_->beginJob();
00744     actReg_->postBeginJobSignal_();
00745     // toerror.succeeded(); // should we add this?
00746   }
00747 
00748   void
00749   EventProcessor::endJob() {
00750     // Collects exceptions, so we don't throw before all operations are performed.
00751     ExceptionCollector c;
00752 
00753     // only allowed to run if state is sIdle, sJobReady, sRunGiven
00754     c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00755 
00756     //make the services available
00757     ServiceRegistry::Operate operate(serviceToken_);
00758 
00759     c.call(boost::bind(&EventProcessor::terminateMachine, this));
00760     c.call(boost::bind(&Schedule::endJob, schedule_.get()));
00761     c.call(boost::bind(&InputSource::doEndJob, input_));
00762     if(looper_) {
00763       c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00764     }
00765     c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00766     if(c.hasThrown()) {
00767       c.rethrow();
00768     }
00769   }
00770 
00771   ServiceToken
00772   EventProcessor::getToken() {
00773     return serviceToken_;
00774   }
00775 
00776   //Setup signal handler to listen for when forked children stop
00777   namespace {
00778     volatile bool child_failed = false;
00779     volatile unsigned int num_children_done = 0;
00780     volatile int child_fail_exit_status = 0;
00781 
00782     extern "C" {
00783       void ep_sigchld(int, siginfo_t*, void*) {
00784         //printf("in sigchld\n");
00785         //FDEBUG(1) << "in sigchld handler\n";
00786         int stat_loc;
00787         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00788         while(0<p) {
00789           //printf("  looping\n");
00790           if(WIFEXITED(stat_loc)) {
00791             ++num_children_done;
00792             if(0 != WEXITSTATUS(stat_loc)) {
00793               child_failed = true;
00794               child_fail_exit_status = WEXITSTATUS(stat_loc);
00795             }
00796           }
00797           if(WIFSIGNALED(stat_loc)) {
00798             ++num_children_done;
00799             child_failed = true;
00800           }
00801           p = waitpid(-1, &stat_loc, WNOHANG);
00802         }
00803       }
00804     }
00805 
00806   }
00807 
00808   enum {
00809     kChildSucceed,
00810     kChildExitBadly,
00811     kChildSegv,
00812     kMaxChildAction
00813   };
00814 
00815   namespace {
00816     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00817       unsigned int n = 0;
00818       while(numberOfChildren != 0) {
00819         ++n;
00820         numberOfChildren /= 10;
00821       }
00822       if(n == 0) {
00823         n = 3; // Protect against zero numberOfChildren
00824       }
00825       return n;
00826     }
00827 
00828     class MessageSenderToSource {
00829     public:
00830       MessageSenderToSource(int iQueueID, long iNEventsToProcess):
00831       m_queueID(iQueueID),
00832       m_nEventsToProcess(iNEventsToProcess) {}
00833       void operator()() {
00834         std::cout << "I am controller" << std::endl;
00835         //this is the first child and therefore the controller
00836 
00837         multicore::MessageForSource sndmsg;
00838         sndmsg.startIndex = 0;
00839         sndmsg.nIndices = m_nEventsToProcess;
00840         if(m_nEventsToProcess > 0 && m_nEventsToProcess < m_nEventsToProcess) {
00841           sndmsg.nIndices = m_nEventsToProcess;
00842         }
00843         int value = 0;
00844         do {
00845           value = msgsnd(m_queueID, &sndmsg, multicore::MessageForSource::sizeForBuffer(), 0);
00846           //std::cerr << "worker asked for work" << std::endl;
00847           sndmsg.startIndex += sndmsg.nIndices;
00848         } while(value == 0);
00849         if(errno != EIDRM and errno != EINVAL) {
00850           //NOTE: on Linux we sometimes get EINVAL after the queue was removed rather than EIDRM
00851           perror("message queue send failed");
00852         }
00853         return;
00854       }
00855 
00856     private:
00857       int const m_queueID;
00858       long const m_nEventsToProcess;
00859     };
00860   }
00861 
00862   bool
00863   EventProcessor::forkProcess(std::string const& jobReportFile) {
00864 
00865     if(0 == numberOfForkedChildren_) {return true;}
00866     assert(0<numberOfForkedChildren_);
00867     //do what we want done in common
00868     {
00869       beginJob(); //make sure this was run
00870       // make the services available
00871       ServiceRegistry::Operate operate(serviceToken_);
00872 
00873       InputSource::ItemType itemType;
00874       itemType = input_->nextItemType();
00875 
00876       assert(itemType == InputSource::IsFile);
00877       {
00878         readFile();
00879       }
00880       itemType = input_->nextItemType();
00881       assert(itemType == InputSource::IsRun);
00882 
00883       std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
00884       IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
00885                       input_->runAuxiliary()->beginTime());
00886       EventSetup const& es = esp_->eventSetupForInstance(ts);
00887 
00888       //now get all the data available in the EventSetup
00889       std::vector<eventsetup::EventSetupRecordKey> recordKeys;
00890       es.fillAvailableRecordKeys(recordKeys);
00891       std::vector<eventsetup::DataKey> dataKeys;
00892       for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
00893           itKey != itEnd;
00894           ++itKey) {
00895         eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
00896         //see if this is on our exclusion list
00897         ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
00898         ExcludedData const* excludedData(0);
00899         if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
00900           excludedData = &(itExcludeRec->second);
00901           if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
00902             //skip all items in this record
00903             continue;
00904           }
00905         }
00906         if(0 != recordPtr) {
00907           dataKeys.clear();
00908           recordPtr->fillRegisteredDataKeys(dataKeys);
00909           for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
00910               itDataKey != itDataKeyEnd;
00911               ++itDataKey) {
00912             //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
00913             if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
00914               std::cout << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
00915               continue;
00916             }
00917             try {
00918               recordPtr->doGet(*itDataKey);
00919             } catch(cms::Exception& e) {
00920              LogWarning("EventSetupPreFetching") << e.what();
00921             }
00922           }
00923         }
00924       }
00925     }
00926     std::cout << "  done prefetching" << std::endl;
00927 {
00928     // make the services available
00929     ServiceRegistry::Operate operate(serviceToken_);
00930     Service<JobReport> jobReport;
00931     jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
00932 
00933     //Now actually do the forking
00934     actReg_->preForkReleaseResourcesSignal_();
00935     input_->doPreForkReleaseResources();
00936     schedule_->preForkReleaseResources();
00937 }
00938     installCustomHandler(SIGCHLD, ep_sigchld);
00939 
00940     //Create a message queue used to set what events each child processes
00941     int queueID;
00942     if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
00943       printf("Error obtaining message queue\n");
00944       exit(EXIT_FAILURE);
00945     }
00946 
00947     unsigned int childIndex = 0;
00948     unsigned int const kMaxChildren = numberOfForkedChildren_;
00949     unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
00950     std::vector<pid_t> childrenIds;
00951     childrenIds.reserve(kMaxChildren);
00952 {
00953     // make the services available
00954     ServiceRegistry::Operate operate(serviceToken_);
00955     Service<JobReport> jobReport;
00956     for(; childIndex < kMaxChildren; ++childIndex) {
00957       pid_t value = fork();
00958       if(value == 0) {
00959         // this is the child process, redirect stdout and stderr to a log file
00960         fflush(stdout);
00961         fflush(stderr);
00962         std::stringstream stout;
00963         stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
00964         if(0 == freopen(stout.str().c_str(), "w", stdout)) {
00965           std::cerr << "Error during freopen of child process "
00966           << childIndex << std::endl;
00967         }
00968         if(dup2(fileno(stdout), fileno(stderr)) < 0) {
00969           std::cerr << "Error during dup2 of child process"
00970           << childIndex << std::endl;
00971         }
00972 
00973         std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
00974         if(setCpuAffinity_) {
00975           // CPU affinity is handled differently on macosx.
00976           // We disable it and print a message until someone reads:
00977           //
00978           // https://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
00979           //
00980           // and implements it.
00981 #ifdef __APPLE__
00982           std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
00983 #else
00984           std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
00985           cpu_set_t mask;
00986           CPU_ZERO(&mask);
00987           CPU_SET(childIndex, &mask);
00988           if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
00989             std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
00990             exit(-1);
00991           }
00992 #endif
00993         }
00994         break;
00995       }
00996       if(value < 0) {
00997         std::cerr << "failed to create a child" << std::endl;
00998         //message queue is a system resource so must be cleaned up
00999         // before parent goes away
01000         msgctl(queueID, IPC_RMID, 0);
01001         exit(-1);
01002       }
01003       childrenIds.push_back(value);
01004     }
01005 
01006     if(childIndex < kMaxChildren) {
01007       jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01008       actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01009 
01010       boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
01011       input_->doPostForkReacquireResources(receiver);
01012       schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01013       //NOTE: sources have to reset themselves by listening to the post fork message
01014       //rewindInput();
01015       return true;
01016     }
01017     jobReport->parentAfterFork(jobReportFile);
01018 }
01019 
01020     //this is the original which is now the master for all the children
01021 
01022     //Need to wait for signals from the children or externally
01023     // To wait we must
01024     // 1) block the signals we want to wait on so we do not have a race condition
01025     // 2) check that we haven't already meet our ending criteria
01026     // 3) call sigsuspend which unblocks the signals and waits until a signal is caught
01027     sigset_t blockingSigSet;
01028     sigset_t unblockingSigSet;
01029     sigset_t oldSigSet;
01030     pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01031     pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01032     sigaddset(&blockingSigSet, SIGCHLD);
01033     sigaddset(&blockingSigSet, SIGUSR2);
01034     sigaddset(&blockingSigSet, SIGINT);
01035     sigdelset(&unblockingSigSet, SIGCHLD);
01036     sigdelset(&unblockingSigSet, SIGUSR2);
01037     sigdelset(&unblockingSigSet, SIGINT);
01038     pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01039 
01040     //create a thread which sends the units of work to workers
01041     // we create it after all signals were blocked so that this
01042     // thread is never interupted by a signal
01043     MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
01044     boost::thread senderThread(sender);
01045 
01046     while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01047       sigsuspend(&unblockingSigSet);
01048       std::cout << "woke from sigwait" << std::endl;
01049     }
01050     pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01051 
01052     std::cout << "num children who have already stopped " << num_children_done << std::endl;
01053     if(child_failed) {
01054       std::cout << "child failed" << std::endl;
01055     }
01056     if(shutdown_flag) {
01057       std::cout << "asked to shutdown" << std::endl;
01058     }
01059 
01060     if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01061       std::cout << "must stop children" << std::endl;
01062       for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01063           it != itEnd; ++it) {
01064         /* int result = */ kill(*it, SIGUSR2);
01065       }
01066       pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01067       while(num_children_done != kMaxChildren) {
01068         sigsuspend(&unblockingSigSet);
01069       }
01070       pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01071     }
01072     //remove message queue, this will cause the message thread to terminate
01073     // kill it now since all children already stopped
01074     msgctl(queueID, IPC_RMID, 0);
01075     //now wait for the sender thread to finish. This should be quick since
01076     // we killed the message queue
01077     senderThread.join();
01078     if(child_failed) {
01079       throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01080     }
01081     return false;
01082   }
01083 
01084   void
01085   EventProcessor::connectSigs(EventProcessor* ep) {
01086     // When the FwkImpl signals are given, pass them to the
01087     // appropriate EventProcessor signals so that the outside world
01088     // can see the signal.
01089     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01090     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01091   }
01092 
01093   std::vector<ModuleDescription const*>
01094   EventProcessor::getAllModuleDescriptions() const {
01095     return schedule_->getAllModuleDescriptions();
01096   }
01097 
01098   int
01099   EventProcessor::totalEvents() const {
01100     return schedule_->totalEvents();
01101   }
01102 
01103   int
01104   EventProcessor::totalEventsPassed() const {
01105     return schedule_->totalEventsPassed();
01106   }
01107 
01108   int
01109   EventProcessor::totalEventsFailed() const {
01110     return schedule_->totalEventsFailed();
01111   }
01112 
01113   void
01114   EventProcessor::enableEndPaths(bool active) {
01115     schedule_->enableEndPaths(active);
01116   }
01117 
01118   bool
01119   EventProcessor::endPathsEnabled() const {
01120     return schedule_->endPathsEnabled();
01121   }
01122 
01123   void
01124   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01125     schedule_->getTriggerReport(rep);
01126   }
01127 
01128   void
01129   EventProcessor::clearCounters() {
01130     schedule_->clearCounters();
01131   }
01132 
01133   char const* EventProcessor::currentStateName() const {
01134     return stateName(getState());
01135   }
01136 
01137   char const* EventProcessor::stateName(State s) const {
01138     return stateNames[s];
01139   }
01140 
01141   char const* EventProcessor::msgName(Msg m) const {
01142     return msgNames[m];
01143   }
01144 
01145   State EventProcessor::getState() const {
01146     return state_;
01147   }
01148 
01149   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01150     // the thread will record exception/error status in the event processor
01151     // for us to look at and report here
01152     return last_rc_;
01153   }
01154 
01155   void
01156   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01157     if(runNumber == 0) {
01158       runNumber = 1;
01159       LogWarning("Invalid Run")
01160         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01161         << "Run number was set to 1 instead\n";
01162     }
01163 
01164     // inside of beginJob there is a check to see if it has been called before
01165     beginJob();
01166     changeState(mSetRun);
01167 
01168     // interface not correct yet
01169     input_->setRunNumber(runNumber);
01170   }
01171 
01172   void
01173   EventProcessor::declareRunNumber(RunNumber_t runNumber) {
01174     // inside of beginJob there is a check to see if it has been called before
01175     beginJob();
01176     changeState(mSetRun);
01177 
01178     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01179     //input_->declareRunNumber(runNumber);
01180   }
01181 
01182   EventProcessor::StatusCode
01183   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01184     bool rc = true;
01185     boost::xtime timeout;
01186     boost::xtime_get(&timeout, boost::TIME_UTC);
01187     timeout.sec += timeout_seconds;
01188 
01189     // make sure to include a timeout here so we don't wait forever
01190     // I suspect there are still timing issues with thread startup
01191     // and the setting of the various control variables (stop_count, id_set)
01192     {
01193       boost::mutex::scoped_lock sl(stop_lock_);
01194 
01195       // look here - if runAsync not active, just return the last return code
01196       if(stop_count_ < 0) return last_rc_;
01197 
01198       if(timeout_seconds == 0) {
01199         while(stop_count_ == 0) stopper_.wait(sl);
01200       } else {
01201         while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01202       }
01203 
01204       if(rc == false) {
01205           // timeout occurred
01206           // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
01207           // this is a temporary hack until we get the input source
01208           // upgraded to allow blocking input sources to be unblocked
01209 
01210           // the next line is dangerous and causes all sorts of trouble
01211           if(id_set_) pthread_cancel(event_loop_id_);
01212 
01213           // we will not do anything yet
01214           LogWarning("timeout")
01215             << "An asynchronous request was made to shut down "
01216             << "the event loop "
01217             << "and the event loop did not shutdown after "
01218             << timeout_seconds << " seconds\n";
01219       } else {
01220           event_loop_->join();
01221           event_loop_.reset();
01222           id_set_ = false;
01223           stop_count_ = -1;
01224       }
01225     }
01226     return rc == false ? epTimedOut : last_rc_;
01227   }
01228 
01229   EventProcessor::StatusCode
01230   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01231     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01232     if(rc != epTimedOut) changeState(mCountComplete);
01233     else errorState();
01234     return rc;
01235   }
01236 
01237 
01238   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01239     changeState(mStopAsync);
01240     StatusCode rc = waitForAsyncCompletion(secs);
01241     if(rc != epTimedOut) changeState(mFinished);
01242     else errorState();
01243     return rc;
01244   }
01245 
01246   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01247     changeState(mShutdownAsync);
01248     StatusCode rc = waitForAsyncCompletion(secs);
01249     if(rc != epTimedOut) changeState(mFinished);
01250     else errorState();
01251     return rc;
01252   }
01253 
01254   void EventProcessor::errorState() {
01255     state_ = sError;
01256   }
01257 
01258   // next function irrelevant now
01259   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01260     // make sure to include a timeout here so we don't wait forever
01261     // I suspect there are still timing issues with thread startup
01262     // and the setting of the various control variables (stop_count, id_set)
01263     changeState(m);
01264     return waitForAsyncCompletion(60*2);
01265   }
01266 
01267   void EventProcessor::changeState(Msg msg) {
01268     // most likely need to serialize access to this routine
01269 
01270     boost::mutex::scoped_lock sl(state_lock_);
01271     State curr = state_;
01272     int rc;
01273     // found if(not end of table) and
01274     // (state == table.state && (msg == table.message || msg == any))
01275     for(rc = 0;
01276         table[rc].current != sInvalid &&
01277           (curr != table[rc].current ||
01278            (curr == table[rc].current &&
01279              msg != table[rc].message && table[rc].message != mAny));
01280         ++rc);
01281 
01282     if(table[rc].current == sInvalid)
01283       throw cms::Exception("BadState")
01284         << "A member function of EventProcessor has been called in an"
01285         << " inappropriate order.\n"
01286         << "Bad transition from " << stateName(curr) << " "
01287         << "using message " << msgName(msg) << "\n"
01288         << "No where to go from here.\n";
01289 
01290     FDEBUG(1) << "changeState: current=" << stateName(curr)
01291               << ", message=" << msgName(msg)
01292               << " -> new=" << stateName(table[rc].final) << "\n";
01293 
01294     state_ = table[rc].final;
01295   }
01296 
01297   void EventProcessor::runAsync() {
01298     beginJob();
01299     {
01300       boost::mutex::scoped_lock sl(stop_lock_);
01301       if(id_set_ == true) {
01302           std::string err("runAsync called while async event loop already running\n");
01303           LogError("FwkJob") << err;
01304           throw cms::Exception("BadState") << err;
01305       }
01306 
01307       changeState(mRunAsync);
01308 
01309       stop_count_ = 0;
01310       last_rc_ = epSuccess; // forget the last value!
01311       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01312       boost::xtime timeout;
01313       boost::xtime_get(&timeout, boost::TIME_UTC);
01314       timeout.sec += 60; // 60 seconds to start!!!!
01315       if(starter_.timed_wait(sl, timeout) == false) {
01316           // yikes - the thread did not start
01317           throw cms::Exception("BadState")
01318             << "Async run thread did not start in 60 seconds\n";
01319       }
01320     }
01321   }
01322 
01323   void EventProcessor::asyncRun(EventProcessor* me) {
01324     // set up signals to allow for interruptions
01325     // ignore all other signals
01326     // make sure no exceptions escape out
01327 
01328     // temporary hack until we modify the input source to allow
01329     // wakeup calls from other threads.  This mimics the solution
01330     // in EventFilter/Processor, which I do not like.
01331     // allowing cancels means that the thread just disappears at
01332     // certain points.  This is bad for C++ stack variables.
01333     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01334     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01335     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01336     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01337 
01338     {
01339       boost::mutex::scoped_lock(me->stop_lock_);
01340       me->event_loop_id_ = pthread_self();
01341       me->id_set_ = true;
01342       me->starter_.notify_all();
01343     }
01344 
01345     Status rc = epException;
01346     FDEBUG(2) << "asyncRun starting ......................\n";
01347 
01348     try {
01349       bool onlineStateTransitions = true;
01350       rc = me->runToCompletion(onlineStateTransitions);
01351     }
01352     catch (cms::Exception& e) {
01353       LogError("FwkJob") << "cms::Exception caught in "
01354                          << "EventProcessor::asyncRun"
01355                          << "\n"
01356                          << e.explainSelf();
01357       me->last_error_text_ = e.explainSelf();
01358     }
01359     catch (std::exception& e) {
01360       LogError("FwkJob") << "Standard library exception caught in "
01361                          << "EventProcessor::asyncRun"
01362                          << "\n"
01363                          << e.what();
01364       me->last_error_text_ = e.what();
01365     }
01366     catch (...) {
01367       LogError("FwkJob") << "Unknown exception caught in "
01368                          << "EventProcessor::asyncRun"
01369                          << "\n";
01370       me->last_error_text_ = "Unknown exception caught";
01371       rc = epOther;
01372     }
01373 
01374     me->last_rc_ = rc;
01375 
01376     {
01377       // notify anyone waiting for exit that we are doing so now
01378       boost::mutex::scoped_lock sl(me->stop_lock_);
01379       ++me->stop_count_;
01380       me->stopper_.notify_all();
01381     }
01382     FDEBUG(2) << "asyncRun ending ......................\n";
01383   }
01384 
01385 
01386   EventProcessor::StatusCode
01387   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01388 
01389     StateSentry toerror(this);
01390 
01391     int numberOfEventsToProcess = -1;
01392     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01393 
01394     if(machine_.get() != 0) {
01395       throw Exception(errors::LogicError)
01396         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01397         << "Please report this error to the Framework group\n";
01398     }
01399 
01400     toerror.succeeded();
01401 
01402     return returnCode;
01403   }
01404 
01405   EventProcessor::StatusCode
01406   EventProcessor::runEventCount(int numberOfEventsToProcess) {
01407 
01408     StateSentry toerror(this);
01409 
01410     bool onlineStateTransitions = false;
01411     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01412 
01413     toerror.succeeded();
01414 
01415     return returnCode;
01416   }
01417 
01418   EventProcessor::StatusCode
01419   EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01420 
01421     // Reusable event principal
01422     boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
01423     principalCache_.insert(ep);
01424 
01425     beginJob(); //make sure this was called
01426 
01427     if(!onlineStateTransitions) changeState(mRunCount);
01428 
01429     StatusCode returnCode = epSuccess;
01430     stateMachineWasInErrorState_ = false;
01431 
01432     // make the services available
01433     ServiceRegistry::Operate operate(serviceToken_);
01434 
01435     if(machine_.get() == 0) {
01436 
01437       statemachine::FileMode fileMode;
01438       if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01439       else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01440       else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01441       else {
01442          throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01443             << fileMode_ << ".\n"
01444             << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01445       }
01446 
01447       statemachine::EmptyRunLumiMode emptyRunLumiMode;
01448       if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01449       else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01450       else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01451       else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01452       else {
01453          throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01454             << emptyRunLumiMode_ << ".\n"
01455             << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01456       }
01457 
01458       machine_.reset(new statemachine::Machine(this,
01459                                                fileMode,
01460                                                emptyRunLumiMode));
01461 
01462       machine_->initiate();
01463     }
01464 
01465     try {
01466 
01467       InputSource::ItemType itemType;
01468 
01469       int iEvents = 0;
01470 
01471       while(true) {
01472 
01473         itemType = input_->nextItemType();
01474 
01475         FDEBUG(1) << "itemType = " << itemType << "\n";
01476 
01477         // These are used for asynchronous running only and
01478         // and are checking to see if stopAsync or shutdownAsync
01479         // were called from another thread.  In the future, we
01480         // may need to do something better than polling the state.
01481         // With the current code this is the simplest thing and
01482         // it should always work.  If the interaction between
01483         // threads becomes more complex this may cause problems.
01484         if(state_ == sStopping) {
01485           FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01486           forceLooperToEnd_ = true;
01487           machine_->process_event(statemachine::Stop());
01488           forceLooperToEnd_ = false;
01489           break;
01490         }
01491         else if(state_ == sShuttingDown) {
01492           FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01493           forceLooperToEnd_ = true;
01494           machine_->process_event(statemachine::Stop());
01495           forceLooperToEnd_ = false;
01496           break;
01497         }
01498 
01499         // Look for a shutdown signal
01500         {
01501           boost::mutex::scoped_lock sl(usr2_lock);
01502           if(shutdown_flag) {
01503             changeState(mShutdownSignal);
01504             returnCode = epSignal;
01505             forceLooperToEnd_ = true;
01506             machine_->process_event(statemachine::Stop());
01507             forceLooperToEnd_ = false;
01508             break;
01509           }
01510         }
01511 
01512         if(itemType == InputSource::IsStop) {
01513           machine_->process_event(statemachine::Stop());
01514         }
01515         else if(itemType == InputSource::IsFile) {
01516           machine_->process_event(statemachine::File());
01517         }
01518         else if(itemType == InputSource::IsRun) {
01519           machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
01520         }
01521         else if(itemType == InputSource::IsLumi) {
01522           machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01523         }
01524         else if(itemType == InputSource::IsEvent) {
01525           machine_->process_event(statemachine::Event());
01526           ++iEvents;
01527           if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01528             returnCode = epCountComplete;
01529             changeState(mInputExhausted);
01530             FDEBUG(1) << "Event count complete, pausing event loop\n";
01531             break;
01532           }
01533         }
01534         // This should be impossible
01535         else {
01536           throw Exception(errors::LogicError)
01537             << "Unknown next item type passed to EventProcessor\n"
01538             << "Please report this error to the Framework group\n";
01539         }
01540 
01541         if(machine_->terminated()) {
01542           changeState(mInputExhausted);
01543           break;
01544         }
01545       }  // End of loop over state machine events
01546     } // Try block
01547 
01548     // Some comments on exception handling related to the boost state machine:
01549     //
01550     // Some states used in the machine are special because they
01551     // perform actions while the machine is being terminated, actions
01552     // such as close files, call endRun, call endLumi etc ...  Each of these
01553     // states has two functions that perform these actions.  The functions
01554     // are almost identical.  The major difference is that one version
01555     // catches all exceptions and the other lets exceptions pass through.
01556     // The destructor catches them and the other function named "exit" lets
01557     // them pass through.  On a normal termination, boost will always call
01558     // "exit" and then the state destructor.  In our state classes, the
01559     // the destructors do nothing if the exit function already took
01560     // care of things.  Here's the interesting part.  When boost is
01561     // handling an exception the "exit" function is not called (a boost
01562     // feature).
01563     //
01564     // If an exception occurs while the boost machine is in control
01565     // (which usually means inside a process_event call), then
01566     // the boost state machine destroys its states and "terminates" itself.
01567     // This already done before we hit the catch blocks below. In this case
01568     // the call to terminateMachine below only destroys an already
01569     // terminated state machine.  Because exit is not called, the state destructors
01570     // handle cleaning up lumis, runs, and files.  The destructors swallow
01571     // all exceptions and only pass through the exceptions messages which
01572     // are tacked onto the original exception below.
01573     //
01574     // If an exception occurs when the boost state machine is not
01575     // in control (outside the process_event functions), then boost
01576     // cannot destroy its own states.  The terminateMachine function
01577     // below takes care of that.  The flag "alreadyHandlingException"
01578     // is set true so that the state exit functions do nothing (and
01579     // cannot throw more exceptions while handling the first).  Then the
01580     // state destructors take care of this because exit did nothing.
01581     //
01582     // In both cases above, the EventProcessor::endOfLoop function is
01583     // not called because it can throw exceptions.
01584     //
01585     // One tricky aspect of the state machine is that things which can
01586     // throw should not be invoked by the state machine while another
01587     // exception is being handled.
01588     // Another tricky aspect is that it appears to be important to
01589     // terminate the state machine before invoking its destructor.
01590     // We've seen crashes which are not understood when that is not
01591     // done.  Maintainers of this code should be careful about this.
01592 
01593     catch (cms::Exception& e) {
01594       alreadyHandlingException_ = true;
01595       terminateMachine();
01596       alreadyHandlingException_ = false;
01597       e << "cms::Exception caught in EventProcessor and rethrown\n";
01598       e << exceptionMessageLumis_;
01599       e << exceptionMessageRuns_;
01600       e << exceptionMessageFiles_;
01601       throw;
01602     }
01603     catch (std::bad_alloc& e) {
01604       alreadyHandlingException_ = true;
01605       terminateMachine();
01606       alreadyHandlingException_ = false;
01607       throw cms::Exception("std::bad_alloc")
01608         << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
01609         << "The job has probably exhausted the virtual memory available to the process.\n"
01610         << exceptionMessageLumis_
01611         << exceptionMessageRuns_
01612         << exceptionMessageFiles_;
01613     }
01614     catch (std::exception& e) {
01615       alreadyHandlingException_ = true;
01616       terminateMachine();
01617       alreadyHandlingException_ = false;
01618       throw cms::Exception("StdException")
01619         << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
01620         << "Previous information:\n" << e.what() << "\n"
01621         << exceptionMessageLumis_
01622         << exceptionMessageRuns_
01623         << exceptionMessageFiles_;
01624     }
01625     catch (...) {
01626       alreadyHandlingException_ = true;
01627       terminateMachine();
01628       alreadyHandlingException_ = false;
01629       throw cms::Exception("Unknown")
01630         << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
01631         << exceptionMessageLumis_
01632         << exceptionMessageRuns_
01633         << exceptionMessageFiles_;
01634     }
01635 
01636     if(machine_->terminated()) {
01637       FDEBUG(1) << "The state machine reports it has been terminated\n";
01638       machine_.reset();
01639     }
01640 
01641     if(!onlineStateTransitions) changeState(mFinished);
01642 
01643     if(stateMachineWasInErrorState_) {
01644       throw cms::Exception("BadState")
01645         << "The boost state machine in the EventProcessor exited after\n"
01646         << "entering the Error state.\n";
01647     }
01648 
01649     return returnCode;
01650   }
01651 
01652   void EventProcessor::readFile() {
01653     FDEBUG(1) << " \treadFile\n";
01654     fb_ = input_->readFile();
01655     if(numberOfForkedChildren_ > 0) {
01656         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01657     }
01658   }
01659 
01660   void EventProcessor::closeInputFile() {
01661     input_->closeFile(fb_);
01662     FDEBUG(1) << "\tcloseInputFile\n";
01663   }
01664 
01665   void EventProcessor::openOutputFiles() {
01666     schedule_->openOutputFiles(*fb_);
01667     FDEBUG(1) << "\topenOutputFiles\n";
01668   }
01669 
01670   void EventProcessor::closeOutputFiles() {
01671     schedule_->closeOutputFiles();
01672     FDEBUG(1) << "\tcloseOutputFiles\n";
01673   }
01674 
01675   void EventProcessor::respondToOpenInputFile() {
01676     schedule_->respondToOpenInputFile(*fb_);
01677     FDEBUG(1) << "\trespondToOpenInputFile\n";
01678   }
01679 
01680   void EventProcessor::respondToCloseInputFile() {
01681     schedule_->respondToCloseInputFile(*fb_);
01682     FDEBUG(1) << "\trespondToCloseInputFile\n";
01683   }
01684 
01685   void EventProcessor::respondToOpenOutputFiles() {
01686     schedule_->respondToOpenOutputFiles(*fb_);
01687     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01688   }
01689 
01690   void EventProcessor::respondToCloseOutputFiles() {
01691     schedule_->respondToCloseOutputFiles(*fb_);
01692     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01693   }
01694 
01695   void EventProcessor::startingNewLoop() {
01696     shouldWeStop_ = false;
01697     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01698     // until after we've called beginOfJob
01699     if(looper_ && looperBeginJobRun_) {
01700       looper_->doStartingNewLoop();
01701     }
01702     FDEBUG(1) << "\tstartingNewLoop\n";
01703   }
01704 
01705   bool EventProcessor::endOfLoop() {
01706     if(looper_) {
01707       ModuleChanger changer(schedule_.get());
01708       looper_->setModuleChanger(&changer);
01709       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01710       looper_->setModuleChanger(0);
01711       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01712       else return false;
01713     }
01714     FDEBUG(1) << "\tendOfLoop\n";
01715     return true;
01716   }
01717 
01718   void EventProcessor::rewindInput() {
01719     input_->repeat();
01720     input_->rewind();
01721     FDEBUG(1) << "\trewind\n";
01722   }
01723 
01724   void EventProcessor::prepareForNextLoop() {
01725     looper_->prepareForNextLoop(esp_.get());
01726     FDEBUG(1) << "\tprepareForNextLoop\n";
01727   }
01728 
01729   bool EventProcessor::shouldWeCloseOutput() const {
01730     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01731     return schedule_->shouldWeCloseOutput();
01732   }
01733 
01734   void EventProcessor::doErrorStuff() {
01735     FDEBUG(1) << "\tdoErrorStuff\n";
01736     LogError("StateMachine")
01737       << "The EventProcessor state machine encountered an unexpected event\n"
01738       << "and went to the error state\n"
01739       << "Will attempt to terminate processing normally\n"
01740       << "(IF using the looper the next loop will be attempted)\n"
01741       << "This likely indicates a bug in an input module or corrupted input or both\n";
01742     stateMachineWasInErrorState_ = true;
01743   }
01744 
01745   void EventProcessor::beginRun(statemachine::Run const& run) {
01746     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01747     input_->doBeginRun(runPrincipal);
01748     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01749                     runPrincipal.beginTime());
01750     if(forceESCacheClearOnNewRun_){
01751       esp_->forceCacheClear();
01752     }
01753     EventSetup const& es = esp_->eventSetupForInstance(ts);
01754     if(looper_ && looperBeginJobRun_== false) {
01755       looper_->copyInfo(ScheduleInfo(schedule_.get()));
01756       looper_->beginOfJob(es);
01757       looperBeginJobRun_ = true;
01758       looper_->doStartingNewLoop();
01759     }
01760     {
01761       typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01762       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01763       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01764     }
01765     FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
01766     if(looper_) {
01767       looper_->doBeginRun(runPrincipal, es);
01768     }
01769   }
01770 
01771   void EventProcessor::endRun(statemachine::Run const& run) {
01772     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01773     input_->doEndRun(runPrincipal);
01774     IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01775                     runPrincipal.endTime());
01776     EventSetup const& es = esp_->eventSetupForInstance(ts);
01777     {
01778       typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
01779       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01780       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01781     }
01782     FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
01783     if(looper_) {
01784       looper_->doEndRun(runPrincipal, es);
01785     }
01786   }
01787 
01788   void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
01789     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01790     input_->doBeginLumi(lumiPrincipal);
01791 
01792     Service<RandomNumberGenerator> rng;
01793     if(rng.isAvailable()) {
01794       LuminosityBlock lb(lumiPrincipal, ModuleDescription());
01795       rng->preBeginLumi(lb);
01796     }
01797 
01798     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
01799     // lumi blocks know their start and end times why not also start and end events?
01800     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
01801     EventSetup const& es = esp_->eventSetupForInstance(ts);
01802     {
01803       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
01804       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01805       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01806     }
01807     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
01808     if(looper_) {
01809       looper_->doBeginLuminosityBlock(lumiPrincipal, es);
01810     }
01811   }
01812 
01813   void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
01814     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01815     input_->doEndLumi(lumiPrincipal);
01816     //NOTE: Using the max event number for the end of a lumi block is a bad idea
01817     // lumi blocks know their start and end times why not also start and end events?
01818     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
01819                     lumiPrincipal.endTime());
01820     EventSetup const& es = esp_->eventSetupForInstance(ts);
01821     {
01822       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
01823       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01824       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01825     }
01826     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01827     if(looper_) {
01828       looper_->doEndLuminosityBlock(lumiPrincipal, es);
01829     }
01830   }
01831 
01832   statemachine::Run EventProcessor::readAndCacheRun() {
01833 
01834     input_->readAndCacheRun();
01835     input_->markRun();
01836     return statemachine::Run(input_->processHistoryID(), input_->run());
01837   }
01838 
01839   int EventProcessor::readAndCacheLumi() {
01840     input_->readAndCacheLumi();
01841     input_->markLumi();
01842     return input_->luminosityBlock();
01843   }
01844 
01845   void EventProcessor::writeRun(statemachine::Run const& run) {
01846     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
01847     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
01848   }
01849 
01850   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
01851     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
01852     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
01853   }
01854 
01855   void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
01856     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
01857     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01858   }
01859 
01860   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
01861     principalCache_.deleteLumi(phid, run, lumi);
01862     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01863   }
01864 
01865   void EventProcessor::readAndProcessEvent() {
01866     EventPrincipal *pep = 0;
01867     try {
01868       pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
01869       FDEBUG(1) << "\treadEvent\n";
01870     }
01871     catch(cms::Exception& e) {
01872       actions::ActionCodes action = act_table_->find(e.rootCause());
01873       if(action == actions::Rethrow) {
01874         throw;
01875       } else {
01876         LogWarning(e.category())
01877           << "an exception occurred and all paths for the event are being skipped: \n"
01878           << e.what();
01879         return;
01880       }
01881     }
01882     assert(pep != 0);
01883 
01884     IOVSyncValue ts(pep->id(), pep->time());
01885     EventSetup const& es = esp_->eventSetupForInstance(ts);
01886     {
01887       typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
01888       ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
01889       schedule_->processOneOccurrence<Traits>(*pep, es);
01890     }
01891 
01892     if(looper_) {
01893       bool randomAccess = input_->randomAccess();
01894       ProcessingController::ForwardState forwardState = input_->forwardState();
01895       ProcessingController::ReverseState reverseState = input_->reverseState();
01896       ProcessingController pc(forwardState, reverseState, randomAccess);
01897 
01898       EDLooperBase::Status status = EDLooperBase::kContinue;
01899       do {
01900         status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
01901 
01902         bool succeeded = true;
01903         if(randomAccess) {
01904           if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
01905             input_->skipEvents(-2);
01906           }
01907           else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
01908             succeeded = input_->goToEvent(pc.specifiedEventTransition());
01909           }
01910         }
01911         pc.setLastOperationSucceeded(succeeded);
01912       } while(!pc.lastOperationSucceeded());
01913       if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
01914 
01915     }
01916 
01917     FDEBUG(1) << "\tprocessEvent\n";
01918     pep->clearEventPrincipal();
01919   }
01920 
01921   bool EventProcessor::shouldWeStop() const {
01922     FDEBUG(1) << "\tshouldWeStop\n";
01923     if(shouldWeStop_) return true;
01924     return schedule_->terminate();
01925   }
01926 
01927   void EventProcessor::setExceptionMessageFiles(std::string& message) {
01928     exceptionMessageFiles_ = message;
01929   }
01930 
01931   void EventProcessor::setExceptionMessageRuns(std::string& message) {
01932     exceptionMessageRuns_ = message;
01933   }
01934 
01935   void EventProcessor::setExceptionMessageLumis(std::string& message) {
01936     exceptionMessageLumis_ = message;
01937   }
01938 
01939   bool EventProcessor::alreadyHandlingException() const {
01940     return alreadyHandlingException_;
01941   }
01942 
01943   void EventProcessor::terminateMachine() {
01944     if(machine_.get() != 0) {
01945       if(!machine_->terminated()) {
01946         forceLooperToEnd_ = true;
01947         machine_->process_event(statemachine::Stop());
01948         forceLooperToEnd_ = false;
01949       }
01950       else {
01951         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
01952       }
01953       if(machine_->terminated()) {
01954         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
01955       }
01956       machine_.reset();
01957     }
01958   }
01959 }