CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011 
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036 
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038 
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046 
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049 
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057 
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060 
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063 
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069 
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072 
00073 //Used for forking
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080 
00081 //Used for CPU affinity
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085 
00086 //Needed for introspection
00087 #include "Cintex/Cintex.h"
00088 
00089 namespace edm {
00090 
00091   namespace event_processor {
00092 
00093     class StateSentry {
00094     public:
00095       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00096       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00097       void succeeded() {success_ = true;}
00098 
00099     private:
00100       EventProcessor* ep_;
00101       bool success_;
00102     };
00103   }
00104 
00105   using namespace event_processor;
00106 
00107   namespace {
00108     template <typename T>
00109     class ScheduleSignalSentry {
00110     public:
00111       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00112            a_(a), principal_(principal), es_(es), allowThrow_(false) {
00113         if (a_) T::preScheduleSignal(a_, principal_);
00114       }
00115       ~ScheduleSignalSentry() noexcept(false) {
00116         try {
00117           if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00118         } catch(...) {
00119           if( allowThrow_) {throw;}
00120         }
00121       }
00122 
00123       void allowThrow() {
00124         allowThrow_ = true;
00125       }
00126     private:
00127       // We own none of these resources.
00128       ActivityRegistry* a_;
00129       typename T::MyPrincipal* principal_;
00130       EventSetup const* es_;
00131       bool allowThrow_ =true;
00132     };
00133   }
00134 
00135   namespace {
00136 
00137     // the next two tables must be kept in sync with the state and
00138     // message enums from the header
00139 
00140     char const* stateNames[] = {
00141       "Init",
00142       "JobReady",
00143       "RunGiven",
00144       "Running",
00145       "Stopping",
00146       "ShuttingDown",
00147       "Done",
00148       "JobEnded",
00149       "Error",
00150       "ErrorEnded",
00151       "End",
00152       "Invalid"
00153     };
00154 
00155     char const* msgNames[] = {
00156       "SetRun",
00157       "Skip",
00158       "RunAsync",
00159       "Run(ID)",
00160       "Run(count)",
00161       "BeginJob",
00162       "StopAsync",
00163       "ShutdownAsync",
00164       "EndJob",
00165       "CountComplete",
00166       "InputExhausted",
00167       "StopSignal",
00168       "ShutdownSignal",
00169       "Finished",
00170       "Any",
00171       "dtor",
00172       "Exception",
00173       "Rewind"
00174     };
00175   }
00176     // IMPORTANT NOTE:
00177     // the mAny messages are special, they must appear last in the
00178     // table if multiple entries for a CurrentState are present.
00179     // the changeState function does not use the mAny yet!!!
00180 
00181     struct TransEntry {
00182       State current;
00183       Msg   message;
00184       State final;
00185     };
00186 
00187     // we should use this information to initialize a two dimensional
00188     // table of t[CurrentState][Message] = FinalState
00189 
00190     /*
00191       the way this is current written, the async run can thread function
00192       can return in the "JobReady" state - but not yet cleaned up.  The
00193       problem is that only when stop/shutdown async is called is the
00194       thread cleaned up. But the stop/shudown async functions attempt
00195       first to change the state using messages that are not valid in
00196       "JobReady" state.
00197 
00198       I think most of the problems can be solved by using two states
00199       for "running": RunningS and RunningA (sync/async). The problems
00200       seems to be the all the transitions out of running for both
00201       modes of operation.  The other solution might be to only go to
00202       "Stopping" from Running, and use the return code from "run_p" to
00203       set the final state.  If this is used, then in the sync mode the
00204       "Stopping" state would be momentary.
00205 
00206      */
00207 
00208     TransEntry table[] = {
00209     // CurrentState   Message         FinalState
00210     // -----------------------------------------
00211       { sInit,          mException,      sError },
00212       { sInit,          mBeginJob,       sJobReady },
00213       { sJobReady,      mException,      sError },
00214       { sJobReady,      mSetRun,         sRunGiven },
00215       { sJobReady,      mInputRewind,    sRunning },
00216       { sJobReady,      mSkip,           sRunning },
00217       { sJobReady,      mRunID,          sRunning },
00218       { sJobReady,      mRunCount,       sRunning },
00219       { sJobReady,      mEndJob,         sJobEnded },
00220       { sJobReady,      mBeginJob,       sJobReady },
00221       { sJobReady,      mDtor,           sEnd },    // should this be allowed?
00222 
00223       { sJobReady,      mStopAsync,      sJobReady },
00224       { sJobReady,      mCountComplete,  sJobReady },
00225       { sJobReady,      mFinished,       sJobReady },
00226 
00227       { sRunGiven,      mException,      sError },
00228       { sRunGiven,      mRunAsync,       sRunning },
00229       { sRunGiven,      mBeginJob,       sRunGiven },
00230       { sRunGiven,      mShutdownAsync,  sShuttingDown },
00231       { sRunGiven,      mStopAsync,      sStopping },
00232       { sRunning,       mException,      sError },
00233       { sRunning,       mStopAsync,      sStopping },
00234       { sRunning,       mShutdownAsync,  sShuttingDown },
00235       { sRunning,       mShutdownSignal, sShuttingDown },
00236       { sRunning,       mCountComplete,  sStopping }, // sJobReady
00237       { sRunning,       mInputExhausted, sStopping }, // sJobReady
00238 
00239       { sStopping,      mInputRewind,    sRunning }, // The looper needs this
00240       { sStopping,      mException,      sError },
00241       { sStopping,      mFinished,       sJobReady },
00242       { sStopping,      mCountComplete,  sJobReady },
00243       { sStopping,      mShutdownSignal, sShuttingDown },
00244       { sStopping,      mStopAsync,      sStopping },     // stay
00245       { sStopping,      mInputExhausted, sStopping },     // stay
00246       //{ sStopping,      mAny,            sJobReady },     // <- ??????
00247       { sShuttingDown,  mException,      sError },
00248       { sShuttingDown,  mShutdownSignal, sShuttingDown },
00249       { sShuttingDown,  mCountComplete,  sDone }, // needed?
00250       { sShuttingDown,  mInputExhausted, sDone }, // needed?
00251       { sShuttingDown,  mFinished,       sDone },
00252       //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
00253       //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
00254       //{ sShuttingDown,  mAny,            sDone },         // <- ??????
00255       { sDone,          mEndJob,         sJobEnded },
00256       { sDone,          mException,      sError },
00257       { sJobEnded,      mDtor,           sEnd },
00258       { sJobEnded,      mException,      sError },
00259       { sError,         mEndJob,         sError },   // funny one here
00260       { sError,         mDtor,           sError },   // funny one here
00261       { sInit,          mDtor,           sEnd },     // for StorM dummy EP
00262       { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
00263       { sInvalid,       mAny,            sInvalid }
00264     };
00265 
00266 
    // Note: several messages (mRunID, mRunCount, mSetRun) generate the
    //  mBeginJob message first.
00269 
00270   // ---------------------------------------------------------------
00271   std::unique_ptr<InputSource>
00272   makeInput(ParameterSet& params,
00273             CommonParams const& common,
00274             ProductRegistry& preg,
00275             boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
00276             boost::shared_ptr<ActivityRegistry> areg,
00277             boost::shared_ptr<ProcessConfiguration const> processConfiguration) {
00278     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00279     if(main_input == 0) {
00280       throw Exception(errors::Configuration)
00281         << "There must be exactly one source in the configuration.\n"
00282         << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00283     }
00284 
00285     std::string modtype(main_input->getParameter<std::string>("@module_type"));
00286 
00287     std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00288       ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00289     ConfigurationDescriptions descriptions(filler->baseType());
00290     filler->fill(descriptions);
00291 
00292     try {
00293       try {
00294         descriptions.validate(*main_input, std::string("source"));
00295       }
00296       catch (cms::Exception& e) { throw; }
00297       catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00298       catch (std::exception& e) { convertException::stdToEDM(e); }
00299       catch (std::string& s) { convertException::stringToEDM(s); }
00300       catch (char const* c) { convertException::charPtrToEDM(c); }
00301       catch (...) { convertException::unknownToEDM(); }
00302     }
00303     catch (cms::Exception & iException) {
00304       std::ostringstream ost;
00305       ost << "Validating configuration of input source of type " << modtype;
00306       iException.addContext(ost.str());
00307       throw;
00308     }
00309 
00310     main_input->registerIt();
00311 
00312     // Fill in "ModuleDescription", in case the input source produces
00313     // any EDproducts, which would be registered in the ProductRegistry.
00314     // Also fill in the process history item for this process.
00315     // There is no module label for the unnamed input source, so
00316     // just use "source".
00317     // Only the tracked parameters belong in the process configuration.
00318     ModuleDescription md(main_input->id(),
00319                          main_input->getParameter<std::string>("@module_type"),
00320                          "source",
00321                          processConfiguration.get());
00322 
00323     InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_);
00324     areg->preSourceConstructionSignal_(md);
00325     std::unique_ptr<InputSource> input;
00326     try {
00327       try {
00328         input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00329       }
00330       catch (cms::Exception& e) { throw; }
00331       catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00332       catch (std::exception& e) { convertException::stdToEDM(e); }
00333       catch (std::string& s) { convertException::stringToEDM(s); }
00334       catch (char const* c) { convertException::charPtrToEDM(c); }
00335       catch (...) { convertException::unknownToEDM(); }
00336     }
00337     catch (cms::Exception& iException) {
00338       areg->postSourceConstructionSignal_(md);
00339       std::ostringstream ost;
00340       ost << "Constructing input source of type " << modtype;
00341       iException.addContext(ost.str());
00342       throw;
00343     }
00344     areg->postSourceConstructionSignal_(md);
00345     return input;
00346   }
00347 
00348   // ---------------------------------------------------------------
00349   boost::shared_ptr<EDLooperBase>
00350   fillLooper(eventsetup::EventSetupsController& esController,
00351              eventsetup::EventSetupProvider& cp,
00352                          ParameterSet& params) {
00353     boost::shared_ptr<EDLooperBase> vLooper;
00354 
00355     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00356 
00357     if(loopers.size() == 0) {
00358        return vLooper;
00359     }
00360 
00361     assert(1 == loopers.size());
00362 
00363     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00364         itName != itNameEnd;
00365         ++itName) {
00366 
00367       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00368       providerPSet->registerIt();
00369       vLooper = eventsetup::LooperFactory::get()->addTo(esController,
00370                                                         cp,
00371                                                         *providerPSet);
00372       }
00373       return vLooper;
00374 
00375   }
00376 
00377   // ---------------------------------------------------------------
00378   EventProcessor::EventProcessor(std::string const& config,
00379                                 ServiceToken const& iToken,
00380                                 serviceregistry::ServiceLegacy iLegacy,
00381                                 std::vector<std::string> const& defaultServices,
00382                                 std::vector<std::string> const& forcedServices) :
00383     preProcessEventSignal_(),
00384     postProcessEventSignal_(),
00385     actReg_(),
00386     preg_(),
00387     branchIDListHelper_(),
00388     serviceToken_(),
00389     input_(),
00390     espController_(new eventsetup::EventSetupsController),
00391     esp_(),
00392     act_table_(),
00393     processConfiguration_(),
00394     schedule_(),
00395     subProcess_(),
00396     historyAppender_(new HistoryAppender),
00397     state_(sInit),
00398     event_loop_(),
00399     state_lock_(),
00400     stop_lock_(),
00401     stopper_(),
00402     starter_(),
00403     stop_count_(-1),
00404     last_rc_(epSuccess),
00405     last_error_text_(),
00406     id_set_(false),
00407     event_loop_id_(),
00408     my_sig_num_(getSigNum()),
00409     fb_(),
00410     looper_(),
00411     principalCache_(),
00412     shouldWeStop_(false),
00413     stateMachineWasInErrorState_(false),
00414     fileMode_(),
00415     emptyRunLumiMode_(),
00416     exceptionMessageFiles_(),
00417     exceptionMessageRuns_(),
00418     exceptionMessageLumis_(),
00419     alreadyHandlingException_(false),
00420     forceLooperToEnd_(false),
00421     looperBeginJobRun_(false),
00422     forceESCacheClearOnNewRun_(false),
00423     numberOfForkedChildren_(0),
00424     numberOfSequentialEventsPerChild_(1),
00425     setCpuAffinity_(false),
00426     eventSetupDataToExcludeFromPrefetching_() {
00427     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00428     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00429     processDesc->addServices(defaultServices, forcedServices);
00430     init(processDesc, iToken, iLegacy);
00431   }
00432 
00433   EventProcessor::EventProcessor(std::string const& config,
00434                                 std::vector<std::string> const& defaultServices,
00435                                 std::vector<std::string> const& forcedServices) :
00436     preProcessEventSignal_(),
00437     postProcessEventSignal_(),
00438     actReg_(),
00439     preg_(),
00440     branchIDListHelper_(),
00441     serviceToken_(),
00442     input_(),
00443     espController_(new eventsetup::EventSetupsController),
00444     esp_(),
00445     act_table_(),
00446     processConfiguration_(),
00447     schedule_(),
00448     subProcess_(),
00449     historyAppender_(new HistoryAppender),
00450     state_(sInit),
00451     event_loop_(),
00452     state_lock_(),
00453     stop_lock_(),
00454     stopper_(),
00455     starter_(),
00456     stop_count_(-1),
00457     last_rc_(epSuccess),
00458     last_error_text_(),
00459     id_set_(false),
00460     event_loop_id_(),
00461     my_sig_num_(getSigNum()),
00462     fb_(),
00463     looper_(),
00464     principalCache_(),
00465     shouldWeStop_(false),
00466     stateMachineWasInErrorState_(false),
00467     fileMode_(),
00468     emptyRunLumiMode_(),
00469     exceptionMessageFiles_(),
00470     exceptionMessageRuns_(),
00471     exceptionMessageLumis_(),
00472     alreadyHandlingException_(false),
00473     forceLooperToEnd_(false),
00474     looperBeginJobRun_(false),
00475     forceESCacheClearOnNewRun_(false),
00476     numberOfForkedChildren_(0),
00477     numberOfSequentialEventsPerChild_(1),
00478     setCpuAffinity_(false),
00479     eventSetupDataToExcludeFromPrefetching_() {
00480     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00481     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00482     processDesc->addServices(defaultServices, forcedServices);
00483     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00484   }
00485 
00486   EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00487                  ServiceToken const& token,
00488                  serviceregistry::ServiceLegacy legacy) :
00489     preProcessEventSignal_(),
00490     postProcessEventSignal_(),
00491     actReg_(),
00492     preg_(),
00493     branchIDListHelper_(),
00494     serviceToken_(),
00495     input_(),
00496     espController_(new eventsetup::EventSetupsController),
00497     esp_(),
00498     act_table_(),
00499     processConfiguration_(),
00500     schedule_(),
00501     subProcess_(),
00502     historyAppender_(new HistoryAppender),
00503     state_(sInit),
00504     event_loop_(),
00505     state_lock_(),
00506     stop_lock_(),
00507     stopper_(),
00508     starter_(),
00509     stop_count_(-1),
00510     last_rc_(epSuccess),
00511     last_error_text_(),
00512     id_set_(false),
00513     event_loop_id_(),
00514     my_sig_num_(getSigNum()),
00515     fb_(),
00516     looper_(),
00517     principalCache_(),
00518     shouldWeStop_(false),
00519     stateMachineWasInErrorState_(false),
00520     fileMode_(),
00521     emptyRunLumiMode_(),
00522     exceptionMessageFiles_(),
00523     exceptionMessageRuns_(),
00524     exceptionMessageLumis_(),
00525     alreadyHandlingException_(false),
00526     forceLooperToEnd_(false),
00527     looperBeginJobRun_(false),
00528     forceESCacheClearOnNewRun_(false),
00529     numberOfForkedChildren_(0),
00530     numberOfSequentialEventsPerChild_(1),
00531     setCpuAffinity_(false),
00532     eventSetupDataToExcludeFromPrefetching_() {
00533     init(processDesc, token, legacy);
00534   }
00535 
00536 
00537   EventProcessor::EventProcessor(std::string const& config, bool isPython):
00538     preProcessEventSignal_(),
00539     postProcessEventSignal_(),
00540     actReg_(),
00541     preg_(),
00542     branchIDListHelper_(),
00543     serviceToken_(),
00544     input_(),
00545     espController_(new eventsetup::EventSetupsController),
00546     esp_(),
00547     act_table_(),
00548     processConfiguration_(),
00549     schedule_(),
00550     subProcess_(),
00551     historyAppender_(new HistoryAppender),
00552     state_(sInit),
00553     event_loop_(),
00554     state_lock_(),
00555     stop_lock_(),
00556     stopper_(),
00557     starter_(),
00558     stop_count_(-1),
00559     last_rc_(epSuccess),
00560     last_error_text_(),
00561     id_set_(false),
00562     event_loop_id_(),
00563     my_sig_num_(getSigNum()),
00564     fb_(),
00565     looper_(),
00566     principalCache_(),
00567     shouldWeStop_(false),
00568     stateMachineWasInErrorState_(false),
00569     fileMode_(),
00570     emptyRunLumiMode_(),
00571     exceptionMessageFiles_(),
00572     exceptionMessageRuns_(),
00573     exceptionMessageLumis_(),
00574     alreadyHandlingException_(false),
00575     forceLooperToEnd_(false),
00576     looperBeginJobRun_(false),
00577     forceESCacheClearOnNewRun_(false),
00578     numberOfForkedChildren_(0),
00579     numberOfSequentialEventsPerChild_(1),
00580     setCpuAffinity_(false),
00581     eventSetupDataToExcludeFromPrefetching_() {
00582     if(isPython) {
00583       boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00584       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00585       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00586     }
00587     else {
00588       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00589       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00590     }
00591   }
00592 
00593   void
00594   EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00595                         ServiceToken const& iToken,
00596                         serviceregistry::ServiceLegacy iLegacy) {
00597 
00598     //std::cerr << processDesc->dump() << std::endl;
00599    
00600     ROOT::Cintex::Cintex::Enable();
00601 
00602     boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00603     //std::cerr << parameterSet->dump() << std::endl;
00604 
00605     // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
00606     boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00607 
00608     // Now set some parameters specific to the main process.
00609     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00610     fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00611     emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00612     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00613     ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00614     numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00615     numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00616     setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00617     continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
00618     std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00619     for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00620         itPS != itPSEnd;
00621         ++itPS) {
00622       eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00623                                                 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00624                                                                itPS->getUntrackedParameter<std::string>("label", "")));
00625     }
00626     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00627 
00628     // Now do general initialization
00629     ScheduleItems items;
00630 
00631     //initialize the services
00632     boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00633     ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00634     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00635 
00636     //make the services available
00637     ServiceRegistry::Operate operate(serviceToken_);
00638 
00639     // intialize miscellaneous items
00640     boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00641 
00642     // intialize the event setup provider
00643     esp_ = espController_->makeProvider(*parameterSet);
00644 
00645     // initialize the looper, if any
00646     looper_ = fillLooper(*espController_, *esp_, *parameterSet);
00647     if(looper_) {
00648       looper_->setActionTable(items.act_table_.get());
00649       looper_->attachTo(*items.actReg_);
00650     }
00651 
00652     // initialize the input source
00653     input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
00654 
00655     // intialize the Schedule
00656     schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00657 
00658     // set the data members
00659     act_table_ = std::move(items.act_table_);
00660     actReg_ = items.actReg_;
00661     preg_.reset(items.preg_.release());
00662     branchIDListHelper_ = items.branchIDListHelper_;
00663     processConfiguration_ = items.processConfiguration_;
00664 
00665     FDEBUG(2) << parameterSet << std::endl;
00666     connectSigs(this);
00667 
00668     // Reusable event principal
00669     boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
00670     principalCache_.insert(ep);
00671       
00672     // initialize the subprocess, if there is one
00673     if(subProcessParameterSet) {
00674       subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
00675     }
00676   }
00677 
00678   EventProcessor::~EventProcessor() {
00679     // Make the services available while everything is being deleted.
00680     ServiceToken token = getToken();
00681     ServiceRegistry::Operate op(token);
00682     try {
00683       changeState(mDtor);
00684     }
00685     catch(cms::Exception& e) {
00686       LogError("System")
00687         << e.explainSelf() << "\n";
00688     }
00689 
00690     // manually destroy all these thing that may need the services around
00691     espController_.reset();
00692     subProcess_.reset();
00693     esp_.reset();
00694     schedule_.reset();
00695     input_.reset();
00696     looper_.reset();
00697     actReg_.reset();
00698 
00699     pset::Registry* psetRegistry = pset::Registry::instance();
00700     psetRegistry->data().clear();
00701     psetRegistry->extra().setID(ParameterSetID());
00702 
00703     EntryDescriptionRegistry::instance()->data().clear();
00704     ParentageRegistry::instance()->data().clear();
00705     ProcessConfigurationRegistry::instance()->data().clear();
00706     ProcessHistoryRegistry::instance()->data().clear();
00707   }
00708 
00709   void
00710   EventProcessor::beginJob() {
00711     if(state_ != sInit) return;
00712     bk::beginJob();
00713     // can only be run if in the initial state
00714     changeState(mBeginJob);
00715 
00716     // StateSentry toerror(this); // should we add this ?
00717     //make the services available
00718     ServiceRegistry::Operate operate(serviceToken_);
00719 
00720     //NOTE:  This implementation assumes 'Job' means one call
00721     // the EventProcessor::run
00722     // If it really means once per 'application' then this code will
00723     // have to be changed.
00724     // Also have to deal with case where have 'run' then new Module
00725     // added and do 'run'
00726     // again.  In that case the newly added Module needs its 'beginJob'
00727     // to be called.
00728 
00729     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
00730     //  For now we delay calling beginOfJob until first beginOfRun
00731     //if(looper_) {
00732     //   looper_->beginOfJob(es);
00733     //}
00734     try {
00735       try {
00736         input_->doBeginJob();
00737       }
00738       catch (cms::Exception& e) { throw; }
00739       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00740       catch (std::exception& e) { convertException::stdToEDM(e); }
00741       catch(std::string& s) { convertException::stringToEDM(s); }
00742       catch(char const* c) { convertException::charPtrToEDM(c); }
00743       catch (...) { convertException::unknownToEDM(); }
00744     }
00745     catch(cms::Exception& ex) {
00746       ex.addContext("Calling beginJob for the source");
00747       throw;
00748     }
00749     schedule_->beginJob(*preg_);
00750     // toerror.succeeded(); // should we add this?
00751     if(hasSubProcess()) subProcess_->doBeginJob();
00752     actReg_->postBeginJobSignal_();
00753   }
00754 
00755   void
00756   EventProcessor::endJob() {
00757     // Collects exceptions, so we don't throw before all operations are performed.
00758     ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00759 
00760     // only allowed to run if state is sIdle, sJobReady, sRunGiven
00761     c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00762 
00763     //make the services available
00764     ServiceRegistry::Operate operate(serviceToken_);
00765 
00766     schedule_->endJob(c);
00767     if(hasSubProcess()) {
00768       c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00769     }
00770     c.call(boost::bind(&InputSource::doEndJob, input_.get()));
00771     if(looper_) {
00772       c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00773     }
00774     auto actReg = actReg_.get();
00775     c.call([actReg](){actReg->postEndJobSignal_();});
00776     if(c.hasThrown()) {
00777       c.rethrow();
00778     }
00779   }
00780 
00781   ServiceToken
00782   EventProcessor::getToken() {
00783     return serviceToken_;
00784   }
00785 
00786   //Setup signal handler to listen for when forked children stop
00787   namespace {
00788     //These are volatile since the compiler can not be allowed to optimize them
00789     // since they can be modified in the signaller handler
00790     volatile bool child_failed = false;
00791     volatile unsigned int num_children_done = 0;
00792     volatile int child_fail_exit_status = 0;
00793     volatile int child_fail_signal = 0;
00794     
00795     //NOTE: We setup the signal handler to run in the main thread which
00796     // is also the same thread that then reads the above values
00797 
00798     extern "C" {
00799       void ep_sigchld(int, siginfo_t*, void*) {
00800         //printf("in sigchld\n");
00801         //FDEBUG(1) << "in sigchld handler\n";
00802         int stat_loc;
00803         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00804         while(0<p) {
00805           //printf("  looping\n");
00806           if(WIFEXITED(stat_loc)) {
00807             ++num_children_done;
00808             if(0 != WEXITSTATUS(stat_loc)) {
00809               child_fail_exit_status = WEXITSTATUS(stat_loc);
00810               child_failed = true;
00811             }
00812           }
00813           if(WIFSIGNALED(stat_loc)) {
00814             ++num_children_done;
00815             child_fail_signal = WTERMSIG(stat_loc);
00816             child_failed = true;
00817           }
00818           p = waitpid(-1, &stat_loc, WNOHANG);
00819         }
00820       }
00821     }
00822 
00823   }
00824 
00825   enum {
00826     kChildSucceed,
00827     kChildExitBadly,
00828     kChildSegv,
00829     kMaxChildAction
00830   };
00831 
00832   namespace {
00833     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00834       unsigned int n = 0;
00835       while(numberOfChildren != 0) {
00836         ++n;
00837         numberOfChildren /= 10;
00838       }
00839       if(n == 0) {
00840         n = 3; // Protect against zero numberOfChildren
00841       }
00842       return n;
00843     }
00844     
00845     /*This class embodied the thread which is used to listen to the forked children and
00846      then tell them which events they should process */
00847     class MessageSenderToSource {
00848     public:
00849       MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00850       void operator()();
00851 
00852     private:
00853       const std::vector<int>& m_childrenPipes;
00854       long const m_nEventsToProcess;
00855       fd_set m_socketSet;
00856       unsigned int m_aliveChildren;
00857       int m_maxFd;
00858     };
00859     
00860     MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00861                                                  std::vector<int> const& childrenPipes,
00862                                                  long iNEventsToProcess):
00863     m_childrenPipes(childrenPipes),
00864     m_nEventsToProcess(iNEventsToProcess),
00865     m_aliveChildren(childrenSockets.size()),
00866     m_maxFd(0)
00867     {
00868       FD_ZERO(&m_socketSet);
00869       for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00870            it != itEnd; it++) {
00871         FD_SET(*it, &m_socketSet);
00872         if (*it > m_maxFd) {
00873           m_maxFd = *it;
00874         }
00875       }
00876       for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00877            it != itEnd; ++it) {
00878         FD_SET(*it, &m_socketSet);
00879         if (*it > m_maxFd) {
00880           m_maxFd = *it;
00881         }
00882       }
00883       m_maxFd++; // select reads [0,m_maxFd).
00884     }
00885    
00886     /* This function is the heart of the communication between parent and child.
00887      * When ready for more data, the child (see MessageReceiverForSource) requests
00888      * data through a AF_UNIX socket message.  The parent will then assign the next
00889      * chunk of data by sending a message back.
00890      *
00891      * Additionally, this function also monitors the read-side of the pipe fd from the child.
00892      * If the child dies unexpectedly, the pipe will be selected as ready for read and
00893      * will return EPIPE when read from.  Further, if the child thinks the parent has died
00894      * (defined as waiting more than 1s for a response), it will write a single byte to
00895      * the pipe.  If the parent has died, the child will get a EPIPE and throw an exception.
00896      * If still alive, the parent will read the byte and ignore it.
00897      *
00898      * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
00899      * handler can distinguish between success and failure cases.
00900      */
00901  
00902     void
00903     MessageSenderToSource::operator()() {
00904       multicore::MessageForParent childMsg;
00905       LogInfo("ForkingController") << "I am controller";
00906       //this is the master and therefore the controller
00907       
00908       multicore::MessageForSource sndmsg;
00909       sndmsg.startIndex = 0;
00910       sndmsg.nIndices = m_nEventsToProcess;
00911       do {
00912         
00913         fd_set readSockets, errorSockets;
00914         // Wait for a request from a child for events.
00915         memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00916         memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00917         // Note that we don't timeout; may be reconsidered in the future.
00918         ssize_t rc;
00919         while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00920         if (rc < 0) {
00921           std::cerr << "select failed; should be impossible due to preconditions.\n";
00922           abort();
00923           break;
00924         }
00925 
00926         // Read the message from the child.
00927         for (int idx=0; idx<m_maxFd; idx++) {
00928 
00929           // Handle errors
00930           if (FD_ISSET(idx, &errorSockets)) {
00931             LogInfo("ForkingController") << "Error on socket " << idx;
00932             FD_CLR(idx, &m_socketSet);
00933             close(idx);
00934             // See if it was the watchdog pipe that died.
00935             for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00936               if (*it == idx) {
00937                 m_aliveChildren--;
00938               }
00939             }
00940             continue;
00941           }
00942           
00943           if (!FD_ISSET(idx, &readSockets)) {
00944             continue;
00945           }
00946 
00947           // See if this FD is a child watchdog pipe.  If so, read from it to prevent
00948           // writes from blocking.
00949           bool is_pipe = false;
00950           for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00951               if (*it == idx) {
00952                 is_pipe = true;
00953                 char buf;
00954                 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00955                 if (rc <= 0) {
00956                   m_aliveChildren--;
00957                   FD_CLR(idx, &m_socketSet);
00958                   close(idx);
00959                 }
00960               }
00961           }
00962 
00963           // Only execute this block if the FD is a socket for sending the child work.
00964           if (!is_pipe) {
00965             while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
00966             if (rc < 0) {
00967               FD_CLR(idx, &m_socketSet);
00968               close(idx);
00969               continue;
00970             }
00971           
00972             // Tell the child what events to process.
00973             // If 'send' fails, then the child process has failed (any other possibilities are
00974             // eliminated because we are using fixed-size messages with Unix datagram sockets).
00975             // Thus, the SIGCHLD handler will fire and set child_fail = true.
00976             while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
00977             if (rc < 0) {
00978               FD_CLR(idx, &m_socketSet);
00979               close(idx);
00980               continue;
00981             }
00982             //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
00983             sndmsg.startIndex += sndmsg.nIndices;
00984           }
00985         }
00986       
00987       } while (m_aliveChildren > 0);
00988       
00989       return;
00990     }
00991 
00992   }
00993 
00994   
00995   void EventProcessor::possiblyContinueAfterForkChildFailure() {
00996     if(child_failed && continueAfterChildFailure_) {
00997       if (child_fail_signal) {
00998         LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
00999         child_fail_signal=0;
01000       } else if (child_fail_exit_status) {
01001         LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01002         child_fail_exit_status=0;
01003       } else {
01004         LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01005       }
01006       child_failed =false;
01007     }
01008   }
01009 
01010   bool
01011   EventProcessor::forkProcess(std::string const& jobReportFile) {
          // Does the setup shared by all children in this process: beginJob,
          // reading the first file, and prefetching EventSetup data for the first
          // run.  Then forks numberOfForkedChildren_ children.
          // Returns true immediately if no forking is configured, and true inside
          // each child, which then continues processing.  In the parent it waits
          // for all children and returns false.  It throws
          // cms::Exception("ForkedChildFailed") if a child failed and
          // continueAfterChildFailure_ is false, and
          // cms::Exception("ForkedParentFailed") if the select() descriptor
          // limit was hit.
01012 
01013     if(0 == numberOfForkedChildren_) {return true;}
01014     assert(0<numberOfForkedChildren_);
01015     //do what we want done in common
01016     {
01017       beginJob(); //make sure this was run
01018       // make the services available
01019       ServiceRegistry::Operate operate(serviceToken_);
01020 
01021       InputSource::ItemType itemType;
01022       itemType = input_->nextItemType();
01023 
01024       assert(itemType == InputSource::IsFile);
01025       {
01026         readFile();
01027       }
01028       itemType = input_->nextItemType();
01029       assert(itemType == InputSource::IsRun);
01030 
01031       LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01032       IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01033                       input_->runAuxiliary()->beginTime());
01034       espController_->eventSetupForInstance(ts);
01035       EventSetup const& es = esp_->eventSetup();
01036 
01037       //now get all the data available in the EventSetup
01038       std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01039       es.fillAvailableRecordKeys(recordKeys);
01040       std::vector<eventsetup::DataKey> dataKeys;
01041       for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01042           itKey != itEnd;
01043           ++itKey) {
01044         eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01045         //see if this is on our exclusion list
01046         ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01047         ExcludedData const* excludedData(nullptr);
01048         if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01049           excludedData = &(itExcludeRec->second);
                // An empty exclusion set, or one whose first entry is "*",
                // excludes the whole record.
01050           if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01051             //skip all items in this record
01052             continue;
01053           }
01054         }
01055         if(0 != recordPtr) {
01056           dataKeys.clear();
01057           recordPtr->fillRegisteredDataKeys(dataKeys);
01058           for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01059               itDataKey != itDataKeyEnd;
01060               ++itDataKey) {
01061             //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01062             if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01063               LogInfo("ForkingEventSetupPreFetching") << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01064               continue;
01065             }
                  // Prefetch failures are only warned about; they do not stop the fork.
