00001
00002 #include "FWCore/Framework/interface/EventProcessor.h"
00003
00004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00005 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00006 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00007 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00008 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00009 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00010 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00011
00012 #include "FWCore/Framework/interface/CommonParams.h"
00013 #include "FWCore/Framework/interface/EDLooperBase.h"
00014 #include "FWCore/Framework/interface/EventPrincipal.h"
00015 #include "FWCore/Framework/interface/EventSetupProvider.h"
00016 #include "FWCore/Framework/interface/EventSetupRecord.h"
00017 #include "FWCore/Framework/interface/FileBlock.h"
00018 #include "FWCore/Framework/interface/HistoryAppender.h"
00019 #include "FWCore/Framework/interface/InputSourceDescription.h"
00020 #include "FWCore/Framework/interface/IOVSyncValue.h"
00021 #include "FWCore/Framework/interface/LooperFactory.h"
00022 #include "FWCore/Framework/interface/LuminosityBlock.h"
00023 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00024 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00025 #include "FWCore/Framework/interface/ModuleChanger.h"
00026 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00027 #include "FWCore/Framework/interface/ProcessingController.h"
00028 #include "FWCore/Framework/interface/RunPrincipal.h"
00029 #include "FWCore/Framework/interface/Schedule.h"
00030 #include "FWCore/Framework/interface/ScheduleInfo.h"
00031 #include "FWCore/Framework/interface/SubProcess.h"
00032 #include "FWCore/Framework/src/Breakpoints.h"
00033 #include "FWCore/Framework/src/EPStates.h"
00034 #include "FWCore/Framework/src/EventSetupsController.h"
00035 #include "FWCore/Framework/src/InputSourceFactory.h"
00036
00037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00038
00039 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00040 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00041 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
00042 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
00043 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
00044 #include "FWCore/ParameterSet/interface/Registry.h"
00045 #include "FWCore/PythonParameterSet/interface/PythonProcessDesc.h"
00046
00047 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00048 #include "FWCore/ServiceRegistry/interface/Service.h"
00049
00050 #include "FWCore/Utilities/interface/DebugMacros.h"
00051 #include "FWCore/Utilities/interface/EDMException.h"
00052 #include "FWCore/Utilities/interface/Exception.h"
00053 #include "FWCore/Utilities/interface/ConvertException.h"
00054 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00055 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00056 #include "FWCore/Utilities/interface/ExceptionCollector.h"
00057
00058 #include "MessageForSource.h"
00059 #include "MessageForParent.h"
00060
00061 #include "boost/bind.hpp"
00062 #include "boost/thread/xtime.hpp"
00063
00064 #include <exception>
00065 #include <iomanip>
00066 #include <iostream>
00067 #include <utility>
00068 #include <sstream>
00069
00070 #include <sys/ipc.h>
00071 #include <sys/msg.h>
00072
00073
00074 #include <sys/types.h>
00075 #include <sys/wait.h>
00076 #include <sys/socket.h>
00077 #include <sys/select.h>
00078 #include <sys/fcntl.h>
00079 #include <unistd.h>
00080
00081
00082 #ifndef __APPLE__
00083 #include <sched.h>
00084 #endif
00085
00086 namespace edm {
00087
00088 namespace event_processor {
00089
00090 class StateSentry {
00091 public:
00092 StateSentry(EventProcessor* ep) : ep_(ep), success_(false) { }
00093 ~StateSentry() {if(!success_) ep_->changeState(mException);}
00094 void succeeded() {success_ = true;}
00095
00096 private:
00097 EventProcessor* ep_;
00098 bool success_;
00099 };
00100 }
00101
00102 using namespace event_processor;
00103
00104 namespace {
00105 template <typename T>
00106 class ScheduleSignalSentry {
00107 public:
00108 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es) :
00109 a_(a), principal_(principal), es_(es) {
00110 if (a_) T::preScheduleSignal(a_, principal_);
00111 }
00112 ~ScheduleSignalSentry() {
00113 if (a_) if (principal_) T::postScheduleSignal(a_, principal_, es_);
00114 }
00115
00116 private:
00117
00118 ActivityRegistry* a_;
00119 typename T::MyPrincipal* principal_;
00120 EventSetup const* es_;
00121 };
00122 }
00123
00124 namespace {
00125
00126
00127
00128
00129 char const* stateNames[] = {
00130 "Init",
00131 "JobReady",
00132 "RunGiven",
00133 "Running",
00134 "Stopping",
00135 "ShuttingDown",
00136 "Done",
00137 "JobEnded",
00138 "Error",
00139 "ErrorEnded",
00140 "End",
00141 "Invalid"
00142 };
00143
00144 char const* msgNames[] = {
00145 "SetRun",
00146 "Skip",
00147 "RunAsync",
00148 "Run(ID)",
00149 "Run(count)",
00150 "BeginJob",
00151 "StopAsync",
00152 "ShutdownAsync",
00153 "EndJob",
00154 "CountComplete",
00155 "InputExhausted",
00156 "StopSignal",
00157 "ShutdownSignal",
00158 "Finished",
00159 "Any",
00160 "dtor",
00161 "Exception",
00162 "Rewind"
00163 };
00164 }
00165
00166
00167
00168
00169
00170 struct TransEntry {
00171 State current;
00172 Msg message;
00173 State final;
00174 };
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197 TransEntry table[] = {
00198
00199
00200 { sInit, mException, sError },
00201 { sInit, mBeginJob, sJobReady },
00202 { sJobReady, mException, sError },
00203 { sJobReady, mSetRun, sRunGiven },
00204 { sJobReady, mInputRewind, sRunning },
00205 { sJobReady, mSkip, sRunning },
00206 { sJobReady, mRunID, sRunning },
00207 { sJobReady, mRunCount, sRunning },
00208 { sJobReady, mEndJob, sJobEnded },
00209 { sJobReady, mBeginJob, sJobReady },
00210 { sJobReady, mDtor, sEnd },
00211
00212 { sJobReady, mStopAsync, sJobReady },
00213 { sJobReady, mCountComplete, sJobReady },
00214 { sJobReady, mFinished, sJobReady },
00215
00216 { sRunGiven, mException, sError },
00217 { sRunGiven, mRunAsync, sRunning },
00218 { sRunGiven, mBeginJob, sRunGiven },
00219 { sRunGiven, mShutdownAsync, sShuttingDown },
00220 { sRunGiven, mStopAsync, sStopping },
00221 { sRunning, mException, sError },
00222 { sRunning, mStopAsync, sStopping },
00223 { sRunning, mShutdownAsync, sShuttingDown },
00224 { sRunning, mShutdownSignal, sShuttingDown },
00225 { sRunning, mCountComplete, sStopping },
00226 { sRunning, mInputExhausted, sStopping },
00227
00228 { sStopping, mInputRewind, sRunning },
00229 { sStopping, mException, sError },
00230 { sStopping, mFinished, sJobReady },
00231 { sStopping, mCountComplete, sJobReady },
00232 { sStopping, mShutdownSignal, sShuttingDown },
00233 { sStopping, mStopAsync, sStopping },
00234 { sStopping, mInputExhausted, sStopping },
00235
00236 { sShuttingDown, mException, sError },
00237 { sShuttingDown, mShutdownSignal, sShuttingDown },
00238 { sShuttingDown, mCountComplete, sDone },
00239 { sShuttingDown, mInputExhausted, sDone },
00240 { sShuttingDown, mFinished, sDone },
00241
00242
00243
00244 { sDone, mEndJob, sJobEnded },
00245 { sDone, mException, sError },
00246 { sJobEnded, mDtor, sEnd },
00247 { sJobEnded, mException, sError },
00248 { sError, mEndJob, sError },
00249 { sError, mDtor, sError },
00250 { sInit, mDtor, sEnd },
00251 { sStopping, mShutdownAsync, sShuttingDown },
00252 { sInvalid, mAny, sInvalid }
00253 };
00254
00255
00256
00257
00258
00259
00260 boost::shared_ptr<InputSource>
00261 makeInput(ParameterSet& params,
00262 CommonParams const& common,
00263 ProductRegistry& preg,
00264 PrincipalCache& pCache,
00265 boost::shared_ptr<ActivityRegistry> areg,
00266 boost::shared_ptr<ProcessConfiguration> processConfiguration) {
00267 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
00268 if(main_input == 0) {
00269 throw Exception(errors::Configuration)
00270 << "There must be exactly one source in the configuration.\n"
00271 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
00272 }
00273
00274 std::string modtype(main_input->getParameter<std::string>("@module_type"));
00275
00276 std::auto_ptr<ParameterSetDescriptionFillerBase> filler(
00277 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
00278 ConfigurationDescriptions descriptions(filler->baseType());
00279 filler->fill(descriptions);
00280
00281 try {
00282 try {
00283 descriptions.validate(*main_input, std::string("source"));
00284 }
00285 catch (cms::Exception& e) { throw; }
00286 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00287 catch (std::exception& e) { convertException::stdToEDM(e); }
00288 catch(std::string& s) { convertException::stringToEDM(s); }
00289 catch(char const* c) { convertException::charPtrToEDM(c); }
00290 catch (...) { convertException::unknownToEDM(); }
00291 }
00292 catch (cms::Exception & iException) {
00293 std::ostringstream ost;
00294 ost << "Validating configuration of input source of type " << modtype;
00295 iException.addContext(ost.str());
00296 throw;
00297 }
00298
00299 main_input->registerIt();
00300
00301
00302
00303
00304
00305
00306
00307 ModuleDescription md(main_input->id(),
00308 main_input->getParameter<std::string>("@module_type"),
00309 "source",
00310 processConfiguration.get());
00311
00312 InputSourceDescription isdesc(md, preg, pCache, areg, common.maxEventsInput_, common.maxLumisInput_);
00313 areg->preSourceConstructionSignal_(md);
00314 boost::shared_ptr<InputSource> input;
00315 try {
00316 try {
00317 input = boost::shared_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
00318 }
00319 catch (cms::Exception& e) { throw; }
00320 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00321 catch (std::exception& e) { convertException::stdToEDM(e); }
00322 catch(std::string& s) { convertException::stringToEDM(s); }
00323 catch(char const* c) { convertException::charPtrToEDM(c); }
00324 catch (...) { convertException::unknownToEDM(); }
00325 }
00326 catch (cms::Exception& iException) {
00327 areg->postSourceConstructionSignal_(md);
00328 std::ostringstream ost;
00329 ost << "Constructing input source of type " << modtype;
00330 iException.addContext(ost.str());
00331 throw;
00332 }
00333 areg->postSourceConstructionSignal_(md);
00334 return input;
00335 }
00336
00337
00338 boost::shared_ptr<EDLooperBase>
00339 fillLooper(eventsetup::EventSetupProvider& cp,
00340 ParameterSet& params,
00341 CommonParams const& common) {
00342 boost::shared_ptr<EDLooperBase> vLooper;
00343
00344 std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
00345
00346 if(loopers.size() == 0) {
00347 return vLooper;
00348 }
00349
00350 assert(1 == loopers.size());
00351
00352 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
00353 itName != itNameEnd;
00354 ++itName) {
00355
00356 ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
00357 providerPSet->registerIt();
00358 vLooper = eventsetup::LooperFactory::get()->addTo(cp,
00359 *providerPSet,
00360 common.processName_,
00361 common.releaseVersion_,
00362 common.passID_);
00363 }
00364 return vLooper;
00365
00366 }
00367
00368
00369 EventProcessor::EventProcessor(std::string const& config,
00370 ServiceToken const& iToken,
00371 serviceregistry::ServiceLegacy iLegacy,
00372 std::vector<std::string> const& defaultServices,
00373 std::vector<std::string> const& forcedServices) :
00374 preProcessEventSignal_(),
00375 postProcessEventSignal_(),
00376 actReg_(),
00377 preg_(),
00378 serviceToken_(),
00379 input_(),
00380 esp_(),
00381 act_table_(),
00382 processConfiguration_(),
00383 schedule_(),
00384 subProcess_(),
00385 historyAppender_(new HistoryAppender),
00386 state_(sInit),
00387 event_loop_(),
00388 state_lock_(),
00389 stop_lock_(),
00390 stopper_(),
00391 starter_(),
00392 stop_count_(-1),
00393 last_rc_(epSuccess),
00394 last_error_text_(),
00395 id_set_(false),
00396 event_loop_id_(),
00397 my_sig_num_(getSigNum()),
00398 fb_(),
00399 looper_(),
00400 machine_(),
00401 principalCache_(),
00402 shouldWeStop_(false),
00403 stateMachineWasInErrorState_(false),
00404 fileMode_(),
00405 emptyRunLumiMode_(),
00406 exceptionMessageFiles_(),
00407 exceptionMessageRuns_(),
00408 exceptionMessageLumis_(),
00409 alreadyHandlingException_(false),
00410 forceLooperToEnd_(false),
00411 looperBeginJobRun_(false),
00412 forceESCacheClearOnNewRun_(false),
00413 numberOfForkedChildren_(0),
00414 numberOfSequentialEventsPerChild_(1),
00415 setCpuAffinity_(false),
00416 eventSetupDataToExcludeFromPrefetching_() {
00417 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00418 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00419 processDesc->addServices(defaultServices, forcedServices);
00420 init(processDesc, iToken, iLegacy);
00421 }
00422
00423 EventProcessor::EventProcessor(std::string const& config,
00424 std::vector<std::string> const& defaultServices,
00425 std::vector<std::string> const& forcedServices) :
00426 preProcessEventSignal_(),
00427 postProcessEventSignal_(),
00428 actReg_(),
00429 preg_(),
00430 serviceToken_(),
00431 input_(),
00432 esp_(),
00433 act_table_(),
00434 processConfiguration_(),
00435 schedule_(),
00436 subProcess_(),
00437 historyAppender_(new HistoryAppender),
00438 state_(sInit),
00439 event_loop_(),
00440 state_lock_(),
00441 stop_lock_(),
00442 stopper_(),
00443 starter_(),
00444 stop_count_(-1),
00445 last_rc_(epSuccess),
00446 last_error_text_(),
00447 id_set_(false),
00448 event_loop_id_(),
00449 my_sig_num_(getSigNum()),
00450 fb_(),
00451 looper_(),
00452 machine_(),
00453 principalCache_(),
00454 shouldWeStop_(false),
00455 stateMachineWasInErrorState_(false),
00456 fileMode_(),
00457 emptyRunLumiMode_(),
00458 exceptionMessageFiles_(),
00459 exceptionMessageRuns_(),
00460 exceptionMessageLumis_(),
00461 alreadyHandlingException_(false),
00462 forceLooperToEnd_(false),
00463 looperBeginJobRun_(false),
00464 forceESCacheClearOnNewRun_(false),
00465 numberOfForkedChildren_(0),
00466 numberOfSequentialEventsPerChild_(1),
00467 setCpuAffinity_(false),
00468 eventSetupDataToExcludeFromPrefetching_() {
00469 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00470 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00471 processDesc->addServices(defaultServices, forcedServices);
00472 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00473 }
00474
00475 EventProcessor::EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
00476 ServiceToken const& token,
00477 serviceregistry::ServiceLegacy legacy) :
00478 preProcessEventSignal_(),
00479 postProcessEventSignal_(),
00480 actReg_(),
00481 preg_(),
00482 serviceToken_(),
00483 input_(),
00484 esp_(),
00485 act_table_(),
00486 processConfiguration_(),
00487 schedule_(),
00488 subProcess_(),
00489 historyAppender_(new HistoryAppender),
00490 state_(sInit),
00491 event_loop_(),
00492 state_lock_(),
00493 stop_lock_(),
00494 stopper_(),
00495 starter_(),
00496 stop_count_(-1),
00497 last_rc_(epSuccess),
00498 last_error_text_(),
00499 id_set_(false),
00500 event_loop_id_(),
00501 my_sig_num_(getSigNum()),
00502 fb_(),
00503 looper_(),
00504 machine_(),
00505 principalCache_(),
00506 shouldWeStop_(false),
00507 stateMachineWasInErrorState_(false),
00508 fileMode_(),
00509 emptyRunLumiMode_(),
00510 exceptionMessageFiles_(),
00511 exceptionMessageRuns_(),
00512 exceptionMessageLumis_(),
00513 alreadyHandlingException_(false),
00514 forceLooperToEnd_(false),
00515 looperBeginJobRun_(false),
00516 forceESCacheClearOnNewRun_(false),
00517 numberOfForkedChildren_(0),
00518 numberOfSequentialEventsPerChild_(1),
00519 setCpuAffinity_(false),
00520 eventSetupDataToExcludeFromPrefetching_() {
00521 init(processDesc, token, legacy);
00522 }
00523
00524
00525 EventProcessor::EventProcessor(std::string const& config, bool isPython):
00526 preProcessEventSignal_(),
00527 postProcessEventSignal_(),
00528 actReg_(),
00529 preg_(),
00530 serviceToken_(),
00531 input_(),
00532 esp_(),
00533 act_table_(),
00534 processConfiguration_(),
00535 schedule_(),
00536 subProcess_(),
00537 historyAppender_(new HistoryAppender),
00538 state_(sInit),
00539 event_loop_(),
00540 state_lock_(),
00541 stop_lock_(),
00542 stopper_(),
00543 starter_(),
00544 stop_count_(-1),
00545 last_rc_(epSuccess),
00546 last_error_text_(),
00547 id_set_(false),
00548 event_loop_id_(),
00549 my_sig_num_(getSigNum()),
00550 fb_(),
00551 looper_(),
00552 machine_(),
00553 principalCache_(),
00554 shouldWeStop_(false),
00555 stateMachineWasInErrorState_(false),
00556 fileMode_(),
00557 emptyRunLumiMode_(),
00558 exceptionMessageFiles_(),
00559 exceptionMessageRuns_(),
00560 exceptionMessageLumis_(),
00561 alreadyHandlingException_(false),
00562 forceLooperToEnd_(false),
00563 looperBeginJobRun_(false),
00564 forceESCacheClearOnNewRun_(false),
00565 numberOfForkedChildren_(0),
00566 numberOfSequentialEventsPerChild_(1),
00567 setCpuAffinity_(false),
00568 eventSetupDataToExcludeFromPrefetching_() {
00569 if(isPython) {
00570 boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
00571 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
00572 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00573 }
00574 else {
00575 boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
00576 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
00577 }
00578 }
00579
00580 void
00581 EventProcessor::init(boost::shared_ptr<ProcessDesc>& processDesc,
00582 ServiceToken const& iToken,
00583 serviceregistry::ServiceLegacy iLegacy) {
00584
00585
00586
00587
00588 BranchIDListHelper::clearRegistries();
00589
00590 boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
00591
00592
00593
00594 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
00595
00596
00597 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
00598 fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
00599 emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
00600 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
00601 ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
00602 numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
00603 numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
00604 setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
00605 std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
00606 for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
00607 itPS != itPSEnd;
00608 ++itPS) {
00609 eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
00610 std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
00611 itPS->getUntrackedParameter<std::string>("label", "")));
00612 }
00613 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00614
00615 std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
00616
00617
00618 ScheduleItems items;
00619
00620
00621 boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
00622 ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
00623 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
00624
00625
00626 ServiceRegistry::Operate operate(serviceToken_);
00627
00628
00629 boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
00630
00631
00632 esp_ = espController->makeProvider(*parameterSet, *common);
00633
00634
00635 looper_ = fillLooper(*esp_, *parameterSet, *common);
00636 if(looper_) {
00637 looper_->setActionTable(items.act_table_.get());
00638 looper_->attachTo(*items.actReg_);
00639 }
00640
00641
00642 input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
00643
00644
00645 schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
00646
00647
00648 act_table_ = items.act_table_;
00649 actReg_ = items.actReg_;
00650 preg_ = items.preg_;
00651 processConfiguration_ = items.processConfiguration_;
00652
00653 FDEBUG(2) << parameterSet << std::endl;
00654 connectSigs(this);
00655
00656
00657 if(subProcessParameterSet) {
00658 subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController, *actReg_, token, serviceregistry::kConfigurationOverrides));
00659 }
00660 }
00661
00662 EventProcessor::~EventProcessor() {
00663
00664 ServiceToken token = getToken();
00665 ServiceRegistry::Operate op(token);
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676 terminateMachine();
00677
00678 try {
00679 changeState(mDtor);
00680 }
00681 catch(cms::Exception& e) {
00682 LogError("System")
00683 << e.explainSelf() << "\n";
00684 }
00685
00686
00687 subProcess_.reset();
00688 esp_.reset();
00689 schedule_.reset();
00690 input_.reset();
00691 looper_.reset();
00692 actReg_.reset();
00693
00694 pset::Registry* psetRegistry = pset::Registry::instance();
00695 psetRegistry->data().clear();
00696 psetRegistry->extra().setID(ParameterSetID());
00697
00698 EntryDescriptionRegistry::instance()->data().clear();
00699 ParentageRegistry::instance()->data().clear();
00700 ProcessConfigurationRegistry::instance()->data().clear();
00701 ProcessHistoryRegistry::instance()->data().clear();
00702 BranchIDListHelper::clearRegistries();
00703 }
00704
00705 void
00706 EventProcessor::rewind() {
00707 beginJob();
00708 changeState(mStopAsync);
00709 changeState(mInputRewind);
00710 {
00711 StateSentry toerror(this);
00712
00713
00714 ServiceRegistry::Operate operate(serviceToken_);
00715
00716 {
00717 input_->repeat();
00718 input_->rewind();
00719 }
00720 changeState(mCountComplete);
00721 toerror.succeeded();
00722 }
00723 changeState(mFinished);
00724 }
00725
00726 EventProcessor::StatusCode
00727 EventProcessor::run(int numberEventsToProcess, bool) {
00728 return runEventCount(numberEventsToProcess);
00729 }
00730
00731 EventProcessor::StatusCode
00732 EventProcessor::skip(int numberToSkip) {
00733 beginJob();
00734 changeState(mSkip);
00735 {
00736 StateSentry toerror(this);
00737
00738
00739 ServiceRegistry::Operate operate(serviceToken_);
00740
00741 {
00742 input_->skipEvents(numberToSkip);
00743 }
00744 changeState(mCountComplete);
00745 toerror.succeeded();
00746 }
00747 changeState(mFinished);
00748 return epSuccess;
00749 }
00750
00751 void
00752 EventProcessor::beginJob() {
00753 if(state_ != sInit) return;
00754 bk::beginJob();
00755
00756 changeState(mBeginJob);
00757
00758
00759
00760 ServiceRegistry::Operate operate(serviceToken_);
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776 try {
00777 try {
00778 input_->doBeginJob();
00779 }
00780 catch (cms::Exception& e) { throw; }
00781 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00782 catch (std::exception& e) { convertException::stdToEDM(e); }
00783 catch(std::string& s) { convertException::stringToEDM(s); }
00784 catch(char const* c) { convertException::charPtrToEDM(c); }
00785 catch (...) { convertException::unknownToEDM(); }
00786 }
00787 catch(cms::Exception& ex) {
00788 ex.addContext("Calling beginJob for the source");
00789 throw;
00790 }
00791 schedule_->beginJob();
00792
00793 if(hasSubProcess()) subProcess_->doBeginJob();
00794 actReg_->postBeginJobSignal_();
00795 }
00796
00797 void
00798 EventProcessor::endJob() {
00799
00800 ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
00801
00802
00803 c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
00804
00805
00806 ServiceRegistry::Operate operate(serviceToken_);
00807
00808 c.call(boost::bind(&EventProcessor::terminateMachine, this));
00809 schedule_->endJob(c);
00810 if(hasSubProcess()) {
00811 c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
00812 }
00813 c.call(boost::bind(&InputSource::doEndJob, input_));
00814 if(looper_) {
00815 c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
00816 }
00817 c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
00818 if(c.hasThrown()) {
00819 c.rethrow();
00820 }
00821 }
00822
00823 ServiceToken
00824 EventProcessor::getToken() {
00825 return serviceToken_;
00826 }
00827
00828
00829 namespace {
00830
00831
00832 volatile bool child_failed = false;
00833 volatile unsigned int num_children_done = 0;
00834 volatile int child_fail_exit_status = 0;
00835 volatile int child_fail_signal = 0;
00836
00837 extern "C" {
00838 void ep_sigchld(int, siginfo_t*, void*) {
00839
00840
00841 int stat_loc;
00842 pid_t p = waitpid(-1, &stat_loc, WNOHANG);
00843 while(0<p) {
00844
00845 if(WIFEXITED(stat_loc)) {
00846 ++num_children_done;
00847 if(0 != WEXITSTATUS(stat_loc)) {
00848 child_fail_exit_status = WEXITSTATUS(stat_loc);
00849 child_failed = true;
00850 }
00851 }
00852 if(WIFSIGNALED(stat_loc)) {
00853 ++num_children_done;
00854 child_fail_signal = WTERMSIG(stat_loc);
00855 child_failed = true;
00856 }
00857 p = waitpid(-1, &stat_loc, WNOHANG);
00858 }
00859 }
00860 }
00861
00862 }
00863
00864 enum {
00865 kChildSucceed,
00866 kChildExitBadly,
00867 kChildSegv,
00868 kMaxChildAction
00869 };
00870
00871 namespace {
00872 unsigned int numberOfDigitsInChildIndex(unsigned int numberOfChildren) {
00873 unsigned int n = 0;
00874 while(numberOfChildren != 0) {
00875 ++n;
00876 numberOfChildren /= 10;
00877 }
00878 if(n == 0) {
00879 n = 3;
00880 }
00881 return n;
00882 }
00883
00884
00885
00886 class MessageSenderToSource {
00887 public:
00888 MessageSenderToSource(std::vector<int> const& childrenSockets, std::vector<int> const& childrenPipes, long iNEventsToProcess);
00889 void operator()();
00890
00891 private:
00892 const std::vector<int>& m_childrenPipes;
00893 long const m_nEventsToProcess;
00894 fd_set m_socketSet;
00895 unsigned int m_aliveChildren;
00896 int m_maxFd;
00897 };
00898
00899 MessageSenderToSource::MessageSenderToSource(std::vector<int> const& childrenSockets,
00900 std::vector<int> const& childrenPipes,
00901 long iNEventsToProcess):
00902 m_childrenPipes(childrenPipes),
00903 m_nEventsToProcess(iNEventsToProcess),
00904 m_aliveChildren(childrenSockets.size()),
00905 m_maxFd(0)
00906 {
00907 FD_ZERO(&m_socketSet);
00908 for (std::vector<int>::const_iterator it = childrenSockets.begin(), itEnd = childrenSockets.end();
00909 it != itEnd; it++) {
00910 FD_SET(*it, &m_socketSet);
00911 if (*it > m_maxFd) {
00912 m_maxFd = *it;
00913 }
00914 }
00915 for (std::vector<int>::const_iterator it = childrenPipes.begin(), itEnd = childrenPipes.end();
00916 it != itEnd; ++it) {
00917 FD_SET(*it, &m_socketSet);
00918 if (*it > m_maxFd) {
00919 m_maxFd = *it;
00920 }
00921 }
00922 m_maxFd++;
00923 }
00924
00925
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941 void
00942 MessageSenderToSource::operator()() {
00943 multicore::MessageForParent childMsg;
00944 LogInfo("ForkingController") << "I am controller";
00945
00946
00947 multicore::MessageForSource sndmsg;
00948 sndmsg.startIndex = 0;
00949 sndmsg.nIndices = m_nEventsToProcess;
00950 do {
00951
00952 fd_set readSockets, errorSockets;
00953
00954 memcpy(&readSockets, &m_socketSet, sizeof(m_socketSet));
00955 memcpy(&errorSockets, &m_socketSet, sizeof(m_socketSet));
00956
00957 ssize_t rc;
00958 while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, NULL)) < 0) && (errno == EINTR)) {}
00959 if (rc < 0) {
00960 std::cerr << "select failed; should be impossible due to preconditions.\n";
00961 abort();
00962 break;
00963 }
00964
00965
00966 for (int idx=0; idx<m_maxFd; idx++) {
00967
00968
00969 if (FD_ISSET(idx, &errorSockets)) {
00970 LogInfo("ForkingController") << "Error on socket " << idx;
00971 FD_CLR(idx, &m_socketSet);
00972 close(idx);
00973
00974 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(); it != m_childrenPipes.end(); it++) {
00975 if (*it == idx) {
00976 m_aliveChildren--;
00977 }
00978 }
00979 continue;
00980 }
00981
00982 if (!FD_ISSET(idx, &readSockets)) {
00983 continue;
00984 }
00985
00986
00987
00988 bool is_pipe = false;
00989 for (std::vector<int>::const_iterator it = m_childrenPipes.begin(), itEnd = m_childrenPipes.end(); it != itEnd; it++) {
00990 if (*it == idx) {
00991 is_pipe = true;
00992 char buf;
00993 while (((rc = read(idx, &buf, 1)) < 0) && (errno == EINTR)) {}
00994 if (rc <= 0) {
00995 m_aliveChildren--;
00996 FD_CLR(idx, &m_socketSet);
00997 close(idx);
00998 }
00999 }
01000 }
01001
01002
01003 if (!is_pipe) {
01004 while (((rc = recv(idx, reinterpret_cast<char*>(&childMsg),childMsg.sizeForBuffer() , 0)) < 0) && (errno == EINTR)) {}
01005 if (rc < 0) {
01006 FD_CLR(idx, &m_socketSet);
01007 close(idx);
01008 continue;
01009 }
01010
01011
01012
01013
01014
01015 while (((rc = send(idx, (char *)(&sndmsg), multicore::MessageForSource::sizeForBuffer(), 0)) < 0) && (errno == EINTR)) {}
01016 if (rc < 0) {
01017 FD_CLR(idx, &m_socketSet);
01018 close(idx);
01019 continue;
01020 }
01021
01022 sndmsg.startIndex += sndmsg.nIndices;
01023 }
01024 }
01025
01026 } while (m_aliveChildren > 0);
01027
01028 return;
01029 }
01030
01031 }
01032
01033 bool
01034 EventProcessor::forkProcess(std::string const& jobReportFile) {
01035
01036 if(0 == numberOfForkedChildren_) {return true;}
01037 assert(0<numberOfForkedChildren_);
01038
01039 {
01040 beginJob();
01041
01042 ServiceRegistry::Operate operate(serviceToken_);
01043
01044 InputSource::ItemType itemType;
01045 itemType = input_->nextItemType();
01046
01047 assert(itemType == InputSource::IsFile);
01048 {
01049 readFile();
01050 }
01051 itemType = input_->nextItemType();
01052 assert(itemType == InputSource::IsRun);
01053
01054 LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
01055 IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
01056 input_->runAuxiliary()->beginTime());
01057 EventSetup const& es = esp_->eventSetupForInstance(ts);
01058
01059
01060 std::vector<eventsetup::EventSetupRecordKey> recordKeys;
01061 es.fillAvailableRecordKeys(recordKeys);
01062 std::vector<eventsetup::DataKey> dataKeys;
01063 for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
01064 itKey != itEnd;
01065 ++itKey) {
01066 eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
01067
01068 ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
01069 ExcludedData const* excludedData(0);
01070 if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
01071 excludedData = &(itExcludeRec->second);
01072 if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
01073
01074 continue;
01075 }
01076 }
01077 if(0 != recordPtr) {
01078 dataKeys.clear();
01079 recordPtr->fillRegisteredDataKeys(dataKeys);
01080 for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
01081 itDataKey != itDataKeyEnd;
01082 ++itDataKey) {
01083
01084 if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
01085 LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
01086 continue;
01087 }
01088 try {
01089 recordPtr->doGet(*itDataKey);
01090 } catch(cms::Exception& e) {
01091 LogWarning("ForkingEventSetupPreFetching") << e.what();
01092 }
01093 }
01094 }
01095 }
01096 }
01097 LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
01098 {
01099
01100 ServiceRegistry::Operate operate(serviceToken_);
01101 Service<JobReport> jobReport;
01102 jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
01103
01104
01105 actReg_->preForkReleaseResourcesSignal_();
01106 input_->doPreForkReleaseResources();
01107 schedule_->preForkReleaseResources();
01108 }
01109 installCustomHandler(SIGCHLD, ep_sigchld);
01110
01111
01112 unsigned int childIndex = 0;
01113 unsigned int const kMaxChildren = numberOfForkedChildren_;
01114 unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
01115 std::vector<pid_t> childrenIds;
01116 childrenIds.reserve(kMaxChildren);
01117 std::vector<int> childrenSockets;
01118 childrenSockets.reserve(kMaxChildren);
01119 std::vector<int> childrenPipes;
01120 childrenPipes.reserve(kMaxChildren);
01121 std::vector<int> childrenSocketsCopy;
01122 childrenSocketsCopy.reserve(kMaxChildren);
01123 std::vector<int> childrenPipesCopy;
01124 childrenPipesCopy.reserve(kMaxChildren);
01125 int pipes[2];
01126
01127 {
01128
01129 ServiceRegistry::Operate operate(serviceToken_);
01130 Service<JobReport> jobReport;
01131 int sockets[2], fd_flags;
01132 for(; childIndex < kMaxChildren; ++childIndex) {
01133
01134 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
01135 printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
01136 exit(EXIT_FAILURE);
01137 }
01138 if (pipe(pipes)) {
01139 printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
01140 exit(EXIT_FAILURE);
01141 }
01142
01143 if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
01144 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01145 exit(EXIT_FAILURE);
01146 }
01147
01148
01149 if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
01150 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01151 exit(EXIT_FAILURE);
01152 }
01153 if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
01154 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01155 exit(EXIT_FAILURE);
01156 }
01157 if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
01158 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01159 exit(EXIT_FAILURE);
01160 }
01161
01162
01163 if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
01164 printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
01165 exit(EXIT_FAILURE);
01166 }
01167 if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
01168 printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
01169 exit(EXIT_FAILURE);
01170 }
01171
01172 childrenPipesCopy = childrenPipes;
01173 childrenSocketsCopy = childrenSockets;
01174
01175 pid_t value = fork();
01176 if(value == 0) {
01177
01178 close(pipes[0]);
01179 close(sockets[0]);
01180
01181 for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
01182 close(*it);
01183 }
01184 for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
01185 close(*it);
01186 }
01187
01188
01189 fflush(stdout);
01190 fflush(stderr);
01191 std::stringstream stout;
01192 stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
01193 if(0 == freopen(stout.str().c_str(), "w", stdout)) {
01194 LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
01195 }
01196 if(dup2(fileno(stdout), fileno(stderr)) < 0) {
01197 LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
01198 }
01199
01200 LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
01201 if(setCpuAffinity_) {
01202
01203
01204
01205
01206
01207
01208 #ifdef __APPLE__
01209 LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
01210 #else
01211 LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
01212 cpu_set_t mask;
01213 CPU_ZERO(&mask);
01214 CPU_SET(childIndex, &mask);
01215 if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
01216 LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
01217 exit(-1);
01218 }
01219 #endif
01220 }
01221 break;
01222 } else {
01223
01224 close(pipes[1]);
01225 close(sockets[1]);
01226 }
01227 if(value < 0) {
01228 LogError("ForkingChild") << "failed to create a child";
01229 exit(-1);
01230 }
01231 childrenIds.push_back(value);
01232 childrenSockets.push_back(sockets[0]);
01233 childrenPipes.push_back(pipes[0]);
01234 }
01235
01236 if(childIndex < kMaxChildren) {
01237 jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
01238 actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
01239
01240 boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
01241 input_->doPostForkReacquireResources(receiver);
01242 schedule_->postForkReacquireResources(childIndex, kMaxChildren);
01243
01244
01245 return true;
01246 }
01247 jobReport->parentAfterFork(jobReportFile);
01248 }
01249
01250
01251
01252
01253
01254
01255
01256
01257 sigset_t blockingSigSet;
01258 sigset_t unblockingSigSet;
01259 sigset_t oldSigSet;
01260 pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
01261 pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
01262 sigaddset(&blockingSigSet, SIGCHLD);
01263 sigaddset(&blockingSigSet, SIGUSR2);
01264 sigaddset(&blockingSigSet, SIGINT);
01265 sigdelset(&unblockingSigSet, SIGCHLD);
01266 sigdelset(&unblockingSigSet, SIGUSR2);
01267 sigdelset(&unblockingSigSet, SIGINT);
01268 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01269
01270
01271
01272 bool too_many_fds = false;
01273 if (pipes[1]+1 > FD_SETSIZE) {
01274 LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
01275 too_many_fds = true;
01276 }
01277
01278
01279
01280
01281 MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
01282 boost::thread senderThread(sender);
01283
01284 while(!too_many_fds && !shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
01285 sigsuspend(&unblockingSigSet);
01286 LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
01287 }
01288 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01289
01290 LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
01291 if(child_failed) {
01292 LogError("ForkingStopping") << "child failed";
01293 }
01294 if(shutdown_flag) {
01295 LogSystem("ForkingStopping") << "asked to shutdown";
01296 }
01297
01298 if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
01299 LogInfo("ForkingStopping") << "must stop children" << std::endl;
01300 for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
01301 it != itEnd; ++it) {
01302 kill(*it, SIGUSR2);
01303 }
01304 pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
01305 while(num_children_done != kMaxChildren) {
01306 sigsuspend(&unblockingSigSet);
01307 }
01308 pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
01309 }
01310
01311 senderThread.join();
01312 if(child_failed) {
01313 if (child_fail_signal) {
01314 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
01315 } else if (child_fail_exit_status) {
01316 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
01317 } else {
01318 throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
01319 }
01320 }
01321 if(too_many_fds) {
01322 throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
01323 }
01324 return false;
01325 }
01326
01327 void
01328 EventProcessor::connectSigs(EventProcessor* ep) {
01329
01330
01331
01332 actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
01333 actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
01334 }
01335
01336 std::vector<ModuleDescription const*>
01337 EventProcessor::getAllModuleDescriptions() const {
01338 return schedule_->getAllModuleDescriptions();
01339 }
01340
01341 int
01342 EventProcessor::totalEvents() const {
01343 return schedule_->totalEvents();
01344 }
01345
01346 int
01347 EventProcessor::totalEventsPassed() const {
01348 return schedule_->totalEventsPassed();
01349 }
01350
01351 int
01352 EventProcessor::totalEventsFailed() const {
01353 return schedule_->totalEventsFailed();
01354 }
01355
01356 void
01357 EventProcessor::enableEndPaths(bool active) {
01358 schedule_->enableEndPaths(active);
01359 }
01360
01361 bool
01362 EventProcessor::endPathsEnabled() const {
01363 return schedule_->endPathsEnabled();
01364 }
01365
01366 void
01367 EventProcessor::getTriggerReport(TriggerReport& rep) const {
01368 schedule_->getTriggerReport(rep);
01369 }
01370
01371 void
01372 EventProcessor::clearCounters() {
01373 schedule_->clearCounters();
01374 }
01375
01376 char const* EventProcessor::currentStateName() const {
01377 return stateName(getState());
01378 }
01379
01380 char const* EventProcessor::stateName(State s) const {
01381 return stateNames[s];
01382 }
01383
01384 char const* EventProcessor::msgName(Msg m) const {
01385 return msgNames[m];
01386 }
01387
01388 State EventProcessor::getState() const {
01389 return state_;
01390 }
01391
01392 EventProcessor::StatusCode EventProcessor::statusAsync() const {
01393
01394
01395 return last_rc_;
01396 }
01397
01398 void
01399 EventProcessor::setRunNumber(RunNumber_t runNumber) {
01400 if(runNumber == 0) {
01401 runNumber = 1;
01402 LogWarning("Invalid Run")
01403 << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
01404 << "Run number was set to 1 instead\n";
01405 }
01406
01407
01408 beginJob();
01409 changeState(mSetRun);
01410
01411
01412 input_->setRunNumber(runNumber);
01413 }
01414
01415 void
01416 EventProcessor::declareRunNumber(RunNumber_t ) {
01417
01418 beginJob();
01419 changeState(mSetRun);
01420
01421
01422
01423 }
01424
01425 EventProcessor::StatusCode
01426 EventProcessor::waitForAsyncCompletion(unsigned int timeout_seconds) {
01427 bool rc = true;
01428 boost::xtime timeout;
01429 boost::xtime_get(&timeout, boost::TIME_UTC);
01430 timeout.sec += timeout_seconds;
01431
01432
01433
01434
01435 {
01436 boost::mutex::scoped_lock sl(stop_lock_);
01437
01438
01439 if(stop_count_ < 0) return last_rc_;
01440
01441 if(timeout_seconds == 0) {
01442 while(stop_count_ == 0) stopper_.wait(sl);
01443 } else {
01444 while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
01445 }
01446
01447 if(rc == false) {
01448
01449
01450
01451
01452
01453
01454 if(id_set_) pthread_cancel(event_loop_id_);
01455
01456
01457 LogWarning("timeout")
01458 << "An asynchronous request was made to shut down "
01459 << "the event loop "
01460 << "and the event loop did not shutdown after "
01461 << timeout_seconds << " seconds\n";
01462 } else {
01463 event_loop_->join();
01464 event_loop_.reset();
01465 id_set_ = false;
01466 stop_count_ = -1;
01467 }
01468 }
01469 return rc == false ? epTimedOut : last_rc_;
01470 }
01471
01472 EventProcessor::StatusCode
01473 EventProcessor::waitTillDoneAsync(unsigned int timeout_value_secs) {
01474 StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
01475 if(rc != epTimedOut) changeState(mCountComplete);
01476 else errorState();
01477 return rc;
01478 }
01479
01480
01481 EventProcessor::StatusCode EventProcessor::stopAsync(unsigned int secs) {
01482 changeState(mStopAsync);
01483 StatusCode rc = waitForAsyncCompletion(secs);
01484 if(rc != epTimedOut) changeState(mFinished);
01485 else errorState();
01486 return rc;
01487 }
01488
01489 EventProcessor::StatusCode EventProcessor::shutdownAsync(unsigned int secs) {
01490 changeState(mShutdownAsync);
01491 StatusCode rc = waitForAsyncCompletion(secs);
01492 if(rc != epTimedOut) changeState(mFinished);
01493 else errorState();
01494 return rc;
01495 }
01496
01497 void EventProcessor::errorState() {
01498 state_ = sError;
01499 }
01500
01501
01502 EventProcessor::StatusCode EventProcessor::doneAsync(Msg m) {
01503
01504
01505
01506 changeState(m);
01507 return waitForAsyncCompletion(60*2);
01508 }
01509
01510 void EventProcessor::changeState(Msg msg) {
01511
01512
01513 boost::mutex::scoped_lock sl(state_lock_);
01514 State curr = state_;
01515 int rc;
01516
01517
01518 for(rc = 0;
01519 table[rc].current != sInvalid &&
01520 (curr != table[rc].current ||
01521 (curr == table[rc].current &&
01522 msg != table[rc].message && table[rc].message != mAny));
01523 ++rc);
01524
01525 if(table[rc].current == sInvalid)
01526 throw cms::Exception("BadState")
01527 << "A member function of EventProcessor has been called in an"
01528 << " inappropriate order.\n"
01529 << "Bad transition from " << stateName(curr) << " "
01530 << "using message " << msgName(msg) << "\n"
01531 << "No where to go from here.\n";
01532
01533 FDEBUG(1) << "changeState: current=" << stateName(curr)
01534 << ", message=" << msgName(msg)
01535 << " -> new=" << stateName(table[rc].final) << "\n";
01536
01537 state_ = table[rc].final;
01538 }
01539
01540 void EventProcessor::runAsync() {
01541 beginJob();
01542 {
01543 boost::mutex::scoped_lock sl(stop_lock_);
01544 if(id_set_ == true) {
01545 std::string err("runAsync called while async event loop already running\n");
01546 LogError("FwkJob") << err;
01547 throw cms::Exception("BadState") << err;
01548 }
01549
01550 changeState(mRunAsync);
01551
01552 stop_count_ = 0;
01553 last_rc_ = epSuccess;
01554 event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
01555 boost::xtime timeout;
01556 boost::xtime_get(&timeout, boost::TIME_UTC);
01557 timeout.sec += 60;
01558 if(starter_.timed_wait(sl, timeout) == false) {
01559
01560 throw cms::Exception("BadState")
01561 << "Async run thread did not start in 60 seconds\n";
01562 }
01563 }
01564 }
01565
01566 void EventProcessor::asyncRun(EventProcessor* me) {
01567
01568
01569
01570
01571
01572
01573
01574
01575
01576 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
01577
01578 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
01579 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
01580
01581 {
01582 boost::mutex::scoped_lock(me->stop_lock_);
01583 me->event_loop_id_ = pthread_self();
01584 me->id_set_ = true;
01585 me->starter_.notify_all();
01586 }
01587
01588 Status rc = epException;
01589 FDEBUG(2) << "asyncRun starting ......................\n";
01590
01591 try {
01592 bool onlineStateTransitions = true;
01593 rc = me->runToCompletion(onlineStateTransitions);
01594 }
01595 catch (cms::Exception& e) {
01596 LogError("FwkJob") << "cms::Exception caught in "
01597 << "EventProcessor::asyncRun"
01598 << "\n"
01599 << e.explainSelf();
01600 me->last_error_text_ = e.explainSelf();
01601 }
01602 catch (std::exception& e) {
01603 LogError("FwkJob") << "Standard library exception caught in "
01604 << "EventProcessor::asyncRun"
01605 << "\n"
01606 << e.what();
01607 me->last_error_text_ = e.what();
01608 }
01609 catch (...) {
01610 LogError("FwkJob") << "Unknown exception caught in "
01611 << "EventProcessor::asyncRun"
01612 << "\n";
01613 me->last_error_text_ = "Unknown exception caught";
01614 rc = epOther;
01615 }
01616
01617 me->last_rc_ = rc;
01618
01619 {
01620
01621 boost::mutex::scoped_lock sl(me->stop_lock_);
01622 ++me->stop_count_;
01623 me->stopper_.notify_all();
01624 }
01625 FDEBUG(2) << "asyncRun ending ......................\n";
01626 }
01627
01628
01629 EventProcessor::StatusCode
01630 EventProcessor::runToCompletion(bool onlineStateTransitions) {
01631
01632 StateSentry toerror(this);
01633
01634 int numberOfEventsToProcess = -1;
01635 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01636
01637 if(machine_.get() != 0) {
01638 throw Exception(errors::LogicError)
01639 << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
01640 << "Please report this error to the Framework group\n";
01641 }
01642
01643 toerror.succeeded();
01644
01645 return returnCode;
01646 }
01647
01648 EventProcessor::StatusCode
01649 EventProcessor::runEventCount(int numberOfEventsToProcess) {
01650
01651 StateSentry toerror(this);
01652
01653 bool onlineStateTransitions = false;
01654 StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
01655
01656 toerror.succeeded();
01657
01658 return returnCode;
01659 }
01660
01661 EventProcessor::StatusCode
01662 EventProcessor::runCommon(bool onlineStateTransitions, int numberOfEventsToProcess) {
01663
01664
01665 boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
01666 principalCache_.insert(ep);
01667
01668 beginJob();
01669
01670 if(!onlineStateTransitions) changeState(mRunCount);
01671
01672 StatusCode returnCode = epSuccess;
01673 stateMachineWasInErrorState_ = false;
01674
01675
01676 ServiceRegistry::Operate operate(serviceToken_);
01677
01678 if(machine_.get() == 0) {
01679
01680 statemachine::FileMode fileMode;
01681 if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
01682 else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
01683 else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
01684 else {
01685 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
01686 << fileMode_ << ".\n"
01687 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
01688 }
01689
01690 statemachine::EmptyRunLumiMode emptyRunLumiMode;
01691 if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01692 else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
01693 else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
01694 else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
01695 else {
01696 throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
01697 << emptyRunLumiMode_ << ".\n"
01698 << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
01699 }
01700
01701 machine_.reset(new statemachine::Machine(this,
01702 fileMode,
01703 emptyRunLumiMode));
01704
01705 machine_->initiate();
01706 }
01707
01708 try {
01709 try {
01710
01711 InputSource::ItemType itemType;
01712
01713 int iEvents = 0;
01714
01715 while(true) {
01716
01717 itemType = input_->nextItemType();
01718
01719 FDEBUG(1) << "itemType = " << itemType << "\n";
01720
01721
01722
01723
01724
01725
01726
01727
01728 if(state_ == sStopping) {
01729 FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
01730 forceLooperToEnd_ = true;
01731 machine_->process_event(statemachine::Stop());
01732 forceLooperToEnd_ = false;
01733 break;
01734 }
01735 else if(state_ == sShuttingDown) {
01736 FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
01737 forceLooperToEnd_ = true;
01738 machine_->process_event(statemachine::Stop());
01739 forceLooperToEnd_ = false;
01740 break;
01741 }
01742
01743
01744 {
01745 boost::mutex::scoped_lock sl(usr2_lock);
01746 if(shutdown_flag) {
01747 changeState(mShutdownSignal);
01748 returnCode = epSignal;
01749 forceLooperToEnd_ = true;
01750 machine_->process_event(statemachine::Stop());
01751 forceLooperToEnd_ = false;
01752 break;
01753 }
01754 }
01755
01756 if(itemType == InputSource::IsStop) {
01757 machine_->process_event(statemachine::Stop());
01758 }
01759 else if(itemType == InputSource::IsFile) {
01760 machine_->process_event(statemachine::File());
01761 }
01762 else if(itemType == InputSource::IsRun) {
01763 machine_->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
01764 }
01765 else if(itemType == InputSource::IsLumi) {
01766 machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
01767 }
01768 else if(itemType == InputSource::IsEvent) {
01769 machine_->process_event(statemachine::Event());
01770 ++iEvents;
01771 if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
01772 returnCode = epCountComplete;
01773 changeState(mInputExhausted);
01774 FDEBUG(1) << "Event count complete, pausing event loop\n";
01775 break;
01776 }
01777 }
01778
01779 else {
01780 throw Exception(errors::LogicError)
01781 << "Unknown next item type passed to EventProcessor\n"
01782 << "Please report this error to the Framework group\n";
01783 }
01784
01785 if(machine_->terminated()) {
01786 changeState(mInputExhausted);
01787 break;
01788 }
01789 }
01790 }
01791 catch (cms::Exception& e) { throw; }
01792 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
01793 catch (std::exception& e) { convertException::stdToEDM(e); }
01794 catch(std::string& s) { convertException::stringToEDM(s); }
01795 catch(char const* c) { convertException::charPtrToEDM(c); }
01796 catch (...) { convertException::unknownToEDM(); }
01797 }
01798
01799
01800
01801
01802
01803
01804
01805
01806
01807
01808
01809
01810
01811
01812
01813
01814
01815
01816
01817
01818
01819
01820
01821
01822
01823
01824
01825
01826
01827
01828
01829
01830
01831
01832
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843 catch (cms::Exception & e) {
01844 alreadyHandlingException_ = true;
01845 terminateMachine();
01846 alreadyHandlingException_ = false;
01847 if (!exceptionMessageLumis_.empty()) {
01848 e.addAdditionalInfo(exceptionMessageLumis_);
01849 if (e.alreadyPrinted()) {
01850 LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
01851 }
01852 }
01853 if (!exceptionMessageRuns_.empty()) {
01854 e.addAdditionalInfo(exceptionMessageRuns_);
01855 if (e.alreadyPrinted()) {
01856 LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
01857 }
01858 }
01859 if (!exceptionMessageFiles_.empty()) {
01860 e.addAdditionalInfo(exceptionMessageFiles_);
01861 if (e.alreadyPrinted()) {
01862 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
01863 }
01864 }
01865 throw;
01866 }
01867
01868 if(machine_->terminated()) {
01869 FDEBUG(1) << "The state machine reports it has been terminated\n";
01870 machine_.reset();
01871 }
01872
01873 if(!onlineStateTransitions) changeState(mFinished);
01874
01875 if(stateMachineWasInErrorState_) {
01876 throw cms::Exception("BadState")
01877 << "The boost state machine in the EventProcessor exited after\n"
01878 << "entering the Error state.\n";
01879 }
01880
01881 return returnCode;
01882 }
01883
01884 void EventProcessor::readFile() {
01885 FDEBUG(1) << " \treadFile\n";
01886 fb_ = input_->readFile();
01887 if(numberOfForkedChildren_ > 0) {
01888 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
01889 }
01890 }
01891
01892 void EventProcessor::closeInputFile() {
01893 if (fb_.get() != 0) {
01894 input_->closeFile(fb_);
01895 }
01896 FDEBUG(1) << "\tcloseInputFile\n";
01897 }
01898
01899 void EventProcessor::openOutputFiles() {
01900 if (fb_.get() != 0) {
01901 schedule_->openOutputFiles(*fb_);
01902 if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
01903 }
01904 FDEBUG(1) << "\topenOutputFiles\n";
01905 }
01906
01907 void EventProcessor::closeOutputFiles() {
01908 if (fb_.get() != 0) {
01909 schedule_->closeOutputFiles();
01910 if(hasSubProcess()) subProcess_->closeOutputFiles();
01911 }
01912 FDEBUG(1) << "\tcloseOutputFiles\n";
01913 }
01914
01915 void EventProcessor::respondToOpenInputFile() {
01916 if (fb_.get() != 0) {
01917 schedule_->respondToOpenInputFile(*fb_);
01918 if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
01919 }
01920 FDEBUG(1) << "\trespondToOpenInputFile\n";
01921 }
01922
01923 void EventProcessor::respondToCloseInputFile() {
01924 if (fb_.get() != 0) {
01925 schedule_->respondToCloseInputFile(*fb_);
01926 if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
01927 }
01928 FDEBUG(1) << "\trespondToCloseInputFile\n";
01929 }
01930
01931 void EventProcessor::respondToOpenOutputFiles() {
01932 if (fb_.get() != 0) {
01933 schedule_->respondToOpenOutputFiles(*fb_);
01934 if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
01935 }
01936 FDEBUG(1) << "\trespondToOpenOutputFiles\n";
01937 }
01938
01939 void EventProcessor::respondToCloseOutputFiles() {
01940 if (fb_.get() != 0) {
01941 schedule_->respondToCloseOutputFiles(*fb_);
01942 if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
01943 }
01944 FDEBUG(1) << "\trespondToCloseOutputFiles\n";
01945 }
01946
01947 void EventProcessor::startingNewLoop() {
01948 shouldWeStop_ = false;
01949
01950
01951 if(looper_ && looperBeginJobRun_) {
01952 looper_->doStartingNewLoop();
01953 }
01954 FDEBUG(1) << "\tstartingNewLoop\n";
01955 }
01956
01957 bool EventProcessor::endOfLoop() {
01958 if(looper_) {
01959 ModuleChanger changer(schedule_.get());
01960 looper_->setModuleChanger(&changer);
01961 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
01962 looper_->setModuleChanger(0);
01963 if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
01964 else return false;
01965 }
01966 FDEBUG(1) << "\tendOfLoop\n";
01967 return true;
01968 }
01969
01970 void EventProcessor::rewindInput() {
01971 input_->repeat();
01972 input_->rewind();
01973 FDEBUG(1) << "\trewind\n";
01974 }
01975
01976 void EventProcessor::prepareForNextLoop() {
01977 looper_->prepareForNextLoop(esp_.get());
01978 FDEBUG(1) << "\tprepareForNextLoop\n";
01979 }
01980
01981 bool EventProcessor::shouldWeCloseOutput() const {
01982 FDEBUG(1) << "\tshouldWeCloseOutput\n";
01983 return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
01984 }
01985
01986 void EventProcessor::doErrorStuff() {
01987 FDEBUG(1) << "\tdoErrorStuff\n";
01988 LogError("StateMachine")
01989 << "The EventProcessor state machine encountered an unexpected event\n"
01990 << "and went to the error state\n"
01991 << "Will attempt to terminate processing normally\n"
01992 << "(IF using the looper the next loop will be attempted)\n"
01993 << "This likely indicates a bug in an input module or corrupted input or both\n";
01994 stateMachineWasInErrorState_ = true;
01995 }
01996
01997 void EventProcessor::beginRun(statemachine::Run const& run) {
01998 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
01999 input_->doBeginRun(runPrincipal);
02000 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
02001 runPrincipal.beginTime());
02002 if(forceESCacheClearOnNewRun_){
02003 esp_->forceCacheClear();
02004 }
02005 EventSetup const& es = esp_->eventSetupForInstance(ts);
02006 if(looper_ && looperBeginJobRun_== false) {
02007 looper_->copyInfo(ScheduleInfo(schedule_.get()));
02008 looper_->beginOfJob(es);
02009 looperBeginJobRun_ = true;
02010 looper_->doStartingNewLoop();
02011 }
02012 {
02013 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
02014 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02015 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
02016 if(hasSubProcess()) {
02017 subProcess_->doBeginRun(runPrincipal, ts);
02018 }
02019 }
02020 FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
02021 if(looper_) {
02022 looper_->doBeginRun(runPrincipal, es);
02023 }
02024 }
02025
02026 void EventProcessor::endRun(statemachine::Run const& run) {
02027 RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
02028 input_->doEndRun(runPrincipal);
02029 IOVSyncValue ts(EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
02030 runPrincipal.endTime());
02031 EventSetup const& es = esp_->eventSetupForInstance(ts);
02032 {
02033 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
02034 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
02035 schedule_->processOneOccurrence<Traits>(runPrincipal, es);
02036 if(hasSubProcess()) {
02037 subProcess_->doEndRun(runPrincipal, ts);
02038 }
02039 }
02040 FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
02041 if(looper_) {
02042 looper_->doEndRun(runPrincipal, es);
02043 }
02044 }
02045
02046 void EventProcessor::beginLumi(ProcessHistoryID const& phid, int run, int lumi) {
02047 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02048 input_->doBeginLumi(lumiPrincipal);
02049
02050 Service<RandomNumberGenerator> rng;
02051 if(rng.isAvailable()) {
02052 LuminosityBlock lb(lumiPrincipal, ModuleDescription());
02053 rng->preBeginLumi(lb);
02054 }
02055
02056
02057
02058 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
02059 EventSetup const& es = esp_->eventSetupForInstance(ts);
02060 {
02061 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
02062 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02063 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02064 if(hasSubProcess()) {
02065 subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
02066 }
02067 }
02068 FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
02069 if(looper_) {
02070 looper_->doBeginLuminosityBlock(lumiPrincipal, es);
02071 }
02072 }
02073
02074 void EventProcessor::endLumi(ProcessHistoryID const& phid, int run, int lumi) {
02075 LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
02076 input_->doEndLumi(lumiPrincipal);
02077
02078
02079 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
02080 lumiPrincipal.endTime());
02081 EventSetup const& es = esp_->eventSetupForInstance(ts);
02082 {
02083 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
02084 ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
02085 schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
02086 if(hasSubProcess()) {
02087 subProcess_->doEndLuminosityBlock(lumiPrincipal, ts);
02088 }
02089 }
02090 FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
02091 if(looper_) {
02092 looper_->doEndLuminosityBlock(lumiPrincipal, es);
02093 }
02094 }
02095
02096 statemachine::Run EventProcessor::readAndCacheRun(bool merge) {
02097 input_->readAndCacheRun(merge, *historyAppender_);
02098 input_->markRun();
02099 return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
02100 }
02101
02102 int EventProcessor::readAndCacheLumi(bool merge) {
02103 input_->readAndCacheLumi(merge, *historyAppender_);
02104 input_->markLumi();
02105 return input_->luminosityBlock();
02106 }
02107
02108 void EventProcessor::writeRun(statemachine::Run const& run) {
02109 schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
02110 if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
02111 FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
02112 }
02113
02114 void EventProcessor::deleteRunFromCache(statemachine::Run const& run) {
02115 principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
02116 if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
02117 FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
02118 }
02119
02120 void EventProcessor::writeLumi(ProcessHistoryID const& phid, int run, int lumi) {
02121 schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
02122 if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
02123 FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
02124 }
02125
02126 void EventProcessor::deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi) {
02127 principalCache_.deleteLumi(phid, run, lumi);
02128 if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
02129 FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
02130 }
02131
02132 void EventProcessor::readAndProcessEvent() {
02133 EventPrincipal *pep = 0;
02134 try {
02135 try {
02136 pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
02137 FDEBUG(1) << "\treadEvent\n";
02138 }
02139 catch (cms::Exception& e) { throw; }
02140 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
02141 catch (std::exception& e) { convertException::stdToEDM(e); }
02142 catch(std::string& s) { convertException::stringToEDM(s); }
02143 catch(char const* c) { convertException::charPtrToEDM(c); }
02144 catch (...) { convertException::unknownToEDM(); }
02145 }
02146 catch(cms::Exception& ex) {
02147 ex.addContext("Calling readEvent in the input source");
02148 throw;
02149 }
02150 assert(pep != 0);
02151
02152 IOVSyncValue ts(pep->id(), pep->time());
02153 EventSetup const& es = esp_->eventSetupForInstance(ts);
02154 {
02155 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
02156 ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
02157 schedule_->processOneOccurrence<Traits>(*pep, es);
02158 if(hasSubProcess()) {
02159 subProcess_->doEvent(*pep, ts);
02160 }
02161 }
02162
02163 if(looper_) {
02164 bool randomAccess = input_->randomAccess();
02165 ProcessingController::ForwardState forwardState = input_->forwardState();
02166 ProcessingController::ReverseState reverseState = input_->reverseState();
02167 ProcessingController pc(forwardState, reverseState, randomAccess);
02168
02169 EDLooperBase::Status status = EDLooperBase::kContinue;
02170 do {
02171 status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
02172
02173 bool succeeded = true;
02174 if(randomAccess) {
02175 if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
02176 input_->skipEvents(-2);
02177 }
02178 else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
02179 succeeded = input_->goToEvent(pc.specifiedEventTransition());
02180 }
02181 }
02182 pc.setLastOperationSucceeded(succeeded);
02183 } while(!pc.lastOperationSucceeded());
02184 if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
02185
02186 }
02187
02188 FDEBUG(1) << "\tprocessEvent\n";
02189 pep->clearEventPrincipal();
02190 }
02191
02192 bool EventProcessor::shouldWeStop() const {
02193 FDEBUG(1) << "\tshouldWeStop\n";
02194 if(shouldWeStop_) return true;
02195 return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
02196 }
02197
02198 void EventProcessor::setExceptionMessageFiles(std::string& message) {
02199 exceptionMessageFiles_ = message;
02200 }
02201
02202 void EventProcessor::setExceptionMessageRuns(std::string& message) {
02203 exceptionMessageRuns_ = message;
02204 }
02205
02206 void EventProcessor::setExceptionMessageLumis(std::string& message) {
02207 exceptionMessageLumis_ = message;
02208 }
02209
02210 bool EventProcessor::alreadyHandlingException() const {
02211 return alreadyHandlingException_;
02212 }
02213
02214 void EventProcessor::terminateMachine() {
02215 if(machine_.get() != 0) {
02216 if(!machine_->terminated()) {
02217 forceLooperToEnd_ = true;
02218 machine_->process_event(statemachine::Stop());
02219 forceLooperToEnd_ = false;
02220 }
02221 else {
02222 FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
02223 }
02224 if(machine_->terminated()) {
02225 FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
02226 }
02227 machine_.reset();
02228 }
02229 }
02230 }