00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/InputSourceDescription.h"
00019 #include "FWCore/Framework/interface/IOVSyncValue.h"
00020 #include "FWCore/Framework/interface/LooperFactory.h"
00021 #include "FWCore/Framework/interface/LuminosityBlock.h"
00022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00023 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00024 #include "FWCore/Framework/interface/ModuleChanger.h"
00025 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00026 #include "FWCore/Framework/interface/ProcessingController.h"
00027 #include "FWCore/Framework/interface/RunPrincipal.h"
00028 #include "FWCore/Framework/interface/Schedule.h"
00029 #include "FWCore/Framework/interface/ScheduleInfo.h"
00030 #include "FWCore/Framework/interface/SubProcess.h"
00031 #include "FWCore/Framework/src/Breakpoints.h"
00032 #include "FWCore/Framework/src/EPStates.h"
00033 #include "FWCore/Framework/src/EventSetupsController.h"
00034 #include "FWCore/Framework/src/InputSourceFactory.h"
00035
00036 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00037
00038 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00039 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00040 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00042 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00043 #include "FWCore/ParameterSet/interface/Registry.h"
00044 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00045
00046 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00047 #include "FWCore/ServiceRegistry/interface/Service.h"
00048
00049 #include "FWCore/Utilities/interface/DebugMacros.h"
00050 #include "FWCore/Utilities/interface/EDMException.h"
00051 #include "FWCore/Utilities/interface/Exception.h"
00052 #include "FWCore/Utilities/interface/ConvertException.h"
00053 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00054 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00055 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00056
00057 #include "boost/bind.hpp"
00058 #include "boost/thread/xtime.hpp"
00059
00060 #include <exception>
00061 #include <iomanip>
00062 #include <iostream>
00063 #include <utility>
00064 #include <sstream>
00065
00066 #include <sys/ipc.h>
00067 #include <sys/msg.h>
00068
00069
00070 #include <sys/types.h>
00071 #include <sys/wait.h>
00072 #include <unistd.h>
00073
00074
00075 #ifndef __APPLE__
00076 #include <sched.h>
00077 #endif
00078
00079 namespace edm {
00080
00081 namespace event_processor {
00082
00083 class StateSentry {
00084 public:
00085 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00086 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00087 void succeeded() {success_ = true;}
00088
00089 private:
00090 EventProcessor* ep_;
00091 bool success_;
00092 };
00093 }
00094
00095 using namespace event_processor;
00096
00097 namespace {
00098 template <typename T>
00099 class ScheduleSignalSentry {
00100 public:
00101 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00102 a_(a), principal_(principal), es_(es) {
00103 if (a_) T::preScheduleSignal(a_, principal_);
00104 }
00105 ~ScheduleSignalSentry() {
00106 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00107 }
00108
00109 private:
00110
00111 ActivityRegistry* a_;
00112 typename T::MyPrincipal* principal_;
00113 EventSetup const* es_;
00114 };
00115 }
00116
namespace {

  // Printable names for the state machine states.
  // NOTE(review): presumably indexed by the State enum declared elsewhere;
  // keep the order in sync with that enum - confirm against the header.
  char const* stateNames[] = {
    "Init",          //  0
    "JobReady",      //  1
    "RunGiven",      //  2
    "Running",       //  3
    "Stopping",      //  4
    "ShuttingDown",  //  5
    "Done",          //  6
    "JobEnded",      //  7
    "Error",         //  8
    "ErrorEnded",    //  9
    "End",           // 10
    "Invalid"        // 11
  };

  // Printable names for the state machine messages.
  // NOTE(review): presumably indexed by the Msg enum declared elsewhere;
  // keep the order in sync with that enum - confirm against the header.
  char const* msgNames[] = {
    "SetRun",          //  0
    "Skip",            //  1
    "RunAsync",        //  2
    "Run(ID)",         //  3
    "Run(count)",      //  4
    "BeginJob",        //  5
    "StopAsync",       //  6
    "ShutdownAsync",   //  7
    "EndJob",          //  8
    "CountComplete",   //  9
    "InputExhausted",  // 10
    "StopSignal",      // 11
    "ShutdownSignal",  // 12
    "Finished",        // 13
    "Any",             // 14
    "dtor",            // 15
    "Exception",       // 16
    "Rewind"           // 17
  };
}
00158
00159
00160
00161
00162
00163 struct TransEntry {
00164 State current;
00165 Msg message;
00166 State final;
00167 };
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190 TransEntry table[] = {
00191
00192
00193 { sInit, mException, sError },
00194 { sInit, mBeginJob, sJobReady },
00195 { sJobReady, mException, sError },
00196 { sJobReady, mSetRun, sRunGiven },
00197 { sJobReady, mInputRewind, sRunning },
00198 { sJobReady, mSkip, sRunning },
00199 { sJobReady, mRunID, sRunning },
00200 { sJobReady, mRunCount, sRunning },
00201 { sJobReady, mEndJob, sJobEnded },
00202 { sJobReady, mBeginJob, sJobReady },
00203 { sJobReady, mDtor, sEnd },
00204
00205 { sJobReady, mStopAsync, sJobReady },
00206 { sJobReady, mCountComplete, sJobReady },
00207 { sJobReady, mFinished, sJobReady },
00208
00209 { sRunGiven, mException, sError },
00210 { sRunGiven, mRunAsync, sRunning },
00211 { sRunGiven, mBeginJob, sRunGiven },
00212 { sRunGiven, mShutdownAsync, sShuttingDown },
00213 { sRunGiven, mStopAsync, sStopping },
00214 { sRunning, mException, sError },
00215 { sRunning, mStopAsync, sStopping },
00216 { sRunning, mShutdownAsync, sShuttingDown },
00217 { sRunning, mShutdownSignal, sShuttingDown },
00218 { sRunning, mCountComplete, sStopping },
00219 { sRunning, mInputExhausted, sStopping },
00220
00221 { sStopping, mInputRewind, sRunning },
00222 { sStopping, mException, sError },
00223 { sStopping, mFinished, sJobReady },
00224 { sStopping, mCountComplete, sJobReady },
00225 { sStopping, mShutdownSignal, sShuttingDown },
00226 { sStopping, mStopAsync, sStopping },
00227 { sStopping, mInputExhausted, sStopping },
00228
00229 { sShuttingDown, mException, sError },
00230 { sShuttingDown, mShutdownSignal, sShuttingDown },
00231 { sShuttingDown, mCountComplete, sDone },
00232 { sShuttingDown, mInputExhausted, sDone },
00233 { sShuttingDown, mFinished, sDone },
00234
00235
00236
00237 { sDone, mEndJob, sJobEnded },
00238 { sDone, mException, sError },
00239 { sJobEnded, mDtor, sEnd },
00240 { sJobEnded, mException, sError },
00241 { sError, mEndJob, sError },
00242 { sError, mDtor, sError },
00243 { sInit, mDtor, sEnd },
00244 { sStopping, mShutdownAsync, sShuttingDown },
00245 { sInvalid, mAny, sInvalid }
00246 };
00247
00248
00249
00250
00251
00252
00253 boost::shared_ptr<InputSource>
00254 makeInput(ParameterSet& params,
00255 CommonParams const& common,
00256 ProductRegistry& preg,
00257 PrincipalCache& pCache,
00258 boost::shared_ptr<ActivityRegistry> areg,
00259 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00260 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00261 if(main_input == 0) {
00262 throw Exception(errors::Configuration)
00263 << "There must be exactly one source in the configuration.\n"
00264 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00265 }
00266
00267 std::string modtype(main_input->getParameter<std::string>("@module_type"));
00268
00269 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00270 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00271 ConfigurationDescriptions descriptions(filler->baseType());
00272 filler->fill(descriptions);
00273
00274 try {
00275 try {
00276 descriptions.validate(*main_input, std::string("source"));
00277 }
00278 catch (cms::Exception& e) { throw; }
00279 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00280 catch (std::exception& e) { convertException::stdToEDM(e); }
00281 catch(std::string& s) { convertException::stringToEDM(s); }
00282 catch(char const* c) { convertException::charPtrToEDM(c); }
00283 catch (...) { convertException::unknownToEDM(); }
00284 }
00285 catch (cms::Exception & iException) {
00286 std::ostringstream ost;
00287 ost << "Validating configuration of input source of type " << modtype;
00288 iException.addContext(ost.str());
00289 throw;
00290 }
00291
00292 main_input->registerIt();
00293
00294
00295
00296
00297
00298
00299
00300 ModuleDescription md(main_input->id(),
00301 main_input->getParameter<std::string>("@module_type"),
00302 "source",
00303 processConfiguration.get());
00304
00305 InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00306 areg->preSourceConstructionSignal_(md);
00307 boost::shared_ptr<InputSource> input;
00308 try {
00309 try {
00310 input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00311 }
00312 catch (cms::Exception& e) { throw; }
00313 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00314 catch (std::exception& e) { convertException::stdToEDM(e); }
00315 catch(std::string& s) { convertException::stringToEDM(s); }
00316 catch(char const* c) { convertException::charPtrToEDM(c); }
00317 catch (...) { convertException::unknownToEDM(); }
00318 }
00319 catch (cms::Exception& iException) {
00320 areg->postSourceConstructionSignal_(md);
00321 std::ostringstream ost;
00322 ost << "Constructing input source of type " << modtype;
00323 iException.addContext(ost.str());
00324 throw;
00325 }
00326 areg->postSourceConstructionSignal_(md);
00327 return input;
00328 }
00329
00330
00331 boost::shared_ptr<EDLooperBase>
00332 fillLooper(eventsetup::EventSetupProvider& cp,
00333 ParameterSet& params,
00334 CommonParams const& common) {
00335 boost::shared_ptr<EDLooperBase> vLooper;
00336
00337 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00338
00339 if(loopers.size() == 0) {
00340 return vLooper;
00341 }
00342
00343 assert(1 == loopers.size());
00344
00345 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00346 itName != itNameEnd;
00347 ++itName) {
00348
00349 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00350 providerPSet->registerIt();
00351 vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00352 *providerPSet,
00353 common.processName_,
00354 common.releaseVersion_,
00355 common.passID_);
00356 }
00357 return vLooper;
00358
00359 }
00360
00361
00362 EventProcessor::EventProcessor(std::string const& config,
00363 ServiceToken const& iToken,
00364 serviceregistry::ServiceLegacy iLegacy,
00365 std::vector<std::string> const& defaultServices,
00366 std::vector<std::string> const& forcedServices) :
00367 preProcessEventSignal_(),
00368 postProcessEventSignal_(),
00369 actReg_(),
00370 preg_(),
00371 serviceToken_(),
00372 input_(),
00373 esp_(),
00374 act_table_(),
00375 processConfiguration_(),
00376 schedule_(),
00377 subProcess_(),
00378 state_(sInit),
00379 event_loop_(),
00380 state_lock_(),
00381 stop_lock_(),
00382 stopper_(),
00383 starter_(),
00384 stop_count_(-1),
00385 last_rc_(epSuccess),
00386 last_error_text_(),
00387 id_set_(false),
00388 event_loop_id_(),
00389 my_sig_num_(getSigNum()),
00390 fb_(),
00391 looper_(),
00392 machine_(),
00393 principalCache_(),
00394 shouldWeStop_(false),
00395 stateMachineWasInErrorState_(false),
00396 fileMode_(),
00397 emptyRunLumiMode_(),
00398 exceptionMessageFiles_(),
00399 exceptionMessageRuns_(),
00400 exceptionMessageLumis_(),
00401 alreadyHandlingException_(false),
00402 forceLooperToEnd_(false),
00403 looperBeginJobRun_(false),
00404 forceESCacheClearOnNewRun_(false),
00405 numberOfForkedChildren_(0),
00406 numberOfSequentialEventsPerChild_(1),
00407 setCpuAffinity_(false),
00408 eventSetupDataToExcludeFromPrefetching_() {
00409 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00410 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00411 processDesc->addServices(defaultServices, forcedServices);
00412 init(processDesc, iToken, iLegacy);
00413 }
00414
00415 EventProcessor::EventProcessor(std::string const& config,
00416 std::vector<std::string> const& defaultServices,
00417 std::vector<std::string> const& forcedServices) :
00418 preProcessEventSignal_(),
00419 postProcessEventSignal_(),
00420 actReg_(),
00421 preg_(),
00422 serviceToken_(),
00423 input_(),
00424 esp_(),
00425 act_table_(),
00426 processConfiguration_(),
00427 schedule_(),
00428 subProcess_(),
00429 state_(sInit),
00430 event_loop_(),
00431 state_lock_(),
00432 stop_lock_(),
00433 stopper_(),
00434 starter_(),
00435 stop_count_(-1),
00436 last_rc_(epSuccess),
00437 last_error_text_(),
00438 id_set_(false),
00439 event_loop_id_(),
00440 my_sig_num_(getSigNum()),
00441 fb_(),
00442 looper_(),
00443 machine_(),
00444 principalCache_(),
00445 shouldWeStop_(false),
00446 stateMachineWasInErrorState_(false),
00447 fileMode_(),
00448 emptyRunLumiMode_(),
00449 exceptionMessageFiles_(),
00450 exceptionMessageRuns_(),
00451 exceptionMessageLumis_(),
00452 alreadyHandlingException_(false),
00453 forceLooperToEnd_(false),
00454 looperBeginJobRun_(false),
00455 forceESCacheClearOnNewRun_(false),
00456 numberOfForkedChildren_(0),
00457 numberOfSequentialEventsPerChild_(1),
00458 setCpuAffinity_(false),
00459 eventSetupDataToExcludeFromPrefetching_() {
00460 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00461 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00462 processDesc->addServices(defaultServices, forcedServices);
00463 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00464 }
00465
00466 EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00467 ServiceToken const& token,
00468 serviceregistry::ServiceLegacy legacy) :
00469 preProcessEventSignal_(),
00470 postProcessEventSignal_(),
00471 actReg_(),
00472 preg_(),
00473 serviceToken_(),
00474 input_(),
00475 esp_(),
00476 act_table_(),
00477 processConfiguration_(),
00478 schedule_(),
00479 subProcess_(),
00480 state_(sInit),
00481 event_loop_(),
00482 state_lock_(),
00483 stop_lock_(),
00484 stopper_(),
00485 starter_(),
00486 stop_count_(-1),
00487 last_rc_(epSuccess),
00488 last_error_text_(),
00489 id_set_(false),
00490 event_loop_id_(),
00491 my_sig_num_(getSigNum()),
00492 fb_(),
00493 looper_(),
00494 machine_(),
00495 principalCache_(),
00496 shouldWeStop_(false),
00497 stateMachineWasInErrorState_(false),
00498 fileMode_(),
00499 emptyRunLumiMode_(),
00500 exceptionMessageFiles_(),
00501 exceptionMessageRuns_(),
00502 exceptionMessageLumis_(),
00503 alreadyHandlingException_(false),
00504 forceLooperToEnd_(false),
00505 looperBeginJobRun_(false),
00506 forceESCacheClearOnNewRun_(false),
00507 numberOfForkedChildren_(0),
00508 numberOfSequentialEventsPerChild_(1),
00509 setCpuAffinity_(false),
00510 eventSetupDataToExcludeFromPrefetching_() {
00511 init(processDesc, token, legacy);
00512 }
00513
00514
00515 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00516 preProcessEventSignal_(),
00517 postProcessEventSignal_(),
00518 actReg_(),
00519 preg_(),
00520 serviceToken_(),
00521 input_(),
00522 esp_(),
00523 act_table_(),
00524 processConfiguration_(),
00525 schedule_(),
00526 subProcess_(),
00527 state_(sInit),
00528 event_loop_(),
00529 state_lock_(),
00530 stop_lock_(),
00531 stopper_(),
00532 starter_(),
00533 stop_count_(-1),
00534 last_rc_(epSuccess),
00535 last_error_text_(),
00536 id_set_(false),
00537 event_loop_id_(),
00538 my_sig_num_(getSigNum()),
00539 fb_(),
00540 looper_(),
00541 machine_(),
00542 principalCache_(),
00543 shouldWeStop_(false),
00544 stateMachineWasInErrorState_(false),
00545 fileMode_(),
00546 emptyRunLumiMode_(),
00547 exceptionMessageFiles_(),
00548 exceptionMessageRuns_(),
00549 exceptionMessageLumis_(),
00550 alreadyHandlingException_(false),
00551 forceLooperToEnd_(false),
00552 looperBeginJobRun_(false),
00553 forceESCacheClearOnNewRun_(false),
00554 numberOfForkedChildren_(0),
00555 numberOfSequentialEventsPerChild_(1),
00556 setCpuAffinity_(false),
00557 eventSetupDataToExcludeFromPrefetching_() {
00558 if(isPython) {
00559 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00560 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00561 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00562 }
00563 else {
00564 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00565 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00566 }
00567 }
00568
00569 void
00570 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00571 ServiceToken const& iToken,
00572 serviceregistry::ServiceLegacy iLegacy) {
00573
00574
00575
00576
00577 BranchIDListHelper::clearRegistries();
00578
00579 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00580
00581
00582
00583 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00584
00585
00586 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00587 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00588 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00589 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00590 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00591 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00592 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00593 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00594 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00595 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00596 itPS != itPSEnd;
00597 ++itPS) {
00598 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00599 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00600 itPS->getUntrackedParameter<std::string>("label", "")));
00601 }
00602 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00603
00604 std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00605
00606
00607 ScheduleItems items;
00608
00609
00610 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00611 ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00612 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00613
00614
00615 ServiceRegistry::Operate operate(serviceToken_);
00616
00617
00618 boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00619
00620
00621 esp_ = espController->makeProvider(*parameterSet, *common);
00622
00623
00624 looper_ = fillLooper(*esp_, *parameterSet, *common);
00625 if(looper_) {
00626 looper_->setActionTable(items.act_table_.get());
00627 looper_->attachTo(*items.actReg_);
00628 }
00629
00630
00631 input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00632
00633
00634 schedule_ = items.initSchedule(*parameterSet);
00635
00636
00637 act_table_ = items.act_table_;
00638 actReg_ = items.actReg_;
00639 preg_ = items.preg_;
00640 processConfiguration_ = items.processConfiguration_;
00641
00642 FDEBUG(2) << parameterSet << std::endl;
00643 connectSigs(this);
00644
00645
00646 if(subProcessParameterSet) {
00647 subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController, *actReg_, token, serviceregistry::kConfigurationOverrides));
00648 }
00649 }
00650
00651 EventProcessor::~EventProcessor() {
00652
00653 ServiceToken token = getToken();
00654 ServiceRegistry::Operate op(token);
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665 terminateMachine();
00666
00667 try {
00668 changeState(mDtor);
00669 }
00670 catch(cms::Exception& e) {
00671 LogError("System")
00672 << e.explainSelf() << "\n";
00673 }
00674
00675
00676 subProcess_.reset();
00677 esp_.reset();
00678 schedule_.reset();
00679 input_.reset();
00680 looper_.reset();
00681 actReg_.reset();
00682
00683 pset::Registry* psetRegistry = pset::Registry::instance();
00684 psetRegistry->data().clear();
00685 psetRegistry->extra().setID(ParameterSetID());
00686
00687 EntryDescriptionRegistry::instance()->data().clear();
00688 ParentageRegistry::instance()->data().clear();
00689 ProcessConfigurationRegistry::instance()->data().clear();
00690 ProcessHistoryRegistry::instance()->data().clear();
00691 BranchIDListHelper::clearRegistries();
00692 }
00693
00694 void
00695 EventProcessor::rewind() {
00696 beginJob();
00697 changeState(mStopAsync);
00698 changeState(mInputRewind);
00699 {
00700 StateSentry toerror(this);
00701
00702
00703 ServiceRegistry::Operate operate(serviceToken_);
00704
00705 {
00706 input_->repeat();
00707 input_->rewind();
00708 }
00709 changeState(mCountComplete);
00710 toerror.succeeded();
00711 }
00712 changeState(mFinished);
00713 }
00714
00715 EventProcessor::StatusCode
00716 EventProcessor::run(int numberEventsToProcess, bool) {
00717 return runEventCount(numberEventsToProcess);
00718 }
00719
00720 EventProcessor::StatusCode
00721 EventProcessor::skip(int numberToSkip) {
00722 beginJob();
00723 changeState(mSkip);
00724 {
00725 StateSentry toerror(this);
00726
00727
00728 ServiceRegistry::Operate operate(serviceToken_);
00729
00730 {
00731 input_->skipEvents(numberToSkip);
00732 }
00733 changeState(mCountComplete);
00734 toerror.succeeded();
00735 }
00736 changeState(mFinished);
00737 return epSuccess;
00738 }
00739
00740 void
00741 EventProcessor::beginJob() {
00742 if(state_ != sInit) return;
00743 bk::beginJob();
00744
00745 changeState(mBeginJob);
00746
00747
00748
00749 ServiceRegistry::Operate operate(serviceToken_);
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765 try {
00766 try {
00767 input_->doBeginJob();
00768 }
00769 catch (cms::Exception& e) { throw; }
00770 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00771 catch (std::exception& e) { convertException::stdToEDM(e); }
00772 catch(std::string& s) { convertException::stringToEDM(s); }
00773 catch(char const* c) { convertException::charPtrToEDM(c); }
00774 catch (...) { convertException::unknownToEDM(); }
00775 }
00776 catch(cms::Exception& ex) {
00777 ex.addContext("Calling beginJob for the source");
00778 throw;
00779 }
00780 schedule_->beginJob();
00781
00782 if(hasSubProcess()) subProcess_->doBeginJob();
00783 actReg_->postBeginJobSignal_();
00784 }
00785
00786 void
00787 EventProcessor::endJob() {
00788
00789 ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00790
00791
00792 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00793
00794
00795 ServiceRegistry::Operate operate(serviceToken_);
00796
00797 c.call(boost::bind(&EventProcessor::terminateMachine, this));
00798 schedule_->endJob(c);
00799 if(hasSubProcess()) {
00800 c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00801 }
00802 c.call(boost::bind(&InputSource::doEndJob, input_));
00803 if(looper_) {
00804 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00805 }
00806 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00807 if(c.hasThrown()) {
00808 c.rethrow();
00809 }
00810 }
00811
00812 ServiceToken
00813 EventProcessor::getToken() {
00814 return serviceToken_;
00815 }
00816
00817
namespace {
  // Bookkeeping written by the SIGCHLD handler below and read by the
  // parent process after forking.
  volatile bool child_failed = false;            // a child exited non-zero or was killed by a signal
  volatile unsigned int num_children_done = 0;   // children reaped so far
  volatile int child_fail_exit_status = 0;       // exit status of the most recent non-zero exit