01066             try {
01067               recordPtr->doGet(*itDataKey);
01068             } catch(cms::Exception& e) {
01069              LogWarning("ForkingEventSetupPreFetching") << e.what();
01070             }
01071           }
01072         }
01073       }
01074     }
01075     LogSystem("ForkingEventSetupPreFetching") <<"  done prefetching";
01076     {
01077       // make the services available
01078       ServiceRegistry::Operate operate(serviceToken_);
01079       Service<JobReport> jobReport;
01080       jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01081 
01082       //Now actually do the forking
01083       actReg_->preForkReleaseResourcesSignal_();
01084       input_->doPreForkReleaseResources();
01085       schedule_->preForkReleaseResources();
01086     }
          // From here on ep_sigchld reaps finished children and updates
          // child_failed, num_children_done, child_fail_exit_status and
          // child_fail_signal.
01087     installCustomHandler(SIGCHLD, ep_sigchld);
01088 
01089 
01090     unsigned int childIndex = 0;
01091     unsigned int const kMaxChildren = numberOfForkedChildren_;
01092     unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01093     std::vector<pid_t> childrenIds;
01094     childrenIds.reserve(kMaxChildren);
01095     std::vector<int> childrenSockets;
01096     childrenSockets.reserve(kMaxChildren);
01097     std::vector<int> childrenPipes;
01098     childrenPipes.reserve(kMaxChildren);
          // Snapshots of the parent-side descriptors of previously forked
          // children; each new child closes these so it does not hold them open.
