CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

edm::DaqSource Class Reference

#include <DaqSource.h>

Inheritance diagram for edm::DaqSource:
edm::InputSource evf::ModuleWeb edm::ProductRegistryHelper

List of all members.

Public Member Functions

 DaqSource (const ParameterSet &pset, const InputSourceDescription &desc)
virtual ~DaqSource ()

Private Member Functions

virtual void closeBackDoor ()
void defaultWebPage (xgi::Input *in, xgi::Output *out)
int doMyBeginRun ()
virtual ItemType getNextItemType ()
virtual void openBackDoor (unsigned int, bool *)
virtual void publish (xdata::InfoSpace *)
virtual void publishForkInfo (evf::moduleweb::ForkInfoObj *forkInfoObj)
virtual void publishToXmas (xdata::InfoSpace *)
virtual EventPrincipalreadEvent_ (EventPrincipal &eventPrincipal)
virtual EventPrincipalreadIt (EventID const &eventID)
virtual boost::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary_ ()
virtual boost::shared_ptr
< RunAuxiliary
readRunAuxiliary_ ()
virtual void setLumi (LuminosityBlockNumber_t lb)
virtual void setRun (RunNumber_t r)
virtual void signalWaitingThreadAndBlock ()
virtual void skip (int offset)

Private Attributes

bool alignLsToLast_
bool beginRunTiming_
int bunchCrossing_
pthread_cond_t cond_
int count
DaqProvenanceHelper daqProvenanceHelper_
unsigned int eventCounter_
EventID eventID_
EventAuxiliary::ExperimentType evttype_
bool fakeLSid_
FEDRawDataCollectionfedCollection_
evf::moduleweb::ForkInfoObjforkInfo_
bool goToStopping
bool immediateStop
xdata::InfoSpace * is_
bool keepUsingPsidFromTrigger_
xdata::UnsignedInteger32 * lastLumiPrescaleIndex_
xdata::UnsignedInteger32 * lastLumiUsingEol_
xdata::Boolean * lsTimedOut_
xdata::Boolean * lsToBeRecovered_
LuminosityBlockNumber_t luminosityBlockNumber_
xdata::UnsignedInteger32 * lumiSectionIndex_
unsigned int lumiSegmentSizeInEvents_
unsigned int lumiSegmentSizeInSeconds_
xdata::InfoSpace * mis_
pthread_mutex_t mutex_
bool noMoreEvents_
int orbitNumber_
ProcessHistoryID phid_
xdata::UnsignedInteger32 * prescaleSetIndex_
DaqBaseReaderreader_
bool runFork_
RunNumber_t runNumber_
pthread_mutex_t signal_lock_
struct timeval startOfLastLumi
unsigned int thisEventLSid
timeval tvStat_
bool useEventCounter_
bool useTimer_

Detailed Description

Definition at line 39 of file DaqSource.h.


Constructor & Destructor Documentation

DaqSource::DaqSource ( const ParameterSet pset,
const InputSourceDescription desc 
) [explicit]

Definition at line 56 of file DaqSource.cc.

