00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072
00073
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080
00081
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085
00086
00087 #include "Cintex/Cintex.h"
00088
00089 namespace edm {
00090
00091 namespace event_processor {
00092
00093 class StateSentry {
00094 public:
00095 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00096 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00097 void succeeded() {success_ = true;}
00098
00099 private:
00100 EventProcessor* ep_;
00101 bool success_;
00102 };
00103 }
00104
00105 using namespace event_processor;
00106
00107 namespace {
00108 template <typename T>
00109 class ScheduleSignalSentry {
00110 public:
00111 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00112 a_(a), principal_(principal), es_(es), allowThrow_(false) {
00113 if (a_) T::preScheduleSignal(a_, principal_);
00114 }
00115 ~ScheduleSignalSentry() noexcept(false) {
00116 try {
00117 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00118 } catch(...) {
00119 if( allowThrow_) {throw;}
00120 }
00121 }
00122
00123 void allowThrow() {
00124 allowThrow_ = true;
00125 }
00126 private:
00127
00128 ActivityRegistry* a_;
00129 typename T::MyPrincipal* principal_;
00130 EventSetup const* es_;
00131 bool allowThrow_ =true;
00132 };
00133 }
00134
00135 namespace {
00136
00137
00138
00139
00140 char const* stateNames[] = {
00141 "Init",
00142 "JobReady",
00143 "RunGiven",
00144 "Running",
00145 "Stopping",
00146 "ShuttingDown",
00147 "Done",
00148 "JobEnded",
00149 "Error",
00150 "ErrorEnded",
00151 "End",
00152 "Invalid"
00153 };
00154
00155 char const* msgNames[] = {
00156 "SetRun",
00157 "Skip",
00158 "RunAsync",
00159 "Run(ID)",
00160 "Run(count)",
00161 "BeginJob",
00162 "StopAsync",
00163 "ShutdownAsync",
00164 "EndJob",
00165 "CountComplete",
00166 "InputExhausted",
00167 "StopSignal",
00168 "ShutdownSignal",
00169 "Finished",
00170 "Any",
00171 "dtor",
00172 "Exception",
00173 "Rewind"
00174 };
00175 }
00176
00177
00178
00179
00180
00181 struct TransEntry {
00182 State current;
00183 Msg message;
00184 State final;
00185 };
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208 TransEntry table[] = {
00209
00210
00211 { sInit, mException, sError },
00212 { sInit, mBeginJob, sJobReady },
00213 { sJobReady, mException, sError },
00214 { sJobReady, mSetRun, sRunGiven },
00215 { sJobReady, mInputRewind, sRunning },
00216 { sJobReady, mSkip, sRunning },
00217 { sJobReady, mRunID, sRunning },
00218 { sJobReady, mRunCount, sRunning },
00219 { sJobReady, mEndJob, sJobEnded },
00220 { sJobReady, mBeginJob, sJobReady },
00221 { sJobReady, mDtor, sEnd },
00222
00223 { sJobReady, mStopAsync, sJobReady },
00224 { sJobReady, mCountComplete, sJobReady },
00225 { sJobReady, mFinished, sJobReady },
00226
00227 { sRunGiven, mException, sError },
00228 { sRunGiven, mRunAsync, sRunning },
00229 { sRunGiven, mBeginJob, sRunGiven },
00230 { sRunGiven, mShutdownAsync, sShuttingDown },
00231 { sRunGiven, mStopAsync, sStopping },
00232 { sRunning, mException, sError },
00233 { sRunning, mStopAsync, sStopping },
00234 { sRunning, mShutdownAsync, sShuttingDown },
00235 { sRunning, mShutdownSignal, sShuttingDown },
00236 { sRunning, mCountComplete, sStopping },
00237 { sRunning, mInputExhausted, sStopping },
00238
00239 { sStopping, mInputRewind, sRunning },
00240 { sStopping, mException, sError },
00241 { sStopping, mFinished, sJobReady },
00242 { sStopping, mCountComplete, sJobReady },
00243 { sStopping, mShutdownSignal, sShuttingDown },
00244 { sStopping, mStopAsync, sStopping },
00245 { sStopping, mInputExhausted, sStopping },
00246
00247 { sShuttingDown, mException, sError },
00248 { sShuttingDown, mShutdownSignal, sShuttingDown },
00249 { sShuttingDown, mCountComplete, sDone },
00250 { sShuttingDown, mInputExhausted, sDone },
00251 { sShuttingDown, mFinished, sDone },
00252
00253
00254
00255 { sDone, mEndJob, sJobEnded },
00256 { sDone, mException, sError },
00257 { sJobEnded, mDtor, sEnd },
00258 { sJobEnded, mException, sError },
00259 { sError, mEndJob, sError },
00260 { sError, mDtor, sError },
00261 { sInit, mDtor, sEnd },
00262 { sStopping, mShutdownAsync, sShuttingDown },
00263 { sInvalid, mAny, sInvalid }
00264 };
00265
00266
00267
00268
00269
00270
00271 std::unique_ptr<InputSource>
00272 makeInput(ParameterSet& params,
00273 CommonParams const& common,
00274 ProductRegistry& preg,
00275 boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
00276 boost::shared_ptr<ActivityRegistry> areg,
00277 boost::shared_ptr<ProcessConfiguration const> processConfiguration) {
00278 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00279 if(main_input == 0) {
00280 throw Exception(errors::Configuration)
00281 << "There must be exactly one source in the configuration.\n"
00282 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00283 }
00284
00285 std::string modtype(main_input->getParameter<std::string>("@module_type"));
00286
00287 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00288 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00289 ConfigurationDescriptions descriptions(filler->baseType());
00290 filler->fill(descriptions);
00291
00292 try {
00293 try {
00294 descriptions.validate(*main_input, std::string("source"));
00295 }
00296 catch (cms::Exception& e) { throw; }
00297 catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00298 catch (std::exception& e) { convertException::stdToEDM(e); }
00299 catch (std::string& s) { convertException::stringToEDM(s); }
00300 catch (char const* c) { convertException::charPtrToEDM(c); }
00301 catch (...) { convertException::unknownToEDM(); }
00302 }
00303 catch (cms::Exception & iException) {
00304 std::ostringstream ost;
00305 ost << "Validating configuration of input source of type " << modtype;
00306 iException.addContext(ost.str());
00307 throw;
00308 }
00309
00310 main_input->registerIt();
00311
00312
00313
00314
00315
00316
00317
00318 ModuleDescription md(main_input->id(),
00319 main_input->getParameter<std::string>("@module_type"),
00320 "source",
00321 processConfiguration.get());
00322
00323 InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_);
00324 areg->preSourceConstructionSignal_(md);
00325 std::unique_ptr<InputSource> input;
00326 try {
00327 try {
00328 input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00329 }
00330 catch (cms::Exception& e) { throw; }
00331 catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00332 catch (std::exception& e) { convertException::stdToEDM(e); }
00333 catch (std::string& s) { convertException::stringToEDM(s); }
00334 catch (char const* c) { convertException::charPtrToEDM(c); }
00335 catch (...) { convertException::unknownToEDM(); }
00336 }
00337 catch (cms::Exception& iException) {
00338 areg->postSourceConstructionSignal_(md);
00339 std::ostringstream ost;
00340 ost << "Constructing input source of type " << modtype;
00341 iException.addContext(ost.str());
00342 throw;
00343 }
00344 areg->postSourceConstructionSignal_(md);
00345 return input;
00346 }
00347
00348
00349 boost::shared_ptr<EDLooperBase>
00350 fillLooper(eventsetup::EventSetupsController& esController,
00351 eventsetup::EventSetupProvider& cp,
00352 ParameterSet& params) {
00353 boost::shared_ptr<EDLooperBase> vLooper;
00354
00355 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00356
00357 if(loopers.size() == 0) {
00358 return vLooper;
00359 }
00360
00361 assert(1 == loopers.size());
00362
00363 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00364 itName != itNameEnd;
00365 ++itName) {
00366
00367 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00368 providerPSet->registerIt();
00369 vLooper = eventsetup::LooperFactory::get()->addTo(esController,
00370 cp,
00371 *providerPSet);
00372 }
00373 return vLooper;
00374
00375 }
00376
00377
00378 EventProcessor::EventProcessor(std::string const& config,
00379 ServiceToken const& iToken,
00380 serviceregistry::ServiceLegacy iLegacy,
00381 std::vector<std::string> const& defaultServices,
00382 std::vector<std::string> const& forcedServices) :
00383 preProcessEventSignal_(),
00384 postProcessEventSignal_(),
00385 actReg_(),
00386 preg_(),
00387 branchIDListHelper_(),
00388 serviceToken_(),
00389 input_(),
00390 espController_(new eventsetup::EventSetupsController),
00391 esp_(),
00392 act_table_(),
00393 processConfiguration_(),
00394 schedule_(),
00395 subProcess_(),
00396 historyAppender_(new HistoryAppender),
00397 state_(sInit),
00398 event_loop_(),
00399 state_lock_(),
00400 stop_lock_(),
00401 stopper_(),
00402 starter_(),
00403 stop_count_(-1),
00404 last_rc_(epSuccess),
00405 last_error_text_(),
00406 id_set_(false),
00407 event_loop_id_(),
00408 my_sig_num_(getSigNum()),
00409 fb_(),
00410 looper_(),
00411 principalCache_(),
00412 shouldWeStop_(false),
00413 stateMachineWasInErrorState_(false),
00414 fileMode_(),
00415 emptyRunLumiMode_(),
00416 exceptionMessageFiles_(),
00417 exceptionMessageRuns_(),
00418 exceptionMessageLumis_(),
00419 alreadyHandlingException_(false),
00420 forceLooperToEnd_(false),
00421 looperBeginJobRun_(false),
00422 forceESCacheClearOnNewRun_(false),
00423 numberOfForkedChildren_(0),
00424 numberOfSequentialEventsPerChild_(1),
00425 setCpuAffinity_(false),
00426 eventSetupDataToExcludeFromPrefetching_() {
00427 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00428 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00429 processDesc->addServices(defaultServices, forcedServices);
00430 init(processDesc, iToken, iLegacy);
00431 }
00432
00433 EventProcessor::EventProcessor(std::string const& config,
00434 std::vector<std::string> const& defaultServices,
00435 std::vector<std::string> const& forcedServices) :
00436 preProcessEventSignal_(),
00437 postProcessEventSignal_(),
00438 actReg_(),
00439 preg_(),
00440 branchIDListHelper_(),
00441 serviceToken_(),
00442 input_(),
00443 espController_(new eventsetup::EventSetupsController),
00444 esp_(),
00445 act_table_(),
00446 processConfiguration_(),
00447 schedule_(),
00448 subProcess_(),
00449 historyAppender_(new HistoryAppender),
00450 state_(sInit),
00451 event_loop_(),
00452 state_lock_(),
00453 stop_lock_(),
00454 stopper_(),
00455 starter_(),
00456 stop_count_(-1),
00457 last_rc_(epSuccess),
00458 last_error_text_(),
00459 id_set_(false),
00460 event_loop_id_(),
00461 my_sig_num_(getSigNum()),
00462 fb_(),
00463 looper_(),
00464 principalCache_(),
00465 shouldWeStop_(false),
00466 stateMachineWasInErrorState_(false),
00467 fileMode_(),
00468 emptyRunLumiMode_(),
00469 exceptionMessageFiles_(),
00470 exceptionMessageRuns_(),
00471 exceptionMessageLumis_(),
00472 alreadyHandlingException_(false),
00473 forceLooperToEnd_(false),
00474 looperBeginJobRun_(false),
00475 forceESCacheClearOnNewRun_(false),
00476 numberOfForkedChildren_(0),
00477 numberOfSequentialEventsPerChild_(1),
00478 setCpuAffinity_(false),
00479 eventSetupDataToExcludeFromPrefetching_() {
00480 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00481 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00482 processDesc->addServices(defaultServices, forcedServices);
00483 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00484 }
00485
00486 EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00487 ServiceToken const& token,
00488 serviceregistry::ServiceLegacy legacy) :
00489 preProcessEventSignal_(),
00490 postProcessEventSignal_(),
00491 actReg_(),
00492 preg_(),
00493 branchIDListHelper_(),
00494 serviceToken_(),
00495 input_(),
00496 espController_(new eventsetup::EventSetupsController),
00497 esp_(),
00498 act_table_(),
00499 processConfiguration_(),
00500 schedule_(),
00501 subProcess_(),
00502 historyAppender_(new HistoryAppender),
00503 state_(sInit),
00504 event_loop_(),
00505 state_lock_(),
00506 stop_lock_(),
00507 stopper_(),
00508 starter_(),
00509 stop_count_(-1),
00510 last_rc_(epSuccess),
00511 last_error_text_(),
00512 id_set_(false),
00513 event_loop_id_(),
00514 my_sig_num_(getSigNum()),
00515 fb_(),
00516 looper_(),
00517 principalCache_(),
00518 shouldWeStop_(false),
00519 stateMachineWasInErrorState_(false),
00520 fileMode_(),
00521 emptyRunLumiMode_(),
00522 exceptionMessageFiles_(),
00523 exceptionMessageRuns_(),
00524 exceptionMessageLumis_(),
00525 alreadyHandlingException_(false),
00526 forceLooperToEnd_(false),
00527 looperBeginJobRun_(false),
00528 forceESCacheClearOnNewRun_(false),
00529 numberOfForkedChildren_(0),
00530 numberOfSequentialEventsPerChild_(1),
00531 setCpuAffinity_(false),
00532 eventSetupDataToExcludeFromPrefetching_() {
00533 init(processDesc, token, legacy);
00534 }
00535
00536
00537 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00538 preProcessEventSignal_(),
00539 postProcessEventSignal_(),
00540 actReg_(),
00541 preg_(),
00542 branchIDListHelper_(),
00543 serviceToken_(),
00544 input_(),
00545 espController_(new eventsetup::EventSetupsController),
00546 esp_(),
00547 act_table_(),
00548 processConfiguration_(),
00549 schedule_(),
00550 subProcess_(),
00551 historyAppender_(new HistoryAppender),
00552 state_(sInit),
00553 event_loop_(),
00554 state_lock_(),
00555 stop_lock_(),
00556 stopper_(),
00557 starter_(),
00558 stop_count_(-1),
00559 last_rc_(epSuccess),
00560 last_error_text_(),
00561 id_set_(false),
00562 event_loop_id_(),
00563 my_sig_num_(getSigNum()),
00564 fb_(),
00565 looper_(),
00566 principalCache_(),
00567 shouldWeStop_(false),
00568 stateMachineWasInErrorState_(false),
00569 fileMode_(),
00570 emptyRunLumiMode_(),
00571 exceptionMessageFiles_(),
00572 exceptionMessageRuns_(),
00573 exceptionMessageLumis_(),
00574 alreadyHandlingException_(false),
00575 forceLooperToEnd_(false),
00576 looperBeginJobRun_(false),
00577 forceESCacheClearOnNewRun_(false),
00578 numberOfForkedChildren_(0),
00579 numberOfSequentialEventsPerChild_(1),
00580 setCpuAffinity_(false),
00581 eventSetupDataToExcludeFromPrefetching_() {
00582 if(isPython) {
00583 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00584 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00585 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00586 }
00587 else {
00588 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00589 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00590 }
00591 }
00592
00593 void
00594 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00595 ServiceToken const& iToken,
00596 serviceregistry::ServiceLegacy iLegacy) {
00597
00598
00599
00600 ROOT::Cintex::Cintex::Enable();
00601
00602 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00603
00604
00605
00606 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00607
00608
00609 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00610 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00611 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00612 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00613 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00614 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00615 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00616 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00617 continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
00618 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00619 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00620 itPS != itPSEnd;
00621 ++itPS) {
00622 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00623 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00624 itPS->getUntrackedParameter<std::string>("label", "")));
00625 }
00626 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00627
00628
00629 ScheduleItems items;
00630
00631
00632 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00633 ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00634 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00635
00636
00637 ServiceRegistry::Operate operate(serviceToken_);
00638
00639
00640 boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00641
00642
00643 esp_ = espController_->makeProvider(*parameterSet);
00644
00645
00646 looper_ = fillLooper(*espController_, *esp_, *parameterSet);
00647 if(looper_) {
00648 looper_->setActionTable(items.act_table_.get());
00649 looper_->attachTo(*items.actReg_);
00650 }
00651
00652
00653 input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
00654
00655
00656 schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00657
00658
00659 act_table_ = std::move(items.act_table_);
00660 actReg_ = items.actReg_;
00661 preg_.reset(items.preg_.release());
00662 branchIDListHelper_ = items.branchIDListHelper_;
00663 processConfiguration_ = items.processConfiguration_;
00664
00665 FDEBUG(2) << parameterSet << std::endl;
00666 connectSigs(this);
00667
00668
00669 boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
00670 principalCache_.insert(ep);
00671
00672
00673 if(subProcessParameterSet) {
00674 subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
00675 }
00676 }
00677
00678 EventProcessor::~EventProcessor() {
00679
00680 ServiceToken token = getToken();
00681 ServiceRegistry::Operate op(token);
00682 try {
00683 changeState(mDtor);
00684 }
00685 catch(cms::Exception& e) {
00686 LogError("System")
00687 << e.explainSelf() << "\n";
00688 }
00689
00690
00691 espController_.reset();
00692 subProcess_.reset();
00693 esp_.reset();
00694 schedule_.reset();
00695 input_.reset();
00696 looper_.reset();
00697 actReg_.reset();
00698
00699 pset::Registry* psetRegistry = pset::Registry::instance();
00700 psetRegistry->data().clear();
00701 psetRegistry->extra().setID(ParameterSetID());
00702
00703 EntryDescriptionRegistry::instance()->data().clear();
00704 ParentageRegistry::instance()->data().clear();
00705 ProcessConfigurationRegistry::instance()->data().clear();
00706 ProcessHistoryRegistry::instance()->data().clear();
00707 }
00708
00709 void
00710 EventProcessor::beginJob() {
00711 if(state_ != sInit) return;
00712 bk::beginJob();
00713
00714 changeState(mBeginJob);
00715
00716
00717
00718 ServiceRegistry::Operate operate(serviceToken_);
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734 try {
00735 try {
00736 input_->doBeginJob();
00737 }
00738 catch (cms::Exception& e) { throw; }
00739 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00740 catch (std::exception& e) { convertException::stdToEDM(e); }
00741 catch(std::string& s) { convertException::stringToEDM(s); }
00742 catch(char const* c) { convertException::charPtrToEDM(c); }
00743 catch (...) { convertException::unknownToEDM(); }
00744 }
00745 catch(cms::Exception& ex) {
00746 ex.addContext("Calling beginJob for the source");
00747 throw;
00748 }
00749 schedule_->beginJob(*preg_);
00750
00751 if(hasSubProcess()) subProcess_->doBeginJob();
00752 actReg_->postBeginJobSignal_();
00753 }
00754
00755 void
00756 EventProcessor::endJob() {
00757
00758 ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00759
00760
00761 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00762
00763
00764 ServiceRegistry::Operate operate(serviceToken_);
00765
00766 schedule_->endJob(c);
00767 if(hasSubProcess()) {
00768 c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00769 }
00770 c.call(boost::bind(&InputSource::doEndJob, input_.get()));
00771 if(looper_) {
00772 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00773 }
00774 auto actReg = actReg_.get();
00775 c.call([actReg](){actReg->postEndJobSignal_();});
00776 if(c.hasThrown()) {
00777 c.rethrow();
00778 }
00779 }
00780
00781 ServiceToken
00782 EventProcessor::getToken() {
00783 return serviceToken_;
00784 }
00785
00786
00787 namespace {
00788
00789
00790 volatile bool child_failed = false;
00791 volatile unsigned int num_children_done = 0;
00792 volatile int child_fail_exit_status = 0;
00793 volatile int child_fail_signal = 0;
00794
00795
00796
00797
00798 extern "C" {
00799 void ep_sigchld(int, siginfo_t*, void*) {
00800
00801
00802 int stat_loc;
00803 pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00804 while(0<p) {
00805
00806 if(WIFEXITED(stat_loc)) {
00807 ++num_children_done;
00808 if(0 != WEXITSTATUS(stat_loc)) {
00809 child_fail_exit_status = WEXITSTATUS(stat_loc);
00810 child_failed = true;
00811 }
00812 }
00813 if(WIFSIGNALED(stat_loc)) {
00814 ++num_children_done;
00815 child_fail_signal = WTERMSIG(stat_loc);
00816 child_failed = true;
00817 }
00818 p = waitpid(-1, &stat_loc, WNOHANG);
00819 }
00820 }
00821 }
00822
00823 }
00824
00825 enum {
00826 kChildSucceed,
00827 kChildExitBadly,
00828 kChildSegv,
00829 kMaxChildAction
00830 };
00831
00832 namespace {
00833 unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00834 unsigned int n = 0;
00835 while(numberOfChildren != 0) {
00836 ++n;
00837 numberOfChildren /= 10;
00838 }
00839 if(n == 0) {
00840 n = 3;
00841 }
00842 return n;
00843 }
00844
00845
00846
00847 class MessageSenderToSource {
00848 public:
00849 MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00850 void operator()();
00851
00852 private:
00853 const std::vector<int>& m_childrenPipes;
00854 long const m_nEventsToProcess;
00855 fd_set m_socketSet;
00856 unsigned int m_aliveChildren;
00857 int m_maxFd;
00858 };
00859
00860 MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00861 std::vector<int> const& childrenPipes,
00862 long iNEventsToProcess):
00863 m_childrenPipes(childrenPipes),
00864 m_nEventsToProcess(iNEventsToProcess),
00865 m_aliveChildren(childrenSockets.size()),
00866 m_maxFd(0)
00867 {
00868 FD_ZERO(&m_socketSet);
00869 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00870 it != itEnd; it++) {
00871 FD_SET(*it, &m_socketSet);
00872 if (*it > m_maxFd) {
00873 m_maxFd = *it;
00874 }
00875 }
00876 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00877 it != itEnd; ++it) {
00878 FD_SET(*it, &m_socketSet);
00879 if (*it > m_maxFd) {
00880 m_maxFd = *it;
00881 }
00882 }
00883 m_maxFd++;
00884 }
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902 void
00903 MessageSenderToSource::operator()() {
00904 multicore::MessageForParent childMsg;
00905 LogInfo("ForkingController") << "I am controller";
00906
00907
00908 multicore::MessageForSource sndmsg;
00909 sndmsg.startIndex = 0;
00910 sndmsg.nIndices = m_nEventsToProcess;
00911 do {
00912
00913 fd_set readSockets, errorSockets;
00914
00915 memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00916 memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00917
00918 ssize_t rc;
00919 while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00920 if (rc < 0) {
00921 std::cerr << "select failed; should be impossible due to preconditions.\n";
00922 abort();
00923 break;
00924 }
00925
00926
00927 for (int idx=0; idx<m_maxFd; idx++) {
00928
00929
00930 if (FD_ISSET(idx, &errorSockets)) {
00931 LogInfo("ForkingController") << "Error on socket " << idx;
00932 FD_CLR(idx, &m_socketSet);
00933 close(idx);
00934
00935 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00936 if (*it == idx) {
00937 m_aliveChildren--;
00938 }
00939 }
00940 continue;
00941 }
00942
00943 if (!FD_ISSET(idx, &readSockets)) {
00944 continue;
00945 }
00946
00947
00948
00949 bool is_pipe = false;
00950 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00951 if (*it == idx) {
00952 is_pipe = true;
00953 char buf;
00954 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00955 if (rc <= 0) {
00956 m_aliveChildren--;
00957 FD_CLR(idx, &m_socketSet);
00958 close(idx);
00959 }
00960 }
00961 }
00962
00963
00964 if (!is_pipe) {
00965 while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
00966 if (rc < 0) {
00967 FD_CLR(idx, &m_socketSet);
00968 close(idx);
00969 continue;
00970 }
00971
00972
00973
00974
00975
00976 while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
00977 if (rc < 0) {
00978 FD_CLR(idx, &m_socketSet);
00979 close(idx);
00980 continue;
00981 }
00982
00983 sndmsg.startIndex += sndmsg.nIndices;
00984 }
00985 }
00986
00987 } while (m_aliveChildren > 0);
00988
00989 return;
00990 }
00991
00992 }
00993
00994
00995 void EventProcessor::possiblyContinueAfterForkChildFailure() {
00996 if(child_failed && continueAfterChildFailure_) {
00997 if (child_fail_signal) {
00998 LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
00999 child_fail_signal=0;
01000 } else if (child_fail_exit_status) {
01001 LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01002 child_fail_exit_status=0;
01003 } else {
01004 LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01005 }
01006 child_failed =false;
01007 }
01008 }
01009
01010 bool
01011 EventProcessor::forkProcess(std::string const& jobReportFile) {
01012
01013 if(0 == numberOfForkedChildren_) {return true;}
01014 assert(0<numberOfForkedChildren_);
01015
01016 {
01017 beginJob();
01018
01019 ServiceRegistry::Operate operate(serviceToken_);
01020
01021 InputSource::ItemType itemType;
01022 itemType = input_->nextItemType();
01023
01024 assert(itemType == InputSource::IsFile);
01025 {
01026 readFile();
01027 }
01028 itemType = input_->nextItemType();
01029 assert(itemType == InputSource::IsRun);
01030
01031 LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01032 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01033 input_->runAuxiliary()->beginTime());
01034 espController_->eventSetupForInstance(ts);
01035 EventSetup const& es = esp_->eventSetup();
01036
01037
01038 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01039 es.fillAvailableRecordKeys(recordKeys);
01040 std::vector<eventsetup::DataKey> dataKeys;
01041 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01042 itKey != itEnd;
01043 ++itKey) {
01044 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01045
01046 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01047 ExcludedData const* excludedData(nullptr);
01048 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01049 excludedData = &(itExcludeRec->second);
01050 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01051
01052 continue;
01053 }
01054 }
01055 if(0 != recordPtr) {
01056 dataKeys.clear();
01057 recordPtr->fillRegisteredDataKeys(dataKeys);
01058 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01059 itDataKey != itDataKeyEnd;
01060 ++itDataKey) {
01061
01062 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01063 LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01064 continue;
01065 }
01066 try {
01067 recordPtr->doGet(*itDataKey);
01068 } catch(cms::Exception& e) {
01069 LogWarning("ForkingEventSetupPreFetching") << e.what();
01070 }
01071 }
01072 }
01073 }
01074 }
01075 LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
01076 {
01077
01078 ServiceRegistry::Operate operate(serviceToken_);
01079 Service<JobReport> jobReport;
01080 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01081
01082
01083 actReg_->preForkReleaseResourcesSignal_();
01084 input_->doPreForkReleaseResources();
01085 schedule_->preForkReleaseResources();
01086 }
01087 installCustomHandler(SIGCHLD, ep_sigchld);
01088
01089
01090 unsigned int childIndex = 0;
01091 unsigned int const kMaxChildren = numberOfForkedChildren_;
01092 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01093 std::vector<pid_t> childrenIds;
01094 childrenIds.reserve(kMaxChildren);
01095 std::vector<int> childrenSockets;
01096 childrenSockets.reserve(kMaxChildren);
01097 std::vector<int> childrenPipes;
01098 childrenPipes.reserve(kMaxChildren);
01099 std::vector<int> childrenSocketsCopy;
01100 childrenSocketsCopy.reserve(kMaxChildren);
01101 std::vector<int> childrenPipesCopy;
01102 childrenPipesCopy.reserve(kMaxChildren);
01103 int pipes[] {0, 0};
01104
01105 {
01106
01107 ServiceRegistry::Operate operate(serviceToken_);
01108 Service<JobReport> jobReport;
01109 int sockets[2], fd_flags;
01110 for(; childIndex < kMaxChildren; ++childIndex) {
01111
01112 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01113 printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01114 exit(EXIT_FAILURE);
01115 }
01116 if (pipe(pipes)) {
01117 printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01118 exit(EXIT_FAILURE);
01119 }
01120
01121 if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01122 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01123 exit(EXIT_FAILURE);
01124 }
01125
01126
01127 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01128 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01129 exit(EXIT_FAILURE);
01130 }
01131 if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01132 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01133 exit(EXIT_FAILURE);
01134 }
01135 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01136 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01137 exit(EXIT_FAILURE);
01138 }
01139
01140
01141 if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01142 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01143 exit(EXIT_FAILURE);
01144 }
01145 if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01146 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01147 exit(EXIT_FAILURE);
01148 }
01149
01150 childrenPipesCopy = childrenPipes;
01151 childrenSocketsCopy = childrenSockets;
01152
01153 pid_t value = fork();
01154 if(value == 0) {
01155
01156 close(pipes[0]);
01157 close(sockets[0]);
01158
01159 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01160 close(*it);
01161 }
01162 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01163 close(*it);
01164 }
01165
01166
01167 fflush(stdout);
01168 fflush(stderr);
01169 std::stringstream stout;
01170 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01171 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01172 LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01173 }
01174 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01175 LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01176 }
01177
01178 LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01179 if(setCpuAffinity_) {
01180
01181
01182
01183
01184
01185
01186 #ifdef __APPLE__
01187 LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01188 #else
01189 LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01190 cpu_set_t mask;
01191 CPU_ZERO(&mask);
01192 CPU_SET(childIndex, &mask);
01193 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01194 LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01195 exit(-1);
01196 }
01197 #endif
01198 }
01199 break;
01200 } else {
01201
01202 close(pipes[1]);
01203 close(sockets[1]);
01204 }
01205 if(value < 0) {
01206 LogError("ForkingChild") << "failed to create a child";
01207 exit(-1);
01208 }
01209 childrenIds.push_back(value);
01210 childrenSockets.push_back(sockets[0]);
01211 childrenPipes.push_back(pipes[0]);
01212 }
01213
01214 if(childIndex < kMaxChildren) {
01215 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01216 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01217
01218 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01219 input_->doPostForkReacquireResources(receiver);
01220 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01221
01222
01223 return true;
01224 }
01225 jobReport->parentAfterFork(jobReportFile);
01226 }
01227
01228
01229
01230
01231
01232
01233
01234
01235 sigset_t blockingSigSet;
01236 sigset_t unblockingSigSet;
01237 sigset_t oldSigSet;
01238 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01239 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01240 sigaddset(&blockingSigSet, SIGCHLD);
01241 sigaddset(&blockingSigSet, SIGUSR2);
01242 sigaddset(&blockingSigSet, SIGINT);
01243 sigdelset(&unblockingSigSet, SIGCHLD);
01244 sigdelset(&unblockingSigSet, SIGUSR2);
01245 sigdelset(&unblockingSigSet, SIGINT);
01246 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01247
01248
01249
01250 bool too_many_fds = false;
01251 if (pipes[1]+1 > FD_SETSIZE) {
01252 LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01253 too_many_fds = true;
01254 }
01255
01256
01257
01258
01259 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01260 boost::thread senderThread(sender);
01261
01262 if(not too_many_fds) {
01263
01264
01265 possiblyContinueAfterForkChildFailure();
01266 while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
01267 sigsuspend(&unblockingSigSet);
01268 possiblyContinueAfterForkChildFailure();
01269 LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01270 }
01271 }
01272 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01273
01274 LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01275 if(child_failed) {
01276 LogError("ForkingStopping") << "child failed";
01277 }
01278 if(shutdown_flag) {
01279 LogSystem("ForkingStopping") << "asked to shutdown";
01280 }
01281
01282 if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01283 LogInfo("ForkingStopping") << "must stop children" << std::endl;
01284 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01285 it != itEnd; ++it) {
01286 kill(*it, SIGUSR2);
01287 }
01288 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01289 while(num_children_done != kMaxChildren) {
01290 sigsuspend(&unblockingSigSet);
01291 }
01292 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01293 }
01294
01295 senderThread.join();
01296 if(child_failed && !continueAfterChildFailure_) {
01297 if (child_fail_signal) {
01298 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01299 } else if (child_fail_exit_status) {
01300 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01301 } else {
01302 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01303 }
01304 }
01305 if(too_many_fds) {
01306 throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01307 }
01308 return false;
01309 }
01310
01311 void
01312 EventProcessor::connectSigs(EventProcessor* ep) {
01313
01314
01315
01316 actReg_->preProcessEventSignal_.connect(std::cref(ep->preProcessEventSignal_));
01317 actReg_->postProcessEventSignal_.connect(std::cref(ep->postProcessEventSignal_));
01318 }
01319
01320 std::vector<ModuleDescription const*>
01321 EventProcessor::getAllModuleDescriptions() const {
01322 return schedule_->getAllModuleDescriptions();
01323 }
01324
01325 int
01326 EventProcessor::totalEvents() const {
01327 return schedule_->totalEvents();
01328 }
01329
01330 int
01331 EventProcessor::totalEventsPassed() const {
01332 return schedule_->totalEventsPassed();
01333 }
01334
01335 int
01336 EventProcessor::totalEventsFailed() const {
01337 return schedule_->totalEventsFailed();
01338 }
01339
01340 void
01341 EventProcessor::enableEndPaths(bool active) {
01342 schedule_->enableEndPaths(active);
01343 }
01344
01345 bool
01346 EventProcessor::endPathsEnabled() const {
01347 return schedule_->endPathsEnabled();
01348 }
01349
01350 void
01351 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01352 schedule_->getTriggerReport(rep);
01353 }
01354
01355 void
01356 EventProcessor::clearCounters() {
01357 schedule_->clearCounters();
01358 }
01359
01360 char const* EventProcessor::currentStateName() const {
01361 return stateName(getState());
01362 }
01363
01364 char const* EventProcessor::stateName(State s) const {
01365 return stateNames[s];
01366 }
01367
01368 char const* EventProcessor::msgName(Msg m) const {
01369 return msgNames[m];
01370 }
01371
01372 State EventProcessor::getState() const {
01373 return state_;
01374 }
01375
01376 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01377
01378
01379 return last_rc_;
01380 }
01381
01382 void
01383 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01384 if(runNumber == 0) {
01385 runNumber = 1;
01386 LogWarning("Invalid Run")
01387 << "EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
01388 << "Run number was set to 1 instead\n";
01389 }
01390
01391
01392 beginJob();
01393 changeState(mSetRun);
01394
01395
01396 input_->setRunNumber(runNumber);
01397 }
01398
01399 void
01400 EventProcessor::declareRunNumber(RunNumber_t ) {
01401
01402 beginJob();
01403 changeState(mSetRun);
01404
01405
01406
01407 }
01408
01409 EventProcessor::StatusCode
01410 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01411 bool rc = true;
01412 boost::xtime timeout;
01413
01414 #if BOOST_VERSION >= 105000
01415 boost::xtime_get(&timeout, boost::TIME_UTC_);
01416 #else
01417 boost::xtime_get(&timeout, boost::TIME_UTC);
01418 #endif
01419 timeout.sec += timeout_seconds;
01420
01421
01422
01423
01424 {
01425 boost::mutex::scoped_lock sl(stop_lock_);
01426
01427
01428 if(stop_count_ < 0) return last_rc_;
01429
01430 if(timeout_seconds == 0) {
01431 while(stop_count_ == 0) stopper_.wait(sl);
01432 } else {
01433 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01434 }
01435
01436 if(rc == false) {
01437
01438
01439
01440
01441
01442
01443 if(id_set_) pthread_cancel(event_loop_id_);
01444
01445
01446 LogWarning("timeout")
01447 << "An asynchronous request was made to shut down "
01448 << "the event loop "
01449 << "and the event loop did not shutdown after "
01450 << timeout_seconds << " seconds\n";
01451 } else {
01452 event_loop_->join();
01453 event_loop_.reset();
01454 id_set_ = false;
01455 stop_count_ = -1;
01456 }
01457 }
01458 return rc == false ? epTimedOut : last_rc_;
01459 }
01460
01461 EventProcessor::StatusCode
01462 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01463 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01464 if(rc != epTimedOut) changeState(mCountComplete);
01465 else errorState();
01466 return rc;
01467 }
01468
01469
01470 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01471 changeState(mStopAsync);
01472 StatusCode rc = waitForAsyncCompletion(secs);
01473 if(rc != epTimedOut) changeState(mFinished);
01474 else errorState();
01475 return rc;
01476 }
01477
01478 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01479 changeState(mShutdownAsync);
01480 StatusCode rc = waitForAsyncCompletion(secs);
01481 if(rc != epTimedOut) changeState(mFinished);
01482 else errorState();
01483 return rc;
01484 }
01485
01486 void EventProcessor::errorState() {
01487 state_ = sError;
01488 }
01489
01490
01491 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01492
01493
01494
01495 changeState(m);
01496 return waitForAsyncCompletion(60*2);
01497 }
01498
01499 void EventProcessor::changeState(Msg msg) {
01500
01501
01502 boost::mutex::scoped_lock sl(state_lock_);
01503 State curr = state_;
01504 int rc;
01505
01506
01507 for(rc = 0;
01508 table[rc].current != sInvalid &&
01509 (curr != table[rc].current ||
01510 (curr == table[rc].current &&
01511 msg != table[rc].message && table[rc].message != mAny));
01512 ++rc);
01513
01514 if(table[rc].current == sInvalid)
01515 throw cms::Exception("BadState")
01516 << "A member function of EventProcessor has been called in an"
01517 << " inappropriate order.\n"
01518 << "Bad transition from " << stateName(curr) << " "
01519 << "using message " << msgName(msg) << "\n"
01520 << "No where to go from here.\n";
01521
01522 FDEBUG(1) << "changeState: current=" << stateName(curr)
01523 << ", message=" << msgName(msg)
01524 << " -> new=" << stateName(table[rc].final) << "\n";
01525
01526 state_ = table[rc].final;
01527 }
01528
01529 void EventProcessor::runAsync() {
01530 beginJob();
01531 {
01532 boost::mutex::scoped_lock sl(stop_lock_);
01533 if(id_set_ == true) {
01534 std::string err("runAsync called while async event loop already running\n");
01535 LogError("FwkJob") << err;
01536 throw cms::Exception("BadState") << err;
01537 }
01538
01539 changeState(mRunAsync);
01540
01541 stop_count_ = 0;
01542 last_rc_ = epSuccess;
01543 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01544 boost::xtime timeout;
01545 #if BOOST_VERSION >= 105000
01546 boost::xtime_get(&timeout, boost::TIME_UTC_);
01547 #else
01548 boost::xtime_get(&timeout, boost::TIME_UTC);
01549 #endif
01550 timeout.sec += 60;
01551 if(starter_.timed_wait(sl, timeout) == false) {
01552
01553 throw cms::Exception("BadState")
01554 << "Async run thread did not start in 60 seconds\n";
01555 }
01556 }
01557 }
01558
01559 void EventProcessor::asyncRun(EventProcessor* me) {
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01570
01571 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01572 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01573
01574 {
01575 boost::mutex::scoped_lock sl(me->stop_lock_);
01576 me->event_loop_id_ = pthread_self();
01577 me->id_set_ = true;
01578 me->starter_.notify_all();
01579 }
01580
01581 Status rc = epException;
01582 FDEBUG(2) << "asyncRun starting ......................\n";
01583
01584 try {
01585 bool onlineStateTransitions = true;
01586 rc = me->runToCompletion(onlineStateTransitions);
01587 }
01588 catch (cms::Exception& e) {
01589 LogError("FwkJob") << "cms::Exception caught in "
01590 << "EventProcessor::asyncRun"
01591 << "\n"
01592 << e.explainSelf();
01593 me->last_error_text_ = e.explainSelf();
01594 }
01595 catch (std::exception& e) {
01596 LogError("FwkJob") << "Standard library exception caught in "
01597 << "EventProcessor::asyncRun"
01598 << "\n"
01599 << e.what();
01600 me->last_error_text_ = e.what();
01601 }
01602 catch (...) {
01603 LogError("FwkJob") << "Unknown exception caught in "
01604 << "EventProcessor::asyncRun"
01605 << "\n";
01606 me->last_error_text_ = "Unknown exception caught";
01607 rc = epOther;
01608 }
01609
01610 me->last_rc_ = rc;
01611
01612 {
01613
01614 boost::mutex::scoped_lock sl(me->stop_lock_);
01615 ++me->stop_count_;
01616 me->stopper_.notify_all();
01617 }
01618 FDEBUG(2) << "asyncRun ending ......................\n";
01619 }
01620
01621 std::auto_ptr<statemachine::Machine>
01622 EventProcessor::createStateMachine() {
01623 statemachine::FileMode fileMode;
01624 if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01625 else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01626 else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01627 else {
01628 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01629 << fileMode_ << ".\n"
01630 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01631 }
01632
01633 statemachine::EmptyRunLumiMode emptyRunLumiMode;
01634 if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01635 else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01636 else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01637 else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01638 else {
01639 throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01640 << emptyRunLumiMode_ << ".\n"
01641 << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01642 }
01643
01644 std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
01645 fileMode,
01646 emptyRunLumiMode));
01647
01648 machine->initiate();
01649 return machine;
01650 }
01651
01652
01653 EventProcessor::StatusCode
01654 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01655
01656 StateSentry toerror(this);
01657
01658 StatusCode returnCode=epSuccess;
01659 std::auto_ptr<statemachine::Machine> machine;
01660 {
01661 beginJob();
01662
01663 if(!onlineStateTransitions) changeState(mRunCount);
01664
01665
01666 stateMachineWasInErrorState_ = false;
01667
01668
01669 ServiceRegistry::Operate operate(serviceToken_);
01670
01671 machine = createStateMachine();
01672 try {
01673 try {
01674
01675 InputSource::ItemType itemType;
01676
01677 while(true) {
01678
01679 bool more = true;
01680 if(numberOfForkedChildren_ > 0) {
01681 size_t size = preg_->size();
01682 more = input_->skipForForking();
01683 if(more) {
01684 if(size < preg_->size()) {
01685 principalCache_.adjustIndexesAfterProductRegistryAddition();
01686 }
01687 principalCache_.adjustEventToNewProductRegistry(preg_);
01688 }
01689 }
01690 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
01691
01692 FDEBUG(1) << "itemType = " << itemType << "\n";
01693
01694
01695
01696
01697
01698
01699
01700
01701 if(state_ == sStopping) {
01702 FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01703 forceLooperToEnd_ = true;
01704 machine->process_event(statemachine::Stop());
01705 forceLooperToEnd_ = false;
01706 break;
01707 }
01708 else if(state_ == sShuttingDown) {
01709 FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01710 forceLooperToEnd_ = true;
01711 machine->process_event(statemachine::Stop());
01712 forceLooperToEnd_ = false;
01713 break;
01714 }
01715
01716
01717 {
01718 boost::mutex::scoped_lock sl(usr2_lock);
01719 if(shutdown_flag) {
01720 changeState(mShutdownSignal);
01721 returnCode = epSignal;
01722 forceLooperToEnd_ = true;
01723 machine->process_event(statemachine::Stop());
01724 forceLooperToEnd_ = false;
01725 break;
01726 }
01727 }
01728
01729 if(itemType == InputSource::IsStop) {
01730 machine->process_event(statemachine::Stop());
01731 }
01732 else if(itemType == InputSource::IsFile) {
01733 machine->process_event(statemachine::File());
01734 }
01735 else if(itemType == InputSource::IsRun) {
01736 machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01737 }
01738 else if(itemType == InputSource::IsLumi) {
01739 machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
01740 }
01741 else if(itemType == InputSource::IsEvent) {
01742 machine->process_event(statemachine::Event());
01743 }
01744
01745 else {
01746 throw Exception(errors::LogicError)
01747 << "Unknown next item type passed to EventProcessor\n"
01748 << "Please report this error to the Framework group\n";
01749 }
01750
01751 if(machine->terminated()) {
01752 changeState(mInputExhausted);
01753 break;
01754 }
01755 }
01756 }
01757 catch (cms::Exception& e) { throw; }
01758 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01759 catch (std::exception& e) { convertException::stdToEDM(e); }
01760 catch(std::string& s) { convertException::stringToEDM(s); }
01761 catch(char const* c) { convertException::charPtrToEDM(c); }
01762 catch (...) { convertException::unknownToEDM(); }
01763 }
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773
01774
01775
01776
01777
01778
01779
01780
01781
01782
01783
01784
01785
01786
01787
01788
01789
01790
01791
01792
01793
01794
01795
01796
01797
01798
01799
01800
01801
01802
01803
01804
01805
01806
01807
01808
01809 catch (cms::Exception & e) {
01810 alreadyHandlingException_ = true;
01811 terminateMachine(machine);
01812 alreadyHandlingException_ = false;
01813 if (!exceptionMessageLumis_.empty()) {
01814 e.addAdditionalInfo(exceptionMessageLumis_);
01815 if (e.alreadyPrinted()) {
01816 LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01817 }
01818 }
01819 if (!exceptionMessageRuns_.empty()) {
01820 e.addAdditionalInfo(exceptionMessageRuns_);
01821 if (e.alreadyPrinted()) {
01822 LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01823 }
01824 }
01825 if (!exceptionMessageFiles_.empty()) {
01826 e.addAdditionalInfo(exceptionMessageFiles_);
01827 if (e.alreadyPrinted()) {
01828 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01829 }
01830 }
01831 throw;
01832 }
01833
01834 if(machine->terminated()) {
01835 FDEBUG(1) << "The state machine reports it has been terminated\n";
01836 machine.reset();
01837 }
01838
01839 if(!onlineStateTransitions) changeState(mFinished);
01840
01841 if(stateMachineWasInErrorState_) {
01842 throw cms::Exception("BadState")
01843 << "The boost state machine in the EventProcessor exited after\n"
01844 << "entering the Error state.\n";
01845 }
01846
01847 }
01848 if(machine.get() != 0) {
01849 terminateMachine(machine);
01850 throw Exception(errors::LogicError)
01851 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01852 << "Please report this error to the Framework group\n";
01853 }
01854
01855 toerror.succeeded();
01856
01857 return returnCode;
01858 }
01859
01860 void EventProcessor::readFile() {
01861 FDEBUG(1) << " \treadFile\n";
01862 size_t size = preg_->size();
01863 fb_ = input_->readFile();
01864 if(size < preg_->size()) {
01865 principalCache_.adjustIndexesAfterProductRegistryAddition();
01866 }
01867 principalCache_.adjustEventToNewProductRegistry(preg_);
01868 if(numberOfForkedChildren_ > 0) {
01869 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01870 }
01871 }
01872
01873 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
01874 if (fb_.get() != nullptr) {
01875 input_->closeFile(fb_.get(), cleaningUpAfterException);
01876 }
01877 FDEBUG(1) << "\tcloseInputFile\n";
01878 }
01879
01880 void EventProcessor::openOutputFiles() {
01881 if (fb_.get() != nullptr) {
01882 schedule_->openOutputFiles(*fb_);
01883 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01884 }
01885 FDEBUG(1) << "\topenOutputFiles\n";
01886 }
01887
01888 void EventProcessor::closeOutputFiles() {
01889 if (fb_.get() != nullptr) {
01890 schedule_->closeOutputFiles();
01891 if(hasSubProcess()) subProcess_->closeOutputFiles();
01892 }
01893 FDEBUG(1) << "\tcloseOutputFiles\n";
01894 }
01895
01896 void EventProcessor::respondToOpenInputFile() {
01897 if (fb_.get() != nullptr) {
01898 schedule_->respondToOpenInputFile(*fb_);
01899 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01900 }
01901 FDEBUG(1) << "\trespondToOpenInputFile\n";
01902 }
01903
01904 void EventProcessor::respondToCloseInputFile() {
01905 if (fb_.get() != nullptr) {
01906 schedule_->respondToCloseInputFile(*fb_);
01907 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01908 }
01909 FDEBUG(1) << "\trespondToCloseInputFile\n";
01910 }
01911
01912 void EventProcessor::respondToOpenOutputFiles() {
01913 if (fb_.get() != nullptr) {
01914 schedule_->respondToOpenOutputFiles(*fb_);
01915 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01916 }
01917 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01918 }
01919
01920 void EventProcessor::respondToCloseOutputFiles() {
01921 if (fb_.get() != nullptr) {
01922 schedule_->respondToCloseOutputFiles(*fb_);
01923 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01924 }
01925 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01926 }
01927
01928 void EventProcessor::startingNewLoop() {
01929 shouldWeStop_ = false;
01930
01931
01932 if(looper_ && looperBeginJobRun_) {
01933 looper_->doStartingNewLoop();
01934 }
01935 FDEBUG(1) << "\tstartingNewLoop\n";
01936 }
01937
01938 bool EventProcessor::endOfLoop() {
01939 if(looper_) {
01940 ModuleChanger changer(schedule_.get());
01941 looper_->setModuleChanger(&changer);
01942 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01943 looper_->setModuleChanger(nullptr);
01944 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01945 else return false;
01946 }
01947 FDEBUG(1) << "\tendOfLoop\n";
01948 return true;
01949 }
01950
01951 void EventProcessor::rewindInput() {
01952 input_->repeat();
01953 input_->rewind();
01954 FDEBUG(1) << "\trewind\n";
01955 }
01956
01957 void EventProcessor::prepareForNextLoop() {
01958 looper_->prepareForNextLoop(esp_.get());
01959 FDEBUG(1) << "\tprepareForNextLoop\n";
01960 }
01961
01962 bool EventProcessor::shouldWeCloseOutput() const {
01963 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01964 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01965 }
01966
01967 void EventProcessor::doErrorStuff() {
01968 FDEBUG(1) << "\tdoErrorStuff\n";
01969 LogError("StateMachine")
01970 << "The EventProcessor state machine encountered an unexpected event\n"
01971 << "and went to the error state\n"
01972 << "Will attempt to terminate processing normally\n"
01973 << "(IF using the looper the next loop will be attempted)\n"
01974 << "This likely indicates a bug in an input module or corrupted input or both\n";
01975 stateMachineWasInErrorState_ = true;
01976 }
01977
01978 void EventProcessor::beginRun(statemachine::Run const& run) {
01979 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01980 input_->doBeginRun(runPrincipal);
01981 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01982 runPrincipal.beginTime());
01983 if(forceESCacheClearOnNewRun_){
01984 espController_->forceCacheClear();
01985 }
01986 espController_->eventSetupForInstance(ts);
01987 EventSetup const& es = esp_->eventSetup();
01988 if(looper_ && looperBeginJobRun_== false) {
01989 looper_->copyInfo(ScheduleInfo(schedule_.get()));
01990 looper_->beginOfJob(es);
01991 looperBeginJobRun_ = true;
01992 looper_->doStartingNewLoop();
01993 }
01994 {
01995 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01996 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01997 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01998 if(hasSubProcess()) {
01999 subProcess_->doBeginRun(runPrincipal, ts);
02000 }
02001 sentry.allowThrow();
02002 }
02003 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
02004 if(looper_) {
02005 looper_->doBeginRun(runPrincipal, es);
02006 }
02007 }
02008
02009 void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
02010 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02011 input_->doEndRun(runPrincipal, cleaningUpAfterException);
02012 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
02013 runPrincipal.endTime());
02014 espController_->eventSetupForInstance(ts);
02015 EventSetup const& es = esp_->eventSetup();
02016 {
02017 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02018 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02019 schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
02020 if(hasSubProcess()) {
02021 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
02022 }
02023 sentry.allowThrow();
02024 }
02025 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02026 if(looper_) {
02027 looper_->doEndRun(runPrincipal, es);
02028 }
02029 }
02030
02031 void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02032 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02033 input_->doBeginLumi(lumiPrincipal);
02034
02035 Service<RandomNumberGenerator> rng;
02036 if(rng.isAvailable()) {
02037 LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02038 rng->preBeginLumi(lb);
02039 }
02040
02041
02042
02043 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02044 espController_->eventSetupForInstance(ts);
02045 EventSetup const& es = esp_->eventSetup();
02046 {
02047 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02048 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02049 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02050 if(hasSubProcess()) {
02051 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02052 }
02053 sentry.allowThrow();
02054 }
02055 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02056 if(looper_) {
02057 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02058 }
02059 }
02060
02061 void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
02062 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02063 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
02064
02065
02066 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02067 lumiPrincipal.endTime());
02068 espController_->eventSetupForInstance(ts);
02069 EventSetup const& es = esp_->eventSetup();
02070 {
02071 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02072 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02073 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
02074 if(hasSubProcess()) {
02075 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
02076 }
02077 sentry.allowThrow();
02078 }
02079 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02080 if(looper_) {
02081 looper_->doEndLuminosityBlock(lumiPrincipal, es);
02082 }
02083 }
02084
02085 statemachine::Run EventProcessor::readAndCacheRun() {
02086 if (principalCache_.hasRunPrincipal()) {
02087 throw edm::Exception(edm::errors::LogicError)
02088 << "EventProcessor::readAndCacheRun\n"
02089 << "Illegal attempt to insert run into cache\n"
02090 << "Contact a Framework Developer\n";
02091 }
02092 principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
02093 return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02094 }
02095
02096 statemachine::Run EventProcessor::readAndMergeRun() {
02097 principalCache_.merge(input_->runAuxiliary(), preg_);
02098 input_->readAndMergeRun(principalCache_.runPrincipalPtr());
02099 return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02100 }
02101
02102 int EventProcessor::readAndCacheLumi() {
02103 if (principalCache_.hasLumiPrincipal()) {
02104 throw edm::Exception(edm::errors::LogicError)
02105 << "EventProcessor::readAndCacheRun\n"
02106 << "Illegal attempt to insert lumi into cache\n"
02107 << "Contact a Framework Developer\n";
02108 }
02109 if (!principalCache_.hasRunPrincipal()) {
02110 throw edm::Exception(edm::errors::LogicError)
02111 << "EventProcessor::readAndCacheRun\n"
02112 << "Illegal attempt to insert lumi into cache\n"
02113 << "Run is invalid\n"
02114 << "Contact a Framework Developer\n";
02115 }
02116 principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
02117 principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
02118 return input_->luminosityBlock();
02119 }
02120
02121 int EventProcessor::readAndMergeLumi() {
02122 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
02123 input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
02124 return input_->luminosityBlock();
02125 }
02126
02127 void EventProcessor::writeRun(statemachine::Run const& run) {
02128 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02129 if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02130 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02131 }
02132
02133 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02134 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02135 if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02136 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02137 }
02138
02139 void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02140 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02141 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02142 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02143 }
02144
02145 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02146 principalCache_.deleteLumi(phid, run, lumi);
02147 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02148 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02149 }
02150
02151 void EventProcessor::readAndProcessEvent() {
02152 EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
02153 FDEBUG(1) << "\treadEvent\n";
02154 assert(pep != 0);
02155 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
02156 assert(pep->luminosityBlockPrincipalPtrValid());
02157 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
02158 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
02159
02160 IOVSyncValue ts(pep->id(), pep->time());
02161 espController_->eventSetupForInstance(ts);
02162 EventSetup const& es = esp_->eventSetup();
02163 {
02164 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02165 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02166 schedule_->processOneOccurrence<Traits>(*pep, es);
02167 if(hasSubProcess()) {
02168 subProcess_->doEvent(*pep, ts);
02169 }
02170 sentry.allowThrow();
02171 }
02172
02173 if(looper_) {
02174 bool randomAccess = input_->randomAccess();
02175 ProcessingController::ForwardState forwardState = input_->forwardState();
02176 ProcessingController::ReverseState reverseState = input_->reverseState();
02177 ProcessingController pc(forwardState, reverseState, randomAccess);
02178
02179 EDLooperBase::Status status = EDLooperBase::kContinue;
02180 do {
02181 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02182
02183 bool succeeded = true;
02184 if(randomAccess) {
02185 if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02186 input_->skipEvents(-2);
02187 }
02188 else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02189 succeeded = input_->goToEvent(pc.specifiedEventTransition());
02190 }
02191 }
02192 pc.setLastOperationSucceeded(succeeded);
02193 } while(!pc.lastOperationSucceeded());
02194 if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02195
02196 }
02197
02198 FDEBUG(1) << "\tprocessEvent\n";
02199 pep->clearEventPrincipal();
02200 }
02201
02202 bool EventProcessor::shouldWeStop() const {
02203 FDEBUG(1) << "\tshouldWeStop\n";
02204 if(shouldWeStop_) return true;
02205 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02206 }
02207
02208 void EventProcessor::setExceptionMessageFiles(std::string& message) {
02209 exceptionMessageFiles_ = message;
02210 }
02211
02212 void EventProcessor::setExceptionMessageRuns(std::string& message) {
02213 exceptionMessageRuns_ = message;
02214 }
02215
02216 void EventProcessor::setExceptionMessageLumis(std::string& message) {
02217 exceptionMessageLumis_ = message;
02218 }
02219
02220 bool EventProcessor::alreadyHandlingException() const {
02221 return alreadyHandlingException_;
02222 }
02223
02224 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
02225 if(iMachine.get() != 0) {
02226 if(!iMachine->terminated()) {
02227 forceLooperToEnd_ = true;
02228 iMachine->process_event(statemachine::Stop());
02229 forceLooperToEnd_ = false;
02230 }
02231 else {
02232 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
02233 }
02234 if(iMachine->terminated()) {
02235 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02236 }
02237 iMachine.reset();
02238 }
02239 }
02240 }