CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00010 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00011 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00012 
00013 #include "FWCore/Framework/interface/CommonParams.h"
00014 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00015 #include "FWCore/Framework/interface/EDLooperBase.h"
00016 #include "FWCore/Framework/interface/EventPrincipal.h"
00017 #include "FWCore/Framework/interface/EventSetupProvider.h"
00018 #include "FWCore/Framework/interface/EventSetupRecord.h"
00019 #include "FWCore/Framework/interface/FileBlock.h"
00020 #include "FWCore/Framework/interface/InputSourceDescription.h"
00021 #include "FWCore/Framework/interface/IOVSyncValue.h"
00022 #include "FWCore/Framework/interface/LooperFactory.h"
00023 #include "FWCore/Framework/interface/LuminosityBlock.h"
00024 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00025 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00026 #include "FWCore/Framework/interface/ModuleChanger.h"
00027 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00028 #include "FWCore/Framework/interface/ProcessingController.h"
00029 #include "FWCore/Framework/interface/RunPrincipal.h"
00030 #include "FWCore/Framework/interface/Schedule.h"
00031 #include "FWCore/Framework/interface/ScheduleInfo.h"
00032 #include "FWCore/Framework/interface/SubProcess.h"
00033 #include "FWCore/Framework/src/Breakpoints.h"
00034 #include "FWCore/Framework/src/EPStates.h"
00035 #include "FWCore/Framework/src/EventSetupsController.h"
00036 #include "FWCore/Framework/src/InputSourceFactory.h"
00037 
00038 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00039 
00040 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00041 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00043 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00045 #include "FWCore/ParameterSet/interface/Registry.h"
00046 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00047 
00048 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00049 #include "FWCore/ServiceRegistry/interface/Service.h"
00050 
00051 #include "FWCore/Utilities/interface/DebugMacros.h"
00052 #include "FWCore/Utilities/interface/EDMException.h"
00053 #include "FWCore/Utilities/interface/GetPassID.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057 
00058 #include "boost/bind.hpp"
00059 #include "boost/thread/xtime.hpp"
00060 
00061 #include <exception>
00062 #include <iomanip>
00063 #include <iostream>
00064 #include <utility>
00065 
00066 #include <sys/ipc.h>
00067 #include <sys/msg.h>
00068 
00069 //Used for forking
00070 #include <sys/types.h>
00071 #include <sys/wait.h>
00072 #include <unistd.h>
00073 
00074 //Used for CPU affinity
00075 #ifndef __APPLE__
00076 #include <sched.h>
00077 #endif
00078 
00079 namespace edm {
00080 
00081   namespace event_processor {
00082 
00083     class StateSentry {
00084     public:
00085       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00086       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00087       void succeeded() {success_ = true;}
00088 
00089     private:
00090       EventProcessor* ep_;
00091       bool success_;
00092     };
00093   }
00094 
00095   using namespace event_processor;
00096 
00097   namespace {
00098     template <typename T>
00099     class ScheduleSignalSentry {
00100     public:
00101       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00102            a_(a), principal_(principal), es_(es) {
00103         if (a_) T::preScheduleSignal(a_, principal_);
00104       }
00105       ~ScheduleSignalSentry() {
00106         if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00107       }
00108 
00109     private:
00110       // We own none of these resources.
00111       ActivityRegistry* a_;
00112       typename T::MyPrincipal* principal_;
00113       EventSetup const* es_;
00114     };
00115   }
00116 
namespace {

  // Printable names for the State and Msg enums declared in the
  // EventProcessor header. Both tables must be kept in sync with those enums:
  // the entries are presumably looked up by enumerator value, so their order
  // has to match the enum declaration order exactly.

  char const* stateNames[] = {
    "Init",     "JobReady",     "RunGiven", "Running",
    "Stopping", "ShuttingDown", "Done",     "JobEnded",
    "Error",    "ErrorEnded",   "End",      "Invalid"
  };

  char const* msgNames[] = {
    "SetRun",         "Skip",           "RunAsync",
    "Run(ID)",        "Run(count)",     "BeginJob",
    "StopAsync",      "ShutdownAsync",  "EndJob",
    "CountComplete",  "InputExhausted", "StopSignal",
    "ShutdownSignal", "Finished",       "Any",
    "dtor",           "Exception",      "Rewind"
  };
}
00158     // IMPORTANT NOTE:
00159     // the mAny messages are special, they must appear last in the
00160     // table if multiple entries for a CurrentState are present.
00161     // the changeState function does not use the mAny yet!!!
00162 
00163     struct TransEntry {
00164       State current;
00165       Msg   message;
00166       State final;
00167     };
00168 
00169     // we should use this information to initialize a two dimensional
00170     // table of t[CurrentState][Message] = FinalState
00171 
00172     /*
00173       The way this is currently written, the async run thread function
00174       can return in the "JobReady" state - but not yet cleaned up.  The
00175       problem is that the thread is only cleaned up when stop/shutdown
00176       async is called. But the stop/shutdown async functions first
00177       attempt to change the state using messages that are not valid in
00178       the "JobReady" state.
00179 
00180       I think most of the problems can be solved by using two states
00181       for "running": RunningS and RunningA (sync/async). The problem
00182       seems to be all the transitions out of running for both
00183       modes of operation.  The other solution might be to only go to
00184       "Stopping" from Running, and use the return code from "run_p" to
00185       set the final state.  If this is used, then in the sync mode the
00186       "Stopping" state would be momentary.
00187 
00188      */
00189 
          // State-transition table, presumably consulted by changeState: each row
          // reads "in state <current>, message <message> moves to <final>".
          // Row order matters for the mAny rows, so do not re-sort this table.
          // NOTE(review): 'table' is a non-const global with external linkage in
          // namespace edm; it could likely be made const and file-local --
          // confirm nothing outside this file refers to it.
00190     TransEntry table[] = {
00191     // CurrentState   Message         FinalState
00192     // -----------------------------------------
00193       { sInit,          mException,      sError },
00194       { sInit,          mBeginJob,       sJobReady },
00195       { sJobReady,      mException,      sError },
00196       { sJobReady,      mSetRun,         sRunGiven },
00197       { sJobReady,      mInputRewind,    sRunning },
00198       { sJobReady,      mSkip,           sRunning },
00199       { sJobReady,      mRunID,          sRunning },
00200       { sJobReady,      mRunCount,       sRunning },
00201       { sJobReady,      mEndJob,         sJobEnded },
00202       { sJobReady,      mBeginJob,       sJobReady },
00203       { sJobReady,      mDtor,           sEnd },    // should this be allowed?
00204 
00205       { sJobReady,      mStopAsync,      sJobReady },
00206       { sJobReady,      mCountComplete,  sJobReady },
00207       { sJobReady,      mFinished,       sJobReady },
00208 
00209       { sRunGiven,      mException,      sError },
00210       { sRunGiven,      mRunAsync,       sRunning },
00211       { sRunGiven,      mBeginJob,       sRunGiven },
00212       { sRunGiven,      mShutdownAsync,  sShuttingDown },
00213       { sRunGiven,      mStopAsync,      sStopping },
00214       { sRunning,       mException,      sError },
00215       { sRunning,       mStopAsync,      sStopping },
00216       { sRunning,       mShutdownAsync,  sShuttingDown },
00217       { sRunning,       mShutdownSignal, sShuttingDown },
00218       { sRunning,       mCountComplete,  sStopping }, // sJobReady
00219       { sRunning,       mInputExhausted, sStopping }, // sJobReady
00220 
00221       { sStopping,      mInputRewind,    sRunning }, // The looper needs this
00222       { sStopping,      mException,      sError },
00223       { sStopping,      mFinished,       sJobReady },
00224       { sStopping,      mCountComplete,  sJobReady },
00225       { sStopping,      mShutdownSignal, sShuttingDown },
00226       { sStopping,      mStopAsync,      sStopping },     // stay
00227       { sStopping,      mInputExhausted, sStopping },     // stay
00228       //{ sStopping,      mAny,            sJobReady },     // <- ??????
00229       { sShuttingDown,  mException,      sError },
00230       { sShuttingDown,  mShutdownSignal, sShuttingDown },
00231       { sShuttingDown,  mCountComplete,  sDone }, // needed?
00232       { sShuttingDown,  mInputExhausted, sDone }, // needed?
00233       { sShuttingDown,  mFinished,       sDone },
00234       //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
00235       //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
00236       //{ sShuttingDown,  mAny,            sDone },         // <- ??????
00237       { sDone,          mEndJob,         sJobEnded },
00238       { sDone,          mException,      sError },
00239       { sJobEnded,      mDtor,           sEnd },
00240       { sJobEnded,      mException,      sError },
00241       { sError,         mEndJob,         sError },   // funny one here: accepted, but stays in sError
00242       { sError,         mDtor,           sError },   // funny one here: accepted, but stays in sError
00243       { sInit,          mDtor,           sEnd },     // for StorM dummy EP
00244       { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
00245       { sInvalid,       mAny,            sInvalid }  // last row; presumably an end-of-table sentinel
00246     };
00247 
00248 
00249     // Note: many of the messages generate the mBeginJob message first,
00250     //  e.g. mRunID, mRunCount, mSetRun.
00251 
00252   // ---------------------------------------------------------------
00253   boost::shared_ptr<InputSource>
00254   makeInput(ParameterSet& params,
00255             CommonParams const& common,
00256             ProductRegistry& preg,
00257             PrincipalCache& pCache,
00258             boost::shared_ptr<ActivityRegistry> areg,
00259             boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00260     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00261     if(main_input == 0) {
00262       throw Exception(errors::Configuration, "FailedInputSource")
00263         << "Configuration of primary input source has failed\n";
00264     }
00265 
00266     std::string modtype;
00267     try {
00268       modtype = main_input->getParameter<std::string>("@module_type");
00269       std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00270         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00271       ConfigurationDescriptions descriptions(filler->baseType());
00272       filler->fill(descriptions);
00273       descriptions.validate(*main_input, std::string("source"));
00274     }
00275     catch (cms::Exception& iException) {
00276       Exception toThrow(errors::Configuration, "Failed validating primary input source configuration.");
00277       toThrow << "\nSource plugin name is \"" << modtype << "\"\n";
00278       toThrow.append(iException);
00279       throw toThrow;
00280     }
00281 
00282     main_input->registerIt();
00283 
00284     // Fill in "ModuleDescription", in case the input source produces
00285     // any EDproducts, which would be registered in the ProductRegistry.
00286     // Also fill in the process history item for this process.
00287     // There is no module label for the unnamed input source, so
00288     // just use "source".
00289     // Only the tracked parameters belong in the process configuration.
00290     ModuleDescription md(main_input->id(),
00291                          main_input->getParameter<std::string>("@module_type"),
00292                          "source",
00293                          processConfiguration);
00294 
00295     InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00296     areg->preSourceConstructionSignal_(md);
00297     boost::shared_ptr<InputSource> input;
00298     try {
00299       input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00300       areg->postSourceConstructionSignal_(md);
00301     }
00302     catch (Exception& iException) {
00303       areg->postSourceConstructionSignal_(md);
00304       //we want to keep the same category code so that cmsRun will return the proper return value
00305       Exception toThrow(iException.categoryCode(), "Error occured while constructing primary input source.");
00306       toThrow << "\nSource is of type \"" << modtype << "\"\n";
00307       toThrow.append(iException);
00308       throw toThrow;
00309     }
00310     catch (cms::Exception& iException) {
00311       areg->postSourceConstructionSignal_(md);
00312       Exception toThrow(errors::Configuration, "Error occured while constructing primary input source.");
00313       toThrow << "\nSource is of type \"" << modtype << "\"\n";
00314       toThrow.append(iException);
00315       throw toThrow;
00316     }
00317 
00318     return input;
00319   }
00320 
00321   // ---------------------------------------------------------------
00322   boost::shared_ptr<EDLooperBase>
00323   fillLooper(eventsetup::EventSetupProvider& cp,
00324                          ParameterSet& params,
00325                          CommonParams const& common) {
00326     boost::shared_ptr<EDLooperBase> vLooper;
00327 
00328     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00329 
00330     if(loopers.size() == 0) {
00331        return vLooper;
00332     }
00333 
00334     assert(1 == loopers.size());
00335 
00336     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00337         itName != itNameEnd;
00338         ++itName) {
00339 
00340       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00341       providerPSet->registerIt();
00342       vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00343                                                         *providerPSet,
00344                                                         common.processName_,
00345                                                         common.releaseVersion_,
00346                                                         common.passID_);
00347       }
00348       return vLooper;
00349 
00350   }
00351 
00352   // ---------------------------------------------------------------
00353   EventProcessor::EventProcessor(std::string const& config,
00354                                 ServiceToken const& iToken,
00355                                 serviceregistry::ServiceLegacy iLegacy,
00356                                 std::vector<std::string> const& defaultServices,
00357                                 std::vector<std::string> const& forcedServices) :
00358     preProcessEventSignal_(),
00359     postProcessEventSignal_(),
00360     actReg_(),
00361     preg_(),
00362     serviceToken_(),
00363     input_(),
00364     esp_(),
00365     act_table_(),
00366     processConfiguration_(),
00367     schedule_(),
00368     subProcess_(),
00369     state_(sInit),
00370     event_loop_(),
00371     state_lock_(),
00372     stop_lock_(),
00373     stopper_(),
00374     starter_(),
00375     stop_count_(-1),
00376     last_rc_(epSuccess),
00377     last_error_text_(),
00378     id_set_(false),
00379     event_loop_id_(),
00380     my_sig_num_(getSigNum()),
00381     fb_(),
00382     looper_(),
00383     machine_(),
00384     principalCache_(),
00385     shouldWeStop_(false),
00386     stateMachineWasInErrorState_(false),
00387     fileMode_(),
00388     emptyRunLumiMode_(),
00389     exceptionMessageFiles_(),
00390     exceptionMessageRuns_(),
00391     exceptionMessageLumis_(),
00392     alreadyHandlingException_(false),
00393     forceLooperToEnd_(false),
00394     looperBeginJobRun_(false),
00395     forceESCacheClearOnNewRun_(false),
00396     numberOfForkedChildren_(0),
00397     numberOfSequentialEventsPerChild_(1),
00398     setCpuAffinity_(false),
00399     eventSetupDataToExcludeFromPrefetching_() {
00400     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00401     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00402     processDesc->addServices(defaultServices, forcedServices);
00403     init(processDesc, iToken, iLegacy);
00404   }
00405 
00406   EventProcessor::EventProcessor(std::string const& config,
00407                                 std::vector<std::string> const& defaultServices,
00408                                 std::vector<std::string> const& forcedServices) :
00409     preProcessEventSignal_(),
00410     postProcessEventSignal_(),
00411     actReg_(),
00412     preg_(),
00413     serviceToken_(),
00414     input_(),
00415     esp_(),
00416     act_table_(),
00417     processConfiguration_(),
00418     schedule_(),
00419     subProcess_(),
00420     state_(sInit),
00421     event_loop_(),
00422     state_lock_(),
00423     stop_lock_(),
00424     stopper_(),
00425     starter_(),
00426     stop_count_(-1),
00427     last_rc_(epSuccess),
00428     last_error_text_(),
00429     id_set_(false),
00430     event_loop_id_(),
00431     my_sig_num_(getSigNum()),
00432     fb_(),
00433     looper_(),
00434     machine_(),
00435     principalCache_(),
00436     shouldWeStop_(false),
00437     stateMachineWasInErrorState_(false),
00438     fileMode_(),
00439     emptyRunLumiMode_(),
00440     exceptionMessageFiles_(),
00441     exceptionMessageRuns_(),
00442     exceptionMessageLumis_(),
00443     alreadyHandlingException_(false),
00444     forceLooperToEnd_(false),
00445     looperBeginJobRun_(false),
00446     forceESCacheClearOnNewRun_(false),
00447     numberOfForkedChildren_(0),
00448     numberOfSequentialEventsPerChild_(1),
00449     setCpuAffinity_(false),
00450     eventSetupDataToExcludeFromPrefetching_() {
00451     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00452     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00453     processDesc->addServices(defaultServices, forcedServices);
00454     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00455   }
00456 
// Construct from an already-built ProcessDesc.
//
// Every member starts from the same defaults used by the other constructors
// (state_ = sInit, stop_count_ = -1, last_rc_ = epSuccess, ...). All real
// set-up is delegated to init(), which receives processDesc, token and
// legacy unchanged.
// NOTE(review): this initializer list is duplicated verbatim in every
// constructor; keep the copies in sync with each other and with the
// declaration order in the header.
00457   EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00458                  ServiceToken const& token,
00459                  serviceregistry::ServiceLegacy legacy) :
00460     preProcessEventSignal_(),
00461     postProcessEventSignal_(),
00462     actReg_(),
00463     preg_(),
00464     serviceToken_(),
00465     input_(),
00466     esp_(),
00467     act_table_(),
00468     processConfiguration_(),
00469     schedule_(),
00470     subProcess_(),
00471     state_(sInit),
00472     event_loop_(),
00473     state_lock_(),
00474     stop_lock_(),
00475     stopper_(),
00476     starter_(),
00477     stop_count_(-1),
00478     last_rc_(epSuccess),
00479     last_error_text_(),
00480     id_set_(false),
00481     event_loop_id_(),
00482     my_sig_num_(getSigNum()),
00483     fb_(),
00484     looper_(),
00485     machine_(),
00486     principalCache_(),
00487     shouldWeStop_(false),
00488     stateMachineWasInErrorState_(false),
00489     fileMode_(),
00490     emptyRunLumiMode_(),
00491     exceptionMessageFiles_(),
00492     exceptionMessageRuns_(),
00493     exceptionMessageLumis_(),
00494     alreadyHandlingException_(false),
00495     forceLooperToEnd_(false),
00496     looperBeginJobRun_(false),
00497     forceESCacheClearOnNewRun_(false),
00498     numberOfForkedChildren_(0),
00499     numberOfSequentialEventsPerChild_(1),
00500     setCpuAffinity_(false),
00501     eventSetupDataToExcludeFromPrefetching_() {
00502     init(processDesc, token, legacy);
00503   }
00504 
00505 
00506   EventProcessor::EventProcessor(std::string const& config, bool isPython):
00507     preProcessEventSignal_(),
00508     postProcessEventSignal_(),
00509     actReg_(),
00510     preg_(),
00511     serviceToken_(),
00512     input_(),
00513     esp_(),
00514     act_table_(),
00515     processConfiguration_(),
00516     schedule_(),
00517     subProcess_(),
00518     state_(sInit),
00519     event_loop_(),
00520     state_lock_(),
00521     stop_lock_(),
00522     stopper_(),
00523     starter_(),
00524     stop_count_(-1),
00525     last_rc_(epSuccess),
00526     last_error_text_(),
00527     id_set_(false),
00528     event_loop_id_(),
00529     my_sig_num_(getSigNum()),
00530     fb_(),
00531     looper_(),
00532     machine_(),
00533     principalCache_(),
00534     shouldWeStop_(false),
00535     stateMachineWasInErrorState_(false),
00536     fileMode_(),
00537     emptyRunLumiMode_(),
00538     exceptionMessageFiles_(),
00539     exceptionMessageRuns_(),
00540     exceptionMessageLumis_(),
00541     alreadyHandlingException_(false),
00542     forceLooperToEnd_(false),
00543     looperBeginJobRun_(false),
00544     forceESCacheClearOnNewRun_(false),
00545     numberOfForkedChildren_(0),
00546     numberOfSequentialEventsPerChild_(1),
00547     setCpuAffinity_(false),
00548     eventSetupDataToExcludeFromPrefetching_() {
00549     if(isPython) {
00550       boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00551       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00552       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00553     }
00554     else {
00555       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00556       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00557     }
00558   }
00559 
// Second-phase construction shared by all constructors.
//
// In order, this function:
//  - clears the BranchID list registries;
//  - splits off any subprocess ParameterSet;
//  - reads the untracked "options" settings;
//  - creates the services and makes them current for the rest of this
//    function via ServiceRegistry::Operate;
//  - builds the miscellaneous common parameters, the EventSetup provider,
//    the optional looper, the primary input source and the Schedule;
//  - copies the shared ScheduleItems members into this object, connects
//    signals and, if configured, builds the SubProcess.
// Statement order matters: later steps use objects created by earlier ones.
00560   void
00561   EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00562                         ServiceToken const& iToken,
00563                         serviceregistry::ServiceLegacy iLegacy) {
00564 
00565     //std::cerr << processDesc->dump() << std::endl;
00566     // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
00567     //  They must be cleared here because some processes run multiple EventProcessors in succession.
00568     BranchIDListHelper::clearRegistries();
00569 
00570     boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00571     //std::cerr << parameterSet->dump() << std::endl;
00572 
00573     // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
00574     boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00575 
00576     // Now set some parameters specific to the main process.
          // NOTE(review): optionsPset, forking and excluded below are const references
          // bound to the getters' return values. This is only safe if those getters
          // return by value (temporary lifetime extension) -- confirm in ParameterSet.h.
00577     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00578     fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00579     emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00580     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
          // Forking settings live in options.multiProcesses (defaults: 0 children,
          // 1 sequential event per child, no CPU affinity).
00581     ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00582     numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00583     numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00584     setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
          // Group the excluded EventSetup data by record name: each entry adds a
          // (type, label) pair, with type defaulting to "*" and label to "".
00585     std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00586     for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00587         itPS != itPSEnd;
00588         ++itPS) {
00589       eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00590                                                 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00591                                                                itPS->getUntrackedParameter<std::string>("label", "")));
00592     }
00593 
          // options.throwIfIllegalParameter defaults to true.
