CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/FWCore/Framework/interface/EventProcessor.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 EventProcessor: This defines the 'framework application' object. It is
00007 configured in the user's main() function, and is set running.
00008 
00009 ----------------------------------------------------------------------*/
00010 
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012 
00013 #include "FWCore/Framework/interface/Frameworkfwd.h"
00014 #include "FWCore/Framework/interface/IEventProcessor.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00017 
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019 
00020 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00021 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023 
00024 #include "boost/shared_ptr.hpp"
00025 #include "boost/thread/condition.hpp"
00026 #include "boost/utility.hpp"
00027 
00028 #include <map>
00029 #include <memory>
00030 #include <set>
00031 #include <string>
00032 #include <vector>
00033 
00034 namespace statemachine {  // forward declarations only; no definitions are provided in this header
00035   class Machine;
00036   class Run;
00037 }
00038 
00039 namespace edm {
00040 
00041   class ActionTable;
00042   class EDLooperBase;
00043   class ProcessDesc;
00044   namespace eventsetup {
00045     class EventSetupProvider;
00046   }
00047 
00048   namespace event_processor {
00049     /*
00050       Several of these states are likely to be transitory in
00051       the offline because they are completely driven by the
00052       data coming from the input source.
00053     */
00054     enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
00055                  sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };
00056     // Message codes; EventProcessor::changeState() and doneAsync() each take one of these.
00057     enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00058                mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00059                mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00060                mAny, mDtor, mException, mInputRewind };
00061     // Forward declaration only; StateSentry is declared a friend of EventProcessor below.
00062     class StateSentry;
00063   }
00064 
00065   class EventProcessor : public IEventProcessor, private boost::noncopyable {  // the 'framework application' object; noncopyable
00066   public:
00067 
00068     // The input string 'config' contains the entire contents of a configuration file.
00069     // Also allows the attachment of pre-existing services specified by 'token', and
00070     // the specification of services by name only (defaultServices and forcedServices).
00071     // 'defaultServices' are overridden by 'config'.
00072     // 'forcedServices' cause an exception if the same service is specified in 'config'.
00073     explicit EventProcessor(std::string const& config,
00074                             ServiceToken const& token = ServiceToken(),
00075                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00076                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00077                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
00078 
00079     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
00080 
00081     EventProcessor(std::string const& config,
00082                    std::vector<std::string> const& defaultServices,
00083                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
00084 
00085     EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00086                    ServiceToken const& token,
00087                    serviceregistry::ServiceLegacy legacy);
00088 
00090     EventProcessor(std::string const& config, bool isPython);
00091 
00092     ~EventProcessor();
00093 
00098     void beginJob();
00099 
00103     void endJob();
00104 
00108     char const* currentStateName() const;
00109     char const* stateName(event_processor::State s) const;
00110     char const* msgName(event_processor::Msg m) const;
00111     event_processor::State getState() const;
00112     void runAsync();
00113     StatusCode statusAsync() const;
00114 
00115     // Concerning the async control functions:
00116     // The event processor is left with the running thread.
00117     // The async thread is stuck at this point and the process
00118     // is likely not going to be able to continue.
00119     // The reason for this timeout could be either an infinite loop
00120     // or I/O blocking forever.
00121     // The only thing to do is end the process.
00122     // If you call endJob, you will likely get an exception from the
00123     // state checks telling you that it is not valid to call this function.
00124     // All these functions force the event processor state into an
00125     // error state.
00126 
00127     // tell the event loop to stop and wait for its completion
00128     StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00129 
00130     // tell the event loop to shutdown and wait for the completion
00131     StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00132 
00133     // wait until async event loop thread completes
00134     // or timeout occurs (See StatusCode for return values)
00135     StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00136 
00137     // Both of these calls move the EP to the ready to run state but only
00138     // the first actually sets the run number, the other one just stores
00139     // the run number set externally in order to later compare to the one
00140     // read from the input source for verification
00141     void setRunNumber(RunNumber_t runNumber);
00142     void declareRunNumber(RunNumber_t runNumber);
00143 
00144     // -------------
00145 
00146     // These next two functions are deprecated.  Please use
00147     // runToCompletion or runEventCount instead.  These will
00148     // be deleted as soon as we have time to clean up the code
00149     // in packages outside the Framework that use them already.
00150     StatusCode run(int numberEventsToProcess, bool repeatable = true);
00151     StatusCode run();
00152 
00153     // Skip the specified number of events.
00154     // If numberToSkip is negative, we will back up.
00155     StatusCode skip(int numberToSkip);
00156 
00157     // Rewind to the first event
00158     void rewind();
00159 
00162 
00166 
00167     std::vector<ModuleDescription const*>
00168     getAllModuleDescriptions() const;
00169 
00173     int totalEvents() const;
00174 
00177     int totalEventsPassed() const;
00178 
00181     int totalEventsFailed() const;
00182 
00185     void enableEndPaths(bool active);
00186 
00189     bool endPathsEnabled() const;
00190 
00193     void getTriggerReport(TriggerReport& rep) const;
00194 
00196     void clearCounters();
00197 
00198     // Really should not be public,
00199     //   but the EventFilter needs it for now.
00200     ServiceToken getToken();
00201 
00204     ActivityRegistry::PreProcessEvent&
00205     preProcessEventSignal() {return preProcessEventSignal_;}
00206 
00209     ActivityRegistry::PostProcessEvent&
00210     postProcessEventSignal() {return postProcessEventSignal_;}
00211 
00212     //------------------------------------------------------------------
00213     //
00214     // Nested classes and structs below.
00215     // NOTE(review): no nested classes or structs are declared in this class; the heading above looks stale.
00216     // The function "runToCompletion" will run until the job is "complete",
00217     // which means:
00218     //       1 - no more input data
00219     //       2 - input maxEvents parameter limit reached
00220     //       3 - output maxEvents parameter limit reached
00221     //       4 - input maxLuminosityBlocks parameter limit reached
00222     //       5 - looper directs processing to end
00223     // The function "runEventCount" will pause after processing the
00224     // number of input events specified by the argument.  One can
00225     // call it again to resume processing at the same point.  This
00226     // function will also stop at the same point as "runToCompletion"
00227     // if the job is complete before the requested number of events
00228     // are processed.  If the requested number of events is less than
00229     // 1, "runEventCount" interprets this as infinity and does not
00230     // pause until the job is complete.
00231     //
00232     // The return values from these functions are as follows:
00233     //   epSignal - processing terminated early, SIGUSR2 encountered
00234     //   epCountComplete - "runEventCount" processed the number of events
00235     //                     requested by the argument
00236     //   epSuccess - all other cases
00237     //
00238     // We expect that in most cases, processes will call
00239     // "runToCompletion" once per job and not use "runEventCount".
00240     //
00241     // If a process used "runEventCount", then it would need to
00242     // check the value returned by "runEventCount" to determine
00243     // if it processed the requested number of events.  It would
00244     // only make sense to call it again if it returned epCountComplete
00245     // on the preceding call.
00246 
00247     // The online is an exceptional case.  Online uses the DaqSource
00248     // and the StreamerOutputModule, which are specially written to
00249     // handle multiple calls of "runToCompletion" in the same job.
00250     // The call to setRunNumber resets the DaqSource between those calls.
00251     // With most sources and output modules, this does not work.
00252     // If and only if called by the online, the argument to runToCompletion
00253     // is set to true and this affects the initial and final state
00254     // transitions that are managed directly in EventProcessor.cc. (I am
00255     // not sure if there is a reason for this or it is just a historical
00256     // peculiarity that could be cleaned up and removed).
00257 
00258     virtual StatusCode runToCompletion(bool onlineStateTransitions);
00259     virtual StatusCode runEventCount(int numberOfEventsToProcess);
00260 
00261     // The following functions are used by the code implementing our
00262     // boost statemachine
00263 
00264     virtual void readFile();
00265     virtual void closeInputFile();
00266     virtual void openOutputFiles();
00267     virtual void closeOutputFiles();
00268 
00269     virtual void respondToOpenInputFile();
00270     virtual void respondToCloseInputFile();
00271     virtual void respondToOpenOutputFiles();
00272     virtual void respondToCloseOutputFiles();
00273 
00274     virtual void startingNewLoop();
00275     virtual bool endOfLoop();
00276     virtual void rewindInput();
00277     virtual void prepareForNextLoop();
00278     virtual bool shouldWeCloseOutput() const;
00279 
00280     virtual void doErrorStuff();
00281 
00282     virtual void beginRun(statemachine::Run const& run);
00283     virtual void endRun(statemachine::Run const& run);
00284 
00285     virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
00286     virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi);
00287 
00288     virtual statemachine::Run readAndCacheRun();
00289     virtual int readAndCacheLumi();
00290     virtual void writeRun(statemachine::Run const& run);
00291     virtual void deleteRunFromCache(statemachine::Run const& run);
00292     virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
00293     virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
00294 
00295     virtual void readAndProcessEvent();
00296     virtual bool shouldWeStop() const;
00297 
00298     virtual void setExceptionMessageFiles(std::string& message);
00299     virtual void setExceptionMessageRuns(std::string& message);
00300     virtual void setExceptionMessageLumis(std::string& message);
00301 
00302     virtual bool alreadyHandlingException() const;
00303 
00304     //returns 'true' if this was a child and we should continue processing
00305     bool forkProcess(std::string const& jobReportFile);
00306 
00307   private:
00308     //------------------------------------------------------------------
00309     //
00310     // Now private functions.
00311     // init() is used only by constructors
00312     void init(boost::shared_ptr<ProcessDesc>& processDesc,
00313               ServiceToken const& token,
00314               serviceregistry::ServiceLegacy);
00315 
00316     StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00317     void terminateMachine();
00318 
00319     StatusCode doneAsync(event_processor::Msg m);
00320 
00321     StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00322 
00323     void connectSigs(EventProcessor* ep);
00324 
00325     void changeState(event_processor::Msg);
00326     void errorState();
00327     void setupSignal();
00328 
00329     static void asyncRun(EventProcessor*);
00330 
00331     //------------------------------------------------------------------
00332     //
00333     // Data members below.
00334     // Are all these data members really needed? Some of them are used
00335     // only during construction, and never again. If they aren't
00336     // really needed, we should remove them.
00337 
00338     ActivityRegistry::PreProcessEvent             preProcessEventSignal_;
00339     ActivityRegistry::PostProcessEvent            postProcessEventSignal_;
00340     boost::shared_ptr<ActivityRegistry>           actReg_;
00341     boost::shared_ptr<SignallingProductRegistry>  preg_;
00342     ServiceToken                                  serviceToken_;
00343     boost::shared_ptr<InputSource>                input_;
00344     boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00345     boost::shared_ptr<ActionTable const>          act_table_;
00346     boost::shared_ptr<ProcessConfiguration>       processConfiguration_;
00347     std::auto_ptr<Schedule>                       schedule_;
00348     // NOTE(review): 'volatile' alone does not make the members below atomic or ordered across threads; confirm every access is guarded by state_lock_ / stop_lock_.
00349     volatile event_processor::State               state_;
00350     boost::shared_ptr<boost::thread>              event_loop_;
00351 
00352     boost::mutex                                  state_lock_;
00353     boost::mutex                                  stop_lock_;
00354     boost::condition                              stopper_;
00355     boost::condition                              starter_;
00356     volatile int                                  stop_count_;
00357     volatile Status                               last_rc_;
00358     std::string                                   last_error_text_;
00359     volatile bool                                 id_set_;
00360     volatile pthread_t                            event_loop_id_;
00361     int                                           my_sig_num_;
00362     boost::shared_ptr<FileBlock>                  fb_;
00363     boost::shared_ptr<EDLooperBase>               looper_;
00364     // NOTE(review): std::auto_ptr (used here and for schedule_) is deprecated since C++11 and removed in C++17.
00365     std::auto_ptr<statemachine::Machine>          machine_;
00366     PrincipalCache                                principalCache_;
00367     bool                                          shouldWeStop_;
00368     bool                                          stateMachineWasInErrorState_;
00369     std::string                                   fileMode_;
00370     std::string                                   emptyRunLumiMode_;
00371     std::string                                   exceptionMessageFiles_;
00372     std::string                                   exceptionMessageRuns_;
00373     std::string                                   exceptionMessageLumis_;
00374     bool                                          alreadyHandlingException_;
00375     bool                                          forceLooperToEnd_;
00376     bool                                          looperBeginJobRun_;
00377     bool                                          forceESCacheClearOnNewRun_;
00378 
00379     int                                           numberOfForkedChildren_;
00380     unsigned int                                  numberOfSequentialEventsPerChild_;
00381     bool                                          setCpuAffinity_;
00382     typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00383     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00384     ExcludedDataMap                               eventSetupDataToExcludeFromPrefetching_;
00385     friend class event_processor::StateSentry;
00386   }; // class EventProcessor
00387 
00388   //--------------------------------------------------------------------
00389 
00390   inline
00391   EventProcessor::StatusCode
00392   EventProcessor::run() {  // no-argument overload; the class declaration marks both run() overloads as deprecated
00393     return run(-1, false);  // forwards to run(int numberEventsToProcess, bool repeatable) with -1 and repeatable = false
00394   }
00395 }
00396 #endif