00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072
00073
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080
00081
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085
00086 namespace edm {
00087
00088 namespace event_processor {
00089
00090 class StateSentry {
00091 public:
00092 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00093 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00094 void succeeded() {success_ = true;}
00095
00096 private:
00097 EventProcessor* ep_;
00098 bool success_;
00099 };
00100 }
00101
00102 using namespace event_processor;
00103
00104 namespace {
00105 template <typename T>
00106 class ScheduleSignalSentry {
00107 public:
00108 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00109 a_(a), principal_(principal), es_(es) {
00110 if (a_) T::preScheduleSignal(a_, principal_);
00111 }
00112 ~ScheduleSignalSentry() {
00113 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00114 }
00115
00116 private:
00117
00118 ActivityRegistry* a_;
00119 typename T::MyPrincipal* principal_;
00120 EventSetup const* es_;
00121 };
00122 }
00123
00124 namespace {
00125
00126
00127
00128
00129 char const* stateNames[] = {
00130 "Init",
00131 "JobReady",
00132 "RunGiven",
00133 "Running",
00134 "Stopping",
00135 "ShuttingDown",
00136 "Done",
00137 "JobEnded",
00138 "Error",
00139 "ErrorEnded",
00140 "End",
00141 "Invalid"
00142 };
00143
00144 char const* msgNames[] = {
00145 "SetRun",
00146 "Skip",
00147 "RunAsync",
00148 "Run(ID)",
00149 "Run(count)",
00150 "BeginJob",
00151 "StopAsync",
00152 "ShutdownAsync",
00153 "EndJob",
00154 "CountComplete",
00155 "InputExhausted",
00156 "StopSignal",
00157 "ShutdownSignal",
00158 "Finished",
00159 "Any",
00160 "dtor",
00161 "Exception",
00162 "Rewind"
00163 };
00164 }
00165
00166
00167
00168
00169
00170 struct TransEntry {
00171 State current;
00172 Msg message;
00173 State final;
00174 };
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197 TransEntry table[] = {
00198
00199
00200 { sInit, mException, sError },
00201 { sInit, mBeginJob, sJobReady },
00202 { sJobReady, mException, sError },
00203 { sJobReady, mSetRun, sRunGiven },
00204 { sJobReady, mInputRewind, sRunning },
00205 { sJobReady, mSkip, sRunning },
00206 { sJobReady, mRunID, sRunning },
00207 { sJobReady, mRunCount, sRunning },
00208 { sJobReady, mEndJob, sJobEnded },
00209 { sJobReady, mBeginJob, sJobReady },
00210 { sJobReady, mDtor, sEnd },
00211
00212 { sJobReady, mStopAsync, sJobReady },
00213 { sJobReady, mCountComplete, sJobReady },
00214 { sJobReady, mFinished, sJobReady },
00215
00216 { sRunGiven, mException, sError },
00217 { sRunGiven, mRunAsync, sRunning },
00218 { sRunGiven, mBeginJob, sRunGiven },
00219 { sRunGiven, mShutdownAsync, sShuttingDown },
00220 { sRunGiven, mStopAsync, sStopping },
00221 { sRunning, mException, sError },
00222 { sRunning, mStopAsync, sStopping },
00223 { sRunning, mShutdownAsync, sShuttingDown },
00224 { sRunning, mShutdownSignal, sShuttingDown },
00225 { sRunning, mCountComplete, sStopping },
00226 { sRunning, mInputExhausted, sStopping },
00227
00228 { sStopping, mInputRewind, sRunning },
00229 { sStopping, mException, sError },
00230 { sStopping, mFinished, sJobReady },
00231 { sStopping, mCountComplete, sJobReady },
00232 { sStopping, mShutdownSignal, sShuttingDown },
00233 { sStopping, mStopAsync, sStopping },
00234 { sStopping, mInputExhausted, sStopping },
00235
00236 { sShuttingDown, mException, sError },
00237 { sShuttingDown, mShutdownSignal, sShuttingDown },
00238 { sShuttingDown, mCountComplete, sDone },
00239 { sShuttingDown, mInputExhausted, sDone },
00240 { sShuttingDown, mFinished, sDone },
00241
00242
00243
00244 { sDone, mEndJob, sJobEnded },
00245 { sDone, mException, sError },
00246 { sJobEnded, mDtor, sEnd },
00247 { sJobEnded, mException, sError },
00248 { sError, mEndJob, sError },
00249 { sError, mDtor, sError },
00250 { sInit, mDtor, sEnd },
00251 { sStopping, mShutdownAsync, sShuttingDown },
00252 { sInvalid, mAny, sInvalid }
00253 };
00254
00255
00256
00257
00258
00259
00260 boost::shared_ptr<InputSource>
00261 makeInput(ParameterSet& params,
00262 CommonParams const& common,
00263 ProductRegistry& preg,
00264 PrincipalCache& pCache,
00265 boost::shared_ptr<ActivityRegistry> areg,
00266 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00267 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00268 if(main_input == 0) {
00269 throw Exception(errors::Configuration)
00270 << "There must be exactly one source in the configuration.\n"
00271 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00272 }
00273
00274 std::string modtype(main_input->getParameter<std::string>("@module_type"));
00275
00276 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00277 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00278 ConfigurationDescriptions descriptions(filler->baseType());
00279 filler->fill(descriptions);
00280
00281 try {
00282 try {
00283 descriptions.validate(*main_input, std::string("source"));
00284 }
00285 catch (cms::Exception& e) { throw; }
00286 catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00287 catch (std::exception& e) { convertException::stdToEDM(e); }
00288 catch (std::string& s) { convertException::stringToEDM(s); }
00289 catch (char const* c) { convertException::charPtrToEDM(c); }
00290 catch (...) { convertException::unknownToEDM(); }
00291 }
00292 catch (cms::Exception & iException) {
00293 std::ostringstream ost;
00294 ost << "Validating configuration of input source of type " << modtype;
00295 iException.addContext(ost.str());
00296 throw;
00297 }
00298
00299 main_input->registerIt();
00300
00301
00302
00303
00304
00305
00306
00307 ModuleDescription md(main_input->id(),
00308 main_input->getParameter<std::string>("@module_type"),
00309 "source",
00310 processConfiguration.get());
00311
00312 InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00313 areg->preSourceConstructionSignal_(md);
00314 boost::shared_ptr<InputSource> input;
00315 try {
00316 try {
00317 input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00318 }
00319 catch (cms::Exception& e) { throw; }
00320 catch (std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00321 catch (std::exception& e) { convertException::stdToEDM(e); }
00322 catch (std::string& s) { convertException::stringToEDM(s); }
00323 catch (char const* c) { convertException::charPtrToEDM(c); }
00324 catch (...) { convertException::unknownToEDM(); }
00325 }
00326 catch (cms::Exception& iException) {
00327 areg->postSourceConstructionSignal_(md);
00328 std::ostringstream ost;
00329 ost << "Constructing input source of type " << modtype;
00330 iException.addContext(ost.str());
00331 throw;
00332 }
00333 areg->postSourceConstructionSignal_(md);
00334 return input;
00335 }
00336
00337
00338 boost::shared_ptr<EDLooperBase>
00339 fillLooper(eventsetup::EventSetupsController& esController,
00340 eventsetup::EventSetupProvider& cp,
00341 ParameterSet& params) {
00342 boost::shared_ptr<EDLooperBase> vLooper;
00343
00344 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00345
00346 if(loopers.size() == 0) {
00347 return vLooper;
00348 }
00349
00350 assert(1 == loopers.size());
00351
00352 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00353 itName != itNameEnd;
00354 ++itName) {
00355
00356 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00357 providerPSet->registerIt();
00358 vLooper = eventsetup::LooperFactory::get()->addTo(esController,
00359 cp,
00360 *providerPSet);
00361 }
00362 return vLooper;
00363
00364 }
00365
00366
00367 EventProcessor::EventProcessor(std::string const& config,
00368 ServiceToken const& iToken,
00369 serviceregistry::ServiceLegacy iLegacy,
00370 std::vector<std::string> const& defaultServices,
00371 std::vector<std::string> const& forcedServices) :
00372 preProcessEventSignal_(),
00373 postProcessEventSignal_(),
00374 actReg_(),
00375 preg_(),
00376 serviceToken_(),
00377 input_(),
00378 espController_(new eventsetup::EventSetupsController),
00379 esp_(),
00380 act_table_(),
00381 processConfiguration_(),
00382 schedule_(),
00383 subProcess_(),
00384 historyAppender_(new HistoryAppender),
00385 state_(sInit),
00386 event_loop_(),
00387 state_lock_(),
00388 stop_lock_(),
00389 stopper_(),
00390 starter_(),
00391 stop_count_(-1),
00392 last_rc_(epSuccess),
00393 last_error_text_(),
00394 id_set_(false),
00395 event_loop_id_(),
00396 my_sig_num_(getSigNum()),
00397 fb_(),
00398 looper_(),
00399 machine_(),
00400 principalCache_(),
00401 shouldWeStop_(false),
00402 stateMachineWasInErrorState_(false),
00403 fileMode_(),
00404 emptyRunLumiMode_(),
00405 exceptionMessageFiles_(),
00406 exceptionMessageRuns_(),
00407 exceptionMessageLumis_(),
00408 alreadyHandlingException_(false),
00409 forceLooperToEnd_(false),
00410 looperBeginJobRun_(false),
00411 forceESCacheClearOnNewRun_(false),
00412 numberOfForkedChildren_(0),
00413 numberOfSequentialEventsPerChild_(1),
00414 setCpuAffinity_(false),
00415 eventSetupDataToExcludeFromPrefetching_() {
00416 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00417 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00418 processDesc->addServices(defaultServices, forcedServices);
00419 init(processDesc, iToken, iLegacy);
00420 }
00421
00422 EventProcessor::EventProcessor(std::string const& config,
00423 std::vector<std::string> const& defaultServices,
00424 std::vector<std::string> const& forcedServices) :
00425 preProcessEventSignal_(),
00426 postProcessEventSignal_(),
00427 actReg_(),
00428 preg_(),
00429 serviceToken_(),
00430 input_(),
00431 espController_(new eventsetup::EventSetupsController),
00432 esp_(),
00433 act_table_(),
00434 processConfiguration_(),
00435 schedule_(),
00436 subProcess_(),
00437 historyAppender_(new HistoryAppender),
00438 state_(sInit),
00439 event_loop_(),
00440 state_lock_(),
00441 stop_lock_(),
00442 stopper_(),
00443 starter_(),
00444 stop_count_(-1),
00445 last_rc_(epSuccess),
00446 last_error_text_(),
00447 id_set_(false),
00448 event_loop_id_(),
00449 my_sig_num_(getSigNum()),
00450 fb_(),
00451 looper_(),
00452 machine_(),
00453 principalCache_(),
00454 shouldWeStop_(false),
00455 stateMachineWasInErrorState_(false),
00456 fileMode_(),
00457 emptyRunLumiMode_(),
00458 exceptionMessageFiles_(),
00459 exceptionMessageRuns_(),
00460 exceptionMessageLumis_(),
00461 alreadyHandlingException_(false),
00462 forceLooperToEnd_(false),
00463 looperBeginJobRun_(false),
00464 forceESCacheClearOnNewRun_(false),
00465 numberOfForkedChildren_(0),
00466 numberOfSequentialEventsPerChild_(1),
00467 setCpuAffinity_(false),
00468 eventSetupDataToExcludeFromPrefetching_() {
00469 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00470 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00471 processDesc->addServices(defaultServices, forcedServices);
00472 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00473 }
00474
00475 EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00476 ServiceToken const& token,
00477 serviceregistry::ServiceLegacy legacy) :
00478 preProcessEventSignal_(),
00479 postProcessEventSignal_(),
00480 actReg_(),
00481 preg_(),
00482 serviceToken_(),
00483 input_(),
00484 espController_(new eventsetup::EventSetupsController),
00485 esp_(),
00486 act_table_(),
00487 processConfiguration_(),
00488 schedule_(),
00489 subProcess_(),
00490 historyAppender_(new HistoryAppender),
00491 state_(sInit),
00492 event_loop_(),
00493 state_lock_(),
00494 stop_lock_(),
00495 stopper_(),
00496 starter_(),
00497 stop_count_(-1),
00498 last_rc_(epSuccess),
00499 last_error_text_(),
00500 id_set_(false),
00501 event_loop_id_(),
00502 my_sig_num_(getSigNum()),
00503 fb_(),
00504 looper_(),
00505 machine_(),
00506 principalCache_(),
00507 shouldWeStop_(false),
00508 stateMachineWasInErrorState_(false),
00509 fileMode_(),
00510 emptyRunLumiMode_(),
00511 exceptionMessageFiles_(),
00512 exceptionMessageRuns_(),
00513 exceptionMessageLumis_(),
00514 alreadyHandlingException_(false),
00515 forceLooperToEnd_(false),
00516 looperBeginJobRun_(false),
00517 forceESCacheClearOnNewRun_(false),
00518 numberOfForkedChildren_(0),
00519 numberOfSequentialEventsPerChild_(1),
00520 setCpuAffinity_(false),
00521 eventSetupDataToExcludeFromPrefetching_() {
00522 init(processDesc, token, legacy);
00523 }
00524
00525
00526 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00527 preProcessEventSignal_(),
00528 postProcessEventSignal_(),
00529 actReg_(),
00530 preg_(),
00531 serviceToken_(),
00532 input_(),
00533 espController_(new eventsetup::EventSetupsController),
00534 esp_(),
00535 act_table_(),
00536 processConfiguration_(),
00537 schedule_(),
00538 subProcess_(),
00539 historyAppender_(new HistoryAppender),
00540 state_(sInit),
00541 event_loop_(),
00542 state_lock_(),
00543 stop_lock_(),
00544 stopper_(),
00545 starter_(),
00546 stop_count_(-1),
00547 last_rc_(epSuccess),
00548 last_error_text_(),
00549 id_set_(false),
00550 event_loop_id_(),
00551 my_sig_num_(getSigNum()),
00552 fb_(),
00553 looper_(),
00554 machine_(),
00555 principalCache_(),
00556 shouldWeStop_(false),
00557 stateMachineWasInErrorState_(false),
00558 fileMode_(),
00559 emptyRunLumiMode_(),
00560 exceptionMessageFiles_(),
00561 exceptionMessageRuns_(),
00562 exceptionMessageLumis_(),
00563 alreadyHandlingException_(false),
00564 forceLooperToEnd_(false),
00565 looperBeginJobRun_(false),
00566 forceESCacheClearOnNewRun_(false),
00567 numberOfForkedChildren_(0),
00568 numberOfSequentialEventsPerChild_(1),
00569 setCpuAffinity_(false),
00570 eventSetupDataToExcludeFromPrefetching_() {
00571 if(isPython) {
00572 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00573 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00574 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00575 }
00576 else {
00577 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00578 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00579 }
00580 }
00581
00582 void
00583 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00584 ServiceToken const& iToken,
00585 serviceregistry::ServiceLegacy iLegacy) {
00586
00587
00588
00589
00590 BranchIDListHelper::clearRegistries();
00591
00592 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00593
00594
00595
00596 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00597
00598
00599 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00600 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00601 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00602 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00603 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00604 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00605 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00606 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00607 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00608 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00609 itPS != itPSEnd;
00610 ++itPS) {
00611 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00612 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00613 itPS->getUntrackedParameter<std::string>("label", "")));
00614 }
00615 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00616
00617
00618 ScheduleItems items;
00619
00620
00621 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00622 ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00623 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00624
00625
00626 ServiceRegistry::Operate operate(serviceToken_);
00627
00628
00629 boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00630
00631
00632 esp_ = espController_->makeProvider(*parameterSet);
00633
00634
00635 looper_ = fillLooper(*espController_, *esp_, *parameterSet);
00636 if(looper_) {
00637 looper_->setActionTable(items.act_table_.get());
00638 looper_->attachTo(*items.actReg_);
00639 }
00640
00641
00642 input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00643
00644
00645 schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00646
00647
00648 act_table_ = items.act_table_;
00649 actReg_ = items.actReg_;
00650 preg_ = items.preg_;
00651 processConfiguration_ = items.processConfiguration_;
00652
00653 FDEBUG(2) << parameterSet << std::endl;
00654 connectSigs(this);
00655
00656
00657 if(subProcessParameterSet) {
00658 subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
00659 }
00660 espController_->clearComponents();
00661 }
00662
00663 EventProcessor::~EventProcessor() {
00664
00665 ServiceToken token = getToken();
00666 ServiceRegistry::Operate op(token);
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677 terminateMachine();
00678
00679 try {
00680 changeState(mDtor);
00681 }
00682 catch(cms::Exception& e) {
00683 LogError("System")
00684 << e.explainSelf() << "\n";
00685 }
00686
00687
00688 espController_.reset();
00689 subProcess_.reset();
00690 esp_.reset();
00691 schedule_.reset();
00692 input_.reset();
00693 looper_.reset();
00694 actReg_.reset();
00695
00696 pset::Registry* psetRegistry = pset::Registry::instance();
00697 psetRegistry->data().clear();
00698 psetRegistry->extra().setID(ParameterSetID());
00699
00700 EntryDescriptionRegistry::instance()->data().clear();
00701 ParentageRegistry::instance()->data().clear();
00702 ProcessConfigurationRegistry::instance()->data().clear();
00703 ProcessHistoryRegistry::instance()->data().clear();
00704 BranchIDListHelper::clearRegistries();
00705 }
00706
00707 void
00708 EventProcessor::rewind() {
00709 beginJob();
00710 changeState(mStopAsync);
00711 changeState(mInputRewind);
00712 {
00713 StateSentry toerror(this);
00714
00715
00716 ServiceRegistry::Operate operate(serviceToken_);
00717
00718 {
00719 input_->repeat();
00720 input_->rewind();
00721 }
00722 changeState(mCountComplete);
00723 toerror.succeeded();
00724 }
00725 changeState(mFinished);
00726 }
00727
00728 EventProcessor::StatusCode
00729 EventProcessor::run(int numberEventsToProcess, bool) {
00730 return runEventCount(numberEventsToProcess);
00731 }
00732
00733 EventProcessor::StatusCode
00734 EventProcessor::skip(int numberToSkip) {
00735 beginJob();
00736 changeState(mSkip);
00737 {
00738 StateSentry toerror(this);
00739
00740
00741 ServiceRegistry::Operate operate(serviceToken_);
00742
00743 {
00744 input_->skipEvents(numberToSkip);
00745 }
00746 changeState(mCountComplete);
00747 toerror.succeeded();
00748 }
00749 changeState(mFinished);
00750 return epSuccess;
00751 }
00752
00753 void
00754 EventProcessor::beginJob() {
00755 if(state_ != sInit) return;
00756 bk::beginJob();
00757
00758 changeState(mBeginJob);
00759
00760
00761
00762 ServiceRegistry::Operate operate(serviceToken_);
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778 try {
00779 try {
00780 input_->doBeginJob();
00781 }
00782 catch (cms::Exception& e) { throw; }
00783 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00784 catch (std::exception& e) { convertException::stdToEDM(e); }
00785 catch(std::string& s) { convertException::stringToEDM(s); }
00786 catch(char const* c) { convertException::charPtrToEDM(c); }
00787 catch (...) { convertException::unknownToEDM(); }
00788 }
00789 catch(cms::Exception& ex) {
00790 ex.addContext("Calling beginJob for the source");
00791 throw;
00792 }
00793 schedule_->beginJob();
00794
00795 if(hasSubProcess()) subProcess_->doBeginJob();
00796 actReg_->postBeginJobSignal_();
00797 }
00798
00799 void
00800 EventProcessor::endJob() {
00801
00802 ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00803
00804
00805 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00806
00807
00808 ServiceRegistry::Operate operate(serviceToken_);
00809
00810 c.call(boost::bind(&EventProcessor::terminateMachine, this));
00811 schedule_->endJob(c);
00812 if(hasSubProcess()) {
00813 c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00814 }
00815 c.call(boost::bind(&InputSource::doEndJob, input_));
00816 if(looper_) {
00817 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00818 }
00819 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00820 if(c.hasThrown()) {
00821 c.rethrow();
00822 }
00823 }
00824
00825 ServiceToken
00826 EventProcessor::getToken() {
00827 return serviceToken_;
00828 }
00829
00830
00831 namespace {
00832
00833
00834 volatile bool child_failed = false;
00835 volatile unsigned int num_children_done = 0;
00836 volatile int child_fail_exit_status = 0;
00837 volatile int child_fail_signal = 0;
00838
00839 extern "C" {
00840 void ep_sigchld(int, siginfo_t*, void*) {
00841
00842
00843 int stat_loc;
00844 pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00845 while(0<p) {
00846
00847 if(WIFEXITED(stat_loc)) {
00848 ++num_children_done;
00849 if(0 != WEXITSTATUS(stat_loc)) {
00850 child_fail_exit_status = WEXITSTATUS(stat_loc);
00851 child_failed = true;
00852 }
00853 }
00854 if(WIFSIGNALED(stat_loc)) {
00855 ++num_children_done;
00856 child_fail_signal = WTERMSIG(stat_loc);
00857 child_failed = true;
00858 }
00859 p = waitpid(-1, &stat_loc, WNOHANG);
00860 }
00861 }
00862 }
00863
00864 }
00865
00866 enum {
00867 kChildSucceed,
00868 kChildExitBadly,
00869 kChildSegv,
00870 kMaxChildAction
00871 };
00872
00873 namespace {
00874 unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00875 unsigned int n = 0;
00876 while(numberOfChildren != 0) {
00877 ++n;
00878 numberOfChildren /= 10;
00879 }
00880 if(n == 0) {
00881 n = 3;
00882 }
00883 return n;
00884 }
00885
00886
00887
00888 class MessageSenderToSource {
00889 public:
00890 MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00891 void operator()();
00892
00893 private:
00894 const std::vector<int>& m_childrenPipes;
00895 long const m_nEventsToProcess;
00896 fd_set m_socketSet;
00897 unsigned int m_aliveChildren;
00898 int m_maxFd;
00899 };
00900
00901 MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00902 std::vector<int> const& childrenPipes,
00903 long iNEventsToProcess):
00904 m_childrenPipes(childrenPipes),
00905 m_nEventsToProcess(iNEventsToProcess),
00906 m_aliveChildren(childrenSockets.size()),
00907 m_maxFd(0)
00908 {
00909 FD_ZERO(&m_socketSet);
00910 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00911 it != itEnd; it++) {
00912 FD_SET(*it, &m_socketSet);
00913 if (*it > m_maxFd) {
00914 m_maxFd = *it;
00915 }
00916 }
00917 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00918 it != itEnd; ++it) {
00919 FD_SET(*it, &m_socketSet);
00920 if (*it > m_maxFd) {
00921 m_maxFd = *it;
00922 }
00923 }
00924 m_maxFd++;
00925 }
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941
00942
00943 void
00944 MessageSenderToSource::operator()() {
00945 multicore::MessageForParent childMsg;
00946 LogInfo("ForkingController") << "I am controller";
00947
00948
00949 multicore::MessageForSource sndmsg;
00950 sndmsg.startIndex = 0;
00951 sndmsg.nIndices = m_nEventsToProcess;
00952 do {
00953
00954 fd_set readSockets, errorSockets;
00955
00956 memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00957 memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00958
00959 ssize_t rc;
00960 while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00961 if (rc < 0) {
00962 std::cerr << "select failed; should be impossible due to preconditions.\n";
00963 abort();
00964 break;
00965 }
00966
00967
00968 for (int idx=0; idx<m_maxFd; idx++) {
00969
00970
00971 if (FD_ISSET(idx, &errorSockets)) {
00972 LogInfo("ForkingController") << "Error on socket " << idx;
00973 FD_CLR(idx, &m_socketSet);
00974 close(idx);
00975
00976 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00977 if (*it == idx) {
00978 m_aliveChildren--;
00979 }
00980 }
00981 continue;
00982 }
00983
00984 if (!FD_ISSET(idx, &readSockets)) {
00985 continue;
00986 }
00987
00988
00989
00990 bool is_pipe = false;
00991 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00992 if (*it == idx) {
00993 is_pipe = true;
00994 char buf;
00995 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00996 if (rc <= 0) {
00997 m_aliveChildren--;
00998 FD_CLR(idx, &m_socketSet);
00999 close(idx);
01000 }
01001 }
01002 }
01003
01004
01005 if (!is_pipe) {
01006 while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
01007 if (rc < 0) {
01008 FD_CLR(idx, &m_socketSet);
01009 close(idx);
01010 continue;
01011 }
01012
01013
01014
01015
01016
01017 while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
01018 if (rc < 0) {
01019 FD_CLR(idx, &m_socketSet);
01020 close(idx);
01021 continue;
01022 }
01023
01024 sndmsg.startIndex += sndmsg.nIndices;
01025 }
01026 }
01027
01028 } while (m_aliveChildren > 0);
01029
01030 return;
01031 }
01032
01033 }
01034
01035 bool
01036 EventProcessor::forkProcess(std::string const& jobReportFile) {
01037
01038 if(0 == numberOfForkedChildren_) {return true;}
01039 assert(0<numberOfForkedChildren_);
01040
01041 {
01042 beginJob();
01043
01044 ServiceRegistry::Operate operate(serviceToken_);
01045
01046 InputSource::ItemType itemType;
01047 itemType = input_->nextItemType();
01048
01049 assert(itemType == InputSource::IsFile);
01050 {
01051 readFile();
01052 }
01053 itemType = input_->nextItemType();
01054 assert(itemType == InputSource::IsRun);
01055
01056 LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01057 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01058 input_->runAuxiliary()->beginTime());
01059 espController_->eventSetupForInstance(ts);
01060 EventSetup const& es = esp_->eventSetup();
01061
01062
01063 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01064 es.fillAvailableRecordKeys(recordKeys);
01065 std::vector<eventsetup::DataKey> dataKeys;
01066 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01067 itKey != itEnd;
01068 ++itKey) {
01069 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01070
01071 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01072 ExcludedData const* excludedData(0);
01073 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01074 excludedData = &(itExcludeRec->second);
01075 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01076
01077 continue;
01078 }
01079 }
01080 if(0 != recordPtr) {
01081 dataKeys.clear();
01082 recordPtr->fillRegisteredDataKeys(dataKeys);
01083 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01084 itDataKey != itDataKeyEnd;
01085 ++itDataKey) {
01086
01087 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01088 LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01089 continue;
01090 }
01091 try {
01092 recordPtr->doGet(*itDataKey);
01093 } catch(cms::Exception& e) {
01094 LogWarning("ForkingEventSetupPreFetching") << e.what();
01095 }
01096 }
01097 }
01098 }
01099 }
01100 LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
01101 {
01102
01103 ServiceRegistry::Operate operate(serviceToken_);
01104 Service<JobReport> jobReport;
01105 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01106
01107
01108 actReg_->preForkReleaseResourcesSignal_();
01109 input_->doPreForkReleaseResources();
01110 schedule_->preForkReleaseResources();
01111 }
01112 installCustomHandler(SIGCHLD, ep_sigchld);
01113
01114
01115 unsigned int childIndex = 0;
01116 unsigned int const kMaxChildren = numberOfForkedChildren_;
01117 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01118 std::vector<pid_t> childrenIds;
01119 childrenIds.reserve(kMaxChildren);
01120 std::vector<int> childrenSockets;
01121 childrenSockets.reserve(kMaxChildren);
01122 std::vector<int> childrenPipes;
01123 childrenPipes.reserve(kMaxChildren);
01124 std::vector<int> childrenSocketsCopy;
01125 childrenSocketsCopy.reserve(kMaxChildren);
01126 std::vector<int> childrenPipesCopy;
01127 childrenPipesCopy.reserve(kMaxChildren);
01128 int pipes[2];
01129
01130 {
01131
01132 ServiceRegistry::Operate operate(serviceToken_);
01133 Service<JobReport> jobReport;
01134 int sockets[2], fd_flags;
01135 for(; childIndex < kMaxChildren; ++childIndex) {
01136
01137 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01138 printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01139 exit(EXIT_FAILURE);
01140 }
01141 if (pipe(pipes)) {
01142 printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01143 exit(EXIT_FAILURE);
01144 }
01145
01146 if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01147 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01148 exit(EXIT_FAILURE);
01149 }
01150
01151
01152 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01153 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01154 exit(EXIT_FAILURE);
01155 }
01156 if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01157 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01158 exit(EXIT_FAILURE);
01159 }
01160 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01161 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01162 exit(EXIT_FAILURE);
01163 }
01164
01165
01166 if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01167 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01168 exit(EXIT_FAILURE);
01169 }
01170 if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01171 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01172 exit(EXIT_FAILURE);
01173 }
01174
01175 childrenPipesCopy = childrenPipes;
01176 childrenSocketsCopy = childrenSockets;
01177
01178 pid_t value = fork();
01179 if(value == 0) {
01180
01181 close(pipes[0]);
01182 close(sockets[0]);
01183
01184 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01185 close(*it);
01186 }
01187 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01188 close(*it);
01189 }
01190
01191
01192 fflush(stdout);
01193 fflush(stderr);
01194 std::stringstream stout;
01195 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01196 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01197 LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01198 }
01199 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01200 LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01201 }
01202
01203 LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01204 if(setCpuAffinity_) {
01205
01206
01207
01208
01209
01210
01211 #ifdef __APPLE__
01212 LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01213 #else
01214 LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01215 cpu_set_t mask;
01216 CPU_ZERO(&mask);
01217 CPU_SET(childIndex, &mask);
01218 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01219 LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01220 exit(-1);
01221 }
01222 #endif
01223 }
01224 break;
01225 } else {
01226
01227 close(pipes[1]);
01228 close(sockets[1]);
01229 }
01230 if(value < 0) {
01231 LogError("ForkingChild") << "failed to create a child";
01232 exit(-1);
01233 }
01234 childrenIds.push_back(value);
01235 childrenSockets.push_back(sockets[0]);
01236 childrenPipes.push_back(pipes[0]);
01237 }
01238
01239 if(childIndex < kMaxChildren) {
01240 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01241 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01242
01243 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01244 input_->doPostForkReacquireResources(receiver);
01245 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01246
01247
01248 return true;
01249 }
01250 jobReport->parentAfterFork(jobReportFile);
01251 }
01252
01253
01254
01255
01256
01257
01258
01259
01260 sigset_t blockingSigSet;
01261 sigset_t unblockingSigSet;
01262 sigset_t oldSigSet;
01263 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01264 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01265 sigaddset(&blockingSigSet, SIGCHLD);
01266 sigaddset(&blockingSigSet, SIGUSR2);
01267 sigaddset(&blockingSigSet, SIGINT);
01268 sigdelset(&unblockingSigSet, SIGCHLD);
01269 sigdelset(&unblockingSigSet, SIGUSR2);
01270 sigdelset(&unblockingSigSet, SIGINT);
01271 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01272
01273
01274
01275 bool too_many_fds = false;
01276 if (pipes[1]+1 > FD_SETSIZE) {
01277 LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01278 too_many_fds = true;
01279 }
01280
01281
01282
01283
01284 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01285 boost::thread senderThread(sender);
01286
01287 while(!too_many_fds && !shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01288 sigsuspend(&unblockingSigSet);
01289 LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01290 }
01291 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01292
01293 LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01294 if(child_failed) {
01295 LogError("ForkingStopping") << "child failed";
01296 }
01297 if(shutdown_flag) {
01298 LogSystem("ForkingStopping") << "asked to shutdown";
01299 }
01300
01301 if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01302 LogInfo("ForkingStopping") << "must stop children" << std::endl;
01303 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01304 it != itEnd; ++it) {
01305 kill(*it, SIGUSR2);
01306 }
01307 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01308 while(num_children_done != kMaxChildren) {
01309 sigsuspend(&unblockingSigSet);
01310 }
01311 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01312 }
01313
01314 senderThread.join();
01315 if(child_failed) {
01316 if (child_fail_signal) {
01317 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01318 } else if (child_fail_exit_status) {
01319 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01320 } else {
01321 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01322 }
01323 }
01324 if(too_many_fds) {
01325 throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01326 }
01327 return false;
01328 }
01329
01330 void
01331 EventProcessor::connectSigs(EventProcessor* ep) {
01332
01333
01334
01335 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01336 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01337 }
01338
01339 std::vector<ModuleDescription const*>
01340 EventProcessor::getAllModuleDescriptions() const {
01341 return schedule_->getAllModuleDescriptions();
01342 }
01343
01344 int
01345 EventProcessor::totalEvents() const {
01346 return schedule_->totalEvents();
01347 }
01348
01349 int
01350 EventProcessor::totalEventsPassed() const {
01351 return schedule_->totalEventsPassed();
01352 }
01353
01354 int
01355 EventProcessor::totalEventsFailed() const {
01356 return schedule_->totalEventsFailed();
01357 }
01358
01359 void
01360 EventProcessor::enableEndPaths(bool active) {
01361 schedule_->enableEndPaths(active);
01362 }
01363
01364 bool
01365 EventProcessor::endPathsEnabled() const {
01366 return schedule_->endPathsEnabled();
01367 }
01368
01369 void
01370 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01371 schedule_->getTriggerReport(rep);
01372 }
01373
01374 void
01375 EventProcessor::clearCounters() {
01376 schedule_->clearCounters();
01377 }
01378
01379 char const* EventProcessor::currentStateName() const {
01380 return stateName(getState());
01381 }
01382
01383 char const* EventProcessor::stateName(State s) const {
01384 return stateNames[s];
01385 }
01386
01387 char const* EventProcessor::msgName(Msg m) const {
01388 return msgNames[m];
01389 }
01390
01391 State EventProcessor::getState() const {
01392 return state_;
01393 }
01394
01395 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01396
01397
01398 return last_rc_;
01399 }
01400
01401 void
01402 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01403 if(runNumber == 0) {
01404 runNumber = 1;
01405 LogWarning("Invalid Run")
01406 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01407 << "Run number was set to 1 instead\n";
01408 }
01409
01410
01411 beginJob();
01412 changeState(mSetRun);
01413
01414
01415 input_->setRunNumber(runNumber);
01416 }
01417
01418 void
01419 EventProcessor::declareRunNumber(RunNumber_t ) {
01420
01421 beginJob();
01422 changeState(mSetRun);
01423
01424
01425
01426 }
01427
01428 EventProcessor::StatusCode
01429 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01430 bool rc = true;
01431 boost::xtime timeout;
01432 boost::xtime_get(&timeout, boost::TIME_UTC);
01433 timeout.sec += timeout_seconds;
01434
01435
01436
01437
01438 {
01439 boost::mutex::scoped_lock sl(stop_lock_);
01440
01441
01442 if(stop_count_ < 0) return last_rc_;
01443
01444 if(timeout_seconds == 0) {
01445 while(stop_count_ == 0) stopper_.wait(sl);
01446 } else {
01447 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01448 }
01449
01450 if(rc == false) {
01451
01452
01453
01454
01455
01456
01457 if(id_set_) pthread_cancel(event_loop_id_);
01458
01459
01460 LogWarning("timeout")
01461 << "An asynchronous request was made to shut down "
01462 << "the event loop "
01463 << "and the event loop did not shutdown after "
01464 << timeout_seconds << " seconds\n";
01465 } else {
01466 event_loop_->join();
01467 event_loop_.reset();
01468 id_set_ = false;
01469 stop_count_ = -1;
01470 }
01471 }
01472 return rc == false ? epTimedOut : last_rc_;
01473 }
01474
01475 EventProcessor::StatusCode
01476 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01477 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01478 if(rc != epTimedOut) changeState(mCountComplete);
01479 else errorState();
01480 return rc;
01481 }
01482
01483
01484 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01485 changeState(mStopAsync);
01486 StatusCode rc = waitForAsyncCompletion(secs);
01487 if(rc != epTimedOut) changeState(mFinished);
01488 else errorState();
01489 return rc;
01490 }
01491
01492 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01493 changeState(mShutdownAsync);
01494 StatusCode rc = waitForAsyncCompletion(secs);
01495 if(rc != epTimedOut) changeState(mFinished);
01496 else errorState();
01497 return rc;
01498 }
01499
01500 void EventProcessor::errorState() {
01501 state_ = sError;
01502 }
01503
01504
01505 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01506
01507
01508
01509 changeState(m);
01510 return waitForAsyncCompletion(60*2);
01511 }
01512
01513 void EventProcessor::changeState(Msg msg) {
01514
01515
01516 boost::mutex::scoped_lock sl(state_lock_);
01517 State curr = state_;
01518 int rc;
01519
01520
01521 for(rc = 0;
01522 table[rc].current != sInvalid &&
01523 (curr != table[rc].current ||
01524 (curr == table[rc].current &&
01525 msg != table[rc].message && table[rc].message != mAny));
01526 ++rc);
01527
01528 if(table[rc].current == sInvalid)
01529 throw cms::Exception("BadState")
01530 << "A member function of EventProcessor has been called in an"
01531 << " inappropriate order.\n"
01532 << "Bad transition from " << stateName(curr) << " "
01533 << "using message " << msgName(msg) << "\n"
01534 << "No where to go from here.\n";
01535
01536 FDEBUG(1) << "changeState: current=" << stateName(curr)
01537 << ", message=" << msgName(msg)
01538 << " -> new=" << stateName(table[rc].final) << "\n";
01539
01540 state_ = table[rc].final;
01541 }
01542
01543 void EventProcessor::runAsync() {
01544 beginJob();
01545 {
01546 boost::mutex::scoped_lock sl(stop_lock_);
01547 if(id_set_ == true) {
01548 std::string err("runAsync called while async event loop already running\n");
01549 LogError("FwkJob") << err;
01550 throw cms::Exception("BadState") << err;
01551 }
01552
01553 changeState(mRunAsync);
01554
01555 stop_count_ = 0;
01556 last_rc_ = epSuccess;
01557 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01558 boost::xtime timeout;
01559 boost::xtime_get(&timeout, boost::TIME_UTC);
01560 timeout.sec += 60;
01561 if(starter_.timed_wait(sl, timeout) == false) {
01562
01563 throw cms::Exception("BadState")
01564 << "Async run thread did not start in 60 seconds\n";
01565 }
01566 }
01567 }
01568
01569 void EventProcessor::asyncRun(EventProcessor* me) {
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01580
01581 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01582 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01583
01584 {
01585 boost::mutex::scoped_lock(me->stop_lock_);
01586 me->event_loop_id_ = pthread_self();
01587 me->id_set_ = true;
01588 me->starter_.notify_all();
01589 }
01590
01591 Status rc = epException;
01592 FDEBUG(2) << "asyncRun starting ......................\n";
01593
01594 try {
01595 bool onlineStateTransitions = true;
01596 rc = me->runToCompletion(onlineStateTransitions);
01597 }
01598 catch (cms::Exception& e) {
01599 LogError("FwkJob") << "cms::Exception caught in "
01600 << "EventProcessor::asyncRun"
01601 << "\n"
01602 << e.explainSelf();
01603 me->last_error_text_ = e.explainSelf();
01604 }
01605 catch (std::exception& e) {
01606 LogError("FwkJob") << "Standard library exception caught in "
01607 << "EventProcessor::asyncRun"
01608 << "\n"
01609 << e.what();
01610 me->last_error_text_ = e.what();
01611 }
01612 catch (...) {
01613 LogError("FwkJob") << "Unknown exception caught in "
01614 << "EventProcessor::asyncRun"
01615 << "\n";
01616 me->last_error_text_ = "Unknown exception caught";
01617 rc = epOther;
01618 }
01619
01620 me->last_rc_ = rc;
01621
01622 {
01623
01624 boost::mutex::scoped_lock sl(me->stop_lock_);
01625 ++me->stop_count_;
01626 me->stopper_.notify_all();
01627 }
01628 FDEBUG(2) << "asyncRun ending ......................\n";
01629 }
01630
01631
01632 EventProcessor::StatusCode
01633 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01634
01635 StateSentry toerror(this);
01636
01637 int numberOfEventsToProcess = -1;
01638 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01639
01640 if(machine_.get() != 0) {
01641 throw Exception(errors::LogicError)
01642 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01643 << "Please report this error to the Framework group\n";
01644 }
01645
01646 toerror.succeeded();
01647
01648 return returnCode;
01649 }
01650
01651 EventProcessor::StatusCode
01652 EventProcessor::runEventCount(int numberOfEventsToProcess) {
01653
01654 StateSentry toerror(this);
01655
01656 bool onlineStateTransitions = false;
01657 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01658
01659 toerror.succeeded();
01660
01661 return returnCode;
01662 }
01663
01664 EventProcessor::StatusCode
01665 EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01666
01667
01668 boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
01669 principalCache_.insert(ep);
01670
01671 beginJob();
01672
01673 if(!onlineStateTransitions) changeState(mRunCount);
01674
01675 StatusCode returnCode = epSuccess;
01676 stateMachineWasInErrorState_ = false;
01677
01678
01679 ServiceRegistry::Operate operate(serviceToken_);
01680
01681 if(machine_.get() == 0) {
01682
01683 statemachine::FileMode fileMode;
01684 if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01685 else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01686 else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01687 else {
01688 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01689 << fileMode_ << ".\n"
01690 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01691 }
01692
01693 statemachine::EmptyRunLumiMode emptyRunLumiMode;
01694 if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01695 else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01696 else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01697 else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01698 else {
01699 throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01700 << emptyRunLumiMode_ << ".\n"
01701 << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01702 }
01703
01704 machine_.reset(new statemachine::Machine(this,
01705 fileMode,
01706 emptyRunLumiMode));
01707
01708 machine_->initiate();
01709 }
01710
01711 try {
01712 try {
01713
01714 InputSource::ItemType itemType;
01715
01716 int iEvents = 0;
01717
01718 while(true) {
01719
01720 itemType = input_->nextItemType();
01721
01722 FDEBUG(1) << "itemType = " << itemType << "\n";
01723
01724
01725
01726
01727
01728
01729
01730
01731 if(state_ == sStopping) {
01732 FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01733 forceLooperToEnd_ = true;
01734 machine_->process_event(statemachine::Stop());
01735 forceLooperToEnd_ = false;
01736 break;
01737 }
01738 else if(state_ == sShuttingDown) {
01739 FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01740 forceLooperToEnd_ = true;
01741 machine_->process_event(statemachine::Stop());
01742 forceLooperToEnd_ = false;
01743 break;
01744 }
01745
01746
01747 {
01748 boost::mutex::scoped_lock sl(usr2_lock);
01749 if(shutdown_flag) {
01750 changeState(mShutdownSignal);
01751 returnCode = epSignal;
01752 forceLooperToEnd_ = true;
01753 machine_->process_event(statemachine::Stop());
01754 forceLooperToEnd_ = false;
01755 break;
01756 }
01757 }
01758
01759 if(itemType == InputSource::IsStop) {
01760 machine_->process_event(statemachine::Stop());
01761 }
01762 else if(itemType == InputSource::IsFile) {
01763 machine_->process_event(statemachine::File());
01764 }
01765 else if(itemType == InputSource::IsRun) {
01766 machine_->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01767 }
01768 else if(itemType == InputSource::IsLumi) {
01769 machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01770 }
01771 else if(itemType == InputSource::IsEvent) {
01772 machine_->process_event(statemachine::Event());
01773 ++iEvents;
01774 if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01775 returnCode = epCountComplete;
01776 changeState(mInputExhausted);
01777 FDEBUG(1) << "Event count complete, pausing event loop\n";
01778 break;
01779 }
01780 }
01781
01782 else {
01783 throw Exception(errors::LogicError)
01784 << "Unknown next item type passed to EventProcessor\n"
01785 << "Please report this error to the Framework group\n";
01786 }
01787
01788 if(machine_->terminated()) {
01789 changeState(mInputExhausted);
01790 break;
01791 }
01792 }
01793 }
01794 catch (cms::Exception& e) { throw; }
01795 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01796 catch (std::exception& e) { convertException::stdToEDM(e); }
01797 catch(std::string& s) { convertException::stringToEDM(s); }
01798 catch(char const* c) { convertException::charPtrToEDM(c); }
01799 catch (...) { convertException::unknownToEDM(); }
01800 }
01801
01802
01803
01804
01805
01806
01807
01808
01809
01810
01811
01812
01813
01814
01815
01816
01817
01818
01819
01820
01821
01822
01823
01824
01825
01826
01827
01828
01829
01830
01831
01832
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846 catch (cms::Exception & e) {
01847 alreadyHandlingException_ = true;
01848 terminateMachine();
01849 alreadyHandlingException_ = false;
01850 if (!exceptionMessageLumis_.empty()) {
01851 e.addAdditionalInfo(exceptionMessageLumis_);
01852 if (e.alreadyPrinted()) {
01853 LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01854 }
01855 }
01856 if (!exceptionMessageRuns_.empty()) {
01857 e.addAdditionalInfo(exceptionMessageRuns_);
01858 if (e.alreadyPrinted()) {
01859 LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01860 }
01861 }
01862 if (!exceptionMessageFiles_.empty()) {
01863 e.addAdditionalInfo(exceptionMessageFiles_);
01864 if (e.alreadyPrinted()) {
01865 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01866 }
01867 }
01868 throw;
01869 }
01870
01871 if(machine_->terminated()) {
01872 FDEBUG(1) << "The state machine reports it has been terminated\n";
01873 machine_.reset();
01874 }
01875
01876 if(!onlineStateTransitions) changeState(mFinished);
01877
01878 if(stateMachineWasInErrorState_) {
01879 throw cms::Exception("BadState")
01880 << "The boost state machine in the EventProcessor exited after\n"
01881 << "entering the Error state.\n";
01882 }
01883
01884 return returnCode;
01885 }
01886
01887 void EventProcessor::readFile() {
01888 FDEBUG(1) << " \treadFile\n";
01889 fb_ = input_->readFile();
01890 if(numberOfForkedChildren_ > 0) {
01891 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01892 }
01893 }
01894
01895 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
01896 if (fb_.get() != 0) {
01897 input_->closeFile(fb_, cleaningUpAfterException);
01898 }
01899 FDEBUG(1) << "\tcloseInputFile\n";
01900 }
01901
01902 void EventProcessor::openOutputFiles() {
01903 if (fb_.get() != 0) {
01904 schedule_->openOutputFiles(*fb_);
01905 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01906 }
01907 FDEBUG(1) << "\topenOutputFiles\n";
01908 }
01909
01910 void EventProcessor::closeOutputFiles() {
01911 if (fb_.get() != 0) {
01912 schedule_->closeOutputFiles();
01913 if(hasSubProcess()) subProcess_->closeOutputFiles();
01914 }
01915 FDEBUG(1) << "\tcloseOutputFiles\n";
01916 }
01917
01918 void EventProcessor::respondToOpenInputFile() {
01919 if (fb_.get() != 0) {
01920 schedule_->respondToOpenInputFile(*fb_);
01921 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01922 }
01923 FDEBUG(1) << "\trespondToOpenInputFile\n";
01924 }
01925
01926 void EventProcessor::respondToCloseInputFile() {
01927 if (fb_.get() != 0) {
01928 schedule_->respondToCloseInputFile(*fb_);
01929 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01930 }
01931 FDEBUG(1) << "\trespondToCloseInputFile\n";
01932 }
01933
01934 void EventProcessor::respondToOpenOutputFiles() {
01935 if (fb_.get() != 0) {
01936 schedule_->respondToOpenOutputFiles(*fb_);
01937 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01938 }
01939 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01940 }
01941
01942 void EventProcessor::respondToCloseOutputFiles() {
01943 if (fb_.get() != 0) {
01944 schedule_->respondToCloseOutputFiles(*fb_);
01945 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01946 }
01947 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01948 }
01949
01950 void EventProcessor::startingNewLoop() {
01951 shouldWeStop_ = false;
01952
01953
01954 if(looper_ && looperBeginJobRun_) {
01955 looper_->doStartingNewLoop();
01956 }
01957 FDEBUG(1) << "\tstartingNewLoop\n";
01958 }
01959
01960 bool EventProcessor::endOfLoop() {
01961 if(looper_) {
01962 ModuleChanger changer(schedule_.get());
01963 looper_->setModuleChanger(&changer);
01964 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01965 looper_->setModuleChanger(0);
01966 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01967 else return false;
01968 }
01969 FDEBUG(1) << "\tendOfLoop\n";
01970 return true;
01971 }
01972
01973 void EventProcessor::rewindInput() {
01974 input_->repeat();
01975 input_->rewind();
01976 FDEBUG(1) << "\trewind\n";
01977 }
01978
01979 void EventProcessor::prepareForNextLoop() {
01980 looper_->prepareForNextLoop(esp_.get());
01981 FDEBUG(1) << "\tprepareForNextLoop\n";
01982 }
01983
01984 bool EventProcessor::shouldWeCloseOutput() const {
01985 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01986 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01987 }
01988
01989 void EventProcessor::doErrorStuff() {
01990 FDEBUG(1) << "\tdoErrorStuff\n";
01991 LogError("StateMachine")
01992 << "The EventProcessor state machine encountered an unexpected event\n"
01993 << "and went to the error state\n"
01994 << "Will attempt to terminate processing normally\n"
01995 << "(IF using the looper the next loop will be attempted)\n"
01996 << "This likely indicates a bug in an input module or corrupted input or both\n";
01997 stateMachineWasInErrorState_ = true;
01998 }
01999
02000 void EventProcessor::beginRun(statemachine::Run const& run) {
02001 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02002 input_->doBeginRun(runPrincipal);
02003 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
02004 runPrincipal.beginTime());
02005 if(forceESCacheClearOnNewRun_){
02006 espController_->forceCacheClear();
02007 }
02008 espController_->eventSetupForInstance(ts);
02009 EventSetup const& es = esp_->eventSetup();
02010 if(looper_ && looperBeginJobRun_== false) {
02011 looper_->copyInfo(ScheduleInfo(schedule_.get()));
02012 looper_->beginOfJob(es);
02013 looperBeginJobRun_ = true;
02014 looper_->doStartingNewLoop();
02015 }
02016 {
02017 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
02018 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02019 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
02020 if(hasSubProcess()) {
02021 subProcess_->doBeginRun(runPrincipal, ts);
02022 }
02023 }
02024 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
02025 if(looper_) {
02026 looper_->doBeginRun(runPrincipal, es);
02027 }
02028 }
02029
02030 void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
02031 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02032 input_->doEndRun(runPrincipal, cleaningUpAfterException);
02033 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
02034 runPrincipal.endTime());
02035 espController_->eventSetupForInstance(ts);
02036 EventSetup const& es = esp_->eventSetup();
02037 {
02038 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02039 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02040 schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
02041 if(hasSubProcess()) {
02042 subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
02043 }
02044 }
02045 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02046 if(looper_) {
02047 looper_->doEndRun(runPrincipal, es);
02048 }
02049 }
02050
02051 void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
02052 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02053 input_->doBeginLumi(lumiPrincipal);
02054
02055 Service<RandomNumberGenerator> rng;
02056 if(rng.isAvailable()) {
02057 LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02058 rng->preBeginLumi(lb);
02059 }
02060
02061
02062
02063 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02064 espController_->eventSetupForInstance(ts);
02065 EventSetup const& es = esp_->eventSetup();
02066 {
02067 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02068 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02069 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02070 if(hasSubProcess()) {
02071 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02072 }
02073 }
02074 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02075 if(looper_) {
02076 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02077 }
02078 }
02079
02080 void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi, bool cleaningUpAfterException) {
02081 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02082 input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
02083
02084
02085 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02086 lumiPrincipal.endTime());
02087 espController_->eventSetupForInstance(ts);
02088 EventSetup const& es = esp_->eventSetup();
02089 {
02090 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02091 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02092 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
02093 if(hasSubProcess()) {
02094 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
02095 }
02096 }
02097 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02098 if(looper_) {
02099 looper_->doEndLuminosityBlock(lumiPrincipal, es);
02100 }
02101 }
02102
02103 statemachine::Run EventProcessor::readAndCacheRun(bool merge) {
02104 input_->readAndCacheRun(merge, *historyAppender_);
02105 input_->markRun();
02106 return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02107 }
02108
02109 int EventProcessor::readAndCacheLumi(bool merge) {
02110 input_->readAndCacheLumi(merge, *historyAppender_);
02111 input_->markLumi();
02112 return input_->luminosityBlock();
02113 }
02114
02115 void EventProcessor::writeRun(statemachine::Run const& run) {
02116 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02117 if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02118 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02119 }
02120
02121 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02122 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02123 if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02124 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02125 }
02126
02127 void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
02128 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02129 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02130 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02131 }
02132
02133 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
02134 principalCache_.deleteLumi(phid, run, lumi);
02135 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02136 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02137 }
02138
02139 void EventProcessor::readAndProcessEvent() {
02140 EventPrincipal *pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
02141 FDEBUG(1) << "\treadEvent\n";
02142 assert(pep != 0);
02143
02144 IOVSyncValue ts(pep->id(), pep->time());
02145 espController_->eventSetupForInstance(ts);
02146 EventSetup const& es = esp_->eventSetup();
02147 {
02148 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02149 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02150 schedule_->processOneOccurrence<Traits>(*pep, es);
02151 if(hasSubProcess()) {
02152 subProcess_->doEvent(*pep, ts);
02153 }
02154 }
02155
02156 if(looper_) {
02157 bool randomAccess = input_->randomAccess();
02158 ProcessingController::ForwardState forwardState = input_->forwardState();
02159 ProcessingController::ReverseState reverseState = input_->reverseState();
02160 ProcessingController pc(forwardState, reverseState, randomAccess);
02161
02162 EDLooperBase::Status status = EDLooperBase::kContinue;
02163 do {
02164 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02165
02166 bool succeeded = true;
02167 if(randomAccess) {
02168 if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02169 input_->skipEvents(-2);
02170 }
02171 else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02172 succeeded = input_->goToEvent(pc.specifiedEventTransition());
02173 }
02174 }
02175 pc.setLastOperationSucceeded(succeeded);
02176 } while(!pc.lastOperationSucceeded());
02177 if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02178
02179 }
02180
02181 FDEBUG(1) << "\tprocessEvent\n";
02182 pep->clearEventPrincipal();
02183 }
02184
02185 bool EventProcessor::shouldWeStop() const {
02186 FDEBUG(1) << "\tshouldWeStop\n";
02187 if(shouldWeStop_) return true;
02188 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02189 }
02190
02191 void EventProcessor::setExceptionMessageFiles(std::string& message) {
02192 exceptionMessageFiles_ = message;
02193 }
02194
02195 void EventProcessor::setExceptionMessageRuns(std::string& message) {
02196 exceptionMessageRuns_ = message;
02197 }
02198
02199 void EventProcessor::setExceptionMessageLumis(std::string& message) {
02200 exceptionMessageLumis_ = message;
02201 }
02202
02203 bool EventProcessor::alreadyHandlingException() const {
02204 return alreadyHandlingException_;
02205 }
02206
02207 void EventProcessor::terminateMachine() {
02208 if(machine_.get() != 0) {
02209 if(!machine_->terminated()) {
02210 forceLooperToEnd_ = true;
02211 machine_->process_event(statemachine::Stop());
02212 forceLooperToEnd_ = false;
02213 }
02214 else {
02215 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
02216 }
02217 if(machine_->terminated()) {
02218 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02219 }
02220 machine_.reset();
02221 }
02222 }
02223 }