References edm::Timestamp::beginOfTime(), cms::Exception::category(), cond_, count, gather_cfg::cout, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, alignCSCRings::e, fakeLSid_, reco::get(), edm::ParameterSet::getUntrackedParameter(), lumiSegmentSizeInEvents_, lumiSegmentSizeInSeconds_, mutex_, phid_, edm::InputSource::productRegistryUpdate(), matplotRender::reader, reader_, runNumber_, DaqBaseReader::setRunNumber(), edm::InputSource::setTimestamp(), signal_lock_, startOfLastLumi, and AlCaHLTBitMon_QueryRunRegistry::string.

    : InputSource(pset,desc)
    , evf::ModuleWeb("DaqSource")
    , reader_(0)
    , lumiSegmentSizeInEvents_(pset.getUntrackedParameter<unsigned int>("evtsPerLS",0))
    , lumiSegmentSizeInSeconds_(pset.getUntrackedParameter<unsigned int>("secondsPerLS",0))
    , useEventCounter_(pset.getUntrackedParameter<bool>("useEventCounter",false))
    , useTimer_(lumiSegmentSizeInSeconds_!=0)
    , eventCounter_(0)
    , keepUsingPsidFromTrigger_(pset.getUntrackedParameter<bool>("keepUsingPsidFromTrigger",false))
    , fakeLSid_(lumiSegmentSizeInEvents_ != 0 || lumiSegmentSizeInSeconds_ != 0)
    , runNumber_(pset.getUntrackedParameter<unsigned int>("runNumber",RunID::firstValidRun().run()))
    , luminosityBlockNumber_(LuminosityBlockID::firstValidLuminosityBlock().luminosityBlock())
    , daqProvenanceHelper_(TypeID(typeid(FEDRawDataCollection)))
    , noMoreEvents_(false)
    , alignLsToLast_(false)
    , lumiSectionIndex_(0)
    , prescaleSetIndex_(0)
    , lastLumiPrescaleIndex_(0)
    , lastLumiUsingEol_(0)
    , lsTimedOut_(0)
    , lsToBeRecovered_(0)
    , is_(0)
    , mis_(0)
    , thisEventLSid(0)
    , goToStopping(false)
    , immediateStop(false)
    , forkInfo_(nullptr)
    , runFork_(false)
    , beginRunTiming_(false)
    , bunchCrossing_(EventAuxiliary::invalidBunchXing)
    , orbitNumber_(EventAuxiliary::invalidBunchXing)
    , evttype_(EventAuxiliary::Undefined)
    , eventID_(0,0,0)
    , fedCollection_(nullptr)
  {
    count = 0;
    pthread_mutex_init(&mutex_,0);
    pthread_mutex_init(&signal_lock_,0);
    pthread_cond_init(&cond_,0);

    std::cout << " DaqSourceFake will use fake ls id ? " << fakeLSid_ << std::endl; 
    if(fakeLSid_) std::cout << " DaqSourceFake set to make a new ls every " << lumiSegmentSizeInEvents_ << " events " << std::endl;
    if(fakeLSid_) std::cout << " DaqSourceFake set to make a new ls every " << lumiSegmentSizeInSeconds_ << " seconds " << std::endl;

    setTimestamp(Timestamp::beginOfTime());
    
    // Instantiate the requested data source
    std::string reader = pset.getUntrackedParameter<std::string>("readerPluginName");
    
    try{
      reader_=
        DaqReaderPluginFactory::get()->create(reader,
                                            pset.getUntrackedParameter<ParameterSet>("readerPset"));
      reader_->setRunNumber(runNumber_);
    }
    catch(edm::Exception &e) {
      if(e.category() == "Configuration" && reader_ == 0) {
        reader_ = DaqReaderPluginFactoryU::get()->create(reader);
        if(reader_ == 0) throw;
        else reader_->setRunNumber(runNumber_);
      }
      else {
        throw;
      }
    }

    // Initialize LS timer if requested
    if(lumiSegmentSizeInSeconds_ != 0) gettimeofday(&startOfLastLumi,0);

    // Initialize metadata, and save the process history ID for use every event.
    phid_ = daqProvenanceHelper_.daqInit(productRegistryUpdate());

  }
  
DaqSource::~DaqSource ( ) [virtual]

Definition at line 133 of file DaqSource.cc.


Member Function Documentation

void DaqSource::closeBackDoor ( ) [private, virtual]

Reimplemented from evf::ModuleWeb.

Definition at line 693 of file DaqSource.cc.

References cond_, count, lsTimedOut_, mutex_, and signal_lock_.

  {
    count--;
    pthread_cond_signal(&cond_);
    pthread_mutex_unlock(&mutex_);
    pthread_mutex_lock(&signal_lock_);
    if (lsTimedOut_ != 0)
      lsTimedOut_->value_ = false; 
  }

void DaqSource::defaultWebPage ( xgi::Input *  in,
xgi::Output *  out 
) [private, virtual]

Reimplemented from evf::ModuleWeb.

Definition at line 718 of file DaqSource.cc.

