CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/FWCore/Framework/interface/EventProcessor.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 EventProcessor: This defines the 'framework application' object. It is
00007 configured in the user's main() function, and is set running.
00008 
00009 ----------------------------------------------------------------------*/
00010 
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012 #include "DataFormats/Provenance/interface/RunID.h"
00013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
00014 
00015 #include "FWCore/Framework/interface/Frameworkfwd.h"
00016 #include "FWCore/Framework/interface/IEventProcessor.h"
00017 #include "FWCore/Framework/src/PrincipalCache.h"
00018 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00019 
00020 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00021 
00022 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00023 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00024 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00025 
00026 #include "boost/shared_ptr.hpp"
00027 #include "boost/thread/condition.hpp"
00028 
00029 #include <map>
00030 #include <memory>
00031 #include <set>
00032 #include <string>
00033 #include <vector>
00034 
00035 namespace statemachine {
00036   class Machine;
00037   class Run;
00038 }
00039 
00040 namespace edm {
00041 
00042   class ActionTable;
00043   class BranchIDListHelper;
00044   class EDLooperBase;
00045   class HistoryAppender;
00046   class ProcessDesc;
00047   class SubProcess;
00048   namespace eventsetup {
00049     class EventSetupProvider;
00050     class EventSetupsController;
00051   }
00052 
00053   namespace event_processor {
00054     /*
00055       Several of these states are likely to be transitory in
00056       the offline because they are completely driven by the
00057       data coming from the input source.
00058       (State is returned by EventProcessor::getState(); Msg is the argument of EventProcessor::changeState().) */
00059     enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
00060                  sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };
00061 
00062     enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00063                mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00064                mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00065                mAny, mDtor, mException, mInputRewind };
00066 
00067     class StateSentry;  // forward declaration only; declared a friend of EventProcessor
00068   }
00069 
00070   class EventProcessor : public IEventProcessor {  // the 'framework application' object, configured and set running from the user's main()
00071   public:
00072 
00073     // The input string 'config' contains the entire contents of a configuration file.
00074     // Also allows the attachment of pre-existing services specified by 'token', and
00075     // the specification of services by name only (defaultServices and forcedServices).
00076     // 'defaultServices' are overridden by 'config'.
00077     // 'forcedServices' override the 'config'.
00078     explicit EventProcessor(std::string const& config,
00079                             ServiceToken const& token = ServiceToken(),
00080                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00081                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00082                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
00083 
00084     // Same as the previous constructor, but without a 'token'.  Token will be defaulted.
00085 
00086     EventProcessor(std::string const& config,
00087                    std::vector<std::string> const& defaultServices,
00088                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
00089 
00090     EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00091                    ServiceToken const& token,
00092                    serviceregistry::ServiceLegacy legacy);
00093 
00095     EventProcessor(std::string const& config, bool isPython);
00096 
00097     ~EventProcessor();
00098 
00099     EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
00100     EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
00101 
00106     void beginJob();
00107 
00111     void endJob();
00112 
00116     char const* currentStateName() const;
00117     char const* stateName(event_processor::State s) const;
00118     char const* msgName(event_processor::Msg m) const;
00119     event_processor::State getState() const;
00120     void runAsync();
00121     StatusCode statusAsync() const;
00122 
00123     // Concerning the async control functions (when they time out):
00124     // The event processor is left with the running thread.
00125     // The async thread is stuck at this point and the process
00126     // is likely not going to be able to continue.
00127     // The reason for this timeout could be either an infinite loop
00128     // or I/O blocking forever.
00129     // The only thing to do is end the process.
00130     // If you call endJob, you will likely get an exception from the
00131     // state checks telling you that it is not valid to call this function.
00132     // All of these functions force the event processor state into an
00133     // error state.
00134 
00135     // tell the event loop to stop and wait for its completion
00136     StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00137 
00138     // tell the event loop to shutdown and wait for the completion
00139     StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00140 
00141     // wait until async event loop thread completes
00142     // or timeout occurs (See StatusCode for return values)
00143     StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00144 
00145     // Both of these calls move the EP to the ready to run state but only
00146     // the first actually sets the run number, the other one just stores
00147     // the run number set externally in order to later compare to the one
00148     // read from the input source for verification
00149     void setRunNumber(RunNumber_t runNumber);
00150     void declareRunNumber(RunNumber_t runNumber);
00151 
00152     // -------------
00153 
00154     // Same as runToCompletion(false), but since it was used extensively
00155     // outside of the framework (and is simpler) it is kept.
00156     StatusCode run();
00157 
00160 
00164 
00165     std::vector<ModuleDescription const*>
00166     getAllModuleDescriptions() const;
00167 
00171     int totalEvents() const;
00172 
00175     int totalEventsPassed() const;
00176 
00179     int totalEventsFailed() const;
00180 
00183     void enableEndPaths(bool active);
00184 
00187     bool endPathsEnabled() const;
00188 
00191     void getTriggerReport(TriggerReport& rep) const;
00192 
00194     void clearCounters();
00195 
00196     // Really should not be public,
00197     //   but the EventFilter needs it for now.
00198     ServiceToken getToken();
00199 
00202     ActivityRegistry::PreProcessEvent&
00203     preProcessEventSignal() {return preProcessEventSignal_;}  // mutable reference to the member signal
00204 
00207     ActivityRegistry::PostProcessEvent&
00208     postProcessEventSignal() {return postProcessEventSignal_;}  // mutable reference to the member signal
00209 
00210     //------------------------------------------------------------------
00211     //
00212     // Nested classes and structs below. (NOTE(review): none are declared below in this class; heading looks stale - confirm)
00213 
00214     // The function "runToCompletion" will run until the job is "complete",
00215     // which means:
00216     //       1 - no more input data
00217     //       2 - input maxEvents parameter limit reached
00218     //       3 - output maxEvents parameter limit reached
00219     //       4 - input maxLuminosityBlocks parameter limit reached
00220     //       5 - looper directs processing to end
00221     //
00222     // The return values from the function are as follows:
00223     //   epSignal - processing terminated early, SIGUSR2 encountered
00224     //   epCountComplete - "runEventCount" processed the number of events
00225     //                     requested by the argument (NOTE(review): no runEventCount is declared in this class; possibly stale - confirm)
00226     //   epSuccess - all other cases
00227     //
00228     // The online is an exceptional case.  Online uses the DaqSource
00229     // and the StreamerOutputModule, which are specially written to
00230     // handle multiple calls of "runToCompletion" in the same job.
00231     // The call to setRunNumber resets the DaqSource between those calls.
00232     // With most sources and output modules, this does not work.
00233     // If and only if called by the online, the argument to runToCompletion
00234     // is set to true and this affects the initial and final state
00235     // transitions that are managed directly in EventProcessor.cc. (I am
00236     // not sure if there is a reason for this or it is just a historical
00237     // peculiarity that could be cleaned up and removed).
00238 
00239     virtual StatusCode runToCompletion(bool onlineStateTransitions);
00240 
00241     // The following functions are used by the code implementing our
00242     // boost statemachine
00243 
00244     virtual void readFile();
00245     virtual void closeInputFile(bool cleaningUpAfterException);
00246     virtual void openOutputFiles();
00247     virtual void closeOutputFiles();
00248 
00249     virtual void respondToOpenInputFile();
00250     virtual void respondToCloseInputFile();
00251     virtual void respondToOpenOutputFiles();
00252     virtual void respondToCloseOutputFiles();
00253 
00254     virtual void startingNewLoop();
00255     virtual bool endOfLoop();
00256     virtual void rewindInput();
00257     virtual void prepareForNextLoop();
00258     virtual bool shouldWeCloseOutput() const;
00259 
00260     virtual void doErrorStuff();
00261 
00262     virtual void beginRun(statemachine::Run const& run);
00263     virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
00264 
00265     virtual void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
00266     virtual void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException);
00267 
00268     virtual statemachine::Run readAndCacheRun();
00269     virtual statemachine::Run readAndMergeRun();
00270     virtual int readAndCacheLumi();
00271     virtual int readAndMergeLumi();
00272     virtual void writeRun(statemachine::Run const& run);
00273     virtual void deleteRunFromCache(statemachine::Run const& run);
00274     virtual void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
00275     virtual void deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
00276 
00277     virtual void readAndProcessEvent();
00278     virtual bool shouldWeStop() const;
00279 
00280     virtual void setExceptionMessageFiles(std::string& message);
00281     virtual void setExceptionMessageRuns(std::string& message);
00282     virtual void setExceptionMessageLumis(std::string& message);
00283 
00284     virtual bool alreadyHandlingException() const;
00285 
00286     // returns 'true' if this was a child and we should continue processing
00287     bool forkProcess(std::string const& jobReportFile);
00288 
00289   private:
00290     //------------------------------------------------------------------
00291     //
00292     // Now private functions.
00293     // init() is used only by constructors
00294     void init(boost::shared_ptr<ProcessDesc>& processDesc,
00295               ServiceToken const& token,
00296               serviceregistry::ServiceLegacy);
00297 
00298     void terminateMachine(std::auto_ptr<statemachine::Machine>&);  // NOTE(review): std::auto_ptr is deprecated since C++11; this class already uses std::unique_ptr for other members
00299     std::auto_ptr<statemachine::Machine> createStateMachine();
00300 
00301     StatusCode doneAsync(event_processor::Msg m);
00302 
00303     StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00304 
00305     void connectSigs(EventProcessor* ep);
00306 
00307     void changeState(event_processor::Msg);
00308     void errorState();
00309     void setupSignal();
00310 
00311     static void asyncRun(EventProcessor*);
00312 
00313     bool hasSubProcess() const {
00314       return subProcess_.get() != 0;  // true when a SubProcess object is currently held
00315     }
00316 
00317     void possiblyContinueAfterForkChildFailure();
00318     //------------------------------------------------------------------
00319     //
00320     // Data members below.
00321     // Are all these data members really needed? Some of them are used
00322     // only during construction, and never again. If they aren't
00323     // really needed, we should remove them.
00324 
00325     ActivityRegistry::PreProcessEvent             preProcessEventSignal_;
00326     ActivityRegistry::PostProcessEvent            postProcessEventSignal_;
00327     boost::shared_ptr<ActivityRegistry>           actReg_;
00328     boost::shared_ptr<ProductRegistry const>      preg_;
00329     boost::shared_ptr<BranchIDListHelper>         branchIDListHelper_;
00330     ServiceToken                                  serviceToken_;
00331     std::unique_ptr<InputSource>                  input_;
00332     std::unique_ptr<eventsetup::EventSetupsController> espController_;
00333     boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00334     std::unique_ptr<ActionTable const>          act_table_;
00335     boost::shared_ptr<ProcessConfiguration const>       processConfiguration_;
00336     std::auto_ptr<Schedule>                       schedule_;  // NOTE(review): deprecated std::auto_ptr (see terminateMachine); migrating needs matching changes in the implementation file
00337     std::auto_ptr<SubProcess>                     subProcess_;
00338     std::unique_ptr<HistoryAppender>            historyAppender_;
00339 
00340     volatile event_processor::State               state_;  // NOTE(review): volatile does not synchronize threads; presumably guarded by state_lock_ - confirm (same for the volatile members below)
00341     boost::shared_ptr<boost::thread>              event_loop_;
00342 
00343     boost::mutex                                  state_lock_;
00344     boost::mutex                                  stop_lock_;
00345     boost::condition                              stopper_;
00346     boost::condition                              starter_;
00347     volatile int                                  stop_count_;
00348     volatile Status                               last_rc_;
00349     std::string                                   last_error_text_;
00350     volatile bool                                 id_set_;
00351     volatile pthread_t                            event_loop_id_;  // NOTE(review): no <pthread.h> is included directly by this header; presumably arrives via the boost thread headers - confirm
00352     int                                           my_sig_num_;
00353     std::unique_ptr<FileBlock>                    fb_;
00354     boost::shared_ptr<EDLooperBase>               looper_;
00355 
00356     PrincipalCache                                principalCache_;
00357     bool                                          shouldWeStop_;
00358     bool                                          stateMachineWasInErrorState_;
00359     std::string                                   fileMode_;
00360     std::string                                   emptyRunLumiMode_;
00361     std::string                                   exceptionMessageFiles_;
00362     std::string                                   exceptionMessageRuns_;
00363     std::string                                   exceptionMessageLumis_;
00364     bool                                          alreadyHandlingException_;
00365     bool                                          forceLooperToEnd_;
00366     bool                                          looperBeginJobRun_;
00367     bool                                          forceESCacheClearOnNewRun_;
00368 
00369     int                                           numberOfForkedChildren_;
00370     unsigned int                                  numberOfSequentialEventsPerChild_;
00371     bool                                          setCpuAffinity_;
00372     bool                                          continueAfterChildFailure_;
00373     
00374     typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00375     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00376     ExcludedDataMap                               eventSetupDataToExcludeFromPrefetching_;
00377     friend class event_processor::StateSentry;
00378   }; // class EventProcessor
00379 
00380   //--------------------------------------------------------------------
00381 
00382   inline
00383   EventProcessor::StatusCode
00384   EventProcessor::run() {  // thin convenience wrapper; returns whatever runToCompletion returns
00385     return runToCompletion(false);  // false is passed as the onlineStateTransitions argument
00386   }
00387 }
00388 #endif