CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/EcalRawToDigi/plugins/MatacqProducer.cc

Go to the documentation of this file.
00001 #include "EventFilter/EcalRawToDigi/interface/MatacqProducer.h"
00002 #include "EventFilter/EcalRawToDigi/src/Majority.h"
00003 #include "FWCore/Framework/interface/Event.h"
00004 #include "FWCore/Framework/interface/EventSetup.h"
00005 #include "Geometry/Records/interface/IdealGeometryRecord.h"
00006 #include "FWCore/Framework/interface/ESHandle.h"
00007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00008 #include "DataFormats/FEDRawData/interface/FEDRawData.h"
00009 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00010 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00011 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
00012 #include "DataFormats/EcalDigi/interface/EcalDigiCollections.h"
00013 #include <boost/algorithm/string.hpp>
00014 #include <boost/format.hpp>
00015 
00016 #include <memory>
00017 
00018 #include <stdio.h>
00019 
00020 #include <fstream>
00021 #include <iomanip>
00022 
00023 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
00024 
00025 #include <sys/types.h>
00026 #include <unistd.h>
00027 #include <signal.h>
00028 #include <sys/stat.h>
00029 
00030 using namespace std;
00031 using namespace boost;
00032 using namespace edm;
00033 
00034 // #undef LogInfo
00035 // #define LogInfo(a) cout << "INFO " << a << ": "
00036 // #undef LogWarning
00037 // #define LogWarning(a) cout << "WARN " << a << ": "
00038 // #undef LogDebug
00039 // #define LogDebug(a) cout << "DBG " << a << ": "
00040 
00041 
00042 //verbose mode for matacq event retrieval debugging:
00043 //static const bool searchDbg = false;
00044 
00045 //laser freq is 1 every 112 orbit => >80 orbit
00046 int MatacqProducer::orbitTolerance_ = 80; 
00047 
00048 MatacqProducer::stats_t MatacqProducer::stats_init = {0,0,0};
00049 
00050 static std::string now(){
00051   struct timeval t;
00052   gettimeofday(&t, 0);
00053  
00054   char buf[256];
00055   strftime(buf, sizeof(buf), "%F %R %S s", localtime(&t.tv_sec));
00056   buf[sizeof(buf)-1] = 0;
00057 
00058   stringstream buf2;
00059   buf2 << buf << " " << ((t.tv_usec+500)/1000)  << " ms";
00060 
00061   return buf2.str();
00062 }
00063 
00064 
00065 MatacqProducer::MatacqProducer(const edm::ParameterSet& params):
00066   fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
00067   digiInstanceName_(params.getParameter<string>("digiInstanceName")),
00068   rawInstanceName_(params.getParameter<string>("rawInstanceName")),
00069   timing_(params.getUntrackedParameter<bool>("timing", false)),
00070   disabled_(params.getParameter<bool>("disabled")),
00071   verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
00072   produceDigis_(params.getParameter<bool>("produceDigis")),
00073   produceRaw_(params.getParameter<bool>("produceRaw")),
00074   inputRawCollection_(params.getParameter<InputTag>("inputRawCollection")),
00075   mergeRaw_(params.getParameter<bool>("mergeRaw")),
00076   ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
00077   matacq_(0, 0),
00078   inFile_(0),
00079   data_(bufferSize),
00080   openedFileRunNumber_(0),
00081   lastOrb_(0),
00082   fastRetrievalThresh_(0),
00083   orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile",
00084                                                              "")),
00085   inFileName_(""),
00086   stats_(stats_init),
00087   logFileName_(params.getUntrackedParameter<std::string>("logFileName",
00088                                                          "matacqProducer.log")),
00089   eventSkipCounter_(0),
00090   onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
00091   timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
00092   runNumber_(0)
00093 {
00094   if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer ctor"  << endl;
00095   
00096   gettimeofday(&timer_, 0);
00097 
00098   if(timeLogFile_.size()>0){
00099     timeLog_.open(timeLogFile_.c_str());
00100     if(timeLog_.fail()){
00101       cout << "[LaserSorter " << now() << "] "
00102            << "Failed to open file " << timeLogFile_ << " to log timing.\n";
00103       logTiming_ = false;
00104     } else{
00105       logTiming_ = true;
00106     }
00107   }
00108   
00109   posEstim_.verbosity(verbosity_);
00110 
00111   logFile_.open(logFileName_.c_str(), ios::app | ios::out);
00112   
00113   if(logFile_.bad()){
00114     throw cms::Exception("FileOpen") << "Failed to open file "
00115                                      << logFileName_ << " for logging.\n";
00116   }
00117 
00118   if(produceDigis_){
00119     if(verbosity_>0) cout << "[Matacq " << now() << "] registering new "
00120                        "EcalMatacqDigiCollection product with instance name '"
00121                           << digiInstanceName_ << "'\n";
00122     produces<EcalMatacqDigiCollection>(digiInstanceName_);
00123   }
00124   
00125   if(produceRaw_){
00126     if(verbosity_>0) cout << "[Matacq " << now() << "] registering new FEDRawDataCollection "
00127                        "product with instance name '"
00128                           << rawInstanceName_ << "'\n";
00129     produces<FEDRawDataCollection>(rawInstanceName_);
00130   }
00131   
00132   startTime_.tv_sec = startTime_.tv_usec = 0;
00133   if(orbitOffsetFile_.size()>0){
00134     doOrbitOffset_ = true;
00135     loadOrbitOffset();
00136   } else{
00137     doOrbitOffset_ = false;
00138   }
00139   if(verbosity_>=4) cout << "[Matacq " << now() << "] exiting MatacqProducer ctor"  << endl;
00140 }
00141 
00142 
00143 void
00144 MatacqProducer::produce(edm::Event& event, const edm::EventSetup& eventSetup){
00145   if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer::produce"  << endl;
00146   if(logTiming_){
00147     timeval t;
00148     gettimeofday(&t, 0);
00149 
00150     timeLog_ << t.tv_sec << "."
00151              << setfill('0') << setw(3) << (t.tv_usec+500)/1000 << setfill(' ')<< "\t"
00152              << (t.tv_usec - timer_.tv_usec)*1. 
00153       + (t.tv_sec - timer_.tv_sec)*1.e6 << "\t";
00154     timer_ = t;
00155   } 
00156  
00157   if(startTime_.tv_sec==0) gettimeofday(&startTime_, 0);
00158   ++stats_.nEvents;  
00159   if(disabled_) return;
00160   const uint32_t runNumber = getRunNumber(event);
00161   if(runNumber!=runNumber_){
00162     newRun(runNumber_, runNumber);
00163   }
00164   addMatacqData(event);
00165 
00166   if(logTiming_){
00167     timeval t;
00168       gettimeofday(&t, 0);
00169       timeLog_ << (t.tv_usec - timer_.tv_usec)*1. 
00170         + (t.tv_sec - timer_.tv_sec)*1.e6 << "\n";
00171       timer_ = t;
00172   }
00173 }
00174 
00175 void
00176 MatacqProducer::addMatacqData(edm::Event& event){
00177   edm::Handle<FEDRawDataCollection> sourceColl;
00178   if(inputRawCollection_.label().size() == 0
00179      && inputRawCollection_.instance().size() == 0){
00180     event.getByType(sourceColl);
00181   } else{
00182     event.getByLabel(inputRawCollection_, sourceColl);
00183   }
00184   
00185   std::auto_ptr<FEDRawDataCollection> rawColl;
00186   if(produceRaw_){
00187     if(mergeRaw_){
00188       rawColl = auto_ptr<FEDRawDataCollection>(new FEDRawDataCollection(*sourceColl));
00189     } else{
00190       rawColl = auto_ptr<FEDRawDataCollection>(new FEDRawDataCollection());
00191     }
00192   }
00193   
00194   std::auto_ptr<EcalMatacqDigiCollection>
00195     digiColl(new EcalMatacqDigiCollection());
00196   
00197   if(eventSkipCounter_==0){
00198     if(sourceColl->FEDData(matacqFedId_).size()>4 && !produceRaw_){
00199       //input raw data collection already contains matacqData
00200       formatter_.interpretRawData(sourceColl->FEDData(matacqFedId_),
00201                                   *digiColl);             
00202     } else{
00203       bool isLaserEvent = (getCalibTriggerType(event) == laserType);
00204 
00205 
00206       //      cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
00207       
00208       if(isLaserEvent || ignoreTriggerType_){
00209         
00210         const uint32_t runNumber = getRunNumber(event);
00211         const uint32_t orbitId   = getOrbitId(event);
00212       
00213         LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
00214       
00215         bool fileChange;
00216         uint32_t offset = 0;
00217         if(doOrbitOffset_){
00218           map<uint32_t,uint32_t>::iterator it = orbitOffset_.find(runNumber);
00219           if(it == orbitOffset_.end()){
00220             LogWarning("Matacq") << "Orbit offset not found for run "
00221                                  << runNumber
00222                                  << ". No orbit correction will be applied.";
00223           } else{
00224             offset = it->second;
00225           }
00226         }    
00227       
00228         if(getMatacqFile(runNumber, orbitId, &fileChange)){
00229           //matacq file retrieval succeeded
00230           LogInfo("Matacq") << "Matacq data file found for "
00231                             << "run " << runNumber << " orbit " << orbitId;
00232           if(getMatacqEvent(runNumber, orbitId, fileChange)){
00233             if(produceDigis_){
00234               formatter_.interpretRawData(matacq_, *digiColl);
00235             }
00236             if(produceRaw_){
00237               uint32_t dataLen64 = matacq_.getParsedLen();
00238               if(dataLen64 > bufferSize*8 || matacq_.getDccLen()!= dataLen64){
00239                 LogWarning("Matacq") << " Error in Matacq event fragment length! "
00240                                      << "DCC len: " << matacq_.getDccLen()
00241                                      << "*8 Bytes, Parsed len: "
00242                                      << matacq_.getParsedLen() << "*8 Bytes.  "
00243                                      << "Matacq data will not be included for this event.\n";
00244               } else{
00245                 rawColl->FEDData(matacqFedId_).resize(dataLen64*8);
00246                 copy(data_.begin(), data_.begin() + dataLen64*8,
00247                      rawColl->FEDData(matacqFedId_).data());
00248               }
00249             }
00250             LogInfo("Matacq") << "Associating matacq data with orbit id "
00251                               << matacq_.getOrbitId()
00252                               << " to dcc event with orbit id "
00253                               << orbitId << std::endl;
00254             if(isLaserEvent){
00255               ++stats_.nLaserEventsWithMatacq;
00256             } else{
00257               ++stats_.nNonLaserEventsWithMatacq;
00258             }
00259           } else{
00260             if(isLaserEvent){
00261               LogWarning("Matacq") << "No matacq data found for laser event "
00262                                    << "of run " << runNumber << " orbit "
00263                                    << orbitId;
00264             }
00265           }
00266         } else{
00267           LogWarning("Matacq") << "No matacq file found for event "
00268                                << event.id();
00269         }
00270       }
00271     }
00272     if(eventSkipCounter_>0){ //error occured for this events
00273       //                       and some events will be skipped following
00274       //                       to this error.
00275       LogInfo("Matacq") << " [" << now() << "] "
00276                         << eventSkipCounter_
00277                         << " next events will be skipped, following to an "
00278                         << "error on the last processed event, "
00279                         << "which is expected to be persistant.";
00280     }
00281   } else{
00282     --eventSkipCounter_;
00283   }
00284   
00285   if(produceRaw_){
00286     if(verbosity_>1) cout << "[Matacq " << now() << "] "
00287                           << "Adding FEDRawDataCollection collection "
00288                           << " to event.\n";
00289     event.put(rawColl, rawInstanceName_);
00290   }
00291 
00292   if(produceDigis_){
00293     if(verbosity_>1) cout << "[Matacq " << now() << "] "
00294                           << "Adding EcalMatacqDigiCollection collection "
00295                           << " to event.\n";
00296     event.put(digiColl, digiInstanceName_);
00297   }
00298 }
00299 
00300 // #if 0
00301 // bool
00302 // MatacqProducer::getMatacqEvent(std::ifstream& f,
00303 //                             uint32_t runNumber,
00304 //                             uint32_t orbitId,
00305 //                             bool doWrap,
00306 //                             std::streamoff maxPos){
00307 //   bool found = false;
00308 //   streampos startPos = f.tellg();
00309 
00310 //   while(!f.eof()
00311 //      && !found
00312 //      && (maxPos<0 || f.tellg()<=maxPos)){
00313 //     const streamsize headerSize = 8*8;
00314 //     f.read((char*)&data_[0], headerSize);
00315 //     if(f.eof()) break;
00316 //     int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00317 //     uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
00318 //     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
00319 //     //     cout << "Matacq: orbit = " << orb
00320 //     //        << " len = " << len
00321 //     //        << " run = " << run << endl;
00322 //     if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
00323 //        && (runNumber==0 || runNumber==run)){
00324 //       found = true;
00325 //       //reads the rest of the event:
00326 //       if(data_.size() < len*8){
00327 //      throw cms::Exception("Matacq") << "Buffer overflow";
00328 //       }
00329 //       f.read((char*)&data_[0]+headerSize, len*8-headerSize);
00330 //       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
00331 //     } else{
00332 //       //moves to next event:
00333 //       f.seekg(len*8 - headerSize, ios::cur);
00334 //     }
00335 //   }
00336   
00337 //   f.clear(); //clears eof error to allow seekg  
00338 //   if(doWrap && !found){
00339 //     f.seekg(0, ios::beg);
00340 //     found =  getMatacqEvent(f, runNumber, orbitId, false, startPos);
00341 //   }
00342 //   return found;
00343 // }
00344 //#endif
00345 
00346 bool
00347 MatacqProducer::getMatacqEvent(uint32_t runNumber,
00348                                int32_t orbitId,
00349                                bool fileChange){
00350   filepos_t startPos;
00351   if(!mtell(startPos)) return false;
00352   
00353   int32_t startOrb = -1;
00354   const size_t headerSize = 8*8;
00355   if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00356     startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00357     if(startOrb<0) startOrb = 0;
00358   } else{
00359     if(verbosity_>2){
00360       cout << "[Matacq " << now() << "] Failed to read matacq header. Moved to start of "
00361         " the file.\n";
00362     }
00363     mrewind();
00364     if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00365       startPos = 0;
00366       startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00367     } else{
00368       if(verbosity_>2) cout << "[Matacq " << now() << "] Looks like matacq file is empty"
00369                             << "\n";
00370       return false;
00371     }
00372   }
00373   
00374   if(verbosity_>2) cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_
00375                         << " looking for orbit " << orbitId
00376                         << ". Current file position: " << startPos
00377                         << " Orbit at current position: " << startOrb << "\n";
00378   
00379   //  f.clear();
00380   bool didCoarseMove = false;
00381 
00382   //FIXME: case where posEtim_.invalid() is false
00383   if(!posEstim_.invalid()
00384      && (abs(lastOrb_-orbitId) > fastRetrievalThresh_)){
00385     filepos_t pos = posEstim_.pos(orbitId);
00386 
00387     //    struct stat st;
00388     filepos_t fsize;
00389     //    if(0==stat(inFileName_.c_str(), &st)){
00390     if(msize(fsize)){
00391       //      const int64_t fsize = st.st_size;
00392       if(0!=posEstim_.eventLength() && pos > fsize){
00393         //estimated position is beyong end of file
00394         //-> move to beginning of last event:
00395         int64_t evtSize = posEstim_.eventLength()*sizeof(uint64_t);
00396         pos = ((int64_t)fsize/evtSize-1)*evtSize;
00397         if(verbosity_>2){
00398           cout << "[Matacq " << now() << "] Estimated position was beyond end of file. "
00399             "Changed to " << pos << "\n";
00400         }
00401       }
00402     } else{
00403       LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
00404     }
00405     if(pos>=0){
00406       if(verbosity_>2) cout << "[Matacq " << now() << "] jumping to estimated position "
00407                             << pos << "\n";
00408       mseek(pos, SEEK_SET, "Jumping to estimated event position");
00409       if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
00410         didCoarseMove = true;
00411       } else{
00412         //estimated position might have been beyond the end of the file,
00413         //try, with original position:
00414         didCoarseMove = false;
00415         if(!mread((char*)&data_[0], headerSize, "Reading event header", true)){
00416           return false;
00417         }
00418       }
00419     } else{
00420       if(verbosity_) cout << "[Matacq " << now() << "] Event orbit outside of orbit range "
00421                        "of matacq data file events\n";
00422       return false;
00423     }
00424   }
00425   
00426   int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00427 
00428   if(didCoarseMove){
00429     //autoadjustement of threshold for coarse move:
00430     if(abs(orb-orbitId) > fastRetrievalThresh_){
00431       if(verbosity_>2) cout << "[Matacq " << now() << "] Fast retrieval threshold increased from "
00432                             << fastRetrievalThresh_;
00433       fastRetrievalThresh_ = 2*abs(orb-orbitId);
00434       if(verbosity_>2) cout << " to " << fastRetrievalThresh_ << "\n";
00435     }
00436 
00437     //if coarse move did not improve situation, rolls back:
00438     if(startOrb > 0
00439        && (abs(orb-orbitId) > abs(startOrb-orbitId))){
00440       if(verbosity_>2) cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb << ") "
00441                          "was worst than original position (-> orbit "
00442                             << startOrb
00443                             << "). Restoring position (" << startPos << ").\n";
00444       mseek(startPos, SEEK_SET);
00445       mread((char*)&data_[0], headerSize, "Reading event header", true);
00446       orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00447     }
00448   }
00449   
00450   bool searchBackward = (orb>orbitId)?true:false;
00451   //BEWARE: len must be signed, because we are using latter in the code (-len)
00452   //expression
00453   int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
00454 
00455   if(len==0){
00456     cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
00457          << " and move matacq file pointer to beginning of the file. "
00458          << "(" << __FILE__ << ":" << __LINE__ << ")."
00459          << "\n";
00460     //rewind(f);
00461     mrewind();
00462     return false;
00463   }
00464   
00465   enum state_t { searching, found, failed } state = searching;
00466   
00467   while(state == searching){
00468     orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
00469     len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
00470     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
00471     if(verbosity_>3){
00472       filepos_t pos;
00473       mtell(pos);
00474       cout << "[Matacq " << now() << "] Header read at file position "
00475            << pos
00476            << ":  orbit = " << orb
00477            << " len = " << len << "x8 Byte"
00478            << " run = " << run << "\n";
00479     }
00480     if((abs(orb-orbitId) < orbitTolerance_)
00481        && (runNumber==0 || runNumber==run)){
00482       state = found;
00483       lastOrb_ = orb;
00484       //reads the rest of the event:
00485       if((int)data_.size() < len*8){
00486         throw cms::Exception("Matacq") << "Buffer overflow";
00487       }
00488       if(verbosity_>2) cout << "[Matacq " << now() << "] Event found. Reading "
00489                          " matacq event." << "\n";
00490       if(!mread((char*)&data_[0], len*8, "Reading matacq event")){
00491         if(verbosity_>2) cout << "[Matacq " << now() << "] Failed to read matacq event."
00492                               << "\n";
00493         state = failed;
00494       }
00495       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
00496     } else {
00497       if((searchBackward && (orb < orbitId))
00498          || (!searchBackward && (orb > orbitId))){ //search ended
00499         lastOrb_ = orb;      
00500         state = failed;
00501         if(verbosity_>2) cout << "[Matacq " << now()
00502                             << "] No matacq data found for run " << run
00503                             << ", orbit ID " << orbitId << "." << "\n";
00504       } else{
00505         off_t offset = (searchBackward?-len:len)*8;
00506         lastOrb_ = orb; 
00507         if(verbosity_>3){
00508           cout << "[Matacq " << now() << "] In matacq file, moving "
00509                << abs(offset) << " byte " << (offset>0?"forward":"backward")
00510                << ".\n";
00511         }       
00512 
00513         if(mseek(offset, SEEK_CUR,
00514                  (searchBackward?"Moving to previous event":
00515                   "Moving to next event"))
00516            && mread((char*)&data_[0], headerSize, "Reading event header",
00517                     true)){
00518         } else{
00519           if(!searchBackward) mseek(-len*8, SEEK_CUR,
00520                                     "Moving to start of last complete event");
00521           state = failed;
00522         }
00523       }
00524     }
00525   }
00526   
00527   if(state==found){
00528     filepos_t pos;
00529     filepos_t fsize;
00530     mtell(pos);
00531     msize(fsize);
00532     if(pos==fsize-1){ //last byte.
00533       if(verbosity_>2){
00534         cout << "[Matacq " << now() << "] Event found was at the end of the file. Moving "
00535           "stream position to beginning of this event."
00536              << "\n";
00537       }
00538       mseek(-(int)len*8-1, SEEK_CUR,
00539             "Moving to beginning of last matacq event");
00540     }
00541   }
00542   return (state==found);
00543 }
00544 
00545 
00546 bool
00547 MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId,
00548                               bool* fileChange){
00549   if(openedFileRunNumber_!=0
00550      && openedFileRunNumber_==runNumber){
00551     if(fileChange!=0) *fileChange = false;
00552     return misOpened();
00553   }
00554 
00555   if(fileNames_.size()==0) return 0;
00556 
00557   const string runNumberFormat = "%08d";
00558   string sRunNumber = str(boost::format(runNumberFormat) % runNumber);
00559   //cout << "Run number string: " << sRunNumber << "\n";
00560   bool found = false;
00561   string fname;
00562   for(unsigned i=0; i < fileNames_.size() && !found; ++i){
00563     fname = fileNames_[i];
00564     boost::algorithm::replace_all(fname, "%run_subdir%",
00565                                   runSubDir(runNumber));
00566     boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
00567 
00568     if(verbosity_>0) cout << "[Matacq " << now() << "] "
00569                           << "Looking for a file with path "
00570                           << fname << "\n";
00571     
00572     if(mcheck(fname)){
00573       LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
00574       found = true;
00575     }
00576   }
00577   if(!found){
00578     if(verbosity_>=0) cout << "[Matacq " << now() << "] no matacq file found "
00579                         "for run " << runNumber << "\n";
00580     eventSkipCounter_ = onErrorDisablingEvtCnt_;
00581     openedFileRunNumber_ = 0;
00582     if(fileChange!=0) *fileChange = false;
00583     return 0;
00584   }
00585   
00586   if(!mopen(fname)){
00587     LogWarning("Matacq") << "Failed to open file " << fname << "\n";
00588     eventSkipCounter_ = onErrorDisablingEvtCnt_;
00589     openedFileRunNumber_ = 0;
00590     if(fileChange!=0) *fileChange = false;
00591     return false;
00592   } else{
00593     openedFileRunNumber_ = runNumber;
00594     lastOrb_ = 0;
00595     posEstim_.init(this);
00596     if(fileChange!=0) *fileChange = true;
00597     return true;
00598   }
00599 }
00600  
00601 
00602 uint32_t MatacqProducer::getRunNumber(edm::Event& ev) const{
00603   return ev.run();
00604 }
00605 
00606 uint32_t MatacqProducer::getOrbitId(edm::Event& ev) const{
00607   //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
00608   //we could use here. The code would be shorten to:
00609   //return ev.orbitNumber();
00610   //we have to deal with what we have in current CMSSW releases:
00611   edm::Handle<FEDRawDataCollection> rawdata;
00612   if(!(ev.getByType(rawdata) && rawdata.isValid())){
00613     throw cms::Exception("NotFound")
00614       << "No FED raw data collection found. ECAL raw data are "
00615       "required to retrieve the orbit ID";
00616   }
00617   
00618   int orbit = 0;
00619   for(int id=601; id<=654; ++id){
00620     if(!FEDNumbering::inRange(id)) continue;
00621     const FEDRawData& data = rawdata->FEDData(id);
00622     const int orbitIdOffset64 = 3;
00623     if(data.size()>=8*(orbitIdOffset64+1)){//orbit id is in 4th 64-bit word
00624       const unsigned char* pOrbit = data.data() + orbitIdOffset64*8;
00625       int thisOrbit = pOrbit[0]
00626         | (pOrbit[1] <<8)
00627         | (pOrbit[2] <<16)
00628         | (pOrbit[3] <<24);
00629       if(orbit!=0 && thisOrbit!=0 && abs(orbit-thisOrbit)>orbitTolerance_){
00630         //throw cms::Exception("EventCorruption")
00631         //  << "Orbit ID inconsitency in DCC headers";
00632         LogWarning("EventCorruption")
00633           << "Orbit ID inconsitency in DCC headers";
00634         orbit = 0;
00635         break;
00636       }
00637       if(thisOrbit!=0) orbit = thisOrbit;
00638     }
00639   }
00640   
00641   if(orbit==0){
00642     //    throw cms::Exception("NotFound")
00643     //  << "Failed to retrieve orbit ID of event "<< ev.id();
00644     LogWarning("NotFound") << "Failed to retrieve orbit ID of event "
00645                                 << ev.id();
00646   }
00647   return orbit;
00648 }
00649  
00650 int MatacqProducer::getCalibTriggerType(edm::Event& ev) const{  
00651   edm::Handle<FEDRawDataCollection> rawdata;
00652   if(!(ev.getByType(rawdata) && rawdata.isValid())){
00653     throw cms::Exception("NotFound")
00654       << "No FED raw data collection found. ECAL raw data are "
00655       "required to retrieve the trigger type";
00656   }
00657   
00658   Majority<int> stat;
00659   for(int id=601; id<=654; ++id){
00660     if(!FEDNumbering::inRange(id)) continue;
00661     const FEDRawData& data = rawdata->FEDData(id);
00662     const int detailedTrigger32 = 5;
00663     if(data.size()>=4*(detailedTrigger32+1)){
00664       const unsigned char* pTType = data.data() + detailedTrigger32*4;
00665       int tType = pTType[1] & 0x7;
00666       stat.add(tType);
00667     }
00668   }
00669   double p;
00670   int tType = stat.result(&p);
00671   if(p<0){
00672     //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
00673     LogWarning("NotFound")  << "No ECAL DCC data found\n";
00674     tType = -1;
00675   }
00676   if(p<.8){
00677     //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
00678     LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
00679     tType = -1;
00680   }
00681   return tType;
00682 }
00683   
00684 void MatacqProducer::PosEstimator::init(MatacqProducer* mp){
00685   mp->mrewind();
00686 
00687   const size_t headerSize = 8*8;
00688   unsigned char data[headerSize];
00689   if(!mp->mread((char*)data, headerSize)){
00690     if(verbosity_) cout << "[Matacq " << now() << "] reached end of file!\n"; 
00691     firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
00692     return;
00693   } else{
00694     firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
00695     eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
00696     if(verbosity_>1) cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_
00697                           << " event length: " << eventLength_
00698                           << "*8 byte\n";
00699   }
00700 
00701   mp->mrewind();
00702   
00703   if(eventLength_==0){    
00704     if(verbosity_) cout << "[Matacq " << now() << "] event length is null!" << endl; 
00705     return;
00706   }
00707 
00708   filepos_t s;
00709   mp->msize(s);
00710 
00711   //number of complete events:
00712   const unsigned nEvents = s/eventLength_/8;
00713 
00714   if(nEvents==0){
00715     if(verbosity_) cout << "[Matacq " << now() << "] File is empty!" << endl;
00716     orbitStepMean_ = 0;
00717     return;
00718   }
00719 
00720   if(verbosity_>1) cout << "[Matacq " << now() << "] File size: " << s
00721                         << " Number of events: " << nEvents << endl;
00722   
00723   //position of last complete events:
00724   off_t last = (nEvents-1)*(off_t)eventLength_*8;
00725   mp->mseek(last, SEEK_SET, "Moving to beginning of last complete "
00726             "matacq event");
00727   if(!mp->mread((char*) data, headerSize, "Reading matacq header", true)){
00728     LogWarning("Matacq") << "Fast matacq event retrieval failure. "
00729       "Falling back to safe retrieval mode.";
00730     orbitStepMean_ = 0;
00731   }
00732   
00733   int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
00734   int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
00735 
00736   if(verbosity_>1) cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb
00737                         << " last event length: " << lastLen << endl;
00738   
00739   //some consistency check
00740   if(lastLen!=eventLength_){
00741     LogWarning("Matacq")
00742       //throw cms::Exception("Matacq")
00743       << "Fast matacq event retrieval failure: it looks like "
00744       "the matacq file contains events of different sizes.";
00745       //      " Falling back to safe retrieval mode.";
00746     invalid_ = false; //true;
00747     orbitStepMean_ = 112; //0;
00748     return;
00749   }
00750 
00751   orbitStepMean_ = (lastOrb - firstOrbit_)/nEvents;
00752   
00753   if(verbosity_>1) cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_
00754                         << "\n";
00755 
00756   invalid_ = false;
00757 }
00758 
00759 int64_t MatacqProducer::PosEstimator::pos(int orb) const{
00760   if(orb<firstOrbit_) return -1;
00761   uint64_t r = orbitStepMean_!=0?
00762     (((uint64_t)(orb-firstOrbit_))/orbitStepMean_)*eventLength_*8
00763     :0;
00764   if(verbosity_>2) cout << "[Matacq " << now() << "] Estimated Position for orbit  " << orb
00765                         << ": " << r << endl;
00766   return r;
00767 }
00768 
00769 MatacqProducer::~MatacqProducer(){
00770   mclose();
00771   timeval t;
00772   gettimeofday(&t, 0);
00773   if(logTiming_ && startTime_.tv_sec!=0){
00774     //not using logger, to allow timing with different logging options
00775     cout << "[Matacq " << now() << "] Time elapsed between first event and "
00776       "destruction of MatacqProducer: "
00777          << ((t.tv_sec-startTime_.tv_sec)*1.
00778              + (t.tv_usec-startTime_.tv_usec)*1.e-6) << "s\n";
00779   }
00780 }
00781 
00782 void MatacqProducer::loadOrbitOffset(){
00783   ifstream f(orbitOffsetFile_.c_str());
00784   if(f.bad()){
00785     throw cms::Exception("Matacq")
00786       << "Failed to open orbit ID correction file '"
00787       << orbitOffsetFile_ << "'\n";
00788   }
00789 
00790   cout << "[Matacq " << now() << "] "
00791        << "Offset to substract to Matacq events Orbit ID: \n"
00792        << "#Run Number\t Offset\n";
00793 
00794   int iline = 0;
00795   string s;
00796   stringstream buf;
00797   while(f.eof()){
00798     getline(f, s);
00799     ++iline;
00800     if(s[0]=='#'){//comment
00801       //skip line:
00802       f.ignore(numeric_limits<streamsize>::max(), '\n');
00803       continue;
00804     }
00805     buf.str("");
00806     buf << s;
00807     int run;
00808     int orbit;
00809     buf >> run;
00810     buf >> orbit;
00811     if(buf.bad()){
00812       throw cms::Exception("Matacq")
00813         << "Syntax error in Orbit offset file '"
00814         << orbitOffsetFile_ << "'";
00815     }
00816     cout << run << "\t" << orbit << "\n";
00817     orbitOffset_.insert(pair<int, int>(run, orbit));
00818   }
00819 }
00820 
00821 #ifdef USE_STORAGE_MANAGER
00822 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess){
00823   if(0==inFile_.get()) return false;
00824   try{
00825     Storage::Relative wh;
00826     if(whence==SEEK_SET) wh = Storage::SET;
00827     else if(whence==SEEK_CUR) wh = Storage::CURRENT;
00828     else if(whence==SEEK_END) wh = Storage::END;
00829     else throw cms::Exception("Bug") << "Bug found in "
00830                                      << __FILE__ << ": "<< __LINE__ << "\n";
00831                    
00832     inFile_->position(offset, wh);
00833   } catch(cms::Exception& e){
00834     if(verbosity_){
00835       cout << "[Matacq " << now() << "] ";
00836       if(mess) cout << mess << ". ";
00837       cout << "Random access error on input matacq file. ";
00838       if(whence==SEEK_SET) cout << "Failed to seek absolute position " << offset;
00839       else if(whence==SEEK_CUR) cout << "Failed to move " << offset << " bytes forward";
00840       else if(whence==SEEK_END) cout << "Failed to seek position at " << offset << " bytes before end of file";
00841         cout << ". Reopening file. " << e.what() << "\n";
00842       mopen(inFileName_);
00843       return false;
00844     }
00845   }
00846   return true;
00847 }
00848 
00849 bool MatacqProducer::mtell(filepos_t& pos){
00850   if(0==inFile_.get()) return false;
00851   pos = inFile_->position();
00852   return true;
00853 }
00854 
00855 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
00856   if(0==inFile_.get()) return false;
00857   
00858   filepos_t pos;
00859   if(!mtell(pos)) return false;
00860 
00861   bool rc = false;
00862   try{
00863     rc =  (n==inFile_->xread(buf, n));
00864   } catch(cms::Exception& e){
00865     if(verbosity_){
00866       cout << "[Matacq " << now() << "] ";
00867       if(mess) cout << mess << ". ";
00868       cout << "Read failure from input matacq file: "
00869            << e.what() << "\n";
00870     }
00871     //recovering from error:
00872     mopen(inFileName_);
00873     mseek(pos);
00874     return false;
00875   }
00876   if(peek){//asked to restore original file position
00877     mseek(pos);
00878   }
00879   return rc;
00880 }
00881 
00882 bool MatacqProducer::msize(filepos_t& s){
00883   if(inFile_.get()==0) return false;
00884   s = inFile_.get()->size();
00885   return true;
00886 }
00887 
00888 bool MatacqProducer::mrewind(){
00889   Storage* file = inFile_.get();
00890   if(file==0) return false;
00891   try{
00892     file->rewind();
00893   } catch(cms::Exception e){
00894     if(verbosity_) cout << "Exception cautgh while rewinding file "
00895                         << inFileName_ << ": " << e.what() << ". "
00896                         << "File will be reopened.";
00897     return mopen(inFileName_);
00898   }
00899   return true;
00900 }
00901 
00902 bool MatacqProducer::mcheck(const std::string& name){
00903   return StorageFactory::get()->check(name);
00904 }
00905 
00906 bool MatacqProducer::mopen(const std::string& name){
00907   //close already opened file if any:
00908   mclose();
00909   
00910   try{
00911     inFile_
00912       = auto_ptr<Storage>(StorageFactory::get()->open(name,
00913                                                       IOFlags::OpenRead));
00914     inFileName_ = name;
00915   } catch(cms::Exception& e){
00916     LogWarning("Matacq") << e.what();
00917     inFile_.reset();
00918     inFileName_ = "";
00919     return false;
00920   }
00921   return true;
00922 }
00923 
00924 void MatacqProducer::mclose(){
00925   if(inFile_.get()!=0){
00926     inFile_->close();
00927     inFile_.reset();
00928   }
00929 }
00930 
00931 bool MatacqProducer::misOpened(){
00932   return inFile_.get()!=0;
00933 }
00934 
00935 bool MatacqProducer::meof(){
00936   if(inFile_.get()==0) return true;
00937   return inFile_->eof();
00938 }
00939 
00940 #else //USE_STORAGE_MANAGER not defined
00941 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess){
00942   if(0==inFile_) return false;
00943   const int rc = fseeko(inFile_, offset, whence);
00944   if(rc!=0 && verbosity_){
00945     cout << "[Matacq " << now() << "] ";
00946     if(mess) cout << mess << ". ";
00947     cout << "Random access error on input matacq file. "
00948       "Rewind file.\n";
00949     mrewind();
00950   }
00951   return rc==0;
00952 }
00953 
00954 bool MatacqProducer::mtell(filepos_t& pos){
00955   if(0==inFile_) return false;
00956   pos = ftello(inFile_);
00957   return pos != -1;
00958     
00959 }
00960 
00961 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
00962   if(0==inFile_) return false;
00963   off_t pos = ftello(inFile_);
00964   bool rc = (pos!=-1) && (1==fread(buf, n, 1, inFile_));
00965   if(!rc){
00966     if(verbosity_){
00967       cout << "[Matacq " << now() << "] ";
00968       if(mess) cout << mess << ". ";
00969       cout << "Read failure from input matacq file.\n";
00970     }
00971     clearerr(inFile_);
00972   }
00973   if(peek || !rc){//need to restore file position
00974     if(0!=fseeko(inFile_, pos, SEEK_SET)){
00975       if(verbosity_){
00976         cout << "[Matacq " << now() << "] ";
00977         if(mess) cout << mess << ". ";
00978         cout << "Failed to restore file position of "
00979           "before read error. Rewind file.\n";
00980       }
00981       //rewind(inFile_.get());
00982       mrewind();
00983       lastOrb_ = 0;
00984     }
00985   }
00986   return rc;
00987 }
00988 
00989 bool MatacqProducer::msize(filepos_t& s){
00990   if(0==inFile_) return false;
00991   struct stat buf;
00992   if(0!=fstat(fileno(inFile_), &buf)){
00993     s = 0;
00994     return false;
00995   } else{
00996     s = buf.st_size;
00997     return true;
00998   }
00999 }
01000 
01001 bool MatacqProducer::mrewind(){
01002   if(0==inFile_) return false;
01003   clearerr(inFile_);
01004   return fseeko(inFile_, 0, SEEK_SET)!=0; 
01005 }
01006 
01007 bool MatacqProducer::mcheck(const std::string& name){
01008   struct stat dummy;
01009   return 0==stat(name.c_str(), &dummy);
01010 //   if(stat(name.c_str(), &dummy)==0){
01011 //     return true;
01012 //   } else{
01013 //     cout << "[Matacq " << now() << "] Failed to stat file '"
01014 //       << name.c_str() << "'. " 
01015 //       << "Error " << errno << ": " << strerror(errno) << "\n";
01016 //     return false;
01017 //   }
01018 }
01019 
01020 bool MatacqProducer::mopen(const std::string& name){
01021   if(inFile_!=0) mclose();
01022   inFile_ = fopen(name.c_str(), "r");
01023   if(inFile_!=0){
01024     inFileName_ = name;
01025     return true;
01026   } else{
01027     inFileName_ = "";
01028     return false;
01029   }
01030 }
01031 
01032 void MatacqProducer::mclose(){
01033   if(inFile_!=0) fclose(inFile_);
01034   inFile_ = 0;
01035 }
01036 
01037 bool MatacqProducer::misOpened(){
01038   return inFile_!=0;
01039 }
01040 
01041 bool MatacqProducer::meof(){
01042   if(0==inFile_) return true;
01043   return feof(inFile_)==0;
01044 }
01045 
01046 #endif //USE_STORAGE_MANAGER defined
01047 
01048 std::string MatacqProducer::runSubDir(uint32_t runNumber){
01049   int millions = runNumber / (1000*1000);
01050   int thousands = (runNumber-millions*1000*1000) / 1000;
01051   int units = runNumber-millions*1000*1000 - thousands*1000;
01052   return str(boost::format("%03d/%03d/%03d") % millions % thousands % units);
01053 }
01054 
01055 void MatacqProducer::newRun(int prevRun, int newRun){
01056     runNumber_ = newRun;
01057     eventSkipCounter_ = 0;
01058     logFile_ << "[" << now() << "] Event count for run "
01059              << runNumber_ << ": "
01060              << "total: " << stats_.nEvents << ", "
01061              << "Laser event with Matacq data: "
01062              << stats_.nLaserEventsWithMatacq << ", "
01063              << "Non laser event (according to DCC header) with Matacq data: "
01064              << stats_.nNonLaserEventsWithMatacq << "\n" << flush;
01065 
01066     stats_.nEvents = 0;
01067     stats_.nLaserEventsWithMatacq = 0;
01068     stats_.nNonLaserEventsWithMatacq = 0;
01069 
01070     
01071 }