CMS 3D CMS Logo

Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends

edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

List of all members.

Public Member Functions

virtual bool alreadyHandlingException () const
void beginJob ()
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void beginRun (statemachine::Run const &run)
void clearCounters ()
 Clears counters used by trigger report.
virtual void closeInputFile (bool cleaningUpAfterException)
virtual void closeOutputFiles ()
char const * currentStateName () const
void declareRunNumber (RunNumber_t runNumber)
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void deleteRunFromCache (statemachine::Run const &run)
virtual void doErrorStuff ()
void enableEndPaths (bool active)
void endJob ()
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
virtual bool endOfLoop ()
bool endPathsEnabled () const
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 EventProcessor (EventProcessor const &)
bool forkProcess (std::string const &jobReportFile)
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
event_processor::State getState () const
ServiceToken getToken ()
void getTriggerReport (TriggerReport &rep) const
char const * msgName (event_processor::Msg m) const
virtual void openOutputFiles ()
EventProcessor & operator= (EventProcessor const &)
ActivityRegistry::PostProcessEvent & postProcessEventSignal ()
virtual void prepareForNextLoop ()
ActivityRegistry::PreProcessEvent & preProcessEventSignal ()
virtual int readAndCacheLumi ()
virtual statemachine::Run readAndCacheRun ()
virtual int readAndMergeLumi ()
virtual statemachine::Run readAndMergeRun ()
virtual void readAndProcessEvent ()
virtual void readFile ()
virtual void respondToCloseInputFile ()
virtual void respondToCloseOutputFiles ()
virtual void respondToOpenInputFile ()
virtual void respondToOpenOutputFiles ()
virtual void rewindInput ()
StatusCode run ()
void runAsync ()
virtual StatusCode runToCompletion (bool onlineStateTransitions)
virtual void setExceptionMessageFiles (std::string &message)
virtual void setExceptionMessageLumis (std::string &message)
virtual void setExceptionMessageRuns (std::string &message)
void setRunNumber (RunNumber_t runNumber)
virtual bool shouldWeCloseOutput () const
virtual bool shouldWeStop () const
StatusCode shutdownAsync (unsigned int timeout_secs=60 *2)
virtual void startingNewLoop ()
char const * stateName (event_processor::State s) const
StatusCode statusAsync () const
StatusCode stopAsync (unsigned int timeout_secs=60 *2)
int totalEvents () const
int totalEventsFailed () const
int totalEventsPassed () const
StatusCode waitTillDoneAsync (unsigned int timeout_seconds=0)
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void writeRun (statemachine::Run const &run)
 ~EventProcessor ()

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
typedef std::map< std::string,
ExcludedData > 
ExcludedDataMap

Private Member Functions

void changeState (event_processor::Msg)
void connectSigs (EventProcessor *ep)
std::auto_ptr
< statemachine::Machine > 
createStateMachine ()
StatusCode doneAsync (event_processor::Msg m)
void errorState ()
bool hasSubProcess () const
void init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void possiblyContinueAfterForkChildFailure ()
void setupSignal ()
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
StatusCode waitForAsyncCompletion (unsigned int timeout_seconds)

Static Private Member Functions

static void asyncRun (EventProcessor *)

Private Attributes

std::unique_ptr< ActionTable
const > 
act_table_
boost::shared_ptr
< ActivityRegistry > 
actReg_
bool alreadyHandlingException_
boost::shared_ptr
< BranchIDListHelper > 
branchIDListHelper_
bool continueAfterChildFailure_
std::string emptyRunLumiMode_
boost::shared_ptr
< eventsetup::EventSetupProvider > 
esp_
std::unique_ptr
< eventsetup::EventSetupsController > 
espController_
boost::shared_ptr< boost::thread > event_loop_
volatile pthread_t event_loop_id_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::string exceptionMessageFiles_
std::string exceptionMessageLumis_
std::string exceptionMessageRuns_
std::unique_ptr< FileBlock > fb_
std::string fileMode_
bool forceESCacheClearOnNewRun_
bool forceLooperToEnd_
std::unique_ptr< HistoryAppender > historyAppender_
volatile bool id_set_
std::unique_ptr< InputSource > input_
std::string last_error_text_
volatile Status last_rc_
boost::shared_ptr< EDLooperBase > looper_
bool looperBeginJobRun_
int my_sig_num_
int numberOfForkedChildren_
unsigned int numberOfSequentialEventsPerChild_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr
< ProductRegistry const > 
preg_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
PrincipalCache principalCache_
boost::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
std::auto_ptr< Schedule > schedule_
ServiceToken serviceToken_
bool setCpuAffinity_
bool shouldWeStop_
boost::condition starter_
volatile event_processor::State state_
boost::mutex state_lock_
bool stateMachineWasInErrorState_
volatile int stop_count_
boost::mutex stop_lock_
boost::condition stopper_
std::auto_ptr< SubProcess > subProcess_

Friends

class event_processor::StateSentry

Detailed Description

Definition at line 70 of file EventProcessor.h.


Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData [private]

Definition at line 374 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap [private]

Definition at line 375 of file EventProcessor.h.


Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
) [explicit]

Definition at line 380 of file EventProcessor.cc.

