12 #include <boost/algorithm/string.hpp>
13 #include <boost/format.hpp>
15 #include <iostream>
16 #include <memory>
18 #include <cstdio>
20 #include <fstream>
21 #include <iomanip>
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <csignal>
28 #include <sys/stat.h>
29 #include <glob.h>
31 using namespace std;
32 using namespace boost;
33 using namespace edm;
35 // #undef LogInfo
36 // #define LogInfo(a) cout << "INFO " << a << ": "
37 // #undef LogWarning
38 // #define LogWarning(a) cout << "WARN " << a << ": "
39 // #undef LogDebug
40 // #define LogDebug(a) cout << "DBG " << a << ": "
43 //verbose mode for matacq event retrieval debugging:
44 //static const bool searchDbg = false;
46 //The laser fires once every 112 orbits, so consecutive laser events are more than 80 orbits apart.
47 const int MatacqProducer::orbitTolerance_ = 80;
/**
 * Formats the current local time for log prefixes.
 *
 * The result has the form "<date> <HH:MM> <SS> s <ms> ms", where the date
 * part comes from the strftime "%F %R %S s" format and the millisecond part
 * is the sub-second time rounded to the nearest millisecond.
 *
 * @return the formatted timestamp; the date part is empty if the time could
 *         not be converted or formatted.
 */