01099     std::vector<int> childrenSocketsCopy;
01100     childrenSocketsCopy.reserve(kMaxChildren);
01101     std::vector<int> childrenPipesCopy;
01102     childrenPipesCopy.reserve(kMaxChildren);
01103     int pipes[] {0, 0};
01104 
01105     {
01106       // make the services available
01107       ServiceRegistry::Operate operate(serviceToken_);
01108       Service<JobReport> jobReport;
01109       int sockets[2], fd_flags;
01110       for(; childIndex < kMaxChildren; ++childIndex) {
01111         // Create a UNIX_DGRAM socket pair
01112         if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01113           printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01114           exit(EXIT_FAILURE);
01115         }
01116         if (pipe(pipes)) {
01117           printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01118           exit(EXIT_FAILURE);
01119         }
01120         // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
01121         if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01122           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01123           exit(EXIT_FAILURE);
01124         }
              // NOTE(review): O_NONBLOCK is a file-status flag, but F_GETFD/F_SETFD
              // only manage descriptor flags such as FD_CLOEXEC.  So the O_NONBLOCK
              // bits passed below (here and for pipes[0]) presumably have no effect.
              // Making the descriptors non-blocking would need F_GETFL/F_SETFL;
              // first confirm that the child receiver and MessageSenderToSource
              // tolerate EAGAIN.