References init(), and PythonProcessDesc::parameterSet().

                                                                            :
    preProcessEventSignal_(),
    postProcessEventSignal_(),
    actReg_(),
    preg_(),
    branchIDListHelper_(),
    serviceToken_(),
    input_(),
    espController_(new eventsetup::EventSetupsController),
    esp_(),
    act_table_(),
    processConfiguration_(),
    schedule_(),
    subProcess_(),
    historyAppender_(new HistoryAppender),
    state_(sInit),
    event_loop_(),
    state_lock_(),
    stop_lock_(),
    stopper_(),
    starter_(),
    stop_count_(-1),
    last_rc_(epSuccess),
    last_error_text_(),
    id_set_(false),
    event_loop_id_(),
    my_sig_num_(getSigNum()),
    fb_(),
    looper_(),
    principalCache_(),
    shouldWeStop_(false),
    stateMachineWasInErrorState_(false),
    fileMode_(),
    emptyRunLumiMode_(),
    exceptionMessageFiles_(),
    exceptionMessageRuns_(),
    exceptionMessageLumis_(),
    alreadyHandlingException_(false),
    forceLooperToEnd_(false),
    looperBeginJobRun_(false),
    forceESCacheClearOnNewRun_(false),
    numberOfForkedChildren_(0),
    numberOfSequentialEventsPerChild_(1),
    setCpuAffinity_(false),
    eventSetupDataToExcludeFromPrefetching_() {
    boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
    boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
    processDesc->addServices(defaultServices, forcedServices);
    init(processDesc, iToken, iLegacy);
  }

  EventProcessor::EventProcessor(std::string const& config,
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
edm::EventProcessor::EventProcessor ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 539 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

                                              {
    if(isPython) {
      boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
      boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
      init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
    }
    else {
      boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
      init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
    }
  }

  void
edm::EventProcessor::~EventProcessor ( )

Definition at line 680 of file EventProcessor.cc.

        {
      changeState(mDtor);
    }
    catch(cms::Exception& e) {
      LogError("System")
        << e.explainSelf() << "\n";
    }

    // manually destroy all these thing that may need the services around
    espController_.reset();
    subProcess_.reset();
    esp_.reset();
    schedule_.reset();
    input_.reset();
    looper_.reset();
    actReg_.reset();

    pset::Registry* psetRegistry = pset::Registry::instance();
    psetRegistry->data().clear();
    psetRegistry->extra().setID(ParameterSetID());

    EntryDescriptionRegistry::instance()->data().clear();
    ParentageRegistry::instance()->data().clear();
    ProcessConfigurationRegistry::instance()->data().clear();
    ProcessHistoryRegistry::instance()->data().clear();
  }

  void
edm::EventProcessor::EventProcessor ( EventProcessor const &  )

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const [virtual]

Implements edm::IEventProcessor.

Definition at line 2222 of file EventProcessor.cc.

                                                                                  {
void edm::EventProcessor::asyncRun ( EventProcessor * me) [static, private]

Definition at line 1561 of file EventProcessor.cc.

Referenced by runAsync().

    {
      boost::mutex::scoped_lock sl(me->stop_lock_);
      me->event_loop_id_ = pthread_self();
      me->id_set_ = true;
      me->starter_.notify_all();
    }

    Status rc = epException;
    FDEBUG(2) << "asyncRun starting ......................\n";

    try {
      bool onlineStateTransitions = true;
      rc = me->runToCompletion(onlineStateTransitions);
    }
    catch (cms::Exception& e) {
      LogError("FwkJob") << "cms::Exception caught in "
                         << "EventProcessor::asyncRun"
                         << "\n"
                         << e.explainSelf();
      me->last_error_text_ = e.explainSelf();
    }
    catch (std::exception& e) {
      LogError("FwkJob") << "Standard library exception caught in "
                         << "EventProcessor::asyncRun"
                         << "\n"
                         << e.what();
      me->last_error_text_ = e.what();
    }
    catch (...) {
      LogError("FwkJob") << "Unknown exception caught in "
                         << "EventProcessor::asyncRun"
                         << "\n";
      me->last_error_text_ = "Unknown exception caught";
      rc = epOther;
    }

    me->last_rc_ = rc;

    {
      // notify anyone waiting for exit that we are doing so now
      boost::mutex::scoped_lock sl(me->stop_lock_);
      ++me->stop_count_;
      me->stopper_.notify_all();
    }
    FDEBUG(2) << "asyncRun ending ......................\n";
  }

  std::auto_ptr<statemachine::Machine>
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 712 of file EventProcessor.cc.

        {
      try {
        input_->doBeginJob();
      }
      catch (cms::Exception& e) { throw; }
      catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
      catch (std::exception& e) { convertException::stdToEDM(e); }
      catch(std::string& s) { convertException::stringToEDM(s); }
      catch(char const* c) { convertException::charPtrToEDM(c); }
      catch (...) { convertException::unknownToEDM(); }
    }
    catch(cms::Exception& ex) {
      ex.addContext("Calling beginJob for the source");
      throw;
    }
    schedule_->beginJob(*preg_);
    // toerror.succeeded(); // should we add this?
    if(hasSubProcess()) subProcess_->doBeginJob();
    actReg_->postBeginJobSignal_();
  }

  void
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
) [virtual]

Implements edm::IEventProcessor.

Definition at line 2033 of file EventProcessor.cc.

                          {
      LuminosityBlock lb(lumiPrincipal, ModuleDescription());
      rng->preBeginLumi(lb);
    }

    // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
    // lumi blocks know their start and end times why not also start and end events?
    IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
    espController_->eventSetupForInstance(ts);
    EventSetup const& es = esp_->eventSetup();
    {
      typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
      if(hasSubProcess()) {
        subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
      }
      sentry.allowThrow();
    }
    FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
    if(looper_) {
      looper_->doBeginLuminosityBlock(lumiPrincipal, es);
    }
  }

  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
void edm::EventProcessor::beginRun ( statemachine::Run const &  run) [virtual]

Implements edm::IEventProcessor.

Definition at line 1980 of file EventProcessor.cc.