References alignCSCRings::e, exception, edm::first(), goToStopping, LaserDQM_cfg::input, evf::ModuleWeb::moduleName_, getHLTPrescaleColumns::path, pos, o2o::query, edm::second(), and AlCaHLTBitMon_QueryRunRegistry::string.

  {
      std::string path;
      std::string urn;
      std::string mname;
      std::string query;
      std::string original_referrer_;
      try 
        {
          cgicc::Cgicc cgi(in);
          if ( xgi::Utils::hasFormElement(cgi,"gotostopping") )
            {
              goToStopping=true;
            }
          if ( xgi::Utils::hasFormElement(cgi,"module") )
            mname = xgi::Utils::getFormElement(cgi, "module")->getValue();
          cgicc::CgiEnvironment cgie(in);
          if(original_referrer_ == "")
            original_referrer_ = cgie.getReferrer();
          path = cgie.getPathInfo();
          query = cgie.getQueryString();
        }
      catch (const std::exception & e) 
        {
          // don't care if it did not work
        }

      using std::endl;
      *out << "<html>"                                                   << endl;
      *out << "<head>"                                                   << endl;


      *out << "<STYLE type=\"text/css\"> #T1 {border-width: 2px; border: solid blue; text-align: center} </STYLE> "                                      << endl; 
      *out << "<link type=\"text/css\" rel=\"stylesheet\"";
      *out << " href=\"/" <<  urn
           << "/styles.css\"/>"                   << endl;

      *out << "<title>" << moduleName_
           << " MAIN</title>"                                            << endl;

      *out << "</head>"                                                  << endl;
      *out << "<body onload=\"loadXMLDoc()\">"                           << endl;
      *out << "<table border=\"0\" width=\"100%\">"                      << endl;
      *out << "<tr>"                                                     << endl;
      *out << "  <td align=\"left\">"                                    << endl;
      *out << "    <img"                                                 << endl;
      *out << "     align=\"middle\""                                    << endl;
      *out << "     src=\"/evf/images/bugicon.jpg\""                     << endl;
      *out << "     alt=\"main\""                                        << endl;
      *out << "     width=\"90\""                                        << endl;
      *out << "     height=\"64\""                                       << endl;
      *out << "     border=\"\"/>"                                       << endl;
      *out << "    <b>"                                                  << endl;
      *out <<             moduleName_                                    << endl;
      *out << "    </b>"                                                 << endl;
      *out << "  </td>"                                                  << endl;
      *out << "  <td width=\"32\">"                                      << endl;
      *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
      *out << "      <img"                                               << endl;
      *out << "       align=\"middle\""                                  << endl;
      *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << endl;
      *out << "       alt=\"HyperDAQ\""                                  << endl;
      *out << "       width=\"32\""                                      << endl;
      *out << "       height=\"32\""                                     << endl;
      *out << "       border=\"\"/>"                                     << endl;
      *out << "    </a>"                                                 << endl;
      *out << "  </td>"                                                  << endl;
      *out << "  <td width=\"32\">"                                      << endl;
      *out << "  </td>"                                                  << endl;
      *out << "  <td width=\"32\">"                                      << endl;
      *out << "    <a href=\"" << original_referrer_  << "\">"           << endl;
      *out << "      <img"                                               << endl;
      *out << "       align=\"middle\""                                  << endl;
      *out << "       src=\"/evf/images/spoticon.jpg\""                  << endl;
      *out << "       alt=\"main\""                                      << endl;
      *out << "       width=\"32\""                                      << endl;
      *out << "       height=\"32\""                                     << endl;
      *out << "       border=\"\"/>"                                     << endl;
      *out << "    </a>"                                                 << endl;
      *out << "  </td>"                                                  << endl;
      *out << "</tr>"                                                    << endl;
      *out << "</table>"                                                 << endl;

      *out << "<hr/>"                                                    << endl;
  
      *out << cgicc::form().set("method","GET").set("action", path ) 
           << std::endl;
      boost::char_separator<char> sep("&");
      boost::tokenizer<boost::char_separator<char> > tokens(query, sep);
      for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
           tok_iter != tokens.end(); ++tok_iter){
        size_t pos = (*tok_iter).find_first_of("=");
        if(pos != std::string::npos){
          std::string first  = (*tok_iter).substr(0    ,                        pos);
          std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
          *out << cgicc::input().set("type","hidden").set("name",first).set("value", second) 
               << std::endl;
        }
      }

      *out << cgicc::input().set("type","hidden").set("name","gotostopping").set("value","true")
           << std::endl;
      *out << cgicc::input().set("type","submit").set("value","Go To Stopping")              << std::endl;
      *out << cgicc::form()                                                << std::endl;  

      *out << "</body>"                                                  << endl;
      *out << "</html>"                                                  << endl;
  }
}
int DaqSource::doMyBeginRun ( ) [private]

Definition at line 150 of file DaqSource.cc.

References beginRunTiming_, evf::moduleweb::ForkInfoObj::control_sem_, gather_cfg::cout, evf::moduleweb::ForkInfoObj::forkHandler, forkInfo_, evf::moduleweb::ForkInfoObj::forkParams, evf::moduleweb::ForkInfoObj::fuAddr, immediateStop, evf::moduleweb::ForkInfoObj::lock(), edm::InputSource::newRun(), NULL, evf::moduleweb::ForkInfoObj::receivedStop_, evf::moduleweb::ForkInfoObj::stopCondition, tvStat_, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by getNextItemType().

                   {
      while (!immediateStop) {
        //queue new run to Framework (causes EP beginRun to be executed)
        if (newRun()) {
          beginRunTiming_=true;
          gettimeofday(&tvStat_, NULL);
          return 2;
        }
        //measure time in fwk beginRun
        if (beginRunTiming_) {
          timeval tsTmp;
          gettimeofday(&tsTmp,NULL);
          long tusecs = (tsTmp.tv_sec-tvStat_.tv_sec)*1000000 + tsTmp.tv_usec - tvStat_.tv_usec;
          double tsecs = ((double)(tusecs/10000))/100.;
          std::cout << "DaqSource: FWK beginRun elapsed time: " << tsecs << " seconds in master EP"<< std::endl;
          edm::LogInfo("DaqSource") << "FWK beginRun elapsed time: " << tsecs << " seconds in master EP";
          beginRunTiming_=false;
          usleep(10000);//short sleep before fork
        }
        //first or new run init
        if (forkInfo_->forkParams.isMaster==-1) {
          forkInfo_->lock();//keeping it locked during init!
          forkInfo_->forkHandler(forkInfo_->fuAddr); //fork all slaves
        }
        if (forkInfo_->forkParams.isMaster==-1) {
          forkInfo_->unlock();
          std::cout << "ERROR (DaqSource): not notified to be either in  master or slave process after fork" << std::endl;
          return -2;
        }

        //slave process after fork: exit all this
        if (forkInfo_->forkParams.isMaster==0) {
          forkInfo_->unlock();
          return 1;
        }

        //master process after fork:
        if (forkInfo_->forkParams.isMaster==1) {
            forkInfo_->unlock();
            int slotToRestart=-1;
            sem_wait(forkInfo_->control_sem_);
            forkInfo_->lock();

            //got unblocked due to next run
            if (forkInfo_->forkParams.isMaster==-1) {
                    forkInfo_->unlock();
                    continue; // check here for newRun()?
            }
            //check if asked to stop
            immediateStop=forkInfo_->stopCondition;
            if (immediateStop) {
              forkInfo_->receivedStop_=true;
              break;
            }
            
            //check if asked to restart
            slotToRestart = forkInfo_->forkParams.slotId;

          if (slotToRestart==-1 && forkInfo_->forkParams.restart==0) {
            //this will deal with spurious semaphore signals when slave is killed
            forkInfo_->unlock();
            continue;
          }
          //restart single slave
          forkInfo_->forkHandler(forkInfo_->fuAddr);
        }
      }
      //loop exit
      forkInfo_->unlock();
      return 0;
    }
    return -1; //no forkInfo_
  }