00594     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00595 
          // Lives until init returns; used below to build esp_ and, if present, the SubProcess.
00596     std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00597 
00598     // Now do general initialization
00599     ScheduleItems items;
00600 
00601     //initialize the services
00602     boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00603     ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
          // serviceToken_ is 'token' extended by addCPRandTNS (by name, presumably the
          // ConstProductRegistry and TriggerNamesService -- confirm).
00604     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00605 
00606     //make the services available
00607     ServiceRegistry::Operate operate(serviceToken_);
00608 
00609     // initialize miscellaneous items
00610     boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00611 
00612     // initialize the event setup provider
00613     esp_ = espController->makeProvider(*parameterSet, *common);
00614 
00615     // initialize the looper, if any
00616     looper_ = fillLooper(*esp_, *parameterSet, *common);
00617     if(looper_) {
00618       looper_->setActionTable(items.act_table_.get());
00619       looper_->attachTo(*items.actReg_);
00620     }
00621 
00622     // initialize the input source
00623     input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00624 
00625     // initialize the Schedule
00626     schedule_ = items.initSchedule(*parameterSet);
00627 
00628     // set the data members
00629     act_table_ = items.act_table_;
00630     actReg_ = items.actReg_;
00631     preg_ = items.preg_;
00632     processConfiguration_ = items.processConfiguration_;
00633 
          // NOTE(review): this streams the shared_ptr itself, so it likely prints an
          // address rather than the ParameterSet contents.