01125         // Mark socket as non-block.  Child must be careful to do select prior
01126         // to reading from socket.
01127         if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01128           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01129           exit(EXIT_FAILURE);
01130         }
01131         if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01132           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01133           exit(EXIT_FAILURE);
01134         }
01135         if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01136           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01137           exit(EXIT_FAILURE);
01138         }
01139         // Linux man page notes there are some edge cases where reading from a
01140         // fd can block, even after a select.
01141         if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01142           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01143           exit(EXIT_FAILURE);
01144         }
01145         if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01146           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01147           exit(EXIT_FAILURE);
01148         }
01149 
01150         childrenPipesCopy = childrenPipes;
01151         childrenSocketsCopy = childrenSockets;
01152 
01153         pid_t value = fork();
01154         if(value == 0) {
01155           // Close the parent's side of the socket and pipe which will talk to us.
01156           close(pipes[0]);
01157           close(sockets[0]);
01158           // Close our copies of the parent's other communication pipes.
01159           for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01160             close(*it);
01161           }
01162           for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01163             close(*it);
01164           }
01165 
01166           // this is the child process, redirect stdout and stderr to a log file
01167           fflush(stdout);
01168           fflush(stderr);
01169           std::stringstream stout;
01170           stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01171           if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01172             LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01173           }
01174           if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01175             LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01176           }
01177 
01178           LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01179           if(setCpuAffinity_) {
01180             // CPU affinity is handled differently on macosx.
01181             // We disable it and print a message until someone reads:
01182             //
01183             // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
01184             //
01185             // and implements it.
01186 #ifdef __APPLE__
01187             LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01188 #else
01189             LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01190             cpu_set_t mask;
01191             CPU_ZERO(&mask);
01192             CPU_SET(childIndex, &mask);
01193             if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01194               LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01195               exit(-1);
01196             }
01197 #endif
01198           }
01199           break;
01200         } else {
01201           //this is the parent
01202           close(pipes[1]);
01203           close(sockets[1]);
01204         }
01205         if(value < 0) {
01206           LogError("ForkingChild") << "failed to create a child";
01207           exit(-1);
01208         }
01209         childrenIds.push_back(value);
01210         childrenSockets.push_back(sockets[0]);
01211         childrenPipes.push_back(pipes[0]);
01212       }
01213 
            // Only a child leaves the fork loop early (via 'break'), so this
            // branch runs in child processes only.
