CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/IORawData/DaqSource/plugins/DaqSource.cc

Go to the documentation of this file.
00001 
00008 #include "DaqSource.h"
00009 
00010 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00011 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00012 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00013 
00014 #include "IORawData/DaqSource/interface/DaqBaseReader.h"
00015 #include "IORawData/DaqSource/interface/DaqReaderPluginFactory.h"
00016 
00017 #include "DataFormats/Provenance/interface/Timestamp.h" 
00018 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00019 #include "FWCore/Framework/interface/EventPrincipal.h"
00020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00021 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00022 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00023 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00024 #include "DataFormats/Provenance/interface/EventID.h"
00025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00026 
00027 #include <string>
00028 #include <iostream>
00029 #include <time.h>
00030 #include <sys/time.h>
00031 #include <sys/types.h>
00032 
00033 #include "xgi/Method.h"
00034 #include "xgi/Utils.h"
00035 
00036 #include "cgicc/Cgicc.h"
00037 #include "cgicc/FormEntry.h"
00038 #include "cgicc/HTMLClasses.h"
00039 
00040 #include "boost/tokenizer.hpp"
00041 
00042 
00044 // construction/destruction
00046 
00047 
00048 
00049 namespace edm {
00050  namespace daqsource {
        // FED ids used by DaqSource::getNextItemType() to look up trigger payloads
        // in the FEDRawDataCollection delivered by the reader:
        //  - gtpeId_   : payload tested with evf::evtn::gtpe_board_sense()
        //  - gtpEvmId_ : payload tested with evf::evtn::evm_board_sense()
00051   constexpr unsigned int gtpeId_(FEDNumbering::MINTriggerEGTPFEDID);
00052   constexpr unsigned int gtpEvmId_(FEDNumbering::MINTriggerGTPFEDID);
00053  } // namespace daqsource
00054 
00055   //______________________________________________________________________________
        // Builds the source from its configuration.
        //
        // Untracked parameters read from pset:
        //   evtsPerLS                (unsigned int, default 0)  -> lumiSegmentSizeInEvents_
        //   useEventCounter          (bool, default false)      -> useEventCounter_
        //   keepUsingPsidFromTrigger (bool, default false)      -> keepUsingPsidFromTrigger_
        //   readerPluginName         (string, no default)       -> name of the reader plugin
        //   readerPset               (ParameterSet, no default) -> handed to the reader plugin
        //
        // fakeLSid_ is derived from lumiSegmentSizeInEvents_ (non-zero enables it).
        // NOTE(review): that relies on lumiSegmentSizeInEvents_ being declared before
        // fakeLSid_ in the class; the header is not visible here - confirm.
00056   DaqSource::DaqSource(const ParameterSet& pset, const InputSourceDescription& desc) :
00058     InputSource(pset, desc),
00059     evf::ModuleWeb("DaqSource"),
00060     reader_(nullptr),
00061     lumiSegmentSizeInEvents_(pset.getUntrackedParameter<unsigned int>("evtsPerLS", 0)),
00062     useEventCounter_(pset.getUntrackedParameter<bool>("useEventCounter", false)),
00063     eventCounter_(0),
00064     keepUsingPsidFromTrigger_(pset.getUntrackedParameter<bool>("keepUsingPsidFromTrigger", false)),
00065     fakeLSid_(lumiSegmentSizeInEvents_ != 0),
00066     runNumber_(RunID::firstValidRun().run()),
00067     luminosityBlockNumber_(LuminosityBlockID::firstValidLuminosityBlock().luminosityBlock()),
00068     daqProvenanceHelper_(TypeID(typeid(FEDRawDataCollection))),
00069     noMoreEvents_(false),
00070     newRun_(true),
00071     newLumi_(true),
00072     eventCached_(false),
00073     alignLsToLast_(false),
00074     is_(nullptr),
00075     mis_(nullptr),
00076     thisEventLSid(0),
00077     goToStopping(false),
00078     immediateStop(false),
00079     forkInfo_(nullptr),
00080     runFork_(false),
00081     beginRunTiming_(false)
00082   {
          // Synchronisation primitives shared by openBackDoor(), closeBackDoor()
          // and signalWaitingThreadAndBlock(); all created with default attributes.
00083     count = 0;
00084     pthread_mutex_init(&mutex_, nullptr);
00085     pthread_mutex_init(&signal_lock_, nullptr);
00086     pthread_cond_init(&cond_, nullptr);

00089     setTimestamp(Timestamp::beginOfTime());

          // Instantiate the requested data source.
00092     std::string readerName = pset.getUntrackedParameter<std::string>("readerPluginName");

00094     try {
00095       reader_ = DaqReaderPluginFactory::get()->create(
00096           readerName, pset.getUntrackedParameter<ParameterSet>("readerPset"));
00098       reader_->setRunNumber(runNumber_);
00099     }
00100     catch (edm::Exception& e) {
            // Only a "Configuration" failure that left us without a reader is
            // recoverable; anything else propagates unchanged.
00101       const bool canFallBack = (e.category() == "Configuration") && (reader_ == nullptr);
00102       if (!canFallBack) {
00103         throw;
00104       }
            // Second attempt: the factory variant whose create() takes only the plugin name.
00105       reader_ = DaqReaderPluginFactoryU::get()->create(readerName);
00106       if (reader_ == nullptr) {
00107         throw;
00108       }
00109       reader_->setRunNumber(runNumber_);
00110     }

          // Initialize metadata, and save the process history ID for use every event.
00112     phid_ = daqProvenanceHelper_.daqInit(productRegistryUpdate());
00114   }
00115   
00116   //______________________________________________________________________________
        // Destroys the reader plugin instance created in the constructor.
        // NOTE(review): mutex_, signal_lock_ and cond_ are initialised in the
        // constructor but are not destroyed here - confirm this is intentional.
00117   DaqSource::~DaqSource() {
00118     delete reader_;
00119   }
00120   
00121   void DaqSource::publishForkInfo(evf::moduleweb::ForkInfoObj* forkInfoObj) {
          // Registers the fork bookkeeping object and arms the fork handshake:
          // the next getNextItemType() call will run doMyBeginRun() because
          // runFork_ is set. Any earlier stop / end-of-data state is cleared.
00122     noMoreEvents_ = false;
00123     immediateStop = false;
00124     runFork_ = true;
00125     forkInfo_ = forkInfoObj;
00126   }
00127   
00129   // implementation of member functions
00131 
00132 
00133   //______________________________________________________________________________
        // Runs the fork handshake driven through forkInfo_ (set by publishForkInfo()).
        //
        // Return values, as produced by the code below:
        //    2  newRun_ is set: beginRun timing is started and control goes back to
        //       the caller so the run can be queued first (getNextItemType() re-arms
        //       runFork_ when it sees 2, so this function is entered again later)
        //    1  running in a slave process after the fork (forkParams.isMaster == 0)
        //    0  the master loop was left (stop condition seen, or immediateStop set)
        //   -1  no forkInfo_ has been published
        //   -2  forkHandler() returned with forkParams.isMaster still at -1
00134   int DaqSource::doMyBeginRun() {
00135 
00136     if (forkInfo_) {
00137       while (!immediateStop) {
00138         //queue new run to Framework (causes EP beginRun to be executed)
00139         if (newRun_) {
00140           beginRunTiming_=true;
00141           gettimeofday(&tvStat_, NULL);
00142           return 2;
00143         }
00144         //measure time in fwk beginRun
00145         if (beginRunTiming_) {
00146           timeval tsTmp;
00147           gettimeofday(&tsTmp,NULL);
00148           long tusecs = (tsTmp.tv_sec-tvStat_.tv_sec)*1000000 + tsTmp.tv_usec - tvStat_.tv_usec;
                // integer division first: the printed value is truncated to 1/100 s
00149           double tsecs = ((double)(tusecs/10000))/100.;
00150           std::cout << "DaqSource: FWK beginRun elapsed time: " << tsecs << " seconds in master EP"<< std::endl;
00151           edm::LogInfo("DaqSource") << "FWK beginRun elapsed time: " << tsecs << " seconds in master EP";
00152           beginRunTiming_=false;
00153           usleep(10000);//short sleep before fork
00154         }
00155         //first or new run init
00156         if (forkInfo_->forkParams.isMaster==-1) {
00157           forkInfo_->lock();//keeping it locked during init!
00158           forkInfo_->forkHandler(forkInfo_->fuAddr); //fork all slaves
00159         }
              // forkHandler() is expected to have set isMaster to 0 or 1; still -1 is an error
00160         if (forkInfo_->forkParams.isMaster==-1) {
00161           forkInfo_->unlock();
00162           std::cout << "ERROR (DaqSource): not notified to be either in  master or slave process after fork" << std::endl;
00163           return -2;
00164         }
00165 
00166         //slave process after fork: exit all this
00167         if (forkInfo_->forkParams.isMaster==0) {
00168           forkInfo_->unlock();
00169           return 1;
00170         }
00171 
00172         //master process after fork:
00173         if (forkInfo_->forkParams.isMaster==1) {
00174             forkInfo_->unlock();
00175             int slotToRestart=-1;
                  // block until the control semaphore is posted, then re-take the lock
00176             sem_wait(forkInfo_->control_sem_);
00177             forkInfo_->lock();
00178 
00179             //got unblocked due to next run
00180             if (forkInfo_->forkParams.isMaster==-1) {
00181                     forkInfo_->unlock();
00182                     continue; // check here for newRun_?
00183             }
00184             //check if asked to stop
00185             immediateStop=forkInfo_->stopCondition;
00186             if (immediateStop) {
00187               forkInfo_->receivedStop_=true;
                    // leaves the loop with the lock still held; it is released after the loop
00188               break;
00189             }
00190             
00191             //check if asked to restart
00192             slotToRestart = forkInfo_->forkParams.slotId;
00193 
00194           if (slotToRestart==-1 && forkInfo_->forkParams.restart==0) {
00195             //this will deal with spurious semaphore signals when slave is killed
00196             forkInfo_->unlock();
00197             continue;
00198           }
00199           //restart single slave
                // the lock taken after sem_wait() is still held across this call
00200           forkInfo_->forkHandler(forkInfo_->fuAddr);
00201         }
00202       }
00203       //loop exit
            // NOTE(review): this unlock pairs with the lock held at the 'break' above.
            // If the loop ends through its condition instead (immediateStop already
            // set on entry or set elsewhere), the lock state here is not obvious
            // from this function alone - confirm.
00204       forkInfo_->unlock();
00205       return 0;
00206     }
00207     return -1; //no forkInfo_
00208   }
00209 
00210 
00211   //______________________________________________________________________________
        // Decides what the source delivers next (IsStop, IsRun, IsLumi or IsEvent).
        //
        // Order of the checks below:
        //   1. pending fork handshake (runFork_) -> doMyBeginRun()
        //   2. immediateStop / goToStopping / noMoreEvents_ -> IsStop
        //   3. newRun_ -> IsRun
        //   4. newLumi_ with a lumi auxiliary already set -> IsLumi
        //   5. alignLsToLast_ -> advance luminosityBlockNumber_ by one and return IsLumi
        //   6. eventCached_ -> IsEvent
        //   7. otherwise ask reader_->fillRawData() for new data:
        //        retval == 0  no more data       -> IsStop
        //        retval <  0  end-of-lumi signal -> lumi bookkeeping only, no event
        //        retval >  0  an event           -> event principal is filled and cached
        // Throws edm::Exception(LogicError) when there is no reader, when the reader
        // returns event number 0, or when an event carries a lumi section older
        // than the current one.
00212   InputSource::ItemType 
00213   DaqSource::getNextItemType() {
00214     //    std::cout << getpid() << " enter getNextItemType " << std::endl;
00215     if (runFork_) {
00216       runFork_=false;
00217       int queueNext = doMyBeginRun();
00218       //check if new run (requires returning IsRun once)
00219       if (queueNext == 2) runFork_=true;
00220     }
00221 
00222     //get initial time before beginRun (used with old forking)
00223     if (!forkInfo_ && newRun_) {
00224       beginRunTiming_=true;
00225       gettimeofday(&tvStat_, NULL);
00226     }
00227 
00228     if (immediateStop) return IsStop;
00229 
00230     // --------------
          // goToStopping is set from defaultWebPage() ("gotostopping" form element)
00231     if(goToStopping){noMoreEvents_ = true; goToStopping=false;}
00232     if (noMoreEvents_) {
            // wake a thread that may be waiting on cond_ before reporting the stop
00233       pthread_mutex_lock(&mutex_);
00234       pthread_cond_signal(&cond_);
00235       pthread_mutex_unlock(&mutex_);
00236       return IsStop;
00237     }
00238     if (newRun_) {
00239       return IsRun;
00240     }
00241 
00242     //calculate and print the beginRun the timing
00243     if (beginRunTiming_) {
00244       timeval tsTmp;
00245       gettimeofday(&tsTmp,NULL);
00246       long tusecs = (tsTmp.tv_sec-tvStat_.tv_sec)*1000000 + tsTmp.tv_usec - tvStat_.tv_usec;
            // integer division first: the printed value is truncated to 1/100 s
00247       double tsecs = ((double)(tusecs/10000))/100.;
00248       std::cout << "DaqSource (slave pid "<< getpid() << " ): FWK beginRun elapsed time: " 
00249                 << tsecs << " seconds "<< std::endl;
00250       edm::LogInfo("DaqSource") << "DaqSource (slave pid "<< getpid() << " ): FWK beginRun elapsed time: " 
00251                 << tsecs << " seconds ";
00252       beginRunTiming_=false;
00253     }
00254 
00255     if (newLumi_ && luminosityBlockAuxiliary()) {
00256       //      std::cout << "newLumi & lumiblock valid " << std::endl;
00257       return IsLumi;
00258     }
00259     if (alignLsToLast_) { //here we are recovering from a gap in Ls number so an event may already be cached but 
00260       // we hold onto it until we have issued all the necessary endLumi/beginLumi
00261       //       std::cout << getpid() << "alignLsToLast was set and ls number is " 
00262       //                << luminosityBlockNumber_ << " before signaling" << std::endl;
00263       signalWaitingThreadAndBlock();
00264       luminosityBlockNumber_++;
00265       //       std::cout << getpid() << "alignLsToLast signaled and incremented " 
00266       //                << luminosityBlockNumber_ << " eventcached " 
00267       //                << eventCached_ << std::endl;
00268       newLumi_ = true;
00269       lumiSectionIndex_->value_ = luminosityBlockNumber_;
00270       resetLuminosityBlockAuxiliary();
            // catch-up is complete once the lumi number reaches thisEventLSid+1
00271       if(luminosityBlockNumber_ == thisEventLSid+1) 
00272       {
00273         alignLsToLast_ = false;
00274       }
00275       if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != luminosityBlockNumber_) {
00276         setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
00277               runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
00278         luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
00279 
00280         //      std::cout << "nextItemType: dealt with new lumi block principal, retval is " << retval << std::endl;
00281       }
00282       return IsLumi;
00283     }
00284     if (eventCached_) {
00285       //      std::cout << "read event already cached " << std::endl;
00286       return IsEvent;
00287     }
00288     if(reader_ == 0) {
00289       throw edm::Exception(errors::LogicError)
00290           << "DaqSource is used without a reader. Check your configuration !";
00291     }
00292     EventID eventId;
          // default timestamp from the wall clock: seconds in the upper 32 bits,
          // microseconds added into the lower bits
00293     TimeValue_t time = 0LL;
00294     timeval stv;
00295     gettimeofday(&stv,0);
00296     time = stv.tv_sec;
00297     time = (time << 32) + stv.tv_usec;
00298     Timestamp tstamp(time);
00299 
00300     int bunchCrossing = EventAuxiliary::invalidBunchXing;
          // NOTE(review): orbitNumber is seeded with invalidBunchXing, not an
          // orbit-specific sentinel - confirm this is intended.
00301     int orbitNumber   = EventAuxiliary::invalidBunchXing;
00302     
00303     // pass a 0 pointer to fillRawData()!
00304     FEDRawDataCollection* fedCollection(0);
00305 
00306     edm::EventAuxiliary::ExperimentType evttype = EventAuxiliary::Undefined;
00307   
00308     // let reader_ fill the fedCollection 
00309     int retval = reader_->fillRawData(eventId, tstamp, fedCollection);
00310     if(retval==0) {
00311       // fillRawData() failed, clean up the fedCollection in case it was allocated!
00312       if (0 != fedCollection) delete fedCollection;
00313       noMoreEvents_ = true;
00314       pthread_mutex_lock(&mutex_);
00315       pthread_cond_signal(&cond_);
00316       pthread_mutex_unlock(&mutex_);
00317       return IsStop;
00318     }
00319     else if(retval<0)
00320       {
00321  
              // negative retval is an end-of-lumi signal: -retval is the lumi
              // section that ended, so the next one is -retval + 1
00322         unsigned int nextLsFromSignal = (-1)*retval+1;
00323 //      std::cout << getpid() << "::got end-of-lumi for " << (-1)*retval
00324 //                << " was " << luminosityBlockNumber_ << std::endl;
00325         if(luminosityBlockNumber_ == (nextLsFromSignal-1) )
00326           {
00327             lastLumiUsingEol_->value_ = nextLsFromSignal;
00328             if(lsToBeRecovered_->value_){
00329 //            std::cout << getpid() << "eol::recover ls::for " << (-1)*retval << std::endl;
00330               signalWaitingThreadAndBlock();
00331               luminosityBlockNumber_++;
00332               newLumi_ = true;
00333               lumiSectionIndex_->value_ = luminosityBlockNumber_;
00334               resetLuminosityBlockAuxiliary();
00335               thisEventLSid = nextLsFromSignal - 1;
00336               if(luminosityBlockNumber_ != thisEventLSid+1) 
00337                 alignLsToLast_ = true;
00338               //              std::cout << getpid() << "eol::::alignLsToLast_ " << alignLsToLast_ << std::endl;
00339             }
00340             else{
00341               //              std::cout << getpid() << "eol::realign ls::for " << (-1)*retval << std::endl;
00342               luminosityBlockNumber_ = nextLsFromSignal;
00343               newLumi_ = true;
00344               lumiSectionIndex_->value_ = luminosityBlockNumber_;
00345               resetLuminosityBlockAuxiliary();
00346             }
00347           }
00348         else {
                // NOTE(review): the message says "disregarding" but there is no
                // 'else' - the recovery check below still runs for such a value.
                // Confirm whether that is intended.
00349           if(nextLsFromSignal >(luminosityBlockNumber_+100) ) {
00350             edm::LogError("DaqSource") << "Got EOL event with value " << retval 
00351                                        << " nextLS would be " << nextLsFromSignal 
00352                                        << " while we expected " << luminosityBlockNumber_+1 << " - disregarding... ";
00353           }
00354           if (nextLsFromSignal > luminosityBlockNumber_+2) //recover on delta > 2
00355           {
00356               lastLumiUsingEol_->value_ = nextLsFromSignal;
00357               thisEventLSid=nextLsFromSignal-1;//set new LS
00358               signalWaitingThreadAndBlock();
00359               luminosityBlockNumber_++;
00360               newLumi_ = true;
00361               lumiSectionIndex_->value_ = luminosityBlockNumber_;
                    // remaining lumi sections are issued one per call via the alignLsToLast_ branch above
00362               alignLsToLast_ = true;
00363 
00364               //set new lumi block
00365               resetLuminosityBlockAuxiliary();
00366               setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
00367                 runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
00368               luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
00369           }
00370 
00371         }
00372         //      else
00373         //        std::cout << getpid() << "::skipping end-of-lumi for " << (-1)*retval << std::endl;
00374       }
00375     else
00376       {
00377         if (eventId.event() == 0) {
00378           throw edm::Exception(errors::LogicError)
00379             << "The reader used with DaqSource has returned an invalid (zero) event number!\n"
00380             << "Event numbers must begin at 1, not 0.";
00381         }
              // NOTE(review): this constructs an unnamed temporary, which is
              // destroyed at the end of this statement rather than at the end of
              // the scope - confirm that is the intended sentry lifetime.
00382         EventSourceSentry(*this);
00383         setTimestamp(tstamp);
00384     
              // NOTE(review): fedCollection is dereferenced without a null check;
              // this relies on fillRawData() having allocated it whenever retval > 0.
              // A FED with zero-size data is treated as absent (address 0).
00385         unsigned char *gtpFedAddr = fedCollection->FEDData(daqsource::gtpEvmId_).size()!=0 ? fedCollection->FEDData(daqsource::gtpEvmId_).data() : 0;
00386         uint32_t gtpsize = 0;
00387         if(gtpFedAddr !=0) gtpsize = fedCollection->FEDData(daqsource::gtpEvmId_).size();
00388         unsigned char *gtpeFedAddr = fedCollection->FEDData(daqsource::gtpeId_).size()!=0 ? fedCollection->FEDData(daqsource::gtpeId_).data() : 0; 
00389 
00390         unsigned int nextFakeLs = 0;
00391         eventCounter_++;
00392         if (fakeLSid_)
00393             evttype =  edm::EventAuxiliary::PhysicsTrigger; 
              // fake lumi sections: the lumi number is computed from either the local
              // event counter or the event id, lumiSegmentSizeInEvents_ events per
              // section (nextFakeLs is assigned inside the condition)
00394         if(fakeLSid_ && luminosityBlockNumber_ != 
00395            (nextFakeLs = useEventCounter_ ? ((eventCounter_-1)/lumiSegmentSizeInEvents_ + 1) :
00396             ((eventId.event() - 1)/lumiSegmentSizeInEvents_ + 1))) {
00397           lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
00398           prescaleSetIndex_->value_ = 0; // since we do not know better but we want to be able to run
00399          
00400           if(luminosityBlockNumber_ == nextFakeLs-1)
00401             signalWaitingThreadAndBlock();
00402           luminosityBlockNumber_ = nextFakeLs;
00403           thisEventLSid = nextFakeLs-1;
00404           newLumi_ = true;
00405           lumiSectionIndex_->value_ = luminosityBlockNumber_;
00406           resetLuminosityBlockAuxiliary();
00407           if(keepUsingPsidFromTrigger_ && 
00408              gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
00409             prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
00410           }       
00411         }
00412         else if(!fakeLSid_){ 
00413 
                // real lumi sections: taken from the GTP EVM payload if it is
                // recognised, otherwise from the GTPe payload
00414           if(gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
00415             lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
00416             thisEventLSid = evf::evtn::getlbn(gtpFedAddr);
00417             prescaleSetIndex_->value_  = (evf::evtn::getfdlpsc(gtpFedAddr) & 0xffff);
00418             evttype =  edm::EventAuxiliary::ExperimentType(evf::evtn::getevtyp(gtpFedAddr));
00419             if(luminosityBlockNumber_ > (thisEventLSid + 1))
00420             {
00421               //late event,throw fwk exception
00422               std::ostringstream excptmsg;
00423               excptmsg << "DaqSource::event with late LS (" << thisEventLSid + 1 << ")received.";
00424               throw edm::Exception(errors::LogicError,excptmsg.str());
00425             }
00426             if(luminosityBlockNumber_ != (thisEventLSid + 1)){
00427               // we got here in a running process and some Ls might have been skipped so set the flag, 
00428               // increase by one, check and if appropriate set the flag then continue
00429               if(lsToBeRecovered_->value_){
00430                 //              std::cout << getpid() << "eve::recover ls::for " << thisEventLSid << std::endl;
00431                 signalWaitingThreadAndBlock();
00432                 luminosityBlockNumber_++;
00433                 newLumi_ = true;
00434                 lumiSectionIndex_->value_ = luminosityBlockNumber_;
00435                 resetLuminosityBlockAuxiliary();
00436                 if(luminosityBlockNumber_ != thisEventLSid+1) alignLsToLast_ = true;
00437                 //              std::cout << getpid() << "eve::::alignLsToLast_ " << alignLsToLast_ << std::endl;
00438               }
00439               else{ // we got here because the process was restarted. just realign the ls id and proceed with this event
00440                 //              std::cout << getpid() << "eve::realign ls::for " << thisEventLSid << std::endl;
00441                 luminosityBlockNumber_ = thisEventLSid + 1;
00442                 newLumi_ = true;
00443                 lumiSectionIndex_->value_ = luminosityBlockNumber_;
00444                 resetLuminosityBlockAuxiliary();
00445                 lsToBeRecovered_->value_ = true;
00446               }
00447             }
00448           }
00449           else if(gtpeFedAddr!=0 && evf::evtn::gtpe_board_sense(gtpeFedAddr)){
00450             lastLumiPrescaleIndex_->value_ = prescaleSetIndex_->value_;
00451             thisEventLSid = evf::evtn::gtpe_getlbn(gtpeFedAddr);
00452             prescaleSetIndex_->value_ = 0; //waiting to get a PS index from gtpe
00453             evttype =  edm::EventAuxiliary::PhysicsTrigger; 
00454             if(luminosityBlockNumber_ != (thisEventLSid + 1)){
00455               if(luminosityBlockNumber_ == thisEventLSid)
00456                 signalWaitingThreadAndBlock();
00457               luminosityBlockNumber_ = thisEventLSid + 1;
00458               newLumi_ = true;
00459               lumiSectionIndex_->value_ = luminosityBlockNumber_;
00460               resetLuminosityBlockAuxiliary();
00461             }
00462           }
00463         }
              // bunch crossing / orbit (and, for the GTP EVM payload, a GPS-based
              // timestamp replacing the wall-clock one) are taken from the trigger payload
00464         if(gtpFedAddr!=0 && evf::evtn::evm_board_sense(gtpFedAddr,gtpsize)){
00465           bunchCrossing =  int(evf::evtn::getfdlbx(gtpFedAddr));
00466           orbitNumber =  int(evf::evtn::getorbit(gtpFedAddr));
                // these locals shadow the outer 'time' and 'tstamp'
00467           TimeValue_t time = evf::evtn::getgpshigh(gtpFedAddr);
00468           time = (time << 32) + evf::evtn::getgpslow(gtpFedAddr);
00469           Timestamp tstamp(time);
00470           setTimestamp(tstamp);      
00471         }
00472         else if(gtpeFedAddr!=0 && evf::evtn::gtpe_board_sense(gtpeFedAddr)){
00473           bunchCrossing =  int(evf::evtn::gtpe_getbx(gtpeFedAddr));
00474           orbitNumber =  int(evf::evtn::gtpe_getorbit(gtpeFedAddr));
00475         }
00476       }    
00477           
00478     //    std::cout << "lumiblockaux = " << luminosityBlockAuxiliary() << std::endl;
00479     // If there is no luminosity block principal, make one.
00480     if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != luminosityBlockNumber_) {
00481       newLumi_ = true;
00482       setLuminosityBlockAuxiliary(new LuminosityBlockAuxiliary(
00483         runNumber_, luminosityBlockNumber_, timestamp(), Timestamp::invalidTimestamp()));
00484       luminosityBlockAuxiliary()->setProcessHistoryID(phid_);
00485 
00486       //      std::cout << "nextItemType: dealt with new lumi block principal, retval is " << retval << std::endl;
00487     }
00488     //    std::cout << "here retval = " << retval << std::endl;
          // an end-of-lumi signal carries no event: either report the new lumi or,
          // if nothing changed, ask the reader again through a recursive call
