00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012
00013 #include "FWCore/Framework/interface/Frameworkfwd.h"
00014 #include "FWCore/Framework/interface/IEventProcessor.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00017
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019
00020 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00021 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023
00024 #include "boost/shared_ptr.hpp"
00025 #include "boost/thread/condition.hpp"
00026 #include "boost/utility.hpp"
00027
00028 #include <map>
00029 #include <memory>
00030 #include <set>
00031 #include <string>
00032 #include <vector>
00033
00034 namespace statemachine {
00035 class Machine;
00036 class Run;
00037 }
00038
00039 namespace edm {
00040
00041 class ActionTable;
00042 class EDLooperBase;
00043 class ProcessDesc;
00044 class SubProcess;
00045 namespace eventsetup {
00046 class EventSetupProvider;
00047 }
00048
00049 namespace event_processor {
00050
00051
00052
00053
00054
00055 enum State { sInit = 0, sJobReady, sRunGiven, sRunning, sStopping,
00056 sShuttingDown, sDone, sJobEnded, sError, sErrorEnded, sEnd, sInvalid };
00057
00058 enum Msg { mSetRun = 0, mSkip, mRunAsync, mRunID, mRunCount, mBeginJob,
00059 mStopAsync, mShutdownAsync, mEndJob, mCountComplete,
00060 mInputExhausted, mStopSignal, mShutdownSignal, mFinished,
00061 mAny, mDtor, mException, mInputRewind };
00062
00063 class StateSentry;
00064 }
00065
00066 class EventProcessor : public IEventProcessor, private boost::noncopyable {
00067 public:
00068
00069
00070
00071
00072
00073
00074 explicit EventProcessor(std::string const& config,
00075 ServiceToken const& token = ServiceToken(),
00076 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00077 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00078 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00079
00080
00081
00082 EventProcessor(std::string const& config,
00083 std::vector<std::string> const& defaultServices,
00084 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00085
00086 EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00087 ServiceToken const& token,
00088 serviceregistry::ServiceLegacy legacy);
00089
00091 EventProcessor(std::string const& config, bool isPython);
00092
00093 ~EventProcessor();
00094
00099 void beginJob();
00100
00104 void endJob();
00105
00109 char const* currentStateName() const;
00110 char const* stateName(event_processor::State s) const;
00111 char const* msgName(event_processor::Msg m) const;
00112 event_processor::State getState() const;
00113 void runAsync();
00114 StatusCode statusAsync() const;
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129 StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00130
00131
00132 StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00133
00134
00135
00136 StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00137
00138
00139
00140
00141
00142 void setRunNumber(RunNumber_t runNumber);
00143 void declareRunNumber(RunNumber_t runNumber);
00144
00145
00146
00147
00148
00149
00150
00151 StatusCode run(int numberEventsToProcess, bool repeatable = true);
00152 StatusCode run();
00153
00154
00155
00156 StatusCode skip(int numberToSkip);
00157
00158
00159 void rewind();
00160
00163
00167
00168 std::vector<ModuleDescription const*>
00169 getAllModuleDescriptions() const;
00170
00174 int totalEvents() const;
00175
00178 int totalEventsPassed() const;
00179
00182 int totalEventsFailed() const;
00183
00186 void enableEndPaths(bool active);
00187
00190 bool endPathsEnabled() const;
00191
00194 void getTriggerReport(TriggerReport& rep) const;
00195
00197 void clearCounters();
00198
00199
00200
00201 ServiceToken getToken();
00202
00205 ActivityRegistry::PreProcessEvent&
00206 preProcessEventSignal() {return preProcessEventSignal_;}
00207
00210 ActivityRegistry::PostProcessEvent&
00211 postProcessEventSignal() {return postProcessEventSignal_;}
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259 virtual StatusCode runToCompletion(bool onlineStateTransitions);
00260 virtual StatusCode runEventCount(int numberOfEventsToProcess);
00261
00262
00263
00264
00265 virtual void readFile();
00266 virtual void closeInputFile();
00267 virtual void openOutputFiles();
00268 virtual void closeOutputFiles();
00269
00270 virtual void respondToOpenInputFile();
00271 virtual void respondToCloseInputFile();
00272 virtual void respondToOpenOutputFiles();
00273 virtual void respondToCloseOutputFiles();
00274
00275 virtual void startingNewLoop();
00276 virtual bool endOfLoop();
00277 virtual void rewindInput();
00278 virtual void prepareForNextLoop();
00279 virtual bool shouldWeCloseOutput() const;
00280
00281 virtual void doErrorStuff();
00282
00283 virtual void beginRun(statemachine::Run const& run);
00284 virtual void endRun(statemachine::Run const& run);
00285
00286 virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
00287 virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi);
00288
00289 virtual statemachine::Run readAndCacheRun();
00290 virtual int readAndCacheLumi();
00291 virtual void writeRun(statemachine::Run const& run);
00292 virtual void deleteRunFromCache(statemachine::Run const& run);
00293 virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
00294 virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
00295
00296 virtual void readAndProcessEvent();
00297 virtual bool shouldWeStop() const;
00298
00299 virtual void setExceptionMessageFiles(std::string& message);
00300 virtual void setExceptionMessageRuns(std::string& message);
00301 virtual void setExceptionMessageLumis(std::string& message);
00302
00303 virtual bool alreadyHandlingException() const;
00304
00305
00306 bool forkProcess(std::string const& jobReportFile);
00307
00308 private:
00309
00310
00311
00312
00313 void init(boost::shared_ptr<ProcessDesc>& processDesc,
00314 ServiceToken const& token,
00315 serviceregistry::ServiceLegacy);
00316
00317 StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00318 void terminateMachine();
00319
00320 StatusCode doneAsync(event_processor::Msg m);
00321
00322 StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00323
00324 void connectSigs(EventProcessor* ep);
00325
00326 void changeState(event_processor::Msg);
00327 void errorState();
00328 void setupSignal();
00329
00330 static void asyncRun(EventProcessor*);
00331
00332 bool hasSubProcess() const {
00333 return subProcess_.get() != 0;
00334 }
00335
00336
00337
00338
00339
00340
00341
00342
00343 ActivityRegistry::PreProcessEvent preProcessEventSignal_;
00344 ActivityRegistry::PostProcessEvent postProcessEventSignal_;
00345 boost::shared_ptr<ActivityRegistry> actReg_;
00346 boost::shared_ptr<SignallingProductRegistry> preg_;
00347 ServiceToken serviceToken_;
00348 boost::shared_ptr<InputSource> input_;
00349 boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00350 boost::shared_ptr<ActionTable const> act_table_;
00351 boost::shared_ptr<ProcessConfiguration> processConfiguration_;
00352 std::auto_ptr<Schedule> schedule_;
00353 std::auto_ptr<SubProcess> subProcess_;
00354
00355 volatile event_processor::State state_;
00356 boost::shared_ptr<boost::thread> event_loop_;
00357
00358 boost::mutex state_lock_;
00359 boost::mutex stop_lock_;
00360 boost::condition stopper_;
00361 boost::condition starter_;
00362 volatile int stop_count_;
00363 volatile Status last_rc_;
00364 std::string last_error_text_;
00365 volatile bool id_set_;
00366 volatile pthread_t event_loop_id_;
00367 int my_sig_num_;
00368 boost::shared_ptr<FileBlock> fb_;
00369 boost::shared_ptr<EDLooperBase> looper_;
00370
00371 std::auto_ptr<statemachine::Machine> machine_;
00372 PrincipalCache principalCache_;
00373 bool shouldWeStop_;
00374 bool stateMachineWasInErrorState_;
00375 std::string fileMode_;
00376 std::string emptyRunLumiMode_;
00377 std::string exceptionMessageFiles_;
00378 std::string exceptionMessageRuns_;
00379 std::string exceptionMessageLumis_;
00380 bool alreadyHandlingException_;
00381 bool forceLooperToEnd_;
00382 bool looperBeginJobRun_;
00383 bool forceESCacheClearOnNewRun_;
00384
00385 int numberOfForkedChildren_;
00386 unsigned int numberOfSequentialEventsPerChild_;
00387 bool setCpuAffinity_;
00388 typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00389 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00390 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
00391 friend class event_processor::StateSentry;
00392 };
00393
00394
00395
00396 inline
00397 EventProcessor::StatusCode
00398 EventProcessor::run() {
00399 return run(-1, false);
00400 }
00401 }
00402 #endif