CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_9/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011 
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036 
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038 
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046 
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049 
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057 
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060 
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063 
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069 
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072 
00073 //Used for forking
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080 
00081 //Used for CPU affinity
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085 
00086 namespace edm {
00087 
00088   namespace event_processor {
00089 
00090     class StateSentry {
00091     public:
00092       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00093       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00094       void succeeded() {success_ = true;}
00095 
00096     private:
00097       EventProcessor* ep_;
00098       bool success_;
00099     };
00100   }
00101 
00102   using namespace event_processor;
00103 
00104   namespace {
00105     template <typename T>
00106     class ScheduleSignalSentry {
00107     public:
00108       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00109            a_(a), principal_(principal), es_(es) {
00110         if (a_) T::preScheduleSignal(a_, principal_);
00111       }
00112       ~ScheduleSignalSentry() {
00113         if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00114       }
00115 
00116     private:
00117       // We own none of these resources.
00118       ActivityRegistry* a_;
00119       typename T::MyPrincipal* principal_;
00120       EventSetup const* es_;
00121     };
00122   }
00123 
namespace {

  // Printable names for the state and message enums declared in the header.
  // Both tables must be kept in sync with those enums (same order, same
  // number of entries); presumably they are indexed by the enum value.
  //
  // The pointers themselves are const so the tables cannot be modified at
  // run time.

  char const* const stateNames[] = {
    "Init",
    "JobReady",
    "RunGiven",
    "Running",
    "Stopping",
    "ShuttingDown",
    "Done",
    "JobEnded",
    "Error",
    "ErrorEnded",
    "End",
    "Invalid"
  };

  char const* const msgNames[] = {
    "SetRun",
    "Skip",
    "RunAsync",
    "Run(ID)",
    "Run(count)",
    "BeginJob",
    "StopAsync",
    "ShutdownAsync",
    "EndJob",
    "CountComplete",
    "InputExhausted",
    "StopSignal",
    "ShutdownSignal",
    "Finished",
    "Any",
    "dtor",
    "Exception",
    "Rewind"
  };
}
00165     // IMPORTANT NOTE:
00166     // the mAny messages are special, they must appear last in the
00167     // table if multiple entries for a CurrentState are present.
00168     // the changeState function does not use the mAny yet!!!
00169 
// One row of the state-transition table: when `message` is received while
// the processor is in state `current`, the resulting state is `final`.
00170     struct TransEntry {
00171       State current;  // state in which the message is received
00172       Msg   message;  // message being delivered
00173       State final;    // state to move to
00174     };
00175 
00176     // we should use this information to initialize a two dimensional
00177     // table of t[CurrentState][Message] = FinalState
00178 
00179     /*
00180       The way this is currently written, the async run thread function
00181       can return in the "JobReady" state - but not yet cleaned up.  The
00182       problem is that only when stop/shutdown async is called is the
00183       thread cleaned up. But the stop/shutdown async functions attempt
00184       first to change the state using messages that are not valid in
00185       the "JobReady" state.
00186 
00187       I think most of the problems can be solved by using two states
00188       for "running": RunningS and RunningA (sync/async). The problem
00189       seems to be all the transitions out of running for both
00190       modes of operation.  The other solution might be to only go to
00191       "Stopping" from Running, and use the return code from "run_p" to
00192       set the final state.  If this is used, then in the sync mode the
00193       "Stopping" state would be momentary.
00194 
00195      */
00196 
// State-transition table. Each row reads: while in `CurrentState`, on
// receiving `Message`, move to `FinalState`. Rows for the same current
// state are not all contiguous (see the last few entries).
// The closing { sInvalid, mAny, sInvalid } row presumably serves as an end
// marker for the code that scans the table - verify against changeState.
// NOTE(review): the anonymous namespace opened for the name tables is closed
// before TransEntry, so this array is declared directly in namespace edm
// rather than with internal linkage - confirm that is intended.
00197     TransEntry table[] = {
00198     // CurrentState   Message         FinalState
00199     // -----------------------------------------
00200       { sInit,          mException,      sError },
00201       { sInit,          mBeginJob,       sJobReady },
00202       { sJobReady,      mException,      sError },
00203       { sJobReady,      mSetRun,         sRunGiven },
00204       { sJobReady,      mInputRewind,    sRunning },
00205       { sJobReady,      mSkip,           sRunning },
00206       { sJobReady,      mRunID,          sRunning },
00207       { sJobReady,      mRunCount,       sRunning },
00208       { sJobReady,      mEndJob,         sJobEnded },
00209       { sJobReady,      mBeginJob,       sJobReady },
00210       { sJobReady,      mDtor,           sEnd },    // should this be allowed?
00211 
00212       { sJobReady,      mStopAsync,      sJobReady },
00213       { sJobReady,      mCountComplete,  sJobReady },
00214       { sJobReady,      mFinished,       sJobReady },
00215 
00216       { sRunGiven,      mException,      sError },
00217       { sRunGiven,      mRunAsync,       sRunning },
00218       { sRunGiven,      mBeginJob,       sRunGiven },
00219       { sRunGiven,      mShutdownAsync,  sShuttingDown },
00220       { sRunGiven,      mStopAsync,      sStopping },
00221       { sRunning,       mException,      sError },
00222       { sRunning,       mStopAsync,      sStopping },
00223       { sRunning,       mShutdownAsync,  sShuttingDown },
00224       { sRunning,       mShutdownSignal, sShuttingDown },
00225       { sRunning,       mCountComplete,  sStopping }, // sJobReady
00226       { sRunning,       mInputExhausted, sStopping }, // sJobReady
00227 
00228       { sStopping,      mInputRewind,    sRunning }, // The looper needs this
00229       { sStopping,      mException,      sError },
00230       { sStopping,      mFinished,       sJobReady },
00231       { sStopping,      mCountComplete,  sJobReady },
00232       { sStopping,      mShutdownSignal, sShuttingDown },
00233       { sStopping,      mStopAsync,      sStopping },     // stay
00234       { sStopping,      mInputExhausted, sStopping },     // stay
00235       //{ sStopping,      mAny,            sJobReady },     // <- ??????
00236       { sShuttingDown,  mException,      sError },
00237       { sShuttingDown,  mShutdownSignal, sShuttingDown },
00238       { sShuttingDown,  mCountComplete,  sDone }, // needed?
00239       { sShuttingDown,  mInputExhausted, sDone }, // needed?
00240       { sShuttingDown,  mFinished,       sDone },
00241       //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
00242       //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
00243       //{ sShuttingDown,  mAny,            sDone },         // <- ??????
00244       { sDone,          mEndJob,         sJobEnded },
00245       { sDone,          mException,      sError },
00246       { sJobEnded,      mDtor,           sEnd },
00247       { sJobEnded,      mException,      sError },
00248       { sError,         mEndJob,         sError },   // funny one here
00249       { sError,         mDtor,           sError },   // funny one here
00250       { sInit,          mDtor,           sEnd },     // for StorM dummy EP
00251       { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
00252       { sInvalid,       mAny,            sInvalid }
00253     };
00254 
00255 
00256     // Note: many of the messages generate the mBeginJob message first
00257     //  mRunID, mRunCount, mSetRun
00258 
00259   // ---------------------------------------------------------------
00260   boost::shared_ptr<InputSource>
00261   makeInput(ParameterSet& params,
00262             CommonParams const& common,
00263             ProductRegistry& preg,
00264             PrincipalCache& pCache,
00265             boost::shared_ptr<ActivityRegistry> areg,
00266             boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00267     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00268     if(main_input == 0) {
00269       throw Exception(errors::Configuration)
00270         << "There must be exactly one source in the configuration.\n"
00271         << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00272     }
00273 
00274     std::string modtype(main_input->getParameter<std::string>("@module_type"));
00275 
00276     std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00277       ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00278     ConfigurationDescriptions descriptions(filler->baseType());
00279     filler->fill(descriptions);
00280 
00281     try {
00282       try {
00283         descriptions.validate(*main_input, std::string("source"));
00284       }
00285       catch (cms::Exception& e) { throw; }
00286       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00287       catch (std::exception& e) { convertException::stdToEDM(e); }
00288       catch(std::string& s) { convertException::stringToEDM(s); }
00289       catch(char const* c) { convertException::charPtrToEDM(c); }
00290       catch (...) { convertException::unknownToEDM(); }
00291     }
00292     catch (cms::Exception & iException) {
00293       std::ostringstream ost;
00294       ost << "Validating configuration of input source of type " << modtype;
00295       iException.addContext(ost.str());
00296       throw;
00297     }
00298 
00299     main_input->registerIt();
00300 
00301     // Fill in "ModuleDescription", in case the input source produces
00302     // any EDproducts, which would be registered in the ProductRegistry.
00303     // Also fill in the process history item for this process.
00304     // There is no module label for the unnamed input source, so
00305     // just use "source".
00306     // Only the tracked parameters belong in the process configuration.
00307     ModuleDescription md(main_input->id(),
00308                          main_input->getParameter<std::string>("@module_type"),
00309                          "source",
00310                          processConfiguration.get());
00311 
00312     InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00313     areg->preSourceConstructionSignal_(md);
00314     boost::shared_ptr<InputSource> input;
00315     try {
00316       try {
00317         input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00318       }
00319       catch (cms::Exception& e) { throw; }
00320       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00321       catch (std::exception& e) { convertException::stdToEDM(e); }
00322       catch(std::string& s) { convertException::stringToEDM(s); }
00323       catch(char const* c) { convertException::charPtrToEDM(c); }
00324       catch (...) { convertException::unknownToEDM(); }
00325     }
00326     catch (cms::Exception& iException) {
00327       areg->postSourceConstructionSignal_(md);
00328       std::ostringstream ost;
00329       ost << "Constructing input source of type " << modtype;
00330       iException.addContext(ost.str());
00331       throw;
00332     }
00333     areg->postSourceConstructionSignal_(md);
00334     return input;
00335   }
00336 
00337   // ---------------------------------------------------------------
00338   boost::shared_ptr<EDLooperBase>
00339   fillLooper(eventsetup::EventSetupProvider& cp,
00340                          ParameterSet& params,
00341                          CommonParams const& common) {
00342     boost::shared_ptr<EDLooperBase> vLooper;
00343 
00344     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00345 
00346     if(loopers.size() == 0) {
00347        return vLooper;
00348     }
00349 
00350     assert(1 == loopers.size());
00351 
00352     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00353         itName != itNameEnd;
00354         ++itName) {
00355 
00356       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00357       providerPSet->registerIt();
00358       vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00359                                                         *providerPSet,
00360                                                         common.processName_,
00361                                                         common.releaseVersion_,
00362                                                         common.passID_);
00363       }
00364       return vLooper;
00365 
00366   }
00367 
00368   // ---------------------------------------------------------------
// Constructs an EventProcessor from a configuration that is handed to
// PythonProcessDesc, using a caller-supplied service token and legacy policy.
//   config          - passed to PythonProcessDesc to obtain the process ParameterSet
//   iToken, iLegacy - forwarded unchanged to init()
//   defaultServices, forcedServices - forwarded to ProcessDesc::addServices()
// Every data member gets an explicit initial value here; the actual setup is
// done by init().
00369   EventProcessor::EventProcessor(std::string const& config,
00370                                 ServiceToken const& iToken,
00371                                 serviceregistry::ServiceLegacy iLegacy,
00372                                 std::vector<std::string> const& defaultServices,
00373                                 std::vector<std::string> const& forcedServices) :
00374     preProcessEventSignal_(),
00375     postProcessEventSignal_(),
00376     actReg_(),
00377     preg_(),
00378     serviceToken_(),
00379     input_(),
00380     esp_(),
00381     act_table_(),
00382     processConfiguration_(),
00383     schedule_(),
00384     subProcess_(),
00385     historyAppender_(new HistoryAppender),
00386     state_(sInit),
00387     event_loop_(),
00388     state_lock_(),
00389     stop_lock_(),
00390     stopper_(),
00391     starter_(),
00392     stop_count_(-1),
00393     last_rc_(epSuccess),
00394     last_error_text_(),
00395     id_set_(false),
00396     event_loop_id_(),
00397     my_sig_num_(getSigNum()),
00398     fb_(),
00399     looper_(),
00400     machine_(),
00401     principalCache_(),
00402     shouldWeStop_(false),
00403     stateMachineWasInErrorState_(false),
00404     fileMode_(),
00405     emptyRunLumiMode_(),
00406     exceptionMessageFiles_(),
00407     exceptionMessageRuns_(),
00408     exceptionMessageLumis_(),
00409     alreadyHandlingException_(false),
00410     forceLooperToEnd_(false),
00411     looperBeginJobRun_(false),
00412     forceESCacheClearOnNewRun_(false),
00413     numberOfForkedChildren_(0),
00414     numberOfSequentialEventsPerChild_(1),
00415     setCpuAffinity_(false),
00416     eventSetupDataToExcludeFromPrefetching_() {
          // Turn the configuration into a ProcessDesc, add the requested
          // services, then run the common initialization.
00417     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00418     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00419     processDesc->addServices(defaultServices, forcedServices);
00420     init(processDesc, iToken, iLegacy);
00421   }
00422 
// Constructs an EventProcessor from a configuration that is handed to
// PythonProcessDesc, without an externally supplied service token.
//   config          - passed to PythonProcessDesc to obtain the process ParameterSet
//   defaultServices, forcedServices - forwarded to ProcessDesc::addServices()
// init() is called with a default-constructed ServiceToken and
// serviceregistry::kOverlapIsError. The member initializer list is the same
// as in the other constructors.
00423   EventProcessor::EventProcessor(std::string const& config,
00424                                 std::vector<std::string> const& defaultServices,
00425                                 std::vector<std::string> const& forcedServices) :
00426     preProcessEventSignal_(),
00427     postProcessEventSignal_(),
00428     actReg_(),
00429     preg_(),
00430     serviceToken_(),
00431     input_(),
00432     esp_(),
00433     act_table_(),
00434     processConfiguration_(),
00435     schedule_(),
00436     subProcess_(),
00437     historyAppender_(new HistoryAppender),
00438     state_(sInit),
00439     event_loop_(),
00440     state_lock_(),
00441     stop_lock_(),
00442     stopper_(),
00443     starter_(),
00444     stop_count_(-1),
00445     last_rc_(epSuccess),
00446     last_error_text_(),
00447     id_set_(false),
00448     event_loop_id_(),
00449     my_sig_num_(getSigNum()),
00450     fb_(),
00451     looper_(),
00452     machine_(),
00453     principalCache_(),
00454     shouldWeStop_(false),
00455     stateMachineWasInErrorState_(false),
00456     fileMode_(),
00457     emptyRunLumiMode_(),
00458     exceptionMessageFiles_(),
00459     exceptionMessageRuns_(),
00460     exceptionMessageLumis_(),
00461     alreadyHandlingException_(false),
00462     forceLooperToEnd_(false),
00463     looperBeginJobRun_(false),
00464     forceESCacheClearOnNewRun_(false),
00465     numberOfForkedChildren_(0),
00466     numberOfSequentialEventsPerChild_(1),
00467     setCpuAffinity_(false),
00468     eventSetupDataToExcludeFromPrefetching_() {
          // Same sequence as the token-taking overload, but with no
          // pre-existing services to merge in.
00469     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00470     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00471     processDesc->addServices(defaultServices, forcedServices);
00472     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00473   }
00474 
// Constructs an EventProcessor from an already built ProcessDesc.
//   processDesc   - process description, forwarded to init()
//   token, legacy - service token and legacy policy, forwarded to init()
// No configuration parsing and no addServices() call happens here; after the
// member initializers the body only calls init().
00475   EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00476                  ServiceToken const& token,
00477                  serviceregistry::ServiceLegacy legacy) :
00478     preProcessEventSignal_(),
00479     postProcessEventSignal_(),
00480     actReg_(),
00481     preg_(),
00482     serviceToken_(),
00483     input_(),
00484     esp_(),
00485     act_table_(),
00486     processConfiguration_(),
00487     schedule_(),
00488     subProcess_(),
00489     historyAppender_(new HistoryAppender),
00490     state_(sInit),
00491     event_loop_(),
00492     state_lock_(),
00493     stop_lock_(),
00494     stopper_(),
00495     starter_(),
00496     stop_count_(-1),
00497     last_rc_(epSuccess),
00498     last_error_text_(),
00499     id_set_(false),
00500     event_loop_id_(),
00501     my_sig_num_(getSigNum()),
00502     fb_(),
00503     looper_(),
00504     machine_(),
00505     principalCache_(),
00506     shouldWeStop_(false),
00507     stateMachineWasInErrorState_(false),
00508     fileMode_(),
00509     emptyRunLumiMode_(),
00510     exceptionMessageFiles_(),
00511     exceptionMessageRuns_(),
00512     exceptionMessageLumis_(),
00513     alreadyHandlingException_(false),
00514     forceLooperToEnd_(false),
00515     looperBeginJobRun_(false),
00516     forceESCacheClearOnNewRun_(false),
00517     numberOfForkedChildren_(0),
00518     numberOfSequentialEventsPerChild_(1),
00519     setCpuAffinity_(false),
00520     eventSetupDataToExcludeFromPrefetching_() {
00521     init(processDesc, token, legacy);
00522   }
00523 
00524 
// Constructs an EventProcessor from a configuration whose format is selected
// by isPython.
//   config   - the configuration
//   isPython - true:  config goes through PythonProcessDesc and the resulting
//                     ParameterSet is wrapped in a ProcessDesc;
//              false: config is given directly to the ProcessDesc constructor
// Both branches call init() with a default-constructed ServiceToken and
// serviceregistry::kOverlapIsError; addServices() is not called.
00525   EventProcessor::EventProcessor(std::string const& config, bool isPython):
00526     preProcessEventSignal_(),
00527     postProcessEventSignal_(),
00528     actReg_(),
00529     preg_(),
00530     serviceToken_(),
00531     input_(),
00532     esp_(),
00533     act_table_(),
00534     processConfiguration_(),
00535     schedule_(),
00536     subProcess_(),
00537     historyAppender_(new HistoryAppender),
00538     state_(sInit),
00539     event_loop_(),
00540     state_lock_(),
00541     stop_lock_(),
00542     stopper_(),
00543     starter_(),
00544     stop_count_(-1),
00545     last_rc_(epSuccess),
00546     last_error_text_(),
00547     id_set_(false),
00548     event_loop_id_(),
00549     my_sig_num_(getSigNum()),
00550     fb_(),
00551     looper_(),
00552     machine_(),
00553     principalCache_(),
00554     shouldWeStop_(false),
00555     stateMachineWasInErrorState_(false),
00556     fileMode_(),
00557     emptyRunLumiMode_(),
00558     exceptionMessageFiles_(),
00559     exceptionMessageRuns_(),
00560     exceptionMessageLumis_(),
00561     alreadyHandlingException_(false),
00562     forceLooperToEnd_(false),
00563     looperBeginJobRun_(false),
00564     forceESCacheClearOnNewRun_(false),
00565     numberOfForkedChildren_(0),
00566     numberOfSequentialEventsPerChild_(1),
00567     setCpuAffinity_(false),
00568     eventSetupDataToExcludeFromPrefetching_() {
00569     if(isPython) {
00570       boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00571       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00572       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00573     }
00574     else {
00575       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00576       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00577     }
00578   }
00579 
// Common initialization shared by all constructors.
//   processDesc - supplies the process ParameterSet and the service ParameterSets
//   iToken      - service token passed on to ScheduleItems::initServices()
//   iLegacy     - legacy policy passed on to ScheduleItems::initServices()
// In order: clears the BranchIDList registries, pops the sub-process
// parameter set, reads the untracked "options" settings into data members,
// sets up services, event setup provider, looper, input source and schedule,
// copies the shared items into data members, connects the signals, and
// finally creates the SubProcess if one is configured.
00580   void
00581   EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00582                         ServiceToken const& iToken,
00583                         serviceregistry::ServiceLegacy iLegacy) {
00584 
00585     //std::cerr << processDesc->dump() << std::endl;
00586     // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
00587     //  They must be cleared here because some processes run multiple EventProcessors in succession.
00588     BranchIDListHelper::clearRegistries();
00589 
00590     boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00591     //std::cerr << parameterSet->dump() << std::endl;
00592 
00593     // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
00594     boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00595 
00596     // Now set some parameters specific to the main process.
          // NOTE(review): optionsPset and forking are const references bound to the
          // result of a call that is given a temporary default; this is only safe
          // if getUntrackedParameterSet returns by value - confirm.
00597     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00598     fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00599     emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00600     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00601     ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00602     numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00603     numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00604     setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00605     std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
          // Each entry adds a (type, label) pair under its "record" name;
          // "type" defaults to "*" and "label" to the empty string.
00606     for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00607         itPS != itPSEnd;
00608         ++itPS) {
00609       eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00610                                                 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00611                                                                itPS->getUntrackedParameter<std::string>("label", "")));
00612     }
00613     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00614 
          // NOTE(review): the controller is owned by this local auto_ptr and is
          // destroyed when init() returns, yet a reference to it is handed to
          // SubProcess below - confirm SubProcess does not keep that reference.