InputSource::ItemType DaqSource::getNextItemType ( ) [private, virtual]

Implements edm::InputSource.

Definition at line 229 of file DaqSource.cc.

References doMyBeginRun(), and runFork_.

                  {
      runFork_=false;
      int queueNext = doMyBeginRun();
      //check if new run (requires returning IsRun once)
      if (queueNext == 2) runFork_=true;
    }

    //get initial time before beginRun (used with old forking)
    if (!forkInfo_ && newRun()) {
      beginRunTiming_=true;
      gettimeofday(&tvStat_, NULL);
    }

    if (immediateStop) return IsStop;

    // --------------
    if(goToStopping){noMoreEvents_ = true; goToStopping=false;}
    if (noMoreEvents_) {
      pthread_mutex_lock(&mutex_);
      pthread_cond_signal(&cond_);
      pthread_mutex_unlock(&mutex_);
      return IsStop;
    }
    if (newRun()) {
      return IsRun;
    }

    //calculate and print the beginRun the timing
    if (beginRunTiming_) {
      timeval tsTmp;
      gettimeofday(&tsTmp,NULL);
      long tusecs = (tsTmp.tv_sec-tvStat_.tv_sec)*1000000 + tsTmp.tv_usec - tvStat_.tv_usec;
      double tsecs = ((double)(tusecs/10000))/100.;
      std::cout << "DaqSource (slave pid "<< getpid() << " ): FWK beginRun elapsed time: " 
                << tsecs << " seconds "<< std::endl;
      edm::LogInfo("DaqSource") << "DaqSource (slave pid "<< getpid() << " ): FWK beginRun elapsed time: " 
                << tsecs << " seconds ";
      beginRunTiming_=false;
    }

    if (newLumi() && luminosityBlockAuxiliary()) {
      //      std::cout << "newLumi & lumiblock valid " << std::endl;
      return IsLumi;
    }
    if (alignLsToLast_) { //here we are recovering from a gap in Ls number so an event may already be cached but 
      // we hold onto it until we have issued all the necessary endLumi/beginLumi
      //       std::cout << getpid() << "alignLsToLast was set and ls number is " 
      //                << luminosityBlockNumber_ << " before signaling" << std::endl;
      signalWaitingThreadAndBlock();
      luminosityBlockNumber_++;
      //       std::cout << getpid() << "alignLsToLast signaled and incremented " 
      //                << luminosityBlockNumber_ << " eventcached " 
      //                << eventCached() << std::endl;
      setNewLumi();
      if (lumiSectionIndex_!=0)
        lumiSectionIndex_->value_ = luminosityBlockNumber_;
      resetLuminosityBlockAuxiliary();
      if(luminosityBlockNumber_ == thisEventLSid+1) 
      {
        alignLsToLast_ = false;
      }
      if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != luminosityBlockNumber_) {
        setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
              runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
        luminosityBlockAuxiliary()->setProcessHistoryID(phid_);

        //      std::cout << "nextItemType: dealt with new lumi block principal, retval is " << retval << std::endl;
      }
      return IsLumi;
    }
    if (eventCached()) {
      //      std::cout << "read event already cached " << std::endl;
      return IsEvent;
    }

    if(reader_ == 0) {
      throw edm::Exception(errors::LogicError)
          << "DaqSource is used without a reader. Check your configuration !";
    }
    TimeValue_t time = 0LL;
    timeval stv;
    gettimeofday(&stv,0);
    time = stv.tv_sec;
    time = (time << 32) + stv.tv_usec;
    Timestamp tstamp(time);

    bunchCrossing_ = EventAuxiliary::invalidBunchXing;
    orbitNumber_   = EventAuxiliary::invalidBunchXing;
    evttype_ = EventAuxiliary::Undefined;
    
    // pass a null pointer to fillRawData()!
    fedCollection_ = nullptr ;

  
    // let reader_ fill the fedCollection 
    int retval = reader_->fillRawData(eventID_, tstamp, fedCollection_);
    if(retval==0) {
      // fillRawData() failed, clean up the fedCollection in case it was allocated!
      if (nullptr != fedCollection_) delete fedCollection_;
      noMoreEvents_ = true;
      pthread_mutex_lock(&mutex_);
      pthread_cond_signal(&cond_);
      pthread_mutex_unlock(&mutex_);
      return IsStop;
    }
    else if(retval<0)
      {
 
        unsigned int nextLsFromSignal = (-1)*retval+1;
//      std::cout << getpid() << "::got end-of-lumi for " << (-1)*retval
//                << " was " << luminosityBlockNumber_ << std::endl;
        if(luminosityBlockNumber_ == (nextLsFromSignal-1) )
          {
            if (lastLumiUsingEol_ != 0)
              lastLumiUsingEol_->value_ = nextLsFromSignal;
            if(lsToBeRecovered_ != 0 && lsToBeRecovered_->value_){
//            std::cout << getpid() << "eol::recover ls::for " << (-1)*retval << std::endl;
              signalWaitingThreadAndBlock();
              luminosityBlockNumber_++;
              setNewLumi();
              if (lumiSectionIndex_ != 0)
                lumiSectionIndex_->value_ = luminosityBlockNumber_;
              resetLuminosityBlockAuxiliary();
              thisEventLSid = nextLsFromSignal - 1;
              if(luminosityBlockNumber_ != thisEventLSid+1) 
                alignLsToLast_ = true;
              //              std::cout << getpid() << "eol::::alignLsToLast_ " << alignLsToLast_ << std::endl;
            }
            else{
              //              std::cout << getpid() << "eol::realign ls::for " << (-1)*retval << std::endl;
              luminosityBlockNumber_ = nextLsFromSignal;
              setNewLumi();
              if (lumiSectionIndex_ != 0)
                lumiSectionIndex_->value_ = luminosityBlockNumber_;
              resetLuminosityBlockAuxiliary();
            }
          }
        else {
          if(nextLsFromSignal >(luminosityBlockNumber_+100) ) {
            edm::LogError("DaqSource") << "Got EOL event with value " << retval 
                                       << " nextLS would be " << nextLsFromSignal 
                                       << " while we expected " << luminosityBlockNumber_+1 << " - disregarding... ";
          }
          if (nextLsFromSignal > luminosityBlockNumber_+2) //recover on delta > 2
          {
              if (lastLumiUsingEol_ != 0)
                lastLumiUsingEol_->value_ = nextLsFromSignal;
              thisEventLSid=nextLsFromSignal-1;//set new LS
              signalWaitingThreadAndBlock();
              luminosityBlockNumber_++;
              setNewLumi();
              if (lumiSectionIndex_ != 0)
              lumiSectionIndex_->value_ = luminosityBlockNumber_;
              alignLsToLast_ = true;

              //set new lumi block
              resetLuminosityBlockAuxiliary();
              setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
                runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
              luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
          }

        }
        //      else
        //        std::cout << getpid() << "::skipping end-of-lumi for " << (-1)*retval << std::endl;
      }
    else
      {
        if (eventID_.event() == 0) {
          throw edm::Exception(errors::LogicError)
            << "The reader used with DaqSource has returned an invalid (zero) event number!\n"
            << "Event numbers must begin at 1, not 0.";
        }
        EventSourceSentry(*this);
        setTimestamp(tstamp);
    
        unsigned char *gtpFedAddr = fedCollection_->FEDData(daqsource::gtpEvmId_).size()!=0 ? fedCollection_->FEDData(daqsource::gtpEvmId_).data() : 0;
        uint32_t gtpsize = 0;
        if(gtpFedAddr !=0) gtpsize = fedCollection_->FEDData(daqsource::gtpEvmId_).size();
        unsigned char *gtpeFedAddr = fedCollection_->FEDData(daqsource::gtpeId_).size()!=0 ? fedCollection_->FEDData(daqsource::gtpeId_).data() : 0; 

        unsigned int nextFakeLs = 0;
        eventCounter_++;
        if (fakeLSid_)
            evttype_ =  edm::EventAuxiliary::PhysicsTrigger;
        bool fakeLSFromEventCount = fakeLSid_ && (lumiSegmentSizeInEvents_ != 0);
        bool fakeLSFromTimer = fakeLSid_ && (lumiSegmentSizeInSeconds_ != 0);
        if(fakeLSFromEventCount && luminosityBlockNumber_ != 
           (nextFakeLs = useEventCounter_ ? ((eventCounter_-1)/lumiSegmentSizeInEvents_ + 1) :
            ((eventID_.event() - 1)/lumiSegmentSizeInEvents_ + 1))) {
            if (prescaleSetIndex_ != 0 && lastLumiPrescaleIndex_!= 0) {
              lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
              prescaleSetIndex_->value_ = 0; // since we do not know better but we want to be able to run
            }
          if(luminosityBlockNumber_ == nextFakeLs-1)
            signalWaitingThreadAndBlock();
          luminosityBlockNumber_ = nextFakeLs;
          thisEventLSid = nextFakeLs-1;
          setNewLumi();
          if (lumiSectionIndex_ != 0)
            lumiSectionIndex_->value_ = luminosityBlockNumber_;
          resetLuminosityBlockAuxiliary();
          if(keepUsingPsidFromTrigger_ && 
             gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
            if (prescaleSetIndex_ != 0)
              prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
          }       
        }
        else if(fakeLSFromTimer){
          struct timeval tv;
          gettimeofday(&tv,0);
          unsigned int elapsed_time = tv.tv_sec - startOfLastLumi.tv_sec;
          //      std::cout << "daqsource elapsed time " << elapsed_time << std::endl;
          if(luminosityBlockNumber_ != 
             (nextFakeLs = elapsed_time/lumiSegmentSizeInSeconds_ + 1)) {
            if(prescaleSetIndex_ != 0){
              lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
              prescaleSetIndex_->value_ = 0; // since we do not know better but we want to be able to run
            }
            if(luminosityBlockNumber_ == (nextFakeLs-1))
              signalWaitingThreadAndBlock();
            luminosityBlockNumber_ = nextFakeLs;
            thisEventLSid = nextFakeLs-1;
            setNewLumi();
            if(lumiSectionIndex_!=0)
              lumiSectionIndex_->value_ = luminosityBlockNumber_;
            resetLuminosityBlockAuxiliary();
            if(keepUsingPsidFromTrigger_ && 
               gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
              if(prescaleSetIndex_ != 0)
                prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
            }     
          }
        } 
        else if(!fakeLSid_){ 

          if(gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
            if (lastLumiPrescaleIndex_ != 0 && prescaleSetIndex_ != 0)
              lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
            thisEventLSid = evf::evtn::getlbn(gtpFedAddr);
            if (prescaleSetIndex_ != 0)
              prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
            evttype_ =  edm::EventAuxiliary::ExperimentType(evf::evtn::getevtyp(gtpFedAddr));
            if(luminosityBlockNumber_ > (thisEventLSid + 1))
            {
              //late event,throw fwk exception
              std::ostringstream excptmsg;
              excptmsg << "DaqSource::event with late LS (" << thisEventLSid + 1 << ")received.";
              throw edm::Exception(errors::LogicError,excptmsg.str());
            }
            if(luminosityBlockNumber_ != (thisEventLSid + 1)){
              // we got here in a running process and some Ls might have been skipped so set the flag, 
              // increase by one, check and if appropriate set the flag then continue
              if(lsToBeRecovered_!=0 && lsToBeRecovered_->value_){
                //              std::cout << getpid() << "eve::recover ls::for " << thisEventLSid << std::endl;
                signalWaitingThreadAndBlock();
                luminosityBlockNumber_++;
                setNewLumi();
                if (lumiSectionIndex_ !=0 )
                  lumiSectionIndex_->value_ = luminosityBlockNumber_;
                resetLuminosityBlockAuxiliary();
                if(luminosityBlockNumber_ != thisEventLSid+1) alignLsToLast_ = true;
                //              std::cout << getpid() << "eve::::alignLsToLast_ " << alignLsToLast_ << std::endl;
              }
              else{ // we got here because the process was restarted. just realign the ls id and proceed with this event
                //              std::cout << getpid() << "eve::realign ls::for " << thisEventLSid << std::endl;
                luminosityBlockNumber_ = thisEventLSid + 1;
                setNewLumi();
                if (lumiSectionIndex_ !=0)
                  lumiSectionIndex_->value_ = luminosityBlockNumber_;
                resetLuminosityBlockAuxiliary();
                if (lsToBeRecovered_ !=0)
                  lsToBeRecovered_->value_ = true;
              }
            }
          }
          else if(gtpeFedAddr!=0 && evf::evtn::gtpe_board_sense(gtpeFedAddr)){
            if (lastLumiPrescaleIndex_ != 0 && prescaleSetIndex_ != 0)
              lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
            thisEventLSid = evf::evtn::gtpe_getlbn(gtpeFedAddr);
            if (prescaleSetIndex_ != 0)
              prescaleSetIndex_->value_ = 0; //waiting to get a PS index from gtpe
            evttype_ =  edm::EventAuxiliary::PhysicsTrigger; 
            if(luminosityBlockNumber_ != (thisEventLSid + 1)){
              if(luminosityBlockNumber_ == thisEventLSid)
                signalWaitingThreadAndBlock();
              luminosityBlockNumber_ = thisEventLSid + 1;
              setNewLumi();
              if (lumiSectionIndex_ !=0 )
                lumiSectionIndex_->value_ = luminosityBlockNumber_;
              resetLuminosityBlockAuxiliary();
            }
          }
        }
        if(gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
          bunchCrossing_ =  int(evf::evtn::getfdlbx(gtpFedAddr));
          orbitNumber_ =  int(evf::evtn::getorbit(gtpFedAddr));
          TimeValue_t time = evf::evtn::getgpshigh(gtpFedAddr);
          time = (time << 32) + evf::evtn::getgpslow(gtpFedAddr);
          Timestamp tstamp(time);
          setTimestamp(tstamp);      
        }
        else if(gtpeFedAddr!=0 && evf::evtn::gtpe_board_sense(gtpeFedAddr)){
          bunchCrossing_ =  int(evf::evtn::gtpe_getbx(gtpeFedAddr));
          orbitNumber_ =  int(evf::evtn::gtpe_getorbit(gtpeFedAddr));
        }
      }    
          
    //    std::cout << "lumiblockaux = " << luminosityBlockAuxiliary() << std::endl;
    // If there is no luminosity block principal, make one.
    if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != luminosityBlockNumber_) {
      setNewLumi();
      setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
        runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
      luminosityBlockAuxiliary()->setProcessHistoryID(phid_);

      //      std::cout << "nextItemType: dealt with new lumi block principal, retval is " << retval << std::endl;
    }
    //    std::cout << "here retval = " << retval << std::endl;
    if(retval<0){
      //      std::cout << getpid() << " returning from getnextitem because retval < 0 - IsLumi "
      //                << IsLumi << std::endl;
      if(newLumi()) return IsLumi; else return getNextItemType();
    }
    if (newLumi()) {
      return IsLumi;
    }
    setEventCached();
    return IsEvent;
  }