01214       if(childIndex < kMaxChildren) {
01215         jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01216         actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01217 
01218         boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01219         input_->doPostForkReacquireResources(receiver);
01220         schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01221         //NOTE: sources have to reset themselves by listening to the post fork message
01222         //rewindInput();
01223         return true;
01224       }
01225       jobReport->parentAfterFork(jobReportFile);
01226     }
01227 
01228     //this is the original, which is now the master for all the children
01229 
01230     //Need to wait for signals from the children or externally
01231     // To wait we must
01232     // 1) block the signals we want to wait on so we do not have a race condition
01233     // 2) check that we haven't already met our ending criteria
01234     // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
01235     sigset_t blockingSigSet;
01236     sigset_t unblockingSigSet;
01237     sigset_t oldSigSet;
01238     pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01239     pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01240     sigaddset(&blockingSigSet, SIGCHLD);
01241     sigaddset(&blockingSigSet, SIGUSR2);
01242     sigaddset(&blockingSigSet, SIGINT);
01243     sigdelset(&unblockingSigSet, SIGCHLD);
01244     sigdelset(&unblockingSigSet, SIGUSR2);
01245     sigdelset(&unblockingSigSet, SIGINT);
01246     pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01247 
          // NOTE(review): only the last-created pipe's write end (pipes[1]) is
          // compared with FD_SETSIZE.  The sender below is constructed whatever
          // the result, so its constructor must never FD_SET a descriptor
          // >= FD_SETSIZE (that would be undefined behaviour).
01248     // If there are too many fd's (unlikely, but possible) for select, denote this 
01249     // because the sender will fail.
01250     bool too_many_fds = false;
01251     if (pipes[1]+1 > FD_SETSIZE) {
01252       LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01253       too_many_fds = true;
01254     }
01255 
01256     //create a thread that sends the units of work to workers
01257     // we create it after all signals were blocked so that this
01258     // thread is never interrupted by a signal
01259     MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01260     boost::thread senderThread(sender);
01261 
01262     if(not too_many_fds) {
01263       //NOTE: a child could have failed before we got here and even after this call
01264       // which is why the 'if' is conditional on continueAfterChildFailure_
01265       possiblyContinueAfterForkChildFailure();
01266       while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
01267         sigsuspend(&unblockingSigSet);
01268         possiblyContinueAfterForkChildFailure();
01269         LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01270       }
01271     }
01272     pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01273 
01274     LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01275     if(child_failed) {
01276       LogError("ForkingStopping") << "child failed";
01277     }
01278     if(shutdown_flag) {
01279       LogSystem("ForkingStopping") << "asked to shutdown";
01280     }
01281 
          // Abnormal end: send SIGUSR2 to every child, then wait (signals are
          // unblocked only inside sigsuspend) until the SIGCHLD handler has
          // counted all of them.
