CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_3/src/IORawData/DaqSource/plugins/DaqSource.cc

Go to the documentation of this file.
00001 
00008 #include "DaqSource.h"
00009 
00010 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00011 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00012 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00013 
00014 #include "IORawData/DaqSource/interface/DaqBaseReader.h"
00015 #include "IORawData/DaqSource/interface/DaqReaderPluginFactory.h"
00016 
00017 #include "DataFormats/Provenance/interface/Timestamp.h" 
00018 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00019 #include "FWCore/Framework/interface/EventPrincipal.h"
00020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00021 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00022 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00023 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00024 #include "DataFormats/Provenance/interface/EventID.h"
00025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00026 
00027 #include <string>
00028 #include <iostream>
00029 #include <time.h>
00030 #include <sys/time.h>
00031 #include <sys/types.h>
00032 
00033 #include "xgi/Method.h"
00034 #include "xgi/Utils.h"
00035 
00036 #include "cgicc/Cgicc.h"
00037 #include "cgicc/FormEntry.h"
00038 #include "cgicc/HTMLClasses.h"
00039 
00040 #include "boost/tokenizer.hpp"
00041 
00042 
00044 // construction/destruction
00046 
00047 
00048 
00049 namespace edm {
00050  namespace daqsource{
00051   static unsigned int gtpEvmId_ =  FEDNumbering::MINTriggerGTPFEDID;
00052   static unsigned int gtpeId_ =  FEDNumbering::MINTriggerEGTPFEDID;
00053  }
00054 
00055   //______________________________________________________________________________
00056   DaqSource::DaqSource(const ParameterSet& pset, 
00057                      const InputSourceDescription& desc) 
00058     : InputSource(pset,desc)
00059     , evf::ModuleWeb("DaqSource")
00060     , reader_(0)
00061     , lumiSegmentSizeInEvents_(pset.getUntrackedParameter<unsigned int>("evtsPerLS",0))
00062     , useEventCounter_(pset.getUntrackedParameter<bool>("useEventCounter",false))
00063     , eventCounter_(0)
00064     , keepUsingPsidFromTrigger_(pset.getUntrackedParameter<bool>("keepUsingPsidFromTrigger",false))
00065     , fakeLSid_(lumiSegmentSizeInEvents_ != 0)
00066     , runNumber_(RunID::firstValidRun().run())
00067     , luminosityBlockNumber_(LuminosityBlockID::firstValidLuminosityBlock().luminosityBlock())
00068     , daqProvenanceHelper_(TypeID(typeid(FEDRawDataCollection)))
00069     , noMoreEvents_(false)
00070     , newRun_(true)
00071     , newLumi_(true)
00072     , eventCached_(false)
00073     , alignLsToLast_(false)
00074     , is_(0)
00075     , mis_(0)
00076     , thisEventLSid(0)
00077     , goToStopping(false)
00078     , immediateStop(false)
00079     , forkInfo_(nullptr)
00080     , runFork_(false)
00081     , beginRunTiming_(false)
00082   {
00083     count = 0;
00084     pthread_mutex_init(&mutex_,0);
00085     pthread_mutex_init(&signal_lock_,0);
00086     pthread_cond_init(&cond_,0);
00087 
00088 
00089     setTimestamp(Timestamp::beginOfTime());
00090     
00091     // Instantiate the requested data source
00092     std::string reader = pset.getUntrackedParameter<std::string>("readerPluginName");
00093     
00094     try{
00095       reader_=
00096         DaqReaderPluginFactory::get()->create(reader,
00097                                             pset.getUntrackedParameter<ParameterSet>("readerPset"));
00098       reader_->setRunNumber(runNumber_);
00099     }
00100     catch(edm::Exception &e) {
00101       if(e.category() == "Configuration" && reader_ == 0) {
00102         reader_ = DaqReaderPluginFactoryU::get()->create(reader);
00103         if(reader_ == 0) throw;
00104         else reader_->setRunNumber(runNumber_);
00105       }
00106       else {
00107         throw;
00108       }
00109     }
00110 
00111    // Initialize metadata, and save the process history ID for use every event.
00112    phid_ = daqProvenanceHelper_.daqInit(productRegistryUpdate());
00113 
00114   }
00115   
00116   //______________________________________________________________________________
00117   DaqSource::~DaqSource() {
00118     delete reader_;
00119   }
00120   
00121   void DaqSource::publishForkInfo(evf::moduleweb::ForkInfoObj * forkInfoObj) {
00122     forkInfo_ = forkInfoObj;
00123     runFork_=true;
00124     immediateStop=false;
00125     noMoreEvents_=false;
00126   }
00127   
00129   // implementation of member functions
00131 
00132 
00133   //______________________________________________________________________________
00134   int DaqSource::doMyBeginRun() {
          // Fork-based start-up step, run from getNextItemType() while runFork_
          // is set. It only does work when publishForkInfo() has supplied a
          // forkInfo_ object.
          //
          // Return values, as produced below:
          //    2  a new run is pending (newRun_); the caller re-arms runFork_
          //       so this function is entered again on its next call
          //    1  this process is a slave after the fork (isMaster == 0)
          //    0  the loop was left (stop requested, or immediateStop was
          //       already set on entry)
          //   -1  no forkInfo_ has been published
          //   -2  forkHandler() returned with isMaster still at -1
00135 
00136     if (forkInfo_) {
00137       while (!immediateStop) {
00138         //queue new run to Framework (causes EP beginRun to be executed)
00139         if (newRun_) {
00140           beginRunTiming_=true;
00141           gettimeofday(&tvStat_, NULL);
00142           return 2;
00143         }
00144         //measure time in fwk beginRun
00145         if (beginRunTiming_) {
00146           timeval tsTmp;
00147           gettimeofday(&tsTmp,NULL);
00148           long tusecs = (tsTmp.tv_sec-tvStat_.tv_sec)*1000000 + tsTmp.tv_usec - tvStat_.tv_usec;
                  // integer division first: the value is truncated to
                  // hundredths of a second before the conversion to double
00149           double tsecs = ((double)(tusecs/10000))/100.;
00150           std::cout << "DaqSource: FWK beginRun elapsed time: " << tsecs << " seconds in master EP"<< std::endl;
00151           edm::LogInfo("DaqSource") << "FWK beginRun elapsed time: " << tsecs << " seconds in master EP";
00152           beginRunTiming_=false;
00153           usleep(10000);//short sleep before fork
00154         }
00155         //first or new run init
                // isMaster == -1 means "not forked yet". The lock taken here is
                // deliberately kept across forkHandler(); each isMaster branch
                // below releases it. forkHandler() is presumably what updates
                // forkParams.isMaster -- confirm in ForkInfoObj.
00156         if (forkInfo_->forkParams.isMaster==-1) {
00157           forkInfo_->lock();//keeping it locked during init!
00158           forkInfo_->forkHandler(forkInfo_->fuAddr); //fork all slaves
00159         }
                // still -1 after forkHandler(): report the failure to the caller
00160         if (forkInfo_->forkParams.isMaster==-1) {
00161           forkInfo_->unlock();
00162           std::cout << "ERROR (DaqSource): not notified to be either in  master or slave process after fork" << std::endl;
00163           return -2;
00164         }
00165 
00166         //slave process after fork: exit all this
00167         if (forkInfo_->forkParams.isMaster==0) {
00168           forkInfo_->unlock();
00169           return 1;
00170         }
00171 
00172         //master process after fork:
00173         if (forkInfo_->forkParams.isMaster==1) {
00174             forkInfo_->unlock();
00175             int slotToRestart=-1;
                    // the master blocks here until control_sem_ is posted, then
                    // re-takes the lock before reading the shared fork state
00176             sem_wait(forkInfo_->control_sem_);
00177             forkInfo_->lock();
00178 
00179             //got unblocked due to next run
00180             if (forkInfo_->forkParams.isMaster==-1) {
00181                     forkInfo_->unlock();
00182                     continue; // check here for newRun_?
00183             }
00184             //check if asked to stop
00185             immediateStop=forkInfo_->stopCondition;
00186             if (immediateStop) {
00187               forkInfo_->receivedStop_=true;
                      // leaves the loop with the lock still held; it is released
                      // by the unlock() after the loop
00188               break;
00189             }
00190             
00191             //check if asked to restart
00192             slotToRestart = forkInfo_->forkParams.slotId;
00193 
00194           if (slotToRestart==-1 && forkInfo_->forkParams.restart==0) {
00195             //this will deal with spurious semaphore signals when slave is killed
00196             forkInfo_->unlock();
00197             continue;
00198           }
00199           //restart single slave
                  // called with the lock held; the next loop iteration releases
                  // it in whichever isMaster branch applies
00200           forkInfo_->forkHandler(forkInfo_->fuAddr);
00201         }
00202       }
00203       //loop exit
              // NOTE(review): if immediateStop is already true on entry the loop
              // body never runs and this unlock() has no matching lock() in
              // this function -- confirm that is harmless for ForkInfoObj.
00204       forkInfo_->unlock();
00205       return 0;
00206     }
00207     return -1; //no forkInfo_
00208   }
00209 
00210 
00211   //______________________________________________________________________________
00212   InputSource::ItemType 
00213   DaqSource::getNextItemType() {
          // Tells the framework what to process next: IsStop, IsRun, IsLumi or
          // IsEvent. When nothing is pending it asks reader_ for the next item,
          // updates the luminosity-section (LS) bookkeeping and, for a real
          // event, fills the cached event principal. newRun_, newLumi_,
          // alignLsToLast_ and eventCached_ carry pending state from one call
          // to the next.
          //
          // Throws edm::Exception(errors::LogicError) when no reader is
          // configured or when the reader reports event number 0.
00214     //    std::cout << getpid() << " enter getNextItemType " << std::endl;
          // One-shot fork step armed by publishForkInfo(); it is re-armed only
          // when doMyBeginRun() returns 2 (a new run has to be issued first).
00215     if (runFork_) {
00216       runFork_=false;
00217       int queueNext = doMyBeginRun();
00218       //check if new run (requires returning IsRun once)
00219       if (queueNext == 2) runFork_=true;
00220     }
00221 
00222     //get initial time before beginRun (used with old forking)
00223     if (!forkInfo_ && newRun_) {
00224       beginRunTiming_=true;
00225       gettimeofday(&tvStat_, NULL);
00226     }
00227 
          // immediateStop is set by doMyBeginRun() from forkInfo_->stopCondition
00228     if (immediateStop) return IsStop;
00229 
00230     // --------------
          // goToStopping is raised by defaultWebPage(); it is consumed here and
          // turned into the persistent noMoreEvents_ flag.
00231     if(goToStopping){noMoreEvents_ = true; goToStopping=false;}
00232     if (noMoreEvents_) {
            // signal cond_ so that a thread waiting in openBackDoor() wakes up
00233       pthread_mutex_lock(&mutex_);
00234       pthread_cond_signal(&cond_);
00235       pthread_mutex_unlock(&mutex_);
00236       return IsStop;
00237     }
00238     if (newRun_) {
00239       return IsRun;
00240     }
00241 
00242     //calculate and print the beginRun timing
00243     if (beginRunTiming_) {
00244       timeval tsTmp;
00245       gettimeofday(&tsTmp,NULL);
00246       long tusecs = (tsTmp.tv_sec-tvStat_.tv_sec)*1000000 + tsTmp.tv_usec - tvStat_.tv_usec;
            // integer division first: truncated to hundredths of a second
00247       double tsecs = ((double)(tusecs/10000))/100.;
00248       std::cout << "DaqSource (slave pid "<< getpid() << " ): FWK beginRun elapsed time: " 
00249                 << tsecs << " seconds "<< std::endl;
00250       edm::LogInfo("DaqSource") << "DaqSource (slave pid "<< getpid() << " ): FWK beginRun elapsed time: " 
00251                 << tsecs << " seconds ";
00252       beginRunTiming_=false;
00253     }
00254 
          // a lumi transition is pending and its auxiliary already exists
00255     if (newLumi_ && luminosityBlockAuxiliary()) {
00256       //      std::cout << "newLumi & lumiblock valid " << std::endl;
00257       return IsLumi;
00258     }
00259     if (alignLsToLast_) { //here we are recovering from a gap in Ls number so an event may already be cached but 
00260       // we hold onto it until we have issued all the necessary endLumi/beginLumi
            // Each call advances the LS number by exactly one and returns
            // IsLumi, until the number reaches thisEventLSid+1.
00261       //       std::cout << getpid() << "alignLsToLast was set and ls number is " 
00262       //                << luminosityBlockNumber_ << " before signaling" << std::endl;
00263       signalWaitingThreadAndBlock();
00264       luminosityBlockNumber_++;
00265       //       std::cout << getpid() << "alignLsToLast signaled and incremented " 
00266       //                << luminosityBlockNumber_ << " eventcached " 
00267       //                << eventCached_ << std::endl;
00268       newLumi_ = true;
00269       lumiSectionIndex_->value_ = luminosityBlockNumber_;
00270       resetLuminosityBlockAuxiliary();
00271       if(luminosityBlockNumber_ == thisEventLSid+1) 
00272       {
00273         alignLsToLast_ = false;
00274       }
00275       if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != luminosityBlockNumber_) {
00276         setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
00277               runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
00278         luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
00279 
00280         //      std::cout << "nextItemType: dealt with new lumi block principal, retval is " << retval << std::endl;
00281       }
00282       return IsLumi;
00283     }
00284     if (eventCached_) {
00285       //      std::cout << "read event already cached " << std::endl;
00286       return IsEvent;
00287     }
00288     if(reader_ == 0) {
00289       throw edm::Exception(errors::LogicError)
00290           << "DaqSource is used without a reader. Check your configuration !";
00291     }
          // Default timestamp from the wall clock: seconds in the upper 32 bits,
          // microseconds in the lower 32 bits.
00292     EventID eventId;
00293     TimeValue_t time = 0LL;
00294     timeval stv;
00295     gettimeofday(&stv,0);
00296     time = stv.tv_sec;
00297     time = (time << 32) + stv.tv_usec;
00298     Timestamp tstamp(time);
00299 
00300     int bunchCrossing = EventAuxiliary::invalidBunchXing;
          // NOTE(review): the orbit number is seeded with the bunch-crossing
          // sentinel -- confirm this is intended.
00301     int orbitNumber   = EventAuxiliary::invalidBunchXing;
00302     
00303     // pass a 0 pointer to fillRawData()!
00304     FEDRawDataCollection* fedCollection(0);
00305 
00306     edm::EventAuxiliary::ExperimentType evttype = EventAuxiliary::Undefined;
00307   
00308     // let reader_ fill the fedCollection 
          // retval as handled below: 0 ends the stream, a negative value is
          // treated as an end-of-lumi signal for LS number -retval, a positive
          // value means an event was read.
00309     int retval = reader_->fillRawData(eventId, tstamp, fedCollection);
00310     if(retval==0) {
00311       // fillRawData() failed, clean up the fedCollection in case it was allocated!
00312       if (0 != fedCollection) delete fedCollection;
00313       noMoreEvents_ = true;
00314       pthread_mutex_lock(&mutex_);
00315       pthread_cond_signal(&cond_);
00316       pthread_mutex_unlock(&mutex_);
00317       return IsStop;
00318     }
00319     else if(retval<0)
00320       {
00321  
00322         unsigned int nextLsFromSignal = (-1)*retval+1;
00323 //      std::cout << getpid() << "::got end-of-lumi for " << (-1)*retval
00324 //                << " was " << luminosityBlockNumber_ << std::endl;
              // the signal closes exactly the LS we are currently in
00325         if(luminosityBlockNumber_ == (nextLsFromSignal-1) )
00326           {
00327             lastLumiUsingEol_->value_ = nextLsFromSignal;
00328             if(lsToBeRecovered_->value_){
00329 //            std::cout << getpid() << "eol::recover ls::for " << (-1)*retval << std::endl;
00330               signalWaitingThreadAndBlock();
00331               luminosityBlockNumber_++;
00332               newLumi_ = true;
00333               lumiSectionIndex_->value_ = luminosityBlockNumber_;
00334               resetLuminosityBlockAuxiliary();
00335               thisEventLSid = nextLsFromSignal - 1;
00336               if(luminosityBlockNumber_ != thisEventLSid+1) 
00337                 alignLsToLast_ = true;
00338               //              std::cout << getpid() << "eol::::alignLsToLast_ " << alignLsToLast_ << std::endl;
00339             }
00340             else{
00341               //              std::cout << getpid() << "eol::realign ls::for " << (-1)*retval << std::endl;
00342               luminosityBlockNumber_ = nextLsFromSignal;
00343               newLumi_ = true;
00344               lumiSectionIndex_->value_ = luminosityBlockNumber_;
00345               resetLuminosityBlockAuxiliary();
00346             }
00347           }
00348         else {
                // NOTE(review): the message says "disregarding", but control
                // falls through to the next test, which also holds whenever
                // this one does -- confirm whether the signal should really be
                // skipped here.
00349           if(nextLsFromSignal >(luminosityBlockNumber_+100) ) {
00350             edm::LogError("DaqSource") << "Got EOL event with value " << retval 
00351                                        << " nextLS would be " << nextLsFromSignal 
00352                                        << " while we expected " << luminosityBlockNumber_+1 << " - disregarding... ";
00353           }
00354           if (nextLsFromSignal > luminosityBlockNumber_+2) //recover on delta > 2
00355           {
00356               lastLumiUsingEol_->value_ = nextLsFromSignal;
00357               thisEventLSid=nextLsFromSignal-1;//set new LS
00358               signalWaitingThreadAndBlock();
00359               luminosityBlockNumber_++;
00360               newLumi_ = true;
00361               lumiSectionIndex_->value_ = luminosityBlockNumber_;
                    // the remaining LS numbers are issued one per call by the
                    // alignLsToLast_ branch near the top of this function
00362               alignLsToLast_ = true;
00363 
00364               //set new lumi block
00365               resetLuminosityBlockAuxiliary();
00366               setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
00367                 runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
00368               luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
00369           }
00370 
00371         }
00372         //      else
00373         //        std::cout << getpid() << "::skipping end-of-lumi for " << (-1)*retval << std::endl;
00374       }
00375     else
00376       {
00377         if (eventId.event() == 0) {
00378           throw edm::Exception(errors::LogicError)
00379             << "The reader used with DaqSource has returned an invalid (zero) event number!\n"
00380             << "Event numbers must begin at 1, not 0.";
00381         }
              // This constructs an unnamed temporary, which is destroyed at the
              // end of this statement. NOTE(review): confirm whether the sentry
              // was meant to stay alive for the rest of the block.
00382         EventSourceSentry(*this);
00383         setTimestamp(tstamp);
00384     
              // Payload pointers for the two trigger FEDs, or 0 when the FED has
              // no data. fedCollection is dereferenced unconditionally here --
              // this assumes the reader allocated it whenever retval > 0.
00385         unsigned char *gtpFedAddr = fedCollection->FEDData(daqsource::gtpEvmId_).size()!=0 ? fedCollection->FEDData(daqsource::gtpEvmId_).data() : 0;
00386         uint32_t gtpsize = 0;
00387         if(gtpFedAddr !=0) gtpsize = fedCollection->FEDData(daqsource::gtpEvmId_).size();
00388         unsigned char *gtpeFedAddr = fedCollection->FEDData(daqsource::gtpeId_).size()!=0 ? fedCollection->FEDData(daqsource::gtpeId_).data() : 0; 
00389 
00390         unsigned int nextFakeLs = 0;
00391         eventCounter_++;
00392         if (fakeLSid_)
00393             evttype =  edm::EventAuxiliary::PhysicsTrigger; 
              // Fake LS mode: the 1-based LS number is derived from either the
              // local event counter or the event number, in units of
              // lumiSegmentSizeInEvents_. nextFakeLs is only computed when
              // fakeLSid_ is set (short-circuit evaluation).
00394         if(fakeLSid_ && luminosityBlockNumber_ != 
00395            (nextFakeLs = useEventCounter_ ? ((eventCounter_-1)/lumiSegmentSizeInEvents_ + 1) :
00396             ((eventId.event() - 1)/lumiSegmentSizeInEvents_ + 1))) {
00397           lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
00398           prescaleSetIndex_->value_ = 0; // since we do not know better but we want to be able to run
00399          
                // the waiting thread is only signalled when moving forward by
                // exactly one LS
00400           if(luminosityBlockNumber_ == nextFakeLs-1)
00401             signalWaitingThreadAndBlock();
00402           luminosityBlockNumber_ = nextFakeLs;
00403           thisEventLSid = nextFakeLs-1;
00404           newLumi_ = true;
00405           lumiSectionIndex_->value_ = luminosityBlockNumber_;
00406           resetLuminosityBlockAuxiliary();
00407           if(keepUsingPsidFromTrigger_ && 
00408              gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
00409             prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
00410           }       
00411         }
00412         else if(!fakeLSid_){ 
00413 
                // Real LS mode: take LS number, prescale index and event type
                // from the first trigger payload when it is recognised,
                // otherwise fall back to the second one.
00414           if(gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
00415             lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
00416             thisEventLSid = evf::evtn::getlbn(gtpFedAddr);
00417             prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
00418             evttype =  edm::EventAuxiliary::ExperimentType(evf::evtn::getevtyp(gtpFedAddr));
00419             if(luminosityBlockNumber_ != (thisEventLSid + 1)){
00420               // we got here in a running process and some Ls might have been skipped so set the flag, 
00421               // increase by one, check and if appropriate set the flag then continue
00422               if(lsToBeRecovered_->value_){
00423                 //              std::cout << getpid() << "eve::recover ls::for " << thisEventLSid << std::endl;
00424                 signalWaitingThreadAndBlock();
00425                 luminosityBlockNumber_++;
00426                 newLumi_ = true;
00427                 lumiSectionIndex_->value_ = luminosityBlockNumber_;
00428                 resetLuminosityBlockAuxiliary();
00429                 if(luminosityBlockNumber_ != thisEventLSid+1) alignLsToLast_ = true;
00430                 //              std::cout << getpid() << "eve::::alignLsToLast_ " << alignLsToLast_ << std::endl;
00431               }
00432               else{ // we got here because the process was restarted. just realign the ls id and proceed with this event
00433                 //              std::cout << getpid() << "eve::realign ls::for " << thisEventLSid << std::endl;
00434                 luminosityBlockNumber_ = thisEventLSid + 1;
00435                 newLumi_ = true;
00436                 lumiSectionIndex_->value_ = luminosityBlockNumber_;
00437                 resetLuminosityBlockAuxiliary();
00438                 lsToBeRecovered_->value_ = true;
00439               }
00440             }
00441           }
00442           else if(gtpeFedAddr!=0 && evf::evtn::gtpe_board_sense(gtpeFedAddr)){
00443             lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
00444             thisEventLSid = evf::evtn::gtpe_getlbn(gtpeFedAddr);
00445             prescaleSetIndex_->value_ = 0; //waiting to get a PS index from gtpe
00446             evttype =  edm::EventAuxiliary::PhysicsTrigger; 
00447             if(luminosityBlockNumber_ != (thisEventLSid + 1)){
00448               if(luminosityBlockNumber_ == thisEventLSid)
00449                 signalWaitingThreadAndBlock();
00450               luminosityBlockNumber_ = thisEventLSid + 1;
00451               newLumi_ = true;
00452               lumiSectionIndex_->value_ = luminosityBlockNumber_;
00453               resetLuminosityBlockAuxiliary();
00454             }
00455           }
00456         }
              // Bunch crossing and orbit come from whichever trigger payload is
              // recognised. Only the first one also replaces the wall-clock
              // timestamp (high word << 32 plus low word).
00457         if(gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
00458           bunchCrossing =  int(evf::evtn::getfdlbx(gtpFedAddr));
00459           orbitNumber =  int(evf::evtn::getorbit(gtpFedAddr));
00460           TimeValue_t time = evf::evtn::getgpshigh(gtpFedAddr);
00461           time = (time << 32) + evf::evtn::getgpslow(gtpFedAddr);
00462           Timestamp tstamp(time);
00463           setTimestamp(tstamp);      
00464         }
00465         else if(gtpeFedAddr!=0 && evf::evtn::gtpe_board_sense(gtpeFedAddr)){
00466           bunchCrossing =  int(evf::evtn::gtpe_getbx(gtpeFedAddr));
00467           orbitNumber =  int(evf::evtn::gtpe_getorbit(gtpeFedAddr));
00468         }
00469       }    
00470           
00471     //    std::cout << "lumiblockaux = " << luminosityBlockAuxiliary() << std::endl;
00472     // If there is no luminosity block principal, make one.
00473     if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != luminosityBlockNumber_) {
00474       newLumi_ = true;
00475       setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
00476         runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
00477       luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
00478 
00479       //      std::cout << "nextItemType: dealt with new lumi block principal, retval is " << retval << std::endl;
00480     }
00481     //    std::cout << "here retval = " << retval << std::endl;
          // An end-of-lumi signal carries no event: report the transition if
          // there is one, otherwise start over and read the next item.
