CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/FWCore/Framework/src/EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011 
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036 
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038 
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046 
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049 
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057 
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060 
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063 
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069 
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072 
00073 //Used for forking
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080 
00081 //Used for CPU affinity
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085 
00086 namespace edm {
00087 
00088   namespace event_processor {
00089 
00090     class StateSentry {
00091     public:
00092       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00093       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00094       void succeeded() {success_ = true;}
00095 
00096     private:
00097       EventProcessor* ep_;
00098       bool success_;
00099     };
00100   }
00101 
00102   using namespace event_processor;
00103 
00104   namespace {
00105     template <typename T>
00106     class ScheduleSignalSentry {
00107     public:
00108       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00109            a_(a), principal_(principal), es_(es) {
00110         if (a_) T::preScheduleSignal(a_, principal_);
00111       }
00112       ~ScheduleSignalSentry() {
00113         if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00114       }
00115 
00116     private:
00117       // We own none of these resources.
00118       ActivityRegistry* a_;
00119       typename T::MyPrincipal* principal_;
00120       EventSetup const* es_;
00121     };
00122   }
00123 
00124   namespace {
00125 
00126     // the next two tables must be kept in sync with the state and
00127     // message enums from the header
00128 
00129     char const* stateNames[] = {
00130       "Init",
00131       "JobReady",
00132       "RunGiven",
00133       "Running",
00134       "Stopping",
00135       "ShuttingDown",
00136       "Done",
00137       "JobEnded",
00138       "Error",
00139       "ErrorEnded",
00140       "End",
00141       "Invalid"
00142     };
00143 
00144     char const* msgNames[] = {
00145       "SetRun",
00146       "Skip",
00147       "RunAsync",
00148       "Run(ID)",
00149       "Run(count)",
00150       "BeginJob",
00151       "StopAsync",
00152       "ShutdownAsync",
00153       "EndJob",
00154       "CountComplete",
00155       "InputExhausted",
00156       "StopSignal",
00157       "ShutdownSignal",
00158       "Finished",
00159       "Any",
00160       "dtor",
00161       "Exception",
00162       "Rewind"
00163     };
00164   }
00165     // IMPORTANT NOTE:
00166     // the mAny messages are special, they must appear last in the
00167     // table if multiple entries for a CurrentState are present.
00168     // the changeState function does not use the mAny yet!!!
00169 
00170     struct TransEntry {
00171       State current;
00172       Msg   message;
00173       State final;
00174     };
00175 
00176     // we should use this information to initialize a two dimensional
00177     // table of t[CurrentState][Message] = FinalState
00178 
00179     /*
00180       the way this is current written, the async run can thread function
00181       can return in the "JobReady" state - but not yet cleaned up.  The
00182       problem is that only when stop/shutdown async is called is the
00183       thread cleaned up. But the stop/shudown async functions attempt
00184       first to change the state using messages that are not valid in
00185       "JobReady" state.
00186 
00187       I think most of the problems can be solved by using two states
00188       for "running": RunningS and RunningA (sync/async). The problems
00189       seems to be the all the transitions out of running for both
00190       modes of operation.  The other solution might be to only go to
00191       "Stopping" from Running, and use the return code from "run_p" to
00192       set the final state.  If this is used, then in the sync mode the
00193       "Stopping" state would be momentary.
00194 
00195      */
00196 
00197     TransEntry table[] = {
00198     // CurrentState   Message         FinalState
00199     // -----------------------------------------
00200       { sInit,          mException,      sError },
00201       { sInit,          mBeginJob,       sJobReady },
00202       { sJobReady,      mException,      sError },
00203       { sJobReady,      mSetRun,         sRunGiven },
00204       { sJobReady,      mInputRewind,    sRunning },
00205       { sJobReady,      mSkip,           sRunning },
00206       { sJobReady,      mRunID,          sRunning },
00207       { sJobReady,      mRunCount,       sRunning },
00208       { sJobReady,      mEndJob,         sJobEnded },
00209       { sJobReady,      mBeginJob,       sJobReady },
00210       { sJobReady,      mDtor,           sEnd },    // should this be allowed?
00211 
00212       { sJobReady,      mStopAsync,      sJobReady },
00213       { sJobReady,      mCountComplete,  sJobReady },
00214       { sJobReady,      mFinished,       sJobReady },
00215 
00216       { sRunGiven,      mException,      sError },
00217       { sRunGiven,      mRunAsync,       sRunning },
00218       { sRunGiven,      mBeginJob,       sRunGiven },
00219       { sRunGiven,      mShutdownAsync,  sShuttingDown },
00220       { sRunGiven,      mStopAsync,      sStopping },
00221       { sRunning,       mException,      sError },
00222       { sRunning,       mStopAsync,      sStopping },
00223       { sRunning,       mShutdownAsync,  sShuttingDown },
00224       { sRunning,       mShutdownSignal, sShuttingDown },
00225       { sRunning,       mCountComplete,  sStopping }, // sJobReady
00226       { sRunning,       mInputExhausted, sStopping }, // sJobReady
00227 
00228       { sStopping,      mInputRewind,    sRunning }, // The looper needs this
00229       { sStopping,      mException,      sError },
00230       { sStopping,      mFinished,       sJobReady },
00231       { sStopping,      mCountComplete,  sJobReady },
00232       { sStopping,      mShutdownSignal, sShuttingDown },
00233       { sStopping,      mStopAsync,      sStopping },     // stay
00234       { sStopping,      mInputExhausted, sStopping },     // stay
00235       //{ sStopping,      mAny,            sJobReady },     // <- ??????
00236       { sShuttingDown,  mException,      sError },
00237       { sShuttingDown,  mShutdownSignal, sShuttingDown },
00238       { sShuttingDown,  mCountComplete,  sDone }, // needed?
00239       { sShuttingDown,  mInputExhausted, sDone }, // needed?
00240       { sShuttingDown,  mFinished,       sDone },
00241       //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
00242       //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
00243       //{ sShuttingDown,  mAny,            sDone },         // <- ??????
00244       { sDone,          mEndJob,         sJobEnded },
00245       { sDone,          mException,      sError },
00246       { sJobEnded,      mDtor,           sEnd },
00247       { sJobEnded,      mException,      sError },
00248       { sError,         mEndJob,         sError },   // funny one here
00249       { sError,         mDtor,           sError },   // funny one here
00250       { sInit,          mDtor,           sEnd },     // for StorM dummy EP
00251       { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
00252       { sInvalid,       mAny,            sInvalid }
00253     };
00254 
00255 
00256     // Note: many of the messages generate the mBeginJob message first
00257     //  mRunID, mRunCount, mSetRun
00258 
00259   // ---------------------------------------------------------------
00260   boost::shared_ptr<InputSource>
00261   makeInput(ParameterSet& params,
00262             CommonParams const& common,
00263             ProductRegistry& preg,
00264             PrincipalCache& pCache,
00265             boost::shared_ptr<ActivityRegistry> areg,
00266             boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00267     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00268     if(main_input == 0) {
00269       throw Exception(errors::Configuration)
00270         << "There must be exactly one source in the configuration.\n"
00271         << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00272     }
00273 
00274     std::string modtype(main_input->getParameter<std::string>("@module_type"));
00275 
00276     std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00277       ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00278     ConfigurationDescriptions descriptions(filler->baseType());
00279     filler->fill(descriptions);
00280 
00281     try {
00282       try {
00283         descriptions.validate(*main_input, std::string("source"));
00284       }
00285       catch (cms::Exception& e) { throw; }
00286       catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00287       catch (std::exception& e) { convertException::stdToEDM(e); }
00288       catch (std::string& s) { convertException::stringToEDM(s); }
00289       catch (char const* c) { convertException::charPtrToEDM(c); }
00290       catch (...) { convertException::unknownToEDM(); }
00291     }
00292     catch (cms::Exception & iException) {
00293       std::ostringstream ost;
00294       ost << "Validating configuration of input source of type " << modtype;
00295       iException.addContext(ost.str());
00296       throw;
00297     }
00298 
00299     main_input->registerIt();
00300 
00301     // Fill in "ModuleDescription", in case the input source produces
00302     // any EDproducts, which would be registered in the ProductRegistry.
00303     // Also fill in the process history item for this process.
00304     // There is no module label for the unnamed input source, so
00305     // just use "source".
00306     // Only the tracked parameters belong in the process configuration.
00307     ModuleDescription md(main_input->id(),
00308                          main_input->getParameter<std::string>("@module_type"),
00309                          "source",
00310                          processConfiguration.get());
00311 
00312     InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00313     areg->preSourceConstructionSignal_(md);
00314     boost::shared_ptr<InputSource> input;
00315     try {
00316       try {
00317         input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00318       }
00319       catch (cms::Exception& e) { throw; }
00320       catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00321       catch (std::exception& e) { convertException::stdToEDM(e); }
00322       catch (std::string& s) { convertException::stringToEDM(s); }
00323       catch (char const* c) { convertException::charPtrToEDM(c); }
00324       catch (...) { convertException::unknownToEDM(); }
00325     }
00326     catch (cms::Exception& iException) {
00327       areg->postSourceConstructionSignal_(md);
00328       std::ostringstream ost;
00329       ost << "Constructing input source of type " << modtype;
00330       iException.addContext(ost.str());
00331       throw;
00332     }
00333     areg->postSourceConstructionSignal_(md);
00334     return input;
00335   }
00336 
00337   // ---------------------------------------------------------------
00338   boost::shared_ptr<EDLooperBase>
00339   fillLooper(eventsetup::EventSetupsController& esController,
00340              eventsetup::EventSetupProvider& cp,
00341                          ParameterSet& params) {
00342     boost::shared_ptr<EDLooperBase> vLooper;
00343 
00344     std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00345 
00346     if(loopers.size() == 0) {
00347        return vLooper;
00348     }
00349 
00350     assert(1 == loopers.size());
00351 
00352     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00353         itName != itNameEnd;
00354         ++itName) {
00355 
00356       ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00357       providerPSet->registerIt();
00358       vLooper = eventsetup::LooperFactory::get()->addTo(esController,
00359                                                         cp,
00360                                                         *providerPSet);
00361       }
00362       return vLooper;
00363 
00364   }
00365 
00366   // ---------------------------------------------------------------
00367   EventProcessor::EventProcessor(std::string const& config,
00368                                 ServiceToken const& iToken,
00369                                 serviceregistry::ServiceLegacy iLegacy,
00370                                 std::vector<std::string> const& defaultServices,
00371                                 std::vector<std::string> const& forcedServices) :
00372     preProcessEventSignal_(),
00373     postProcessEventSignal_(),
00374     actReg_(),
00375     preg_(),
00376     serviceToken_(),
00377     input_(),
00378     espController_(new eventsetup::EventSetupsController),
00379     esp_(),
00380     act_table_(),
00381     processConfiguration_(),
00382     schedule_(),
00383     subProcess_(),
00384     historyAppender_(new HistoryAppender),
00385     state_(sInit),
00386     event_loop_(),
00387     state_lock_(),
00388     stop_lock_(),
00389     stopper_(),
00390     starter_(),
00391     stop_count_(-1),
00392     last_rc_(epSuccess),
00393     last_error_text_(),
00394     id_set_(false),
00395     event_loop_id_(),
00396     my_sig_num_(getSigNum()),
00397     fb_(),
00398     looper_(),
00399     machine_(),
00400     principalCache_(),
00401     shouldWeStop_(false),
00402     stateMachineWasInErrorState_(false),
00403     fileMode_(),
00404     emptyRunLumiMode_(),
00405     exceptionMessageFiles_(),
00406     exceptionMessageRuns_(),
00407     exceptionMessageLumis_(),
00408     alreadyHandlingException_(false),
00409     forceLooperToEnd_(false),
00410     looperBeginJobRun_(false),
00411     forceESCacheClearOnNewRun_(false),
00412     numberOfForkedChildren_(0),
00413     numberOfSequentialEventsPerChild_(1),
00414     setCpuAffinity_(false),
00415     eventSetupDataToExcludeFromPrefetching_() {
00416     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00417     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00418     processDesc->addServices(defaultServices, forcedServices);
00419     init(processDesc, iToken, iLegacy);
00420   }
00421 
00422   EventProcessor::EventProcessor(std::string const& config,
00423                                 std::vector<std::string> const& defaultServices,
00424                                 std::vector<std::string> const& forcedServices) :
00425     preProcessEventSignal_(),
00426     postProcessEventSignal_(),
00427     actReg_(),
00428     preg_(),
00429     serviceToken_(),
00430     input_(),
00431     espController_(new eventsetup::EventSetupsController),
00432     esp_(),
00433     act_table_(),
00434     processConfiguration_(),
00435     schedule_(),
00436     subProcess_(),
00437     historyAppender_(new HistoryAppender),
00438     state_(sInit),
00439     event_loop_(),
00440     state_lock_(),
00441     stop_lock_(),
00442     stopper_(),
00443     starter_(),
00444     stop_count_(-1),
00445     last_rc_(epSuccess),
00446     last_error_text_(),
00447     id_set_(false),
00448     event_loop_id_(),
00449     my_sig_num_(getSigNum()),
00450     fb_(),
00451     looper_(),
00452     machine_(),
00453     principalCache_(),
00454     shouldWeStop_(false),
00455     stateMachineWasInErrorState_(false),
00456     fileMode_(),
00457     emptyRunLumiMode_(),
00458     exceptionMessageFiles_(),
00459     exceptionMessageRuns_(),
00460     exceptionMessageLumis_(),
00461     alreadyHandlingException_(false),
00462     forceLooperToEnd_(false),
00463     looperBeginJobRun_(false),
00464     forceESCacheClearOnNewRun_(false),
00465     numberOfForkedChildren_(0),
00466     numberOfSequentialEventsPerChild_(1),
00467     setCpuAffinity_(false),
00468     eventSetupDataToExcludeFromPrefetching_() {
00469     boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00470     boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00471     processDesc->addServices(defaultServices, forcedServices);
00472     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00473   }
00474 
00475   EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00476                  ServiceToken const& token,
00477                  serviceregistry::ServiceLegacy legacy) :
00478     preProcessEventSignal_(),
00479     postProcessEventSignal_(),
00480     actReg_(),
00481     preg_(),
00482     serviceToken_(),
00483     input_(),
00484     espController_(new eventsetup::EventSetupsController),
00485     esp_(),
00486     act_table_(),
00487     processConfiguration_(),
00488     schedule_(),
00489     subProcess_(),
00490     historyAppender_(new HistoryAppender),
00491     state_(sInit),
00492     event_loop_(),
00493     state_lock_(),
00494     stop_lock_(),
00495     stopper_(),
00496     starter_(),
00497     stop_count_(-1),
00498     last_rc_(epSuccess),
00499     last_error_text_(),
00500     id_set_(false),
00501     event_loop_id_(),
00502     my_sig_num_(getSigNum()),
00503     fb_(),
00504     looper_(),
00505     machine_(),
00506     principalCache_(),
00507     shouldWeStop_(false),
00508     stateMachineWasInErrorState_(false),
00509     fileMode_(),
00510     emptyRunLumiMode_(),
00511     exceptionMessageFiles_(),
00512     exceptionMessageRuns_(),
00513     exceptionMessageLumis_(),
00514     alreadyHandlingException_(false),
00515     forceLooperToEnd_(false),
00516     looperBeginJobRun_(false),
00517     forceESCacheClearOnNewRun_(false),
00518     numberOfForkedChildren_(0),
00519     numberOfSequentialEventsPerChild_(1),
00520     setCpuAffinity_(false),
00521     eventSetupDataToExcludeFromPrefetching_() {
00522     init(processDesc, token, legacy);
00523   }
00524 
00525 
00526   EventProcessor::EventProcessor(std::string const& config, bool isPython):
00527     preProcessEventSignal_(),
00528     postProcessEventSignal_(),
00529     actReg_(),
00530     preg_(),
00531     serviceToken_(),
00532     input_(),
00533     espController_(new eventsetup::EventSetupsController),
00534     esp_(),
00535     act_table_(),
00536     processConfiguration_(),
00537     schedule_(),
00538     subProcess_(),
00539     historyAppender_(new HistoryAppender),
00540     state_(sInit),
00541     event_loop_(),
00542     state_lock_(),
00543     stop_lock_(),
00544     stopper_(),
00545     starter_(),
00546     stop_count_(-1),
00547     last_rc_(epSuccess),
00548     last_error_text_(),
00549     id_set_(false),
00550     event_loop_id_(),
00551     my_sig_num_(getSigNum()),
00552     fb_(),
00553     looper_(),
00554     machine_(),
00555     principalCache_(),
00556     shouldWeStop_(false),
00557     stateMachineWasInErrorState_(false),
00558     fileMode_(),
00559     emptyRunLumiMode_(),
00560     exceptionMessageFiles_(),
00561     exceptionMessageRuns_(),
00562     exceptionMessageLumis_(),
00563     alreadyHandlingException_(false),
00564     forceLooperToEnd_(false),
00565     looperBeginJobRun_(false),
00566     forceESCacheClearOnNewRun_(false),
00567     numberOfForkedChildren_(0),
00568     numberOfSequentialEventsPerChild_(1),
00569     setCpuAffinity_(false),
00570     eventSetupDataToExcludeFromPrefetching_() {
00571     if(isPython) {
00572       boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00573       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00574       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00575     }
00576     else {
00577       boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00578       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00579     }
00580   }
00581 
00582   void
00583   EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00584                         ServiceToken const& iToken,
00585                         serviceregistry::ServiceLegacy iLegacy) {
00586 
00587     //std::cerr << processDesc->dump() << std::endl;
00588     // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
00589     //  They must be cleared here because some processes run multiple EventProcessors in succession.
00590     BranchIDListHelper::clearRegistries();
00591 
00592     boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00593     //std::cerr << parameterSet->dump() << std::endl;
00594 
00595     // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
00596     boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00597 
00598     // Now set some parameters specific to the main process.
00599     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00600     fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00601     emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00602     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00603     ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00604     numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00605     numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00606     setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00607     std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00608     for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00609         itPS != itPSEnd;
00610         ++itPS) {
00611       eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00612                                                 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00613                                                                itPS->getUntrackedParameter<std::string>("label", "")));
00614     }
00615     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00616 
00617     // Now do general initialization
00618     ScheduleItems items;
00619 
00620     //initialize the services
00621     boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00622     ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00623     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00624 
00625     //make the services available
00626     ServiceRegistry::Operate operate(serviceToken_);
00627 
00628     // intialize miscellaneous items
00629     boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00630 
00631     // intialize the event setup provider
00632     esp_ = espController_->makeProvider(*parameterSet);
00633 
00634     // initialize the looper, if any
00635     looper_ = fillLooper(*espController_, *esp_, *parameterSet);
00636     if(looper_) {
00637       looper_->setActionTable(items.act_table_.get());
00638       looper_->attachTo(*items.actReg_);
00639     }
00640 
00641     // initialize the input source
00642     input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00643 
00644     // intialize the Schedule
00645     schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00646 
00647     // set the data members
00648     act_table_ = items.act_table_;
00649     actReg_ = items.actReg_;
00650     preg_ = items.preg_;
00651     processConfiguration_ = items.processConfiguration_;
00652 
00653     FDEBUG(2) << parameterSet << std::endl;
00654     connectSigs(this);
00655 
00656     // initialize the subprocess, if there is one
00657     if(subProcessParameterSet) {
00658       subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
00659     }
00660     espController_->clearComponents();
00661   }
00662 
00663   EventProcessor::~EventProcessor() {
00664     // Make the services available while everything is being deleted.
00665     ServiceToken token = getToken();
00666     ServiceRegistry::Operate op(token);
00667 
00668     // The state machine should have already been cleaned up
00669     // and destroyed at this point by a call to EndJob or
00670     // earlier when it completed processing events, but if it
00671     // has not been we'll take care of it here at the last moment.
00672     // This could cause problems if we are already handling an
00673     // exception and another one is thrown here ...  For a critical
00674     // executable the solution to this problem is for the code using
00675     // the EventProcessor to explicitly call EndJob or use runToCompletion,
00676     // then the next line of code is never executed.
00677     terminateMachine();
00678 
00679     try {
00680       changeState(mDtor);
00681     }
00682     catch(cms::Exception& e) {
00683       LogError("System")
00684         << e.explainSelf() << "\n";
00685     }
00686 
00687     // manually destroy all these thing that may need the services around
00688     espController_.reset();
00689     subProcess_.reset();
00690     esp_.reset();
00691     schedule_.reset();
00692     input_.reset();
00693     looper_.reset();
00694     actReg_.reset();
00695 
00696     pset::Registry* psetRegistry = pset::Registry::instance();
00697     psetRegistry->data().clear();
00698     psetRegistry->extra().setID(ParameterSetID());
00699 
00700     EntryDescriptionRegistry::instance()->data().clear();
00701     ParentageRegistry::instance()->data().clear();
00702     ProcessConfigurationRegistry::instance()->data().clear();
00703     ProcessHistoryRegistry::instance()->data().clear();
00704     BranchIDListHelper::clearRegistries();
00705   }
00706 
00707   void
00708   EventProcessor::rewind() {
00709     beginJob(); //make sure this was called
00710     changeState(mStopAsync);
00711     changeState(mInputRewind);
00712     {
00713       StateSentry toerror(this);
00714 
00715       //make the services available
00716       ServiceRegistry::Operate operate(serviceToken_);
00717 
00718       {
00719         input_->repeat();
00720         input_->rewind();
00721       }
00722       changeState(mCountComplete);
00723       toerror.succeeded();
00724     }
00725     changeState(mFinished);
00726   }
00727 
00728   EventProcessor::StatusCode
00729   EventProcessor::run(int numberEventsToProcess, bool) {
00730     return runEventCount(numberEventsToProcess);
00731   }
00732 
00733   EventProcessor::StatusCode
00734   EventProcessor::skip(int numberToSkip) {
00735     beginJob(); //make sure this was called
00736     changeState(mSkip);
00737     {
00738       StateSentry toerror(this);
00739 
00740       //make the services available
00741       ServiceRegistry::Operate operate(serviceToken_);
00742 
00743       {
00744         input_->skipEvents(numberToSkip);
00745       }
00746       changeState(mCountComplete);
00747       toerror.succeeded();
00748     }
00749     changeState(mFinished);
00750     return epSuccess;
00751   }
00752 
00753   void
00754   EventProcessor::beginJob() {
00755     if(state_ != sInit) return;
00756     bk::beginJob();
00757     // can only be run if in the initial state
00758     changeState(mBeginJob);
00759 
00760     // StateSentry toerror(this); // should we add this ?
00761     //make the services available
00762     ServiceRegistry::Operate operate(serviceToken_);
00763 
00764     //NOTE:  This implementation assumes 'Job' means one call
00765     // the EventProcessor::run
00766     // If it really means once per 'application' then this code will
00767     // have to be changed.
00768     // Also have to deal with case where have 'run' then new Module
00769     // added and do 'run'
00770     // again.  In that case the newly added Module needs its 'beginJob'
00771     // to be called.
00772 
00773     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
00774     //  For now we delay calling beginOfJob until first beginOfRun
00775     //if(looper_) {
00776     //   looper_->beginOfJob(es);
00777     //}
00778     try {
00779       try {
00780         input_->doBeginJob();
00781       }
00782       catch (cms::Exception& e) { throw; }
00783       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00784       catch (std::exception& e) { convertException::stdToEDM(e); }
00785       catch(std::string& s) { convertException::stringToEDM(s); }
00786       catch(char const* c) { convertException::charPtrToEDM(c); }
00787       catch (...) { convertException::unknownToEDM(); }
00788     }
00789     catch(cms::Exception& ex) {
00790       ex.addContext("Calling beginJob for the source");
00791       throw;
00792     }
00793     schedule_->beginJob();
00794     // toerror.succeeded(); // should we add this?
00795     if(hasSubProcess()) subProcess_->doBeginJob();
00796     actReg_->postBeginJobSignal_();
00797   }
00798 
00799   void
00800   EventProcessor::endJob() {
00801     // Collects exceptions, so we don't throw before all operations are performed.
00802     ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00803 
00804     // only allowed to run if state is sIdle, sJobReady, sRunGiven
00805     c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00806 
00807     //make the services available
00808     ServiceRegistry::Operate operate(serviceToken_);
00809 
00810     c.call(boost::bind(&EventProcessor::terminateMachine, this));
00811     schedule_->endJob(c);
00812     if(hasSubProcess()) {
00813       c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00814     }
00815     c.call(boost::bind(&InputSource::doEndJob, input_));
00816     if(looper_) {
00817       c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00818     }
00819     c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00820     if(c.hasThrown()) {
00821       c.rethrow();
00822     }
00823   }
00824 
00825   ServiceToken
00826   EventProcessor::getToken() {
00827     return serviceToken_;
00828   }
00829 
00830   //Setup signal handler to listen for when forked children stop
00831   namespace {
00832     //These are volatile since the compiler can not be allowed to optimize them
00833     // since they can be modified in the signaller handler
00834     volatile bool child_failed = false;
00835     volatile unsigned int num_children_done = 0;
00836     volatile int child_fail_exit_status = 0;
00837     volatile int child_fail_signal = 0;
00838 
00839     extern "C" {
00840       void ep_sigchld(int, siginfo_t*, void*) {
00841         //printf("in sigchld\n");
00842         //FDEBUG(1) << "in sigchld handler\n";
00843         int stat_loc;
00844         pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00845         while(0<p) {
00846           //printf("  looping\n");
00847           if(WIFEXITED(stat_loc)) {
00848             ++num_children_done;
00849             if(0 != WEXITSTATUS(stat_loc)) {
00850               child_fail_exit_status = WEXITSTATUS(stat_loc);
00851               child_failed = true;
00852             }
00853           }
00854           if(WIFSIGNALED(stat_loc)) {
00855             ++num_children_done;
00856             child_fail_signal = WTERMSIG(stat_loc);
00857             child_failed = true;
00858           }
00859           p = waitpid(-1, &stat_loc, WNOHANG);
00860         }
00861       }
00862     }
00863 
00864   }
00865 
00866   enum {
00867     kChildSucceed,
00868     kChildExitBadly,
00869     kChildSegv,
00870     kMaxChildAction
00871   };
00872 
00873   namespace {
00874     unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00875       unsigned int n = 0;
00876       while(numberOfChildren != 0) {
00877         ++n;
00878         numberOfChildren /= 10;
00879       }
00880       if(n == 0) {
00881         n = 3; // Protect against zero numberOfChildren
00882       }
00883       return n;
00884     }
00885     
00886     /*This class embodied the thread which is used to listen to the forked children and
00887      then tell them which events they should process */
00888     class MessageSenderToSource {
00889     public:
00890       MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00891       void operator()();
00892 
00893     private:
00894       const std::vector<int>& m_childrenPipes;
00895       long const m_nEventsToProcess;
00896       fd_set m_socketSet;
00897       unsigned int m_aliveChildren;
00898       int m_maxFd;
00899     };
00900     
00901     MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00902                                                  std::vector<int> const& childrenPipes,
00903                                                  long iNEventsToProcess):
00904     m_childrenPipes(childrenPipes),
00905     m_nEventsToProcess(iNEventsToProcess),
00906     m_aliveChildren(childrenSockets.size()),
00907     m_maxFd(0)
00908     {
00909       FD_ZERO(&m_socketSet);
00910       for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00911            it != itEnd; it++) {
00912         FD_SET(*it, &m_socketSet);
00913         if (*it > m_maxFd) {
00914           m_maxFd = *it;
00915         }
00916       }
00917       for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00918            it != itEnd; ++it) {
00919         FD_SET(*it, &m_socketSet);
00920         if (*it > m_maxFd) {
00921           m_maxFd = *it;
00922         }
00923       }
00924       m_maxFd++; // select reads [0,m_maxFd).
00925     }
00926    
00927     /* This function is the heart of the communication between parent and child.
00928      * When ready for more data, the child (see MessageReceiverForSource) requests
00929      * data through a AF_UNIX socket message.  The parent will then assign the next
00930      * chunk of data by sending a message back.
00931      *
00932      * Additionally, this function also monitors the read-side of the pipe fd from the child.
00933      * If the child dies unexpectedly, the pipe will be selected as ready for read and
00934      * will return EPIPE when read from.  Further, if the child thinks the parent has died
00935      * (defined as waiting more than 1s for a response), it will write a single byte to
00936      * the pipe.  If the parent has died, the child will get a EPIPE and throw an exception.
00937      * If still alive, the parent will read the byte and ignore it.
00938      *
00939      * Note this function is complemented by the SIGCHLD handler above as currently only the SIGCHLD
00940      * handler can distinguish between success and failure cases.
00941      *
00942      * The loop runs until m_aliveChildren reaches zero.  That counter is only decremented
00943      * when a watchdog pipe reports an error or can no longer be read; a failing work
00944      * socket is closed and dropped from the set without touching the counter.
00945      */
00942  
00943     void
00944     MessageSenderToSource::operator()() {
00945       multicore::MessageForParent childMsg;
00946       LogInfo("ForkingController") << "I am controller";
00947       //this is the master and therefore the controller
00948       
            // The same message object is reused for every reply: the chunk length stays fixed
            // and only startIndex advances after each successful send.
00949       multicore::MessageForSource sndmsg;
00950       sndmsg.startIndex = 0;
00951       sndmsg.nIndices = m_nEventsToProcess;
00952       do {
00953         
00954         fd_set readSockets, errorSockets;
00955         // Wait for a request from a child for events.
              // select() overwrites the sets it is given, so work on copies of the master set.
00956         memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00957         memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00958         // Note that we don't timeout; may be reconsidered in the future.
00959         ssize_t rc;
              // Retry when the call was merely interrupted by a signal.
00960         while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00961         if (rc < 0) {
00962           std::cerr << "select failed; should be impossible due to preconditions.\n";
00963           abort();
                // NOTE(review): abort() does not return, so this break is never reached.
00964           break;
00965         }
00966 
00967         // Read the message from the child.
              // Every descriptor number below m_maxFd is checked, not only the registered ones;
              // FD_ISSET filters out the ones that are not ready.
00968         for (int idx=0; idx<m_maxFd; idx++) {
00969 
00970           // Handle errors
00971           if (FD_ISSET(idx, &errorSockets)) {
00972             LogInfo("ForkingController") << "Error on socket " << idx;
00973             FD_CLR(idx, &m_socketSet);
00974             close(idx);
00975             // See if it was the watchdog pipe that died.
00976             for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00977               if (*it == idx) {
00978                 m_aliveChildren--;
00979               }
00980             }
00981             continue;
00982           }
00983           
00984           if (!FD_ISSET(idx, &readSockets)) {
00985             continue;
00986           }
00987 
00988           // See if this FD is a child watchdog pipe.  If so, read from it to prevent
00989           // writes from blocking.
00990           bool is_pipe = false;
00991           for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00992               if (*it == idx) {
00993                 is_pipe = true;
00994                 char buf;
00995                 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
                      // A read error or end-of-file (rc == 0) is taken to mean the child is gone.
00996                 if (rc <= 0) {
00997                   m_aliveChildren--;
00998                   FD_CLR(idx, &m_socketSet);
00999                   close(idx);
01000                 }
01001               }
01002           }
01003 
01004           // Only execute this block if the FD is a socket for sending the child work.
01005           if (!is_pipe) {
                  // The content of the request is not inspected below; receiving it is what
                  // triggers the reply.
                  // NOTE(review): only rc < 0 is treated as failure; a return of 0 still
                  // leads to a reply being sent -- confirm this is intended.
01006             while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
01007             if (rc < 0) {
01008               FD_CLR(idx, &m_socketSet);
01009               close(idx);
01010               continue;
01011             }
01012           
01013             // Tell the child what events to process.
01014             // If 'send' fails, then the child process has failed (any other possibilities are
01015             // eliminated because we are using fixed-size messages with Unix datagram sockets).
01016             // Thus, the SIGCHLD handler will fire and set child_failed = true.
01017             while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
01018             if (rc < 0) {
01019               FD_CLR(idx, &m_socketSet);
01020               close(idx);
01021               continue;
01022             }
01023             //std::cout << "Sent chunk starting at " << sndmsg.startIndex << " to child, length " << sndmsg.nIndices << std::endl;
                  // The next requester gets the chunk that follows the one just handed out.
01024             sndmsg.startIndex += sndmsg.nIndices;
01025           }
01026         }
01027       
01028       } while (m_aliveChildren > 0);
01029       
01030       return;
01031     }
01032 
01033   }
01034 
        // Forks numberOfForkedChildren_ worker processes and, in the parent, supervises them
        // until they have all stopped.
        //
        // jobReportFile is handed to the JobReport service's parentBeforeFork / childAfterFork /
        // parentAfterFork hooks.
        //
        // Returns true right away when numberOfForkedChildren_ is 0, and returns true in each
        // forked child once its post-fork resources have been reacquired.  In the parent the
        // call returns false after the children are done and the sender thread has been joined.
        //
        // Throws (parent only) cms::Exception "ForkedChildFailed" when a child exited with a
        // non-zero status or was killed by a signal, and "ForkedParentFailed" when the
        // FD_SETSIZE check below failed.  Failures while creating sockets/pipes or forking
        // terminate the process through exit().
