CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011 
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/InputSourceDescription.h"
00019 #include "FWCore/Framework/interface/IOVSyncValue.h"
00020 #include "FWCore/Framework/interface/LooperFactory.h"
00021 #include "FWCore/Framework/interface/LuminosityBlock.h"
00022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00023 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00024 #include "FWCore/Framework/interface/ModuleChanger.h"
00025 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00026 #include "FWCore/Framework/interface/ProcessingController.h"
00027 #include "FWCore/Framework/interface/RunPrincipal.h"
00028 #include "FWCore/Framework/interface/Schedule.h"
00029 #include "FWCore/Framework/interface/ScheduleInfo.h"
00030 #include "FWCore/Framework/interface/SubProcess.h"
00031 #include "FWCore/Framework/src/Breakpoints.h"
00032 #include "FWCore/Framework/src/EPStates.h"
00033 #include "FWCore/Framework/src/EventSetupsController.h"
00034 #include "FWCore/Framework/src/InputSourceFactory.h"
00035 
00036 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00037 
00038 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00039 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00040 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00042 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00043 #include "FWCore/ParameterSet/interface/Registry.h"
00044 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00045 
00046 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00047 #include "FWCore/ServiceRegistry/interface/Service.h"
00048 
00049 #include "FWCore/Utilities/interface/DebugMacros.h"
00050 #include "FWCore/Utilities/interface/EDMException.h"
00051 #include "FWCore/Utilities/interface/Exception.h"
00052 #include "FWCore/Utilities/interface/ConvertException.h"
00053 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00054 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00055 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00056 
00057 #include "boost/bind.hpp"
00058 #include "boost/thread/xtime.hpp"
00059 
00060 #include <exception>
00061 #include <iomanip>
00062 #include <iostream>
00063 #include <utility>
00064 #include <sstream>
00065 
00066 #include <sys/ipc.h>
00067 #include <sys/msg.h>
00068 
00069 //Used for forking
00070 #include <sys/types.h>
00071 #include <sys/wait.h>
00072 #include <unistd.h>
00073 
00074 //Used for CPU affinity
00075 #ifndef __APPLE__
00076 #include <sched.h>
00077 #endif
00078 
00079 namespace edm {
00080 
00081   namespace event_processor {
00082 
00083     class StateSentry {
00084     public:
00085       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00086       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00087       void succeeded() {success_ = true;}
00088 
00089     private:
00090       EventProcessor* ep_;
00091       bool success_;
00092     };
00093   }
00094 
00095   using namespace event_processor;
00096 
00097   namespace {
00098     template <typename T>
00099     class ScheduleSignalSentry {
00100     public:
00101       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00102            a_(a), principal_(principal), es_(es) {
00103         if (a_) T::preScheduleSignal(a_, principal_);
00104       }
00105       ~ScheduleSignalSentry() {
00106         if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00107       }
00108 
00109     private:
00110       // We own none of these resources.
00111       ActivityRegistry* a_;
00112       typename T::MyPrincipal* principal_;
00113       EventSetup const* es_;
00114     };
00115   }
00116 
00117   namespace {
00118 
00119     // the next two tables must be kept in sync with the state and
00120     // message enums from the header
00121 
00122     char const* stateNames[] = {
00123       "Init",
00124       "JobReady",
00125       "RunGiven",
00126       "Running",
00127       "Stopping",
00128       "ShuttingDown",
00129       "Done",
00130       "JobEnded",
00131       "Error",
00132       "ErrorEnded",
00133       "End",
00134       "Invalid"
00135     };
00136 
00137     char const* msgNames[] = {
00138       "SetRun",
00139       "Skip",
00140       "RunAsync",
00141       "Run(ID)",
00142       "Run(count)",
00143       "BeginJob",
00144       "StopAsync",
00145       "ShutdownAsync",
00146       "EndJob",
00147       "CountComplete",
00148       "InputExhausted",
00149       "StopSignal",
00150       "ShutdownSignal",
00151       "Finished",
00152       "Any",
00153       "dtor",
00154       "Exception",
00155       "Rewind"
00156     };
00157   }
00158     // IMPORTANT NOTE:
00159     // the mAny messages are special, they must appear last in the
00160     // table if multiple entries for a CurrentState are present.
00161     // the changeState function does not use the mAny yet!!!
00162 
    // One row of the EventProcessor state-transition table: receiving
    // `message` while in state `current` moves the processor to `final`.
    // Member order matters: the table is built with positional aggregate
    // initializers of the form { current, message, final }.
    struct TransEntry {
      State current;  // state before the message is handled
      Msg   message;  // message that triggers the transition
      State final;    // state after the transition
    };
00168 
00169     // we should use this information to initialize a two dimensional
00170     // table of t[CurrentState][Message] = FinalState
00171 
    /*
      As currently written, the async run thread function can return in
      the "JobReady" state without having been cleaned up.  The problem
      is that the thread is only cleaned up when stop/shutdown async is
      called, but the stop/shutdown async functions first attempt to
      change the state using messages that are not valid in the
      "JobReady" state.

      I think most of the problems can be solved by using two states
      for "running": RunningS and RunningA (sync/async). The problem
      seems to be all the transitions out of running for both modes of
      operation.  The other solution might be to only go to "Stopping"
      from Running, and use the return code from "run_p" to set the
      final state.  If this is used, then in the sync mode the
      "Stopping" state would be momentary.

     */
00189 
    // State-transition table for the EventProcessor state machine.
    // Each row reads { CurrentState, Message, FinalState }: receiving Message
    // while in CurrentState moves the processor to FinalState. Rows are
    // grouped by CurrentState; row order is significant (see the note above
    // TransEntry about mAny entries).
    // NOTE(review): the trailing { sInvalid, mAny, sInvalid } row presumably
    // serves as the end-of-table marker for changeState (not visible here);
    // keep it last and confirm against changeState.
    // NOTE(review): `table` is defined directly in namespace edm, not in an
    // unnamed namespace, so it has external linkage.
    TransEntry table[] = {
    // CurrentState   Message         FinalState
    // -----------------------------------------
      { sInit,          mException,      sError },
      { sInit,          mBeginJob,       sJobReady },
      { sJobReady,      mException,      sError },
      { sJobReady,      mSetRun,         sRunGiven },
      { sJobReady,      mInputRewind,    sRunning },
      { sJobReady,      mSkip,           sRunning },
      { sJobReady,      mRunID,          sRunning },
      { sJobReady,      mRunCount,       sRunning },
      { sJobReady,      mEndJob,         sJobEnded },
      { sJobReady,      mBeginJob,       sJobReady },
      { sJobReady,      mDtor,           sEnd },    // should this be allowed?

      { sJobReady,      mStopAsync,      sJobReady },
      { sJobReady,      mCountComplete,  sJobReady },
      { sJobReady,      mFinished,       sJobReady },

      { sRunGiven,      mException,      sError },
      { sRunGiven,      mRunAsync,       sRunning },
      { sRunGiven,      mBeginJob,       sRunGiven },
      { sRunGiven,      mShutdownAsync,  sShuttingDown },
      { sRunGiven,      mStopAsync,      sStopping },
      { sRunning,       mException,      sError },
      { sRunning,       mStopAsync,      sStopping },
      { sRunning,       mShutdownAsync,  sShuttingDown },
      { sRunning,       mShutdownSignal, sShuttingDown },
      { sRunning,       mCountComplete,  sStopping }, // sJobReady
      { sRunning,       mInputExhausted, sStopping }, // sJobReady

      { sStopping,      mInputRewind,    sRunning }, // The looper needs this
      { sStopping,      mException,      sError },
      { sStopping,      mFinished,       sJobReady },
      { sStopping,      mCountComplete,  sJobReady },
      { sStopping,      mShutdownSignal, sShuttingDown },
      { sStopping,      mStopAsync,      sStopping },     // stay
      { sStopping,      mInputExhausted, sStopping },     // stay
      //{ sStopping,      mAny,            sJobReady },     // <- ??????
      { sShuttingDown,  mException,      sError },
      { sShuttingDown,  mShutdownSignal, sShuttingDown },
      { sShuttingDown,  mCountComplete,  sDone }, // needed?
      { sShuttingDown,  mInputExhausted, sDone }, // needed?
      { sShuttingDown,  mFinished,       sDone },
      //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
      //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
      //{ sShuttingDown,  mAny,            sDone },         // <- ??????
      { sDone,          mEndJob,         sJobEnded },
      { sDone,          mException,      sError },
      { sJobEnded,      mDtor,           sEnd },
      { sJobEnded,      mException,      sError },
      { sError,         mEndJob,         sError },   // funny one here
      { sError,         mDtor,           sError },   // funny one here
      { sInit,          mDtor,           sEnd },     // for StorM dummy EP
      { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
      { sInvalid,       mAny,            sInvalid }  // must remain the last row
    };
00247 
00248 
00249     // Note: many of the messages generate the mBeginJob message first
00250     //  mRunID, mRunCount, mSetRun
00251 
00252   // ---------------------------------------------------------------
00253   boost::shared_ptr<InputSource>
00254   makeInput(ParameterSet& params,
00255             CommonParams const& common,
00256             ProductRegistry& preg,
00257             PrincipalCache& pCache,
00258             boost::shared_ptr<ActivityRegistry> areg,
00259             boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00260     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00261     if(main_input == 0) {
00262       throw Exception(errors::Configuration)
00263         << "There must be exactly one source in the configuration.\n"
00264         << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00265     }
00266 
00267     std::string modtype(main_input->getParameter<std::string>("@module_type"));
00268 
00269     std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00270       ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00271     ConfigurationDescriptions descriptions(filler->baseType());
00272     filler->fill(descriptions);
00273 
00274     try {
00275       try {
00276         descriptions.validate(*main_input, std::string("source"));
00277       }
00278       catch (cms::Exception& e) { throw; }
00279       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00280       catch (std::exception& e) { convertException::stdToEDM(e); }
00281       catch(std::string& s) { convertException::stringToEDM(s); }
00282       catch(char const* c) { convertException::charPtrToEDM(c); }
00283       catch (...) { convertException::unknownToEDM(); }
00284     }
00285     catch (cms::Exception & iException) {
00286       std::ostringstream ost;
00287       ost << "Validating configuration of input source of type " << modtype;
00288       iException.addContext(ost.str());
00289       throw;
00290     }
00291 
00292     main_input->registerIt();
00293 
00294     // Fill in "ModuleDescription", in case the input source produces
00295     // any EDproducts, which would be registered in the ProductRegistry.
00296     // Also fill in the process history item for this process.
00297     // There is no module label for the unnamed input source, so
00298     // just use "source".
00299     // Only the tracked parameters belong in the process configuration.
00300     ModuleDescription md(main_input->id(),
00301                          main_input->getParameter<std::string>("@module_type"),
00302                          "source",
00303                          processConfiguration.get());
00304 
00305     InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00306     areg->preSourceConstructionSignal_(md);
00307     boost::shared_ptr<InputSource> input;
00308     try {
00309       try {
00310         input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00311       }
00312       catch (cms::Exception& e) { throw; }
00313       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00314       catch (std::exception& e) { convertException::stdToEDM(e); }
00315       catch(std::string& s) { convertException::stringToEDM(s); }
00316       catch(char const* c) { convertException::charPtrToEDM(c); }
00317       catch (...) { convertException::unknownToEDM(); }
00318     }
00319     catch (cms::Exception& iException) {
00320       areg->postSourceConstructionSignal_(md);
00321       std::ostringstream ost;
00322       ost << "Constructing input source of type " << modtype;
00323       iException.addContext(ost.str());
00324       throw;
00325     }
00326     areg->postSourceConstructionSignal_(md);
00327     return input;
00328   }
00329 
00330   // ---------------------------------------------------------------
00331   boost::shared_ptr<EDLooperBase>
00332   fillLooper(eventsetup::EventSetupProvider& cp,
00333                          ParameterSet& params,
00334                          CommonParams const& common) {
00335     boost::shared_ptr<EDLooperBase> vLooper;
00336 
00337     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00338 
00339     if(loopers.size() == 0) {
00340        return vLooper;
00341     }
00342 
00343     assert(1 == loopers.size());
00344 
00345     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00346         itName != itNameEnd;
00347         ++itName) {
00348 
00349       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00350       providerPSet->registerIt();
00351       vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00352                                                         *providerPSet,
00353                                                         common.processName_,
00354                                                         common.releaseVersion_,
00355                                                         common.passID_);
00356       }
00357       return vLooper;
00358 
00359   }
00360 
00361   // ---------------------------------------------------------------
  // Construct from a Python configuration string.
  // The configuration is turned into a ParameterSet via PythonProcessDesc,
  // defaultServices/forcedServices are added to the ProcessDesc, and the rest
  // of the setup is delegated to init() with the caller's service token and
  // legacy policy.
  // NOTE: every EventProcessor constructor repeats the same member
  // initializer list; keep them in sync with one another. The `member_()`
  // entries value-initialize, which matters for any non-class members, so
  // they should not simply be dropped.
  EventProcessor::EventProcessor(std::string const& config,
                                ServiceToken const& iToken,
                                serviceregistry::ServiceLegacy iLegacy,
                                std::vector<std::string> const& defaultServices,
                                std::vector<std::string> const& forcedServices) :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
    // Parse the Python configuration into the top-level ParameterSet.
    boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
    processDesc->addServices(defaultServices, forcedServices);
    init(processDesc, iToken, iLegacy);
  }