00489     if(retval<0){
00490       //      std::cout << getpid() << " returning from getnextitem because retval < 0 - IsLumi "
00491       //                << IsLumi << std::endl;
00492       if(newLumi_) return IsLumi; else return getNextItemType();
00493     }
00494 
00495     // make a brand new event principal
          // the event id is rebuilt with the current run and lumi thisEventLSid+1
00496     eventId = EventID(runNumber_,thisEventLSid+1, eventId.event());
00497     EventAuxiliary eventAux(eventId, processGUID(),
00498                             timestamp(),
00499                             true,
00500                             evttype,
00501                             bunchCrossing,
00502                             EventAuxiliary::invalidStoreNumber,
00503                             orbitNumber);
00504     eventAux.setProcessHistoryID(phid_);
00505     eventPrincipalCache()->fillEventPrincipal(eventAux, boost::shared_ptr<LuminosityBlockPrincipal>());
00506     eventCached_ = true;
00507     
00508     // have fedCollection managed by a std::auto_ptr<>
00509     std::auto_ptr<FEDRawDataCollection> bare_product(fedCollection);
00510 
00511     WrapperOwningHolder edp(new Wrapper<FEDRawDataCollection>(bare_product), Wrapper<FEDRawDataCollection>::getInterface());
00512     eventPrincipalCache()->put(daqProvenanceHelper_.constBranchDescription_, edp, daqProvenanceHelper_.dummyProvenance_);
00513 
00514 /*
00515     Event e(*eventPrincipalCache(), md_);
00516     // put the fed collection into the transient event store
00517     e.put(bare_product);
00518     // The commit is needed to complete the "put" transaction.
00519     e.commit_();
00520 */
          // the event stays cached (eventCached_) while a pending lumi is reported first