References espController_.

                                  {
      espController_->forceCacheClear();
    }
    espController_->eventSetupForInstance(ts);
    EventSetup const& es = esp_->eventSetup();
    if(looper_ && looperBeginJobRun_== false) {
      looper_->copyInfo(ScheduleInfo(schedule_.get()));
      looper_->beginOfJob(es);
      looperBeginJobRun_ = true;
      looper_->doStartingNewLoop();
    }
    {
      typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(runPrincipal, es);
      if(hasSubProcess()) {
        subProcess_->doBeginRun(runPrincipal, ts);
      }
      sentry.allowThrow();
    }
    FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
    if(looper_) {
      looper_->doBeginRun(runPrincipal, es);
    }
  }

  void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) {
void edm::EventProcessor::changeState ( event_processor::Msg  msg) [private]

Definition at line 1501 of file EventProcessor.cc.

Referenced by runAsync().

                             : current=" << stateName(curr)
              << ", message=" << msgName(msg)
              << " -> new=" << stateName(table[rc].final) << "\n";

    state_ = table[rc].final;
  }

  void EventProcessor::runAsync() {
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1358 of file EventProcessor.cc.

                                                     {
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException) [virtual]

Implements edm::IEventProcessor.

Definition at line 1875 of file EventProcessor.cc.

                                       {
void edm::EventProcessor::closeOutputFiles ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1890 of file EventProcessor.cc.

                                              {
void edm::EventProcessor::connectSigs ( EventProcessor * ep) [private]

Definition at line 1314 of file EventProcessor.cc.

Referenced by init().

std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( ) [private]

Definition at line 1624 of file EventProcessor.cc.

References edm::errors::Configuration, Exception, and fileMode_.

         {
      throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
      << fileMode_ << ".\n"
      << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
    }
    
    statemachine::EmptyRunLumiMode emptyRunLumiMode;
    if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
    else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
    else {
      throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
      << emptyRunLumiMode_ << ".\n"
      << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
    }
    
    std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
                                             fileMode,
                                             emptyRunLumiMode));
    
    machine->initiate();
    return machine;
  }


char const * edm::EventProcessor::currentStateName ( ) const

Member functions to support asynchronous interface.

Definition at line 1362 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::microState(), and evf::FWEPWrapper::monitoring().

                                                     {
void edm::EventProcessor::declareRunNumber ( RunNumber_t  runNumber)

Definition at line 1402 of file EventProcessor.cc.

void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
) [virtual]

Implements edm::IEventProcessor.

Definition at line 2147 of file EventProcessor.cc.

                                           {
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run) [virtual]

Implements edm::IEventProcessor.

Definition at line 2135 of file EventProcessor.cc.

                                                                                                            {
void edm::EventProcessor::doErrorStuff ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1969 of file EventProcessor.cc.

                                                          {
EventProcessor::StatusCode edm::EventProcessor::doneAsync ( event_processor::Msg  m) [private]

Definition at line 1493 of file EventProcessor.cc.

                                          {
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1343 of file EventProcessor.cc.

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 758 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::init(), and evf::FWEPWrapper::stopAndHalt().

                        {
      c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
    }
    c.call(boost::bind(&InputSource::doEndJob, input_.get()));
    if(looper_) {
      c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
    }
    auto actReg = actReg_.get();
    c.call([actReg](){actReg->postEndJobSignal_();});
    if(c.hasThrown()) {
      c.rethrow();
    }
  }

  ServiceToken
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
) [virtual]

Implements edm::IEventProcessor.

Definition at line 2063 of file EventProcessor.cc.

References actReg_, hasSubProcess(), schedule_, and subProcess_.

    {
      typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
      if(hasSubProcess()) {
        subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
      }
      sentry.allowThrow();
    }
    FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
    if(looper_) {
      looper_->doEndLuminosityBlock(lumiPrincipal, es);
    }
  }

  statemachine::Run EventProcessor::readAndCacheRun() {
bool edm::EventProcessor::endOfLoop ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1940 of file EventProcessor.cc.

                                   {
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1348 of file EventProcessor.cc.

void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
) [virtual]

Implements edm::IEventProcessor.

Definition at line 2011 of file EventProcessor.cc.

References actReg_, hasSubProcess(), schedule_, and subProcess_.

    {
      typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
      schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
      if(hasSubProcess()) {
        subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
      }
      sentry.allowThrow();
    }
    FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
    if(looper_) {
      looper_->doEndRun(runPrincipal, es);
    }
  }

  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) {
void edm::EventProcessor::errorState ( ) [private]

Definition at line 1488 of file EventProcessor.cc.

bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 1013 of file EventProcessor.cc.

                                     {return true;}
    assert(0<numberOfForkedChildren_);
    //do what we want done in common
    {
      beginJob(); //make sure this was run
      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);

      InputSource::ItemType itemType;
      itemType = input_->nextItemType();

      assert(itemType == InputSource::IsFile);
      {
        readFile();
      }
      itemType = input_->nextItemType();
      assert(itemType == InputSource::IsRun);

      LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
      IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
                      input_->runAuxiliary()->beginTime());
      espController_->eventSetupForInstance(ts);
      EventSetup const& es = esp_->eventSetup();

      //now get all the data available in the EventSetup
      std::vector<eventsetup::EventSetupRecordKey> recordKeys;
      es.fillAvailableRecordKeys(recordKeys);
      std::vector<eventsetup::DataKey> dataKeys;
      for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
          itKey != itEnd;
          ++itKey) {
        eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
        //see if this is on our exclusion list
        ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
        ExcludedData const* excludedData(nullptr);
        if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
          excludedData = &(itExcludeRec->second);
          if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
            //skip all items in this record
            continue;
          }
        }
        if(0 != recordPtr) {
          dataKeys.clear();
          recordPtr->fillRegisteredDataKeys(dataKeys);
          for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
              itDataKey != itDataKeyEnd;
              ++itDataKey) {
            //std::cout << "  " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
            if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
              LogInfo("ForkingEventSetupPreFetching") << "   excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
              continue;
            }
            try {
              recordPtr->doGet(*itDataKey);
            } catch(cms::Exception& e) {
             LogWarning("ForkingEventSetupPreFetching") << e.what();
            }
          }
        }
      }
    }
    LogSystem("ForkingEventSetupPreFetching") <<"  done prefetching";
    {
      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);
      Service<JobReport> jobReport;
      jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);

      //Now actually do the forking
      actReg_->preForkReleaseResourcesSignal_();
      input_->doPreForkReleaseResources();
      schedule_->preForkReleaseResources();
    }
    installCustomHandler(SIGCHLD, ep_sigchld);


    unsigned int childIndex = 0;
    unsigned int const kMaxChildren = numberOfForkedChildren_;
    unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
    std::vector<pid_t> childrenIds;
    childrenIds.reserve(kMaxChildren);
    std::vector<int> childrenSockets;
    childrenSockets.reserve(kMaxChildren);
    std::vector<int> childrenPipes;
    childrenPipes.reserve(kMaxChildren);
    std::vector<int> childrenSocketsCopy;
    childrenSocketsCopy.reserve(kMaxChildren);
    std::vector<int> childrenPipesCopy;
    childrenPipesCopy.reserve(kMaxChildren);
    int pipes[] {0, 0};

    {
      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);
      Service<JobReport> jobReport;
      int sockets[2], fd_flags;
      for(; childIndex < kMaxChildren; ++childIndex) {
        // Create a UNIX_DGRAM socket pair
        if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
          printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        if (pipe(pipes)) {
          printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
        if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
          printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        // Mark socket as non-block.  Child must be careful to do select prior
        // to reading from socket.
        if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
          printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
          printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
          printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        // Linux man page notes there are some edge cases where reading from a
        // fd can block, even after a select.
        if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
          printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }
        if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
          printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
          exit(EXIT_FAILURE);
        }

        childrenPipesCopy = childrenPipes;
        childrenSocketsCopy = childrenSockets;

        pid_t value = fork();
        if(value == 0) {
          // Close the parent's side of the socket and pipe which will talk to us.
          close(pipes[0]);
          close(sockets[0]);
          // Close our copies of the parent's other communication pipes.
          for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
            close(*it);
          }
          for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
            close(*it);
          }

          // this is the child process, redirect stdout and stderr to a log file
          fflush(stdout);
          fflush(stderr);
          std::stringstream stout;
          stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
          if(0 == freopen(stout.str().c_str(), "w", stdout)) {
            LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
          }
          if(dup2(fileno(stdout), fileno(stderr)) < 0) {
            LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
          }

          LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
          if(setCpuAffinity_) {
            // CPU affinity is handled differently on macosx.
            // We disable it and print a message until someone reads:
            //
            // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
            //
            // and implements it.
#ifdef __APPLE__
            LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
#else
            LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
            cpu_set_t mask;
            CPU_ZERO(&mask);
            CPU_SET(childIndex, &mask);
            if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
              LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
              exit(-1);
            }