01035   bool
01036   EventProcessor::forkProcess(std::string const& jobReportFile) {
01037 
01038     if(0 == numberOfForkedChildren_) {return true;}
01039     assert(0<numberOfForkedChildren_);
01040     //do what we want done in common
          // Everything in this scope runs once, in the parent, before any fork: the first file
          // is opened and the EventSetup data for the first run is fetched so that the
          // children start with it already loaded.
01041     {
01042       beginJob(); //make sure this was run
01043       // make the services available
01044       ServiceRegistry::Operate operate(serviceToken_);
01045 
01046       InputSource::ItemType itemType;
01047       itemType = input_->nextItemType();
01048 
            // The source is required to deliver a file first and then a run.
01049       assert(itemType == InputSource::IsFile);
01050       {
01051         readFile();
01052       }
01053       itemType = input_->nextItemType();
01054       assert(itemType == InputSource::IsRun);
01055 
01056       LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01057       IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01058                       input_->runAuxiliary()->beginTime());
01059       espController_->eventSetupForInstance(ts);
01060       EventSetup const& es = esp_->eventSetup();
01061 
01062       //now get all the data available in the EventSetup
01063       std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01064       es.fillAvailableRecordKeys(recordKeys);
01065       std::vector<eventsetup::DataKey> dataKeys;
01066       for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01067           itKey != itEnd;
01068           ++itKey) {
01069         eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01070         //see if this is on our exclusion list
01071         ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01072         ExcludedData const* excludedData(0);
01073         if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01074           excludedData = &(itExcludeRec->second);
                // An empty exclusion entry, or one whose first element is "*", excludes the
                // whole record.
01075           if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01076             //skip all items in this record
01077             continue;
01078           }
01079         }
01080         if(0 != recordPtr) {
01081           dataKeys.clear();
01082           recordPtr->fillRegisteredDataKeys(dataKeys);
01083           for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01084               itDataKey != itDataKeyEnd;
01085               ++itDataKey) {
01086             //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01087             if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01088               LogInfo("ForkingEventSetupPreFetching") << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01089               continue;
01090             }
                  // Prefetching is best effort: a cms::Exception is logged as a warning and
                  // the loop moves on to the next item.