00634     FDEBUG(2) << parameterSet << std::endl;
00635     connectSigs(this);
00636 
00637     // initialize the subprocess, if there is one
          // Note: the SubProcess receives 'token' (from initServices), not serviceToken_.
00638     if(subProcessParameterSet) {
00639       subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController, *actReg_, token, serviceregistry::kConfigurationOverrides));
00640     }
00641   }
00642 
// Destructor: tears the processor down while its services are still usable.
//
// It makes the services current (Operate over a copy of the service token)
// and makes sure the state machine is terminated. It then sends mDtor,
// logging rather than propagating any cms::Exception. Next it resets the
// members that may need services while being destroyed. Finally it clears
// several process-wide registries, because some processes run several
// EventProcessors in succession.
00643   EventProcessor::~EventProcessor() {
00644     // Make the services available while everything is being deleted.
00645     ServiceToken token = getToken();
00646     ServiceRegistry::Operate op(token);
00647 
00648     // The state machine should have already been cleaned up
00649     // and destroyed at this point by a call to EndJob or
00650     // earlier when it completed processing events, but if it
00651     // has not been we'll take care of it here at the last moment.
00652     // This could cause problems if we are already handling an
00653     // exception and another one is thrown here ...  For a critical
00654     // executable the solution to this problem is for the code using
00655     // the EventProcessor to explicitly call EndJob or use runToCompletion,
00656     // then the next line of code is never executed.
          // NOTE(review): unlike changeState(mDtor) below, this call is not wrapped in
          // try/catch, so an exception from it leaves the destructor (std::terminate
          // if this destructor runs during stack unwinding).
00657     terminateMachine();
00658 
00659     try {
00660       changeState(mDtor);
00661     }
00662     catch(cms::Exception& e) {
00663       LogError("System")
00664         << e.explainSelf() << "\n";
00665     }
00666 
00667     // manually destroy all these things that may need the services around
          // (done here, while 'op' still keeps the services available)
00668     subProcess_.reset();
00669     esp_.reset();
00670     schedule_.reset();
00671     input_.reset();
00672     looper_.reset();
00673     actReg_.reset();
00674 
          // Reset process-wide registries so a later EventProcessor starts clean.
00675     pset::Registry* psetRegistry = pset::Registry::instance();
00676     psetRegistry->data().clear();
00677     psetRegistry->extra().setID(ParameterSetID());
00678 
00679     EntryDescriptionRegistry::instance()->data().clear();
00680     ParentageRegistry::instance()->data().clear();
00681     ProcessConfigurationRegistry::instance()->data().clear();
00682     ProcessHistoryRegistry::instance()->data().clear();
00683     BranchIDListHelper::clearRegistries();
00684   }
00685 
00686   void
00687   EventProcessor::rewind() {
00688     beginJob(); //make sure this was called
00689     changeState(mStopAsync);
00690     changeState(mInputRewind);
00691     {
00692       StateSentry toerror(this);
00693 
00694       //make the services available
00695       ServiceRegistry::Operate operate(serviceToken_);
00696 
00697       {
00698         input_->repeat();
00699         input_->rewind();
00700       }
00701       changeState(mCountComplete);
00702       toerror.succeeded();
00703     }
00704     changeState(mFinished);
00705   }
00706 
00707   EventProcessor::StatusCode
00708   EventProcessor::run(int numberEventsToProcess, bool) {
00709     return runEventCount(numberEventsToProcess);
00710   }
00711 
00712   EventProcessor::StatusCode
00713   EventProcessor::skip(int numberToSkip) {
00714     beginJob(); //make sure this was called
00715     changeState(mSkip);
00716     {
00717       StateSentry toerror(this);
00718 
00719       //make the services available
00720       ServiceRegistry::Operate operate(serviceToken_);
00721 
00722       {
00723         input_->skipEvents(numberToSkip);
00724       }
00725       changeState(mCountComplete);
00726       toerror.succeeded();
00727     }
00728     changeState(mFinished);
00729     return epSuccess;
00730   }
00731 
00732   void
00733   EventProcessor::beginJob() {
00734     if(state_ != sInit) return;
00735     bk::beginJob();
00736     // can only be run if in the initial state
00737     changeState(mBeginJob);
00738 
00739     // StateSentry toerror(this); // should we add this ?
00740     //make the services available
00741     ServiceRegistry::Operate operate(serviceToken_);
00742 
00743     //NOTE:  This implementation assumes 'Job' means one call
00744     // the EventProcessor::run
00745     // If it really means once per 'application' then this code will
00746     // have to be changed.
00747     // Also have to deal with case where have 'run' then new Module
00748     // added and do 'run'
00749     // again.  In that case the newly added Module needs its 'beginJob'
00750     // to be called.
00751 
00752     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
00753     //  For now we delay calling beginOfJob until first beginOfRun
00754     //if(looper_) {
00755     //   looper_->beginOfJob(es);
00756     //}
00757     try {
00758       input_->doBeginJob();
00759     } catch(cms::Exception& e) {
00760       LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00761       e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00762       throw;
00763     } catch(std::exception& e) {
00764       LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
00765       throw;
00766     } catch(...) {
00767       LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
00768       throw;
00769     }
00770 
00771     schedule_->beginJob();
00772     // toerror.succeeded(); // should we add this?
00773     if(hasSubProcess()) subProcess_->doBeginJob();
00774     actReg_->postBeginJobSignal_();
00775   }
00776 
00777   void
00778   EventProcessor::endJob() {
00779     // Collects exceptions, so we don't throw before all operations are performed.
00780     ExceptionCollector c;
00781 
00782     // only allowed to run if state is sIdle, sJobReady, sRunGiven
00783     c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00784 
00785     //make the services available
00786     ServiceRegistry::Operate operate(serviceToken_);
00787 
00788     c.call(boost::bind(&EventProcessor::terminateMachine, this));
00789     c.call(boost::bind(&Schedule::endJob, schedule_.get()));
00790     if(hasSubProcess()) {
00791       c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00792     }
00793     c.call(boost::bind(&InputSource::doEndJob, input_));
00794     if(looper_) {
00795       c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00796     }
00797     c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00798     if(c.hasThrown()) {
00799       c.rethrow();
00800     }
00801   }
00802 
00803   ServiceToken
00804   EventProcessor::getToken() {
00805     return serviceToken_;
00806   }
00807 
00808   //Setup signal handler to listen for when forked children stop
00809   namespace {
00810     volatile bool child_failed = false;
00811     volatile unsigned int num_children_done = 0;
00812     volatile int child_fail_exit_status = 0;
00813 
00814     extern "C" {
00815       void ep_sigchld(int, siginfo_t*, void*) {
00816         //printf("in sigchld\n");
00817         //FDEBUG(1) << "in sigchld handler\n";
00818         int stat_loc;
00819         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00820         while(0<p) {
00821           //printf("  looping\n");
00822           if(WIFEXITED(stat_loc)) {
00823             ++num_children_done;
00824             if(0 != WEXITSTATUS(stat_loc)) {
00825               child_failed = true;
00826               child_fail_exit_status = WEXITSTATUS(stat_loc);
00827             }
00828           }
00829           if(WIFSIGNALED(stat_loc)) {
00830             ++num_children_done;
00831             child_failed = true;
00832           }
00833           p = waitpid(-1, &stat_loc, WNOHANG);
00834         }
00835       }
00836     }
00837 
00838   }
00839 
00840   enum {
00841     kChildSucceed,
00842     kChildExitBadly,
00843     kChildSegv,
00844     kMaxChildAction
00845   };
00846 
00847   namespace {
00848     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00849       unsigned int n = 0;
00850       while(numberOfChildren != 0) {
00851         ++n;
00852         numberOfChildren /= 10;
00853       }
00854       if(n == 0) {
00855         n = 3; // Protect against zero numberOfChildren
00856       }
00857       return n;
00858     }
00859 
00860     class MessageSenderToSource {
00861     public:
00862       MessageSenderToSource(int iQueueID, long iNEventsToProcess):
00863       m_queueID(iQueueID),
00864       m_nEventsToProcess(iNEventsToProcess) {}
00865       void operator()() {
00866         std::cout << "I am controller" << std::endl;
00867         //this is the first child and therefore the controller
00868 
00869         multicore::MessageForSource sndmsg;
00870         sndmsg.startIndex = 0;
00871         sndmsg.nIndices = m_nEventsToProcess;
00872         if(m_nEventsToProcess > 0 && m_nEventsToProcess < m_nEventsToProcess) {
00873           sndmsg.nIndices = m_nEventsToProcess;
00874         }
00875         int value = 0;
00876         do {
00877           value = msgsnd(m_queueID, &sndmsg, multicore::MessageForSource::sizeForBuffer(), 0);
00878           //std::cerr << "worker asked for work" << std::endl;
00879           sndmsg.startIndex += sndmsg.nIndices;
00880         } while(value == 0);
00881         if(errno != EIDRM and errno != EINVAL) {
00882           //NOTE: on Linux we sometimes get EINVAL after the queue was removed rather than EIDRM
00883           perror("message queue send failed");
00884         }
00885         return;
00886       }
00887 
00888     private:
00889       int const m_queueID;
00890       long const m_nEventsToProcess;
00891     };
00892   }
00893 
  // Optionally split the job over numberOfForkedChildren_ worker processes.
  //
  // Returns true immediately when no forking is configured.  Otherwise the
  // function:
  //  1. runs beginJob and reads the first file;
  //  2. prefetches EventSetup data for the first run, skipping whatever is
  //     listed in eventSetupDataToExcludeFromPrefetching_;
  //  3. releases resources and forks the children.
  //
  // In each child it redirects stdout/stderr to a per-child log file,
  // optionally pins the child to a CPU, reacquires resources and returns
  // true.
  //
  // In the parent (master) it feeds event ranges to the children through a
  // SysV message queue from a helper thread.  It then waits (via SIGCHLD)
  // until all children finish, a child fails, or a shutdown is requested.
  // Finally it removes the queue and returns false, or throws
  // cms::Exception("ForkedChildFailed") if a child failed.
  //
  // jobReportFile: passed through to the JobReport service's
  // parentBeforeFork / childAfterFork / parentAfterFork hooks.
  bool
  EventProcessor::forkProcess(std::string const& jobReportFile) {

    if(0 == numberOfForkedChildren_) {return true;}
    assert(0<numberOfForkedChildren_);
    //do what we want done in common
    {
      beginJob(); //make sure this was run
      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);

      // The input is required to start with a file followed by a run.
      // NOTE(review): these are only assert()s, so under NDEBUG nothing
      // checks the item types before readFile()/runAuxiliary() are used.
      InputSource::ItemType itemType;
      itemType = input_->nextItemType();

      assert(itemType == InputSource::IsFile);
      {
        readFile();
      }
      itemType = input_->nextItemType();
      assert(itemType == InputSource::IsRun);

      std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
      IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
                      input_->runAuxiliary()->beginTime());
      EventSetup const& es = esp_->eventSetupForInstance(ts);

      //now get all the data available in the EventSetup
      // (done before forking so that every child starts from the parent's
      // already-fetched EventSetup data)
      std::vector<eventsetup::EventSetupRecordKey> recordKeys;
      es.fillAvailableRecordKeys(recordKeys);
      std::vector<eventsetup::DataKey> dataKeys;
      for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
          itKey != itEnd;
          ++itKey) {
        eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
        //see if this is on our exclusion list
        ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
        ExcludedData const* excludedData(0);
        if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
          excludedData = &(itExcludeRec->second);
          // An empty exclusion entry, or one whose first key is "*", excludes the whole record.
          if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
            //skip all items in this record
            continue;
          }
        }
        if(0 != recordPtr) {
          dataKeys.clear();
          recordPtr->fillRegisteredDataKeys(dataKeys);
          for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
              itDataKey != itDataKeyEnd;
              ++itDataKey) {
            //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
            // Individual (type name, label) pairs can be excluded as well.
            if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
              std::cout << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
              continue;
            }
            // Prefetch failures are only warnings: the data may still be fetched later on demand.
            try {
              recordPtr->doGet(*itDataKey);
            } catch(cms::Exception& e) {
             LogWarning("EventSetupPreFetching") << e.what();
            }
          }
        }
      }
    }
    std::cout << "  done prefetching" << std::endl;
{
    // make the services available
    ServiceRegistry::Operate operate(serviceToken_);
    Service<JobReport> jobReport;
    jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);

    //Now actually do the forking
    actReg_->preForkReleaseResourcesSignal_();
    input_->doPreForkReleaseResources();
    schedule_->preForkReleaseResources();
}
    // ep_sigchld counts terminated children into num_children_done and
    // records failures in child_failed / child_fail_exit_status.
    installCustomHandler(SIGCHLD, ep_sigchld);

    //Create a message queue used to set what events each child processes
    int queueID;
    if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
      printf("Error obtaining message queue\n");
      exit(EXIT_FAILURE);
    }

    unsigned int childIndex = 0;
    unsigned int const kMaxChildren = numberOfForkedChildren_;
    unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
    std::vector<pid_t> childrenIds;
    childrenIds.reserve(kMaxChildren);
{
    // make the services available
    ServiceRegistry::Operate operate(serviceToken_);
    Service<JobReport> jobReport;
    // After this loop, childIndex < kMaxChildren only in a child: the child
    // breaks out with its own index, while the parent runs to completion.
    for(; childIndex < kMaxChildren; ++childIndex) {
      pid_t value = fork();
      if(value == 0) {
        // this is the child process, redirect stdout and stderr to a log file
        fflush(stdout);
        fflush(stderr);
        std::stringstream stout;
        stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
        if(0 == freopen(stout.str().c_str(), "w", stdout)) {
          std::cerr << "Error during freopen of child process "
          << childIndex << std::endl;
        }
        if(dup2(fileno(stdout), fileno(stderr)) < 0) {
          std::cerr << "Error during dup2 of child process"
          << childIndex << std::endl;
        }

        std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
        if(setCpuAffinity_) {
          // CPU affinity is handled differently on macosx.
          // We disable it and print a message until someone reads:
          //
          // https://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
          //
          // and implements it.
#ifdef __APPLE__
          std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
#else
          // Child i is pinned to CPU i.  NOTE(review): with more children
          // than usable CPUs, sched_setaffinity fails and the child exits
          // with -1 (and CPU_SET is undefined for indices >= CPU_SETSIZE).
          std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
          cpu_set_t mask;
          CPU_ZERO(&mask);
          CPU_SET(childIndex, &mask);
          if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
            std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
            exit(-1);
          }
#endif
        }
        break;
      }
      if(value < 0) {
        std::cerr << "failed to create a child" << std::endl;
        //message queue is a system resource so must be cleaned up
        // before parent goes away
        // NOTE(review): children forked in earlier iterations are not
        // signalled before the parent exits here.
        msgctl(queueID, IPC_RMID, 0);
        exit(-1);
      }
      childrenIds.push_back(value);
    }

    if(childIndex < kMaxChildren) {
      // Child process: reacquire resources and let the caller process events.
      jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
      actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);

      boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
      input_->doPostForkReacquireResources(receiver);
      schedule_->postForkReacquireResources(childIndex, kMaxChildren);
      //NOTE: sources have to reset themselves by listening to the post fork message
      //rewindInput();
      return true;
    }
    jobReport->parentAfterFork(jobReportFile);
}

    //this is the original, which is now the master for all the children

    //Need to wait for signals from the children or externally
    // To wait we must
    // 1) block the signals we want to wait on so we do not have a race condition
    // 2) check that we haven't already meet our ending criteria
    // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
    sigset_t blockingSigSet;
    sigset_t unblockingSigSet;
    sigset_t oldSigSet;
    pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
    pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
    sigaddset(&blockingSigSet, SIGCHLD);
    sigaddset(&blockingSigSet, SIGUSR2);
    sigaddset(&blockingSigSet, SIGINT);
    sigdelset(&unblockingSigSet, SIGCHLD);
    sigdelset(&unblockingSigSet, SIGUSR2);
    sigdelset(&unblockingSigSet, SIGINT);
    pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);

    //create a thread that sends the units of work to workers
    // we create it after all signals were blocked so that this
    // thread is never interupted by a signal
    MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
    boost::thread senderThread(sender);

    // Stop waiting on shutdown request, on the first child failure, or once
    // every child has been reaped by ep_sigchld.
    while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
      sigsuspend(&unblockingSigSet);
      std::cout << "woke from sigwait" << std::endl;
    }
    pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);

    std::cout << "num children who have already stopped " << num_children_done << std::endl;
    if(child_failed) {
      std::cout << "child failed" << std::endl;
    }
    if(shutdown_flag) {
      std::cout << "asked to shutdown" << std::endl;
    }

    // Tell any remaining children to stop (SIGUSR2) and wait until all of them are reaped.
    if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
      std::cout << "must stop children" << std::endl;
      for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
          it != itEnd; ++it) {
        /* int result = */ kill(*it, SIGUSR2);
      }
      pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
      while(num_children_done != kMaxChildren) {
        sigsuspend(&unblockingSigSet);
      }
      pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
    }
    //remove message queue, this will cause the message thread to terminate
    // kill it now since all children already stopped
    msgctl(queueID, IPC_RMID, 0);
    //now wait for the sender thread to finish. This should be quick since
    // we killed the message queue
    senderThread.join();
    // NOTE(review): ep_sigchld does not set child_fail_exit_status for
    // signal-killed children, so the reported code may be 0 in that case.
    if(child_failed) {
      throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
    }
    return false;
  }