01282     if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01283       LogInfo("ForkingStopping") << "must stop children" << std::endl;
01284       for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01285           it != itEnd; ++it) {
01286         /* int result = */ kill(*it, SIGUSR2);
01287       }
01288       pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01289       while(num_children_done != kMaxChildren) {
01290         sigsuspend(&unblockingSigSet);
01291       }
01292       pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01293     }
01294     // The senderThread will notice the pipes die off, one by one.  Once all children are gone, it will exit.
01295     senderThread.join();
01296     if(child_failed && !continueAfterChildFailure_) {
01297       if (child_fail_signal) {
01298         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01299       } else if (child_fail_exit_status) {
01300         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01301       } else {
01302         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01303       }
01304     }
01305     if(too_many_fds) {
01306       throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01307     }
01308     return false;
01309   }
01310 
01311   void
01312   EventProcessor::connectSigs(EventProcessor* ep) {
01313     // When the FwkImpl signals are given, pass them to the
01314     // appropriate EventProcessor signals so that the outside world
01315     // can see the signal.
01316     actReg_->preProcessEventSignal_.connect(std::cref(ep->preProcessEventSignal_));
01317     actReg_->postProcessEventSignal_.connect(std::cref(ep->postProcessEventSignal_));
01318   }
01319 
01320   std::vector<ModuleDescription const*>
01321   EventProcessor::getAllModuleDescriptions() const {
01322     return schedule_->getAllModuleDescriptions();
01323   }
01324 
01325   int
01326   EventProcessor::totalEvents() const {
01327     return schedule_->totalEvents();
01328   }
01329 
01330   int
01331   EventProcessor::totalEventsPassed() const {
01332     return schedule_->totalEventsPassed();
01333   }
01334 
01335   int
01336   EventProcessor::totalEventsFailed() const {
01337     return schedule_->totalEventsFailed();
01338   }
01339 
01340   void
01341   EventProcessor::enableEndPaths(bool active) {
01342     schedule_->enableEndPaths(active);
01343   }
01344 
01345   bool
01346   EventProcessor::endPathsEnabled() const {
01347     return schedule_->endPathsEnabled();
01348   }
01349 
01350   void
01351   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01352     schedule_->getTriggerReport(rep);
01353   }
01354 
01355   void
01356   EventProcessor::clearCounters() {
01357     schedule_->clearCounters();
01358   }
01359 
01360   char const* EventProcessor::currentStateName() const {
01361     return stateName(getState());
01362   }
01363 
01364   char const* EventProcessor::stateName(State s) const {
01365     return stateNames[s];
01366   }
01367 
01368   char const* EventProcessor::msgName(Msg m) const {
01369     return msgNames[m];
01370   }
01371 
01372   State EventProcessor::getState() const {
01373     return state_;
01374   }
01375 
01376   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01377     // the thread will record exception/error status in the event processor
01378     // for us to look at and report here
01379     return last_rc_;
01380   }
01381 
01382   void
01383   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01384     if(runNumber == 0) {
01385       runNumber = 1;
01386       LogWarning("Invalid Run")
01387         << "EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
01388         << "Run number was set to 1 instead\n";
01389     }
01390 
01391     // inside of beginJob there is a check to see if it has been called before
01392     beginJob();
01393     changeState(mSetRun);
01394 
01395     // interface not correct yet
01396     input_->setRunNumber(runNumber);
01397   }
01398 
01399   void
01400   EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
01401     // inside of beginJob there is a check to see if it has been called before
01402     beginJob();
01403     changeState(mSetRun);
01404 
01405     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01406     //input_->declareRunNumber(runNumber);
01407   }
01408 
01409   EventProcessor::StatusCode
01410   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01411     bool rc = true;
01412     boost::xtime timeout;
01413 
01414 #if BOOST_VERSION >= 105000
01415     boost::xtime_get(&timeout, boost::TIME_UTC_);
01416 #else
01417     boost::xtime_get(&timeout, boost::TIME_UTC);
01418 #endif
01419     timeout.sec += timeout_seconds;
01420 
01421     // make sure to include a timeout here so we don't wait forever
01422     // I suspect there are still timing issues with thread startup
01423     // and the setting of the various control variables (stop_count, id_set)
01424     {
01425       boost::mutex::scoped_lock sl(stop_lock_);
01426 
01427       // look here - if runAsync not active, just return the last return code
01428       if(stop_count_ < 0) return last_rc_;
01429 
01430       if(timeout_seconds == 0) {
01431         while(stop_count_ == 0) stopper_.wait(sl);
01432       } else {
01433         while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01434       }
01435 
01436       if(rc == false) {
01437           // timeout occurred
01438           // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
01439           // this is a temporary hack until we get the input source
01440           // upgraded to allow blocking input sources to be unblocked
01441 
01442           // the next line is dangerous and causes all sorts of trouble
01443           if(id_set_) pthread_cancel(event_loop_id_);
01444 
01445           // we will not do anything yet
01446           LogWarning("timeout")
01447             << "An asynchronous request was made to shut down "
01448             << "the event loop "
01449             << "and the event loop did not shutdown after "
01450             << timeout_seconds << " seconds\n";
01451       } else {
01452           event_loop_->join();
01453           event_loop_.reset();
01454           id_set_ = false;
01455           stop_count_ = -1;
01456       }
01457     }
01458     return rc == false ? epTimedOut : last_rc_;
01459   }
01460 
01461   EventProcessor::StatusCode
01462   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01463     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01464     if(rc != epTimedOut) changeState(mCountComplete);
01465     else errorState();
01466     return rc;
01467   }
01468 
01469 
01470   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01471     changeState(mStopAsync);
01472     StatusCode rc = waitForAsyncCompletion(secs);
01473     if(rc != epTimedOut) changeState(mFinished);
01474     else errorState();
01475     return rc;
01476   }
01477 
01478   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01479     changeState(mShutdownAsync);
01480     StatusCode rc = waitForAsyncCompletion(secs);
01481     if(rc != epTimedOut) changeState(mFinished);
01482     else errorState();
01483     return rc;
01484   }
01485 
01486   void EventProcessor::errorState() {
01487     state_ = sError;
01488   }
01489 
01490   // next function irrelevant now
01491   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01492     // make sure to include a timeout here so we don't wait forever
01493     // I suspect there are still timing issues with thread startup
01494     // and the setting of the various control variables (stop_count, id_set)
01495     changeState(m);
01496     return waitForAsyncCompletion(60*2);
01497   }
01498 
01499   void EventProcessor::changeState(Msg msg) {
01500     // most likely need to serialize access to this routine
01501 
01502     boost::mutex::scoped_lock sl(state_lock_);
01503     State curr = state_;
01504     int rc;
01505     // found if(not end of table) and
01506     // (state == table.state && (msg == table.message || msg == any))
01507     for(rc = 0;
01508         table[rc].current != sInvalid &&
01509           (curr != table[rc].current ||
01510            (curr == table[rc].current &&
01511              msg != table[rc].message && table[rc].message != mAny));
01512         ++rc);
01513 
01514     if(table[rc].current == sInvalid)
01515       throw cms::Exception("BadState")
01516         << "A member function of EventProcessor has been called in an"
01517         << " inappropriate order.\n"
01518         << "Bad transition from " << stateName(curr) << " "
01519         << "using message " << msgName(msg) << "\n"
01520         << "No where to go from here.\n";
01521 
01522     FDEBUG(1) << "changeState: current=" << stateName(curr)
01523               << ", message=" << msgName(msg)
01524               << " -> new=" << stateName(table[rc].final) << "\n";
01525 
01526     state_ = table[rc].final;
01527   }
01528 
01529   void EventProcessor::runAsync() {
01530     beginJob();
01531     {
01532       boost::mutex::scoped_lock sl(stop_lock_);
01533       if(id_set_ == true) {
01534           std::string err("runAsync called while async event loop already running\n");
01535           LogError("FwkJob") << err;
01536           throw cms::Exception("BadState") << err;
01537       }
01538 
01539       changeState(mRunAsync);
01540 
01541       stop_count_ = 0;
01542       last_rc_ = epSuccess; // forget the last value!
01543       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01544       boost::xtime timeout;
01545 #if BOOST_VERSION >= 105000
01546       boost::xtime_get(&timeout, boost::TIME_UTC_);
01547 #else
01548       boost::xtime_get(&timeout, boost::TIME_UTC);
01549 #endif
01550       timeout.sec += 60; // 60 seconds to start!!!!
01551       if(starter_.timed_wait(sl, timeout) == false) {
01552           // yikes - the thread did not start
01553           throw cms::Exception("BadState")
01554             << "Async run thread did not start in 60 seconds\n";
01555       }
01556     }
01557   }
01558 
01559   void EventProcessor::asyncRun(EventProcessor* me) {
01560     // set up signals to allow for interruptions
01561     // ignore all other signals
01562     // make sure no exceptions escape out
01563 
01564     // temporary hack until we modify the input source to allow
01565     // wakeup calls from other threads.  This mimics the solution
01566     // in EventFilter/Processor, which I do not like.
01567     // allowing cancels means that the thread just disappears at
01568     // certain points.  This is bad for C++ stack variables.
01569     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01570     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01571     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01572     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01573 
01574     {
01575       boost::mutex::scoped_lock sl(me->stop_lock_);
01576       me->event_loop_id_ = pthread_self();
01577       me->id_set_ = true;
01578       me->starter_.notify_all();
01579     }
01580 
01581     Status rc = epException;
01582     FDEBUG(2) << "asyncRun starting ......................\n";
01583 
01584     try {
01585       bool onlineStateTransitions = true;
01586       rc = me->runToCompletion(onlineStateTransitions);
01587     }
01588     catch (cms::Exception& e) {
01589       LogError("FwkJob") << "cms::Exception caught in "
01590                          << "EventProcessor::asyncRun"
01591                          << "\n"
01592                          << e.explainSelf();
01593       me->last_error_text_ = e.explainSelf();
01594     }
01595     catch (std::exception& e) {
01596       LogError("FwkJob") << "Standard library exception caught in "
01597                          << "EventProcessor::asyncRun"
01598                          << "\n"
01599                          << e.what();
01600       me->last_error_text_ = e.what();
01601     }
01602     catch (...) {
01603       LogError("FwkJob") << "Unknown exception caught in "
01604                          << "EventProcessor::asyncRun"
01605                          << "\n";
01606       me->last_error_text_ = "Unknown exception caught";
01607       rc = epOther;
01608     }
01609 
01610     me->last_rc_ = rc;
01611 
01612     {
01613       // notify anyone waiting for exit that we are doing so now
01614       boost::mutex::scoped_lock sl(me->stop_lock_);
01615       ++me->stop_count_;
01616       me->stopper_.notify_all();
01617     }
01618     FDEBUG(2) << "asyncRun ending ......................\n";
01619   }
01620 
01621   std::auto_ptr<statemachine::Machine>
01622   EventProcessor::createStateMachine() {
01623     statemachine::FileMode fileMode;
01624     if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01625     else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01626     else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01627     else {
01628       throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01629       << fileMode_ << ".\n"
01630       << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01631     }
01632     
01633     statemachine::EmptyRunLumiMode emptyRunLumiMode;
01634     if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01635     else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01636     else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01637     else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01638     else {
01639       throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01640       << emptyRunLumiMode_ << ".\n"
01641       << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01642     }
01643     
01644     std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
01645                                              fileMode,
01646                                              emptyRunLumiMode));
01647     
01648     machine->initiate();
01649     return machine;
01650   }
01651 
01652 
  // Run the event loop. Each item reported by the input source (file, run,
  // lumi, event or stop) is fed to the boost state machine. The loop ends when
  // any of these happens:
  //  - the machine terminates;
  //  - an asynchronous stop or shutdown request is observed (state_ is
  //    sStopping or sShuttingDown);
  //  - shutdown_flag is found set.
  //
  // onlineStateTransitions: when false, this function applies the mRunCount
  //   transition on entry and mFinished on normal exit. When true (asyncRun
  //   passes true), both transitions are skipped here.
  // Returns epSuccess, or epSignal when the loop ended because shutdown_flag
  //   was set.
  // Throws:
  //  - cms::Exception("BadState") if the state machine entered its Error state;
  //  - Exception(errors::LogicError) for an unknown item type, or for a machine
  //    that was not destroyed on exit;
  //  - any cms::Exception raised while processing is rethrown after the
  //    collected lumi/run/file cleanup messages are appended to it.
  //    Non-cms exceptions are first passed to the convertException helpers,
  //    presumably producing a cms::Exception (TODO confirm).
  EventProcessor::StatusCode
  EventProcessor::runToCompletion(bool onlineStateTransitions) {

    // NOTE(review): StateSentry is defined elsewhere; presumably it moves the
    // processor to an error state when destroyed unless succeeded() was called.
    StateSentry toerror(this);

    StatusCode returnCode=epSuccess;
    std::auto_ptr<statemachine::Machine> machine;
    {
      beginJob(); //make sure this was called

      if(!onlineStateTransitions) changeState(mRunCount);

      //StatusCode returnCode = epSuccess;
      stateMachineWasInErrorState_ = false;

      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);

      machine = createStateMachine();
      try {
        try {

          InputSource::ItemType itemType;

          while(true) {

            // With forked children, the input source first skips entries for
            // this process (skipForForking). If that grew the product registry,
            // the principal cache indexes are adjusted to match.
            bool more = true;
            if(numberOfForkedChildren_ > 0) {
              size_t size = preg_->size();
              more = input_->skipForForking();
              if(more) {
                if(size < preg_->size()) {
                  principalCache_.adjustIndexesAfterProductRegistryAddition();
                }
                principalCache_.adjustEventToNewProductRegistry(preg_);
              }
            }
            // Nothing left for this process: behave as if the source said stop.
            itemType = (more ? input_->nextItemType() : InputSource::IsStop);

            FDEBUG(1) << "itemType = " << itemType << "\n";

            // These are used for asynchronous running only and
            // and are checking to see if stopAsync or shutdownAsync
            // were called from another thread.  In the future, we
            // may need to do something better than polling the state.
            // With the current code this is the simplest thing and
            // it should always work.  If the interaction between
            // threads becomes more complex this may cause problems.
            if(state_ == sStopping) {
              FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
              forceLooperToEnd_ = true;
              machine->process_event(statemachine::Stop());
              forceLooperToEnd_ = false;
              break;
            }
            else if(state_ == sShuttingDown) {
              FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
              forceLooperToEnd_ = true;
              machine->process_event(statemachine::Stop());
              forceLooperToEnd_ = false;
              break;
            }

            // Look for a shutdown signal
            {
              boost::mutex::scoped_lock sl(usr2_lock);
              if(shutdown_flag) {
                changeState(mShutdownSignal);
                returnCode = epSignal;
                forceLooperToEnd_ = true;
                machine->process_event(statemachine::Stop());
                forceLooperToEnd_ = false;
                break;
              }
            }

            // Translate the input source's item type into a state machine event.
            if(itemType == InputSource::IsStop) {
              machine->process_event(statemachine::Stop());
            }
            else if(itemType == InputSource::IsFile) {
              machine->process_event(statemachine::File());
            }
            else if(itemType == InputSource::IsRun) {
              machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
            }
            else if(itemType == InputSource::IsLumi) {
              machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
            }
            else if(itemType == InputSource::IsEvent) {
              machine->process_event(statemachine::Event());
            }
            // This should be impossible
            else {
              throw Exception(errors::LogicError)
              << "Unknown next item type passed to EventProcessor\n"
              << "Please report this error to the Framework group\n";
            }

            if(machine->terminated()) {
              changeState(mInputExhausted);
              break;
            }
          }  // End of loop over state machine events
        } // Try block
        // Rethrow cms exceptions unchanged; hand every other exception type
        // to the matching convertException helper.
        catch (cms::Exception& e) { throw; }
        catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
        catch (std::exception& e) { convertException::stdToEDM(e); }
        catch(std::string& s) { convertException::stringToEDM(s); }
        catch(char const* c) { convertException::charPtrToEDM(c); }
        catch (...) { convertException::unknownToEDM(); }
      } // Try block
      // Some comments on exception handling related to the boost state machine:
      //
      // Some states used in the machine are special because they
      // perform actions while the machine is being terminated, actions
      // such as close files, call endRun, call endLumi etc ...  Each of these
      // states has two functions that perform these actions.  The functions
      // are almost identical.  The major difference is that one version
      // catches all exceptions and the other lets exceptions pass through.
      // The destructor catches them and the other function named "exit" lets
      // them pass through.  On a normal termination, boost will always call
      // "exit" and then the state destructor.  In our state classes, the
      // destructors do nothing if the exit function already took
      // care of things.  Here's the interesting part.  When boost is
      // handling an exception the "exit" function is not called (a boost
      // feature).
      //
      // If an exception occurs while the boost machine is in control
      // (which usually means inside a process_event call), then
      // the boost state machine destroys its states and "terminates" itself.
      // This is already done before we hit the catch blocks below. In this case
      // the call to terminateMachine below only destroys an already
      // terminated state machine.  Because exit is not called, the state destructors
      // handle cleaning up lumis, runs, and files.  The destructors swallow
      // all exceptions and only pass through the exceptions messages, which
      // are tacked onto the original exception below.
      //
      // If an exception occurs when the boost state machine is not
      // in control (outside the process_event functions), then boost
      // cannot destroy its own states.  The terminateMachine function
      // below takes care of that.  The flag "alreadyHandlingException"
      // is set true so that the state exit functions do nothing (and
      // cannot throw more exceptions while handling the first).  Then the
      // state destructors take care of this because exit did nothing.
      //
      // In both cases above, the EventProcessor::endOfLoop function is
      // not called because it can throw exceptions.
      //
      // One tricky aspect of the state machine is that things that can
      // throw should not be invoked by the state machine while another
      // exception is being handled.
      // Another tricky aspect is that it appears to be important to
      // terminate the state machine before invoking its destructor.
      // We've seen crashes that are not understood when that is not
      // done.  Maintainers of this code should be careful about this.

      catch (cms::Exception & e) {
        alreadyHandlingException_ = true;
        terminateMachine(machine);
        alreadyHandlingException_ = false;
        // Append cleanup messages collected through setExceptionMessageLumis/
        // Runs/Files. If the exception was already printed, log them too.
        if (!exceptionMessageLumis_.empty()) {
          e.addAdditionalInfo(exceptionMessageLumis_);
          if (e.alreadyPrinted()) {
            LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
          }
        }
        if (!exceptionMessageRuns_.empty()) {
          e.addAdditionalInfo(exceptionMessageRuns_);
          if (e.alreadyPrinted()) {
            LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
          }
        }
        if (!exceptionMessageFiles_.empty()) {
          e.addAdditionalInfo(exceptionMessageFiles_);
          if (e.alreadyPrinted()) {
            LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
          }
        }
        throw;
      }

      // Normal path: a terminated machine is destroyed here. A machine that is
      // still alive is reported as a logic error after this scope.
      if(machine->terminated()) {
        FDEBUG(1) << "The state machine reports it has been terminated\n";
        machine.reset();
      }

      if(!onlineStateTransitions) changeState(mFinished);

      // Set by doErrorStuff() when the machine reached its Error state.
      if(stateMachineWasInErrorState_) {
        throw cms::Exception("BadState")
        << "The boost state machine in the EventProcessor exited after\n"
        << "entering the Error state.\n";
      }

    }
    if(machine.get() != 0) {
      terminateMachine(machine);
      throw Exception(errors::LogicError)
        << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
        << "Please report this error to the Framework group\n";
    }

    toerror.succeeded();

    return returnCode;
  }