00414 
  // Construct from a Python configuration string with default/forced
  // services added. Identical to the token-taking overload, except that
  // init() is called with a default-constructed ServiceToken and
  // serviceregistry::kOverlapIsError.
  // NOTE: the member initializer list is duplicated across all constructors;
  // keep them in sync with one another.
  EventProcessor::EventProcessor(std::string const& config,
                                std::vector<std::string> const& defaultServices,
                                std::vector<std::string> const& forcedServices) :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
    // Parse the Python configuration into the top-level ParameterSet.
    boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
    processDesc->addServices(defaultServices, forcedServices);
    init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
  }
00465 
  // Construct from an already-built ProcessDesc. No configuration parsing or
  // service additions happen here; everything is delegated to init() with
  // the caller's service token and legacy policy.
  // NOTE: the member initializer list is duplicated across all constructors;
  // keep them in sync with one another.
  EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
                 ServiceToken const& token,
                 serviceregistry::ServiceLegacy legacy) :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
    init(processDesc, token, legacy);
  }
00513 
00514 
  // Construct from a configuration string that is either Python
  // (isPython == true, parsed through PythonProcessDesc) or handed directly
  // to the ProcessDesc(std::string) constructor. No extra services are
  // added. init() is called with a default ServiceToken and
  // serviceregistry::kOverlapIsError.
  // NOTE: the member initializer list is duplicated across all constructors;
  // keep them in sync with one another.
  EventProcessor::EventProcessor(std::string const& config, bool isPython):
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
    if(isPython) {
      // Python configuration: evaluate it to obtain the top-level ParameterSet.
      boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
      boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
      init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
    }
    else {
      // Non-Python configuration: passed as-is to ProcessDesc's string constructor.
      boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
      init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
    }
  }