01115 
01116   void
01117   EventProcessor::connectSigs(EventProcessor* ep) {
01118     // When the FwkImpl signals are given, pass them to the
01119     // appropriate EventProcessor signals so that the outside world
01120     // can see the signal.
01121     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01122     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01123   }
01124 
01125   std::vector<ModuleDescription const*>
01126   EventProcessor::getAllModuleDescriptions() const {
01127     return schedule_->getAllModuleDescriptions();
01128   }
01129 
01130   int
01131   EventProcessor::totalEvents() const {
01132     return schedule_->totalEvents();
01133   }
01134 
01135   int
01136   EventProcessor::totalEventsPassed() const {
01137     return schedule_->totalEventsPassed();
01138   }
01139 
01140   int
01141   EventProcessor::totalEventsFailed() const {
01142     return schedule_->totalEventsFailed();
01143   }
01144 
01145   void
01146   EventProcessor::enableEndPaths(bool active) {
01147     schedule_->enableEndPaths(active);
01148   }
01149 
01150   bool
01151   EventProcessor::endPathsEnabled() const {
01152     return schedule_->endPathsEnabled();
01153   }
01154 
01155   void
01156   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01157     schedule_->getTriggerReport(rep);
01158   }
01159 
01160   void
01161   EventProcessor::clearCounters() {
01162     schedule_->clearCounters();
01163   }
01164 
01165   char const* EventProcessor::currentStateName() const {
01166     return stateName(getState());
01167   }
01168 
01169   char const* EventProcessor::stateName(State s) const {
01170     return stateNames[s];
01171   }
01172 
01173   char const* EventProcessor::msgName(Msg m) const {
01174     return msgNames[m];
01175   }
01176 
01177   State EventProcessor::getState() const {
01178     return state_;
01179   }
01180 
01181   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01182     // the thread will record exception/error status in the event processor
01183     // for us to look at and report here
01184     return last_rc_;
01185   }
01186 
01187   void
01188   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01189     if(runNumber == 0) {
01190       runNumber = 1;
01191       LogWarning("Invalid Run")
01192         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01193         << "Run number was set to 1 instead\n";
01194     }
01195 
01196     // inside of beginJob there is a check to see if it has been called before
01197     beginJob();
01198     changeState(mSetRun);
01199 
01200     // interface not correct yet
01201     input_->setRunNumber(runNumber);
01202   }
01203 
01204   void
01205   EventProcessor::declareRunNumber(RunNumber_t runNumber) {
01206     // inside of beginJob there is a check to see if it has been called before
01207     beginJob();
01208     changeState(mSetRun);
01209 
01210     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01211     //input_->declareRunNumber(runNumber);
01212   }
01213 
01214   EventProcessor::StatusCode
01215   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01216     bool rc = true;
01217     boost::xtime timeout;
01218     boost::xtime_get(&timeout, boost::TIME_UTC);
01219     timeout.sec += timeout_seconds;
01220 
01221     // make sure to include a timeout here so we don't wait forever
01222     // I suspect there are still timing issues with thread startup
01223     // and the setting of the various control variables (stop_count, id_set)
01224     {
01225       boost::mutex::scoped_lock sl(stop_lock_);
01226 
01227       // look here - if runAsync not active, just return the last return code
01228       if(stop_count_ < 0) return last_rc_;
01229 
01230       if(timeout_seconds == 0) {
01231         while(stop_count_ == 0) stopper_.wait(sl);
01232       } else {
01233         while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01234       }
01235 
01236       if(rc == false) {
01237           // timeout occurred
01238           // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
01239           // this is a temporary hack until we get the input source
01240           // upgraded to allow blocking input sources to be unblocked
01241 
01242           // the next line is dangerous and causes all sorts of trouble
01243           if(id_set_) pthread_cancel(event_loop_id_);
01244 
01245           // we will not do anything yet
01246           LogWarning("timeout")
01247             << "An asynchronous request was made to shut down "
01248             << "the event loop "
01249             << "and the event loop did not shutdown after "
01250             << timeout_seconds << " seconds\n";
01251       } else {
01252           event_loop_->join();
01253           event_loop_.reset();
01254           id_set_ = false;
01255           stop_count_ = -1;
01256       }
01257     }
01258     return rc == false ? epTimedOut : last_rc_;
01259   }
01260 
01261   EventProcessor::StatusCode
01262   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01263     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01264     if(rc != epTimedOut) changeState(mCountComplete);
01265     else errorState();
01266     return rc;
01267   }
01268 
01269 
01270   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01271     changeState(mStopAsync);
01272     StatusCode rc = waitForAsyncCompletion(secs);
01273     if(rc != epTimedOut) changeState(mFinished);
01274     else errorState();
01275     return rc;
01276   }
01277 
01278   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01279     changeState(mShutdownAsync);
01280     StatusCode rc = waitForAsyncCompletion(secs);
01281     if(rc != epTimedOut) changeState(mFinished);
01282     else errorState();
01283     return rc;
01284   }
01285 
01286   void EventProcessor::errorState() {
01287     state_ = sError;
01288   }
01289 
01290   // next function irrelevant now
01291   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01292     // make sure to include a timeout here so we don't wait forever
01293     // I suspect there are still timing issues with thread startup
01294     // and the setting of the various control variables (stop_count, id_set)
01295     changeState(m);
01296     return waitForAsyncCompletion(60*2);
01297   }
01298 
01299   void EventProcessor::changeState(Msg msg) {
01300     // most likely need to serialize access to this routine
01301 
01302     boost::mutex::scoped_lock sl(state_lock_);
01303     State curr = state_;
01304     int rc;
01305     // found if(not end of table) and
01306     // (state == table.state && (msg == table.message || msg == any))
01307     for(rc = 0;
01308         table[rc].current != sInvalid &&
01309           (curr != table[rc].current ||
01310            (curr == table[rc].current &&
01311              msg != table[rc].message && table[rc].message != mAny));
01312         ++rc);
01313 
01314     if(table[rc].current == sInvalid)
01315       throw cms::Exception("BadState")
01316         << "A member function of EventProcessor has been called in an"
01317         << " inappropriate order.\n"
01318         << "Bad transition from " << stateName(curr) << " "
01319         << "using message " << msgName(msg) << "\n"
01320         << "No where to go from here.\n";
01321 
01322     FDEBUG(1) << "changeState: current=" << stateName(curr)
01323               << ", message=" << msgName(msg)
01324               << " -> new=" << stateName(table[rc].final) << "\n";
01325 
01326     state_ = table[rc].final;
01327   }
01328 
01329   void EventProcessor::runAsync() {
01330     beginJob();
01331     {
01332       boost::mutex::scoped_lock sl(stop_lock_);
01333       if(id_set_ == true) {
01334           std::string err("runAsync called while async event loop already running\n");
01335           LogError("FwkJob") << err;
01336           throw cms::Exception("BadState") << err;
01337       }
01338 
01339       changeState(mRunAsync);
01340 
01341       stop_count_ = 0;
01342       last_rc_ = epSuccess; // forget the last value!
01343       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01344       boost::xtime timeout;
01345       boost::xtime_get(&timeout, boost::TIME_UTC);
01346       timeout.sec += 60; // 60 seconds to start!!!!
01347       if(starter_.timed_wait(sl, timeout) == false) {
01348           // yikes - the thread did not start
01349           throw cms::Exception("BadState")
01350             << "Async run thread did not start in 60 seconds\n";
01351       }
01352     }
01353   }
01354 
01355   void EventProcessor::asyncRun(EventProcessor* me) {
01356     // set up signals to allow for interruptions
01357     // ignore all other signals
01358     // make sure no exceptions escape out
01359 
01360     // temporary hack until we modify the input source to allow
01361     // wakeup calls from other threads.  This mimics the solution
01362     // in EventFilter/Processor, which I do not like.
01363     // allowing cancels means that the thread just disappears at
01364     // certain points.  This is bad for C++ stack variables.
01365     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01366     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01367     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01368     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01369 
01370     {
01371       boost::mutex::scoped_lock(me->stop_lock_);
01372       me->event_loop_id_ = pthread_self();
01373       me->id_set_ = true;
01374       me->starter_.notify_all();
01375     }
01376 
01377     Status rc = epException;
01378     FDEBUG(2) << "asyncRun starting ......................\n";
01379 
01380     try {
01381       bool onlineStateTransitions = true;
01382       rc = me->runToCompletion(onlineStateTransitions);
01383     }
01384     catch (cms::Exception& e) {
01385       LogError("FwkJob") << "cms::Exception caught in "
01386                          << "EventProcessor::asyncRun"
01387                          << "\n"
01388                          << e.explainSelf();
01389       me->last_error_text_ = e.explainSelf();
01390     }
01391     catch (std::exception& e) {
01392       LogError("FwkJob") << "Standard library exception caught in "
01393                          << "EventProcessor::asyncRun"
01394                          << "\n"
01395                          << e.what();
01396       me->last_error_text_ = e.what();
01397     }
01398     catch (...) {
01399       LogError("FwkJob") << "Unknown exception caught in "
01400                          << "EventProcessor::asyncRun"
01401                          << "\n";
01402       me->last_error_text_ = "Unknown exception caught";
01403       rc = epOther;
01404     }
01405 
01406     me->last_rc_ = rc;
01407 
01408     {
01409       // notify anyone waiting for exit that we are doing so now
01410       boost::mutex::scoped_lock sl(me->stop_lock_);
01411       ++me->stop_count_;
01412       me->stopper_.notify_all();
01413     }
01414     FDEBUG(2) << "asyncRun ending ......................\n";
01415   }
01416 
01417 
01418   EventProcessor::StatusCode
01419   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01420 
01421     StateSentry toerror(this);
01422 
01423     int numberOfEventsToProcess = -1;
01424     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01425 
01426     if(machine_.get() != 0) {
01427       throw Exception(errors::LogicError)
01428         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01429         << "Please report this error to the Framework group\n";
01430     }
01431 
01432     toerror.succeeded();
01433 
01434     return returnCode;
01435   }
01436 
01437   EventProcessor::StatusCode
01438   EventProcessor::runEventCount(int numberOfEventsToProcess) {
01439 
01440     StateSentry toerror(this);
01441 
01442     bool onlineStateTransitions = false;
01443     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01444 
01445     toerror.succeeded();
01446 
01447     return returnCode;
01448   }
01449 
01450   EventProcessor::StatusCode
01451   EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01452 
01453     // Reusable event principal
01454     boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
01455     principalCache_.insert(ep);
01456 
01457     beginJob(); //make sure this was called
01458 
01459     if(!onlineStateTransitions) changeState(mRunCount);
01460 
01461     StatusCode returnCode = epSuccess;
01462     stateMachineWasInErrorState_ = false;
01463 
01464     // make the services available
01465     ServiceRegistry::Operate operate(serviceToken_);
01466 
01467     if(machine_.get() == 0) {
01468 
01469       statemachine::FileMode fileMode;
01470       if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01471       else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01472       else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01473       else {
01474          throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01475             << fileMode_ << ".\n"
01476             << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01477       }
01478 
01479       statemachine::EmptyRunLumiMode emptyRunLumiMode;
01480       if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01481       else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01482       else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01483       else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01484       else {
01485          throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01486             << emptyRunLumiMode_ << ".\n"
01487             << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01488       }
01489 
01490       machine_.reset(new statemachine::Machine(this,
01491                                                fileMode,
01492                                                emptyRunLumiMode));
01493 
01494       machine_->initiate();
01495     }
01496 
01497     try {
01498 
01499       InputSource::ItemType itemType;
01500 
01501       int iEvents = 0;
01502 
01503       while(true) {
01504 
01505         itemType = input_->nextItemType();
01506 
01507         FDEBUG(1) << "itemType = " << itemType << "\n";
01508 
01509         // These are used for asynchronous running only and
01510         // and are checking to see if stopAsync or shutdownAsync
01511         // were called from another thread.  In the future, we
01512         // may need to do something better than polling the state.
01513         // With the current code this is the simplest thing and
01514         // it should always work.  If the interaction between
01515         // threads becomes more complex this may cause problems.
01516         if(state_ == sStopping) {
01517           FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01518           forceLooperToEnd_ = true;
01519           machine_->process_event(statemachine::Stop());
01520           forceLooperToEnd_ = false;
01521           break;
01522         }
01523         else if(state_ == sShuttingDown) {
01524           FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01525           forceLooperToEnd_ = true;
01526           machine_->process_event(statemachine::Stop());
01527           forceLooperToEnd_ = false;
01528           break;
01529         }
01530 
01531         // Look for a shutdown signal
01532         {
01533           boost::mutex::scoped_lock sl(usr2_lock);
01534           if(shutdown_flag) {
01535             changeState(mShutdownSignal);
01536             returnCode = epSignal;
01537             forceLooperToEnd_ = true;
01538             machine_->process_event(statemachine::Stop());
01539             forceLooperToEnd_ = false;
01540             break;
01541           }
01542         }
01543 
01544         if(itemType == InputSource::IsStop) {
01545           machine_->process_event(statemachine::Stop());
01546         }
01547         else if(itemType == InputSource::IsFile) {
01548           machine_->process_event(statemachine::File());
01549         }
01550         else if(itemType == InputSource::IsRun) {
01551           machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
01552         }
01553         else if(itemType == InputSource::IsLumi) {
01554           machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01555         }
01556         else if(itemType == InputSource::IsEvent) {
01557           machine_->process_event(statemachine::Event());
01558           ++iEvents;
01559           if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01560             returnCode = epCountComplete;
01561             changeState(mInputExhausted);
01562             FDEBUG(1) << "Event count complete, pausing event loop\n";
01563             break;
01564           }
01565         }
01566         // This should be impossible
01567         else {
01568           throw Exception(errors::LogicError)
01569             << "Unknown next item type passed to EventProcessor\n"
01570             << "Please report this error to the Framework group\n";
01571         }
01572 
01573         if(machine_->terminated()) {
01574           changeState(mInputExhausted);
01575           break;
01576         }
01577       }  // End of loop over state machine events
01578     } // Try block
01579 
01580     // Some comments on exception handling related to the boost state machine:
01581     //
01582     // Some states used in the machine are special because they
01583     // perform actions while the machine is being terminated, actions
01584     // such as close files, call endRun, call endLumi etc ...  Each of these
01585     // states has two functions that perform these actions.  The functions
01586     // are almost identical.  The major difference is that one version
01587     // catches all exceptions and the other lets exceptions pass through.
01588     // The destructor catches them and the other function named "exit" lets
01589     // them pass through.  On a normal termination, boost will always call
01590     // "exit" and then the state destructor.  In our state classes, the
01591     // the destructors do nothing if the exit function already took
01592     // care of things.  Here's the interesting part.  When boost is
01593     // handling an exception the "exit" function is not called (a boost
01594     // feature).
01595     //
01596     // If an exception occurs while the boost machine is in control
01597     // (which usually means inside a process_event call), then
01598     // the boost state machine destroys its states and "terminates" itself.
01599     // This already done before we hit the catch blocks below. In this case
01600     // the call to terminateMachine below only destroys an already
01601     // terminated state machine.  Because exit is not called, the state destructors
01602     // handle cleaning up lumis, runs, and files.  The destructors swallow
01603     // all exceptions and only pass through the exceptions messages, which
01604     // are tacked onto the original exception below.
01605     //
01606     // If an exception occurs when the boost state machine is not
01607     // in control (outside the process_event functions), then boost
01608     // cannot destroy its own states.  The terminateMachine function
01609     // below takes care of that.  The flag "alreadyHandlingException"
01610     // is set true so that the state exit functions do nothing (and
01611     // cannot throw more exceptions while handling the first).  Then the
01612     // state destructors take care of this because exit did nothing.
01613     //
01614     // In both cases above, the EventProcessor::endOfLoop function is
01615     // not called because it can throw exceptions.
01616     //
01617     // One tricky aspect of the state machine is that things that can
01618     // throw should not be invoked by the state machine while another
01619     // exception is being handled.
01620     // Another tricky aspect is that it appears to be important to
01621     // terminate the state machine before invoking its destructor.
01622     // We've seen crashes that are not understood when that is not
01623     // done.  Maintainers of this code should be careful about this.
01624 
01625     catch (cms::Exception& e) {
01626       alreadyHandlingException_ = true;
01627       terminateMachine();
01628       alreadyHandlingException_ = false;
01629       e << "cms::Exception caught in EventProcessor and rethrown\n";
01630       e << exceptionMessageLumis_;
01631       e << exceptionMessageRuns_;
01632       e << exceptionMessageFiles_;
01633       throw;
01634     }
01635     catch (std::bad_alloc& e) {
01636       alreadyHandlingException_ = true;
01637       terminateMachine();
01638       alreadyHandlingException_ = false;
01639       throw cms::Exception("std::bad_alloc")
01640         << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
01641         << "The job has probably exhausted the virtual memory available to the process.\n"
01642         << exceptionMessageLumis_
01643         << exceptionMessageRuns_
01644         << exceptionMessageFiles_;
01645     }
01646     catch (std::exception& e) {
01647       alreadyHandlingException_ = true;
01648       terminateMachine();
01649       alreadyHandlingException_ = false;
01650       throw cms::Exception("StdException")
01651         << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
01652         << "Previous information:\n" << e.what() << "\n"
01653         << exceptionMessageLumis_
01654         << exceptionMessageRuns_
01655         << exceptionMessageFiles_;
01656     }
01657     catch (...) {
01658       alreadyHandlingException_ = true;
01659       terminateMachine();
01660       alreadyHandlingException_ = false;
01661       throw cms::Exception("Unknown")
01662         << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
01663         << exceptionMessageLumis_
01664         << exceptionMessageRuns_
01665         << exceptionMessageFiles_;
01666     }
01667 
01668     if(machine_->terminated()) {
01669       FDEBUG(1) << "The state machine reports it has been terminated\n";
01670       machine_.reset();
01671     }
01672 
01673     if(!onlineStateTransitions) changeState(mFinished);
01674 
01675     if(stateMachineWasInErrorState_) {
01676       throw cms::Exception("BadState")
01677         << "The boost state machine in the EventProcessor exited after\n"
01678         << "entering the Error state.\n";
01679     }
01680 
01681     return returnCode;
01682   }
01683 
01684   void EventProcessor::readFile() {
01685     FDEBUG(1) << " \treadFile\n";
01686     fb_ = input_->readFile();
01687     if(numberOfForkedChildren_ > 0) {
01688         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01689     }
01690   }
01691 
01692   void EventProcessor::closeInputFile() {
01693     input_->closeFile(fb_);
01694     FDEBUG(1) << "\tcloseInputFile\n";
01695   }
01696 
01697   void EventProcessor::openOutputFiles() {
01698     schedule_->openOutputFiles(*fb_);
01699     if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01700     FDEBUG(1) << "\topenOutputFiles\n";
01701   }
01702 
01703   void EventProcessor::closeOutputFiles() {
01704     schedule_->closeOutputFiles();
01705     if(hasSubProcess()) subProcess_->closeOutputFiles();
01706     FDEBUG(1) << "\tcloseOutputFiles\n";
01707   }
01708 
01709   void EventProcessor::respondToOpenInputFile() {
01710     schedule_->respondToOpenInputFile(*fb_);
01711     if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01712     FDEBUG(1) << "\trespondToOpenInputFile\n";
01713   }
01714 
01715   void EventProcessor::respondToCloseInputFile() {
01716     schedule_->respondToCloseInputFile(*fb_);
01717     if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01718     FDEBUG(1) << "\trespondToCloseInputFile\n";
01719   }
01720 
01721   void EventProcessor::respondToOpenOutputFiles() {
01722     schedule_->respondToOpenOutputFiles(*fb_);
01723     if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01724     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01725   }
01726 
01727   void EventProcessor::respondToCloseOutputFiles() {
01728     schedule_->respondToCloseOutputFiles(*fb_);
01729     if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01730     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01731   }
01732 
01733   void EventProcessor::startingNewLoop() {
01734     shouldWeStop_ = false;
01735     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01736     // until after we've called beginOfJob
01737     if(looper_ && looperBeginJobRun_) {
01738       looper_->doStartingNewLoop();
01739     }
01740     FDEBUG(1) << "\tstartingNewLoop\n";
01741   }
01742 
01743   bool EventProcessor::endOfLoop() {
01744     if(looper_) {
01745       ModuleChanger changer(schedule_.get());
01746       looper_->setModuleChanger(&changer);
01747       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01748       looper_->setModuleChanger(0);
01749       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01750       else return false;
01751     }
01752     FDEBUG(1) << "\tendOfLoop\n";
01753     return true;
01754   }
01755 
01756   void EventProcessor::rewindInput() {
01757     input_->repeat();
01758     input_->rewind();
01759     FDEBUG(1) << "\trewind\n";
01760   }
01761 
01762   void EventProcessor::prepareForNextLoop() {
01763     looper_->prepareForNextLoop(esp_.get());
01764     FDEBUG(1) << "\tprepareForNextLoop\n";
01765   }
01766 
01767   bool EventProcessor::shouldWeCloseOutput() const {
01768     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01769     return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01770   }
01771 
01772   void EventProcessor::doErrorStuff() {
01773     FDEBUG(1) << "\tdoErrorStuff\n";
01774     LogError("StateMachine")
01775       << "The EventProcessor state machine encountered an unexpected event\n"
01776       << "and went to the error state\n"
01777       << "Will attempt to terminate processing normally\n"
01778       << "(IF using the looper the next loop will be attempted)\n"
01779       << "This likely indicates a bug in an input module or corrupted input or both\n";
01780     stateMachineWasInErrorState_ = true;
01781   }
01782 
  // Performs the begin-run transition for the run identified by 'run'.
  //
  // Sequence:
  //   1. Look up the cached RunPrincipal and let the input source handle its
  //      begin-run step.
  //   2. Synchronize the EventSetup to the start of the run: event ID
  //      (run, 0, 0) at the run's begin time. Optionally force a cache clear
  //      first.
  //   3. If a looper exists and has not run beginOfJob yet, run it now (and the
  //      deferred first doStartingNewLoop).
  //   4. Run the begin-run occurrence through the schedule and the subprocess
  //      (if any).
  //   5. Finally notify the looper.
  void EventProcessor::beginRun(statemachine::Run const& run) {
    RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
    input_->doBeginRun(runPrincipal);
    // Sync point for the start of the run: lumi 0 / event 0 with the run's begin time.
    IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
                    runPrincipal.beginTime());
    if(forceESCacheClearOnNewRun_){
      esp_->forceCacheClear();
    }
    EventSetup const& es = esp_->eventSetupForInstance(ts);
    // The looper's beginOfJob needs a valid EventSetup, so it is delayed until
    // the first run. startingNewLoop skips doStartingNewLoop while
    // looperBeginJobRun_ is false, which is why it is called here instead.
    if(looper_ && looperBeginJobRun_== false) {
      looper_->copyInfo(ScheduleInfo(schedule_.get()));
      looper_->beginOfJob(es);
      looperBeginJobRun_ = true;
      looper_->doStartingNewLoop();
    }
    // The braces bound the lifetime of 'sentry' to the schedule/subprocess work.
    // Presumably the sentry emits pre/post signals through actReg_ -- TODO confirm.
    {
      typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(runPrincipal, es);
      if(hasSubProcess()) {
        subProcess_->doBeginRun(runPrincipal, ts);
      }
    }
    FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
    if(looper_) {
      looper_->doBeginRun(runPrincipal, es);
    }
  }
