CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/FWCore/Framework/interface/EventProcessor.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 EventProcessor: This defines the 'framework application' object. It is
00007 configured in the user's main() function, and is set running.
00008 
00009 ----------------------------------------------------------------------*/
00010 
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012 
00013 #include "FWCore/Framework/interface/Frameworkfwd.h"
00014 #include "FWCore/Framework/interface/IEventProcessor.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00017 
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019 
00020 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00021 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023 
00024 #include "boost/shared_ptr.hpp"
00025 #include "boost/thread/condition.hpp"
00026 #include "boost/utility.hpp"
00027 
00028 #include <map>
00029 #include <memory>
00030 #include <set>
00031 #include <string>
00032 #include <vector>
00033 
00034 namespace statemachine {  // forward declarations only; the definitions are not in this header
00035   class Machine;  // held by edm::EventProcessor through its machine_ member
00036   class Run;      // argument/return type of EventProcessor's run-level functions (beginRun, endRun, readAndCacheRun, ...)
00037 }
00038 
00039 namespace edm {
00040 
00041   class ActionTable;   // pointee of EventProcessor::act_table_
00042   class EDLooperBase;  // pointee of EventProcessor::looper_
00043   class ProcessDesc;   // passed by shared_ptr to one EventProcessor constructor and to init()
00044   class SubProcess;    // pointee of EventProcessor::subProcess_
00045   namespace eventsetup {
00046     class EventSetupProvider;  // pointee of EventProcessor::esp_
00047   }
00048 
00049   namespace event_processor {
00050     /* Processor states, as returned by EventProcessor::getState().
00051       Several of these states are likely to be transitory in
00052       the offline because they are completely driven by the
00053       data coming from the input source.
00054     */
00055     enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
00056                  sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };
00057     // Msg values are the arguments accepted by EventProcessor::changeState() and doneAsync().
00058     enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00059                mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00060                mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00061                mAny, mDtor, mException, mInputRewind };
00062     // Forward declaration only; StateSentry is named as a friend of EventProcessor.
00063     class StateSentry;
00064   }
00065 
00066   class EventProcessor : public IEventProcessor, private boost::noncopyable {
00067   public:
00068     // The 'framework application' object: constructed from a configuration, then set running.
00069     // The input string 'config' contains the entire contents of a configuration file.
00070     // Also allows the attachment of pre-existing services specified by 'token', and
00071     // the specification of services by name only (defaultServices and forcedServices).
00072     // 'defaultServices' are overridden by 'config'.
00073     // 'forcedServices' override the 'config'.
00074     explicit EventProcessor(std::string const& config,
00075                             ServiceToken const& token = ServiceToken(),
00076                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00077                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00078                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
00079 
00080     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
00081 
00082     EventProcessor(std::string const& config,
00083                    std::vector<std::string> const& defaultServices,
00084                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
00085     // Takes a ProcessDesc instead of a configuration string; 'token' and 'legacy' have no defaults here.
00086     EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00087                    ServiceToken const& token,
00088                    serviceregistry::ServiceLegacy legacy);
00089     // NOTE(review): 'isPython' presumably marks 'config' as Python-format text -- TODO confirm in EventProcessor.cc.
00091     EventProcessor(std::string const& config, bool isPython);
00092 
00093     ~EventProcessor();
00094 
00099     void beginJob();
00100 
00104     void endJob();
00105     // State inspection (names are returned as C strings) and asynchronous-run entry points.
00109     char const* currentStateName() const;
00110     char const* stateName(event_processor::State s) const;
00111     char const* msgName(event_processor::Msg m) const;
00112     event_processor::State getState() const;
00113     void runAsync();
00114     StatusCode statusAsync() const;
00115 
00116     // Concerning the async control functions:
00117     // The event processor is left with the running thread.
00118     // The async thread is stuck at this point and the process
00119     // is likely not going to be able to continue.
00120     // The reason for this timeout could be either an infinite loop
00121     // or I/O blocking forever.
00122     // The only thing to do is end the process.
00123     // If you call endJob, you will likely get an exception from the
00124     // state checks telling you that it is not valid to call this function.
00125     // All these functions force the event processor state into an
00126     // error state.
00127 
00128     // tell the event loop to stop and wait for its completion
00129     StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00130 
00131     // tell the event loop to shutdown and wait for the completion
00132     StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00133 
00134     // wait until async event loop thread completes
00135     // or timeout occurs (See StatusCode for return values)
00136     StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00137 
00138     // Both of these calls move the EP to the ready to run state but only
00139     // the first actually sets the run number, the other one just stores
00140     // the run number set externally in order to later compare to the one
00141     // read from the input source for verification
00142     void setRunNumber(RunNumber_t runNumber);
00143     void declareRunNumber(RunNumber_t runNumber);
00144 
00145     // -------------
00146 
00147     // These next two functions are deprecated.  Please use
00148     // runToCompletion or runEventCount instead.  These will
00149     // be deleted as soon as we have time to clean up the code
00150     // in packages outside the Framework that uses them already.
00151     StatusCode run(int numberEventsToProcess, bool repeatable = true);
00152     StatusCode run();
00153 
00154     // Skip the specified number of events.
00155     // If numberToSkip is negative, we will back up.
00156     StatusCode skip(int numberToSkip);
00157 
00158     // Rewind to the first event
00159     void rewind();
00160 
00163 
00167 
00168     std::vector<ModuleDescription const*>
00169     getAllModuleDescriptions() const;
00170 
00174     int totalEvents() const;
00175 
00178     int totalEventsPassed() const;
00179 
00182     int totalEventsFailed() const;
00183 
00186     void enableEndPaths(bool active);
00187 
00190     bool endPathsEnabled() const;
00191 
00194     void getTriggerReport(TriggerReport& rep) const;
00195 
00197     void clearCounters();
00198 
00199     // Really should not be public,
00200     //   but the EventFilter needs it for now.
00201     ServiceToken getToken();
00202     // Returns a mutable reference to the preProcessEventSignal_ member.
00205     ActivityRegistry::PreProcessEvent&
00206     preProcessEventSignal() {return preProcessEventSignal_;}
00207     // Returns a mutable reference to the postProcessEventSignal_ member.
00210     ActivityRegistry::PostProcessEvent&
00211     postProcessEventSignal() {return postProcessEventSignal_;}
00212 
00213     //------------------------------------------------------------------
00214     //
00215     // Nested classes and structs below.
00216 
00217     // The function "runToCompletion" will run until the job is "complete",
00218     // which means:
00219     //       1 - no more input data
00220     //       2 - input maxEvents parameter limit reached
00221     //       3 - output maxEvents parameter limit reached
00222     //       4 - input maxLuminosityBlocks parameter limit reached
00223     //       5 - looper directs processing to end
00224     // The function "runEventCount" will pause after processing the
00225     // number of input events specified by the argument.  One can
00226     // call it again to resume processing at the same point.  This
00227     // function will also stop at the same point as "runToCompletion"
00228     // if the job is complete before the requested number of events
00229     // are processed.  If the requested number of events is less than
00230     // 1, "runEventCount" interprets this as infinity and does not
00231     // pause until the job is complete.
00232     //
00233     // The return values from these functions are as follows:
00234     //   epSignal - processing terminated early, SIGUSR2 encountered
00235     //   epCountComplete - "runEventCount" processed the number of events
00236     //                     requested by the argument
00237     //   epSuccess - all other cases
00238     //
00239     // We expect that in most cases, processes will call
00240     // "runToCompletion" once per job and not use "runEventCount".
00241     //
00242     // If a process used "runEventCount", then it would need to
00243     // check the value returned by "runEventCount" to determine
00244     // if it processed the requested number of events.  It would
00245     // only make sense to call it again if it returned epCountComplete
00246     // on the preceding call.
00247 
00248     // The online is an exceptional case.  Online uses the DaqSource
00249     // and the StreamerOutputModule, which are specially written to
00250     // handle multiple calls of "runToCompletion" in the same job.
00251     // The call to setRunNumber resets the DaqSource between those calls.
00252     // With most sources and output modules, this does not work.
00253     // If and only if called by the online, the argument to runToCompletion
00254     // is set to true and this affects the initial and final state
00255     // transitions that are managed directly in EventProcessor.cc. (I am
00256     // not sure if there is a reason for this or it is just a historical
00257     // peculiarity that could be cleaned up and removed).
00258 
00259     virtual StatusCode runToCompletion(bool onlineStateTransitions);
00260     virtual StatusCode runEventCount(int numberOfEventsToProcess);
00261 
00262     // The following functions are used by the code implementing our
00263     // boost statemachine
00264 
00265     virtual void readFile();
00266     virtual void closeInputFile();
00267     virtual void openOutputFiles();
00268     virtual void closeOutputFiles();
00269 
00270     virtual void respondToOpenInputFile();
00271     virtual void respondToCloseInputFile();
00272     virtual void respondToOpenOutputFiles();
00273     virtual void respondToCloseOutputFiles();
00274 
00275     virtual void startingNewLoop();
00276     virtual bool endOfLoop();
00277     virtual void rewindInput();
00278     virtual void prepareForNextLoop();
00279     virtual bool shouldWeCloseOutput() const;
00280 
00281     virtual void doErrorStuff();
00282 
00283     virtual void beginRun(statemachine::Run const& run);
00284     virtual void endRun(statemachine::Run const& run);
00285 
00286     virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
00287     virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi);
00288 
00289     virtual statemachine::Run readAndCacheRun();
00290     virtual int readAndCacheLumi();
00291     virtual void writeRun(statemachine::Run const& run);
00292     virtual void deleteRunFromCache(statemachine::Run const& run);
00293     virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
00294     virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
00295 
00296     virtual void readAndProcessEvent();
00297     virtual bool shouldWeStop() const;
00298 
00299     virtual void setExceptionMessageFiles(std::string& message);
00300     virtual void setExceptionMessageRuns(std::string& message);
00301     virtual void setExceptionMessageLumis(std::string& message);
00302 
00303     virtual bool alreadyHandlingException() const;
00304 
00305     // Returns 'true' if this was a child and we should continue processing.
00306     bool forkProcess(std::string const& jobReportFile);
00307 
00308   private:
00309     //------------------------------------------------------------------
00310     //
00311     // Now private functions.
00312     // init() is used only by constructors
00313     void init(boost::shared_ptr<ProcessDesc>& processDesc,
00314               ServiceToken const& token,
00315               serviceregistry::ServiceLegacy);
00316 
00317     StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00318     void terminateMachine();
00319 
00320     StatusCode doneAsync(event_processor::Msg m);
00321 
00322     StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00323 
00324     void connectSigs(EventProcessor* ep);
00325 
00326     void changeState(event_processor::Msg);
00327     void errorState();
00328     void setupSignal();
00329 
00330     static void asyncRun(EventProcessor*);
00331     // True when subProcess_ holds a non-null pointer.
00332     bool hasSubProcess() const {
00333       return subProcess_.get() != 0;
00334     }
00335 
00336     //------------------------------------------------------------------
00337     //
00338     // Data members below.
00339     // Are all these data members really needed? Some of them are used
00340     // only during construction, and never again. If they aren't
00341     // really needed, we should remove them.
00342     // NOTE(review): the std::auto_ptr members below use a type deprecated in C++11 and removed in C++17.
00343     ActivityRegistry::PreProcessEvent             preProcessEventSignal_;
00344     ActivityRegistry::PostProcessEvent            postProcessEventSignal_;
00345     boost::shared_ptr<ActivityRegistry>           actReg_;
00346     boost::shared_ptr<SignallingProductRegistry>  preg_;
00347     ServiceToken                                  serviceToken_;
00348     boost::shared_ptr<InputSource>                input_;
00349     boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00350     boost::shared_ptr<ActionTable const>          act_table_;
00351     boost::shared_ptr<ProcessConfiguration>       processConfiguration_;
00352     std::auto_ptr<Schedule>                       schedule_;
00353     std::auto_ptr<SubProcess>                     subProcess_;
00354     // NOTE(review): 'volatile' gives no inter-thread synchronization in C++; presumably these are guarded by state_lock_/stop_lock_ -- confirm in EventProcessor.cc.
00355     volatile event_processor::State               state_;
00356     boost::shared_ptr<boost::thread>              event_loop_;
00357 
00358     boost::mutex                                  state_lock_;
00359     boost::mutex                                  stop_lock_;
00360     boost::condition                              stopper_;
00361     boost::condition                              starter_;
00362     volatile int                                  stop_count_;
00363     volatile Status                               last_rc_;
00364     std::string                                   last_error_text_;
00365     volatile bool                                 id_set_;
00366     volatile pthread_t                            event_loop_id_;
00367     int                                           my_sig_num_;
00368     boost::shared_ptr<FileBlock>                  fb_;
00369     boost::shared_ptr<EDLooperBase>               looper_;
00370 
00371     std::auto_ptr<statemachine::Machine>          machine_;
00372     PrincipalCache                                principalCache_;
00373     bool                                          shouldWeStop_;
00374     bool                                          stateMachineWasInErrorState_;
00375     std::string                                   fileMode_;
00376     std::string                                   emptyRunLumiMode_;
00377     std::string                                   exceptionMessageFiles_;
00378     std::string                                   exceptionMessageRuns_;
00379     std::string                                   exceptionMessageLumis_;
00380     bool                                          alreadyHandlingException_;
00381     bool                                          forceLooperToEnd_;
00382     bool                                          looperBeginJobRun_;
00383     bool                                          forceESCacheClearOnNewRun_;
00384 
00385     int                                           numberOfForkedChildren_;
00386     unsigned int                                  numberOfSequentialEventsPerChild_;
00387     bool                                          setCpuAffinity_;
00388     typedef std::set<std::pair<std::string, std::string> > ExcludedData;  // set of string pairs
00389     typedef std::map<std::string, ExcludedData> ExcludedDataMap;          // string key -> ExcludedData
00390     ExcludedDataMap                               eventSetupDataToExcludeFromPrefetching_;
00391     friend class event_processor::StateSentry;
00392   }; // class EventProcessor
00393 
00394   //--------------------------------------------------------------------
00395   // No-argument run(): forwards to run(numberEventsToProcess = -1, repeatable = false).
00396   inline
00397   EventProcessor::StatusCode
00398   EventProcessor::run() {
00399     return run(-1, false);  // one of the two overloads the class comments mark as deprecated
00400   }
00401 }
00402 #endif