00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00010 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00011 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00012
00013 #include "FWCore/Framework/interface/CommonParams.h"
00014 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00015 #include "FWCore/Framework/interface/EDLooperBase.h"
00016 #include "FWCore/Framework/interface/EventPrincipal.h"
00017 #include "FWCore/Framework/interface/EventSetupProvider.h"
00018 #include "FWCore/Framework/interface/EventSetupRecord.h"
00019 #include "FWCore/Framework/interface/FileBlock.h"
00020 #include "FWCore/Framework/interface/InputSourceDescription.h"
00021 #include "FWCore/Framework/interface/IOVSyncValue.h"
00022 #include "FWCore/Framework/interface/LooperFactory.h"
00023 #include "FWCore/Framework/interface/LuminosityBlock.h"
00024 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00025 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00026 #include "FWCore/Framework/interface/ModuleChanger.h"
00027 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00028 #include "FWCore/Framework/interface/ProcessingController.h"
00029 #include "FWCore/Framework/interface/RunPrincipal.h"
00030 #include "FWCore/Framework/interface/Schedule.h"
00031 #include "FWCore/Framework/interface/ScheduleInfo.h"
00032 #include "FWCore/Framework/interface/SubProcess.h"
00033 #include "FWCore/Framework/src/Breakpoints.h"
00034 #include "FWCore/Framework/src/EPStates.h"
00035 #include "FWCore/Framework/src/EventSetupsController.h"
00036 #include "FWCore/Framework/src/InputSourceFactory.h"
00037
00038 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00039
00040 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00041 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00043 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00045 #include "FWCore/ParameterSet/interface/Registry.h"
00046 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00047
00048 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00049 #include "FWCore/ServiceRegistry/interface/Service.h"
00050
00051 #include "FWCore/Utilities/interface/DebugMacros.h"
00052 #include "FWCore/Utilities/interface/EDMException.h"
00053 #include "FWCore/Utilities/interface/GetPassID.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057
00058 #include "boost/bind.hpp"
00059 #include "boost/thread/xtime.hpp"
00060
00061 #include <exception>
00062 #include <iomanip>
00063 #include <iostream>
00064 #include <utility>
00065
00066 #include <sys/ipc.h>
00067 #include <sys/msg.h>
00068
00069
00070 #include <sys/types.h>
00071 #include <sys/wait.h>
00072 #include <unistd.h>
00073
00074
00075 #ifndef __APPLE__
00076 #include <sched.h>
00077 #endif
00078
00079 namespace edm {
00080
00081 namespace event_processor {
00082
00083 class StateSentry {
00084 public:
00085 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00086 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00087 void succeeded() {success_ = true;}
00088
00089 private:
00090 EventProcessor* ep_;
00091 bool success_;
00092 };
00093 }
00094
00095 using namespace event_processor;
00096
00097 namespace {
00098 template <typename T>
00099 class ScheduleSignalSentry {
00100 public:
00101 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00102 a_(a), principal_(principal), es_(es) {
00103 if (a_) T::preScheduleSignal(a_, principal_);
00104 }
00105 ~ScheduleSignalSentry() {
00106 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00107 }
00108
00109 private:
00110
00111 ActivityRegistry* a_;
00112 typename T::MyPrincipal* principal_;
00113 EventSetup const* es_;
00114 };
00115 }
00116
namespace {

  // Printable names of the event_processor::State enumerators, indexed by
  // enumerator value (used when reporting state transitions).
  // NOTE(review): the order must match the State enum declaration, which is
  // not visible here -- keep both in sync. The pointers are const so the table
  // cannot be modified at run time.
  char const* const stateNames[] = {
    "Init",
    "JobReady",
    "RunGiven",
    "Running",
    "Stopping",
    "ShuttingDown",
    "Done",
    "JobEnded",
    "Error",
    "ErrorEnded",
    "End",
    "Invalid"
  };

  // Printable names of the event_processor::Msg enumerators, indexed by
  // enumerator value. NOTE(review): must match the Msg enum order; the last
  // entry ("Rewind") corresponds to mInputRewind.
  char const* const msgNames[] = {
    "SetRun",
    "Skip",
    "RunAsync",
    "Run(ID)",
    "Run(count)",
    "BeginJob",
    "StopAsync",
    "ShutdownAsync",
    "EndJob",
    "CountComplete",
    "InputExhausted",
    "StopSignal",
    "ShutdownSignal",
    "Finished",
    "Any",
    "dtor",
    "Exception",
    "Rewind"
  };
}
00158
00159
00160
00161
00162
// One row of the EventProcessor state-transition table below: the message
// `message`, received while in state `current`, leads to state `final`.
// NOTE(review): the lookup that consumes these rows (presumably changeState)
// is not visible in this chunk. Field order matters: the table is
// brace-initialized positionally.
struct TransEntry {
  State current;  // state this row applies to
  Msg message;    // message that triggers the transition
  State final;    // resulting state
};
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
// State-transition table of the EventProcessor: each row is
// { current state, message, resulting state }.
// NOTE(review): the final { sInvalid, mAny, sInvalid } row appears to be the
// end-of-table sentinel for the lookup (not visible in this chunk), so it must
// stay last. Do not reorder rows without checking that lookup.
TransEntry table[] = {


  // From sInit.
  { sInit, mException, sError },
  { sInit, mBeginJob, sJobReady },
  // From sJobReady.
  { sJobReady, mException, sError },
  { sJobReady, mSetRun, sRunGiven },
  { sJobReady, mInputRewind, sRunning },
  { sJobReady, mSkip, sRunning },
  { sJobReady, mRunID, sRunning },
  { sJobReady, mRunCount, sRunning },
  { sJobReady, mEndJob, sJobEnded },
  { sJobReady, mBeginJob, sJobReady },
  { sJobReady, mDtor, sEnd },

  // Self-transitions: these messages leave sJobReady unchanged.
  { sJobReady, mStopAsync, sJobReady },
  { sJobReady, mCountComplete, sJobReady },
  { sJobReady, mFinished, sJobReady },

  // From sRunGiven.
  { sRunGiven, mException, sError },
  { sRunGiven, mRunAsync, sRunning },
  { sRunGiven, mBeginJob, sRunGiven },
  { sRunGiven, mShutdownAsync, sShuttingDown },
  { sRunGiven, mStopAsync, sStopping },
  // From sRunning.
  { sRunning, mException, sError },
  { sRunning, mStopAsync, sStopping },
  { sRunning, mShutdownAsync, sShuttingDown },
  { sRunning, mShutdownSignal, sShuttingDown },
  { sRunning, mCountComplete, sStopping },
  { sRunning, mInputExhausted, sStopping },

  // From sStopping.
  { sStopping, mInputRewind, sRunning },
  { sStopping, mException, sError },
  { sStopping, mFinished, sJobReady },
  { sStopping, mCountComplete, sJobReady },
  { sStopping, mShutdownSignal, sShuttingDown },
  { sStopping, mStopAsync, sStopping },
  { sStopping, mInputExhausted, sStopping },

  // From sShuttingDown.
  { sShuttingDown, mException, sError },
  { sShuttingDown, mShutdownSignal, sShuttingDown },
  { sShuttingDown, mCountComplete, sDone },
  { sShuttingDown, mInputExhausted, sDone },
  { sShuttingDown, mFinished, sDone },



  // From sDone, sJobEnded and sError.
  { sDone, mEndJob, sJobEnded },
  { sDone, mException, sError },
  { sJobEnded, mDtor, sEnd },
  { sJobEnded, mException, sError },
  { sError, mEndJob, sError },
  { sError, mDtor, sError },
  // These two rows are not grouped with the other rows for their state.
  { sInit, mDtor, sEnd },
  { sStopping, mShutdownAsync, sShuttingDown },
  // Sentinel row; keep last.
  { sInvalid, mAny, sInvalid }
};
00247
00248
00249
00250
00251
00252
00253 boost::shared_ptr<InputSource>
00254 makeInput(ParameterSet& params,
00255 CommonParams const& common,
00256 ProductRegistry& preg,
00257 PrincipalCache& pCache,
00258 boost::shared_ptr<ActivityRegistry> areg,
00259 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00260 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00261 if(main_input == 0) {
00262 throw Exception(errors::Configuration, "FailedInputSource")
00263 << "Configuration of primary input source has failed\n";
00264 }
00265
00266 std::string modtype;
00267 try {
00268 modtype = main_input->getParameter<std::string>("@module_type");
00269 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00270 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00271 ConfigurationDescriptions descriptions(filler->baseType());
00272 filler->fill(descriptions);
00273 descriptions.validate(*main_input, std::string("source"));
00274 }
00275 catch (cms::Exception& iException) {
00276 Exception toThrow(errors::Configuration, "Failed validating primary input source configuration.");
00277 toThrow << "\nSource plugin name is \"" << modtype << "\"\n";
00278 toThrow.append(iException);
00279 throw toThrow;
00280 }
00281
00282 main_input->registerIt();
00283
00284
00285
00286
00287
00288
00289
00290 ModuleDescription md(main_input->id(),
00291 main_input->getParameter<std::string>("@module_type"),
00292 "source",
00293 processConfiguration);
00294
00295 InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00296 areg->preSourceConstructionSignal_(md);
00297 boost::shared_ptr<InputSource> input;
00298 try {
00299 input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00300 areg->postSourceConstructionSignal_(md);
00301 }
00302 catch (Exception& iException) {
00303 areg->postSourceConstructionSignal_(md);
00304
00305 Exception toThrow(iException.categoryCode(), "Error occured while constructing primary input source.");
00306 toThrow << "\nSource is of type \"" << modtype << "\"\n";
00307 toThrow.append(iException);
00308 throw toThrow;
00309 }
00310 catch (cms::Exception& iException) {
00311 areg->postSourceConstructionSignal_(md);
00312 Exception toThrow(errors::Configuration, "Error occured while constructing primary input source.");
00313 toThrow << "\nSource is of type \"" << modtype << "\"\n";
00314 toThrow.append(iException);
00315 throw toThrow;
00316 }
00317
00318 return input;
00319 }
00320
00321
00322 boost::shared_ptr<EDLooperBase>
00323 fillLooper(eventsetup::EventSetupProvider& cp,
00324 ParameterSet& params,
00325 CommonParams const& common) {
00326 boost::shared_ptr<EDLooperBase> vLooper;
00327
00328 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00329
00330 if(loopers.size() == 0) {
00331 return vLooper;
00332 }
00333
00334 assert(1 == loopers.size());
00335
00336 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00337 itName != itNameEnd;
00338 ++itName) {
00339
00340 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00341 providerPSet->registerIt();
00342 vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00343 *providerPSet,
00344 common.processName_,
00345 common.releaseVersion_,
00346 common.passID_);
00347 }
00348 return vLooper;
00349
00350 }
00351
00352
// Builds an EventProcessor from a Python configuration string and an
// externally supplied service token.
//
// @param config          Python configuration text, parsed with PythonProcessDesc
// @param iToken          service token forwarded to init()
// @param iLegacy         service-legacy policy forwarded to init() (values used in
//                        this file: kOverlapIsError, kConfigurationOverrides)
// @param defaultServices passed to ProcessDesc::addServices
// @param forcedServices  passed to ProcessDesc::addServices
//
// Every member gets the same initial value in all constructors. init() later
// overwrites several of them from the "options" parameter set.
EventProcessor::EventProcessor(std::string const& config,
                               ServiceToken const& iToken,
                               serviceregistry::ServiceLegacy iLegacy,
                               std::vector<std::string> const& defaultServices,
                               std::vector<std::string> const& forcedServices) :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
  // Parse the Python configuration into a ParameterSet, wrap it in a
  // ProcessDesc, add the requested services, then do the common setup.
  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
  processDesc->addServices(defaultServices, forcedServices);
  init(processDesc, iToken, iLegacy);
}
00405
// Builds an EventProcessor from a Python configuration string, using a
// default-constructed ServiceToken and the kOverlapIsError service policy.
//
// @param config          Python configuration text, parsed with PythonProcessDesc
// @param defaultServices passed to ProcessDesc::addServices
// @param forcedServices  passed to ProcessDesc::addServices
EventProcessor::EventProcessor(std::string const& config,
                               std::vector<std::string> const& defaultServices,
                               std::vector<std::string> const& forcedServices) :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
  // Same as the token-taking constructor, but with no external services.
  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
  processDesc->addServices(defaultServices, forcedServices);
  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
}
00456
// Builds an EventProcessor from an already-constructed ProcessDesc.
// No services are added here; the description is passed to init() as is.
//
// @param processDesc process description (passed by non-const reference because
//                    init() takes it that way)
// @param token       service token forwarded to init()
// @param legacy      service-legacy policy forwarded to init()
EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
                               ServiceToken const& token,
                               serviceregistry::ServiceLegacy legacy) :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
  init(processDesc, token, legacy);
}
00504
00505
// Builds an EventProcessor from a configuration string that is either Python
// (isPython == true, parsed with PythonProcessDesc) or passed directly to
// ProcessDesc's string constructor (isPython == false).
// No extra services are added. init() is called with a default-constructed
// ServiceToken and the kOverlapIsError policy.
EventProcessor::EventProcessor(std::string const& config, bool isPython):
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    serviceToken_(),
    input_(),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    machine_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
  if(isPython) {
    // Python configuration: parse to a ParameterSet first.
    boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
    init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
  }
  else {
    // Non-Python configuration: handed to ProcessDesc as a string.
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
    init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
  }
}
00559
00560 void
00561 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00562 ServiceToken const& iToken,
00563 serviceregistry::ServiceLegacy iLegacy) {
00564
00565
00566
00567
00568 BranchIDListHelper::clearRegistries();
00569
00570 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00571
00572
00573
00574 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00575
00576
00577 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00578 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00579 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00580 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00581 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00582 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00583 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00584 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00585 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00586 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00587 itPS != itPSEnd;
00588 ++itPS) {
00589 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00590 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00591 itPS->getUntrackedParameter<std::string>("label", "")));
00592 }
00593
00594 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00595
00596 std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00597
00598
00599 ScheduleItems items;
00600
00601
00602 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00603 ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00604 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00605
00606
00607 ServiceRegistry::Operate operate(serviceToken_);
00608
00609
00610 boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00611
00612
00613 esp_ = espController->makeProvider(*parameterSet, *common);
00614
00615
00616 looper_ = fillLooper(*esp_, *parameterSet, *common);
00617 if(looper_) {
00618 looper_->setActionTable(items.act_table_.get());
00619 looper_->attachTo(*items.actReg_);
00620 }
00621
00622
00623 input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00624
00625
00626 schedule_ = items.initSchedule(*parameterSet);
00627
00628
00629 act_table_ = items.act_table_;
00630 actReg_ = items.actReg_;
00631 preg_ = items.preg_;
00632 processConfiguration_ = items.processConfiguration_;
00633
00634 FDEBUG(2) << parameterSet << std::endl;
00635 connectSigs(this);
00636
00637
00638 if(subProcessParameterSet) {
00639 subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController, *actReg_, token, serviceregistry::kConfigurationOverrides));
00640 }
00641 }
00642
00643 EventProcessor::~EventProcessor() {
00644
00645 ServiceToken token = getToken();
00646 ServiceRegistry::Operate op(token);
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657 terminateMachine();
00658
00659 try {
00660 changeState(mDtor);
00661 }
00662 catch(cms::Exception& e) {
00663 LogError("System")
00664 << e.explainSelf() << "\n";
00665 }
00666
00667
00668 subProcess_.reset();
00669 esp_.reset();
00670 schedule_.reset();
00671 input_.reset();
00672 looper_.reset();
00673 actReg_.reset();
00674
00675 pset::Registry* psetRegistry = pset::Registry::instance();
00676 psetRegistry->data().clear();
00677 psetRegistry->extra().setID(ParameterSetID());
00678
00679 EntryDescriptionRegistry::instance()->data().clear();
00680 ParentageRegistry::instance()->data().clear();
00681 ProcessConfigurationRegistry::instance()->data().clear();
00682 ProcessHistoryRegistry::instance()->data().clear();
00683 BranchIDListHelper::clearRegistries();
00684 }
00685
00686 void
00687 EventProcessor::rewind() {
00688 beginJob();
00689 changeState(mStopAsync);
00690 changeState(mInputRewind);
00691 {
00692 StateSentry toerror(this);
00693
00694
00695 ServiceRegistry::Operate operate(serviceToken_);
00696
00697 {
00698 input_->repeat();
00699 input_->rewind();
00700 }
00701 changeState(mCountComplete);
00702 toerror.succeeded();
00703 }
00704 changeState(mFinished);
00705 }
00706
00707 EventProcessor::StatusCode
00708 EventProcessor::run(int numberEventsToProcess, bool) {
00709 return runEventCount(numberEventsToProcess);
00710 }
00711
00712 EventProcessor::StatusCode
00713 EventProcessor::skip(int numberToSkip) {
00714 beginJob();
00715 changeState(mSkip);
00716 {
00717 StateSentry toerror(this);
00718
00719
00720 ServiceRegistry::Operate operate(serviceToken_);
00721
00722 {
00723 input_->skipEvents(numberToSkip);
00724 }
00725 changeState(mCountComplete);
00726 toerror.succeeded();
00727 }
00728 changeState(mFinished);
00729 return epSuccess;
00730 }
00731
00732 void
00733 EventProcessor::beginJob() {
00734 if(state_ != sInit) return;
00735 bk::beginJob();
00736
00737 changeState(mBeginJob);
00738
00739
00740
00741 ServiceRegistry::Operate operate(serviceToken_);
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757 try {
00758 input_->doBeginJob();
00759 } catch(cms::Exception& e) {
00760 LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00761 e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
00762 throw;
00763 } catch(std::exception& e) {
00764 LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
00765 throw;
00766 } catch(...) {
00767 LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
00768 throw;
00769 }
00770
00771 schedule_->beginJob();
00772
00773 if(hasSubProcess()) subProcess_->doBeginJob();
00774 actReg_->postBeginJobSignal_();
00775 }
00776
00777 void
00778 EventProcessor::endJob() {
00779
00780 ExceptionCollector c;
00781
00782
00783 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00784
00785
00786 ServiceRegistry::Operate operate(serviceToken_);
00787
00788 c.call(boost::bind(&EventProcessor::terminateMachine, this));
00789 c.call(boost::bind(&Schedule::endJob, schedule_.get()));
00790 if(hasSubProcess()) {
00791 c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00792 }
00793 c.call(boost::bind(&InputSource::doEndJob, input_));
00794 if(looper_) {
00795 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00796 }
00797 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00798 if(c.hasThrown()) {
00799 c.rethrow();
00800 }
00801 }
00802
// Returns a copy of the service token created in init(). Callers can use it
// with ServiceRegistry::Operate to make this processor's services current
// (the destructor does exactly that).
ServiceToken
EventProcessor::getToken() {
  return serviceToken_;
}
00807
00808
namespace {
  // State shared between the SIGCHLD handler below and the parent's wait loop
  // in forkProcess().
  // NOTE(review): the C/C++ standards only guarantee volatile sig_atomic_t for
  // handler <-> main-code communication. The types are kept because the wait
  // loop compares them with unsigned sizes.
  volatile bool child_failed = false;           // a child exited non-zero or was killed by a signal
  volatile unsigned int num_children_done = 0;  // children reaped so far (exited or signaled)
  volatile int child_fail_exit_status = 0;      // exit status of the last child that exited non-zero

