00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072
00073
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080
00081
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085
00086 namespace edm {
00087
00088 namespace event_processor {
00089
00090 class StateSentry {
00091 public:
00092 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00093 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00094 void succeeded() {success_ = true;}
00095
00096 private:
00097 EventProcessor* ep_;
00098 bool success_;
00099 };
00100 }
00101
00102 using namespace event_processor;
00103
00104 namespace {
00105 template <typename T>
00106 class ScheduleSignalSentry {
00107 public:
00108 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00109 a_(a), principal_(principal), es_(es) {
00110 if (a_) T::preScheduleSignal(a_, principal_);
00111 }
00112 ~ScheduleSignalSentry() {
00113 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00114 }
00115
00116 private:
00117
00118 ActivityRegistry* a_;
00119 typename T::MyPrincipal* principal_;
00120 EventSetup const* es_;
00121 };
00122 }
00123
00124 namespace {
00125
00126
00127
00128
// Printable names for the processor states, one string per entry.
// NOTE(review): presumably indexed by the State enum declared elsewhere;
// keep the order in sync with that enum.
char const* stateNames[] = {
  "Init",     "JobReady",     "RunGiven",
  "Running",  "Stopping",     "ShuttingDown",
  "Done",     "JobEnded",     "Error",
  "ErrorEnded", "End",        "Invalid"
};
00143
// Printable names for the state-machine messages, one string per entry.
// NOTE(review): presumably indexed by the Msg enum declared elsewhere;
// keep the order in sync with that enum.
char const* msgNames[] = {
  "SetRun",         "Skip",           "RunAsync",
  "Run(ID)",        "Run(count)",     "BeginJob",
  "StopAsync",      "ShutdownAsync",  "EndJob",
  "CountComplete",  "InputExhausted", "StopSignal",
  "ShutdownSignal", "Finished",       "Any",
  "dtor",           "Exception",      "Rewind"
};
00164 }
00165
00166
00167
00168
00169
00170 struct TransEntry {
00171 State current;
00172 Msg message;
00173 State final;
00174 };
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197 TransEntry table[] = {
00198
00199
00200 { sInit, mException, sError },
00201 { sInit, mBeginJob, sJobReady },
00202 { sJobReady, mException, sError },
00203 { sJobReady, mSetRun, sRunGiven },
00204 { sJobReady, mInputRewind, sRunning },
00205 { sJobReady, mSkip, sRunning },
00206 { sJobReady, mRunID, sRunning },
00207 { sJobReady, mRunCount, sRunning },
00208 { sJobReady, mEndJob, sJobEnded },
00209 { sJobReady, mBeginJob, sJobReady },
00210 { sJobReady, mDtor, sEnd },
00211
00212 { sJobReady, mStopAsync, sJobReady },
00213 { sJobReady, mCountComplete, sJobReady },
00214 { sJobReady, mFinished, sJobReady },
00215
00216 { sRunGiven, mException, sError },
00217 { sRunGiven, mRunAsync, sRunning },
00218 { sRunGiven, mBeginJob, sRunGiven },
00219 { sRunGiven, mShutdownAsync, sShuttingDown },
00220 { sRunGiven, mStopAsync, sStopping },
00221 { sRunning, mException, sError },
00222 { sRunning, mStopAsync, sStopping },
00223 { sRunning, mShutdownAsync, sShuttingDown },
00224 { sRunning, mShutdownSignal, sShuttingDown },
00225 { sRunning, mCountComplete, sStopping },
00226 { sRunning, mInputExhausted, sStopping },
00227
00228 { sStopping, mInputRewind, sRunning },
00229 { sStopping, mException, sError },
00230 { sStopping, mFinished, sJobReady },
00231 { sStopping, mCountComplete, sJobReady },
00232 { sStopping, mShutdownSignal, sShuttingDown },
00233 { sStopping, mStopAsync, sStopping },
00234 { sStopping, mInputExhausted, sStopping },
00235
00236 { sShuttingDown, mException, sError },
00237 { sShuttingDown, mShutdownSignal, sShuttingDown },
00238 { sShuttingDown, mCountComplete, sDone },
00239 { sShuttingDown, mInputExhausted, sDone },
00240 { sShuttingDown, mFinished, sDone },
00241
00242
00243
00244 { sDone, mEndJob, sJobEnded },
00245 { sDone, mException, sError },
00246 { sJobEnded, mDtor, sEnd },
00247 { sJobEnded, mException, sError },
00248 { sError, mEndJob, sError },
00249 { sError, mDtor, sError },
00250 { sInit, mDtor, sEnd },
00251 { sStopping, mShutdownAsync, sShuttingDown },
00252 { sInvalid, mAny, sInvalid }
00253 };
00254
00255
00256
00257
00258
00259
00260 boost::shared_ptr<InputSource>
00261 makeInput(ParameterSet& params,
00262 CommonParams const& common,
00263 ProductRegistry& preg,
00264 boost::shared_ptr<BranchIDListHelper> branchIDListHelper,
00265 boost::shared_ptr<ActivityRegistry> areg,
00266 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00267 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00268 if(main_input == 0) {
00269 throw Exception(errors::Configuration)
00270 << "There must be exactly one source in the configuration.\n"
00271 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00272 }
00273
00274 std::string modtype(main_input->getParameter<std::string>("@module_type"));
00275
00276 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00277 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00278 ConfigurationDescriptions descriptions(filler->baseType());
00279 filler->fill(descriptions);
00280
00281 try {
00282 try {
00283 descriptions.validate(*main_input, std::string("source"));
00284 }
00285 catch (cms::Exception& e) { throw; }
00286 catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00287 catch (std::exception& e) { convertException::stdToEDM(e); }
00288 catch (std::string& s) { convertException::stringToEDM(s); }
00289 catch (char const* c) { convertException::charPtrToEDM(c); }
00290 catch (...) { convertException::unknownToEDM(); }
00291 }
00292 catch (cms::Exception & iException) {
00293 std::ostringstream ost;
00294 ost << "Validating configuration of input source of type " << modtype;
00295 iException.addContext(ost.str());
00296 throw;
00297 }
00298
00299 main_input->registerIt();
00300
00301
00302
00303
00304
00305
00306
00307 ModuleDescription md(main_input->id(),
00308 main_input->getParameter<std::string>("@module_type"),
00309 "source",
00310 processConfiguration.get());
00311
00312 InputSourceDescription isdesc(md, preg, branchIDListHelper, areg, common.maxEventsInput_, common.maxLumisInput_);
00313 areg->preSourceConstructionSignal_(md);
00314 boost::shared_ptr<InputSource> input;
00315 try {
00316 try {
00317 input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00318 }
00319 catch (cms::Exception& e) { throw; }
00320 catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00321 catch (std::exception& e) { convertException::stdToEDM(e); }
00322 catch (std::string& s) { convertException::stringToEDM(s); }
00323 catch (char const* c) { convertException::charPtrToEDM(c); }
00324 catch (...) { convertException::unknownToEDM(); }
00325 }
00326 catch (cms::Exception& iException) {
00327 areg->postSourceConstructionSignal_(md);
00328 std::ostringstream ost;
00329 ost << "Constructing input source of type " << modtype;
00330 iException.addContext(ost.str());
00331 throw;
00332 }
00333 areg->postSourceConstructionSignal_(md);
00334 return input;
00335 }
00336
00337
00338 boost::shared_ptr<EDLooperBase>
00339 fillLooper(eventsetup::EventSetupsController& esController,
00340 eventsetup::EventSetupProvider& cp,
00341 ParameterSet& params) {
00342 boost::shared_ptr<EDLooperBase> vLooper;
00343
00344 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00345
00346 if(loopers.size() == 0) {
00347 return vLooper;
00348 }
00349
00350 assert(1 == loopers.size());
00351
00352 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00353 itName != itNameEnd;
00354 ++itName) {
00355
00356 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00357 providerPSet->registerIt();
00358 vLooper = eventsetup::LooperFactory::get()->addTo(esController,
00359 cp,
00360 *providerPSet);
00361 }
00362 return vLooper;
00363
00364 }
00365
00366
00367 EventProcessor::EventProcessor(std::string const& config,
00368 ServiceToken const& iToken,
00369 serviceregistry::ServiceLegacy iLegacy,
00370 std::vector<std::string> const& defaultServices,
00371 std::vector<std::string> const& forcedServices) :
00372 preProcessEventSignal_(),
00373 postProcessEventSignal_(),
00374 actReg_(),
00375 preg_(),
00376 branchIDListHelper_(),
00377 serviceToken_(),
00378 input_(),
00379 espController_(new eventsetup::EventSetupsController),
00380 esp_(),
00381 act_table_(),
00382 processConfiguration_(),
00383 schedule_(),
00384 subProcess_(),
00385 historyAppender_(new HistoryAppender),
00386 state_(sInit),
00387 event_loop_(),
00388 state_lock_(),
00389 stop_lock_(),
00390 stopper_(),
00391 starter_(),
00392 stop_count_(-1),
00393 last_rc_(epSuccess),
00394 last_error_text_(),
00395 id_set_(false),
00396 event_loop_id_(),
00397 my_sig_num_(getSigNum()),
00398 fb_(),
00399 looper_(),
00400 principalCache_(),
00401 shouldWeStop_(false),
00402 stateMachineWasInErrorState_(false),
00403 fileMode_(),
00404 emptyRunLumiMode_(),
00405 exceptionMessageFiles_(),
00406 exceptionMessageRuns_(),
00407 exceptionMessageLumis_(),
00408 alreadyHandlingException_(false),
00409 forceLooperToEnd_(false),
00410 looperBeginJobRun_(false),
00411 forceESCacheClearOnNewRun_(false),
00412 numberOfForkedChildren_(0),
00413 numberOfSequentialEventsPerChild_(1),
00414 setCpuAffinity_(false),
00415 eventSetupDataToExcludeFromPrefetching_() {
00416 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00417 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00418 processDesc->addServices(defaultServices, forcedServices);
00419 init(processDesc, iToken, iLegacy);
00420 }
00421
00422 EventProcessor::EventProcessor(std::string const& config,
00423 std::vector<std::string> const& defaultServices,
00424 std::vector<std::string> const& forcedServices) :
00425 preProcessEventSignal_(),
00426 postProcessEventSignal_(),
00427 actReg_(),
00428 preg_(),
00429 branchIDListHelper_(),
00430 serviceToken_(),
00431 input_(),
00432 espController_(new eventsetup::EventSetupsController),
00433 esp_(),
00434 act_table_(),
00435 processConfiguration_(),
00436 schedule_(),
00437 subProcess_(),
00438 historyAppender_(new HistoryAppender),
00439 state_(sInit),
00440 event_loop_(),
00441 state_lock_(),
00442 stop_lock_(),
00443 stopper_(),
00444 starter_(),
00445 stop_count_(-1),
00446 last_rc_(epSuccess),
00447 last_error_text_(),
00448 id_set_(false),
00449 event_loop_id_(),
00450 my_sig_num_(getSigNum()),
00451 fb_(),
00452 looper_(),
00453 principalCache_(),
00454 shouldWeStop_(false),
00455 stateMachineWasInErrorState_(false),
00456 fileMode_(),
00457 emptyRunLumiMode_(),
00458 exceptionMessageFiles_(),
00459 exceptionMessageRuns_(),
00460 exceptionMessageLumis_(),
00461 alreadyHandlingException_(false),
00462 forceLooperToEnd_(false),
00463 looperBeginJobRun_(false),
00464 forceESCacheClearOnNewRun_(false),
00465 numberOfForkedChildren_(0),
00466 numberOfSequentialEventsPerChild_(1),
00467 setCpuAffinity_(false),
00468 eventSetupDataToExcludeFromPrefetching_() {
00469 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00470 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00471 processDesc->addServices(defaultServices, forcedServices);
00472 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00473 }
00474
00475 EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00476 ServiceToken const& token,
00477 serviceregistry::ServiceLegacy legacy) :
00478 preProcessEventSignal_(),
00479 postProcessEventSignal_(),
00480 actReg_(),
00481 preg_(),
00482 branchIDListHelper_(),
00483 serviceToken_(),
00484 input_(),
00485 espController_(new eventsetup::EventSetupsController),
00486 esp_(),
00487 act_table_(),
00488 processConfiguration_(),
00489 schedule_(),
00490 subProcess_(),
00491 historyAppender_(new HistoryAppender),
00492 state_(sInit),
00493 event_loop_(),
00494 state_lock_(),
00495 stop_lock_(),
00496 stopper_(),
00497 starter_(),
00498 stop_count_(-1),
00499 last_rc_(epSuccess),
00500 last_error_text_(),
00501 id_set_(false),
00502 event_loop_id_(),
00503 my_sig_num_(getSigNum()),
00504 fb_(),
00505 looper_(),
00506 principalCache_(),
00507 shouldWeStop_(false),
00508 stateMachineWasInErrorState_(false),
00509 fileMode_(),
00510 emptyRunLumiMode_(),
00511 exceptionMessageFiles_(),
00512 exceptionMessageRuns_(),
00513 exceptionMessageLumis_(),
00514 alreadyHandlingException_(false),
00515 forceLooperToEnd_(false),
00516 looperBeginJobRun_(false),
00517 forceESCacheClearOnNewRun_(false),
00518 numberOfForkedChildren_(0),
00519 numberOfSequentialEventsPerChild_(1),
00520 setCpuAffinity_(false),
00521 eventSetupDataToExcludeFromPrefetching_() {
00522 init(processDesc, token, legacy);
00523 }
00524
00525
00526 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00527 preProcessEventSignal_(),
00528 postProcessEventSignal_(),
00529 actReg_(),
00530 preg_(),
00531 branchIDListHelper_(),
00532 serviceToken_(),
00533 input_(),
00534 espController_(new eventsetup::EventSetupsController),
00535 esp_(),
00536 act_table_(),
00537 processConfiguration_(),
00538 schedule_(),
00539 subProcess_(),
00540 historyAppender_(new HistoryAppender),
00541 state_(sInit),
00542 event_loop_(),
00543 state_lock_(),
00544 stop_lock_(),
00545 stopper_(),
00546 starter_(),
00547 stop_count_(-1),
00548 last_rc_(epSuccess),
00549 last_error_text_(),
00550 id_set_(false),
00551 event_loop_id_(),
00552 my_sig_num_(getSigNum()),
00553 fb_(),
00554 looper_(),
00555 principalCache_(),
00556 shouldWeStop_(false),
00557 stateMachineWasInErrorState_(false),
00558 fileMode_(),
00559 emptyRunLumiMode_(),
00560 exceptionMessageFiles_(),
00561 exceptionMessageRuns_(),
00562 exceptionMessageLumis_(),
00563 alreadyHandlingException_(false),
00564 forceLooperToEnd_(false),
00565 looperBeginJobRun_(false),
00566 forceESCacheClearOnNewRun_(false),
00567 numberOfForkedChildren_(0),
00568 numberOfSequentialEventsPerChild_(1),
00569 setCpuAffinity_(false),
00570 eventSetupDataToExcludeFromPrefetching_() {
00571 if(isPython) {
00572 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00573 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00574 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00575 }
00576 else {
00577 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00578 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00579 }
00580 }
00581
00582 void
00583 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00584 ServiceToken const& iToken,
00585 serviceregistry::ServiceLegacy iLegacy) {
00586
00587
00588
00589 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00590
00591
00592
00593 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00594
00595
00596 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00597 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00598 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00599 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00600 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00601 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00602 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00603 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00604 continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
00605 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00606 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00607 itPS != itPSEnd;
00608 ++itPS) {
00609 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00610 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00611 itPS->getUntrackedParameter<std::string>("label", "")));
00612 }
00613 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00614
00615
00616 ScheduleItems items;
00617
00618
00619 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00620 ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00621 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00622
00623
00624 ServiceRegistry::Operate operate(serviceToken_);
00625
00626
00627 boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00628
00629
00630 esp_ = espController_->makeProvider(*parameterSet);
00631
00632
00633 looper_ = fillLooper(*espController_, *esp_, *parameterSet);
00634 if(looper_) {
00635 looper_->setActionTable(items.act_table_.get());
00636 looper_->attachTo(*items.actReg_);
00637 }
00638
00639
00640 input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
00641
00642
00643 schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00644
00645
00646 act_table_ = items.act_table_;
00647 actReg_ = items.actReg_;
00648 preg_ = items.preg_;
00649 branchIDListHelper_ = items.branchIDListHelper_;
00650 processConfiguration_ = items.processConfiguration_;
00651
00652 FDEBUG(2) << parameterSet << std::endl;
00653 connectSigs(this);
00654
00655
00656 boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
00657 principalCache_.insert(ep);
00658
00659
00660 if(subProcessParameterSet) {
00661 subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
00662 }
00663 }
00664
00665 EventProcessor::~EventProcessor() {
00666
00667 ServiceToken token = getToken();
00668 ServiceRegistry::Operate op(token);
00669 try {
00670 changeState(mDtor);
00671 }
00672 catch(cms::Exception& e) {
00673 LogError("System")
00674 << e.explainSelf() << "\n";
00675 }
00676
00677
00678 espController_.reset();
00679 subProcess_.reset();
00680 esp_.reset();
00681 schedule_.reset();
00682 input_.reset();
00683 looper_.reset();
00684 actReg_.reset();
00685
00686 pset::Registry* psetRegistry = pset::Registry::instance();
00687 psetRegistry->data().clear();
00688 psetRegistry->extra().setID(ParameterSetID());
00689
00690 EntryDescriptionRegistry::instance()->data().clear();
00691 ParentageRegistry::instance()->data().clear();
00692 ProcessConfigurationRegistry::instance()->data().clear();
00693 ProcessHistoryRegistry::instance()->data().clear();
00694 }
00695
00696 void
00697 EventProcessor::beginJob() {
00698 if(state_ != sInit) return;
00699 bk::beginJob();
00700
00701 changeState(mBeginJob);
00702
00703
00704
00705 ServiceRegistry::Operate operate(serviceToken_);
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721 try {
00722 try {
00723 input_->doBeginJob();
00724 }
00725 catch (cms::Exception& e) { throw; }
00726 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00727 catch (std::exception& e) { convertException::stdToEDM(e); }
00728 catch(std::string& s) { convertException::stringToEDM(s); }
00729 catch(char const* c) { convertException::charPtrToEDM(c); }
00730 catch (...) { convertException::unknownToEDM(); }
00731 }
00732 catch(cms::Exception& ex) {
00733 ex.addContext("Calling beginJob for the source");
00734 throw;
00735 }
00736 schedule_->beginJob();
00737
00738 if(hasSubProcess()) subProcess_->doBeginJob();
00739 actReg_->postBeginJobSignal_();
00740 }
00741
00742 void
00743 EventProcessor::endJob() {
00744
00745 ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00746
00747
00748 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00749
00750
00751 ServiceRegistry::Operate operate(serviceToken_);
00752
00753 schedule_->endJob(c);
00754 if(hasSubProcess()) {
00755 c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00756 }
00757 c.call(boost::bind(&InputSource::doEndJob, input_));
00758 if(looper_) {
00759 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00760 }
00761 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00762 if(c.hasThrown()) {
00763 c.rethrow();
00764 }
00765 }
00766
00767 ServiceToken
00768 EventProcessor::getToken() {
00769 return serviceToken_;
00770 }
00771
00772
namespace {
  // Bookkeeping written by the SIGCHLD handler below and read by the
  // forking parent.
  volatile bool child_failed = false;              // set when any child exits non-zero or dies on a signal
  volatile unsigned int num_children_done = 0;     // number of children reaped so far
  volatile int child_fail_exit_status = 0;         // exit status of the most recent non-zero exit
  volatile int child_fail_signal = 0;              // signal number of the most recent signalled child

