CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC2_patch1/src/EventFilter/EcalRawToDigi/plugins/MatacqProducer.cc

Go to the documentation of this file.
00001 #include "EventFilter/EcalRawToDigi/interface/MatacqProducer.h"
00002 #include "EventFilter/EcalRawToDigi/src/Majority.h"
00003 #include "FWCore/Framework/interface/Event.h"
00004 #include "FWCore/Framework/interface/EventSetup.h"
00005 #include "Geometry/Records/interface/IdealGeometryRecord.h"
00006 #include "FWCore/Framework/interface/ESHandle.h"
00007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00008 #include "DataFormats/FEDRawData/interface/FEDRawData.h"
00009 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00010 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00011 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
00012 #include "DataFormats/EcalDigi/interface/EcalDigiCollections.h"
00013 #include <boost/algorithm/string.hpp>
00014 #include <boost/format.hpp>
00015 
00016 #include <memory>
00017 
00018 #include <stdio.h>
00019 
00020 #include <fstream>
00021 #include <iomanip>
00022 
00023 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
00024 
00025 #include <sys/types.h>
00026 #include <unistd.h>
00027 #include <signal.h>
00028 #include <sys/stat.h>
00029 
00030 using namespace std;
00031 using namespace boost;
00032 using namespace edm;
00033 
00034 // #undef LogInfo
00035 // #define LogInfo(a) cout << "INFO " << a << ": "
00036 // #undef LogWarning
00037 // #define LogWarning(a) cout << "WARN " << a << ": "
00038 // #undef LogDebug
00039 // #define LogDebug(a) cout << "DBG " << a << ": "
00040 
00041 
00042 //verbose mode for matacq event retrieval debugging:
00043 //static const bool searchDbg = false;
00044 
00045 //laser freq is 1 every 112 orbit => >80 orbit
00046 int MatacqProducer::orbitTolerance_ = 80; 
00047 
00048 MatacqProducer::stats_t MatacqProducer::stats_init = {0,0,0};
00049 
/// Returns the current local time as text, used as a prefix for the
/// messages printed by this file.
///
/// Format: "<%F %R %S> s <milliseconds> ms". If the date cannot be
/// formatted, the date part is left empty and only the millisecond part
/// is returned.
static std::string now(){
  struct timeval t;
  gettimeofday(&t, 0);

  // localtime_r fills a caller-provided struct instead of the static
  // storage that localtime() returns a pointer to.
  struct tm brokenDown;
  char buf[256];
  buf[0] = 0;
  if(localtime_r(&t.tv_sec, &brokenDown)==0
     || strftime(buf, sizeof(buf), "%F %R %S s", &brokenDown)==0){
    // The content of buf is unspecified when strftime fails.
    buf[0] = 0;
  }

  // Truncate rather than round: rounding tv_usec >= 999500 would print
  // "1000 ms" without carrying into the seconds field.
  std::stringstream buf2;
  buf2 << buf << " " << (t.tv_usec/1000)  << " ms";

  return buf2.str();
}
00063 
00064 
00065 MatacqProducer::MatacqProducer(const edm::ParameterSet& params):
00066   fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
00067   digiInstanceName_(params.getParameter<string>("digiInstanceName")),
00068   rawInstanceName_(params.getParameter<string>("rawInstanceName")),
00069   timing_(params.getUntrackedParameter<bool>("timing", false)),
00070   disabled_(params.getParameter<bool>("disabled")),
00071   verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
00072   produceDigis_(params.getParameter<bool>("produceDigis")),
00073   produceRaw_(params.getParameter<bool>("produceRaw")),
00074   inputRawCollection_(params.getParameter<InputTag>("inputRawCollection")),
00075   mergeRaw_(params.getParameter<bool>("mergeRaw")),
00076   ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
00077   matacq_(0, 0),
00078   inFile_(0),
00079   data_(bufferSize),
00080   openedFileRunNumber_(0),
00081   lastOrb_(0),
00082   fastRetrievalThresh_(0),
00083   orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile",
00084                                                              "")),
00085   inFileName_(""),
00086   stats_(stats_init),
00087   logFileName_(params.getUntrackedParameter<std::string>("logFileName",
00088                                                          "matacqProducer.log")),
00089   eventSkipCounter_(0),
00090   onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
00091   timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
00092   runNumber_(0)
00093 {
00094   if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer ctor"  << endl;
00095   
00096   gettimeofday(&timer_, 0);
00097 
00098   if(timeLogFile_.size()>0){
00099     timeLog_.open(timeLogFile_.c_str());
00100     if(timeLog_.fail()){
00101       cout << "[LaserSorter " << now() << "] "
00102            << "Failed to open file " << timeLogFile_ << " to log timing.\n";
00103       logTiming_ = false;
00104     } else{
00105       logTiming_ = true;
00106     }
00107   }
00108   
00109   posEstim_.verbosity(verbosity_);
00110 
00111   logFile_.open(logFileName_.c_str(), ios::app | ios::out);
00112   
00113   if(logFile_.bad()){
00114     throw cms::Exception("FileOpen") << "Failed to open file "
00115                                      << logFileName_ << " for logging.\n";
00116   }
00117 
00118   if(produceDigis_){
00119     if(verbosity_>0) cout << "[Matacq " << now() << "] registering new "
00120                        "EcalMatacqDigiCollection product with instance name '"
00121                           << digiInstanceName_ << "'\n";
00122     produces<EcalMatacqDigiCollection>(digiInstanceName_);
00123   }
00124   
00125   if(produceRaw_){
00126     if(verbosity_>0) cout << "[Matacq " << now() << "] registering new FEDRawDataCollection "
00127                        "product with instance name '"
00128                           << rawInstanceName_ << "'\n";
00129     produces<FEDRawDataCollection>(rawInstanceName_);
00130   }
00131   
00132   startTime_.tv_sec = startTime_.tv_usec = 0;
00133   if(orbitOffsetFile_.size()>0){
00134     doOrbitOffset_ = true;
00135     loadOrbitOffset();
00136   } else{
00137     doOrbitOffset_ = false;
00138   }
00139   if(verbosity_>=4) cout << "[Matacq " << now() << "] exiting MatacqProducer ctor"  << endl;
00140 }
00141 
00142 
00143 void
00144 MatacqProducer::produce(edm::Event& event, const edm::EventSetup& eventSetup){
00145   if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer::produce"  << endl;
00146   if(logTiming_){
00147     timeval t;
00148     gettimeofday(&t, 0);
00149 
00150     timeLog_ << t.tv_sec << "."
00151              << setfill('0') << setw(3) << (t.tv_usec+500)/1000 << setfill(' ')<< "\t"
00152              << (t.tv_usec - timer_.tv_usec)*1. 
00153       + (t.tv_sec - timer_.tv_sec)*1.e6 << "\t";
00154     timer_ = t;
00155   } 
00156  
00157   if(startTime_.tv_sec==0) gettimeofday(&startTime_, 0);
00158   ++stats_.nEvents;  
00159   if(disabled_) return;
00160   const uint32_t runNumber = getRunNumber(event);
00161   if(runNumber!=runNumber_){
00162     newRun(runNumber_, runNumber);
00163   }
00164   addMatacqData(event);
00165 
00166   if(logTiming_){
00167     timeval t;
00168       gettimeofday(&t, 0);
00169       timeLog_ << (t.tv_usec - timer_.tv_usec)*1. 
00170         + (t.tv_sec - timer_.tv_sec)*1.e6 << "\n";
00171       timer_ = t;
00172   }
00173 }
00174 
00175 void
00176 MatacqProducer::addMatacqData(edm::Event& event){
00177 
00178   edm::Handle<FEDRawDataCollection> sourceColl;
00179   event.getByLabel(inputRawCollection_, sourceColl);
00180 
00181   std::auto_ptr<FEDRawDataCollection> rawColl;
00182   if(produceRaw_){
00183     if(mergeRaw_){
00184       rawColl = auto_ptr<FEDRawDataCollection>(new FEDRawDataCollection(*sourceColl));
00185     } else{
00186       rawColl = auto_ptr<FEDRawDataCollection>(new FEDRawDataCollection());
00187     }
00188   }
00189   
00190   std::auto_ptr<EcalMatacqDigiCollection>
00191     digiColl(new EcalMatacqDigiCollection());
00192   
00193   if(eventSkipCounter_==0){
00194     if(sourceColl->FEDData(matacqFedId_).size()>4 && !produceRaw_){
00195       //input raw data collection already contains matacqData
00196       formatter_.interpretRawData(sourceColl->FEDData(matacqFedId_),
00197                                   *digiColl);             
00198     } else{
00199       bool isLaserEvent = (getCalibTriggerType(event) == laserType);
00200 
00201 
00202       //      cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
00203       
00204       if(isLaserEvent || ignoreTriggerType_){
00205         
00206         const uint32_t runNumber = getRunNumber(event);
00207         const uint32_t orbitId   = getOrbitId(event);
00208       
00209         LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
00210       
00211         bool fileChange;
00212         if(doOrbitOffset_){
00213           map<uint32_t,uint32_t>::iterator it = orbitOffset_.find(runNumber);
00214           if(it == orbitOffset_.end()){
00215             LogWarning("Matacq") << "Orbit offset not found for run "
00216                                  << runNumber
00217                                  << ". No orbit correction will be applied.";
00218     }
00219         }    
00220       
00221         if(getMatacqFile(runNumber, orbitId, &fileChange)){
00222           //matacq file retrieval succeeded
00223           LogInfo("Matacq") << "Matacq data file found for "
00224                             << "run " << runNumber << " orbit " << orbitId;
00225           if(getMatacqEvent(runNumber, orbitId, fileChange)){
00226             if(produceDigis_){
00227               formatter_.interpretRawData(matacq_, *digiColl);
00228             }
00229             if(produceRaw_){
00230               uint32_t dataLen64 = matacq_.getParsedLen();
00231               if(dataLen64 > bufferSize*8 || matacq_.getDccLen()!= dataLen64){
00232                 LogWarning("Matacq") << " Error in Matacq event fragment length! "
00233                                      << "DCC len: " << matacq_.getDccLen()
00234                                      << "*8 Bytes, Parsed len: "
00235                                      << matacq_.getParsedLen() << "*8 Bytes.  "
00236                                      << "Matacq data will not be included for this event.\n";
00237               } else{
00238                 rawColl->FEDData(matacqFedId_).resize(dataLen64*8);
00239                 copy(data_.begin(), data_.begin() + dataLen64*8,
00240                      rawColl->FEDData(matacqFedId_).data());
00241               }
00242             }
00243             LogInfo("Matacq") << "Associating matacq data with orbit id "
00244                               << matacq_.getOrbitId()
00245                               << " to dcc event with orbit id "
00246                               << orbitId << std::endl;
00247             if(isLaserEvent){
00248               ++stats_.nLaserEventsWithMatacq;
00249             } else{
00250               ++stats_.nNonLaserEventsWithMatacq;
00251             }
00252           } else{
00253             if(isLaserEvent){
00254               LogWarning("Matacq") << "No matacq data found for laser event "
00255                                    << "of run " << runNumber << " orbit "
00256                                    << orbitId;
00257             }
00258           }
00259         } else{
00260           LogWarning("Matacq") << "No matacq file found for event "
00261                                << event.id();
00262         }
00263       }
00264     }
00265     if(eventSkipCounter_>0){ //error occured for this events
00266       //                       and some events will be skipped following
00267       //                       to this error.
00268       LogInfo("Matacq") << " [" << now() << "] "
00269                         << eventSkipCounter_
00270                         << " next events will be skipped, following to an "
00271                         << "error on the last processed event, "
00272                         << "which is expected to be persistant.";
00273     }
00274   } else{
00275     --eventSkipCounter_;
00276   }
00277   
00278   if(produceRaw_){
00279     if(verbosity_>1) cout << "[Matacq " << now() << "] "
00280                           << "Adding FEDRawDataCollection collection "
00281                           << " to event.\n";
00282     event.put(rawColl, rawInstanceName_);
00283   }
00284 
00285   if(produceDigis_){
00286     if(verbosity_>1) cout << "[Matacq " << now() << "] "
00287                           << "Adding EcalMatacqDigiCollection collection "
00288                           << " to event.\n";
00289     event.put(digiColl, digiInstanceName_);
00290   }
00291 }
00292 
00293 // #if 0
00294 // bool
00295 // MatacqProducer::getMatacqEvent(std::ifstream& f,
00296 //                             uint32_t runNumber,
00297 //                             uint32_t orbitId,
00298 //                             bool doWrap,
00299 //                             std::streamoff maxPos){
00300 //   bool found = false;
00301 //   streampos startPos = f.tellg();
00302 
00303 //   while(!f.eof()
00304 //      && !found
00305 //      && (maxPos<0 || f.tellg()<=maxPos)){
00306 //     const streamsize headerSize = 8*8;
00307 //     f.read((char*)&data_[0], headerSize);
00308 //     if(f.eof()) break;
00309 //     int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00310 //     uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
00311 //     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
00312 //     //     cout << "Matacq: orbit = " << orb
00313 //     //        << " len = " << len
00314 //     //        << " run = " << run << endl;
00315 //     if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
00316 //        && (runNumber==0 || runNumber==run)){
00317 //       found = true;
00318 //       //reads the rest of the event:
00319 //       if(data_.size() < len*8){
00320 //      throw cms::Exception("Matacq") << "Buffer overflow";
00321 //       }
00322 //       f.read((char*)&data_[0]+headerSize, len*8-headerSize);
00323 //       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
00324 //     } else{
00325 //       //moves to next event:
00326 //       f.seekg(len*8 - headerSize, ios::cur);
00327 //     }
00328 //   }
00329   
00330 //   f.clear(); //clears eof error to allow seekg  
00331 //   if(doWrap && !found){
00332 //     f.seekg(0, ios::beg);
00333 //     found =  getMatacqEvent(f, runNumber, orbitId, false, startPos);
00334 //   }
00335 //   return found;
00336 // }
00337 //#endif
00338 
00339 bool
00340 MatacqProducer::getMatacqEvent(uint32_t runNumber,
00341                                int32_t orbitId,
00342                                bool fileChange){
00343   filepos_t startPos;
00344   if(!mtell(startPos)) return false;
00345   
00346   int32_t startOrb = -1;
00347   const size_t headerSize = 8*8;
00348   if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00349     startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00350     if(startOrb<0) startOrb = 0;
00351   } else{
00352     if(verbosity_>2){
00353       cout << "[Matacq " << now() << "] Failed to read matacq header. Moved to start of "
00354         " the file.\n";
00355     }
00356     mrewind();
00357     if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00358       startPos = 0;
00359       startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00360     } else{
00361       if(verbosity_>2) cout << "[Matacq " << now() << "] Looks like matacq file is empty"
00362                             << "\n";
00363       return false;
00364     }
00365   }
00366   
00367   if(verbosity_>2) cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_
00368                         << " looking for orbit " << orbitId
00369                         << ". Current file position: " << startPos
00370                         << " Orbit at current position: " << startOrb << "\n";
00371   
00372   //  f.clear();
00373   bool didCoarseMove = false;
00374 
00375   //FIXME: case where posEtim_.invalid() is false
00376   if(!posEstim_.invalid()
00377      && (abs(lastOrb_-orbitId) > fastRetrievalThresh_)){
00378     filepos_t pos = posEstim_.pos(orbitId);
00379 
00380     //    struct stat st;
00381     filepos_t fsize;
00382     //    if(0==stat(inFileName_.c_str(), &st)){
00383     if(msize(fsize)){
00384       //      const int64_t fsize = st.st_size;
00385       if(0!=posEstim_.eventLength() && pos > fsize){
00386         //estimated position is beyong end of file
00387         //-> move to beginning of last event:
00388         int64_t evtSize = posEstim_.eventLength()*sizeof(uint64_t);
00389         pos = ((int64_t)fsize/evtSize-1)*evtSize;
00390         if(verbosity_>2){
00391           cout << "[Matacq " << now() << "] Estimated position was beyond end of file. "
00392             "Changed to " << pos << "\n";
00393         }
00394       }
00395     } else{
00396       LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
00397     }
00398     if(pos>=0){
00399       if(verbosity_>2) cout << "[Matacq " << now() << "] jumping to estimated position "
00400                             << pos << "\n";
00401       mseek(pos, SEEK_SET, "Jumping to estimated event position");
00402       if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00403         didCoarseMove = true;
00404       } else{
00405         //estimated position might have been beyond the end of the file,
00406         //try, with original position:
00407         didCoarseMove = false;
00408         if(!mread((char*)&data_[0], headerSize, "Reading event header", true)){
00409           return false;
00410         }
00411       }
00412     } else{
00413       if(verbosity_) cout << "[Matacq " << now() << "] Event orbit outside of orbit range "
00414                        "of matacq data file events\n";
00415       return false;
00416     }
00417   }
00418   
00419   int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00420 
00421   if(didCoarseMove){
00422     //autoadjustement of threshold for coarse move:
00423     if(abs(orb-orbitId) > fastRetrievalThresh_){
00424       if(verbosity_>2) cout << "[Matacq " << now() << "] Fast retrieval threshold increased from "
00425                             << fastRetrievalThresh_;
00426       fastRetrievalThresh_ = 2*abs(orb-orbitId);
00427       if(verbosity_>2) cout << " to " << fastRetrievalThresh_ << "\n";
00428     }
00429 
00430     //if coarse move did not improve situation, rolls back:
00431     if(startOrb > 0
00432        && (abs(orb-orbitId) > abs(startOrb-orbitId))){
00433       if(verbosity_>2) cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb << ") "
00434                          "was worst than original position (-> orbit "
00435                             << startOrb
00436                             << "). Restoring position (" << startPos << ").\n";
00437       mseek(startPos, SEEK_SET);
00438       mread((char*)&data_[0], headerSize, "Reading event header", true);
00439       orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00440     }
00441   }
00442   
00443   bool searchBackward = (orb>orbitId)?true:false;
00444   //BEWARE: len must be signed, because we are using latter in the code (-len)
00445   //expression
00446   int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
00447 
00448   if(len==0){
00449     cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
00450          << " and move matacq file pointer to beginning of the file. "
00451          << "(" << __FILE__ << ":" << __LINE__ << ")."
00452          << "\n";
00453     //rewind(f);
00454     mrewind();
00455     return false;
00456   }
00457   
00458   enum state_t { searching, found, failed } state = searching;
00459   
00460   while(state == searching){
00461     orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00462     len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
00463     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
00464     if(verbosity_>3){
00465       filepos_t pos = -1;
00466       mtell(pos);
00467       cout << "[Matacq " << now() << "] Header read at file position "
00468            << pos
00469            << ":  orbit = " << orb
00470            << " len = " << len << "x8 Byte"
00471            << " run = " << run << "\n";
00472     }
00473     if((abs(orb-orbitId) < orbitTolerance_)
00474        && (runNumber==0 || runNumber==run)){
00475       state = found;
00476       lastOrb_ = orb;
00477       //reads the rest of the event:
00478       if((int)data_.size() < len*8){
00479         throw cms::Exception("Matacq") << "Buffer overflow";
00480       }
00481       if(verbosity_>2) cout << "[Matacq " << now() << "] Event found. Reading "
00482                          " matacq event." << "\n";
00483       if(!mread((char*)&data_[0], len*8, "Reading matacq event")){
00484         if(verbosity_>2) cout << "[Matacq " << now() << "] Failed to read matacq event."
00485                               << "\n";
00486         state = failed;
00487       }
00488       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
00489     } else {
00490       if((searchBackward && (orb < orbitId))
00491          || (!searchBackward && (orb > orbitId))){ //search ended
00492         lastOrb_ = orb;      
00493         state = failed;
00494         if(verbosity_>2) cout << "[Matacq " << now()
00495                             << "] No matacq data found for run " << run
00496                             << ", orbit ID " << orbitId << "." << "\n";
00497       } else{
00498         off_t offset = (searchBackward?-len:len)*8;
00499         lastOrb_ = orb; 
00500         if(verbosity_>3){
00501           cout << "[Matacq " << now() << "] In matacq file, moving "
00502                << abs(offset) << " byte " << (offset>0?"forward":"backward")
00503                << ".\n";
00504         }       
00505 
00506         if(mseek(offset, SEEK_CUR,
00507                  (searchBackward?"Moving to previous event":
00508                   "Moving to next event"))
00509            && mread((char*)&data_[0], headerSize, "Reading event header",
00510                     true)){
00511         } else{
00512           if(!searchBackward) mseek(-len*8, SEEK_CUR,
00513                                     "Moving to start of last complete event");
00514           state = failed;
00515         }
00516       }
00517     }
00518   }
00519   
00520   if(state==found){
00521     filepos_t pos = -1;
00522     filepos_t fsize = -1;
00523     mtell(pos);
00524     msize(fsize);
00525     if(pos==fsize-1){ //last byte.
00526       if(verbosity_>2){
00527         cout << "[Matacq " << now() << "] Event found was at the end of the file. Moving "
00528           "stream position to beginning of this event."
00529              << "\n";
00530       }
00531       mseek(-(int)len*8-1, SEEK_CUR,
00532             "Moving to beginning of last matacq event");
00533     }
00534   }
00535   return (state==found);
00536 }
00537 
00538 
00539 bool
00540 MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId,
00541                               bool* fileChange){
00542   if(openedFileRunNumber_!=0
00543      && openedFileRunNumber_==runNumber){
00544     if(fileChange!=0) *fileChange = false;
00545     return misOpened();
00546   }
00547 
00548   if(fileNames_.size()==0) return 0;
00549 
00550   const string runNumberFormat = "%08d";
00551   string sRunNumber = str(boost::format(runNumberFormat) % runNumber);
00552   //cout << "Run number string: " << sRunNumber << "\n";
00553   bool found = false;
00554   string fname;
00555   for(unsigned i=0; i < fileNames_.size() && !found; ++i){
00556     fname = fileNames_[i];
00557     boost::algorithm::replace_all(fname, "%run_subdir%",
00558                                   runSubDir(runNumber));
00559     boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
00560 
00561     if(verbosity_>0) cout << "[Matacq " << now() << "] "
00562                           << "Looking for a file with path "
00563                           << fname << "\n";
00564     
00565     if(mcheck(fname)){
00566       LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
00567       found = true;
00568     }
00569   }
00570   if(!found){
00571     if(verbosity_>=0) cout << "[Matacq " << now() << "] no matacq file found "
00572                         "for run " << runNumber << "\n";
00573     eventSkipCounter_ = onErrorDisablingEvtCnt_;
00574     openedFileRunNumber_ = 0;
00575     if(fileChange!=0) *fileChange = false;
00576     return 0;
00577   }
00578   
00579   if(!mopen(fname)){
00580     LogWarning("Matacq") << "Failed to open file " << fname << "\n";
00581     eventSkipCounter_ = onErrorDisablingEvtCnt_;
00582     openedFileRunNumber_ = 0;
00583     if(fileChange!=0) *fileChange = false;
00584     return false;
00585   } else{
00586     openedFileRunNumber_ = runNumber;
00587     lastOrb_ = 0;
00588     posEstim_.init(this);
00589     if(fileChange!=0) *fileChange = true;
00590     return true;
00591   }
00592 }
00593  
00594 
00595 uint32_t MatacqProducer::getRunNumber(edm::Event& ev) const{
00596   return ev.run();
00597 }
00598 
00599 uint32_t MatacqProducer::getOrbitId(edm::Event& ev) const{
00600   //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
00601   //we could use here. The code would be shorten to:
00602   //return ev.orbitNumber();
00603   //we have to deal with what we have in current CMSSW releases:
00604   edm::Handle<FEDRawDataCollection> rawdata;
00605   ev.getByLabel(inputRawCollection_, rawdata);
00606   if(!(rawdata.isValid())){
00607     throw cms::Exception("NotFound")
00608       << "No FED raw data collection found. ECAL raw data are "
00609       "required to retrieve the orbit ID";
00610   }
00611   
00612   int orbit = 0;
00613   for(int id=601; id<=654; ++id){
00614     if(!FEDNumbering::inRange(id)) continue;
00615     const FEDRawData& data = rawdata->FEDData(id);
00616     const int orbitIdOffset64 = 3;
00617     if(data.size()>=8*(orbitIdOffset64+1)){//orbit id is in 4th 64-bit word
00618       const unsigned char* pOrbit = data.data() + orbitIdOffset64*8;
00619       int thisOrbit = pOrbit[0]
00620         | (pOrbit[1] <<8)
00621         | (pOrbit[2] <<16)
00622         | (pOrbit[3] <<24);
00623       if(orbit!=0 && thisOrbit!=0 && abs(orbit-thisOrbit)>orbitTolerance_){
00624         //throw cms::Exception("EventCorruption")
00625         //  << "Orbit ID inconsitency in DCC headers";
00626         LogWarning("EventCorruption")
00627           << "Orbit ID inconsitency in DCC headers";
00628         orbit = 0;
00629         break;
00630       }
00631       if(thisOrbit!=0) orbit = thisOrbit;
00632     }
00633   }
00634   
00635   if(orbit==0){
00636     //    throw cms::Exception("NotFound")
00637     //  << "Failed to retrieve orbit ID of event "<< ev.id();
00638     LogWarning("NotFound") << "Failed to retrieve orbit ID of event "
00639                                 << ev.id();
00640   }
00641   return orbit;
00642 }
00643  
00644 int MatacqProducer::getCalibTriggerType(edm::Event& ev) const{  
00645   edm::Handle<FEDRawDataCollection> rawdata;
00646   ev.getByLabel(inputRawCollection_, rawdata);
00647   if(!(rawdata.isValid())){
00648     throw cms::Exception("NotFound")
00649       << "No FED raw data collection found. ECAL raw data are "
00650       "required to retrieve the trigger type";
00651   }
00652   
00653   Majority<int> stat;
00654   for(int id=601; id<=654; ++id){
00655     if(!FEDNumbering::inRange(id)) continue;
00656     const FEDRawData& data = rawdata->FEDData(id);
00657     const int detailedTrigger32 = 5;
00658     if(data.size()>=4*(detailedTrigger32+1)){
00659       const unsigned char* pTType = data.data() + detailedTrigger32*4;
00660       int tType = pTType[1] & 0x7;
00661       stat.add(tType);
00662     }
00663   }
00664   double p;
00665   int tType = stat.result(&p);
00666   if(p<0){
00667     //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
00668     LogWarning("NotFound")  << "No ECAL DCC data found\n";
00669     tType = -1;
00670   }
00671   if(p<.8){
00672     //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
00673     LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
00674     tType = -1;
00675   }
00676   return tType;
00677 }
00678   
00679 void MatacqProducer::PosEstimator::init(MatacqProducer* mp){
00680   mp->mrewind();
00681 
00682   const size_t headerSize = 8*8;
00683   unsigned char data[headerSize];
00684   if(!mp->mread((char*)data, headerSize)){
00685     if(verbosity_) cout << "[Matacq " << now() << "] reached end of file!\n"; 
00686     firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
00687     return;
00688   } else{
00689     firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
00690     eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
00691     if(verbosity_>1) cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_
00692                           << " event length: " << eventLength_
00693                           << "*8 byte\n";
00694   }
00695 
00696   mp->mrewind();
00697   
00698   if(eventLength_==0){    
00699     if(verbosity_) cout << "[Matacq " << now() << "] event length is null!" << endl; 
00700     return;
00701   }
00702 
00703   filepos_t s;
00704   mp->msize(s);
00705 
00706   //number of complete events:
00707   const unsigned nEvents = s/eventLength_/8;
00708 
00709   if(nEvents==0){
00710     if(verbosity_) cout << "[Matacq " << now() << "] File is empty!" << endl;
00711     orbitStepMean_ = 0;
00712     return;
00713   }
00714 
00715   if(verbosity_>1) cout << "[Matacq " << now() << "] File size: " << s
00716                         << " Number of events: " << nEvents << endl;
00717   
00718   //position of last complete events:
00719   off_t last = (nEvents-1)*(off_t)eventLength_*8;
00720   mp->mseek(last, SEEK_SET, "Moving to beginning of last complete "
00721             "matacq event");
00722   if(!mp->mread((char*) data, headerSize, "Reading matacq header", true)){
00723     LogWarning("Matacq") << "Fast matacq event retrieval failure. "
00724       "Falling back to safe retrieval mode.";
00725     orbitStepMean_ = 0;
00726   }
00727   
00728   int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
00729   int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
00730 
00731   if(verbosity_>1) cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb
00732                         << " last event length: " << lastLen << endl;
00733   
00734   //some consistency check
00735   if(lastLen!=eventLength_){
00736     LogWarning("Matacq")
00737       //throw cms::Exception("Matacq")
00738       << "Fast matacq event retrieval failure: it looks like "
00739       "the matacq file contains events of different sizes.";
00740       //      " Falling back to safe retrieval mode.";
00741     invalid_ = false; //true;
00742     orbitStepMean_ = 112; //0;
00743     return;
00744   }
00745 
00746   orbitStepMean_ = (lastOrb - firstOrbit_)/nEvents;
00747   
00748   if(verbosity_>1) cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_
00749                         << "\n";
00750 
00751   invalid_ = false;
00752 }
00753 
00754 int64_t MatacqProducer::PosEstimator::pos(int orb) const{
00755   if(orb<firstOrbit_) return -1;
00756   uint64_t r = orbitStepMean_!=0?
00757     (((uint64_t)(orb-firstOrbit_))/orbitStepMean_)*eventLength_*8
00758     :0;
00759   if(verbosity_>2) cout << "[Matacq " << now() << "] Estimated Position for orbit  " << orb
00760                         << ": " << r << endl;
00761   return r;
00762 }
00763 
00764 MatacqProducer::~MatacqProducer(){
00765   mclose();
00766   timeval t;
00767   gettimeofday(&t, 0);
00768   if(logTiming_ && startTime_.tv_sec!=0){
00769     //not using logger, to allow timing with different logging options
00770     cout << "[Matacq " << now() << "] Time elapsed between first event and "
00771       "destruction of MatacqProducer: "
00772          << ((t.tv_sec-startTime_.tv_sec)*1.
00773              + (t.tv_usec-startTime_.tv_usec)*1.e-6) << "s\n";
00774   }
00775 }
00776 
00777 void MatacqProducer::loadOrbitOffset(){
00778   ifstream f(orbitOffsetFile_.c_str());
00779   if(f.bad()){
00780     throw cms::Exception("Matacq")
00781       << "Failed to open orbit ID correction file '"
00782       << orbitOffsetFile_ << "'\n";
00783   }
00784 
00785   cout << "[Matacq " << now() << "] "
00786        << "Offset to substract to Matacq events Orbit ID: \n"
00787        << "#Run Number\t Offset\n";
00788 
00789   int iline = 0;
00790   string s;
00791   stringstream buf;
00792   while(f.eof()){
00793     getline(f, s);
00794     ++iline;
00795     if(s[0]=='#'){//comment
00796       //skip line:
00797       f.ignore(numeric_limits<streamsize>::max(), '\n');
00798       continue;
00799     }
00800     buf.str("");
00801     buf << s;
00802     int run;
00803     int orbit;
00804     buf >> run;
00805     buf >> orbit;
00806     if(buf.bad()){
00807       throw cms::Exception("Matacq")
00808         << "Syntax error in Orbit offset file '"
00809         << orbitOffsetFile_ << "'";
00810     }
00811     cout << run << "\t" << orbit << "\n";
00812     orbitOffset_.insert(pair<int, int>(run, orbit));
00813   }
00814 }
00815 
00816 #ifdef USE_STORAGE_MANAGER
00817 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess){
00818   if(0==inFile_.get()) return false;
00819   try{
00820     Storage::Relative wh;
00821     if(whence==SEEK_SET) wh = Storage::SET;
00822     else if(whence==SEEK_CUR) wh = Storage::CURRENT;
00823     else if(whence==SEEK_END) wh = Storage::END;
00824     else throw cms::Exception("Bug") << "Bug found in "
00825                                      << __FILE__ << ": "<< __LINE__ << "\n";
00826                    
00827     inFile_->position(offset, wh);
00828   } catch(cms::Exception& e){
00829     if(verbosity_){
00830       cout << "[Matacq " << now() << "] ";
00831       if(mess) cout << mess << ". ";
00832       cout << "Random access error on input matacq file. ";
00833       if(whence==SEEK_SET) cout << "Failed to seek absolute position " << offset;
00834       else if(whence==SEEK_CUR) cout << "Failed to move " << offset << " bytes forward";
00835       else if(whence==SEEK_END) cout << "Failed to seek position at " << offset << " bytes before end of file";
00836         cout << ". Reopening file. " << e.what() << "\n";
00837       mopen(inFileName_);
00838       return false;
00839     }
00840   }
00841   return true;
00842 }
00843 
00844 bool MatacqProducer::mtell(filepos_t& pos){
00845   if(0==inFile_.get()) return false;
00846   pos = inFile_->position();
00847   return true;
00848 }
00849 
00850 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
00851   if(0==inFile_.get()) return false;
00852   
00853   filepos_t pos = -1;
00854   if(!mtell(pos)) return false;
00855 
00856   bool rc = false;
00857   try{
00858     rc =  (n==inFile_->xread(buf, n));
00859   } catch(cms::Exception& e){
00860     if(verbosity_){
00861       cout << "[Matacq " << now() << "] ";
00862       if(mess) cout << mess << ". ";
00863       cout << "Read failure from input matacq file: "
00864            << e.what() << "\n";
00865     }
00866     //recovering from error:
00867     mopen(inFileName_);
00868     mseek(pos);
00869     return false;
00870   }
00871   if(peek){//asked to restore original file position
00872     mseek(pos);
00873   }
00874   return rc;
00875 }
00876 
00877 bool MatacqProducer::msize(filepos_t& s){
00878   if(inFile_.get()==0) return false;
00879   s = inFile_.get()->size();
00880   return true;
00881 }
00882 
00883 bool MatacqProducer::mrewind(){
00884   Storage* file = inFile_.get();
00885   if(file==0) return false;
00886   try{
00887     file->rewind();
00888   } catch(cms::Exception e){
00889     if(verbosity_) cout << "Exception cautgh while rewinding file "
00890                         << inFileName_ << ": " << e.what() << ". "
00891                         << "File will be reopened.";
00892     return mopen(inFileName_);
00893   }
00894   return true;
00895 }
00896 
00897 bool MatacqProducer::mcheck(const std::string& name){
00898   return StorageFactory::get()->check(name);
00899 }
00900 
00901 bool MatacqProducer::mopen(const std::string& name){
00902   //close already opened file if any:
00903   mclose();
00904   
00905   try{
00906     inFile_
00907       = auto_ptr<Storage>(StorageFactory::get()->open(name,
00908                                                       IOFlags::OpenRead));
00909     inFileName_ = name;
00910   } catch(cms::Exception& e){
00911     LogWarning("Matacq") << e.what();
00912     inFile_.reset();
00913     inFileName_ = "";
00914     return false;
00915   }
00916   return true;
00917 }
00918 
00919 void MatacqProducer::mclose(){
00920   if(inFile_.get()!=0){
00921     inFile_->close();
00922     inFile_.reset();
00923   }
00924 }
00925 
00926 bool MatacqProducer::misOpened(){
00927   return inFile_.get()!=0;
00928 }
00929 
00930 bool MatacqProducer::meof(){
00931   if(inFile_.get()==0) return true;
00932   return inFile_->eof();
00933 }
00934 
00935 #else //USE_STORAGE_MANAGER not defined
00936 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess){
00937   if(0==inFile_) return false;
00938   const int rc = fseeko(inFile_, offset, whence);
00939   if(rc!=0 && verbosity_){
00940     cout << "[Matacq " << now() << "] ";
00941     if(mess) cout << mess << ". ";
00942     cout << "Random access error on input matacq file. "
00943       "Rewind file.\n";
00944     mrewind();
00945   }
00946   return rc==0;
00947 }
00948 
00949 bool MatacqProducer::mtell(filepos_t& pos){
00950   if(0==inFile_) return false;
00951   pos = ftello(inFile_);
00952   return pos != -1;
00953     
00954 }
00955 
00956 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
00957   if(0==inFile_) return false;
00958   off_t pos = ftello(inFile_);
00959   bool rc = (pos!=-1) && (1==fread(buf, n, 1, inFile_));
00960   if(!rc){
00961     if(verbosity_){
00962       cout << "[Matacq " << now() << "] ";
00963       if(mess) cout << mess << ". ";
00964       cout << "Read failure from input matacq file.\n";
00965     }
00966     clearerr(inFile_);
00967   }
00968   if(peek || !rc){//need to restore file position
00969     if(0!=fseeko(inFile_, pos, SEEK_SET)){
00970       if(verbosity_){
00971         cout << "[Matacq " << now() << "] ";
00972         if(mess) cout << mess << ". ";
00973         cout << "Failed to restore file position of "
00974           "before read error. Rewind file.\n";
00975       }
00976       //rewind(inFile_.get());
00977       mrewind();
00978       lastOrb_ = 0;
00979     }
00980   }
00981   return rc;
00982 }
00983 
00984 bool MatacqProducer::msize(filepos_t& s){
00985   if(0==inFile_) return false;
00986   struct stat buf;
00987   if(0!=fstat(fileno(inFile_), &buf)){
00988     s = 0;
00989     return false;
00990   } else{
00991     s = buf.st_size;
00992     return true;
00993   }
00994 }
00995 
00996 bool MatacqProducer::mrewind(){
00997   if(0==inFile_) return false;
00998   clearerr(inFile_);
00999   return fseeko(inFile_, 0, SEEK_SET)!=0; 
01000 }
01001 
01002 bool MatacqProducer::mcheck(const std::string& name){
01003   struct stat dummy;
01004   return 0==stat(name.c_str(), &dummy);
01005 //   if(stat(name.c_str(), &dummy)==0){
01006 //     return true;
01007 //   } else{
01008 //     cout << "[Matacq " << now() << "] Failed to stat file '"
01009 //       << name.c_str() << "'. " 
01010 //       << "Error " << errno << ": " << strerror(errno) << "\n";
01011 //     return false;
01012 //   }
01013 }
01014 
01015 bool MatacqProducer::mopen(const std::string& name){
01016   if(inFile_!=0) mclose();
01017   inFile_ = fopen(name.c_str(), "r");
01018   if(inFile_!=0){
01019     inFileName_ = name;
01020     return true;
01021   } else{
01022     inFileName_ = "";
01023     return false;
01024   }
01025 }
01026 
01027 void MatacqProducer::mclose(){
01028   if(inFile_!=0) fclose(inFile_);
01029   inFile_ = 0;
01030 }
01031 
01032 bool MatacqProducer::misOpened(){
01033   return inFile_!=0;
01034 }
01035 
01036 bool MatacqProducer::meof(){
01037   if(0==inFile_) return true;
01038   return feof(inFile_)==0;
01039 }
01040 
01041 #endif //USE_STORAGE_MANAGER defined
01042 
01043 std::string MatacqProducer::runSubDir(uint32_t runNumber){
01044   int millions = runNumber / (1000*1000);
01045   int thousands = (runNumber-millions*1000*1000) / 1000;
01046   int units = runNumber-millions*1000*1000 - thousands*1000;
01047   return str(boost::format("%03d/%03d/%03d") % millions % thousands % units);
01048 }
01049 
01050 void MatacqProducer::newRun(int prevRun, int newRun){
01051     runNumber_ = newRun;
01052     eventSkipCounter_ = 0;
01053     logFile_ << "[" << now() << "] Event count for run "
01054              << runNumber_ << ": "
01055              << "total: " << stats_.nEvents << ", "
01056              << "Laser event with Matacq data: "
01057              << stats_.nLaserEventsWithMatacq << ", "
01058              << "Non laser event (according to DCC header) with Matacq data: "
01059              << stats_.nNonLaserEventsWithMatacq << "\n" << flush;
01060 
01061     stats_.nEvents = 0;
01062     stats_.nLaserEventsWithMatacq = 0;
01063     stats_.nNonLaserEventsWithMatacq = 0;
01064 
01065     
01066 }