void DaqSource::openBackDoor ( unsigned int  timeout_sec,
bool *  running 
) [private, virtual]

Reimplemented from evf::ModuleWeb.

Definition at line 671 of file DaqSource.cc.

References CLOCK_REALTIME, cond_, count, lsTimedOut_, mutex_, NULL, and signal_lock_.

  {
    count++;
    if(count==2) throw;
    pthread_mutex_lock(&mutex_);
    if (running) *running=true;
    pthread_mutex_unlock(&signal_lock_);
    timespec ts;
#if _POSIX_TIMERS > 0
    clock_gettime(CLOCK_REALTIME, &ts);
#else
    struct timeval tv; 
    gettimeofday(&tv, NULL);
    ts.tv_sec = tv.tv_sec + 0;
    ts.tv_nsec = 0;
#endif
    ts.tv_sec += timeout_sec;

    int rc = pthread_cond_timedwait(&cond_, &mutex_, &ts);
    if(rc == ETIMEDOUT && lsTimedOut_ != 0) lsTimedOut_->value_ = true; 
  }
  
void DaqSource::publish ( xdata::InfoSpace *  is) [private, virtual]

Implements evf::ModuleWeb.

Definition at line 656 of file DaqSource.cc.

References is_, lastLumiPrescaleIndex_, lastLumiUsingEol_, lsTimedOut_, lsToBeRecovered_, lumiSectionIndex_, and prescaleSetIndex_.

  {
    is_ = is;
    lumiSectionIndex_      = (xdata::UnsignedInteger32*)is_->find("lumiSectionIndex");
    prescaleSetIndex_      = (xdata::UnsignedInteger32*)is_->find("prescaleSetIndex");
    lastLumiPrescaleIndex_ = (xdata::UnsignedInteger32*)is_->find("lastLumiPrescaleIndex");
    lastLumiUsingEol_ = (xdata::UnsignedInteger32*)is_->find("lastLumiUsingEol");
    lsTimedOut_            = (xdata::Boolean*)is_->find("lsTimedOut");
    lsToBeRecovered_       = (xdata::Boolean*)is_->find("lsToBeRecovered");
  }
  void DaqSource::publishToXmas(xdata::InfoSpace *is)