01091             try {
01092               recordPtr->doGet(*itDataKey);
01093             } catch(cms::Exception& e) {
01094              LogWarning("ForkingEventSetupPreFetching") << e.what();
01095             }
01096           }
01097         }
01098       }
01099     }
01100     LogSystem("ForkingEventSetupPreFetching") <<"  done prefetching";
01101     {
01102       // make the services available
01103       ServiceRegistry::Operate operate(serviceToken_);
01104       Service<JobReport> jobReport;
01105       jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01106 
01107       //Now actually do the forking
01108       actReg_->preForkReleaseResourcesSignal_();
01109       input_->doPreForkReleaseResources();
01110       schedule_->preForkReleaseResources();
01111     }
          // The SIGCHLD handler is installed before the first fork so that no child exit
          // can be missed.
01112     installCustomHandler(SIGCHLD, ep_sigchld);
01113 
01114 
01115     unsigned int childIndex = 0;
01116     unsigned int const kMaxChildren = numberOfForkedChildren_;
01117     unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01118     std::vector<pid_t> childrenIds;
01119     childrenIds.reserve(kMaxChildren);
01120     std::vector<int> childrenSockets;
01121     childrenSockets.reserve(kMaxChildren);
01122     std::vector<int> childrenPipes;
01123     childrenPipes.reserve(kMaxChildren);
01124     std::vector<int> childrenSocketsCopy;
01125     childrenSocketsCopy.reserve(kMaxChildren);
01126     std::vector<int> childrenPipesCopy;
01127     childrenPipesCopy.reserve(kMaxChildren);
01128     int pipes[2];
01129 
01130     {
01131       // make the services available
01132       ServiceRegistry::Operate operate(serviceToken_);
01133       Service<JobReport> jobReport;
01134       int sockets[2], fd_flags;
            // One iteration per child.  A child leaves the loop through 'break' with
            // childIndex < kMaxChildren; the parent falls out with childIndex == kMaxChildren,
            // which is how the two are told apart after the loop.
01135       for(; childIndex < kMaxChildren; ++childIndex) {
01136         // Create a UNIX_DGRAM socket pair
01137         if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01138           printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01139           exit(EXIT_FAILURE);
01140         }
01141         if (pipe(pipes)) {
01142           printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01143           exit(EXIT_FAILURE);
01144         }
01145         // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
01146         if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01147           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01148           exit(EXIT_FAILURE);
01149         }
01150         // Mark socket as non-block.  Child must be careful to do select prior
01151         // to reading from socket.
              // NOTE(review): O_NONBLOCK is a file status flag, which POSIX sets with F_SETFL;
              // F_SETFD only manages descriptor flags such as FD_CLOEXEC.  As written the
              // socket most likely stays blocking.  Confirm against the receiver code before
              // changing this, since switching to F_SETFL alters the I/O behaviour.
