CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_1/src/FWCore/Framework/interface/EventProcessor.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 EventProcessor: This defines the 'framework application' object. It is
00007 configured in the user's main() function, and is set running.
00008 
00009 ----------------------------------------------------------------------*/
00010 
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012 
00013 #include "FWCore/Framework/interface/Frameworkfwd.h"
00014 #include "FWCore/Framework/interface/IEventProcessor.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00017 
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019 
00020 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00021 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023 
00024 #include "boost/shared_ptr.hpp"
00025 #include "boost/scoped_ptr.hpp"
00026 #include "boost/thread/condition.hpp"
00027 #include "boost/utility.hpp"
00028 
00029 #include <map>
00030 #include <memory>
00031 #include <set>
00032 #include <string>
00033 #include <vector>
00034 
00035 namespace statemachine {
00036   class Machine;
00037   class Run;
00038 }
00039 
00040 namespace edm {
00041 
00042   class ActionTable;
00043   class EDLooperBase;
00044   class HistoryAppender;
00045   class ProcessDesc;
00046   class SubProcess;
00047   namespace eventsetup {
00048     class EventSetupProvider;
00049     class EventSetupsController;
00050   }
00051 
00052   namespace event_processor {
00053     /*
00054       Several of these states are likely to be transitory in
00055       the offline because they are completely driven by the
00056       data coming from the input source.
00057     */
00058     enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
00059                  sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };  // type returned by EventProcessor::getState()
00060 
00061     enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00062                mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00063                mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00064                mAny, mDtor, mException, mInputRewind };  // argument type of EventProcessor::changeState() and doneAsync()
00065 
00066     class StateSentry;  // forward declaration; declared a friend of EventProcessor
00067   }
00068 
00069   class EventProcessor : public IEventProcessor, private boost::noncopyable {
00070   public:
00071 
00072     // The input string 'config' contains the entire contents of a configuration file.
00073     // Also allows the attachment of pre-existing services specified by 'token', and
00074     // the specification of services by name only (defaultServices and forcedServices).
00075     // 'defaultServices' are overridden by 'config'.
00076     // 'forcedServices' override the 'config'.
00077     explicit EventProcessor(std::string const& config,
00078                             ServiceToken const& token = ServiceToken(),
00079                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00080                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00081                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
00082 
00083     // Same as previous constructor, but without 'token' or the ServiceLegacy argument.  Token will be defaulted.
00084 
00085     EventProcessor(std::string const& config,
00086                    std::vector<std::string> const& defaultServices,
00087                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
00088 
00089     EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00090                    ServiceToken const& token,
00091                    serviceregistry::ServiceLegacy legacy);
00092 
00094     EventProcessor(std::string const& config, bool isPython);
00095 
00096     ~EventProcessor();
00097 
00102     void beginJob();
00103 
00107     void endJob();
00108 
00112     char const* currentStateName() const;
00113     char const* stateName(event_processor::State s) const;
00114     char const* msgName(event_processor::Msg m) const;
00115     event_processor::State getState() const;
00116     void runAsync();
00117     StatusCode statusAsync() const;
00118 
00119     // Concerning the async control functions (timeout case):
00120     // The event processor is left with the running thread.
00121     // The async thread is stuck at this point and the process
00122     // is likely not going to be able to continue.
00123     // The reason for this timeout could be either an infinite loop
00124     // or I/O blocking forever.
00125     // The only thing to do is end the process.
00126     // If you call endJob, you will likely get an exception from the
00127     // state checks telling you that it is not valid to call this function.
00128     // All these functions force the event processor state into an
00129     // error state.
00130 
00131     // tell the event loop to stop and wait for its completion
00132     StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00133 
00134     // tell the event loop to shutdown and wait for the completion
00135     StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00136 
00137     // wait until async event loop thread completes
00138     // or timeout occurs (See StatusCode for return values)
00139     StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00140 
00141     // Both of these calls move the EP to the ready-to-run state, but only
00142     // the first actually sets the run number; the other one just stores
00143     // the run number set externally in order to later compare to the one
00144     // read from the input source for verification.
00145     void setRunNumber(RunNumber_t runNumber);
00146     void declareRunNumber(RunNumber_t runNumber);
00147 
00148     // -------------
00149 
00150     // These next two functions are deprecated.  Please use
00151     // runToCompletion or runEventCount instead.  These will
00152     // be deleted as soon as we have time to clean up the code
00153     // in packages outside the Framework that uses them already.
00154     StatusCode run(int numberEventsToProcess, bool repeatable = true);
00155     StatusCode run();  // defined inline after the class as run(-1, false)
00156 
00157     // Skip the specified number of events.
00158     // If numberToSkip is negative, we will back up.
00159     StatusCode skip(int numberToSkip);
00160 
00161     // Rewind to the first event
00162     void rewind();
00163 
00166 
00170 
00171     std::vector<ModuleDescription const*>
00172     getAllModuleDescriptions() const;
00173 
00177     int totalEvents() const;
00178 
00181     int totalEventsPassed() const;
00182 
00185     int totalEventsFailed() const;
00186 
00189     void enableEndPaths(bool active);
00190 
00193     bool endPathsEnabled() const;
00194 
00197     void getTriggerReport(TriggerReport& rep) const;
00198 
00200     void clearCounters();
00201 
00202     // Really should not be public,
00203     //   but the EventFilter needs it for now.
00204     ServiceToken getToken();
00205 
00208     ActivityRegistry::PreProcessEvent&
00209     preProcessEventSignal() {return preProcessEventSignal_;}
00210 
00213     ActivityRegistry::PostProcessEvent&
00214     postProcessEventSignal() {return postProcessEventSignal_;}
00215 
00216     //------------------------------------------------------------------
00217     //
00218     // Nested classes and structs below.
00219 
00220     // The function "runToCompletion" will run until the job is "complete",
00221     // which means:
00222     //       1 - no more input data
00223     //       2 - input maxEvents parameter limit reached
00224     //       3 - output maxEvents parameter limit reached
00225     //       4 - input maxLuminosityBlocks parameter limit reached
00226     //       5 - looper directs processing to end
00227     // The function "runEventCount" will pause after processing the
00228     // number of input events specified by the argument.  One can
00229     // call it again to resume processing at the same point.  This
00230     // function will also stop at the same point as "runToCompletion"
00231     // if the job is complete before the requested number of events
00232     // are processed.  If the requested number of events is less than
00233     // 1, "runEventCount" interprets this as infinity and does not
00234     // pause until the job is complete.
00235     //
00236     // The return values from these functions are as follows:
00237     //   epSignal - processing terminated early, SIGUSR2 encountered
00238     //   epCountComplete - "runEventCount" processed the number of events
00239     //                     requested by the argument
00240     //   epSuccess - all other cases
00241     //
00242     // We expect that in most cases, processes will call
00243     // "runToCompletion" once per job and not use "runEventCount".
00244     //
00245     // If a process used "runEventCount", then it would need to
00246     // check the value returned by "runEventCount" to determine
00247     // if it processed the requested number of events.  It would
00248     // only make sense to call it again if it returned epCountComplete
00249     // on the preceding call.
00250 
00251     // The online is an exceptional case.  Online uses the DaqSource
00252     // and the StreamerOutputModule, which are specially written to
00253     // handle multiple calls of "runToCompletion" in the same job.
00254     // The call to setRunNumber resets the DaqSource between those calls.
00255     // With most sources and output modules, this does not work.
00256     // If and only if called by the online, the argument to runToCompletion
00257     // is set to true and this affects the initial and final state
00258     // transitions that are managed directly in EventProcessor.cc. (I am
00259     // not sure if there is a reason for this or it is just a historical
00260     // peculiarity that could be cleaned up and removed).
00261 
00262     virtual StatusCode runToCompletion(bool onlineStateTransitions);
00263     virtual StatusCode runEventCount(int numberOfEventsToProcess);
00264 
00265     // The following functions are used by the code implementing our
00266     // boost statemachine
00267 
00268     virtual void readFile();
00269     virtual void closeInputFile(bool cleaningUpAfterException);
00270     virtual void openOutputFiles();
00271     virtual void closeOutputFiles();
00272 
00273     virtual void respondToOpenInputFile();
00274     virtual void respondToCloseInputFile();
00275     virtual void respondToOpenOutputFiles();
00276     virtual void respondToCloseOutputFiles();
00277 
00278     virtual void startingNewLoop();
00279     virtual bool endOfLoop();
00280     virtual void rewindInput();
00281     virtual void prepareForNextLoop();
00282     virtual bool shouldWeCloseOutput() const;
00283 
00284     virtual void doErrorStuff();
00285 
00286     virtual void beginRun(statemachine::Run const& run);
00287     virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
00288 
00289     virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
00290     virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi, bool cleaningUpAfterException);
00291 
00292     virtual statemachine::Run readAndCacheRun(bool merge);
00293     virtual int readAndCacheLumi(bool merge);
00294     virtual void writeRun(statemachine::Run const& run);
00295     virtual void deleteRunFromCache(statemachine::Run const& run);
00296     virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
00297     virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
00298 
00299     virtual void readAndProcessEvent();
00300     virtual bool shouldWeStop() const;
00301 
00302     virtual void setExceptionMessageFiles(std::string& message);
00303     virtual void setExceptionMessageRuns(std::string& message);
00304     virtual void setExceptionMessageLumis(std::string& message);
00305 
00306     virtual bool alreadyHandlingException() const;
00307 
00308     // Returns 'true' if this was a child and we should continue processing.
00309     bool forkProcess(std::string const& jobReportFile);
00310 
00311   private:
00312     //------------------------------------------------------------------
00313     //
00314     // Now private functions.
00315     // init() is used only by constructors
00316     void init(boost::shared_ptr<ProcessDesc>& processDesc,
00317               ServiceToken const& token,
00318               serviceregistry::ServiceLegacy);
00319 
00320     StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00321     void terminateMachine();
00322 
00323     StatusCode doneAsync(event_processor::Msg m);
00324 
00325     StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00326 
00327     void connectSigs(EventProcessor* ep);
00328 
00329     void changeState(event_processor::Msg);
00330     void errorState();
00331     void setupSignal();
00332 
00333     static void asyncRun(EventProcessor*);
00334 
00335     bool hasSubProcess() const {
00336       return subProcess_.get() != 0;
00337     }
00338 
00339     //------------------------------------------------------------------
00340     //
00341     // Data members below.
00342     // Are all these data members really needed? Some of them are used
00343     // only during construction, and never again. If they aren't
00344     // really needed, we should remove them.
00345 
00346     ActivityRegistry::PreProcessEvent             preProcessEventSignal_;
00347     ActivityRegistry::PostProcessEvent            postProcessEventSignal_;
00348     boost::shared_ptr<ActivityRegistry>           actReg_;
00349     boost::shared_ptr<SignallingProductRegistry>  preg_;
00350     ServiceToken                                  serviceToken_;
00351     boost::shared_ptr<InputSource>                input_;
00352     boost::scoped_ptr<eventsetup::EventSetupsController> espController_;
00353     boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00354     boost::shared_ptr<ActionTable const>          act_table_;
00355     boost::shared_ptr<ProcessConfiguration>       processConfiguration_;
00356     std::auto_ptr<Schedule>                       schedule_;
00357     std::auto_ptr<SubProcess>                     subProcess_;
00358     boost::scoped_ptr<HistoryAppender>            historyAppender_;
00359 
00360     volatile event_processor::State               state_;
00361     boost::shared_ptr<boost::thread>              event_loop_;
00362 
00363     boost::mutex                                  state_lock_;
00364     boost::mutex                                  stop_lock_;
00365     boost::condition                              stopper_;
00366     boost::condition                              starter_;
00367     volatile int                                  stop_count_;
00368     volatile Status                               last_rc_;
00369     std::string                                   last_error_text_;
00370     volatile bool                                 id_set_;
00371     volatile pthread_t                            event_loop_id_;
00372     int                                           my_sig_num_;
00373     boost::shared_ptr<FileBlock>                  fb_;
00374     boost::shared_ptr<EDLooperBase>               looper_;
00375 
00376     std::auto_ptr<statemachine::Machine>          machine_;
00377     PrincipalCache                                principalCache_;
00378     bool                                          shouldWeStop_;
00379     bool                                          stateMachineWasInErrorState_;
00380     std::string                                   fileMode_;
00381     std::string                                   emptyRunLumiMode_;
00382     std::string                                   exceptionMessageFiles_;
00383     std::string                                   exceptionMessageRuns_;
00384     std::string                                   exceptionMessageLumis_;
00385     bool                                          alreadyHandlingException_;
00386     bool                                          forceLooperToEnd_;
00387     bool                                          looperBeginJobRun_;
00388     bool                                          forceESCacheClearOnNewRun_;
00389 
00390     int                                           numberOfForkedChildren_;
00391     unsigned int                                  numberOfSequentialEventsPerChild_;
00392     bool                                          setCpuAffinity_;
00393     typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00394     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00395     ExcludedDataMap                               eventSetupDataToExcludeFromPrefetching_;
00396     friend class event_processor::StateSentry;
00397   }; // class EventProcessor
00398 
00399   //--------------------------------------------------------------------
00400 
00401   inline
00402   EventProcessor::StatusCode
00403   EventProcessor::run() {
00404     return run(-1, false);  // forwards to run(int numberEventsToProcess, bool repeatable) with -1 and repeatable = false
00405   }
00406 }
00407 #endif