#endif
          }
          break;
        } else {
          //this is the parent
          close(pipes[1]);
          close(sockets[1]);
        }
        if(value < 0) {
          LogError("ForkingChild") << "failed to create a child";
          exit(-1);
        }
        childrenIds.push_back(value);
        childrenSockets.push_back(sockets[0]);
        childrenPipes.push_back(pipes[0]);
      }

      if(childIndex < kMaxChildren) {
        jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
        actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);

        boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
        input_->doPostForkReacquireResources(receiver);
        schedule_->postForkReacquireResources(childIndex, kMaxChildren);
        //NOTE: sources have to reset themselves by listening to the post fork message
        //rewindInput();
        return true;
      }
      jobReport->parentAfterFork(jobReportFile);
    }

    //this is the original, which is now the master for all the children

    //Need to wait for signals from the children or externally
    // To wait we must
    // 1) block the signals we want to wait on so we do not have a race condition
    // 2) check that we haven't already met our ending criteria
    // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
    sigset_t blockingSigSet;
    sigset_t unblockingSigSet;
    sigset_t oldSigSet;
    pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
    pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
    sigaddset(&blockingSigSet, SIGCHLD);
    sigaddset(&blockingSigSet, SIGUSR2);
    sigaddset(&blockingSigSet, SIGINT);
    sigdelset(&unblockingSigSet, SIGCHLD);
    sigdelset(&unblockingSigSet, SIGUSR2);
    sigdelset(&unblockingSigSet, SIGINT);
    pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);

    // If there are too many fd's (unlikely, but possible) for select, denote this 
    // because the sender will fail.
    bool too_many_fds = false;
    if (pipes[1]+1 > FD_SETSIZE) {
      LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
      too_many_fds = true;
    }

    //create a thread that sends the units of work to workers
    // we create it after all signals were blocked so that this
    // thread is never interrupted by a signal
    MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
    boost::thread senderThread(sender);

    if(not too_many_fds) {
      //NOTE: a child could have failed before we got here and even after this call
      // which is why the 'if' is conditional on continueAfterChildFailure_
      possiblyContinueAfterForkChildFailure();
      while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
        sigsuspend(&unblockingSigSet);
        possiblyContinueAfterForkChildFailure();
        LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
      }
    }
    pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);

    LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
    if(child_failed) {
      LogError("ForkingStopping") << "child failed";
    }
    if(shutdown_flag) {
      LogSystem("ForkingStopping") << "asked to shutdown";
    }

    if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
      LogInfo("ForkingStopping") << "must stop children" << std::endl;
      for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
          it != itEnd; ++it) {
        /* int result = */ kill(*it, SIGUSR2);
      }
      pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
      while(num_children_done != kMaxChildren) {
        sigsuspend(&unblockingSigSet);
      }
      pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
    }
    // The senderThread will notice the pipes die off, one by one.  Once all children are gone, it will exit.
    senderThread.join();
    if(child_failed && !continueAfterChildFailure_) {
      if (child_fail_signal) {
        throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
      } else if (child_fail_exit_status) {
        throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
      } else {
        throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
      }
    }
    if(too_many_fds) {
      throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
    }
    return false;
  }

  void
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. *** N.B. *** Ownership of the ModuleDescriptions is *not* passed to the caller. Do not call delete on these pointers!

