00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00010 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00011 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00012
00013 #include "FWCore/Framework/interface/CommonParams.h"
00014 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00015 #include "FWCore/Framework/interface/EDLooperBase.h"
00016 #include "FWCore/Framework/interface/EventPrincipal.h"
00017 #include "FWCore/Framework/interface/EventSetupProvider.h"
00018 #include "FWCore/Framework/interface/EventSetupRecord.h"
00019 #include "FWCore/Framework/src/EventSetupsController.h"
00020 #include "FWCore/Framework/interface/FileBlock.h"
00021 #include "FWCore/Framework/interface/InputSourceDescription.h"
00022 #include "FWCore/Framework/interface/IOVSyncValue.h"
00023 #include "FWCore/Framework/interface/LooperFactory.h"
00024 #include "FWCore/Framework/interface/LuminosityBlock.h"
00025 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00026 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00027 #include "FWCore/Framework/interface/ModuleChanger.h"
00028 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00029 #include "FWCore/Framework/interface/ProcessingController.h"
00030 #include "FWCore/Framework/interface/RunPrincipal.h"
00031 #include "FWCore/Framework/interface/Schedule.h"
00032 #include "FWCore/Framework/interface/ScheduleInfo.h"
00033 #include "FWCore/Framework/interface/TriggerNamesService.h"
00034 #include "FWCore/Framework/src/Breakpoints.h"
00035 #include "FWCore/Framework/src/EPStates.h"
00036 #include "FWCore/Framework/src/InputSourceFactory.h"
00037
00038 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00039
00040 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/GetPassID.h"
00053 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00054 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00055 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00056 #include "FWCore/Version/interface/GetReleaseVersion.h"
00057
00058 #include "boost/bind.hpp"
00059 #include "boost/thread/xtime.hpp"
00060
00061 #include <exception>
00062 #include <iomanip>
00063 #include <iostream>
00064 #include <utility>
00065
00066 #include <sys/ipc.h>
00067 #include <sys/msg.h>
00068
00069
00070 #include <sys/types.h>
00071 #include <sys/wait.h>
00072 #include <unistd.h>
00073
00074
00075 #ifndef __APPLE__
00076 #include <sched.h>
00077 #endif
00078
00079 namespace edm {
00080
00081 namespace event_processor {
00082
00083 class StateSentry {
00084 public:
00085 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00086 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00087 void succeeded() {success_ = true;}
00088
00089 private:
00090 EventProcessor* ep_;
00091 bool success_;
00092 };
00093 }
00094
00095 using namespace event_processor;
00096
00097 namespace {
00098 template <typename T>
00099 class ScheduleSignalSentry {
00100 public:
00101 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00102 a_(a), principal_(principal), es_(es) {
00103 if (a_) T::preScheduleSignal(a_, principal_);
00104 }
00105 ~ScheduleSignalSentry() {
00106 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00107 }
00108
00109 private:
00110
00111 ActivityRegistry* a_;
00112 typename T::MyPrincipal* principal_;
00113 EventSetup const* es_;
00114 };
00115 }
00116
namespace {

  // Human-readable names for the state machine states.
  // NOTE(review): presumably indexed by the State enum value, so the order
  // here has to follow that enum -- confirm against its declaration.
  char const* stateNames[] = {
    "Init",     "JobReady",     "RunGiven", "Running",
    "Stopping", "ShuttingDown", "Done",     "JobEnded",
    "Error",    "ErrorEnded",   "End",      "Invalid"
  };

  // Human-readable names for the state machine messages.
  // NOTE(review): presumably indexed by the Msg enum value -- confirm against
  // its declaration before reordering.
  char const* msgNames[] = {
    "SetRun",        "Skip",           "RunAsync",
    "Run(ID)",       "Run(count)",     "BeginJob",
    "StopAsync",     "ShutdownAsync",  "EndJob",
    "CountComplete", "InputExhausted", "StopSignal",
    "ShutdownSignal", "Finished",      "Any",
    "dtor",          "Exception",      "Rewind"
  };
}
00158
00159
00160
00161
00162
00163 struct TransEntry {
00164 State current;
00165 Msg message;
00166 State final;
00167 };
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190 TransEntry table[] = {
00191
00192
00193 { sInit, mException, sError },
00194 { sInit, mBeginJob, sJobReady },
00195 { sJobReady, mException, sError },
00196 { sJobReady, mSetRun, sRunGiven },
00197 { sJobReady, mInputRewind, sRunning },
00198 { sJobReady, mSkip, sRunning },
00199 { sJobReady, mRunID, sRunning },
00200 { sJobReady, mRunCount, sRunning },
00201 { sJobReady, mEndJob, sJobEnded },
00202 { sJobReady, mBeginJob, sJobReady },
00203 { sJobReady, mDtor, sEnd },
00204
00205 { sJobReady, mStopAsync, sJobReady },
00206 { sJobReady, mCountComplete, sJobReady },
00207 { sJobReady, mFinished, sJobReady },
00208
00209 { sRunGiven, mException, sError },
00210 { sRunGiven, mRunAsync, sRunning },
00211 { sRunGiven, mBeginJob, sRunGiven },
00212 { sRunGiven, mShutdownAsync, sShuttingDown },
00213 { sRunGiven, mStopAsync, sStopping },
00214 { sRunning, mException, sError },
00215 { sRunning, mStopAsync, sStopping },
00216 { sRunning, mShutdownAsync, sShuttingDown },
00217 { sRunning, mShutdownSignal, sShuttingDown },
00218 { sRunning, mCountComplete, sStopping },
00219 { sRunning, mInputExhausted, sStopping },
00220
00221 { sStopping, mInputRewind, sRunning },
00222 { sStopping, mException, sError },
00223 { sStopping, mFinished, sJobReady },
00224 { sStopping, mCountComplete, sJobReady },
00225 { sStopping, mShutdownSignal, sShuttingDown },
00226 { sStopping, mStopAsync, sStopping },
00227 { sStopping, mInputExhausted, sStopping },
00228
00229 { sShuttingDown, mException, sError },
00230 { sShuttingDown, mShutdownSignal, sShuttingDown },
00231 { sShuttingDown, mCountComplete, sDone },
00232 { sShuttingDown, mInputExhausted, sDone },
00233 { sShuttingDown, mFinished, sDone },
00234
00235
00236
00237 { sDone, mEndJob, sJobEnded },
00238 { sDone, mException, sError },
00239 { sJobEnded, mDtor, sEnd },
00240 { sJobEnded, mException, sError },
00241 { sError, mEndJob, sError },
00242 { sError, mDtor, sError },
00243 { sInit, mDtor, sEnd },
00244 { sStopping, mShutdownAsync, sShuttingDown },
00245 { sInvalid, mAny, sInvalid }
00246 };
00247
00248
00249
00250
00251
00252
00253 boost::shared_ptr<InputSource>
00254 makeInput(ParameterSet& params,
00255 CommonParams const& common,
00256 ProductRegistry& preg,
00257 PrincipalCache& pCache,
00258 boost::shared_ptr<ActivityRegistry> areg,
00259 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00260 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00261 if(main_input == 0) {
00262 throw Exception(errors::Configuration, "FailedInputSource")
00263 << "Configuration of primary input source has failed\n";
00264 }
00265
00266 std::string modtype;
00267 try {
00268 modtype = main_input->getParameter<std::string>("@module_type");
00269 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00270 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00271 ConfigurationDescriptions descriptions(filler->baseType());
00272 filler->fill(descriptions);
00273 descriptions.validate(*main_input, std::string("source"));
00274 }
00275 catch (cms::Exception& iException) {
00276 Exception toThrow(errors::Configuration, "Failed validating primary input source configuration.");
00277 toThrow << "\nSource plugin name is \"" << modtype << "\"\n";
00278 toThrow.append(iException);
00279 throw toThrow;
00280 }
00281
00282 main_input->registerIt();
00283
00284
00285
00286
00287
00288
00289
00290 ModuleDescription md(main_input->id(),
00291 main_input->getParameter<std::string>("@module_type"),
00292 "source",
00293 processConfiguration);
00294
00295 InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00296 areg->preSourceConstructionSignal_(md);
00297 boost::shared_ptr<InputSource> input;
00298 try {
00299 input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00300 areg->postSourceConstructionSignal_(md);
00301 }
00302 catch (Exception& iException) {
00303 areg->postSourceConstructionSignal_(md);
00304
00305 Exception toThrow(iException.categoryCode(), "Error occured while constructing primary input source.");
00306 toThrow << "\nSource is of type \"" << modtype << "\"\n";
00307 toThrow.append(iException);
00308 throw toThrow;
00309 }
00310 catch (cms::Exception& iException) {
00311 areg->postSourceConstructionSignal_(md);
00312 Exception toThrow(errors::Configuration, "Error occured while constructing primary input source.");
00313 toThrow << "\nSource is of type \"" << modtype << "\"\n";
00314 toThrow.append(iException);
00315 throw toThrow;
00316 }
00317
00318 return input;
00319 }
00320
00321
00322 boost::shared_ptr<EDLooperBase>
00323 fillLooper(eventsetup::EventSetupProvider& cp,
00324 ParameterSet& params,
00325 CommonParams const& common) {
00326 boost::shared_ptr<EDLooperBase> vLooper;
00327
00328 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00329
00330 if(loopers.size() == 0) {
00331 return vLooper;
00332 }
00333
00334 assert(1 == loopers.size());
00335
00336 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00337 itName != itNameEnd;
00338 ++itName) {
00339
00340 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00341 providerPSet->registerIt();
00342 vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00343 *providerPSet,
00344 common.processName_,
00345 common.releaseVersion_,
00346 common.passID_);
00347 }
00348 return vLooper;
00349
00350 }
00351
00352
00353 EventProcessor::EventProcessor(std::string const& config,
00354 ServiceToken const& iToken,
00355 serviceregistry::ServiceLegacy iLegacy,
00356 std::vector<std::string> const& defaultServices,
00357 std::vector<std::string> const& forcedServices) :
00358 preProcessEventSignal_(),
00359 postProcessEventSignal_(),
00360 actReg_(new ActivityRegistry),
00361 preg_(new SignallingProductRegistry),
00362 serviceToken_(),
00363 input_(),
00364 esp_(),
00365 act_table_(),
00366 processConfiguration_(),
00367 schedule_(),
00368 state_(sInit),
00369 event_loop_(),
00370 state_lock_(),
00371 stop_lock_(),
00372 stopper_(),
00373 stop_count_(-1),
00374 last_rc_(epSuccess),
00375 last_error_text_(),
00376 id_set_(false),
00377 event_loop_id_(),
00378 my_sig_num_(getSigNum()),
00379 fb_(),
00380 looper_(),
00381 shouldWeStop_(false),
00382 stateMachineWasInErrorState_(false),
00383 alreadyHandlingException_(false),
00384 forceLooperToEnd_(false),
00385 looperBeginJobRun_(false),
00386 forceESCacheClearOnNewRun_(false),
00387 numberOfForkedChildren_(0),
00388 numberOfSequentialEventsPerChild_(1),
00389 setCpuAffinity_(false) {
00390 boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
00391 processDesc->addServices(defaultServices, forcedServices);
00392 init(processDesc, iToken, iLegacy);
00393 }
00394
00395 EventProcessor::EventProcessor(std::string const& config,
00396 std::vector<std::string> const& defaultServices,
00397 std::vector<std::string> const& forcedServices) :
00398 preProcessEventSignal_(),
00399 postProcessEventSignal_(),
00400 actReg_(new ActivityRegistry),
00401 preg_(new SignallingProductRegistry),
00402 serviceToken_(),
00403 input_(),
00404 esp_(),
00405 act_table_(),
00406 processConfiguration_(),
00407 schedule_(),
00408 state_(sInit),
00409 event_loop_(),
00410 state_lock_(),
00411 stop_lock_(),
00412 stopper_(),
00413 stop_count_(-1),
00414 last_rc_(epSuccess),
00415 last_error_text_(),
00416 id_set_(false),
00417 event_loop_id_(),
00418 my_sig_num_(getSigNum()),
00419 fb_(),
00420 looper_(),
00421 shouldWeStop_(false),
00422 stateMachineWasInErrorState_(false),
00423 alreadyHandlingException_(false),
00424 forceLooperToEnd_(false),
00425 looperBeginJobRun_(false),
00426 forceESCacheClearOnNewRun_(false),
00427 numberOfForkedChildren_(0),
00428 numberOfSequentialEventsPerChild_(1),
00429 setCpuAffinity_(false) {
00430 boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
00431 processDesc->addServices(defaultServices, forcedServices);
00432 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00433 }
00434
00435 EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00436 ServiceToken const& token,
00437 serviceregistry::ServiceLegacy legacy) :
00438 preProcessEventSignal_(),
00439 postProcessEventSignal_(),
00440 actReg_(new ActivityRegistry),
00441 preg_(new SignallingProductRegistry),
00442 serviceToken_(),
00443 input_(),
00444 esp_(),
00445 act_table_(),
00446 processConfiguration_(),
00447 schedule_(),
00448 state_(sInit),
00449 event_loop_(),
00450 state_lock_(),
00451 stop_lock_(),
00452 stopper_(),
00453 stop_count_(-1),
00454 last_rc_(epSuccess),
00455 last_error_text_(),
00456 id_set_(false),
00457 event_loop_id_(),
00458 my_sig_num_(getSigNum()),
00459 fb_(),
00460 looper_(),
00461 shouldWeStop_(false),
00462 stateMachineWasInErrorState_(false),
00463 alreadyHandlingException_(false),
00464 forceLooperToEnd_(false),
00465 looperBeginJobRun_(false),
00466 forceESCacheClearOnNewRun_(false)
00467 {
00468 init(processDesc, token, legacy);
00469 }
00470
00471
00472 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00473 preProcessEventSignal_(),
00474 postProcessEventSignal_(),
00475 actReg_(new ActivityRegistry),
00476 preg_(new SignallingProductRegistry),
00477 serviceToken_(),
00478 input_(),
00479 esp_(),
00480 act_table_(),
00481 processConfiguration_(),
00482 schedule_(),
00483 state_(sInit),
00484 event_loop_(),
00485 state_lock_(),
00486 stop_lock_(),
00487 stopper_(),
00488 stop_count_(-1),
00489 last_rc_(epSuccess),
00490 last_error_text_(),
00491 id_set_(false),
00492 event_loop_id_(),
00493 my_sig_num_(getSigNum()),
00494 fb_(),
00495 looper_(),
00496 shouldWeStop_(false),
00497 stateMachineWasInErrorState_(false),
00498 alreadyHandlingException_(false),
00499 forceLooperToEnd_(false),
00500 looperBeginJobRun_(false),
00501 forceESCacheClearOnNewRun_(false)
00502 {
00503 if(isPython) {
00504 boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
00505 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00506 }
00507 else {
00508 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00509 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00510 }
00511 }
00512
00513 void
00514 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00515 ServiceToken const& iToken,
00516 serviceregistry::ServiceLegacy iLegacy) {
00517
00518
00519
00520 BranchIDListHelper::clearRegistries();
00521
00522 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00523
00524 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00525 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00526 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00527 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00528 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00529 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00530 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00531 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00532 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00533 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00534 itPS != itPSEnd;
00535 ++itPS) {
00536 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00537 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00538 itPS->getUntrackedParameter<std::string>("label", "")));
00539 }
00540
00541 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00542
00543
00544
00545 ServiceToken tempToken(ServiceRegistry::createSet(*pServiceSets, iToken, iLegacy));
00546
00547
00548 for(std::vector<ParameterSet>::const_iterator it = pServiceSets->begin(), itEnd = pServiceSets->end();
00549 it != itEnd;
00550 ++it) {
00551 if(it->exists("@save_config")) {
00552 parameterSet->addParameter(it->getParameter<std::string>("@service_type"), *it);
00553 }
00554 }
00555
00556
00557 tempToken.copySlotsTo(*actReg_);
00558
00559
00560 typedef serviceregistry::ServiceWrapper<ConstProductRegistry> w_CPR;
00561 boost::shared_ptr<w_CPR>
00562 reg(new w_CPR(std::auto_ptr<ConstProductRegistry>(new ConstProductRegistry(*preg_))));
00563 ServiceToken tempToken2(ServiceRegistry::createContaining(reg,
00564 tempToken,
00565 serviceregistry::kOverlapIsError));
00566
00567
00568
00569 std::string processName = parameterSet->getParameter<std::string>("@process_name");
00570
00571 typedef service::TriggerNamesService TNS;
00572 typedef serviceregistry::ServiceWrapper<TNS> w_TNS;
00573
00574 boost::shared_ptr<w_TNS> tnsptr
00575 (new w_TNS(std::auto_ptr<TNS>(new TNS(*parameterSet))));
00576
00577 serviceToken_ = ServiceRegistry::createContaining(tnsptr,
00578 tempToken2,
00579 serviceregistry::kOverlapIsError);
00580
00581
00582 ServiceRegistry::Operate operate(serviceToken_);
00583
00584 act_table_.reset(new ActionTable(*parameterSet));
00585 CommonParams common = CommonParams(processName,
00586 getReleaseVersion(),
00587 getPassID(),
00588 parameterSet->getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1),
00589 parameterSet->getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1));
00590
00591 std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00592 esp_ = espController->makeProvider(*parameterSet, common);
00593 processConfiguration_.reset(new ProcessConfiguration(processName, getReleaseVersion(), getPassID()));
00594
00595 looper_ = fillLooper(*esp_, *parameterSet, common);
00596 if(looper_) {
00597 looper_->setActionTable(act_table_.get());
00598 looper_->attachTo(*actReg_);
00599 }
00600
00601 input_ = makeInput(*parameterSet, common, *preg_, principalCache_, actReg_, processConfiguration_);
00602
00603 schedule_ = std::auto_ptr<Schedule>
00604 (new Schedule(*parameterSet,
00605 ServiceRegistry::instance().get<TNS>(),
00606 *preg_,
00607 *act_table_,
00608 actReg_,
00609 processConfiguration_));
00610
00611
00612 FDEBUG(2) << parameterSet << std::endl;
00613 connectSigs(this);
00614 }
00615
00616 EventProcessor::~EventProcessor() {
00617
00618 ServiceToken token = getToken();
00619 ServiceRegistry::Operate op(token);
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630 terminateMachine();
00631
00632 try {
00633 changeState(mDtor);
00634 }
00635 catch(cms::Exception& e) {
00636 LogError("System")
00637 << e.explainSelf() << "\n";
00638 }
00639
00640
00641 esp_.reset();
00642 schedule_.reset();
00643 input_.reset();
00644 looper_.reset();
00645 actReg_.reset();
00646
00647 pset::Registry* psetRegistry = pset::Registry::instance();
00648 psetRegistry->data().clear();
00649 psetRegistry->extra().setID(ParameterSetID());
00650
00651 EntryDescriptionRegistry::instance()->data().clear();
00652 ParentageRegistry::instance()->data().clear();
00653 ProcessConfigurationRegistry::instance()->data().clear();
00654 ProcessHistoryRegistry::instance()->data().clear();
00655 BranchIDListHelper::clearRegistries();
00656 }
00657
00658 void
00659 EventProcessor::rewind() {
00660 beginJob();
00661 changeState(mStopAsync);
00662 changeState(mInputRewind);
00663 {
00664 StateSentry toerror(this);
00665
00666
00667 ServiceRegistry::Operate operate(serviceToken_);
00668
00669 {
00670 input_->repeat();
00671 input_->rewind();
00672 }
00673 changeState(mCountComplete);
00674 toerror.succeeded();
00675 }
00676 changeState(mFinished);
00677 }
00678
00679 EventProcessor::StatusCode
00680 EventProcessor::run(int numberEventsToProcess, bool) {
00681 return runEventCount(numberEventsToProcess);
00682 }
00683
00684 EventProcessor::StatusCode
00685 EventProcessor::skip(int numberToSkip) {
00686 beginJob();
00687 changeState(mSkip);
00688 {
00689 StateSentry toerror(this);
00690
00691
00692 ServiceRegistry::Operate operate(serviceToken_);
00693
00694 {
00695 input_->skipEvents(numberToSkip);
00696 }
00697 changeState(mCountComplete);
00698 toerror.succeeded();
00699 }
00700 changeState(mFinished);
00701 return epSuccess;
00702 }
00703
00704 void
00705 EventProcessor::beginJob() {
00706 if(state_ != sInit) return;
00707 bk::beginJob();
00708
00709 changeState(mBeginJob);
00710
00711
00712
00713 ServiceRegistry::Operate operate(serviceToken_);
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729 try {
00730 input_->doBeginJob();
00731 } catch(cms::Exception& e) {
00732 LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00733 e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00734 throw;
00735 } catch(std::exception& e) {
00736 LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
00737 throw;
00738 } catch(...) {
00739 LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
00740 throw;
00741 }
00742
00743 schedule_->beginJob();
00744 actReg_->postBeginJobSignal_();
00745
00746 }
00747
00748 void
00749 EventProcessor::endJob() {
00750
00751 ExceptionCollector c;
00752
00753
00754 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00755
00756
00757 ServiceRegistry::Operate operate(serviceToken_);
00758
00759 c.call(boost::bind(&EventProcessor::terminateMachine, this));
00760 c.call(boost::bind(&Schedule::endJob, schedule_.get()));
00761 c.call(boost::bind(&InputSource::doEndJob, input_));
00762 if(looper_) {
00763 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00764 }
00765 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00766 if(c.hasThrown()) {
00767 c.rethrow();
00768 }
00769 }
00770
00771 ServiceToken
00772 EventProcessor::getToken() {
00773 return serviceToken_;
00774 }
00775
00776
namespace {
  // Bookkeeping written by the SIGCHLD handler below and read by
  // EventProcessor::forkProcess().
  volatile bool child_failed = false;             // some child exited non-zero or was killed by a signal
  volatile unsigned int num_children_done = 0;    // children reaped so far
  volatile int child_fail_exit_status = 0;        // status of the most recently seen failure