void DaqSource::publishForkInfo ( evf::moduleweb::ForkInfoObj forkInfoObj) [private, virtual]

Reimplemented from evf::ModuleWeb.

Definition at line 137 of file DaqSource.cc.

void DaqSource::publishToXmas ( xdata::InfoSpace *  is) [private, virtual]

Reimplemented from evf::ModuleWeb.

Definition at line 666 of file DaqSource.cc.

References mis_.

  {
    mis_ = is;
  }

EventPrincipal * DaqSource::readEvent_ ( EventPrincipal eventPrincipal) [private, virtual]

Implements edm::InputSource.

Definition at line 594 of file DaqSource.cc.

EventPrincipal * DaqSource::readIt ( EventID const &  eventID) [private, virtual]

Definition at line 643 of file DaqSource.cc.

boost::shared_ptr< LuminosityBlockAuxiliary > DaqSource::readLuminosityBlockAuxiliary_ ( ) [private, virtual]

Implements edm::InputSource.

Definition at line 584 of file DaqSource.cc.

boost::shared_ptr< RunAuxiliary > DaqSource::readRunAuxiliary_ ( ) [private, virtual]

Implements edm::InputSource.

Definition at line 574 of file DaqSource.cc.

void DaqSource::setLumi ( LuminosityBlockNumber_t  lb) [private, virtual]