00568 
00569   void
00570   EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00571                         ServiceToken const& iToken,
00572                         serviceregistry::ServiceLegacy iLegacy) {
00573 
00574     //std::cerr << processDesc->dump() << std::endl;
00575     // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
00576     //  They must be cleared here because some processes run multiple EventProcessors in succession.
00577     BranchIDListHelper::clearRegistries();
00578 
00579     boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00580     //std::cerr << parameterSet->dump() << std::endl;
00581 
00582     // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
00583     boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00584 
00585     // Now set some parameters specific to the main process.
00586     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00587     fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00588     emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00589     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00590     ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00591     numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00592     numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00593     setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00594     std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00595     for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00596         itPS != itPSEnd;
00597         ++itPS) {
00598       eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00599                                                 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00600                                                                itPS->getUntrackedParameter<std::string>("label", "")));
00601     }
00602     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00603 
00604     std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00605 
00606     // Now do general initialization
00607     ScheduleItems items;
00608 
00609     //initialize the services
00610     boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00611     ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00612     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00613 
00614     //make the services available
00615     ServiceRegistry::Operate operate(serviceToken_);
00616 
00617     // intialize miscellaneous items
00618     boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00619 
00620     // intialize the event setup provider
00621     esp_ = espController->makeProvider(*parameterSet, *common);
00622 
00623     // initialize the looper, if any
00624     looper_ = fillLooper(*esp_, *parameterSet, *common);
00625     if(looper_) {
00626       looper_->setActionTable(items.act_table_.get());
00627       looper_->attachTo(*items.actReg_);
00628     }
00629 
00630     // initialize the input source
00631     input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00632 
00633     // intialize the Schedule
00634     schedule_ = items.initSchedule(*parameterSet);
00635 
00636     // set the data members
00637     act_table_ = items.act_table_;
00638     actReg_ = items.actReg_;
00639     preg_ = items.preg_;
00640     processConfiguration_ = items.processConfiguration_;
00641 
00642     FDEBUG(2) << parameterSet << std::endl;
00643     connectSigs(this);
00644 
00645     // initialize the subprocess, if there is one
00646     if(subProcessParameterSet) {
00647       subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController, *actReg_, token, serviceregistry::kConfigurationOverrides));
00648     }
00649   }
00650 
  // Tear down the EventProcessor.
  // Steps, in order:
  //  1. Terminate any state machine still alive.
  //  2. Send mDtor. A cms::Exception from that transition is logged, not
  //     propagated.
  //  3. Reset the owned components while services are still available.
  //  4. Clear the process-wide ParameterSet and provenance registries.
  // The order of the resets is deliberate; do not reorder casually.
  EventProcessor::~EventProcessor() {
    // Make the services available while everything is being deleted.
    ServiceToken token = getToken();
    ServiceRegistry::Operate op(token);

    // The state machine should have already been cleaned up
    // and destroyed at this point by a call to EndJob or
    // earlier when it completed processing events, but if it
    // has not been we'll take care of it here at the last moment.
    // This could cause problems if we are already handling an
    // exception and another one is thrown here ...  For a critical
    // executable the solution to this problem is for the code using
    // the EventProcessor to explicitly call EndJob or use runToCompletion,
    // then the next line of code is never executed.
    // NOTE(review): unlike changeState(mDtor) below, this call is not
    // wrapped in try/catch, so an exception from it leaves the destructor.
    terminateMachine();

    try {
      changeState(mDtor);
    }
    catch(cms::Exception& e) {
      LogError("System")
        << e.explainSelf() << "\n";
    }

    // manually destroy all these thing that may need the services around
    subProcess_.reset();
    esp_.reset();
    schedule_.reset();
    input_.reset();
    looper_.reset();
    actReg_.reset();

    // Reset the singleton registries (accessed through instance()). Some
    // processes run several EventProcessors in succession, and this lets
    // the next one start from empty registries.
    pset::Registry* psetRegistry = pset::Registry::instance();
    psetRegistry->data().clear();
    psetRegistry->extra().setID(ParameterSetID());

    EntryDescriptionRegistry::instance()->data().clear();
    ParentageRegistry::instance()->data().clear();
    ProcessConfigurationRegistry::instance()->data().clear();
    ProcessHistoryRegistry::instance()->data().clear();
    BranchIDListHelper::clearRegistries();
  }
00693 
00694   void
00695   EventProcessor::rewind() {
00696     beginJob(); //make sure this was called
00697     changeState(mStopAsync);
00698     changeState(mInputRewind);
00699     {
00700       StateSentry toerror(this);
00701 
00702       //make the services available
00703       ServiceRegistry::Operate operate(serviceToken_);
00704 
00705       {
00706         input_->repeat();
00707         input_->rewind();
00708       }
00709       changeState(mCountComplete);
00710       toerror.succeeded();
00711     }
00712     changeState(mFinished);
00713   }
00714 
00715   EventProcessor::StatusCode
00716   EventProcessor::run(int numberEventsToProcess, bool) {
00717     return runEventCount(numberEventsToProcess);
00718   }
00719 
00720   EventProcessor::StatusCode
00721   EventProcessor::skip(int numberToSkip) {
00722     beginJob(); //make sure this was called
00723     changeState(mSkip);
00724     {
00725       StateSentry toerror(this);
00726 
00727       //make the services available
00728       ServiceRegistry::Operate operate(serviceToken_);
00729 
00730       {
00731         input_->skipEvents(numberToSkip);
00732       }
00733       changeState(mCountComplete);
00734       toerror.succeeded();
00735     }
00736     changeState(mFinished);
00737     return epSuccess;
00738   }
00739 
  // Performs the job-level start-up once.
  // It returns immediately unless the processor is still in its initial state
  // (sInit). Otherwise it:
  //  1. sends mBeginJob to the state machine;
  //  2. makes the services available;
  //  3. calls beginJob on the input source, then the schedule, then the
  //     subprocess (if any);
  //  4. fires the postBeginJob signal.
  // Exceptions thrown by the source's doBeginJob are funnelled into
  // cms::Exception and get the context "Calling beginJob for the source".
  // NOTE(review): the state is changed before any of this work is done.
  // If a later step throws, the state is presumably no longer sInit, so a
  // second call returns early and does not retry — confirm against the
  // transition table.
  void
  EventProcessor::beginJob() {
    if(state_ != sInit) return;
    // breakpoint hook from Breakpoints.h (presumably for debuggers)
    bk::beginJob();
    // can only be run if in the initial state
    changeState(mBeginJob);

    // StateSentry toerror(this); // should we add this ?
    //make the services available
    ServiceRegistry::Operate operate(serviceToken_);

    //NOTE:  This implementation assumes 'Job' means one call
    // the EventProcessor::run
    // If it really means once per 'application' then this code will
    // have to be changed.
    // Also have to deal with case where have 'run' then new Module
    // added and do 'run'
    // again.  In that case the newly added Module needs its 'beginJob'
    // to be called.

    //NOTE: in future we should have a beginOfJob for looper that takes no arguments
    //  For now we delay calling beginOfJob until first beginOfRun
    //if(looper_) {
    //   looper_->beginOfJob(es);
    //}
    try {
      // Inner try: cms::Exception passes through unchanged. Every other thrown
      // type is handed to a convertException helper; these presumably rethrow
      // it as a cms::Exception so the outer handler can add context.
      try {
        input_->doBeginJob();
      }
      catch (cms::Exception& e) { throw; }
      catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
      catch (std::exception& e) { convertException::stdToEDM(e); }
      catch(std::string& s) { convertException::stringToEDM(s); }
      catch(char const* c) { convertException::charPtrToEDM(c); }
      catch (...) { convertException::unknownToEDM(); }
    }
    catch(cms::Exception& ex) {
      ex.addContext("Calling beginJob for the source");
      throw;
    }
    schedule_->beginJob();
    // toerror.succeeded(); // should we add this?
    if(hasSubProcess()) subProcess_->doBeginJob();
    actReg_->postBeginJobSignal_();
  }
00785 
  // Performs the job-level shutdown.
  // Every step runs even if an earlier one throws: each step goes through the
  // ExceptionCollector `c` (the schedule receives `c` directly). After all
  // steps, any collected exception is rethrown as a single combined exception.
  // Order:
  //  1. state change (mEndJob);
  //  2. terminate the state machine;
  //  3. schedule endJob;
  //  4. subprocess doEndJob (if any);
  //  5. source doEndJob;
  //  6. looper endOfJob (if any);
  //  7. postEndJob signal.
  void
  EventProcessor::endJob() {
    // Collects exceptions, so we don't throw before all operations are performed.
    ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");

    // only allowed to run if state is sIdle, sJobReady, sRunGiven
    c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));

    //make the services available
    ServiceRegistry::Operate operate(serviceToken_);

    c.call(boost::bind(&EventProcessor::terminateMachine, this));
    schedule_->endJob(c);
    if(hasSubProcess()) {
      c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
    }
    c.call(boost::bind(&InputSource::doEndJob, input_));
    if(looper_) {
      c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
    }
    c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
    // report everything that went wrong only after every step has been attempted
    if(c.hasThrown()) {
      c.rethrow();
    }
  }