  extern "C" {
    // SIGCHLD handler: reaps every child that has already terminated
    // (non-blocking waitpid) and updates the counters above.
    // A child killed by a signal counts as failed but leaves
    // child_fail_exit_status untouched.
    void ep_sigchld(int, siginfo_t*, void*) {
      int status = 0;
      for(pid_t reaped = waitpid(-1, &status, WNOHANG);
          reaped > 0;
          reaped = waitpid(-1, &status, WNOHANG)) {
        if(WIFEXITED(status)) {
          ++num_children_done;
          if(WEXITSTATUS(status) != 0) {
            child_failed = true;
            child_fail_exit_status = WEXITSTATUS(status);
          }
        }
        if(WIFSIGNALED(status)) {
          ++num_children_done;
          child_failed = true;
        }
      }
    }
  }

}
00848
// Enumerators for child-process outcomes; kMaxChildAction is the count of
// real entries. NOTE(review): not referenced in the visible part of the
// file - confirm usage before changing.
enum {
  kChildSucceed = 0,
  kChildExitBadly = 1,
  kChildSegv = 2,
  kMaxChildAction = 3
};
00855
00856 namespace {
// Number of decimal digits in numberOfChildren, used as the zero-padded
// field width for child indices in the per-child log file names.
// A value of 0 yields a width of 3.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  if(numberOfChildren == 0) {
    return 3;
  }
  unsigned int digits = 0;
  for(unsigned int rest = numberOfChildren; rest != 0; rest /= 10) {
    ++digits;
  }
  return digits;
}
00868
00869 class MessageSenderToSource {
00870 public:
00871 MessageSenderToSource(int iQueueID, long iNEventsToProcess):
00872 m_queueID(iQueueID),
00873 m_nEventsToProcess(iNEventsToProcess) {}
00874 void operator()() {
00875 std::cout << "I am controller" << std::endl;
00876
00877
00878 multicore::MessageForSource sndmsg;
00879 sndmsg.startIndex = 0;
00880 sndmsg.nIndices = m_nEventsToProcess;
00881 if(m_nEventsToProcess > 0 && m_nEventsToProcess < m_nEventsToProcess) {
00882 sndmsg.nIndices = m_nEventsToProcess;
00883 }
00884 int value = 0;
00885 do {
00886 value = msgsnd(m_queueID, &sndmsg, multicore::MessageForSource::sizeForBuffer(), 0);
00887
00888 sndmsg.startIndex += sndmsg.nIndices;
00889 } while(value == 0);
00890 if(errno != EIDRM and errno != EINVAL) {
00891
00892 perror("message queue send failed");
00893 }
00894 return;
00895 }
00896
00897 private:
00898 int const m_queueID;
00899 long const m_nEventsToProcess;
00900 };
00901 }
00902
00903 bool
00904 EventProcessor::forkProcess(std::string const& jobReportFile) {
00905
00906 if(0 == numberOfForkedChildren_) {return true;}
00907 assert(0<numberOfForkedChildren_);
00908
00909 {
00910 beginJob();
00911
00912 ServiceRegistry::Operate operate(serviceToken_);
00913
00914 InputSource::ItemType itemType;
00915 itemType = input_->nextItemType();
00916
00917 assert(itemType == InputSource::IsFile);
00918 {
00919 readFile();
00920 }
00921 itemType = input_->nextItemType();
00922 assert(itemType == InputSource::IsRun);
00923
00924 std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
00925 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
00926 input_->runAuxiliary()->beginTime());
00927 EventSetup const& es = esp_->eventSetupForInstance(ts);
00928
00929
00930 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
00931 es.fillAvailableRecordKeys(recordKeys);
00932 std::vector<eventsetup::DataKey> dataKeys;
00933 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
00934 itKey != itEnd;
00935 ++itKey) {
00936 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
00937
00938 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
00939 ExcludedData const* excludedData(0);
00940 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
00941 excludedData = &(itExcludeRec->second);
00942 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
00943
00944 continue;
00945 }
00946 }
00947 if(0 != recordPtr) {
00948 dataKeys.clear();
00949 recordPtr->fillRegisteredDataKeys(dataKeys);
00950 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
00951 itDataKey != itDataKeyEnd;
00952 ++itDataKey) {
00953
00954 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
00955 std::cout << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
00956 continue;
00957 }
00958 try {
00959 recordPtr->doGet(*itDataKey);
00960 } catch(cms::Exception& e) {
00961 LogWarning("EventSetupPreFetching") << e.what();
00962 }
00963 }
00964 }
00965 }
00966 }
00967 std::cout << " done prefetching" << std::endl;
00968 {
00969
00970 ServiceRegistry::Operate operate(serviceToken_);
00971 Service<JobReport> jobReport;
00972 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
00973
00974
00975 actReg_->preForkReleaseResourcesSignal_();
00976 input_->doPreForkReleaseResources();
00977 schedule_->preForkReleaseResources();
00978 }
00979 installCustomHandler(SIGCHLD, ep_sigchld);
00980
00981
00982 int queueID;
00983 if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
00984 printf("Error obtaining message queue\n");
00985 exit(EXIT_FAILURE);
00986 }
00987
00988 unsigned int childIndex = 0;
00989 unsigned int const kMaxChildren = numberOfForkedChildren_;
00990 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
00991 std::vector<pid_t> childrenIds;
00992 childrenIds.reserve(kMaxChildren);
00993 {
00994
00995 ServiceRegistry::Operate operate(serviceToken_);
00996 Service<JobReport> jobReport;
00997 for(; childIndex < kMaxChildren; ++childIndex) {
00998 pid_t value = fork();
00999 if(value == 0) {
01000
01001 fflush(stdout);
01002 fflush(stderr);
01003 std::stringstream stout;
01004 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01005 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01006 std::cerr << "Error during freopen of child process "
01007 << childIndex << std::endl;
01008 }
01009 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01010 std::cerr << "Error during dup2 of child process"
01011 << childIndex << std::endl;
01012 }
01013
01014 std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
01015 if(setCpuAffinity_) {
01016
01017
01018
01019
01020
01021
01022 #ifdef __APPLE__
01023 std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
01024 #else
01025 std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
01026 cpu_set_t mask;
01027 CPU_ZERO(&mask);
01028 CPU_SET(childIndex, &mask);
01029 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01030 std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
01031 exit(-1);
01032 }
01033 #endif
01034 }
01035 break;
01036 }
01037 if(value < 0) {
01038 std::cerr << "failed to create a child" << std::endl;
01039
01040
01041 msgctl(queueID, IPC_RMID, 0);
01042 exit(-1);
01043 }
01044 childrenIds.push_back(value);
01045 }
01046
01047 if(childIndex < kMaxChildren) {
01048 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01049 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01050
01051 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
01052 input_->doPostForkReacquireResources(receiver);
01053 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01054
01055
01056 return true;
01057 }
01058 jobReport->parentAfterFork(jobReportFile);
01059 }
01060
01061
01062
01063
01064
01065
01066
01067
01068 sigset_t blockingSigSet;
01069 sigset_t unblockingSigSet;
01070 sigset_t oldSigSet;
01071 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01072 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01073 sigaddset(&blockingSigSet, SIGCHLD);
01074 sigaddset(&blockingSigSet, SIGUSR2);
01075 sigaddset(&blockingSigSet, SIGINT);
01076 sigdelset(&unblockingSigSet, SIGCHLD);
01077 sigdelset(&unblockingSigSet, SIGUSR2);
01078 sigdelset(&unblockingSigSet, SIGINT);
01079 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01080
01081
01082
01083
01084 MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
01085 boost::thread senderThread(sender);
01086
01087 while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01088 sigsuspend(&unblockingSigSet);
01089 std::cout << "woke from sigwait" << std::endl;
01090 }
01091 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01092
01093 std::cout << "num children who have already stopped " << num_children_done << std::endl;
01094 if(child_failed) {
01095 std::cout << "child failed" << std::endl;
01096 }
01097 if(shutdown_flag) {
01098 std::cout << "asked to shutdown" << std::endl;
01099 }
01100
01101 if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01102 std::cout << "must stop children" << std::endl;
01103 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01104 it != itEnd; ++it) {
01105 kill(*it, SIGUSR2);
01106 }
01107 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01108 while(num_children_done != kMaxChildren) {
01109 sigsuspend(&unblockingSigSet);
01110 }
01111 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01112 }
01113
01114
01115 msgctl(queueID, IPC_RMID, 0);
01116
01117
01118 senderThread.join();
01119 if(child_failed) {
01120 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01121 }
01122 return false;
01123 }
01124
01125 void
01126 EventProcessor::connectSigs(EventProcessor* ep) {
01127
01128
01129
01130 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01131 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01132 }
01133
01134 std::vector<ModuleDescription const*>
01135 EventProcessor::getAllModuleDescriptions() const {
01136 return schedule_->getAllModuleDescriptions();
01137 }
01138
01139 int
01140 EventProcessor::totalEvents() const {
01141 return schedule_->totalEvents();
01142 }
01143
01144 int
01145 EventProcessor::totalEventsPassed() const {
01146 return schedule_->totalEventsPassed();
01147 }
01148
01149 int
01150 EventProcessor::totalEventsFailed() const {
01151 return schedule_->totalEventsFailed();
01152 }
01153
01154 void
01155 EventProcessor::enableEndPaths(bool active) {
01156 schedule_->enableEndPaths(active);
01157 }
01158
01159 bool
01160 EventProcessor::endPathsEnabled() const {
01161 return schedule_->endPathsEnabled();
01162 }
01163
01164 void
01165 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01166 schedule_->getTriggerReport(rep);
01167 }
01168
01169 void
01170 EventProcessor::clearCounters() {
01171 schedule_->clearCounters();
01172 }
01173
01174 char const* EventProcessor::currentStateName() const {
01175 return stateName(getState());
01176 }
01177
01178 char const* EventProcessor::stateName(State s) const {
01179 return stateNames[s];
01180 }
01181
01182 char const* EventProcessor::msgName(Msg m) const {
01183 return msgNames[m];
01184 }
01185
01186 State EventProcessor::getState() const {
01187 return state_;
01188 }
01189
01190 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01191
01192
01193 return last_rc_;
01194 }
01195
01196 void
01197 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01198 if(runNumber == 0) {
01199 runNumber = 1;
01200 LogWarning("Invalid Run")
01201 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01202 << "Run number was set to 1 instead\n";
01203 }
01204
01205
01206 beginJob();
01207 changeState(mSetRun);
01208
01209
01210 input_->setRunNumber(runNumber);
01211 }
01212
01213 void
01214 EventProcessor::declareRunNumber(RunNumber_t ) {
01215
01216 beginJob();
01217 changeState(mSetRun);
01218
01219
01220
01221 }
01222
01223 EventProcessor::StatusCode
01224 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01225 bool rc = true;
01226 boost::xtime timeout;
01227 boost::xtime_get(&timeout, boost::TIME_UTC);
01228 timeout.sec += timeout_seconds;
01229
01230
01231
01232
01233 {
01234 boost::mutex::scoped_lock sl(stop_lock_);
01235
01236
01237 if(stop_count_ < 0) return last_rc_;
01238
01239 if(timeout_seconds == 0) {
01240 while(stop_count_ == 0) stopper_.wait(sl);
01241 } else {
01242 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01243 }
01244
01245 if(rc == false) {
01246
01247
01248
01249
01250
01251
01252 if(id_set_) pthread_cancel(event_loop_id_);
01253
01254
01255 LogWarning("timeout")
01256 << "An asynchronous request was made to shut down "
01257 << "the event loop "
01258 << "and the event loop did not shutdown after "
01259 << timeout_seconds << " seconds\n";
01260 } else {
01261 event_loop_->join();
01262 event_loop_.reset();
01263 id_set_ = false;
01264 stop_count_ = -1;
01265 }
01266 }
01267 return rc == false ? epTimedOut : last_rc_;
01268 }
01269
01270 EventProcessor::StatusCode
01271 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01272 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01273 if(rc != epTimedOut) changeState(mCountComplete);
01274 else errorState();
01275 return rc;
01276 }
01277
01278
01279 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01280 changeState(mStopAsync);
01281 StatusCode rc = waitForAsyncCompletion(secs);
01282 if(rc != epTimedOut) changeState(mFinished);
01283 else errorState();
01284 return rc;
01285 }
01286
01287 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01288 changeState(mShutdownAsync);
01289 StatusCode rc = waitForAsyncCompletion(secs);
01290 if(rc != epTimedOut) changeState(mFinished);
01291 else errorState();
01292 return rc;
01293 }
01294
01295 void EventProcessor::errorState() {
01296 state_ = sError;
01297 }
01298
01299
01300 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01301
01302
01303
01304 changeState(m);
01305 return waitForAsyncCompletion(60*2);
01306 }
01307
01308 void EventProcessor::changeState(Msg msg) {
01309
01310
01311 boost::mutex::scoped_lock sl(state_lock_);
01312 State curr = state_;
01313 int rc;
01314
01315
01316 for(rc = 0;
01317 table[rc].current != sInvalid &&
01318 (curr != table[rc].current ||
01319 (curr == table[rc].current &&
01320 msg != table[rc].message && table[rc].message != mAny));
01321 ++rc);
01322
01323 if(table[rc].current == sInvalid)
01324 throw cms::Exception("BadState")
01325 << "A member function of EventProcessor has been called in an"
01326 << " inappropriate order.\n"
01327 << "Bad transition from " << stateName(curr) << " "
01328 << "using message " << msgName(msg) << "\n"
01329 << "No where to go from here.\n";
01330
01331 FDEBUG(1) << "changeState: current=" << stateName(curr)
01332 << ", message=" << msgName(msg)
01333 << " -> new=" << stateName(table[rc].final) << "\n";
01334
01335 state_ = table[rc].final;
01336 }
01337
01338 void EventProcessor::runAsync() {
01339 beginJob();
01340 {
01341 boost::mutex::scoped_lock sl(stop_lock_);
01342 if(id_set_ == true) {
01343 std::string err("runAsync called while async event loop already running\n");
01344 LogError("FwkJob") << err;
01345 throw cms::Exception("BadState") << err;
01346 }
01347
01348 changeState(mRunAsync);
01349
01350 stop_count_ = 0;
01351 last_rc_ = epSuccess;
01352 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01353 boost::xtime timeout;
01354 boost::xtime_get(&timeout, boost::TIME_UTC);
01355 timeout.sec += 60;
01356 if(starter_.timed_wait(sl, timeout) == false) {
01357
01358 throw cms::Exception("BadState")
01359 << "Async run thread did not start in 60 seconds\n";
01360 }
01361 }
01362 }
01363
01364 void EventProcessor::asyncRun(EventProcessor* me) {
01365
01366
01367
01368
01369
01370
01371
01372
01373
01374 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01375
01376 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01377 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01378
01379 {
01380 boost::mutex::scoped_lock(me->stop_lock_);
01381 me->event_loop_id_ = pthread_self();
01382 me->id_set_ = true;
01383 me->starter_.notify_all();
01384 }
01385
01386 Status rc = epException;
01387 FDEBUG(2) << "asyncRun starting ......................\n";
01388
01389 try {
01390 bool onlineStateTransitions = true;
01391 rc = me->runToCompletion(onlineStateTransitions);
01392 }
01393 catch (cms::Exception& e) {
01394 LogError("FwkJob") << "cms::Exception caught in "
01395 << "EventProcessor::asyncRun"
01396 << "\n"
01397 << e.explainSelf();
01398 me->last_error_text_ = e.explainSelf();
01399 }
01400 catch (std::exception& e) {
01401 LogError("FwkJob") << "Standard library exception caught in "
01402 << "EventProcessor::asyncRun"
01403 << "\n"
01404 << e.what();
01405 me->last_error_text_ = e.what();
01406 }
01407 catch (...) {
01408 LogError("FwkJob") << "Unknown exception caught in "
01409 << "EventProcessor::asyncRun"
01410 << "\n";
01411 me->last_error_text_ = "Unknown exception caught";
01412 rc = epOther;
01413 }
01414
01415 me->last_rc_ = rc;
01416
01417 {
01418
01419 boost::mutex::scoped_lock sl(me->stop_lock_);
01420 ++me->stop_count_;
01421 me->stopper_.notify_all();
01422 }
01423 FDEBUG(2) << "asyncRun ending ......................\n";
01424 }
01425
01426
01427 EventProcessor::StatusCode
01428 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01429
01430 StateSentry toerror(this);
01431
01432 int numberOfEventsToProcess = -1;
01433 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01434
01435 if(machine_.get() != 0) {
01436 throw Exception(errors::LogicError)
01437 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01438 << "Please report this error to the Framework group\n";
01439 }
01440
01441 toerror.succeeded();
01442
01443 return returnCode;
01444 }
01445
01446 EventProcessor::StatusCode
01447 EventProcessor::runEventCount(int numberOfEventsToProcess) {
01448
01449 StateSentry toerror(this);
01450
01451 bool onlineStateTransitions = false;
01452 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01453
01454 toerror.succeeded();
01455
01456 return returnCode;
01457 }
01458
// Shared implementation of the event loop used by runToCompletion() and
// runEventCount().
//
// onlineStateTransitions: when false, this function itself drives the
//   mRunCount / mFinished transitions of the processor state table.
// numberOfEventsToProcess: when positive, the loop pauses after that many
//   events and returns epCountComplete; otherwise there is no count limit.
//
// Returns epSuccess, epCountComplete, or epSignal (shutdown_flag seen).
// Throws on configuration errors in fileMode_/emptyRunLumiMode_, on any
// exception raised while processing (converted to/rethrown as
// cms::Exception), and with "BadState" if the state machine passed
// through its error state.
EventProcessor::StatusCode
EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {

  // Register a fresh EventPrincipal with the principal cache.
  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
  principalCache_.insert(ep);

  beginJob();

  if(!onlineStateTransitions) changeState(mRunCount);

  StatusCode returnCode = epSuccess;
  stateMachineWasInErrorState_ = false;

  // Keeps this processor's services active for the rest of the function.
  ServiceRegistry::Operate operate(serviceToken_);

  // Create the state machine only if one is not already present (one may
  // be left over from a previous call that paused on an event count).
  if(machine_.get() == 0) {

    // An empty fileMode_ defaults to FULLMERGE.
    statemachine::FileMode fileMode;
    if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
    else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
    else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
    else {
      throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
        << fileMode_ << ".\n"
        << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
    }

    // An empty emptyRunLumiMode_ defaults to handleEmptyRunsAndLumis.
    statemachine::EmptyRunLumiMode emptyRunLumiMode;
    if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
    else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
    else {
      throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
        << emptyRunLumiMode_ << ".\n"
        << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
    }

    machine_.reset(new statemachine::Machine(this,
                                             fileMode,
                                             emptyRunLumiMode));

    machine_->initiate();
  }

  // Outer try: on a cms::Exception, shuts the state machine down and
  // attaches any stored lumi/run/file messages before rethrowing.
  // Inner try: converts non-cms exceptions via the convertException helpers.
  try {
    try {

      InputSource::ItemType itemType;

      // Number of events handed to the state machine in this call.
      int iEvents = 0;

      while(true) {

        // Ask the input source what comes next; each pass of the loop
        // turns that item into one state machine event.
        itemType = input_->nextItemType();

        FDEBUG(1) << "itemType = " << itemType << "\n";

        // An externally requested stop or shutdown (state_ is read here
        // without state_lock_) ends the loop. forceLooperToEnd_ is set
        // around the Stop event so that endOfLoop() reports the loop as
        // finished regardless of what the looper returns.
        if(state_ == sStopping) {
          FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
          forceLooperToEnd_ = true;
          machine_->process_event(statemachine::Stop());
          forceLooperToEnd_ = false;
          break;
        }
        else if(state_ == sShuttingDown) {
          FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
          forceLooperToEnd_ = true;
          machine_->process_event(statemachine::Stop());
          forceLooperToEnd_ = false;
          break;
        }

        // Check the shutdown flag under usr2_lock.
        // NOTE(review): the flag is presumably set by a signal handler
        // defined elsewhere in this file; confirm.
        {
          boost::mutex::scoped_lock sl(usr2_lock);
          if(shutdown_flag) {
            changeState(mShutdownSignal);
            returnCode = epSignal;
            forceLooperToEnd_ = true;
            machine_->process_event(statemachine::Stop());
            forceLooperToEnd_ = false;
            break;
          }
        }

        // Translate the input item into the matching state machine event.
        if(itemType == InputSource::IsStop) {
          machine_->process_event(statemachine::Stop());
        }
        else if(itemType == InputSource::IsFile) {
          machine_->process_event(statemachine::File());
        }
        else if(itemType == InputSource::IsRun) {
          machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
        }
        else if(itemType == InputSource::IsLumi) {
          machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
        }
        else if(itemType == InputSource::IsEvent) {
          machine_->process_event(statemachine::Event());
          ++iEvents;
          // Pause once the requested count is reached; the machine is left
          // un-terminated so it survives this call.
          if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
            returnCode = epCountComplete;
            changeState(mInputExhausted);
            FDEBUG(1) << "Event count complete, pausing event loop\n";
            break;
          }
        }

        else {
          throw Exception(errors::LogicError)
            << "Unknown next item type passed to EventProcessor\n"
            << "Please report this error to the Framework group\n";
        }

        // The state machine has run to its end: leave the loop.
        if(machine_->terminated()) {
          changeState(mInputExhausted);
          break;
        }
      }  // End of loop over state machine events
    }
    // cms::Exception passes through unchanged; everything else is handed
    // to a convertException helper.
    // NOTE(review): the helpers presumably rethrow as cms::Exception; confirm.
    catch (cms::Exception& e) { throw; }
    catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
    catch (std::exception& e) { convertException::stdToEDM(e); }
    catch(std::string& s) { convertException::stringToEDM(s); }
    catch(char const* c) { convertException::charPtrToEDM(c); }
    catch (...) { convertException::unknownToEDM(); }
  }  // Try block

  catch (cms::Exception & e) {
    // Flag that an exception is already in flight while the state machine
    // is being shut down.
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    // Append any stored lumi/run/file messages to the exception; if the
    // exception was already printed, log them separately as well.
    if (!exceptionMessageLumis_.empty()) {
      e.addAdditionalInfo(exceptionMessageLumis_);
      if (e.alreadyPrinted()) {
        LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
      }
    }
    if (!exceptionMessageRuns_.empty()) {
      e.addAdditionalInfo(exceptionMessageRuns_);
      if (e.alreadyPrinted()) {
        LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
      }
    }
    if (!exceptionMessageFiles_.empty()) {
      e.addAdditionalInfo(exceptionMessageFiles_);
      if (e.alreadyPrinted()) {
        LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
      }
    }
    throw;
  }

  // A terminated machine is destroyed; a paused one is kept for the next call.
  if(machine_->terminated()) {
    FDEBUG(1) << "The state machine reports it has been terminated\n";
    machine_.reset();
  }

  if(!onlineStateTransitions) changeState(mFinished);

  // doErrorStuff() sets this flag when the machine enters its error state.
  if(stateMachineWasInErrorState_) {
    throw cms::Exception("BadState")
      << "The boost state machine in the EventProcessor exited after\n"
      << "entering the Error state.\n";
  }

  return returnCode;
}
01681
01682 void EventProcessor::readFile() {
01683 FDEBUG(1) << " \treadFile\n";
01684 fb_ = input_->readFile();
01685 if(numberOfForkedChildren_ > 0) {
01686 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01687 }
01688 }
01689
01690 void EventProcessor::closeInputFile() {
01691 if (fb_.get() != 0) {
01692 input_->closeFile(fb_);
01693 }
01694 FDEBUG(1) << "\tcloseInputFile\n";
01695 }
01696
01697 void EventProcessor::openOutputFiles() {
01698 if (fb_.get() != 0) {
01699 schedule_->openOutputFiles(*fb_);
01700 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01701 }
01702 FDEBUG(1) << "\topenOutputFiles\n";
01703 }
01704
01705 void EventProcessor::closeOutputFiles() {
01706 if (fb_.get() != 0) {
01707 schedule_->closeOutputFiles();
01708 if(hasSubProcess()) subProcess_->closeOutputFiles();
01709 }
01710 FDEBUG(1) << "\tcloseOutputFiles\n";
01711 }
01712
01713 void EventProcessor::respondToOpenInputFile() {
01714 if (fb_.get() != 0) {
01715 schedule_->respondToOpenInputFile(*fb_);
01716 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01717 }
01718 FDEBUG(1) << "\trespondToOpenInputFile\n";
01719 }
01720
01721 void EventProcessor::respondToCloseInputFile() {
01722 if (fb_.get() != 0) {
01723 schedule_->respondToCloseInputFile(*fb_);
01724 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01725 }
01726 FDEBUG(1) << "\trespondToCloseInputFile\n";
01727 }
01728
01729 void EventProcessor::respondToOpenOutputFiles() {
01730 if (fb_.get() != 0) {
01731 schedule_->respondToOpenOutputFiles(*fb_);
01732 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01733 }
01734 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01735 }
01736
01737 void EventProcessor::respondToCloseOutputFiles() {
01738 if (fb_.get() != 0) {
01739 schedule_->respondToCloseOutputFiles(*fb_);
01740 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01741 }
01742 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01743 }
01744
01745 void EventProcessor::startingNewLoop() {
01746 shouldWeStop_ = false;
01747
01748
01749 if(looper_ && looperBeginJobRun_) {
01750 looper_->doStartingNewLoop();
01751 }
01752 FDEBUG(1) << "\tstartingNewLoop\n";
01753 }
01754
01755 bool EventProcessor::endOfLoop() {
01756 if(looper_) {
01757 ModuleChanger changer(schedule_.get());
01758 looper_->setModuleChanger(&changer);
01759 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01760 looper_->setModuleChanger(0);
01761 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01762 else return false;
01763 }
01764 FDEBUG(1) << "\tendOfLoop\n";
01765 return true;
01766 }
01767
01768 void EventProcessor::rewindInput() {
01769 input_->repeat();
01770 input_->rewind();
01771 FDEBUG(1) << "\trewind\n";
01772 }
01773
01774 void EventProcessor::prepareForNextLoop() {
01775 looper_->prepareForNextLoop(esp_.get());
01776 FDEBUG(1) << "\tprepareForNextLoop\n";
01777 }
01778
01779 bool EventProcessor::shouldWeCloseOutput() const {
01780 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01781 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01782 }
01783
01784 void EventProcessor::doErrorStuff() {
01785 FDEBUG(1) << "\tdoErrorStuff\n";
01786 LogError("StateMachine")
01787 << "The EventProcessor state machine encountered an unexpected event\n"
01788 << "and went to the error state\n"
01789 << "Will attempt to terminate processing normally\n"
01790 << "(IF using the looper the next loop will be attempted)\n"
01791 << "This likely indicates a bug in an input module or corrupted input or both\n";
01792 stateMachineWasInErrorState_ = true;
01793 }
01794
01795 void EventProcessor::beginRun(statemachine::Run const& run) {
01796 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01797 input_->doBeginRun(runPrincipal);
01798 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01799 runPrincipal.beginTime());
01800 if(forceESCacheClearOnNewRun_){
01801 esp_->forceCacheClear();
01802 }
01803 EventSetup const& es = esp_->eventSetupForInstance(ts);
01804 if(looper_ && looperBeginJobRun_== false) {
01805 looper_->copyInfo(ScheduleInfo(schedule_.get()));
01806 looper_->beginOfJob(es);
01807 looperBeginJobRun_ = true;
01808 looper_->doStartingNewLoop();
01809 }
01810 {
01811 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01812 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01813 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01814 if(hasSubProcess()) {
01815 subProcess_->doBeginRun(runPrincipal, ts);
01816 }
01817 }
01818 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
01819 if(looper_) {
01820 looper_->doBeginRun(runPrincipal, es);
01821 }
01822 }
01823
01824 void EventProcessor::endRun(statemachine::Run const& run) {
01825 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01826 input_->doEndRun(runPrincipal);
01827 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01828 runPrincipal.endTime());
01829 EventSetup const& es = esp_->eventSetupForInstance(ts);
01830 {
01831 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
01832 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01833 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01834 if(hasSubProcess()) {
01835 subProcess_->doEndRun(runPrincipal, ts);
01836 }
01837 }
01838 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
01839 if(looper_) {
01840 looper_->doEndRun(runPrincipal, es);
01841 }
01842 }
01843
01844 void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
01845 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01846 input_->doBeginLumi(lumiPrincipal);
01847
01848 Service<RandomNumberGenerator> rng;
01849 if(rng.isAvailable()) {
01850 LuminosityBlock lb(lumiPrincipal, ModuleDescription());
01851 rng->preBeginLumi(lb);
01852 }
01853
01854
01855
01856 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
01857 EventSetup const& es = esp_->eventSetupForInstance(ts);
01858 {
01859 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
01860 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01861 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01862 if(hasSubProcess()) {
01863 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
01864 }
01865 }
01866 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
01867 if(looper_) {
01868 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
01869 }
01870 }
01871
01872 void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
01873 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
01874 input_->doEndLumi(lumiPrincipal);
01875
01876
01877 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
01878 lumiPrincipal.endTime());
01879 EventSetup const& es = esp_->eventSetupForInstance(ts);
01880 {
01881 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
01882 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
01883 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
01884 if(hasSubProcess()) {
01885 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts);
01886 }
01887 }
01888 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
01889 if(looper_) {
01890 looper_->doEndLuminosityBlock(lumiPrincipal, es);
01891 }
01892 }
01893
01894 statemachine::Run EventProcessor::readAndCacheRun() {
01895 input_->readAndCacheRun();
01896 input_->markRun();
01897 return statemachine::Run(input_->processHistoryID(), input_->run());
01898 }
01899
01900 int EventProcessor::readAndCacheLumi() {
01901 input_->readAndCacheLumi();
01902 input_->markLumi();
01903 return input_->luminosityBlock();
01904 }
01905
01906 void EventProcessor::writeRun(statemachine::Run const& run) {
01907 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
01908 if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
01909 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
01910 }
01911
01912 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
01913 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
01914 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
01915 }
01916
01917 void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
01918 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
01919 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
01920 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
01921 }
01922
01923 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
01924 principalCache_.deleteLumi(phid, run, lumi);
01925 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
01926 }
01927
01928 void EventProcessor::readAndProcessEvent() {
01929 EventPrincipal *pep = 0;
01930 try {
01931 try {
01932 pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
01933 FDEBUG(1) << "\treadEvent\n";
01934 }
01935 catch (cms::Exception& e) { throw; }
01936 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01937 catch (std::exception& e) { convertException::stdToEDM(e); }
01938 catch(std::string& s) { convertException::stringToEDM(s); }
01939 catch(char const* c) { convertException::charPtrToEDM(c); }
01940 catch (...) { convertException::unknownToEDM(); }
01941 }
01942 catch(cms::Exception& ex) {
01943 ex.addContext("Calling readEvent in the input source");
01944 throw;
01945 }
01946 assert(pep != 0);
01947
01948 IOVSyncValue ts(pep->id(), pep->time());
01949 EventSetup const& es = esp_->eventSetupForInstance(ts);
01950 {
01951 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
01952 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
01953 schedule_->processOneOccurrence<Traits>(*pep, es);
01954 if(hasSubProcess()) {
01955 subProcess_->doEvent(*pep, ts);
01956 }
01957 }
01958
01959 if(looper_) {
01960 bool randomAccess = input_->randomAccess();
01961 ProcessingController::ForwardState forwardState = input_->forwardState();
01962 ProcessingController::ReverseState reverseState = input_->reverseState();
01963 ProcessingController pc(forwardState, reverseState, randomAccess);
01964
01965 EDLooperBase::Status status = EDLooperBase::kContinue;
01966 do {
01967 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
01968
01969 bool succeeded = true;
01970 if(randomAccess) {
01971 if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
01972 input_->skipEvents(-2);
01973 }
01974 else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
01975 succeeded = input_->goToEvent(pc.specifiedEventTransition());
01976 }
01977 }
01978 pc.setLastOperationSucceeded(succeeded);
01979 } while(!pc.lastOperationSucceeded());
01980 if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
01981
01982 }
01983
01984 FDEBUG(1) << "\tprocessEvent\n";
01985 pep->clearEventPrincipal();
01986 }
01987
01988 bool EventProcessor::shouldWeStop() const {
01989 FDEBUG(1) << "\tshouldWeStop\n";
01990 if(shouldWeStop_) return true;
01991 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
01992 }
01993
01994 void EventProcessor::setExceptionMessageFiles(std::string& message) {
01995 exceptionMessageFiles_ = message;
01996 }
01997
01998 void EventProcessor::setExceptionMessageRuns(std::string& message) {
01999 exceptionMessageRuns_ = message;
02000 }
02001
02002 void EventProcessor::setExceptionMessageLumis(std::string& message) {
02003 exceptionMessageLumis_ = message;
02004 }
02005
02006 bool EventProcessor::alreadyHandlingException() const {
02007 return alreadyHandlingException_;
02008 }
02009
02010 void EventProcessor::terminateMachine() {
02011 if(machine_.get() != 0) {
02012 if(!machine_->terminated()) {
02013 forceLooperToEnd_ = true;
02014 machine_->process_event(statemachine::Stop());
02015 forceLooperToEnd_ = false;
02016 }
02017 else {
02018 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
02019 }
02020 if(machine_->terminated()) {
02021 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02022 }
02023 machine_.reset();
02024 }
02025 }
02026 }