Reimplemented from edm::InputSource.

Definition at line 636 of file DaqSource.cc.

void DaqSource::setRun ( RunNumber_t  r) [private, virtual]

Reimplemented from edm::InputSource.

Definition at line 563 of file DaqSource.cc.

void DaqSource::signalWaitingThreadAndBlock ( ) [private, virtual]

Definition at line 703 of file DaqSource.cc.

References cond_, is_, mutex_, and signal_lock_.

  {
    //check if we are running offline, in which case return immediately
    if(is_ == 0) return;
    pthread_mutex_lock(&signal_lock_);
    pthread_mutex_lock(&mutex_);
    pthread_mutex_unlock(&signal_lock_);
    //    std::cout << getpid() << " DS::signal from evloop " << std::endl;
    pthread_cond_signal(&cond_);
    //    std::cout << getpid() << " DS::go to wait for scalers wl " << std::endl;
    pthread_cond_wait(&cond_, &mutex_);
    pthread_mutex_unlock(&mutex_);
    ::usleep(1000);//allow other thread to lock
  }  

void DaqSource::skip ( int  offset) [private, virtual]

Reimplemented from edm::InputSource.

Definition at line 650 of file DaqSource.cc.


Member Data Documentation

Definition at line 83 of file DaqSource.h.

Definition at line 104 of file DaqSource.h.