01811 
  // Performs the end-run transition for the run identified by 'run'.
  //
  // Sequence:
  //   1. Let the input source handle its end-run step.
  //   2. Synchronize the EventSetup to the end of the run: maximal lumi and
  //      event numbers at the run's end time.
  //   3. Run the end-run occurrence through the schedule and the subprocess
  //      (if any).
  //   4. Finally notify the looper.
  void EventProcessor::endRun(statemachine::Run const& run) {
    RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
    input_->doEndRun(runPrincipal);
    // Sync point for the end of the run: the largest possible lumi/event ID
    // within this run, with the run's end time.
    IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
                    runPrincipal.endTime());
    EventSetup const& es = esp_->eventSetupForInstance(ts);
    // The braces bound the lifetime of 'sentry' to the schedule/subprocess work.
    // Presumably the sentry emits pre/post signals through actReg_ -- TODO confirm.
    {
      typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(runPrincipal, es);
      if(hasSubProcess()) {
        subProcess_->doEndRun(runPrincipal, ts);
      }
    }
    FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
    if(looper_) {
      looper_->doEndRun(runPrincipal, es);
    }
  }
01831 
  // Performs the begin-luminosity-block transition for (phid, run, lumi).
  //
  // Sequence:
  //   1. Let the input source handle its begin-lumi step.
  //   2. Give the RandomNumberGenerator service (if configured) its preBeginLumi
  //      callback.
  //   3. Synchronize the EventSetup to the start of the lumi block.
  //   4. Run the begin-lumi occurrence through the schedule and the subprocess
  //      (if any).
  //   5. Finally notify the looper.
  void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
    LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
    input_->doBeginLumi(lumiPrincipal);

    // The RNG service hook runs before any module sees the new lumi block.
    Service<RandomNumberGenerator> rng;
    if(rng.isAvailable()) {
      LuminosityBlock lb(lumiPrincipal, ModuleDescription());
      rng->preBeginLumi(lb);
    }

    // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea.
    // Lumi blocks know their start and end times; why not also their start and end events?
    IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
    EventSetup const& es = esp_->eventSetupForInstance(ts);
    // The braces bound the lifetime of 'sentry' to the schedule/subprocess work.
    // Presumably the sentry emits pre/post signals through actReg_ -- TODO confirm.
    {
      typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
      if(hasSubProcess()) {
        subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
      }
    }
    FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
    if(looper_) {
      looper_->doBeginLuminosityBlock(lumiPrincipal, es);
    }
  }
