00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include <exception>
00005 #include <utility>
00006 #include <iostream>
00007 #include <iomanip>
00008
00009 #include "boost/bind.hpp"
00010 #include "boost/thread/xtime.hpp"
00011
00012 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00013 #include "DataFormats/Provenance/interface/BranchType.h"
00014 #include "FWCore/Utilities/interface/DebugMacros.h"
00015 #include "FWCore/Utilities/interface/EDMException.h"
00016 #include "FWCore/Utilities/interface/GetReleaseVersion.h"
00017 #include "FWCore/Utilities/interface/GetPassID.h"
00018 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00019 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00020
00021 #include "FWCore/Framework/interface/IOVSyncValue.h"
00022 #include "FWCore/Framework/interface/SourceFactory.h"
00023 #include "FWCore/Framework/interface/ModuleFactory.h"
00024 #include "FWCore/Framework/interface/LooperFactory.h"
00025 #include "FWCore/Framework/interface/EventPrincipal.h"
00026 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00027 #include "FWCore/Framework/interface/RunPrincipal.h"
00028 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00029 #include "FWCore/Framework/interface/TriggerNamesService.h"
00030 #include "FWCore/Framework/interface/InputSourceDescription.h"
00031 #include "FWCore/Framework/interface/EventSetupProvider.h"
00032 #include "FWCore/Framework/interface/InputSource.h"
00033 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00034
00035 #include "FWCore/Framework/src/Breakpoints.h"
00036 #include "FWCore/Framework/src/InputSourceFactory.h"
00037
00038 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00039 #include "FWCore/ParameterSet/interface/PythonProcessDesc.h"
00040
00041 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00042 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00043
00044 #include "FWCore/Framework/interface/Schedule.h"
00045 #include "FWCore/Framework/interface/EDLooper.h"
00046
00047 #include "FWCore/Framework/src/EPStates.h"
00048
00049 using boost::shared_ptr;
00050 using edm::serviceregistry::ServiceLegacy;
00051 using edm::serviceregistry::kOverlapIsError;
00052
00053 namespace edm {
00054
00055 namespace event_processor {
00056
00057 class StateSentry
00058 {
00059 public:
00060 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00061 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00062 void succeeded() {success_ = true;}
00063
00064 private:
00065 EventProcessor* ep_;
00066 bool success_;
00067 };
00068 }
00069
00070 using namespace event_processor;
00071 using namespace edm::service;
00072
00073 namespace {
00074
00075
00076
00077
00078 char const* stateNames[] = {
00079 "Init",
00080 "JobReady",
00081 "RunGiven",
00082 "Running",
00083 "Stopping",
00084 "ShuttingDown",
00085 "Done",
00086 "JobEnded",
00087 "Error",
00088 "ErrorEnded",
00089 "End",
00090 "Invalid"
00091 };
00092
00093 char const* msgNames[] = {
00094 "SetRun",
00095 "Skip",
00096 "RunAsync",
00097 "Run(ID)",
00098 "Run(count)",
00099 "BeginJob",
00100 "StopAsync",
00101 "ShutdownAsync",
00102 "EndJob",
00103 "CountComplete",
00104 "InputExhausted",
00105 "StopSignal",
00106 "ShutdownSignal",
00107 "Finished",
00108 "Any",
00109 "dtor",
00110 "Exception",
00111 "Rewind"
00112 };
00113 }
00114
00115
00116
00117
00118
00119 struct TransEntry
00120 {
00121 State current;
00122 Msg message;
00123 State final;
00124 };
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147 TransEntry table[] = {
00148
00149
00150 { sInit, mException, sError },
00151 { sInit, mBeginJob, sJobReady },
00152 { sJobReady, mException, sError },
00153 { sJobReady, mSetRun, sRunGiven },
00154 { sJobReady, mInputRewind, sRunning },
00155 { sJobReady, mSkip, sRunning },
00156 { sJobReady, mRunID, sRunning },
00157 { sJobReady, mRunCount, sRunning },
00158 { sJobReady, mEndJob, sJobEnded },
00159 { sJobReady, mBeginJob, sJobReady },
00160 { sJobReady, mDtor, sEnd },
00161
00162 { sJobReady, mStopAsync, sJobReady },
00163 { sJobReady, mCountComplete, sJobReady },
00164 { sJobReady, mFinished, sJobReady },
00165
00166 { sRunGiven, mException, sError },
00167 { sRunGiven, mRunAsync, sRunning },
00168 { sRunGiven, mBeginJob, sRunGiven },
00169 { sRunGiven, mShutdownAsync, sShuttingDown },
00170 { sRunGiven, mStopAsync, sStopping },
00171 { sRunning, mException, sError },
00172 { sRunning, mStopAsync, sStopping },
00173 { sRunning, mShutdownAsync, sShuttingDown },
00174 { sRunning, mShutdownSignal, sShuttingDown },
00175 { sRunning, mCountComplete, sStopping },
00176 { sRunning, mInputExhausted, sStopping },
00177
00178 { sStopping, mInputRewind, sRunning },
00179 { sStopping, mException, sError },
00180 { sStopping, mFinished, sJobReady },
00181 { sStopping, mCountComplete, sJobReady },
00182 { sStopping, mShutdownSignal, sShuttingDown },
00183 { sStopping, mStopAsync, sStopping },
00184 { sStopping, mInputExhausted, sStopping },
00185
00186 { sShuttingDown, mException, sError },
00187 { sShuttingDown, mShutdownSignal, sShuttingDown },
00188 { sShuttingDown, mCountComplete, sDone },
00189 { sShuttingDown, mInputExhausted, sDone },
00190 { sShuttingDown, mFinished, sDone },
00191
00192
00193
00194 { sDone, mEndJob, sJobEnded },
00195 { sDone, mException, sError },
00196 { sJobEnded, mDtor, sEnd },
00197 { sJobEnded, mException, sError },
00198 { sError, mEndJob, sError },
00199 { sError, mDtor, sError },
00200 { sInit, mDtor, sEnd },
00201 { sStopping, mShutdownAsync, sShuttingDown },
00202 { sInvalid, mAny, sInvalid }
00203 };
00204
00205
00206
00207
00208
00209
00210 shared_ptr<InputSource>
00211 makeInput(ParameterSet const& params,
00212 EventProcessor::CommonParams const& common,
00213 ProductRegistry& preg,
00214 boost::shared_ptr<ActivityRegistry> areg)
00215 {
00216
00217 bool sourceSpecified = false;
00218 try {
00219 ParameterSet main_input =
00220 params.getParameter<ParameterSet>("@main_input");
00221
00222
00223
00224
00225 ModuleDescription md;
00226 md.parameterSetID_ = main_input.id();
00227 md.moduleName_ =
00228 main_input.getParameter<std::string>("@module_type");
00229
00230
00231 md.moduleLabel_ = "source";
00232 md.processConfiguration_ = ProcessConfiguration(common.processName_,
00233 params.id(), getReleaseVersion(), getPassID());
00234
00235 sourceSpecified = true;
00236 InputSourceDescription isdesc(md, preg, areg, common.maxEventsInput_, common.maxLumisInput_);
00237 areg->preSourceConstructionSignal_(md);
00238 shared_ptr<InputSource> input(InputSourceFactory::get()->makeInputSource(main_input, isdesc).release());
00239 areg->postSourceConstructionSignal_(md);
00240
00241 return input;
00242 }
00243 catch(edm::Exception const& iException) {
00244 if(sourceSpecified == false &&
00245 errors::Configuration == iException.categoryCode()) {
00246 throw edm::Exception(errors::Configuration, "FailedInputSource")
00247 << "Configuration of main input source has failed\n"
00248 << iException;
00249 } else {
00250 throw;
00251 }
00252 }
00253 return shared_ptr<InputSource>();
00254 }
00255
00256
00257 static
00258 std::auto_ptr<eventsetup::EventSetupProvider>
00259 makeEventSetupProvider(ParameterSet const& params)
00260 {
00261 using namespace edm::eventsetup;
00262 std::vector<std::string> prefers =
00263 params.getParameter<std::vector<std::string> >("@all_esprefers");
00264
00265 if(prefers.empty()) {
00266 return std::auto_ptr<EventSetupProvider>(new EventSetupProvider());
00267 }
00268
00269 EventSetupProvider::PreferredProviderInfo preferInfo;
00270 EventSetupProvider::RecordToDataMap recordToData;
00271
00272
00273
00274
00275
00276
00277 for(std::vector<std::string>::iterator itName = prefers.begin(), itNameEnd = prefers.end();
00278 itName != itNameEnd;
00279 ++itName)
00280 {
00281 recordToData.clear();
00282 ParameterSet preferPSet = params.getParameter<ParameterSet>(*itName);
00283 std::vector<std::string> recordNames = preferPSet.getParameterNames();
00284 for(std::vector<std::string>::iterator itRecordName = recordNames.begin(),
00285 itRecordNameEnd = recordNames.end();
00286 itRecordName != itRecordNameEnd;
00287 ++itRecordName) {
00288
00289 if((*itRecordName)[0] == '@') {
00290
00291 continue;
00292 }
00293
00294
00295 try {
00296 std::vector<std::string> dataInfo =
00297 preferPSet.getParameter<std::vector<std::string> >(*itRecordName);
00298
00299 if(dataInfo.empty()) {
00300
00301 throw edm::Exception(errors::Configuration)
00302 << "The record named "
00303 << *itRecordName << " specifies no data items";
00304 }
00305
00306 for(std::vector<std::string>::iterator itDatum = dataInfo.begin(),
00307 itDatumEnd = dataInfo.end();
00308 itDatum != itDatumEnd;
00309 ++itDatum){
00310 std::string datumName(*itDatum, 0, itDatum->find_first_of("/"));
00311 std::string labelName;
00312
00313 if(itDatum->size() != datumName.size()) {
00314 labelName = std::string(*itDatum, datumName.size()+1);
00315 }
00316 recordToData.insert(std::make_pair(std::string(*itRecordName),
00317 std::make_pair(datumName,
00318 labelName)));
00319 }
00320 } catch(cms::Exception const& iException) {
00321 cms::Exception theError("ESPreferConfigurationError");
00322 theError << "While parsing the es_prefer statement for type="
00323 << preferPSet.getParameter<std::string>("@module_type")
00324 << " label=\""
00325 << preferPSet.getParameter<std::string>("@module_label")
00326 << "\" an error occurred.";
00327 theError.append(iException);
00328 throw theError;
00329 }
00330 }
00331 preferInfo[ComponentDescription(preferPSet.getParameter<std::string>("@module_type"),
00332 preferPSet.getParameter<std::string>("@module_label"),
00333 false)]
00334 = recordToData;
00335 }
00336 return std::auto_ptr<EventSetupProvider>(new EventSetupProvider(&preferInfo));
00337 }
00338
00339
00340 void
00341 fillEventSetupProvider(edm::eventsetup::EventSetupProvider& cp,
00342 ParameterSet const& params,
00343 EventProcessor::CommonParams const& common)
00344 {
00345 using namespace edm::eventsetup;
00346 std::vector<std::string> providers =
00347 params.getParameter<std::vector<std::string> >("@all_esmodules");
00348
00349 for(std::vector<std::string>::iterator itName = providers.begin(), itNameEnd = providers.end();
00350 itName != itNameEnd;
00351 ++itName)
00352 {
00353 ParameterSet providerPSet = params.getParameter<ParameterSet>(*itName);
00354 ModuleFactory::get()->addTo(cp,
00355 providerPSet,
00356 common.processName_,
00357 common.releaseVersion_,
00358 common.passID_);
00359 }
00360
00361 std::vector<std::string> sources =
00362 params.getParameter<std::vector<std::string> >("@all_essources");
00363
00364 for(std::vector<std::string>::iterator itName = sources.begin(), itNameEnd = sources.end();
00365 itName != itNameEnd;
00366 ++itName)
00367 {
00368 ParameterSet providerPSet = params.getParameter<ParameterSet>(*itName);
00369 SourceFactory::get()->addTo(cp,
00370 providerPSet,
00371 common.processName_,
00372 common.releaseVersion_,
00373 common.passID_);
00374 }
00375 }
00376
00377
00378 boost::shared_ptr<edm::EDLooper>
00379 fillLooper(edm::eventsetup::EventSetupProvider& cp,
00380 ParameterSet const& params,
00381 EventProcessor::CommonParams const& common)
00382 {
00383 using namespace edm::eventsetup;
00384 boost::shared_ptr<edm::EDLooper> vLooper;
00385
00386 std::vector<std::string> loopers =
00387 params.getParameter<std::vector<std::string> >("@all_loopers");
00388
00389 if(loopers.size() == 0) {
00390 return vLooper;
00391 }
00392
00393 assert(1 == loopers.size());
00394
00395 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00396 itName != itNameEnd;
00397 ++itName)
00398 {
00399 ParameterSet providerPSet = params.getParameter<ParameterSet>(*itName);
00400 vLooper = LooperFactory::get()->addTo(cp,
00401 providerPSet,
00402 common.processName_,
00403 common.releaseVersion_,
00404 common.passID_);
00405 }
00406 return vLooper;
00407
00408 }
00409
00410
00411 EventProcessor::EventProcessor(std::string const& config,
00412 ServiceToken const& iToken,
00413 serviceregistry::ServiceLegacy iLegacy,
00414 std::vector<std::string> const& defaultServices,
00415 std::vector<std::string> const& forcedServices) :
00416 preProcessEventSignal_(),
00417 postProcessEventSignal_(),
00418 maxEventsPset_(),
00419 maxLumisPset_(),
00420 actReg_(new ActivityRegistry),
00421 wreg_(actReg_),
00422 preg_(),
00423 serviceToken_(),
00424 input_(),
00425 schedule_(),
00426 esp_(),
00427 act_table_(),
00428 state_(sInit),
00429 event_loop_(),
00430 state_lock_(),
00431 stop_lock_(),
00432 stopper_(),
00433 stop_count_(-1),
00434 last_rc_(epSuccess),
00435 last_error_text_(),
00436 id_set_(false),
00437 event_loop_id_(),
00438 my_sig_num_(getSigNum()),
00439 fb_(),
00440 looper_(),
00441 shouldWeStop_(false),
00442 alreadyHandlingException_(false),
00443 forceLooperToEnd_(false)
00444 {
00445 boost::shared_ptr<edm::ProcessDesc> processDesc(new edm::ProcessDesc(config));
00446 processDesc->addServices(defaultServices, forcedServices);
00447 init(processDesc, iToken, iLegacy);
00448 }
00449
00450 EventProcessor::EventProcessor(std::string const& config,
00451 std::vector<std::string> const& defaultServices,
00452 std::vector<std::string> const& forcedServices) :
00453 preProcessEventSignal_(),
00454 postProcessEventSignal_(),
00455 maxEventsPset_(),
00456 maxLumisPset_(),
00457 actReg_(new ActivityRegistry),
00458 wreg_(actReg_),
00459 preg_(),
00460 serviceToken_(),
00461 input_(),
00462 schedule_(),
00463 esp_(),
00464 act_table_(),
00465 state_(sInit),
00466 event_loop_(),
00467 state_lock_(),
00468 stop_lock_(),
00469 stopper_(),
00470 stop_count_(-1),
00471 last_rc_(epSuccess),
00472 last_error_text_(),
00473 id_set_(false),
00474 event_loop_id_(),
00475 my_sig_num_(getSigNum()),
00476 fb_(),
00477 looper_(),
00478 shouldWeStop_(false),
00479 alreadyHandlingException_(false),
00480 forceLooperToEnd_(false)
00481 {
00482 boost::shared_ptr<edm::ProcessDesc> processDesc(new edm::ProcessDesc(config));
00483 processDesc->addServices(defaultServices, forcedServices);
00484 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00485 }
00486
00487 EventProcessor::EventProcessor(boost::shared_ptr<edm::ProcessDesc> & processDesc,
00488 ServiceToken const& token,
00489 serviceregistry::ServiceLegacy legacy) :
00490 preProcessEventSignal_(),
00491 postProcessEventSignal_(),
00492 maxEventsPset_(),
00493 maxLumisPset_(),
00494 actReg_(new ActivityRegistry),
00495 wreg_(actReg_),
00496 preg_(),
00497 serviceToken_(),
00498 input_(),
00499 schedule_(),
00500 esp_(),
00501 act_table_(),
00502 state_(sInit),
00503 event_loop_(),
00504 state_lock_(),
00505 stop_lock_(),
00506 stopper_(),
00507 stop_count_(-1),
00508 last_rc_(epSuccess),
00509 last_error_text_(),
00510 id_set_(false),
00511 event_loop_id_(),
00512 my_sig_num_(getSigNum()),
00513 fb_(),
00514 looper_(),
00515 shouldWeStop_(false),
00516 alreadyHandlingException_(false),
00517 forceLooperToEnd_(false)
00518 {
00519 init(processDesc, token, legacy);
00520 }
00521
00522
00523 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00524 preProcessEventSignal_(),
00525 postProcessEventSignal_(),
00526 maxEventsPset_(),
00527 maxLumisPset_(),
00528 actReg_(new ActivityRegistry),
00529 wreg_(actReg_),
00530 preg_(),
00531 serviceToken_(),
00532 input_(),
00533 schedule_(),
00534 esp_(),
00535 act_table_(),
00536 state_(sInit),
00537 event_loop_(),
00538 state_lock_(),
00539 stop_lock_(),
00540 stopper_(),
00541 stop_count_(-1),
00542 last_rc_(epSuccess),
00543 last_error_text_(),
00544 id_set_(false),
00545 event_loop_id_(),
00546 my_sig_num_(getSigNum()),
00547 fb_(),
00548 looper_(),
00549 shouldWeStop_(false),
00550 alreadyHandlingException_(false),
00551 forceLooperToEnd_(false)
00552 {
00553 if(isPython)
00554 {
00555 boost::shared_ptr<edm::ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
00556 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00557 }
00558 else
00559 {
00560 boost::shared_ptr<edm::ProcessDesc> processDesc(new edm::ProcessDesc(config));
00561 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00562 }
00563 }
00564
00565 void
00566 EventProcessor::init(boost::shared_ptr<edm::ProcessDesc> & processDesc,
00567 ServiceToken const& iToken,
00568 serviceregistry::ServiceLegacy iLegacy) {
00569
00570
00571
00572
00573 shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00574
00575 ParameterSet optionsPset(parameterSet->getUntrackedParameter<ParameterSet>("options", ParameterSet()));
00576 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00577 handleEmptyRuns_ = optionsPset.getUntrackedParameter<bool>("handleEmptyRuns", true);
00578 handleEmptyLumis_ = optionsPset.getUntrackedParameter<bool>("handleEmptyLumis", true);
00579
00580 maxEventsPset_ = parameterSet->getUntrackedParameter<ParameterSet>("maxEvents", ParameterSet());
00581 maxLumisPset_ = parameterSet->getUntrackedParameter<ParameterSet>("maxLuminosityBlocks", ParameterSet());
00582
00583 shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00584
00585
00586
00587 ServiceToken tempToken(ServiceRegistry::createSet(*pServiceSets, iToken, iLegacy));
00588
00589
00590
00591 tempToken.copySlotsTo(*actReg_);
00592
00593
00594 typedef serviceregistry::ServiceWrapper<ConstProductRegistry> w_CPR;
00595 shared_ptr<w_CPR>
00596 reg(new w_CPR(std::auto_ptr<ConstProductRegistry>(new ConstProductRegistry(preg_))));
00597 ServiceToken tempToken2(ServiceRegistry::createContaining(reg,
00598 tempToken,
00599 kOverlapIsError));
00600
00601
00602
00603 std::string processName = parameterSet->getParameter<std::string>("@process_name");
00604
00605 typedef edm::service::TriggerNamesService TNS;
00606 typedef serviceregistry::ServiceWrapper<TNS> w_TNS;
00607
00608 shared_ptr<w_TNS> tnsptr
00609 (new w_TNS(std::auto_ptr<TNS>(new TNS(*parameterSet))));
00610
00611 serviceToken_ = ServiceRegistry::createContaining(tnsptr,
00612 tempToken2,
00613 kOverlapIsError);
00614
00615
00616 ServiceRegistry::Operate operate(serviceToken_);
00617
00618
00619 act_table_ = ActionTable(*parameterSet);
00620 CommonParams common = CommonParams(processName,
00621 getReleaseVersion(),
00622 getPassID(),
00623 maxEventsPset_.getUntrackedParameter<int>("input", -1),
00624 maxLumisPset_.getUntrackedParameter<int>("input", -1));
00625
00626 esp_ = makeEventSetupProvider(*parameterSet);
00627 fillEventSetupProvider(*esp_, *parameterSet, common);
00628
00629 looper_ = fillLooper(*esp_, *parameterSet, common);
00630 if (looper_) looper_->setActionTable(&act_table_);
00631
00632 input_= makeInput(*parameterSet, common, preg_, actReg_);
00633 schedule_ = std::auto_ptr<Schedule>
00634 (new Schedule(*parameterSet,
00635 ServiceRegistry::instance().get<TNS>(),
00636 wreg_,
00637 preg_,
00638 act_table_,
00639 actReg_));
00640
00641
00642 FDEBUG(2) << parameterSet->toString() << std::endl;
00643 connectSigs(this);
00644 }
00645
00646 EventProcessor::~EventProcessor()
00647 {
00648
00649 ServiceToken token = getToken();
00650 ServiceRegistry::Operate op(token);
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661 terminateMachine();
00662
00663 try {
00664 changeState(mDtor);
00665 }
00666 catch(cms::Exception& e)
00667 {
00668 LogError("System")
00669 << e.explainSelf() << "\n";
00670 }
00671
00672
00673 esp_.reset();
00674 schedule_.reset();
00675 input_.reset();
00676 looper_.reset();
00677 wreg_.clear();
00678 actReg_.reset();
00679 }
00680
00681 void
00682 EventProcessor::rewind()
00683 {
00684 beginJob();
00685 changeState(mStopAsync);
00686 changeState(mInputRewind);
00687 {
00688 StateSentry toerror(this);
00689
00690
00691 ServiceRegistry::Operate operate(serviceToken_);
00692
00693 {
00694 input_->repeat();
00695 input_->rewind();
00696 }
00697 changeState(mCountComplete);
00698 toerror.succeeded();
00699 }
00700 changeState(mFinished);
00701 }
00702
00703 std::auto_ptr<EventPrincipal>
00704 EventProcessor::doOneEvent(EventID const& id) {
00705 std::auto_ptr<EventPrincipal> pep;
00706 {
00707 pep = input_->readEvent(id);
00708 }
00709 procOneEvent(pep.get());
00710 return pep;
00711 }
00712
00713 void
00714 EventProcessor::procOneEvent(EventPrincipal *pep) {
00715 if(0 != pep) {
00716 IOVSyncValue ts(pep->id(), pep->luminosityBlock(), pep->time());
00717 EventSetup const& es = esp_->eventSetupForInstance(ts);
00718 schedule_->processOneOccurrence<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(*pep, es);
00719 }
00720 }
00721
00722 EventProcessor::StatusCode
00723 EventProcessor::run(int numberEventsToProcess, bool repeatable)
00724 {
00725 return runEventCount(numberEventsToProcess);
00726 }
00727
00728 EventProcessor::StatusCode
00729 EventProcessor::run(EventID const& id)
00730 {
00731 beginJob();
00732 changeState(mRunID);
00733 StateSentry toerror(this);
00734 Status rc = epSuccess;
00735
00736
00737 ServiceRegistry::Operate operate(serviceToken_);
00738
00739 if(doOneEvent(id).get() == 0) {
00740 changeState(mInputExhausted);
00741 } else {
00742 changeState(mCountComplete);
00743 rc = epInputComplete;
00744 }
00745 toerror.succeeded();
00746 changeState(mFinished);
00747 return rc;
00748 }
00749
00750 EventProcessor::StatusCode
00751 EventProcessor::skip(int numberToSkip)
00752 {
00753 beginJob();
00754 changeState(mSkip);
00755 {
00756 StateSentry toerror(this);
00757
00758
00759 ServiceRegistry::Operate operate(serviceToken_);
00760
00761 {
00762 input_->skipEvents(numberToSkip);
00763 }
00764 changeState(mCountComplete);
00765 toerror.succeeded();
00766 }
00767 changeState(mFinished);
00768 return epSuccess;
00769 }
00770
00771 void
00772 EventProcessor::beginJob()
00773 {
00774 if(state_ != sInit) return;
00775 bk::beginJob();
00776
00777 changeState(mBeginJob);
00778
00779
00780
00781 ServiceRegistry::Operate operate(serviceToken_);
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791 EventSetup const& es =
00792 esp_->eventSetupForInstance(IOVSyncValue::beginOfTime());
00793 if(looper_) {
00794 looper_->beginOfJob(es);
00795 }
00796 try {
00797 input_->doBeginJob(es);
00798 } catch(cms::Exception& e) {
00799 LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00800 e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00801 throw;
00802 } catch(std::exception& e) {
00803 LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
00804 throw;
00805 } catch(...) {
00806 LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
00807 throw;
00808 }
00809 schedule_->beginJob(es);
00810 actReg_->postBeginJobSignal_();
00811
00812 }
00813
00814 void
00815 EventProcessor::endJob()
00816 {
00817
00818 ExceptionCollector c;
00819
00820
00821 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00822
00823
00824 ServiceRegistry::Operate operate(serviceToken_);
00825
00826 c.call(boost::bind(&EventProcessor::terminateMachine, this));
00827 c.call(boost::bind(&Schedule::endJob, schedule_.get()));
00828 c.call(boost::bind(&InputSource::doEndJob, input_));
00829 if (looper_) {
00830 c.call(boost::bind(&EDLooper::endOfJob, looper_));
00831 }
00832 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00833 if (c.hasThrown()) {
00834 c.rethrow();
00835 }
00836 }
00837
00838 ServiceToken
00839 EventProcessor::getToken()
00840 {
00841 return serviceToken_;
00842 }
00843
00844 void
00845 EventProcessor::connectSigs(EventProcessor* ep)
00846 {
00847
00848
00849
00850 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
00851 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
00852 }
00853
00854 std::vector<ModuleDescription const*>
00855 EventProcessor::getAllModuleDescriptions() const
00856 {
00857 return schedule_->getAllModuleDescriptions();
00858 }
00859
00860 int
00861 EventProcessor::totalEvents() const
00862 {
00863 return schedule_->totalEvents();
00864 }
00865
00866 int
00867 EventProcessor::totalEventsPassed() const
00868 {
00869 return schedule_->totalEventsPassed();
00870 }
00871
00872 int
00873 EventProcessor::totalEventsFailed() const
00874 {
00875 return schedule_->totalEventsFailed();
00876 }
00877
00878 void
00879 EventProcessor::enableEndPaths(bool active)
00880 {
00881 schedule_->enableEndPaths(active);
00882 }
00883
00884 bool
00885 EventProcessor::endPathsEnabled() const
00886 {
00887 return schedule_->endPathsEnabled();
00888 }
00889
00890 void
00891 EventProcessor::getTriggerReport(TriggerReport& rep) const
00892 {
00893 schedule_->getTriggerReport(rep);
00894 }
00895
00896 void
00897 EventProcessor::clearCounters()
00898 {
00899 schedule_->clearCounters();
00900 }
00901
00902
00903 char const* EventProcessor::currentStateName() const
00904 {
00905 return stateName(getState());
00906 }
00907
00908 char const* EventProcessor::stateName(State s) const
00909 {
00910 return stateNames[s];
00911 }
00912
00913 char const* EventProcessor::msgName(Msg m) const
00914 {
00915 return msgNames[m];
00916 }
00917
00918 State EventProcessor::getState() const
00919 {
00920 return state_;
00921 }
00922
00923 EventProcessor::StatusCode EventProcessor::statusAsync() const
00924 {
00925
00926
00927 return last_rc_;
00928 }
00929
00930 void
00931 EventProcessor::setRunNumber(RunNumber_t runNumber)
00932 {
00933 if (runNumber == 0) {
00934 runNumber = 1;
00935 LogWarning("Invalid Run")
00936 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
00937 << "Run number was set to 1 instead\n";
00938 }
00939
00940
00941 beginJob();
00942 changeState(mSetRun);
00943
00944
00945 input_->setRunNumber(runNumber);
00946 }
00947
00948 void
00949 EventProcessor::declareRunNumber(RunNumber_t runNumber)
00950 {
00951
00952 beginJob();
00953 changeState(mSetRun);
00954
00955
00956
00957 }
00958
00959 EventProcessor::StatusCode
00960 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds)
00961 {
00962 bool rc = true;
00963 boost::xtime timeout;
00964 boost::xtime_get(&timeout, boost::TIME_UTC);
00965 timeout.sec += timeout_seconds;
00966
00967
00968
00969
00970 {
00971 boost::mutex::scoped_lock sl(stop_lock_);
00972
00973
00974 if(stop_count_ < 0) return last_rc_;
00975
00976 if(timeout_seconds==0)
00977 while(stop_count_==0) stopper_.wait(sl);
00978 else
00979 while(stop_count_==0 &&
00980 (rc = stopper_.timed_wait(sl,timeout)) == true);
00981
00982 if(rc == false)
00983 {
00984
00985
00986
00987
00988
00989
00990 if(id_set_) pthread_cancel(event_loop_id_);
00991
00992
00993 LogWarning("timeout")
00994 << "An asynchronous request was made to shut down "
00995 << "the event loop "
00996 << "and the event loop did not shutdown after "
00997 << timeout_seconds << " seconds\n";
00998 }
00999 else
01000 {
01001 event_loop_->join();
01002 event_loop_.reset();
01003 id_set_ = false;
01004 stop_count_ = -1;
01005 }
01006 }
01007 return rc==false?epTimedOut:last_rc_;
01008 }
01009
01010 EventProcessor::StatusCode
01011 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs)
01012 {
01013 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01014 if(rc!=epTimedOut) changeState(mCountComplete);
01015 else errorState();
01016 return rc;
01017 }
01018
01019
01020 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs)
01021 {
01022 changeState(mStopAsync);
01023 StatusCode rc = waitForAsyncCompletion(secs);
01024 if(rc!=epTimedOut) changeState(mFinished);
01025 else errorState();
01026 return rc;
01027 }
01028
01029 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs)
01030 {
01031 changeState(mShutdownAsync);
01032 StatusCode rc = waitForAsyncCompletion(secs);
01033 if(rc!=epTimedOut) changeState(mFinished);
01034 else errorState();
01035 return rc;
01036 }
01037
01038 void EventProcessor::errorState()
01039 {
01040 state_ = sError;
01041 }
01042
01043
01044 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m)
01045 {
01046
01047
01048
01049 changeState(m);
01050 return waitForAsyncCompletion(60*2);
01051 }
01052
01053 void EventProcessor::changeState(Msg msg)
01054 {
01055
01056
01057 boost::mutex::scoped_lock sl(state_lock_);
01058 State curr = state_;
01059 int rc;
01060
01061
01062 for(rc = 0;
01063 table[rc].current != sInvalid &&
01064 (curr != table[rc].current ||
01065 (curr == table[rc].current &&
01066 msg != table[rc].message && table[rc].message != mAny));
01067 ++rc);
01068
01069 if(table[rc].current == sInvalid)
01070 throw cms::Exception("BadState")
01071 << "A member function of EventProcessor has been called in an"
01072 << " inappropriate order.\n"
01073 << "Bad transition from " << stateName(curr) << " "
01074 << "using message " << msgName(msg) << "\n"
01075 << "No where to go from here.\n";
01076
01077 FDEBUG(1) << "changeState: current=" << stateName(curr)
01078 << ", message=" << msgName(msg)
01079 << " -> new=" << stateName(table[rc].final) << "\n";
01080
01081 state_ = table[rc].final;
01082 }
01083
01084 void EventProcessor::runAsync()
01085 {
01086 using boost::thread;
01087 beginJob();
01088 {
01089 boost::mutex::scoped_lock sl(stop_lock_);
01090 if(id_set_==true) {
01091 std::string err("runAsync called while async event loop already running\n");
01092 edm::LogError("FwkJob") << err;
01093 throw cms::Exception("BadState") << err;
01094 }
01095
01096 changeState(mRunAsync);
01097
01098 stop_count_=0;
01099 last_rc_=epSuccess;
01100 event_loop_.reset(new thread(boost::bind(EventProcessor::asyncRun,this)));
01101 boost::xtime timeout;
01102 boost::xtime_get(&timeout, boost::TIME_UTC);
01103 timeout.sec += 60;
01104 if(starter_.timed_wait(sl,timeout)==false) {
01105
01106 throw cms::Exception("BadState")
01107 << "Async run thread did not start in 60 seconds\n";
01108 }
01109 }
01110 }
01111
01112 void EventProcessor::asyncRun(EventProcessor* me)
01113 {
01114
01115
01116
01117
01118
01119
01120
01121
01122
01123 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,0);
01124
01125 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
01126 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,0);
01127
01128 {
01129 boost::mutex::scoped_lock(me->stop_lock_);
01130 me->event_loop_id_ = pthread_self();
01131 me->id_set_ = true;
01132 me->starter_.notify_all();
01133 }
01134
01135 Status rc = epException;
01136 FDEBUG(2) << "asyncRun starting ......................\n";
01137
01138 try {
01139 bool onlineStateTransitions = true;
01140 rc = me->runToCompletion(onlineStateTransitions);
01141 }
01142 catch (cms::Exception& e) {
01143 edm::LogError("FwkJob") << "cms::Exception caught in "
01144 << "EventProcessor::asyncRun"
01145 << "\n"
01146 << e.explainSelf();
01147 me->last_error_text_ = e.explainSelf();
01148 }
01149 catch (std::exception& e) {
01150 edm::LogError("FwkJob") << "Standard library exception caught in "
01151 << "EventProcessor::asyncRun"
01152 << "\n"
01153 << e.what();
01154 me->last_error_text_ = e.what();
01155 }
01156 catch (...) {
01157 edm::LogError("FwkJob") << "Unknown exception caught in "
01158 << "EventProcessor::asyncRun"
01159 << "\n";
01160 me->last_error_text_ = "Unknown exception caught";
01161 rc = epOther;
01162 }
01163
01164 me->last_rc_ = rc;
01165
01166 {
01167
01168 boost::mutex::scoped_lock sl(me->stop_lock_);
01169 ++me->stop_count_;
01170 me->stopper_.notify_all();
01171 }
01172 FDEBUG(2) << "asyncRun ending ......................\n";
01173 }
01174
01175
01176 edm::EventProcessor::StatusCode
01177 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01178
01179 StateSentry toerror(this);
01180
01181 int numberOfEventsToProcess = -1;
01182 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01183
01184 if (machine_.get() != 0) {
01185 throw edm::Exception(errors::LogicError)
01186 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01187 << "Please report this error to the Framework group\n";
01188 }
01189
01190 toerror.succeeded();
01191
01192 return returnCode;
01193 }
01194
01195 edm::EventProcessor::StatusCode
01196 EventProcessor::runEventCount(int numberOfEventsToProcess) {
01197
01198 StateSentry toerror(this);
01199
01200 bool onlineStateTransitions = false;
01201 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01202
01203 toerror.succeeded();
01204
01205 return returnCode;
01206 }
01207
01208 edm::EventProcessor::StatusCode
01209 EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01210
01211 beginJob();
01212
01213 if (!onlineStateTransitions) changeState(mRunCount);
01214
01215 StatusCode returnCode = epSuccess;
01216 stateMachineWasInErrorState_ = false;
01217
01218
01219 ServiceRegistry::Operate operate(serviceToken_);
01220
01221 if (machine_.get() == 0) {
01222
01223 statemachine::FileMode fileMode;
01224 if (fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01225 else if (fileMode_ == std::string("MERGE")) fileMode = statemachine::MERGE;
01226 else if (fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01227 else if (fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01228 else if (fileMode_ == std::string("FULLLUMIMERGE")) fileMode = statemachine::FULLLUMIMERGE;
01229 else {
01230 throw edm::Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01231 << fileMode_ << ".\n"
01232 << "Legal values are 'MERGE', 'NOMERGE', 'FULLMERGE', and 'FULLLUMIMERGE'.\n";
01233 }
01234
01235 machine_.reset(new statemachine::Machine(this,
01236 fileMode,
01237 handleEmptyRuns_,
01238 handleEmptyLumis_));
01239
01240 machine_->initiate();
01241 }
01242
01243 try {
01244
01245 InputSource::ItemType itemType;
01246
01247 int iEvents = 0;
01248
01249 while (true) {
01250
01251 itemType = input_->nextItemType();
01252
01253 FDEBUG(1) << "itemType = " << itemType << "\n";
01254
01255
01256
01257
01258
01259
01260
01261
01262 if (state_ == sStopping) {
01263 FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01264 forceLooperToEnd_ = true;
01265 machine_->process_event(statemachine::Stop());
01266 forceLooperToEnd_ = false;
01267 break;
01268 }
01269 else if (state_ == sShuttingDown) {
01270 FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01271 forceLooperToEnd_ = true;
01272 machine_->process_event(statemachine::Stop());
01273 forceLooperToEnd_ = false;
01274 break;
01275 }
01276
01277
01278 {
01279 boost::mutex::scoped_lock sl(usr2_lock);
01280 if (edm::shutdown_flag) {
01281 changeState(mShutdownSignal);
01282 returnCode = epSignal;
01283 forceLooperToEnd_ = true;
01284 machine_->process_event(statemachine::Stop());
01285 forceLooperToEnd_ = false;
01286 break;
01287 }
01288 }
01289
01290 if (itemType == InputSource::IsStop) {
01291 machine_->process_event(statemachine::Stop());
01292 }
01293 else if (itemType == InputSource::IsFile) {
01294 machine_->process_event(statemachine::File());
01295 }
01296 else if (itemType == InputSource::IsRun) {
01297 machine_->process_event(statemachine::Run(input_->run()));
01298 }
01299 else if (itemType == InputSource::IsLumi) {
01300 machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01301 }
01302 else if (itemType == InputSource::IsEvent) {
01303 machine_->process_event(statemachine::Event());
01304 ++iEvents;
01305 if (numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01306 returnCode = epCountComplete;
01307 changeState(mInputExhausted);
01308 FDEBUG(1) << "Event count complete, pausing event loop\n";
01309 break;
01310 }
01311 }
01312
01313 else {
01314 throw edm::Exception(errors::LogicError)
01315 << "Unknown next item type passed to EventProcessor\n"
01316 << "Please report this error to the Framework group\n";
01317 }
01318
01319 if (machine_->terminated()) {
01320 changeState(mInputExhausted);
01321 break;
01322 }
01323 }
01324 }
01325
01326
01327
01328
01329
01330
01331
01332
01333
01334
01335
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365
01366
01367
01368
01369
01370
01371 catch (cms::Exception& e) {
01372 alreadyHandlingException_ = true;
01373 terminateMachine();
01374 alreadyHandlingException_ = false;
01375 e << "cms::Exception caught in EventProcessor and rethrown\n";
01376 e << exceptionMessageLumis_;
01377 e << exceptionMessageRuns_;
01378 e << exceptionMessageFiles_;
01379 throw e;
01380 }
01381 catch (std::bad_alloc& e) {
01382 alreadyHandlingException_ = true;
01383 terminateMachine();
01384 alreadyHandlingException_ = false;
01385 throw cms::Exception("std::bad_alloc")
01386 << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
01387 << "The job has probably exhausted the virtual memory available to the process.\n"
01388 << exceptionMessageLumis_
01389 << exceptionMessageRuns_
01390 << exceptionMessageFiles_;
01391 }
01392 catch (std::exception& e) {
01393 alreadyHandlingException_ = true;
01394 terminateMachine();
01395 alreadyHandlingException_ = false;
01396 throw cms::Exception("StdException")
01397 << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
01398 << "Previous information:\n" << e.what() << "\n"
01399 << exceptionMessageLumis_
01400 << exceptionMessageRuns_
01401 << exceptionMessageFiles_;
01402 }
01403 catch (...) {
01404 alreadyHandlingException_ = true;
01405 terminateMachine();
01406 alreadyHandlingException_ = false;
01407 throw cms::Exception("Unknown")
01408 << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
01409 << exceptionMessageLumis_
01410 << exceptionMessageRuns_
01411 << exceptionMessageFiles_;
01412 }
01413
01414 if (machine_->terminated()) {
01415 FDEBUG(1) << "The state machine reports it has been terminated\n";
01416 machine_.reset();
01417 }
01418
01419 if (!onlineStateTransitions) changeState(mFinished);
01420
01421 if (stateMachineWasInErrorState_) {
01422 throw cms::Exception("BadState")
01423 << "The boost state machine in the EventProcessor exited after\n"
01424 << "entering the Error state.\n";
01425 }
01426
01427 return returnCode;
01428 }
01429
01430 void EventProcessor::readFile() {
01431 FDEBUG(1) << " \treadFile\n";
01432 fb_ = input_->readFile();
01433 }
01434
01435 void EventProcessor::closeInputFile() {
01436 input_->closeFile();
01437 FDEBUG(1) << "\tcloseInputFile\n";
01438 }
01439
01440 void EventProcessor::openOutputFiles() {
01441 schedule_->openOutputFiles(*fb_);
01442 FDEBUG(1) << "\topenOutputFiles\n";
01443 }
01444
01445 void EventProcessor::closeOutputFiles() {
01446 schedule_->closeOutputFiles();
01447 FDEBUG(1) << "\tcloseOutputFiles\n";
01448 }
01449
01450 void EventProcessor::respondToOpenInputFile() {
01451 schedule_->respondToOpenInputFile(*fb_);
01452 FDEBUG(1) << "\trespondToOpenInputFile\n";
01453 }
01454
01455 void EventProcessor::respondToCloseInputFile() {
01456 schedule_->respondToCloseInputFile(*fb_);
01457 FDEBUG(1) << "\trespondToCloseInputFile\n";
01458 }
01459
01460 void EventProcessor::respondToOpenOutputFiles() {
01461 schedule_->respondToOpenOutputFiles(*fb_);
01462 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01463 }
01464
01465 void EventProcessor::respondToCloseOutputFiles() {
01466 schedule_->respondToCloseOutputFiles(*fb_);
01467 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01468 }
01469
01470 void EventProcessor::startingNewLoop() {
01471 shouldWeStop_ = false;
01472 if (looper_) {
01473 looper_->doStartingNewLoop();
01474 }
01475 FDEBUG(1) << "\tstartingNewLoop\n";
01476 }
01477
01478 bool EventProcessor::endOfLoop() {
01479 if (looper_) {
01480 EDLooper::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01481 if (status != EDLooper::kContinue || forceLooperToEnd_) return true;
01482 else return false;
01483 }
01484 FDEBUG(1) << "\tendOfLoop\n";
01485 return true;
01486 }
01487
01488 void EventProcessor::rewindInput() {
01489 input_->repeat();
01490 input_->rewind();
01491 FDEBUG(1) << "\trewind\n";
01492 }
01493
01494 void EventProcessor::prepareForNextLoop() {
01495 looper_->prepareForNextLoop(esp_.get());
01496 FDEBUG(1) << "\tprepareForNextLoop\n";
01497 }
01498
01499 void EventProcessor::writeLumiCache() {
01500 while (!principalCache_.noMoreLumis()) {
01501 schedule_->writeLumi(principalCache_.lowestLumi());
01502 principalCache_.deleteLowestLumi();
01503 }
01504 FDEBUG(1) << "\twriteLumiCache\n";
01505 }
01506
01507 void EventProcessor::writeRunCache() {
01508 while (!principalCache_.noMoreRuns()) {
01509 schedule_->writeRun(principalCache_.lowestRun());
01510 principalCache_.deleteLowestRun();
01511 }
01512 FDEBUG(1) << "\twriteRunCache\n";
01513 }
01514
01515 bool EventProcessor::shouldWeCloseOutput() const {
01516 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01517 return schedule_->shouldWeCloseOutput();
01518 }
01519
01520 void EventProcessor::doErrorStuff() {
01521 FDEBUG(1) << "\tdoErrorStuff\n";
01522 edm::LogError("StateMachine")
01523 << "The EventProcessor state machine encountered an unexpected event\n"
01524 << "and went to the error state\n"
01525 << "Will attempt to terminate processing normally\n"
01526 << "(IF using the looper the next loop will be attempted)\n"
01527 << "This likely indicates a bug in an input module or corrupted input or both\n";
01528 stateMachineWasInErrorState_ = true;
01529 }
01530
01531 void EventProcessor::beginRun(int run) {
01532 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run);
01533 IOVSyncValue ts(EventID(runPrincipal.run(),0),
01534 0,
01535 runPrincipal.beginTime());
01536 EventSetup const& es = esp_->eventSetupForInstance(ts);
01537 schedule_->processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionBegin> >(runPrincipal, es);
01538 FDEBUG(1) << "\tbeginRun " << run << "\n";
01539 }
01540
01541 void EventProcessor::endRun(int run) {
01542 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run);
01543 input_->doEndRun(runPrincipal);
01544 IOVSyncValue ts(EventID(runPrincipal.run(),EventID::maxEventNumber()),
01545 LuminosityBlockID::maxLuminosityBlockNumber(),
01546 runPrincipal.endTime());
01547 EventSetup const& es = esp_->eventSetupForInstance(ts);
01548 schedule_->processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionEnd> >(runPrincipal, es);
01549 FDEBUG(1) << "\tendRun " << run << "\n";
01550 }
01551
01552 void EventProcessor::beginLumi(int run, int lumi) {
01553 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(run, lumi);
01554
01555
01556 IOVSyncValue ts(EventID(lumiPrincipal.run(),0), lumiPrincipal.luminosityBlock(), lumiPrincipal.beginTime());
01557 EventSetup const& es = esp_->eventSetupForInstance(ts);
01558 schedule_->processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> >(lumiPrincipal, es);
01559 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
01560 }
01561
01562 void EventProcessor::endLumi(int run, int lumi) {
01563 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(run, lumi);
01564 input_->doEndLumi(lumiPrincipal);
01565
01566
01567 IOVSyncValue ts(EventID(lumiPrincipal.run(),EventID::maxEventNumber()),
01568 lumiPrincipal.luminosityBlock(),
01569 lumiPrincipal.endTime());
01570 EventSetup const& es = esp_->eventSetupForInstance(ts);
01571 schedule_->processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> >(lumiPrincipal, es);
01572 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01573 }
01574
01575 int EventProcessor::readAndCacheRun() {
01576 principalCache_.insert(input_->readRun());
01577 FDEBUG(1) << "\treadAndCacheRun " << "\n";
01578 return principalCache_.runPrincipal().run();
01579 }
01580
01581 int EventProcessor::readAndCacheLumi() {
01582 principalCache_.insert(input_->readLuminosityBlock(principalCache_.runPrincipalPtr()));
01583 FDEBUG(1) << "\treadAndCacheLumi " << "\n";
01584 return principalCache_.lumiPrincipal().luminosityBlock();
01585 }
01586
01587 void EventProcessor::writeRun(int run) {
01588 schedule_->writeRun(principalCache_.runPrincipal(run));
01589 FDEBUG(1) << "\twriteRun " << run << "\n";
01590 }
01591
01592 void EventProcessor::deleteRunFromCache(int run) {
01593 principalCache_.deleteRun(run);
01594 FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
01595 }
01596
01597 void EventProcessor::writeLumi(int run, int lumi) {
01598 schedule_->writeLumi(principalCache_.lumiPrincipal(run, lumi));
01599 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01600 }
01601
01602 void EventProcessor::deleteLumiFromCache(int run, int lumi) {
01603 principalCache_.deleteLumi(run, lumi);
01604 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01605 }
01606
01607 void EventProcessor::readEvent() {
01608 sm_evp_ = input_->readEvent(principalCache_.lumiPrincipalPtr());
01609 FDEBUG(1) << "\treadEvent\n";
01610 }
01611
01612 void EventProcessor::processEvent() {
01613 IOVSyncValue ts(sm_evp_->id(), sm_evp_->luminosityBlock(), sm_evp_->time());
01614 EventSetup const& es = esp_->eventSetupForInstance(ts);
01615 schedule_->processOneOccurrence<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(*sm_evp_, es);
01616
01617 if (looper_) {
01618 EDLooper::Status status = looper_->doDuringLoop(*sm_evp_, esp_->eventSetup());
01619 if (status != EDLooper::kContinue) shouldWeStop_ = true;
01620 }
01621
01622 FDEBUG(1) << "\tprocessEvent\n";
01623 }
01624
01625 bool EventProcessor::shouldWeStop() const {
01626 FDEBUG(1) << "\tshouldWeStop\n";
01627 if (shouldWeStop_) return true;
01628 return schedule_->terminate();
01629 }
01630
01631 void EventProcessor::setExceptionMessageFiles(std::string& message) {
01632 exceptionMessageFiles_ = message;
01633 }
01634
01635 void EventProcessor::setExceptionMessageRuns(std::string& message) {
01636 exceptionMessageRuns_ = message;
01637 }
01638
01639 void EventProcessor::setExceptionMessageLumis(std::string& message) {
01640 exceptionMessageLumis_ = message;
01641 }
01642
01643 bool EventProcessor::alreadyHandlingException() const {
01644 return alreadyHandlingException_;
01645 }
01646
01647 void EventProcessor::terminateMachine() {
01648 if (machine_.get() != 0) {
01649 if (!machine_->terminated()) {
01650 forceLooperToEnd_ = true;
01651 machine_->process_event(statemachine::Stop());
01652 forceLooperToEnd_ = false;
01653 }
01654 else {
01655 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
01656 }
01657 if (machine_->terminated()) {
01658 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
01659 }
01660 machine_.reset();
01661 }
01662 }
01663 }