CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_10/src/EventFilter/EcalRawToDigi/plugins/MatacqProducer.cc

Go to the documentation of this file.
00001 #include "EventFilter/EcalRawToDigi/interface/MatacqProducer.h"
00002 #include "EventFilter/EcalRawToDigi/src/Majority.h"
00003 #include "FWCore/Framework/interface/Event.h"
00004 #include "FWCore/Framework/interface/EventSetup.h"
00005 #include "Geometry/Records/interface/IdealGeometryRecord.h"
00006 #include "FWCore/Framework/interface/ESHandle.h"
00007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00008 #include "DataFormats/FEDRawData/interface/FEDRawData.h"
00009 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00010 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00011 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
00012 #include "DataFormats/EcalDigi/interface/EcalDigiCollections.h"
00013 #include <boost/algorithm/string.hpp>
00014 #include <boost/format.hpp>
00015 
00016 #include <memory>
00017 
00018 #include <stdio.h>
00019 
00020 #include <fstream>
00021 #include <iomanip>
00022 
00023 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
00024 
00025 #include <sys/types.h>
00026 #include <unistd.h>
00027 #include <signal.h>
00028 #include <sys/stat.h>
00029 
00030 using namespace std;
00031 using namespace boost;
00032 using namespace edm;
00033 
00034 // #undef LogInfo
00035 // #define LogInfo(a) cout << "INFO " << a << ": "
00036 // #undef LogWarning
00037 // #define LogWarning(a) cout << "WARN " << a << ": "
00038 // #undef LogDebug
00039 // #define LogDebug(a) cout << "DBG " << a << ": "
00040 
00041 
00042 //verbose mode for matacq event retrieval debugging:
00043 //static const bool searchDbg = false;
00044 
00045 //laser freq is 1 every 112 orbit => >80 orbit
00046 int MatacqProducer::orbitTolerance_ = 80; 
00047 
00048 MatacqProducer::stats_t MatacqProducer::stats_init = {0,0,0};
00049 
//Returns the current local time as a string of the form
//"YYYY-MM-DD HH:MM SS s mmm ms", used as prefix of the log messages.
//The millisecond field is truncated (not rounded) so that it always stays
//in the [0, 999] range. If the time cannot be converted, the date part is
//left empty and only the millisecond field is returned.
static std::string now(){
  struct timeval t;
  gettimeofday(&t, 0);

  char buf[256];
  buf[0] = 0;

  //localtime_r is used instead of localtime, which returns a pointer to a
  //shared static buffer.
  const time_t seconds = t.tv_sec;
  struct tm localTime;
  if(localtime_r(&seconds, &localTime) == 0
     || strftime(buf, sizeof(buf), "%F %R %S s", &localTime) == 0){
    //on failure the buffer content is unspecified: do not print it.
    buf[0] = 0;
  }

  std::stringstream buf2;
  buf2 << buf << " " << (t.tv_usec/1000)  << " ms";

  return buf2.str();
}
00063 
00064 
00065 MatacqProducer::MatacqProducer(const edm::ParameterSet& params):
00066   fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
00067   digiInstanceName_(params.getParameter<string>("digiInstanceName")),
00068   rawInstanceName_(params.getParameter<string>("rawInstanceName")),
00069   timing_(params.getUntrackedParameter<bool>("timing", false)),
00070   disabled_(params.getParameter<bool>("disabled")),
00071   verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
00072   produceDigis_(params.getParameter<bool>("produceDigis")),
00073   produceRaw_(params.getParameter<bool>("produceRaw")),
00074   inputRawCollection_(params.getParameter<InputTag>("inputRawCollection")),
00075   mergeRaw_(params.getParameter<bool>("mergeRaw")),
00076   ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
00077   matacq_(0, 0),
00078   inFile_(0),
00079   data_(bufferSize),
00080   openedFileRunNumber_(0),
00081   lastOrb_(0),
00082   fastRetrievalThresh_(0),
00083   orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile",
00084                                                              "")),
00085   inFileName_(""),
00086   stats_(stats_init),
00087   logFileName_(params.getUntrackedParameter<std::string>("logFileName",
00088                                                          "matacqProducer.log")),
00089   eventSkipCounter_(0),
00090   onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
00091   timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
00092   runNumber_(0)
00093 {
00094   if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer ctor"  << endl;
00095   
00096   gettimeofday(&timer_, 0);
00097 
00098   if(timeLogFile_.size()>0){
00099     timeLog_.open(timeLogFile_.c_str());
00100     if(timeLog_.fail()){
00101       cout << "[LaserSorter " << now() << "] "
00102            << "Failed to open file " << timeLogFile_ << " to log timing.\n";
00103       logTiming_ = false;
00104     } else{
00105       logTiming_ = true;
00106     }
00107   }
00108   
00109   posEstim_.verbosity(verbosity_);
00110 
00111   logFile_.open(logFileName_.c_str(), ios::app | ios::out);
00112   
00113   if(logFile_.bad()){
00114     throw cms::Exception("FileOpen") << "Failed to open file "
00115                                      << logFileName_ << " for logging.\n";
00116   }
00117 
00118   if(produceDigis_){
00119     if(verbosity_>0) cout << "[Matacq " << now() << "] registering new "
00120                        "EcalMatacqDigiCollection product with instance name '"
00121                           << digiInstanceName_ << "'\n";
00122     produces<EcalMatacqDigiCollection>(digiInstanceName_);
00123   }
00124   
00125   if(produceRaw_){
00126     if(verbosity_>0) cout << "[Matacq " << now() << "] registering new FEDRawDataCollection "
00127                        "product with instance name '"
00128                           << rawInstanceName_ << "'\n";
00129     produces<FEDRawDataCollection>(rawInstanceName_);
00130   }
00131   
00132   startTime_.tv_sec = startTime_.tv_usec = 0;
00133   if(orbitOffsetFile_.size()>0){
00134     doOrbitOffset_ = true;
00135     loadOrbitOffset();
00136   } else{
00137     doOrbitOffset_ = false;
00138   }
00139   if(verbosity_>=4) cout << "[Matacq " << now() << "] exiting MatacqProducer ctor"  << endl;
00140 }
00141 
00142 
00143 void
00144 MatacqProducer::produce(edm::Event& event, const edm::EventSetup& eventSetup){
00145   if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer::produce"  << endl;
00146   if(logTiming_){
00147     timeval t;
00148     gettimeofday(&t, 0);
00149 
00150     timeLog_ << t.tv_sec << "."
00151              << setfill('0') << setw(3) << (t.tv_usec+500)/1000 << setfill(' ')<< "\t"
00152              << (t.tv_usec - timer_.tv_usec)*1. 
00153       + (t.tv_sec - timer_.tv_sec)*1.e6 << "\t";
00154     timer_ = t;
00155   } 
00156  
00157   if(startTime_.tv_sec==0) gettimeofday(&startTime_, 0);
00158   ++stats_.nEvents;  
00159   if(disabled_) return;
00160   const uint32_t runNumber = getRunNumber(event);
00161   if(runNumber!=runNumber_){
00162     newRun(runNumber_, runNumber);
00163   }
00164   addMatacqData(event);
00165 
00166   if(logTiming_){
00167     timeval t;
00168       gettimeofday(&t, 0);
00169       timeLog_ << (t.tv_usec - timer_.tv_usec)*1. 
00170         + (t.tv_sec - timer_.tv_sec)*1.e6 << "\n";
00171       timer_ = t;
00172   }
00173 }
00174 
00175 void
00176 MatacqProducer::addMatacqData(edm::Event& event){
00177   edm::Handle<FEDRawDataCollection> sourceColl;
00178   if(inputRawCollection_.label().size() == 0
00179      && inputRawCollection_.instance().size() == 0){
00180     event.getByType(sourceColl);
00181   } else{
00182     event.getByLabel(inputRawCollection_, sourceColl);
00183   }
00184   
00185   std::auto_ptr<FEDRawDataCollection> rawColl;
00186   if(produceRaw_){
00187     if(mergeRaw_){
00188       rawColl = auto_ptr<FEDRawDataCollection>(new FEDRawDataCollection(*sourceColl));
00189     } else{
00190       rawColl = auto_ptr<FEDRawDataCollection>(new FEDRawDataCollection());
00191     }
00192   }
00193   
00194   std::auto_ptr<EcalMatacqDigiCollection>
00195     digiColl(new EcalMatacqDigiCollection());
00196   
00197   if(eventSkipCounter_==0){
00198     if(sourceColl->FEDData(matacqFedId_).size()>4 && !produceRaw_){
00199       //input raw data collection already contains matacqData
00200       formatter_.interpretRawData(sourceColl->FEDData(matacqFedId_),
00201                                   *digiColl);             
00202     } else{
00203       bool isLaserEvent = (getCalibTriggerType(event) == laserType);
00204 
00205 
00206       //      cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
00207       
00208       if(isLaserEvent || ignoreTriggerType_){
00209         
00210         const uint32_t runNumber = getRunNumber(event);
00211         const uint32_t orbitId   = getOrbitId(event);
00212       
00213         LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
00214       
00215         bool fileChange;
00216         if(doOrbitOffset_){
00217           map<uint32_t,uint32_t>::iterator it = orbitOffset_.find(runNumber);
00218           if(it == orbitOffset_.end()){
00219             LogWarning("Matacq") << "Orbit offset not found for run "
00220                                  << runNumber
00221                                  << ". No orbit correction will be applied.";
00222     }
00223         }    
00224       
00225         if(getMatacqFile(runNumber, orbitId, &fileChange)){
00226           //matacq file retrieval succeeded
00227           LogInfo("Matacq") << "Matacq data file found for "
00228                             << "run " << runNumber << " orbit " << orbitId;
00229           if(getMatacqEvent(runNumber, orbitId, fileChange)){
00230             if(produceDigis_){
00231               formatter_.interpretRawData(matacq_, *digiColl);
00232             }
00233             if(produceRaw_){
00234               uint32_t dataLen64 = matacq_.getParsedLen();
00235               if(dataLen64 > bufferSize*8 || matacq_.getDccLen()!= dataLen64){
00236                 LogWarning("Matacq") << " Error in Matacq event fragment length! "
00237                                      << "DCC len: " << matacq_.getDccLen()
00238                                      << "*8 Bytes, Parsed len: "
00239                                      << matacq_.getParsedLen() << "*8 Bytes.  "
00240                                      << "Matacq data will not be included for this event.\n";
00241               } else{
00242                 rawColl->FEDData(matacqFedId_).resize(dataLen64*8);
00243                 copy(data_.begin(), data_.begin() + dataLen64*8,
00244                      rawColl->FEDData(matacqFedId_).data());
00245               }
00246             }
00247             LogInfo("Matacq") << "Associating matacq data with orbit id "
00248                               << matacq_.getOrbitId()
00249                               << " to dcc event with orbit id "
00250                               << orbitId << std::endl;
00251             if(isLaserEvent){
00252               ++stats_.nLaserEventsWithMatacq;
00253             } else{
00254               ++stats_.nNonLaserEventsWithMatacq;
00255             }
00256           } else{
00257             if(isLaserEvent){
00258               LogWarning("Matacq") << "No matacq data found for laser event "
00259                                    << "of run " << runNumber << " orbit "
00260                                    << orbitId;
00261             }
00262           }
00263         } else{
00264           LogWarning("Matacq") << "No matacq file found for event "
00265                                << event.id();
00266         }
00267       }
00268     }
00269     if(eventSkipCounter_>0){ //error occured for this events
00270       //                       and some events will be skipped following
00271       //                       to this error.
00272       LogInfo("Matacq") << " [" << now() << "] "
00273                         << eventSkipCounter_
00274                         << " next events will be skipped, following to an "
00275                         << "error on the last processed event, "
00276                         << "which is expected to be persistant.";
00277     }
00278   } else{
00279     --eventSkipCounter_;
00280   }
00281   
00282   if(produceRaw_){
00283     if(verbosity_>1) cout << "[Matacq " << now() << "] "
00284                           << "Adding FEDRawDataCollection collection "
00285                           << " to event.\n";
00286     event.put(rawColl, rawInstanceName_);
00287   }
00288 
00289   if(produceDigis_){
00290     if(verbosity_>1) cout << "[Matacq " << now() << "] "
00291                           << "Adding EcalMatacqDigiCollection collection "
00292                           << " to event.\n";
00293     event.put(digiColl, digiInstanceName_);
00294   }
00295 }
00296 
00297 // #if 0
00298 // bool
00299 // MatacqProducer::getMatacqEvent(std::ifstream& f,
00300 //                             uint32_t runNumber,
00301 //                             uint32_t orbitId,
00302 //                             bool doWrap,
00303 //                             std::streamoff maxPos){
00304 //   bool found = false;
00305 //   streampos startPos = f.tellg();
00306 
00307 //   while(!f.eof()
00308 //      && !found
00309 //      && (maxPos<0 || f.tellg()<=maxPos)){
00310 //     const streamsize headerSize = 8*8;
00311 //     f.read((char*)&data_[0], headerSize);
00312 //     if(f.eof()) break;
00313 //     int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00314 //     uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
00315 //     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
00316 //     //     cout << "Matacq: orbit = " << orb
00317 //     //        << " len = " << len
00318 //     //        << " run = " << run << endl;
00319 //     if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
00320 //        && (runNumber==0 || runNumber==run)){
00321 //       found = true;
00322 //       //reads the rest of the event:
00323 //       if(data_.size() < len*8){
00324 //      throw cms::Exception("Matacq") << "Buffer overflow";
00325 //       }
00326 //       f.read((char*)&data_[0]+headerSize, len*8-headerSize);
00327 //       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
00328 //     } else{
00329 //       //moves to next event:
00330 //       f.seekg(len*8 - headerSize, ios::cur);
00331 //     }
00332 //   }
00333   
00334 //   f.clear(); //clears eof error to allow seekg  
00335 //   if(doWrap && !found){
00336 //     f.seekg(0, ios::beg);
00337 //     found =  getMatacqEvent(f, runNumber, orbitId, false, startPos);
00338 //   }
00339 //   return found;
00340 // }
00341 //#endif
00342 
00343 bool
00344 MatacqProducer::getMatacqEvent(uint32_t runNumber,
00345                                int32_t orbitId,
00346                                bool fileChange){
00347   filepos_t startPos;
00348   if(!mtell(startPos)) return false;
00349   
00350   int32_t startOrb = -1;
00351   const size_t headerSize = 8*8;
00352   if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00353     startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00354     if(startOrb<0) startOrb = 0;
00355   } else{
00356     if(verbosity_>2){
00357       cout << "[Matacq " << now() << "] Failed to read matacq header. Moved to start of "
00358         " the file.\n";
00359     }
00360     mrewind();
00361     if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00362       startPos = 0;
00363       startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00364     } else{
00365       if(verbosity_>2) cout << "[Matacq " << now() << "] Looks like matacq file is empty"
00366                             << "\n";
00367       return false;
00368     }
00369   }
00370   
00371   if(verbosity_>2) cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_
00372                         << " looking for orbit " << orbitId
00373                         << ". Current file position: " << startPos
00374                         << " Orbit at current position: " << startOrb << "\n";
00375   
00376   //  f.clear();
00377   bool didCoarseMove = false;
00378 
00379   //FIXME: case where posEtim_.invalid() is false
00380   if(!posEstim_.invalid()
00381      && (abs(lastOrb_-orbitId) > fastRetrievalThresh_)){
00382     filepos_t pos = posEstim_.pos(orbitId);
00383 
00384     //    struct stat st;
00385     filepos_t fsize;
00386     //    if(0==stat(inFileName_.c_str(), &st)){
00387     if(msize(fsize)){
00388       //      const int64_t fsize = st.st_size;
00389       if(0!=posEstim_.eventLength() && pos > fsize){
00390         //estimated position is beyong end of file
00391         //-> move to beginning of last event:
00392         int64_t evtSize = posEstim_.eventLength()*sizeof(uint64_t);
00393         pos = ((int64_t)fsize/evtSize-1)*evtSize;
00394         if(verbosity_>2){
00395           cout << "[Matacq " << now() << "] Estimated position was beyond end of file. "
00396             "Changed to " << pos << "\n";
00397         }
00398       }
00399     } else{
00400       LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
00401     }
00402     if(pos>=0){
00403       if(verbosity_>2) cout << "[Matacq " << now() << "] jumping to estimated position "
00404                             << pos << "\n";
00405       mseek(pos, SEEK_SET, "Jumping to estimated event position");
00406       if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00407         didCoarseMove = true;
00408       } else{
00409         //estimated position might have been beyond the end of the file,
00410         //try, with original position:
00411         didCoarseMove = false;
00412         if(!mread((char*)&data_[0], headerSize, "Reading event header", true)){
00413           return false;
00414         }
00415       }
00416     } else{
00417       if(verbosity_) cout << "[Matacq " << now() << "] Event orbit outside of orbit range "
00418                        "of matacq data file events\n";
00419       return false;
00420     }
00421   }
00422   
00423   int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00424 
00425   if(didCoarseMove){
00426     //autoadjustement of threshold for coarse move:
00427     if(abs(orb-orbitId) > fastRetrievalThresh_){
00428       if(verbosity_>2) cout << "[Matacq " << now() << "] Fast retrieval threshold increased from "
00429                             << fastRetrievalThresh_;
00430       fastRetrievalThresh_ = 2*abs(orb-orbitId);
00431       if(verbosity_>2) cout << " to " << fastRetrievalThresh_ << "\n";
00432     }
00433 
00434     //if coarse move did not improve situation, rolls back:
00435     if(startOrb > 0
00436        && (abs(orb-orbitId) > abs(startOrb-orbitId))){
00437       if(verbosity_>2) cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb << ") "
00438                          "was worst than original position (-> orbit "
00439                             << startOrb
00440                             << "). Restoring position (" << startPos << ").\n";
00441       mseek(startPos, SEEK_SET);
00442       mread((char*)&data_[0], headerSize, "Reading event header", true);
00443       orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00444     }
00445   }
00446   
00447   bool searchBackward = (orb>orbitId)?true:false;
00448   //BEWARE: len must be signed, because we are using latter in the code (-len)
00449   //expression
00450   int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
00451 
00452   if(len==0){
00453     cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
00454          << " and move matacq file pointer to beginning of the file. "
00455          << "(" << __FILE__ << ":" << __LINE__ << ")."
00456          << "\n";
00457     //rewind(f);
00458     mrewind();
00459     return false;
00460   }
00461   
00462   enum state_t { searching, found, failed } state = searching;
00463   
00464   while(state == searching){
00465     orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00466     len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
00467     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
00468     if(verbosity_>3){
00469       filepos_t pos;
00470       mtell(pos);
00471       cout << "[Matacq " << now() << "] Header read at file position "
00472            << pos
00473            << ":  orbit = " << orb
00474            << " len = " << len << "x8 Byte"
00475            << " run = " << run << "\n";
00476     }
00477     if((abs(orb-orbitId) < orbitTolerance_)
00478        && (runNumber==0 || runNumber==run)){
00479       state = found;
00480       lastOrb_ = orb;
00481       //reads the rest of the event:
00482       if((int)data_.size() < len*8){
00483         throw cms::Exception("Matacq") << "Buffer overflow";
00484       }
00485       if(verbosity_>2) cout << "[Matacq " << now() << "] Event found. Reading "
00486                          " matacq event." << "\n";
00487       if(!mread((char*)&data_[0], len*8, "Reading matacq event")){
00488         if(verbosity_>2) cout << "[Matacq " << now() << "] Failed to read matacq event."
00489                               << "\n";
00490         state = failed;
00491       }
00492       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
00493     } else {
00494       if((searchBackward && (orb < orbitId))
00495          || (!searchBackward && (orb > orbitId))){ //search ended
00496         lastOrb_ = orb;      
00497         state = failed;
00498         if(verbosity_>2) cout << "[Matacq " << now()
00499                             << "] No matacq data found for run " << run
00500                             << ", orbit ID " << orbitId << "." << "\n";
00501       } else{
00502         off_t offset = (searchBackward?-len:len)*8;
00503         lastOrb_ = orb; 
00504         if(verbosity_>3){
00505           cout << "[Matacq " << now() << "] In matacq file, moving "
00506                << abs(offset) << " byte " << (offset>0?"forward":"backward")
00507                << ".\n";
00508         }       
00509 
00510         if(mseek(offset, SEEK_CUR,
00511                  (searchBackward?"Moving to previous event":
00512                   "Moving to next event"))
00513            && mread((char*)&data_[0], headerSize, "Reading event header",
00514                     true)){
00515         } else{
00516           if(!searchBackward) mseek(-len*8, SEEK_CUR,
00517                                     "Moving to start of last complete event");
00518           state = failed;
00519         }
00520       }
00521     }
00522   }
00523   
00524   if(state==found){
00525     filepos_t pos;
00526     filepos_t fsize;
00527     mtell(pos);
00528     msize(fsize);
00529     if(pos==fsize-1){ //last byte.
00530       if(verbosity_>2){
00531         cout << "[Matacq " << now() << "] Event found was at the end of the file. Moving "
00532           "stream position to beginning of this event."
00533              << "\n";
00534       }
00535       mseek(-(int)len*8-1, SEEK_CUR,
00536             "Moving to beginning of last matacq event");
00537     }
00538   }
00539   return (state==found);
00540 }
00541 
00542 
00543 bool
00544 MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId,
00545                               bool* fileChange){
00546   if(openedFileRunNumber_!=0
00547      && openedFileRunNumber_==runNumber){
00548     if(fileChange!=0) *fileChange = false;
00549     return misOpened();
00550   }
00551 
00552   if(fileNames_.size()==0) return 0;
00553 
00554   const string runNumberFormat = "%08d";
00555   string sRunNumber = str(boost::format(runNumberFormat) % runNumber);
00556   //cout << "Run number string: " << sRunNumber << "\n";
00557   bool found = false;
00558   string fname;
00559   for(unsigned i=0; i < fileNames_.size() && !found; ++i){
00560     fname = fileNames_[i];
00561     boost::algorithm::replace_all(fname, "%run_subdir%",
00562                                   runSubDir(runNumber));
00563     boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
00564 
00565     if(verbosity_>0) cout << "[Matacq " << now() << "] "
00566                           << "Looking for a file with path "
00567                           << fname << "\n";
00568     
00569     if(mcheck(fname)){
00570       LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
00571       found = true;
00572     }
00573   }
00574   if(!found){
00575     if(verbosity_>=0) cout << "[Matacq " << now() << "] no matacq file found "
00576                         "for run " << runNumber << "\n";
00577     eventSkipCounter_ = onErrorDisablingEvtCnt_;
00578     openedFileRunNumber_ = 0;
00579     if(fileChange!=0) *fileChange = false;
00580     return 0;
00581   }
00582   
00583   if(!mopen(fname)){
00584     LogWarning("Matacq") << "Failed to open file " << fname << "\n";
00585     eventSkipCounter_ = onErrorDisablingEvtCnt_;
00586     openedFileRunNumber_ = 0;
00587     if(fileChange!=0) *fileChange = false;
00588     return false;
00589   } else{
00590     openedFileRunNumber_ = runNumber;
00591     lastOrb_ = 0;
00592     posEstim_.init(this);
00593     if(fileChange!=0) *fileChange = true;
00594     return true;
00595   }
00596 }
00597  
00598 
00599 uint32_t MatacqProducer::getRunNumber(edm::Event& ev) const{
00600   return ev.run();
00601 }
00602 
00603 uint32_t MatacqProducer::getOrbitId(edm::Event& ev) const{
00604   //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
00605   //we could use here. The code would be shorten to:
00606   //return ev.orbitNumber();
00607   //we have to deal with what we have in current CMSSW releases:
00608   edm::Handle<FEDRawDataCollection> rawdata;
00609   if(!(ev.getByType(rawdata) && rawdata.isValid())){
00610     throw cms::Exception("NotFound")
00611       << "No FED raw data collection found. ECAL raw data are "
00612       "required to retrieve the orbit ID";
00613   }
00614   
00615   int orbit = 0;
00616   for(int id=601; id<=654; ++id){
00617     if(!FEDNumbering::inRange(id)) continue;
00618     const FEDRawData& data = rawdata->FEDData(id);
00619     const int orbitIdOffset64 = 3;
00620     if(data.size()>=8*(orbitIdOffset64+1)){//orbit id is in 4th 64-bit word
00621       const unsigned char* pOrbit = data.data() + orbitIdOffset64*8;
00622       int thisOrbit = pOrbit[0]
00623         | (pOrbit[1] <<8)
00624         | (pOrbit[2] <<16)
00625         | (pOrbit[3] <<24);
00626       if(orbit!=0 && thisOrbit!=0 && abs(orbit-thisOrbit)>orbitTolerance_){
00627         //throw cms::Exception("EventCorruption")
00628         //  << "Orbit ID inconsitency in DCC headers";
00629         LogWarning("EventCorruption")
00630           << "Orbit ID inconsitency in DCC headers";
00631         orbit = 0;
00632         break;
00633       }
00634       if(thisOrbit!=0) orbit = thisOrbit;
00635     }
00636   }
00637   
00638   if(orbit==0){
00639     //    throw cms::Exception("NotFound")
00640     //  << "Failed to retrieve orbit ID of event "<< ev.id();
00641     LogWarning("NotFound") << "Failed to retrieve orbit ID of event "
00642                                 << ev.id();
00643   }
00644   return orbit;
00645 }
00646  
00647 int MatacqProducer::getCalibTriggerType(edm::Event& ev) const{  
00648   edm::Handle<FEDRawDataCollection> rawdata;
00649   if(!(ev.getByType(rawdata) && rawdata.isValid())){
00650     throw cms::Exception("NotFound")
00651       << "No FED raw data collection found. ECAL raw data are "
00652       "required to retrieve the trigger type";
00653   }
00654   
00655   Majority<int> stat;
00656   for(int id=601; id<=654; ++id){
00657     if(!FEDNumbering::inRange(id)) continue;
00658     const FEDRawData& data = rawdata->FEDData(id);
00659     const int detailedTrigger32 = 5;
00660     if(data.size()>=4*(detailedTrigger32+1)){
00661       const unsigned char* pTType = data.data() + detailedTrigger32*4;
00662       int tType = pTType[1] & 0x7;
00663       stat.add(tType);
00664     }
00665   }
00666   double p;
00667   int tType = stat.result(&p);
00668   if(p<0){
00669     //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
00670     LogWarning("NotFound")  << "No ECAL DCC data found\n";
00671     tType = -1;
00672   }
00673   if(p<.8){
00674     //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
00675     LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
00676     tType = -1;
00677   }
00678   return tType;
00679 }
00680   
00681 void MatacqProducer::PosEstimator::init(MatacqProducer* mp){
00682   mp->mrewind();
00683 
00684   const size_t headerSize = 8*8;
00685   unsigned char data[headerSize];
00686   if(!mp->mread((char*)data, headerSize)){
00687     if(verbosity_) cout << "[Matacq " << now() << "] reached end of file!\n"; 
00688     firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
00689     return;
00690   } else{
00691     firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
00692     eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
00693     if(verbosity_>1) cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_
00694                           << " event length: " << eventLength_
00695                           << "*8 byte\n";
00696   }
00697 
00698   mp->mrewind();
00699   
00700   if(eventLength_==0){    
00701     if(verbosity_) cout << "[Matacq " << now() << "] event length is null!" << endl; 
00702     return;
00703   }
00704 
00705   filepos_t s;
00706   mp->msize(s);
00707 
00708   //number of complete events:
00709   const unsigned nEvents = s/eventLength_/8;
00710 
00711   if(nEvents==0){
00712     if(verbosity_) cout << "[Matacq " << now() << "] File is empty!" << endl;
00713     orbitStepMean_ = 0;
00714     return;
00715   }
00716 
00717   if(verbosity_>1) cout << "[Matacq " << now() << "] File size: " << s
00718                         << " Number of events: " << nEvents << endl;
00719   
00720   //position of last complete events:
00721   off_t last = (nEvents-1)*(off_t)eventLength_*8;
00722   mp->mseek(last, SEEK_SET, "Moving to beginning of last complete "
00723             "matacq event");
00724   if(!mp->mread((char*) data, headerSize, "Reading matacq header", true)){
00725     LogWarning("Matacq") << "Fast matacq event retrieval failure. "
00726       "Falling back to safe retrieval mode.";
00727     orbitStepMean_ = 0;
00728   }
00729   
00730   int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
00731   int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
00732 
00733   if(verbosity_>1) cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb
00734                         << " last event length: " << lastLen << endl;
00735   
00736   //some consistency check
00737   if(lastLen!=eventLength_){
00738     LogWarning("Matacq")
00739       //throw cms::Exception("Matacq")
00740       << "Fast matacq event retrieval failure: it looks like "
00741       "the matacq file contains events of different sizes.";
00742       //      " Falling back to safe retrieval mode.";
00743     invalid_ = false; //true;
00744     orbitStepMean_ = 112; //0;
00745     return;
00746   }
00747 
00748   orbitStepMean_ = (lastOrb - firstOrbit_)/nEvents;
00749   
00750   if(verbosity_>1) cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_
00751                         << "\n";
00752 
00753   invalid_ = false;
00754 }
00755 
00756 int64_t MatacqProducer::PosEstimator::pos(int orb) const{
00757   if(orb<firstOrbit_) return -1;
00758   uint64_t r = orbitStepMean_!=0?
00759     (((uint64_t)(orb-firstOrbit_))/orbitStepMean_)*eventLength_*8
00760     :0;
00761   if(verbosity_>2) cout << "[Matacq " << now() << "] Estimated Position for orbit  " << orb
00762                         << ": " << r << endl;
00763   return r;
00764 }
00765 
00766 MatacqProducer::~MatacqProducer(){
00767   mclose();
00768   timeval t;
00769   gettimeofday(&t, 0);
00770   if(logTiming_ && startTime_.tv_sec!=0){
00771     //not using logger, to allow timing with different logging options
00772     cout << "[Matacq " << now() << "] Time elapsed between first event and "
00773       "destruction of MatacqProducer: "
00774          << ((t.tv_sec-startTime_.tv_sec)*1.
00775              + (t.tv_usec-startTime_.tv_usec)*1.e-6) << "s\n";
00776   }
00777 }
00778 
00779 void MatacqProducer::loadOrbitOffset(){
00780   ifstream f(orbitOffsetFile_.c_str());
00781   if(f.bad()){
00782     throw cms::Exception("Matacq")
00783       << "Failed to open orbit ID correction file '"
00784       << orbitOffsetFile_ << "'\n";
00785   }
00786 
00787   cout << "[Matacq " << now() << "] "
00788        << "Offset to substract to Matacq events Orbit ID: \n"
00789        << "#Run Number\t Offset\n";
00790 
00791   int iline = 0;
00792   string s;
00793   stringstream buf;
00794   while(f.eof()){
00795     getline(f, s);
00796     ++iline;
00797     if(s[0]=='#'){//comment
00798       //skip line:
00799       f.ignore(numeric_limits<streamsize>::max(), '\n');
00800       continue;
00801     }
00802     buf.str("");
00803     buf << s;
00804     int run;
00805     int orbit;
00806     buf >> run;
00807     buf >> orbit;
00808     if(buf.bad()){
00809       throw cms::Exception("Matacq")
00810         << "Syntax error in Orbit offset file '"
00811         << orbitOffsetFile_ << "'";
00812     }
00813     cout << run << "\t" << orbit << "\n";
00814     orbitOffset_.insert(pair<int, int>(run, orbit));
00815   }
00816 }
00817 
00818 #ifdef USE_STORAGE_MANAGER
00819 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess){
00820   if(0==inFile_.get()) return false;
00821   try{
00822     Storage::Relative wh;
00823     if(whence==SEEK_SET) wh = Storage::SET;
00824     else if(whence==SEEK_CUR) wh = Storage::CURRENT;
00825     else if(whence==SEEK_END) wh = Storage::END;
00826     else throw cms::Exception("Bug") << "Bug found in "
00827                                      << __FILE__ << ": "<< __LINE__ << "\n";
00828                    
00829     inFile_->position(offset, wh);
00830   } catch(cms::Exception& e){
00831     if(verbosity_){
00832       cout << "[Matacq " << now() << "] ";
00833       if(mess) cout << mess << ". ";
00834       cout << "Random access error on input matacq file. ";
00835       if(whence==SEEK_SET) cout << "Failed to seek absolute position " << offset;
00836       else if(whence==SEEK_CUR) cout << "Failed to move " << offset << " bytes forward";
00837       else if(whence==SEEK_END) cout << "Failed to seek position at " << offset << " bytes before end of file";
00838         cout << ". Reopening file. " << e.what() << "\n";
00839       mopen(inFileName_);
00840       return false;
00841     }
00842   }
00843   return true;
00844 }
00845 
00846 bool MatacqProducer::mtell(filepos_t& pos){
00847   if(0==inFile_.get()) return false;
00848   pos = inFile_->position();
00849   return true;
00850 }
00851 
00852 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
00853   if(0==inFile_.get()) return false;
00854   
00855   filepos_t pos;
00856   if(!mtell(pos)) return false;
00857 
00858   bool rc = false;
00859   try{
00860     rc =  (n==inFile_->xread(buf, n));
00861   } catch(cms::Exception& e){
00862     if(verbosity_){
00863       cout << "[Matacq " << now() << "] ";
00864       if(mess) cout << mess << ". ";
00865       cout << "Read failure from input matacq file: "
00866            << e.what() << "\n";
00867     }
00868     //recovering from error:
00869     mopen(inFileName_);
00870     mseek(pos);
00871     return false;
00872   }
00873   if(peek){//asked to restore original file position
00874     mseek(pos);
00875   }
00876   return rc;
00877 }
00878 
00879 bool MatacqProducer::msize(filepos_t& s){
00880   if(inFile_.get()==0) return false;
00881   s = inFile_.get()->size();
00882   return true;
00883 }
00884 
00885 bool MatacqProducer::mrewind(){
00886   Storage* file = inFile_.get();
00887   if(file==0) return false;
00888   try{
00889     file->rewind();
00890   } catch(cms::Exception e){
00891     if(verbosity_) cout << "Exception cautgh while rewinding file "
00892                         << inFileName_ << ": " << e.what() << ". "
00893                         << "File will be reopened.";
00894     return mopen(inFileName_);
00895   }
00896   return true;
00897 }
00898 
00899 bool MatacqProducer::mcheck(const std::string& name){
00900   return StorageFactory::get()->check(name);
00901 }
00902 
00903 bool MatacqProducer::mopen(const std::string& name){
00904   //close already opened file if any:
00905   mclose();
00906   
00907   try{
00908     inFile_
00909       = auto_ptr<Storage>(StorageFactory::get()->open(name,
00910                                                       IOFlags::OpenRead));
00911     inFileName_ = name;
00912   } catch(cms::Exception& e){
00913     LogWarning("Matacq") << e.what();
00914     inFile_.reset();
00915     inFileName_ = "";
00916     return false;
00917   }
00918   return true;
00919 }
00920 
00921 void MatacqProducer::mclose(){
00922   if(inFile_.get()!=0){
00923     inFile_->close();
00924     inFile_.reset();
00925   }
00926 }
00927 
00928 bool MatacqProducer::misOpened(){
00929   return inFile_.get()!=0;
00930 }
00931 
00932 bool MatacqProducer::meof(){
00933   if(inFile_.get()==0) return true;
00934   return inFile_->eof();
00935 }
00936 
00937 #else //USE_STORAGE_MANAGER not defined
00938 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess){
00939   if(0==inFile_) return false;
00940   const int rc = fseeko(inFile_, offset, whence);
00941   if(rc!=0 && verbosity_){
00942     cout << "[Matacq " << now() << "] ";
00943     if(mess) cout << mess << ". ";
00944     cout << "Random access error on input matacq file. "
00945       "Rewind file.\n";
00946     mrewind();
00947   }
00948   return rc==0;
00949 }
00950 
00951 bool MatacqProducer::mtell(filepos_t& pos){
00952   if(0==inFile_) return false;
00953   pos = ftello(inFile_);
00954   return pos != -1;
00955     
00956 }
00957 
00958 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
00959   if(0==inFile_) return false;
00960   off_t pos = ftello(inFile_);
00961   bool rc = (pos!=-1) && (1==fread(buf, n, 1, inFile_));
00962   if(!rc){
00963     if(verbosity_){
00964       cout << "[Matacq " << now() << "] ";
00965       if(mess) cout << mess << ". ";
00966       cout << "Read failure from input matacq file.\n";
00967     }
00968     clearerr(inFile_);
00969   }
00970   if(peek || !rc){//need to restore file position
00971     if(0!=fseeko(inFile_, pos, SEEK_SET)){
00972       if(verbosity_){
00973         cout << "[Matacq " << now() << "] ";
00974         if(mess) cout << mess << ". ";
00975         cout << "Failed to restore file position of "
00976           "before read error. Rewind file.\n";
00977       }
00978       //rewind(inFile_.get());
00979       mrewind();
00980       lastOrb_ = 0;
00981     }
00982   }
00983   return rc;
00984 }
00985 
00986 bool MatacqProducer::msize(filepos_t& s){
00987   if(0==inFile_) return false;
00988   struct stat buf;
00989   if(0!=fstat(fileno(inFile_), &buf)){
00990     s = 0;
00991     return false;
00992   } else{
00993     s = buf.st_size;
00994     return true;
00995   }
00996 }
00997 
00998 bool MatacqProducer::mrewind(){
00999   if(0==inFile_) return false;
01000   clearerr(inFile_);
01001   return fseeko(inFile_, 0, SEEK_SET)!=0; 
01002 }
01003 
01004 bool MatacqProducer::mcheck(const std::string& name){
01005   struct stat dummy;
01006   return 0==stat(name.c_str(), &dummy);
01007 //   if(stat(name.c_str(), &dummy)==0){
01008 //     return true;
01009 //   } else{
01010 //     cout << "[Matacq " << now() << "] Failed to stat file '"
01011 //       << name.c_str() << "'. " 
01012 //       << "Error " << errno << ": " << strerror(errno) << "\n";
01013 //     return false;
01014 //   }
01015 }
01016 
01017 bool MatacqProducer::mopen(const std::string& name){
01018   if(inFile_!=0) mclose();
01019   inFile_ = fopen(name.c_str(), "r");
01020   if(inFile_!=0){
01021     inFileName_ = name;
01022     return true;
01023   } else{
01024     inFileName_ = "";
01025     return false;
01026   }
01027 }
01028 
01029 void MatacqProducer::mclose(){
01030   if(inFile_!=0) fclose(inFile_);
01031   inFile_ = 0;
01032 }
01033 
01034 bool MatacqProducer::misOpened(){
01035   return inFile_!=0;
01036 }
01037 
01038 bool MatacqProducer::meof(){
01039   if(0==inFile_) return true;
01040   return feof(inFile_)==0;
01041 }
01042 
01043 #endif //USE_STORAGE_MANAGER defined
01044 
01045 std::string MatacqProducer::runSubDir(uint32_t runNumber){
01046   int millions = runNumber / (1000*1000);
01047   int thousands = (runNumber-millions*1000*1000) / 1000;
01048   int units = runNumber-millions*1000*1000 - thousands*1000;
01049   return str(boost::format("%03d/%03d/%03d") % millions % thousands % units);
01050 }
01051 
01052 void MatacqProducer::newRun(int prevRun, int newRun){
01053     runNumber_ = newRun;
01054     eventSkipCounter_ = 0;
01055     logFile_ << "[" << now() << "] Event count for run "
01056              << runNumber_ << ": "
01057              << "total: " << stats_.nEvents << ", "
01058              << "Laser event with Matacq data: "
01059              << stats_.nLaserEventsWithMatacq << ", "
01060              << "Non laser event (according to DCC header) with Matacq data: "
01061              << stats_.nNonLaserEventsWithMatacq << "\n" << flush;
01062 
01063     stats_.nEvents = 0;
01064     stats_.nLaserEventsWithMatacq = 0;
01065     stats_.nNonLaserEventsWithMatacq = 0;
01066 
01067     
01068 }