00001 #ifndef FWCore_Framework_EventProcessor_h
00002 #define FWCore_Framework_EventProcessor_h
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
00012
00013 #include "FWCore/Framework/interface/Frameworkfwd.h"
00014 #include "FWCore/Framework/interface/IEventProcessor.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00017
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019
00020 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00021 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023
00024 #include "boost/shared_ptr.hpp"
00025 #include "boost/thread/condition.hpp"
00026 #include "boost/utility.hpp"
00027
00028 #include <map>
00029 #include <memory>
00030 #include <set>
00031 #include <string>
00032 #include <vector>
00033
// Forward declarations of the state-machine types that EventProcessor refers
// to only by pointer or reference (statemachine::Machine is held through a
// smart pointer; statemachine::Run is passed by const reference and returned
// by value from member functions that are only declared here).
namespace statemachine {
  class Machine;
  class Run;
}
00038
00039 namespace edm {
00040
// Forward declarations: these types appear in this header only as
// smart-pointer template arguments or reference parameters, so their full
// definitions are not required here.
class ActionTable;
class EDLooperBase;
class ProcessDesc;

namespace eventsetup {
  class EventSetupProvider;
}
00047
namespace event_processor {

  // State and message identifiers used by EventProcessor's state-tracking
  // interface (getState(), stateName(), msgName(), changeState(), ...).
  //
  // The enumerators rely on implicit numbering starting at 0, so their order
  // is part of the interface: do not reorder or insert in the middle.
  // NOTE(review): the name tables behind stateName()/msgName() presumably
  // index by these values -- confirm before appending new enumerators.

  enum State {
    sInit = 0,       // 0
    sJobReady,       // 1
    sRunGiven,       // 2
    sRunning,        // 3
    sStopping,       // 4
    sShuttingDown,   // 5
    sDone,           // 6
    sJobEnded,       // 7
    sError,          // 8
    sErrorEnded,     // 9
    sEnd,            // 10
    sInvalid         // 11
  };

  enum Msg {
    mSetRun = 0,      // 0
    mSkip,            // 1
    mRunAsync,        // 2
    mRunID,           // 3
    mRunCount,        // 4
    mBeginJob,        // 5
    mStopAsync,       // 6
    mShutdownAsync,   // 7
    mEndJob,          // 8
    mCountComplete,   // 9
    mInputExhausted,  // 10
    mStopSignal,      // 11
    mShutdownSignal,  // 12
    mFinished,        // 13
    mAny,             // 14
    mDtor,            // 15
    mException,       // 16
    mInputRewind      // 17
  };

  // Declared here so EventProcessor can name it as a friend; defined elsewhere.
  class StateSentry;
}
00064
00065 class EventProcessor : public IEventProcessor, private boost::noncopyable {
00066 public:
00067
00068
00069
00070
00071
00072
00073 explicit EventProcessor(std::string const& config,
00074 ServiceToken const& token = ServiceToken(),
00075 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
00076 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
00077 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00078
00079
00080
00081 EventProcessor(std::string const& config,
00082 std::vector<std::string> const& defaultServices,
00083 std::vector<std::string> const& forcedServices = std::vector<std::string>());
00084
00085 EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00086 ServiceToken const& token,
00087 serviceregistry::ServiceLegacy legacy);
00088
00090 EventProcessor(std::string const& config, bool isPython);
00091
00092 ~EventProcessor();
00093
00098 void beginJob();
00099
00103 void endJob();
00104
00108 char const* currentStateName() const;
00109 char const* stateName(event_processor::State s) const;
00110 char const* msgName(event_processor::Msg m) const;
00111 event_processor::State getState() const;
00112 void runAsync();
00113 StatusCode statusAsync() const;
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128 StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
00129
00130
00131 StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
00132
00133
00134
00135 StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
00136
00137
00138
00139
00140
00141 void setRunNumber(RunNumber_t runNumber);
00142 void declareRunNumber(RunNumber_t runNumber);
00143
00144
00145
00146
00147
00148
00149
00150 StatusCode run(int numberEventsToProcess, bool repeatable = true);
00151 StatusCode run();
00152
00153
00154
00155 StatusCode skip(int numberToSkip);
00156
00157
00158 void rewind();
00159
00162
00166
00167 std::vector<ModuleDescription const*>
00168 getAllModuleDescriptions() const;
00169
00173 int totalEvents() const;
00174
00177 int totalEventsPassed() const;
00178
00181 int totalEventsFailed() const;
00182
00185 void enableEndPaths(bool active);
00186
00189 bool endPathsEnabled() const;
00190
00193 void getTriggerReport(TriggerReport& rep) const;
00194
00196 void clearCounters();
00197
00198
00199
00200 ServiceToken getToken();
00201
00204 ActivityRegistry::PreProcessEvent&
00205 preProcessEventSignal() {return preProcessEventSignal_;}
00206
00209 ActivityRegistry::PostProcessEvent&
00210 postProcessEventSignal() {return postProcessEventSignal_;}
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258 virtual StatusCode runToCompletion(bool onlineStateTransitions);
00259 virtual StatusCode runEventCount(int numberOfEventsToProcess);
00260
00261
00262
00263
00264 virtual void readFile();
00265 virtual void closeInputFile();
00266 virtual void openOutputFiles();
00267 virtual void closeOutputFiles();
00268
00269 virtual void respondToOpenInputFile();
00270 virtual void respondToCloseInputFile();
00271 virtual void respondToOpenOutputFiles();
00272 virtual void respondToCloseOutputFiles();
00273
00274 virtual void startingNewLoop();
00275 virtual bool endOfLoop();
00276 virtual void rewindInput();
00277 virtual void prepareForNextLoop();
00278 virtual bool shouldWeCloseOutput() const;
00279
00280 virtual void doErrorStuff();
00281
00282 virtual void beginRun(statemachine::Run const& run);
00283 virtual void endRun(statemachine::Run const& run);
00284
00285 virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
00286 virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi);
00287
00288 virtual statemachine::Run readAndCacheRun();
00289 virtual int readAndCacheLumi();
00290 virtual void writeRun(statemachine::Run const& run);
00291 virtual void deleteRunFromCache(statemachine::Run const& run);
00292 virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
00293 virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
00294
00295 virtual void readAndProcessEvent();
00296 virtual bool shouldWeStop() const;
00297
00298 virtual void setExceptionMessageFiles(std::string& message);
00299 virtual void setExceptionMessageRuns(std::string& message);
00300 virtual void setExceptionMessageLumis(std::string& message);
00301
00302 virtual bool alreadyHandlingException() const;
00303
00304
00305 bool forkProcess(std::string const& jobReportFile);
00306
00307 private:
00308
00309
00310
00311
00312 void init(boost::shared_ptr<ProcessDesc>& processDesc,
00313 ServiceToken const& token,
00314 serviceregistry::ServiceLegacy);
00315
00316 StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
00317 void terminateMachine();
00318
00319 StatusCode doneAsync(event_processor::Msg m);
00320
00321 StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
00322
00323 void connectSigs(EventProcessor* ep);
00324
00325 void changeState(event_processor::Msg);
00326 void errorState();
00327 void setupSignal();
00328
00329 static void asyncRun(EventProcessor*);
00330
00331
00332
00333
00334
00335
00336
00337
00338 ActivityRegistry::PreProcessEvent preProcessEventSignal_;
00339 ActivityRegistry::PostProcessEvent postProcessEventSignal_;
00340 boost::shared_ptr<ActivityRegistry> actReg_;
00341 boost::shared_ptr<SignallingProductRegistry> preg_;
00342 ServiceToken serviceToken_;
00343 boost::shared_ptr<InputSource> input_;
00344 boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
00345 boost::shared_ptr<ActionTable const> act_table_;
00346 boost::shared_ptr<ProcessConfiguration> processConfiguration_;
00347 std::auto_ptr<Schedule> schedule_;
00348
00349 volatile event_processor::State state_;
00350 boost::shared_ptr<boost::thread> event_loop_;
00351
00352 boost::mutex state_lock_;
00353 boost::mutex stop_lock_;
00354 boost::condition stopper_;
00355 boost::condition starter_;
00356 volatile int stop_count_;
00357 volatile Status last_rc_;
00358 std::string last_error_text_;
00359 volatile bool id_set_;
00360 volatile pthread_t event_loop_id_;
00361 int my_sig_num_;
00362 boost::shared_ptr<FileBlock> fb_;
00363 boost::shared_ptr<EDLooperBase> looper_;
00364
00365 std::auto_ptr<statemachine::Machine> machine_;
00366 PrincipalCache principalCache_;
00367 bool shouldWeStop_;
00368 bool stateMachineWasInErrorState_;
00369 std::string fileMode_;
00370 std::string emptyRunLumiMode_;
00371 std::string exceptionMessageFiles_;
00372 std::string exceptionMessageRuns_;
00373 std::string exceptionMessageLumis_;
00374 bool alreadyHandlingException_;
00375 bool forceLooperToEnd_;
00376 bool looperBeginJobRun_;
00377 bool forceESCacheClearOnNewRun_;
00378
00379 int numberOfForkedChildren_;
00380 unsigned int numberOfSequentialEventsPerChild_;
00381 bool setCpuAffinity_;
00382 typedef std::set<std::pair<std::string, std::string> > ExcludedData;
00383 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
00384 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
00385 friend class event_processor::StateSentry;
00386 };
00387
00388
00389
00390 inline
00391 EventProcessor::StatusCode
00392 EventProcessor::run() {
00393 return run(-1, false);
00394 }
00395 }
00396 #endif