CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011 
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036 
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038 
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046 
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049 
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057 
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060 
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063 
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069 
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072 
00073 //Used for forking
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080 
00081 //Used for CPU affinity
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085 
00086 namespace edm {
00087 
00088   namespace event_processor {
00089 
00090     class StateSentry {
00091     public:
00092       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00093       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00094       void succeeded() {success_ = true;}
00095 
00096     private:
00097       EventProcessor* ep_;
00098       bool success_;
00099     };
00100   }
00101 
00102   using namespace event_processor;
00103 
00104   namespace {
00105     template <typename T>
00106     class ScheduleSignalSentry {
00107     public:
00108       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00109            a_(a), principal_(principal), es_(es) {
00110         if (a_) T::preScheduleSignal(a_, principal_);
00111       }
00112       ~ScheduleSignalSentry() {
00113         if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00114       }
00115 
00116     private:
00117       // We own none of these resources.
00118       ActivityRegistry* a_;
00119       typename T::MyPrincipal* principal_;
00120       EventSetup const* es_;
00121     };
00122   }
00123 
namespace {

  // Printable names for the State and Msg enums declared in the header.
  // Presumably they are indexed by enum value, so entry i names
  // enumerator i. The next two tables must be kept in sync with the
  // state and message enums from the header.
  //
  // Both the characters and the array elements are const: these are
  // fixed lookup tables and must not be modified at run time.

  char const* const stateNames[] = {
    "Init",
    "JobReady",
    "RunGiven",
    "Running",
    "Stopping",
    "ShuttingDown",
    "Done",
    "JobEnded",
    "Error",
    "ErrorEnded",
    "End",
    "Invalid"
  };

  char const* const msgNames[] = {
    "SetRun",
    "Skip",
    "RunAsync",
    "Run(ID)",
    "Run(count)",
    "BeginJob",
    "StopAsync",
    "ShutdownAsync",
    "EndJob",
    "CountComplete",
    "InputExhausted",
    "StopSignal",
    "ShutdownSignal",
    "Finished",
    "Any",
    "dtor",
    "Exception",
    "Rewind"
  };
}
// IMPORTANT NOTE:
// The mAny messages are special: they must appear last in the
// table if multiple entries for a CurrentState are present.
// The changeState function does not use mAny yet.
00169 
// One row of the state-transition table: receiving `message` while the
// processor is in state `current` moves it to state `final`.
// Member order matters: table rows are written as brace-initializers
// { CurrentState, Message, FinalState }.
struct TransEntry {
  State current;   // state before the message is handled
  Msg   message;   // message being handled
  State final;     // state after the transition
};
00175 
00176     // we should use this information to initialize a two dimensional
00177     // table of t[CurrentState][Message] = FinalState
00178 
/*
  The way this is currently written, the async run thread function
  can return in the "JobReady" state without having been cleaned up.
  The problem is that the thread is only cleaned up when stop/shutdown
  async is called, but the stop/shutdown async functions first attempt
  to change the state using messages that are not valid in the
  "JobReady" state.

  I think most of the problems can be solved by using two states
  for "running": RunningS and RunningA (sync/async). The problem
  seems to be all the transitions out of running for both
  modes of operation.  The other solution might be to only go to
  "Stopping" from Running, and use the return code from "run_p" to
  set the final state.  If this is used, then in the sync mode the
  "Stopping" state would be momentary.
*/
00196 
// State-transition table for the EventProcessor state machine. The
// IMPORTANT NOTE earlier in this file describes how changeState uses it.
// Each row reads: in state `current`, message `message` leads to `final`.
// Pairs not listed are presumably rejected by changeState (TODO confirm).
// Row order matters: for a given CurrentState, any mAny row must come last.
// The closing { sInvalid, mAny, sInvalid } row presumably acts as the
// end-of-table sentinel for the lookup (TODO confirm in changeState).
TransEntry table[] = {
// CurrentState   Message         FinalState
// -----------------------------------------
  { sInit,          mException,      sError },
  { sInit,          mBeginJob,       sJobReady },
  { sJobReady,      mException,      sError },
  { sJobReady,      mSetRun,         sRunGiven },
  { sJobReady,      mInputRewind,    sRunning },
  { sJobReady,      mSkip,           sRunning },
  { sJobReady,      mRunID,          sRunning },
  { sJobReady,      mRunCount,       sRunning },
  { sJobReady,      mEndJob,         sJobEnded },
  { sJobReady,      mBeginJob,       sJobReady },
  { sJobReady,      mDtor,           sEnd },    // should this be allowed?

  // Self-transitions in JobReady (the async thread may already be back here).
  { sJobReady,      mStopAsync,      sJobReady },
  { sJobReady,      mCountComplete,  sJobReady },
  { sJobReady,      mFinished,       sJobReady },

  { sRunGiven,      mException,      sError },
  { sRunGiven,      mRunAsync,       sRunning },
  { sRunGiven,      mBeginJob,       sRunGiven },
  { sRunGiven,      mShutdownAsync,  sShuttingDown },
  { sRunGiven,      mStopAsync,      sStopping },
  { sRunning,       mException,      sError },
  { sRunning,       mStopAsync,      sStopping },
  { sRunning,       mShutdownAsync,  sShuttingDown },
  { sRunning,       mShutdownSignal, sShuttingDown },
  { sRunning,       mCountComplete,  sStopping }, // sJobReady
  { sRunning,       mInputExhausted, sStopping }, // sJobReady

  { sStopping,      mInputRewind,    sRunning }, // The looper needs this
  { sStopping,      mException,      sError },
  { sStopping,      mFinished,       sJobReady },
  { sStopping,      mCountComplete,  sJobReady },
  { sStopping,      mShutdownSignal, sShuttingDown },
  { sStopping,      mStopAsync,      sStopping },     // stay
  { sStopping,      mInputExhausted, sStopping },     // stay
  //{ sStopping,      mAny,            sJobReady },     // <- ??????
  { sShuttingDown,  mException,      sError },
  { sShuttingDown,  mShutdownSignal, sShuttingDown },
  { sShuttingDown,  mCountComplete,  sDone }, // needed?
  { sShuttingDown,  mInputExhausted, sDone }, // needed?
  { sShuttingDown,  mFinished,       sDone },
  //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
  //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
  //{ sShuttingDown,  mAny,            sDone },         // <- ??????
  { sDone,          mEndJob,         sJobEnded },
  { sDone,          mException,      sError },
  { sJobEnded,      mDtor,           sEnd },
  { sJobEnded,      mException,      sError },
  { sError,         mEndJob,         sError },   // funny one here
  { sError,         mDtor,           sError },   // funny one here
  { sInit,          mDtor,           sEnd },     // for StorM dummy EP
  { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
  { sInvalid,       mAny,            sInvalid }
};
00254 
00255 
00256     // Note: many of the messages generate the mBeginJob message first
00257     //  mRunID, mRunCount, mSetRun
00258 
00259   // ---------------------------------------------------------------
00260   boost::shared_ptr<InputSource>
00261   makeInput(ParameterSet& params,
00262             CommonParams const& common,
00263             ProductRegistry& preg,
00264             boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
00265             boost::shared_ptr<ActivityRegistry> areg,
00266             boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00267     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00268     if(main_input == 0) {
00269       throw Exception(errors::Configuration)
00270         << "There must be exactly one source in the configuration.\n"
00271         << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00272     }
00273 
00274     std::string modtype(main_input->getParameter<std::string>("@module_type"));
00275 
00276     std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00277       ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00278     ConfigurationDescriptions descriptions(filler->baseType());
00279     filler->fill(descriptions);
00280 
00281     try {
00282       try {
00283         descriptions.validate(*main_input, std::string("source"));
00284       }
00285       catch (cms::Exception& e) { throw; }
00286       catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00287       catch (std::exception& e) { convertException::stdToEDM(e); }
00288       catch (std::string& s) { convertException::stringToEDM(s); }
00289       catch (char const* c) { convertException::charPtrToEDM(c); }
00290       catch (...) { convertException::unknownToEDM(); }
00291     }
00292     catch (cms::Exception & iException) {
00293       std::ostringstream ost;
00294       ost << "Validating configuration of input source of type " << modtype;
00295       iException.addContext(ost.str());
00296       throw;
00297     }
00298 
00299     main_input->registerIt();
00300 
00301     // Fill in "ModuleDescription", in case the input source produces
00302     // any EDproducts, which would be registered in the ProductRegistry.
00303     // Also fill in the process history item for this process.
00304     // There is no module label for the unnamed input source, so
00305     // just use "source".
00306     // Only the tracked parameters belong in the process configuration.
00307     ModuleDescription md(main_input->id(),
00308                          main_input->getParameter<std::string>("@module_type"),
00309                          "source",
00310                          processConfiguration.get());
00311 
00312     InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_);
00313     areg->preSourceConstructionSignal_(md);
00314     boost::shared_ptr<InputSource> input;
00315     try {
00316       try {
00317         input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00318       }
00319       catch (cms::Exception& e) { throw; }
00320       catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00321       catch (std::exception& e) { convertException::stdToEDM(e); }
00322       catch (std::string& s) { convertException::stringToEDM(s); }
00323       catch (char const* c) { convertException::charPtrToEDM(c); }
00324       catch (...) { convertException::unknownToEDM(); }
00325     }
00326     catch (cms::Exception& iException) {
00327       areg->postSourceConstructionSignal_(md);
00328       std::ostringstream ost;
00329       ost << "Constructing input source of type " << modtype;
00330       iException.addContext(ost.str());
00331       throw;
00332     }
00333     areg->postSourceConstructionSignal_(md);
00334     return input;
00335   }
00336 
00337   // ---------------------------------------------------------------
00338   boost::shared_ptr<EDLooperBase>
00339   fillLooper(eventsetup::EventSetupsController& esController,
00340              eventsetup::EventSetupProvider& cp,
00341                          ParameterSet& params) {
00342     boost::shared_ptr<EDLooperBase> vLooper;
00343 
00344     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00345 
00346     if(loopers.size() == 0) {
00347        return vLooper;
00348     }
00349 
00350     assert(1 == loopers.size());
00351 
00352     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00353         itName != itNameEnd;
00354         ++itName) {
00355 
00356       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00357       providerPSet->registerIt();
00358       vLooper = eventsetup::LooperFactory::get()->addTo(esController,
00359                                                         cp,
00360                                                         *providerPSet);
00361       }
00362       return vLooper;
00363 
00364   }
00365 
00366   // ---------------------------------------------------------------
// Construct from a Python configuration string, using an existing
// ServiceToken.
//
// config          - Python configuration text, parsed with PythonProcessDesc.
// iToken, iLegacy - forwarded to init(), which passes them to
//                   ScheduleItems::initServices (presumably these control how
//                   iToken's services combine with the configured ones).
// defaultServices,
// forcedServices  - service names added through ProcessDesc::addServices.
//
// The initializer list only gives members neutral defaults; the real
// set-up happens in init().
// NOTE(review): continueAfterChildFailure_ is not in the initializer list.
// It is first assigned in init().
EventProcessor::EventProcessor(std::string const& config,
                              ServiceToken const& iToken,
                              serviceregistry::ServiceLegacy iLegacy,
                              std::vector<std::string> const& defaultServices,
                              std::vector<std::string> const& forcedServices) :
  preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(),
  preg_(),
  branchIDListHelper_(),
  serviceToken_(),
  input_(),
  espController_(new eventsetup::EventSetupsController),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  subProcess_(),
  historyAppender_(new HistoryAppender),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  starter_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  principalCache_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  fileMode_(),
  emptyRunLumiMode_(),
  exceptionMessageFiles_(),
  exceptionMessageRuns_(),
  exceptionMessageLumis_(),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false),
  eventSetupDataToExcludeFromPrefetching_() {
  // Python text -> ParameterSet -> ProcessDesc, then add the requested services.
  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
  processDesc->addServices(defaultServices, forcedServices);
  init(processDesc, iToken, iLegacy);
}
00421 
// Construct from a Python configuration string with a fresh set of services.
//
// config          - Python configuration text, parsed with PythonProcessDesc.
// defaultServices,
// forcedServices  - service names added through ProcessDesc::addServices.
//
// init() is called with an empty ServiceToken and
// serviceregistry::kOverlapIsError.
// The initializer list only gives members neutral defaults; the real
// set-up happens in init().
EventProcessor::EventProcessor(std::string const& config,
                              std::vector<std::string> const& defaultServices,
                              std::vector<std::string> const& forcedServices) :
  preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(),
  preg_(),
  branchIDListHelper_(),
  serviceToken_(),
  input_(),
  espController_(new eventsetup::EventSetupsController),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  subProcess_(),
  historyAppender_(new HistoryAppender),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  starter_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  principalCache_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  fileMode_(),
  emptyRunLumiMode_(),
  exceptionMessageFiles_(),
  exceptionMessageRuns_(),
  exceptionMessageLumis_(),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false),
  eventSetupDataToExcludeFromPrefetching_() {
  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
  processDesc->addServices(defaultServices, forcedServices);
  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
}
00474 
// Construct from an already-built ProcessDesc.
//
// processDesc   - process description. It is taken by non-const reference
//                 and handed unchanged to init().
// token, legacy - forwarded to init() (see ScheduleItems::initServices).
//
// The initializer list only gives members neutral defaults; the real
// set-up happens in init().
EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
               ServiceToken const& token,
               serviceregistry::ServiceLegacy legacy) :
  preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(),
  preg_(),
  branchIDListHelper_(),
  serviceToken_(),
  input_(),
  espController_(new eventsetup::EventSetupsController),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  subProcess_(),
  historyAppender_(new HistoryAppender),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  starter_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  principalCache_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  fileMode_(),
  emptyRunLumiMode_(),
  exceptionMessageFiles_(),
  exceptionMessageRuns_(),
  exceptionMessageLumis_(),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false),
  eventSetupDataToExcludeFromPrefetching_() {
  init(processDesc, token, legacy);
}
00524 
00525 
// Construct from configuration text whose format is chosen by isPython.
//
// isPython == true : config is parsed with PythonProcessDesc.
// isPython == false: config is passed directly to ProcessDesc(std::string)
//                    (presumably a non-Python configuration format; confirm
//                    in ProcessDesc).
// On both paths init() gets an empty ServiceToken and
// serviceregistry::kOverlapIsError. No default/forced services are added.
EventProcessor::EventProcessor(std::string const& config, bool isPython):
  preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(),
  preg_(),
  branchIDListHelper_(),
  serviceToken_(),
  input_(),
  espController_(new eventsetup::EventSetupsController),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  subProcess_(),
  historyAppender_(new HistoryAppender),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  starter_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  principalCache_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  fileMode_(),
  emptyRunLumiMode_(),
  exceptionMessageFiles_(),
  exceptionMessageRuns_(),
  exceptionMessageLumis_(),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false),
  eventSetupDataToExcludeFromPrefetching_() {
  if(isPython) {
    boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
    init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
  }
  else {
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
    init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
  }
}
00581 
// Shared set-up used by every constructor.
//
// Steps, in order:
//   1. Read the untracked "options" (and "options.multiProcesses") settings
//      into data members.
//   2. Build the services and make them current.
//   3. Create the EventSetup provider, the optional looper, the input
//      source and the Schedule.
//   4. Copy the shared ScheduleItems into data members and connect signals.
//   5. Create the reusable EventPrincipal and, if configured, the SubProcess.
// Statement order matters: later steps rely on objects built by earlier ones.
//
// processDesc - source of the process and service ParameterSets.
// iToken, iLegacy - forwarded to ScheduleItems::initServices.
void
EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
                      ServiceToken const& iToken,
                      serviceregistry::ServiceLegacy iLegacy) {

  //std::cerr << processDesc->dump() << std::endl;

  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
  //std::cerr << parameterSet->dump() << std::endl;

  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());

  // Now set some parameters specific to the main process.
  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
  // Forking (multi-process) settings come from options.multiProcesses.
  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
  // Build record name -> set of (type, label) pairs; type defaults to "*", label to "".
  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
      itPS != itPSEnd;
      ++itPS) {
    eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
                                              std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
                                                             itPS->getUntrackedParameter<std::string>("label", "")));
  }
  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));

  // Now do general initialization
  ScheduleItems items;

  //initialize the services
  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
  serviceToken_ = items.addCPRandTNS(*parameterSet, token);

  //make the services available
  ServiceRegistry::Operate operate(serviceToken_);

  // initialize miscellaneous items
  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));

  // initialize the event setup provider
  esp_ = espController_->makeProvider(*parameterSet);

  // initialize the looper, if any
  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
  if(looper_) {
    looper_->setActionTable(items.act_table_.get());
    looper_->attachTo(*items.actReg_);
  }

  // initialize the input source
  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);

  // initialize the Schedule
  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());

  // set the data members
  act_table_ = items.act_table_;
  actReg_ = items.actReg_;
  preg_ = items.preg_;
  branchIDListHelper_ = items.branchIDListHelper_;
  processConfiguration_ = items.processConfiguration_;

  // NOTE(review): this streams the boost::shared_ptr itself (its address),
  // not the ParameterSet contents; *parameterSet was probably intended.
  FDEBUG(2) << parameterSet << std::endl;
  connectSigs(this);

  // Reusable event principal
  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
  principalCache_.insert(ep);
    
  // initialize the subprocess, if there is one
  // NOTE(review): the SubProcess gets `token` (from initServices), not
  // serviceToken_ (which also includes addCPRandTNS); confirm this is intended.
  if(subProcessParameterSet) {
    subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
  }
}
00664 
// Destructor.
//   1. Sends mDtor to the state machine; a cms::Exception is logged, not
//      propagated.
//   2. With the services still installed, resets members in an explicit
//      order.
//   3. Clears the registries obtained through their instance() accessors.
EventProcessor::~EventProcessor() {
  // Make the services available while everything is being deleted.
  ServiceToken token = getToken();
  ServiceRegistry::Operate op(token);
  try {
    changeState(mDtor);
  }
  catch(cms::Exception& e) {
    // NOTE(review): only cms::Exception is caught here; any other exception
    // type thrown by changeState would escape the destructor.
    LogError("System")
      << e.explainSelf() << "\n";
  }

  // manually destroy all these things that may need the services around
  espController_.reset();
  subProcess_.reset();
  esp_.reset();
  schedule_.reset();
  input_.reset();
  looper_.reset();
  actReg_.reset();

  // Clear the ParameterSet registry and reset its stored ID.
  pset::Registry* psetRegistry = pset::Registry::instance();
  psetRegistry->data().clear();
  psetRegistry->extra().setID(ParameterSetID());

  EntryDescriptionRegistry::instance()->data().clear();
  ParentageRegistry::instance()->data().clear();
  ProcessConfigurationRegistry::instance()->data().clear();
  ProcessHistoryRegistry::instance()->data().clear();
}
00695 
// Run the begin-job transition. The steps, in order:
//   - the source's doBeginJob,
//   - the Schedule's beginJob,
//   - the SubProcess (if any),
//   - the activity registry's postBeginJobSignal_.
// Returns immediately unless the state is sInit. mBeginJob moves sInit to
// sJobReady in the transition table, so later calls do nothing.
// Any exception from the source's doBeginJob is converted to a
// cms::Exception if needed and rethrown with the context
// "Calling beginJob for the source".
void
EventProcessor::beginJob() {
  if(state_ != sInit) return;
  // Hook from Breakpoints.h (presumably a debugger breakpoint anchor; confirm).
  bk::beginJob();
  // can only be run if in the initial state
  changeState(mBeginJob);

  // StateSentry toerror(this); // should we add this ?
  //make the services available
  ServiceRegistry::Operate operate(serviceToken_);

  //NOTE:  This implementation assumes 'Job' means one call to
  // EventProcessor::run
  // If it really means once per 'application' then this code will
  // have to be changed.
  // Also have to deal with case where have 'run' then new Module
  // added and do 'run'
  // again.  In that case the newly added Module needs its 'beginJob'
  // to be called.

  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
  //  For now we delay calling beginOfJob until first beginOfRun
  //if(looper_) {
  //   looper_->beginOfJob(es);
  //}
  try {
    // Inner try: convert any non-cms exception to cms::Exception.
    try {
      input_->doBeginJob();
    }
    catch (cms::Exception& e) { throw; }
    catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
    catch (std::exception& e) { convertException::stdToEDM(e); }
    catch(std::string& s) { convertException::stringToEDM(s); }
    catch(char const* c) { convertException::charPtrToEDM(c); }
    catch (...) { convertException::unknownToEDM(); }
  }
  catch(cms::Exception& ex) {
    ex.addContext("Calling beginJob for the source");
    throw;
  }
  schedule_->beginJob();
  // toerror.succeeded(); // should we add this?
  if(hasSubProcess()) subProcess_->doBeginJob();
  actReg_->postBeginJobSignal_();
}
00741 
// Run the end-job transition for every component. Each step goes through
// an ExceptionCollector, so the later steps are still attempted when an
// earlier one throws. The steps, in order:
//   - the mEndJob state change,
//   - the Schedule (which receives the collector),
//   - the SubProcess (if any),
//   - the input source,
//   - the looper (if any),
//   - the activity registry's postEndJobSignal_.
// If anything threw, the collected exception(s) are rethrown at the end.
void
EventProcessor::endJob() {
  // Collects exceptions, so we don't throw before all operations are performed.
  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");

  // The transition table accepts mEndJob only in sJobReady, sDone and sError.
  // From any other state changeState presumably throws; the collector keeps
  // that exception and the remaining steps still run.
  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));

  //make the services available
  ServiceRegistry::Operate operate(serviceToken_);

  schedule_->endJob(c);
  if(hasSubProcess()) {
    c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
  }
  c.call(boost::bind(&InputSource::doEndJob, input_));
  if(looper_) {
    c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
  }
  c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
  if(c.hasThrown()) {
    c.rethrow();
  }
}
00766 
00767   ServiceToken
00768   EventProcessor::getToken() {
00769     return serviceToken_;
00770   }
00771 
00772   //Setup signal handler to listen for when forked children stop
00773   namespace {
00774     //These are volatile since the compiler can not be allowed to optimize them
00775     // since they can be modified in the signaller handler
00776     volatile bool child_failed = false;
00777     volatile unsigned int num_children_done = 0;
00778     volatile int child_fail_exit_status = 0;
00779     volatile int child_fail_signal = 0;
00780     
00781     //NOTE: We setup the signal handler to run in the main thread which
00782     // is also the same thread that then reads the above values
00783 
00784     extern "C" {
00785       void ep_sigchld(int, siginfo_t*, void*) {
00786         //printf("in sigchld\n");
00787         //FDEBUG(1) << "in sigchld handler\n";
00788         int stat_loc;
00789         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00790         while(0<p) {
00791           //printf("  looping\n");
00792           if(WIFEXITED(stat_loc)) {
00793             ++num_children_done;
00794             if(0 != WEXITSTATUS(stat_loc)) {
00795               child_fail_exit_status = WEXITSTATUS(stat_loc);
00796               child_failed = true;
00797             }
00798           }
00799           if(WIFSIGNALED(stat_loc)) {
00800             ++num_children_done;
00801             child_fail_signal = WTERMSIG(stat_loc);
00802             child_failed = true;
00803           }
00804           p = waitpid(-1, &stat_loc, WNOHANG);
00805         }
00806       }
00807     }
00808 
00809   }
00810 
00811   enum {
00812     kChildSucceed,
00813     kChildExitBadly,
00814     kChildSegv,
00815     kMaxChildAction
00816   };
00817 
00818   namespace {
00819     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00820       unsigned int n = 0;
00821       while(numberOfChildren != 0) {
00822         ++n;
00823         numberOfChildren /= 10;
00824       }
00825       if(n == 0) {
00826         n = 3; // Protect against zero numberOfChildren
00827       }
00828       return n;
00829     }
00830     
00831     /*This class embodied the thread which is used to listen to the forked children and
00832      then tell them which events they should process */
00833     class MessageSenderToSource {
00834     public:
00835       MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00836       void operator()();
00837 
00838     private:
00839       const std::vector<int>& m_childrenPipes;
00840       long const m_nEventsToProcess;
00841       fd_set m_socketSet;
00842       unsigned int m_aliveChildren;
00843       int m_maxFd;
00844     };
00845     
00846     MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00847                                                  std::vector<int> const& childrenPipes,
00848                                                  long iNEventsToProcess):
00849     m_childrenPipes(childrenPipes),
00850     m_nEventsToProcess(iNEventsToProcess),
00851     m_aliveChildren(childrenSockets.size()),
00852     m_maxFd(0)
00853     {
00854       FD_ZERO(&m_socketSet);
00855       for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00856            it != itEnd; it++) {
00857         FD_SET(*it, &m_socketSet);
00858         if (*it > m_maxFd) {
00859           m_maxFd = *it;
00860         }
00861       }
00862       for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00863            it != itEnd; ++it) {
00864         FD_SET(*it, &m_socketSet);
00865         if (*it > m_maxFd) {
00866           m_maxFd = *it;
00867         }
00868       }
00869       m_maxFd++; // select reads [0,m_maxFd).
00870     }
00871    
00872     /* This function is the heart of the communication between parent and child.
00873      * When ready for more data, the child (see MessageReceiverForSource) requests
00874      * data through a AF_UNIX socket message.  The parent will then assign the next
00875      * chunk of data by sending a message back.
00876      *
00877      * Additionally, this function also monitors the read-side of the pipe fd from the child.
00878      * If the child dies unexpectedly, the pipe will be selected as ready for read and
00879      * will return EPIPE when read from.  Further, if the child thinks the parent has died
00880      * (defined as waiting more than 1s for a response), it will write a single byte to
00881      * the pipe.  If the parent has died, the child will get a EPIPE and throw an exception.
00882      * If still alive, the parent will read the byte and ignore it.
00883      *
00884      * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
00885      * handler can distinguish between success and failure cases.
00886      */
00887  
00888     void
00889     MessageSenderToSource::operator()() {
00890       multicore::MessageForParent childMsg;
00891       LogInfo("ForkingController") << "I am controller";
00892       //this is the master and therefore the controller
00893       
00894       multicore::MessageForSource sndmsg;
00895       sndmsg.startIndex = 0;
00896       sndmsg.nIndices = m_nEventsToProcess;
00897       do {
00898         
00899         fd_set readSockets, errorSockets;
00900         // Wait for a request from a child for events.
00901         memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00902         memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00903         // Note that we don't timeout; may be reconsidered in the future.
00904         ssize_t rc;
00905         while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00906         if (rc < 0) {
00907           std::cerr << "select failed; should be impossible due to preconditions.\n";
00908           abort();
00909           break;
00910         }
00911 
00912         // Read the message from the child.
00913         for (int idx=0; idx<m_maxFd; idx++) {
00914 
00915           // Handle errors
00916           if (FD_ISSET(idx, &errorSockets)) {
00917             LogInfo("ForkingController") << "Error on socket " << idx;
00918             FD_CLR(idx, &m_socketSet);
00919             close(idx);
00920             // See if it was the watchdog pipe that died.
00921             for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00922               if (*it == idx) {
00923                 m_aliveChildren--;
00924               }
00925             }
00926             continue;
00927           }
00928           
00929           if (!FD_ISSET(idx, &readSockets)) {
00930             continue;
00931           }
00932 
00933           // See if this FD is a child watchdog pipe.  If so, read from it to prevent
00934           // writes from blocking.
00935           bool is_pipe = false;
00936           for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00937               if (*it == idx) {
00938                 is_pipe = true;
00939                 char buf;
00940                 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00941                 if (rc <= 0) {
00942                   m_aliveChildren--;
00943                   FD_CLR(idx, &m_socketSet);
00944                   close(idx);
00945                 }
00946               }
00947           }
00948 
00949           // Only execute this block if the FD is a socket for sending the child work.
00950           if (!is_pipe) {
00951             while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
00952             if (rc < 0) {
00953               FD_CLR(idx, &m_socketSet);
00954               close(idx);
00955               continue;
00956             }
00957           
00958             // Tell the child what events to process.
00959             // If 'send' fails, then the child process has failed (any other possibilities are
00960             // eliminated because we are using fixed-size messages with Unix datagram sockets).
00961             // Thus, the SIGCHLD handler will fire and set child_fail = true.
00962             while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
00963             if (rc < 0) {
00964               FD_CLR(idx, &m_socketSet);
00965               close(idx);
00966               continue;
00967             }
00968             //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
00969             sndmsg.startIndex += sndmsg.nIndices;
00970           }
00971         }
00972       
00973       } while (m_aliveChildren > 0);
00974       
00975       return;
00976     }
00977 
00978   }
00979 
00980   
00981   void EventProcessor::possiblyContinueAfterForkChildFailure() {
00982     if(child_failed && continueAfterChildFailure_) {
00983       if (child_fail_signal) {
00984         LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
00985         child_fail_signal=0;
00986       } else if (child_fail_exit_status) {
00987         LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
00988         child_fail_exit_status=0;
00989       } else {
00990         LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
00991       }
00992       child_failed =false;
00993     }
00994   }
00995 
00996   bool
00997   EventProcessor::forkProcess(std::string const& jobReportFile) {
00998 
00999     if(0 == numberOfForkedChildren_) {return true;}
01000     assert(0<numberOfForkedChildren_);
01001     //do what we want done in common
01002     {
01003       beginJob(); //make sure this was run
01004       // make the services available
01005       ServiceRegistry::Operate operate(serviceToken_);
01006 
01007       InputSource::ItemType itemType;
01008       itemType = input_->nextItemType();
01009 
01010       assert(itemType == InputSource::IsFile);
01011       {
01012         readFile();
01013       }
01014       itemType = input_->nextItemType();
01015       assert(itemType == InputSource::IsRun);
01016 
01017       LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01018       IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01019                       input_->runAuxiliary()->beginTime());
01020       espController_->eventSetupForInstance(ts);
01021       EventSetup const& es = esp_->eventSetup();
01022 
01023       //now get all the data available in the EventSetup
01024       std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01025       es.fillAvailableRecordKeys(recordKeys);
01026       std::vector<eventsetup::DataKey> dataKeys;
01027       for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01028           itKey != itEnd;
01029           ++itKey) {
01030         eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01031         //see if this is on our exclusion list
01032         ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01033         ExcludedData const* excludedData(0);
01034         if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01035           excludedData = &(itExcludeRec->second);
01036           if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01037             //skip all items in this record
01038             continue;
01039           }
01040         }
01041         if(0 != recordPtr) {
01042           dataKeys.clear();
01043           recordPtr->fillRegisteredDataKeys(dataKeys);
01044           for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01045               itDataKey != itDataKeyEnd;
01046               ++itDataKey) {
01047             //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01048             if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01049               LogInfo("ForkingEventSetupPreFetching") << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01050               continue;
01051             }
01052             try {
01053               recordPtr->doGet(*itDataKey);
01054             } catch(cms::Exception& e) {
01055              LogWarning("ForkingEventSetupPreFetching") << e.what();
01056             }
01057           }
01058         }
01059       }
01060     }
01061     LogSystem("ForkingEventSetupPreFetching") <<"  done prefetching";
01062     {
01063       // make the services available
01064       ServiceRegistry::Operate operate(serviceToken_);
01065       Service<JobReport> jobReport;
01066       jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01067 
01068       //Now actually do the forking
01069       actReg_->preForkReleaseResourcesSignal_();
01070       input_->doPreForkReleaseResources();
01071       schedule_->preForkReleaseResources();
01072     }
01073     installCustomHandler(SIGCHLD, ep_sigchld);
01074 
01075 
01076     unsigned int childIndex = 0;
01077     unsigned int const kMaxChildren = numberOfForkedChildren_;
01078     unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01079     std::vector<pid_t> childrenIds;
01080     childrenIds.reserve(kMaxChildren);
01081     std::vector<int> childrenSockets;
01082     childrenSockets.reserve(kMaxChildren);
01083     std::vector<int> childrenPipes;
01084     childrenPipes.reserve(kMaxChildren);
01085     std::vector<int> childrenSocketsCopy;
01086     childrenSocketsCopy.reserve(kMaxChildren);
01087     std::vector<int> childrenPipesCopy;
01088     childrenPipesCopy.reserve(kMaxChildren);
01089     int pipes[] {0, 0};
01090 
01091     {
01092       // make the services available
01093       ServiceRegistry::Operate operate(serviceToken_);
01094       Service<JobReport> jobReport;
01095       int sockets[2], fd_flags;
01096       for(; childIndex < kMaxChildren; ++childIndex) {
01097         // Create a UNIX_DGRAM socket pair
01098         if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01099           printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01100           exit(EXIT_FAILURE);
01101         }
01102         if (pipe(pipes)) {
01103           printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01104           exit(EXIT_FAILURE);
01105         }
01106         // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
01107         if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01108           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01109           exit(EXIT_FAILURE);
01110         }
01111         // Mark socket as non-block.  Child must be careful to do select prior
01112         // to reading from socket.
01113         if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01114           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01115           exit(EXIT_FAILURE);
01116         }
01117         if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01118           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01119           exit(EXIT_FAILURE);
01120         }
01121         if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01122           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01123           exit(EXIT_FAILURE);
01124         }
01125         // Linux man page notes there are some edge cases where reading from a
01126         // fd can block, even after a select.
01127         if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01128           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01129           exit(EXIT_FAILURE);
01130         }
01131         if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01132           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01133           exit(EXIT_FAILURE);
01134         }
01135 
01136         childrenPipesCopy = childrenPipes;
01137         childrenSocketsCopy = childrenSockets;
01138 
01139         pid_t value = fork();
01140         if(value == 0) {
01141           // Close the parent's side of the socket and pipe which will talk to us.
01142           close(pipes[0]);
01143           close(sockets[0]);
01144           // Close our copies of the parent's other communication pipes.
01145           for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01146             close(*it);
01147           }
01148           for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01149             close(*it);
01150           }
01151 
01152           // this is the child process, redirect stdout and stderr to a log file
01153           fflush(stdout);
01154           fflush(stderr);
01155           std::stringstream stout;
01156           stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01157           if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01158             LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01159           }
01160           if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01161             LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01162           }
01163 
01164           LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01165           if(setCpuAffinity_) {
01166             // CPU affinity is handled differently on macosx.
01167             // We disable it and print a message until someone reads:
01168             //
01169             // https://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
01170             //
01171             // and implements it.
01172 #ifdef __APPLE__
01173             LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01174 #else
01175             LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01176             cpu_set_t mask;
01177             CPU_ZERO(&mask);
01178             CPU_SET(childIndex, &mask);
01179             if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01180               LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01181               exit(-1);
01182             }
01183 #endif
01184           }
01185           break;
01186         } else {
01187           //this is the parent
01188           close(pipes[1]);
01189           close(sockets[1]);
01190         }
01191         if(value < 0) {
01192           LogError("ForkingChild") << "failed to create a child";
01193           exit(-1);
01194         }
01195         childrenIds.push_back(value);
01196         childrenSockets.push_back(sockets[0]);
01197         childrenPipes.push_back(pipes[0]);
01198       }
01199 
01200       if(childIndex < kMaxChildren) {
01201         jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01202         actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01203 
01204         boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01205         input_->doPostForkReacquireResources(receiver);
01206         schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01207         //NOTE: sources have to reset themselves by listening to the post fork message
01208         //rewindInput();
01209         return true;
01210       }
01211       jobReport->parentAfterFork(jobReportFile);
01212     }
01213 
01214     //this is the original, which is now the master for all the children
01215 
01216     //Need to wait for signals from the children or externally
01217     // To wait we must
01218     // 1) block the signals we want to wait on so we do not have a race condition
01219     // 2) check that we haven't already meet our ending criteria
01220     // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
01221     sigset_t blockingSigSet;
01222     sigset_t unblockingSigSet;
01223     sigset_t oldSigSet;
01224     pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01225     pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01226     sigaddset(&blockingSigSet, SIGCHLD);
01227     sigaddset(&blockingSigSet, SIGUSR2);
01228     sigaddset(&blockingSigSet, SIGINT);
01229     sigdelset(&unblockingSigSet, SIGCHLD);
01230     sigdelset(&unblockingSigSet, SIGUSR2);
01231     sigdelset(&unblockingSigSet, SIGINT);
01232     pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01233 
01234     // If there are too many fd's (unlikely, but possible) for select, denote this 
01235     // because the sender will fail.
01236     bool too_many_fds = false;
01237     if (pipes[1]+1 > FD_SETSIZE) {
01238       LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01239       too_many_fds = true;
01240     }
01241 
01242     //create a thread that sends the units of work to workers
01243     // we create it after all signals were blocked so that this
01244     // thread is never interupted by a signal
01245     MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01246     boost::thread senderThread(sender);
01247 
01248     if(not too_many_fds) {
01249       //NOTE: a child could have failed before we got here and even after this call
01250       // which is why the 'if' is conditional on continueAfterChildFailure_
01251       possiblyContinueAfterForkChildFailure();
01252       while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
01253         sigsuspend(&unblockingSigSet);
01254         possiblyContinueAfterForkChildFailure();
01255         LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01256       }
01257     }
01258     pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01259 
01260     LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01261     if(child_failed) {
01262       LogError("ForkingStopping") << "child failed";
01263     }
01264     if(shutdown_flag) {
01265       LogSystem("ForkingStopping") << "asked to shutdown";
01266     }
01267 
01268     if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01269       LogInfo("ForkingStopping") << "must stop children" << std::endl;
01270       for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01271           it != itEnd; ++it) {
01272         /* int result = */ kill(*it, SIGUSR2);
01273       }
01274       pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01275       while(num_children_done != kMaxChildren) {
01276         sigsuspend(&unblockingSigSet);
01277       }
01278       pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01279     }
01280     // The senderThread will notice the pipes die off, one by one.  Once all children are gone, it will exit.
01281     senderThread.join();
01282     if(child_failed && !continueAfterChildFailure_) {
01283       if (child_fail_signal) {
01284         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01285       } else if (child_fail_exit_status) {
01286         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01287       } else {
01288         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01289       }
01290     }
01291     if(too_many_fds) {
01292       throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01293     }
01294     return false;
01295   }
01296 
01297   void
01298   EventProcessor::connectSigs(EventProcessor* ep) {
01299     // When the FwkImpl signals are given, pass them to the
01300     // appropriate EventProcessor signals so that the outside world
01301     // can see the signal.
01302     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01303     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01304   }
01305 
01306   std::vector<ModuleDescription const*>
01307   EventProcessor::getAllModuleDescriptions() const {
01308     return schedule_->getAllModuleDescriptions();
01309   }
01310 
01311   int
01312   EventProcessor::totalEvents() const {
01313     return schedule_->totalEvents();
01314   }
01315 
01316   int
01317   EventProcessor::totalEventsPassed() const {
01318     return schedule_->totalEventsPassed();
01319   }
01320 
01321   int
01322   EventProcessor::totalEventsFailed() const {
01323     return schedule_->totalEventsFailed();
01324   }
01325 
01326   void
01327   EventProcessor::enableEndPaths(bool active) {
01328     schedule_->enableEndPaths(active);
01329   }
01330 
01331   bool
01332   EventProcessor::endPathsEnabled() const {
01333     return schedule_->endPathsEnabled();
01334   }
01335 
01336   void
01337   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01338     schedule_->getTriggerReport(rep);
01339   }
01340 
01341   void
01342   EventProcessor::clearCounters() {
01343     schedule_->clearCounters();
01344   }
01345 
01346   char const* EventProcessor::currentStateName() const {
01347     return stateName(getState());
01348   }
01349 
01350   char const* EventProcessor::stateName(State s) const {
01351     return stateNames[s];
01352   }
01353 
01354   char const* EventProcessor::msgName(Msg m) const {
01355     return msgNames[m];
01356   }
01357 
01358   State EventProcessor::getState() const {
01359     return state_;
01360   }
01361 
01362   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01363     // the thread will record exception/error status in the event processor
01364     // for us to look at and report here
01365     return last_rc_;
01366   }
01367 
01368   void
01369   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01370     if(runNumber == 0) {
01371       runNumber = 1;
01372       LogWarning("Invalid Run")
01373         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01374         << "Run number was set to 1 instead\n";
01375     }
01376 
01377     // inside of beginJob there is a check to see if it has been called before
01378     beginJob();
01379     changeState(mSetRun);
01380 
01381     // interface not correct yet
01382     input_->setRunNumber(runNumber);
01383   }
01384 
01385   void
01386   EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
01387     // inside of beginJob there is a check to see if it has been called before
01388     beginJob();
01389     changeState(mSetRun);
01390 
01391     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01392     //input_->declareRunNumber(runNumber);
01393   }
01394 
01395   EventProcessor::StatusCode
01396   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01397     bool rc = true;
01398     boost::xtime timeout;
01399 
01400 #if BOOST_VERSION >= 105000
01401     boost::xtime_get(&timeout, boost::TIME_UTC_);
01402 #else
01403     boost::xtime_get(&timeout, boost::TIME_UTC);
01404 #endif
01405     timeout.sec += timeout_seconds;
01406 
01407     // make sure to include a timeout here so we don't wait forever
01408     // I suspect there are still timing issues with thread startup
01409     // and the setting of the various control variables (stop_count, id_set)
01410     {
01411       boost::mutex::scoped_lock sl(stop_lock_);
01412 
01413       // look here - if runAsync not active, just return the last return code
01414       if(stop_count_ < 0) return last_rc_;
01415 
01416       if(timeout_seconds == 0) {
01417         while(stop_count_ == 0) stopper_.wait(sl);
01418       } else {
01419         while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01420       }
01421 
01422       if(rc == false) {
01423           // timeout occurred
01424           // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
01425           // this is a temporary hack until we get the input source
01426           // upgraded to allow blocking input sources to be unblocked
01427 
01428           // the next line is dangerous and causes all sorts of trouble
01429           if(id_set_) pthread_cancel(event_loop_id_);
01430 
01431           // we will not do anything yet
01432           LogWarning("timeout")
01433             << "An asynchronous request was made to shut down "
01434             << "the event loop "
01435             << "and the event loop did not shutdown after "
01436             << timeout_seconds << " seconds\n";
01437       } else {
01438           event_loop_->join();
01439           event_loop_.reset();
01440           id_set_ = false;
01441           stop_count_ = -1;
01442       }
01443     }
01444     return rc == false ? epTimedOut : last_rc_;
01445   }
01446 
01447   EventProcessor::StatusCode
01448   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01449     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01450     if(rc != epTimedOut) changeState(mCountComplete);
01451     else errorState();
01452     return rc;
01453   }
01454 
01455 
01456   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01457     changeState(mStopAsync);
01458     StatusCode rc = waitForAsyncCompletion(secs);
01459     if(rc != epTimedOut) changeState(mFinished);
01460     else errorState();
01461     return rc;
01462   }
01463 
01464   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01465     changeState(mShutdownAsync);
01466     StatusCode rc = waitForAsyncCompletion(secs);
01467     if(rc != epTimedOut) changeState(mFinished);
01468     else errorState();
01469     return rc;
01470   }
01471 
01472   void EventProcessor::errorState() {
01473     state_ = sError;
01474   }
01475 
01476   // next function irrelevant now
01477   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01478     // make sure to include a timeout here so we don't wait forever
01479     // I suspect there are still timing issues with thread startup
01480     // and the setting of the various control variables (stop_count, id_set)
01481     changeState(m);
01482     return waitForAsyncCompletion(60*2);
01483   }
01484 
01485   void EventProcessor::changeState(Msg msg) {
01486     // most likely need to serialize access to this routine
01487 
01488     boost::mutex::scoped_lock sl(state_lock_);
01489     State curr = state_;
01490     int rc;
01491     // found if(not end of table) and
01492     // (state == table.state && (msg == table.message || msg == any))
01493     for(rc = 0;
01494         table[rc].current != sInvalid &&
01495           (curr != table[rc].current ||
01496            (curr == table[rc].current &&
01497              msg != table[rc].message && table[rc].message != mAny));
01498         ++rc);
01499 
01500     if(table[rc].current == sInvalid)
01501       throw cms::Exception("BadState")
01502         << "A member function of EventProcessor has been called in an"
01503         << " inappropriate order.\n"
01504         << "Bad transition from " << stateName(curr) << " "
01505         << "using message " << msgName(msg) << "\n"
01506         << "No where to go from here.\n";
01507 
01508     FDEBUG(1) << "changeState: current=" << stateName(curr)
01509               << ", message=" << msgName(msg)
01510               << " -> new=" << stateName(table[rc].final) << "\n";
01511 
01512     state_ = table[rc].final;
01513   }
01514 
01515   void EventProcessor::runAsync() {
01516     beginJob();
01517     {
01518       boost::mutex::scoped_lock sl(stop_lock_);
01519       if(id_set_ == true) {
01520           std::string err("runAsync called while async event loop already running\n");
01521           LogError("FwkJob") << err;
01522           throw cms::Exception("BadState") << err;
01523       }
01524 
01525       changeState(mRunAsync);
01526 
01527       stop_count_ = 0;
01528       last_rc_ = epSuccess; // forget the last value!
01529       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01530       boost::xtime timeout;
01531 #if BOOST_VERSION >= 105000
01532       boost::xtime_get(&timeout, boost::TIME_UTC_);
01533 #else
01534       boost::xtime_get(&timeout, boost::TIME_UTC);
01535 #endif
01536       timeout.sec += 60; // 60 seconds to start!!!!
01537       if(starter_.timed_wait(sl, timeout) == false) {
01538           // yikes - the thread did not start
01539           throw cms::Exception("BadState")
01540             << "Async run thread did not start in 60 seconds\n";
01541       }
01542     }
01543   }
01544 
01545   void EventProcessor::asyncRun(EventProcessor* me) {
01546     // set up signals to allow for interruptions
01547     // ignore all other signals
01548     // make sure no exceptions escape out
01549 
01550     // temporary hack until we modify the input source to allow
01551     // wakeup calls from other threads.  This mimics the solution
01552     // in EventFilter/Processor, which I do not like.
01553     // allowing cancels means that the thread just disappears at
01554     // certain points.  This is bad for C++ stack variables.
01555     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01556     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01557     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01558     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01559 
01560     {
01561       boost::mutex::scoped_lock sl(me->stop_lock_);
01562       me->event_loop_id_ = pthread_self();
01563       me->id_set_ = true;
01564       me->starter_.notify_all();
01565     }
01566 
01567     Status rc = epException;
01568     FDEBUG(2) << "asyncRun starting ......................\n";
01569 
01570     try {
01571       bool onlineStateTransitions = true;
01572       rc = me->runToCompletion(onlineStateTransitions);
01573     }
01574     catch (cms::Exception& e) {
01575       LogError("FwkJob") << "cms::Exception caught in "
01576                          << "EventProcessor::asyncRun"
01577                          << "\n"
01578                          << e.explainSelf();
01579       me->last_error_text_ = e.explainSelf();
01580     }
01581     catch (std::exception& e) {
01582       LogError("FwkJob") << "Standard library exception caught in "
01583                          << "EventProcessor::asyncRun"
01584                          << "\n"
01585                          << e.what();
01586       me->last_error_text_ = e.what();
01587     }
01588     catch (...) {
01589       LogError("FwkJob") << "Unknown exception caught in "
01590                          << "EventProcessor::asyncRun"
01591                          << "\n";
01592       me->last_error_text_ = "Unknown exception caught";
01593       rc = epOther;
01594     }
01595 
01596     me->last_rc_ = rc;
01597 
01598     {
01599       // notify anyone waiting for exit that we are doing so now
01600       boost::mutex::scoped_lock sl(me->stop_lock_);
01601       ++me->stop_count_;
01602       me->stopper_.notify_all();
01603     }
01604     FDEBUG(2) << "asyncRun ending ......................\n";
01605   }
01606 
01607   std::auto_ptr<statemachine::Machine>
01608   EventProcessor::createStateMachine() {
01609     statemachine::FileMode fileMode;
01610     if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01611     else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01612     else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01613     else {
01614       throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01615       << fileMode_ << ".\n"
01616       << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01617     }
01618     
01619     statemachine::EmptyRunLumiMode emptyRunLumiMode;
01620     if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01621     else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01622     else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01623     else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01624     else {
01625       throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01626       << emptyRunLumiMode_ << ".\n"
01627       << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01628     }
01629     
01630     std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
01631                                              fileMode,
01632                                              emptyRunLumiMode));
01633     
01634     machine->initiate();
01635     return machine;
01636   }
01637 
01638 
01639   EventProcessor::StatusCode
01640   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01641 
01642     StateSentry toerror(this);
01643 
01644     StatusCode returnCode=epSuccess;
01645     std::auto_ptr<statemachine::Machine> machine;
01646     {
01647       beginJob(); //make sure this was called
01648       
01649       if(!onlineStateTransitions) changeState(mRunCount);
01650       
01651       //StatusCode returnCode = epSuccess;
01652       stateMachineWasInErrorState_ = false;
01653       
01654       // make the services available
01655       ServiceRegistry::Operate operate(serviceToken_);
01656 
01657       machine = createStateMachine();
01658       try {
01659         try {
01660           
01661           InputSource::ItemType itemType;
01662           
01663           while(true) {
01664             
01665             bool more = true;
01666             if(numberOfForkedChildren_ > 0) {
01667               size_t size = preg_->size();
01668               more = input_->skipForForking();
01669               if(more) {
01670                 if(size < preg_->size()) {
01671                   principalCache_.adjustIndexesAfterProductRegistryAddition();
01672                 }
01673                 principalCache_.adjustEventToNewProductRegistry(preg_);
01674               }
01675             } 
01676             itemType = (more ? input_->nextItemType() : InputSource::IsStop);
01677             
01678             FDEBUG(1) << "itemType = " << itemType << "\n";
01679             
01680             // These are used for asynchronous running only and
01681             // and are checking to see if stopAsync or shutdownAsync
01682             // were called from another thread.  In the future, we
01683             // may need to do something better than polling the state.
01684             // With the current code this is the simplest thing and
01685             // it should always work.  If the interaction between
01686             // threads becomes more complex this may cause problems.
01687             if(state_ == sStopping) {
01688               FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01689               forceLooperToEnd_ = true;
01690               machine->process_event(statemachine::Stop());
01691               forceLooperToEnd_ = false;
01692               break;
01693             }
01694             else if(state_ == sShuttingDown) {
01695               FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01696               forceLooperToEnd_ = true;
01697               machine->process_event(statemachine::Stop());
01698               forceLooperToEnd_ = false;
01699               break;
01700             }
01701             
01702             // Look for a shutdown signal
01703             {
01704               boost::mutex::scoped_lock sl(usr2_lock);
01705               if(shutdown_flag) {
01706                 changeState(mShutdownSignal);
01707                 returnCode = epSignal;
01708                 forceLooperToEnd_ = true;
01709                 machine->process_event(statemachine::Stop());
01710                 forceLooperToEnd_ = false;
01711                 break;
01712               }
01713             }
01714             
01715             if(itemType == InputSource::IsStop) {
01716               machine->process_event(statemachine::Stop());
01717             }
01718             else if(itemType == InputSource::IsFile) {
01719               machine->process_event(statemachine::File());
01720             }
01721             else if(itemType == InputSource::IsRun) {
01722               machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01723             }
01724             else if(itemType == InputSource::IsLumi) {
01725               machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
01726             }
01727             else if(itemType == InputSource::IsEvent) {
01728               machine->process_event(statemachine::Event());
01729             }
01730             // This should be impossible
01731             else {
01732               throw Exception(errors::LogicError)
01733               << "Unknown next item type passed to EventProcessor\n"
01734               << "Please report this error to the Framework group\n";
01735             }
01736             
01737             if(machine->terminated()) {
01738               changeState(mInputExhausted);
01739               break;
01740             }
01741           }  // End of loop over state machine events
01742         } // Try block
01743         catch (cms::Exception& e) { throw; }
01744         catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01745         catch (std::exception& e) { convertException::stdToEDM(e); }
01746         catch(std::string& s) { convertException::stringToEDM(s); }
01747         catch(char const* c) { convertException::charPtrToEDM(c); }
01748         catch (...) { convertException::unknownToEDM(); }
01749       } // Try block
01750       // Some comments on exception handling related to the boost state machine:
01751       //
01752       // Some states used in the machine are special because they
01753       // perform actions while the machine is being terminated, actions
01754       // such as close files, call endRun, call endLumi etc ...  Each of these
01755       // states has two functions that perform these actions.  The functions
01756       // are almost identical.  The major difference is that one version
01757       // catches all exceptions and the other lets exceptions pass through.
01758       // The destructor catches them and the other function named "exit" lets
01759       // them pass through.  On a normal termination, boost will always call
01760       // "exit" and then the state destructor.  In our state classes, the
01761       // the destructors do nothing if the exit function already took
01762       // care of things.  Here's the interesting part.  When boost is
01763       // handling an exception the "exit" function is not called (a boost
01764       // feature).
01765       //
01766       // If an exception occurs while the boost machine is in control
01767       // (which usually means inside a process_event call), then
01768       // the boost state machine destroys its states and "terminates" itself.
01769       // This already done before we hit the catch blocks below. In this case
01770       // the call to terminateMachine below only destroys an already
01771       // terminated state machine.  Because exit is not called, the state destructors
01772       // handle cleaning up lumis, runs, and files.  The destructors swallow
01773       // all exceptions and only pass through the exceptions messages, which
01774       // are tacked onto the original exception below.
01775       //
01776       // If an exception occurs when the boost state machine is not
01777       // in control (outside the process_event functions), then boost
01778       // cannot destroy its own states.  The terminateMachine function
01779       // below takes care of that.  The flag "alreadyHandlingException"
01780       // is set true so that the state exit functions do nothing (and
01781       // cannot throw more exceptions while handling the first).  Then the
01782       // state destructors take care of this because exit did nothing.
01783       //
01784       // In both cases above, the EventProcessor::endOfLoop function is
01785       // not called because it can throw exceptions.
01786       //
01787       // One tricky aspect of the state machine is that things that can
01788       // throw should not be invoked by the state machine while another
01789       // exception is being handled.
01790       // Another tricky aspect is that it appears to be important to
01791       // terminate the state machine before invoking its destructor.
01792       // We've seen crashes that are not understood when that is not
01793       // done.  Maintainers of this code should be careful about this.
01794       
01795       catch (cms::Exception & e) {
01796         alreadyHandlingException_ = true;
01797         terminateMachine(machine);
01798         alreadyHandlingException_ = false;
01799         if (!exceptionMessageLumis_.empty()) {
01800           e.addAdditionalInfo(exceptionMessageLumis_);
01801           if (e.alreadyPrinted()) {
01802             LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01803           }
01804         }
01805         if (!exceptionMessageRuns_.empty()) {
01806           e.addAdditionalInfo(exceptionMessageRuns_);
01807           if (e.alreadyPrinted()) {
01808             LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01809           }
01810         }
01811         if (!exceptionMessageFiles_.empty()) {
01812           e.addAdditionalInfo(exceptionMessageFiles_);
01813           if (e.alreadyPrinted()) {
01814             LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01815           }
01816         }
01817         throw;
01818       }
01819       
01820       if(machine->terminated()) {
01821         FDEBUG(1) << "The state machine reports it has been terminated\n";
01822         machine.reset();
01823       }
01824       
01825       if(!onlineStateTransitions) changeState(mFinished);
01826       
01827       if(stateMachineWasInErrorState_) {
01828         throw cms::Exception("BadState")
01829         << "The boost state machine in the EventProcessor exited after\n"
01830         << "entering the Error state.\n";
01831       }
01832       
01833     }
01834     if(machine.get() != 0) {
01835       terminateMachine(machine);
01836       throw Exception(errors::LogicError)
01837         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01838         << "Please report this error to the Framework group\n";
01839     }
01840 
01841     toerror.succeeded();
01842 
01843     return returnCode;
01844   }
01845 
01846   void EventProcessor::readFile() {
01847     FDEBUG(1) << " \treadFile\n";
01848     size_t size = preg_->size();
01849     fb_ = input_->readFile();
01850     if(size < preg_->size()) {
01851       principalCache_.adjustIndexesAfterProductRegistryAddition();
01852     }
01853     principalCache_.adjustEventToNewProductRegistry(preg_);
01854     if(numberOfForkedChildren_ > 0) {
01855         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01856     }
01857   }
01858 
01859   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
01860     if (fb_.get() != 0) {
01861       input_->closeFile(fb_, cleaningUpAfterException);
01862     }
01863     FDEBUG(1) << "\tcloseInputFile\n";
01864   }
01865 
01866   void EventProcessor::openOutputFiles() {
01867     if (fb_.get() != 0) {
01868       schedule_->openOutputFiles(*fb_);
01869       if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01870     }
01871     FDEBUG(1) << "\topenOutputFiles\n";
01872   }
01873 
01874   void EventProcessor::closeOutputFiles() {
01875     if (fb_.get() != 0) {
01876       schedule_->closeOutputFiles();
01877       if(hasSubProcess()) subProcess_->closeOutputFiles();
01878     }
01879     FDEBUG(1) << "\tcloseOutputFiles\n";
01880   }
01881 
01882   void EventProcessor::respondToOpenInputFile() {
01883     if (fb_.get() != 0) {
01884       schedule_->respondToOpenInputFile(*fb_);
01885       if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01886     }
01887     FDEBUG(1) << "\trespondToOpenInputFile\n";
01888   }
01889 
01890   void EventProcessor::respondToCloseInputFile() {
01891     if (fb_.get() != 0) {
01892       schedule_->respondToCloseInputFile(*fb_);
01893       if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01894     }
01895     FDEBUG(1) << "\trespondToCloseInputFile\n";
01896   }
01897 
01898   void EventProcessor::respondToOpenOutputFiles() {
01899     if (fb_.get() != 0) {
01900       schedule_->respondToOpenOutputFiles(*fb_);
01901       if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01902     }
01903     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01904   }
01905 
01906   void EventProcessor::respondToCloseOutputFiles() {
01907     if (fb_.get() != 0) {
01908       schedule_->respondToCloseOutputFiles(*fb_);
01909       if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01910     }
01911     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01912   }
01913 
01914   void EventProcessor::startingNewLoop() {
01915     shouldWeStop_ = false;
01916     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01917     // until after we've called beginOfJob
01918     if(looper_ && looperBeginJobRun_) {
01919       looper_->doStartingNewLoop();
01920     }
01921     FDEBUG(1) << "\tstartingNewLoop\n";
01922   }
01923 
01924   bool EventProcessor::endOfLoop() {
01925     if(looper_) {
01926       ModuleChanger changer(schedule_.get());
01927       looper_->setModuleChanger(&changer);
01928       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01929       looper_->setModuleChanger(0);
01930       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01931       else return false;
01932     }
01933     FDEBUG(1) << "\tendOfLoop\n";
01934     return true;
01935   }
01936 
01937   void EventProcessor::rewindInput() {
01938     input_->repeat();
01939     input_->rewind();
01940     FDEBUG(1) << "\trewind\n";
01941   }
01942 
01943   void EventProcessor::prepareForNextLoop() {
01944     looper_->prepareForNextLoop(esp_.get());
01945     FDEBUG(1) << "\tprepareForNextLoop\n";
01946   }
01947 
01948   bool EventProcessor::shouldWeCloseOutput() const {
01949     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01950     return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01951   }
01952 
01953   void EventProcessor::doErrorStuff() {
01954     FDEBUG(1) << "\tdoErrorStuff\n";
01955     LogError("StateMachine")
01956       << "The EventProcessor state machine encountered an unexpected event\n"
01957       << "and went to the error state\n"
01958       << "Will attempt to terminate processing normally\n"
01959       << "(IF using the looper the next loop will be attempted)\n"
01960       << "This likely indicates a bug in an input module or corrupted input or both\n";
01961     stateMachineWasInErrorState_ = true;
01962   }
01963 
01964   void EventProcessor::beginRun(statemachine::Run const& run) {
01965     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01966     input_->doBeginRun(runPrincipal);
01967     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01968                     runPrincipal.beginTime());
01969     if(forceESCacheClearOnNewRun_){
01970       espController_->forceCacheClear();
01971     }
01972     espController_->eventSetupForInstance(ts);
01973     EventSetup const& es = esp_->eventSetup();
01974     if(looper_ && looperBeginJobRun_== false) {
01975       looper_->copyInfo(ScheduleInfo(schedule_.get()));
01976       looper_->beginOfJob(es);
01977       looperBeginJobRun_ = true;
01978       looper_->doStartingNewLoop();
01979     }
01980     {
01981       typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01982       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01983       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01984       if(hasSubProcess()) {
01985         subProcess_->doBeginRun(runPrincipal, ts);
01986       }
01987     }
01988     FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
01989     if(looper_) {
01990       looper_->doBeginRun(runPrincipal, es);
01991     }
01992   }
01993 
01994   void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
01995     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01996     input_->doEndRun(runPrincipal, cleaningUpAfterException);
01997     IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01998                     runPrincipal.endTime());
01999     espController_->eventSetupForInstance(ts);
02000     EventSetup const& es = esp_->eventSetup();
02001     {
02002       typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02003       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02004       schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
02005       if(hasSubProcess()) {
02006         subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
02007       }
02008     }
02009     FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02010     if(looper_) {
02011       looper_->doEndRun(runPrincipal, es);
02012     }
02013   }
02014 
02015   void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02016     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02017     input_->doBeginLumi(lumiPrincipal);
02018 
02019     Service<RandomNumberGenerator> rng;
02020     if(rng.isAvailable()) {
02021       LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02022       rng->preBeginLumi(lb);
02023     }
02024 
02025     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
02026     // lumi blocks know their start and end times why not also start and end events?
02027     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02028     espController_->eventSetupForInstance(ts);
02029     EventSetup const& es = esp_->eventSetup();
02030     {
02031       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02032       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02033       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02034       if(hasSubProcess()) {
02035         subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02036       }
02037     }
02038     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02039     if(looper_) {
02040       looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02041     }
02042   }
02043 
02044   void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
02045     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02046     input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
02047     //NOTE: Using the max event number for the end of a lumi block is a bad idea
02048     // lumi blocks know their start and end times why not also start and end events?
02049     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02050                     lumiPrincipal.endTime());
02051     espController_->eventSetupForInstance(ts);
02052     EventSetup const& es = esp_->eventSetup();
02053     {
02054       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02055       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02056       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
02057       if(hasSubProcess()) {
02058         subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
02059       }
02060     }
02061     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02062     if(looper_) {
02063       looper_->doEndLuminosityBlock(lumiPrincipal, es);
02064     }
02065   }
02066 
02067   statemachine::Run EventProcessor::readAndCacheRun() {
02068     if (principalCache_.hasRunPrincipal()) {
02069       throw edm::Exception(edm::errors::LogicError)
02070         << "EventProcessor::readAndCacheRun\n"
02071         << "Illegal attempt to insert run into cache\n"
02072         << "Contact a Framework Developer\n";
02073     }
02074     principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
02075     return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02076   }
02077 
02078   statemachine::Run EventProcessor::readAndMergeRun() {
02079     principalCache_.merge(input_->runAuxiliary(), preg_);
02080     input_->readAndMergeRun(principalCache_.runPrincipalPtr());
02081     return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02082   }
02083 
02084   int EventProcessor::readAndCacheLumi() {
02085     if (principalCache_.hasLumiPrincipal()) {
02086       throw edm::Exception(edm::errors::LogicError)
02087         << "EventProcessor::readAndCacheRun\n"
02088         << "Illegal attempt to insert lumi into cache\n"
02089         << "Contact a Framework Developer\n";
02090     }
02091     if (!principalCache_.hasRunPrincipal()) {
02092       throw edm::Exception(edm::errors::LogicError)
02093         << "EventProcessor::readAndCacheRun\n"
02094         << "Illegal attempt to insert lumi into cache\n"
02095         << "Run is invalid\n"
02096         << "Contact a Framework Developer\n";
02097     }
02098     principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
02099     principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
02100     return input_->luminosityBlock();
02101   }
02102 
02103   int EventProcessor::readAndMergeLumi() {
02104     principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
02105     input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
02106     return input_->luminosityBlock();
02107   }
02108 
02109   void EventProcessor::writeRun(statemachine::Run const& run) {
02110     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02111     if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02112     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02113   }
02114 
02115   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02116     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02117     if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02118     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02119   }
02120 
02121   void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02122     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02123     if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02124     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02125   }
02126 
02127   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02128     principalCache_.deleteLumi(phid, run, lumi);
02129     if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02130     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02131   }
02132 
02133   void EventProcessor::readAndProcessEvent() {
02134     EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
02135     FDEBUG(1) << "\treadEvent\n";
02136     assert(pep != 0);
02137     pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
02138     assert(pep->luminosityBlockPrincipalPtrValid());
02139     assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
02140     assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
02141 
02142     IOVSyncValue ts(pep->id(), pep->time());
02143     espController_->eventSetupForInstance(ts);
02144     EventSetup const& es = esp_->eventSetup();
02145     {
02146       typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02147       ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02148       schedule_->processOneOccurrence<Traits>(*pep, es);
02149       if(hasSubProcess()) {
02150         subProcess_->doEvent(*pep, ts);
02151       }
02152     }
02153 
02154     if(looper_) {
02155       bool randomAccess = input_->randomAccess();
02156       ProcessingController::ForwardState forwardState = input_->forwardState();
02157       ProcessingController::ReverseState reverseState = input_->reverseState();
02158       ProcessingController pc(forwardState, reverseState, randomAccess);
02159 
02160       EDLooperBase::Status status = EDLooperBase::kContinue;
02161       do {
02162         status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02163 
02164         bool succeeded = true;
02165         if(randomAccess) {
02166           if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02167             input_->skipEvents(-2);
02168           }
02169           else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02170             succeeded = input_->goToEvent(pc.specifiedEventTransition());
02171           }
02172         }
02173         pc.setLastOperationSucceeded(succeeded);
02174       } while(!pc.lastOperationSucceeded());
02175       if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02176 
02177     }
02178 
02179     FDEBUG(1) << "\tprocessEvent\n";
02180     pep->clearEventPrincipal();
02181   }
02182 
02183   bool EventProcessor::shouldWeStop() const {
02184     FDEBUG(1) << "\tshouldWeStop\n";
02185     if(shouldWeStop_) return true;
02186     return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02187   }
02188 
02189   void EventProcessor::setExceptionMessageFiles(std::string& message) {
02190     exceptionMessageFiles_ = message;
02191   }
02192 
02193   void EventProcessor::setExceptionMessageRuns(std::string& message) {
02194     exceptionMessageRuns_ = message;
02195   }
02196 
02197   void EventProcessor::setExceptionMessageLumis(std::string& message) {
02198     exceptionMessageLumis_ = message;
02199   }
02200 
02201   bool EventProcessor::alreadyHandlingException() const {
02202     return alreadyHandlingException_;
02203   }
02204 
02205   void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
02206     if(iMachine.get() != 0) {
02207       if(!iMachine->terminated()) {
02208         forceLooperToEnd_ = true;
02209         iMachine->process_event(statemachine::Stop());
02210         forceLooperToEnd_ = false;
02211       }
02212       else {
02213         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
02214       }
02215       if(iMachine->terminated()) {
02216         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02217       }
02218       iMachine.reset();
02219     }
02220   }
02221 }