00482     if(retval<0){
00483       //      std::cout << getpid() << " returning from getnextitem because retval < 0 - IsLumi "
00484       //                << IsLumi << std::endl;
00485       if(newLumi_) return IsLumi; else return getNextItemType();
00486     }
00487 
00488     // make a brand new event principal
          // the event is filed under LS thisEventLSid+1
00489     eventId = EventID(runNumber_,thisEventLSid+1, eventId.event());
00490     EventAuxiliary eventAux(eventId, processGUID(),
00491                             timestamp(),
00492                             true,
00493                             evttype,
00494                             bunchCrossing,
00495                             EventAuxiliary::invalidStoreNumber,
00496                             orbitNumber);
00497     eventAux.setProcessHistoryID(phid_);
00498     eventPrincipalCache()->fillEventPrincipal(eventAux, boost::shared_ptr<LuminosityBlockPrincipal>());
00499     eventCached_ = true;
00500     
00501     // have fedCollection managed by a std::auto_ptr<>
00502     std::auto_ptr<FEDRawDataCollection> bare_product(fedCollection);
00503 
00504     WrapperOwningHolder edp(new Wrapper<FEDRawDataCollection>(bare_product), Wrapper<FEDRawDataCollection>::getInterface());
00505     eventPrincipalCache()->put(daqProvenanceHelper_.constBranchDescription_, edp, daqProvenanceHelper_.dummyProvenance_);
00506 
00507 /*
00508     Event e(*eventPrincipalCache(), md_);
00509     // put the fed collection into the transient event store
00510     e.put(bare_product);
00511     // The commit is needed to complete the "put" transaction.
00512     e.commit_();
00513 */
          // The event stays cached (eventCached_) while a pending lumi
          // transition is reported first; a later call then returns IsEvent.
