CMS 3D CMS Logo

EventProcessor.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 EventProcessor: This defines the 'framework application' object. It is
00007 configured in the user's main() function, and is set running.
00008 
00009 $Id: EventProcessor.h,v 1.69 2008/10/20 20:19:58 wdd Exp $
00010 
00011 ----------------------------------------------------------------------*/
00012 
00013 #include <string>
00014 #include <vector>
00015 #include <memory>
00016 
00017 #include "boost/shared_ptr.hpp"
00018 #include "boost/thread/thread.hpp"
00019 #include "boost/utility.hpp"
00020 
00021 #include "FWCore/Framework/interface/IEventProcessor.h"
00022 #include "FWCore/Framework/interface/InputSource.h"
00023 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00024 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00025 #include "FWCore/Framework/src/WorkerRegistry.h"
00026 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00027 #include "FWCore/Framework/interface/Actions.h"
00028 #include "DataFormats/Provenance/interface/PassID.h"
00029 #include "DataFormats/Provenance/interface/ReleaseVersion.h"
00030 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00031 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00032 
00033 #include "FWCore/Framework/interface/Frameworkfwd.h"
00034 #include "FWCore/Framework/src/PrincipalCache.h"
00035 
00036 namespace statemachine {
      // Forward declaration only: this header stores a
      // std::auto_ptr<statemachine::Machine> (EventProcessor::machine_)
      // and never needs the complete type here.
00037   class Machine;
00038 }
00039 
00040 namespace edm {
00041 
        // Forward declaration: used below only as
        // boost::shared_ptr<edm::ProcessDesc> & in a constructor and in init().
00042   class ProcessDesc;
00043   namespace eventsetup {
          // Forward declaration: EventProcessor holds one through
          // std::auto_ptr<eventsetup::EventSetupProvider> (esp_).
00044     class EventSetupProvider;
00045   }
00046 
00047   namespace event_processor
00048   {  
00049     /*
00050       Several of these states are likely to be transitory in
00051       the offline because they are completely driven by the
00052       data coming from the input source.

            Numbering: sInit is explicitly 0 and the remaining enumerators
            take consecutive values, so sInvalid is 11.  EventProcessor
            keeps its current State in state_, returns one from getState(),
            and stateName() takes a State and returns a char const*.
00053     */
00054     enum State { sInit=0,sJobReady,sRunGiven,sRunning,sStopping,
00055                  sShuttingDown,sDone,sJobEnded,sError,sErrorEnded,sEnd,sInvalid };
00056     
          // Message codes.  mSetRun is explicitly 0 and the remaining
          // enumerators take consecutive values, so mInputRewind is 17.
          // In this header a Msg is accepted by EventProcessor::changeState()
          // and EventProcessor::doneAsync(), and msgName() takes a Msg and
          // returns a char const*.
00057     enum Msg { mSetRun=0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00058                mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00059                mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00060                mAny, mDtor, mException, mInputRewind };
00061 
          // Forward declaration; EventProcessor declares this class a friend.
00062     class StateSentry;
00063   }
00064 
00065   class EventProcessor : public IEventProcessor, private boost::noncopyable
00066   {
00067   public:
00068 
00069     // The input string 'config' contains the entire contents of a configuration file.
00070     // Also allows the attachment of pre-existing services specified by 'token', and
00071     // the specification of services by name only (defaultServices and forcedServices).
00072     // 'defaultServices' are overridden by 'config'.
00073     // 'forcedServices' cause an exception if the same service is specified in 'config'.
          // Every parameter except 'config' has a default value; the constructor is
          // 'explicit', so a std::string is never implicitly converted to an EventProcessor.
00074     explicit EventProcessor(std::string const& config,
00075                             ServiceToken const& token = ServiceToken(),
00076                             serviceregistry::ServiceLegacy =
00077                               serviceregistry::kOverlapIsError,
00078                             std::vector<std::string> const& defaultServices =
00079                               std::vector<std::string>(),
00080                             std::vector<std::string> const& forcedServices =
00081                               std::vector<std::string>());
00082 
00083     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
00084 
00085    EventProcessor(std::string const& config,
00086                   std::vector<std::string> const& defaultServices,
00087                   std::vector<std::string> const& forcedServices =
00088                   std::vector<std::string>());
00089     
00090 
          // Construct from a ProcessDesc held in a boost::shared_ptr (passed by
          // non-const reference) together with an explicit service token and
          // ServiceLegacy value; none of the parameters has a default.
00091     EventProcessor(boost::shared_ptr<edm::ProcessDesc> & processDesc,
00092                    ServiceToken const& token,
00093                    serviceregistry::ServiceLegacy legacy);
00094 
          // NOTE(review): the original doc comment for this overload is not part
          // of this listing.  'isPython' presumably selects how 'config' is
          // interpreted -- confirm against EventProcessor.cc.
00096     EventProcessor(std::string const& config, bool isPython);
00097 
00098     ~EventProcessor();
00099 
00104     void beginJob();
00105 
00109     void endJob();
00110 
          // State queries.  currentStateName(), stateName() and msgName() return
          // char const*; getState() returns the event_processor::State enum value.
00114     char const* currentStateName() const;
00115     char const* stateName(event_processor::State s) const;
00116     char const* msgName(event_processor::Msg m) const;
00117     event_processor::State getState() const;
00118     void runAsync();
00119     StatusCode statusAsync() const;
00120 
00121     // Concerning the async control functions when a timeout occurs:
00122     // The event processor is left with the running thread.
00123     // The async thread is stuck at this point and the process 
00124     // is likely not going to be able to continue. 
00125     // The reason for this timeout could be either an infinite loop
00126     // or I/O blocking forever.
00127     // The only thing to do is end the process.
00128     // If you call endJob, you will likely get an exception from the
00129     // state checks telling you that it is not valid to call this function.
00130     // All of these functions force the event processor state into an
00131     // error state.
00132 
00133     // tell the event loop to stop and wait for its completion
          // (timeout_secs defaults to 60*2, i.e. 120)
00134     StatusCode stopAsync(unsigned int timeout_secs=60*2);
00135     
00136     // tell the event loop to shutdown and wait for the completion
          // (timeout_secs defaults to 60*2, i.e. 120)
00137     StatusCode shutdownAsync(unsigned int timeout_secs=60*2);
00138 
00139     // wait until async event loop thread completes
00140     // or timeout occurs (See StatusCode for return values)
          // NOTE(review): timeout_seconds defaults to 0; what a zero timeout
          // means is not visible in this header -- confirm in EventProcessor.cc.
00141     StatusCode waitTillDoneAsync(unsigned int timeout_seconds=0);
00142 
00143     // Both of these calls move the EP to the ready to run state but only
00144     // the first actually sets the run number, the other one just stores
00145     // the run number set externally in order to later compare to the one
00146     // read from the input source for verification
00147     void setRunNumber(RunNumber_t runNumber);
00148     void declareRunNumber(RunNumber_t runNumber);
00149 
00150     // -------------
00151 
00152     // These next two functions are deprecated.  Please use
00153     // runToCompletion or runEventCount instead.  These will
00154     // be deleted as soon as we have time to clean up the code
00155     // in packages outside the Framework that uses them already.
          // Note that 'repeatable' defaults to true, whereas the no-argument
          // run() (defined inline at the end of this file) calls run(-1, false).
00156     StatusCode run(int numberEventsToProcess, bool repeatable = true);
00157     StatusCode run();
00158 
00159     // Process one event with the given EventID
00160     StatusCode run(EventID const& id);
00161 
00162     // Skip the specified number of events.
00163     // If numberToSkip is negative, we will back up.
00164     StatusCode skip(int numberToSkip);
00165  
00166     // Rewind to the first event
00167     void rewind();
00168 
00171 
00175 
          // Returns, by value, a vector of pointers to const ModuleDescription.
00176     std::vector<ModuleDescription const*>
00177     getAllModuleDescriptions() const;
00178 
00182     int totalEvents() const;
00183 
00186     int totalEventsPassed() const;
00187 
00190     int totalEventsFailed() const;
00191 
00194     void enableEndPaths(bool active);
00195 
00198     bool endPathsEnabled() const;
00199 
          // Output parameter: the report is written into the caller-supplied 'rep'
          // (non-const reference); the function itself is const and returns void.
00202     void getTriggerReport(TriggerReport& rep) const;      
00203 
00205     void clearCounters();
00206 
00207     // Really should not be public,
00208     //   but the EventFilter needs it for now.    
00209     ServiceToken getToken();
00210 
          // Returns a non-const reference to the preProcessEventSignal_ member.
00213     ActivityRegistry::PreProcessEvent &
00214     preProcessEventSignal() {return preProcessEventSignal_;}
00215 
          // Returns a non-const reference to the postProcessEventSignal_ member.
00218     ActivityRegistry::PostProcessEvent &
00219     postProcessEventSignal() {return postProcessEventSignal_;}
00220 
00221     //------------------------------------------------------------------
00222     //
00223     // Nested classes and structs below.
00224 
          // Plain aggregate of five values: process name, release version,
          // pass ID, and the two integer input limits (events and lumis).
00225     struct CommonParams
00226     {
            // Default constructor: value-initializes every member, so
            // processName_ is empty and both int members are 0.
00227       CommonParams():
00228         processName_(),
00229         releaseVersion_(),
00230         passID_(),
00231         maxEventsInput_(),
00232         maxLumisInput_()
00233       { }
00234 
            // Copies each argument into the corresponding member:
            // maxEvents -> maxEventsInput_, maxLumis -> maxLumisInput_.
00235       CommonParams(std::string const& processName,
00236                    ReleaseVersion const& releaseVersion,
00237                    PassID const& passID,
00238                    int maxEvents,
00239                    int maxLumis):
00240         processName_(processName),
00241         releaseVersion_(releaseVersion),
00242         passID_(passID),
00243         maxEventsInput_(maxEvents),
00244         maxLumisInput_(maxLumis)
00245       { }
00246       
00247       std::string processName_;
00248       ReleaseVersion releaseVersion_;
00249       PassID passID_;
00250       int maxEventsInput_;
00251       int maxLumisInput_;
00252     }; // struct CommonParams
00253 
00254 
00255     // The function "runToCompletion" will run until the job is "complete",
00256     // which means:
00257     //       1 - no more input data
00258     //       2 - input maxEvents parameter limit reached
00259     //       3 - output maxEvents parameter limit reached
00260     //       4 - input maxLuminosityBlocks parameter limit reached
00261     //       5 - looper directs processing to end
00262     // The function "runEventCount" will pause after processing the
00263     // number of input events specified by the argument.  One can
00264     // call it again to resume processing at the same point.  This
00265     // function will also stop at the same point as "runToCompletion"
00266     // if the job is complete before the requested number of events
00267     // are processed.  If the requested number of events is less than
00268     // 1, "runEventCount" interprets this as infinity and does not
00269     // pause until the job is complete.
00270     //
00271     // The return values from these functions are as follows:
00272     //   epSignal - processing terminated early, SIGUSR2 encountered
00273     //   epCountComplete - "runEventCount" processed the number of events
00274     //                     requested by the argument
00275     //   epSuccess - all other cases    
00276     //
00277     // We expect that in most cases, processes will call
00278     // "runToCompletion" once per job and not use "runEventCount".
00279     //
00280     // If a process used "runEventCount", then it would need to
00281     // check the value returned by "runEventCount" to determine
00282     // if it processed the requested number of events.  It would
00283     // only make sense to call it again if it returned epCountComplete
00284     // on the preceding call.
00285 
00286     // The online is an exceptional case.  Online uses the DaqSource
00287     // and the StreamerOutputModule, which are specially written to
00288     // handle multiple calls of "runToCompletion" in the same job.
00289     // The call to setRunNumber resets the DaqSource between those calls.
00290     // With most sources and output modules, this does not work.
00291     // If and only if called by the online, the argument to runToCompletion
00292     // is set to true and this affects the initial and final state
00293     // transitions that are managed directly in EventProcessor.cc. (I am
00294     // not sure if there is a reason for this or it is just a historical
00295     // peculiarity that could be cleaned up and removed).
00296 
00297     virtual StatusCode runToCompletion(bool onlineStateTransitions);
00298     virtual StatusCode runEventCount(int numberOfEventsToProcess);
00299 
00300     // The following functions are used by the code implementing our
00301     // boost statemachine
          // NOTE(review): these are declared virtual here; presumably they
          // implement the IEventProcessor base-class interface -- confirm in
          // IEventProcessor.h.
00302 
00303     virtual void readFile();
00304     virtual void closeInputFile();
00305     virtual void openOutputFiles();
00306     virtual void closeOutputFiles();
00307 
00308     virtual void respondToOpenInputFile();
00309     virtual void respondToCloseInputFile();
00310     virtual void respondToOpenOutputFiles();
00311     virtual void respondToCloseOutputFiles();
00312 
00313     virtual void startingNewLoop();
00314     virtual bool endOfLoop();
00315     virtual void rewindInput();
00316     virtual void prepareForNextLoop();
00317     virtual void writeLumiCache();
00318     virtual void writeRunCache();
00319     virtual bool shouldWeCloseOutput() const;
00320 
00321     virtual void doErrorStuff();
00322 
          // Runs and lumis are identified by plain int values in the
          // functions below.
00323     virtual void beginRun(int run);
00324     virtual void endRun(int run);
00325 
00326     virtual void beginLumi(int run, int lumi);
00327     virtual void endLumi(int run, int lumi);
00328 
00329     virtual int readAndCacheRun();
00330     virtual int readAndCacheLumi();
00331     virtual void writeRun(int run);
00332     virtual void deleteRunFromCache(int run);
00333     virtual void writeLumi(int run, int lumi);
00334     virtual void deleteLumiFromCache(int run, int lumi);
00335 
00336     virtual void readEvent();
00337     virtual void processEvent();
00338     virtual bool shouldWeStop() const;
00339 
          // Each of these takes its message by non-const reference.
00340     virtual void setExceptionMessageFiles(std::string& message);
00341     virtual void setExceptionMessageRuns(std::string& message);
00342     virtual void setExceptionMessageLumis(std::string& message);
00343 
00344     virtual bool alreadyHandlingException() const;
00345 
00346   private:
00347     //------------------------------------------------------------------
00348     //
00349     // Now private functions.
00350     // init() is used only by constructors
00351     void init(boost::shared_ptr<edm::ProcessDesc> & processDesc,
00352               ServiceToken const& token,
00353               serviceregistry::ServiceLegacy);
00354   
00355 
00356                        
          // Takes both the 'onlineStateTransitions' flag of runToCompletion()
          // and the event count of runEventCount().
          // NOTE(review): presumably the shared implementation behind those two
          // public functions -- confirm in EventProcessor.cc.
00357     StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00358     void terminateMachine();
00359 
00360     StatusCode doneAsync(event_processor::Msg m);
00361     
          // Returns the EventPrincipal through std::auto_ptr, i.e. ownership is
          // handed to the caller; procOneEvent() takes a raw, non-owning pointer.
00362     std::auto_ptr<EventPrincipal> doOneEvent(EventID const& id);
00363     void procOneEvent(EventPrincipal *pep);
00364 
00365     StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00366 
00367     void connectSigs(EventProcessor * ep);
00368 
00369     void changeState(event_processor::Msg);
00370     void errorState();
00371     void setupSignal();
00372 
          // Static member taking the EventProcessor as an explicit pointer.
          // NOTE(review): presumably the entry point of the thread stored in
          // event_loop_ -- confirm in EventProcessor.cc.
00373     static void asyncRun(EventProcessor *);
00374 
00375     //------------------------------------------------------------------
00376     //
00377     // Data members below.
00378     // Are all these data members really needed? Some of them are used
00379     // only during construction, and never again. If they aren't
00380     // really needed, we should remove them.    
          //
          // NOTE(review): std::auto_ptr (schedule_, esp_, machine_, sm_evp_) is
          // deprecated since C++11 and removed in C++17; building this header
          // with a newer standard would need a replacement such as
          // std::unique_ptr.
00381 
00382     ActivityRegistry::PreProcessEvent             preProcessEventSignal_;
00383     ActivityRegistry::PostProcessEvent            postProcessEventSignal_;
00384     ParameterSet                                  maxEventsPset_;
00385     ParameterSet                                  maxLumisPset_;
00386     boost::shared_ptr<ActivityRegistry>           actReg_;
00387     WorkerRegistry                                wreg_;
00388     SignallingProductRegistry                     preg_;
00389     ServiceToken                                  serviceToken_;
00390     boost::shared_ptr<InputSource>                input_;
00391     std::auto_ptr<Schedule>                       schedule_;
00392     std::auto_ptr<eventsetup::EventSetupProvider> esp_;    
00393     ActionTable                                   act_table_;
00394 
          // NOTE(review): state_, stop_count_, last_rc_, id_set_ and
          // event_loop_id_ are declared volatile.  In C++ volatile alone does
          // not synchronize access between threads; presumably these members
          // are guarded by state_lock_ / stop_lock_ -- confirm in
          // EventProcessor.cc.
00395     volatile event_processor::State               state_;
00396     boost::shared_ptr<boost::thread>              event_loop_;
00397 
          // NOTE(review): boost::mutex, boost::condition and pthread_t are used
          // below, but the only threading header included directly by this file
          // is boost/thread/thread.hpp; presumably the others arrive
          // transitively -- confirm.
00398     boost::mutex                                  state_lock_;
00399     boost::mutex                                  stop_lock_;
00400     boost::condition                              stopper_;
00401     boost::condition                              starter_;
00402     volatile int                                  stop_count_;
00403     volatile Status                               last_rc_;
00404     std::string                                   last_error_text_;
00405     volatile bool                                 id_set_;
00406     volatile pthread_t                            event_loop_id_;
00407     int                                           my_sig_num_;
00408     boost::shared_ptr<FileBlock>                  fb_;
00409     boost::shared_ptr<EDLooper>                   looper_;
00410 
00411     std::auto_ptr<statemachine::Machine>          machine_;
00412     PrincipalCache                                principalCache_;
00413     std::auto_ptr<EventPrincipal>                 sm_evp_;
00414     bool                                          shouldWeStop_;
00415     bool                                          stateMachineWasInErrorState_;
00416     std::string                                   fileMode_;
00417     bool                                          handleEmptyRuns_;
00418     bool                                          handleEmptyLumis_;
00419     std::string                                   exceptionMessageFiles_;
00420     std::string                                   exceptionMessageRuns_;
00421     std::string                                   exceptionMessageLumis_;
00422     bool                                          alreadyHandlingException_;
00423     bool                                          forceLooperToEnd_;
00424 
          // StateSentry is granted access to the private members above.
00425     friend class event_processor::StateSentry;
00426   }; // class EventProcessor
00427 
00428   //--------------------------------------------------------------------
00429 
        // Inline definition of the no-argument run() (marked deprecated in the
        // class declaration).  It simply forwards to the two-argument overload
        // as run(-1, false) and returns that call's StatusCode unchanged.
        // Note that 'repeatable' is passed explicitly as false here, which
        // differs from the two-argument overload's default of true.
00430   inline
00431   EventProcessor::StatusCode
00432   EventProcessor::run() {
00433     return run(-1, false);
00434   }
00435 }
00436 #endif

Generated on Tue Jun 9 17:35:29 2009 for CMSSW by  doxygen 1.5.4