00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012
00013 #include "FWCore/Framework/interface/Frameworkfwd.h"
00014 #include "FWCore/Framework/interface/IEventProcessor.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00017
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019
00020 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00021 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023
00024 #include "boost/shared_ptr.hpp"
00025 #include "boost/scoped_ptr.hpp"
00026 #include "boost/thread/condition.hpp"
00027 #include "boost/utility.hpp"
00028
00029 #include <map>
00030 #include <memory>
00031 #include <set>
00032 #include <string>
00033 #include <vector>
00034
00035 namespace statemachine {  // forward declarations only; the definitions live outside this header
00036 class Machine;  // owned by EventProcessor through its machine_ member
00037 class Run;  // passed to / returned by EventProcessor's run-handling member functions
00038 }
00039
00040 namespace edm {
00041
00042 class ActionTable;  // forward declarations: each of these types is used below only behind a smart pointer or reference
00043 class EDLooperBase;  // held as boost::shared_ptr (looper_)
00044 class HistoryAppender;  // held as boost::scoped_ptr (historyAppender_)
00045 class ProcessDesc;  // passed by boost::shared_ptr reference to a constructor and to init()
00046 class SubProcess;  // held as std::auto_ptr (subProcess_)
00047 namespace eventsetup {
00048 class EventSetupProvider;  // held as boost::shared_ptr (esp_)
00049 class EventSetupsController;  // held as boost::scoped_ptr (espController_)
00050 }
00051
00052 namespace event_processor {
00053
00054 // State: the states an EventProcessor can report through getState(); stateName()
00055 // maps a State to a C string. Enumerators are numbered consecutively from sInit = 0.
00056 // NOTE(review): the legal transitions between states are not visible in this header;
00057 // see the implementation of EventProcessor::changeState().
00058 enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
00059 sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };
00060 // Msg: messages accepted by EventProcessor::changeState()/doneAsync(); msgName() maps a Msg to a C string.
00061 enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00062 mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00063 mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00064 mAny, mDtor, mException, mInputRewind };
00065 // StateSentry is defined elsewhere; EventProcessor declares it a friend.
00066 class StateSentry;
00067 }
00068
00069 class EventProcessor : public IEventProcessor, private boost::noncopyable {
00070 public:
00071 // EventProcessor implements the IEventProcessor interface and is non-copyable
00072 // (private boost::noncopyable base).
00073
00074 // Construct from a configuration string. Optional arguments: an existing
00075 // ServiceToken, a ServiceLegacy policy (default kOverlapIsError), and lists of
00076 // default and forced service names (both default to empty vectors).
00077 explicit EventProcessor(std::string const& config,
00078 ServiceToken const& token = ServiceToken(),
00079 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00080 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00081 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00082
00083 // Overload taking only the configuration string and the service-name lists.
00084 // NOTE(review): how default and forced services differ is not visible in this header.
00085 EventProcessor(std::string const& config,
00086 std::vector<std::string> const& defaultServices,
00087 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00088 // Construct from an already-built ProcessDesc plus a ServiceToken and a ServiceLegacy policy.
00089 EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00090 ServiceToken const& token,
00091 serviceregistry::ServiceLegacy legacy);
00092 // NOTE(review): isPython presumably selects how 'config' is interpreted - confirm in the implementation.
00094 EventProcessor(std::string const& config, bool isPython);
00095
00096 ~EventProcessor();
00097 // Job-level begin hook. NOTE(review): the required call order relative to run() is not visible here.
00102 void beginJob();
00103 // Job-level end hook, the counterpart of beginJob().
00107 void endJob();
00108 // State inspection: the *Name() functions return C strings; getState() returns an event_processor::State.
00112 char const* currentStateName() const;
00113 char const* stateName(event_processor::State s) const;
00114 char const* msgName(event_processor::Msg m) const;
00115 event_processor::State getState() const;
00116 void runAsync();  // NOTE(review): presumably starts the event loop on event_loop_ via asyncRun() - confirm
00117 StatusCode statusAsync() const;
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129 // Asynchronous control. Each call takes a timeout in seconds (per the parameter
00130 // names) and returns a StatusCode.
00131 // stopAsync: the default timeout is 60 * 2 = 120.
00132 StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00133
00134 // shutdownAsync: same default timeout as stopAsync (60 * 2 = 120).
00135 StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00136
00137 // waitTillDoneAsync: the default timeout is 0.
00138 // NOTE(review): the meaning of 0 (presumably "no timeout") is not visible here - confirm.
00139 StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00140
00141
00142
00143 // Run-number control.
00144 // NOTE(review): the difference between "set" and "declare" is not visible in this header.
00145 void setRunNumber(RunNumber_t runNumber);
00146 void declareRunNumber(RunNumber_t runNumber);
00147
00148
00149
00150
00151 // Synchronous processing. run(int, bool) takes an event count and a 'repeatable'
00152 // flag (default true); the zero-argument run() is defined inline after the class
00153 // and forwards to run(-1, false).
00154 StatusCode run(int numberEventsToProcess, bool repeatable = true);
00155 StatusCode run();
00156
00157
00158 // Skip 'numberToSkip' events. NOTE(review): handling of negative values is not visible here.
00159 StatusCode skip(int numberToSkip);
00160
00161 // NOTE(review): presumably rewinds the input source - confirm in the implementation.
00162 void rewind();
00163
00166
00170 // Returns pointers to const ModuleDescription objects; ownership is not transferred through this signature.
00171 std::vector<ModuleDescription const*>
00172 getAllModuleDescriptions() const;
00173 // Event counters (total / passed / failed), each returned as int.
00177 int totalEvents() const;
00178
00181 int totalEventsPassed() const;
00182
00185 int totalEventsFailed() const;
00186 // Turn end paths on or off.
00189 void enableEndPaths(bool active);
00190 // Query the flag controlled by enableEndPaths().
00193 bool endPathsEnabled() const;
00194 // Fills the caller-supplied TriggerReport (output parameter).
00197 void getTriggerReport(TriggerReport& rep) const;
00198 // NOTE(review): presumably resets the event counters above - confirm.
00200 void clearCounters();
00201
00202
00203 // Returns a ServiceToken by value. NOTE(review): presumably a copy of serviceToken_ - confirm.
00204 ServiceToken getToken();
00205 // Returns a mutable reference to the pre-event signal member so callers can connect to it.
00208 ActivityRegistry::PreProcessEvent&
00209 preProcessEventSignal() {return preProcessEventSignal_;}
00210 // Returns a mutable reference to the post-event signal member so callers can connect to it.
00213 ActivityRegistry::PostProcessEvent&
00214 postProcessEventSignal() {return postProcessEventSignal_;}
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256 // The virtual member functions below are grouped by: run entry points, file
00257 // handling, looping, error handling, run/lumi boundaries, caching and writing,
00258 // event processing, and exception-message bookkeeping.
00259 // NOTE(review): presumably these override IEventProcessor virtuals that the state
00260 // machine (statemachine::Machine) invokes - confirm against IEventProcessor.h.
00261
00262 virtual StatusCode runToCompletion(bool onlineStateTransitions);
00263 virtual StatusCode runEventCount(int numberOfEventsToProcess);
00264
00265
00266
00267 // Input/output file handling.
00268 virtual void readFile();
00269 virtual void closeInputFile(bool cleaningUpAfterException);
00270 virtual void openOutputFiles();
00271 virtual void closeOutputFiles();
00272
00273 virtual void respondToOpenInputFile();
00274 virtual void respondToCloseInputFile();
00275 virtual void respondToOpenOutputFiles();
00276 virtual void respondToCloseOutputFiles();
00277 // Looping support (see the looper_ member).
00278 virtual void startingNewLoop();
00279 virtual bool endOfLoop();
00280 virtual void rewindInput();
00281 virtual void prepareForNextLoop();
00282 virtual bool shouldWeCloseOutput() const;
00283
00284 virtual void doErrorStuff();
00285 // Run boundaries; endRun is told whether it is called while cleaning up after an exception.
00286 virtual void beginRun(statemachine::Run const& run);
00287 virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
00288 // Luminosity-block boundaries, identified by process history ID, run number and lumi number.
00289 virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
00290 virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi, bool cleaningUpAfterException);
00291 // Reading, writing and cache eviction of runs and lumis (see principalCache_).
00292 virtual statemachine::Run readAndCacheRun(bool merge);
00293 virtual int readAndCacheLumi(bool merge);
00294 virtual void writeRun(statemachine::Run const& run);
00295 virtual void deleteRunFromCache(statemachine::Run const& run);
00296 virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
00297 virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
00298
00299 virtual void readAndProcessEvent();
00300 virtual bool shouldWeStop() const;
00301 // Setters for the exception-message strings (see the exceptionMessage*_ members); the argument is a non-const reference.
00302 virtual void setExceptionMessageFiles(std::string& message);
00303 virtual void setExceptionMessageRuns(std::string& message);
00304 virtual void setExceptionMessageLumis(std::string& message);
00305
00306 virtual bool alreadyHandlingException() const;
00307
00308 // NOTE(review): presumably forks child processes (see numberOfForkedChildren_); the meaning of the bool result is not visible here.
00309 bool forkProcess(std::string const& jobReportFile);
00310
00311 private:
00312
00313
00314
00315 // Shared initialisation helper; takes the same arguments as the ProcessDesc-based constructor.
00316 void init(boost::shared_ptr<ProcessDesc>& processDesc,
00317 ServiceToken const& token,
00318 serviceregistry::ServiceLegacy);
00319
00320 StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00321 void terminateMachine();
00322
00323 StatusCode doneAsync(event_processor::Msg m);
00324
00325 StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00326
00327 void connectSigs(EventProcessor* ep);
00328 // State-change helpers; changeState() is driven by an event_processor::Msg.
00329 void changeState(event_processor::Msg);
00330 void errorState();
00331 void setupSignal();
00332 // Static so that it can serve as a thread entry point taking the processor as its argument.
00333 static void asyncRun(EventProcessor*);
00334 // True when subProcess_ holds a non-null pointer.
00335 bool hasSubProcess() const {
00336 return subProcess_.get() != 0;
00337 }
00338
00339
00340
00341
00342
00343 // Data members. C++ constructs members in declaration order and destroys them in
00344 // reverse, so do not reorder them without checking the constructors and destructor.
00345
00346 ActivityRegistry::PreProcessEvent preProcessEventSignal_;
00347 ActivityRegistry::PostProcessEvent postProcessEventSignal_;
00348 boost::shared_ptr<ActivityRegistry> actReg_;
00349 boost::shared_ptr<SignallingProductRegistry> preg_;
00350 ServiceToken serviceToken_;
00351 boost::shared_ptr<InputSource> input_;
00352 boost::scoped_ptr<eventsetup::EventSetupsController> espController_;
00353 boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00354 boost::shared_ptr<ActionTable const> act_table_;
00355 boost::shared_ptr<ProcessConfiguration> processConfiguration_;
00356 std::auto_ptr<Schedule> schedule_;
00357 std::auto_ptr<SubProcess> subProcess_;
00358 boost::scoped_ptr<HistoryAppender> historyAppender_;
00359 // NOTE(review): 'volatile' alone does not synchronise threads; presumably these members are guarded by state_lock_/stop_lock_ - confirm.
00360 volatile event_processor::State state_;
00361 boost::shared_ptr<boost::thread> event_loop_;
00362
00363 boost::mutex state_lock_;
00364 boost::mutex stop_lock_;
00365 boost::condition stopper_;
00366 boost::condition starter_;
00367 volatile int stop_count_;
00368 volatile Status last_rc_;
00369 std::string last_error_text_;
00370 volatile bool id_set_;
00371 volatile pthread_t event_loop_id_;  // NOTE(review): no header included here by name declares pthread_t; presumably it arrives transitively
00372 int my_sig_num_;
00373 boost::shared_ptr<FileBlock> fb_;
00374 boost::shared_ptr<EDLooperBase> looper_;
00375 // NOTE(review): std::auto_ptr (used here and above) is deprecated since C++11 and removed in C++17.
00376 std::auto_ptr<statemachine::Machine> machine_;
00377 PrincipalCache principalCache_;
00378 bool shouldWeStop_;
00379 bool stateMachineWasInErrorState_;
00380 std::string fileMode_;
00381 std::string emptyRunLumiMode_;
00382 std::string exceptionMessageFiles_;
00383 std::string exceptionMessageRuns_;
00384 std::string exceptionMessageLumis_;
00385 bool alreadyHandlingException_;
00386 bool forceLooperToEnd_;
00387 bool looperBeginJobRun_;
00388 bool forceESCacheClearOnNewRun_;
00389 // Forking settings and event-setup prefetch exclusions. NOTE(review): the meaning of the map keys and string pairs is not visible here.
00390 int numberOfForkedChildren_;
00391 unsigned int numberOfSequentialEventsPerChild_;
00392 bool setCpuAffinity_;
00393 typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00394 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00395 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
00396 friend class event_processor::StateSentry;  // grants StateSentry access to the private members above
00397 };
00398
00399
00400
00401 inline  // zero-argument overload of run(); forwards to run(int, bool)
00402 EventProcessor::StatusCode
00403 EventProcessor::run() {
00404 return run(-1, false);  // numberEventsToProcess = -1, repeatable = false; NOTE(review): -1 presumably means "no event limit" - confirm
00405 }
00406 }
00407 #endif