Definition at line 1323 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::init().

State edm::EventProcessor::getState ( ) const
ServiceToken edm::EventProcessor::getToken ( )
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1353 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::init(), and evf::FWEPWrapper::taskWebPage().

bool edm::EventProcessor::hasSubProcess ( ) const [inline, private]

Definition at line 313 of file EventProcessor.h.

References subProcess_.

Referenced by endLumi(), and endRun().

                               {
      return subProcess_.get() != 0;
    }
void edm::EventProcessor::init ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
) [private]

Definition at line 596 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, connectSigs(), continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::eventsetup::heterocontainer::insert(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::popSubProcessParameterSet(), edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, schedule_, serviceToken_, setCpuAffinity_, edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, and subProcess_.

Referenced by EventProcessor().

                                                              {

    //std::cerr << processDesc->dump() << std::endl;
   
    ROOT::Cintex::Cintex::Enable();

    boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
    //std::cerr << parameterSet->dump() << std::endl;

    // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
    boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());

    // Now set some parameters specific to the main process.
    ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
    fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
    emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
    forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
    ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
    numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
    numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
    setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
    continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
    std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
    for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
        itPS != itPSEnd;
        ++itPS) {
      eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
                                                std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
                                                               itPS->getUntrackedParameter<std::string>("label", "")));
    }
    IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));

    // Now do general initialization
    ScheduleItems items;

    //initialize the services
    boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
    ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
    serviceToken_ = items.addCPRandTNS(*parameterSet, token);

    //make the services available
    ServiceRegistry::Operate operate(serviceToken_);

    // initialize miscellaneous items
    boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));

    // initialize the event setup provider
    esp_ = espController_->makeProvider(*parameterSet);

    // initialize the looper, if any
    looper_ = fillLooper(*espController_, *esp_, *parameterSet);
    if(looper_) {
      looper_->setActionTable(items.act_table_.get());
      looper_->attachTo(*items.actReg_);
    }

    // initialize the input source
    input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);

    // initialize the Schedule
    schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());

    // set the data members
    act_table_ = std::move(items.act_table_);
    actReg_ = items.actReg_;
    preg_.reset(items.preg_.release());
    branchIDListHelper_ = items.branchIDListHelper_;
    processConfiguration_ = items.processConfiguration_;

    FDEBUG(2) << parameterSet << std::endl;
    connectSigs(this);

    // Reusable event principal
    boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
    principalCache_.insert(ep);
      
    // initialize the subprocess, if there is one
    if(subProcessParameterSet) {
      subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
    }
  }

  EventProcessor::~EventProcessor() {
char const * edm::EventProcessor::msgName ( event_processor::Msg  m) const

Definition at line 1370 of file EventProcessor.cc.

                                       {
void edm::EventProcessor::openOutputFiles ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1882 of file EventProcessor.cc.

                                        {
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( ) [private]

Definition at line 997 of file EventProcessor.cc.

                             {
        LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
        child_fail_signal=0;
      } else if (child_fail_exit_status) {
        LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
        child_fail_exit_status=0;
      } else {
        LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
      }
      child_failed =false;
    }
  }

  bool
ActivityRegistry::PostProcessEvent& edm::EventProcessor::postProcessEventSignal ( ) [inline]

signal is emitted after all modules have finished processing the Event

Definition at line 208 of file EventProcessor.h.

References postProcessEventSignal_.

void edm::EventProcessor::prepareForNextLoop ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1959 of file EventProcessor.cc.

                                                 {
ActivityRegistry::PreProcessEvent& edm::EventProcessor::preProcessEventSignal ( ) [inline]

signal is emitted after the Event has been created by the InputSource but before any modules have seen the Event

Definition at line 203 of file EventProcessor.h.

References preProcessEventSignal_.

int edm::EventProcessor::readAndCacheLumi ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 2104 of file EventProcessor.cc.

                                            {
      throw edm::Exception(edm::errors::LogicError)
        << "EventProcessor::readAndCacheRun\n"
        << "Illegal attempt to insert lumi into cache\n"
        << "Run is invalid\n"
        << "Contact a Framework Developer\n";
    }
    principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
    principalCache_.lumiPrincipalPtr()->setRunPrincipal(principalCache_.runPrincipalPtr());
    return input_->luminosityBlock();
  }

  int EventProcessor::readAndMergeLumi() {
statemachine::Run edm::EventProcessor::readAndCacheRun ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 2087 of file EventProcessor.cc.

                                                  {
int edm::EventProcessor::readAndMergeLumi ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 2123 of file EventProcessor.cc.

                                                          {
statemachine::Run edm::EventProcessor::readAndMergeRun ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 2098 of file EventProcessor.cc.

                                       {
void edm::EventProcessor::readAndProcessEvent ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 2153 of file EventProcessor.cc.

    {
      typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
      ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
      schedule_->processOneOccurrence<Traits>(*pep, es);
      if(hasSubProcess()) {
        subProcess_->doEvent(*pep, ts);
      }
      sentry.allowThrow();
    }

    if(looper_) {
      bool randomAccess = input_->randomAccess();
      ProcessingController::ForwardState forwardState = input_->forwardState();
      ProcessingController::ReverseState reverseState = input_->reverseState();
      ProcessingController pc(forwardState, reverseState, randomAccess);

      EDLooperBase::Status status = EDLooperBase::kContinue;
      do {
        status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);

        bool succeeded = true;
        if(randomAccess) {
          if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
            input_->skipEvents(-2);
          }
          else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
            succeeded = input_->goToEvent(pc.specifiedEventTransition());
          }
        }
        pc.setLastOperationSucceeded(succeeded);
      } while(!pc.lastOperationSucceeded());
      if(status != EDLooperBase::kContinue) shouldWeStop_ = true;

    }

    FDEBUG(1) << "\tprocessEvent\n";
    pep->clearEventPrincipal();
  }

  bool EventProcessor::shouldWeStop() const {
void edm::EventProcessor::readFile ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1862 of file EventProcessor.cc.

References edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), and principalCache_.

                             {
      principalCache_.adjustIndexesAfterProductRegistryAddition();
    }
    principalCache_.adjustEventToNewProductRegistry(preg_);
    if(numberOfForkedChildren_ > 0) {
        fb_->setNotFastClonable(FileBlock::ParallelProcesses);
    }
  }

  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