01152         if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01153           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01154           exit(EXIT_FAILURE);
01155         }
01156         if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01157           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01158           exit(EXIT_FAILURE);
01159         }
01160         if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01161           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01162           exit(EXIT_FAILURE);
01163         }
01164         // Linux man page notes there are some edge cases where reading from a
01165         // fd can block, even after a select.
              // NOTE(review): same concern as for the socket above -- O_NONBLOCK passed to
              // F_SETFD probably leaves this pipe end blocking.
01166         if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01167           printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01168           exit(EXIT_FAILURE);
01169         }
01170         if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01171           printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01172           exit(EXIT_FAILURE);
01173         }
01174 
              // Snapshot of the parent-side descriptors of the children forked so far; the
              // new child closes its inherited copies of them below.
01175         childrenPipesCopy = childrenPipes;
01176         childrenSocketsCopy = childrenSockets;
01177 
01178         pid_t value = fork();
01179         if(value == 0) {
01180           // Close the parent's side of the socket and pipe which will talk to us.
01181           close(pipes[0]);
01182           close(sockets[0]);
01183           // Close our copies of the parent's other communication pipes.
01184           for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01185             close(*it);
01186           }
01187           for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01188             close(*it);
01189           }
01190 
01191           // this is the child process, redirect stdout and stderr to a log file
01192           fflush(stdout);
01193           fflush(stderr);
                // Log file name: redirectout_<process group id>_<zero-padded child index>.log
