00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012 #include "DataFormats/Provenance/interface/RunID.h"
00013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
00014
00015 #include "FWCore/Framework/interface/Frameworkfwd.h"
00016 #include "FWCore/Framework/interface/IEventProcessor.h"
00017 #include "FWCore/Framework/src/PrincipalCache.h"
00018 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00019
00020 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00021
00022 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00023 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00024 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00025
00026 #include "boost/shared_ptr.hpp"
00027 #include "boost/thread/condition.hpp"
00028
00029 #include <map>
00030 #include <memory>
00031 #include <set>
00032 #include <string>
00033 #include <vector>
00034
// Forward declarations of the state-machine types that EventProcessor's
// interface mentions. Only declarations are needed in this header:
//  - Machine is named as the template argument of the smart pointers used by
//    EventProcessor::createStateMachine() / terminateMachine().
//  - Run appears in the signatures of the run-handling virtual functions
//    (beginRun, endRun, readAndCacheRun, ...).
namespace statemachine {
  class Machine;
  class Run;
}
00039
00040 namespace edm {
00041
// Forward declarations for types that EventProcessor only names as smart
// pointer template arguments or in function signatures, so their full
// definitions are not required by this header.
class ActionTable;
class BranchIDListHelper;
class EDLooperBase;
class HistoryAppender;
class ProcessDesc;
class SubProcess;

// EventSetup types held by EventProcessor through smart pointers
// (see the esp_ and espController_ data members).
namespace eventsetup {
  class EventSetupProvider;
  class EventSetupsController;
}
00052
namespace event_processor {

  // State values of an EventProcessor: EventProcessor::getState() returns one
  // of these and EventProcessor::stateName() takes one and returns text.
  // The explicit `sInit = 0` pins the numbering; the remaining enumerators
  // follow consecutively, so sInvalid has the largest value (11).
  // NOTE(review): the implementation file may index name tables by these
  // values — confirm before inserting or reordering enumerators.
  enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
               sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };

  // Message values accepted by EventProcessor::msgName(), changeState() and
  // doneAsync(). Numbering starts at mSetRun = 0 and runs consecutively up to
  // mInputRewind (17).
  // NOTE(review): same ordering caveat as for State — confirm against the
  // implementation before changing the list.
  enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
             mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
             mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
             mAny, mDtor, mException, mInputRewind };

  // Forward declaration only; EventProcessor declares this class as a friend.
  class StateSentry;
}
00069
00070 class EventProcessor : public IEventProcessor {
00071 public:
00072
00073
00074
00075
00076
00077
00078 explicit EventProcessor(std::string const& config,
00079 ServiceToken const& token = ServiceToken(),
00080 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00081 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00082 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00083
00084
00085
00086 EventProcessor(std::string const& config,
00087 std::vector<std::string> const& defaultServices,
00088 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00089
00090 EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00091 ServiceToken const& token,
00092 serviceregistry::ServiceLegacy legacy);
00093
00095 EventProcessor(std::string const& config, bool isPython);
00096
00097 ~EventProcessor();
00098
00099 EventProcessor(EventProcessor const&) = delete;
00100 EventProcessor& operator=(EventProcessor const&) = delete;
00101
00106 void beginJob();
00107
00111 void endJob();
00112
00116 char const* currentStateName() const;
00117 char const* stateName(event_processor::State s) const;
00118 char const* msgName(event_processor::Msg m) const;
00119 event_processor::State getState() const;
00120 void runAsync();
00121 StatusCode statusAsync() const;
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136 StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00137
00138
00139 StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00140
00141
00142
00143 StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00144
00145
00146
00147
00148
00149 void setRunNumber(RunNumber_t runNumber);
00150 void declareRunNumber(RunNumber_t runNumber);
00151
00152
00153
00154
00155
00156 StatusCode run();
00157
00160
00164
00165 std::vector<ModuleDescription const*>
00166 getAllModuleDescriptions() const;
00167
00171 int totalEvents() const;
00172
00175 int totalEventsPassed() const;
00176
00179 int totalEventsFailed() const;
00180
00183 void enableEndPaths(bool active);
00184
00187 bool endPathsEnabled() const;
00188
00191 void getTriggerReport(TriggerReport& rep) const;
00192
00194 void clearCounters();
00195
00196
00197
00198 ServiceToken getToken();
00199
00202 ActivityRegistry::PreProcessEvent&
00203 preProcessEventSignal() {return preProcessEventSignal_;}
00204
00207 ActivityRegistry::PostProcessEvent&
00208 postProcessEventSignal() {return postProcessEventSignal_;}
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239 virtual StatusCode runToCompletion(bool onlineStateTransitions);
00240
00241
00242
00243
00244 virtual void readFile();
00245 virtual void closeInputFile(bool cleaningUpAfterException);
00246 virtual void openOutputFiles();
00247 virtual void closeOutputFiles();
00248
00249 virtual void respondToOpenInputFile();
00250 virtual void respondToCloseInputFile();
00251 virtual void respondToOpenOutputFiles();
00252 virtual void respondToCloseOutputFiles();
00253
00254 virtual void startingNewLoop();
00255 virtual bool endOfLoop();
00256 virtual void rewindInput();
00257 virtual void prepareForNextLoop();
00258 virtual bool shouldWeCloseOutput() const;
00259
00260 virtual void doErrorStuff();
00261
00262 virtual void beginRun(statemachine::Run const& run);
00263 virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
00264
00265 virtual void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
00266 virtual void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException);
00267
00268 virtual statemachine::Run readAndCacheRun();
00269 virtual statemachine::Run readAndMergeRun();
00270 virtual int readAndCacheLumi();
00271 virtual int readAndMergeLumi();
00272 virtual void writeRun(statemachine::Run const& run);
00273 virtual void deleteRunFromCache(statemachine::Run const& run);
00274 virtual void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
00275 virtual void deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
00276
00277 virtual void readAndProcessEvent();
00278 virtual bool shouldWeStop() const;
00279
00280 virtual void setExceptionMessageFiles(std::string& message);
00281 virtual void setExceptionMessageRuns(std::string& message);
00282 virtual void setExceptionMessageLumis(std::string& message);
00283
00284 virtual bool alreadyHandlingException() const;
00285
00286
00287 bool forkProcess(std::string const& jobReportFile);
00288
00289 private:
00290
00291
00292
00293
00294 void init(boost::shared_ptr<ProcessDesc>& processDesc,
00295 ServiceToken const& token,
00296 serviceregistry::ServiceLegacy);
00297
00298 void terminateMachine(std::auto_ptr<statemachine::Machine>&);
00299 std::auto_ptr<statemachine::Machine> createStateMachine();
00300
00301 StatusCode doneAsync(event_processor::Msg m);
00302
00303 StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00304
00305 void connectSigs(EventProcessor* ep);
00306
00307 void changeState(event_processor::Msg);
00308 void errorState();
00309 void setupSignal();
00310
00311 static void asyncRun(EventProcessor*);
00312
00313 bool hasSubProcess() const {
00314 return subProcess_.get() != 0;
00315 }
00316
00317 void possiblyContinueAfterForkChildFailure();
00318
00319
00320
00321
00322
00323
00324
00325 ActivityRegistry::PreProcessEvent preProcessEventSignal_;
00326 ActivityRegistry::PostProcessEvent postProcessEventSignal_;
00327 boost::shared_ptr<ActivityRegistry> actReg_;
00328 boost::shared_ptr<ProductRegistry const> preg_;
00329 boost::shared_ptr<BranchIDListHelper> branchIDListHelper_;
00330 ServiceToken serviceToken_;
00331 std::unique_ptr<InputSource> input_;
00332 std::unique_ptr<eventsetup::EventSetupsController> espController_;
00333 boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00334 std::unique_ptr<ActionTable const> act_table_;
00335 boost::shared_ptr<ProcessConfiguration const> processConfiguration_;
00336 std::auto_ptr<Schedule> schedule_;
00337 std::auto_ptr<SubProcess> subProcess_;
00338 std::unique_ptr<HistoryAppender> historyAppender_;
00339
00340 volatile event_processor::State state_;
00341 boost::shared_ptr<boost::thread> event_loop_;
00342
00343 boost::mutex state_lock_;
00344 boost::mutex stop_lock_;
00345 boost::condition stopper_;
00346 boost::condition starter_;
00347 volatile int stop_count_;
00348 volatile Status last_rc_;
00349 std::string last_error_text_;
00350 volatile bool id_set_;
00351 volatile pthread_t event_loop_id_;
00352 int my_sig_num_;
00353 std::unique_ptr<FileBlock> fb_;
00354 boost::shared_ptr<EDLooperBase> looper_;
00355
00356 PrincipalCache principalCache_;
00357 bool shouldWeStop_;
00358 bool stateMachineWasInErrorState_;
00359 std::string fileMode_;
00360 std::string emptyRunLumiMode_;
00361 std::string exceptionMessageFiles_;
00362 std::string exceptionMessageRuns_;
00363 std::string exceptionMessageLumis_;
00364 bool alreadyHandlingException_;
00365 bool forceLooperToEnd_;
00366 bool looperBeginJobRun_;
00367 bool forceESCacheClearOnNewRun_;
00368
00369 int numberOfForkedChildren_;
00370 unsigned int numberOfSequentialEventsPerChild_;
00371 bool setCpuAffinity_;
00372 bool continueAfterChildFailure_;
00373
00374 typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00375 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00376 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
00377 friend class event_processor::StateSentry;
00378 };
00379
00380
00381
00382 inline
00383 EventProcessor::StatusCode
00384 EventProcessor::run() {
00385 return runToCompletion(false);
00386 }
00387 }
00388 #endif