  extern "C" {
    // SIGCHLD handler: reaps every child that has terminated (non-blocking)
    // and records how it ended.
    //
    // A child that exits with a non-zero code records that code. A child
    // killed by a signal records 128 + the signal number (the convention
    // shells use), so the reported status is never left at 0 for a failure.
    void ep_sigchld(int, siginfo_t*, void*) {
      // waitpid() sets errno (ECHILD once no children are left); a signal
      // handler must not leak that into the code it interrupted.
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0 < p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_failed = true;
            child_fail_exit_status = WEXITSTATUS(stat_loc);
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_failed = true;
          child_fail_exit_status = 128 + WTERMSIG(stat_loc);
        }
        // Several children may have ended before this handler ran once.
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
00807
// Child actions, numbered consecutively from zero; kMaxChildAction is the
// number of actions.
enum {
  kChildSucceed = 0,
  kChildExitBadly = 1,
  kChildSegv = 2,
  kMaxChildAction = 3
};
00814
00815 namespace {
// Returns the number of decimal digits in numberOfChildren, or 3 when
// numberOfChildren is zero.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3;
  }
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return digits;
}
00827
00828 class MessageSenderToSource {
00829 public:
00830 MessageSenderToSource(int iQueueID, long iNEventsToProcess):
00831 m_queueID(iQueueID),
00832 m_nEventsToProcess(iNEventsToProcess) {}
00833 void operator()() {
00834 std::cout << "I am controller" << std::endl;
00835
00836
00837 multicore::MessageForSource sndmsg;
00838 sndmsg.startIndex = 0;
00839 sndmsg.nIndices = m_nEventsToProcess;
00840 if(m_nEventsToProcess > 0 && m_nEventsToProcess < m_nEventsToProcess) {
00841 sndmsg.nIndices = m_nEventsToProcess;
00842 }
00843 int value = 0;
00844 do {
00845 value = msgsnd(m_queueID, &sndmsg, multicore::MessageForSource::sizeForBuffer(), 0);
00846
00847 sndmsg.startIndex += sndmsg.nIndices;
00848 } while(value == 0);
00849 if(errno != EIDRM and errno != EINVAL) {
00850
00851 perror("message queue send failed");
00852 }
00853 return;
00854 }
00855
00856 private:
00857 int const m_queueID;
00858 long const m_nEventsToProcess;
00859 };
00860 }
00861
00862 bool
00863 EventProcessor::forkProcess(std::string const& jobReportFile) {
00864
00865 if(0 == numberOfForkedChildren_) {return true;}
00866 assert(0<numberOfForkedChildren_);
00867
00868 {
00869 beginJob();
00870
00871 ServiceRegistry::Operate operate(serviceToken_);
00872
00873 InputSource::ItemType itemType;
00874 itemType = input_->nextItemType();
00875
00876 assert(itemType == InputSource::IsFile);
00877 {
00878 readFile();
00879 }
00880 itemType = input_->nextItemType();
00881 assert(itemType == InputSource::IsRun);
00882
00883 std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
00884 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
00885 input_->runAuxiliary()->beginTime());
00886 EventSetup const& es = esp_->eventSetupForInstance(ts);
00887
00888
00889 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
00890 es.fillAvailableRecordKeys(recordKeys);
00891 std::vector<eventsetup::DataKey> dataKeys;
00892 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
00893 itKey != itEnd;
00894 ++itKey) {
00895 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
00896
00897 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
00898 ExcludedData const* excludedData(0);
00899 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
00900 excludedData = &(itExcludeRec->second);
00901 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
00902
00903 continue;
00904 }
00905 }
00906 if(0 != recordPtr) {
00907 dataKeys.clear();
00908 recordPtr->fillRegisteredDataKeys(dataKeys);
00909 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
00910 itDataKey != itDataKeyEnd;
00911 ++itDataKey) {
00912
00913 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
00914 std::cout << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
00915 continue;
00916 }
00917 try {
00918 recordPtr->doGet(*itDataKey);
00919 } catch(cms::Exception& e) {
00920 LogWarning("EventSetupPreFetching") << e.what();
00921 }
00922 }
00923 }
00924 }
00925 }
00926 std::cout << " done prefetching" << std::endl;
00927 {
00928
00929 ServiceRegistry::Operate operate(serviceToken_);
00930 Service<JobReport> jobReport;
00931 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
00932
00933
00934 actReg_->preForkReleaseResourcesSignal_();
00935 input_->doPreForkReleaseResources();
00936 schedule_->preForkReleaseResources();
00937 }
00938 installCustomHandler(SIGCHLD, ep_sigchld);
00939
00940
00941 int queueID;
00942 if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
00943 printf("Error obtaining message queue\n");
00944 exit(EXIT_FAILURE);
00945 }
00946
00947 unsigned int childIndex = 0;
00948 unsigned int const kMaxChildren = numberOfForkedChildren_;
00949 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
00950 std::vector<pid_t> childrenIds;
00951 childrenIds.reserve(kMaxChildren);
00952 {
00953
00954 ServiceRegistry::Operate operate(serviceToken_);
00955 Service<JobReport> jobReport;
00956 for(; childIndex < kMaxChildren; ++childIndex) {
00957 pid_t value = fork();
00958 if(value == 0) {
00959
00960 fflush(stdout);
00961 fflush(stderr);
00962 std::stringstream stout;
00963 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
00964 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
00965 std::cerr << "Error during freopen of child process "
00966 << childIndex << std::endl;
00967 }
00968 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
00969 std::cerr << "Error during dup2 of child process"
00970 << childIndex << std::endl;
00971 }
00972
00973 std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
00974 if(setCpuAffinity_) {
00975
00976
00977
00978
00979
00980
00981 #ifdef __APPLE__
00982 std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
00983 #else
00984 std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
00985 cpu_set_t mask;
00986 CPU_ZERO(&mask);
00987 CPU_SET(childIndex, &mask);
00988 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
00989 std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
00990 exit(-1);
00991 }
00992 #endif
00993 }
00994 break;
00995 }
00996 if(value < 0) {
00997 std::cerr << "failed to create a child" << std::endl;
00998
00999
01000 msgctl(queueID, IPC_RMID, 0);
01001 exit(-1);
01002 }
01003 childrenIds.push_back(value);
01004 }
01005
01006 if(childIndex < kMaxChildren) {
01007 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01008 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01009
01010 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
01011 input_->doPostForkReacquireResources(receiver);
01012 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01013
01014
01015 return true;
01016 }
01017 jobReport->parentAfterFork(jobReportFile);
01018 }
01019
01020
01021
01022
01023
01024
01025
01026
01027 sigset_t blockingSigSet;
01028 sigset_t unblockingSigSet;
01029 sigset_t oldSigSet;
01030 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01031 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01032 sigaddset(&blockingSigSet, SIGCHLD);
01033 sigaddset(&blockingSigSet, SIGUSR2);
01034 sigaddset(&blockingSigSet, SIGINT);
01035 sigdelset(&unblockingSigSet, SIGCHLD);
01036 sigdelset(&unblockingSigSet, SIGUSR2);
01037 sigdelset(&unblockingSigSet, SIGINT);
01038 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01039
01040
01041
01042
01043 MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
01044 boost::thread senderThread(sender);
01045
01046 while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01047 sigsuspend(&unblockingSigSet);
01048 std::cout << "woke from sigwait" << std::endl;
01049 }
01050 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01051
01052 std::cout << "num children who have already stopped " << num_children_done << std::endl;
01053 if(child_failed) {
01054 std::cout << "child failed" << std::endl;
01055 }
01056 if(shutdown_flag) {
01057 std::cout << "asked to shutdown" << std::endl;
01058 }
01059
01060 if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01061 std::cout << "must stop children" << std::endl;
01062 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01063 it != itEnd; ++it) {
01064 kill(*it, SIGUSR2);
01065 }
01066 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01067 while(num_children_done != kMaxChildren) {
01068 sigsuspend(&unblockingSigSet);
01069 }
01070 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01071 }
01072
01073
01074 msgctl(queueID, IPC_RMID, 0);
01075
01076
01077 senderThread.join();
01078 if(child_failed) {
01079 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01080 }
01081 return false;
01082 }
01083
01084 void
01085 EventProcessor::connectSigs(EventProcessor* ep) {
01086
01087
01088
01089 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01090 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01091 }
01092
01093 std::vector<ModuleDescription const*>
01094 EventProcessor::getAllModuleDescriptions() const {
01095 return schedule_->getAllModuleDescriptions();
01096 }
01097
01098 int
01099 EventProcessor::totalEvents() const {
01100 return schedule_->totalEvents();
01101 }
01102
01103 int
01104 EventProcessor::totalEventsPassed() const {
01105 return schedule_->totalEventsPassed();
01106 }
01107
01108 int
01109 EventProcessor::totalEventsFailed() const {
01110 return schedule_->totalEventsFailed();
01111 }
01112
01113 void
01114 EventProcessor::enableEndPaths(bool active) {
01115 schedule_->enableEndPaths(active);
01116 }
01117
01118 bool
01119 EventProcessor::endPathsEnabled() const {
01120 return schedule_->endPathsEnabled();
01121 }
01122
01123 void
01124 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01125 schedule_->getTriggerReport(rep);
01126 }
01127
01128 void
01129 EventProcessor::clearCounters() {
01130 schedule_->clearCounters();
01131 }
01132
01133 char const* EventProcessor::currentStateName() const {
01134 return stateName(getState());
01135 }
01136
01137 char const* EventProcessor::stateName(State s) const {
01138 return stateNames[s];
01139 }
01140
01141 char const* EventProcessor::msgName(Msg m) const {
01142 return msgNames[m];
01143 }
01144
01145 State EventProcessor::getState() const {
01146 return state_;
01147 }
01148
01149 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01150
01151
01152 return last_rc_;
01153 }
01154
01155 void
01156 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01157 if(runNumber == 0) {
01158 runNumber = 1;
01159 LogWarning("Invalid Run")
01160 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01161 << "Run number was set to 1 instead\n";
01162 }
01163
01164
01165 beginJob();
01166 changeState(mSetRun);
01167
01168
01169 input_->setRunNumber(runNumber);
01170 }
01171
01172 void
01173 EventProcessor::declareRunNumber(RunNumber_t runNumber) {
01174
01175 beginJob();
01176 changeState(mSetRun);
01177
01178
01179
01180 }
01181
01182 EventProcessor::StatusCode
01183 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01184 bool rc = true;
01185 boost::xtime timeout;
01186 boost::xtime_get(&timeout, boost::TIME_UTC);
01187 timeout.sec += timeout_seconds;
01188
01189
01190
01191
01192 {
01193 boost::mutex::scoped_lock sl(stop_lock_);
01194
01195
01196 if(stop_count_ < 0) return last_rc_;
01197
01198 if(timeout_seconds == 0) {
01199 while(stop_count_ == 0) stopper_.wait(sl);
01200 } else {
01201 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01202 }
01203
01204 if(rc == false) {
01205
01206
01207
01208
01209
01210
01211 if(id_set_) pthread_cancel(event_loop_id_);
01212
01213
01214 LogWarning("timeout")
01215 << "An asynchronous request was made to shut down "
01216 << "the event loop "
01217 << "and the event loop did not shutdown after "
01218 << timeout_seconds << " seconds\n";
01219 } else {
01220 event_loop_->join();
01221 event_loop_.reset();
01222 id_set_ = false;
01223 stop_count_ = -1;
01224 }
01225 }
01226 return rc == false ? epTimedOut : last_rc_;
01227 }
01228
01229 EventProcessor::StatusCode
01230 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01231 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01232 if(rc != epTimedOut) changeState(mCountComplete);
01233 else errorState();
01234 return rc;
01235 }
01236
01237
01238 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01239 changeState(mStopAsync);
01240 StatusCode rc = waitForAsyncCompletion(secs);
01241 if(rc != epTimedOut) changeState(mFinished);
01242 else errorState();
01243 return rc;
01244 }
01245
01246 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01247 changeState(mShutdownAsync);
01248 StatusCode rc = waitForAsyncCompletion(secs);
01249 if(rc != epTimedOut) changeState(mFinished);
01250 else errorState();
01251 return rc;
01252 }
01253
01254 void EventProcessor::errorState() {
01255 state_ = sError;
01256 }
01257
01258
01259 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01260
01261
01262
01263 changeState(m);
01264 return waitForAsyncCompletion(60*2);
01265 }
01266
01267 void EventProcessor::changeState(Msg msg) {
01268
01269
01270 boost::mutex::scoped_lock sl(state_lock_);
01271 State curr = state_;
01272 int rc;
01273
01274
01275 for(rc = 0;
01276 table[rc].current != sInvalid &&
01277 (curr != table[rc].current ||
01278 (curr == table[rc].current &&
01279 msg != table[rc].message && table[rc].message != mAny));
01280 ++rc);
01281
01282 if(table[rc].current == sInvalid)
01283 throw cms::Exception("BadState")
01284 << "A member function of EventProcessor has been called in an"
01285 << " inappropriate order.\n"
01286 << "Bad transition from " << stateName(curr) << " "
01287 << "using message " << msgName(msg) << "\n"
01288 << "No where to go from here.\n";
01289
01290 FDEBUG(1) << "changeState: current=" << stateName(curr)
01291 << ", message=" << msgName(msg)
01292 << " -> new=" << stateName(table[rc].final) << "\n";
01293
01294 state_ = table[rc].final;
01295 }
01296
01297 void EventProcessor::runAsync() {
01298 beginJob();
01299 {
01300 boost::mutex::scoped_lock sl(stop_lock_);
01301 if(id_set_ == true) {
01302 std::string err("runAsync called while async event loop already running\n");
01303 LogError("FwkJob") << err;
01304 throw cms::Exception("BadState") << err;
01305 }
01306
01307 changeState(mRunAsync);
01308
01309 stop_count_ = 0;
01310 last_rc_ = epSuccess;
01311 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01312 boost::xtime timeout;
01313 boost::xtime_get(&timeout, boost::TIME_UTC);
01314 timeout.sec += 60;
01315 if(starter_.timed_wait(sl, timeout) == false) {
01316
01317 throw cms::Exception("BadState")
01318 << "Async run thread did not start in 60 seconds\n";
01319 }
01320 }
01321 }
01322
01323 void EventProcessor::asyncRun(EventProcessor* me) {
01324
01325
01326
01327
01328
01329
01330
01331
01332
01333 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01334
01335 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01336 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01337
01338 {
01339 boost::mutex::scoped_lock(me->stop_lock_);
01340 me->event_loop_id_ = pthread_self();
01341 me->id_set_ = true;
01342 me->starter_.notify_all();
01343 }
01344
01345 Status rc = epException;
01346 FDEBUG(2) << "asyncRun starting ......................\n";
01347
01348 try {
01349 bool onlineStateTransitions = true;
01350 rc = me->runToCompletion(onlineStateTransitions);
01351 }
01352 catch (cms::Exception& e) {
01353 LogError("FwkJob") << "cms::Exception caught in "
01354 << "EventProcessor::asyncRun"
01355 << "\n"
01356 << e.explainSelf();
01357 me->last_error_text_ = e.explainSelf();
01358 }
01359 catch (std::exception& e) {
01360 LogError("FwkJob") << "Standard library exception caught in "
01361 << "EventProcessor::asyncRun"
01362 << "\n"
01363 << e.what();
01364 me->last_error_text_ = e.what();
01365 }
01366 catch (...) {
01367 LogError("FwkJob") << "Unknown exception caught in "
01368 << "EventProcessor::asyncRun"
01369 << "\n";
01370 me->last_error_text_ = "Unknown exception caught";
01371 rc = epOther;
01372 }
01373
01374 me->last_rc_ = rc;
01375
01376 {
01377
01378 boost::mutex::scoped_lock sl(me->stop_lock_);
01379 ++me->stop_count_;
01380 me->stopper_.notify_all();
01381 }
01382 FDEBUG(2) << "asyncRun ending ......................\n";
01383 }
01384
01385
01386 EventProcessor::StatusCode
01387 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01388
01389 StateSentry toerror(this);
01390
01391 int numberOfEventsToProcess = -1;
01392 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01393
01394 if(machine_.get() != 0) {
01395 throw Exception(errors::LogicError)
01396 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01397 << "Please report this error to the Framework group\n";
01398 }
01399
01400 toerror.succeeded();
01401
01402 return returnCode;
01403 }
01404
01405 EventProcessor::StatusCode
01406 EventProcessor::runEventCount(int numberOfEventsToProcess) {
01407
01408 StateSentry toerror(this);
01409
01410 bool onlineStateTransitions = false;
01411 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01412
01413 toerror.succeeded();
01414
01415 return returnCode;
01416 }
01417
01418 EventProcessor::StatusCode
01419 EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01420
01421
01422 boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
01423 principalCache_.insert(ep);
01424
01425 beginJob();
01426
01427 if(!onlineStateTransitions) changeState(mRunCount);
01428
01429 StatusCode returnCode = epSuccess;
01430 stateMachineWasInErrorState_ = false;
01431
01432
01433 ServiceRegistry::Operate operate(serviceToken_);
01434
01435 if(machine_.get() == 0) {
01436
01437 statemachine::FileMode fileMode;
01438 if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01439 else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01440 else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01441 else {
01442 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01443 << fileMode_ << ".\n"
01444 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01445 }
01446
01447 statemachine::EmptyRunLumiMode emptyRunLumiMode;
01448 if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01449 else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01450 else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01451 else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01452 else {
01453 throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01454 << emptyRunLumiMode_ << ".\n"
01455 << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01456 }
01457
01458 machine_.reset(new statemachine::Machine(this,
01459 fileMode,
01460 emptyRunLumiMode));
01461
01462 machine_->initiate();
01463 }
01464
01465 try {
01466
01467 InputSource::ItemType itemType;
01468
01469 int iEvents = 0;
01470
01471 while(true) {
01472
01473 itemType = input_->nextItemType();
01474
01475 FDEBUG(1) << "itemType = " << itemType << "\n";
01476
01477
01478
01479
01480
01481
01482
01483
01484 if(state_ == sStopping) {
01485 FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01486 forceLooperToEnd_ = true;
01487 machine_->process_event(statemachine::Stop());
01488 forceLooperToEnd_ = false;
01489 break;
01490 }
01491 else if(state_ == sShuttingDown) {
01492 FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01493 forceLooperToEnd_ = true;
01494 machine_->process_event(statemachine::Stop());
01495 forceLooperToEnd_ = false;
01496 break;
01497 }
01498
01499
01500 {
01501 boost::mutex::scoped_lock sl(usr2_lock);
01502 if(shutdown_flag) {
01503 changeState(mShutdownSignal);
01504 returnCode = epSignal;
01505 forceLooperToEnd_ = true;
01506 machine_->process_event(statemachine::Stop());
01507 forceLooperToEnd_ = false;
01508 break;
01509 }
01510 }
01511
01512 if(itemType == InputSource::IsStop) {
01513 machine_->process_event(statemachine::Stop());
01514 }
01515 else if(itemType == InputSource::IsFile) {
01516 machine_->process_event(statemachine::File());
01517 }
01518 else if(itemType == InputSource::IsRun) {
01519 machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
01520 }
01521 else if(itemType == InputSource::IsLumi) {
01522 machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01523 }
01524 else if(itemType == InputSource::IsEvent) {
01525 machine_->process_event(statemachine::Event());
01526 ++iEvents;
01527 if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01528 returnCode = epCountComplete;
01529 changeState(mInputExhausted);
01530 FDEBUG(1) << "Event count complete, pausing event loop\n";
01531 break;
01532 }
01533 }
01534
01535 else {
01536 throw Exception(errors::LogicError)
01537 << "Unknown next item type passed to EventProcessor\n"
01538 << "Please report this error to the Framework group\n";
01539 }
01540
01541 if(machine_->terminated()) {
01542 changeState(mInputExhausted);
01543 break;
01544 }
01545 }
01546 }
01547
01548
01549
01550
01551
01552
01553
01554
01555
01556
01557
01558
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589
01590
01591
01592
01593 catch (cms::Exception& e) {
01594 alreadyHandlingException_ = true;
01595 terminateMachine();
01596 alreadyHandlingException_ = false;
01597 e << "cms::Exception caught in EventProcessor and rethrown\n";
01598 e << exceptionMessageLumis_;
01599 e << exceptionMessageRuns_;
01600 e << exceptionMessageFiles_;
01601 throw;
01602 }
01603 catch (std::bad_alloc& e) {
01604 alreadyHandlingException_ = true;
01605 terminateMachine();
01606 alreadyHandlingException_ = false;
01607 throw cms::Exception("std::bad_alloc")
01608 << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
01609 << "The job has probably exhausted the virtual memory available to the process.\n"
01610 << exceptionMessageLumis_
01611 << exceptionMessageRuns_
01612 << exceptionMessageFiles_;
01613 }
01614 catch (std::exception& e) {
01615 alreadyHandlingException_ = true;
01616 terminateMachine();
01617 alreadyHandlingException_ = false;
01618 throw cms::Exception("StdException")
01619 << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
01620 << "Previous information:\n" << e.what() << "\n"
01621 << exceptionMessageLumis_
01622 << exceptionMessageRuns_
01623 << exceptionMessageFiles_;
01624 }
01625 catch (...) {
01626 alreadyHandlingException_ = true;
01627 terminateMachine();
01628 alreadyHandlingException_ = false;
01629 throw cms::Exception("Unknown")
01630 << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
01631 << exceptionMessageLumis_
01632 << exceptionMessageRuns_
01633 << exceptionMessageFiles_;
01634 }
01635
01636 if(machine_->terminated()) {
01637 FDEBUG(1) << "The state machine reports it has been terminated\n";
01638 machine_.reset();
01639 }
01640
01641 if(!onlineStateTransitions) changeState(mFinished);
01642
01643 if(stateMachineWasInErrorState_) {
01644 throw cms::Exception("BadState")
01645 << "The boost state machine in the EventProcessor exited after\n"
01646 << "entering the Error state.\n";
01647 }
01648
01649 return returnCode;
01650 }
01651
01652 void EventProcessor::readFile() {
01653 FDEBUG(1) << " \treadFile\n";
01654 fb_ = input_->readFile();
01655 if(numberOfForkedChildren_ > 0) {
01656 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01657 }
01658 }
01659
01660 void EventProcessor::closeInputFile() {
01661 input_->closeFile(fb_);
01662 FDEBUG(1) << "\tcloseInputFile\n";
01663 }
01664
01665 void EventProcessor::openOutputFiles() {
01666 schedule_->openOutputFiles(*fb_);
01667 FDEBUG(1) << "\topenOutputFiles\n";
01668 }
01669
01670 void EventProcessor::closeOutputFiles() {
01671 schedule_->closeOutputFiles();
01672 FDEBUG(1) << "\tcloseOutputFiles\n";
01673 }
01674
01675 void EventProcessor::respondToOpenInputFile() {
01676 schedule_->respondToOpenInputFile(*fb_);
01677 FDEBUG(1) << "\trespondToOpenInputFile\n";
01678 }
01679
01680 void EventProcessor::respondToCloseInputFile() {
01681 schedule_->respondToCloseInputFile(*fb_);
01682 FDEBUG(1) << "\trespondToCloseInputFile\n";
01683 }
01684
01685 void EventProcessor::respondToOpenOutputFiles() {
01686 schedule_->respondToOpenOutputFiles(*fb_);
01687 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01688 }
01689
01690 void EventProcessor::respondToCloseOutputFiles() {
01691 schedule_->respondToCloseOutputFiles(*fb_);
01692 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01693 }
01694
01695 void EventProcessor::startingNewLoop() {
01696 shouldWeStop_ = false;
01697
01698
01699 if(looper_ && looperBeginJobRun_) {
01700 looper_->doStartingNewLoop();
01701 }
01702 FDEBUG(1) << "\tstartingNewLoop\n";
01703 }
01704
01705 bool EventProcessor::endOfLoop() {
01706 if(looper_) {
01707 ModuleChanger changer(schedule_.get());
01708 looper_->setModuleChanger(&changer);
01709 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01710 looper_->setModuleChanger(0);
01711 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01712 else return false;
01713 }
01714 FDEBUG(1) << "\tendOfLoop\n";
01715 return true;
01716 }
01717
01718 void EventProcessor::rewindInput() {
01719 input_->repeat();
01720 input_->rewind();
01721 FDEBUG(1) << "\trewind\n";
01722 }
01723
01724 void EventProcessor::prepareForNextLoop() {
01725 looper_->prepareForNextLoop(esp_.get());
01726 FDEBUG(1) << "\tprepareForNextLoop\n";
01727 }
01728
01729 bool EventProcessor::shouldWeCloseOutput() const {
01730 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01731 return schedule_->shouldWeCloseOutput();
01732 }
01733
01734 void EventProcessor::doErrorStuff() {
01735 FDEBUG(1) << "\tdoErrorStuff\n";
01736 LogError("StateMachine")
01737 << "The EventProcessor state machine encountered an unexpected event\n"
01738 << "and went to the error state\n"
01739 << "Will attempt to terminate processing normally\n"
01740 << "(IF using the looper the next loop will be attempted)\n"
01741 << "This likely indicates a bug in an input module or corrupted input or both\n";
01742 stateMachineWasInErrorState_ = true;
01743 }
01744
01745 void EventProcessor::beginRun(statemachine::Run const& run) {
01746 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01747 input_->doBeginRun(runPrincipal);
01748 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01749 runPrincipal.beginTime());
01750 if(forceESCacheClearOnNewRun_){
01751 esp_->forceCacheClear();
01752 }
01753 EventSetup const& es = esp_->eventSetupForInstance(ts);
01754 if(looper_ && looperBeginJobRun_== false) {
01755 looper_->copyInfo(ScheduleInfo(schedule_.get()));
01756 looper_->beginOfJob(es);
01757 looperBeginJobRun_ = true;
01758 looper_->doStartingNewLoop();
01759 }
01760 {
01761 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01762 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01763 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01764 }
01765 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
01766 if(looper_) {
01767 looper_->doBeginRun(runPrincipal, es);
01768 }
01769 }
01770
01771 void EventProcessor::endRun(statemachine::Run const& run) {
01772 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01773 input_->doEndRun(runPrincipal);
01774 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01775 runPrincipal.endTime());
01776 EventSetup const& es = esp_->eventSetupForInstance(ts);
01777 {
01778 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
01779 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01780 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01781 }
01782 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
01783 if(looper_) {
01784 looper_->doEndRun(runPrincipal, es);
01785 }
01786 }
01787
01788 void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
01789 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01790 input_->doBeginLumi(lumiPrincipal);
01791
01792 Service<RandomNumberGenerator> rng;
01793 if(rng.isAvailable()) {
01794 LuminosityBlock lb(lumiPrincipal, ModuleDescription());
01795 rng->preBeginLumi(lb);
01796 }
01797
01798
01799
01800 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
01801 EventSetup const& es = esp_->eventSetupForInstance(ts);
01802 {
01803 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
01804 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01805 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01806 }
01807 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
01808 if(looper_) {
01809 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
01810 }
01811 }
01812
01813 void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
01814 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01815 input_->doEndLumi(lumiPrincipal);
01816
01817
01818 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
01819 lumiPrincipal.endTime());
01820 EventSetup const& es = esp_->eventSetupForInstance(ts);
01821 {
01822 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
01823 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01824 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01825 }
01826 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01827 if(looper_) {
01828 looper_->doEndLuminosityBlock(lumiPrincipal, es);
01829 }
01830 }
01831
01832 statemachine::Run EventProcessor::readAndCacheRun() {
01833
01834 input_->readAndCacheRun();
01835 input_->markRun();
01836 return statemachine::Run(input_->processHistoryID(), input_->run());
01837 }
01838
01839 int EventProcessor::readAndCacheLumi() {
01840 input_->readAndCacheLumi();
01841 input_->markLumi();
01842 return input_->luminosityBlock();
01843 }
01844
01845 void EventProcessor::writeRun(statemachine::Run const& run) {
01846 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
01847 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
01848 }
01849
01850 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
01851 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
01852 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
01853 }
01854
01855 void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
01856 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
01857 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01858 }
01859
01860 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
01861 principalCache_.deleteLumi(phid, run, lumi);
01862 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01863 }
01864
01865 void EventProcessor::readAndProcessEvent() {
01866 EventPrincipal *pep = 0;
01867 try {
01868 pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
01869 FDEBUG(1) << "\treadEvent\n";
01870 }
01871 catch(cms::Exception& e) {
01872 actions::ActionCodes action = act_table_->find(e.rootCause());
01873 if(action == actions::Rethrow) {
01874 throw;
01875 } else {
01876 LogWarning(e.category())
01877 << "an exception occurred and all paths for the event are being skipped: \n"
01878 << e.what();
01879 return;
01880 }
01881 }
01882 assert(pep != 0);
01883
01884 IOVSyncValue ts(pep->id(), pep->time());
01885 EventSetup const& es = esp_->eventSetupForInstance(ts);
01886 {
01887 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
01888 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
01889 schedule_->processOneOccurrence<Traits>(*pep, es);
01890 }
01891
01892 if(looper_) {
01893 bool randomAccess = input_->randomAccess();
01894 ProcessingController::ForwardState forwardState = input_->forwardState();
01895 ProcessingController::ReverseState reverseState = input_->reverseState();
01896 ProcessingController pc(forwardState, reverseState, randomAccess);
01897
01898 EDLooperBase::Status status = EDLooperBase::kContinue;
01899 do {
01900 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
01901
01902 bool succeeded = true;
01903 if(randomAccess) {
01904 if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
01905 input_->skipEvents(-2);
01906 }
01907 else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
01908 succeeded = input_->goToEvent(pc.specifiedEventTransition());
01909 }
01910 }
01911 pc.setLastOperationSucceeded(succeeded);
01912 } while(!pc.lastOperationSucceeded());
01913 if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
01914
01915 }
01916
01917 FDEBUG(1) << "\tprocessEvent\n";
01918 pep->clearEventPrincipal();
01919 }
01920
01921 bool EventProcessor::shouldWeStop() const {
01922 FDEBUG(1) << "\tshouldWeStop\n";
01923 if(shouldWeStop_) return true;
01924 return schedule_->terminate();
01925 }
01926
01927 void EventProcessor::setExceptionMessageFiles(std::string& message) {
01928 exceptionMessageFiles_ = message;
01929 }
01930
01931 void EventProcessor::setExceptionMessageRuns(std::string& message) {
01932 exceptionMessageRuns_ = message;
01933 }
01934
01935 void EventProcessor::setExceptionMessageLumis(std::string& message) {
01936 exceptionMessageLumis_ = message;
01937 }
01938
01939 bool EventProcessor::alreadyHandlingException() const {
01940 return alreadyHandlingException_;
01941 }
01942
01943 void EventProcessor::terminateMachine() {
01944 if(machine_.get() != 0) {
01945 if(!machine_->terminated()) {
01946 forceLooperToEnd_ = true;
01947 machine_->process_event(statemachine::Stop());
01948 forceLooperToEnd_ = false;
01949 }
01950 else {
01951 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
01952 }
01953 if(machine_->terminated()) {
01954 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
01955 }
01956 machine_.reset();
01957 }
01958 }
01959 }