01194           std::stringstream stout;
01195           stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01196           if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01197             LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01198           }
01199           if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01200             LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01201           }
01202 
01203           LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01204           if(setCpuAffinity_) {
01205             // CPU affinity is handled differently on macosx.
01206             // We disable it and print a message until someone reads:
01207             //
01208             // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
01209             //
01210             // and implements it.
01211 #ifdef __APPLE__
01212             LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01213 #else
01214             LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
                  // The child index is used directly as the CPU number.
01215             cpu_set_t mask;
01216             CPU_ZERO(&mask);
01217             CPU_SET(childIndex, &mask);
01218             if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01219               LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01220               exit(-1);
01221             }
01222 #endif
01223           }
01224           break;
01225         } else {
01226           //this is the parent
                // This branch is also taken when fork() failed (value < 0); that case is
                // handled right after the descriptors have been closed.
01227           close(pipes[1]);
01228           close(sockets[1]);
01229         }
01230         if(value < 0) {
01231           LogError("ForkingChild") << "failed to create a child";
01232           exit(-1);
01233         }
01234         childrenIds.push_back(value);
01235         childrenSockets.push_back(sockets[0]);
01236         childrenPipes.push_back(pipes[0]);
01237       }
01238 
            // Only a child can get here with childIndex < kMaxChildren (it left the loop
            // through 'break').
01239       if(childIndex < kMaxChildren) {
01240         jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01241         actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01242 
01243         boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01244         input_->doPostForkReacquireResources(receiver);
01245         schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01246         //NOTE: sources have to reset themselves by listening to the post fork message
01247         //rewindInput();
01248         return true;
01249       }
01250       jobReport->parentAfterFork(jobReportFile);
01251     }
01252 
01253     //this is the original, which is now the master for all the children
01254 
01255     //Need to wait for signals from the children or externally
01256     // To wait we must
01257     // 1) block the signals we want to wait on so we do not have a race condition
01258     // 2) check that we haven't already meet our ending criteria
01259     // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
01260     sigset_t blockingSigSet;
01261     sigset_t unblockingSigSet;
01262     sigset_t oldSigSet;
          // Both sets start from the current mask (a NULL new-set argument only queries it);
          // SIGCHLD, SIGUSR2 and SIGINT are then added to one and removed from the other.
01263     pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01264     pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01265     sigaddset(&blockingSigSet, SIGCHLD);
01266     sigaddset(&blockingSigSet, SIGUSR2);
01267     sigaddset(&blockingSigSet, SIGINT);
01268     sigdelset(&unblockingSigSet, SIGCHLD);
01269     sigdelset(&unblockingSigSet, SIGUSR2);
01270     sigdelset(&unblockingSigSet, SIGINT);
01271     pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01272 
01273     // If there are too many fd's (unlikely, but possible) for select, denote this 
01274     // because the sender will fail.
          // NOTE(review): pipes[1] is the write end of the last pipe created, which the parent
          // has already closed; the descriptors that actually go into the select() set are the
          // ones stored in childrenSockets/childrenPipes.  Also note that the sender below is
          // still constructed and started when too_many_fds is true -- confirm this is safe.
01275     bool too_many_fds = false;
01276     if (pipes[1]+1 > FD_SETSIZE) {
01277       LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01278       too_many_fds = true;
01279     }
01280 
01281     //create a thread that sends the units of work to workers
01282     // we create it after all signals were blocked so that this
01283     // thread is never interupted by a signal
01284     MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01285     boost::thread senderThread(sender);
01286 
          // Sleep until a shutdown is requested, a child fails, or every child has been reaped.
          // (The log text says "sigwait" although sigsuspend is what is called.)
01287     while(!too_many_fds && !shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01288       sigsuspend(&unblockingSigSet);
01289       LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01290     }
01291     pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01292 
01293     LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01294     if(child_failed) {
01295       LogError("ForkingStopping") << "child failed";
01296     }
01297     if(shutdown_flag) {
01298       LogSystem("ForkingStopping") << "asked to shutdown";
01299     }
01300 
          // If the wait ended early, ask every child to stop via SIGUSR2 and wait until all of
          // them have been reaped by the SIGCHLD handler.
01301     if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01302       LogInfo("ForkingStopping") << "must stop children" << std::endl;
01303       for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01304           it != itEnd; ++it) {
01305         /* int result = */ kill(*it, SIGUSR2);
01306       }
01307       pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01308       while(num_children_done != kMaxChildren) {
01309         sigsuspend(&unblockingSigSet);
01310       }
01311       pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01312     }
01313     // The senderThread will notice the pipes die off, one by one.  Once all children are gone, it will exit.
01314     senderThread.join();
          // Report a child failure first; a terminating signal takes precedence over an
          // exit code in the message.