00514     if (newLumi_) {
00515       return IsLumi;
00516     }
00517     return IsEvent;
00518   }
00519 
00520   void
00521   DaqSource::setRun(RunNumber_t r) {
00522     assert(!eventCached_);
00523     reset();
00524     newRun_ = newLumi_ = true;
00525     runNumber_ = r;
00526     if (reader_) reader_->setRunNumber(runNumber_);
00527     noMoreEvents_ = false;
00528     resetLuminosityBlockAuxiliary();
00529   }
00530 
00531   boost::shared_ptr<RunAuxiliary>
00532   DaqSource::readRunAuxiliary_() {
00533     assert(newRun_);
00534     assert(!noMoreEvents_);
00535     newRun_ = false;
00536     boost::shared_ptr<RunAuxiliary> ra(new RunAuxiliary(runNumber_, timestamp(), Timestamp::invalidTimestamp()));
00537     ra->setProcessHistoryID(phid_);
00538     return ra;
00539   }
00540 
00541   boost::shared_ptr<LuminosityBlockAuxiliary>
00542   DaqSource::readLuminosityBlockAuxiliary_() {
00543     assert(!newRun_);
00544     assert(newLumi_);
00545     assert(!noMoreEvents_);
00546     assert(luminosityBlockAuxiliary());
00547     //assert(eventCached_); //the event may or may not be cached - rely on 
00548     // the call to getNextItemType to detect that.
00549     newLumi_ = false;
00550     return luminosityBlockAuxiliary();
00551   }
00552 
00553   EventPrincipal*
00554   DaqSource::readEvent_() {
00555     //    std::cout << "assert not newRun " << std::endl;
00556     assert(!newRun_);
00557     //    std::cout << "assert not newLumi " << std::endl;
00558     assert(!newLumi_);
00559     //    std::cout << "assert not noMoreEvents " << std::endl;
00560     assert(!noMoreEvents_);
00561     //    std::cout << "assert eventCached " << std::endl;
00562     assert(eventCached_);
00563     //    std::cout << "asserts done " << std::endl;
00564     eventCached_ = false;
00565     eventPrincipalCache()->setLuminosityBlockPrincipal(luminosityBlockPrincipal());
00566     return eventPrincipalCache();
00567   }
00568 
00569   void
00570   DaqSource::setLumi(LuminosityBlockNumber_t) {
00571       throw edm::Exception(errors::LogicError,"DaqSource::setLumi(LuminosityBlockNumber_t lumiNumber)")
00572         << "The luminosity block number cannot be set externally for DaqSource.\n"
00573         << "Contact a Framework developer.\n";
00574   }
00575 
00576   EventPrincipal*
00577   DaqSource::readIt(EventID const&) {
00578       throw edm::Exception(errors::LogicError,"DaqSource::readIt(EventID const& eventID)")
00579         << "Random access read cannot be used for DaqSource.\n"
00580         << "Contact a Framework developer.\n";
00581   }
00582 
00583   void
00584   DaqSource::skip(int) {
00585       throw edm::Exception(errors::LogicError,"DaqSource::skip(int offset)")
00586         << "Random access skip cannot be used for DaqSource\n"
00587         << "Contact a Framework developer.\n";
00588   }
00589 
00590   void DaqSource::publish(xdata::InfoSpace *is)
00591   {
00592     is_ = is;
00593     lumiSectionIndex_      = (xdata::UnsignedInteger32*)is_->find("lumiSectionIndex");
00594     prescaleSetIndex_      = (xdata::UnsignedInteger32*)is_->find("prescaleSetIndex");
00595     lastLumiPrescaleIndex_ = (xdata::UnsignedInteger32*)is_->find("lastLumiPrescaleIndex");
00596     lastLumiUsingEol_ = (xdata::UnsignedInteger32*)is_->find("lastLumiUsingEol");
00597     lsTimedOut_            = (xdata::Boolean*)is_->find("lsTimedOut");
00598     lsToBeRecovered_       = (xdata::Boolean*)is_->find("lsToBeRecovered");
00599   }
00600   void DaqSource::publishToXmas(xdata::InfoSpace *is)
00601   {
00602     mis_ = is;
00603   }
00604 
00605   void DaqSource::openBackDoor(unsigned int timeout_sec, bool *running)
00606   {
          // Takes mutex_ and then waits on cond_ for at most timeout_sec
          // seconds. Returns with mutex_ still held; closeBackDoor() releases
          // it. running, when non-null, is set to true once mutex_ has been
          // acquired. lsTimedOut_ is set to true when the wait times out.
          // Presumably called from a thread other than the event loop, which
          // signals cond_ from getNextItemType() and
          // signalWaitingThreadAndBlock() -- confirm against the caller.
00607     count++;
          // A bare `throw;` with no exception currently being handled ends in
          // std::terminate(). NOTE(review): looks like a guard against entering
          // a second time without an intervening closeBackDoor() -- confirm.
00608     if(count==2) throw;
00609     pthread_mutex_lock(&mutex_);
00610     if (running) *running=true;
          // signal_lock_ is locked at the end of closeBackDoor(); releasing it
          // here lets signalWaitingThreadAndBlock() get past its first lock.
          // NOTE(review): on the very first call nothing visible in this file
          // has locked signal_lock_ yet -- confirm.
00611     pthread_mutex_unlock(&signal_lock_);
          // absolute deadline = now + timeout_sec
00612     timespec ts;
00613 #if _POSIX_TIMERS > 0
00614     clock_gettime(CLOCK_REALTIME, &ts);
00615 #else
          // fallback without POSIX timers: whole seconds only
00616     struct timeval tv; 
00617     gettimeofday(&tv, NULL);
00618     ts.tv_sec = tv.tv_sec + 0;
00619     ts.tv_nsec = 0;
00620 #endif
00621     ts.tv_sec += timeout_sec;
00622 
          // mutex_ is released while waiting and held again on return.
          // NOTE(review): the wait is not wrapped in a predicate loop, so a
          // spurious wake-up is indistinguishable from a real signal.
00623     int rc = pthread_cond_timedwait(&cond_, &mutex_, &ts);
00624     if(rc == ETIMEDOUT) lsTimedOut_->value_ = true; 
00625   }
00626   
00627   void DaqSource::closeBackDoor()
00628   {
          // Counterpart of openBackDoor(): signals cond_, releases the mutex_
          // that openBackDoor() left held, then takes signal_lock_ (released
          // again by the next openBackDoor()) and clears the time-out flag.
00629     count--;
          // wakes a thread blocked on cond_, e.g. in signalWaitingThreadAndBlock()
00630     pthread_cond_signal(&cond_);
00631     pthread_mutex_unlock(&mutex_);
          // while signal_lock_ is held here, signalWaitingThreadAndBlock()
          // blocks on its first lock
00632     pthread_mutex_lock(&signal_lock_);
00633     lsTimedOut_->value_ = false; 
00634   }
00635 
00636   void DaqSource::signalWaitingThreadAndBlock()
00637   {
          // Called from getNextItemType() at a luminosity-section boundary:
          // signals cond_ and then blocks on the same condition variable until
          // it is signalled back (closeBackDoor() does that).
          // signal_lock_ is only held while acquiring mutex_.
00638     pthread_mutex_lock(&signal_lock_);
00639     pthread_mutex_lock(&mutex_);
00640     pthread_mutex_unlock(&signal_lock_);
00641     //    std::cout << getpid() << " DS::signal from evloop " << std::endl;
00642     pthread_cond_signal(&cond_);
00643     //    std::cout << getpid() << " DS::go to wait for scalers wl " << std::endl;
          // releases mutex_ while waiting, with no time-out.
          // NOTE(review): no predicate loop, so a spurious wake-up would let the
          // event loop continue early -- confirm this is acceptable.
00644     pthread_cond_wait(&cond_, &mutex_);
00645     pthread_mutex_unlock(&mutex_);
00646     ::usleep(1000);//allow other thread to lock
00647   }  
00648 
00649   void DaqSource::defaultWebPage(xgi::Input *in, xgi::Output *out)
00650   {
00651       std::string path;
00652       std::string urn;
00653       std::string mname;
00654       std::string query;
00655       std::string original_referrer_;
00656       try 
00657         {
00658           cgicc::Cgicc cgi(in);
00659           if ( xgi::Utils::hasFormElement(cgi,"gotostopping") )
00660             {
00661               goToStopping=true;
00662             }
00663           if ( xgi::Utils::hasFormElement(cgi,"module") )
00664             mname = xgi::Utils::getFormElement(cgi, "module")->getValue();
00665           cgicc::CgiEnvironment cgie(in);
00666           if(original_referrer_ == "")
00667             original_referrer_ = cgie.getReferrer();
00668           path = cgie.getPathInfo();
00669           query = cgie.getQueryString();
00670         }
00671       catch (const std::exception & e) 
00672         {
00673           // don't care if it did not work
00674         }
00675 
00676       using std::endl;
00677       *out << "<html>"                                                   << endl;
00678       *out << "<head>"                                                   << endl;
00679 
00680 
00681       *out << "<STYLE type=\"text/css\"> #T1 {border-width: 2px; border: solid blue; text-align: center} </STYLE> "                                      << endl; 
00682       *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00683       *out << " href=\"/" <<  urn
00684            << "/styles.css\"/>"                   << endl;
00685 
00686       *out << "<title>" << moduleName_
00687            << " MAIN</title>"                                            << endl;
00688 
00689       *out << "</head>"                                                  << endl;
00690       *out << "<body onload=\"loadXMLDoc()\">"                           << endl;
00691       *out << "<table border=\"0\" width=\"100%\">"                      << endl;
00692       *out << "<tr>"                                                     << endl;
00693       *out << "  <td align=\"left\">"                                    << endl;
00694       *out << "    <img"                                                 << endl;
00695       *out << "     align=\"middle\""                                    << endl;
00696       *out << "     src=\"/evf/images/bugicon.jpg\""                     << endl;
00697       *out << "     alt=\"main\""                                        << endl;
00698       *out << "     width=\"90\""                                        << endl;
00699       *out << "     height=\"64\""                                       << endl;
00700       *out << "     border=\"\"/>"                                       << endl;
00701       *out << "    <b>"                                                  << endl;
00702       *out <<             moduleName_                                    << endl;
00703       *out << "    </b>"                                                 << endl;
00704       *out << "  </td>"                                                  << endl;
00705       *out << "  <td width=\"32\">"                                      << endl;
00706       *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
00707       *out << "      <img"                                               << endl;
00708       *out << "       align=\"middle\""                                  << endl;
00709       *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << endl;
00710       *out << "       alt=\"HyperDAQ\""                                  << endl;
00711       *out << "       width=\"32\""                                      << endl;
00712       *out << "       height=\"32\""                                     << endl;
00713       *out << "       border=\"\"/>"                                     << endl;
00714       *out << "    </a>"                                                 << endl;
00715       *out << "  </td>"                                                  << endl;
00716       *out << "  <td width=\"32\">"                                      << endl;
00717       *out << "  </td>"                                                  << endl;
00718       *out << "  <td width=\"32\">"                                      << endl;
00719       *out << "    <a href=\"" << original_referrer_  << "\">"           << endl;
00720       *out << "      <img"                                               << endl;
00721       *out << "       align=\"middle\""                                  << endl;
00722       *out << "       src=\"/evf/images/spoticon.jpg\""                  << endl;
00723       *out << "       alt=\"main\""                                      << endl;
00724       *out << "       width=\"32\""                                      << endl;
00725       *out << "       height=\"32\""                                     << endl;
00726       *out << "       border=\"\"/>"                                     << endl;
00727       *out << "    </a>"                                                 << endl;
00728       *out << "  </td>"                                                  << endl;
00729       *out << "</tr>"                                                    << endl;
00730       *out << "</table>"                                                 << endl;
00731 
00732       *out << "<hr/>"                                                    << endl;
00733   
00734       *out << cgicc::form().set("method","GET").set("action", path ) 
00735            << std::endl;
00736       boost::char_separator<char> sep("&");
00737       boost::tokenizer<boost::char_separator<char> > tokens(query, sep);
00738       for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
00739            tok_iter != tokens.end(); ++tok_iter){
00740         size_t pos = (*tok_iter).find_first_of("=");
00741         if(pos != std::string::npos){
00742           std::string first  = (*tok_iter).substr(0    ,                        pos);
00743           std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
00744           *out << cgicc::input().set("type","hidden").set("name",first).set("value", second) 
00745                << std::endl;
00746         }
00747       }
00748 
00749       *out << cgicc::input().set("type","hidden").set("name","gotostopping").set("value","true")
00750            << std::endl;
00751       *out << cgicc::input().set("type","submit").set("value","Go To Stopping")              << std::endl;
00752       *out << cgicc::form()                                                << std::endl;  
00753 
00754       *out << "</body>"                                                  << endl;
00755       *out << "</html>"                                                  << endl;
00756   }
00757 }