Referenced by doMyBeginRun().

Definition at line 105 of file DaqSource.h.

pthread_cond_t edm::DaqSource::cond_ [private]

Definition at line 87 of file DaqSource.h.

Referenced by closeBackDoor(), DaqSource(), openBackDoor(), and signalWaitingThreadAndBlock().

int edm::DaqSource::count [private]

Definition at line 96 of file DaqSource.h.

Referenced by closeBackDoor(), DaqSource(), and openBackDoor().

Definition at line 80 of file DaqSource.h.

Referenced by DaqSource().

unsigned int edm::DaqSource::eventCounter_ [private]

Definition at line 74 of file DaqSource.h.

Definition at line 108 of file DaqSource.h.

Definition at line 107 of file DaqSource.h.

bool edm::DaqSource::fakeLSid_ [private]

Definition at line 76 of file DaqSource.h.

Referenced by DaqSource().

Definition at line 109 of file DaqSource.h.

Definition at line 101 of file DaqSource.h.

Referenced by doMyBeginRun().

Definition at line 98 of file DaqSource.h.

Referenced by defaultWebPage().

Definition at line 100 of file DaqSource.h.

Referenced by doMyBeginRun().

xdata::InfoSpace* edm::DaqSource::is_ [private]

Definition at line 94 of file DaqSource.h.

Referenced by publish(), and signalWaitingThreadAndBlock().

Definition at line 75 of file DaqSource.h.

xdata::UnsignedInteger32* edm::DaqSource::lastLumiPrescaleIndex_ [private]

Definition at line 90 of file DaqSource.h.

Referenced by publish().

xdata::UnsignedInteger32* edm::DaqSource::lastLumiUsingEol_ [private]

Definition at line 91 of file DaqSource.h.

Referenced by publish().

xdata::Boolean* edm::DaqSource::lsTimedOut_ [private]

Definition at line 92 of file DaqSource.h.

Referenced by closeBackDoor(), openBackDoor(), and publish().

xdata::Boolean* edm::DaqSource::lsToBeRecovered_ [private]

Definition at line 93 of file DaqSource.h.

Referenced by publish().

Definition at line 79 of file DaqSource.h.

xdata::UnsignedInteger32* edm::DaqSource::lumiSectionIndex_ [private]

Definition at line 88 of file DaqSource.h.

Referenced by publish().

Definition at line 70 of file DaqSource.h.

Referenced by DaqSource().

Definition at line 71 of file DaqSource.h.

Referenced by DaqSource().

xdata::InfoSpace* edm::DaqSource::mis_ [private]

Definition at line 95 of file DaqSource.h.

Referenced by publishToXmas().

pthread_mutex_t edm::DaqSource::mutex_ [private]

Definition at line 85 of file DaqSource.h.

Referenced by closeBackDoor(), DaqSource(), openBackDoor(), and signalWaitingThreadAndBlock().

Definition at line 82 of file DaqSource.h.

Definition at line 106 of file DaqSource.h.

Definition at line 81 of file DaqSource.h.

Referenced by DaqSource().

xdata::UnsignedInteger32* edm::DaqSource::prescaleSetIndex_ [private]

Definition at line 89 of file DaqSource.h.

Referenced by publish().

Definition at line 69 of file DaqSource.h.

Referenced by DaqSource().

bool edm::DaqSource::runFork_ [private]

Definition at line 102 of file DaqSource.h.

Referenced by getNextItemType().

Definition at line 78 of file DaqSource.h.

Referenced by DaqSource().

pthread_mutex_t edm::DaqSource::signal_lock_ [private]

Definition at line 86 of file DaqSource.h.

Referenced by closeBackDoor(), DaqSource(), openBackDoor(), and signalWaitingThreadAndBlock().

struct timeval edm::DaqSource::startOfLastLumi [private]

Definition at line 99 of file DaqSource.h.

Referenced by DaqSource().

unsigned int edm::DaqSource::thisEventLSid [private]

Definition at line 97 of file DaqSource.h.

timeval edm::DaqSource::tvStat_ [private]

Definition at line 103 of file DaqSource.h.

Referenced by doMyBeginRun().

Definition at line 72 of file DaqSource.h.

bool edm::DaqSource::useTimer_ [private]

Definition at line 73 of file DaqSource.h.