00615     std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00616 
00617     // Now do general initialization
00618     ScheduleItems items;
00619 
00620     //initialize the services
00621     boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00622     ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00623     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00624 
00625     //make the services available
00626     ServiceRegistry::Operate operate(serviceToken_);
00627 
00628     // initialize miscellaneous items
00629     boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00630 
00631     // initialize the event setup provider
00632     esp_ = espController->makeProvider(*parameterSet, *common);
00633 
00634     // initialize the looper, if any
00635     looper_ = fillLooper(*esp_, *parameterSet, *common);
00636     if(looper_) {
00637       looper_->setActionTable(items.act_table_.get());
00638       looper_->attachTo(*items.actReg_);
00639     }
00640 
00641     // initialize the input source
00642     input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00643 
00644     // initialize the Schedule
00645     schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00646 
00647     // set the data members
00648     act_table_ = items.act_table_;
00649     actReg_ = items.actReg_;
00650     preg_ = items.preg_;
00651     processConfiguration_ = items.processConfiguration_;
00652 
00653     FDEBUG(2) << parameterSet << std::endl;
00654     connectSigs(this);
00655 
00656     // initialize the subprocess, if there is one
          // The subprocess is given `token` (the result of initServices), not
          // serviceToken_ (the result of addCPRandTNS).