01859 
  // Performs the end-luminosity-block transition for (phid, run, lumi).
  //
  // Sequence:
  //   1. Let the input source handle its end-lumi step.
  //   2. Synchronize the EventSetup to the end of the lumi block.
  //   3. Run the end-lumi occurrence through the schedule and the subprocess
  //      (if any).
  //   4. Finally notify the looper.
  void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
    LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
    input_->doEndLumi(lumiPrincipal);
    // NOTE: Using the max event number for the end of a lumi block is a bad idea.
    // Lumi blocks know their start and end times; why not also their start and end events?
    IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
                    lumiPrincipal.endTime());
    EventSetup const& es = esp_->eventSetupForInstance(ts);
    // The braces bound the lifetime of 'sentry' to the schedule/subprocess work.
    // Presumably the sentry emits pre/post signals through actReg_ -- TODO confirm.
    {
      typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
      if(hasSubProcess()) {
        subProcess_->doEndLuminosityBlock(lumiPrincipal, ts);
      }
    }
    FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
    if(looper_) {
      looper_->doEndLuminosityBlock(lumiPrincipal, es);
    }
  }
01881 
01882   statemachine::Run EventProcessor::readAndCacheRun() {
01883     input_->readAndCacheRun();
01884     input_->markRun();
01885     return statemachine::Run(input_->processHistoryID(), input_->run());
01886   }
01887 
01888   int EventProcessor::readAndCacheLumi() {
01889     input_->readAndCacheLumi();
01890     input_->markLumi();
01891     return input_->luminosityBlock();
01892   }
01893 
01894   void EventProcessor::writeRun(statemachine::Run const& run) {
01895     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
01896     if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
01897     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
01898   }
01899 
01900   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
01901     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
01902     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
01903   }
01904 
01905   void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
01906     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
01907     if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
01908     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01909   }
01910 
01911   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
01912     principalCache_.deleteLumi(phid, run, lumi);
01913     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01914   }
01915 
  // Reads the next event from the input source and runs it through the
  // schedule and the subprocess (if any). If a looper exists, the looper then
  // gets its per-event callback. The EventPrincipal is cleared at the end.
  //
  // If reading the event throws a cms::Exception, act_table_ decides what happens:
  //   - actions::Rethrow: the exception is rethrown.
  //   - any other action: a warning is logged and the event is skipped
  //     (early return).
  // Exceptions thrown after the read are not caught here.
  void EventProcessor::readAndProcessEvent() {
    EventPrincipal *pep = 0;
    try {
      pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
      FDEBUG(1) << "\treadEvent\n";
    }
    catch(cms::Exception& e) {
      actions::ActionCodes action = act_table_->find(e.rootCause());
      if(action == actions::Rethrow) {
        throw;
      } else {
        LogWarning(e.category())
          << "an exception occurred and all paths for the event are being skipped: \n"
          << e.what();
        return;
      }
    }
    assert(pep != 0);

    // Synchronize the EventSetup to this event's ID and time.
    IOVSyncValue ts(pep->id(), pep->time());
    EventSetup const& es = esp_->eventSetupForInstance(ts);
    // The braces bound the lifetime of 'sentry' to the schedule/subprocess work.
    // Presumably the sentry emits pre/post signals through actReg_ -- TODO confirm.
    {
      typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
      schedule_->processOneOccurrence<Traits>(*pep, es);
      if(hasSubProcess()) {
        subProcess_->doEvent(*pep, ts);
      }
    }

    if(looper_) {
      // The ProcessingController tells the looper what navigation the input
      // supports, and records the transition the looper requests.
      bool randomAccess = input_->randomAccess();
      ProcessingController::ForwardState forwardState = input_->forwardState();
      ProcessingController::ReverseState reverseState = input_->reverseState();
      ProcessingController pc(forwardState, reverseState, randomAccess);

      EDLooperBase::Status status = EDLooperBase::kContinue;
      // Call the looper again (on the same event) for as long as its requested
      // transition could not be carried out. Without random access every
      // attempt counts as a success, so the body runs once.
      do {
        status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);

        bool succeeded = true;
        if(randomAccess) {
          if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
            // NOTE(review): -2 presumably because the source is already
            // positioned after the current event -- confirm against InputSource.
            input_->skipEvents(-2);
          }
          else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
            succeeded = input_->goToEvent(pc.specifiedEventTransition());
          }
        }
        pc.setLastOperationSucceeded(succeeded);
      } while(!pc.lastOperationSucceeded());
      // Any status other than kContinue makes shouldWeStop() return true.
      if(status != EDLooperBase::kContinue) shouldWeStop_ = true;

    }

    FDEBUG(1) << "\tprocessEvent\n";
    pep->clearEventPrincipal();
  }
01974 
01975   bool EventProcessor::shouldWeStop() const {
01976     FDEBUG(1) << "\tshouldWeStop\n";
01977     if(shouldWeStop_) return true;
01978     return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
01979   }
01980 
01981   void EventProcessor::setExceptionMessageFiles(std::string& message) {
01982     exceptionMessageFiles_ = message;
01983   }
01984 
01985   void EventProcessor::setExceptionMessageRuns(std::string& message) {
01986     exceptionMessageRuns_ = message;
01987   }
01988 
01989   void EventProcessor::setExceptionMessageLumis(std::string& message) {
01990     exceptionMessageLumis_ = message;
01991   }
01992 
01993   bool EventProcessor::alreadyHandlingException() const {
01994     return alreadyHandlingException_;
01995   }
01996 
01997   void EventProcessor::terminateMachine() {
01998     if(machine_.get() != 0) {
01999       if(!machine_->terminated()) {
02000         forceLooperToEnd_ = true;
02001         machine_->process_event(statemachine::Stop());
02002         forceLooperToEnd_ = false;
02003       }
02004       else {
02005         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
02006       }
02007       if(machine_->terminated()) {
02008         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02009       }
02010       machine_.reset();
02011     }
02012   }
02013 }