01859 
01860   void EventProcessor::readFile() {
01861     FDEBUG(1) << " \treadFile\n";
01862     size_t size = preg_->size();
01863     fb_ = input_->readFile();
01864     if(size < preg_->size()) {
01865       principalCache_.adjustIndexesAfterProductRegistryAddition();
01866     }
01867     principalCache_.adjustEventToNewProductRegistry(preg_);
01868     if(numberOfForkedChildren_ > 0) {
01869         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01870     }
01871   }
01872 
01873   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
01874     if (fb_.get() != nullptr) {
01875       input_->closeFile(fb_.get(), cleaningUpAfterException);
01876     }
01877     FDEBUG(1) << "\tcloseInputFile\n";
01878   }
01879 
01880   void EventProcessor::openOutputFiles() {
01881     if (fb_.get() != nullptr) {
01882       schedule_->openOutputFiles(*fb_);
01883       if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01884     }
01885     FDEBUG(1) << "\topenOutputFiles\n";
01886   }
01887 
01888   void EventProcessor::closeOutputFiles() {
01889     if (fb_.get() != nullptr) {
01890       schedule_->closeOutputFiles();
01891       if(hasSubProcess()) subProcess_->closeOutputFiles();
01892     }
01893     FDEBUG(1) << "\tcloseOutputFiles\n";
01894   }
01895 
01896   void EventProcessor::respondToOpenInputFile() {
01897     if (fb_.get() != nullptr) {
01898       schedule_->respondToOpenInputFile(*fb_);
01899       if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01900     }
01901     FDEBUG(1) << "\trespondToOpenInputFile\n";
01902   }
01903 
01904   void EventProcessor::respondToCloseInputFile() {
01905     if (fb_.get() != nullptr) {
01906       schedule_->respondToCloseInputFile(*fb_);
01907       if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01908     }
01909     FDEBUG(1) << "\trespondToCloseInputFile\n";
01910   }
01911 
01912   void EventProcessor::respondToOpenOutputFiles() {
01913     if (fb_.get() != nullptr) {
01914       schedule_->respondToOpenOutputFiles(*fb_);
01915       if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01916     }
01917     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01918   }
01919 
01920   void EventProcessor::respondToCloseOutputFiles() {
01921     if (fb_.get() != nullptr) {
01922       schedule_->respondToCloseOutputFiles(*fb_);
01923       if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01924     }
01925     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01926   }
01927 
01928   void EventProcessor::startingNewLoop() {
01929     shouldWeStop_ = false;
01930     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01931     // until after we've called beginOfJob
01932     if(looper_ && looperBeginJobRun_) {
01933       looper_->doStartingNewLoop();
01934     }
01935     FDEBUG(1) << "\tstartingNewLoop\n";
01936   }
01937 
01938   bool EventProcessor::endOfLoop() {
01939     if(looper_) {
01940       ModuleChanger changer(schedule_.get());
01941       looper_->setModuleChanger(&changer);
01942       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01943       looper_->setModuleChanger(nullptr);
01944       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01945       else return false;
01946     }
01947     FDEBUG(1) << "\tendOfLoop\n";
01948     return true;
01949   }
01950 
01951   void EventProcessor::rewindInput() {
01952     input_->repeat();
01953     input_->rewind();
01954     FDEBUG(1) << "\trewind\n";
01955   }
01956 
01957   void EventProcessor::prepareForNextLoop() {
01958     looper_->prepareForNextLoop(esp_.get());
01959     FDEBUG(1) << "\tprepareForNextLoop\n";
01960   }
01961 
01962   bool EventProcessor::shouldWeCloseOutput() const {
01963     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01964     return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01965   }
01966 
01967   void EventProcessor::doErrorStuff() {
01968     FDEBUG(1) << "\tdoErrorStuff\n";
01969     LogError("StateMachine")
01970       << "The EventProcessor state machine encountered an unexpected event\n"
01971       << "and went to the error state\n"
01972       << "Will attempt to terminate processing normally\n"
01973       << "(IF using the looper the next loop will be attempted)\n"
01974       << "This likely indicates a bug in an input module or corrupted input or both\n";
01975     stateMachineWasInErrorState_ = true;
01976   }
01977 
01978   void EventProcessor::beginRun(statemachine::Run const& run) {
01979     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01980     input_->doBeginRun(runPrincipal);
01981     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01982                     runPrincipal.beginTime());
01983     if(forceESCacheClearOnNewRun_){
01984       espController_->forceCacheClear();
01985     }
01986     espController_->eventSetupForInstance(ts);
01987     EventSetup const& es = esp_->eventSetup();
01988     if(looper_ && looperBeginJobRun_== false) {
01989       looper_->copyInfo(ScheduleInfo(schedule_.get()));
01990       looper_->beginOfJob(es);
01991       looperBeginJobRun_ = true;
01992       looper_->doStartingNewLoop();
01993     }
01994     {
01995       typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01996       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01997       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01998       if(hasSubProcess()) {
01999         subProcess_->doBeginRun(runPrincipal, ts);
02000       }
02001       sentry.allowThrow();
02002     }
02003     FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
02004     if(looper_) {
02005       looper_->doBeginRun(runPrincipal, es);
02006     }
02007   }
02008 
02009   void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
02010     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02011     input_->doEndRun(runPrincipal, cleaningUpAfterException);
02012     IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
02013                     runPrincipal.endTime());
02014     espController_->eventSetupForInstance(ts);
02015     EventSetup const& es = esp_->eventSetup();
02016     {
02017       typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02018       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02019       schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
02020       if(hasSubProcess()) {
02021         subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
02022       }
02023       sentry.allowThrow();
02024     }
02025     FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02026     if(looper_) {
02027       looper_->doEndRun(runPrincipal, es);
02028     }
02029   }
02030 
02031   void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02032     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02033     input_->doBeginLumi(lumiPrincipal);
02034 
02035     Service<RandomNumberGenerator> rng;
02036     if(rng.isAvailable()) {
02037       LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02038       rng->preBeginLumi(lb);
02039     }
02040 
02041     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
02042     // lumi blocks know their start and end times why not also start and end events?
02043     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02044     espController_->eventSetupForInstance(ts);
02045     EventSetup const& es = esp_->eventSetup();
02046     {
02047       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02048       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02049       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02050       if(hasSubProcess()) {
02051         subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02052       }
02053       sentry.allowThrow();
02054     }
02055     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02056     if(looper_) {
02057       looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02058     }
02059   }
02060 
02061   void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
02062     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02063     input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
02064     //NOTE: Using the max event number for the end of a lumi block is a bad idea
02065     // lumi blocks know their start and end times why not also start and end events?
02066     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02067                     lumiPrincipal.endTime());
02068     espController_->eventSetupForInstance(ts);
02069     EventSetup const& es = esp_->eventSetup();
02070     {
02071       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02072       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02073       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
02074       if(hasSubProcess()) {
02075         subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
02076       }
02077       sentry.allowThrow();
02078     }
02079     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02080     if(looper_) {
02081       looper_->doEndLuminosityBlock(lumiPrincipal, es);
02082     }
02083   }
02084 
02085   statemachine::Run EventProcessor::readAndCacheRun() {
02086     if (principalCache_.hasRunPrincipal()) {
02087       throw edm::Exception(edm::errors::LogicError)
02088         << "EventProcessor::readAndCacheRun\n"
02089         << "Illegal attempt to insert run into cache\n"
02090         << "Contact a Framework Developer\n";
02091     }
02092     principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
02093     return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02094   }
02095 
02096   statemachine::Run EventProcessor::readAndMergeRun() {
02097     principalCache_.merge(input_->runAuxiliary(), preg_);
02098     input_->readAndMergeRun(principalCache_.runPrincipalPtr());
02099     return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02100   }
02101 
02102   int EventProcessor::readAndCacheLumi() {
02103     if (principalCache_.hasLumiPrincipal()) {
02104       throw edm::Exception(edm::errors::LogicError)
02105         << "EventProcessor::readAndCacheRun\n"
02106         << "Illegal attempt to insert lumi into cache\n"
02107         << "Contact a Framework Developer\n";
02108     }
02109     if (!principalCache_.hasRunPrincipal()) {
02110       throw edm::Exception(edm::errors::LogicError)
02111         << "EventProcessor::readAndCacheRun\n"
02112         << "Illegal attempt to insert lumi into cache\n"
02113         << "Run is invalid\n"
02114         << "Contact a Framework Developer\n";
02115     }
02116     principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
02117     principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
02118     return input_->luminosityBlock();
02119   }
02120 
02121   int EventProcessor::readAndMergeLumi() {
02122     principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
02123     input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
02124     return input_->luminosityBlock();
02125   }
02126 
02127   void EventProcessor::writeRun(statemachine::Run const& run) {
02128     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02129     if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02130     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02131   }
02132 
02133   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02134     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02135     if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02136     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02137   }
02138 
02139   void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02140     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02141     if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02142     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02143   }
02144 
02145   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02146     principalCache_.deleteLumi(phid, run, lumi);
02147     if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02148     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02149   }
02150 
02151   void EventProcessor::readAndProcessEvent() {
02152     EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
02153     FDEBUG(1) << "\treadEvent\n";
02154     assert(pep != 0);
02155     pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
02156     assert(pep->luminosityBlockPrincipalPtrValid());
02157     assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
02158     assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
02159 
02160     IOVSyncValue ts(pep->id(), pep->time());
02161     espController_->eventSetupForInstance(ts);
02162     EventSetup const& es = esp_->eventSetup();
02163     {
02164       typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02165       ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02166       schedule_->processOneOccurrence<Traits>(*pep, es);
02167       if(hasSubProcess()) {
02168         subProcess_->doEvent(*pep, ts);
02169       }
02170       sentry.allowThrow();
02171     }
02172 
02173     if(looper_) {
02174       bool randomAccess = input_->randomAccess();
02175       ProcessingController::ForwardState forwardState = input_->forwardState();
02176       ProcessingController::ReverseState reverseState = input_->reverseState();
02177       ProcessingController pc(forwardState, reverseState, randomAccess);
02178 
02179       EDLooperBase::Status status = EDLooperBase::kContinue;
02180       do {
02181         status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02182 
02183         bool succeeded = true;
02184         if(randomAccess) {
02185           if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02186             input_->skipEvents(-2);
02187           }
02188           else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02189             succeeded = input_->goToEvent(pc.specifiedEventTransition());
02190           }
02191         }
02192         pc.setLastOperationSucceeded(succeeded);
02193       } while(!pc.lastOperationSucceeded());
02194       if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02195 
02196     }
02197 
02198     FDEBUG(1) << "\tprocessEvent\n";
02199     pep->clearEventPrincipal();
02200   }
02201 
02202   bool EventProcessor::shouldWeStop() const {
02203     FDEBUG(1) << "\tshouldWeStop\n";
02204     if(shouldWeStop_) return true;
02205     return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02206   }
02207 
02208   void EventProcessor::setExceptionMessageFiles(std::string& message) {
02209     exceptionMessageFiles_ = message;
02210   }
02211 
02212   void EventProcessor::setExceptionMessageRuns(std::string& message) {
02213     exceptionMessageRuns_ = message;
02214   }
02215 
02216   void EventProcessor::setExceptionMessageLumis(std::string& message) {
02217     exceptionMessageLumis_ = message;
02218   }
02219 
02220   bool EventProcessor::alreadyHandlingException() const {
02221     return alreadyHandlingException_;
02222   }
02223 
02224   void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
02225     if(iMachine.get() != 0) {
02226       if(!iMachine->terminated()) {
02227         forceLooperToEnd_ = true;
02228         iMachine->process_event(statemachine::Stop());
02229         forceLooperToEnd_ = false;
02230       }
02231       else {
02232         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
02233       }
02234       if(iMachine->terminated()) {
02235         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02236       }
02237       iMachine.reset();
02238     }
02239   }
02240 }