00657     if(subProcessParameterSet) {
00658       subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController, *actReg_, token, serviceregistry::kConfigurationOverrides));
00659     }
00660   }
00661 
// Tears the processor down while its services are still reachable.
// In order: makes the services available, terminates the state machine,
// sends the mDtor message (a cms::Exception from that is logged, not
// propagated), releases the owned components, and clears the process-wide
// registries.
00662   EventProcessor::~EventProcessor() {
00663     // Make the services available while everything is being deleted.
00664     ServiceToken token = getToken();
00665     ServiceRegistry::Operate op(token);
00666 
00667     // The state machine should have already been cleaned up
00668     // and destroyed at this point by a call to EndJob or
00669     // earlier when it completed processing events, but if it
00670     // has not been we'll take care of it here at the last moment.
00671     // This could cause problems if we are already handling an
00672     // exception and another one is thrown here ...  For a critical
00673     // executable the solution to this problem is for the code using
00674     // the EventProcessor to explicitly call EndJob or use runToCompletion,
00675     // then the next line of code is never executed.
00676     terminateMachine();
00677 
          // Only cms::Exception is caught here; it is reported through
          // LogError("System") instead of leaving the destructor.
00678     try {
00679       changeState(mDtor);
00680     }
00681     catch(cms::Exception& e) {
00682       LogError("System")
00683         << e.explainSelf() << "\n";
00684     }
00685 
00686     // manually destroy all these things that may need the services around
00687     subProcess_.reset();
00688     esp_.reset();
00689     schedule_.reset();
00690     input_.reset();
00691     looper_.reset();
00692     actReg_.reset();
00693 
          // Empty the parameter-set registry and reset its stored ID to a
          // default-constructed ParameterSetID.
00694     pset::Registry* psetRegistry = pset::Registry::instance();
00695     psetRegistry->data().clear();
00696     psetRegistry->extra().setID(ParameterSetID());
00697 
          // Clear the remaining singleton registries.
00698     EntryDescriptionRegistry::instance()->data().clear();
00699     ParentageRegistry::instance()->data().clear();
00700     ProcessConfigurationRegistry::instance()->data().clear();
00701     ProcessHistoryRegistry::instance()->data().clear();
00702     BranchIDListHelper::clearRegistries();
00703   }
00704 
00705   void
00706   EventProcessor::rewind() {
00707     beginJob(); //make sure this was called
00708     changeState(mStopAsync);
00709     changeState(mInputRewind);
00710     {
00711       StateSentry toerror(this);
00712 
00713       //make the services available
00714       ServiceRegistry::Operate operate(serviceToken_);
00715 
00716       {
00717         input_->repeat();
00718         input_->rewind();
00719       }
00720       changeState(mCountComplete);
00721       toerror.succeeded();
00722     }
00723     changeState(mFinished);
00724   }
00725 
00726   EventProcessor::StatusCode
00727   EventProcessor::run(int numberEventsToProcess, bool) {
00728     return runEventCount(numberEventsToProcess);
00729   }
00730 
00731   EventProcessor::StatusCode
00732   EventProcessor::skip(int numberToSkip) {
00733     beginJob(); //make sure this was called
00734     changeState(mSkip);
00735     {
00736       StateSentry toerror(this);
00737 
00738       //make the services available
00739       ServiceRegistry::Operate operate(serviceToken_);
00740 
00741       {
00742         input_->skipEvents(numberToSkip);
00743       }
00744       changeState(mCountComplete);
00745       toerror.succeeded();
00746     }
00747     changeState(mFinished);
00748     return epSuccess;
00749   }
00750 
00751   void
00752   EventProcessor::beginJob() {
00753     if(state_ != sInit) return;
00754     bk::beginJob();
00755     // can only be run if in the initial state
00756     changeState(mBeginJob);
00757 
00758     // StateSentry toerror(this); // should we add this ?
00759     //make the services available
00760     ServiceRegistry::Operate operate(serviceToken_);
00761 
00762     //NOTE:  This implementation assumes 'Job' means one call
00763     // the EventProcessor::run
00764     // If it really means once per 'application' then this code will
00765     // have to be changed.
00766     // Also have to deal with case where have 'run' then new Module
00767     // added and do 'run'
00768     // again.  In that case the newly added Module needs its 'beginJob'
00769     // to be called.
00770 
00771     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
00772     //  For now we delay calling beginOfJob until first beginOfRun
00773     //if(looper_) {
00774     //   looper_->beginOfJob(es);
00775     //}
00776     try {
00777       try {
00778         input_->doBeginJob();
00779       }
00780       catch (cms::Exception& e) { throw; }
00781       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00782       catch (std::exception& e) { convertException::stdToEDM(e); }
00783       catch(std::string& s) { convertException::stringToEDM(s); }
00784       catch(char const* c) { convertException::charPtrToEDM(c); }
00785       catch (...) { convertException::unknownToEDM(); }
00786     }
00787     catch(cms::Exception& ex) {
00788       ex.addContext("Calling beginJob for the source");
00789       throw;
00790     }
00791     schedule_->beginJob();
00792     // toerror.succeeded(); // should we add this?
00793     if(hasSubProcess()) subProcess_->doBeginJob();
00794     actReg_->postBeginJobSignal_();
00795   }
00796 
00797   void
00798   EventProcessor::endJob() {
00799     // Collects exceptions, so we don't throw before all operations are performed.
00800     ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00801 
00802     // only allowed to run if state is sIdle, sJobReady, sRunGiven
00803     c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00804 
00805     //make the services available
00806     ServiceRegistry::Operate operate(serviceToken_);
00807 
00808     c.call(boost::bind(&EventProcessor::terminateMachine, this));
00809     schedule_->endJob(c);
00810     if(hasSubProcess()) {
00811       c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00812     }
00813     c.call(boost::bind(&InputSource::doEndJob, input_));
00814     if(looper_) {
00815       c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00816     }
00817     c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00818     if(c.hasThrown()) {
00819       c.rethrow();
00820     }
00821   }
00822 
00823   ServiceToken
00824   EventProcessor::getToken() {
00825     return serviceToken_;
00826   }
00827 
00828   //Setup signal handler to listen for when forked children stop
00829   namespace {
00830     //These are volatile since the compiler can not be allowed to optimize them
00831     // since they can be modified in the signaller handler
00832     volatile bool child_failed = false;
00833     volatile unsigned int num_children_done = 0;
00834     volatile int child_fail_exit_status = 0;
00835     volatile int child_fail_signal = 0;
00836 
00837     extern "C" {
00838       void ep_sigchld(int, siginfo_t*, void*) {
00839         //printf("in sigchld\n");
00840         //FDEBUG(1) << "in sigchld handler\n";
00841         int stat_loc;
00842         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00843         while(0<p) {
00844           //printf("  looping\n");
00845           if(WIFEXITED(stat_loc)) {
00846             ++num_children_done;
00847             if(0 != WEXITSTATUS(stat_loc)) {
00848               child_fail_exit_status = WEXITSTATUS(stat_loc);
00849               child_failed = true;
00850             }
00851           }
00852           if(WIFSIGNALED(stat_loc)) {
00853             ++num_children_done;
00854             child_fail_signal = WTERMSIG(stat_loc);
00855             child_failed = true;
00856           }
00857           p = waitpid(-1, &stat_loc, WNOHANG);
00858         }
00859       }
00860     }
00861 
00862   }
00863 
00864   enum {
00865     kChildSucceed,
00866     kChildExitBadly,
00867     kChildSegv,
00868     kMaxChildAction
00869   };
00870 
00871   namespace {
00872     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00873       unsigned int n = 0;
00874       while(numberOfChildren != 0) {
00875         ++n;
00876         numberOfChildren /= 10;
00877       }
00878       if(n == 0) {
00879         n = 3; // Protect against zero numberOfChildren
00880       }
00881       return n;
00882     }
00883     
00884     /*This class embodied the thread which is used to listen to the forked children and
00885      then tell them which events they should process */
00886     class MessageSenderToSource {
00887     public:
00888       MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00889       void operator()();
00890 
00891     private:
00892       const std::vector<int>& m_childrenPipes;
00893       long const m_nEventsToProcess;
00894       fd_set m_socketSet;
00895       unsigned int m_aliveChildren;
00896       int m_maxFd;
00897     };
00898     
00899     MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00900                                                  std::vector<int> const& childrenPipes,
00901                                                  long iNEventsToProcess):
00902     m_childrenPipes(childrenPipes),
00903     m_nEventsToProcess(iNEventsToProcess),
00904     m_aliveChildren(childrenSockets.size()),
00905     m_maxFd(0)
00906     {
00907       FD_ZERO(&m_socketSet);
00908       for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00909            it != itEnd; it++) {
00910         FD_SET(*it, &m_socketSet);
00911         if (*it > m_maxFd) {
00912           m_maxFd = *it;
00913         }
00914       }
00915       for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00916            it != itEnd; ++it) {
00917         FD_SET(*it, &m_socketSet);
00918         if (*it > m_maxFd) {
00919           m_maxFd = *it;
00920         }
00921       }
00922       m_maxFd++; // select reads [0,m_maxFd).
00923     }
00924    
00925     /* This function is the heart of the communication between parent and child.
00926      * When ready for more data, the child (see MessageReceiverForSource) requests
00927      * data through a AF_UNIX socket message.  The parent will then assign the next
00928      * chunk of data by sending a message back.
00929      *
00930      * Additionally, this function also monitors the read-side of the pipe fd from the child.
00931      * If the child dies unexpectedly, the pipe will be selected as ready for read and
00932      * will return EPIPE when read from.  Further, if the child thinks the parent has died
00933      * (defined as waiting more than 1s for a response), it will write a single byte to
00934      * the pipe.  If the parent has died, the child will get a EPIPE and throw an exception.
00935      * If still alive, the parent will read the byte and ignore it.
00936      *
00937      * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
00938      * handler can distinguish between success and failure cases.
00939      */
00940  
00941     void
00942     MessageSenderToSource::operator()() {
00943       multicore::MessageForParent childMsg;
00944       LogInfo("ForkingController") << "I am controller";
00945       //this is the master and therefore the controller
00946       
00947       multicore::MessageForSource sndmsg;
00948       sndmsg.startIndex = 0;
00949       sndmsg.nIndices = m_nEventsToProcess;
00950       do {
00951         
00952         fd_set readSockets, errorSockets;
00953         // Wait for a request from a child for events.
00954         memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00955         memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00956         // Note that we don't timeout; may be reconsidered in the future.
00957         ssize_t rc;
00958         while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00959         if (rc < 0) {
00960           std::cerr << "select failed; should be impossible due to preconditions.\n";
00961           abort();
00962           break;
00963         }
00964 
00965         // Read the message from the child.
00966         for (int idx=0; idx<m_maxFd; idx++) {
00967 
00968           // Handle errors
00969           if (FD_ISSET(idx, &errorSockets)) {
00970             LogInfo("ForkingController") << "Error on socket " << idx;
00971             FD_CLR(idx, &m_socketSet);
00972             close(idx);
00973             // See if it was the watchdog pipe that died.
00974             for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00975               if (*it == idx) {
00976                 m_aliveChildren--;
00977               }
00978             }
00979             continue;
00980           }
00981           
00982           if (!FD_ISSET(idx, &readSockets)) {
00983             continue;
00984           }
00985 
00986           // See if this FD is a child watchdog pipe.  If so, read from it to prevent
00987           // writes from blocking.
00988           bool is_pipe = false;
00989           for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00990               if (*it == idx) {
00991                 is_pipe = true;
00992                 char buf;
00993                 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00994                 if (rc <= 0) {
00995                   m_aliveChildren--;
00996                   FD_CLR(idx, &m_socketSet);
00997                   close(idx);
00998                 }
00999               }
01000           }
01001 
01002           // Only execute this block if the FD is a socket for sending the child work.
01003           if (!is_pipe) {
01004             while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
01005             if (rc < 0) {
01006               FD_CLR(idx, &m_socketSet);
01007               close(idx);
01008               continue;
01009             }
01010           
01011             // Tell the child what events to process.
01012             // If 'send' fails, then the child process has failed (any other possibilities are
01013             // eliminated because we are using fixed-size messages with Unix datagram sockets).
01014             // Thus, the SIGCHLD handler will fire and set child_fail = true.
01015             while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
01016             if (rc < 0) {
01017               FD_CLR(idx, &m_socketSet);
01018               close(idx);
01019               continue;
01020             }
01021             //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
01022             sndmsg.startIndex += sndmsg.nIndices;
01023           }
01024         }
01025       
01026       } while (m_aliveChildren > 0);
01027       
01028       return;
01029     }
01030 
01031   }
01032 
01033   bool
01034   EventProcessor::forkProcess(std::string const& jobReportFile) {
01035 
01036     if(0 == numberOfForkedChildren_) {return true;}
01037     assert(0<numberOfForkedChildren_);
01038     //do what we want done in common
01039     {
01040       beginJob(); //make sure this was run
01041       // make the services available
01042       ServiceRegistry::Operate operate(serviceToken_);
01043 
01044       InputSource::ItemType itemType;
01045       itemType = input_->nextItemType();
01046 
01047       assert(itemType == InputSource::IsFile);
01048       {
01049         readFile();
01050       }
01051       itemType = input_->nextItemType();
01052       assert(itemType == InputSource::IsRun);
01053 
01054       LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01055       IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01056                       input_->runAuxiliary()->beginTime());
01057       EventSetup const& es = esp_->eventSetupForInstance(ts);
01058 
01059       //now get all the data available in the EventSetup
01060       std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01061       es.fillAvailableRecordKeys(recordKeys);
01062       std::vector<eventsetup::DataKey> dataKeys;
01063       for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01064           itKey != itEnd;
01065           ++itKey) {
01066         eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01067         //see if this is on our exclusion list
01068         ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01069         ExcludedData const* excludedData(0);
01070         if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01071           excludedData = &(itExcludeRec->second);
01072           if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01073             //skip all items in this record
01074             continue;
01075           }
01076         }
01077         if(0 != recordPtr) {
01078           dataKeys.clear();
01079           recordPtr->fillRegisteredDataKeys(dataKeys);
01080           for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01081               itDataKey != itDataKeyEnd;
01082               ++itDataKey) {
01083             //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01084             if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01085               LogInfo("ForkingEventSetupPreFetching") << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01086               continue;
01087             }
01088             try {
01089               recordPtr->doGet(*itDataKey);
01090             } catch(cms::Exception& e) {
01091              LogWarning("ForkingEventSetupPreFetching") << e.what();
01092             }
01093           }
01094         }
01095       }
01096     }
01097     LogSystem("ForkingEventSetupPreFetching") <<"  done prefetching";
01098     {
01099       // make the services available
01100       ServiceRegistry::Operate operate(serviceToken_);
01101       Service<JobReport> jobReport;
01102       jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01103 
01104       //Now actually do the forking
01105       actReg_->preForkReleaseResourcesSignal_();
01106       input_->doPreForkReleaseResources();
01107       schedule_->preForkReleaseResources();
01108     }
01109     installCustomHandler(SIGCHLD, ep_sigchld);
01110 
01111 
01112     unsigned int childIndex = 0;
01113     unsigned int const kMaxChildren = numberOfForkedChildren_;
01114     unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01115     std::vector<pid_t> childrenIds;
01116     childrenIds.reserve(kMaxChildren);
01117     std::vector<int> childrenSockets;
01118     childrenSockets.reserve(kMaxChildren);
01119     std::vector<int> childrenPipes;
01120     childrenPipes.reserve(kMaxChildren);
01121     std::vector<int> childrenSocketsCopy;
01122     childrenSocketsCopy.reserve(kMaxChildren);
01123     std::vector<int> childrenPipesCopy;
01124     childrenPipesCopy.reserve(kMaxChildren);
01125     int pipes[2];
01126 
01127     {
01128       // make the services available
01129       ServiceRegistry::Operate operate(serviceToken_);
01130       Service<JobReport> jobReport;
01131       int sockets[2], fd_flags;
01132       for(; childIndex < kMaxChildren; ++childIndex) {
01133         // Create a UNIX_DGRAM socket pair
01134         if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01135           printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01136           exit(EXIT_FAILURE);
01137         }
01138         if (pipe(pipes)) {
01139           printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01140           exit(EXIT_FAILURE);
01141         }
01142         // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
01143         if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01144           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01145           exit(EXIT_FAILURE);
01146         }
01147         // Mark socket as non-block.  Child must be careful to do select prior
01148         // to reading from socket.
01149         if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01150           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01151           exit(EXIT_FAILURE);
01152         }
01153         if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01154           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01155           exit(EXIT_FAILURE);
01156         }
01157         if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01158           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01159           exit(EXIT_FAILURE);
01160         }
01161         // Linux man page notes there are some edge cases where reading from a
01162         // fd can block, even after a select.
01163         if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01164           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01165           exit(EXIT_FAILURE);
01166         }
01167         if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01168           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01169           exit(EXIT_FAILURE);
01170         }
01171 
01172         childrenPipesCopy = childrenPipes;
01173         childrenSocketsCopy = childrenSockets;
01174 
01175         pid_t value = fork();
01176         if(value == 0) {
01177           // Close the parent's side of the socket and pipe which will talk to us.
01178           close(pipes[0]);
01179           close(sockets[0]);
01180           // Close our copies of the parent's other communication pipes.
01181           for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01182             close(*it);
01183           }
01184           for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01185             close(*it);
01186           }
01187 
01188           // this is the child process, redirect stdout and stderr to a log file
01189           fflush(stdout);
01190           fflush(stderr);
01191           std::stringstream stout;
01192           stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01193           if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01194             LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01195           }
01196           if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01197             LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01198           }
01199 
01200           LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01201           if(setCpuAffinity_) {
01202             // CPU affinity is handled differently on macosx.
01203             // We disable it and print a message until someone reads:
01204             //
01205             // https://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
01206             //
01207             // and implements it.
01208 #ifdef __APPLE__
01209             LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01210 #else
01211             LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01212             cpu_set_t mask;
01213             CPU_ZERO(&mask);
01214             CPU_SET(childIndex, &mask);
01215             if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01216               LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01217               exit(-1);
01218             }
01219 #endif
01220           }
01221           break;
01222         } else {
01223           //this is the parent
01224           close(pipes[1]);
01225           close(sockets[1]);
01226         }
01227         if(value < 0) {
01228           LogError("ForkingChild") << "failed to create a child";
01229           exit(-1);
01230         }
01231         childrenIds.push_back(value);
01232         childrenSockets.push_back(sockets[0]);
01233         childrenPipes.push_back(pipes[0]);
01234       }
01235 
01236       if(childIndex < kMaxChildren) {
01237         jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01238         actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01239 
01240         boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01241         input_->doPostForkReacquireResources(receiver);
01242         schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01243         //NOTE: sources have to reset themselves by listening to the post fork message
01244         //rewindInput();
01245         return true;
01246       }
01247       jobReport->parentAfterFork(jobReportFile);
01248     }
01249 
01250     //this is the original, which is now the master for all the children
01251 
01252     //Need to wait for signals from the children or externally
01253     // To wait we must
01254     // 1) block the signals we want to wait on so we do not have a race condition
01255     // 2) check that we haven't already meet our ending criteria
01256     // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
01257     sigset_t blockingSigSet;
01258     sigset_t unblockingSigSet;
01259     sigset_t oldSigSet;
01260     pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01261     pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01262     sigaddset(&blockingSigSet, SIGCHLD);
01263     sigaddset(&blockingSigSet, SIGUSR2);
01264     sigaddset(&blockingSigSet, SIGINT);
01265     sigdelset(&unblockingSigSet, SIGCHLD);
01266     sigdelset(&unblockingSigSet, SIGUSR2);
01267     sigdelset(&unblockingSigSet, SIGINT);
01268     pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01269 
01270     // If there are too many fd's (unlikely, but possible) for select, denote this 
01271     // because the sender will fail.
01272     bool too_many_fds = false;
01273     if (pipes[1]+1 > FD_SETSIZE) {
01274       LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01275       too_many_fds = true;
01276     }
01277 
01278     //create a thread that sends the units of work to workers
01279     // we create it after all signals were blocked so that this
01280     // thread is never interupted by a signal
01281     MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01282     boost::thread senderThread(sender);
01283 
01284     while(!too_many_fds && !shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01285       sigsuspend(&unblockingSigSet);
01286       LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01287     }
01288     pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01289 
01290     LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01291     if(child_failed) {
01292       LogError("ForkingStopping") << "child failed";
01293     }
01294     if(shutdown_flag) {
01295       LogSystem("ForkingStopping") << "asked to shutdown";
01296     }
01297 
01298     if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01299       LogInfo("ForkingStopping") << "must stop children" << std::endl;
01300       for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01301           it != itEnd; ++it) {
01302         /* int result = */ kill(*it, SIGUSR2);
01303       }
01304       pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01305       while(num_children_done != kMaxChildren) {
01306         sigsuspend(&unblockingSigSet);
01307       }
01308       pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01309     }
01310     // The senderThread will notice the pipes die off, one by one.  Once all children are gone, it will exit.
01311     senderThread.join();
01312     if(child_failed) {
01313       if (child_fail_signal) {
01314         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01315       } else if (child_fail_exit_status) {
01316         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01317       } else {
01318         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01319       }
01320     }
01321     if(too_many_fds) {
01322       throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01323     }
01324     return false;
01325   }
01326 
01327   void
01328   EventProcessor::connectSigs(EventProcessor* ep) {
01329     // When the FwkImpl signals are given, pass them to the
01330     // appropriate EventProcessor signals so that the outside world
01331     // can see the signal.
01332     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01333     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01334   }
01335 
01336   std::vector<ModuleDescription const*>
01337   EventProcessor::getAllModuleDescriptions() const {
01338     return schedule_->getAllModuleDescriptions();
01339   }
01340 
01341   int
01342   EventProcessor::totalEvents() const {
01343     return schedule_->totalEvents();
01344   }
01345 
01346   int
01347   EventProcessor::totalEventsPassed() const {
01348     return schedule_->totalEventsPassed();
01349   }
01350 
01351   int
01352   EventProcessor::totalEventsFailed() const {
01353     return schedule_->totalEventsFailed();
01354   }
01355 
01356   void
01357   EventProcessor::enableEndPaths(bool active) {
01358     schedule_->enableEndPaths(active);
01359   }
01360 
01361   bool
01362   EventProcessor::endPathsEnabled() const {
01363     return schedule_->endPathsEnabled();
01364   }
01365 
01366   void
01367   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01368     schedule_->getTriggerReport(rep);
01369   }
01370 
01371   void
01372   EventProcessor::clearCounters() {
01373     schedule_->clearCounters();
01374   }
01375 
01376   char const* EventProcessor::currentStateName() const {
01377     return stateName(getState());
01378   }
01379 
01380   char const* EventProcessor::stateName(State s) const {
01381     return stateNames[s];
01382   }
01383 
01384   char const* EventProcessor::msgName(Msg m) const {
01385     return msgNames[m];
01386   }
01387 
01388   State EventProcessor::getState() const {
01389     return state_;
01390   }
01391 
01392   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01393     // the thread will record exception/error status in the event processor
01394     // for us to look at and report here
01395     return last_rc_;
01396   }
01397 
01398   void
01399   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01400     if(runNumber == 0) {
01401       runNumber = 1;
01402       LogWarning("Invalid Run")
01403         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01404         << "Run number was set to 1 instead\n";
01405     }
01406 
01407     // inside of beginJob there is a check to see if it has been called before
01408     beginJob();
01409     changeState(mSetRun);
01410 
01411     // interface not correct yet
01412     input_->setRunNumber(runNumber);
01413   }
01414 
01415   void
01416   EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
01417     // inside of beginJob there is a check to see if it has been called before
01418     beginJob();
01419     changeState(mSetRun);
01420 
01421     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01422     //input_->declareRunNumber(runNumber);
01423   }
01424 
01425   EventProcessor::StatusCode
01426   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01427     bool rc = true;
01428     boost::xtime timeout;
01429     boost::xtime_get(&timeout, boost::TIME_UTC);
01430     timeout.sec += timeout_seconds;
01431 
01432     // make sure to include a timeout here so we don't wait forever
01433     // I suspect there are still timing issues with thread startup
01434     // and the setting of the various control variables (stop_count, id_set)
01435     {
01436       boost::mutex::scoped_lock sl(stop_lock_);
01437 
01438       // look here - if runAsync not active, just return the last return code
01439       if(stop_count_ < 0) return last_rc_;
01440 
01441       if(timeout_seconds == 0) {
01442         while(stop_count_ == 0) stopper_.wait(sl);
01443       } else {
01444         while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01445       }
01446 
01447       if(rc == false) {
01448           // timeout occurred
01449           // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
01450           // this is a temporary hack until we get the input source
01451           // upgraded to allow blocking input sources to be unblocked
01452 
01453           // the next line is dangerous and causes all sorts of trouble
01454           if(id_set_) pthread_cancel(event_loop_id_);
01455 
01456           // we will not do anything yet
01457           LogWarning("timeout")
01458             << "An asynchronous request was made to shut down "
01459             << "the event loop "
01460             << "and the event loop did not shutdown after "
01461             << timeout_seconds << " seconds\n";
01462       } else {
01463           event_loop_->join();
01464           event_loop_.reset();
01465           id_set_ = false;
01466           stop_count_ = -1;
01467       }
01468     }
01469     return rc == false ? epTimedOut : last_rc_;
01470   }
01471 
01472   EventProcessor::StatusCode
01473   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01474     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01475     if(rc != epTimedOut) changeState(mCountComplete);
01476     else errorState();
01477     return rc;
01478   }
01479 
01480 
01481   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01482     changeState(mStopAsync);
01483     StatusCode rc = waitForAsyncCompletion(secs);
01484     if(rc != epTimedOut) changeState(mFinished);
01485     else errorState();
01486     return rc;
01487   }
01488 
01489   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01490     changeState(mShutdownAsync);
01491     StatusCode rc = waitForAsyncCompletion(secs);
01492     if(rc != epTimedOut) changeState(mFinished);
01493     else errorState();
01494     return rc;
01495   }
01496 
01497   void EventProcessor::errorState() {
01498     state_ = sError;
01499   }
01500 
01501   // next function irrelevant now
01502   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01503     // make sure to include a timeout here so we don't wait forever
01504     // I suspect there are still timing issues with thread startup
01505     // and the setting of the various control variables (stop_count, id_set)
01506     changeState(m);
01507     return waitForAsyncCompletion(60*2);
01508   }
01509 
01510   void EventProcessor::changeState(Msg msg) {
01511     // most likely need to serialize access to this routine
01512 
01513     boost::mutex::scoped_lock sl(state_lock_);
01514     State curr = state_;
01515     int rc;
01516     // found if(not end of table) and
01517     // (state == table.state && (msg == table.message || msg == any))
01518     for(rc = 0;
01519         table[rc].current != sInvalid &&
01520           (curr != table[rc].current ||
01521            (curr == table[rc].current &&
01522              msg != table[rc].message && table[rc].message != mAny));
01523         ++rc);
01524 
01525     if(table[rc].current == sInvalid)
01526       throw cms::Exception("BadState")
01527         << "A member function of EventProcessor has been called in an"
01528         << " inappropriate order.\n"
01529         << "Bad transition from " << stateName(curr) << " "
01530         << "using message " << msgName(msg) << "\n"
01531         << "No where to go from here.\n";
01532 
01533     FDEBUG(1) << "changeState: current=" << stateName(curr)
01534               << ", message=" << msgName(msg)
01535               << " -> new=" << stateName(table[rc].final) << "\n";
01536 
01537     state_ = table[rc].final;
01538   }
01539 
01540   void EventProcessor::runAsync() {
01541     beginJob();
01542     {
01543       boost::mutex::scoped_lock sl(stop_lock_);
01544       if(id_set_ == true) {
01545           std::string err("runAsync called while async event loop already running\n");
01546           LogError("FwkJob") << err;
01547           throw cms::Exception("BadState") << err;
01548       }
01549 
01550       changeState(mRunAsync);
01551 
01552       stop_count_ = 0;
01553       last_rc_ = epSuccess; // forget the last value!
01554       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01555       boost::xtime timeout;
01556       boost::xtime_get(&timeout, boost::TIME_UTC);
01557       timeout.sec += 60; // 60 seconds to start!!!!
01558       if(starter_.timed_wait(sl, timeout) == false) {
01559           // yikes - the thread did not start
01560           throw cms::Exception("BadState")
01561             << "Async run thread did not start in 60 seconds\n";
01562       }
01563     }
01564   }
01565 
01566   void EventProcessor::asyncRun(EventProcessor* me) {
01567     // set up signals to allow for interruptions
01568     // ignore all other signals
01569     // make sure no exceptions escape out
01570 
01571     // temporary hack until we modify the input source to allow
01572     // wakeup calls from other threads.  This mimics the solution
01573     // in EventFilter/Processor, which I do not like.
01574     // allowing cancels means that the thread just disappears at
01575     // certain points.  This is bad for C++ stack variables.
01576     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01577     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01578     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01579     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01580 
01581     {
01582       boost::mutex::scoped_lock(me->stop_lock_);
01583       me->event_loop_id_ = pthread_self();
01584       me->id_set_ = true;
01585       me->starter_.notify_all();
01586     }
01587 
01588     Status rc = epException;
01589     FDEBUG(2) << "asyncRun starting ......................\n";
01590 
01591     try {
01592       bool onlineStateTransitions = true;
01593       rc = me->runToCompletion(onlineStateTransitions);
01594     }
01595     catch (cms::Exception& e) {
01596       LogError("FwkJob") << "cms::Exception caught in "
01597                          << "EventProcessor::asyncRun"
01598                          << "\n"
01599                          << e.explainSelf();
01600       me->last_error_text_ = e.explainSelf();
01601     }
01602     catch (std::exception& e) {
01603       LogError("FwkJob") << "Standard library exception caught in "
01604                          << "EventProcessor::asyncRun"
01605                          << "\n"
01606                          << e.what();
01607       me->last_error_text_ = e.what();
01608     }
01609     catch (...) {
01610       LogError("FwkJob") << "Unknown exception caught in "
01611                          << "EventProcessor::asyncRun"
01612                          << "\n";
01613       me->last_error_text_ = "Unknown exception caught";
01614       rc = epOther;
01615     }
01616 
01617     me->last_rc_ = rc;
01618 
01619     {
01620       // notify anyone waiting for exit that we are doing so now
01621       boost::mutex::scoped_lock sl(me->stop_lock_);
01622       ++me->stop_count_;
01623       me->stopper_.notify_all();
01624     }
01625     FDEBUG(2) << "asyncRun ending ......................\n";
01626   }
01627 
01628 
01629   EventProcessor::StatusCode
01630   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01631 
01632     StateSentry toerror(this);
01633 
01634     int numberOfEventsToProcess = -1;
01635     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01636 
01637     if(machine_.get() != 0) {
01638       throw Exception(errors::LogicError)
01639         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01640         << "Please report this error to the Framework group\n";
01641     }
01642 
01643     toerror.succeeded();
01644 
01645     return returnCode;
01646   }
01647 
01648   EventProcessor::StatusCode
01649   EventProcessor::runEventCount(int numberOfEventsToProcess) {
01650 
01651     StateSentry toerror(this);
01652 
01653     bool onlineStateTransitions = false;
01654     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01655 
01656     toerror.succeeded();
01657 
01658     return returnCode;
01659   }
01660 
01661   EventProcessor::StatusCode
01662   EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01663 
01664     // Reusable event principal
01665     boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
01666     principalCache_.insert(ep);
01667 
01668     beginJob(); //make sure this was called
01669 
01670     if(!onlineStateTransitions) changeState(mRunCount);
01671 
01672     StatusCode returnCode = epSuccess;
01673     stateMachineWasInErrorState_ = false;
01674 
01675     // make the services available
01676     ServiceRegistry::Operate operate(serviceToken_);
01677 
01678     if(machine_.get() == 0) {
01679 
01680       statemachine::FileMode fileMode;
01681       if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01682       else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01683       else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01684       else {
01685          throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01686             << fileMode_ << ".\n"
01687             << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01688       }
01689 
01690       statemachine::EmptyRunLumiMode emptyRunLumiMode;
01691       if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01692       else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01693       else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01694       else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01695       else {
01696          throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01697             << emptyRunLumiMode_ << ".\n"
01698             << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01699       }
01700 
01701       machine_.reset(new statemachine::Machine(this,
01702                                                fileMode,
01703                                                emptyRunLumiMode));
01704 
01705       machine_->initiate();
01706     }
01707 
01708     try {
01709       try {
01710 
01711         InputSource::ItemType itemType;
01712 
01713         int iEvents = 0;
01714 
01715         while(true) {
01716 
01717           itemType = input_->nextItemType();
01718 
01719           FDEBUG(1) << "itemType = " << itemType << "\n";
01720 
01721           // These are used for asynchronous running only and
01722           // and are checking to see if stopAsync or shutdownAsync
01723           // were called from another thread.  In the future, we
01724           // may need to do something better than polling the state.
01725           // With the current code this is the simplest thing and
01726           // it should always work.  If the interaction between
01727           // threads becomes more complex this may cause problems.
01728           if(state_ == sStopping) {
01729             FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01730             forceLooperToEnd_ = true;
01731             machine_->process_event(statemachine::Stop());
01732             forceLooperToEnd_ = false;
01733             break;
01734           }
01735           else if(state_ == sShuttingDown) {
01736             FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01737             forceLooperToEnd_ = true;
01738             machine_->process_event(statemachine::Stop());
01739             forceLooperToEnd_ = false;
01740             break;
01741           }
01742 
01743           // Look for a shutdown signal
01744           {
01745             boost::mutex::scoped_lock sl(usr2_lock);
01746             if(shutdown_flag) {
01747               changeState(mShutdownSignal);
01748               returnCode = epSignal;
01749               forceLooperToEnd_ = true;
01750               machine_->process_event(statemachine::Stop());
01751               forceLooperToEnd_ = false;
01752               break;
01753             }
01754           }
01755 
01756           if(itemType == InputSource::IsStop) {
01757             machine_->process_event(statemachine::Stop());
01758           }
01759           else if(itemType == InputSource::IsFile) {
01760             machine_->process_event(statemachine::File());
01761           }
01762           else if(itemType == InputSource::IsRun) {
01763             machine_->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01764           }
01765           else if(itemType == InputSource::IsLumi) {
01766             machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01767           }
01768           else if(itemType == InputSource::IsEvent) {
01769             machine_->process_event(statemachine::Event());
01770             ++iEvents;
01771             if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01772               returnCode = epCountComplete;
01773               changeState(mInputExhausted);
01774               FDEBUG(1) << "Event count complete, pausing event loop\n";
01775               break;
01776             }
01777           }
01778           // This should be impossible
01779           else {
01780             throw Exception(errors::LogicError)
01781               << "Unknown next item type passed to EventProcessor\n"
01782               << "Please report this error to the Framework group\n";
01783           }
01784 
01785           if(machine_->terminated()) {
01786             changeState(mInputExhausted);
01787             break;
01788           }
01789         }  // End of loop over state machine events
01790       } // Try block
01791       catch (cms::Exception& e) { throw; }
01792       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01793       catch (std::exception& e) { convertException::stdToEDM(e); }
01794       catch(std::string& s) { convertException::stringToEDM(s); }
01795       catch(char const* c) { convertException::charPtrToEDM(c); }
01796       catch (...) { convertException::unknownToEDM(); }
01797     } // Try block
01798     // Some comments on exception handling related to the boost state machine:
01799     //
01800     // Some states used in the machine are special because they
01801     // perform actions while the machine is being terminated, actions
01802     // such as close files, call endRun, call endLumi etc ...  Each of these
01803     // states has two functions that perform these actions.  The functions
01804     // are almost identical.  The major difference is that one version
01805     // catches all exceptions and the other lets exceptions pass through.
01806     // The destructor catches them and the other function named "exit" lets
01807     // them pass through.  On a normal termination, boost will always call
01808     // "exit" and then the state destructor.  In our state classes, the
01809     // the destructors do nothing if the exit function already took
01810     // care of things.  Here's the interesting part.  When boost is
01811     // handling an exception the "exit" function is not called (a boost
01812     // feature).
01813     //
01814     // If an exception occurs while the boost machine is in control
01815     // (which usually means inside a process_event call), then
01816     // the boost state machine destroys its states and "terminates" itself.
01817     // This already done before we hit the catch blocks below. In this case
01818     // the call to terminateMachine below only destroys an already
01819     // terminated state machine.  Because exit is not called, the state destructors
01820     // handle cleaning up lumis, runs, and files.  The destructors swallow
01821     // all exceptions and only pass through the exceptions messages, which
01822     // are tacked onto the original exception below.
01823     //
01824     // If an exception occurs when the boost state machine is not
01825     // in control (outside the process_event functions), then boost
01826     // cannot destroy its own states.  The terminateMachine function
01827     // below takes care of that.  The flag "alreadyHandlingException"
01828     // is set true so that the state exit functions do nothing (and
01829     // cannot throw more exceptions while handling the first).  Then the
01830     // state destructors take care of this because exit did nothing.
01831     //
01832     // In both cases above, the EventProcessor::endOfLoop function is
01833     // not called because it can throw exceptions.
01834     //
01835     // One tricky aspect of the state machine is that things that can
01836     // throw should not be invoked by the state machine while another
01837     // exception is being handled.
01838     // Another tricky aspect is that it appears to be important to
01839     // terminate the state machine before invoking its destructor.
01840     // We've seen crashes that are not understood when that is not
01841     // done.  Maintainers of this code should be careful about this.
01842 
01843     catch (cms::Exception & e) {
01844       alreadyHandlingException_ = true;
01845       terminateMachine();
01846       alreadyHandlingException_ = false;
01847       if (!exceptionMessageLumis_.empty()) {
01848         e.addAdditionalInfo(exceptionMessageLumis_);
01849         if (e.alreadyPrinted()) {
01850           LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01851         }
01852       }
01853       if (!exceptionMessageRuns_.empty()) {
01854         e.addAdditionalInfo(exceptionMessageRuns_);
01855         if (e.alreadyPrinted()) {
01856           LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01857         }
01858       }
01859       if (!exceptionMessageFiles_.empty()) {
01860         e.addAdditionalInfo(exceptionMessageFiles_);
01861         if (e.alreadyPrinted()) {
01862           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01863         }
01864       }
01865       throw;
01866     }
01867 
01868     if(machine_->terminated()) {
01869       FDEBUG(1) << "The state machine reports it has been terminated\n";
01870       machine_.reset();
01871     }
01872 
01873     if(!onlineStateTransitions) changeState(mFinished);
01874 
01875     if(stateMachineWasInErrorState_) {
01876       throw cms::Exception("BadState")
01877         << "The boost state machine in the EventProcessor exited after\n"
01878         << "entering the Error state.\n";
01879     }
01880 
01881     return returnCode;
01882   }
01883 
01884   void EventProcessor::readFile() {
01885     FDEBUG(1) << " \treadFile\n";
01886     fb_ = input_->readFile();
01887     if(numberOfForkedChildren_ > 0) {
01888         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01889     }
01890   }
01891 
01892   void EventProcessor::closeInputFile() {
01893     if (fb_.get() != 0) {
01894       input_->closeFile(fb_);
01895     }
01896     FDEBUG(1) << "\tcloseInputFile\n";
01897   }
01898 
01899   void EventProcessor::openOutputFiles() {
01900     if (fb_.get() != 0) {
01901       schedule_->openOutputFiles(*fb_);
01902       if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01903     }
01904     FDEBUG(1) << "\topenOutputFiles\n";
01905   }
01906 
01907   void EventProcessor::closeOutputFiles() {
01908     if (fb_.get() != 0) {
01909       schedule_->closeOutputFiles();
01910       if(hasSubProcess()) subProcess_->closeOutputFiles();
01911     }
01912     FDEBUG(1) << "\tcloseOutputFiles\n";
01913   }
01914 
01915   void EventProcessor::respondToOpenInputFile() {
01916     if (fb_.get() != 0) {
01917       schedule_->respondToOpenInputFile(*fb_);
01918       if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01919     }
01920     FDEBUG(1) << "\trespondToOpenInputFile\n";
01921   }
01922 
01923   void EventProcessor::respondToCloseInputFile() {
01924     if (fb_.get() != 0) {
01925       schedule_->respondToCloseInputFile(*fb_);
01926       if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01927     }
01928     FDEBUG(1) << "\trespondToCloseInputFile\n";
01929   }
01930 
01931   void EventProcessor::respondToOpenOutputFiles() {
01932     if (fb_.get() != 0) {
01933       schedule_->respondToOpenOutputFiles(*fb_);
01934       if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01935     }
01936     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01937   }
01938 
01939   void EventProcessor::respondToCloseOutputFiles() {
01940     if (fb_.get() != 0) {
01941       schedule_->respondToCloseOutputFiles(*fb_);
01942       if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01943     }
01944     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01945   }
01946 
01947   void EventProcessor::startingNewLoop() {
01948     shouldWeStop_ = false;
01949     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01950     // until after we've called beginOfJob
01951     if(looper_ && looperBeginJobRun_) {
01952       looper_->doStartingNewLoop();
01953     }
01954     FDEBUG(1) << "\tstartingNewLoop\n";
01955   }
01956 
01957   bool EventProcessor::endOfLoop() {
01958     if(looper_) {
01959       ModuleChanger changer(schedule_.get());
01960       looper_->setModuleChanger(&changer);
01961       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01962       looper_->setModuleChanger(0);
01963       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01964       else return false;
01965     }
01966     FDEBUG(1) << "\tendOfLoop\n";
01967     return true;
01968   }
01969 
01970   void EventProcessor::rewindInput() {
01971     input_->repeat();
01972     input_->rewind();
01973     FDEBUG(1) << "\trewind\n";
01974   }
01975 
01976   void EventProcessor::prepareForNextLoop() {
01977     looper_->prepareForNextLoop(esp_.get());
01978     FDEBUG(1) << "\tprepareForNextLoop\n";
01979   }
01980 
01981   bool EventProcessor::shouldWeCloseOutput() const {
01982     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01983     return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01984   }
01985 
01986   void EventProcessor::doErrorStuff() {
01987     FDEBUG(1) << "\tdoErrorStuff\n";
01988     LogError("StateMachine")
01989       << "The EventProcessor state machine encountered an unexpected event\n"
01990       << "and went to the error state\n"
01991       << "Will attempt to terminate processing normally\n"
01992       << "(IF using the looper the next loop will be attempted)\n"
01993       << "This likely indicates a bug in an input module or corrupted input or both\n";
01994     stateMachineWasInErrorState_ = true;
01995   }
01996 
01997   void EventProcessor::beginRun(statemachine::Run const& run) {
01998     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01999     input_->doBeginRun(runPrincipal);
02000     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
02001                     runPrincipal.beginTime());
02002     if(forceESCacheClearOnNewRun_){
02003       esp_->forceCacheClear();
02004     }
02005     EventSetup const& es = esp_->eventSetupForInstance(ts);
02006     if(looper_ && looperBeginJobRun_== false) {
02007       looper_->copyInfo(ScheduleInfo(schedule_.get()));
02008       looper_->beginOfJob(es);
02009       looperBeginJobRun_ = true;
02010       looper_->doStartingNewLoop();
02011     }
02012     {
02013       typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
02014       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02015       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
02016       if(hasSubProcess()) {
02017         subProcess_->doBeginRun(runPrincipal, ts);
02018       }
02019     }
02020     FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
02021     if(looper_) {
02022       looper_->doBeginRun(runPrincipal, es);
02023     }
02024   }
02025 
02026   void EventProcessor::endRun(statemachine::Run const& run) {
02027     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02028     input_->doEndRun(runPrincipal);
02029     IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
02030                     runPrincipal.endTime());
02031     EventSetup const& es = esp_->eventSetupForInstance(ts);
02032     {
02033       typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02034       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02035       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
02036       if(hasSubProcess()) {
02037         subProcess_->doEndRun(runPrincipal, ts);
02038       }
02039     }
02040     FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02041     if(looper_) {
02042       looper_->doEndRun(runPrincipal, es);
02043     }
02044   }
02045 
02046   void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
02047     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02048     input_->doBeginLumi(lumiPrincipal);
02049 
02050     Service<RandomNumberGenerator> rng;
02051     if(rng.isAvailable()) {
02052       LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02053       rng->preBeginLumi(lb);
02054     }
02055 
02056     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
02057     // lumi blocks know their start and end times why not also start and end events?
02058     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02059     EventSetup const& es = esp_->eventSetupForInstance(ts);
02060     {
02061       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02062       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02063       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02064       if(hasSubProcess()) {
02065         subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02066       }
02067     }
02068     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02069     if(looper_) {
02070       looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02071     }
02072   }
02073 
02074   void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
02075     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02076     input_->doEndLumi(lumiPrincipal);
02077     //NOTE: Using the max event number for the end of a lumi block is a bad idea
02078     // lumi blocks know their start and end times why not also start and end events?
02079     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02080                     lumiPrincipal.endTime());
02081     EventSetup const& es = esp_->eventSetupForInstance(ts);
02082     {
02083       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02084       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02085       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02086       if(hasSubProcess()) {
02087         subProcess_->doEndLuminosityBlock(lumiPrincipal, ts);
02088       }
02089     }
02090     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02091     if(looper_) {
02092       looper_->doEndLuminosityBlock(lumiPrincipal, es);
02093     }
02094   }
02095 
02096   statemachine::Run EventProcessor::readAndCacheRun(bool merge) {
02097     input_->readAndCacheRun(merge, *historyAppender_);
02098     input_->markRun();
02099     return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02100   }
02101 
02102   int EventProcessor::readAndCacheLumi(bool merge) {
02103     input_->readAndCacheLumi(merge, *historyAppender_);
02104     input_->markLumi();
02105     return input_->luminosityBlock();
02106   }
02107 
02108   void EventProcessor::writeRun(statemachine::Run const& run) {
02109     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02110     if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02111     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02112   }
02113 
02114   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02115     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02116     if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02117     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02118   }
02119 
02120   void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
02121     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02122     if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02123     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02124   }
02125 
02126   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
02127     principalCache_.deleteLumi(phid, run, lumi);
02128     if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02129     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02130   }
02131 
02132   void EventProcessor::readAndProcessEvent() {
02133     EventPrincipal *pep = 0;
02134     try {
02135       try {
02136         pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
02137         FDEBUG(1) << "\treadEvent\n";
02138       }
02139       catch (cms::Exception& e) { throw; }
02140       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
02141       catch (std::exception& e) { convertException::stdToEDM(e); }
02142       catch(std::string& s) { convertException::stringToEDM(s); }
02143       catch(char const* c) { convertException::charPtrToEDM(c); }
02144       catch (...) { convertException::unknownToEDM(); }
02145     }
02146     catch(cms::Exception& ex) {
02147       ex.addContext("Calling readEvent in the input source");
02148       throw;
02149     }
02150     assert(pep != 0);
02151 
02152     IOVSyncValue ts(pep->id(), pep->time());
02153     EventSetup const& es = esp_->eventSetupForInstance(ts);
02154     {
02155       typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02156       ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02157       schedule_->processOneOccurrence<Traits>(*pep, es);
02158       if(hasSubProcess()) {
02159         subProcess_->doEvent(*pep, ts);
02160       }
02161     }
02162 
02163     if(looper_) {
02164       bool randomAccess = input_->randomAccess();
02165       ProcessingController::ForwardState forwardState = input_->forwardState();
02166       ProcessingController::ReverseState reverseState = input_->reverseState();
02167       ProcessingController pc(forwardState, reverseState, randomAccess);
02168 
02169       EDLooperBase::Status status = EDLooperBase::kContinue;
02170       do {
02171         status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02172 
02173         bool succeeded = true;
02174         if(randomAccess) {
02175           if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02176             input_->skipEvents(-2);
02177           }
02178           else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02179             succeeded = input_->goToEvent(pc.specifiedEventTransition());
02180           }
02181         }
02182         pc.setLastOperationSucceeded(succeeded);
02183       } while(!pc.lastOperationSucceeded());
02184       if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02185 
02186     }
02187 
02188     FDEBUG(1) << "\tprocessEvent\n";
02189     pep->clearEventPrincipal();
02190   }
02191 
02192   bool EventProcessor::shouldWeStop() const {
02193     FDEBUG(1) << "\tshouldWeStop\n";
02194     if(shouldWeStop_) return true;
02195     return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02196   }
02197 
02198   void EventProcessor::setExceptionMessageFiles(std::string& message) {
02199     exceptionMessageFiles_ = message;
02200   }
02201 
02202   void EventProcessor::setExceptionMessageRuns(std::string& message) {
02203     exceptionMessageRuns_ = message;
02204   }
02205 
02206   void EventProcessor::setExceptionMessageLumis(std::string& message) {
02207     exceptionMessageLumis_ = message;
02208   }
02209 
02210   bool EventProcessor::alreadyHandlingException() const {
02211     return alreadyHandlingException_;
02212   }
02213 
02214   void EventProcessor::terminateMachine() {
02215     if(machine_.get() != 0) {
02216       if(!machine_->terminated()) {
02217         forceLooperToEnd_ = true;
02218         machine_->process_event(statemachine::Stop());
02219         forceLooperToEnd_ = false;
02220       }
02221       else {
02222         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
02223       }
02224       if(machine_->terminated()) {
02225         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02226       }
02227       machine_.reset();
02228     }
02229   }
02230 }