00811 
00812   ServiceToken
00813   EventProcessor::getToken() {
00814     return serviceToken_;
00815   }
00816 
00817   //Setup signal handler to listen for when forked children stop
00818   namespace {
00819     volatile bool child_failed = false;
00820     volatile unsigned int num_children_done = 0;
00821     volatile int child_fail_exit_status = 0;
00822 
00823     extern "C" {
00824       void ep_sigchld(int, siginfo_t*, void*) {
00825         //printf("in sigchld\n");
00826         //FDEBUG(1) << "in sigchld handler\n";
00827         int stat_loc;
00828         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00829         while(0<p) {
00830           //printf("  looping\n");
00831           if(WIFEXITED(stat_loc)) {
00832             ++num_children_done;
00833             if(0 != WEXITSTATUS(stat_loc)) {
00834               child_failed = true;
00835               child_fail_exit_status = WEXITSTATUS(stat_loc);
00836             }
00837           }
00838           if(WIFSIGNALED(stat_loc)) {
00839             ++num_children_done;
00840             child_failed = true;
00841           }
00842           p = waitpid(-1, &stat_loc, WNOHANG);
00843         }
00844       }
00845     }
00846 
00847   }
00848 
00849   enum {
00850     kChildSucceed,
00851     kChildExitBadly,
00852     kChildSegv,
00853     kMaxChildAction
00854   };
00855 
00856   namespace {
00857     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00858       unsigned int n = 0;
00859       while(numberOfChildren != 0) {
00860         ++n;
00861         numberOfChildren /= 10;
00862       }
00863       if(n == 0) {
00864         n = 3; // Protect against zero numberOfChildren
00865       }
00866       return n;
00867     }
00868 
00869     class MessageSenderToSource {
00870     public:
00871       MessageSenderToSource(int iQueueID, long iNEventsToProcess):
00872       m_queueID(iQueueID),
00873       m_nEventsToProcess(iNEventsToProcess) {}
00874       void operator()() {
00875         std::cout << "I am controller" << std::endl;
00876         //this is the first child and therefore the controller
00877 
00878         multicore::MessageForSource sndmsg;
00879         sndmsg.startIndex = 0;
00880         sndmsg.nIndices = m_nEventsToProcess;
00881         if(m_nEventsToProcess > 0 && m_nEventsToProcess < m_nEventsToProcess) {
00882           sndmsg.nIndices = m_nEventsToProcess;
00883         }
00884         int value = 0;
00885         do {
00886           value = msgsnd(m_queueID, &sndmsg, multicore::MessageForSource::sizeForBuffer(), 0);
00887           //std::cerr << "worker asked for work" << std::endl;
00888           sndmsg.startIndex += sndmsg.nIndices;
00889         } while(value == 0);
00890         if(errno != EIDRM and errno != EINVAL) {
00891           //NOTE: on Linux we sometimes get EINVAL after the queue was removed rather than EIDRM
00892           perror("message queue send failed");
00893         }
00894         return;
00895       }
00896 
00897     private:
00898       int const m_queueID;
00899       long const m_nEventsToProcess;
00900     };
00901   }
00902 
  // Splits the job over numberOfForkedChildren_ worker processes.
  //
  // Returns:
  //  - true immediately when no forking is configured
  //    (numberOfForkedChildren_ == 0);
  //  - true in each forked child, after it has reacquired its resources;
  //  - false in the parent, once every child has stopped.
  // Throws cms::Exception("ForkedChildFailed") in the parent if a child ended
  // abnormally.
  //
  // Steps before forking:
  //  1. run beginJob and read the first file;
  //  2. prefetch the EventSetup data valid at the first run's begin time, so
  //     the children inherit it;
  //  3. notify the job report and release resources.
  //
  // Children then take blocks of event indices from a System V message queue,
  // which a thread in the parent fills.
  //
  // jobReportFile: passed through to the JobReport service's fork hooks.
  bool
  EventProcessor::forkProcess(std::string const& jobReportFile) {

    if(0 == numberOfForkedChildren_) {return true;}
    assert(0<numberOfForkedChildren_);
    //do what we want done in common
    {
      beginJob(); //make sure this was run
      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);

      InputSource::ItemType itemType;
      itemType = input_->nextItemType();

      // The input must start with a file followed by a run. These checks are
      // asserts only and are compiled out under NDEBUG.
      assert(itemType == InputSource::IsFile);
      {
        readFile();
      }
      itemType = input_->nextItemType();
      assert(itemType == InputSource::IsRun);

      std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
      IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
                      input_->runAuxiliary()->beginTime());
      EventSetup const& es = esp_->eventSetupForInstance(ts);

      //now get all the data available in the EventSetup
      std::vector<eventsetup::EventSetupRecordKey> recordKeys;
      es.fillAvailableRecordKeys(recordKeys);
      std::vector<eventsetup::DataKey> dataKeys;
      for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
          itKey != itEnd;
          ++itKey) {
        eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
        //see if this is on our exclusion list
        // Exclusions are keyed by record type name. A record entry that is
        // empty, or whose first item's type is "*", excludes the whole record;
        // otherwise only the listed (type name, label) pairs are skipped.
        ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
        ExcludedData const* excludedData(0);
        if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
          excludedData = &(itExcludeRec->second);
          if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
            //skip all items in this record
            continue;
          }
        }
        if(0 != recordPtr) {
          dataKeys.clear();
          recordPtr->fillRegisteredDataKeys(dataKeys);
          for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
              itDataKey != itDataKeyEnd;
              ++itDataKey) {
            //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
            if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
              std::cout << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
              continue;
            }
            // Prefetching is best effort: a failure is only logged as a warning.
            try {
              recordPtr->doGet(*itDataKey);
            } catch(cms::Exception& e) {
             LogWarning("EventSetupPreFetching") << e.what();
            }
          }
        }
      }
    }
    std::cout << "  done prefetching" << std::endl;
{
    // make the services available
    ServiceRegistry::Operate operate(serviceToken_);
    Service<JobReport> jobReport;
    jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);

    //Now actually do the forking
    actReg_->preForkReleaseResourcesSignal_();
    input_->doPreForkReleaseResources();
    schedule_->preForkReleaseResources();
}
    // ep_sigchld reaps finished children and updates num_children_done /
    // child_failed, which the parent's wait loops below poll.
    installCustomHandler(SIGCHLD, ep_sigchld);

    //Create a message queue used to set what events each child processes
    int queueID;
    if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
      printf("Error obtaining message queue\n");
      exit(EXIT_FAILURE);
    }

    unsigned int childIndex = 0;
    unsigned int const kMaxChildren = numberOfForkedChildren_;
    unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
    std::vector<pid_t> childrenIds;
    childrenIds.reserve(kMaxChildren);
{
    // make the services available
    ServiceRegistry::Operate operate(serviceToken_);
    Service<JobReport> jobReport;
    // After this loop, childIndex < kMaxChildren exactly in a child process:
    // a child leaves the loop through 'break'. The parent runs the loop to
    // completion.
    for(; childIndex < kMaxChildren; ++childIndex) {
      pid_t value = fork();
      if(value == 0) {
        // this is the child process, redirect stdout and stderr to a log file
        fflush(stdout);
        fflush(stderr);
        std::stringstream stout;
        stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
        if(0 == freopen(stout.str().c_str(), "w", stdout)) {
          std::cerr << "Error during freopen of child process "
          << childIndex << std::endl;
        }
        if(dup2(fileno(stdout), fileno(stderr)) < 0) {
          std::cerr << "Error during dup2 of child process"
          << childIndex << std::endl;
        }

        std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
        if(setCpuAffinity_) {
          // CPU affinity is handled differently on macosx.
          // We disable it and print a message until someone reads:
          //
          // https://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
          //
          // and implements it.
#ifdef __APPLE__
          std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
#else
          // NOTE(review): child i is pinned to CPU i with no wrap-around. With
          // more children than CPUs, sched_setaffinity presumably fails and the
          // child exits — confirm.
          std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
          cpu_set_t mask;
          CPU_ZERO(&mask);
          CPU_SET(childIndex, &mask);
          if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
            std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
            exit(-1);
          }
#endif
        }
        break;
      }
      if(value < 0) {
        // NOTE(review): children forked in earlier iterations are not stopped
        // before the parent exits here.
        std::cerr << "failed to create a child" << std::endl;
        //message queue is a system resource so must be cleaned up
        // before parent goes away
        msgctl(queueID, IPC_RMID, 0);
        exit(-1);
      }
      childrenIds.push_back(value);
    }

    // Child: reacquire resources, give the source a receiver on the shared
    // queue, and return true so the caller processes this child's share of
    // the events.
    if(childIndex < kMaxChildren) {
      jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
      actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);

      boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
      input_->doPostForkReacquireResources(receiver);
      schedule_->postForkReacquireResources(childIndex, kMaxChildren);
      //NOTE: sources have to reset themselves by listening to the post fork message
      //rewindInput();
      return true;
    }
    jobReport->parentAfterFork(jobReportFile);
}

    //this is the original, which is now the master for all the children

    //Need to wait for signals from the children or externally
    // To wait we must
    // 1) block the signals we want to wait on so we do not have a race condition
    // 2) check that we haven't already meet our ending criteria
    // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
    // unblockingSigSet = current mask minus {SIGCHLD, SIGUSR2, SIGINT};
    // blockingSigSet = current mask plus {SIGCHLD, SIGUSR2, SIGINT}.
    sigset_t blockingSigSet;
    sigset_t unblockingSigSet;
    sigset_t oldSigSet;
    pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
    pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
    sigaddset(&blockingSigSet, SIGCHLD);
    sigaddset(&blockingSigSet, SIGUSR2);
    sigaddset(&blockingSigSet, SIGINT);
    sigdelset(&unblockingSigSet, SIGCHLD);
    sigdelset(&unblockingSigSet, SIGUSR2);
    sigdelset(&unblockingSigSet, SIGINT);
    pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);

    //create a thread that sends the units of work to workers
    // we create it after all signals were blocked so that this
    // thread is never interupted by a signal
    MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
    boost::thread senderThread(sender);

    // Sleep until a shutdown is requested, a child fails, or all children
    // have finished.
    while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
      sigsuspend(&unblockingSigSet);
      std::cout << "woke from sigwait" << std::endl;
    }
    pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);

    std::cout << "num children who have already stopped " << num_children_done << std::endl;
    if(child_failed) {
      std::cout << "child failed" << std::endl;
    }
    if(shutdown_flag) {
      std::cout << "asked to shutdown" << std::endl;
    }

    // On shutdown, or a failure while other children are still running, send
    // SIGUSR2 to every child and wait until all of them have been reaped.
    // NOTE(review): SIGUSR2 also goes to pids already reaped by ep_sigchld;
    // such a pid could in principle have been reused by another process.
    if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
      std::cout << "must stop children" << std::endl;
      for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
          it != itEnd; ++it) {
        /* int result = */ kill(*it, SIGUSR2);
      }
      pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
      while(num_children_done != kMaxChildren) {
        sigsuspend(&unblockingSigSet);
      }
      pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
    }
    //remove message queue, this will cause the message thread to terminate
    // kill it now since all children already stopped
    msgctl(queueID, IPC_RMID, 0);
    //now wait for the sender thread to finish. This should be quick since
    // we killed the message queue
    senderThread.join();
    if(child_failed) {
      throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
    }
    return false;
  }