01315     if(child_failed) {
01316       if (child_fail_signal) {
01317         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01318       } else if (child_fail_exit_status) {
01319         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01320       } else {
01321         throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01322       }
01323     }
01324     if(too_many_fds) {
01325       throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01326     }
01327     return false;
01328   }
01329 
01330   void
01331   EventProcessor::connectSigs(EventProcessor* ep) {
01332     // When the FwkImpl signals are given, pass them to the
01333     // appropriate EventProcessor signals so that the outside world
01334     // can see the signal.
01335     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01336     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01337   }
01338 
01339   std::vector<ModuleDescription const*>
01340   EventProcessor::getAllModuleDescriptions() const {
01341     return schedule_->getAllModuleDescriptions();
01342   }
01343 
01344   int
01345   EventProcessor::totalEvents() const {
01346     return schedule_->totalEvents();
01347   }
01348 
01349   int
01350   EventProcessor::totalEventsPassed() const {
01351     return schedule_->totalEventsPassed();
01352   }
01353 
01354   int
01355   EventProcessor::totalEventsFailed() const {
01356     return schedule_->totalEventsFailed();
01357   }
01358 
01359   void
01360   EventProcessor::enableEndPaths(bool active) {
01361     schedule_->enableEndPaths(active);
01362   }
01363 
01364   bool
01365   EventProcessor::endPathsEnabled() const {
01366     return schedule_->endPathsEnabled();
01367   }
01368 
01369   void
01370   EventProcessor::getTriggerReport(TriggerReport& rep) const {
01371     schedule_->getTriggerReport(rep);
01372   }
01373 
01374   void
01375   EventProcessor::clearCounters() {
01376     schedule_->clearCounters();
01377   }
01378 
01379   char const* EventProcessor::currentStateName() const {
01380     return stateName(getState());
01381   }
01382 
01383   char const* EventProcessor::stateName(State s) const {
01384     return stateNames[s];
01385   }
01386 
01387   char const* EventProcessor::msgName(Msg m) const {
01388     return msgNames[m];
01389   }
01390 
01391   State EventProcessor::getState() const {
01392     return state_;
01393   }
01394 
01395   EventProcessor::StatusCode EventProcessor::statusAsync() const {
01396     // the thread will record exception/error status in the event processor
01397     // for us to look at and report here
01398     return last_rc_;
01399   }
01400 
01401   void
01402   EventProcessor::setRunNumber(RunNumber_t runNumber) {
01403     if(runNumber == 0) {
01404       runNumber = 1;
01405       LogWarning("Invalid Run")
01406         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01407         << "Run number was set to 1 instead\n";
01408     }
01409 
01410     // inside of beginJob there is a check to see if it has been called before
01411     beginJob();
01412     changeState(mSetRun);
01413 
01414     // interface not correct yet
01415     input_->setRunNumber(runNumber);
01416   }
01417 
01418   void
01419   EventProcessor::declareRunNumber(RunNumber_t /*runNumber*/) {
01420     // inside of beginJob there is a check to see if it has been called before
01421     beginJob();
01422     changeState(mSetRun);
01423 
01424     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
01425     //input_->declareRunNumber(runNumber);
01426   }
01427 
01428   EventProcessor::StatusCode
01429   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01430     bool rc = true;
01431     boost::xtime timeout;
01432     boost::xtime_get(&timeout, boost::TIME_UTC);
01433     timeout.sec += timeout_seconds;
01434 
01435     // make sure to include a timeout here so we don't wait forever
01436     // I suspect there are still timing issues with thread startup
01437     // and the setting of the various control variables (stop_count, id_set)
01438     {
01439       boost::mutex::scoped_lock sl(stop_lock_);
01440 
01441       // look here - if runAsync not active, just return the last return code
01442       if(stop_count_ < 0) return last_rc_;
01443 
01444       if(timeout_seconds == 0) {
01445         while(stop_count_ == 0) stopper_.wait(sl);
01446       } else {
01447         while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01448       }
01449 
01450       if(rc == false) {
01451           // timeout occurred
01452           // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
01453           // this is a temporary hack until we get the input source
01454           // upgraded to allow blocking input sources to be unblocked
01455 
01456           // the next line is dangerous and causes all sorts of trouble
01457           if(id_set_) pthread_cancel(event_loop_id_);
01458 
01459           // we will not do anything yet
01460           LogWarning("timeout")
01461             << "An asynchronous request was made to shut down "
01462             << "the event loop "
01463             << "and the event loop did not shutdown after "
01464             << timeout_seconds << " seconds\n";
01465       } else {
01466           event_loop_->join();
01467           event_loop_.reset();
01468           id_set_ = false;
01469           stop_count_ = -1;
01470       }
01471     }
01472     return rc == false ? epTimedOut : last_rc_;
01473   }
01474 
01475   EventProcessor::StatusCode
01476   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01477     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01478     if(rc != epTimedOut) changeState(mCountComplete);
01479     else errorState();
01480     return rc;
01481   }
01482 
01483 
01484   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01485     changeState(mStopAsync);
01486     StatusCode rc = waitForAsyncCompletion(secs);
01487     if(rc != epTimedOut) changeState(mFinished);
01488     else errorState();
01489     return rc;
01490   }
01491 
01492   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01493     changeState(mShutdownAsync);
01494     StatusCode rc = waitForAsyncCompletion(secs);
01495     if(rc != epTimedOut) changeState(mFinished);
01496     else errorState();
01497     return rc;
01498   }
01499 
01500   void EventProcessor::errorState() {
01501     state_ = sError;
01502   }
01503 
01504   // next function irrelevant now
01505   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01506     // make sure to include a timeout here so we don't wait forever
01507     // I suspect there are still timing issues with thread startup
01508     // and the setting of the various control variables (stop_count, id_set)
01509     changeState(m);
01510     return waitForAsyncCompletion(60*2);
01511   }
01512 
01513   void EventProcessor::changeState(Msg msg) {
01514     // most likely need to serialize access to this routine
01515 
01516     boost::mutex::scoped_lock sl(state_lock_);
01517     State curr = state_;
01518     int rc;
01519     // found if(not end of table) and
01520     // (state == table.state && (msg == table.message || msg == any))
01521     for(rc = 0;
01522         table[rc].current != sInvalid &&
01523           (curr != table[rc].current ||
01524            (curr == table[rc].current &&
01525              msg != table[rc].message && table[rc].message != mAny));
01526         ++rc);
01527 
01528     if(table[rc].current == sInvalid)
01529       throw cms::Exception("BadState")
01530         << "A member function of EventProcessor has been called in an"
01531         << " inappropriate order.\n"
01532         << "Bad transition from " << stateName(curr) << " "
01533         << "using message " << msgName(msg) << "\n"
01534         << "No where to go from here.\n";
01535 
01536     FDEBUG(1) << "changeState: current=" << stateName(curr)
01537               << ", message=" << msgName(msg)
01538               << " -> new=" << stateName(table[rc].final) << "\n";
01539 
01540     state_ = table[rc].final;
01541   }
01542 
01543   void EventProcessor::runAsync() {
01544     beginJob();
01545     {
01546       boost::mutex::scoped_lock sl(stop_lock_);
01547       if(id_set_ == true) {
01548           std::string err("runAsync called while async event loop already running\n");
01549           LogError("FwkJob") << err;
01550           throw cms::Exception("BadState") << err;
01551       }
01552 
01553       changeState(mRunAsync);
01554 
01555       stop_count_ = 0;
01556       last_rc_ = epSuccess; // forget the last value!
01557       event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01558       boost::xtime timeout;
01559       boost::xtime_get(&timeout, boost::TIME_UTC);
01560       timeout.sec += 60; // 60 seconds to start!!!!
01561       if(starter_.timed_wait(sl, timeout) == false) {
01562           // yikes - the thread did not start
01563           throw cms::Exception("BadState")
01564             << "Async run thread did not start in 60 seconds\n";
01565       }
01566     }
01567   }
01568 
01569   void EventProcessor::asyncRun(EventProcessor* me) {
01570     // set up signals to allow for interruptions
01571     // ignore all other signals
01572     // make sure no exceptions escape out
01573 
01574     // temporary hack until we modify the input source to allow
01575     // wakeup calls from other threads.  This mimics the solution
01576     // in EventFilter/Processor, which I do not like.
01577     // allowing cancels means that the thread just disappears at
01578     // certain points.  This is bad for C++ stack variables.
01579     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01580     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
01581     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01582     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01583 
01584     {
01585       boost::mutex::scoped_lock(me->stop_lock_);
01586       me->event_loop_id_ = pthread_self();
01587       me->id_set_ = true;
01588       me->starter_.notify_all();
01589     }
01590 
01591     Status rc = epException;
01592     FDEBUG(2) << "asyncRun starting ......................\n";
01593 
01594     try {
01595       bool onlineStateTransitions = true;
01596       rc = me->runToCompletion(onlineStateTransitions);
01597     }
01598     catch (cms::Exception& e) {
01599       LogError("FwkJob") << "cms::Exception caught in "
01600                          << "EventProcessor::asyncRun"
01601                          << "\n"
01602                          << e.explainSelf();
01603       me->last_error_text_ = e.explainSelf();
01604     }
01605     catch (std::exception& e) {
01606       LogError("FwkJob") << "Standard library exception caught in "
01607                          << "EventProcessor::asyncRun"
01608                          << "\n"
01609                          << e.what();
01610       me->last_error_text_ = e.what();
01611     }
01612     catch (...) {
01613       LogError("FwkJob") << "Unknown exception caught in "
01614                          << "EventProcessor::asyncRun"
01615                          << "\n";
01616       me->last_error_text_ = "Unknown exception caught";
01617       rc = epOther;
01618     }
01619 
01620     me->last_rc_ = rc;
01621 
01622     {
01623       // notify anyone waiting for exit that we are doing so now
01624       boost::mutex::scoped_lock sl(me->stop_lock_);
01625       ++me->stop_count_;
01626       me->stopper_.notify_all();
01627     }
01628     FDEBUG(2) << "asyncRun ending ......................\n";
01629   }
01630 
01631 
01632   EventProcessor::StatusCode
01633   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01634 
01635     StateSentry toerror(this);
01636 
01637     int numberOfEventsToProcess = -1;
01638     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01639 
01640     if(machine_.get() != 0) {
01641       throw Exception(errors::LogicError)
01642         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01643         << "Please report this error to the Framework group\n";
01644     }
01645 
01646     toerror.succeeded();
01647 
01648     return returnCode;
01649   }
01650 
01651   EventProcessor::StatusCode
01652   EventProcessor::runEventCount(int numberOfEventsToProcess) {
01653 
01654     StateSentry toerror(this);
01655 
01656     bool onlineStateTransitions = false;
01657     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01658 
01659     toerror.succeeded();
01660 
01661     return returnCode;
01662   }
01663 
01664   EventProcessor::StatusCode
01665   EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01666 
01667     // Reusable event principal
01668     boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
01669     principalCache_.insert(ep);
01670 
01671     beginJob(); //make sure this was called
01672 
01673     if(!onlineStateTransitions) changeState(mRunCount);
01674 
01675     StatusCode returnCode = epSuccess;
01676     stateMachineWasInErrorState_ = false;
01677 
01678     // make the services available
01679     ServiceRegistry::Operate operate(serviceToken_);
01680 
01681     if(machine_.get() == 0) {
01682 
01683       statemachine::FileMode fileMode;
01684       if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01685       else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01686       else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01687       else {
01688          throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01689             << fileMode_ << ".\n"
01690             << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01691       }
01692 
01693       statemachine::EmptyRunLumiMode emptyRunLumiMode;
01694       if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01695       else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01696       else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01697       else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01698       else {
01699          throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01700             << emptyRunLumiMode_ << ".\n"
01701             << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01702       }
01703 
01704       machine_.reset(new statemachine::Machine(this,
01705                                                fileMode,
01706                                                emptyRunLumiMode));
01707 
01708       machine_->initiate();
01709     }
01710 
01711     try {
01712       try {
01713 
01714         InputSource::ItemType itemType;
01715 
01716         int iEvents = 0;
01717 
01718         while(true) {
01719 
01720           itemType = input_->nextItemType();
01721 
01722           FDEBUG(1) << "itemType = " << itemType << "\n";
01723 
01724           // These are used for asynchronous running only and
01725           // and are checking to see if stopAsync or shutdownAsync
01726           // were called from another thread.  In the future, we
01727           // may need to do something better than polling the state.
01728           // With the current code this is the simplest thing and
01729           // it should always work.  If the interaction between
01730           // threads becomes more complex this may cause problems.
01731           if(state_ == sStopping) {
01732             FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01733             forceLooperToEnd_ = true;
01734             machine_->process_event(statemachine::Stop());
01735             forceLooperToEnd_ = false;
01736             break;
01737           }
01738           else if(state_ == sShuttingDown) {
01739             FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01740             forceLooperToEnd_ = true;
01741             machine_->process_event(statemachine::Stop());
01742             forceLooperToEnd_ = false;
01743             break;
01744           }
01745 
01746           // Look for a shutdown signal
01747           {
01748             boost::mutex::scoped_lock sl(usr2_lock);
01749             if(shutdown_flag) {
01750               changeState(mShutdownSignal);
01751               returnCode = epSignal;
01752               forceLooperToEnd_ = true;
01753               machine_->process_event(statemachine::Stop());
01754               forceLooperToEnd_ = false;
01755               break;
01756             }
01757           }
01758 
01759           if(itemType == InputSource::IsStop) {
01760             machine_->process_event(statemachine::Stop());
01761           }
01762           else if(itemType == InputSource::IsFile) {
01763             machine_->process_event(statemachine::File());
01764           }
01765           else if(itemType == InputSource::IsRun) {
01766             machine_->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01767           }
01768           else if(itemType == InputSource::IsLumi) {
01769             machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01770           }
01771           else if(itemType == InputSource::IsEvent) {
01772             machine_->process_event(statemachine::Event());
01773             ++iEvents;
01774             if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01775               returnCode = epCountComplete;
01776               changeState(mInputExhausted);
01777               FDEBUG(1) << "Event count complete, pausing event loop\n";
01778               break;
01779             }
01780           }
01781           // This should be impossible
01782           else {
01783             throw Exception(errors::LogicError)
01784               << "Unknown next item type passed to EventProcessor\n"
01785               << "Please report this error to the Framework group\n";
01786           }
01787 
01788           if(machine_->terminated()) {
01789             changeState(mInputExhausted);
01790             break;
01791           }
01792         }  // End of loop over state machine events
01793       } // Try block
01794       catch (cms::Exception& e) { throw; }
01795       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01796       catch (std::exception& e) { convertException::stdToEDM(e); }
01797       catch(std::string& s) { convertException::stringToEDM(s); }
01798       catch(char const* c) { convertException::charPtrToEDM(c); }
01799       catch (...) { convertException::unknownToEDM(); }
01800     } // Try block
01801     // Some comments on exception handling related to the boost state machine:
01802     //
01803     // Some states used in the machine are special because they
01804     // perform actions while the machine is being terminated, actions
01805     // such as close files, call endRun, call endLumi etc ...  Each of these
01806     // states has two functions that perform these actions.  The functions
01807     // are almost identical.  The major difference is that one version
01808     // catches all exceptions and the other lets exceptions pass through.
01809     // The destructor catches them and the other function named "exit" lets
01810     // them pass through.  On a normal termination, boost will always call
01811     // "exit" and then the state destructor.  In our state classes, the
01812     // the destructors do nothing if the exit function already took
01813     // care of things.  Here's the interesting part.  When boost is
01814     // handling an exception the "exit" function is not called (a boost
01815     // feature).
01816     //
01817     // If an exception occurs while the boost machine is in control
01818     // (which usually means inside a process_event call), then
01819     // the boost state machine destroys its states and "terminates" itself.
01820     // This already done before we hit the catch blocks below. In this case
01821     // the call to terminateMachine below only destroys an already
01822     // terminated state machine.  Because exit is not called, the state destructors
01823     // handle cleaning up lumis, runs, and files.  The destructors swallow
01824     // all exceptions and only pass through the exceptions messages, which
01825     // are tacked onto the original exception below.
01826     //
01827     // If an exception occurs when the boost state machine is not
01828     // in control (outside the process_event functions), then boost
01829     // cannot destroy its own states.  The terminateMachine function
01830     // below takes care of that.  The flag "alreadyHandlingException"
01831     // is set true so that the state exit functions do nothing (and
01832     // cannot throw more exceptions while handling the first).  Then the
01833     // state destructors take care of this because exit did nothing.
01834     //
01835     // In both cases above, the EventProcessor::endOfLoop function is
01836     // not called because it can throw exceptions.
01837     //
01838     // One tricky aspect of the state machine is that things that can
01839     // throw should not be invoked by the state machine while another
01840     // exception is being handled.
01841     // Another tricky aspect is that it appears to be important to
01842     // terminate the state machine before invoking its destructor.
01843     // We've seen crashes that are not understood when that is not
01844     // done.  Maintainers of this code should be careful about this.
01845 
01846     catch (cms::Exception & e) {
01847       alreadyHandlingException_ = true;
01848       terminateMachine();
01849       alreadyHandlingException_ = false;
01850       if (!exceptionMessageLumis_.empty()) {
01851         e.addAdditionalInfo(exceptionMessageLumis_);
01852         if (e.alreadyPrinted()) {
01853           LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01854         }
01855       }
01856       if (!exceptionMessageRuns_.empty()) {
01857         e.addAdditionalInfo(exceptionMessageRuns_);
01858         if (e.alreadyPrinted()) {
01859           LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01860         }
01861       }
01862       if (!exceptionMessageFiles_.empty()) {
01863         e.addAdditionalInfo(exceptionMessageFiles_);
01864         if (e.alreadyPrinted()) {
01865           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01866         }
01867       }
01868       throw;
01869     }
01870 
01871     if(machine_->terminated()) {
01872       FDEBUG(1) << "The state machine reports it has been terminated\n";
01873       machine_.reset();
01874     }
01875 
01876     if(!onlineStateTransitions) changeState(mFinished);
01877 
01878     if(stateMachineWasInErrorState_) {
01879       throw cms::Exception("BadState")
01880         << "The boost state machine in the EventProcessor exited after\n"
01881         << "entering the Error state.\n";
01882     }
01883 
01884     return returnCode;
01885   }
01886 
01887   void EventProcessor::readFile() {
01888     FDEBUG(1) << " \treadFile\n";
01889     fb_ = input_->readFile();
01890     if(numberOfForkedChildren_ > 0) {
01891         fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01892     }
01893   }
01894 
01895   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
01896     if (fb_.get() != 0) {
01897       input_->closeFile(fb_, cleaningUpAfterException);
01898     }
01899     FDEBUG(1) << "\tcloseInputFile\n";
01900   }
01901 
01902   void EventProcessor::openOutputFiles() {
01903     if (fb_.get() != 0) {
01904       schedule_->openOutputFiles(*fb_);
01905       if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01906     }
01907     FDEBUG(1) << "\topenOutputFiles\n";
01908   }
01909 
01910   void EventProcessor::closeOutputFiles() {
01911     if (fb_.get() != 0) {
01912       schedule_->closeOutputFiles();
01913       if(hasSubProcess()) subProcess_->closeOutputFiles();
01914     }
01915     FDEBUG(1) << "\tcloseOutputFiles\n";
01916   }
01917 
01918   void EventProcessor::respondToOpenInputFile() {
01919     if (fb_.get() != 0) {
01920       schedule_->respondToOpenInputFile(*fb_);
01921       if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01922     }
01923     FDEBUG(1) << "\trespondToOpenInputFile\n";
01924   }
01925 
01926   void EventProcessor::respondToCloseInputFile() {
01927     if (fb_.get() != 0) {
01928       schedule_->respondToCloseInputFile(*fb_);
01929       if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01930     }
01931     FDEBUG(1) << "\trespondToCloseInputFile\n";
01932   }
01933 
01934   void EventProcessor::respondToOpenOutputFiles() {
01935     if (fb_.get() != 0) {
01936       schedule_->respondToOpenOutputFiles(*fb_);
01937       if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01938     }
01939     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01940   }
01941 
01942   void EventProcessor::respondToCloseOutputFiles() {
01943     if (fb_.get() != 0) {
01944       schedule_->respondToCloseOutputFiles(*fb_);
01945       if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01946     }
01947     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01948   }
01949 
01950   void EventProcessor::startingNewLoop() {
01951     shouldWeStop_ = false;
01952     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
01953     // until after we've called beginOfJob
01954     if(looper_ && looperBeginJobRun_) {
01955       looper_->doStartingNewLoop();
01956     }
01957     FDEBUG(1) << "\tstartingNewLoop\n";
01958   }
01959 
01960   bool EventProcessor::endOfLoop() {
01961     if(looper_) {
01962       ModuleChanger changer(schedule_.get());
01963       looper_->setModuleChanger(&changer);
01964       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01965       looper_->setModuleChanger(0);
01966       if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01967       else return false;
01968     }
01969     FDEBUG(1) << "\tendOfLoop\n";
01970     return true;
01971   }
01972 
01973   void EventProcessor::rewindInput() {
01974     input_->repeat();
01975     input_->rewind();
01976     FDEBUG(1) << "\trewind\n";
01977   }
01978 
01979   void EventProcessor::prepareForNextLoop() {
01980     looper_->prepareForNextLoop(esp_.get());
01981     FDEBUG(1) << "\tprepareForNextLoop\n";
01982   }
01983 
01984   bool EventProcessor::shouldWeCloseOutput() const {
01985     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01986     return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01987   }
01988 
01989   void EventProcessor::doErrorStuff() {
01990     FDEBUG(1) << "\tdoErrorStuff\n";
01991     LogError("StateMachine")
01992       << "The EventProcessor state machine encountered an unexpected event\n"
01993       << "and went to the error state\n"
01994       << "Will attempt to terminate processing normally\n"
01995       << "(IF using the looper the next loop will be attempted)\n"
01996       << "This likely indicates a bug in an input module or corrupted input or both\n";
01997     stateMachineWasInErrorState_ = true;
01998   }
01999 
02000   void EventProcessor::beginRun(statemachine::Run const& run) {
02001     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02002     input_->doBeginRun(runPrincipal);
02003     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
02004                     runPrincipal.beginTime());
02005     if(forceESCacheClearOnNewRun_){
02006       espController_->forceCacheClear();
02007     }
02008     espController_->eventSetupForInstance(ts);
02009     EventSetup const& es = esp_->eventSetup();
02010     if(looper_ && looperBeginJobRun_== false) {
02011       looper_->copyInfo(ScheduleInfo(schedule_.get()));
02012       looper_->beginOfJob(es);
02013       looperBeginJobRun_ = true;
02014       looper_->doStartingNewLoop();
02015     }
02016     {
02017       typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
02018       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02019       schedule_->processOneOccurrence<Traits>(runPrincipal, es);
02020       if(hasSubProcess()) {
02021         subProcess_->doBeginRun(runPrincipal, ts);
02022       }
02023     }
02024     FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
02025     if(looper_) {
02026       looper_->doBeginRun(runPrincipal, es);
02027     }
02028   }
02029 
02030   void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
02031     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02032     input_->doEndRun(runPrincipal, cleaningUpAfterException);
02033     IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
02034                     runPrincipal.endTime());
02035     espController_->eventSetupForInstance(ts);
02036     EventSetup const& es = esp_->eventSetup();
02037     {
02038       typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02039       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02040       schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
02041       if(hasSubProcess()) {
02042         subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
02043       }
02044     }
02045     FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02046     if(looper_) {
02047       looper_->doEndRun(runPrincipal, es);
02048     }
02049   }
02050 
02051   void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
02052     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02053     input_->doBeginLumi(lumiPrincipal);
02054 
02055     Service<RandomNumberGenerator> rng;
02056     if(rng.isAvailable()) {
02057       LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02058       rng->preBeginLumi(lb);
02059     }
02060 
02061     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
02062     // lumi blocks know their start and end times why not also start and end events?
02063     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02064     espController_->eventSetupForInstance(ts);
02065     EventSetup const& es = esp_->eventSetup();
02066     {
02067       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02068       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02069       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02070       if(hasSubProcess()) {
02071         subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02072       }
02073     }
02074     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02075     if(looper_) {
02076       looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02077     }
02078   }
02079 
02080   void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi, bool cleaningUpAfterException) {
02081     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02082     input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
02083     //NOTE: Using the max event number for the end of a lumi block is a bad idea
02084     // lumi blocks know their start and end times why not also start and end events?
02085     IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02086                     lumiPrincipal.endTime());
02087     espController_->eventSetupForInstance(ts);
02088     EventSetup const& es = esp_->eventSetup();
02089     {
02090       typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02091       ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02092       schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
02093       if(hasSubProcess()) {
02094         subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
02095       }
02096     }
02097     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02098     if(looper_) {
02099       looper_->doEndLuminosityBlock(lumiPrincipal, es);
02100     }
02101   }
02102 
02103   statemachine::Run EventProcessor::readAndCacheRun(bool merge) {
02104     input_->readAndCacheRun(merge, *historyAppender_);
02105     input_->markRun();
02106     return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02107   }
02108 
02109   int EventProcessor::readAndCacheLumi(bool merge) {
02110     input_->readAndCacheLumi(merge, *historyAppender_);
02111     input_->markLumi();
02112     return input_->luminosityBlock();
02113   }
02114 
02115   void EventProcessor::writeRun(statemachine::Run const& run) {
02116     schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02117     if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02118     FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02119   }
02120 
02121   void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02122     principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02123     if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02124     FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02125   }
02126 
02127   void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
02128     schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02129     if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02130     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02131   }
02132 
02133   void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
02134     principalCache_.deleteLumi(phid, run, lumi);
02135     if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02136     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02137   }
02138 
02139   void EventProcessor::readAndProcessEvent() {
02140     EventPrincipal *pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
02141     FDEBUG(1) << "\treadEvent\n";
02142     assert(pep != 0);
02143 
02144     IOVSyncValue ts(pep->id(), pep->time());
02145     espController_->eventSetupForInstance(ts);
02146     EventSetup const& es = esp_->eventSetup();
02147     {
02148       typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02149       ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02150       schedule_->processOneOccurrence<Traits>(*pep, es);
02151       if(hasSubProcess()) {
02152         subProcess_->doEvent(*pep, ts);
02153       }
02154     }
02155 
02156     if(looper_) {
02157       bool randomAccess = input_->randomAccess();
02158       ProcessingController::ForwardState forwardState = input_->forwardState();
02159       ProcessingController::ReverseState reverseState = input_->reverseState();
02160       ProcessingController pc(forwardState, reverseState, randomAccess);
02161 
02162       EDLooperBase::Status status = EDLooperBase::kContinue;
02163       do {
02164         status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02165 
02166         bool succeeded = true;
02167         if(randomAccess) {
02168           if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02169             input_->skipEvents(-2);
02170           }
02171           else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02172             succeeded = input_->goToEvent(pc.specifiedEventTransition());
02173           }
02174         }
02175         pc.setLastOperationSucceeded(succeeded);
02176       } while(!pc.lastOperationSucceeded());
02177       if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02178 
02179     }
02180 
02181     FDEBUG(1) << "\tprocessEvent\n";
02182     pep->clearEventPrincipal();
02183   }
02184 
02185   bool EventProcessor::shouldWeStop() const {
02186     FDEBUG(1) << "\tshouldWeStop\n";
02187     if(shouldWeStop_) return true;
02188     return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02189   }
02190 
02191   void EventProcessor::setExceptionMessageFiles(std::string& message) {
02192     exceptionMessageFiles_ = message;
02193   }
02194 
02195   void EventProcessor::setExceptionMessageRuns(std::string& message) {
02196     exceptionMessageRuns_ = message;
02197   }
02198 
02199   void EventProcessor::setExceptionMessageLumis(std::string& message) {
02200     exceptionMessageLumis_ = message;
02201   }
02202 
02203   bool EventProcessor::alreadyHandlingException() const {
02204     return alreadyHandlingException_;
02205   }
02206 
02207   void EventProcessor::terminateMachine() {
02208     if(machine_.get() != 0) {
02209       if(!machine_->terminated()) {
02210         forceLooperToEnd_ = true;
02211         machine_->process_event(statemachine::Stop());
02212         forceLooperToEnd_ = false;
02213       }
02214       else {
02215         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
02216       }
02217       if(machine_->terminated()) {
02218         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02219       }
02220       machine_.reset();
02221     }
02222   }
02223 }