  extern "C" {
    // SIGCHLD handler: reaps every child that has already terminated
    // (non-blocking) and records how it ended in the variables above.
    //
    // errno is saved on entry and restored on exit.  waitpid() sets errno
    // (for instance to ECHILD once no children remain), and without the
    // restore the handler could overwrite the errno value that the
    // interrupted code is about to inspect, e.g. in an `errno == EINTR`
    // retry loop.
    void ep_sigchld(int, siginfo_t*, void*) {
      int const savedErrno = errno;

      int stat_loc;
      pid_t p = waitpid(-1, &stat_loc, WNOHANG);
      while(0 < p) {
        if(WIFEXITED(stat_loc)) {
          num_children_done = num_children_done + 1;
          if(0 != WEXITSTATUS(stat_loc)) {
            child_fail_exit_status = WEXITSTATUS(stat_loc);
            child_failed = true;
          }
        }
        if(WIFSIGNALED(stat_loc)) {
          num_children_done = num_children_done + 1;
          child_fail_signal = WTERMSIG(stat_loc);
          child_failed = true;
        }
        p = waitpid(-1, &stat_loc, WNOHANG);
      }

      errno = savedErrno;
    }
  }

}
00810
// Child outcome codes; kMaxChildAction is the number of real codes.
enum {
  kChildSucceed   = 0,
  kChildExitBadly = 1,
  kChildSegv      = 2,
  kMaxChildAction = 3
};
00817
00818 namespace {
// Number of decimal digits needed to print `numberOfChildren`.
// A value of 0 yields 3 rather than a digit count.
unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
  unsigned int digits = 0;
  for(unsigned int remaining = numberOfChildren; remaining != 0; remaining /= 10) {
    ++digits;
  }
  return (digits == 0) ? 3 : digits;
}
00830
00831
00832
// Functor run by the forking parent: it waits on the children's sockets and
// pipes and answers each child request with the next block of event
// indices.
//
// The object keeps a *reference* to `childrenPipes`; the caller must keep
// that vector alive for as long as the functor is used.  The socket vector
// is only read inside the constructor.
class MessageSenderToSource {
public:
  MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
  void operator()();

private:
  std::vector<int> const& m_childrenPipes;  // descriptors treated as pipes in operator()
  long const m_nEventsToProcess;            // block size sent with every reply
  fd_set m_socketSet;                       // every descriptor still being watched
  unsigned int m_aliveChildren;             // operator() loops until this reaches 0
  int m_maxFd;                              // highest watched descriptor + 1 (select's nfds)
};
00845
00846 MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00847 std::vector<int> const& childrenPipes,
00848 long iNEventsToProcess):
00849 m_childrenPipes(childrenPipes),
00850 m_nEventsToProcess(iNEventsToProcess),
00851 m_aliveChildren(childrenSockets.size()),
00852 m_maxFd(0)
00853 {
00854 FD_ZERO(&m_socketSet);
00855 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00856 it != itEnd; it++) {
00857 FD_SET(*it, &m_socketSet);
00858 if (*it > m_maxFd) {
00859 m_maxFd = *it;
00860 }
00861 }
00862 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00863 it != itEnd; ++it) {
00864 FD_SET(*it, &m_socketSet);
00865 if (*it > m_maxFd) {
00866 m_maxFd = *it;
00867 }
00868 }
00869 m_maxFd++;
00870 }
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888 void
00889 MessageSenderToSource::operator()() {
00890 multicore::MessageForParent childMsg;
00891 LogInfo("ForkingController") << "I am controller";
00892
00893
00894 multicore::MessageForSource sndmsg;
00895 sndmsg.startIndex = 0;
00896 sndmsg.nIndices = m_nEventsToProcess;
00897 do {
00898
00899 fd_set readSockets, errorSockets;
00900
00901 memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00902 memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00903
00904 ssize_t rc;
00905 while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00906 if (rc < 0) {
00907 std::cerr << "select failed; should be impossible due to preconditions.\n";
00908 abort();
00909 break;
00910 }
00911
00912
00913 for (int idx=0; idx<m_maxFd; idx++) {
00914
00915
00916 if (FD_ISSET(idx, &errorSockets)) {
00917 LogInfo("ForkingController") << "Error on socket " << idx;
00918 FD_CLR(idx, &m_socketSet);
00919 close(idx);
00920
00921 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00922 if (*it == idx) {
00923 m_aliveChildren--;
00924 }
00925 }
00926 continue;
00927 }
00928
00929 if (!FD_ISSET(idx, &readSockets)) {
00930 continue;
00931 }
00932
00933
00934
00935 bool is_pipe = false;
00936 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00937 if (*it == idx) {
00938 is_pipe = true;
00939 char buf;
00940 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00941 if (rc <= 0) {
00942 m_aliveChildren--;
00943 FD_CLR(idx, &m_socketSet);
00944 close(idx);
00945 }
00946 }
00947 }
00948
00949
00950 if (!is_pipe) {
00951 while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
00952 if (rc < 0) {
00953 FD_CLR(idx, &m_socketSet);
00954 close(idx);
00955 continue;
00956 }
00957
00958
00959
00960
00961
00962 while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
00963 if (rc < 0) {
00964 FD_CLR(idx, &m_socketSet);
00965 close(idx);
00966 continue;
00967 }
00968
00969 sndmsg.startIndex += sndmsg.nIndices;
00970 }
00971 }
00972
00973 } while (m_aliveChildren > 0);
00974
00975 return;
00976 }
00977
00978 }
00979
00980
00981 void EventProcessor::possiblyContinueAfterForkChildFailure() {
00982 if(child_failed && continueAfterChildFailure_) {
00983 if (child_fail_signal) {
00984 LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
00985 child_fail_signal=0;
00986 } else if (child_fail_exit_status) {
00987 LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
00988 child_fail_exit_status=0;
00989 } else {
00990 LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
00991 }
00992 child_failed =false;
00993 }
00994 }
00995
// Forks numberOfForkedChildren_ worker processes and, in the parent, waits for
// them to finish.
//
// Returns true immediately when numberOfForkedChildren_ is 0, and returns true
// in each forked child once its post-fork resources have been reacquired.
// Returns false in the parent after the children are done.
//
// Throws cms::Exception("ForkedChildFailed") in the parent when a child failed
// and continueAfterChildFailure_ is not set, and
// cms::Exception("ForkedParentFailed") when the last pipe descriptor does not
// fit within FD_SETSIZE.  Failures of socketpair/pipe/fcntl/fork and of
// sched_setaffinity end the process through exit().
//
// jobReportFile is forwarded unchanged to the JobReport service calls
// parentBeforeFork, childAfterFork and parentAfterFork.
00996 bool
00997 EventProcessor::forkProcess(std::string const& jobReportFile) {
00998
00999 if(0 == numberOfForkedChildren_) {return true;}
01000 assert(0<numberOfForkedChildren_);
01001
// Phase 1 (before any fork): run beginJob, read the first input file and
// prefetch EventSetup data for the first run.
01002 {
01003 beginJob();
01004
01005 ServiceRegistry::Operate operate(serviceToken_);
01006
01007 InputSource::ItemType itemType;
01008 itemType = input_->nextItemType();
01009
// The source is required to deliver a file first, then a run.
01010 assert(itemType == InputSource::IsFile);
01011 {
01012 readFile();
01013 }
01014 itemType = input_->nextItemType();
01015 assert(itemType == InputSource::IsRun);
01016
01017 LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
// Sync value built from the run number (lumi 0, event 0) and the run's begin time.
01018 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01019 input_->runAuxiliary()->beginTime());
01020 espController_->eventSetupForInstance(ts);
01021 EventSetup const& es = esp_->eventSetup();
01022
01023
// Walk every available record and call doGet on each registered data key,
// except for what eventSetupDataToExcludeFromPrefetching_ excludes.
01024 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01025 es.fillAvailableRecordKeys(recordKeys);
01026 std::vector<eventsetup::DataKey> dataKeys;
01027 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01028 itKey != itEnd;
01029 ++itKey) {
01030 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01031
// The exclusion map is keyed by the record's type name.
01032 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01033 ExcludedData const* excludedData(0);
01034 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01035 excludedData = &(itExcludeRec->second);
// An empty exclusion set, or one whose first entry is "*", skips the whole record.
01036 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01037
01038 continue;
01039 }
01040 }
01041 if(0 != recordPtr) {
01042 dataKeys.clear();
01043 recordPtr->fillRegisteredDataKeys(dataKeys);
01044 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01045 itDataKey != itDataKeyEnd;
01046 ++itDataKey) {
01047
// Individual data items are excluded by (type name, label) pair.
01048 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01049 LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01050 continue;
01051 }
// A cms::Exception from one doGet is only logged as a warning; the loop goes on.
01052 try {
01053 recordPtr->doGet(*itDataKey);
01054 } catch(cms::Exception& e) {
01055 LogWarning("ForkingEventSetupPreFetching") << e.what();
01056 }
01057 }
01058 }
01059 }
01060 }
01061 LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
// Phase 2: notify the job report, then ask the activity registry listeners,
// the input source and the schedule to release resources before forking.
01062 {
01063
01064 ServiceRegistry::Operate operate(serviceToken_);
01065 Service<JobReport> jobReport;
01066 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01067
01068
01069 actReg_->preForkReleaseResourcesSignal_();
01070 input_->doPreForkReleaseResources();
01071 schedule_->preForkReleaseResources();
01072 }
// ep_sigchld is defined elsewhere in this file; presumably it maintains
// child_failed / num_children_done, which the wait loops below read -- confirm.
01073 installCustomHandler(SIGCHLD, ep_sigchld);
01074
01075
// Phase 3: create the children.  Per-child bookkeeping kept by the parent:
// pid, parent end of the socket pair, read end of the pipe.
01076 unsigned int childIndex = 0;
01077 unsigned int const kMaxChildren = numberOfForkedChildren_;
01078 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01079 std::vector<pid_t> childrenIds;
01080 childrenIds.reserve(kMaxChildren);
01081 std::vector<int> childrenSockets;
01082 childrenSockets.reserve(kMaxChildren);
01083 std::vector<int> childrenPipes;
01084 childrenPipes.reserve(kMaxChildren);
01085 std::vector<int> childrenSocketsCopy;
01086 childrenSocketsCopy.reserve(kMaxChildren);
01087 std::vector<int> childrenPipesCopy;
01088 childrenPipesCopy.reserve(kMaxChildren);
// Declared outside the loop block because pipes[1] is examined again after it
// (FD_SETSIZE check) and handed to the child's message receiver.
01089 int pipes[] {0, 0};
01090
01091 {
01092
01093 ServiceRegistry::Operate operate(serviceToken_);
01094 Service<JobReport> jobReport;
01095 int sockets[2], fd_flags;
01096 for(; childIndex < kMaxChildren; ++childIndex) {
01097
// One datagram socket pair and one pipe per child; any failure here exits the process.
01098 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01099 printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01100 exit(EXIT_FAILURE);
01101 }
01102 if (pipe(pipes)) {
01103 printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01104 exit(EXIT_FAILURE);
01105 }
01106
01107 if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01108 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01109 exit(EXIT_FAILURE);
01110 }
01111
01112
// NOTE(review): O_NONBLOCK is a file status flag (set with F_SETFL), while
// F_SETFD only defines FD_CLOEXEC, so the O_NONBLOCK bit here likely has no
// effect and the socket stays blocking -- confirm the intent before changing.
01113 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01114 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01115 exit(EXIT_FAILURE);
01116 }
01117 if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01118 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01119 exit(EXIT_FAILURE);
01120 }
01121 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01122 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01123 exit(EXIT_FAILURE);
01124 }
01125
01126
01127 if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01128 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01129 exit(EXIT_FAILURE);
01130 }
// NOTE(review): same concern as for sockets[1] above -- O_NONBLOCK passed to F_SETFD.
01131 if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01132 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01133 exit(EXIT_FAILURE);
01134 }
01135
// Snapshot the descriptors of previously created children; the new child
// closes these inherited copies right after the fork.
01136 childrenPipesCopy = childrenPipes;
01137 childrenSocketsCopy = childrenSockets;
01138
01139 pid_t value = fork();
01140 if(value == 0) {
01141
// Child: close the parent-side ends of its own channel ...
01142 close(pipes[0]);
01143 close(sockets[0]);
01144
// ... and the parent-side ends belonging to earlier siblings.
01145 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01146 close(*it);
01147 }
01148 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01149 close(*it);
01150 }
01151
01152
// Redirect stdout to redirectout_<pgid>_<zero-padded child index>.log and
// make stderr a duplicate of it.  Failures are logged, not fatal.
01153 fflush(stdout);
01154 fflush(stderr);
01155 std::stringstream stout;
01156 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01157 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01158 LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01159 }
01160 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01161 LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01162 }
01163
01164 LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01165 if(setCpuAffinity_) {
01166
01167
01168
01169
01170
01171
// Optional pinning: child N is bound to CPU number N (not available on Apple builds).
01172 #ifdef __APPLE__
01173 LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01174 #else
01175 LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01176 cpu_set_t mask;
01177 CPU_ZERO(&mask);
01178 CPU_SET(childIndex, &mask);
01179 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01180 LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01181 exit(-1);
01182 }
01183 #endif
01184 }
// The child leaves the loop with childIndex < kMaxChildren, which is how the
// code after the loop tells child from parent.
01185 break;
01186 } else {
01187
// Parent (or failed fork): close the child-side ends.
01188 close(pipes[1]);
01189 close(sockets[1]);
01190 }
// A failed fork is detected only after the closes above; it is fatal.
01191 if(value < 0) {
01192 LogError("ForkingChild") << "failed to create a child";
01193 exit(-1);
01194 }
01195 childrenIds.push_back(value);
01196 childrenSockets.push_back(sockets[0]);
01197 childrenPipes.push_back(pipes[0]);
01198 }
01199
// Only a child reaches this point with childIndex < kMaxChildren (see the break above).
01200 if(childIndex < kMaxChildren) {
01201 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01202 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01203
// The receiver is built on the child-side socket and pipe write end and handed to the source.
01204 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01205 input_->doPostForkReacquireResources(receiver);
01206 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01207
01208
01209 return true;
01210 }
01211 jobReport->parentAfterFork(jobReportFile);
01212 }
01213
01214
01215
01216
01217
01218
01219
01220
// Phase 4 (parent only): wait for the children.
// Both sets start from the current mask.  blockingSigSet additionally blocks
// SIGCHLD, SIGUSR2 and SIGINT; unblockingSigSet has those three removed and is
// the mask installed for the duration of each sigsuspend call.
01221 sigset_t blockingSigSet;
01222 sigset_t unblockingSigSet;
01223 sigset_t oldSigSet;
01224 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01225 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01226 sigaddset(&blockingSigSet, SIGCHLD);
01227 sigaddset(&blockingSigSet, SIGUSR2);
01228 sigaddset(&blockingSigSet, SIGINT);
01229 sigdelset(&unblockingSigSet, SIGCHLD);
01230 sigdelset(&unblockingSigSet, SIGUSR2);
01231 sigdelset(&unblockingSigSet, SIGINT);
01232 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01233
01234
01235
// pipes[] still holds the descriptors created in the last loop iteration;
// the check uses that last write-end number against FD_SETSIZE.
01236 bool too_many_fds = false;
01237 if (pipes[1]+1 > FD_SETSIZE) {
01238 LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01239 too_many_fds = true;
01240 }
01241
01242
01243
01244
// The sender thread is started regardless of too_many_fds and is joined further down.
01245 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01246 boost::thread senderThread(sender);
01247
01248 if(not too_many_fds) {
01249
01250
// Sleep in sigsuspend until a shutdown is requested, a child failed (unless
// continueAfterChildFailure_ is set) or every child has been counted as done.
01251 possiblyContinueAfterForkChildFailure();
01252 while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
01253 sigsuspend(&unblockingSigSet);
01254 possiblyContinueAfterForkChildFailure();
// NOTE(review): the message says "sigwait" although the call above is sigsuspend.
01255 LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01256 }
01257 }
01258 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01259
01260 LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01261 if(child_failed) {
01262 LogError("ForkingStopping") << "child failed";
01263 }
01264 if(shutdown_flag) {
01265 LogSystem("ForkingStopping") << "asked to shutdown";
01266 }
01267
// If children may still be running, send each one SIGUSR2 and wait, with the
// same block/sigsuspend pattern, until all kMaxChildren are counted as done.
01268 if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01269 LogInfo("ForkingStopping") << "must stop children" << std::endl;
01270 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01271 it != itEnd; ++it) {
01272 kill(*it, SIGUSR2);
01273 }
01274 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01275 while(num_children_done != kMaxChildren) {
01276 sigsuspend(&unblockingSigSet);
01277 }
01278 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01279 }
01280
01281 senderThread.join();
// Report a child failure to the caller, preferring the recorded signal, then
// the exit status, then a generic message.
01282 if(child_failed && !continueAfterChildFailure_) {
01283 if (child_fail_signal) {
01284 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01285 } else if (child_fail_exit_status) {
01286 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01287 } else {
01288 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01289 }
01290 }
01291 if(too_many_fds) {
01292 throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01293 }
// Parent: all children are finished.
01294 return false;
01295 }
01296
01297 void
01298 EventProcessor::connectSigs(EventProcessor* ep) {
01299
01300
01301
01302 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01303 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01304 }
01305
01306 std::vector<ModuleDescription const*>
01307 EventProcessor::getAllModuleDescriptions() const {
01308 return schedule_->getAllModuleDescriptions();
01309 }
01310
01311 int
01312 EventProcessor::totalEvents() const {
01313 return schedule_->totalEvents();
01314 }
01315
01316 int
01317 EventProcessor::totalEventsPassed() const {
01318 return schedule_->totalEventsPassed();
01319 }
01320
01321 int
01322 EventProcessor::totalEventsFailed() const {
01323 return schedule_->totalEventsFailed();
01324 }
01325
01326 void
01327 EventProcessor::enableEndPaths(bool active) {
01328 schedule_->enableEndPaths(active);
01329 }
01330
01331 bool
01332 EventProcessor::endPathsEnabled() const {
01333 return schedule_->endPathsEnabled();
01334 }
01335
01336 void
01337 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01338 schedule_->getTriggerReport(rep);
01339 }
01340
01341 void
01342 EventProcessor::clearCounters() {
01343 schedule_->clearCounters();
01344 }
01345
01346 char const* EventProcessor::currentStateName() const {
01347 return stateName(getState());
01348 }
01349
01350 char const* EventProcessor::stateName(State s) const {
01351 return stateNames[s];
01352 }
01353
01354 char const* EventProcessor::msgName(Msg m) const {
01355 return msgNames[m];
01356 }
01357
01358 State EventProcessor::getState() const {
01359 return state_;
01360 }
01361
01362 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01363
01364
01365 return last_rc_;
01366 }
01367
01368 void
01369 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01370 if(runNumber == 0) {
01371 runNumber = 1;
01372 LogWarning("Invalid Run")
01373 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01374 << "Run number was set to 1 instead\n";
01375 }
01376
01377
01378 beginJob();
01379 changeState(mSetRun);
01380
01381
01382 input_->setRunNumber(runNumber);
01383 }
01384
01385 void
01386 EventProcessor::declareRunNumber(RunNumber_t ) {
01387
01388 beginJob();
01389 changeState(mSetRun);
01390
01391
01392
01393 }
01394
01395 EventProcessor::StatusCode
01396 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01397 bool rc = true;
01398 boost::xtime timeout;
01399
01400 #if BOOST_VERSION >= 105000
01401 boost::xtime_get(&timeout, boost::TIME_UTC_);
01402 #else
01403 boost::xtime_get(&timeout, boost::TIME_UTC);
01404 #endif
01405 timeout.sec += timeout_seconds;
01406
01407
01408
01409
01410 {
01411 boost::mutex::scoped_lock sl(stop_lock_);
01412
01413
01414 if(stop_count_ < 0) return last_rc_;
01415
01416 if(timeout_seconds == 0) {
01417 while(stop_count_ == 0) stopper_.wait(sl);
01418 } else {
01419 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01420 }
01421
01422 if(rc == false) {
01423
01424
01425
01426
01427
01428
01429 if(id_set_) pthread_cancel(event_loop_id_);
01430
01431
01432 LogWarning("timeout")
01433 << "An asynchronous request was made to shut down "
01434 << "the event loop "
01435 << "and the event loop did not shutdown after "
01436 << timeout_seconds << " seconds\n";
01437 } else {
01438 event_loop_->join();
01439 event_loop_.reset();
01440 id_set_ = false;
01441 stop_count_ = -1;
01442 }
01443 }
01444 return rc == false ? epTimedOut : last_rc_;
01445 }
01446
01447 EventProcessor::StatusCode
01448 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01449 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01450 if(rc != epTimedOut) changeState(mCountComplete);
01451 else errorState();
01452 return rc;
01453 }
01454
01455
01456 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01457 changeState(mStopAsync);
01458 StatusCode rc = waitForAsyncCompletion(secs);
01459 if(rc != epTimedOut) changeState(mFinished);
01460 else errorState();
01461 return rc;
01462 }
01463
01464 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01465 changeState(mShutdownAsync);
01466 StatusCode rc = waitForAsyncCompletion(secs);
01467 if(rc != epTimedOut) changeState(mFinished);
01468 else errorState();
01469 return rc;
01470 }
01471
01472 void EventProcessor::errorState() {
01473 state_ = sError;
01474 }
01475
01476
01477 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01478
01479
01480
01481 changeState(m);
01482 return waitForAsyncCompletion(60*2);
01483 }
01484
01485 void EventProcessor::changeState(Msg msg) {
01486
01487
01488 boost::mutex::scoped_lock sl(state_lock_);
01489 State curr = state_;
01490 int rc;
01491
01492
01493 for(rc = 0;
01494 table[rc].current != sInvalid &&
01495 (curr != table[rc].current ||
01496 (curr == table[rc].current &&
01497 msg != table[rc].message && table[rc].message != mAny));
01498 ++rc);
01499
01500 if(table[rc].current == sInvalid)
01501 throw cms::Exception("BadState")
01502 << "A member function of EventProcessor has been called in an"
01503 << " inappropriate order.\n"
01504 << "Bad transition from " << stateName(curr) << " "
01505 << "using message " << msgName(msg) << "\n"
01506 << "No where to go from here.\n";
01507
01508 FDEBUG(1) << "changeState: current=" << stateName(curr)
01509 << ", message=" << msgName(msg)
01510 << " -> new=" << stateName(table[rc].final) << "\n";
01511
01512 state_ = table[rc].final;
01513 }
01514
01515 void EventProcessor::runAsync() {
01516 beginJob();
01517 {
01518 boost::mutex::scoped_lock sl(stop_lock_);
01519 if(id_set_ == true) {
01520 std::string err("runAsync called while async event loop already running\n");
01521 LogError("FwkJob") << err;
01522 throw cms::Exception("BadState") << err;
01523 }
01524
01525 changeState(mRunAsync);
01526
01527 stop_count_ = 0;
01528 last_rc_ = epSuccess;
01529 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01530 boost::xtime timeout;
01531 #if BOOST_VERSION >= 105000
01532 boost::xtime_get(&timeout, boost::TIME_UTC_);
01533 #else
01534 boost::xtime_get(&timeout, boost::TIME_UTC);
01535 #endif
01536 timeout.sec += 60;
01537 if(starter_.timed_wait(sl, timeout) == false) {
01538
01539 throw cms::Exception("BadState")
01540 << "Async run thread did not start in 60 seconds\n";
01541 }
01542 }
01543 }
01544
01545 void EventProcessor::asyncRun(EventProcessor* me) {
01546
01547
01548
01549
01550
01551
01552
01553
01554
01555 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01556
01557 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01558 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01559
01560 {
01561 boost::mutex::scoped_lock sl(me->stop_lock_);
01562 me->event_loop_id_ = pthread_self();
01563 me->id_set_ = true;
01564 me->starter_.notify_all();
01565 }
01566
01567 Status rc = epException;
01568 FDEBUG(2) << "asyncRun starting ......................\n";
01569
01570 try {
01571 bool onlineStateTransitions = true;
01572 rc = me->runToCompletion(onlineStateTransitions);
01573 }
01574 catch (cms::Exception& e) {
01575 LogError("FwkJob") << "cms::Exception caught in "
01576 << "EventProcessor::asyncRun"
01577 << "\n"
01578 << e.explainSelf();
01579 me->last_error_text_ = e.explainSelf();
01580 }
01581 catch (std::exception& e) {
01582 LogError("FwkJob") << "Standard library exception caught in "
01583 << "EventProcessor::asyncRun"
01584 << "\n"
01585 << e.what();
01586 me->last_error_text_ = e.what();
01587 }
01588 catch (...) {
01589 LogError("FwkJob") << "Unknown exception caught in "
01590 << "EventProcessor::asyncRun"
01591 << "\n";
01592 me->last_error_text_ = "Unknown exception caught";
01593 rc = epOther;
01594 }
01595
01596 me->last_rc_ = rc;
01597
01598 {
01599
01600 boost::mutex::scoped_lock sl(me->stop_lock_);
01601 ++me->stop_count_;
01602 me->stopper_.notify_all();
01603 }
01604 FDEBUG(2) << "asyncRun ending ......................\n";
01605 }
01606
01607 std::auto_ptr<statemachine::Machine>
01608 EventProcessor::createStateMachine() {
01609 statemachine::FileMode fileMode;
01610 if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01611 else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01612 else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01613 else {
01614 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01615 << fileMode_ << ".\n"
01616 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01617 }
01618
01619 statemachine::EmptyRunLumiMode emptyRunLumiMode;
01620 if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01621 else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01622 else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01623 else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01624 else {
01625 throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01626 << emptyRunLumiMode_ << ".\n"
01627 << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01628 }
01629
01630 std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
01631 fileMode,
01632 emptyRunLumiMode));
01633
01634 machine->initiate();
01635 return machine;
01636 }
01637
01638
01639 EventProcessor::StatusCode
01640 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01641
01642 StateSentry toerror(this);
01643
01644 StatusCode returnCode=epSuccess;
01645 std::auto_ptr<statemachine::Machine> machine;
01646 {
01647 beginJob();
01648
01649 if(!onlineStateTransitions) changeState(mRunCount);
01650
01651
01652 stateMachineWasInErrorState_ = false;
01653
01654
01655 ServiceRegistry::Operate operate(serviceToken_);
01656
01657 machine = createStateMachine();
01658 try {
01659 try {
01660
01661 InputSource::ItemType itemType;
01662
01663 while(true) {
01664
01665 bool more = true;
01666 if(numberOfForkedChildren_ > 0) {
01667 size_t size = preg_->size();
01668 more = input_->skipForForking();
01669 if(more) {
01670 if(size < preg_->size()) {
01671 principalCache_.adjustIndexesAfterProductRegistryAddition();
01672 }
01673 principalCache_.adjustEventToNewProductRegistry(preg_);
01674 }
01675 }
01676 itemType = (more ? input_->nextItemType() : InputSource::IsStop);
01677
01678 FDEBUG(1) << "itemType = " << itemType << "\n";
01679
01680
01681
01682
01683
01684
01685
01686
01687 if(state_ == sStopping) {
01688 FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01689 forceLooperToEnd_ = true;
01690 machine->process_event(statemachine::Stop());
01691 forceLooperToEnd_ = false;
01692 break;
01693 }
01694 else if(state_ == sShuttingDown) {
01695 FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01696 forceLooperToEnd_ = true;
01697 machine->process_event(statemachine::Stop());
01698 forceLooperToEnd_ = false;
01699 break;
01700 }
01701
01702
01703 {
01704 boost::mutex::scoped_lock sl(usr2_lock);
01705 if(shutdown_flag) {
01706 changeState(mShutdownSignal);
01707 returnCode = epSignal;
01708 forceLooperToEnd_ = true;
01709 machine->process_event(statemachine::Stop());
01710 forceLooperToEnd_ = false;
01711 break;
01712 }
01713 }
01714
01715 if(itemType == InputSource::IsStop) {
01716 machine->process_event(statemachine::Stop());
01717 }
01718 else if(itemType == InputSource::IsFile) {
01719 machine->process_event(statemachine::File());
01720 }
01721 else if(itemType == InputSource::IsRun) {
01722 machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01723 }
01724 else if(itemType == InputSource::IsLumi) {
01725 machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
01726 }
01727 else if(itemType == InputSource::IsEvent) {
01728 machine->process_event(statemachine::Event());
01729 }
01730
01731 else {
01732 throw Exception(errors::LogicError)
01733 << "Unknown next item type passed to EventProcessor\n"
01734 << "Please report this error to the Framework group\n";
01735 }
01736
01737 if(machine->terminated()) {
01738 changeState(mInputExhausted);
01739 break;
01740 }
01741 }
01742 }
01743 catch (cms::Exception& e) { throw; }
01744 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01745 catch (std::exception& e) { convertException::stdToEDM(e); }
01746 catch(std::string& s) { convertException::stringToEDM(s); }
01747 catch(char const* c) { convertException::charPtrToEDM(c); }
01748 catch (...) { convertException::unknownToEDM(); }
01749 }
01750
01751
01752
01753
01754
01755
01756
01757
01758
01759
01760
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773
01774
01775
01776
01777
01778
01779
01780
01781
01782
01783
01784
01785
01786
01787
01788
01789
01790
01791
01792
01793
01794
01795 catch (cms::Exception & e) {
01796 alreadyHandlingException_ = true;
01797 terminateMachine(machine);
01798 alreadyHandlingException_ = false;
01799 if (!exceptionMessageLumis_.empty()) {
01800 e.addAdditionalInfo(exceptionMessageLumis_);
01801 if (e.alreadyPrinted()) {
01802 LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01803 }
01804 }
01805 if (!exceptionMessageRuns_.empty()) {
01806 e.addAdditionalInfo(exceptionMessageRuns_);
01807 if (e.alreadyPrinted()) {
01808 LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01809 }
01810 }
01811 if (!exceptionMessageFiles_.empty()) {
01812 e.addAdditionalInfo(exceptionMessageFiles_);
01813 if (e.alreadyPrinted()) {
01814 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01815 }
01816 }
01817 throw;
01818 }
01819
01820 if(machine->terminated()) {
01821 FDEBUG(1) << "The state machine reports it has been terminated\n";
01822 machine.reset();
01823 }
01824
01825 if(!onlineStateTransitions) changeState(mFinished);
01826
01827 if(stateMachineWasInErrorState_) {
01828 throw cms::Exception("BadState")
01829 << "The boost state machine in the EventProcessor exited after\n"
01830 << "entering the Error state.\n";
01831 }
01832
01833 }
01834 if(machine.get() != 0) {
01835 terminateMachine(machine);
01836 throw Exception(errors::LogicError)
01837 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01838 << "Please report this error to the Framework group\n";
01839 }
01840
01841 toerror.succeeded();
01842
01843 return returnCode;
01844 }
01845
01846 void EventProcessor::readFile() {
01847 FDEBUG(1) << " \treadFile\n";
01848 size_t size = preg_->size();
01849 fb_ = input_->readFile();
01850 if(size < preg_->size()) {
01851 principalCache_.adjustIndexesAfterProductRegistryAddition();
01852 }
01853 principalCache_.adjustEventToNewProductRegistry(preg_);
01854 if(numberOfForkedChildren_ > 0) {
01855 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01856 }
01857 }
01858
01859 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
01860 if (fb_.get() != 0) {
01861 input_->closeFile(fb_, cleaningUpAfterException);
01862 }
01863 FDEBUG(1) << "\tcloseInputFile\n";
01864 }
01865
01866 void EventProcessor::openOutputFiles() {
01867 if (fb_.get() != 0) {
01868 schedule_->openOutputFiles(*fb_);
01869 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01870 }
01871 FDEBUG(1) << "\topenOutputFiles\n";
01872 }
01873
01874 void EventProcessor::closeOutputFiles() {
01875 if (fb_.get() != 0) {
01876 schedule_->closeOutputFiles();
01877 if(hasSubProcess()) subProcess_->closeOutputFiles();
01878 }
01879 FDEBUG(1) << "\tcloseOutputFiles\n";
01880 }
01881
01882 void EventProcessor::respondToOpenInputFile() {
01883 if (fb_.get() != 0) {
01884 schedule_->respondToOpenInputFile(*fb_);
01885 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01886 }
01887 FDEBUG(1) << "\trespondToOpenInputFile\n";
01888 }
01889
01890 void EventProcessor::respondToCloseInputFile() {
01891 if (fb_.get() != 0) {
01892 schedule_->respondToCloseInputFile(*fb_);
01893 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01894 }
01895 FDEBUG(1) << "\trespondToCloseInputFile\n";
01896 }
01897
01898 void EventProcessor::respondToOpenOutputFiles() {
01899 if (fb_.get() != 0) {
01900 schedule_->respondToOpenOutputFiles(*fb_);
01901 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01902 }
01903 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01904 }
01905
01906 void EventProcessor::respondToCloseOutputFiles() {
01907 if (fb_.get() != 0) {
01908 schedule_->respondToCloseOutputFiles(*fb_);
01909 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01910 }
01911 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01912 }
01913
01914 void EventProcessor::startingNewLoop() {
01915 shouldWeStop_ = false;
01916
01917
01918 if(looper_ && looperBeginJobRun_) {
01919 looper_->doStartingNewLoop();
01920 }
01921 FDEBUG(1) << "\tstartingNewLoop\n";
01922 }
01923
01924 bool EventProcessor::endOfLoop() {
01925 if(looper_) {
01926 ModuleChanger changer(schedule_.get());
01927 looper_->setModuleChanger(&changer);
01928 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01929 looper_->setModuleChanger(0);
01930 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01931 else return false;
01932 }
01933 FDEBUG(1) << "\tendOfLoop\n";
01934 return true;
01935 }
01936
01937 void EventProcessor::rewindInput() {
01938 input_->repeat();
01939 input_->rewind();
01940 FDEBUG(1) << "\trewind\n";
01941 }
01942
01943 void EventProcessor::prepareForNextLoop() {
01944 looper_->prepareForNextLoop(esp_.get());
01945 FDEBUG(1) << "\tprepareForNextLoop\n";
01946 }
01947
01948 bool EventProcessor::shouldWeCloseOutput() const {
01949 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01950 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01951 }
01952
01953 void EventProcessor::doErrorStuff() {
01954 FDEBUG(1) << "\tdoErrorStuff\n";
01955 LogError("StateMachine")
01956 << "The EventProcessor state machine encountered an unexpected event\n"
01957 << "and went to the error state\n"
01958 << "Will attempt to terminate processing normally\n"
01959 << "(IF using the looper the next loop will be attempted)\n"
01960 << "This likely indicates a bug in an input module or corrupted input or both\n";
01961 stateMachineWasInErrorState_ = true;
01962 }
01963
01964 void EventProcessor::beginRun(statemachine::Run const& run) {
01965 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01966 input_->doBeginRun(runPrincipal);
01967 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
01968 runPrincipal.beginTime());
01969 if(forceESCacheClearOnNewRun_){
01970 espController_->forceCacheClear();
01971 }
01972 espController_->eventSetupForInstance(ts);
01973 EventSetup const& es = esp_->eventSetup();
01974 if(looper_ && looperBeginJobRun_== false) {
01975 looper_->copyInfo(ScheduleInfo(schedule_.get()));
01976 looper_->beginOfJob(es);
01977 looperBeginJobRun_ = true;
01978 looper_->doStartingNewLoop();
01979 }
01980 {
01981 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
01982 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
01983 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
01984 if(hasSubProcess()) {
01985 subProcess_->doBeginRun(runPrincipal, ts);
01986 }
01987 }
01988 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
01989 if(looper_) {
01990 looper_->doBeginRun(runPrincipal, es);
01991 }
01992 }
01993
01994 void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
01995 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01996 input_->doEndRun(runPrincipal, cleaningUpAfterException);
01997 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
01998 runPrincipal.endTime());
01999 espController_->eventSetupForInstance(ts);
02000 EventSetup const& es = esp_->eventSetup();
02001 {
02002 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02003 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02004 schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
02005 if(hasSubProcess()) {
02006 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
02007 }
02008 }
02009 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02010 if(looper_) {
02011 looper_->doEndRun(runPrincipal, es);
02012 }
02013 }
02014
02015 void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02016 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02017 input_->doBeginLumi(lumiPrincipal);
02018
02019 Service<RandomNumberGenerator> rng;
02020 if(rng.isAvailable()) {
02021 LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02022 rng->preBeginLumi(lb);
02023 }
02024
02025
02026
02027 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02028 espController_->eventSetupForInstance(ts);
02029 EventSetup const& es = esp_->eventSetup();
02030 {
02031 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02032 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02033 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02034 if(hasSubProcess()) {
02035 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02036 }
02037 }
02038 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02039 if(looper_) {
02040 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02041 }
02042 }
02043
02044 void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
02045 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02046 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
02047
02048
02049 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02050 lumiPrincipal.endTime());
02051 espController_->eventSetupForInstance(ts);
02052 EventSetup const& es = esp_->eventSetup();
02053 {
02054 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02055 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02056 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
02057 if(hasSubProcess()) {
02058 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
02059 }
02060 }
02061 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02062 if(looper_) {
02063 looper_->doEndLuminosityBlock(lumiPrincipal, es);
02064 }
02065 }
02066
02067 statemachine::Run EventProcessor::readAndCacheRun() {
02068 if (principalCache_.hasRunPrincipal()) {
02069 throw edm::Exception(edm::errors::LogicError)
02070 << "EventProcessor::readAndCacheRun\n"
02071 << "Illegal attempt to insert run into cache\n"
02072 << "Contact a Framework Developer\n";
02073 }
02074 principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
02075 return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02076 }
02077
02078 statemachine::Run EventProcessor::readAndMergeRun() {
02079 principalCache_.merge(input_->runAuxiliary(), preg_);
02080 input_->readAndMergeRun(principalCache_.runPrincipalPtr());
02081 return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02082 }
02083
02084 int EventProcessor::readAndCacheLumi() {
02085 if (principalCache_.hasLumiPrincipal()) {
02086 throw edm::Exception(edm::errors::LogicError)
02087 << "EventProcessor::readAndCacheRun\n"
02088 << "Illegal attempt to insert lumi into cache\n"
02089 << "Contact a Framework Developer\n";
02090 }
02091 if (!principalCache_.hasRunPrincipal()) {
02092 throw edm::Exception(edm::errors::LogicError)
02093 << "EventProcessor::readAndCacheRun\n"
02094 << "Illegal attempt to insert lumi into cache\n"
02095 << "Run is invalid\n"
02096 << "Contact a Framework Developer\n";
02097 }
02098 principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
02099 principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
02100 return input_->luminosityBlock();
02101 }
02102
02103 int EventProcessor::readAndMergeLumi() {
02104 principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
02105 input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
02106 return input_->luminosityBlock();
02107 }
02108
02109 void EventProcessor::writeRun(statemachine::Run const& run) {
02110 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02111 if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02112 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02113 }
02114
02115 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02116 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02117 if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02118 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02119 }
02120
02121 void EventProcessor::writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02122 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02123 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02124 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02125 }
02126
02127 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
02128 principalCache_.deleteLumi(phid, run, lumi);
02129 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02130 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02131 }
02132
02133 void EventProcessor::readAndProcessEvent() {
02134 EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
02135 FDEBUG(1) << "\treadEvent\n";
02136 assert(pep != 0);
02137 pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
02138 assert(pep->luminosityBlockPrincipalPtrValid());
02139 assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
02140 assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
02141
02142 IOVSyncValue ts(pep->id(), pep->time());
02143 espController_->eventSetupForInstance(ts);
02144 EventSetup const& es = esp_->eventSetup();
02145 {
02146 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02147 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02148 schedule_->processOneOccurrence<Traits>(*pep, es);
02149 if(hasSubProcess()) {
02150 subProcess_->doEvent(*pep, ts);
02151 }
02152 }
02153
02154 if(looper_) {
02155 bool randomAccess = input_->randomAccess();
02156 ProcessingController::ForwardState forwardState = input_->forwardState();
02157 ProcessingController::ReverseState reverseState = input_->reverseState();
02158 ProcessingController pc(forwardState, reverseState, randomAccess);
02159
02160 EDLooperBase::Status status = EDLooperBase::kContinue;
02161 do {
02162 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02163
02164 bool succeeded = true;
02165 if(randomAccess) {
02166 if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02167 input_->skipEvents(-2);
02168 }
02169 else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02170 succeeded = input_->goToEvent(pc.specifiedEventTransition());
02171 }
02172 }
02173 pc.setLastOperationSucceeded(succeeded);
02174 } while(!pc.lastOperationSucceeded());
02175 if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02176
02177 }
02178
02179 FDEBUG(1) << "\tprocessEvent\n";
02180 pep->clearEventPrincipal();
02181 }
02182
02183 bool EventProcessor::shouldWeStop() const {
02184 FDEBUG(1) << "\tshouldWeStop\n";
02185 if(shouldWeStop_) return true;
02186 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02187 }
02188
02189 void EventProcessor::setExceptionMessageFiles(std::string& message) {
02190 exceptionMessageFiles_ = message;
02191 }
02192
02193 void EventProcessor::setExceptionMessageRuns(std::string& message) {
02194 exceptionMessageRuns_ = message;
02195 }
02196
02197 void EventProcessor::setExceptionMessageLumis(std::string& message) {
02198 exceptionMessageLumis_ = message;
02199 }
02200
02201 bool EventProcessor::alreadyHandlingException() const {
02202 return alreadyHandlingException_;
02203 }
02204
02205 void EventProcessor::terminateMachine(std::auto_ptr<statemachine::Machine>& iMachine) {
02206 if(iMachine.get() != 0) {
02207 if(!iMachine->terminated()) {
02208 forceLooperToEnd_ = true;
02209 iMachine->process_event(statemachine::Stop());
02210 forceLooperToEnd_ = false;
02211 }
02212 else {
02213 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
02214 }
02215 if(iMachine->terminated()) {
02216 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02217 }
02218 iMachine.reset();
02219 }
02220 }
02221 }