01124 
01125   void
01126   EventProcessor::connectSigs(EventProcessor* ep) {
01127     // When the FwkImpl signals are given, pass them to the
01128     // appropriate EventProcessor signals so that the outside world
01129     // can see the signal.
01130     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01131     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01132   }
01133 
01134   std::vector<ModuleDescription const*>
01135   EventProcessor::getAllModuleDescriptions() const {
01136     return schedule_->getAllModuleDescriptions();
01137   }
01138 
01139   int
01140   EventProcessor::totalEvents() const {
01141     return schedule_->totalEvents();
01142   }
01143 
01144   int
01145   EventProcessor::totalEventsPassed() const {
01146     return schedule_->totalEventsPassed();
01147   }
01148 
01149   int
01150   EventProcessor::totalEventsFailed() const {
01151     return schedule_->totalEventsFailed();
01152   }
01153 
01154   void
01155   EventProcessor::enableEndPaths(bool active) {
01156     schedule_->enableEndPaths(active);
01157   }
01158 
01159   bool
01160   EventProcessor::endPathsEnabled() const {
01161     return schedule_->endPathsEnabled();
01162   }
01163 
01164   void
01165   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01166     schedule_->getTriggerReport(rep);
01167   }
01168 
01169   void
01170   EventProcessor::clearCounters() {
01171     schedule_->clearCounters();
01172   }
01173 
01174   char const* EventProcessor::currentStateName() const {
01175     return stateName(getState());
01176   }
01177 
01178   char const* EventProcessor::stateName(State s) const {
01179     return stateNames[s];
01180   }
01181 
01182   char const* EventProcessor::msgName(Msg m) const {
01183     return msgNames[m];
01184   }
01185 
01186   State EventProcessor::getState() const {
01187     return state_;
01188   }
01189 
01190   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01191     // the thread will record exception/error status in the event processor
01192     // for us to look at and report here
01193     return last_rc_;
01194   }
01195 
01196   void
01197   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01198     if(runNumber == 0) {
01199       runNumber = 1;
01200       LogWarning("Invalid Run")
01201         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01202         << "Run number was set to 1 instead\n";
01203     }
01204 
01205     // inside of beginJob there is a check to see if it has been called before
01206     beginJob();
01207     changeState(mSetRun);
01208 
01209     // interface not correct yet
01210     input_->setRunNumber(runNumber);
01211   }
01212 
01213   void
01214   EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
01215     // inside of beginJob there is a check to see if it has been called before
01216     beginJob();
01217     changeState(mSetRun);
01218 
01219     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01220     //input_->declareRunNumber(runNumber);
01221   }
01222 
  // Waits for the asynchronous event loop started by runAsync to finish.
  // timeout_seconds == 0 means wait without limit.
  //
  // - If no asynchronous loop is active (stop_count_ < 0), returns last_rc_
  //   immediately.
  // - On completion: joins and releases the loop thread, marks the processor
  //   idle (id_set_ = false, stop_count_ = -1), and returns last_rc_.
  // - On timeout: cancels the loop thread (if its id is known), logs a
  //   warning, leaves event_loop_/id_set_/stop_count_ untouched, and returns
  //   epTimedOut.
  EventProcessor::StatusCode
  EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
    // rc stays true unless a timed wait expires
    bool rc = true;
    // absolute deadline, computed once so repeated waits do not extend it
    boost::xtime timeout;
    boost::xtime_get(&timeout, boost::TIME_UTC);
    timeout.sec += timeout_seconds;

    // make sure to include a timeout here so we don't wait forever
    // I suspect there are still timing issues with thread startup
    // and the setting of the various control variables (stop_count, id_set)
    {
      boost::mutex::scoped_lock sl(stop_lock_);

      // look here - if runAsync not active, just return the last return code
      if(stop_count_ < 0) return last_rc_;

      // asyncRun increments stop_count_ and notifies stopper_ when it exits;
      // the loops also absorb spurious wake-ups
      if(timeout_seconds == 0) {
        while(stop_count_ == 0) stopper_.wait(sl);
      } else {
        while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
      }

      if(rc == false) {
          // timeout occurred
          // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
          // this is a temporary hack until we get the input source
          // upgraded to allow blocking input sources to be unblocked

          // the next line is dangerous and causes all sorts of trouble
          if(id_set_) pthread_cancel(event_loop_id_);

          // we will not do anything yet
          LogWarning("timeout")
            << "An asynchronous request was made to shut down "
            << "the event loop "
            << "and the event loop did not shutdown after "
            << timeout_seconds << " seconds\n";
      } else {
          event_loop_->join();
          event_loop_.reset();
          id_set_ = false;
          stop_count_ = -1;
      }
    }
    return rc == false ? epTimedOut : last_rc_;
  }