  extern "C" {
    // SIGCHLD handler: reaps every child that has already terminated
    // (WNOHANG, so it never blocks) and records how each one ended.
    // errno is saved and restored because waitpid() overwrites it (it ends
    // with ECHILD or similar once nothing is left to reap). A signal handler
    // must not clobber errno for the code it interrupted.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0 < p) {
        if(WIFEXITED(stat_loc)) {
          ++num_children_done;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_failed = true;
            child_fail_exit_status = WEXITSTATUS(stat_loc);
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          ++num_children_done;
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
00839
// Presumably the possible outcomes of a forked child process.
// NOTE(review): these enumerators are not referenced in the visible part of
// this file.
enum {
  kChildSucceed = 0,
  kChildExitBadly = 1,
  kChildSegv = 2,
  kMaxChildAction = 3  // count of the actions above; keep last
};
00846
00847 namespace {
00848 unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00849 unsigned int n = 0;
00850 while(numberOfChildren != 0) {
00851 ++n;
00852 numberOfChildren /= 10;
00853 }
00854 if(n == 0) {
00855 n = 3;
00856 }
00857 return n;
00858 }
00859
00860 class MessageSenderToSource {
00861 public:
00862 MessageSenderToSource(int iQueueID, long iNEventsToProcess):
00863 m_queueID(iQueueID),
00864 m_nEventsToProcess(iNEventsToProcess) {}
00865 void operator()() {
00866 std::cout << "I am controller" << std::endl;
00867
00868
00869 multicore::MessageForSource sndmsg;
00870 sndmsg.startIndex = 0;
00871 sndmsg.nIndices = m_nEventsToProcess;
00872 if(m_nEventsToProcess > 0 && m_nEventsToProcess < m_nEventsToProcess) {
00873 sndmsg.nIndices = m_nEventsToProcess;
00874 }
00875 int value = 0;
00876 do {
00877 value = msgsnd(m_queueID, &sndmsg, multicore::MessageForSource::sizeForBuffer(), 0);
00878
00879 sndmsg.startIndex += sndmsg.nIndices;
00880 } while(value == 0);
00881 if(errno != EIDRM and errno != EINVAL) {
00882
00883 perror("message queue send failed");
00884 }
00885 return;
00886 }
00887
00888 private:
00889 int const m_queueID;
00890 long const m_nEventsToProcess;
00891 };
00892 }
00893
00894 bool
00895 EventProcessor::forkProcess(std::string const& jobReportFile) {
00896
00897 if(0 == numberOfForkedChildren_) {return true;}
00898 assert(0<numberOfForkedChildren_);
00899
00900 {
00901 beginJob();
00902
00903 ServiceRegistry::Operate operate(serviceToken_);
00904
00905 InputSource::ItemType itemType;
00906 itemType = input_->nextItemType();
00907
00908 assert(itemType == InputSource::IsFile);
00909 {
00910 readFile();
00911 }
00912 itemType = input_->nextItemType();
00913 assert(itemType == InputSource::IsRun);
00914
00915 std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
00916 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
00917 input_->runAuxiliary()->beginTime());
00918 EventSetup const& es = esp_->eventSetupForInstance(ts);
00919
00920
00921 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
00922 es.fillAvailableRecordKeys(recordKeys);
00923 std::vector<eventsetup::DataKey> dataKeys;
00924 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
00925 itKey != itEnd;
00926 ++itKey) {
00927 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
00928
00929 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
00930 ExcludedData const* excludedData(0);
00931 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
00932 excludedData = &(itExcludeRec->second);
00933 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
00934
00935 continue;
00936 }
00937 }
00938 if(0 != recordPtr) {
00939 dataKeys.clear();
00940 recordPtr->fillRegisteredDataKeys(dataKeys);
00941 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
00942 itDataKey != itDataKeyEnd;
00943 ++itDataKey) {
00944
00945 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
00946 std::cout << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
00947 continue;
00948 }
00949 try {
00950 recordPtr->doGet(*itDataKey);
00951 } catch(cms::Exception& e) {
00952 LogWarning("EventSetupPreFetching") << e.what();
00953 }
00954 }
00955 }
00956 }
00957 }
00958 std::cout << " done prefetching" << std::endl;
00959 {
00960
00961 ServiceRegistry::Operate operate(serviceToken_);
00962 Service<JobReport> jobReport;
00963 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
00964
00965
00966 actReg_->preForkReleaseResourcesSignal_();
00967 input_->doPreForkReleaseResources();
00968 schedule_->preForkReleaseResources();
00969 }
00970 installCustomHandler(SIGCHLD, ep_sigchld);
00971
00972
00973 int queueID;
00974 if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
00975 printf("Error obtaining message queue\n");
00976 exit(EXIT_FAILURE);
00977 }
00978
00979 unsigned int childIndex = 0;
00980 unsigned int const kMaxChildren = numberOfForkedChildren_;
00981 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
00982 std::vector<pid_t> childrenIds;
00983 childrenIds.reserve(kMaxChildren);
00984 {
00985
00986 ServiceRegistry::Operate operate(serviceToken_);
00987 Service<JobReport> jobReport;
00988 for(; childIndex < kMaxChildren; ++childIndex) {
00989 pid_t value = fork();
00990 if(value == 0) {
00991
00992 fflush(stdout);
00993 fflush(stderr);
00994 std::stringstream stout;
00995 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
00996 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
00997 std::cerr << "Error during freopen of child process "
00998 << childIndex << std::endl;
00999 }
01000 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01001 std::cerr << "Error during dup2 of child process"
01002 << childIndex << std::endl;
01003 }
01004
01005 std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
01006 if(setCpuAffinity_) {
01007
01008
01009
01010
01011
01012
01013 #ifdef __APPLE__
01014 std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
01015 #else
01016 std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
01017 cpu_set_t mask;
01018 CPU_ZERO(&mask);
01019 CPU_SET(childIndex, &mask);
01020 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01021 std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
01022 exit(-1);
01023 }
01024 #endif
01025 }
01026 break;
01027 }
01028 if(value < 0) {
01029 std::cerr << "failed to create a child" << std::endl;
01030
01031
01032 msgctl(queueID, IPC_RMID, 0);
01033 exit(-1);
01034 }
01035 childrenIds.push_back(value);
01036 }
01037
01038 if(childIndex < kMaxChildren) {
01039 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01040 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01041
01042 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
01043 input_->doPostForkReacquireResources(receiver);
01044 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01045
01046
01047 return true;
01048 }
01049 jobReport->parentAfterFork(jobReportFile);
01050 }
01051
01052
01053
01054
01055
01056
01057
01058
01059 sigset_t blockingSigSet;
01060 sigset_t unblockingSigSet;
01061 sigset_t oldSigSet;
01062 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01063 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01064 sigaddset(&blockingSigSet, SIGCHLD);
01065 sigaddset(&blockingSigSet, SIGUSR2);
01066 sigaddset(&blockingSigSet, SIGINT);
01067 sigdelset(&unblockingSigSet, SIGCHLD);
01068 sigdelset(&unblockingSigSet, SIGUSR2);
01069 sigdelset(&unblockingSigSet, SIGINT);
01070 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01071
01072
01073
01074
01075 MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
01076 boost::thread senderThread(sender);
01077
01078 while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01079 sigsuspend(&unblockingSigSet);
01080 std::cout << "woke from sigwait" << std::endl;
01081 }
01082 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01083
01084 std::cout << "num children who have already stopped " << num_children_done << std::endl;
01085 if(child_failed) {
01086 std::cout << "child failed" << std::endl;
01087 }
01088 if(shutdown_flag) {
01089 std::cout << "asked to shutdown" << std::endl;
01090 }
01091
01092 if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01093 std::cout << "must stop children" << std::endl;
01094 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01095 it != itEnd; ++it) {
01096 kill(*it, SIGUSR2);
01097 }
01098 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01099 while(num_children_done != kMaxChildren) {
01100 sigsuspend(&unblockingSigSet);
01101 }
01102 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01103 }
01104
01105
01106 msgctl(queueID, IPC_RMID, 0);
01107
01108
01109 senderThread.join();
01110 if(child_failed) {
01111 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01112 }
01113 return false;
01114 }
01115
01116 void
01117 EventProcessor::connectSigs(EventProcessor* ep) {
01118
01119
01120
01121 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01122 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01123 }
01124
01125 std::vector<ModuleDescription const*>
01126 EventProcessor::getAllModuleDescriptions() const {
01127 return schedule_->getAllModuleDescriptions();
01128 }
01129
01130 int
01131 EventProcessor::totalEvents() const {
01132 return schedule_->totalEvents();
01133 }
01134
01135 int
01136 EventProcessor::totalEventsPassed() const {
01137 return schedule_->totalEventsPassed();
01138 }
01139
01140 int
01141 EventProcessor::totalEventsFailed() const {
01142 return schedule_->totalEventsFailed();
01143 }
01144
01145 void
01146 EventProcessor::enableEndPaths(bool active) {
01147 schedule_->enableEndPaths(active);
01148 }
01149
01150 bool
01151 EventProcessor::endPathsEnabled() const {
01152 return schedule_->endPathsEnabled();
01153 }
01154
01155 void
01156 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01157 schedule_->getTriggerReport(rep);
01158 }
01159
01160 void
01161 EventProcessor::clearCounters() {
01162 schedule_->clearCounters();
01163 }
01164
01165 char const* EventProcessor::currentStateName() const {
01166 return stateName(getState());
01167 }
01168
01169 char const* EventProcessor::stateName(State s) const {
01170 return stateNames[s];
01171 }
01172
01173 char const* EventProcessor::msgName(Msg m) const {
01174 return msgNames[m];
01175 }
01176
01177 State EventProcessor::getState() const {
01178 return state_;
01179 }
01180
01181 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01182
01183
01184 return last_rc_;
01185 }
01186
01187 void
01188 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01189 if(runNumber == 0) {
01190 runNumber = 1;
01191 LogWarning("Invalid Run")
01192 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01193 << "Run number was set to 1 instead\n";
01194 }
01195
01196
01197 beginJob();
01198 changeState(mSetRun);
01199
01200
01201 input_->setRunNumber(runNumber);
01202 }
01203
01204 void
01205 EventProcessor::declareRunNumber(RunNumber_t runNumber) {
01206
01207 beginJob();
01208 changeState(mSetRun);
01209
01210
01211
01212 }
01213
01214 EventProcessor::StatusCode
01215 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01216 bool rc = true;
01217 boost::xtime timeout;
01218 boost::xtime_get(&timeout, boost::TIME_UTC);
01219 timeout.sec += timeout_seconds;
01220
01221
01222
01223
01224 {
01225 boost::mutex::scoped_lock sl(stop_lock_);
01226
01227
01228 if(stop_count_ < 0) return last_rc_;
01229
01230 if(timeout_seconds == 0) {
01231 while(stop_count_ == 0) stopper_.wait(sl);
01232 } else {
01233 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01234 }
01235
01236 if(rc == false) {
01237
01238
01239
01240
01241
01242
01243 if(id_set_) pthread_cancel(event_loop_id_);
01244
01245
01246 LogWarning("timeout")
01247 << "An asynchronous request was made to shut down "
01248 << "the event loop "
01249 << "and the event loop did not shutdown after "
01250 << timeout_seconds << " seconds\n";
01251 } else {
01252 event_loop_->join();
01253 event_loop_.reset();
01254 id_set_ = false;
01255 stop_count_ = -1;
01256 }
01257 }
01258 return rc == false ? epTimedOut : last_rc_;
01259 }
01260
01261 EventProcessor::StatusCode
01262 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01263 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01264 if(rc != epTimedOut) changeState(mCountComplete);
01265 else errorState();
01266 return rc;
01267 }
01268
01269
01270 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01271 changeState(mStopAsync);
01272 StatusCode rc = waitForAsyncCompletion(secs);
01273 if(rc != epTimedOut) changeState(mFinished);
01274 else errorState();
01275 return rc;
01276 }
01277
01278 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01279 changeState(mShutdownAsync);
01280 StatusCode rc = waitForAsyncCompletion(secs);
01281 if(rc != epTimedOut) changeState(mFinished);
01282 else errorState();
01283 return rc;
01284 }
01285
01286 void EventProcessor::errorState() {
01287 state_ = sError;
01288 }
01289
01290
01291 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01292
01293
01294
01295 changeState(m);
01296 return waitForAsyncCompletion(60*2);
01297 }
01298
01299 void EventProcessor::changeState(Msg msg) {
01300
01301
01302 boost::mutex::scoped_lock sl(state_lock_);
01303 State curr = state_;
01304 int rc;
01305
01306
01307 for(rc = 0;
01308 table[rc].current != sInvalid &&
01309 (curr != table[rc].current ||
01310 (curr == table[rc].current &&
01311 msg != table[rc].message && table[rc].message != mAny));
01312 ++rc);
01313
01314 if(table[rc].current == sInvalid)
01315 throw cms::Exception("BadState")
01316 << "A member function of EventProcessor has been called in an"
01317 << " inappropriate order.\n"
01318 << "Bad transition from " << stateName(curr) << " "
01319 << "using message " << msgName(msg) << "\n"
01320 << "No where to go from here.\n";
01321
01322 FDEBUG(1) << "changeState: current=" << stateName(curr)
01323 << ", message=" << msgName(msg)
01324 << " -> new=" << stateName(table[rc].final) << "\n";
01325
01326 state_ = table[rc].final;
01327 }
01328
01329 void EventProcessor::runAsync() {
01330 beginJob();
01331 {
01332 boost::mutex::scoped_lock sl(stop_lock_);
01333 if(id_set_ == true) {
01334 std::string err("runAsync called while async event loop already running\n");
01335 LogError("FwkJob") << err;
01336 throw cms::Exception("BadState") << err;
01337 }
01338
01339 changeState(mRunAsync);
01340
01341 stop_count_ = 0;
01342 last_rc_ = epSuccess;
01343 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01344 boost::xtime timeout;
01345 boost::xtime_get(&timeout, boost::TIME_UTC);
01346 timeout.sec += 60;
01347 if(starter_.timed_wait(sl, timeout) == false) {
01348
01349 throw cms::Exception("BadState")
01350 << "Async run thread did not start in 60 seconds\n";
01351 }
01352 }
01353 }
01354
01355 void EventProcessor::asyncRun(EventProcessor* me) {
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01366
01367 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01368 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01369
01370 {
01371 boost::mutex::scoped_lock(me->stop_lock_);
01372 me->event_loop_id_ = pthread_self();
01373 me->id_set_ = true;
01374 me->starter_.notify_all();
01375 }
01376
01377 Status rc = epException;
01378 FDEBUG(2) << "asyncRun starting ......................\n";
01379
01380 try {
01381 bool onlineStateTransitions = true;
01382 rc = me->runToCompletion(onlineStateTransitions);
01383 }
01384 catch (cms::Exception& e) {
01385 LogError("FwkJob") << "cms::Exception caught in "
01386 << "EventProcessor::asyncRun"
01387 << "\n"
01388 << e.explainSelf();
01389 me->last_error_text_ = e.explainSelf();
01390 }
01391 catch (std::exception& e) {
01392 LogError("FwkJob") << "Standard library exception caught in "
01393 << "EventProcessor::asyncRun"
01394 << "\n"
01395 << e.what();
01396 me->last_error_text_ = e.what();
01397 }
01398 catch (...) {
01399 LogError("FwkJob") << "Unknown exception caught in "
01400 << "EventProcessor::asyncRun"
01401 << "\n";
01402 me->last_error_text_ = "Unknown exception caught";
01403 rc = epOther;
01404 }
01405
01406 me->last_rc_ = rc;
01407
01408 {
01409
01410 boost::mutex::scoped_lock sl(me->stop_lock_);
01411 ++me->stop_count_;
01412 me->stopper_.notify_all();
01413 }
01414 FDEBUG(2) << "asyncRun ending ......................\n";
01415 }
01416
01417
01418 EventProcessor::StatusCode
01419 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01420
01421 StateSentry toerror(this);
01422
01423 int numberOfEventsToProcess = -1;
01424 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01425
01426 if(machine_.get() != 0) {
01427 throw Exception(errors::LogicError)
01428 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01429 << "Please report this error to the Framework group\n";
01430 }
01431
01432 toerror.succeeded();
01433
01434 return returnCode;
01435 }
01436
01437 EventProcessor::StatusCode
01438 EventProcessor::runEventCount(int numberOfEventsToProcess) {
01439
01440 StateSentry toerror(this);
01441
01442 bool onlineStateTransitions = false;
01443 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01444
01445 toerror.succeeded();
01446
01447 return returnCode;
01448 }
01449
// Shared driver for runToCompletion() and runEventCount(). It feeds the items
// reported by the input source (files, runs, lumis, events, stop) into
// machine_, the statemachine::Machine. This continues until the machine
// terminates, an asynchronous stop/shutdown or a shutdown signal is seen, or
// numberOfEventsToProcess events have been processed.
//
// onlineStateTransitions: when false, this function itself applies
//   mRunCount on entry and mFinished on normal exit. When true (the asyncRun
//   path), neither transition is applied here.
// numberOfEventsToProcess: event budget for this call. Values <= 0 mean
//   "no limit".
// Returns: epSuccess; epSignal if shutdown_flag was set; epCountComplete if
//   the event budget was reached.
// Throws: cms::Exception. Any exception escaping the loop first terminates
//   the state machine and is then re-thrown (cms::Exception, annotated) or
//   converted into a cms::Exception. Also throws "BadState" if the state
//   machine passed through its error state (see doErrorStuff()).
EventProcessor::StatusCode
EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {

  // NOTE(review): a new EventPrincipal is inserted on every call, and
  // runEventCount may be called repeatedly. Confirm that
  // principalCache_.insert replaces rather than accumulates.
  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
  principalCache_.insert(ep);

  beginJob();

  if(!onlineStateTransitions) changeState(mRunCount);

  StatusCode returnCode = epSuccess;
  stateMachineWasInErrorState_ = false;

  // Presumably makes this job's services current on this thread for the
  // rest of the function (RAII object).
  ServiceRegistry::Operate operate(serviceToken_);

  // The state machine is created on first use. If a previous call stopped
  // early (event budget reached) without terminating it, it is reused here.
  if(machine_.get() == 0) {

    // Translate the fileMode_ configuration string. Empty means FULLMERGE.
    statemachine::FileMode fileMode;
    if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
    else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
    else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
    else {
      throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
        << fileMode_ << ".\n"
        << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
    }

    // Translate the emptyRunLumiMode_ configuration string. Empty means
    // handleEmptyRunsAndLumis.
    statemachine::EmptyRunLumiMode emptyRunLumiMode;
    if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
    else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
    else {
      throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
        << emptyRunLumiMode_ << ".\n"
        << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
    }

    machine_.reset(new statemachine::Machine(this,
                                             fileMode,
                                             emptyRunLumiMode));

    machine_->initiate();
  }

  try {

    InputSource::ItemType itemType;

    // Events processed during this call (compared to numberOfEventsToProcess).
    int iEvents = 0;

    while(true) {

      itemType = input_->nextItemType();

      FDEBUG(1) << "itemType = " << itemType << "\n";

      // Asynchronous stop/shutdown requests. state_ may have been changed by
      // another thread, presumably through stopAsync()/shutdownAsync().
      // forceLooperToEnd_ is raised while the Stop event is processed, so
      // endOfLoop() reports that the looper must not start another pass.
      if(state_ == sStopping) {
        FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
        forceLooperToEnd_ = true;
        machine_->process_event(statemachine::Stop());
        forceLooperToEnd_ = false;
        break;
      }
      else if(state_ == sShuttingDown) {
        FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
        forceLooperToEnd_ = true;
        machine_->process_event(statemachine::Stop());
        forceLooperToEnd_ = false;
        break;
      }

      // Shutdown signal (shutdown_flag, read under usr2_lock).
      {
        boost::mutex::scoped_lock sl(usr2_lock);
        if(shutdown_flag) {
          changeState(mShutdownSignal);
          returnCode = epSignal;
          forceLooperToEnd_ = true;
          machine_->process_event(statemachine::Stop());
          forceLooperToEnd_ = false;
          break;
        }
      }

      // Translate the input item into the corresponding state machine event.
      if(itemType == InputSource::IsStop) {
        machine_->process_event(statemachine::Stop());
      }
      else if(itemType == InputSource::IsFile) {
        machine_->process_event(statemachine::File());
      }
      else if(itemType == InputSource::IsRun) {
        machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
      }
      else if(itemType == InputSource::IsLumi) {
        machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
      }
      else if(itemType == InputSource::IsEvent) {
        machine_->process_event(statemachine::Event());
        ++iEvents;
        // Event budget reached: leave the loop without stopping the machine,
        // so that a later call can resume where this one left off.
        if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
          returnCode = epCountComplete;
          changeState(mInputExhausted);
          FDEBUG(1) << "Event count complete, pausing event loop\n";
          break;
        }
      }

      else {
        throw Exception(errors::LogicError)
          << "Unknown next item type passed to EventProcessor\n"
          << "Please report this error to the Framework group\n";
      }

      if(machine_->terminated()) {
        changeState(mInputExhausted);
        break;
      }
    }
  }

  // Exception handling. Every handler terminates the state machine first.
  // alreadyHandlingException_ is raised around that call so that code it
  // triggers can query alreadyHandlingException(). The exception is then
  // re-thrown as a cms::Exception that carries the file/run/lumi context
  // messages (exceptionMessage*_).
  catch (cms::Exception& e) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    e << "cms::Exception caught in EventProcessor and rethrown\n";
    e << exceptionMessageLumis_;
    e << exceptionMessageRuns_;
    e << exceptionMessageFiles_;
    throw;
  }
  catch (std::bad_alloc& e) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    throw cms::Exception("std::bad_alloc")
      << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
      << "The job has probably exhausted the virtual memory available to the process.\n"
      << exceptionMessageLumis_
      << exceptionMessageRuns_
      << exceptionMessageFiles_;
  }
  catch (std::exception& e) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    throw cms::Exception("StdException")
      << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
      << "Previous information:\n" << e.what() << "\n"
      << exceptionMessageLumis_
      << exceptionMessageRuns_
      << exceptionMessageFiles_;
  }
  catch (...) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    throw cms::Exception("Unknown")
      << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
      << exceptionMessageLumis_
      << exceptionMessageRuns_
      << exceptionMessageFiles_;
  }

  // Discard the machine only once it has terminated. Otherwise keep it for
  // a subsequent call (runToCompletion() treats a surviving machine as a
  // logic error).
  if(machine_->terminated()) {
    FDEBUG(1) << "The state machine reports it has been terminated\n";
    machine_.reset();
  }

  if(!onlineStateTransitions) changeState(mFinished);

  // Set by doErrorStuff() when the state machine entered its error state.
  if(stateMachineWasInErrorState_) {
    throw cms::Exception("BadState")
      << "The boost state machine in the EventProcessor exited after\n"
      << "entering the Error state.\n";
  }

  return returnCode;
}
01683
01684 void EventProcessor::readFile() {
01685 FDEBUG(1) << " \treadFile\n";
01686 fb_ = input_->readFile();
01687 if(numberOfForkedChildren_ > 0) {
01688 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01689 }
01690 }
01691
01692 void EventProcessor::closeInputFile() {
01693 input_->closeFile(fb_);
01694 FDEBUG(1) << "\tcloseInputFile\n";
01695 }
01696
01697 void EventProcessor::openOutputFiles() {
01698 schedule_->openOutputFiles(*fb_);
01699 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01700 FDEBUG(1) << "\topenOutputFiles\n";
01701 }
01702
01703 void EventProcessor::closeOutputFiles() {
01704 schedule_->closeOutputFiles();
01705 if(hasSubProcess()) subProcess_->closeOutputFiles();
01706 FDEBUG(1) << "\tcloseOutputFiles\n";
01707 }
01708
01709 void EventProcessor::respondToOpenInputFile() {
01710 schedule_->respondToOpenInputFile(*fb_);
01711 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01712 FDEBUG(1) << "\trespondToOpenInputFile\n";
01713 }
01714
01715 void EventProcessor::respondToCloseInputFile() {
01716 schedule_->respondToCloseInputFile(*fb_);
01717 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01718 FDEBUG(1) << "\trespondToCloseInputFile\n";
01719 }
01720
01721 void EventProcessor::respondToOpenOutputFiles() {
01722 schedule_->respondToOpenOutputFiles(*fb_);
01723 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01724 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01725 }
01726
01727 void EventProcessor::respondToCloseOutputFiles() {
01728 schedule_->respondToCloseOutputFiles(*fb_);
01729 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01730 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01731 }
01732
01733 void EventProcessor::startingNewLoop() {
01734 shouldWeStop_ = false;
01735
01736
01737 if(looper_ && looperBeginJobRun_) {
01738 looper_->doStartingNewLoop();
01739 }
01740 FDEBUG(1) << "\tstartingNewLoop\n";
01741 }
01742
01743 bool EventProcessor::endOfLoop() {
01744 if(looper_) {
01745 ModuleChanger changer(schedule_.get());
01746 looper_->setModuleChanger(&changer);
01747 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01748 looper_->setModuleChanger(0);
01749 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01750 else return false;
01751 }
01752 FDEBUG(1) << "\tendOfLoop\n";
01753 return true;
01754 }
01755
01756 void EventProcessor::rewindInput() {
01757 input_->repeat();
01758 input_->rewind();
01759 FDEBUG(1) << "\trewind\n";
01760 }
01761
01762 void EventProcessor::prepareForNextLoop() {
01763 looper_->prepareForNextLoop(esp_.get());
01764 FDEBUG(1) << "\tprepareForNextLoop\n";
01765 }
01766
01767 bool EventProcessor::shouldWeCloseOutput() const {
01768 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01769 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01770 }
01771
01772 void EventProcessor::doErrorStuff() {
01773 FDEBUG(1) << "\tdoErrorStuff\n";
01774 LogError("StateMachine")
01775 << "The EventProcessor state machine encountered an unexpected event\n"
01776 << "and went to the error state\n"
01777 << "Will attempt to terminate processing normally\n"
01778 << "(IF using the looper the next loop will be attempted)\n"
01779 << "This likely indicates a bug in an input module or corrupted input or both\n";
01780 stateMachineWasInErrorState_ = true;
01781 }
01782
// Begin-run transition for `run`. The steps are, in order:
//   1. fetch the cached RunPrincipal and let the input source react;
//   2. sync the EventSetup to the start of the run (optionally clearing the
//      EventSetup cache first);
//   3. on the first run only, run the looper's beginOfJob and starting-loop hooks;
//   4. run the begin-run occurrence through the schedule and the subprocess;
//   5. notify the looper.
void EventProcessor::beginRun(statemachine::Run const& run) {
  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
  input_->doBeginRun(runPrincipal);
  // IOV sync point: the first possible event (lumi 0, event 0) of this run,
  // at the run's begin time.
  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
                  runPrincipal.beginTime());
  // Optionally drop all cached EventSetup data whenever a new run starts.
  if(forceESCacheClearOnNewRun_){
    esp_->forceCacheClear();
  }
  EventSetup const& es = esp_->eventSetupForInstance(ts);
  // The looper's beginOfJob is deferred to the first beginRun, where it
  // receives this run's EventSetup. looperBeginJobRun_ records that it ran,
  // and from then on startingNewLoop() notifies the looper of new loops.
  if(looper_ && looperBeginJobRun_== false) {
    looper_->copyInfo(ScheduleInfo(schedule_.get()));
    looper_->beginOfJob(es);
    looperBeginJobRun_ = true;
    looper_->doStartingNewLoop();
  }
  {
    // The sentry's lifetime is limited to this scope. It is presumably what
    // emits the activity-registry pre/post begin-run signals; confirm in
    // ScheduleSignalSentry.
    typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
    ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
    schedule_->processOneOccurrence<Traits>(runPrincipal, es);
    if(hasSubProcess()) {
      subProcess_->doBeginRun(runPrincipal, ts);
    }
  }
  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
  // The looper sees the run after the schedule has processed it.
  if(looper_) {
    looper_->doBeginRun(runPrincipal, es);
  }
}
01811
01812 void EventProcessor::endRun(statemachine::Run const& run) {
01813 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01814 input_->doEndRun(runPrincipal);
01815 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01816 runPrincipal.endTime());
01817 EventSetup const& es = esp_->eventSetupForInstance(ts);
01818 {
01819 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
01820 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01821 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01822 if(hasSubProcess()) {
01823 subProcess_->doEndRun(runPrincipal, ts);
01824 }
01825 }
01826 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
01827 if(looper_) {
01828 looper_->doEndRun(runPrincipal, es);
01829 }
01830 }
01831
// Begin-luminosity-block transition for (phid, run, lumi). The steps are,
// in order:
//   1. fetch the cached LuminosityBlockPrincipal and let the input source react;
//   2. give the RandomNumberGenerator service (if available) its
//      preBeginLumi hook;
//   3. sync the EventSetup to the start of the lumi;
//   4. run the begin-lumi occurrence through the schedule and the subprocess;
//   5. notify the looper.
void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
  input_->doBeginLumi(lumiPrincipal);

  // The RNG service hook runs before the schedule processes this lumi. It is
  // handed a LuminosityBlock view built with a default ModuleDescription.
  Service<RandomNumberGenerator> rng;
  if(rng.isAvailable()) {
    LuminosityBlock lb(lumiPrincipal, ModuleDescription());
    rng->preBeginLumi(lb);
  }

  // IOV sync point: the first possible event (event 0) of this lumi, at the
  // lumi's begin time.
  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
  EventSetup const& es = esp_->eventSetupForInstance(ts);
  {
    // The sentry's lifetime is limited to this scope (presumably it emits
    // the pre/post begin-lumi activity signals; confirm).
    typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
    ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
    schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
    if(hasSubProcess()) {
      subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
    }
  }
  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
  // The looper sees the lumi after the schedule has processed it.
  if(looper_) {
    looper_->doBeginLuminosityBlock(lumiPrincipal, es);
  }
}
01859
01860 void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
01861 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01862 input_->doEndLumi(lumiPrincipal);
01863
01864
01865 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
01866 lumiPrincipal.endTime());
01867 EventSetup const& es = esp_->eventSetupForInstance(ts);
01868 {
01869 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
01870 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01871 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01872 if(hasSubProcess()) {
01873 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts);
01874 }
01875 }
01876 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01877 if(looper_) {
01878 looper_->doEndLuminosityBlock(lumiPrincipal, es);
01879 }
01880 }
01881
01882 statemachine::Run EventProcessor::readAndCacheRun() {
01883 input_->readAndCacheRun();
01884 input_->markRun();
01885 return statemachine::Run(input_->processHistoryID(), input_->run());
01886 }
01887
01888 int EventProcessor::readAndCacheLumi() {
01889 input_->readAndCacheLumi();
01890 input_->markLumi();
01891 return input_->luminosityBlock();
01892 }
01893
01894 void EventProcessor::writeRun(statemachine::Run const& run) {
01895 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
01896 if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
01897 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
01898 }
01899
01900 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
01901 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
01902 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
01903 }
01904
01905 void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
01906 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
01907 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
01908 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01909 }
01910
01911 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
01912 principalCache_.deleteLumi(phid, run, lumi);
01913 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01914 }
01915
// Reads the next event from the input source and processes it.
//
// - If reading throws a cms::Exception, act_table_ decides what happens based
//   on the root cause. Rethrow propagates the exception. Any other action
//   logs a warning and skips the event (returns early).
// - The event is run through the schedule and the subprocess, with the
//   EventSetup synced to the event's id and time.
// - If a looper exists, its doDuringLoop is called. The looper may request
//   navigation (previous or specified event) when the input supports random
//   access; doDuringLoop is called again while the requested navigation
//   fails. Any status other than kContinue sets shouldWeStop_.
// - Finally the EventPrincipal is cleared.
void EventProcessor::readAndProcessEvent() {
  EventPrincipal *pep = 0;
  try {
    pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
    FDEBUG(1) << "\treadEvent\n";
  }
  catch(cms::Exception& e) {
    actions::ActionCodes action = act_table_->find(e.rootCause());
    if(action == actions::Rethrow) {
      throw;
    } else {
      LogWarning(e.category())
        << "an exception occurred and all paths for the event are being skipped: \n"
        << e.what();
      return;
    }
  }
  assert(pep != 0);

  IOVSyncValue ts(pep->id(), pep->time());
  EventSetup const& es = esp_->eventSetupForInstance(ts);
  {
    // The sentry's lifetime is limited to this scope (presumably it emits
    // the pre/post event activity signals; confirm).
    typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
    ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
    schedule_->processOneOccurrence<Traits>(*pep, es);
    if(hasSubProcess()) {
      subProcess_->doEvent(*pep, ts);
    }
  }

  if(looper_) {
    // Describe to the looper which navigation the input source supports.
    bool randomAccess = input_->randomAccess();
    ProcessingController::ForwardState forwardState = input_->forwardState();
    ProcessingController::ReverseState reverseState = input_->reverseState();
    ProcessingController pc(forwardState, reverseState, randomAccess);

    EDLooperBase::Status status = EDLooperBase::kContinue;
    do {
      status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);

      // Carry out the navigation the looper requested (random access only).
      // The outcome is recorded in pc; a failed goToEvent repeats the
      // doDuringLoop call.
      bool succeeded = true;
      if(randomAccess) {
        if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
          // NOTE(review): -2, presumably because the input has already moved
          // past the current event, so stepping back twice lands on the
          // previous one. Confirm against InputSource::skipEvents.
          input_->skipEvents(-2);
        }
        else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
          succeeded = input_->goToEvent(pc.specifiedEventTransition());
        }
      }
      pc.setLastOperationSucceeded(succeeded);
    } while(!pc.lastOperationSucceeded());
    if(status != EDLooperBase::kContinue) shouldWeStop_ = true;

  }

  FDEBUG(1) << "\tprocessEvent\n";
  pep->clearEventPrincipal();
}
01974
01975 bool EventProcessor::shouldWeStop() const {
01976 FDEBUG(1) << "\tshouldWeStop\n";
01977 if(shouldWeStop_) return true;
01978 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
01979 }
01980
01981 void EventProcessor::setExceptionMessageFiles(std::string& message) {
01982 exceptionMessageFiles_ = message;
01983 }
01984
01985 void EventProcessor::setExceptionMessageRuns(std::string& message) {
01986 exceptionMessageRuns_ = message;
01987 }
01988
01989 void EventProcessor::setExceptionMessageLumis(std::string& message) {
01990 exceptionMessageLumis_ = message;
01991 }
01992
01993 bool EventProcessor::alreadyHandlingException() const {
01994 return alreadyHandlingException_;
01995 }
01996
01997 void EventProcessor::terminateMachine() {
01998 if(machine_.get() != 0) {
01999 if(!machine_->terminated()) {
02000 forceLooperToEnd_ = true;
02001 machine_->process_event(statemachine::Stop());
02002 forceLooperToEnd_ = false;
02003 }
02004 else {
02005 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
02006 }
02007 if(machine_->terminated()) {
02008 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02009 }
02010 machine_.reset();
02011 }
02012 }
02013 }