void edm::EventProcessor::respondToCloseInputFile ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1906 of file EventProcessor.cc.

                                                {
void edm::EventProcessor::respondToCloseOutputFiles ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1922 of file EventProcessor.cc.

                                       {
void edm::EventProcessor::respondToOpenInputFile ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1898 of file EventProcessor.cc.

                                               {
void edm::EventProcessor::respondToOpenOutputFiles ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1914 of file EventProcessor.cc.

                                                 {
void edm::EventProcessor::rewindInput ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1953 of file EventProcessor.cc.

                                          {
EventProcessor::StatusCode edm::EventProcessor::run ( void  ) [inline]

Definition at line 384 of file EventProcessor.h.

References runToCompletion().

                      {
    return runToCompletion(false);
  }
void edm::EventProcessor::runAsync ( )

Definition at line 1531 of file EventProcessor.cc.

References asyncRun(), changeState(), edm::IEventProcessor::epSuccess, event_loop_, Exception, id_set_, last_rc_, edm::event_processor::mRunAsync, starter_, stop_count_, stop_lock_, and AlCaHLTBitMon_QueryRunRegistry::string.

    {
      boost::mutex::scoped_lock sl(stop_lock_);
      if(id_set_ == true) {
          std::string err("runAsync called while async event loop already running\n");
          LogError("FwkJob") << err;
          throw cms::Exception("BadState") << err;
      }

      changeState(mRunAsync);

      stop_count_ = 0;
      last_rc_ = epSuccess; // forget the last value!
      event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
      boost::xtime timeout;
#if BOOST_VERSION >= 105000
      boost::xtime_get(&timeout, boost::TIME_UTC_);
#else
      boost::xtime_get(&timeout, boost::TIME_UTC);
#endif
      timeout.sec += 60; // 60 seconds to start!!!!
      if(starter_.timed_wait(sl, timeout) == false) {
          // yikes - the thread did not start
          throw cms::Exception("BadState")
            << "Async run thread did not start in 60 seconds\n";
      }
    }
  }

  void EventProcessor::asyncRun(EventProcessor* me) {
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( bool  onlineStateTransitions) [virtual]

Implements edm::IEventProcessor.

Definition at line 1656 of file EventProcessor.cc.

Referenced by run().

    {
      beginJob(); //make sure this was called
      
      if(!onlineStateTransitions) changeState(mRunCount);
      
      //StatusCode returnCode = epSuccess;
      stateMachineWasInErrorState_ = false;
      
      // make the services available
      ServiceRegistry::Operate operate(serviceToken_);

      machine = createStateMachine();
      try {
        try {
          
          InputSource::ItemType itemType;
          
          while(true) {
            
            bool more = true;
            if(numberOfForkedChildren_ > 0) {
              size_t size = preg_->size();
              more = input_->skipForForking();
              if(more) {
                if(size < preg_->size()) {
                  principalCache_.adjustIndexesAfterProductRegistryAddition();
                }
                principalCache_.adjustEventToNewProductRegistry(preg_);
              }
            } 
            itemType = (more ? input_->nextItemType() : InputSource::IsStop);
            
            FDEBUG(1) << "itemType = " << itemType << "\n";
            
            // These are used for asynchronous running only and
            // are checking to see if stopAsync or shutdownAsync
            // were called from another thread.  In the future, we
            // may need to do something better than polling the state.
            // With the current code this is the simplest thing and
            // it should always work.  If the interaction between
            // threads becomes more complex this may cause problems.
            if(state_ == sStopping) {
              FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
              forceLooperToEnd_ = true;
              machine->process_event(statemachine::Stop());
              forceLooperToEnd_ = false;
              break;
            }
            else if(state_ == sShuttingDown) {
              FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
              forceLooperToEnd_ = true;
              machine->process_event(statemachine::Stop());
              forceLooperToEnd_ = false;
              break;
            }
            
            // Look for a shutdown signal
            {
              boost::mutex::scoped_lock sl(usr2_lock);
              if(shutdown_flag) {
                changeState(mShutdownSignal);
                returnCode = epSignal;
                forceLooperToEnd_ = true;
                machine->process_event(statemachine::Stop());
                forceLooperToEnd_ = false;
                break;
              }
            }
            
            if(itemType == InputSource::IsStop) {
              machine->process_event(statemachine::Stop());
            }
            else if(itemType == InputSource::IsFile) {
              machine->process_event(statemachine::File());
            }
            else if(itemType == InputSource::IsRun) {
              machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
            }
            else if(itemType == InputSource::IsLumi) {
              machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
            }
            else if(itemType == InputSource::IsEvent) {
              machine->process_event(statemachine::Event());
            }
            // This should be impossible
            else {
              throw Exception(errors::LogicError)
              << "Unknown next item type passed to EventProcessor\n"
              << "Please report this error to the Framework group\n";
            }
            
            if(machine->terminated()) {
              changeState(mInputExhausted);
              break;
            }
          }  // End of loop over state machine events
        } // Try block
        catch (cms::Exception& e) { throw; }
        catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
        catch (std::exception& e) { convertException::stdToEDM(e); }
        catch(std::string& s) { convertException::stringToEDM(s); }
        catch(char const* c) { convertException::charPtrToEDM(c); }
        catch (...) { convertException::unknownToEDM(); }
      } // Try block
      // Some comments on exception handling related to the boost state machine:
      //
      // Some states used in the machine are special because they
      // perform actions while the machine is being terminated, actions
      // such as close files, call endRun, call endLumi etc ...  Each of these
      // states has two functions that perform these actions.  The functions
      // are almost identical.  The major difference is that one version
      // catches all exceptions and the other lets exceptions pass through.
      // The destructor catches them and the other function named "exit" lets
      // them pass through.  On a normal termination, boost will always call
      // "exit" and then the state destructor.  In our state classes, the
      // "exit" and then the state destructor.  In our state classes,
      // the destructors do nothing if the exit function already took
      // handling an exception the "exit" function is not called (a boost
      // feature).
      //
      // If an exception occurs while the boost machine is in control
      // (which usually means inside a process_event call), then
      // the boost state machine destroys its states and "terminates" itself.
      // This is already done before we hit the catch blocks below. In this case
      // the call to terminateMachine below only destroys an already
      // terminated state machine.  Because exit is not called, the state destructors
      // handle cleaning up lumis, runs, and files.  The destructors swallow
      // all exceptions and only pass through the exceptions messages, which
      // are tacked onto the original exception below.
      //
      // If an exception occurs when the boost state machine is not
      // in control (outside the process_event functions), then boost
      // cannot destroy its own states.  The terminateMachine function
      // below takes care of that.  The flag "alreadyHandlingException"
      // is set true so that the state exit functions do nothing (and
      // cannot throw more exceptions while handling the first).  Then the
      // state destructors take care of this because exit did nothing.
      //
      // In both cases above, the EventProcessor::endOfLoop function is
      // not called because it can throw exceptions.
      //
      // One tricky aspect of the state machine is that things that can
      // throw should not be invoked by the state machine while another
      // exception is being handled.
      // Another tricky aspect is that it appears to be important to
      // terminate the state machine before invoking its destructor.
      // We've seen crashes that are not understood when that is not
      // done.  Maintainers of this code should be careful about this.
      
      catch (cms::Exception & e) {
        alreadyHandlingException_ = true;
        terminateMachine(machine);
        alreadyHandlingException_ = false;
        if (!exceptionMessageLumis_.empty()) {
          e.addAdditionalInfo(exceptionMessageLumis_);
          if (e.alreadyPrinted()) {
            LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
          }
        }
        if (!exceptionMessageRuns_.empty()) {
          e.addAdditionalInfo(exceptionMessageRuns_);
          if (e.alreadyPrinted()) {
            LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
          }
        }
        if (!exceptionMessageFiles_.empty()) {
          e.addAdditionalInfo(exceptionMessageFiles_);
          if (e.alreadyPrinted()) {
            LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
          }
        }
        throw;
      }
      
      if(machine->terminated()) {
        FDEBUG(1) << "The state machine reports it has been terminated\n";
        machine.reset();
      }
      
      if(!onlineStateTransitions) changeState(mFinished);
      
      if(stateMachineWasInErrorState_) {
        throw cms::Exception("BadState")
        << "The boost state machine in the EventProcessor exited after\n"
        << "entering the Error state.\n";
      }
      
    }
    if(machine.get() != 0) {
      terminateMachine(machine);
      throw Exception(errors::LogicError)
        << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
        << "Please report this error to the Framework group\n";
    }

    toerror.succeeded();

    return returnCode;
  }

  void EventProcessor::readFile() {
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message) [virtual]

Implements edm::IEventProcessor.

Definition at line 2210 of file EventProcessor.cc.

                                                                 {
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message) [virtual]

Implements edm::IEventProcessor.

Definition at line 2218 of file EventProcessor.cc.

                                                      {
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message) [virtual]

Implements edm::IEventProcessor.

Definition at line 2214 of file EventProcessor.cc.

                                                                  {
void edm::EventProcessor::setRunNumber ( RunNumber_t  runNumber)

Definition at line 1385 of file EventProcessor.cc.

void edm::EventProcessor::setupSignal ( ) [private]
bool edm::EventProcessor::shouldWeCloseOutput ( ) const [virtual]

Implements edm::IEventProcessor.

Definition at line 1964 of file EventProcessor.cc.

                                                                : schedule_->shouldWeCloseOutput();
  }

  void EventProcessor::doErrorStuff() {
bool edm::EventProcessor::shouldWeStop ( ) const [virtual]

Implements edm::IEventProcessor.

Definition at line 2204 of file EventProcessor.cc.

                                                                  {
EventProcessor::StatusCode edm::EventProcessor::shutdownAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1480 of file EventProcessor.cc.

                                  {
void edm::EventProcessor::startingNewLoop ( ) [virtual]

Implements edm::IEventProcessor.

Definition at line 1930 of file EventProcessor.cc.

References looper_.

                                      {
      looper_->doStartingNewLoop();
    }
    FDEBUG(1) << "\tstartingNewLoop\n";
  }

  bool EventProcessor::endOfLoop() {
char const * edm::EventProcessor::stateName ( event_processor::State  s) const
EventProcessor::StatusCode edm::EventProcessor::statusAsync ( ) const

Definition at line 1378 of file EventProcessor.cc.

EventProcessor::StatusCode edm::EventProcessor::stopAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1472 of file EventProcessor.cc.

                                                                          {
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine) [private]

Definition at line 2226 of file EventProcessor.cc.

References forceLooperToEnd_.

                                  {
        forceLooperToEnd_ = true;
        iMachine->process_event(statemachine::Stop());
        forceLooperToEnd_ = false;
      }
      else {
        FDEBUG(1) << "EventProcess::terminateMachine  The state machine was already terminated \n";
      }
      if(iMachine->terminated()) {
        FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
      }
      iMachine.reset();
    }
  }
}
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1328 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), and evf::FWEPWrapper::monitoring().

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1338 of file EventProcessor.cc.

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1333 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), and evf::FWEPWrapper::monitoring().

EventProcessor::StatusCode edm::EventProcessor::waitForAsyncCompletion ( unsigned int  timeout_seconds) [private]

Definition at line 1412 of file EventProcessor.cc.

    {
      boost::mutex::scoped_lock sl(stop_lock_);

      // look here - if runAsync not active, just return the last return code
      if(stop_count_ < 0) return last_rc_;

      if(timeout_seconds == 0) {
        while(stop_count_ == 0) stopper_.wait(sl);
      } else {
        while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
      }

      if(rc == false) {
          // timeout occurred
          // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
          // this is a temporary hack until we get the input source
          // upgraded to allow blocking input sources to be unblocked

          // the next line is dangerous and causes all sorts of trouble
          if(id_set_) pthread_cancel(event_loop_id_);

          // we will not do anything yet
          LogWarning("timeout")
            << "An asynchronous request was made to shut down "
            << "the event loop "
            << "and the event loop did not shutdown after "
            << timeout_seconds << " seconds\n";
      } else {
          event_loop_->join();
          event_loop_.reset();
          id_set_ = false;
          stop_count_ = -1;
      }
    }
    return rc == false ? epTimedOut : last_rc_;
  }

  EventProcessor::StatusCode
EventProcessor::StatusCode edm::EventProcessor::waitTillDoneAsync ( unsigned int  timeout_seconds = 0)

Definition at line 1464 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::stop().

void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
) [virtual]