01269 
01270   EventProcessor::StatusCode
01271   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01272     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01273     if(rc != epTimedOut) changeState(mCountComplete);
01274     else errorState();
01275     return rc;
01276   }
01277 
01278 
01279   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01280     changeState(mStopAsync);
01281     StatusCode rc = waitForAsyncCompletion(secs);
01282     if(rc != epTimedOut) changeState(mFinished);
01283     else errorState();
01284     return rc;
01285   }
01286 
01287   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01288     changeState(mShutdownAsync);
01289     StatusCode rc = waitForAsyncCompletion(secs);
01290     if(rc != epTimedOut) changeState(mFinished);
01291     else errorState();
01292     return rc;
01293   }
01294 
01295   void EventProcessor::errorState() {
01296     state_ = sError;
01297   }
01298 
01299   // next function irrelevant now
01300   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01301     // make sure to include a timeout here so we don't wait forever
01302     // I suspect there are still timing issues with thread startup
01303     // and the setting of the various control variables (stop_count, id_set)
01304     changeState(m);
01305     return waitForAsyncCompletion(60*2);
01306   }
01307 
01308   void EventProcessor::changeState(Msg msg) {
01309     // most likely need to serialize access to this routine
01310 
01311     boost::mutex::scoped_lock sl(state_lock_);
01312     State curr = state_;
01313     int rc;
01314     // found if(not end of table) and
01315     // (state == table.state && (msg == table.message || msg == any))
01316     for(rc = 0;
01317         table[rc].current != sInvalid &&
01318           (curr != table[rc].current ||
01319            (curr == table[rc].current &&
01320              msg != table[rc].message && table[rc].message != mAny));
01321         ++rc);
01322 
01323     if(table[rc].current == sInvalid)
01324       throw cms::Exception("BadState")
01325         << "A member function of EventProcessor has been called in an"
01326         << " inappropriate order.\n"
01327         << "Bad transition from " << stateName(curr) << " "
01328         << "using message " << msgName(msg) << "\n"
01329         << "No where to go from here.\n";
01330 
01331     FDEBUG(1) << "changeState: current=" << stateName(curr)
01332               << ", message=" << msgName(msg)
01333               << " -> new=" << stateName(table[rc].final) << "\n";
01334 
01335     state_ = table[rc].final;
01336   }
01337 
01338   void EventProcessor::runAsync() {
01339     beginJob();
01340     {
01341       boost::mutex::scoped_lock sl(stop_lock_);
01342       if(id_set_ == true) {
01343           std::string err("runAsync called while async event loop already running\n");
01344           LogError("FwkJob") << err;
01345           throw cms::Exception("BadState") << err;
01346       }
01347 
01348       changeState(mRunAsync);
01349 
01350       stop_count_ = 0;
01351       last_rc_ = epSuccess; // forget the last value!
01352       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01353       boost::xtime timeout;
01354       boost::xtime_get(&timeout, boost::TIME_UTC);
01355       timeout.sec += 60; // 60 seconds to start!!!!
01356       if(starter_.timed_wait(sl, timeout) == false) {
01357           // yikes - the thread did not start
01358           throw cms::Exception("BadState")
01359             << "Async run thread did not start in 60 seconds\n";
01360       }
01361     }
01362   }
01363 
01364   void EventProcessor::asyncRun(EventProcessor* me) {
01365     // set up signals to allow for interruptions
01366     // ignore all other signals
01367     // make sure no exceptions escape out
01368 
01369     // temporary hack until we modify the input source to allow
01370     // wakeup calls from other threads.  This mimics the solution
01371     // in EventFilter/Processor, which I do not like.
01372     // allowing cancels means that the thread just disappears at
01373     // certain points.  This is bad for C++ stack variables.
01374     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01375     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01376     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01377     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01378 
01379     {
01380       boost::mutex::scoped_lock(me->stop_lock_);
01381       me->event_loop_id_ = pthread_self();
01382       me->id_set_ = true;
01383       me->starter_.notify_all();
01384     }
01385 
01386     Status rc = epException;
01387     FDEBUG(2) << "asyncRun starting ......................\n";
01388 
01389     try {
01390       bool onlineStateTransitions = true;
01391       rc = me->runToCompletion(onlineStateTransitions);
01392     }
01393     catch (cms::Exception& e) {
01394       LogError("FwkJob") << "cms::Exception caught in "
01395                          << "EventProcessor::asyncRun"
01396                          << "\n"
01397                          << e.explainSelf();
01398       me->last_error_text_ = e.explainSelf();
01399     }
01400     catch (std::exception& e) {
01401       LogError("FwkJob") << "Standard library exception caught in "
01402                          << "EventProcessor::asyncRun"
01403                          << "\n"
01404                          << e.what();
01405       me->last_error_text_ = e.what();
01406     }
01407     catch (...) {
01408       LogError("FwkJob") << "Unknown exception caught in "
01409                          << "EventProcessor::asyncRun"
01410                          << "\n";
01411       me->last_error_text_ = "Unknown exception caught";
01412       rc = epOther;
01413     }
01414 
01415     me->last_rc_ = rc;
01416 
01417     {
01418       // notify anyone waiting for exit that we are doing so now
01419       boost::mutex::scoped_lock sl(me->stop_lock_);
01420       ++me->stop_count_;
01421       me->stopper_.notify_all();
01422     }
01423     FDEBUG(2) << "asyncRun ending ......................\n";
01424   }
01425 
01426 
01427   EventProcessor::StatusCode
01428   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01429 
01430     StateSentry toerror(this);
01431 
01432     int numberOfEventsToProcess = -1;
01433     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01434 
01435     if(machine_.get() != 0) {
01436       throw Exception(errors::LogicError)
01437         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01438         << "Please report this error to the Framework group\n";
01439     }
01440 
01441     toerror.succeeded();
01442 
01443     return returnCode;
01444   }
01445 
01446   EventProcessor::StatusCode
01447   EventProcessor::runEventCount(int numberOfEventsToProcess) {
01448 
01449     StateSentry toerror(this);
01450 
01451     bool onlineStateTransitions = false;
01452     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01453 
01454     toerror.succeeded();
01455 
01456     return returnCode;
01457   }
01458 
01459   EventProcessor::StatusCode
01460   EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01461 
01462     // Reusable event principal
01463     boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
01464     principalCache_.insert(ep);
01465 
01466     beginJob(); //make sure this was called
01467 
01468     if(!onlineStateTransitions) changeState(mRunCount);
01469 
01470     StatusCode returnCode = epSuccess;
01471     stateMachineWasInErrorState_ = false;
01472 
01473     // make the services available
01474     ServiceRegistry::Operate operate(serviceToken_);
01475 
01476     if(machine_.get() == 0) {
01477 
01478       statemachine::FileMode fileMode;
01479       if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01480       else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01481       else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01482       else {
01483          throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01484             << fileMode_ << ".\n"
01485             << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01486       }
01487 
01488       statemachine::EmptyRunLumiMode emptyRunLumiMode;
01489       if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01490       else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01491       else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01492       else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01493       else {
01494          throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01495             << emptyRunLumiMode_ << ".\n"
01496             << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01497       }
01498 
01499       machine_.reset(new statemachine::Machine(this,
01500                                                fileMode,
01501                                                emptyRunLumiMode));
01502 
01503       machine_->initiate();
01504     }
01505 
01506     try {
01507       try {
01508 
01509         InputSource::ItemType itemType;
01510 
01511         int iEvents = 0;
01512 
01513         while(true) {
01514 
01515           itemType = input_->nextItemType();
01516 
01517           FDEBUG(1) << "itemType = " << itemType << "\n";
01518 
01519           // These are used for asynchronous running only and
01520           // and are checking to see if stopAsync or shutdownAsync
01521           // were called from another thread.  In the future, we
01522           // may need to do something better than polling the state.
01523           // With the current code this is the simplest thing and
01524           // it should always work.  If the interaction between
01525           // threads becomes more complex this may cause problems.
01526           if(state_ == sStopping) {
01527             FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01528             forceLooperToEnd_ = true;
01529             machine_->process_event(statemachine::Stop());
01530             forceLooperToEnd_ = false;
01531             break;
01532           }
01533           else if(state_ == sShuttingDown) {
01534             FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01535             forceLooperToEnd_ = true;
01536             machine_->process_event(statemachine::Stop());
01537             forceLooperToEnd_ = false;
01538             break;
01539           }
01540 
01541           // Look for a shutdown signal
01542           {
01543             boost::mutex::scoped_lock sl(usr2_lock);
01544             if(shutdown_flag) {
01545               changeState(mShutdownSignal);
01546               returnCode = epSignal;
01547               forceLooperToEnd_ = true;
01548               machine_->process_event(statemachine::Stop());
01549               forceLooperToEnd_ = false;
01550               break;
01551             }
01552           }
01553 
01554           if(itemType == InputSource::IsStop) {
01555             machine_->process_event(statemachine::Stop());
01556           }
01557           else if(itemType == InputSource::IsFile) {
01558             machine_->process_event(statemachine::File());
01559           }
01560           else if(itemType == InputSource::IsRun) {
01561             machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
01562           }
01563           else if(itemType == InputSource::IsLumi) {
01564             machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01565           }
01566           else if(itemType == InputSource::IsEvent) {
01567             machine_->process_event(statemachine::Event());
01568             ++iEvents;
01569             if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01570               returnCode = epCountComplete;
01571               changeState(mInputExhausted);
01572               FDEBUG(1) << "Event count complete, pausing event loop\n";
01573               break;
01574             }
01575           }
01576           // This should be impossible
01577           else {
01578             throw Exception(errors::LogicError)
01579               << "Unknown next item type passed to EventProcessor\n"
01580               << "Please report this error to the Framework group\n";
01581           }
01582 
01583           if(machine_->terminated()) {
01584             changeState(mInputExhausted);
01585             break;
01586           }
01587         }  // End of loop over state machine events
01588       } // Try block
01589       catch (cms::Exception& e) { throw; }
01590       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01591       catch (std::exception& e) { convertException::stdToEDM(e); }
01592       catch(std::string& s) { convertException::stringToEDM(s); }
01593       catch(char const* c) { convertException::charPtrToEDM(c); }
01594       catch (...) { convertException::unknownToEDM(); }
01595     } // Try block
01596     // Some comments on exception handling related to the boost state machine:
01597     //
01598     // Some states used in the machine are special because they
01599     // perform actions while the machine is being terminated, actions
01600     // such as close files, call endRun, call endLumi etc ...  Each of these
01601     // states has two functions that perform these actions.  The functions
01602     // are almost identical.  The major difference is that one version
01603     // catches all exceptions and the other lets exceptions pass through.
01604     // The destructor catches them and the other function named "exit" lets
01605     // them pass through.  On a normal termination, boost will always call
01606     // "exit" and then the state destructor.  In our state classes, the
01607     // the destructors do nothing if the exit function already took
01608     // care of things.  Here's the interesting part.  When boost is
01609     // handling an exception the "exit" function is not called (a boost
01610     // feature).
01611     //
01612     // If an exception occurs while the boost machine is in control
01613     // (which usually means inside a process_event call), then
01614     // the boost state machine destroys its states and "terminates" itself.
01615     // This already done before we hit the catch blocks below. In this case
01616     // the call to terminateMachine below only destroys an already
01617     // terminated state machine.  Because exit is not called, the state destructors
01618     // handle cleaning up lumis, runs, and files.  The destructors swallow
01619     // all exceptions and only pass through the exceptions messages, which
01620     // are tacked onto the original exception below.
01621     //
01622     // If an exception occurs when the boost state machine is not
01623     // in control (outside the process_event functions), then boost
01624     // cannot destroy its own states.  The terminateMachine function
01625     // below takes care of that.  The flag "alreadyHandlingException"
01626     // is set true so that the state exit functions do nothing (and
01627     // cannot throw more exceptions while handling the first).  Then the
01628     // state destructors take care of this because exit did nothing.
01629     //
01630     // In both cases above, the EventProcessor::endOfLoop function is
01631     // not called because it can throw exceptions.
01632     //
01633     // One tricky aspect of the state machine is that things that can
01634     // throw should not be invoked by the state machine while another
01635     // exception is being handled.
01636     // Another tricky aspect is that it appears to be important to
01637     // terminate the state machine before invoking its destructor.
01638     // We've seen crashes that are not understood when that is not
01639     // done.  Maintainers of this code should be careful about this.
01640 
01641     catch (cms::Exception & e) {
01642       alreadyHandlingException_ = true;
01643       terminateMachine();
01644       alreadyHandlingException_ = false;
01645       if (!exceptionMessageLumis_.empty()) {
01646         e.addAdditionalInfo(exceptionMessageLumis_);
01647         if (e.alreadyPrinted()) {
01648           LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01649         }
01650       }
01651       if (!exceptionMessageRuns_.empty()) {
01652         e.addAdditionalInfo(exceptionMessageRuns_);
01653         if (e.alreadyPrinted()) {
01654           LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01655         }
01656       }
01657       if (!exceptionMessageFiles_.empty()) {
01658         e.addAdditionalInfo(exceptionMessageFiles_);
01659         if (e.alreadyPrinted()) {
01660           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01661         }
01662       }
01663       throw;
01664     }
01665 
01666     if(machine_->terminated()) {
01667       FDEBUG(1) << "The state machine reports it has been terminated\n";
01668       machine_.reset();
01669     }
01670 
01671     if(!onlineStateTransitions) changeState(mFinished);
01672 
01673     if(stateMachineWasInErrorState_) {
01674       throw cms::Exception("BadState")
01675         << "The boost state machine in the EventProcessor exited after\n"
01676         << "entering the Error state.\n";
01677     }
01678 
01679     return returnCode;
01680   }
01681 
01682   void EventProcessor::readFile() {
01683     FDEBUG(1) << " \treadFile\n";
01684     fb_ = input_->readFile();
01685     if(numberOfForkedChildren_ > 0) {
01686         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01687     }
01688   }
01689 
01690   void EventProcessor::closeInputFile() {
01691     if (fb_.get() != 0) {
01692       input_->closeFile(fb_);
01693     }
01694     FDEBUG(1) << "\tcloseInputFile\n";
01695   }
01696 
01697   void EventProcessor::openOutputFiles() {
01698     if (fb_.get() != 0) {
01699       schedule_->openOutputFiles(*fb_);
01700       if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01701     }
01702     FDEBUG(1) << "\topenOutputFiles\n";
01703   }
01704 
01705   void EventProcessor::closeOutputFiles() {
01706     if (fb_.get() != 0) {
01707       schedule_->closeOutputFiles();
01708       if(hasSubProcess()) subProcess_->closeOutputFiles();
01709     }
01710     FDEBUG(1) << "\tcloseOutputFiles\n";
01711   }
01712 
01713   void EventProcessor::respondToOpenInputFile() {
01714     if (fb_.get() != 0) {
01715       schedule_->respondToOpenInputFile(*fb_);
01716       if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01717     }
01718     FDEBUG(1) << "\trespondToOpenInputFile\n";
01719   }
01720 
01721   void EventProcessor::respondToCloseInputFile() {
01722     if (fb_.get() != 0) {
01723       schedule_->respondToCloseInputFile(*fb_);
01724       if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01725     }
01726     FDEBUG(1) << "\trespondToCloseInputFile\n";
01727   }
01728 
01729   void EventProcessor::respondToOpenOutputFiles() {
01730     if (fb_.get() != 0) {
01731       schedule_->respondToOpenOutputFiles(*fb_);
01732       if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01733     }
01734     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01735   }
01736 
01737   void EventProcessor::respondToCloseOutputFiles() {
01738     if (fb_.get() != 0) {
01739       schedule_->respondToCloseOutputFiles(*fb_);
01740       if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01741     }
01742     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01743   }
01744 
01745   void EventProcessor::startingNewLoop() {
01746     shouldWeStop_ = false;
01747     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01748     // until after we've called beginOfJob
01749     if(looper_ && looperBeginJobRun_) {
01750       looper_->doStartingNewLoop();
01751     }
01752     FDEBUG(1) << "\tstartingNewLoop\n";
01753   }
01754 
01755   bool EventProcessor::endOfLoop() {
01756     if(looper_) {
01757       ModuleChanger changer(schedule_.get());
01758       looper_->setModuleChanger(&changer);
01759       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01760       looper_->setModuleChanger(0);
01761       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01762       else return false;
01763     }
01764     FDEBUG(1) << "\tendOfLoop\n";
01765     return true;
01766   }
01767 
01768   void EventProcessor::rewindInput() {
01769     input_->repeat();
01770     input_->rewind();
01771     FDEBUG(1) << "\trewind\n";
01772   }
01773 
01774   void EventProcessor::prepareForNextLoop() {
01775     looper_->prepareForNextLoop(esp_.get());
01776     FDEBUG(1) << "\tprepareForNextLoop\n";
01777   }
01778 
01779   bool EventProcessor::shouldWeCloseOutput() const {
01780     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01781     return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01782   }
01783 
01784   void EventProcessor::doErrorStuff() {
01785     FDEBUG(1) << "\tdoErrorStuff\n";
01786     LogError("StateMachine")
01787       << "The EventProcessor state machine encountered an unexpected event\n"
01788       << "and went to the error state\n"
01789       << "Will attempt to terminate processing normally\n"
01790       << "(IF using the looper the next loop will be attempted)\n"
01791       << "This likely indicates a bug in an input module or corrupted input or both\n";
01792     stateMachineWasInErrorState_ = true;
01793   }
01794 
01795   void EventProcessor::beginRun(statemachine::Run const& run) {
01796     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01797     input_->doBeginRun(runPrincipal);
01798     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01799                     runPrincipal.beginTime());
01800     if(forceESCacheClearOnNewRun_){
01801       esp_->forceCacheClear();
01802     }
01803     EventSetup const& es = esp_->eventSetupForInstance(ts);
01804     if(looper_ && looperBeginJobRun_== false) {
01805       looper_->copyInfo(ScheduleInfo(schedule_.get()));
01806       looper_->beginOfJob(es);
01807       looperBeginJobRun_ = true;
01808       looper_->doStartingNewLoop();
01809     }
01810     {
01811       typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01812       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01813       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01814       if(hasSubProcess()) {
01815         subProcess_->doBeginRun(runPrincipal, ts);
01816       }
01817     }
01818     FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
01819     if(looper_) {
01820       looper_->doBeginRun(runPrincipal, es);
01821     }
01822   }
01823 
01824   void EventProcessor::endRun(statemachine::Run const& run) {
01825     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01826     input_->doEndRun(runPrincipal);
01827     IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01828                     runPrincipal.endTime());
01829     EventSetup const& es = esp_->eventSetupForInstance(ts);
01830     {
01831       typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
01832       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01833       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01834       if(hasSubProcess()) {
01835         subProcess_->doEndRun(runPrincipal, ts);
01836       }
01837     }
01838     FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
01839     if(looper_) {
01840       looper_->doEndRun(runPrincipal, es);
01841     }
01842   }
01843 
01844   void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
01845     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01846     input_->doBeginLumi(lumiPrincipal);
01847 
01848     Service<RandomNumberGenerator> rng;
01849     if(rng.isAvailable()) {
01850       LuminosityBlock lb(lumiPrincipal, ModuleDescription());
01851       rng->preBeginLumi(lb);
01852     }
01853 
01854     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
01855     // lumi blocks know their start and end times why not also start and end events?
01856     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
01857     EventSetup const& es = esp_->eventSetupForInstance(ts);
01858     {
01859       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
01860       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01861       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01862       if(hasSubProcess()) {
01863         subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
01864       }
01865     }
01866     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
01867     if(looper_) {
01868       looper_->doBeginLuminosityBlock(lumiPrincipal, es);
01869     }
01870   }
01871 
01872   void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
01873     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01874     input_->doEndLumi(lumiPrincipal);
01875     //NOTE: Using the max event number for the end of a lumi block is a bad idea
01876     // lumi blocks know their start and end times why not also start and end events?
01877     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
01878                     lumiPrincipal.endTime());
01879     EventSetup const& es = esp_->eventSetupForInstance(ts);
01880     {
01881       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
01882       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01883       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01884       if(hasSubProcess()) {
01885         subProcess_->doEndLuminosityBlock(lumiPrincipal, ts);
01886       }
01887     }
01888     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01889     if(looper_) {
01890       looper_->doEndLuminosityBlock(lumiPrincipal, es);
01891     }
01892   }
01893 
01894   statemachine::Run EventProcessor::readAndCacheRun() {
01895     input_->readAndCacheRun();
01896     input_->markRun();
01897     return statemachine::Run(input_->processHistoryID(), input_->run());
01898   }
01899 
01900   int EventProcessor::readAndCacheLumi() {
01901     input_->readAndCacheLumi();
01902     input_->markLumi();
01903     return input_->luminosityBlock();
01904   }
01905 
01906   void EventProcessor::writeRun(statemachine::Run const& run) {
01907     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
01908     if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
01909     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
01910   }
01911 
01912   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
01913     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
01914     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
01915   }
01916 
01917   void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
01918     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
01919     if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
01920     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01921   }
01922 
01923   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
01924     principalCache_.deleteLumi(phid, run, lumi);
01925     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01926   }
01927 
  // Reads the next event from the input source into the currently cached
  // luminosity block, then:
  //  - runs the event through the schedule and the subprocess (if any),
  //    inside a ScheduleSignalSentry scope;
  //  - lets the looper (if any) act on it;
  //  - clears the EventPrincipal.
  // If anything throws, the clearEventPrincipal call at the end is not reached.
  void EventProcessor::readAndProcessEvent() {
    EventPrincipal *pep = 0;
    try {
      try {
        pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
        FDEBUG(1) << "\treadEvent\n";
      }
      // cms::Exception passes through unchanged. Other exception kinds go
      // to the convertException helpers, which presumably rethrow them as
      // cms::Exception so the outer handler can add context — confirm.
      catch (cms::Exception& e) { throw; }
      catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
      catch (std::exception& e) { convertException::stdToEDM(e); }
      catch(std::string& s) { convertException::stringToEDM(s); }
      catch(char const* c) { convertException::charPtrToEDM(c); }
      catch (...) { convertException::unknownToEDM(); }
    }
    catch(cms::Exception& ex) {
      // Tag the exception with where it happened, then propagate it.
      ex.addContext("Calling readEvent in the input source");
      throw;
    }
    assert(pep != 0);

    // Synchronize the EventSetup to this event's ID and time.
    IOVSyncValue ts(pep->id(), pep->time());
    EventSetup const& es = esp_->eventSetupForInstance(ts);
    {
      typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
      schedule_->processOneOccurrence<Traits>(*pep, es);
      if(hasSubProcess()) {
        subProcess_->doEvent(*pep, ts);
      }
    }

    if(looper_) {
      // Tell the looper which navigation the input source supports.
      bool randomAccess = input_->randomAccess();
      ProcessingController::ForwardState forwardState = input_->forwardState();
      ProcessingController::ReverseState reverseState = input_->reverseState();
      ProcessingController pc(forwardState, reverseState, randomAccess);

      EDLooperBase::Status status = EDLooperBase::kContinue;
      // Call the looper for this event. If the source is random-access and
      // the looper requested a jump (previous event or a specified event),
      // perform it. A failed goToEvent is reported back through pc and the
      // looper is called again.
      do {
        status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);

        bool succeeded = true;
        if(randomAccess) {
          if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
            // NOTE(review): -2 presumably because the current event has already
            // been read, so stepping back two makes the previous event the next
            // one read — confirm against InputSource::skipEvents.
            input_->skipEvents(-2);
          }
          else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
            succeeded = input_->goToEvent(pc.specifiedEventTransition());
          }
        }
        pc.setLastOperationSucceeded(succeeded);
      } while(!pc.lastOperationSucceeded());
      // Any status other than kContinue becomes a stop request (see shouldWeStop).
      if(status != EDLooperBase::kContinue) shouldWeStop_ = true;

    }

    FDEBUG(1) << "\tprocessEvent\n";
    // Schedule, subprocess and looper are done with this event.
    pep->clearEventPrincipal();
  }
01987 
01988   bool EventProcessor::shouldWeStop() const {
01989     FDEBUG(1) << "\tshouldWeStop\n";
01990     if(shouldWeStop_) return true;
01991     return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
01992   }
01993 
01994   void EventProcessor::setExceptionMessageFiles(std::string& message) {
01995     exceptionMessageFiles_ = message;
01996   }
01997 
01998   void EventProcessor::setExceptionMessageRuns(std::string& message) {
01999     exceptionMessageRuns_ = message;
02000   }
02001 
02002   void EventProcessor::setExceptionMessageLumis(std::string& message) {
02003     exceptionMessageLumis_ = message;
02004   }
02005 
02006   bool EventProcessor::alreadyHandlingException() const {
02007     return alreadyHandlingException_;
02008   }
02009 
02010   void EventProcessor::terminateMachine() {
02011     if(machine_.get() != 0) {
02012       if(!machine_->terminated()) {
02013         forceLooperToEnd_ = true;
02014         machine_->process_event(statemachine::Stop());
02015         forceLooperToEnd_ = false;
02016       }
02017       else {
02018         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
02019       }
02020       if(machine_->terminated()) {
02021         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02022       }
02023       machine_.reset();
02024     }
02025   }
02026 }