CMS 3D CMS Logo

EventProcessor.cc

Go to the documentation of this file.
00001 
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003 
00004 #include <exception>
00005 #include <utility>
00006 #include <iostream>
00007 #include <iomanip>
00008 
00009 #include "boost/bind.hpp"
00010 #include "boost/thread/xtime.hpp"
00011 
00012 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00013 #include "DataFormats/Provenance/interface/BranchType.h"
00014 #include "FWCore/Utilities/interface/DebugMacros.h"
00015 #include "FWCore/Utilities/interface/EDMException.h"
00016 #include "FWCore/Utilities/interface/GetReleaseVersion.h"
00017 #include "FWCore/Utilities/interface/GetPassID.h"
00018 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00019 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00020 
00021 #include "FWCore/Framework/interface/IOVSyncValue.h"
00022 #include "FWCore/Framework/interface/SourceFactory.h"
00023 #include "FWCore/Framework/interface/ModuleFactory.h"
00024 #include "FWCore/Framework/interface/LooperFactory.h"
00025 #include "FWCore/Framework/interface/EventPrincipal.h"
00026 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00027 #include "FWCore/Framework/interface/RunPrincipal.h"
00028 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00029 #include "FWCore/Framework/interface/TriggerNamesService.h"
00030 #include "FWCore/Framework/interface/InputSourceDescription.h"
00031 #include "FWCore/Framework/interface/EventSetupProvider.h"
00032 #include "FWCore/Framework/interface/InputSource.h"
00033 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00034 
00035 #include "FWCore/Framework/src/Breakpoints.h"
00036 #include "FWCore/Framework/src/InputSourceFactory.h"
00037 
00038 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00039 #include "FWCore/ParameterSet/interface/PythonProcessDesc.h"
00040 
00041 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00042 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00043 
00044 #include "FWCore/Framework/interface/Schedule.h"
00045 #include "FWCore/Framework/interface/EDLooper.h"
00046 
00047 #include "FWCore/Framework/src/EPStates.h"
00048 
00049 using boost::shared_ptr;
00050 using edm::serviceregistry::ServiceLegacy; 
00051 using edm::serviceregistry::kOverlapIsError;
00052 
00053 namespace edm {
00054 
00055   namespace event_processor {
00056 
00057     class StateSentry
00058     {
00059     public:
00060       StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00061       ~StateSentry() {if(!success_) ep_->changeState(mException);}
00062       void succeeded() {success_ = true;}
00063 
00064     private:
00065       EventProcessor* ep_;
00066       bool success_;
00067     };
00068   }
00069 
00070   using namespace event_processor;
00071   using namespace edm::service;
00072 
00073   namespace {
00074 
00075     // The next two tables must be kept in sync with the state and
00076     // message enums from the header.
          // NOTE(review): presumably each array is indexed by the numeric value
          // of the corresponding enumerator, which is why the entry order
          // matters - confirm against the enum declarations (not visible here).
00077 
          // Printable names of the processor states.
00078     char const* stateNames[] = {
00079       "Init",
00080       "JobReady",
00081       "RunGiven",
00082       "Running",
00083       "Stopping",
00084       "ShuttingDown",
00085       "Done",
00086       "JobEnded",
00087       "Error",
00088       "ErrorEnded",
00089       "End",
00090       "Invalid"
00091     };
00092 
          // Printable names of the state-machine messages.
00093     char const* msgNames[] = {
00094       "SetRun",
00095       "Skip",
00096       "RunAsync",
00097       "Run(ID)",
00098       "Run(count)",
00099       "BeginJob",
00100       "StopAsync",
00101       "ShutdownAsync",
00102       "EndJob",
00103       "CountComplete",
00104       "InputExhausted",
00105       "StopSignal",
00106       "ShutdownSignal",
00107       "Finished",
00108       "Any",
00109       "dtor",
00110       "Exception",
00111       "Rewind"
00112     };
00113   }
00114     // IMPORTANT NOTE:
00115     // the mAny messages are special, they must appear last in the
00116     // table if multiple entries for a CurrentState are present.
00117     // the changeState function does not use the mAny yet!!!
00118 
00119     struct TransEntry
00120     {
            // One row of the state-transition table: receiving `message`
            // while in state `current` leads to state `final`.
00121       State current;
00122       Msg   message;
00123       State final;
00124     };
00125 
00126     // we should use this information to initialize a two dimensional
00127     // table of t[CurrentState][Message] = FinalState
00128 
00129     /*
00130       The way this is currently written, the async run thread function
00131       can return in the "JobReady" state - but not yet cleaned up.  The
00132       problem is that the thread is only cleaned up when stop/shutdown
00133       async is called. But the stop/shutdown async functions first
00134       attempt to change the state using messages that are not valid in
00135       the "JobReady" state.
00136 
00137       I think most of the problems can be solved by using two states
00138       for "running": RunningS and RunningA (sync/async). The problem
00139       seems to be all the transitions out of running for both
00140       modes of operation.  The other solution might be to only go to
00141       "Stopping" from Running, and use the return code from "run_p" to
00142       set the final state.  If this is used, then in the sync mode the
00143       "Stopping" state would be momentary.
00144 
00145      */
00146 
00147     TransEntry table[] = {
          // Each row reads: in CurrentState, on Message, go to FinalState.
          // Rows for one current state are mostly grouped together, but a few
          // late additions (sInit/mDtor, sStopping/mShutdownAsync) sit near
          // the end of the table.
00148     // CurrentState   Message         FinalState
00149     // -----------------------------------------
00150       { sInit,          mException,      sError },
00151       { sInit,          mBeginJob,       sJobReady },
00152       { sJobReady,      mException,      sError },
00153       { sJobReady,      mSetRun,         sRunGiven },
00154       { sJobReady,      mInputRewind,    sRunning },
00155       { sJobReady,      mSkip,           sRunning },
00156       { sJobReady,      mRunID,          sRunning },
00157       { sJobReady,      mRunCount,       sRunning },
00158       { sJobReady,      mEndJob,         sJobEnded },
00159       { sJobReady,      mBeginJob,       sJobReady },
00160       { sJobReady,      mDtor,           sEnd },    // should this be allowed?
00161 
00162       { sJobReady,      mStopAsync,      sJobReady },
00163       { sJobReady,      mCountComplete,  sJobReady },
00164       { sJobReady,      mFinished,       sJobReady },
00165 
00166       { sRunGiven,      mException,      sError },
00167       { sRunGiven,      mRunAsync,       sRunning },
00168       { sRunGiven,      mBeginJob,       sRunGiven },
00169       { sRunGiven,      mShutdownAsync,  sShuttingDown },
00170       { sRunGiven,      mStopAsync,      sStopping },
00171       { sRunning,       mException,      sError },
00172       { sRunning,       mStopAsync,      sStopping },
00173       { sRunning,       mShutdownAsync,  sShuttingDown },
00174       { sRunning,       mShutdownSignal, sShuttingDown },
00175       { sRunning,       mCountComplete,  sStopping }, // sJobReady 
00176       { sRunning,       mInputExhausted, sStopping }, // sJobReady
00177 
00178       { sStopping,      mInputRewind,    sRunning }, // The looper needs this
00179       { sStopping,      mException,      sError },
00180       { sStopping,      mFinished,       sJobReady },
00181       { sStopping,      mCountComplete,  sJobReady },
00182       { sStopping,      mShutdownSignal, sShuttingDown },
00183       { sStopping,      mStopAsync,      sStopping },     // stay
00184       { sStopping,      mInputExhausted, sStopping },     // stay
00185       //{ sStopping,      mAny,            sJobReady },     // <- ??????
00186       { sShuttingDown,  mException,      sError },
00187       { sShuttingDown,  mShutdownSignal, sShuttingDown },
00188       { sShuttingDown,  mCountComplete,  sDone }, // needed?
00189       { sShuttingDown,  mInputExhausted, sDone }, // needed?
00190       { sShuttingDown,  mFinished,       sDone },
00191       //{ sShuttingDown,  mShutdownAsync,  sShuttingDown }, // only one at
00192       //{ sShuttingDown,  mStopAsync,      sShuttingDown }, // a time
00193       //{ sShuttingDown,  mAny,            sDone },         // <- ??????
00194       { sDone,          mEndJob,         sJobEnded },
00195       { sDone,          mException,      sError },
00196       { sJobEnded,      mDtor,           sEnd },
00197       { sJobEnded,      mException,      sError },
00198       { sError,         mEndJob,         sError },   // funny one here
00199       { sError,         mDtor,           sError },   // funny one here
00200       { sInit,          mDtor,           sEnd },     // for StorM dummy EP
00201       { sStopping,      mShutdownAsync,  sShuttingDown }, // For FUEP tests
          // NOTE(review): presumably the end-of-table sentinel - confirm in
          // changeState(), which is not visible here.
00202       { sInvalid,       mAny,            sInvalid }
00203     };
00204 
00205 
00206     // Note: many of the messages generate the mBeginJob message first 
00207     //  mRunID, mRunCount, mSetRun
00208 
00209   // ---------------------------------------------------------------
00210   shared_ptr<InputSource> 
00211   makeInput(ParameterSet const& params,
00212             EventProcessor::CommonParams const& common,
00213             ProductRegistry& preg,
00214             boost::shared_ptr<ActivityRegistry> areg)
00215   {
00216     // find single source
00217     bool sourceSpecified = false;
00218     try {
00219       ParameterSet main_input =
00220         params.getParameter<ParameterSet>("@main_input");
00221       
00222       // Fill in "ModuleDescription", in case the input source produces
00223       // any EDproducts,which would be registered in the ProductRegistry.
00224       // Also fill in the process history item for this process.
00225       ModuleDescription md;
00226       md.parameterSetID_ = main_input.id();
00227       md.moduleName_ =
00228         main_input.getParameter<std::string>("@module_type");
00229       // There is no module label for the unnamed input source, so 
00230       // just use "source".
00231       md.moduleLabel_ = "source";
00232       md.processConfiguration_ = ProcessConfiguration(common.processName_,
00233                                 params.id(), getReleaseVersion(), getPassID());
00234 
00235       sourceSpecified = true;
00236       InputSourceDescription isdesc(md, preg, areg, common.maxEventsInput_, common.maxLumisInput_);
00237       areg->preSourceConstructionSignal_(md);
00238       shared_ptr<InputSource> input(InputSourceFactory::get()->makeInputSource(main_input, isdesc).release());
00239       areg->postSourceConstructionSignal_(md);
00240       
00241       return input;
00242     } 
00243     catch(edm::Exception const& iException) {
00244         if(sourceSpecified == false && 
00245            errors::Configuration == iException.categoryCode()) {
00246             throw edm::Exception(errors::Configuration, "FailedInputSource")
00247               << "Configuration of main input source has failed\n"
00248               << iException;
00249         } else {
00250             throw;
00251         }
00252     }
00253     return shared_ptr<InputSource>();
00254   }
00255   
00256   // ---------------------------------------------------------------
00257   static
00258   std::auto_ptr<eventsetup::EventSetupProvider>
00259   makeEventSetupProvider(ParameterSet const& params)
00260   {
00261     using namespace edm::eventsetup;
00262     std::vector<std::string> prefers =
00263       params.getParameter<std::vector<std::string> >("@all_esprefers");
00264 
00265     if(prefers.empty()) {
00266       return std::auto_ptr<EventSetupProvider>(new EventSetupProvider());
00267     }
00268 
00269     EventSetupProvider::PreferredProviderInfo preferInfo;
00270     EventSetupProvider::RecordToDataMap recordToData;
00271 
00272     //recordToData.insert(std::make_pair(std::string("DummyRecord"),
00273     //      std::make_pair(std::string("DummyData"),std::string())));
00274     //preferInfo[ComponentDescription("DummyProxyProvider","",false)]=
00275     //      recordToData;
00276 
00277     for(std::vector<std::string>::iterator itName = prefers.begin(), itNameEnd = prefers.end();
00278         itName != itNameEnd;
00279         ++itName) 
00280       {
00281         recordToData.clear();
00282         ParameterSet preferPSet = params.getParameter<ParameterSet>(*itName);
00283         std::vector<std::string> recordNames = preferPSet.getParameterNames();
00284         for(std::vector<std::string>::iterator itRecordName = recordNames.begin(),
00285             itRecordNameEnd = recordNames.end();
00286             itRecordName != itRecordNameEnd;
00287             ++itRecordName) {
00288 
00289           if((*itRecordName)[0] == '@') {
00290             //this is a 'hidden parameter' so skip it
00291             continue;
00292           }
00293 
00294           //this should be a record name with its info
00295           try {
00296             std::vector<std::string> dataInfo =
00297               preferPSet.getParameter<std::vector<std::string> >(*itRecordName);
00298             
00299             if(dataInfo.empty()) {
00300               //FUTURE: empty should just mean all data
00301               throw edm::Exception(errors::Configuration)
00302                 << "The record named "
00303                 << *itRecordName << " specifies no data items";
00304             }
00305             //FUTURE: 'any' should be a special name
00306             for(std::vector<std::string>::iterator itDatum = dataInfo.begin(),
00307                 itDatumEnd = dataInfo.end();
00308                 itDatum != itDatumEnd;
00309                 ++itDatum){
00310               std::string datumName(*itDatum, 0, itDatum->find_first_of("/"));
00311               std::string labelName;
00312 
00313               if(itDatum->size() != datumName.size()) {
00314                 labelName = std::string(*itDatum, datumName.size()+1);
00315               }
00316               recordToData.insert(std::make_pair(std::string(*itRecordName),
00317                                                  std::make_pair(datumName,
00318                                                                 labelName)));
00319             }
00320           } catch(cms::Exception const& iException) {
00321             cms::Exception theError("ESPreferConfigurationError");
00322             theError << "While parsing the es_prefer statement for type="
00323                      << preferPSet.getParameter<std::string>("@module_type")
00324                      << " label=\""
00325                      << preferPSet.getParameter<std::string>("@module_label")
00326                      << "\" an error occurred.";
00327             theError.append(iException);
00328             throw theError;
00329           }
00330         }
00331         preferInfo[ComponentDescription(preferPSet.getParameter<std::string>("@module_type"),
00332                                         preferPSet.getParameter<std::string>("@module_label"),
00333                                         false)]
00334           = recordToData;
00335       }
00336     return std::auto_ptr<EventSetupProvider>(new EventSetupProvider(&preferInfo));
00337   }
00338   
00339   // ---------------------------------------------------------------
00340   void 
00341   fillEventSetupProvider(edm::eventsetup::EventSetupProvider& cp,
00342                          ParameterSet const& params,
00343                          EventProcessor::CommonParams const& common)
00344   {
00345     using namespace edm::eventsetup;
00346     std::vector<std::string> providers =
00347       params.getParameter<std::vector<std::string> >("@all_esmodules");
00348 
00349     for(std::vector<std::string>::iterator itName = providers.begin(), itNameEnd = providers.end();
00350         itName != itNameEnd;
00351         ++itName) 
00352       {
00353         ParameterSet providerPSet = params.getParameter<ParameterSet>(*itName);
00354         ModuleFactory::get()->addTo(cp, 
00355                                     providerPSet, 
00356                                     common.processName_, 
00357                                     common.releaseVersion_, 
00358                                     common.passID_);
00359       }
00360     
00361     std::vector<std::string> sources = 
00362       params.getParameter<std::vector<std::string> >("@all_essources");
00363 
00364     for(std::vector<std::string>::iterator itName = sources.begin(), itNameEnd = sources.end();
00365         itName != itNameEnd;
00366         ++itName) 
00367       {
00368         ParameterSet providerPSet = params.getParameter<ParameterSet>(*itName);
00369         SourceFactory::get()->addTo(cp, 
00370                                     providerPSet, 
00371                                     common.processName_, 
00372                                     common.releaseVersion_, 
00373                                     common.passID_);
00374     }
00375   }
00376 
00377   // ---------------------------------------------------------------
00378   boost::shared_ptr<edm::EDLooper> 
00379   fillLooper(edm::eventsetup::EventSetupProvider& cp,
00380                          ParameterSet const& params,
00381                          EventProcessor::CommonParams const& common)
00382   {
00383     using namespace edm::eventsetup;
00384     boost::shared_ptr<edm::EDLooper> vLooper;
00385     
00386     std::vector<std::string> loopers =
00387       params.getParameter<std::vector<std::string> >("@all_loopers");
00388 
00389     if(loopers.size() == 0) {
00390        return vLooper;
00391     }
00392    
00393     assert(1 == loopers.size());
00394 
00395     for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00396         itName != itNameEnd;
00397         ++itName) 
00398       {
00399         ParameterSet providerPSet = params.getParameter<ParameterSet>(*itName);
00400         vLooper = LooperFactory::get()->addTo(cp, 
00401                                     providerPSet, 
00402                                     common.processName_, 
00403                                     common.releaseVersion_, 
00404                                     common.passID_);
00405       }
00406       return vLooper;
00407     
00408   }
00409 
00410   // ---------------------------------------------------------------
00411   EventProcessor::EventProcessor(std::string const& config,
00412                                 ServiceToken const& iToken, 
00413                                 serviceregistry::ServiceLegacy iLegacy,
00414                                 std::vector<std::string> const& defaultServices,
00415                                 std::vector<std::string> const& forcedServices) :
00416     preProcessEventSignal_(),
00417     postProcessEventSignal_(),
00418     maxEventsPset_(),
00419     maxLumisPset_(),
00420     actReg_(new ActivityRegistry),
00421     wreg_(actReg_),
00422     preg_(),
00423     serviceToken_(),
00424     input_(),
00425     schedule_(),
00426     esp_(),
00427     act_table_(),
00428     state_(sInit),
00429     event_loop_(),
00430     state_lock_(),
00431     stop_lock_(),
00432     stopper_(),
00433     stop_count_(-1),
00434     last_rc_(epSuccess),
00435     last_error_text_(),
00436     id_set_(false),
00437     event_loop_id_(),
00438     my_sig_num_(getSigNum()),
00439     fb_(),
00440     looper_(),
00441     shouldWeStop_(false),
00442     alreadyHandlingException_(false),
00443     forceLooperToEnd_(false)
00444   {
          // Builds a ProcessDesc from the `config` string, adds the default
          // and forced services to it, and hands the result to init() together
          // with the caller's service token and legacy mode.
          // The member initializer list is repeated verbatim in the other
          // constructors of this class; keep them in sync.
00445     boost::shared_ptr<edm::ProcessDesc> processDesc(new edm::ProcessDesc(config));
00446     processDesc->addServices(defaultServices, forcedServices);
00447     init(processDesc, iToken, iLegacy);
00448   }
00449 
00450   EventProcessor::EventProcessor(std::string const& config,
00451                                 std::vector<std::string> const& defaultServices,
00452                                 std::vector<std::string> const& forcedServices) :
00453     preProcessEventSignal_(),
00454     postProcessEventSignal_(),
00455     maxEventsPset_(),
00456     maxLumisPset_(),
00457     actReg_(new ActivityRegistry),
00458     wreg_(actReg_),
00459     preg_(),
00460     serviceToken_(),
00461     input_(),
00462     schedule_(),
00463     esp_(),
00464     act_table_(),
00465     state_(sInit),
00466     event_loop_(),
00467     state_lock_(),
00468     stop_lock_(),
00469     stopper_(),
00470     stop_count_(-1),
00471     last_rc_(epSuccess),
00472     last_error_text_(),
00473     id_set_(false),
00474     event_loop_id_(),
00475     my_sig_num_(getSigNum()),
00476     fb_(),
00477     looper_(),
00478     shouldWeStop_(false),
00479     alreadyHandlingException_(false),
00480     forceLooperToEnd_(false)
00481   {
          // Same as the constructor taking a service token, except that init()
          // is given a default-constructed ServiceToken and kOverlapIsError.
          // The member initializer list is repeated verbatim in the other
          // constructors of this class; keep them in sync.
00482     boost::shared_ptr<edm::ProcessDesc> processDesc(new edm::ProcessDesc(config));
00483     processDesc->addServices(defaultServices, forcedServices);
00484     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00485   }
00486 
00487   EventProcessor::EventProcessor(boost::shared_ptr<edm::ProcessDesc> & processDesc,
00488                  ServiceToken const& token,
00489                  serviceregistry::ServiceLegacy legacy) :
00490     preProcessEventSignal_(),
00491     postProcessEventSignal_(),
00492     maxEventsPset_(),
00493     maxLumisPset_(),
00494     actReg_(new ActivityRegistry),
00495     wreg_(actReg_),
00496     preg_(),
00497     serviceToken_(),
00498     input_(),
00499     schedule_(),
00500     esp_(),
00501     act_table_(),
00502     state_(sInit),
00503     event_loop_(),
00504     state_lock_(),
00505     stop_lock_(),
00506     stopper_(),
00507     stop_count_(-1),
00508     last_rc_(epSuccess),
00509     last_error_text_(),
00510     id_set_(false),
00511     event_loop_id_(),
00512     my_sig_num_(getSigNum()),
00513     fb_(),
00514     looper_(),
00515     shouldWeStop_(false),
00516     alreadyHandlingException_(false),
00517     forceLooperToEnd_(false)
00518   {
          // Takes an already-built ProcessDesc (no services are added here)
          // and forwards it, with the given token and legacy mode, to init().
          // The member initializer list is repeated verbatim in the other
          // constructors of this class; keep them in sync.
00519     init(processDesc, token, legacy);
00520   }
00521 
00522 
00523   EventProcessor::EventProcessor(std::string const& config, bool isPython):
00524     preProcessEventSignal_(),
00525     postProcessEventSignal_(),
00526     maxEventsPset_(),
00527     maxLumisPset_(),
00528     actReg_(new ActivityRegistry),
00529     wreg_(actReg_),
00530     preg_(),
00531     serviceToken_(),
00532     input_(),
00533     schedule_(),
00534     esp_(),
00535     act_table_(),
00536     state_(sInit),
00537     event_loop_(),
00538     state_lock_(),
00539     stop_lock_(),
00540     stopper_(),
00541     stop_count_(-1),
00542     last_rc_(epSuccess),
00543     last_error_text_(),
00544     id_set_(false),
00545     event_loop_id_(),
00546     my_sig_num_(getSigNum()),
00547     fb_(),
00548     looper_(),
00549     shouldWeStop_(false),
00550     alreadyHandlingException_(false),
00551     forceLooperToEnd_(false)
00552   {
00553     if(isPython)
00554     {
00555       boost::shared_ptr<edm::ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
00556       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00557     }
00558     else
00559     {
00560       boost::shared_ptr<edm::ProcessDesc> processDesc(new edm::ProcessDesc(config));
00561       init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00562     }
00563   }
00564 
00565   void
00566   EventProcessor::init(boost::shared_ptr<edm::ProcessDesc> & processDesc,
00567                         ServiceToken const& iToken, 
00568                         serviceregistry::ServiceLegacy iLegacy) {
          // Common setup called by every constructor.  In order, it reads the
          // process-level options, creates the services, and then builds the
          // EventSetup provider, the optional looper, the input source and
          // the schedule, before finally connecting the signals.
          // fileMode_, handleEmptyRuns_ and handleEmptyLumis_ are assigned
          // here; they do not appear in the constructors' initializer lists.
00569     // TODO: Fix const-correctness. The ParameterSets that are
00570     // returned here should be const, so that we can be sure they are
00571     // not modified.
00572 
00573     shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00574 
          // All three option lookups fall back to a default when "options"
          // or the individual parameter is absent.
00575     ParameterSet optionsPset(parameterSet->getUntrackedParameter<ParameterSet>("options", ParameterSet()));
00576     fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00577     handleEmptyRuns_ = optionsPset.getUntrackedParameter<bool>("handleEmptyRuns", true);
00578     handleEmptyLumis_ = optionsPset.getUntrackedParameter<bool>("handleEmptyLumis", true);
00579 
00580     maxEventsPset_ = parameterSet->getUntrackedParameter<ParameterSet>("maxEvents", ParameterSet());
00581     maxLumisPset_ = parameterSet->getUntrackedParameter<ParameterSet>("maxLuminosityBlocks", ParameterSet());
00582 
00583     shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00584     //makeParameterSets(config, parameterSet, pServiceSets);
00585 
00586     //create the services
00587     ServiceToken tempToken(ServiceRegistry::createSet(*pServiceSets, iToken, iLegacy));
00588 
00589     // Copy slots that hold all the registered callback functions like
00590     // PostBeginJob into an ActivityRegistry that is owned by EventProcessor
00591     tempToken.copySlotsTo(*actReg_); 
00592     
00593     //add the ProductRegistry as a service ONLY for the construction phase
00594     typedef serviceregistry::ServiceWrapper<ConstProductRegistry> w_CPR;
00595     shared_ptr<w_CPR>
00596       reg(new w_CPR(std::auto_ptr<ConstProductRegistry>(new ConstProductRegistry(preg_))));
00597     ServiceToken tempToken2(ServiceRegistry::createContaining(reg, 
00598                                                               tempToken, 
00599                                                               kOverlapIsError));
00600 
00601     // the next thing is ugly: pull out the trigger path pset and 
00602     // create a service and extra token for it
00603     std::string processName = parameterSet->getParameter<std::string>("@process_name");
00604 
00605     typedef edm::service::TriggerNamesService TNS;
00606     typedef serviceregistry::ServiceWrapper<TNS> w_TNS;
00607 
00608     shared_ptr<w_TNS> tnsptr
00609       (new w_TNS(std::auto_ptr<TNS>(new TNS(*parameterSet))));
00610 
          // serviceToken_ is built on top of tempToken2, which in turn is
          // built on top of tempToken.
00611     serviceToken_ = ServiceRegistry::createContaining(tnsptr, 
00612                                                     tempToken2, 
00613                                                     kOverlapIsError);
00614 
00615     //make the services available
00616     ServiceRegistry::Operate operate(serviceToken_);
00617      
00618     //parameterSet = builder.getProcessPSet();
00619     act_table_ = ActionTable(*parameterSet);
          // The last two arguments default to -1 when the "input" parameter
          // is not given.
00620     CommonParams common = CommonParams(processName,
00621                            getReleaseVersion(),
00622                            getPassID(),
00623                            maxEventsPset_.getUntrackedParameter<int>("input", -1),
00624                            maxLumisPset_.getUntrackedParameter<int>("input", -1));
00625 
00626     esp_ = makeEventSetupProvider(*parameterSet);
00627     fillEventSetupProvider(*esp_, *parameterSet, common);
00628 
          // looper_ stays null when the configuration names no looper.
00629     looper_ = fillLooper(*esp_, *parameterSet, common);
00630     if (looper_) looper_->setActionTable(&act_table_);
00631     
00632     input_= makeInput(*parameterSet, common, preg_, actReg_);
00633     schedule_ = std::auto_ptr<Schedule>
00634       (new Schedule(*parameterSet,
00635                     ServiceRegistry::instance().get<TNS>(),
00636                     wreg_,
00637                     preg_,
00638                     act_table_,
00639                     actReg_));
00640 
00641     //   initialize(iToken,iLegacy);
00642     FDEBUG(2) << parameterSet->toString() << std::endl;
00643     connectSigs(this);
00644   }
00645 
00646   EventProcessor::~EventProcessor()
00647   {
          // Tears the processor down: terminates the state machine if it is
          // still around, sends the mDtor message, then releases the owned
          // components in an explicit order while the services are active.
00648     // Make the services available while everything is being deleted.
00649     ServiceToken token = getToken();
00650     ServiceRegistry::Operate op(token); 
00651 
00652     // The state machine should have already been cleaned up
00653     // and destroyed at this point by a call to EndJob or
00654     // earlier when it completed processing events, but if it
00655     // has not been we'll take care of it here at the last moment.
00656     // This could cause problems if we are already handling an
00657     // exception and another one is thrown here ...  For a critical
00658     // executable the solution to this problem is for the code using
00659     // the EventProcessor to explicitly call EndJob or use runToCompletion,
00660     // then the next line of code is never executed.
00661     terminateMachine();
00662 
          // A cms::Exception from the state change is logged rather than
          // allowed to escape; other exception types are not caught here.
00663     try {
00664       changeState(mDtor);
00665     }
00666     catch(cms::Exception& e)
00667       {
00668         LogError("System")
00669           << e.explainSelf() << "\n";
00670       }
00671 
00672     // manually destroy all these thing that may need the services around
00673     esp_.reset();
00674     schedule_.reset();
00675     input_.reset();
00676     looper_.reset();
00677     wreg_.clear();
00678     actReg_.reset();
00679   }
00680 
00681   void
00682   EventProcessor::rewind()
00683   {
00684     beginJob(); //make sure this was called
00685     changeState(mStopAsync);
00686     changeState(mInputRewind);
00687     {
00688       StateSentry toerror(this);
00689 
00690       //make the services available
00691       ServiceRegistry::Operate operate(serviceToken_);
00692       
00693       {
00694         input_->repeat();
00695         input_->rewind();
00696       }
00697       changeState(mCountComplete);
00698       toerror.succeeded();
00699     }
00700     changeState(mFinished);
00701   }
00702 
00703   std::auto_ptr<EventPrincipal>
00704   EventProcessor::doOneEvent(EventID const& id) {
00705     std::auto_ptr<EventPrincipal> pep;
00706     {
00707       pep = input_->readEvent(id);
00708     }
00709     procOneEvent(pep.get());
00710     return pep;
00711   }
00712 
00713   void
00714   EventProcessor::procOneEvent(EventPrincipal *pep) {
00715     if(0 != pep) {
00716       IOVSyncValue ts(pep->id(), pep->luminosityBlock(), pep->time());
00717       EventSetup const& es = esp_->eventSetupForInstance(ts);
00718       schedule_->processOneOccurrence<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(*pep, es);
00719     }
00720   }
00721 
00722   EventProcessor::StatusCode
00723   EventProcessor::run(int numberEventsToProcess, bool repeatable)
00724   {
          // Forwards to runEventCount() and returns its status.
          // `repeatable` is accepted but not used by this implementation.
00725     return runEventCount(numberEventsToProcess);
00726   }
00727   
00728   EventProcessor::StatusCode
00729   EventProcessor::run(EventID const& id)
00730   {
00731     beginJob(); //make sure this was called
00732     changeState(mRunID);
00733     StateSentry toerror(this);
00734     Status rc = epSuccess;
00735 
00736     //make the services available
00737     ServiceRegistry::Operate operate(serviceToken_);
00738 
00739     if(doOneEvent(id).get() == 0) {
00740       changeState(mInputExhausted);
00741     } else {
00742       changeState(mCountComplete);
00743       rc = epInputComplete;
00744     }
00745     toerror.succeeded();
00746     changeState(mFinished);
00747     return rc;
00748   }
00749 
00750   EventProcessor::StatusCode
00751   EventProcessor::skip(int numberToSkip)
00752   {
00753     beginJob(); //make sure this was called
00754     changeState(mSkip);
00755     {
00756       StateSentry toerror(this);
00757 
00758       //make the services available
00759       ServiceRegistry::Operate operate(serviceToken_);
00760       
00761       {
00762         input_->skipEvents(numberToSkip);
00763       }
00764       changeState(mCountComplete);
00765       toerror.succeeded();
00766     }
00767     changeState(mFinished);
00768     return epSuccess;
00769   }
00770 
00771   void
00772   EventProcessor::beginJob() 
00773   {
00774     if(state_ != sInit) return;
00775     bk::beginJob();
00776     // can only be run if in the initial state
00777     changeState(mBeginJob);
00778 
00779     // StateSentry toerror(this); // should we add this ? 
00780     //make the services available
00781     ServiceRegistry::Operate operate(serviceToken_);
00782     
00783     //NOTE:  This implementation assumes 'Job' means one call 
00784     // the EventProcessor::run
00785     // If it really means once per 'application' then this code will
00786     // have to be changed.
00787     // Also have to deal with case where have 'run' then new Module 
00788     // added and do 'run'
00789     // again.  In that case the newly added Module needs its 'beginJob'
00790     // to be called.
00791     EventSetup const& es =
00792       esp_->eventSetupForInstance(IOVSyncValue::beginOfTime());
00793     if(looper_) {
00794        looper_->beginOfJob(es);
00795     }
00796     try {
00797       input_->doBeginJob(es);
00798     } catch(cms::Exception& e) {
00799       LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00800       e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00801       throw;
00802     } catch(std::exception& e) {
00803       LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
00804       throw;
00805     } catch(...) {
00806       LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
00807       throw;
00808     }
00809     schedule_->beginJob(es);
00810     actReg_->postBeginJobSignal_();
00811     // toerror.succeeded(); // should we add this?
00812   }
00813 
00814   void
00815   EventProcessor::endJob() 
00816   {
00817     // Collects exceptions, so we don't throw before all operations are performed.
00818     ExceptionCollector c;
00819 
00820     // only allowed to run if state is sIdle,sJobReady,sRunGiven
00821     c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00822 
00823     //make the services available
00824     ServiceRegistry::Operate operate(serviceToken_);  
00825 
00826     c.call(boost::bind(&EventProcessor::terminateMachine, this));
00827     c.call(boost::bind(&Schedule::endJob, schedule_.get()));
00828     c.call(boost::bind(&InputSource::doEndJob, input_));
00829     if (looper_) {
00830       c.call(boost::bind(&EDLooper::endOfJob, looper_));
00831     }
00832     c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00833     if (c.hasThrown()) {
00834       c.rethrow();
00835     }
00836   }
00837 
00838   ServiceToken
00839   EventProcessor::getToken()
00840   {
00841     return serviceToken_;
00842   }
00843 
00844   void
00845   EventProcessor::connectSigs(EventProcessor* ep)
00846   {
00847     // When the FwkImpl signals are given, pass them to the
00848     // appropriate EventProcessor signals so that the outside world
00849     // can see the signal.
00850     actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
00851     actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
00852   }
00853 
00854   std::vector<ModuleDescription const*>
00855   EventProcessor::getAllModuleDescriptions() const
00856   {
00857     return schedule_->getAllModuleDescriptions();
00858   }
00859 
00860   int
00861   EventProcessor::totalEvents() const
00862   {
00863     return schedule_->totalEvents();
00864   }
00865 
00866   int
00867   EventProcessor::totalEventsPassed() const
00868   {
00869     return schedule_->totalEventsPassed();
00870   }
00871 
00872   int
00873   EventProcessor::totalEventsFailed() const
00874   {
00875     return schedule_->totalEventsFailed();
00876   }
00877 
00878   void 
00879   EventProcessor::enableEndPaths(bool active)
00880   {
00881     schedule_->enableEndPaths(active);
00882   }
00883 
00884   bool 
00885   EventProcessor::endPathsEnabled() const
00886   {
00887     return schedule_->endPathsEnabled();
00888   }
00889   
00890   void
00891   EventProcessor::getTriggerReport(TriggerReport& rep) const
00892   {
00893     schedule_->getTriggerReport(rep);
00894   }
00895 
00896   void
00897   EventProcessor::clearCounters()
00898   {
00899     schedule_->clearCounters();
00900   }
00901 
00902 
00903   char const* EventProcessor::currentStateName() const
00904   {
00905     return stateName(getState());
00906   }
00907 
00908   char const* EventProcessor::stateName(State s) const
00909   {
00910     return stateNames[s];
00911   }
00912 
00913   char const* EventProcessor::msgName(Msg m) const
00914   {
00915     return msgNames[m];
00916   }
00917 
00918   State EventProcessor::getState() const
00919   {
00920     return state_;
00921   }
00922 
00923   EventProcessor::StatusCode EventProcessor::statusAsync() const
00924   {
00925     // the thread will record exception/error status in the event processor
00926     // for us to look at and report here
00927     return last_rc_;
00928   }
00929 
00930   void
00931   EventProcessor::setRunNumber(RunNumber_t runNumber)
00932   {
00933     if (runNumber == 0) {
00934       runNumber = 1;
00935       LogWarning("Invalid Run")
00936         << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
00937         << "Run number was set to 1 instead\n";
00938     }
00939 
00940     // inside of beginJob there is a check to see if it has been called before
00941     beginJob();
00942     changeState(mSetRun);
00943 
00944     // interface not correct yet
00945     input_->setRunNumber(runNumber);
00946   }
00947 
00948   void
00949   EventProcessor::declareRunNumber(RunNumber_t runNumber)
00950   {
00951     // inside of beginJob there is a check to see if it has been called before
00952     beginJob();
00953     changeState(mSetRun);
00954 
00955     // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
00956     //input_->declareRunNumber(runNumber);
00957   }
00958 
00959   EventProcessor::StatusCode 
00960   EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds)
00961   {
00962     bool rc = true;
00963     boost::xtime timeout;
00964     boost::xtime_get(&timeout, boost::TIME_UTC); 
00965     timeout.sec += timeout_seconds;
00966 
00967     // make sure to include a timeout here so we don't wait forever
00968     // I suspect there are still timing issues with thread startup
00969     // and the setting of the various control variables (stop_count,id_set)
00970     {
00971       boost::mutex::scoped_lock sl(stop_lock_);
00972 
00973       // look here - if runAsync not active, just return the last return code
00974       if(stop_count_ < 0) return last_rc_;
00975 
00976       if(timeout_seconds==0)
00977         while(stop_count_==0) stopper_.wait(sl);
00978       else
00979         while(stop_count_==0 &&
00980               (rc = stopper_.timed_wait(sl,timeout)) == true);
00981       
00982       if(rc == false)
00983         {
00984           // timeout occurred
00985           // if(id_set_) pthread_kill(event_loop_id_,my_sig_num_);
00986           // this is a temporary hack until we get the input source
00987           // upgraded to allow blocking input sources to be unblocked
00988 
00989           // the next line is dangerous and causes all sorts of trouble
00990           if(id_set_) pthread_cancel(event_loop_id_);
00991 
00992           // we will not do anything yet
00993           LogWarning("timeout")
00994             << "An asynchronous request was made to shut down "
00995             << "the event loop "
00996             << "and the event loop did not shutdown after "
00997             << timeout_seconds << " seconds\n";
00998         }
00999       else
01000         {
01001           event_loop_->join();
01002           event_loop_.reset();
01003           id_set_ = false;
01004           stop_count_ = -1;
01005         }
01006     }
01007     return rc==false?epTimedOut:last_rc_;
01008   }
01009 
01010   EventProcessor::StatusCode 
01011   EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs)
01012   {
01013     StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01014     if(rc!=epTimedOut) changeState(mCountComplete);
01015     else errorState();
01016     return rc;
01017   }
01018 
01019   
01020   EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs)
01021   {
01022     changeState(mStopAsync);
01023     StatusCode rc = waitForAsyncCompletion(secs);
01024     if(rc!=epTimedOut) changeState(mFinished);
01025     else errorState();
01026     return rc;
01027   }
01028   
01029   EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs)
01030   {
01031     changeState(mShutdownAsync);
01032     StatusCode rc = waitForAsyncCompletion(secs);
01033     if(rc!=epTimedOut) changeState(mFinished);
01034     else errorState();
01035     return rc;
01036   }
01037   
01038   void EventProcessor::errorState()
01039   {
01040     state_ = sError;
01041   }
01042 
01043   // next function irrelevant now
01044   EventProcessor::StatusCode EventProcessor::doneAsync(Msg m)
01045   {
01046     // make sure to include a timeout here so we don't wait forever
01047     // I suspect there are still timing issues with thread startup
01048     // and the setting of the various control variables (stop_count,id_set)
01049     changeState(m);
01050     return waitForAsyncCompletion(60*2);
01051   }
01052   
01053   void EventProcessor::changeState(Msg msg)
01054   {
01055     // most likely need to serialize access to this routine
01056 
01057     boost::mutex::scoped_lock sl(state_lock_);
01058     State curr = state_;
01059     int rc;
01060     // found if(not end of table) and 
01061     // (state == table.state && (msg == table.message || msg == any))
01062     for(rc = 0;
01063         table[rc].current != sInvalid && 
01064           (curr != table[rc].current || 
01065            (curr == table[rc].current && 
01066              msg != table[rc].message && table[rc].message != mAny));
01067         ++rc);
01068 
01069     if(table[rc].current == sInvalid)
01070       throw cms::Exception("BadState")
01071         << "A member function of EventProcessor has been called in an"
01072         << " inappropriate order.\n"
01073         << "Bad transition from " << stateName(curr) << " "
01074         << "using message " << msgName(msg) << "\n"
01075         << "No where to go from here.\n";
01076 
01077     FDEBUG(1) << "changeState: current=" << stateName(curr)
01078               << ", message=" << msgName(msg) 
01079               << " -> new=" << stateName(table[rc].final) << "\n";
01080 
01081     state_ = table[rc].final;
01082   }
01083 
01084   void EventProcessor::runAsync()
01085   {
01086     using boost::thread;
01087     beginJob();
01088     {
01089       boost::mutex::scoped_lock sl(stop_lock_);
01090       if(id_set_==true) {
01091           std::string err("runAsync called while async event loop already running\n");
01092           edm::LogError("FwkJob") << err;
01093           throw cms::Exception("BadState") << err;
01094       }
01095 
01096       changeState(mRunAsync);
01097 
01098       stop_count_=0;
01099       last_rc_=epSuccess; // forget the last value!
01100       event_loop_.reset(new thread(boost::bind(EventProcessor::asyncRun,this)));
01101       boost::xtime timeout;
01102       boost::xtime_get(&timeout, boost::TIME_UTC); 
01103       timeout.sec += 60; // 60 seconds to start!!!!
01104       if(starter_.timed_wait(sl,timeout)==false) {
01105           // yikes - the thread did not start
01106           throw cms::Exception("BadState")
01107             << "Async run thread did not start in 60 seconds\n";
01108       }
01109     }
01110   }
01111 
01112   void EventProcessor::asyncRun(EventProcessor* me)
01113   {
01114     // set up signals to allow for interruptions
01115     // ignore all other signals
01116     // make sure no exceptions escape out
01117 
01118     // temporary hack until we modify the input source to allow
01119     // wakeup calls from other threads.  This mimics the solution
01120     // in EventFilter/Processor, which I do not like.
01121     // allowing cancels means that the thread just disappears at
01122     // certain points.  This is bad for C++ stack variables.
01123     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,0);
01124     //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,0);
01125     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
01126     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,0);
01127 
01128     {
01129       boost::mutex::scoped_lock(me->stop_lock_);
01130       me->event_loop_id_ = pthread_self();
01131       me->id_set_ = true;
01132       me->starter_.notify_all();
01133     }
01134 
01135     Status rc = epException;
01136     FDEBUG(2) << "asyncRun starting ......................\n";
01137 
01138     try {
01139       bool onlineStateTransitions = true;
01140       rc = me->runToCompletion(onlineStateTransitions);
01141     }
01142     catch (cms::Exception& e) {
01143       edm::LogError("FwkJob") << "cms::Exception caught in "
01144                               << "EventProcessor::asyncRun" 
01145                               << "\n"
01146                               << e.explainSelf();
01147       me->last_error_text_ = e.explainSelf();
01148     }
01149     catch (std::exception& e) {
01150       edm::LogError("FwkJob") << "Standard library exception caught in " 
01151                               << "EventProcessor::asyncRun" 
01152                               << "\n"
01153                               << e.what();
01154       me->last_error_text_ = e.what();
01155     }
01156     catch (...) {
01157       edm::LogError("FwkJob") << "Unknown exception caught in "
01158                               << "EventProcessor::asyncRun" 
01159                               << "\n";
01160       me->last_error_text_ = "Unknown exception caught";
01161       rc = epOther;
01162     }
01163 
01164     me->last_rc_ = rc;
01165 
01166     {
01167       // notify anyone waiting for exit that we are doing so now
01168       boost::mutex::scoped_lock sl(me->stop_lock_);
01169       ++me->stop_count_;
01170       me->stopper_.notify_all();
01171     }
01172     FDEBUG(2) << "asyncRun ending ......................\n";
01173   }
01174 
01175 
01176   edm::EventProcessor::StatusCode
01177   EventProcessor::runToCompletion(bool onlineStateTransitions) {
01178 
01179     StateSentry toerror(this);
01180 
01181     int numberOfEventsToProcess = -1;
01182     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01183 
01184     if (machine_.get() != 0) {
01185       throw edm::Exception(errors::LogicError)
01186         << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01187         << "Please report this error to the Framework group\n";
01188     }
01189 
01190     toerror.succeeded();
01191 
01192     return returnCode;
01193   }
01194 
01195   edm::EventProcessor::StatusCode
01196   EventProcessor::runEventCount(int numberOfEventsToProcess) {
01197 
01198     StateSentry toerror(this);
01199 
01200     bool onlineStateTransitions = false;
01201     StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01202 
01203     toerror.succeeded();
01204 
01205     return returnCode;
01206   }
01207 
01208   edm::EventProcessor::StatusCode
01209   EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01210 
01211     beginJob(); //make sure this was called
01212 
01213     if (!onlineStateTransitions) changeState(mRunCount);
01214 
01215     StatusCode returnCode = epSuccess;
01216     stateMachineWasInErrorState_ = false;
01217 
01218     // make the services available
01219     ServiceRegistry::Operate operate(serviceToken_);
01220 
01221     if (machine_.get() == 0) {
01222  
01223       statemachine::FileMode fileMode;
01224       if (fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01225       else if (fileMode_ == std::string("MERGE")) fileMode = statemachine::MERGE;
01226       else if (fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01227       else if (fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01228       else if (fileMode_ == std::string("FULLLUMIMERGE")) fileMode = statemachine::FULLLUMIMERGE;
01229       else {
01230         throw edm::Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01231             << fileMode_ << ".\n"
01232             << "Legal values are 'MERGE', 'NOMERGE', 'FULLMERGE', and 'FULLLUMIMERGE'.\n";
01233       }
01234 
01235       machine_.reset(new statemachine::Machine(this,
01236                                                fileMode,
01237                                                handleEmptyRuns_,
01238                                                handleEmptyLumis_));
01239 
01240       machine_->initiate();
01241     }
01242 
01243     try {
01244 
01245       InputSource::ItemType itemType;
01246 
01247       int iEvents = 0;
01248 
01249       while (true) {
01250 
01251         itemType = input_->nextItemType();
01252 
01253         FDEBUG(1) << "itemType = " << itemType << "\n";
01254 
01255         // These are used for asynchronous running only and
01256         // and are checking to see if stopAsync or shutdownAsync
01257         // were called from another thread.  In the future, we
01258         // may need to do something better than polling the state.
01259         // With the current code this is the simplest thing and
01260         // it should always work.  If the interaction between
01261         // threads becomes more complex this may cause problems.
01262         if (state_ == sStopping) {
01263           FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01264           forceLooperToEnd_ = true;
01265           machine_->process_event(statemachine::Stop());
01266           forceLooperToEnd_ = false;
01267           break;
01268         }
01269         else if (state_ == sShuttingDown) {
01270           FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01271           forceLooperToEnd_ = true;
01272           machine_->process_event(statemachine::Stop());
01273           forceLooperToEnd_ = false;
01274           break;
01275         }
01276 
01277         // Look for a shutdown signal
01278         {
01279           boost::mutex::scoped_lock sl(usr2_lock);
01280           if (edm::shutdown_flag) {
01281             changeState(mShutdownSignal);
01282             returnCode = epSignal;
01283             forceLooperToEnd_ = true;
01284             machine_->process_event(statemachine::Stop());
01285             forceLooperToEnd_ = false;
01286             break;
01287           }
01288         }
01289 
01290         if (itemType == InputSource::IsStop) {
01291           machine_->process_event(statemachine::Stop());
01292         }
01293         else if (itemType == InputSource::IsFile) {
01294           machine_->process_event(statemachine::File());
01295         }
01296         else if (itemType == InputSource::IsRun) {
01297           machine_->process_event(statemachine::Run(input_->run()));
01298         }
01299         else if (itemType == InputSource::IsLumi) {
01300           machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01301         }
01302         else if (itemType == InputSource::IsEvent) {
01303           machine_->process_event(statemachine::Event());
01304           ++iEvents;
01305           if (numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01306             returnCode = epCountComplete;            
01307             changeState(mInputExhausted);
01308             FDEBUG(1) << "Event count complete, pausing event loop\n";
01309             break;
01310           }
01311         }
01312         // This should be impossible
01313         else {
01314           throw edm::Exception(errors::LogicError)
01315             << "Unknown next item type passed to EventProcessor\n"
01316             << "Please report this error to the Framework group\n";
01317         }
01318 
01319         if (machine_->terminated()) {
01320           changeState(mInputExhausted);
01321           break;
01322         }
01323       }  // End of loop over state machine events
01324     } // Try block 
01325 
01326     // Some comments on exception handling related to the boost state machine:
01327     //
01328     // Some states used in the machine are special because they
01329     // perform actions while the machine is being terminated, actions
01330     // such as close files, call endRun, call endLumi etc ...  Each of these
01331     // states has two functions that perform these actions.  The functions
01332     // are almost identical.  The major difference is that one version
01333     // catches all exceptions and the other lets exceptions pass through.
01334     // The destructor catches them and the other function named "exit" lets
01335     // them pass through.  On a normal termination, boost will always call
01336     // "exit" and then the state destructor.  In our state classes, the
01337     // the destructors do nothing if the exit function already took
01338     // care of things.  Here's the interesting part.  When boost is
01339     // handling an exception the "exit" function is not called (a boost
01340     // feature).
01341     //
01342     // If an exception occurs while the boost machine is in control
01343     // (which usually means inside a process_event call), then
01344     // the boost state machine destroys its states and "terminates" itself.
01345     // This already done before we hit the catch blocks below. In this case
01346     // the call to terminateMachine below only destroys an already
01347     // terminated state machine.  Because exit is not called, the state destructors
01348     // handle cleaning up lumis, runs, and files.  The destructors swallow
01349     // all exceptions and only pass through the exceptions messages which
01350     // are tacked onto the original exception below.
01351     // 
01352     // If an exception occurs when the boost state machine is not
01353     // in control (outside the process_event functions), then boost
01354     // cannot destroy its own states.  The terminateMachine function
01355     // below takes care of that.  The flag "alreadyHandlingException"
01356     // is set true so that the state exit functions do nothing (and
01357     // cannot throw more exceptions while handling the first).  Then the
01358     // state destructors take care of this because exit did nothing.
01359     //
01360     // In both cases above, the EventProcessor::endOfLoop function is
01361     // not called because it can throw exceptions.
01362     //
01363     // One tricky aspect of the state machine is that things which can
01364     // throw should not be invoked by the state machine while another
01365     // exception is being handled.
01366     // Another tricky aspect is that it appears to be important to 
01367     // terminate the state machine before invoking its destructor.
01368     // We've seen crashes which are not understood when that is not
01369     // done.  Maintainers of this code should be careful about this.
01370 
01371     catch (cms::Exception& e) {
01372       alreadyHandlingException_ = true;
01373       terminateMachine();
01374       alreadyHandlingException_ = false;
01375       e << "cms::Exception caught in EventProcessor and rethrown\n";
01376       e << exceptionMessageLumis_;
01377       e << exceptionMessageRuns_;
01378       e << exceptionMessageFiles_;
01379       throw e;
01380     }
01381     catch (std::bad_alloc& e) {
01382       alreadyHandlingException_ = true;
01383       terminateMachine();
01384       alreadyHandlingException_ = false;
01385       throw cms::Exception("std::bad_alloc")
01386         << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
01387         << "The job has probably exhausted the virtual memory available to the process.\n"
01388         << exceptionMessageLumis_
01389         << exceptionMessageRuns_
01390         << exceptionMessageFiles_;
01391     }
01392     catch (std::exception& e) {
01393       alreadyHandlingException_ = true;
01394       terminateMachine();
01395       alreadyHandlingException_ = false;
01396       throw cms::Exception("StdException")
01397         << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
01398         << "Previous information:\n" << e.what() << "\n"
01399         << exceptionMessageLumis_
01400         << exceptionMessageRuns_
01401         << exceptionMessageFiles_;
01402     }
01403     catch (...) {
01404       alreadyHandlingException_ = true;
01405       terminateMachine();
01406       alreadyHandlingException_ = false;
01407       throw cms::Exception("Unknown")
01408         << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
01409         << exceptionMessageLumis_
01410         << exceptionMessageRuns_
01411         << exceptionMessageFiles_;
01412     }
01413 
01414     if (machine_->terminated()) {
01415       FDEBUG(1) << "The state machine reports it has been terminated\n";
01416       machine_.reset();
01417     }
01418 
01419     if (!onlineStateTransitions) changeState(mFinished);
01420 
01421     if (stateMachineWasInErrorState_) {
01422       throw cms::Exception("BadState")
01423         << "The boost state machine in the EventProcessor exited after\n"
01424         << "entering the Error state.\n";
01425     }
01426 
01427     return returnCode;
01428   }
01429 
01430   void EventProcessor::readFile() {
01431     FDEBUG(1) << " \treadFile\n";
01432     fb_ = input_->readFile();
01433   }
01434 
01435   void EventProcessor::closeInputFile() {
01436     input_->closeFile();
01437     FDEBUG(1) << "\tcloseInputFile\n";
01438   }
01439 
01440   void EventProcessor::openOutputFiles() {
01441     schedule_->openOutputFiles(*fb_);
01442     FDEBUG(1) << "\topenOutputFiles\n";
01443   }
01444 
01445   void EventProcessor::closeOutputFiles() {
01446     schedule_->closeOutputFiles();
01447     FDEBUG(1) << "\tcloseOutputFiles\n";
01448   }
01449 
01450   void EventProcessor::respondToOpenInputFile() {
01451     schedule_->respondToOpenInputFile(*fb_);
01452     FDEBUG(1) << "\trespondToOpenInputFile\n";
01453   }
01454 
01455   void EventProcessor::respondToCloseInputFile() {
01456     schedule_->respondToCloseInputFile(*fb_);
01457     FDEBUG(1) << "\trespondToCloseInputFile\n";
01458   }
01459 
01460   void EventProcessor::respondToOpenOutputFiles() {
01461     schedule_->respondToOpenOutputFiles(*fb_);
01462     FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01463   }
01464 
01465   void EventProcessor::respondToCloseOutputFiles() {
01466     schedule_->respondToCloseOutputFiles(*fb_);
01467     FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01468   }
01469 
01470   void EventProcessor::startingNewLoop() {
01471     shouldWeStop_ = false;
01472     if (looper_) {
01473       looper_->doStartingNewLoop();
01474     }
01475     FDEBUG(1) << "\tstartingNewLoop\n";
01476   }
01477 
01478   bool EventProcessor::endOfLoop() {
01479     if (looper_) {
01480       EDLooper::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01481       if (status != EDLooper::kContinue || forceLooperToEnd_) return true;
01482       else return false;
01483     }
01484     FDEBUG(1) << "\tendOfLoop\n";
01485     return true;
01486   }
01487 
01488   void EventProcessor::rewindInput() {
01489     input_->repeat();
01490     input_->rewind();
01491     FDEBUG(1) << "\trewind\n";
01492   }
01493 
01494   void EventProcessor::prepareForNextLoop() {
01495     looper_->prepareForNextLoop(esp_.get());
01496     FDEBUG(1) << "\tprepareForNextLoop\n";
01497   }
01498 
01499   void EventProcessor::writeLumiCache() {
01500     while (!principalCache_.noMoreLumis()) {
01501       schedule_->writeLumi(principalCache_.lowestLumi());
01502       principalCache_.deleteLowestLumi();      
01503     }
01504     FDEBUG(1) << "\twriteLumiCache\n";
01505   }
01506 
01507   void EventProcessor::writeRunCache() {
01508     while (!principalCache_.noMoreRuns()) {
01509       schedule_->writeRun(principalCache_.lowestRun());
01510       principalCache_.deleteLowestRun();      
01511     }
01512     FDEBUG(1) << "\twriteRunCache\n";
01513   }
01514 
01515   bool EventProcessor::shouldWeCloseOutput() const {
01516     FDEBUG(1) << "\tshouldWeCloseOutput\n";
01517     return schedule_->shouldWeCloseOutput();
01518   }
01519 
01520   void EventProcessor::doErrorStuff() {
01521     FDEBUG(1) << "\tdoErrorStuff\n";
01522     edm::LogError("StateMachine")
01523       << "The EventProcessor state machine encountered an unexpected event\n"
01524       << "and went to the error state\n"
01525       << "Will attempt to terminate processing normally\n"
01526       << "(IF using the looper the next loop will be attempted)\n"
01527       << "This likely indicates a bug in an input module or corrupted input or both\n";
01528     stateMachineWasInErrorState_ = true;
01529   }
01530 
01531   void EventProcessor::beginRun(int run) {
01532     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run);
01533     IOVSyncValue ts(EventID(runPrincipal.run(),0),
01534                     0,
01535                     runPrincipal.beginTime());
01536     EventSetup const& es = esp_->eventSetupForInstance(ts);
01537     schedule_->processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionBegin> >(runPrincipal, es);
01538     FDEBUG(1) << "\tbeginRun " << run << "\n";
01539   }
01540 
01541   void EventProcessor::endRun(int run) {
01542     RunPrincipal& runPrincipal = principalCache_.runPrincipal(run);
01543     input_->doEndRun(runPrincipal);
01544     IOVSyncValue ts(EventID(runPrincipal.run(),EventID::maxEventNumber()),
01545                     LuminosityBlockID::maxLuminosityBlockNumber(),
01546                     runPrincipal.endTime());
01547     EventSetup const& es = esp_->eventSetupForInstance(ts);
01548     schedule_->processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionEnd> >(runPrincipal, es);
01549     FDEBUG(1) << "\tendRun " << run << "\n";
01550   }
01551 
01552   void EventProcessor::beginLumi(int run, int lumi) {
01553     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(run, lumi);
01554     // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
01555     // lumi blocks know their start and end times why not also start and end events?
01556     IOVSyncValue ts(EventID(lumiPrincipal.run(),0), lumiPrincipal.luminosityBlock(), lumiPrincipal.beginTime());
01557     EventSetup const& es = esp_->eventSetupForInstance(ts);
01558     schedule_->processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> >(lumiPrincipal, es);
01559     FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
01560   }
01561 
01562   void EventProcessor::endLumi(int run, int lumi) {
01563     LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(run, lumi);
01564     input_->doEndLumi(lumiPrincipal);
01565     //NOTE: Using the max event number for the end of a lumi block is a bad idea
01566     // lumi blocks know their start and end times why not also start and end events?
01567     IOVSyncValue ts(EventID(lumiPrincipal.run(),EventID::maxEventNumber()),
01568                     lumiPrincipal.luminosityBlock(),
01569                     lumiPrincipal.endTime());
01570     EventSetup const& es = esp_->eventSetupForInstance(ts);
01571     schedule_->processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> >(lumiPrincipal, es);
01572     FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01573   }
01574 
01575   int EventProcessor::readAndCacheRun() {
01576     principalCache_.insert(input_->readRun());
01577     FDEBUG(1) << "\treadAndCacheRun " << "\n";
01578     return principalCache_.runPrincipal().run();
01579   }
01580 
01581   int EventProcessor::readAndCacheLumi() {
01582     principalCache_.insert(input_->readLuminosityBlock(principalCache_.runPrincipalPtr()));
01583     FDEBUG(1) << "\treadAndCacheLumi " << "\n";
01584     return principalCache_.lumiPrincipal().luminosityBlock();
01585   }
01586 
01587   void EventProcessor::writeRun(int run) {
01588     schedule_->writeRun(principalCache_.runPrincipal(run));
01589     FDEBUG(1) << "\twriteRun " << run << "\n";
01590   }
01591 
01592   void EventProcessor::deleteRunFromCache(int run) {
01593     principalCache_.deleteRun(run);
01594     FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
01595   }
01596 
01597   void EventProcessor::writeLumi(int run, int lumi) {
01598     schedule_->writeLumi(principalCache_.lumiPrincipal(run, lumi));
01599     FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01600   }
01601 
01602   void EventProcessor::deleteLumiFromCache(int run, int lumi) {
01603     principalCache_.deleteLumi(run, lumi);
01604     FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01605   }
01606 
01607   void EventProcessor::readEvent() {
01608     sm_evp_ = input_->readEvent(principalCache_.lumiPrincipalPtr());
01609     FDEBUG(1) << "\treadEvent\n";
01610   }
01611 
01612   void EventProcessor::processEvent() {
01613     IOVSyncValue ts(sm_evp_->id(), sm_evp_->luminosityBlock(), sm_evp_->time());
01614     EventSetup const& es = esp_->eventSetupForInstance(ts);
01615     schedule_->processOneOccurrence<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(*sm_evp_, es);
01616  
01617     if (looper_) {
01618       EDLooper::Status status = looper_->doDuringLoop(*sm_evp_, esp_->eventSetup());
01619       if (status != EDLooper::kContinue) shouldWeStop_ = true;
01620     }
01621 
01622     FDEBUG(1) << "\tprocessEvent\n";
01623   }
01624 
01625   bool EventProcessor::shouldWeStop() const {
01626     FDEBUG(1) << "\tshouldWeStop\n";
01627     if (shouldWeStop_) return true;
01628     return schedule_->terminate();
01629   }
01630 
01631   void EventProcessor::setExceptionMessageFiles(std::string& message) {
01632     exceptionMessageFiles_ = message;
01633   }
01634 
01635   void EventProcessor::setExceptionMessageRuns(std::string& message) {
01636     exceptionMessageRuns_ = message;
01637   }
01638 
01639   void EventProcessor::setExceptionMessageLumis(std::string& message) {
01640     exceptionMessageLumis_ = message;
01641   }
01642 
01643   bool EventProcessor::alreadyHandlingException() const {
01644     return alreadyHandlingException_;
01645   }
01646 
01647   void EventProcessor::terminateMachine() {
01648     if (machine_.get() != 0) {
01649       if (!machine_->terminated()) {
01650         forceLooperToEnd_ = true;
01651         machine_->process_event(statemachine::Stop());
01652         forceLooperToEnd_ = false;
01653       }
01654       else {
01655         FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
01656       }
01657       if (machine_->terminated()) {
01658         FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
01659       }
01660       machine_.reset();
01661     }
01662   }
01663 }

Generated on Tue Jun 9 17:35:59 2009 for CMSSW by  doxygen 1.5.4