Implements edm::IEventProcessor.

Definition at line 2141 of file EventProcessor.cc.

                                                                                                                      {
void edm::EventProcessor::writeRun ( statemachine::Run const &  run) [virtual]

Implements edm::IEventProcessor.

Definition at line 2129 of file EventProcessor.cc.

                                                                    {

Friends And Related Function Documentation

friend class event_processor::StateSentry [friend]

Definition at line 377 of file EventProcessor.h.


Member Data Documentation

std::unique_ptr<ActionTable const> edm::EventProcessor::act_table_ [private]

Definition at line 334 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_ [private]

Definition at line 327 of file EventProcessor.h.

Referenced by endLumi(), endRun(), and init().

Definition at line 364 of file EventProcessor.h.

Definition at line 329 of file EventProcessor.h.

Referenced by init().

Definition at line 372 of file EventProcessor.h.

Referenced by init().

Definition at line 360 of file EventProcessor.h.

Referenced by init().

Definition at line 333 of file EventProcessor.h.

Referenced by init().

Definition at line 332 of file EventProcessor.h.

Referenced by beginRun(), and init().

boost::shared_ptr<boost::thread> edm::EventProcessor::event_loop_ [private]

Definition at line 341 of file EventProcessor.h.

Referenced by runAsync().

volatile pthread_t edm::EventProcessor::event_loop_id_ [private]

Definition at line 351 of file EventProcessor.h.

Definition at line 376 of file EventProcessor.h.

Referenced by init().

Definition at line 361 of file EventProcessor.h.

Definition at line 363 of file EventProcessor.h.

Definition at line 362 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_ [private]

Definition at line 353 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_ [private]

Definition at line 359 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

Definition at line 367 of file EventProcessor.h.

Referenced by init().

Definition at line 365 of file EventProcessor.h.

Referenced by terminateMachine().

Definition at line 338 of file EventProcessor.h.

Referenced by init().

volatile bool edm::EventProcessor::id_set_ [private]

Definition at line 350 of file EventProcessor.h.

Referenced by runAsync().

std::unique_ptr<InputSource> edm::EventProcessor::input_ [private]

Definition at line 331 of file EventProcessor.h.

Referenced by init().

Definition at line 349 of file EventProcessor.h.

Definition at line 348 of file EventProcessor.h.

Referenced by runAsync().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_ [private]

Definition at line 354 of file EventProcessor.h.

Referenced by init(), and startingNewLoop().

Definition at line 366 of file EventProcessor.h.

Definition at line 352 of file EventProcessor.h.

Definition at line 369 of file EventProcessor.h.

Referenced by init().

Definition at line 370 of file EventProcessor.h.

Referenced by init().

Definition at line 326 of file EventProcessor.h.

Referenced by postProcessEventSignal().

boost::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_ [private]

Definition at line 328 of file EventProcessor.h.

Referenced by init().

Definition at line 325 of file EventProcessor.h.

Referenced by preProcessEventSignal().

Definition at line 356 of file EventProcessor.h.

Referenced by init(), and readFile().

Definition at line 335 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_ [private]

Definition at line 336 of file EventProcessor.h.

Referenced by endLumi(), endRun(), and init().

Definition at line 330 of file EventProcessor.h.

Referenced by init().

Definition at line 371 of file EventProcessor.h.

Referenced by init().

Definition at line 357 of file EventProcessor.h.

boost::condition edm::EventProcessor::starter_ [private]

Definition at line 346 of file EventProcessor.h.

Referenced by runAsync().

Definition at line 340 of file EventProcessor.h.

Definition at line 343 of file EventProcessor.h.

Definition at line 358 of file EventProcessor.h.

volatile int edm::EventProcessor::stop_count_ [private]

Definition at line 347 of file EventProcessor.h.

Referenced by runAsync().

Definition at line 344 of file EventProcessor.h.

Referenced by runAsync().

boost::condition edm::EventProcessor::stopper_ [private]

Definition at line 345 of file EventProcessor.h.

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_ [private]

Definition at line 337 of file EventProcessor.h.

Referenced by endLumi(), endRun(), hasSubProcess(), and init().