00521     if (newLumi_) {
00522       return IsLumi;
00523     }
00524     return IsEvent;
00525   }
00526 
00527   void DaqSource::setRun(RunNumber_t r) {
          // Switches the source to run number r. Must not be called while an event
          // is cached (asserted). Marks both a new run and a new lumi as pending,
          // forwards the run number to the reader if there is one, clears the
          // end-of-data flag and drops the current lumi auxiliary.
00528     assert(!eventCached_);
00529     reset();
00530     newLumi_ = true;
00531     newRun_ = true;
00532     runNumber_ = r;
00533     if (reader_ != nullptr) reader_->setRunNumber(runNumber_);
00534     noMoreEvents_ = false;
00535     resetLuminosityBlockAuxiliary();
00536   }
00537 
00538   boost::shared_ptr<RunAuxiliary> DaqSource::readRunAuxiliary_() {
          // Produces the auxiliary for the pending run and clears newRun_.
          // Preconditions (asserted): a new run is pending and the source has not
          // run out of events. The auxiliary starts at the current timestamp(),
          // has an invalid end time and carries the stored process history id.
00540     assert(newRun_);
00541     assert(!noMoreEvents_);
00542     newRun_ = false;
00543     typedef boost::shared_ptr<RunAuxiliary> RunAuxPtr;
00544     RunAuxPtr runAux(new RunAuxiliary(runNumber_, timestamp(), Timestamp::invalidTimestamp()));
00545     runAux->setProcessHistoryID(phid_);
00546     return runAux;
00547   }
00547 
00548   boost::shared_ptr<LuminosityBlockAuxiliary> DaqSource::readLuminosityBlockAuxiliary_() {
          // Hands out the lumi auxiliary prepared by getNextItemType() and clears
          // newLumi_. Preconditions (asserted): no run is pending, a lumi is
          // pending, the source is not exhausted and an auxiliary exists.
          // An event may or may not be cached at this point; getNextItemType()
          // is relied on to detect that.
00550     assert(!newRun_);
00551     assert(newLumi_);
00552     assert(!noMoreEvents_);
00553     assert(luminosityBlockAuxiliary());
00554     newLumi_ = false;
00555     boost::shared_ptr<LuminosityBlockAuxiliary> pendingLumi = luminosityBlockAuxiliary();
00556     return pendingLumi;
00558   }
00559 
00560   EventPrincipal* DaqSource::readEvent_() {
          // Releases the event cached by getNextItemType(): attaches the current
          // luminosity block principal to it, clears eventCached_ and returns it.
          // Preconditions (asserted): no run or lumi transition is pending, the
          // source is not exhausted and an event is actually cached.
00563     assert(!newRun_);
00565     assert(!newLumi_);
00567     assert(!noMoreEvents_);
00569     assert(eventCached_);
00570     eventCached_ = false;
00571     EventPrincipal* cachedEvent = eventPrincipalCache();
00572     cachedEvent->setLuminosityBlockPrincipal(luminosityBlockPrincipal());
00573     return cachedEvent;
00574   }
00575 
00576   void DaqSource::setLumi(LuminosityBlockNumber_t) {
          // Not supported by this source: always throws edm::Exception(LogicError).
00577     edm::Exception notSupported(errors::LogicError, "DaqSource::setLumi(LuminosityBlockNumber_t lumiNumber)");
00578     notSupported << "The luminosity block number cannot be set externally for DaqSource.\n";
00579     notSupported << "Contact a Framework developer.\n";
00580     throw notSupported;
00581   }
00582 
00583   EventPrincipal* DaqSource::readIt(EventID const&) {
          // Not supported by this source: always throws edm::Exception(LogicError).
00584     edm::Exception notSupported(errors::LogicError, "DaqSource::readIt(EventID const& eventID)");
00585     notSupported << "Random access read cannot be used for DaqSource.\n";
00586     notSupported << "Contact a Framework developer.\n";
00587     throw notSupported;
00588   }
00589 
00590   void DaqSource::skip(int) {
          // Not supported by this source: always throws edm::Exception(LogicError).
00591     edm::Exception notSupported(errors::LogicError, "DaqSource::skip(int offset)");
00592     notSupported << "Random access skip cannot be used for DaqSource\n";
00593     notSupported << "Contact a Framework developer.\n";
00594     throw notSupported;
00595   }
00596 
00597   void DaqSource::publish(xdata::InfoSpace *is) {
          // Remembers the info space and looks up, by name, the items this source
          // reads and writes while running (lumi section index, prescale indices,
          // last lumi seen through end-of-lumi, and the two lumi-section flags).
          // The results of find() are stored as-is; nothing is checked here.
00598     is_ = is;
00599     typedef xdata::UnsignedInteger32 Counter;
00600     lumiSectionIndex_ = (Counter*)is->find("lumiSectionIndex");
00601     prescaleSetIndex_ = (Counter*)is->find("prescaleSetIndex");
00602     lastLumiPrescaleIndex_ = (Counter*)is->find("lastLumiPrescaleIndex");
00603     lastLumiUsingEol_ = (Counter*)is->find("lastLumiUsingEol");
00604     lsTimedOut_ = (xdata::Boolean*)is->find("lsTimedOut");
00605     lsToBeRecovered_ = (xdata::Boolean*)is->find("lsToBeRecovered");
00606   }
00607   void DaqSource::publishToXmas(xdata::InfoSpace *is)
00608   {
          // Stores the given info space pointer in mis_; nothing else is done with
          // it in the part of the file visible here.
00609     mis_ = is;
00610   }
00611 
00612   void DaqSource::openBackDoor(unsigned int timeout_sec, bool *running) {
          // Called by the thread that wants to be woken by the event loop
          // (see signalWaitingThreadAndBlock()). Takes mutex_, releases
          // signal_lock_ so the event loop may signal, then waits on cond_ for at
          // most timeout_sec seconds. On timeout lsTimedOut_ is set to true.
          // mutex_ is still held on return; closeBackDoor() releases it.
          //
          // timeout_sec  maximum time to wait, in seconds
          // running      optional flag, set to true once mutex_ has been taken
          //
          // Throws edm::Exception(LogicError) if the back door is already open.
          // The original code used a bare 'throw;' here, which calls
          // std::terminate() because no exception is being handled at that
          // point. The counter is restored before throwing so that it keeps
          // matching the number of open back doors.
00613     if (++count == 2) {
00614       --count;
00615       throw edm::Exception(errors::LogicError) << "DaqSource::openBackDoor() called while the back door is already open.\n";
00616     }
00617     pthread_mutex_lock(&mutex_);
00618     if (running) *running=true;
00619     pthread_mutex_unlock(&signal_lock_);
          // absolute deadline for pthread_cond_timedwait()
00620     timespec ts;
00621 #if _POSIX_TIMERS > 0
00622     clock_gettime(CLOCK_REALTIME, &ts);
00623 #else
00624     struct timeval tv; 
00625     gettimeofday(&tv, NULL);
00626     ts.tv_sec = tv.tv_sec + 0;
00627     ts.tv_nsec = 0;
00628 #endif
00629     ts.tv_sec += timeout_sec;
00630     int rc = pthread_cond_timedwait(&cond_, &mutex_, &ts);
00631     if(rc == ETIMEDOUT) lsTimedOut_->value_ = true; 
00632   }
00633   
00634   void DaqSource::closeBackDoor()
00635   {
          // Counterpart of openBackDoor(): signals cond_ (which is what
          // signalWaitingThreadAndBlock() waits on), releases mutex_, takes
          // signal_lock_ back and clears the lsTimedOut_ flag.
          // The statement order is part of the lock hand-over and must be kept.
00636     count--;
00637     pthread_cond_signal(&cond_);
00638     pthread_mutex_unlock(&mutex_);
00639     pthread_mutex_lock(&signal_lock_);
00640     lsTimedOut_->value_ = false; 
00641   }
00642 
00643   void DaqSource::signalWaitingThreadAndBlock()
00644   {
          // Called from getNextItemType() at lumi-section boundaries. Signals cond_
          // and then waits on the same condition variable until it is signalled
          // again (closeBackDoor() does so).
          // signal_lock_ is taken first and released once mutex_ is held;
          // openBackDoor() releases signal_lock_ and closeBackDoor() re-takes it.
          // NOTE(review): pthread_cond_wait() is not wrapped in a predicate loop,
          // so a spurious wake-up would let this function return early - confirm
          // whether that is acceptable here.
00645     pthread_mutex_lock(&signal_lock_);
00646     pthread_mutex_lock(&mutex_);
00647     pthread_mutex_unlock(&signal_lock_);
00648     //    std::cout << getpid() << " DS::signal from evloop " << std::endl;
00649     pthread_cond_signal(&cond_);
00650     //    std::cout << getpid() << " DS::go to wait for scalers wl " << std::endl;
00651     pthread_cond_wait(&cond_, &mutex_);
00652     pthread_mutex_unlock(&mutex_);
00653     ::usleep(1000);//allow other thread to lock
00654   }  
00655 
00656   void DaqSource::defaultWebPage(xgi::Input *in, xgi::Output *out)
00657   {
00658       std::string path;
00659       std::string urn;
00660       std::string mname;
00661       std::string query;
00662       std::string original_referrer_;
00663       try 
00664         {
00665           cgicc::Cgicc cgi(in);
00666           if ( xgi::Utils::hasFormElement(cgi,"gotostopping") )
00667             {
00668               goToStopping=true;
00669             }
00670           if ( xgi::Utils::hasFormElement(cgi,"module") )
00671             mname = xgi::Utils::getFormElement(cgi, "module")->getValue();
00672           cgicc::CgiEnvironment cgie(in);
00673           if(original_referrer_ == "")
00674             original_referrer_ = cgie.getReferrer();
00675           path = cgie.getPathInfo();
00676           query = cgie.getQueryString();
00677         }
00678       catch (const std::exception & e) 
00679         {
00680           // don't care if it did not work
00681         }
00682 
00683       using std::endl;
00684       *out << "<html>"                                                   << endl;
00685       *out << "<head>"                                                   << endl;
00686 
00687 
00688       *out << "<STYLE type=\"text/css\"> #T1 {border-width: 2px; border: solid blue; text-align: center} </STYLE> "                                      << endl; 
00689       *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00690       *out << " href=\"/" <<  urn
00691            << "/styles.css\"/>"                   << endl;
00692 
00693       *out << "<title>" << moduleName_
00694            << " MAIN</title>"                                            << endl;
00695 
00696       *out << "</head>"                                                  << endl;
00697       *out << "<body onload=\"loadXMLDoc()\">"                           << endl;
00698       *out << "<table border=\"0\" width=\"100%\">"                      << endl;
00699       *out << "<tr>"                                                     << endl;
00700       *out << "  <td align=\"left\">"                                    << endl;
00701       *out << "    <img"                                                 << endl;
00702       *out << "     align=\"middle\""                                    << endl;
00703       *out << "     src=\"/evf/images/bugicon.jpg\""                     << endl;
00704       *out << "     alt=\"main\""                                        << endl;
00705       *out << "     width=\"90\""                                        << endl;
00706       *out << "     height=\"64\""                                       << endl;
00707       *out << "     border=\"\"/>"                                       << endl;
00708       *out << "    <b>"                                                  << endl;
00709       *out <<             moduleName_                                    << endl;
00710       *out << "    </b>"                                                 << endl;
00711       *out << "  </td>"                                                  << endl;
00712       *out << "  <td width=\"32\">"                                      << endl;
00713       *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
00714       *out << "      <img"                                               << endl;
00715       *out << "       align=\"middle\""                                  << endl;
00716       *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << endl;
00717       *out << "       alt=\"HyperDAQ\""                                  << endl;
00718       *out << "       width=\"32\""                                      << endl;
00719       *out << "       height=\"32\""                                     << endl;
00720       *out << "       border=\"\"/>"                                     << endl;
00721       *out << "    </a>"                                                 << endl;
00722       *out << "  </td>"                                                  << endl;
00723       *out << "  <td width=\"32\">"                                      << endl;
00724       *out << "  </td>"                                                  << endl;
00725       *out << "  <td width=\"32\">"                                      << endl;
00726       *out << "    <a href=\"" << original_referrer_  << "\">"           << endl;
00727       *out << "      <img"                                               << endl;
00728       *out << "       align=\"middle\""                                  << endl;
00729       *out << "       src=\"/evf/images/spoticon.jpg\""                  << endl;
00730       *out << "       alt=\"main\""                                      << endl;
00731       *out << "       width=\"32\""                                      << endl;
00732       *out << "       height=\"32\""                                     << endl;
00733       *out << "       border=\"\"/>"                                     << endl;
00734       *out << "    </a>"                                                 << endl;
00735       *out << "  </td>"                                                  << endl;
00736       *out << "</tr>"                                                    << endl;
00737       *out << "</table>"                                                 << endl;
00738 
00739       *out << "<hr/>"                                                    << endl;
00740   
00741       *out << cgicc::form().set("method","GET").set("action", path ) 
00742            << std::endl;
00743       boost::char_separator<char> sep("&");
00744       boost::tokenizer<boost::char_separator<char> > tokens(query, sep);
00745       for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
00746            tok_iter != tokens.end(); ++tok_iter){
00747         size_t pos = (*tok_iter).find_first_of("=");
00748         if(pos != std::string::npos){
00749           std::string first  = (*tok_iter).substr(0    ,                        pos);
00750           std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
00751           *out << cgicc::input().set("type","hidden").set("name",first).set("value", second) 
00752                << std::endl;
00753         }
00754       }
00755 
00756       *out << cgicc::input().set("type","hidden").set("name","gotostopping").set("value","true")
00757            << std::endl;
00758       *out << cgicc::input().set("type","submit").set("value","Go To Stopping")              << std::endl;
00759       *out << cgicc::form()                                                << std::endl;  
00760 
00761       *out << "</body>"                                                  << endl;
00762       *out << "</html>"                                                  << endl;
00763   }
00764 }