static std::string now(){
  struct timeval t;
  gettimeofday(&t, nullptr);

  // Round to the nearest millisecond. Rounding up from >= 999.5 ms must carry
  // into the seconds, otherwise the text would read "... 1000 ms".
  time_t sec = t.tv_sec;
  long msec = (t.tv_usec + 500) / 1000;
  if(msec >= 1000){
    msec -= 1000;
    ++sec;
  }

  char buf[256];
  buf[0] = 0;
  // localtime_r writes into a caller-owned struct; localtime() returns a
  // pointer to storage shared by every caller in the process.
  struct tm localTime;
  if(localtime_r(&sec, &localTime) != nullptr){
    // strftime returns 0 and leaves the buffer unspecified if it does not fit
    if(0 == strftime(buf, sizeof(buf), "%F %R %S s", &localTime)) buf[0] = 0;
  }
  buf[sizeof(buf)-1] = 0;

  std::stringstream buf2;
  buf2 << buf << " " << msec << " ms";
  return buf2.str();
}
67  fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
68  digiInstanceName_(params.getParameter<string>("digiInstanceName")),
69  rawInstanceName_(params.getParameter<string>("rawInstanceName")),
70  timing_(params.getUntrackedParameter<bool>("timing", false)),
71  disabled_(params.getParameter<bool>("disabled")),
72  verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
73  produceDigis_(params.getParameter<bool>("produceDigis")),
74  produceRaw_(params.getParameter<bool>("produceRaw")),
75  inputRawCollection_(params.getParameter<edm::InputTag>("inputRawCollection")),
76  mergeRaw_(params.getParameter<bool>("mergeRaw")),
77  ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
78  matacq_(nullptr, 0),
79  inFile_(nullptr),
80  data_(bufferSize),
81  openedFileRunNumber_(0),
82  lastOrb_(0),
83  fastRetrievalThresh_(0),
84  orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile",
85  "")),
86  inFileName_(""),
87  stats_(stats_init),
88  logFileName_(params.getUntrackedParameter<std::string>("logFileName",
89  "matacqProducer.log")),
90  eventSkipCounter_(0),
91  onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
92  timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
93  runNumber_(0)
94 {
95  if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer ctor" << endl;
97  gettimeofday(&timer_, nullptr);
99  if(!timeLogFile_.empty()){
101  if({
102  cout << "[LaserSorter " << now() << "] "
103  << "Failed to open file " << timeLogFile_ << " to log timing.\n";
104  logTiming_ = false;
105  } else{
106  logTiming_ = true;
107  }
108  }
112, ios::app | ios::out);
114  if(logFile_.bad()){
115  throw cms::Exception("FileOpen") << "Failed to open file "
116  << logFileName_ << " for logging.\n";
117  }
119  inputRawCollectionToken_ = consumes<FEDRawDataCollection>(params.getParameter<InputTag>("inputRawCollection"));
122  if(produceDigis_){
123  if(verbosity_>0) cout << "[Matacq " << now() << "] registering new "
124  "EcalMatacqDigiCollection product with instance name '"
125  << digiInstanceName_ << "'\n";
126  produces<EcalMatacqDigiCollection>(digiInstanceName_);
127  }
129  if(produceRaw_){
130  if(verbosity_>0) cout << "[Matacq " << now() << "] registering new FEDRawDataCollection "
131  "product with instance name '"
132  << rawInstanceName_ << "'\n";
133  produces<FEDRawDataCollection>(rawInstanceName_);
134  }
136  startTime_.tv_sec = startTime_.tv_usec = 0;
137  if(!orbitOffsetFile_.empty()){
138  doOrbitOffset_ = true;
139  loadOrbitOffset();
140  } else{
141  doOrbitOffset_ = false;
142  }
143  if(verbosity_>=4) cout << "[Matacq " << now() << "] exiting MatacqProducer ctor" << endl;
144 }
147 void
149  if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer::produce" << endl;
150  if(logTiming_){
151  timeval t;
152  gettimeofday(&t, nullptr);
154  timeLog_ << t.tv_sec << "."
155  << setfill('0') << setw(3) << (t.tv_usec+500)/1000 << setfill(' ')<< "\t"
156  << (t.tv_usec - timer_.tv_usec)*1.
157  + (t.tv_sec - timer_.tv_sec)*1.e6 << "\t";
158  timer_ = t;
159  }
161  if(startTime_.tv_sec==0) gettimeofday(&startTime_, nullptr);
162  ++stats_.nEvents;
163  if(disabled_) return;
164  const uint32_t runNumber = getRunNumber(event);
165  if(runNumber!=runNumber_){
166  newRun(runNumber_, runNumber);
167  }
168  addMatacqData(event);
170  if(logTiming_){
171  timeval t;
172  gettimeofday(&t, nullptr);
173  timeLog_ << (t.tv_usec - timer_.tv_usec)*1.
174  + (t.tv_sec - timer_.tv_sec)*1.e6 << "\n";
175  timer_ = t;
176  }
177 }
179 void
183  event.getByToken(inputRawCollectionToken_, sourceColl);
185  std::unique_ptr<FEDRawDataCollection> rawColl;
186  if(produceRaw_){
187  if(mergeRaw_){
188  rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
189  } else{
190  rawColl = std::make_unique<FEDRawDataCollection>();
191  }
192  }
194  auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
196  if(eventSkipCounter_==0){
197  if(sourceColl->FEDData(matacqFedId_).size()>4 && !produceRaw_){
198  //input raw data collection already contains matacqData
200  *digiColl);
201  } else{
202  bool isLaserEvent = (getCalibTriggerType(event) == laserType);
205  // cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
207  if(isLaserEvent || ignoreTriggerType_){
209  const uint32_t runNumber = getRunNumber(event);
210  const uint32_t orbitId = getOrbitId(event);
212  LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
214  bool fileChange;
215  if(doOrbitOffset_){
216  map<uint32_t,uint32_t>::iterator it = orbitOffset_.find(runNumber);
217  if(it == orbitOffset_.end()){
218  LogWarning("Matacq") << "Orbit offset not found for run "
219  << runNumber
220  << ". No orbit correction will be applied.";
221  }
222  }
224  if(getMatacqFile(runNumber, orbitId, &fileChange)){
225  //matacq file retrieval succeeded
226  LogInfo("Matacq") << "Matacq data file found for "
227  << "run " << runNumber << " orbit " << orbitId;
228  if(getMatacqEvent(runNumber, orbitId, fileChange)){
229  if(produceDigis_){
230  formatter_.interpretRawData(matacq_, *digiColl);
231  }
232  if(produceRaw_){
233  uint32_t dataLen64 = matacq_.getParsedLen();
234  if(dataLen64 > bufferSize*8 || matacq_.getDccLen()!= dataLen64){
235  LogWarning("Matacq") << " Error in Matacq event fragment length! "
236  << "DCC len: " << matacq_.getDccLen()
237  << "*8 Bytes, Parsed len: "
238  << matacq_.getParsedLen() << "*8 Bytes. "
239  << "Matacq data will not be included for this event.\n";
240  } else{
241  rawColl->FEDData(matacqFedId_).resize(dataLen64*8);
242  copy(data_.begin(), data_.begin() + dataLen64*8,
243  rawColl->FEDData(matacqFedId_).data());
244  }
245  }
246  LogInfo("Matacq") << "Associating matacq data with orbit id "
247  << matacq_.getOrbitId()
248  << " to dcc event with orbit id "
249  << orbitId << std::endl;
250  if(isLaserEvent){
252  } else{
254  }
255  } else{
256  if(isLaserEvent){
257  LogWarning("Matacq") << "No matacq data found for laser event "
258  << "of run " << runNumber << " orbit "
259  << orbitId;
260  }
261  }
262  } else{
263  LogWarning("Matacq") << "No matacq file found for event "
264  <<;
265  }
266  }
267  }
268  if(eventSkipCounter_>0){ //error occured for this events
269  // and some events will be skipped following
270  // to this error.
271  LogInfo("Matacq") << " [" << now() << "] "
273  << " next events will be skipped, following to an "
274  << "error on the last processed event, "
275  << "which is expected to be persistant.";
276  }
277  } else{
279  }
281  if(produceRaw_){
282  if(verbosity_>1) cout << "[Matacq " << now() << "] "
283  << "Adding FEDRawDataCollection collection "
284  << " to event.\n";
285  event.put(std::move(rawColl), rawInstanceName_);
286  }
288  if(produceDigis_){
289  if(verbosity_>1) cout << "[Matacq " << now() << "] "
290  << "Adding EcalMatacqDigiCollection collection "
291  << " to event.\n";
292  event.put(std::move(digiColl), digiInstanceName_);
293  }
294 }
296 // #if 0
297 // bool
298 // MatacqProducer::getMatacqEvent(std::ifstream& f,
299 // uint32_t runNumber,
300 // uint32_t orbitId,
301 // bool doWrap,
302 // std::streamoff maxPos){
303 // bool found = false;
304 // streampos startPos = f.tellg();
306 // while(!f.eof()
307 // && !found
308 // && (maxPos<0 || f.tellg()<=maxPos)){
309 // const streamsize headerSize = 8*8;
310 //*)&data_[0], headerSize);
311 // if(f.eof()) break;
312 // int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
313 // uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
314 // uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
315 // // cout << "Matacq: orbit = " << orb
316 // // << " len = " << len
317 // // << " run = " << run << endl;
318 // if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
319 // && (runNumber==0 || runNumber==run)){
320 // found = true;
321 // //reads the rest of the event:
322 // if(data_.size() < len*8){
323 // throw cms::Exception("Matacq") << "Buffer overflow";
324 // }
325 //*)&data_[0]+headerSize, len*8-headerSize);
326 // matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
327 // } else{
328 // //moves to next event:
329 // f.seekg(len*8 - headerSize, ios::cur);
330 // }
331 // }
333 // f.clear(); //clears eof error to allow seekg
334 // if(doWrap && !found){
335 // f.seekg(0, ios::beg);
336 // found = getMatacqEvent(f, runNumber, orbitId, false, startPos);
337 // }
338 // return found;
339 // }
340 //#endif
342 bool
344  int32_t orbitId,
345  bool fileChange){
346  filepos_t startPos;
347  if(!mtell(startPos)) return false;
349  int32_t startOrb = -1;
350  const size_t headerSize = 8*8;
351  if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
352  startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
353  if(startOrb<0) startOrb = 0;
354  } else{
355  if(verbosity_>2){
356  cout << "[Matacq " << now() << "] Failed to read matacq header. Moved to start of "
357  " the file.\n";
358  }
359  mrewind();
360  if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
361  startPos = 0;
362  startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
363  } else{
364  if(verbosity_>2) cout << "[Matacq " << now() << "] Looks like matacq file is empty"
365  << "\n";
366  return false;
367  }
368  }
370  if(verbosity_>2) cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_
371  << " looking for orbit " << orbitId
372  << ". Current file position: " << startPos
373  << " Orbit at current position: " << startOrb << "\n";
375  // f.clear();
376  bool didCoarseMove = false;
378  //FIXME: case where posEtim_.invalid() is false
379  if(!posEstim_.invalid()
380  && (abs(lastOrb_-orbitId) > fastRetrievalThresh_)){
381  filepos_t pos = posEstim_.pos(orbitId);
383  // struct stat st;
384  filepos_t fsize;
385  // if(0==stat(inFileName_.c_str(), &st)){
386  if(msize(fsize)){
387  // const int64_t fsize = st.st_size;
388  if(0!=posEstim_.eventLength() && pos > fsize){
389  //estimated position is beyong end of file
390  //-> move to beginning of last event:
391  int64_t evtSize = posEstim_.eventLength()*sizeof(uint64_t);
392  pos = ((int64_t)fsize/evtSize-1)*evtSize;
393  if(verbosity_>2){
394  cout << "[Matacq " << now() << "] Estimated position was beyond end of file. "
395  "Changed to " << pos << "\n";
396  }
397  }
398  } else{
399  LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
400  }
401  if(pos>=0){
402  if(verbosity_>2) cout << "[Matacq " << now() << "] jumping to estimated position "
403  << pos << "\n";
404  mseek(pos, SEEK_SET, "Jumping to estimated event position");
405  if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
406  didCoarseMove = true;
407  } else{
408  //estimated position might have been beyond the end of the file,
409  //try, with original position:
410  didCoarseMove = false;
411  if(!mread((char*)&data_[0], headerSize, "Reading event header", true)){
412  return false;
413  }
414  }
415  } else{
416  if(verbosity_) cout << "[Matacq " << now() << "] Event orbit outside of orbit range "
417  "of matacq data file events\n";
418  return false;
419  }
420  }
422  int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
424  if(didCoarseMove){
425  //autoadjustement of threshold for coarse move:
426  if(abs(orb-orbitId) > fastRetrievalThresh_){
427  if(verbosity_>2) cout << "[Matacq " << now() << "] Fast retrieval threshold increased from "
429  fastRetrievalThresh_ = 2*abs(orb-orbitId);
430  if(verbosity_>2) cout << " to " << fastRetrievalThresh_ << "\n";
431  }
433  //if coarse move did not improve situation, rolls back:
434  if(startOrb > 0
435  && (abs(orb-orbitId) > abs(startOrb-orbitId))){
436  if(verbosity_>2) cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb << ") "
437  "was worst than original position (-> orbit "
438  << startOrb
439  << "). Restoring position (" << startPos << ").\n";
440  mseek(startPos, SEEK_SET);
441  mread((char*)&data_[0], headerSize, "Reading event header", true);
442  orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
443  }
444  }
446  bool searchBackward = (orb>orbitId)?true:false;
447  //BEWARE: len must be signed, because we are using latter in the code (-len)
448  //expression
449  int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
451  if(len==0){
452  cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
453  << " and move matacq file pointer to beginning of the file. "
454  << "(" << __FILE__ << ":" << __LINE__ << ")."
455  << "\n";
456  //rewind(f);
457  mrewind();
458  return false;
459  }
461  enum state_t { searching, found, failed } state = searching;
463  while(state == searching){
464  orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
465  len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
466  uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
467  if(verbosity_>3){
468  filepos_t pos = -1;
469  mtell(pos);
470  cout << "[Matacq " << now() << "] Header read at file position "
471  << pos
472  << ": orbit = " << orb
473  << " len = " << len << "x8 Byte"
474  << " run = " << run << "\n";
475  }
476  if((abs(orb-orbitId) < orbitTolerance_)
477  && (runNumber==0 || runNumber==run)){
478  state = found;
479  lastOrb_ = orb;
480  //reads the rest of the event:
481  if((int)data_.size() < len*8){
482  throw cms::Exception("Matacq") << "Buffer overflow";
483  }
484  if(verbosity_>2) cout << "[Matacq " << now() << "] Event found. Reading "
485  " matacq event." << "\n";
486  if(!mread((char*)&data_[0], len*8, "Reading matacq event")){
487  if(verbosity_>2) cout << "[Matacq " << now() << "] Failed to read matacq event."
488  << "\n";
489  state = failed;
490  }
491  matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
492  } else {
493  if((searchBackward && (orb < orbitId))
494  || (!searchBackward && (orb > orbitId))){ //search ended
495  lastOrb_ = orb;
496  state = failed;
497  if(verbosity_>2) cout << "[Matacq " << now()
498  << "] No matacq data found for run " << run
499  << ", orbit ID " << orbitId << "." << "\n";
500  } else{
501  off_t offset = (searchBackward?-len:len)*8;
502  lastOrb_ = orb;
503  if(verbosity_>3){
504  cout << "[Matacq " << now() << "] In matacq file, moving "
505  << abs(offset) << " byte " << (offset>0?"forward":"backward")
506  << ".\n";
507  }
509  if(mseek(offset, SEEK_CUR,
510  (searchBackward?"Moving to previous event":
511  "Moving to next event"))
512  && mread((char*)&data_[0], headerSize, "Reading event header",
513  true)){
514  } else{
515  if(!searchBackward) mseek(-len*8, SEEK_CUR,
516  "Moving to start of last complete event");
517  state = failed;
518  }
519  }
520  }
521  }
523  if(state==found){
524  filepos_t pos = -1;
525  filepos_t fsize = -1;
526  mtell(pos);
527  msize(fsize);
528  if(pos==fsize-1){ //last byte.
529  if(verbosity_>2){
530  cout << "[Matacq " << now() << "] Event found was at the end of the file. Moving "
531  "stream position to beginning of this event."
532  << "\n";
533  }
534  mseek(-(int)len*8-1, SEEK_CUR,
535  "Moving to beginning of last matacq event");
536  }
537  }
538  return (state==found);
539 }
542 bool
543 MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId,
544  bool* fileChange){
545  if(openedFileRunNumber_ != 0
546  && openedFileRunNumber_ == runNumber){
547  uint32_t firstOrb, lastOrb;
548  bool goodRange = getOrbitRange(firstOrb, lastOrb);
549  // if(orbitId < firstOrb || orbitId > lastOrb) continue;
550  if(goodRange && firstOrb <= orbitId && orbitId <= lastOrb){
551  if(fileChange!=nullptr) *fileChange = false;
552  return misOpened();
553  }
554  }
556  if(fileNames_.empty()) return false;
558  const string runNumberFormat = "%08d{,_*}";
559  string sRunNumber = str(boost::format(runNumberFormat) % runNumber);
560  //cout << "Run number string: " << sRunNumber << "\n";
561  bool found = false;
562  string fname;
563  uint32_t maxOrb = 0;
564  //we make two iterations to handle the case where the event is procesed
565  //before the matacq data are available. In such case we would have
566  //orbitId > maxOrb (maxOrb: orbit of last written matacq event)
567  for(int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry){
568  if(itry > 0){
569  int n_sec = 1;
570  std::cout << "[Matacq " << now() << "] Event orbit id (" << orbitId << ") goes "
571  "beyound the range of available one. Waiting for " << n_sec << " seconds in case "
572  "it was not written yet to disk.";
573  sleep(n_sec);
574  }
576  for(unsigned i=0; i < fileNames_.size() && !found; ++i){
577  fname = fileNames_[i];
578  boost::algorithm::replace_all(fname, "%run_subdir%",
579  runSubDir(runNumber));
580  boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
582  glob_t g;
583  int rc = glob(fname.c_str(), GLOB_BRACE, nullptr, &g);
584  if(rc){
585  if(verbosity_ > 1){
586  switch(rc){
587  case GLOB_NOSPACE:
588  std::cout << "[Matacq " << now() << "] Running out of memory while calling glob function to look for matacq file paths\n";
589  break;
590  case GLOB_ABORTED:
591  std::cout << "[Matacq " << now() << "] Read error while calling glob function to look for matacq file paths\n";
592  break;
593  case GLOB_NOMATCH:
594  //ok. No message to report.
595  break;
596  }
597  continue;
598  }
599  } //rc
600  for(unsigned iglob = 0; iglob < g.gl_pathc; ++iglob){
601  char* thePath = g.gl_pathv[iglob];
602  //FIXME: add sanity check on the path
603  static std::atomic<int> nOpenErrors {0};
604  const int maxOpenErrors = 50;
605  if(!mopen(thePath) && nOpenErrors < maxOpenErrors){
606  std::cout << "[Matacq " << now() << "] Failed to open file " << thePath;
607  ++nOpenErrors;
608  if(nOpenErrors == maxOpenErrors){
609  std::cout << nOpenErrors << "This is the " << maxOpenErrors
610  << "th occurence of this error. Report of this error is now disabled.\n";
611  } else{
612  std::cout << "\n";
613  }
614  }
615  uint32_t firstOrb;
616  uint32_t lastOrb;
617  bool goodRange = getOrbitRange(firstOrb, lastOrb);
618  if(goodRange && lastOrb > maxOrb) maxOrb = lastOrb;
619  if(goodRange && firstOrb <= orbitId && orbitId <= lastOrb){
620  found = true;
621  //continue;
622  fname = thePath;
623  if(verbosity_ > 1) std::cout << "[Matacq " << now() << "] Switching to file " << fname << "\n";
624  break;
625  }
626  } //next iglob
627  globfree(&g);
628  }//next filenames
629  } //next itry
631  if(found){
632  LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
633  } else{
634  if(verbosity_>=0) cout << "[Matacq " << now() << "] no matacq file found "
635  "for run " << runNumber << "\n";
638  if(fileChange!=nullptr) *fileChange = false;
639  return false;
640  }
642  if(found){
644  lastOrb_ = 0;
645  posEstim_.init(this);
646  if(fileChange!=nullptr) *fileChange = true;
647  return true;
648  } else{
649  return false;
650  }
651 }
655  return;
656 }
659  //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
660  //we could use here. The code would be shorten to:
661  //return ev.orbitNumber();
662  //we have to deal with what we have in current CMSSW releases:
665  if(!(rawdata.isValid())){
666  throw cms::Exception("NotFound")
667  << "No FED raw data collection found. ECAL raw data are "
668  "required to retrieve the orbit ID";
669  }
671  int orbit = 0;
672  for(int id=601; id<=654; ++id){
673  if(!FEDNumbering::inRange(id)) continue;
674  const FEDRawData& data = rawdata->FEDData(id);
675  const int orbitIdOffset64 = 3;
676  if(data.size()>=8*(orbitIdOffset64+1)){//orbit id is in 4th 64-bit word
677  const unsigned char* pOrbit = + orbitIdOffset64*8;
678  int thisOrbit = pOrbit[0]
679  | (pOrbit[1] <<8)
680  | (pOrbit[2] <<16)
681  | (pOrbit[3] <<24);
682  if(orbit!=0 && thisOrbit!=0 && abs(orbit-thisOrbit)>orbitTolerance_){
683  //throw cms::Exception("EventCorruption")
684  // << "Orbit ID inconsitency in DCC headers";
685  LogWarning("EventCorruption")
686  << "Orbit ID inconsitency in DCC headers";
687  orbit = 0;
688  break;
689  }
690  if(thisOrbit!=0) orbit = thisOrbit;
691  }
692  }
694  if(orbit==0){
695  // throw cms::Exception("NotFound")
696  // << "Failed to retrieve orbit ID of event "<<;
697  LogWarning("NotFound") << "Failed to retrieve orbit ID of event "
698  <<;
699  }
700  return orbit;
701 }
706  if(!(rawdata.isValid())){
707  throw cms::Exception("NotFound")
708  << "No FED raw data collection found. ECAL raw data are "
709  "required to retrieve the trigger type";
710  }
713  for(int id=601; id<=654; ++id){
714  if(!FEDNumbering::inRange(id)) continue;
715  const FEDRawData& data = rawdata->FEDData(id);
716  const int detailedTrigger32 = 5;
717  if(data.size()>=4*(detailedTrigger32+1)){
718  const unsigned char* pTType = + detailedTrigger32*4;
719  int tType = pTType[1] & 0x7;
720  stat.add(tType);
721  }
722  }
723  double p;
724  int tType = stat.result(&p);
725  if(p<0){
726  //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
727  LogWarning("NotFound") << "No ECAL DCC data found\n";
728  tType = -1;
729  }
730  if(p<.8){
731  //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
732  LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
733  tType = -1;
734  }
735  return tType;
736 }
739  mp->mrewind();
741  const size_t headerSize = 8*8;
742  unsigned char data[headerSize];
743  if(!mp->mread((char*)data, headerSize)){
744  if(verbosity_) cout << "[Matacq " << now() << "] reached end of file!\n";
745  firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
746  return;
747  } else{
748  firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
749  eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
750  if(verbosity_>1) cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_
751  << " event length: " << eventLength_
752  << "*8 byte\n";
753  }
755  mp->mrewind();
757  if(eventLength_==0){
758  if(verbosity_) cout << "[Matacq " << now() << "] event length is null!" << endl;
759  return;
760  }
762  filepos_t s;
763  mp->msize(s);
765  //number of complete events:
766  const unsigned nEvents = s/eventLength_/8;
768  if(nEvents==0){
769  if(verbosity_) cout << "[Matacq " << now() << "] File is empty!" << endl;
770  orbitStepMean_ = 0;
771  return;
772  }
774  if(verbosity_>1) cout << "[Matacq " << now() << "] File size: " << s
775  << " Number of events: " << nEvents << endl;
777  //position of last complete events:
778  off_t last = (nEvents-1)*(off_t)eventLength_*8;
779  mp->mseek(last, SEEK_SET, "Moving to beginning of last complete "
780  "matacq event");
781  if(!mp->mread((char*) data, headerSize, "Reading matacq header", true)){
782  LogWarning("Matacq") << "Fast matacq event retrieval failure. "
783  "Falling back to safe retrieval mode.";
784  orbitStepMean_ = 0;
785  }
787  int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
788  int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
790  if(verbosity_>1) cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb
791  << " last event length: " << lastLen << endl;
793  //some consistency check
794  if(lastLen!=eventLength_){
795  LogWarning("Matacq")
796  //throw cms::Exception("Matacq")
797  << "Fast matacq event retrieval failure: it looks like "
798  "the matacq file contains events of different sizes.";
799  // " Falling back to safe retrieval mode.";
800  invalid_ = false; //true;
801  orbitStepMean_ = 112; //0;
802  return;
803  }
805  orbitStepMean_ = (lastOrb - firstOrbit_)/nEvents;
807  if(verbosity_>1) cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_
808  << "\n";
810  invalid_ = false;
811 }
813 int64_t MatacqProducer::PosEstimator::pos(int orb) const{
814  if(orb<firstOrbit_) return -1;
815  uint64_t r = orbitStepMean_!=0?
816  (((uint64_t)(orb-firstOrbit_))/orbitStepMean_)*eventLength_*8
817  :0;
818  if(verbosity_>2) cout << "[Matacq " << now() << "] Estimated Position for orbit " << orb
819  << ": " << r << endl;
820  return r;
821 }
824  mclose();
825  timeval t;
826  gettimeofday(&t, nullptr);
827  if(logTiming_ && startTime_.tv_sec!=0){
828  //not using logger, to allow timing with different logging options
829  cout << "[Matacq " << now() << "] Time elapsed between first event and "
830  "destruction of MatacqProducer: "
831  << ((t.tv_sec-startTime_.tv_sec)*1.
832  + (t.tv_usec-startTime_.tv_usec)*1.e-6) << "s\n";
833  }
834 }
837  std::ifstream f(orbitOffsetFile_.c_str());
838  if(f.bad()){
839  throw cms::Exception("Matacq")
840  << "Failed to open orbit ID correction file '"
841  << orbitOffsetFile_ << "'\n";
842  }
844  cout << "[Matacq " << now() << "] "
845  << "Offset to substract to Matacq events Orbit ID: \n"
846  << "#Run Number\t Offset\n";
848  int iline = 0;
849  string s;
850  stringstream buf;
851  while(f.eof()){
852  getline(f, s);
853  ++iline;
854  if(s[0]=='#'){//comment
855  //skip line:
856  f.ignore(numeric_limits<streamsize>::max(), '\n');
857  continue;
858  }
859  buf.str("");
860  buf << s;
861  int run;
862  int orbit;
863  buf >> run;
864  buf >> orbit;
865  if(buf.bad()){
866  throw cms::Exception("Matacq")
867  << "Syntax error in Orbit offset file '"
868  << orbitOffsetFile_ << "'";
869  }
870  cout << run << "\t" << orbit << "\n";
871  orbitOffset_.insert(pair<int, int>(run, orbit));
872  }
873 }
// Moves the read position of the input matacq file (Storage-based variant).
//
// offset: displacement handed to inFile_->position()
// whence: SEEK_SET, SEEK_CUR or SEEK_END, mapped to the Storage equivalent
// mess:   optional text printed in front of the error message, may be null
//
// Returns false when no file is opened. See the review note in the catch
// block for the value returned after a failed move.
876 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess){
877  if(0==inFile_.get()) return false;
878  try{
// NOTE(review): the declaration of wh is not visible in this listing.
880  if(whence==SEEK_SET) wh = Storage::SET;
881  else if(whence==SEEK_CUR) wh = Storage::CURRENT;
882  else if(whence==SEEK_END) wh = Storage::END;
// an unknown whence value is a caller error; the exception is thrown inside
// the try block, so it is handled by the catch clause below
883  else throw cms::Exception("Bug") << "Bug found in "
884  << __FILE__ << ": "<< __LINE__ << "\n";
886  inFile_->position(offset, wh);
887  } catch(cms::Exception& e){
// NOTE(review): the message and the "return false" are both inside
// if(verbosity_). With verbosity_ == 0 execution falls through to
// "return true" although the move failed - confirm whether this is intended.
888  if(verbosity_){
889  cout << "[Matacq " << now() << "] ";
890  if(mess) cout << mess << ". ";
891  cout << "Random access error on input matacq file. ";
892  if(whence==SEEK_SET) cout << "Failed to seek absolute position " << offset;
893  else if(whence==SEEK_CUR) cout << "Failed to move " << offset << " bytes forward";
894  else if(whence==SEEK_END) cout << "Failed to seek position at " << offset << " bytes before end of file";
895  cout << ". Reopening file. " << e.what() << "\n";
// NOTE(review): the message announces a reopening of the file, but no such
// statement is visible here (one line of the listing is missing).
897  return false;
898  }
899  }
900  return true;
901 }
904  if(0==inFile_.get()) return false;
905  pos = inFile_->position();
906  return true;
907 }
909 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
910  if(0==inFile_.get()) return false;
912  filepos_t pos = -1;
913  if(!mtell(pos)) return false;
915  bool rc = false;
916  try{
917  rc = (n==inFile_->xread(buf, n));
918  } catch(cms::Exception& e){
919  if(verbosity_){
920  cout << "[Matacq " << now() << "] ";
921  if(mess) cout << mess << ". ";
922  cout << "Read failure from input matacq file: "
923  << e.what() << "\n";
924  }
925  //recovering from error:
927  mseek(pos);
928  return false;
929  }
930  if(peek){//asked to restore original file position
931  mseek(pos);
932  }
933  return rc;
934 }
937  if(inFile_.get()==0) return false;
938  s = inFile_.get()->size();
939  return true;
940 }
943  Storage* file = inFile_.get();
944  if(file==0) return false;
945  try{
946  file->rewind();
947  } catch(cms::Exception e){
948  if(verbosity_) cout << "Exception cautgh while rewinding file "
949  << inFileName_ << ": " << e.what() << ". "
950  << "File will be reopened.";
951  return mopen(inFileName_);
952  }
953  return true;
954 }
957  return StorageFactory::get()->check(name);
958 }
961  //close already opened file if any:
962  mclose();
964  try{
965  inFile_
966  = auto_ptr<Storage>(StorageFactory::get()->open(name,
968  inFileName_ = name;
969  } catch(cms::Exception& e){
970  LogWarning("Matacq") << e.what();
971  inFile_.reset();
972  inFileName_ = "";
973  return false;
974  }
975  return true;
976 }
979  if(inFile_.get()!=0){
980  inFile_->close();
981  inFile_.reset();
982  }
983 }
986  return inFile_.get()!=0;
987 }
989 bool MatacqProducer::meof(){
990  if(inFile_.get()==0) return true;
991  return inFile_->eof();
992 }
994 #else //USE_STORAGE_MANAGER not defined
995 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess){
996  if(nullptr==inFile_) return false;
997  const int rc = fseeko(inFile_, offset, whence);
998  if(rc!=0 && verbosity_){
999  cout << "[Matacq " << now() << "] ";
1000  if(mess) cout << mess << ". ";
1001  cout << "Random access error on input matacq file. "
1002  "Rewind file.\n";
1003  mrewind();
1004  }
1005  return rc==0;
1006 }
1009  if(nullptr==inFile_) return false;
1010  pos = ftello(inFile_);
1011  return pos != -1;
1013 }
1015 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
1016  if(nullptr==inFile_) return false;
1017  off_t pos = ftello(inFile_);
1018  bool rc = (pos!=-1) && (1==fread(buf, n, 1, inFile_));
1019  if(!rc){
1020  if(verbosity_){
1021  cout << "[Matacq " << now() << "] ";
1022  if(mess) cout << mess << ". ";
1023  cout << "Read failure from input matacq file.\n";
1024  }
1025  clearerr(inFile_);
1026  }
1027  if(peek || !rc){//need to restore file position
1028  if(0!=fseeko(inFile_, pos, SEEK_SET)){
1029  if(verbosity_){
1030  cout << "[Matacq " << now() << "] ";
1031  if(mess) cout << mess << ". ";
1032  cout << "Failed to restore file position of "
1033  "before read error. Rewind file.\n";
1034  }
1035  //rewind(inFile_.get());
1036  mrewind();
1037  lastOrb_ = 0;
1038  }
1039  }
1040  return rc;
1041 }
1044  if(nullptr==inFile_) return false;
1045  struct stat buf;
1046  if(0!=fstat(fileno(inFile_), &buf)){
1047  s = 0;
1048  return false;
1049  } else{
1050  s = buf.st_size;
1051  return true;
1052  }
1053 }
1056  if(nullptr==inFile_) return false;
1057  clearerr(inFile_);
1058  return fseeko(inFile_, 0, SEEK_SET)!=0;
1059 }
1062  struct stat dummy;
1063  return 0==stat(name.c_str(), &dummy);
1064 // if(stat(name.c_str(), &dummy)==0){
1065 // return true;
1066 // } else{
1067 // cout << "[Matacq " << now() << "] Failed to stat file '"
1068 // << name.c_str() << "'. "
1069 // << "Error " << errno << ": " << strerror(errno) << "\n";
1070 // return false;
1071 // }
1072 }
1075  if(inFile_!=nullptr) mclose();
1076  inFile_ = fopen(name.c_str(), "r");
1077  if(inFile_!=nullptr){
1078  inFileName_ = name;
1079  return true;
1080  } else{
1081  inFileName_ = "";
1082  return false;
1083  }
1084 }
1087  if(inFile_!=nullptr) fclose(inFile_);
1088  inFile_ = nullptr;
1089 }
1092  return inFile_!=nullptr;
1093 }
1096  if(nullptr==inFile_) return true;
1097  return feof(inFile_)==0;
1098 }
1100 #endif //USE_STORAGE_MANAGER defined
1103  int millions = runNumber / (1000*1000);
1104  int thousands = (runNumber-millions*1000*1000) / 1000;
1105  int units = runNumber-millions*1000*1000 - thousands*1000;
1106  return str(boost::format("%03d/%03d/%03d") % millions % thousands % units);
1107 }
1109 void MatacqProducer::newRun(int prevRun, int newRun){
1110  runNumber_ = newRun;
1111  eventSkipCounter_ = 0;
1112  logFile_ << "[" << now() << "] Event count for run "
1113  << runNumber_ << ": "
1114  << "total: " << stats_.nEvents << ", "
1115  << "Laser event with Matacq data: "
1116  << stats_.nLaserEventsWithMatacq << ", "
1117  << "Non laser event (according to DCC header) with Matacq data: "
1118  << stats_.nNonLaserEventsWithMatacq << "\n" << flush;
1120  stats_.nEvents = 0;
1125 }
1127 bool MatacqProducer::getOrbitRange(uint32_t& firstOrb, uint32_t& lastOrb){
1128  filepos_t pos = -1;
1129  filepos_t fsize = -1;
1130  mtell(pos);
1131  msize(fsize);
1132  const unsigned headerSize = 8*8;
1133  unsigned char header[headerSize];
1134  //FIXME: Don't we need here to rewind?
1135  mseek(0);
1136  if(!mread((char*)header, headerSize, nullptr, false)) return false;
1137  firstOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1138  int len = (int)MatacqRawEvent::getDccLen(header, headerSize);
1139  //number of complete events. If last event is partially written,
1140  //it won't be included in the count.
1141  unsigned nEvts = fsize / (len*64);
1142  //Position of last complete event:
1143  filepos_t lastEvtPos = (nEvts - 1) * len * 64;
1144  mseek(lastEvtPos);
1145  mread((char*)header, headerSize, nullptr, false);
1146  lastOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1148  //restore file position:
1149  mseek(pos);
1151  return true;
1152 }
